From e3522620ff407bd9031e8dbe44cf8df8d3415a59 Mon Sep 17 00:00:00 2001 From: HAOCHENYE <21724054@zju.edu.cn> Date: Sat, 26 Oct 2024 20:04:14 +0000 Subject: [PATCH 1/3] [Enhance] Enhance volc --- .pre-commit-config.yaml | 2 +- opencompass/runners/volc.py | 91 +++++++++++++++++++++++++++-------- opencompass/utils/__init__.py | 1 + 3 files changed, 72 insertions(+), 22 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c5e5eea92..6e421dd22 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,7 +23,7 @@ exclude: | ) repos: - repo: https://github.com/PyCQA/flake8 - rev: 5.0.4 + rev: 6.1.0 hooks: - id: flake8 exclude: | diff --git a/opencompass/runners/volc.py b/opencompass/runners/volc.py index f076daa61..bf1b8b3ef 100644 --- a/opencompass/runners/volc.py +++ b/opencompass/runners/volc.py @@ -1,10 +1,13 @@ +import json import os import os.path as osp import random import re import subprocess import time +import warnings from functools import partial +from json import JSONDecodeError from typing import Any, Dict, List, Optional, Tuple import mmengine @@ -13,11 +16,22 @@ from mmengine.utils import track_parallel_progress from opencompass.registry import RUNNERS, TASKS -from opencompass.utils import get_logger +from opencompass.utils import StrEnum, get_logger from .base import BaseRunner +class VolcStatus(StrEnum): + success = 'Success' + failed = 'Failed' + cancelled = 'Cancelled' + exception = 'Exception' + killing = 'Killing' + success_holding = 'SuccessHolding' + failed_holding = 'FailedHolding' + queue = 'Queue' + + @RUNNERS.register_module() class VOLCRunner(BaseRunner): """Distributed runner based on Volcano Cloud Cluster (VCC). 
It will launch @@ -121,13 +135,13 @@ def _launch(self, task_cfg: ConfigDict, random_sleep: bool = True): conda_env_name = self.volcano_cfg['conda_env_name'] - shell_cmd = (f'source {self.volcano_cfg["bashrc_path"]}; ' + shell_cmd = (f"source {self.volcano_cfg['bashrc_path']}; " f'source activate {conda_env_name}; ') shell_cmd += f'export PYTHONPATH={pwd}:$PYTHONPATH; ' else: assert self.volcano_cfg.get('python_env_path') is not None shell_cmd = ( - f'export PATH={self.volcano_cfg["python_env_path"]}/bin:$PATH; ' # noqa: E501 + f"export PATH={self.volcano_cfg['python_env_path']}/bin:$PATH; " # noqa: E501 f'export PYTHONPATH={pwd}:$PYTHONPATH; ') huggingface_cache = self.volcano_cfg.get('huggingface_cache') @@ -183,8 +197,6 @@ def _launch(self, task_cfg: ConfigDict, random_sleep: bool = True): retry = self.retry while True: - if random_sleep: - time.sleep(random.randint(0, 10)) task_status, returncode = self._run_task(cmd, out_path, poll_interval=20) @@ -192,6 +204,8 @@ def _launch(self, task_cfg: ConfigDict, random_sleep: bool = True): if not (self._job_failed(task_status, output_paths)) \ or retry <= 0: break + if random_sleep: + time.sleep(random.randint(0, 10)) retry -= 1 finally: @@ -209,6 +223,7 @@ def _run_task(self, cmd, log_path, poll_interval): shell=True, text=True, capture_output=True) + f = open(log_path, 'w') pattern = r'(?<=task_id=).*(?=\n\n)' match = re.search(pattern, result.stdout) if match: @@ -217,32 +232,66 @@ def _run_task(self, cmd, log_path, poll_interval): '--format Status' log_cmd = f'volc ml_task logs --task {task_id} --instance worker_0' while True: - task_status = os.popen(ask_cmd).read() - pattern = r'(?<=\[{"Status":").*(?="}\])' - match = re.search(pattern, task_status) - if match: - task_status = match.group() - else: - task_status = 'Exception' + ret = subprocess.run(ask_cmd, + shell=True, + text=True, + capture_output=True) + try: + task_status = json.loads(ret.stdout)[0]['Status'] + except JSONDecodeError: + print('The task is not 
yet in the queue for ' + f"{ret.stdout}, waiting...") + time.sleep(poll_interval) + continue + finally: + if task_status not in VolcStatus.__members__.values(): + warnings.warn( + f"Unrecognized task status: {task_status}. " + 'This might be due to a newer version of Volc. ' + 'Please report this issue to the OpenCompass.') + + if task_status != VolcStatus.queue: + # Record task status when jobs is in Queue + f.write(ret.stdout or ret.stderr) + f.flush() + time.sleep(poll_interval) + continue + if self.debug: print(task_status) - logs = os.popen(log_cmd).read() - with open(log_path, 'w', encoding='utf-8') as f: - f.write(logs) + + # TODO: volc log cmd is broken now, this should be double + # checked when log cli is fixed + ret = subprocess.run(log_cmd, + shell=True, + text=True, + capture_output=True) + + f.write(log_cmd) + f.write(ret.stdout) + f.flush() + time.sleep(poll_interval) + if task_status in [ - 'Success', 'Failed', 'Cancelled', 'Exception', - 'Killing', 'SuccessHolding', 'FailedHolding' + VolcStatus.success, + VolcStatus.success_holding, ]: break - time.sleep(poll_interval) + else: + time.sleep(poll_interval) + continue else: - task_status = 'Exception' + print(f"Failed to submit the task for:{result.stdout}") + task_status = VolcStatus.exception + f.write(f"{result.stdout}: {result.returncode}") + f.close() return task_status, result.returncode def _job_failed(self, task_status: str, output_paths: List[str]) -> bool: - return task_status != 'Success' or not all( - osp.exists(output_path) for output_path in output_paths) + return task_status not in [ + VolcStatus.success, VolcStatus.success_holding + ] or not all(osp.exists(output_path) for output_path in output_paths) def _choose_flavor(self, num_gpus): config_path = self.volcano_cfg.volcano_config_path diff --git a/opencompass/utils/__init__.py b/opencompass/utils/__init__.py index 0762b2d87..24ef9f5c6 100644 --- a/opencompass/utils/__init__.py +++ b/opencompass/utils/__init__.py @@ -5,6 +5,7 @@ from 
.datasets import * # noqa from .dependency import * # noqa from .dict_postprocessors import * # noqa +from .enum_extention import StrEnum # noqa from .file import * # noqa from .fileio import * # noqa from .lark import * # noqa From 9ba52c986783cab1fff3e984c940cc6813024fec Mon Sep 17 00:00:00 2001 From: HAOCHENYE <21724054@zju.edu.cn> Date: Fri, 1 Nov 2024 07:54:39 +0000 Subject: [PATCH 2/3] [dev] update utils --- opencompass/runners/volc.py | 8 +++---- opencompass/utils/enum_extention.py | 33 +++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) create mode 100644 opencompass/utils/enum_extention.py diff --git a/opencompass/runners/volc.py b/opencompass/runners/volc.py index bf1b8b3ef..5e9439e61 100644 --- a/opencompass/runners/volc.py +++ b/opencompass/runners/volc.py @@ -240,13 +240,13 @@ def _run_task(self, cmd, log_path, poll_interval): task_status = json.loads(ret.stdout)[0]['Status'] except JSONDecodeError: print('The task is not yet in the queue for ' - f"{ret.stdout}, waiting...") + f'{ret.stdout}, waiting...') time.sleep(poll_interval) continue finally: if task_status not in VolcStatus.__members__.values(): warnings.warn( - f"Unrecognized task status: {task_status}. " + f'Unrecognized task status: {task_status}. ' 'This might be due to a newer version of Volc. 
' 'Please report this issue to the OpenCompass.') @@ -281,9 +281,9 @@ def _run_task(self, cmd, log_path, poll_interval): time.sleep(poll_interval) continue else: - print(f"Failed to submit the task for:{result.stdout}") + print(f'Failed to submit the task for:{result.stdout}') task_status = VolcStatus.exception - f.write(f"{result.stdout}: {result.returncode}") + f.write(f'{result.stdout}: {result.returncode}') f.close() return task_status, result.returncode diff --git a/opencompass/utils/enum_extention.py b/opencompass/utils/enum_extention.py new file mode 100644 index 000000000..6fcde7790 --- /dev/null +++ b/opencompass/utils/enum_extention.py @@ -0,0 +1,33 @@ +# Copied from the source code of `enum.StrEnum` to support Python<3.11 +from enum import Enum + + +class StrEnum(str, Enum): + """Enum where members are also (and must be) strings.""" + + def __new__(cls, *values): + """values must already be of type `str`""" + if len(values) > 3: + raise TypeError('too many arguments for str(): %r' % (values, )) + if len(values) == 1: + # it must be a string + if not isinstance(values[0], str): + raise TypeError('%r is not a string' % (values[0], )) + if len(values) >= 2: + # check that encoding argument is a string + if not isinstance(values[1], str): + raise TypeError('encoding must be a string, not %r' % + (values[1], )) + if len(values) == 3: + # check that errors argument is a string + if not isinstance(values[2], str): + raise TypeError('errors must be a string, not %r' % + (values[2])) + value = str(*values) + member = str.__new__(cls, value) + member._value_ = value + return member + + def _generate_next_value_(name, start, count, last_values): + """Return the lower-cased version of the member name.""" + return name.lower() From 56d33466e7414e1d662d4014dd7d3ea3f3052a54 Mon Sep 17 00:00:00 2001 From: HAOCHENYE <21724054@zju.edu.cn> Date: Fri, 1 Nov 2024 08:22:41 +0000 Subject: [PATCH 3/3] [dev] update get task_status --- opencompass/runners/volc.py | 3 ++- 1 file 
changed, 2 insertions(+), 1 deletion(-) diff --git a/opencompass/runners/volc.py b/opencompass/runners/volc.py index 5e9439e61..5381b2aaa 100644 --- a/opencompass/runners/volc.py +++ b/opencompass/runners/volc.py @@ -237,7 +237,8 @@ def _run_task(self, cmd, log_path, poll_interval): text=True, capture_output=True) try: - task_status = json.loads(ret.stdout)[0]['Status'] + task_status = json.loads( + ret.stdout.split()[-1])[0]['Status'] except JSONDecodeError: print('The task is not yet in the queue for ' f'{ret.stdout}, waiting...')