Skip to content

Commit

Permalink
FEAT: Add / remove a sub pool (#54)
Browse files Browse the repository at this point in the history
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
ChengjieLi28 and mergify[bot] authored Sep 8, 2023
1 parent b153a14 commit 2683f2e
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 3 deletions.
3 changes: 2 additions & 1 deletion python/xoscar/backends/communication/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ async def close(self):
self.writer.close()
try:
await self.writer.wait_closed()
except ConnectionResetError: # pragma: no cover
# TODO: May raise Runtime error: attach to another event loop
except (ConnectionResetError, RuntimeError): # pragma: no cover
pass

@property
Expand Down
5 changes: 5 additions & 0 deletions python/xoscar/backends/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ def add_pool_conf(
for addr in external_address:
mapping[addr] = internal_address

def remove_pool_config(self, process_index: int):
addr = self.get_external_address(process_index)
del self._conf["pools"][process_index]
del self._conf["mapping"][addr]

def get_pool_config(self, process_index: int):
return self._conf["pools"][process_index]

Expand Down
119 changes: 118 additions & 1 deletion python/xoscar/backends/indigen/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@
from ..._utils import reset_id_random_seed
from ...utils import dataslots, ensure_coverage
from ..config import ActorPoolConfig
from ..message import CreateActorMessage
from ..message import (
ControlMessage,
ControlMessageType,
CreateActorMessage,
new_message_id,
)
from ..pool import MainActorPoolBase, SubActorPoolBase, _register_message_handler

_is_windows: bool = sys.platform.startswith("win")
Expand Down Expand Up @@ -284,6 +289,118 @@ async def _create_sub_pool(
status_queue.put(process_status)
await pool.join()

@classmethod
def _append_sub_pool(
cls,
config: ActorPoolConfig,
_process_index: int,
_status_queue: multiprocessing.Queue,
):
coro = cls._create_sub_pool(config, _process_index, _status_queue)
asyncio.run(coro)

async def append_sub_pool(
self,
label: str | None = None,
internal_address: str | None = None,
external_address: str | None = None,
env: dict | None = None,
modules: list[str] | None = None,
suspend_sigint: bool | None = None,
use_uvloop: bool | None = None,
logging_conf: dict | None = None,
start_method: str | None = None,
kwargs: dict | None = None,
):
external_address = (
external_address
or MainActorPool.get_external_addresses(self.external_address, n_process=1)[
-1
]
)

# use last process index's logging_conf and use_uv_loop config if not provide
actor_pool_config = self._config.as_dict()
last_process_index = self._config.get_process_indexes()[-1]
last_logging_conf = actor_pool_config["pools"][last_process_index][
"logging_conf"
]
last_use_uv_loop = actor_pool_config["pools"][last_process_index]["use_uvloop"]
_logging_conf = logging_conf or last_logging_conf
_use_uv_loop = use_uvloop if use_uvloop is not None else last_use_uv_loop

process_index = next(MainActorPool.process_index_gen(external_address))
internal_address = internal_address or MainActorPool.gen_internal_address(
process_index, external_address
)

self._config.add_pool_conf(
process_index,
label,
internal_address,
external_address,
env,
modules,
suspend_sigint,
_use_uv_loop,
_logging_conf,
kwargs,
)

def start_pool_in_process():
ctx = multiprocessing.get_context(method=start_method)
status_queue = ctx.Queue()

with _suspend_init_main():
process = ctx.Process(
target=MainActorPool._append_sub_pool,
args=(self._config, process_index, status_queue),
name=f"IndigenActorPool{process_index}",
)
process.daemon = True
process.start()

# wait for sub actor pool to finish starting
process_status = status_queue.get()
return process, process_status

loop = asyncio.get_running_loop()
with futures.ThreadPoolExecutor(1) as executor:
create_pool_task = loop.run_in_executor(executor, start_pool_in_process)
process, process_status = await create_pool_task

self._config.reset_pool_external_address(
process_index, process_status.external_addresses[0]
)
self.attach_sub_process(process_status.external_addresses[0], process)

control_message = ControlMessage(
message_id=new_message_id(),
address=self.external_address,
control_message_type=ControlMessageType.sync_config,
content=self._config,
)
await self.handle_control_command(control_message)

return process_status.external_addresses[0]

async def remove_sub_pool(
self, external_address: str, timeout: float | None = None, force: bool = False
):
process = self.sub_processes[external_address]
process_index = self._config.get_process_index(external_address)
await self.stop_sub_pool(external_address, process, timeout, force)
del self.sub_processes[external_address]
self._config.remove_pool_config(process_index)

control_message = ControlMessage(
message_id=new_message_id(),
address=self.external_address,
control_message_type=ControlMessageType.sync_config,
content=self._config,
)
await self.handle_control_command(control_message)

async def kill_sub_pool(
self, process: multiprocessing.Process, force: bool = False
):
Expand Down
96 changes: 95 additions & 1 deletion python/xoscar/backends/indigen/tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import pytest

from .... import Actor, create_actor_ref, kill_actor
from .... import Actor, create_actor, create_actor_ref, get_pool_config, kill_actor
from ....context import get_context
from ....errors import ActorNotExist, NoIdleSlot, SendMessageFailed, ServerClosed
from ....tests.core import require_ucx
Expand Down Expand Up @@ -54,6 +54,7 @@
)
from ...pool import create_actor_pool
from ...router import Router
from ...test.pool import TestMainActorPool
from ..pool import MainActorPool, SubActorPool


Expand Down Expand Up @@ -974,3 +975,96 @@ async def test_ucx(enable_internal_addr: bool):
allocate_strategy=ProcessIndex(1),
)
assert await ref1.foo(ref2, 3) == 6


@pytest.mark.asyncio
async def test_append_sub_pool():
start_method = (
os.environ.get("POOL_START_METHOD", "forkserver")
if sys.platform != "win32"
else None
)
pool = await create_actor_pool( # type: ignore
"127.0.0.1",
pool_cls=MainActorPool,
n_process=2,
subprocess_start_method=start_method,
)

async with pool:
config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 3

# test add a new sub pool
sub_external_address = await pool.append_sub_pool(env={"foo": "bar"})
assert sub_external_address is not None

config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 4
process_index = config.get_process_indexes()[-1]
sub_config = config.get_pool_config(process_index)
assert sub_config["external_address"][0] == sub_external_address
assert sub_config["env"] is not None
assert sub_config["env"].get("foo", None) == "bar"

class DummyActor(Actor):
@staticmethod
def test():
return "this is dummy!"

ref = await create_actor(DummyActor, address=sub_external_address)
assert ref is not None
assert ref.address == sub_external_address
assert await ref.test() == "this is dummy!"

# test remove
await pool.remove_sub_pool(sub_external_address)
config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 3
assert process_index not in config.get_process_indexes()
with pytest.raises(KeyError):
config.get_pool_config(process_index)
with pytest.raises(Exception):
await ref.test()


@pytest.mark.asyncio
async def test_test_pool_append_sub_pool():
pool = await create_actor_pool( # type: ignore
f"test://127.0.0.1:{get_next_port()}", pool_cls=TestMainActorPool, n_process=1
)

async with pool:
config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 2

# test add a new sub pool
sub_external_address = await pool.append_sub_pool(env={"foo": "bar"})
assert sub_external_address is not None

config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 3
process_index = config.get_process_indexes()[-1]
sub_config = config.get_pool_config(process_index)
assert sub_config["external_address"][0] == sub_external_address
assert sub_config["env"] is not None
assert sub_config["env"].get("foo", None) == "bar"

class DummyActor(Actor):
@staticmethod
def test():
return "this is dummy!"

ref = await create_actor(DummyActor, address=sub_external_address)
assert ref is not None

assert ref.address == sub_external_address
assert await ref.test() == "this is dummy!"

# test remove
await pool.remove_sub_pool(sub_external_address)
config = await get_pool_config(pool.external_address)
assert len(config.get_process_indexes()) == 2
assert process_index not in config.get_process_indexes()
with pytest.raises(KeyError):
config.get_pool_config(process_index)
65 changes: 65 additions & 0 deletions python/xoscar/backends/test/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ..communication import DummyServer, gen_local_address
from ..config import ActorPoolConfig
from ..indigen.pool import MainActorPool, SubActorPool, SubpoolStatus
from ..message import ControlMessage, ControlMessageType, new_message_id
from ..pool import ActorPoolType


Expand Down Expand Up @@ -91,6 +92,70 @@ def _sync_pool_config(self, actor_pool_config: ActorPoolConfig):
# test pool does not create routers, thus can skip this step
pass

async def append_sub_pool(
self,
label: str | None = None,
internal_address: str | None = None,
external_address: str | None = None,
env: dict | None = None,
modules: list[str] | None = None,
suspend_sigint: bool | None = None,
use_uvloop: bool | None = None,
logging_conf: dict | None = None,
start_method: str | None = None,
kwargs: dict | None = None,
):
external_address = (
external_address
or TestMainActorPool.get_external_addresses(
self.external_address, n_process=1
)[-1]
)

# use last process index's logging_conf and use_uv_loop config if not provide
actor_pool_config = self._config.as_dict()
last_process_index = self._config.get_process_indexes()[-1]
last_logging_conf = actor_pool_config["pools"][last_process_index][
"logging_conf"
]
last_use_uv_loop = actor_pool_config["pools"][last_process_index]["use_uvloop"]
_logging_conf = logging_conf or last_logging_conf
_use_uv_loop = use_uvloop if use_uvloop is not None else last_use_uv_loop

process_index = next(TestMainActorPool.process_index_gen(external_address))
internal_address = internal_address or TestMainActorPool.gen_internal_address(
process_index, external_address
)

self._config.add_pool_conf(
process_index,
label,
internal_address,
external_address,
env,
modules,
suspend_sigint,
_use_uv_loop,
_logging_conf,
kwargs,
)
pool_task = asyncio.create_task(
TestMainActorPool.start_sub_pool(self._config, process_index)
)
tasks, addresses = await TestMainActorPool.wait_sub_pools_ready([pool_task])

self.attach_sub_process(addresses[0][0], tasks[0])

control_message = ControlMessage(
message_id=new_message_id(),
address=self.external_address,
control_message_type=ControlMessageType.sync_config,
content=self._config,
)
await self.handle_control_command(control_message)

return addresses[0][0]

async def kill_sub_pool(
self, process: multiprocessing.Process, force: bool = False
):
Expand Down

0 comments on commit 2683f2e

Please sign in to comment.