12
12
from abc import ABC , abstractmethod
13
13
from typing import Any
14
14
15
+ from pydantic import TypeAdapter
16
+
15
17
from llama_stack .apis .common .errors import VectorStoreNotFoundError
16
18
from llama_stack .apis .files import Files , OpenAIFileObject
17
19
from llama_stack .apis .vector_dbs import VectorDB
50
52
51
53
# Constants for OpenAI vector stores
52
54
CHUNK_MULTIPLIER = 5
55
+ FILE_BATCH_CLEANUP_INTERVAL_SECONDS = 24 * 60 * 60 # 1 day in seconds
53
56
54
57
VERSION = "v3"
55
58
VECTOR_DBS_PREFIX = f"vector_dbs:{ VERSION } ::"
@@ -73,7 +76,7 @@ class OpenAIVectorStoreMixin(ABC):
73
76
# KV store for persisting OpenAI vector store metadata
74
77
kvstore : KVStore | None
75
78
# Track last cleanup time to throttle cleanup operations
76
- _last_cleanup_time : int
79
+ _last_file_batch_cleanup_time : int
77
80
78
81
async def _save_openai_vector_store (self , store_id : str , store_info : dict [str , Any ]) -> None :
79
82
"""Save vector store metadata to persistent storage."""
@@ -215,16 +218,6 @@ async def _cleanup_expired_file_batches(self) -> None:
215
218
if expired_count > 0 :
216
219
logger .info (f"Cleaned up { expired_count } expired file batches" )
217
220
218
- async def _cleanup_expired_file_batches_if_needed (self ) -> None :
219
- """Run cleanup if enough time has passed since the last cleanup."""
220
- current_time = int (time .time ())
221
- cleanup_interval = 24 * 60 * 60 # 1 day in seconds
222
-
223
- if current_time - self ._last_cleanup_time >= cleanup_interval :
224
- logger .info ("Running throttled cleanup of expired file batches" )
225
- await self ._cleanup_expired_file_batches ()
226
- self ._last_cleanup_time = current_time
227
-
228
221
async def _resume_incomplete_batches (self ) -> None :
229
222
"""Resume processing of incomplete file batches after server restart."""
230
223
for batch_id , batch_info in self .openai_file_batches .items ():
@@ -238,7 +231,7 @@ async def initialize_openai_vector_stores(self) -> None:
238
231
self .openai_vector_stores = await self ._load_openai_vector_stores ()
239
232
self .openai_file_batches = await self ._load_openai_vector_store_file_batches ()
240
233
await self ._resume_incomplete_batches ()
241
- self ._last_cleanup_time = 0
234
+ self ._last_file_batch_cleanup_time = 0
242
235
243
236
@abstractmethod
244
237
async def delete_chunks (self , store_id : str , chunks_for_deletion : list [ChunkForDeletion ]) -> None :
@@ -945,7 +938,11 @@ async def openai_create_vector_store_file_batch(
945
938
asyncio .create_task (self ._process_file_batch_async (batch_id , batch_info ))
946
939
947
940
# Run cleanup if needed (throttled to once every 1 day)
948
- asyncio .create_task (self ._cleanup_expired_file_batches_if_needed ())
941
+ current_time = int (time .time ())
942
+ if current_time - self ._last_file_batch_cleanup_time >= FILE_BATCH_CLEANUP_INTERVAL_SECONDS :
943
+ logger .info ("Running throttled cleanup of expired file batches" )
944
+ asyncio .create_task (self ._cleanup_expired_file_batches ())
945
+ self ._last_file_batch_cleanup_time = current_time
949
946
950
947
return batch_object
951
948
@@ -962,11 +959,10 @@ async def _process_file_batch_async(
962
959
963
960
for file_id in file_ids :
964
961
try :
965
- chunking_strategy_obj = (
966
- VectorStoreChunkingStrategyStatic (** chunking_strategy )
967
- if chunking_strategy .get ("type" ) == "static"
968
- else VectorStoreChunkingStrategyAuto (** chunking_strategy )
962
+ chunking_strategy_adapter : TypeAdapter [VectorStoreChunkingStrategy ] = TypeAdapter (
963
+ VectorStoreChunkingStrategy
969
964
)
965
+ chunking_strategy_obj = chunking_strategy_adapter .validate_python (chunking_strategy )
970
966
await self .openai_attach_file_to_vector_store (
971
967
vector_store_id = vector_store_id ,
972
968
file_id = file_id ,
0 commit comments