Skip to content

Commit

Permalink
improvement: Quickly delete local or HDFS data at the shuffleId level..
Browse files Browse the repository at this point in the history
  • Loading branch information
yl09099 committed Nov 28, 2024
1 parent 9947af7 commit 7f17f5e
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ public abstract class PurgeEvent {
private String appId;
private String user;
private List<Integer> shuffleIds;
// Quick Delete or not.
private boolean isTwoPhasesDeletion;
// Whether to enable the deletion mode: Rename files and then delete them asynchronously.
private boolean isRenameAndDelete;

public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
this(appId, user, shuffleIds, false);
}

public PurgeEvent(
String appId, String user, List<Integer> shuffleIds, boolean isTwoPhasesDeletion) {
String appId, String user, List<Integer> shuffleIds, boolean isRenameAndDelete) {
this.appId = appId;
this.user = user;
this.shuffleIds = shuffleIds;
this.isTwoPhasesDeletion = isTwoPhasesDeletion;
this.isRenameAndDelete = isRenameAndDelete;
}

public String getAppId() {
Expand All @@ -52,8 +52,8 @@ public List<Integer> getShuffleIds() {
return shuffleIds;
}

public boolean isTwoPhasesDeletion() {
return isTwoPhasesDeletion;
public boolean isRenameAndDelete() {
return isRenameAndDelete;
}

@Override
Expand All @@ -68,8 +68,8 @@ public String toString() {
+ '\''
+ ", shuffleIds="
+ shuffleIds
+ ", isTwoPhasesDeletion="
+ isTwoPhasesDeletion
+ ", isRenameAndDelete="
+ isRenameAndDelete
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void removeResources(PurgeEvent event) {
StorageType.HDFS.name(),
storage.getConf(),
purgeForExpired ? shuffleServerId : null,
event.isTwoPhasesDeletion()));
event.isRenameAndDelete()));

String basicPath =
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
Expand Down Expand Up @@ -152,7 +152,7 @@ public void removeResources(PurgeEvent event) {
}
boolean isSuccess =
deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
if (!isSuccess && event.isTwoPhasesDeletion()) {
if (!isSuccess && event.isRenameAndDelete()) {
ShuffleServerMetrics.counterLocalTwoPhasesDeletionFaileTd.inc();
}
removeAppStorageInfo(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,7 @@ public void removeResources(PurgeEvent event) {
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
StorageType.LOCALFILE.name(),
new Configuration(),
event.isTwoPhasesDeletion()));
StorageType.LOCALFILE.name(), new Configuration(), event.isRenameAndDelete()));

List<String> deletePaths =
localStorages.stream()
Expand Down Expand Up @@ -356,7 +354,7 @@ public void removeResources(PurgeEvent event) {

boolean isSuccess =
deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
if (!isSuccess && event.isTwoPhasesDeletion()) {
if (!isSuccess && event.isRenameAndDelete()) {
ShuffleServerMetrics.counterHadoopTwoPhasesDeletionFailed.inc();
}
removeAppStorageInfo(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.uniffle.storage.handler.impl.AsynDeletionEventManager;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleAsyncDeleteHandler;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileAsyncDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
Expand Down Expand Up @@ -187,10 +186,7 @@ private ClientReadHandler getHadoopClientReadHandler(

public ShuffleDeleteHandler createShuffleDeleteHandler(
CreateShuffleDeleteHandlerRequest request) {
if (StorageType.HDFS.name().equals(request.getStorageType()) && request.isAsync()) {
return new HadoopShuffleAsyncDeleteHandler(
request.getConf(), request.getShuffleServerId(), AsynDeletionEventManager.getInstance());
} else if (StorageType.HDFS.name().equals(request.getStorageType()) && !request.isAsync()) {
if (StorageType.HDFS.name().equals(request.getStorageType())) {
return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId());
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType()) && request.isAsync()) {
return new LocalFileAsyncDeleteHandler(AsynDeletionEventManager.getInstance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
import org.apache.uniffle.storage.util.StorageType;

/**
* To quickly delete the Shuffle Data that has been dropped to the disk, you need to rename the data
* first and then encapsulate the data into an asynchronous deletion event. This function is used to
* manage the actual execution of the asynchronous deletion event.
*/
public class AsynDeletionEventManager {
private static final Logger LOG = LoggerFactory.getLogger(AsynDeletionEventManager.class);

Expand All @@ -42,17 +47,17 @@ public static synchronized AsynDeletionEventManager getInstance() {
return INSTANCE;
}

protected final BlockingQueue<AsynDeletionEvent> twoPhasesDeletionEventQueue =
protected final BlockingQueue<AsynDeletionEvent> renameAndAsynDeleteEventQueue =
Queues.newLinkedBlockingQueue();
protected Thread twoPhasesDeletionThread;
protected Thread renameAndAsynDeleteThread;

public AsynDeletionEventManager() {
Runnable twoPhasesDeletionTask =
() -> {
while (true) {
AsynDeletionEvent asynDeletionEvent = null;
try {
asynDeletionEvent = twoPhasesDeletionEventQueue.take();
asynDeletionEvent = renameAndAsynDeleteEventQueue.take();
if (asynDeletionEvent
.getStorageType()
.equalsIgnoreCase(StorageType.LOCALFILE.name())) {
Expand Down Expand Up @@ -90,13 +95,13 @@ public AsynDeletionEventManager() {
}
}
};
twoPhasesDeletionThread = new Thread(twoPhasesDeletionTask);
twoPhasesDeletionThread.setName("twoPhasesDeletionThread");
twoPhasesDeletionThread.setDaemon(true);
twoPhasesDeletionThread.start();
renameAndAsynDeleteThread = new Thread(twoPhasesDeletionTask);
renameAndAsynDeleteThread.setName("renameAndAsynDeleteThread");
renameAndAsynDeleteThread.setDaemon(true);
renameAndAsynDeleteThread.start();
}

public synchronized boolean handlerDeletionQueue(AsynDeletionEvent asynDeletionEvent) {
return twoPhasesDeletionEventQueue.offer(asynDeletionEvent);
public synchronized boolean handlerAsynDelete(AsynDeletionEvent asynDeletionEvent) {
return renameAndAsynDeleteEventQueue.offer(asynDeletionEvent);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public boolean delete(String[] storageBasePaths, String appId, String user) {
e);
}
}
if (!asynDeletionEventManager.handlerDeletionQueue(asynDeletionEvent)) {
if (!asynDeletionEventManager.handlerAsynDelete(asynDeletionEvent)) {
LOG.warn(
"Remove the case where the twoPhasesDeletionEventQueue queue is full and cannot accept elements.");
return false;
Expand Down

0 comments on commit 7f17f5e

Please sign in to comment.