diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 1d687812560b..01848d92d928 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -671,6 +671,17 @@ public static Read readMessagesWithCoderAndParseFn( return Read.newBuilder(parseFn).setCoder(coder).build(); } + /** + * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream, + * mapping each {@link PubsubMessage}, with attributes, into type T using the supplied parse + * function and coder. Similar to {@link #readMessagesWithCoderAndParseFn(Coder, SimpleFunction)}, + * but with the with addition of making the message attributes available to the ParseFn. + */ + public static Read readMessagesWithAttributesWithCoderAndParseFn( + Coder coder, SimpleFunction parseFn) { + return Read.newBuilder(parseFn).setCoder(coder).setNeedsAttributes(true).build(); + } + /** * Returns a {@link PTransform} that continuously reads binary encoded Avro messages into the Avro * {@link GenericRecord} type. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index abc35d0bb1b2..fe6338a501c4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -780,6 +780,49 @@ public void testReadMessagesWithCoderAndParseFn() { pipeline.run(); } + static class AppendSuffixAttributeToStringPayloadParseFn + extends SimpleFunction { + @Override + public String apply(PubsubMessage input) { + String payload = new String(input.getPayload(), StandardCharsets.UTF_8); + String suffixAttribute = input.getAttributeMap().get("suffix"); + return payload + suffixAttribute; + } + } + + private IncomingMessage messageWithSuffixAttribute(String payload, String suffix) { + return IncomingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(payload)) + .putAttributes("suffix", suffix) + .build(), + 1234L, + 0, + UUID.randomUUID().toString(), + UUID.randomUUID().toString()); + } + + @Test + public void testReadMessagesWithAttributesWithCoderAndParseFn() { + ImmutableList inputs = + ImmutableList.of( + messageWithSuffixAttribute("foo", "-some-suffix"), + messageWithSuffixAttribute("bar", "-some-other-suffix")); + clientFactory = PubsubTestClient.createFactoryForPull(CLOCK, SUBSCRIPTION, 60, inputs); + + PCollection read = + pipeline.apply( + PubsubIO.readMessagesWithAttributesWithCoderAndParseFn( + StringUtf8Coder.of(), new AppendSuffixAttributeToStringPayloadParseFn()) + .fromSubscription(SUBSCRIPTION.getPath()) + .withClock(CLOCK) + .withClientFactory(clientFactory)); + + List outputs = ImmutableList.of("foo-some-suffix", "bar-some-other-suffix"); + PAssert.that(read).containsInAnyOrder(outputs); + pipeline.run(); + } + @Test public void testDynamicTopicsBounded() throws IOException { testDynamicTopics(true);