Skip to content

Commit

Permalink
Use uploader through injector
Browse files Browse the repository at this point in the history
  • Loading branch information
aeshub committed Mar 22, 2023
1 parent 3c5b07e commit 40b4978
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 13 deletions.
8 changes: 1 addition & 7 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import os
import time
from logging import Logger
from threading import Thread
Expand All @@ -21,7 +20,6 @@
RobotStatusPublisher,
)
from isar.state_machine.state_machine import StateMachine, main
from isar.storage.storage_interface import StorageInterface
from isar.storage.uploader import Uploader
from robot_interface.robot_interface import RobotInterface

Expand All @@ -33,6 +31,7 @@
logger: Logger = logging.getLogger("main")

state_machine: StateMachine = injector.get(StateMachine)
uploader: Uploader = injector.get(Uploader)
robot: RobotInterface = injector.get(RobotInterface)
queues: Queues = injector.get(Queues)

Expand All @@ -43,11 +42,6 @@
)
threads.append(state_machine_thread)

uploader: Uploader = Uploader(
upload_queue=queues.upload_queue,
storage_handlers=injector.get(List[StorageInterface]),
)

uploader_thread: Thread = Thread(
target=uploader.run, name="ISAR Uploader", daemon=True
)
Expand Down
17 changes: 17 additions & 0 deletions src/isar/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from isar.storage.local_storage import LocalStorage
from isar.storage.slimm_storage import SlimmStorage
from isar.storage.storage_interface import StorageInterface
from isar.storage.uploader import Uploader
from robot_interface.robot_interface import RobotInterface
from robot_interface.telemetry.mqtt_client import MqttClientInterface, MqttPublisher

Expand Down Expand Up @@ -141,6 +142,22 @@ def provide_state_machine(
)


class UploaderModule(Module):
@provider
@singleton
def provide_uploader(
self,
queues: Queues,
storage_handlers: List[StorageInterface],
mqtt_client: MqttClientInterface,
) -> Uploader:
return Uploader(
queues=queues,
storage_handlers=storage_handlers,
mqtt_publisher=mqtt_client,
)


class UtilitiesModule(Module):
@provider
@singleton
Expand Down
16 changes: 12 additions & 4 deletions src/isar/storage/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
from queue import Empty, Queue
from typing import List

from injector import inject

from isar.config.settings import settings
from isar.models.communication.queues import Queues
from isar.models.mission_metadata.mission_metadata import MissionMetadata
from isar.storage.storage_interface import StorageException, StorageInterface
from robot_interface.models.inspection.inspection import Inspection
from robot_interface.telemetry.mqtt_client import MqttClientInterface


@dataclass
Expand Down Expand Up @@ -36,28 +40,32 @@ def seconds_until_retry(self) -> int:


class Uploader:
@inject
def __init__(
self,
upload_queue: Queue,
queues: Queues,
storage_handlers: List[StorageInterface],
mqtt_publisher: MqttClientInterface,
max_wait_time: int = settings.UPLOAD_FAILURE_MAX_WAIT,
max_retry_attempts: int = settings.UPLOAD_FAILURE_ATTEMPTS_LIMIT,
) -> None:
"""Initializes the uploader.
Parameters
----------
upload_queue : Queue
Queue used for cross-thread communication.
queues : Queues
Queues used for cross-thread communication.
storage_handlers : List[StorageInterface]
List of handlers for different upload options
max_wait_time : float
The maximum wait time between two retries (exponential backoff)
max_retry_attempts : int
Maximum attempts to retry an upload when it fails
"""
self.upload_queue: Queue = upload_queue
self.upload_queue: Queue = queues.upload_queue
self.storage_handlers: List[StorageInterface] = storage_handlers
self.mqtt_publisher = mqtt_publisher

self.max_wait_time = max_wait_time
self.max_retry_attempts = max_retry_attempts
self._internal_upload_queue: List[UploaderQueueItem] = []
Expand Down
4 changes: 3 additions & 1 deletion tests/isar/state_machine/test_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from robot_interface.models.exceptions import RobotException
from robot_interface.models.mission import DriveToPose, Step, TakeImage
from robot_interface.models.mission.status import StepStatus
from robot_interface.telemetry.mqtt_client import MqttClientInterface
from tests.mocks.pose import MockPose
from tests.mocks.robot_interface import MockRobot
from tests.mocks.step import MockStep
Expand All @@ -38,8 +39,9 @@ class UploaderThread(object):
def __init__(self, injector) -> None:
self.injector: Injector = injector
self.uploader: Uploader = Uploader(
upload_queue=self.injector.get(Queues).upload_queue,
queues=self.injector.get(Queues),
storage_handlers=injector.get(List[StorageInterface]),
mqtt_publisher=injector.get(MqttClientInterface),
)
self._thread: Thread = Thread(target=self.uploader.run)
self._thread.daemon = True
Expand Down
4 changes: 3 additions & 1 deletion tests/isar/storage/test_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from isar.storage.storage_interface import StorageInterface
from isar.storage.uploader import Uploader
from robot_interface.models.inspection.inspection import ImageMetadata, Inspection
from robot_interface.telemetry.mqtt_client import MqttClientInterface

MISSION_ID = "some-mission-id"
ARBITRARY_IMAGE_METADATA = ImageMetadata(
Expand All @@ -31,8 +32,9 @@ class UploaderThread(object):
def __init__(self, injector) -> None:
self.injector: Injector = injector
self.uploader: Uploader = Uploader(
upload_queue=self.injector.get(Queues).upload_queue,
queues=self.injector.get(Queues),
storage_handlers=injector.get(List[StorageInterface]),
mqtt_publisher=injector.get(MqttClientInterface),
)
self._thread: Thread = Thread(target=self.uploader.run)
self._thread.daemon = True
Expand Down

0 comments on commit 40b4978

Please sign in to comment.