Skip to content

Commit

Permalink
getLedgerEntries: optionally use high-performance Core server (#353)
Browse files Browse the repository at this point in the history
* Replace getLedgerEntries DB queries with Core fetches
* Add infrastructure for testing the new Core http query server
* Sort entries in response according to request order
* Only test the query server from protocol 23 onwards
* Enable debug printouts for integration tests
* Make sure all ports are allocated at once to minimize clashes

---------

Co-authored-by: Alfonso Acosta <[email protected]>
  • Loading branch information
Shaptic and 2opremio authored Feb 7, 2025
1 parent cd2ef19 commit 55c706c
Show file tree
Hide file tree
Showing 15 changed files with 427 additions and 140 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/stellar-rpc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ jobs:
STELLAR_RPC_INTEGRATION_TESTS_ENABLED: true
STELLAR_RPC_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL: ${{ matrix.protocol-version }}
STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core
PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 22.0.0-2138.721fd0a65.focal
PROTOCOL_21_CORE_DOCKER_IMG: stellar/stellar-core:22.0.0-2138.721fd0a65.focal
PROTOCOL_22_CORE_DEBIAN_PKG_VERSION: 22.0.0-2138.721fd0a65.focal
PROTOCOL_22_CORE_DOCKER_IMG: stellar/stellar-core:22.0.0-2138.721fd0a65.focal
PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 22.1.0-2194.0241e79f7.focal
PROTOCOL_21_CORE_DOCKER_IMG: stellar/stellar-core:22.1.0-2194.0241e79f7.focal
PROTOCOL_22_CORE_DEBIAN_PKG_VERSION: 22.1.1-2251.ac9f21ac7.focal~do~not~use~in~prd
PROTOCOL_22_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:22.1.1-2251.ac9f21ac7.focal-do-not-use-in-prd

steps:
- uses: actions/checkout@v4
Expand Down
13 changes: 8 additions & 5 deletions cmd/stellar-rpc/internal/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ type Config struct {

Strict bool

StellarCoreURL string
CaptiveCoreStoragePath string
StellarCoreBinaryPath string
CaptiveCoreConfigPath string
CaptiveCoreHTTPPort uint
StellarCoreURL string
CaptiveCoreStoragePath string
StellarCoreBinaryPath string
CaptiveCoreConfigPath string
CaptiveCoreHTTPPort uint16
CaptiveCoreHTTPQueryPort uint16
CaptiveCoreHTTPQueryThreadPoolSize uint16
CaptiveCoreHTTPQuerySnapshotLedgers uint16

Endpoint string
AdminEndpoint string
Expand Down
23 changes: 21 additions & 2 deletions cmd/stellar-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const (
OneDayOfLedgers = 17280
SevenDayOfLedgers = OneDayOfLedgers * 7

defaultHTTPEndpoint = "localhost:8000"
defaultHTTPEndpoint = "localhost:8000"
defaultCaptiveCoreHTTPPort = 11626 // regular queries like /info
)

// TODO: refactor and remove the linter exceptions
Expand Down Expand Up @@ -84,7 +85,25 @@ func (cfg *Config) options() Options {
Name: "stellar-captive-core-http-port",
Usage: "HTTP port for Captive Core to listen on (0 disables the HTTP server)",
ConfigKey: &cfg.CaptiveCoreHTTPPort,
DefaultValue: uint(11626),
DefaultValue: uint16(defaultCaptiveCoreHTTPPort),
},
{
Name: "stellar-captive-core-http-query-port",
Usage: "HTTP port for Captive Core to listen on for high-performance queries like /getledgerentry (0 disables the HTTP server, must not conflict with CAPTIVE_CORE_HTTP_PORT)",
ConfigKey: &cfg.CaptiveCoreHTTPQueryPort,
DefaultValue: uint16(0), // Disabled by default, although it normally uses 11628
},
{
Name: "stellar-captive-core-http-query-thread-pool-size",
Usage: "Number of threads to use by Captive Core's high-performance query server",
ConfigKey: &cfg.CaptiveCoreHTTPQueryThreadPoolSize,
DefaultValue: uint16(runtime.NumCPU()), //nolint:gosec
},
{
Name: "stellar-captive-core-http-query-snapshot-ledgers",
Usage: "Size of ledger history in Captive Core's high-performance query server (don't touch unless you know what you are doing)",
ConfigKey: &cfg.CaptiveCoreHTTPQuerySnapshotLedgers,
DefaultValue: uint16(4),
},
{
Name: "log-level",
Expand Down
1 change: 1 addition & 0 deletions cmd/stellar-rpc/internal/config/test.soroban.rpc.config
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ STELLAR_CORE_BINARY_PATH="/usr/bin/stellar-core"
HISTORY_ARCHIVE_URLS=["http://localhost:1570"]
DB_PATH="/opt/stellar/stellar-rpc/rpc_db.sqlite"
STELLAR_CAPTIVE_CORE_HTTP_PORT=0
STELLAR_CAPTIVE_CORE_HTTP_QUERY_PORT=11628
CHECKPOINT_FREQUENCY=64
2 changes: 2 additions & 0 deletions cmd/stellar-rpc/internal/config/toml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func TestRoundTrip(t *testing.T) {
*v = "test"
case *uint:
*v = 42
case *uint16:
*v = 22
case *uint32:
*v = 32
case *time.Duration:
Expand Down
43 changes: 36 additions & 7 deletions cmd/stellar-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package daemon
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -49,6 +50,7 @@ const (
type Daemon struct {
core *ledgerbackend.CaptiveStellarCore
coreClient *CoreClientWithMetrics
coreQueryingClient interfaces.FastCoreClient
ingestService *ingest.Service
db *db.DB
jsonRPCHandler *internal.Handler
Expand Down Expand Up @@ -120,15 +122,27 @@ func (d *Daemon) Close() error {

// newCaptiveCore creates a new captive core backend instance and returns it.
func newCaptiveCore(cfg *config.Config, logger *supportlog.Entry) (*ledgerbackend.CaptiveStellarCore, error) {
var queryServerParams *ledgerbackend.HTTPQueryServerParams
if cfg.CaptiveCoreHTTPQueryPort != 0 {
// Only try to enable the server if the port passed is non-zero
queryServerParams = &ledgerbackend.HTTPQueryServerParams{
Port: cfg.CaptiveCoreHTTPQueryPort,
ThreadPoolSize: cfg.CaptiveCoreHTTPQueryThreadPoolSize,
SnapshotLedgers: cfg.CaptiveCoreHTTPQuerySnapshotLedgers,
}
}

httpPort := uint(cfg.CaptiveCoreHTTPPort)
captiveCoreTomlParams := ledgerbackend.CaptiveCoreTomlParams{
HTTPPort: &cfg.CaptiveCoreHTTPPort,
HTTPPort: &httpPort,
HistoryArchiveURLs: cfg.HistoryArchiveURLs,
NetworkPassphrase: cfg.NetworkPassphrase,
Strict: true,
UseDB: true,
EnforceSorobanDiagnosticEvents: true,
EnforceSorobanTransactionMetaExtV1: true,
CoreBinaryPath: cfg.StellarCoreBinaryPath,
HTTPQueryServerParams: queryServerParams,
}
captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromFile(cfg.CaptiveCoreConfigPath, captiveCoreTomlParams)
if err != nil {
Expand Down Expand Up @@ -156,12 +170,13 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
metricsRegistry := prometheus.NewRegistry()

daemon := &Daemon{
logger: logger,
core: core,
db: mustOpenDatabase(cfg, logger, metricsRegistry),
done: make(chan struct{}),
metricsRegistry: metricsRegistry,
coreClient: newCoreClientWithMetrics(createStellarCoreClient(cfg), metricsRegistry),
logger: logger,
core: core,
db: mustOpenDatabase(cfg, logger, metricsRegistry),
done: make(chan struct{}),
metricsRegistry: metricsRegistry,
coreClient: newCoreClientWithMetrics(createStellarCoreClient(cfg), metricsRegistry),
coreQueryingClient: createHighperfStellarCoreClient(cfg),
}

feewindows := daemon.mustInitializeStorage(cfg)
Expand Down Expand Up @@ -235,6 +250,17 @@ func createStellarCoreClient(cfg *config.Config) stellarcore.Client {
}
}

func createHighperfStellarCoreClient(cfg *config.Config) interfaces.FastCoreClient {
// It doesn't make sense to create a client if the local server is not enabled
if cfg.CaptiveCoreHTTPQueryPort == 0 {
return nil
}
return &stellarcore.Client{
URL: fmt.Sprintf("http://localhost:%d", cfg.CaptiveCoreHTTPQueryPort),
HTTP: &http.Client{Timeout: cfg.CoreRequestTimeout},
}
}

func createIngestService(cfg *config.Config, logger *supportlog.Entry, daemon *Daemon,
feewindows *feewindow.FeeWindows, historyArchive *historyarchive.ArchiveInterface,
) *ingest.Service {
Expand Down Expand Up @@ -486,3 +512,6 @@ func (d *Daemon) Run() {
return
}
}

// Ensure the daemon conforms to the interface
var _ interfaces.Daemon = (*Daemon)(nil)
6 changes: 6 additions & 0 deletions cmd/stellar-rpc/internal/daemon/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stellar/go/ingest/ledgerbackend"
proto "github.com/stellar/go/protocols/stellarcore"
"github.com/stellar/go/xdr"
)

// Daemon defines the interface that the Daemon would be implementing.
Expand All @@ -16,10 +17,15 @@ type Daemon interface {
MetricsRegistry() *prometheus.Registry
MetricsNamespace() string
CoreClient() CoreClient
FastCoreClient() FastCoreClient
GetCore() *ledgerbackend.CaptiveStellarCore
}

type CoreClient interface {
Info(ctx context.Context) (*proto.InfoResponse, error)
SubmitTransaction(ctx context.Context, txBase64 string) (*proto.TXResponse, error)
}

type FastCoreClient interface {
GetLedgerEntries(ctx context.Context, ledgerSeq uint32, keys ...xdr.LedgerKey) (proto.GetLedgerEntryResponse, error)
}
11 changes: 11 additions & 0 deletions cmd/stellar-rpc/internal/daemon/interfaces/noOpDaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stellar/go/ingest/ledgerbackend"
proto "github.com/stellar/go/protocols/stellarcore"
"github.com/stellar/go/xdr"
)

// TODO: deprecate and rename to stellar_rpc
Expand Down Expand Up @@ -41,6 +42,10 @@ func (d *NoOpDaemon) CoreClient() CoreClient {
return d.coreClient
}

func (d *NoOpDaemon) FastCoreClient() FastCoreClient {
return d.coreClient
}

func (d *NoOpDaemon) GetCore() *ledgerbackend.CaptiveStellarCore {
return d.core
}
Expand All @@ -54,3 +59,9 @@ func (s noOpCoreClient) Info(context.Context) (*proto.InfoResponse, error) {
func (s noOpCoreClient) SubmitTransaction(context.Context, string) (*proto.TXResponse, error) {
return &proto.TXResponse{Status: proto.PreflightStatusOk}, nil
}

func (s noOpCoreClient) GetLedgerEntries(context.Context,
uint32, ...xdr.LedgerKey,
) (proto.GetLedgerEntryResponse, error) {
return proto.GetLedgerEntryResponse{}, nil
}
4 changes: 4 additions & 0 deletions cmd/stellar-rpc/internal/daemon/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func (d *Daemon) CoreClient() interfaces.CoreClient {
return d.coreClient
}

func (d *Daemon) FastCoreClient() interfaces.FastCoreClient {
return d.coreQueryingClient
}

func (d *Daemon) GetCore() *ledgerbackend.CaptiveStellarCore {
return d.core
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,18 @@ import (
)

func TestGetLedgerEntriesNotFound(t *testing.T) {
test := infrastructure.NewTest(t, nil)
t.Run("WithCore", func(t *testing.T) {
testGetLedgerEntriesNotFound(t, true)
})
t.Run("WithoutCore", func(t *testing.T) {
testGetLedgerEntriesNotFound(t, false)
})
}

func testGetLedgerEntriesNotFound(t *testing.T, useCore bool) {
test := infrastructure.NewTest(t, &infrastructure.TestConfig{
EnableCoreHTTPQueryServer: useCore,
})
client := test.GetRPCLient()

hash := xdr.Hash{0xa, 0xb}
Expand Down Expand Up @@ -48,7 +59,18 @@ func TestGetLedgerEntriesNotFound(t *testing.T) {
}

func TestGetLedgerEntriesInvalidParams(t *testing.T) {
test := infrastructure.NewTest(t, nil)
t.Run("WithCore", func(t *testing.T) {
testGetLedgerEntriesInvalidParams(t, true)
})
t.Run("WithoutCore", func(t *testing.T) {
testGetLedgerEntriesInvalidParams(t, false)
})
}

func testGetLedgerEntriesInvalidParams(t *testing.T, useCore bool) {
test := infrastructure.NewTest(t, &infrastructure.TestConfig{
EnableCoreHTTPQueryServer: useCore,
})

client := test.GetRPCLient()

Expand All @@ -66,7 +88,18 @@ func TestGetLedgerEntriesInvalidParams(t *testing.T) {
}

func TestGetLedgerEntriesSucceeds(t *testing.T) {
test := infrastructure.NewTest(t, nil)
t.Run("WithCore", func(t *testing.T) {
testGetLedgerEntriesSucceeds(t, true)
})
t.Run("WithoutCore", func(t *testing.T) {
testGetLedgerEntriesSucceeds(t, false)
})
}

func testGetLedgerEntriesSucceeds(t *testing.T, useCore bool) {
test := infrastructure.NewTest(t, &infrastructure.TestConfig{
EnableCoreHTTPQueryServer: useCore,
})
_, contractID, contractHash := test.CreateHelloWorldContract()

contractCodeKeyB64, err := xdr.MarshalBase64(xdr.LedgerKey{
Expand Down Expand Up @@ -117,7 +150,7 @@ func TestGetLedgerEntriesSucceeds(t *testing.T) {
require.Equal(t, xdr.LedgerEntryTypeContractCode, firstEntry.Type)
require.Equal(t, infrastructure.GetHelloWorldContract(), firstEntry.MustContractCode().Code)

require.Greater(t, result.Entries[1].LastModifiedLedger, uint32(0))
require.Positive(t, result.Entries[1].LastModifiedLedger)
require.LessOrEqual(t, result.Entries[1].LastModifiedLedger, result.LatestLedger)
require.NotNil(t, result.Entries[1].LiveUntilLedgerSeq)
require.Greater(t, *result.Entries[1].LiveUntilLedgerSeq, result.LatestLedger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ PUBLIC_KEY="GD5KD2KEZJIGTC63IGW6UMUSMVUVG5IHG64HUTFWCHVZH2N2IBOQN7PS"

# should be "core" when running RPC in a container or "localhost:port" when running RPC in the host
ADDRESS="${CORE_HOST_PORT}"
QUALITY="MEDIUM"
QUALITY="MEDIUM"
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ services:
# Note: Please keep the image pinned to an immutable tag matching the Captive Core version.
# This avoids implicit updates which break compatibility between
# the Core container and captive core.
image: ${CORE_IMAGE:-stellar/stellar-core:22.0.0-2138.721fd0a65.focal}
image: ${CORE_IMAGE:-stellar/unsafe-stellar-core:22.1.1-2251.ac9f21ac7.focal-do-not-use-in-prd}

depends_on:
- core-postgres
environment:
Expand All @@ -23,6 +24,8 @@ services:
- "127.0.0.1:0:11625"
# http
- "127.0.0.1:0:11626"
# high-perf http
- "127.0.0.1:0:11628"
# history archive
- "127.0.0.1:0:1570"
entrypoint: /usr/bin/env
Expand Down
Loading

0 comments on commit 55c706c

Please sign in to comment.