Skip to content

Commit

Permalink
HDDS-12007. BlockDataStreamOutput should only send one PutBlock durin…
Browse files Browse the repository at this point in the history
…g close. (#7645)
  • Loading branch information
szetszwo authored Jan 9, 2025
1 parent e21e724 commit f1f0ec3
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,10 @@ public void executePutBlock(boolean close,
waitFuturesComplete();
final BlockData blockData = containerBlockData.build();
if (close) {
// HDDS-12007 changed datanodes to ignore the following PutBlock request.
// However, clients still have to send it for maintaining compatibility.
// Otherwise, new clients won't send a PutBlock.
// Then, old datanodes will fail since they expect a PutBlock.
final ContainerCommandRequestProto putBlockRequest
= ContainerProtocolCalls.getPutBlockRequest(
xceiverClient.getPipeline(), blockData, true, tokenString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,21 +524,6 @@ private ContainerCommandResponseProto dispatchCommand(
return response;
}

private CompletableFuture<ContainerCommandResponseProto> link(
ContainerCommandRequestProto requestProto, LogEntryProto entry) {
return CompletableFuture.supplyAsync(() -> {
final DispatcherContext context = DispatcherContext
.newBuilder(DispatcherContext.Op.STREAM_LINK)
.setTerm(entry.getTerm())
.setLogIndex(entry.getIndex())
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();

return dispatchCommand(requestProto, context);
}, executor);
}

private CompletableFuture<Message> writeStateMachineData(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
Expand Down Expand Up @@ -689,29 +674,8 @@ public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {

final KeyValueStreamDataChannel kvStreamDataChannel =
(KeyValueStreamDataChannel) dataChannel;

final ContainerCommandRequestProto request =
kvStreamDataChannel.getPutBlockRequest();

return link(request, entry).whenComplete((response, e) -> {
if (e != null) {
LOG.warn("Failed to link logEntry {} for request {}",
TermIndex.valueOf(entry), request, e);
}
if (response != null) {
final ContainerProtos.Result result = response.getResult();
if (LOG.isDebugEnabled()) {
LOG.debug("{} to link logEntry {} for request {}, response: {}",
result, TermIndex.valueOf(entry), request, response);
}
if (result == ContainerProtos.Result.SUCCESS) {
kvStreamDataChannel.setLinked();
return;
}
}
// failed to link, cleanup
kvStreamDataChannel.cleanUp();
});
kvStreamDataChannel.setLinked();
return CompletableFuture.completedFuture(null);
}

private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
Expand All @@ -36,9 +33,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* This class is used to get the DataChannel for streaming.
Expand All @@ -53,8 +48,6 @@ interface WriteMethod {

private final Buffers buffers = new Buffers(
BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX);
private final AtomicReference<ContainerCommandRequestProto> putBlockRequest
= new AtomicReference<>();
private final AtomicBoolean closed = new AtomicBoolean();

KeyValueStreamDataChannel(File file, ContainerData containerData,
Expand Down Expand Up @@ -90,7 +83,7 @@ static int writeBuffers(ReferenceCountedObject<ByteBuffer> src,
return src.get().remaining();
}

private static void writeFully(ByteBuffer b, WriteMethod writeMethod)
static void writeFully(ByteBuffer b, WriteMethod writeMethod)
throws IOException {
while (b.remaining() > 0) {
final int written = writeMethod.applyAsInt(b);
Expand All @@ -100,11 +93,6 @@ private static void writeFully(ByteBuffer b, WriteMethod writeMethod)
}
}

public ContainerCommandRequestProto getPutBlockRequest() {
return Objects.requireNonNull(putBlockRequest.get(),
() -> "putBlockRequest == null, " + this);
}

void assertOpen() throws IOException {
if (closed.get()) {
throw new IOException("Already closed: " + this);
Expand All @@ -115,7 +103,7 @@ void assertOpen() throws IOException {
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel));
writeBuffers();
} finally {
super.close();
}
Expand All @@ -130,22 +118,23 @@ protected void cleanupInternal() throws IOException {
}
}

static ContainerCommandRequestProto closeBuffers(
Buffers buffers, WriteMethod writeMethod) throws IOException {
/**
* Write the data in {@link #buffers} to the channel.
* Note that the PutBlock proto at the end is ignored; see HDDS-12007.
*/
private void writeBuffers() throws IOException {
final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll();
final ByteBuf buf = ref.retain();
final ContainerCommandRequestProto putBlockRequest;
try {
putBlockRequest = readPutBlockRequest(buf);
setEndIndex(buf);
// write the remaining data
writeFully(buf.nioBuffer(), writeMethod);
writeFully(buf.nioBuffer(), super::writeFileChannel);
} finally {
ref.release();
}
return putBlockRequest;
}

private static int readProtoLength(ByteBuf b, int lengthIndex) {
static int readProtoLength(ByteBuf b, int lengthIndex) {
final int readerIndex = b.readerIndex();
LOG.debug("{}, lengthIndex = {}, readerIndex = {}",
b, lengthIndex, readerIndex);
Expand All @@ -158,8 +147,8 @@ private static int readProtoLength(ByteBuf b, int lengthIndex) {
return b.nioBuffer().getInt();
}

static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b)
throws IOException {
/** Set end index to the proto index in order to ignore the proto. */
static void setEndIndex(ByteBuf b) {
// readerIndex protoIndex lengthIndex readerIndex+readableBytes
// V V V V
// format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
Expand All @@ -168,37 +157,7 @@ static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b)
final int protoLength = readProtoLength(b.duplicate(), lengthIndex);
final int protoIndex = lengthIndex - protoLength;

final ContainerCommandRequestProto proto;
try {
proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer());
} catch (Throwable t) {
RatisHelper.debug(b, "catch", LOG);
throw new IOException("Failed to readPutBlockRequest from " + b
+ ": readerIndex=" + readerIndex
+ ", protoIndex=" + protoIndex
+ ", protoLength=" + protoLength
+ ", lengthIndex=" + lengthIndex, t);
}

// set index for reading data
b.writerIndex(protoIndex);

return proto;
}

private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b)
throws IOException {
RatisHelper.debug(b, "readPutBlockRequest", LOG);
final ByteString byteString = ByteString.copyFrom(b);

final ContainerCommandRequestProto request =
ContainerCommandRequestMessage.toProto(byteString, null);

if (!request.hasPutBlock()) {
throw new StorageContainerException(
"Malformed PutBlock request. trace ID: " + request.getTraceID(),
ContainerProtos.Result.MALFORMED_REQUEST);
}
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
Expand Down Expand Up @@ -58,9 +62,8 @@
import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX;
import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.executePutBlockClose;
import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.getProtoLength;
import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.closeBuffers;
import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.readPutBlockRequest;
import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeBuffers;
import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeFully;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -106,6 +109,49 @@ public void testSerialization() throws Exception {
assertEquals(PUT_BLOCK_PROTO, proto);
}

static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b) throws IOException {
// readerIndex protoIndex lengthIndex readerIndex+readableBytes
// V V V V
// format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---|
final int readerIndex = b.readerIndex();
final int lengthIndex = readerIndex + b.readableBytes() - 4;
final int protoLength = KeyValueStreamDataChannel.readProtoLength(b.duplicate(), lengthIndex);
final int protoIndex = lengthIndex - protoLength;

final ContainerCommandRequestProto proto;
try {
proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer());
} catch (Throwable t) {
RatisHelper.debug(b, "catch", LOG);
throw new IOException("Failed to readPutBlockRequest from " + b
+ ": readerIndex=" + readerIndex
+ ", protoIndex=" + protoIndex
+ ", protoLength=" + protoLength
+ ", lengthIndex=" + lengthIndex, t);
}

// set index for reading data
b.writerIndex(protoIndex);

return proto;
}

private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b)
throws IOException {
RatisHelper.debug(b, "readPutBlockRequest", LOG);
final ByteString byteString = ByteString.copyFrom(b);

final ContainerCommandRequestProto request =
ContainerCommandRequestMessage.toProto(byteString, null);

if (!request.hasPutBlock()) {
throw new StorageContainerException(
"Malformed PutBlock request. trace ID: " + request.getTraceID(),
Result.MALFORMED_REQUEST);
}
return request;
}

@Test
public void testBuffers() throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(32);
Expand Down Expand Up @@ -230,6 +276,21 @@ public CompletableFuture<DataStreamReply> closeAsync() {
new Reply(true, 0, putBlockRequest));
}

static ContainerCommandRequestProto closeBuffers(
Buffers buffers, WriteMethod writeMethod) throws IOException {
final ReferenceCountedObject<ByteBuf> ref = buffers.pollAll();
final ByteBuf buf = ref.retain();
final ContainerCommandRequestProto putBlockRequest;
try {
putBlockRequest = readPutBlockRequest(buf);
// write the remaining data
writeFully(buf.nioBuffer(), writeMethod);
} finally {
ref.release();
}
return putBlockRequest;
}

@Override
public CompletableFuture<DataStreamReply> writeAsync(
FilePositionCount filePositionCount, WriteOption... writeOptions) {
Expand Down

0 comments on commit f1f0ec3

Please sign in to comment.