Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: Newly created Storage API write streams do not recognize the previously updated table schema #33238

Closed
17 tasks
Abacn opened this issue Nov 27, 2024 · 3 comments · Fixed by #33231
Closed
17 tasks

Comments

@Abacn
Copy link
Contributor

Abacn commented Nov 27, 2024

What happened?

#33231 added a test, showing the combination of these two feature do not work together.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@ahmedabu98
Copy link
Contributor

After investigating a little, I found this doesn't actually have to do with dynamic destinations. The issue is when the sink creates a new StreamWriter after the schema has been updated.

Looks like we create the StreamWriter with a fixed schema. A StreamWriter returns an updated schema only if it was created before such an update. So it doesn't pull the updated schema and continues using the old schema it was created with.

@ahmedabu98
Copy link
Contributor

ahmedabu98 commented Dec 6, 2024

Say a streaming pipeline has been running for a while.. then the table's schema gets updated. If the pipeline decides to create new write streams after this (e.g. autosharding determines we need more shards), we will create those new write streams based on the original schema. We do not communicate to new shards that actually we are writing with a new schema. See the following code reference:

TableSchema updatedSchemaValue = updatedSchema.read();
if (autoUpdateSchema && updatedSchemaValue != null) {
// We've seen an updated schema, so we use that instead of querying the
// MessageConverter.
tableSchema = updatedSchemaValue;
descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, false);
} else {
// Start off with the base schema. As we get notified of schema updates, we
// will update the descriptor.
StorageApiDynamicDestinations.MessageConverter<?> converter =
messageConverters.get(
element.getKey().getKey(), dynamicDestinations, datasetService);
tableSchema = converter.getTableSchema();
descriptor = converter.getDescriptor(false);
}

updatedSchema is a ValueState that exists within the scope of a shard. It only gets initialized when a Schema change happens within the lifetime of this shard:

if (newSchema.isPresent()) {
appendClientInfo.set(
AppendClientInfo.of(
newSchema.get(), appendClientInfo.get().getCloseAppendClient(), false));
APPEND_CLIENTS.invalidate(element.getKey());
APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableId, updatedSchemaReturned);
updatedSchema.write(newSchema.get());
}
}

@ahmedabu98 ahmedabu98 changed the title [Bug]: BigQueryIO Storage API Schema Update does not work with dynamic destinations [Bug]: Newly created Storage API write streams do not recognize the previously updated table schema Dec 6, 2024
@ahmedabu98
Copy link
Contributor

We do always attempt to fetch the StreamWriter's updated schema:

TableSchema updatedSchemaReturned =
(streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null;

But looking at the implementation in Storage API code base, we see that the updated schema is returned only if the StreamWriter was created before the schema update operation: https://github.com/googleapis/java-bigquerystorage/blob/f090c8eb91c1fad9e7f13850b367cca64c0afe5c/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java#L576-L591

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants