diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index f9467b2ce9..b45179c581 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -117,6 +117,7 @@ import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import com.linkedin.venice.utils.pools.LandFillObjectPool; import java.io.BufferedReader; import java.io.Console; @@ -125,6 +126,7 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.time.LocalDateTime; @@ -143,6 +145,10 @@ import java.util.Properties; import java.util.Set; import java.util.StringJoiner; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -588,6 +594,9 @@ public static void main(String[] args) throws Exception { case DUMP_HOST_HEARTBEAT: dumpHostHeartbeat(cmd); break; + case CLUSTER_BATCH_TASK: + clusterBatchTask(cmd); + break; default: StringJoiner availableCommands = new StringJoiner(", "); for (Command c: Command.values()) { @@ -852,6 +861,96 @@ private static void deleteStore(CommandLine cmd) throws IOException { printObject(response); } + private static void clusterBatchTask(CommandLine cmd) { + String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.CLUSTER_BATCH_TASK); + String task = getRequiredArgument(cmd, Arg.TASK_NAME, Command.CLUSTER_BATCH_TASK); + String checkpointFile = getRequiredArgument(cmd, Arg.CHECKPOINT_FILE, Command.CLUSTER_BATCH_TASK); + int parallelism = Integer.parseInt(getOptionalArgument(cmd, Arg.THREAD_COUNT, "1")); + LOGGER.info( + "[**** Cluster Command Params ****] Cluster: {}, Task: {}, Checkpoint: {}, Parallelism: {}", + clusterName, + task, + checkpointFile, + parallelism); + // Create child data center controller client map. + ChildAwareResponse childAwareResponse = controllerClient.listChildControllers(clusterName); + Map controllerClientMap = getControllerClientMap(clusterName, childAwareResponse); + + // Fetch list cluster store list from parent region. + Map progressMap = new VeniceConcurrentHashMap<>(); + MultiStoreResponse clusterStoreResponse = controllerClient.queryStoreList(false); + if (clusterStoreResponse.isError()) { + throw new VeniceException("Unable to fetch cluster store list: " + clusterStoreResponse.getError()); + } + for (String storeName: clusterStoreResponse.getStores()) { + progressMap.put(storeName, Boolean.FALSE); + } + + // Load progress from checkpoint file. If file does not exist, it will create new one during checkpointing. + try { + Path checkpointFilePath = Paths.get(checkpointFile); + if (!Files.exists(checkpointFilePath.toAbsolutePath())) { + LOGGER.info( + "Checkpoint file path does not exist, will create a new checkpoint file: {}", + checkpointFilePath.toAbsolutePath()); + } else { + List fileLines = Files.readAllLines(checkpointFilePath); + for (String line: fileLines) { + String storeName = line.split(",")[0]; + // For now, it is boolean to start with, we can add more states to support retry. + boolean status = false; + if (line.split(",").length > 1) { + status = Boolean.parseBoolean(line.split(",")[1]); + } + progressMap.put(storeName, status); + } + } + } catch (IOException e) { + throw new VeniceException(e); + } + List taskList = + progressMap.entrySet().stream().filter(e -> !e.getValue()).map(Map.Entry::getKey).collect(Collectors.toList()); + + // Validate task type. For now, we only has one task, if we have more task in the future, we can extend this logic. + Supplier> functionSupplier = null; + if (SystemStorePushTask.TASK_NAME.equals(task)) { + String systemStoreType = getOptionalArgument(cmd, Arg.SYSTEM_STORE_TYPE); + if (systemStoreType != null) { + if (!(systemStoreType.equalsIgnoreCase(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.toString()) + || systemStoreType.equalsIgnoreCase(VeniceSystemStoreType.META_STORE.toString()))) { + printErrAndExit("System store type: " + systemStoreType + " is not supported."); + } + } + System.out.println( + functionSupplier = () -> new SystemStorePushTask( + controllerClient, + controllerClientMap, + clusterName, + systemStoreType == null ? Optional.empty() : Optional.of(systemStoreType))); + } else { + printErrAndExit("Undefined task: " + task); + } + + // Create thread pool and start parallel processing. + ExecutorService executorService = Executors.newFixedThreadPool(parallelism); + List futureList = new ArrayList<>(); + for (int i = 0; i < parallelism; i++) { + BatchMaintenanceTaskRunner batchMaintenanceTaskRunner = + new BatchMaintenanceTaskRunner(progressMap, checkpointFile, taskList, functionSupplier.get()); + futureList.add(executorService.submit(batchMaintenanceTaskRunner)); + } + for (int i = 0; i < parallelism; i++) { + try { + futureList.get(i).get(); + LOGGER.info("Cluster task completed for thread : {}", i); + } catch (InterruptedException | ExecutionException e) { + LOGGER.warn(e.getMessage()); + executorService.shutdownNow(); + } + } + executorService.shutdownNow(); + } + private static void backfillSystemStores(CommandLine cmd) { String clusterName = getRequiredArgument(cmd, Arg.CLUSTER, Command.BACKFILL_SYSTEM_STORES); String systemStoreType = getRequiredArgument(cmd, Arg.SYSTEM_STORE_TYPE, Command.BACKFILL_SYSTEM_STORES); diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java index 75b2115a49..05603f884c 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Arg.java @@ -226,7 +226,10 @@ public enum Arg { SYSTEM_STORE_TYPE( "system-store-type", "sst", true, "Type of system store to backfill. Supported types are davinci_push_status_store and meta_store" - ), RETRY("retry", "r", false, "Retry this operation"), + ), TASK_NAME("task-name", "tn", true, "Name of the task for cluster command. Supported command [PushSystemStore]."), + CHECKPOINT_FILE("checkpoint-file", "cf", true, "Checkpoint file path for cluster command."), + THREAD_COUNT("thread-count", "tc", true, "Number of threads to execute. 1 if not specified"), + RETRY("retry", "r", false, "Retry this operation"), DISABLE_LOG("disable-log", "dl", false, "Disable logs from internal classes. Only print command output on console"), STORE_VIEW_CONFIGS( "storage-view-configs", "svc", true, diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/BatchMaintenanceTaskRunner.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/BatchMaintenanceTaskRunner.java new file mode 100644 index 0000000000..c8241442b0 --- /dev/null +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/BatchMaintenanceTaskRunner.java @@ -0,0 +1,100 @@ +package com.linkedin.venice; + +import com.linkedin.venice.exceptions.VeniceException; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * This class is a simple runnable which keeps fetching task from list and execute the assigned task. The task fetching + * and progress tracking / checkpointing is thread-safe, so it can be run in parallel. + */ +public class BatchMaintenanceTaskRunner implements Runnable { + private static final Logger LOGGER = LogManager.getLogger(BatchMaintenanceTaskRunner.class); + private static final String TASK_LOG_PREFIX = "[**** TASK INFO ****]"; + + private static final ReentrantLock LOCK = new ReentrantLock(); + private static final AtomicInteger INDEX = new AtomicInteger(-1); + private final List taskList; + private final Function storeRunnable; + private final Map progressMap; + private final String checkpointFile; + + public BatchMaintenanceTaskRunner( + Map progressMap, + String checkpointFile, + List taskList, + Function storeRunnable) { + this.taskList = taskList; + this.storeRunnable = storeRunnable; + this.progressMap = progressMap; + this.checkpointFile = checkpointFile; + } + + @Override + public void run() { + while (true) { + int fetchedTaskIndex = INDEX.incrementAndGet(); + if (fetchedTaskIndex >= taskList.size()) { + LOGGER.info("Cannot find new store from queue, will exit."); + break; + } + String store = taskList.get(fetchedTaskIndex); + try { + LOGGER.info("{} Running store job: {} for store: {}", TASK_LOG_PREFIX, fetchedTaskIndex + 1, store); + boolean result = storeRunnable.apply(store); + if (result) { + LOGGER.info( + "{} Complete store task for job: {}/{} store: {}", + TASK_LOG_PREFIX, + fetchedTaskIndex + 1, + taskList.size(), + store); + progressMap.put(store, true); + } else { + LOGGER.info( + "{} Failed store task for job: {}/{} store: {}", + TASK_LOG_PREFIX, + fetchedTaskIndex + 1, + taskList.size(), + store); + } + // Periodically update the checkpoint file. + if ((fetchedTaskIndex % 100) == 0) { + LOGGER.info("{} Preparing to checkpoint status at index {}", TASK_LOG_PREFIX, fetchedTaskIndex); + checkpoint(checkpointFile); + } + } catch (Exception e) { + LOGGER.info("{} Caught exception: {}. Will exit.", TASK_LOG_PREFIX, e.getMessage()); + } + } + // Perform one final checkpointing before existing the runnable. + checkpoint(checkpointFile); + } + + public void checkpoint(String checkpointFile) { + try { + LOCK.lock(); + LOGGER.info("Updating checkpoint..."); + + List status = + progressMap.entrySet().stream().map(e -> e.getKey() + "," + e.getValue()).collect(Collectors.toList()); + Files.write(Paths.get(checkpointFile), status); + LOGGER.info("Updated checkpoint..."); + + } catch (IOException e) { + throw new VeniceException(e); + } finally { + LOCK.unlock(); + } + } +} diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java index a943c76a3e..abdf608927 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/Command.java @@ -12,6 +12,7 @@ import static com.linkedin.venice.Arg.BATCH_GET_LIMIT; import static com.linkedin.venice.Arg.BLOB_TRANSFER_ENABLED; import static com.linkedin.venice.Arg.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOUR; +import static com.linkedin.venice.Arg.CHECKPOINT_FILE; import static com.linkedin.venice.Arg.CHILD_CONTROLLER_ADMIN_TOPIC_CONSUMPTION_ENABLED; import static com.linkedin.venice.Arg.CHUNKING_ENABLED; import static com.linkedin.venice.Arg.CLIENT_DECOMPRESSION_ENABLED; @@ -127,6 +128,8 @@ import static com.linkedin.venice.Arg.SYSTEM_STORE_TYPE; import static com.linkedin.venice.Arg.TARGET_SWAP_REGION; import static com.linkedin.venice.Arg.TARGET_SWAP_REGION_WAIT_TIME; +import static com.linkedin.venice.Arg.TASK_NAME; +import static com.linkedin.venice.Arg.THREAD_COUNT; import static com.linkedin.venice.Arg.TO_BE_STOPPED_NODES; import static com.linkedin.venice.Arg.UNUSED_SCHEMA_DELETION_ENABLED; import static com.linkedin.venice.Arg.URL; @@ -209,6 +212,10 @@ public enum Command { "backfill-system-stores", "Create system stores of a given type for user stores in a cluster", new Arg[] { URL, CLUSTER, SYSTEM_STORE_TYPE } ), + CLUSTER_BATCH_TASK( + "cluster-batch-task", "Run specific task against all user stores in a cluster in parallel", + new Arg[] { URL, CLUSTER, TASK_NAME, CHECKPOINT_FILE }, new Arg[] { THREAD_COUNT } + ), SET_VERSION( "set-version", "Set the version that will be served", new Arg[] { URL, STORE, VERSION }, new Arg[] { CLUSTER } ), ADD_SCHEMA("add-schema", "", new Arg[] { URL, STORE, VALUE_SCHEMA }, new Arg[] { CLUSTER }), diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java new file mode 100644 index 0000000000..b214c46afc --- /dev/null +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/SystemStorePushTask.java @@ -0,0 +1,175 @@ +package com.linkedin.venice; + +import static com.linkedin.venice.AdminTool.printObject; + +import com.linkedin.venice.common.VeniceSystemStoreType; +import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.ControllerResponse; +import com.linkedin.venice.controllerapi.JobStatusQueryResponse; +import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; +import com.linkedin.venice.controllerapi.VersionCreationResponse; +import com.linkedin.venice.controllerapi.VersionResponse; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pushmonitor.ExecutionStatus; +import com.linkedin.venice.utils.Utils; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * This class aims to do one time emtpy push to all user system stores of a specific user store. + * It will aggregate and compute the largest used version from all regions and update store before performing empty push. + * It will also skip empty push to store which is being migrated and is in the destination cluster. + */ +public class SystemStorePushTask implements Function { + public static final String TASK_NAME = "PushSystemStore"; + private static final Logger LOGGER = LogManager.getLogger(SystemStorePushTask.class); + private static final int JOB_POLLING_RETRY_COUNT = 200; + private static final int JOB_POLLING_RETRY_PERIOD_IN_SECONDS = 5; + private static final String SYSTEM_STORE_PUSH_TASK_LOG_PREFIX = "[**** SYSTEM STORE PUSH ****]"; + private final List systemStoreTypeList; + + private final ControllerClient parentControllerClient; + private final String clusterName; + private final Map childControllerClientMap; + + public SystemStorePushTask( + ControllerClient parentControllerClient, + Map controllerClientMap, + String clusterName, + Optional systemStoreTypeFilter) { + this.parentControllerClient = parentControllerClient; + this.childControllerClientMap = controllerClientMap; + this.clusterName = clusterName; + if (systemStoreTypeFilter.isPresent()) { + systemStoreTypeList = Collections.singletonList(VeniceSystemStoreType.valueOf(systemStoreTypeFilter.get())); + } else { + systemStoreTypeList = + Arrays.asList(VeniceSystemStoreType.META_STORE, VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE); + } + } + + public Boolean apply(String storeName) { + StoreResponse storeResponse = parentControllerClient.getStore(storeName); + if (storeResponse.isError()) { + LOGGER.error("{} Unable to locate user store: {}", SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, storeName); + return false; + } + if (storeResponse.getStore().isMigrating() && storeResponse.getStore().isMigrationDuplicateStore()) { + LOGGER.error( + "{} Unable to empty push to system store of migrating dest cluster store: {} in cluster: {}", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + storeName, + clusterName); + return false; + } + + for (VeniceSystemStoreType type: systemStoreTypeList) { + String systemStoreName = type.getSystemStoreName(storeName); + /** + * In current implementation, a push to system store will flip the flag to true, which can introduce unexpected + * behavior to the store. Here, we skip the system store push if it is turned off. + */ + boolean isSystemStoreEnabled = VeniceSystemStoreType.META_STORE.equals(type) + ? storeResponse.getStore().isStoreMetaSystemStoreEnabled() + : storeResponse.getStore().isDaVinciPushStatusStoreEnabled(); + if (!isSystemStoreEnabled) { + LOGGER.warn( + "{} System store: {} is disabled. Will skip the push.", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + systemStoreName); + } + VersionResponse response = parentControllerClient.getStoreLargestUsedVersion(clusterName, systemStoreName); + if (response.isError()) { + LOGGER.error( + "{} Unable to locate largest used store version for: {}", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + systemStoreName); + return false; + } + int largestUsedVersion = response.getVersion(); + + int version = getStoreLargestUsedVersionNumber(parentControllerClient, systemStoreName); + if (version == -1) { + return false; + } + largestUsedVersion = Math.max(largestUsedVersion, version); + for (Map.Entry controllerClientEntry: childControllerClientMap.entrySet()) { + int result = getStoreLargestUsedVersionNumber(controllerClientEntry.getValue(), systemStoreName); + if (result == -1) { + LOGGER.error( + "{} Unable to locate store for: {} in region: {}", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + systemStoreName, + controllerClientEntry.getKey()); + return false; + } + largestUsedVersion = Math.max(largestUsedVersion, result); + } + + LOGGER.info("Aggregate largest version: {} for store: {}", largestUsedVersion, systemStoreName); + ControllerResponse controllerResponse = parentControllerClient + .updateStore(systemStoreName, new UpdateStoreQueryParams().setLargestUsedVersionNumber(largestUsedVersion)); + if (controllerResponse.isError()) { + LOGGER.error( + "{} Unable to set largest used store version for: {} as {} in all regions", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + systemStoreName, + largestUsedVersion); + return false; + } + + VersionCreationResponse versionCreationResponse = + parentControllerClient.emptyPush(systemStoreName, "SYSTEM_STORE_PUSH_" + System.currentTimeMillis(), 10000); + // Kafka topic name in the above response is null, and it will be fixed with this code change. + String topicName = Version.composeKafkaTopic(systemStoreName, versionCreationResponse.getVersion()); + // Polling job status to make sure the empty push hits every child colo + int count = JOB_POLLING_RETRY_COUNT; + while (true) { + JobStatusQueryResponse jobStatusQueryResponse = + parentControllerClient.retryableRequest(3, controllerClient -> controllerClient.queryJobStatus(topicName)); + printObject(jobStatusQueryResponse, System.out::print); + if (jobStatusQueryResponse.isError()) { + return false; + } + ExecutionStatus executionStatus = ExecutionStatus.valueOf(jobStatusQueryResponse.getStatus()); + if (executionStatus.isTerminal()) { + if (executionStatus.isError()) { + LOGGER.error("{} Push error for topic: {}", SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, topicName); + return false; + } + LOGGER.info("{} Push completed: {}", SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, topicName); + break; + } + Utils.sleep(TimeUnit.SECONDS.toMillis(JOB_POLLING_RETRY_PERIOD_IN_SECONDS)); + count--; + if (count == 0) { + LOGGER.error( + "{} Push not finished: {} in {} seconds", + SYSTEM_STORE_PUSH_TASK_LOG_PREFIX, + topicName, + JOB_POLLING_RETRY_COUNT * JOB_POLLING_RETRY_PERIOD_IN_SECONDS); + return false; + } + } + } + return true; + } + + int getStoreLargestUsedVersionNumber(ControllerClient controllerClient, String systemStoreName) { + // Make sure store exist in region and return largest used version number. + StoreResponse systemStoreResponse = controllerClient.getStore(systemStoreName); + if (systemStoreResponse.isError()) { + return -1; + } + return systemStoreResponse.getStore().getLargestUsedVersionNumber(); + } +}