Skip to content

Commit

Permalink
Add custom pubsub source and sink experiment support for runner v2.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomstepp committed Nov 13, 2023
1 parent d49c268 commit 5ccbb05
Showing 1 changed file with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public String getUrn() {
@Override
public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, Unbounded<?>> transform, SdkComponents components) {
if (ExperimentalOptions.hasExperiment(
transform.getPipeline().getOptions(), "enable_custom_pubsub_source")) {
return null;
}
if (!(transform.getTransform().getSource() instanceof PubsubUnboundedSource.PubsubSource)) {
return null;
}
Expand Down Expand Up @@ -111,6 +115,10 @@ public String getUrn() {
public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubSink> transform,
SdkComponents components) {
if (ExperimentalOptions.hasExperiment(
transform.getPipeline().getOptions(), "enable_custom_pubsub_sink")) {
return null;
}
PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder();
ValueProvider<TopicPath> topicProvider =
Preconditions.checkStateNotNull(transform.getTransform().outer.getTopicProvider());
Expand Down Expand Up @@ -145,6 +153,10 @@ public String getUrn() {
public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, PubsubUnboundedSink.PubsubDynamicSink> transform,
SdkComponents components) {
if (ExperimentalOptions.hasExperiment(
transform.getPipeline().getOptions(), "enable_custom_pubsub_sink")) {
return null;
}
PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder();
if (transform.getTransform().outer.getTimestampAttribute() != null) {
payloadBuilder.setTimestampAttribute(
Expand Down

0 comments on commit 5ccbb05

Please sign in to comment.