-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathterminator_executor.go
99 lines (85 loc) · 2.97 KB
/
terminator_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package core
import (
"context"
"fmt"
"time"
)
// TerminatorExecutor is an Executor that, besides a single Execute, can be
// asked to keep executing until its operation is considered terminated.
type TerminatorExecutor interface {
	Executor

	// ExecuteUntilTermination runs the operation with the given parameters
	// and configs and reports the resulting Result and error. How termination
	// is decided is left to the implementation.
	ExecuteUntilTermination(context context.Context, parameters Parameters, configs Configs) (result Result, err error)
}
// FailedTerminationError is the error reported when an operation was executed
// but never reached termination. In this file it is produced once the retry
// budget has been used up.
type FailedTerminationError struct {
	// Result is the result of the last execution attempt.
	Result Result
	// Message is the human-readable description returned by Error.
	Message string
}
// Error implements the error interface by returning Message verbatim.
func (e FailedTerminationError) Error() string {
	msg := e.Message
	return msg
}
// executeTerminatorWithCheck wraps an Executor and re-runs it until a
// caller-supplied check reports that the operation has terminated.
type executeTerminatorWithCheck struct {
	// Executor is the wrapped executor. It is embedded, so its methods are
	// promoted unless overridden on this type.
	Executor

	// maxRetries is the upper bound on the number of execution attempts.
	maxRetries int
	// interval is how long to wait before the next attempt.
	interval time.Duration
	// checkTerminate receives the context, the wrapped Executor and the
	// attempt's result, and reports whether the operation has terminated.
	checkTerminate func(ctx context.Context, exec Executor, result ResultWithValue) (terminated bool, err error)
}
// Unwrap returns the Executor embedded in this wrapper.
func (o *executeTerminatorWithCheck) Unwrap() Executor {
	inner := o.Executor
	return inner
}
// Execute runs the wrapped executor exactly once and hands its result and
// error, together with this wrapper, to ExecutorWrapResult.
func (o *executeTerminatorWithCheck) Execute(ctx context.Context, parameters Parameters, configs Configs) (result Result, err error) {
	innerResult, innerErr := o.Executor.Execute(ctx, parameters, configs)
	return ExecutorWrapResult(o, innerResult, innerErr)
}
// ExecuteUntilTermination runs the retry loop implemented by
// executeUntilTermination and hands the final result and error, together with
// this wrapper, to ExecutorWrapResult.
//
// The context parameter is named ctx (as in Execute) so that it does not
// shadow the context package inside the method body.
func (o *executeTerminatorWithCheck) ExecuteUntilTermination(ctx context.Context, parameters Parameters, configs Configs) (result Result, err error) {
	result, err = o.executeUntilTermination(ctx, parameters, configs)
	return ExecutorWrapResult(o, result, err)
}
// executeUntilTermination re-runs the wrapped executor until checkTerminate
// reports that the operation has terminated.
//
// Each attempt uses the wrapped executor's own ExecuteUntilTermination when
// ExecutorAs finds a TerminatorExecutor behind o.Unwrap(), and o.Execute
// otherwise. The attempt's result must be convertible to ResultWithValue via
// ResultAs so that checkTerminate can inspect it.
//
// It returns:
//   - the result and a nil error as soon as checkTerminate reports true;
//   - the result and the error if an attempt or checkTerminate fails;
//   - the result and an error if the result cannot be viewed as a
//     ResultWithValue;
//   - a nil result and ctx.Err() if ctx is done while waiting between
//     attempts;
//   - the last result and a FailedTerminationError once maxRetries attempts
//     have been made without termination.
//
// The wait of o.interval happens only between attempts: after the final
// attempt the failure is reported immediately.
func (o *executeTerminatorWithCheck) executeUntilTermination(ctx context.Context, parameters Parameters, configs Configs) (result Result, err error) {
	// Pick the per-attempt operation once, before entering the loop.
	var attempt func() (Result, error)
	if tExec, ok := ExecutorAs[TerminatorExecutor](o.Unwrap()); ok {
		attempt = func() (Result, error) {
			res, execErr := tExec.ExecuteUntilTermination(ctx, parameters, configs)
			return ExecutorWrapResult(o, res, execErr)
		}
	} else {
		attempt = func() (Result, error) {
			return o.Execute(ctx, parameters, configs)
		}
	}

	for i := 0; i < o.maxRetries; i++ {
		result, err = attempt()
		if err != nil {
			return result, err
		}

		resultWithValue, ok := ResultAs[ResultWithValue](result)
		if !ok {
			return result, fmt.Errorf("result does not have a value")
		}

		terminated, checkErr := o.checkTerminate(ctx, o.Executor, resultWithValue)
		if checkErr != nil {
			return result, checkErr
		}
		if terminated {
			return result, nil
		}

		// No attempt follows the last one, so waiting here would only delay
		// the failure report (and could turn it into a cancellation error).
		if i == o.maxRetries-1 {
			break
		}

		timer := time.NewTimer(o.interval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		case <-timer.C:
		}
	}

	msg := fmt.Sprintf("maximum number of retries exceeded. Retries: %d, interval: %s", o.maxRetries, o.interval)
	return result, FailedTerminationError{Result: result, Message: msg}
}
// Compile-time checks that *executeTerminatorWithCheck satisfies the
// interfaces it is meant to implement.
var (
	_ TerminatorExecutor = (*executeTerminatorWithCheck)(nil)
	_ ExecutorWrapper    = (*executeTerminatorWithCheck)(nil)
)
// NewTerminatorExecutorWithCheck wraps executor in a TerminatorExecutor that
// executes the operation and checks the result until it is considered
// terminated. The returned executor waits interval between retries and
// executes up to maxRetries times; checkTerminate decides whether a given
// result means the operation has terminated.
func NewTerminatorExecutorWithCheck(
	executor Executor,
	maxRetries int,
	interval time.Duration,
	checkTerminate func(ctx context.Context, exec Executor, result ResultWithValue) (terminated bool, err error),
) TerminatorExecutor {
	wrapper := &executeTerminatorWithCheck{
		Executor:       executor,
		maxRetries:     maxRetries,
		interval:       interval,
		checkTerminate: checkTerminate,
	}
	return wrapper
}