Commit 84af648: Merge remote-tracking branch 'origin/main' into main

kamarton committed Jan 13, 2023
2 parents 61e103b + 9d3b8c2
Showing 8 changed files with 186 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
 - Enh: namespace refactoring `gwb` -> `gwbackupy`
 - Enh #22: restore missing Gmail messages (`--restore-missing`)
 - Bug: fixed date filters parsing
+- Enh #38: kill signals handling

 ## 0.5.0

35 changes: 26 additions & 9 deletions gwbackupy/gmail.py
@@ -16,6 +16,7 @@
     str_trim,
     json_load,
 )
+from gwbackupy.process_helpers import is_killed, sleep_with_check, await_all_futures
 from gwbackupy.providers.gmail_service_wrapper_interface import (
     GmailServiceWrapperInterface,
 )
@@ -167,7 +168,7 @@ def __get_all_messages_from_server(
         email = self.email
         logging.info("Get all message ids from server...")
         messages = self.__service_wrapper.get_messages(email, q)
-        logging.info(f"Message(s) count: len({len(messages)}")
+        logging.info(f"Message(s) count: {len(messages)}")
         # print(count)
         # print(messages)
         return messages
@@ -261,15 +262,23 @@ def backup(self, quick_sync_days: int | None = None) -> bool:
         q = f"label:all after:{date.strftime('%Y/%m/%d')}"
         messages_from_server = self.__get_all_messages_from_server(q=q)
         logging.info("Processing...")
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=self.batch_size
-        ) as executor:
-            for message_id in messages_from_server:
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.batch_size)
+        futures = []
+        # submit message download jobs
+        for message_id in messages_from_server:
+            futures.append(
                 executor.submit(
                     self.__backup_messages,
                     messages_from_server[message_id],
                     stored_messages,
                 )
+            )
+        # wait for jobs
+        if not await_all_futures(futures):
+            # cancel jobs
+            executor.shutdown(cancel_futures=True)
+            logging.warning("Process is killed")
+            return False
         logging.info("Processed")

         if self.__error_count > 0:
@@ -280,6 +289,10 @@ def backup(self, quick_sync_days: int | None = None) -> bool:
         if quick_sync_days is None:
             logging.info("Mark as deletes...")
             for message_id in stored_messages:
+                if is_killed():
+                    logging.warning("Process is killed")
+                    return False
+
                 links = stored_messages[message_id]
                 logging.debug(f"{message_id} mark as deleted in local storage...")
                 meta_link = links.get(0)
@@ -529,10 +542,10 @@ def restore(

         logging.info(f"Number of potentially affected messages: {len(stored_messages)}")
         logging.info("Upload messages...")
-        with concurrent.futures.ThreadPoolExecutor(
-            max_workers=self.batch_size
-        ) as executor:
-            for message_id in stored_messages:
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.batch_size)
+        futures = []
+        for message_id in stored_messages:
+            futures.append(
                 executor.submit(
                     self.__restore_message,
                     message_id,
@@ -542,6 +555,10 @@ def restore(
                     labels_from_server,
                     add_labels,
                 )
+            )
+        if not await_all_futures(futures):
+            logging.warning("Process killed")
+            return False

         if self.__error_count > 0:
             logging.error(f"Messages uploaded with {self.__error_count} errors")
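Both `backup()` and `restore()` now follow the same submit/await/cancel pattern: an explicit executor replaces the `with` block (whose implicit shutdown(wait=True) cannot be interrupted), the futures are collected into a list, and `await_all_futures()` polls them so a kill signal can abort the run and cancel queued work. A minimal sketch of that pattern, with a hypothetical `worker` function and `jobs` list standing in for the real message handlers:

import concurrent.futures
import logging

from gwbackupy.process_helpers import await_all_futures


def run_all(jobs, worker, batch_size: int = 4) -> bool:
    # Explicit executor instead of a `with` block: the context manager's
    # implicit shutdown(wait=True) would block until every submitted job
    # finished, even after a SIGINT/SIGTERM.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=batch_size)
    futures = [executor.submit(worker, job) for job in jobs]
    if not await_all_futures(futures):
        # kill signal received: drop everything still waiting in the queue
        executor.shutdown(cancel_futures=True)
        logging.warning("Process is killed")
        return False
    executor.shutdown()
    return True
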
7 changes: 5 additions & 2 deletions gwbackupy/gwbackupy.py
@@ -213,8 +213,11 @@ def cli_startup():
             exit(1)
         else:
             raise Exception("Unknown command")
-    except Exception:
-        logging.exception("CLI startup failed")
+    except KeyboardInterrupt:
+        logging.warning("Process is interrupted")
+        exit(1)
+    except BaseException:
+        logging.exception("CLI startup/run failed")
         exit(1)

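The handler order here matters: `KeyboardInterrupt` inherits from `BaseException`, not `Exception`, so the old `except Exception:` never saw Ctrl-C; catching it before the broad `except BaseException:` lets an interrupt exit with a short warning instead of a full traceback. A minimal sketch of the control flow, with a hypothetical `run_command()` standing in for the real dispatch:

import logging


def run_command():
    raise NotImplementedError  # hypothetical stand-in for the real CLI dispatch


def cli_startup():
    try:
        run_command()
    except KeyboardInterrupt:
        # Ctrl-C: a short warning, no stack trace
        logging.warning("Process is interrupted")
        exit(1)
    except BaseException:
        # everything else, including errors outside the Exception
        # hierarchy, is logged with a full traceback
        logging.exception("CLI startup/run failed")
        exit(1)
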
2 changes: 1 addition & 1 deletion gwbackupy/helpers.py
@@ -68,4 +68,4 @@ def is_rate_limit_exceeded(e) -> bool:


 def random_string(length: int = 8) -> str:
-    return "".join(random.choice(string.ascii_lowercase) for i in range(16))
+    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))
63 changes: 63 additions & 0 deletions gwbackupy/process_helpers.py
@@ -0,0 +1,63 @@
from __future__ import annotations

import logging
import signal
import threading
import time
from datetime import datetime

is_killed_handling: bool = False
is_killed_value: bool = False
is_killed_lock = threading.RLock()


def is_killed_reset():
    global is_killed_lock
    with is_killed_lock:
        global is_killed_value
        is_killed_value = False


def is_killed() -> bool:
    global is_killed_lock
    with is_killed_lock:
        global is_killed_handling
        if not is_killed_handling:
            signal.signal(signal.SIGINT, is_killed_handling_func)
            signal.signal(signal.SIGTERM, is_killed_handling_func)
            is_killed_handling = True
        global is_killed_value
        return is_killed_value


def is_killed_handling_func(*args):
    global is_killed_lock
    with is_killed_lock:
        global is_killed_value
        logging.info("Handle kill signal")
        is_killed_value = True


def sleep_with_check(seconds: float, sleep_step: float = 0.1) -> bool:
    start = datetime.now().timestamp()
    while datetime.now().timestamp() - start < seconds:
        time.sleep(sleep_step)
        if is_killed():
            return False
    return True


def await_all_futures(futures: [], sleep_step: float = 0.1) -> bool:
    while not is_killed():
        if not sleep_with_check(1, sleep_step=sleep_step):
            return False
        has_not_done = False
        for f in futures:
            if not f.done():
                has_not_done = True
                break
        if has_not_done:
            continue
        else:
            break
    return not is_killed()
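The module's three entry points work together: `is_killed()` installs the SIGINT/SIGTERM handlers on its first call and reports whether a kill signal has arrived, `sleep_with_check()` sleeps in small steps so long waits stay responsive to that flag, and `await_all_futures()` polls a future list until completion or kill. A short usage sketch, with a hypothetical `check_for_new_messages()` as the unit of work:

from gwbackupy.process_helpers import is_killed, sleep_with_check


def check_for_new_messages():
    pass  # hypothetical unit of work


def poll_mailbox():
    # the first is_killed() call registers the signal handlers
    while not is_killed():
        check_for_new_messages()
        # sleep_with_check() returns False as soon as a kill signal arrives,
        # so a 30-second wait never delays shutdown by more than sleep_step
        if not sleep_with_check(30):
            break
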
8 changes: 4 additions & 4 deletions gwbackupy/providers/gapi_gmail_service_wrapper.py
@@ -1,11 +1,11 @@
 from __future__ import annotations

 import logging
-import time

 from googleapiclient.errors import HttpError

 from gwbackupy.helpers import is_rate_limit_exceeded, random_string
+from gwbackupy.process_helpers import sleep_with_check
 from gwbackupy.providers.gmail_service_provider import GmailServiceProvider
 from gwbackupy.providers.gmail_service_wrapper_interface import (
     GmailServiceWrapperInterface,
@@ -87,7 +87,7 @@ def get_message(
                 logging.warning(
                     f"{message_id} rate limit exceeded, sleeping for {self.try_sleep} seconds"
                 )
-                time.sleep(self.try_sleep)
+                sleep_with_check(self.try_sleep)
             else:
                 raise e
         except TimeoutError as e:
@@ -127,7 +127,7 @@ def create_label(
                 logging.warning(
                     f"Label ({name}) create rate limit exceeded, sleeping for {self.try_sleep} seconds"
                 )
-                time.sleep(self.try_sleep)
+                sleep_with_check(self.try_sleep)
             else:
                 logging.warning(f"Next attempt to create label ({name})")
         except TimeoutError as e:
@@ -174,6 +174,6 @@ def insert_message(self, email: str, data: dict[str, any]) -> dict[str, any]:
                 logging.warning(
                     f"Message insert rate limit exceeded, sleeping for {self.try_sleep} seconds"
                 )
-                time.sleep(self.try_sleep)
+                sleep_with_check(self.try_sleep)
             else:
                 logging.warning(f"Next attempt to insert message")
6 changes: 6 additions & 0 deletions gwbackupy/tests/test_helpers.py
@@ -12,6 +12,7 @@
     json_load,
     parse_date,
     is_rate_limit_exceeded,
+    random_string,
 )

@@ -88,3 +89,8 @@ def test_is_rate_limit_exceeded():
     data2[0]["error"]["details"].append(dict())
     e = HttpError(Resp(403, "Forbidden"), json.dumps(data2).encode("utf8"))
     assert not is_rate_limit_exceeded(e)
+
+
+def test_random_string():
+    for i in range(32):
+        assert len(random_string(i)) == i
80 changes: 80 additions & 0 deletions gwbackupy/tests/test_process_helpers.py
@@ -0,0 +1,80 @@
import concurrent.futures
import threading
from datetime import datetime
from time import sleep

from gwbackupy.process_helpers import (
    sleep_with_check,
    is_killed,
    is_killed_reset,
    is_killed_handling_func,
    await_all_futures,
)


def test_sleep_with_check():
    assert not is_killed()
    start = datetime.now().timestamp()
    sleep_with_check(0.3, sleep_step=0.05)
    end = datetime.now().timestamp()
    assert end - start < 1
    assert end - start >= 0.3
    assert not is_killed()


def do_kill(seconds: float = 0.3):
    sleep(seconds)
    is_killed_handling_func()


def test_sleep_with_check_with_kill():
    assert not is_killed()
    try:
        _thread = threading.Thread(target=do_kill)
        _thread.start()
        start = datetime.now().timestamp()
        sleep_with_check(10, sleep_step=0.05)
        end = datetime.now().timestamp()
        assert end - start < 0.5
        assert end - start >= 0.3
        assert is_killed()
    finally:
        is_killed_reset()


def test_is_killed_reset():
    assert not is_killed()
    try:
        is_killed_handling_func()
        assert is_killed()
    finally:
        is_killed_reset()
    assert not is_killed()


def test_await_all_futures():
    assert not is_killed()
    try:
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        futures = []
        start = datetime.now().timestamp()
        for i in range(3):
            # no-argument callables: executor.submit() passes no args
            futures.append(executor.submit(lambda: sleep(0.1)))
        assert await_all_futures(futures, sleep_step=0.05)
        end = datetime.now().timestamp()
        assert end - start >= 0.3
        assert end - start < 3

        futures.clear()
        start = datetime.now().timestamp()
        for i in range(3):
            futures.append(executor.submit(lambda: sleep(10)))
        _thread = threading.Thread(target=do_kill)
        _thread.start()
        assert not await_all_futures(futures)
        end = datetime.now().timestamp()
        assert end - start >= 0.3
        assert end - start < 3
        is_killed_reset()
    finally:
        is_killed_reset()