Skip to content

Commit

Permalink
feature(scripts): Expanded Upon Plan for Dev Mode.
Browse files Browse the repository at this point in the history
Addressed new mypy, ruff, and pytest errors.
  • Loading branch information
acederberg committed Jan 30, 2025
1 parent 5a901aa commit 67d72b4
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 51 deletions.
49 changes: 33 additions & 16 deletions acederbergio/api/quarto.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@
import pathlib
import subprocess
import time
from typing import (Annotated, Any, AsyncGenerator, Awaitable, Callable,
ClassVar, Iterable, Iterator, Optional)
from typing import (
Annotated,
Any,
AsyncGenerator,
ClassVar,
Iterable,
Iterator,
Optional,
)

import bson
import motor
Expand Down Expand Up @@ -565,14 +572,13 @@ async def render_qmd(
*(config := self.config).flags,
]
if not self.config.render:
data = schemas.QuartoRenderJob(
job = schemas.QuartoRenderJob(
origin=str(origin), # type: ignore
target=str(path),
command=command,
item_from=self._from,
kind="direct" if path == origin else "defered",
)
return schemas.QuartoHandlerResult(data=data)
return schemas.QuartoHandlerResult(data=job)
else:
process = await asyncio.create_subprocess_shell(
" ".join(command),
Expand Down Expand Up @@ -677,14 +683,13 @@ async def do_static(self, path: pathlib.Path) -> schemas.QuartoHandlerResult | N
command = ["cp", str(path), str(path_dest)]

if not self.config.render:
data = schemas.QuartoRenderJob( # type: ignore
command=command,
job = schemas.QuartoRenderJob( # type: ignore
item_from=self._from,
kind="static",
origin=str(path),
target=str(path),
)
return schemas.QuartoHandlerResult(data=data)
return schemas.QuartoHandlerResult(data=job)
else:
logger.info("Copying `%s` to `%s`.", path, path_dest)
process = await asyncio.create_subprocess_shell(
Expand Down Expand Up @@ -794,11 +799,20 @@ async def render(
def resolve(data: schemas.QuartoHandlerAny | None) -> schemas.QuartoHandlerAny:
return data if data is not None else schemas.QuartoHandlerRequest(data=item)

def do_break(data: schemas.QuartoHandlerAny):
if data.kind == "request":
return False

if data.data.status_code and render_data.exit_on_failure: # type: ignore
return True

return False

# TODO: Could be dryer.
data: schemas.QuartoHandlerAny | None
data: schemas.QuartoHandlerAny
for item in render_data.items:
if item.kind == "file":
yield resolve(await self(item.path))
yield (data := resolve(await self(item.path)))
# if data is None:
# yield
# ignored.append(item)
Expand All @@ -808,8 +822,8 @@ def resolve(data: schemas.QuartoHandlerAny | None) -> schemas.QuartoHandlerAny:
# if callback:
# await callback(data, item)

# if data.status_code and render_data.exit_on_failure:
# break
if do_break(data):
break
else:
# NOTE: When render request items are emitted, then an item
# failed to render.
Expand All @@ -818,17 +832,17 @@ def resolve(data: schemas.QuartoHandlerAny | None) -> schemas.QuartoHandlerAny:
depth_max=item.directory_depth_max,
)
async for data in iter_directory:
yield resolve(data)
yield (data := resolve(data))

# if isinstance(data, schemas.QuartoRenderRequestItem):
# ignored.append(data)
# continue
# items.append(data)
# if callback:
# await callback(data, item)
#
# if data.status_code and render_data.exit_on_failure:
# break

if do_break(data):
break

# NOTE: Directory items
# return schemas.QuartoRenderResponse[schemas.QuartoRender](
Expand Down Expand Up @@ -1082,6 +1096,9 @@ async def callback(
item: schemas.QuartoHandlerResult,
):
data = item.data
if data.kind_handler_result == "request":
return

if not data.status_code and not include_success:
rich.print(f"[green]Successfully rendered `{data.target}`!")
return
Expand Down
18 changes: 15 additions & 3 deletions acederbergio/api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,27 @@ def created_time(self) -> datetime.datetime:


class QuartoRenderJob(util.HasTime):
"""Schema for scheduled jobs.
This should provide `scheduled_time` later on.
"""

kind_handler_result: ClassVar[KindHandlerResult] = "job"

command: list[str]
item_from: QuartoRenderFrom
kind: QuartoRenderKind
origin: str
target: str


# class QuartoRenderExec(QuartoRenderJob):
# """Schema for a job currently being executed."""
#
# kind_handler_result: ClassVar[KindHandlerResult] = "exec"
#


# TODO: Should have scheduled time, exec time, and completed time later.
class QuartoRenderMinimal(util.HasTime):
"""This should not be used internally, only to serve partial results."""

Expand Down Expand Up @@ -566,8 +578,8 @@ async def fromHandlerResults(
callback: Callable[["QuartoHandlerResult"], Awaitable[None]] | None = None,
) -> Self:

items, ignored = [], []
raw = dict(items=items, ignored=ignored)
items, ignored = [], [] # type: ignore[var-annotated]
raw = dict(items=items, ignored=ignored) # type: ignore[var-annotated]

async for item in stream:
if item.kind == "request":
Expand Down
1 change: 0 additions & 1 deletion acederbergio/filters/resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def hydrate_tex(self, element: pf.Element) -> pf.Element:

head_elements = self.create_header_tex()
element.content = (*head_elements, *element.content)
logger.critical("%s", element.to_json())
return element


Expand Down
193 changes: 185 additions & 8 deletions blog/dev/plan.qmd
Original file line number Diff line number Diff line change
@@ -1,24 +1,201 @@
---
title: Long Term Plan for Dev Mode
format:
html:
mermaid:
theme: dark
toc: false
tbl-colwidths: [25, 15, 60]
---

Ideally renders should be scheduled in a queue.
## The Big Picture

A few of the goals I have here are

1. `POST /render` should only schedule the job, clients will see the completed
job via the websocket.
2. `WEBSOCKET /` will inform the user when the job is **scheduled, occurring,
or completed** using `QuartoRenderJob`, `QuartoRenderExec`, and
`QuartoRender` respectively.
3. This will be executed using a queue and a stack all stored within `MongoDB`.
These should accomplish the following:
- The first queue will contain scheduled jobs (`QuartoRenderJob`).
- A pointer will hold on to the scheduled job while it is transformed into
`QuartoRenderExec`, where the command will be computed and executed.
This will be done by transforming the head of the queue in place in `MongoDB`
after the command has been determined and run (to avoid injections if this
is ever deployed in any capacity).
- The websocket will know when the front of the queue has changed so it will
send the client a notification that the next render has started.
- The stack will have every latest render put on the top once the job is
dequeued. It will have the status code, `STDOUT`, and `STDERR` output
attached at this point.
- The websocket will know that the top of the stack has changed and thus
send out the new items on the stack to the user.

```{mermaid}
sequenceDiagram
participant Client
actor Client
participant WEBSOCKET
participant POST
participant Handler
participant MongoDB
Client->>POST: Request Render
POST->>MongoDB: Enqueue Job
WEBSOCKET-->>MongoDB: Websocket Reads Scheduled Job
WEBSOCKET->>Client: Client Sees Scheduled Job
Client->>+POST: Request Render
POST->>-MongoDB: Enqueue Job
POST->>Client: Client Sees Scheduled Job
MongoDB-->>+WEBSOCKET: Websocket Reads Scheduled Job
WEBSOCKET-->>-Client: Client Sees Scheduled Job
MongoDB->>Handler: Render Job is Dequeued
Handler->>MongoDB: Render Job Marked as Started
MongoDB-->>WEBSOCKET: WebSocket Reads Started Job
WEBSOCKET-->>Client: Client Sees that Job Started
Handler->>Handler: Render Job is Processed
Handler->>MongoDB: Completed Job Recorded in MongoDB
WEBSOCKET->>MongoDB: Websocket Sees Completed Job
WEBSOCKET->>Client: Client Sees Completed Job
MongoDB-->>WEBSOCKET: Websocket Sees Completed Job
WEBSOCKET-->>Client: Client Sees Completed Job
```

One notable problem with the diagram above is that the client needs to ask the
websocket to send events, as the websocket is lazy. Right now, the javascript
closure `Quarto` has a timer that pings the websocket on a regular interval
to prompt it to send updates about completed jobs. This limitation will be removed
once the code is written so that there is only a single listener.

## Event Lifetime

```{mermaid}
flowchart LR
POST{"`**POST /render** or write`"}
WEBSOCKET{"`**WEBSOCKET /**`"}
POST -->|Job Enqueued| QuartoRenderJob((QuartoRenderJob))
QuartoRenderJob -->|Job Dequeued| QuartoRenderExec((QuartoRenderExec))
QuartoRenderExec -->|Job Pushed to Stack| QuartoRender((QuartoRender))
QuartoRender -->|Websocket Reads Stack| WEBSOCKET
```

## Handler Lifetime

```{mermaid}
sequenceDiagram
actor Client
participant FastAPI
participant Handler
participant MongoDB
FastAPI ->> Handler: FastAPI App @lifespan Starts Handler
Handler ->> MongoDB: Creates a Document for Queue and Stack
Handler <<->> MongoDB: Handler Routinely Checks MongoDB for New Jobs.
Client -->> FastAPI: Use **POST /render** Adds Document to Queue
FastAPI -->> Client: Says event is scheduled, ``HTTP 201``.
Client -->> FastAPI: **POST /handler?idle=true** Stops Watching
FastAPI -->> Handler: Handler Stops Watching, ``HTTP 200``.
Client -->> FastAPI: **POST /render** Tries to Render
FastAPI -->> Client: Says event cannot be scheduled yet, ``HTTP 400``.
Client -->> FastAPI: **POST /handler?idle=false** Starts Watching Again
FastAPI <<->> Handler: Wakes up Handler
FastAPI -->> Client: Handler Watching Again, ``HTTP 200``
Handler <<->> MongoDB: Handler Routinely Checks MongoDB for New Jobs.
Client -->> FastAPI: Use **POST /render** Adds Document to Queue
FastAPI -->> Client: Says event is scheduled, ``HTTP 201``.
FastAPI ->> Handler: FastAPI Stops Handler when @lifespan Over
```

The handler should start a background task in fastapi using a context
manager `Handler.listen`. `asyncio.Event` could also be used to tell the
listener to suspend on certain occasions.

### Handler Listener Implementation

To do this, I would like `Handler.listen` to spawn an [`asyncio.Event`](https://docs.python.org/3/library/asyncio-sync.html#asyncio.Event) which the
handler could use to tell the listener to start or stop (using `Handler.idle`).
Stopping the listener can be convenient at times. When the listener is started
back up it will begin to accept new jobs.

The `Handler` instance would have to store the `asyncio.Event` instance
to communicate with a watcher `asyncio.Task` created by `listen`.

When the `Handler` is told not to idle, it could push new events to an
`asyncio.Queue` on the `Handler` instance so that dependencies can tune in.

::: { .callout-note collapse=true }

### Example Code for Using a Listener.

I asked GPT to generate an example of something like this.
My prompt was

> Using asyncio, how can a function push an event to a listener function to tell it to pause, for instance?
And in the response it included the following code:

```python
import asyncio


async def worker(pause_event: asyncio.Event):
"""Worker function that pauses when pause_event is cleared."""
while True:
await pause_event.wait() # Wait until event is set (i.e., not paused)
print("Working...")
await asyncio.sleep(1) # Simulate work


async def controller(pause_event: asyncio.Event):
"""Controller function that toggles the pause state."""
await asyncio.sleep(3) # Let the worker run for a while
print("Pausing worker...")
pause_event.clear() # Signal worker to pause

await asyncio.sleep(3) # Keep it paused for a while
print("Resuming worker...")
pause_event.set() # Signal worker to resume


async def main():
pause_event = asyncio.Event()
pause_event.set() # Initially, allow the worker to run

worker_task = asyncio.create_task(worker(pause_event))
controller_task = asyncio.create_task(controller(pause_event))

await asyncio.gather(worker_task, controller_task)


asyncio.run(main())
```

:::

## Todo

::: { #tbl-todo-handler }

| Item | Completed | Description |
| -------------------------- | --------- | ---------------------------------------------------------------------------------------------------------------- |
| `POST /render` | | should return scheduled jobs |
| `WEBSOCKET` | | Should have a single database listener so that clients don't have to ping. |
| `Handler.listen` | | Context manager to allow others to listen, e.g. the above. Should emit `HandlerResult`. |
| `Handler.schedule` | | Should be used by `POST /render` to schedule a job. |
| `Handler.eval` | | Evaluate a scheduled job, mark started job. |
| `Handler.idle` | | Tell the handler background task to idle or resume. This means that it would be possible to pause the listener. |
| `QuartoRender*` time attrs | | Should contain scheduled time, evaluated time, and completed time. |

: Objectives Handler { .todo-table }

:::

<script type="module">
import {hydrate} from '/js/todo.js'
hydrate()
</script>
1 change: 0 additions & 1 deletion blog/dev/todo.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ format:
html:
page-layout: full
toc: false
live_reload: True
tbl-colwidths: [25, 15, 60]
---

Expand Down
10 changes: 10 additions & 0 deletions blog/themes/extras.scss
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,13 @@ $extraColors : ("blue": $blue,

$utilities : ();
$utilities : map-merge($utilities, createExtras($extraColors));

/*-- scss:rules --*/


p code:not(.sourceCode),
li code:not(.sourceCode),
td code:not(.sourceCode) {
background: var(--bs-body-bg);
color: var(--bs-pink);
}
1 change: 0 additions & 1 deletion errors.yaml

This file was deleted.

Loading

0 comments on commit 67d72b4

Please sign in to comment.