Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

getLedgerEntries: optionally use high-performance Core server #353

Merged
merged 30 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
681c353
Replace getLedgerEntries DB queries with Core fetches
Shaptic Jan 28, 2025
7b0efca
Merge branch 'main' into migrate-getledgerentries
2opremio Feb 3, 2025
41dd850
Bump stellar/go dependency
2opremio Feb 5, 2025
1cc589d
Fix typo
2opremio Feb 5, 2025
a471392
Fix docker compose file
2opremio Feb 5, 2025
65a43d4
Restore the db-based implementation and choose dynamically
2opremio Feb 5, 2025
d34282f
Fix bug in daemon
2opremio Feb 5, 2025
33018b6
Add infrastructure for testing the new Core http query server
2opremio Feb 5, 2025
a2f574a
Add initial getLedgerEntries test using Core
2opremio Feb 5, 2025
9891b39
Workaround core bug by enabling the http port
2opremio Feb 6, 2025
79bbbf5
Fix nil dereference bug
2opremio Feb 6, 2025
32077d6
Migrate the rest of the tests to using core
2opremio Feb 6, 2025
c55a95c
Refactor result formatting
2opremio Feb 6, 2025
583120e
Fix bug in obtention of entries from Core
2opremio Feb 6, 2025
8ae30ba
Sort entries in response according to request order
2opremio Feb 6, 2025
de200a1
Only test the query server from protocol 23 onwards
2opremio Feb 6, 2025
372638d
Appease the linter
2opremio Feb 6, 2025
2026291
Revert unwanted change
2opremio Feb 6, 2025
d28373f
Remove TODO
2opremio Feb 6, 2025
b9cf90d
Correct Debian package name
2opremio Feb 6, 2025
9be46b9
Fix test
2opremio Feb 6, 2025
c1a14cb
Another attempt at fixing the package
2opremio Feb 6, 2025
f008b22
Appease the linter
2opremio Feb 6, 2025
13d28da
We still can't upgrade to protocol 23
2opremio Feb 6, 2025
dda2a98
Enable debug printouts for integration tests
2opremio Feb 6, 2025
1a9e7cb
Make sure all ports are allocated at once to minimize clashes
2opremio Feb 6, 2025
82a419e
Merge branch 'main' into migrate-getledgerentries
2opremio Feb 6, 2025
1a53f82
Revert "Enable debug printouts for integration tests"
2opremio Feb 6, 2025
dc869a1
Revert "Make sure all ports are allocated at once to minimize clashes"
2opremio Feb 6, 2025
b91aee5
Fix bug
2opremio Feb 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions cmd/stellar-rpc/internal/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ type Config struct {

Strict bool

StellarCoreURL string
CaptiveCoreStoragePath string
StellarCoreBinaryPath string
CaptiveCoreConfigPath string
CaptiveCoreHTTPPort uint
CaptiveCoreHTTPQueryPort uint
StellarCoreURL string
CaptiveCoreStoragePath string
StellarCoreBinaryPath string
CaptiveCoreConfigPath string
CaptiveCoreHTTPPort uint
CaptiveCoreHTTPQueryPort uint
CaptiveCoreHTTPQueryThreadPoolSize uint
CaptiveCoreHTTPQuerySnapshotLedgers uint

Endpoint string
AdminEndpoint string
Expand Down
20 changes: 16 additions & 4 deletions cmd/stellar-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ const (
OneDayOfLedgers = 17280
SevenDayOfLedgers = OneDayOfLedgers * 7

defaultHTTPEndpoint = "localhost:8000"
defaultCaptiveCoreHTTPPort = 11626 // regular queries like /info
defaultCaptiveCoreQueryPort = 11628 // high-performance bulk queries like /getledgerentry
defaultHTTPEndpoint = "localhost:8000"
defaultCaptiveCoreHTTPPort = 11626 // regular queries like /info
)

// TODO: refactor and remove the linter exceptions
Expand Down Expand Up @@ -62,6 +61,7 @@ func (cfg *Config) options() Options {
Usage: "Admin endpoint to listen and serve on. WARNING: this should not be accessible from the Internet and does not use TLS. \"\" (default) disables the admin server",
ConfigKey: &cfg.AdminEndpoint,
},
// TODO: should be gave a similar stellar-core-query-url parameter or should we assume that all queries will happen to the local captive core?
2opremio marked this conversation as resolved.
Show resolved Hide resolved
{
Name: "stellar-core-url",
Usage: "URL used to query Stellar Core (local captive core by default)",
Expand Down Expand Up @@ -92,7 +92,19 @@ func (cfg *Config) options() Options {
Name: "stellar-captive-core-http-query-port",
Usage: "HTTP port for Captive Core to listen on for high-performance queries like /getledgerentry (0 disables the HTTP server, must not conflict with CAPTIVE_CORE_HTTP_PORT)",
ConfigKey: &cfg.CaptiveCoreHTTPQueryPort,
DefaultValue: uint(defaultCaptiveCoreQueryPort),
DefaultValue: uint(0), // Disabled by default, although it normally uses 11628
},
{
Name: "stellar-captive-core-http-query-thread-pool-size",
Usage: "Number of threads to use by Captive Core's high-performance query server",
ConfigKey: &cfg.CaptiveCoreHTTPQueryThreadPoolSize,
DefaultValue: uint(runtime.NumCPU()), //nolint:gosec
},
{
Name: "stellar-captive-core-http-query-snapshot-ledgers",
Usage: "Size of ledger history in Captive Core's high-performance query server (don't touch unless you know what you are doing)",
ConfigKey: &cfg.CaptiveCoreHTTPQuerySnapshotLedgers,
DefaultValue: uint(4),
},
{
Name: "log-level",
Expand Down
17 changes: 16 additions & 1 deletion cmd/stellar-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@

// newCaptiveCore creates a new captive core backend instance and returns it.
func newCaptiveCore(cfg *config.Config, logger *supportlog.Entry) (*ledgerbackend.CaptiveStellarCore, error) {
var queryServerParams *ledgerbackend.HTTPQueryServerParams
if cfg.CaptiveCoreHTTPPort != 0 {
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
// Only try to enable the server if the port passed is non-zero
queryServerParams = &ledgerbackend.HTTPQueryServerParams{
Port: uint16(cfg.CaptiveCoreHTTPQueryPort),

Check failure on line 129 in cmd/stellar-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / golangci-lint

G115: integer overflow conversion uint -> uint16 (gosec)
ThreadPoolSize: uint16(cfg.CaptiveCoreHTTPQueryThreadPoolSize),

Check failure on line 130 in cmd/stellar-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / golangci-lint

G115: integer overflow conversion uint -> uint16 (gosec)
SnapshotLedgers: uint16(cfg.CaptiveCoreHTTPQuerySnapshotLedgers),

Check failure on line 131 in cmd/stellar-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / golangci-lint

G115: integer overflow conversion uint -> uint16 (gosec)
}
}

captiveCoreTomlParams := ledgerbackend.CaptiveCoreTomlParams{
HTTPPort: &cfg.CaptiveCoreHTTPPort,
HistoryArchiveURLs: cfg.HistoryArchiveURLs,
Expand All @@ -131,6 +141,7 @@
EnforceSorobanDiagnosticEvents: true,
EnforceSorobanTransactionMetaExtV1: true,
CoreBinaryPath: cfg.StellarCoreBinaryPath,
HTTPQueryServerParams: queryServerParams,
}
captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromFile(cfg.CaptiveCoreConfigPath, captiveCoreTomlParams)
if err != nil {
Expand Down Expand Up @@ -239,8 +250,12 @@
}

func createHighperfStellarCoreClient(cfg *config.Config) interfaces.FastCoreClient {
// It doesn't make sense to create a client if the local server is not enabled
if cfg.CaptiveCoreHTTPQueryPort == 0 {
return nil
}
return &stellarcore.Client{
URL: fmt.Sprintf("%s:%d", cfg.StellarCoreURL, cfg.CaptiveCoreHTTPQueryPort),
URL: fmt.Sprintf("http://localhost:%d", cfg.CaptiveCoreHTTPQueryPort),
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
HTTP: &http.Client{Timeout: cfg.CoreRequestTimeout},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,4 @@ PUBLIC_KEY="GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS"

# should be "core" when running RPC in a container or "localhost:port" when running RPC in the host
ADDRESS="${CORE_HOST_PORT}"
QUALITY="MEDIUM"

# parameters to enable Captive Core's high-performance HTTP server
HTTP_QUERY_PORT=11628
QUERY_THREAD_POOL_SIZE=4
QUERY_SNAPSHOT_LEDGERS=0
QUALITY="MEDIUM"
39 changes: 20 additions & 19 deletions cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,32 +302,33 @@
coreBinaryPath string
captiveCoreConfigPath string
captiveCoreStoragePath string
captiveCoreQueryPort int

Check failure on line 305 in cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

field `captiveCoreQueryPort` is unused (unused)
archiveURL string
sqlitePath string
}

func (vars rpcConfig) toMap() map[string]string {
return map[string]string{
"ENDPOINT": vars.endPoint,
"ADMIN_ENDPOINT": vars.adminEndpoint,
"STELLAR_CORE_URL": vars.stellarCoreURL,
"CORE_REQUEST_TIMEOUT": "2s",
"STELLAR_CORE_BINARY_PATH": vars.coreBinaryPath,
"CAPTIVE_CORE_CONFIG_PATH": vars.captiveCoreConfigPath,
"CAPTIVE_CORE_STORAGE_PATH": vars.captiveCoreStoragePath,
"STELLAR_CAPTIVE_CORE_HTTP_PORT": "0",
"STELLAR_CAPTIVE_CORE_HTTP_QUERY_PORT": strconv.Itoa(vars.captiveCoreQueryPort),
"FRIENDBOT_URL": FriendbotURL,
"NETWORK_PASSPHRASE": StandaloneNetworkPassphrase,
"HISTORY_ARCHIVE_URLS": vars.archiveURL,
"LOG_LEVEL": "debug",
"DB_PATH": vars.sqlitePath,
"INGESTION_TIMEOUT": "10m",
"HISTORY_RETENTION_WINDOW": strconv.Itoa(config.OneDayOfLedgers),
"CHECKPOINT_FREQUENCY": strconv.Itoa(checkpointFrequency),
"MAX_HEALTHY_LEDGER_LATENCY": "10s",
"PREFLIGHT_ENABLE_DEBUG": "true",
"ENDPOINT": vars.endPoint,
"ADMIN_ENDPOINT": vars.adminEndpoint,
"STELLAR_CORE_URL": vars.stellarCoreURL,
"CORE_REQUEST_TIMEOUT": "2s",
"STELLAR_CORE_BINARY_PATH": vars.coreBinaryPath,
"CAPTIVE_CORE_CONFIG_PATH": vars.captiveCoreConfigPath,
"CAPTIVE_CORE_STORAGE_PATH": vars.captiveCoreStoragePath,
"STELLAR_CAPTIVE_CORE_HTTP_PORT": "0",
// TODO: allow setting it for getledgerentry tests
// "STELLAR_CAPTIVE_CORE_HTTP_QUERY_PORT": strconv.Itoa(vars.captiveCoreQueryPort),
"FRIENDBOT_URL": FriendbotURL,
"NETWORK_PASSPHRASE": StandaloneNetworkPassphrase,
"HISTORY_ARCHIVE_URLS": vars.archiveURL,
"LOG_LEVEL": "debug",
"DB_PATH": vars.sqlitePath,
"INGESTION_TIMEOUT": "10m",
"HISTORY_RETENTION_WINDOW": strconv.Itoa(config.OneDayOfLedgers),
"CHECKPOINT_FREQUENCY": strconv.Itoa(checkpointFrequency),
"MAX_HEALTHY_LEDGER_LATENCY": "10s",
"PREFLIGHT_ENABLE_DEBUG": "true",
}
}

Expand Down
16 changes: 11 additions & 5 deletions cmd/stellar-rpc/internal/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler {

retentionWindow := cfg.HistoryRetentionWindow

getLedgerEntriesHandler := methods.NewGetLedgerEntriesFromDBHandler(params.Logger, params.LedgerEntryReader)
if params.Daemon.FastCoreClient() != nil {
// Prioritize getting ledger entries from core if available
getLedgerEntriesHandler = methods.NewGetLedgerEntriesFromCoreHandler(
params.Logger,
params.Daemon.FastCoreClient(),
params.LedgerEntryReader)
}

handlers := []struct {
methodName string
underlyingHandler jrpc2.Handler
Expand Down Expand Up @@ -221,11 +230,8 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler {
requestDurationLimit: cfg.MaxGetLedgersExecutionDuration,
},
{
methodName: protocol.GetLedgerEntriesMethodName,
underlyingHandler: methods.NewGetLedgerEntriesHandler(
params.Logger,
params.Daemon.FastCoreClient(),
params.LedgerEntryReader),
methodName: protocol.GetLedgerEntriesMethodName,
underlyingHandler: getLedgerEntriesHandler,
longName: toSnakeCase(protocol.GetLedgerEntriesMethodName),
queueLimit: cfg.RequestBacklogGetLedgerEntriesQueueLimit,
requestDurationLimit: cfg.MaxGetLedgerEntriesExecutionDuration,
Expand Down
141 changes: 99 additions & 42 deletions cmd/stellar-rpc/internal/methods/get_ledger_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,104 @@

const getLedgerEntriesMaxKeys = 200

// NewGetLedgerEntriesHandler returns a JSON RPC handler to retrieve the specified ledger entries from Stellar Core.
func NewGetLedgerEntriesHandler(
type ledgerEntryGetter interface {
GetLedgerEntries(ctx context.Context, keys []xdr.LedgerKey) ([]db.LedgerKeyAndEntry, uint32, error)
}

type coreLedgerEntryGetter struct {
coreClient interfaces.FastCoreClient
latestLedgerReader db.LedgerEntryReader
}

func (c coreLedgerEntryGetter) GetLedgerEntries(
ctx context.Context,
keys []xdr.LedgerKey,
) ([]db.LedgerKeyAndEntry, uint32, error) {
latestLedger, err := c.latestLedgerReader.GetLatestLedgerSequence(ctx)
if err != nil {
return nil, 0, fmt.Errorf("could not get latest ledger: %w", err)
}
// Pass latest ledger here in case Core is ahead of us (0 would be Core's latest).
resp, err := c.coreClient.GetLedgerEntries(ctx, latestLedger, keys...)
if err != nil {
return nil, 0, fmt.Errorf("could not query captive core: %w", err)
}
Shaptic marked this conversation as resolved.
Show resolved Hide resolved

result := make([]db.LedgerKeyAndEntry, 0, len(resp.Entries))
for i, entry := range resp.Entries {
// This could happen if the user tries to fetch a ledger entry that
// doesn't exist, making it a 404 equivalent, so just skip it.
if entry.State == coreProto.LedgerEntryStateNew {
continue
}

var xdrEntry xdr.LedgerEntry
err := xdr.SafeUnmarshalBase64(entry.Entry, &xdrEntry)
if err != nil {
return nil, 0, fmt.Errorf("could not decode ledger entry: %w", err)
}

newEntry := db.LedgerKeyAndEntry{
Key: keys[i],
Entry: xdrEntry,
}
if entry.Ttl != 0 {
newEntry.LiveUntilLedgerSeq = &entry.Ttl
}
result = append(result, newEntry)
}

return result, latestLedger, nil
}

type dbLedgerEntryGetter struct {
ledgerEntryReader db.LedgerEntryReader
}

func (d dbLedgerEntryGetter) GetLedgerEntries(ctx context.Context, keys []xdr.LedgerKey) ([]db.LedgerKeyAndEntry, uint32, error) {

Check failure on line 78 in cmd/stellar-rpc/internal/methods/get_ledger_entries.go

View workflow job for this annotation

GitHub Actions / golangci-lint

The line is 130 characters long, which exceeds the maximum of 120 characters. (lll)
tx, err := d.ledgerEntryReader.NewTx(ctx, false)
if err != nil {
return nil, 0, fmt.Errorf("could not create transaction: %w", err)
}
defer func() {
_ = tx.Done()
}()

latestLedger, err := tx.GetLatestLedgerSequence()
if err != nil {
return nil, 0, fmt.Errorf("could not get latest ledger: %w", err)
}

result, err := tx.GetLedgerEntries(keys...)
if err != nil {
return nil, 0, fmt.Errorf("could not get entries: %w", err)
}

return result, latestLedger, nil
}

// NewGetLedgerEntriesFromCoreHandler returns a JSON RPC handler to retrieve the specified ledger entries from Stellar Core.

Check failure on line 100 in cmd/stellar-rpc/internal/methods/get_ledger_entries.go

View workflow job for this annotation

GitHub Actions / golangci-lint

The line is 124 characters long, which exceeds the maximum of 120 characters. (lll)
func NewGetLedgerEntriesFromCoreHandler(
logger *log.Entry,
coreClient interfaces.FastCoreClient,
latestLedgerReader db.LedgerEntryReader,
) jrpc2.Handler {
getter := coreLedgerEntryGetter{
coreClient: coreClient,
latestLedgerReader: latestLedgerReader,
}
return newGetLedgerEntriesHandlerFromGetter(logger, getter)
}

// NewGetLedgerEntriesFromDBHandler returns a JSON RPC handler to retrieve the specified ledger entries from the database.

Check failure on line 113 in cmd/stellar-rpc/internal/methods/get_ledger_entries.go

View workflow job for this annotation

GitHub Actions / golangci-lint

The line is 122 characters long, which exceeds the maximum of 120 characters. (lll)
func NewGetLedgerEntriesFromDBHandler(logger *log.Entry, ledgerEntryReader db.LedgerEntryReader) jrpc2.Handler {
getter := dbLedgerEntryGetter{
ledgerEntryReader: ledgerEntryReader,
}
return newGetLedgerEntriesHandlerFromGetter(logger, getter)
}

func newGetLedgerEntriesHandlerFromGetter(logger *log.Entry, getter ledgerEntryGetter) jrpc2.Handler {

Check failure on line 121 in cmd/stellar-rpc/internal/methods/get_ledger_entries.go

View workflow job for this annotation

GitHub Actions / golangci-lint

cognitive complexity 35 of func `newGetLedgerEntriesHandlerFromGetter` is high (> 30) (gocognit)
return NewHandler(func(ctx context.Context, request protocol.GetLedgerEntriesRequest,
) (protocol.GetLedgerEntriesResponse, error) {
if err := protocol.IsValidFormat(request.Format); err != nil {
Expand Down Expand Up @@ -64,53 +156,18 @@
ledgerKeys = append(ledgerKeys, ledgerKey)
}

latestLedger, err := latestLedgerReader.GetLatestLedgerSequence(ctx)
if err != nil {
return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: "could not get latest ledger",
}
}

// Pass latest ledger here in case Core is ahead of us (0 would be Core's latest).
resp, err := coreClient.GetLedgerEntries(ctx, latestLedger, ledgerKeys...)
ledgerEntryResults := make([]protocol.LedgerEntryResult, 0, len(ledgerKeys))
ledgerKeysAndEntries, latestLedger, err := getter.GetLedgerEntries(ctx, ledgerKeys)
if err != nil {
logger.WithError(err).WithField("request", request).
Info("could not obtain ledger entries from storage")
return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}

accumulation := []db.LedgerKeyAndEntry{}
for i, entry := range resp.Entries {
// This could happen if the user tries to fetch a ledger entry that
// doesn't exist, making it a 404 equivalent, so just skip it.
if entry.State == coreProto.LedgerEntryStateNew {
continue
}

var xdrEntry xdr.LedgerEntry
err := xdr.SafeUnmarshalBase64(entry.Entry, &xdrEntry)
if err != nil {
return protocol.GetLedgerEntriesResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: "failed to decode ledger entry",
}
}

newEntry := db.LedgerKeyAndEntry{
Key: ledgerKeys[i],
Entry: xdrEntry,
}
if entry.Ttl != 0 {
newEntry.LiveUntilLedgerSeq = &entry.Ttl
}
accumulation = append(accumulation, newEntry)
}

ledgerEntryResults := make([]protocol.LedgerEntryResult, 0, len(ledgerKeys))

for _, ledgerKeyAndEntry := range accumulation {
for _, ledgerKeyAndEntry := range ledgerKeysAndEntries {
switch request.Format {
case protocol.FormatJSON:
keyJs, err := xdr2json.ConvertInterface(ledgerKeyAndEntry.Key)
Expand Down
Loading