diff --git a/CHANGELOG.MD b/CHANGELOG.MD index e8b17c1..143ee41 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0). +## [2.6.41] - 2026-04-26 + +### Security + +- **Bump `python-dotenv` from 1.0.0 to 1.2.2** to resolve CVE-2026-28684 (arbitrary file overwrite via symlink follow). MEDIUM severity. The pinned 1.0.0 had no fix available; 1.2.2 is the first release with the patch. + +### Fixed + +- **Single-file rescan UI flips to "done" then resumes minutes later**. The Flask route created a `ScanState` row keyed on its generated `scan_id`, then the Celery worker's `scan_service.scan_single_file` created a *second* `ScanState` with a different `scan_id` and tracked progress on that one. The UI's progress monitor lost track between the two rows and reported the scan complete before the worker had even started hashing the file. `scan_single_file` now accepts an optional `scan_id` and reuses the existing row when one matches; the Celery `scan_media_task` passes `scan_id` through for `scan_type='single'`. +- **Silent first-attempt failures on Celery worker scans** caused by post-fork PostgreSQL connection sharing. Symptom in logs: `psycopg2.DatabaseError: error with status PGRES_TUPLES_OK and no message from the libpq` on one worker, surfacing as a bare `NotImplementedError` from `sqlalchemy/engine/result.py:_indexes_for_keys` in a sibling worker. Fixed by disposing the SQLAlchemy engine in the `worker_process_init` Celery signal so each forked child builds its own connection pool. The existing log-handler setup in that signal has been merged into a single `_setup_worker_process` handler. +- **Scan progress flickers to "failed" during transient Celery retries**. 
@worker_process_init.connect
def _setup_worker_process(**kwargs):
    """Initialize each forked Celery worker child process.

    Connected to Celery's ``worker_process_init`` signal, so it runs once in
    every forked child. It performs two steps:

    1. Discard the SQLAlchemy connection pool inherited from the parent so
       the child builds its own connections. Without this, child processes
       share libpq sockets with the parent, which surfaces as
       "PGRES_TUPLES_OK and no message from the libpq" - a
       NotImplementedError when concurrent SQLAlchemy queries try to read a
       row whose cursor was torn out from under them.
    2. Attach a fresh DatabaseLogHandler (INFO level) to the root logger.
       The handler set up in the parent process has a dead _writer_thread
       because threads don't survive fork().

    Args:
        **kwargs: Keyword arguments supplied by the signal; unused.
    """
    # Imported lazily, inside the handler rather than at module import time.
    from app import app
    from pixelprobe.models import db
    from pixelprobe.utils.log_handler import DatabaseLogHandler

    with app.app_context():
        # close=False drops the inherited pool WITHOUT closing its
        # connections. Those sockets still belong to the parent process; a
        # plain dispose() would close them from the child and terminate the
        # parent's server sessions along with them.
        db.engine.dispose(close=False)

    handler = DatabaseLogHandler(app)
    handler.setLevel(logging.INFO)
    # NOTE(review): this only adds a handler; any DatabaseLogHandler inherited
    # from the parent is not removed here - confirm whether it should be.
    logging.getLogger().addHandler(handler)
+ """ if not os.path.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") @@ -104,13 +113,29 @@ def scan_single_file(self, file_path: str, force_rescan: bool = False) -> Dict: self.update_progress(0, 1, file_path, 'scanning') self.scan_cancelled = False - # Create ScanState record for UI progress tracking - scan_state = ScanState.create_new_scan() - scan_state.start_scan([file_path], force_rescan) - scan_state.phase = 'initializing' + scan_state = None + if scan_id: + scan_state = ScanState.query.filter_by(scan_id=scan_id).first() + + if scan_state is None: + scan_state = ScanState.create_new_scan(scan_id=scan_id) + + # Apply single-file initialization fields directly. We avoid + # ScanState.start_scan() here because it commits eagerly and sets + # phase='discovering', which we'd immediately overwrite. + now = datetime.now(timezone.utc) + scan_state.is_active = True + scan_state.phase = SCAN_PHASES['INITIALIZING'] scan_state.progress_message = 'Initializing single file scan' scan_state.estimated_total = 1 scan_state.phase_total = 1 + scan_state.files_processed = 0 + scan_state.directories = json.dumps([file_path]) + scan_state.force_rescan = force_rescan + scan_state.error_message = None + scan_state.start_time = now + scan_state.last_update = now + scan_state.end_time = None db.session.commit() # Capture scan ID for UI progress tracking diff --git a/pixelprobe/tasks.py b/pixelprobe/tasks.py index d5d93cd..018b4c3 100644 --- a/pixelprobe/tasks.py +++ b/pixelprobe/tasks.py @@ -15,9 +15,11 @@ from contextlib import contextmanager from pixelprobe.celery_config import celery_app +from pixelprobe.constants import SCAN_PHASES from pixelprobe.services.scan_service import ScanService from pixelprobe.progress_utils import get_redis_client, get_scan_progress_redis, update_scan_progress_redis from pixelprobe.models import db, ScanState, ScanResult, ScanReport +from pixelprobe.utils.celery_utils import is_db_connection_corruption from 
pixelprobe.utils.log_context import current_scan_id, current_celery_task_id @@ -273,9 +275,14 @@ def progress_callback(progress_data): elif scan_type == 'single': # Single file scan if paths and len(paths) == 1: + # Pass scan_id so scan_service reuses the ScanState row this task + # is already tracking, instead of creating a second one. Otherwise + # the UI's progress monitor sees a brief gap between rows and + # flips to "done" before the real scan starts. result = scan_service.scan_single_file( file_path=paths[0], - force_rescan=force_rescan + force_rescan=force_rescan, + scan_id=scan_id ) # CRITICAL: Commit Flask-SQLAlchemy session to ensure ScanService changes are visible @@ -309,6 +316,15 @@ def progress_callback(progress_data): is_db_error = isinstance(exc, (sqlalchemy.exc.DatabaseError, psycopg2.DatabaseError)) is_connection_error = isinstance(exc, (sqlalchemy.exc.OperationalError, psycopg2.OperationalError)) + # Decide whether this exception will be retried so we can preserve the + # ScanState row across attempts. If we marked it failed/inactive on + # every transient error, the UI would flip to "done" during the retry + # window and only re-discover the scan minutes later. + is_corruption_error = is_db_error and is_db_connection_corruption(exc) + will_retry = ( + self.request.retries < self.max_retries and not is_corruption_error + ) + # Update scan state with error try: # Roll back any pending transaction before querying @@ -318,8 +334,17 @@ def progress_callback(progress_data): if scan_state: error_msg = f"Celery task failed: {str(exc)}" scan_state.error_message = error_msg[:950] # Truncate to fit VARCHAR(1000) - scan_state.is_active = False - scan_state.phase = 'failed' + if will_retry: + # Keep the row active so the UI keeps showing progress + # during the retry backoff instead of jumping to "done". 
def is_db_connection_corruption(exc) -> bool:
    """Report whether ``exc`` looks like post-fork PostgreSQL connection corruption.

    The failure surfaces as "PGRES_TUPLES_OK and no message from the libpq"
    when a forked worker inherits and uses a parent's libpq socket. The
    connection is dead, so retrying the same task on the same connection
    will not help.

    Args:
        exc: The exception (or any object) whose ``str()`` form is inspected.

    Returns:
        True if the rendered message contains either known signature
        fragment (case-sensitive), otherwise False.
    """
    rendered = str(exc)
    signatures = ("PGRES_TUPLES_OK", "no message from the libpq")
    # Either fragment on its own is enough to classify the error.
    return any(signature in rendered for signature in signatures)
+ """ + with app.app_context(): + mock_exists.return_value = True + mock_probe_class.return_value.scan_file.return_value = Mock() + + existing = ScanState.create_new_scan(scan_id='route-scan-id') + existing.start_scan(['/test/file.mp4'], force_rescan=True) + existing.is_active = False # Simulate post-failure state pre-retry + existing.phase = 'failed' + db.session.commit() + existing_id = existing.id + + scan_service.scan_single_file('/test/file.mp4', force_rescan=True, + scan_id='route-scan-id') + + rows = ScanState.query.filter_by(scan_id='route-scan-id').all() + assert len(rows) == 1 + assert rows[0].id == existing_id + assert rows[0].is_active is True + assert rows[0].phase == 'initializing' + assert rows[0].error_message is None + + if scan_service.current_scan_thread: + scan_service.current_scan_thread.join(timeout=1) @patch('os.path.exists') @patch('pixelprobe.services.scan_service.PixelProbe')