Concurrent HTTP download orchestration with async I/O
A Python library for managing multiple asynchronous HTTP downloads. Built on asyncio and aiohttp, it handles concurrency, tracks state, emits events, and lets you monitor progress.
pip install rheopy

import asyncio
from pathlib import Path

from rheo import DownloadManager
from rheo.domain import FileConfig


async def main():
    files = [
        FileConfig(url="https://example.com/file1.zip", priority=1),
        FileConfig(url="https://example.com/file2.pdf", priority=2),
    ]

    async with DownloadManager(download_dir=Path("./downloads"), max_concurrent=3) as manager:
        await manager.add(files)
        await manager.wait_until_complete()

    print("All downloads complete!")


asyncio.run(main())

- Concurrent downloads with worker pool
- Priority queue
- Selective cancellation (cancel individual downloads by ID)
- Hash validation (MD5, SHA256, SHA512)
- Retry logic with exponential backoff
- Real-time speed & ETA tracking
- File exists handling (skip, overwrite, or error)
- Event-driven architecture with typed DownloadEventType and Subscription handles (manager.on() returns a handle with unsubscribe())
- HTTP client abstraction (BaseHttpClient, default AiohttpClient)
- CLI tool (rheo download)
- Full type hints
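
To make the worker-pool and priority-queue items above more concrete, here is a minimal standard-library sketch of the general pattern. It is not rheo's implementation: `run_pool`, its parameters, and the placeholder "download" step are hypothetical, and the sketch assumes lower numbers run first, which may not match how rheo interprets `priority`.

```python
import asyncio


async def run_pool(jobs, max_concurrent=3):
    """Process (priority, name) jobs with a fixed number of workers.

    Assumption for this sketch: lower priority numbers are started first.
    Returns the job names in the order they were started.
    """
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    for seq, (priority, name) in enumerate(jobs):
        # seq breaks ties so equal priorities keep their insertion order
        queue.put_nowait((priority, seq, name))

    started = []

    async def worker():
        while True:
            try:
                _, _, name = queue.get_nowait()
            except asyncio.QueueEmpty:
                return  # nothing left to do, so this worker exits
            started.append(name)
            await asyncio.sleep(0)  # stand-in for the real download
            queue.task_done()

    # At most max_concurrent jobs are in flight at any moment
    await asyncio.gather(*(worker() for _ in range(max_concurrent)))
    return started


if __name__ == "__main__":
    print(asyncio.run(run_pool([(2, "file2.pdf"), (1, "file1.zip")])))
```

Bounding concurrency with a fixed set of workers, rather than spawning one task per file, keeps the number of open connections predictable no matter how many files are queued.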
# Basic download
rheo download https://example.com/file.zip
# With hash verification
rheo download https://example.com/file.zip --hash sha256:abc123...
# Custom output directory
rheo download https://example.com/file.zip -o /path/to/dir

See CLI Reference for complete command documentation.
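
The `--hash` option above takes a value in `algorithm:hexdigest` form. As a rough illustration of what such a check involves, the sketch below verifies a file with Python's `hashlib`. The `verify_file` helper is hypothetical and is not part of rheo; how rheo parses and validates the value internally is not documented here.

```python
import hashlib
from pathlib import Path


def verify_file(path: Path, spec: str, chunk_size: int = 65536) -> bool:
    """Return True if the file's digest matches an 'algorithm:hexdigest' spec."""
    algorithm, _, expected = spec.partition(":")
    if algorithm not in {"md5", "sha256", "sha512"}:
        raise ValueError(f"unsupported algorithm: {algorithm!r}")

    digest = hashlib.new(algorithm)
    with path.open("rb") as fh:
        # Read in chunks so large downloads are never loaded into memory at once
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)

    return digest.hexdigest() == expected.lower()
```

A call would look like `verify_file(Path("downloads/file.zip"), "sha256:<hex digest>")`, with the digest taken from a source you trust.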
- Full Documentation - Complete guide with detailed examples
- CLI Reference - Command-line interface
- Architecture - System design and patterns
- Contributing - Development setup and guidelines
- Roadmap - What's next
Subscribe to lifecycle events via manager.on(), which returns a Subscription handle:

import asyncio
from pathlib import Path

from rheo import DownloadManager
from rheo.domain import FileConfig
from rheo.events import DownloadEventType, DownloadCompletedEvent


def on_completed(event: DownloadCompletedEvent) -> None:
    print(f"done: {event.download_id}")


async def main():
    async with DownloadManager(download_dir=Path("./downloads")) as manager:
        sub = manager.on(DownloadEventType.COMPLETED, on_completed)
        await manager.add([FileConfig(url="https://example.com/file.zip")])
        await manager.wait_until_complete()
        # sub.unsubscribe() when no longer needed


asyncio.run(main())

Use "*" to receive all events. Type-hint your handler for autocomplete on event fields.
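
For readers unfamiliar with the subscription-handle pattern described above, the following toy emitter shows the idea in isolation. It is an illustrative sketch only: the `Emitter` and `Subscription` classes here are hypothetical stand-ins written for this example, not rheo's classes, and event types are plain strings rather than `DownloadEventType` members.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any], None]


class Subscription:
    """Handle returned by Emitter.on(); unsubscribe() detaches the handler."""

    def __init__(self, handlers: list[Handler], handler: Handler) -> None:
        self._handlers = handlers
        self._handler = handler

    def unsubscribe(self) -> None:
        # Safe to call more than once
        if self._handler in self._handlers:
            self._handlers.remove(self._handler)


class Emitter:
    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, event_type: str, handler: Handler) -> Subscription:
        self._handlers[event_type].append(handler)
        return Subscription(self._handlers[event_type], handler)

    def emit(self, event_type: str, event: Any) -> None:
        # Build a fresh list so a handler may unsubscribe during dispatch;
        # handlers registered under "*" receive every event
        for handler in [*self._handlers[event_type], *self._handlers["*"]]:
            handler(event)
```

Returning a handle from `on()` means callers never need to keep a reference to the original callback just to remove it later, which is convenient when the handler is a lambda or a bound method.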
Check examples/ for working code:
- 01_basic_download.py - Simple single file download
- 02_multiple_with_priority.py - Multiple files with priorities
- 03_hash_validation.py - File integrity verification
- 04_progress_display.py - Real-time progress bar with speed/ETA
- 05_event_logging.py - Lifecycle event debugging
- 06_batch_summary.py - Batch download with summary report
Alpha: core functionality works, but the API may change before 1.0.
- Python: 3.11+
- License: MIT
Open an issue on GitHub or check the full documentation.