Skip to content

Commit

Permalink
remove listing topics when processing each element (#31897)
Browse files Browse the repository at this point in the history
Co-authored-by: Naireen <[email protected]>
  • Loading branch information
Naireen and Naireen authored Jul 23, 2024
1 parent 9b6c805 commit 35cfad9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -436,19 +436,6 @@ public ProcessContinuation processElement(
"Creating Kafka consumer for process continuation for {}",
kafkaSourceDescriptor.getTopicPartition());
try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
// Check whether current TopicPartition is still available to read.
Set<TopicPartition> existingTopicPartitions = new HashSet<>();
for (List<PartitionInfo> topicPartitionList : consumer.listTopics().values()) {
topicPartitionList.forEach(
partitionInfo -> {
existingTopicPartitions.add(
new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
});
}
if (!existingTopicPartitions.contains(kafkaSourceDescriptor.getTopicPartition())) {
return ProcessContinuation.stop();
}

ConsumerSpEL.evaluateAssign(
consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
long startOffset = tracker.currentRestriction().getFrom();
Expand All @@ -462,6 +449,10 @@ public ProcessContinuation processElement(
// When there are no records available for the current TopicPartition, self-checkpoint
// and move to process the next element.
if (rawRecords.isEmpty()) {
if (!topicPartitionExists(
kafkaSourceDescriptor.getTopicPartition(), consumer.listTopics())) {
return ProcessContinuation.stop();
}
if (timestampPolicy != null) {
updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker);
}
Expand Down Expand Up @@ -522,6 +513,23 @@ public ProcessContinuation processElement(
}
}

private boolean topicPartitionExists(
TopicPartition topicPartition, Map<String, List<PartitionInfo>> topicListMap) {
// Check if the current TopicPartition still exists.
Set<TopicPartition> existingTopicPartitions = new HashSet<>();
for (List<PartitionInfo> topicPartitionList : topicListMap.values()) {
topicPartitionList.forEach(
partitionInfo -> {
existingTopicPartitions.add(
new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
});
}
if (!existingTopicPartitions.contains(topicPartition)) {
return false;
}
return true;
}

// see https://github.com/apache/beam/issues/25962
private ConsumerRecords<byte[], byte[]> poll(
Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ public void testProcessElementWithEmptyPoll() throws Exception {
public void testProcessElementWhenTopicPartitionIsRemoved() throws Exception {
MockMultiOutputReceiver receiver = new MockMultiOutputReceiver();
consumer.setRemoved();
consumer.setNumOfRecordsPerPoll(10);
consumer.setNumOfRecordsPerPoll(-1);
OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE));
ProcessContinuation result =
dofnInstance.processElement(
Expand Down

0 comments on commit 35cfad9

Please sign in to comment.