feature(scripts): Putting Live Logs in Browser.
Server now uses ``fastapi``.
Reading logs in the terminal is a pain.
Logs are now pushed to a unix socket, written into ``mongodb``, and can
then be read through the ``fastapi`` app.
Moved terminal theme to its own ``scss``.
Added volume to overwrite ``./docker/.data`` in dev server container.
Added ``fastapi`` and ``uvicorn`` as dependencies.
acederberg committed Dec 2, 2024
1 parent 70bc367 commit 484fe26
Showing 18 changed files with 754 additions and 81 deletions.
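The commit message only briefly describes the emitting half of the pipeline: application loggers push ``JSON`` lines over a unix socket, which ``watch_logs`` in ``acederbergio/dev.py`` then ingests into ``mongodb``. As a rough illustration of that emitting side (not part of this commit; the handler class and record fields below are assumptions), a handler could look roughly like this:

import json
import logging
import socket


class JSONLSocketHandler(logging.Handler):
    """Serialize records as ``JSON`` lines and push them to the dev server's unix socket."""

    def __init__(self, socket_path: str):
        super().__init__()
        self.socket_path = socket_path

    def emit(self, record: logging.LogRecord) -> None:
        # One JSON document per newline-terminated line, matching ``decode_jsonl``.
        line = json.dumps(
            {
                "name": record.name,
                "levelname": record.levelname,
                "msg": record.getMessage(),
                "created": record.created,
            }
        )
        try:
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect(self.socket_path)
                sock.sendall(line.encode() + b"\n")
        except OSError:
            # Drop the record silently if the dev server is not listening.
            pass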
12 changes: 8 additions & 4 deletions acederbergio/db.py
@@ -3,13 +3,14 @@
"""

import json
from typing import Annotated, Any
from typing import Annotated, Any, Type

import bson
import pydantic
import rich
import typer
import yaml_settings_pydantic as ysp
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import MongoClient
from pymongo.server_api import ServerApi

@@ -45,9 +46,9 @@ def check_object_id(value) -> str | None:
FlagDatabase = Annotated[str, pydantic.Field(DATABASE)]


def create_client(*, _mongodb_url: str | None = None):
def create_client(*, _mongodb_url: str | None = None, cls: Type = MongoClient):
mongodb_url = env.require("mongodb_url", _mongodb_url)
return MongoClient(mongodb_url, server_api=ServerApi("1"))
return cls(mongodb_url, server_api=ServerApi("1"))


class Config(ysp.BaseYamlSettings):
@@ -60,9 +61,12 @@ class Config(ysp.BaseYamlSettings):
database: FlagDatabase
url: FlagURL

def create_client(self):
def create_client(self) -> MongoClient:
return create_client(_mongodb_url=self.url)

def create_client_async(self) -> AsyncIOMotorClient:
return create_client(_mongodb_url=self.url, cls=AsyncIOMotorClient)

# NOTE: While CLI flags can be used with ``cli_parse_args``, it does not look
# too good with ``typer``. It would be a great deal of work for something
# that is only somewhat useful.
249 changes: 215 additions & 34 deletions acederbergio/dev.py
@@ -1,7 +1,5 @@
"""Scripts for development.
This includes a custom watcher because I do not like the workflow I am forced
into by ``quarto preview``. A few problems I aim to solve here are:
@@ -10,30 +8,33 @@
3. Putting debug messages.
"""

import asyncio
import bson
import contextlib
import http.server as http_server
import json
import os
import pathlib
import re
import shutil
import socket
import subprocess
import threading
import time
from datetime import datetime
from typing import Annotated, Iterable
from typing import Annotated, Any, Iterable

import fastapi
import fastapi.staticfiles
import pydantic
import rich
import rich.table
import typer
import uvicorn
import uvicorn.config
import yaml_settings_pydantic as ysp
from typing_extensions import Doc, Self
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

from acederbergio import env, util
from acederbergio import db, env, util

logger = env.create_logger(__name__)

@@ -189,6 +190,7 @@ class ConfigWatch(pydantic.BaseModel):
env.BLOG / "site_libs",
env.ROOT / ".git",
env.ROOT / ".venv",
env.ROOT / "docker",
env.BLOG / "resume/test.tex",
env.BLOG / "resume/index.tex",
}
@@ -532,42 +534,204 @@ def on_modified(self, event: FileSystemEvent) -> None:
# shutil.copy(path, dest)


class DualStackServer(http_server.ThreadingHTTPServer):
# =========================================================================== #
# App Definition and Background Tasks.


def decode_jsonl(data_raw: bytes) -> list[dict[str, Any]] | None:
"""Parse JSON lines data into a list of datas.
Data should be written to socket with newline endings.
The socket should contain ``JSON`` lines formatted data.
"""

if data_raw is None:
return None

def server_bind(self):
with contextlib.suppress(Exception):
self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
return super().server_bind()
try:
data = [json.loads(item) for item in data_raw.split(b"\n") if item]
except json.JSONDecodeError as err:
print("`log_reciever` failed to decode invalid ``JSON`` content.")
print(data_raw)
print(err)
return None

return data


# NOTE: This will allow records to be dynamically handled. Using a database
# handler directly would require a factory for logging, which is not a
# good fit with current patterns.
async def watch_logs(config: db.Config):
"""This should injest the logs from the logger using a unix socket"""

# NOTE: Remove the unix domain socket before startup.
async def handle_data(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
logger.log(0, "Data received by logging socket.")
data = await reader.read(1024)
if (data_decoded := decode_jsonl(data)) is None:
return

def finish_request(self, request, client_address):
self.RequestHandlerClass(
request,
client_address,
self,
directory=str(env.BUILD), # type: ignore
writer.close()
await asyncio.gather(
writer.wait_closed(),
collection.update_one(
{"_id": object_id},
{"$push": {"items": {"$each": data_decoded}}},
),
)

socket_path = (env.ROOT / "blog.socket").resolve()
if os.path.exists(socket_path):
os.remove(socket_path)

logger.debug("Initializing logging mongodb document.")
client = config.create_client_async()
collection = client[config.database]["logs"]
res = await collection.insert_one(
{
"timestamp": datetime.timestamp(datetime.now()),
"items": [],
}
)
object_id = res.inserted_id

logger.info("Starting logging socket server.")
server = await asyncio.start_unix_server(handle_data, path=str(socket_path))
async with server:
logger.info("Server listening at `%s`...", socket_path)
await server.serve_forever()


def watch(context: Context):
async def watch_quarto(context: Context):
event_handler = BlogHandler(context)
observer = Observer()
observer.schedule(event_handler, env.ROOT, recursive=True) # type: ignore
observer.start()
try:
while True:
time.sleep(100)
# print("Watching.")
await asyncio.sleep(1)
finally:
observer.stop()
observer.join()


def serve():
http_server.test( # type: ignore
HandlerClass=http_server.SimpleHTTPRequestHandler,
ServerClass=DualStackServer,
port=3000,
bind="0.0.0.0",
)
def create_app(
*,
context: Context | None = None,
config: db.Config | None = None,
) -> fastapi.FastAPI:

context = context or Context(config=Config()) # type: ignore
config = config or db.Config() # type: ignore
client = config.create_client()

@contextlib.asynccontextmanager
async def lifespan(_: fastapi.FastAPI):

task_handle_log = asyncio.create_task(watch_logs(config))
task_watch = asyncio.create_task(watch_quarto(context))

yield

task_handle_log.cancel("stop")
task_watch.cancel("it")

collection = client[config.database]["logs"]
app = fastapi.FastAPI(lifespan=lifespan)

@app.get("/dev/log")
def get_log(
slice_start: int | None = None,
slice_count: int | None = None,
) -> dict[str, Any]:
"""Look for the most recent log data."""

steps = [
{"$sort": {"timestamp": -1}},
{"$limit": 1},
{"$addFields": {"count": {"$size": "$items"}}},
]

# NOTE: Add slice counting to steps.
if slice_count is not None:
if slice_start is None:
slice = {"$slice": ["$items", slice_count]}
else:
slice = {"$slice": ["$items", slice_start, slice_count]}

projection = {
"_id": "$_id",
"count": "$count",
"timestamp": "$timestamp",
"items": slice,
}
steps.append({"$project": projection})
elif slice_start is not None:
raise fastapi.HTTPException(
422,
detail={
"msg": "`slice_start` may only be specified when `slice_count` is specified."
},
)

res = collection.aggregate(steps)
data = next(res, None)
if data is None:
raise fastapi.HTTPException(204, detail={"msg": "No log data found."})

return {"count": data["count"], "items": data["items"], "_id": str(data["_id"])}

@app.websocket("/dev/log")
async def watch_log(websocket: fastapi.WebSocket):

    await websocket.accept()

    log = get_log()
    await websocket.send_json(log)

    # ``get_log`` stringifies the id, so convert it back for queries.
    _id = bson.ObjectId(log["_id"])
    count = len(log["items"])

    while True:
        # Poll for records appended after those already sent.
        await asyncio.sleep(1)
        res = collection.find_one(
            {"_id": _id},
            {"items": {"$slice": [count, 1000]}},
        )
        if res is None or not (new := res["items"]):
            continue

        await websocket.send_json(new)
        count += len(new)

# NOTE: It would appear all other routes must be attached prior to this mount.
app.mount("", fastapi.staticfiles.StaticFiles(directory=env.BUILD))

return app


def serve(context: Context, config: db.Config, **kwargs):
"""
This tends to produce an error in the logs when
`WATCHFILES_IGNORE_PERMISSION_DENIED=0` is not set.
"""

# kwargs["reload_dirs"] = [env.SCRIPTS]
# kwargs["reload_excludes"] = [env.ROOT / "docker"]
if not kwargs.get("host"):
kwargs["host"] = "0.0.0.0"
if not kwargs.get("port"):
kwargs["port"] = 3000

if kwargs.get("reload"):
kwargs["factory"] = True
logger.warning("Ignoring context from command line.")
uvicorn.run(f"{__name__}:create_app", **kwargs)
else:
kwargs["factory"] = False
app = create_app(context=context, config=config)
uvicorn.run(app, **kwargs)
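
What follows is a hypothetical usage sketch, not part of this commit: it fetches the latest log document from the ``/dev/log`` endpoint defined in ``create_app``, assuming the development server is listening on ``localhost:3000`` (the default ``host``/``port`` above).

import json
import urllib.parse
import urllib.request


def fetch_latest_log(slice_start: int | None = None, slice_count: int | None = None) -> dict:
    """Query the dev server's ``/dev/log`` endpoint for the newest log document."""

    # ``slice_start`` is only meaningful together with ``slice_count``.
    params = {
        key: value
        for key, value in (("slice_start", slice_start), ("slice_count", slice_count))
        if value is not None
    }
    url = "http://localhost:3000/dev/log"
    if params:
        url += "?" + urllib.parse.urlencode(params)

    with urllib.request.urlopen(url) as response:
        return json.load(response)


# For example, fetch only the first 50 items of the newest log document:
# fetch_latest_log(slice_count=50)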


# =========================================================================== #
@@ -637,21 +801,32 @@ def callback(
cli.add_typer(cli_context, name="context")


@cli.command("render")
def cmd_watch(_context: typer.Context):
"""Watch for changes and trigger rerenders."""
context: Context = _context.obj

watch(context)
# @cli.command("render")
# def cmd_watch(_context: typer.Context):
# """Watch for changes and trigger rerenders."""
# context: Context = _context.obj
#
# watch(context)
#
#
# @cli.command("fastapi")
# def cmd_fastapi():
#
# config = db.Config() # type: ignore
# serve(config, reload=True)


@cli.command("server")
def cmd_server(_context: typer.Context):
"""Run the development server and watch for changes."""
context: Context = _context.obj
config = db.Config() # type: ignore

# threading.Thread(target=watch, args=(context,), daemon=True).start()
# threading.Thread(target=log_reciever, args=(config,), daemon=True).start()
# serve(config, reload=True)

threading.Thread(target=serve).start()
threading.Thread(target=lambda: watch(context)).start()
serve(context, config, reload=True)


@cli_context.command("show")
@@ -714,5 +889,11 @@ def add_paths(paths: Iterable[pathlib.Path], depth=0, rows=0) -> int:
rich.print(t)


# TODO: Add rich formatting to uvicorn logs.
# uvicorn.config.LOGGING_CONFIG.update({
#     "handlers": {
#     }
# })
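
One possible way to address the TODO above (an assumption, not something this commit does) is to register ``rich.logging.RichHandler`` in uvicorn's default logging config before calling ``uvicorn.run``:

import uvicorn.config

# Add a rich handler and route uvicorn's default loggers through it.
uvicorn.config.LOGGING_CONFIG["handlers"]["rich"] = {
    "class": "rich.logging.RichHandler",
    "markup": True,
}
for name in ("uvicorn", "uvicorn.error", "uvicorn.access"):
    uvicorn.config.LOGGING_CONFIG["loggers"][name]["handlers"] = ["rich"]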

if __name__ == "__main__":
cli()