Commit 42ebf66

add concurrent file attaching

1 parent 980c2c3 commit 42ebf66

9 files changed: +186 -31 lines changed

llama_stack/providers/inline/vector_io/faiss/faiss.py

Lines changed: 1 addition & 0 deletions

@@ -207,6 +207,7 @@ def __init__(self, config: FaissVectorIOConfig, inference_api: Inference, files_
         self.kvstore: KVStore | None = None
         self.openai_vector_stores: dict[str, dict[str, Any]] = {}
         self.openai_file_batches: dict[str, dict[str, Any]] = {}
+        self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}
         self._last_file_batch_cleanup_time = 0

     async def initialize(self) -> None:

llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py

Lines changed: 1 addition & 0 deletions

@@ -416,6 +416,7 @@ def __init__(self, config, inference_api: Inference, files_api: Files | None) ->
         self.cache: dict[str, VectorDBWithIndex] = {}
         self.openai_vector_stores: dict[str, dict[str, Any]] = {}
         self.openai_file_batches: dict[str, dict[str, Any]] = {}
+        self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}
         self._last_file_batch_cleanup_time = 0
         self.kvstore: KVStore | None = None

llama_stack/providers/remote/vector_io/chroma/chroma.py

Lines changed: 1 addition & 0 deletions

@@ -167,6 +167,7 @@ async def initialize(self) -> None:
         self.client = chromadb.PersistentClient(path=self.config.db_path)
         self.openai_vector_stores = await self._load_openai_vector_stores()
         self.openai_file_batches: dict[str, dict[str, Any]] = {}
+        self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}
         self._last_file_batch_cleanup_time = 0

     async def shutdown(self) -> None:

llama_stack/providers/remote/vector_io/milvus/milvus.py

Lines changed: 1 addition & 0 deletions

@@ -318,6 +318,7 @@ def __init__(
         self.vector_db_store = None
         self.openai_vector_stores: dict[str, dict[str, Any]] = {}
         self.openai_file_batches: dict[str, dict[str, Any]] = {}
+        self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}
         self._last_file_batch_cleanup_time = 0
         self.metadata_collection_name = "openai_vector_stores_metadata"

llama_stack/providers/remote/vector_io/pgvector/pgvector.py

Lines changed: 2 additions & 0 deletions

@@ -4,6 +4,7 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.

+import asyncio
 import heapq
 from typing import Any

@@ -354,6 +355,7 @@ def __init__(
         self.vector_db_store = None
         self.openai_vector_stores: dict[str, dict[str, Any]] = {}
         self.openai_file_batches: dict[str, dict[str, Any]] = {}
+        self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}
         self._last_file_batch_cleanup_time = 0
         self.metadata_collection_name = "openai_vector_stores_metadata"

llama_stack/providers/remote/vector_io/qdrant/qdrant.py

Lines changed: 1 addition & 0 deletions

@@ -171,6 +171,7 @@ def __init__(
         self.kvstore: KVStore | None = None
         self.openai_vector_stores: dict[str, dict[str, Any]] = {}
         self.openai_file_batches: dict[str, dict[str, Any]] = {}
+        self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}
         self._last_file_batch_cleanup_time = 0
         self._qdrant_lock = asyncio.Lock()

llama_stack/providers/remote/vector_io/weaviate/weaviate.py

Lines changed: 2 additions & 0 deletions

@@ -3,6 +3,7 @@
 #
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
+import asyncio
 import json
 from typing import Any

@@ -293,6 +294,7 @@ def __init__(
         self.vector_db_store = None
         self.openai_vector_stores: dict[str, dict[str, Any]] = {}
         self.openai_file_batches: dict[str, dict[str, Any]] = {}
+        self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}
         self._last_file_batch_cleanup_time = 0
         self.metadata_collection_name = "openai_vector_stores_metadata"
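All seven provider adapters above receive the same one-line change: a `_file_batch_tasks` dict mapping batch IDs to their background `asyncio.Task` objects, so the mixin (changed below) can track and later cancel them. A minimal standalone sketch of that registry pattern — the names `BatchRegistry`, `start`, and `cancel` are illustrative, not the repository's API:

import asyncio


class BatchRegistry:
    """Toy stand-in for the provider adapters' task bookkeeping (illustrative only)."""

    def __init__(self) -> None:
        # Same shape as the attribute each provider now initializes.
        self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}

    def start(self, batch_id: str, coro) -> None:
        """Launch background processing and remember the task by batch id."""
        task = asyncio.create_task(coro)
        self._file_batch_tasks[batch_id] = task
        # Drop the reference once the task finishes, however it finishes.
        task.add_done_callback(lambda _: self._file_batch_tasks.pop(batch_id, None))

    def cancel(self, batch_id: str) -> bool:
        """Cancel a batch if it is still running; report whether anything was cancelled."""
        task = self._file_batch_tasks.get(batch_id)
        if task is not None and not task.done():
            task.cancel()
            return True
        return False

The commit itself clears registry entries in the processing coroutine's `finally` block rather than with a done callback; either way, finished tasks do not linger in the dict.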

llama_stack/providers/utils/memory/openai_vector_store_mixin.py

Lines changed: 106 additions & 30 deletions

@@ -53,6 +53,8 @@
 # Constants for OpenAI vector stores
 CHUNK_MULTIPLIER = 5
 FILE_BATCH_CLEANUP_INTERVAL_SECONDS = 24 * 60 * 60  # 1 day in seconds
+MAX_CONCURRENT_FILES_PER_BATCH = 5  # Maximum concurrent file processing within a batch
+FILE_BATCH_CHUNK_SIZE = 10  # Process files in chunks of this size (2x concurrency)

 VERSION = "v3"
 VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"

@@ -77,6 +79,8 @@ class OpenAIVectorStoreMixin(ABC):
     kvstore: KVStore | None
     # Track last cleanup time to throttle cleanup operations
     _last_file_batch_cleanup_time: int
+    # Track running file batch processing tasks
+    _file_batch_tasks: dict[str, asyncio.Task[None]]

     async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
         """Save vector store metadata to persistent storage."""

@@ -224,12 +228,14 @@ async def _resume_incomplete_batches(self) -> None:
             if batch_info["status"] == "in_progress":
                 logger.info(f"Resuming incomplete file batch: {batch_id}")
                 # Restart the background processing task
-                asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))
+                task = asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))
+                self._file_batch_tasks[batch_id] = task

     async def initialize_openai_vector_stores(self) -> None:
         """Load existing OpenAI vector stores and file batches into the in-memory cache."""
         self.openai_vector_stores = await self._load_openai_vector_stores()
         self.openai_file_batches = await self._load_openai_vector_store_file_batches()
+        self._file_batch_tasks = {}
         await self._resume_incomplete_batches()
         self._last_file_batch_cleanup_time = 0

@@ -935,7 +941,8 @@ async def openai_create_vector_store_file_batch(
         await self._save_openai_vector_store_file_batch(batch_id, batch_info)

         # Start background processing of files
-        asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))
+        task = asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))
+        self._file_batch_tasks[batch_id] = task

         # Run cleanup if needed (throttled to once every 1 day)
         current_time = int(time.time())

@@ -946,50 +953,110 @@

         return batch_object

-    async def _process_file_batch_async(
+    async def _process_files_with_concurrency(
         self,
+        file_ids: list[str],
+        vector_store_id: str,
+        attributes: dict[str, Any],
+        chunking_strategy_obj: Any,
         batch_id: str,
         batch_info: dict[str, Any],
     ) -> None:
-        """Process files in a batch asynchronously in the background."""
-        file_ids = batch_info["file_ids"]
-        attributes = batch_info["attributes"]
-        chunking_strategy = batch_info["chunking_strategy"]
-        vector_store_id = batch_info["vector_store_id"]
+        """Process files with controlled concurrency and chunking."""
+        semaphore = asyncio.Semaphore(MAX_CONCURRENT_FILES_PER_BATCH)
+
+        async def process_single_file(file_id: str) -> tuple[str, bool]:
+            """Process a single file with concurrency control."""
+            async with semaphore:
+                try:
+                    await self.openai_attach_file_to_vector_store(
+                        vector_store_id=vector_store_id,
+                        file_id=file_id,
+                        attributes=attributes,
+                        chunking_strategy=chunking_strategy_obj,
+                    )
+                    return file_id, True
+                except Exception as e:
+                    logger.error(f"Failed to process file {file_id} in batch {batch_id}: {e}")
+                    return file_id, False
+
+        # Process files in chunks to avoid creating too many tasks at once
+        total_files = len(file_ids)
+        for chunk_start in range(0, total_files, FILE_BATCH_CHUNK_SIZE):
+            chunk_end = min(chunk_start + FILE_BATCH_CHUNK_SIZE, total_files)
+            chunk = file_ids[chunk_start:chunk_end]
+
+            logger.info(
+                f"Processing chunk {chunk_start // FILE_BATCH_CHUNK_SIZE + 1} of {(total_files + FILE_BATCH_CHUNK_SIZE - 1) // FILE_BATCH_CHUNK_SIZE} ({len(chunk)} files)"
+            )

-        for file_id in file_ids:
-            try:
-                chunking_strategy_adapter: TypeAdapter[VectorStoreChunkingStrategy] = TypeAdapter(
-                    VectorStoreChunkingStrategy
-                )
-                chunking_strategy_obj = chunking_strategy_adapter.validate_python(chunking_strategy)
-                await self.openai_attach_file_to_vector_store(
-                    vector_store_id=vector_store_id,
-                    file_id=file_id,
-                    attributes=attributes,
-                    chunking_strategy=chunking_strategy_obj,
-                )
+            async with asyncio.TaskGroup() as tg:
+                chunk_tasks = [tg.create_task(process_single_file(file_id)) for file_id in chunk]

-                # Update counts atomically
-                batch_info["file_counts"]["completed"] += 1
-                batch_info["file_counts"]["in_progress"] -= 1
+            chunk_results = [task.result() for task in chunk_tasks]

-            except Exception as e:
-                logger.error(f"Failed to process file {file_id} in batch {batch_id}: {e}")
-                batch_info["file_counts"]["failed"] += 1
-                batch_info["file_counts"]["in_progress"] -= 1
+            # Update counts after each chunk for progressive feedback
+            for _, success in chunk_results:
+                self._update_file_counts(batch_info, success=success)
+
+            # Save progress after each chunk
+            await self._save_openai_vector_store_file_batch(batch_id, batch_info)
+
+    def _update_file_counts(self, batch_info: dict[str, Any], success: bool) -> None:
+        """Update file counts based on processing result."""
+        if success:
+            batch_info["file_counts"]["completed"] += 1
+        else:
+            batch_info["file_counts"]["failed"] += 1
+        batch_info["file_counts"]["in_progress"] -= 1

-        # Update final status when all files are processed
+    def _update_batch_status(self, batch_info: dict[str, Any]) -> None:
+        """Update final batch status based on file processing results."""
         if batch_info["file_counts"]["failed"] == 0:
             batch_info["status"] = "completed"
         elif batch_info["file_counts"]["completed"] == 0:
             batch_info["status"] = "failed"
         else:
             batch_info["status"] = "completed"  # Partial success counts as completed

-        await self._save_openai_vector_store_file_batch(batch_id, batch_info)
+    async def _process_file_batch_async(
+        self,
+        batch_id: str,
+        batch_info: dict[str, Any],
+    ) -> None:
+        """Process files in a batch asynchronously in the background."""
+        file_ids = batch_info["file_ids"]
+        attributes = batch_info["attributes"]
+        chunking_strategy = batch_info["chunking_strategy"]
+        vector_store_id = batch_info["vector_store_id"]
+        chunking_strategy_adapter: TypeAdapter[VectorStoreChunkingStrategy] = TypeAdapter(VectorStoreChunkingStrategy)
+        chunking_strategy_obj = chunking_strategy_adapter.validate_python(chunking_strategy)
+
+        try:
+            # Process all files with controlled concurrency
+            await self._process_files_with_concurrency(
+                file_ids=file_ids,
+                vector_store_id=vector_store_id,
+                attributes=attributes,
+                chunking_strategy_obj=chunking_strategy_obj,
+                batch_id=batch_id,
+                batch_info=batch_info,
+            )
+
+            # Update final batch status
+            self._update_batch_status(batch_info)
+            await self._save_openai_vector_store_file_batch(batch_id, batch_info)

-        logger.info(f"File batch {batch_id} processing completed with status: {batch_info['status']}")
+            logger.info(f"File batch {batch_id} processing completed with status: {batch_info['status']}")
+
+        except asyncio.CancelledError:
+            logger.info(f"File batch {batch_id} processing was cancelled")
+            # Clean up task reference if it still exists
+            self._file_batch_tasks.pop(batch_id, None)
+            raise  # Re-raise to ensure proper cancellation propagation
+        finally:
+            # Always clean up task reference when processing ends
+            self._file_batch_tasks.pop(batch_id, None)

     def _get_and_validate_batch(self, batch_id: str, vector_store_id: str) -> dict[str, Any]:
         """Get and validate batch exists and belongs to vector store."""

@@ -1114,6 +1181,15 @@ async def openai_cancel_vector_store_file_batch(
         if batch_info["status"] not in ["in_progress"]:
             raise ValueError(f"Cannot cancel batch {batch_id} with status {batch_info['status']}")

+        # Cancel the actual processing task if it exists
+        if batch_id in self._file_batch_tasks:
+            task = self._file_batch_tasks[batch_id]
+            if not task.done():
+                task.cancel()
+                logger.info(f"Cancelled processing task for file batch: {batch_id}")
+            # Remove from task tracking
+            del self._file_batch_tasks[batch_id]
+
         batch_info["status"] = "cancelled"

         await self._save_openai_vector_store_file_batch(batch_id, batch_info)
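The core of the mixin change is the semaphore-plus-chunking pattern in `_process_files_with_concurrency`: at most MAX_CONCURRENT_FILES_PER_BATCH attachments run at once, and tasks are created in waves of FILE_BATCH_CHUNK_SIZE under an `asyncio.TaskGroup`, so a large batch never materializes all of its tasks up front. A self-contained sketch of the same pattern, with a hypothetical `attach_file` stub standing in for `openai_attach_file_to_vector_store` (requires Python 3.11+ for `TaskGroup`):

import asyncio

MAX_CONCURRENT_FILES_PER_BATCH = 5   # mirrors the constant added above
FILE_BATCH_CHUNK_SIZE = 10           # tasks created per wave (2x concurrency)


async def attach_file(file_id: str) -> None:
    """Hypothetical stand-in for the real attachment call."""
    await asyncio.sleep(0.1)  # simulate embedding + storage I/O


async def process_batch(file_ids: list[str]) -> dict[str, int]:
    counts = {"completed": 0, "failed": 0}
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_FILES_PER_BATCH)

    async def process_one(file_id: str) -> bool:
        async with semaphore:  # at most 5 attachments in flight at once
            try:
                await attach_file(file_id)
                return True
            except Exception:
                return False  # swallow errors so one failure doesn't cancel siblings

    # Spawn tasks in waves so a huge batch never creates all tasks at once.
    for start in range(0, len(file_ids), FILE_BATCH_CHUNK_SIZE):
        chunk = file_ids[start : start + FILE_BATCH_CHUNK_SIZE]
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(process_one(f)) for f in chunk]
        for t in tasks:
            counts["completed" if t.result() else "failed"] += 1
        # a real implementation would persist progress here, once per chunk

    return counts


print(asyncio.run(process_batch([f"file_{i}" for i in range(23)])))

Because each worker catches its own exceptions and returns a flag, a failing file never cancels its TaskGroup siblings; only cancellation of the outer batch task propagates into the group and aborts the remaining chunk.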

tests/unit/providers/vector_io/test_vector_io_openai_vector_stores.py

Lines changed: 71 additions & 1 deletion

@@ -6,7 +6,7 @@

 import json
 import time
-from unittest.mock import AsyncMock
+from unittest.mock import AsyncMock, patch

 import numpy as np
 import pytest

@@ -31,6 +31,24 @@
 # -v -s --tb=short --disable-warnings --asyncio-mode=auto


+@pytest.fixture(autouse=True)
+def mock_resume_file_batches(request):
+    """Mock the resume functionality to prevent stale file batches from being processed during tests."""
+    # Skip mocking for tests that specifically test the resume functionality
+    if any(
+        test_name in request.node.name
+        for test_name in ["test_only_in_progress_batches_resumed", "test_file_batch_persistence_across_restarts"]
+    ):
+        yield
+        return
+
+    with patch(
+        "llama_stack.providers.utils.memory.openai_vector_store_mixin.OpenAIVectorStoreMixin._resume_incomplete_batches",
+        new_callable=AsyncMock,
+    ):
+        yield
+
+
 async def test_initialize_index(vector_index):
     await vector_index.initialize()

@@ -918,3 +936,55 @@ async def test_expired_batch_access_error(vector_io_adapter):
     # Try to access expired batch
     with pytest.raises(ValueError, match="File batch batch_expired has expired after 7 days from creation"):
         vector_io_adapter._get_and_validate_batch("batch_expired", store_id)
+
+
+async def test_max_concurrent_files_per_batch(vector_io_adapter):
+    """Test that file batch processing respects MAX_CONCURRENT_FILES_PER_BATCH limit."""
+    import asyncio
+
+    store_id = "vs_1234"
+
+    # Setup vector store
+    vector_io_adapter.openai_vector_stores[store_id] = {
+        "id": store_id,
+        "name": "Test Store",
+        "files": {},
+        "file_ids": [],
+    }
+
+    active_files = 0
+
+    async def mock_attach_file_with_delay(vector_store_id: str, file_id: str, **kwargs):
+        """Mock that tracks concurrency and blocks indefinitely to test concurrency limit."""
+        nonlocal active_files
+        active_files += 1
+
+        # Block indefinitely to test concurrency limit
+        await asyncio.sleep(float("inf"))
+
+    # Replace the attachment method
+    vector_io_adapter.openai_attach_file_to_vector_store = mock_attach_file_with_delay
+
+    # Create a batch with more files than the concurrency limit
+    file_ids = [f"file_{i}" for i in range(8)]  # 8 files, but limit should be 5
+
+    batch = await vector_io_adapter.openai_create_vector_store_file_batch(
+        vector_store_id=store_id,
+        file_ids=file_ids,
+    )
+
+    # Give time for the semaphore logic to start processing files
+    await asyncio.sleep(0.2)
+
+    # Verify that only MAX_CONCURRENT_FILES_PER_BATCH files are processing concurrently
+    # The semaphore in _process_files_with_concurrency should limit this
+    from llama_stack.providers.utils.memory.openai_vector_store_mixin import MAX_CONCURRENT_FILES_PER_BATCH
+
+    assert active_files == MAX_CONCURRENT_FILES_PER_BATCH, (
+        f"Expected {MAX_CONCURRENT_FILES_PER_BATCH} active files, got {active_files}"
+    )
+
+    # Verify batch is in progress
+    assert batch.status == "in_progress"
+    assert batch.file_counts.total == 8
+    assert batch.file_counts.in_progress == 8
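The cancel path added to `openai_cancel_vector_store_file_batch` leans on standard asyncio semantics: `task.cancel()` raises `CancelledError` at the task's current await point, cleanup runs in the `except`/`finally` blocks, and the error is re-raised so the awaiter observes the cancellation. A minimal sketch of that flow, with hypothetical names throughout:

import asyncio


async def worker() -> None:
    """Hypothetical stand-in for _process_file_batch_async."""
    try:
        await asyncio.sleep(float("inf"))  # parked mid-batch, like the blocked mock above
    except asyncio.CancelledError:
        print("cleanup on cancel")  # mirrors the except CancelledError branch
        raise  # re-raise so the cancellation propagates to the awaiter
    finally:
        print("always runs")  # mirrors the finally cleanup


async def main() -> None:
    tasks: dict[str, asyncio.Task[None]] = {"batch_1": asyncio.create_task(worker())}
    await asyncio.sleep(0)  # let the worker reach its first await
    tasks["batch_1"].cancel()  # what the cancel endpoint now does for a tracked task
    try:
        await tasks["batch_1"]
    except asyncio.CancelledError:
        print("batch_1 cancelled")


asyncio.run(main())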
