From 2683f2e027c980b1b532e7ee72a917957f464ded Mon Sep 17 00:00:00 2001 From: Chengjie Li <109656400+ChengjieLi28@users.noreply.github.com> Date: Fri, 8 Sep 2023 15:15:24 +0800 Subject: [PATCH] FEAT: Add / remove a sub pool (#54) Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .../xoscar/backends/communication/socket.py | 3 +- python/xoscar/backends/config.py | 5 + python/xoscar/backends/indigen/pool.py | 119 +++++++++++++++++- .../backends/indigen/tests/test_pool.py | 96 +++++++++++++- python/xoscar/backends/test/pool.py | 65 ++++++++++ 5 files changed, 285 insertions(+), 3 deletions(-) diff --git a/python/xoscar/backends/communication/socket.py b/python/xoscar/backends/communication/socket.py index e068e27a..5c2c5a26 100644 --- a/python/xoscar/backends/communication/socket.py +++ b/python/xoscar/backends/communication/socket.py @@ -96,7 +96,8 @@ async def close(self): self.writer.close() try: await self.writer.wait_closed() - except ConnectionResetError: # pragma: no cover + # TODO: May raise Runtime error: attach to another event loop + except (ConnectionResetError, RuntimeError): # pragma: no cover pass @property diff --git a/python/xoscar/backends/config.py b/python/xoscar/backends/config.py index c6138260..78afa25b 100644 --- a/python/xoscar/backends/config.py +++ b/python/xoscar/backends/config.py @@ -70,6 +70,11 @@ def add_pool_conf( for addr in external_address: mapping[addr] = internal_address + def remove_pool_config(self, process_index: int): + addr = self.get_external_address(process_index) + del self._conf["pools"][process_index] + del self._conf["mapping"][addr] + def get_pool_config(self, process_index: int): return self._conf["pools"][process_index] diff --git a/python/xoscar/backends/indigen/pool.py b/python/xoscar/backends/indigen/pool.py index 0021d3b9..ae34a899 100644 --- a/python/xoscar/backends/indigen/pool.py +++ b/python/xoscar/backends/indigen/pool.py @@ -35,7 +35,12 @@ from ..._utils import reset_id_random_seed from ...utils import dataslots, ensure_coverage from ..config import ActorPoolConfig -from ..message import CreateActorMessage +from ..message import ( + ControlMessage, + ControlMessageType, + CreateActorMessage, + new_message_id, +) from ..pool import MainActorPoolBase, SubActorPoolBase, _register_message_handler _is_windows: bool = sys.platform.startswith("win") @@ -284,6 +289,118 @@ async def _create_sub_pool( status_queue.put(process_status) await pool.join() + @classmethod + def _append_sub_pool( + cls, + config: ActorPoolConfig, + _process_index: int, + _status_queue: multiprocessing.Queue, + ): + coro = cls._create_sub_pool(config, _process_index, _status_queue) + asyncio.run(coro) + + async def append_sub_pool( + self, + label: str | None = None, + internal_address: str | None = None, + external_address: str | None = None, + env: dict | None = None, + modules: list[str] | None = None, + suspend_sigint: bool | None = None, + use_uvloop: bool | None = None, + logging_conf: dict | None = None, + start_method: str | None = None, + kwargs: dict | None = None, + ): + external_address = ( + external_address + or MainActorPool.get_external_addresses(self.external_address, n_process=1)[ + -1 + ] + ) + + # use last process index's logging_conf and use_uv_loop config if not provide + actor_pool_config = self._config.as_dict() + last_process_index = self._config.get_process_indexes()[-1] + last_logging_conf = actor_pool_config["pools"][last_process_index][ + "logging_conf" + ] + last_use_uv_loop = actor_pool_config["pools"][last_process_index]["use_uvloop"] + _logging_conf = logging_conf or last_logging_conf + _use_uv_loop = use_uvloop if use_uvloop is not None else last_use_uv_loop + + process_index = next(MainActorPool.process_index_gen(external_address)) + internal_address = internal_address or MainActorPool.gen_internal_address( + process_index, external_address + ) + + self._config.add_pool_conf( + process_index, + label, + internal_address, + external_address, + env, + modules, + suspend_sigint, + _use_uv_loop, + _logging_conf, + kwargs, + ) + + def start_pool_in_process(): + ctx = multiprocessing.get_context(method=start_method) + status_queue = ctx.Queue() + + with _suspend_init_main(): + process = ctx.Process( + target=MainActorPool._append_sub_pool, + args=(self._config, process_index, status_queue), + name=f"IndigenActorPool{process_index}", + ) + process.daemon = True + process.start() + + # wait for sub actor pool to finish starting + process_status = status_queue.get() + return process, process_status + + loop = asyncio.get_running_loop() + with futures.ThreadPoolExecutor(1) as executor: + create_pool_task = loop.run_in_executor(executor, start_pool_in_process) + process, process_status = await create_pool_task + + self._config.reset_pool_external_address( + process_index, process_status.external_addresses[0] + ) + self.attach_sub_process(process_status.external_addresses[0], process) + + control_message = ControlMessage( + message_id=new_message_id(), + address=self.external_address, + control_message_type=ControlMessageType.sync_config, + content=self._config, + ) + await self.handle_control_command(control_message) + + return process_status.external_addresses[0] + + async def remove_sub_pool( + self, external_address: str, timeout: float | None = None, force: bool = False + ): + process = self.sub_processes[external_address] + process_index = self._config.get_process_index(external_address) + await self.stop_sub_pool(external_address, process, timeout, force) + del self.sub_processes[external_address] + self._config.remove_pool_config(process_index) + + control_message = ControlMessage( + message_id=new_message_id(), + address=self.external_address, + control_message_type=ControlMessageType.sync_config, + content=self._config, + ) + await self.handle_control_command(control_message) + async def kill_sub_pool( self, process: multiprocessing.Process, force: bool = False ): diff --git a/python/xoscar/backends/indigen/tests/test_pool.py b/python/xoscar/backends/indigen/tests/test_pool.py index 91330f67..1d82e41e 100644 --- a/python/xoscar/backends/indigen/tests/test_pool.py +++ b/python/xoscar/backends/indigen/tests/test_pool.py @@ -25,7 +25,7 @@ import pytest -from .... import Actor, create_actor_ref, kill_actor +from .... import Actor, create_actor, create_actor_ref, get_pool_config, kill_actor from ....context import get_context from ....errors import ActorNotExist, NoIdleSlot, SendMessageFailed, ServerClosed from ....tests.core import require_ucx @@ -54,6 +54,7 @@ ) from ...pool import create_actor_pool from ...router import Router +from ...test.pool import TestMainActorPool from ..pool import MainActorPool, SubActorPool @@ -974,3 +975,96 @@ async def test_ucx(enable_internal_addr: bool): allocate_strategy=ProcessIndex(1), ) assert await ref1.foo(ref2, 3) == 6 + + +@pytest.mark.asyncio +async def test_append_sub_pool(): + start_method = ( + os.environ.get("POOL_START_METHOD", "forkserver") + if sys.platform != "win32" + else None + ) + pool = await create_actor_pool( # type: ignore + "127.0.0.1", + pool_cls=MainActorPool, + n_process=2, + subprocess_start_method=start_method, + ) + + async with pool: + config = await get_pool_config(pool.external_address) + assert len(config.get_process_indexes()) == 3 + + # test add a new sub pool + sub_external_address = await pool.append_sub_pool(env={"foo": "bar"}) + assert sub_external_address is not None + + config = await get_pool_config(pool.external_address) + assert len(config.get_process_indexes()) == 4 + process_index = config.get_process_indexes()[-1] + sub_config = config.get_pool_config(process_index) + assert sub_config["external_address"][0] == sub_external_address + assert sub_config["env"] is not None + assert sub_config["env"].get("foo", None) == "bar" + + class DummyActor(Actor): + @staticmethod + def test(): + return "this is dummy!" + + ref = await create_actor(DummyActor, address=sub_external_address) + assert ref is not None + assert ref.address == sub_external_address + assert await ref.test() == "this is dummy!" + + # test remove + await pool.remove_sub_pool(sub_external_address) + config = await get_pool_config(pool.external_address) + assert len(config.get_process_indexes()) == 3 + assert process_index not in config.get_process_indexes() + with pytest.raises(KeyError): + config.get_pool_config(process_index) + with pytest.raises(Exception): + await ref.test() + + +@pytest.mark.asyncio +async def test_test_pool_append_sub_pool(): + pool = await create_actor_pool( # type: ignore + f"test://127.0.0.1:{get_next_port()}", pool_cls=TestMainActorPool, n_process=1 + ) + + async with pool: + config = await get_pool_config(pool.external_address) + assert len(config.get_process_indexes()) == 2 + + # test add a new sub pool + sub_external_address = await pool.append_sub_pool(env={"foo": "bar"}) + assert sub_external_address is not None + + config = await get_pool_config(pool.external_address) + assert len(config.get_process_indexes()) == 3 + process_index = config.get_process_indexes()[-1] + sub_config = config.get_pool_config(process_index) + assert sub_config["external_address"][0] == sub_external_address + assert sub_config["env"] is not None + assert sub_config["env"].get("foo", None) == "bar" + + class DummyActor(Actor): + @staticmethod + def test(): + return "this is dummy!" + + ref = await create_actor(DummyActor, address=sub_external_address) + assert ref is not None + + assert ref.address == sub_external_address + assert await ref.test() == "this is dummy!" + + # test remove + await pool.remove_sub_pool(sub_external_address) + config = await get_pool_config(pool.external_address) + assert len(config.get_process_indexes()) == 2 + assert process_index not in config.get_process_indexes() + with pytest.raises(KeyError): + config.get_pool_config(process_index) diff --git a/python/xoscar/backends/test/pool.py b/python/xoscar/backends/test/pool.py index dd4fb4e9..d0892eab 100644 --- a/python/xoscar/backends/test/pool.py +++ b/python/xoscar/backends/test/pool.py @@ -22,6 +22,7 @@ from ..communication import DummyServer, gen_local_address from ..config import ActorPoolConfig from ..indigen.pool import MainActorPool, SubActorPool, SubpoolStatus +from ..message import ControlMessage, ControlMessageType, new_message_id from ..pool import ActorPoolType @@ -91,6 +92,70 @@ def _sync_pool_config(self, actor_pool_config: ActorPoolConfig): # test pool does not create routers, thus can skip this step pass + async def append_sub_pool( + self, + label: str | None = None, + internal_address: str | None = None, + external_address: str | None = None, + env: dict | None = None, + modules: list[str] | None = None, + suspend_sigint: bool | None = None, + use_uvloop: bool | None = None, + logging_conf: dict | None = None, + start_method: str | None = None, + kwargs: dict | None = None, + ): + external_address = ( + external_address + or TestMainActorPool.get_external_addresses( + self.external_address, n_process=1 + )[-1] + ) + + # use last process index's logging_conf and use_uv_loop config if not provide + actor_pool_config = self._config.as_dict() + last_process_index = self._config.get_process_indexes()[-1] + last_logging_conf = actor_pool_config["pools"][last_process_index][ + "logging_conf" + ] + last_use_uv_loop = actor_pool_config["pools"][last_process_index]["use_uvloop"] + _logging_conf = logging_conf or last_logging_conf + _use_uv_loop = use_uvloop if use_uvloop is not None else last_use_uv_loop + + process_index = next(TestMainActorPool.process_index_gen(external_address)) + internal_address = internal_address or TestMainActorPool.gen_internal_address( + process_index, external_address + ) + + self._config.add_pool_conf( + process_index, + label, + internal_address, + external_address, + env, + modules, + suspend_sigint, + _use_uv_loop, + _logging_conf, + kwargs, + ) + pool_task = asyncio.create_task( + TestMainActorPool.start_sub_pool(self._config, process_index) + ) + tasks, addresses = await TestMainActorPool.wait_sub_pools_ready([pool_task]) + + self.attach_sub_process(addresses[0][0], tasks[0]) + + control_message = ControlMessage( + message_id=new_message_id(), + address=self.external_address, + control_message_type=ControlMessageType.sync_config, + content=self._config, + ) + await self.handle_control_command(control_message) + + return addresses[0][0] + async def kill_sub_pool( self, process: multiprocessing.Process, force: bool = False ):