diff --git a/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java new file mode 100644 index 0000000000..df0e815ebc --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerExtension.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.log; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import javax.annotation.concurrent.GuardedBy; + +import org.apache.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.Logger; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +public class TestLoggerExtension implements BeforeEachCallback, AfterEachCallback { + private static final ExtensionContext.Namespace NAMESPACE = + ExtensionContext.Namespace.create(TestLoggerExtension.class); + private static final String LOG_COLLECTOR_KEY = "TestLogAppender"; + private TestAppender appender; + + @Override + public void beforeEach(ExtensionContext context) { + appender = new TestAppender(); + if (LogManager.getLogger(LogManager.ROOT_LOGGER_NAME) instanceof Logger) { + org.apache.logging.log4j.core.Logger log4jlogger = + (org.apache.logging.log4j.core.Logger) + org.apache.logging.log4j.LogManager.getLogger(LogManager.ROOT_LOGGER_NAME); + appender.start(); + log4jlogger.addAppender(appender); + } + context.getStore(NAMESPACE).put(LOG_COLLECTOR_KEY, this); + } + + @Override + public void afterEach(ExtensionContext context) { + if (LogManager.getLogger(LogManager.ROOT_LOGGER_NAME) instanceof Logger) { + Logger log4jlogger = (Logger) LogManager.getLogger(LogManager.ROOT_LOGGER_NAME); + appender.stop(); + log4jlogger.removeAppender(appender); + } + } + + public static TestLoggerExtension getTestLogger(ExtensionContext context) { + return context.getStore(NAMESPACE).get(LOG_COLLECTOR_KEY, TestLoggerExtension.class); + } + + /** + * Determine if a specific pattern appears in log output. + * + * @param pattern a pattern text to search for in log events + * @return true if a log message containing the pattern exists, false otherwise + */ + public boolean wasLogged(String pattern) { + return appender.wasLogged(Pattern.compile(".*" + pattern + ".*")); + } + + /** + * Determine if a specific pattern appears in log output with the specified level. + * + * @param pattern a pattern text to search for in log events + * @return true if a log message containing the pattern exists, false otherwise + */ + public boolean wasLoggedWithLevel(String pattern, Level level) { + return appender.wasLoggedWithLevel(Pattern.compile(".*" + pattern + ".*"), level); + } + + /** + * Count the number of times a specific pattern appears in log messages. + * + * @param pattern Pattern to search for in log events + * @return The number of log messages which match the pattern + */ + public int logCount(String pattern) { + // [\s\S] will match all character include line break + return appender.logCount(Pattern.compile("[\\s\\S]*" + pattern + "[\\s\\S]*")); + } + + public class TestAppender extends AbstractAppender { + @GuardedBy("this") private final List events = new ArrayList<>(); + + protected TestAppender() { + super("", null, null, false, null); + } + + /** Determines whether a message with the given pattern was logged. */ + public synchronized boolean wasLogged(Pattern pattern) { + for (LogEvent e : events) { + if (pattern.matcher(e.getMessage().getFormattedMessage()).matches()) { + return true; + } + } + return false; + } + + /** Determines whether a message with the given pattern was logged. */ + public synchronized boolean wasLoggedWithLevel(Pattern pattern, Level level) { + for (LogEvent e : events) { + if (e.getLevel().equals(level) + && pattern.matcher(e.getMessage().getFormattedMessage()).matches()) { + return true; + } + } + return false; + } + + /** Counts the number of log message with a given pattern. */ + public synchronized int logCount(Pattern pattern) { + int logCount = 0; + for (LogEvent e : events) { + if (pattern.matcher(e.getMessage().getFormattedMessage()).matches()) { + logCount++; + } + } + return logCount; + } + + @Override + public void append(LogEvent event) { + events.add(event); + } + } +} diff --git a/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java new file mode 100644 index 0000000000..10fba0021a --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/log/TestLoggerParamResolver.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.log; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +public class TestLoggerParamResolver implements ParameterResolver { + @Override + public boolean supportsParameter( + final ParameterContext parameterContext, final ExtensionContext extensionContext) + throws ParameterResolutionException { + return ExtensionContext.class.isAssignableFrom(parameterContext.getParameter().getType()) + && parameterContext.getIndex() == 0; + } + + @Override + public Object resolveParameter( + final ParameterContext parameterContext, final ExtensionContext extensionContext) + throws ParameterResolutionException { + return extensionContext; + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 226682e638..b482584236 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -832,8 +832,8 @@ public void removeResourcesByShuffleIds(String appId, List shuffleIds) public void checkLeakShuffleData() { LOG.info("Start check leak shuffle data"); try { - Set appIds = Sets.newHashSet(shuffleTaskInfos.keySet()); - storageManager.checkAndClearLeakedShuffleData(appIds); + storageManager.checkAndClearLeakedShuffleData( + () -> Sets.newHashSet(shuffleTaskInfos.keySet())); LOG.info("Finish check leak shuffle data"); } catch (Exception e) { LOG.warn("Error happened in checkLeakShuffleData", e); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java index 33d9b820bc..99c055e5fa 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; @@ -181,7 +182,7 @@ public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageI } @Override - public void checkAndClearLeakedShuffleData(Collection appIds) {} + public void checkAndClearLeakedShuffleData(Supplier> appIdsSupplier) {} @Override public Map getStorageInfo() { diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java index c1169f42ff..05b83e558f 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java @@ -20,6 +20,7 @@ import java.lang.reflect.Constructor; import java.util.Collection; import java.util.Map; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,8 +146,8 @@ public boolean canWrite(ShuffleDataFlushEvent event) { } @Override - public void checkAndClearLeakedShuffleData(Collection appIds) { - warmStorageManager.checkAndClearLeakedShuffleData(appIds); + public void checkAndClearLeakedShuffleData(Supplier> appIdsSupplier) { + warmStorageManager.checkAndClearLeakedShuffleData(appIdsSupplier); } @Override diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 10f831ad16..6be2af7406 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -375,7 +376,7 @@ public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageI } @Override - public void checkAndClearLeakedShuffleData(Collection appIds) { + public void checkAndClearLeakedShuffleData(Supplier> appIdsSupplier) { Set appIdsOnStorages = new HashSet<>(); for (LocalStorage localStorage : localStorages) { if (!localStorage.isCorrupted()) { @@ -384,6 +385,7 @@ public void checkAndClearLeakedShuffleData(Collection appIds) { } } + Collection appIds = appIdsSupplier.get(); for (String appId : appIdsOnStorages) { if (!appIds.contains(appId)) { ShuffleDeleteHandler deleteHandler = diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java index 402edc2cd0..70425a22d8 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.Map; +import java.util.function.Supplier; import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.storage.StorageInfo; @@ -55,7 +56,7 @@ public interface StorageManager { // todo: add an interface that check storage isHealthy - void checkAndClearLeakedShuffleData(Collection appIds); + void checkAndClearLeakedShuffleData(Supplier> appIdsSupplier); /** * Report a map of storage mount point -> storage info mapping. For local storages, the mount diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java index 5584e2609f..63c5c12dda 100644 --- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java @@ -24,6 +24,7 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,29 +33,40 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; import org.apache.uniffle.common.ShufflePartitionedBlock; +import org.apache.uniffle.common.log.TestLoggerExtension; +import org.apache.uniffle.common.log.TestLoggerParamResolver; import org.apache.uniffle.common.storage.StorageInfo; import org.apache.uniffle.common.storage.StorageMedia; import org.apache.uniffle.common.storage.StorageStatus; +import org.apache.uniffle.common.util.JavaUtils; import org.apache.uniffle.server.ShuffleDataFlushEvent; import org.apache.uniffle.server.ShuffleDataReadEvent; import org.apache.uniffle.server.ShuffleServerConf; import org.apache.uniffle.server.ShuffleServerMetrics; +import org.apache.uniffle.server.ShuffleTaskInfo; import org.apache.uniffle.storage.common.LocalStorage; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static uk.org.webcompere.systemstubs.SystemStubs.withEnvironmentVariables; /** The class is to test the {@link LocalStorageManager} */ +@ExtendWith(TestLoggerExtension.class) +@ExtendWith(TestLoggerParamResolver.class) public class LocalStorageManagerTest { @BeforeAll @@ -332,4 +344,38 @@ public void testEnvStorageTypeProvider() throws Exception { assertEquals(StorageMedia.SSD, storageInfo.get(mountPoint).getType()); }); } + + @Test + public void testNewAppWhileCheckLeak(ExtensionContext context) { + String[] storagePaths = {"/tmp/rss-data1"}; + + ShuffleServerConf conf = new ShuffleServerConf(); + conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths)); + conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L); + conf.setString( + ShuffleServerConf.RSS_STORAGE_TYPE.key(), + org.apache.uniffle.storage.util.StorageType.LOCALFILE.name()); + LocalStorageManager localStorageManager = new LocalStorageManager(conf); + + List storages = localStorageManager.getStorages(); + assertNotNull(storages); + + // test normal case + Map shuffleTaskInfos = JavaUtils.newConcurrentMap(); + shuffleTaskInfos.put("app0", new ShuffleTaskInfo("app0")); + shuffleTaskInfos.put("app1", new ShuffleTaskInfo("app1")); + shuffleTaskInfos.put("app2", new ShuffleTaskInfo("app2")); + localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet); + TestLoggerExtension testLogger = TestLoggerExtension.getTestLogger(context); + assertFalse(testLogger.wasLogged("app")); + + // test race condition case, app 3 is new app + shuffleTaskInfos.put("3", new ShuffleTaskInfo("app3")); + LocalStorage mockLocalStorage = mock(LocalStorage.class); + when(mockLocalStorage.getAppIds()).thenReturn(Collections.singleton("app3")); + storages.add(mockLocalStorage); + localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet); + assertTrue(testLogger.wasLogged("Delete shuffle data for appId\\[app3\\]")); + System.out.println(); + } }