feat(sync): add multiplexer module for destination migrations with database-persisted SyncExecutionConfig#1201
feat(sync): add multiplexer module for destination migrations with database-persisted SyncExecutionConfig#1201felixschmetz wants to merge 9 commits intomainfrom
Conversation
- Add SyncMultiplexer for managing multiple destinations per sync - Implement fork/switch/resync operations for blue-green deployments - Add ARFReplaySource for replaying entities from raw data store - Refactor SyncFactory into modular builders (_source, _destination, _context, _pipeline) - Add DestinationRole enum (ACTIVE, SHADOW, DEPRECATED) to SyncConnection - Add feature flag SYNC_MULTIPLEXER for gating access - Add CRUD layer for SyncConnection with role-based filtering - Add API endpoints for multiplex operations
This reverts commit 237988f.
Enables database-first approach for sync execution configuration. Config is persisted in DB and refetched by worker to avoid Temporal bloat.
There was a problem hiding this comment.
5 issues found across 32 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/airweave/platform/sync/orchestrator.py">
<violation number="1" location="backend/airweave/platform/sync/orchestrator.py:545">
P2: Rule violated: **Check for Cursor Rules Drift**
Cursor Rules Drift: The new `skip_cursor_updates` configuration option changes when cursors are saved, but `.cursor/rules/connector-cursors.mdc` states "Saves cursor after successful sync" without documenting this exception.
Consider updating the "System Behavior" section in `.cursor/rules/connector-cursors.mdc` to include:
- Saves cursor after successful sync (unless execution_config.skip_cursor_updates is set, e.g., for ARF-only syncs)
</file>
<file name="backend/airweave/core/shared_models.py">
<violation number="1" location="backend/airweave/core/shared_models.py:105">
P2: Rule violated: **Check for Cursor Rules Drift**
Missing frontend feature flag constant. Per the documented process in `.cursor/rules/feature-flags.mdc`, new feature flags must also be added to `frontend/src/lib/constants/feature-flags.ts`. The frontend constants file states it "Must match backend FeatureFlag enum exactly."
Add to `frontend/src/lib/constants/feature-flags.ts`:
```typescript
// Sync Features
SYNC_MULTIPLEXER: 'sync_multiplexer',
```</violation>
</file>
<file name="backend/airweave/platform/sync/factory/_factory.py">
<violation number="1" location="backend/airweave/platform/sync/factory/_factory.py:21">
P2: Rule violated: **Check for Cursor Rules Drift**
Cursor rule `.cursor/rules/sync-architecture.mdc` needs updating to reflect the new Builder-based factory architecture. The rule currently documents `SyncFactory` as monolithic but doesn't mention the new modular pattern with `SourceBuilder`, `DestinationBuilder`, `ContextBuilder`, and `PipelineBuilder` classes. Developers using Cursor will receive outdated guidance about sync factory structure.</violation>
</file>
<file name="backend/airweave/crud/crud_sync_connection.py">
<violation number="1" location="backend/airweave/crud/crud_sync_connection.py:179">
P0: `UnitOfWork` does not have a `flush()` method - this will raise `AttributeError` at runtime. Based on the base class pattern in `_base_organization.py`, when `uow` is provided, no action should be taken (the UoW context manager handles commit on exit).</violation>
</file>
<file name="backend/airweave/platform/sync/factory/_pipeline.py">
<violation number="1" location="backend/airweave/platform/sync/factory/_pipeline.py:94">
P1: The `self_processing_destinations` list is populated but never used. Destinations with `ProcessingRequirement.RAW_ENTITIES` are collected but no handler is created for them, meaning they will be silently ignored during sync operations.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| async def _save_cursor_data(self) -> None: | ||
| """Save cursor data to database if it exists.""" | ||
| # Skip cursor updates if configured (e.g., for ARF-only syncs) | ||
| if self.sync_context.execution_config and self.sync_context.execution_config.skip_cursor_updates: |
There was a problem hiding this comment.
P2: Rule violated: Check for Cursor Rules Drift
Cursor Rules Drift: The new skip_cursor_updates configuration option changes when cursors are saved, but .cursor/rules/connector-cursors.mdc states "Saves cursor after successful sync" without documenting this exception.
Consider updating the "System Behavior" section in .cursor/rules/connector-cursors.mdc to include:
- Saves cursor after successful sync (unless execution_config.skip_cursor_updates is set, e.g., for ARF-only syncs)
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/platform/sync/orchestrator.py, line 545:
<comment>Cursor Rules Drift: The new `skip_cursor_updates` configuration option changes when cursors are saved, but `.cursor/rules/connector-cursors.mdc` states "Saves cursor after successful sync" without documenting this exception.
Consider updating the "System Behavior" section in `.cursor/rules/connector-cursors.mdc` to include:
- Saves cursor after successful sync (unless execution_config.skip_cursor_updates is set, e.g., for ARF-only syncs)
<file context>
@@ -541,6 +541,13 @@ async def _complete_sync(self) -> None:
async def _save_cursor_data(self) -> None:
"""Save cursor data to database if it exists."""
+ # Skip cursor updates if configured (e.g., for ARF-only syncs)
+ if self.sync_context.execution_config and self.sync_context.execution_config.skip_cursor_updates:
+ self.sync_context.logger.info(
+ "⏭️ Skipping cursor update (disabled by execution_config)"
</file context>
| PRIORITY_SUPPORT = "priority_support" | ||
| SOURCE_RATE_LIMITING = "source_rate_limiting" | ||
| ZEPHYR_SCALE = "zephyr_scale" # Enables Zephyr Scale test management sync for Jira | ||
| SYNC_MULTIPLEXER = "sync_multiplexer" # Destination multiplexing for migrations |
There was a problem hiding this comment.
P2: Rule violated: Check for Cursor Rules Drift
Missing frontend feature flag constant. Per the documented process in .cursor/rules/feature-flags.mdc, new feature flags must also be added to frontend/src/lib/constants/feature-flags.ts. The frontend constants file states it "Must match backend FeatureFlag enum exactly."
Add to frontend/src/lib/constants/feature-flags.ts:
// Sync Features
SYNC_MULTIPLEXER: 'sync_multiplexer',Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/core/shared_models.py, line 105:
<comment>Missing frontend feature flag constant. Per the documented process in `.cursor/rules/feature-flags.mdc`, new feature flags must also be added to `frontend/src/lib/constants/feature-flags.ts`. The frontend constants file states it "Must match backend FeatureFlag enum exactly."
Add to `frontend/src/lib/constants/feature-flags.ts`:
```typescript
// Sync Features
SYNC_MULTIPLEXER: 'sync_multiplexer',
```</comment>
<file context>
@@ -102,6 +102,7 @@ class FeatureFlag(str, Enum):
PRIORITY_SUPPORT = "priority_support"
SOURCE_RATE_LIMITING = "source_rate_limiting"
ZEPHYR_SCALE = "zephyr_scale" # Enables Zephyr Scale test management sync for Jira
+ SYNC_MULTIPLEXER = "sync_multiplexer" # Destination multiplexing for migrations
</file context>
| from airweave.db.init_db_native import init_db_with_entity_definitions | ||
| from airweave.platform.entities._base import BaseEntity | ||
| from airweave.platform.sync.config import SyncExecutionConfig | ||
| from airweave.platform.sync.factory._context import ContextBuilder |
There was a problem hiding this comment.
P2: Rule violated: Check for Cursor Rules Drift
Cursor rule .cursor/rules/sync-architecture.mdc needs updating to reflect the new Builder-based factory architecture. The rule currently documents SyncFactory as monolithic but doesn't mention the new modular pattern with SourceBuilder, DestinationBuilder, ContextBuilder, and PipelineBuilder classes. Developers using Cursor will receive outdated guidance about sync factory structure.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/platform/sync/factory/_factory.py, line 21:
<comment>Cursor rule `.cursor/rules/sync-architecture.mdc` needs updating to reflect the new Builder-based factory architecture. The rule currently documents `SyncFactory` as monolithic but doesn't mention the new modular pattern with `SourceBuilder`, `DestinationBuilder`, `ContextBuilder`, and `PipelineBuilder` classes. Developers using Cursor will receive outdated guidance about sync factory structure.</comment>
<file context>
@@ -0,0 +1,186 @@
+from airweave.db.init_db_native import init_db_with_entity_definitions
+from airweave.platform.entities._base import BaseEntity
+from airweave.platform.sync.config import SyncExecutionConfig
+from airweave.platform.sync.factory._context import ContextBuilder
+from airweave.platform.sync.factory._destination import DestinationBuilder
+from airweave.platform.sync.factory._pipeline import PipelineBuilder
</file context>
| db.add(db_obj) | ||
|
|
||
| if uow: | ||
| await uow.flush() |
There was a problem hiding this comment.
P0: UnitOfWork does not have a flush() method - this will raise AttributeError at runtime. Based on the base class pattern in _base_organization.py, when uow is provided, no action should be taken (the UoW context manager handles commit on exit).
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/crud/crud_sync_connection.py, line 179:
<comment>`UnitOfWork` does not have a `flush()` method - this will raise `AttributeError` at runtime. Based on the base class pattern in `_base_organization.py`, when `uow` is provided, no action should be taken (the UoW context manager handles commit on exit).</comment>
<file context>
@@ -0,0 +1,285 @@
+ db.add(db_obj)
+
+ if uow:
+ await uow.flush()
+ else:
+ await db.commit()
</file context>
✅ Addressed in 7252404
| enable_postgres = config is None or config.enable_postgres_handler | ||
|
|
||
| vector_db_destinations: list[BaseDestination] = [] | ||
| self_processing_destinations: list[BaseDestination] = [] |
There was a problem hiding this comment.
P1: The self_processing_destinations list is populated but never used. Destinations with ProcessingRequirement.RAW_ENTITIES are collected but no handler is created for them, meaning they will be silently ignored during sync operations.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/platform/sync/factory/_pipeline.py, line 94:
<comment>The `self_processing_destinations` list is populated but never used. Destinations with `ProcessingRequirement.RAW_ENTITIES` are collected but no handler is created for them, meaning they will be silently ignored during sync operations.</comment>
<file context>
@@ -0,0 +1,142 @@
+ enable_postgres = config is None or config.enable_postgres_handler
+
+ vector_db_destinations: list[BaseDestination] = []
+ self_processing_destinations: list[BaseDestination] = []
+
+ for dest in destinations:
</file context>
There was a problem hiding this comment.
1 issue found across 8 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/airweave/crud/crud_sync_connection.py">
<violation number="1" location="backend/airweave/crud/crud_sync_connection.py:179">
P1: Replacing `await uow.flush()` with `pass` breaks the Unit of Work pattern. Changes added via `db.add()` won't be flushed to the database, meaning auto-generated IDs won't be populated and subsequent operations in the same transaction won't see the new object. The correct pattern (used elsewhere in the codebase) is `await uow.session.flush()`.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| db.add(db_obj) | ||
|
|
||
| if uow: | ||
| pass |
There was a problem hiding this comment.
P1: Replacing await uow.flush() with pass breaks the Unit of Work pattern. Changes added via db.add() won't be flushed to the database, meaning auto-generated IDs won't be populated and subsequent operations in the same transaction won't see the new object. The correct pattern (used elsewhere in the codebase) is await uow.session.flush().
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/crud/crud_sync_connection.py, line 179:
<comment>Replacing `await uow.flush()` with `pass` breaks the Unit of Work pattern. Changes added via `db.add()` won't be flushed to the database, meaning auto-generated IDs won't be populated and subsequent operations in the same transaction won't see the new object. The correct pattern (used elsewhere in the codebase) is `await uow.session.flush()`.</comment>
<file context>
@@ -176,7 +176,7 @@ async def create(
if uow:
- await uow.flush()
+ pass
else:
await db.commit()
</file context>
Summary by cubic
Adds a config-based destination multiplexer for syncs to support blue-green migrations, with ARF replay and DB-persisted execution config to control sync behavior. Teams can fork a shadow destination, validate, and switch without re-pulling from sources.
New Features
Migration
Written for commit 03b3766. Summary will update on new commits.