diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 695b635919..a536838937 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -253,7 +253,9 @@ public void registerShuffle(
         taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
         try {
           long start = System.currentTimeMillis();
-          shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId);
+          shuffleServer
+              .getShuffleTaskManager()
+              .removeShuffleDataSyncTwoPhases(appId, shuffleId);
           LOG.info(
               "Deleted the previous stage attempt data due to stage recomputing for app: {}, "
                   + "shuffleId: {}. It costs {} ms",
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 22d9b7c0ac..fb5ec10aea 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -170,6 +170,11 @@ public class ShuffleServerMetrics {
   public static final String REPORTED_BLOCK_COUNT = "reported_block_count";
   public static final String CACHED_BLOCK_COUNT = "cached_block_count";
 
+  private static final String TOTAL_HADOOP_TWO_PHASES_DELETION_FAILED =
+      "total_hadoop_two_phases_deletion_failed";
+  private static final String TOTAL_LOCAL_TWO_PHASES_DELETION_FAILED =
+      "total_local_two_phases_deletion_failed";
+
   public static Counter.Child counterTotalAppNum;
   public static Counter.Child counterTotalAppWithHugePartitionNum;
   public static Counter.Child counterTotalPartitionNum;
@@ -245,6 +250,8 @@ public class ShuffleServerMetrics {
   public static Gauge.Child gaugeReadLocalDataFileBufferSize;
   public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
   public static Gauge.Child gaugeReadMemoryDataBufferSize;
+  public static Counter.Child counterLocalTwoPhasesDeletionFailed;
+  public static Counter.Child counterHadoopTwoPhasesDeletionFailed;
 
   public static Gauge gaugeTotalDataSizeUsage;
   public static Gauge gaugeInMemoryDataSizeUsage;
@@ -440,6 +447,10 @@ private static void setUpMetrics(ShuffleServerConf serverConf) {
     counterTotalHugePartitionNum = metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
     counterTotalHugePartitionExceedHardLimitNum =
         metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM);
+    counterLocalTwoPhasesDeletionFailed =
+        metricsManager.addLabeledCounter(TOTAL_LOCAL_TWO_PHASES_DELETION_FAILED);
+    counterHadoopTwoPhasesDeletionFailed =
+        metricsManager.addLabeledCounter(TOTAL_HADOOP_TWO_PHASES_DELETION_FAILED);
 
     gaugeLocalStorageIsWritable =
         metricsManager.addGauge(LOCAL_STORAGE_IS_WRITABLE, LOCAL_DISK_PATH_LABEL);
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index dbc94c0072..53c5130a18 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -814,6 +814,19 @@ public boolean isAppExpired(String appId) {
    * @param shuffleIds
    */
   public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds) {
+    removeResourcesByShuffleIds(appId, shuffleIds, false);
+  }
+
+  /**
+   * Removes the resources of the given shuffles, optionally using two-phases deletion.
+   *
+   * @param appId the application that owns the shuffles
+   * @param shuffleIds the shuffles whose resources are removed
+   * @param isTwoPhases whether the storage layer should rename the data first and delete it
+   *     asynchronously instead of deleting it inline
+   */
+  public void removeResourcesByShuffleIds(
+      String appId, List<Integer> shuffleIds, boolean isTwoPhases) {
     Lock writeLock = getAppWriteLock(appId);
     writeLock.lock();
     try {
@@ -846,7 +859,7 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds)
       withTimeoutExecution(
           () -> {
             storageManager.removeResources(
-                new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
+                new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isTwoPhases));
             return null;
           },
           storageRemoveOperationTimeoutSec,
@@ -1054,6 +1067,11 @@ public void removeShuffleDataSync(String appId, int shuffleId) {
     removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
   }
 
+  /** Same as removeShuffleDataSync, but asks the storage layer for two-phases deletion. */
+  public void removeShuffleDataSyncTwoPhases(String appId, int shuffleId) {
+    removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true);
+  }
+
   public ShuffleDataDistributionType getDataDistributionType(String appId) {
     return shuffleTaskInfos.get(appId).getDataDistType();
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
index eb555f7edf..ec9fa283da 100644
--- a/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
@@ -25,11 +25,19 @@ public abstract class PurgeEvent {
   private String appId;
   private String user;
   private List<Integer> shuffleIds;
+  // Whether to use two-phases deletion: rename the data first, then delete it asynchronously.
+  private boolean isTwoPhasesDeletion;
 
   public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
+    this(appId, user, shuffleIds, false);
+  }
+
+  public PurgeEvent(
+      String appId, String user, List<Integer> shuffleIds, boolean isTwoPhasesDeletion) {
     this.appId = appId;
     this.user = user;
     this.shuffleIds = shuffleIds;
+    this.isTwoPhasesDeletion = isTwoPhasesDeletion;
   }
 
   public String getAppId() {
@@ -44,6 +52,10 @@ public List<Integer> getShuffleIds() {
     return shuffleIds;
   }
 
+  public boolean isTwoPhasesDeletion() {
+    return isTwoPhasesDeletion;
+  }
+
   @Override
   public String toString() {
     return this.getClass().getSimpleName()
@@ -56,6 +68,8 @@ public String toString() {
         + '\''
         + ", shuffleIds="
         + shuffleIds
+        + ", isTwoPhasesDeletion="
+        + isTwoPhasesDeletion
         + '}';
   }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
index cbc39aab84..32750990b0 100644
--- a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
@@ -22,6 +22,11 @@
 public class ShufflePurgeEvent extends PurgeEvent {
 
   public ShufflePurgeEvent(String appId, String user, List<Integer> shuffleIds) {
-    super(appId, user, shuffleIds);
+    this(appId, user, shuffleIds, false);
+  }
+
+  public ShufflePurgeEvent(
+      String appId, String user, List<Integer> shuffleIds, boolean isTwoPhases) {
+    super(appId, user, shuffleIds, isTwoPhases);
   }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index adbbc594cc..658cf04184 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -114,7 +114,8 @@ public void removeResources(PurgeEvent event) {
               new CreateShuffleDeleteHandlerRequest(
                   StorageType.HDFS.name(),
                   storage.getConf(),
-                  purgeForExpired ? shuffleServerId : null));
+                  purgeForExpired ? shuffleServerId : null,
+                  event.isTwoPhasesDeletion()));
 
       String basicPath =
           ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
@@ -149,7 +150,11 @@ public void removeResources(PurgeEvent event) {
                   storage.getStoragePath()));
         }
       }
-      deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
+      boolean isSuccess =
+          deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
+      if (!isSuccess && event.isTwoPhasesDeletion()) {
+        ShuffleServerMetrics.counterHadoopTwoPhasesDeletionFailed.inc();
+      }
       removeAppStorageInfo(event);
     } else {
       LOG.warn("Storage gotten is null when removing resources for event: {}", event);
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index c33c17f3bf..950934f651 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -311,7 +311,9 @@ public void removeResources(PurgeEvent event) {
         ShuffleHandlerFactory.getInstance()
             .createShuffleDeleteHandler(
                 new CreateShuffleDeleteHandlerRequest(
-                    StorageType.LOCALFILE.name(), new Configuration()));
+                    StorageType.LOCALFILE.name(),
+                    new Configuration(),
+                    event.isTwoPhasesDeletion()));
 
     List<String> deletePaths =
         localStorages.stream()
@@ -352,7 +354,11 @@ public void removeResources(PurgeEvent event) {
                 })
             .collect(Collectors.toList());
 
-    deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
+    boolean isSuccess =
+        deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
+    if (!isSuccess && event.isTwoPhasesDeletion()) {
+      ShuffleServerMetrics.counterLocalTwoPhasesDeletionFailed.inc();
+    }
     removeAppStorageInfo(event);
   }
 
diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index 4edca3b347..6e10601b66 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -32,9 +32,12 @@
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
+import org.apache.uniffle.storage.handler.impl.AsynDeletionEventManager;
 import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
 import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
+import org.apache.uniffle.storage.handler.impl.HadoopShuffleAsyncDeleteHandler;
 import org.apache.uniffle.storage.handler.impl.HadoopShuffleDeleteHandler;
+import org.apache.uniffle.storage.handler.impl.LocalFileAsyncDeleteHandler;
 import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
 import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler;
 import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
@@ -184,9 +187,15 @@ private ClientReadHandler getHadoopClientReadHandler(
 
   public ShuffleDeleteHandler createShuffleDeleteHandler(
       CreateShuffleDeleteHandlerRequest request) {
-    if (StorageType.HDFS.name().equals(request.getStorageType())) {
+    if (StorageType.HDFS.name().equals(request.getStorageType()) && request.isAsync()) {
+      return new HadoopShuffleAsyncDeleteHandler(
+          request.getConf(), request.getShuffleServerId(), AsynDeletionEventManager.getInstance());
+    } else if (StorageType.HDFS.name().equals(request.getStorageType()) && !request.isAsync()) {
       return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId());
-    } else if (StorageType.LOCALFILE.name().equals(request.getStorageType())) {
+    } else if (StorageType.LOCALFILE.name().equals(request.getStorageType()) && request.isAsync()) {
+      return new LocalFileAsyncDeleteHandler(AsynDeletionEventManager.getInstance());
+    } else if (StorageType.LOCALFILE.name().equals(request.getStorageType())
+        && !request.isAsync()) {
       return new LocalFileDeleteHandler();
     } else {
       throw new UnsupportedOperationException(
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/AsynDeletionEvent.java b/storage/src/main/java/org/apache/uniffle/storage/handler/AsynDeletionEvent.java
new file mode 100644
index 0000000000..b002c23db9
--- /dev/null
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/AsynDeletionEvent.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Describes one two-phases deletion request: every path to delete is paired with the temporary
+ * path it is renamed to before the renamed path is deleted asynchronously.
+ */
+public class AsynDeletionEvent {
+  private static final String TEMPORARYSUFFIX = "_tmp";
+  private final String appId;
+  private final String user;
+  private final String shuffleServerId;
+  private final Configuration conf;
+  /** Records the mapping between the path to be deleted and the path to be renamed. */
+  private final Map<String, String> needDeletePathAndRenamePath;
+
+  private final String storageType;
+
+  public AsynDeletionEvent(
+      String appId,
+      String user,
+      Configuration conf,
+      String shuffleServerId,
+      List<String> needDeletePath,
+      String storageType) {
+    this.appId = appId;
+    this.user = user;
+    this.shuffleServerId = shuffleServerId;
+    this.conf = conf;
+    // A merge function is supplied because Collectors.toMap throws IllegalStateException on
+    // duplicate keys; a path listed twice still maps to the same renamed path.
+    this.needDeletePathAndRenamePath =
+        needDeletePath.stream()
+            .collect(
+                Collectors.toMap(
+                    Function.identity(),
+                    s -> StringUtils.join(s, TEMPORARYSUFFIX),
+                    (first, second) -> first));
+    this.storageType = storageType;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public Map<String, String> getNeedDeletePathAndRenamePath() {
+    return needDeletePathAndRenamePath;
+  }
+
+  /** Returns the renamed (temporary) paths, i.e. the paths that still have to be deleted. */
+  public String[] getNeedDeleteRenamePaths() {
+    return needDeletePathAndRenamePath.values().stream().toArray(String[]::new);
+  }
+
+  public String getShuffleServerId() {
+    return shuffleServerId;
+  }
+
+  public String getStorageType() {
+    return storageType;
+  }
+}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
index c0b32b9432..195a26dab8 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
@@ -24,5 +24,7 @@ public interface ShuffleDeleteHandler {
    *
    * @param appId ApplicationId for delete
+   * @return true if the deletion succeeded or, for asynchronous handlers, was handed off for
+   *     background deletion; false otherwise
    */
-  void delete(String[] storageBasePaths, String appId, String user);
+  boolean delete(String[] storageBasePaths, String appId, String user);
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java
new file mode 100644
index 0000000000..1a2a9f9b66
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import com.google.common.collect.Queues;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
+import org.apache.uniffle.storage.handler.AsynDeletionEvent;
+import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
+import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
+import org.apache.uniffle.storage.util.StorageType;
+
+/**
+ * Runs the second phase of two-phases deletion: a single daemon thread takes queued events and
+ * deletes their renamed paths with the synchronous delete handlers.
+ */
+public class AsynDeletionEventManager {
+  private static final Logger LOG = LoggerFactory.getLogger(AsynDeletionEventManager.class);
+
+  private static AsynDeletionEventManager INSTANCE;
+
+  public static synchronized AsynDeletionEventManager getInstance() {
+    if (INSTANCE == null) {
+      INSTANCE = new AsynDeletionEventManager();
+    }
+    return INSTANCE;
+  }
+
+  protected final BlockingQueue<AsynDeletionEvent> twoPhasesDeletionEventQueue =
+      Queues.newLinkedBlockingQueue();
+  protected Thread twoPhasesDeletionThread;
+
+  public AsynDeletionEventManager() {
+    Runnable twoPhasesDeletionTask =
+        () -> {
+          while (true) {
+            AsynDeletionEvent asynDeletionEvent = null;
+            try {
+              asynDeletionEvent = twoPhasesDeletionEventQueue.take();
+              deleteRenamedPaths(asynDeletionEvent);
+            } catch (InterruptedException e) {
+              // Restore the interrupt flag and stop; swallowing it would leave the thread
+              // looping forever with no way to shut it down.
+              Thread.currentThread().interrupt();
+              LOG.warn("twoPhasesDeletionThread is interrupted, stop deleting.");
+              return;
+            } catch (Exception e) {
+              if (asynDeletionEvent != null) {
+                LOG.error(
+                    "Delete Paths of {} failed.", asynDeletionEvent.getNeedDeleteRenamePaths(), e);
+              } else {
+                LOG.error("Failed to delete a directory in twoPhasesDeletionThread.", e);
+              }
+            }
+          }
+        };
+    twoPhasesDeletionThread = new Thread(twoPhasesDeletionTask);
+    twoPhasesDeletionThread.setName("twoPhasesDeletionThread");
+    twoPhasesDeletionThread.setDaemon(true);
+    twoPhasesDeletionThread.start();
+  }
+
+  /** Deletes the renamed paths of the event with the synchronous handler of its storage type. */
+  private void deleteRenamedPaths(AsynDeletionEvent asynDeletionEvent) {
+    String storageType = asynDeletionEvent.getStorageType();
+    CreateShuffleDeleteHandlerRequest request;
+    if (StorageType.LOCALFILE.name().equalsIgnoreCase(storageType)) {
+      request =
+          new CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new Configuration());
+    } else if (StorageType.HDFS.name().equalsIgnoreCase(storageType)) {
+      request =
+          new CreateShuffleDeleteHandlerRequest(
+              StorageType.HDFS.name(),
+              asynDeletionEvent.getConf(),
+              asynDeletionEvent.getShuffleServerId());
+    } else {
+      LOG.warn("Skip the deletion event with unsupported storage type {}.", storageType);
+      return;
+    }
+    ShuffleDeleteHandler deleteHandler =
+        ShuffleHandlerFactory.getInstance().createShuffleDeleteHandler(request);
+    boolean isSuccess =
+        deleteHandler.delete(
+            asynDeletionEvent.getNeedDeleteRenamePaths(),
+            asynDeletionEvent.getAppId(),
+            asynDeletionEvent.getUser());
+    if (!isSuccess) {
+      LOG.warn(
+          "Failed to delete renamed paths {} of appId[{}].",
+          asynDeletionEvent.getNeedDeleteRenamePaths(),
+          asynDeletionEvent.getAppId());
+    }
+  }
+
+  /**
+   * Queues an event for the background deletion thread.
+   *
+   * @return true if the event was accepted by the queue
+   */
+  public synchronized boolean handlerDeletionQueue(AsynDeletionEvent asynDeletionEvent) {
+    return twoPhasesDeletionEventQueue.offer(asynDeletionEvent);
+  }
+}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleAsyncDeleteHandler.java 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleAsyncDeleteHandler.java
new file mode 100644
index 0000000000..3e49e6e022
--- /dev/null
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleAsyncDeleteHandler.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.storage.handler.AsynDeletionEvent;
+import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
+import org.apache.uniffle.storage.util.StorageType;
+
+/**
+ * First phase of two-phases deletion on Hadoop FS: renames each path to its temporary name and
+ * hands the event to {@link AsynDeletionEventManager}, which deletes the renamed paths later.
+ */
+public class HadoopShuffleAsyncDeleteHandler implements ShuffleDeleteHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopShuffleAsyncDeleteHandler.class);
+  private static final int RENAME_RETRY_MAX = 5;
+  private static final long RENAME_RETRY_INTERVAL_MS = 1000L;
+
+  private final String shuffleServerId;
+  private final Configuration hadoopConf;
+  private final AsynDeletionEventManager asynDeletionEventManager;
+
+  public HadoopShuffleAsyncDeleteHandler(
+      Configuration hadoopConf,
+      String shuffleServerId,
+      AsynDeletionEventManager asynDeletionEventManager) {
+    this.hadoopConf = hadoopConf;
+    this.shuffleServerId = shuffleServerId;
+    this.asynDeletionEventManager = asynDeletionEventManager;
+  }
+
+  /**
+   * Rename the file and then delete it asynchronously.
+   *
+   * @return true only if every existing path was renamed and the event was queued; paths that do
+   *     not exist are skipped and do not count as failures
+   */
+  @Override
+  public boolean delete(String[] storageBasePaths, String appId, String user) {
+    AsynDeletionEvent asynDeletionEvent =
+        new AsynDeletionEvent(
+            appId,
+            user,
+            hadoopConf,
+            shuffleServerId,
+            Arrays.asList(storageBasePaths),
+            StorageType.HDFS.name());
+    boolean isAllRenamed = true;
+    for (Map.Entry<String, String> appIdNeedDeletePaths :
+        asynDeletionEvent.getNeedDeletePathAndRenamePath().entrySet()) {
+      final Path path = new Path(appIdNeedDeletePaths.getKey());
+      final Path breakdownPathFolder = new Path(appIdNeedDeletePaths.getValue());
+      // Keep going after a failure so that one bad path does not block the remaining ones.
+      if (!renameWithRetry(path, breakdownPathFolder, appId, user)) {
+        isAllRenamed = false;
+      }
+    }
+    // Always queue the event: paths renamed above must still be deleted in the background.
+    if (!asynDeletionEventManager.handlerDeletionQueue(asynDeletionEvent)) {
+      LOG.warn(
+          "Failed to queue the two-phases deletion event for appId[{}], the queue rejected it.",
+          appId);
+      return false;
+    }
+    return isAllRenamed;
+  }
+
+  /**
+   * Renames {@code path} to {@code breakdownPathFolder} with at most RENAME_RETRY_MAX attempts.
+   *
+   * @return false if the path exists but could not be renamed, true otherwise
+   */
+  private boolean renameWithRetry(
+      Path path, Path breakdownPathFolder, String appId, String user) {
+    long start = System.currentTimeMillis();
+    LOG.info(
+        "Try rename shuffle data in Hadoop FS for appId[{}] of user[{}] with {}",
+        appId,
+        user,
+        path);
+    for (int times = 1; times <= RENAME_RETRY_MAX; times++) {
+      try {
+        FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
+        if (!fileSystem.exists(path)) {
+          LOG.info(
+              "Rename shuffle data in Hadoop FS for appId[{}] with {} is not exists", appId, path);
+          return true;
+        }
+        if (fileSystem.rename(path, breakdownPathFolder)) {
+          LOG.info(
+              "Rename shuffle data in Hadoop FS for appId[{}] with {} successfully in {} ms",
+              appId,
+              path,
+              (System.currentTimeMillis() - start));
+          return true;
+        }
+        // rename() reports some failures by returning false instead of throwing; count that as
+        // a failed attempt, otherwise the retry loop would never terminate.
+        LOG.warn("Can't rename shuffle data for appId[{}] with {} times", appId, times);
+      } catch (FileNotFoundException e) {
+        LOG.info("[{}] doesn't exist, ignore it.", path);
+        return true;
+      } catch (Exception e) {
+        LOG.warn("Can't rename shuffle data for appId[{}] with {} times", appId, times, e);
+      }
+      try {
+        Thread.sleep(RENAME_RETRY_INTERVAL_MS);
+      } catch (InterruptedException ex) {
+        // Preserve the interrupt for the caller and give up retrying.
+        Thread.currentThread().interrupt();
+        LOG.warn("Exception happened when Thread.sleep", ex);
+        break;
+      }
+    }
+    LOG.warn(
+        "Failed to rename shuffle data in Hadoop FS for appId[{}] with {} in {} ms",
+        appId,
+        path,
+        (System.currentTimeMillis() - start));
+    return false;
+  }
+}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
index 312f052f9a..58854cb703 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
@@ -44,7 +44,7 @@ public HadoopShuffleDeleteHandler(Configuration hadoopConf, String shuffleServer
   }
 
   @Override
-  public void delete(String[] storageBasePaths, String appId, String user) {
+  public boolean delete(String[] storageBasePaths, String appId, String user) {
     for (String deletePath : storageBasePaths) {
       final Path path = new Path(deletePath);
       boolean isSuccess = false;
@@ -64,7 +64,7 @@ public void delete(String[] storageBasePaths, String appId, String user) {
         } catch (Exception e) {
           if (e instanceof FileNotFoundException) {
             LOG.info("[{}] doesn't exist, ignore it.", path);
-            return;
+            return false;
           }
           times++;
           LOG.warn(
@@ -96,6 +96,7 @@ public void delete(String[] storageBasePaths, String appId, String user) {
                 + " ms");
       }
     }
+    return true;
   }
 
   private void delete(FileSystem fileSystem, Path path, String filePrefix) throws IOException {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java
new file mode 100644
index 0000000000..31a68251de
--- /dev/null
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.storage.handler.AsynDeletionEvent;
+import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
+import org.apache.uniffle.storage.util.StorageType;
+
+/**
+ * First phase of two-phases deletion on local disk: renames each path to its temporary name and
+ * hands the event to {@link AsynDeletionEventManager}, which deletes the renamed paths later.
+ */
+public class LocalFileAsyncDeleteHandler implements ShuffleDeleteHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalFileAsyncDeleteHandler.class);
+  private final AsynDeletionEventManager asynDeletionEventManager;
+
+  public LocalFileAsyncDeleteHandler(AsynDeletionEventManager asynDeletionEventManager) {
+    this.asynDeletionEventManager = asynDeletionEventManager;
+  }
+
+  /**
+   * Rename the file and then delete it asynchronously.
+   *
+   * @return true only if every existing path was renamed and the event was queued; paths that do
+   *     not exist are skipped and do not count as failures
+   */
+  @Override
+  public boolean delete(String[] storageBasePaths, String appId, String user) {
+    AsynDeletionEvent asynDeletionEvent =
+        new AsynDeletionEvent(
+            appId, user, null, null, Arrays.asList(storageBasePaths), StorageType.LOCALFILE.name());
+    boolean isAllRenamed = true;
+    for (Map.Entry<String, String> appIdNeedDeletePaths :
+        asynDeletionEvent.getNeedDeletePathAndRenamePath().entrySet()) {
+      String shufflePath = appIdNeedDeletePaths.getKey();
+      String breakdownShufflePath = appIdNeedDeletePaths.getValue();
+      long start = System.currentTimeMillis();
+      try {
+        File baseFolder = new File(shufflePath);
+        if (!baseFolder.exists()) {
+          LOG.info("Rename shuffle data for appId[{}],[{}] is not exists", appId, shufflePath);
+          continue;
+        }
+        if (baseFolder.renameTo(new File(breakdownShufflePath))) {
+          LOG.info(
+              "Rename shuffle data for appId[{}] with {} to {} cost {} ms",
+              appId,
+              shufflePath,
+              breakdownShufflePath,
+              (System.currentTimeMillis() - start));
+        } else {
+          // The data is still under its original name, so report the deletion as failed.
+          isAllRenamed = false;
+          LOG.warn(
+              "Can't Rename shuffle data for appId[{}] with {} to {}",
+              appId,
+              shufflePath,
+              breakdownShufflePath);
+        }
+      } catch (Exception e) {
+        isAllRenamed = false;
+        LOG.error(
+            "Can't Rename shuffle data for appId[{}] with {} to {}",
+            appId,
+            shufflePath,
+            breakdownShufflePath,
+            e);
+      }
+    }
+    // Always queue the event: paths renamed above must still be deleted in the background.
+    if (!asynDeletionEventManager.handlerDeletionQueue(asynDeletionEvent)) {
+      LOG.warn(
+          "Failed to queue the two-phases deletion event for appId[{}], the queue rejected it.",
+          appId);
+      return false;
+    }
+    return isAllRenamed;
+  }
+}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
index 277fb18d7a..106b01d774 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
@@ -30,7 +30,8 @@ public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
   private static final Logger LOG = LoggerFactory.getLogger(LocalFileDeleteHandler.class);
 
   @Override
-  public void delete(String[] shuffleDataStoredPath, String appId, String user) {
+  public boolean delete(String[] shuffleDataStoredPath, String appId, String user) {
+    boolean isAllDeleted = true;
     for (String basePath : shuffleDataStoredPath) {
       final String shufflePath = basePath;
       long start = System.currentTimeMillis();
@@ -47,7 +48,9 @@ public void delete(String[] shuffleDataStoredPath, String appId, String user) {
                 + " ms");
       } catch (Exception e) {
         LOG.warn("Can't delete shuffle data for appId[" + appId + "] with " + shufflePath, e);
+        isAllDeleted = false;
       }
     }
+    return isAllDeleted;
   }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
index b8eda28953..250e1d203e 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
@@ -24,16 +24,29 @@ public class CreateShuffleDeleteHandlerRequest {
   private String storageType;
   private Configuration conf;
   private String shuffleServerId;
+  /** Whether the created handler should rename first and delete asynchronously. */
+  private boolean isAsync;
 
   public CreateShuffleDeleteHandlerRequest(String storageType, Configuration conf) {
     this(storageType, conf, null);
   }
 
+  public CreateShuffleDeleteHandlerRequest(
+      String storageType, Configuration conf, boolean isAsync) {
+    this(storageType, conf, null, isAsync);
+  }
+
   public CreateShuffleDeleteHandlerRequest(
       String storageType, Configuration conf, String shuffleServerId) {
+    this(storageType, conf, shuffleServerId, false);
+  }
+
+  public CreateShuffleDeleteHandlerRequest(
+      String storageType, Configuration conf, String shuffleServerId, boolean isAsync) {
     this.storageType = storageType;
     this.conf = conf;
     this.shuffleServerId = shuffleServerId;
+    this.isAsync = isAsync;
   }
 
   public String getStorageType() {
@@ -47,4 +60,8 @@ public Configuration getConf() {
   public String getShuffleServerId() {
     return shuffleServerId;
   }
+
+  public boolean isAsync() {
+    return isAsync;
+  }
 }