[PG] Enable asynchronous recovered-rank initialization with deferred join#1744
Conversation
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the fault tolerance and elasticity of the Mooncake Process Group (PG) backend by introducing an asynchronous recovery mechanism. Previously, rank recovery caused disruptions to healthy ranks due to synchronous initialization requirements. The new approach allows recovered ranks to perform their intensive setup tasks independently before explicitly rejoining the active group, thereby minimizing performance impact on ongoing operations and improving the overall resilience of distributed training and inference systems.
Code Review
This pull request introduces a "deferred join" mechanism for Mooncake's distributed communication backend, allowing ranks to join a group in a local-only mode before fully integrating. This involves adding an active_ranks_mask parameter to various MooncakeEpBuffer synchronization methods (IB, RoCE, NVLink IPC) to optimize communication by skipping inactive ranks. The ConnectionContext and MooncakeBackend classes are updated with new state (isDummy_, localServerName_, connectionPollerRegistered_) and methods (bootstrapLocalPeer, setLocalOnlyActiveRanks, publishLocalPeerMetadata, waitForExtensionState, syncActiveRanksTensor, joinGroup) to manage this deferred join and rank recovery process. The ConnectionPoller's thread management is also improved. A new Python binding for join_group is added, and the Python mooncake_ep_buffer now passes the active_ranks_mask to the C++ backend. The test suite is updated with a new _deferred_recovery_worker and test_rank_recovery_deferred_join to validate this functionality.

Review comments highlight a critical issue where the test_rank_recovery test will fail due to a NameError after the _recovery_worker function was renamed to _deferred_recovery_worker without updating the test itself. Additionally, there are two medium-severity improvement opportunities: one suggesting replacing a busy-wait loop with a blocking store->wait() call for efficiency in waitForExtensionState, and another recommending refactoring the repeated get_active_ranks call in mooncake_ep_buffer.py to avoid redundant distributed calls and code duplication.
```diff
-def _recovery_worker(rank, num_processes, signals):
-    """Worker for testing rank recovery."""
+def _deferred_recovery_worker(rank, num_processes, signals):
```
By renaming _recovery_worker to _deferred_recovery_worker, the existing test test_rank_recovery will fail with a NameError as it still tries to call the old function name _recovery_worker. Please update test_rank_recovery to call _deferred_recovery_worker if this is a replacement, or restore the original _recovery_worker if both are needed.
```cpp
while (true) {
    if (meta_->store->check({task_count_key, active_ranks_key})) {
        auto task_count_data = meta_->store->get(task_count_key);
        std::string task_count(task_count_data.begin(),
                               task_count_data.end());
        meta_->taskCount = std::stoi(task_count);

        auto active_ranks = meta_->store->get(active_ranks_key);
        deserializeActiveRanks(active_ranks, meta_->activeRanks,
                               meta_->size);
        syncActiveRanksTensor();
        return;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
```
This while (true) loop with a sleep implements a busy-wait to check for keys in the store. This is inefficient and can consume unnecessary CPU cycles. The c10d::Store API provides a blocking wait() method that is more suitable for this purpose. Please consider using store->wait({task_count_key, active_ranks_key}) to wait for the keys to become available, which will be more efficient.
```cpp
meta_->store->wait({task_count_key, active_ranks_key});
auto task_count_data = meta_->store->get(task_count_key);
std::string task_count(task_count_data.begin(),
                       task_count_data.end());
meta_->taskCount = std::stoi(task_count);
auto active_ranks = meta_->store->get(active_ranks_key);
deserializeActiveRanks(active_ranks, meta_->activeRanks,
                       meta_->size);
syncActiveRanksTensor();
```

```python
from mooncake.ep import get_active_ranks
active_ranks_mask = get_active_ranks(self.backend).tolist()
```
The logic to get active_ranks_mask is repeated three times in this method. This is inefficient as it involves a distributed call (get_active_ranks) and duplicates code. It would be cleaner to fetch active_ranks_mask once at a higher scope within the connect method (e.g., before the if not self._use_fallback: block) and reuse the variable in all three places.
Codecov Report: ✅ All modified and coverable lines are covered by tests.
Description
Motivation
During Elastic EP rank recovery, the recovered rank must re-run expensive initialization (especially CUDA graph capture).
Previously, this required the recovered and healthy ranks to enter the same communication phase, which paused healthy-rank inference and increased recovery disruption.
This change enables asynchronous recovered-rank initialization in Mooncake PG: recovered ranks can initialize in isolation first, then join the live process group after local recovery work is finished.
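The resulting ordering can be sketched as follows. `join_group` is the new binding this PR adds; the backend class and the local-init callable are illustrative stand-ins, not Mooncake code.

```python
class FakeBackend:
    """Illustrative stand-in for the Mooncake PG backend."""
    def __init__(self):
        self.joined = False

    def join_group(self):
        # In the real backend this is the deferred-join hook: the
        # recovered rank only rejoins the live group when this is called.
        self.joined = True

def recover_rank(backend, run_local_init):
    # 1. Expensive local-only initialization (e.g. CUDA graph capture)
    #    runs first, in isolation, without pausing healthy ranks.
    run_local_init()
    # 2. Only after local recovery work finishes does the rank rejoin.
    backend.join_group()

events = []
backend = FakeBackend()
recover_rank(backend, lambda: events.append("local-init"))
```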
What this PR changes (Mooncake)
1. Deferred extension-group bootstrapping
Extension-group setup for a recovered rank is deferred until it calls joinGroup().
2. Explicit deferred join API path
Adds joinGroup() behavior for extension backends. This is the key hook that lets the recovered rank rejoin the healthy group only after local init completes.
Behavior impact
Healthy ranks are only involved once a recovered rank calls joinGroup().
Module
- mooncake-transfer-engine
- mooncake-store
- mooncake-ep
- mooncake-integration
- mooncake-p2p-store
- mooncake-wheel
- mooncake-pg
- mooncake-rl
Type of Change
How Has This Been Tested?
In the latest code of sgl-project/sglang#15771
Checklist
- Run ./scripts/code_format.sh before submitting.