rework on poller auto scaler #1411
Conversation
@@ -421,3 +422,20 @@ func (bw *baseWorker) Stop() {
	}
	return
}

func getAutoConfigHint(task interface{}) *shared.AutoConfigHint {
	switch t := task.(type) {
when I was poking through these types to see if this was complete, I believe I found a fourth. I don't remember if it was locally-dispatched activity types, query tasks, or something else though.
what happens if this is wrong? does it just consider [something] as having no cost and... do what?
minor nonblocking suggestion: not to be too OO-pilled or whatever, but if this is a property of the task type, maybe make it a method on the task interface?
ie, instead consider
type autoConfigAwareTask interface {
	getAutoConfigHint() *shared.AutoConfigHint
}

// for each task, make them responsible for their internal getters and internal state
func (t *workflowTask) getAutoConfigHint() *shared.AutoConfigHint {
	return t.shared.AutoConfigHint
}
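To make the suggestion concrete, here is a minimal, self-contained sketch of the interface-based design. `AutoConfigHint` and `workflowTask` are stand-ins for the real `shared.AutoConfigHint` and task types; the field names are assumptions for illustration only:

```go
package main

import "fmt"

// AutoConfigHint stands in for shared.AutoConfigHint; fields are illustrative.
type AutoConfigHint struct {
	EnableAutoConfig bool
}

// autoConfigAwareTask lets each task type own its hint, so the worker
// needs no central type switch that can silently miss a task type.
type autoConfigAwareTask interface {
	getAutoConfigHint() *AutoConfigHint
}

// workflowTask is a hypothetical task type carrying a hint.
type workflowTask struct {
	hint *AutoConfigHint
}

func (t *workflowTask) getAutoConfigHint() *AutoConfigHint { return t.hint }

// getAutoConfigHint returns the task's hint, or nil for task types
// that do not implement the interface (treated as "no hint").
func getAutoConfigHint(task interface{}) *AutoConfigHint {
	if t, ok := task.(autoConfigAwareTask); ok {
		return t.getAutoConfigHint()
	}
	return nil
}

func main() {
	wt := &workflowTask{hint: &AutoConfigHint{EnableAutoConfig: true}}
	fmt.Println(getAutoConfigHint(wt).EnableAutoConfig) // true
	fmt.Println(getAutoConfigHint("unknown") == nil)    // true: no hint, no cost
}
```

A task type the switch forgot simply falls into the nil branch instead of being silently miscounted, which addresses the "what happens if this is wrong?" question above.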
inputChan <- tt.input[i]
<-doneC
this is serializing the whole thing, so it doesn't really test anything concurrently - you can even remove the mutexes and it'll still pass the race detector.
what's the goal with the concurrency? it's how it's used, so 👍, concurrency in tests is good, but this has artificially nerfed it.
r.index %= len(r.window)
r.sum += value - r.window[r.index]
r.window[r.index] = value
r.index++
general design critique: where possible, make sure all state is valid when it leaves the function boundaries.
e.g. in this case, r.index is allowed to be == len(r.window) (and unsafe to use) once every len(r.window) times it's called. if this were instead:
r.sum += value - r.window[r.index]
r.window[r.index] = value
r.index++
r.index %= len(r.window)
it'd always be correct.
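A runnable version of the corrected rolling-window average, with the modulo moved after the increment so the index is in range at every function exit. Type and method names are illustrative, not the PR's actual identifiers:

```go
package main

import "fmt"

// rollingAverage keeps a fixed-size window of samples. The modulo happens
// after the increment, so r.index is always a valid array index whenever
// control leaves a method.
type rollingAverage struct {
	window []float64
	sum    float64
	count  int
	index  int
}

func newRollingAverage(size int) *rollingAverage {
	return &rollingAverage{window: make([]float64, size)}
}

func (r *rollingAverage) Add(value float64) {
	r.sum += value - r.window[r.index] // evict the oldest sample from the sum
	r.window[r.index] = value
	r.index++
	r.index %= len(r.window) // state is valid at the function boundary
	if r.count < len(r.window) {
		r.count++
	}
}

func (r *rollingAverage) Average() float64 {
	if r.count == 0 {
		return 0
	}
	return r.sum / float64(r.count)
}

func main() {
	r := newRollingAverage(3)
	for _, v := range []float64{1, 2, 3, 4} {
		r.Add(v) // window ends up holding 2, 3, 4
	}
	fmt.Println(r.Average()) // 3
}
```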
c.scope.Gauge("poller_in_action").Update(float64(c.concurrency.PollerPermit.Count()))
c.scope.Gauge("poller_quota").Update(float64(c.concurrency.PollerPermit.Quota()))
no complaint at all about emitting both, but: when are we expecting them to be different, aside from for extremely small amounts of time (between "poll complete" and "started the next poll")?
if it's just for general monitoring / if there is a difference we can investigate because it might be a bug, yea seems great. I'm just wondering if there's some other intent / expected difference, like from other limiters or something.
makes sense. Maybe just poller_quota is needed here
if switched := c.enable.CompareAndSwap(!shouldEnable, shouldEnable); switched {
	if shouldEnable {
		c.logEvent(autoScalerEventEnable)
	} else {
		c.resetConcurrency()
		c.logEvent(autoScalerEventDisable)
	}
}
tbh I think I'd prefer just using a mutex.
right now there's nothing preventing:

1. updatePollerPermit begins, checks enable...
2. disable
3. reset concurrency resets the quota
4. ...
5. updatePollerPermit sets quota to something else

and leaving things in an abnormal state with no ability to notice it.
with the possible exception of metrics/log buffer flushing and whatnot, there's no I/O in here and no expected lock contention - should be no issue at all with using a mutex.
makes sense
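A minimal sketch of the mutex approach the reviewer describes. The type, field names, and `defaultQuota` are hypothetical stand-ins for the PR's real autoscaler state; the point is only that disable-and-reset can no longer interleave with a quota update:

```go
package main

import (
	"fmt"
	"sync"
)

const defaultQuota = 10 // hypothetical default poller permit quota

// scaler is a simplified stand-in for the autoscaler's enable/disable state,
// guarded by a mutex instead of CompareAndSwap so that disabling (which
// resets the quota) happens atomically with respect to quota updates.
type scaler struct {
	mu      sync.Mutex
	enabled bool
	quota   int
}

func (s *scaler) setEnabled(enable bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.enabled == enable {
		return
	}
	s.enabled = enable
	if !enable {
		s.quota = defaultQuota // reset happens atomically with the flag flip
	}
}

func (s *scaler) updatePollerPermit(q int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.enabled {
		return // a concurrent disable can no longer be overwritten
	}
	s.quota = q
}

func main() {
	s := &scaler{enabled: true, quota: defaultQuota}
	s.setEnabled(false)
	s.updatePollerPermit(42) // ignored: scaler is disabled
	fmt.Println(s.quota)     // 10
}
```

Since there is no I/O under the lock and no expected contention, the mutex costs effectively nothing here.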
// 2. enable/disable auto scaler
func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *shared.AutoConfigHint) {
	if hint == nil {
		c.log.Warn("auto config hint is nil, this results in no action")
maybe just info? otherwise this'll make a flood of warns in user services when rolling out or disabling, but it's not really an unexpected / worrying thing in that state.
or maybe no log if not enabled?
makes sense. I'll mark it as info level
const (
	defaultAutoScalerUpdateTick = time.Second
	// concurrencyAutoScalerObservabilityTick = time.Millisecond * 500
	targetPollerWaitTimeInMsLog2 = 4 // 16 ms
took a while to figure out what exactly the goal here was, I think because the log2/exp2 steps are so separated...
might be easier to follow if you make this a normal ms value, and do a `current*log2(actual/target)` at the point where you're calculating the quota instead? to me that feels more obviously like a smoothing operation at that point / a "react to the magnitude rather than the value", where currently it's kinda hidden as a `current*log2(actual)/log2(target)` that's being done in three separate locations.
re `TimeInMs` vs a `time.Duration`: seems fine to me. it's a ton of value-casting noise otherwise, sadly. and/or maybe we should consider a "duration math" util somewhere, as we do a moderate amount of duration-related floaty calculations in both client and server, and they're super verbose.
The only concern is that currently I'm averaging the wait time in log space, i.e. average(log2(wait_time)). If I adopt this approach, large wait times will play a bigger role in the average, which is not desired.
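To illustrate the trade-off the author describes, here is a hedged sketch of averaging poller wait times in log2 space against the PR's 16 ms target constant. The function name and signal convention are assumptions, not the PR's actual code:

```go
package main

import (
	"fmt"
	"math"
)

const targetPollerWaitTimeInMsLog2 = 4 // 16 ms target, as in the PR constant

// scaleSignal averages wait times in log2 space, then compares against the
// log2 target. Averaging in log space damps the influence of occasional
// very large wait times, which is why the author averages log2(wait_time)
// rather than computing log2(average(wait_time)).
func scaleSignal(waitTimesMs []float64) float64 {
	if len(waitTimesMs) == 0 {
		return 0
	}
	sum := 0.0
	for _, w := range waitTimesMs {
		sum += math.Log2(math.Max(w, 1)) // clamp so log2 is defined at 0
	}
	avg := sum / float64(len(waitTimesMs))
	return avg / targetPollerWaitTimeInMsLog2 // >1 means waits exceed the target
}

func main() {
	fmt.Println(scaleSignal([]float64{16, 16, 16}))   // 1: exactly at target
	fmt.Println(scaleSignal([]float64{16, 16, 1024})) // 1.5: outlier damped
}
```

A plain arithmetic mean of {16, 16, 1024} would be 352 ms, i.e. the single outlier would dominate; in log space it only nudges the signal from 1 to 1.5.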
updateTime := c.clock.Now()
if updateTime.Before(c.pollerPermitLastUpdate.Add(c.cooldown)) { // before cooldown
	c.logEvent(autoScalerEventPollerSkipUpdateCooldown)
	return
}
since this func (and therefore `updateTime`'s last value) is only ever called in a loop driven by a timer, it seems like this only has two modes of operation:

- cooldown < tick: every tick causes an update
- cooldown > tick: every N ticks cause an update

neither of which seems particularly useful. probably remove / leftovers from an older idea?
or is this intended as a one-time warm-up delay of some kind? I'm not sure that would be useful here / with the current math setup, but there's often some reason to have warmup periods.
It's actually updated when poller permit is updated. It serves as a sustain time to avoid updating multiple times in a small time window.
to stick it in here too: overall looks pretty good. simpler, and the overall goal (and why it achieves it) is clearer too. seems like just minor tweaks (many optional) and it's probably good to go
Overall looks good, but I left some nits
@@ -301,7 +308,7 @@ func (bw *baseWorker) pollTask() {
	var err error
	var task interface{}

-	if bw.pollerAutoScaler != nil {
+	if bw.concurrencyAutoScaler != nil {
		if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil {
nit: this looks like a leaking abstraction. This should be handled inside concurrencyAutoScaler.
I suggest moving all `concurrencyAutoScaler != nil` checks inside the methods where they are required.
This code should be simpler: just call methods on the autoscaler, and if it is nil, do nothing.
makes sense
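Go makes this pattern cheap: methods on a pointer receiver can be called on a nil pointer, so the nil check can live inside the autoscaler itself. A minimal sketch (type and field names are simplified stand-ins for the PR's real types):

```go
package main

import "fmt"

// AutoConfigHint stands in for shared.AutoConfigHint.
type AutoConfigHint struct{ EnableAutoConfig bool }

// ConcurrencyAutoScaler sketch: methods tolerate a nil receiver, so callers
// in baseWorker can invoke them unconditionally and a disabled (nil)
// autoscaler is simply a no-op.
type ConcurrencyAutoScaler struct {
	processed int
}

func (c *ConcurrencyAutoScaler) ProcessPollerHint(hint *AutoConfigHint) {
	if c == nil || hint == nil {
		return // feature disabled or no hint: do nothing, no checks at call sites
	}
	c.processed++
}

func main() {
	var disabled *ConcurrencyAutoScaler          // nil: autoscaler not configured
	disabled.ProcessPollerHint(&AutoConfigHint{}) // safe no-op in Go

	enabled := &ConcurrencyAutoScaler{}
	enabled.ProcessPollerHint(&AutoConfigHint{EnableAutoConfig: true})
	fmt.Println(enabled.processed) // 1
}
```

This works because a Go method call on a nil pointer receiver is legal as long as the method never dereferences the receiver before its own nil check.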
return
case <-ticker.Chan():
	c.logEvent(autoScalerEventMetrics)
	c.lock.Lock()
nit: push lock/unlock to updatePollerPermit, then you can use defer inside the function and ensure that unlock happens if anything will cause panic.
right, it's simpler
c.wg.Add(1)

go func() {
	defer c.wg.Done()
nit: any call that starts a goroutine should have a panic handler.
If a bug exists, it will crash the worker process, significantly impacting customer service.
This is an optional functionality that should be safe to break. Worst case, it won't update concurrency.
good catch
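A sketch of the recover pattern being suggested, in a simplified stand-in type (the real autoscaler's loop body replaces the injected `loop` function here):

```go
package main

import (
	"fmt"
	"sync"
)

// scaler is a stand-in holding just the WaitGroup; start launches the
// background loop with a recover handler so a bug in this optional
// feature cannot crash the whole worker process.
type scaler struct {
	wg sync.WaitGroup
}

func (c *scaler) start(loop func()) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		defer func() {
			if p := recover(); p != nil {
				// worst case the concurrency quota stops updating;
				// log the panic and carry on instead of crashing
				fmt.Println("autoscaler recovered from panic:", p)
			}
		}()
		loop()
	}()
}

func main() {
	c := &scaler{}
	c.start(func() { panic("simulated bug") })
	c.wg.Wait() // returns normally: the panic was contained
	fmt.Println("worker still alive")
}
```

Note the ordering: `recover` runs in its own deferred closure before `wg.Done`, so the WaitGroup is released even on panic and the worker keeps serving tasks.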
if hint.EnableAutoConfig != nil && *hint.EnableAutoConfig {
	shouldEnable = true
}
if shouldEnable != c.enabled { // flag switched
nit: consider an early abort?

if hint.EnableAutoConfig == nil || !*hint.EnableAutoConfig {
    return
}
if r.count == 0 {
	return 0
}
return r.sum / T(r.count)
are there any conditions under which sum might become zero?
What changed?
Why?
How did you test it?
- unit test
- [WIP] canary test + bench test
Potential risks