From 2a2f4c3693e279896461a2098766b6ca0420d76e Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Tue, 8 Oct 2024 17:17:57 -0700 Subject: [PATCH 01/12] [vpj] Kafka ingestion into rows of a data frame via Spark Table API --- .../input/kafka/KafkaInputFormatCombiner.java | 2 + .../input/kafka/KafkaInputRecordReader.java | 2 + .../linkedin/venice/spark/SparkConstants.java | 10 + .../input/pubsub/PartitionSplitters.java | 151 ++++++++ .../clients/VenicePubSubAdminClient.java | 5 + .../VenicePubsubPartitionConsumer.java | 25 ++ .../table/VenicePubsubInputPartition.java | 54 +++ .../VenicePubsubInputPartitionReader.java | 339 ++++++++++++++++++ ...nicePubsubInputPartitionReaderFactory.java | 36 ++ .../pubsub/table/VenicePubsubInputScan.java | 90 +++++ .../table/VenicePubsubInputScanBuilder.java | 19 + .../pubsub/table/VenicePubsubInputTable.java | 52 +++ .../pubsub/table/VenicePubsubSource.java | 42 +++ .../input/pubsub/PartitionSplittersTest.java | 141 ++++++++ .../VenicePubsubInputPartitionReaderTest.java | 125 +++++++ .../table/VenicePubsubInputPartitionTest.java | 38 ++ .../input/kafka/TestKafkaInputFormat.java | 4 +- 17 files changed, 1133 insertions(+), 2 deletions(-) create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/PartitionSplitters.java create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubSubAdminClient.java create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartition.java create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java create mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java create mode 100644 clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/PartitionSplittersTest.java create mode 100644 clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java create mode 100644 clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormatCombiner.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormatCombiner.java index 9af9742116..79099fd4b3 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormatCombiner.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormatCombiner.java @@ -23,6 +23,8 @@ import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; +// for spark impl this needs to be 
moved out to the next stages after all the kafka records are captured. + /** * This class is a Combiner, which is a functionality of the MR framework where we can plug a {@link Reducer} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java index 0e8aa3e9ff..f8b13c7a5a 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java @@ -159,6 +159,8 @@ public KafkaInputRecordReader( /** * This function will skip all the Control Messages right now. + * This method gets 2 objects, a key and a value, and fills them with + * the next key and value from the Kafka topic partition. */ @Override public boolean next(KafkaInputMapperKey key, KafkaInputMapperValue value) throws IOException { diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java index e8ba243b85..bffa85bc36 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java @@ -20,6 +20,16 @@ public class SparkConstants { new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()), new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()) }); + public static final StructType KAFKA_INPUT_TABLE_SCHEMA = new StructType( + new StructField[] { new StructField("__offset__", IntegerType, false, Metadata.empty()), + new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()), + new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()), + new StructField("__partition__", IntegerType, false, Metadata.empty()), + new StructField("__message_type__", IntegerType, false, Metadata.empty()), + new StructField("__schema_id__", IntegerType, false, Metadata.empty()), + new StructField("__replication_metadata_version_id__", IntegerType, false, Metadata.empty()), + new StructField("__replication_metadata_payload__", BinaryType, false, Metadata.empty()) }); + public static final StructType DEFAULT_SCHEMA_WITH_PARTITION = new StructType( new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()), new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()), diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/PartitionSplitters.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/PartitionSplitters.java new file mode 100644 index 0000000000..a5f1187f30 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/PartitionSplitters.java @@ -0,0 +1,151 @@ +package com.linkedin.venice.spark.input.pubsub; + +import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class PartitionSplitters { + // need a method called fullPartitionSplitter, takes in list of partition start and end offsets + // and returns a list of VenicePubsubInputPartition splits + public static Map>> fullPartitionSplitter(Map> partitionOffsetsMap) { + Map>> splits = new 
HashMap<>(); + for (Map.Entry> entry: partitionOffsetsMap.entrySet()) { + int partitionNumber = entry.getKey(); + List offsets = entry.getValue(); // start and end offsets + List> split = new ArrayList<>(); + split.add(assembleSegment(offsets.get(0), offsets.get(1))); + splits.put(partitionNumber, split); + } + return splits; + } + + // this one splits the partitions into equal length segments based on the total number of segments intended + // the final segment count is going to be less than desired count + partition count to accomodate for partial segments + // and division remainders. This method prioritizes paralellism over segment count. + public static Map>> segmentCountSplitter( + Map> partitionOffsetsMap, + int totalSegments) { + Map>> splits = new HashMap<>(); + long intendedSplitLength = computeIntendedSplitLengthBasedOnCount(partitionOffsetsMap, totalSegments); + for (Map.Entry> entry: partitionOffsetsMap.entrySet()) { + int partitionNumber = entry.getKey(); + List offsets = entry.getValue(); + long startOffset = offsets.get(0); + long endOffset = offsets.get(1); + long partitionLength = (endOffset - startOffset); + if (intendedSplitLength >= partitionLength) { + // this whole partition fits nicely in one chunk + List> split = new ArrayList<>(); + split.add(assembleSegment(startOffset, endOffset)); + splits.put(partitionNumber, split); // this partition is going to be consumed by a single task + } else { + // this partition needs to be split into multiple segments + // first lets see how many segments we can get out of this partition + long count = (long) Math.ceil((double) partitionLength / intendedSplitLength); + long segmentLength = (long) Math.ceil((double) partitionLength / count); + List> split = new ArrayList<>(); + + // generate the segments + for (int i = 0; i < count; i++) { + List segment = new ArrayList<>(); + segment.add(startOffset + i * segmentLength); + if (i == count - 1) { + segment.add(endOffset); // last segment + } else { + segment.add(startOffset + (i + 1) * segmentLength - 1); + } + split.add(segment); + } + splits.put(partitionNumber, split); // Multiple splits for this partition + } + } + return splits; + } + + // and a method that cuts the partitions into segments based on the total number of messages + public static Map>> messageCountSplitter( + Map> partitionOffsetsMap, + long chunkOffsetSize) { + Map>> splits = new HashMap<>(); + for (Map.Entry> entry: partitionOffsetsMap.entrySet()) { + int partitionNumber = entry.getKey(); + List offsets = entry.getValue(); + long startOffset = offsets.get(0); + long endOffset = offsets.get(1); + long partitionLength = (endOffset - startOffset); + if (chunkOffsetSize <= partitionLength) { + // this whole partition fits nicely in one chunk + List> split = new ArrayList<>(); + split.add(assembleSegment(startOffset, endOffset)); + splits.put(partitionNumber, split); // this partition is going to be consumed by a single task + } else { + // this partition needs to be split into multiple segments + // first lets see how many segments we can get out of this partition + long count = (long) Math.ceil((double) partitionLength / chunkOffsetSize); + long segmentLength = (long) Math.ceil((double) partitionLength / count); + List> split = new ArrayList<>(); + + // generate the segments + for (int i = 0; i < count; i++) { + List segment = new ArrayList<>(); + segment.add(startOffset + i * segmentLength); + segment.add(startOffset + (i + 1) * segmentLength); + split.add(segment); + } + splits.put(partitionNumber, split); // 
Multiple splits for this partition + } + } + return splits; + } + + // assemble and wrap the splits into VenicePubsubInputPartition objects. + public static List convertToInputPartitions( + String region, + String topicName, + Map>> splits) { + List veniceInputPartitions = new ArrayList<>(); + for (Map.Entry>> entry: splits.entrySet()) { + int partitionNumber = entry.getKey(); + List> segments = entry.getValue(); + for (List segment: segments) { + long startOffset = segment.get(0); + long endOffset = segment.get(1); + VenicePubsubInputPartition partition = + new VenicePubsubInputPartition(region, topicName, partitionNumber, startOffset, endOffset); + veniceInputPartitions.add(partition); + } + } + return veniceInputPartitions; + } + + // utility methods + private static List assembleSegment(long startOffset, long endOffset) { + List split; + List segment = new ArrayList<>(); + segment.add(startOffset); + segment.add(endOffset); + return segment; + } + + static long computeIntendedSplitLengthBasedOnCount(Map> partitionOffsetsMap, int totalSegments) { + Double totalLength = computeTopicLengthInOffsets(partitionOffsetsMap); + return (long) Math.ceil(totalLength / totalSegments); + } + + static long computeIntendedSplitCountBasedOnOffset(Map> partitionOffsetsMap, long offsetCount) { + Double totalLength = computeTopicLengthInOffsets(partitionOffsetsMap); + return (long) Math.ceil(totalLength / offsetCount); + } + + protected static Double computeTopicLengthInOffsets(Map> partitionOffsetsMap) { + Double totalLength = 0.0; + for (Map.Entry> entry: partitionOffsetsMap.entrySet()) { + List offsets = entry.getValue(); + totalLength += offsets.get(1) - offsets.get(0); + } + return totalLength; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubSubAdminClient.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubSubAdminClient.java new file mode 100644 index 0000000000..0a93f06dcf --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubSubAdminClient.java @@ -0,0 +1,5 @@ +package com.linkedin.venice.spark.input.pubsub.clients; + +public class VenicePubSubAdminClient { + +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java new file mode 100644 index 0000000000..ef005a55bc --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java @@ -0,0 +1,25 @@ +package com.linkedin.venice.spark.input.pubsub.clients; + +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter; +import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter; +import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition; +import java.util.Properties; + + +public class VenicePubsubPartitionConsumer { + private final Properties configBag; + private final VenicePubsubInputPartition inputPartition; + private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + private ApacheKafkaConsumerAdapter consumer; + private ApacheKafkaAdminAdapter admin; + + public VenicePubsubPartitionConsumer(Properties configBag, VenicePubsubInputPartition inputPartition) 
{ + this.configBag = configBag; + this.inputPartition = inputPartition; + String consumerName = "VeniceSpark_p-" + inputPartition.getPartitionNumber() + "_" + + inputPartition.getSegmentStartOffset() + "-" + inputPartition.getSegmentEndOffset(); + + } + +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartition.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartition.java new file mode 100644 index 0000000000..54f1fae732 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartition.java @@ -0,0 +1,54 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import org.apache.spark.sql.connector.read.InputPartition; + +/* + This split can be a whole partition or sub part of a pubsub partition, hence the name segment. + This is intentional not to mix up the Kafka partition and spark idea of a split + the equivalent class for hdfs is VeniceHdfsInputPartition + */ + + +public class VenicePubsubInputPartition implements InputPartition { + // + private static final long serialVersionUID = 1L; + + private final String region; + private final String TopicName; + private final int partitionNumber; + private final long segmentStartOffset; + private final long segmentEndOffset; + + public VenicePubsubInputPartition( + String region, + String topicName, + int partitionNumber, + long startOffset, + long endOffset) { + this.region = region; + this.TopicName = topicName; + this.partitionNumber = partitionNumber; + this.segmentStartOffset = startOffset; + this.segmentEndOffset = endOffset; + } + + public String getRegion() { + return region; + } + + public String getTopicName() { + return TopicName; + } + + public int getPartitionNumber() { + return partitionNumber; + } + + public long getSegmentStartOffset() { + return segmentStartOffset; + } + + public long getSegmentEndOffset() { + return segmentEndOffset; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java new file mode 100644 index 0000000000..036ffdd57f --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -0,0 +1,339 @@ +package com.linkedin.venice.spark.input.pubsub.table; +/* + this is the code that runs within each task and consumes from pubsub and return rows + This is comparable to the Mapper stage of the hadoop map reduce KIF + */ + +import com.linkedin.venice.kafka.protocol.Delete; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.PubSubClientsFactory; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; +import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.utils.VeniceProperties; +import java.nio.ByteBuffer; +import 
java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.jetbrains.annotations.NotNull; + + +public class VenicePubsubInputPartitionReader implements PartitionReader { + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]); + private static final int CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES = 12; + private static final long EMPTY_POLL_SLEEP_TIME_MS = TimeUnit.SECONDS.toMillis(5); + private static final Long CONSUMER_POLL_TIMEOUT = TimeUnit.SECONDS.toMillis(1); // 1 second + + private static final Logger LOGGER = LogManager.getLogger(VenicePubsubInputPartitionReader.class); + + private final boolean filterControlMessages = true; + + // this is the buffer that holds the messages that are consumed from the pubsub + private final PubSubTopicPartition targetPubSubTopicPartition; + private final String targetPubSubTopicName; + private final int targetPartitionNumber; + private final long startingOffset; + private final long endingOffset; + private final long offsetLength; + private final PubSubConsumerAdapter pubSubConsumer; + // inputPartitionReader local buffer, that gets filled from partitionMessagesBuffer + + private final ArrayDeque> messageBuffer = new ArrayDeque<>(); + private long currentOffset; + private InternalRow currentRow = null; + private long recordsServed = 0; + private long recordsSkipped = 0; + private long lastKnownProgressPercent = 0; + + private Map>> consumerBuffer = + new HashMap<>(); + // the buffer that holds the relevant messages for the current partition + private List> partitionMessagesBuffer = new ArrayList<>(); + + public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputPartition inputPartition) { + this( + jobConfig, + inputPartition, + new PubSubClientsFactory(new VeniceProperties(jobConfig)).getConsumerAdapterFactory() + .create( + new VeniceProperties(jobConfig), + false, + PubSubMessageDeserializer.getInstance(), + // PubSubPassThroughDeserializer.getInstance(), + "Spark_KIF_consumer"), + new PubSubTopicRepository()); + } + + // testing constructor + public VenicePubsubInputPartitionReader( + Properties jobConfig, + VenicePubsubInputPartition inputPartition, + PubSubConsumerAdapter consumer, + PubSubTopicRepository pubSubTopicRepository) { + + targetPubSubTopicName = inputPartition.getTopicName(); + targetPartitionNumber = inputPartition.getPartitionNumber(); + startingOffset = inputPartition.getSegmentStartOffset(); + endingOffset = inputPartition.getSegmentEndOffset(); + offsetLength = endingOffset - startingOffset; + VeniceProperties veniceProperties = new VeniceProperties(jobConfig); + + this.pubSubConsumer = consumer; + + LOGGER.info( + "Consuming started for Topic: {} Partition {}, starting offset: {} ending offset: {}", + targetPubSubTopicName, + targetPartitionNumber, + startingOffset, + endingOffset); + + PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(targetPubSubTopicName); + + // List listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic); + // at this point, we hope that driver has given us good information about + // the partition 
and offsets and the fact that topic exists. + + targetPubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopic, targetPartitionNumber); + + pubSubConsumer.subscribe(targetPubSubTopicPartition, startingOffset - 1); + // pubSubConsumer.seek(startingOffset); // do we need this? or should we rely on the starting offset passed to + // subscribe ? + + next(); // get the first record ready to go. + } + + // if it returns a row, it's going to be key and value and offset in the row in that order + @Override + public InternalRow get() { + // should return the same row if called multiple times + return currentRow; + } + + @Override + public boolean next() { + // Are we past the finish line ? + if (currentOffset > endingOffset) { + return false; + } + + if (ableToPrepNextRow()) { + // there is a fresh row to serve + return true; + } + // at this point, buffer is empty. + + loadRecords(); // try to poll for some records and allow the exception to bubble up + return ableToPrepNextRow(); + } + + @Override + public void close() { + pubSubConsumer.close(); + LOGGER.info( + "Consuming ended for Topic: {} , consumed {} records, skipped {} records", + targetPubSubTopicName, + recordsServed, + recordsSkipped); + } + + // borrowing Gaojie's code for dealing with empty polls. + private void loadRecords() { + int retry = 0; + while (retry++ < CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES) { + consumerBuffer = pubSubConsumer.poll(CONSUMER_POLL_TIMEOUT); + partitionMessagesBuffer = consumerBuffer.get(targetPubSubTopicPartition); + if (!partitionMessagesBuffer.isEmpty()) { + // we got some records back for the desired partition. + break; + } + + try { + Thread.sleep(EMPTY_POLL_SLEEP_TIME_MS); + } catch (InterruptedException e) { + logProgress(); + LOGGER.error( + "Interrupted while waiting for records to be consumed from topic {} partition {} to be available", + targetPubSubTopicName, + targetPartitionNumber, + e); + // should we re-throw here to break the consumption task ? + } + } + if (partitionMessagesBuffer.isEmpty()) { + // this is a valid place to throw exception and kill the consumer task + // as there is no more records to consume. + throw new RuntimeException("Empty poll after " + retry + " retries"); + } + messageBuffer.addAll(partitionMessagesBuffer); + } + + private InternalRow processPubSubMessageToRow( + @NotNull PubSubMessage pubSubMessage) { + // after deliberation, I think we are better off isolating further processing of the messages after they are dumped + // into the dataframe, Spark job can handle the rest of the processing. + + // should we detect chunking on the topic ? + + KafkaKey kafkaKey = pubSubMessage.getKey(); + KafkaMessageEnvelope kafkaMessageEnvelope = pubSubMessage.getValue(); + MessageType pubSubMessageType = MessageType.valueOf(kafkaMessageEnvelope); + + /* + List of fields we need in the row: @see KAFKA_INPUT_TABLE_SCHEMA + 1. offset ( currently a long , maybe some other complicated thing in the Northguard world) + 2. key ( serialized key Byte[]) + 3. value ( serialized value Byte[]) + 4. partition ( int ) + 5. messageType ( put vs delete ) .getValue is the int value and gives us that. value type is also of this kind + 6. schemaId ( for put and delete ) int + 7. replicationMetadataPayload ByteBuffer + 8. 
replicationMetadataVersionId int
+     */
+
+    // Spark row setup:
+    long offset = pubSubMessage.getOffset();
+    ByteBuffer key = ByteBuffer.wrap(kafkaKey.getKey(), 0, kafkaKey.getKeyLength());
+    ByteBuffer value;
+    int partition = targetPartitionNumber;
+    int messageType;
+    int schemaId;
+    ByteBuffer replicationMetadataPayload;
+    int replicationMetadataVersionId;
+
+    switch (pubSubMessageType) {
+      case PUT:
+        Put put = (Put) kafkaMessageEnvelope.payloadUnion;
+        messageType = MessageType.PUT.getValue();
+        value = put.putValue;
+        schemaId = put.schemaId; // chunking will be handled down the road in the spark job.
+        replicationMetadataPayload = put.replicationMetadataPayload;
+        replicationMetadataVersionId = put.replicationMetadataVersionId;
+        break;
+      case DELETE:
+        messageType = MessageType.DELETE.getValue();
+        Delete delete = (Delete) kafkaMessageEnvelope.payloadUnion;
+        schemaId = delete.schemaId;
+        value = EMPTY_BYTE_BUFFER;
+        replicationMetadataPayload = delete.replicationMetadataPayload;
+        replicationMetadataVersionId = delete.replicationMetadataVersionId;
+        break;
+      default:
+        messageType = -1; // this is an error condition
+        schemaId = Integer.MAX_VALUE;
+        value = EMPTY_BYTE_BUFFER;
+        replicationMetadataPayload = EMPTY_BYTE_BUFFER;
+        replicationMetadataVersionId = Integer.MAX_VALUE;
+        // we don't care about messages other than PUT and DELETE
+    }
+
+    /*
+     Dealing with chunking:
+     @link https://github.com/linkedin/venice/blob/main/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java#L53
+     * 1. The top-level key is queried.
+     * 2. The top-level key's value's schema ID is checked.
+     *    a) If it is positive, then it's a full value, and is returned immediately.
+     *    b) If it is negative, then it's a {@link ChunkedValueManifest}, and we continue to the next steps.
+     * 3. The {@link ChunkedValueManifest} is deserialized, and its chunk keys are extracted.
+     * 4. Each chunk key is queried.
+     * 5. The chunks are stitched back together using the various adapter interfaces of this package,
+     *    depending on whether it is the single get or batch get/compute path that needs to re-assemble
+     *    a chunked value.
+
+     For the dumping application, we can treat this as pass-through.
+
+     chunking code:
+     RawKeyBytesAndChunkedKeySuffix rawKeyAndChunkedKeySuffix =
+         splitCompositeKey(kafkaKey.getKey(), messageType, getSchemaIdFromValue(kafkaMessageEnvelope));
+     key.key = rawKeyAndChunkedKeySuffix.getRawKeyBytes();
+
+     value.chunkedKeySuffix = rawKeyAndChunkedKeySuffix.getChunkedKeySuffixBytes();
+     */
+
+    // need to figure out task tracking in Spark land.
+    // Pack the pieces of information into the Spark intermediate row; this will populate the dataframe to be read by
+    // the spark job.
+    // weirdest use of the verb "get" in ByteBuffer!
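+    // (ByteBuffer#get(byte[]) is a relative bulk read: it copies the buffer's remaining bytes into
+    // the destination array and advances the buffer's position.)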
+ byte[] keyBytes = new byte[key.remaining()]; + key.get(keyBytes); + byte[] valueBytes = new byte[value.remaining()]; + value.get(valueBytes); + byte[] replicationMetadataPayloadBytes = new byte[replicationMetadataPayload.remaining()]; + replicationMetadataPayload.get(replicationMetadataPayloadBytes); + + return new GenericInternalRow( + new Object[] { offset, keyBytes, valueBytes, partition, messageType, schemaId, replicationMetadataPayloadBytes, + replicationMetadataVersionId }); + } + + private void maybeLogProgress() { + long progressPercent = (currentOffset - startingOffset) * 100 / offsetLength; + if (progressPercent > 10 + lastKnownProgressPercent) { + logProgress(); + lastKnownProgressPercent = progressPercent; + } + } + + private void logProgress() { + long progressPercent = (currentOffset - startingOffset) * 100 / offsetLength; + LOGGER.info( + "Consuming progress for" + + " Topic: {}, partition {} , consumed {}% of {} records. actual records delivered: {}, records skipped: {}", + targetPubSubTopicName, + targetPartitionNumber, + progressPercent, + offsetLength, + recordsServed, + recordsSkipped); + } + + // go through the current buffer and find the next usable message + private boolean ableToPrepNextRow() { + + PubSubMessage message; + boolean found; + // buffer is already empty. + if (messageBuffer.isEmpty()) { + return false; + } + + // look for the next viable message + found = false; + while (!found) { + try { + message = messageBuffer.pop(); + } catch (NoSuchElementException e) { + // ran out of messages in the buffer + return false; + } + + currentOffset = message.getOffset(); + + if (filterControlMessages && message.getKey().isControlMessage()) { + recordsSkipped++; + } else { + currentRow = processPubSubMessageToRow(message); + recordsServed++; + found = true; + } + } + maybeLogProgress(); + return true; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java new file mode 100644 index 0000000000..1fb1986b75 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java @@ -0,0 +1,36 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import java.util.Properties; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; + + +public class VenicePubsubInputPartitionReaderFactory implements PartitionReaderFactory { + private static final long serialVersionUID = 1L; + + private final Properties jobConfig; + + public VenicePubsubInputPartitionReaderFactory(Properties jobConfig) { + this.jobConfig = jobConfig; + } + + @Override + public PartitionReader createReader(InputPartition inputPartition) { + + if (!(inputPartition instanceof VenicePubsubInputPartition)) { + throw new IllegalArgumentException( + "VenicePubsubInputPartitionReaderFactory can only create readers for VenicePubsubInputPartition"); + } + + return new VenicePubsubInputPartitionReader(jobConfig, (VenicePubsubInputPartition) inputPartition); + } + + // Make it explicit that this reader does not support columnar reads. 
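+  // Returning false means Spark will always go through createReader() and pull rows one at a time
+  // via PartitionReader#next()/get(), instead of requesting a columnar batch from this factory.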
+ @Override + public boolean supportColumnarReads(InputPartition partition) { + return false; + } + +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java new file mode 100644 index 0000000000..fa331ec71b --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java @@ -0,0 +1,90 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import com.linkedin.venice.pubsub.PubSubClientsFactory; +import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; +import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.spark.input.pubsub.PartitionSplitters; +import com.linkedin.venice.utils.VeniceProperties; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.spark.sql.connector.read.Batch; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.types.StructType; + + +public class VenicePubsubInputScan implements Scan, Batch { + private final Properties jobConfig; + + public VenicePubsubInputScan(Properties jobConfig) { + this.jobConfig = jobConfig; + } + + @Override + public InputPartition[] planInputPartitions() { + try { + String topicName = "test_topic_v_1"; + String regionName = "ei-ltx1"; + int splitCount = 1000; + VeniceProperties veniceProperties = new VeniceProperties(jobConfig); + PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topicName); + PubSubClientsFactory clientsFactory = new PubSubClientsFactory(veniceProperties); + // PubSubAdminAdapter pubsubAdminClient = + // clientsFactory.getAdminAdapterFactory().create(veniceProperties, pubSubTopicRepository); + PubSubMessageDeserializer pubSubMessageDeserializer = PubSubMessageDeserializer.getInstance(); + PubSubConsumerAdapter pubSubConsumer = clientsFactory.getConsumerAdapterFactory() + .create(veniceProperties, false, pubSubMessageDeserializer, "Spark_KIF_planner"); + + // surround by retries from retryUtils or something + List listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic); + + int numPartitions = listOfPartitions.size(); // number of partitions in the topic + + // need a map of int to long,long to store the start and end offsets for each partition + Map> partitionOffsetsMap = new HashMap<>(); + + for (PubSubTopicPartitionInfo partition: listOfPartitions) { + PubSubTopicPartition pubSubTopicPartition = partition.getTopicPartition(); + int partitionNumber = partition.getTopicPartition().getPartitionNumber(); + + // do these with retries from retryUtils or something + long startOffset = pubSubConsumer.beginningOffset(pubSubTopicPartition, Duration.ofSeconds(60)); + long endOffset = pubSubConsumer.endOffset(pubSubTopicPartition); + // + + partitionOffsetsMap.put(partitionNumber, Arrays.asList(startOffset, endOffset)); + } + + Map>> splits = 
PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, splitCount); + InputPartition[] inputPartitions = + PartitionSplitters.convertToInputPartitions(regionName, topicName, splits).toArray(new InputPartition[0]); + return (inputPartitions); + } catch (Exception e) { + // handle exception + // something broke in the process of getting the splits + return null; // ? how do I tell spart that this is a failure? + } + + } + + @Override + public PartitionReaderFactory createReaderFactory() { + + return new VenicePubsubInputPartitionReaderFactory(jobConfig); + } + + @Override + public StructType readSchema() { + return null; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java new file mode 100644 index 0000000000..54fb008b6e --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java @@ -0,0 +1,19 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import java.util.Properties; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.ScanBuilder; + + +public class VenicePubsubInputScanBuilder implements ScanBuilder { + private final Properties jobConfig; + + public VenicePubsubInputScanBuilder(Properties properties) { + this.jobConfig = properties; + } + + @Override + public Scan build() { + return new VenicePubsubInputScan(jobConfig); + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java new file mode 100644 index 0000000000..55298de550 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java @@ -0,0 +1,52 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import static com.linkedin.venice.spark.SparkConstants.*; + +import java.util.Collections; +import java.util.Properties; +import java.util.Set; +import org.apache.spark.sql.connector.catalog.SupportsRead; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + + +/** + * This is the entrypoint of the Pubsub input source. It is used by Spark to create a DataFrame from a Pubsub topic. 
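+ * <p>
+ * Read path (Spark DataSource V2): {@link VenicePubsubSource} creates this table; the table hands out a
+ * {@link VenicePubsubInputScanBuilder}, which builds a {@link VenicePubsubInputScan} that plans the input
+ * partitions and supplies the {@link VenicePubsubInputPartitionReaderFactory} used to create the per-partition
+ * readers.
+ * <p>
+ * Illustrative usage sketch (the exact option keys depend on the job configuration that is passed through; the
+ * option map name below is made up for the example):
+ * <pre>{@code
+ * Dataset<Row> df = spark.read()
+ *     .format(VenicePubsubSource.class.getName())
+ *     .options(pubSubAndTopicOptions) // a Map<String, String> carrying broker/topic settings
+ *     .load();
+ * }</pre>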
+ */
+
+public class VenicePubsubInputTable implements SupportsRead {
+  static final String INPUT_TABLE_NAME = "venice_pubsub_table";
+  private final Properties properties;
+
+  public VenicePubsubInputTable(Properties properties) {
+    this.properties = properties;
+    // infer pubsub consumer properties from the passed-in properties
+    //
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    // convert the options to properties
+    Properties properties = new Properties();
+    properties.putAll(options.asCaseSensitiveMap());
+
+    return new VenicePubsubInputScanBuilder(properties);
+  }
+
+  @Override
+  public String name() {
+    return INPUT_TABLE_NAME;
+  }
+
+  @Override
+  public StructType schema() {
+    return DEFAULT_SCHEMA;
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return Collections.singleton(TableCapability.BATCH_READ);
+  }
+}
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java
new file mode 100644
index 0000000000..844b02ee0b
--- /dev/null
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java
@@ -0,0 +1,42 @@
+package com.linkedin.venice.spark.input.pubsub.table;
+
+import static com.linkedin.venice.spark.SparkConstants.*;
+
+import java.util.Map;
+import java.util.Properties;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+
+public class VenicePubsubSource implements TableProvider {
+  public StructType inferSchema(CaseInsensitiveStringMap options) {
+    return KAFKA_INPUT_TABLE_SCHEMA;
+  }
+
+  @Override
+  public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+    return TableProvider.super.inferPartitioning(options);
+  }
+
+  @Override
+  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> sparkConfigs) {
+    Properties properties = new Properties();
+    properties.putAll(sparkConfigs);
+    // the properties here are the entry point for all the configuration
+    // we receive from the outer layer.
+    // schema and partitioning are useless here and should be discarded?
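+    // (Since supportsExternalMetadata() is false, Spark simply passes back the results of
+    // inferSchema()/inferPartitioning() in these arguments, so they carry no extra information.)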
+ // + + // VeniceProperties consumerProperties = KafkaInputUtils.getConsumerProperties(properties); + + return new VenicePubsubInputTable(properties); + } + + @Override + public boolean supportsExternalMetadata() { + return false; + } +} diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/PartitionSplittersTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/PartitionSplittersTest.java new file mode 100644 index 0000000000..86688150dd --- /dev/null +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/PartitionSplittersTest.java @@ -0,0 +1,141 @@ +package com.linkedin.venice.spark.input.pubsub; + +import static org.testng.Assert.*; + +import java.util.*; +import org.testng.annotations.Test; + + +public class PartitionSplittersTest { + @Test + void fullPartitionSplitter_ReturnsCorrectSplits() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 100L)); + partitionOffsetsMap.put(1, Arrays.asList(100L, 200L)); + partitionOffsetsMap.put(2, Arrays.asList(0L, 20_000_000L)); + + Map>> result = PartitionSplitters.fullPartitionSplitter(partitionOffsetsMap); + + // result is a map of partition number to list of segments + // need to assert value and keys are correct based on input data + assertEquals(3, result.size()); + assertEquals(Arrays.asList(Arrays.asList(0L, 100L)), result.get(0)); + assertEquals(Arrays.asList(Arrays.asList(100L, 200L)), result.get(1)); + assertEquals(Arrays.asList(Arrays.asList(0L, 20_000_000L)), result.get(2)); + + } + + @Test + void segmentCountSplitter_SplitsCorrectly_CommonCase() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(20_000L, 145_408L)); + partitionOffsetsMap.put(1, Arrays.asList(25_000L, 170_067L)); + partitionOffsetsMap.put(2, Arrays.asList(30_000L, 150_022L)); + partitionOffsetsMap.put(3, Arrays.asList(21_000L, 190_031L)); + partitionOffsetsMap.put(4, Arrays.asList(22_000L, 145_343L)); + + final int intendedSplitCount = 40; + + Map>> result = + PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, intendedSplitCount); + + int totalPartitionsPresent = result.size(); + assertEquals(5, totalPartitionsPresent); // right number of partitions present in the result + + // count all the segments + int totalSegments = 0; + for (List> segments: result.values()) { + totalSegments += segments.size(); + } + assertTrue(intendedSplitCount <= totalSegments); // at least as many segments as intended + assertTrue(intendedSplitCount + totalPartitionsPresent > totalSegments); // at most intended + partition count + + assertEquals(8, result.get(0).size()); + assertEquals(9, result.get(1).size()); + + assertEquals(Arrays.asList(20_000L, 35_675L), result.get(0).get(0)); // spot check first segment + assertEquals(Arrays.asList(37_904L, 54_807L), result.get(3).get(1)); // spot check middle segments + assertEquals(Arrays.asList(25_000L, 41_118L), result.get(1).get(0)); // spot check middle segment + assertEquals(Arrays.asList(129_926L, 145_343L), result.get(4).get(7)); // spot check first segment + + } + + @Test + void segmentCountSplitter_SplitsCorrectly_EdgeCase1() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 150L)); + partitionOffsetsMap.put(3, Arrays.asList(110L, 200L)); + partitionOffsetsMap.put(2, Arrays.asList(50L, 70L)); + partitionOffsetsMap.put(1, Arrays.asList(20L, 23L)); + partitionOffsetsMap.put(5, Arrays.asList(20L, 400L)); + + 
final int intendedSplitCount = 9; + + Map>> result = + PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, intendedSplitCount); + + int totalPartitionsPresent = result.size(); + assertEquals(5, totalPartitionsPresent); // right number of partitions present in the result + + // count all the segments + int totalSegments = 0; + for (List> segments: result.values()) { + totalSegments += segments.size(); + } + assertTrue(intendedSplitCount <= totalSegments); // at least as many segments as intended + assertTrue(intendedSplitCount + totalPartitionsPresent > totalSegments); // at most intended + partition count + + assertEquals(3, result.get(0).size()); + assertEquals(1, result.get(1).size()); + + assertEquals(Arrays.asList(155L, 200L), result.get(3).get(1)); + assertEquals(Arrays.asList(20L, 23L), result.get(1).get(0)); + + } + + @Test + void segmentCountSplitter_HandlesSingleSegment() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 50L)); + + Map>> result = PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, 1); + + assertEquals(1, result.size()); + assertEquals(1, result.get(0).size()); + } + + @Test + void computeIntendedSplitLengthBasedOnCount_CalculatesCorrectly() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 97L)); + partitionOffsetsMap.put(1, Arrays.asList(100L, 903L)); + partitionOffsetsMap.put(2, Arrays.asList(0L, 2022L)); + + long result = PartitionSplitters.computeIntendedSplitLengthBasedOnCount(partitionOffsetsMap, 29); + + assertEquals(101L, result); + } + + @Test + void computeIntendedSplitCountBasedOnOffset_CalculatesCorrectly() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 100L)); + partitionOffsetsMap.put(1, Arrays.asList(100L, 200L)); + + long result = PartitionSplitters.computeIntendedSplitCountBasedOnOffset(partitionOffsetsMap, 50L); + + assertEquals(4, result); + } + + @Test + void computeTopicLengthInOffsets_CalculatesCorrectly() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 101L)); + partitionOffsetsMap.put(1, Arrays.asList(99L, 203L)); + + Double result = PartitionSplitters.computeTopicLengthInOffsets(partitionOffsetsMap); + + assertEquals(205D, result); + } + +} diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java new file mode 100644 index 0000000000..cb4b80c43f --- /dev/null +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -0,0 +1,125 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import static com.linkedin.venice.kafka.protocol.enums.MessageType.*; +import static com.linkedin.venice.vpj.VenicePushJobConstants.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.ProducerMetadata; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.ImmutablePubSubMessage; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import 
com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; +import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.spark.sql.catalyst.InternalRow; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + + +public class VenicePubsubInputPartitionReaderTest { + private static final String KAFKA_MESSAGE_KEY_PREFIX = "key_"; + private static final String KAFKA_MESSAGE_VALUE_PREFIX = "value_"; + private static final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + private VenicePubsubInputPartitionReader reader; + private VenicePubsubInputPartition inputPartition; + + @BeforeTest + public void setUp() { + Properties jobConfig = new Properties(); + int startingOffset = 0; // starting offset other than 0 needs mocking of subscription ... + int endingOffset = 100; + int targetPartitionNumber = 42; + + String topicName = "BigStrangePubSubTopic_V1_rt_r"; + + jobConfig.put(KAFKA_INPUT_BROKER_URL, "kafkaAddress"); + jobConfig.put(KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP, ChunkedKeySuffix.SCHEMA$.toString()); + jobConfig.put(KAFKA_INPUT_TOPIC, topicName); + + PubSubConsumerAdapter consumer = mock(PubSubConsumerAdapter.class); + + long numRecords = endingOffset - startingOffset; + List> consumerRecordList = new ArrayList<>(); + + PubSubTopicPartition pubSubTopicPartition = + new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicName), targetPartitionNumber); + + // fill the topic message array + for (int i = startingOffset; i < numRecords; ++i) { + byte[] keyBytes = (KAFKA_MESSAGE_KEY_PREFIX + i).getBytes(); + byte[] valueBytes = (KAFKA_MESSAGE_VALUE_PREFIX + i).getBytes(); + + KafkaKey kafkaKey = new KafkaKey(PUT, keyBytes); + KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope(); + messageEnvelope.producerMetadata = new ProducerMetadata(); + messageEnvelope.producerMetadata.messageTimestamp = 0; + messageEnvelope.producerMetadata.messageSequenceNumber = 0; + messageEnvelope.producerMetadata.segmentNumber = 0; + messageEnvelope.producerMetadata.producerGUID = new GUID(); + Put put = new Put(); + put.schemaId = 42; // shouldn't go with -1, -1 is reserved for chunking . 
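+      // A positive schema id denotes a regular, non-chunked value; negative ids indicate chunked data
+      // (see the chunking notes in VenicePubsubInputPartitionReader), which the reader passes through as-is.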
+ put.putValue = ByteBuffer.wrap(valueBytes); + put.replicationMetadataPayload = ByteBuffer.allocate(0); + messageEnvelope.payloadUnion = put; + consumerRecordList.add(new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, i, -1, -1)); + } + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(pubSubTopicPartition, consumerRecordList); + when(consumer.poll(anyLong())).thenReturn(recordsMap, new HashMap<>()); + + inputPartition = + new VenicePubsubInputPartition("prod-lva2", topicName, targetPartitionNumber, startingOffset, endingOffset); + + reader = new VenicePubsubInputPartitionReader(jobConfig, inputPartition, consumer, pubSubTopicRepository); + } + + @Test + public void testNext() { + assertTrue(reader.next()); + for (int i = 0; i < 99; i++) { + reader.get(); + assertTrue(reader.next()); + } + reader.get(); + assertFalse(reader.next()); + reader.close(); + } + + @Test + public void testGet() { + InternalRow row = reader.get(); + System.out.println(row); + long offset = row.getLong(0); + byte[] key = row.getBinary(1); + byte[] value = row.getBinary(2); + + assertEquals(key, (KAFKA_MESSAGE_KEY_PREFIX + offset).getBytes()); + assertEquals(value, (KAFKA_MESSAGE_VALUE_PREFIX + offset).getBytes()); + + // assertEquals(row.get(0, KafkaKey), "dummyData1"); + // assertTrue(row.getInt(1) >= 0); + // assertTrue(row.getInt(1) < 1000); + // assertTrue(row.getBoolean(2)); + System.out.println(key); + System.out.println(value); + } + + @Test + public void testClose() { + reader.close(); + // Add assertions if there are any resources to verify after close + } +} diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java new file mode 100644 index 0000000000..217cdd80d4 --- /dev/null +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java @@ -0,0 +1,38 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import static org.testng.Assert.*; + +import org.apache.commons.lang3.SerializationUtils; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + + +public class VenicePubsubInputPartitionTest { + VenicePubsubInputPartition targetObject; + + @Test + public void testSerializablity() { + + byte[] serialized = SerializationUtils.serialize(targetObject); + VenicePubsubInputPartition deserializedObject = SerializationUtils.deserialize(serialized); + + assertEquals(deserializedObject.getTopicName(), targetObject.getTopicName()); + assertEquals(deserializedObject.getPartitionNumber(), targetObject.getPartitionNumber()); + assertEquals(deserializedObject.getSegmentStartOffset(), targetObject.getSegmentStartOffset()); + assertEquals(deserializedObject.getSegmentEndOffset(), targetObject.getSegmentEndOffset()); + + // object should give back exactly what was put in. 
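+    // Spark serializes InputPartition instances on the driver and ships them to the executors, so this
+    // SerializationUtils round trip exercises exactly the contract the class has to honor.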
+ assertEquals(deserializedObject.getRegion(), "prod-lva2"); + assertEquals(deserializedObject.getTopicName(), "BigStrangePubSubTopic_V1_rt_r"); + assertEquals(deserializedObject.getSegmentEndOffset(), 100_000_000); + assertEquals(deserializedObject.getSegmentStartOffset(), 49152); + assertEquals(deserializedObject.getPartitionNumber(), 42); + + } + + @BeforeTest + public void setUp() { + targetObject = + new VenicePubsubInputPartition("prod-lva2", "BigStrangePubSubTopic_V1_rt_r", 42, 49_152, 100_000_000); + } +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java index 0424e185f0..b10cd0349d 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java @@ -58,7 +58,7 @@ public void cleanUp() throws IOException { pubSubBrokerWrapper.close(); } - public String getTopic(int numRecord, int numPartition) { + public String getNewTopicContainingRecords(int numRecord, int numPartition) { String topicName = Utils.getUniqueString("test_kafka_input_format") + "_v1"; manager.createTopic(pubSubTopicRepository.getTopic(topicName), numPartition, 1, true); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = @@ -122,7 +122,7 @@ public int compare(KafkaInputSplit o1, KafkaInputSplit o2) { @Test public void testGetSplits() throws IOException { KafkaInputFormat kafkaInputFormat = new KafkaInputFormat(); - String topic = getTopic(1000, 3); + String topic = getNewTopicContainingRecords(1000, 3); JobConf conf = new JobConf(); conf.set(KAFKA_INPUT_BROKER_URL, pubSubBrokerWrapper.getAddress()); conf.set(KAFKA_INPUT_TOPIC, topic); From 09cdd59d1ff62975ef88e123d5010eee43cc711b Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Wed, 9 Oct 2024 11:00:39 -0700 Subject: [PATCH 02/12] Spotbug fix1 --- .../input/pubsub/table/VenicePubsubInputPartitionReader.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java index 036ffdd57f..63123ab7cc 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -71,7 +71,8 @@ public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputP this( jobConfig, inputPartition, - new PubSubClientsFactory(new VeniceProperties(jobConfig)).getConsumerAdapterFactory() + new PubSubClientsFactory(new VeniceProperties(jobConfig)).getConsumerAdapterFactory() // need to review the + // properties bag ... 
.create( new VeniceProperties(jobConfig), false, @@ -93,7 +94,6 @@ public VenicePubsubInputPartitionReader( startingOffset = inputPartition.getSegmentStartOffset(); endingOffset = inputPartition.getSegmentEndOffset(); offsetLength = endingOffset - startingOffset; - VeniceProperties veniceProperties = new VeniceProperties(jobConfig); this.pubSubConsumer = consumer; From 56cac6d185aeacd78c148fe66220423abdd1dcf2 Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Wed, 9 Oct 2024 15:01:29 -0700 Subject: [PATCH 03/12] The use of VeniceProperties as the method for passing configuration is deeply entrenched in the system, adopting this approach for KIF repush. formatting fixes / datatype fixes / spotbug fixes --- .../VenicePubsubPartitionConsumer.java | 38 +++++++++---------- .../VenicePubsubInputPartitionReader.java | 6 ++- ...nicePubsubInputPartitionReaderFactory.java | 1 - .../pubsub/table/VenicePubsubInputScan.java | 11 ++---- .../pubsub/table/VenicePubsubInputTable.java | 15 ++++---- .../pubsub/table/VenicePubsubSource.java | 6 ++- .../VenicePubsubInputPartitionReaderTest.java | 2 - 7 files changed, 40 insertions(+), 39 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java index ef005a55bc..1e63a4200d 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java @@ -1,25 +1,25 @@ package com.linkedin.venice.spark.input.pubsub.clients; -import com.linkedin.venice.pubsub.PubSubTopicRepository; -import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter; -import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter; -import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition; -import java.util.Properties; - +//import com.linkedin.venice.pubsub.PubSubTopicRepository; +//import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter; +//import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter; +//import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition; +//import java.util.Properties; public class VenicePubsubPartitionConsumer { - private final Properties configBag; - private final VenicePubsubInputPartition inputPartition; - private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); - private ApacheKafkaConsumerAdapter consumer; - private ApacheKafkaAdminAdapter admin; - - public VenicePubsubPartitionConsumer(Properties configBag, VenicePubsubInputPartition inputPartition) { - this.configBag = configBag; - this.inputPartition = inputPartition; - String consumerName = "VeniceSpark_p-" + inputPartition.getPartitionNumber() + "_" - + inputPartition.getSegmentStartOffset() + "-" + inputPartition.getSegmentEndOffset(); - - } + // potentially future home of consumer if we decide to factor it out of the VenicePubsubPartitionReader + // private final Properties configBag; + // private final VenicePubsubInputPartition inputPartition; + // private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + // private ApacheKafkaConsumerAdapter consumer; + // private ApacheKafkaAdminAdapter admin; + // + // public 
VenicePubsubPartitionConsumer(Properties configBag, VenicePubsubInputPartition inputPartition) { + // this.configBag = configBag; + // this.inputPartition = inputPartition; + // String consumerName = "VeniceSpark_p-" + inputPartition.getPartitionNumber() + "_" + // + inputPartition.getSegmentStartOffset() + "-" + inputPartition.getSegmentEndOffset(); + // + // } } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java index 63123ab7cc..cbb7df4468 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -72,7 +72,7 @@ public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputP jobConfig, inputPartition, new PubSubClientsFactory(new VeniceProperties(jobConfig)).getConsumerAdapterFactory() // need to review the - // properties bag ... + // properties bag ... .create( new VeniceProperties(jobConfig), false, @@ -116,6 +116,10 @@ public VenicePubsubInputPartitionReader( // pubSubConsumer.seek(startingOffset); // do we need this? or should we rely on the starting offset passed to // subscribe ? + initialize(); // see, MET05-J asked for this ! + } + + private void initialize() { next(); // get the first record ready to go. } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java index 1fb1986b75..b8ad293e74 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java @@ -32,5 +32,4 @@ public PartitionReader createReader(InputPartition inputPartition) public boolean supportColumnarReads(InputPartition partition) { return false; } - } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java index fa331ec71b..8b8d1f8c20 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java @@ -1,5 +1,6 @@ package com.linkedin.venice.spark.input.pubsub.table; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo; import com.linkedin.venice.pubsub.PubSubTopicRepository; @@ -48,7 +49,7 @@ public InputPartition[] planInputPartitions() { // surround by retries from retryUtils or something List listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic); - int numPartitions = listOfPartitions.size(); // number of partitions in the topic + // int numPartitions = listOfPartitions.size(); // number of partitions in the topic in case. 
// need a map of int to long,long to store the start and end offsets for each partition Map> partitionOffsetsMap = new HashMap<>(); @@ -66,15 +67,11 @@ public InputPartition[] planInputPartitions() { } Map>> splits = PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, splitCount); - InputPartition[] inputPartitions = - PartitionSplitters.convertToInputPartitions(regionName, topicName, splits).toArray(new InputPartition[0]); - return (inputPartitions); + return PartitionSplitters.convertToInputPartitions(regionName, topicName, splits).toArray(new InputPartition[0]); } catch (Exception e) { - // handle exception + throw new VeniceException("Could not get FileSystem", e);// handle exception // something broke in the process of getting the splits - return null; // ? how do I tell spart that this is a failure? } - } @Override diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java index 55298de550..d3f1bba9e7 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java @@ -2,6 +2,7 @@ import static com.linkedin.venice.spark.SparkConstants.*; +import com.linkedin.venice.utils.VeniceProperties; import java.util.Collections; import java.util.Properties; import java.util.Set; @@ -18,21 +19,19 @@ public class VenicePubsubInputTable implements SupportsRead { static final String INPUT_TABLE_NAME = "venice_pubsub_table"; - private final Properties properties; + private final VeniceProperties jobConfig; - public VenicePubsubInputTable(Properties properties) { - this.properties = properties; - // infer pubsub consumer properties from the properties + public VenicePubsubInputTable(VeniceProperties jobConfig) { + this.jobConfig = jobConfig; // } @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { - // convert the options to properties - Properties properties = new Properties(); + Properties properties = jobConfig.getPropertiesCopy(); properties.putAll(options.asCaseSensitiveMap()); - return new VenicePubsubInputScanBuilder(properties); + return new VenicePubsubInputScanBuilder(properties); // should we flip this to VeniceProperties? 
} @Override @@ -42,7 +41,7 @@ public String name() { @Override public StructType schema() { - return DEFAULT_SCHEMA; + return KAFKA_INPUT_TABLE_SCHEMA; } @Override diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java index 844b02ee0b..784068b150 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java @@ -2,6 +2,7 @@ import static com.linkedin.venice.spark.SparkConstants.*; +import com.linkedin.venice.utils.VeniceProperties; import java.util.Map; import java.util.Properties; import org.apache.spark.sql.connector.catalog.Table; @@ -13,11 +14,13 @@ public class VenicePubsubSource implements TableProvider { public StructType inferSchema(CaseInsensitiveStringMap options) { + // there is no inference, the table is always created with the same schema return KAFKA_INPUT_TABLE_SCHEMA; } @Override public Transform[] inferPartitioning(CaseInsensitiveStringMap options) { + // we don't support partitioning, it comes from the kafka topic. return TableProvider.super.inferPartitioning(options); } @@ -32,7 +35,8 @@ public Table getTable(StructType schema, Transform[] partitioning, Map= 0); // assertTrue(row.getInt(1) < 1000); // assertTrue(row.getBoolean(2)); - System.out.println(key); - System.out.println(value); } @Test From 4fc913eef3f745e724c0f9dd6433fe1f7bbe1930 Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Thu, 31 Oct 2024 19:49:38 -0700 Subject: [PATCH 04/12] Unified the usage of property bags across the rest of the code. 
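The previous commit's point stands: VeniceProperties is the configuration carrier everywhere else in Venice, so this patch threads it through the table, scan builder, scan, reader factory and reader instead of raw java.util.Properties. For orientation, this is roughly what the hand-off in VenicePubsubInputTable looks like once the patch is applied (a condensed sketch of one method, not a new file; imports and the rest of the class are omitted, and the local variable name is illustrative):

    @Override
    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
      // Start from the job-level VeniceProperties, layer the Spark-supplied options on top,
      // and hand a single merged VeniceProperties bag to the scan builder and everything below it.
      Properties merged = jobConfig.getPropertiesCopy();
      merged.putAll(options.asCaseSensitiveMap());
      return new VenicePubsubInputScanBuilder(new VeniceProperties(merged));
    }

Downstream, the scan and the partition reader factory pass that same VeniceProperties object along, so the consumer created inside the reader is configured from the merged view of job config plus Spark options.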
--- .../table/VenicePubsubInputPartitionReader.java | 9 ++++----- .../VenicePubsubInputPartitionReaderFactory.java | 6 +++--- .../pubsub/table/VenicePubsubInputScan.java | 16 ++++++++-------- .../table/VenicePubsubInputScanBuilder.java | 6 +++--- .../pubsub/table/VenicePubsubInputTable.java | 3 ++- .../input/pubsub/table/VenicePubsubSource.java | 4 ++-- .../VenicePubsubInputPartitionReaderTest.java | 7 ++++++- 7 files changed, 28 insertions(+), 23 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java index cbb7df4468..1ffeb2a499 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -67,14 +66,14 @@ public class VenicePubsubInputPartitionReader implements PartitionReader> partitionMessagesBuffer = new ArrayList<>(); - public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputPartition inputPartition) { + public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsubInputPartition inputPartition) { this( jobConfig, inputPartition, - new PubSubClientsFactory(new VeniceProperties(jobConfig)).getConsumerAdapterFactory() // need to review the + new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory() // need to review the // properties bag ... 
.create( - new VeniceProperties(jobConfig), + jobConfig, false, PubSubMessageDeserializer.getInstance(), // PubSubPassThroughDeserializer.getInstance(), @@ -84,7 +83,7 @@ public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputP // testing constructor public VenicePubsubInputPartitionReader( - Properties jobConfig, + VeniceProperties jobConfig, VenicePubsubInputPartition inputPartition, PubSubConsumerAdapter consumer, PubSubTopicRepository pubSubTopicRepository) { diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java index b8ad293e74..41ce7e2798 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java @@ -1,6 +1,6 @@ package com.linkedin.venice.spark.input.pubsub.table; -import java.util.Properties; +import com.linkedin.venice.utils.VeniceProperties; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReader; @@ -10,9 +10,9 @@ public class VenicePubsubInputPartitionReaderFactory implements PartitionReaderFactory { private static final long serialVersionUID = 1L; - private final Properties jobConfig; + private final VeniceProperties jobConfig; - public VenicePubsubInputPartitionReaderFactory(Properties jobConfig) { + public VenicePubsubInputPartitionReaderFactory(VeniceProperties jobConfig) { this.jobConfig = jobConfig; } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java index 8b8d1f8c20..f70490bfba 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java @@ -1,5 +1,7 @@ package com.linkedin.venice.spark.input.pubsub.table; +import static com.linkedin.venice.vpj.VenicePushJobConstants.*; + import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo; @@ -15,7 +17,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReaderFactory; @@ -24,27 +25,26 @@ public class VenicePubsubInputScan implements Scan, Batch { - private final Properties jobConfig; + private final VeniceProperties jobConfig; - public VenicePubsubInputScan(Properties jobConfig) { + public VenicePubsubInputScan(VeniceProperties jobConfig) { this.jobConfig = jobConfig; } @Override public InputPartition[] planInputPartitions() { try { - String topicName = "test_topic_v_1"; - String regionName = "ei-ltx1"; + String topicName = jobConfig.getString(KAFKA_INPUT_TOPIC); + String regionName = jobConfig.getString(KAFKA_INPUT_FABRIC); int splitCount = 1000; - VeniceProperties veniceProperties = new 
VeniceProperties(jobConfig); PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topicName); - PubSubClientsFactory clientsFactory = new PubSubClientsFactory(veniceProperties); + PubSubClientsFactory clientsFactory = new PubSubClientsFactory(jobConfig); // PubSubAdminAdapter pubsubAdminClient = // clientsFactory.getAdminAdapterFactory().create(veniceProperties, pubSubTopicRepository); PubSubMessageDeserializer pubSubMessageDeserializer = PubSubMessageDeserializer.getInstance(); PubSubConsumerAdapter pubSubConsumer = clientsFactory.getConsumerAdapterFactory() - .create(veniceProperties, false, pubSubMessageDeserializer, "Spark_KIF_planner"); + .create(jobConfig, false, pubSubMessageDeserializer, "Spark_KIF_planner"); // surround by retries from retryUtils or something List listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic); diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java index 54fb008b6e..b2a0dc77c0 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java @@ -1,14 +1,14 @@ package com.linkedin.venice.spark.input.pubsub.table; -import java.util.Properties; +import com.linkedin.venice.utils.VeniceProperties; import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.read.ScanBuilder; public class VenicePubsubInputScanBuilder implements ScanBuilder { - private final Properties jobConfig; + private final VeniceProperties jobConfig; - public VenicePubsubInputScanBuilder(Properties properties) { + public VenicePubsubInputScanBuilder(VeniceProperties properties) { this.jobConfig = properties; } diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java index d3f1bba9e7..568778c098 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java @@ -31,7 +31,8 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { Properties properties = jobConfig.getPropertiesCopy(); properties.putAll(options.asCaseSensitiveMap()); - return new VenicePubsubInputScanBuilder(properties); // should we flip this to VeniceProperties? + return new VenicePubsubInputScanBuilder(new VeniceProperties(properties)); // should we flip this to + // VeniceProperties? 
} @Override diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java index 784068b150..e02aa0577d 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java @@ -25,9 +25,9 @@ public Transform[] inferPartitioning(CaseInsensitiveStringMap options) { } @Override - public Table getTable(StructType schema, Transform[] partitioning, Map sparkConfigs) { + public Table getTable(StructType schema, Transform[] partitioning, Map configs) { Properties properties = new Properties(); - properties.putAll(sparkConfigs); + properties.putAll(configs); // the properties here is the entry point for all the configurations // we receive from the outer layer. // schem and partitioning are useless and should be discarded? diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java index 1f8faa5dd2..aa04fbda3e 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -17,6 +17,7 @@ import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; +import com.linkedin.venice.utils.VeniceProperties; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -83,7 +84,11 @@ public void setUp() { inputPartition = new VenicePubsubInputPartition("prod-lva2", topicName, targetPartitionNumber, startingOffset, endingOffset); - reader = new VenicePubsubInputPartitionReader(jobConfig, inputPartition, consumer, pubSubTopicRepository); + reader = new VenicePubsubInputPartitionReader( + new VeniceProperties(jobConfig), + inputPartition, + consumer, + pubSubTopicRepository); } @Test From 072ee4e03062de6f1b65c880d5d2e079d22466d7 Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Fri, 1 Nov 2024 15:05:43 -0700 Subject: [PATCH 05/12] Protect the system from NPEs resulting from calling addAll on a consumerBugger.get that comes back as null . 
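The failure mode here is poll() handing back a result map whose entry for the subscribed partition is missing, which previously left partitionMessagesBuffer null and failed on the next access. The change below rebuilds partitionMessagesBuffer as a fresh local list per loadRecords() call and appends the polled messages into it. For comparison only (this is not the code in the patch, the generic parameters on PubSubMessage are an assumption, and java.util.Collections is assumed imported), a fully null-tolerant version of that step could default a missing entry to an empty list:

    // Defensive variant of the poll step; assumes poll() may return a map with no entry
    // at all for targetPubSubTopicPartition.
    Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> polled =
        pubSubConsumer.poll(CONSUMER_POLL_TIMEOUT);
    partitionMessagesBuffer.addAll(
        polled.getOrDefault(targetPubSubTopicPartition, Collections.emptyList()));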
--- .../table/VenicePubsubInputPartitionReader.java | 12 ++++++++---- .../VenicePubsubInputPartitionReaderTest.java | 14 +++++++------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java index 1ffeb2a499..6d10e0636c 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -64,7 +64,6 @@ public class VenicePubsubInputPartitionReader implements PartitionReader>> consumerBuffer = new HashMap<>(); // the buffer that holds the relevant messages for the current partition - private List> partitionMessagesBuffer = new ArrayList<>(); public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsubInputPartition inputPartition) { this( @@ -119,7 +118,8 @@ public VenicePubsubInputPartitionReader( } private void initialize() { - next(); // get the first record ready to go. + next();// get the first record ready to go. + recordsServed = 0; // reset the counter } // if it returns a row, it's going to be key and value and offset in the row in that order @@ -132,7 +132,7 @@ public InternalRow get() { @Override public boolean next() { // Are we past the finish line ? - if (currentOffset > endingOffset) { + if (currentOffset >= endingOffset) { return false; } @@ -158,10 +158,13 @@ public void close() { // borrowing Gaojie's code for dealing with empty polls. private void loadRecords() { + List> partitionMessagesBuffer = new ArrayList<>(); + int retry = 0; while (retry++ < CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES) { consumerBuffer = pubSubConsumer.poll(CONSUMER_POLL_TIMEOUT); - partitionMessagesBuffer = consumerBuffer.get(targetPubSubTopicPartition); + + partitionMessagesBuffer.addAll(consumerBuffer.get(targetPubSubTopicPartition)); if (!partitionMessagesBuffer.isEmpty()) { // we got some records back for the desired partition. 
break; @@ -185,6 +188,7 @@ private void loadRecords() { throw new RuntimeException("Empty poll after " + retry + " retries"); } messageBuffer.addAll(partitionMessagesBuffer); + } private InternalRow processPubSubMessageToRow( diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java index aa04fbda3e..b091d39a87 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.Properties; import org.apache.spark.sql.catalyst.InternalRow; -import org.testng.annotations.BeforeTest; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -36,7 +36,7 @@ public class VenicePubsubInputPartitionReaderTest { private VenicePubsubInputPartitionReader reader; private VenicePubsubInputPartition inputPartition; - @BeforeTest + @BeforeMethod public void setUp() { Properties jobConfig = new Properties(); int startingOffset = 0; // starting offset other than 0 needs mocking of subscription ... @@ -51,14 +51,14 @@ public void setUp() { PubSubConsumerAdapter consumer = mock(PubSubConsumerAdapter.class); - long numRecords = endingOffset - startingOffset; + long numRecords = endingOffset - startingOffset + 1; List> consumerRecordList = new ArrayList<>(); PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicName), targetPartitionNumber); // fill the topic message array - for (int i = startingOffset; i < numRecords; ++i) { + for (int i = startingOffset; i <= numRecords; ++i) { byte[] keyBytes = (KAFKA_MESSAGE_KEY_PREFIX + i).getBytes(); byte[] valueBytes = (KAFKA_MESSAGE_VALUE_PREFIX + i).getBytes(); @@ -93,12 +93,12 @@ public void setUp() { @Test public void testNext() { - assertTrue(reader.next()); - for (int i = 0; i < 99; i++) { + System.out.println(); + assertTrue(reader.next()); // first record 0 + for (int i = 0; i < 98; i++) { // 99 more records reader.get(); assertTrue(reader.next()); } - reader.get(); assertFalse(reader.next()); reader.close(); } From b1870d9cca625c9f644a212da306fa5c2e389a4e Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Fri, 1 Nov 2024 16:47:10 -0700 Subject: [PATCH 06/12] Better offset math to deal with off-by-ones happening throughout . --- .../VenicePubsubInputPartitionReaderTest.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java index b091d39a87..96274dd1fc 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -40,7 +40,7 @@ public class VenicePubsubInputPartitionReaderTest { public void setUp() { Properties jobConfig = new Properties(); int startingOffset = 0; // starting offset other than 0 needs mocking of subscription ... 
- int endingOffset = 100; + int endingOffset = 77; // total of 78 records int targetPartitionNumber = 42; String topicName = "BigStrangePubSubTopic_V1_rt_r"; @@ -58,7 +58,7 @@ public void setUp() { new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicName), targetPartitionNumber); // fill the topic message array - for (int i = startingOffset; i <= numRecords; ++i) { + for (int i = startingOffset; i < numRecords; ++i) { byte[] keyBytes = (KAFKA_MESSAGE_KEY_PREFIX + i).getBytes(); byte[] valueBytes = (KAFKA_MESSAGE_VALUE_PREFIX + i).getBytes(); @@ -95,7 +95,7 @@ public void setUp() { public void testNext() { System.out.println(); assertTrue(reader.next()); // first record 0 - for (int i = 0; i < 98; i++) { // 99 more records + for (int i = 1; i < 77; i++) { // 78 records expected reader.get(); assertTrue(reader.next()); } @@ -105,15 +105,16 @@ public void testNext() { @Test public void testGet() { - InternalRow row = reader.get(); - System.out.println(row); - long offset = row.getLong(0); - byte[] key = row.getBinary(1); - byte[] value = row.getBinary(2); - - assertEquals(key, (KAFKA_MESSAGE_KEY_PREFIX + offset).getBytes()); - assertEquals(value, (KAFKA_MESSAGE_VALUE_PREFIX + offset).getBytes()); - + for (int i = 0; i < 77; i++) { // 78 records expected + InternalRow row = reader.get(); + System.out.println(row); + long offset = row.getLong(0); + byte[] key = row.getBinary(1); + byte[] value = row.getBinary(2); + + assertEquals(key, (KAFKA_MESSAGE_KEY_PREFIX + offset).getBytes()); + assertEquals(value, (KAFKA_MESSAGE_VALUE_PREFIX + offset).getBytes()); + } // assertEquals(row.get(0, KafkaKey), "dummyData1"); // assertTrue(row.getInt(1) >= 0); // assertTrue(row.getInt(1) < 1000); From 68cf01cb27c3a27b08308a1be9c45b579eb5cf26 Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Sun, 3 Nov 2024 11:15:17 -0800 Subject: [PATCH 07/12] Rearranged pubsub test. --- .../pubsub/table/VenicePubsubInputPartitionTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java index 217cdd80d4..5f6e53a90a 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java @@ -10,6 +10,12 @@ public class VenicePubsubInputPartitionTest { VenicePubsubInputPartition targetObject; + @BeforeTest + public void setUp() { + targetObject = + new VenicePubsubInputPartition("prod-lva2", "BigStrangePubSubTopic_V1_rt_r", 42, 49_152, 100_000_000); + } + @Test public void testSerializablity() { @@ -30,9 +36,4 @@ public void testSerializablity() { } - @BeforeTest - public void setUp() { - targetObject = - new VenicePubsubInputPartition("prod-lva2", "BigStrangePubSubTopic_V1_rt_r", 42, 49_152, 100_000_000); - } } From 09f4b01eadf6cad2ba0162113ef4943917f0720c Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Sun, 3 Nov 2024 11:25:40 -0800 Subject: [PATCH 08/12] Removed un-needed clients they are now embedded in the partition reader and ... 
--- .../clients/VenicePubSubAdminClient.java | 5 ---- .../VenicePubsubPartitionConsumer.java | 25 ------------------- 2 files changed, 30 deletions(-) delete mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubSubAdminClient.java delete mode 100644 clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubSubAdminClient.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubSubAdminClient.java deleted file mode 100644 index 0a93f06dcf..0000000000 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubSubAdminClient.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.linkedin.venice.spark.input.pubsub.clients; - -public class VenicePubSubAdminClient { - -} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java deleted file mode 100644 index 1e63a4200d..0000000000 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.linkedin.venice.spark.input.pubsub.clients; - -//import com.linkedin.venice.pubsub.PubSubTopicRepository; -//import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter; -//import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter; -//import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition; -//import java.util.Properties; - -public class VenicePubsubPartitionConsumer { - // potentially future home of consumer if we decide to factor it out of the VenicePubsubPartitionReader - - // private final Properties configBag; - // private final VenicePubsubInputPartition inputPartition; - // private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); - // private ApacheKafkaConsumerAdapter consumer; - // private ApacheKafkaAdminAdapter admin; - // - // public VenicePubsubPartitionConsumer(Properties configBag, VenicePubsubInputPartition inputPartition) { - // this.configBag = configBag; - // this.inputPartition = inputPartition; - // String consumerName = "VeniceSpark_p-" + inputPartition.getPartitionNumber() + "_" - // + inputPartition.getSegmentStartOffset() + "-" + inputPartition.getSegmentEndOffset(); - // - // } -} From 1d7bebdaa1be00640d6a06caceac2f6ff99b3a22 Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Tue, 5 Nov 2024 14:55:38 -0800 Subject: [PATCH 09/12] Added counter for delivered records, can help with debugging future missing records situations. 
--- .../table/VenicePubsubInputPartitionReader.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java index 6d10e0636c..72892f710e 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -60,6 +60,7 @@ public class VenicePubsubInputPartitionReader implements PartitionReader>> consumerBuffer = new HashMap<>(); @@ -69,8 +70,7 @@ public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsub this( jobConfig, inputPartition, - new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory() // need to review the - // properties bag ... + new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory() .create( jobConfig, false, @@ -125,6 +125,7 @@ private void initialize() { // if it returns a row, it's going to be key and value and offset in the row in that order @Override public InternalRow get() { + recordsDeliveredByGet++; // should return the same row if called multiple times return currentRow; } @@ -149,11 +150,8 @@ public boolean next() { @Override public void close() { pubSubConsumer.close(); - LOGGER.info( - "Consuming ended for Topic: {} , consumed {} records, skipped {} records", - targetPubSubTopicName, - recordsServed, - recordsSkipped); + LOGGER.info("Consuming ended for Topic: {} , consumed {} records,", targetPubSubTopicName, recordsServed); + LOGGER.info(" skipped {} records, gets invoked : {} .", recordsSkipped, recordsDeliveredByGet); } // borrowing Gaojie's code for dealing with empty polls. @@ -188,7 +186,6 @@ private void loadRecords() { throw new RuntimeException("Empty poll after " + retry + " retries"); } messageBuffer.addAll(partitionMessagesBuffer); - } private InternalRow processPubSubMessageToRow( From 9f72cbb8a5037a973daa9fe4337f473b7c701e27 Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Tue, 12 Nov 2024 13:24:39 -0800 Subject: [PATCH 10/12] Removed un-needed tests and comments. 
--- .../table/VenicePubsubInputPartitionReaderTest.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java index 96274dd1fc..2627fed4e0 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -115,15 +115,6 @@ public void testGet() { assertEquals(key, (KAFKA_MESSAGE_KEY_PREFIX + offset).getBytes()); assertEquals(value, (KAFKA_MESSAGE_VALUE_PREFIX + offset).getBytes()); } - // assertEquals(row.get(0, KafkaKey), "dummyData1"); - // assertTrue(row.getInt(1) >= 0); - // assertTrue(row.getInt(1) < 1000); - // assertTrue(row.getBoolean(2)); } - @Test - public void testClose() { - reader.close(); - // Add assertions if there are any resources to verify after close - } } From 2bc56ee7d7724f4d7806a6775f4da6533ecf6ee2 Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Tue, 12 Nov 2024 13:28:15 -0800 Subject: [PATCH 11/12] Explicit imports per venice coding style. --- .../VenicePubsubInputPartitionReaderTest.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java index 2627fed4e0..f4608d3a07 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -1,9 +1,15 @@ package com.linkedin.venice.spark.input.pubsub.table; -import static com.linkedin.venice.kafka.protocol.enums.MessageType.*; -import static com.linkedin.venice.vpj.VenicePushJobConstants.*; -import static org.mockito.Mockito.*; -import static org.testng.Assert.*; +import static com.linkedin.venice.kafka.protocol.enums.MessageType.PUT; +import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL; +import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_TOPIC; +import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import com.linkedin.venice.kafka.protocol.GUID; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; From 57606ec26c4c389ce16c1546c70fef0e995a4fd7 Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Tue, 12 Nov 2024 20:21:30 -0800 Subject: [PATCH 12/12] Added method to expose stats from the record reader and related tests. Polished logging. 
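The stats come back as a positional list, so this is how a caller or test is expected to unpack them (a sketch; the element order mirrors the Arrays.asList(...) call in getStats(), and List<Long> is assumed from the boxed longs the new test compares against):

    List<Long> stats = reader.getStats();
    long currentOffset  = stats.get(0);   // offset the reader has advanced to
    long recordsServed  = stats.get(1);   // rows the reader prepared and served to Spark
    long recordsSkipped = stats.get(2);   // control messages and other non-data records
    long deliveredByGet = stats.get(3);   // raw get() invocations, can exceed recordsServed

    // When the split starts at offset 0, the reached offset equals served plus skipped,
    // which is the relation the new test asserts.
    assert currentOffset == recordsServed + recordsSkipped;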
--- .../VenicePubsubInputPartitionReader.java | 27 ++++++++++++------- .../VenicePubsubInputPartitionReaderTest.java | 26 +++++++++++++++--- 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java index 72892f710e..7466445b89 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -150,8 +151,13 @@ public boolean next() { @Override public void close() { pubSubConsumer.close(); - LOGGER.info("Consuming ended for Topic: {} , consumed {} records,", targetPubSubTopicName, recordsServed); - LOGGER.info(" skipped {} records, gets invoked : {} .", recordsSkipped, recordsDeliveredByGet); + double progressPercent = (currentOffset - startingOffset) * 100.0 / offsetLength; + LOGGER.info( + "Consuming ended for Topic: {} , consumed {}% of {} records,", + targetPubSubTopicName, + progressPercent, + recordsServed); + LOGGER.info("Skipped {} records, delivered rows {} times .", recordsSkipped, recordsDeliveredByGet); } // borrowing Gaojie's code for dealing with empty polls. @@ -215,7 +221,6 @@ private InternalRow processPubSubMessageToRow( long offset = pubSubMessage.getOffset(); ByteBuffer key = ByteBuffer.wrap(kafkaKey.getKey(), 0, kafkaKey.getKeyLength()); ByteBuffer value; - int partition = targetPartitionNumber; int messageType; int schemaId; ByteBuffer replicationMetadataPayload; @@ -282,31 +287,35 @@ private InternalRow processPubSubMessageToRow( replicationMetadataPayload.get(replicationMetadataPayloadBytes); return new GenericInternalRow( - new Object[] { offset, keyBytes, valueBytes, partition, messageType, schemaId, replicationMetadataPayloadBytes, - replicationMetadataVersionId }); + new Object[] { offset, keyBytes, valueBytes, targetPartitionNumber, messageType, schemaId, + replicationMetadataPayloadBytes, replicationMetadataVersionId }); } private void maybeLogProgress() { - long progressPercent = (currentOffset - startingOffset) * 100 / offsetLength; + double progressPercent = (currentOffset - startingOffset) * 100.0 / offsetLength; if (progressPercent > 10 + lastKnownProgressPercent) { logProgress(); - lastKnownProgressPercent = progressPercent; + lastKnownProgressPercent = (long) progressPercent; } } private void logProgress() { - long progressPercent = (currentOffset - startingOffset) * 100 / offsetLength; + double progressPercent = (currentOffset - startingOffset) * 100.0 / offsetLength; LOGGER.info( "Consuming progress for" + " Topic: {}, partition {} , consumed {}% of {} records. 
actual records delivered: {}, records skipped: {}", targetPubSubTopicName, targetPartitionNumber, - progressPercent, + String.format("%.1f", (float) progressPercent), offsetLength, recordsServed, recordsSkipped); } + public List getStats() { + return Arrays.asList(currentOffset, recordsServed, recordsSkipped, recordsDeliveredByGet); + } + // go through the current buffer and find the next usable message private boolean ableToPrepNextRow() { diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java index f4608d3a07..98aae45d29 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -26,6 +26,7 @@ import com.linkedin.venice.utils.VeniceProperties; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -97,15 +98,36 @@ public void setUp() { pubSubTopicRepository); } + /* + * Test method for {@link com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartitionReader#next()}. + * Runs through the records once and tests if the next() works correctly for both presence and absence of recrods. + * Also ensures that the stats are recording the stats correctly. + */ @Test - public void testNext() { + public void testNextAndGetStats() { + System.out.println(); assertTrue(reader.next()); // first record 0 + + for (int i = 0; i < 10; i++) { // 10 gets just to break the balance + reader.get(); + } + for (int i = 1; i < 77; i++) { // 78 records expected reader.get(); assertTrue(reader.next()); } + + // skipped is zero in these cases , no control message in data + // currentOffset, recordConvertedToRow, recordsSkipped, recordsDeliveredByGet + List stats = reader.getStats(); + // since the offset starts at 0 for this test, we expect the offset to be equal to sum of skipped and converted + assertEquals((long) stats.get(0), (long) stats.get(1) + stats.get(2)); + + assertEquals(Arrays.asList(77L, 77L, 0L, 86L), reader.getStats()); + assertFalse(reader.next()); + assertEquals(Arrays.asList(77L, 77L, 0L, 86L), reader.getStats()); reader.close(); } @@ -113,7 +135,6 @@ public void testNext() { public void testGet() { for (int i = 0; i < 77; i++) { // 78 records expected InternalRow row = reader.get(); - System.out.println(row); long offset = row.getLong(0); byte[] key = row.getBinary(1); byte[] value = row.getBinary(2); @@ -122,5 +143,4 @@ public void testGet() { assertEquals(value, (KAFKA_MESSAGE_VALUE_PREFIX + offset).getBytes()); } } - }
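For completeness, this is the shape of a driver-side read against the new source once the series is applied. It is a hypothetical snippet, not part of the patch: the class is addressed by its fully qualified name since no DataSourceRegister alias is added here, the option keys are the VenicePushJobConstants already used by the scan planner and tests, the topic, fabric and broker values are placeholders, and the consumer may require additional broker or security options beyond what is shown.

    import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL;
    import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_FABRIC;
    import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_TOPIC;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class KifRepushReadExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("kif-read-example").getOrCreate();

        // VenicePubsubSource is a DataSource V2 TableProvider, so Spark can load it by class name.
        Dataset<Row> rows = spark.read()
            .format("com.linkedin.venice.spark.input.pubsub.table.VenicePubsubSource")
            .option(KAFKA_INPUT_TOPIC, "test_store_v1")       // placeholder source topic
            .option(KAFKA_INPUT_FABRIC, "ei-ltx1")            // region whose pub-sub cluster is read
            .option(KAFKA_INPUT_BROKER_URL, "localhost:9092") // placeholder broker address
            .load();

        // Each row is one pub-sub message: offset, key, value, partition, message type,
        // schema id and replication metadata, per the reader's row layout.
        rows.show(10, false);
        spark.stop();
      }
    }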