Skip to content

Commit

Permalink
JdbcIO - report schema as part of lineage (#33795)
Browse files Browse the repository at this point in the history
* fix missing schemas

* spotless

* fix tests
  • Loading branch information
stankiewicz authored Jan 29, 2025
1 parent e688295 commit 5d48309
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1611,7 +1611,7 @@ private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, Output

private @Nullable DataSource dataSource;
private @Nullable Connection connection;
private @Nullable String reportedLineage;
private @Nullable KV<@Nullable String, String> reportedLineage;

private ReadFn(
SerializableFunction<Void, DataSource> dataSourceProviderFn,
Expand Down Expand Up @@ -1641,16 +1641,17 @@ private Connection getConnection() throws SQLException {
this.connection = connection;

// report Lineage if not haven't done so
String table = JdbcUtil.extractTableFromReadQuery(query.get());
if (!table.equals(reportedLineage)) {
KV<@Nullable String, String> schemaWithTable =
JdbcUtil.extractTableFromReadQuery(query.get());
if (schemaWithTable != null && !schemaWithTable.equals(reportedLineage)) {
JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
if (fqn == null) {
fqn = JdbcUtil.FQNComponents.of(connection);
}
if (fqn != null) {
fqn.reportLineage(Lineage.getSources(), table);
fqn.reportLineage(Lineage.getSources(), schemaWithTable);
}
reportedLineage = table;
reportedLineage = schemaWithTable;
}
}
return connection;
Expand Down Expand Up @@ -2665,7 +2666,7 @@ abstract Builder<T, V> setMaxBatchBufferingDuration(
private @Nullable DataSource dataSource;
private @Nullable Connection connection;
private @Nullable PreparedStatement preparedStatement;
private @Nullable String reportedLineage;
private @Nullable KV<@Nullable String, String> reportedLineage;
private static @Nullable FluentBackoff retryBackOff;

public WriteFn(WriteFnSpec<T, V> spec) {
Expand Down Expand Up @@ -2705,20 +2706,21 @@ private Connection getConnection() throws SQLException {
connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
this.connection = connection;

// report Lineage if haven't done so
String table = spec.getTable();
if (Strings.isNullOrEmpty(table) && spec.getStatement() != null) {
table = JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get());
KV<@Nullable String, String> tableWithSchema;
if (Strings.isNullOrEmpty(spec.getTable()) && spec.getStatement() != null) {
tableWithSchema = JdbcUtil.extractTableFromWriteQuery(spec.getStatement().get());
} else {
tableWithSchema = JdbcUtil.extractTableFromTable(spec.getTable());
}
if (!Objects.equals(table, reportedLineage)) {
if (!Objects.equals(tableWithSchema, reportedLineage)) {
JdbcUtil.FQNComponents fqn = JdbcUtil.FQNComponents.of(validSource);
if (fqn == null) {
fqn = JdbcUtil.FQNComponents.of(connection);
}
if (fqn != null) {
fqn.reportLineage(Lineage.getSinks(), table);
fqn.reportLineage(Lineage.getSinks(), tableWithSchema);
}
reportedLineage = table;
reportedLineage = tableWithSchema;
}
}
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,14 +678,29 @@ abstract static class JdbcUrl {
/** Jdbc fully qualified name components. */
@AutoValue
abstract static class FQNComponents {

static final String DEFAULT_SCHEMA = "default";

abstract String getScheme();

abstract Iterable<String> getSegments();

void reportLineage(Lineage lineage, @Nullable String table) {
void reportLineage(Lineage lineage, @Nullable KV<@Nullable String, String> tableWithSchema) {
ImmutableList.Builder<String> builder = ImmutableList.<String>builder().addAll(getSegments());
if (table != null && !table.isEmpty()) {
builder.add(table);
if (tableWithSchema != null) {
if (tableWithSchema.getKey() != null && !tableWithSchema.getKey().isEmpty()) {
builder.add(tableWithSchema.getKey());
} else {
// Every database engine has the default schema or search path if user hasn't provided
// one. The name
// is specific to db engine. For PostgreSQL it is public, for MSSQL it is dbo.
// Users can have custom default scheme for the benefit of the user but dataflow is unable
// to determine that.
builder.add(DEFAULT_SCHEMA);
}
if (!tableWithSchema.getValue().isEmpty()) {
builder.add(tableWithSchema.getValue());
}
}
lineage.add(getScheme(), builder.build());
}
Expand Down Expand Up @@ -792,41 +807,66 @@ void reportLineage(Lineage lineage, @Nullable String table) {
}
}

private static final Pattern TABLE_PATTERN =
Pattern.compile(
"(\\[?`?(?<schemaName>[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?`?(?<tableName>[^\\s\\[\\]`]+)\\]?`?",
Pattern.CASE_INSENSITIVE);

private static final Pattern READ_STATEMENT_PATTERN =
Pattern.compile(
"SELECT\\s+.+?\\s+FROM\\s+\\[?(?<tableName>[^\\s\\[\\]]+)\\]?", Pattern.CASE_INSENSITIVE);
"SELECT\\s+.+?\\s+FROM\\s+(\\[?`?(?<schemaName>[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?`?(?<tableName>[^\\s\\[\\]`]+)\\]?`?",
Pattern.CASE_INSENSITIVE);

private static final Pattern WRITE_STATEMENT_PATTERN =
Pattern.compile(
"INSERT\\s+INTO\\s+\\[?(?<tableName>[^\\s\\[\\]]+)\\]?", Pattern.CASE_INSENSITIVE);
"INSERT\\s+INTO\\s+(\\[?`?(?<schemaName>[^\\s\\[\\]`]+)\\]?`?\\.)?\\[?(?<tableName>[^\\s\\[\\]]+)\\]?",
Pattern.CASE_INSENSITIVE);

/** Extract table name a SELECT statement. Return empty string if fail to extract. */
static String extractTableFromReadQuery(@Nullable String query) {
/** Extract schema and table name a SELECT statement. Return null if fail to extract. */
static @Nullable KV<@Nullable String, String> extractTableFromReadQuery(@Nullable String query) {
if (query == null) {
return "";
return null;
}
Matcher matchRead = READ_STATEMENT_PATTERN.matcher(query);
if (matchRead.find()) {
String matched = matchRead.group("tableName");
if (matched != null) {
return matched;
String matchedTable = matchRead.group("tableName");
String matchedSchema = matchRead.group("schemaName");
System.out.println(matchedSchema);
if (matchedTable != null) {
return KV.of(matchedSchema, matchedTable);
}
}
return null;
}

static @Nullable KV<@Nullable String, String> extractTableFromTable(@Nullable String table) {
if (table == null) {
return null;
}
Matcher matchRead = TABLE_PATTERN.matcher(table);
if (matchRead.find()) {
String matchedTable = matchRead.group("tableName");
String matchedSchema = matchRead.group("schemaName");
if (matchedTable != null) {
return KV.of(matchedSchema, matchedTable);
}
}
return "";
return null;
}

/** Extract table name from an INSERT statement. Return empty string if fail to extract. */
static String extractTableFromWriteQuery(@Nullable String query) {
static @Nullable KV<@Nullable String, String> extractTableFromWriteQuery(@Nullable String query) {
if (query == null) {
return "";
return null;
}
Matcher matchRead = WRITE_STATEMENT_PATTERN.matcher(query);
if (matchRead.find()) {
String matched = matchRead.group("tableName");
if (matched != null) {
return matched;
String matchedTable = matchRead.group("tableName");
String matchedSchema = matchRead.group("schemaName");
if (matchedTable != null) {
return KV.of(matchedSchema, matchedTable);
}
}
return "";
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ public void testRead() {
PipelineResult result = pipeline.run();
assertThat(
Lineage.query(result.metrics(), Lineage.Type.SOURCE),
hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", "testDB", READ_TABLE_NAME))));
hasItem(
Lineage.getFqName(
"derby", ImmutableList.of("memory", "testDB", "default", READ_TABLE_NAME))));
}

@Test
Expand All @@ -271,7 +273,9 @@ public void testReadWithSingleStringParameter() {
PipelineResult result = pipeline.run();
assertThat(
Lineage.query(result.metrics(), Lineage.Type.SOURCE),
hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", "testDB", READ_TABLE_NAME))));
hasItem(
Lineage.getFqName(
"derby", ImmutableList.of("memory", "testDB", "default", READ_TABLE_NAME))));
}

@Test
Expand Down Expand Up @@ -543,7 +547,9 @@ public void testWrite() throws Exception {
assertRowCount(DATA_SOURCE, tableName, EXPECTED_ROW_COUNT);
assertThat(
Lineage.query(result.metrics(), Lineage.Type.SINK),
hasItem(Lineage.getFqName("derby", ImmutableList.of("memory", "testDB", tableName))));
hasItem(
Lineage.getFqName(
"derby", ImmutableList.of("memory", "testDB", "default", tableName))));
} finally {
DatabaseTestHelper.deleteTable(DATA_SOURCE, tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,25 @@ public void testFqnFromHikariDataSourceMySql() {

@Test
public void testExtractTableFromQuery() {
ImmutableList<KV<String, @Nullable String>> readCases =
ImmutableList<KV<String, @Nullable KV<String, String>>> readCases =
ImmutableList.of(
KV.of("select * from table_1", "table_1"),
KV.of("SELECT a, b FROM [table-2]", "table-2"),
KV.of("drop table not-select", ""));
for (KV<String, @Nullable String> testCase : readCases) {
KV.of("select * from table_1", KV.of(null, "table_1")),
KV.of("select * from public.table_1", KV.of("public", "table_1")),
KV.of("select * from `select`", KV.of(null, "select")),
KV.of("select * from `public`.`select`", KV.of("public", "select")),
KV.of("SELECT a, b FROM [table-2]", KV.of(null, "table-2")),
KV.of("SELECT a, b FROM [public].[table-2]", KV.of("public", "table-2")),
KV.of("drop table not-select", null));
for (KV<String, @Nullable KV<String, String>> testCase : readCases) {
assertEquals(testCase.getValue(), JdbcUtil.extractTableFromReadQuery(testCase.getKey()));
}
ImmutableList<KV<String, @Nullable String>> writeCases =
ImmutableList<KV<String, @Nullable KV<String, String>>> writeCases =
ImmutableList.of(
KV.of("insert into table_1 values ...", "table_1"),
KV.of("INSERT INTO [table-2] values ...", "table-2"),
KV.of("drop table not-select", ""));
for (KV<String, @Nullable String> testCase : writeCases) {
KV.of("insert into table_1 values ...", KV.of(null, "table_1")),
KV.of("INSERT INTO [table-2] values ...", KV.of(null, "table-2")),
KV.of("INSERT INTO [foo].[table-2] values ...", KV.of("foo", "table-2")),
KV.of("drop table not-select", null));
for (KV<String, @Nullable KV<String, String>> testCase : writeCases) {
assertEquals(testCase.getValue(), JdbcUtil.extractTableFromWriteQuery(testCase.getKey()));
}
}
Expand Down

0 comments on commit 5d48309

Please sign in to comment.