Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RuntimeError: Event loop is closed, when running through Flask[async] #1168

Open
nioncode opened this issue Oct 1, 2024 · 13 comments
Open
Assignees
Labels
priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@nioncode
Copy link

nioncode commented Oct 1, 2024

Bug Description

After seeing that gevent is not supported (#969), we started to port our app from flask to quart. At the moment, we successfully ported some endpoints to quart, but still have others that are kept in flask. To have a similar code structure, we now run flask with async views (using flask[async] as dependency) so that we can run the same async database driver etc. in flask and quart and don't have to maintain sync and async engines.

After the app was idle for some time, we could not establish new connections to the database, since the event loop is closed (see stacktrace below). After restarting the running container, connections can successfully be established again.

From what I understood, flask[async] creates a new event loop for each request, i.e. the event loop of flask gets stopped after every request. That's why we use the NullPool of sqlalchemy so that connections are not shared and new connections are established for each request (which is ok performance-wise for those endpoints that are still running in flask), since you cannot share connections across different event loops. This seems to work fine, since a new connection is created instead of re-using an existing one, so it is probably not the source of the issue.
Further, cloud-sql-python-connector seems to spin up its own thread + event loop, which is used for internal processing, so this is probably not the issue either.

What does NOT spin up its own event loop, is the aiohttp.ClientSession that gets lazily initialized during connector.connect_async and seems to use the loop that is currently running whenever the first db connection gets established. What I think is happening here is that this event loop gets closed at some point and then the CientSession can not perform any requests anymore. What I don't understand though is why the event loop gets closed (since it seems to not be the event loop started by flask[async] for request processing, since that loop should be stopped after every request and so the next request should immediately fail).

Should the aiohttp.ClientSession use the same event loop that the connector itself uses (which could just pass its internal loop to the session when initiating it) or is there a reason why it connects to the currently running loop?

Example code (or command)

        connector = Connector(refresh_strategy=RefreshStrategy.LAZY)
        atexit.register(lambda: connector.close())

        async def get_connection() -> Connection:
            return await connector.connect_async(
                _from_env_str("DB_CONNECTION_NAME"),
                "asyncpg",
                user=_from_env_str("DB_USER"),
                db=_from_env_str("DB_NAME"),
                ip_type=IPTypes.PRIVATE,
                enable_iam_auth=True,
            )

Stacktrace

Traceback (most recent call last):
  File "/app/my_app/auth.py", line 124, in _parse_user_object
    user = await user_lookup(
           ^^^^^^^^^^^^^^^^^^
  File "/app/my_app/auth.py", line 101, in lookup_tss
    db_object = await db_object.for_serial(username)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/my_app/models_db.py", line 732, in for_serial
    await db.session.scalars(
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/scoping.py", line 1113, in scalars
    return await self._proxied.scalars(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py", line 574, in scalars
    result = await self.execute(
             ^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
             ^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 201, in greenlet_spawn
    result = context.throw(*sys.exc_info())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2351, in execute
    return self._execute_internal(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2226, in _execute_internal
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2095, in _connection_for_bind
    return trans._connection_for_bind(engine, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in _connection_for_bind
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
    ret_value = fn(self, *arg, **kw)
                ^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 1189, in _connection_for_bind
    conn = bind.connect()
           ^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/opentelemetry/instrumentation/sqlalchemy/engine.py", line 98, in _wrap_connect_internal
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 3276, in connect
    return self._connection_cls(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 146, in __init__
    self._dbapi_connection = engine.raw_connection()
                             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 3300, in raw_connection
    return self.pool.connect()
           ^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 449, in connect
    return _ConnectionFairy._checkout(self)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 1263, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 712, in checkout
    rec = pool._do_get()
          ^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/pool/impl.py", line 308, in _do_get
    return self._create_connection()
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 390, in _create_connection
    return _ConnectionRecord(self)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 674, in __init__
    self.__connect()
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 900, in __connect
    with util.safe_reraise():
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 896, in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/pool/base.py", line 362, in <lambda>
    return lambda rec: creator_fn()
                       ^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/engine.py", line 115, in creator
    return sync_engine.dialect.dbapi.connect(  # type: ignore
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 932, in connect
    await_only(creator_fn(*arg, **kw)),
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 132, in await_only
    return current.parent.switch(awaitable)  # type: ignore[no-any-return,attr-defined] # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 196, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  File "/app/my_app/config.py", line 55, in get_connection
    return await connector.connect_async(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/google/cloud/sql/connector/connector.py", line 341, in connect_async
    conn_info = await cache.connect_info()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/google/cloud/sql/connector/lazy.py", line 103, in connect_info
    conn_info = await self._client.get_connection_info(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/google/cloud/sql/connector/client.py", line 271, in get_connection_info
    metadata = await metadata_task
               ^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/google/cloud/sql/connector/client.py", line 128, in _get_metadata
    resp = await self._client.get(url, headers=headers)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/aiohttp/client.py", line 535, in _request
    handle = tm.start()
             ^^^^^^^^^^
  File "/app/venv/lib/python3.12/site-packages/aiohttp/helpers.py", line 627, in start
    return self._loop.call_at(when, self.__call__)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 774, in call_at
    self._check_closed()
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

Steps to reproduce?

  1. Deploy an app that uses flask[async] and an async connector to connect to the db.
  2. Run a successful request
  3. Wait a few hours (not clear what the exact timeout is or if this is reproducible, we'll need to debug this further)
  4. Run another request that requires the database
  5. Observe a stacktrace where the connection cannot be established

Environment

  1. OS type and version: python:3.12-slim Docker container
  2. Python version: 3.12
  3. Cloud SQL Python Connector version: 1.11.0

Additional Details

No response

@nioncode nioncode added the type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. label Oct 1, 2024
@nioncode
Copy link
Author

nioncode commented Oct 1, 2024

Could be the same as #1107. Seems like we would need a different connector for each connection, although I'm not sure why it works in the first place then (and why the connector even allows to be used without passing an explicit loop to use?).

@jackwotherspoon
Copy link
Collaborator

Hi @nioncode thanks for raising an issue on the Cloud SQL Python Connector! 😄

So first things first, this seems like you may have hit some edge case as regular async usage with asyncpg should not work without explicitly passing the loop to use. I am not super familiar with the async extension of Flask flask[async]. Maybe it is why the initial connection is working but as you mention it closes the event loop outside of the request context.

Per our README's Async Driver Usage section and guidance you should either use create_async_connector helper function or Connector(loop=your_event_loop) to initialize a Connector object to be used with the asyncpg driver.

Is there a reason you are not initializing the Connector with one of these approaches?

I need to hammer down the different async usage patterns and make sure the Connector handles these gracefully. It seems there is definitely some room for improvements and paths that should error explicitly instead of failing after a period of time.

Thanks for this data point.

@jackwotherspoon jackwotherspoon added the priority: p2 Moderately-important priority. Fix may not be included in next release. label Oct 1, 2024
@nioncode
Copy link
Author

nioncode commented Oct 1, 2024

Thanks for the fast response! Honestly we just missed the create_async_connector helper function, since we switched from the sync version to the async version and did not see any problems initially, so we thought everything works fine.

Can you recommend a way to close the connector as soon as the underlying connection is closed in sqlalchemy? I had a look at sqlalchemy events, but it seems like all the connection close events are raised before the connection is actually closed, so it is probably too early to close the connector at this point.

Also, I'm wondering if a workaround would be if the connector would forward its internal loop to the aiohttp ClientSession instead of letting the ClientSession default to the cururently running loop. Is there any downside in having the connector + aiohttp run with one loop and the remaining app with another?

@jackwotherspoon
Copy link
Collaborator

Can you recommend a way to close the connector as soon as the underlying connection is closed in sqlalchemy? I had a look at sqlalchemy events, but it seems like all the connection close events are raised before the connection is actually closed, so it is probably too early to close the connector at this point.

This I will have to look into, my first thought was close events as well, maybe the close_detached? However, you want to re-use a single Connector to be used by many connections and not just a Connector per connection as this will lead to quota issues and extra connection latency.

Is there any downside in having the connector + aiohttp run with one loop and the remaining app with another

The reason the aiohttp.ClientSession uses the running event loop is that connector.connect_async is meant to be the async entrypoint. Once you are within this method we want to use a single event loop to be the most efficient with resources and not be hopping between event loops leading to additional complexity.

@nioncode
Copy link
Author

nioncode commented Oct 1, 2024

I understand that this would be the optimal setup, but I'm afraid this just does not work with flask[async] (although I wonder how it even works initially before the app goes idle for a long time), since it closes the event loop for every request.

At the moment we are just using flask[async] as a kind of bridge until we ported everything to quart, so for us this problem will hopefully solve itself during the next weeks. However, I'd suggest that the connector makes it clear that you should never use multiple event loops and just fail if one attempts to do so instead of sometimes working and then later break in production in hard-to-understand ways (similar to what you did with #1113).

Why does the connector even support creating its own event loop, if it is not supposed to be used that way?

@jackwotherspoon
Copy link
Collaborator

Why does the connector even support creating its own event loop, if it is not supposed to be used that way?

Good question, the connector supports creating its own event loop because this is the way the sync drivers are intended to be used. However, for async driver (asyncpg) its expected that an event loop is already present.

Looking back we should have created a separate AsyncConnector class for async usage that would have made things much more clear instead of re-using the Connector class for both sync and async drivers.

@jackwotherspoon
Copy link
Collaborator

FYI: I have fixed the issue with gevent in #1170 and will cut a release of the library next week to get it out the door.

Will continue to look at this use-case as well.

@nioncode
Copy link
Author

nioncode commented Oct 9, 2024

Thanks for the gevent fix! However, I think we are now in a weird situation where we cannot switch to gevent anymore, since our code base now uses async/await everywhere 😄

Can you give us a recommendation how we would best use the connector at this point through flask[async]? We don't care if the connector operates in async or sync mode (since flask[async] still runs each request sequentially and not in parallel) and whether there are connection delays (since all performance critical endpoints were moved to quart already). We just need to avoid getting random event loop related issues (which probably happen when the connector tries to refresh its credentials, but I'm not too sure about this) when running in flask.
We'd like to avoid running with pg8000 instead of asyncdb, since they behave a bit differently with regards to some database exceptions, which we then would have to handle differently depending on which driver we are currently using, but if this would be a way to use the connector in a 'proper' way, then we'd do this.

EDIT: we can't even migrate our entire app to quart at this point, since we have other dependencies that are not async-compatible (Google Cloud Storage, Cloud Tasks), so it would be best if we keep those in flask anyways.

@nioncode
Copy link
Author

nioncode commented Oct 9, 2024

Can we maybe use a sync Connector and just hand it a pre-created event loop? Then everything related to the connector should happen on that loop and if we don't use a connection pool, then connections are not shared across multiple event loops and things should work mostly fine or?

@jackwotherspoon
Copy link
Collaborator

@nioncode couple clarifying questions for you.

We'd like to avoid running with pg8000 instead of asyncdb

Which database driver would you like to use? I am a bit confused by this sentence...

Can we maybe use a sync Connector and just hand it a pre-created event loop?

Yes you could definitely give this a try, creating a loop and initializing the sync Connector with it. However, this means you have to create multiple Connectors potentially for different connections if I understand your use case properly. Ideally the handling of event loops etc should be abstracted away from the user and inside the Connector logic.

@nioncode
Copy link
Author

nioncode commented Oct 11, 2024

Which database driver would you like to use? I am a bit confused by this sentence...

Initially (when we were running with regular flask) we used pg8000. We then switched to asynpg. As part of this issue, we now wanted to avoid going back to pg8000, but in the end this did not work out (see below).

Yes you could definitely give this a try, creating a loop and initializing the sync Connector with it. However, this means you have to create multiple Connectors potentially for different connections if I understand your use case properly. Ideally the handling of event loops etc should be abstracted away from the user and inside the Connector logic.

Thanks. We played around with this a bit, but it was a real mess getting this to work properly with flask[async], since it seems like each request now actually runs in its own thread, so this lead to more or less creating a connector per connection since there was no way of matching the currently running thread to the matching coordinator. This then finally worked, but something leaked so the memory consumption grew with every request (and in one variations even the cpu usage, which was very confusing for us).

Long story short, we now moved back to using a sync engine (and connector) with pg8000 when running through flask[async], which works reasonably well (at least in our use case) and allows to share the connection pool between multiple threads (since the sync engine or the connection pool coordinates this correctly).

I'm still wondering if it would be somehow possible to support flask[async] with an async connector, but I don't understand enough of how the threads and event loops work in flask[async] to make an educated guess here.

Thank you for all your fast responses and help with this, it's a real pleasure to work with you!

@jackwotherspoon
Copy link
Collaborator

@nioncode thanks for the great clarification! I understand your dilemma well now.

I plan to take some time soon to dive deep into a few ways in which I can make the async interface easier to use and will make sure to give flask[async] a proper test and see if I can get back to you on a better solution.

In the meantime, I am glad the sync engine seems to be working.

Thanks for all the details, makes my life much easier to reproduce this use-case. 😄

Will update this thread with any updates made.

@jackwotherspoon
Copy link
Collaborator

jackwotherspoon commented Jan 7, 2025

Will begin taking a closer look into async frameworks and their usage of Python Connector in the coming weeks...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
Projects
None yet
Development

No branches or pull requests

2 participants