-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(dashboard): added unit tests for dashboard helpers #1309
base: staging
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |||||||||
"database/sql" | ||||||||||
"fmt" | ||||||||||
"math/big" | ||||||||||
"reflect" | ||||||||||
"time" | ||||||||||
|
||||||||||
"github.com/doug-martin/goqu/v9" | ||||||||||
|
@@ -14,6 +15,7 @@ import ( | |||||||||
"github.com/gobitfly/beaconchain/pkg/commons/cache" | ||||||||||
"github.com/gobitfly/beaconchain/pkg/commons/price" | ||||||||||
"github.com/gobitfly/beaconchain/pkg/commons/utils" | ||||||||||
"github.com/jmoiron/sqlx" | ||||||||||
"github.com/lib/pq" | ||||||||||
"github.com/pkg/errors" | ||||||||||
"github.com/shopspring/decimal" | ||||||||||
|
@@ -400,3 +402,220 @@ func (d *DataAccessService) calculateValidatorDashboardBalance(ctx context.Conte | |||||||||
} | ||||||||||
return balances, nil | ||||||||||
} | ||||||||||
|
||||||||||
func (d *DataAccessService) GetLatestExportedChartTs(ctx context.Context, aggregation enums.ChartAggregation) (uint64, error) { | ||||||||||
view, dateColumn, err := d.getViewAndDateColumn(aggregation) | ||||||||||
if err != nil { | ||||||||||
return 0, err | ||||||||||
} | ||||||||||
|
||||||||||
query := fmt.Sprintf(`SELECT max(%s) FROM %s`, dateColumn, view) | ||||||||||
var ts time.Time | ||||||||||
err = d.clickhouseReader.GetContext(ctx, &ts, query) | ||||||||||
if err != nil { | ||||||||||
return 0, fmt.Errorf("error retrieving latest exported chart timestamp: %w", err) | ||||||||||
} | ||||||||||
|
||||||||||
return uint64(ts.Unix()), nil | ||||||||||
} | ||||||||||
|
||||||||||
// --- Generic Query Execution --- | ||||||||||
func executeQuery[T any](ctx context.Context, db *sqlx.DB, ds *goqu.SelectDataset) (T, error) { | ||||||||||
query, args, err := ds.Prepared(true).ToSQL() | ||||||||||
if err != nil { | ||||||||||
var zero T | ||||||||||
return zero, fmt.Errorf("error preparing query: %w", err) | ||||||||||
} | ||||||||||
|
||||||||||
var result T | ||||||||||
resultType := reflect.TypeOf(result) | ||||||||||
|
||||||||||
if resultType != nil && resultType.Kind() == reflect.Slice { | ||||||||||
err = db.SelectContext(ctx, &result, query, args...) | ||||||||||
} else { | ||||||||||
err = db.GetContext(ctx, &result, query, args...) | ||||||||||
} | ||||||||||
Comment on lines
+430
to
+437
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: I'd do 2 separate function for select and get rather than doing a runtime type check for every query, since reflecting does have a performance impact. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also we're gonna need something similar for |
||||||||||
|
||||||||||
if err != nil { | ||||||||||
return result, fmt.Errorf("error executing query: %w", err) | ||||||||||
} | ||||||||||
|
||||||||||
return result, nil | ||||||||||
} | ||||||||||
|
||||||||||
// --- Structs --- | ||||||||||
type SyncCommitteeResult struct { | ||||||||||
ValidatorIndex uint64 `db:"validatorindex"` | ||||||||||
Period uint64 `db:"period"` | ||||||||||
} | ||||||||||
|
||||||||||
// --- Sync Committee Functions --- | ||||||||||
func (d *DataAccessService) getCurrentAndUpcomingSyncCommittees(ctx context.Context, latestEpoch uint64) (map[uint64]bool, map[uint64]bool, error) { | ||||||||||
ds := buildSyncCommitteeQuery(latestEpoch) | ||||||||||
queryResult, err := executeQuery[[]SyncCommitteeResult](ctx, d.readerDb, ds) | ||||||||||
if err != nil { | ||||||||||
return nil, nil, err | ||||||||||
} | ||||||||||
|
||||||||||
current, upcoming := processSyncCommitteeResults(queryResult, utils.SyncPeriodOfEpoch(latestEpoch)) | ||||||||||
return current, upcoming, nil | ||||||||||
} | ||||||||||
|
||||||||||
func buildSyncCommitteeQuery(latestEpoch uint64) *goqu.SelectDataset { | ||||||||||
currentSyncPeriod := utils.SyncPeriodOfEpoch(latestEpoch) | ||||||||||
return goqu.Dialect("postgres"). | ||||||||||
Select( | ||||||||||
goqu.L("validatorindex"), | ||||||||||
goqu.L("period"), | ||||||||||
). | ||||||||||
From("sync_committees"). | ||||||||||
Where(goqu.L("period IN (?, ?)", currentSyncPeriod, currentSyncPeriod+1)) | ||||||||||
} | ||||||||||
|
||||||||||
func processSyncCommitteeResults(queryResult []SyncCommitteeResult, currentSyncPeriod uint64) (map[uint64]bool, map[uint64]bool) { | ||||||||||
currentSyncCommitteeValidators := make(map[uint64]bool) | ||||||||||
upcomingSyncCommitteeValidators := make(map[uint64]bool) | ||||||||||
|
||||||||||
for _, entry := range queryResult { | ||||||||||
if entry.Period == currentSyncPeriod { | ||||||||||
currentSyncCommitteeValidators[entry.ValidatorIndex] = true | ||||||||||
} else { | ||||||||||
upcomingSyncCommitteeValidators[entry.ValidatorIndex] = true | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
return currentSyncCommitteeValidators, upcomingSyncCommitteeValidators | ||||||||||
} | ||||||||||
|
||||||||||
// --- Epoch Start Functions --- | ||||||||||
func (d *DataAccessService) getEpochStart(ctx context.Context, period enums.TimePeriod) (uint64, error) { | ||||||||||
ds, err := buildEpochStartQuery(d, period) | ||||||||||
if err != nil { | ||||||||||
return 0, err | ||||||||||
} | ||||||||||
|
||||||||||
epochStart, err := executeQuery[uint64](ctx, d.clickhouseReader, ds) | ||||||||||
if err != nil { | ||||||||||
return 0, err | ||||||||||
} | ||||||||||
|
||||||||||
return epochStart, nil | ||||||||||
} | ||||||||||
|
||||||||||
func buildEpochStartQuery(d *DataAccessService, period enums.TimePeriod) (*goqu.SelectDataset, error) { | ||||||||||
clickhouseTable, _, err := d.getTablesForPeriod(period) | ||||||||||
if err != nil { | ||||||||||
return nil, err | ||||||||||
} | ||||||||||
|
||||||||||
ds := goqu.Dialect("postgres"). | ||||||||||
Select(goqu.L("epoch_start")). | ||||||||||
From(goqu.L(fmt.Sprintf("%s FINAL", clickhouseTable))). | ||||||||||
Order(goqu.L("epoch_start").Asc()). | ||||||||||
Limit(1) | ||||||||||
|
||||||||||
return ds, nil | ||||||||||
} | ||||||||||
|
||||||||||
// --- Past Sync Committees Functions --- | ||||||||||
func (d *DataAccessService) getPastSyncCommittees(ctx context.Context, indices []uint64, epochStart uint64, latestEpoch uint64) (map[uint64]uint64, error) { | ||||||||||
ds := buildPastSyncCommitteesQuery(indices, epochStart, latestEpoch) | ||||||||||
validatorIndices, err := executeQuery[[]uint64](ctx, d.alloyReader, ds) | ||||||||||
if err != nil { | ||||||||||
return nil, err | ||||||||||
} | ||||||||||
|
||||||||||
validatorCountMap := processPastSyncCommitteesResults(validatorIndices) | ||||||||||
return validatorCountMap, nil | ||||||||||
Comment on lines
+528
to
+529
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick (non-blocking):
Suggested change
|
||||||||||
} | ||||||||||
|
||||||||||
func buildPastSyncCommitteesQuery(indices []uint64, epochStart, latestEpoch uint64) *goqu.SelectDataset { | ||||||||||
pastSyncPeriodCutoff := utils.SyncPeriodOfEpoch(epochStart) | ||||||||||
currentSyncPeriod := utils.SyncPeriodOfEpoch(latestEpoch) | ||||||||||
Comment on lines
+533
to
+534
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: You have a bit of logic in your build query func? Do we want this? Or should we maybe pass this to the builder func? |
||||||||||
|
||||||||||
return goqu.Dialect("postgres"). | ||||||||||
Select(goqu.L("sc.validatorindex")). | ||||||||||
From(goqu.L("sync_committees sc")). | ||||||||||
Where(goqu.L("period >= ? AND period < ? AND validatorindex = ANY(?)", pastSyncPeriodCutoff, currentSyncPeriod, pq.Array(indices))) | ||||||||||
} | ||||||||||
|
||||||||||
func processPastSyncCommitteesResults(validatorIndices []uint64) map[uint64]uint64 { | ||||||||||
validatorCountMap := make(map[uint64]uint64) | ||||||||||
for _, validatorIndex := range validatorIndices { | ||||||||||
validatorCountMap[validatorIndex]++ | ||||||||||
} | ||||||||||
return validatorCountMap | ||||||||||
} | ||||||||||
|
||||||||||
func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, int, error) { | ||||||||||
table := "" | ||||||||||
hours := 0 | ||||||||||
|
||||||||||
switch period { | ||||||||||
case enums.TimePeriods.Last1h: | ||||||||||
table = "validator_dashboard_data_rolling_1h" | ||||||||||
hours = 1 | ||||||||||
case enums.TimePeriods.Last24h: | ||||||||||
table = "validator_dashboard_data_rolling_24h" | ||||||||||
hours = 24 | ||||||||||
case enums.TimePeriods.Last7d: | ||||||||||
table = "validator_dashboard_data_rolling_7d" | ||||||||||
hours = 7 * 24 | ||||||||||
case enums.TimePeriods.Last30d: | ||||||||||
table = "validator_dashboard_data_rolling_30d" | ||||||||||
hours = 30 * 24 | ||||||||||
case enums.TimePeriods.AllTime: | ||||||||||
table = "validator_dashboard_data_rolling_total" | ||||||||||
hours = -1 | ||||||||||
default: | ||||||||||
return "", 0, fmt.Errorf("not-implemented time period: %v", period) | ||||||||||
} | ||||||||||
|
||||||||||
return table, hours, nil | ||||||||||
} | ||||||||||
|
||||||||||
func (d *DataAccessService) getTableAndDateColumn(aggregation enums.ChartAggregation) (string, string, error) { | ||||||||||
var table, dateColumn string | ||||||||||
|
||||||||||
switch aggregation { | ||||||||||
case enums.IntervalEpoch: | ||||||||||
table = "validator_dashboard_data_epoch" | ||||||||||
dateColumn = "epoch_timestamp" | ||||||||||
case enums.IntervalHourly: | ||||||||||
table = "validator_dashboard_data_hourly" | ||||||||||
dateColumn = "t" | ||||||||||
case enums.IntervalDaily: | ||||||||||
table = "validator_dashboard_data_daily" | ||||||||||
dateColumn = "t" | ||||||||||
case enums.IntervalWeekly: | ||||||||||
table = "validator_dashboard_data_weekly" | ||||||||||
dateColumn = "t" | ||||||||||
default: | ||||||||||
return "", "", fmt.Errorf("unexpected aggregation type: %v", aggregation) | ||||||||||
} | ||||||||||
|
||||||||||
return table, dateColumn, nil | ||||||||||
} | ||||||||||
|
||||||||||
func (d *DataAccessService) getViewAndDateColumn(aggregation enums.ChartAggregation) (string, string, error) { | ||||||||||
var view, dateColumn string | ||||||||||
|
||||||||||
switch aggregation { | ||||||||||
case enums.IntervalEpoch: | ||||||||||
view = "view_validator_dashboard_data_epoch_max_ts" | ||||||||||
dateColumn = "t" | ||||||||||
case enums.IntervalHourly: | ||||||||||
view = "view_validator_dashboard_data_hourly_max_ts" | ||||||||||
dateColumn = "t" | ||||||||||
case enums.IntervalDaily: | ||||||||||
view = "view_validator_dashboard_data_daily_max_ts" | ||||||||||
dateColumn = "t" | ||||||||||
case enums.IntervalWeekly: | ||||||||||
view = "view_validator_dashboard_data_weekly_max_ts" | ||||||||||
dateColumn = "t" | ||||||||||
default: | ||||||||||
return "", "", fmt.Errorf("unexpected aggregation type: %v", aggregation) | ||||||||||
} | ||||||||||
|
||||||||||
return view, dateColumn, nil | ||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: shouldn't we use goqu for all queries?