diff --git a/CHANGELOG.md b/CHANGELOG.md
index 99c3324..840e13e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 - Enh: namespace refactoring `gwb` -> `gwbackupy`
 - Enh #22: restore missing Gmail messages (`--restore-missing`)
 - Bug: fixed date filters parsing
+- Enh #38: kill signals handling
 
 ## 0.5.0
 
diff --git a/gwbackupy/gmail.py b/gwbackupy/gmail.py
index 3f3b923..b200880 100644
--- a/gwbackupy/gmail.py
+++ b/gwbackupy/gmail.py
@@ -16,6 +16,7 @@
     str_trim,
     json_load,
 )
+from gwbackupy.process_helpers import is_killed, sleep_with_check, await_all_futures
 from gwbackupy.providers.gmail_service_wrapper_interface import (
     GmailServiceWrapperInterface,
 )
@@ -167,7 +168,7 @@ def __get_all_messages_from_server(
         email = self.email
         logging.info("Get all message ids from server...")
         messages = self.__service_wrapper.get_messages(email, q)
-        logging.info(f"Message(s) count: len({len(messages)}")
+        logging.info(f"Message(s) count: {len(messages)}")
         # print(count)
         # print(messages)
         return messages
@@ -261,15 +262,23 @@ def backup(self, quick_sync_days: int | None = None) -> bool:
             q = f"label:all after:{date.strftime('%Y/%m/%d')}"
         messages_from_server = self.__get_all_messages_from_server(q=q)
         logging.info("Processing...")
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=self.batch_size
-        ) as executor:
-            for message_id in messages_from_server:
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.batch_size)
+        futures = []
+        # submit message download jobs
+        for message_id in messages_from_server:
+            futures.append(
                 executor.submit(
                     self.__backup_messages,
                     messages_from_server[message_id],
                     stored_messages,
                 )
+            )
+        # wait for jobs
+        if not await_all_futures(futures):
+            # cancel jobs
+            executor.shutdown(cancel_futures=True)
+            logging.warning("Process is killed")
+            return False
 
         logging.info("Processed")
         if self.__error_count > 0:
@@ -280,6 +289,10 @@ def backup(self, quick_sync_days: int | None = None) -> bool:
         if quick_sync_days is None:
             logging.info("Mark as deletes...")
             for message_id in stored_messages:
+                if is_killed():
+                    logging.warning("Process is killed")
+                    return False
+
                 links = stored_messages[message_id]
                 logging.debug(f"{message_id} mark as deleted in local storage...")
                 meta_link = links.get(0)
@@ -529,10 +542,10 @@ def restore(
         logging.info(f"Number of potentially affected messages: {len(stored_messages)}")
 
         logging.info("Upload messages...")
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=self.batch_size
-        ) as executor:
-            for message_id in stored_messages:
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.batch_size)
+        futures = []
+        for message_id in stored_messages:
+            futures.append(
                 executor.submit(
                     self.__restore_message,
                     message_id,
@@ -542,6 +555,10 @@ def restore(
                     labels_from_server,
                     add_labels,
                 )
+            )
+        if not await_all_futures(futures):
+            logging.warning("Process is killed")
+            return False
 
         if self.__error_count > 0:
             logging.error(f"Messages uploaded with {self.__error_count} errors")
diff --git a/gwbackupy/gwbackupy.py b/gwbackupy/gwbackupy.py
index 1718963..9937824 100644
--- a/gwbackupy/gwbackupy.py
+++ b/gwbackupy/gwbackupy.py
@@ -213,8 +213,11 @@ def cli_startup():
             exit(1)
         else:
             raise Exception("Unknown command")
-    except Exception:
-        logging.exception("CLI startup failed")
+    except KeyboardInterrupt:
+        logging.warning("Process is interrupted")
+        exit(1)
+    except BaseException:
+        logging.exception("CLI startup/run failed")
         exit(1)
 
 
diff --git a/gwbackupy/helpers.py b/gwbackupy/helpers.py
index 433bb05..d4fedbe 100644
--- a/gwbackupy/helpers.py
+++ b/gwbackupy/helpers.py
@@ -68,4 +68,4 @@ def is_rate_limit_exceeded(e) -> bool:
 
 
 def random_string(length: int = 8) -> str:
-    return "".join(random.choice(string.ascii_lowercase) for i in range(16))
+    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))
diff --git a/gwbackupy/process_helpers.py b/gwbackupy/process_helpers.py
new file mode 100644
index 0000000..fde51f9
--- /dev/null
+++ b/gwbackupy/process_helpers.py
@@ -0,0 +1,63 @@
+from __future__ import annotations
+
+import logging
+import signal
+import threading
+import time
+from datetime import datetime
+
+is_killed_handling: bool = False
+is_killed_value: bool = False
+is_killed_lock = threading.RLock()
+
+
+def is_killed_reset():
+    global is_killed_lock
+    with is_killed_lock:
+        global is_killed_value
+        is_killed_value = False
+
+
+def is_killed() -> bool:
+    global is_killed_lock
+    with is_killed_lock:
+        global is_killed_handling
+        if not is_killed_handling:
+            signal.signal(signal.SIGINT, is_killed_handling_func)
+            signal.signal(signal.SIGTERM, is_killed_handling_func)
+            is_killed_handling = True
+        global is_killed_value
+        return is_killed_value
+
+
+def is_killed_handling_func(*args):
+    global is_killed_lock
+    with is_killed_lock:
+        global is_killed_value
+        logging.info("Handle kill signal")
+        is_killed_value = True
+
+
+def sleep_with_check(seconds: float, sleep_step: float = 0.1) -> bool:
+    start = datetime.now().timestamp()
+    while datetime.now().timestamp() - start < seconds:
+        time.sleep(sleep_step)
+        if is_killed():
+            return False
+    return True
+
+
+def await_all_futures(futures: list, sleep_step: float = 0.1) -> bool:
+    while not is_killed():
+        if not sleep_with_check(1, sleep_step=sleep_step):
+            return False
+        has_not_done = False
+        for f in futures:
+            if not f.done():
+                has_not_done = True
+                break
+        if has_not_done:
+            continue
+        else:
+            break
+    return not is_killed()
diff --git a/gwbackupy/providers/gapi_gmail_service_wrapper.py b/gwbackupy/providers/gapi_gmail_service_wrapper.py
index dd61670..e3972e9 100644
--- a/gwbackupy/providers/gapi_gmail_service_wrapper.py
+++ b/gwbackupy/providers/gapi_gmail_service_wrapper.py
@@ -1,11 +1,11 @@
 from __future__ import annotations
 
 import logging
-import time
 
 from googleapiclient.errors import HttpError
 
 from gwbackupy.helpers import is_rate_limit_exceeded, random_string
+from gwbackupy.process_helpers import sleep_with_check
 from gwbackupy.providers.gmail_service_provider import GmailServiceProvider
 from gwbackupy.providers.gmail_service_wrapper_interface import (
     GmailServiceWrapperInterface,
@@ -87,7 +87,7 @@ def get_message(
                 logging.warning(
                     f"{message_id} rate limit exceeded, sleeping for {self.try_sleep} seconds"
                 )
-                time.sleep(self.try_sleep)
+                sleep_with_check(self.try_sleep)
             else:
                 raise e
         except TimeoutError as e:
@@ -127,7 +127,7 @@ def create_label(
                     logging.warning(
                         f"Label ({name}) create rate limit exceeded, sleeping for {self.try_sleep} seconds"
                     )
-                    time.sleep(self.try_sleep)
+                    sleep_with_check(self.try_sleep)
                 else:
                     logging.warning(f"Next attempt to create label ({name})")
             except TimeoutError as e:
@@ -174,6 +174,6 @@ def insert_message(self, email: str, data: dict[str, any]) -> dict[str, any]:
                 logging.warning(
                     f"Message insert rate limit exceeded, sleeping for {self.try_sleep} seconds"
                 )
-                time.sleep(self.try_sleep)
+                sleep_with_check(self.try_sleep)
             else:
                 logging.warning(f"Next attempt to insert message")
diff --git a/gwbackupy/tests/test_helpers.py b/gwbackupy/tests/test_helpers.py
index b9edd96..1d44979 100644
--- a/gwbackupy/tests/test_helpers.py
+++ b/gwbackupy/tests/test_helpers.py
@@ -12,6 +12,7 @@
     json_load,
     parse_date,
     is_rate_limit_exceeded,
+    random_string,
 )
 
 
@@ -88,3 +89,8 @@ def test_is_rate_limit_exceeded():
     data2[0]["error"]["details"].append(dict())
     e = HttpError(Resp(403, "Forbidden"), json.dumps(data2).encode("utf8"))
     assert not is_rate_limit_exceeded(e)
+
+
+def test_random_string():
+    for i in range(32):
+        assert len(random_string(i)) == i
diff --git a/gwbackupy/tests/test_process_helpers.py b/gwbackupy/tests/test_process_helpers.py
new file mode 100644
index 0000000..6c69733
--- /dev/null
+++ b/gwbackupy/tests/test_process_helpers.py
@@ -0,0 +1,80 @@
+import concurrent.futures
+import threading
+from datetime import datetime
+from time import sleep
+
+from gwbackupy.process_helpers import (
+    sleep_with_check,
+    is_killed,
+    is_killed_reset,
+    is_killed_handling_func,
+    await_all_futures,
+)
+
+
+def test_sleep_with_check():
+    assert not is_killed()
+    start = datetime.now().timestamp()
+    sleep_with_check(0.3, sleep_step=0.05)
+    end = datetime.now().timestamp()
+    assert end - start < 1
+    assert end - start >= 0.3
+    assert not is_killed()
+
+
+def do_kill(seconds: float = 0.3):
+    sleep(seconds)
+    is_killed_handling_func()
+
+
+def test_sleep_with_check_with_kill():
+    assert not is_killed()
+    try:
+        _thread = threading.Thread(target=do_kill)
+        _thread.start()
+        start = datetime.now().timestamp()
+        sleep_with_check(10, sleep_step=0.05)
+        end = datetime.now().timestamp()
+        assert end - start < 0.5
+        assert end - start >= 0.3
+        assert is_killed()
+    finally:
+        is_killed_reset()
+
+
+def test_is_killed_reset():
+    assert not is_killed()
+    try:
+        is_killed_handling_func()
+        assert is_killed()
+    finally:
+        is_killed_reset()
+    assert not is_killed()
+
+
+def test_await_all_futures():
+    assert not is_killed()
+    try:
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
+        futures = []
+        start = datetime.now().timestamp()
+        for _ in range(3):
+            futures.append(executor.submit(lambda: sleep(0.1)))
+        assert await_all_futures(futures, sleep_step=0.05)
+        end = datetime.now().timestamp()
+        assert end - start >= 0.3
+        assert end - start < 3
+
+        futures.clear()
+        start = datetime.now().timestamp()
+        for _ in range(3):
+            futures.append(executor.submit(lambda: sleep(10)))
+        _thread = threading.Thread(target=do_kill)
+        _thread.start()
+        assert not await_all_futures(futures)
+        end = datetime.now().timestamp()
+        assert end - start >= 0.3
+        assert end - start < 3
+        is_killed_reset()
+    finally:
+        is_killed_reset()
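
Note on the executor change in gmail.py: the old `with ThreadPoolExecutor(...)` block waits in `Executor.__exit__` until every submitted job has finished, while the new form keeps the main thread free to poll the kill flag about once a second and to cancel queued work early. Below is a minimal, self-contained sketch of the same pattern; `download_one` and the job list are hypothetical stand-ins for `Gmail.__backup_messages` and the message-id map, and only the `process_helpers` calls come from this change set:

import concurrent.futures
import logging

from gwbackupy.process_helpers import await_all_futures, is_killed


def download_one(item_id: int) -> None:
    # hypothetical worker; the real code downloads one Gmail message
    pass


def run_jobs(item_ids: list) -> bool:
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    # submit everything up front; nothing blocks yet
    futures = [executor.submit(download_one, i) for i in item_ids]
    # poll roughly once a second until all futures are done, or until
    # SIGINT/SIGTERM flips the module-level flag behind is_killed()
    if not await_all_futures(futures):
        # kill signal seen: drop queued jobs, let running ones finish
        executor.shutdown(cancel_futures=True)  # requires Python 3.9+
        logging.warning("Process is killed")
        return False
    executor.shutdown()
    return True

`shutdown(cancel_futures=True)` only cancels jobs still waiting in the queue; threads already running keep going, which is why workers that sleep on rate limits also switch to `sleep_with_check` in this diff. The sketch additionally shuts the pool down on the success path, which the patched methods leave to interpreter exit.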
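
The `sleep_with_check` substitution in gapi_gmail_service_wrapper.py makes long rate-limit backoffs interruptible: the wait is chopped into `sleep_step` slices, the kill flag is checked between slices, and the function returns False early once the flag is set. A rough sketch of that retry shape, where `fetch`, `try_count`, and `try_sleep` are hypothetical (the real retry loops live in `get_message`, `create_label`, and `insert_message`):

import logging

from gwbackupy.process_helpers import sleep_with_check


def fetch_with_retry(fetch, try_count: int = 5, try_sleep: float = 10.0):
    for attempt in range(try_count):
        try:
            return fetch()
        except TimeoutError:
            logging.warning(
                f"attempt {attempt + 1} timed out, sleeping for {try_sleep} seconds"
            )
            # returns False as soon as a kill signal arrives mid-sleep
            if not sleep_with_check(try_sleep):
                break
    return None

One caveat: `is_killed()` registers the SIGINT/SIGTERM handlers lazily on its first call, and CPython permits `signal.signal` only from the main thread, so it is safest to call `is_killed()` once from the main thread before any worker can reach `sleep_with_check`. Until the handlers are installed, Ctrl-C still surfaces as the `KeyboardInterrupt` that the new `cli_startup` branch catches.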