Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1938] improvement: Use ShuffleSegment to replace FileBasedShuffleSegment and BufferSegment #1939

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleSegment;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
Expand All @@ -62,7 +62,7 @@ public class ShuffleReadClientImpl implements ShuffleReadClient {
private Roaring64NavigableMap taskIdBitmap;
private Roaring64NavigableMap pendingBlockIds;
private Roaring64NavigableMap processedBlockIds = Roaring64NavigableMap.bitmapOf();
private Queue<BufferSegment> bufferSegmentQueue = Queues.newLinkedBlockingQueue();
private Queue<ShuffleSegment> shuffleSegmentQueue = Queues.newLinkedBlockingQueue();
private AtomicLong readDataTime = new AtomicLong(0);
private AtomicLong copyTime = new AtomicLong(0);
private AtomicLong crcCheckTime = new AtomicLong(0);
Expand Down Expand Up @@ -212,19 +212,19 @@ public CompressedShuffleBlock readShuffleBlockData() {
}

// if client need request new data from shuffle server
if (bufferSegmentQueue.isEmpty()) {
if (shuffleSegmentQueue.isEmpty()) {
if (read() <= 0) {
return null;
}
}

// get next buffer segment
BufferSegment bs = null;
ShuffleSegment bs = null;

// blocks in shuffleSegmentQueue may come from a different partition in range partition mode,
// or from a speculation task; filter them out and read only the necessary blocks
while (true) {
bs = bufferSegmentQueue.poll();
bs = shuffleSegmentQueue.poll();
if (bs == null) {
break;
}
Expand Down Expand Up @@ -317,7 +317,7 @@ private int read() {
if (readBuffer == null || readBuffer.capacity() == 0) {
return 0;
}
bufferSegmentQueue.addAll(sdr.getBufferSegments());
shuffleSegmentQueue.addAll(sdr.getBufferSegments());
return sdr.getBufferSegments().size();
}

Expand Down
13 changes: 7 additions & 6 deletions client/src/test/java/org/apache/uniffle/client/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleSegment;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -65,19 +65,20 @@ public static void validateResult(ShuffleReadClient readClient, Map<Long, byte[]

public static void validateResult(Map<Long, byte[]> expectedData, ShuffleDataResult sdr) {
byte[] buffer = sdr.getData();
List<BufferSegment> bufferSegments = sdr.getBufferSegments();
assertEquals(expectedData.size(), bufferSegments.size());
List<ShuffleSegment> shuffleSegments = sdr.getBufferSegments();
assertEquals(expectedData.size(), shuffleSegments.size());
for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
BufferSegment bs = findBufferSegment(entry.getKey(), bufferSegments);
ShuffleSegment bs = findBufferSegment(entry.getKey(), shuffleSegments);
assertNotNull(bs);
byte[] data = new byte[bs.getLength()];
System.arraycopy(buffer, bs.getOffset(), data, 0, bs.getLength());
assertArrayEquals(entry.getValue(), data);
}
}

private static BufferSegment findBufferSegment(long blockId, List<BufferSegment> bufferSegments) {
for (BufferSegment bs : bufferSegments) {
private static ShuffleSegment findBufferSegment(
long blockId, List<ShuffleSegment> shuffleSegments) {
for (ShuffleSegment bs : shuffleSegments) {
if (bs.getBlockId() == blockId) {
return bs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class ShuffleDataResult {

private final ManagedBuffer buffer;
private final List<BufferSegment> bufferSegments;
private final List<ShuffleSegment> shuffleSegments;

public ShuffleDataResult() {
this(new byte[0]);
Expand All @@ -43,26 +43,26 @@ public ShuffleDataResult(byte[] data) {

public ShuffleDataResult(ManagedBuffer buffer) {
this.buffer = buffer;
this.bufferSegments = Lists.newArrayList();
this.shuffleSegments = Lists.newArrayList();
}

public ShuffleDataResult(ByteBuffer data, List<BufferSegment> bufferSegments) {
public ShuffleDataResult(ByteBuffer data, List<ShuffleSegment> shuffleSegments) {
this.buffer =
new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER);
this.bufferSegments = bufferSegments;
this.shuffleSegments = shuffleSegments;
}

public ShuffleDataResult(ByteBuf data, List<BufferSegment> bufferSegments) {
this(new NettyManagedBuffer(data), bufferSegments);
public ShuffleDataResult(ByteBuf data, List<ShuffleSegment> shuffleSegments) {
this(new NettyManagedBuffer(data), shuffleSegments);
}

public ShuffleDataResult(byte[] data, List<BufferSegment> bufferSegments) {
this(data != null ? ByteBuffer.wrap(data) : null, bufferSegments);
public ShuffleDataResult(byte[] data, List<ShuffleSegment> shuffleSegments) {
this(data != null ? ByteBuffer.wrap(data) : null, shuffleSegments);
}

public ShuffleDataResult(ManagedBuffer data, List<BufferSegment> bufferSegments) {
public ShuffleDataResult(ManagedBuffer data, List<ShuffleSegment> shuffleSegments) {
this.buffer = data;
this.bufferSegments = bufferSegments;
this.shuffleSegments = shuffleSegments;
}

public byte[] getData() {
Expand Down Expand Up @@ -94,13 +94,13 @@ public ManagedBuffer getManagedBuffer() {
return buffer;
}

public List<BufferSegment> getBufferSegments() {
return bufferSegments;
public List<ShuffleSegment> getBufferSegments() {
return shuffleSegments;
}

public boolean isEmpty() {
return bufferSegments == null
|| bufferSegments.isEmpty()
return shuffleSegments == null
|| shuffleSegments.isEmpty()
|| buffer == null
|| buffer.size() == 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
public class ShuffleDataSegment {
private final long offset;
private final int length;
private final List<BufferSegment> bufferSegments;
private final List<ShuffleSegment> shuffleSegments;

public ShuffleDataSegment(long offset, int length, List<BufferSegment> bufferSegments) {
public ShuffleDataSegment(long offset, int length, List<ShuffleSegment> shuffleSegments) {
this.offset = offset;
this.length = length;
this.bufferSegments = bufferSegments;
this.shuffleSegments = shuffleSegments;
}

public long getOffset() {
Expand All @@ -43,7 +43,7 @@ public int getLength() {
return length;
}

public List<BufferSegment> getBufferSegments() {
return bufferSegments;
public List<ShuffleSegment> getBufferSegments() {
return shuffleSegments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import org.apache.uniffle.common.exception.RssException;

public class BufferSegment {
public class ShuffleSegment implements java.io.Serializable, Comparable<ShuffleSegment> {

public static final int SEGMENT_SIZE = 4 * Long.BYTES + 2 * Integer.BYTES;

private long blockId;
private long offset;
Expand All @@ -30,7 +32,7 @@ public class BufferSegment {
private long crc;
private long taskAttemptId;

public BufferSegment(
public ShuffleSegment(
long blockId, long offset, int length, int uncompressLength, long crc, long taskAttemptId) {
this.blockId = blockId;
this.offset = offset;
Expand All @@ -42,13 +44,13 @@ public BufferSegment(

@Override
public boolean equals(Object obj) {
if (obj instanceof BufferSegment) {
return blockId == ((BufferSegment) obj).getBlockId()
&& offset == ((BufferSegment) obj).getOffset()
&& length == ((BufferSegment) obj).getLength()
&& uncompressLength == ((BufferSegment) obj).getUncompressLength()
&& crc == ((BufferSegment) obj).getCrc()
&& taskAttemptId == ((BufferSegment) obj).getTaskAttemptId();
if (obj instanceof ShuffleSegment) {
return blockId == ((ShuffleSegment) obj).getBlockId()
&& offset == ((ShuffleSegment) obj).getOffset()
&& length == ((ShuffleSegment) obj).getLength()
&& uncompressLength == ((ShuffleSegment) obj).getUncompressLength()
&& crc == ((ShuffleSegment) obj).getCrc()
&& taskAttemptId == ((ShuffleSegment) obj).getTaskAttemptId();
}
return false;
}
Expand Down Expand Up @@ -101,4 +103,14 @@ public int getUncompressLength() {
public long getTaskAttemptId() {
return taskAttemptId;
}

/**
 * Orders segments by ascending offset.
 *
 * <p>Only the offset takes part in the ordering, whereas {@link #equals(Object)} also compares
 * blockId, length, uncompressLength, crc and taskAttemptId. Consequently {@code compareTo(s) == 0}
 * does not imply {@code equals(s)}; keep this in mind before using segments in sorted sets/maps.
 *
 * @param s the segment to compare against
 * @return a negative value, zero, or a positive value when this segment's offset is respectively
 *     less than, equal to, or greater than the offset of {@code s}
 */
@Override
public int compareTo(ShuffleSegment s) {
  // Long.compare is the standard, overflow-safe three-way comparison for long values.
  return Long.compare(this.offset, s.getOffset());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleSegment;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.NettyUtils;
Expand Down Expand Up @@ -84,8 +84,8 @@ public static Map<Integer, List<Long>> decodePartitionToBlockIds(ByteBuf byteBuf
return partitionToBlockIds;
}

public static List<BufferSegment> decodeBufferSegments(ByteBuf byteBuf) {
List<BufferSegment> bufferSegments = Lists.newArrayList();
public static List<ShuffleSegment> decodeBufferSegments(ByteBuf byteBuf) {
List<ShuffleSegment> shuffleSegments = Lists.newArrayList();
int size = byteBuf.readInt();
for (int i = 0; i < size; i++) {
long blockId = byteBuf.readLong();
Expand All @@ -94,10 +94,10 @@ public static List<BufferSegment> decodeBufferSegments(ByteBuf byteBuf) {
int uncompressLength = byteBuf.readInt();
long crc = byteBuf.readLong();
long taskAttemptId = byteBuf.readLong();
BufferSegment bufferSegment =
new BufferSegment(blockId, offset, length, uncompressLength, crc, taskAttemptId);
bufferSegments.add(bufferSegment);
ShuffleSegment shuffleSegment =
new ShuffleSegment(blockId, offset, length, uncompressLength, crc, taskAttemptId);
shuffleSegments.add(shuffleSegment);
}
return bufferSegments;
return shuffleSegments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import io.netty.buffer.ByteBuf;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleSegment;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.ByteBufUtils;

Expand Down Expand Up @@ -80,19 +80,19 @@ public static void encodePartitionRanges(List<PartitionRange> partitionRanges, B
}
}

public static void encodeBufferSegments(List<BufferSegment> bufferSegments, ByteBuf byteBuf) {
byteBuf.writeInt(bufferSegments.size());
for (BufferSegment bufferSegment : bufferSegments) {
byteBuf.writeLong(bufferSegment.getBlockId());
byteBuf.writeInt(bufferSegment.getOffset());
byteBuf.writeInt(bufferSegment.getLength());
byteBuf.writeInt(bufferSegment.getUncompressLength());
byteBuf.writeLong(bufferSegment.getCrc());
byteBuf.writeLong(bufferSegment.getTaskAttemptId());
public static void encodeBufferSegments(List<ShuffleSegment> shuffleSegments, ByteBuf byteBuf) {
byteBuf.writeInt(shuffleSegments.size());
for (ShuffleSegment shuffleSegment : shuffleSegments) {
byteBuf.writeLong(shuffleSegment.getBlockId());
byteBuf.writeInt(shuffleSegment.getOffset());
byteBuf.writeInt(shuffleSegment.getLength());
byteBuf.writeInt(shuffleSegment.getUncompressLength());
byteBuf.writeLong(shuffleSegment.getCrc());
byteBuf.writeLong(shuffleSegment.getTaskAttemptId());
}
}

public static int encodeLengthOfBufferSegments(List<BufferSegment> bufferSegments) {
return Integer.BYTES + bufferSegments.size() * (3 * Long.BYTES + 3 * Integer.BYTES);
public static int encodeLengthOfBufferSegments(List<ShuffleSegment> shuffleSegments) {
return Integer.BYTES + shuffleSegments.size() * (3 * Long.BYTES + 3 * Integer.BYTES);
}
}
Loading
Loading