Skip to content

Commit

Permalink
HDDS-9734. ChunkInputStream should use new token after pipeline refre…
Browse files Browse the repository at this point in the history
…sh (apache#5664)
  • Loading branch information
adoroszlai authored Nov 23, 2023
1 parent a17a93b commit 60bb060
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
Expand All @@ -52,6 +52,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;

/**
* An {@link InputStream} called from KeyInputStream to read a block from the
* container.
Expand All @@ -65,8 +67,10 @@ public class BlockInputStream extends BlockExtendedInputStream {

private final BlockID blockID;
private final long length;
private Pipeline pipeline;
private Token<OzoneBlockTokenIdentifier> token;
private final AtomicReference<Pipeline> pipelineRef =
new AtomicReference<>();
private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
new AtomicReference<>();
private final boolean verifyChecksum;
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
Expand Down Expand Up @@ -113,8 +117,8 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Function<BlockID, BlockLocationInfo> refreshFunction) {
this.blockID = blockId;
this.length = blockLen;
this.pipeline = pipeline;
this.token = token;
setPipeline(pipeline);
tokenRef.set(token);
this.verifyChecksum = verifyChecksum;
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
Expand Down Expand Up @@ -143,7 +147,7 @@ public synchronized void initialize() throws IOException {
IOException catchEx = null;
do {
try {
chunks = getChunkInfos();
chunks = getChunkInfoList();
break;
// If we get a StorageContainerException or an IOException due to
// datanodes are not reachable, refresh to get the latest pipeline
Expand Down Expand Up @@ -203,7 +207,7 @@ private boolean isConnectivityIssue(IOException ex) {

private void refreshBlockInfo(IOException cause) throws IOException {
LOG.info("Unable to read information for block {} from pipeline {}: {}",
blockID, pipeline.getId(), cause.getMessage());
blockID, pipelineRef.get().getId(), cause.getMessage());
if (refreshFunction != null) {
LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
Expand All @@ -212,8 +216,8 @@ private void refreshBlockInfo(IOException cause) throws IOException {
} else {
LOG.debug("New pipeline for block {}: {}", blockID,
blockLocationInfo.getPipeline());
this.pipeline = blockLocationInfo.getPipeline();
this.token = blockLocationInfo.getToken();
setPipeline(blockLocationInfo.getPipeline());
tokenRef.set(blockLocationInfo.getToken());
}
} else {
throw cause;
Expand All @@ -224,46 +228,55 @@ private void refreshBlockInfo(IOException cause) throws IOException {
* Send RPC call to get the block info from the container.
* @return List of chunks in this block.
*/
protected List<ChunkInfo> getChunkInfos() throws IOException {
// irrespective of the container state, we will always read via Standalone
// protocol.
if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE && pipeline
.getType() != HddsProtos.ReplicationType.EC) {
pipeline = Pipeline.newBuilder(pipeline)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
ReplicationConfig
.getLegacyFactor(pipeline.getReplicationConfig())))
.build();
}
protected List<ChunkInfo> getChunkInfoList() throws IOException {
acquireClient();
try {
acquireClient();
} catch (IOException ioe) {
LOG.warn("Failed to acquire client for pipeline {}, block {}",
pipeline, blockID);
throw ioe;
return getChunkInfoListUsingClient();
} finally {
releaseClient();
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing BlockInputStream for get key to access {}",
blockID.getContainerID());
}
}

DatanodeBlockID.Builder blkIDBuilder =
DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
.setLocalID(blockID.getLocalID())
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
@VisibleForTesting
protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
final Pipeline pipeline = xceiverClient.getPipeline();

int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
if (replicaIndex > 0) {
blkIDBuilder.setReplicaIndex(replicaIndex);
}
GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClient, VALIDATORS, blkIDBuilder.build(), token);
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing BlockInputStream for get key to access {}",
blockID.getContainerID());
}

return response.getBlockData().getChunksList();
} finally {
releaseClient();
DatanodeBlockID.Builder blkIDBuilder =
DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
.setLocalID(blockID.getLocalID())
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());

int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
if (replicaIndex > 0) {
blkIDBuilder.setReplicaIndex(replicaIndex);
}

GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());

return response.getBlockData().getChunksList();
}

private void setPipeline(Pipeline pipeline) {
if (pipeline == null) {
return;
}

// irrespective of the container state, we will always read via Standalone
// protocol.
boolean okForRead =
pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
|| pipeline.getType() == HddsProtos.ReplicationType.EC;
Pipeline readPipeline = okForRead ? pipeline : Pipeline.newBuilder(pipeline)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
getLegacyFactor(pipeline.getReplicationConfig())))
.build();
pipelineRef.set(readPipeline);
}

private static final List<Validator> VALIDATORS
Expand All @@ -286,9 +299,16 @@ private static void validate(ContainerCommandResponseProto response)
}
}

protected void acquireClient() throws IOException {
private void acquireClient() throws IOException {
if (xceiverClientFactory != null && xceiverClient == null) {
xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
final Pipeline pipeline = pipelineRef.get();
try {
xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
} catch (IOException ioe) {
LOG.warn("Failed to acquire client for pipeline {}, block {}",
pipeline, blockID);
throw ioe;
}
}
}

Expand All @@ -303,7 +323,7 @@ protected synchronized void addStream(ChunkInfo chunkInfo) {

protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return new ChunkInputStream(chunkInfo, blockID,
xceiverClientFactory, () -> pipeline, verifyChecksum, token);
xceiverClientFactory, pipelineRef::get, verifyChecksum, tokenRef::get);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
Expand All @@ -37,7 +39,6 @@
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.EOFException;
Expand All @@ -48,9 +49,6 @@
import java.util.List;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An {@link InputStream} called from BlockInputStream to read a chunk from the
* container. Each chunk may contain multiple underlying {@link ByteBuffer}
Expand All @@ -59,16 +57,13 @@
public class ChunkInputStream extends InputStream
implements Seekable, CanUnbuffer, ByteBufferReadable {

private static final Logger LOG =
LoggerFactory.getLogger(ChunkInputStream.class);

private ChunkInfo chunkInfo;
private final ChunkInfo chunkInfo;
private final long length;
private final BlockID blockID;
private final XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private final Supplier<Pipeline> pipelineSupplier;
private boolean verifyChecksum;
private final boolean verifyChecksum;
private boolean allocated = false;
// Buffers to store the chunk data read from the DN container
private ByteBuffer[] buffers;
Expand Down Expand Up @@ -100,21 +95,24 @@ public class ChunkInputStream extends InputStream
// retry. Once the chunk is read, this variable is reset.
private long chunkPosition = -1;

private final Token<? extends TokenIdentifier> token;
private final Supplier<Token<?>> tokenSupplier;

private static final int EOF = -1;
private final List<Validator> validators;

ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
XceiverClientFactory xceiverClientFactory,
Supplier<Pipeline> pipelineSupplier,
boolean verifyChecksum, Token<? extends TokenIdentifier> token) {
boolean verifyChecksum,
Supplier<Token<?>> tokenSupplier) {
this.chunkInfo = chunkInfo;
this.length = chunkInfo.getLen();
this.blockID = blockId;
this.xceiverClientFactory = xceiverClientFactory;
this.pipelineSupplier = pipelineSupplier;
this.verifyChecksum = verifyChecksum;
this.token = token;
this.tokenSupplier = tokenSupplier;
validators = ContainerProtocolCalls.toValidatorList(this::validateChunk);
}

public synchronized long getRemaining() {
Expand Down Expand Up @@ -422,13 +420,10 @@ private void readChunkDataIntoBuffers(ChunkInfo readChunkInfo)
@VisibleForTesting
protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
throws IOException {
ReadChunkResponseProto readChunkResponse;

List<Validator> validators =
ContainerProtocolCalls.toValidatorList(validator);

readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, token);
ReadChunkResponseProto readChunkResponse =
ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, tokenSupplier.get());

if (readChunkResponse.hasData()) {
return readChunkResponse.getData().asReadOnlyByteBufferList()
Expand All @@ -443,55 +438,57 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
}
}

private final Validator validator =
(request, response) -> {
final ChunkInfo reqChunkInfo =
request.getReadChunk().getChunkData();

ReadChunkResponseProto readChunkResponse = response.getReadChunk();
List<ByteString> byteStrings;
boolean isV0 = false;

if (readChunkResponse.hasData()) {
ByteString byteString = readChunkResponse.getData();
if (byteString.size() != reqChunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new OzoneChecksumException(String.format(
"Inconsistent read for chunk=%s len=%d bytesRead=%d",
reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
byteString.size()));
}
byteStrings = new ArrayList<>();
byteStrings.add(byteString);
isV0 = true;
} else {
byteStrings = readChunkResponse.getDataBuffers().getBuffersList();
long buffersLen = BufferUtils.getBuffersLen(byteStrings);
if (buffersLen != reqChunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new OzoneChecksumException(String.format(
"Inconsistent read for chunk=%s len=%d bytesRead=%d",
reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
buffersLen));
}
}

if (verifyChecksum) {
ChecksumData checksumData = ChecksumData.getFromProtoBuf(
chunkInfo.getChecksumData());

// ChecksumData stores checksum for each 'numBytesPerChecksum'
// number of bytes in a list. Compute the index of the first
// checksum to match with the read data

long relativeOffset = reqChunkInfo.getOffset() -
chunkInfo.getOffset();
int bytesPerChecksum = checksumData.getBytesPerChecksum();
int startIndex = (int) (relativeOffset / bytesPerChecksum);
Checksum.verifyChecksum(byteStrings, checksumData, startIndex,
isV0);
}
};
private void validateChunk(
ContainerCommandRequestProto request,
ContainerCommandResponseProto response
) throws OzoneChecksumException {
final ChunkInfo reqChunkInfo =
request.getReadChunk().getChunkData();

ReadChunkResponseProto readChunkResponse = response.getReadChunk();
List<ByteString> byteStrings;
boolean isV0 = false;

if (readChunkResponse.hasData()) {
ByteString byteString = readChunkResponse.getData();
if (byteString.size() != reqChunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new OzoneChecksumException(String.format(
"Inconsistent read for chunk=%s len=%d bytesRead=%d",
reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
byteString.size()));
}
byteStrings = new ArrayList<>();
byteStrings.add(byteString);
isV0 = true;
} else {
byteStrings = readChunkResponse.getDataBuffers().getBuffersList();
long buffersLen = BufferUtils.getBuffersLen(byteStrings);
if (buffersLen != reqChunkInfo.getLen()) {
// Bytes read from chunk should be equal to chunk size.
throw new OzoneChecksumException(String.format(
"Inconsistent read for chunk=%s len=%d bytesRead=%d",
reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
buffersLen));
}
}

if (verifyChecksum) {
ChecksumData checksumData = ChecksumData.getFromProtoBuf(
chunkInfo.getChecksumData());

// ChecksumData stores checksum for each 'numBytesPerChecksum'
// number of bytes in a list. Compute the index of the first
// checksum to match with the read data

long relativeOffset = reqChunkInfo.getOffset() -
chunkInfo.getOffset();
int bytesPerChecksum = checksumData.getBytesPerChecksum();
int startIndex = (int) (relativeOffset / bytesPerChecksum);
Checksum.verifyChecksum(byteStrings, checksumData, startIndex,
isV0);
}
}

/**
* Return the offset and length of bytes that need to be read from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DummyBlockInputStream extends BlockInputStream {
}

@Override
protected List<ChunkInfo> getChunkInfos() throws IOException {
protected List<ChunkInfo> getChunkInfoList() throws IOException {
return chunks;
}

Expand Down
Loading

0 comments on commit 60bb060

Please sign in to comment.