Skip to content

Commit aae9e4a

Browse files
committed
fix: report import step failures correctly (#1303)
1 parent e529dac commit aae9e4a

File tree

2 files changed

+223
-60
lines changed

2 files changed

+223
-60
lines changed

apps/memos-local-openclaw/src/viewer/server.ts

Lines changed: 161 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,72 @@ import type { Logger, Chunk, PluginContext } from "../types";
1818
import { viewerHTML } from "./html";
1919
import { v4 as uuid } from "uuid";
2020

21-
function normalizeTimestamp(ts: number): number {
22-
if (ts < 1e12) return ts * 1000;
23-
return ts;
21+
export interface MigrationStepFailureCounts {
22+
summarization: number;
23+
dedup: number;
24+
embedding: number;
25+
}
26+
27+
export interface MigrationStateSnapshot {
28+
phase: string;
29+
stored: number;
30+
skipped: number;
31+
merged: number;
32+
errors: number;
33+
processed: number;
34+
total: number;
35+
lastItem: any;
36+
done: boolean;
37+
stopped: boolean;
38+
stepFailures: MigrationStepFailureCounts;
39+
success: boolean;
40+
}
41+
42+
function createInitialStepFailures(): MigrationStepFailureCounts {
43+
return { summarization: 0, dedup: 0, embedding: 0 };
44+
}
45+
46+
export function computeMigrationSuccess(state: Pick<MigrationStateSnapshot, "errors" | "stepFailures">): boolean {
47+
const sf = state.stepFailures;
48+
return state.errors === 0 && sf.summarization === 0 && sf.dedup === 0 && sf.embedding === 0;
49+
}
50+
51+
export function createInitialMigrationState(): MigrationStateSnapshot {
52+
const stepFailures = createInitialStepFailures();
53+
return {
54+
phase: "",
55+
stored: 0,
56+
skipped: 0,
57+
merged: 0,
58+
errors: 0,
59+
processed: 0,
60+
total: 0,
61+
lastItem: null,
62+
done: false,
63+
stopped: false,
64+
stepFailures,
65+
success: computeMigrationSuccess({ errors: 0, stepFailures }),
66+
};
67+
}
68+
69+
export function applyMigrationItemToState(state: MigrationStateSnapshot, d: any): void {
70+
if (d.status === "stored") state.stored++;
71+
else if (d.status === "skipped" || d.status === "duplicate") state.skipped++;
72+
else if (d.status === "merged") state.merged++;
73+
else if (d.status === "error") state.errors++;
74+
75+
if (Array.isArray(d.stepFailures)) {
76+
for (const step of d.stepFailures) {
77+
if (step === "summarization") state.stepFailures.summarization++;
78+
else if (step === "dedup") state.stepFailures.dedup++;
79+
else if (step === "embedding") state.stepFailures.embedding++;
80+
}
81+
}
82+
83+
state.processed = d.index ?? state.processed + 1;
84+
state.total = d.total ?? state.total;
85+
state.lastItem = d;
86+
state.success = computeMigrationSuccess(state);
2487
}
2588

2689
export interface ViewerServerOptions {
@@ -60,18 +123,7 @@ export class ViewerServer {
60123
private resetToken: string;
61124
private migrationRunning = false;
62125
private migrationAbort = false;
63-
private migrationState: {
64-
phase: string;
65-
stored: number;
66-
skipped: number;
67-
merged: number;
68-
errors: number;
69-
processed: number;
70-
total: number;
71-
lastItem: any;
72-
done: boolean;
73-
stopped: boolean;
74-
} = { phase: "", stored: 0, skipped: 0, merged: 0, errors: 0, processed: 0, total: 0, lastItem: null, done: false, stopped: false };
126+
private migrationState: MigrationStateSnapshot = createInitialMigrationState();
75127
private migrationSSEClients: http.ServerResponse[] = [];
76128

77129
private ppRunning = false;
@@ -1641,7 +1693,7 @@ export class ViewerServer {
16411693
} else if (this.migrationState.done) {
16421694
const evtName = this.migrationState.stopped ? "stopped" : "done";
16431695
res.write(`event: state\ndata: ${JSON.stringify(this.migrationState)}\n\n`);
1644-
res.write(`event: ${evtName}\ndata: ${JSON.stringify({ ok: true })}\n\n`);
1696+
res.write(`event: ${evtName}\ndata: ${JSON.stringify({ ok: this.migrationState.success, ...this.migrationState })}\n\n`);
16451697
res.end();
16461698
} else {
16471699
res.end();
@@ -1682,19 +1734,12 @@ export class ViewerServer {
16821734
this.migrationSSEClients = this.migrationSSEClients.filter(c => c !== res);
16831735
});
16841736

1685-
this.migrationAbort = false;
1686-
this.migrationState = { phase: "", stored: 0, skipped: 0, merged: 0, errors: 0, processed: 0, total: 0, lastItem: null, done: false, stopped: false };
1737+
this.migrationState = createInitialMigrationState();
16871738

16881739
const send = (event: string, data: unknown) => {
16891740
if (event === "item") {
16901741
const d = data as any;
1691-
if (d.status === "stored") this.migrationState.stored++;
1692-
else if (d.status === "skipped" || d.status === "duplicate") this.migrationState.skipped++;
1693-
else if (d.status === "merged") this.migrationState.merged++;
1694-
else if (d.status === "error") this.migrationState.errors++;
1695-
this.migrationState.processed = d.index ?? this.migrationState.processed + 1;
1696-
this.migrationState.total = d.total ?? this.migrationState.total;
1697-
this.migrationState.lastItem = d;
1742+
applyMigrationItemToState(this.migrationState, d);
16981743
} else if (event === "phase") {
16991744
this.migrationState.phase = (data as any).phase;
17001745
} else if (event === "progress") {
@@ -1707,11 +1752,13 @@ export class ViewerServer {
17071752
this.runMigration(send, opts.sources, concurrency).finally(() => {
17081753
this.migrationRunning = false;
17091754
this.migrationState.done = true;
1755+
this.migrationState.success = computeMigrationSuccess(this.migrationState);
1756+
const donePayload = { ok: this.migrationState.success, ...this.migrationState };
17101757
if (this.migrationAbort) {
17111758
this.migrationState.stopped = true;
1712-
this.broadcastSSE("stopped", { ok: true, ...this.migrationState });
1759+
this.broadcastSSE("stopped", donePayload);
17131760
} else {
1714-
this.broadcastSSE("done", { ok: true });
1761+
this.broadcastSSE("done", donePayload);
17151762
}
17161763
this.migrationAbort = false;
17171764
const clientsToClose = [...this.migrationSSEClients];
@@ -1808,11 +1855,24 @@ export class ViewerServer {
18081855
}
18091856

18101857
try {
1811-
const summary = await summarizer.summarize(row.text);
1858+
const stepFailures: Array<"summarization" | "dedup" | "embedding"> = [];
1859+
let summary = "";
1860+
try {
1861+
summary = await summarizer.summarize(row.text);
1862+
} catch (err) {
1863+
stepFailures.push("summarization");
1864+
this.log.warn(`Migration summarization failed: ${err}`);
1865+
}
1866+
if (!summary) {
1867+
stepFailures.push("summarization");
1868+
summary = row.text.slice(0, 200);
1869+
}
1870+
18121871
let embedding: number[] | null = null;
18131872
try {
18141873
[embedding] = await this.embedder.embed([summary]);
18151874
} catch (err) {
1875+
stepFailures.push("embedding");
18161876
this.log.warn(`Migration embed failed: ${err}`);
18171877
}
18181878

@@ -1831,26 +1891,31 @@ export class ViewerServer {
18311891
}).filter(c => c.summary);
18321892

18331893
if (candidates.length > 0) {
1834-
const dedupResult = await summarizer.judgeDedup(summary, candidates);
1835-
if (dedupResult?.action === "DUPLICATE" && dedupResult.targetIndex) {
1836-
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
1837-
if (targetId) {
1838-
dedupStatus = "duplicate";
1839-
dedupTarget = targetId;
1840-
dedupReason = dedupResult.reason;
1841-
}
1842-
} else if (dedupResult?.action === "UPDATE" && dedupResult.targetIndex && dedupResult.mergedSummary) {
1843-
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
1844-
if (targetId) {
1845-
this.store.updateChunkSummaryAndContent(targetId, dedupResult.mergedSummary, row.text);
1846-
try {
1847-
const [newEmb] = await this.embedder.embed([dedupResult.mergedSummary]);
1848-
if (newEmb) this.store.upsertEmbedding(targetId, newEmb);
1849-
} catch { /* best-effort */ }
1850-
dedupStatus = "merged";
1851-
dedupTarget = targetId;
1852-
dedupReason = dedupResult.reason;
1894+
try {
1895+
const dedupResult = await summarizer.judgeDedup(summary, candidates);
1896+
if (dedupResult?.action === "DUPLICATE" && dedupResult.targetIndex) {
1897+
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
1898+
if (targetId) {
1899+
dedupStatus = "duplicate";
1900+
dedupTarget = targetId;
1901+
dedupReason = dedupResult.reason;
1902+
}
1903+
} else if (dedupResult?.action === "UPDATE" && dedupResult.targetIndex && dedupResult.mergedSummary) {
1904+
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
1905+
if (targetId) {
1906+
this.store.updateChunkSummaryAndContent(targetId, dedupResult.mergedSummary, row.text);
1907+
try {
1908+
const [newEmb] = await this.embedder.embed([dedupResult.mergedSummary]);
1909+
if (newEmb) this.store.upsertEmbedding(targetId, newEmb);
1910+
} catch { /* best-effort */ }
1911+
dedupStatus = "merged";
1912+
dedupTarget = targetId;
1913+
dedupReason = dedupResult.reason;
1914+
}
18531915
}
1916+
} catch (err) {
1917+
stepFailures.push("dedup");
1918+
this.log.warn(`Migration dedup judgment failed: ${err}`);
18541919
}
18551920
}
18561921
}
@@ -1893,7 +1958,13 @@ export class ViewerServer {
18931958
preview: row.text.slice(0, 120),
18941959
summary: summary.slice(0, 80),
18951960
source: file,
1961+
stepFailures,
18961962
});
1963+
if (stepFailures.length > 0) {
1964+
this.log.warn(`[MIGRATION] sqlite item imported with step failures: ${stepFailures.join(",")}`);
1965+
} else {
1966+
this.log.info("[MIGRATION] sqlite item imported successfully (all steps)");
1967+
}
18971968
} catch (err) {
18981969
totalErrors++;
18991970
send("item", {
@@ -2023,11 +2094,24 @@ export class ViewerServer {
20232094
}
20242095

20252096
try {
2026-
const summary = await summarizer.summarize(content);
2097+
const stepFailures: Array<"summarization" | "dedup" | "embedding"> = [];
2098+
let summary = "";
2099+
try {
2100+
summary = await summarizer.summarize(content);
2101+
} catch (err) {
2102+
stepFailures.push("summarization");
2103+
this.log.warn(`Migration summarization failed: ${err}`);
2104+
}
2105+
if (!summary) {
2106+
stepFailures.push("summarization");
2107+
summary = content.slice(0, 200);
2108+
}
2109+
20272110
let embedding: number[] | null = null;
20282111
try {
20292112
[embedding] = await this.embedder.embed([summary]);
20302113
} catch (err) {
2114+
stepFailures.push("embedding");
20312115
this.log.warn(`Migration embed failed: ${err}`);
20322116
}
20332117

@@ -2046,17 +2130,22 @@ export class ViewerServer {
20462130
}).filter(c => c.summary);
20472131

20482132
if (candidates.length > 0) {
2049-
const dedupResult = await summarizer.judgeDedup(summary, candidates);
2050-
if (dedupResult?.action === "DUPLICATE" && dedupResult.targetIndex) {
2051-
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
2052-
if (targetId) { dedupStatus = "duplicate"; dedupTarget = targetId; dedupReason = dedupResult.reason; }
2053-
} else if (dedupResult?.action === "UPDATE" && dedupResult.targetIndex && dedupResult.mergedSummary) {
2054-
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
2055-
if (targetId) {
2056-
this.store.updateChunkSummaryAndContent(targetId, dedupResult.mergedSummary, content);
2057-
try { const [newEmb] = await this.embedder.embed([dedupResult.mergedSummary]); if (newEmb) this.store.upsertEmbedding(targetId, newEmb); } catch { /* best-effort */ }
2058-
dedupStatus = "merged"; dedupTarget = targetId; dedupReason = dedupResult.reason;
2133+
try {
2134+
const dedupResult = await summarizer.judgeDedup(summary, candidates);
2135+
if (dedupResult?.action === "DUPLICATE" && dedupResult.targetIndex) {
2136+
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
2137+
if (targetId) { dedupStatus = "duplicate"; dedupTarget = targetId; dedupReason = dedupResult.reason; }
2138+
} else if (dedupResult?.action === "UPDATE" && dedupResult.targetIndex && dedupResult.mergedSummary) {
2139+
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
2140+
if (targetId) {
2141+
this.store.updateChunkSummaryAndContent(targetId, dedupResult.mergedSummary, content);
2142+
try { const [newEmb] = await this.embedder.embed([dedupResult.mergedSummary]); if (newEmb) this.store.upsertEmbedding(targetId, newEmb); } catch { /* best-effort */ }
2143+
dedupStatus = "merged"; dedupTarget = targetId; dedupReason = dedupResult.reason;
2144+
}
20592145
}
2146+
} catch (err) {
2147+
stepFailures.push("dedup");
2148+
this.log.warn(`Migration dedup judgment failed: ${err}`);
20602149
}
20612150
}
20622151
}
@@ -2076,7 +2165,12 @@ export class ViewerServer {
20762165
if (embedding && dedupStatus === "active") this.store.upsertEmbedding(chunkId, embedding);
20772166

20782167
totalStored++;
2079-
send("item", { index: idx, total: totalMsgs, status: dedupStatus === "active" ? "stored" : dedupStatus, preview: content.slice(0, 120), summary: summary.slice(0, 80), source: file, agent: agentId, role: msgRole });
2168+
send("item", { index: idx, total: totalMsgs, status: dedupStatus === "active" ? "stored" : dedupStatus, preview: content.slice(0, 120), summary: summary.slice(0, 80), source: file, agent: agentId, role: msgRole, stepFailures });
2169+
if (stepFailures.length > 0) {
2170+
this.log.warn(`[MIGRATION] session item imported with step failures: ${stepFailures.join(",")}`);
2171+
} else {
2172+
this.log.info("[MIGRATION] session item imported successfully (all steps)");
2173+
}
20802174
} catch (err) {
20812175
totalErrors++;
20822176
send("item", { index: idx, total: totalMsgs, status: "error", preview: content.slice(0, 120), source: file, agent: agentId, error: String(err).slice(0, 200) });
@@ -2117,7 +2211,14 @@ export class ViewerServer {
21172211
}
21182212

21192213
send("progress", { total: totalProcessed, processed: totalProcessed, phase: "done" });
2120-
send("summary", { totalProcessed, totalStored, totalSkipped, totalErrors });
2214+
send("summary", {
2215+
totalProcessed,
2216+
totalStored,
2217+
totalSkipped,
2218+
totalErrors,
2219+
success: computeMigrationSuccess(this.migrationState),
2220+
stepFailures: this.migrationState.stepFailures,
2221+
});
21212222
}
21222223

21232224
// ─── Post-processing: independent task/skill generation ───
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { describe, it, expect } from "vitest";
2+
import {
3+
applyMigrationItemToState,
4+
computeMigrationSuccess,
5+
createInitialMigrationState,
6+
} from "../src/viewer/server";
7+
8+
describe("migration status step failure reporting", () => {
9+
it("reports success=false when step failures exist even if no fatal errors", () => {
10+
const state = createInitialMigrationState();
11+
12+
applyMigrationItemToState(state, {
13+
status: "stored",
14+
index: 1,
15+
total: 2,
16+
stepFailures: ["embedding"],
17+
});
18+
19+
expect(state.errors).toBe(0);
20+
expect(state.stepFailures.embedding).toBe(1);
21+
expect(state.success).toBe(false);
22+
expect(computeMigrationSuccess(state)).toBe(false);
23+
});
24+
25+
it("keeps success=true for clean imports and flips to false when dedup/summarization fail", () => {
26+
const state = createInitialMigrationState();
27+
28+
applyMigrationItemToState(state, {
29+
status: "stored",
30+
index: 1,
31+
total: 3,
32+
stepFailures: [],
33+
});
34+
35+
expect(state.success).toBe(true);
36+
37+
applyMigrationItemToState(state, {
38+
status: "stored",
39+
index: 2,
40+
total: 3,
41+
stepFailures: ["dedup", "summarization"],
42+
});
43+
44+
expect(state.stepFailures.dedup).toBe(1);
45+
expect(state.stepFailures.summarization).toBe(1);
46+
expect(state.success).toBe(false);
47+
});
48+
49+
it("counts explicit item errors and reports failure", () => {
50+
const state = createInitialMigrationState();
51+
52+
applyMigrationItemToState(state, {
53+
status: "error",
54+
index: 1,
55+
total: 1,
56+
stepFailures: [],
57+
});
58+
59+
expect(state.errors).toBe(1);
60+
expect(state.success).toBe(false);
61+
});
62+
});

0 commit comments

Comments
 (0)