Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beds 536/port v1 changes #1292

Open
wants to merge 1 commit into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/cmd/eth1indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func Run() {
return
}

transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
transforms := make([]db.TransformFunc, 0)
transforms = append(transforms,
bt.TransformBlock,
bt.TransformTx,
Expand Down
21 changes: 10 additions & 11 deletions backend/cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package misc
import (
"bytes"
"context"
"os"

"database/sql"
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"math"
"math/big"
"net/http"
"os"
"strconv"
"strings"
"sync"
Expand All @@ -23,6 +23,13 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/common"
"github.com/go-redis/redis/v8"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
go_ens "github.com/wealdtech/go-ens/v3"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"

"github.com/gobitfly/beaconchain/cmd/misc/commands"
"github.com/gobitfly/beaconchain/cmd/misc/misctypes"
"github.com/gobitfly/beaconchain/pkg/commons/cache"
Expand All @@ -37,14 +44,6 @@ import (
"github.com/gobitfly/beaconchain/pkg/exporter/modules"
"github.com/gobitfly/beaconchain/pkg/exporter/services"
"github.com/gobitfly/beaconchain/pkg/notification"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
go_ens "github.com/wealdtech/go-ens/v3"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"

"flag"

"github.com/Gurpartap/storekit-go"
)
Expand Down Expand Up @@ -1607,7 +1606,7 @@ func indexOldEth1Blocks(startBlock uint64, endBlock uint64, batchSize uint64, co
return
}

transforms := make([]func(blk *types.Eth1Block, cache *freecache.Cache) (*types.BulkMutations, *types.BulkMutations, error), 0)
transforms := make([]db.TransformFunc, 0)

log.Infof("transformerFlag: %v", transformerFlag)
transformerList := strings.Split(transformerFlag, ",")
Expand Down
40 changes: 25 additions & 15 deletions backend/pkg/commons/db/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (

gcp_bigtable "cloud.google.com/go/bigtable"
"github.com/go-redis/redis/v8"
itypes "github.com/gobitfly/eth-rewards/types"

"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
itypes "github.com/gobitfly/eth-rewards/types"

"golang.org/x/sync/errgroup"
"google.golang.org/api/option"
Expand Down Expand Up @@ -66,7 +67,7 @@ type Bigtable struct {

tableMachineMetrics *gcp_bigtable.Table

redisCache *redis.Client
redisCache RedisClient

LastAttestationCache map[uint64]uint64
LastAttestationCacheMux *sync.Mutex
Expand All @@ -78,7 +79,29 @@ type Bigtable struct {
machineMetricsQueuedWritesChan chan (types.BulkMutation)
}

type RedisClient interface {
SCard(ctx context.Context, key string) *redis.IntCmd
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
Pipeline() redis.Pipeliner
Get(ctx context.Context, key string) *redis.StringCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
}

func InitBigtable(project, instance, chainId, redisAddress string) (*Bigtable, error) {
rdc := redis.NewClient(&redis.Options{
Addr: redisAddress,
ReadTimeout: time.Second * 20,
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
if err := rdc.Ping(ctx).Err(); err != nil {
return nil, err
}

return InitBigtableWithCache(ctx, project, instance, chainId, rdc)
}

func InitBigtableWithCache(ctx context.Context, project, instance, chainId string, rdc RedisClient) (*Bigtable, error) {
if utils.Config.Bigtable.Emulator {
if utils.Config.Bigtable.EmulatorHost == "" {
utils.Config.Bigtable.EmulatorHost = "127.0.0.1"
Expand All @@ -90,26 +113,13 @@ func InitBigtable(project, instance, chainId, redisAddress string) (*Bigtable, e
log.Fatal(err, "unable to set bigtable emulator environment variable", 0)
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

poolSize := 50
btClient, err := gcp_bigtable.NewClient(ctx, project, instance, option.WithGRPCConnectionPool(poolSize))
// btClient, err := gcp_bigtable.NewClient(context.Background(), project, instance)

if err != nil {
return nil, err
}

rdc := redis.NewClient(&redis.Options{
Addr: redisAddress,
ReadTimeout: time.Second * 20,
})

if err := rdc.Ping(ctx).Err(); err != nil {
return nil, err
}

bt := &Bigtable{
client: btClient,
tableData: btClient.Open("data"),
Expand Down
48 changes: 41 additions & 7 deletions backend/pkg/commons/db/bigtable_eth1.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"math/big"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -23,8 +24,6 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"

"strconv"

gcp_bigtable "cloud.google.com/go/bigtable"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -711,7 +710,9 @@ func TimestampToBigtableTimeDesc(ts time.Time) string {
return fmt.Sprintf("%04d%02d%02d%02d%02d%02d", 9999-ts.Year(), 12-ts.Month(), 31-ts.Day(), 23-ts.Hour(), 59-ts.Minute(), 59-ts.Second())
}

func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transforms []func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error), concurrency int64, cache *freecache.Cache) error {
type TransformFunc func(blk *types.Eth1Block, cache *freecache.Cache) (bulkData *types.BulkMutations, bulkMetadataUpdates *types.BulkMutations, err error)

func (bigtable *Bigtable) IndexEventsWithTransformers(start, end int64, transforms []TransformFunc, concurrency int64, cache *freecache.Cache) error {
g := new(errgroup.Group)
g.SetLimit(int(concurrency))

Expand Down Expand Up @@ -1048,8 +1049,19 @@ func (bigtable *Bigtable) TransformTx(blk *types.Eth1Block, cache *freecache.Cac
BlobTxFee: blobFee,
BlobGasPrice: tx.GetBlobGasPrice(),
IsContractCreation: isContract,
ErrorMsg: tx.GetErrorMsg(),
ErrorMsg: "",
Status: types.StatusType(tx.Status),
}
for _, itx := range tx.Itx {
if itx.ErrorMsg != "" {
indexedTx.ErrorMsg = itx.ErrorMsg
if indexedTx.Status == types.StatusType_SUCCESS {
indexedTx.Status = types.StatusType_PARTIAL
}
break
}
}

// Mark Sender and Recipient for balance update
bigtable.markBalanceUpdate(indexedTx.From, []byte{0x0}, bulkMetadataUpdates, cache)
bigtable.markBalanceUpdate(indexedTx.To, []byte{0x0}, bulkMetadataUpdates, cache)
Expand Down Expand Up @@ -1135,9 +1147,16 @@ func (bigtable *Bigtable) TransformBlobTx(blk *types.Eth1Block, cache *freecache
GasPrice: tx.GetGasPrice(),
BlobTxFee: blobFee,
BlobGasPrice: tx.GetBlobGasPrice(),
ErrorMsg: tx.GetErrorMsg(),
ErrorMsg: "",
BlobVersionedHashes: tx.GetBlobVersionedHashes(),
}
for _, itx := range tx.Itx {
if itx.ErrorMsg != "" {
indexedTx.ErrorMsg = itx.ErrorMsg
break
}
}

// Mark Sender and Recipient for balance update
bigtable.markBalanceUpdate(indexedTx.From, []byte{0x0}, bulkMetadataUpdates, cache)
bigtable.markBalanceUpdate(indexedTx.To, []byte{0x0}, bulkMetadataUpdates, cache)
Expand Down Expand Up @@ -1240,7 +1259,7 @@ func (bigtable *Bigtable) TransformContract(blk *types.Eth1Block, cache *freecac
contractUpdate := &types.IsContractUpdate{
IsContract: itx.GetType() == "create",
// also use success status of enclosing transaction, as even successful sub-calls can still be reverted later in the tx
Success: itx.GetErrorMsg() == "" && tx.GetErrorMsg() == "",
Success: itx.GetErrorMsg() == "" && tx.GetStatus() == 1,
}
b, err := proto.Marshal(contractUpdate)
if err != nil {
Expand Down Expand Up @@ -1303,12 +1322,26 @@ func (bigtable *Bigtable) TransformItx(blk *types.Eth1Block, cache *freecache.Ca
}
iReversed := reversePaddedIndex(i, TX_PER_BLOCK_LIMIT)

var revertSource string
for j, itx := range tx.GetItx() {
if j >= ITX_PER_TX_LIMIT {
if j > ITX_PER_TX_LIMIT {
return nil, nil, fmt.Errorf("unexpected number of internal transactions in block expected at most %d but got: %v, tx: %x", ITX_PER_TX_LIMIT, j, tx.GetHash())
}
jReversed := reversePaddedIndex(j, ITX_PER_TX_LIMIT)

// check for error before skipping, otherwise we loose track of cascading reverts
var reverted bool
if itx.ErrorMsg != "" {
reverted = true
// only save the highest root revert
if revertSource == "" || !strings.HasPrefix(itx.Path, revertSource) {
revertSource = strings.TrimSuffix(itx.Path, "]")
}
}
if revertSource != "" && strings.HasPrefix(itx.Path, revertSource) {
reverted = true
}

if itx.Path == "[]" || bytes.Equal(itx.Value, []byte{0x0}) { // skip top level and empty calls
continue
}
Expand All @@ -1322,6 +1355,7 @@ func (bigtable *Bigtable) TransformItx(blk *types.Eth1Block, cache *freecache.Ca
From: itx.GetFrom(),
To: itx.GetTo(),
Value: itx.GetValue(),
Reverted: reverted,
}

bigtable.markBalanceUpdate(indexedItx.To, []byte{0x0}, bulkMetadataUpdates, cache)
Expand Down
4 changes: 3 additions & 1 deletion backend/pkg/commons/db/bigtable_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
gcp_bigtable "cloud.google.com/go/bigtable"
)

var ErrTableAlreadyExist = fmt.Errorf("aborting bigtable schema init as tables are already present")

func InitBigtableSchema() error {
tables := make(map[string]map[string]gcp_bigtable.GCPolicy)

Expand Down Expand Up @@ -74,7 +76,7 @@ func InitBigtableSchema() error {
}

if len(existingTables) > 0 {
return fmt.Errorf("aborting bigtable schema init as tables are already present")
return ErrTableAlreadyExist
}

for name, definition := range tables {
Expand Down
14 changes: 13 additions & 1 deletion backend/pkg/commons/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"

"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/metrics"

Expand All @@ -32,14 +33,25 @@ import (
"github.com/jackc/pgx/v5/stdlib"
)

type SQLReaderDb interface {
Close() error
Get(dest interface{}, query string, args ...interface{}) error
GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Select(dest interface{}, query string, args ...interface{}) error
SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Query(query string, args ...any) (*sql.Rows, error)
Preparex(query string) (*sqlx.Stmt, error)
Rebind(query string) string
}

//go:embed migrations/*/*.sql
var EmbedMigrations embed.FS

var DBPGX *pgxpool.Conn

// DB is a pointer to the explorer-database
var WriterDb *sqlx.DB
var ReaderDb *sqlx.DB
var ReaderDb SQLReaderDb

var UserReader *sqlx.DB
var UserWriter *sqlx.DB
Expand Down
6 changes: 3 additions & 3 deletions backend/pkg/commons/db/ens.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (bigtable *Bigtable) TransformEnsNameRegistered(blk *types.Eth1Block, cache
metrics.TaskDuration.WithLabelValues("bt_transform_ens").Observe(time.Since(startTime).Seconds())
}()

bulkData = &types.BulkMutations{}
bulkMetadataUpdates = &types.BulkMutations{}
var ensCrontractAddresses map[string]string
switch bigtable.chainId {
case "1":
Expand All @@ -90,11 +92,9 @@ func (bigtable *Bigtable) TransformEnsNameRegistered(blk *types.Eth1Block, cache
case "11155111":
ensCrontractAddresses = ensContracts.ENSCrontractAddressesSepolia
default:
return nil, nil, nil
return bulkData, bulkMetadataUpdates, nil
}

bulkData = &types.BulkMutations{}
bulkMetadataUpdates = &types.BulkMutations{}
keys := make(map[string]bool)
ethLog := gethtypes.Log{}

Expand Down
4 changes: 2 additions & 2 deletions backend/pkg/commons/db/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,7 +1602,7 @@ func WriteExecutionChartSeriesForDay(day int64) error {
totalBlobCount = totalBlobCount.Add(decimal.NewFromInt(int64(len(tx.BlobVersionedHashes))))

default:
log.Fatal(fmt.Errorf("error unknown tx type %v hash: %x", tx.Status, tx.Hash), "", 0)
log.Fatal(fmt.Errorf("error unknown tx type %v hash: %x", tx.Type, tx.Hash), "", 0)
}
totalTxFees = totalTxFees.Add(txFees)

Expand All @@ -1611,7 +1611,7 @@ func WriteExecutionChartSeriesForDay(day int64) error {
failedTxCount += 1
totalFailedGasUsed = totalFailedGasUsed.Add(gasUsed)
totalFailedTxFee = totalFailedTxFee.Add(txFees)
case 1:
case 1, 2:
successTxCount += 1
default:
log.Fatal(fmt.Errorf("error unknown status code %v hash: %x", tx.Status, tx.Hash), "", 0)
Expand Down
Loading
Loading