diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index bb0b96f90..1ec5bbbd8 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -40,6 +40,7 @@
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
 import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
@@ -176,6 +177,34 @@ public void testCalculateProduceToConsumeLag() {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }
 
+  @Test
+  public void testQueueProcessorRuntimeExceptionEncounteredAutoCommitEnabled() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    // Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      int callCount = 0;
+      @Override
+      public void processMessage(DecodeableKafkaRecord message) {
+        super.processMessage(message);
+        // Throw a custom exception to simulate a processing failure
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // Assert all NUM_MSGS messages were processed despite the exceptions, since auto commit is enabled
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);
+    consumer.shutDown();
+  }
+
   private List<byte[]> createByteArrayMessages() {
     List<byte[]> records = Lists.newArrayList();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index ea996b5fb..64bf9b810 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -340,18 +340,29 @@ public void run() {
         while (true) {
           record = queue.take();
           messagesRead.inc();
-          HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
-          recordsProcessed.incrementAndGet();
+          try {
+            HighLevelConsumer.this.processMessage((DecodeableKafkaRecord) record);
+            recordsProcessed.incrementAndGet();
+          }
+          catch (Exception e) {
+            // Rethrow the exception when auto commit is disabled
+            if (!HighLevelConsumer.this.enableAutoCommit) {
+              throw e;
+            }
+            // Log and continue with the next records when auto commit is enabled
+            log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
+          }
 
-          if(!HighLevelConsumer.this.enableAutoCommit) {
-            KafkaPartition partition = new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic).build();
+          if (!HighLevelConsumer.this.enableAutoCommit) {
+            KafkaPartition partition =
+                new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic)
+                    .build();
             // Committed offset should always be the offset of the next record to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
         }
-      } catch (InterruptedException e) {
+      } catch(InterruptedException e){
        log.warn("Thread interrupted while processing queue ", e);
-        // TODO: evaluate whether we should interrupt the thread or continue processing
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        log.error("Encountered exception while processing record so stopping queue processing. Record: {} Exception: {}", record, e);