projects/rccl/CMakeLists.txt (2 changes: 1 addition & 1 deletion)

@@ -400,7 +400,7 @@ if (ENABLE_MSCCLPP AND ROCM_VERSION VERSION_LESS "60200")
 endif()

 ## Disable WARP_SPEED if the build environment is invalid
-set(WARP_SPEED_SUPPORTED_ARCHS "gfx942" "gfx942:xnack-" "gfx942:xnack+" "gfx950" "gfx950:xnack-" "gfx950:xnack+")
+set(WARP_SPEED_SUPPORTED_ARCHS "gfx950" "gfx950:xnack-" "gfx950:xnack+")
 set(ARCH_MATCH_FOUND OFF)
 foreach(ARCH IN LISTS GPU_TARGETS)
   if(ARCH IN_LIST WARP_SPEED_SUPPORTED_ARCHS)
projects/rccl/src/enqueue.cc (63 changes: 41 additions & 22 deletions)

@@ -1759,13 +1759,18 @@ ncclResult_t ncclLaunchPrepare(struct ncclComm* comm) {
   cudaStream_t deviceStream, launchOrder;
   NCCLCHECKGOTO(ncclStrongStreamAcquire(planner->capturingGraph, &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream), result, failure);

-  // userStream[0] waits on each userStream[i]...
-  for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) {
-    CUDACHECKGOTO(cudaEventRecord(comm->sharedRes->scratchEvent, l->stream), result, failure);
-    CUDACHECKGOTO(cudaStreamWaitEvent(launchStream, comm->sharedRes->scratchEvent, 0), result, failure);
-  }
-  // userStream[0] waits on deviceStream
-  NCCLCHECKGOTO(ncclStreamWaitStream(launchStream, deviceStream, comm->sharedRes->scratchEvent), result, failure);
+  if (persistent || planner->numStreams != 1) {
+    // userStream[0] waits on each userStream[i]...
+    for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) {
+      CUDACHECKGOTO(cudaEventRecord(comm->sharedRes->scratchEvent, l->stream), result, failure);
+      CUDACHECKGOTO(cudaStreamWaitEvent(launchStream, comm->sharedRes->scratchEvent, 0), result, failure);
+    }
+    // userStream[0] waits on deviceStream
+    NCCLCHECKGOTO(ncclStreamWaitStream(launchStream, deviceStream, comm->sharedRes->scratchEvent), result, failure);
+  } else if (planner->streams->stream != comm->lastStream && comm->lastStream != nullptr && !persistent) {
+    // Stream changed since the last call; create a dependency on the last NCCL kernel launch.
+    CUDACHECKGOTO(hipStreamWaitEvent(planner->streams->stream, comm->doneEvent, 0), result, failure);
+  }

   bool capturing = ncclCudaGraphValid(planner->capturingGraph);
   enum ncclImplicitOrder implicitOrder;
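
Note: the multi-stream branch kept above is the usual event fan-in: record a scratch event at the tail of each extra user stream, then make the launch stream wait on it. Reusing one scratch event across the loop is safe because the stream wait snapshots the event's state at call time. A minimal standalone sketch of the pattern, with illustrative names rather than RCCL's:

#include <hip/hip_runtime.h>
#include <vector>

// Fan-in: launchStream must not start new work until everything already
// queued on the other user streams has completed.
hipError_t fanInToLaunchStream(hipStream_t launchStream,
                               const std::vector<hipStream_t>& otherStreams,
                               hipEvent_t scratchEvent) {
  for (hipStream_t s : otherStreams) {
    // Mark the current tail of s...
    hipError_t err = hipEventRecord(scratchEvent, s);
    if (err != hipSuccess) return err;
    // ...and make launchStream wait (GPU-side, no host block) for that mark.
    // The wait captures the event's current record, so the same event can be
    // re-recorded on the next stream without disturbing waits already queued.
    err = hipStreamWaitEvent(launchStream, scratchEvent, 0);
    if (err != hipSuccess) return err;
  }
  return hipSuccess;
}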
@@ -1847,6 +1852,17 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan) {
   void* extra[] = {plan->kernelArgs, &plan->kernelArgsSize};

   auto event = latency_profiler::collTraceAquireEventBaseline(plan, launchStream);
+  if (planner->numStreams == 1 && !plan->persistent) {
+    latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
+    comm->lastStream = planner->streams->stream;
+    CUDACHECKGOTO(hipExtLaunchKernel(plan->kernelFn, grid, block, extra, 0, launchStream, NULL, comm->doneEvent, 0), ret, do_return);
+
+    latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
+    return ncclSuccess;
+  }
+
   // CUfunction fn;
   // CUDACHECK(cudaGetFuncBySymbol(&fn, sym));

 #if !defined(__HIP_PLATFORM_AMD__) || !defined(__HIPCC__)
   int driverVersion;
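
Note: the single-stream fast path above leans on hipExtLaunchKernel (from <hip/hip_ext.h>) accepting optional start/stop events, so comm->doneEvent is recorded as part of the launch itself rather than via a separate event-record submission. A minimal sketch of that call shape; the kernel, dimensions, and names here are placeholders, not RCCL's:

#include <hip/hip_runtime.h>
#include <hip/hip_ext.h>  // hipExtLaunchKernel

__global__ void dummyKernel(int* out) { if (out) *out = 1; }

// Launch on `stream` and have `doneEvent` recorded when the kernel
// completes, all in one submission.
hipError_t launchWithDoneEvent(hipStream_t stream, hipEvent_t doneEvent,
                               int* devOut) {
  void* args[] = {&devOut};
  return hipExtLaunchKernel(reinterpret_cast<void*>(dummyKernel),
                            /*numBlocks=*/dim3(1), /*dimBlocks=*/dim3(1),
                            args, /*sharedMemBytes=*/0, stream,
                            /*startEvent=*/nullptr,
                            /*stopEvent=*/doneEvent, /*flags=*/0);
}

A later launch on a different stream can then order itself behind this one with hipStreamWaitEvent(newStream, doneEvent, 0), which is the lastStream/doneEvent handshake ncclLaunchPrepare now performs.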
@@ -1981,6 +1997,7 @@ ncclResult_t ncclLaunchFinish(struct ncclComm* comm) {
   // back to us for reclaiming via callbackQueue.
   ncclIntruQueueConstruct(&planner->planQueue);

+  bool capturing = ncclCudaGraphValid(planner->capturingGraph);
   cudaStream_t launchStream = planner->streams->stream; // First user stream gets launch
   cudaStream_t deviceStream, launchOrder;
   cudaEvent_t finishedEvent = comm->sharedRes->scratchEvent;

@@ -1997,24 +2014,25 @@
     CUDACHECK(cudaEventCreateWithFlags(&comm->sharedRes->scratchEvent, cudaEventDisableTiming));
   }

-  CUDACHECK(cudaEventRecord(finishedEvent, launchStream));
-  // deviceStream waits on userStream[0]
-  NCCLCHECK(ncclStrongStreamAcquiredWorkStream(planner->capturingGraph, &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream));
-
-  // We know that deviceStream is strictly behind the launchStream because launchStream
-  // synced with it before kernel launch. This allows us to to see deviceStream waiting
-  // on launchStream as a fast-forward. When building CUDA graphs fast forwards should
-  // be handled specially so as not to create graphs with a blowup in the number of edges.
-  // So we could do this:
-  //   CUDACHECK(cudaStreamWaitEvent(deviceStream, finishedEvent, 0));
-  // But instead we do:
-  NCCLCHECK(ncclStreamAdvanceToEvent(planner->capturingGraph, deviceStream, finishedEvent));
-
-  // Each userStream[i] waits on userStream[0]
-  for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) {
-    CUDACHECK(cudaStreamWaitEvent(l->stream, finishedEvent, 0));
-  }
-  bool capturing = ncclCudaGraphValid(planner->capturingGraph);
+  if (capturing || planner->numStreams != 1 || ncclParamLaunchOrderImplicit()) {
+    CUDACHECK(cudaEventRecord(finishedEvent, launchStream));
+    // deviceStream waits on userStream[0]
+    NCCLCHECK(ncclStrongStreamAcquiredWorkStream(planner->capturingGraph, &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream));
+
+    // We know that deviceStream is strictly behind the launchStream because launchStream
+    // synced with it before kernel launch. This allows us to see deviceStream waiting
+    // on launchStream as a fast-forward. When building CUDA graphs, fast-forwards should
+    // be handled specially so as not to create graphs with a blowup in the number of edges.
+    // So we could do this:
+    //   CUDACHECK(cudaStreamWaitEvent(deviceStream, finishedEvent, 0));
+    // But instead we do:
+    NCCLCHECK(ncclStreamAdvanceToEvent(planner->capturingGraph, deviceStream, finishedEvent));
+
+    // Each userStream[i] waits on userStream[0]
+    for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) {
+      CUDACHECK(cudaStreamWaitEvent(l->stream, finishedEvent, 0));
+    }
+  }
   enum ncclImplicitOrder implicitOrder;
   NCCLCHECK(getImplicitOrder(&implicitOrder, capturing));
   if (implicitOrder != ncclImplicitOrderNone) {
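
Note: the guarded block is the fan-out half of the handshake: one event recorded on userStream[0] is waited on by every other user stream. In the single-stream, non-captured, non-implicit-order case there is nobody to fan out to, so the event record itself is the overhead being skipped. A standalone sketch of the fan-out, again with illustrative names:

#include <hip/hip_runtime.h>
#include <vector>

// Fan-out: work queued later on any other user stream must run after the
// kernel that was launched on launchStream.
hipError_t fanOutFromLaunchStream(hipStream_t launchStream,
                                  const std::vector<hipStream_t>& otherStreams,
                                  hipEvent_t finishedEvent) {
  hipError_t err = hipEventRecord(finishedEvent, launchStream);
  if (err != hipSuccess) return err;
  for (hipStream_t s : otherStreams) {
    err = hipStreamWaitEvent(s, finishedEvent, 0);
    if (err != hipSuccess) return err;
  }
  return hipSuccess;
}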
@@ -2762,6 +2780,7 @@ static ncclResult_t ncclPlannerSetCapturingGraph(struct ncclComm* comm, struct n
       l->stream = info->stream;
       l->next = planner->streams;
       planner->streams = l;
+      planner->numStreams++;
       break;
     }
     if (l->stream == info->stream)
@@ -2828,7 +2847,7 @@ static ncclResult_t p2pTaskAppend(
            : comm->p2pSchedule[round].recvRank)) {
     round += 1;
   }
-  uint8_t base = ncclP2pChannelBaseForRound(comm, round);
+  uint8_t base = ncclP2pChannelBaseForRound(comm, round, rcclParamP2pBatchEnable());
   for (int c=0; c < comm->p2pnChannelsPerPeer; c++) {
     int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, c, comm->p2pnChannelsPerPeer, comm->nNodes);
     if (isSendNotRecv) {
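
Note: rcclParamP2pBatchEnable() is presumably generated by RCCL's parameter macro and cached after the first read. For readers outside the codebase, a self-contained sketch of that kind of env-backed tunable; the RCCL_P2P_BATCH_ENABLE variable name and default of 0 are assumptions for illustration, not taken from this PR:

#include <cstdint>
#include <cstdlib>

// Reads the (assumed) RCCL_P2P_BATCH_ENABLE environment variable once and
// caches the result, mimicking the style of RCCL's rcclParam* getters.
static int64_t p2pBatchEnableSketch() {
  static const int64_t value = [] {
    const char* s = std::getenv("RCCL_P2P_BATCH_ENABLE");
    return s ? static_cast<int64_t>(std::atoll(s)) : int64_t{0};  // default: off
  }();
  return value;
}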
projects/rccl/src/include/comm.h (2 changes: 2 additions & 0 deletions)

@@ -400,6 +400,8 @@ struct ncclKernelPlanner {
   bool persistent;
   // The list of user streams aggregated over all tasks present.
   struct ncclCudaStreamList* streams;
+  // Keep track of the number of user streams.
+  int numStreams;
   // The most recent user stream. Ignored if streams==nullptr
   cudaStream_t streamRecent;
   // The graph capturing all user streams or invalid if none. Thus we restrict the
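
Note: a counter shadowing an intrusive list is only correct if every mutation of the list updates it, including whatever reset path clears `streams` between launches. A simplified sketch of the invariant the new field must maintain; the types here are illustrative, not RCCL's:

#include <hip/hip_runtime.h>

struct StreamListSketch { hipStream_t stream; StreamListSketch* next; };

struct PlannerSketch {
  StreamListSketch* streams = nullptr;
  int numStreams = 0;  // invariant: always equals the length of `streams`

  // Mirrors the prepend in enqueue.cc: one increment per newly linked node.
  void addStream(StreamListSketch* node, hipStream_t s) {
    node->stream = s;
    node->next = streams;
    streams = node;
    numStreams++;
  }

  // Any path that empties the list must also zero the counter.
  void reset() {
    streams = nullptr;
    numStreams = 0;
  }
};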