Skip to content

Commit

Permalink
fix(api): Optimized logger performance - found missing socket reads.
Browse files Browse the repository at this point in the history
Optimized performance with ``QueueHandler``.
Found missing logs in unix domain socket logging socket reads.
  • Loading branch information
acederberg committed Dec 6, 2024
1 parent d1f1393 commit f7351c4
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 37 deletions.
22 changes: 12 additions & 10 deletions acederbergio/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import asyncio
import contextlib
import json
import logging.config
import os
from typing import Any

Expand Down Expand Up @@ -78,20 +79,19 @@ def __init__(self, context: Context, context_quarto: quarto.Context | None = Non
async def watch_logs(self):
"""This should injest the logs from the logger using a unix socket."""

# state = dict(count=0)

# NOTE: Remove the unix domain socket before startup.
async def handle_data(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
):
logger.log(0, "Data recieved by logging socket.")
data = await reader.read(2**16)
if (data_decoded := decode_jsonl(data)) is None:
return

writer.close()
await asyncio.gather(
writer.wait_closed(),
schemas.Log.push(db, mongo_id, data_decoded),
)
# NOTE: Tried reading all data in socket. Lead to failure to read
# all socket data (race condition?)
# It would appear that this does not exit until the server stops,
# and does not run every time data is pushed to the socket.
async for data in reader:
await schemas.Log.push(db, mongo_id, [json.loads(data)])

socket_path = (env.ROOT / "blog.socket").resolve()
if os.path.exists(socket_path):
Expand Down Expand Up @@ -217,7 +217,9 @@ def cmd_server(_context: typer.Context):


# NOTE: Add rich formatting to uvicorn logs.
uvicorn.config.LOGGING_CONFIG.update(env.create_uvicorn_logging_config())
LOGGING_CONFIG = env.create_uvicorn_logging_config()
uvicorn.config.LOGGING_CONFIG.update(LOGGING_CONFIG)
logging.config.dictConfig(LOGGING_CONFIG)

if __name__ == "__main__":
cli()
42 changes: 19 additions & 23 deletions acederbergio/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,24 +118,6 @@ def validator(v: Any) -> Any:
ENV_IS_DEV = ENV == "development"


socket_handler = None
if ENV_IS_DEV:
socket_handler = util.SocketHandler(str(ROOT / "blog.socket"), None)
socket_handler.setFormatter(util.JSONFormatter())


def create_logger(name: str):

level = require("log_level", "INFO").upper()
logger = logging.getLogger(name)

if ENV_IS_DEV and socket_handler is not None:
logger.addHandler(socket_handler)
logger.setLevel(level)

return logger


def create_uvicorn_logging_config() -> dict[str, Any]:
"""Create the uvicorn logging config.
Expand All @@ -144,19 +126,23 @@ def create_uvicorn_logging_config() -> dict[str, Any]:
maintain two configs.
"""
handlers: dict[str, Any] = {
"default": {
"_rich": {
"class": "rich.logging.RichHandler",
"level": "INFO",
}
},
"queue": {
"class": "acederbergio.util.QueueHandler",
"handlers": ["cfg://handlers._rich"],
},
}

formatters: dict[str, Any] = {}
if ENV_IS_DEV:
formatters.update({"json": {"class": "acederbergio.util.JSONFormatter"}})

handlers["queue"]["handlers"].append("cfg://handlers._socket")
handlers.update(
{
"socket": {
"_socket": {
"class": "acederbergio.util.SocketHandler",
"level": "INFO",
"host": str(ROOT / "blog.socket"),
Expand All @@ -167,18 +153,28 @@ def create_uvicorn_logging_config() -> dict[str, Any]:
)

out = {
"version": 1,
"formatters": formatters,
"handlers": handlers,
"loggers": {
"root": {
"level": "INFO",
"handlers": ["default", "socket"] if ENV_IS_DEV else ["default"],
"handlers": ["queue"] if ENV_IS_DEV else ["rich"],
}
},
}
return out


def create_logger(name: str):

level = require("log_level", "INFO").upper()
logger = logging.getLogger(name)
logger.setLevel(level)

return logger


cli = typer.Typer(help="Environment variables tools.")


Expand Down
26 changes: 22 additions & 4 deletions acederbergio/util.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import logging
import logging.handlers
import queue
from datetime import datetime
from typing import Annotated, Any, Mapping
from typing import Annotated, Any, Iterable, Mapping

import pydantic
import rich
Expand Down Expand Up @@ -141,16 +142,33 @@ def format(self, record: logging.LogRecord) -> str:
return json.dumps(line, default=str) + "\n"


class SocketHandler(logging.handlers.SocketHandler):
LOG_QUEUE = queue.Queue()


class QueueHandler(logging.handlers.QueueHandler):
"""Used to queue messages that are then handled by ``SocketHandler``."""

listener: logging.handlers.QueueListener

# def __init__(self, host=str(ROOT / "blog.socket"), port=None) -> None:
# super().__init__(host, port)
def __init__(self, handlers: list[logging.Handler]) -> None:
super().__init__(LOG_QUEUE)
# NOTE: This next instruction looks stupid, but is really magic. See
# https://rob-blackbourn.medium.com/how-to-use-python-logging-queuehandler-with-dictconfig-1e8b1284e27a
_handlers = (handlers[index] for index in range(len(handlers)))
self.listener = logging.handlers.QueueListener(LOG_QUEUE, *_handlers)
self.listener.start()


class SocketHandler(logging.handlers.SocketHandler):

def emit(self, record: logging.LogRecord):
"""Emit a record without pickling.
Ideally, the formatter is ``JSONFormatter`` from above.
"""

print("pushing")
# NOTE: Might be hitting a race condition. Sometim
try:
self.send(self.format(record).encode())
except Exception:
Expand Down

0 comments on commit f7351c4

Please sign in to comment.