Skip to content

Commit

Permalink
Merge branch 'jaegertracing:main' into fix-sampling-config
Browse files Browse the repository at this point in the history
  • Loading branch information
adityachopra29 authored Jan 7, 2025
2 parents bde692c + 08503ca commit d1a070a
Show file tree
Hide file tree
Showing 58 changed files with 1,311 additions and 415 deletions.
69 changes: 38 additions & 31 deletions cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
yaml "gopkg.in/yaml.v3"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/discovery"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
Expand Down Expand Up @@ -201,7 +200,7 @@ func TestProxyClientTLS(t *testing.T) {
tests := []struct {
name string
clientTLS *configtls.ClientConfig
serverTLS tlscfg.Options
serverTLS configtls.ServerConfig
expectError bool
}{
{
Expand All @@ -215,10 +214,11 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should fail with TLS client to untrusted TLS server",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
serverTLS: configtls.ServerConfig{
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
ServerName: "example.com",
Expand All @@ -227,10 +227,11 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should fail with TLS client to trusted TLS server with incorrect hostname",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
serverTLS: configtls.ServerConfig{
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -241,10 +242,11 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should pass with TLS client to trusted TLS server with correct hostname",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
serverTLS: configtls.ServerConfig{
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -256,11 +258,12 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should fail with TLS client without cert to trusted TLS server requiring cert",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
serverTLS: configtls.ServerConfig{
ClientCAFile: testCertKeyLocation + "/example-CA-cert.pem",
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -272,11 +275,12 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should fail with TLS client without cert to trusted TLS server requiring cert from a different CA",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/wrong-CA-cert.pem", // NB: wrong CA
serverTLS: configtls.ServerConfig{
ClientCAFile: testCertKeyLocation + "/wrong-CA-cert.pem", // NB: wrong CA
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -290,11 +294,12 @@ func TestProxyClientTLS(t *testing.T) {
},
{
name: "should pass with TLS client with cert to trusted TLS server requiring cert",
serverTLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
serverTLS: configtls.ServerConfig{
ClientCAFile: testCertKeyLocation + "/example-CA-cert.pem",
Config: configtls.Config{
CertFile: testCertKeyLocation + "/example-server-cert.pem",
KeyFile: testCertKeyLocation + "/example-server-key.pem",
},
},
clientTLS: &configtls.ClientConfig{
Config: configtls.Config{
Expand All @@ -314,11 +319,13 @@ func TestProxyClientTLS(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var opts []grpc.ServerOption
if test.serverTLS.Enabled {
tlsCfg, err := test.serverTLS.ToOtelServerConfig().LoadTLSConfig(ctx)

if test.serverTLS.CertFile != "" && test.serverTLS.KeyFile != "" {
tlsCfg, err := test.serverTLS.LoadTLSConfig(ctx)
require.NoError(t, err)
opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
}

spanHandler := &mockSpanHandler{}
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, spanHandler)
Expand Down
7 changes: 3 additions & 4 deletions cmd/agent/app/reporter/grpc/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ func (b *ConnBuilder) InitFromViper(v *viper.Viper) (*ConnBuilder, error) {
b.CollectorHostPorts = strings.Split(hostPorts, ",")
}
b.MaxRetry = v.GetUint(retryFlag)
tls, err := tlsFlagsConfig.InitFromViper(v)
tlsCfg, err := tlsFlagsConfig.InitFromViper(v)
if err != nil {
return b, fmt.Errorf("failed to process TLS options: %w", err)
}
if tls.Enabled {
tlsConf := tls.ToOtelClientConfig()
b.TLS = &tlsConf
if !tlsCfg.Insecure {
b.TLS = &tlsCfg
}
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
return b, nil
Expand Down
6 changes: 5 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
})
}

c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)
spanProcessor, err := handlerBuilder.BuildSpanProcessor(additionalProcessors...)
if err != nil {
return fmt.Errorf("could not create span processor: %w", err)
}
c.spanProcessor = spanProcessor
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)
grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
Handler: c.spanHandlers.GRPCHandler,
Expand Down
7 changes: 6 additions & 1 deletion cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,12 @@ func TestAggregator(t *testing.T) {
},
},
}
_, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
_, err := c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: spans,
Details: processor.Details{
SpanFormat: processor.JaegerSpanFormat,
},
})
require.NoError(t, err)
require.NoError(t, c.Close())

Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ func addGRPCFlags(flagSet *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort
}

func initHTTPFromViper(v *viper.Viper, opts *confighttp.ServerConfig, cfg serverFlagsConfig) error {
tlsOpts, err := cfg.tls.InitFromViper(v)
tlsHTTPCfg, err := cfg.tls.InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to parse HTTP TLS options: %w", err)
}
opts.TLSSetting = tlsOpts.ToOtelServerConfig()
opts.TLSSetting = tlsHTTPCfg
opts.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort))
opts.IdleTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPIdleTimeout)
opts.ReadTimeout = v.GetDuration(cfg.prefix + "." + flagSuffixHTTPReadTimeout)
Expand All @@ -208,11 +208,11 @@ func initHTTPFromViper(v *viper.Viper, opts *confighttp.ServerConfig, cfg server
}

func initGRPCFromViper(v *viper.Viper, opts *configgrpc.ServerConfig, cfg serverFlagsConfig) error {
tlsOpts, err := cfg.tls.InitFromViper(v)
tlsGRPCCfg, err := cfg.tls.InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to parse GRPC TLS options: %w", err)
}
opts.TLSSetting = tlsOpts.ToOtelServerConfig()
opts.TLSSetting = tlsGRPCCfg
opts.NetAddr.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort))
opts.MaxRecvMsgSizeMiB = v.GetInt(cfg.prefix+"."+flagSuffixGRPCMaxReceiveMessageLength) / (1024 * 1024)
opts.Keepalive = &configgrpc.KeepaliveServerConfig{
Expand Down
15 changes: 9 additions & 6 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.SpansOptions
spanOptions processor.Details // common settings for all spans
tenancyMgr *tenancy.Manager
}

func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.Manager) batchConsumer {
return batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
spanOptions: processor.Details{
InboundTransport: transport,
SpanFormat: spanFormat,
},
Expand All @@ -75,10 +75,13 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
span.Process = batch.Process
}
}
_, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
_, err = c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: batch.Spans,
Details: processor.Details{
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
},
})
if err != nil {
if errors.Is(err, processor.ErrBusy) {
Expand Down
17 changes: 11 additions & 6 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -33,17 +34,21 @@ type mockSpanProcessor struct {
spanFormat processor.SpanFormat
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = append(p.spans, spans...)
oks := make([]bool, len(spans))
batch.GetSpans(func(spans []*model.Span) {
p.spans = append(p.spans, spans...)
}, func(_ ptrace.Traces) {
panic("not implemented")
})
oks := make([]bool, len(p.spans))
if p.tenants == nil {
p.tenants = make(map[string]bool)
}
p.tenants[opts.Tenant] = true
p.transport = opts.InboundTransport
p.spanFormat = opts.SpanFormat
p.tenants[batch.GetTenant()] = true
p.transport = batch.GetInboundTransport()
p.spanFormat = batch.GetSpanFormat()
return oks, p.expectedError
}

Expand Down
18 changes: 12 additions & 6 deletions cmd/collector/app/handler/thrift_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options
mSpan := jConv.ToDomainSpan(span, batch.Process)
mSpans = append(mSpans, mSpan)
}
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
},
})
if err != nil {
jbh.logger.Error("Collector failed to process span batch", zap.Error(err))
Expand Down Expand Up @@ -105,9 +108,12 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options
convCount[i] = len(converted)
mSpans = append(mSpans, converted...)
}
bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
},
})
if err != nil {
h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err))
Expand Down
14 changes: 11 additions & 3 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
Expand Down Expand Up @@ -58,12 +59,19 @@ type shouldIErrorProcessor struct {

var errTestError = errors.New("Whoops")

func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.SpansOptions) ([]bool, error) {
func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
if s.shouldError {
return nil, errTestError
}
retMe := make([]bool, len(mSpans))
for i := range mSpans {
var spans []*model.Span
batch.GetSpans(func(sp []*model.Span) {
spans = sp
}, func(_ ptrace.Traces) {
panic("not implemented")
})

retMe := make([]bool, len(spans))
for i := range spans {
retMe[i] = true
}
return retMe, nil
Expand Down
3 changes: 2 additions & 1 deletion cmd/collector/app/model_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
package app

import (
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
)

// ProcessSpan processes a Domain Model Span
type ProcessSpan func(span *model.Span, tenant string)

// ProcessSpans processes a batch of Domain Model Spans
type ProcessSpans func(spans []*model.Span, tenant string)
type ProcessSpans func(spans processor.Batch)

// FilterSpan decides whether to allow or disallow a span
type FilterSpan func(span *model.Span) bool
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (options) apply(opts ...Option) options {
ret.hostMetrics = metrics.NullFactory
}
if ret.preProcessSpans == nil {
ret.preProcessSpans = func(_ []*model.Span, _ /* tenant */ string) {}
ret.preProcessSpans = func(_ processor.Batch) {}
}
if ret.sanitizer == nil {
ret.sanitizer = func(span *model.Span) *model.Span { return span }
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestAllOptionSet(t *testing.T) {
Options.ServiceMetrics(metrics.NullFactory),
Options.Logger(zap.NewNop()),
Options.NumWorkers(5),
Options.PreProcessSpans(func(_ []*model.Span, _ /* tenant */ string) {}),
Options.PreProcessSpans(func(_ processor.Batch) {}),
Options.Sanitizer(func(span *model.Span) *model.Span { return span }),
Options.QueueSize(10),
Options.DynQueueSizeWarmup(1000),
Expand All @@ -53,7 +53,7 @@ func TestNoOptionsSet(t *testing.T) {
assert.Nil(t, opts.collectorTags)
assert.False(t, opts.reportBusy)
assert.False(t, opts.blockingSubmit)
assert.NotPanics(t, func() { opts.preProcessSpans(nil, "") })
assert.NotPanics(t, func() { opts.preProcessSpans(processor.SpansV1{}) })
assert.NotPanics(t, func() { opts.preSave(nil, "") })
assert.True(t, opts.spanFilter(nil))
span := model.Span{}
Expand Down
Loading

0 comments on commit d1a070a

Please sign in to comment.