diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormatCombiner.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormatCombiner.java index 9af9742116..79099fd4b3 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormatCombiner.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputFormatCombiner.java @@ -23,6 +23,8 @@ import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; +// For the Spark implementation, this combining step needs to move to a later stage, after all the Kafka records have been captured. + /** * This class is a Combiner, which is a functionality of the MR framework where we can plug a {@link Reducer} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java index 0e8aa3e9ff..f8b13c7a5a 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java @@ -159,6 +159,8 @@ public KafkaInputRecordReader( /** * This function will skip all the Control Messages right now. + * It is given two holder objects, a key and a value, and fills them with + * the next key and value read from the Kafka topic partition. */ @Override public boolean next(KafkaInputMapperKey key, KafkaInputMapperValue value) throws IOException { diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java index e8ba243b85..bffa85bc36 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/SparkConstants.java @@ -20,6 +20,16 @@ public class SparkConstants { new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()), new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()) }); + public static final StructType KAFKA_INPUT_TABLE_SCHEMA = new StructType( + new StructField[] { new StructField("__offset__", IntegerType, false, Metadata.empty()), + new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()), + new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()), + new StructField("__partition__", IntegerType, false, Metadata.empty()), + new StructField("__message_type__", IntegerType, false, Metadata.empty()), + new StructField("__schema_id__", IntegerType, false, Metadata.empty()), + new StructField("__replication_metadata_version_id__", IntegerType, false, Metadata.empty()), + new StructField("__replication_metadata_payload__", BinaryType, false, Metadata.empty()) }); + public static final StructType DEFAULT_SCHEMA_WITH_PARTITION = new StructType( new StructField[] { new StructField(KEY_COLUMN_NAME, BinaryType, false, Metadata.empty()), new StructField(VALUE_COLUMN_NAME, BinaryType, true, Metadata.empty()), diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/PartitionSplitters.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/PartitionSplitters.java new file mode 100644 index 0000000000..a5f1187f30 --- /dev/null +++
b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/PartitionSplitters.java @@ -0,0 +1,151 @@ +package com.linkedin.venice.spark.input.pubsub; + +import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class PartitionSplitters { + // need a method called fullPartitionSplitter, takes in list of partition start and end offsets + // and returns a list of VenicePubsubInputPartition splits + public static Map>> fullPartitionSplitter(Map> partitionOffsetsMap) { + Map>> splits = new HashMap<>(); + for (Map.Entry> entry: partitionOffsetsMap.entrySet()) { + int partitionNumber = entry.getKey(); + List offsets = entry.getValue(); // start and end offsets + List> split = new ArrayList<>(); + split.add(assembleSegment(offsets.get(0), offsets.get(1))); + splits.put(partitionNumber, split); + } + return splits; + } + + // this one splits the partitions into equal length segments based on the total number of segments intended + // the final segment count is going to be less than desired count + partition count to accomodate for partial segments + // and division remainders. This method prioritizes paralellism over segment count. + public static Map>> segmentCountSplitter( + Map> partitionOffsetsMap, + int totalSegments) { + Map>> splits = new HashMap<>(); + long intendedSplitLength = computeIntendedSplitLengthBasedOnCount(partitionOffsetsMap, totalSegments); + for (Map.Entry> entry: partitionOffsetsMap.entrySet()) { + int partitionNumber = entry.getKey(); + List offsets = entry.getValue(); + long startOffset = offsets.get(0); + long endOffset = offsets.get(1); + long partitionLength = (endOffset - startOffset); + if (intendedSplitLength >= partitionLength) { + // this whole partition fits nicely in one chunk + List> split = new ArrayList<>(); + split.add(assembleSegment(startOffset, endOffset)); + splits.put(partitionNumber, split); // this partition is going to be consumed by a single task + } else { + // this partition needs to be split into multiple segments + // first lets see how many segments we can get out of this partition + long count = (long) Math.ceil((double) partitionLength / intendedSplitLength); + long segmentLength = (long) Math.ceil((double) partitionLength / count); + List> split = new ArrayList<>(); + + // generate the segments + for (int i = 0; i < count; i++) { + List segment = new ArrayList<>(); + segment.add(startOffset + i * segmentLength); + if (i == count - 1) { + segment.add(endOffset); // last segment + } else { + segment.add(startOffset + (i + 1) * segmentLength - 1); + } + split.add(segment); + } + splits.put(partitionNumber, split); // Multiple splits for this partition + } + } + return splits; + } + + // and a method that cuts the partitions into segments based on the total number of messages + public static Map>> messageCountSplitter( + Map> partitionOffsetsMap, + long chunkOffsetSize) { + Map>> splits = new HashMap<>(); + for (Map.Entry> entry: partitionOffsetsMap.entrySet()) { + int partitionNumber = entry.getKey(); + List offsets = entry.getValue(); + long startOffset = offsets.get(0); + long endOffset = offsets.get(1); + long partitionLength = (endOffset - startOffset); + if (chunkOffsetSize <= partitionLength) { + // this whole partition fits nicely in one chunk + List> split = new ArrayList<>(); + split.add(assembleSegment(startOffset, endOffset)); + splits.put(partitionNumber, split); // this partition 
is going to be consumed by a single task + } else { + // this partition needs to be split into multiple segments + // first lets see how many segments we can get out of this partition + long count = (long) Math.ceil((double) partitionLength / chunkOffsetSize); + long segmentLength = (long) Math.ceil((double) partitionLength / count); + List> split = new ArrayList<>(); + + // generate the segments + for (int i = 0; i < count; i++) { + List segment = new ArrayList<>(); + segment.add(startOffset + i * segmentLength); + segment.add(startOffset + (i + 1) * segmentLength); + split.add(segment); + } + splits.put(partitionNumber, split); // Multiple splits for this partition + } + } + return splits; + } + + // assemble and wrap the splits into VenicePubsubInputPartition objects. + public static List convertToInputPartitions( + String region, + String topicName, + Map>> splits) { + List veniceInputPartitions = new ArrayList<>(); + for (Map.Entry>> entry: splits.entrySet()) { + int partitionNumber = entry.getKey(); + List> segments = entry.getValue(); + for (List segment: segments) { + long startOffset = segment.get(0); + long endOffset = segment.get(1); + VenicePubsubInputPartition partition = + new VenicePubsubInputPartition(region, topicName, partitionNumber, startOffset, endOffset); + veniceInputPartitions.add(partition); + } + } + return veniceInputPartitions; + } + + // utility methods + private static List assembleSegment(long startOffset, long endOffset) { + List split; + List segment = new ArrayList<>(); + segment.add(startOffset); + segment.add(endOffset); + return segment; + } + + static long computeIntendedSplitLengthBasedOnCount(Map> partitionOffsetsMap, int totalSegments) { + Double totalLength = computeTopicLengthInOffsets(partitionOffsetsMap); + return (long) Math.ceil(totalLength / totalSegments); + } + + static long computeIntendedSplitCountBasedOnOffset(Map> partitionOffsetsMap, long offsetCount) { + Double totalLength = computeTopicLengthInOffsets(partitionOffsetsMap); + return (long) Math.ceil(totalLength / offsetCount); + } + + protected static Double computeTopicLengthInOffsets(Map> partitionOffsetsMap) { + Double totalLength = 0.0; + for (Map.Entry> entry: partitionOffsetsMap.entrySet()) { + List offsets = entry.getValue(); + totalLength += offsets.get(1) - offsets.get(0); + } + return totalLength; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartition.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartition.java new file mode 100644 index 0000000000..54f1fae732 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartition.java @@ -0,0 +1,54 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import org.apache.spark.sql.connector.read.InputPartition; + +/* + This split can be a whole partition or sub part of a pubsub partition, hence the name segment. 
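+ (For illustration only, a sketch of the splitter output shape defined above: segmentCountSplitter on
+ a map like {0 -> [0, 1000]} with a desired count of 2 yields {0 -> [[0, 499], [500, 1000]]}, and
+ convertToInputPartitions("someRegion", "someTopic_v1", splits) wraps each [start, end] pair into one
+ of these segments; the region and topic names here are made-up placeholders.)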
+ This is intentional not to mix up the Kafka partition and spark idea of a split + the equivalent class for hdfs is VeniceHdfsInputPartition + */ + + +public class VenicePubsubInputPartition implements InputPartition { + // + private static final long serialVersionUID = 1L; + + private final String region; + private final String TopicName; + private final int partitionNumber; + private final long segmentStartOffset; + private final long segmentEndOffset; + + public VenicePubsubInputPartition( + String region, + String topicName, + int partitionNumber, + long startOffset, + long endOffset) { + this.region = region; + this.TopicName = topicName; + this.partitionNumber = partitionNumber; + this.segmentStartOffset = startOffset; + this.segmentEndOffset = endOffset; + } + + public String getRegion() { + return region; + } + + public String getTopicName() { + return TopicName; + } + + public int getPartitionNumber() { + return partitionNumber; + } + + public long getSegmentStartOffset() { + return segmentStartOffset; + } + + public long getSegmentEndOffset() { + return segmentEndOffset; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java new file mode 100644 index 0000000000..7466445b89 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -0,0 +1,352 @@ +package com.linkedin.venice.spark.input.pubsub.table; +/* + this is the code that runs within each task and consumes from pubsub and return rows + This is comparable to the Mapper stage of the hadoop map reduce KIF + */ + +import com.linkedin.venice.kafka.protocol.Delete; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.PubSubClientsFactory; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; +import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.utils.VeniceProperties; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.jetbrains.annotations.NotNull; + + +public class VenicePubsubInputPartitionReader implements PartitionReader { + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]); + private static final int CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES = 12; + private static final long EMPTY_POLL_SLEEP_TIME_MS = TimeUnit.SECONDS.toMillis(5); + private static final Long CONSUMER_POLL_TIMEOUT = 
TimeUnit.SECONDS.toMillis(1); // 1 second + + private static final Logger LOGGER = LogManager.getLogger(VenicePubsubInputPartitionReader.class); + + private final boolean filterControlMessages = true; + + // this is the buffer that holds the messages that are consumed from the pubsub + private final PubSubTopicPartition targetPubSubTopicPartition; + private final String targetPubSubTopicName; + private final int targetPartitionNumber; + private final long startingOffset; + private final long endingOffset; + private final long offsetLength; + private final PubSubConsumerAdapter pubSubConsumer; + // inputPartitionReader local buffer, that gets filled from partitionMessagesBuffer + + private final ArrayDeque> messageBuffer = new ArrayDeque<>(); + private long currentOffset; + private InternalRow currentRow = null; + private long recordsServed = 0; + private long recordsSkipped = 0; + private long lastKnownProgressPercent = 0; + private long recordsDeliveredByGet = 0; + + private Map>> consumerBuffer = + new HashMap<>(); + // the buffer that holds the relevant messages for the current partition + + public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsubInputPartition inputPartition) { + this( + jobConfig, + inputPartition, + new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory() + .create( + jobConfig, + false, + PubSubMessageDeserializer.getInstance(), + // PubSubPassThroughDeserializer.getInstance(), + "Spark_KIF_consumer"), + new PubSubTopicRepository()); + } + + // testing constructor + public VenicePubsubInputPartitionReader( + VeniceProperties jobConfig, + VenicePubsubInputPartition inputPartition, + PubSubConsumerAdapter consumer, + PubSubTopicRepository pubSubTopicRepository) { + + targetPubSubTopicName = inputPartition.getTopicName(); + targetPartitionNumber = inputPartition.getPartitionNumber(); + startingOffset = inputPartition.getSegmentStartOffset(); + endingOffset = inputPartition.getSegmentEndOffset(); + offsetLength = endingOffset - startingOffset; + + this.pubSubConsumer = consumer; + + LOGGER.info( + "Consuming started for Topic: {} Partition {}, starting offset: {} ending offset: {}", + targetPubSubTopicName, + targetPartitionNumber, + startingOffset, + endingOffset); + + PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(targetPubSubTopicName); + + // List listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic); + // at this point, we hope that driver has given us good information about + // the partition and offsets and the fact that topic exists. + + targetPubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopic, targetPartitionNumber); + + pubSubConsumer.subscribe(targetPubSubTopicPartition, startingOffset - 1); + // pubSubConsumer.seek(startingOffset); // do we need this? or should we rely on the starting offset passed to + // subscribe ? + + initialize(); // see, MET05-J asked for this ! + } + + private void initialize() { + next();// get the first record ready to go. + recordsServed = 0; // reset the counter + } + + // if it returns a row, it's going to be key and value and offset in the row in that order + @Override + public InternalRow get() { + recordsDeliveredByGet++; + // should return the same row if called multiple times + return currentRow; + } + + @Override + public boolean next() { + // Are we past the finish line ? + if (currentOffset >= endingOffset) { + return false; + } + + if (ableToPrepNextRow()) { + // there is a fresh row to serve + return true; + } + // at this point, buffer is empty. 
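+    // Descriptive note on the contract with Spark (no behavior change): Spark alternates next() and get().
+    // next() must leave currentRow pointing at the next usable message and return false once this segment's
+    // ending offset has been passed; an exhausted poll surfaces as an exception from loadRecords() below.
+    // get() simply re-serves currentRow, which is why recordsDeliveredByGet can exceed recordsServed.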
+ + loadRecords(); // try to poll for some records and allow the exception to bubble up + return ableToPrepNextRow(); + } + + @Override + public void close() { + pubSubConsumer.close(); + double progressPercent = (currentOffset - startingOffset) * 100.0 / offsetLength; + LOGGER.info( + "Consuming ended for Topic: {} , consumed {}% of {} records,", + targetPubSubTopicName, + progressPercent, + recordsServed); + LOGGER.info("Skipped {} records, delivered rows {} times .", recordsSkipped, recordsDeliveredByGet); + } + + // borrowing Gaojie's code for dealing with empty polls. + private void loadRecords() { + List> partitionMessagesBuffer = new ArrayList<>(); + + int retry = 0; + while (retry++ < CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES) { + consumerBuffer = pubSubConsumer.poll(CONSUMER_POLL_TIMEOUT); + + partitionMessagesBuffer.addAll(consumerBuffer.get(targetPubSubTopicPartition)); + if (!partitionMessagesBuffer.isEmpty()) { + // we got some records back for the desired partition. + break; + } + + try { + Thread.sleep(EMPTY_POLL_SLEEP_TIME_MS); + } catch (InterruptedException e) { + logProgress(); + LOGGER.error( + "Interrupted while waiting for records to be consumed from topic {} partition {} to be available", + targetPubSubTopicName, + targetPartitionNumber, + e); + // should we re-throw here to break the consumption task ? + } + } + if (partitionMessagesBuffer.isEmpty()) { + // this is a valid place to throw exception and kill the consumer task + // as there is no more records to consume. + throw new RuntimeException("Empty poll after " + retry + " retries"); + } + messageBuffer.addAll(partitionMessagesBuffer); + } + + private InternalRow processPubSubMessageToRow( + @NotNull PubSubMessage pubSubMessage) { + // after deliberation, I think we are better off isolating further processing of the messages after they are dumped + // into the dataframe, Spark job can handle the rest of the processing. + + // should we detect chunking on the topic ? + + KafkaKey kafkaKey = pubSubMessage.getKey(); + KafkaMessageEnvelope kafkaMessageEnvelope = pubSubMessage.getValue(); + MessageType pubSubMessageType = MessageType.valueOf(kafkaMessageEnvelope); + + /* + List of fields we need in the row: @see KAFKA_INPUT_TABLE_SCHEMA + 1. offset ( currently a long , maybe some other complicated thing in the Northguard world) + 2. key ( serialized key Byte[]) + 3. value ( serialized value Byte[]) + 4. partition ( int ) + 5. messageType ( put vs delete ) .getValue is the int value and gives us that. value type is also of this kind + 6. schemaId ( for put and delete ) int + 7. replicationMetadataPayload ByteBuffer + 8. replicationMetadataVersionId int + */ + + // Spark row setup : + long offset = pubSubMessage.getOffset(); + ByteBuffer key = ByteBuffer.wrap(kafkaKey.getKey(), 0, kafkaKey.getKeyLength()); + ByteBuffer value; + int messageType; + int schemaId; + ByteBuffer replicationMetadataPayload; + int replicationMetadataVersionId; + + switch (pubSubMessageType) { + case PUT: + Put put = (Put) kafkaMessageEnvelope.payloadUnion; + messageType = MessageType.PUT.getValue(); + value = put.putValue; + schemaId = put.schemaId; // chunking will be handled down the road in spark job. 
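+        // (Per the chunking notes further down: a negative schemaId marks a value chunk or a
+        // ChunkedValueManifest rather than a regular full value; for this dumping path it is carried through as-is.)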
+ replicationMetadataPayload = put.replicationMetadataPayload; + replicationMetadataVersionId = put.replicationMetadataVersionId; + break; + case DELETE: + messageType = MessageType.DELETE.getValue(); + Delete delete = (Delete) kafkaMessageEnvelope.payloadUnion; + schemaId = delete.schemaId; + value = EMPTY_BYTE_BUFFER; + replicationMetadataPayload = delete.replicationMetadataPayload; + replicationMetadataVersionId = delete.replicationMetadataVersionId; + break; + default: + messageType = -1; // this is an error condition + schemaId = Integer.MAX_VALUE; + value = EMPTY_BYTE_BUFFER; + replicationMetadataPayload = EMPTY_BYTE_BUFFER; + replicationMetadataVersionId = Integer.MAX_VALUE; + // we don't care about messages other than PUT and DELETE + } + + /* + Dealing with chunking : + @link https://github.com/linkedin/venice/blob/main/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java#L53 + * 1. The top-level key is queried. + * 2. The top-level key's value's schema ID is checked. + * a) If it is positive, then it's a full value, and is returned immediately. + * b) If it is negative, then it's a {@link ChunkedValueManifest}, and we continue to the next steps. + * 3. The {@link ChunkedValueManifest} is deserialized, and its chunk keys are extracted. + * 4. Each chunk key is queried. + * 5. The chunks are stitched back together using the various adapter interfaces of this package, + * depending on whether it is the single get or batch get/compute path that needs to re-assemble + * a chunked value. + + For dumping application, we can treat this as pass-through . + + chunking code: + RawKeyBytesAndChunkedKeySuffix rawKeyAndChunkedKeySuffix = + splitCompositeKey(kafkaKey.getKey(), messageType, getSchemaIdFromValue(kafkaMessageEnvelope)); + key.key = rawKeyAndChunkedKeySuffix.getRawKeyBytes(); + + value.chunkedKeySuffix = rawKeyAndChunkedKeySuffix.getChunkedKeySuffixBytes(); + */ + + // need to figure out task tracking in Spark Land. + // pack pieces of information into the spart intermediate row, this will populate the dataframe to be read by the + // spark job + // weirdest use of verb "GET" in heabBuffer !!!!! + byte[] keyBytes = new byte[key.remaining()]; + key.get(keyBytes); + byte[] valueBytes = new byte[value.remaining()]; + value.get(valueBytes); + byte[] replicationMetadataPayloadBytes = new byte[replicationMetadataPayload.remaining()]; + replicationMetadataPayload.get(replicationMetadataPayloadBytes); + + return new GenericInternalRow( + new Object[] { offset, keyBytes, valueBytes, targetPartitionNumber, messageType, schemaId, + replicationMetadataPayloadBytes, replicationMetadataVersionId }); + } + + private void maybeLogProgress() { + double progressPercent = (currentOffset - startingOffset) * 100.0 / offsetLength; + if (progressPercent > 10 + lastKnownProgressPercent) { + logProgress(); + lastKnownProgressPercent = (long) progressPercent; + } + } + + private void logProgress() { + double progressPercent = (currentOffset - startingOffset) * 100.0 / offsetLength; + LOGGER.info( + "Consuming progress for" + + " Topic: {}, partition {} , consumed {}% of {} records. 
actual records delivered: {}, records skipped: {}", + targetPubSubTopicName, + targetPartitionNumber, + String.format("%.1f", (float) progressPercent), + offsetLength, + recordsServed, + recordsSkipped); + } + + public List getStats() { + return Arrays.asList(currentOffset, recordsServed, recordsSkipped, recordsDeliveredByGet); + } + + // go through the current buffer and find the next usable message + private boolean ableToPrepNextRow() { + + PubSubMessage message; + boolean found; + // buffer is already empty. + if (messageBuffer.isEmpty()) { + return false; + } + + // look for the next viable message + found = false; + while (!found) { + try { + message = messageBuffer.pop(); + } catch (NoSuchElementException e) { + // ran out of messages in the buffer + return false; + } + + currentOffset = message.getOffset(); + + if (filterControlMessages && message.getKey().isControlMessage()) { + recordsSkipped++; + } else { + currentRow = processPubSubMessageToRow(message); + recordsServed++; + found = true; + } + } + maybeLogProgress(); + return true; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java new file mode 100644 index 0000000000..41ce7e2798 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java @@ -0,0 +1,35 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import com.linkedin.venice.utils.VeniceProperties; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReader; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; + + +public class VenicePubsubInputPartitionReaderFactory implements PartitionReaderFactory { + private static final long serialVersionUID = 1L; + + private final VeniceProperties jobConfig; + + public VenicePubsubInputPartitionReaderFactory(VeniceProperties jobConfig) { + this.jobConfig = jobConfig; + } + + @Override + public PartitionReader createReader(InputPartition inputPartition) { + + if (!(inputPartition instanceof VenicePubsubInputPartition)) { + throw new IllegalArgumentException( + "VenicePubsubInputPartitionReaderFactory can only create readers for VenicePubsubInputPartition"); + } + + return new VenicePubsubInputPartitionReader(jobConfig, (VenicePubsubInputPartition) inputPartition); + } + + // Make it explicit that this reader does not support columnar reads. 
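+  // (Returning false keeps Spark on the row-based read path, so each task consumes InternalRow objects
+  // produced by VenicePubsubInputPartitionReader.get() instead of columnar batches.)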
+ @Override + public boolean supportColumnarReads(InputPartition partition) { + return false; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java new file mode 100644 index 0000000000..f70490bfba --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java @@ -0,0 +1,87 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import static com.linkedin.venice.vpj.VenicePushJobConstants.*; + +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.pubsub.PubSubClientsFactory; +import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; +import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; +import com.linkedin.venice.pubsub.api.PubSubTopic; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.spark.input.pubsub.PartitionSplitters; +import com.linkedin.venice.utils.VeniceProperties; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.spark.sql.connector.read.Batch; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.types.StructType; + + +public class VenicePubsubInputScan implements Scan, Batch { + private final VeniceProperties jobConfig; + + public VenicePubsubInputScan(VeniceProperties jobConfig) { + this.jobConfig = jobConfig; + } + + @Override + public InputPartition[] planInputPartitions() { + try { + String topicName = jobConfig.getString(KAFKA_INPUT_TOPIC); + String regionName = jobConfig.getString(KAFKA_INPUT_FABRIC); + int splitCount = 1000; + PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topicName); + PubSubClientsFactory clientsFactory = new PubSubClientsFactory(jobConfig); + // PubSubAdminAdapter pubsubAdminClient = + // clientsFactory.getAdminAdapterFactory().create(veniceProperties, pubSubTopicRepository); + PubSubMessageDeserializer pubSubMessageDeserializer = PubSubMessageDeserializer.getInstance(); + PubSubConsumerAdapter pubSubConsumer = clientsFactory.getConsumerAdapterFactory() + .create(jobConfig, false, pubSubMessageDeserializer, "Spark_KIF_planner"); + + // surround by retries from retryUtils or something + List listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic); + + // int numPartitions = listOfPartitions.size(); // number of partitions in the topic in case. 
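+      // Planning flow, for reference: collect [beginningOffset, endOffset] per partition below, have
+      // PartitionSplitters.segmentCountSplitter() cut those ranges into roughly splitCount segments, then wrap
+      // every segment in a serializable VenicePubsubInputPartition so Spark can ship it to the executors.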
+ + // need a map of partition number to (start offset, end offset) for each partition + Map<Integer, List<Long>> partitionOffsetsMap = new HashMap<>(); + + for (PubSubTopicPartitionInfo partition: listOfPartitions) { + PubSubTopicPartition pubSubTopicPartition = partition.getTopicPartition(); + int partitionNumber = partition.getTopicPartition().getPartitionNumber(); + + // TODO: wrap these offset lookups in retries (retryUtils or similar) + long startOffset = pubSubConsumer.beginningOffset(pubSubTopicPartition, Duration.ofSeconds(60)); + long endOffset = pubSubConsumer.endOffset(pubSubTopicPartition); + // + + partitionOffsetsMap.put(partitionNumber, Arrays.asList(startOffset, endOffset)); + } + + Map<Integer, List<List<Long>>> splits = PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, splitCount); + return PartitionSplitters.convertToInputPartitions(regionName, topicName, splits).toArray(new InputPartition[0]); + } catch (Exception e) { + throw new VeniceException("Could not plan input partitions for the pubsub input topic", e); + // something broke in the process of getting the splits + } + } + + @Override + public PartitionReaderFactory createReaderFactory() { + + return new VenicePubsubInputPartitionReaderFactory(jobConfig); + } + + @Override + public StructType readSchema() { + return com.linkedin.venice.spark.SparkConstants.KAFKA_INPUT_TABLE_SCHEMA; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java new file mode 100644 index 0000000000..b2a0dc77c0 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java @@ -0,0 +1,19 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import com.linkedin.venice.utils.VeniceProperties; +import org.apache.spark.sql.connector.read.Scan; +import org.apache.spark.sql.connector.read.ScanBuilder; + + +public class VenicePubsubInputScanBuilder implements ScanBuilder { + private final VeniceProperties jobConfig; + + public VenicePubsubInputScanBuilder(VeniceProperties properties) { + this.jobConfig = properties; + } + + @Override + public Scan build() { + return new VenicePubsubInputScan(jobConfig); + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java new file mode 100644 index 0000000000..568778c098 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java @@ -0,0 +1,52 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import static com.linkedin.venice.spark.SparkConstants.*; + +import com.linkedin.venice.utils.VeniceProperties; +import java.util.Collections; +import java.util.Properties; +import java.util.Set; +import org.apache.spark.sql.connector.catalog.SupportsRead; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + + +/** + * This is the entrypoint of the Pubsub input source. It is used by Spark to create a DataFrame from a Pubsub topic.
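+ * <p>Usage sketch, for illustration only (the literal store/topic/region values below are made up, and the
+ * option keys are assumed to be the same constants the scan reads, {@code KAFKA_INPUT_TOPIC} and
+ * {@code KAFKA_INPUT_FABRIC}):</p>
+ * <pre>{@code
+ * Dataset<Row> df = spark.read()
+ *     .format(VenicePubsubSource.class.getName())
+ *     .option(KAFKA_INPUT_TOPIC, "someStore_v42")
+ *     .option(KAFKA_INPUT_FABRIC, "someRegion")
+ *     .load();
+ * // df columns follow KAFKA_INPUT_TABLE_SCHEMA: __offset__, key, value, __partition__, ...
+ * }</pre>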
+ */ + +public class VenicePubsubInputTable implements SupportsRead { + static final String INPUT_TABLE_NAME = "venice_pubsub_table"; + private final VeniceProperties jobConfig; + + public VenicePubsubInputTable(VeniceProperties jobConfig) { + this.jobConfig = jobConfig; + // + } + + @Override + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { + Properties properties = jobConfig.getPropertiesCopy(); + properties.putAll(options.asCaseSensitiveMap()); + + return new VenicePubsubInputScanBuilder(new VeniceProperties(properties)); // should we flip this to + // VeniceProperties? + } + + @Override + public String name() { + return INPUT_TABLE_NAME; + } + + @Override + public StructType schema() { + return KAFKA_INPUT_TABLE_SCHEMA; + } + + @Override + public Set capabilities() { + return Collections.singleton(TableCapability.BATCH_READ); + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java new file mode 100644 index 0000000000..e02aa0577d --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java @@ -0,0 +1,46 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import static com.linkedin.venice.spark.SparkConstants.*; + +import com.linkedin.venice.utils.VeniceProperties; +import java.util.Map; +import java.util.Properties; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableProvider; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + + +public class VenicePubsubSource implements TableProvider { + public StructType inferSchema(CaseInsensitiveStringMap options) { + // there is no inference, the table is always created with the same schema + return KAFKA_INPUT_TABLE_SCHEMA; + } + + @Override + public Transform[] inferPartitioning(CaseInsensitiveStringMap options) { + // we don't support partitioning, it comes from the kafka topic. + return TableProvider.super.inferPartitioning(options); + } + + @Override + public Table getTable(StructType schema, Transform[] partitioning, Map configs) { + Properties properties = new Properties(); + properties.putAll(configs); + // the properties here is the entry point for all the configurations + // we receive from the outer layer. + // schem and partitioning are useless and should be discarded? + // + + // VeniceProperties consumerProperties = KafkaInputUtils.getConsumerProperties(properties); + + // if we want to break the bag-chain , this is the place ! 
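+    // (Config propagation, descriptive only: the options handed to this TableProvider become VeniceProperties
+    // here, then flow unchanged through VenicePubsubInputTable -> ScanBuilder -> Scan -> reader factory ->
+    // each VenicePubsubInputPartitionReader running in an executor.)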
+ return new VenicePubsubInputTable(new VeniceProperties(properties)); + } + + @Override + public boolean supportsExternalMetadata() { + return false; + } +} diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/PartitionSplittersTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/PartitionSplittersTest.java new file mode 100644 index 0000000000..86688150dd --- /dev/null +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/PartitionSplittersTest.java @@ -0,0 +1,141 @@ +package com.linkedin.venice.spark.input.pubsub; + +import static org.testng.Assert.*; + +import java.util.*; +import org.testng.annotations.Test; + + +public class PartitionSplittersTest { + @Test + void fullPartitionSplitter_ReturnsCorrectSplits() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 100L)); + partitionOffsetsMap.put(1, Arrays.asList(100L, 200L)); + partitionOffsetsMap.put(2, Arrays.asList(0L, 20_000_000L)); + + Map>> result = PartitionSplitters.fullPartitionSplitter(partitionOffsetsMap); + + // result is a map of partition number to list of segments + // need to assert value and keys are correct based on input data + assertEquals(3, result.size()); + assertEquals(Arrays.asList(Arrays.asList(0L, 100L)), result.get(0)); + assertEquals(Arrays.asList(Arrays.asList(100L, 200L)), result.get(1)); + assertEquals(Arrays.asList(Arrays.asList(0L, 20_000_000L)), result.get(2)); + + } + + @Test + void segmentCountSplitter_SplitsCorrectly_CommonCase() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(20_000L, 145_408L)); + partitionOffsetsMap.put(1, Arrays.asList(25_000L, 170_067L)); + partitionOffsetsMap.put(2, Arrays.asList(30_000L, 150_022L)); + partitionOffsetsMap.put(3, Arrays.asList(21_000L, 190_031L)); + partitionOffsetsMap.put(4, Arrays.asList(22_000L, 145_343L)); + + final int intendedSplitCount = 40; + + Map>> result = + PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, intendedSplitCount); + + int totalPartitionsPresent = result.size(); + assertEquals(5, totalPartitionsPresent); // right number of partitions present in the result + + // count all the segments + int totalSegments = 0; + for (List> segments: result.values()) { + totalSegments += segments.size(); + } + assertTrue(intendedSplitCount <= totalSegments); // at least as many segments as intended + assertTrue(intendedSplitCount + totalPartitionsPresent > totalSegments); // at most intended + partition count + + assertEquals(8, result.get(0).size()); + assertEquals(9, result.get(1).size()); + + assertEquals(Arrays.asList(20_000L, 35_675L), result.get(0).get(0)); // spot check first segment + assertEquals(Arrays.asList(37_904L, 54_807L), result.get(3).get(1)); // spot check middle segments + assertEquals(Arrays.asList(25_000L, 41_118L), result.get(1).get(0)); // spot check middle segment + assertEquals(Arrays.asList(129_926L, 145_343L), result.get(4).get(7)); // spot check first segment + + } + + @Test + void segmentCountSplitter_SplitsCorrectly_EdgeCase1() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 150L)); + partitionOffsetsMap.put(3, Arrays.asList(110L, 200L)); + partitionOffsetsMap.put(2, Arrays.asList(50L, 70L)); + partitionOffsetsMap.put(1, Arrays.asList(20L, 23L)); + partitionOffsetsMap.put(5, Arrays.asList(20L, 400L)); + + final int intendedSplitCount = 9; + + Map>> result = + 
PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, intendedSplitCount); + + int totalPartitionsPresent = result.size(); + assertEquals(5, totalPartitionsPresent); // right number of partitions present in the result + + // count all the segments + int totalSegments = 0; + for (List> segments: result.values()) { + totalSegments += segments.size(); + } + assertTrue(intendedSplitCount <= totalSegments); // at least as many segments as intended + assertTrue(intendedSplitCount + totalPartitionsPresent > totalSegments); // at most intended + partition count + + assertEquals(3, result.get(0).size()); + assertEquals(1, result.get(1).size()); + + assertEquals(Arrays.asList(155L, 200L), result.get(3).get(1)); + assertEquals(Arrays.asList(20L, 23L), result.get(1).get(0)); + + } + + @Test + void segmentCountSplitter_HandlesSingleSegment() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 50L)); + + Map>> result = PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, 1); + + assertEquals(1, result.size()); + assertEquals(1, result.get(0).size()); + } + + @Test + void computeIntendedSplitLengthBasedOnCount_CalculatesCorrectly() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 97L)); + partitionOffsetsMap.put(1, Arrays.asList(100L, 903L)); + partitionOffsetsMap.put(2, Arrays.asList(0L, 2022L)); + + long result = PartitionSplitters.computeIntendedSplitLengthBasedOnCount(partitionOffsetsMap, 29); + + assertEquals(101L, result); + } + + @Test + void computeIntendedSplitCountBasedOnOffset_CalculatesCorrectly() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 100L)); + partitionOffsetsMap.put(1, Arrays.asList(100L, 200L)); + + long result = PartitionSplitters.computeIntendedSplitCountBasedOnOffset(partitionOffsetsMap, 50L); + + assertEquals(4, result); + } + + @Test + void computeTopicLengthInOffsets_CalculatesCorrectly() { + Map> partitionOffsetsMap = new HashMap<>(); + partitionOffsetsMap.put(0, Arrays.asList(0L, 101L)); + partitionOffsetsMap.put(1, Arrays.asList(99L, 203L)); + + Double result = PartitionSplitters.computeTopicLengthInOffsets(partitionOffsetsMap); + + assertEquals(205D, result); + } + +} diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java new file mode 100644 index 0000000000..98aae45d29 --- /dev/null +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -0,0 +1,146 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import static com.linkedin.venice.kafka.protocol.enums.MessageType.PUT; +import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_BROKER_URL; +import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_TOPIC; +import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import 
com.linkedin.venice.kafka.protocol.ProducerMetadata; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.ImmutablePubSubMessage; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; +import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.storage.protocol.ChunkedKeySuffix; +import com.linkedin.venice.utils.VeniceProperties; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.spark.sql.catalyst.InternalRow; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +public class VenicePubsubInputPartitionReaderTest { + private static final String KAFKA_MESSAGE_KEY_PREFIX = "key_"; + private static final String KAFKA_MESSAGE_VALUE_PREFIX = "value_"; + private static final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + private VenicePubsubInputPartitionReader reader; + private VenicePubsubInputPartition inputPartition; + + @BeforeMethod + public void setUp() { + Properties jobConfig = new Properties(); + int startingOffset = 0; // starting offset other than 0 needs mocking of subscription ... + int endingOffset = 77; // total of 78 records + int targetPartitionNumber = 42; + + String topicName = "BigStrangePubSubTopic_V1_rt_r"; + + jobConfig.put(KAFKA_INPUT_BROKER_URL, "kafkaAddress"); + jobConfig.put(KAFKA_SOURCE_KEY_SCHEMA_STRING_PROP, ChunkedKeySuffix.SCHEMA$.toString()); + jobConfig.put(KAFKA_INPUT_TOPIC, topicName); + + PubSubConsumerAdapter consumer = mock(PubSubConsumerAdapter.class); + + long numRecords = endingOffset - startingOffset + 1; + List> consumerRecordList = new ArrayList<>(); + + PubSubTopicPartition pubSubTopicPartition = + new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicName), targetPartitionNumber); + + // fill the topic message array + for (int i = startingOffset; i < numRecords; ++i) { + byte[] keyBytes = (KAFKA_MESSAGE_KEY_PREFIX + i).getBytes(); + byte[] valueBytes = (KAFKA_MESSAGE_VALUE_PREFIX + i).getBytes(); + + KafkaKey kafkaKey = new KafkaKey(PUT, keyBytes); + KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope(); + messageEnvelope.producerMetadata = new ProducerMetadata(); + messageEnvelope.producerMetadata.messageTimestamp = 0; + messageEnvelope.producerMetadata.messageSequenceNumber = 0; + messageEnvelope.producerMetadata.segmentNumber = 0; + messageEnvelope.producerMetadata.producerGUID = new GUID(); + Put put = new Put(); + put.schemaId = 42; // shouldn't go with -1, -1 is reserved for chunking . 
+ put.putValue = ByteBuffer.wrap(valueBytes); + put.replicationMetadataPayload = ByteBuffer.allocate(0); + messageEnvelope.payloadUnion = put; + consumerRecordList.add(new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, i, -1, -1)); + } + + Map>> recordsMap = new HashMap<>(); + recordsMap.put(pubSubTopicPartition, consumerRecordList); + when(consumer.poll(anyLong())).thenReturn(recordsMap, new HashMap<>()); + + inputPartition = + new VenicePubsubInputPartition("prod-lva2", topicName, targetPartitionNumber, startingOffset, endingOffset); + + reader = new VenicePubsubInputPartitionReader( + new VeniceProperties(jobConfig), + inputPartition, + consumer, + pubSubTopicRepository); + } + + /* + * Test method for {@link com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartitionReader#next()}. + * Runs through the records once and tests if the next() works correctly for both presence and absence of recrods. + * Also ensures that the stats are recording the stats correctly. + */ + @Test + public void testNextAndGetStats() { + + System.out.println(); + assertTrue(reader.next()); // first record 0 + + for (int i = 0; i < 10; i++) { // 10 gets just to break the balance + reader.get(); + } + + for (int i = 1; i < 77; i++) { // 78 records expected + reader.get(); + assertTrue(reader.next()); + } + + // skipped is zero in these cases , no control message in data + // currentOffset, recordConvertedToRow, recordsSkipped, recordsDeliveredByGet + List stats = reader.getStats(); + // since the offset starts at 0 for this test, we expect the offset to be equal to sum of skipped and converted + assertEquals((long) stats.get(0), (long) stats.get(1) + stats.get(2)); + + assertEquals(Arrays.asList(77L, 77L, 0L, 86L), reader.getStats()); + + assertFalse(reader.next()); + assertEquals(Arrays.asList(77L, 77L, 0L, 86L), reader.getStats()); + reader.close(); + } + + @Test + public void testGet() { + for (int i = 0; i < 77; i++) { // 78 records expected + InternalRow row = reader.get(); + long offset = row.getLong(0); + byte[] key = row.getBinary(1); + byte[] value = row.getBinary(2); + + assertEquals(key, (KAFKA_MESSAGE_KEY_PREFIX + offset).getBytes()); + assertEquals(value, (KAFKA_MESSAGE_VALUE_PREFIX + offset).getBytes()); + } + } +} diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java new file mode 100644 index 0000000000..5f6e53a90a --- /dev/null +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionTest.java @@ -0,0 +1,39 @@ +package com.linkedin.venice.spark.input.pubsub.table; + +import static org.testng.Assert.*; + +import org.apache.commons.lang3.SerializationUtils; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + + +public class VenicePubsubInputPartitionTest { + VenicePubsubInputPartition targetObject; + + @BeforeTest + public void setUp() { + targetObject = + new VenicePubsubInputPartition("prod-lva2", "BigStrangePubSubTopic_V1_rt_r", 42, 49_152, 100_000_000); + } + + @Test + public void testSerializablity() { + + byte[] serialized = SerializationUtils.serialize(targetObject); + VenicePubsubInputPartition deserializedObject = SerializationUtils.deserialize(serialized); + + assertEquals(deserializedObject.getTopicName(), targetObject.getTopicName()); + 
assertEquals(deserializedObject.getPartitionNumber(), targetObject.getPartitionNumber()); + assertEquals(deserializedObject.getSegmentStartOffset(), targetObject.getSegmentStartOffset()); + assertEquals(deserializedObject.getSegmentEndOffset(), targetObject.getSegmentEndOffset()); + + // object should give back exactly what was put in. + assertEquals(deserializedObject.getRegion(), "prod-lva2"); + assertEquals(deserializedObject.getTopicName(), "BigStrangePubSubTopic_V1_rt_r"); + assertEquals(deserializedObject.getSegmentEndOffset(), 100_000_000); + assertEquals(deserializedObject.getSegmentStartOffset(), 49152); + assertEquals(deserializedObject.getPartitionNumber(), 42); + + } + +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java index 0424e185f0..b10cd0349d 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/hadoop/input/kafka/TestKafkaInputFormat.java @@ -58,7 +58,7 @@ public void cleanUp() throws IOException { pubSubBrokerWrapper.close(); } - public String getTopic(int numRecord, int numPartition) { + public String getNewTopicContainingRecords(int numRecord, int numPartition) { String topicName = Utils.getUniqueString("test_kafka_input_format") + "_v1"; manager.createTopic(pubSubTopicRepository.getTopic(topicName), numPartition, 1, true); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = @@ -122,7 +122,7 @@ public int compare(KafkaInputSplit o1, KafkaInputSplit o2) { @Test public void testGetSplits() throws IOException { KafkaInputFormat kafkaInputFormat = new KafkaInputFormat(); - String topic = getTopic(1000, 3); + String topic = getNewTopicContainingRecords(1000, 3); JobConf conf = new JobConf(); conf.set(KAFKA_INPUT_BROKER_URL, pubSubBrokerWrapper.getAddress()); conf.set(KAFKA_INPUT_TOPIC, topic);