Skip to content
2 changes: 2 additions & 0 deletions backend/airweave/api/v1/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
source_rate_limits,
sources,
sync,
sync_multiplex,
transformers,
usage,
users,
Expand Down Expand Up @@ -52,6 +53,7 @@
source_rate_limits.router, prefix="/source-rate-limits", tags=["source-rate-limits"]
)
api_router.include_router(sync.router, prefix="/sync", tags=["sync"])
api_router.include_router(sync_multiplex.router, prefix="/sync", tags=["sync-multiplex"])
api_router.include_router(entities.router, prefix="/entities", tags=["entities"])
api_router.include_router(entity_counts.router, prefix="/entity-counts", tags=["entity-counts"])
api_router.include_router(transformers.router, prefix="/transformers", tags=["transformers"])
Expand Down
206 changes: 206 additions & 0 deletions backend/airweave/api/v1/endpoints/sync_multiplex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
"""Endpoints for sync multiplexing (destination migrations).

Enables blue-green deployments for vector DB migrations:
- Fork: Add shadow destination + optionally replay from ARF
- Switch: Promote shadow to active
- List: Show all destinations with roles
- Resync: Force full sync from source to refresh ARF

Feature-gated: Requires SYNC_MULTIPLEXER feature flag enabled for the organization.
"""

from typing import List
from uuid import UUID

from fastapi import Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from airweave import schemas
from airweave.api import deps
from airweave.api.context import ApiContext
from airweave.api.router import TrailingSlashRouter
from airweave.core.shared_models import FeatureFlag
from airweave.platform.sync.multiplex.multiplexer import SyncMultiplexer

router = TrailingSlashRouter()


def _require_multiplexer_feature(ctx: ApiContext) -> None:
    """Ensure the caller's organization is allowed to use the sync multiplexer.

    Args:
        ctx: API context carrying the organization's feature flags.

    Raises:
        HTTPException: 403 when the SYNC_MULTIPLEXER flag is not enabled.
    """
    if ctx.has_feature(FeatureFlag.SYNC_MULTIPLEXER):
        return
    raise HTTPException(
        status_code=403,
        detail="Sync multiplexer feature is not enabled for this organization",
    )


@router.get(
    "/{sync_id}/destinations",
    response_model=List[schemas.DestinationSlotInfo],
    summary="List destination slots",
    description="List all destinations for a sync with their roles (active/shadow/deprecated).",
)
async def list_destinations(
    sync_id: UUID,
    db: AsyncSession = Depends(deps.get_db),
    ctx: ApiContext = Depends(deps.get_context),
) -> List[schemas.DestinationSlotInfo]:
    """Return every destination slot attached to the given sync.

    Slots are returned ordered by role: ACTIVE first, then SHADOW,
    then DEPRECATED.
    """
    _require_multiplexer_feature(ctx)
    return await SyncMultiplexer(db, ctx, ctx.logger).list_destinations(sync_id)


@router.post(
    "/{sync_id}/destinations/fork",
    response_model=schemas.ForkDestinationResponse,
    summary="Fork a new destination",
    description="Add a shadow destination for migration testing. Optionally replay from ARF store.",
)
async def fork_destination(
    sync_id: UUID,
    request: schemas.ForkDestinationRequest,
    db: AsyncSession = Depends(deps.get_db),
    ctx: ApiContext = Depends(deps.get_context),
) -> schemas.ForkDestinationResponse:
    """Attach a new SHADOW destination slot to a sync.

    When ``replay_from_arf`` is set on the request, a replay job is started
    to backfill the new destination from the ARF store.

    Args:
        sync_id: Sync to fork a destination for.
        request: Fork request carrying the destination connection ID and
            the replay flag.
        db: Database session.
        ctx: API context.

    Returns:
        ForkDestinationResponse describing the new slot plus, when a replay
        was started, the replay job's id and status.
    """
    _require_multiplexer_feature(ctx)
    multiplexer = SyncMultiplexer(db, ctx, ctx.logger)
    slot, replay_job = await multiplexer.fork(
        sync_id=sync_id,
        destination_connection_id=request.destination_connection_id,
        replay_from_arf=request.replay_from_arf,
    )

    # Surface replay-job details only when a replay was actually kicked off.
    job_id = None
    job_status = None
    if replay_job:
        job_id = replay_job.id
        job_status = replay_job.status.value

    return schemas.ForkDestinationResponse(
        slot=schemas.SyncConnectionSchema(
            id=slot.id,
            sync_id=slot.sync_id,
            connection_id=slot.connection_id,
            role=slot.role,
            created_at=slot.created_at,
            modified_at=slot.modified_at,
        ),
        replay_job_id=job_id,
        replay_job_status=job_status,
    )


@router.post(
    "/{sync_id}/destinations/{slot_id}/switch",
    response_model=schemas.SwitchDestinationResponse,
    summary="Switch active destination",
    description="Promote a shadow destination to active. The current active becomes deprecated.",
)
async def switch_destination(
    sync_id: UUID,
    slot_id: UUID,
    db: AsyncSession = Depends(deps.get_db),
    ctx: ApiContext = Depends(deps.get_context),
) -> schemas.SwitchDestinationResponse:
    """Promote a shadow slot to ACTIVE for the given sync.

    The previously ACTIVE slot is demoted to DEPRECATED as part of the
    same operation.

    Args:
        sync_id: Sync whose active destination is being switched.
        slot_id: Shadow slot to promote.
        db: Database session.
        ctx: API context.

    Returns:
        Switch response with the new and previous active slot IDs.
    """
    _require_multiplexer_feature(ctx)
    return await SyncMultiplexer(db, ctx, ctx.logger).switch(
        sync_id=sync_id, new_active_slot_id=slot_id
    )


@router.post(
    "/{sync_id}/resync",
    response_model=schemas.SyncJob,
    summary="Resync from source",
    description="Force a full sync from the source to refresh the ARF store.",
)
async def resync_from_source(
    sync_id: UUID,
    db: AsyncSession = Depends(deps.get_db),
    ctx: ApiContext = Depends(deps.get_context),
) -> schemas.SyncJob:
    """Kick off a full sync (ignoring the cursor) to refresh the ARF store.

    Intended to be run before forking a new destination so the ARF store is
    up to date.

    Args:
        sync_id: Sync to resync.
        db: Database session.
        ctx: API context.

    Returns:
        SyncJob that can be polled for progress.
    """
    _require_multiplexer_feature(ctx)
    return await SyncMultiplexer(db, ctx, ctx.logger).resync_from_source(sync_id=sync_id)


@router.get(
    "/{sync_id}/destinations/active",
    response_model=schemas.DestinationSlotInfo,
    summary="Get active destination",
    description="Get the currently active destination for a sync.",
)
async def get_active_destination(
    sync_id: UUID,
    db: AsyncSession = Depends(deps.get_db),
    ctx: ApiContext = Depends(deps.get_context),
) -> schemas.DestinationSlotInfo:
    """Look up the ACTIVE destination slot for the given sync.

    Args:
        sync_id: Sync ID.
        db: Database session.
        ctx: API context.

    Returns:
        Info about the currently active destination.

    Raises:
        HTTPException: 404 when the sync has no active destination.
    """
    _require_multiplexer_feature(ctx)
    active = await SyncMultiplexer(db, ctx, ctx.logger).get_active_destination(sync_id)
    if active:
        return active
    raise HTTPException(
        status_code=404,
        detail=f"No active destination found for sync {sync_id}",
    )
1 change: 1 addition & 0 deletions backend/airweave/core/shared_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class FeatureFlag(str, Enum):
PRIORITY_SUPPORT = "priority_support"
SOURCE_RATE_LIMITING = "source_rate_limiting"
ZEPHYR_SCALE = "zephyr_scale" # Enables Zephyr Scale test management sync for Jira
SYNC_MULTIPLEXER = "sync_multiplexer" # Destination multiplexing for migrations
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Rule violated: Check for Cursor Rules Drift

Missing frontend feature flag constant. Per the documented process in .cursor/rules/feature-flags.mdc, new feature flags must also be added to frontend/src/lib/constants/feature-flags.ts. The frontend constants file states it "Must match backend FeatureFlag enum exactly."

Add to frontend/src/lib/constants/feature-flags.ts:

// Sync Features
SYNC_MULTIPLEXER: 'sync_multiplexer',
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/core/shared_models.py, line 105:

<comment>Missing frontend feature flag constant. Per the documented process in `.cursor/rules/feature-flags.mdc`, new feature flags must also be added to `frontend/src/lib/constants/feature-flags.ts`. The frontend constants file states it &quot;Must match backend FeatureFlag enum exactly.&quot;

Add to `frontend/src/lib/constants/feature-flags.ts`:
```typescript
// Sync Features
SYNC_MULTIPLEXER: &#39;sync_multiplexer&#39;,
```</comment>

<file context>
@@ -102,6 +102,7 @@ class FeatureFlag(str, Enum):
     PRIORITY_SUPPORT = &quot;priority_support&quot;
     SOURCE_RATE_LIMITING = &quot;source_rate_limiting&quot;
     ZEPHYR_SCALE = &quot;zephyr_scale&quot;  # Enables Zephyr Scale test management sync for Jira
+    SYNC_MULTIPLEXER = &quot;sync_multiplexer&quot;  # Destination multiplexing for migrations
 
 
</file context>
Fix with Cubic



class AuthMethod(str, Enum):
Expand Down
8 changes: 5 additions & 3 deletions backend/airweave/core/source_connection_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Clean source connection service with auth method inference."""

from datetime import datetime
from typing import TYPE_CHECKING, Any, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from uuid import UUID

if TYPE_CHECKING:
Expand Down Expand Up @@ -1488,6 +1488,7 @@ async def run(
id: UUID,
ctx: ApiContext,
force_full_sync: bool = False,
execution_config: Optional[Dict[str, Any]] = None,
) -> schemas.SourceConnectionJob:
"""Trigger a sync run for a source connection.

Expand All @@ -1498,6 +1499,7 @@ async def run(
force_full_sync: If True, forces a full sync with orphaned entity cleanup.
Only allowed for continuous syncs (syncs with cursor data).
Raises HTTPException if used on non-continuous syncs.
execution_config: Optional execution config dict to persist in DB for worker
"""
source_conn = await crud.source_connection.get(db, id=id, ctx=ctx)
if not source_conn:
Expand Down Expand Up @@ -1541,9 +1543,9 @@ async def run(
db=db, source_connection=source_conn, ctx=ctx
)

# Trigger sync through Temporal only
# Trigger sync through Temporal only (stores execution_config in DB)
sync, sync_job = await sync_service.trigger_sync_run(
db, sync_id=source_conn.sync_id, ctx=ctx
db, sync_id=source_conn.sync_id, ctx=ctx, execution_config=execution_config
)

await temporal_service.run_source_connection_workflow(
Expand Down
21 changes: 18 additions & 3 deletions backend/airweave/core/sync_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Refactored sync service with Temporal-only execution."""

from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from uuid import UUID

from fastapi import HTTPException
Expand All @@ -15,6 +15,7 @@
from airweave.db.unit_of_work import UnitOfWork
from airweave.models.sync import Sync
from airweave.models.sync_job import SyncJob
from airweave.platform.sync.config import SyncExecutionConfig
from airweave.platform.sync.factory import SyncFactory
from airweave.platform.temporal.schedule_service import temporal_schedule_service

Expand Down Expand Up @@ -100,6 +101,7 @@ async def run(
ctx: ApiContext,
access_token: Optional[str] = None,
force_full_sync: bool = False,
execution_config: Optional[SyncExecutionConfig] = None,
) -> schemas.Sync:
"""Run a sync.

Expand All @@ -113,6 +115,8 @@ async def run(
access_token (Optional[str]): Optional access token to use
instead of stored credentials.
force_full_sync (bool): If True, forces a full sync with orphaned entity deletion.
execution_config (Optional[SyncExecutionConfig]): Optional execution config
for controlling sync behavior (destination filtering, handler toggles, etc.)

Returns:
-------
Expand All @@ -130,6 +134,7 @@ async def run(
ctx=ctx,
access_token=access_token,
force_full_sync=force_full_sync,
execution_config=execution_config,
)
except Exception as e:
ctx.logger.error(f"Error during sync orchestrator creation: {e}")
Expand All @@ -151,13 +156,15 @@ async def trigger_sync_run(
db: AsyncSession,
sync_id: UUID,
ctx: ApiContext,
execution_config: Optional[Dict[str, Any]] = None,
) -> Tuple[schemas.Sync, schemas.SyncJob]:
"""Trigger a manual sync run.

Args:
db: Database session
sync_id: Sync ID to run
ctx: API context
execution_config: Optional execution config dict to persist in DB

Returns:
Tuple of (sync, sync_job) schemas
Expand Down Expand Up @@ -192,7 +199,7 @@ async def trigger_sync_run(

# Create sync job
async with UnitOfWork(db) as uow:
sync_job = await self._create_sync_job(uow.session, sync_id, ctx, uow)
sync_job = await self._create_sync_job(uow.session, sync_id, ctx, uow, execution_config)

await uow.commit()
await uow.session.refresh(sync_job)
Expand All @@ -206,14 +213,22 @@ async def _create_sync_job(
sync_id: UUID,
ctx: ApiContext,
uow: UnitOfWork,
execution_config: Optional[Dict[str, Any]] = None,
) -> SyncJob:
"""Create a sync job record."""
ctx.logger.info(f"Creating sync job with execution_config: {execution_config}")
sync_job_in = schemas.SyncJobCreate(
sync_id=sync_id,
status=SyncJobStatus.PENDING,
execution_config_json=execution_config,
)
ctx.logger.info(f"SyncJobCreate schema: {sync_job_in.model_dump()}")

return await crud.sync_job.create(db, obj_in=sync_job_in, ctx=ctx, uow=uow)
result = await crud.sync_job.create(db, obj_in=sync_job_in, ctx=ctx, uow=uow)
ctx.logger.info(
f"Created sync job with execution_config_json: {result.execution_config_json}"
)
return result

async def list_sync_jobs(
self,
Expand Down
2 changes: 2 additions & 0 deletions backend/airweave/crud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .crud_source_connection import source_connection
from .crud_source_rate_limit import source_rate_limit
from .crud_sync import sync
from .crud_sync_connection import sync_connection
from .crud_sync_cursor import sync_cursor
from .crud_sync_job import sync_job
from .crud_transformer import transformer
Expand Down Expand Up @@ -48,6 +49,7 @@
"source_connection",
"source_rate_limit",
"sync",
"sync_connection",
"sync_cursor",
"sync_job",
"transformer",
Expand Down
Loading
Loading