Skip to content

Commit

Permalink
feat: Activate Version (#89)
Browse files Browse the repository at this point in the history
* Activat Version example test

* Activate version working

* Lint and Schema fix for config

* Activate Version test to show data being deleted

* Lint Fix

* Activate version is doing things correctly, existing implementations of ACTIVATE_VERSION sometimes send an ACTIVATE_MESSAGE before records are sent which I think is just a bad tap implemention but it is valid

* Consistent spacing for docs

* Update README.md

Co-authored-by: Aaron ("AJ") Steers <[email protected]>

---------

Co-authored-by: Aaron ("AJ") Steers <[email protected]>
  • Loading branch information
visch and aaronsteers authored Feb 14, 2023
1 parent c66b0e2 commit 8150737
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 18 deletions.
30 changes: 16 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@ Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Target
* `schema-flattening`

## Settings

| Setting | Required | Default | Description |
| :------------------- | :------: | :-----------------: | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. |
| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. |
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port, dialect. Note that you must esacpe password special characters properly see https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords |
| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |
| Setting | Required | Default | Description |
| :-------------------- | :------: | :-----------------: | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| host | False | None | Hostname for postgres instance. Note if sqlalchemy_url is set this will be ignored. |
| port | False | 5432 | The port on which postgres is awaiting connection. Note if sqlalchemy_url is set this will be ignored. |
| user | False | None | User name used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | SQLAlchemy connection string. This will override using host, user, password, port,dialect. Note that you must escape password special characters properly. See https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords |
| dialect+driver | False | postgresql+psycopg2 | Dialect+driver see https://docs.sqlalchemy.org/en/20/core/engines.html. Generally just leave this alone. Note if sqlalchemy_url is set this will be ignored. |
| default_target_schema | False | None | Postgres schema to send data to, example: tap-clickup |
| hard_delete | False | 0 | When activate version is sent from a tap this specefies if we should delete the records that don't match, or mark them with a date in the `_sdc_deleted_at` column. |
| add_record_metadata | False | 1 | Note that this must be enabled for activate_version to work!This adds _sdc_extracted_at, _sdc_batched_at, and more to every table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html for more information. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |

A full list of supported settings and capabilities is available by running: `target-postgres --about`

Expand Down
67 changes: 67 additions & 0 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import uuid
from typing import Any, Dict, Iterable, List, Optional, Union

import sqlalchemy
from pendulum import now
from singer_sdk.sinks import SQLSink
from sqlalchemy import Column, MetaData, Table, insert
from sqlalchemy.sql import Executable
from sqlalchemy.sql.expression import bindparam

from target_postgres.connector import PostgresConnector

Expand Down Expand Up @@ -252,3 +255,67 @@ def schema_name(self) -> Optional[str]:

# Schema name not detected.
return None

def activate_version(self, new_version: int) -> None:
"""Bump the active version of the target table.
Args:
new_version: The version number to activate.
"""
# There's nothing to do if the table doesn't exist yet
# (which it won't the first time the stream is processed)
if not self.connector.table_exists(self.full_table_name):
return

deleted_at = now()
# Different from SingerSDK as we need to handle types the
# same as SCHEMA messsages
datetime_type = self.connector.to_sql_type(
{"type": "string", "format": "date-time"}
)

# Different from SingerSDK as we need to handle types the
# same as SCHEMA messsages
integer_type = self.connector.to_sql_type({"type": "integer"})

if not self.connector.column_exists(
full_table_name=self.full_table_name,
column_name=self.version_column_name,
):
self.connector.prepare_column(
self.full_table_name,
self.version_column_name,
sql_type=integer_type,
)

self.logger.info("Hard delete: %s", self.config.get("hard_delete"))
if self.config["hard_delete"] is True:
self.connection.execute(
f"DELETE FROM {self.full_table_name} "
f"WHERE {self.version_column_name} <= {new_version} "
f"OR {self.version_column_name} IS NULL"
)
return

if not self.connector.column_exists(
full_table_name=self.full_table_name,
column_name=self.soft_delete_column_name,
):
self.connector.prepare_column(
self.full_table_name,
self.soft_delete_column_name,
sql_type=datetime_type,
)
# Need to deal with the case where data doesn't exist for the version column
query = sqlalchemy.text(
f"UPDATE {self.full_table_name}\n"
f"SET {self.soft_delete_column_name} = :deletedate \n"
f"WHERE {self.version_column_name} < :version "
f"OR {self.version_column_name} IS NULL \n"
f" AND {self.soft_delete_column_name} IS NULL\n"
)
query = query.bindparams(
bindparam("deletedate", value=deleted_at, type_=datetime_type),
bindparam("version", value=new_version, type_=integer_type),
)
self.connector.connection.execute(query)
29 changes: 25 additions & 4 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(
th.StringType,
description=(
"User name used to authenticate. "
+ "Note if sqlalchemy_url is set this will be ignored.",
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
Expand All @@ -95,9 +95,9 @@ def __init__(
th.StringType,
description=(
"SQLAlchemy connection string. "
+ "This will override using host, user, password, port,"
+ "dialect. Note that you must esacpe password special"
+ "characters properly see"
+ "This will override using host, user, password, port, "
+ "dialect. Note that you must esacpe password special "
+ "characters properly see "
+ "https://docs.sqlalchemy.org/en/20/core/engines.html#escaping-special-characters-such-as-signs-in-passwords" # noqa: E501
),
),
Expand All @@ -117,6 +117,27 @@ def __init__(
th.StringType,
description="Postgres schema to send data to, example: tap-clickup",
),
th.Property(
"hard_delete",
th.BooleanType,
default=False,
description=(
"When activate version is sent from a tap this specefies "
+ "if we should delete the records that don't match, or mark "
+ "them with a date in the `_sdc_deleted_at` column."
),
),
th.Property(
"add_record_metadata",
th.BooleanType,
default=True,
description=(
"Note that this must be enabled for activate_version to work!"
+ "This adds _sdc_extracted_at, _sdc_batched_at, and more to every "
+ "table. See https://sdk.meltano.com/en/latest/implementation/record_metadata.html " # noqa: E501
+ "for more information."
),
),
).to_dict()
default_sink_class = PostgresSink

Expand Down
10 changes: 10 additions & 0 deletions target_postgres/tests/data_files/activate_version_hard.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"type": "SCHEMA", "stream": "test_activate_version_hard", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_hard", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_hard", "version": 1674486431563}
10 changes: 10 additions & 0 deletions target_postgres/tests/data_files/activate_version_soft.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"type": "SCHEMA", "stream": "test_activate_version_soft", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_soft", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_soft", "version": 1674486431563}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"type": "SCHEMA", "stream": "test_activate_version_deletes_data_properly", "schema": {"type": "object", "properties": {"code": {"type": ["string"]}, "name": {"type": ["null", "string"]}}}, "key_properties": ["code"], "bookmark_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AF", "name": "Africa"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AN", "name": "Antarctica"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "AS", "name": "Asia"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "EU", "name": "Europe"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "NA", "name": "North America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "OC", "name": "Oceania"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "RECORD", "stream": "test_activate_version_deletes_data_properly", "record": {"code": "SA", "name": "South America"}, "version": 1674486431563, "time_extracted": "2023-01-23T15:07:11.563063Z"}
{"type": "ACTIVATE_VERSION", "stream": "test_activate_version_deletes_data_properly", "version": 1674486431563}
Loading

0 comments on commit 8150737

Please sign in to comment.