Operate get blocks (#20)
* operate get blocks

* fix style error

* only get virtual ops
ety001 authored Oct 12, 2024
1 parent bcf220d commit 9ef4ef2
Showing 1 changed file with 36 additions and 134 deletions.
170 changes: 36 additions & 134 deletions docker/sync/sync.py
@@ -5,7 +5,6 @@
from datetime import datetime, timedelta
from steem import Steem
from pymongo import MongoClient
-from pprint import pprint
import requests
import logging

@@ -25,10 +24,10 @@
batch_size = config.get('batch_size', 50)

if not mongodb_url:
-print(log_tag + 'NEED MONGODB')
+print(f"{log_tag}NEED MONGODB")
exit()

-print(log_tag + 'mongo url: %s' % mongodb_url)
+print(f"{log_tag}mongo url: {mongodb_url}")

# Initialize Steem and MongoDB connections
fullnodes = [steemd_url]
@@ -40,15 +39,7 @@
init = db.status.find_one({'_id': 'height'})
last_block = init['value'] if init else (last_block_env or 1)

-process_block_time = 0
-get_ops_in_block_time = 0
-save_block_time = 0
-process_op_time = []
-get_block_time = 0

def process_op(op_obj, block, blockid):
-global process_op_time
-process_op_start_time = time.perf_counter()
op_type = op_obj[0]
op = op_obj[1]
try:
@@ -85,22 +76,19 @@ def process_op(op_obj, block, blockid):
print(error_message)
logging.error(error_message)
sys.exit(1) # Stop the script
-finally:
-process_op_time.append(time.perf_counter() - process_op_start_time)

def process_block(block, blockid):
-global get_ops_in_block_time, save_block_time, process_op_time

try:
-save_block_start_time = time.perf_counter()
save_block(block, blockid)
-save_block_time = time.perf_counter() - save_block_start_time
-print(f"{log_tag}[TEST Time] Save Block Time [{save_block_time}]")

-get_ops_in_block_start_time = time.perf_counter()
-ops = rpc.get_ops_in_block(blockid, False)
+ops = rpc.get_ops_in_block(blockid, True)
-get_ops_in_block_time = time.perf_counter() - get_ops_in_block_start_time
-print(f"{log_tag}[TEST Time] Get Ops in Block Time [{get_ops_in_block_time}]")

-process_op_time = []
for tx in block['transactions']:
for op_obj in tx['operations']:
process_op(op_obj, block, blockid)
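
A note on the flag flipped above: in the condenser API, the second argument to get_ops_in_block selects virtual-only mode. Since the loop above already walks block['transactions'] for regular operations, requesting only virtual ops here avoids handling every operation twice. A minimal sketch, assuming the same steem-python client the script constructs (the node URL and block height are illustrative, not from the diff):

from steem import Steem

rpc = Steem(nodes=['https://api.steemit.com'])  # illustrative node
block_num = 50000000                            # hypothetical block height

# False: every operation in the block, including those inside transactions.
all_ops = rpc.get_ops_in_block(block_num, False)
# True: only virtual operations (producer_reward, curation_reward, ...).
virtual_ops = rpc.get_ops_in_block(block_num, True)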
@@ -380,7 +368,7 @@ def update_comment_options(op, block, blockid):
mvest_per_account = {}

def load_accounts():
pprint(log_tag + "[STEEM] - Loading all accounts")
print(f"{log_tag}[STEEM] - Loading all accounts")
for account in db.account.find():
if 'vesting_shares' in account:
mvest_per_account.update({account['name']: account['vesting_shares']})
@@ -420,10 +408,12 @@ def update_queue():
'created': {'$gt': max_date},
'scanned': {'$lt': scan_ignore},
}).sort('scanned', 1).limit(queue_length)
pprint(log_tag + "[Queue] Comments - " + str(queue_length) + " of " + str(db.comment.count_documents({
total_comments = db.comment.count_documents({
'created': {'$gt': max_date},
'scanned': {'$lt': scan_ignore}
-})))
+})
+print(f"{log_tag}[Queue] Comments - {queue_length} of {total_comments}")

for item in queue:
update_comment(item['author'], item['permlink'])

@@ -433,21 +423,25 @@ def update_queue():
'depth': 0,
'pending_payout_value': {'$gt': 0}
}).limit(queue_length)
pprint(log_tag + "[Queue] Past Payouts - " + str(queue_length) + " of " + str(db.comment.count_documents({

total_past_payouts = db.comment.count_documents({
'cashout_time': {'$lt': datetime.now()},
'mode': {'$in': ['first_payout', 'second_payout']},
'depth': 0,
'pending_payout_value': {'$gt': 0}
-})))
+})
+print(f"{log_tag}[Queue] Past Payouts - {queue_length} of {total_past_payouts}")

for item in queue:
update_comment(item['author'], item['permlink'])

queue_length = 20
queue = db.account.find({'_dirty': True}).limit(queue_length)
pprint(log_tag + "[Queue] Updating Accounts - " + str(queue_length) + " of " + str(db.account.count_documents({'_dirty': True})))
total_accounts = db.account.count_documents({'_dirty': True})
print(f"{log_tag}[Queue] Updating Accounts - {queue_length} of {total_accounts}")
for item in queue:
update_account(item['_id'])
pprint(log_tag + "[Queue] Done")
print(f"{log_tag}[Queue] Done")

def fetch_blocks_in_batch(start_block, end_block):
requests_data = [
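
The diff truncates the body of fetch_blocks_in_batch here. Judging from the requests import and the requests_data list it opens with, it presumably posts a JSON-RPC batch request with one get_block call per height. A hedged sketch of that shape (the endpoint, method name, and error handling are assumptions, not taken from the diff):

import requests

def fetch_blocks_in_batch_sketch(start_block, end_block, url='https://api.steemit.com'):
    # One JSON-RPC request object per block, sent as a single batched POST.
    requests_data = [
        {'jsonrpc': '2.0', 'method': 'condenser_api.get_block',
         'params': [block_num], 'id': block_num}
        for block_num in range(start_block, end_block + 1)
    ]
    response = requests.post(url, json=requests_data)
    response.raise_for_status()
    # The node replies with a list of {'id': ..., 'result': ...} objects,
    # which matches how the old main loop consumed block_response['result'].
    return response.json()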
@@ -483,7 +477,7 @@ def fetch_block(block_num):
return None

if __name__ == '__main__':
pprint(log_tag + "[STEEM] - Starting SteemDB Sync Service")
print(f"{log_tag}[STEEM] - Starting SteemDB Sync Service")
sys.stdout.flush()
config = rpc.get_config()
block_interval = config["STEEM_BLOCK_INTERVAL"]
@@ -496,118 +490,26 @@ def fetch_block(block_num):
block_number = props['last_irreversible_block_num']

while (block_number - last_block) > 0:
+total_start_time = time.perf_counter()
end_block = min(last_block + batch_size, block_number)
-blocks = fetch_blocks_in_batch(last_block + 1, end_block)
+if end_block <= last_block:
+break
+blocks = rpc.get_blocks_range(last_block + 1, end_block)

-for block_response in blocks:
-if 'result' in block_response:
-block = block_response['result']
-last_block += 1
-total_start_time = time.perf_counter()
-pprint(log_tag + "[STEEM] - Starting Block #" + str(last_block))
-flush_start_time1 = time.perf_counter()
-sys.stdout.flush()
-flush_time1 = time.perf_counter() - flush_start_time1

-get_block_start_time = time.perf_counter()
-get_block_time = time.perf_counter() - get_block_start_time

-process_block_start_time = time.perf_counter()
-process_block(block, last_block)
-process_block_time = time.perf_counter() - process_block_start_time

-db.status.update_one({'_id': 'height'}, {"$set": {'value': last_block}}, upsert=True)
-pprint(log_tag + "[STEEM] - Processed up to Block #" + str(last_block))

-flush_start_time2 = time.perf_counter()
-sys.stdout.flush()
-flush_time2 = time.perf_counter() - flush_start_time2

-total_time = time.perf_counter() - total_start_time
-print(log_tag + '[TEST Time] Total time: [%f], \
-get_block time: [%f, %s%%], \
-process_block time: [%f, %s%%], \
-save_block time: [%f, %s%%], \
-get_ops_in_block time: [%f, %s%%], \
-process ops time: [%s], \
-flush_time1: [%f, %s%%], \
-flush_time2: [%f, %s%%]'
-% (
-total_time,
-get_block_time,
-str(get_block_time / total_time * 100),
-process_block_time,
-str(process_block_time / total_time * 100),
-save_block_time,
-str(save_block_time / total_time * 100),
-get_ops_in_block_time,
-str(get_ops_in_block_time / total_time * 100),
-str(process_op_time),
-flush_time1,
-str(flush_time1 / total_time * 100),
-flush_time2,
-str(flush_time2 / total_time * 100)
-))
+for block in blocks:
+last_block = block['block_num']
+print(f"{log_tag}[STEEM] - Starting Block #{last_block}")
+sys.stdout.flush()

+process_block(block, last_block)
+db.status.update_one({'_id': 'height'}, {"$set": {'value': last_block}}, upsert=True)
+print(f"{log_tag}[STEEM] - Processed up to Block #{last_block}")
+sys.stdout.flush()

+total_time = time.perf_counter() - total_start_time
+print(f"{log_tag}[TEST Time] Batch Process Time: [{total_time}]")

sys.stdout.flush()
-print(log_tag + '[TEST Time]global process time [%f]' % (time.perf_counter() - global_process_start_time))

-# Check if we are at the head block
-if block_number == last_block:
-# Fetch the next block and retry on failure
-while True:
-block_response = fetch_block(last_block + 1)
-if block_response and 'result' in block_response:
-block = block_response['result']
-last_block += 1
-total_start_time = time.perf_counter()
-pprint(log_tag + "[STEEM] - Starting Block #" + str(last_block))
-flush_start_time1 = time.perf_counter()
-sys.stdout.flush()
-flush_time1 = time.perf_counter() - flush_start_time1

-get_block_start_time = time.perf_counter()
-get_block_time = time.perf_counter() - get_block_start_time

-process_block_start_time = time.perf_counter()
-process_block(block, last_block)
-process_block_time = time.perf_counter() - process_block_start_time

-db.status.update_one({'_id': 'height'}, {"$set": {'value': last_block}}, upsert=True)
-pprint(log_tag + "[STEEM] - Processed up to Block #" + str(last_block))

-flush_start_time2 = time.perf_counter()
-sys.stdout.flush()
-flush_time2 = time.perf_counter() - flush_start_time2

-total_time = time.perf_counter() - total_start_time
-print(log_tag + '[TEST Time] Total time: [%f], \
-get_block time: [%f, %s%%], \
-process_block time: [%f, %s%%], \
-save_block time: [%f, %s%%], \
-get_ops_in_block time: [%f, %s%%], \
-process ops time: [%s], \
-flush_time1: [%f, %s%%], \
-flush_time2: [%f, %s%%]'
-% (
-total_time,
-get_block_time,
-str(get_block_time / total_time * 100),
-process_block_time,
-str(process_block_time / total_time * 100),
-save_block_time,
-str(save_block_time / total_time * 100),
-get_ops_in_block_time,
-str(get_ops_in_block_time / total_time * 100),
-str(process_op_time),
-flush_time1,
-str(flush_time1 / total_time * 100),
-flush_time2,
-str(flush_time2 / total_time * 100)
-))
-break
-else:
-print(log_tag + "Retrying to fetch block #" + str(last_block + 1))
-time.sleep(3) # Retry after 3 seconds
print(f"{log_tag}[TEST Time] Global Process Time [{time.perf_counter() - global_process_start_time}]")

time.sleep(block_interval)
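
Stripped of logging, the reworked loop reduces to the pattern below: advance in batches of batch_size up to the last irreversible block, let get_blocks_range fetch each batch, and checkpoint last_block from the block_num field the client attaches to each block dict (as the loop above relies on). A condensed sketch, with illustrative node URL and heights:

from steem import Steem

rpc = Steem(nodes=['https://api.steemit.com'])  # illustrative node
last_block = 50000000                           # hypothetical resume height
batch_size = 50

props = rpc.get_dynamic_global_properties()
head = props['last_irreversible_block_num']

while last_block < head:
    end_block = min(last_block + batch_size, head)
    # get_blocks_range batches the underlying get_block calls client-side.
    for block in rpc.get_blocks_range(last_block + 1, end_block):
        last_block = block['block_num']
        # ... process the block, then persist last_block as the new height ...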
