Skip to content

Commit

Permalink
Execute single multi writing query instead of multiple single queries (
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman authored Jul 18, 2024
1 parent f3879f1 commit 796e6c9
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 145 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@v3
- name: Lint
uses: golangci/golangci-lint-action@v4.0.0
uses: golangci/golangci-lint-action@v6.0.1
with:
args: --timeout 10m --verbose
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ linters:
- gosec # Security problems
- gci
- misspell # Misspelled English words in comments
- gomnd
- mnd
- gofmt # Whether the code was gofmt-ed
- goimports # Unused imports
- revive
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ build-docker-nc: ## Builds a docker image with the node binary - but without bui

.PHONY: install-linter
install-linter: ## Installs the linter
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $$(go env GOPATH)/bin v1.52.2
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $$(go env GOPATH)/bin v1.59.1

.PHONY: lint
lint: ## Runs the linter
Expand Down
136 changes: 70 additions & 66 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"strings"

"github.com/0xPolygon/cdk-data-availability/types"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -22,27 +23,9 @@ const (
// getLastProcessedBlockSQL is a query that returns the last processed block for a given task
getLastProcessedBlockSQL = `SELECT block FROM data_node.sync_tasks WHERE task = $1;`

// storeUnresolvedBatchesSQL is a query that stores unresolved batches in the database
storeUnresolvedBatchesSQL = `
INSERT INTO data_node.unresolved_batches (num, hash)
VALUES ($1, $2)
ON CONFLICT (num, hash) DO NOTHING;
`

// getUnresolvedBatchKeysSQL is a query that returns the unresolved batch keys from the database
getUnresolvedBatchKeysSQL = `SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;`

// deleteUnresolvedBatchKeysSQL is a query that deletes the unresolved batch keys from the database
deleteUnresolvedBatchKeysSQL = `DELETE FROM data_node.unresolved_batches WHERE num = $1 AND hash = $2;`

// storeOffChainDataSQL is a query that stores offchain data in the database
storeOffChainDataSQL = `
INSERT INTO data_node.offchain_data (key, value, batch_num)
VALUES ($1, $2, $3)
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num;
`

// getOffchainDataSQL is a query that returns the offchain data for a given key
getOffchainDataSQL = `
SELECT key, value, batch_num
Expand Down Expand Up @@ -175,27 +158,17 @@ func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64,

// StoreUnresolvedBatchKeys stores unresolved batch keys in the database
func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error {
tx, err := db.pg.BeginTxx(ctx, nil)
if err != nil {
return err
}

stmt, err := tx.PreparexContext(ctx, storeUnresolvedBatchesSQL)
if err != nil {
return err
if len(bks) == 0 {
return nil
}

for _, bk := range bks {
if _, err = stmt.ExecContext(ctx, bk.Number, bk.Hash.Hex()); err != nil {
if txErr := tx.Rollback(); txErr != nil {
return fmt.Errorf("%v: rollback caused by %v", txErr, err)
}
query, args := buildBatchKeysInsertQuery(bks)

return err
}
if _, err := db.pg.ExecContext(ctx, query, args...); err != nil {
return fmt.Errorf("failed to store unresolved batches: %w", err)
}

return tx.Commit()
return nil
}

// GetUnresolvedBatchKeys returns the unresolved batch keys from the database
Expand Down Expand Up @@ -230,52 +203,43 @@ func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types

// DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database
func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error {
tx, err := db.pg.BeginTxx(ctx, nil)
if err != nil {
return err
if len(bks) == 0 {
return nil
}

stmt, err := tx.PreparexContext(ctx, deleteUnresolvedBatchKeysSQL)
if err != nil {
return err
const columnsAffected = 2

args := make([]interface{}, len(bks)*columnsAffected)
values := make([]string, len(bks))
for i, bk := range bks {
values[i] = fmt.Sprintf("($%d, $%d)", i*columnsAffected+1, i*columnsAffected+2) //nolint:mnd
args[i*columnsAffected] = bk.Number
args[i*columnsAffected+1] = bk.Hash.Hex()
}

for _, bk := range bks {
if _, err = stmt.ExecContext(ctx, bk.Number, bk.Hash.Hex()); err != nil {
if txErr := tx.Rollback(); txErr != nil {
return fmt.Errorf("%v: rollback caused by %v", txErr, err)
}
query := fmt.Sprintf(`
DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (%s);
`, strings.Join(values, ","))

return err
}
if _, err := db.pg.ExecContext(ctx, query, args...); err != nil {
return fmt.Errorf("failed to delete unresolved batches: %w", err)
}

return tx.Commit()
return nil
}

// StoreOffChainData stores and array of key values in the Db
func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData) error {
tx, err := db.pg.BeginTxx(ctx, nil)
if err != nil {
return err
func (db *pgDB) StoreOffChainData(ctx context.Context, ods []types.OffChainData) error {
if len(ods) == 0 {
return nil
}

stmt, err := tx.PreparexContext(ctx, storeOffChainDataSQL)
if err != nil {
return err
}

for _, d := range od {
if _, err = stmt.ExecContext(ctx, d.Key.Hex(), common.Bytes2Hex(d.Value), d.BatchNum); err != nil {
if txErr := tx.Rollback(); txErr != nil {
return fmt.Errorf("%v: rollback caused by %v", txErr, err)
}

return err
}
query, args := buildOffchainDataInsertQuery(ods)
if _, err := db.pg.ExecContext(ctx, query, args...); err != nil {
return fmt.Errorf("failed to store offchain data: %w", err)
}

return tx.Commit()
return nil
}

// GetOffChainData returns the value identified by the key
Expand Down Expand Up @@ -386,3 +350,43 @@ func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64,

return gaps, nil
}

// buildBatchKeysInsertQuery builds the query to insert unresolved batch keys
func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) {
const columnsAffected = 2

args := make([]interface{}, len(bks)*columnsAffected)
values := make([]string, len(bks))
for i, bk := range bks {
values[i] = fmt.Sprintf("($%d, $%d)", i*columnsAffected+1, i*columnsAffected+2) //nolint:mnd
args[i*columnsAffected] = bk.Number
args[i*columnsAffected+1] = bk.Hash.Hex()
}

return fmt.Sprintf(`
INSERT INTO data_node.unresolved_batches (num, hash)
VALUES %s
ON CONFLICT (num, hash) DO NOTHING;
`, strings.Join(values, ",")), args
}

// buildOffchainDataInsertQuery builds the query to insert offchain data
func buildOffchainDataInsertQuery(ods []types.OffChainData) (string, []interface{}) {
const columnsAffected = 3

args := make([]interface{}, len(ods)*columnsAffected)
values := make([]string, len(ods))
for i, od := range ods {
values[i] = fmt.Sprintf("($%d, $%d, $%d)", i*columnsAffected+1, i*columnsAffected+2, i*columnsAffected+3) //nolint:mnd
args[i*columnsAffected] = od.Key.Hex()
args[i*columnsAffected+1] = common.Bytes2Hex(od.Value)
args[i*columnsAffected+2] = od.BatchNum
}

return fmt.Sprintf(`
INSERT INTO data_node.offchain_data (key, value, batch_num)
VALUES %s
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num;
`, strings.Join(values, ",")), args
}
Loading

0 comments on commit 796e6c9

Please sign in to comment.