diff --git a/src/include/proxy.h b/src/include/proxy.h
index 7b80bd1b3..d057cc77b 100644
--- a/src/include/proxy.h
+++ b/src/include/proxy.h
@@ -40,7 +40,7 @@ struct ncclProxyOp {
   uint64_t opCount;
   int root;
   int next;
-  int nsteps;
+  // int nsteps;
   int chunkSize;
   uint8_t sliceSteps;
   uint8_t chunkSteps;
diff --git a/src/misc/profiler.cc b/src/misc/profiler.cc
index 5d80e6cd0..77700503b 100644
--- a/src/misc/profiler.cc
+++ b/src/misc/profiler.cc
@@ -14,6 +14,12 @@
 #include "timer.h"
 #include "alloc.h"
 
+#include <linux/bpf.h>
+#include <sys/syscall.h>
+
+NCCL_PARAM(ProfilerSamplingEnabled, "PROFILER_SAMPLING_ENABLED", true);
+NCCL_PARAM(ProfilerSamplingWeight, "PROFILER_SAMPLING_WEIGHT", 100);
+
 static const char* profilingStateSendStr[] = { "BufferWait", "GPUWait", "SendWait", "", "End" };
 static const char* profilingStateRecvStr[] = { "BufferWait", "RecvWait", "FlushWait", "GPUWait", "End" };
 static const char* profilingEventStr[] = { "SendRecv", "Sleep", "Idle", "Append" };
@@ -42,19 +48,33 @@ struct ncclCollectiveEvent {
 };
 struct ncclCollectiveEvent* collectiveEvents = NULL;
 
-std::atomic<bool> enable_profiler{false};
+struct bpf_map_attr {
+  __u32 map_type;
+  __u32 key_size;
+  __u32 value_size;
+  __u32 max_entries;
+  __u32 map_flags;
+};
+
+std::atomic<bool> enable_profiler_trace{true};
 std::atomic<bool> dumpDone{false};
 std::mutex profiler_dump_mutex;
-
 struct ncclProxyProfileEvent* profilingEvents = NULL;
 int profilingIndex = 0;
 double profilingStart = 0;
+
+struct ncclProxyProfileEvent* sampledEvent = NULL;
+bool sampledEventAllocated = false;
+int samplingProfilingIndex = 0;
+int samplingProfilerMapFd = -1;
+
 #define MAX_EVENTS 200000
 /* Limit the buffer for collective traces to 500 entries; an overly large buffer
    may fail to allocate. Example: if there are on average 50 collective calls per
   500 ms profiling window, a buffer of 500 can accommodate 10 such windows. */
 #define MAX_COLLECTIVES 500
+#define MAX_SAMPLED_BUFFER_SIZE 100
 
 int collectiveIndex = 0;
 volatile double lastEventTimestamp = 0;
@@ -76,19 +96,67 @@ void allocateCollectiveBuffer() {
   profilingStart = gettime();
 }
 
-ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) {
-  if (!enable_profiler.load(std::memory_order_relaxed)) {
-    return ncclSuccess;
+bool shouldSample() {
+  double sampling_rate = 1.0 / (double)ncclParamProfilerSamplingWeight();
+  double r = (double)rand() / (double)RAND_MAX;
+  return r <= sampling_rate;
 }
-  if (profilingEvents == NULL) {
-    allocateProfilingBuffer();
+
+ncclResult_t allocateSamplingProfilerBuffer() {
+  struct bpf_map_attr mapAttr = {
+      .map_type = BPF_MAP_TYPE_ARRAY,
+      .key_size = sizeof(int),
+      // Store the full event, not a pointer to it: BPF_MAP_UPDATE_ELEM copies
+      // value_size bytes, and an allocated event is freed right after saving.
+      .value_size = sizeof(struct ncclProxyProfileEvent),
+      .max_entries = MAX_SAMPLED_BUFFER_SIZE,
+      .map_flags = 0,
+  };
+  samplingProfilerMapFd =
+      syscall(__NR_bpf, BPF_MAP_CREATE, &mapAttr, sizeof(mapAttr));
+  // Check whether the map was created successfully
+  if (samplingProfilerMapFd < 0) {
+    // INFO(NCCL_ALL, "Failed to create sampling profiler buffer");
   }
+
+  profilingStart = gettime();
+  return ncclSuccess;
+}
+
+struct ncclProxyProfileEvent* ncclProfilingEventCreate(int state) {
   struct ncclProxyProfileEvent* event = NULL;
-  if (state%8 == 0) {
-    if (profilingIndex == MAX_EVENTS) return ncclSuccess;
-    args->subs[sub].profilingEvents[step%NCCL_STEPS] = event = profilingEvents+profilingIndex++;
-    if (state == ncclProxyProfileBegin) {
-      // Proxy operation information
+  // In trace mode, if there is still space in the profiling buffer, we save the event there.
+  if (enable_profiler_trace.load(std::memory_order_relaxed) && profilingIndex < MAX_EVENTS) {
+    event = profilingEvents + profilingIndex++;
+  }
+
+  // In sampling mode, we first check whether this event should be sampled.
+  // An event is sampled if:
+  //  1. sampling is enabled,
+  //  2. the sampling decision is true,
+  //  3. no other event is currently being sampled (i.e., the previously sampled event has completed), and
+  //  4. the event starts a send/recv operation (i.e., it is not Idle or Sleep).
+  if (ncclParamProfilerSamplingEnabled() && shouldSample() && !sampledEvent && state == ncclProxyProfileBegin) {
+    // Then, we decide where to store the event being sampled.
+    // If the profiler already records events in the profilingEvents buffer (i.e., it is in trace mode), we reuse that slot;
+    // otherwise we allocate the standalone sampledEvent.
+    if (event) {
+      sampledEvent = event;
+    } else {
+      // Sampling is best-effort: skip this event if the allocation fails.
+      if (ncclCalloc(&sampledEvent, 1) != ncclSuccess) return event;
+      sampledEventAllocated = true;
+      event = sampledEvent;
+    }
+  }
+  return event;
+}
+
+ncclResult_t ncclProfilingEventPopulateMetadata(struct ncclProxyProfileEvent* event, struct ncclProxyArgs* args, int sub, int step, int state) {
+  // Proxy operation information
+  if (!event || state%8 != 0) {
+    return ncclSuccess;
+  }
+  if (state == ncclProxyProfileBegin) {
     event->opCount = args->opCount;
     event->channel = args->subs[sub].channelId;
     event->peer = args->subs[sub].peer;
@@ -100,13 +168,14 @@ ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step,
     event->nChannels = args->subs[sub].nChannels;
     event->nbytes = args->subs[sub].nbytes;
     event->protocol = args->protocol;
-    } else event->peer = -state;
-  } else {
-    event = (struct ncclProxyProfileEvent*)args->subs[sub].profilingEvents[step%NCCL_STEPS];
-    if (state == ncclProxyProfileEnd) args->subs[sub].profilingEvents[step%NCCL_STEPS] = NULL;
-    if (state == ncclProxyProfileAppendEnd) event->opCount = args->opCount;
-    if (event && state == ncclProxyProfileAppendEnd) event->opCount = args->opCount;
   }
+  else {
+    event->peer = -state;
+  }
+  return ncclSuccess;
+}
+
+ncclResult_t ncclProfilingEventPopulateTimestamp(struct ncclProxyProfileEvent* event, int state) {
   // Timestamp
   if (event) {
     event->timestamp[state % 8] = gettime() - profilingStart;
@@ -120,9 +189,79 @@ ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step,
   return ncclSuccess;
 }
 
+ncclResult_t ncclProfilingSampledEventSave(struct ncclProxyProfileEvent* event) {
+  if (samplingProfilerMapFd < 0) {
+    // INFO(NCCL_ALL, "No BPF map to dump event\n");
+    return ncclSuccess;
+  }
+  if (!event) {
+    // INFO(NCCL_ALL, "No event to save\n");
+    return ncclSuccess;
+  }
+  union bpf_attr attr = {
+      .map_fd = (__u32)samplingProfilerMapFd,
+      .key = (__u64)(unsigned long)(&samplingProfilingIndex),
+      .value = (__u64)(unsigned long)(event),
+      .flags = {},
+  };
+  const int ret = syscall(__NR_bpf, BPF_MAP_UPDATE_ELEM, &attr, sizeof(attr));
+  if (ret < 0) {
+    // INFO(NCCL_ALL, "Failed to update bpf map with error %d\n", ret);
+  }
+  // Wrap around so updates keep succeeding once the fixed-size array map is full.
+  samplingProfilingIndex = (samplingProfilingIndex + 1) % MAX_SAMPLED_BUFFER_SIZE;
+  return ncclSuccess;
+}
+
+ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) {
+  // INFO(NCCL_ALL,"NCCL profiling record called: sub %d step %d state %d", sub, step, state);
+  if (!enable_profiler_trace.load(std::memory_order_relaxed) && !ncclParamProfilerSamplingEnabled()) {
+    return ncclSuccess;
+  }
+
+  // Allocate buffers if necessary
+  if (enable_profiler_trace.load(std::memory_order_relaxed)) {
+    if (profilingEvents == NULL) {
+      allocateProfilingBuffer();
+    }
+  }
+  if (ncclParamProfilerSamplingEnabled()) {
+    if (samplingProfilerMapFd < 0) {
+      allocateSamplingProfilerBuffer();
+    }
+  }
+
+  struct ncclProxyProfileEvent* event = NULL;
+  if (state % 8 == 0) {
+    // This is a new event
+    event = ncclProfilingEventCreate(state);
+    if (!event) {
+      return ncclSuccess;
+    }
+    args->subs[sub].profilingEvents[step%NCCL_STEPS] = event;
+    NCCLCHECK(ncclProfilingEventPopulateMetadata(event, args, sub, step, state));
+  }
+  else {
+    event = (struct ncclProxyProfileEvent*)args->subs[sub].profilingEvents[step%NCCL_STEPS];
+    if (state == ncclProxyProfileEnd) args->subs[sub].profilingEvents[step%NCCL_STEPS] = NULL;
+    if (event && state == ncclProxyProfileAppendEnd) event->opCount = args->opCount;
+  }
+
+  NCCLCHECK(ncclProfilingEventPopulateTimestamp(event, state));
+
+  // Flush the sampled event only once it has itself completed.
+  if (sampledEvent && event == sampledEvent && state == ncclProxyProfileEnd) {
+    // INFO(NCCL_ALL, "Saving sampled event");
+    ncclProfilingSampledEventSave(sampledEvent);
+    if (sampledEventAllocated) free(sampledEvent);
+    sampledEvent = NULL;
+    sampledEventAllocated = false;
+  }
+
+  return ncclSuccess;
+}
+
 NCCL_API(ncclResult_t, ncclCollectiveRecord, const char*, char);
 ncclResult_t ncclCollectiveRecord(const char* name, char type) {
-  if (!enable_profiler.load(std::memory_order_relaxed)) {
+  if (!enable_profiler_trace.load(std::memory_order_relaxed)) {
     return ncclSuccess;
   }
   if (collectiveIndex >= MAX_COLLECTIVES) {
@@ -272,7 +411,7 @@
 NCCL_API(ncclResult_t, ncclProfilerEnable);
 ncclResult_t ncclProfilerEnable() {
   // INFO(NCCL_ALL,"Enabling proxy profiler\n");
   // only if it was previously disabled
-  if (!enable_profiler.exchange(true)) {
+  if (!enable_profiler_trace.exchange(true)) {
     std::unique_lock<std::mutex> lg(profiler_dump_mutex);
     profilingIndex = 0;
     collectiveIndex = 0;
@@ -286,7 +425,7 @@
 NCCL_API(ncclResult_t, ncclProfilerDisable);
 ncclResult_t ncclProfilerDisable() {
   // INFO(NCCL_ALL,"Disabling proxy profiler\n");
-  if (enable_profiler.exchange(false)) {
+  if (enable_profiler_trace.exchange(false)) {
     return ncclSuccess;
   }
   return ncclInternalError;
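
---

A note for reviewers on the read side: the patch only ever writes sampled events into the BPF array map (`BPF_MAP_UPDATE_ELEM` in `ncclProfilingSampledEventSave`), so a consumer is expected to pull them out of band. Below is a minimal sketch of what that could look like. It is not part of the change: `bpfLookup` and `dumpSampledEvents` are hypothetical names, the sketch assumes the consumer has already obtained a valid fd for the map (e.g., via fd passing or a `BPF_OBJ_PIN` step that this patch does not implement), and it reads raw bytes rather than assuming a particular `ncclProxyProfileEvent` layout.

```cpp
// Hypothetical consumer sketch (not part of this patch): iterate the array
// map created by allocateSamplingProfilerBuffer() and read each slot back.
#include <linux/bpf.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <vector>

#define MAX_SAMPLED_BUFFER_SIZE 100  // must match the map's max_entries

static long bpfLookup(int mapFd, int key, void* valueOut) {
  union bpf_attr attr;
  memset(&attr, 0, sizeof(attr));  // unused bpf_attr fields must be zeroed
  attr.map_fd = (__u32)mapFd;
  attr.key = (__u64)(unsigned long)&key;
  attr.value = (__u64)(unsigned long)valueOut;
  return syscall(__NR_bpf, BPF_MAP_LOOKUP_ELEM, &attr, sizeof(attr));
}

// Array maps are pre-allocated, so every key in [0, max_entries) is readable;
// slots that were never written simply come back zero-filled.
void dumpSampledEvents(int mapFd, size_t eventSize) {
  std::vector<unsigned char> value(eventSize);  // eventSize == map value_size
  for (int key = 0; key < MAX_SAMPLED_BUFFER_SIZE; key++) {
    if (bpfLookup(mapFd, key, value.data()) != 0) continue;  // skip on error
    // Decode value.data() as struct ncclProxyProfileEvent here.
    printf("slot %d: read %zu bytes\n", key, eventSize);
  }
}
```

One design consequence worth noting: because the writer advances `samplingProfilingIndex` modulo `MAX_SAMPLED_BUFFER_SIZE`, the map behaves as a ring of the most recent 100 sampled events, so a polling reader sees a rolling window rather than a complete history.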