-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathexecution_cache.go
44 lines (37 loc) · 1008 Bytes
/
execution_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package pluto
import (
"sync"
)
// Package-level pipeline registry used by Process and ReloadExecutionCache.
var (
	// ExecutionCache holds the pipelines available for processing, keyed by
	// the string that Process obtains from a consumer's UniqueProperty.
	// It starts out empty and is replaced wholesale by ReloadExecutionCache.
	ExecutionCache = map[string]Pipeline{}

	// ExecutionCacheMutex guards ExecutionCache: Process takes the read
	// lock for lookups and ReloadExecutionCache takes the write lock when
	// swapping the map.
	ExecutionCacheMutex = &sync.RWMutex{}
)
// Process hands processable to the pipeline registered for its consumer.
//
// Only consumers whose PredefinedKind is KindPipeline are handled; any other
// kind is logged at debug level and the processable is dropped. The pipeline
// is looked up in ExecutionCache under the consumer's UniqueProperty; when no
// entry exists a warning is logged and the processable is dropped.
//
// The read lock on ExecutionCacheMutex is held only for the map lookup and is
// released before the pipeline runs. Holding it across p.Process would block
// ReloadExecutionCache for the whole pipeline run, and a pipeline that calls
// back into Process while a reload is waiting for the write lock would
// deadlock, because sync.RWMutex does not support recursive read locking.
// Releasing early is safe because ReloadExecutionCache replaces the map
// instead of mutating it, so the Pipeline value fetched here stays usable.
//
// TODO:
//  1. Goroutine pool
func Process(processable RoutableProcessable) {
	consumer := processable.GetConsumer()

	if kind := consumer.PredefinedKind(); kind != KindPipeline {
		ApplicationLogger.Debug(ApplicationLog{
			Message: "Kind is not supported for processing OutComingProcessable",
			Extra:   map[string]any{"kind": kind},
		})
		return
	}

	key := consumer.UniqueProperty()

	// Keep the critical section down to the lookup itself; see the doc
	// comment for why the lock must not span the pipeline call.
	ExecutionCacheMutex.RLock()
	p, found := ExecutionCache[key]
	ExecutionCacheMutex.RUnlock()

	if !found {
		ApplicationLogger.Warning(ApplicationLog{
			Message: "Pipeline not found",
			Extra:   map[string]any{"unique_property": key},
		})
		return
	}

	p.Process(processable)
}
// ReloadExecutionCache replaces the package-level ExecutionCache with c.
//
// The swap happens under the ExecutionCacheMutex write lock. The map c is
// stored as-is, not copied, so ExecutionCache and the caller's map share
// the same underlying data after the call.
func ReloadExecutionCache(c map[string]Pipeline) {
	ExecutionCacheMutex.Lock()
	ExecutionCache = c
	ExecutionCacheMutex.Unlock()
}