Skip to content

Commit

Permalink
Add queue consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
GianlucaFicarelli committed Jun 24, 2024
1 parent a09c34d commit 21cd663
Show file tree
Hide file tree
Showing 26 changed files with 892 additions and 127 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ ENV APP_NAME=${APP_NAME}
ENV APP_VERSION=${APP_VERSION}
ENV COMMIT_SHA=${COMMIT_SHA}

STOPSIGNAL SIGINT
CMD ["./docker-cmd.sh"]
16 changes: 9 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
SHELL := /bin/bash

# export variables to both build and run recipes for automatic rebuilding when pdm.lock is modified
build run: export ENVIRONMENT ?= dev
build run: export APP_NAME := accounting-service
build run: export APP_VERSION := $(shell git describe --abbrev --dirty --always --tags)
build run: export COMMIT_SHA := $(shell git rev-parse HEAD)

help: ## Show this help
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-23s\033[0m %s\n", $$1, $$2}'

Expand All @@ -24,10 +30,6 @@ lint: ## Run linters
pdm run ruff check
pdm run mypy app

build: export ENVIRONMENT ?= dev
build: export APP_NAME := accounting-service
build: export APP_VERSION := $(shell git describe --abbrev --dirty --always --tags)
build: export COMMIT_SHA := $(shell git rev-parse HEAD)
build: ## Build the docker images
docker compose --progress=plain build

Expand All @@ -39,15 +41,15 @@ kill: ## Take down the application

.PHONY: tests
tests: build ## Run tests in the app container
docker compose run app sh -c "\
docker compose run --rm app sh -c "\
alembic downgrade base && alembic upgrade head && \
python -m pytest -vv --cov=app tests && \
python -m coverage xml && \
python -m coverage html"

migration: build ## Create the alembic migration
docker compose run app sh -c "\
docker compose run --rm app sh -c "\
alembic upgrade head && alembic revision --autogenerate"

sh: build ## Run a shell in the app container
docker compose run --rm --service-ports app bash
docker compose run --rm app bash
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ The format of the tag should be `YYYY.MM.DD`, where:

## Documentation

The API documentation is available locally at <https://127.0.0.1:8100/docs> after the local deployment.
The API documentation is available locally at <http://127.0.0.1:8100/docs> after the local deployment.
61 changes: 61 additions & 0 deletions app/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Main entry point."""

import asyncio
import contextlib

import uvicorn
import uvloop

from app import logger # noqa # ensure that logging is configured
from app.config import settings
from app.db.session import database_session_manager
from app.queue.consumer.long_jobs import LongJobsQueueConsumer
from app.queue.consumer.short_jobs import ShortJobsQueueConsumer
from app.queue.consumer.storage import StorageQueueConsumer


async def main():
    """Initialize shared resources, then run the API server and queue consumers together."""
    # The DB engine must be ready before any consumer or request handler uses a session.
    database_session_manager.initialize(
        url=settings.DB_URI,
        pool_size=settings.DB_POOL_SIZE,
        pool_pre_ping=settings.DB_POOL_PRE_PING,
        max_overflow=settings.DB_MAX_OVERFLOW,
    )
    api_server = uvicorn.Server(
        uvicorn.Config(
            "app.application:app",
            host="0.0.0.0",
            port=settings.UVICORN_PORT,
            proxy_headers=True,
            log_config=settings.LOGGING_CONFIG,
        )
    )
    # (consumer class, SQS_QUEUES key, initial delay in seconds) — delays are staggered.
    consumer_specs = [
        (StorageQueueConsumer, "storage", 1),
        (ShortJobsQueueConsumer, "short-jobs", 2),
        (LongJobsQueueConsumer, "long-jobs", 3),
    ]
    consumers = [
        consumer_cls(
            endpoint_url=settings.SQS_ENDPOINT_URL,
            queue_name=settings.SQS_QUEUES[queue_key],
            initial_delay=delay,
        )
        for consumer_cls, queue_key, delay in consumer_specs
    ]
    try:
        # TaskGroup cancels the sibling tasks as soon as any one of them fails.
        async with asyncio.TaskGroup() as tg:
            for consumer in consumers:
                tg.create_task(consumer.run_forever())
            tg.create_task(api_server.serve())
    finally:
        # Always dispose of the DB engine, even when the task group is cancelled.
        await database_session_manager.close()


if __name__ == "__main__":
with contextlib.suppress(KeyboardInterrupt):
uvloop.run(main())
22 changes: 9 additions & 13 deletions app/application.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""API entry points."""

import asyncio
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
Expand All @@ -11,34 +12,29 @@
from starlette.responses import JSONResponse

from app.config import settings
from app.db.session import database_session_factory
from app.endpoints import router
from app.errors import ApiError
from app.logger import L
from app.logger import get_logger

L = get_logger(__name__)


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[dict[str, Any]]:
"""Execute actions on server startup and shutdown."""
L.info(
"Starting application [ENVIRONMENT=%s, pid=%s, cpu_count=%s]",
settings.ENVIRONMENT,
"Starting application [PID=%s, CPU_COUNT=%s, ENVIRONMENT=%s]",
os.getpid(),
os.cpu_count(),
)
await database_session_factory.initialize(
url=settings.DB_URI,
echo=settings.DB_ECHO,
echo_pool=settings.DB_ECHO_POOL,
pool_size=settings.DB_POOL_SIZE,
pool_pre_ping=settings.DB_POOL_PRE_PING,
max_overflow=settings.DB_MAX_OVERFLOW,
settings.ENVIRONMENT,
)
try:
yield {}
except asyncio.CancelledError as err:
# this can happen if the task is cancelled without sending SIGINT
L.info("Ignored %r in lifespan", err)
finally:
L.info("Stopping application")
await database_session_factory.close()


async def api_error_handler(_: Request, exc: ApiError) -> JSONResponse:
Expand Down
15 changes: 11 additions & 4 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@ class Settings(BaseSettings):
APP_DEBUG: bool = False
COMMIT_SHA: str | None = None

UVICORN_PORT: int = 8000

CORS_ORIGINS: list[str] = ["*"]
LOGGING_CONFIG: str = "app/data/logging.yaml"
LOGGING_LEVEL: str | None = None

SQS_ENDPOINT_URL: str = "http://queue:9324"
SQS_QUEUES: dict[str, str] = {
"storage": "storage.fifo",
"short-jobs": "short-jobs.fifo",
"long-jobs": "long-jobs.fifo",
}
SQS_CLIENT_ERROR_SLEEP: float = 10

DB_ENGINE: str = "postgresql+asyncpg"
DB_USER: str = "accounting_service"
Expand All @@ -30,9 +39,7 @@ class Settings(BaseSettings):
DB_PORT: int = 5432
DB_NAME: str = "accounting_service"

DB_ECHO: bool = False
DB_ECHO_POOL: bool = False
DB_POOL_SIZE: int = 5
DB_POOL_SIZE: int = 30
DB_POOL_PRE_PING: bool = True
DB_MAX_OVERFLOW: int = 0

Expand Down
12 changes: 12 additions & 0 deletions app/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from enum import StrEnum


class ServiceType(StrEnum):
    """Service categories being accounted.

    The values mirror the keys of ``settings.SQS_QUEUES`` and are stored in
    ``Usage.service_type``.
    """

    STORAGE = "storage"
    SHORT_JOBS = "short-jobs"
    LONG_JOBS = "long-jobs"


class QueueMessageStatus(StrEnum):
    """Processing outcome recorded for a consumed queue message.

    Stored in ``QueueMessage.status`` in the ``queue_message`` table.
    """

    COMPLETED = "completed"
    FAILED = "failed"
10 changes: 7 additions & 3 deletions app/data/logging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ formatters:
handlers:
console:
class: logging.StreamHandler
level: INFO
level: DEBUG
formatter: simple
stream: ext://sys.stderr
loggers:
app:
level: DEBUG
uvicorn:
level: INFO
handlers: [console]
propagate: no
sqlalchemy.engine:
level: INFO
sqlalchemy.pool:
level: INFO
root:
level: INFO
handlers: [console]
67 changes: 62 additions & 5 deletions app/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,91 @@
import uuid
from datetime import datetime
from decimal import Decimal
from typing import Annotated, Any

from sqlalchemy import String, func
from sqlalchemy import BigInteger, DateTime, Identity, SmallInteger, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from app.constants import QueueMessageStatus, ServiceType

CREATED_AT = Annotated[
datetime,
mapped_column(DateTime(timezone=True), server_default=func.now(), index=True),
]
UPDATED_AT = Annotated[
datetime,
mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now()),
]


class Base(DeclarativeBase):
"""Base class."""

type_annotation_map = {
datetime: DateTime(timezone=True),
dict[str, Any]: JSONB,
}


class QueueMessage(Base):
"""Queue messages table."""

__tablename__ = "queue_message"

message_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
queue_name: Mapped[str]
status: Mapped[QueueMessageStatus]
attributes: Mapped[dict[str, Any]]
body: Mapped[str | None]
error: Mapped[str | None]
counter: Mapped[int] = mapped_column(SmallInteger)
created_at: Mapped[CREATED_AT]
updated_at: Mapped[UPDATED_AT]


class Usage(Base):
"""Usage table."""

__tablename__ = "usage"

id: Mapped[int] = mapped_column(BigInteger, Identity(), primary_key=True)
vlab_id: Mapped[uuid.UUID] = mapped_column(index=True)
proj_id: Mapped[uuid.UUID] = mapped_column(index=True)
job_id: Mapped[uuid.UUID | None] = mapped_column(index=True)
service_type: Mapped[ServiceType]
service_subtype: Mapped[str | None]
units: Mapped[int] = mapped_column(BigInteger)
created_at: Mapped[CREATED_AT]
updated_at: Mapped[UPDATED_AT]
started_at: Mapped[datetime]
finished_at: Mapped[datetime | None]
properties: Mapped[dict[str, Any] | None]


class Transactions(Base):
"""Transactions table."""

# TODO: wip

__tablename__ = "transactions"

id: Mapped[int] = mapped_column(primary_key=True)
vlab_id: Mapped[uuid.UUID] = mapped_column(index=True)
proj_id: Mapped[uuid.UUID] = mapped_column(index=True)
amount: Mapped[Decimal] = mapped_column()
amount: Mapped[Decimal]


class VlabTopup(Base):
"""VlabTopup table."""

# TODO: wip

__tablename__ = "vlab_topup"

id: Mapped[int] = mapped_column(primary_key=True)
vlab_id: Mapped[uuid.UUID] = mapped_column(index=True)
proj_id: Mapped[uuid.UUID] = mapped_column(index=True)
amount: Mapped[Decimal] = mapped_column()
created_at: Mapped[datetime] = mapped_column(default=func.now())
stripe_event_id: Mapped[str] = mapped_column(String(30))
amount: Mapped[Decimal]
created_at: Mapped[CREATED_AT]
stripe_event_id: Mapped[str]
30 changes: 14 additions & 16 deletions app/db/session.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
"""Database session utils."""

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine

from app.logger import L
from app.logger import get_logger

L = get_logger(__name__)

class DatabaseSessionFactory:
"""DatabaseSessionFactory."""

class DatabaseSessionManager:
"""DatabaseSessionManager."""

def __init__(self) -> None:
"""Init the factory."""
"""Init the manager."""
self._engine: AsyncEngine | None = None

async def initialize(self, url: str, **kwargs) -> None:
def initialize(self, url: str, **kwargs) -> None:
"""Initialize the database engine."""
if self._engine:
err = "DB engine already initialized"
Expand All @@ -31,24 +34,19 @@ async def close(self) -> None:
self._engine = None
L.info("DB engine has been closed")

async def __call__(self) -> AsyncIterator[AsyncSession]:
"""Return a new database session."""
@asynccontextmanager
async def session(self) -> AsyncIterator[AsyncSession]:
"""Yield a new database session."""
if not self._engine:
err = "DB engine not initialized"
raise RuntimeError(err)
session = AsyncSession(
async with AsyncSession(
self._engine,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
try:
) as session:
yield session
except Exception:
await session.rollback()
raise
finally:
await session.close()


database_session_factory = DatabaseSessionFactory()
database_session_manager = DatabaseSessionManager()
Loading

0 comments on commit 21cd663

Please sign in to comment.