-
-
Notifications
You must be signed in to change notification settings - Fork 100
/
Copy pathethsync.py
157 lines (136 loc) · 6.01 KB
/
ethsync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# Indexer for Ethereum to get transaction list by ETH address
# https://github.com/Adamant-im/ETH-transactions-storage
# Contributors:
# v2.4.1
# 2022-2024 ADAMANT Foundation ([email protected]), @twhitehead00, Tyvan Cheng ([email protected])
# 2021-2022 ADAMANT Foundation ([email protected]), Francesco Bonanno ([email protected]),
# Guénolé de Cadoudal ([email protected]), Drew Wells ([email protected])
# 2020-2021 ADAMANT Foundation ([email protected]): Aleksei Lebedev
# 2017-2020 ADAMANT TECH LABS LP ([email protected]): Artem Brunov, Aleksei Lebedev
from os import environ
from web3 import Web3
from web3.middleware import geth_poa_middleware
import psycopg2
import time
import sys
import logging
#from systemd.journal import JournalHandler
# Runtime configuration, read from environment variables.
#   DB_NAME             (required) database name handed to psycopg2.connect
#   ETH_URL             (required) node endpoint: an http*/ws* URL, anything
#                       else is treated as an IPC socket path
#   START_BLOCK         (optional, default "1") starting point when the
#                       index table is still empty
#   CONFIRMATIONS_BLOCK (optional, default "0") number of blocks subtracted
#                       from the chain head before indexing
#   PERIOD              (optional, default "20") seconds slept between two
#                       polling passes
#   LOG_FILE            (optional) log file path; a stream handler is used
#                       when it is not set
# Numeric settings are kept as strings here; they are converted with int()
# at the point of use.
dbname = environ.get("DB_NAME")
startBlock = environ.get("START_BLOCK") or "1"
confirmationBlocks = environ.get("CONFIRMATIONS_BLOCK") or "0"
nodeUrl = environ.get("ETH_URL")
pollingPeriod = environ.get("PERIOD") or "20"
logFile = environ.get("LOG_FILE")

# Both mandatory settings are validated up front. sys.exit() is used instead
# of the bare exit() helper, which is installed by the site module for
# interactive sessions and is not guaranteed to exist in every runtime.
if dbname is None:
    print('Add postgre database in env var DB_NAME')
    sys.exit(2)

if nodeUrl is None:
    print('Add eth url in env var ETH_URL')
    sys.exit(2)
# Build the web3 client for the configured node.
# The provider type is chosen from the prefix of ETH_URL:
#   http...  -> HTTP provider        e.g. "http://publicnode:8545"
#   ws...    -> WebSocket provider   e.g. "ws://publicnode:8546"
#   other    -> IPC provider         e.g. "/home/geth/.ethereum/geth.ipc"
if nodeUrl.startswith("http"):
    providerFactory = Web3.HTTPProvider
elif nodeUrl.startswith("ws"):
    providerFactory = Web3.WebsocketProvider
else:
    providerFactory = Web3.IPCProvider

web3 = Web3(providerFactory(nodeUrl))

# Register the geth PoA middleware at layer 0 of the middleware onion.
# NOTE(review): presumably needed to read blocks from PoA-style chains - confirm.
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
# Logging setup: one "eth-sync" logger emitting INFO and above.
logger = logging.getLogger("eth-sync")
logger.setLevel(logging.INFO)

# Write to LOG_FILE when it is configured, otherwise to a plain stream handler.
lfh = logging.FileHandler(logFile) if logFile is not None else logging.StreamHandler()

formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
lfh.setFormatter(formatter)
logger.addHandler(lfh)

# Optional systemd journal logging (to read the logs with journalctl):
# install systemd-python, uncomment the
# "#from systemd.journal import JournalHandler" import at the top of the file,
# then uncomment the lines below.
#ljc = JournalHandler()
#formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
#ljc.setFormatter(formatter)
#logger.addHandler(ljc)
# Startup clean-up on a short-lived connection.
# The process exits with status 1 when the database cannot be reached.
try:
    logger.info("Trying to connect to " + dbname + " database…")
    conn = psycopg2.connect(database=dbname)
except psycopg2.Error:
    # logger.exception keeps the original message and appends the driver's
    # error, so the reason for the failure is not lost.
    logger.exception("Unable to connect to database")
    sys.exit(1)

conn.autocommit = True
logger.info("Connected to the database")

# Delete last block as it may be not imported in full.
# The cursor and the connection are released even if the DELETE fails.
try:
    with conn.cursor() as cur:
        cur.execute('DELETE FROM public.ethtxs WHERE block = (SELECT Max(block) from public.ethtxs)')
finally:
    conn.close()
# Hold off indexing until the node reports that it is no longer syncing.
# Seconds to wait between two checks of the node's sync state;
# adjust to taste, the default is 5 minutes.
syncRecheckSeconds = 300

# The loop keeps going for as long as web3.eth.syncing compares unequal to
# False, i.e. any other value is treated as "still syncing".
while web3.eth.syncing != False:
    logger.info("Waiting Ethereum node to be in sync…")
    time.sleep(syncRecheckSeconds)

logger.info("Ethereum node is synced.")
# Adds all relevant transactions from an Ethereum block to the index
def insertTxsFromBlock(block):
    """Insert the relevant transactions of one block into public.ethtxs.

    A transaction is stored when it carries a non-zero ETH value, or when its
    input data starts with the selector 0xa9059cbb (the ERC-20
    ``transfer(address,uint256)`` call). Every other transaction is ignored.

    The function relies on module-level state: ``web3`` (to fetch receipts),
    ``cur`` (an open database cursor) and ``logger``.

    Args:
        block: block data as returned by ``web3.eth.get_block(height, True)``;
            it must provide ``'number'``, ``'timestamp'`` and a
            ``transactions`` sequence of full transaction objects.

    Returns:
        None. Rows are written through the module-level cursor.
    """
    transferSelector = '0xa9059cbb'

    blockid = block['number']
    # Named "timestamp" so that the imported time module is not shadowed.
    timestamp = block['timestamp']

    for trans in block.transactions:
        value = trans['value']
        inputHex = trans['input'].hex()
        isTransfer = inputHex.startswith(transferSelector)

        # Neither an ETH transfer nor a token transfer call: nothing to index.
        if value == 0 and not isTransfer:
            continue

        # The receipt costs one node round-trip, so it is requested only for
        # transactions that are actually stored.
        transReceipt = web3.eth.get_transaction_receipt(trans['hash'])
        # Transaction status could be saved too; it should be null for
        # pre-Byzantium blocks.
        # status = bool(transReceipt['status'])

        txhash = trans['hash'].hex()
        fr = trans['from']
        to = trans['to']
        gasprice = trans['gasPrice']
        gas = transReceipt['gasUsed']

        contract_to = ''
        contract_value = ''
        if isTransfer:
            # Well-formed call data is '0x' + 8 hex chars of selector
            # + 64 hex chars (recipient) + 64 hex chars (amount).
            # contract_to is everything between the selector and the last
            # 64 chars; contract_value is everything after the first argument.
            contract_to = inputHex[10:-64]
            contract_value = inputHex[74:]
            # Some buggy txs can break up the indexer, so oversized call data
            # is filtered. Despite the wording of the log message, the row is
            # still inserted, only with empty contract fields.
            if len(contract_to) > 128:
                logger.info('Skipping %s tx. Incorrect contract_to length: %s', txhash, len(contract_to))
                contract_to = ''
                contract_value = ''

        cur.execute(
            'INSERT INTO public.ethtxs(time, txfrom, txto, value, gas, gasprice, block, txhash, contract_to, contract_value) '
            'VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)',
            (timestamp, fr, to, value, gas, gasprice, blockid, txhash, contract_to, contract_value))
# Main polling loop: fetch all new (not yet indexed) Ethereum blocks and add
# their transactions to the index, then sleep and start over.
# The numeric settings are converted once instead of on every pass.
confirmationCount = int(confirmationBlocks)
pollSeconds = int(pollingPeriod)

while True:
    try:
        conn = psycopg2.connect(database=dbname)
    except psycopg2.Error:
        # Without a fresh connection there is nothing useful to do in this
        # pass (the previous connection has already been closed), so wait
        # one polling period and retry.
        logger.exception("Unable to connect to database")
        time.sleep(pollSeconds)
        continue

    conn.autocommit = True
    try:
        # "cur" has to stay a module-level name: insertTxsFromBlock writes
        # through it.
        cur = conn.cursor()
        try:
            cur.execute('SELECT Max(block) from public.ethtxs')
            maxblockindb = cur.fetchone()[0]
            # On first start, we index transactions from a block number you indicate
            # NOTE(review): the range below starts at maxblockindb + 1, so with an
            # empty index the first block processed is START_BLOCK + 1 - confirm
            # that this is intended.
            if maxblockindb is None:
                maxblockindb = int(startBlock)

            endblock = int(web3.eth.block_number) - confirmationCount

            logger.info('Current best block in index: %s; in Ethereum chain: %s', maxblockindb, endblock)

            # NOTE(review): range() excludes endblock itself, so that block is only
            # processed on a later pass, once the chain head has advanced - confirm
            # that this is intended.
            for blockHeight in range(maxblockindb + 1, endblock):
                block = web3.eth.get_block(blockHeight, True)
                txCount = len(block.transactions)
                if txCount > 0:
                    insertTxsFromBlock(block)
                    logger.info('Block %s with %s transactions is processed', blockHeight, txCount)
                else:
                    logger.info('Block %s does not contain transactions', blockHeight)
        finally:
            cur.close()
    finally:
        # Release the connection even when a node or database call raised;
        # the exception itself still propagates.
        conn.close()

    time.sleep(pollSeconds)