Merge branch 'apache:master' into HDDS-9774
Tejaskriya authored Dec 5, 2023
2 parents 0817604 + 200b330 commit dd14631
Showing 248 changed files with 6,871 additions and 4,689 deletions.
31 changes: 31 additions & 0 deletions .github/workflows/ci.yml
@@ -312,6 +312,37 @@ jobs:
          name: dependency
          path: target/dependency
        continue-on-error: true
+  license:
+    needs:
+      - build-info
+      - build
+    runs-on: ubuntu-20.04
+    timeout-minutes: 15
+    if: needs.build-info.outputs.needs-dependency-check == 'true'
+    steps:
+      - name: Checkout project
+        uses: actions/checkout@v3
+      - name: Download Ozone repo
+        id: download-ozone-repo
+        uses: actions/download-artifact@v3
+        with:
+          name: ozone-repo
+          path: |
+            ~/.m2/repository/org/apache/ozone
+      - name: Execute tests
+        run: |
+          hadoop-ozone/dev-support/checks/${{ github.job }}.sh
+        continue-on-error: true
+      - name: Summary of failures
+        run: hadoop-ozone/dev-support/checks/_summary.sh target/${{ github.job }}/summary.txt
+        if: ${{ !cancelled() }}
+      - name: Archive build results
+        uses: actions/upload-artifact@v3
+        if: always()
+        with:
+          name: ${{ github.job }}
+          path: target/${{ github.job }}
+        continue-on-error: true
  acceptance:
    needs:
      - build-info
2 changes: 1 addition & 1 deletion .mvn/extensions.xml
@@ -24,7 +24,7 @@
  <extension>
    <groupId>com.gradle</groupId>
    <artifactId>gradle-enterprise-maven-extension</artifactId>
-    <version>1.19.2</version>
+    <version>1.19.3</version>
  </extension>
  <extension>
    <groupId>com.gradle</groupId>
@@ -292,23 +292,25 @@ public ContainerCommandResponseProto sendCommand(
        Thread.currentThread().interrupt();
      }
    }
-    try {
-      for (Map.Entry<DatanodeDetails,
+    for (Map.Entry<DatanodeDetails,
        CompletableFuture<ContainerCommandResponseProto> >
        entry : futureHashMap.entrySet()) {
+      try {
        responseProtoHashMap.put(entry.getKey(), entry.getValue().get());
-      }
-    } catch (InterruptedException e) {
-      LOG.error("Command execution was interrupted.");
-      // Re-interrupt the thread while catching InterruptedException
-      Thread.currentThread().interrupt();
-    } catch (ExecutionException e) {
-      String message = "Failed to execute command {}.";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(message, processForDebug(request), e);
-      } else {
-        LOG.error(message + " Exception Class: {}, Exception Message: {}",
-            request.getCmdType(), e.getClass().getName(), e.getMessage());
+      } catch (InterruptedException e) {
+        LOG.error("Command execution was interrupted.");
+        // Re-interrupt the thread while catching InterruptedException
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        String message =
+            "Failed to execute command {} on datanode " + entry.getKey()
+                .getHostName();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(message, processForDebug(request), e);
+        } else {
+          LOG.error(message + " Exception Class: {}, Exception Message: {}",
+              request.getCmdType(), e.getClass().getName(), e.getMessage());
+        }
      }
    }
    return responseProtoHashMap;
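Note on the hunk above: the old code wrapped the whole loop in a single try/catch, so the first failed future ended response collection for every datanode; the new code catches per entry, letting the remaining responses be collected and the failing datanode be named in the log. A minimal, self-contained Java sketch of the same pattern — the types here are simplified stand-ins, not the Ozone classes:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class PerEntryCatchDemo {
  public static void main(String[] args) {
    // Stand-ins for the datanode -> pending-response map in sendCommand.
    Map<String, CompletableFuture<String>> futures = new HashMap<>();
    futures.put("dn1", CompletableFuture.completedFuture("ok"));
    CompletableFuture<String> failing = new CompletableFuture<>();
    failing.completeExceptionally(new IllegalStateException("container missing"));
    futures.put("dn2", failing);
    futures.put("dn3", CompletableFuture.completedFuture("ok"));

    Map<String, String> responses = new HashMap<>();
    for (Map.Entry<String, CompletableFuture<String>> entry : futures.entrySet()) {
      try {
        // Catching inside the loop: one bad datanode no longer aborts
        // collection of the remaining responses.
        responses.put(entry.getKey(), entry.getValue().get());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        // The failing entry is known here, so it can be reported by name.
        System.err.println("Failed on " + entry.getKey() + ": " + e.getCause());
      }
    }
    System.out.println(responses); // dn1 and dn3 are still collected
  }
}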
@@ -33,6 +33,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ExtendedDatanodeDetailsProto;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NodeImpl;

@@ -69,7 +70,7 @@ public class DatanodeDetails extends NodeImpl implements
      LoggerFactory.getLogger(DatanodeDetails.class);

  private static final Codec<DatanodeDetails> CODEC = new DelegatedCodec<>(
-      Proto2Codec.get(HddsProtos.ExtendedDatanodeDetailsProto.class),
+      Proto2Codec.get(ExtendedDatanodeDetailsProto.getDefaultInstance()),
      DatanodeDetails::getFromProtoBuf,
      DatanodeDetails::getExtendedProtoBufMessage);

@@ -392,7 +393,7 @@ public static DatanodeDetails getFromProtoBuf(
   * @return DatanodeDetails
   */
  public static DatanodeDetails getFromProtoBuf(
-      HddsProtos.ExtendedDatanodeDetailsProto extendedDetailsProto) {
+      ExtendedDatanodeDetailsProto extendedDetailsProto) {
    DatanodeDetails.Builder builder;
    if (extendedDetailsProto.hasDatanodeDetails()) {
      builder = newBuilder(extendedDetailsProto.getDatanodeDetails());
@@ -480,12 +481,12 @@ public HddsProtos.DatanodeDetailsProto.Builder toProtoBuilder(

  /**
   * Returns a ExtendedDatanodeDetails protobuf message from a datanode ID.
-   * @return HddsProtos.ExtendedDatanodeDetailsProto
+   * @return ExtendedDatanodeDetailsProto
   */
  @JsonIgnore
-  public HddsProtos.ExtendedDatanodeDetailsProto getExtendedProtoBufMessage() {
-    HddsProtos.ExtendedDatanodeDetailsProto.Builder extendedBuilder =
-        HddsProtos.ExtendedDatanodeDetailsProto.newBuilder()
+  public ExtendedDatanodeDetailsProto getExtendedProtoBufMessage() {
+    final ExtendedDatanodeDetailsProto.Builder extendedBuilder
+        = ExtendedDatanodeDetailsProto.newBuilder()
            .setDatanodeDetails(getProtoBufMessage());

    if (!Strings.isNullOrEmpty(getVersion())) {
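Note: the codec declarations in this commit (here, and in ContainerInfo and Pipeline below) switch Proto2Codec.get from a Class token to the message's default instance. Proto2Codec is Ozone-internal, but a plausible motivation is a standard protobuf-java idiom: the default instance hands you the message's Parser directly, with no reflection on the Class object. A hedged sketch of that idiom — the helper below is hypothetical, not Ozone's API:

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;

public final class DefaultInstanceIdiom {

  // Hypothetical helper in the spirit of Proto2Codec.get(defaultInstance):
  // the default instance already carries the parser, so nothing like
  // clazz.getMethod("parser").invoke(null) is needed.
  @SuppressWarnings("unchecked")
  static <M extends Message> Parser<M> parserOf(M defaultInstance) {
    return (Parser<M>) defaultInstance.getParserForType();
  }

  // Serialize and re-parse a message using only the default instance.
  static <M extends Message> M roundTrip(M defaultInstance, M value)
      throws InvalidProtocolBufferException {
    byte[] bytes = value.toByteArray();
    return parserOf(defaultInstance).parseFrom(bytes);
  }
}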
@@ -66,11 +66,11 @@ public final class ScmConfigKeys {
  public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
      "dfs.container.ratis.segment.size";
  public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
-      "1MB";
+      "64MB";
  public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY =
      "dfs.container.ratis.segment.preallocated.size";
  public static final String
-      DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = "16KB";
+      DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = "4MB";
  public static final String
      DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT =
      "dfs.container.ratis.statemachinedata.sync.timeout";
@@ -80,8 +80,6 @@ public final class ScmConfigKeys {
  public static final String
      DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES =
      "dfs.container.ratis.statemachinedata.sync.retries";
-  public static final int
-      DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES_DEFAULT = -1;
  public static final String
      DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS =
      "dfs.container.ratis.statemachine.max.pending.apply-transactions";
@@ -416,8 +414,12 @@ public final class ScmConfigKeys {
  public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT =
      "ozone.scm.pipeline.destroy.timeout";

+  // We wait for 150s before closing containers
+  // OzoneConfigKeys#OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION.
+  // So, we are waiting for another 150s before deleting the pipeline
+  // (150 + 150) = 300s
  public static final String OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT =
-      "66s";
+      "300s";

  public static final String OZONE_SCM_PIPELINE_CREATION_INTERVAL =
      "ozone.scm.pipeline.creation.interval";
@@ -427,7 +429,7 @@
  public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL =
      "ozone.scm.pipeline.scrub.interval";
  public static final String OZONE_SCM_PIPELINE_SCRUB_INTERVAL_DEFAULT =
-      "5m";
+      "150s";


  // Allow SCM to auto create factor ONE ratis pipeline.
@@ -475,6 +477,10 @@ public final class ScmConfigKeys {
"ozone.scm.datanode.admin.monitor.interval";
public static final String OZONE_SCM_DATANODE_ADMIN_MONITOR_INTERVAL_DEFAULT =
"30s";
public static final String OZONE_SCM_DATANODE_ADMIN_MONITOR_LOGGING_LIMIT =
"ozone.scm.datanode.admin.monitor.logging.limit";
public static final int
OZONE_SCM_DATANODE_ADMIN_MONITOR_LOGGING_LIMIT_DEFAULT = 1000;

public static final String OZONE_SCM_INFO_WAIT_DURATION =
"ozone.scm.info.wait.duration";
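Note: the new pipeline-destroy default encodes the arithmetic in the added comment — the close-container wait (150s) plus another 150s before pipeline deletion gives 300s. A tiny self-contained check of that relationship, using a simplified duration parser (Ozone parses these values through its configuration API, not this helper):

import java.time.Duration;

public class PipelineTimeoutArithmetic {
  // Simplified parser for the "150s" / "5m" style values in ScmConfigKeys;
  // only seconds and minutes are handled here.
  static Duration parse(String v) {
    long n = Long.parseLong(v.substring(0, v.length() - 1));
    return v.endsWith("m") ? Duration.ofMinutes(n) : Duration.ofSeconds(n);
  }

  public static void main(String[] args) {
    Duration closeContainerWait = parse("150s"); // OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION
    Duration extraWait = parse("150s");          // additional wait before pipeline deletion
    Duration destroyTimeout = parse("300s");     // new OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT
    // (150 + 150) = 300s, as the comment in the diff states.
    System.out.println(closeContainerWait.plus(extraWait).equals(destroyTimeout)); // true
  }
}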
@@ -400,15 +400,15 @@ List<DeletedBlocksTransactionInfo> getFailedDeletedBlockTxn(int count,
  int resetDeletedBlockRetryCount(List<Long> txIDs) throws IOException;

  /**
-   * Get usage information of datanode by ipaddress or uuid.
+   * Get usage information of datanode by address or uuid.
   *
-   * @param ipaddress datanode ipaddress String
+   * @param address datanode address String
   * @param uuid datanode uuid String
   * @return List of DatanodeUsageInfoProto. Each element contains info such as
   *     capacity, SCMused, and remaining space.
   * @throws IOException
   */
-  List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(String ipaddress,
+  List<HddsProtos.DatanodeUsageInfoProto> getDatanodeUsageInfo(String address,
      String uuid)
      throws IOException;

@@ -44,7 +44,7 @@ public final class ContainerInfo implements Comparable<ContainerInfo> {
      = Comparator.comparingLong(info -> info.getLastUsed().toEpochMilli());

  private static final Codec<ContainerInfo> CODEC = new DelegatedCodec<>(
-      Proto2Codec.get(HddsProtos.ContainerInfoProto.class),
+      Proto2Codec.get(HddsProtos.ContainerInfoProto.getDefaultInstance()),
      ContainerInfo::fromProtobuf,
      ContainerInfo::getProtobuf);

@@ -244,6 +244,6 @@ Node getNode(int leafIndex, String scope, List<String> excludedScopes,
   * or shuffled input nodes otherwise. The size of returned list is limited
   * by activeLen parameter.
   */
-  List<? extends Node> sortByDistanceCost(Node reader,
-      List<? extends Node> nodes, int activeLen);
+  <N extends Node> List<N> sortByDistanceCost(Node reader,
+      List<N> nodes, int activeLen);
}
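Note: the move from the wildcard signature to a generic method is what lets callers keep their element type. With the old form, passing a List of a concrete Node subtype (such as DatanodeDetails, which extends NodeImpl above) returned only a List<? extends Node>, forcing a cast; with <N extends Node>, the input type flows through to the result. A small self-contained demo with stand-in types — these are not the Ozone classes:

import java.util.ArrayList;
import java.util.List;

public class SortSignatureDemo {
  // Minimal stand-ins to illustrate the signature change.
  interface Node { }
  static class DatanodeStub implements Node { }

  // Old shape: callers get back List<? extends Node> and must cast.
  static List<? extends Node> sortOld(List<? extends Node> nodes) {
    return new ArrayList<>(nodes);
  }

  // New shape: the element type N flows through, so no cast is needed.
  static <N extends Node> List<N> sortNew(List<N> nodes) {
    return new ArrayList<>(nodes);
  }

  public static void main(String[] args) {
    List<DatanodeStub> dns = new ArrayList<>();
    dns.add(new DatanodeStub());

    // Old: only a List<? extends Node> comes back.
    List<? extends Node> oldResult = sortOld(dns);

    // New: a List<DatanodeStub> comes back directly.
    List<DatanodeStub> newResult = sortNew(dns);
    System.out.println(oldResult.size() + " " + newResult.size());
  }
}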
@@ -29,6 +29,7 @@
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReadWriteLock;
@@ -61,7 +62,7 @@ public class NetworkTopologyImpl implements NetworkTopology {
  /** The algorithm to randomize nodes with equal distances. */
  private final Consumer<List<? extends Node>> shuffleOperation;
  /** Lock to coordinate cluster tree access. */
-  private ReadWriteLock netlock = new ReentrantReadWriteLock(true);
+  private final ReadWriteLock netlock = new ReentrantReadWriteLock(true);

  public NetworkTopologyImpl(ConfigurationSource conf) {
    schemaManager = NodeSchemaManager.getInstance();
@@ -136,7 +137,7 @@ public void add(Node node) {
  @Override
  public void update(Node oldNode, Node newNode) {
    Preconditions.checkArgument(newNode != null, "newNode cannot be null");
-    if (oldNode != null && oldNode instanceof InnerNode) {
+    if (oldNode instanceof InnerNode) {
      throw new IllegalArgumentException(
          "Not allowed to update an inner node: "
              + oldNode.getNetworkFullPath());
@@ -225,10 +226,7 @@ private boolean containsNode(Node node) {
    while (parent != null && parent != clusterTree) {
      parent = parent.getParent();
    }
-    if (parent == clusterTree) {
-      return true;
-    }
-    return false;
+    return parent == clusterTree;
  }

  /**
@@ -384,7 +382,7 @@ public Node chooseRandom(String scope) {
      scope = ROOT;
    }
    if (scope.startsWith(SCOPE_REVERSE_STR)) {
-      ArrayList<String> excludedScopes = new ArrayList();
+      ArrayList<String> excludedScopes = new ArrayList<>();
      excludedScopes.add(scope.substring(1));
      return chooseRandom(ROOT, excludedScopes, null, null,
          ANCESTOR_GENERATION_DEFAULT);
@@ -425,7 +423,7 @@ public Node chooseRandom(String scope, Collection<Node> excludedNodes) {
      scope = ROOT;
    }
    if (scope.startsWith(SCOPE_REVERSE_STR)) {
-      ArrayList<String> excludedScopes = new ArrayList();
+      ArrayList<String> excludedScopes = new ArrayList<>();
      excludedScopes.add(scope.substring(1));
      return chooseRandom(ROOT, excludedScopes, excludedNodes, null,
          ANCESTOR_GENERATION_DEFAULT);
@@ -460,7 +458,7 @@ public Node chooseRandom(String scope, Collection<Node> excludedNodes,
      scope = ROOT;
    }
    if (scope.startsWith(SCOPE_REVERSE_STR)) {
-      ArrayList<String> excludedScopes = new ArrayList();
+      ArrayList<String> excludedScopes = new ArrayList<>();
      excludedScopes.add(scope.substring(1));
      return chooseRandom(ROOT, excludedScopes, excludedNodes, null,
          ancestorGen);
@@ -771,11 +769,11 @@ public int getDistanceCost(Node node1, Node node2) {
   * by activeLen parameter.
   */
  @Override
-  public List<? extends Node> sortByDistanceCost(Node reader,
-      List<? extends Node> nodes, int activeLen) {
+  public <N extends Node> List<N> sortByDistanceCost(Node reader,
+      List<N> nodes, int activeLen) {
    // shuffle input list of nodes if reader is not defined
    if (reader == null) {
-      List<? extends Node> shuffledNodes =
+      List<N> shuffledNodes =
          new ArrayList<>(nodes.subList(0, activeLen));
      shuffleOperation.accept(shuffledNodes);
      return shuffledNodes;
@@ -786,25 +784,19 @@ public List<? extends Node> sortByDistanceCost(Node reader,
      costs[i] = getDistanceCost(reader, nodes.get(i));
    }
    // Add cost/node pairs to a TreeMap to sort
-    TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
+    NavigableMap<Integer, List<N>> tree = new TreeMap<>();
    for (int i = 0; i < activeLen; i++) {
      int cost = costs[i];
-      Node node = nodes.get(i);
-      List<Node> list = tree.get(cost);
-      if (list == null) {
-        list = Lists.newArrayListWithExpectedSize(1);
-        tree.put(cost, list);
-      }
-      list.add(node);
+      N node = nodes.get(i);
+      tree.computeIfAbsent(cost, k -> Lists.newArrayListWithExpectedSize(1))
+          .add(node);
    }

-    List<Node> ret = new ArrayList<>();
-    for (List<Node> list: tree.values()) {
+    List<N> ret = new ArrayList<>();
+    for (List<N> list : tree.values()) {
      if (list != null) {
        shuffleOperation.accept(list);
-        for (Node n: list) {
-          ret.add(n);
-        }
+        ret.addAll(list);
      }
    }

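Note: the rewritten loop replaces the get / null-check / put sequence with Map.computeIfAbsent and types the tree against the NavigableMap interface. A quick standalone illustration of the same cost-bucketing, with plain strings in place of nodes:

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class BucketDemo {
  public static void main(String[] args) {
    // Group values by an integer cost, as the rewritten
    // sortByDistanceCost does with its NavigableMap.
    NavigableMap<Integer, List<String>> tree = new TreeMap<>();
    int[] costs = {2, 0, 2, 1};
    String[] nodes = {"dn1", "dn2", "dn3", "dn4"};
    for (int i = 0; i < costs.length; i++) {
      // computeIfAbsent replaces the old get / null-check / put dance.
      tree.computeIfAbsent(costs[i], k -> new ArrayList<>()).add(nodes[i]);
    }
    // TreeMap values iterate in ascending cost order: [dn2], [dn4], [dn1, dn3].
    List<String> ret = new ArrayList<>();
    for (List<String> bucket : tree.values()) {
      ret.addAll(bucket);
    }
    System.out.println(ret); // [dn2, dn4, dn1, dn3]
  }
}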
@@ -62,7 +62,7 @@ public final class Pipeline {
   * -- the creation time may change.
   */
  private static final Codec<Pipeline> CODEC = new DelegatedCodec<>(
-      Proto2Codec.get(HddsProtos.Pipeline.class),
+      Proto2Codec.get(HddsProtos.Pipeline.getDefaultInstance()),
      Pipeline::getFromProtobufSetCreationTimestamp,
      p -> p.getProtobufMessage(ClientVersion.CURRENT_VERSION),
      DelegatedCodec.CopyType.UNSUPPORTED);
@@ -87,10 +87,21 @@ public static Codec<Pipeline> getCodec() {
  // suggested leader id with high priority
  private final UUID suggestedLeaderId;

+  private final Instant stateEnterTime;
+
  /**
   * The immutable properties of pipeline object is used in
   * ContainerStateManager#getMatchingContainerByPipeline to take a lock on
   * the container allocations for a particular pipeline.
+   * <br><br>
+   * Since the Pipeline class is immutable, if we want to change the state of
+   * the Pipeline we should create a new Pipeline object with the new state.
+   * Make sure that you set the value of <i>creationTimestamp</i> properly while
+   * creating the new Pipeline object.
+   * <br><br>
+   * There is no need to worry about the value of <i>stateEnterTime</i> as it's
+   * set to <i>Instant.now</i> when you create the Pipeline object as part of
+   * state change.
   */
  private Pipeline(PipelineID id,
      ReplicationConfig replicationConfig, PipelineState state,
@@ -102,6 +113,7 @@ private Pipeline(PipelineID id,
    this.creationTimestamp = Instant.now();
    this.suggestedLeaderId = suggestedLeaderId;
    this.replicaIndexes = new HashMap<>();
+    this.stateEnterTime = Instant.now();
  }

  /**
@@ -140,6 +152,10 @@ public Instant getCreationTimestamp() {
    return creationTimestamp;
  }

+  public Instant getStateEnterTime() {
+    return stateEnterTime;
+  }
+
  /**
   * Return the suggested leaderId which has a high priority among DNs of the
   * pipeline.
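Note: the new stateEnterTime field follows the copy-on-state-change discipline described in the Javadoc above — Pipeline is immutable, so a state transition builds a new object, carrying creationTimestamp forward while stateEnterTime is reset to Instant.now() in the constructor. A minimal sketch of that discipline with a simplified stand-in class, not the real Pipeline and its builder:

import java.time.Instant;

public class PipelineStateDemo {
  // Simplified stand-in; the real Pipeline has many more fields.
  static final class PipelineStub {
    enum State { ALLOCATED, OPEN, CLOSED }

    final State state;
    final Instant creationTimestamp; // must be carried forward explicitly
    final Instant stateEnterTime;    // always reset on construction

    PipelineStub(State state, Instant creationTimestamp) {
      this.state = state;
      this.creationTimestamp = creationTimestamp;
      this.stateEnterTime = Instant.now();
    }

    // State change = new object; creationTimestamp is preserved,
    // stateEnterTime is refreshed by the constructor.
    PipelineStub withState(State newState) {
      return new PipelineStub(newState, this.creationTimestamp);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PipelineStub p = new PipelineStub(PipelineStub.State.ALLOCATED, Instant.now());
    Thread.sleep(10);
    PipelineStub open = p.withState(PipelineStub.State.OPEN);
    // Same creation time, later state-enter time.
    System.out.println(open.creationTimestamp.equals(p.creationTimestamp)); // true
    System.out.println(open.stateEnterTime.isAfter(p.stateEnterTime));      // true
  }
}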