From 9ef4ef2b1970722012cf09864f72de1fb3244a0e Mon Sep 17 00:00:00 2001
From: ety001
Date: Sun, 13 Oct 2024 05:26:53 +0800
Subject: [PATCH] Operate get blocks (#20)

* operate get blocks

* fix style error

* only get virtual ops
---
 docker/sync/sync.py | 170 ++++++++++----------------------------------
 1 file changed, 36 insertions(+), 134 deletions(-)

diff --git a/docker/sync/sync.py b/docker/sync/sync.py
index 68d8c95..d46cbab 100644
--- a/docker/sync/sync.py
+++ b/docker/sync/sync.py
@@ -5,7 +5,6 @@
 from datetime import datetime, timedelta
 from steem import Steem
 from pymongo import MongoClient
-from pprint import pprint
 import requests
 import logging
 
@@ -25,10 +24,10 @@
 batch_size = config.get('batch_size', 50)
 
 if not mongodb_url:
-    print(log_tag + 'NEED MONGODB')
+    print(f"{log_tag}NEED MONGODB")
     exit()
 
-print(log_tag + 'mongo url: %s' % mongodb_url)
+print(f"{log_tag}mongo url: {mongodb_url}")
 
 # Initialize Steem and MongoDB connections
 fullnodes = [steemd_url]
@@ -40,15 +39,7 @@
 init = db.status.find_one({'_id': 'height'})
 last_block = init['value'] if init else (last_block_env or 1)
 
-process_block_time = 0
-get_ops_in_block_time = 0
-save_block_time = 0
-process_op_time = []
-get_block_time = 0
-
 def process_op(op_obj, block, blockid):
-    global process_op_time
-    process_op_start_time = time.perf_counter()
     op_type = op_obj[0]
     op = op_obj[1]
     try:
@@ -85,22 +76,19 @@ def process_op(op_obj, block, blockid):
         print(error_message)
         logging.error(error_message)
         sys.exit(1) # Stop the script
-    finally:
-        process_op_time.append(time.perf_counter() - process_op_start_time)
 
 def process_block(block, blockid):
-    global get_ops_in_block_time, save_block_time, process_op_time
-
     try:
         save_block_start_time = time.perf_counter()
         save_block(block, blockid)
         save_block_time = time.perf_counter() - save_block_start_time
+        print(f"{log_tag}[TEST Time] Save Block Time [{save_block_time}]")
 
         get_ops_in_block_start_time = time.perf_counter()
-        ops = rpc.get_ops_in_block(blockid, False)
+        ops = rpc.get_ops_in_block(blockid, True)
         get_ops_in_block_time = time.perf_counter() - get_ops_in_block_start_time
+        print(f"{log_tag}[TEST Time] Get Ops in Block Time [{get_ops_in_block_time}]")
 
-        process_op_time = []
         for tx in block['transactions']:
             for op_obj in tx['operations']:
                 process_op(op_obj, block, blockid)
@@ -380,7 +368,7 @@ def update_comment_options(op, block, blockid):
 mvest_per_account = {}
 
 def load_accounts():
-    pprint(log_tag + "[STEEM] - Loading all accounts")
+    print(f"{log_tag}[STEEM] - Loading all accounts")
     for account in db.account.find():
         if 'vesting_shares' in account:
             mvest_per_account.update({account['name']: account['vesting_shares']})
@@ -420,10 +408,12 @@ def update_queue():
         'created': {'$gt': max_date},
         'scanned': {'$lt': scan_ignore},
     }).sort('scanned', 1).limit(queue_length)
-    pprint(log_tag + "[Queue] Comments - " + str(queue_length) + " of " + str(db.comment.count_documents({
+    total_comments = db.comment.count_documents({
         'created': {'$gt': max_date},
         'scanned': {'$lt': scan_ignore}
-    })))
+    })
+    print(f"{log_tag}[Queue] Comments - {queue_length} of {total_comments}")
+
     for item in queue:
         update_comment(item['author'], item['permlink'])
 
@@ -433,21 +423,25 @@ def update_queue():
         'depth': 0,
         'pending_payout_value': {'$gt': 0}
     }).limit(queue_length)
-    pprint(log_tag + "[Queue] Past Payouts - " + str(queue_length) + " of " + str(db.comment.count_documents({
+
+    total_past_payouts = db.comment.count_documents({
         'cashout_time': {'$lt': datetime.now()},
         'mode': {'$in': ['first_payout', 'second_payout']},
         'depth': 0,
         'pending_payout_value': {'$gt': 0}
-    })))
+    })
+    print(f"{log_tag}[Queue] Past Payouts - {queue_length} of {total_past_payouts}")
+
     for item in queue:
         update_comment(item['author'], item['permlink'])
 
     queue_length = 20
     queue = db.account.find({'_dirty': True}).limit(queue_length)
-    pprint(log_tag + "[Queue] Updating Accounts - " + str(queue_length) + " of " + str(db.account.count_documents({'_dirty': True})))
+    total_accounts = db.account.count_documents({'_dirty': True})
+    print(f"{log_tag}[Queue] Updating Accounts - {queue_length} of {total_accounts}")
     for item in queue:
         update_account(item['_id'])
-    pprint(log_tag + "[Queue] Done")
+    print(f"{log_tag}[Queue] Done")
 
 def fetch_blocks_in_batch(start_block, end_block):
     requests_data = [
@@ -483,7 +477,7 @@ def fetch_block(block_num):
     return None
 
 if __name__ == '__main__':
-    pprint(log_tag + "[STEEM] - Starting SteemDB Sync Service")
+    print(f"{log_tag}[STEEM] - Starting SteemDB Sync Service")
     sys.stdout.flush()
     config = rpc.get_config()
     block_interval = config["STEEM_BLOCK_INTERVAL"]
@@ -496,118 +490,26 @@ def fetch_block(block_num):
         block_number = props['last_irreversible_block_num']
 
         while (block_number - last_block) > 0:
+            total_start_time = time.perf_counter()
             end_block = min(last_block + batch_size, block_number)
-            blocks = fetch_blocks_in_batch(last_block + 1, end_block)
+            if end_block <= last_block:
+                break
+            blocks = rpc.get_blocks_range(last_block + 1, end_block)
 
-            for block_response in blocks:
-                if 'result' in block_response:
-                    block = block_response['result']
-                    last_block += 1
-                    total_start_time = time.perf_counter()
-                    pprint(log_tag + "[STEEM] - Starting Block #" + str(last_block))
-                    flush_start_time1 = time.perf_counter()
-                    sys.stdout.flush()
-                    flush_time1 = time.perf_counter() - flush_start_time1
-
-                    get_block_start_time = time.perf_counter()
-                    get_block_time = time.perf_counter() - get_block_start_time
-
-                    process_block_start_time = time.perf_counter()
-                    process_block(block, last_block)
-                    process_block_time = time.perf_counter() - process_block_start_time
-
-                    db.status.update_one({'_id': 'height'}, {"$set": {'value': last_block}}, upsert=True)
-                    pprint(log_tag + "[STEEM] - Processed up to Block #" + str(last_block))
-
-                    flush_start_time2 = time.perf_counter()
-                    sys.stdout.flush()
-                    flush_time2 = time.perf_counter() - flush_start_time2
-
-                    total_time = time.perf_counter() - total_start_time
-                    print(log_tag + '[TEST Time] Total time: [%f], \
-get_block time: [%f, %s%%], \
-process_block time: [%f, %s%%], \
-save_block time: [%f, %s%%], \
-get_ops_in_block time: [%f, %s%%], \
-process ops time: [%s], \
-flush_time1: [%f, %s%%], \
-flush_time2: [%f, %s%%]'
-                          % (
-                              total_time,
-                              get_block_time,
-                              str(get_block_time / total_time * 100),
-                              process_block_time,
-                              str(process_block_time / total_time * 100),
-                              save_block_time,
-                              str(save_block_time / total_time * 100),
-                              get_ops_in_block_time,
-                              str(get_ops_in_block_time / total_time * 100),
-                              str(process_op_time),
-                              flush_time1,
-                              str(flush_time1 / total_time * 100),
-                              flush_time2,
-                              str(flush_time2 / total_time * 100)
-                          ))
+            for block in blocks:
+                last_block = block['block_num']
+                print(f"{log_tag}[STEEM] - Starting Block #{last_block}")
+                sys.stdout.flush()
+
+                process_block(block, last_block)
+                db.status.update_one({'_id': 'height'}, {"$set": {'value': last_block}}, upsert=True)
+                print(f"{log_tag}[STEEM] - Processed up to Block #{last_block}")
+                sys.stdout.flush()
+
+            total_time = time.perf_counter() - total_start_time
+            print(f"{log_tag}[TEST Time] Batch Process Time: [{total_time}]")
 
         sys.stdout.flush()
-        print(log_tag + '[TEST Time]global process time [%f]' % (time.perf_counter() - global_process_start_time))
-
-        # Check if we are at the head block
-        if block_number == last_block:
-            # Fetch the next block and retry on failure
-            while True:
-                block_response = fetch_block(last_block + 1)
-                if block_response and 'result' in block_response:
-                    block = block_response['result']
-                    last_block += 1
-                    total_start_time = time.perf_counter()
-                    pprint(log_tag + "[STEEM] - Starting Block #" + str(last_block))
-                    flush_start_time1 = time.perf_counter()
-                    sys.stdout.flush()
-                    flush_time1 = time.perf_counter() - flush_start_time1
-
-                    get_block_start_time = time.perf_counter()
-                    get_block_time = time.perf_counter() - get_block_start_time
-
-                    process_block_start_time = time.perf_counter()
-                    process_block(block, last_block)
-                    process_block_time = time.perf_counter() - process_block_start_time
-
-                    db.status.update_one({'_id': 'height'}, {"$set": {'value': last_block}}, upsert=True)
-                    pprint(log_tag + "[STEEM] - Processed up to Block #" + str(last_block))
-
-                    flush_start_time2 = time.perf_counter()
-                    sys.stdout.flush()
-                    flush_time2 = time.perf_counter() - flush_start_time2
-
-                    total_time = time.perf_counter() - total_start_time
-                    print(log_tag + '[TEST Time] Total time: [%f], \
-get_block time: [%f, %s%%], \
-process_block time: [%f, %s%%], \
-save_block time: [%f, %s%%], \
-get_ops_in_block time: [%f, %s%%], \
-process ops time: [%s], \
-flush_time1: [%f, %s%%], \
-flush_time2: [%f, %s%%]'
-                          % (
-                              total_time,
-                              get_block_time,
-                              str(get_block_time / total_time * 100),
-                              process_block_time,
-                              str(process_block_time / total_time * 100),
-                              save_block_time,
-                              str(save_block_time / total_time * 100),
-                              get_ops_in_block_time,
-                              str(get_ops_in_block_time / total_time * 100),
-                              str(process_op_time),
-                              flush_time1,
-                              str(flush_time1 / total_time * 100),
-                              flush_time2,
-                              str(flush_time2 / total_time * 100)
-                          ))
-                    break
-                else:
-                    print(log_tag + "Retrying to fetch block #" + str(last_block + 1))
-                    time.sleep(3) # Retry after 3 seconds
+        print(f"{log_tag}[TEST Time] Global Process Time [{time.perf_counter() - global_process_start_time}]")
 
         time.sleep(block_interval)
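
Reviewer note (illustration only, not part of the patch): the new loop replaces the hand-rolled
JSON-RPC batch requests with steem-python's get_blocks_range and fetches only virtual ops via
get_ops_in_block(num, True). The sketch below exercises the same pattern standalone, assuming
get_blocks_range(start, end) returns one dict per block with a 'block_num' key injected (which
the diff relies on); the node URL and the sync_batch helper name are hypothetical.

    # sketch_batch_sync.py -- hypothetical standalone sketch mirroring the patched loop
    from steem import Steem

    rpc = Steem(['https://api.steemit.com'])  # illustrative node, mirrors Steem(fullnodes)

    def sync_batch(last_block, head_block, batch_size=50):
        """Fetch one batch of blocks and return the new height (hypothetical helper)."""
        end_block = min(last_block + batch_size, head_block)
        if end_block <= last_block:
            return last_block
        for block in rpc.get_blocks_range(last_block + 1, end_block):
            height = block['block_num']  # key injected by steem-python
            virtual_ops = rpc.get_ops_in_block(height, True)  # True = virtual ops only
            print(f"block #{height}: {len(block['transactions'])} txs, "
                  f"{len(virtual_ops)} virtual ops")
            last_block = height
        return last_block

    if __name__ == '__main__':
        props = rpc.get_dynamic_global_properties()
        head = props['last_irreversible_block_num']
        print(f"synced to #{sync_batch(head - 10, head)}")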