Skip to content

Commit

Permalink
HDDS-11742. scm ratis server - impl the method to retrieve leaderId
Browse files Browse the repository at this point in the history
  • Loading branch information
Slava Tutrinov committed Nov 28, 2024
1 parent d4edc6f commit 75fb348
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -170,6 +171,8 @@ private class RatisServerStub implements SCMRatisServer {
private Map<RequestType, Object> handlers =
new EnumMap<>(RequestType.class);

private RaftPeerId leaderId = RaftPeerId.valueOf(UUID.randomUUID().toString());

@Override
public void start() {
}
Expand Down Expand Up @@ -283,5 +286,10 @@ public boolean removeSCM(RemoveSCMRequest request) throws IOException {
public GrpcTlsConfig getGrpcTlsConfig() {
return null;
}

@Override
public RaftPeerId getLeaderId() {
return leaderId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;

Expand Down Expand Up @@ -68,4 +69,6 @@ SCMRatisResponse submitRequest(SCMRatisRequest request)

GrpcTlsConfig getGrpcTlsConfig();

RaftPeerId getLeaderId();

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import jakarta.annotation.Nullable;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand Down Expand Up @@ -147,6 +148,15 @@ public GrpcTlsConfig getGrpcTlsConfig() {
return grpcTlsConfig;
}

@Override
@Nullable
public RaftPeerId getLeaderId() {
if (getLeader() != null) {
return getLeader().getId();
}
return null;
}

private static void waitForLeaderToBeReady(RaftServer server,
OzoneConfiguration conf, RaftGroup group) throws IOException {
boolean ready;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1595,19 +1595,9 @@ public void start() throws IOException {

setStartTime();

AtomicBoolean leaderIsDefined = new AtomicBoolean(false);
if (SCMHAUtils.isSCMHAEnabled(configuration)) {
getScmHAManager().getRatisServer().getRatisRoles().stream()
.filter(peerState -> peerState.contains("LEADER"))
.findFirst()
.ifPresent(peerState -> {
leaderIsDefined.set(true);
String[] peerInfo = peerState.split(":");
scmHAMetricsUpdate(peerInfo[3]);
});
}

if (!leaderIsDefined.get()) {
if (SCMHAUtils.isSCMHAEnabled(configuration) && getScmHAManager().getRatisServer().getLeaderId() != null) {
scmHAMetricsUpdate(getScmHAManager().getRatisServer().getLeaderId().toString());
} else {
// At this point leader is not known
scmHAMetricsUpdate(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -31,6 +32,7 @@
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -111,6 +113,11 @@ public SCMStateMachine getSCMStateMachine() {
public GrpcTlsConfig getGrpcTlsConfig() {
return null;
}

@Override
public RaftPeerId getLeaderId() {
return RaftPeerId.valueOf(UUID.randomUUID().toString());
}
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.ha;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;

import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/**
* Test for SCM Ratis Server Implementation.
*/
public class TestSCMRatisServerImpl {

@Test
public void testGetLeaderId() throws Exception {

try (
MockedConstruction<SecurityConfig> mockedSecurityConfigConstruction = mockConstruction(SecurityConfig.class);
MockedStatic<RaftServer> staticMockedRaftServer = mockStatic(RaftServer.class);
MockedStatic<RatisUtil> staticMockedRatisUtil = mockStatic(RatisUtil.class);
) {
// given
ConfigurationSource conf = mock(ConfigurationSource.class);
StorageContainerManager scm = mock(StorageContainerManager.class);
String clusterId = "CID-" + UUID.randomUUID();
when(scm.getClusterId()).thenReturn(clusterId);
SCMHADBTransactionBuffer dbTransactionBuffer = mock(SCMHADBTransactionBuffer.class);

RaftServer.Builder raftServerBuilder = mock(RaftServer.Builder.class);
when(raftServerBuilder.setServerId(any())).thenReturn(raftServerBuilder);
when(raftServerBuilder.setProperties(any())).thenReturn(raftServerBuilder);
when(raftServerBuilder.setStateMachineRegistry(any())).thenReturn(raftServerBuilder);
when(raftServerBuilder.setOption(any())).thenReturn(raftServerBuilder);
when(raftServerBuilder.setGroup(any())).thenReturn(raftServerBuilder);
when(raftServerBuilder.setParameters(any())).thenReturn(raftServerBuilder);

RaftServer raftServer = mock(RaftServer.class);

RaftServer.Division division = mock(RaftServer.Division.class);
when(raftServer.getDivision(any())).thenReturn(division);

SCMStateMachine scmStateMachine = mock(SCMStateMachine.class);
when(division.getStateMachine()).thenReturn(scmStateMachine);

when(raftServerBuilder.build()).thenReturn(raftServer);

staticMockedRaftServer.when(RaftServer::newBuilder).thenReturn(raftServerBuilder);

RaftProperties raftProperties = mock(RaftProperties.class);
staticMockedRatisUtil.when(() -> RatisUtil.newRaftProperties(conf)).thenReturn(raftProperties);

SecurityConfig sc = new SecurityConfig(conf);
when(sc.isSecurityEnabled()).thenReturn(false);

SCMRatisServerImpl scmRatisServer = spy(new SCMRatisServerImpl(conf, scm, dbTransactionBuffer));
doReturn(RaftPeer.newBuilder().setId(RaftPeerId.valueOf("peer1")).build()).when(scmRatisServer).getLeader();

// when
RaftPeerId leaderId = scmRatisServer.getLeaderId();

// then
assertEquals(RaftPeerId.valueOf("peer1"), leaderId);

// but when
doReturn(null).when(scmRatisServer).getLeader();
leaderId = scmRatisServer.getLeaderId();

// then
assertNull(leaderId);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.ha.SCMHAMetrics;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
Expand Down Expand Up @@ -129,6 +130,9 @@ void testAllSCMAreRunning() throws Exception {
count++;
leaderScm = scm;
}
if (SCMHAUtils.isSCMHAEnabled(conf)) {
assertNotNull(scm.getScmHAManager().getRatisServer().getLeaderId());
}
assertEquals(peerSize, numOfSCMs);
}
assertEquals(1, count);
Expand Down

0 comments on commit 75fb348

Please sign in to comment.