Skip to content

Commit

Permalink
HDDS-11742. test SCMs leadership metric in case of simultaneous start…
Browse files Browse the repository at this point in the history
… in a separate test scenario
  • Loading branch information
Slava Tutrinov committed Nov 28, 2024
1 parent 44f211d commit e8f9afd
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.TestMiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
Expand All @@ -44,9 +45,10 @@
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -55,7 +57,9 @@
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
Expand All @@ -73,6 +77,8 @@
@Timeout(300)
public class TestStorageContainerManagerHA {

private static final Logger LOG = LoggerFactory.getLogger(TestMiniOzoneCluster.class);

private MiniOzoneHAClusterImpl cluster = null;
private OzoneConfiguration conf;
private String omServiceId;
Expand All @@ -87,7 +93,6 @@ public class TestStorageContainerManagerHA {
*
* @throws IOException
*/
@BeforeEach
public void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
Expand Down Expand Up @@ -119,6 +124,7 @@ public void shutdown() {

@Test
void testAllSCMAreRunning() throws Exception {
init();
int count = 0;
List<StorageContainerManager> scms = cluster.getStorageContainerManagers();
assertEquals(numOfSCMs, scms.size());
Expand Down Expand Up @@ -250,6 +256,7 @@ private boolean areAllScmInSync(long leaderIndex) {

@Test
public void testPrimordialSCM() throws Exception {
init();
StorageContainerManager scm1 = cluster.getStorageContainerManagers().get(0);
StorageContainerManager scm2 = cluster.getStorageContainerManagers().get(1);
OzoneConfiguration conf1 = scm1.getConfiguration();
Expand All @@ -268,6 +275,7 @@ public void testPrimordialSCM() throws Exception {

@Test
public void testBootStrapSCM() throws Exception {
init();
StorageContainerManager scm2 = cluster.getStorageContainerManagers().get(1);
OzoneConfiguration conf2 = scm2.getConfiguration();
boolean isDeleted = scm2.getScmStorageConfig().getVersionFile().delete();
Expand Down Expand Up @@ -327,4 +335,75 @@ private void waitForLeaderToBeReady()
}, 1000, (int) ScmConfigKeys
.OZONE_SCM_HA_RATIS_LEADER_READY_WAIT_TIMEOUT_DEFAULT);
}

@Test
public void testSCMLeadershipMetric() throws IOException, InterruptedException {
// GIVEN
int scmInstancesCount = 3;
conf = new OzoneConfiguration();
MiniOzoneCluster.Builder haMiniClusterBuilder = MiniOzoneCluster.newHABuilder(conf)
.setSCMServiceId("scm-service-id")
.setOMServiceId("om-service-id")
.setNumOfActiveOMs(0)
.setNumOfStorageContainerManagers(scmInstancesCount)
.setNumOfActiveSCMs(1)
.setNumDatanodes(0);

// start single SCM instance without other Ozone services
// in order to initialize and bootstrap SCM instances only
cluster = (MiniOzoneHAClusterImpl) haMiniClusterBuilder.build();

List<StorageContainerManager> storageContainerManagersList =
((MiniOzoneHAClusterImpl) cluster).getStorageContainerManagersList();

// stop the single SCM instance in order to imitate further simultaneous start of SCMs
storageContainerManagersList.get(0).stop();
storageContainerManagersList.get(0).join();

// WHEN (imitate simultaneous start of the SCMs)
CountDownLatch scmInstancesCounter = new CountDownLatch(scmInstancesCount);
int retryCount = 0;
while (true) {
AtomicInteger failedSCMs = new AtomicInteger();
for (StorageContainerManager scm : storageContainerManagersList) {
CountDownLatch finalScmInstancesCounter = scmInstancesCounter;
new Thread(() -> {
try {
scm.start();
} catch (IOException e) {
failedSCMs.incrementAndGet();
} finally {
finalScmInstancesCounter.countDown();
}
}).start();
}
scmInstancesCounter.await();
if (failedSCMs.get() == 0) {
break;
} else {
for (StorageContainerManager scm : storageContainerManagersList) {
scm.stop();
scm.join();
LOG.info("Stopping StorageContainerManager server at {}",
scm.getClientRpcAddress());
}
++retryCount;
LOG.info("SCMs port conflicts, retried {} times",
retryCount);
failedSCMs.set(0);
scmInstancesCounter = new CountDownLatch(scmInstancesCount);
}
}

// THEN expect at least one SCM node (leader) will have 'scmha_metrics_scmha_leader_state' metric set to 1
boolean leaderMetricDefined = false;
for (StorageContainerManager scm : storageContainerManagersList) {
if (scm.getScmHAMetrics() != null && scm.getScmHAMetrics().getSCMHAMetricsInfoLeaderState() == 1) {
leaderMetricDefined = true;
break;
}
}
assertTrue(leaderMetricDefined);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand Down Expand Up @@ -541,7 +540,7 @@ protected OMHAService createOMService() throws IOException,
}

/**
* Start SCM service with multiple SCMs.
* Start OM service with multiple OMs.
*/
protected SCMHAService createSCMService()
throws IOException, AuthenticationException {
Expand All @@ -554,7 +553,6 @@ protected SCMHAService createSCMService()

int retryCount = 0;

CountDownLatch scmStartCounter = new CountDownLatch(numOfSCMs);
while (true) {
try {
initSCMHAConfig();
Expand Down Expand Up @@ -586,29 +584,19 @@ protected SCMHAService createSCMService()
}
scmList.add(scm);

int scmCounter = i;
new Thread(() -> {
if (scmCounter <= numOfActiveSCMs) {
try {
scm.start();
} catch (IOException e) {
scmStartCounter.countDown();
throw new RuntimeException(e);
}
activeSCMs.add(scm);
LOG.info("Started SCM RPC server at {}",
scm.getClientRpcAddress());
} else {
inactiveSCMs.add(scm);
LOG.info("Initialized SCM at {}. This SCM is currently "
+ "inactive (not running).", scm.getClientRpcAddress());
}
scmStartCounter.countDown();
}).start();
if (i <= numOfActiveSCMs) {
scm.start();
activeSCMs.add(scm);
LOG.info("Started SCM RPC server at {}",
scm.getClientRpcAddress());
} else {
inactiveSCMs.add(scm);
LOG.info("Intialized SCM at {}. This SCM is currently "
+ "inactive (not running).", scm.getClientRpcAddress());
}
}
scmStartCounter.await();
break;
} catch (RuntimeException | InterruptedException e) {
} catch (BindException e) {
for (StorageContainerManager scm : scmList) {
scm.stop();
scm.join();
Expand Down

0 comments on commit e8f9afd

Please sign in to comment.