From a620bd5564f42122df987b0d4b3a4e03fefd3b26 Mon Sep 17 00:00:00 2001 From: Darshan Sanghani Date: Mon, 4 Mar 2024 11:13:29 -0800 Subject: [PATCH] Modify NCCL profiler invocation, collection, dump NCCL provides a profiling/tracing capability to record various operations during collectives including setting up buffers, sending data to and from GPU etc. This change will enable us to control NCCL profiling from the application layer through a start/stop interface. Enhancements * It uses a compile time flag and traces the whole application. So it does not support start and stop API. * Does not annotate the start and stop of the overall collective and provide collective name. * Missing chunk/data size measurement. * Add nccl API markers. * Improve clean up for profiler and collective event buffers. * Make trace dumping not dependent on collective markings. * Future enhancements will include sampling to enable always-on collection. --- src/include/profiler.h | 6 +- src/include/proxy.h | 3 + src/misc/profiler.cc | 221 +++++++++++++++++++++++++++++++++++++---- src/nccl.h.in | 9 ++ src/proxy.cc | 2 + 5 files changed, 221 insertions(+), 20 deletions(-) diff --git a/src/include/profiler.h b/src/include/profiler.h index 103af99ad..a17f1a410 100644 --- a/src/include/profiler.h +++ b/src/include/profiler.h @@ -8,6 +8,7 @@ #define NCCL_PROFILER_H_ #include "proxy.h" +#define NCCL_PROXY_PROFILER_ENABLED 1 enum ncclProxyProfileState { ncclProxyProfileBegin = 0, @@ -32,6 +33,9 @@ enum ncclProxyProfileState { }; ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state); -void ncclProfilingDump(); +ncclResult_t ncclProfilerEnable(); +ncclResult_t ncclProfilerDisable(); +void ncclProfilingDump(const char* filename = "//"); +ncclResult_t ncclCollectiveRecord(const char* name, char type); #endif diff --git a/src/include/proxy.h b/src/include/proxy.h index 353426c1d..7b80bd1b3 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -34,6 +34,8 @@ union ncclProxyOpSpecifics { struct ncclProxyOp { struct ncclProxyConnection* connection; void* buffer; + uint16_t nsteps; + uint16_t nChannels; ssize_t nbytes; uint64_t opCount; int root; @@ -62,6 +64,7 @@ struct ncclProxySubArgs { void* mhandle; int channelId; int nsteps; + int nChannels; ssize_t nbytes; int peer; diff --git a/src/misc/profiler.cc b/src/misc/profiler.cc index 785d616b8..5d80e6cd0 100644 --- a/src/misc/profiler.cc +++ b/src/misc/profiler.cc @@ -4,10 +4,13 @@ * See LICENSE.txt for license information ************************************************************************/ +#include +#include #include "profiler.h" //#define PROFILE_PROXY 1 -#ifdef PROFILE_PROXY +#ifdef NCCL_PROXY_PROFILER_ENABLED +#define ENABLE_TIMER 1 #include "timer.h" #include "alloc.h" @@ -22,17 +25,63 @@ struct ncclProxyProfileEvent { uint16_t channel; uint8_t type; // send / recv uint8_t opIndex; + int chunkSize; + int nsteps; + int nChannels; + int collectiveID; + int nbytes; + uint8_t protocol; }; +struct ncclCollectiveEvent { + double timestamp; + double timestamp_end; + std::string collectiveName; + int collectiveCount; + char type; +}; +struct ncclCollectiveEvent* collectiveEvents = NULL; + +std::atomic enable_profiler{false}; +std::atomic dumpDone{false}; +std::mutex profiler_dump_mutex; + struct ncclProxyProfileEvent* profilingEvents = NULL; int profilingIndex = 0; double profilingStart = 0; #define MAX_EVENTS 200000 +/* Reduce buffer for collective traces to 500. +Too large buffer for collective may resulting in failure to allocate memory. +Example: If on average there is 50 collective calls in 500ms profiling window. A buffer of 500 +can accomodate 10 such windows */ +#define MAX_COLLECTIVES 500 +int collectiveIndex = 0; +volatile double lastEventTimestamp = 0; + +void allocateProfilingBuffer() { + std::unique_lock lg(profiler_dump_mutex); + if (profilingEvents != NULL) { + return; + } + ncclCalloc(&profilingEvents, MAX_EVENTS); + profilingStart = gettime(); +} + +void allocateCollectiveBuffer() { + std::unique_lock lg(profiler_dump_mutex); + if (collectiveEvents != NULL) { + return; + } + ncclCalloc(&collectiveEvents, MAX_COLLECTIVES); + profilingStart = gettime(); +} ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) { + if (!enable_profiler.load(std::memory_order_relaxed)) { + return ncclSuccess; + } if (profilingEvents == NULL) { - NCCLCHECK(ncclCalloc(&profilingEvents, MAX_EVENTS)); - profilingStart = gettime(); + allocateProfilingBuffer(); } struct ncclProxyProfileEvent* event = NULL; if (state%8 == 0) { @@ -46,27 +95,104 @@ ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, event->type = args->pattern; event->step = step; event->opIndex = (((uint64_t)args)/sizeof(struct ncclProxyArgs))%256; + event->chunkSize = args->chunkSize; + event->nsteps = args->subs[sub].nsteps; + event->nChannels = args->subs[sub].nChannels; + event->nbytes = args->subs[sub].nbytes; + event->protocol = args->protocol; } else event->peer = -state; } else { event = (struct ncclProxyProfileEvent*)args->subs[sub].profilingEvents[step%NCCL_STEPS]; if (state == ncclProxyProfileEnd) args->subs[sub].profilingEvents[step%NCCL_STEPS] = NULL; if (state == ncclProxyProfileAppendEnd) event->opCount = args->opCount; + if (event && state == ncclProxyProfileAppendEnd) event->opCount = args->opCount; } // Timestamp - event->timestamp[state%8] = gettime()-profilingStart; + if (event) { + event->timestamp[state % 8] = gettime() - profilingStart; + if (event->peer >= 0) { + if (lastEventTimestamp < event->timestamp[state % 8]) { + lastEventTimestamp = event->timestamp[state % 8]; + } + event->collectiveID = collectiveIndex; + } + } return ncclSuccess; } -void ncclProfilingDump() { - static int dumpDone = 0; +NCCL_API(ncclResult_t, ncclCollectiveRecord, const char*, char); +ncclResult_t ncclCollectiveRecord(const char* name, char type) { + if (!enable_profiler.load(std::memory_order_relaxed)) { + return ncclSuccess; + } + if (collectiveIndex >= MAX_COLLECTIVES) { + return ncclSuccess; + } + // INFO(NCCL_ALL,"NCCL Collective Record name = %s, event = %c, collectiveindex=%d, max_collectives=%d", name, type, collectiveIndex, MAX_COLLECTIVES); + if (collectiveEvents == NULL) { + allocateCollectiveBuffer(); + } + struct ncclCollectiveEvent* e_prev = collectiveEvents + collectiveIndex - 1; + if (e_prev){ + if (e_prev->type == 'b' && collectiveIndex > 0) { + e_prev->timestamp_end = lastEventTimestamp; + e_prev->type = 'e'; + } + } + struct ncclCollectiveEvent* event = NULL; + event = collectiveEvents + collectiveIndex++; + if(event){ + event->collectiveName = name; + event->collectiveCount = collectiveIndex; + event->timestamp = gettime() - profilingStart; + event->timestamp_end = 0; + event->type = type; + } + // INFO(NCCL_ALL, "The collective name is %s\n", name); + return ncclSuccess; +} + +const char* getCollectiveForEvent(struct ncclProxyProfileEvent* e) { + if (collectiveEvents == NULL || e->collectiveID < 1) { + return "N/A"; + } + struct ncclCollectiveEvent* e_coll = collectiveEvents + e->collectiveID - 1; + return e_coll->collectiveName.c_str(); +} + +void profilerCleanup() { + if (profilingEvents != NULL) { + free(profilingEvents); + profilingEvents = NULL; + } + + if (collectiveEvents != NULL) { + free(collectiveEvents); + collectiveEvents = NULL; + } + dumpDone = true; +} + +NCCL_API(void, ncclProfilingDump, const char*); +void ncclProfilingDump(const char* filename) { + // INFO(NCCL_ALL,"Dumping proxy profiler trace"); + std::unique_lock lg(profiler_dump_mutex); if (dumpDone) return; - dumpDone = 1; - const char* str = ncclGetEnv("NCCL_PROXY_PROFILE"); - if (!str) { free(profilingEvents); return; } + + const char* str = filename; + if (str == "//") { str = getenv("NCCL_PROXY_PROFILE"); } + if (!str) { + profilerCleanup(); + return; + } + FILE* f = fopen(str, "w"); fprintf(f, "[\n"); for (int i=0; ipeer >= 0; const char* typeStr = sendrecv ? (e->type == ncclPatternSend ? "Send" : "Recv") : @@ -76,23 +202,24 @@ void ncclProfilingDump() { if (sendrecv) { int state = ncclProxyProfileBegin; const char** stateStr = e->type == ncclPatternSend ? profilingStateSendStr : profilingStateRecvStr; - fprintf(f, "{\"name\": \"%s-%d-%d\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f, \"args\": { \"opCount\": %ld, \"proxyOpIndex\":%d } },\n", - typeStr, e->peer, e->step, i, e->channel, e->timestamp[state], e->opCount, e->opIndex); + const char* collectiveName = getCollectiveForEvent(e); + fprintf(f, "{\"name\": \"%s-%d-%d\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f, \"args\": { \"opCount\": %ld, \"proxyOpIndex\":%d }, \"chunkSize\": %d, \"totalSteps\": %d, \"totalChannels\": %d, \"collectiveName\": \"%s-%d\" , \"totalbytes\": %d , \"proto\": %d },\n", + typeStr, e->peer, e->step, i, e->channel, e->timestamp[state], e->opCount, e->opIndex, e->chunkSize, e->nsteps, e->nChannels, collectiveName, e->collectiveID, e->nbytes, e->protocol); while (statetimestamp[state]) { const char* name = stateStr[state]; - fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f },\n", - name, i, e->channel, e->timestamp[state]); + fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f, \"chunkSize\": %d, \"totalSteps\": %d, \"collectiveName\": \"%s-%d\", \"totalbytes\": %d , \"proto\": %d},\n", + name, i, e->channel, e->timestamp[state], e->chunkSize, e->nsteps, collectiveName, e->collectiveID, e->nbytes, e->protocol); state++; while (e->timestamp[state] == 0) state++; - fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"e\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f },\n", - name, i, e->channel, e->timestamp[state]); + fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"e\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f, \"chunkSize\": %d, \"totalSteps\": %d, \"collectiveName\": \"%s-%d\", \"totalbytes\": %d , \"proto\": %d},\n", + name, i, e->channel, e->timestamp[state], e->chunkSize, e->nsteps, collectiveName, e->collectiveID, e->nbytes, e->protocol); } } - fprintf(f, "{\"name\": \"%s-%d-%d\", \"cat\": \"NET\", \"ph\": \"e\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f },\n", - typeStr, e->peer, e->step, i, e->channel, e->timestamp[state]); + fprintf(f, "{\"name\": \"%s-%d-%d\", \"cat\": \"NET\", \"ph\": \"e\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f , \"chunkSize\": %d, \"totalSteps\": %d , \"collectiveName\": \"%s-%d\", \"totalbytes\": %d , \"proto\": %d},\n", + typeStr, e->peer, e->step, i, e->channel, e->timestamp[state], e->chunkSize, e->nsteps, collectiveName, e->collectiveID, e->nbytes, e->protocol); } else { if (e->peer == -ncclProxyProfileAppend) { fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": -1, \"tid\": 1, \"ts\": %f, \"args\": { \"added\": %ld } },\n", @@ -105,11 +232,67 @@ void ncclProfilingDump() { typeStr, i, e->timestamp[1]); } } + for (int i = 0; i < collectiveIndex; i++) { + struct ncclCollectiveEvent* e = collectiveEvents + i; + if (e) { + fprintf( + f, + "{\"name\": \"%s-%d\", \"cat\": \"COL\", \"id\": %d, \"ph\": \"b\", \"pid\": -1, \"tid\": 1, \"ts\": %f },\n", + e->collectiveName.c_str(), + e->collectiveCount, + e->collectiveCount, + e->timestamp); + if (i == collectiveIndex - 1) { + fprintf( + f, + "{\"name\": \"%s-%d\", \"cat\": \"COL\", \"id\": %d, \"ph\": \"e\", \"pid\": -1, \"tid\": 1, \"ts\": %f },\n", + e->collectiveName.c_str(), + e->collectiveCount, + e->collectiveCount, + lastEventTimestamp); + } else { + fprintf( + f, + "{\"name\": \"%s-%d\", \"cat\": \"COL\", \"id\": %d, \"ph\": \"e\", \"pid\": -1, \"tid\": 1, \"ts\": %f },\n", + e->collectiveName.c_str(), + e->collectiveCount, + e->collectiveCount, + e->timestamp_end); + } + fflush(stdout); + } + } fprintf(f, "{} ]\n"); fclose(f); - free(profilingEvents); + profilerCleanup(); } + +// returns true if it was previously disabled +NCCL_API(ncclResult_t, ncclProfilerEnable); +ncclResult_t ncclProfilerEnable() { + // INFO(NCCL_ALL,"Enabling proxy profiler\n"); + // only if it was previously disabled + if (!enable_profiler.exchange(true)) { + std::unique_lock lg(profiler_dump_mutex); + profilingIndex = 0; + collectiveIndex = 0; + dumpDone = false; + return ncclSuccess; + } + return ncclInternalError; +}; + +// returns true if it was previously enabled +NCCL_API(ncclResult_t, ncclProfilerDisable); +ncclResult_t ncclProfilerDisable() { + // INFO(NCCL_ALL,"Disabling proxy profiler\n"); + if (enable_profiler.exchange(false)) { + return ncclSuccess; + } + return ncclInternalError; +}; + #else ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) { return ncclSuccess; } -void ncclProfilingDump() {} +void ncclProfilingDump(const char* filename) {} #endif diff --git a/src/nccl.h.in b/src/nccl.h.in index 901d8c0d2..a0d921184 100644 --- a/src/nccl.h.in +++ b/src/nccl.h.in @@ -433,6 +433,15 @@ ncclResult_t pncclGroupStart(); ncclResult_t ncclGroupEnd(); ncclResult_t pncclGroupEnd(); +/* + * Enable and disable NCCL proxy thread profiler. + */ +#define NCCL_PROXY_PROFILER_ENABLED 1 +ncclResult_t ncclProfilerEnable(); +ncclResult_t ncclProfilerDisable(); +void ncclProfilingDump(const char* filename); +ncclResult_t ncclCollectiveRecord(const char* name, char type); + /* Register CUDA buffer for zero-copy operation */ ncclResult_t ncclCommRegister(const ncclComm_t comm, void* buff, size_t size, void** handle); ncclResult_t pncclCommRegister(const ncclComm_t comm, void* buff, size_t size, void** handle); diff --git a/src/proxy.cc b/src/proxy.cc index b2b488264..fb98dad58 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -359,6 +359,7 @@ static ncclResult_t ncclProxyOpToArgs(struct ncclProxyOp* op, struct ncclProxyAr sub->nsteps = op->nsteps; sub->nbytes = op->nbytes; sub->peer = op->root; + sub->nChannels = op->nChannels; sub->reg = op->reg; sub->buffer = op->buffer; args->nsubs = subIndex+1; @@ -598,6 +599,7 @@ ncclResult_t ncclProxyComputeP2p(struct ncclInfo* info, struct ncclProxyOp* op, memset(op, 0, sizeof(struct ncclProxyOp)); int channelId = info->channelId; struct ncclChannel* channel = info->comm->channels+channelId; + op->nChannels = info->comm->nChannels; op->channelId = channelId; op->sliceSteps = 1; op->chunkSteps = 1;