diff --git a/.github/workflows/regression-tests.yml b/.github/workflows/regression-tests.yml index 4309f85..57024a2 100644 --- a/.github/workflows/regression-tests.yml +++ b/.github/workflows/regression-tests.yml @@ -30,7 +30,7 @@ jobs: uses: actions/checkout@v4 with: repository: 0xPolygon/kurtosis-cdk - ref: "v0.2.22" + ref: v0.2.24 path: kurtosis-cdk - name: Install Kurtosis CDK tools diff --git a/.mockery.yaml b/.mockery.yaml index bb55d44..0256cef 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -36,9 +36,3 @@ packages: SequencerTracker: config: filename: sequencer_tracker.generated.go - github.com/0xPolygon/cdk-data-availability/services/status: - config: - interfaces: - GapsDetector: - config: - filename: gaps_detector.generated.go diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..fa6bcd3 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,21 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Run DAC", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "${workspaceFolder}/cmd", + "args": [ + "run", + "-cfg", + "test/config/test.local.toml", + ], + "cwd": ".", + }, + ] +} \ No newline at end of file diff --git a/client/client_test.go b/client/client_test.go index 7211484..2251a47 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -28,12 +28,12 @@ func TestClient_GetStatus(t *testing.T) { }{ { name: "successfully got status", - result: `{"result":{"version":"v1.0.0","uptime":"123","key_count":2,"backfill_progress":5}}`, + result: `{"result":{"version":"v1.0.0","uptime":"123","key_count":2,"last_synchronized_block":5}}`, status: &types.DACStatus{ - Uptime: "123", - Version: "v1.0.0", - KeyCount: 2, - BackfillProgress: 5, + Uptime: "123", + Version: "v1.0.0", + KeyCount: 2, + LastSynchronizedBlock: 5, }, }, { diff --git a/cmd/main.go b/cmd/main.go index 409c2fd..38560c9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -154,7 +154,7 @@ func start(cliCtx *cli.Context) error { []rpc.Service{ { Name: status.APISTATUS, - Service: status.NewEndpoints(storage, batchSynchronizer), + Service: status.NewEndpoints(storage), }, { Name: sync.APISYNC, diff --git a/db/db.go b/db/db.go index c4e4bed..05144fa 100644 --- a/db/db.go +++ b/db/db.go @@ -15,52 +15,32 @@ import ( const ( // storeLastProcessedBlockSQL is a query that stores the last processed block for a given task storeLastProcessedBlockSQL = ` - INSERT INTO data_node.sync_tasks (task, block) - VALUES ($1, $2) - ON CONFLICT (task) DO UPDATE - SET block = EXCLUDED.block, processed = NOW();` + UPDATE data_node.sync_tasks + SET block = $2, processed = NOW() + WHERE task = $1;` // getLastProcessedBlockSQL is a query that returns the last processed block for a given task getLastProcessedBlockSQL = `SELECT block FROM data_node.sync_tasks WHERE task = $1;` - // getUnresolvedBatchKeysSQL is a query that returns the unresolved batch keys from the database - getUnresolvedBatchKeysSQL = `SELECT num, hash FROM data_node.unresolved_batches LIMIT $1;` + // getMissingBatchKeysSQL is a query that returns the missing batch keys from the database + getMissingBatchKeysSQL = `SELECT num, hash FROM data_node.missing_batches LIMIT $1;` // getOffchainDataSQL is a query that returns the offchain data for a given key getOffchainDataSQL = ` - SELECT key, value, batch_num + SELECT key, value FROM data_node.offchain_data WHERE key = $1 LIMIT 1; ` // listOffchainDataSQL is a query that returns the offchain data for a given list of keys listOffchainDataSQL = ` - SELECT key, value, batch_num + SELECT key, value FROM data_node.offchain_data WHERE key IN (?); ` // countOffchainDataSQL is a query that returns the count of rows in the offchain_data table countOffchainDataSQL = "SELECT COUNT(*) FROM data_node.offchain_data;" - - // selectOffchainDataGapsSQL is a query that returns the gaps in the offchain_data table - selectOffchainDataGapsSQL = ` - WITH numbered_batches AS ( - SELECT - batch_num, - ROW_NUMBER() OVER (ORDER BY batch_num) AS row_number - FROM data_node.offchain_data - ) - SELECT - nb1.batch_num AS current_batch_num, - nb2.batch_num AS next_batch_num - FROM - numbered_batches nb1 - LEFT JOIN numbered_batches nb2 ON nb1.row_number = nb2.row_number - 1 - WHERE - nb1.batch_num IS NOT NULL - AND nb2.batch_num IS NOT NULL - AND nb1.batch_num + 1 <> nb2.batch_num;` ) var ( @@ -73,15 +53,14 @@ type DB interface { StoreLastProcessedBlock(ctx context.Context, block uint64, task string) error GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) - StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error - GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) - DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error + StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error + GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) + DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error) StoreOffChainData(ctx context.Context, od []types.OffChainData) error CountOffchainData(ctx context.Context) (uint64, error) - DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) } // DB is the database layer of the data node @@ -90,10 +69,9 @@ type pgDB struct { storeLastProcessedBlockStmt *sqlx.Stmt getLastProcessedBlockStmt *sqlx.Stmt - getUnresolvedBatchKeysStmt *sqlx.Stmt + getMissingBatchKeysStmt *sqlx.Stmt getOffChainDataStmt *sqlx.Stmt countOffChainDataStmt *sqlx.Stmt - detectOffChainDataGapsStmt *sqlx.Stmt } // New instantiates a DB @@ -108,9 +86,9 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) { return nil, fmt.Errorf("failed to prepare the get last processed block statement: %w", err) } - getUnresolvedBatchKeysStmt, err := pg.PreparexContext(ctx, getUnresolvedBatchKeysSQL) + getMissingBatchKeysStmt, err := pg.PreparexContext(ctx, getMissingBatchKeysSQL) if err != nil { - return nil, fmt.Errorf("failed to prepare the get unresolved batch keys statement: %w", err) + return nil, fmt.Errorf("failed to prepare the get missing batch keys statement: %w", err) } getOffChainDataStmt, err := pg.PreparexContext(ctx, getOffchainDataSQL) @@ -123,19 +101,13 @@ func New(ctx context.Context, pg *sqlx.DB) (DB, error) { return nil, fmt.Errorf("failed to prepare the count offchain data statement: %w", err) } - detectOffChainDataGapsStmt, err := pg.PreparexContext(ctx, selectOffchainDataGapsSQL) - if err != nil { - return nil, fmt.Errorf("failed to prepare the detect offchain data gaps statement: %w", err) - } - return &pgDB{ pg: pg, storeLastProcessedBlockStmt: storeLastProcessedBlockStmt, getLastProcessedBlockStmt: getLastProcessedBlockStmt, - getUnresolvedBatchKeysStmt: getUnresolvedBatchKeysStmt, + getMissingBatchKeysStmt: getMissingBatchKeysStmt, getOffChainDataStmt: getOffChainDataStmt, countOffChainDataStmt: countOffChainDataStmt, - detectOffChainDataGapsStmt: detectOffChainDataGapsStmt, }, nil } @@ -156,8 +128,8 @@ func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, return lastBlock, nil } -// StoreUnresolvedBatchKeys stores unresolved batch keys in the database -func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { +// StoreMissingBatchKeys stores missing batch keys in the database +func (db *pgDB) StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error { if len(bks) == 0 { return nil } @@ -165,15 +137,19 @@ func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchK query, args := buildBatchKeysInsertQuery(bks) if _, err := db.pg.ExecContext(ctx, query, args...); err != nil { - return fmt.Errorf("failed to store unresolved batches: %w", err) + batchNumbers := make([]string, len(bks)) + for i, bk := range bks { + batchNumbers[i] = fmt.Sprintf("%d", bk.Number) + } + return fmt.Errorf("failed to store missing batches (batch numbers: %s): %w", strings.Join(batchNumbers, ", "), err) } return nil } -// GetUnresolvedBatchKeys returns the unresolved batch keys from the database -func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { - rows, err := db.getUnresolvedBatchKeysStmt.QueryxContext(ctx, limit) +// GetMissingBatchKeys returns the missing batch keys that is not yet present in offchain table +func (db *pgDB) GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { + rows, err := db.getMissingBatchKeysStmt.QueryxContext(ctx, limit) if err != nil { return nil, err } @@ -201,8 +177,8 @@ func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types return bks, nil } -// DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database -func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { +// DeleteMissingBatchKeys deletes the missing batch keys from the missing_batch table in the db +func (db *pgDB) DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error { if len(bks) == 0 { return nil } @@ -218,11 +194,11 @@ func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.Batch } query := fmt.Sprintf(` - DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (%s); + DELETE FROM data_node.missing_batches WHERE (num, hash) IN (%s); `, strings.Join(values, ",")) if _, err := db.pg.ExecContext(ctx, query, args...); err != nil { - return fmt.Errorf("failed to delete unresolved batches: %w", err) + return fmt.Errorf("failed to delete missing batches: %w", err) } return nil @@ -245,9 +221,8 @@ func (db *pgDB) StoreOffChainData(ctx context.Context, ods []types.OffChainData) // GetOffChainData returns the value identified by the key func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) { data := struct { - Key string `db:"key"` - Value string `db:"value"` - BatchNum uint64 `db:"batch_num"` + Key string `db:"key"` + Value string `db:"value"` }{} if err := db.getOffChainDataStmt.QueryRowxContext(ctx, key.Hex()).StructScan(&data); err != nil { @@ -259,9 +234,8 @@ func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash) (*types.Of } return &types.OffChainData{ - Key: common.HexToHash(data.Key), - Value: common.FromHex(data.Value), - BatchNum: data.BatchNum, + Key: common.HexToHash(data.Key), + Value: common.FromHex(data.Value), }, nil } @@ -292,9 +266,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ defer rows.Close() type row struct { - Key string `db:"key"` - Value string `db:"value"` - BatchNum uint64 `db:"batch_num"` + Key string `db:"key"` + Value string `db:"value"` } list := make([]types.OffChainData, 0, len(keys)) @@ -305,9 +278,8 @@ func (db *pgDB) ListOffChainData(ctx context.Context, keys []common.Hash) ([]typ } list = append(list, types.OffChainData{ - Key: common.HexToHash(data.Key), - Value: common.FromHex(data.Value), - BatchNum: data.BatchNum, + Key: common.HexToHash(data.Key), + Value: common.FromHex(data.Value), }) } @@ -324,34 +296,7 @@ func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) { return count, nil } -// DetectOffchainDataGaps returns the number of gaps in the offchain_data table -func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) { - rows, err := db.detectOffChainDataGapsStmt.QueryxContext(ctx) - if err != nil { - return nil, err - } - - defer rows.Close() - - type row struct { - CurrentBatchNum uint64 `db:"current_batch_num"` - NextBatchNum uint64 `db:"next_batch_num"` - } - - gaps := make(map[uint64]uint64) - for rows.Next() { - var data row - if err = rows.StructScan(&data); err != nil { - return nil, err - } - - gaps[data.CurrentBatchNum] = data.NextBatchNum - } - - return gaps, nil -} - -// buildBatchKeysInsertQuery builds the query to insert unresolved batch keys +// buildBatchKeysInsertQuery builds the query to insert missing batch keys func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) { const columnsAffected = 2 @@ -364,7 +309,7 @@ func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) { } return fmt.Sprintf(` - INSERT INTO data_node.unresolved_batches (num, hash) + INSERT INTO data_node.missing_batches (num, hash) VALUES %s ON CONFLICT (num, hash) DO NOTHING; `, strings.Join(values, ",")), args @@ -372,7 +317,7 @@ func buildBatchKeysInsertQuery(bks []types.BatchKey) (string, []interface{}) { // buildOffchainDataInsertQuery builds the query to insert offchain data func buildOffchainDataInsertQuery(ods []types.OffChainData) (string, []interface{}) { - const columnsAffected = 3 + const columnsAffected = 2 // Remove duplicates from the given offchain data ods = types.RemoveDuplicateOffChainData(ods) @@ -380,16 +325,15 @@ func buildOffchainDataInsertQuery(ods []types.OffChainData) (string, []interface args := make([]interface{}, len(ods)*columnsAffected) values := make([]string, len(ods)) for i, od := range ods { - values[i] = fmt.Sprintf("($%d, $%d, $%d)", i*columnsAffected+1, i*columnsAffected+2, i*columnsAffected+3) //nolint:mnd + values[i] = fmt.Sprintf("($%d, $%d)", i*columnsAffected+1, i*columnsAffected+2) //nolint:mnd args[i*columnsAffected] = od.Key.Hex() args[i*columnsAffected+1] = common.Bytes2Hex(od.Value) - args[i*columnsAffected+2] = od.BatchNum } return fmt.Sprintf(` - INSERT INTO data_node.offchain_data (key, value, batch_num) + INSERT INTO data_node.offchain_data (key, value) VALUES %s ON CONFLICT (key) DO UPDATE - SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num; + SET value = EXCLUDED.value; `, strings.Join(values, ",")), args } diff --git a/db/db_test.go b/db/db_test.go index 4f26888..97ea5dd 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -65,7 +65,7 @@ func Test_DB_StoreLastProcessedBlock(t *testing.T) { constructorExpect(mock) - expected := mock.ExpectExec(`INSERT INTO data_node\.sync_tasks \(task, block\) VALUES \(\$1, \$2\) ON CONFLICT \(task\) DO UPDATE SET block = EXCLUDED\.block, processed = NOW\(\)`). + expected := mock.ExpectExec(`UPDATE data_node\.sync_tasks SET block = \$2, processed = NOW\(\) WHERE task = \$1;`). WithArgs(tt.task, tt.block) if tt.returnErr != nil { expected.WillReturnError(tt.returnErr) @@ -125,7 +125,7 @@ func Test_DB_GetLastProcessedBlock(t *testing.T) { constructorExpect(mock) - mock.ExpectExec(`INSERT INTO data_node\.sync_tasks \(task, block\) VALUES \(\$1, \$2\) ON CONFLICT \(task\) DO UPDATE SET block = EXCLUDED\.block, processed = NOW\(\)`). + mock.ExpectExec(`UPDATE data_node\.sync_tasks SET block = \$2, processed = NOW\(\) WHERE task = \$1;`). WithArgs(tt.task, tt.block). WillReturnResult(sqlmock.NewResult(1, 1)) @@ -159,7 +159,7 @@ func Test_DB_GetLastProcessedBlock(t *testing.T) { } } -func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { +func Test_DB_StoreMissingBatchKeys(t *testing.T) { t.Parallel() testTable := []struct { @@ -177,7 +177,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { Number: 1, Hash: common.BytesToHash([]byte("key1")), }}, - expectedQuery: `INSERT INTO data_node.unresolved_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`, + expectedQuery: `INSERT INTO data_node.missing_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`, }, { name: "several values inserted", @@ -188,7 +188,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { Number: 2, Hash: common.BytesToHash([]byte("key2")), }}, - expectedQuery: `INSERT INTO data_node.unresolved_batches (num, hash) VALUES ($1, $2),($3, $4) ON CONFLICT (num, hash) DO NOTHING`, + expectedQuery: `INSERT INTO data_node.missing_batches (num, hash) VALUES ($1, $2),($3, $4) ON CONFLICT (num, hash) DO NOTHING`, }, { name: "error returned", @@ -196,7 +196,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { Number: 1, Hash: common.BytesToHash([]byte("key1")), }}, - expectedQuery: `INSERT INTO data_node.unresolved_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`, + expectedQuery: `INSERT INTO data_node.missing_batches (num, hash) VALUES ($1, $2) ON CONFLICT (num, hash) DO NOTHING`, returnErr: errors.New("test error"), }, } @@ -214,10 +214,9 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { mock.ExpectPrepare(regexp.QuoteMeta(storeLastProcessedBlockSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getLastProcessedBlockSQL)) - mock.ExpectPrepare(regexp.QuoteMeta(getUnresolvedBatchKeysSQL)) + mock.ExpectPrepare(regexp.QuoteMeta(getMissingBatchKeysSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getOffchainDataSQL)) mock.ExpectPrepare(regexp.QuoteMeta(countOffchainDataSQL)) - mock.ExpectPrepare(regexp.QuoteMeta(selectOffchainDataGapsSQL)) dbPG, err := New(context.Background(), wdb) require.NoError(t, err) @@ -238,7 +237,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { } } - err = dbPG.StoreUnresolvedBatchKeys(context.Background(), tt.bk) + err = dbPG.StoreMissingBatchKeys(context.Background(), tt.bk) if tt.returnErr != nil { require.ErrorIs(t, err, tt.returnErr) } else { @@ -250,7 +249,7 @@ func Test_DB_StoreUnresolvedBatchKeys(t *testing.T) { } } -func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { +func Test_DB_GetMissingBatchKeys(t *testing.T) { t.Parallel() testTable := []struct { @@ -293,10 +292,10 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { require.NoError(t, err) // Seed data - seedUnresolvedBatchKeys(t, dbPG, mock, tt.bks) + seedMissingBatchKeys(t, dbPG, mock, tt.bks) var limit = uint(10) - expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.unresolved_batches LIMIT \$1\;`).WithArgs(limit) + expected := mock.ExpectQuery(`SELECT num, hash FROM data_node\.missing_batches LIMIT \$1\;`).WithArgs(limit) if tt.returnErr != nil { expected.WillReturnError(tt.returnErr) @@ -306,7 +305,7 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { } } - data, err := dbPG.GetUnresolvedBatchKeys(context.Background(), limit) + data, err := dbPG.GetMissingBatchKeys(context.Background(), limit) if tt.returnErr != nil { require.ErrorIs(t, err, tt.returnErr) } else { @@ -319,7 +318,7 @@ func Test_DB_GetUnresolvedBatchKeys(t *testing.T) { } } -func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { +func Test_DB_DeleteMissingBatchKeys(t *testing.T) { t.Parallel() testTable := []struct { @@ -334,7 +333,7 @@ func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { Number: 1, Hash: common.BytesToHash([]byte("key1")), }}, - expectedQuery: `DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (($1, $2))`, + expectedQuery: `DELETE FROM data_node.missing_batches WHERE (num, hash) IN (($1, $2))`, }, { name: "multiple values deleted", @@ -345,7 +344,7 @@ func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { Number: 2, Hash: common.BytesToHash([]byte("key2")), }}, - expectedQuery: `DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (($1, $2),($3, $4))`, + expectedQuery: `DELETE FROM data_node.missing_batches WHERE (num, hash) IN (($1, $2),($3, $4))`, }, { name: "error returned", @@ -353,7 +352,7 @@ func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { Number: 1, Hash: common.BytesToHash([]byte("key1")), }}, - expectedQuery: `DELETE FROM data_node.unresolved_batches WHERE (num, hash) IN (($1, $2))`, + expectedQuery: `DELETE FROM data_node.missing_batches WHERE (num, hash) IN (($1, $2))`, returnErr: errors.New("test error"), }, } @@ -389,7 +388,7 @@ func Test_DB_DeleteUnresolvedBatchKeys(t *testing.T) { } } - err = dbPG.DeleteUnresolvedBatchKeys(context.Background(), tt.bks) + err = dbPG.DeleteMissingBatchKeys(context.Background(), tt.bks) if tt.returnErr != nil { require.ErrorIs(t, err, tt.returnErr) } else { @@ -419,7 +418,7 @@ func Test_DB_StoreOffChainData(t *testing.T) { Key: common.BytesToHash([]byte("key1")), Value: []byte("value1"), }}, - expectedQuery: `INSERT INTO data_node.offchain_data (key, value, batch_num) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num`, + expectedQuery: `INSERT INTO data_node.offchain_data (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`, }, { name: "several values inserted", @@ -430,7 +429,7 @@ func Test_DB_StoreOffChainData(t *testing.T) { Key: common.BytesToHash([]byte("key2")), Value: []byte("value2"), }}, - expectedQuery: `INSERT INTO data_node.offchain_data (key, value, batch_num) VALUES ($1, $2, $3),($4, $5, $6) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num`, + expectedQuery: `INSERT INTO data_node.offchain_data (key, value) VALUES ($1, $2),($3, $4) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`, }, { name: "error returned", @@ -438,7 +437,7 @@ func Test_DB_StoreOffChainData(t *testing.T) { Key: common.BytesToHash([]byte("key1")), Value: []byte("value1"), }}, - expectedQuery: `INSERT INTO data_node.offchain_data (key, value, batch_num) VALUES ($1, $2, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, batch_num = EXCLUDED.batch_num`, + expectedQuery: `INSERT INTO data_node.offchain_data (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`, returnErr: errors.New("test error"), }, } @@ -463,7 +462,7 @@ func Test_DB_StoreOffChainData(t *testing.T) { if tt.expectedQuery != "" { args := make([]driver.Value, 0, len(tt.ods)*3) for _, od := range tt.ods { - args = append(args, od.Key.Hex(), common.Bytes2Hex(od.Value), od.BatchNum) + args = append(args, od.Key.Hex(), common.Bytes2Hex(od.Value)) } expected := mock.ExpectExec(regexp.QuoteMeta(tt.expectedQuery)).WithArgs(args...) @@ -499,15 +498,13 @@ func Test_DB_GetOffChainData(t *testing.T) { { name: "successfully selected value", od: []types.OffChainData{{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }}, key: common.BytesToHash([]byte("key1")), expected: &types.OffChainData{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }, }, { @@ -556,8 +553,8 @@ func Test_DB_GetOffChainData(t *testing.T) { if tt.returnErr != nil { expected.WillReturnError(tt.returnErr) } else { - expected.WillReturnRows(sqlmock.NewRows([]string{"key", "value", "batch_num"}). - AddRow(tt.expected.Key.Hex(), common.Bytes2Hex(tt.expected.Value), tt.expected.BatchNum)) + expected.WillReturnRows(sqlmock.NewRows([]string{"key", "value"}). + AddRow(tt.expected.Key.Hex(), common.Bytes2Hex(tt.expected.Value))) } data, err := dbPG.GetOffChainData(context.Background(), tt.key) @@ -595,23 +592,20 @@ func Test_DB_ListOffChainData(t *testing.T) { }, expected: []types.OffChainData{ { - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 0, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }, }, - sql: `SELECT key, value, batch_num FROM data_node\.offchain_data WHERE key IN \(\$1\)`, + sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\)`, }, { name: "successfully selected two values", od: []types.OffChainData{{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }, { - Key: common.BytesToHash([]byte("key2")), - Value: []byte("value2"), - BatchNum: 2, + Key: common.BytesToHash([]byte("key2")), + Value: []byte("value2"), }}, keys: []common.Hash{ common.BytesToHash([]byte("key1")), @@ -619,17 +613,15 @@ func Test_DB_ListOffChainData(t *testing.T) { }, expected: []types.OffChainData{ { - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, + Key: common.BytesToHash([]byte("key1")), + Value: []byte("value1"), }, { - Key: common.BytesToHash([]byte("key2")), - Value: []byte("value2"), - BatchNum: 2, + Key: common.BytesToHash([]byte("key2")), + Value: []byte("value2"), }, }, - sql: `SELECT key, value, batch_num FROM data_node\.offchain_data WHERE key IN \(\$1\, \$2\)`, + sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\, \$2\)`, }, { name: "error returned", @@ -640,7 +632,7 @@ func Test_DB_ListOffChainData(t *testing.T) { keys: []common.Hash{ common.BytesToHash([]byte("key1")), }, - sql: `SELECT key, value, batch_num FROM data_node\.offchain_data WHERE key IN \(\$1\)`, + sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\)`, returnErr: errors.New("test error"), }, { @@ -652,7 +644,7 @@ func Test_DB_ListOffChainData(t *testing.T) { keys: []common.Hash{ common.BytesToHash([]byte("undefined")), }, - sql: `SELECT key, value, batch_num FROM data_node\.offchain_data WHERE key IN \(\$1\)`, + sql: `SELECT key, value FROM data_node\.offchain_data WHERE key IN \(\$1\)`, returnErr: ErrStateNotSynchronized, }, } @@ -688,10 +680,10 @@ func Test_DB_ListOffChainData(t *testing.T) { if tt.returnErr != nil { expected.WillReturnError(tt.returnErr) } else { - returnData := sqlmock.NewRows([]string{"key", "value", "batch_num"}) + returnData := sqlmock.NewRows([]string{"key", "value"}) for _, data := range tt.expected { - returnData = returnData.AddRow(data.Key.Hex(), common.Bytes2Hex(data.Value), data.BatchNum) + returnData = returnData.AddRow(data.Key.Hex(), common.Bytes2Hex(data.Value)) } expected.WillReturnRows(returnData) @@ -785,96 +777,12 @@ func Test_DB_CountOffchainData(t *testing.T) { } } -func Test_DB_DetectOffchainDataGaps(t *testing.T) { - t.Parallel() - - testTable := []struct { - name string - seed []types.OffChainData - gaps map[uint64]uint64 - returnErr error - }{ - { - name: "one gap found", - seed: []types.OffChainData{{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - BatchNum: 1, - }, { - Key: common.BytesToHash([]byte("key2")), - Value: []byte("value2"), - BatchNum: 2, - }, { - Key: common.HexToHash("key4"), - Value: []byte("value4"), - BatchNum: 4, - }}, - gaps: map[uint64]uint64{ - 2: 4, - }, - }, - { - name: "error returned", - seed: []types.OffChainData{{ - Key: common.BytesToHash([]byte("key1")), - Value: []byte("value1"), - }}, - returnErr: errors.New("test error"), - }, - } - - for _, tt := range testTable { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - db, mock, err := sqlmock.New() - require.NoError(t, err) - - defer db.Close() - - constructorExpect(mock) - - wdb := sqlx.NewDb(db, "postgres") - dbPG, err := New(context.Background(), wdb) - require.NoError(t, err) - - // Seed data - seedOffchainData(t, dbPG, mock, tt.seed) - - expected := mock.ExpectQuery(regexp.QuoteMeta(selectOffchainDataGapsSQL)) - - if tt.returnErr != nil { - expected.WillReturnError(tt.returnErr) - } else { - rows := sqlmock.NewRows([]string{"current_batch_num", "next_batch_num"}) - for k, v := range tt.gaps { - rows.AddRow(k, v) - } - expected.WillReturnRows(rows) - } - - actual, err := dbPG.DetectOffchainDataGaps(context.Background()) - if tt.returnErr != nil { - require.ErrorIs(t, err, tt.returnErr) - } else { - require.NoError(t, err) - require.Equal(t, tt.gaps, actual) - } - - require.NoError(t, mock.ExpectationsWereMet()) - }) - } -} - func constructorExpect(mock sqlmock.Sqlmock) { mock.ExpectPrepare(regexp.QuoteMeta(storeLastProcessedBlockSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getLastProcessedBlockSQL)) - mock.ExpectPrepare(regexp.QuoteMeta(getUnresolvedBatchKeysSQL)) + mock.ExpectPrepare(regexp.QuoteMeta(getMissingBatchKeysSQL)) mock.ExpectPrepare(regexp.QuoteMeta(getOffchainDataSQL)) mock.ExpectPrepare(regexp.QuoteMeta(countOffchainDataSQL)) - mock.ExpectPrepare(regexp.QuoteMeta(selectOffchainDataGapsSQL)) } func seedOffchainData(t *testing.T, db DB, mock sqlmock.Sqlmock, ods []types.OffChainData) { @@ -898,7 +806,7 @@ func seedOffchainData(t *testing.T, db DB, mock sqlmock.Sqlmock, ods []types.Off require.NoError(t, err) } -func seedUnresolvedBatchKeys(t *testing.T, db DB, mock sqlmock.Sqlmock, bks []types.BatchKey) { +func seedMissingBatchKeys(t *testing.T, db DB, mock sqlmock.Sqlmock, bks []types.BatchKey) { t.Helper() if len(bks) == 0 { @@ -915,6 +823,6 @@ func seedUnresolvedBatchKeys(t *testing.T, db DB, mock sqlmock.Sqlmock, bks []ty mock.ExpectExec(regexp.QuoteMeta(query)).WithArgs(argValues...). WillReturnResult(sqlmock.NewResult(int64(len(bks)), int64(len(bks)))) - err := db.StoreUnresolvedBatchKeys(context.Background(), bks) + err := db.StoreMissingBatchKeys(context.Background(), bks) require.NoError(t, err) } diff --git a/db/migrations/0006.sql b/db/migrations/0006.sql new file mode 100644 index 0000000..c6a80c2 --- /dev/null +++ b/db/migrations/0006.sql @@ -0,0 +1,23 @@ +-- +migrate Down +-- Add the 'batch_num' column to 'offchain_data' table +ALTER TABLE data_node.offchain_data + ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0; + +-- Rename the 'missing_batches' table to 'unresolved_batches' +ALTER TABLE data_node.missing_batches RENAME TO unresolved_batches; + +-- Create an index for the 'batch_num' column for better performance +CREATE INDEX IF NOT EXISTS idx_batch_num ON data_node.offchain_data(batch_num); + +-- Reset the sync task for L1 to trigger resync +UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1'; + +-- +migrate Up +-- Drop the 'batch_num' column from 'offchain_data' table +ALTER TABLE data_node.offchain_data DROP COLUMN batch_num; + +-- Rename the 'unresolved_batches' table back to 'missing_batches' +ALTER TABLE data_node.unresolved_batches RENAME TO missing_batches; + +-- Drop the index created on 'batch_num' +DROP INDEX IF EXISTS idx_batch_num; \ No newline at end of file diff --git a/go.mod b/go.mod index eb2c6fb..44ef249 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,6 @@ module github.com/0xPolygon/cdk-data-availability go 1.22.4 - require ( github.com/0xPolygon/cdk v0.1.0 github.com/0xPolygon/cdk-contracts-tooling v0.0.0-20240826154954-f6182d2b17a2 @@ -11,7 +10,6 @@ require ( github.com/ethereum/go-ethereum v1.14.5 github.com/gorilla/websocket v1.5.1 github.com/hermeznetwork/tracerr v0.3.2 - github.com/iden3/go-iden3-crypto v0.0.16 github.com/invopop/jsonschema v0.12.0 github.com/jmoiron/sqlx v1.2.0 github.com/lib/pq v1.10.7 @@ -50,6 +48,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/hcl v1.0.1-0.20180906183839-65a6292f0157 // indirect github.com/holiman/uint256 v1.2.4 // indirect + github.com/iden3/go-iden3-crypto v0.0.16 // indirect github.com/klauspost/compress v1.17.2 // indirect github.com/logrusorgru/aurora v2.0.3+incompatible // indirect github.com/magiconair/properties v1.8.7 // indirect diff --git a/mocks/db.generated.go b/mocks/db.generated.go index d21533c..46a97c4 100644 --- a/mocks/db.generated.go +++ b/mocks/db.generated.go @@ -81,12 +81,12 @@ func (_c *DB_CountOffchainData_Call) RunAndReturn(run func(context.Context) (uin return _c } -// DeleteUnresolvedBatchKeys provides a mock function with given fields: ctx, bks -func (_m *DB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { +// DeleteMissingBatchKeys provides a mock function with given fields: ctx, bks +func (_m *DB) DeleteMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error { ret := _m.Called(ctx, bks) if len(ret) == 0 { - panic("no return value specified for DeleteUnresolvedBatchKeys") + panic("no return value specified for DeleteMissingBatchKeys") } var r0 error @@ -99,89 +99,31 @@ func (_m *DB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKe return r0 } -// DB_DeleteUnresolvedBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteUnresolvedBatchKeys' -type DB_DeleteUnresolvedBatchKeys_Call struct { +// DB_DeleteMissingBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteMissingBatchKeys' +type DB_DeleteMissingBatchKeys_Call struct { *mock.Call } -// DeleteUnresolvedBatchKeys is a helper method to define mock.On call +// DeleteMissingBatchKeys is a helper method to define mock.On call // - ctx context.Context // - bks []types.BatchKey -func (_e *DB_Expecter) DeleteUnresolvedBatchKeys(ctx interface{}, bks interface{}) *DB_DeleteUnresolvedBatchKeys_Call { - return &DB_DeleteUnresolvedBatchKeys_Call{Call: _e.mock.On("DeleteUnresolvedBatchKeys", ctx, bks)} +func (_e *DB_Expecter) DeleteMissingBatchKeys(ctx interface{}, bks interface{}) *DB_DeleteMissingBatchKeys_Call { + return &DB_DeleteMissingBatchKeys_Call{Call: _e.mock.On("DeleteMissingBatchKeys", ctx, bks)} } -func (_c *DB_DeleteUnresolvedBatchKeys_Call) Run(run func(ctx context.Context, bks []types.BatchKey)) *DB_DeleteUnresolvedBatchKeys_Call { +func (_c *DB_DeleteMissingBatchKeys_Call) Run(run func(ctx context.Context, bks []types.BatchKey)) *DB_DeleteMissingBatchKeys_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].([]types.BatchKey)) }) return _c } -func (_c *DB_DeleteUnresolvedBatchKeys_Call) Return(_a0 error) *DB_DeleteUnresolvedBatchKeys_Call { +func (_c *DB_DeleteMissingBatchKeys_Call) Return(_a0 error) *DB_DeleteMissingBatchKeys_Call { _c.Call.Return(_a0) return _c } -func (_c *DB_DeleteUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Context, []types.BatchKey) error) *DB_DeleteUnresolvedBatchKeys_Call { - _c.Call.Return(run) - return _c -} - -// DetectOffchainDataGaps provides a mock function with given fields: ctx -func (_m *DB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for DetectOffchainDataGaps") - } - - var r0 map[uint64]uint64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (map[uint64]uint64, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) map[uint64]uint64); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[uint64]uint64) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// DB_DetectOffchainDataGaps_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetectOffchainDataGaps' -type DB_DetectOffchainDataGaps_Call struct { - *mock.Call -} - -// DetectOffchainDataGaps is a helper method to define mock.On call -// - ctx context.Context -func (_e *DB_Expecter) DetectOffchainDataGaps(ctx interface{}) *DB_DetectOffchainDataGaps_Call { - return &DB_DetectOffchainDataGaps_Call{Call: _e.mock.On("DetectOffchainDataGaps", ctx)} -} - -func (_c *DB_DetectOffchainDataGaps_Call) Run(run func(ctx context.Context)) *DB_DetectOffchainDataGaps_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *DB_DetectOffchainDataGaps_Call) Return(_a0 map[uint64]uint64, _a1 error) *DB_DetectOffchainDataGaps_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *DB_DetectOffchainDataGaps_Call) RunAndReturn(run func(context.Context) (map[uint64]uint64, error)) *DB_DetectOffchainDataGaps_Call { +func (_c *DB_DeleteMissingBatchKeys_Call) RunAndReturn(run func(context.Context, []types.BatchKey) error) *DB_DeleteMissingBatchKeys_Call { _c.Call.Return(run) return _c } @@ -243,29 +185,29 @@ func (_c *DB_GetLastProcessedBlock_Call) RunAndReturn(run func(context.Context, return _c } -// GetOffChainData provides a mock function with given fields: ctx, key -func (_m *DB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) { - ret := _m.Called(ctx, key) +// GetMissingBatchKeys provides a mock function with given fields: ctx, limit +func (_m *DB) GetMissingBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { + ret := _m.Called(ctx, limit) if len(ret) == 0 { - panic("no return value specified for GetOffChainData") + panic("no return value specified for GetMissingBatchKeys") } - var r0 *types.OffChainData + var r0 []types.BatchKey var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.OffChainData, error)); ok { - return rf(ctx, key) + if rf, ok := ret.Get(0).(func(context.Context, uint) ([]types.BatchKey, error)); ok { + return rf(ctx, limit) } - if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.OffChainData); ok { - r0 = rf(ctx, key) + if rf, ok := ret.Get(0).(func(context.Context, uint) []types.BatchKey); ok { + r0 = rf(ctx, limit) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.OffChainData) + r0 = ret.Get(0).([]types.BatchKey) } } - if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { - r1 = rf(ctx, key) + if rf, ok := ret.Get(1).(func(context.Context, uint) error); ok { + r1 = rf(ctx, limit) } else { r1 = ret.Error(1) } @@ -273,58 +215,58 @@ func (_m *DB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffC return r0, r1 } -// DB_GetOffChainData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetOffChainData' -type DB_GetOffChainData_Call struct { +// DB_GetMissingBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMissingBatchKeys' +type DB_GetMissingBatchKeys_Call struct { *mock.Call } -// GetOffChainData is a helper method to define mock.On call +// GetMissingBatchKeys is a helper method to define mock.On call // - ctx context.Context -// - key common.Hash -func (_e *DB_Expecter) GetOffChainData(ctx interface{}, key interface{}) *DB_GetOffChainData_Call { - return &DB_GetOffChainData_Call{Call: _e.mock.On("GetOffChainData", ctx, key)} +// - limit uint +func (_e *DB_Expecter) GetMissingBatchKeys(ctx interface{}, limit interface{}) *DB_GetMissingBatchKeys_Call { + return &DB_GetMissingBatchKeys_Call{Call: _e.mock.On("GetMissingBatchKeys", ctx, limit)} } -func (_c *DB_GetOffChainData_Call) Run(run func(ctx context.Context, key common.Hash)) *DB_GetOffChainData_Call { +func (_c *DB_GetMissingBatchKeys_Call) Run(run func(ctx context.Context, limit uint)) *DB_GetMissingBatchKeys_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(common.Hash)) + run(args[0].(context.Context), args[1].(uint)) }) return _c } -func (_c *DB_GetOffChainData_Call) Return(_a0 *types.OffChainData, _a1 error) *DB_GetOffChainData_Call { +func (_c *DB_GetMissingBatchKeys_Call) Return(_a0 []types.BatchKey, _a1 error) *DB_GetMissingBatchKeys_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *DB_GetOffChainData_Call) RunAndReturn(run func(context.Context, common.Hash) (*types.OffChainData, error)) *DB_GetOffChainData_Call { +func (_c *DB_GetMissingBatchKeys_Call) RunAndReturn(run func(context.Context, uint) ([]types.BatchKey, error)) *DB_GetMissingBatchKeys_Call { _c.Call.Return(run) return _c } -// GetUnresolvedBatchKeys provides a mock function with given fields: ctx, limit -func (_m *DB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.BatchKey, error) { - ret := _m.Called(ctx, limit) +// GetOffChainData provides a mock function with given fields: ctx, key +func (_m *DB) GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) { + ret := _m.Called(ctx, key) if len(ret) == 0 { - panic("no return value specified for GetUnresolvedBatchKeys") + panic("no return value specified for GetOffChainData") } - var r0 []types.BatchKey + var r0 *types.OffChainData var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uint) ([]types.BatchKey, error)); ok { - return rf(ctx, limit) + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) (*types.OffChainData, error)); ok { + return rf(ctx, key) } - if rf, ok := ret.Get(0).(func(context.Context, uint) []types.BatchKey); ok { - r0 = rf(ctx, limit) + if rf, ok := ret.Get(0).(func(context.Context, common.Hash) *types.OffChainData); ok { + r0 = rf(ctx, key) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.BatchKey) + r0 = ret.Get(0).(*types.OffChainData) } } - if rf, ok := ret.Get(1).(func(context.Context, uint) error); ok { - r1 = rf(ctx, limit) + if rf, ok := ret.Get(1).(func(context.Context, common.Hash) error); ok { + r1 = rf(ctx, key) } else { r1 = ret.Error(1) } @@ -332,31 +274,31 @@ func (_m *DB) GetUnresolvedBatchKeys(ctx context.Context, limit uint) ([]types.B return r0, r1 } -// DB_GetUnresolvedBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetUnresolvedBatchKeys' -type DB_GetUnresolvedBatchKeys_Call struct { +// DB_GetOffChainData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetOffChainData' +type DB_GetOffChainData_Call struct { *mock.Call } -// GetUnresolvedBatchKeys is a helper method to define mock.On call +// GetOffChainData is a helper method to define mock.On call // - ctx context.Context -// - limit uint -func (_e *DB_Expecter) GetUnresolvedBatchKeys(ctx interface{}, limit interface{}) *DB_GetUnresolvedBatchKeys_Call { - return &DB_GetUnresolvedBatchKeys_Call{Call: _e.mock.On("GetUnresolvedBatchKeys", ctx, limit)} +// - key common.Hash +func (_e *DB_Expecter) GetOffChainData(ctx interface{}, key interface{}) *DB_GetOffChainData_Call { + return &DB_GetOffChainData_Call{Call: _e.mock.On("GetOffChainData", ctx, key)} } -func (_c *DB_GetUnresolvedBatchKeys_Call) Run(run func(ctx context.Context, limit uint)) *DB_GetUnresolvedBatchKeys_Call { +func (_c *DB_GetOffChainData_Call) Run(run func(ctx context.Context, key common.Hash)) *DB_GetOffChainData_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint)) + run(args[0].(context.Context), args[1].(common.Hash)) }) return _c } -func (_c *DB_GetUnresolvedBatchKeys_Call) Return(_a0 []types.BatchKey, _a1 error) *DB_GetUnresolvedBatchKeys_Call { +func (_c *DB_GetOffChainData_Call) Return(_a0 *types.OffChainData, _a1 error) *DB_GetOffChainData_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *DB_GetUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Context, uint) ([]types.BatchKey, error)) *DB_GetUnresolvedBatchKeys_Call { +func (_c *DB_GetOffChainData_Call) RunAndReturn(run func(context.Context, common.Hash) (*types.OffChainData, error)) *DB_GetOffChainData_Call { _c.Call.Return(run) return _c } @@ -468,17 +410,17 @@ func (_c *DB_StoreLastProcessedBlock_Call) RunAndReturn(run func(context.Context return _c } -// StoreOffChainData provides a mock function with given fields: ctx, od -func (_m *DB) StoreOffChainData(ctx context.Context, od []types.OffChainData) error { - ret := _m.Called(ctx, od) +// StoreMissingBatchKeys provides a mock function with given fields: ctx, bks +func (_m *DB) StoreMissingBatchKeys(ctx context.Context, bks []types.BatchKey) error { + ret := _m.Called(ctx, bks) if len(ret) == 0 { - panic("no return value specified for StoreOffChainData") + panic("no return value specified for StoreMissingBatchKeys") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []types.OffChainData) error); ok { - r0 = rf(ctx, od) + if rf, ok := ret.Get(0).(func(context.Context, []types.BatchKey) error); ok { + r0 = rf(ctx, bks) } else { r0 = ret.Error(0) } @@ -486,46 +428,46 @@ func (_m *DB) StoreOffChainData(ctx context.Context, od []types.OffChainData) er return r0 } -// DB_StoreOffChainData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreOffChainData' -type DB_StoreOffChainData_Call struct { +// DB_StoreMissingBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreMissingBatchKeys' +type DB_StoreMissingBatchKeys_Call struct { *mock.Call } -// StoreOffChainData is a helper method to define mock.On call +// StoreMissingBatchKeys is a helper method to define mock.On call // - ctx context.Context -// - od []types.OffChainData -func (_e *DB_Expecter) StoreOffChainData(ctx interface{}, od interface{}) *DB_StoreOffChainData_Call { - return &DB_StoreOffChainData_Call{Call: _e.mock.On("StoreOffChainData", ctx, od)} +// - bks []types.BatchKey +func (_e *DB_Expecter) StoreMissingBatchKeys(ctx interface{}, bks interface{}) *DB_StoreMissingBatchKeys_Call { + return &DB_StoreMissingBatchKeys_Call{Call: _e.mock.On("StoreMissingBatchKeys", ctx, bks)} } -func (_c *DB_StoreOffChainData_Call) Run(run func(ctx context.Context, od []types.OffChainData)) *DB_StoreOffChainData_Call { +func (_c *DB_StoreMissingBatchKeys_Call) Run(run func(ctx context.Context, bks []types.BatchKey)) *DB_StoreMissingBatchKeys_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]types.OffChainData)) + run(args[0].(context.Context), args[1].([]types.BatchKey)) }) return _c } -func (_c *DB_StoreOffChainData_Call) Return(_a0 error) *DB_StoreOffChainData_Call { +func (_c *DB_StoreMissingBatchKeys_Call) Return(_a0 error) *DB_StoreMissingBatchKeys_Call { _c.Call.Return(_a0) return _c } -func (_c *DB_StoreOffChainData_Call) RunAndReturn(run func(context.Context, []types.OffChainData) error) *DB_StoreOffChainData_Call { +func (_c *DB_StoreMissingBatchKeys_Call) RunAndReturn(run func(context.Context, []types.BatchKey) error) *DB_StoreMissingBatchKeys_Call { _c.Call.Return(run) return _c } -// StoreUnresolvedBatchKeys provides a mock function with given fields: ctx, bks -func (_m *DB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey) error { - ret := _m.Called(ctx, bks) +// StoreOffChainData provides a mock function with given fields: ctx, od +func (_m *DB) StoreOffChainData(ctx context.Context, od []types.OffChainData) error { + ret := _m.Called(ctx, od) if len(ret) == 0 { - panic("no return value specified for StoreUnresolvedBatchKeys") + panic("no return value specified for StoreOffChainData") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []types.BatchKey) error); ok { - r0 = rf(ctx, bks) + if rf, ok := ret.Get(0).(func(context.Context, []types.OffChainData) error); ok { + r0 = rf(ctx, od) } else { r0 = ret.Error(0) } @@ -533,31 +475,31 @@ func (_m *DB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey return r0 } -// DB_StoreUnresolvedBatchKeys_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreUnresolvedBatchKeys' -type DB_StoreUnresolvedBatchKeys_Call struct { +// DB_StoreOffChainData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StoreOffChainData' +type DB_StoreOffChainData_Call struct { *mock.Call } -// StoreUnresolvedBatchKeys is a helper method to define mock.On call +// StoreOffChainData is a helper method to define mock.On call // - ctx context.Context -// - bks []types.BatchKey -func (_e *DB_Expecter) StoreUnresolvedBatchKeys(ctx interface{}, bks interface{}) *DB_StoreUnresolvedBatchKeys_Call { - return &DB_StoreUnresolvedBatchKeys_Call{Call: _e.mock.On("StoreUnresolvedBatchKeys", ctx, bks)} +// - od []types.OffChainData +func (_e *DB_Expecter) StoreOffChainData(ctx interface{}, od interface{}) *DB_StoreOffChainData_Call { + return &DB_StoreOffChainData_Call{Call: _e.mock.On("StoreOffChainData", ctx, od)} } -func (_c *DB_StoreUnresolvedBatchKeys_Call) Run(run func(ctx context.Context, bks []types.BatchKey)) *DB_StoreUnresolvedBatchKeys_Call { +func (_c *DB_StoreOffChainData_Call) Run(run func(ctx context.Context, od []types.OffChainData)) *DB_StoreOffChainData_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]types.BatchKey)) + run(args[0].(context.Context), args[1].([]types.OffChainData)) }) return _c } -func (_c *DB_StoreUnresolvedBatchKeys_Call) Return(_a0 error) *DB_StoreUnresolvedBatchKeys_Call { +func (_c *DB_StoreOffChainData_Call) Return(_a0 error) *DB_StoreOffChainData_Call { _c.Call.Return(_a0) return _c } -func (_c *DB_StoreUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Context, []types.BatchKey) error) *DB_StoreUnresolvedBatchKeys_Call { +func (_c *DB_StoreOffChainData_Call) RunAndReturn(run func(context.Context, []types.OffChainData) error) *DB_StoreOffChainData_Call { _c.Call.Return(run) return _c } diff --git a/mocks/gaps_detector.generated.go b/mocks/gaps_detector.generated.go deleted file mode 100644 index a3f3f98..0000000 --- a/mocks/gaps_detector.generated.go +++ /dev/null @@ -1,79 +0,0 @@ -// Code generated by mockery v2.40.1. DO NOT EDIT. - -package mocks - -import mock "github.com/stretchr/testify/mock" - -// GapsDetector is an autogenerated mock type for the GapsDetector type -type GapsDetector struct { - mock.Mock -} - -type GapsDetector_Expecter struct { - mock *mock.Mock -} - -func (_m *GapsDetector) EXPECT() *GapsDetector_Expecter { - return &GapsDetector_Expecter{mock: &_m.Mock} -} - -// Gaps provides a mock function with given fields: -func (_m *GapsDetector) Gaps() map[uint64]uint64 { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Gaps") - } - - var r0 map[uint64]uint64 - if rf, ok := ret.Get(0).(func() map[uint64]uint64); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[uint64]uint64) - } - } - - return r0 -} - -// GapsDetector_Gaps_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Gaps' -type GapsDetector_Gaps_Call struct { - *mock.Call -} - -// Gaps is a helper method to define mock.On call -func (_e *GapsDetector_Expecter) Gaps() *GapsDetector_Gaps_Call { - return &GapsDetector_Gaps_Call{Call: _e.mock.On("Gaps")} -} - -func (_c *GapsDetector_Gaps_Call) Run(run func()) *GapsDetector_Gaps_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *GapsDetector_Gaps_Call) Return(_a0 map[uint64]uint64) *GapsDetector_Gaps_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *GapsDetector_Gaps_Call) RunAndReturn(run func() map[uint64]uint64) *GapsDetector_Gaps_Call { - _c.Call.Return(run) - return _c -} - -// NewGapsDetector creates a new instance of GapsDetector. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewGapsDetector(t interface { - mock.TestingT - Cleanup(func()) -}) *GapsDetector { - mock := &GapsDetector{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/services/status/status.go b/services/status/status.go index 6b3d2a2..20d1c6b 100644 --- a/services/status/status.go +++ b/services/status/status.go @@ -15,25 +15,17 @@ import ( // APISTATUS is the namespace of the status service const APISTATUS = "status" -// GapsDetector is an interface for detecting gaps in the offchain data -type GapsDetector interface { - // Gaps returns a map of gaps in the offchain data - Gaps() map[uint64]uint64 -} - // Endpoints contains implementations for the "status" RPC endpoints type Endpoints struct { - db db.DB - startTime time.Time - gapsDetector GapsDetector + db db.DB + startTime time.Time } // NewEndpoints returns Endpoints -func NewEndpoints(db db.DB, gapsDetector GapsDetector) *Endpoints { +func NewEndpoints(db db.DB) *Endpoints { return &Endpoints{ - db: db, - startTime: time.Now(), - gapsDetector: gapsDetector, + db: db, + startTime: time.Now(), } } @@ -47,7 +39,7 @@ func (s *Endpoints) GetStatus() (interface{}, rpc.Error) { log.Errorf("failed to get the key count from the offchain_data table: %v", err) } - backfillProgress, err := s.db.GetLastProcessedBlock(ctx, string(synchronizer.L1SyncTask)) + lastSynchronizedBlock, err := s.db.GetLastProcessedBlock(ctx, string(synchronizer.L1SyncTask)) if err != nil { log.Errorf("failed to get last block processed by the synchronizer: %v", err) } @@ -56,7 +48,6 @@ func (s *Endpoints) GetStatus() (interface{}, rpc.Error) { Version: dataavailability.Version, Uptime: uptime, KeyCount: rowCount, - BackfillProgress: backfillProgress, - OffchainDataGapsExist: len(s.gapsDetector.Gaps()) > 0, + LastSynchronizedBlock: lastSynchronizedBlock, }, nil } diff --git a/services/status/status_test.go b/services/status/status_test.go index cc9b737..bf0e55a 100644 --- a/services/status/status_test.go +++ b/services/status/status_test.go @@ -52,11 +52,7 @@ func TestEndpoints_GetStatus(t *testing.T) { dbMock.On("GetLastProcessedBlock", mock.Anything, mock.Anything). Return(tt.getLastProcessedBlock, tt.getLastProcessedBlockErr) - gapDetectorMock := mocks.NewGapsDetector(t) - - gapDetectorMock.On("Gaps").Return(map[uint64]uint64{1: 1}) - - statusEndpoints := NewEndpoints(dbMock, gapDetectorMock) + statusEndpoints := NewEndpoints(dbMock) actual, err := statusEndpoints.GetStatus() @@ -72,8 +68,7 @@ func TestEndpoints_GetStatus(t *testing.T) { require.NotEmpty(t, dacStatus.Uptime) require.Equal(t, "v0.1.0", dacStatus.Version) require.Equal(t, tt.countOffchainData, dacStatus.KeyCount) - require.Equal(t, tt.getLastProcessedBlock, dacStatus.BackfillProgress) - require.True(t, dacStatus.OffchainDataGapsExist) + require.Equal(t, tt.getLastProcessedBlock, dacStatus.LastSynchronizedBlock) } }) } diff --git a/services/sync/sync_test.go b/services/sync/sync_test.go index 0aa3ac9..162df4a 100644 --- a/services/sync/sync_test.go +++ b/services/sync/sync_test.go @@ -27,18 +27,16 @@ func TestEndpoints_GetOffChainData(t *testing.T) { name: "successfully got offchain data", hash: types.ArgHash{}, data: &types.OffChainData{ - Key: common.Hash{}, - Value: types.ArgBytes("offchaindata"), - BatchNum: 0, + Key: common.Hash{}, + Value: types.ArgBytes("offchaindata"), }, }, { name: "db returns error", hash: types.ArgHash{}, data: &types.OffChainData{ - Key: common.Hash{}, - Value: types.ArgBytes("offchaindata"), - BatchNum: 0, + Key: common.Hash{}, + Value: types.ArgBytes("offchaindata"), }, dbErr: errors.New("test error"), err: errors.New("failed to get the requested data"), @@ -85,18 +83,16 @@ func TestSyncEndpoints_ListOffChainData(t *testing.T) { name: "successfully got offchain data", hashes: generateRandomHashes(t, 1), data: []types.OffChainData{{ - Key: common.BytesToHash(nil), - Value: types.ArgBytes("offchaindata"), - BatchNum: 0, + Key: common.BytesToHash(nil), + Value: types.ArgBytes("offchaindata"), }}, }, { name: "db returns error", hashes: []types.ArgHash{}, data: []types.OffChainData{{ - Key: common.BytesToHash(nil), - Value: types.ArgBytes("offchaindata"), - BatchNum: 0, + Key: common.BytesToHash(nil), + Value: types.ArgBytes("offchaindata"), }}, dbErr: errors.New("test error"), err: errors.New("failed to list the requested data"), diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 85e1738..e66552d 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -1,7 +1,6 @@ package synchronizer import ( - "bytes" "context" "fmt" "math/rand" @@ -33,21 +32,19 @@ type SequencerTracker interface { // BatchSynchronizer watches for number events, checks if they are // "locally" stored, then retrieves and stores missing data type BatchSynchronizer struct { - client etherman.Etherman - stop chan struct{} - retry time.Duration - rpcTimeout time.Duration - blockBatchSize uint - self common.Address - db db.DB - committee *CommitteeMapSafe - syncLock sync.Mutex - reorgs <-chan BlockReorg - events chan *polygonvalidiumetrog.PolygonvalidiumetrogSequenceBatches - sequencer SequencerTracker - rpcClientFactory client.Factory - offchainDataGaps map[uint64]uint64 - offchainDataGapsLock sync.RWMutex + client etherman.Etherman + stop chan struct{} + retry time.Duration + rpcTimeout time.Duration + blockBatchSize uint + self common.Address + db db.DB + committee *CommitteeMapSafe + syncLock sync.Mutex + reorgs <-chan BlockReorg + events chan *polygonvalidiumetrog.PolygonvalidiumetrogSequenceBatches + sequencer SequencerTracker + rpcClientFactory client.Factory } // NewBatchSynchronizer creates the BatchSynchronizer @@ -76,7 +73,6 @@ func NewBatchSynchronizer( events: make(chan *polygonvalidiumetrog.PolygonvalidiumetrogSequenceBatches), sequencer: sequencer, rpcClientFactory: rpcClientFactory, - offchainDataGaps: make(map[uint64]uint64), } return synchronizer, synchronizer.resolveCommittee() } @@ -102,10 +98,9 @@ func (bs *BatchSynchronizer) resolveCommittee() error { // Start starts the synchronizer func (bs *BatchSynchronizer) Start(ctx context.Context) { log.Infof("starting batch synchronizer, DAC addr: %v", bs.self) - go bs.processUnresolvedBatches(ctx) + go bs.processMissingBatches(ctx) go bs.produceEvents(ctx) go bs.handleReorgs(ctx) - go bs.startOffchainDataGapsDetection(ctx) } // Stop stops the synchronizer @@ -113,17 +108,6 @@ func (bs *BatchSynchronizer) Stop() { close(bs.stop) } -// Gaps returns the offchain data gaps -func (bs *BatchSynchronizer) Gaps() map[uint64]uint64 { - bs.offchainDataGapsLock.RLock() - gaps := make(map[uint64]uint64, len(bs.offchainDataGaps)) - for key, value := range bs.offchainDataGaps { - gaps[key] = value - } - bs.offchainDataGapsLock.RUnlock() - return gaps -} - func (bs *BatchSynchronizer) handleReorgs(ctx context.Context) { log.Info("starting reorgs handler") for { @@ -263,43 +247,14 @@ func (bs *BatchSynchronizer) handleEvent( }) } - // Store batch keys. Already handled batch keys are going to be ignored based on the DB logic. - return storeUnresolvedBatchKeys(ctx, bs.db, batchKeys) + // Store batch keys in missing_batches table that are not already present offchain_data table + return bs.findMissingBatches(ctx, batchKeys) } -func (bs *BatchSynchronizer) processUnresolvedBatches(ctx context.Context) { - log.Info("starting handling unresolved batches") - for { - delay := time.NewTimer(bs.retry) - select { - case <-delay.C: - if err := bs.handleUnresolvedBatches(ctx); err != nil { - log.Error(err) - } - case <-bs.stop: - return - } - } -} - -// handleUnresolvedBatches handles unresolved batches that were collected by the event consumer -func (bs *BatchSynchronizer) handleUnresolvedBatches(ctx context.Context) error { - // Get unresolved batches - batchKeys, err := getUnresolvedBatchKeys(ctx, bs.db) - if err != nil { - return fmt.Errorf("failed to get unresolved batch keys: %v", err) - } - - if len(batchKeys) == 0 { - return nil - } - - // Collect list of keys +func (bs *BatchSynchronizer) findMissingBatches(ctx context.Context, batchKeys []types.BatchKey) error { keys := make([]common.Hash, len(batchKeys)) - hashToKeys := make(map[common.Hash]types.BatchKey) for i, key := range batchKeys { keys[i] = key.Hash - hashToKeys[key.Hash] = key } // Get the existing offchain data by the given list of keys @@ -308,55 +263,69 @@ func (bs *BatchSynchronizer) handleUnresolvedBatches(ctx context.Context) error return fmt.Errorf("failed to list offchain data: %v", err) } - // Resolve the unresolved data - data := make([]types.OffChainData, 0) - resolved := make([]types.BatchKey, 0) - - // Go over existing keys and mark them as resolved if they exist. - // Update the batch number if it is zero. + hashToKeys := make(map[common.Hash]struct{}) for _, extData := range existingOffchainData { - batchKey, ok := hashToKeys[extData.Key] + hashToKeys[extData.Key] = struct{}{} + } + + missingData := make([]types.BatchKey, 0) + for _, batchKey := range batchKeys { + _, ok := hashToKeys[batchKey.Hash] if !ok { - // This should not happen, but log it just in case - log.Errorf("unexpected key %s in the offchain data", extData.Key.Hex()) - continue + missingData = append(missingData, batchKey) } + } - // If the batch number is zero, update it - if extData.BatchNum == 0 { - extData.BatchNum = batchKey.Number - data = append(data, extData) + if len(missingData) > 0 { + return storeMissingBatchKeys(ctx, bs.db, missingData) + } + + return nil +} + +func (bs *BatchSynchronizer) processMissingBatches(ctx context.Context) { + log.Info("starting handling missing batches") + for { + delay := time.NewTimer(bs.retry) + select { + case <-delay.C: + if err := bs.handleMissingBatches(ctx); err != nil { + log.Error(err) + } + case <-bs.stop: + return } + } +} - // Mark the batch as resolved - resolved = append(resolved, batchKey) +// handleMissingBatches handles missing batches that were collected by the event consumer +func (bs *BatchSynchronizer) handleMissingBatches(ctx context.Context) error { + // Get missing batches + batchKeys, err := getMissingBatchKeys(ctx, bs.db) + if err != nil { + return fmt.Errorf("failed to get missing batch keys: %v", err) + } - // Remove the key from the map - delete(hashToKeys, extData.Key) + if len(batchKeys) == 0 { + return nil } - // Resolve the remaining unresolved data - for _, key := range hashToKeys { + data := make([]types.OffChainData, 0) + for _, key := range batchKeys { value, err := bs.resolve(ctx, key) if err != nil { log.Errorf("failed to resolve batch %s: %v", key.Hash.Hex(), err) continue } - - resolved = append(resolved, key) data = append(data, *value) } - // Store data of the batches to the DB if len(data) > 0 { if err = storeOffchainData(ctx, bs.db, data); err != nil { return fmt.Errorf("failed to store offchain data: %v", err) } - } - // Mark batches as resolved - if len(resolved) > 0 { - if err = deleteUnresolvedBatchKeys(ctx, bs.db, resolved); err != nil { + if err = deleteMissingBatchKeys(ctx, bs.db, batchKeys); err != nil { return fmt.Errorf("failed to delete successfully resolved batch keys: %v", err) } } @@ -393,7 +362,7 @@ func (bs *BatchSynchronizer) resolve(ctx context.Context, batch types.BatchKey) continue // malformed committee, skip what is known to be wrong } - value, err := bs.resolveWithMember(ctx, batch, member) + value, err := bs.resolveWithMember(ctx, batch.Hash, member) if err != nil { log.Warnf("error resolving, continuing: %v", err) bs.committee.Delete(member.Addr) @@ -422,15 +391,14 @@ func (bs *BatchSynchronizer) trySequencer(ctx context.Context, batch types.Batch } return &types.OffChainData{ - Key: batch.Hash, - Value: seqBatch.BatchL2Data, - BatchNum: batch.Number, + Key: batch.Hash, + Value: seqBatch.BatchL2Data, } } func (bs *BatchSynchronizer) resolveWithMember( parentCtx context.Context, - batch types.BatchKey, + batch common.Hash, member etherman.DataCommitteeMember, ) (*types.OffChainData, error) { cm := bs.rpcClientFactory.New(member.URL) @@ -438,67 +406,20 @@ func (bs *BatchSynchronizer) resolveWithMember( ctx, cancel := context.WithTimeout(parentCtx, bs.rpcTimeout) defer cancel() - log.Debugf("trying member %v at %v for key %v", member.Addr.Hex(), member.URL, batch.Hash.Hex()) + log.Debugf("trying member %v at %v for key %v", member.Addr.Hex(), member.URL, batch.Hex()) - bytes, err := cm.GetOffChainData(ctx, batch.Hash) + bytes, err := cm.GetOffChainData(ctx, batch) if err != nil { return nil, err } expectKey := crypto.Keccak256Hash(bytes) - if batch.Hash.Cmp(expectKey) != 0 { + if batch.Cmp(expectKey) != 0 { return nil, fmt.Errorf("unexpected key gotten from member: %v. Key: %v", member.Addr.Hex(), expectKey.Hex()) } return &types.OffChainData{ - Key: batch.Hash, - Value: bytes, - BatchNum: batch.Number, + Key: batch, + Value: bytes, }, nil } - -func (bs *BatchSynchronizer) startOffchainDataGapsDetection(ctx context.Context) { - log.Info("starting handling unresolved batches") - for { - delay := time.NewTimer(time.Minute) - select { - case <-delay.C: - if err := bs.detectOffchainDataGaps(ctx); err != nil { - log.Error(err) - } - case <-bs.stop: - return - } - } -} - -// detectOffchainDataGaps detects offchain data gaps and reports them in logs and the service state. -func (bs *BatchSynchronizer) detectOffchainDataGaps(ctx context.Context) error { - // Detect offchain data gaps - gaps, err := detectOffchainDataGaps(ctx, bs.db) - if err != nil { - return fmt.Errorf("failed to detect offchain data gaps: %v", err) - } - - // No gaps found, all good - if len(gaps) == 0 { - return nil - } - - // Log the detected gaps and store the detected gaps in the service state - gapsRaw := new(bytes.Buffer) - bs.offchainDataGapsLock.Lock() - bs.offchainDataGaps = make(map[uint64]uint64, len(gaps)) - for key, value := range gaps { - bs.offchainDataGaps[key] = value - - if _, err = fmt.Fprintf(gapsRaw, "%d=>%d\n", key, value); err != nil { - log.Errorf("failed to write offchain data gaps: %v", err) - } - } - bs.offchainDataGapsLock.Unlock() - - log.Warnf("detected offchain data gaps (current batch number => expected batch number): %s", gapsRaw.String()) - - return nil -} diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index bbb905a..057574d 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -6,6 +6,7 @@ import ( "math/big" "strings" "testing" + "time" elderberryValidium "github.com/0xPolygon/cdk-contracts-tooling/contracts/elderberry/polygonvalidiumetrog" etrogValidium "github.com/0xPolygon/cdk-contracts-tooling/contracts/etrog/polygonvalidiumetrog" @@ -272,8 +273,10 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { getTxArgs []interface{} getTxReturns []interface{} // db mock - storeUnresolvedBatchKeysArgs []interface{} - storeUnresolvedBatchKeysReturns []interface{} + listOffchainDataArgs []interface{} + listOffchainDataReturns []interface{} + storeMissingBatchKeysArgs []interface{} + storeMissingBatchKeysReturns []interface{} isErrorExpected bool } @@ -324,9 +327,14 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { config.getTxReturns...).Once() } - if config.storeUnresolvedBatchKeysArgs != nil && config.storeUnresolvedBatchKeysReturns != nil { - dbMock.On("StoreUnresolvedBatchKeys", config.storeUnresolvedBatchKeysArgs...).Return( - config.storeUnresolvedBatchKeysReturns...).Once() + if config.listOffchainDataArgs != nil && config.listOffchainDataReturns != nil { + dbMock.On("ListOffChainData", config.listOffchainDataArgs...).Return( + config.listOffchainDataReturns...).Once() + } + + if config.storeMissingBatchKeysArgs != nil && config.storeMissingBatchKeysReturns != nil { + dbMock.On("StoreMissingBatchKeys", config.storeMissingBatchKeysArgs...).Return( + config.storeMissingBatchKeysReturns...).Once() } batchSynronizer := &BatchSynchronizer{ @@ -374,6 +382,18 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }) }) + t.Run("Error getting offchain data", func(t *testing.T) { + t.Parallel() + + testFn(t, testConfig{ + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{tx, true, nil}, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{nil, errors.New("error")}, + isErrorExpected: true, + }) + }) + t.Run("doesn't have batch in storage - successfully stored (Elderberry fork)", func(t *testing.T) { t.Parallel() @@ -397,9 +417,11 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }) testFn(t, testConfig{ - getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, - getTxReturns: []interface{}{localTx, true, nil}, - storeUnresolvedBatchKeysArgs: []interface{}{ + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{localTx, true, nil}, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{nil, nil}, + storeMissingBatchKeysArgs: []interface{}{ mock.Anything, []types.BatchKey{{ Number: 10, @@ -407,8 +429,8 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{nil}, - isErrorExpected: false, + storeMissingBatchKeysReturns: []interface{}{nil}, + isErrorExpected: false, }) }) @@ -416,9 +438,11 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, - getTxReturns: []interface{}{tx, true, nil}, - storeUnresolvedBatchKeysArgs: []interface{}{ + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{tx, true, nil}, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{nil, nil}, + storeMissingBatchKeysArgs: []interface{}{ mock.Anything, []types.BatchKey{{ Number: 10, @@ -426,8 +450,8 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{nil}, - isErrorExpected: false, + storeMissingBatchKeysReturns: []interface{}{nil}, + isErrorExpected: false, }) }) @@ -435,8 +459,10 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { t.Parallel() testFn(t, testConfig{ - isErrorExpected: true, - storeUnresolvedBatchKeysArgs: []interface{}{ + isErrorExpected: true, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{nil, nil}, + storeMissingBatchKeysArgs: []interface{}{ mock.Anything, []types.BatchKey{{ Number: 10, @@ -444,26 +470,65 @@ func TestBatchSynchronizer_HandleEvent(t *testing.T) { }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{errors.New("error")}, - getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, - getTxReturns: []interface{}{tx, true, nil}, + storeMissingBatchKeysReturns: []interface{}{errors.New("error")}, + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{tx, true, nil}, }) }) + + t.Run("have batch in storage already no error", func(t *testing.T) { + t.Parallel() + + testFn(t, testConfig{ + isErrorExpected: false, + listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, + listOffchainDataReturns: []interface{}{ + []types.OffChainData{ + { + Key: txHash, + Value: batchL2Data, + }, + }, nil, + }, + getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, + getTxReturns: []interface{}{tx, true, nil}, + }) + }) +} + +func TestBatchSynchronizer_ProcessMissingBatches(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dbMock := mocks.NewDB(t) + dbMock.On("GetMissingBatchKeys", mock.Anything, mock.Anything).Return( + []types.BatchKey{}, nil) + + batchSynronizer := &BatchSynchronizer{ + db: dbMock, + retry: time.Millisecond * 100, + stop: make(chan struct{}), + } + go batchSynronizer.processMissingBatches(ctx) + + // Wait for the retry interval and then signal to stop + time.Sleep(time.Millisecond * 200) + batchSynronizer.stop <- struct{}{} + dbMock.AssertExpectations(t) } -func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { +func TestBatchSynchronizer_HandleMissingBatches(t *testing.T) { t.Parallel() type testConfig struct { // db mock - getUnresolvedBatchKeysArgs []interface{} - getUnresolvedBatchKeysReturns []interface{} - listOffchainDataArgs []interface{} - listOffchainDataReturns []interface{} - storeOffChainDataArgs []interface{} - storeOffChainDataReturns []interface{} - deleteUnresolvedBatchKeysArgs []interface{} - deleteUnresolvedBatchKeysReturns []interface{} + getMissingBatchKeysArgs []interface{} + getMissingBatchKeysReturns []interface{} + storeOffChainDataArgs []interface{} + storeOffChainDataReturns []interface{} + deleteMissingBatchKeysArgs []interface{} + deleteMissingBatchKeysReturns []interface{} // sequencer mocks getSequenceBatchArgs []interface{} getSequenceBatchReturns []interface{} @@ -481,14 +546,9 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { ethermanMock := mocks.NewEtherman(t) sequencerMock := mocks.NewSequencerTracker(t) - if config.getUnresolvedBatchKeysArgs != nil && config.getUnresolvedBatchKeysReturns != nil { - dbMock.On("GetUnresolvedBatchKeys", config.getUnresolvedBatchKeysArgs...).Return( - config.getUnresolvedBatchKeysReturns...).Once() - } - - if config.listOffchainDataArgs != nil && config.listOffchainDataReturns != nil { - dbMock.On("ListOffChainData", config.listOffchainDataArgs...).Return( - config.listOffchainDataReturns...).Once() + if config.getMissingBatchKeysArgs != nil && config.getMissingBatchKeysReturns != nil { + dbMock.On("GetMissingBatchKeys", config.getMissingBatchKeysArgs...).Return( + config.getMissingBatchKeysReturns...).Once() } if config.storeOffChainDataArgs != nil && config.storeOffChainDataReturns != nil { @@ -496,9 +556,9 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { config.storeOffChainDataReturns...).Once() } - if config.deleteUnresolvedBatchKeysArgs != nil && config.deleteUnresolvedBatchKeysReturns != nil { - dbMock.On("DeleteUnresolvedBatchKeys", config.deleteUnresolvedBatchKeysArgs...).Return( - config.deleteUnresolvedBatchKeysReturns...).Once() + if config.deleteMissingBatchKeysArgs != nil && config.deleteMissingBatchKeysReturns != nil { + dbMock.On("DeleteMissingBatchKeys", config.deleteMissingBatchKeysArgs...).Return( + config.deleteMissingBatchKeysReturns...).Once() } if config.getSequenceBatchArgs != nil && config.getSequenceBatchReturns != nil { @@ -512,7 +572,7 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { sequencer: sequencerMock, } - err := batchSynronizer.handleUnresolvedBatches(context.Background()) + err := batchSynronizer.handleMissingBatches(context.Background()) if config.isErrorExpected { require.Error(t, err) } else { @@ -524,132 +584,123 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { sequencerMock.AssertExpectations(t) } - t.Run("Could not get unresolved batch keys", func(t *testing.T) { + t.Run("Could not get missing batch keys", func(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{nil, errors.New("error")}, - isErrorExpected: true, + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{nil, errors.New("error")}, + isErrorExpected: true, }) }) - t.Run("No unresolved batch keys found", func(t *testing.T) { + t.Run("No missing batch keys found", func(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{nil, nil}, - isErrorExpected: false, + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{nil, nil}, + isErrorExpected: false, }) }) - t.Run("Unresolved batch key already resolved", func(t *testing.T) { + t.Run("Missing batch key found", func(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{ + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{ []types.BatchKey{{ Number: 10, Hash: txHash, }}, nil, }, - listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, - listOffchainDataReturns: []interface{}{[]types.OffChainData{{ - Key: txHash, - Value: batchL2Data, - BatchNum: 10, - }}, nil}, - deleteUnresolvedBatchKeysArgs: []interface{}{mock.Anything, + storeOffChainDataArgs: []interface{}{mock.Anything, + []types.OffChainData{{ + Key: txHash, + Value: batchL2Data, + }}, + }, + storeOffChainDataReturns: []interface{}{nil}, + deleteMissingBatchKeysArgs: []interface{}{mock.Anything, []types.BatchKey{{ Number: 10, Hash: txHash, }}, mock.Anything, }, - deleteUnresolvedBatchKeysReturns: []interface{}{nil}, - isErrorExpected: false, + deleteMissingBatchKeysReturns: []interface{}{nil}, + getSequenceBatchArgs: []interface{}{context.Background(), uint64(10)}, + getSequenceBatchReturns: []interface{}{&sequencer.SeqBatch{ + Number: types.ArgUint64(10), + BatchL2Data: types.ArgBytes(batchL2Data), + }, nil}, + isErrorExpected: false, }) }) - t.Run("Unresolved batch key already resolved with no batch number", func(t *testing.T) { + t.Run("DB error while storing missing batch", func(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{ + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{ []types.BatchKey{{ Number: 10, Hash: txHash, }}, nil, }, - listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, - listOffchainDataReturns: []interface{}{[]types.OffChainData{{ - Key: txHash, - Value: batchL2Data, - BatchNum: 0, - }}, nil}, storeOffChainDataArgs: []interface{}{mock.Anything, []types.OffChainData{{ - Key: txHash, - Value: batchL2Data, - BatchNum: 10, - }}, - mock.Anything, - }, - storeOffChainDataReturns: []interface{}{nil}, - deleteUnresolvedBatchKeysArgs: []interface{}{mock.Anything, - []types.BatchKey{{ - Number: 10, - Hash: txHash, + Key: txHash, + Value: batchL2Data, }}, - mock.Anything, }, - deleteUnresolvedBatchKeysReturns: []interface{}{nil}, - isErrorExpected: false, + storeOffChainDataReturns: []interface{}{errors.New("error")}, + getSequenceBatchArgs: []interface{}{context.Background(), uint64(10)}, + getSequenceBatchReturns: []interface{}{&sequencer.SeqBatch{ + Number: types.ArgUint64(10), + BatchL2Data: types.ArgBytes(batchL2Data), + }, nil}, + isErrorExpected: true, }) }) - t.Run("Unresolved batch key found", func(t *testing.T) { + t.Run("DB error while deleting missing batch entries", func(t *testing.T) { t.Parallel() testFn(t, testConfig{ - getUnresolvedBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, - getUnresolvedBatchKeysReturns: []interface{}{ + getMissingBatchKeysArgs: []interface{}{mock.Anything, uint(100)}, + getMissingBatchKeysReturns: []interface{}{ []types.BatchKey{{ Number: 10, Hash: txHash, }}, nil, }, - listOffchainDataArgs: []interface{}{mock.Anything, []common.Hash{txHash}}, - listOffchainDataReturns: []interface{}{nil, nil}, storeOffChainDataArgs: []interface{}{mock.Anything, []types.OffChainData{{ - Key: txHash, - Value: batchL2Data, - BatchNum: 10, + Key: txHash, + Value: batchL2Data, }}, - mock.Anything, }, storeOffChainDataReturns: []interface{}{nil}, - deleteUnresolvedBatchKeysArgs: []interface{}{mock.Anything, + deleteMissingBatchKeysArgs: []interface{}{mock.Anything, []types.BatchKey{{ Number: 10, Hash: txHash, }}, mock.Anything, }, - deleteUnresolvedBatchKeysReturns: []interface{}{nil}, - getSequenceBatchArgs: []interface{}{context.Background(), uint64(10)}, + deleteMissingBatchKeysReturns: []interface{}{errors.New("error")}, + getSequenceBatchArgs: []interface{}{context.Background(), uint64(10)}, getSequenceBatchReturns: []interface{}{&sequencer.SeqBatch{ Number: types.ArgUint64(10), BatchL2Data: types.ArgBytes(batchL2Data), }, nil}, - isErrorExpected: false, + isErrorExpected: true, }) }) @@ -684,14 +735,14 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { BatchL2Data: types.ArgBytes(batchL2Data), }, nil}, beginStateTransactionArgs: []interface{}{mock.Anything}, - storeUnresolvedBatchKeysArgs: []interface{}{mock.Anything, + storeMissingBatchKeysArgs: []interface{}{mock.Anything, []types.OffChainData{{ Key: txHash, Value: batchL2Data, }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{nil}, + storeMissingBatchKeysReturns: []interface{}{nil}, commitReturns: []interface{}{nil}, isErrorExpected: false, }) @@ -702,14 +753,14 @@ func TestBatchSynchronizer_HandleUnresolvedBatches(t *testing.T) { testFn(testConfig{ isErrorExpected: true, - storeUnresolvedBatchKeysArgs: []interface{}{mock.Anything, + storeMissingBatchKeysArgs: []interface{}{mock.Anything, []types.BatchKey{{ Number: 1, Hash: txHash, }}, mock.Anything, }, - storeUnresolvedBatchKeysReturns: []interface{}{errors.New("error")}, + storeMissingBatchKeysReturns: []interface{}{errors.New("error")}, beginStateTransactionArgs: []interface{}{mock.Anything}, rollbackArgs: []interface{}{mock.Anything}, getTxArgs: []interface{}{mock.Anything, event.Raw.TxHash}, @@ -804,74 +855,3 @@ func TestBatchSynchronizer_HandleReorgs(t *testing.T) { }) }) } - -func TestBatchSynchronizer_detectOffchainDataGaps(t *testing.T) { - t.Parallel() - - type testConfig struct { - // db mock - detectOffchainDataGapsArgs []interface{} - detectOffchainDataGapsReturns []interface{} - - expectedGaps map[uint64]uint64 - isErrorExpected bool - } - - testFn := func(t *testing.T, config testConfig) { - t.Helper() - - dbMock := mocks.NewDB(t) - - if config.detectOffchainDataGapsArgs != nil && config.detectOffchainDataGapsReturns != nil { - dbMock.On("DetectOffchainDataGaps", config.detectOffchainDataGapsArgs...).Return( - config.detectOffchainDataGapsReturns...).Once() - } - - batchSynronizer := &BatchSynchronizer{ - db: dbMock, - } - - err := batchSynronizer.detectOffchainDataGaps(context.Background()) - if config.isErrorExpected { - require.Error(t, err) - } else { - require.NoError(t, err) - require.Equal(t, config.expectedGaps, batchSynronizer.Gaps()) - } - - dbMock.AssertExpectations(t) - } - - t.Run("no gaps detected", func(t *testing.T) { - t.Parallel() - - testFn(t, testConfig{ - detectOffchainDataGapsArgs: []interface{}{mock.Anything}, - detectOffchainDataGapsReturns: []interface{}{map[uint64]uint64{}, nil}, - expectedGaps: map[uint64]uint64{}, - isErrorExpected: false, - }) - }) - - t.Run("one gap detected", func(t *testing.T) { - t.Parallel() - - testFn(t, testConfig{ - detectOffchainDataGapsArgs: []interface{}{mock.Anything}, - detectOffchainDataGapsReturns: []interface{}{map[uint64]uint64{1: 3}, nil}, - expectedGaps: map[uint64]uint64{1: 3}, - isErrorExpected: false, - }) - }) - - t.Run("failed to detect gaps", func(t *testing.T) { - t.Parallel() - - testFn(t, testConfig{ - detectOffchainDataGapsArgs: []interface{}{mock.Anything}, - detectOffchainDataGapsReturns: []interface{}{nil, errors.New("test error")}, - expectedGaps: map[uint64]uint64{}, - isErrorExpected: true, - }) - }) -} diff --git a/synchronizer/store.go b/synchronizer/store.go index 3ba5373..7bd8b99 100644 --- a/synchronizer/store.go +++ b/synchronizer/store.go @@ -43,25 +43,25 @@ func setStartBlock(parentCtx context.Context, db dbTypes.DB, block uint64, syncT return db.StoreLastProcessedBlock(ctx, block, string(syncTask)) } -func storeUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { +func storeMissingBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) defer cancel() - return db.StoreUnresolvedBatchKeys(ctx, keys) + return db.StoreMissingBatchKeys(ctx, keys) } -func getUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB) ([]types.BatchKey, error) { +func getMissingBatchKeys(parentCtx context.Context, db dbTypes.DB) ([]types.BatchKey, error) { ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) defer cancel() - return db.GetUnresolvedBatchKeys(ctx, maxUnprocessedBatch) + return db.GetMissingBatchKeys(ctx, maxUnprocessedBatch) } -func deleteUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { +func deleteMissingBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) defer cancel() - return db.DeleteUnresolvedBatchKeys(ctx, keys) + return db.DeleteMissingBatchKeys(ctx, keys) } func listOffchainData(parentCtx context.Context, db dbTypes.DB, keys []common.Hash) ([]types.OffChainData, error) { @@ -77,10 +77,3 @@ func storeOffchainData(parentCtx context.Context, db dbTypes.DB, data []types.Of return db.StoreOffChainData(ctx, data) } - -func detectOffchainDataGaps(parentCtx context.Context, db dbTypes.DB) (map[uint64]uint64, error) { - ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) - defer cancel() - - return db.DetectOffchainDataGaps(ctx) -} diff --git a/synchronizer/store_test.go b/synchronizer/store_test.go index c32091e..f005c02 100644 --- a/synchronizer/store_test.go +++ b/synchronizer/store_test.go @@ -125,7 +125,7 @@ func Test_setStartBlock(t *testing.T) { } } -func Test_storeUnresolvedBatchKeys(t *testing.T) { +func Test_storeMissingBatchKeys(t *testing.T) { t.Parallel() testError := errors.New("test error") @@ -143,12 +143,12 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { wantErr bool }{ { - name: "StoreUnresolvedBatchKeys returns error", + name: "StoreMissingBatchKeys returns error", db: func(t *testing.T) db.DB { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("StoreUnresolvedBatchKeys", mock.Anything, testData).Return(testError) + mockDB.On("StoreMissingBatchKeys", mock.Anything, testData).Return(testError) return mockDB }, @@ -161,7 +161,7 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("StoreUnresolvedBatchKeys", mock.Anything, testData).Return(nil) + mockDB.On("StoreMissingBatchKeys", mock.Anything, testData).Return(nil) return mockDB }, @@ -176,7 +176,7 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { testDB := tt.db(t) - if err := storeUnresolvedBatchKeys(context.Background(), testDB, tt.keys); tt.wantErr { + if err := storeMissingBatchKeys(context.Background(), testDB, tt.keys); tt.wantErr { require.ErrorIs(t, err, testError) } else { require.NoError(t, err) @@ -185,7 +185,7 @@ func Test_storeUnresolvedBatchKeys(t *testing.T) { } } -func Test_getUnresolvedBatchKeys(t *testing.T) { +func Test_getMissingBatchKeys(t *testing.T) { t.Parallel() testError := errors.New("test error") @@ -203,12 +203,12 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { wantErr bool }{ { - name: "GetUnresolvedBatchKeys returns error", + name: "GetMissingBatchKeys returns error", db: func(t *testing.T) db.DB { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)). + mockDB.On("GetMissingBatchKeys", mock.Anything, uint(100)). Return(nil, testError) return mockDB @@ -221,7 +221,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("GetUnresolvedBatchKeys", mock.Anything, uint(100)).Return(testData, nil) + mockDB.On("GetMissingBatchKeys", mock.Anything, uint(100)).Return(testData, nil) return mockDB }, @@ -236,7 +236,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { testDB := tt.db(t) - if keys, err := getUnresolvedBatchKeys(context.Background(), testDB); tt.wantErr { + if keys, err := getMissingBatchKeys(context.Background(), testDB); tt.wantErr { require.ErrorIs(t, err, testError) } else { require.NoError(t, err) @@ -246,7 +246,7 @@ func Test_getUnresolvedBatchKeys(t *testing.T) { } } -func Test_deleteUnresolvedBatchKeys(t *testing.T) { +func Test_deleteMissingBatchKeys(t *testing.T) { t.Parallel() testError := errors.New("test error") @@ -263,12 +263,12 @@ func Test_deleteUnresolvedBatchKeys(t *testing.T) { wantErr bool }{ { - name: "DeleteUnresolvedBatchKeys returns error", + name: "DeleteMissingBatchKeys returns error", db: func(t *testing.T) db.DB { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("DeleteUnresolvedBatchKeys", mock.Anything, testData). + mockDB.On("DeleteMissingBatchKeys", mock.Anything, testData). Return(testError) return mockDB @@ -281,7 +281,7 @@ func Test_deleteUnresolvedBatchKeys(t *testing.T) { t.Helper() mockDB := mocks.NewDB(t) - mockDB.On("DeleteUnresolvedBatchKeys", mock.Anything, testData). + mockDB.On("DeleteMissingBatchKeys", mock.Anything, testData). Return(nil) return mockDB @@ -296,7 +296,7 @@ func Test_deleteUnresolvedBatchKeys(t *testing.T) { testDB := tt.db(t) - if err := deleteUnresolvedBatchKeys(context.Background(), testDB, testData); tt.wantErr { + if err := deleteMissingBatchKeys(context.Background(), testDB, testData); tt.wantErr { require.ErrorIs(t, err, testError) } else { require.NoError(t, err) @@ -364,61 +364,3 @@ func Test_storeOffchainData(t *testing.T) { }) } } - -func Test_detectOffchainDataGaps(t *testing.T) { - t.Parallel() - - testError := errors.New("test error") - - tests := []struct { - name string - db func(t *testing.T) db.DB - gaps map[uint64]uint64 - wantErr bool - }{ - { - name: "DetectOffchainDataGaps returns error", - db: func(t *testing.T) db.DB { - t.Helper() - - mockDB := mocks.NewDB(t) - - mockDB.On("DetectOffchainDataGaps", mock.Anything).Return(nil, testError) - - return mockDB - }, - gaps: nil, - wantErr: true, - }, - { - name: "all good", - db: func(t *testing.T) db.DB { - t.Helper() - - mockDB := mocks.NewDB(t) - - mockDB.On("DetectOffchainDataGaps", mock.Anything).Return(map[uint64]uint64{1: 3}, nil) - - return mockDB - }, - gaps: map[uint64]uint64{1: 3}, - wantErr: false, - }, - } - for _, tt := range tests { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - testDB := tt.db(t) - - if gaps, err := detectOffchainDataGaps(context.Background(), testDB); tt.wantErr { - require.ErrorIs(t, err, testError) - } else { - require.NoError(t, err) - require.Equal(t, tt.gaps, gaps) - } - }) - } -} diff --git a/test/config/test.local.toml b/test/config/test.local.toml index b19a167..d27a420 100644 --- a/test/config/test.local.toml +++ b/test/config/test.local.toml @@ -1,4 +1,4 @@ -PrivateKey = {Path = "config/test-member.keystore", Password = "testonly"} +PrivateKey = {Path = "test/config/test-member.keystore", Password = "testonly"} [L1] RpcURL = "ws://127.0.0.1:8546" diff --git a/types/types.go b/types/types.go index 6d262d7..97e4813 100644 --- a/types/types.go +++ b/types/types.go @@ -20,8 +20,7 @@ type DACStatus struct { Uptime string `json:"uptime"` Version string `json:"version"` KeyCount uint64 `json:"key_count"` - BackfillProgress uint64 `json:"backfill_progress"` - OffchainDataGapsExist bool `json:"offchain_data_gaps_exist"` + LastSynchronizedBlock uint64 `json:"last_synchronized_block"` } // BatchKey is the pairing of batch number and data hash of a batch @@ -32,9 +31,8 @@ type BatchKey struct { // OffChainData represents some data that is not stored on chain and should be preserved type OffChainData struct { - Key common.Hash - Value []byte - BatchNum uint64 + Key common.Hash + Value []byte } // RemoveDuplicateOffChainData removes duplicate off chain data