Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(token price): refactor token price #1319

Draft
wants to merge 1 commit into
base: NOBIDS/refactor-balance
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 29 additions & 104 deletions backend/cmd/eth1indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/executionlayer/evm"

"github.com/coocood/freecache"
"github.com/ethereum/go-ethereum/common"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/shopspring/decimal"
"golang.org/x/sync/errgroup"

//nolint:gosec
Expand Down Expand Up @@ -162,13 +160,32 @@ func Run() {
}
defer bt.Close()

bigtable, err := database.NewBigTable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, nil)
if err != nil {
log.Fatal(err, "error connecting to bigtable", 0)
}

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
metadataUpdatesStore := metadataupdates.NewStore(database.Wrap(bigtable, metadataupdates.Table), cache)
dataStore := data.NewStore(database.Wrap(bigtable, data.Table))
metadataStore := metadata.NewStore(database.Wrap(bigtable, metadata.Table))

if *tokenPriceExport {
go func() {
for {
err = UpdateTokenPrices(bt, client, *tokenPriceExportList)
tokenList, err := readTokenListFile(*tokenPriceExportList)
if err != nil {
log.Error(err, "error while updating token prices", 0)
time.Sleep(*tokenPriceExportFrequency)
log.Error(err, "error reading token list file", 0)
}
pricer := executionlayer.NewTokenPricer(
metadataStore,
chainId,
executionlayer.NewLlamaClient(),
tokenList,
evm.NewBatcher(chainId, client.GetNativeClient()),
)
if err := pricer.UpdateTokens(); err != nil {
log.Error(err, "error updating tokens", 0)
}
time.Sleep(*tokenPriceExportFrequency)
}
Expand All @@ -179,16 +196,6 @@ func Run() {
go ImportEnsUpdatesLoop(bt, client, *ensBatchSize)
}

bigtable, err := database.NewBigTable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, nil)
if err != nil {
log.Fatal(err, "error connecting to bigtable", 0)
}

cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
metadataUpdatesStore := metadataupdates.NewStore(database.Wrap(bigtable, metadataupdates.Table), cache)
dataStore := data.NewStore(database.Wrap(bigtable, data.Table))
metadataStore := metadata.NewStore(database.Wrap(bigtable, metadata.Table))

indexer := executionlayer.NewIndexer(
executionlayer.NewAdaptorV1(
dataStore,
Expand Down Expand Up @@ -394,98 +401,16 @@ func ImportEnsUpdatesLoop(bt *db.Bigtable, client *rpc.ErigonClient, batchSize i
}
}

func UpdateTokenPrices(bt *db.Bigtable, client *rpc.ErigonClient, tokenListPath string) error {
tokenListContent, err := os.ReadFile(tokenListPath)
func readTokenListFile(path string) (erc20.ERC20TokenList, error) {
tokenListContent, err := os.ReadFile(path)
if err != nil {
return err
}

tokenList := &erc20.ERC20TokenList{}

err = json.Unmarshal(tokenListContent, tokenList)
if err != nil {
return err
return erc20.ERC20TokenList{}, err
}

type defillamaPriceRequest struct {
Coins []string `json:"coins"`
}
coinsList := make([]string, 0, len(tokenList.Tokens))
for _, token := range tokenList.Tokens {
coinsList = append(coinsList, "ethereum:"+token.Address)
var tokenList erc20.ERC20TokenList
if err := json.Unmarshal(tokenListContent, &tokenList); err != nil {
return erc20.ERC20TokenList{}, err
}

req := &defillamaPriceRequest{
Coins: coinsList,
}

reqEncoded, err := json.Marshal(req)
if err != nil {
return err
}

httpClient := &http.Client{Timeout: time.Second * 10}

resp, err := httpClient.Post("https://coins.llama.fi/prices", "application/json", bytes.NewReader(reqEncoded))
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("error querying defillama api: %v", resp.Status)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

type defillamaCoin struct {
Decimals int64 `json:"decimals"`
Price *decimal.Decimal `json:"price"`
Symbol string `json:"symbol"`
Timestamp int64 `json:"timestamp"`
}

type defillamaResponse struct {
Coins map[string]defillamaCoin `json:"coins"`
}

respParsed := &defillamaResponse{}
err = json.Unmarshal(body, respParsed)
if err != nil {
return err
}

tokenPrices := make([]*types.ERC20TokenPrice, 0, len(respParsed.Coins))
for address, data := range respParsed.Coins {
tokenPrices = append(tokenPrices, &types.ERC20TokenPrice{
Token: common.FromHex(strings.TrimPrefix(address, "ethereum:0x")),
Price: []byte(data.Price.String()),
})
}

g := new(errgroup.Group)
g.SetLimit(20)
for i := range tokenPrices {
i := i
g.Go(func() error {
metadata, err := client.GetERC20TokenMetadata(tokenPrices[i].Token)
if err != nil {
return err
}
tokenPrices[i].TotalSupply = metadata.TotalSupply
// log.LogInfo("price for token %x is %s @ %v", tokenPrices[i].Token, tokenPrices[i].Price, new(big.Int).SetBytes(tokenPrices[i].TotalSupply))
return nil
})
}
err = g.Wait()
if err != nil {
return err
}

return bt.SaveERC20TokenPrices(tokenPrices)
return tokenList, nil
}

func HandleChainReorgs(bt *db.Bigtable, client *rpc.ErigonClient, depth int) error {
Expand Down
41 changes: 0 additions & 41 deletions backend/pkg/commons/db/bigtable_eth1.go
Original file line number Diff line number Diff line change
Expand Up @@ -2339,47 +2339,6 @@ func (bigtable *Bigtable) SaveContractMetadata(address []byte, metadata *types.C
return bigtable.tableMetadata.Apply(ctx, fmt.Sprintf("%s:%x", bigtable.chainId, address), mut)
}

func (bigtable *Bigtable) SaveERC20TokenPrices(prices []*types.ERC20TokenPrice) error {
if len(prices) == 0 {
return nil
}

mutsWrite := &types.BulkMutations{
Keys: make([]string, 0, len(prices)),
Muts: make([]*gcp_bigtable.Mutation, 0, len(prices)),
}

for _, price := range prices {
rowKey := fmt.Sprintf("%s:%x", bigtable.chainId, price.Token)
mut := gcp_bigtable.NewMutation()
mut.Set(ERC20_METADATA_FAMILY, ERC20_COLUMN_PRICE, gcp_bigtable.Timestamp(0), price.Price)
mut.Set(ERC20_METADATA_FAMILY, ERC20_COLUMN_TOTALSUPPLY, gcp_bigtable.Timestamp(0), price.TotalSupply)
mutsWrite.Keys = append(mutsWrite.Keys, rowKey)
mutsWrite.Muts = append(mutsWrite.Muts, mut)
}

err := bigtable.WriteBulk(mutsWrite, bigtable.tableMetadata, DEFAULT_BATCH_INSERTS)

if err != nil {
return err
}

return nil
}

func (bigtable *Bigtable) SaveBlockKeys(blockNumber uint64, blockHash []byte, keys string) error {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
defer cancel()

mut := gcp_bigtable.NewMutation()
mut.Set(METADATA_UPDATES_FAMILY_BLOCKS, "keys", gcp_bigtable.Now(), []byte(keys))

key := fmt.Sprintf("%s:BLOCK:%s:%x", bigtable.chainId, reversedPaddedBlockNumber(blockNumber), blockHash)
err := bigtable.tableMetadataUpdates.Apply(ctx, key, mut)

return err
}

func (bigtable *Bigtable) GetBlockKeys(blockNumber uint64, blockHash []byte) ([]string, error) {
tmr := time.AfterFunc(REPORT_TIMEOUT, func() {
log.WarnWithFields(log.Fields{
Expand Down
34 changes: 34 additions & 0 deletions backend/pkg/commons/db2/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"

"github.com/gobitfly/beaconchain/pkg/commons/db2/database"
"github.com/gobitfly/beaconchain/pkg/commons/db2/metadataupdates"
"github.com/gobitfly/beaconchain/pkg/commons/types"
)

type Balance struct {
Expand Down Expand Up @@ -34,3 +37,34 @@ func (store Store) UpdateBalance(chainID string, balances []Balance) error {
}
return store.db.BulkAdd(updates)
}

func (store Store) UpdateToken(chainID string, tokens []*types.ERC20TokenPrice) error {
updates := make(map[string][]database.Item)
for _, token := range tokens {
key := fmt.Sprintf("%s:%x", chainID, token.Token)
updates[key] = append(updates[key], database.Item{
Family: erc20MetadataFamily,
Column: erc20ColumnPrice,
Data: token.Price,
})
updates[key] = append(updates[key], database.Item{
Family: erc20MetadataFamily,
Column: erc20ColumnTotalSupply,
Data: token.TotalSupply,
})
}
return store.db.BulkAdd(updates)
}

func (store Store) TokenPrice(chainID string, token common.Address) (*types.ERC20TokenPrice, error) {
key := fmt.Sprintf("%s:%x", chainID, token.Bytes())
row, err := store.db.GetRow(key)
if err != nil {
return nil, err
}
return &types.ERC20TokenPrice{
Token: token.Bytes(),
Price: row.Values[fmt.Sprintf("%s:%s", erc20MetadataFamily, erc20ColumnPrice)],
TotalSupply: row.Values[fmt.Sprintf("%s:%s", erc20MetadataFamily, erc20ColumnTotalSupply)],
}, nil
}
6 changes: 5 additions & 1 deletion backend/pkg/commons/db2/metadata/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ const Table = "metadata"
var Schema = map[string][]string{
Table: {
accountFamily,
erc20MetadataFamily,
},
}

const (
accountFamily = "a"
accountFamily = "a"
erc20MetadataFamily = "erc20"
erc20ColumnPrice = "PRICE"
erc20ColumnTotalSupply = "TOTALSUPPLY"
)
2 changes: 1 addition & 1 deletion backend/pkg/commons/erc20/erc20.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type ERC20TokenList struct {
type ERC20TokenDetail struct {
Address string `json:"address"`
Owner string `json:"-"`
ChainID int64 `json:"chainId"`
ChainID int64 `json:"chainId"` // not used
Decimals int64 `json:"decimals"`
Name string `json:"name"`
Symbol string `json:"symbol"`
Expand Down
21 changes: 21 additions & 0 deletions backend/pkg/executionlayer/evm/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,27 @@ func (m RPCBatch) Batch(elements []BatchElement) ([]BatchResponse, error) {
return res, nil
}

func ERC20Supply(batcher Batcher, addresses []common.Address) ([]*big.Int, error) {
var elements []BatchElement
for _, address := range addresses {
input, _ := erc20.ABI.Pack("totalSupply")
elements = append(elements, BatchElement{
Method: "eth_call",
To: &address,
Data: input,
})
}
results, err := batcher.Batch(elements)
if err != nil {
return nil, err
}
var supplies []*big.Int
for _, result := range results {
supplies = append(supplies, new(big.Int).SetBytes(result.Data))
}
return supplies, nil
}

func Balance(batcher Batcher, pairs []metadataupdates.Pair) ([]*big.Int, error) {
var elements []BatchElement
for _, pair := range pairs {
Expand Down
18 changes: 18 additions & 0 deletions backend/pkg/executionlayer/evm/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ func TestBatcher(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Run("ERC20Supply", func(t *testing.T) {
supply, err := token.TotalSupply(nil)
if err != nil {
t.Fatal(err)
}
if supply.Cmp(big.NewInt(0)) == 0 {
t.Fatal("expected supply cannot be zero")
}

res, err := ERC20Supply(tt.batcher, []common.Address{tokenAddress, {}})
if err != nil {
t.Fatal(err)
}
if got, want := res[0].String(), supply.String(); got != want {
t.Errorf("got %v, want %v", got, want)
}
})

t.Run("Balance", func(t *testing.T) {
t.Run("erc20", func(t *testing.T) {
balance, err := token.BalanceOf(nil, b.BankAccount.From)
Expand Down
Loading