Skip to content

Commit

Permalink
Merge branch 'apache:master' into HDDS-9827
Browse files Browse the repository at this point in the history
  • Loading branch information
Tejaskriya authored Dec 18, 2023
2 parents 7b052fd + 4ca41a1 commit 2336f22
Show file tree
Hide file tree
Showing 272 changed files with 4,806 additions and 5,893 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ jobs:
- name: Untar sources
run: |
tar --strip-components 1 -xzvf ozone*-src.tar.gz
- name: Workaround for HADOOP-19011
run: |
git init
git config user.name 'Github Actions'
git config user.email '[email protected]'
git commit --allow-empty -a -m 'workaround for HADOOP-19011'
- name: Cache for maven dependencies
uses: actions/cache@v3
with:
Expand Down
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ We use GitHub pull requests for contributing changes to the repository. The main
Basic code conventions followed by Ozone:

* 2 spaces indentation
* 80-char line length limit
* 120-char line length limit
* Apache license header required in most files
* no `@author` tags, authorship is indicated by Git history

Expand Down
4 changes: 2 additions & 2 deletions hadoop-hdds/annotations/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds</artifactId>
<version>1.4.0-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
</parent>

<artifactId>hdds-annotation-processing</artifactId>
<version>1.4.0-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<description>Apache Ozone annotation processing tools for validating custom
annotations at compile time.
</description>
Expand Down
4 changes: 2 additions & 2 deletions hadoop-hdds/client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds</artifactId>
<version>1.4.0-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
</parent>

<artifactId>hdds-client</artifactId>
<version>1.4.0-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<description>Apache Ozone Distributed Data Store Client Library</description>
<name>Apache Ozone HDDS Client</name>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
Expand Down Expand Up @@ -254,6 +255,7 @@ public long getStreamBufferFlushSize() {
return streamBufferFlushSize;
}

@VisibleForTesting
public void setStreamBufferFlushSize(long streamBufferFlushSize) {
this.streamBufferFlushSize = streamBufferFlushSize;
}
Expand All @@ -262,6 +264,7 @@ public int getStreamBufferSize() {
return streamBufferSize;
}

@VisibleForTesting
public void setStreamBufferSize(int streamBufferSize) {
this.streamBufferSize = streamBufferSize;
}
Expand All @@ -270,6 +273,7 @@ public boolean isStreamBufferFlushDelay() {
return streamBufferFlushDelay;
}

@VisibleForTesting
public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
this.streamBufferFlushDelay = streamBufferFlushDelay;
}
Expand All @@ -278,6 +282,7 @@ public long getStreamBufferMaxSize() {
return streamBufferMaxSize;
}

@VisibleForTesting
public void setStreamBufferMaxSize(long streamBufferMaxSize) {
this.streamBufferMaxSize = streamBufferMaxSize;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

/**
 * Encapsulates the stream buffer sizing arguments required by the Ozone
 * client write path (buffer size, flush size, max size, and flush delay).
 * Instances are created via {@link Builder} or
 * {@link #getDefaultStreamBufferArgs(ReplicationConfig, OzoneClientConfig)}.
 */
public class StreamBufferArgs {

  private int streamBufferSize;
  private long streamBufferFlushSize;
  private long streamBufferMaxSize;
  private boolean streamBufferFlushDelay;

  /**
   * Constructs the args from a {@link Builder}; subclasses may extend.
   *
   * @param builder source of the initial buffer sizing values
   */
  protected StreamBufferArgs(Builder builder) {
    this.streamBufferSize = builder.bufferSize;
    this.streamBufferFlushSize = builder.bufferFlushSize;
    this.streamBufferMaxSize = builder.bufferMaxSize;
    this.streamBufferFlushDelay = builder.streamBufferFlushDelay;
  }

  /** @return size in bytes of a single stream buffer. */
  public int getStreamBufferSize() {
    return streamBufferSize;
  }

  /** @return number of buffered bytes at which a flush is triggered. */
  public long getStreamBufferFlushSize() {
    return streamBufferFlushSize;
  }

  /** @return maximum number of bytes the stream buffer may hold. */
  public long getStreamBufferMaxSize() {
    return streamBufferMaxSize;
  }

  /** @return whether flushing is delayed until enough data accumulates. */
  public boolean isStreamBufferFlushDelay() {
    return streamBufferFlushDelay;
  }

  /** Enables or disables delayed flushing. */
  public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
    this.streamBufferFlushDelay = streamBufferFlushDelay;
  }

  /** Overrides the buffer size; restricted to subclasses. */
  protected void setStreamBufferSize(int streamBufferSize) {
    this.streamBufferSize = streamBufferSize;
  }

  /** Overrides the flush size; restricted to subclasses. */
  protected void setStreamBufferFlushSize(long streamBufferFlushSize) {
    this.streamBufferFlushSize = streamBufferFlushSize;
  }

  /** Overrides the max size; restricted to subclasses. */
  protected void setStreamBufferMaxSize(long streamBufferMaxSize) {
    this.streamBufferMaxSize = streamBufferMaxSize;
  }

  /**
   * Builder class for StreamBufferArgs.
   */
  public static class Builder {
    private int bufferSize;
    private long bufferFlushSize;
    private long bufferMaxSize;
    private boolean streamBufferFlushDelay;

    public Builder setBufferSize(int bufferSize) {
      this.bufferSize = bufferSize;
      return this;
    }

    public Builder setBufferFlushSize(long bufferFlushSize) {
      this.bufferFlushSize = bufferFlushSize;
      return this;
    }

    public Builder setBufferMaxSize(long bufferMaxSize) {
      this.bufferMaxSize = bufferMaxSize;
      return this;
    }

    public Builder setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
      this.streamBufferFlushDelay = streamBufferFlushDelay;
      return this;
    }

    public StreamBufferArgs build() {
      return new StreamBufferArgs(this);
    }

    public static Builder getNewBuilder() {
      return new Builder();
    }
  }

  /**
   * Derives the default buffer args for the given replication scheme.
   * For EC replication every buffer size is pinned to the EC chunk size;
   * otherwise the sizes come from the client configuration.
   *
   * @param replicationConfig replication scheme of the stream being written
   * @param clientConfig client configuration supplying non-EC defaults
   * @return a new {@link StreamBufferArgs} with the derived sizes
   */
  public static StreamBufferArgs getDefaultStreamBufferArgs(
      ReplicationConfig replicationConfig, OzoneClientConfig clientConfig) {
    final int bufferSize;
    final long flushSize;
    final long bufferMaxSize;
    final boolean streamBufferFlushDelay = clientConfig.isStreamBufferFlushDelay();
    if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.EC) {
      // For EC, a buffer corresponds to exactly one EC chunk, so all three
      // sizes collapse to the chunk size. Cast and read it once.
      final int ecChunkSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize();
      bufferSize = ecChunkSize;
      flushSize = ecChunkSize;
      bufferMaxSize = ecChunkSize;
    } else {
      bufferSize = clientConfig.getStreamBufferSize();
      flushSize = clientConfig.getStreamBufferFlushSize();
      bufferMaxSize = clientConfig.getStreamBufferMaxSize();
    }

    return Builder.getNewBuilder()
        .setBufferSize(bufferSize)
        .setBufferFlushSize(flushSize)
        .setBufferMaxSize(bufferMaxSize)
        .setStreamBufferFlushDelay(streamBufferFlushDelay)
        .build();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class BlockOutputStream extends OutputStream {
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private OzoneClientConfig config;
private StreamBufferArgs streamBufferArgs;

private int chunkIndex;
private final AtomicLong chunkOffset = new AtomicLong();
Expand Down Expand Up @@ -134,14 +136,15 @@ public class BlockOutputStream extends OutputStream {
* @param pipeline pipeline where block will be written
* @param bufferPool pool of buffers
*/
@SuppressWarnings("checkstyle:ParameterNumber")
public BlockOutputStream(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
Expand All @@ -166,12 +169,12 @@ public BlockOutputStream(

//number of buffers used before doing a flush
refreshCurrentBuffer();
flushPeriod = (int) (config.getStreamBufferFlushSize() / config
flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() / streamBufferArgs
.getStreamBufferSize());

Preconditions
.checkArgument(
(long) flushPeriod * config.getStreamBufferSize() == config
(long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
.getStreamBufferFlushSize());

// A single thread executor handle the responses of async requests
Expand All @@ -185,6 +188,7 @@ public BlockOutputStream(
config.getBytesPerChecksum());
this.clientMetrics = clientMetrics;
this.pipeline = pipeline;
this.streamBufferArgs = streamBufferArgs;
}

void refreshCurrentBuffer() {
Expand Down Expand Up @@ -321,7 +325,7 @@ private void updateFlushLength() {
}

private boolean isBufferPoolFull() {
return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
return bufferPool.computeBufferData() == streamBufferArgs.getStreamBufferMaxSize();
}

/**
Expand All @@ -339,7 +343,7 @@ public void writeOnRetry(long len) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Retrying write length {} for blockID {}", len, blockID);
}
Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize());
int count = 0;
while (len > 0) {
ChunkBuffer buffer = bufferPool.getBuffer(count);
Expand All @@ -355,13 +359,13 @@ public void writeOnRetry(long len) throws IOException {
// the buffer. We should just validate
// if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
// call for handling full buffer/flush buffer condition.
if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 0) {
// reset the position to zero as now we will be reading the
// next buffer in the list
updateFlushLength();
executePutBlock(false, false);
}
if (writtenDataLength == config.getStreamBufferMaxSize()) {
if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
handleFullBuffer();
}
}
Expand Down Expand Up @@ -518,9 +522,9 @@ void putFlushFuture(long flushPos,
public void flush() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0
&& (!config.isStreamBufferFlushDelay() ||
&& (!streamBufferArgs.isStreamBufferFlushDelay() ||
writtenDataLength - totalDataFlushedLength
>= config.getStreamBufferSize())) {
>= streamBufferArgs.getStreamBufferSize())) {
handleFlush(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -66,17 +67,18 @@ public class ECBlockOutputStream extends BlockOutputStream {
* @param pipeline pipeline where block will be written
* @param bufferPool pool of buffers
*/
@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockOutputStream(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) throws IOException {
super(blockID, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics);
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -71,17 +72,18 @@ public class RatisBlockOutputStream extends BlockOutputStream
* @param blockID block ID
* @param bufferPool pool of buffers
*/
@SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) throws IOException {
super(blockID, xceiverClientManager, pipeline,
bufferPool, config, token, clientMetrics);
bufferPool, config, token, clientMetrics, streamBufferArgs);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}

Expand Down
Loading

0 comments on commit 2336f22

Please sign in to comment.