Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the anyio compatibility framework, so downstream integrations can use asyncio or trio? #1423

Open
Zac-HD opened this issue Feb 27, 2025 · 10 comments

Comments

@Zac-HD
Copy link

Zac-HD commented Feb 27, 2025

Over at Anthropic, we're enthusiastic about evals, and often use Inspect. My only complaint about this is that because Trio is our async framework of choice, and Inspect is built directly on asyncio, there's an awkward incompatibility in the underlying async runtime.

Fortunately, this is fixable: the anyio framework offers a very nice structured-concurrency interface (think asyncio.TaskGroup, or Trio-style), which can run seamlessly on top of either asyncio or Trio backends. In fact, anyio is already so widely used - for example, by libraries like httpx - that you're already (transitively) depending on it!

So... would you be interested in accepting PRs to incrementally migrate onto anyio? There'd be no change for asyncio users, but those of us on Trio would have a much easier time integrating Inspect into our workflows.

@jjallaire-aisi
Copy link
Collaborator

Yes, we'd definitely be down. This could actually dovetail very well with some work we are about to take on to improve our scheduler. My biggest concern is around user asyncio code continuing to work, but I am pretty sure this will be fine. To confirm my understanding of how this would go:

  1. We move from asyncio to anyio for all of our async code inside Inspect.

  2. The default backend of anyio is asyncio which means existing 3rd party solvers, tools, sandbox providers, etc. that call asycio directly will continue to function as-is.

  3. When Anthropic uses Inspect you will set the anyio backend to Trio -- this means that for Anthropic third-party Inspect code that uses asyncio directly won't work (but that's not a big deal)

  4. Third party Inspect code can (and should) update to use anyio so that it is compatible with the Trio backend.

Does all of that sound right?

So we're up for PRs but also would want to lean in and work directly on making this happen. I propose that I take a close look at this early-to-middle of next week and then ping you back to discuss particulars?

@Zac-HD
Copy link
Author

Zac-HD commented Feb 28, 2025

Yep, that all sounds right to me! Happy to chat about this whenever 😁

@jjallaire
Copy link
Collaborator

One not critical but possibly relevant issue I've discovered is that our terminal UI framework (textual) uses asycio. Use of textual is optional in Inspect (it is used for display=full but not for display=rich and on down) so this isn't a showstopper. Nevertheless, it might be useful to see if textual would also consider using anyio (as its use of asyncio is quite minimal to begin with).

@Zac-HD
Copy link
Author

Zac-HD commented Mar 2, 2025

Looks like this was discussed a while ago, and we can probably work on Textual support in parallel with Inspect 🙂

@jjallaire-aisi
Copy link
Collaborator

Have started moving things to anyio and it's going along well. As I transition from uses of asyncio.gather() (which I now clearly understand the weaknesses of!) I am wanting to make sure I write idiomatic anyio code. For example, in the inspect score command we currently run the scorers in parallel using asyncio.gather(), failing the entire operation if even one of the scorers throws an exception:

tasks: list[Awaitable[dict[str, SampleScore]]] = [
    run_score_task(state, Target(sample.target), scorers, progress)
    for (sample, state) in zip(log.samples, states)
]

scores = await asyncio.gather(*tasks)

To port this to anyio I created a tg_collect_or_raise() function used in place of gather:

scores = await tg_collect_or_raise(tasks)

Here's the implementation:

async def tg_collect_or_raise(coros: list[Awaitable[T]]) -> list[T]:
    """Runs all of the passed coroutines collecting their results.

    If an exception occurs in any of the tasks then the other tasks
    are cancelled and the exception is raised.

    Args:
       coros: List of coroutines

    Returns:
       List of results if no exceptions occurred.

    Raises:
       Exception: The first exception occurring in any of the coroutines.
    """
    results: list[tuple[int, T]] = []
    first_exception: Exception | None = None

    async with anyio.create_task_group() as tg:

        async def run_task(task: Awaitable[T], index: int) -> None:
            nonlocal first_exception
            try:
                result = await task
                results.append((index, result))
            except Exception as exc:
                if first_exception is None:
                    first_exception = exc
                tg.cancel_scope.cancel()

        for i, coro in enumerate(coros):
            tg.start_soon(run_task, coro, i)

    if first_exception:
        raise first_exception

    # sort results by original index and return just the values
    return [r for _, r in sorted(results)]

There are also more simplistic cases e.g. printing errors. For Docker container cleanup we used to do this:

tasks = [cleanup_fn(project, False) for project in projects]
results = await asyncio.gather(*tasks, return_exceptions=True)

# report errors
for result in results:
    if result is not None:
        print(f"Error cleaning up Docker environment: {result}")

And now we do this:

tasks = [cleanup_fn(project, False) for project in projects]
async with anyio.create_task_group() as tg:
    for task_coro in tasks:
        tg.start_soon(print_exceptions, task_coro, "cleaning up Docker environment")

async def print_exceptions(coro: Awaitable[T], context: str) -> None:
    try:
        await coro
    except Exception as ex:
        print(f"Error {context}: {ex}")

LMK if we are on the right track here....

@Zac-HD
Copy link
Author

Zac-HD commented Mar 3, 2025

Definitely on the right track! A few points of idiom, which might help in future:

  • With anyio or trio, coroutine objects are an implementation detail - you never want to call an async function without immediately awaiting it (or async for-ing, or async with-ing). That's why TaskGroup.start[_soon] accepts a callable; it avoids the possibility of beginning some work that isn't attached to the structure of nested taskgroups. Expect to use functools.partial() when you reach for something more complicated than a await map(...).
  • first_exception feels like a trap (because it hides other maybe-consequential error); I'd be inclined to just allow the taskgroup to raise an ExceptionGroup with however many errors you got. Note that the "cancel siblings on first failure" behavior is automatic! On the other hand making users handle exception groups is necessary once they're serious about async, but can be challenging until then so maybe throwing away this information is for the best in some cases. (I'll recommend groups in my upcoming PyCon talk, but the ecosystem is pretty immature on this point so 🤷‍♂)
  • results: list[tuple[int, T]] = [] does seem appropriate here, but in general be eager to reach for create_memory_object_stream() and the async with / async for syntax for using them.
  • I'd probably define print_exceptions(context: str, afn: Callable[P, Awaitable[Any]], *args: Unpack[P.args]) and then pass that to start_soon. Or just allow an ExceptionGroup to be raised, and then handle that.

@jjallaire
Copy link
Collaborator

jjallaire commented Mar 4, 2025

Incredibly helpful! Thank you for your patience :-)

Got it re: never creating an async function w/o managing it. I've re-written print_exceptions as:

async def print_exceptions(
    context: str,
    func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
    *args: Unpack[PosArgsT],
) -> None:
    try:
        await func(*args)
    except Exception as ex:
        print(f"Error {context}: {ex}")

But I kind of agree with you that the right call is probably just catching and printing the ExceptionGroup (less obfucating).

I have a new tg_collect() that makes the ExceptionGroup optional (it does seem like some investment on the calling side is required to handle these properly so in the short term absence of that I am defaulting to not raising the group):

async def tg_collect(
    funcs: list[Callable[[], Awaitable[T]]], exception_group: bool = False
) -> list[T]:
    """Runs all of the pased async functions and collects their results.

    The results will be returned in the same order as the input `funcs`.

    Args:
       funcs: List of async functions.
       exception_group: `True` to raise an ExceptionGroup or
          `False` (the default) to raise only the first exception.

    Returns:
       List of results of type T.

    Raises:
       Exception: First exception occurring in failed tasks
          (for `exception_group == False`, the default)
       ExceptionGroup: Exceptions that occurred in failed tasks
         (for `exception_group == True`)
    """
    try:
        results: list[tuple[int, T]] = []

        async with anyio.create_task_group() as tg:

            async def run_task(index: int) -> None:
                result = await funcs[index]()
                results.append((index, result))

            for i in range(0, len(funcs)):
                tg.start_soon(run_task, i)

        # sort results by original index and return just the values
        return [r for _, r in sorted(results)]
    except ExceptionGroup as ex:
        if exception_group:
            raise
        else:
            raise ex.exceptions[0]

Order matters in some of the call sites (thus results: list[tuple[int, T]]) but I see that when order doesn't matter we should just lean into create_memory_object_stream() (unless there is something I'm missing and it's straightforward to preserve an arbitrary order for streams).

Thanks again for you time and input here. Hoping to not just "make it work" but rather do everything the way it ought to be done for structured concurrency.

@jjallaire
Copy link
Collaborator

Update: Things continue to go well and streams are indeed great for many of our scenarios.

One thing we've identified which might require additional work is our interface to S3. We use s3fs which both makes use of asyncio as well as some really weird idioms for running async code from sync contexts. We could certainly have an initial limitation that s3 logging doesn't work w/ the Trio back end but I'm imagining you all do need s3?

I think the path to remedying this is not too bad -- an httpx client that uses botocore for auth/signing wouldn't be terribly hard to build. There is a bunch of fancy footwork in s3fs for handing files > 5gb (multipart uploads are required for that) and of course the usual retry stuff. Let us know if this is an immediate requirement as well as if you might have inclination to work on this. Our log client doesn't do that much (push and pull files, make and list directories, etc.).

This is something I think we will want to do anyway in the normal course of things so if it's not on your immediate wish list then be assured we'll probably get to it in the next few months anyway.

@Zac-HD
Copy link
Author

Zac-HD commented Mar 5, 2025

it does seem like some investment on the calling side is required to handle these properly so in the short term absence of that I am defaulting to not raising the group

Good call on both halves of this IMO; it's worth doing eventually but probably not right now.

Order matters in some of the call sites (thus results: list[tuple[int, T]]) but I see that when order doesn't matter we should just lean into create_memory_object_stream() (unless there is something I'm missing and it's straightforward to preserve an arbitrary order for streams).

I've written effectively-identical helpers, sometimes lists (or dicts) are just what you want!

I think about create_memory_object_stream() as being more about the streaming part than ordering; it's the right tool for the job when you want to produce something in one task (or many) and async for ... over results in another (or others). The .clone() method is very useful for multi-producer or multi-consumer setups, because it makes clean shutdown as easy as 1:1 cases.

If ordering matters I'd usually just have one producer task; if I had concurrent producers and also cared about order and needed streaming (so the list[tuple[int, T]] trick wasn't suitable), I'd do something like:

async def stream_to_ordered(
    send_stream: MemoryObjectSendStream[T],
    recv_stream: MemoryObjectReceiveStream[tuple[int, T]],
) -> None:
    next_idx = 0
    buffer: dict[int, T] = {}
    async with send_stream, recv_stream:
        async for idx, value in recv_stream:
            buffer[idx] = value
            while next_idx in buffer:
                await send_stream.send(buffer.pop(next_idx))
                next_idx += 1

One thing we've identified which might require additional work is our interface to S3. We use s3fs which both makes use of asyncio as well as some really weird idioms for running async code from sync contexts. We could certainly have an initial limitation that s3 logging doesn't work w/ the Trio back end but I'm imagining you all do need s3? ... This is something I think we will want to do anyway in the normal course of things so if it's not on your immediate wish list then be assured we'll probably get to it in the next few months anyway.

I'm inclined to leave this until later on the roadmap, but s3fs definitely won't work with Trio - that async-in-sync thing violates SC pretty badly. Various people keep asking for aws client libraries to use anyio, but at time of writing the SOTA approach is... to_thread.run_sync(whatever). I don't love this but we haven't had any trouble with it.

@jjallaire
Copy link
Collaborator

Okay, we've got this integrated now (on main, not yet on PyPI). Docs here: https://inspect.ai-safety-institute.org.uk/parallelism.html#async-backends

The S3 limitation we will leave for now but will come back to it soon. Hopefully we get also some engagement on the textual front.

LMK how you get on with this and if there are other things we need to button down

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants