
Conversation


@Ronbogo Ronbogo commented Sep 30, 2025

Overview

  1. Add single-control mode, enabled by setting `single_rank_load=True`. This feature avoids potential timeouts caused by severe imbalance in rollout results across trainer DP ranks when `max_concurrent_rollouts` and `max_head_offpolicyness` are large.

  2. Introduce DP load-balancing for single-control mode, inspired by verl's seqlen balancer. The algorithm actively rebalances workload among DP ranks, ensuring more even utilization and improving performance.
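The rebalancing idea can be illustrated with a simple greedy longest-processing-time heuristic. This is a hedged sketch only: the PR's actual balancer is Karmarkar-Karp-style, and `greedy_balance` and its signature are hypothetical names for illustration.

```python
# Illustrative sketch (not the PR's actual code): greedy longest-processing-time
# partitioning of rollout sequences across DP ranks by total token count.
from typing import List


def greedy_balance(seqlens: List[int], n_ranks: int) -> List[List[int]]:
    """Assign sequence indices to ranks, longest first, always to the lightest rank."""
    partitions: List[List[int]] = [[] for _ in range(n_ranks)]
    loads = [0] * n_ranks
    # Visit sequences in order of descending length.
    for idx in sorted(range(len(seqlens)), key=lambda i: -seqlens[i]):
        rank = loads.index(min(loads))  # current lightest rank
        partitions[rank].append(idx)
        loads[rank] += seqlens[idx]
    return partitions
```

With an even token distribution across ranks, the slowest rank finishes sooner, which is exactly the imbalance-induced stall this PR targets.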

Co-authored-by: shun001 <[email protected]>
Co-authored-by: flemingpau <[email protected]>
Co-authored-by: Moocharr <[email protected]>
Co-authored-by: HUZZZW <[email protected]>
Co-authored-by: ChengQianqian <[email protected]>
Co-authored-by: zx506 <[email protected]>
Co-authored-by: Shengzhou Lyu <[email protected]>
Co-authored-by: flemingpau <[email protected]>
Co-authored-by: casparcwang <[email protected]>
Co-authored-by: dazhuangzhuang1024 <[email protected]>
Co-authored-by: HShan886 <[email protected]>

Summary of Changes

Hello @Ronbogo, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces significant enhancements to distributed training by adding a single-control mode and a sophisticated load-balancing mechanism. The single-control mode addresses potential timeouts in scenarios with high max_concurrent_rollouts and max_head_offpolicyness by streamlining rollout data management. Complementing this, the new load-balancing algorithm dynamically rebalances workloads across Data Parallel ranks based on sequence lengths, ensuring more uniform resource utilization and boosting overall performance. These changes provide greater stability and efficiency for large-scale distributed training setups.

Highlights

  • Single-Control Mode: Introduced a single_rank_load option to enable a single-control mode, preventing timeouts in distributed training by centralizing rollout data handling, especially when max_concurrent_rollouts and max_head_offpolicyness are high.
  • DP Load Balancing: Implemented a Data Parallel (DP) load-balancing algorithm, inspired by Verl's seqlen balancer, to actively rebalance workloads across DP ranks based on total tokens, ensuring more even utilization and improving performance.
  • API and Configuration Updates: Added single_rank_load and balance_batch parameters to DatasetConfig and updated workflow_api, sglang_remote, vllm_remote, and recover modules to integrate these new functionalities.
  • New Data Utility Functions: Introduced distributed_batch and balance_batch functions in areal/utils/data.py, along with supporting utilities like karmarkar_karp for efficient data distribution and re-padding.
  • Example Integration and Documentation: Updated the boba_grpo.py example to demonstrate the use of single-control mode and load balancing, and added comprehensive documentation for the new configuration parameters in docs/cli_reference.md.
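The new configuration surface described above can be sketched as a dataclass. The field names `single_rank_load` and `balance_batch` come from this PR; the surrounding dataclass shape and defaults here are illustrative assumptions, not the actual `DatasetConfig` definition.

```python
# Hypothetical sketch of the new DatasetConfig fields; only the two new field
# names are taken from the PR, everything else is illustrative.
from dataclasses import dataclass, field


@dataclass
class DatasetConfig:
    batch_size: int = 1
    single_rank_load: bool = field(
        default=False,
        metadata={"help": "Use a single rank to send/receive rollout data."},
    )
    balance_batch: bool = field(
        default=False,
        metadata={
            "help": "Balance rollouts across DP ranks by total tokens; "
            "only effective when single_rank_load is True."
        },
    )
```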


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a single-controller mode and a batch balancing feature to improve performance and avoid timeouts in cases of imbalanced rollouts. The core logic for batch balancing is implemented in areal/utils/data.py, and examples/math/boba_grpo.py is updated to demonstrate its usage. My review has identified a critical bug in the example's rollout logic, an incorrect type hint in the workflow API, and several medium-severity issues related to documentation clarity, code style, and redundancy. Addressing these points will improve the correctness and maintainability of the new features.

Comment on lines 234 to 243

```python
try:
    data = next(data_generator)
except StopIteration:
    data_generator = iter(train_dataloader)
    data = next(data_generator)
    batch = rollout.rollout_batch(
        data=data,
        workflow=workflow,
        should_accept=lambda sample: True,
    )
```

critical

There is a bug in the non-asynchronous rollout logic. The rollout.rollout_batch(...) call is inside the except StopIteration block. This means if next(data_generator) succeeds, batch will not be assigned and will remain None, causing issues later. The rollout_batch call should be moved outside the try-except block to ensure it's always executed.

Suggested change

```python
# Before
try:
    data = next(data_generator)
except StopIteration:
    data_generator = iter(train_dataloader)
    data = next(data_generator)
    batch = rollout.rollout_batch(
        data=data,
        workflow=workflow,
        should_accept=lambda sample: True,
    )
```

```python
# After
try:
    data = next(data_generator)
except StopIteration:
    data_generator = iter(train_dataloader)
    data = next(data_generator)
batch = rollout.rollout_batch(
    data=data,
    workflow=workflow,
    should_accept=lambda sample: True,
)
```

```python
# Before
def wait(self, count: int, timeout: float | None = None) -> Dict[str, Any]:

# After
def wait(
    self, count: int, timeout: float | None = None, single_rank_load: bool = False
) -> Dict[str, Any]:
```

high

The return type hint for the wait method is now incorrect. With the introduction of single_rank_load, the method can return either a Dict[str, Any] or a List[Dict[str, Any]]. The type hint should be updated to reflect this.

Suggested change

```python
) -> Dict[str, Any]:
```

```python
) -> Dict[str, Any] | List[Dict[str, Any]]:
```

Comment on lines 957 to 959
metadata={
"help": "balance all rollouts across dp ranks by total tokens.now, it works only when single_rank_load was set true."
},

medium

The help text for balance_batch is a bit unclear and contains a typo. It can be improved for better readability and correctness.

Suggested change

```python
metadata={
    "help": "balance all rollouts across dp ranks by total tokens.now, it works only when single_rank_load was set true."
},
```

```python
metadata={
    "help": "Balance all rollouts across DP ranks by total tokens. Note: this only works when `single_rank_load` is set to True."
},
```

Comment on lines +129 to +133
print(
f"-----cv_original {cv_original} total_tokens_per_rank_original {total_tokens_per_rank_original}"
)
print(f"-----cv_balance {cv_balance} total_tokens_per_rank {total_tokens_per_rank}")
print("*****************************************************")

medium

The print statements in test_balance_batch appear to be for debugging. They should be removed from the final test code to keep the test output clean. If this information is valuable for debugging in CI, consider using the logging module instead.

```python
    balance_batch_enabled,
):
    """
    Broadcast data when using signle controller (single_rank_load==True)
```

medium

There's a typo in the docstring for distributed_batch. "signle" should be "single".

Suggested change

```python
Broadcast data when using signle controller (single_rank_load==True)
```

```python
Broadcast data when using single controller (single_rank_load==True)
```



```python
def karmarkar_karp(seqlen_list: List[int], k_partitions: int, equal_size: bool):
    import heapq
```

medium

The heapq module is imported inside the karmarkar_karp function. As a standard library module, it should be imported at the top of the file for better code organization and to avoid repeated imports.
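For context on what this function does, here is a minimal two-way Karmarkar-Karp (largest differencing) sketch with `heapq` imported at the top, as suggested. This is hedged: the PR's `karmarkar_karp` supports `k_partitions` and `equal_size`, while this illustrates only the core differencing idea for two partitions.

```python
# Minimal two-way Karmarkar-Karp sketch; illustrative only, not the PR's
# k-partition implementation.
import heapq
from typing import List


def kk_two_way_diff(seqlens: List[int]) -> int:
    """Return |sum(A) - sum(B)| achieved by the largest-differencing heuristic."""
    # Simulate a max-heap by negating values.
    heap = [-x for x in seqlens]
    heapq.heapify(heap)
    while len(heap) > 1:
        a = -heapq.heappop(heap)  # largest remaining value
        b = -heapq.heappop(heap)  # second largest
        heapq.heappush(heap, -(a - b))  # commit to placing a and b apart
    return -heap[0] if heap else 0
```

On the classic example `[8, 7, 6, 5, 4]` the heuristic yields a difference of 2 (the optimum is 0), which is the usual trade-off of this fast heuristic.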

Comment on lines +399 to +400

```python
def spread(self):
    return self.sets[0].sum - self.sets[-1].sum
```

medium

The State class has both a spread method and a spread property that perform the exact same calculation. The method is redundant and should be removed to avoid code duplication.

| `num_workers` | integer | `0` | Number of worker processes for data loading |
| `drop_last` | boolean | `True` | Drop the last incomplete batch |
| `single_rank_load` | boolean | `False` | Use single rank rollout send/recive or not |
| `balance_batch` | boolean | `False` | balance all rollouts across dp ranks by total tokens.now, it works only when single_rank_load was set true. |

medium

The description for balance_batch has some grammatical issues and can be made clearer for better user understanding.

Suggested change

```markdown
| `balance_batch` | boolean | `False` | balance all rollouts across dp ranks by total tokens.now, it works only when single_rank_load was set true. |
```

```markdown
| `balance_batch` | boolean | `False` | Balance all rollouts across DP ranks by total tokens. Only effective when `single_rank_load` is set to `True`. |
```

Comment on lines +763 to +779
| `model` | string | `""` | - |
| `seed` | integer | `1` | - |
| `skip_tokenizer_init` | boolean | `False` | - |
| `enforce_eager` | boolean | `True` | - |
| `dtype` | string | `"bfloat16"` | - |
| `max_num_seqs` | integer | `256` | - |
| `block_size` | integer | `16` | - |
| `swap_space` | integer | `4` | - |
| `cpu_offload_gb` | float | `0` | - |
| `max_seq_len_to_capture` | integer | `32768` | - |
| `disable_sliding_window` | boolean | `True` | - |
| `max_model_len` | integer \| None | `32768` | - |
| `enable_chunked_prefill` | boolean | `False` | - |
| `enable_prefix_caching` | boolean | `False` | - |
| `gpu_memory_utilization` | float | `0.9` | - |
| `worker_extension_cls` | string | `"areal.thirdparty.vllm.vllm_worker_extension.VLLMWorkerExtension"` | - |
| `enable_sleep_mode` | boolean | `False` | - |

medium

The new vLLM Configuration section is missing descriptions for most of its parameters. Please add descriptions to make this documentation more helpful for users.

```python
    config.rollout.consumer_batch_size *= world_size
    config.rollout.max_concurrent_rollouts *= world_size
else:
    # Create empty datqaloader for other ranks when using single rank load
```

medium

There's a typo in the comment: "datqaloader" should be "dataloader".

Suggested change

```python
# Create empty datqaloader for other ranks when using single rank load
```

```python
# Create empty dataloader for other ranks when using single rank load
```

@garrett4wade
Collaborator

@Ronbogo Hi, thanks for this important feature! FYI we plan to publish a new release, v0.3.4, around 10.10, and we'd like to merge this feature after that. :)

@dazhuangzhuang1024
Contributor

> @Ronbogo Hi, thanks for this important feature! FYI we plan to publish a new release, v0.3.4, around 10.10, and we'd like to merge this feature after that. :)

OK

