[GOBBLIN-2177] Avoid shutting down QueueProcessor on non-InterruptedException (#4080)
abhishekmjain authored and Will-Lo committed Dec 13, 2024
1 parent 916c87d commit 89bd41b
Showing 2 changed files with 46 additions and 6 deletions.
@@ -40,6 +40,7 @@
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
 import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
@@ -176,6 +177,34 @@ public void testCalculateProduceToConsumeLag() {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }

+  @Test
+  public void testQueueProcessorRuntimeExceptionEncounteredAutoCommitEnabled() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    // Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      int callCount = 0;
+      @Override
+      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+        super.processMessage(message);
+        // Throw a custom exception after delegating to the real implementation
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // Assert all NUM_MSGS messages were processed despite the exceptions
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);
+    consumer.shutDown();
+  }
+
   private List<byte[]> createByteArrayMessages() {
     List<byte[]> records = Lists.newArrayList();
 
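
For contrast with the test above, a hypothetical companion test for the auto-commit-disabled path (not part of this commit) might look like the following sketch. It assumes the same test class context (MockedHighLevelConsumer, TOPIC, NUM_PARTITIONS, NUM_MSGS, and the property constants used above); the method name and "manual" group-id token are illustrative, and the final assertion is left as a comment because it would depend on test-harness helpers not shown here.

  @Test
  public void testQueueProcessorRuntimeExceptionEncounteredAutoCommitDisabled() throws Exception {
    Properties consumerProps = new Properties();
    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
    // Fresh group id so there are no previously committed offsets for this group
    String consumerGroupId = Joiner.on("-").join(TOPIC, "manual", System.currentTimeMillis());
    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
    // Auto commit disabled: the QueueProcessor should rethrow instead of swallowing
    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "false");

    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
      @Override
      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
        super.processMessage(message);
        throw new RuntimeException("Simulated exception in processMessage");
      }
    };
    consumer.startAsync().awaitRunning();

    // Each QueueProcessor thread should stop on its first failing record, so far
    // fewer than NUM_MSGS messages get processed before shutdown.
    consumer.shutDown();
  }
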
@@ -340,18 +340,29 @@ public void run() {
         while (true) {
           record = queue.take();
           messagesRead.inc();
-          HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
-          recordsProcessed.incrementAndGet();
+          try {
+            HighLevelConsumer.this.processMessage((DecodeableKafkaRecord) record);
+            recordsProcessed.incrementAndGet();
+          }
+          catch (Exception e) {
+            // Rethrow exception in case auto commit is disabled
+            if (!HighLevelConsumer.this.enableAutoCommit) {
+              throw e;
+            }
+            // Continue with processing next records in case auto commit is enabled
+            log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
+          }

-          if(!HighLevelConsumer.this.enableAutoCommit) {
-            KafkaPartition partition = new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic).build();
+          if (!HighLevelConsumer.this.enableAutoCommit) {
+            KafkaPartition partition =
+                new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic)
+                    .build();
             // Committed offset should always be the offset of the next record to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
         }
-      } catch (InterruptedException e) {
+      } catch(InterruptedException e){
         log.warn("Thread interrupted while processing queue ", e);
-        // TODO: evaluate whether we should interrupt the thread or continue processing
         Thread.currentThread().interrupt();
       } catch (Exception e) {
         log.error("Encountered exception while processing record so stopping queue processing. Record: {} Exception: {}", record, e);
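
Distilled, the new control flow is: a per-record try/catch that logs and keeps consuming when auto commit is enabled, but rethrows when auto commit is disabled, since in manual-commit mode continuing past a failed record would let its offset be committed as if it had succeeded. A minimal standalone sketch of that logic (illustrative names only, not the actual Gobblin classes):

  import java.util.concurrent.BlockingQueue;

  class QueueProcessorSketch implements Runnable {
    private final BlockingQueue<String> queue;
    private final boolean enableAutoCommit;

    QueueProcessorSketch(BlockingQueue<String> queue, boolean enableAutoCommit) {
      this.queue = queue;
      this.enableAutoCommit = enableAutoCommit;
    }

    // Stand-in for HighLevelConsumer.processMessage; may throw at runtime
    protected void processMessage(String record) {
    }

    @Override
    public void run() {
      String record = null;
      try {
        while (true) {
          record = queue.take();
          try {
            processMessage(record);
          } catch (Exception e) {
            if (!enableAutoCommit) {
              throw e; // manual-commit mode: fail fast so the offset is never committed
            }
            // auto-commit mode: log and keep consuming; Kafka commits offsets regardless
            System.err.println("Error processing record " + record + ": " + e);
          }
          // (manual-commit mode records offset + 1 here for a later commit)
        }
      } catch (InterruptedException e) {
        // Shutdown path: restore the interrupt flag and exit the loop
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        // Reachable only in manual-commit mode: stop processing on the first failure
        System.err.println("Stopping queue processing at record " + record + ": " + e);
      }
    }
  }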
