// Patch summary (review notes placed ahead of the first "diff --git" header).
// NOTE(review): this copy of the patch has its line breaks collapsed, and text that
// sat between '<' and '>' looks to be missing (bare "#include" lines, "std::atomic"
// and "std::unique_lock" without template arguments, "for (int i=0; ipeer >= 0;",
// "while (statetimestamp[state])") - compare against the original patch before applying.
//
// Files touched: src/include/profiler.h, src/include/proxy.h, src/misc/profiler.cc,
// src/nccl.h.in, src/proxy.cc.
//
// What the patch changes:
//  - profiler.h and nccl.h.in define NCCL_PROXY_PROFILER_ENABLED; profiler.cc is now
//    guarded by that macro instead of PROFILE_PROXY and defines ENABLE_TIMER before
//    including timer.h.
//  - ncclProfilerEnable()/ncclProfilerDisable() flip the enable_profiler flag and
//    return ncclSuccess when the state actually changed, ncclInternalError otherwise;
//    Enable also resets profilingIndex, collectiveIndex and dumpDone.
//  - ncclProfilingRecord() and ncclCollectiveRecord() return ncclSuccess without
//    recording while enable_profiler is false.
//  - ncclCollectiveRecord(name, type) appends to collectiveEvents (it stops once
//    collectiveIndex reaches MAX_COLLECTIVES) and, if the previous entry has type 'b',
//    closes it with lastEventTimestamp and sets its type to 'e'.
//  - ncclProfilingDump(filename) opens `filename`, falling back to
//    getenv("NCCL_PROXY_PROFILE") when the "//" default is detected; it additionally
//    emits one "COL" begin/end pair per recorded collective and releases both buffers
//    through profilerCleanup(), which also sets dumpDone.
//  - Proxy events gain chunkSize, nsteps, nChannels, collectiveID, nbytes, protocol;
//    ncclProxyOp gains nsteps/nChannels and ncclProxySubArgs gains nChannels, filled
//    in ncclProxyOpToArgs() and ncclProxyComputeP2p().
//
// NOTE(review) - points to confirm before merging:
//  - `str == "//"` compares pointer values, not characters; strcmp() would express the
//    intent. nccl.h.in declares ncclProfilingDump(const char*) with no default argument.
//  - ncclCollectiveEvent holds a std::string, yet the array is obtained with
//    ncclCalloc() and released with free(), so no constructor/destructor call is
//    visible for collectiveName - presumably raw zeroed memory; verify.
//  - In ncclCollectiveRecord(), `e_prev->type == 'b'` is evaluated before
//    `collectiveIndex > 0`, so the first call reads the slot before collectiveEvents[0].
//  - allocateProfilingBuffer()/allocateCollectiveBuffer() drop the ncclCalloc() result
//    that was previously wrapped in NCCLCHECK, and fopen()'s result is used unchecked.
//  - The unguarded `if (state == ncclProxyProfileAppendEnd) event->opCount = ...` still
//    shows as a context line ahead of the new `event &&` version.
//  - event->nbytes is an int filled from the ssize_t sub->nbytes.
//  - proxy.cc context already reads op->nsteps; confirm the added `uint16_t nsteps`
//    member of ncclProxyOp does not clash with an existing one.
//  - The collective loop calls fflush(stdout) although the output goes to `f`.
diff --git a/src/include/profiler.h b/src/include/profiler.h index 103af99ad..a17f1a410 100644 --- a/src/include/profiler.h +++ b/src/include/profiler.h @@ -8,6 +8,7 @@ #define NCCL_PROFILER_H_ #include "proxy.h" +#define NCCL_PROXY_PROFILER_ENABLED 1 enum ncclProxyProfileState { ncclProxyProfileBegin = 0, @@ -32,6 +33,9 @@ enum ncclProxyProfileState { }; ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state); -void ncclProfilingDump(); +ncclResult_t ncclProfilerEnable(); +ncclResult_t ncclProfilerDisable(); +void ncclProfilingDump(const char* filename = "//"); +ncclResult_t ncclCollectiveRecord(const char* name, char type); #endif diff --git a/src/include/proxy.h b/src/include/proxy.h index 353426c1d..7b80bd1b3 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -34,6 +34,8 @@ union ncclProxyOpSpecifics { struct ncclProxyOp { struct ncclProxyConnection* connection; void* buffer; + uint16_t nsteps; + uint16_t nChannels; ssize_t nbytes; uint64_t opCount; int root; @@ -62,6 +64,7 @@ struct ncclProxySubArgs { void* mhandle; int channelId; int nsteps; + int nChannels; ssize_t nbytes; int peer; diff --git a/src/misc/profiler.cc b/src/misc/profiler.cc index 785d616b8..5d80e6cd0 100644 --- a/src/misc/profiler.cc +++ b/src/misc/profiler.cc @@ -4,10 +4,13 @@ * See LICENSE.txt for license information ************************************************************************/ +#include +#include #include "profiler.h" //#define PROFILE_PROXY 1 -#ifdef PROFILE_PROXY +#ifdef NCCL_PROXY_PROFILER_ENABLED +#define ENABLE_TIMER 1 #include "timer.h" #include "alloc.h" @@ -22,17 +25,63 @@ struct ncclProxyProfileEvent { uint16_t channel; uint8_t type; // send / recv uint8_t opIndex; + int chunkSize; + int nsteps; + int nChannels; + int collectiveID; + int nbytes; + uint8_t protocol; }; +struct ncclCollectiveEvent { + double timestamp; + double timestamp_end; + std::string collectiveName; + int collectiveCount; + char type; +}; +struct 
ncclCollectiveEvent* collectiveEvents = NULL; + +std::atomic enable_profiler{false}; +std::atomic dumpDone{false}; +std::mutex profiler_dump_mutex; + struct ncclProxyProfileEvent* profilingEvents = NULL; int profilingIndex = 0; double profilingStart = 0; #define MAX_EVENTS 200000 +/* Reduce buffer for collective traces to 500. +Too large a buffer for collectives may result in a failure to allocate memory. +Example: if on average there are 50 collective calls in a 500ms profiling window, a buffer of 500 +can accommodate 10 such windows. */ +#define MAX_COLLECTIVES 500 +int collectiveIndex = 0; +volatile double lastEventTimestamp = 0; + +void allocateProfilingBuffer() { + std::unique_lock lg(profiler_dump_mutex); + if (profilingEvents != NULL) { + return; + } + ncclCalloc(&profilingEvents, MAX_EVENTS); + profilingStart = gettime(); +} + +void allocateCollectiveBuffer() { + std::unique_lock lg(profiler_dump_mutex); + if (collectiveEvents != NULL) { + return; + } + ncclCalloc(&collectiveEvents, MAX_COLLECTIVES); + profilingStart = gettime(); +} ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) { + if (!enable_profiler.load(std::memory_order_relaxed)) { + return ncclSuccess; + } if (profilingEvents == NULL) { - NCCLCHECK(ncclCalloc(&profilingEvents, MAX_EVENTS)); - profilingStart = gettime(); + allocateProfilingBuffer(); } struct ncclProxyProfileEvent* event = NULL; if (state%8 == 0) { @@ -46,27 +95,104 @@ ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, event->type = args->pattern; event->step = step; event->opIndex = (((uint64_t)args)/sizeof(struct ncclProxyArgs))%256; + event->chunkSize = args->chunkSize; + event->nsteps = args->subs[sub].nsteps; + event->nChannels = args->subs[sub].nChannels; + event->nbytes = args->subs[sub].nbytes; + event->protocol = args->protocol; } else event->peer = -state; } else { event = (struct ncclProxyProfileEvent*)args->subs[sub].profilingEvents[step%NCCL_STEPS]; if 
(state == ncclProxyProfileEnd) args->subs[sub].profilingEvents[step%NCCL_STEPS] = NULL; if (state == ncclProxyProfileAppendEnd) event->opCount = args->opCount; + if (event && state == ncclProxyProfileAppendEnd) event->opCount = args->opCount; } // Timestamp - event->timestamp[state%8] = gettime()-profilingStart; + if (event) { + event->timestamp[state % 8] = gettime() - profilingStart; + if (event->peer >= 0) { + if (lastEventTimestamp < event->timestamp[state % 8]) { + lastEventTimestamp = event->timestamp[state % 8]; + } + event->collectiveID = collectiveIndex; + } + } return ncclSuccess; } -void ncclProfilingDump() { - static int dumpDone = 0; +NCCL_API(ncclResult_t, ncclCollectiveRecord, const char*, char); +ncclResult_t ncclCollectiveRecord(const char* name, char type) { + if (!enable_profiler.load(std::memory_order_relaxed)) { + return ncclSuccess; + } + if (collectiveIndex >= MAX_COLLECTIVES) { + return ncclSuccess; + } + // INFO(NCCL_ALL,"NCCL Collective Record name = %s, event = %c, collectiveindex=%d, max_collectives=%d", name, type, collectiveIndex, MAX_COLLECTIVES); + if (collectiveEvents == NULL) { + allocateCollectiveBuffer(); + } + struct ncclCollectiveEvent* e_prev = collectiveEvents + collectiveIndex - 1; + if (e_prev){ + if (e_prev->type == 'b' && collectiveIndex > 0) { + e_prev->timestamp_end = lastEventTimestamp; + e_prev->type = 'e'; + } + } + struct ncclCollectiveEvent* event = NULL; + event = collectiveEvents + collectiveIndex++; + if(event){ + event->collectiveName = name; + event->collectiveCount = collectiveIndex; + event->timestamp = gettime() - profilingStart; + event->timestamp_end = 0; + event->type = type; + } + // INFO(NCCL_ALL, "The collective name is %s\n", name); + return ncclSuccess; +} + +const char* getCollectiveForEvent(struct ncclProxyProfileEvent* e) { + if (collectiveEvents == NULL || e->collectiveID < 1) { + return "N/A"; + } + struct ncclCollectiveEvent* e_coll = collectiveEvents + e->collectiveID - 1; + return 
e_coll->collectiveName.c_str(); +} + +void profilerCleanup() { + if (profilingEvents != NULL) { + free(profilingEvents); + profilingEvents = NULL; + } + + if (collectiveEvents != NULL) { + free(collectiveEvents); + collectiveEvents = NULL; + } + dumpDone = true; +} + +NCCL_API(void, ncclProfilingDump, const char*); +void ncclProfilingDump(const char* filename) { + // INFO(NCCL_ALL,"Dumping proxy profiler trace"); + std::unique_lock lg(profiler_dump_mutex); if (dumpDone) return; - dumpDone = 1; - const char* str = ncclGetEnv("NCCL_PROXY_PROFILE"); - if (!str) { free(profilingEvents); return; } + + const char* str = filename; + if (str == "//") { str = getenv("NCCL_PROXY_PROFILE"); } + if (!str) { + profilerCleanup(); + return; + } + FILE* f = fopen(str, "w"); fprintf(f, "[\n"); for (int i=0; ipeer >= 0; const char* typeStr = sendrecv ? (e->type == ncclPatternSend ? "Send" : "Recv") : @@ -76,23 +202,24 @@ void ncclProfilingDump() { if (sendrecv) { int state = ncclProxyProfileBegin; const char** stateStr = e->type == ncclPatternSend ? 
profilingStateSendStr : profilingStateRecvStr; - fprintf(f, "{\"name\": \"%s-%d-%d\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f, \"args\": { \"opCount\": %ld, \"proxyOpIndex\":%d } },\n", - typeStr, e->peer, e->step, i, e->channel, e->timestamp[state], e->opCount, e->opIndex); + const char* collectiveName = getCollectiveForEvent(e); + fprintf(f, "{\"name\": \"%s-%d-%d\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f, \"args\": { \"opCount\": %ld, \"proxyOpIndex\":%d }, \"chunkSize\": %d, \"totalSteps\": %d, \"totalChannels\": %d, \"collectiveName\": \"%s-%d\" , \"totalbytes\": %d , \"proto\": %d },\n", + typeStr, e->peer, e->step, i, e->channel, e->timestamp[state], e->opCount, e->opIndex, e->chunkSize, e->nsteps, e->nChannels, collectiveName, e->collectiveID, e->nbytes, e->protocol); while (statetimestamp[state]) { const char* name = stateStr[state]; - fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f },\n", - name, i, e->channel, e->timestamp[state]); + fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f, \"chunkSize\": %d, \"totalSteps\": %d, \"collectiveName\": \"%s-%d\", \"totalbytes\": %d , \"proto\": %d},\n", + name, i, e->channel, e->timestamp[state], e->chunkSize, e->nsteps, collectiveName, e->collectiveID, e->nbytes, e->protocol); state++; while (e->timestamp[state] == 0) state++; - fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"e\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f },\n", - name, i, e->channel, e->timestamp[state]); + fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"e\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f, \"chunkSize\": %d, \"totalSteps\": %d, \"collectiveName\": \"%s-%d\", \"totalbytes\": %d , \"proto\": %d},\n", + name, i, e->channel, e->timestamp[state], e->chunkSize, e->nsteps, collectiveName, 
e->collectiveID, e->nbytes, e->protocol); } } - fprintf(f, "{\"name\": \"%s-%d-%d\", \"cat\": \"NET\", \"ph\": \"e\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f },\n", - typeStr, e->peer, e->step, i, e->channel, e->timestamp[state]); + fprintf(f, "{\"name\": \"%s-%d-%d\", \"cat\": \"NET\", \"ph\": \"e\", \"id\": %d, \"pid\": %d, \"tid\": 1, \"ts\": %f , \"chunkSize\": %d, \"totalSteps\": %d , \"collectiveName\": \"%s-%d\", \"totalbytes\": %d , \"proto\": %d},\n", + typeStr, e->peer, e->step, i, e->channel, e->timestamp[state], e->chunkSize, e->nsteps, collectiveName, e->collectiveID, e->nbytes, e->protocol); } else { if (e->peer == -ncclProxyProfileAppend) { fprintf(f, "{\"name\": \"%s\", \"cat\": \"NET\", \"ph\": \"b\", \"id\": %d, \"pid\": -1, \"tid\": 1, \"ts\": %f, \"args\": { \"added\": %ld } },\n", @@ -105,11 +232,67 @@ void ncclProfilingDump() { typeStr, i, e->timestamp[1]); } } + for (int i = 0; i < collectiveIndex; i++) { + struct ncclCollectiveEvent* e = collectiveEvents + i; + if (e) { + fprintf( + f, + "{\"name\": \"%s-%d\", \"cat\": \"COL\", \"id\": %d, \"ph\": \"b\", \"pid\": -1, \"tid\": 1, \"ts\": %f },\n", + e->collectiveName.c_str(), + e->collectiveCount, + e->collectiveCount, + e->timestamp); + if (i == collectiveIndex - 1) { + fprintf( + f, + "{\"name\": \"%s-%d\", \"cat\": \"COL\", \"id\": %d, \"ph\": \"e\", \"pid\": -1, \"tid\": 1, \"ts\": %f },\n", + e->collectiveName.c_str(), + e->collectiveCount, + e->collectiveCount, + lastEventTimestamp); + } else { + fprintf( + f, + "{\"name\": \"%s-%d\", \"cat\": \"COL\", \"id\": %d, \"ph\": \"e\", \"pid\": -1, \"tid\": 1, \"ts\": %f },\n", + e->collectiveName.c_str(), + e->collectiveCount, + e->collectiveCount, + e->timestamp_end); + } + fflush(stdout); + } + } fprintf(f, "{} ]\n"); fclose(f); - free(profilingEvents); + profilerCleanup(); } + +// returns ncclSuccess if it was previously disabled, ncclInternalError otherwise +NCCL_API(ncclResult_t, ncclProfilerEnable); +ncclResult_t 
ncclProfilerEnable() { + // INFO(NCCL_ALL,"Enabling 
proxy profiler\n"); + // only if it was previously disabled + if (!enable_profiler.exchange(true)) { + std::unique_lock lg(profiler_dump_mutex); + profilingIndex = 0; + collectiveIndex = 0; + dumpDone = false; + return ncclSuccess; + } + return ncclInternalError; +}; + +// returns ncclSuccess if it was previously enabled, ncclInternalError otherwise +NCCL_API(ncclResult_t, ncclProfilerDisable); +ncclResult_t ncclProfilerDisable() { + // INFO(NCCL_ALL,"Disabling proxy profiler\n"); + if (enable_profiler.exchange(false)) { + return ncclSuccess; + } + return ncclInternalError; +}; + #else ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) { return ncclSuccess; } -void ncclProfilingDump() {} +void ncclProfilingDump(const char* filename) {} #endif diff --git a/src/nccl.h.in b/src/nccl.h.in index 901d8c0d2..a0d921184 100644 --- a/src/nccl.h.in +++ b/src/nccl.h.in @@ -433,6 +433,15 @@ ncclResult_t pncclGroupStart(); ncclResult_t ncclGroupEnd(); ncclResult_t pncclGroupEnd(); +/* + * Enable and disable NCCL proxy thread profiler. 
+ */ +#define NCCL_PROXY_PROFILER_ENABLED 1 +ncclResult_t ncclProfilerEnable(); +ncclResult_t ncclProfilerDisable(); +void ncclProfilingDump(const char* filename); +ncclResult_t ncclCollectiveRecord(const char* name, char type); + /* Register CUDA buffer for zero-copy operation */ ncclResult_t ncclCommRegister(const ncclComm_t comm, void* buff, size_t size, void** handle); ncclResult_t pncclCommRegister(const ncclComm_t comm, void* buff, size_t size, void** handle); diff --git a/src/proxy.cc b/src/proxy.cc index b2b488264..fb98dad58 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -359,6 +359,7 @@ static ncclResult_t ncclProxyOpToArgs(struct ncclProxyOp* op, struct ncclProxyAr sub->nsteps = op->nsteps; sub->nbytes = op->nbytes; sub->peer = op->root; + sub->nChannels = op->nChannels; sub->reg = op->reg; sub->buffer = op->buffer; args->nsubs = subIndex+1; @@ -598,6 +599,7 @@ ncclResult_t ncclProxyComputeP2p(struct ncclInfo* info, struct ncclProxyOp* op, memset(op, 0, sizeof(struct ncclProxyOp)); int channelId = info->channelId; struct ncclChannel* channel = info->comm->channels+channelId; + op->nChannels = info->comm->nChannels; op->channelId = channelId; op->sliceSteps = 1; op->chunkSteps = 1;