Skip to content

Commit daf05b4

Browse files
committed
fix intermittent issue
this fixes the following types of runtime errs: ``` NATS connection successful 2 nats.tsx:128:17Draining NATS connection nats.tsx:168:17Failed to create consumer in stream "noti": DrainingConnectionError: connection draining DrainingConnectionError errors.ts:152 _check nats.ts:89 request nats.ts:373 _request jsbaseclient_api.ts:119 addUpdate jsmconsumer_api.ts:163 add jsmconsumer_api.ts:180 createConsumer useConsumer.ts:68 useConsumer useConsumer.ts:89 React 37 performWorkUntilDeadline scheduler.development.js:45 js scheduler.development.js:223 js scheduler.development.js:364 __require2 chunk-PR4QN5HX.js:14 js index.js:6 __require2 chunk-PR4QN5HX.js:14 React 2 __require2 chunk-PR4QN5HX.js:14 js React __require2 chunk-PR4QN5HX.js:14 <anonymous> react-dom_client.js:20192useConsumer.ts:85:17Watch error for bucket stat: DrainingConnectionError: connection draining DrainingConnectionError errors.ts:152 _check nats.ts:89 request nats.ts:373 _request jsbaseclient_api.ts:119 addUpdate jsmconsumer_api.ts:163 add jsmconsumer_api.ts:180 getOrderedPushConsumer jsmstream_api.ts:206 getPushConsumer jsmstream_api.ts:154 watch kv.ts:916 useBucketWatch useBucketWatch.ts:63 useBucketWatch useBucketWatch.ts:119 React 33 performWorkUntilDeadline scheduler.development.js:45 js scheduler.development.js:223 js scheduler.development.js:364 __require2 chunk-PR4QN5HX.js:14 js index.js:6 __require2 chunk-PR4QN5HX.js:14 React 2 __require2 chunk-PR4QN5HX.js:14 js React __require2 chunk-PR4QN5HX.js:14 <anonymous> react-dom_client.js:20192useBucketWatch.ts:114:19Watch error for bucket stat: DrainingConnectionError: connection draining DrainingConnectionError errors.ts:152 _check nats.ts:89 request nats.ts:373 _request jsbaseclient_api.ts:119 addUpdate jsmconsumer_api.ts:163 add jsmconsumer_api.ts:180 getOrderedPushConsumer jsmstream_api.ts:206 getPushConsumer jsmstream_api.ts:154 watch kv.ts:916 useBucketWatch useBucketWatch.ts:63 useBucketWatch useBucketWatch.ts:119 React 29 performWorkUntilDeadline scheduler.development.js:45 js scheduler.development.js:223 js scheduler.development.js:364 __require2 chunk-PR4QN5HX.js:14 js index.js:6 __require2 chunk-PR4QN5HX.js:14 React 2 __require2 chunk-PR4QN5HX.js:14 js React __require2 chunk-PR4QN5HX.js:14 <anonymous> react-dom_client.js:20192useBucketWatch.ts:114:19 useBucketWatch useBucketWatch.ts:114 useBucketWatch useBucketWatch.ts:119 React 29 performWorkUntilDeadline scheduler.development.js:45 (Async: EventHandlerNonNull) js scheduler.development.js:223 js scheduler.development.js:364 __require2 chunk-PR4QN5HX.js:14 js index.js:6 __require2 chunk-PR4QN5HX.js:14 React 2 __require2 chunk-PR4QN5HX.js:14 js React __require2 chunk-PR4QN5HX.js:14 <anonymous> react-dom_client.js:20192Watch error for bucket stat: ClosedConnectionError: closed connection ClosedConnectionError errors.ts:135 _check nats.ts:86 request nats.ts:373 _request jsbaseclient_api.ts:119 addUpdate jsmconsumer_api.ts:163 add jsmconsumer_api.ts:180 getOrderedPushConsumer jsmstream_api.ts:206 getPushConsumer jsmstream_api.ts:154 watch kv.ts:916 useBucketWatch useBucketWatch.ts:63useBucketWatch.ts:114:19Watch error for bucket stat: ClosedConnectionError: closed connection ClosedConnectionError errors.ts:135 _check nats.ts:86 request nats.ts:373 _request jsbaseclient_api.ts:119 addUpdate jsmconsumer_api.ts:163 add jsmconsumer_api.ts:180 getOrderedPushConsumer jsmstream_api.ts:206 getPushConsumer jsmstream_api.ts:154 watch kv.ts:916 useBucketWatch useBucketWatch.ts:63useBucketWatch.ts:114:19NATS connection drained and closed nats.tsx:172:21 ``` sdf
1 parent 4603560 commit daf05b4

File tree

6 files changed

+201
-103
lines changed

6 files changed

+201
-103
lines changed

frontend/interactEM/src/contexts/nats.tsx

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,22 +65,26 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
6565
isConnected: false,
6666
})
6767

68-
const [natsConnection, setNatsConnection] = useState<NatsConnection | null>(
69-
null,
70-
)
7168
const { token, natsJwt, isAuthenticated } = useAuth()
7269
const tokenRef = useRef(token)
7370
const natsJwtRef = useRef(natsJwt)
71+
const hasConnectedRef = useRef(false)
72+
const connectionRef = useRef<NatsConnection | null>(null)
7473

7574
useEffect(() => {
7675
tokenRef.current = token
7776
}, [token])
7877

78+
useEffect(() => {
79+
natsJwtRef.current = natsJwt
80+
}, [natsJwt])
81+
7982
useEffect(() => {
8083
if (!isAuthenticated) {
8184
return
8285
}
83-
async function setupNatsServices(nc: NatsConnection) {
86+
87+
async function setupNatsServices(nc: NatsConnection): Promise<boolean> {
8488
try {
8589
const js = jetstream(nc)
8690
const jsm = await jetstreamManager(nc)
@@ -93,11 +97,14 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
9397
keyValueManager: kvm,
9498
isConnected: true,
9599
})
100+
return true
96101
} catch (error) {
97102
console.error("Failed to setup NATS services:", error)
98103
setState((prev) => ({ ...prev, isConnected: false }))
104+
return false
99105
}
100106
}
107+
101108
async function connect() {
102109
try {
103110
const servers = Array.isArray(natsServers) ? natsServers : [natsServers]
@@ -125,11 +132,20 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
125132
maxReconnectAttempts: 30,
126133
})
127134

135+
connectionRef.current = nc
128136
console.log("NATS connection successful")
129137

130-
setNatsConnection(nc)
131-
await setupNatsServices(nc)
132-
138+
const setupOk = await setupNatsServices(nc)
139+
if (!setupOk) {
140+
try {
141+
await nc.drain()
142+
} catch (err) {
143+
console.error("Error draining NATS connection:", err)
144+
}
145+
connectionRef.current = null
146+
hasConnectedRef.current = false
147+
return
148+
}
133149
// natsConnection will cycle through the following status sequence when
134150
// it is disconnected:
135151
// 1. Error
@@ -145,30 +161,44 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
145161
setState((prev) => ({ ...prev, isConnected: true }))
146162
break
147163
case "error":
148-
// TODO: handle error better, maybe with UI update
149-
setState((prev) => ({
150-
...prev,
151-
isConnected: false,
152-
}))
164+
case "disconnect":
165+
case "staleConnection":
166+
case "close":
167+
setState((prev) => ({ ...prev, isConnected: false }))
153168
break
154169
}
155170
}
156171
})().catch(console.error)
157172
} catch (error) {
158173
console.error("Failed to connect to NATS:", error)
159174
setState((prev) => ({ ...prev, isConnected: false }))
175+
hasConnectedRef.current = false
176+
const nc = connectionRef.current
177+
connectionRef.current = null
178+
if (nc) {
179+
try {
180+
await nc.drain()
181+
} catch (err) {
182+
console.error("Error draining NATS connection:", err)
183+
}
184+
}
160185
}
161186
}
162187

163-
if (!natsConnection) {
188+
if (!hasConnectedRef.current) {
189+
hasConnectedRef.current = true
164190
connect()
165191
}
192+
166193
return () => {
167-
if (natsConnection) {
194+
hasConnectedRef.current = false
195+
const nc = connectionRef.current
196+
connectionRef.current = null
197+
if (nc) {
168198
console.log("Draining NATS connection")
169199
;(async () => {
170200
try {
171-
await natsConnection.drain()
201+
await nc.drain()
172202
console.log("NATS connection drained and closed")
173203
} catch (err) {
174204
console.error("Error draining NATS connection:", err)
@@ -183,7 +213,7 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
183213
isConnected: false,
184214
})
185215
}
186-
}, [isAuthenticated, natsConnection, natsServers])
216+
}, [isAuthenticated, natsServers])
187217

188218
if (!isAuthenticated) {
189219
return null
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import {
2+
ClosedConnectionError,
3+
DrainingConnectionError,
4+
} from "@nats-io/nats-core"
5+
6+
export function isConnectionError(err: unknown): boolean {
7+
return (
8+
err instanceof DrainingConnectionError ||
9+
err instanceof ClosedConnectionError
10+
)
11+
}
Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,57 @@
11
import type { KV } from "@nats-io/kv"
2-
import { DrainingConnectionError } from "@nats-io/nats-core"
32
import { useEffect, useRef, useState } from "react"
43
import { useNats } from "../../contexts/nats"
4+
import { isConnectionError } from "./natsErrors"
55

66
export const useBucket = (bucketName: string): KV | null => {
7-
const { keyValueManager } = useNats()
7+
const { keyValueManager, isConnected } = useNats()
88
const [bucket, setBucket] = useState<KV | null>(null)
9-
const isMounted = useRef(true)
9+
const isMountedRef = useRef(true)
1010

1111
useEffect(() => {
12-
isMounted.current = true
12+
isMountedRef.current = true
13+
// Create abort signal for this effect instance to handle race conditions
14+
// when connection drops mid-open and reconnects before promise settles
15+
const abortController = new AbortController()
1316

1417
const openBucket = async () => {
15-
if (keyValueManager && !bucket) {
16-
try {
17-
const openedBucket = await keyValueManager.open(bucketName)
18-
if (isMounted.current) {
19-
setBucket(openedBucket)
20-
}
21-
} catch (error) {
22-
if (error instanceof DrainingConnectionError) {
23-
// quietly ignore if connection is draining
24-
return
25-
}
26-
console.error(`Failed to open bucket "${bucketName}":`, error)
18+
if (!keyValueManager || !isConnected) {
19+
return
20+
}
21+
22+
try {
23+
const openedBucket = await keyValueManager.open(bucketName)
24+
25+
// Check if this effect instance was aborted before setting state
26+
if (abortController.signal.aborted) {
27+
return
28+
}
29+
30+
if (isMountedRef.current) {
31+
setBucket(openedBucket)
32+
}
33+
} catch (error) {
34+
// Check if this effect instance was aborted before handling error
35+
if (abortController.signal.aborted) {
36+
return
37+
}
38+
39+
if (isConnectionError(error)) {
40+
// Silently ignore connection lifecycle errors
41+
return
2742
}
43+
console.error(`Failed to open bucket "${bucketName}":`, error)
2844
}
2945
}
3046

3147
openBucket()
3248

3349
return () => {
34-
isMounted.current = false
50+
isMountedRef.current = false
51+
abortController.abort()
52+
setBucket(null)
3553
}
36-
}, [keyValueManager, bucket, bucketName])
54+
}, [keyValueManager, bucketName, isConnected])
3755

3856
return bucket
3957
}
Lines changed: 67 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
import type { KvEntry, KvWatchOptions } from "@nats-io/kv"
2-
import { useEffect, useState } from "react"
1+
import type { KvEntry, KvWatchEntry, KvWatchOptions } from "@nats-io/kv"
2+
import type { QueuedIterator } from "@nats-io/nats-core"
3+
import { useEffect, useRef, useState } from "react"
34
import type { z } from "zod"
5+
import { isConnectionError } from "./natsErrors"
46
import { useBucket } from "./useBucket"
57

68
// We need to ensure that the type has an id property
@@ -24,10 +26,15 @@ export function useBucketWatch<T extends WithId>({
2426
const [error, setError] = useState<string | null>(null)
2527
const [isLoading, setIsLoading] = useState<boolean>(true)
2628
const bucket = useBucket(bucketName)
29+
const watchRef = useRef<QueuedIterator<KvWatchEntry> | null>(null)
2730

2831
useEffect(() => {
2932
if (!bucket) return
3033

34+
setItems([])
35+
setError(null)
36+
setIsLoading(true)
37+
3138
const abortController = new AbortController()
3239
const signal = abortController.signal
3340

@@ -36,7 +43,7 @@ export function useBucketWatch<T extends WithId>({
3643
if (!entry.value) return null
3744

3845
try {
39-
const data = entry.json<any>()
46+
const data = entry.json<unknown>()
4047
const parseResult = schema.safeParse(data)
4148

4249
if (parseResult.success) {
@@ -47,81 +54,93 @@ export function useBucketWatch<T extends WithId>({
4754
parseResult.error,
4855
)
4956
return null
50-
} catch (error) {
51-
console.error(`Failed to parse data for key ${entry.key}:`, error)
57+
} catch (err) {
58+
console.error(`Failed to parse data for key ${entry.key}:`, err)
5259
return null
5360
}
5461
}
5562

5663
// Watch for changes
57-
;(async () => {
64+
const startWatch = async () => {
5865
try {
5966
const watchOptions: KvWatchOptions | undefined = keyFilter
6067
? { key: keyFilter }
6168
: undefined
6269

6370
const watch = await bucket.watch(watchOptions)
71+
watchRef.current = watch
72+
73+
if (signal.aborted) {
74+
watch.stop()
75+
return
76+
}
77+
78+
let firstEntry = true
79+
for await (const entry of watch) {
80+
if (signal.aborted) {
81+
watch.stop()
82+
return
83+
}
84+
85+
const key = entry.key
86+
// since we are using a subject prefix (e.g., "<prefix>.<id>")
87+
// we need to strip the prefix for filtering
88+
const strippedKey = stripPrefix
89+
? key.replace(`${stripPrefix}.`, "")
90+
: key
91+
92+
setItems((prevItems) => {
93+
// Simply filter based on id property
94+
const filteredItems = prevItems.filter((item) => {
95+
// Check both possible ID locations from the WithId type
96+
const itemId = "id" in item ? item.id : item.uri.id
97+
return itemId !== strippedKey
98+
})
6499

65-
const processWatch = async () => {
66-
for await (const entry of watch) {
67-
if (signal.aborted) {
68-
watch.stop()
69-
return
100+
// Handle deletion
101+
if (entry.operation === "DEL" || entry.operation === "PURGE") {
102+
return filteredItems
70103
}
71104

72-
const key = entry.key
73-
// since we are using a subject prefix (e.g., "<prefix>.<id>")
74-
// we need to strip the prefix for filtering
75-
const strippedKey = stripPrefix
76-
? key.replace(`${stripPrefix}.`, "")
77-
: key
78-
79-
setItems((prevItems) => {
80-
// Simply filter based on id property
81-
const filteredItems = prevItems.filter((item) => {
82-
// Check both possible ID locations from the WithId type
83-
const itemId = "id" in item ? item.id : item.uri.id
84-
return itemId !== strippedKey
85-
})
86-
87-
// Handle deletion
88-
if (entry.operation === "DEL" || entry.operation === "PURGE") {
89-
return filteredItems
90-
}
91-
92-
// Skip if no value
93-
if (!entry.value) {
94-
return filteredItems
95-
}
96-
97-
const item = validateAndParse(entry)
98-
if (item) {
99-
return [...filteredItems, item]
100-
}
105+
// Skip if no value
106+
if (!entry.value) {
101107
return filteredItems
102-
})
108+
}
103109

104-
// Mark loading as complete after initial entries have been processed
105-
if (isLoading && !signal.aborted) {
106-
setIsLoading(false)
110+
const item = validateAndParse(entry)
111+
if (item) {
112+
return [...filteredItems, item]
107113
}
114+
return filteredItems
115+
})
116+
117+
// Mark loading as complete after first entry processed
118+
if (firstEntry && !signal.aborted) {
119+
firstEntry = false
120+
setIsLoading(false)
108121
}
109122
}
110-
111-
processWatch()
112123
} catch (err) {
124+
// Silently ignore connection lifecycle errors
125+
if (isConnectionError(err)) {
126+
return
127+
}
113128
if (!signal.aborted) {
114129
console.error(`Watch error for bucket ${bucketName}:`, err)
115130
setError(`Failed to watch bucket ${bucketName}`)
116131
setIsLoading(false)
117132
}
118133
}
119-
})()
134+
}
135+
136+
startWatch()
120137

121138
return () => {
122139
abortController.abort()
140+
watchRef.current?.stop()
141+
watchRef.current = null
123142
}
124-
}, [bucket, stripPrefix, bucketName, schema, keyFilter, isLoading])
143+
}, [bucket, stripPrefix, bucketName, schema, keyFilter])
125144

126145
return { items, error, isLoading }
127146
}

0 commit comments

Comments
 (0)