Skip to content

Commit

Permalink
Merge pull request #3 from iavofficial/update/V1.7.9
Browse files Browse the repository at this point in the history
Update to V1.7.9
  • Loading branch information
iavuser authored Jan 29, 2025
2 parents 60b9f2a + 183a3b9 commit e5caf82
Show file tree
Hide file tree
Showing 27 changed files with 2,271 additions and 401 deletions.
17 changes: 9 additions & 8 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@ __pycache__/
.vscode/
*.log
*.egg-info/
*.bak
*.py.bak

# test database
Routing_Api/db.sqlite3

# User-specific files
*.suo
*.user

# Environments
.env
.venv
venv
.vs
.temp

# Python enviroments
env/
venv/
ENV/
env.bak/
venv.bak/
04_API/Routing_Api/Routing_Api/Routing_Api/Mobis/lib/
Routing_Api/Routing_Api/Routing_Api/Mobis/lib/
.idea/

routing/test_profile_stats_dump.txt
routing/test_profile_graph.png
routing/test_profile_stats_cumtime.txt
Expand Down
4 changes: 2 additions & 2 deletions Celerybeat.dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.9.16
FROM python:3.12.0

WORKDIR /routing
COPY routing ./
Expand All @@ -13,7 +13,7 @@ WORKDIR /www
COPY Routing_Api ./
RUN python -m compileall .

ENV PYTHONUNBUFFERED 1
ENV PYTHONUNBUFFERED=1
ENV IS_CELERY_APP yes

ENTRYPOINT ["celery", "-A", "Routing_Api", "beat", "-l", "INFO"]
4 changes: 2 additions & 2 deletions Celeryworker.dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.9.7
FROM python:3.12.0

WORKDIR /routing
COPY routing ./
Expand All @@ -13,7 +13,7 @@ WORKDIR /www
COPY Routing_Api ./
RUN python -m compileall .

ENV PYTHONUNBUFFERED 1
ENV PYTHONUNBUFFERED=1
ENV IS_CELERY_APP yes

ENTRYPOINT ["celery", "-A", "Routing_Api", "worker", "-l", "INFO"]
6 changes: 4 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
FROM python:3.9.16
FROM python:3.12.0

WORKDIR /routing
COPY routing ./
RUN python -m compileall .
RUN pip install --no-cache-dir .
RUN apt update
RUN apt install -y iputils-ping

WORKDIR /www
COPY requirements.txt .
Expand All @@ -16,7 +18,7 @@ WORKDIR /www
COPY Routing_Api ./
RUN python -m compileall .

ENV PYTHONUNBUFFERED 1
ENV PYTHONUNBUFFERED=1

# default timeout of 30 sec may be too low
ENTRYPOINT ["gunicorn", "Routing_Api.wsgi", "--bind", "0.0.0.0:8080", "-k", "gevent", "--timeout", "180"]
209 changes: 78 additions & 131 deletions Routing_Api/Routing_Api/Mobis/EventBus.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import pika
from pika import ConnectionParameters, BlockingConnection, PlainCredentials, exceptions, BasicProperties, DeliveryMode
import os
import time
from _thread import start_new_thread
import logging
import threading
Expand All @@ -9,7 +8,7 @@

LOGGER = logging.getLogger('Mobis.EventBus')

HOST = os.environ.get('RABBITMQ_HOST','localhost')
HOST = os.environ.get('RABBITMQ_HOST','rabbitmq')
USER = os.environ.get('RABBITMQ_USERNAME','guest')
PASS = os.environ.get('RABBITMQ_PASSWORD','guest')
VHOST = os.environ.get('RABBITMQ_VHOST','/').replace("'", "").replace('"','')
Expand All @@ -20,125 +19,75 @@
EXCHANGE = 'busnow_event_bus'

if HOST:
credentials = pika.PlainCredentials(USER, PASS)
parameters = pika.ConnectionParameters(host=HOST,
virtual_host=VHOST,
credentials=credentials,
heartbeat=60,
blocked_connection_timeout=300, # after this, a blocked connection is closed
connection_attempts=(5*60)//5, # listen for 5 minutes, then throw an error
retry_delay=5)

class Publisher():
# from https://groups.google.com/forum/#!searchin/pika-python/publish$20async%7Csort:date/pika-python/ZsH924c22e0/R3-Sag2DAgAJ
# Blocking publisher
credentials = PlainCredentials(USER, PASS)
parameters = ConnectionParameters(host=HOST,
virtual_host=VHOST,
credentials=credentials,
heartbeat=60,
blocked_connection_timeout=300, # after this, a blocked connection is closed
connection_attempts=(5*60)//5, # listen for 5 minutes, then throw an error
retry_delay=5)

class AsyncPublisher(threading.Thread):
"""
A threaded publisher, which opens a connection to the RabbitMQ instance. Responsible for publishing messages
and managing the connection (e.g., creating a channel before sending a message and closing it afterwards).
Provides an asynchronous publisher for RabbitMQ. It inherits from `threading.Thread`,
allowing it to run the publishing process in a separate thread.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.daemon = True
self.is_running = True
self.name = "AsyncPublisher"

def __init__(self):
self._connection = None
self._parameters = parameters

self._thread = None
self._stopping = False

# Automatically close the connection to RabbitMQ when the program exits
atexit.register(self.close)

def start(self):
""" Starts a background thread that runs the `run_loop` method. """

self._thread = threading.Thread(target=self.run_loop)
self._thread.setDaemon(True)
self._thread.start()
LOGGER.debug("STARTING PUBLISHER")
self.connection = BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.confirm_delivery()
atexit.register(self.stop)

def run_loop(self):
def run(self):
"""
Runs in the background thread and maintains a connection to the RabbitMQ broker.
Probes whether the connection is closed at regular intervals and tries to reconnect if it is.
Closes the connection, when the `close` method is called.
Overrides the `threading.Thread.run()` method. This is the main loop of the thread where we continuously
process data events as long as the thread is running.
"""
while self.is_running:
self.connection.process_data_events(time_limit=1)

def _publish(self, routing_key, message, exchange=EXCHANGE):
try:
self.channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=message,
properties=BasicProperties(content_type='application/json',
delivery_mode=DeliveryMode.Transient))
LOGGER.info(f"Published {routing_key} with body: {message}")
except exceptions.UnroutableError as error:
LOGGER.exception(f"Message could not be confirmed: {error}")

self._connection = self.connect()
while not self._stopping:
try:
if self._connection.is_closed:
LOGGER.debug("RECONNECTING")
self._connection = self.connect()
self._connection.sleep(10)
except pika.exceptions.ConnectionClosed as e:
LOGGER.warning("Connection was closed: %s", e)
except Exception as e:
LOGGER.warning("Error maintaing connection to rabbit: %s", e)
if self._connection.is_open:
self._connection.close()
self._connection.close()
LOGGER.debug("DISCONNECTED")

def connect(self):
""" Creates a blocking connection to RabbitMQ based on the configured parameters. """

return pika.BlockingConnection(parameters=self._parameters)

def create_channel(self):
def publish(self, message, routing_key, exchange=EXCHANGE):
"""
Creates a channel to an exchange. Channels cannot exist without connections. When a connection is closed, so are all channels on it.
Public method used to publish messages in a thread-safe manner
to ensure the `_publish` function is executed within the connection's IO loops context by calling
back into the IO loop correctly from worker threads with `add_callback_threadsafe`.
"""

return self._connection.channel()

def close_channel(self, channel):
channel.close()
#LOGGER.debug("channel CLOSED")

def publish(self, message, routing_key, exchange=EXCHANGE):
self.connection.add_callback_threadsafe(lambda: self._publish(routing_key, message, exchange))

def stop(self):
"""
Creates a channel and ublishes a message to a RabbitMQ exchange with the given routing key.
Adds a timeout to close the channel after the message is sent.
Attributes:
message (str): The message to be published.
routing_key (str): The routing key for the message. Used by the exchange to determin which queue to route the message to.
exchange (str): The exchange to use for publishing the message. Defaults to `EXCHANGE` (= busnow_event_bus).
Stops the running thread, processes any remaining data events and closes the connection if it's open.
"""

channel = None
try:
channel = self.create_channel()
except Exception as e:
# try to reconnect
LOGGER.error("No channel due to exception: %s", e)
LOGGER.info("try to reconnect")
self._connection = self.connect()
channel = self.create_channel()

LOGGER.debug("channel CREATED")
LOGGER.info(f'sending {routing_key} with body: {message}')
channel.publish(exchange=exchange, routing_key=routing_key, body=message)
LOGGER.debug("PUBLISHED")
self._connection.add_timeout(0, partial(self.close_channel, channel))

def close(self):
""" Closes the connection to RabbitMQ and closes the running thread. """

self._stopping = True
LOGGER.debug("DISCONNECTING")
# wait for background thread to close connection
self._thread.join()
self.is_running = False
# Wait until all the data events have been processed
self.connection.process_data_events(time_limit=1)
if self.connection.is_open:
self.connection.close()

class UnthreadedPublisher():
"""
An unthreaded publisher, which opens a connection to the RabbitMQ instance. Responsible for publishing messages
and managing the connection (e.g., creating a channel before sending a message and closing it afterwards).
"""
def connect(self):
# print('UnthreadedPublisher')
# print(parameters.blocked_connection_timeout)

return pika.BlockingConnection(parameters=parameters)
return BlockingConnection(parameters=parameters)

def create_channel(self):
return self._connection.channel()
Expand All @@ -149,13 +98,12 @@ def close_channel(self, channel):
def publish(self, message, routing_key, exchange=EXCHANGE):
self._connection = self.connect()
channel = self.create_channel()
LOGGER.debug("channel CREATED")
LOGGER.info(f'sending {routing_key} with body: {message}')
channel.publish(exchange=exchange, routing_key=routing_key, body=message,
properties=pika.BasicProperties(content_type='text/plain',
delivery_mode=1))
LOGGER.debug("PUBLISHED")
self._connection.add_timeout(0, partial(self.close_channel, channel))
channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=message,
properties=BasicProperties(content_type='text/plain', delivery_mode=1))
LOGGER.info(f"Published {routing_key} with body: {message}")
# self._connection.add_timeout(0, partial(self.close_channel, channel))
self._connection.close()

class Consumer():
Expand All @@ -178,20 +126,24 @@ def setup(self):
# Wait for RabbitMQ
try:
LOGGER.info('Trying to connect consumer to %s...', parameters)
self.connection = pika.BlockingConnection(parameters)
self.connection = BlockingConnection(parameters)
LOGGER.info('Success!')
except pika.exceptions.ConnectionClosed as err:
except exceptions.ConnectionClosed as err:
LOGGER.error('failed with: %s', err, exc_info=True)
raise err
self.channel = self.connection.channel()
LOGGER.info(f"Created channel {self.channel}")
self.channel.exchange_declare(exchange=EXCHANGE,
exchange_type='direct')
LOGGER.info(f"Declared exchange {EXCHANGE}")
self.result = self.channel.queue_declare(self.queue_name, exclusive=False, durable=True)
LOGGER.info(f"Declared queue {self.queue_name}")

for key in self.callbacks:
self.channel.queue_bind(routing_key=key,
exchange=EXCHANGE,
queue=self.queue_name)
LOGGER.info(f"Bound exchange {EXCHANGE} to queue {self.queue_name} with binding key {key}")

def tear_down(self):
if self.channel and self.channel.is_open:
Expand All @@ -202,40 +154,35 @@ def tear_down(self):
def register(self, routing_key, callback):
""" Registers a given callback to a `routing_key`. """

print('register()')
print(routing_key)
print(callback)
self.callbacks[routing_key] = callback
LOGGER.info(f"Registered {callback.__name__} function to {routing_key} routing key")

def run_loop(self):
"""
Runs in the background thread, maintains a connection to the RabbitMQ broker and consumes incoming messages.
"""
while True:
try:
print('self.setup() + self.channel.basic_consume()')
print(self.queue_name)
self.setup()
self.channel.basic_consume(
consumer_callback=self._callback, queue=self.queue_name)
print('basic_consume invoked')
print('invoking start_consuming()')
self.channel.basic_consume(str(self.queue_name), self._callback)
self.channel.start_consuming()
print('start_consuming() returned!')
except exceptions.ConnectionClosed as e:
LOGGER.error(f"Connection closed: {e}")
except exceptions.ChannelError as e:
LOGGER.error(f"Channel error: {e}")
except Exception as e:
LOGGER.error('rabbitmq consumer had the following error: %s', e)
LOGGER.exception(e) # Log entire exception traceback for unexpected errors
finally:
self.tear_down()
LOGGER.info("Tearing down connection and channel")
self.tear_down() # Pass connection and channel for cleanup


def _callback(self, ch, method, properties, body):
""" Invokes the callback function for the routing key from the message. """

print('info: incoming message')
print(method.routing_key)
print(body)
LOGGER.info("info: incoming message %s with body %s", method.routing_key, body)

LOGGER.info(f"Incoming message with body {body}")
if method.routing_key not in self.callbacks:
LOGGER.warning('%s not registered', method.routing_key)
LOGGER.warning(f"{method.routing_key} not registered!")
try:
self.channel.basic_ack(delivery_tag=method.delivery_tag)
self.callbacks[method.routing_key](ch, method, properties, body)
Expand Down
Loading

0 comments on commit e5caf82

Please sign in to comment.