Skip to content

Commit

Permalink
update polletracker for type breakdown
Browse files Browse the repository at this point in the history
  • Loading branch information
ketsiambaku committed Dec 2, 2024
1 parent e3802b7 commit 65fdc1b
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 36 deletions.
102 changes: 75 additions & 27 deletions internal/common/debug/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@ import (
"fmt"
"sort"
"sync"

"go.uber.org/atomic"
)

type (
// pollerTrackerImpl implements the PollerTracker interface
pollerTrackerImpl struct {
pollerCount atomic.Int32
sync.RWMutex
count map[string]int64
}

// stopperImpl implements the Stopper interface
stopperImpl struct {
sync.Once
workerType string
pollerTracker *pollerTrackerImpl
}

Expand Down Expand Up @@ -93,40 +94,67 @@ func (asi *activityStopperImpl) Stop() {
})
}

func (p *pollerTrackerImpl) Start() Stopper {
p.pollerCount.Inc()
return &stopperImpl{
pollerTracker: p,
}
func (p *pollerTrackerImpl) Start(workerType string) Stopper {
p.Lock()
defer p.Unlock()
p.count[workerType]++
return &stopperImpl{pollerTracker: p, workerType: workerType}
}

func (p *pollerTrackerImpl) Stats() int32 {
return p.pollerCount.Load()
func (p *pollerTrackerImpl) Stats() Pollers {
var pollers Pollers
p.RLock()
defer p.RUnlock()
for pollerType, count := range p.count {
if count > 0 {
pollers = append(pollers, struct {
Type string
Count int64
}{Type: pollerType, Count: count})
}
}
sort.Slice(pollers, func(i, j int) bool {
return pollers[i].Type < pollers[j].Type
})
return pollers
}

func (s *stopperImpl) Stop() {
s.pollerTracker.pollerCount.Dec()
s.Do(func() {
s.pollerTracker.Lock()
defer s.pollerTracker.Unlock()
s.pollerTracker.count[s.workerType]--
if s.pollerTracker.count[s.workerType] == 0 {
delete(s.pollerTracker.count, s.workerType)
}
})
}

func Example() {
var pollerTracker PollerTracker
pollerTracker = &pollerTrackerImpl{}
pollerTracker = &pollerTrackerImpl{count: make(map[string]int64)}

// Initially, poller count should be 0
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Start a poller and verify that the count increments
stopper1 := pollerTracker.Start()
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

// Start another poller and verify that the count increments again
stopper2 := pollerTracker.Start()
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))

jsonPollers, _ := json.MarshalIndent(pollerTracker.Stats(), "", " ")
fmt.Println(string(jsonPollers))

// Start pollers and verify that the count increments
stopper1 := pollerTracker.Start("ActivityWorker")
jsonPollers, _ = json.MarshalIndent(pollerTracker.Stats(), "", " ")
fmt.Println(string(jsonPollers))
stopper2 := pollerTracker.Start("ActivityWorker")
jsonPollers, _ = json.MarshalIndent(pollerTracker.Stats(), "", " ")
fmt.Println(string(jsonPollers))
// Start another poller type and verify that the count increments
stopper3 := pollerTracker.Start("WorkflowWorker")
jsonPollers, _ = json.MarshalIndent(pollerTracker.Stats(), "", " ")
fmt.Println(string(jsonPollers))
// Stop the pollers and verify the counter
stopper1.Stop()
stopper2.Stop()
fmt.Println(fmt.Sprintf("poller stats: %d", pollerTracker.Stats()))
stopper3.Stop()
jsonPollers, _ = json.MarshalIndent(pollerTracker.Stats(), "", " ")
fmt.Println(string(jsonPollers))

var activityTracker ActivityTracker
activityTracker = &activityTrackerImpl{activityCount: make(map[ActivityInfo]int64)}
Expand Down Expand Up @@ -157,10 +185,30 @@ func Example() {
fmt.Println(string(jsonActivities))

// Output:
// poller stats: 0
// poller stats: 1
// poller stats: 2
// poller stats: 0
// null
// [
// {
// "Type": "ActivityWorker",
// "Count": 1
// }
// ]
// [
// {
// "Type": "ActivityWorker",
// "Count": 2
// }
// ]
// [
// {
// "Type": "ActivityWorker",
// "Count": 2
// },
// {
// "Type": "WorkflowWorker",
// "Count": 1
// }
// ]
// null
// [
// {
// "Info": {
Expand Down
9 changes: 7 additions & 2 deletions internal/common/debug/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ type (
PollerTracker interface {
// Start collects information on poller start up.
// consumers should provide a concurrency-safe implementation.
Start() Stopper
Start(workerType string) Stopper
// Stats return the number or running pollers
Stats() int32
Stats() Pollers
}

// WorkerStats provides a set of methods that can be used to collect
Expand Down Expand Up @@ -70,6 +70,11 @@ type (
Count int64
}

Pollers []struct {
Type string
Count int64
}

// Debugger exposes stats collected on a running Worker
// Deprecated: in development and very likely to change
Debugger interface {
Expand Down
6 changes: 3 additions & 3 deletions internal/common/debug/workerstats_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type (
activityTrackerNoopImpl struct{}
)

func (lc *pollerTrackerNoopImpl) Start() Stopper { return &stopperNoopImpl{} }
func (lc *pollerTrackerNoopImpl) Stats() int32 { return 0 }
func (r *stopperNoopImpl) Stop() {}
func (lc *pollerTrackerNoopImpl) Start(workerType string) Stopper { return &stopperNoopImpl{} }
func (lc *pollerTrackerNoopImpl) Stats() Pollers { return nil }
func (r *stopperNoopImpl) Stop() {}

// NewNoopPollerTracker creates a new PollerTracker instance
func NewNoopPollerTracker() PollerTracker { return &pollerTrackerNoopImpl{} }
Expand Down
6 changes: 3 additions & 3 deletions internal/common/debug/workerstats_noop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func TestWorkerStats(t *testing.T) {
pollerTracker := NewNoopPollerTracker()
activityTracker := NewNoopActivityTracker()
assert.NotNil(t, pollerTracker)
assert.NotNil(t, pollerTracker.Start())
assert.Equal(t, int32(0), pollerTracker.Stats())
assert.NotPanics(t, pollerTracker.Start().Stop)
assert.NotNil(t, pollerTracker.Start(""))
assert.Nil(t, pollerTracker.Stats())
assert.NotPanics(t, pollerTracker.Start("").Stop)
assert.NotNil(t, activityTracker.Start(ActivityInfo{}))
assert.Nil(t, activityTracker.Stats())
}
2 changes: 1 addition & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (bw *baseWorker) isShutdown() bool {

func (bw *baseWorker) runPoller() {
defer bw.shutdownWG.Done()
defer bw.options.pollerTracker.Start().Stop()
defer bw.options.pollerTracker.Start(bw.options.workerType).Stop()

bw.metricsScope.Counter(metrics.PollerStartCounter).Inc(1)

Expand Down

0 comments on commit 65fdc1b

Please sign in to comment.