Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merging NCCL 2.21.5-1 #1271

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ext-net/example/nccl/net_device.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
#ifndef NET_DEVICE_H_
#define NET_DEVICE_H_

#include "net_device.h"

#define NCCL_NET_DEVICE_INVALID_VERSION 0x0
#define NCCL_NET_MTU_SIZE 4096

Expand Down
23 changes: 14 additions & 9 deletions ext-tuner/example/nccl/tuner.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ typedef struct {
const char* name;

// Initializes tuner states.
// nRanks: number of ranks in current communicator. Each communicator initialize its own tuner.
// nNodes: number of nodes in current communicator.
// logFunction: a logFunction can be useful to integrate logging together with NCCL core.
ncclResult_t (*init)(size_t nRanks, size_t nNodes, ncclDebugLogger_t logFunction);
// Inputs:
// - nRanks: number of ranks in current communicator. Each communicator initialize its own tuner.
// - nNodes: number of nodes in current communicator.
// - logFunction: a logFunction can be useful to integrate logging together with NCCL core.
// Outputs:
// - context: tuner context object
ncclResult_t (*init)(size_t nRanks, size_t nNodes, ncclDebugLogger_t logFunction, void **context);

// Gets info (algo, protocol, number of ctas and threads) for a given collective.
// Inputs:
// - context: tuner context object
// - collType: collective type , e.g., allreduce, allgather…
// - nBytes: collective size in bytes
// - collNetSupport: whether collnet supports this type
Expand All @@ -62,16 +66,17 @@ typedef struct {
// Also, the plugin is allowed to not set any output, or set only the
// algorithm and protocol, but not only the algorithm or only the protocol.
// Unset fields will be set automatically by NCCL.
ncclResult_t (*getCollInfo)(ncclFunc_t collType, size_t nBytes,
ncclResult_t (*getCollInfo)(void* context, ncclFunc_t collType, size_t nBytes,
int collNetSupport, int nvlsSupport, int numPipeOps,
int *algorithm, int *protocol, int* nChannels);

// Terminates the plugin and cleans up any resources that the plugin allocated.
ncclResult_t (*destroy)();
} ncclTuner_v1_t;
// context: tuner context object
ncclResult_t (*destroy)(void* context);
} ncclTuner_v2_t;

typedef ncclTuner_v1_t ncclTuner_t;
typedef ncclTuner_v2_t ncclTuner_t;

#define NCCL_TUNER_PLUGIN_SYMBOL "ncclTunerPlugin_v1"
#define NCCL_TUNER_PLUGIN_SYMBOL "ncclTunerPlugin_v2"

#endif
8 changes: 4 additions & 4 deletions ext-tuner/example/plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@

#define __hidden __attribute__ ((visibility("hidden")))

__hidden ncclResult_t pluginInit(size_t nRanks, size_t nNodes, ncclDebugLogger_t logFunction) { return ncclSuccess; }
__hidden ncclResult_t pluginInit(size_t nRanks, size_t nNodes, ncclDebugLogger_t logFunction, void **context) { return ncclSuccess; }

__hidden ncclResult_t pluginGetCollInfo(ncclFunc_t collType, size_t nBytes,
__hidden ncclResult_t pluginGetCollInfo(void* context, ncclFunc_t collType, size_t nBytes,
int collNetSupport, int nvlsSupport, int numPipeOps,
int *algorithm, int *protocol, int* nChannels) { *algorithm = NCCL_ALGO_RING; *protocol = NCCL_PROTO_SIMPLE; return ncclSuccess; }

__hidden ncclResult_t pluginDestroy() { return ncclSuccess; }
__hidden ncclResult_t pluginDestroy(void* context) { return ncclSuccess; }

#define PLUGIN_NAME "Example"

const ncclTuner_v1_t ncclTunerPlugin_v1 = {
const ncclTuner_v2_t ncclTunerPlugin_v2 = {
.name = PLUGIN_NAME,
.init = pluginInit,
.getCollInfo = pluginGetCollInfo,
Expand Down
2 changes: 1 addition & 1 deletion makefiles/version.mk
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
##### version
NCCL_MAJOR := 2
NCCL_MINOR := 20
NCCL_MINOR := 21
NCCL_PATCH := 5
NCCL_SUFFIX :=
PKG_REVISION := 1
251 changes: 148 additions & 103 deletions src/bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ static ncclResult_t bootstrapNetRecv(struct ncclSocket* sock, void* data, int si
NCCLCHECK(ncclSocketRecv(sock, data, std::min(recvSize, size)));
return ncclSuccess;
}
static ncclResult_t bootstrapNetSendRecv(struct ncclSocket* sendSock, void* sendData, int sendSize, struct ncclSocket* recvSock, void* recvData, int recvSize) {
int senderRecvSize;
NCCLCHECK(ncclSocketSendRecv(sendSock, &sendSize, sizeof(int), recvSock, &senderRecvSize, sizeof(int)));
if (senderRecvSize > recvSize) {
WARN("Message truncated : received %d bytes instead of %d", senderRecvSize, recvSize);
return ncclInternalError;
}
NCCLCHECK(ncclSocketSendRecv(sendSock, sendData, sendSize, recvSock, recvData, recvSize));
return ncclSuccess;
}

struct extInfo {
int rank;
Expand Down Expand Up @@ -395,103 +405,40 @@ ncclResult_t bootstrapSplit(struct ncclBootstrapHandle* handle, struct ncclComm*
goto exit;
}

ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) {
struct bootstrapState* state = (struct bootstrapState*)commState;
char* data = (char*)allData;
int rank = state->rank;
int nranks = state->nranks;

TRACE(NCCL_INIT, "rank %d nranks %d size %d", rank, nranks, size);
// Bootstrap send/receive functions
//
// We do not keep connections opened with all ranks at all times, and we have no guarantee
// that connections to our unique listen socket will arrive in the same order as we need
// them. Therefore, when establishing a connection, the sender sends a (peer, tag) tuple to
// allow the receiver to identify the flow, and keep it in an unexpected queue if needed.

/* Simple ring based AllGather
* At each step i receive data from (rank-i-1) from left
* and send previous step's data from (rank-i) to right
*/
for (int i=0; i<nranks-1; i++) {
size_t rslice = (rank - i - 1 + nranks) % nranks;
size_t sslice = (rank - i + nranks) % nranks;

// Send slice to the right
NCCLCHECK(bootstrapNetSend(&state->ringSendSocket, data+sslice*size, size));
// Recv slice from the left
NCCLCHECK(bootstrapNetRecv(&state->ringRecvSocket, data+rslice*size, size));
}
ncclResult_t bootstrapConnect(void* commState, int peer, int tag, struct ncclSocket* sock) {
ncclResult_t ret = ncclSuccess;
struct bootstrapState* state = (struct bootstrapState*)commState;

TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
NCCLCHECKGOTO(ncclSocketInit(sock, state->peerCommAddresses+peer, state->magic, ncclSocketTypeBootstrap), ret, fail);
NCCLCHECKGOTO(ncclSocketConnect(sock), ret, fail);
NCCLCHECKGOTO(bootstrapNetSend(sock, &state->rank, sizeof(int)), ret, fail);
NCCLCHECKGOTO(bootstrapNetSend(sock, &tag, sizeof(int)), ret, fail);
return ncclSuccess;
fail:
NCCLCHECK(ncclSocketClose(sock));
return ret;
}

ncclResult_t bootstrapSend(void* commState, int peer, int tag, void* data, int size) {
ncclResult_t ret = ncclSuccess;
struct bootstrapState* state = (struct bootstrapState*)commState;
struct ncclSocket sock;

NCCLCHECKGOTO(ncclSocketInit(&sock, state->peerCommAddresses+peer, state->magic, ncclSocketTypeBootstrap), ret, fail);
NCCLCHECKGOTO(ncclSocketConnect(&sock), ret, fail);
NCCLCHECKGOTO(bootstrapNetSend(&sock, &state->rank, sizeof(int)), ret, fail);
NCCLCHECKGOTO(bootstrapNetSend(&sock, &tag, sizeof(int)), ret, fail);
NCCLCHECKGOTO(bootstrapNetSend(&sock, data, size), ret, fail);
TRACE(NCCL_BOOTSTRAP, "Sending to peer=%d tag=%d size=%d", peer, tag, size);
NCCLCHECK(bootstrapConnect(commState, peer, tag, &sock));
NCCLCHECKGOTO(bootstrapNetSend(&sock, data, size), ret, exit);

TRACE(NCCL_BOOTSTRAP, "Sent to peer=%d tag=%d size=%d", peer, tag, size);

exit:
NCCLCHECK(ncclSocketClose(&sock));
return ret;
fail:
goto exit;
}

ncclResult_t bootstrapBarrier(void* commState, int *ranks, int rank, int nranks, int tag) {
if (nranks == 1) return ncclSuccess;
TRACE(NCCL_INIT, "rank %d nranks %d tag %x - ENTER", rank, nranks, tag);

/* Simple intra process barrier
*
* Based on the dissemination algorithm by Debra Hensgen, Raphael Finkel, and Udi Manbet,
* "Two Algorithms for Barrier Synchronization," International Journal of Parallel Programming, 17(1):1-17, 1988"
*/
int data[1];
for (int mask=1; mask<nranks; mask<<=1) {
int src = (rank - mask + nranks) % nranks;
int dst = (rank + mask) % nranks;
NCCLCHECK(bootstrapSend(commState, ranks[dst], tag, data, sizeof(data)));
NCCLCHECK(bootstrapRecv(commState, ranks[src], tag, data, sizeof(data)));
}

TRACE(NCCL_INIT, "rank %d nranks %d tag %x - DONE", rank, nranks, tag);
return ncclSuccess;
}

ncclResult_t bootstrapIntraNodeAllGather(void* commState, int *ranks, int rank, int nranks, void* allData, int size) {
if (nranks == 1) return ncclSuccess;
char* data = (char*)allData;
TRACE(NCCL_INIT, "rank %d nranks %d size %d - ENTER", rank, nranks, size);

for (int i=1; i<nranks; i++) {
int src = (rank - i + nranks) % nranks;
int dst = (rank + i) % nranks;
NCCLCHECK(bootstrapSend(commState, ranks[dst], /*tag=*/i, data+rank*size, size));
NCCLCHECK(bootstrapRecv(commState, ranks[src], /*tag=*/i, data+src*size, size));
}

TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
return ncclSuccess;
}

// IntraNode in-place Broadcast
ncclResult_t bootstrapIntraNodeBroadcast(void* commState, int *ranks, int rank, int nranks, int root, void* bcastData, int size) {
if (nranks == 1) return ncclSuccess;
TRACE(NCCL_INIT, "rank %d nranks %d root %d size %d - ENTER", rank, nranks, root, size);

if (rank == root) {
for (int i=0; i<nranks; i++) {
if (i != root) NCCLCHECK(bootstrapSend(commState, ranks[i], /*tag=*/ranks[i], bcastData, size));
}
}
else {
NCCLCHECK(bootstrapRecv(commState, ranks[root], /*tag=*/ranks[rank], bcastData, size));
}

TRACE(NCCL_INIT, "rank %d nranks %d root %d size %d - DONE", rank, nranks, root, size);
return ncclSuccess;
}

ncclResult_t unexpectedEnqueue(struct bootstrapState* state, int peer, int tag, struct ncclSocket* sock) {
Expand Down Expand Up @@ -548,38 +495,136 @@ static void unexpectedFree(struct bootstrapState* state) {
}

// We can't know who we'll receive from, so we need to receive everything at once
ncclResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size) {
ncclResult_t bootstrapAccept(void* commState, int peer, int tag, struct ncclSocket* sock) {
ncclResult_t ret = ncclSuccess;
struct bootstrapState* state = (struct bootstrapState*)commState;
struct ncclSocket sock;
int newPeer, newTag;

// Search unexpected connections first
int found;
NCCLCHECK(unexpectedDequeue(state, peer, tag, &sock, &found));
if (found) {
NCCLCHECKGOTO(bootstrapNetRecv(&sock, ((char*)data), size), ret, fail);
goto exit;
}
NCCLCHECK(unexpectedDequeue(state, peer, tag, sock, &found));
if (found) return ncclSuccess;

// Then look for new connections
while (1) {
NCCLCHECKGOTO(ncclSocketInit(&sock), ret, fail);
NCCLCHECKGOTO(ncclSocketAccept(&sock, &state->listenSock), ret, fail);
NCCLCHECKGOTO(bootstrapNetRecv(&sock, &newPeer, sizeof(int)), ret, fail);
NCCLCHECKGOTO(bootstrapNetRecv(&sock, &newTag, sizeof(int)), ret, fail);
if (newPeer == peer && newTag == tag) {
NCCLCHECKGOTO(bootstrapNetRecv(&sock, ((char*)data), size), ret, fail);
goto exit;
}
// Unexpected connection. Save for later.
NCCLCHECKGOTO(unexpectedEnqueue(state, newPeer, newTag, &sock), ret, fail);
NCCLCHECKGOTO(ncclSocketInit(sock), ret, fail);
NCCLCHECKGOTO(ncclSocketAccept(sock, &state->listenSock), ret, fail);
NCCLCHECKGOTO(bootstrapNetRecv(sock, &newPeer, sizeof(int)), ret, fail);
NCCLCHECKGOTO(bootstrapNetRecv(sock, &newTag, sizeof(int)), ret, fail);
if (newPeer == peer && newTag == tag) return ncclSuccess;
NCCLCHECKGOTO(unexpectedEnqueue(state, newPeer, newTag, sock), ret, fail);
}
return ncclSuccess;
fail:
NCCLCHECK(ncclSocketClose(sock));
return ret;
}

// We can't know who we'll receive from, so we need to receive everything at once
ncclResult_t bootstrapRecv(void* commState, int peer, int tag, void* data, int size) {
ncclResult_t ret;
struct ncclSocket sock;
NCCLCHECK(bootstrapAccept(commState, peer, tag, &sock));
TRACE(NCCL_BOOTSTRAP, "Receiving tag=%d peer=%d size=%d", tag, peer, size);
NCCLCHECKGOTO(bootstrapNetRecv(&sock, ((char*)data), size), ret, exit);
exit:
NCCLCHECK(ncclSocketClose(&sock));
return ret;
fail:
goto exit;
}

// Collective algorithms, based on bootstrapSend/Recv, and sometimes bootstrapConnect/Accept

ncclResult_t bootstrapRingAllGather(struct ncclSocket* prevSocket, struct ncclSocket* nextSocket, int rank, int nranks, char* data, int size) {
/* Simple ring based AllGather
* At each step i receive data from (rank-i-1) from prev
* and send previous step's data from (rank-i) to next
*/
for (int i=0; i<nranks-1; i++) {
size_t rslice = (rank - i - 1 + nranks) % nranks;
size_t sslice = (rank - i + nranks) % nranks;

// Send slice to the right, recv slice from the left
NCCLCHECK(bootstrapNetSendRecv(nextSocket, data+sslice*size, size, prevSocket, data+rslice*size, size));
}
return ncclSuccess;
}
ncclResult_t bootstrapAllGather(void* commState, void* allData, int size) {
struct bootstrapState* state = (struct bootstrapState*)commState;
int rank = state->rank;
int nranks = state->nranks;

TRACE(NCCL_INIT, "rank %d nranks %d size %d", rank, nranks, size);

NCCLCHECK(bootstrapRingAllGather(&state->ringRecvSocket, &state->ringSendSocket, rank, nranks, (char*)allData, size));

TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
return ncclSuccess;
}

ncclResult_t bootstrapIntraNodeBarrier(void* commState, int *ranks, int rank, int nranks, int tag) {
if (nranks == 1) return ncclSuccess;
TRACE(NCCL_INIT, "rank %d nranks %d tag %x - ENTER", rank, nranks, tag);

/* Simple [intra] process barrier
*
* Based on the dissemination algorithm by Debra Hensgen, Raphael Finkel, and Udi Manbet,
* "Two Algorithms for Barrier Synchronization," International Journal of Parallel Programming, 17(1):1-17, 1988"
*/
int data[1];
for (int mask=1; mask<nranks; mask<<=1) {
int src = (rank - mask + nranks) % nranks;
int dst = (rank + mask) % nranks;
NCCLCHECK(bootstrapSend(commState, ranks ? ranks[dst] : dst, tag, data, sizeof(data)));
NCCLCHECK(bootstrapRecv(commState, ranks ? ranks[src] : src, tag, data, sizeof(data)));
}

TRACE(NCCL_INIT, "rank %d nranks %d tag %x - DONE", rank, nranks, tag);
return ncclSuccess;
}

ncclResult_t bootstrapBarrier(void* commState, int rank, int nranks, int tag) {
return bootstrapIntraNodeBarrier(commState, NULL, rank, nranks, tag);
}

ncclResult_t bootstrapIntraNodeAllGather(void* commState, int *ranks, int rank, int nranks, void* allData, int size) {
if (nranks == 1) return ncclSuccess;
TRACE(NCCL_INIT, "rank %d nranks %d size %d - ENTER", rank, nranks, size);

int prevRank = ranks[(rank - 1 + nranks)%nranks];
int nextRank = ranks[(rank + 1) % nranks];
struct ncclSocket prevSocket, nextSocket;
NCCLCHECK(bootstrapConnect(commState, nextRank, 0, &nextSocket));
NCCLCHECK(bootstrapAccept(commState, prevRank, 0, &prevSocket));

NCCLCHECK(bootstrapRingAllGather(&prevSocket, &nextSocket, rank, nranks, (char*)allData, size));

NCCLCHECK(ncclSocketClose(&nextSocket));
NCCLCHECK(ncclSocketClose(&prevSocket));

TRACE(NCCL_INIT, "rank %d nranks %d size %d - DONE", rank, nranks, size);
return ncclSuccess;
}

// [IntraNode] in-place Broadcast
ncclResult_t bootstrapIntraNodeBroadcast(void* commState, int *ranks, int rank, int nranks, int root, void* bcastData, int size) {
if (nranks == 1) return ncclSuccess;
TRACE(NCCL_INIT, "rank %d nranks %d root %d size %d - ENTER", rank, nranks, root, size);

if (rank == root) {
for (int i=0; i<nranks; i++) {
if (i != root) NCCLCHECK(bootstrapSend(commState, ranks ? ranks[i] : i, /*tag=*/ranks ? ranks[i] : i, bcastData, size));
}
}
else {
NCCLCHECK(bootstrapRecv(commState, ranks ? ranks[root] : root, /*tag=*/ranks ? ranks[rank] : rank, bcastData, size));
}

TRACE(NCCL_INIT, "rank %d nranks %d root %d size %d - DONE", rank, nranks, root, size);
return ncclSuccess;
}

ncclResult_t bootstrapBroadcast(void* commState, int rank, int nranks, int root, void* bcastData, int size) {
return bootstrapIntraNodeBroadcast(commState, NULL, rank, nranks, root, bcastData, size);
}

ncclResult_t bootstrapClose(void* commState) {
Expand Down
Loading