Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Add Flush to force aggregate in-memory buffer #923

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ type worker struct {
views map[string]*viewInternal
startTimes map[*viewInternal]time.Time

timer *time.Ticker
c chan command
quit, done chan bool
timer *time.Ticker
c chan command
quitCh, doneCh, flushCh chan struct{}
}

var defaultWorker *worker
Expand Down Expand Up @@ -107,6 +107,15 @@ func RetrieveData(viewName string) ([]*Row, error) {
return resp.rows, resp.err
}

// Flush force aggregates all buffered in-memory data.
// Otherwise, aggregation is going to happen at each period
// set by SetReportingPeriod.
//
// Flush is useful before program termination to avoid data loss.
func Flush() {
defaultWorker.flushAll()
}

func record(tags *tag.Map, ms interface{}) {
req := &recordReq{
tm: tags,
Expand Down Expand Up @@ -140,8 +149,9 @@ func newWorker() *worker {
startTimes: make(map[*viewInternal]time.Time),
timer: time.NewTicker(defaultReportingDuration),
c: make(chan command, 1024),
quit: make(chan bool),
done: make(chan bool),
quitCh: make(chan struct{}),
doneCh: make(chan struct{}),
flushCh: make(chan struct{}),
}
}

Expand All @@ -152,18 +162,26 @@ func (w *worker) start() {
cmd.handleCommand(w)
case <-w.timer.C:
w.reportUsage(time.Now())
case <-w.quit:
case <-w.flushCh:
w.reportUsage(time.Now())
w.doneCh <- struct{}{}
case <-w.quitCh:
w.timer.Stop()
close(w.c)
w.done <- true
w.doneCh <- struct{}{}
return
}
}
}

func (w *worker) flushAll() {
w.flushCh <- struct{}{}
<-w.doneCh
}

func (w *worker) stop() {
w.quit <- true
<-w.done
w.quitCh <- struct{}{}
<-w.doneCh
}

func (w *worker) getMeasureRef(name string) *measureRef {
Expand Down
30 changes: 30 additions & 0 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,36 @@ func TestUnregisterReportsUsage(t *testing.T) {
}
}

func TestFlush(t *testing.T) {
restart()

ctx := context.Background()

m := stats.Int64("measure", "desc", "unit")
SetReportingPeriod(time.Hour)

e := &vdExporter{}
RegisterExporter(e)

if err := Register(&View{Name: "count", Measure: m, Aggregation: Count()}); err != nil {
t.Fatal(err)
}

stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))
stats.Record(ctx, m.M(1))

Flush()

e.Lock()
got := len(e.vds)
e.Unlock()

if got == 0 {
t.Errorf("got %v aggregations; want at least one", got)
}
}

type countExporter struct {
sync.Mutex
count int64
Expand Down