From f5efa08b367cf574bfca9236c6e759081316c687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Fri, 7 Feb 2025 12:56:41 -0600 Subject: [PATCH] feat: Added an extensible API for stream schema sources --- .../sample_tap_gitlab/gitlab_rest_streams.py | 13 ++- singer_sdk/schema.py | 103 ++++++++++++++++++ tests/_singerlib/test_schema.py | 34 ++++++ tests/core/test_schema.py | 86 ++++----------- 4 files changed, 168 insertions(+), 68 deletions(-) create mode 100644 singer_sdk/schema.py diff --git a/samples/sample_tap_gitlab/gitlab_rest_streams.py b/samples/sample_tap_gitlab/gitlab_rest_streams.py index 6fb1e5640..fa61a12ac 100644 --- a/samples/sample_tap_gitlab/gitlab_rest_streams.py +++ b/samples/sample_tap_gitlab/gitlab_rest_streams.py @@ -7,6 +7,7 @@ from singer_sdk.authenticators import SimpleAuthenticator from singer_sdk.pagination import SimpleHeaderPaginator +from singer_sdk.schema import LocalSchemaSource from singer_sdk.streams.rest import RESTStream from singer_sdk.typing import ( ArrayType, @@ -19,7 +20,7 @@ StringType, ) -SCHEMAS_DIR = importlib.resources.files(__package__) / "schemas" +LOCAL_SCHEMAS = LocalSchemaSource(importlib.resources.files(__package__) / "schemas") DEFAULT_URL_BASE = "https://gitlab.com/api/v4" @@ -103,7 +104,7 @@ class ProjectsStream(ProjectBasedStream): primary_keys = ("id",) replication_key = "last_activity_at" is_sorted = True - schema_filepath = SCHEMAS_DIR / "projects.json" + schema = LOCAL_SCHEMAS("projects") class ReleasesStream(ProjectBasedStream): @@ -113,7 +114,7 @@ class ReleasesStream(ProjectBasedStream): path = "/projects/{project_id}/releases" primary_keys = ("project_id", "tag_name") replication_key = None - schema_filepath = SCHEMAS_DIR / "releases.json" + schema = LOCAL_SCHEMAS("releases") class IssuesStream(ProjectBasedStream): @@ -124,7 +125,7 @@ class IssuesStream(ProjectBasedStream): primary_keys = ("id",) replication_key = "updated_at" is_sorted = False - schema_filepath = SCHEMAS_DIR / "issues.json" + schema = LOCAL_SCHEMAS("issues") class CommitsStream(ProjectBasedStream): @@ -137,7 +138,7 @@ class CommitsStream(ProjectBasedStream): primary_keys = ("id",) replication_key = "created_at" is_sorted = False - schema_filepath = SCHEMAS_DIR / "commits.json" + schema = LOCAL_SCHEMAS("commits") class EpicsStream(ProjectBasedStream): @@ -202,7 +203,7 @@ class EpicIssuesStream(GitlabStream): path = "/groups/{group_id}/epics/{epic_iid}/issues" primary_keys = ("id",) replication_key = None - schema_filepath = SCHEMAS_DIR / "epic_issues.json" + schema = LOCAL_SCHEMAS("epic_issues") parent_stream_type = EpicsStream # Stream should wait for parents to complete. def get_url_params( diff --git a/singer_sdk/schema.py b/singer_sdk/schema.py new file mode 100644 index 000000000..768f56e60 --- /dev/null +++ b/singer_sdk/schema.py @@ -0,0 +1,103 @@ +"""Schema sources.""" + +from __future__ import annotations + +import functools +import json +import sys +import typing as t +from pathlib import Path + +import requests + +from singer_sdk._singerlib import resolve_schema_references + +if sys.version_info < (3, 12): + from importlib.abc import Traversable +else: + from importlib.resources.abc import Traversable + + +class BaseSchemaSource: + """Base schema source.""" + + def __init__(self, path: str | Path | Traversable) -> None: + """Initialize the schema source. + + Args: + path: Path to the schema. + """ + self.path = path + self._registry: dict[str, dict] = {} + + def get_schema(self, *args: t.Any, **kwargs: t.Any) -> dict: + """Get schema from reference. + + Raises: + NotImplementedError: If the method is not implemented by the subclass. + """ + msg = "Subclasses must implement this method." + raise NotImplementedError(msg) + + def __call__(self, *args: t.Any, **kwargs: t.Any) -> dict: + """Get schema for the given stream name or reference. + + Returns: + The schema dictionary. + """ + return self.get_schema(*args, **kwargs) + + +class LocalSchemaSource(BaseSchemaSource): + """Local schema source.""" + + def get_schema(self, name: str) -> dict: + """Get schema from reference. + + Args: + name: Name of the stream. + + Returns: + The schema dictionary. + """ + if name not in self._registry: + schema_path = self.path / f"{name}.json" + self._registry[name] = json.loads(schema_path.read_text()) + + return self._registry[name] + + +class OpenAPISchemaSource(BaseSchemaSource): + """OpenAPI schema source.""" + + @functools.cached_property + def spec_dict(self) -> dict: + """OpenAPI spec dictionary. + + Raises: + ValueError: If the path type is not supported. + """ + if isinstance(self.path, Path | Traversable): + return json.loads(self.path.read_text()) + + if self.path.startswith("http"): + return requests.get(self.path, timeout=10).json() + + msg = f"Unsupported path type: {self.path}" + raise ValueError(msg) + + def get_schema(self, ref: str) -> dict: + """Get schema from reference. + + Args: + ref: Reference to the schema. + + Returns: + The schema dictionary. + """ + if ref not in self._registry: + schema = {"$ref": f"#/components/schemas/{ref}"} + schema["components"] = self.spec_dict["components"] + self._registry[ref] = resolve_schema_references(schema) + + return self._registry[ref] diff --git a/tests/_singerlib/test_schema.py b/tests/_singerlib/test_schema.py index 2f0023347..9f0dca112 100644 --- a/tests/_singerlib/test_schema.py +++ b/tests/_singerlib/test_schema.py @@ -30,6 +30,40 @@ } +def test_simple_schema(): + simple_schema = { + "title": "Longitude and Latitude Values", + "description": "A geographical coordinate.", + "required": ["latitude", "longitude"], + "type": "object", + "properties": { + "latitude": {"type": "number", "minimum": -90, "maximum": 90}, + "longitude": {"type": "number", "minimum": -180, "maximum": 180}, + }, + } + + schema_plus = Schema.from_dict(simple_schema) + assert schema_plus.to_dict() == simple_schema + assert schema_plus.required == ["latitude", "longitude"] + assert isinstance(schema_plus.properties["latitude"], Schema) + latitude = schema_plus.properties["latitude"] + assert latitude.type == "number" + + +def test_schema_with_items(): + schema = { + "description": "A representation of a person, company, organization, or place", + "type": "object", + "properties": {"fruits": {"type": "array", "items": {"type": "string"}}}, + } + schema_plus = Schema.from_dict(schema) + assert schema_plus.to_dict() == schema + assert isinstance(schema_plus.properties["fruits"], Schema) + fruits = schema_plus.properties["fruits"] + assert isinstance(fruits.items, Schema) + assert fruits.items.type == "string" + + @pytest.mark.parametrize( "schema,expected", [ diff --git a/tests/core/test_schema.py b/tests/core/test_schema.py index 5fa8c75f8..1e17a32b8 100644 --- a/tests/core/test_schema.py +++ b/tests/core/test_schema.py @@ -1,70 +1,32 @@ -""" -Testing that Schema can convert schemas lossless from and to dicts. - -Schemas are taken from these examples; -https://json-schema.org/learn/miscellaneous-examples.html - -NOTE: The following properties are not currently supported; -pattern -unevaluatedProperties -propertyNames -minProperties -maxProperties -prefixItems -contains -minContains -maxContains -minItems -maxItems -uniqueItems -enum -const -contentMediaType -contentEncoding -allOf -oneOf -not - -Some of these could be trivially added (if they are SIMPLE_PROPERTIES. -Some might need more thinking if they can contain schemas (though, note that we also -treat 'additionalProperties', 'anyOf' and' patternProperties' as SIMPLE even though they -can contain schemas. -""" +"""Test the schema sources.""" from __future__ import annotations -from singer_sdk._singerlib import Schema +import typing as t + +from singer_sdk.schema import LocalSchemaSource, OpenAPISchemaSource +if t.TYPE_CHECKING: + import pytest -def test_simple_schema(): - simple_schema = { - "title": "Longitude and Latitude Values", - "description": "A geographical coordinate.", - "required": ["latitude", "longitude"], - "type": "object", - "properties": { - "latitude": {"type": "number", "minimum": -90, "maximum": 90}, - "longitude": {"type": "number", "minimum": -180, "maximum": 180}, - }, - } - schema_plus = Schema.from_dict(simple_schema) - assert schema_plus.to_dict() == simple_schema - assert schema_plus.required == ["latitude", "longitude"] - assert isinstance(schema_plus.properties["latitude"], Schema) - latitude = schema_plus.properties["latitude"] - assert latitude.type == "number" +def test_local_schema_source(pytestconfig: pytest.Config): + schema_dir = pytestconfig.rootpath / "tests/fixtures/schemas" + schema_source = LocalSchemaSource(schema_dir) + schema = schema_source("user") + assert isinstance(schema, dict) + assert schema["type"] == "object" + assert "items" not in schema + assert "properties" in schema + assert "id" in schema["properties"] -def test_schema_with_items(): - schema = { - "description": "A representation of a person, company, organization, or place", - "type": "object", - "properties": {"fruits": {"type": "array", "items": {"type": "string"}}}, - } - schema_plus = Schema.from_dict(schema) - assert schema_plus.to_dict() == schema - assert isinstance(schema_plus.properties["fruits"], Schema) - fruits = schema_plus.properties["fruits"] - assert isinstance(fruits.items, Schema) - assert fruits.items.type == "string" +def test_openapi_schema_source(pytestconfig: pytest.Config): + openapi_path = pytestconfig.rootpath / "tests/fixtures/openapi.json" + schema_source = OpenAPISchemaSource(openapi_path) + schema = schema_source("ProjectListItem") + assert isinstance(schema, dict) + assert schema["type"] == "object" + assert "items" not in schema + assert "properties" in schema + assert "id" in schema["properties"]