Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions frontend/app/api/mutations/useSyncConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ const syncConnector = async ({
bucket_filter?: string[];
/** When true, replace any indexed document with the same filename. */
replace_duplicates?: boolean;
/** When true (COS only), index without an owner so all users in the instance can retrieve the document. */
shared?: boolean;
};
}): Promise<SyncResponse> => {
const response = await fetch(`/api/connectors/${connectorType}/sync`, {
Expand Down
31 changes: 30 additions & 1 deletion frontend/components/cloud-picker/ingest-settings.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ interface IngestSettingsProps {
onOpenChange: (open: boolean) => void;
settings?: IngestSettingsType;
onSettingsChange?: (settings: IngestSettingsType) => void;
/** When true, show the "Make documents available to all users" toggle. COS ingestion only. */
showShared?: boolean;
}

export const IngestSettings = ({
isOpen,
onOpenChange,
settings,
onSettingsChange,
showShared = false,
}: IngestSettingsProps) => {
const { isAuthenticated, isNoAuthMode } = useAuth();

Expand Down Expand Up @@ -244,7 +247,13 @@ export const IngestSettings = ({
/>
</div>

<div className="flex items-center justify-between">
<div
className={
showShared
? "flex items-center justify-between border-b pb-3 mb-3"
: "flex items-center justify-between"
}
>
<div>
<div className="text-sm pb-2 font-semibold">
Picture descriptions
Expand All @@ -260,6 +269,26 @@ export const IngestSettings = ({
}
/>
</div>

{showShared && (
<div className="flex items-center justify-between">
<div>
<div className="text-sm pb-2 font-semibold">
Make documents available to all users
</div>
<div className="text-sm text-muted-foreground">
Shared documents are visible to all users in this OpenRAG
instance.
</div>
</div>
<Switch
checked={currentSettings.shared ?? false}
onCheckedChange={(checked) =>
handleSettingsChange({ shared: checked })
}
/>
</div>
)}
</div>
</CollapsibleContent>
</Collapsible>
Expand Down
2 changes: 2 additions & 0 deletions frontend/components/cloud-picker/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ export interface IngestSettings {
ocr: boolean;
pictureDescriptions: boolean;
embeddingModel: string;
/** When true, index without an owner so all users in the instance can retrieve the document. COS only. */
shared?: boolean;
}

/** Inline error message if chunk settings are invalid; otherwise null. */
Expand Down
5 changes: 5 additions & 0 deletions frontend/components/connectors/shared-bucket-view.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export interface SharedBucketViewProps {
addTask: (id: string) => void;
onBack: () => void;
onDone: () => void;
/** When true, show the "Make documents available to all users" toggle. COS ingestion only. */
showShared?: boolean;
}

export function SharedBucketView({
Expand All @@ -35,6 +37,7 @@ export function SharedBucketView({
addTask,
onBack,
onDone,
showShared = false,
}: SharedBucketViewProps) {
const queryClient = useQueryClient();
const [selectedBuckets, setSelectedBuckets] = useState<Set<string>>(
Expand Down Expand Up @@ -76,6 +79,7 @@ export function SharedBucketView({
selected_files: [],
bucket_filter: Array.from(selectedBuckets),
settings: ingestSettings,
shared: showShared ? (ingestSettings.shared ?? false) : undefined,
},
},
{
Expand Down Expand Up @@ -239,6 +243,7 @@ export function SharedBucketView({
onOpenChange={setIsSettingsOpen}
settings={ingestSettings}
onSettingsChange={setIngestSettings}
showShared={showShared}
/>
</div>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export function IBMCOSBucketView({
addTask={addTask}
onBack={onBack}
onDone={onDone}
showShared
/>
);
}
13 changes: 13 additions & 0 deletions src/api/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ class ConnectorSyncBody(BaseModel):
# rather than failing. Set by the provider upload UI after the user confirms
# overwrite in the duplicate dialog.
replace_duplicates: bool = False
# When True (COS only), index chunks without an owner field so OpenSearch DLS
# makes them visible to all users in the instance. Temporary CIO mechanism;
# not a full ACL feature. Defaults to False (private).
shared: bool = False


class ConnectorCheckDuplicatesBody(BaseModel):
Expand Down Expand Up @@ -544,6 +548,12 @@ async def connector_sync(
)
jwt_token = user.jwt_token

if body.shared and connector_type != "ibm_cos":
return JSONResponse(
{"error": "shared flag is only supported for the ibm_cos connector"},
status_code=400,
)
Comment thread
ricofurtado marked this conversation as resolved.

# Get all active connections for this connector type and user
connections = await connector_service.connection_manager.list_connections(
user_id=user.user_id, connector_type=connector_type
Expand Down Expand Up @@ -611,6 +621,7 @@ async def connector_sync(
file_infos=file_infos,
ingest_settings=body.settings,
replace_duplicates=body.replace_duplicates,
shared=body.shared,
)
elif body.sync_all or body.bucket_filter:
# Full ingest: discover and ingest all files (or files from specific buckets).
Expand Down Expand Up @@ -652,6 +663,7 @@ async def connector_sync(
all_file_ids,
jwt_token=jwt_token,
ingest_settings=body.settings,
shared=body.shared,
)
else:
# sync_all: ingest everything the connector can see
Expand All @@ -660,6 +672,7 @@ async def connector_sync(
user.user_id,
max_files=max_files,
jwt_token=jwt_token,
shared=body.shared,
)
else:
# No files specified - sync only files already in OpenSearch for this connector
Expand Down
4 changes: 4 additions & 0 deletions src/connectors/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ async def sync_connector_files(
jwt_token: str = None,
filename_filter: set = None,
replace_duplicates: bool = False,
shared: bool = False,
) -> str:
"""
Sync files from a connector connection using existing task tracking system.
Expand Down Expand Up @@ -421,6 +422,7 @@ async def sync_connector_files(
),
models_service=self.models_service,
replace_duplicates=replace_duplicates,
shared=shared,
)

# Use file IDs as items (no more fake file paths!)
Expand Down Expand Up @@ -449,6 +451,7 @@ async def sync_specific_files(
file_infos: list[dict[str, Any]] = None,
ingest_settings: dict[str, Any] | None = None,
replace_duplicates: bool = False,
shared: bool = False,
) -> str:
"""
Sync specific files by their IDs (used for webhook-triggered syncs or manual selection).
Expand Down Expand Up @@ -582,6 +585,7 @@ async def sync_specific_files(
models_service=self.models_service,
ingest_settings=ingest_settings,
replace_duplicates=replace_duplicates,
shared=shared,
)

# Create custom task using TaskService
Expand Down
56 changes: 48 additions & 8 deletions src/models/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@
from connectors.base import DocumentACL


def resolve_shared_owner_fields(
user_id: str | None,
owner_name: str | None,
owner_email: str | None,
shared: bool,
) -> tuple[str | None, str | None, str | None]:
"""Return (owner, owner_name, owner_email) for indexing.

When shared=True, all three are None so the indexed chunk omits the owner
field entirely, triggering the OpenSearch DLS must_not-exists-owner clause
that makes the document visible to all users in the instance.
"""
if shared:
return None, None, None
return user_id, owner_name, owner_email


class TaskProcessor:
"""Base class for task processors with shared processing logic"""

Expand Down Expand Up @@ -159,13 +176,17 @@ async def delete_document_by_filename(
filename: str,
opensearch_client,
owner_user_id: str | None = None,
shared: bool = False,
) -> None:
"""
Delete all chunks of a document with the given filename from OpenSearch.
"""
from config.settings import clients, get_index_name
from utils.opensearch_delete import collect_visible_document_ids, delete_document_ids
from utils.opensearch_queries import build_owned_filename_query
from utils.opensearch_queries import (
build_owned_filename_query,
build_replace_filename_query,
)

try:
write_client = clients.opensearch
Expand All @@ -187,11 +208,18 @@ async def delete_document_by_filename(
filename=filename,
)
return

# When shared=True the document being replaced may have previously
# been ingested without an owner field (also shared), so the normal
# owner-scoped query would miss those chunks. Use a broader query
# that covers both owned and ownerless chunks for this filename.
build_query = build_replace_filename_query if shared else build_owned_filename_query

for candidate in candidate_filenames:
document_ids = await collect_visible_document_ids(
opensearch_client,
index=get_index_name(),
query=build_owned_filename_query(candidate, owner_user_id),
query=build_query(candidate, owner_user_id),
)
deleted_count += await delete_document_ids(
write_client,
Expand Down Expand Up @@ -223,6 +251,7 @@ async def process_document_standard(
is_sample_data: bool = False,
acl: "DocumentACL | None" = None,
connector_file_id: str | None = None,
shared: bool = False,
):
"""
Standard processing pipeline for non-Langflow processors:
Expand Down Expand Up @@ -374,9 +403,11 @@ async def process_document_standard(
error=str(e),
)

# Owner is always the authenticated uploading/syncing user. Upstream ACL
# owners/authors only contribute read access through allowed principals.
owner = owner_user_id
# Owner is always the authenticated uploading/syncing user unless shared=True,
# in which case owner fields are omitted so DLS makes the doc visible to all users.
owner, owner_name, owner_email = resolve_shared_owner_fields(
owner_user_id, owner_name, owner_email, shared
)
if acl:
allowed_users = acl.allowed_users or []
allowed_groups = acl.allowed_groups or []
Expand Down Expand Up @@ -603,6 +634,7 @@ def __init__(
models_service=None,
ingest_settings: dict[str, Any] | None = None,
replace_duplicates: bool = False,
shared: bool = False,
):
super().__init__(
document_service=document_service,
Expand All @@ -618,6 +650,7 @@ def __init__(
self.owner_email = owner_email
self.ingest_settings = ingest_settings
self.replace_duplicates = replace_duplicates
self.shared = shared

async def process_item(self, upload_task: UploadTask, item: str, file_task: FileTask) -> None:
"""Process a connector file using unified methods"""
Expand Down Expand Up @@ -757,6 +790,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
document.filename,
opensearch_client,
owner_user_id=self.user_id,
shared=self.shared,
)

# Create temporary file from document content
Expand Down Expand Up @@ -840,15 +874,20 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
{}, connector_tweak_settings
)

effective_owner, effective_owner_name, effective_owner_email = (
resolve_shared_owner_fields(
self.user_id, self.owner_name, self.owner_email, self.shared
)
)
result = await self.connector_service.langflow_service.upload_and_ingest_file(
file_tuple=file_tuple,
session_id=None,
tweaks=tweaks,
settings=self.ingest_settings,
jwt_token=self.jwt_token,
owner=self.user_id,
owner_name=self.owner_name,
owner_email=self.owner_email,
owner=effective_owner,
owner_name=effective_owner_name,
owner_email=effective_owner_email,
connector_type=connection.connector_type,
docling_polling_service=self.connector_service.task_service.docling_polling_service
if self.connector_service.task_service
Expand Down Expand Up @@ -890,6 +929,7 @@ async def process_item(self, upload_task: UploadTask, item: str, file_task: File
connector_type=connection.connector_type,
acl=document.acl,
connector_file_id=document.id,
shared=self.shared,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
**standard_kwargs,
)

Expand Down
3 changes: 2 additions & 1 deletion src/services/document_index_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ def _build_chunk_document(
else metadata.get("file_size"),
"connector_type": context.connector_type or metadata.get("connector_type") or "local",
"source_url": context.source_url or metadata.get("source_url") or "",
"owner": context.owner,
"allowed_users": list(context.allowed_users),
"allowed_groups": list(context.allowed_groups),
"allowed_principals": unique_acl_principals(context.allowed_principals),
Expand All @@ -219,6 +218,8 @@ def _build_chunk_document(
"metadata": metadata.get("metadata", {}),
}

if context.owner is not None:
doc["owner"] = context.owner
if context.owner_name is not None:
doc["owner_name"] = context.owner_name
if context.owner_email is not None:
Expand Down
14 changes: 7 additions & 7 deletions src/services/langflow_file_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ async def run_ingestion_flow(

# Pass metadata via tweaks to OpenSearch component
metadata_tweaks = []
if owner or owner is None:
if owner:
metadata_tweaks.append({"key": "owner", "value": owner})
if owner_name:
metadata_tweaks.append({"key": "owner_name", "value": owner_name})
Expand Down Expand Up @@ -421,9 +421,9 @@ async def run_ingestion_flow(

headers = {
"X-Langflow-Global-Var-JWT": str(jwt_token or ""),
"X-Langflow-Global-Var-OWNER": str(owner),
"X-Langflow-Global-Var-OWNER_NAME": str(owner_name),
"X-Langflow-Global-Var-OWNER_EMAIL": str(owner_email),
"X-Langflow-Global-Var-OWNER": owner or "",
"X-Langflow-Global-Var-OWNER_NAME": owner_name or "",
"X-Langflow-Global-Var-OWNER_EMAIL": owner_email or "",
"X-Langflow-Global-Var-CONNECTOR_TYPE": str(connector_type),
"X-Langflow-Global-Var-FILENAME": filename,
"X-Langflow-Global-Var-MIMETYPE": mimetype,
Expand Down Expand Up @@ -594,9 +594,9 @@ async def run_url_ingestion_flow(
resolved_document_id = hash_id(io.BytesIO(docs_url.encode("utf-8")))
headers = {
"X-Langflow-Global-Var-JWT": str(jwt_token or ""),
"X-Langflow-Global-Var-OWNER": str(owner),
"X-Langflow-Global-Var-OWNER_NAME": str(owner_name),
"X-Langflow-Global-Var-OWNER_EMAIL": str(owner_email),
"X-Langflow-Global-Var-OWNER": owner or "",
"X-Langflow-Global-Var-OWNER_NAME": owner_name or "",
"X-Langflow-Global-Var-OWNER_EMAIL": owner_email or "",
"X-Langflow-Global-Var-CONNECTOR_TYPE": str(connector_type),
"X-Langflow-Global-Var-SELECTED_EMBEDDING_MODEL": str(embedding_model),
"X-Langflow-Global-Var-DOCUMENT_ID": resolved_document_id,
Expand Down
Loading
Loading