Skip to content

Commit

Permalink
[#1972] fix(server): Fix clear leak shuffle data accidentally remove …
Browse files Browse the repository at this point in the history
…data of new coming appId issue (#1971)

### What changes were proposed in this pull request?

Fix an issue where clearing leaked shuffle data could accidentally remove the data of a newly arrived appId.

### Why are the changes needed?

Fix: #1972

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No need.
  • Loading branch information
maobaolong authored Aug 29, 2024
1 parent e62fe7c commit 0f9d9bc
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.log;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.concurrent.GuardedBy;

import org.apache.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

/**
 * JUnit 5 extension that attaches an in-memory appender to the Log4j 2 root logger before each
 * test and detaches it afterwards, so a test can assert on the messages that were logged.
 *
 * <p>The extension stores itself in the {@link ExtensionContext} store; tests obtain it through
 * {@link #getTestLogger(ExtensionContext)}.
 */
public class TestLoggerExtension implements BeforeEachCallback, AfterEachCallback {
  private static final ExtensionContext.Namespace NAMESPACE =
      ExtensionContext.Namespace.create(TestLoggerExtension.class);
  private static final String LOG_COLLECTOR_KEY = "TestLogAppender";
  private TestAppender appender;

  /**
   * Creates a fresh appender, attaches it to the root logger when that logger is a Log4j 2 core
   * logger, and publishes this extension in the context store.
   *
   * @param context the context of the test about to run
   */
  @Override
  public void beforeEach(ExtensionContext context) {
    appender = new TestAppender();
    org.apache.logging.log4j.Logger rootLogger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
    if (rootLogger instanceof Logger) {
      appender.start();
      ((Logger) rootLogger).addAppender(appender);
    }
    context.getStore(NAMESPACE).put(LOG_COLLECTOR_KEY, this);
  }

  /**
   * Detaches the appender from the root logger and stops it.
   *
   * @param context the context of the test that just ran
   */
  @Override
  public void afterEach(ExtensionContext context) {
    if (appender == null) {
      // beforeEach never got as far as creating the appender; nothing to clean up.
      return;
    }
    org.apache.logging.log4j.Logger rootLogger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
    if (rootLogger instanceof Logger) {
      // Detach first so that no event is routed to an appender that is already stopped.
      ((Logger) rootLogger).removeAppender(appender);
      appender.stop();
    }
  }

  /**
   * Looks up the extension instance registered for the given context.
   *
   * @param context the extension context of the running test
   * @return the extension stored by {@link #beforeEach(ExtensionContext)}, or {@code null} if the
   *     store holds no value for the key
   */
  public static TestLoggerExtension getTestLogger(ExtensionContext context) {
    return context.getStore(NAMESPACE).get(LOG_COLLECTOR_KEY, TestLoggerExtension.class);
  }

  /**
   * Determine if a specific pattern appears in log output.
   *
   * @param pattern a pattern text to search for in log events
   * @return true if a log message containing the pattern exists, false otherwise
   */
  public boolean wasLogged(String pattern) {
    return appender.wasLogged(containing(pattern));
  }

  /**
   * Determine if a specific pattern appears in log output with the specified level.
   *
   * @param pattern a pattern text to search for in log events
   * @param level the level the matching event must have been logged at
   * @return true if a log message containing the pattern exists, false otherwise
   */
  public boolean wasLoggedWithLevel(String pattern, Level level) {
    return appender.wasLoggedWithLevel(containing(pattern), level);
  }

  /**
   * Count the number of times a specific pattern appears in log messages.
   *
   * @param pattern Pattern to search for in log events
   * @return The number of log messages which match the pattern
   */
  public int logCount(String pattern) {
    return appender.logCount(containing(pattern));
  }

  /**
   * Wraps a pattern so that it matches anywhere inside a message, including messages that span
   * several lines.
   */
  private static Pattern containing(String pattern) {
    // [\s\S] will match all character include line break
    return Pattern.compile("[\\s\\S]*" + pattern + "[\\s\\S]*");
  }

  /** Appender that keeps every received event in memory for later inspection. */
  public class TestAppender extends AbstractAppender {
    @GuardedBy("this")
    private final List<LogEvent> events = new ArrayList<>();

    protected TestAppender() {
      super("", null, null, false, null);
    }

    /** Determines whether a message with the given pattern was logged. */
    public synchronized boolean wasLogged(Pattern pattern) {
      for (LogEvent e : events) {
        if (matches(pattern, e)) {
          return true;
        }
      }
      return false;
    }

    /** Determines whether a message with the given pattern was logged at the given level. */
    public synchronized boolean wasLoggedWithLevel(Pattern pattern, Level level) {
      if (level == null) {
        return false;
      }
      // The parameter is a log4j 1.x Level while events carry a Log4j 2 Level; the two classes
      // are never equal to each other, so the comparison is done on the level name.
      String levelName = level.toString();
      for (LogEvent e : events) {
        if (e.getLevel().name().equals(levelName) && matches(pattern, e)) {
          return true;
        }
      }
      return false;
    }

    /** Counts the number of log message with a given pattern. */
    public synchronized int logCount(Pattern pattern) {
      int logCount = 0;
      for (LogEvent e : events) {
        if (matches(pattern, e)) {
          logCount++;
        }
      }
      return logCount;
    }

    /** Records the event; synchronized because {@code events} is guarded by this monitor. */
    @Override
    public synchronized void append(LogEvent event) {
      // Keep an immutable snapshot: the event instance handed to an appender may be reused.
      events.add(event.toImmutable());
    }

    /** Tests the formatted message of an event against the whole pattern. */
    private boolean matches(Pattern pattern, LogEvent event) {
      return pattern.matcher(event.getMessage().getFormattedMessage()).matches();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.log;

import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;

/**
 * Parameter resolver that injects the current {@link ExtensionContext} into a method whose first
 * parameter is declared with a type assignable from {@link ExtensionContext}.
 */
public class TestLoggerParamResolver implements ParameterResolver {

  /**
   * Accepts only the parameter at index 0, and only when its declared type can hold an {@link
   * ExtensionContext}.
   *
   * @param parameterContext describes the parameter being resolved
   * @param extensionContext the context of the current test
   * @return true if this resolver can supply the parameter
   */
  @Override
  public boolean supportsParameter(
      final ParameterContext parameterContext, final ExtensionContext extensionContext)
      throws ParameterResolutionException {
    if (parameterContext.getIndex() != 0) {
      return false;
    }
    final Class<?> declaredType = parameterContext.getParameter().getType();
    return ExtensionContext.class.isAssignableFrom(declaredType);
  }

  /**
   * Supplies the extension context itself as the parameter value.
   *
   * @param parameterContext describes the parameter being resolved
   * @param extensionContext the context of the current test
   * @return the given {@code extensionContext}
   */
  @Override
  public Object resolveParameter(
      final ParameterContext parameterContext, final ExtensionContext extensionContext)
      throws ParameterResolutionException {
    return extensionContext;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,8 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds)
public void checkLeakShuffleData() {
LOG.info("Start check leak shuffle data");
try {
Set<String> appIds = Sets.newHashSet(shuffleTaskInfos.keySet());
storageManager.checkAndClearLeakedShuffleData(appIds);
storageManager.checkAndClearLeakedShuffleData(
() -> Sets.newHashSet(shuffleTaskInfos.keySet()));
LOG.info("Finish check leak shuffle data");
} catch (Exception e) {
LOG.warn("Error happened in checkLeakShuffleData", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -181,7 +182,7 @@ public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageI
}

@Override
public void checkAndClearLeakedShuffleData(Collection<String> appIds) {}
public void checkAndClearLeakedShuffleData(Supplier<Collection<String>> appIdsSupplier) {}

@Override
public Map<String, StorageInfo> getStorageInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.Map;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -145,8 +146,8 @@ public boolean canWrite(ShuffleDataFlushEvent event) {
}

@Override
public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
warmStorageManager.checkAndClearLeakedShuffleData(appIds);
public void checkAndClearLeakedShuffleData(Supplier<Collection<String>> appIdsSupplier) {
warmStorageManager.checkAndClearLeakedShuffleData(appIdsSupplier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -375,7 +376,7 @@ public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageI
}

@Override
public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
public void checkAndClearLeakedShuffleData(Supplier<Collection<String>> appIdsSupplier) {
Set<String> appIdsOnStorages = new HashSet<>();
for (LocalStorage localStorage : localStorages) {
if (!localStorage.isCorrupted()) {
Expand All @@ -384,6 +385,7 @@ public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
}
}

Collection<String> appIds = appIdsSupplier.get();
for (String appId : appIdsOnStorages) {
if (!appIds.contains(appId)) {
ShuffleDeleteHandler deleteHandler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Collection;
import java.util.Map;
import java.util.function.Supplier;

import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.storage.StorageInfo;
Expand Down Expand Up @@ -55,7 +56,7 @@ public interface StorageManager {

// todo: add an interface that check storage isHealthy

void checkAndClearLeakedShuffleData(Collection<String> appIds);
void checkAndClearLeakedShuffleData(Supplier<Collection<String>> appIdsSupplier);

/**
* Report a map of storage mount point -> storage info mapping. For local storages, the mount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -32,29 +33,40 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;

import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.log.TestLoggerExtension;
import org.apache.uniffle.common.log.TestLoggerParamResolver;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskInfo;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static uk.org.webcompere.systemstubs.SystemStubs.withEnvironmentVariables;

/** The class is to test the {@link LocalStorageManager} */
@ExtendWith(TestLoggerExtension.class)
@ExtendWith(TestLoggerParamResolver.class)
public class LocalStorageManagerTest {

@BeforeAll
Expand Down Expand Up @@ -332,4 +344,38 @@ public void testEnvStorageTypeProvider() throws Exception {
assertEquals(StorageMedia.SSD, storageInfo.get(mountPoint).getType());
});
}

/**
 * Runs the leaked-shuffle-data check with the app id collection supplied lazily, once when no
 * storage reports any app and once when a mocked storage reports an app id that is missing from
 * the supplied collection.
 */
@Test
public void testNewAppWhileCheckLeak(ExtensionContext context) {
  String[] storagePaths = {"/tmp/rss-data1"};

  ShuffleServerConf conf = new ShuffleServerConf();
  conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths));
  conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
  conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
  LocalStorageManager localStorageManager = new LocalStorageManager(conf);

  List<LocalStorage> storages = localStorageManager.getStorages();
  assertNotNull(storages);

  // test normal case
  Map<String, ShuffleTaskInfo> shuffleTaskInfos = JavaUtils.newConcurrentMap();
  shuffleTaskInfos.put("app0", new ShuffleTaskInfo("app0"));
  shuffleTaskInfos.put("app1", new ShuffleTaskInfo("app1"));
  shuffleTaskInfos.put("app2", new ShuffleTaskInfo("app2"));
  localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet);
  TestLoggerExtension testLogger = TestLoggerExtension.getTestLogger(context);
  // No captured message may mention any app at this point.
  assertFalse(testLogger.wasLogged("app"));

  // test race condition case, app 3 is new app
  // NOTE(review): the map key is "3" while the task info is created for "app3", so the supplied
  // key set does not contain "app3". The assertion below relies on that mismatch; confirm it is
  // intentional before "correcting" the key.
  shuffleTaskInfos.put("3", new ShuffleTaskInfo("app3"));
  LocalStorage mockLocalStorage = mock(LocalStorage.class);
  when(mockLocalStorage.getAppIds()).thenReturn(Collections.singleton("app3"));
  storages.add(mockLocalStorage);
  localStorageManager.checkAndClearLeakedShuffleData(shuffleTaskInfos::keySet);
  assertTrue(testLogger.wasLogged("Delete shuffle data for appId\\[app3\\]"));
}
}

0 comments on commit 0f9d9bc

Please sign in to comment.