|
21 | 21 |
|
22 | 22 | from fastapi import HTTPException, UploadFile |
23 | 23 | from fastapi.responses import StreamingResponse |
24 | | -from sqlalchemy import select |
| 24 | +from sqlalchemy import select, text |
25 | 25 | from sqlalchemy.ext.asyncio import AsyncSession |
26 | 26 |
|
27 | 27 | from aperag.config import settings |
@@ -117,16 +117,31 @@ def _validate_file(self, filename: str, size: int) -> str: |
117 | 117 | return file_suffix |
118 | 118 |
|
119 | 119 | async def _check_duplicate_document( |
120 | | - self, user: str, collection_id: str, filename: str, file_hash: str |
| 120 | + self, session: AsyncSession, user: str, collection_id: str, filename: str, file_hash: str |
121 | 121 | ) -> db_models.Document | None: |
122 | 122 | """ |
123 | | - Check if a document with the same name exists in the collection. |
| 123 | + Check if a document with the same name exists in the collection within the same transaction. |
124 | 124 | Returns the existing document if found, None otherwise. |
125 | 125 |
|
126 | 126 | Raises DocumentNameConflictException if same name but different file hash. |
| 127 | +
|
| 128 | + Args: |
| 129 | + session: Database session for transaction isolation |
| 130 | + user: User ID |
| 131 | + collection_id: Collection ID |
| 132 | + filename: Document filename |
| 133 | + file_hash: File content hash for duplicate detection |
127 | 134 | """ |
128 | | - # Use repository to query for existing document |
129 | | - existing_doc = await self.db_ops.query_document_by_name_and_collection(user, collection_id, filename) |
| 135 | + # Query within the same transaction for proper isolation |
| 136 | + stmt = select(db_models.Document).where( |
| 137 | + db_models.Document.user == user, |
| 138 | + db_models.Document.collection_id == collection_id, |
| 139 | + db_models.Document.name == filename, |
| 140 | + db_models.Document.status != db_models.DocumentStatus.DELETED, |
| 141 | + db_models.Document.gmt_deleted.is_(None), # Not soft deleted |
| 142 | + ) |
| 143 | + result = await session.execute(stmt) |
| 144 | + existing_doc = result.scalars().first() |
130 | 145 |
|
131 | 146 | if existing_doc: |
132 | 147 | # If existing document has no hash (legacy document), skip hash check |
@@ -414,9 +429,9 @@ async def _create_documents_atomically(session): |
414 | 429 | index_types = self._get_index_types_for_collection(collection_config) |
415 | 430 |
|
416 | 431 | for file_info in file_data: |
417 | | - # Check for duplicate document (same name and hash) |
| 432 | + # Check for duplicate document (same name and hash) within transaction |
418 | 433 | existing_doc = await self._check_duplicate_document( |
419 | | - user, collection.id, file_info["filename"], file_info["file_hash"] |
| 434 | + session, user, collection.id, file_info["filename"], file_info["file_hash"] |
420 | 435 | ) |
421 | 436 |
|
422 | 437 | if existing_doc and not ignore_duplicate: |
@@ -1086,34 +1101,61 @@ async def upload_document( |
1086 | 1101 | file_hash = calculate_file_hash(file_content) |
1087 | 1102 |
|
async def _upload_document_atomically(session):
    """Create the document row and upload its payload within one transaction.

    Uses ``INSERT ... ON CONFLICT DO NOTHING`` against the partial unique
    index on (collection_id, name) WHERE gmt_deleted IS NULL, so two
    concurrent uploads of the same filename cannot both create a row —
    the duplicate check happens at the database level, not check-then-insert.

    Args:
        session: Async database session for this transaction.

    Returns:
        view_models.UploadDocumentResponse for the newly created document,
        or for the pre-existing one (idempotent behavior on same content).

    Raises:
        DocumentNameConflictException: same name but different content hash
            (raised inside _check_duplicate_document).
        HTTPException(409): the insert conflicted but the winning row is not
            visible in this transaction's snapshot — caller should retry.
    """
    from sqlalchemy.dialects.postgresql import insert

    from aperag.db.models import random_id

    temp_doc_id = "doc" + random_id()

    stmt = insert(db_models.Document).values(
        id=temp_doc_id,
        name=file.filename,
        user=user_id,
        collection_id=collection.id,
        status=db_models.DocumentStatus.UPLOADED,
        size=file.size,
        content_hash=file_hash,
        gmt_created=utc_now(),
        gmt_updated=utc_now(),
    )
    # DO NOTHING on the soft-delete-aware unique index: losing the race is
    # detected via rowcount instead of an IntegrityError.
    stmt = stmt.on_conflict_do_nothing(
        index_elements=["collection_id", "name"], index_where=text("gmt_deleted IS NULL")
    )

    result = await session.execute(stmt)
    await session.flush()

    if result.rowcount == 0:
        # Insert lost the race or the document already existed; fetch the
        # winner (this also raises on same-name/different-hash conflicts).
        existing_doc = await self._check_duplicate_document(
            session, user_id, collection.id, file.filename, file_hash
        )
        if existing_doc is None:
            # BUG FIX: the conflicting row may not be visible in this
            # transaction's snapshot (e.g. REPEATABLE READ isolation with a
            # concurrent committer). Previously control fell through to the
            # "created" path and crashed on session.get(...) returning None.
            # Surface a retryable conflict instead.
            raise HTTPException(
                status_code=409,
                detail=f"Document '{file.filename}' is being uploaded concurrently, please retry",
            )
        logger.info(
            f"Document '{file.filename}' already exists with same content, returning existing document {existing_doc.id}"
        )
        return view_models.UploadDocumentResponse(
            document_id=existing_doc.id,
            filename=existing_doc.name,
            size=existing_doc.size,
            status=existing_doc.status,
        )

    # Row created by us: upload the payload to the object store, then record
    # its location in the document's metadata.
    async_obj_store = get_async_object_store()
    document_instance = await session.get(db_models.Document, temp_doc_id)
    upload_path = f"{document_instance.object_store_base_path()}/original{file_suffix}"
    await async_obj_store.put(upload_path, file_content)

    document_instance.doc_metadata = json.dumps({"object_path": upload_path})
    session.add(document_instance)
    await session.flush()
    await session.refresh(document_instance)

    return view_models.UploadDocumentResponse(
        document_id=document_instance.id, filename=file.filename, size=file.size, status="UPLOADED"
    )
|
0 commit comments