From 6f517e510e1d977e5acb456ada169c4542a53acf Mon Sep 17 00:00:00 2001
From: Mac Prible
Date: Mon, 9 Mar 2026 14:09:50 -0500
Subject: [PATCH] Parallelize FrameSource initialization with ThreadPoolExecutor
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Each FrameSource runs a keyframe scan on init, which is I/O-bound.
With multiple cameras this was sequential, taking N × scan_time. Now
all cameras initialize in parallel threads, reducing wall time to
roughly the cost of the slowest single camera.

Guard the empty-cameras case: ThreadPoolExecutor requires
max_workers > 0, so an empty dict would otherwise raise ValueError
where the old sequential loop returned {}.
---
 src/caliscope/core/process_synchronized_recording.py | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/src/caliscope/core/process_synchronized_recording.py b/src/caliscope/core/process_synchronized_recording.py
index 360144b1..42a776a5 100644
--- a/src/caliscope/core/process_synchronized_recording.py
+++ b/src/caliscope/core/process_synchronized_recording.py
@@ -5,6 +5,7 @@
 """
 
 import logging
+from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Callable
@@ -164,18 +165,31 @@ def get_initial_thumbnails(
 
 
 def _create_frame_sources(recording_dir: Path, cameras: dict[int, CameraData]) -> dict[int, FrameSource]:
-    """Create FrameSource for each camera cam_id."""
-    sources: dict[int, FrameSource] = {}
+    """Create FrameSource for each camera cam_id.
 
-    for cam_id in cameras:
+    Each FrameSource runs a keyframe scan on init (I/O-bound), so cameras
+    are initialized in parallel threads.
+    """
+
+    def _init_one(cam_id: int) -> tuple[int, FrameSource | None]:
         try:
-            sources[cam_id] = FrameSource(recording_dir, cam_id)
+            return cam_id, FrameSource(recording_dir, cam_id)
         except FileNotFoundError:
             logger.warning(f"Video file not found for cam_id {cam_id}, skipping")
+            return cam_id, None
         except ValueError as e:
             logger.warning(f"Error opening video for cam_id {cam_id}: {e}")
+            return cam_id, None
+
+    cam_ids = list(cameras.keys())
+
+    # max(1, ...) keeps ThreadPoolExecutor from raising ValueError when
+    # cameras is empty (max_workers must be > 0); the empty map then
+    # yields {} just as the old sequential loop did.
+    with ThreadPoolExecutor(max_workers=max(1, len(cam_ids))) as pool:
+        results = pool.map(_init_one, cam_ids)
 
-    return sources
+    return {cam_id: source for cam_id, source in results if source is not None}
 
 
 def _accumulate_points(