From 7d989f2cf22f2da4c7006008186ecf4e9f3158bc Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sat, 4 Jan 2025 12:05:07 -0800 Subject: [PATCH 1/2] HDDS-12007. BlockDataStreamOutput should only send one PutBlock during close. --- .../server/ratis/ContainerStateMachine.java | 25 ++----------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 23be4138b60..1cb638cfb6d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -689,29 +689,8 @@ public CompletableFuture link(DataStream stream, LogEntryProto entry) { final KeyValueStreamDataChannel kvStreamDataChannel = (KeyValueStreamDataChannel) dataChannel; - - final ContainerCommandRequestProto request = - kvStreamDataChannel.getPutBlockRequest(); - - return link(request, entry).whenComplete((response, e) -> { - if (e != null) { - LOG.warn("Failed to link logEntry {} for request {}", - TermIndex.valueOf(entry), request, e); - } - if (response != null) { - final ContainerProtos.Result result = response.getResult(); - if (LOG.isDebugEnabled()) { - LOG.debug("{} to link logEntry {} for request {}, response: {}", - result, TermIndex.valueOf(entry), request, response); - } - if (result == ContainerProtos.Result.SUCCESS) { - kvStreamDataChannel.setLinked(); - return; - } - } - // failed to link, cleanup - kvStreamDataChannel.cleanUp(); - }); + kvStreamDataChannel.setLinked(); + return CompletableFuture.completedFuture(null); } private ExecutorService getChunkExecutor(WriteChunkRequestProto req) { From 6807d70ec5a2f320638e2ce35593db8291b177ad Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Wed, 8 Jan 2025 11:48:01 -0800 Subject: [PATCH 2/2] Remove KeyValueStreamDataChannel.putBlockRequest and other unused code. --- .../scm/storage/BlockDataStreamOutput.java | 4 ++ .../server/ratis/ContainerStateMachine.java | 15 ----- .../impl/KeyValueStreamDataChannel.java | 65 ++++--------------- .../impl/TestKeyValueStreamDataChannel.java | 65 ++++++++++++++++++- 4 files changed, 79 insertions(+), 70 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 8c2883a4374..342fcaba9af 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -410,6 +410,10 @@ public void executePutBlock(boolean close, waitFuturesComplete(); final BlockData blockData = containerBlockData.build(); if (close) { + // HDDS-12007 changed datanodes to ignore the following PutBlock request. + // However, clients still have to send it for maintaining compatibility. + // Otherwise, new clients won't send a PutBlock. + // Then, old datanodes will fail since they expect a PutBlock. final ContainerCommandRequestProto putBlockRequest = ContainerProtocolCalls.getPutBlockRequest( xceiverClient.getPipeline(), blockData, true, tokenString); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 1cb638cfb6d..a0325311621 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -524,21 +524,6 @@ private ContainerCommandResponseProto dispatchCommand( return response; } - private CompletableFuture link( - ContainerCommandRequestProto requestProto, LogEntryProto entry) { - return CompletableFuture.supplyAsync(() -> { - final DispatcherContext context = DispatcherContext - .newBuilder(DispatcherContext.Op.STREAM_LINK) - .setTerm(entry.getTerm()) - .setLogIndex(entry.getIndex()) - .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA) - .setContainer2BCSIDMap(container2BCSIDMap) - .build(); - - return dispatchCommand(requestProto, context); - }, executor); - } - private CompletableFuture writeStateMachineData( ContainerCommandRequestProto requestProto, long entryIndex, long term, long startTime) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index 7500860229d..52838aff2e2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -20,14 +20,11 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; -import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; import org.apache.ratis.util.ReferenceCountedObject; import org.slf4j.Logger; @@ -36,9 +33,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** * This class is used to get the DataChannel for streaming. @@ -53,8 +48,6 @@ interface WriteMethod { private final Buffers buffers = new Buffers( BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX); - private final AtomicReference putBlockRequest - = new AtomicReference<>(); private final AtomicBoolean closed = new AtomicBoolean(); KeyValueStreamDataChannel(File file, ContainerData containerData, @@ -90,7 +83,7 @@ static int writeBuffers(ReferenceCountedObject src, return src.get().remaining(); } - private static void writeFully(ByteBuffer b, WriteMethod writeMethod) + static void writeFully(ByteBuffer b, WriteMethod writeMethod) throws IOException { while (b.remaining() > 0) { final int written = writeMethod.applyAsInt(b); @@ -100,11 +93,6 @@ private static void writeFully(ByteBuffer b, WriteMethod writeMethod) } } - public ContainerCommandRequestProto getPutBlockRequest() { - return Objects.requireNonNull(putBlockRequest.get(), - () -> "putBlockRequest == null, " + this); - } - void assertOpen() throws IOException { if (closed.get()) { throw new IOException("Already closed: " + this); @@ -115,7 +103,7 @@ void assertOpen() throws IOException { public void close() throws IOException { if (closed.compareAndSet(false, true)) { try { - putBlockRequest.set(closeBuffers(buffers, super::writeFileChannel)); + writeBuffers(); } finally { super.close(); } @@ -130,22 +118,23 @@ protected void cleanupInternal() throws IOException { } } - static ContainerCommandRequestProto closeBuffers( - Buffers buffers, WriteMethod writeMethod) throws IOException { + /** + * Write the data in {@link #buffers} to the channel. + * Note that the PutBlock proto at the end is ignored; see HDDS-12007. + */ + private void writeBuffers() throws IOException { final ReferenceCountedObject ref = buffers.pollAll(); final ByteBuf buf = ref.retain(); - final ContainerCommandRequestProto putBlockRequest; try { - putBlockRequest = readPutBlockRequest(buf); + setEndIndex(buf); // write the remaining data - writeFully(buf.nioBuffer(), writeMethod); + writeFully(buf.nioBuffer(), super::writeFileChannel); } finally { ref.release(); } - return putBlockRequest; } - private static int readProtoLength(ByteBuf b, int lengthIndex) { + static int readProtoLength(ByteBuf b, int lengthIndex) { final int readerIndex = b.readerIndex(); LOG.debug("{}, lengthIndex = {}, readerIndex = {}", b, lengthIndex, readerIndex); @@ -158,8 +147,8 @@ private static int readProtoLength(ByteBuf b, int lengthIndex) { return b.nioBuffer().getInt(); } - static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b) - throws IOException { + /** Set end index to the proto index in order to ignore the proto. */ + static void setEndIndex(ByteBuf b) { // readerIndex protoIndex lengthIndex readerIndex+readableBytes // V V V V // format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---| @@ -168,37 +157,7 @@ static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b) final int protoLength = readProtoLength(b.duplicate(), lengthIndex); final int protoIndex = lengthIndex - protoLength; - final ContainerCommandRequestProto proto; - try { - proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer()); - } catch (Throwable t) { - RatisHelper.debug(b, "catch", LOG); - throw new IOException("Failed to readPutBlockRequest from " + b - + ": readerIndex=" + readerIndex - + ", protoIndex=" + protoIndex - + ", protoLength=" + protoLength - + ", lengthIndex=" + lengthIndex, t); - } - // set index for reading data b.writerIndex(protoIndex); - - return proto; - } - - private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b) - throws IOException { - RatisHelper.debug(b, "readPutBlockRequest", LOG); - final ByteString byteString = ByteString.copyFrom(b); - - final ContainerCommandRequestProto request = - ContainerCommandRequestMessage.toProto(byteString, null); - - if (!request.hasPutBlock()) { - throw new StorageContainerException( - "Malformed PutBlock request. trace ID: " + request.getTraceID(), - ContainerProtos.Result.MALFORMED_REQUEST); - } - return request; } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java index e6067e5c560..99793a0201f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java @@ -22,10 +22,14 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; +import org.apache.hadoop.hdds.ratis.RatisHelper; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.client.api.DataStreamOutput; import org.apache.ratis.io.FilePositionCount; import org.apache.ratis.io.StandardWriteOption; @@ -58,9 +62,8 @@ import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.PUT_BLOCK_REQUEST_LENGTH_MAX; import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.executePutBlockClose; import static org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput.getProtoLength; -import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.closeBuffers; -import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.readPutBlockRequest; import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeBuffers; +import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeFully; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -106,6 +109,49 @@ public void testSerialization() throws Exception { assertEquals(PUT_BLOCK_PROTO, proto); } + static ContainerCommandRequestProto readPutBlockRequest(ByteBuf b) throws IOException { + // readerIndex protoIndex lengthIndex readerIndex+readableBytes + // V V V V + // format: |--- data ---|--- proto ---|--- proto length (4 bytes) ---| + final int readerIndex = b.readerIndex(); + final int lengthIndex = readerIndex + b.readableBytes() - 4; + final int protoLength = KeyValueStreamDataChannel.readProtoLength(b.duplicate(), lengthIndex); + final int protoIndex = lengthIndex - protoLength; + + final ContainerCommandRequestProto proto; + try { + proto = readPutBlockRequest(b.slice(protoIndex, protoLength).nioBuffer()); + } catch (Throwable t) { + RatisHelper.debug(b, "catch", LOG); + throw new IOException("Failed to readPutBlockRequest from " + b + + ": readerIndex=" + readerIndex + + ", protoIndex=" + protoIndex + + ", protoLength=" + protoLength + + ", lengthIndex=" + lengthIndex, t); + } + + // set index for reading data + b.writerIndex(protoIndex); + + return proto; + } + + private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b) + throws IOException { + RatisHelper.debug(b, "readPutBlockRequest", LOG); + final ByteString byteString = ByteString.copyFrom(b); + + final ContainerCommandRequestProto request = + ContainerCommandRequestMessage.toProto(byteString, null); + + if (!request.hasPutBlock()) { + throw new StorageContainerException( + "Malformed PutBlock request. trace ID: " + request.getTraceID(), + Result.MALFORMED_REQUEST); + } + return request; + } + @Test public void testBuffers() throws Exception { final ExecutorService executor = Executors.newFixedThreadPool(32); @@ -230,6 +276,21 @@ public CompletableFuture closeAsync() { new Reply(true, 0, putBlockRequest)); } + static ContainerCommandRequestProto closeBuffers( + Buffers buffers, WriteMethod writeMethod) throws IOException { + final ReferenceCountedObject ref = buffers.pollAll(); + final ByteBuf buf = ref.retain(); + final ContainerCommandRequestProto putBlockRequest; + try { + putBlockRequest = readPutBlockRequest(buf); + // write the remaining data + writeFully(buf.nioBuffer(), writeMethod); + } finally { + ref.release(); + } + return putBlockRequest; + } + @Override public CompletableFuture writeAsync( FilePositionCount filePositionCount, WriteOption... writeOptions) {