diff --git a/.chloggen/tailsamplingprocessor_hot-policy-loading.yaml b/.chloggen/tailsamplingprocessor_hot-policy-loading.yaml new file mode 100644 index 000000000000..2d6ab5d63a13 --- /dev/null +++ b/.chloggen/tailsamplingprocessor_hot-policy-loading.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: tailsamplingprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support hot sampling policy loading + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37014] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 02883fbc4778..e993212e85d7 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -45,6 +45,7 @@ type policy struct { type tailSamplingSpanProcessor struct { ctx context.Context + set processor.Settings telemetry *metadata.TelemetryBuilder logger *zap.Logger @@ -59,6 +60,9 @@ type tailSamplingSpanProcessor struct { nonSampledIDCache cache.Cache[bool] deleteChan chan pcommon.TraceID numTracesOnMap *atomic.Uint64 + + setPolicyMux sync.Mutex + pendingPolicy []PolicyCfg } // spanAndScope a structure for holding information about span and its instrumentation scope. @@ -108,6 +112,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume tsp := &tailSamplingSpanProcessor{ ctx: ctx, + set: set, telemetry: telemetry, nextConsumer: nextConsumer, maxNumTraces: cfg.NumTraces, @@ -128,31 +133,9 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume } if tsp.policies == nil { - policyNames := map[string]bool{} - tsp.policies = make([]*policy, len(cfg.PolicyCfgs)) - componentID := set.ID.Name() - for i := range cfg.PolicyCfgs { - policyCfg := &cfg.PolicyCfgs[i] - - if policyNames[policyCfg.Name] { - return nil, fmt.Errorf("duplicate policy name %q", policyCfg.Name) - } - policyNames[policyCfg.Name] = true - - eval, err := getPolicyEvaluator(telemetrySettings, policyCfg) - if err != nil { - return nil, err - } - uniquePolicyName := policyCfg.Name - if componentID != "" { - uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name) - } - p := &policy{ - name: policyCfg.Name, - evaluator: eval, - attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)), - } - tsp.policies[i] = p + err := tsp.loadSamplingPolicy(cfg.PolicyCfgs) + if err != nil { + return nil, err } } @@ -262,7 +245,82 @@ type policyMetrics struct { idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64 } +func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error { + telemetrySettings := tsp.set.TelemetrySettings + componentID := tsp.set.ID.Name() + + policyNames := map[string]bool{} + tsp.policies = make([]*policy, len(cfgs)) + + for i := range cfgs { + policyCfg := &cfgs[i] + + if policyNames[policyCfg.Name] { + return fmt.Errorf("duplicate policy name %q", policyCfg.Name) + } + policyNames[policyCfg.Name] = true + + eval, err := getPolicyEvaluator(telemetrySettings, policyCfg) + if err != nil { + return err + } + uniquePolicyName := policyCfg.Name + if componentID != "" { + uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name) + } + p := &policy{ + name: policyCfg.Name, + evaluator: eval, + attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)), + } + tsp.policies[i] = p + } + + tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(tsp.policies))) + + return nil +} + +func (tsp *tailSamplingSpanProcessor) SetSamplingPolicy(cfgs []PolicyCfg) { + tsp.logger.Debug("Setting pending sampling policy", zap.Int("pending.len", len(cfgs))) + + tsp.setPolicyMux.Lock() + defer tsp.setPolicyMux.Unlock() + + tsp.pendingPolicy = cfgs +} + +func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() { + tsp.setPolicyMux.Lock() + defer tsp.setPolicyMux.Unlock() + + // Nothing pending, do nothing. + pLen := len(tsp.pendingPolicy) + if pLen == 0 { + return + } + + tsp.logger.Debug("Loading pending sampling policy", zap.Int("pending.len", pLen)) + + // In case something goes wrong. + prev := tsp.policies + + err := tsp.loadSamplingPolicy(tsp.pendingPolicy) + + // Empty pending regardless of error. If policy is invalid, it will fail on + // every tick, no need to do extra work and flood the log with errors. + tsp.pendingPolicy = nil + + if err != nil { + tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err)) + tsp.logger.Debug("Falling back to previous sampling policy") + tsp.policies = prev + } +} + func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { + tsp.loadPendingSamplingPolicy() + metrics := policyMetrics{} startTime := time.Now() @@ -401,11 +459,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc } lenSpans := int64(len(spans)) - lenPolicies := len(tsp.policies) - initialDecisions := make([]sampling.Decision, lenPolicies) - for i := 0; i < lenPolicies; i++ { - initialDecisions[i] = sampling.Pending - } + d, loaded := tsp.idToTrace.Load(id) if !loaded { spanCount := &atomic.Int64{} diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 3249f64f153a..49e126260130 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -449,6 +449,93 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { } } +func TestSetSamplingPolicy(t *testing.T) { + cfg := Config{ + DecisionWait: defaultTestDecisionWait, + NumTraces: defaultNumTraces, + PolicyCfgs: []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "always", + Type: AlwaysSample, + }, + }, + }, + } + s := setupTestTelemetry() + ct := s.NewSettings() + idb := newSyncIDBatcher() + msp := new(consumertest.TracesSink) + + p, err := newTracesProcessor(context.Background(), ct, msp, cfg, withDecisionBatcher(idb)) + require.NoError(t, err) + + require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + tsp := p.(*tailSamplingSpanProcessor) + + assert.Len(t, tsp.policies, 1) + + tsp.policyTicker.OnTick() + + assert.Len(t, tsp.policies, 1) + + cfgs := []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "always", + Type: AlwaysSample, + }, + }, + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "everything", + Type: AlwaysSample, + }, + }, + } + tsp.SetSamplingPolicy(cfgs) + + assert.Len(t, tsp.policies, 1) + + tsp.policyTicker.OnTick() + + assert.Len(t, tsp.policies, 2) + + // Duplicate policy name. + cfgs = []PolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "always", + Type: AlwaysSample, + }, + }, + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "everything", + Type: AlwaysSample, + }, + }, + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "everything", + Type: AlwaysSample, + }, + }, + } + tsp.SetSamplingPolicy(cfgs) + + assert.Len(t, tsp.policies, 2) + + tsp.policyTicker.OnTick() + + // Should revert sampling policy. + assert.Len(t, tsp.policies, 2) +} + func TestSubSecondDecisionTime(t *testing.T) { // prepare msp := new(consumertest.TracesSink)