From 9c0dc8052746bf32a3098a849a1fd79bca31e583 Mon Sep 17 00:00:00 2001
From: Sean Porter
Date: Tue, 7 Jan 2025 00:49:08 -0800
Subject: [PATCH] [processor/tailsampling] Support hot sampling policy loading (#37014)

#### Description

Adding a feature: this pull request adds support for hot sampling policy loading to the tail sampling processor. It allows the collector (or another service using the processor) to dynamically update the tail sampling policy without restarting the processor (or the entire collector), greatly reducing the impact of sampling policy changes on pipeline availability and processing. Policy changes are safely applied on the next policy evaluation tick. A collector (or another service) could use OpAMP to remotely manage sampling policies with little to no negative impact on pipeline availability and performance; this is the approach the https://tailctrl.io/ agent took.

#### Usage

Currently, callers need to define a custom interface in order to set the sampling policy, because the processor's concrete type is not exported:

```go
type SamplingProcessor interface {
    processor.Traces

    SetSamplingPolicy(cfgs []tailsamplingprocessor.PolicyCfg)
}

// ctx, settings, cfg, nextConsumer, and cfgs are assumed to be in scope.
factory := tailsamplingprocessor.NewFactory()
tsp, _ := factory.CreateTraces(ctx, settings, cfg, nextConsumer)
sp := tsp.(SamplingProcessor)
sp.SetSamplingPolicy(cfgs)
```
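
For illustration, a controlling service could wire remote policy updates into the processor roughly like this. This is only a sketch: `applyPolicyUpdates`, the `updates` channel, and its producer (for example, an OpAMP remote-configuration callback) are hypothetical; only `SetSamplingPolicy` itself comes from this change.

```go
package samplingcontrol

import (
    "context"
    "log"

    "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"
    "go.opentelemetry.io/collector/processor"
)

// SamplingProcessor mirrors the caller-defined interface from the Usage
// section above.
type SamplingProcessor interface {
    processor.Traces

    SetSamplingPolicy(cfgs []tailsamplingprocessor.PolicyCfg)
}

// applyPolicyUpdates forwards each new policy set to the processor. The
// processor records it as pending and swaps it in on its next evaluation
// tick, so traces keep flowing while the policy changes.
func applyPolicyUpdates(ctx context.Context, sp SamplingProcessor, updates <-chan []tailsamplingprocessor.PolicyCfg) {
    for {
        select {
        case <-ctx.Done():
            return
        case cfgs := <-updates:
            log.Printf("applying %d tail sampling policies", len(cfgs))
            sp.SetSamplingPolicy(cfgs)
        }
    }
}
```

Because `SetSamplingPolicy` only records the pending configuration under a mutex, the caller never blocks on trace processing; the actual swap happens at the start of the next `samplingPolicyOnTick`.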

#### Testing

Added a test to ensure policy changes are loaded. These changes are also being used in a private project.

---------

Signed-off-by: Sean Porter
Co-authored-by: Matthew Wear
---
 ...lsamplingprocessor_hot-policy-loading.yaml | 27 +++++
 processor/tailsamplingprocessor/processor.go | 114 +++++++++++++-----
 .../tailsamplingprocessor/processor_test.go | 87 +++++++++++++
 3 files changed, 198 insertions(+), 30 deletions(-)
 create mode 100644 .chloggen/tailsamplingprocessor_hot-policy-loading.yaml

diff --git a/.chloggen/tailsamplingprocessor_hot-policy-loading.yaml b/.chloggen/tailsamplingprocessor_hot-policy-loading.yaml
new file mode 100644
index 000000000000..2d6ab5d63a13
--- /dev/null
+++ b/.chloggen/tailsamplingprocessor_hot-policy-loading.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: tailsamplingprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Support hot sampling policy loading
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [37014]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go
index 02883fbc4778..e993212e85d7 100644
--- a/processor/tailsamplingprocessor/processor.go
+++ b/processor/tailsamplingprocessor/processor.go
@@ -45,6 +45,7 @@ type policy struct {
 type tailSamplingSpanProcessor struct {
     ctx context.Context
 
+    set processor.Settings
     telemetry *metadata.TelemetryBuilder
     logger *zap.Logger
 
@@ -59,6 +60,9 @@ type tailSamplingSpanProcessor struct {
     nonSampledIDCache cache.Cache[bool]
     deleteChan chan pcommon.TraceID
     numTracesOnMap *atomic.Uint64
+
+    setPolicyMux sync.Mutex
+    pendingPolicy []PolicyCfg
 }
 
 // spanAndScope a structure for holding information about span and its instrumentation scope.
@@ -108,6 +112,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume
 
     tsp := &tailSamplingSpanProcessor{
         ctx: ctx,
+        set: set,
         telemetry: telemetry,
         nextConsumer: nextConsumer,
         maxNumTraces: cfg.NumTraces,
@@ -128,31 +133,9 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume
     }
 
     if tsp.policies == nil {
-        policyNames := map[string]bool{}
-        tsp.policies = make([]*policy, len(cfg.PolicyCfgs))
-        componentID := set.ID.Name()
-        for i := range cfg.PolicyCfgs {
-            policyCfg := &cfg.PolicyCfgs[i]
-
-            if policyNames[policyCfg.Name] {
-                return nil, fmt.Errorf("duplicate policy name %q", policyCfg.Name)
-            }
-            policyNames[policyCfg.Name] = true
-
-            eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
-            if err != nil {
-                return nil, err
-            }
-            uniquePolicyName := policyCfg.Name
-            if componentID != "" {
-                uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
-            }
-            p := &policy{
-                name: policyCfg.Name,
-                evaluator: eval,
-                attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
-            }
-            tsp.policies[i] = p
+        err := tsp.loadSamplingPolicy(cfg.PolicyCfgs)
+        if err != nil {
+            return nil, err
         }
     }
 
@@ -262,7 +245,82 @@ type policyMetrics struct {
     idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64
 }
 
+func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error {
+    telemetrySettings := tsp.set.TelemetrySettings
+    componentID := tsp.set.ID.Name()
+
+    policyNames := map[string]bool{}
+    tsp.policies = make([]*policy, len(cfgs))
+
+    for i := range cfgs {
+        policyCfg := &cfgs[i]
+
+        if policyNames[policyCfg.Name] {
+            return fmt.Errorf("duplicate policy name %q", policyCfg.Name)
+        }
+        policyNames[policyCfg.Name] = true
+
+        eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
+        if err != nil {
+            return err
+        }
+        uniquePolicyName := policyCfg.Name
+        if componentID != "" {
+            uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
+        }
+        p := &policy{
+            name: policyCfg.Name,
+            evaluator: eval,
+            attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
+        }
+        tsp.policies[i] = p
+    }
+
+    tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(tsp.policies)))
+
+    return nil
+}
+
+func (tsp *tailSamplingSpanProcessor) SetSamplingPolicy(cfgs []PolicyCfg) {
+    tsp.logger.Debug("Setting pending sampling policy", zap.Int("pending.len", len(cfgs)))
+
+    tsp.setPolicyMux.Lock()
+    defer tsp.setPolicyMux.Unlock()
+
+    tsp.pendingPolicy = cfgs
+}
+
+func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
+    tsp.setPolicyMux.Lock()
+    defer tsp.setPolicyMux.Unlock()
+
+    // Nothing pending, do nothing.
+    pLen := len(tsp.pendingPolicy)
+    if pLen == 0 {
+        return
+    }
+
+    tsp.logger.Debug("Loading pending sampling policy", zap.Int("pending.len", pLen))
+
+    // In case something goes wrong.
+    prev := tsp.policies
+
+    err := tsp.loadSamplingPolicy(tsp.pendingPolicy)
+
+    // Empty pending regardless of error. If policy is invalid, it will fail on
+    // every tick, no need to do extra work and flood the log with errors.
+    tsp.pendingPolicy = nil
+
+    if err != nil {
+        tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
+        tsp.logger.Debug("Falling back to previous sampling policy")
+        tsp.policies = prev
+    }
+}
+
 func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
+    tsp.loadPendingSamplingPolicy()
+
     metrics := policyMetrics{}
 
     startTime := time.Now()
@@ -401,11 +459,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
         }
 
         lenSpans := int64(len(spans))
-        lenPolicies := len(tsp.policies)
-        initialDecisions := make([]sampling.Decision, lenPolicies)
-        for i := 0; i < lenPolicies; i++ {
-            initialDecisions[i] = sampling.Pending
-        }
+
         d, loaded := tsp.idToTrace.Load(id)
         if !loaded {
             spanCount := &atomic.Int64{}
diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go
index 3249f64f153a..49e126260130 100644
--- a/processor/tailsamplingprocessor/processor_test.go
+++ b/processor/tailsamplingprocessor/processor_test.go
@@ -449,6 +449,93 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) {
     }
 }
 
+func TestSetSamplingPolicy(t *testing.T) {
+    cfg := Config{
+        DecisionWait: defaultTestDecisionWait,
+        NumTraces: defaultNumTraces,
+        PolicyCfgs: []PolicyCfg{
+            {
+                sharedPolicyCfg: sharedPolicyCfg{
+                    Name: "always",
+                    Type: AlwaysSample,
+                },
+            },
+        },
+    }
+    s := setupTestTelemetry()
+    ct := s.NewSettings()
+    idb := newSyncIDBatcher()
+    msp := new(consumertest.TracesSink)
+
+    p, err := newTracesProcessor(context.Background(), ct, msp, cfg, withDecisionBatcher(idb))
+    require.NoError(t, err)
+
+    require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
+    defer func() {
+        require.NoError(t, p.Shutdown(context.Background()))
+    }()
+
+    tsp := p.(*tailSamplingSpanProcessor)
+
+    assert.Len(t, tsp.policies, 1)
+
+    tsp.policyTicker.OnTick()
+
+    assert.Len(t, tsp.policies, 1)
+
+    cfgs := []PolicyCfg{
+        {
+            sharedPolicyCfg: sharedPolicyCfg{
+                Name: "always",
+                Type: AlwaysSample,
+            },
+        },
+        {
+            sharedPolicyCfg: sharedPolicyCfg{
+                Name: "everything",
+                Type: AlwaysSample,
+            },
+        },
+    }
+    tsp.SetSamplingPolicy(cfgs)
+
+    assert.Len(t, tsp.policies, 1)
+
+    tsp.policyTicker.OnTick()
+
+    assert.Len(t, tsp.policies, 2)
+
+    // Duplicate policy name.
+    cfgs = []PolicyCfg{
+        {
+            sharedPolicyCfg: sharedPolicyCfg{
+                Name: "always",
+                Type: AlwaysSample,
+            },
+        },
+        {
+            sharedPolicyCfg: sharedPolicyCfg{
+                Name: "everything",
+                Type: AlwaysSample,
+            },
+        },
+        {
+            sharedPolicyCfg: sharedPolicyCfg{
+                Name: "everything",
+                Type: AlwaysSample,
+            },
+        },
+    }
+    tsp.SetSamplingPolicy(cfgs)
+
+    assert.Len(t, tsp.policies, 2)
+
+    tsp.policyTicker.OnTick()
+
+    // Should revert sampling policy.
+    assert.Len(t, tsp.policies, 2)
+}
+
 func TestSubSecondDecisionTime(t *testing.T) {
     // prepare
     msp := new(consumertest.TracesSink)