Skip to content

Commit

Permalink
Tx listener optimizations and enhancements (#1018)
Browse files Browse the repository at this point in the history
* fetch pending jobs on new block instead of querying per tx_hash

* trigger only on new block

* fixed retry on 'not found block' | fixed pending jobs

* small adjusment on e2e

* no caching latest blocks

* improve proxy logging logic

* add index on idempotency-key

* update changelog

* improve stress tests

* fix unit testes

* rewrite change log
  • Loading branch information
ggarri authored May 24, 2022
1 parent 29dc660 commit b9eec15
Show file tree
Hide file tree
Showing 26 changed files with 266 additions and 156 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ jobs:
STRESS_ITERATIONS: <<parameters.stress_iteration>>
STRESS_TIMEOUT: <<parameters.stress_timeout>>
KAFKA_CONSUMER_GROUP_NAME: "stress"
command: ./build/bin/test stress
command: make stress-ci

run-performance:
docker:
Expand Down
13 changes: 7 additions & 6 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,19 @@ linters:

issues:
max-same-issues: 10
exclude-use-default: true

run:
deadline: 20m
skip-dirs:
- .gocache
- /pkg/go-ethereum
- /build
- /public
- pkg/go-ethereum
- build
- public
- mock
- /scripts
- /vendor
- /tests
- scripts
- vendor
- tests
skip-files:
- ".*\\.pb(\\.gw)?\\.go$"
- pkg/toolkit/app/http/handler/dashboard/genstatic/gen.go
Expand Down
10 changes: 7 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# Orchestrate Release Notes

## v21.12.7 (WIP)
## v21.12.7 (2022-05-24)
### 🛠 Bug fixes
* Reduced `tx-listener` service demands on `api` by usage of an optional in-memory cache. This is an optional feature
which can be enabled using `API_CACHE_TTL` environment variable.
* Stop printing chain-proxy access logs errors when `ACCESSLOG_ENABLED=false`.

### 🛠 Enhancements
* Reduced `tx-listener` services request to `orchestrate-api` can be reduced by usage of an
optional in-memory cache. To be enabled set a duration using `API_CACHE_TTL` environment variable.
* Reduced database I/O usage by ~60%.

## v21.12.6 (2022-05-04)
### 🛠 Bug fixes
Expand Down
13 changes: 3 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ ifeq ($(UNAME_S),Darwin)
VEGETA_BIN_URL = https://github.com/tsenart/vegeta/releases/download/v12.8.4/vegeta_12.8.4_darwin_amd64.tar.gz
endif

ifneq (,$(wildcard ./.env))
include .env
export
endif

.PHONY: all run-coverage coverage fmt fmt-check vet lint misspell-check misspell race tools help

networks:
Expand Down Expand Up @@ -77,7 +72,7 @@ e2e: run-e2e
@$(OPEN) build/report/report.html 2>/dev/null

run-stress: gobuild-stress
@./build/bin/test stress
@docker-compose -f docker-compose.e2e.yml up -V stress

e2e: run-e2e
@docker-compose -f docker-compose.e2e.yml up --build report
Expand All @@ -92,8 +87,6 @@ e2e-ci: gobuild-e2e
deploy-remote-env:
@bash ./scripts/deploy-remote-env.sh

stress: run-stress
@exit $(docker inspect orchestrate_stress_1 --format='{{.State.ExitCode}}')

stress-ci:
@docker-compose -f docker-compose.dev.yml up stress
Expand Down Expand Up @@ -172,10 +165,10 @@ bootstrap-deps: bootstrap ## Wait for dependencies to be ready
@bash scripts/bootstrap-deps.sh

gobuild-e2e: ## Build Orchestrate e2e Docker image
@CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./build/bin/test ./tests/cmd
@CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./build/bin/e2e ./tests/cmd

gobuild-stress: ## Build Orchestrate stress binary
@CGO_ENABLED=0 go build -o ./build/bin/test ./tests/stress/cmd
@CGO_ENABLED=0 go build -o ./build/bin/stress ./tests/cmd

orchestrate: gobuild ## Start Orchestrate
@docker-compose -f docker-compose.dev.yml up --force-recreate --build -d $(ORCH_SERVICES)
Expand Down
12 changes: 10 additions & 2 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ x-default-variables: &default-variables

x-container-common: &container-common
image: golang:1.16.9
restart: ${CONTAINER_RESTART-on-failure}
# restart: ${CONTAINER_RESTART-on-failure}
entrypoint: /bin/main
tty: true
networks:
Expand Down Expand Up @@ -136,13 +136,21 @@ services:
<<: *container-common
ports:
- 8082:8082
# - 2345:2345
environment:
<<: *default-variables
<<: *tx-listener-common
# volumes:
# - ./build/bin/orchestrate:/bin/main:ro
# - /usr/local/bin/dlv:/usr/bin/dlv
depends_on:
- api
command: tx-listener run

# entrypoint:
# - sh
# - -c
# - |
# dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec /bin/main tx-listener run
networks:
orchestrate:
external:
Expand Down
40 changes: 23 additions & 17 deletions docker-compose.e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,35 @@ x-container-common: &container-common
- orchestrate
tty: true

x-e2e-variables: &e2e-variables
CUCUMBER_OUTPUTPATH: "/report/output/report.json"
CUCUMBER_PATHS: "/features"
ARTIFACTS_PATH: "/artifacts"
CUCUMBER_FORMAT: "cucumber"
CUCUMBER_CONCURRENCY: ${CUCUMBER_CONCURRENCY-}
CUCUMBER_TAGS: ${CUCUMBER_TAGS-}
TEST_GLOBAL_DATA: ${TEST_GLOBAL_DATA-}
CUCUMBER_STEPS_TIMEOUT: ${CUCUMBER_STEPS_TIMEOUT-30s}
KAFKA_CONSUMER_GROUP_NAME: ${KAFKA_CONSUMER_GROUP_NAME-e2e}

x-stress-variables: &stress-variables
ARTIFACTS_PATH: "/artifacts"
STRESS_CONCURRENCY: ${STRESS_CONCURRENCY-30}
STRESS_ITERATIONS: ${STRESS_ITERATIONS-500}
STRESS_TIMEOUT: ${STRESS_TIMEOUT-15m}
TEST_GLOBAL_DATA: ${TEST_GLOBAL_DATA-}
KAFKA_CONSUMER_GROUP_NAME: ${KAFKA_CONSUMER_GROUP_NAME-stress}

services:
e2e:
<<: *container-common
environment:
<<: *default-variables
CUCUMBER_OUTPUTPATH: "/report/output/report.json"
CUCUMBER_PATHS: "/features"
ARTIFACTS_PATH: "/artifacts"
CUCUMBER_FORMAT: "cucumber"
CUCUMBER_CONCURRENCY: ${CUCUMBER_CONCURRENCY-}
CUCUMBER_TAGS: ${CUCUMBER_TAGS-}
TEST_GLOBAL_DATA: ${TEST_GLOBAL_DATA-}
CUCUMBER_STEPS_TIMEOUT: ${CUCUMBER_STEPS_TIMEOUT-30s}
KAFKA_CONSUMER_GROUP_NAME: ${KAFKA_CONSUMER_GROUP_NAME-e2e}
<<: *e2e-variables
restart: "no"
command: e2e
volumes:
- ./build/bin/test:/bin/main
- ./build/bin/e2e:/bin/main
- ./build/report:/report/output
- ./tests/features:/features
- ./tests/artifacts:/artifacts
Expand All @@ -72,16 +83,11 @@ services:
<<: *container-common
environment:
<<: *default-variables
ARTIFACTS_PATH: "/artifacts"
STRESS_CONCURRENCY: "15"
STRESS_ITERATIONS: "100"
STRESS_TIMEOUT: "5m"
TEST_GLOBAL_DATA: ${TEST_GLOBAL_DATA-}
KAFKA_CONSUMER_GROUP_NAME: ${KAFKA_CONSUMER_GROUP_NAME-stress}
<<: *stress-variables
restart: "no"
command: stress
volumes:
- ./build/bin/test:/bin/main
- ./build/bin/stress:/bin/main
- ./tests/artifacts:/artifacts

networks:
Expand Down
2 changes: 1 addition & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func EthereumError(format string, a ...interface{}) *ierror.Error {
return Errorf(Ethereum, format, a...)
}

// IsEthereumError indicate whether an error is an Etehreum error
// IsEthereumError indicate whether an error is an Ethereum error
func IsEthereumError(err error) bool {
return isErrorClass(FromError(err).GetCode(), Ethereum)
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/toolkit/app/http/config/dynamic/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ func FromTraefikMiddleware(middleware *traefikdynamic.Middleware) *Middleware {
}
}

func ToTraefikMiddleware(middleware *Middleware) *traefikdynamic.Middleware {
return middleware.Middleware
}

// +k8s:deepcopy-gen=true

type Auth struct{}
Expand Down
1 change: 0 additions & 1 deletion pkg/toolkit/ethclient/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ func processBlockResult(header **ethtypes.Header, body **Body) ParseResultFunc {
}

if len(raw) == 0 {
// Block was not found
return errors.NotFoundError("block not found")
}

Expand Down
2 changes: 1 addition & 1 deletion services/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func NewAPI(
apiHandlerOpt,
httpCacheOpt,
reverseProxyOpt,
app.ProviderOpt(NewProvider(ucs.SearchChains(), time.Second, cfg.Proxy.ProxyCacheTTL)),
app.ProviderOpt(NewProvider(ucs.SearchChains(), time.Second, cfg.Proxy.ProxyCacheTTL, cfg.App.HTTP.AccessLog)),
)
}

Expand Down
4 changes: 2 additions & 2 deletions services/api/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ const (
InternalProvider = "internal"
)

func NewProvider(searchChains usecases.SearchChainsUseCase, refresh time.Duration, proxyCacheTTL *time.Duration) provider.Provider {
func NewProvider(searchChains usecases.SearchChainsUseCase, refresh time.Duration, proxyCacheTTL *time.Duration, accessLog bool) provider.Provider {
prvdr := aggregator.New()
prvdr.AddProvider(NewInternalProvider())
prvdr.AddProvider(proxy.NewChainsProxyProvider(searchChains, refresh, proxyCacheTTL))
prvdr.AddProvider(proxy.NewChainsProxyProvider(searchChains, refresh, proxyCacheTTL, accessLog))
return prvdr

}
Expand Down
2 changes: 1 addition & 1 deletion services/api/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestNewChainsProxyConfig(t *testing.T) {
}

for i, test := range testSet {
cfg := proxy.NewProxyConfig(test.chains, nil)
cfg := proxy.NewProxyConfig(test.chains, nil, true)
expectedCfg := test.expectedCfg(dynamic.NewConfig())
assert.Equal(t, expectedCfg, cfg, "Chain-registry - Store (%d/%d): expected %v but got %v", i+1, len(testSet), expectedCfg, cfg)
}
Expand Down
2 changes: 1 addition & 1 deletion services/api/proxy/http-cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func HTTPCacheRequest(ctx context.Context, req *http.Request) (c bool, k string,

cacheKey := fmt.Sprintf("%s(%s)", msg.Method, string(msg.Params))
if msg.Method == "eth_getBlockByNumber" && strings.Contains(string(msg.Params), "latest") {
return true, cacheKey, time.Second, nil
return false, "", 0, nil
}

return true, cacheKey, 0, nil
Expand Down
8 changes: 3 additions & 5 deletions services/api/proxy/http-cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestHTTPCacheRequest_Valid(t *testing.T) {
assert.Equal(t, "eth_getTransactionReceipt([\"0x7d231ca6a5fc03f5365b3d62dcfe372ed5c13ac7014d016b52ed72094919556c\"])", k)
}

func TestHTTPCacheRequest_ValidWithCustomTTL(t *testing.T) {
func TestHTTPCacheRequest_IgnoreLatestBlock(t *testing.T) {
msg := ethclient.JSONRpcMessage{
Method: "eth_getBlockByNumber",
}
Expand All @@ -39,11 +39,9 @@ func TestHTTPCacheRequest_ValidWithCustomTTL(t *testing.T) {
body, _ := json.Marshal(msg)
req := httptest.NewRequest(http.MethodPost, "http://localhost", bytes.NewReader(body))

c, k, ttl, err := HTTPCacheRequest(req.Context(), req)
c, _,_, err := HTTPCacheRequest(req.Context(), req)
assert.NoError(t, err)
assert.True(t, c)
assert.Equal(t, time.Second, ttl)
assert.Equal(t, "eth_getBlockByNumber([\"latest\"])", k)
assert.False(t, c)
}

func TestHTTPCacheRequest_IgnoreReqType(t *testing.T) {
Expand Down
16 changes: 8 additions & 8 deletions services/api/proxy/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ import (

const ChainsProxyProvider = "chains-proxy"

func NewChainsProxyProvider(searchChains usecases.SearchChainsUseCase, refresh time.Duration, proxyCacheTTL *time.Duration) provider.Provider {
func NewChainsProxyProvider(searchChains usecases.SearchChainsUseCase, refresh time.Duration, proxyCacheTTL *time.Duration, accessLog bool) provider.Provider {
poller := func(ctx context.Context) (provider.Message, error) {
// Wildcard user including chains owned by individual users (Special rights)
chains, err := searchChains.Execute(ctx, &entities.ChainFilters{}, multitenancy.NewInternalAdminUser())
if err != nil {
return nil, err
}

return dynamic.NewMessage(ChainsProxyProvider, NewProxyConfig(chains, proxyCacheTTL)), nil
return dynamic.NewMessage(ChainsProxyProvider, NewProxyConfig(chains, proxyCacheTTL, accessLog)), nil
}

return poll.New(poller, refresh)
}

func NewProxyConfig(chains []*entities.Chain, proxyCacheTTL *time.Duration) *dynamic.Configuration {
func NewProxyConfig(chains []*entities.Chain, proxyCacheTTL *time.Duration, accessLog bool) *dynamic.Configuration {
cfg := dynamic.NewConfig()

for _, chain := range chains {
Expand All @@ -44,13 +44,13 @@ func NewProxyConfig(chains []*entities.Chain, proxyCacheTTL *time.Duration) *dyn
multitenancyMid = fmt.Sprintf("auth-%v:%s@multitenancy", chain.TenantID, chain.OwnerID)
}

middlewares := []string{
"chain-proxy-accesslog@internal",
"auth@multitenancy",
multitenancyMid,
"strip-path@internal",
middlewares := []string{}
if accessLog {
middlewares = append(middlewares, "chain-proxy-accesslog@internal")
}

middlewares = append(middlewares, "auth@multitenancy", multitenancyMid, "strip-path@internal")

cfg.HTTP.Middlewares[multitenancyMid] = &dynamic.Middleware{
MultiTenancy: &dynamic.MultiTenancy{
Tenant: chain.TenantID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func upgradeRemoveFkeysContracts(db migrations.DB) error {
log.Debug("Applying improvements contract tables...")
log.Debug("Applying improvements contract tables and add index on idempotency-key...")
_, err := db.Exec(`
CREATE UNIQUE INDEX tags_repository_name_idx ON tags (repository_id, (lower(name)));
ALTER TABLE repositories drop constraint repositories_name_key;
Expand All @@ -15,17 +15,19 @@ CREATE UNIQUE INDEX repositories_name_idx ON repositories ((lower(name)));
ALTER TABLE transactions
ADD COLUMN contract_name TEXT,
ADD COLUMN contract_tag TEXT;
CREATE INDEX transactions_hash_idem_idx on transaction_requests (idempotency_key);
`)
if err != nil {
return err
}
log.Info("Applied improvements on contract tables successfully")
log.Info("Applied improvements on contract tables and added index on idempotency-key successfully")

return nil
}

func downgradeRemoveFkeysContracts(db migrations.DB) error {
log.Debug("Undoing improvements on contract tables...")
log.Debug("Undoing improvements on contract tables and removing index on idempotency-key...")
_, err := db.Exec(`
ALTER TABLE transactions
DROP COLUMN contract_name,
Expand All @@ -34,11 +36,13 @@ ALTER TABLE transactions
DROP INDEX repositories_name_idx;
ALTER TABLE repositories ADD CONSTRAINT repositories_name_key UNIQUE (name);
DROP INDEX tags_repository_name_idx;
DROP INDEX transactions_hash_idem_idx;
`)
if err != nil {
return err
}
log.Info("Downgraded improvements on contract tables")
log.Info("Downgraded improvements on contract tables and removed index on idempotency-key")

return nil
}
Expand Down
Loading

0 comments on commit b9eec15

Please sign in to comment.