Skip to content

Commit

Permalink
live backfiller now takes a context to prevent goroutine leak
Browse files Browse the repository at this point in the history
  • Loading branch information
colindickson committed Jan 13, 2025
1 parent fcf6183 commit 36c2750
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
11 changes: 9 additions & 2 deletions service/live_back_filler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ type LiveBackFiller struct {
logger *zap.Logger
stageToProcess int
clientFactory client.InternalClientFactory

ctx context.Context
}

func NewLiveBackFiller(nextHandler bstream.Handler, logger *zap.Logger, stageToProcess int, segmentSize uint64, linearHandoff uint64, clientFactory client.InternalClientFactory, requestBackProcessing RequestBackProcessingFunc) *LiveBackFiller {
func NewLiveBackFiller(ctx context.Context, nextHandler bstream.Handler, logger *zap.Logger, stageToProcess int, segmentSize uint64, linearHandoff uint64, clientFactory client.InternalClientFactory, requestBackProcessing RequestBackProcessingFunc) *LiveBackFiller {
return &LiveBackFiller{
RequestBackProcessing: requestBackProcessing,
stageToProcess: stageToProcess,
Expand All @@ -43,6 +45,8 @@ func NewLiveBackFiller(nextHandler bstream.Handler, logger *zap.Logger, stageToP
segmentSize: segmentSize,
logger: logger,
clientFactory: clientFactory,

ctx: ctx,
}
}

Expand All @@ -52,7 +56,10 @@ func (l *LiveBackFiller) ProcessBlock(blk *pbbstream.Block, obj interface{}) (er
return l.NextHandler.ProcessBlock(blk, obj)
}

l.irreversibleBlock <- blk.Number
select {
case <-l.ctx.Done():
case l.irreversibleBlock <- blk.Number:
}

return l.NextHandler.ProcessBlock(blk, obj)
}
Expand Down
2 changes: 1 addition & 1 deletion service/live_back_filler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestBackFiller(t *testing.T) {
jobResult <- err
}

testLiveBackFiller := NewLiveBackFiller(testHandler, testLogger, c.stageToProcess, c.segmentSize, c.linearHandoff, nil, RequestBackProcessingTest)
testLiveBackFiller := NewLiveBackFiller(context.Background(), testHandler, testLogger, c.stageToProcess, c.segmentSize, c.linearHandoff, nil, RequestBackProcessingTest)

go testLiveBackFiller.Start(testContext)

Expand Down
4 changes: 2 additions & 2 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,12 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ

var streamHandler bstream.Handler
if requestDetails.ProductionMode {
liveBackFiller := NewLiveBackFiller(pipe, logger, execGraph.OutputModuleStageIndex(), segmentSize, requestDetails.LinearHandoffBlockNum, s.runtimeConfig.ClientFactory, RequestBackProcessing)
liveBackFiller := NewLiveBackFiller(ctx, pipe, logger, execGraph.OutputModuleStageIndex(), segmentSize, requestDetails.LinearHandoffBlockNum, s.runtimeConfig.ClientFactory, RequestBackProcessing)

// In noop mode, the pipe handler is overwritten by a NoopHandler which produces no outputs.
if request.NoopMode {
noopHandler := NewNoopHandler(respFunc)
liveBackFiller = NewLiveBackFiller(noopHandler, logger, execGraph.OutputModuleStageIndex(), segmentSize, requestDetails.LinearHandoffBlockNum, s.runtimeConfig.ClientFactory, RequestBackProcessing)
liveBackFiller = NewLiveBackFiller(ctx, noopHandler, logger, execGraph.OutputModuleStageIndex(), segmentSize, requestDetails.LinearHandoffBlockNum, s.runtimeConfig.ClientFactory, RequestBackProcessing)
}

go liveBackFiller.Start(ctx)
Expand Down

0 comments on commit 36c2750

Please sign in to comment.