Skip to content

Commit

Permalink
PubsubIO: Add readMessagesWithAttributesWithCoderAndParseFn (#31206)
Browse files Browse the repository at this point in the history
  • Loading branch information
llamallamaduck authored May 21, 2024
1 parent 89795c0 commit 88af35e
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,17 @@ public static <T> Read<T> readMessagesWithCoderAndParseFn(
return Read.newBuilder(parseFn).setCoder(coder).build();
}

/**
* Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream,
* mapping each {@link PubsubMessage}, with attributes, into type T using the supplied parse
* function and coder. Similar to {@link #readMessagesWithCoderAndParseFn(Coder, SimpleFunction)},
* but with the with addition of making the message attributes available to the ParseFn.
*/
public static <T> Read<T> readMessagesWithAttributesWithCoderAndParseFn(
Coder<T> coder, SimpleFunction<PubsubMessage, T> parseFn) {
return Read.newBuilder(parseFn).setCoder(coder).setNeedsAttributes(true).build();
}

/**
* Returns a {@link PTransform} that continuously reads binary encoded Avro messages into the Avro
* {@link GenericRecord} type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,49 @@ public void testReadMessagesWithCoderAndParseFn() {
pipeline.run();
}

static class AppendSuffixAttributeToStringPayloadParseFn
extends SimpleFunction<PubsubMessage, String> {
@Override
public String apply(PubsubMessage input) {
String payload = new String(input.getPayload(), StandardCharsets.UTF_8);
String suffixAttribute = input.getAttributeMap().get("suffix");
return payload + suffixAttribute;
}
}

private IncomingMessage messageWithSuffixAttribute(String payload, String suffix) {
return IncomingMessage.of(
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(payload))
.putAttributes("suffix", suffix)
.build(),
1234L,
0,
UUID.randomUUID().toString(),
UUID.randomUUID().toString());
}

@Test
public void testReadMessagesWithAttributesWithCoderAndParseFn() {
ImmutableList<IncomingMessage> inputs =
ImmutableList.of(
messageWithSuffixAttribute("foo", "-some-suffix"),
messageWithSuffixAttribute("bar", "-some-other-suffix"));
clientFactory = PubsubTestClient.createFactoryForPull(CLOCK, SUBSCRIPTION, 60, inputs);

PCollection<String> read =
pipeline.apply(
PubsubIO.readMessagesWithAttributesWithCoderAndParseFn(
StringUtf8Coder.of(), new AppendSuffixAttributeToStringPayloadParseFn())
.fromSubscription(SUBSCRIPTION.getPath())
.withClock(CLOCK)
.withClientFactory(clientFactory));

List<String> outputs = ImmutableList.of("foo-some-suffix", "bar-some-other-suffix");
PAssert.that(read).containsInAnyOrder(outputs);
pipeline.run();
}

@Test
public void testDynamicTopicsBounded() throws IOException {
testDynamicTopics(true);
Expand Down

0 comments on commit 88af35e

Please sign in to comment.