Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/backend/base/langflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ async def lifespan(_app: FastAPI):
f"writes will use the legacy direct-write path: {exc}"
)

# Start the periodic authz audit-log retention sweep. No-op unless
# AUTHZ_AUDIT_ENABLED and AUTHZ_AUDIT_RETENTION_DAYS > 0. The startup
# sweep in initialize_services() already pruned at boot; this keeps a
# long-running instance bounded between restarts.
try:
from langflow.services.task.audit_cleanup import audit_log_cleanup_worker

await audit_log_cleanup_worker.start()
except Exception as exc: # noqa: BLE001 — never block startup on cleanup scheduling
await logger.awarning(f"Failed to start authz audit-log cleanup worker: {exc}")

current_time = asyncio.get_event_loop().time()
await logger.adebug("Setting up LLM caching")
setup_llm_caching()
Expand Down Expand Up @@ -547,6 +558,15 @@ async def refresh_models_dev_periodically() -> None:
await stop_streamable_http_manager()
except Exception as e: # noqa: BLE001
await logger.aerror(f"Failed to stop MCP server streamable-http session manager: {e}")
# Stop the authz audit-log retention worker (best-effort;
# no-op when it was never scheduled).
try:
from langflow.services.task.audit_cleanup import audit_log_cleanup_worker

await audit_log_cleanup_worker.stop()
except Exception as e: # noqa: BLE001
await logger.aerror(f"Failed to stop authz audit-log cleanup worker: {e}")

# Cancel background tasks
tasks_to_cancel = []
if sync_flows_from_fs_task:
Expand Down
154 changes: 154 additions & 0 deletions src/backend/base/langflow/services/task/audit_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""Recurring retention sweep for the ``authz_audit_log`` table.

The retention helper :func:`langflow.services.utils.clean_authz_audit_log` is
invoked once at startup inside ``initialize_services``. That single boot-time
sweep leaves a long-running instance to accumulate audit rows without bound
between restarts. This module wires the same helper to a background worker that
prunes on a fixed interval (daily by default), modelled on the sibling
:class:`langflow.services.task.temp_flow_cleanup.CleanupWorker`.

The worker is intentionally best-effort: every sweep opens its own
``session_scope`` and the helper logs-and-swallows database errors, so a
transient outage never kills the loop or blocks the event loop / request path.
"""

from __future__ import annotations

import asyncio
import contextlib
from typing import TYPE_CHECKING

from lfx.log.logger import logger

from langflow.services.deps import get_settings_service, session_scope
from langflow.services.utils import clean_authz_audit_log

if TYPE_CHECKING:
from lfx.services.settings.auth import AuthSettings

# Daily by default; mirrors AuthSettings.AUTHZ_AUDIT_CLEANUP_INTERVAL so the
# worker still has a sane cadence if the setting is somehow unavailable.
DEFAULT_CLEANUP_INTERVAL_SECONDS = 86400


class AuditLogCleanupWorker:
"""Periodically prune ``authz_audit_log`` rows past the retention window.

The worker is a no-op unless ``AUTHZ_AUDIT_ENABLED`` is True and
``AUTHZ_AUDIT_RETENTION_DAYS`` is greater than 0 — both gates are evaluated
in :meth:`start`, so a disabled deployment never schedules a task. The
unconditional startup sweep in ``initialize_services`` still handles
boot-time pruning (including cleaning up leftover rows after auditing is
turned off), so this worker only has to cover the steady state.

Args:
interval: Optional override (seconds) for the sweep cadence. When None
the cadence is read from ``AUTHZ_AUDIT_CLEANUP_INTERVAL`` at start.
Primarily a testing seam so the schedule can be exercised quickly
without mutating global settings.
"""

def __init__(self, *, interval: float | None = None) -> None:
self._interval_override = interval
self._interval: float = float(interval) if interval is not None else DEFAULT_CLEANUP_INTERVAL_SECONDS
self._stop_event = asyncio.Event()
self._task: asyncio.Task | None = None

def _resolve_interval(self, auth_settings: AuthSettings) -> float:
"""Resolve the sweep interval, preferring the constructor override."""
if self._interval_override is not None:
return float(self._interval_override)
try:
return float(getattr(auth_settings, "AUTHZ_AUDIT_CLEANUP_INTERVAL", DEFAULT_CLEANUP_INTERVAL_SECONDS))
except (TypeError, ValueError):
return float(DEFAULT_CLEANUP_INTERVAL_SECONDS)

async def start(self) -> None:
"""Start the periodic cleanup task, honouring the audit/retention gates."""
if self._task is not None:
await logger.awarning("Audit-log cleanup worker is already running")
return

auth_settings = get_settings_service().auth_settings

if not getattr(auth_settings, "AUTHZ_AUDIT_ENABLED", False):
await logger.adebug("Audit-log cleanup worker not started: AUTHZ_AUDIT_ENABLED is False")
return

try:
retention_days = int(getattr(auth_settings, "AUTHZ_AUDIT_RETENTION_DAYS", 90))
except (TypeError, ValueError):
retention_days = 90
if retention_days <= 0:
await logger.adebug(
"Audit-log cleanup worker not started: retention disabled (AUTHZ_AUDIT_RETENTION_DAYS=%s)",
retention_days,
)
return

self._interval = self._resolve_interval(auth_settings)
self._stop_event.clear()
self._task = asyncio.create_task(self._run(), name="authz-audit-cleanup")
await logger.adebug(
"Started authz_audit_log cleanup worker (interval=%ss, retention=%sd)",
self._interval,
retention_days,
)

async def stop(self) -> None:
"""Stop the cleanup task gracefully, waiting for the current sweep to end."""
if self._task is None:
# Common path when auditing is disabled — nothing was ever scheduled.
await logger.adebug("Audit-log cleanup worker is not running")
return

await logger.adebug("Stopping authz_audit_log cleanup worker...")
self._stop_event.set()
with contextlib.suppress(asyncio.CancelledError):
await self._task
self._task = None
await logger.adebug("authz_audit_log cleanup worker stopped")

async def _run(self) -> None:
"""Prune the audit log every interval until stopped.

Sleep-first: the unconditional startup sweep already pruned at boot, so
the first scheduled pass waits one interval to avoid an immediate,
redundant delete right after startup.
"""
while not self._stop_event.is_set():
if await self._sleep_or_stop(self._interval):
break
await self._run_once()

async def _sleep_or_stop(self, delay: float) -> bool:
"""Wait ``delay`` seconds or until stop is requested.

Returns True if a stop was requested during the wait (the caller should
break out of the loop), False if the full delay elapsed.
"""
try:
await asyncio.wait_for(self._stop_event.wait(), timeout=delay)
except asyncio.TimeoutError:
return False
return True

async def _run_once(self) -> int:
"""Run a single retention sweep in its own session.

Best-effort: ``clean_authz_audit_log`` already swallows SQLAlchemy and
timeout errors; this outer guard additionally protects the loop from
session/connection failures so the worker survives a transient outage.
Returns the number of rows deleted (``-1`` when unavailable).
"""
settings_service = get_settings_service()
try:
async with session_scope() as session:
return await clean_authz_audit_log(settings_service, session)
except Exception as exc: # noqa: BLE001 — best-effort; never kill the loop
await logger.aerror(f"Scheduled authz_audit_log cleanup failed: {exc}")
return -1


# Module-level singleton started/stopped by the application lifespan.
audit_log_cleanup_worker = AuditLogCleanupWorker()
Loading
Loading