From b9eec15e6ea3810d921e785becc6566c5abebeca Mon Sep 17 00:00:00 2001
From: Gabriel Garrido Calvo <2728080+ggarri@users.noreply.github.com>
Date: Tue, 24 May 2022 13:08:52 +0200
Subject: [PATCH] Tx listener optimizations and enhancements (#1018)

* fetch pending jobs on new block instead of querying per tx_hash
* trigger only on new block
* fixed retry on 'not found block' | fixed pending jobs
* small adjustment on e2e
* no caching latest blocks
* improve proxy logging logic
* add index on idempotency-key
* update changelog
* improve stress tests
* fix unit tests
* rewrite changelog
---
 .circleci/config.yml                               |   2 +-
 .golangci.yml                                      |  13 +-
 CHANGELOG.md                                       |  10 +-
 Makefile                                           |  13 +-
 docker-compose.dev.yml                             |  12 +-
 docker-compose.e2e.yml                             |  40 ++--
 pkg/errors/errors.go                               |   2 +-
 .../app/http/config/dynamic/middleware.go          |   4 -
 pkg/toolkit/ethclient/rpc/client.go                |   1 -
 services/api/app.go                                |   2 +-
 services/api/provider.go                           |   4 +-
 services/api/provider_test.go                      |   2 +-
 services/api/proxy/http-cache.go                   |   2 +-
 services/api/proxy/http-cache_test.go              |   8 +-
 services/api/proxy/provider.go                     |  16 +-
 .../25_contracts_db_improvements.go                |  12 +-
 .../tx-listener/session/ethereum/session.go        | 220 ++++++++++++------
 services/tx-listener/session/manager.go            |  12 +-
 services/tx-sentry/app.go                          |   2 +-
 services/tx-sentry/config.go                       |   2 +-
 .../service/listeners/session_manager.go           |   4 +-
 tests/features/sentry_retries.feature              |   2 +-
 tests/service/stress/exported.go                   |   2 +
 tests/service/stress/service.go                    |  20 +-
 .../stress/units/testDeployContract.go             |   7 +-
 .../stress/units/testSendContractTxs.go            |   8 +-
 26 files changed, 266 insertions(+), 156 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index b1e906a22..650223b98 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -298,7 +298,7 @@ jobs:
           STRESS_ITERATIONS: <>
           STRESS_TIMEOUT: <>
           KAFKA_CONSUMER_GROUP_NAME: "stress"
-          command: ./build/bin/test stress
+          command: make stress-ci
 
   run-performance:
     docker:
diff --git a/.golangci.yml b/.golangci.yml
index 6f0e4e4c1..2f8720e0c 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -52,18 +52,19 @@ linters:
 
 issues:
   max-same-issues: 10
+  exclude-use-default: true
 
 run:
   deadline: 20m
   skip-dirs:
     - .gocache
-    - /pkg/go-ethereum
-    - /build
-    - /public
+    - pkg/go-ethereum
+    - build
+    - public
     - mock
-    - /scripts
-    - /vendor
-    - /tests
+    - scripts
+    - vendor
+    - tests
   skip-files:
     - ".*\\.pb(\\.gw)?\\.go$"
     - pkg/toolkit/app/http/handler/dashboard/genstatic/gen.go
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fa6e9496e..9dea087c4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,9 +1,13 @@
 # Orchestrate Release Notes
 
-## v21.12.7 (WIP)
+## v21.12.7 (2022-05-24)
 ### 🛠 Bug fixes
-* Reduced `tx-listener` service demands on `api` by usage of an optional in-memory cache. This is an optional feature
-which can be enabled using `API_CACHE_TTL` environment variable.
+* Stop printing chain-proxy access log errors when `ACCESSLOG_ENABLED=false`.
+
+### 🛠 Enhancements
+* Reduced `tx-listener` requests to `orchestrate-api` by usage of an optional in-memory cache. To enable it, set a
+duration using the `API_CACHE_TTL` environment variable.
+* Reduced database I/O usage by ~60%.
 
 ## v21.12.6 (2022-05-04)
 ### 🛠 Bug fixes
diff --git a/Makefile b/Makefile
index e8e2e3d7b..9dd0363e5 100644
--- a/Makefile
+++ b/Makefile
@@ -17,11 +17,6 @@ ifeq ($(UNAME_S),Darwin)
 	VEGETA_BIN_URL = https://github.com/tsenart/vegeta/releases/download/v12.8.4/vegeta_12.8.4_darwin_amd64.tar.gz
 endif
 
-ifneq (,$(wildcard ./.env))
-	include .env
-	export
-endif
-
 .PHONY: all run-coverage coverage fmt fmt-check vet lint misspell-check misspell race tools help
 
 networks:
@@ -77,7 +72,7 @@ e2e: run-e2e
 	@$(OPEN) build/report/report.html 2>/dev/null
 
 run-stress: gobuild-stress
-	@./build/bin/test stress
+	@docker-compose -f docker-compose.e2e.yml up -V stress
 
 e2e: run-e2e
 	@docker-compose -f docker-compose.e2e.yml up --build report
@@ -92,8 +87,6 @@ e2e-ci: gobuild-e2e
 deploy-remote-env:
 	@bash ./scripts/deploy-remote-env.sh
 
-stress: run-stress
-	@exit $(docker inspect orchestrate_stress_1 --format='{{.State.ExitCode}}')
 
 stress-ci:
 	@docker-compose -f docker-compose.dev.yml up stress
@@ -172,10 +165,10 @@ bootstrap-deps: bootstrap ## Wait for dependencies to be ready
 	@bash scripts/bootstrap-deps.sh
 
 gobuild-e2e: ## Build Orchestrate e2e Docker image
-	@CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./build/bin/test ./tests/cmd
+	@CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./build/bin/e2e ./tests/cmd
 
 gobuild-stress: ## Build Orchestrate stress binary
-	@CGO_ENABLED=0 go build -o ./build/bin/test ./tests/stress/cmd
+	@CGO_ENABLED=0 go build -o ./build/bin/stress ./tests/cmd
 
 orchestrate: gobuild ## Start Orchestrate
 	@docker-compose -f docker-compose.dev.yml up --force-recreate --build -d $(ORCH_SERVICES)
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 787a4357a..5cf016b20 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -30,7 +30,7 @@ x-default-variables: &default-variables
 
 x-container-common: &container-common
   image: golang:1.16.9
-  restart: ${CONTAINER_RESTART-on-failure}
+#  restart: ${CONTAINER_RESTART-on-failure}
   entrypoint: /bin/main
   tty: true
   networks:
@@ -136,13 +136,21 @@ services:
     <<: *container-common
     ports:
       - 8082:8082
+#      - 2345:2345
     environment:
       <<: *default-variables
       <<: *tx-listener-common
+#    volumes:
+#      - ./build/bin/orchestrate:/bin/main:ro
+#      - /usr/local/bin/dlv:/usr/bin/dlv
     depends_on:
       - api
     command: tx-listener run
-
+#    entrypoint:
+#      - sh
+#      - -c
+#      - |
+#        dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec /bin/main tx-listener run
 networks:
   orchestrate:
     external:
diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml
index b763a6a87..0f3081b27 100644
--- a/docker-compose.e2e.yml
+++ b/docker-compose.e2e.yml
@@ -29,24 +29,35 @@ x-container-common: &container-common
     - orchestrate
   tty: true
 
+x-e2e-variables: &e2e-variables
+  CUCUMBER_OUTPUTPATH: "/report/output/report.json"
+  CUCUMBER_PATHS: "/features"
+  ARTIFACTS_PATH: "/artifacts"
+  CUCUMBER_FORMAT: "cucumber"
+  CUCUMBER_CONCURRENCY: ${CUCUMBER_CONCURRENCY-}
+  CUCUMBER_TAGS: ${CUCUMBER_TAGS-}
+  TEST_GLOBAL_DATA: ${TEST_GLOBAL_DATA-}
+  CUCUMBER_STEPS_TIMEOUT: ${CUCUMBER_STEPS_TIMEOUT-30s}
+  KAFKA_CONSUMER_GROUP_NAME: ${KAFKA_CONSUMER_GROUP_NAME-e2e}
+
+x-stress-variables: &stress-variables
+  ARTIFACTS_PATH: "/artifacts"
+  STRESS_CONCURRENCY: ${STRESS_CONCURRENCY-30}
+  STRESS_ITERATIONS: ${STRESS_ITERATIONS-500}
+  STRESS_TIMEOUT: ${STRESS_TIMEOUT-15m}
+  TEST_GLOBAL_DATA: ${TEST_GLOBAL_DATA-}
+  KAFKA_CONSUMER_GROUP_NAME: ${KAFKA_CONSUMER_GROUP_NAME-stress}
+
 services:
   e2e:
     <<: *container-common
     environment:
       <<: *default-variables
-      CUCUMBER_OUTPUTPATH: "/report/output/report.json"
-      CUCUMBER_PATHS: "/features"
-      ARTIFACTS_PATH: "/artifacts"
-      CUCUMBER_FORMAT: "cucumber"
-      CUCUMBER_CONCURRENCY: ${CUCUMBER_CONCURRENCY-}
-      CUCUMBER_TAGS: ${CUCUMBER_TAGS-}
-      TEST_GLOBAL_DATA: ${TEST_GLOBAL_DATA-}
-      CUCUMBER_STEPS_TIMEOUT: ${CUCUMBER_STEPS_TIMEOUT-30s}
-      KAFKA_CONSUMER_GROUP_NAME: ${KAFKA_CONSUMER_GROUP_NAME-e2e}
+      <<: *e2e-variables
     restart: "no"
     command: e2e
     volumes:
-      - ./build/bin/test:/bin/main
+      - ./build/bin/e2e:/bin/main
       - ./build/report:/report/output
       - ./tests/features:/features
       - ./tests/artifacts:/artifacts
@@ -72,16 +83,11 @@ services:
     <<: *container-common
     environment:
       <<: *default-variables
-      ARTIFACTS_PATH: "/artifacts"
-      STRESS_CONCURRENCY: "15"
-      STRESS_ITERATIONS: "100"
-      STRESS_TIMEOUT: "5m"
-      TEST_GLOBAL_DATA: ${TEST_GLOBAL_DATA-}
-      KAFKA_CONSUMER_GROUP_NAME: ${KAFKA_CONSUMER_GROUP_NAME-stress}
+      <<: *stress-variables
     restart: "no"
     command: stress
     volumes:
-      - ./build/bin/test:/bin/main
+      - ./build/bin/stress:/bin/main
       - ./tests/artifacts:/artifacts
 
 networks:
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
index d6b120732..50dace6bf 100644
--- a/pkg/errors/errors.go
+++ b/pkg/errors/errors.go
@@ -320,7 +320,7 @@ func EthereumError(format string, a ...interface{}) *ierror.Error {
 	return Errorf(Ethereum, format, a...)
 }
 
-// IsEthereumError indicate whether an error is an Etehreum error
+// IsEthereumError indicates whether an error is an Ethereum error
 func IsEthereumError(err error) bool {
 	return isErrorClass(FromError(err).GetCode(), Ethereum)
 }
diff --git a/pkg/toolkit/app/http/config/dynamic/middleware.go b/pkg/toolkit/app/http/config/dynamic/middleware.go
index 4c6ae0294..8a29766e3 100644
--- a/pkg/toolkit/app/http/config/dynamic/middleware.go
+++ b/pkg/toolkit/app/http/config/dynamic/middleware.go
@@ -44,10 +44,6 @@ func FromTraefikMiddleware(middleware *traefikdynamic.Middleware) *Middleware {
 	}
 }
 
-func ToTraefikMiddleware(middleware *Middleware) *traefikdynamic.Middleware {
-	return middleware.Middleware
-}
-
 // +k8s:deepcopy-gen=true
 type Auth struct{}
diff --git a/pkg/toolkit/ethclient/rpc/client.go b/pkg/toolkit/ethclient/rpc/client.go
index 584f7894d..bce80e38d 100644
--- a/pkg/toolkit/ethclient/rpc/client.go
+++ b/pkg/toolkit/ethclient/rpc/client.go
@@ -220,7 +220,6 @@ func processBlockResult(header **ethtypes.Header, body **Body) ParseResultFunc {
 	}
 
 	if len(raw) == 0 {
-		// Block was not found
 		return errors.NotFoundError("block not found")
 	}
 
diff --git a/services/api/app.go b/services/api/app.go
index facbc4cae..48f6d02f2 100644
--- a/services/api/app.go
+++ b/services/api/app.go
@@ -103,7 +103,7 @@ func NewAPI(
 		apiHandlerOpt,
 		httpCacheOpt,
 		reverseProxyOpt,
-		app.ProviderOpt(NewProvider(ucs.SearchChains(), time.Second, cfg.Proxy.ProxyCacheTTL)),
+		app.ProviderOpt(NewProvider(ucs.SearchChains(), time.Second, cfg.Proxy.ProxyCacheTTL, cfg.App.HTTP.AccessLog)),
 	)
 }
diff --git a/services/api/provider.go b/services/api/provider.go
index 46373d9be..1f268eece 100644
--- a/services/api/provider.go
+++ b/services/api/provider.go
@@ -20,10 +20,10 @@ const (
 	InternalProvider = "internal"
 )
 
-func NewProvider(searchChains usecases.SearchChainsUseCase, refresh time.Duration, proxyCacheTTL *time.Duration) provider.Provider {
+func NewProvider(searchChains usecases.SearchChainsUseCase, refresh time.Duration, proxyCacheTTL *time.Duration, accessLog bool) provider.Provider {
 	prvdr := aggregator.New()
 	prvdr.AddProvider(NewInternalProvider())
-	prvdr.AddProvider(proxy.NewChainsProxyProvider(searchChains, refresh, proxyCacheTTL))
+	prvdr.AddProvider(proxy.NewChainsProxyProvider(searchChains, refresh, proxyCacheTTL, accessLog))
 	return prvdr
 }
diff --git a/services/api/provider_test.go b/services/api/provider_test.go
index 14f51a375..1cdf675e5 100644
--- a/services/api/provider_test.go
+++ b/services/api/provider_test.go
@@ -235,7 +235,7 @@ func TestNewChainsProxyConfig(t *testing.T) {
 	}
 
 	for i, test := range testSet {
-		cfg := proxy.NewProxyConfig(test.chains, nil)
+		cfg := proxy.NewProxyConfig(test.chains, nil, true)
 		expectedCfg := test.expectedCfg(dynamic.NewConfig())
 		assert.Equal(t, expectedCfg, cfg, "Chain-registry - Store (%d/%d): expected %v but got %v", i+1, len(testSet), expectedCfg, cfg)
 	}
diff --git a/services/api/proxy/http-cache.go b/services/api/proxy/http-cache.go
index 3a3fef610..f2908ef83 100644
--- a/services/api/proxy/http-cache.go
+++ b/services/api/proxy/http-cache.go
@@ -60,7 +60,7 @@ func HTTPCacheRequest(ctx context.Context, req *http.Request) (c bool, k string,
 	cacheKey := fmt.Sprintf("%s(%s)", msg.Method, string(msg.Params))
 
 	if msg.Method == "eth_getBlockByNumber" && strings.Contains(string(msg.Params), "latest") {
-		return true, cacheKey, time.Second, nil
+		return false, "", 0, nil
 	}
 
 	return true, cacheKey, 0, nil
diff --git a/services/api/proxy/http-cache_test.go b/services/api/proxy/http-cache_test.go
index 3ed146f19..002cd5553 100644
--- a/services/api/proxy/http-cache_test.go
+++ b/services/api/proxy/http-cache_test.go
@@ -30,7 +30,7 @@ func TestHTTPCacheRequest_Valid(t *testing.T) {
 	assert.Equal(t, "eth_getTransactionReceipt([\"0x7d231ca6a5fc03f5365b3d62dcfe372ed5c13ac7014d016b52ed72094919556c\"])", k)
 }
 
-func TestHTTPCacheRequest_ValidWithCustomTTL(t *testing.T) {
+func TestHTTPCacheRequest_IgnoreLatestBlock(t *testing.T) {
 	msg := ethclient.JSONRpcMessage{
 		Method: "eth_getBlockByNumber",
 	}
@@ -39,11 +39,9 @@ func TestHTTPCacheRequest_ValidWithCustomTTL(t *testing.T) {
 	body, _ := json.Marshal(msg)
 	req := httptest.NewRequest(http.MethodPost, "http://localhost", bytes.NewReader(body))
 
-	c, k, ttl, err := HTTPCacheRequest(req.Context(), req)
+	c, _, _, err := HTTPCacheRequest(req.Context(), req)
 	assert.NoError(t, err)
-	assert.True(t, c)
-	assert.Equal(t, time.Second, ttl)
-	assert.Equal(t, "eth_getBlockByNumber([\"latest\"])", k)
+	assert.False(t, c)
 }
 
 func TestHTTPCacheRequest_IgnoreReqType(t *testing.T) {
diff --git a/services/api/proxy/provider.go b/services/api/proxy/provider.go
index 1f29ba5b6..eb31d2222 100644
--- a/services/api/proxy/provider.go
+++ b/services/api/proxy/provider.go
@@ -19,7 +19,7 @@ import (
 
 const ChainsProxyProvider = "chains-proxy"
 
-func NewChainsProxyProvider(searchChains usecases.SearchChainsUseCase, refresh time.Duration, proxyCacheTTL *time.Duration) provider.Provider {
+func NewChainsProxyProvider(searchChains usecases.SearchChainsUseCase, refresh time.Duration, proxyCacheTTL *time.Duration, accessLog bool) provider.Provider {
 	poller := func(ctx context.Context) (provider.Message, error) {
 		// Wildcard user including chains owned by individual users (Special rights)
 		chains, err := searchChains.Execute(ctx, &entities.ChainFilters{}, multitenancy.NewInternalAdminUser())
@@ -27,13 +27,13 @@ func NewChainsProxyProvider(searchChains usecases.SearchChainsUseCase, refresh t
 			return nil, err
 		}
 
-		return dynamic.NewMessage(ChainsProxyProvider, NewProxyConfig(chains, proxyCacheTTL)), nil
+		return dynamic.NewMessage(ChainsProxyProvider, NewProxyConfig(chains, proxyCacheTTL, accessLog)), nil
 	}
 
 	return poll.New(poller, refresh)
 }
 
-func NewProxyConfig(chains []*entities.Chain, proxyCacheTTL *time.Duration) *dynamic.Configuration {
+func NewProxyConfig(chains []*entities.Chain, proxyCacheTTL *time.Duration, accessLog bool) *dynamic.Configuration {
 	cfg := dynamic.NewConfig()
 
 	for _, chain := range chains {
@@ -44,13 +44,13 @@ func NewProxyConfig(chains []*entities.Chain, proxyCacheTTL *time.Duration) *dyn
 			multitenancyMid = fmt.Sprintf("auth-%v:%s@multitenancy", chain.TenantID, chain.OwnerID)
 		}
 
-		middlewares := []string{
-			"chain-proxy-accesslog@internal",
-			"auth@multitenancy",
-			multitenancyMid,
-			"strip-path@internal",
+		middlewares := []string{}
+		if accessLog {
+			middlewares = append(middlewares, "chain-proxy-accesslog@internal")
 		}
 
+		middlewares = append(middlewares, "auth@multitenancy", multitenancyMid, "strip-path@internal")
+
 		cfg.HTTP.Middlewares[multitenancyMid] = &dynamic.Middleware{
 			MultiTenancy: &dynamic.MultiTenancy{
 				Tenant: chain.TenantID,
diff --git a/services/api/store/postgres/migrations/25_contracts_db_improvements.go b/services/api/store/postgres/migrations/25_contracts_db_improvements.go
index 42799230e..ea43320a1 100644
--- a/services/api/store/postgres/migrations/25_contracts_db_improvements.go
+++ b/services/api/store/postgres/migrations/25_contracts_db_improvements.go
@@ -6,7 +6,7 @@ import (
 )
 
 func upgradeRemoveFkeysContracts(db migrations.DB) error {
-	log.Debug("Applying improvements contract tables...")
+	log.Debug("Applying improvements to contract tables and adding index on idempotency-key...")
 	_, err := db.Exec(`
 CREATE UNIQUE INDEX tags_repository_name_idx ON tags (repository_id, (lower(name)));
 ALTER TABLE repositories drop constraint repositories_name_key;
@@ -15,17 +15,19 @@ CREATE UNIQUE INDEX repositories_name_idx ON repositories ((lower(name)));
 ALTER TABLE transactions
 	ADD COLUMN contract_name TEXT,
 	ADD COLUMN contract_tag TEXT;
+
+CREATE INDEX transactions_hash_idem_idx on transaction_requests (idempotency_key);
 `)
 	if err != nil {
 		return err
 	}
 
-	log.Info("Applied improvements on contract tables successfully")
+	log.Info("Applied improvements on contract tables and added index on idempotency-key successfully")
 	return nil
 }
 
 func downgradeRemoveFkeysContracts(db migrations.DB) error {
-	log.Debug("Undoing improvements on contract tables...")
+	log.Debug("Undoing improvements on contract tables and removing index on idempotency-key...")
 	_, err := db.Exec(`
 ALTER TABLE transactions
 	DROP COLUMN contract_name,
@@ -34,11 +36,13 @@ ALTER TABLE transactions
 DROP INDEX repositories_name_idx;
 ALTER TABLE repositories ADD CONSTRAINT repositories_name_key UNIQUE (name);
 DROP INDEX tags_repository_name_idx;
+
+DROP INDEX transactions_hash_idem_idx;
 `)
 	if err != nil {
 		return err
 	}
 
-	log.Info("Downgraded improvements on contract tables")
+	log.Info("Downgraded improvements on contract tables and removed index on idempotency-key")
 	return nil
 }
diff --git a/services/tx-listener/session/ethereum/session.go b/services/tx-listener/session/ethereum/session.go
index f7726d94d..af0f1e1d8 100644
--- a/services/tx-listener/session/ethereum/session.go
+++ b/services/tx-listener/session/ethereum/session.go
@@ -35,6 +35,11 @@ type Session struct {
 	bckOff        backoff.BackOff
 	metrics       metrics.ListenerMetrics
 	metricsLabels []string
+
+	pendingJobMap           map[string]*entities.Job
+	pendingJobMapMutex      *sync.RWMutex
+	pendingJobLastCheckedAt time.Time
+
 	// Listening session
 	trigger       chan struct{}
 	blockPosition uint64
@@ -55,13 +60,15 @@ func NewSession(
 	m metrics.ListenerMetrics,
 ) *Session {
 	return &Session{
-		Chain:   chain,
-		ec:      ec,
-		client:  client,
-		hook:    callHook,
-		offsets: offsets,
-		bckOff:  backoff.NewConstantBackOff(2 * time.Second),
-		metrics: m,
+		Chain:              chain,
+		ec:                 ec,
+		client:             client,
+		hook:               callHook,
+		offsets:            offsets,
+		bckOff:             backoff.NewConstantBackOff(2 * time.Second),
+		metrics:            m,
+		pendingJobMap:      make(map[string]*entities.Job),
+		pendingJobMapMutex: &sync.RWMutex{},
 		metricsLabels: []string{
 			"chain_uuid", chain.UUID,
 		},
@@ -227,8 +234,7 @@ listeningLoop:
 	for {
 		select {
 		case <-ctx.Done():
-			s.logger.WithField("block_stop", s.blockPosition).
-				Debug("stopping fetch block listener")
+			s.logger.WithField("block_stop", s.blockPosition).Debug("stopping fetch block listener")
 			break listeningLoop
 		case <-s.trigger:
 			if (s.currentChainTip > 0) && s.blockPosition <= s.currentChainTip {
@@ -241,6 +247,7 @@ listeningLoop:
 			if err != nil {
 				s.errors <- err
 			} else if tip > s.currentChainTip {
+				s.logger.WithField("number", tip).Info("fetched chain tip")
 				s.currentChainTip = tip
 				s.trig()
 			}
@@ -305,39 +312,57 @@ func (s *Session) callHook(ctx context.Context, block *fetchedBlock) error {
 
 func (s *Session) fetchBlock(ctx context.Context, blockPosition uint64) *Future {
 	return NewFuture(func() (interface{}, error) {
-		blck, err := s.ec.BlockByNumber(
-			ctx,
-			s.Chain.URL,
-			big.NewInt(int64(blockPosition)),
+		block := &fetchedBlock{}
+		err := backoff.RetryNotify(
+			func() error {
+				blck, err := s.ec.BlockByNumber(
+					ctx,
+					s.Chain.URL,
+					big.NewInt(int64(blockPosition)),
+				)
+
+				if err != nil {
+					// Retry on not found block
+					if errors.IsInvalidParameterError(err) {
+						return err
+					}
+
+					return backoff.Permanent(err)
+				}
+
+				block.block = blck
+				return nil
+			},
+			backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3),
+			func(err error, duration time.Duration) {
+				s.logger.WithError(err).WithField("block", blockPosition).Warnf("error fetching block, retrying in %v...", duration)
+			},
 		)
+
 		if err != nil {
-			errMessage := "failed to fetch block"
-			if !errors.IsNotFoundError(err) {
-				s.logger.WithError(err).WithField("block_number", blockPosition).Error(errMessage)
-			}
-			return nil, errors.ConnectionError(errMessage)
+			errMsg := "failed to fetch block"
+			s.logger.WithField("block", blockPosition).WithError(err).Error(errMsg)
+			return nil, errors.ConnectionError(errMsg)
 		}
 
-		block := &fetchedBlock{block: blck}
-
-		for _, tx := range blck.Transactions() {
+		for _, tx := range block.block.Transactions() {
 			s.logger.WithField("tx_hash", tx.Hash().String()).
-				WithField("block_number", blck.NumberU64()).Debug("found transaction in block")
+				WithField("block_number", block.block.NumberU64()).Trace("found transaction in block")
 		}
 
-		jobMap, err := s.fetchJobs(ctx, blck.Transactions())
+		jobMap, err := s.matchPendingJobs(ctx, block.block.Transactions())
 		if err != nil {
 			return nil, err
 		}
 
 		// TODO: pass batch variable by environment variable
 		batch := 20
-		for i := 0; i < blck.Transactions().Len(); i += batch {
+		for i := 0; i < block.block.Transactions().Len(); i += batch {
 			j := i + batch
-			if j > blck.Transactions().Len() {
-				j = blck.Transactions().Len()
+			if j > block.block.Transactions().Len() {
+				j = block.block.Transactions().Len()
 			}
-			jobs, err := awaitReceipts(s.fetchReceipts(ctx, blck.Transactions()[i:j], jobMap))
+			jobs, err := awaitReceipts(s.fetchReceipts(ctx, block.block.Transactions()[i:j], jobMap))
 			if err != nil {
 				return nil, err
 			}
@@ -348,57 +373,122 @@ func (s *Session) fetchBlock(ctx context.Context, blockPosition uint64) *Future
 	})
 }
 
-func (s *Session) fetchJobs(ctx context.Context, transactions ethtypes.Transactions) (map[string]*entities.Job, error) {
-	jobMap := make(map[string]*entities.Job)
+func (s *Session) matchPendingJobs(ctx context.Context, transactions ethtypes.Transactions) (map[string]*entities.Job, error) {
+	s.pendingJobMapMutex.Lock()
+	defer s.pendingJobMapMutex.Unlock()
 
+	jobMap := make(map[string]*entities.Job)
 	if len(transactions) == 0 {
 		return jobMap, nil
 	}
 
-	for i := 0; i < transactions.Len(); i += MaxTxHashesLength {
-		size := i + MaxTxHashesLength
-		if size > transactions.Len() {
-			size = transactions.Len()
-		}
-		currTransactions := transactions[i:size]
-		var txHashes []string
-		for _, t := range currTransactions {
-			txHashes = append(txHashes, t.Hash().String())
-		}
+	err := s.updatePendingJobs(ctx)
+	if err != nil {
+		return nil, err
+	}
 
-		// By design, we will receive 0 or 1 job per tx_hash in the filter because we filter by status PENDING
-		jobResponses, err := s.client.SearchJob(ctx, &entities.JobFilters{
-			TxHashes:  txHashes,
-			ChainUUID: s.Chain.UUID,
-			Status:    entities.StatusPending,
-		})
-		if err != nil {
-			s.logger.WithError(err).Error("failed to search jobs")
-			return nil, err
+	for idx := range transactions {
+		txHash := transactions[idx].Hash().String()
+		if job, ok := s.pendingJobMap[txHash]; ok {
+			jobMap[txHash] = job
+			delete(s.pendingJobMap, txHash)
 		}
+	}
 
-		for _, jobResponse := range jobResponses {
-			s.logger.WithField("tx_hash", jobResponse.Transaction.Hash).
-				WithField("job", jobResponse.UUID).Debug("transaction was matched to a job")
-
-			// Filter by the jobs belonging to same session CHAIN_UUID
-			jobMap[jobResponse.Transaction.Hash.String()] = &entities.Job{
-				UUID:         jobResponse.UUID,
-				ChainUUID:    jobResponse.ChainUUID,
-				ScheduleUUID: jobResponse.ScheduleUUID,
-				TenantID:     jobResponse.TenantID,
-				OwnerID:      jobResponse.OwnerID,
-				Type:         jobResponse.Type,
-				Labels:       jobResponse.Labels,
-				Transaction:  &jobResponse.Transaction,
-				CreatedAt:    jobResponse.CreatedAt,
-			}
+	return jobMap, nil
+}
+
+func (s *Session) updatePendingJobs(ctx context.Context) error {
+	// On the first run fetch all pending jobs; afterwards only those updated since the last tick
+	jobFilters := &entities.JobFilters{
+		Status:    entities.StatusPending,
+		ChainUUID: s.Chain.UUID,
+	}
+
+	if !s.pendingJobLastCheckedAt.IsZero() {
+		jobFilters.UpdatedAfter = s.pendingJobLastCheckedAt
+	}
+
+	s.pendingJobLastCheckedAt = time.Now()
+
+	s.logger.WithField("updated_after", jobFilters.UpdatedAfter.Format("2006-01-02 15:04:05")).
+		Trace("fetching new pending jobs")
+	// We get all the pending jobs updated_after the last tick
+	jobResponses, err := s.client.SearchJob(ctx, jobFilters)
+	if err != nil {
+		s.logger.WithError(err).Error("failed to fetch pending jobs")
+		return err
+	}
+
+	for _, jobResponse := range jobResponses {
+		// Filter by the jobs belonging to same session CHAIN_UUID
+		s.pendingJobMap[jobResponse.Transaction.Hash.String()] = &entities.Job{
+			UUID:         jobResponse.UUID,
+			ChainUUID:    jobResponse.ChainUUID,
+			ScheduleUUID: jobResponse.ScheduleUUID,
+			TenantID:     jobResponse.TenantID,
+			OwnerID:      jobResponse.OwnerID,
+			Type:         jobResponse.Type,
+			Labels:       jobResponse.Labels,
+			Transaction:  &jobResponse.Transaction,
+			CreatedAt:    jobResponse.CreatedAt,
 		}
 	}
 
-	return jobMap, nil
+	return nil
 }
 
+// func (s *Session) fetchJobs(ctx context.Context, transactions ethtypes.Transactions) (map[string]*entities.Job, error) {
+// 	jobMap := make(map[string]*entities.Job)
+//
+// 	if len(transactions) == 0 {
+// 		return jobMap, nil
+// 	}
+//
+// 	for i := 0; i < transactions.Len(); i += MaxTxHashesLength {
+// 		size := i + MaxTxHashesLength
+// 		if size > transactions.Len() {
+// 			size = transactions.Len()
+// 		}
+// 		currTransactions := transactions[i:size]
+// 		var txHashes []string
+// 		for _, t := range currTransactions {
+// 			txHashes = append(txHashes, t.Hash().String())
+// 		}
+//
+// 		// By design, we will receive 0 or 1 job per tx_hash in the filter because we filter by status PENDING
+// 		jobResponses, err := s.client.SearchJob(ctx, &entities.JobFilters{
+// 			TxHashes:  txHashes,
+// 			ChainUUID: s.Chain.UUID,
+// 			Status:    entities.StatusPending,
+// 		})
+// 		if err != nil {
+// 			s.logger.WithError(err).Error("failed to search jobs")
+// 			return nil, err
+// 		}
+//
+// 		for _, jobResponse := range jobResponses {
+// 			s.logger.WithField("tx_hash", jobResponse.Transaction.Hash).
+// 				WithField("job", jobResponse.UUID).Debug("transaction was matched to a job")
+//
+// 			// Filter by the jobs belonging to same session CHAIN_UUID
+// 			jobMap[jobResponse.Transaction.Hash.String()] = &entities.Job{
+// 				UUID:         jobResponse.UUID,
+// 				ChainUUID:    jobResponse.ChainUUID,
+// 				ScheduleUUID: jobResponse.ScheduleUUID,
+// 				TenantID:     jobResponse.TenantID,
+// 				OwnerID:      jobResponse.OwnerID,
+// 				Type:         jobResponse.Type,
+// 				Labels:       jobResponse.Labels,
+// 				Transaction:  &jobResponse.Transaction,
+// 				CreatedAt:    jobResponse.CreatedAt,
+// 			}
+// 		}
+// 	}
+//
+// 	return jobMap, nil
+// }
+
 func (s *Session) fetchReceipts(ctx context.Context, transactions ethtypes.Transactions, jobMap map[string]*entities.Job) []*Future {
 	var futureJobs []*Future
diff --git a/services/tx-listener/session/manager.go b/services/tx-listener/session/manager.go
index e2cc3fee2..1530922cb 100644
--- a/services/tx-listener/session/manager.go
+++ b/services/tx-listener/session/manager.go
@@ -161,21 +161,23 @@ func (m *Manager) runSession(ctx context.Context, chain *dynamic.Chain) {
 		logger.Info("listener session started")
 		err := sess.session.Run(ctx)
 		m.removeSession(chain.UUID)
-		if err != nil {
+		if err != nil && ctx.Err() == nil {
 			logger.WithError(err).Error("failed to remove session")
 		}
 
-		m.logger.Info("session stopped")
+		m.logger.WithField("chain", chain.UUID).Info("session stopped")
 		m.wg.Done()
 	}()
 }
 
 func (m *Manager) stopSession(ctx context.Context, chain *dynamic.Chain) {
 	logger := m.logger.WithContext(ctx)
-	sess, ok := m.getSession(chain.UUID)
-	if ok {
-		logger.Debug("stopping session")
+	if sess, ok := m.getSession(chain.UUID); ok {
+		logger.WithField("chain", chain.UUID).Debug("stopping session")
 		sess.cancel()
+		return
 	}
+
+	logger.WithField("chain", chain.UUID).Warn("trying to stop a non-existing session")
 }
 
 func (m *Manager) addSession(key string, sess *cancelableSession) {
diff --git a/services/tx-sentry/app.go b/services/tx-sentry/app.go
index 4ac275f20..a278cfa8c 100644
--- a/services/tx-sentry/app.go
+++ b/services/tx-sentry/app.go
@@ -81,7 +81,7 @@ func (sentry *TxSentry) listen(ctx context.Context) error {
 		case t := <-ticker.C:
 			lastCheckedAt := t.Add(-sentry.config.RefreshInterval)
 			sentry.logger.WithField("updated_after", lastCheckedAt.Format("2006-01-02 15:04:05")).
-				Debug("fetching new pending jobs")
+				Trace("fetching new pending jobs")
 			jobFilters.UpdatedAfter = lastCheckedAt
 
 			err := sentry.createSessions(ctx, jobFilters)
diff --git a/services/tx-sentry/config.go b/services/tx-sentry/config.go
index 94237b410..1473ccba8 100644
--- a/services/tx-sentry/config.go
+++ b/services/tx-sentry/config.go
@@ -11,7 +11,7 @@ import (
 const (
 	sentryRefreshIntervalFlag     = "tx-sentry-refresh-interval"
 	sentryRefreshIntervalViperKey = "tx-sentry.refresh-interval"
-	sentryRefreshIntervalDefault  = 1 * time.Second
+	sentryRefreshIntervalDefault  = 5 * time.Second
 	sentryRefreshIntervalEnv      = "TX_SENTRY_REFRESH_INTERVAL"
 )
 
diff --git a/services/tx-sentry/service/listeners/session_manager.go b/services/tx-sentry/service/listeners/session_manager.go
index 7d5371aae..39d3058d1 100644
--- a/services/tx-sentry/service/listeners/session_manager.go
+++ b/services/tx-sentry/service/listeners/session_manager.go
@@ -58,12 +58,12 @@ func (manager *sessionManager) Start(ctx context.Context, job *entities.Job) {
 	ctx = log.With(ctx, logger)
 
 	if manager.hasSession(job.UUID) {
-		logger.Debug("job session already exists, skipping session creation")
+		logger.Trace("job session already exists, skipping session creation")
 		return
 	}
 
 	if job.InternalData.RetryInterval == 0 {
-		logger.Debug("job session has retry strategy disabled")
+		logger.Trace("job session does not have any retry strategy")
 		return
 	}
 
diff --git a/tests/features/sentry_retries.feature b/tests/features/sentry_retries.feature
index 0ac0519c9..cd8bda3c1 100644
--- a/tests/features/sentry_retries.feature
+++ b/tests/features/sentry_retries.feature
@@ -179,7 +179,7 @@ Feature: Send transactions using tx-sentry
       | txTwoUUID | uuid |
     When I send "PUT" request to "{{global.api}}/jobs/{{txTwoUUID}}/start"
     Then the response code should be 202
-    Then I sleep "5s"
+    Then I sleep "10s"
     When I send "GET" request to "{{global.api}}/schedules/{{scheduleUUID}}"
     Then the response code should be 200
     And Response should have the following fields
diff --git a/tests/service/stress/exported.go b/tests/service/stress/exported.go
index 58aa3052c..fef7cd66a 100644
--- a/tests/service/stress/exported.go
+++ b/tests/service/stress/exported.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/consensys/orchestrate/pkg/backoff"
 	"github.com/consensys/orchestrate/pkg/errors"
+	"github.com/consensys/orchestrate/pkg/sdk/client"
 	"github.com/consensys/orchestrate/pkg/toolkit/app"
 	ethclient "github.com/consensys/orchestrate/pkg/toolkit/ethclient/rpc"
 	"github.com/sirupsen/logrus"
@@ -121,6 +122,7 @@ func initComponents(ctx context.Context) {
 		// Initialize Engine
 		func() {
 			engine.Init(ctx)
+			client.Init()
 		},
 		func() {
 			broker.InitSyncProducer(ctx)
diff --git a/tests/service/stress/service.go b/tests/service/stress/service.go
index b50e8b366..a10a23cff 100644
--- a/tests/service/stress/service.go
+++ b/tests/service/stress/service.go
@@ -62,7 +62,7 @@ func NewService(cfg *Config,
 		producer: producer,
 		items: []*workLoadItem{
 			{cfg.Iterations, cfg.Concurrency, "BatchDeployContract", units.BatchDeployContractTest},
-			// {cfg.Iterations, cfg.Concurrency, "SendContractTxsTest", units.SendContractTxsTest},
+			{cfg.Iterations, cfg.Concurrency, "SendContractTxsTest", units.SendContractTxsTest},
 			// {cfg.Iterations, cfg.Concurrency, "BatchPrivateTxsTest", units.BatchPrivateTxsTest},
 		},
 	}
@@ -135,8 +135,8 @@ func (c *WorkLoadService) preRun(ctx context.Context) (context.Context, error) {
 	for idx := 0; idx < nBesuNodes; idx++ {
 		besuNode := c.cfg.gData.Nodes.Besu[idx]
 		chainName := fmt.Sprintf("besu_%d-%s", idx, utils2.RandString(5))
-		var cUUID string
-		ctx, cUUID, err = assets.RegisterNewChain(ctx, c.client, c.ec, proxyHost, chainName, &besuNode)
+		// var cUUID string
+		ctx, _, err = assets.RegisterNewChain(ctx, c.client, c.ec, proxyHost, chainName, &besuNode)
 		if err != nil {
 			return ctx, err
 		}
@@ -149,13 +149,13 @@ func (c *WorkLoadService) preRun(ctx context.Context) (context.Context, error) {
 			}
 		}
 
-		for jdx := 0; jdx < nPrivGroupPerChain; jdx++ {
-			ctx, err = assets.CreatePrivateGroup(ctx, c.ec, utils2.GetProxyURL(proxyHost, cUUID), besuNode.PrivateAddress,
-				utils2.RandShuffle(privNodeAddress))
-			if err != nil {
-				return ctx, err
-			}
-		}
+		// for jdx := 0; jdx < nPrivGroupPerChain; jdx++ {
+		// 	ctx, err = assets.CreatePrivateGroup(ctx, c.ec, utils2.GetProxyURL(proxyHost, cUUID), besuNode.PrivateAddress,
+		// 		utils2.RandShuffle(privNodeAddress))
+		// 	if err != nil {
+		// 		return ctx, err
+		// 	}
+		// }
 	}
 
 	return ctx, nil
diff --git a/tests/service/stress/units/testDeployContract.go b/tests/service/stress/units/testDeployContract.go
index 1719b7256..ccb7494b5 100644
--- a/tests/service/stress/units/testDeployContract.go
+++ b/tests/service/stress/units/testDeployContract.go
@@ -2,15 +2,16 @@ package units
 
 import (
 	"context"
-
 	"encoding/json"
 
 	"github.com/consensys/orchestrate/pkg/errors"
 	orchestrateclient "github.com/consensys/orchestrate/pkg/sdk/client"
+	clientutils "github.com/consensys/orchestrate/pkg/toolkit/app/http/client-utils"
 	"github.com/consensys/orchestrate/pkg/toolkit/app/log"
 	"github.com/consensys/orchestrate/pkg/types/api"
 	"github.com/consensys/orchestrate/pkg/types/tx"
 	"github.com/consensys/orchestrate/pkg/utils"
+	"github.com/consensys/orchestrate/services/api/service/controllers"
 	utils2 "github.com/consensys/orchestrate/tests/service/stress/utils"
 	utils3 "github.com/consensys/orchestrate/tests/utils"
 	"github.com/consensys/orchestrate/tests/utils/chanregistry"
@@ -39,7 +40,9 @@ func BatchDeployContractTest(ctx context.Context, cfg *WorkloadConfig, client or
 	sReq, _ := json.Marshal(req)
 	logger = logger.WithField("chain", req.ChainName).WithField("idem", idempotency)
 
-	_, err := client.SendDeployTransaction(ctx, req)
+	_, err := client.SendDeployTransaction(context.WithValue(ctx, clientutils.RequestHeaderKey, map[string]string{
+		controllers.IdempotencyKeyHeader: idempotency,
+	}), req)
 	if err != nil {
 		if !errors.IsConnectionError(err) {
diff --git a/tests/service/stress/units/testSendContractTxs.go b/tests/service/stress/units/testSendContractTxs.go
index 5b37709f4..db20474ea 100644
--- a/tests/service/stress/units/testSendContractTxs.go
+++ b/tests/service/stress/units/testSendContractTxs.go
@@ -6,10 +6,12 @@ import (
 
 	"github.com/consensys/orchestrate/pkg/errors"
 	orchestrateclient "github.com/consensys/orchestrate/pkg/sdk/client"
+	clientutils "github.com/consensys/orchestrate/pkg/toolkit/app/http/client-utils"
 	"github.com/consensys/orchestrate/pkg/toolkit/app/log"
 	"github.com/consensys/orchestrate/pkg/types/api"
 	"github.com/consensys/orchestrate/pkg/types/tx"
 	"github.com/consensys/orchestrate/pkg/utils"
+	"github.com/consensys/orchestrate/services/api/service/controllers"
 	utils2 "github.com/consensys/orchestrate/tests/service/stress/utils"
 	utils3 "github.com/consensys/orchestrate/tests/utils"
 	"github.com/consensys/orchestrate/tests/utils/chanregistry"
@@ -25,7 +27,7 @@ func SendContractTxsTest(ctx context.Context, cfg *WorkloadConfig, client orches
 	t := utils2.NewEnvelopeTracker(chanReg, evlp, idempotency)
 
 	// @TODO Read values from configuration or from context
-	toAddr := ethcommon.HexToAddress("0xFf80849F797a5feBC96F1737dc78135a79DaF83E")
+	toAddr := ethcommon.HexToAddress("0xa4470694BC2133f9cA7Ab71D3aC55870c560abfC")
 	req := &api.SendTransactionRequest{
 		ChainName: cfg.chains[nChain].Name,
 		Params: api.TransactionParams{
@@ -42,7 +44,9 @@ func SendContractTxsTest(ctx context.Context, cfg *WorkloadConfig, client orches
 	sReq, _ := json.Marshal(req)
 	logger = logger.WithField("chain", req.ChainName).WithField("idem", idempotency)
 
-	_, err := client.SendContractTransaction(ctx, req)
+	_, err := client.SendContractTransaction(context.WithValue(ctx, clientutils.RequestHeaderKey, map[string]string{
+		controllers.IdempotencyKeyHeader: idempotency,
+	}), req)
 	if err != nil {
 		if !errors.IsConnectionError(err) {
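
Reviewer note (not part of the patch) — pending-job matching: the core of this change is that the tx-listener no longer calls SearchJob once per batch of tx hashes for every block. It now keeps an in-memory map of pending jobs keyed by transaction hash, refreshes it at most once per new block via the UpdatedAfter filter, and matches mined transactions locally. Below is a minimal, self-contained sketch of that bookkeeping under simplified assumptions: the Job struct and the searchPendingJobs callback are illustrative stand-ins, not the real entities.Job type or Orchestrate SDK client.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Job is a simplified stand-in for the real entities.Job.
type Job struct {
	UUID   string
	TxHash string
}

// searchPendingJobs stands in for client.SearchJob filtered by
// status=PENDING, the session chain UUID and updated_after.
type searchPendingJobs func(ctx context.Context, updatedAfter time.Time) ([]*Job, error)

// pendingJobTracker mirrors the session's pendingJobMap bookkeeping:
// one API call per new block, then purely local matching.
type pendingJobTracker struct {
	mu            sync.Mutex
	pending       map[string]*Job // keyed by tx hash
	lastCheckedAt time.Time
	search        searchPendingJobs
}

func newPendingJobTracker(search searchPendingJobs) *pendingJobTracker {
	return &pendingJobTracker{pending: make(map[string]*Job), search: search}
}

// match refreshes the map with jobs updated since the last tick (all
// pending jobs on the first call) and returns those whose tx hashes
// appear in the new block, removing them from the pending set.
func (t *pendingJobTracker) match(ctx context.Context, blockTxHashes []string) (map[string]*Job, error) {
	t.mu.Lock()
	defer t.mu.Unlock()

	updatedAfter := t.lastCheckedAt // zero value on first call => fetch everything
	t.lastCheckedAt = time.Now()

	jobs, err := t.search(ctx, updatedAfter) // single API call per block
	if err != nil {
		return nil, err
	}
	for _, j := range jobs {
		t.pending[j.TxHash] = j
	}

	matched := make(map[string]*Job)
	for _, h := range blockTxHashes {
		if j, ok := t.pending[h]; ok {
			matched[h] = j
			delete(t.pending, h) // mined, no longer pending
		}
	}
	return matched, nil
}

func main() {
	tracker := newPendingJobTracker(func(_ context.Context, _ time.Time) ([]*Job, error) {
		return []*Job{{UUID: "job-1", TxHash: "0xabc"}}, nil
	})
	matched, _ := tracker.match(context.Background(), []string{"0xabc", "0xdef"})
	fmt.Println(len(matched)) // prints 1: only 0xabc had a pending job
}

With the old fetchJobs, a 1000-transaction block cost ten SearchJob calls (100 hashes each); with this scheme it costs one call regardless of block size, which is where the reduced load on orchestrate-api comes from.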
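Reviewer note (not part of the patch) — "fixed retry on 'not found block'": in session.go the BlockByNumber call is now wrapped in backoff.RetryNotify, so a block that was announced at the tip but is not retrievable yet is retried on a constant one-second backoff (up to three retries), while any other error is made permanent. The same pattern in isolation, assuming github.com/cenkalti/backoff/v4 is the library behind the RetryNotify/WithMaxRetries/Permanent calls used in the diff; the fetchBlock stub and errNotFound sentinel are illustrative only.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

var errNotFound = errors.New("block not found")

// fetchBlock stands in for ec.BlockByNumber; it fails twice before
// succeeding, emulating a node that still lags behind the announced tip.
func fetchBlock(attempt *int) error {
	*attempt++
	if *attempt < 3 {
		return errNotFound
	}
	return nil
}

func main() {
	attempt := 0
	err := backoff.RetryNotify(
		func() error {
			err := fetchBlock(&attempt)
			if errors.Is(err, errNotFound) {
				return err // transient: the block may simply not be available yet
			}
			if err != nil {
				return backoff.Permanent(err) // anything else: stop retrying
			}
			return nil
		},
		backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3),
		func(err error, d time.Duration) {
			fmt.Printf("error fetching block (%v), retrying in %s...\n", err, d)
		},
	)
	fmt.Println("result:", err) // nil after two retries
}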