Skip to content

Commit

Permalink
Various changes
Browse files Browse the repository at this point in the history
- Add validation for messages in the queue
- Add queue handlers
- Add LongJobStatus
- Add last_alive_at to the usage table
- Automatically commit the db session
  • Loading branch information
GianlucaFicarelli committed Jul 1, 2024
1 parent 996a762 commit 5aff60f
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 38 deletions.
4 changes: 2 additions & 2 deletions alembic.ini
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ sqlalchemy.url = driver://user:pass@localhost/dbname
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
hooks = ruff_format, ruff_check
ruff_format.type = exec
ruff_format.executable = %(here)s/.venv/bin/ruff
ruff_format.executable = ruff
ruff_format.options = format REVISION_SCRIPT_FILENAME
ruff_check.type = exec
ruff_check.executable = %(here)s/.venv/bin/ruff
ruff_check.executable = ruff
ruff_check.options = check --fix REVISION_SCRIPT_FILENAME

# Logging configuration
Expand Down
31 changes: 31 additions & 0 deletions alembic/versions/20240628_174304_af57b4dcaecb_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""empty message
Revision ID: af57b4dcaecb
Revises: 63a1d1c101bd
Create Date: 2024-06-28 17:43:04.226926
"""

from collections.abc import Sequence

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "af57b4dcaecb"
down_revision: str | None = "63a1d1c101bd"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column("usage", sa.Column("last_alive_at", sa.DateTime(timezone=True), nullable=False))
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("usage", "last_alive_at")
# ### end Alembic commands ###
18 changes: 13 additions & 5 deletions app/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@
from enum import StrEnum


class QueueMessageStatus(StrEnum):
"""Queue Message Status."""

COMPLETED = "completed"
FAILED = "failed"


class ServiceType(StrEnum):
"""Service type."""
"""Service Type."""

STORAGE = "storage"
SHORT_JOBS = "short-jobs"
LONG_JOBS = "long-jobs"


class QueueMessageStatus(StrEnum):
"""Queue message status."""
class LongJobStatus(StrEnum):
"""Long Job Status."""

COMPLETED = "completed"
FAILED = "failed"
STARTED = "started"
RUNNING = "running"
FINISHED = "finished"
1 change: 1 addition & 0 deletions app/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class Usage(Base):
created_at: Mapped[CREATED_AT]
updated_at: Mapped[UPDATED_AT]
started_at: Mapped[datetime]
last_alive_at: Mapped[datetime]
finished_at: Mapped[datetime | None]
properties: Mapped[dict[str, Any] | None]

Expand Down
8 changes: 7 additions & 1 deletion app/db/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ async def session(self) -> AsyncIterator[AsyncSession]:
autocommit=False,
autoflush=False,
) as session:
yield session
try:
yield session
except Exception:
await session.rollback()
raise
else:
await session.commit()


database_session_manager = DatabaseSessionManager()
48 changes: 47 additions & 1 deletion app/queue/consumer/long_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,58 @@

from sqlalchemy.ext.asyncio import AsyncSession

from app.constants import LongJobStatus
from app.queue.consumer.base import QueueConsumer
from app.queue.schemas import LongJobUsageEvent
from app.repositories.usage import UsageRepository


async def _handle_started(repo: UsageRepository, event: LongJobUsageEvent) -> None:
await repo.add_usage(
vlab_id=event.vlab_id,
proj_id=event.proj_id,
job_id=event.proj_id,
service_type=event.type,
service_subtype=event.subtype,
units=event.instances or 0,
started_at=event.timestamp,
properties={"instance_type": event.instance_type},
)


async def _handle_running(repo: UsageRepository, event: LongJobUsageEvent) -> None:
await repo.update_last_alive_at(
vlab_id=event.vlab_id,
proj_id=event.proj_id,
job_id=event.proj_id,
last_alive_at=event.timestamp,
)


async def _handle_finished(repo: UsageRepository, event: LongJobUsageEvent) -> None:
await repo.update_finished_at(
vlab_id=event.vlab_id,
proj_id=event.proj_id,
job_id=event.proj_id,
finished_at=event.timestamp,
)


class LongJobsQueueConsumer(QueueConsumer):
"""Long jobs queue consumer."""

async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None: # noqa: ARG002
async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None:
"""Consume the message."""
self.logger.info("Message received: %s", msg)
event = LongJobUsageEvent.model_validate_json(msg["Body"])
repo = UsageRepository(db=db)
match event.status:
case LongJobStatus.STARTED:
await _handle_started(repo, event)
case LongJobStatus.RUNNING:
await _handle_running(repo, event)
case LongJobStatus.FINISHED:
await _handle_finished(repo, event)
case _:
error = f"Status not handled: {event.status}"
raise ValueError(error)
16 changes: 15 additions & 1 deletion app/queue/consumer/short_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,25 @@
from sqlalchemy.ext.asyncio import AsyncSession

from app.queue.consumer.base import QueueConsumer
from app.queue.schemas import ShortJobUsageEvent
from app.repositories.usage import UsageRepository


class ShortJobsQueueConsumer(QueueConsumer):
"""Short jobs queue consumer."""

async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None: # noqa: ARG002
async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None:
"""Consume the message."""
self.logger.info("Message received: %s", msg)
event = ShortJobUsageEvent.model_validate_json(msg["Body"])
repo = UsageRepository(db=db)
await repo.add_usage(
vlab_id=event.vlab_id,
proj_id=event.proj_id,
job_id=event.proj_id,
service_type=event.type,
service_subtype=event.subtype,
units=event.count,
started_at=event.timestamp,
finished_at=event.timestamp,
)
26 changes: 9 additions & 17 deletions app/queue/consumer/storage.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""Short jobs consumer module."""

import json
from datetime import UTC, datetime
from typing import Any
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession

from app.queue.consumer.base import QueueConsumer
from app.queue.schemas import StorageUsageEvent
from app.repositories.usage import UsageRepository


Expand All @@ -17,20 +15,14 @@ class StorageQueueConsumer(QueueConsumer):
async def _consume(self, msg: dict[str, Any], db: AsyncSession) -> None:
"""Consume the message."""
self.logger.info("Message received: %s", msg)
body = json.loads(msg["Body"])
started_at = datetime.fromtimestamp(
float(body.get("timestamp_ms") or msg["Attributes"]["SentTimestamp"]) / 1000,
tz=UTC,
)
event = StorageUsageEvent.model_validate_json(msg["Body"])
repo = UsageRepository(db=db)
await repo.add_usage(
vlab_id=UUID(body["vlab_id"]),
proj_id=UUID(body["proj_id"]),
job_id=None,
service_type=body["type"],
service_subtype=body.get("subtype", ""),
units=int(body["units"]),
started_at=started_at,
finished_at=None,
properties=None,
vlab_id=event.vlab_id,
proj_id=event.proj_id,
job_id=event.proj_id,
service_type=event.type,
service_subtype=event.subtype,
units=event.size,
started_at=event.timestamp,
)
59 changes: 59 additions & 0 deletions app/queue/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Usage Events schemas."""

from datetime import UTC, datetime
from typing import Annotated, Literal
from uuid import UUID

from pydantic import BaseModel, BeforeValidator, Field, validate_call

from app.constants import LongJobStatus, ServiceType

# min and max just ensure that the timestamp contains a reasonable value expressed in milliseconds
MIN_TS = datetime(2024, 1, 1, tzinfo=UTC).timestamp() * 1000
MAX_TS = datetime(2100, 1, 1, tzinfo=UTC).timestamp() * 1000


@validate_call
def _convert_timestamp(value: Annotated[float, Field(gt=MIN_TS, lt=MAX_TS)]) -> datetime:
return datetime.fromtimestamp(value / 1000, tz=UTC)


TimeStamp = Annotated[datetime, BeforeValidator(_convert_timestamp)]


class StorageUsageEvent(BaseModel):
"""StorageUsageEvent."""

type: Literal[ServiceType.STORAGE]
subtype: str | None = None
vlab_id: UUID
proj_id: UUID
job_id: UUID | None
size: int
timestamp: TimeStamp


class ShortJobUsageEvent(BaseModel):
"""ShortJobUsageEvent."""

type: Literal[ServiceType.SHORT_JOBS]
subtype: str
vlab_id: UUID
proj_id: UUID
job_id: UUID | None
count: int
timestamp: TimeStamp


class LongJobUsageEvent(BaseModel):
"""LongJobUsageEvent."""

type: Literal[ServiceType.LONG_JOBS]
subtype: str
vlab_id: UUID
proj_id: UUID
job_id: UUID
status: LongJobStatus
instances: int | None = None
instance_type: str | None = None
timestamp: TimeStamp
66 changes: 57 additions & 9 deletions app/repositories/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from uuid import UUID

import sqlalchemy as sa
from sqlalchemy import and_

from app.constants import ServiceType
from app.db.models import Usage
Expand All @@ -23,31 +24,78 @@ async def add_usage(
proj_id: UUID,
job_id: UUID | None,
service_type: ServiceType,
service_subtype: str | None,
units: int,
started_at: datetime,
finished_at: datetime | None,
properties: dict | None,
service_subtype: str | None = None,
units: int = 0,
started_at: datetime | None = None,
finished_at: datetime | None = None,
properties: dict | None = None,
) -> Usage:
"""Add usage."""
query = (
sa.insert(Usage)
.values(
job_id=job_id,
vlab_id=vlab_id,
proj_id=proj_id,
job_id=job_id,
service_type=service_type,
service_subtype=service_subtype,
units=units,
started_at=started_at,
last_alive_at=started_at,
finished_at=finished_at,
properties=properties,
)
.returning(Usage)
)
result = (await self.db.execute(query)).scalar_one()
await self.db.commit()
return result
return (await self.db.execute(query)).scalar_one()

async def update_last_alive_at(
self,
vlab_id: UUID,
proj_id: UUID,
job_id: UUID,
last_alive_at: datetime,
) -> Usage:
"""Update last_alive_at."""
query = (
sa.update(Usage)
.values(
last_alive_at=last_alive_at,
)
.where(
and_(
Usage.vlab_id == vlab_id,
Usage.proj_id == proj_id,
Usage.job_id == job_id,
)
)
.returning(Usage)
)
return (await self.db.execute(query)).scalar_one()

async def update_finished_at(
self,
vlab_id: UUID,
proj_id: UUID,
job_id: UUID,
finished_at: datetime,
) -> Usage:
"""Update finished_at."""
query = (
sa.update(Usage)
.values(
finished_at=finished_at,
)
.where(
and_(
Usage.vlab_id == vlab_id,
Usage.proj_id == proj_id,
Usage.job_id == job_id,
)
)
.returning(Usage)
)
return (await self.db.execute(query)).scalar_one()

async def get_all_usages_rows(self) -> Sequence[sa.Row]:
"""Get all the usage rows as Row objects."""
Expand Down
4 changes: 2 additions & 2 deletions tests/queue/consumer/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ async def test_consume(sqs_stubber, sqs_client_factory, db):
"type": "storage",
"vlab_id": "00000000-0000-0000-0000-000000000000",
"proj_id": "00000000-0000-0000-0000-000000000001",
"units": "1073741824",
"timestamp_ms": "1719477803993",
"size": "1073741824",
"timestamp": "1719477803993",
}
_prepare_stub(sqs_stubber, queue_url, message_id, receipt_handle, message_body)

Expand Down

0 comments on commit 5aff60f

Please sign in to comment.