Skip to content

Commit 980c2c3

Browse files
committed
clean up clean up
1 parent 5b41971 commit 980c2c3

File tree

8 files changed

+20
-17
lines changed

8 files changed

+20
-17
lines changed

llama_stack/providers/inline/vector_io/faiss/faiss.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ def __init__(self, config: FaissVectorIOConfig, inference_api: Inference, files_
207207
self.kvstore: KVStore | None = None
208208
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
209209
self.openai_file_batches: dict[str, dict[str, Any]] = {}
210+
self._last_file_batch_cleanup_time = 0
210211

211212
async def initialize(self) -> None:
212213
self.kvstore = await kvstore_impl(self.config.kvstore)

llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ def __init__(self, config, inference_api: Inference, files_api: Files | None) ->
416416
self.cache: dict[str, VectorDBWithIndex] = {}
417417
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
418418
self.openai_file_batches: dict[str, dict[str, Any]] = {}
419+
self._last_file_batch_cleanup_time = 0
419420
self.kvstore: KVStore | None = None
420421

421422
async def initialize(self) -> None:

llama_stack/providers/remote/vector_io/chroma/chroma.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ async def initialize(self) -> None:
167167
self.client = chromadb.PersistentClient(path=self.config.db_path)
168168
self.openai_vector_stores = await self._load_openai_vector_stores()
169169
self.openai_file_batches: dict[str, dict[str, Any]] = {}
170+
self._last_file_batch_cleanup_time = 0
170171

171172
async def shutdown(self) -> None:
172173
pass

llama_stack/providers/remote/vector_io/milvus/milvus.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ def __init__(
318318
self.vector_db_store = None
319319
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
320320
self.openai_file_batches: dict[str, dict[str, Any]] = {}
321+
self._last_file_batch_cleanup_time = 0
321322
self.metadata_collection_name = "openai_vector_stores_metadata"
322323

323324
async def initialize(self) -> None:

llama_stack/providers/remote/vector_io/pgvector/pgvector.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,7 @@ def __init__(
354354
self.vector_db_store = None
355355
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
356356
self.openai_file_batches: dict[str, dict[str, Any]] = {}
357+
self._last_file_batch_cleanup_time = 0
357358
self.metadata_collection_name = "openai_vector_stores_metadata"
358359

359360
async def initialize(self) -> None:

llama_stack/providers/remote/vector_io/qdrant/qdrant.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ def __init__(
171171
self.kvstore: KVStore | None = None
172172
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
173173
self.openai_file_batches: dict[str, dict[str, Any]] = {}
174+
self._last_file_batch_cleanup_time = 0
174175
self._qdrant_lock = asyncio.Lock()
175176

176177
async def initialize(self) -> None:

llama_stack/providers/remote/vector_io/weaviate/weaviate.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ def __init__(
293293
self.vector_db_store = None
294294
self.openai_vector_stores: dict[str, dict[str, Any]] = {}
295295
self.openai_file_batches: dict[str, dict[str, Any]] = {}
296+
self._last_file_batch_cleanup_time = 0
296297
self.metadata_collection_name = "openai_vector_stores_metadata"
297298

298299
def _get_client(self) -> weaviate.WeaviateClient:

llama_stack/providers/utils/memory/openai_vector_store_mixin.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from abc import ABC, abstractmethod
1313
from typing import Any
1414

15+
from pydantic import TypeAdapter
16+
1517
from llama_stack.apis.common.errors import VectorStoreNotFoundError
1618
from llama_stack.apis.files import Files, OpenAIFileObject
1719
from llama_stack.apis.vector_dbs import VectorDB
@@ -50,6 +52,7 @@
5052

5153
# Constants for OpenAI vector stores
5254
CHUNK_MULTIPLIER = 5
55+
FILE_BATCH_CLEANUP_INTERVAL_SECONDS = 24 * 60 * 60 # 1 day in seconds
5356

5457
VERSION = "v3"
5558
VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"
@@ -73,7 +76,7 @@ class OpenAIVectorStoreMixin(ABC):
7376
# KV store for persisting OpenAI vector store metadata
7477
kvstore: KVStore | None
7578
# Track last cleanup time to throttle cleanup operations
76-
_last_cleanup_time: int
79+
_last_file_batch_cleanup_time: int
7780

7881
async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
7982
"""Save vector store metadata to persistent storage."""
@@ -215,16 +218,6 @@ async def _cleanup_expired_file_batches(self) -> None:
215218
if expired_count > 0:
216219
logger.info(f"Cleaned up {expired_count} expired file batches")
217220

218-
async def _cleanup_expired_file_batches_if_needed(self) -> None:
219-
"""Run cleanup if enough time has passed since the last cleanup."""
220-
current_time = int(time.time())
221-
cleanup_interval = 24 * 60 * 60 # 1 day in seconds
222-
223-
if current_time - self._last_cleanup_time >= cleanup_interval:
224-
logger.info("Running throttled cleanup of expired file batches")
225-
await self._cleanup_expired_file_batches()
226-
self._last_cleanup_time = current_time
227-
228221
async def _resume_incomplete_batches(self) -> None:
229222
"""Resume processing of incomplete file batches after server restart."""
230223
for batch_id, batch_info in self.openai_file_batches.items():
@@ -238,7 +231,7 @@ async def initialize_openai_vector_stores(self) -> None:
238231
self.openai_vector_stores = await self._load_openai_vector_stores()
239232
self.openai_file_batches = await self._load_openai_vector_store_file_batches()
240233
await self._resume_incomplete_batches()
241-
self._last_cleanup_time = 0
234+
self._last_file_batch_cleanup_time = 0
242235

243236
@abstractmethod
244237
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
@@ -945,7 +938,11 @@ async def openai_create_vector_store_file_batch(
945938
asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))
946939

947940
# Run cleanup if needed (throttled to at most once per day)
948-
asyncio.create_task(self._cleanup_expired_file_batches_if_needed())
941+
current_time = int(time.time())
942+
if current_time - self._last_file_batch_cleanup_time >= FILE_BATCH_CLEANUP_INTERVAL_SECONDS:
943+
logger.info("Running throttled cleanup of expired file batches")
944+
asyncio.create_task(self._cleanup_expired_file_batches())
945+
self._last_file_batch_cleanup_time = current_time
949946

950947
return batch_object
951948

@@ -962,11 +959,10 @@ async def _process_file_batch_async(
962959

963960
for file_id in file_ids:
964961
try:
965-
chunking_strategy_obj = (
966-
VectorStoreChunkingStrategyStatic(**chunking_strategy)
967-
if chunking_strategy.get("type") == "static"
968-
else VectorStoreChunkingStrategyAuto(**chunking_strategy)
962+
chunking_strategy_adapter: TypeAdapter[VectorStoreChunkingStrategy] = TypeAdapter(
963+
VectorStoreChunkingStrategy
969964
)
965+
chunking_strategy_obj = chunking_strategy_adapter.validate_python(chunking_strategy)
970966
await self.openai_attach_file_to_vector_store(
971967
vector_store_id=vector_store_id,
972968
file_id=file_id,

0 commit comments

Comments
 (0)