Skip to content

Commit

Permalink
Lazily initialize the view package's default worker
Browse files Browse the repository at this point in the history
See discussion in census-instrumentation#1191 for impetus, but in general, it's not
particularly good practice to start up goroutines in package `init`
functions, with one reason being that it means that they're started for
projects that don't even make use of the package, and maybe just have it
as a transitive dependency.

So here, remove the `view` package's default worker in favor of a new
function that'll lazily initialize one when any package-level functions
are called that use the default worker. We use a `sync.Once` that has an
optimized short-circuit path in case the work's already been done, so
new overhead should be negligible, and registering/unregistering views
is probably not a particularly common operation anyway.

This change is backwards compatible, with use of the package's API
staying exactly the same. Test coverage seems to be pretty good, with a
function that previously reset the default worker between test cases
which I've modified to instead reset the default worker back to its
uninitialized state, and thereby verify that the test cases can
initialize it if necessary.

I thought about trying to deinitialize the default worker in case its
last view is unregistered, but decided against it because (1) it's more
invasive, (2) it's not likely to actually help anyone in practice as
views probably stay registered for a program's entire duration anwyay,
and (3) its interaction with the existing package-level `Stop` function
would take some thinking through.
  • Loading branch information
brandur committed Dec 11, 2022
1 parent b1a01ee commit aa4db66
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 20 deletions.
2 changes: 2 additions & 0 deletions stats/view/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ type Exporter interface {
//
// Binaries can register exporters, libraries shouldn't register exporters.
func RegisterExporter(e Exporter) {
maybeInitDefaultWorker()
defaultWorker.RegisterExporter(e)
}

// UnregisterExporter unregisters an exporter.
func UnregisterExporter(e Exporter) {
maybeInitDefaultWorker()
defaultWorker.UnregisterExporter(e)
}
24 changes: 24 additions & 0 deletions stats/view/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package view

import "testing"

// Exporting functionality is tested in `worker_test.go`. These are a trivial
// set of tests to make sure that these package-level exports will correctly
// initialize defaultWorker if necessary.
func TestRegisterExporter(t *testing.T) {
stopAndClearDefaultWorker()

e := &countExporter{}
RegisterExporter(e)

if _, ok := defaultWorker.exporters[e]; !ok {
t.Errorf("exporter doesn't appear to be registered with the default worker")
}
}

func TestUnregisterExporter(t *testing.T) {
stopAndClearDefaultWorker()

e := &countExporter{}
UnregisterExporter(e)
}
29 changes: 21 additions & 8 deletions stats/view/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ import (
"go.opencensus.io/tag"
)

func init() {
defaultWorker = NewMeter().(*worker)
go defaultWorker.start()
internal.DefaultRecorder = record
internal.MeasurementRecorder = recordMeasurement
}

type measureRef struct {
measure string
views map[*viewInternal]struct{}
Expand Down Expand Up @@ -113,13 +106,28 @@ type Meter interface {

var _ Meter = (*worker)(nil)

var defaultWorker *worker
var (
defaultWorker *worker
defaultWorkerInit sync.Once
)

// Lazily initializes and starts the package's default worker. Should be invoked
// before any exported functions in this package that use defaultWorker.
func maybeInitDefaultWorker() {
defaultWorkerInit.Do(func() {
defaultWorker = NewMeter().(*worker)
go defaultWorker.start()
internal.DefaultRecorder = record
internal.MeasurementRecorder = recordMeasurement
})
}

var defaultReportingDuration = 10 * time.Second

// Find returns a registered view associated with this name.
// If no registered view is found, nil is returned.
func Find(name string) (v *View) {
maybeInitDefaultWorker()
return defaultWorker.Find(name)
}

Expand All @@ -138,6 +146,7 @@ func (w *worker) Find(name string) (v *View) {
// Register begins collecting data for the given views.
// Once a view is registered, it reports data to the registered exporters.
func Register(views ...*View) error {
maybeInitDefaultWorker()
return defaultWorker.Register(views...)
}

Expand All @@ -157,6 +166,7 @@ func (w *worker) Register(views ...*View) error {
// It is not necessary to unregister from views you expect to collect for the
// duration of your program execution.
func Unregister(views ...*View) {
maybeInitDefaultWorker()
defaultWorker.Unregister(views...)
}

Expand All @@ -180,6 +190,7 @@ func (w *worker) Unregister(views ...*View) {
// RetrieveData gets a snapshot of the data collected for the the view registered
// with the given name. It is intended for testing only.
func RetrieveData(viewName string) ([]*Row, error) {
maybeInitDefaultWorker()
return defaultWorker.RetrieveData(viewName)
}

Expand Down Expand Up @@ -229,11 +240,13 @@ func (w *worker) recordMeasurement(tags *tag.Map, ms []stats.Measurement, attach
// duration is. For example, the Stackdriver exporter recommends a value no
// lower than 1 minute. Consult each exporter per your needs.
func SetReportingPeriod(d time.Duration) {
maybeInitDefaultWorker()
defaultWorker.SetReportingPeriod(d)
}

// Stop stops the default worker.
func Stop() {
maybeInitDefaultWorker()
defaultWorker.Stop()
}

Expand Down
26 changes: 14 additions & 12 deletions stats/view/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Test_Worker_ViewRegistration(t *testing.T) {

for _, tc := range tcs {
t.Run(tc.label, func(t *testing.T) {
restart()
stopAndClearDefaultWorker()

views := map[string]*View{
"v1ID": {
Expand Down Expand Up @@ -123,7 +123,7 @@ func Test_Worker_ViewRegistration(t *testing.T) {
}

func Test_Worker_MultiExport(t *testing.T) {
restart()
stopAndClearDefaultWorker()

// This test reports the same data for the default worker and a secondary
// worker, and ensures that the stats are kept independently.
Expand Down Expand Up @@ -239,7 +239,7 @@ func Test_Worker_MultiExport(t *testing.T) {
}

func Test_Worker_RecordFloat64(t *testing.T) {
restart()
stopAndClearDefaultWorker()

someError := errors.New("some error")
m := stats.Float64("Test_Worker_RecordFloat64/MF1", "desc MF1", "unit")
Expand Down Expand Up @@ -383,7 +383,7 @@ func TestReportUsage(t *testing.T) {
}

for _, tt := range tests {
restart()
stopAndClearDefaultWorker()
SetReportingPeriod(25 * time.Millisecond)

if err := Register(tt.view); err != nil {
Expand Down Expand Up @@ -436,7 +436,7 @@ func Test_SetReportingPeriodReqNeverBlocks(t *testing.T) {
}

func TestWorkerStarttime(t *testing.T) {
restart()
stopAndClearDefaultWorker()

ctx := context.Background()
m := stats.Int64("measure/TestWorkerStarttime", "desc", "unit")
Expand Down Expand Up @@ -487,7 +487,7 @@ func TestWorkerStarttime(t *testing.T) {
}

func TestUnregisterReportsUsage(t *testing.T) {
restart()
stopAndClearDefaultWorker()
ctx := context.Background()

m1 := stats.Int64("measure", "desc", "unit")
Expand Down Expand Up @@ -522,7 +522,7 @@ func TestUnregisterReportsUsage(t *testing.T) {
}

func TestWorkerRace(t *testing.T) {
restart()
stopAndClearDefaultWorker()
ctx := context.Background()

m1 := stats.Int64("measure", "desc", "unit")
Expand Down Expand Up @@ -638,11 +638,13 @@ func (e *vdExporter) ExportView(vd *Data) {
e.vds = append(e.vds, vd)
}

// restart stops the current processors and creates a new one.
func restart() {
defaultWorker.Stop()
defaultWorker = NewMeter().(*worker)
go defaultWorker.start()
// stopAndClearDefaultWorker stops defaultWorker's processors, clears it, and
// resets its sync.Once so that it can be lazily initialized again.
func stopAndClearDefaultWorker() {
if defaultWorker != nil {
defaultWorker.Stop()
defaultWorkerInit = sync.Once{}
}
}

// byTag implements sort.Interface for *metricdata.TimeSeries by Labels.
Expand Down

0 comments on commit aa4db66

Please sign in to comment.