From 48b4e975d02391030cd94c568ac21e8675119c15 Mon Sep 17 00:00:00 2001 From: Darshan Sanghani Date: Mon, 4 Mar 2024 13:15:23 -0800 Subject: [PATCH] We augment the NCCL profiler with a sampling mode, Along with the on demand, runtime hooks provided by the previous commit. We would also like to improve the continuous collection by introducing sampling mechanisms. In this approach proxy events are sampled and written to a bpf map according to a user defined sampling weight. The sampled mode is designed to be always on and should not affect full (unsampled) traces being triggered by start/stop API. --- src/include/proxy.h | 2 +- src/misc/profiler.cc | 181 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 161 insertions(+), 22 deletions(-) diff --git a/src/include/proxy.h b/src/include/proxy.h index 7b80bd1b3..d057cc77b 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -40,7 +40,7 @@ struct ncclProxyOp { uint64_t opCount; int root; int next; - int nsteps; + // int nsteps; int chunkSize; uint8_t sliceSteps; uint8_t chunkSteps; diff --git a/src/misc/profiler.cc b/src/misc/profiler.cc index 5d80e6cd0..77700503b 100644 --- a/src/misc/profiler.cc +++ b/src/misc/profiler.cc @@ -14,6 +14,12 @@ #include "timer.h" #include "alloc.h" +#include +#include + +NCCL_PARAM(ProfilerSamplingEnabled, "PROFILER_SAMPLING_ENABLED", true); +NCCL_PARAM(ProfilerSamplingWeight, "PROFILER_SAMPLING_WEIGHT", 100); + static const char* profilingStateSendStr[] = { "BufferWait", "GPUWait", "SendWait", "", "End" }; static const char* profilingStateRecvStr[] = { "BufferWait", "RecvWait", "FlushWait", "GPUWait", "End" }; static const char* profilingEventStr[] = { "SendRecv", "Sleep", "Idle", "Append" }; @@ -42,19 +48,33 @@ struct ncclCollectiveEvent { }; struct ncclCollectiveEvent* collectiveEvents = NULL; -std::atomic enable_profiler{false}; +struct bpf_map_attr { + __u32 map_type; + __u32 key_size; + __u32 value_size; + __u32 max_entries; + __u32 map_flags; + }; + +std::atomic enable_profiler_trace{true}; std::atomic dumpDone{false}; std::mutex profiler_dump_mutex; - struct ncclProxyProfileEvent* profilingEvents = NULL; int profilingIndex = 0; double profilingStart = 0; + +struct ncclProxyProfileEvent* sampledEvent = NULL; +bool sampledEventAllocated = false; +int samplingProfilingIndex = 0; +int samplingProfilerMapFd = -1; + #define MAX_EVENTS 200000 /* Reduce buffer for collective traces to 500. Too large buffer for collective may resulting in failure to allocate memory. Example: If on average there is 50 collective calls in 500ms profiling window. A buffer of 500 can accomodate 10 such windows */ #define MAX_COLLECTIVES 500 +#define MAX_SAMPLED_BUFFER_SIZE 100 int collectiveIndex = 0; volatile double lastEventTimestamp = 0; @@ -76,19 +96,67 @@ void allocateCollectiveBuffer() { profilingStart = gettime(); } -ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) { - if (!enable_profiler.load(std::memory_order_relaxed)) { - return ncclSuccess; +bool shouldSample() { + double sampling_rate = 1.0 / (double)ncclParamProfilerSamplingWeight(); + double r = (double)rand() / (double)RAND_MAX ; + return r <= sampling_rate; } - if (profilingEvents == NULL) { - allocateProfilingBuffer(); + + ncclResult_t allocateSamplingProfilerBuffer() { + struct bpf_map_attr mapAttr = { + .map_type = BPF_MAP_TYPE_ARRAY, + .key_size = sizeof(int), + .value_size = sizeof(struct ncclProxyProfileEvent*), + .max_entries = MAX_SAMPLED_BUFFER_SIZE, + .map_flags = 0, + }; + samplingProfilerMapFd = + syscall(__NR_bpf, BPF_MAP_CREATE, &mapAttr, sizeof(mapAttr)); + // Check if the map is created successfully + if (samplingProfilerMapFd < 0) { + // INFO(NCCL_ALL, "Failed to create sampling profiler buffer"); } + + profilingStart = gettime(); + return ncclSuccess; +} + +struct ncclProxyProfileEvent* ncclProfilingEventCreate(int state) { struct ncclProxyProfileEvent* event = NULL; - if (state%8 == 0) { - if (profilingIndex == MAX_EVENTS) return ncclSuccess; - args->subs[sub].profilingEvents[step%NCCL_STEPS] = event = profilingEvents+profilingIndex++; - if (state == ncclProxyProfileBegin) { - // Proxy operation information + // If in trace mode and there is still space in the profiling buffer, we save the event there + if (enable_profiler_trace.load(std::memory_order_relaxed) && profilingIndex < MAX_EVENTS) { + event = profilingEvents + profilingIndex++; + } + + // If in sampling mode, we first check whether this event should be sampled. + // An event should be sampled if: + // 1. sampling is enabled + // 2. the sampling decision is true + // 3. there is no other event currently being sampled (i.e., the previously sampled event has completed) + // 4. if the event is a send/recv event (i.e., not idle, sleep) + + if (ncclParamProfilerSamplingEnabled() && shouldSample() && !sampledEvent && state == ncclProxyProfileBegin) { + // Then, we decide where to store the event being sampled. + // If the profiler already records the events in the profilingEvents buffer (i.e., is in trace mode), we use that location, + // otherwise we allocate the sampledEvent variable + if (event) { + sampledEvent = event; + } + else { + ncclCalloc(&sampledEvent, 1); + sampledEventAllocated = true; + event = sampledEvent; + } + } + return event; +} + +ncclResult_t ncclProfilingEventPopulateMetadata(struct ncclProxyProfileEvent* event, struct ncclProxyArgs* args, int sub, int step, int state) { + // Proxy operation information + if(!event || state%8 != 0) { + return ncclSuccess; + } + if (state == ncclProxyProfileBegin) { event->opCount = args->opCount; event->channel = args->subs[sub].channelId; event->peer = args->subs[sub].peer; @@ -100,13 +168,14 @@ ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, event->nChannels = args->subs[sub].nChannels; event->nbytes = args->subs[sub].nbytes; event->protocol = args->protocol; - } else event->peer = -state; - } else { - event = (struct ncclProxyProfileEvent*)args->subs[sub].profilingEvents[step%NCCL_STEPS]; - if (state == ncclProxyProfileEnd) args->subs[sub].profilingEvents[step%NCCL_STEPS] = NULL; - if (state == ncclProxyProfileAppendEnd) event->opCount = args->opCount; - if (event && state == ncclProxyProfileAppendEnd) event->opCount = args->opCount; } + else { + event->peer = -state; + } + return ncclSuccess; +} + +ncclResult_t ncclProfilingEventPopulateTimestamp(struct ncclProxyProfileEvent* event, int state) { // Timestamp if (event) { event->timestamp[state % 8] = gettime() - profilingStart; @@ -120,9 +189,79 @@ ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, return ncclSuccess; } +ncclResult_t ncclProfilingSampledEventSave(struct ncclProxyProfileEvent* event) { + if (samplingProfilerMapFd < 0) { + // INFO(NCCL_ALL, "No BPF map to dump event\n"); + return ncclSuccess; + } + if (!event) { + // INFO(NCCL_ALL, "No event to save\n"); + return ncclSuccess; + } + union bpf_attr attr = { + .map_fd = (__u32)samplingProfilerMapFd, + .key = (__u64)(unsigned long)(&samplingProfilingIndex), + .value = (__u64)(unsigned long)(event), + .flags = {}, + }; + const int ret = syscall(__NR_bpf, BPF_MAP_UPDATE_ELEM, &attr, sizeof(attr)); + if (ret < 0) { + // INFO(NCCL_ALL, "Failed to update bpf map with error %d\n", ret); + } + samplingProfilingIndex++; + return ncclSuccess; +} + +ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) { + // INFO(NCCL_ALL,"NCCL profiling record called: sub %d step %d state %d", sub, step, state); + if (!enable_profiler_trace.load(std::memory_order_relaxed) && !ncclParamProfilerSamplingEnabled()) { + return ncclSuccess; + } + + // allocate buffers if necessary + if (enable_profiler_trace.load(std::memory_order_relaxed)) { + if (profilingEvents == NULL) { + allocateProfilingBuffer(); + } + } + if (ncclParamProfilerSamplingEnabled()) { + if (samplingProfilerMapFd < 0) { + allocateSamplingProfilerBuffer(); + } + } + + struct ncclProxyProfileEvent* event = NULL; + // if this is a new event + if (state % 8 == 0) { + event = ncclProfilingEventCreate(state); + if (!event) { + return ncclSuccess; + } + args->subs[sub].profilingEvents[step%NCCL_STEPS] = event; + NCCLCHECK(ncclProfilingEventPopulateMetadata(event, args, sub, step, state)); + } + else { + event = (struct ncclProxyProfileEvent*)args->subs[sub].profilingEvents[step%NCCL_STEPS]; + if (state == ncclProxyProfileEnd) args->subs[sub].profilingEvents[step%NCCL_STEPS] = NULL; + if (state == ncclProxyProfileAppendEnd) event->opCount = args->opCount; + if (event && state == ncclProxyProfileAppendEnd) event->opCount = args->opCount; + } + + NCCLCHECK(ncclProfilingEventPopulateTimestamp(event, state)); + + if (event && state == ncclProxyProfileEnd && sampledEvent) { + // INFO(NCCL_ALL, "Saving sampled event"); + ncclProfilingSampledEventSave(sampledEvent); + if (sampledEventAllocated) free(sampledEvent); + sampledEvent = NULL; + } + + return ncclSuccess; +} + NCCL_API(ncclResult_t, ncclCollectiveRecord, const char*, char); ncclResult_t ncclCollectiveRecord(const char* name, char type) { - if (!enable_profiler.load(std::memory_order_relaxed)) { + if (!enable_profiler_trace.load(std::memory_order_relaxed)) { return ncclSuccess; } if (collectiveIndex >= MAX_COLLECTIVES) { @@ -272,7 +411,7 @@ NCCL_API(ncclResult_t, ncclProfilerEnable); ncclResult_t ncclProfilerEnable() { // INFO(NCCL_ALL,"Enabling proxy profiler\n"); // only if it was previously disabled - if (!enable_profiler.exchange(true)) { + if (!enable_profiler_trace.exchange(true)) { std::unique_lock lg(profiler_dump_mutex); profilingIndex = 0; collectiveIndex = 0; @@ -286,7 +425,7 @@ ncclResult_t ncclProfilerEnable() { NCCL_API(ncclResult_t, ncclProfilerDisable); ncclResult_t ncclProfilerDisable() { // INFO(NCCL_ALL,"Disabling proxy profiler\n"); - if (enable_profiler.exchange(false)) { + if (enable_profiler_trace.exchange(false)) { return ncclSuccess; } return ncclInternalError;