diff --git a/frontend/app/api/mutations/useSyncConnector.ts b/frontend/app/api/mutations/useSyncConnector.ts index 25426e32b..c3ffb7ffe 100644 --- a/frontend/app/api/mutations/useSyncConnector.ts +++ b/frontend/app/api/mutations/useSyncConnector.ts @@ -77,6 +77,8 @@ const syncConnector = async ({ bucket_filter?: string[]; /** When true, replace any indexed document with the same filename. */ replace_duplicates?: boolean; + /** When true (COS only), index without an owner so all users in the instance can retrieve the document. */ + shared?: boolean; }; }): Promise => { const response = await fetch(`/api/connectors/${connectorType}/sync`, { diff --git a/frontend/components/cloud-picker/ingest-settings.tsx b/frontend/components/cloud-picker/ingest-settings.tsx index ae56d9da8..ceb007bbd 100644 --- a/frontend/components/cloud-picker/ingest-settings.tsx +++ b/frontend/components/cloud-picker/ingest-settings.tsx @@ -42,6 +42,8 @@ interface IngestSettingsProps { onOpenChange: (open: boolean) => void; settings?: IngestSettingsType; onSettingsChange?: (settings: IngestSettingsType) => void; + /** When true, show the "Make documents available to all users" toggle. COS ingestion only. */ + showShared?: boolean; } export const IngestSettings = ({ @@ -49,6 +51,7 @@ export const IngestSettings = ({ onOpenChange, settings, onSettingsChange, + showShared = false, }: IngestSettingsProps) => { const { isAuthenticated, isNoAuthMode } = useAuth(); @@ -244,7 +247,13 @@ export const IngestSettings = ({ /> -
+
Picture descriptions @@ -260,6 +269,26 @@ export const IngestSettings = ({ } />
+ + {showShared && ( +
+
+
+ Make documents available to all users +
+
+ Shared documents are visible to all users in this OpenRAG + instance. +
+
+ + handleSettingsChange({ shared: checked }) + } + /> +
+ )}
diff --git a/frontend/components/cloud-picker/types.ts b/frontend/components/cloud-picker/types.ts index f3eea21f5..fc125cbef 100644 --- a/frontend/components/cloud-picker/types.ts +++ b/frontend/components/cloud-picker/types.ts @@ -113,6 +113,8 @@ export interface IngestSettings { ocr: boolean; pictureDescriptions: boolean; embeddingModel: string; + /** When true, index without an owner so all users in the instance can retrieve the document. COS only. */ + shared?: boolean; } /** Inline error message if chunk settings are invalid; otherwise null. */ diff --git a/frontend/components/connectors/shared-bucket-view.tsx b/frontend/components/connectors/shared-bucket-view.tsx index 86d5f23b0..05a0321b2 100644 --- a/frontend/components/connectors/shared-bucket-view.tsx +++ b/frontend/components/connectors/shared-bucket-view.tsx @@ -22,6 +22,8 @@ export interface SharedBucketViewProps { addTask: (id: string) => void; onBack: () => void; onDone: () => void; + /** When true, show the "Make documents available to all users" toggle. COS ingestion only. */ + showShared?: boolean; } export function SharedBucketView({ @@ -35,6 +37,7 @@ export function SharedBucketView({ addTask, onBack, onDone, + showShared = false, }: SharedBucketViewProps) { const queryClient = useQueryClient(); const [selectedBuckets, setSelectedBuckets] = useState>( @@ -76,6 +79,7 @@ export function SharedBucketView({ selected_files: [], bucket_filter: Array.from(selectedBuckets), settings: ingestSettings, + shared: showShared ? (ingestSettings.shared ?? false) : undefined, }, }, { @@ -239,6 +243,7 @@ export function SharedBucketView({ onOpenChange={setIsSettingsOpen} settings={ingestSettings} onSettingsChange={setIngestSettings} + showShared={showShared} />
diff --git a/frontend/enhancements/connectors/ibm-cos/components/bucket-view.tsx b/frontend/enhancements/connectors/ibm-cos/components/bucket-view.tsx index 11b3922fc..adaf20d1b 100644 --- a/frontend/enhancements/connectors/ibm-cos/components/bucket-view.tsx +++ b/frontend/enhancements/connectors/ibm-cos/components/bucket-view.tsx @@ -37,6 +37,7 @@ export function IBMCOSBucketView({ addTask={addTask} onBack={onBack} onDone={onDone} + showShared /> ); } diff --git a/src/api/connectors.py b/src/api/connectors.py index 3babd660c..67c98f0f8 100644 --- a/src/api/connectors.py +++ b/src/api/connectors.py @@ -345,6 +345,10 @@ class ConnectorSyncBody(BaseModel): # rather than failing. Set by the provider upload UI after the user confirms # overwrite in the duplicate dialog. replace_duplicates: bool = False + # When True (COS only), index chunks without an owner field so OpenSearch DLS + # makes them visible to all users in the instance. Temporary CIO mechanism; + # not a full ACL feature. Defaults to False (private). + shared: bool = False class ConnectorCheckDuplicatesBody(BaseModel): @@ -544,6 +548,12 @@ async def connector_sync( ) jwt_token = user.jwt_token + if body.shared and connector_type != "ibm_cos": + return JSONResponse( + {"error": "shared flag is only supported for the ibm_cos connector"}, + status_code=400, + ) + # Get all active connections for this connector type and user connections = await connector_service.connection_manager.list_connections( user_id=user.user_id, connector_type=connector_type @@ -611,6 +621,7 @@ async def connector_sync( file_infos=file_infos, ingest_settings=body.settings, replace_duplicates=body.replace_duplicates, + shared=body.shared, ) elif body.sync_all or body.bucket_filter: # Full ingest: discover and ingest all files (or files from specific buckets). @@ -652,6 +663,7 @@ async def connector_sync( all_file_ids, jwt_token=jwt_token, ingest_settings=body.settings, + shared=body.shared, ) else: # sync_all: ingest everything the connector can see @@ -660,6 +672,7 @@ async def connector_sync( user.user_id, max_files=max_files, jwt_token=jwt_token, + shared=body.shared, ) else: # No files specified - sync only files already in OpenSearch for this connector diff --git a/src/connectors/service.py b/src/connectors/service.py index 84c36a3ee..6c603d3bb 100644 --- a/src/connectors/service.py +++ b/src/connectors/service.py @@ -323,6 +323,7 @@ async def sync_connector_files( jwt_token: str = None, filename_filter: set = None, replace_duplicates: bool = False, + shared: bool = False, ) -> str: """ Sync files from a connector connection using existing task tracking system. @@ -421,6 +422,7 @@ async def sync_connector_files( ), models_service=self.models_service, replace_duplicates=replace_duplicates, + shared=shared, ) # Use file IDs as items (no more fake file paths!) @@ -449,6 +451,7 @@ async def sync_specific_files( file_infos: list[dict[str, Any]] = None, ingest_settings: dict[str, Any] | None = None, replace_duplicates: bool = False, + shared: bool = False, ) -> str: """ Sync specific files by their IDs (used for webhook-triggered syncs or manual selection). @@ -582,6 +585,7 @@ async def sync_specific_files( models_service=self.models_service, ingest_settings=ingest_settings, replace_duplicates=replace_duplicates, + shared=shared, ) # Create custom task using TaskService diff --git a/src/models/processors.py b/src/models/processors.py index 7a7a950ba..027a8b9f0 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING, Any from config.settings import clients, get_embedding_model, get_index_name, get_openrag_config +from session_manager import AnonymousUser from utils.document_processing import ( extract_relevant, process_text_file, @@ -29,6 +30,26 @@ from connectors.base import DocumentACL +def resolve_shared_owner_fields( + user_id: str | None, + owner_name: str | None, + owner_email: str | None, + shared: bool, +) -> tuple[str | None, str | None, str | None]: + """Return (owner, owner_name, owner_email) for indexing. + + When shared=True, owner is None so the indexed chunk omits the owner field + entirely, triggering the OpenSearch DLS must_not-exists-owner clause that + makes the document visible to all users in the instance. owner_name and + owner_email are set to AnonymousUser values, matching how default/sample + documents are loaded. + """ + if shared: + _anon = AnonymousUser() + return None, _anon.name, _anon.email + return user_id, owner_name, owner_email + + class TaskProcessor: """Base class for task processors with shared processing logic""" @@ -159,13 +180,17 @@ async def delete_document_by_filename( filename: str, opensearch_client, owner_user_id: str | None = None, + shared: bool = False, ) -> None: """ Delete all chunks of a document with the given filename from OpenSearch. """ from config.settings import clients, get_index_name from utils.opensearch_delete import collect_visible_document_ids, delete_document_ids - from utils.opensearch_queries import build_owned_filename_query + from utils.opensearch_queries import ( + build_owned_filename_query, + build_replace_filename_query, + ) try: write_client = clients.opensearch @@ -187,11 +212,18 @@ async def delete_document_by_filename( filename=filename, ) return + + # When shared=True the document being replaced may have previously + # been ingested without an owner field (also shared), so the normal + # owner-scoped query would miss those chunks. Use a broader query + # that covers both owned and ownerless chunks for this filename. + build_query = build_replace_filename_query if shared else build_owned_filename_query + for candidate in candidate_filenames: document_ids = await collect_visible_document_ids( opensearch_client, index=get_index_name(), - query=build_owned_filename_query(candidate, owner_user_id), + query=build_query(candidate, owner_user_id), ) deleted_count += await delete_document_ids( write_client, @@ -223,6 +255,7 @@ async def process_document_standard( is_sample_data: bool = False, acl: "DocumentACL | None" = None, connector_file_id: str | None = None, + shared: bool = False, ): """ Standard processing pipeline for non-Langflow processors: @@ -374,9 +407,11 @@ async def process_document_standard( error=str(e), ) - # Owner is always the authenticated uploading/syncing user. Upstream ACL - # owners/authors only contribute read access through allowed principals. - owner = owner_user_id + # Owner is always the authenticated uploading/syncing user unless shared=True, + # in which case owner fields are omitted so DLS makes the doc visible to all users. + owner, owner_name, owner_email = resolve_shared_owner_fields( + owner_user_id, owner_name, owner_email, shared + ) if acl: allowed_users = acl.allowed_users or [] allowed_groups = acl.allowed_groups or [] @@ -603,6 +638,7 @@ def __init__( models_service=None, ingest_settings: dict[str, Any] | None = None, replace_duplicates: bool = False, + shared: bool = False, ): super().__init__( document_service=document_service, @@ -618,6 +654,7 @@ def __init__( self.owner_email = owner_email self.ingest_settings = ingest_settings self.replace_duplicates = replace_duplicates + self.shared = shared async def process_item(self, upload_task: UploadTask, item: str, file_task: FileTask) -> None: """Process a connector file using unified methods""" @@ -757,6 +794,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File document.filename, opensearch_client, owner_user_id=self.user_id, + shared=self.shared, ) # Create temporary file from document content @@ -840,15 +878,20 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File {}, connector_tweak_settings ) + effective_owner, effective_owner_name, effective_owner_email = ( + resolve_shared_owner_fields( + self.user_id, self.owner_name, self.owner_email, self.shared + ) + ) result = await self.connector_service.langflow_service.upload_and_ingest_file( file_tuple=file_tuple, session_id=None, tweaks=tweaks, settings=self.ingest_settings, jwt_token=self.jwt_token, - owner=self.user_id, - owner_name=self.owner_name, - owner_email=self.owner_email, + owner=effective_owner, + owner_name=effective_owner_name, + owner_email=effective_owner_email, connector_type=connection.connector_type, docling_polling_service=self.connector_service.task_service.docling_polling_service if self.connector_service.task_service @@ -890,6 +933,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File connector_type=connection.connector_type, acl=document.acl, connector_file_id=document.id, + shared=self.shared, **standard_kwargs, ) diff --git a/src/services/document_index_writer.py b/src/services/document_index_writer.py index f3374386f..7a43a0fe9 100644 --- a/src/services/document_index_writer.py +++ b/src/services/document_index_writer.py @@ -208,7 +208,6 @@ def _build_chunk_document( else metadata.get("file_size"), "connector_type": context.connector_type or metadata.get("connector_type") or "local", "source_url": context.source_url or metadata.get("source_url") or "", - "owner": context.owner, "allowed_users": list(context.allowed_users), "allowed_groups": list(context.allowed_groups), "allowed_principals": unique_acl_principals(context.allowed_principals), @@ -219,6 +218,8 @@ def _build_chunk_document( "metadata": metadata.get("metadata", {}), } + if context.owner is not None: + doc["owner"] = context.owner if context.owner_name is not None: doc["owner_name"] = context.owner_name if context.owner_email is not None: diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index da3a90a7b..1cb6247af 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -376,7 +376,7 @@ async def run_ingestion_flow( # Pass metadata via tweaks to OpenSearch component metadata_tweaks = [] - if owner or owner is None: + if owner: metadata_tweaks.append({"key": "owner", "value": owner}) if owner_name: metadata_tweaks.append({"key": "owner_name", "value": owner_name}) @@ -421,9 +421,9 @@ async def run_ingestion_flow( headers = { "X-Langflow-Global-Var-JWT": str(jwt_token or ""), - "X-Langflow-Global-Var-OWNER": str(owner), - "X-Langflow-Global-Var-OWNER_NAME": str(owner_name), - "X-Langflow-Global-Var-OWNER_EMAIL": str(owner_email), + "X-Langflow-Global-Var-OWNER": owner or "", + "X-Langflow-Global-Var-OWNER_NAME": owner_name or "", + "X-Langflow-Global-Var-OWNER_EMAIL": owner_email or "", "X-Langflow-Global-Var-CONNECTOR_TYPE": str(connector_type), "X-Langflow-Global-Var-FILENAME": filename, "X-Langflow-Global-Var-MIMETYPE": mimetype, @@ -594,9 +594,9 @@ async def run_url_ingestion_flow( resolved_document_id = hash_id(io.BytesIO(docs_url.encode("utf-8"))) headers = { "X-Langflow-Global-Var-JWT": str(jwt_token or ""), - "X-Langflow-Global-Var-OWNER": str(owner), - "X-Langflow-Global-Var-OWNER_NAME": str(owner_name), - "X-Langflow-Global-Var-OWNER_EMAIL": str(owner_email), + "X-Langflow-Global-Var-OWNER": owner or "", + "X-Langflow-Global-Var-OWNER_NAME": owner_name or "", + "X-Langflow-Global-Var-OWNER_EMAIL": owner_email or "", "X-Langflow-Global-Var-CONNECTOR_TYPE": str(connector_type), "X-Langflow-Global-Var-SELECTED_EMBEDDING_MODEL": str(embedding_model), "X-Langflow-Global-Var-DOCUMENT_ID": resolved_document_id, diff --git a/src/utils/opensearch_queries.py b/src/utils/opensearch_queries.py index a99d3227f..ea12f33a3 100644 --- a/src/utils/opensearch_queries.py +++ b/src/utils/opensearch_queries.py @@ -43,3 +43,32 @@ def build_owned_filename_query(filename: str, owner: str) -> dict: ] } } + + +def build_replace_filename_query(filename: str, owner: str) -> dict: + """Build a delete-scope query for replace_duplicates that covers both private + and shared (ownerless) chunks with this filename. + + Matches chunks where filename matches AND (owner == current user OR owner + field is absent). Combining both cases is necessary because the same + filename may have been previously ingested as shared (no owner field) and + is now being replaced by the same user. The owner-field branch protects + against accidentally deleting documents owned by *other* users that are + merely visible to the current user via allowed_users DLS. + """ + return { + "bool": { + "filter": [ + build_filename_query(filename), + { + "bool": { + "should": [ + {"term": {"owner": owner}}, + {"bool": {"must_not": {"exists": {"field": "owner"}}}}, + ], + "minimum_should_match": 1, + } + }, + ] + } + } diff --git a/tests/integration/core/test_shared_flag_dls.py b/tests/integration/core/test_shared_flag_dls.py new file mode 100644 index 000000000..6455a8afa --- /dev/null +++ b/tests/integration/core/test_shared_flag_dls.py @@ -0,0 +1,226 @@ +"""Integration tests: shared-flag DLS anonymous path in OpenSearch. + +These tests verify that documents indexed without an owner field are visible +to all authenticated users via the existing must_not-exists-owner DLS clause, +and that documents WITH an owner field remain private to their owner. + +Requires a live OpenSearch instance with DLS configured (OPENSEARCH_PASSWORD set). +""" + +from uuid import uuid4 + +import pytest +from opensearchpy import AsyncOpenSearch +from opensearchpy._async.http_aiohttp import AIOHttpConnection + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.openrag_skip_app_onboard, +] + + +def _build_admin_opensearch_client(): + from config.settings import ( + IBM_AUTH_ENABLED, + OPENSEARCH_HOST, + OPENSEARCH_PASSWORD, + OPENSEARCH_PORT, + OPENSEARCH_USERNAME, + ) + + if IBM_AUTH_ENABLED: + pytest.skip("OSS JWT DLS is not used in IBM auth mode") + if not OPENSEARCH_PASSWORD: + pytest.skip("OPENSEARCH_PASSWORD is required for this DLS integration test") + + return AsyncOpenSearch( + hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}], + connection_class=AIOHttpConnection, + scheme="https", + use_ssl=True, + verify_certs=False, + ssl_assert_fingerprint=None, + http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD), + http_compress=True, + ) + + +async def _search_visible_document_ids(opensearch_client, index_name: str) -> set[str]: + response = await opensearch_client.search( + index=index_name, + body={ + "query": {"match_all": {}}, + "_source": ["document_id"], + "size": 20, + }, + ) + return {hit["_source"]["document_id"] for hit in response.get("hits", {}).get("hits", [])} + + +async def test_ownerless_doc_visible_to_all_users(): + """A document indexed without an owner field is visible to all authenticated users. + + This is the core DLS mechanism used by shared=True ingestion. + The must_not-exists-owner clause in securityconfig/roles.yml makes any + chunk whose owner key is absent universally readable. + """ + from config.settings import INDEX_BODY, clients + from session_manager import SessionManager, User + from utils.opensearch_utils import setup_opensearch_security + + admin_client = _build_admin_opensearch_client() + try: + is_reachable = await admin_client.ping() + except Exception: + is_reachable = False + if not is_reachable: + await admin_client.close() + pytest.skip("OpenSearch is not reachable") + + index_name = f"documents_shared_flag_dls_{uuid4().hex}" + user_a_id = f"user-a-{uuid4().hex}" + user_a_email = f"{user_a_id}@example.com" + user_b_id = f"user-b-{uuid4().hex}" + user_b_email = f"{user_b_id}@example.com" + + try: + await setup_opensearch_security(admin_client) + await admin_client.indices.create(index=index_name, body=INDEX_BODY) + + await admin_client.bulk( + body=[ + # Shared doc: no owner key → visible to everyone + {"index": {"_index": index_name, "_id": "shared-doc"}}, + { + "document_id": "shared-doc", + "filename": "shared.pdf", + "text": "Shared document visible to all users", + "allowed_users": [], + "allowed_groups": [], + "allowed_principals": [], + }, + # Private doc owned by user A → only visible to user A + {"index": {"_index": index_name, "_id": "private-a-doc"}}, + { + "document_id": "private-a-doc", + "filename": "private-a.pdf", + "text": "Private document owned by user A", + "owner": user_a_id, + "allowed_users": [], + "allowed_groups": [], + "allowed_principals": [], + }, + # Private doc owned by user B → only visible to user B + {"index": {"_index": index_name, "_id": "private-b-doc"}}, + { + "document_id": "private-b-doc", + "filename": "private-b.pdf", + "text": "Private document owned by user B", + "owner": user_b_id, + "allowed_users": [], + "allowed_groups": [], + "allowed_principals": [], + }, + ], + refresh=True, + ) + + session_manager = SessionManager("test") + + user_a = User(user_id=user_a_id, email=user_a_email, name="User A") + token_a = session_manager.create_opensearch_jwt_token(user_a, ttl_seconds=120) + client_a = clients.create_user_opensearch_client(token_a) + + user_b = User(user_id=user_b_id, email=user_b_email, name="User B") + token_b = session_manager.create_opensearch_jwt_token(user_b, ttl_seconds=120) + client_b = clients.create_user_opensearch_client(token_b) + + try: + visible_a = await _search_visible_document_ids(client_a, index_name) + visible_b = await _search_visible_document_ids(client_b, index_name) + + # Shared doc visible to both users + assert "shared-doc" in visible_a, "User A must see the shared (ownerless) document" + assert "shared-doc" in visible_b, "User B must see the shared (ownerless) document" + + # Each user sees only their own private doc (not the other user's) + assert "private-a-doc" in visible_a + assert "private-a-doc" not in visible_b, "User B must NOT see User A's private doc" + + assert "private-b-doc" in visible_b + assert "private-b-doc" not in visible_a, "User A must NOT see User B's private doc" + + finally: + await client_a.close() + await client_b.close() + + finally: + await admin_client.indices.delete(index=index_name, ignore_unavailable=True) + await admin_client.close() + + +async def test_null_owner_field_does_not_trigger_anonymous_path(): + """A document indexed with owner=null (not absent) must NOT be universally visible. + + This regression test guards against the serialization bug described in the plan: + 'owner': null in JSON is treated by OpenSearch exists filter as present, + so it does NOT trigger must_not-exists-owner. Only an absent key triggers it. + + This test confirms the fix in _build_chunk_document is necessary. + """ + from config.settings import INDEX_BODY, clients + from session_manager import SessionManager, User + from utils.opensearch_utils import setup_opensearch_security + + admin_client = _build_admin_opensearch_client() + try: + is_reachable = await admin_client.ping() + except Exception: + is_reachable = False + if not is_reachable: + await admin_client.close() + pytest.skip("OpenSearch is not reachable") + + index_name = f"documents_null_owner_dls_{uuid4().hex}" + unrelated_user_id = f"unrelated-{uuid4().hex}" + unrelated_email = f"{unrelated_user_id}@example.com" + + try: + await setup_opensearch_security(admin_client) + await admin_client.indices.create(index=index_name, body=INDEX_BODY) + + # Index a doc with owner: null explicitly (the broken serialization path) + await admin_client.index( + index=index_name, + id="null-owner-doc", + body={ + "document_id": "null-owner-doc", + "filename": "null-owner.pdf", + "text": "Document with owner field set to null", + "owner": None, + "allowed_users": [], + "allowed_groups": [], + "allowed_principals": [], + }, + refresh=True, + ) + + session_manager = SessionManager("test") + unrelated = User(user_id=unrelated_user_id, email=unrelated_email, name="Unrelated User") + token = session_manager.create_opensearch_jwt_token(unrelated, ttl_seconds=120) + client = clients.create_user_opensearch_client(token) + + try: + visible = await _search_visible_document_ids(client, index_name) + # owner=null means the field EXISTS (with null value), so must_not-exists-owner + # does NOT fire → unrelated user should NOT see this doc + assert "null-owner-doc" not in visible, ( + "A doc with owner=null must NOT be universally visible. " + "Only a doc with no owner key triggers the anonymous DLS path." + ) + finally: + await client.close() + + finally: + await admin_client.indices.delete(index=index_name, ignore_unavailable=True) + await admin_client.close() diff --git a/tests/unit/test_shared_flag.py b/tests/unit/test_shared_flag.py new file mode 100644 index 000000000..8fc7f1c80 --- /dev/null +++ b/tests/unit/test_shared_flag.py @@ -0,0 +1,237 @@ +"""Unit tests for the shared COS ingestion flag.""" + +from typing import Any + +import pytest + +from models.processors import resolve_shared_owner_fields +from services.document_index_writer import ( + DocumentIndexChunk, + DocumentIndexContext, + DocumentIndexWriter, +) + +# --------------------------------------------------------------------------- +# resolve_shared_owner_fields +# --------------------------------------------------------------------------- + + +def test_resolve_shared_owner_fields_private(): + result = resolve_shared_owner_fields("user-1", "Alice", "alice@example.com", shared=False) + assert result == ("user-1", "Alice", "alice@example.com") + + +def test_resolve_shared_owner_fields_shared(): + result = resolve_shared_owner_fields("user-1", "Alice", "alice@example.com", shared=True) + assert result == (None, "Anonymous User", "anonymous@localhost") + + +def test_resolve_shared_owner_fields_shared_none_inputs(): + result = resolve_shared_owner_fields(None, None, None, shared=True) + assert result == (None, "Anonymous User", "anonymous@localhost") + + +def test_resolve_shared_owner_fields_private_none_inputs(): + result = resolve_shared_owner_fields(None, None, None, shared=False) + assert result == (None, None, None) + + +# --------------------------------------------------------------------------- +# DocumentIndexWriter._build_chunk_document +# --------------------------------------------------------------------------- + + +def _make_context(**kwargs): + defaults: dict[str, Any] = dict( + document_id="doc-1", + filename="test.pdf", + mimetype="application/pdf", + embedding_model="test-model", + file_size=None, + allowed_users=[], + allowed_groups=[], + allowed_principals=[], + allowed_principal_labels=[], + is_sample_data=False, + ) + defaults.update(kwargs) + return DocumentIndexContext(**defaults) + + +def _make_chunk(): + return DocumentIndexChunk( + chunk_id="doc-1_0", + text="hello world", + vector=[0.1, 0.2, 0.3], + page=1, + ) + + +def test_build_chunk_document_owner_present_when_set(): + writer = DocumentIndexWriter() + context = _make_context(owner="user-1", owner_name="Alice", owner_email="alice@example.com") + chunk = _make_chunk() + doc = writer._build_chunk_document( + context=context, chunk=chunk, embedding_field="vector", indexed_time="2026-01-01T00:00:00" + ) + assert doc["owner"] == "user-1" + assert doc["owner_name"] == "Alice" + assert doc["owner_email"] == "alice@example.com" + + +def test_build_chunk_document_omits_owner_key_when_none(): + """Critical DLS test: owner key must be absent, not null, for must_not-exists-owner clause.""" + writer = DocumentIndexWriter() + context = _make_context(owner=None, owner_name=None, owner_email=None) + chunk = _make_chunk() + doc = writer._build_chunk_document( + context=context, chunk=chunk, embedding_field="vector", indexed_time="2026-01-01T00:00:00" + ) + assert "owner" not in doc + assert "owner_name" not in doc + assert "owner_email" not in doc + + +def test_build_chunk_document_shared_has_anonymous_metadata(): + """Shared docs: owner key absent for DLS, owner_name/email set to anonymous values.""" + writer = DocumentIndexWriter() + context = _make_context( + owner=None, owner_name="Anonymous User", owner_email="anonymous@localhost" + ) + chunk = _make_chunk() + doc = writer._build_chunk_document( + context=context, chunk=chunk, embedding_field="vector", indexed_time="2026-01-01T00:00:00" + ) + assert "owner" not in doc + assert doc["owner_name"] == "Anonymous User" + assert doc["owner_email"] == "anonymous@localhost" + + +def test_build_chunk_document_allowed_users_always_present(): + """allowed_users/groups are always written (DLS lookup requires the field to exist).""" + writer = DocumentIndexWriter() + context = _make_context(owner=None) + chunk = _make_chunk() + doc = writer._build_chunk_document( + context=context, chunk=chunk, embedding_field="vector", indexed_time="2026-01-01T00:00:00" + ) + assert "allowed_users" in doc + assert "allowed_groups" in doc + + +# --------------------------------------------------------------------------- +# build_replace_filename_query +# --------------------------------------------------------------------------- + + +def test_build_replace_filename_query_structure(): + """Must match filename AND (owner == user OR must_not-exists-owner).""" + from utils.opensearch_queries import build_replace_filename_query + + q = build_replace_filename_query("report.pdf", "user-1") + assert q["bool"]["filter"][0] == {"term": {"filename": "report.pdf"}} + should = q["bool"]["filter"][1]["bool"]["should"] + assert {"term": {"owner": "user-1"}} in should + assert {"bool": {"must_not": {"exists": {"field": "owner"}}}} in should + assert q["bool"]["filter"][1]["bool"]["minimum_should_match"] == 1 + + +def test_build_replace_filename_query_differs_from_owned_query(): + """replace query is broader than the owner-only query.""" + from utils.opensearch_queries import build_owned_filename_query, build_replace_filename_query + + owned = build_owned_filename_query("f.pdf", "u") + replace = build_replace_filename_query("f.pdf", "u") + # owned has a single term filter; replace has a bool/should + assert owned != replace + + +# --------------------------------------------------------------------------- +# ConnectorSyncBody Pydantic model +# --------------------------------------------------------------------------- + + +def test_connector_sync_body_defaults_shared_false(): + from api.connectors import ConnectorSyncBody + + body = ConnectorSyncBody() + assert body.shared is False + + +def test_connector_sync_body_shared_true(): + from api.connectors import ConnectorSyncBody + + body = ConnectorSyncBody(shared=True) + assert body.shared is True + + +def test_connector_sync_body_shared_backwards_compat(): + """Existing clients that omit shared get False.""" + from api.connectors import ConnectorSyncBody + + body = ConnectorSyncBody(selected_files=["file-1"]) + assert body.shared is False + + +# --------------------------------------------------------------------------- +# connector_sync guard: shared=True rejected for non-COS connectors +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_non_cos_connector_rejects_shared_true(): + """The API handler must return 400 when shared=True and connector_type != ibm_cos.""" + from unittest.mock import AsyncMock, MagicMock + + from api.connectors import ConnectorSyncBody, connector_sync + + body = ConnectorSyncBody(shared=True) + connector_service = MagicMock() + session_manager = MagicMock() + user = MagicMock() + user.jwt_token = "token" + user.user_id = "user-1" + + # Stub out the connections lookup so the guard fires before any connector work + connector_service.connection_manager.list_connections = AsyncMock(return_value=[]) + + response = await connector_sync( + connector_type="google_drive", + body=body, + connector_service=connector_service, + session_manager=session_manager, + user=user, + ) + assert response.status_code == 400 + import json + + detail = json.loads(response.body) + assert "ibm_cos" in detail["error"] + + +@pytest.mark.asyncio +async def test_ibm_cos_shared_true_does_not_hit_guard(): + """shared=True with ibm_cos should NOT be rejected by the guard.""" + from unittest.mock import AsyncMock, MagicMock + + from api.connectors import ConnectorSyncBody, connector_sync + + body = ConnectorSyncBody(shared=True) + connector_service = MagicMock() + session_manager = MagicMock() + user = MagicMock() + user.jwt_token = "token" + user.user_id = "user-1" + + # Return empty active connections so we get 404 (not 400) — guard didn't fire + connector_service.connection_manager.list_connections = AsyncMock(return_value=[]) + + response = await connector_sync( + connector_type="ibm_cos", + body=body, + connector_service=connector_service, + session_manager=session_manager, + user=user, + ) + # 404 = "no active connections" error, not 400 = guard rejection + assert response.status_code == 404