Skip to content

Commit

Permalink
parameterizing the fetcher client limit on the number of items to fet…
Browse files Browse the repository at this point in the history
…ch at once, setting default value to 200
  • Loading branch information
Eduard-Voiculescu committed Jan 23, 2025
1 parent 8f7f7c2 commit 36ce982
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
5 changes: 4 additions & 1 deletion cmd/firestellar/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func NewFetchCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd.Flags().Duration("latest-block-retry-interval", time.Second, "interval between fetch")
cmd.Flags().Duration("max-block-fetch-duration", 3*time.Second, "maximum delay before considering a block fetch as failed")
cmd.Flags().Int("block-fetch-batch-size", 1, "Number of blocks to fetch in a single batch")
cmd.Flags().Int("client-limit", 200, "Limit for clients to fetch transactions")

return cmd
}
Expand Down Expand Up @@ -63,8 +64,10 @@ func fetchRunE(logger *zap.Logger, _ logging.Tracer) firecore.CommandExecutor {
rpcClients.Add(client)
}

clientLimit := sflags.MustGetInt(cmd, "client-limit")

poller := blockpoller.New(
rpc.NewFetcher(fetchInterval, latestBlockRetryInterval, logger),
rpc.NewFetcher(fetchInterval, latestBlockRetryInterval, clientLimit, logger),
blockpoller.NewFireBlockHandler("type.googleapis.com/sf.stellar.type.v1.Block"),
rpcClients,
blockpoller.WithStoringState[*rpc.Client](stateDir),
Expand Down
8 changes: 5 additions & 3 deletions rpc/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ type Fetcher struct {
latestBlockRetryInterval time.Duration
lastBlockInfo *LastBlockInfo
decoder *decoder.Decoder
clientLimit int

logger *zap.Logger
}

func NewFetcher(fetchInterval, latestBlockRetryInterval time.Duration, logger *zap.Logger) *Fetcher {
func NewFetcher(fetchInterval, latestBlockRetryInterval time.Duration, clientLimit int, logger *zap.Logger) *Fetcher {
return &Fetcher{
fetchInterval: fetchInterval,
latestBlockRetryInterval: latestBlockRetryInterval,
lastBlockInfo: NewLastBlockInfo(),
decoder: decoder.NewDecoder(logger),
clientLimit: clientLimit,
logger: logger,
}
}
Expand Down Expand Up @@ -92,10 +94,10 @@ func (f *Fetcher) Fetch(ctx context.Context, client *Client, requestBlockNum uin

numOfTransactions := len(ledgerMetadata.V1.TxProcessing)
f.logger.Debug("fetching transactions", zap.Uint64("block_num", requestBlockNum), zap.Int("num_of_transactions", numOfTransactions))
if numOfTransactions > 200 {
if numOfTransactions > f.clientLimit {
// There is a hard limit on the number of transactions
// to fetch. The RPC providers tipically set the maximum limit to 200.
numOfTransactions = 200
numOfTransactions = f.clientLimit
}
transactions, err := client.GetTransactions(requestBlockNum, numOfTransactions, f.lastBlockInfo.cursor)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rpc/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func Test_Fetch(t *testing.T) {
c := NewClient("https://mainnet.sorobanrpc.com", nil)
ledger, err := c.GetLatestLedger()
f := NewFetcher(time.Second, time.Second, zap.NewNop())
f := NewFetcher(time.Second, time.Second, 200, zap.NewNop())
b, _, err := f.Fetch(context.Background(), c, uint64(ledger.Sequence))
require.NoError(t, err)
require.NotNil(t, b)
Expand Down

0 comments on commit 36ce982

Please sign in to comment.