-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathuserfeed.go
78 lines (67 loc) · 2.39 KB
/
userfeed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main
import (
"context"
restate "github.com/restatedev/sdk-go"
"github.com/restatedev/sdk-go/server"
"log"
"time"
)
// SocialMediaPost is the input that UserFeed.ProcessPost receives for each post.
type SocialMediaPost struct {
// Content is encoded under the JSON key "content".
Content string `json:"content"`
// Metadata is encoded under the JSON key "metadata".
Metadata string `json:"metadata"`
}
// UserFeed processes events (from Kafka) to update various downstream systems:
// - Journaling actions in Restate and driving retries from Restate, recovering
// partial progress.
// - Preserving the order per key, but otherwise allowing high fan-out, because
// processing of one event does not block other events.
// - Ability to delay events when the downstream systems are busy, without blocking
// entire partitions.
//
// The type has no fields; main registers it with the Restate server through
// restate.Reflect.
type UserFeed struct{}
// ProcessPost handles a single post for the user identified by the Virtual
// Object key of ctx. The Kafka key routes events to the correct Virtual Object,
// and events with the same key are processed one after the other.
//
// It runs three steps, each wrapped in restate.Run: create the post, poll the
// post status every 5 seconds for as long as it is PENDING, and finally update
// the user's feed. The first error returned by any step (or by the sleep
// between polls) is returned to the caller unchanged.
func (UserFeed) ProcessPost(ctx restate.ObjectContext, post SocialMediaPost) error {
	userID := restate.Key(ctx)

	postID, err := restate.Run(ctx, func(restate.RunContext) (string, error) {
		return CreatePost(userID, post)
	})
	if err != nil {
		return err
	}

	// Delay processing until content moderation is complete (handler suspends when on FaaS).
	// This only blocks other posts for this user (Virtual Object), not for other users.
	for {
		status, pollErr := restate.Run(ctx, func(restate.RunContext) (string, error) {
			return GetPostStatus(postID), nil
		})
		if pollErr != nil {
			return pollErr
		}
		if status == PENDING {
			// Still waiting: pause before asking again.
			if sleepErr := restate.Sleep(ctx, 5*time.Second); sleepErr != nil {
				return sleepErr
			}
			continue
		}
		break
	}

	// The feed update has no result value, so only its error is propagated.
	_, err = restate.Run(ctx, func(restate.RunContext) (restate.Void, error) {
		return restate.Void{}, UpdateUserFeed(userID, postID)
	})
	return err
}
func main() {
if err := server.NewRestate().
Bind(restate.Reflect(UserFeed{})).
Start(context.Background(), "0.0.0.0:9080"); err != nil {
log.Fatal(err)
}
}
// Process new posts for users via Kafka or by calling the endpoint over HTTP:
/*
curl localhost:8080/UserFeed/userid1/ProcessPost -H 'content-type:application/json' -d '{"content": "Hi! This is my first post!", "metadata": "public"}' &&
curl localhost:8080/UserFeed/userid2/ProcessPost -H 'content-type:application/json' -d '{"content": "Hi! This is my first post!", "metadata": "public"}' &&
curl localhost:8080/UserFeed/userid1/ProcessPost -H 'content-type:application/json' -d '{"content": "Hi! This is my second post!", "metadata": "public"}'
*/