Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/global worker pool #83

Merged
merged 17 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:

- uses: actions/setup-go@v5
with:
go-version: '1.22.x'
go-version: '1.23.x'

- name: Log in to the Container registry
uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
.envrc
.env
.DS_Store
firehose-data*
firehose-data*
/.fleet/settings.json
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22-alpine as build
FROM golang:1.23.4-alpine as build
WORKDIR /app

COPY go.mod go.sum ./
Expand Down
34 changes: 34 additions & 0 deletions cmd/apps/substreams_tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ import (
"github.com/spf13/viper"
"github.com/streamingfast/cli"
"github.com/streamingfast/dauth"
"github.com/streamingfast/dgrpc"
discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
"github.com/streamingfast/logging"
app "github.com/streamingfast/substreams/app"
"github.com/streamingfast/substreams/client"
"github.com/streamingfast/substreams/orchestrator/work"
"github.com/streamingfast/substreams/wasm"
pbworker "github.com/streamingfast/worker-pool-protocol/pb/sf/worker/v1"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -68,6 +72,8 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
This is useful to prevent the tier1 from being overwhelmed by too many requests, most client auto-reconnects on 'Unavailable' code
so they should end up on another tier1 instance, assuming you have proper auto-scaling of the number of instances available.
`))
cmd.Flags().String("substreams-tier1-global-worker-pool-address", "", "Address of the global worker pool to use for the substreams tier1. (disabled if empty)")
cmd.Flags().Duration("substreams-tier1-global-worker-pool-keep-alive-delay", 25*time.Second, "Delay between two keep alive call to the global worker pool. Default is 25s")
// all substreams
registerCommonSubstreamsFlags(cmd)
return nil
Expand Down Expand Up @@ -127,6 +133,8 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
if err != nil {
return nil, fmt.Errorf("getting temporary directory: %w", err)
}
substreamsGlobalWorkerPoolAddress := viper.GetString("substreams-tier1-global-worker-pool-address")
substreamsGlobalWorkerPoolKeepAliveDelay := viper.GetDuration("substreams-tier1-global-worker-pool-keep-alive-delay")

config := app.NewDefaultTier1Config()
config.MeteringConfig = GetCommonMeteringPluginValue()
Expand All @@ -153,13 +161,39 @@ func RegisterSubstreamsTier1App[B firecore.Block](chain *firecore.Chain[B], root
config.GRPCShutdownGracePeriod = time.Second
config.ServiceDiscoveryURL = serviceDiscoveryURL

subRequestsClientConfig := client.NewSubstreamsClientConfig(
config.SubrequestsEndpoint,
"",
client.None,
config.SubrequestsInsecure,
config.SubrequestsPlaintext,
"substreams_tier1",
)

clientFactory := client.NewInternalClientFactory(subRequestsClientConfig)
workerPoolFactory := work.NewSimpleWorkerPoolFactory(clientFactory).WorkerPool

if substreamsGlobalWorkerPoolAddress != "" {
grpcClientConnection, err := dgrpc.NewInternalNoWaitClientConn(substreamsGlobalWorkerPoolAddress)
if err != nil {
return nil, fmt.Errorf("unable to create grpc client connection to global worker pool: %w", err)
}
workerPoolClient := pbworker.NewWorkerPoolClient(grpcClientConnection)
workerPoolFactory = work.NewGlobalWorkerPoolFactory(
workerPoolClient,
clientFactory,
substreamsGlobalWorkerPoolKeepAliveDelay,
).WorkerPool
}

return app.NewTier1(appLogger,
config, &app.Tier1Modules{
Authenticator: authenticator,
HeadTimeDriftMetric: ss1HeadTimeDriftmetric,
HeadBlockNumberMetric: ss1HeadBlockNumMetric,
CheckPendingShutDown: runtime.IsPendingShutdown,
InfoServer: runtime.InfoServer,
WorkerPoolFactory: workerPoolFactory,
}), nil
},
})
Expand Down
16 changes: 15 additions & 1 deletion cmd/apps/substreams_tier2.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/dgrpc"
discoveryservice "github.com/streamingfast/dgrpc/server/discovery-service"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
"github.com/streamingfast/logging"
"github.com/streamingfast/substreams/app"
"github.com/streamingfast/substreams/wasm"
pbworker "github.com/streamingfast/worker-pool-protocol/pb/sf/worker/v1"
"go.uber.org/zap"
)

Expand All @@ -44,7 +46,7 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root
cmd.Flags().String("substreams-tier2-grpc-listen-addr", firecore.SubstreamsTier2GRPCServingAddr, "Address on which the substreams tier2 will listen. Default is plain-text, appending a '*' to the end to jkkkj")
cmd.Flags().String("substreams-tier2-discovery-service-url", "", "URL to advertise presence to the grpc discovery service") //traffic-director://xds?vpc_network=vpc-global&use_xds_reds=true
cmd.Flags().Uint64("substreams-tier2-max-concurrent-requests", 0, "Maximum number of concurrent requests allowed on the server. When the tier2 service hits this limit, it will set itself as 'Not Ready' until requests are processed. Default 0 (no limit)")

cmd.Flags().String("substreams-tier2-global-worker-pool-address", "", "Address of the global worker pool to use for the substreams tier1. (disabled if empty)")
// all substreams
registerCommonSubstreamsFlags(cmd)
return nil
Expand All @@ -56,6 +58,7 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root

maximumConcurrentRequests := viper.GetUint64("substreams-tier2-max-concurrent-requests")
executionTimeout := viper.GetDuration("substreams-block-execution-timeout")
substreamsGlobalWorkerPoolAddress := viper.GetString("substreams-tier2-global-worker-pool-address")

tracing := os.Getenv("SUBSTREAMS_TRACING") == "modules_exec"

Expand Down Expand Up @@ -87,6 +90,16 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root
return nil, fmt.Errorf("getting temporary directory: %w", err)
}

var remoteWorkerPoolClient pbworker.WorkerPoolClient
if substreamsGlobalWorkerPoolAddress != "" {
grpcClientConnection, err := dgrpc.NewInternalClientConn(substreamsGlobalWorkerPoolAddress)
if err != nil {
return nil, fmt.Errorf("unable to create grpc client connection to global worker pool: %w", err)
}
remoteWorkerPoolClient = pbworker.NewWorkerPoolClient(grpcClientConnection)

}

return app.NewTier2(appLogger,
&app.Tier2Config{
Tracing: tracing,
Expand All @@ -100,6 +113,7 @@ func RegisterSubstreamsTier2App[B firecore.Block](chain *firecore.Chain[B], root
MaximumConcurrentRequests: maximumConcurrentRequests,
}, &app.Tier2Modules{
CheckPendingShutDown: runtime.IsPendingShutdown,
RemoteWorkerClient: remoteWorkerPoolClient,
}), nil
},
})
Expand Down
9 changes: 8 additions & 1 deletion devel/substreams/substreams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@ start:
- substreams-tier2
flags:
common-live-blocks-addr:
common-auth-plugin: trust://
common-merged-blocks-store-url: "$COMMON_MERGED_BLOCKS_STORE_URL"
substreams-tier1-block-type: "sf.ethereum.type.v2.Block"
# Also set FIRECORE_COMMON_FIRST_STREAMABLE_BLOCK to your local value, if not starting at 0
ignore-advertise-validation: true
substreams-tier1-grpc-listen-addr: :9000
substreams-tier1-grpc-listen-addr: :9000*
substreams-tier1-subrequests-insecure: false
substreams-tier1-subrequests-plaintext: true
substreams-tier1-subrequests-endpoint: :9001

substreams-tier1-global-worker-pool-address: :9002
substreams-tier1-global-worker-pool-keep-alive-delay: 25s

substreams-tier2-grpc-listen-addr: :9001
substreams-tier2-global-worker-pool-address: :9002
18 changes: 10 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/streamingfast/firehose-core

go 1.22.7
go 1.23.4

require (
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.16.1-20240117202343-bf8f65e8876c.1
Expand Down Expand Up @@ -31,7 +31,8 @@ require (
github.com/streamingfast/payment-gateway v0.0.0-20240426151444-581e930c76e2
github.com/streamingfast/pbgo v0.0.6-0.20250114182320-0b43084f4000
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0
github.com/streamingfast/substreams v1.12.5-0.20250206200955-a523a786b3ef
github.com/streamingfast/substreams v1.12.5-0.20250218151206-f976e8686cfe
github.com/streamingfast/worker-pool-protocol v0.0.0-20250211140743-fb8ffbc05fbc
github.com/stretchr/testify v1.10.0
github.com/test-go/testify v1.1.4
go.uber.org/multierr v1.10.0
Expand All @@ -51,7 +52,7 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.49.0 // indirect
github.com/alecthomas/participle v0.7.1 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/bobg/go-generics/v3 v3.4.0 // indirect
github.com/bobg/go-generics/v3 v3.5.0 // indirect
github.com/bufbuild/protocompile v0.4.0 // indirect
github.com/charmbracelet/lipgloss v1.0.0 // indirect
github.com/charmbracelet/x/ansi v0.4.2 // indirect
Expand All @@ -69,6 +70,7 @@ require (
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
go.uber.org/mock v0.5.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250124145028-65684f501c47 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250124145028-65684f501c47 // indirect
)
Expand All @@ -87,14 +89,14 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.25.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.26.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.50.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator v0.0.0-20221018185641-36f91511cfd7 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator v0.39.0 // indirect
github.com/KimMachineGun/automemlimit v0.2.4
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/RoaringBitmap/roaring v1.9.1 // indirect
github.com/ShinyTrinkets/meta-logger v0.2.0 // indirect
github.com/abourget/llerrgroup v0.2.0
github.com/aws/aws-sdk-go v1.44.325 // indirect
github.com/aws/aws-sdk-go v1.49.6 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d // indirect
Expand Down Expand Up @@ -186,16 +188,16 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/automaxprocs v1.5.1
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/net v0.34.0
golang.org/x/oauth2 v0.25.0
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.9.0 // indirect
google.golang.org/api v0.218.0 // indirect
google.golang.org/genproto v0.0.0-20250122153221-138b5a5a4fd4 // indirect
google.golang.org/api v0.219.0 // indirect
google.golang.org/genproto v0.0.0-20250106144421-5f5ef82da422 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/olivere/elastic.v3 v3.0.75
Expand Down
Loading
Loading