[GOBBLIN-1956] Make the Kafka streaming pipeline able to configure the max poll records at runtime (#3827)

* address comments

* use connection manager when HttpClient is not closeable

* add unit test

* fix typo

* [GOBBLIN-1956] Make the Kafka streaming pipeline able to configure the max poll records at runtime

* small refactor

---------

Co-authored-by: Zihan Li <[email protected]>
2 people authored and Will-Lo committed Nov 16, 2023
1 parent d0842da commit f1f164f
Showing 1 changed file with 21 additions and 2 deletions.
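
For orientation (not part of the commit): the change lets the extractor derive Kafka's max.poll.records at runtime from a configurable byte budget and the average record size observed per partition, instead of relying on a fixed record count. Below is a minimal sketch of setting the two new properties on a work unit; only the property names and defaults are taken from the diff that follows, and the surrounding wiring is illustrative:

import org.apache.gobblin.configuration.WorkUnitState;

public class MaxPollRecordsConfigSketch {
  public static void main(String[] args) {
    WorkUnitState state = new WorkUnitState();
    // Byte budget for a single poll; the commit's default is 50 * 1024 * 1024 (50 MB).
    state.setProp("kafka.streaming.max.kafka.buffer.size.in.bytes", 50L * 1024 * 1024);
    // Ceiling on records per poll, and the fallback when no average record size is known yet.
    state.setProp("kafka.consumer.maxPollRecords", 100);
    // The KafkaStreamingExtractor constructor rewrites kafka.consumer.maxPollRecords to
    // min(bufferSizeInBytes / maxAvgRecordSize, ceiling) before creating the consumer client.
  }
}

Capping by bytes rather than by record count keeps a poll's memory footprint bounded even when record sizes vary widely across topics.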
@@ -87,6 +87,11 @@
 public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableKafkaRecord> {
   public static final String DATASET_KEY = "dataset";
   public static final String DATASET_PARTITION_KEY = "datasetPartition";
+  public static final String MAX_KAFKA_BUFFER_SIZE_IN_BYTES = "kafka.streaming.max.kafka.buffer.size.in.bytes";
+  public static final Long DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES = Long.valueOf(50 * 1024 * 1024);
+  // Max number of records to be pulled in a single poll.
+  private static final String KAFKA_MAX_POLL_RECORDS_KEY = "kafka.consumer.maxPollRecords";
+  private static final int DEFAULT_MAX_POLL_RECORDS = 100;
   private static final Long MAX_LOG_ERRORS = 100L;

   private static final String KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY =
@@ -115,6 +120,7 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableKafkaRecord> {
   protected MultiLongWatermark lowWatermark;
   protected MultiLongWatermark highWatermark;
   protected MultiLongWatermark nextWatermark;
+  protected long maxAvgRecordSize = -1;
   protected Map<Integer, DecodeableKafkaRecord> perPartitionLastSuccessfulRecord;
   private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

@@ -214,6 +220,19 @@ public LongWatermark getLwm() {

   public KafkaStreamingExtractor(WorkUnitState state) {
     super(state);
+    this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
+    Map<KafkaPartition, LongWatermark> topicPartitionWatermarks = getTopicPartitionWatermarks(this.topicPartitions);
+    if (this.maxAvgRecordSize > 0) {
+      long maxPollRecords =
+          state.getPropAsLong(MAX_KAFKA_BUFFER_SIZE_IN_BYTES, DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES) / maxAvgRecordSize;
+      maxPollRecords = Math.min(maxPollRecords, state.getPropAsInt(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS));
+      state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, maxPollRecords);
+      log.info("set max.poll.records to be {}", maxPollRecords);
+    } else {
+      // No average record size is available yet, so use a conservative default to avoid OOM.
+      state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS);
+      log.info("set max.poll.records to be {}", DEFAULT_MAX_POLL_RECORDS);
+    }
     this.kafkaConsumerClientResolver =
         new ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class);
     try {
@@ -229,8 +248,7 @@ public KafkaStreamingExtractor(WorkUnitState state) {
     this._schemaRegistry = state.contains(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) ? Optional.of(
         KafkaSchemaRegistry.<String, S>get(state.getProperties())) : Optional.<KafkaSchemaRegistry<String, S>>absent();

-    this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
-    this.kafkaConsumerClient.assignAndSeek(topicPartitions, getTopicPartitionWatermarks(this.topicPartitions));
+    this.kafkaConsumerClient.assignAndSeek(topicPartitions, topicPartitionWatermarks);
     this.messageIterator = this.kafkaConsumerClient.consume();

     this.partitions = KafkaUtils.getPartitions(state);
@@ -292,6 +310,7 @@ private Map<KafkaPartition, LongWatermark> getTopicPartitionWatermarks(List<KafkaPartition> topicPartitions) {
       if (kafkaWatermarkMap.containsKey(topicPartitionString)) {
         LongWatermark longWatermark = ((KafkaWatermark) kafkaWatermarkMap.get(topicPartitionString)).getLwm();
         longWatermarkMap.put(topicPartition, longWatermark);
+        maxAvgRecordSize = Math.max(maxAvgRecordSize, ((KafkaWatermark) kafkaWatermarkMap.get(topicPartitionString)).getAvgRecordSize());
       } else {
         longWatermarkMap.put(topicPartition, new LongWatermark(0L));
       }
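
To make the arithmetic concrete (an illustration, not part of the commit): with the default 50 MB buffer and a largest observed per-partition average record size of 1 MB, the derived limit is min(50 MB / 1 MB, 100) = 50 records per poll; with 100 KB records the buffer would allow 512, so the configured ceiling of 100 binds instead. A minimal standalone sketch mirroring the constructor's computation:

public class MaxPollRecordsMathSketch {
  public static void main(String[] args) {
    long bufferBytes = 50L * 1024 * 1024; // mirrors DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES
    int ceiling = 100;                    // mirrors DEFAULT_MAX_POLL_RECORDS

    long avgRecordSize = 1024 * 1024;     // suppose the largest per-partition average is 1 MB
    System.out.println(Math.min(bufferBytes / avgRecordSize, ceiling)); // 50 -> buffer cap binds

    avgRecordSize = 100 * 1024;           // 100 KB records would allow 512 per poll
    System.out.println(Math.min(bufferBytes / avgRecordSize, ceiling)); // 100 -> ceiling binds
  }
}

When no average record size has been recorded yet (maxAvgRecordSize stays at -1), the constructor skips the division and falls back to DEFAULT_MAX_POLL_RECORDS, matching the OOM-avoidance comment in the diff.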
