From ce7cc382912cfd90fb294aa1693171d014b1270c Mon Sep 17 00:00:00 2001 From: Sanketh Nalli Date: Sat, 4 Nov 2023 19:12:16 -0700 Subject: [PATCH] rm shutdown-check as main loop --- .../com/github/ambry/cloud/CloudStorageCompactor.java | 8 +++++--- .../com/github/ambry/cloud/CloudStorageCompactorTest.java | 1 - .../test/java/com/github/ambry/cloud/VcrServerTest.java | 2 ++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageCompactor.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageCompactor.java index aa9d3eeaf7..b9c806a1a6 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageCompactor.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageCompactor.java @@ -18,7 +18,6 @@ import com.github.ambry.utils.Utils; import java.util.HashMap; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -63,10 +62,10 @@ public void run() { logger.info("[COMPACT] Thread info = {}", Thread.currentThread()); // Main compaction loop - while (!isShutDown()) { + while (true) { logger.info("[COMPACT] Starting cloud compaction"); compactPartitions(); - // This check prevents us from falling asleep. The loop condition prevents us from submitting more jobs. + // This shutdown-check prevents us from falling asleep. if (isShutDown()) { logger.info("[COMPACT] Breaking loop because compaction executor is shutdown"); break; @@ -148,7 +147,10 @@ public int compactPartitions() { // Wait for completion or shutdown, don't use any timeouts for (String partitionIdStr : compactionTasks.keySet()) { + // Although this is a finite loop, the future.get() is a blocking one, and we can't predict how long that'll be. + // This shutdown-check short circuits the loop. if (executorService.isShutdown()) { + logger.info("[COMPACT] Skipping Future.get because of shutdown"); break; } try { diff --git a/ambry-cloud/src/test/java/com/github/ambry/cloud/CloudStorageCompactorTest.java b/ambry-cloud/src/test/java/com/github/ambry/cloud/CloudStorageCompactorTest.java index e7362e6090..9470d87771 100644 --- a/ambry-cloud/src/test/java/com/github/ambry/cloud/CloudStorageCompactorTest.java +++ b/ambry-cloud/src/test/java/com/github/ambry/cloud/CloudStorageCompactorTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; import org.slf4j.Logger; diff --git a/ambry-cloud/src/test/java/com/github/ambry/cloud/VcrServerTest.java b/ambry-cloud/src/test/java/com/github/ambry/cloud/VcrServerTest.java index 70095a2197..b83c16877e 100644 --- a/ambry-cloud/src/test/java/com/github/ambry/cloud/VcrServerTest.java +++ b/ambry-cloud/src/test/java/com/github/ambry/cloud/VcrServerTest.java @@ -69,6 +69,7 @@ public void testVCRServerWithStaticCluster() throws Exception { VerifiableProperties verifiableProperties = getStaticClusterVcrProps(); VcrServer vcrServer = new VcrServer(verifiableProperties, mockClusterAgentsFactory, notificationSystem, null); vcrServer.startup(); + Assert.assertNull("Expected null compactor", vcrServer.getVcrReplicationManager().getCloudStorageCompactor()); vcrServer.shutdown(); } @@ -116,6 +117,7 @@ public void testVCRServerWithHelixCluster() throws Exception { new VcrServer(verifiableProperties, mockClusterAgentsFactory, notificationSystem, cloudDestinationFactory, null); vcrServer.startup(); + Assert.assertNotNull("Expected compactor", vcrServer.getVcrReplicationManager().getCloudStorageCompactor()); vcrServer.shutdown(); helixControllerManager.syncStop(); zkInfo.shutdown();