-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfastapi_server.py
78 lines (57 loc) · 1.52 KB
/
fastapi_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import logging
import fastapi
import uvicorn
from fastapi import FastAPI
from darq.app import Darq
from darq.connections import RedisSettings
from darq_ui.integration.fastapi import setup
log = logging.getLogger(__name__)
darq = Darq(
redis_settings=RedisSettings(
host="localhost",
port=6379,
database=0,
),
ctx={},
)
@darq.task
async def say_hello_task(name: str) -> None:
log.info("Hello %s", name)
def create_app() -> FastAPI:
app = FastAPI(
debug=True,
servers=[
{"url": "http://localhost:3000", "description": "Dev environment"},
],
openapi_tags=[
{"name": "api", "description": "Darq-ui api"},
],
)
@app.on_event("startup")
async def startup() -> None:
await darq.connect()
@app.on_event("shutdown")
async def shutdown() -> None:
await darq.disconnect()
# In order for this to work, you need to run npm run build or npm run build-watch at least once
setup(
app,
darq,
base_path="/darq",
logs_url="https://mylogserver.com/taskname=${taskName}",
embed=True,
)
log.info("Server api configured at http:0.0.0.0:3000/api endpoint")
return app
def app_factory() -> fastapi.FastAPI:
return create_app()
def main(reload: bool) -> None:
uvicorn.run(
"fastapi_server:app_factory",
port=3000,
host="0.0.0.0",
reload=reload,
factory=True,
)
if __name__ == "__main__":
main(reload=True)