Skip to content

Commit

Permalink
Merge pull request #2590 from CounterpartyXCP/develop
Browse files Browse the repository at this point in the history
v10.6.0
  • Loading branch information
ouziel-slama authored Oct 25, 2024
2 parents 22f14da + 95e60c0 commit 2521b0b
Show file tree
Hide file tree
Showing 55 changed files with 7,325 additions and 5,550 deletions.
5,758 changes: 3,043 additions & 2,715 deletions apiary.apib

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions counterparty-core/counterpartycore/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ def float_range_checker(arg):
"help": "number of threads per worker for the Gunicorn WSGI server (if enabled)",
},
],
[("--bootstrap-url",), {"type": str, "help": "the URL of the bootstrap snapshot to use"}],
]


Expand Down Expand Up @@ -457,9 +458,6 @@ def main():
parser_bootstrap = subparsers.add_parser(
"bootstrap", help="bootstrap database with hosted snapshot"
)
parser_bootstrap.add_argument(
"--bootstrap-url", help="the URL of the bootstrap snapshot to use"
)
setup.add_config_arguments(parser_bootstrap, CONFIG_ARGS, configfile)

parser_checkdb = subparsers.add_parser("check-db", help="do an integrity check on the database")
Expand Down
90 changes: 55 additions & 35 deletions counterparty-core/counterpartycore/lib/api/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import logging
import multiprocessing
import os
import threading
import time
from collections import OrderedDict
from multiprocessing import Process, Value
from threading import Thread

import flask
import requests
Expand Down Expand Up @@ -392,74 +392,94 @@ def init_flask_app():
return app


def run_api_server(args, interrupted_value, server_ready_value):
def run_api_server(args, server_ready_value, stop_event):
logger.info("Starting API Server process...")

# Initialize Sentry, logging, config, etc.
sentry.init()
# Initialise log and config
server.initialise_log_and_config(argparse.Namespace(**args))

watcher = api_watcher.APIWatcher()
watcher.start()

logger.info("Starting API Server...")
app = init_flask_app()

wsgi_server = None
parent_checker = None

try:
# Init the HTTP Server.
wsgi_server = wsgi.WSGIApplication(app, args=args)
parent_checker = ParentProcessChecker(interrupted_value, wsgi_server)
logger.info("Starting Parent Process Checker thread...")
parent_checker = ParentProcessChecker(wsgi_server)
parent_checker.start()

wsgi_server = wsgi.WSGIApplication(app, args=args)

app.app_context().push()
# Run app server (blocking)
server_ready_value.value = 1

wsgi_server.run()
except KeyboardInterrupt:
logger.trace("Keyboard Interrupt!")

except Exception as e:
logger.error("Exception in API Server process!")
raise e

finally:
logger.trace("Shutting down API Server...")

watcher.stop()
watcher.join()
if watcher is not None:
watcher.stop()
watcher.join()

if wsgi_server is not None:
logger.trace("Stopping WSGI Server thread...")
wsgi_server.stop()

if parent_checker is not None:
logger.trace("Stopping Parent Process Checker thread...")
parent_checker.stop()
parent_checker.join()

wsgi_server.stop()
parent_checker.join()
logger.trace("Closing API DB Connection Pool...")
APIDBConnectionPool().close()


# This thread is used for the following two reasons:
# 1. `docker-compose stop` does not send a SIGTERM to the child processes (in this case the API v2 process)
# 2. `process.terminate()` does not trigger a `KeyboardInterrupt` or execute the `finally` block.
class ParentProcessChecker(Thread):
def __init__(self, interruped_value, wsgi_server):
class ParentProcessChecker(threading.Thread):
def __init__(self, wsgi_server):
super().__init__()
self.interruped_value = interruped_value
self.daemon = True
self.wsgi_server = wsgi_server
self._stop_event = threading.Event()

def run(self):
parent_pid = os.getppid()
try:
while True:
if self.interruped_value.value == 0:
time.sleep(0.01)
else:
logger.trace("Parent process is dead. Exiting...")
while not self._stop_event.is_set():
if os.getppid() != parent_pid:
logger.debug("Parent process is dead. Exiting...")
self.wsgi_server.stop()
break
self.wsgi_server.stop()
time.sleep(1)
except KeyboardInterrupt:
pass

def stop(self):
self._stop_event.set()


class APIServer(object):
def __init__(self):
self.process = None
self.interrupted = Value("I", 0)
self.server_ready_value = Value("I", 0)
self.stop_event = multiprocessing.Event()

def start(self, args):
if self.process is not None:
raise Exception("API Server is already running")
self.process = Process(
target=run_api_server, args=(vars(args), self.interrupted, self.server_ready_value)
target=run_api_server, args=(vars(args), self.server_ready_value, self.stop_event)
)
self.process.start()
return self.process
Expand All @@ -468,14 +488,14 @@ def is_ready(self):
return self.server_ready_value.value == 1

def stop(self):
logger.info("Stopping API Server...")
self.interrupted.value = 1 # stop the thread
waiting_start_time = time.time()
while self.process.is_alive():
time.sleep(1)
logger.trace("Waiting for API Server to stop...")
if time.time() - waiting_start_time > 2:
logger.error("API Server did not stop in time. Terminating...")
logger.info("Stopping API Server process...")
if self.process.is_alive():
self.process.terminate()
self.process.join(timeout=2)
if self.process.is_alive():
logger.error("API Server process did not stop in time. Terminating forcefully...")
self.process.kill()
break
logger.trace("API Server stopped.")
logger.info("API Server process stopped.")

def has_stopped(self):
return self.stop_event.is_set()
83 changes: 42 additions & 41 deletions counterparty-core/counterpartycore/lib/api/api_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,59 +448,60 @@ class APIStatusPoller(threading.Thread):
"""Perform regular checks on the state of the backend and the database."""

def __init__(self):
self.last_database_check = 0
threading.Thread.__init__(self)
self.last_database_check = 0
self.stop_event = threading.Event()
self.stopping = False
self.stopped = False
self.db = None

def stop(self):
logger.info("Stopping API Status Poller...")
self.stopping = True
if self.db is not None:
self.db.close()
self.db = None
logger.info("Stopping API v1 Status Poller thread...")
self.stop_event.set()
self.join()
logger.info("API v1 Status Poller thread stopped.")

def run(self):
logger.debug("Starting API Status Poller...")
logger.info("Starting v1 API Status Poller thread...")
global CURRENT_API_STATUS_CODE, CURRENT_API_STATUS_RESPONSE_JSON # noqa: PLW0603
self.db = database.get_db_connection(config.API_DATABASE, read_only=True, check_wal=False)

interval_if_ready = 5 * 60 # 5 minutes
interval_if_not_ready = 60 # 1 minutes
interval_if_not_ready = 60 # 1 minute
interval = interval_if_not_ready

while not self.stopping: # noqa: E712
try:
# Check that backend is running, communicable, and caught up with the blockchain.
# Check that the database has caught up with bitcoind.
if (
time.time() - self.last_database_check > interval
): # Ten minutes since last check.
self.last_database_check = time.time()
if not config.FORCE and self.db is not None:
code = 11
check_backend_state()
code = 12
api_util.check_last_parsed_block(self.db, backend.bitcoind.getblockcount())
interval = interval_if_ready
except (BackendError, exceptions.DatabaseError) as e:
interval = interval_if_not_ready
exception_name = e.__class__.__name__
exception_text = str(e)
logger.debug("API Status Poller: %s", exception_text)
jsonrpc_response = jsonrpc.exceptions.JSONRPCServerError(
message=exception_name, data=exception_text
)
CURRENT_API_STATUS_CODE = code
CURRENT_API_STATUS_RESPONSE_JSON = jsonrpc_response.json.encode()
else:
CURRENT_API_STATUS_CODE = None
CURRENT_API_STATUS_RESPONSE_JSON = None
if not self.stopping:
time.sleep(0.5) # sleep for 0.5 seconds
try:
while not self.stop_event.is_set():
try:
# Check that backend is running, communicable, and caught up with the blockchain.
# Check that the database has caught up with bitcoind.
if time.time() - self.last_database_check > interval:
self.last_database_check = time.time()
if not config.FORCE and self.db is not None:
code = 11
check_backend_state()
code = 12
api_util.check_last_parsed_block(
self.db, backend.bitcoind.getblockcount()
)
interval = interval_if_ready
except (BackendError, exceptions.DatabaseError) as e:
interval = interval_if_not_ready
exception_name = e.__class__.__name__
exception_text = str(e)
logger.debug("API Status Poller: %s", exception_text)
jsonrpc_response = jsonrpc.exceptions.JSONRPCServerError(
message=exception_name, data=exception_text
)
CURRENT_API_STATUS_CODE = code
CURRENT_API_STATUS_RESPONSE_JSON = jsonrpc_response.json.encode()
else:
CURRENT_API_STATUS_CODE = None
CURRENT_API_STATUS_RESPONSE_JSON = None
self.stop_event.wait(timeout=0.5)
finally:
if self.db is not None:
self.db.close()
self.db = None
logger.info("API v1 Status Poller thread stopped.")


class APIServer(threading.Thread):
Expand All @@ -515,15 +516,15 @@ def __init__(self, db=None):
sentry.init()

def stop(self):
logger.info("Stopping API Server v1...")
logger.info("Stopping API Server v1 thread...")
if self.connection_pool:
self.connection_pool.close()
if self.server:
self.server.shutdown()
self.join()

def run(self):
logger.info("Starting API Server v1...")
logger.info("Starting API Server v1 thread...")
app = flask.Flask(__name__)
auth = HTTPBasicAuth()

Expand Down
Loading

0 comments on commit 2521b0b

Please sign in to comment.