diff --git a/projects/rccl/CMakeLists.txt b/projects/rccl/CMakeLists.txt
index 653feafdeeb..d4c4caf1d0e 100644
--- a/projects/rccl/CMakeLists.txt
+++ b/projects/rccl/CMakeLists.txt
@@ -400,7 +400,7 @@ if (ENABLE_MSCCLPP AND ROCM_VERSION VERSION_LESS "60200")
 endif()
 
 ## Disable WARP_SPEED if the build environment is invalid
-set(WARP_SPEED_SUPPORTED_ARCHS "gfx942" "gfx942:xnack-" "gfx942:xnack+" "gfx950" "gfx950:xnack-" "gfx950:xnack+")
+set(WARP_SPEED_SUPPORTED_ARCHS "gfx950" "gfx950:xnack-" "gfx950:xnack+")
 set(ARCH_MATCH_FOUND OFF)
 foreach(ARCH IN LISTS GPU_TARGETS)
   if(ARCH IN_LIST WARP_SPEED_SUPPORTED_ARCHS)
diff --git a/projects/rccl/src/enqueue.cc b/projects/rccl/src/enqueue.cc
index 29e46b0dd62..d05a8a2a9d7 100644
--- a/projects/rccl/src/enqueue.cc
+++ b/projects/rccl/src/enqueue.cc
@@ -1759,13 +1759,18 @@ ncclResult_t ncclLaunchPrepare(struct ncclComm* comm) {
   cudaStream_t deviceStream, launchOrder;
   NCCLCHECKGOTO(ncclStrongStreamAcquire(planner->capturingGraph, &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream), result, failure);
 
-  // userStream[0] waits on each userStream[i]...
-  for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) {
-    CUDACHECKGOTO(cudaEventRecord(comm->sharedRes->scratchEvent, l->stream), result, failure);
-    CUDACHECKGOTO(cudaStreamWaitEvent(launchStream, comm->sharedRes->scratchEvent, 0), result, failure);
+  if (persistent || planner->numStreams != 1) {
+    // userStream[0] waits on each userStream[i]...
+    for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) {
+      CUDACHECKGOTO(cudaEventRecord(comm->sharedRes->scratchEvent, l->stream), result, failure);
+      CUDACHECKGOTO(cudaStreamWaitEvent(launchStream, comm->sharedRes->scratchEvent, 0), result, failure);
+    }
+    // userStream[0] waits on deviceStream
+    NCCLCHECKGOTO(ncclStreamWaitStream(launchStream, deviceStream, comm->sharedRes->scratchEvent), result, failure);
+  } else if (planner->streams->stream != comm->lastStream && comm->lastStream != nullptr && !persistent) {
+    // Stream changed from last call, create dependency against last NCCL kernel launch
+    CUDACHECKGOTO(hipStreamWaitEvent(planner->streams->stream, comm->doneEvent, 0), result, failure);
   }
-  // userStream[0] waits on deviceStream
-  NCCLCHECKGOTO(ncclStreamWaitStream(launchStream, deviceStream, comm->sharedRes->scratchEvent), result, failure);
 
   bool capturing = ncclCudaGraphValid(planner->capturingGraph);
   enum ncclImplicitOrder implicitOrder;
@@ -1847,6 +1852,17 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan
   void* extra[] = {plan->kernelArgs, &plan->kernelArgsSize};
   auto event = latency_profiler::collTraceAquireEventBaseline(plan, launchStream);
 
+  if (planner->numStreams == 1 && !plan->persistent) {
+    latency_profiler::collTraceRecordStartEvent(comm, launchStream, event.get());
+    comm->lastStream = planner->streams->stream;
+    CUDACHECKGOTO(hipExtLaunchKernel(plan->kernelFn, grid, block, extra, 0, launchStream, NULL, comm->doneEvent, 0), ret, do_return);
+
+    latency_profiler::collTraceRecordEndEvent(comm, plan, launchStream, std::move(event));
+    return ncclSuccess;
+  }
+
+  // CUfunction fn;
+  // CUDACHECK(cudaGetFuncBySymbol(&fn, sym));
 
 #if !defined(__HIP_PLATFORM_AMD__) || !defined(__HIPCC__)
   int driverVersion;
@@ -1981,6 +1997,7 @@ ncclResult_t ncclLaunchFinish(struct ncclComm* comm) {
   // back to us for reclaiming via callbackQueue.
   ncclIntruQueueConstruct(&planner->planQueue);
 
+  bool capturing = ncclCudaGraphValid(planner->capturingGraph);
   cudaStream_t launchStream = planner->streams->stream; // First user stream gets launch
   cudaStream_t deviceStream, launchOrder;
   cudaEvent_t finishedEvent = comm->sharedRes->scratchEvent;
@@ -1997,24 +2014,25 @@ ncclResult_t ncclLaunchFinish(struct ncclComm* comm) {
     CUDACHECK(cudaEventCreateWithFlags(&comm->sharedRes->scratchEvent, cudaEventDisableTiming));
   }
-  CUDACHECK(cudaEventRecord(finishedEvent, launchStream));
-  // deviceStream waits on userStream[0]
-  NCCLCHECK(ncclStrongStreamAcquiredWorkStream(planner->capturingGraph, &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream));
+  if (capturing || planner->numStreams != 1 || ncclParamLaunchOrderImplicit()) {
+    CUDACHECK(cudaEventRecord(finishedEvent, launchStream));
+    // deviceStream waits on userStream[0]
+    NCCLCHECK(ncclStrongStreamAcquiredWorkStream(planner->capturingGraph, &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream));
 
-  // We know that deviceStream is strictly behind the launchStream because launchStream
-  // synced with it before kernel launch. This allows us to to see deviceStream waiting
-  // on launchStream as a fast-forward. When building CUDA graphs fast forwards should
-  // be handled specially so as not to create graphs with a blowup in the number of edges.
-  // So we could do this:
-  //   CUDACHECK(cudaStreamWaitEvent(deviceStream, finishedEvent, 0));
-  // But instead we do:
-  NCCLCHECK(ncclStreamAdvanceToEvent(planner->capturingGraph, deviceStream, finishedEvent));
+    // We know that deviceStream is strictly behind the launchStream because launchStream
+    // synced with it before kernel launch. This allows us to see deviceStream waiting
+    // on launchStream as a fast-forward. When building CUDA graphs fast forwards should
+    // be handled specially so as not to create graphs with a blowup in the number of edges.
+    // So we could do this:
+    //   CUDACHECK(cudaStreamWaitEvent(deviceStream, finishedEvent, 0));
+    // But instead we do:
+    NCCLCHECK(ncclStreamAdvanceToEvent(planner->capturingGraph, deviceStream, finishedEvent));
 
-  // Each userStream[i] waits on userStream[0]
-  for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) {
-    CUDACHECK(cudaStreamWaitEvent(l->stream, finishedEvent, 0));
+    // Each userStream[i] waits on userStream[0]
+    for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) {
+      CUDACHECK(cudaStreamWaitEvent(l->stream, finishedEvent, 0));
+    }
   }
 
-  bool capturing = ncclCudaGraphValid(planner->capturingGraph);
   enum ncclImplicitOrder implicitOrder;
   NCCLCHECK(getImplicitOrder(&implicitOrder, capturing));
   if (implicitOrder != ncclImplicitOrderNone) {
@@ -2762,6 +2780,7 @@ static ncclResult_t ncclPlannerSetCapturingGraph(struct ncclComm* comm, struct n
       l->stream = info->stream;
       l->next = planner->streams;
       planner->streams = l;
+      planner->numStreams++;
       break;
     }
     if (l->stream == info->stream)
@@ -2828,7 +2847,7 @@ static ncclResult_t p2pTaskAppend(
           : comm->p2pSchedule[round].recvRank)) {
     round += 1;
   }
-  uint8_t base = ncclP2pChannelBaseForRound(comm, round);
+  uint8_t base = ncclP2pChannelBaseForRound(comm, round, rcclParamP2pBatchEnable());
   for (int c=0; c < comm->p2pnChannelsPerPeer; c++) {
     int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, c, comm->p2pnChannelsPerPeer, comm->nNodes);
     if (isSendNotRecv) {
diff --git a/projects/rccl/src/include/comm.h b/projects/rccl/src/include/comm.h
index b42b7598827..e38ab3df4d7 100644
--- a/projects/rccl/src/include/comm.h
+++ b/projects/rccl/src/include/comm.h
@@ -400,6 +400,8 @@ struct ncclKernelPlanner {
   bool persistent;
   // The list of user streams aggregated over all tasks present.
   struct ncclCudaStreamList* streams;
+  // Keep track of the number of user streams
+  int numStreams;
   // The most recent user stream. Ignored if streams==nullptr
   cudaStream_t streamRecent;
   // The graph capturing all user streams or invalid if none. Thus we restrict the
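The single-stream fast path added to `ncclLaunchKernel` hinges on `hipExtLaunchKernel` recording an event as part of the launch itself, so the next call (possibly on a different user stream) can order itself behind the previous kernel via `hipStreamWaitEvent` instead of the scratch-event round trip. Below is a minimal standalone sketch of that ordering pattern, not RCCL code: `toyKernel`, `streamA`/`streamB`, and the `HIPCHECK` macro are illustrative stand-ins, and it assumes a ROCm build with `hip/hip_ext.h` available.

```cpp
#include <hip/hip_runtime.h>
#include <hip/hip_ext.h>   // hipExtLaunchKernel
#include <cstdio>

#define HIPCHECK(cmd) do { hipError_t e = (cmd); if (e != hipSuccess) { \
  fprintf(stderr, "HIP error %s at line %d\n", hipGetErrorString(e), __LINE__); \
  return 1; } } while (0)

// Stand-in for the NCCL collective kernel.
__global__ void toyKernel(int* out) { *out = 42; }

int main() {
  hipStream_t streamA, streamB;  // two "user" streams
  hipEvent_t doneEvent;          // plays the role of comm->doneEvent
  int* out;
  HIPCHECK(hipStreamCreate(&streamA));
  HIPCHECK(hipStreamCreate(&streamB));
  HIPCHECK(hipEventCreateWithFlags(&doneEvent, hipEventDisableTiming));
  HIPCHECK(hipMalloc(&out, sizeof(int)));

  // Launch and event record in one call: startEvent=nullptr, stopEvent=doneEvent.
  // Same call shape as the hipExtLaunchKernel use in ncclLaunchKernel above.
  void* args[] = {&out};
  HIPCHECK(hipExtLaunchKernel((void*)toyKernel, dim3(1), dim3(1), args,
                              /*sharedMemBytes=*/0, streamA,
                              /*startEvent=*/nullptr, /*stopEvent=*/doneEvent,
                              /*flags=*/0));

  // A later call arriving on a different stream waits on the recorded event,
  // mirroring the comm->lastStream check added to ncclLaunchPrepare.
  HIPCHECK(hipStreamWaitEvent(streamB, doneEvent, 0));

  HIPCHECK(hipDeviceSynchronize());
  HIPCHECK(hipFree(out));
  HIPCHECK(hipEventDestroy(doneEvent));
  HIPCHECK(hipStreamDestroy(streamA));
  HIPCHECK(hipStreamDestroy(streamB));
  return 0;
}
```

Because the event record is fused with the launch, there is no window where a concurrent enqueue could slip between the kernel and the record, which is what lets the patch skip the scratch-event synchronization when `numStreams == 1` and the plan is not persistent.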