Skip to content

Commit

Permalink
revert task permit to use channel implementation instead
Browse files Browse the repository at this point in the history
  • Loading branch information
shijiesheng committed Dec 2, 2024
1 parent d2e13fd commit 0b11e7d
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 236 deletions.
4 changes: 0 additions & 4 deletions internal/common/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ package autoscaler
type (
AutoScaler interface {
Estimator
// Acquire X ResourceUnit of resource
Acquire(ResourceUnit) error
// Release X ResourceUnit of resource
Release(ResourceUnit)
// GetCurrent ResourceUnit of resource
GetCurrent() ResourceUnit
// Start starts the autoscaler go routine that scales the ResourceUnit according to Estimator
Expand Down
10 changes: 0 additions & 10 deletions internal/internal_poller_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,6 @@ func newPollerScaler(
}
}

// Acquire concurrent poll quota
func (p *pollerAutoScaler) Acquire(resource autoscaler.ResourceUnit) error {
return p.permit.Acquire(p.ctx, int(resource))
}

// Release concurrent poll quota
func (p *pollerAutoScaler) Release(resource autoscaler.ResourceUnit) {
p.permit.Release(int(resource))
}

// GetCurrent poll quota
func (p *pollerAutoScaler) GetCurrent() autoscaler.ResourceUnit {
return autoscaler.ResourceUnit(p.permit.Quota())
Expand Down
7 changes: 4 additions & 3 deletions internal/internal_poller_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package internal

import (
"context"
"math/rand"
"sync"
"testing"
Expand Down Expand Up @@ -172,7 +173,7 @@ func Test_pollerAutoscaler(t *testing.T) {
TargetUtilization: float64(tt.args.targetMilliUsage) / 1000,
},
testlogger.NewZap(t),
worker.NewPermit(tt.args.initialPollerCount),
worker.NewResizablePermit(tt.args.initialPollerCount),
// hook function that collects number of iterations
func() {
autoscalerEpoch.Add(1)
Expand All @@ -192,9 +193,9 @@ func Test_pollerAutoscaler(t *testing.T) {
go func() {
defer wg.Done()
for pollResult := range pollChan {
pollerScaler.Acquire(1)
pollerScaler.permit.Acquire(context.Background())
pollerScaler.CollectUsage(pollResult)
pollerScaler.Release(1)
pollerScaler.permit.Release()
}
}()
}
Expand Down
18 changes: 9 additions & 9 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,13 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
ctx, cancel := context.WithCancel(context.Background())

concurrency := &worker.ConcurrencyLimit{
PollerPermit: worker.NewPermit(options.pollerCount),
TaskPermit: worker.NewPermit(options.maxConcurrentTask),
PollerPermit: worker.NewResizablePermit(options.pollerCount),
TaskPermit: worker.NewChannelPermit(options.maxConcurrentTask),
}

var pollerAS *pollerAutoScaler
if pollerOptions := options.pollerAutoScaler; pollerOptions.Enabled {
concurrency.PollerPermit = worker.NewPermit(pollerOptions.InitCount)
concurrency.PollerPermit = worker.NewResizablePermit(pollerOptions.InitCount)
pollerAS = newPollerScaler(
pollerOptions,
logger,
Expand Down Expand Up @@ -252,7 +252,7 @@ func (bw *baseWorker) runPoller() {
select {
case <-bw.shutdownCh:
return
case <-bw.concurrency.TaskPermit.AcquireChan(bw.limiterContext, &bw.shutdownWG): // don't poll unless there is a task permit
case <-bw.concurrency.TaskPermit.GetChan(): // don't poll unless there is a task permit
// TODO move to a centralized place inside the worker
// emit metrics on concurrent task permit quota and current task permit count
// NOTE task permit doesn't mean there is a task running, it still needs to poll until it gets a task to process
Expand Down Expand Up @@ -300,10 +300,10 @@ func (bw *baseWorker) pollTask() {
var task interface{}

if bw.pollerAutoScaler != nil {
if pErr := bw.pollerAutoScaler.Acquire(1); pErr == nil {
defer bw.pollerAutoScaler.Release(1)
if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil {
defer bw.concurrency.PollerPermit.Release()
} else {
bw.logger.Warn("poller auto scaler acquire error", zap.Error(pErr))
bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
}
}

Expand Down Expand Up @@ -339,7 +339,7 @@ func (bw *baseWorker) pollTask() {
case <-bw.shutdownCh:
}
} else {
bw.concurrency.TaskPermit.Release(1) // poll failed, trigger a new poll by returning a task permit
bw.concurrency.TaskPermit.Release() // poll failed, trigger a new poll by returning a task permit
}
}

Expand Down Expand Up @@ -374,7 +374,7 @@ func (bw *baseWorker) processTask(task interface{}) {
}

if isPolledTask {
bw.concurrency.TaskPermit.Release(1) // task processed, trigger a new poll by returning a task permit
bw.concurrency.TaskPermit.Release() // task processed, trigger a new poll by returning a task permit
}
}()
err := bw.options.taskWorker.ProcessTask(task)
Expand Down
70 changes: 70 additions & 0 deletions internal/worker/channel_permit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2017-2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package worker

import (
"context"
"fmt"
)

type channelPermit struct {
channel chan struct{}
}

// NewChannelPermit creates a static permit that's not resizable
func NewChannelPermit(count int) ChannelPermit {
channel := make(chan struct{}, count)
for i := 0; i < count; i++ {
channel <- struct{}{}
}
return &channelPermit{channel: channel}

Check warning on line 38 in internal/worker/channel_permit.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/channel_permit.go#L33-L38

Added lines #L33 - L38 were not covered by tests
}

func (p *channelPermit) Acquire(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("failed to acquire permit before context is done")
case p.channel <- struct{}{}:
return nil

Check warning on line 46 in internal/worker/channel_permit.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/channel_permit.go#L41-L46

Added lines #L41 - L46 were not covered by tests
}
}

// AcquireChan returns a permit ready channel
func (p *channelPermit) GetChan() <-chan struct{} {
return p.channel

Check warning on line 52 in internal/worker/channel_permit.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/channel_permit.go#L51-L52

Added lines #L51 - L52 were not covered by tests
}

func (p *channelPermit) Release() {
p.channel <- struct{}{}

Check warning on line 56 in internal/worker/channel_permit.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/channel_permit.go#L55-L56

Added lines #L55 - L56 were not covered by tests
}

// Count returns the number of permits available
func (p *channelPermit) Count() int {
return len(p.channel)

Check warning on line 61 in internal/worker/channel_permit.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/channel_permit.go#L60-L61

Added lines #L60 - L61 were not covered by tests
}

func (p *channelPermit) Quota() int {
return cap(p.channel)

Check warning on line 65 in internal/worker/channel_permit.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/channel_permit.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}

// SetQuota on static permit doesn't take effect
func (p *channelPermit) SetQuota(_ int) {

Check warning on line 69 in internal/worker/channel_permit.go

View check run for this annotation

Codecov / codecov/patch

internal/worker/channel_permit.go#L69

Added line #L69 was not covered by tests
}
72 changes: 10 additions & 62 deletions internal/worker/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,79 +22,27 @@ package worker

import (
"context"
"fmt"
"sync"

"github.com/marusama/semaphore/v2"
)

var _ Permit = (*permit)(nil)
var _ Permit = (*resizablePermit)(nil)
var _ ChannelPermit = (*channelPermit)(nil)

// ConcurrencyLimit contains synchronization primitives for dynamically controlling the concurrencies in workers
type ConcurrencyLimit struct {
PollerPermit Permit // controls concurrency of pollers
TaskPermit Permit // controls concurrency of task processing
PollerPermit Permit // controls concurrency of pollers
TaskPermit ChannelPermit // controls concurrency of task processing
}

// Permit is an adaptive permit issuer to control concurrency
type Permit interface {
Acquire(context.Context, int) error
AcquireChan(context.Context, *sync.WaitGroup) <-chan struct{}
Acquire(context.Context) error
Count() int
Quota() int
Release()
SetQuota(int)
Count() int
Release(count int)
}

type permit struct {
sem semaphore.Semaphore
}

// NewPermit creates a dynamic permit that's resizable
func NewPermit(initCount int) Permit {
return &permit{sem: semaphore.New(initCount)}
}

// Acquire is blocking until a permit is acquired or returns error after context is done
// Remember to call Release(count) to release the permit after usage
func (p *permit) Acquire(ctx context.Context, count int) error {
if err := p.sem.Acquire(ctx, count); err != nil {
return fmt.Errorf("failed to acquire permit before context is done: %w", err)
}
return nil
}

// AcquireChan returns a permit ready channel. Similar to Acquire, but non-blocking.
// Remember to call Release(1) to release the permit after usage
func (p *permit) AcquireChan(ctx context.Context, wg *sync.WaitGroup) <-chan struct{} {
ch := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
if err := p.sem.Acquire(ctx, 1); err != nil {
return
}
select { // try to send to channel, but don't block if listener is gone
case ch <- struct{}{}:
case <-ctx.Done():
p.sem.Release(1)
}
}()
return ch
}

func (p *permit) Release(count int) {
p.sem.Release(count)
}

func (p *permit) Quota() int {
return p.sem.GetLimit()
}

func (p *permit) SetQuota(c int) {
p.sem.SetLimit(c)
}

func (p *permit) Count() int {
return p.sem.GetCount()
type ChannelPermit interface {
Permit
GetChan() <-chan struct{} // fetch the underlying channel
}
Loading

0 comments on commit 0b11e7d

Please sign in to comment.