diff --git a/docker/history/Dockerfile b/docker/history/Dockerfile
index b80bd70..8ab7052 100644
--- a/docker/history/Dockerfile
+++ b/docker/history/Dockerfile
@@ -1,11 +1,20 @@
-FROM python:3.6.2-slim
+# Use an official Python runtime as a parent image
+FROM python:3.9-slim
 
+# Install system dependencies
 RUN apt-get update && \
-    apt-get install -y git make gcc libssl-dev libgmp-dev python-dev libxml2-dev libxslt1-dev zlib1g-dev
+    apt-get install -y --no-install-recommends git make gcc libssl-dev libgmp-dev python3-dev libxml2-dev libxslt1-dev zlib1g-dev && \
+    apt-get clean && \
+    rm -rf /var/lib/apt/lists/*
 
-RUN pip3 install steem pymongo apscheduler
+# Install Python packages
+RUN pip install --no-cache-dir steem pymongo apscheduler requests tenacity
 
-COPY ./docker/history /src
-ADD ./rds-combined-ca-bundle.pem /src/rds-combined-ca-bundle.pem
+# Copy the application code into the container (build context is docker/history)
+COPY . /src
 
-CMD ["python", "/src/history.py"]
+# Set the working directory
+WORKDIR /src
+
+# Command to run the application
+CMD ["python", "history.py"]
diff --git a/docker/history/Readme.md b/docker/history/Readme.md
new file mode 100644
index 0000000..6dbe9a6
--- /dev/null
+++ b/docker/history/Readme.md
@@ -0,0 +1,112 @@
+
+# 🌟 Steem Account Data Updater
+
+This directory contains a script that updates and maintains Steem account data and associated global properties in MongoDB. It retrieves data from the Steem blockchain and keeps it current through scheduled updates.
+
+## ✨ Features
+- **Efficient Data Retrieval**: Batches account requests to the Steem API to cut round trips.
+- **Comprehensive Data Storage**: Stores detailed Steem account information and global properties in MongoDB.
+- **Structured Logging and Error Handling**: Provides robust mechanisms for logging and error management.
+- **Automated Scheduling**: Uses APScheduler to run regular updates so the data stays current.
+
+## 📋 Requirements
+- Python 3.9
+- MongoDB
+- Docker (for containerized deployment)
+
+## ⚙️ Configuration
+Create a `config.json` file next to `history.py` (mounted at `/src/config.json` in the container) with the following structure:
+
+```json
+{
+  "STEEMD_URLS": ["https://api.steemit.com"],
+  "MONGODB": "your_mongodb_connection_string"
+}
+```
+
+Alternatively, configure via environment variables when running the Docker container:
+
+- `STEEMD_URLS`: A comma-separated list of Steem node URLs.
+- `MONGODB`: The MongoDB connection string.
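+
+The file takes precedence: environment variables are read only when `config.json` is absent. A condensed sketch of the resolution logic in `history.py`:
+
+```python
+import json
+import os
+
+try:
+    with open('config.json') as config_file:
+        config = json.load(config_file)
+    steemd_urls = config.get('STEEMD_URLS', ['https://api.steemit.com'])
+    mongodb_url = config.get('MONGODB')
+except FileNotFoundError:
+    # Fall back to the environment; STEEMD_URLS is a comma-separated string here
+    steemd_urls = os.getenv('STEEMD_URLS', 'https://api.steemit.com').split(',')
+    mongodb_url = os.getenv('MONGODB')
+```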
+
+## 🚀 Installation
+
+1. Clone the repository and change into this directory (the Docker build context):
+   ```sh
+   git clone https://github.com/your-repo/steem-account-updater.git
+   cd steem-account-updater/docker/history
+   ```
+
+2. Build the Docker image:
+   ```sh
+   docker build -t steem-history .
+   ```
+
+3. Copy `config.json.example` to `config.json` if using the file for configuration:
+   ```sh
+   cp config.json.example config.json
+   ```
+
+4. Run the Docker container using environment variables (if not using `config.json`):
+   ```sh
+   docker run -d --name steem-history -e STEEMD_URLS="http://10.10.100.12:8080" -e MONGODB="mongodb://10.10.100.30:27017" steem-history
+   ```
+
+5. Run the Docker container with `config.json`:
+   ```sh
+   docker run -d --name steem-history -v $(pwd)/config.json:/src/config.json steem-history
+   ```
+
+## 📚 Usage
+The script executes the following tasks:
+1. **Client Statistics Update**: Aggregates posting activity per client app from recent comments.
+2. **Global Properties Update**: Keeps global blockchain properties up to date.
+3. **Account MVests Load**: Loads the vesting shares (MVests) held by each account.
+4. **Transaction History Update**: Keeps daily transaction counts current.
+5. **Account Details Processing**: Processes and upserts comprehensive account details into MongoDB.
+
+Monitor logs using:
+```sh
+docker logs --follow steem-history
+```
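+
+To spot-check what the updater wrote, query the collections directly. A minimal sketch (assumes the `MONGODB` connection string from your configuration):
+
+```python
+from pymongo import MongoClient
+
+client = MongoClient("mongodb://10.10.100.30:27017")  # your MONGODB value
+db = client.steemdb
+
+# Latest STEEM-per-MVESTS figure written by update_props_history()
+print(db.status.find_one({'_id': 'steem_per_mvests'}))
+
+# Most recent daily account snapshots written by update_history()
+for doc in db.account_history.find().sort('date', -1).limit(3):
+    print(doc['account'], doc['date'])
+```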
+
+## 🤝 Contributing
+Pull requests are welcome. For substantial changes, please open an issue first to discuss the proposed modifications.
+
+Please update tests as necessary.
+
+## 📜 License
+This project is licensed under the [MIT License](https://choosealicense.com/licenses/mit/).
diff --git a/docker/history/config.json.example b/docker/history/config.json.example
new file mode 100644
index 0000000..89addcc
--- /dev/null
+++ b/docker/history/config.json.example
@@ -0,0 +1,4 @@
+{
+    "STEEMD_URLS": ["http://10.10.100.12:8080"],
+    "MONGODB": "mongodb://10.10.100.30:27017/"
+}
diff --git a/docker/history/fix.py b/docker/history/fix.py
deleted file mode 100644
index 68083a1..0000000
--- a/docker/history/fix.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from datetime import datetime
-from pymongo import MongoClient
-from bson.objectid import ObjectId
-from pprint import pprint
-import collections
-import time
-import sys
-import os
-
-mongo = MongoClient("mongodb://mongo")
-db = mongo.steemdb
-
-if __name__ == '__main__':
-    records = db.account_history.find({"account": {"$exists": False}})
-    for record in records:
-        try:
-            oldid = str(record['_id'])
-            s1 = oldid[0:-9]
-            s2 = datetime.strptime(oldid[-8:], "%Y%m%d")
-            newDocument = record.copy()
-            newDocument['date'] = s2
-            newDocument['account'] = s1
-            del newDocument['_id']
-            pprint("Adding new record for account " + newDocument['account'])
-            pprint("Removing ID " + record['_id'])
-            db.account_history.insert(newDocument)
-            db.account_history.remove({'_id': record['_id']})
-            sys.stdout.flush()
-        except OSError as err:
-            print("OS error: {0}".format(err))
-        except ValueError:
-            print("Could not convert data to an integer.")
-        except:
-            print("Unexpected error:", sys.exc_info()[0])
-            raise
diff --git a/docker/history/history.py b/docker/history/history.py
index 2db95b0..96dacf9 100644
--- a/docker/history/history.py
+++ b/docker/history/history.py
@@ -1,44 +1,66 @@
+import logging
+import json
+import requests
+import time  # used by the keep-alive loop under __main__
 from datetime import datetime, timedelta
 from steem import Steem
-from pymongo import MongoClient
+from pymongo import MongoClient, UpdateOne
 from pprint import pprint
-import collections
-import time
-import sys
-import os
+from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
 import re
-
+import sys
+import collections  # collections for OrderedDict
 from apscheduler.schedulers.background import BackgroundScheduler
+import itertools
+import os  # os module to read environment variables
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
 
 log_tag = '[History] '
-env_dist = os.environ
-steemd_url = env_dist.get('STEEMD_URL')
-if steemd_url == None or steemd_url == "":
-    steemd_url = 'https://api.steemit.com'
-mongodb_url = env_dist.get('MONGODB')
-if mongodb_url == None or mongodb_url == "":
-    print(log_tag + 'NEED MONGODB')
+
+# Load configuration from config.json if it exists, otherwise from environment variables
+try:
+    with open('config.json') as config_file:
+        config = json.load(config_file)
+        steemd_urls = config.get('STEEMD_URLS', ['https://api.steemit.com'])
+        mongodb_url = config.get('MONGODB')
+except FileNotFoundError:
+    logger.warning(log_tag + 'config.json not found, falling back to environment variables')
+    steemd_urls = os.getenv('STEEMD_URLS', 'https://api.steemit.com').split(',')
+    mongodb_url = os.getenv('MONGODB')
+
+if not mongodb_url:
+    logger.error(log_tag + 'NEED MONGODB')
     exit()
 
-fullnodes = [
-    #'http://10.40.103.102:8090',
-    #'https://api.steemit.com',
-    steemd_url,
-]
-rpc = Steem(fullnodes)
+# Initialize Steem client with cycling nodes
+rpc = Steem(steemd_urls)
+rpc.nodes = itertools.cycle(steemd_urls)
+
 mongo = MongoClient(mongodb_url)
 db = mongo.steemdb
 
 mvest_per_account = {}
 
+def steem_batch_request(url, batch_data):
+    headers = {
+        'Content-Type': 'application/json',
+    }
+    logger.info("Requesting batch of %d calls from %s", len(batch_data), url)
+    response = requests.post(url, headers=headers, data=json.dumps(batch_data), timeout=30)
+    logger.info("Response: %d bytes", len(response.text))
+    return response.json()
+
 def load_accounts():
-    pprint(log_tag + "[STEEM] - Loading mvest per account")
+    logger.info(log_tag + "[STEEM] - Loading mvest per account")
     for account in db.account.find():
         if "name" in account.keys():
             mvest_per_account.update({account['name']: account['vesting_shares']})
 
 def update_fund_history():
-    pprint(log_tag + "[STEEM] - Update Fund History")
+    logger.info(log_tag + "[STEEM] - Update Fund History")
 
     fund = rpc.get_reward_fund('post')
     for key in ['recent_claims', 'content_constant']:
@@ -48,14 +70,13 @@ def update_fund_history():
     for key in ['last_update']:
         fund[key] = datetime.strptime(fund[key], "%Y-%m-%dT%H:%M:%S")
 
-    db.funds_history.insert(fund)
+    db.funds_history.insert_one(fund)
 
 def update_props_history():
-    pprint(log_tag + "[STEEM] - Update Global Properties")
+    logger.info(log_tag + "[STEEM] - Update Global Properties")
 
     props = rpc.get_dynamic_global_properties()
 
-    #for key in ['max_virtual_bandwidth', 'recent_slots_filled', 'total_reward_shares2']:
     for key in ['recent_slots_filled', 'total_reward_shares2']:
         props[key] = float(props[key])
     for key in ['confidential_sbd_supply', 'confidential_supply', 'current_sbd_supply', 'current_supply', 'total_reward_fund_steem', 'total_vesting_fund_steem', 'total_vesting_shares', 'virtual_supply']:
@@ -63,374 +84,321 @@ def update_props_history():
     for key in ['time']:
         props[key] = datetime.strptime(props[key], "%Y-%m-%dT%H:%M:%S")
 
-    #floor($return['total_vesting_fund_steem'] / $return['total_vesting_shares'] * 1000000 * 1000) / 1000;
-
     props['steem_per_mvests'] = props['total_vesting_fund_steem'] / props['total_vesting_shares'] * 1000000
 
-    db.status.update({
-        '_id': 'steem_per_mvests'
+    db.status.update_one({
+        '_id': 'steem_per_mvests'
     }, {
-        '$set': {
-            '_id': 'steem_per_mvests',
-            'value': props['steem_per_mvests']
-        }
+        '$set': {
+            '_id': 'steem_per_mvests',
+            'value': props['steem_per_mvests']
+        }
     }, upsert=True)
 
-    db.status.update({
-        '_id': 'props'
+    db.status.update_one({
+        '_id': 'props'
     }, {
-        '$set': {
-            '_id': 'props',
-            'props': props
-        }
+        '$set': {
+            '_id': 'props',
+            'props': props
+        }
     }, upsert=True)
 
-    db.props_history.insert(props)
+    db.props_history.insert_one(props)
 
 def update_tx_history():
-    pprint(log_tag + "[STEEM] - Update Transaction History")
+    logger.info(log_tag + "[STEEM] - Update Transaction History")
     now = datetime.now().date()
 
     today = datetime.combine(now, datetime.min.time())
     yesterday = today - timedelta(1)
 
-    # Determine tx per day
     query = {
-        '_ts': {
-            '$gte': today,
-            '$lte': today + timedelta(1)
-        }
+        '_ts': {
+            '$gte': today,
+            '$lte': today + timedelta(1)
+        }
     }
-    count = db.block_30d.count(query)
-
-    pprint(log_tag + str(count))
-
-    pprint(log_tag + str(now))
-    pprint(log_tag + str(today))
-    pprint(log_tag + str(yesterday))
-
-
+    count = db.block_30d.count_documents(query)
+
+    logger.info(log_tag + str(count))
+    logger.info(log_tag + str(now))
+    logger.info(log_tag + str(today))
+    logger.info(log_tag + str(yesterday))
+
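+# Retry policy for the batch fetch below: up to 5 attempts with exponential
+# backoff capped at 10 seconds; any exception (node error, timeout, bad JSON)
+# triggers a retry, and each attempt pulls the next node from the cycle.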
+@retry(wait=wait_exponential(multiplier=1, min=1, max=10), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
+def get_batch_account_details(accounts):
+    # One JSON-RPC sub-request per account; condenser_api.get_accounts takes a
+    # list of names, so each sub-request wraps a single name in a list.
+    batch_data = [{"jsonrpc": "2.0", "method": "condenser_api.get_accounts", "params": [[account]], "id": i+1} for i, account in enumerate(accounts)]
+    current_node = next(rpc.nodes)  # Get the current node from the cycle
+    response = steem_batch_request(current_node, batch_data)
+    return [res['result'][0] for res in response if 'result' in res]
+
+def process_account_details(account):
+    account_data = collections.OrderedDict(sorted(account.items()))
+    account_data['proxy_witness'] = sum(float(i) for i in account_data['proxied_vsf_votes']) / 1000000
+    for key in ['reputation', 'to_withdraw']:
+        account_data[key] = float(account_data[key])
+    for key in ['balance', 'sbd_balance', 'sbd_seconds', 'savings_balance', 'savings_sbd_balance', 'vesting_balance', 'vesting_shares', 'vesting_withdraw_rate']:
+        account_data[key] = float(account_data[key].split()[0])
+    for key in ['created','last_account_recovery','last_account_update','last_owner_update','last_post','last_root_post','last_vote_time','next_vesting_withdrawal','savings_sbd_last_interest_payment','savings_sbd_seconds_last_update','sbd_last_interest_payment','sbd_seconds_last_update']:
+        account_data[key] = datetime.strptime(account_data[key], "%Y-%m-%dT%H:%M:%S")
+    account_data['total_balance'] = account_data['balance'] + account_data['savings_balance']
+    account_data['total_sbd_balance'] = account_data['sbd_balance'] + account_data['savings_sbd_balance']
+    account_data['scanned'] = datetime.now()
+    return account_data
 
 def update_history():
-    update_fund_history()
     update_props_history()
-    # update_tx_history()
-    # sys.stdout.flush()
-
-    # Load all accounts
-    users = rpc.lookup_accounts(-1, 1000)
-    more = True
-    while more:
-        newUsers = rpc.lookup_accounts(users[-1], 1000)
-        if len(newUsers) < 1000:
-            more = False
-        users = users + newUsers
-
-    # Set dates
-    now = datetime.now().date()
-    today = datetime.combine(now, datetime.min.time())
-
-    pprint(log_tag + "[STEEM] - Update History (" + str(len(users)) + " accounts)")
-    # Snapshot User Count
-    db.statistics.update({
-        'key': 'users',
-        'date': today,
-    }, {
-        'key': 'users',
-        'date': today,
-        'value': len(users)
-    }, upsert=True)
-    sys.stdout.flush()
-    # Update history on accounts
-    for user in users:
-        # Load State
-        state = rpc.get_accounts([user])
-        # Get Account Data
-        account = collections.OrderedDict(sorted(state[0].items()))
-        # Get followers
-        account['followers'] = []
-        account['followers_count'] = 0
-        account['followers_mvest'] = 0
-#        #followers_results = rpc.get_followers(user, "", "blog", 100, api="follow")
-#        followers_results = rpc.get_followers(user, "", "blog", 100)
-#        while followers_results:
-#            last_account = ""
-#            for follower in followers_results:
-#                last_account = follower['follower']
-#                if 'blog' in follower['what'] or 'posts' in follower['what']:
-#                    account['followers'].append(follower['follower'])
-#                    account['followers_count'] += 1
-#                if follower['follower'] in mvest_per_account.keys():
-# 
account['followers_mvest'] += float(mvest_per_account[follower['follower']]) -# #followers_results = rpc.get_followers(user, last_account, "blog", 100, api="follow")[1:] -# followers_results = rpc.get_followers(user, last_account, "blog", 100)[1:] -# # Get following - account['following'] = [] - account['following_count'] = 0 -# #following_results = rpc.get_following(user, -1, "blog", 100, api="follow") -# following_results = rpc.get_following(user, "", "blog", 100) -# while following_results: -# last_account = "" -# for following in following_results: -# last_account = following['following'] -# if 'blog' in following['what'] or 'posts' in following['what']: -# account['following'].append(following['following']) -# account['following_count'] += 1 -# #following_results = rpc.get_following(user, last_account, "blog", 100, api="follow")[1:] -# following_results = rpc.get_following(user, last_account, "blog", 100)[1:] - # Convert to Numbers - account['proxy_witness'] = sum(float(i) for i in account['proxied_vsf_votes']) / 1000000 - for key in ['reputation', 'to_withdraw']: - account[key] = float(account[key]) - for key in ['balance', 'sbd_balance', 'sbd_seconds', 'savings_balance', 'savings_sbd_balance', 'vesting_balance', 'vesting_shares', 'vesting_withdraw_rate']: - account[key] = float(account[key].split()[0]) - # Convert to Date - #for key in ['created','last_account_recovery','last_account_update','last_active_proved','last_bandwidth_update','last_market_bandwidth_update','last_owner_proved','last_owner_update','last_post','last_root_post','last_vote_time','next_vesting_withdrawal','savings_sbd_last_interest_payment','savings_sbd_seconds_last_update','sbd_last_interest_payment','sbd_seconds_last_update']: - for key in ['created','last_account_recovery','last_account_update','last_owner_update','last_post','last_root_post','last_vote_time','next_vesting_withdrawal','savings_sbd_last_interest_payment','savings_sbd_seconds_last_update','sbd_last_interest_payment','sbd_seconds_last_update']: - account[key] = datetime.strptime(account[key], "%Y-%m-%dT%H:%M:%S") - # Combine Savings + Balance - account['total_balance'] = account['balance'] + account['savings_balance'] - account['total_sbd_balance'] = account['sbd_balance'] + account['savings_sbd_balance'] - # Update our current info about the account - mvest_per_account.update({account['name']: account['vesting_shares']}) - # Save current state of account - account['scanned'] = datetime.now() - db.account.update({'_id': user}, account, upsert=True) - # Create our Snapshot dict - wanted_keys = ['name', 'proxy_witness', 'activity_shares', 'average_bandwidth', 'average_market_bandwidth', 'savings_balance', 'balance', 'comment_count', 'curation_rewards', 'lifetime_bandwidth', 'lifetime_vote_count', 'next_vesting_withdrawal', 'reputation', 'post_bandwidth', 'post_count', 'posting_rewards', 'sbd_balance', 'savings_sbd_balance', 'sbd_last_interest_payment', 'sbd_seconds', 'sbd_seconds_last_update', 'to_withdraw', 'vesting_balance', 'vesting_shares', 'vesting_withdraw_rate', 'voting_power', 'withdraw_routes', 'withdrawn', 'witnesses_voted_for'] - snapshot = dict((k, account[k]) for k in wanted_keys if k in account) - snapshot.update({ - 'account': user, - 'date': today, - 'followers': len(account['followers']), - 'following': len(account['following']), - }) - # Save Snapshot in Database - db.account_history.update({ - 'account': user, - 'date': today - }, snapshot, upsert=True) - print(log_tag + 'finish user: ', user) - pprint(log_tag + "history update 
finish") + users = [] + try: + users = rpc.lookup_accounts(-1, 1000) + more = True + while more: + newUsers = rpc.lookup_accounts(users[-1], 1000) + if len(newUsers) < 1000: + more = False + users = users + newUsers + except Exception as e: + logger.error(f"{log_tag}Error fetching accounts: {str(e)}") + + batch_size = 50 + for i in range(0, len(users), batch_size): + batch_users = users[i:i+batch_size] + try: + account_details = get_batch_account_details(batch_users) + except Exception as e: + logger.error(f"{log_tag}Error fetching account details for batch {batch_users}: {str(e)}") + continue + + operations = [] + for account in account_details: + try: + account_data = process_account_details(account) + operations.append(UpdateOne({'_id': account_data['name']}, {'$set': account_data}, upsert=True)) + + wanted_keys = ['name', 'proxy_witness', 'activity_shares', 'average_bandwidth', 'average_market_bandwidth', 'savings_balance', 'balance', 'comment_count', 'curation_rewards', 'lifetime_bandwidth', 'lifetime_vote_count', 'next_vesting_withdrawal', 'reputation', 'post_bandwidth', 'post_count', 'posting_rewards', 'sbd_balance', 'savings_sbd_balance', 'sbd_last_interest_payment', 'sbd_seconds', 'sbd_seconds_last_update', 'to_withdraw', 'vesting_balance', 'vesting_shares', 'vesting_withdraw_rate', 'voting_power', 'withdraw_routes', 'withdrawn', 'witnesses_voted_for'] + snapshot = dict((k, account_data[k]) for k in wanted_keys if k in account_data) + snapshot.update({ + 'account': account_data['name'], + 'date': datetime.combine(datetime.now().date(), datetime.min.time()), + }) + operations.append(UpdateOne({'account': account_data['name'], 'date': datetime.combine(datetime.now().date(), datetime.min.time())}, {'$set': snapshot}, upsert=True)) + except Exception as e: + logger.error(f"{log_tag}Error processing account {account['name']}: {str(e)}") + continue + + try: + db.account.bulk_write(operations) + db.account_history.bulk_write(operations) + except Exception as e: + logger.error(f"{log_tag}Error writing to MongoDB: {str(e)}") + + logger.info(log_tag + "history update finish") def update_stats(): - pprint(log_tag + "updating stats"); - # Calculate Transactions - results = db.block_30d.aggregate([ - { - '$sort': { - '_id': -1 - } - }, - { - '$limit': 28800 * 1 - }, - { - '$unwind': '$transactions' - }, - { - '$group': { - '_id': '24h', - 'tx': { - '$sum': 1 - } - } - } - ]) - data = list(results)[0]['tx'] - db.status.update({'_id': 'transactions-24h'}, {'$set': {'data' : data}}, upsert=True) - now = datetime.now().date() - today = datetime.combine(now, datetime.min.time()) - db.tx_history.update({ - 'timeframe': '24h', - 'date': today - }, {'$set': {'data': data}}, upsert=True) - - results = db.block_30d.aggregate([ - { - '$sort': { - '_id': -1 - } - }, - { - '$limit': 1200 * 1 - }, - { - '$unwind': '$transactions' - }, - { - '$group': { - '_id': '1h', - 'tx': { - '$sum': 1 - } - } - } - ]) - db.status.update({'_id': 'transactions-1h'}, {'$set': {'data' : list(results)[0]['tx']}}, upsert=True) - - # Calculate Operations - results = db.block_30d.aggregate([ - { - '$sort': { - '_id': -1 - } - }, - { - '$limit': 28800 * 1 - }, - { - '$unwind': '$transactions' - }, - { - '$group': { - '_id': '24h', - 'tx': { - '$sum': { - '$size': '$transactions.operations' - } - } - } - } - ]) - data = list(results)[0]['tx'] - db.status.update({'_id': 'operations-24h'}, {'$set': {'data' : data}}, upsert=True) - now = datetime.now().date() - today = datetime.combine(now, datetime.min.time()) - 
db.op_history.update({ - 'timeframe': '24h', - 'date': today - }, {'$set': {'data': data}}, upsert=True) - - results = db.block_30d.aggregate([ - { - '$sort': { - '_id': -1 - } - }, - { - '$limit': 1200 * 1 - }, - { - '$unwind': '$transactions' - }, - { - '$group': { - '_id': '1h', - 'tx': { - '$sum': { - '$size': '$transactions.operations' - } + logger.info(log_tag + "updating stats") + results = db.block_30d.aggregate([ + { + '$sort': { + '_id': -1 + } + }, + { + '$limit': 28800 * 1 + }, + { + '$unwind': '$transactions' + }, + { + '$group': { + '_id': '24h', + 'tx': { + '$sum': 1 + } + } } - } - } - ]) - db.status.update({'_id': 'operations-1h'}, {'$set': {'data' : list(results)[0]['tx']}}, upsert=True) - + ]) + data = list(results)[0]['tx'] + db.status.update_one({'_id': 'transactions-24h'}, {'$set': {'data': data}}, upsert=True) + now = datetime.now().date() + today = datetime.combine(now, datetime.min.time()) + db.tx_history.update_one({ + 'timeframe': '24h', + 'date': today + }, {'$set': {'data': data}}, upsert=True) -def update_clients(): - try: - pprint(log_tag + "updating clients"); - start = datetime.today() - timedelta(days=90) - end = datetime.today() - regx = re.compile("([\w-]+\/[\w.]+)", re.IGNORECASE) - results = db.comment.aggregate([ - { - '$match': { - 'created': { - '$gte': start, - '$lte': end, - }, - 'json_metadata.app': { - '$type': 'string', - '$regex': regx, - } - } - }, - { - '$project': { - 'created': '$created', - 'parts': { - '$split': ['$json_metadata.app', '/'] - }, - 'reward': { - '$add': ['$total_payout_value', '$pending_payout_value', '$total_pending_payout_value'] - } - } - }, - { - '$group': { - '_id': { - 'client': {'$arrayElemAt': ['$parts', 0]}, - 'doy': {'$dayOfYear': '$created'}, - 'year': {'$year': '$created'}, - 'month': {'$month': '$created'}, - 'day': {'$dayOfMonth': '$created'}, - 'dow': {'$dayOfWeek': '$created'}, - }, - 'reward': {'$sum': '$reward'}, - 'value': {'$sum': 1} - } - }, - { - '$sort': { - '_id.year': 1, - '_id.doy': 1, - 'value': -1, - } - }, - { - '$group': { - '_id': { - 'doy': '$_id.doy', - 'year': '$_id.year', - 'month': '$_id.month', - 'day': '$_id.day', - 'dow': '$_id.dow', - }, - 'clients': { - '$push': { - 'client': '$_id.client', - 'count': '$value', - 'reward': '$reward' + results = db.block_30d.aggregate([ + { + '$sort': { + '_id': -1 + } + }, + { + '$limit': 1200 * 1 + }, + { + '$unwind': '$transactions' + }, + { + '$group': { + '_id': '1h', + 'tx': { + '$sum': 1 + } } - }, - 'reward' : { - '$sum': '$reward' - }, - 'total': { - '$sum': '$value' - } } - }, - { - '$sort': { - '_id.year': -1, - '_id.doy': -1 + ]) + db.status.update_one({'_id': 'transactions-1h'}, {'$set': {'data': list(results)[0]['tx']}}, upsert=True) + + results = db.block_30d.aggregate([ + { + '$sort': { + '_id': -1 + } + }, + { + '$limit': 28800 * 1 + }, + { + '$unwind': '$transactions' + }, + { + '$group': { + '_id': '24h', + 'tx': { + '$sum': { + '$size': '$transactions.operations' + } + } + } } - }, ]) - pprint(log_tag + "complete") - sys.stdout.flush() - data = list(results) - db.status.update({'_id': 'clients-snapshot'}, {'$set': {'data' : data}}, upsert=True) - now = datetime.now().date() - today = datetime.combine(now, datetime.min.time()) - db.clients_history.update({ - 'date': today + data = list(results)[0]['tx'] + db.status.update_one({'_id': 'operations-24h'}, {'$set': {'data': data}}, upsert=True) + db.op_history.update_one({ + 'timeframe': '24h', + 'date': today }, {'$set': {'data': data}}, upsert=True) - pass - except Exception as e: - 
pass
+    results = db.block_30d.aggregate([
+        {
+            '$sort': {
+                '_id': -1
+            }
+        },
+        {
+            '$limit': 1200 * 1
+        },
+        {
+            '$unwind': '$transactions'
+        },
+        {
+            '$group': {
+                '_id': '1h',
+                'tx': {
+                    '$sum': {
+                        '$size': '$transactions.operations'
+                    }
+                }
+            }
+        }
+    ])
+    db.status.update_one({'_id': 'operations-1h'}, {'$set': {'data': list(results)[0]['tx']}}, upsert=True)
+
+def update_clients():
+    try:
+        logger.info(log_tag + "updating clients")
+        start = datetime.today() - timedelta(days=90)
+        end = datetime.today()
+        regx = re.compile(r"([\w-]+/[\w.]+)", re.IGNORECASE)
+        results = db.comment.aggregate([
+            {
+                '$match': {
+                    'created': {
+                        '$gte': start,
+                        '$lte': end,
+                    },
+                    'json_metadata.app': {
+                        '$type': 'string',
+                        '$regex': regx,
+                    }
+                }
+            },
+            {
+                '$project': {
+                    'created': '$created',
+                    'parts': {
+                        '$split': ['$json_metadata.app', '/']
+                    },
+                    'reward': {
+                        '$add': ['$total_payout_value', '$pending_payout_value', '$total_pending_payout_value']
+                    }
+                }
+            },
+            {
+                '$group': {
+                    '_id': {
+                        'client': {'$arrayElemAt': ['$parts', 0]},
+                        'doy': {'$dayOfYear': '$created'},
+                        'year': {'$year': '$created'},
+                        'month': {'$month': '$created'},
+                        'day': {'$dayOfMonth': '$created'},
+                        'dow': {'$dayOfWeek': '$created'},
+                    },
+                    'reward': {'$sum': '$reward'},
+                    'value': {'$sum': 1}
+                }
+            },
+            {
+                '$sort': {
+                    '_id.year': 1,
+                    '_id.doy': 1,
+                    'value': -1,
+                }
+            },
+            {
+                '$group': {
+                    '_id': {
+                        'doy': '$_id.doy',
+                        'year': '$_id.year',
+                        'month': '$_id.month',
+                        'day': '$_id.day',
+                        'dow': '$_id.dow',
+                    },
+                    'clients': {
+                        '$push': {
+                            'client': '$_id.client',
+                            'count': '$value',
+                            'reward': '$reward'
+                        }
+                    },
+                    'reward': {'$sum': '$reward'},
+                    'total': {'$sum': '$value'}
+                }
+            },
+            {
+                '$sort': {
+                    '_id.year': -1,
+                    '_id.doy': -1
+                }
+            },
+        ])
+        logger.info(log_tag + "complete")
+        sys.stdout.flush()
+        data = list(results)
+        db.status.update_one({'_id': 'clients-snapshot'}, {'$set': {'data': data}}, upsert=True)
+        now = datetime.now().date()
+        today = datetime.combine(now, datetime.min.time())
+        db.clients_history.update_one({
+            'date': today
+        }, {'$set': {'data': data}}, upsert=True)
+    except Exception as e:
+        logger.error(log_tag + "Error updating clients: %s", str(e))
 
 if __name__ == '__main__':
-    pprint(log_tag + "starting");
-    # Load all account data into memory
-
-    # Start job immediately
+    logger.info(log_tag + "starting")
     update_clients()
     update_props_history()
     load_accounts()
@@ -438,13 +406,11 @@ def update_clients():
     update_history()
     sys.stdout.flush()
 
-    # Schedule it to run every 6 hours
     scheduler = BackgroundScheduler()
     scheduler.add_job(update_history, 'interval', hours=24, id='update_history')
     scheduler.add_job(update_clients, 'interval', hours=1, id='update_clients')
     scheduler.add_job(update_stats, 'interval', minutes=5, id='update_stats')
     scheduler.start()
-    # Loop
    try:
        while True:
            time.sleep(2)
diff --git a/docker/history/history.py_bak1 b/docker/history/history.py_bak1
deleted file mode 100644
index 8f7dbd8..0000000
--- a/docker/history/history.py_bak1
+++ /dev/null
@@ -1,465 +0,0 @@
-import json
-from datetime import datetime, timedelta
-from steem import Steem
-from pymongo import MongoClient
-from pprint import pprint
-import collections
-import time
-import sys
-import os
-import re
-
-from apscheduler.schedulers.background import BackgroundScheduler
-
-fullnodes = [
-    'http://10.60.103.43:8080',
-    #'https://api.steemit.com',
-]
-rpc = Steem(fullnodes)
-#mongo = MongoClient("mongodb://47.240.97.170")
-mongo = MongoClient("mongodb://10.40.103.102")
-db = mongo.steemdb
-mvest_per_account = {} - -def load_accounts(): - pprint("[STEEM] - Loading mvest per account") - for account in db.account.find(): - if "name" in account.keys(): - mvest_per_account.update({account['name']: account['vesting_shares']}) - -def update_fund_history(): - pprint("[STEEM] - Update Fund History") - - fund = rpc.get_reward_fund('post') - for key in ['recent_claims', 'content_constant']: - fund[key] = float(fund[key]) - for key in ['reward_balance']: - fund[key] = float(fund[key].split()[0]) - for key in ['last_update']: - fund[key] = datetime.strptime(fund[key], "%Y-%m-%dT%H:%M:%S") - - db.funds_history.insert(fund) - -def update_props_history(): - pprint("[STEEM] - Update Global Properties") - - props = rpc.get_dynamic_global_properties() - - # for key in ['max_virtual_bandwidth', 'recent_slots_filled', 'total_reward_shares2']: - for key in ['recent_slots_filled', 'total_reward_shares2']: - props[key] = float(props[key]) - for key in ['confidential_sbd_supply', 'confidential_supply', 'current_sbd_supply', 'current_supply', 'total_reward_fund_steem', 'total_vesting_fund_steem', 'total_vesting_shares', 'virtual_supply']: - props[key] = float(props[key].split()[0]) - for key in ['time']: - props[key] = datetime.strptime(props[key], "%Y-%m-%dT%H:%M:%S") - - #floor($return['total_vesting_fund_steem'] / $return['total_vesting_shares'] * 1000000 * 1000) / 1000; - - props['steem_per_mvests'] = props['total_vesting_fund_steem'] / props['total_vesting_shares'] * 1000000 - - db.status.update({ - '_id': 'steem_per_mvests' - }, { - '$set': { - '_id': 'steem_per_mvests', - 'value': props['steem_per_mvests'] - } - }, upsert=True) - - db.status.update({ - '_id': 'props' - }, { - '$set': { - '_id': 'props', - 'props': props - } - }, upsert=True) - - db.props_history.insert(props) - -def update_tx_history(): - pprint("[STEEM] - Update Transaction History") - now = datetime.now().date() - - today = datetime.combine(now, datetime.min.time()) - yesterday = today - timedelta(1) - - # Determine tx per day - query = { - '_ts': { - '$gte': today, - '$lte': today + timedelta(1) - } - } - count = db.block_30d.count(query) - - pprint(count) - - pprint(now) - pprint(today) - pprint(yesterday) - - - -def update_history(): - - update_fund_history() - update_props_history() - # update_tx_history() - # sys.stdout.flush() - - # Load all accounts - users = rpc.lookup_accounts(-1, 10) - print("123: ",users) - print(len(users)) - more = True - # while more: - # newUsers = rpc.lookup_accounts(users[-1], 1000) - # if len(newUsers) < 1000: - # more = False - # users = users + newUsers - - # Set dates - now = datetime.now().date() - today = datetime.combine(now, datetime.min.time()) - - pprint("[STEEM] - Update History (" + str(len(users)) + " accounts)") - print("fengexian ===================11") - # Snapshot User Count - db.statistics.update({ - 'key': 'users', - 'date': today, - }, { - 'key': 'users', - 'date': today, - 'value': len(users) - }, upsert=True) - sys.stdout.flush() - print("fengexian ===================11") - - # Update history on accounts - for user in users: - # Load State - state = rpc.get_accounts([user]) - # Get Account Data - account = collections.OrderedDict(sorted(state[0].items())) - # Get followers - account['followers'] = [] - account['followers_count'] = 0 - account['followers_mvest'] = 0 - # followers_results = rpc.get_followers(user, "", "blog", 100, api="follow") - print("fengexian ===================1") - followers_results = rpc.get_followers(user, "", "blog", 100) - print("fengexian 
===================2") - while followers_results: - last_account = "" - for follower in followers_results: - last_account = follower['follower'] - if 'blog' in follower['what'] or 'posts' in follower['what']: - account['followers'].append(follower['follower']) - account['followers_count'] += 1 - if follower['follower'] in mvest_per_account.keys(): - account['followers_mvest'] += float(mvest_per_account[follower['follower']]) - # followers_results = rpc.get_followers(user, last_account, "blog", 100, api="follow")[1:] - followers_results = rpc.get_followers(user, last_account, "blog", 100)[1:] - # Get following - account['following'] = [] - account['following_count'] = 0 - # following_results = rpc.get_following(user, -1, "blog", 100, api="follow") - print("fengexian ===================3") - following_results = rpc.get_following(user, "", "blog", 100) - print("fengexian ===================4") - while following_results: - last_account = "" - for following in following_results: - last_account = following['following'] - if 'blog' in following['what'] or 'posts' in following['what']: - account['following'].append(following['following']) - account['following_count'] += 1 - # following_results = rpc.get_following(user, last_account, "blog", 100, api="follow")[1:] - following_results = rpc.get_following(user, last_account, "blog", 100)[1:] - # Convert to Numbers - account['proxy_witness'] = sum(float(i) for i in account['proxied_vsf_votes']) / 1000000 - print(json.dumps(account)) - print("fengexian ===================5") - for key in ['reputation', 'to_withdraw']: - account[key] = float(account[key]) - for key in ['balance', 'sbd_balance', 'sbd_seconds', 'savings_balance', 'savings_sbd_balance', 'vesting_balance', 'vesting_shares', 'vesting_withdraw_rate']: - account[key] = float(account[key].split()[0]) - # Convert to Date - # for key in ['created','last_account_recovery','last_account_update','last_active_proved','last_bandwidth_update','last_market_bandwidth_update','last_owner_proved','last_owner_update','last_post','last_root_post','last_vote_time','next_vesting_withdrawal','savings_sbd_last_interest_payment','savings_sbd_seconds_last_update','sbd_last_interest_payment','sbd_seconds_last_update']: - for key in ['created','last_account_recovery','last_account_update','last_owner_update','last_post','last_root_post','last_vote_time','next_vesting_withdrawal','savings_sbd_last_interest_payment','savings_sbd_seconds_last_update','sbd_last_interest_payment','sbd_seconds_last_update']: - account[key] = datetime.strptime(account[key], "%Y-%m-%dT%H:%M:%S") - # Combine Savings + Balance - account['total_balance'] = account['balance'] + account['savings_balance'] - account['total_sbd_balance'] = account['sbd_balance'] + account['savings_sbd_balance'] - # Update our current info about the account - print("fengexian ===================6") - mvest_per_account.update({account['name']: account['vesting_shares']}) - print("fengexian ===================7") - # Save current state of account - account['scanned'] = datetime.now() - print("fengexian ===================8") - db.account.update({'_id': user}, account, upsert=True) - print("fengexian ===================9") - # Create our Snapshot dict - wanted_keys = ['name', 'proxy_witness', 'activity_shares', 'average_bandwidth', 'average_market_bandwidth', 'savings_balance', 'balance', 'comment_count', 'curation_rewards', 'lifetime_bandwidth', 'lifetime_vote_count', 'next_vesting_withdrawal', 'reputation', 'post_bandwidth', 'post_count', 'posting_rewards', 
'sbd_balance', 'savings_sbd_balance', 'sbd_last_interest_payment', 'sbd_seconds', 'sbd_seconds_last_update', 'to_withdraw', 'vesting_balance', 'vesting_shares', 'vesting_withdraw_rate', 'voting_power', 'withdraw_routes', 'withdrawn', 'witnesses_voted_for'] - snapshot = dict((k, account[k]) for k in wanted_keys if k in account) - snapshot.update({ - 'account': user, - 'date': today, - 'followers': len(account['followers']), - 'following': len(account['following']), - }) - print("fengexian ===================10") - # Save Snapshot in Database - db.account_history.update({ - 'account': user, - 'date': today - }, snapshot, upsert=True) - print("fengexian ===================11") - -def update_stats(): - pprint("updating stats"); - # Calculate Transactions - results = db.block_30d.aggregate([ - { - '$sort': { - '_id': -1 - } - }, - { - '$limit': 28800 * 1 - }, - { - '$unwind': '$transactions' - }, - { - '$group': { - '_id': '24h', - 'tx': { - '$sum': 1 - } - } - } - ]) - pprint(list(results)[0]['tx']) - pprint(list(results)) - pprint(len(list(results))) - pprint(list(results)[0]) - pprint(list(results)[0]['tx']) - pprint("updating stats") - return - data = list(results)[0]['tx'] - db.status.update({'_id': 'transactions-24h'}, {'$set': {'data' : data}}, upsert=True) - now = datetime.now().date() - today = datetime.combine(now, datetime.min.time()) - db.tx_history.update({ - 'timeframe': '24h', - 'date': today - }, {'$set': {'data': data}}, upsert=True) - - results = db.block_30d.aggregate([ - { - '$sort': { - '_id': -1 - } - }, - { - '$limit': 1200 * 1 - }, - { - '$unwind': '$transactions' - }, - { - '$group': { - '_id': '1h', - 'tx': { - '$sum': 1 - } - } - } - ]) - db.status.update({'_id': 'transactions-1h'}, {'$set': {'data' : list(results)[0]['tx']}}, upsert=True) - - # Calculate Operations - results = db.block_30d.aggregate([ - { - '$sort': { - '_id': -1 - } - }, - { - '$limit': 28800 * 1 - }, - { - '$unwind': '$transactions' - }, - { - '$group': { - '_id': '24h', - 'tx': { - '$sum': { - '$size': '$transactions.operations' - } - } - } - } - ]) - data = list(results)[0]['tx'] - db.status.update({'_id': 'operations-24h'}, {'$set': {'data' : data}}, upsert=True) - now = datetime.now().date() - today = datetime.combine(now, datetime.min.time()) - db.op_history.update({ - 'timeframe': '24h', - 'date': today - }, {'$set': {'data': data}}, upsert=True) - - results = db.block_30d.aggregate([ - { - '$sort': { - '_id': -1 - } - }, - { - '$limit': 1200 * 1 - }, - { - '$unwind': '$transactions' - }, - { - '$group': { - '_id': '1h', - 'tx': { - '$sum': { - '$size': '$transactions.operations' - } - } - } - } - ]) - db.status.update({'_id': 'operations-1h'}, {'$set': {'data' : list(results)[0]['tx']}}, upsert=True) - - -def update_clients(): - try: - pprint("updating clients"); - start = datetime.today() - timedelta(days=90) - end = datetime.today() - regx = re.compile("([\w-]+\/[\w.]+)", re.IGNORECASE) - results = db.comment.aggregate([ - { - '$match': { - 'created': { - '$gte': start, - '$lte': end, - }, - 'json_metadata.app': { - '$type': 'string', - '$regex': regx, - } - } - }, - { - '$project': { - 'created': '$created', - 'parts': { - '$split': ['$json_metadata.app', '/'] - }, - 'reward': { - '$add': ['$total_payout_value', '$pending_payout_value', '$total_pending_payout_value'] - } - } - }, - { - '$group': { - '_id': { - 'client': {'$arrayElemAt': ['$parts', 0]}, - 'doy': {'$dayOfYear': '$created'}, - 'year': {'$year': '$created'}, - 'month': {'$month': '$created'}, - 'day': {'$dayOfMonth': 
'$created'}, - 'dow': {'$dayOfWeek': '$created'}, - }, - 'reward': {'$sum': '$reward'}, - 'value': {'$sum': 1} - } - }, - { - '$sort': { - '_id.year': 1, - '_id.doy': 1, - 'value': -1, - } - }, - { - '$group': { - '_id': { - 'doy': '$_id.doy', - 'year': '$_id.year', - 'month': '$_id.month', - 'day': '$_id.day', - 'dow': '$_id.dow', - }, - 'clients': { - '$push': { - 'client': '$_id.client', - 'count': '$value', - 'reward': '$reward' - } - }, - 'reward' : { - '$sum': '$reward' - }, - 'total': { - '$sum': '$value' - } - } - }, - { - '$sort': { - '_id.year': -1, - '_id.doy': -1 - } - }, - ]) - pprint("complete") - sys.stdout.flush() - data = list(results) - db.status.update({'_id': 'clients-snapshot'}, {'$set': {'data' : data}}, upsert=True) - now = datetime.now().date() - today = datetime.combine(now, datetime.min.time()) - db.clients_history.update({ - 'date': today - }, {'$set': {'data': data}}, upsert=True) - pass - except Exception as e: - pass - - -if __name__ == '__main__': - pprint("starting"); - # Load all account data into memory - - # Start job immediately - # update_clients() - # update_props_history() - # load_accounts() - # update_stats() - update_history() - # sys.stdout.flush() - # - # # Schedule it to run every 6 hours - # scheduler = BackgroundScheduler() - # scheduler.add_job(update_history, 'interval', hours=24, id='update_history') - # scheduler.add_job(update_clients, 'interval', hours=1, id='update_clients') - # scheduler.add_job(update_stats, 'interval', minutes=5, id='update_stats') - # scheduler.start() - # # Loop - # try: - # while True: - # time.sleep(2) - # except (KeyboardInterrupt, SystemExit): - # scheduler.shutdown() - diff --git a/docker/history/steemdb_history.service b/docker/history/steemdb_history.service deleted file mode 100644 index 815f854..0000000 --- a/docker/history/steemdb_history.service +++ /dev/null @@ -1,11 +0,0 @@ -[Unit] -Description=SteemDB.com History Service - -[Service] -Environment=steemnode=node.steem.ws -WorkingDirectory=/var/www/com_steemdb/docker/history -ExecStart=/usr/bin/python3 history.py -Restart=always - -[Install] -WantedBy=multi-user.target diff --git a/docker/history/test.py b/docker/history/test.py deleted file mode 100644 index 8fbc4cb..0000000 --- a/docker/history/test.py +++ /dev/null @@ -1,33 +0,0 @@ -from steem import Steem -from pymongo import MongoClient -from pprint import pprint - -from apscheduler.schedulers.background import BackgroundScheduler - -fullnodes = [ - #'http://10.60.103.43:8080', - 'https://api.steemit.com', -] -rpc = Steem(fullnodes) -mongo = MongoClient("mongodb://47.240.97.170") -#mongo = MongoClient("mongodb://10.40.103.102") -db = mongo.steemdb - - -if __name__ == '__main__': - pprint("starting"); - # Load all account data into memory - - item = db.account.find() - for entry in item: - print(item) - - db.statistics.update({ - 'key': 'users', - 'date': 'test', - }, { - 'key': 'users', - 'date': 'test', - 'value': 100 - }, upsert=True) - print ('over')