Augment the NCCL profiler with a sampling mode
Along with the on-demand runtime hooks provided by the
previous commit, we would also like to improve continuous
collection by introducing a sampling mechanism.

In this approach, proxy events are sampled and written to a
BPF map according to a user-defined sampling weight.

The sampling mode is designed to be always on and should
not affect full (unsampled) traces triggered by the start/stop API.
Darshan Sanghani committed Mar 4, 2024
1 parent bb321a0 commit 48b4e97
Showing 2 changed files with 161 additions and 22 deletions.
src/include/proxy.h: 2 changes (1 addition, 1 deletion)
@@ -40,7 +40,7 @@ struct ncclProxyOp {
   uint64_t opCount;
   int root;
   int next;
-  int nsteps;
+  // int nsteps;
   int chunkSize;
   uint8_t sliceSteps;
   uint8_t chunkSteps;
src/misc/profiler.cc: 181 changes (160 additions, 21 deletions)
@@ -14,6 +14,12 @@
 #include "timer.h"
 #include "alloc.h"
 
+#include <linux/bpf.h>
+#include <sys/syscall.h>
+
+NCCL_PARAM(ProfilerSamplingEnabled, "PROFILER_SAMPLING_ENABLED", true);
+NCCL_PARAM(ProfilerSamplingWeight, "PROFILER_SAMPLING_WEIGHT", 100);
+
 static const char* profilingStateSendStr[] = { "BufferWait", "GPUWait", "SendWait", "", "End" };
 static const char* profilingStateRecvStr[] = { "BufferWait", "RecvWait", "FlushWait", "GPUWait", "End" };
 static const char* profilingEventStr[] = { "SendRecv", "Sleep", "Idle", "Append" };
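[Editor's aside, not part of the diff: the weight is the inverse of the sampling probability. shouldSample() below keeps an event with probability 1/weight, so the default weight of 100 samples roughly 1% of proxy events. A minimal sketch of how a job might configure the two knobs; the environment variable names are inferred from NCCL's usual NCCL_PARAM convention ("PROFILER_SAMPLING_WEIGHT" -> NCCL_PROFILER_SAMPLING_WEIGHT) and are not taken from this commit.]

// Hypothetical launcher-side configuration sketch (env var names assumed).
#include <cstdlib>

int main() {
  // Keep the always-on sampler enabled (the default in this commit).
  setenv("NCCL_PROFILER_SAMPLING_ENABLED", "1", /*overwrite=*/1);
  // A weight w samples each proxy event with probability 1/w:
  // 100 -> ~1% of events, 1000 -> ~0.1%.
  setenv("NCCL_PROFILER_SAMPLING_WEIGHT", "1000", /*overwrite=*/1);
  // ... set up and run the NCCL workload here ...
  return 0;
}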
@@ -42,19 +48,33 @@ struct ncclCollectiveEvent {
 };
 struct ncclCollectiveEvent* collectiveEvents = NULL;
 
-std::atomic<bool> enable_profiler{false};
+struct bpf_map_attr {
+  __u32 map_type;
+  __u32 key_size;
+  __u32 value_size;
+  __u32 max_entries;
+  __u32 map_flags;
+};
+
+std::atomic<bool> enable_profiler_trace{true};
 std::atomic<bool> dumpDone{false};
 std::mutex profiler_dump_mutex;
 
 struct ncclProxyProfileEvent* profilingEvents = NULL;
 int profilingIndex = 0;
 double profilingStart = 0;
 
+struct ncclProxyProfileEvent* sampledEvent = NULL;
+bool sampledEventAllocated = false;
+int samplingProfilingIndex = 0;
+int samplingProfilerMapFd = -1;
+
 #define MAX_EVENTS 200000
 /* Reduce the buffer for collective traces to 500.
    Too large a buffer for collectives may result in a failure to allocate memory.
    Example: if there are on average 50 collective calls in a 500ms profiling window,
    a buffer of 500 can accommodate 10 such windows. */
 #define MAX_COLLECTIVES 500
+#define MAX_SAMPLED_BUFFER_SIZE 100
 int collectiveIndex = 0;
 volatile double lastEventTimestamp = 0;
@@ -76,19 +96,67 @@ void allocateCollectiveBuffer() {
   profilingStart = gettime();
 }
 
-ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) {
-  if (!enable_profiler.load(std::memory_order_relaxed)) {
-    return ncclSuccess;
-  }
-  if (profilingEvents == NULL) {
-    allocateProfilingBuffer();
-  }
-  struct ncclProxyProfileEvent* event = NULL;
-  if (state%8 == 0) {
-    if (profilingIndex == MAX_EVENTS) return ncclSuccess;
-    args->subs[sub].profilingEvents[step%NCCL_STEPS] = event = profilingEvents+profilingIndex++;
-    if (state == ncclProxyProfileBegin) {
-      // Proxy operation information
+bool shouldSample() {
+  double sampling_rate = 1.0 / (double)ncclParamProfilerSamplingWeight();
+  double r = (double)rand() / (double)RAND_MAX;
+  return r <= sampling_rate;
+}
+
+ncclResult_t allocateSamplingProfilerBuffer() {
+  struct bpf_map_attr mapAttr = {
+      .map_type = BPF_MAP_TYPE_ARRAY,
+      .key_size = sizeof(int),
+      .value_size = sizeof(struct ncclProxyProfileEvent*),
+      .max_entries = MAX_SAMPLED_BUFFER_SIZE,
+      .map_flags = 0,
+  };
+  samplingProfilerMapFd =
+      syscall(__NR_bpf, BPF_MAP_CREATE, &mapAttr, sizeof(mapAttr));
+  // Check if the map is created successfully
+  if (samplingProfilerMapFd < 0) {
+    // INFO(NCCL_ALL, "Failed to create sampling profiler buffer");
+  }
+
+  profilingStart = gettime();
+  return ncclSuccess;
+}
+
+struct ncclProxyProfileEvent* ncclProfilingEventCreate(int state) {
+  struct ncclProxyProfileEvent* event = NULL;
+
+  // If in trace mode and there is still space in the profiling buffer, we save the event there
+  if (enable_profiler_trace.load(std::memory_order_relaxed) && profilingIndex < MAX_EVENTS) {
+    event = profilingEvents + profilingIndex++;
+  }
+
+  // If in sampling mode, we first check whether this event should be sampled.
+  // An event should be sampled if:
+  // 1. sampling is enabled,
+  // 2. the sampling decision is true,
+  // 3. there is no other event currently being sampled (i.e., the previously sampled event has completed), and
+  // 4. the event is a send/recv event (i.e., not idle, sleep)
+  if (ncclParamProfilerSamplingEnabled() && shouldSample() && !sampledEvent && state == ncclProxyProfileBegin) {
+    // Then, we decide where to store the event being sampled.
+    // If the profiler already records the events in the profilingEvents buffer (i.e., is in trace mode),
+    // we use that location; otherwise we allocate the sampledEvent variable.
+    if (event) {
+      sampledEvent = event;
+    } else {
+      ncclCalloc(&sampledEvent, 1);
+      sampledEventAllocated = true;
+      event = sampledEvent;
+    }
+  }
+  return event;
+}
+
+ncclResult_t ncclProfilingEventPopulateMetadata(struct ncclProxyProfileEvent* event, struct ncclProxyArgs* args, int sub, int step, int state) {
+  // Proxy operation information
+  if (!event || state%8 != 0) {
+    return ncclSuccess;
+  }
+  if (state == ncclProxyProfileBegin) {
     event->opCount = args->opCount;
     event->channel = args->subs[sub].channelId;
     event->peer = args->subs[sub].peer;
@@ -100,13 +168,14 @@ ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step,
     event->nChannels = args->subs[sub].nChannels;
     event->nbytes = args->subs[sub].nbytes;
     event->protocol = args->protocol;
-    } else event->peer = -state;
-  } else {
-    event = (struct ncclProxyProfileEvent*)args->subs[sub].profilingEvents[step%NCCL_STEPS];
-    if (state == ncclProxyProfileEnd) args->subs[sub].profilingEvents[step%NCCL_STEPS] = NULL;
-    if (state == ncclProxyProfileAppendEnd) event->opCount = args->opCount;
-  }
+  }
+  else {
+    event->peer = -state;
+  }
+  return ncclSuccess;
+}
+
+ncclResult_t ncclProfilingEventPopulateTimestamp(struct ncclProxyProfileEvent* event, int state) {
   // Timestamp
   if (event) {
     event->timestamp[state % 8] = gettime() - profilingStart;
@@ -120,9 +189,79 @@ ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step,
   return ncclSuccess;
 }
 
+ncclResult_t ncclProfilingSampledEventSave(struct ncclProxyProfileEvent* event) {
+  if (samplingProfilerMapFd < 0) {
+    // INFO(NCCL_ALL, "No BPF map to dump event\n");
+    return ncclSuccess;
+  }
+  if (!event) {
+    // INFO(NCCL_ALL, "No event to save\n");
+    return ncclSuccess;
+  }
+  union bpf_attr attr = {
+      .map_fd = (__u32)samplingProfilerMapFd,
+      .key = (__u64)(unsigned long)(&samplingProfilingIndex),
+      .value = (__u64)(unsigned long)(event),
+      .flags = {},
+  };
+  const int ret = syscall(__NR_bpf, BPF_MAP_UPDATE_ELEM, &attr, sizeof(attr));
+  if (ret < 0) {
+    // INFO(NCCL_ALL, "Failed to update bpf map with error %d\n", ret);
+  }
+  samplingProfilingIndex++;
+  return ncclSuccess;
+}
+
+ncclResult_t ncclProfilingRecord(struct ncclProxyArgs* args, int sub, int step, int state) {
+  // INFO(NCCL_ALL,"NCCL profiling record called: sub %d step %d state %d", sub, step, state);
+  if (!enable_profiler_trace.load(std::memory_order_relaxed) && !ncclParamProfilerSamplingEnabled()) {
+    return ncclSuccess;
+  }
+
+  // allocate buffers if necessary
+  if (enable_profiler_trace.load(std::memory_order_relaxed)) {
+    if (profilingEvents == NULL) {
+      allocateProfilingBuffer();
+    }
+  }
+  if (ncclParamProfilerSamplingEnabled()) {
+    if (samplingProfilerMapFd < 0) {
+      allocateSamplingProfilerBuffer();
+    }
+  }
+
+  struct ncclProxyProfileEvent* event = NULL;
+  // if this is a new event
+  if (state % 8 == 0) {
+    event = ncclProfilingEventCreate(state);
+    if (!event) {
+      return ncclSuccess;
+    }
+    args->subs[sub].profilingEvents[step%NCCL_STEPS] = event;
+    NCCLCHECK(ncclProfilingEventPopulateMetadata(event, args, sub, step, state));
+  }
+  else {
+    event = (struct ncclProxyProfileEvent*)args->subs[sub].profilingEvents[step%NCCL_STEPS];
+    if (state == ncclProxyProfileEnd) args->subs[sub].profilingEvents[step%NCCL_STEPS] = NULL;
+    if (event && state == ncclProxyProfileAppendEnd) event->opCount = args->opCount;
+  }
+
+  NCCLCHECK(ncclProfilingEventPopulateTimestamp(event, state));
+
+  if (event && state == ncclProxyProfileEnd && sampledEvent) {
+    // INFO(NCCL_ALL, "Saving sampled event");
+    ncclProfilingSampledEventSave(sampledEvent);
+    if (sampledEventAllocated) free(sampledEvent);
+    sampledEvent = NULL;
+  }
+
+  return ncclSuccess;
+}
+
 NCCL_API(ncclResult_t, ncclCollectiveRecord, const char*, char);
 ncclResult_t ncclCollectiveRecord(const char* name, char type) {
-  if (!enable_profiler.load(std::memory_order_relaxed)) {
+  if (!enable_profiler_trace.load(std::memory_order_relaxed)) {
     return ncclSuccess;
   }
   if (collectiveIndex >= MAX_COLLECTIVES) {
@@ -272,7 +411,7 @@ NCCL_API(ncclResult_t, ncclProfilerEnable);
 ncclResult_t ncclProfilerEnable() {
   // INFO(NCCL_ALL,"Enabling proxy profiler\n");
   // only if it was previously disabled
-  if (!enable_profiler.exchange(true)) {
+  if (!enable_profiler_trace.exchange(true)) {
     std::unique_lock<std::mutex> lg(profiler_dump_mutex);
     profilingIndex = 0;
     collectiveIndex = 0;
@@ -286,7 +425,7 @@ NCCL_API(ncclResult_t, ncclProfilerDisable);
 NCCL_API(ncclResult_t, ncclProfilerDisable);
 ncclResult_t ncclProfilerDisable() {
   // INFO(NCCL_ALL,"Disabling proxy profiler\n");
-  if (enable_profiler.exchange(false)) {
+  if (enable_profiler_trace.exchange(false)) {
     return ncclSuccess;
   }
   return ncclInternalError;
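[Editor's aside, not part of the diff: for illustration, the consumer side implied by this commit might look like the sketch below. It assumes a same-process reader (the map stores user-space ncclProxyProfileEvent pointers, which are only meaningful inside the writing process) and that samplingProfilerMapFd is shared with the reader; neither is defined by this commit.]

// Minimal same-process reader sketch for the sampled-event BPF array map.
#include <linux/bpf.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <cstdio>
#include <cstring>

struct ncclProxyProfileEvent; // opaque here; defined in NCCL's profiler sources

static long bpfMapLookup(int mapFd, const int* key, void* value) {
  union bpf_attr attr;
  memset(&attr, 0, sizeof(attr)); // unused bpf_attr fields must be zero
  attr.map_fd = (__u32)mapFd;
  attr.key = (__u64)(unsigned long)key;
  attr.value = (__u64)(unsigned long)value;
  return syscall(__NR_bpf, BPF_MAP_LOOKUP_ELEM, &attr, sizeof(attr));
}

// Walk all MAX_SAMPLED_BUFFER_SIZE (100) slots and report occupied ones.
void dumpSampledEvents(int samplingProfilerMapFd) {
  for (int key = 0; key < 100; key++) {
    struct ncclProxyProfileEvent* event = nullptr;
    if (bpfMapLookup(samplingProfilerMapFd, &key, &event) == 0 && event != nullptr) {
      printf("slot %d -> sampled event at %p\n", key, (void*)event);
    }
  }
}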
