Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.MD
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0).

## [2.6.41] - 2026-04-26

### Security

- **Bump `python-dotenv` from 1.0.0 to 1.2.2** to resolve CVE-2026-28684 (arbitrary file overwrite via symlink follow). MEDIUM severity. The pinned 1.0.0 had no fix available; 1.2.2 is the first release with the patch.

### Fixed

- **Single-file rescan UI flips to "done" then resumes minutes later**. The Flask route created a `ScanState` row keyed on its generated `scan_id`, then the Celery worker's `scan_service.scan_single_file` created a *second* `ScanState` with a different `scan_id` and tracked progress on that one. The UI's progress monitor lost track between the two rows and reported the scan complete before the worker had even started hashing the file. `scan_single_file` now accepts an optional `scan_id` and reuses the existing row when one matches; the Celery `scan_media_task` passes `scan_id` through for `scan_type='single'`.
- **Silent first-attempt failures on Celery worker scans** caused by post-fork PostgreSQL connection sharing. Symptom in logs: `psycopg2.DatabaseError: error with status PGRES_TUPLES_OK and no message from the libpq` on one worker, surfacing as a bare `NotImplementedError` from `sqlalchemy/engine/result.py:_indexes_for_keys` in a sibling worker. Fixed by disposing the SQLAlchemy engine in the `worker_process_init` Celery signal so each forked child builds its own connection pool. The existing log-handler setup in that signal has been merged into a single `_setup_worker_process` handler.
- **Scan progress flickers to "failed" during transient Celery retries**. `scan_media_task`'s catch-all exception handler used to set `phase='failed'` and `is_active=False` on every error, including ones that were about to be retried. The handler now keeps the row active (`phase='initializing'` with a "Retrying after error" progress message) when more retries remain, and only marks the scan failed once the retry budget is exhausted or the error is the known-fatal `PGRES_TUPLES_OK` connection-corruption case.

## [2.6.40] - 2026-04-20

### Security
Expand Down
21 changes: 15 additions & 6 deletions pixelprobe/celery_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,25 @@ def init_celery(app, celery):


@worker_process_init.connect
def _setup_db_log_handler_in_worker(**kwargs):
"""Attach DatabaseLogHandler in each forked Celery worker child process.

Threads don't survive fork(), so the handler set up in the parent process
has a dead _writer_thread in children. This signal fires once per child
and creates a fresh handler with its own background writer thread.
def _setup_worker_process(**kwargs):
"""Initialize each forked Celery worker child process.

1. Dispose the inherited SQLAlchemy engine so each child builds a fresh
connection pool. Without this, child processes share libpq sockets
with the parent, which surfaces as "PGRES_TUPLES_OK and no message
from the libpq" - a NotImplementedError when concurrent SQLAlchemy
queries try to read a row whose cursor was torn out from under them.
2. Attach a fresh DatabaseLogHandler. The handler set up in the parent
process has a dead _writer_thread because threads don't survive
fork(); this signal fires once per child and replaces it.
"""
from app import app
from pixelprobe.models import db
from pixelprobe.utils.log_handler import DatabaseLogHandler

with app.app_context():
db.engine.dispose()

handler = DatabaseLogHandler(app)
handler.setLevel(logging.INFO)
logging.getLogger().addHandler(handler)
Expand Down
37 changes: 31 additions & 6 deletions pixelprobe/services/scan_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import List, Dict, Optional, Tuple

from flask import current_app
from pixelprobe.constants import SCAN_PHASES
from pixelprobe.media_checker import PixelProbe, load_exclusions, load_exclusions_with_patterns
from pixelprobe.models import db, ScanResult, ScanState, ScanReport, ScanChunk
from pixelprobe.utils.helpers import ProgressTracker
Expand Down Expand Up @@ -92,8 +93,16 @@ def update_progress(self, current: int, total: int, file_path: str, status: str)
except Exception as e:
logger.debug(f"Failed to update Redis progress: {e}")

def scan_single_file(self, file_path: str, force_rescan: bool = False) -> Dict:
"""Scan a single file"""
def scan_single_file(self, file_path: str, force_rescan: bool = False,
scan_id: Optional[str] = None) -> Dict:
"""Scan a single file.

When ``scan_id`` is provided (e.g., the API route created a ScanState
before queueing the Celery task), reuse that row so the UI tracks one
continuous scan from queued through completed. Without this, a second
ScanState is created here and the UI's progress monitor briefly sees
no active scan and flips to "done" before the new row appears.
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")

Expand All @@ -104,13 +113,29 @@ def scan_single_file(self, file_path: str, force_rescan: bool = False) -> Dict:
self.update_progress(0, 1, file_path, 'scanning')
self.scan_cancelled = False

# Create ScanState record for UI progress tracking
scan_state = ScanState.create_new_scan()
scan_state.start_scan([file_path], force_rescan)
scan_state.phase = 'initializing'
scan_state = None
if scan_id:
scan_state = ScanState.query.filter_by(scan_id=scan_id).first()

if scan_state is None:
scan_state = ScanState.create_new_scan(scan_id=scan_id)

# Apply single-file initialization fields directly. We avoid
# ScanState.start_scan() here because it commits eagerly and sets
# phase='discovering', which we'd immediately overwrite.
now = datetime.now(timezone.utc)
scan_state.is_active = True
scan_state.phase = SCAN_PHASES['INITIALIZING']
scan_state.progress_message = 'Initializing single file scan'
scan_state.estimated_total = 1
scan_state.phase_total = 1
scan_state.files_processed = 0
scan_state.directories = json.dumps([file_path])
scan_state.force_rescan = force_rescan
scan_state.error_message = None
scan_state.start_time = now
scan_state.last_update = now
scan_state.end_time = None
db.session.commit()

# Capture scan ID for UI progress tracking
Expand Down
33 changes: 29 additions & 4 deletions pixelprobe/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
from contextlib import contextmanager

from pixelprobe.celery_config import celery_app
from pixelprobe.constants import SCAN_PHASES
from pixelprobe.services.scan_service import ScanService
from pixelprobe.progress_utils import get_redis_client, get_scan_progress_redis, update_scan_progress_redis
from pixelprobe.models import db, ScanState, ScanResult, ScanReport
from pixelprobe.utils.celery_utils import is_db_connection_corruption
from pixelprobe.utils.log_context import current_scan_id, current_celery_task_id


Expand Down Expand Up @@ -273,9 +275,14 @@ def progress_callback(progress_data):
elif scan_type == 'single':
# Single file scan
if paths and len(paths) == 1:
# Pass scan_id so scan_service reuses the ScanState row this task
# is already tracking, instead of creating a second one. Otherwise
# the UI's progress monitor sees a brief gap between rows and
# flips to "done" before the real scan starts.
result = scan_service.scan_single_file(
file_path=paths[0],
force_rescan=force_rescan
force_rescan=force_rescan,
scan_id=scan_id
)

# CRITICAL: Commit Flask-SQLAlchemy session to ensure ScanService changes are visible
Expand Down Expand Up @@ -309,6 +316,15 @@ def progress_callback(progress_data):
is_db_error = isinstance(exc, (sqlalchemy.exc.DatabaseError, psycopg2.DatabaseError))
is_connection_error = isinstance(exc, (sqlalchemy.exc.OperationalError, psycopg2.OperationalError))

# Decide whether this exception will be retried so we can preserve the
# ScanState row across attempts. If we marked it failed/inactive on
# every transient error, the UI would flip to "done" during the retry
# window and only re-discover the scan minutes later.
is_corruption_error = is_db_error and is_db_connection_corruption(exc)
will_retry = (
self.request.retries < self.max_retries and not is_corruption_error
)

# Update scan state with error
try:
# Roll back any pending transaction before querying
Expand All @@ -318,8 +334,17 @@ def progress_callback(progress_data):
if scan_state:
error_msg = f"Celery task failed: {str(exc)}"
scan_state.error_message = error_msg[:950] # Truncate to fit VARCHAR(1000)
scan_state.is_active = False
scan_state.phase = 'failed'
if will_retry:
# Keep the row active so the UI keeps showing progress
# during the retry backoff instead of jumping to "done".
scan_state.phase = SCAN_PHASES['INITIALIZING']
scan_state.progress_message = (
f'Retrying after error '
f'(attempt {self.request.retries + 1}/{self.max_retries})'
)
else:
scan_state.is_active = False
scan_state.phase = 'failed'
db.session.commit()
except Exception as db_exc:
logger.error(f"Failed to update scan state with error: {str(db_exc)}")
Expand All @@ -340,7 +365,7 @@ def progress_callback(progress_data):
elif is_db_error:
logger.error(f"Database error detected: {type(exc).__name__}")
# Don't retry immediately for database corruption errors
if "PGRES_TUPLES_OK" in str(exc) or "no message from the libpq" in str(exc):
if is_db_connection_corruption(exc):
logger.error(f"Database connection corruption detected - task {self.request.id} failed permanently")
raise exc
else:
Expand Down
11 changes: 11 additions & 0 deletions pixelprobe/utils/celery_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,14 @@ def check_celery_available():
celery_enabled = False

return celery_enabled


def is_db_connection_corruption(exc) -> bool:
    """Return True if *exc* looks like post-fork PostgreSQL connection corruption.

    When a forked Celery worker child inherits and reuses the parent's libpq
    socket, queries fail with messages containing "PGRES_TUPLES_OK" or
    "no message from the libpq". That connection is permanently dead, so
    callers should fail fast instead of retrying on the same connection.
    """
    text = str(exc)
    markers = ("PGRES_TUPLES_OK", "no message from the libpq")
    return any(marker in text for marker in markers)
2 changes: 1 addition & 1 deletion pixelprobe/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Default version - this is the single source of truth


_DEFAULT_VERSION = '2.6.40'
_DEFAULT_VERSION = '2.6.41'


# Allow override via environment variable for CI/CD, but default to the hardcoded version
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pillow-heif==0.22.0 # HEIC/HEIF support for Pillow (requires libheif system lib
python-magic==0.4.27
ffmpeg-python==0.2.0
SQLAlchemy==2.0.41
python-dotenv==1.0.0
python-dotenv==1.2.2
Werkzeug==3.1.6
gunicorn==23.0.0
pytz==2023.3
Expand Down
34 changes: 34 additions & 0 deletions tests/unit/test_scan_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,40 @@ def test_scan_single_file_not_found(self, scan_service):
"""Test scanning non-existent file"""
with pytest.raises(FileNotFoundError):
scan_service.scan_single_file('/nonexistent/file.mp4')

@patch('os.path.exists')
@patch('pixelprobe.services.scan_service.PixelProbe')
def test_scan_single_file_reuses_existing_scan_state(self, mock_probe_class, mock_exists,
                                                     scan_service, app, db):
    """Single-file scan reuses an existing ScanState row when scan_id is passed.

    Regression test for the v2.6.41 UI flicker bug: the API route created a
    ScanState before queueing the Celery task, then scan_single_file created
    a *second* row with a different scan_id and the UI lost track in between.
    """
    with app.app_context():
        # File existence and the media probe are mocked out: this test only
        # exercises ScanState bookkeeping, not any real file scanning.
        mock_exists.return_value = True
        mock_probe_class.return_value.scan_file.return_value = Mock()

        # Pre-create the row the API route would have made before queueing
        # the Celery task, then force it into a failed/inactive state so the
        # test can observe the service resetting those fields on reuse.
        existing = ScanState.create_new_scan(scan_id='route-scan-id')
        existing.start_scan(['/test/file.mp4'], force_rescan=True)
        existing.is_active = False  # Simulate post-failure state pre-retry
        existing.phase = 'failed'
        db.session.commit()
        existing_id = existing.id

        scan_service.scan_single_file('/test/file.mp4', force_rescan=True,
                                      scan_id='route-scan-id')

        # Exactly one row with this scan_id, and it is the *same* row (same
        # primary key) the route created — no duplicate ScanState was made.
        rows = ScanState.query.filter_by(scan_id='route-scan-id').all()
        assert len(rows) == 1
        assert rows[0].id == existing_id
        # Reuse must also reset state: active again, back to the initializing
        # phase, with the stale error message cleared.
        assert rows[0].is_active is True
        assert rows[0].phase == 'initializing'
        assert rows[0].error_message is None

        # scan_single_file may hand work to a background thread; join briefly
        # so a still-running scan thread doesn't leak into later tests.
        if scan_service.current_scan_thread:
            scan_service.current_scan_thread.join(timeout=1)

@patch('os.path.exists')
@patch('pixelprobe.services.scan_service.PixelProbe')
Expand Down
Loading