feat: Per-user ingestion coordination to prevent cross-source race conditions#171
feat: Per-user ingestion coordination to prevent cross-source race conditions#171anirudhaacharyap wants to merge 4 commits into
Conversation
Introduce UserIngestionCoordinator that serialises ingestion pipeline execution per user_id using async FIFO locks, while allowing different users to proceed in parallel. - New src/api/ingestion_coordinator.py with lazy lock creation, automatic cleanup, and a clean async context-manager interface - Wrap /v1/memory/ingest and /v1/memory/batch-ingest routes with per-user lock (existing global Semaphore(5) retained as backpressure) - Wrap both server.py test-frontend ingest routes with coordinator - Prevents profile overwrites, temporal duplicates, and summary drift caused by concurrent cross-source requests for the same user Closes #per-user-coordination
There was a problem hiding this comment.
Code Review
This pull request introduces a UserIngestionCoordinator to ensure that ingestion tasks are serialized per user, preventing concurrent pipeline runs for the same user_id. Feedback focuses on optimizing the locking order in ingest_memory to prevent system-wide starvation caused by the global semaphore being acquired before the per-user lock. Additionally, it is recommended to apply the global semaphore and structured error handling to the batch_ingest_memory route to maintain consistent backpressure and error reporting.
| async with _ingest_semaphore: | ||
| result = await asyncio.wait_for( | ||
| pipeline.run( | ||
| user_query=req.user_query, | ||
| agent_response=req.agent_response or "Acknowledged.", | ||
| user_id=user_id, | ||
| session_datetime=req.session_datetime, | ||
| image_url=req.image_url, | ||
| effort_level=req.effort_level, | ||
| ), | ||
| timeout=120.0 | ||
| ) | ||
| async with _user_coordinator.acquire(user_id): |
There was a problem hiding this comment.
Acquiring the global _ingest_semaphore before the per-user _user_coordinator lock can lead to system-wide starvation.
If a single user sends multiple concurrent requests, they could fill all available slots in the global semaphore while waiting for their own sequential user lock. This would prevent other users from acquiring a semaphore slot, even if the system has capacity to process their requests. Swapping the order ensures that a user only occupies a global concurrency slot when they are actually ready to run.
async with _user_coordinator.acquire(user_id):
async with _ingest_semaphore:| result = await asyncio.wait_for( | ||
| pipeline.run( | ||
| user_query=item.user_query, | ||
| agent_response=item.agent_response or "Acknowledged.", | ||
| user_id=user_id, | ||
| session_datetime=item.session_datetime, | ||
| image_url=item.image_url, | ||
| effort_level=item.effort_level, | ||
| ), | ||
| timeout=120.0 | ||
| ) |
There was a problem hiding this comment.
The batch_ingest_memory route currently bypasses the _ingest_semaphore, which is intended to provide system-wide backpressure. To maintain consistent concurrency control across the API, each pipeline run within the batch should respect the global semaphore.
Additionally, consider wrapping this route in a try/except block (similar to the single ingest route) to provide structured error responses instead of a generic 500 internal server error if a pipeline run fails.
| result = await asyncio.wait_for( | |
| pipeline.run( | |
| user_query=item.user_query, | |
| agent_response=item.agent_response or "Acknowledged.", | |
| user_id=user_id, | |
| session_datetime=item.session_datetime, | |
| image_url=item.image_url, | |
| effort_level=item.effort_level, | |
| ), | |
| timeout=120.0 | |
| ) | |
| async with _ingest_semaphore: | |
| result = await asyncio.wait_for( | |
| pipeline.run( | |
| user_query=item.user_query, | |
| agent_response=item.agent_response or "Acknowledged.", | |
| user_id=user_id, | |
| session_datetime=item.session_datetime, | |
| image_url=item.image_url, | |
| effort_level=item.effort_level, | |
| ), | |
| timeout=120.0 | |
| ) |
|
hi @anirudhaacharyap please have a look on the gemini suggestions |
…ured error handling to batch_ingest_memory
|
Hi @anirudhaacharyap there are some conflicts, please have a look |
…batch_ingest_memory
i forgot to sync the code mb, i am fixing it now |
anirudhaacharyap
left a comment
There was a problem hiding this comment.
🚀 Concurrency Locking & Error Handling Optimizations
This PR addresses feedback regarding locking order starvation and structured error reporting for batch operations.
🛠️ Key Changes
1. Swapped Locking Hierarchy (src/api/routes/memory.py)
- Problem: Acquiring the global semaphore (
_ingest_semaphore) before the per-user lock (_user_coordinator) could lead to system-wide starvation if one user queued multiple concurrent requests. - Solution: Swapped the acquisition order. The route now claims the sequential per-user lock first. The helper
_run_ingest_payloadsubsequently acquires the global concurrency limit semaphore internally.
# Route-level serialization (per user)
async with _user_coordinator.acquire(user_id):
data = await asyncio.wait_for(
_run_ingest_payload(payload, user_id),
timeout=120.0,
)|
Please do check it once, and it would be really good if there was a discord server or anything of such where we can communicate |
Ya actually we are building the community now, you can contact me on ishaankone@gmail.com for any queries or discussions |
…tion locking order
|
| Filename | Overview |
|---|---|
| src/api/ingestion_coordinator.py | New per-user FIFO lock implementation — clean design with lazy lock creation, waiter reference counting, and automatic cleanup. No external dependencies. |
| src/api/routes/memory.py | Coordinator and staged batch pipeline wired in; _run_batch_ingest_payload is now dead code after the route began calling _run_staged_batch_payload directly. |
| src/pipelines/ingest.py | Major refactor adds run_staged_batch with three-phase A/B/C pipeline and decoupled helper methods; contains a leftover dev comment and dead assignment in Phase B's code-result lookup. |
| src/agents/judge.py | Adds pending_ops overlay logic to all similarity-search paths (vector, profile-metadata, temporal), allowing in-flight operations to be visible within a batch before being persisted. |
| server.py | Test-frontend routes wrapped with a second, independent UserIngestionCoordinator instance that does not share state with the one in memory.py. |
| debug_test.py | Throwaway debug script committed by mistake; should be removed. |
| xlsx.py | Completely unrelated Excel budget generator; should be removed. |
| tests/test_ingestion_coordinator.py | Thorough coordinator unit tests covering FIFO ordering, cross-user parallelism, cleanup, exception safety, and deadlock prevention. |
| tests/test_batch_ingest.py | Integration and overlay tests for the staged batch pipeline; uses threading to simulate concurrent batches, which is appropriate for the sync TestClient. |
Sequence Diagram
sequenceDiagram
participant C1 as Client (Source A)
participant C2 as Client (Source B)
participant UC as UserIngestionCoordinator
participant SEM as Semaphore(5)
participant PA as Phase A (Extract)
participant PB as Phase B (Judge + pending_ops)
participant PC as Phase C (Weave)
C1->>UC: acquire(user_id)
UC-->>C1: lock acquired
C2->>UC: acquire(user_id)
Note over UC: C2 queued (FIFO)
C1->>SEM: acquire
SEM-->>C1: slot granted
C1->>PA: "gather(*extract tasks) [parallel]"
PA-->>C1: extraction results
C1->>PB: sequential per item
loop each item
PB->>PB: judge domains concurrently (gather)
PB->>PB: pending_ops.extend(ops)
end
PB-->>C1: judge results
C1->>PC: "gather(*weave tasks) [parallel]"
PC-->>C1: write results
C1->>SEM: release
C1->>UC: release(user_id)
UC-->>C2: lock acquired (C2 now runs)
Comments Outside Diff (1)
-
src/api/ingestion_coordinator.py, line 397-420 (link)Two independent coordinator instances don't cross-coordinate
server.pyandsrc/api/routes/memory.pyeach instantiate their ownUserIngestionCoordinator. Since locks are in-memory and not shared, a user whose ingestion arrives at the test-frontend routes (server.py) and simultaneously at the main API routes (memory.py) will not be serialised — two pipelines for the same user can run concurrently across the two app instances. This partially undermines the cross-source serialisation goal stated in the PR. A shared module-level singleton (or the distributed-lock path mentioned in the docstring) would close this gap.
Reviews (1): Last reviewed commit: "feat: staged parallel hybrid ingestion p..." | Re-trigger Greptile
| async def _run_batch_ingest_payload( | ||
| payload: Dict[str, Any], | ||
| user_id: str, | ||
| ) -> Dict[str, Any]: | ||
| async with _ingest_semaphore: | ||
| return await _run_staged_batch_payload(payload, user_id) |
There was a problem hiding this comment.
_run_batch_ingest_payload is defined but never called anywhere in the codebase. After the refactor, the batch_ingest_memory route calls _run_staged_batch_payload directly (with _ingest_semaphore in the route handler). This orphaned function can be removed to avoid confusion about which path is actually taken.
| # 4. Code annotations | ||
| code_res = item_state.get("code_judge") or item_state.get("code_result") | ||
| # Wait, let's look at the result schema. It's code_result | ||
| code_res = item_state.get("code_result") |
There was a problem hiding this comment.
The first assignment is immediately overwritten and the inline comment reads like a dev note. Remove both the dead assignment and the comment.
| # 4. Code annotations | |
| code_res = item_state.get("code_judge") or item_state.get("code_result") | |
| # Wait, let's look at the result schema. It's code_result | |
| code_res = item_state.get("code_result") | |
| # 4. Code annotations | |
| code_res = item_state.get("code_result") |
| import asyncio | ||
| from fastapi.testclient import TestClient | ||
| from unittest.mock import AsyncMock, patch | ||
|
|
||
| from src.api.app import create_app | ||
|
|
||
| app = create_app() | ||
| client = TestClient(app) | ||
|
|
||
| with patch("src.api.routes.memory.require_api_key", return_value={"username": "test_user"}): | ||
| from src.api.dependencies import require_api_key, enforce_rate_limit, require_ready | ||
| app.dependency_overrides[require_api_key] = lambda: {"username": "test_user"} | ||
| app.dependency_overrides[enforce_rate_limit] = lambda: True | ||
| app.dependency_overrides[require_ready] = lambda: True | ||
|
|
||
| payload = { | ||
| "items": [ | ||
| { | ||
| "user_query": "Hello world", | ||
| "agent_response": "Hi there", | ||
| "user_id": "test_user_1", | ||
| } | ||
| ] | ||
| } | ||
|
|
||
| try: | ||
| response = client.post( | ||
| "/v1/memory/batch-ingest", | ||
| json=payload, | ||
| headers={"Authorization": "Bearer test-key"} | ||
| ) | ||
| print("Status code:", response.status_code) | ||
| import json | ||
| print(json.dumps(response.json(), indent=2)) | ||
| except Exception as e: | ||
| print("Exception:", e) |
There was a problem hiding this comment.
Unrelated/debug files committed to the repository
debug_test.py, test_output.txt (a binary artifact), and xlsx.py (an Excel workbook generator with no relation to this feature) were accidentally included in the PR. All three should be removed before merging — debug_test.py is a throwaway debug script, test_output.txt is a build artifact, and xlsx.py appears to be a personal utility script that does not belong in this codebase.
Fixes #152
Problem
Ingestion requests can originate from multiple sources (MCP, SDK, browser extension) simultaneously. When concurrent requests hit the pipeline for the same user, they race against shared state — causing profile overwrites, temporal event duplicates, and stale summary deduplication.
The existing
asyncio.Semaphore(5)caps total system concurrency but does not distinguish between users.Solution
Introduces a
UserIngestionCoordinatorthat serialises ingestion peruser_idusing async FIFO locks, while allowing different users to proceed fully in parallel.New:
src/api/ingestion_coordinator.pyasyncio.Lockwith lazy creation and automatic cleanupasync with coordinator.acquire(user_id)context-manager APIModified:
src/api/routes/memory.py/v1/memory/ingest— pipeline.run wrapped in per-user lock (nested inside existing global semaphore)/v1/memory/batch-ingest— entire batch acquired under a single user lock, preserving sequential item processingModified:
server.py/v1/memory/ingest,/api/ingest) wrapped with coordinatorGuarantees
asyncio.Lockasyncio.Lockwaiter queueuser_idSemaphore(5)retainedTesting
7 unit tests at 100% coverage on the coordinator module: