Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions packages/adapters/database/src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ export async function getRebalanceOperations(
offset?: number,
filter?: {
status?: RebalanceOperationStatus | RebalanceOperationStatus[];
bridge?: string;
bridge?: string | string[];
chainId?: number;
earmarkId?: string | null;
invoiceId?: string;
Expand All @@ -738,9 +738,14 @@ export async function getRebalanceOperations(
paramCount++;
}

if (filter.bridge !== undefined) {
conditions.push(`ro."bridge" = $${paramCount}`);
values.push(filter.bridge);
if (filter.bridge) {
if (Array.isArray(filter.bridge)) {
conditions.push(`ro.bridge = ANY($${paramCount})`);
values.push(filter.bridge);
} else {
conditions.push(`ro.bridge = $${paramCount}`);
values.push(filter.bridge);
}
paramCount++;
}

Expand Down
16 changes: 16 additions & 0 deletions packages/adapters/rebalance/test/adapters/ccip/ccip.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,22 @@ describe('CCIPBridgeAdapter', () => {
expect(mockGetExecutionReceipts).toHaveBeenCalled(); // SDK should be called as fallback
});

it('falls back to SDK when Atlas API returns non-200, non-404 status', async () => {
mockFetch.mockResolvedValueOnce({
ok: false,
status: 500,
statusText: 'Internal Server Error',
json: async () => ({}),
} as Response);
mockGetExecutionReceipts.mockImplementation(async function* () {
yield mockExecutionReceipt;
});

const status = await adapter.getTransferStatus('0xhash', 1, 42161);
expect(status.status).toBe('SUCCESS');
expect(mockGetExecutionReceipts).toHaveBeenCalled(); // SDK should be called as fallback
});

it('returns PENDING when no execution receipts found (SDK fallback)', async () => {
mockGetExecutionReceipts.mockImplementation(async function* () {
// Empty generator - no receipts
Expand Down
2 changes: 2 additions & 0 deletions packages/poller/src/rebalance/mantleEth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,7 @@ export const executeMethCallbacks = async (context: ProcessingContext): Promise<
// Get all pending operations from database
const { operations } = await db.getRebalanceOperations(undefined, undefined, {
status: [RebalanceOperationStatus.PENDING, RebalanceOperationStatus.AWAITING_CALLBACK],
bridge: [SupportedBridge.Mantle, `${SupportedBridge.Across}-mantle`],
});

logger.debug(`Found ${operations.length} meth rebalance operations`, {
Expand Down Expand Up @@ -987,6 +988,7 @@ export const executeMethCallbacks = async (context: ProcessingContext): Promise<
logger.warn('Operation is not a mantle bridge', logContext);
continue;
}

const adapter = rebalance.getAdapter(bridgeType as SupportedBridge);

// Get origin transaction hash from JSON field
Expand Down
8 changes: 2 additions & 6 deletions packages/poller/src/rebalance/tacUsdt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1699,15 +1699,11 @@ const executeTacCallbacks = async (context: ProcessingContext): Promise<void> =>
const operationTtlMinutes = config.regularRebalanceOpTTLMinutes ?? DEFAULT_OPERATION_TTL_MINUTES;

// Get all pending TAC operations
const { operations } = await db.getRebalanceOperations(undefined, undefined, {
const { operations: tacOperations } = await db.getRebalanceOperations(undefined, undefined, {
status: [RebalanceOperationStatus.PENDING, RebalanceOperationStatus.AWAITING_CALLBACK],
bridge: [`${SupportedBridge.Stargate}-tac`, SupportedBridge.TacInner],
});

// Filter for TAC-related operations
const tacOperations = operations.filter(
(op) => op.bridge === 'stargate-tac' || op.bridge === SupportedBridge.TacInner,
);

// SERIALIZATION CHECK: Only allow one Leg 2 (TacInner) operation in-flight at a time
// This prevents mixing funds from multiple flows when they complete close together
const pendingTacInnerOps = tacOperations.filter(
Expand Down