From 6052399ff270282e0475837963dd8f5ba13edd95 Mon Sep 17 00:00:00 2001
From: yl09099
Date: Fri, 23 Aug 2024 14:41:04 +0800
Subject: [PATCH] [Improvement] Quickly delete local or HDFS data at the shuffleId level.

---
 .../server/ShuffleServerGrpcService.java      |  2 +-
 .../uniffle/server/ShuffleTaskManager.java    | 23 ++++-
 .../server/storage/HadoopStorageManager.java  | 76 +++++++++++++-
 .../server/storage/HybridStorageManager.java  |  9 +-
 .../server/storage/LocalStorageManager.java   | 75 +++++++++++++-
 .../server/storage/StorageManager.java        |  9 ++
 .../handler/api/ShuffleDeleteHandler.java     | 11 ++
 .../impl/HadoopShuffleDeleteHandler.java      | 79 ++++++++++++++
 .../handler/impl/LocalFileDeleteHandler.java  | 46 ++++++++
 9 files changed, 323 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index b0fe9d9ef9..5e67d90ba0 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -226,7 +226,7 @@ public void registerShuffle(
       taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
       try {
         long start = System.currentTimeMillis();
-        shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId);
+        shuffleServer.getShuffleTaskManager().quickRemoveShuffleDataSync(appId, shuffleId);
         LOG.info(
             "Deleted the previous stage attempt data due to stage recomputing for app: {}, "
                 + "shuffleId: {}. It costs {} ms",
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 226682e638..4d24b5b763 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -779,6 +779,17 @@ public boolean isAppExpired(String appId) {
    * @param shuffleIds
    */
   public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds) {
+    removeResourcesByShuffleIds(appId, shuffleIds, false);
+  }
+
+  /**
+   * Clear up the partial resources of shuffleIds of App, optionally in quick delete mode.
+   *
+   * @param appId
+   * @param shuffleIds
+   * @param isQuick whether to rename the data first and delete the renamed data asynchronously
+   */
+  public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds, boolean isQuick) {
     Lock writeLock = getAppWriteLock(appId);
     writeLock.lock();
     try {
@@ -811,7 +822,7 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds)
       withTimeoutExecution(
           () -> {
             storageManager.removeResources(
-                new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
+                new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds), isQuick);
             return null;
           },
           storageRemoveOperationTimeoutSec,
@@ -998,6 +1009,16 @@ public void removeShuffleDataSync(String appId, int shuffleId) {
     removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
   }
 
+  /**
+   * Delete all data under the shuffleId using the synchronous quick delete mode.
+   *
+   * @param appId
+   * @param shuffleId
+   */
+  public void quickRemoveShuffleDataSync(String appId, int shuffleId) {
+    removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true);
+  }
+
   public ShuffleDataDistributionType getDataDistributionType(String appId) {
     return shuffleTaskInfos.get(appId).getDataDistType();
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index 33d9b820bc..54465e705f 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -21,10 +21,14 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,6 +42,7 @@
 import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.Checker;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
@@ -64,12 +69,19 @@ public class HadoopStorageManager extends SingleStorageManager {
   private Map<String, HadoopStorage> appIdToStorages = JavaUtils.newConcurrentMap();
   private Map<String, HadoopStorage> pathToStorages = JavaUtils.newConcurrentMap();
   private final boolean isStorageAuditLogEnabled;
+  private final Map<String, Map<String, List<String>>> quickDeletePaths;
+  private ScheduledExecutorService deletePathExecutorService;
 
   HadoopStorageManager(ShuffleServerConf conf) {
     super(conf);
     hadoopConf = conf.getHadoopConf();
     shuffleServerId = conf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID, "shuffleServerId");
     isStorageAuditLogEnabled = conf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
+    this.quickDeletePaths = JavaUtils.newConcurrentMap();
+    deletePathExecutorService =
+        ThreadUtils.getDaemonSingleThreadScheduledExecutor("deleteHadoopPathExecutor");
+    deletePathExecutorService.scheduleAtFixedRate(
+        this::clearQuickDeletePath, 1000 / 2, 1000, TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -98,6 +110,11 @@ public Storage selectStorage(ShuffleDataReadEvent event) {
 
   @Override
   public void removeResources(PurgeEvent event) {
+    removeResources(event, false);
+  }
+
+  @Override
+  public void removeResources(PurgeEvent event, boolean isQuick) {
     String appId = event.getAppId();
     HadoopStorage storage = getStorageByAppId(appId);
     if (storage != null) {
@@ -148,7 +165,26 @@ public void removeResources(PurgeEvent event) {
                   storage.getStoragePath()));
         }
       }
-      deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
+      if (isQuick) {
+        Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths =
+            JavaUtils.newConcurrentMap();
+        userAppIdNeedDeletePaths
+            .computeIfAbsent(event.getUser(), k -> JavaUtils.newConcurrentMap())
+            .computeIfAbsent(appId, k -> Lists.newArrayList())
+            .addAll(deletePaths);
+        // Rename synchronously; the renamed "_tmp" paths are deleted by clearQuickDeletePath.
+        deleteHandler.quickDelete(userAppIdNeedDeletePaths);
+        // Merge under the lock instead of putAll, which would replace (and so lose) the
+        // pending paths of other apps that belong to the same user.
+        synchronized (quickDeletePaths) {
+          quickDeletePaths
+              .computeIfAbsent(event.getUser(), k -> JavaUtils.newConcurrentMap())
+              .computeIfAbsent(appId, k -> Lists.newArrayList())
+              .addAll(deletePaths);
+        }
+      } else {
+        deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
+      }
       removeAppStorageInfo(event);
     } else {
       LOG.warn("Storage gotten is null when removing resources for event: {}", event);
@@ -189,6 +225,44 @@ public Map<String, StorageInfo> getStorageInfo() {
     return Maps.newHashMap();
   }
 
+  /**
+   * Delete the "_tmp" paths produced by quick deletion. The pending paths are drained under
+   * the lock and deleted outside of it, so every path is handled only once.
+   */
+  private void clearQuickDeletePath() {
+    Map<String, Map<String, List<String>>> pendingPaths = JavaUtils.newConcurrentMap();
+    synchronized (quickDeletePaths) {
+      if (quickDeletePaths.isEmpty()) {
+        return;
+      }
+      pendingPaths.putAll(quickDeletePaths);
+      quickDeletePaths.clear();
+    }
+    try {
+      // The renamed paths are on HDFS, so they have to be removed by the Hadoop handler.
+      ShuffleDeleteHandler deleteHandler =
+          ShuffleHandlerFactory.getInstance()
+              .createShuffleDeleteHandler(
+                  new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), hadoopConf));
+      for (Map.Entry<String, Map<String, List<String>>> userAppIdNeedDeletePaths :
+          pendingPaths.entrySet()) {
+        String user = userAppIdNeedDeletePaths.getKey();
+        for (Map.Entry<String, List<String>> appIdNeedDeletePaths :
+            userAppIdNeedDeletePaths.getValue().entrySet()) {
+          String appId = appIdNeedDeletePaths.getKey();
+          List<String> needDeleteTmpPaths =
+              appIdNeedDeletePaths.getValue().stream()
+                  .map(path -> StringUtils.join(path, "_tmp"))
+                  .collect(Collectors.toList());
+          deleteHandler.delete(needDeleteTmpPaths.toArray(new String[0]), appId, user);
+        }
+      }
+    } catch (Exception e) {
+      // An uncaught exception would cancel every later run of this scheduled task.
+      LOG.warn("Failed to delete the renamed shuffle data asynchronously", e);
+    }
+  }
+
   public HadoopStorage getStorageByAppId(String appId) {
     if (!appIdToStorages.containsKey(appId)) {
       synchronized (this) {
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
index c1169f42ff..adeba5b771 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HybridStorageManager.java
@@ -157,9 +157,14 @@ public Map<String, StorageInfo> getStorageInfo() {
   }
 
   public void removeResources(PurgeEvent event) {
+    removeResources(event, false);
+  }
+
+  @Override
+  public void removeResources(PurgeEvent event, boolean isQuick) {
     LOG.info("Start to remove resource of {}", event);
-    warmStorageManager.removeResources(event);
-    coldStorageManager.removeResources(event);
+    warmStorageManager.removeResources(event, isQuick);
+    coldStorageManager.removeResources(event, isQuick);
   }
 
   public StorageManager getColdStorageManager() {
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 10f831ad16..a23c354a9f 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -32,6 +32,8 @@
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -53,6 +55,7 @@
 import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.common.storage.StorageMedia;
 import org.apache.uniffle.common.storage.StorageStatus;
+import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.Checker;
@@ -90,6 +93,8 @@ public class LocalStorageManager extends SingleStorageManager {
   private final List<StorageMediaProvider> typeProviders = Lists.newArrayList();
 
   private final boolean isStorageAuditLogEnabled;
+  private final Map<String, Map<String, List<String>>> quickDeletePaths;
+  private ScheduledExecutorService deletePathExecutorService;
 
   @VisibleForTesting
   LocalStorageManager(ShuffleServerConf conf) {
@@ -175,6 +180,11 @@ public class LocalStorageManager extends SingleStorageManager {
             localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList())));
     this.checker = new LocalStorageChecker(conf, localStorages);
     isStorageAuditLogEnabled = conf.getBoolean(ShuffleServerConf.SERVER_STORAGE_AUDIT_LOG_ENABLED);
+    this.quickDeletePaths = JavaUtils.newConcurrentMap();
+    deletePathExecutorService =
+        ThreadUtils.getDaemonSingleThreadScheduledExecutor("deleteLocalPathExecutor");
+    deletePathExecutorService.scheduleAtFixedRate(
+        this::clearQuickDeletePath, 1000 / 2, 1000, TimeUnit.MILLISECONDS);
   }
 
   private StorageMedia getStorageTypeForBasePath(String basePath) {
@@ -266,6 +276,11 @@ public Checker getStorageChecker() {
 
   @Override
   public void removeResources(PurgeEvent event) {
+    removeResources(event, false);
+  }
+
+  @Override
+  public void removeResources(PurgeEvent event, boolean isQuick) {
     String appId = event.getAppId();
     String user = event.getUser();
     List<Integer> shuffleSet =
@@ -327,11 +342,67 @@ public void removeResources(PurgeEvent event) {
                   }
                 })
             .collect(Collectors.toList());
-
-    deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
+    if (isQuick) {
+      Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths =
+          JavaUtils.newConcurrentMap();
+      userAppIdNeedDeletePaths
+          .computeIfAbsent(user, k -> JavaUtils.newConcurrentMap())
+          .computeIfAbsent(appId, k -> Lists.newArrayList())
+          .addAll(deletePaths);
+      // Rename synchronously; the renamed "_tmp" paths are deleted by clearQuickDeletePath.
+      deleteHandler.quickDelete(userAppIdNeedDeletePaths);
+      // Merge under the lock instead of putAll, which would replace (and so lose) the
+      // pending paths of other apps that belong to the same user.
+      synchronized (quickDeletePaths) {
+        quickDeletePaths
+            .computeIfAbsent(user, k -> JavaUtils.newConcurrentMap())
+            .computeIfAbsent(appId, k -> Lists.newArrayList())
+            .addAll(deletePaths);
+      }
+    } else {
+      deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
+    }
     removeAppStorageInfo(event);
   }
 
+  /**
+   * Delete the "_tmp" paths produced by quick deletion. The pending paths are drained under
+   * the lock and deleted outside of it, so every path is handled only once.
+   */
+  private void clearQuickDeletePath() {
+    Map<String, Map<String, List<String>>> pendingPaths = JavaUtils.newConcurrentMap();
+    synchronized (quickDeletePaths) {
+      if (quickDeletePaths.isEmpty()) {
+        return;
+      }
+      pendingPaths.putAll(quickDeletePaths);
+      quickDeletePaths.clear();
+    }
+    try {
+      ShuffleDeleteHandler deleteHandler =
+          ShuffleHandlerFactory.getInstance()
+              .createShuffleDeleteHandler(
+                  new CreateShuffleDeleteHandlerRequest(
+                      StorageType.LOCALFILE.name(), new Configuration()));
+      for (Map.Entry<String, Map<String, List<String>>> userAppIdNeedDeletePaths :
+          pendingPaths.entrySet()) {
+        String user = userAppIdNeedDeletePaths.getKey();
+        for (Map.Entry<String, List<String>> appIdNeedDeletePaths :
+            userAppIdNeedDeletePaths.getValue().entrySet()) {
+          String appId = appIdNeedDeletePaths.getKey();
+          List<String> needDeleteTmpPaths =
+              appIdNeedDeletePaths.getValue().stream()
+                  .map(path -> StringUtils.join(path, "_tmp"))
+                  .collect(Collectors.toList());
+          deleteHandler.delete(needDeleteTmpPaths.toArray(new String[0]), appId, user);
+        }
+      }
+    } catch (Exception e) {
+      // An uncaught exception would cancel every later run of this scheduled task.
+      LOG.warn("Failed to delete the renamed shuffle data asynchronously", e);
+    }
+  }
+
   private void cleanupStorageSelectionCache(PurgeEvent event) {
     Function<String, Boolean> deleteConditionFunc = null;
     String prefixKey = null;
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 402edc2cd0..16058f385a 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -43,6 +43,15 @@ public interface StorageManager {
 
   void removeResources(PurgeEvent event);
 
+  /**
+   * Remove the resources described by the purge event.
+   *
+   * @param event the purge event
+   * @param isQuick if true, the data is renamed first and the renamed data is deleted
+   *     asynchronously; if false, the data is deleted synchronously
+   */
+  void removeResources(PurgeEvent event, boolean isQuick);
+
   void start();
 
   void stop();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
index c0b32b9432..bd28281965 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
@@ -17,6 +17,9 @@
 
 package org.apache.uniffle.storage.handler.api;
 
+import java.util.List;
+import java.util.Map;
+
 public interface ShuffleDeleteHandler {
 
   /**
@@ -25,4 +28,12 @@ public interface ShuffleDeleteHandler {
    * @param appId ApplicationId for delete
    */
   void delete(String[] storageBasePaths, String appId, String user);
+
+  /**
+   * Rename every given path to the same path with a "_tmp" suffix, so that the caller can
+   * delete the renamed path asynchronously.
+   *
+   * @param userAppIdNeedDeletePaths the paths to rename, grouped by user and then by appId
+   */
+  void quickDelete(Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths);
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
index 312f052f9a..e1e8b338e0 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
@@ -19,7 +19,10 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
@@ -118,4 +121,80 @@ private void delete(FileSystem fileSystem, Path path, String filePrefix) throws
       fileSystem.delete(path, true);
     }
   }
+
+  /**
+   * Rename every given path to the same path with a "_tmp" suffix. Each rename is attempted
+   * at most 5 times; a path that no longer exists is skipped without affecting the others.
+   *
+   * @param userAppIdNeedDeletePaths the paths to rename, grouped by user and then by appId
+   */
+  @Override
+  public void quickDelete(Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths) {
+    for (Map.Entry<String, Map<String, List<String>>> userAppIdNeedDeletePath :
+        userAppIdNeedDeletePaths.entrySet()) {
+      String user = userAppIdNeedDeletePath.getKey();
+      for (Map.Entry<String, List<String>> appIdNeedDeletePaths :
+          userAppIdNeedDeletePath.getValue().entrySet()) {
+        String appId = appIdNeedDeletePaths.getKey();
+        for (String needDeletePath : appIdNeedDeletePaths.getValue()) {
+          renameToTmpPath(user, appId, needDeletePath);
+        }
+      }
+    }
+  }
+
+  // Renames one path to its "_tmp" sibling, retrying failed attempts up to retryMax times.
+  private void renameToTmpPath(String user, String appId, String needDeletePath) {
+    final Path path = new Path(needDeletePath);
+    final Path breakdownPathFolder = new Path(StringUtils.join(needDeletePath, "_tmp"));
+    final int retryMax = 5;
+    int times = 0;
+    long start = System.currentTimeMillis();
+    LOG.info(
+        "Try rename shuffle data in Hadoop FS for appId[{}] of user[{}] with {}",
+        appId,
+        user,
+        path);
+    while (times < retryMax) {
+      try {
+        FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
+        // FileSystem#rename reports most failures by returning false instead of throwing.
+        if (fileSystem.rename(path, breakdownPathFolder)) {
+          LOG.info(
+              "Rename shuffle data in Hadoop FS for appId[{}] with {} successfully in {} ms",
+              appId,
+              path,
+              (System.currentTimeMillis() - start));
+          return;
+        }
+        if (!fileSystem.exists(path)) {
+          LOG.info("[{}] doesn't exist, ignore it.", path);
+          return;
+        }
+        times++;
+        LOG.warn("Can't rename shuffle data for appId[{}] with {} times", appId, times);
+      } catch (FileNotFoundException e) {
+        LOG.info("[{}] doesn't exist, ignore it.", path);
+        return;
+      } catch (Exception e) {
+        times++;
+        LOG.warn("Can't rename shuffle data for appId[{}] with {} times", appId, times, e);
+      }
+      if (times < retryMax) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          // Keep the interrupt status for the caller and give up on this path.
+          Thread.currentThread().interrupt();
+          LOG.warn("Interrupted while waiting to retry the rename of {}", path);
+          return;
+        }
+      }
+    }
+    LOG.warn(
+        "Failed to rename shuffle data in Hadoop FS for appId[{}] with {} in {} ms",
+        appId,
+        path,
+        (System.currentTimeMillis() - start));
+  }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
index 277fb18d7a..5a02dd170f 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
@@ -18,8 +18,11 @@
 package org.apache.uniffle.storage.handler.impl;
 
 import java.io.File;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,4 +53,47 @@ public void delete(String[] shuffleDataStoredPath, String appId, String user) {
       }
     }
   }
+
+  @Override
+  public void quickDelete(Map<String, Map<String, List<String>>> userAppIdNeedDeletePaths) {
+    for (Map.Entry<String, Map<String, List<String>>> userAppIdNeedDeletePath :
+        userAppIdNeedDeletePaths.entrySet()) {
+      for (Map.Entry<String, List<String>> appIdNeedDeletePaths :
+          userAppIdNeedDeletePath.getValue().entrySet()) {
+        String appId = appIdNeedDeletePaths.getKey();
+        List<String> needDeletePaths = appIdNeedDeletePaths.getValue();
+        for (String needDeletePath : needDeletePaths) {
+          String shufflePath = needDeletePath;
+          String breakdownShufflePath = StringUtils.join(needDeletePath, "_tmp");
+          long start = System.currentTimeMillis();
+          try {
+            File baseFolder = new File(shufflePath);
+            File breakdownBaseFolder = new File(breakdownShufflePath);
+            boolean isSuccess = baseFolder.renameTo(breakdownBaseFolder);
+            if (isSuccess) {
+              LOG.info(
+                  "Rename shuffle data for appId[{}] with {} to {} cost {} ms",
+                  appId,
+                  shufflePath,
+                  breakdownShufflePath,
+                  (System.currentTimeMillis() - start));
+            } else {
+              LOG.warn(
+                  "Can't Rename shuffle data for appId[{}] with {} to {}",
+                  appId,
+                  shufflePath,
+                  breakdownShufflePath);
+            }
+          } catch (Exception e) {
+            LOG.warn(
+                "Can't Rename shuffle data for appId[{}] with {} to {}",
+                appId,
+                shufflePath,
+                breakdownShufflePath,
+                e);
+          }
+        }
+      }
+    }
+  }
 }