Adds utility scripts and initial docs for managing multi-GPU Dask clusters for cuGraph (#4838)

This PR adds utility scripts and initial docs for managing multi-GPU Dask clusters for cuGraph, aimed at helping the situation described in [this issue](#4831).

These scripts are taken from internal tools used for MNMG testing and have been modified to be more generalized for use by the community.

Authors:
  - Rick Ratzel (https://github.com/rlratzel)
  - Don Acosta (https://github.com/acostadon)

Approvers:
  - Brad Rees (https://github.com/BradReesWork)
  - Don Acosta (https://github.com/acostadon)
  - Joseph Nke (https://github.com/jnke2016)

URL: #4838
rlratzel authored Jan 7, 2025
1 parent c522baf commit b339c18
Showing 3 changed files with 452 additions and 0 deletions.
55 changes: 55 additions & 0 deletions scripts/dask/README.md
@@ -0,0 +1,55 @@
# Dask scripts for multi-GPU environments

This directory contains tools for configuring environments for single-node or
multi-node, multi-GPU (SNMG or MNMG) Dask-based cuGraph runs, currently
consisting of shell and Python scripts.

Users should also consult the multi-GPU utilities in the
`python/cugraph/cugraph/testing/mg_utils.py` module, specifically the
`start_dask_client()` function, to see how to create `client` and `cluster`
instances in Python code to access the corresponding Dask processes created by
the tools here.
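
As a rough illustration of what attaching to these processes looks like, the sketch below connects a Dask client to an already-running scheduler through its scheduler file. It is not the implementation of `start_dask_client()`. The helper names `resolve_scheduler_file` and `connect_to_cluster` are hypothetical, and the code assumes `dask.distributed` is installed and that `SCHEDULER_FILE` points at the file written by the scheduler.

```python
import os


def resolve_scheduler_file(path=None, env=None):
    """Return the scheduler file path from an explicit argument or SCHEDULER_FILE."""
    env = os.environ if env is None else env
    path = path or env.get("SCHEDULER_FILE")
    if not path:
        raise RuntimeError("SCHEDULER_FILE is not set and no path was given")
    return path


def connect_to_cluster(path=None):
    """Attach a client to a cluster that was started separately (hypothetical helper)."""
    # Imported lazily so this module can be loaded without Dask installed.
    from dask.distributed import Client

    return Client(scheduler_file=resolve_scheduler_file(path))
```

With `SCHEDULER_FILE` exported in the environment, `client = connect_to_cluster()` would return a client bound to the scheduler and workers started by the tools in this directory.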


### run-dask-process.sh

This script is used to start the Dask scheduler and workers as needed.

To start a scheduler and workers on a node, run it like this:
```
bash$ run-dask-process.sh scheduler workers
```
Once a scheduler is running on a node in the cluster, workers can be started
on other nodes in the cluster by running the script on each worker node like
this:
```
bash$ run-dask-process.sh workers
```
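The env var `SCHEDULER_FILE` must be set to the location where the scheduler
will generate the scheduler JSON file. The same env var is used by the
workers to locate the generated scheduler JSON file for reading. In a
multi-node configuration, this location must be on a file system shared by
the scheduler and all worker nodes.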

The script will ensure the scheduler is started before the workers when both
are specified.
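
Automation outside the script can apply the same handshake: wait until the scheduler file exists, then read it. The sketch below assumes the file is JSON containing an `address` entry, which is how current Dask releases write it but is not something this README specifies. `wait_for_scheduler_file` and its parameters are hypothetical names.

```python
import json
import os
import time


def wait_for_scheduler_file(path, timeout=60.0, poll_interval=0.5):
    """Block until `path` exists, then return its parsed JSON contents."""
    deadline = time.monotonic() + timeout
    while not os.path.isfile(path):
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{path} did not appear within {timeout} seconds")
        time.sleep(poll_interval)
    # The file exists; parse it so the caller can inspect the scheduler details.
    with open(path) as f:
        return json.load(f)
```

A caller might then use `wait_for_scheduler_file(os.environ["SCHEDULER_FILE"]).get("address")` to log where the scheduler is listening. The sketch does not guard against reading the file while it is still being written, so production automation may want to retry on a JSON parse error.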

Additional options can be specified for using different communication
mechanisms:
```
--tcp              - initialize a TCP cluster (default)
--ucx              - initialize a UCX cluster with NVLink
--ucxib | --ucx-ib - initialize a UCX cluster with InfiniBand+NVLink
```
Finally, the script can be run with `-h` or `--help` to see the full set of
options.
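
The script's help text gives the order of precedence for the cluster type: a command-line flag if provided, then the `DASK_CLUSTER_CONFIG_TYPE` env var if set, then TCP. The sketch below mirrors that documented order for illustration only; `resolve_cluster_config_type` and `FLAG_TO_CONFIG_TYPE` are hypothetical names, not part of the script.

```python
import os

# Flags accepted by run-dask-process.sh, mapped to the config type they select.
# Insertion order matches the order in which the script checks the flags.
FLAG_TO_CONFIG_TYPE = {
    "--tcp": "TCP",
    "--ucx": "UCX",
    "--ucxib": "UCXIB",
    "--ucx-ib": "UCXIB",
}


def resolve_cluster_config_type(cli_args, env=None):
    """Return TCP, UCX, or UCXIB using CLI flag > env var > default."""
    env = os.environ if env is None else env
    for flag, config_type in FLAG_TO_CONFIG_TYPE.items():
        if flag in cli_args:
            return config_type
    # An empty env var is treated like an unset one, as bash's ":-" does.
    return env.get("DASK_CLUSTER_CONFIG_TYPE") or "TCP"
```

For example, `resolve_cluster_config_type(["workers", "--ucx"], env={"DASK_CLUSTER_CONFIG_TYPE": "TCP"})` selects `UCX`, because the command line takes precedence over the environment.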

### wait_for_workers.py

This script can be used to ensure all workers that are expected to be present
in the cluster are up and running. This is useful for automation that sets up
the Dask cluster and cannot proceed until the Dask cluster is available
to accept tasks.

This example waits for 16 workers to be present:
```
bash$ python wait_for_workers.py --scheduler-file-path=$SCHEDULER_FILE --num-expected-workers=16
```
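
The source of `wait_for_workers.py` is not reproduced here, so the following is only a sketch of the general technique: poll for the worker count until the expected number is present or a timeout expires. `wait_for_worker_count` and `wait_with_dask` are hypothetical helpers. The Dask calls used in `wait_with_dask` (`Client(scheduler_file=...)` and `Client.wait_for_workers()`) belong to `dask.distributed`, which is assumed to be installed.

```python
import time


def wait_for_worker_count(count_workers, expected, timeout=120.0, poll_interval=1.0):
    """Poll `count_workers()` until it reports at least `expected` workers."""
    deadline = time.monotonic() + timeout
    while True:
        current = count_workers()
        if current >= expected:
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError(f"only {current} of {expected} workers are up")
        time.sleep(poll_interval)


def wait_with_dask(scheduler_file, expected, timeout=120.0):
    """Same idea, delegating the polling to Dask's own client method."""
    # Imported lazily so this module can be loaded without Dask installed.
    from dask.distributed import Client

    with Client(scheduler_file=scheduler_file) as client:
        client.wait_for_workers(n_workers=expected, timeout=timeout)
```

The generic poller takes any zero-argument callable, which keeps it easy to exercise without a live cluster. `wait_with_dask` is likely the shorter route when a Dask client is available.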
274 changes: 274 additions & 0 deletions scripts/dask/run-dask-process.sh
@@ -0,0 +1,274 @@
#!/bin/bash
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

################################################################################
NUMARGS=$#
ARGS=$*
VALIDARGS="-h --help scheduler workers --tcp --ucx --ucxib --ucx-ib"
HELP="$0 [<app> ...] [<flag> ...]
 where <app> is:
   scheduler          - start dask scheduler
   workers            - start dask workers
 and <flag> is:
   --tcp              - initialize a TCP cluster (default)
   --ucx              - initialize a UCX cluster with NVLink
   --ucxib | --ucx-ib - initialize a UCX cluster with InfiniBand+NVLink
   -h | --help        - print this text
 The cluster config order of precedence is any specification on the
 command line (--tcp, --ucx, etc.) if provided, then the value of the
 env var DASK_CLUSTER_CONFIG_TYPE if set, then the default value of TCP.
 The env var SCHEDULER_FILE must be set to the location of the dask scheduler
 file that the scheduler will generate and the worker(s) will read. This
 location must be accessible by the scheduler and workers, meaning a multi-node
 configuration will need to set this to a location on a shared file system.
"

# Default configuration variables. Most are defined using the bash := or :-
# syntax, which means they will be set only if they were previously unset in
# the environment.
WORKER_RMM_POOL_SIZE=${WORKER_RMM_POOL_SIZE:-12G}
DASK_CUDA_INTERFACE=${DASK_CUDA_INTERFACE:-ibp5s0f0}
DASK_SCHEDULER_PORT=${DASK_SCHEDULER_PORT:-8792}
DASK_DEVICE_MEMORY_LIMIT=${DASK_DEVICE_MEMORY_LIMIT:-auto}
DASK_HOST_MEMORY_LIMIT=${DASK_HOST_MEMORY_LIMIT:-auto}

# Logs can be written to a specific location by setting the DASK_LOGS_DIR
# env var. If unset, all logs are created under a dir named after the
# current PID.
DASK_LOGS_DIR=${DASK_LOGS_DIR:-dask_logs-$$}
DASK_SCHEDULER_LOG=${DASK_LOGS_DIR}/scheduler_log.txt
DASK_WORKERS_LOG=${DASK_LOGS_DIR}/worker-${HOSTNAME}_log.txt

# DASK_CLUSTER_CONFIG_TYPE defaults to the env var value if set, else TCP. CLI
# options to this script take precedence. Valid values are TCP, UCX, UCXIB
DASK_CLUSTER_CONFIG_TYPE=${DASK_CLUSTER_CONFIG_TYPE:-TCP}


################################################################################
# FUNCTIONS

numargs=$#
args=$*
hasArg () {
    (( ${numargs} != 0 )) && (echo " ${args} " | grep -q " $1 ")
}

logger_prefix=">>>> "
logger () {
    if (( $# > 0 )) && [ "$1" == "-p" ]; then
        shift
        echo -e "${logger_prefix}$@"
    else
        echo -e "$(date --utc "+%D-%T.%N")_UTC${logger_prefix}$@"
    fi
}

buildTcpArgs () {
    export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s"
    export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s"
    export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s"
    export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s"
    export DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE="False"

    SCHEDULER_ARGS="--protocol=tcp
                --port=$DASK_SCHEDULER_PORT
                --scheduler-file $SCHEDULER_FILE
                "

    WORKER_ARGS="--rmm-pool-size=$WORKER_RMM_POOL_SIZE
             --local-directory=/tmp/$LOGNAME
             --scheduler-file=$SCHEDULER_FILE
             --memory-limit=$DASK_HOST_MEMORY_LIMIT
             --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT
            "

}

buildUCXWithInfinibandArgs () {
    export DASK_RMM__POOL_SIZE=0.5GB
    export DASK_DISTRIBUTED__COMM__UCX__CREATE_CUDA_CONTEXT=True

    SCHEDULER_ARGS="--protocol=ucx
                --port=$DASK_SCHEDULER_PORT
                --interface=$DASK_CUDA_INTERFACE
                --scheduler-file $SCHEDULER_FILE
               "

    WORKER_ARGS="--interface=$DASK_CUDA_INTERFACE
                --rmm-pool-size=$WORKER_RMM_POOL_SIZE
                --rmm-async
                --local-directory=/tmp/$LOGNAME
                --scheduler-file=$SCHEDULER_FILE
                --memory-limit=$DASK_HOST_MEMORY_LIMIT
                --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT
                "
}

buildUCXwithoutInfinibandArgs () {
    export UCX_TCP_CM_REUSEADDR=y
    export UCX_MAX_RNDV_RAILS=1
    export UCX_TCP_TX_SEG_SIZE=8M
    export UCX_TCP_RX_SEG_SIZE=8M

    export DASK_DISTRIBUTED__COMM__UCX__CUDA_COPY=True
    export DASK_DISTRIBUTED__COMM__UCX__TCP=True
    export DASK_DISTRIBUTED__COMM__UCX__NVLINK=True
    export DASK_DISTRIBUTED__COMM__UCX__INFINIBAND=False
    export DASK_DISTRIBUTED__COMM__UCX__RDMACM=False
    export DASK_RMM__POOL_SIZE=0.5GB


    SCHEDULER_ARGS="--protocol=ucx
                --port=$DASK_SCHEDULER_PORT
                --scheduler-file $SCHEDULER_FILE
               "

    WORKER_ARGS="--enable-tcp-over-ucx
                --enable-nvlink
                --disable-infiniband
                --disable-rdmacm
                --rmm-pool-size=$WORKER_RMM_POOL_SIZE
                --local-directory=/tmp/$LOGNAME
                --scheduler-file=$SCHEDULER_FILE
                --memory-limit=$DASK_HOST_MEMORY_LIMIT
                --device-memory-limit=$DASK_DEVICE_MEMORY_LIMIT
                "
}

scheduler_pid=""
worker_pid=""
num_scheduler_tries=0

startScheduler () {
    mkdir -p $(dirname $SCHEDULER_FILE)
    echo "RUNNING: \"dask scheduler $SCHEDULER_ARGS\"" > $DASK_SCHEDULER_LOG
    dask scheduler $SCHEDULER_ARGS >> $DASK_SCHEDULER_LOG 2>&1 &
    scheduler_pid=$!
}


################################################################################
# READ CLI OPTIONS

START_SCHEDULER=0
START_WORKERS=0

if (( ${NUMARGS} == 0 )); then
    echo "${HELP}"
    exit 0
else
    if hasArg -h || hasArg --help; then
        echo "${HELP}"
        exit 0
    fi
    for a in ${ARGS}; do
        if ! (echo " ${VALIDARGS} " | grep -q " ${a} "); then
            echo "Invalid option: ${a}"
            exit 1
        fi
    done
fi

if [ -z ${SCHEDULER_FILE+x} ]; then
    echo "Env var SCHEDULER_FILE must be set. See -h for details"
    exit 1
fi

if hasArg scheduler; then
    START_SCHEDULER=1
fi
if hasArg workers; then
    START_WORKERS=1
fi
# Allow the command line to take precedence
if hasArg --tcp; then
    DASK_CLUSTER_CONFIG_TYPE=TCP
elif hasArg --ucx; then
    DASK_CLUSTER_CONFIG_TYPE=UCX
elif hasArg --ucxib || hasArg --ucx-ib; then
    DASK_CLUSTER_CONFIG_TYPE=UCXIB
fi


################################################################################
# SETUP & RUN

#export DASK_LOGGING__DISTRIBUTED="DEBUG"
#ulimit -n 100000

if [[ "$DASK_CLUSTER_CONFIG_TYPE" == "UCX" ]]; then
    logger "Using cluster configuration for UCX"
    buildUCXwithoutInfinibandArgs
elif [[ "$DASK_CLUSTER_CONFIG_TYPE" == "UCXIB" ]]; then
    logger "Using cluster configuration for UCX with Infiniband"
    buildUCXWithInfinibandArgs
else
    logger "Using cluster configuration for TCP"
    buildTcpArgs
fi

mkdir -p $DASK_LOGS_DIR
logger "Logs written to: $DASK_LOGS_DIR"

if [[ $START_SCHEDULER == 1 ]]; then
    rm -f $SCHEDULER_FILE $DASK_SCHEDULER_LOG $DASK_WORKERS_LOG

    startScheduler
    sleep 6
    num_scheduler_tries=$(( num_scheduler_tries+1 ))

    # Wait for the scheduler to start first before proceeding, since
    # it may require several retries (if prior run left ports open
    # that need time to close, etc.)
    while [ ! -f "$SCHEDULER_FILE" ]; do
        scheduler_alive=$(ps -p $scheduler_pid > /dev/null ; echo $?)
        if [[ $scheduler_alive != 0 ]]; then
            if [[ $num_scheduler_tries != 30 ]]; then
                logger "scheduler failed to start, retry #$num_scheduler_tries"
                startScheduler
                sleep 6
                num_scheduler_tries=$(( num_scheduler_tries+1 ))
            else
                logger "could not start scheduler, exiting."
                exit 1
            fi
        else
            # The scheduler is alive but has not written its file yet;
            # pause briefly instead of spinning on ps.
            sleep 1
        fi
    done
    logger "scheduler started."
fi

if [[ $START_WORKERS == 1 ]]; then
    rm -f $DASK_WORKERS_LOG
    while [ ! -f "$SCHEDULER_FILE" ]; do
        logger "run-dask-process.sh: $SCHEDULER_FILE not present - waiting to start workers..."
        sleep 2
    done
    echo "RUNNING: \"dask-cuda-worker $WORKER_ARGS\"" > $DASK_WORKERS_LOG
    dask-cuda-worker $WORKER_ARGS >> $DASK_WORKERS_LOG 2>&1 &
    worker_pid=$!
    logger "worker(s) started."
fi

# This script will not return until the following background processes
# have completed or been killed.
if [[ $worker_pid != "" ]]; then
    logger "waiting for worker pid $worker_pid to finish before exiting script..."
    wait $worker_pid
fi
if [[ $scheduler_pid != "" ]]; then
    logger "waiting for scheduler pid $scheduler_pid to finish before exiting script..."
    wait $scheduler_pid
fi