From b06aa3fa5ff16cf6d6a1691733a932a9015623e8 Mon Sep 17 00:00:00 2001 From: chillymosh <86857777+chillymosh@users.noreply.github.com> Date: Fri, 10 Jan 2025 22:10:48 +0000 Subject: [PATCH] Rework class generation for eventsub and additional data This reworks the class generation for eventsub. It also adds the Metadata, Header and NotificationSubscription to the BaseClass for all event notifications. --- twitchio/eventsub/websockets.py | 2 +- twitchio/models/eventsub_.py | 236 +++++++++++++++++++++++++++++- twitchio/types_/conduits.py | 8 +- twitchio/types_/eventsub.py | 1 + twitchio/web/aio_adapter.py | 2 +- twitchio/web/starlette_adapter.py | 2 +- 6 files changed, 238 insertions(+), 13 deletions(-) diff --git a/twitchio/eventsub/websockets.py b/twitchio/eventsub/websockets.py index 99c33233..e38b442f 100644 --- a/twitchio/eventsub/websockets.py +++ b/twitchio/eventsub/websockets.py @@ -413,7 +413,7 @@ async def _process_notification(self, data: NotificationMessage) -> None: event = _SUB_MAPPING.get(sub_type, sub_type.removeprefix("channel.")).replace(".", "_") try: - payload_class = create_event_instance(sub_type, data["payload"]["event"], http=self._http) + payload_class = create_event_instance(sub_type, data, http=self._http) except ValueError: logger.warning("Websocket '%s' received an unhandled eventsub event: '%s'.", self, event) return diff --git a/twitchio/models/eventsub_.py b/twitchio/models/eventsub_.py index ce695c52..f2245d2e 100644 --- a/twitchio/models/eventsub_.py +++ b/twitchio/models/eventsub_.py @@ -24,6 +24,7 @@ from __future__ import annotations +import datetime from typing import TYPE_CHECKING, Any, ClassVar, Literal, NamedTuple, cast from twitchio.assets import Asset @@ -38,11 +39,17 @@ if TYPE_CHECKING: - import datetime - from twitchio.http import HTTPAsyncIterator, HTTPClient from twitchio.models.channel_points import CustomRewardRedemption - from twitchio.types_.conduits import Condition, RevocationSubscription, RevocationTransport + from twitchio.types_.conduits import ( + Condition, + NotificationMessage, + NotificationMetaData, + NotificationSubscription as _NotificationSubscription, + NotificationTransport, + RevocationSubscription, + RevocationTransport, + ) from twitchio.types_.eventsub import * from twitchio.types_.responses import ( EventsubSubscriptionResponse, @@ -55,17 +62,234 @@ class BaseEvent: _registry: ClassVar[dict[str, type]] = {} subscription_type: ClassVar[str | None] = None + def __init__( + self, + *, + subscription_data: Any, + metadata: NotificationMetaData | None = None, + headers: EventSubHeaders | None = None, + ) -> None: + self._metadata = metadata + self._headers = headers + self._sub_data = subscription_data + def __init_subclass__(cls, **kwargs: Any) -> None: super().__init_subclass__(**kwargs) if cls.subscription_type is not None: BaseEvent._registry[cls.subscription_type] = cls + @property + def timestamp(self) -> datetime.datetime: + """The timestamp of the eventsub notification from Twitch in UTC. + + If the notification Twitch sends is missing this data, then it will fall back to current UTC time. + + Returns + ------- + datetime.datetime + The datetime in UTC of the eventsub notification from Twitch. + """ + if self._metadata and (timestamp := self._metadata.get("message_timestamp")): + return parse_timestamp(timestamp) + + if self._headers and (timestamp := self._headers.get("Twitch-Eventsub-Message-Timestamp")): + return parse_timestamp(timestamp) + + return datetime.datetime.now(datetime.UTC) + + @property + def metadata(self) -> Metadata | None: + """Returns the metadata of a websocket event notification. + + Returns + ------- + Metadata | None + """ + if self._metadata is not None: + return Metadata(self._metadata) + return None -def create_event_instance(event_type: str, payload: dict[str, Any], http: HTTPClient | None = None) -> Any: + @property + def headers(self) -> Headers | None: + """Returns eventsub webhook headers as a structured Headers object. + + Returns + ------- + Headers | None + """ + if self._headers is not None: + return Headers(self._headers) + return None + + @property + def subscription_data(self) -> NotificationSubscription: + """Returns the subscription data of the eventsub notification. + + Returns + ------- + NotificationSubscription + """ + return NotificationSubscription(self._sub_data) + + +def create_event_instance( + event_type: str, + raw_data: NotificationMessage | Any, + *, + http: HTTPClient | None = None, + headers: EventSubHeaders | None = None, +) -> Any: event_cls = BaseEvent._registry.get(event_type) - if event_cls is None: + if not event_cls: raise ValueError(f"No class registered for event type {event_type}") - return event_cls(payload) if http is None else event_cls(payload, http=http) + + payload = raw_data["payload"]["event"] if "payload" in raw_data else raw_data["event"] + metadata = raw_data.get("metadata") + sub_data = raw_data["payload"]["subscription"] if "payload" in raw_data else raw_data["subscription"] + instance = event_cls(payload, http=http) + + if isinstance(instance, BaseEvent): + instance._sub_data = sub_data + instance._metadata = metadata + instance._headers = headers + + return instance + + +class Metadata: + """ + Represents the metadata returned from a websocket eventsub notification. + + Attributes + ----------- + message_id: str + An ID that uniquely identifies the message. + message_type: typing.Literal["notification"] + The type of message, which is set to `notification`. + message_timestamp: datetime.datetime + The UTC date and time that the message was sent. + subscription_type: str + The type of subscription. See `Subscription Types `_. + subscription_version: typing.Literal["1", "2"] + The version number of the subscription type's definition. This is the same value specified in the subscription request. + """ + + __slots__ = ("message_id", "message_timestamp", "message_type", "subscription_type", "subscription_version") + + def __init__(self, data: NotificationMetaData) -> None: + self.message_id: str = data["message_id"] + self.message_type: Literal["notification"] = data["message_type"] + self.message_timestamp: datetime.datetime = parse_timestamp(data["message_timestamp"]) + self.subscription_type: str = data["subscription_type"] + self.subscription_version: Literal["1", "2"] = data["subscription_version"] + + def __repr__(self) -> str: + return f"" + + +class Headers: + """ + Represents the headers received from a webhook notification. + + Attributes + ----------- + message_id: str + An ID that uniquely identifies this message. This is an opaque ID, and is not required to be in any particular format. + message_retry: str + Twitch sends you a notification at least once. If Twitch is unsure of whether you received a notification, it'll resend the event, which means you may receive a notification twice. + message_type: typing.Literal["notification", "webhook_callback_verification", "revocation"] + The type of notification. Possible values are: + + - notification — Contains the event's data. + - webhook_callback_verification — Contains the challenge used to verify that you own the event handler. + - revocation — Contains the reason why Twitch revoked your subscription. + + message_signature: str + The HMAC signature that you use to verify that Twitch sent the message. + message_timestamp: datetime.datetime: str + The UTC date and time that Twitch sent the notification. + subscription_type: str + The subscription type you subscribed to. For example, `channel.follow`. + subscription_version: str + The version number that identifies the definition of the subscription request. This version matches the version number that you specified in your subscription request. + raw_data: dict[str, str] + The headers as a raw dictionary, as there are additional fields that are not Twitch specific. You can utilise the `.get()` method to retrieve specific headers. + + """ + + __slots__ = ( + "message_id", + "message_retry", + "message_signature", + "message_timestamp", + "message_type", + "raw_data", + "subscription_type", + "subscription_version", + ) + + def __init__(self, data: EventSubHeaders) -> None: + self.message_id: str = data.get("Twitch-Eventsub-Message-Id", "") + self.message_retry: str = data.get("Twitch-Eventsub-Message-Retry", "") + self.message_type: Literal["notification", "webhook_callback_verification", "revocation"] = data.get( + "Twitch-Eventsub-Message-Type", "notification" + ) + self.message_signature: str = data.get("Twitch-Eventsub-Message-Signature", "") + timestamp = data.get("Twitch-Eventsub-Message-Timestamp", datetime.datetime.now(tz=datetime.UTC).isoformat()) + self.message_timestamp: datetime.datetime = parse_timestamp(timestamp) + self.subscription_type: str = data.get("Twitch-Eventsub-Subscription-Type", "") + self.subscription_version: str = data.get("Twitch-Eventsub-Subscription-Version", "") + + self.raw_data: EventSubHeaders = data + + def get(self, key: str) -> str | None: + """Retrieve a header value by key.""" + return self.raw_data.get(key) + + def __repr__(self) -> str: + return f"" + + +class NotificationSubscription: + """ + Represents the metadata returned from a websocket eventsub notification. + + Attributes + ----------- + id: str + An ID that uniquely identifies this subscription. + status: str + The subscription's status. + type: str + The notification's subscription type. + version: typing.Literal["1", "2"] + The version number of the subscription type's definition. + cost: int + How much the subscription counts against your limit. See `Subscription Limits `_. + condition: Condition + This is a TypedDict that contains the conditions under which the event fires. + transport: NotificationTransport + This is a TypedDict that contains information about the transport used for notifications. + created_at: datetime.datetime + The UTC date and time that the subscription was created. + """ + + __slots__ = ("condition", "cost", "created_at", "id", "status", "transport", "type", "version") + + def __init__(self, data: _NotificationSubscription) -> None: + self.id: str = data["id"] + self.status: str = data["status"] + self.type: str = data["type"] + self.version: Literal["1", "2"] = data["version"] + self.cost: int = data["cost"] + self.condition: Condition = data["condition"] + self.transport: NotificationTransport = data["transport"] + self.created_at: datetime.datetime = parse_timestamp(data["created_at"]) + + def __repr__(self) -> str: + return ( + f"" + ) class Boundary(NamedTuple): diff --git a/twitchio/types_/conduits.py b/twitchio/types_/conduits.py index fde789be..74e7bde4 100644 --- a/twitchio/types_/conduits.py +++ b/twitchio/types_/conduits.py @@ -101,7 +101,7 @@ class NotificationMetaData(TypedDict): message_type: Literal["notification"] message_timestamp: str subscription_type: str - subscription_version: str + subscription_version: Literal["1", "2"] class ReconnectMetaData(TypedDict): @@ -156,15 +156,15 @@ class Condition(TypedDict, total=False): class NotificationTransport(TypedDict): - method: Literal["websocket"] + method: Literal["websocket", "webhook"] session_id: str class NotificationSubscription(TypedDict): id: str - status: Literal["enabled"] + status: str type: str - version: str + version: Literal["1", "2"] cost: int condition: Condition transport: NotificationTransport diff --git a/twitchio/types_/eventsub.py b/twitchio/types_/eventsub.py index 0807e9c6..5c32d15c 100644 --- a/twitchio/types_/eventsub.py +++ b/twitchio/types_/eventsub.py @@ -112,6 +112,7 @@ "ChatSubGiftData", "ChatUserMessageHoldEvent", "ChatUserMessageUpdateEvent", + "EventSubHeaders", "GoalBeginEvent", "GoalEndEvent", "GoalProgressEvent", diff --git a/twitchio/web/aio_adapter.py b/twitchio/web/aio_adapter.py index dfb2d5e4..18eee268 100644 --- a/twitchio/web/aio_adapter.py +++ b/twitchio/web/aio_adapter.py @@ -257,7 +257,7 @@ async def eventsub_callback(self, request: web.Request) -> web.Response: event = _SUB_MAPPING.get(sub_type, sub_type.removeprefix("channel.")).replace(".", "_") try: - payload_class = create_event_instance(sub_type, data["event"], http=self.client._http) + payload_class = create_event_instance(sub_type, data, http=self.client._http, headers=headers) except ValueError: logger.warning("Webhook '%s' received an unhandled eventsub event: '%s'.", self, event) return web.Response(status=200) diff --git a/twitchio/web/starlette_adapter.py b/twitchio/web/starlette_adapter.py index a8784c96..a2b22bee 100644 --- a/twitchio/web/starlette_adapter.py +++ b/twitchio/web/starlette_adapter.py @@ -269,7 +269,7 @@ async def eventsub_callback(self, request: Request) -> Response: event = _SUB_MAPPING.get(sub_type, sub_type.removeprefix("channel.")).replace(".", "_") try: - payload_class = create_event_instance(sub_type, data["event"], http=self.client._http) + payload_class = create_event_instance(sub_type, data, http=self.client._http, headers=headers) except ValueError: logger.warning("Webhook '%s' received an unhandled eventsub event: '%s'.", self, event) return Response(status_code=200)