From 7b97af6f9692002360b78178f62088fef8182c38 Mon Sep 17 00:00:00 2001 From: Jonathan de Bruin <jonathandebruinos@gmail.com> Date: Thu, 8 Jun 2023 16:29:16 +0200 Subject: [PATCH] Replace custom training lock by filelock (#1463) --- asreview/webapp/run_model.py | 32 +++------ asreview/webapp/sqlock.py | 127 ----------------------------------- 2 files changed, 9 insertions(+), 150 deletions(-) delete mode 100644 asreview/webapp/sqlock.py diff --git a/asreview/webapp/run_model.py b/asreview/webapp/run_model.py index 59beac24e..8596e233d 100755 --- a/asreview/webapp/run_model.py +++ b/asreview/webapp/run_model.py @@ -17,6 +17,9 @@ import sys from pathlib import Path +from filelock import FileLock +from filelock import Timeout + from asreview.models.balance import get_balance_model from asreview.models.classifiers import get_classifier from asreview.models.feature_extraction import get_feature_model @@ -25,7 +28,6 @@ from asreview.project import open_state from asreview.review.base import BaseReview from asreview.webapp.io import read_data -from asreview.webapp.sqlock import SQLiteLock def get_lab_reviewer( @@ -85,28 +87,14 @@ def train_model(project): """Add the new labels to the review and do the modeling. It uses a lock to ensure only one model is running at the same time. - Old results directories are deleted after 4 iterations. - - It has one argument on the CLI, which is the base project directory. """ logging.info(f"Project {project.project_path} - Train a new model for project") - # get file locations - lock_file = Path(project.project_path, "lock.sqlite") - # Lock so that only one training run is running at the same time. - # It doesn't lock the flask server/client. - with SQLiteLock( - lock_file, blocking=False, lock_name="training", project_id=project.project_id - ) as lock: - # If the lock is not acquired, another training instance is running. - if not lock.locked(): - logging.info( - f"Project {project.project_path} - " - "Cannot acquire lock, other instance running." - ) - return + lock = FileLock(Path(project.project_path, "training.lock"), timeout=0) + + with lock: # Check if there are new labeled records. with open_state(project.project_path) as state: @@ -120,11 +108,6 @@ def train_model(project): # Train the model. reviewer.train() - else: - logging.info( - f"Project {project.project_path} - No new labels since last run." - ) - return def main(argv): @@ -153,6 +136,9 @@ def main(argv): # change the project status to review project.update_review(status="review") + except Timeout: + logging.debug("Another iteration is training") + except Exception as err: # save the error to the project project.set_error(err, save_error_message=args.output_error) diff --git a/asreview/webapp/sqlock.py b/asreview/webapp/sqlock.py deleted file mode 100644 index 226e3cd29..000000000 --- a/asreview/webapp/sqlock.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright 2019-2022 The ASReview Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import os -import sqlite3 -from time import sleep - - -def _log_msg(msg, project_id=None): - """Return the log message with project_id.""" - if project_id is not None: - return f"Project {project_id} - {msg}" - - return msg - - -def get_db(db_file): - db = sqlite3.connect( - str(db_file), check_same_thread=False, detect_types=sqlite3.PARSE_DECLTYPES - ) - db.row_factory = sqlite3.Row - return db - - -def release_all_locks(db_file): - db = get_db(db_file) - db.execute("DELETE FROM locks;") - db.close() - - -class SQLiteLock: - def __init__( - self, - db_file, - lock_name="global", - blocking=False, - timeout=30, - polling_rate=0.4, - project_id=None, - ): - self.db_file = db_file - self.lock_name = lock_name - self.lock_acquired = False - self.timeout = timeout - self.polling_rate = polling_rate - self.project_id = project_id - - # acquire - self.acquire(blocking=blocking, timeout=timeout, polling_rate=polling_rate) - - def acquire(self, blocking=False, timeout=30, polling_rate=0.4): - if self.lock_acquired: - return - - if not os.path.isfile(self.db_file): - self.init_db() - - cur_timeout = 0 - while True and not self.lock_acquired: - db = get_db(self.db_file) - try: - db.isolation_level = "EXCLUSIVE" - db.execute("BEGIN EXCLUSIVE") - lock_entry = db.execute( - "SELECT * FROM locks WHERE name = ?", (self.lock_name,) - ).fetchone() - if lock_entry is None: - db.execute("INSERT INTO locks (name) VALUES (?)", (self.lock_name,)) - self.lock_acquired = True - logging.debug( - _log_msg(f"Acquired lock {self.lock_name}", self.project_id) - ) - db.commit() - except sqlite3.OperationalError as e: - logging.error( - _log_msg(f"Encountering operational error {e}", self.project_id) - ) - db.close() - if self.lock_acquired or not blocking: - break - cur_timeout += polling_rate - sleep(polling_rate) - - def init_db(self): - db = get_db(self.db_file) - db.executescript( - "DROP TABLE IF EXISTS locks; " "CREATE TABLE locks (name TEXT NOT NULL);" - ) - db.close() - - def locked(self): - return self.lock_acquired - - def __enter__(self): - return self - - def __exit__(self, *_, **__): - self.release() - - def release(self): - if not self.locked(): - return - while True: - db = get_db(self.db_file) - try: - db.execute("DELETE FROM locks WHERE name = ?", (self.lock_name,)) - db.commit() - db.close() - break - except sqlite3.OperationalError: - pass - db.close() - sleep(0.4) - logging.debug(_log_msg(f"Released lock {self.lock_name}", self.project_id)) - self.lock_acquired = False