diff --git a/alembic.ini b/alembic.ini index 20bf1e9..82586e4 100644 --- a/alembic.ini +++ b/alembic.ini @@ -70,10 +70,10 @@ sqlalchemy.url = driver://user:pass@localhost/dbname # lint with attempts to fix using "ruff" - use the exec runner, execute a binary hooks = ruff_format, ruff_check ruff_format.type = exec -ruff_format.executable = %(here)s/.venv/bin/ruff +ruff_format.executable = ruff ruff_format.options = format REVISION_SCRIPT_FILENAME ruff_check.type = exec -ruff_check.executable = %(here)s/.venv/bin/ruff +ruff_check.executable = ruff ruff_check.options = check --fix REVISION_SCRIPT_FILENAME # Logging configuration diff --git a/alembic/versions/20240628_174304_af57b4dcaecb_.py b/alembic/versions/20240628_174304_af57b4dcaecb_.py new file mode 100644 index 0000000..81207f5 --- /dev/null +++ b/alembic/versions/20240628_174304_af57b4dcaecb_.py @@ -0,0 +1,31 @@ +"""empty message + +Revision ID: af57b4dcaecb +Revises: 63a1d1c101bd +Create Date: 2024-06-28 17:43:04.226926 + +""" + +from collections.abc import Sequence + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "af57b4dcaecb" +down_revision: str | None = "63a1d1c101bd" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.add_column("usage", sa.Column("last_alive_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_column("usage", "last_alive_at") + # ### end Alembic commands ### diff --git a/app/constants.py b/app/constants.py index 8210791..aa4eaa0 100644 --- a/app/constants.py +++ b/app/constants.py @@ -3,16 +3,24 @@ from enum import StrEnum +class QueueMessageStatus(StrEnum): + """Queue Message Status.""" + + COMPLETED = "completed" + FAILED = "failed" + + class ServiceType(StrEnum): - """Service type.""" + """Service Type.""" STORAGE = "storage" SHORT_JOBS = "short-jobs" LONG_JOBS = "long-jobs" -class QueueMessageStatus(StrEnum): - """Queue message status.""" +class LongJobStatus(StrEnum): + """Long Job Status.""" - COMPLETED = "completed" - FAILED = "failed" + STARTED = "started" + RUNNING = "running" + FINISHED = "finished" diff --git a/app/db/models.py b/app/db/models.py index 9ae6c58..38e32b9 100644 --- a/app/db/models.py +++ b/app/db/models.py @@ -61,6 +61,7 @@ class Usage(Base): created_at: Mapped[CREATED_AT] updated_at: Mapped[UPDATED_AT] started_at: Mapped[datetime] + last_alive_at: Mapped[datetime] finished_at: Mapped[datetime | None] properties: Mapped[dict[str, Any] | None] diff --git a/app/db/session.py b/app/db/session.py index 20e0d5e..db584c7 100644 --- a/app/db/session.py +++ b/app/db/session.py @@ -46,7 +46,13 @@ async def session(self) -> AsyncIterator[AsyncSession]: autocommit=False, autoflush=False, ) as session: - yield session + try: + yield session + except Exception: + await session.rollback() + raise + else: + await session.commit() database_session_manager = DatabaseSessionManager() diff --git a/app/queue/consumer/long_jobs.py b/app/queue/consumer/long_jobs.py index 025f522..d6dc051 100644 --- a/app/queue/consumer/long_jobs.py +++ b/app/queue/consumer/long_jobs.py @@ -4,12 +4,58 @@ from sqlalchemy.ext.asyncio import AsyncSession +from app.constants import LongJobStatus from app.queue.consumer.base import QueueConsumer +from app.queue.schemas import LongJobUsageEvent +from app.repositories.usage import UsageRepository + + +async def 
_handle_started(repo: UsageRepository, event: LongJobUsageEvent) -> None: + await repo.add_usage( + vlab_id=event.vlab_id, + proj_id=event.proj_id, + job_id=event.job_id, + service_type=event.type, + service_subtype=event.subtype, + units=event.instances or 0, + started_at=event.timestamp, + properties={"instance_type": event.instance_type}, + ) + + +async def _handle_running(repo: UsageRepository, event: LongJobUsageEvent) -> None: + await repo.update_last_alive_at( + vlab_id=event.vlab_id, + proj_id=event.proj_id, + job_id=event.job_id, + last_alive_at=event.timestamp, + ) + + +async def _handle_finished(repo: UsageRepository, event: LongJobUsageEvent) -> None: + await repo.update_finished_at( + vlab_id=event.vlab_id, + proj_id=event.proj_id, + job_id=event.job_id, + finished_at=event.timestamp, + ) class LongJobsQueueConsumer(QueueConsumer): """Long jobs queue consumer.""" - async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None: # noqa: ARG002 + async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None: """Consume the message.""" self.logger.info("Message received: %s", msg) + event = LongJobUsageEvent.model_validate_json(msg["Body"]) + repo = UsageRepository(db=db) + match event.status: + case LongJobStatus.STARTED: + await _handle_started(repo, event) + case LongJobStatus.RUNNING: + await _handle_running(repo, event) + case LongJobStatus.FINISHED: + await _handle_finished(repo, event) + case _: + error = f"Status not handled: {event.status}" + raise ValueError(error) diff --git a/app/queue/consumer/short_jobs.py b/app/queue/consumer/short_jobs.py index 799dcfb..2e445cf 100644 --- a/app/queue/consumer/short_jobs.py +++ b/app/queue/consumer/short_jobs.py @@ -5,11 +5,25 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.queue.consumer.base import QueueConsumer +from app.queue.schemas import ShortJobUsageEvent +from app.repositories.usage import UsageRepository class ShortJobsQueueConsumer(QueueConsumer): """Short jobs 
queue consumer.""" - async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None: # noqa: ARG002 + async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None: """Consume the message.""" self.logger.info("Message received: %s", msg) + event = ShortJobUsageEvent.model_validate_json(msg["Body"]) + repo = UsageRepository(db=db) + await repo.add_usage( + vlab_id=event.vlab_id, + proj_id=event.proj_id, + job_id=event.job_id, + service_type=event.type, + service_subtype=event.subtype, + units=event.count, + started_at=event.timestamp, + finished_at=event.timestamp, + ) diff --git a/app/queue/consumer/storage.py b/app/queue/consumer/storage.py index 1830738..c6b5f26 100644 --- a/app/queue/consumer/storage.py +++ b/app/queue/consumer/storage.py @@ -1,13 +1,11 @@ """Short jobs consumer module.""" -import json -from datetime import UTC, datetime from typing import Any -from uuid import UUID from sqlalchemy.ext.asyncio import AsyncSession from app.queue.consumer.base import QueueConsumer +from app.queue.schemas import StorageUsageEvent from app.repositories.usage import UsageRepository @@ -17,20 +15,14 @@ class StorageQueueConsumer(QueueConsumer): async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None: """Consume the message.""" self.logger.info("Message received: %s", msg) - body = json.loads(msg["Body"]) - started_at = datetime.fromtimestamp( - float(body.get("timestamp_ms") or msg["Attributes"]["SentTimestamp"]) / 1000, - tz=UTC, - ) + event = StorageUsageEvent.model_validate_json(msg["Body"]) repo = UsageRepository(db=db) await repo.add_usage( - vlab_id=UUID(body["vlab_id"]), - proj_id=UUID(body["proj_id"]), - job_id=None, - service_type=body["type"], - service_subtype=body.get("subtype", ""), - units=int(body["units"]), - started_at=started_at, - finished_at=None, - properties=None, + vlab_id=event.vlab_id, + proj_id=event.proj_id, + job_id=event.job_id, + service_type=event.type, + service_subtype=event.subtype, + 
units=event.size, + started_at=event.timestamp, + ) diff --git a/app/queue/schemas.py b/app/queue/schemas.py new file mode 100644 index 0000000..06bc848 --- /dev/null +++ b/app/queue/schemas.py @@ -0,0 +1,59 @@ +"""Usage Events schemas.""" + +from datetime import UTC, datetime +from typing import Annotated, Literal +from uuid import UUID + +from pydantic import BaseModel, BeforeValidator, Field, validate_call + +from app.constants import LongJobStatus, ServiceType + +# min and max just ensure that the timestamp contains a reasonable value expressed in milliseconds +MIN_TS = datetime(2024, 1, 1, tzinfo=UTC).timestamp() * 1000 +MAX_TS = datetime(2100, 1, 1, tzinfo=UTC).timestamp() * 1000 + + +@validate_call +def _convert_timestamp(value: Annotated[float, Field(gt=MIN_TS, lt=MAX_TS)]) -> datetime: + return datetime.fromtimestamp(value / 1000, tz=UTC) + + +TimeStamp = Annotated[datetime, BeforeValidator(_convert_timestamp)] + + +class StorageUsageEvent(BaseModel): + """StorageUsageEvent.""" + + type: Literal[ServiceType.STORAGE] + subtype: str | None = None + vlab_id: UUID + proj_id: UUID + job_id: UUID | None = None + size: int + timestamp: TimeStamp + + +class ShortJobUsageEvent(BaseModel): + """ShortJobUsageEvent.""" + + type: Literal[ServiceType.SHORT_JOBS] + subtype: str + vlab_id: UUID + proj_id: UUID + job_id: UUID | None = None + count: int + timestamp: TimeStamp + + +class LongJobUsageEvent(BaseModel): + """LongJobUsageEvent.""" + + type: Literal[ServiceType.LONG_JOBS] + subtype: str + vlab_id: UUID + proj_id: UUID + job_id: UUID + status: LongJobStatus + instances: int | None = None + instance_type: str | None = None + timestamp: TimeStamp diff --git a/app/repositories/usage.py b/app/repositories/usage.py index 935c3e2..5821fd9 100644 --- a/app/repositories/usage.py +++ b/app/repositories/usage.py @@ -5,6 +5,7 @@ from uuid import UUID import sqlalchemy as sa +from sqlalchemy import and_ from app.constants import ServiceType from app.db.models import Usage @@ -23,31 +24,78 @@ 
async def add_usage( proj_id: UUID, job_id: UUID | None, service_type: ServiceType, - service_subtype: str | None, - units: int, - started_at: datetime, - finished_at: datetime | None, - properties: dict | None, + service_subtype: str | None = None, + units: int = 0, + started_at: datetime | None = None, + finished_at: datetime | None = None, + properties: dict | None = None, ) -> Usage: """Add usage.""" query = ( sa.insert(Usage) .values( - job_id=job_id, vlab_id=vlab_id, proj_id=proj_id, + job_id=job_id, service_type=service_type, service_subtype=service_subtype, units=units, started_at=started_at, + last_alive_at=started_at, finished_at=finished_at, properties=properties, ) .returning(Usage) ) - result = (await self.db.execute(query)).scalar_one() - await self.db.commit() - return result + return (await self.db.execute(query)).scalar_one() + + async def update_last_alive_at( + self, + vlab_id: UUID, + proj_id: UUID, + job_id: UUID, + last_alive_at: datetime, + ) -> Usage: + """Update last_alive_at.""" + query = ( + sa.update(Usage) + .values( + last_alive_at=last_alive_at, + ) + .where( + and_( + Usage.vlab_id == vlab_id, + Usage.proj_id == proj_id, + Usage.job_id == job_id, + ) + ) + .returning(Usage) + ) + return (await self.db.execute(query)).scalar_one() + + async def update_finished_at( + self, + vlab_id: UUID, + proj_id: UUID, + job_id: UUID, + finished_at: datetime, + ) -> Usage: + """Update finished_at.""" + query = ( + sa.update(Usage) + .values( + finished_at=finished_at, + ) + .where( + and_( + Usage.vlab_id == vlab_id, + Usage.proj_id == proj_id, + Usage.job_id == job_id, + ) + ) + .returning(Usage) + ) + return (await self.db.execute(query)).scalar_one() async def get_all_usages_rows(self) -> Sequence[sa.Row]: """Get all the usage rows as Row objects.""" diff --git a/tests/queue/consumer/test_storage.py b/tests/queue/consumer/test_storage.py index c9e335f..93cfa5a 100644 --- a/tests/queue/consumer/test_storage.py +++ 
b/tests/queue/consumer/test_storage.py @@ -75,8 +75,8 @@ async def test_consume(sqs_stubber, sqs_client_factory, db): "type": "storage", "vlab_id": "00000000-0000-0000-0000-000000000000", "proj_id": "00000000-0000-0000-0000-000000000001", - "units": "1073741824", - "timestamp_ms": "1719477803993", + "size": "1073741824", + "timestamp": "1719477803993", } _prepare_stub(sqs_stubber, queue_url, message_id, receipt_handle, message_body)