Skip to content

Commit

Permalink
[admin-tool] Add a cluster batch processing framework command and a s…
Browse files Browse the repository at this point in the history
…ystem store empty push task (#1254)

This is just a side-effect PR I created for batch processing all stores in cluster when I am empty pushing all system stores to apply config updates.
Add the new command so you can write your store-oriented task and execute it cluster-wide with the admin-tool. It supports basic checkpointing and parallel processing.
With this, there is a system store empty push task in this PR as well, it did sanity checks and empty push to all system stores for a specific user store.
  • Loading branch information
sixpluszero authored Jan 16, 2025
1 parent 0514542 commit bfd7755
Show file tree
Hide file tree
Showing 5 changed files with 385 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import java.io.BufferedReader;
import java.io.Console;
Expand All @@ -125,6 +126,7 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
Expand All @@ -143,6 +145,10 @@
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -588,6 +594,9 @@ public static void main(String[] args) throws Exception {
case DUMP_HOST_HEARTBEAT:
dumpHostHeartbeat(cmd);
break;
case CLUSTER_BATCH_TASK:
clusterBatchTask(cmd);
break;
default:
StringJoiner availableCommands = new StringJoiner(", ");
for (Command c: Command.values()) {
Expand Down Expand Up @@ -852,6 +861,96 @@ private static void deleteStore(CommandLine cmd) throws IOException {
printObject(response);
}

private static void clusterBatchTask(CommandLine cmd) {
String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.CLUSTER_BATCH_TASK);
String task = getRequiredArgument(cmd, Arg.TASK_NAME, Command.CLUSTER_BATCH_TASK);
String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.CLUSTER_BATCH_TASK);
int parallelism = Integer.parseInt(getOptionalArgument(cmd, Arg.THREAD_COUNT, "1"));
LOGGER.info(
"[**** Cluster Command Params ****] Cluster: {}, Task: {}, Checkpoint: {}, Parallelism: {}",
clusterName,
task,
checkpointFile,
parallelism);
// Create child data center controller client map.
ChildAwareResponse childAwareResponse = controllerClient.listChildControllers(clusterName);
Map<String, ControllerClient> controllerClientMap = getControllerClientMap(clusterName, childAwareResponse);

// Fetch list cluster store list from parent region.
Map<String, Boolean> progressMap = new VeniceConcurrentHashMap<>();
MultiStoreResponse clusterStoreResponse = controllerClient.queryStoreList(false);
if (clusterStoreResponse.isError()) {
throw new VeniceException("Unable to fetch cluster store list: " + clusterStoreResponse.getError());
}
for (String storeName: clusterStoreResponse.getStores()) {
progressMap.put(storeName, Boolean.FALSE);
}

// Load progress from checkpoint file. If file does not exist, it will create new one during checkpointing.
try {
Path checkpointFilePath = Paths.get(checkpointFile);
if (!Files.exists(checkpointFilePath.toAbsolutePath())) {
LOGGER.info(
"Checkpoint file path does not exist, will create a new checkpoint file: {}",
checkpointFilePath.toAbsolutePath());
} else {
List<String> fileLines = Files.readAllLines(checkpointFilePath);
for (String line: fileLines) {
String storeName = line.split(",")[0];
// For now, it is boolean to start with, we can add more states to support retry.
boolean status = false;
if (line.split(",").length > 1) {
status = Boolean.parseBoolean(line.split(",")[1]);
}
progressMap.put(storeName, status);
}
}
} catch (IOException e) {
throw new VeniceException(e);
}
List<String> taskList =
progressMap.entrySet().stream().filter(e -> !e.getValue()).map(Map.Entry::getKey).collect(Collectors.toList());

// Validate task type. For now, we only has one task, if we have more task in the future, we can extend this logic.
Supplier<Function<String, Boolean>> functionSupplier = null;
if (SystemStorePushTask.TASK_NAME.equals(task)) {
String systemStoreType = getOptionalArgument(cmd, Arg.SYSTEM_STORE_TYPE);
if (systemStoreType != null) {
if (!(systemStoreType.equalsIgnoreCase(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.toString())
|| systemStoreType.equalsIgnoreCase(VeniceSystemStoreType.META_STORE.toString()))) {
printErrAndExit("System store type: " + systemStoreType + " is not supported.");
}
}
System.out.println(
functionSupplier = () -> new SystemStorePushTask(
controllerClient,
controllerClientMap,
clusterName,
systemStoreType == null ? Optional.empty() : Optional.of(systemStoreType)));
} else {
printErrAndExit("Undefined task: " + task);
}

// Create thread pool and start parallel processing.
ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
List<Future> futureList = new ArrayList<>();
for (int i = 0; i < parallelism; i++) {
BatchMaintenanceTaskRunner batchMaintenanceTaskRunner =
new BatchMaintenanceTaskRunner(progressMap, checkpointFile, taskList, functionSupplier.get());
futureList.add(executorService.submit(batchMaintenanceTaskRunner));
}
for (int i = 0; i < parallelism; i++) {
try {
futureList.get(i).get();
LOGGER.info("Cluster task completed for thread : {}", i);
} catch (InterruptedException | ExecutionException e) {
LOGGER.warn(e.getMessage());
executorService.shutdownNow();
}
}
executorService.shutdownNow();
}

private static void backfillSystemStores(CommandLine cmd) {
String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.BACKFILL_SYSTEM_STORES);
String systemStoreType = getRequiredArgument(cmd, Arg.SYSTEM_STORE_TYPE, Command.BACKFILL_SYSTEM_STORES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ public enum Arg {
SYSTEM_STORE_TYPE(
"system-store-type", "sst", true,
"Type of system store to backfill. Supported types are davinci_push_status_store and meta_store"
), RETRY("retry", "r", false, "Retry this operation"),
), TASK_NAME("task-name", "tn", true, "Name of the task for cluster command. Supported command [PushSystemStore]."),
CHECKPOINT_FILE("checkpoint-file", "cf", true, "Checkpoint file path for cluster command."),
THREAD_COUNT("thread-count", "tc", true, "Number of threads to execute. 1 if not specified"),
RETRY("retry", "r", false, "Retry this operation"),
DISABLE_LOG("disable-log", "dl", false, "Disable logs from internal classes. Only print command output on console"),
STORE_VIEW_CONFIGS(
"storage-view-configs", "svc", true,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.linkedin.venice;

import com.linkedin.venice.exceptions.VeniceException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
* This class is a simple runnable which keeps fetching task from list and execute the assigned task. The task fetching
* and progress tracking / checkpointing is thread-safe, so it can be run in parallel.
*/
public class BatchMaintenanceTaskRunner implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(BatchMaintenanceTaskRunner.class);
private static final String TASK_LOG_PREFIX = "[**** TASK INFO ****]";

private static final ReentrantLock LOCK = new ReentrantLock();
private static final AtomicInteger INDEX = new AtomicInteger(-1);
private final List<String> taskList;
private final Function<String, Boolean> storeRunnable;
private final Map<String, Boolean> progressMap;
private final String checkpointFile;

public BatchMaintenanceTaskRunner(
Map<String, Boolean> progressMap,
String checkpointFile,
List<String> taskList,
Function<String, Boolean> storeRunnable) {
this.taskList = taskList;
this.storeRunnable = storeRunnable;
this.progressMap = progressMap;
this.checkpointFile = checkpointFile;
}

@Override
public void run() {
while (true) {
int fetchedTaskIndex = INDEX.incrementAndGet();
if (fetchedTaskIndex >= taskList.size()) {
LOGGER.info("Cannot find new store from queue, will exit.");
break;
}
String store = taskList.get(fetchedTaskIndex);
try {
LOGGER.info("{} Running store job: {} for store: {}", TASK_LOG_PREFIX, fetchedTaskIndex + 1, store);
boolean result = storeRunnable.apply(store);
if (result) {
LOGGER.info(
"{} Complete store task for job: {}/{} store: {}",
TASK_LOG_PREFIX,
fetchedTaskIndex + 1,
taskList.size(),
store);
progressMap.put(store, true);
} else {
LOGGER.info(
"{} Failed store task for job: {}/{} store: {}",
TASK_LOG_PREFIX,
fetchedTaskIndex + 1,
taskList.size(),
store);
}
// Periodically update the checkpoint file.
if ((fetchedTaskIndex % 100) == 0) {
LOGGER.info("{} Preparing to checkpoint status at index {}", TASK_LOG_PREFIX, fetchedTaskIndex);
checkpoint(checkpointFile);
}
} catch (Exception e) {
LOGGER.info("{} Caught exception: {}. Will exit.", TASK_LOG_PREFIX, e.getMessage());
}
}
// Perform one final checkpointing before existing the runnable.
checkpoint(checkpointFile);
}

public void checkpoint(String checkpointFile) {
try {
LOCK.lock();
LOGGER.info("Updating checkpoint...");

List<String> status =
progressMap.entrySet().stream().map(e -> e.getKey() + "," + e.getValue()).collect(Collectors.toList());
Files.write(Paths.get(checkpointFile), status);
LOGGER.info("Updated checkpoint...");

} catch (IOException e) {
throw new VeniceException(e);
} finally {
LOCK.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static com.linkedin.venice.Arg.BATCH_GET_LIMIT;
import static com.linkedin.venice.Arg.BLOB_TRANSFER_ENABLED;
import static com.linkedin.venice.Arg.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOUR;
import static com.linkedin.venice.Arg.CHECKPOINT_FILE;
import static com.linkedin.venice.Arg.CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED;
import static com.linkedin.venice.Arg.CHUNKING_ENABLED;
import static com.linkedin.venice.Arg.CLIENT_DECOMPRESSION_ENABLED;
Expand Down Expand Up @@ -127,6 +128,8 @@
import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION;
import static com.linkedin.venice.Arg.TARGET_SWAP_REGION_WAIT_TIME;
import static com.linkedin.venice.Arg.TASK_NAME;
import static com.linkedin.venice.Arg.THREAD_COUNT;
import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES;
import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED;
import static com.linkedin.venice.Arg.URL;
Expand Down Expand Up @@ -209,6 +212,10 @@ public enum Command {
"backfill-system-stores", "Create system stores of a given type for user stores in a cluster",
new Arg[] { URL, CLUSTER, SYSTEM_STORE_TYPE }
),
CLUSTER_BATCH_TASK(
"cluster-batch-task", "Run specific task against all user stores in a cluster in parallel",
new Arg[] { URL, CLUSTER, TASK_NAME, CHECKPOINT_FILE }, new Arg[] { THREAD_COUNT }
),
SET_VERSION(
"set-version", "Set the version that will be served", new Arg[] { URL, STORE, VERSION }, new Arg[] { CLUSTER }
), ADD_SCHEMA("add-schema", "", new Arg[] { URL, STORE, VALUE_SCHEMA }, new Arg[] { CLUSTER }),
Expand Down
Loading

0 comments on commit bfd7755

Please sign in to comment.