Skip to content

Commit

Permalink
Add task manager for models and simulations (asreview#1861)
Browse files Browse the repository at this point in the history
  • Loading branch information
cskaandorp authored Oct 23, 2024
1 parent 19620cf commit 7c7189b
Show file tree
Hide file tree
Showing 10 changed files with 531 additions and 90 deletions.
17 changes: 14 additions & 3 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ npm ci
### Setting up servers

The best development workflow for the ASReview frontend and backend makes use
of 2 simultaneously running servers. One serves the Python server with the
Flask app and the other the Node server with the frontend.
of 3 simultaneously running servers:

1. A Python server with the Flask app.
2. A Python server for a task manager that manages running models after records are labeled.
3. A Node server for the frontend.

Open a command line interface (e.g. Terminal or CMD.exe) and navigate to
`asreview/webapp`. Start the Flask app with
Expand All @@ -35,7 +38,15 @@ cd asreview/webapp
flask run --debug
```

Next, open a second command line interface and navigate to `asreview/webapp`.
Next, open a second command line interface and run:

```sh
asreview task-manager
```

This starts the task manager (by default on `localhost`, port 5101). Use the `--verbose` flag to view logging messages.

Next, open a third command line interface and navigate to `asreview/webapp`.
Start the local front end application running on a Node server.

```sh
Expand Down
78 changes: 46 additions & 32 deletions asreview/webapp/api/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
import json
import logging
import shutil
import subprocess
import sys
import socket
import tempfile
import time
from dataclasses import asdict
Expand Down Expand Up @@ -56,6 +55,10 @@
from asreview.webapp.authentication.decorators import current_user_projects
from asreview.webapp.authentication.decorators import project_authorization
from asreview.webapp.authentication.models import Project
from asreview.webapp.task_manager.task_manager import DEFAULT_TASK_MANAGER_HOST
from asreview.webapp.task_manager.task_manager import DEFAULT_TASK_MANAGER_PORT
from asreview.webapp.tasks import run_model
from asreview.webapp.tasks import run_simulation
from asreview.webapp.utils import asreview_path
from asreview.webapp.utils import get_project_path

Expand Down Expand Up @@ -89,6 +92,44 @@ def _fill_last_ranking(project, ranking):
state.add_last_ranking(records.values, None, ranking, None, None)


def _run_model(project):
    """Start model training or a simulation for ``project``.

    Outside of testing, the work is delegated to the task manager: a JSON
    message with the keys ``action``, ``project_id`` and ``simulation`` is
    sent over a TCP socket to the host and port configured under
    ``TASK_MANAGER_HOST`` / ``TASK_MANAGER_PORT`` (falling back to the
    module-level defaults). When ``current_app.testing`` is set, the task is
    run directly in the current process instead.

    Parameters
    ----------
    project:
        Project whose ``config`` provides the ``"mode"`` and ``"id"`` keys.

    Raises
    ------
    RuntimeError
        If the task manager cannot be reached or the message cannot be sent.
    """
    simulation = project.config["mode"] == PROJECT_MODE_SIMULATE

    # In testing mode there is no task manager: run the task in-process.
    if current_app.testing:
        if simulation:
            run_simulation(project)
        else:
            run_model(project)
        return

    address = (
        current_app.config.get("TASK_MANAGER_HOST", DEFAULT_TASK_MANAGER_HOST),
        current_app.config.get("TASK_MANAGER_PORT", DEFAULT_TASK_MANAGER_PORT),
    )
    payload = {
        "action": "insert",
        "project_id": project.config["id"],
        "simulation": simulation,
    }

    try:
        # The context manager closes the socket on every path. Creating the
        # socket inside the ``try`` means a failure in ``socket.socket()``
        # is also reported as the RuntimeError below instead of surfacing as
        # an unbound-variable error during cleanup.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
            client_socket.connect(address)
            client_socket.sendall(json.dumps(payload).encode("utf-8"))
    except socket.error as err:
        raise RuntimeError("Queue manager is not alive.") from err


# error handlers
@bp.errorhandler(ValueError)
def value_error(e):
Expand Down Expand Up @@ -715,14 +756,7 @@ def api_train(project): # noqa: F401
return jsonify({"success": True})

try:
run_command = [
sys.executable if sys.executable else "python",
"-m",
"asreview",
"web_run_model",
str(project.project_path),
]
subprocess.Popen(run_command)
_run_model(project)

except Exception as err:
logging.error(err)
Expand Down Expand Up @@ -797,19 +831,7 @@ def api_update_review_status(project, review_id):
_fill_last_ranking(project, "random")

if trigger_model and (pk or is_simulation):
try:
subprocess.Popen(
[
sys.executable if sys.executable else "python",
"-m",
"asreview",
"web_run_model",
str(project.project_path),
]
)

except Exception as err:
return jsonify(message=f"Failed to train the model. {err}"), 400
_run_model(project)

project.update_review(status=status)

Expand Down Expand Up @@ -1129,15 +1151,7 @@ def api_label_record(project, record_id): # noqa: F401
raise ValueError(f"Invalid label {label}")

if retrain_model:
subprocess.Popen(
[
sys.executable if sys.executable else "python",
"-m",
"asreview",
"web_run_model",
str(project.project_path),
]
)
_run_model(project)

if request.method == "POST":
return jsonify({"success": True})
Expand Down
11 changes: 11 additions & 0 deletions asreview/webapp/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,15 @@ def index_protected(**kwargs):
def static_from_root():
return send_from_directory("build", request.path[1:])

# The task manager needs to be configured if not in testing
if not (app.testing):
# I want people to be able to configure the host and port of
# the task manager by using the env var TASK_MANAGER_ENDPOINT.
# This var needs to provide host and port in 1 string.
endpoint = app.config.get("TASK_MANAGER_ENDPOINT", False)
if endpoint:
endpoint = endpoint.split(":")
app.config["TASK_MANAGER_HOST"] = endpoint[0]
app.config["TASK_MANAGER_PORT"] = int(endpoint[1])

return app
29 changes: 28 additions & 1 deletion asreview/webapp/entry_points/lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
# limitations under the License.
import argparse
import logging
import multiprocessing as mp
import os
import socket
import time
import webbrowser
from pathlib import Path
from threading import Timer
Expand All @@ -27,6 +29,7 @@
from asreview._deprecated import DeprecateAction
from asreview._deprecated import mark_deprecated_help_strings
from asreview.webapp.app import create_app
from asreview.webapp.task_manager.task_manager import run_task_manager
from asreview.webapp.utils import asreview_path
from asreview.webapp.utils import get_project_path
from asreview.webapp.utils import get_projects
Expand Down Expand Up @@ -177,10 +180,34 @@ def lab_entry_point(argv):

console.print("Press [bold]Ctrl+C[/bold] to exit.\n\n")

# spin up task manager
start_event = mp.Event()
process = mp.Process(
target=run_task_manager,
args=(
app.config.get("TASK_MANAGER_WORKERS", None),
app.config.get("TASK_MANAGER_HOST", None),
app.config.get("TASK_MANAGER_PORT", None),
app.config.get("TASK_MANAGER_VERBOSE", False),
start_event,
),
)
process.start()

# wait for the process to spin up
start_time = time.time()
while not start_event.is_set():
time.sleep(0.1)
if time.time() - start_time > 5:
console.print(
"\n\n[red]Error: unable to startup the model server.[/red]\n\n"
)
return

try:
waitress.serve(app, host=args.host, port=port, threads=6)
except KeyboardInterrupt:
console.print("\n\nShutting down server\n\n")
console.print("\n\nShutting down server.\n\n")


def _lab_parser():
Expand Down
61 changes: 61 additions & 0 deletions asreview/webapp/entry_points/task_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import argparse
import textwrap
import os

from asreview.webapp.task_manager.task_manager import setup_logging
from asreview.webapp.task_manager.task_manager import DEFAULT_TASK_MANAGER_HOST
from asreview.webapp.task_manager.task_manager import DEFAULT_TASK_MANAGER_PORT
from asreview.webapp.task_manager.task_manager import DEFAULT_TASK_MANAGER_WORKERS
from asreview.webapp.task_manager.task_manager import TaskManager


# Help text for the command line parser; `_arg_parser` passes it through
# `textwrap.dedent` and strips surrounding whitespace before use.
description = """\
This entry point launches an instance of ASReview's task manager. You can
specify the host, port, and number of workers using environment variables
(ASREVIEW_LAB_TASK_MANAGER_[WORKERS|HOST|PORT]) or CLI parameters, with
environment variables taking precedence.
"""


def _arg_parser(argv):
    """Parse the task manager's command line arguments.

    Parameters
    ----------
    argv:
        List of command line arguments to parse.

    Returns
    -------
    argparse.Namespace
        Namespace with the attributes ``verbose``, ``workers``, ``host``
        and ``port``.
    """
    cli = argparse.ArgumentParser(
        description=textwrap.dedent(description).strip(),
    )

    cli.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose logging.",
    )

    # Options that take a value: (flag, default, type, help text).
    valued_options = (
        (
            "--workers",
            DEFAULT_TASK_MANAGER_WORKERS,
            int,
            f"Specify the number of workers, defaults to {DEFAULT_TASK_MANAGER_WORKERS}.",
        ),
        (
            "--host",
            DEFAULT_TASK_MANAGER_HOST,
            str,
            f"Specify the task manager's host, defaults to {DEFAULT_TASK_MANAGER_HOST}.",
        ),
        (
            "--port",
            DEFAULT_TASK_MANAGER_PORT,
            int,
            f"Port of task manager, defaults to {DEFAULT_TASK_MANAGER_PORT}.",
        ),
    )
    for flag, default_value, value_type, help_text in valued_options:
        cli.add_argument(
            flag,
            default=default_value,
            type=value_type,
            help=help_text,
        )

    return cli.parse_args(argv)


def _env_int(name, fallback):
    """Return environment variable ``name`` as an int, or ``fallback`` if unset or empty."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return fallback
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(f"{name} must be an integer, got {raw!r}.") from err


def _env_flag(name, fallback):
    """Return environment variable ``name`` as a bool, or ``fallback`` if unset."""
    raw = os.getenv(name)
    if raw is None:
        return fallback
    # Any value other than an explicit negative keeps the flag enabled, so
    # values that switched the flag on before still do.
    return raw.strip().lower() not in {"", "0", "false", "no", "off"}


def main(argv):
    """Entry point: configure logging and start the task manager.

    Each setting is read from its ``ASREVIEW_LAB_TASK_MANAGER_*`` environment
    variable when present and otherwise from the parsed command line
    arguments. Environment values arrive as strings, so the worker count and
    port are converted to ``int`` and the verbose setting is parsed as a
    boolean before use.

    Parameters
    ----------
    argv:
        Command line arguments, forwarded to ``_arg_parser``.

    Raises
    ------
    ValueError
        If the workers or port environment variable is not an integer.
    """
    args = _arg_parser(argv)

    verbose = _env_flag("ASREVIEW_LAB_TASK_MANAGER_VERBOSE", args.verbose)
    setup_logging(verbose=verbose)

    manager = TaskManager(
        max_workers=_env_int("ASREVIEW_LAB_TASK_MANAGER_WORKERS", args.workers),
        host=os.getenv("ASREVIEW_LAB_TASK_MANAGER_HOST", args.host),
        port=_env_int("ASREVIEW_LAB_TASK_MANAGER_PORT", args.port),
    )
    manager.start_manager()
13 changes: 13 additions & 0 deletions asreview/webapp/task_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2019-2022 The ASReview Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16 changes: 16 additions & 0 deletions asreview/webapp/task_manager/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base

# Declarative base class that the ORM models in this module inherit from.
Base = declarative_base()


class ProjectQueueModel(Base):
    """ORM model for a row in the ``queue`` table."""

    __tablename__ = "queue"

    # Integer primary key of the queue entry.
    id = Column(Integer, primary_key=True)

    # Identifier of the queued project; the unique constraint allows at most
    # one row per project.
    project_id = Column(String(250), unique=True, nullable=False)

    # Boolean flag stored with the entry, False unless set explicitly.
    # NOTE(review): presumably marks simulation tasks; confirm against the
    # code that inserts queue entries.
    simulation = Column(Boolean, default=False, nullable=False)
Loading

0 comments on commit 7c7189b

Please sign in to comment.