Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pectra support for clickhouse consensus layer exporter #1307

Draft
wants to merge 3 commits into
base: BEDS-140/clickhouse-exporter-head-2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions backend/cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ func Run() {
}

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
db.WriterDb, db.ReaderDb = db.MustInitDB(&cfg.WriterDatabase, &cfg.ReaderDatabase, "pgx", "postgres")
}()
if !cfg.JustV2 {
wg.Add(1)
go func() {
defer wg.Done()
db.WriterDb, db.ReaderDb = db.MustInitDB(&cfg.WriterDatabase, &cfg.ReaderDatabase, "pgx", "postgres")
}()
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -152,12 +152,12 @@ func Run() {
monitoring.Start()

if !cfg.JustV2 {
defer db.ReaderDb.Close()
defer db.WriterDb.Close()
defer db.AlloyReader.Close()
defer db.AlloyWriter.Close()
defer db.BigtableClient.Close()
}
defer db.ReaderDb.Close() // we need it to get the pectra workaround events
defer db.WriterDb.Close()
defer db.ClickHouseReader.Close()
defer db.ClickHouseWriter.Close()
defer db.ClickHouseNativeWriter.Close()
Expand Down

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions backend/pkg/commons/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,27 @@ func WarnWithFields(additionalInfos Fields, msg string) {
}

func Tracef(format string, args ...interface{}) {
logrus.Tracef(format, args...)
if logrus.IsLevelEnabled(logrus.TraceLevel) { // performance optimization
logrus.Tracef(format, args...)
}
}

func TraceWithFields(additionalInfos Fields, msg string) {
logrus.WithFields(additionalInfos).Trace(msg)
if logrus.IsLevelEnabled(logrus.TraceLevel) {
logrus.WithFields(additionalInfos).Trace(msg)
}
}

func DebugWithFields(additionalInfos Fields, msg string) {
logrus.WithFields(additionalInfos).Debug(msg)
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.WithFields(additionalInfos).Debug(msg)
}
}

func Debugf(format string, args ...interface{}) {
logrus.Debugf(format, args...)
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.Debugf(format, args...)
}
}

func logErrorInfo(err error, callerSkip int, isWarning bool, additionalInfos ...Fields) *logrus.Entry {
Expand Down
8 changes: 2 additions & 6 deletions backend/pkg/commons/types/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,8 @@ type ClChainConfig struct {
CapellaForkEpoch uint64 `yaml:"CAPELLA_FORK_EPOCH"`
DenebForkVersion string `yaml:"DENEB_FORK_VERSION"`
DenebForkEpoch uint64 `yaml:"DENEB_FORK_EPOCH"`
Eip6110ForkVersion string `yaml:"EIP6110_FORK_VERSION"`
Eip6110ForkEpoch uint64 `yaml:"EIP6110_FORK_EPOCH"`
Eip7002ForkVersion string `yaml:"EIP7002_FORK_VERSION"`
Eip7002ForkEpoch uint64 `yaml:"EIP7002_FORK_EPOCH"`
WhiskForkVersion string `yaml:"WHISK_FORK_VERSION"`
WhiskForkEpoch uint64 `yaml:"WHISK_FORK_EPOCH"`
ElectraForkVersion string `yaml:"ELECTRA_FORK_VERSION"`
ElectraForkEpoch uint64 `yaml:"ELECTRA_FORK_EPOCH"`
// time parameters
SecondsPerSlot uint64 `yaml:"SECONDS_PER_SLOT"`
SecondsPerEth1Block uint64 `yaml:"SECONDS_PER_ETH1_BLOCK"`
Expand Down
17 changes: 10 additions & 7 deletions backend/pkg/commons/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,16 @@ type Config struct {
Enabled bool `yaml:"enabled" env:"ENABLED"`
} `yaml:"mevBoostRelayExporter" env:", prefix=MEVBOOSTRELAY_EXPORTER_"`
DashboardExporter struct {
RollingsInParallel int64 `yaml:"rollingsAtOnce" envconfig:"DASHBOARD_EXPORTER_ROLLINGS_AT_ONCE"` // how many rollings to do at once
RollingPartsInParallel int64 `yaml:"rollingsInParallel" envconfig:"DASHBOARD_EXPORTER_ROLLINGS_IN_PARALLEL"` // how many parts of a single rolling to do at once
TransferInParallel int64 `yaml:"transferInParallel" envconfig:"DASHBOARD_EXPORTER_TRANSFER_IN_PARALLEL"` // how many transfers to do at once
TransferAtOnce int64 `yaml:"transferAtOnce" envconfig:"DASHBOARD_EXPORTER_TRANSFER_AT_ONCE"` // how much data to transfer in a single transfer
FetchAtOnceLimit int64 `yaml:"fetchAtOnceLimit" envconfig:"DASHBOARD_EXPORTER_FETCH_AT_ONCE_LIMIT"` // how much data to fetch in a single fetch
InsertAtOnceLimit int64 `yaml:"insertAtOnceLimit" envconfig:"DASHBOARD_EXPORTER_INSERT_AT_ONCE_LIMIT"` // how much data to insert in a single insert
InsertInParallel int64 `yaml:"insertInParallel" envconfig:"DASHBOARD_EXPORTER_INSERT_IN_PARALLEL"` // how many inserts to do at once
RollingsInParallel int64 `yaml:"rollingsAtOnce" envconfig:"DASHBOARD_EXPORTER_ROLLINGS_AT_ONCE"` // how many rollings to do at once
RollingPartsInParallel int64 `yaml:"rollingsInParallel" envconfig:"DASHBOARD_EXPORTER_ROLLINGS_IN_PARALLEL"` // how many parts of a single rolling to do at once
TransferInParallel int64 `yaml:"transferInParallel" envconfig:"DASHBOARD_EXPORTER_TRANSFER_IN_PARALLEL"` // how many transfers to do at once
TransferAtOnce int64 `yaml:"transferAtOnce" envconfig:"DASHBOARD_EXPORTER_TRANSFER_AT_ONCE"` // how much data to transfer in a single transfer
FetchAtOnceLimit int64 `yaml:"fetchAtOnceLimit" envconfig:"DASHBOARD_EXPORTER_FETCH_AT_ONCE_LIMIT"` // how much data to fetch in a single fetch
FetchHeavyInParallel int64 `yaml:"fetchHeavyInParallel" envconfig:"DASHBOARD_EXPORTER_FETCH_HEAVY_IN_PARALLEL"` // how many heavy fetches to do at once
FetchMediumInParallel int64 `yaml:"fetchMediumInParallel" envconfig:"DASHBOARD_EXPORTER_FETCH_MEDIUM_IN_PARALLEL"` // how many medium fetches to do at once
FetchLightInParallel int64 `yaml:"fetchLightInParallel" envconfig:"DASHBOARD_EXPORTER_FETCH_LIGHT_IN_PARALLEL"` // how many light fetches to do at once
InsertAtOnceLimit int64 `yaml:"insertAtOnceLimit" envconfig:"DASHBOARD_EXPORTER_INSERT_AT_ONCE_LIMIT"` // how much data to insert in a single insert
InsertInParallel int64 `yaml:"insertInParallel" envconfig:"DASHBOARD_EXPORTER_INSERT_IN_PARALLEL"` // how many inserts to do at once
} `yaml:"dashboardExporter" env:", prefix=DASHBOARD_EXPORTER_"`
Pprof struct {
Enabled bool `yaml:"enabled" env:"ENABLED"`
Expand Down
44 changes: 25 additions & 19 deletions backend/pkg/commons/utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,24 @@ func ReadConfig(cfg *types.Config, path string) error {
cfg.Frontend.SiteBrand = "beaconcha.in"
}

// rewrite to match to allow trace as well
switch strings.ToLower(os.Getenv("LOG_LEVEL")) {
case "trace":
logrus.SetLevel(logrus.TraceLevel)
case "debug":
logrus.SetLevel(logrus.DebugLevel)
case "info":
logrus.SetLevel(logrus.InfoLevel)
case "warn":
logrus.SetLevel(logrus.WarnLevel)
case "error":
logrus.SetLevel(logrus.ErrorLevel)
case "fatal":
logrus.SetLevel(logrus.FatalLevel)
case "panic":
logrus.SetLevel(logrus.PanicLevel)
}

err = setCLConfig(cfg)
if err != nil {
return err
Expand Down Expand Up @@ -402,6 +420,10 @@ func setCLConfig(cfg *types.Config) error {
log.Warnf("DenebForkEpoch not set, defaulting to maxForkEpoch")
jr.Data.DenebForkEpoch = &maxForkEpoch
}
if jr.Data.ElectraForkEpoch == nil {
log.Warnf("ElectraForkEpoch not set, defaulting to maxForkEpoch")
jr.Data.ElectraForkEpoch = &maxForkEpoch
}

chainCfg := types.ClChainConfig{
PresetBase: jr.Data.PresetBase,
Expand All @@ -421,6 +443,8 @@ func setCLConfig(cfg *types.Config) error {
CapellaForkEpoch: *jr.Data.CapellaForkEpoch,
DenebForkVersion: jr.Data.DenebForkVersion,
DenebForkEpoch: *jr.Data.DenebForkEpoch,
ElectraForkVersion: jr.Data.ElectraForkVersion,
ElectraForkEpoch: *jr.Data.ElectraForkEpoch,
SecondsPerSlot: uint64(jr.Data.SecondsPerSlot),
SecondsPerEth1Block: uint64(jr.Data.SecondsPerEth1Block),
MinValidatorWithdrawabilityDelay: uint64(jr.Data.MinValidatorWithdrawabilityDelay),
Expand All @@ -437,7 +461,7 @@ func setCLConfig(cfg *types.Config) error {
DepositContractAddress: jr.Data.DepositContractAddress,
MaxCommitteesPerSlot: uint64(jr.Data.MaxCommitteesPerSlot),
TargetCommitteeSize: uint64(jr.Data.TargetCommitteeSize),
MaxValidatorsPerCommittee: uint64(jr.Data.TargetCommitteeSize),
MaxValidatorsPerCommittee: uint64(jr.Data.MaxValidatorsPerCommittee),
ShuffleRoundCount: uint64(jr.Data.ShuffleRoundCount),
HysteresisQuotient: uint64(jr.Data.HysteresisQuotient),
HysteresisDownwardMultiplier: uint64(jr.Data.HysteresisDownwardMultiplier),
Expand Down Expand Up @@ -511,23 +535,5 @@ func setCLConfig(cfg *types.Config) error {
cfg.Chain.ClConfig = *chainConfig
}

// rewrite to match to allow trace as well
switch strings.ToLower(os.Getenv("LOG_LEVEL")) {
case "trace":
logrus.SetLevel(logrus.TraceLevel)
case "debug":
logrus.SetLevel(logrus.DebugLevel)
case "info":
logrus.SetLevel(logrus.InfoLevel)
case "warn":
logrus.SetLevel(logrus.WarnLevel)
case "error":
logrus.SetLevel(logrus.ErrorLevel)
case "fatal":
logrus.SetLevel(logrus.FatalLevel)
case "panic":
logrus.SetLevel(logrus.PanicLevel)
}

return nil
}
131 changes: 131 additions & 0 deletions backend/pkg/commons/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"math"
"math/big"
"math/bits"
"os"
"os/signal"
"reflect"
Expand Down Expand Up @@ -97,6 +98,20 @@ func SliceContains(list []string, target string) bool {

// ForkVersionAtEpoch returns the forkversion active a specific epoch
func ForkVersionAtEpoch(epoch uint64) *types.ForkVersion {
if epoch >= Config.Chain.ClConfig.ElectraForkEpoch {
return &types.ForkVersion{
Epoch: Config.Chain.ClConfig.ElectraForkEpoch,
CurrentVersion: Config.Chain.ClConfig.ElectraForkVersion,
PreviousVersion: Config.Chain.ClConfig.DenebForkVersion,
}
}
if epoch >= Config.Chain.ClConfig.DenebForkEpoch {
return &types.ForkVersion{
Epoch: Config.Chain.ClConfig.DenebForkEpoch,
CurrentVersion: Config.Chain.ClConfig.DenebForkVersion,
PreviousVersion: Config.Chain.ClConfig.CapellaForkVersion,
}
}
if epoch >= Config.Chain.ClConfig.CapellaForkEpoch {
return &types.ForkVersion{
Epoch: Config.Chain.ClConfig.CapellaForkEpoch,
Expand Down Expand Up @@ -406,3 +421,119 @@ func FirstN(input string, n int) string {
}
return input[:n]
}

// quadrantMap maps a 4-bit quadrant mask to the matching Unicode block
// element. Bit layout of the mask (MSB→LSB): top-left, top-right,
// bottom-left, bottom-right — see the mask construction in makeQuadrantRow.
var quadrantMap = [16]rune{
0b0000: ' ',
0b0001: '▗',
0b0010: '▖',
0b0011: '▄',
0b0100: '▝',
0b0101: '▐',
0b0110: '▞',
0b0111: '▟',
0b1000: '▘',
0b1001: '▚',
0b1010: '▌',
0b1011: '▙',
0b1100: '▀',
0b1101: '▜',
0b1110: '▛',
0b1111: '█',
}

// getBit returns the bit (0 or 1) at position bitIndex in rowData, counting
// bits MSB-first within each byte. Indices outside rowData yield 0, so short
// rows render as unset bits.
func getBit(rowData []byte, bitIndex int) int {
	idx := bitIndex / 8
	if idx < 0 || idx >= len(rowData) {
		return 0
	}
	shift := 7 - (bitIndex % 8)
	return int(rowData[idx]>>shift) & 1
}

// makeQuadrantRow folds two pixel rows (topRowData over bottomRowData) into a
// single row of quadrant glyphs. Each output cell covers a 2x2 bit block:
// two horizontally adjacent bits from the top row and the two bits directly
// below them. Produces 4*bytesPerRow cells (8 bits per byte / 2 bits per cell).
func makeQuadrantRow(topRowData, bottomRowData []byte, bytesPerRow int) []rune {
	quads := make([]rune, 4*bytesPerRow)
	for i := range quads {
		left := 2 * i
		right := left + 1
		// Mask bit order matches quadrantMap: TL, TR, BL, BR (MSB→LSB).
		mask := getBit(topRowData, left)<<3 |
			getBit(topRowData, right)<<2 |
			getBit(bottomRowData, left)<<1 |
			getBit(bottomRowData, right)
		quads[i] = quadrantMap[mask]
	}
	return quads
}

// drawLine builds one horizontal border line for the quadrant rendering:
// leftCorner, then columns/4 groups of four '═' characters separated by
// midJunction, closed by rightCorner.
func drawLine(leftCorner, midJunction, rightCorner rune, columns int) string {
	var b strings.Builder
	b.WriteRune(leftCorner)
	groups := columns / 4
	for i := 0; i < groups; i++ {
		if i > 0 {
			b.WriteRune(midJunction)
		}
		b.WriteString("════")
	}
	b.WriteRune(rightCorner)
	return b.String()
}

// RenderByteArrayAsQuadrants renders data as a box-drawn bitmap in which each
// character cell encodes a 2x2 block of bits via Unicode quadrant glyphs.
// bytesPerRow fixes the bitmap width (8*bytesPerRow bits per pixel row); two
// consecutive pixel rows are folded into one text row. When reverseBytes is
// set, the bit order within every byte is reversed before rendering
// (LSB-first display). Returns "" for empty data or non-positive bytesPerRow.
func RenderByteArrayAsQuadrants(data []byte, bytesPerRow int, reverseBytes bool) string {
	// Guard degenerate inputs: the previous implementation emitted a dangling
	// top border for empty data and could divide by zero-sized rows.
	if len(data) == 0 || bytesPerRow <= 0 {
		return ""
	}
	if reverseBytes {
		// Reverse into a copy — the previous implementation flipped the bits
		// in place, mutating the caller's slice as a side effect.
		reversed := make([]byte, len(data))
		for i, d := range data {
			reversed[i] = bits.Reverse8(d)
		}
		data = reversed
	}

	var sb strings.Builder

	columns := 4 * bytesPerRow // quadrant cells per text row
	step := 2 * bytesPerRow    // bytes consumed per text row (two pixel rows)
	nLines := (len(data) + step - 1) / step

	sb.WriteString(drawLine('╔', '╦', '╗', columns) + "\n")

	offset := 0
	for row := 0; row < nLines; row++ {
		// Zero-padded copies so a short final row renders as unset bits.
		topRow := make([]byte, bytesPerRow)
		for j := 0; j < bytesPerRow && offset+j < len(data); j++ {
			topRow[j] = data[offset+j]
		}
		botRow := make([]byte, bytesPerRow)
		for j := 0; j < bytesPerRow && offset+bytesPerRow+j < len(data); j++ {
			botRow[j] = data[offset+bytesPerRow+j]
		}
		offset += step

		quads := makeQuadrantRow(topRow, botRow, bytesPerRow)

		sb.WriteRune('║')
		for c := 0; c < len(quads); c++ {
			sb.WriteRune(quads[c])
			// Inner separator after every 4 cells, i.e. at each byte boundary
			// (one byte = 8 bits = 4 quadrant columns).
			if (c+1)%4 == 0 && c < len(quads)-1 {
				sb.WriteRune('║')
			}
		}
		sb.WriteRune('║')
		sb.WriteRune('\n')

		if row < nLines-1 {
			sb.WriteString(drawLine('╠', '╬', '╣', columns) + "\n")
		} else {
			sb.WriteString(drawLine('╚', '╩', '╝', columns) + "\n")
		}
	}

	return sb.String()
}
16 changes: 16 additions & 0 deletions backend/pkg/consapi/types/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package types

import (
"github.com/ethereum/go-ethereum/common/hexutil"
)

// ElectraDeposit describes a single deposit entry with its validator pubkey
// and amount. JSON decoding expects amount as a decimal string (`,string`);
// the db tags map the fields to the `pubkey` and `amount` columns.
type ElectraDeposit struct {
Pubkey hexutil.Bytes `json:"pubkey" db:"pubkey"`
Amount uint64 `json:"amount,string" db:"amount"`
}

// ElectraConsolidation describes a consolidation from a source validator
// index to a target validator index. Source/target indices decode from
// decimal strings in JSON; Amount has no JSON tag, so it is only populated
// from the database (db:"amount") or set in code — TODO confirm intended use.
type ElectraConsolidation struct {
SourceValidatorIndex uint64 `json:"source_index,string" db:"source_index"`
TargetValidatorIndex uint64 `json:"target_index,string" db:"target_index"`
Amount uint64 `db:"amount"`
}
11 changes: 7 additions & 4 deletions backend/pkg/consapi/types/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import "github.com/ethereum/go-ethereum/common/hexutil"

type StandardBeaconHeaderResponse struct {
Data struct {
Root hexutil.Bytes `json:"root"`
Header struct {
Root hexutil.Bytes `json:"root"`
Canonical bool `json:"canonical"`
Header struct {
Message struct {
Slot uint64 `json:"slot,string"`
ProposerIndex uint64 `json:"proposer_index,string"`
Expand All @@ -21,8 +22,10 @@ type StandardBeaconHeaderResponse struct {

type StandardBeaconHeadersResponse struct {
Data []struct {
Root hexutil.Bytes `json:"root"`
Header struct {
// TODO: really don't have to duplicate this header struct — extract a shared type
Root hexutil.Bytes `json:"root"`
Canonical bool `json:"canonical"`
Header struct {
Message struct {
Slot uint64 `json:"slot,string"`
ProposerIndex uint64 `json:"proposer_index,string"`
Expand Down
Loading