Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2083] improvement: Quickly delete local or HDFS data at the shuffleId level. #2084

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ public void registerShuffle(
taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
try {
long start = System.currentTimeMillis();
shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId);
shuffleServer
.getShuffleTaskManager()
.removeShuffleDataSyncRenameAndDelete(appId, shuffleId);
LOG.info(
"Deleted the previous stage attempt data due to stage recomputing for app: {}, "
+ "shuffleId: {}. It costs {} ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ public class ShuffleServerMetrics {
public static final String REPORTED_BLOCK_COUNT = "reported_block_count";
public static final String CACHED_BLOCK_COUNT = "cached_block_count";

private static final String TOTAL_LOCAL_RENAME_AND_DELETION_FAILED =
"total_local_rename_and_deletion_failed";

public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
Expand Down Expand Up @@ -245,6 +248,7 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeReadLocalDataFileBufferSize;
public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
public static Gauge.Child gaugeReadMemoryDataBufferSize;
public static Counter.Child counterLocalRenameAndDeletionFaileTd;

public static Gauge gaugeTotalDataSizeUsage;
public static Gauge gaugeInMemoryDataSizeUsage;
Expand Down Expand Up @@ -440,6 +444,8 @@ private static void setUpMetrics(ShuffleServerConf serverConf) {
counterTotalHugePartitionNum = metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
counterTotalHugePartitionExceedHardLimitNum =
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM);
counterLocalRenameAndDeletionFaileTd =
metricsManager.addLabeledCounter(TOTAL_LOCAL_RENAME_AND_DELETION_FAILED);

gaugeLocalStorageIsWritable =
metricsManager.addGauge(LOCAL_STORAGE_IS_WRITABLE, LOCAL_DISK_PATH_LABEL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,11 @@ public boolean isAppExpired(String appId) {
* @param shuffleIds
*/
/**
 * Removes the resources belonging to the given shuffle ids of an application using the default
 * deletion mode (direct deletion, no rename-then-async-delete).
 *
 * <p>Delegates to the three-argument overload with {@code isRenameAndDelete = false}.
 *
 * @param appId application whose shuffle resources are removed
 * @param shuffleIds ids of the shuffles to purge
 */
public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds) {
removeResourcesByShuffleIds(appId, shuffleIds, false);
}

public void removeResourcesByShuffleIds(
String appId, List<Integer> shuffleIds, boolean isRenameAndDelete) {
Lock writeLock = getAppWriteLock(appId);
writeLock.lock();
try {
Expand Down Expand Up @@ -866,7 +871,7 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds)
withTimeoutExecution(
() -> {
storageManager.removeResources(
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isRenameAndDelete));
return null;
},
storageRemoveOperationTimeoutSec,
Expand Down Expand Up @@ -1074,6 +1079,10 @@ public void removeShuffleDataSync(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
}

/**
 * Synchronously removes the data of a single shuffle using the rename-and-delete mode: files are
 * renamed quickly up front and the actual deletion happens asynchronously afterwards.
 *
 * @param appId application owning the shuffle
 * @param shuffleId id of the shuffle whose data is removed
 */
public void removeShuffleDataSyncRenameAndDelete(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true);
}

/**
 * Returns the data distribution type recorded for the given application.
 *
 * <p>NOTE(review): {@code shuffleTaskInfos.get(appId)} returns null for an unknown appId, which
 * would throw a NullPointerException here — confirm callers only pass registered apps.
 *
 * @param appId application to look up
 */
public ShuffleDataDistributionType getDataDistributionType(String appId) {
return shuffleTaskInfos.get(appId).getDataDistType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ public abstract class PurgeEvent {
private String appId;
private String user;
private List<Integer> shuffleIds;
// Whether to enable the deletion mode: Rename files and then delete them asynchronously.
private boolean isRenameAndDelete;

/**
 * Creates a purge event with the rename-and-delete mode disabled (plain direct deletion).
 *
 * @param appId application the purged data belongs to
 * @param user owner of the application
 * @param shuffleIds shuffles to purge; may be null for app-level purges
 */
public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
this(appId, user, shuffleIds, false);
}

/**
 * Creates a purge event.
 *
 * @param appId application the purged data belongs to
 * @param user owner of the application
 * @param shuffleIds shuffles to purge
 * @param isRenameAndDelete whether files should be renamed first and deleted asynchronously
 *     instead of being deleted in place
 */
public PurgeEvent(
String appId, String user, List<Integer> shuffleIds, boolean isRenameAndDelete) {
this.appId = appId;
this.user = user;
this.shuffleIds = shuffleIds;
this.isRenameAndDelete = isRenameAndDelete;
}

public String getAppId() {
Expand All @@ -44,6 +52,10 @@ public List<Integer> getShuffleIds() {
return shuffleIds;
}

/** Returns true if this event uses the rename-first, delete-asynchronously mode. */
public boolean isRenameAndDelete() {
return isRenameAndDelete;
}

@Override
public String toString() {
return this.getClass().getSimpleName()
Expand All @@ -56,6 +68,8 @@ public String toString() {
+ '\''
+ ", shuffleIds="
+ shuffleIds
+ ", isRenameAndDelete="
+ isRenameAndDelete
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
public class ShufflePurgeEvent extends PurgeEvent {

public ShufflePurgeEvent(String appId, String user, List<Integer> shuffleIds) {
super(appId, user, shuffleIds);
this(appId, user, shuffleIds, false);
}

public ShufflePurgeEvent(
String appId, String user, List<Integer> shuffleIds, boolean isRenameAndDelete) {
super(appId, user, shuffleIds, isRenameAndDelete);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public void removeResources(PurgeEvent event) {
new CreateShuffleDeleteHandlerRequest(
StorageType.HDFS.name(),
storage.getConf(),
purgeForExpired ? shuffleServerId : null));
purgeForExpired ? shuffleServerId : null,
event.isRenameAndDelete()));

String basicPath =
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public void removeResources(PurgeEvent event) {
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
StorageType.LOCALFILE.name(), new Configuration()));
StorageType.LOCALFILE.name(), new Configuration(), event.isRenameAndDelete()));

List<String> deletePaths =
localStorages.stream()
Expand Down Expand Up @@ -352,7 +352,11 @@ public void removeResources(PurgeEvent event) {
})
.collect(Collectors.toList());

deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
boolean isSuccess =
deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
if (!isSuccess && event.isRenameAndDelete()) {
ShuffleServerMetrics.counterLocalRenameAndDeletionFaileTd.inc();
}
removeAppStorageInfo(event);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this affect the metrics analysis when using the 2-phase deletion?

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
import org.apache.uniffle.storage.handler.impl.AsynDeletionEventManager;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileAsyncDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler;
import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
Expand Down Expand Up @@ -186,7 +188,10 @@ public ShuffleDeleteHandler createShuffleDeleteHandler(
CreateShuffleDeleteHandlerRequest request) {
if (StorageType.HDFS.name().equals(request.getStorageType())) {
return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId());
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType())) {
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType()) && request.isAsync()) {
return new LocalFileAsyncDeleteHandler(AsynDeletionEventManager.getInstance());
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType())
&& !request.isAsync()) {
return new LocalFileDeleteHandler();
} else {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.storage.handler;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;

/**
 * Immutable description of a rename-then-delete task: the set of paths that have been (or will
 * be) renamed with a temporary suffix, plus the context (app, user, storage) needed to delete
 * them asynchronously later.
 */
public class AsynDeletionEvent {
  /** Suffix appended to a path when it is renamed prior to asynchronous deletion. */
  private static final String TEMPORARYSUFFIX = "_tmp";

  private final String appId;
  private final String user;
  private final String shuffleServerId;
  private final Configuration conf;
  /** Records the mapping between the path to be deleted and the path to be renamed. */
  private final Map<String, String> needDeletePathAndRenamePath;

  private final String storageType;

  /**
   * @param needDeletePath original paths scheduled for deletion; each is mapped to its renamed
   *     ("_tmp"-suffixed) counterpart. NOTE(review): {@code Collectors.toMap} throws
   *     IllegalStateException on duplicate keys — assumes the list has no duplicate paths.
   */
  public AsynDeletionEvent(
      String appId,
      String user,
      Configuration conf,
      String shuffleServerId,
      List<String> needDeletePath,
      String storageType) {
    this.appId = appId;
    this.user = user;
    this.shuffleServerId = shuffleServerId;
    this.conf = conf;
    // Plain concatenation replaces the previous StringUtils.join(s, suffix) call, which joined
    // a two-element varargs array with no separator — an obscure way to spell s + suffix.
    this.needDeletePathAndRenamePath =
        needDeletePath.stream()
            .collect(Collectors.toMap(Function.identity(), s -> s + TEMPORARYSUFFIX));
    this.storageType = storageType;
  }

  public String getAppId() {
    return appId;
  }

  public String getUser() {
    return user;
  }

  public Configuration getConf() {
    return conf;
  }

  public Map<String, String> getNeedDeletePathAndRenamePath() {
    return needDeletePathAndRenamePath;
  }

  /** Returns only the renamed (temporary) paths, i.e. the paths the async deleter must remove. */
  public String[] getNeedDeleteRenamePaths() {
    return needDeletePathAndRenamePath.values().toArray(new String[0]);
  }

  public String getShuffleServerId() {
    return shuffleServerId;
  }

  public String getStorageType() {
    return storageType;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public interface ShuffleDeleteHandler {
*
* @param appId ApplicationId for delete
*/
void delete(String[] storageBasePaths, String appId, String user);
boolean delete(String[] storageBasePaths, String appId, String user);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.storage.handler.impl;

import java.util.concurrent.BlockingQueue;

import com.google.common.collect.Queues;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
import org.apache.uniffle.storage.handler.AsynDeletionEvent;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
import org.apache.uniffle.storage.util.StorageType;

/**
* To quickly delete the Shuffle Data that has been dropped to the disk, you need to rename the data
* first and then encapsulate the data into an asynchronous deletion event. This function is used to
* manage the actual execution of the asynchronous deletion event.
*/
public class AsynDeletionEventManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you give some comments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you give some comments?

Your above message has been modified.

private static final Logger LOG = LoggerFactory.getLogger(AsynDeletionEventManager.class);

private static AsynDeletionEventManager INSTANCE;

public static synchronized AsynDeletionEventManager getInstance() {
if (INSTANCE == null) {
INSTANCE = new AsynDeletionEventManager();
}
return INSTANCE;
}

protected final BlockingQueue<AsynDeletionEvent> renameAndAsynDeleteEventQueue =
Queues.newLinkedBlockingQueue();
protected Thread renameAndAsynDeleteThread;

public AsynDeletionEventManager() {
Runnable renameAndDeletionTask =
() -> {
while (true) {
AsynDeletionEvent asynDeletionEvent = null;
try {
asynDeletionEvent = renameAndAsynDeleteEventQueue.take();
if (asynDeletionEvent
.getStorageType()
.equalsIgnoreCase(StorageType.LOCALFILE.name())) {
ShuffleDeleteHandler deleteHandler =
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
StorageType.LOCALFILE.name(), new Configuration()));
deleteHandler.delete(
asynDeletionEvent.getNeedDeleteRenamePaths(),
asynDeletionEvent.getAppId(),
asynDeletionEvent.getUser());
} else if (asynDeletionEvent
.getStorageType()
.equalsIgnoreCase(StorageType.HDFS.name())) {
ShuffleDeleteHandler deleteHandler =
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
StorageType.HDFS.name(),
asynDeletionEvent.getConf(),
asynDeletionEvent.getShuffleServerId()));
deleteHandler.delete(
asynDeletionEvent.getNeedDeleteRenamePaths(),
asynDeletionEvent.getAppId(),
asynDeletionEvent.getUser());
}
} catch (Exception e) {
if (asynDeletionEvent != null) {
LOG.error(
"Delete Paths of {} failed.", asynDeletionEvent.getNeedDeleteRenamePaths(), e);
} else {
LOG.error("Failed to delete a directory in renameAndAsynDeleteThread.", e);
}
}
}
};
renameAndAsynDeleteThread = new Thread(renameAndDeletionTask);
renameAndAsynDeleteThread.setName("renameAndAsynDeleteThread");
renameAndAsynDeleteThread.setDaemon(true);
renameAndAsynDeleteThread.start();
}

public synchronized boolean handlerAsynDelete(AsynDeletionEvent asynDeletionEvent) {
return renameAndAsynDeleteEventQueue.offer(asynDeletionEvent);
}
}
Loading
Loading