Skip to content

Commit 2766032

Browse files
authored
Merge pull request #509 from NERSC/fix-intermittent-draining-issue
fix bad nats connection on page refresh and add nats status indicator
2 parents a87c637 + daf05b4 commit 2766032

File tree

9 files changed

+278
-105
lines changed

9 files changed

+278
-105
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { Chip, Tooltip } from "@mui/material"
2+
import type { NatsConnection } from "@nats-io/nats-core"
3+
import { useMemo } from "react"
4+
import { useNats } from "../contexts/nats"
5+
6+
type NatsStatus = {
7+
color: "success" | "warning" | "error"
8+
label: string
9+
tooltip: string
10+
}
11+
12+
const getNatsStatus = (
13+
connection: NatsConnection | null,
14+
isConnected: boolean,
15+
): NatsStatus => {
16+
if (!connection) {
17+
return {
18+
color: "error",
19+
label: "Disconnected",
20+
tooltip: "NATS is not connected",
21+
}
22+
}
23+
24+
if (connection.isDraining()) {
25+
return {
26+
color: "warning",
27+
label: "Draining",
28+
tooltip: "NATS connection is draining",
29+
}
30+
}
31+
32+
if (connection.isClosed()) {
33+
return {
34+
color: "error",
35+
label: "Closed",
36+
tooltip: "NATS connection is closed",
37+
}
38+
}
39+
40+
if (isConnected) {
41+
return {
42+
color: "success",
43+
label: "Connected",
44+
tooltip: "NATS connection is active",
45+
}
46+
}
47+
48+
return {
49+
color: "warning",
50+
label: "Connecting",
51+
tooltip: "NATS connection is establishing",
52+
}
53+
}
54+
55+
export const NatsStatusIndicator = () => {
56+
const { natsConnection, isConnected } = useNats()
57+
const status = useMemo(
58+
() => getNatsStatus(natsConnection, isConnected),
59+
[natsConnection, isConnected],
60+
)
61+
62+
return (
63+
<Tooltip title={status.tooltip} arrow placement="top">
64+
<Chip label="NATS" color={status.color} size="small" variant="outlined" />
65+
</Tooltip>
66+
)
67+
}

frontend/interactEM/src/components/pipelines/hudcomposer.tsx

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
import { useCallback, useRef, useState } from "react"
1010
import { useActivePipeline } from "../../hooks/api/useActivePipeline"
1111
import { usePipelineStore } from "../../stores"
12+
import { NatsStatusIndicator } from "../natsstatus"
1213
import { DeletePipelineButton } from "./deletebutton"
1314
import { HudListButton } from "./hudlistbutton"
1415
import { LaunchPipelineButton } from "./launchbutton"
@@ -126,8 +127,12 @@ export const HudComposer: React.FC = () => {
126127
onClick={handleTogglePipelineDrawer}
127128
/>
128129

129-
{/* Pipeline content */}
130-
{pipelineDisplayContent()}
130+
<Box sx={{ display: "flex", alignItems: "center", flex: 1 }}>
131+
{/* Pipeline content */}
132+
{pipelineDisplayContent()}
133+
</Box>
134+
135+
<NatsStatusIndicator />
131136
</Box>
132137

133138
{/* Pipeline List Drawer */}

frontend/interactEM/src/components/pipelines/hudrunning.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { PipelineDeploymentPublic } from "../../client"
55
import { useActivePipeline } from "../../hooks/api/useActivePipeline"
66
import { useDeployment } from "../../hooks/api/useDeploymentsQuery"
77
import { usePipelineStore } from "../../stores"
8+
import { NatsStatusIndicator } from "../natsstatus"
89
import { DeploymentManagementPanel } from "./deploymentmanagementpanel"
910
import { HudListButton } from "./hudlistbutton"
1011
import { HudRuntimeInfo } from "./hudruntimeinfo"
@@ -63,6 +64,8 @@ export const HudRunning: React.FC = () => {
6364
isLoading={loading}
6465
error={error}
6566
/>
67+
68+
<NatsStatusIndicator />
6669
</Box>
6770

6871
{/* Deployment Management Panel */}

frontend/interactEM/src/contexts/nats.tsx

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,22 +65,26 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
6565
isConnected: false,
6666
})
6767

68-
const [natsConnection, setNatsConnection] = useState<NatsConnection | null>(
69-
null,
70-
)
7168
const { token, natsJwt, isAuthenticated } = useAuth()
7269
const tokenRef = useRef(token)
7370
const natsJwtRef = useRef(natsJwt)
71+
const hasConnectedRef = useRef(false)
72+
const connectionRef = useRef<NatsConnection | null>(null)
7473

7574
useEffect(() => {
7675
tokenRef.current = token
7776
}, [token])
7877

78+
useEffect(() => {
79+
natsJwtRef.current = natsJwt
80+
}, [natsJwt])
81+
7982
useEffect(() => {
8083
if (!isAuthenticated) {
8184
return
8285
}
83-
async function setupNatsServices(nc: NatsConnection) {
86+
87+
async function setupNatsServices(nc: NatsConnection): Promise<boolean> {
8488
try {
8589
const js = jetstream(nc)
8690
const jsm = await jetstreamManager(nc)
@@ -93,11 +97,14 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
9397
keyValueManager: kvm,
9498
isConnected: true,
9599
})
100+
return true
96101
} catch (error) {
97102
console.error("Failed to setup NATS services:", error)
98103
setState((prev) => ({ ...prev, isConnected: false }))
104+
return false
99105
}
100106
}
107+
101108
async function connect() {
102109
try {
103110
const servers = Array.isArray(natsServers) ? natsServers : [natsServers]
@@ -125,11 +132,20 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
125132
maxReconnectAttempts: 30,
126133
})
127134

135+
connectionRef.current = nc
128136
console.log("NATS connection successful")
129137

130-
setNatsConnection(nc)
131-
await setupNatsServices(nc)
132-
138+
const setupOk = await setupNatsServices(nc)
139+
if (!setupOk) {
140+
try {
141+
await nc.drain()
142+
} catch (err) {
143+
console.error("Error draining NATS connection:", err)
144+
}
145+
connectionRef.current = null
146+
hasConnectedRef.current = false
147+
return
148+
}
133149
// natsConnection will cycle through the following status sequence when
134150
// it is disconnected:
135151
// 1. Error
@@ -145,30 +161,44 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
145161
setState((prev) => ({ ...prev, isConnected: true }))
146162
break
147163
case "error":
148-
// TODO: handle error better, maybe with UI update
149-
setState((prev) => ({
150-
...prev,
151-
isConnected: false,
152-
}))
164+
case "disconnect":
165+
case "staleConnection":
166+
case "close":
167+
setState((prev) => ({ ...prev, isConnected: false }))
153168
break
154169
}
155170
}
156171
})().catch(console.error)
157172
} catch (error) {
158173
console.error("Failed to connect to NATS:", error)
159174
setState((prev) => ({ ...prev, isConnected: false }))
175+
hasConnectedRef.current = false
176+
const nc = connectionRef.current
177+
connectionRef.current = null
178+
if (nc) {
179+
try {
180+
await nc.drain()
181+
} catch (err) {
182+
console.error("Error draining NATS connection:", err)
183+
}
184+
}
160185
}
161186
}
162187

163-
if (!natsConnection) {
188+
if (!hasConnectedRef.current) {
189+
hasConnectedRef.current = true
164190
connect()
165191
}
192+
166193
return () => {
167-
if (natsConnection) {
194+
hasConnectedRef.current = false
195+
const nc = connectionRef.current
196+
connectionRef.current = null
197+
if (nc) {
168198
console.log("Draining NATS connection")
169199
;(async () => {
170200
try {
171-
await natsConnection.drain()
201+
await nc.drain()
172202
console.log("NATS connection drained and closed")
173203
} catch (err) {
174204
console.error("Error draining NATS connection:", err)
@@ -183,7 +213,7 @@ export const NatsProvider: React.FC<NatsProviderProps> = ({
183213
isConnected: false,
184214
})
185215
}
186-
}, [isAuthenticated, natsConnection, natsServers])
216+
}, [isAuthenticated, natsServers])
187217

188218
if (!isAuthenticated) {
189219
return null
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import {
2+
ClosedConnectionError,
3+
DrainingConnectionError,
4+
} from "@nats-io/nats-core"
5+
6+
export function isConnectionError(err: unknown): boolean {
7+
return (
8+
err instanceof DrainingConnectionError ||
9+
err instanceof ClosedConnectionError
10+
)
11+
}
Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,57 @@
11
import type { KV } from "@nats-io/kv"
2-
import { DrainingConnectionError } from "@nats-io/nats-core"
32
import { useEffect, useRef, useState } from "react"
43
import { useNats } from "../../contexts/nats"
4+
import { isConnectionError } from "./natsErrors"
55

66
export const useBucket = (bucketName: string): KV | null => {
7-
const { keyValueManager } = useNats()
7+
const { keyValueManager, isConnected } = useNats()
88
const [bucket, setBucket] = useState<KV | null>(null)
9-
const isMounted = useRef(true)
9+
const isMountedRef = useRef(true)
1010

1111
useEffect(() => {
12-
isMounted.current = true
12+
isMountedRef.current = true
13+
// Create abort signal for this effect instance to handle race conditions
14+
// when connection drops mid-open and reconnects before promise settles
15+
const abortController = new AbortController()
1316

1417
const openBucket = async () => {
15-
if (keyValueManager && !bucket) {
16-
try {
17-
const openedBucket = await keyValueManager.open(bucketName)
18-
if (isMounted.current) {
19-
setBucket(openedBucket)
20-
}
21-
} catch (error) {
22-
if (error instanceof DrainingConnectionError) {
23-
// quietly ignore if connection is draining
24-
return
25-
}
26-
console.error(`Failed to open bucket "${bucketName}":`, error)
18+
if (!keyValueManager || !isConnected) {
19+
return
20+
}
21+
22+
try {
23+
const openedBucket = await keyValueManager.open(bucketName)
24+
25+
// Check if this effect instance was aborted before setting state
26+
if (abortController.signal.aborted) {
27+
return
28+
}
29+
30+
if (isMountedRef.current) {
31+
setBucket(openedBucket)
32+
}
33+
} catch (error) {
34+
// Check if this effect instance was aborted before handling error
35+
if (abortController.signal.aborted) {
36+
return
37+
}
38+
39+
if (isConnectionError(error)) {
40+
// Silently ignore connection lifecycle errors
41+
return
2742
}
43+
console.error(`Failed to open bucket "${bucketName}":`, error)
2844
}
2945
}
3046

3147
openBucket()
3248

3349
return () => {
34-
isMounted.current = false
50+
isMountedRef.current = false
51+
abortController.abort()
52+
setBucket(null)
3553
}
36-
}, [keyValueManager, bucket, bucketName])
54+
}, [keyValueManager, bucketName, isConnected])
3755

3856
return bucket
3957
}

0 commit comments

Comments
 (0)