Skip to content

Commit

Permalink
Psycopg3 compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
moser committed Feb 12, 2025
1 parent b37a712 commit 2d7c984
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 148 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
tests-psycopg-version:
strategy:
matrix:
psycopg: ["psycopg2", "psycopg"]
psycopg: ["psycopg2-binary", "psycopg2-binary==2.9.3", "psycopg-binary", "psycopg-binary==3.1"]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ Key features:
## Requirements

Python 3.9+

SQLAlchemy 1.4 or 2+

PostgreSQL 12+
Psycopg (Version 2 >= 2.9.3 or Version 3 >= 3.1)


## Installation
Expand Down
4 changes: 3 additions & 1 deletion depeche_db/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def get_union_members(union_or_type) -> typing.Generator[typing.Type, None, None
if os.environ.get("DEPECHE_DB_FORCE_PSYCOPG3", "0") != "1":
import psycopg2 as psycopg # noqa
from psycopg2.errors import LockNotAvailable as PsycoPgLockNotAvailable # noqa
from psycopg2.errors import RaiseException as PsycoPgRaiseException # noqa
from psycopg2.extras import Json as PsycoPgJson # noqa

PSYCOPG_VERSION = "2"
Expand All @@ -68,7 +69,8 @@ def get_union_members(union_or_type) -> typing.Generator[typing.Type, None, None
try:
import psycopg # noqa
from psycopg.errors import LockNotAvailable as PsycoPgLockNotAvailable # noqa
from psycopg.types.json import Jsonb as PsycoPgJson # noqa
from psycopg.errors import RaiseException as PsycoPgRaiseException # noqa
from psycopg.types.json import Json as PsycoPgJson # noqa

PSYCOPG_VERSION = "3"
except ImportError:
Expand Down
22 changes: 19 additions & 3 deletions depeche_db/_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def add_all(
try:
for idx, (message_id, message) in enumerate(messages):
_expected_version = (
expected_version + idx if expected_version is not None else None,
expected_version + idx if expected_version is not None else None
)
result: Any = conn.execute(
_sa.select(
Expand All @@ -95,8 +95,24 @@ def add_all(
).alias()
)
)
except _sa.exc.InternalError:
raise OptimisticConcurrencyError("optimistic concurrency failure")
except _sa.exc.InternalError as exc:
# psycopg2
from depeche_db._compat import PsycoPgRaiseException

if isinstance(exc.orig, PsycoPgRaiseException):
raise OptimisticConcurrencyError(
f"optimistic concurrency failure: {exc.orig}"
)
raise
except _sa.exc.ProgrammingError as exc:
# psycopg3
from depeche_db._compat import PsycoPgRaiseException

if isinstance(exc.orig, PsycoPgRaiseException):
raise OptimisticConcurrencyError(
f"optimistic concurrency failure: {exc.orig}"
)
raise
row = result.fetchone()
return MessagePosition(
stream=stream,
Expand Down
58 changes: 41 additions & 17 deletions depeche_db/tools/pg_notification_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import threading
from typing import Iterator, Sequence

from depeche_db._compat import PSYCOPG_VERSION
from depeche_db._compat import psycopg as _psycopg

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -48,8 +49,24 @@ def stop(self):
self._keep_running = False
self._thread.join()

def _parse_dsn(self, dsn: str) -> tuple[str, str]:
import urllib.parse as _urlparse

parts = list(_urlparse.urlparse(dsn))
expected_psycopg_version = "2"
if parts[0] == "postgresql+psycopg":
parts[0] = "postgresql"
expected_psycopg_version = "3"
if parts[0] == "postgresql+psycopg2":
parts[0] = "postgresql"
dsn = _urlparse.urlunparse(parts)

return dsn, expected_psycopg_version

def _loop(self):
conn = _psycopg.connect(self.dsn)
dsn, expected_psycopg_version = self._parse_dsn(self.dsn)
assert expected_psycopg_version == PSYCOPG_VERSION, "Invalid psycopg version"
conn = _psycopg.connect(dsn)

try:
curs = conn.cursor()
Expand All @@ -61,21 +78,28 @@ def _loop(self):
if select.select([conn], [], [], self._select_timeout) == ([], [], []):
pass
else:
conn.poll()
while conn.notifies:
notification = conn.notifies.pop(0)
try:
if self._ignore_payload:
payload = {}
else:
payload = json.loads(notification.payload)
self._queue.put(
PgNotification(notification.channel, payload)
)
except Exception:
logger.exception(
f"Error processing notification payload: {notification}"
)

if PSYCOPG_VERSION == "3":
self._psycopg3_loop(conn)
else:
self._psycopg2_loop(conn)
finally:
conn.close()

def _psycopg3_loop(self, conn):
for notification in conn.notifies(timeout=0.2):
self._process_notification(notification)

def _psycopg2_loop(self, conn):
conn.poll()
while conn.notifies:
self._process_notification(conn.notifies.pop(0))

def _process_notification(self, notification):
try:
if self._ignore_payload:
payload = {}
else:
payload = json.loads(notification.payload)
self._queue.put(PgNotification(notification.channel, payload))
except Exception:
logger.exception(f"Error processing notification payload: {notification}")
4 changes: 2 additions & 2 deletions docs/generated/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ Key features:

* Message store with optimistic concurrency control & strong ordering guarantees
* Subscriptions with "at least once" semantics
* Parallel processing of (partitioned) subscriptions
* No database polling

## Requirements

Python 3.9+

SQLAlchemy 1.4 or 2+

PostgreSQL 12+
Psycopg (Version 2 >= 2.9.3 or Version 3 >= 3.1)


## Installation
Expand Down
4 changes: 3 additions & 1 deletion docs/generated/getting_started.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from sqlalchemy import create_engine

DB_DSN = "postgresql://depeche:depeche@localhost:4888/depeche_demo"
# If you are using psycopg 3, use the following DSN instead:
# DB_DSN = "postgresql+psycopg://depeche:depeche@localhost:4888/depeche_demo"
db_engine = create_engine(DB_DSN)

doc.md(
Expand Down Expand Up @@ -262,7 +264,7 @@ def handle_event_a(msg: SubscriptionMessage[EventA]):
)

subscription = aggregated_stream.subscription(
name="sub_example_docs_aggregate_me_with_handlers",
name="sub_example_docs_with_handlers",
handlers=handlers,
)

Expand Down
2 changes: 1 addition & 1 deletion docs/generated/output/getting-started-subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Now we can create a new subscription with these handlers.

```python
subscription = aggregated_stream.subscription(
name="sub_example_docs_aggregate_me_with_handlers",
name="sub_example_docs_with_handlers",
handlers=handlers,
)
```
Expand Down
2 changes: 2 additions & 0 deletions docs/generated/output/getting-started-write-read.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ First, create a SQLAlchemy engine with your database connection:
from sqlalchemy import create_engine

DB_DSN = "postgresql://depeche:depeche@localhost:4888/depeche_demo"
# If you are using psycopg 3, use the following DSN instead:
# DB_DSN = "postgresql+psycopg://depeche:depeche@localhost:4888/depeche_demo"
db_engine = create_engine(DB_DSN)
```

Expand Down
Loading

0 comments on commit 2d7c984

Please sign in to comment.