Skip to content

Commit a330c4f

Browse files
committed
fix(openclaw): prevent duplicate viewer instances on gateway restart
When OpenClaw calls register() again (deferred reload / gateway restart), the old viewer HTTP server was not stopped. Each re-registration leaked a new server on an incrementing port (18799 → 18800 → 18801…). Root cause: serviceStarted and viewer were closure-scoped inside register(), so a second call created a fresh closure with no knowledge of the previous one. Fix: track active instances at module level, keyed by stateDir. On re-registration with the same stateDir, startServiceCore() now awaits the previous viewer's port release before binding the new one. Also makes ViewerServer.stop() return a Promise so the caller can properly wait for the HTTP server to close. Closes #1471
1 parent 96a1dd6 commit a330c4f

File tree

4 files changed

+201
-5
lines changed

4 files changed

+201
-5
lines changed

apps/memos-local-openclaw/index.ts

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,19 @@ const pluginConfigSchema = {
150150
},
151151
};
152152

153+
// ─── Module-level singleton guard ───
154+
// Prevents duplicate viewers/stores when OpenClaw calls register() multiple
155+
// times for the SAME stateDir (e.g. deferred reload, gateway restart).
156+
// Keyed by stateDir so that truly independent instances (different data dirs,
157+
// as in tests) are not accidentally torn down.
158+
const activeInstances = new Map<string, {
159+
viewer: InstanceType<typeof ViewerServer>;
160+
hubServer: InstanceType<typeof HubServer> | null;
161+
worker: InstanceType<typeof IngestWorker>;
162+
store: InstanceType<typeof SqliteStore>;
163+
telemetry: InstanceType<typeof Telemetry>;
164+
}>();
165+
153166
const memosLocalPlugin = {
154167
id: "memos-local-openclaw-plugin",
155168
name: "MemOS Local Memory",
@@ -160,6 +173,19 @@ const memosLocalPlugin = {
160173
configSchema: pluginConfigSchema,
161174

162175
register(api: OpenClawPluginApi) {
176+
// Resolve stateDir early so we can check for a duplicate instance with the
177+
// same data directory (deferred reload / gateway restart scenario).
178+
const stateDir = process.env.OPENCLAW_STATE_DIR || api.resolvePath("~/.openclaw");
179+
180+
// Snapshot the previous instance for this stateDir so startServiceCore()
181+
// can tear it down asynchronously (awaiting port release) before starting
182+
// the new viewer. Instances with a different stateDir are left untouched.
183+
const instanceToReplace = activeInstances.get(stateDir) ?? null;
184+
activeInstances.delete(stateDir);
185+
if (instanceToReplace) {
186+
api.logger.info("memos-local: previous instance detected, will stop before starting new viewer");
187+
}
188+
163189
api.registerMemoryCapability({
164190
promptBuilder: buildMemoryPromptSection,
165191
});
@@ -286,7 +312,6 @@ const memosLocalPlugin = {
286312
}
287313

288314
let pluginCfg = (api.pluginConfig ?? {}) as Record<string, unknown>;
289-
const stateDir = process.env.OPENCLAW_STATE_DIR || api.resolvePath("~/.openclaw");
290315

291316
// Fallback: read config from file if not provided by OpenClaw
292317
const configPath = path.join(stateDir, "state", "memos-local", "config.json");
@@ -2378,6 +2403,9 @@ Groups: ${groupNames.length > 0 ? groupNames.join(", ") : "(none)"}`,
23782403
? new HubServer({ store, log: ctx.log, config: ctx.config, dataDir: stateDir, embedder, defaultHubPort: derivedHubPort })
23792404
: null;
23802405

2406+
// Track this instance so the next register() with the same stateDir can tear it down
2407+
activeInstances.set(stateDir, { viewer, hubServer, worker, store, telemetry });
2408+
23812409
// ─── Service lifecycle ───
23822410

23832411
let serviceStarted = false;
@@ -2386,6 +2414,21 @@ Groups: ${groupNames.length > 0 ? groupNames.join(", ") : "(none)"}`,
23862414
if (serviceStarted) return;
23872415
serviceStarted = true;
23882416

2417+
// Gracefully shut down the previous instance before binding new ports
2418+
if (instanceToReplace) {
2419+
api.logger.info("memos-local: stopping previous instance...");
2420+
try {
2421+
await instanceToReplace.viewer.stop();
2422+
await instanceToReplace.hubServer?.stop();
2423+
await instanceToReplace.worker.flush().catch(() => {});
2424+
await instanceToReplace.telemetry.shutdown().catch(() => {});
2425+
instanceToReplace.store.close();
2426+
} catch (err) {
2427+
api.logger.warn(`memos-local: previous instance cleanup error: ${err}`);
2428+
}
2429+
api.logger.info("memos-local: previous instance stopped");
2430+
}
2431+
23892432
if (hubServer) {
23902433
const hubUrl = await hubServer.start();
23912434
api.logger.info(`memos-local: hub started at ${hubUrl}`);
@@ -2429,10 +2472,11 @@ Groups: ${groupNames.length > 0 ? groupNames.join(", ") : "(none)"}`,
24292472
id: "memos-local-openclaw-plugin",
24302473
start: async () => { await startServiceCore(); },
24312474
stop: async () => {
2475+
activeInstances.delete(stateDir);
24322476
await worker.flush();
24332477
await telemetry.shutdown();
24342478
await hubServer?.stop();
2435-
viewer.stop();
2479+
await viewer.stop();
24362480
store.close();
24372481
api.logger.info("memos-local: stopped");
24382482
},

apps/memos-local-openclaw/src/viewer/server.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,13 +211,19 @@ export class ViewerServer {
211211
}
212212
}
213213

/**
 * Shuts the viewer down (diff view: `-` lines are the pre-commit version,
 * `+` lines the post-commit version).
 *
 * Stops the hub heartbeat and notification polling, ends every open
 * notification SSE response, then closes the HTTP server. The post-commit
 * version returns a promise that resolves once the server's close callback
 * has fired; when no server is present it resolves immediately.
 */
214-
stop(): void {
214+
stop(): Promise<void> {
215215
this.stopHubHeartbeat();
216216
this.stopNotifPoll();
217217
for (const c of this.notifSSEClients) { try { c.end(); } catch {} }
218218
this.notifSSEClients = [];
219-
this.server?.close();
219+
if (!this.server) return Promise.resolve();
220+
const srv = this.server;
220221
this.server = null;
222+
return new Promise<void>((resolve) => {
223+
srv.close(() => resolve());
224+
// Force-close ALL remaining connections (active ones too, not only idle
// keep-alive sockets) so close() cannot hang on a lingering client.
// Optional call: the method may be absent on the server object.
225+
srv.closeAllConnections?.();
226+
});
221227
}
222228

223229
getResetToken(): string {
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/**
2+
* Real integration test: verify that calling register() twice stops the
3+
* previous viewer instead of leaking a second HTTP server on a new port.
4+
*
5+
* Spins up actual HTTP servers (ViewerServer) and verifies:
6+
* 1. Old instance is torn down before new one starts
7+
* 2. New viewer binds to the same port (not port+1)
8+
* 3. Only one HTTP server is running after re-registration
9+
*/
10+
11+
import { describe, it, expect, afterAll } from "vitest";
12+
import http from "node:http";
13+
import net from "node:net";
14+
import os from "node:os";
15+
import fs from "node:fs";
16+
import path from "node:path";
17+
// Scratch state directory shared by every test in this file. It is created
// eagerly at module load and pre-populated with the sub-directories below.
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "memos-dup-test-"));
for (const segments of [["memos-local"], ["workspace", "skills"], ["skills"]]) {
  fs.mkdirSync(path.join(tmpDir, ...segments), { recursive: true });
}
22+
/**
 * Asks the OS for a TCP port that is currently free on the loopback interface.
 *
 * A throwaway server is bound to port 0 (the OS picks the port), the assigned
 * port is read back, and the server is closed before the promise resolves.
 * The port is only known to be free at the moment of the probe; another
 * process may claim it afterwards.
 *
 * @returns the port number that was assigned to the probe server
 * @throws (rejects) if the probe server cannot listen, reports no TCP address,
 *         or fails to close
 */
function findFreePort(): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    const probe = net.createServer();
    // Register the error handler before listen() so no failure is missed.
    probe.once("error", reject);
    probe.listen(0, "127.0.0.1", () => {
      const address = probe.address();
      // address() is null / a string for unbound or pipe servers; never a
      // valid answer here, so fail loudly instead of casting blindly.
      if (address === null || typeof address === "string") {
        probe.close(() => reject(new Error("findFreePort: probe server has no TCP address")));
        return;
      }
      const { port } = address;
      probe.close((err) => (err ? reject(err) : resolve(port)));
    });
  });
}
34+
/**
 * Reports whether something accepts TCP connections on 127.0.0.1:`port`.
 *
 * Resolves `true` as soon as a connection is established, and `false` on any
 * connection error, after one second without a connection, or when `port` is
 * not a valid TCP port number (1-65535). The probe socket is always destroyed
 * before the promise settles. The promise never rejects.
 *
 * @param port TCP port to probe on the loopback interface
 */
function isPortListening(port: number): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    // net.createConnection throws synchronously for out-of-range ports; such a
    // port cannot have a listener, so answer `false` instead of rejecting.
    if (!Number.isInteger(port) || port < 1 || port > 65535) {
      resolve(false);
      return;
    }

    const sock = net.createConnection({ host: "127.0.0.1", port });
    let settled = false;
    const finish = (listening: boolean): void => {
      if (settled) return;
      settled = true;
      sock.destroy();
      resolve(listening);
    };

    sock.once("connect", () => finish(true));
    sock.once("error", () => finish(false));
    sock.setTimeout(1000, () => finish(false));
  });
}
46+
47+
function createMockApi(stateDir: string, gatewayPort: number) {
48+
const services: Array<{ id: string; start: () => Promise<void>; stop: () => Promise<void> }> = [];
49+
const logs: string[] = [];
50+
return {
51+
api: {
52+
id: "memos-local-openclaw-plugin",
53+
pluginConfig: {},
54+
config: { gateway: { port: gatewayPort } },
55+
resolvePath: (p: string) => p.replace("~/.openclaw", stateDir),
56+
logger: {
57+
info: (msg: string) => logs.push(msg),
58+
warn: (msg: string) => logs.push(msg),
59+
error: (msg: string) => logs.push(msg),
60+
debug: (msg: string) => logs.push(msg),
61+
},
62+
registerTool: () => {},
63+
registerMemoryCapability: () => {},
64+
registerService: (svc: any) => { services.push(svc); },
65+
registerHook: () => {},
66+
on: () => {},
67+
},
68+
services,
69+
logs,
70+
};
71+
}
72+
// Set by a test while it owns running servers; cleared once it has stopped
// them itself. Lets afterAll tear things down when a test fails midway.
let lastCleanup: (() => Promise<void>) | null = null;

afterAll(async () => {
  try {
    if (lastCleanup) await lastCleanup();
    // Short grace period before the directory is deleted.
    await new Promise((r) => setTimeout(r, 200));
  } finally {
    // Always remove the scratch directory, even if the cleanup above threw.
    fs.rmSync(tmpDir, { recursive: true, force: true });
  }
});
80+
/** Matches a loopback URL and captures its port, e.g. "→ http://127.0.0.1:19799". */
const LOOPBACK_URL_PORT = /http:\/\/127\.0\.0\.1:(\d+)/;

/**
 * Returns the port of the first `http://127.0.0.1:<port>` URL found in `logs`.
 *
 * Lines are scanned in order and the first match wins. Any loopback URL
 * matches, not only the viewer's, so callers rely on the viewer URL being the
 * first one logged.
 *
 * @param logs captured log lines, oldest first
 * @returns the parsed port, or `null` when no line contains such a URL
 */
function extractViewerPort(logs: readonly string[]): number | null {
  for (const line of logs) {
    const digits = LOOPBACK_URL_PORT.exec(line)?.[1];
    if (digits !== undefined) return Number.parseInt(digits, 10);
  }
  return null;
}
89+
describe("duplicate instance prevention (real HTTP servers)", () => {
  /**
   * Registers the plugin twice against the same state directory and checks
   * that the second start tears the first instance down and binds the viewer
   * to the same port instead of the next one.
   */
  it("second register() should stop previous viewer and reuse the same port", async () => {
    // Pick a random free port to avoid collisions with other processes
    const freePort = await findFreePort();
    const gatewayPort = freePort;
    const expectedViewerPort = gatewayPort + 10;

    // Confirm our ports are actually free
    expect(await isPortListening(expectedViewerPort)).toBe(false);
    expect(await isPortListening(expectedViewerPort + 1)).toBe(false);

    const pluginModule = await import("../index");
    const plugin = pluginModule.default;

    // ─── 1st register + start ───
    const mock1 = createMockApi(tmpDir, gatewayPort);
    plugin.register(mock1.api as any);
    const svc1 = mock1.services[mock1.services.length - 1];

    // Arm cleanup BEFORE starting: if any assertion below fails while the
    // first instance is the one running, afterAll still stops its servers
    // instead of leaking them.
    lastCleanup = async () => { await svc1.stop(); };
    await svc1.start();

    const viewerPort1 = extractViewerPort(mock1.logs);
    expect(viewerPort1).toBe(expectedViewerPort);
    expect(await isPortListening(expectedViewerPort)).toBe(true);

    // ─── 2nd register (simulates deferred reload / gateway restart) ───
    const mock2 = createMockApi(tmpDir, gatewayPort);
    plugin.register(mock2.api as any);

    // Verify the plugin detected the previous instance
    const detectedMsg = mock2.logs.some((l) => l.includes("previous instance detected"));
    expect(detectedMsg).toBe(true);

    // Start the 2nd service — cleanup happens inside startServiceCore()
    const svc2 = mock2.services[mock2.services.length - 1];
    await svc2.start();

    // From here on the second instance owns the servers.
    lastCleanup = async () => { await svc2.stop(); };

    // Verify: stopped previous, then started on the SAME port
    const stoppedMsg = mock2.logs.some((l) => l.includes("previous instance stopped"));
    expect(stoppedMsg).toBe(true);

    const viewerPort2 = extractViewerPort(mock2.logs);
    expect(viewerPort2).toBe(expectedViewerPort); // Same port, not port+1

    // Verify: only ONE server is running (the expected port), not two
    expect(await isPortListening(expectedViewerPort)).toBe(true);
    expect(await isPortListening(expectedViewerPort + 1)).toBe(false);

    // ─── Clean stop ───
    await svc2.stop();
    lastCleanup = null;
    await new Promise((r) => setTimeout(r, 200));

    expect(await isPortListening(expectedViewerPort)).toBe(false);
  });
});

apps/memos-local-openclaw/tests/shutdown-lifecycle.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ describe("shutdown lifecycle", () => {
8383

8484
class MockViewer {
8585
async start(): Promise<string> { return "http://127.0.0.1:18799"; }
86-
stop(): void { events.push("viewer-stop"); }
86+
async stop(): Promise<void> { events.push("viewer-stop"); }
8787
getResetToken(): string { return "token"; }
8888
}
8989

0 commit comments

Comments
 (0)