
feat(sync): SyncMultiplexer for destination migrations #1194

Closed

orhanrauf wants to merge 1 commit into feat/raw-data-capture from feat/sync-multiplexer

Conversation

orhanrauf (Member) commented Dec 29, 2025

Summary

Adds SyncMultiplexer for managing destination migrations (blue-green deployments) and refactors SyncFactory into modular builders.

Depends on: #1189 (ARF raw data capture)


🔀 SyncMultiplexer

Enables blue-green vector DB migrations (Qdrant → Vespa, config v0 → v1, etc.)

Operations:

| Operation | Description                                |
|-----------|--------------------------------------------|
| fork      | Add shadow destination + replay from ARF   |
| switch    | Promote shadow → active (demotes current)  |
| resync    | Force full sync to refresh ARF store       |
| list      | Show all destinations with roles           |

Destination roles: ACTIVE / SHADOW / DEPRECATED

API: /sync-multiplex/{sync_id}/destinations/...

Gating: Requires SYNC_MULTIPLEXER feature flag
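The role model and the `switch` operation from the table above can be sketched as follows. This is an illustrative sketch only: the enum values and the pure-function shape are assumptions, and the real multiplexer persists role changes on `SyncConnection` rows rather than returning a dict.

```python
from enum import Enum


class DestinationRole(str, Enum):
    """Role of a destination connection within a sync (per the PR's model change)."""

    ACTIVE = "active"          # serves the sync today
    SHADOW = "shadow"          # being backfilled via ARF replay
    DEPRECATED = "deprecated"  # demoted after a switch


def switch(
    destinations: dict[str, DestinationRole], shadow_id: str
) -> dict[str, DestinationRole]:
    """Promote the shadow destination to active, demoting the current active.

    Pure-function sketch of the `switch` operation; IDs and persistence
    are handled differently in the actual implementation.
    """
    updated: dict[str, DestinationRole] = {}
    for dest_id, role in destinations.items():
        if dest_id == shadow_id:
            updated[dest_id] = DestinationRole.ACTIVE
        elif role == DestinationRole.ACTIVE:
            updated[dest_id] = DestinationRole.DEPRECATED
        else:
            updated[dest_id] = role
    return updated
```

The point of the pure-function framing is that a `switch` is atomic at the role level: exactly one destination ends up ACTIVE, and the previous ACTIVE is demoted rather than deleted.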


🏗️ Factory Refactor

Split monolithic SyncFactory into focused builders:

sync/factory/
├── __init__.py      # Exposes SyncFactory
├── _factory.py      # Main orchestrator creation
├── _source.py       # Auth, tokens, HTTP client
├── _destination.py  # Connections, native Qdrant
├── _context.py      # SyncContext + ReplayContext
└── _pipeline.py     # Handler assembly
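The split above suggests a thin orchestrator that delegates to one builder per concern. The sketch below uses stub builders mirroring the module layout; all class and method names here are hypothetical stand-ins, not the real airweave signatures.

```python
# Illustrative stubs only — real builder signatures in airweave may differ.
class SourceBuilder:
    @staticmethod
    def build(sync):
        return f"source:{sync}"          # _source.py: auth, tokens, HTTP client


class DestinationBuilder:
    @staticmethod
    def build(sync):
        return [f"dest:{sync}"]          # _destination.py: connections, native Qdrant


class ContextBuilder:
    @staticmethod
    def build(sync, source):
        return {"sync": sync, "source": source}  # _context.py: SyncContext


class PipelineBuilder:
    @staticmethod
    def build(context, destinations):
        return {"context": context, "handlers": destinations}  # _pipeline.py


class SyncFactory:
    """Thin orchestrator: each concern lives in its own builder module."""

    @staticmethod
    def create(sync):
        source = SourceBuilder.build(sync)
        destinations = DestinationBuilder.build(sync)
        context = ContextBuilder.build(sync, source)
        return PipelineBuilder.build(context, destinations)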

Changes

New:

  • platform/sync/multiplex/ - Multiplexer + ARF replay
  • platform/sync/factory/ - Modular builders
  • api/v1/endpoints/sync_multiplex.py - API endpoints
  • crud/crud_sync_connection.py - Role-based filtering
  • schemas/sync_connection.py - Request/response models
  • Migration: add_role_to_sync_connection.py

Modified:

  • models/sync_connection.py - Added DestinationRole enum + role column
  • core/shared_models.py - Added SYNC_MULTIPLEXER feature flag

Summary by cubic

Introduces SyncMultiplexer for blue-green destination migrations with ARF replay, enabling safe vector DB switches without downtime. Refactors SyncFactory into modular builders to simplify orchestration and reuse.

  • New Features

    • SyncMultiplexer to manage destination roles (ACTIVE, SHADOW, DEPRECATED).
    • Fork, switch, resync, and list operations; gated by SYNC_MULTIPLEXER; API endpoints added.
    • ARF replay to populate shadow destinations without re-pulling from sources.
    • DB migration adds role column to sync_connection; new CRUD and schemas for role-based filtering.
  • Refactors

    • Split SyncFactory into modular builders (_source, _destination, _context, _pipeline).
    • Builders reused for replay (ARFReplaySource, replay_to_destination).
    • API router updated to include multiplex routes.
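The ARF replay idea — treating the raw-data store as a pseudo-source so a shadow destination can be populated without re-pulling from the upstream connector — can be sketched like this. The interfaces are assumptions inferred from the names `ARFReplaySource` and `replay_to_destination`; the real classes live under `platform/sync/multiplex/`.

```python
from typing import AsyncIterator, Iterable


class ARFReplaySource:
    """Pseudo-source yielding previously captured entities instead of
    calling the upstream API. Sketch only."""

    def __init__(self, stored_entities: Iterable[dict]):
        self._stored = stored_entities

    async def generate_entities(self) -> AsyncIterator[dict]:
        for entity in self._stored:
            yield entity


async def replay_to_destination(source: ARFReplaySource, destination: list) -> int:
    """Drain the replay source into a (stand-in) destination; returns the count."""
    count = 0
    async for entity in source.generate_entities():
        destination.append(entity)  # real code would bulk-insert into the vector DB
        count += 1
    return count
```

Because the replay source implements the same async-generator contract as a normal source, the existing pipeline builders can be reused unchanged for backfilling shadows.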

Written for commit 6db4ca7. Summary will update automatically on new commits.

- Add SyncMultiplexer for managing multiple destinations per sync
- Implement fork/switch/resync operations for blue-green deployments
- Add ARFReplaySource for replaying entities from raw data store
- Refactor SyncFactory into modular builders (_source, _destination, _context, _pipeline)
- Add DestinationRole enum (ACTIVE, SHADOW, DEPRECATED) to SyncConnection
- Add feature flag SYNC_MULTIPLEXER for gating access
- Add CRUD layer for SyncConnection with role-based filtering
- Add API endpoints for multiplex operations
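Role-based filtering in the new CRUD layer amounts to a query over `sync_connection` constrained by `sync_id` and `role`. The in-memory stand-in below illustrates the shape; the actual CRUD uses SQLAlchemy, and the function name here is an assumption.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class SyncConnection:
    id: UUID
    sync_id: UUID
    role: str  # "active" | "shadow" | "deprecated"


def get_by_sync_and_role(
    rows: list[SyncConnection], sync_id: UUID, role: str
) -> list[SyncConnection]:
    """In-memory stand-in for:
    SELECT * FROM sync_connection WHERE sync_id = :sync_id AND role = :role
    """
    return [r for r in rows if r.sync_id == sync_id and r.role == role]
```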
cubic-dev-ai (bot) left a comment

8 issues found across 21 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/airweave/platform/sync/factory/__init__.py">

<violation number="1" location="backend/airweave/platform/sync/factory/__init__.py:15">
P2: Rule violated: **Check for Cursor Rules Drift**

The sync-architecture cursor rule needs updating to reflect these architectural changes. The rule at `.cursor/rules/sync-architecture.mdc` describes `SyncFactory` as a monolithic factory but the PR refactors it into modular builders (`_factory.py`, `_source.py`, `_destination.py`, `_context.py`, `_pipeline.py`). Additionally, the new `SyncMultiplexer` for blue-green destination migrations (with `ACTIVE`/`SHADOW`/`DEPRECATED` roles) is not documented.

Consider updating the cursor rule to:
1. Document the new modular factory structure under `platform/sync/factory/`
2. Add a section for internal builders and their purposes
3. Document the SyncMultiplexer component and destination migration workflow</violation>
</file>

<file name="backend/airweave/platform/sync/multiplex/replay.py">

<violation number="1" location="backend/airweave/platform/sync/multiplex/replay.py:25">
P2: Rule violated: **Check for Cursor Rules Drift**

Cursor rules drift detected: The sync architecture documentation needs updating to reflect the factory refactor and ARF replay capabilities.

**Affected rules:**
- `.cursor/rules/sync-architecture.mdc` - Documents `SyncFactory` as monolithic, but PR refactors into modular builders (`DestinationBuilder`, `ReplayContextBuilder`, `PipelineBuilder`)
- `.cursor/rules/arf.mdc` - Only documents ARF capture (write), missing replay capabilities (`ARFReplaySource`, `iter_entities_for_replay`, `get_replay_stats`)

**Missing documentation:**
- New `sync/factory/` module structure with builders
- `SyncMultiplexer` for blue-green destination migrations
- ARF replay workflow and `ARFReplaySource` pseudo-source

Consider updating these rules to prevent AI assistants from generating outdated patterns.</violation>

<violation number="2" location="backend/airweave/platform/sync/multiplex/replay.py:141">
P2: Raising `ValueError` for a missing ARF store may surface as a 500. Prefer `NotFoundException` (or a domain-specific exception that your exception handlers map) so the API returns a predictable status code.</violation>
</file>

<file name="backend/airweave/platform/sync/factory/_context.py">

<violation number="1" location="backend/airweave/platform/sync/factory/_context.py:105">
P1: `SyncContext.connection` is set to `None`, but the orchestrator later dereferences `sync_context.connection.*`. Pass the real source connection schema here (e.g., from `source_connection_data`).</violation>
</file>

<file name="backend/airweave/platform/sync/factory/_pipeline.py">

<violation number="1" location="backend/airweave/platform/sync/factory/_pipeline.py:92">
P0: RAW_ENTITIES destinations are collected but never wired into a handler, so self-processing destinations (e.g., Vespa) won’t receive any inserts/updates/deletes.</violation>

<violation number="2" location="backend/airweave/platform/sync/factory/_pipeline.py:116">
P3: Dead code: `if not handlers and logger:` is unreachable because `PostgresMetadataHandler()` is always appended.</violation>
</file>

<file name="backend/airweave/crud/crud_sync_connection.py">

<violation number="1" location="backend/airweave/crud/crud_sync_connection.py:21">
P1: This CRUD bypasses the project’s standard ApiContext/org access validation and relies on callers to enforce authorization. That’s a security footgun for cross-org reads/updates/deletes. Prefer requiring `ctx: ApiContext` and validating access (e.g., via joining `Sync` to check `organization_id`) inside these methods.</violation>
</file>

<file name="backend/airweave/platform/sync/multiplex/multiplexer.py">

<violation number="1" location="backend/airweave/platform/sync/multiplex/multiplexer.py:116">
P1: `sync.destination_connection_ids` appears to not exist in the current Sync model, so `fork()` will crash with AttributeError. Guard this update (or remove it) if the field isn’t present.</violation>
</file>
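For the last violation, the suggested guard can be as simple as a `getattr` with a `None` default — an illustrative sketch, not the reviewed code:

```python
def append_destination_id(sync, destination_connection_id):
    """Only touch sync.destination_connection_ids if the model defines it,
    avoiding an AttributeError on Sync models without the legacy field."""
    current = getattr(sync, "destination_connection_ids", None)
    if current is None:
        return  # field absent or unset — nothing to update
    if destination_connection_id not in current:
        sync.destination_connection_ids = [*current, destination_connection_id]
```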



orhanrauf closed this Jan 22, 2026