Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
rachit77 committed Nov 26, 2024
1 parent e17fe08 commit 7291aeb
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 653 deletions.
6 changes: 0 additions & 6 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,3 @@ packages:
SequencerTracker:
config:
filename: sequencer_tracker.generated.go
github.com/0xPolygon/cdk-data-availability/services/status:
config:
interfaces:
GapsDetector:
config:
filename: gaps_detector.generated.go
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func start(cliCtx *cli.Context) error {
[]rpc.Service{
{
Name: status.APISTATUS,
Service: status.NewEndpoints(storage, batchSynchronizer),
Service: status.NewEndpoints(storage),
},
{
Name: sync.APISYNC,
Expand Down
87 changes: 14 additions & 73 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,20 @@ const (

// getOffchainDataSQL is a query that returns the offchain data for a given key
getOffchainDataSQL = `
SELECT key, value, batch_num
SELECT key, value
FROM data_node.offchain_data
WHERE key = $1 LIMIT 1;
`

// listOffchainDataSQL is a query that returns the offchain data for a given list of keys
listOffchainDataSQL = `
SELECT key, value, batch_num
SELECT key, value
FROM data_node.offchain_data
WHERE key IN (?);
`

// countOffchainDataSQL is a query that returns the count of rows in the offchain_data table
countOffchainDataSQL = "SELECT COUNT(*) FROM data_node.offchain_data;"

// selectOffchainDataGapsSQL is a query that returns the gaps in the offchain_data table
selectOffchainDataGapsSQL = `
WITH numbered_batches AS (
SELECT
batch_num,
ROW_NUMBER() OVER (ORDER BY batch_num) AS row_number
FROM data_node.offchain_data
)
SELECT
nb1.batch_num AS current_batch_num,
nb2.batch_num AS next_batch_num
FROM
numbered_batches nb1
LEFT JOIN numbered_batches nb2 ON nb1.row_number = nb2.row_number - 1
WHERE
nb1.batch_num IS NOT NULL
AND nb2.batch_num IS NOT NULL
AND nb1.batch_num + 1 <> nb2.batch_num;`
)

var (
Expand All @@ -81,7 +62,6 @@ type DB interface {
ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error)
StoreOffChainData(ctx context.Context, od []types.OffChainData) error
CountOffchainData(ctx context.Context) (uint64, error)
DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error)
}

// DB is the database layer of the data node
Expand All @@ -93,7 +73,6 @@ type pgDB struct {
getUnresolvedBatchKeysStmt *sqlx.Stmt
getOffChainDataStmt *sqlx.Stmt
countOffChainDataStmt *sqlx.Stmt
detectOffChainDataGapsStmt *sqlx.Stmt
}

// New instantiates a DB
Expand Down Expand Up @@ -123,19 +102,13 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) {
return nil, fmt.Errorf("failed to prepare the count offchain data statement: %w", err)
}

detectOffChainDataGapsStmt, err := pg.PreparexContext(ctx, selectOffchainDataGapsSQL)
if err != nil {
return nil, fmt.Errorf("failed to prepare the detect offchain data gaps statement: %w", err)
}

return &pgDB{
pg: pg,
storeLastProcessedBlockStmt: storeLastProcessedBlockStmt,
getLastProcessedBlockStmt: getLastProcessedBlockStmt,
getUnresolvedBatchKeysStmt: getUnresolvedBatchKeysStmt,
getOffChainDataStmt: getOffChainDataStmt,
countOffChainDataStmt: countOffChainDataStmt,
detectOffChainDataGapsStmt: detectOffChainDataGapsStmt,
}, nil
}

Expand Down Expand Up @@ -245,9 +218,8 @@ func (db *pgDB) StoreOffChainData(ctx context.Context, ods []types.OffChainData)
// GetOffChainData returns the value identified by the key
func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) {
data := struct {
Key string `db:"key"`
Value string `db:"value"`
BatchNum uint64 `db:"batch_num"`
Key string `db:"key"`
Value string `db:"value"`
}{}

if err := db.getOffChainDataStmt.QueryRowxContext(ctx, key.Hex()).StructScan(&data); err != nil {
Expand All @@ -259,9 +231,8 @@ func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.Of
}

return &types.OffChainData{
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
BatchNum: data.BatchNum,
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
}, nil
}

Expand Down Expand Up @@ -292,9 +263,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ
defer rows.Close()

type row struct {
Key string `db:"key"`
Value string `db:"value"`
BatchNum uint64 `db:"batch_num"`
Key string `db:"key"`
Value string `db:"value"`
}

list := make([]types.OffChainData, 0, len(keys))
Expand All @@ -305,9 +275,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ
}

list = append(list, types.OffChainData{
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
BatchNum: data.BatchNum,
Key: common.HexToHash(data.Key),
Value: common.FromHex(data.Value),
})
}

Expand All @@ -324,33 +293,6 @@ func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) {
return count, nil
}

// DetectOffchainDataGaps returns the number of gaps in the offchain_data table
func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) {
rows, err := db.detectOffChainDataGapsStmt.QueryxContext(ctx)
if err != nil {
return nil, err
}

defer rows.Close()

type row struct {
CurrentBatchNum uint64 `db:"current_batch_num"`
NextBatchNum uint64 `db:"next_batch_num"`
}

gaps := make(map[uint64]uint64)
for rows.Next() {
var data row
if err = rows.StructScan(&data); err != nil {
return nil, err
}

gaps[data.CurrentBatchNum] = data.NextBatchNum
}

return gaps, nil
}

// buildBatchKeysInsertQuery builds the query to insert unresolved batch keys
func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) {
const columnsAffected = 2
Expand All @@ -372,24 +314,23 @@ func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) {

// buildOffchainDataInsertQuery builds the query to insert offchain data
func buildOffchainDataInsertQuery(ods []types.OffChainData) (string, []interface{}) {
const columnsAffected = 3
const columnsAffected = 2

// Remove duplicates from the given offchain data
ods = types.RemoveDuplicateOffChainData(ods)

args := make([]interface{}, len(ods)*columnsAffected)
values := make([]string, len(ods))
for i, od := range ods {
values[i] = fmt.Sprintf("($%d, $%d, $%d)", i*columnsAffected+1, i*columnsAffected+2, i*columnsAffected+3) //nolint:mnd
values[i] = fmt.Sprintf("($%d, $%d)", i*columnsAffected+1, i*columnsAffected+2) //nolint:mnd
args[i*columnsAffected] = od.Key.Hex()
args[i*columnsAffected+1] = common.Bytes2Hex(od.Value)
args[i*columnsAffected+2] = od.BatchNum
}

return fmt.Sprintf(`
INSERT INTO data_node.offchain_data (key, value, batch_num)
INSERT INTO data_node.offchain_data (key, value)
VALUES %s
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num;
SET value = EXCLUDED.value;
`, strings.Join(values, ",")), args
}
Loading

0 comments on commit 7291aeb

Please sign in to comment.