-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathmachine_operator.py
50 lines (36 loc) · 1.65 KB
/
machine_operator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import restate
from restate import VirtualObject, ObjectContext
from utils import bring_up_machine, Status, tear_down_machine
# This is a State Machine implemented with a Virtual Object
#
# - The object holds the state of the state machine and defines the methods
# to transition between the states.
# - The object's unique id identifies the state machine. Many parallel state
# machines exist, but only state machine (object) exists per id.
# - The "single-writer-per-key" characteristic of virtual objects ensures
# that one state transition per state machine is in progress at a time.
# Additional transitions are enqueued for that object, while a transition
# for a machine is still in progress.
machine_operator = VirtualObject("machine-operator")
@machine_operator.handler("setUp")
async def set_up(ctx: ObjectContext):
machine_id = ctx.key()
# Ignore duplicate calls to 'setUp'
status = await ctx.get("status")
if status == Status.UP:
return f"{machine_id} is already up, so nothing to do"
# Bringing up a machine is a slow process that frequently crashes
await bring_up_machine(ctx, machine_id)
ctx.set("status", Status.UP)
return f"{machine_id} is now up"
@machine_operator.handler("tearDown")
async def tear_down(ctx: ObjectContext):
machine_id = ctx.key()
status = await ctx.get("status")
if status != Status.UP:
return f"{machine_id} is not up, cannot tear down"
# Tearing down a machine is a slow process that frequently crashes
await tear_down_machine(ctx, machine_id)
ctx.set("status", Status.DOWN)
return f"{machine_id} is now down"
app = restate.app([machine_operator])