Skip to content
This repository has been archived by the owner on Aug 17, 2020. It is now read-only.

Recorder concurrency support using channels. #199

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type (
recorder *SpanRecorder
recorderFilename string
flushFrequency time.Duration
concurrencyLevel int

optionalRecorders []tracer.SpanRecorder

Expand All @@ -64,8 +65,8 @@ type (
var (
version = "0.1.16-pre2"

testingModeFrequency = time.Second
nonTestingModeFrequency = time.Minute
testingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequencyInTestMode.Value) * time.Millisecond
nonTestingModeFrequency = time.Duration(env.ScopeTracerDispatcherHealthcheckFrequency.Value) * time.Millisecond
)

func WithApiKey(apiKey string) Option {
Expand Down Expand Up @@ -201,6 +202,7 @@ func NewAgent(options ...Option) (*Agent, error) {
agent.userAgent = fmt.Sprintf("scope-agent-go/%s", agent.version)
agent.panicAsFail = false
agent.failRetriesCount = 0
agent.concurrencyLevel = env.ScopeTracerDispatcherConcurrencyLevel.Value

for _, opt := range options {
opt(agent)
Expand Down
145 changes: 113 additions & 32 deletions agent/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type (
logger *log.Logger
stats *RecorderStats
statsOnce sync.Once

concurrencyLevel int
workerJobs chan *workerJob
workerResults chan *workerResult
}
RecorderStats struct {
totalSpans int64
Expand All @@ -66,6 +70,18 @@ type (

PayloadSpan map[string]interface{}
PayloadEvent map[string]interface{}

workerJob struct {
spans []PayloadSpan
totalSpans int
events []PayloadEvent
totalEvents int
}
workerResult struct {
workerId int
error error
shouldExit bool
}
)

func NewSpanRecorder(agent *Agent) *SpanRecorder {
Expand All @@ -79,9 +95,19 @@ func NewSpanRecorder(agent *Agent) *SpanRecorder {
r.metadata = agent.metadata
r.logger = agent.logger
r.flushFrequency = agent.flushFrequency
r.concurrencyLevel = agent.concurrencyLevel
r.url = agent.getUrl("api/agent/ingest")
r.client = &http.Client{}
r.stats = &RecorderStats{}
r.logger.Printf("recorder frequency: %v", agent.flushFrequency)
r.logger.Printf("recorder concurrency level: %v", agent.concurrencyLevel)

// start workers
r.workerJobs = make(chan *workerJob, r.concurrencyLevel)
r.workerResults = make(chan *workerResult, r.concurrencyLevel)
for i := 0; i < r.concurrencyLevel; i++ {
go r.worker(i + 1)
}
r.t.Go(r.loop)
return r
}
Expand Down Expand Up @@ -144,51 +170,104 @@ func (r *SpanRecorder) sendSpans() (error, bool) {
atomic.AddInt64(&r.stats.sendSpansCalls, 1)
const batchSize = 1000
var lastError error
var jobs int
for {
spans, spMore, spTotal := r.popPayloadSpan(batchSize)
events, evMore, evTotal := r.popPayloadEvents(batchSize)

payload := map[string]interface{}{
"metadata": r.metadata,
"spans": spans,
"events": events,
tags.AgentID: r.agentId,
r.workerJobs <- &workerJob{
spans: spans,
totalSpans: spTotal,
events: events,
totalEvents: evTotal,
}
buf, err := encodePayload(payload)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans)))
return err, false
}

var testSpans int64
for _, span := range spans {
if isTestSpan(span) {
testSpans++
jobs++

if len(r.workerResults) > 0 {
// Check whether a previous worker result indicates that we need to cancel all pending jobs
result := <-r.workerResults
lastError = result.error
jobs--
if result.shouldExit {
r.logger.Printf("worker %d: received a should exit response", result.workerId)
for i := 0; i < jobs; i++ {
<-r.workerResults
}
return result.error, result.shouldExit
}
}

r.logger.Printf("sending %d/%d spans with %d/%d events", len(spans), spTotal, len(events), evTotal)
statusCode, err := r.callIngest(buf)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
atomic.AddInt64(&r.stats.spansNotSent, int64(len(spans)))
atomic.AddInt64(&r.stats.testSpansNotSent, testSpans)
} else {
atomic.AddInt64(&r.stats.sendSpansOk, 1)
atomic.AddInt64(&r.stats.spansSent, int64(len(spans)))
atomic.AddInt64(&r.stats.testSpansSent, testSpans)
if !spMore && !evMore {
break
}
if statusCode == 401 {
return err, true
}
shouldExit := false
for i := 0; i < jobs; i++ {
result := <-r.workerResults
lastError = result.error
if result.shouldExit {
r.logger.Printf("worker %d: received a should exit response", result.workerId)
shouldExit = true
}
lastError = err
}
return lastError, shouldExit
}

if !spMore && !evMore {
break
func (r *SpanRecorder) worker(id int) {
for {
select {
case j, ok := <-r.workerJobs:
if !ok {
if r.debugMode {
r.logger.Printf("exiting from worker: %d", id)
}
return
}

payload := map[string]interface{}{
"metadata": r.metadata,
"spans": j.spans,
"events": j.events,
tags.AgentID: r.agentId,
}

buf, err := encodePayload(payload)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
atomic.AddInt64(&r.stats.spansNotSent, int64(len(j.spans)))
r.workerResults <- &workerResult{
workerId: id,
error: err,
shouldExit: false,
}
continue
}

var testSpans int64
for _, span := range j.spans {
if isTestSpan(span) {
testSpans++
}
}

r.logger.Printf("worker %d: sending %d/%d spans with %d/%d events", id, len(j.spans), j.totalSpans, len(j.events), j.totalEvents)
statusCode, err := r.callIngest(buf)
if err != nil {
atomic.AddInt64(&r.stats.sendSpansKo, 1)
atomic.AddInt64(&r.stats.spansNotSent, int64(len(j.spans)))
atomic.AddInt64(&r.stats.testSpansNotSent, testSpans)
} else {
atomic.AddInt64(&r.stats.sendSpansOk, 1)
atomic.AddInt64(&r.stats.spansSent, int64(len(j.spans)))
atomic.AddInt64(&r.stats.testSpansSent, testSpans)
}
r.workerResults <- &workerResult{
workerId: id,
error: err,
shouldExit: statusCode == 401,
}
}
}
return lastError, false
}

// Stop recorder
Expand All @@ -198,6 +277,8 @@ func (r *SpanRecorder) Stop() {
}
r.t.Kill(nil)
_ = r.t.Wait()
close(r.workerJobs)
close(r.workerResults)
if r.debugMode {
r.writeStats()
}
Expand Down
43 changes: 23 additions & 20 deletions env/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,27 @@ package env
import "go.undefinedlabs.com/scopeagent/tags"

var (
ScopeDsn = newStringEnvVar("", "SCOPE_DSN")
ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY")
ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT")
ScopeService = newStringEnvVar("default", "SCOPE_SERVICE")
ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY")
ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA")
ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH")
ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT")
ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH")
ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG")
ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER")
ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE")
ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_TESTING_FAIL_RETRIES")
ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_TESTING_PANIC_AS_FAIL")
ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION")
ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA")
ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS")
ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE")
ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES")
ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE")
ScopeDsn = newStringEnvVar("", "SCOPE_DSN")
ScopeApiKey = newStringEnvVar("", "SCOPE_APIKEY")
ScopeApiEndpoint = newStringEnvVar("https://app.scope.dev", "SCOPE_API_ENDPOINT")
ScopeService = newStringEnvVar("default", "SCOPE_SERVICE")
ScopeRepository = newStringEnvVar("", "SCOPE_REPOSITORY")
ScopeCommitSha = newStringEnvVar("", "SCOPE_COMMIT_SHA")
ScopeBranch = newStringEnvVar("", "SCOPE_BRANCH")
ScopeSourceRoot = newStringEnvVar("", "SCOPE_SOURCE_ROOT")
ScopeLoggerRoot = newStringEnvVar("", "SCOPE_LOGGER_ROOT", "SCOPE_LOG_ROOT_PATH")
ScopeDebug = newBooleanEnvVar(false, "SCOPE_DEBUG")
ScopeTracerGlobal = newBooleanEnvVar(false, "SCOPE_TRACER_GLOBAL", "SCOPE_SET_GLOBAL_TRACER")
ScopeTestingMode = newBooleanEnvVar(false, "SCOPE_TESTING_MODE")
ScopeTestingFailRetries = newIntEnvVar(0, "SCOPE_TESTING_FAIL_RETRIES")
ScopeTestingPanicAsFail = newBooleanEnvVar(false, "SCOPE_TESTING_PANIC_AS_FAIL")
ScopeConfiguration = newSliceEnvVar([]string{tags.PlatformName, tags.PlatformArchitecture, tags.GoVersion}, "SCOPE_CONFIGURATION")
ScopeMetadata = newMapEnvVar(nil, "SCOPE_METADATA")
ScopeInstrumentationHttpPayloads = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_PAYLOADS")
ScopeInstrumentationHttpStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_HTTP_STACKTRACE")
ScopeInstrumentationDbStatementValues = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STATEMENT_VALUES")
ScopeInstrumentationDbStacktrace = newBooleanEnvVar(false, "SCOPE_INSTRUMENTATION_DB_STACKTRACE")
ScopeTracerDispatcherHealthcheckFrequency = newIntEnvVar(60000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY")
ScopeTracerDispatcherHealthcheckFrequencyInTestMode = newIntEnvVar(1000, "SCOPE_TRACER_DISPATCHER_HEALTHCHECK_FRECUENCY_IN_TESTMODE")
ScopeTracerDispatcherConcurrencyLevel = newIntEnvVar(5, "SCOPE_TRACER_DISPATCHER_CONCURRENCY_LEVEL")
)