
fix: Gracefully shutdown multiprocessing.Manager and fix proxy resource leaks #83

Open

Leahlijuan wants to merge 6 commits into main from fix/cleanup

Conversation

@Leahlijuan
Collaborator

This change resolves a "Finalize object, dead" error and a FileNotFoundError that occurred during the teardown phase of training, particularly after many steps.

  • Added a teardown method to save_strategy, checkpoint_io, and memory_storage_writer.
  • Reused the _write_events_per_checkpoint_id and _write_results_per_checkpoint_id proxy dictionaries across MemoryStorageWriter re-initializations.
  • Added an explicit pop() for Event objects in MemoryStorageWriter.finish_checkpoint so they can be properly garbage collected.
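The shutdown pattern described above can be sketched as follows. This is a minimal illustration, not the actual ml_flashpoint implementation: the class name `ManagerOwnerSketch` is hypothetical, and only the `_main_process_torchmp_manager_future` attribute name is taken from the PR.

```python
import multiprocessing
from concurrent.futures import Future


class ManagerOwnerSketch:
    """Illustrative stand-in for a writer that owns a multiprocessing.Manager."""

    def __init__(self, mp_manager_future: Future):
        # The Manager may be created asynchronously, so we hold a Future to it.
        self._main_process_torchmp_manager_future = mp_manager_future
        self._torn_down = False

    def teardown(self) -> None:
        # Idempotent: a second call is a no-op.
        if self._torn_down:
            return
        self._torn_down = True
        try:
            manager = self._main_process_torchmp_manager_future.result(timeout=1.0)
        except Exception:
            # Future timed out or failed; there is no Manager to shut down.
            return
        manager.shutdown()  # stops the Manager's server process


if __name__ == "__main__":
    fut: Future = Future()
    fut.set_result(multiprocessing.Manager())
    writer = ManagerOwnerSketch(fut)
    writer.teardown()
    writer.teardown()  # safe: repeat calls do nothing
    print("teardown ok")
```

Shutting the Manager down explicitly, instead of relying on interpreter finalization, is what avoids the "Finalize object, dead" error at process exit.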

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses critical resource management issues that led to 'Finalize object, dead' errors and FileNotFoundError during the teardown phase of training. It introduces a structured and propagated teardown mechanism across key checkpointing components and optimizes MemoryStorageWriter to prevent multiprocessing.Manager proxy resource leaks. These changes aim to significantly improve the robustness and stability of the training process, especially after many steps, by ensuring all resources are properly released.

Highlights

  • Graceful Resource Teardown: Introduced teardown methods across key checkpointing components, including save_strategy, checkpoint_io, and memory_storage_writer, to ensure the graceful shutdown of multiprocessing.Manager instances and associated resources, resolving 'Finalize object, dead' errors.
  • Proxy Object Reuse and Leak Prevention: Implemented the reuse of multiprocessing.Manager proxy dictionaries (_write_events_per_checkpoint_id and _write_results_per_checkpoint_id) in MemoryStorageWriter during re-initialization, preventing the creation of new proxy objects and mitigating resource leaks and FileNotFoundError.
  • Explicit Event Cleanup: Added explicit pop() calls for Event objects in MemoryStorageWriter.finish_checkpoint to ensure they are properly garbage collected, further preventing resource accumulation.
  • Enhanced Test Coverage: Added comprehensive unit tests for the new teardown functionality across various components and enhanced existing tests to verify the correct reuse of proxy objects.
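The proxy-reuse highlight can be illustrated with a small sketch. The class name `WriterSketch` and its constructor signature are hypothetical; only the two proxy-dictionary attribute names come from the PR.

```python
import multiprocessing


class WriterSketch:
    """Hypothetical writer holding two Manager-backed proxy dictionaries."""

    def __init__(self, manager, write_events=None, write_results=None):
        # Reuse proxies from a previous instance when provided. Otherwise each
        # re-initialization creates fresh proxy objects in the Manager process,
        # and the old ones leak until the Manager shuts down.
        self._write_events_per_checkpoint_id = (
            write_events if write_events is not None else manager.dict()
        )
        self._write_results_per_checkpoint_id = (
            write_results if write_results is not None else manager.dict()
        )


if __name__ == "__main__":
    mgr = multiprocessing.Manager()
    old = WriterSketch(mgr)
    # Re-initialize, handing over the old instance's proxies.
    new = WriterSketch(
        mgr,
        write_events=old._write_events_per_checkpoint_id,
        write_results=old._write_results_per_checkpoint_id,
    )
    assert new._write_events_per_checkpoint_id is old._write_events_per_checkpoint_id
    mgr.shutdown()
```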


Changelog
  • src/ml_flashpoint/adapter/megatron/save_strategies.py
    • Reused proxy objects from the old StorageWriter when re-initializing MemoryStorageWriter to prevent resource leaks.
    • Added a teardown method to ensure the underlying StorageWriter is properly shut down.
  • src/ml_flashpoint/adapter/nemo/checkpoint_io.py
    • Implemented a teardown method in MLFlashpointCheckpointIO to propagate cleanup calls to its save_strategy and fallback_checkpoint_io.
    • Updated MLFlashpointAsyncFinalizableCheckpointIO's teardown method to also call teardown on its wrapped mlf_checkpoint_io.
  • src/ml_flashpoint/adapter/nemo/wrapper_util.py
    • Ensured CheckpointObjectManager uses a default buffer size if initial_write_buffer_size_bytes is not provided.
  • src/ml_flashpoint/adapter/pytorch/memory_storage_writer.py
    • Added explicit pop() for event proxies in finish_checkpoint to facilitate garbage collection.
    • Introduced a teardown method to gracefully shut down the multiprocessing.Manager instance.
  • tests/adapter/megatron/test_save_strategies.py
    • Added a test for the teardown method of MLFlashpointMegatronAsyncSaveStrategy.
    • Enhanced an existing test to verify the reuse of proxy objects during MemoryStorageWriter re-initialization.
  • tests/adapter/nemo/test_checkpoint_io.py
    • Added tests to confirm that teardown methods in MLFlashpointCheckpointIO and MLFlashpointAsyncFinalizableCheckpointIO correctly propagate cleanup calls.
  • tests/adapter/pytorch/test_memory_storage_writer.py
    • Added a test for the MemoryStorageWriter's teardown method, including idempotency.
    • Updated existing tests to assert that event proxies are properly cleared after checkpoint finalization.
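The explicit pop() cleanup listed in the changelog amounts to removing each Manager-backed Event proxy once its checkpoint finishes, so the Manager can release it. A sketch under assumed names (`FinishSketch`, `start_checkpoint` are illustrative; the proxy-dict attribute and `finish_checkpoint` come from the PR):

```python
import multiprocessing


class FinishSketch:
    """Illustrative holder of per-checkpoint Event proxies."""

    def __init__(self, manager):
        self._manager = manager
        self._write_events_per_checkpoint_id = manager.dict()

    def start_checkpoint(self, checkpoint_id):
        # One Manager-backed Event per in-flight checkpoint.
        self._write_events_per_checkpoint_id[checkpoint_id] = self._manager.Event()

    def finish_checkpoint(self, checkpoint_id):
        # pop() drops the proxy reference so the underlying Event can be
        # garbage collected instead of accumulating across training steps.
        event = self._write_events_per_checkpoint_id.pop(checkpoint_id, None)
        if event is not None:
            event.set()


if __name__ == "__main__":
    mgr = multiprocessing.Manager()
    s = FinishSketch(mgr)
    s.start_checkpoint("ckpt-1")
    s.finish_checkpoint("ckpt-1")
    assert "ckpt-1" not in s._write_events_per_checkpoint_id
    mgr.shutdown()
```

Without the pop(), the dictionary retains one proxy per checkpoint forever, which is exactly the accumulation that surfaces after many steps.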
Activity
  • No specific activity (comments, reviews, progress updates) has been recorded for this pull request yet.

@Leahlijuan Leahlijuan requested review from g-husam and kkkapu March 13, 2026 18:24
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces teardown methods across several classes to gracefully shut down multiprocessing resources, addressing resource leaks by reusing proxy objects from the multiprocessing manager. The changes are logical and well-supported by new and updated tests. My review includes a few suggestions to enhance code clarity and robustness, primarily by making attribute existence checks more explicit and readable.

Comment on lines 146 to 150:

    self._storage_writer = MemoryStorageWriter(
        checkpoint_saver=self._checkpoint_saver,
        mp_manager_future=self._storage_writer._main_process_torchmp_manager_future,
        thread_count=self._storage_writer._thread_count,
    )
Contributor

Severity: medium

For improved readability, consider using the old_storage_writer variable when creating the new MemoryStorageWriter instance. While the current code is functionally correct because self._storage_writer still refers to the old instance at that point, explicitly using old_storage_writer makes the intent clearer and less prone to misinterpretation during future maintenance.

Suggested change:

    # Before
    self._storage_writer = MemoryStorageWriter(
        checkpoint_saver=self._checkpoint_saver,
        mp_manager_future=self._storage_writer._main_process_torchmp_manager_future,
        thread_count=self._storage_writer._thread_count,
    )
    # After
    self._storage_writer = MemoryStorageWriter(
        checkpoint_saver=self._checkpoint_saver,
        mp_manager_future=old_storage_writer._main_process_torchmp_manager_future,
        thread_count=old_storage_writer._thread_count,
    )

    self.fallback_checkpoint_io.remove_checkpoint(path)

    @log_execution_time(logger=_LOGGER, name="MLFlashpointCheckpointIO.teardown", level=logging.INFO)
    def teardown(self) -> None:
Collaborator

where is this invoked btw?

Collaborator Author

inside the teardown of MLFlashpointAsyncFinalizableCheckpointIO

    try:
        manager = self._main_process_torchmp_manager_future.result(timeout=1.0)
        _LOGGER.info("Shutting down torch_mp Manager...")
        manager.shutdown()
Collaborator

What if the call above times out? This will fail, right? What value will manager hold?

Collaborator Author

If it times out, a timeout exception is raised and caught by the handler on line 403, so manager.shutdown() is skipped.
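The timeout behavior discussed here can be sketched as below. The 1.0-second timeout and log message mirror the quoted snippet; the standalone function shape and its return value are assumptions for illustration:

```python
import logging
from concurrent.futures import Future, TimeoutError as FutureTimeoutError

_LOGGER = logging.getLogger(__name__)


def shutdown_manager(manager_future: Future) -> bool:
    """Return True if the Manager was shut down, False if we gave up."""
    try:
        # result() raises concurrent.futures.TimeoutError if the Manager
        # has not been created within the timeout.
        manager = manager_future.result(timeout=1.0)
        _LOGGER.info("Shutting down torch_mp Manager...")
        manager.shutdown()
        return True
    except FutureTimeoutError:
        # The future never completed: `manager` was never assigned, and
        # manager.shutdown() is skipped entirely.
        _LOGGER.warning("Manager future timed out; skipping shutdown.")
        return False


if __name__ == "__main__":
    pending: Future = Future()  # never resolved, so result() times out
    assert shutdown_manager(pending) is False
```

Because the assignment to `manager` is inside the try block, a timeout means the name is never bound at all; there is no partially-initialized Manager to worry about.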

@Leahlijuan Leahlijuan requested a review from g-husam March 13, 2026 20:46
@github-actions

Python Code Coverage Summary

Package                                      Line Rate          Branch Rate
src.ml_flashpoint                            100%               100%
src.ml_flashpoint.adapter                    100%               100%
src.ml_flashpoint.adapter.megatron           97%                92%
src.ml_flashpoint.adapter.nemo               98%                91%
src.ml_flashpoint.adapter.pytorch            98%                92%
src.ml_flashpoint.checkpoint_object_manager  93%                93%
src.ml_flashpoint.core                       95%                92%
src.ml_flashpoint.replication                81%                81%
Summary                                      95% (2357 / 2488)  91% (554 / 610)

Minimum allowed line rate is 90%

@github-actions

C++ Code Coverage Summary

Package                                                     Line Rate         Branch Rate
src.ml_flashpoint.checkpoint_object_manager.buffer_object   93%               54%
src.ml_flashpoint.checkpoint_object_manager.object_manager  70%               37%
src.ml_flashpoint.replication.transfer_service              79%               40%
Summary                                                     81% (916 / 1126)  43% (687 / 1604)

Minimum allowed line rate is 80%
