Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 57 additions & 13 deletions backend/api/routes/rules.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import re
from collections import defaultdict
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

Expand All @@ -24,6 +26,18 @@
CleanupRuleUpdate,
)


class ValidateRegexRequest(BaseModel):
base_path: str = ""
suffix: str = ""


class ValidateRegexResponse(BaseModel):
valid: bool
error: str | None = None
pattern: str | None = None


router = APIRouter(prefix="/api", tags=["rules"])


Expand Down Expand Up @@ -85,7 +99,8 @@ async def _validate_rule_paths(
"""Normalize and validate that each path matches at least one indexed media file.

Returns the cleaned list (or None if empty). Raises 400 if any path does
not match any stored media path for the rule's media type / libraries.
not match any stored media path for the rule's media type / libraries, or if
a regex pattern is invalid.
"""
if not paths:
return None
Expand All @@ -101,22 +116,24 @@ async def _validate_rule_paths(
)

normalized_media = [mp.replace("\\", "/").lower() for mp in media_paths]
wildcard_chars = ("*", "?", "[")

cleaned: list[str] = []
for raw in paths:
pattern = (raw or "").strip()
for pattern in paths:
pattern = (pattern or "").strip()
if not pattern:
continue
norm = pattern.replace("\\", "/").lower().rstrip("/")
first_wild = min(
(norm.find(ch) for ch in wildcard_chars if norm.find(ch) >= 0),
default=len(norm),
)
prefix = norm[:first_wild].rstrip("/")
if not prefix or not any(
mp == prefix or mp.startswith(prefix + "/") for mp in normalized_media
):

# Validate regex syntax (patterns should be pre-validated, but double-check)
try:
regex = re.compile(pattern, re.IGNORECASE)
except re.error as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid regex pattern '{pattern}': {e}",
)

# Check if pattern matches any media path
if not any(regex.search(mp) for mp in normalized_media):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(f"Path '{pattern}' does not match any indexed media location."),
Expand Down Expand Up @@ -226,6 +243,33 @@ async def create_rule(
return new_rule


@router.post("/rules/validate-regex", response_model=ValidateRegexResponse)
async def validate_regex_pattern(
body: ValidateRegexRequest,
_admin: Annotated[User, Depends(require_admin)],
) -> ValidateRegexResponse:
"""Validate and construct a regex pattern from base_path and suffix.
The base_path is treated as a literal string (escaped), while the suffix
can contain regex patterns. Returns the valid/complete pattern on success.
"""
# Normalize both paths
normalized_base = (body.base_path or "").replace("\\", "/").lower().rstrip("/")
normalized_suffix = (body.suffix or "").replace("\\", "/").lower().rstrip("/")

# Escape the base path to treat it as a literal string
escaped_base = re.escape(normalized_base)

# Combine escaped base with suffix, anchored to start of string
combined = f"""^{escaped_base if not normalized_suffix else f"{escaped_base}/{normalized_suffix}"}"""

# Validate the combined pattern
try:
re.compile(combined, re.IGNORECASE)
return ValidateRegexResponse(valid=True, pattern=combined)
except re.error as e:
return ValidateRegexResponse(valid=False, error=str(e))


@router.post("/rules/{rule_id}", response_model=CleanupRuleResponse)
async def update_rule(
rule_id: int,
Expand Down
27 changes: 13 additions & 14 deletions backend/tasks/cleanup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import fnmatch
import re
from datetime import UTC, datetime

from sqlalchemy import delete, or_, select
Expand Down Expand Up @@ -33,10 +33,8 @@
def _path_matches_any(file_paths: list[str], patterns: list[str]) -> bool:
"""Return True if any file_path matches any pattern.

A pattern containing wildcard characters (``*``, ``?``, ``[``) is evaluated
with :mod:`fnmatch` (case-insensitive). A pattern without wildcards is
treated as a directory prefix - a file path matches if it equals the
pattern or lives beneath it.
Patterns are regex strings (case-insensitive) that should be validated
before being stored. All patterns are expected to be valid regex.
"""
if not patterns or not file_paths:
return False
Expand All @@ -47,18 +45,19 @@ def _path_matches_any(file_paths: list[str], patterns: list[str]) -> bool:
if not normalized_files:
return False

wildcard_chars = ("*", "?", "[")
for raw in patterns:
pattern = (raw or "").replace("\\", "/").lower().rstrip("/")
for pattern in patterns:
pattern = (pattern or "").strip()
if not pattern:
continue
if any(ch in pattern for ch in wildcard_chars):
if any(fnmatch.fnmatchcase(fp, pattern) for fp in normalized_files):
return True
else:
prefix = pattern + "/"
if any(fp == pattern or fp.startswith(prefix) for fp in normalized_files):

try:
regex = re.compile(pattern, re.IGNORECASE)
if any(regex.search(fp) for fp in normalized_files):
return True
except re.error:
# Invalid regex - should not happen if patterns are validated, but skip to be safe
LOG.warning(f"Invalid regex pattern: {pattern}")
continue
return False


Expand Down
Loading
Loading