Skip to content

Commit bbcae34

Browse files
committed
Fix race condition where streamed events could be lost
Events stream in via model_write listener while also being fetched from the database. If the DB fetch completed before all events were persisted, replaceModelsInStore would wipe out events that came in via model_write. Added mergeModelsInStore that adds fetched events without removing existing ones. Applied to HTTP, gRPC, and WebSocket event hooks.
1 parent 2a5587c commit bbcae34

File tree

6 files changed

+45
-9
lines changed

6 files changed

+45
-9
lines changed

crates/yaak-http/src/transaction.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,8 @@ mod tests {
342342

343343
#[tokio::test]
344344
async fn test_transaction_single_redirect() {
345-
let redirect_headers = vec![("Location".to_string(), "https://example.com/new".to_string())];
345+
let redirect_headers =
346+
vec![("Location".to_string(), "https://example.com/new".to_string())];
346347

347348
let responses = vec![
348349
MockResponse { status: 302, headers: redirect_headers, body: vec![] },
@@ -373,7 +374,8 @@ mod tests {
373374

374375
#[tokio::test]
375376
async fn test_transaction_max_redirects_exceeded() {
376-
let redirect_headers = vec![("Location".to_string(), "https://example.com/loop".to_string())];
377+
let redirect_headers =
378+
vec![("Location".to_string(), "https://example.com/loop".to_string())];
377379

378380
// Create more redirects than allowed
379381
let responses: Vec<MockResponse> = (0..12)
@@ -525,7 +527,8 @@ mod tests {
525527
_request: SendableHttpRequest,
526528
_event_tx: mpsc::Sender<HttpResponseEvent>,
527529
) -> Result<HttpResponse> {
528-
let headers = vec![("set-cookie".to_string(), "session=xyz789; Path=/".to_string())];
530+
let headers =
531+
vec![("set-cookie".to_string(), "session=xyz789; Path=/".to_string())];
529532

530533
let body_stream: Pin<Box<dyn AsyncRead + Send>> =
531534
Box::pin(std::io::Cursor::new(vec![]));
@@ -584,7 +587,10 @@ mod tests {
584587
let headers = vec![
585588
("set-cookie".to_string(), "session=abc123; Path=/".to_string()),
586589
("set-cookie".to_string(), "user_id=42; Path=/".to_string()),
587-
("set-cookie".to_string(), "preferences=dark; Path=/; Max-Age=86400".to_string()),
590+
(
591+
"set-cookie".to_string(),
592+
"preferences=dark; Path=/; Max-Age=86400".to_string(),
593+
),
588594
];
589595

590596
let body_stream: Pin<Box<dyn AsyncRead + Send>> =

crates/yaak-models/guest-js/store.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,22 @@ export function replaceModelsInStore<
206206
});
207207
}
208208

209+
export function mergeModelsInStore<
210+
M extends AnyModel['model'],
211+
T extends Extract<AnyModel, { model: M }>,
212+
>(model: M, models: T[]) {
213+
mustStore().set(modelStoreDataAtom, (prev: ModelStoreData) => {
214+
const existingModels = { ...prev[model] } as Record<string, T>;
215+
for (const m of models) {
216+
existingModels[m.id] = m;
217+
}
218+
return {
219+
...prev,
220+
[model]: existingModels,
221+
};
222+
});
223+
}
224+
209225
function shouldIgnoreModel({ model, updateSource }: ModelPayload) {
210226
// Never ignore updates from non-user sources
211227
if (updateSource.type !== 'window') {

src-web/hooks/useHttpResponseEvents.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { invoke } from '@tauri-apps/api/core';
22
import type { HttpResponse, HttpResponseEvent } from '@yaakapp-internal/models';
3-
import { httpResponseEventsAtom, replaceModelsInStore } from '@yaakapp-internal/models';
3+
import {
4+
httpResponseEventsAtom,
5+
mergeModelsInStore,
6+
replaceModelsInStore,
7+
} from '@yaakapp-internal/models';
48
import { useAtomValue } from 'jotai';
59
import { useEffect, useMemo } from 'react';
610

@@ -13,8 +17,10 @@ export function useHttpResponseEvents(response: HttpResponse | null) {
1317
return;
1418
}
1519

20+
// Use merge instead of replace to preserve events that came in via model_write
21+
// while we were fetching from the database
1622
invoke<HttpResponseEvent[]>('cmd_get_http_response_events', { responseId: response.id }).then(
17-
(events) => replaceModelsInStore('http_response_event', events),
23+
(events) => mergeModelsInStore('http_response_event', events),
1824
);
1925
}, [response?.id]);
2026

src-web/hooks/usePinnedGrpcConnection.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { GrpcConnection, GrpcEvent } from '@yaakapp-internal/models';
33
import {
44
grpcConnectionsAtom,
55
grpcEventsAtom,
6+
mergeModelsInStore,
67
replaceModelsInStore,
78
} from '@yaakapp-internal/models';
89
import { atom, useAtomValue } from 'jotai';
@@ -67,8 +68,10 @@ export function useGrpcEvents(connectionId: string | null) {
6768
return;
6869
}
6970

71+
// Use merge instead of replace to preserve events that came in via model_write
72+
// while we were fetching from the database
7073
invoke<GrpcEvent[]>('models_grpc_events', { connectionId }).then((events) => {
71-
replaceModelsInStore('grpc_event', events);
74+
mergeModelsInStore('grpc_event', events);
7275
});
7376
}, [connectionId]);
7477

src-web/hooks/usePinnedWebsocketConnection.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { invoke } from '@tauri-apps/api/core';
22
import type { WebsocketConnection, WebsocketEvent } from '@yaakapp-internal/models';
33
import {
4+
mergeModelsInStore,
45
replaceModelsInStore,
56
websocketConnectionsAtom,
67
websocketEventsAtom,
@@ -54,8 +55,10 @@ export function useWebsocketEvents(connectionId: string | null) {
5455
return;
5556
}
5657

58+
// Use merge instead of replace to preserve events that came in via model_write
59+
// while we were fetching from the database
5760
invoke<WebsocketEvent[]>('models_websocket_events', { connectionId }).then(
58-
(events) => replaceModelsInStore('websocket_event', events),
61+
(events) => mergeModelsInStore('websocket_event', events),
5962
);
6063
}, [connectionId]);
6164

src-web/lib/model_util.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ function makeEvent(
1111
id: 'test',
1212
model: 'http_response_event',
1313
responseId: 'resp',
14-
createdAt: Date.now(),
14+
workspaceId: 'ws',
15+
createdAt: new Date().toISOString(),
16+
updatedAt: new Date().toISOString(),
1517
event: { type, name, value } as HttpResponseEvent['event'],
1618
};
1719
}

0 commit comments

Comments
 (0)