Skip to content

Commit 95fb6d3

Browse files
fix: report import step failures correctly (#1303) (#1309)
## Description The import/migration flow could report overall success (`ok: true`) even when non-fatal sub-steps failed on imported items (summarization/dedup/embedding), because these failures were only logged or silently ignored and never propagated to import state. This PR keeps import behavior compatible (still stores raw memory when possible), but correctly reports step-level failures in migration state and final done payload: - Track per-item step failures (`summarization`, `dedup`, `embedding`) in `item` events. - Aggregate step failures in migration state (`stepFailures`) and compute `success` from both hard errors and step failures. - Final `done`/`stopped` SSE payload now includes full state and `ok: false` when any step fails. - `migrate/status` and replayed `migrate/stream` done events now surface this status consistently. - Added structured logs for successful full-step import vs partial-step failures. Related Issue (Required): Fixes #1303 ## Type of change - [x] Bug fix (non-breaking change which fixes an issue) ## How Has This Been Tested? - [x] Unit Test - [ ] Test Script Or Test Steps (please provide) - [ ] Pipeline Automated API Test (please provide) Added `apps/memos-local-openclaw/tests/migration-status.test.ts` to verify: - import state is marked unsuccessful when step failures exist even without fatal errors; - clean path remains successful; - explicit item errors still produce failure. Executed: - `npm test -- tests/migration-status.test.ts` (from `apps/memos-local-openclaw`) ## Checklist - [x] I have performed a self-review of my own code - [x] I have commented my code in hard-to-understand areas - [x] I have added tests that prove my fix is effective or that my feature works - [ ] I have created related documentation issue/PR in [MemOS-Docs](https://github.com/MemTensor/MemOS-Docs) (if applicable) - [x] I have linked the issue to this PR (if applicable) - [ ] I have mentioned the person who will review this PR
2 parents fd7c058 + 47e4029 commit 95fb6d3

File tree

2 files changed

+223
-60
lines changed

2 files changed

+223
-60
lines changed

apps/memos-local-openclaw/src/viewer/server.ts

Lines changed: 161 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,72 @@ import type { Logger, Chunk, PluginContext, MemosLocalConfig } from "../types";
2222
import { viewerHTML } from "./html";
2323
import { v4 as uuid } from "uuid";
2424

25-
function normalizeTimestamp(ts: number): number {
26-
if (ts < 1e12) return ts * 1000;
27-
return ts;
25+
export interface MigrationStepFailureCounts {
26+
summarization: number;
27+
dedup: number;
28+
embedding: number;
29+
}
30+
31+
export interface MigrationStateSnapshot {
32+
phase: string;
33+
stored: number;
34+
skipped: number;
35+
merged: number;
36+
errors: number;
37+
processed: number;
38+
total: number;
39+
lastItem: any;
40+
done: boolean;
41+
stopped: boolean;
42+
stepFailures: MigrationStepFailureCounts;
43+
success: boolean;
44+
}
45+
46+
function createInitialStepFailures(): MigrationStepFailureCounts {
47+
return { summarization: 0, dedup: 0, embedding: 0 };
48+
}
49+
50+
export function computeMigrationSuccess(state: Pick<MigrationStateSnapshot, "errors" | "stepFailures">): boolean {
51+
const sf = state.stepFailures;
52+
return state.errors === 0 && sf.summarization === 0 && sf.dedup === 0 && sf.embedding === 0;
53+
}
54+
55+
export function createInitialMigrationState(): MigrationStateSnapshot {
56+
const stepFailures = createInitialStepFailures();
57+
return {
58+
phase: "",
59+
stored: 0,
60+
skipped: 0,
61+
merged: 0,
62+
errors: 0,
63+
processed: 0,
64+
total: 0,
65+
lastItem: null,
66+
done: false,
67+
stopped: false,
68+
stepFailures,
69+
success: computeMigrationSuccess({ errors: 0, stepFailures }),
70+
};
71+
}
72+
73+
export function applyMigrationItemToState(state: MigrationStateSnapshot, d: any): void {
74+
if (d.status === "stored") state.stored++;
75+
else if (d.status === "skipped" || d.status === "duplicate") state.skipped++;
76+
else if (d.status === "merged") state.merged++;
77+
else if (d.status === "error") state.errors++;
78+
79+
if (Array.isArray(d.stepFailures)) {
80+
for (const step of d.stepFailures) {
81+
if (step === "summarization") state.stepFailures.summarization++;
82+
else if (step === "dedup") state.stepFailures.dedup++;
83+
else if (step === "embedding") state.stepFailures.embedding++;
84+
}
85+
}
86+
87+
state.processed = d.index ?? state.processed + 1;
88+
state.total = d.total ?? state.total;
89+
state.lastItem = d;
90+
state.success = computeMigrationSuccess(state);
2891
}
2992

3093
export interface ViewerServerOptions {
@@ -67,18 +130,7 @@ export class ViewerServer {
67130
private resetToken: string;
68131
private migrationRunning = false;
69132
private migrationAbort = false;
70-
private migrationState: {
71-
phase: string;
72-
stored: number;
73-
skipped: number;
74-
merged: number;
75-
errors: number;
76-
processed: number;
77-
total: number;
78-
lastItem: any;
79-
done: boolean;
80-
stopped: boolean;
81-
} = { phase: "", stored: 0, skipped: 0, merged: 0, errors: 0, processed: 0, total: 0, lastItem: null, done: false, stopped: false };
133+
private migrationState: MigrationStateSnapshot = createInitialMigrationState();
82134
private migrationSSEClients: http.ServerResponse[] = [];
83135

84136
private ppRunning = false;
@@ -3575,7 +3627,7 @@ export class ViewerServer {
35753627
} else if (this.migrationState.done) {
35763628
const evtName = this.migrationState.stopped ? "stopped" : "done";
35773629
res.write(`event: state\ndata: ${JSON.stringify(this.migrationState)}\n\n`);
3578-
res.write(`event: ${evtName}\ndata: ${JSON.stringify({ ok: true })}\n\n`);
3630+
res.write(`event: ${evtName}\ndata: ${JSON.stringify({ ok: this.migrationState.success, ...this.migrationState })}\n\n`);
35793631
res.end();
35803632
} else {
35813633
res.end();
@@ -3616,19 +3668,12 @@ export class ViewerServer {
36163668
this.migrationSSEClients = this.migrationSSEClients.filter(c => c !== res);
36173669
});
36183670

3619-
this.migrationAbort = false;
3620-
this.migrationState = { phase: "", stored: 0, skipped: 0, merged: 0, errors: 0, processed: 0, total: 0, lastItem: null, done: false, stopped: false };
3671+
this.migrationState = createInitialMigrationState();
36213672

36223673
const send = (event: string, data: unknown) => {
36233674
if (event === "item") {
36243675
const d = data as any;
3625-
if (d.status === "stored") this.migrationState.stored++;
3626-
else if (d.status === "skipped" || d.status === "duplicate") this.migrationState.skipped++;
3627-
else if (d.status === "merged") this.migrationState.merged++;
3628-
else if (d.status === "error") this.migrationState.errors++;
3629-
this.migrationState.processed = d.index ?? this.migrationState.processed + 1;
3630-
this.migrationState.total = d.total ?? this.migrationState.total;
3631-
this.migrationState.lastItem = d;
3676+
applyMigrationItemToState(this.migrationState, d);
36323677
} else if (event === "phase") {
36333678
this.migrationState.phase = (data as any).phase;
36343679
} else if (event === "progress") {
@@ -3641,11 +3686,13 @@ export class ViewerServer {
36413686
this.runMigration(send, opts.sources, concurrency).finally(() => {
36423687
this.migrationRunning = false;
36433688
this.migrationState.done = true;
3689+
this.migrationState.success = computeMigrationSuccess(this.migrationState);
3690+
const donePayload = { ok: this.migrationState.success, ...this.migrationState };
36443691
if (this.migrationAbort) {
36453692
this.migrationState.stopped = true;
3646-
this.broadcastSSE("stopped", { ok: true, ...this.migrationState });
3693+
this.broadcastSSE("stopped", donePayload);
36473694
} else {
3648-
this.broadcastSSE("done", { ok: true });
3695+
this.broadcastSSE("done", donePayload);
36493696
}
36503697
this.migrationAbort = false;
36513698
const clientsToClose = [...this.migrationSSEClients];
@@ -3742,11 +3789,24 @@ export class ViewerServer {
37423789
}
37433790

37443791
try {
3745-
const summary = await summarizer.summarize(row.text);
3792+
const stepFailures: Array<"summarization" | "dedup" | "embedding"> = [];
3793+
let summary = "";
3794+
try {
3795+
summary = await summarizer.summarize(row.text);
3796+
} catch (err) {
3797+
stepFailures.push("summarization");
3798+
this.log.warn(`Migration summarization failed: ${err}`);
3799+
}
3800+
if (!summary) {
3801+
stepFailures.push("summarization");
3802+
summary = row.text.slice(0, 200);
3803+
}
3804+
37463805
let embedding: number[] | null = null;
37473806
try {
37483807
[embedding] = await this.embedder.embed([summary]);
37493808
} catch (err) {
3809+
stepFailures.push("embedding");
37503810
this.log.warn(`Migration embed failed: ${err}`);
37513811
}
37523812

@@ -3765,26 +3825,31 @@ export class ViewerServer {
37653825
}).filter(c => c.summary);
37663826

37673827
if (candidates.length > 0) {
3768-
const dedupResult = await summarizer.judgeDedup(summary, candidates);
3769-
if (dedupResult?.action === "DUPLICATE" && dedupResult.targetIndex) {
3770-
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
3771-
if (targetId) {
3772-
dedupStatus = "duplicate";
3773-
dedupTarget = targetId;
3774-
dedupReason = dedupResult.reason;
3775-
}
3776-
} else if (dedupResult?.action === "UPDATE" && dedupResult.targetIndex && dedupResult.mergedSummary) {
3777-
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
3778-
if (targetId) {
3779-
this.store.updateChunkSummaryAndContent(targetId, dedupResult.mergedSummary, row.text);
3780-
try {
3781-
const [newEmb] = await this.embedder.embed([dedupResult.mergedSummary]);
3782-
if (newEmb) this.store.upsertEmbedding(targetId, newEmb);
3783-
} catch { /* best-effort */ }
3784-
dedupStatus = "merged";
3785-
dedupTarget = targetId;
3786-
dedupReason = dedupResult.reason;
3828+
try {
3829+
const dedupResult = await summarizer.judgeDedup(summary, candidates);
3830+
if (dedupResult?.action === "DUPLICATE" && dedupResult.targetIndex) {
3831+
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
3832+
if (targetId) {
3833+
dedupStatus = "duplicate";
3834+
dedupTarget = targetId;
3835+
dedupReason = dedupResult.reason;
3836+
}
3837+
} else if (dedupResult?.action === "UPDATE" && dedupResult.targetIndex && dedupResult.mergedSummary) {
3838+
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
3839+
if (targetId) {
3840+
this.store.updateChunkSummaryAndContent(targetId, dedupResult.mergedSummary, row.text);
3841+
try {
3842+
const [newEmb] = await this.embedder.embed([dedupResult.mergedSummary]);
3843+
if (newEmb) this.store.upsertEmbedding(targetId, newEmb);
3844+
} catch { /* best-effort */ }
3845+
dedupStatus = "merged";
3846+
dedupTarget = targetId;
3847+
dedupReason = dedupResult.reason;
3848+
}
37873849
}
3850+
} catch (err) {
3851+
stepFailures.push("dedup");
3852+
this.log.warn(`Migration dedup judgment failed: ${err}`);
37883853
}
37893854
}
37903855
}
@@ -3827,7 +3892,13 @@ export class ViewerServer {
38273892
preview: row.text.slice(0, 120),
38283893
summary: summary.slice(0, 80),
38293894
source: file,
3895+
stepFailures,
38303896
});
3897+
if (stepFailures.length > 0) {
3898+
this.log.warn(`[MIGRATION] sqlite item imported with step failures: ${stepFailures.join(",")}`);
3899+
} else {
3900+
this.log.info("[MIGRATION] sqlite item imported successfully (all steps)");
3901+
}
38313902
} catch (err) {
38323903
totalErrors++;
38333904
send("item", {
@@ -3957,11 +4028,24 @@ export class ViewerServer {
39574028
}
39584029

39594030
try {
3960-
const summary = await summarizer.summarize(content);
4031+
const stepFailures: Array<"summarization" | "dedup" | "embedding"> = [];
4032+
let summary = "";
4033+
try {
4034+
summary = await summarizer.summarize(content);
4035+
} catch (err) {
4036+
stepFailures.push("summarization");
4037+
this.log.warn(`Migration summarization failed: ${err}`);
4038+
}
4039+
if (!summary) {
4040+
stepFailures.push("summarization");
4041+
summary = content.slice(0, 200);
4042+
}
4043+
39614044
let embedding: number[] | null = null;
39624045
try {
39634046
[embedding] = await this.embedder.embed([summary]);
39644047
} catch (err) {
4048+
stepFailures.push("embedding");
39654049
this.log.warn(`Migration embed failed: ${err}`);
39664050
}
39674051

@@ -3980,17 +4064,22 @@ export class ViewerServer {
39804064
}).filter(c => c.summary);
39814065

39824066
if (candidates.length > 0) {
3983-
const dedupResult = await summarizer.judgeDedup(summary, candidates);
3984-
if (dedupResult?.action === "DUPLICATE" && dedupResult.targetIndex) {
3985-
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
3986-
if (targetId) { dedupStatus = "duplicate"; dedupTarget = targetId; dedupReason = dedupResult.reason; }
3987-
} else if (dedupResult?.action === "UPDATE" && dedupResult.targetIndex && dedupResult.mergedSummary) {
3988-
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
3989-
if (targetId) {
3990-
this.store.updateChunkSummaryAndContent(targetId, dedupResult.mergedSummary, content);
3991-
try { const [newEmb] = await this.embedder.embed([dedupResult.mergedSummary]); if (newEmb) this.store.upsertEmbedding(targetId, newEmb); } catch { /* best-effort */ }
3992-
dedupStatus = "merged"; dedupTarget = targetId; dedupReason = dedupResult.reason;
4067+
try {
4068+
const dedupResult = await summarizer.judgeDedup(summary, candidates);
4069+
if (dedupResult?.action === "DUPLICATE" && dedupResult.targetIndex) {
4070+
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
4071+
if (targetId) { dedupStatus = "duplicate"; dedupTarget = targetId; dedupReason = dedupResult.reason; }
4072+
} else if (dedupResult?.action === "UPDATE" && dedupResult.targetIndex && dedupResult.mergedSummary) {
4073+
const targetId = candidates[dedupResult.targetIndex - 1]?.chunkId;
4074+
if (targetId) {
4075+
this.store.updateChunkSummaryAndContent(targetId, dedupResult.mergedSummary, content);
4076+
try { const [newEmb] = await this.embedder.embed([dedupResult.mergedSummary]); if (newEmb) this.store.upsertEmbedding(targetId, newEmb); } catch { /* best-effort */ }
4077+
dedupStatus = "merged"; dedupTarget = targetId; dedupReason = dedupResult.reason;
4078+
}
39934079
}
4080+
} catch (err) {
4081+
stepFailures.push("dedup");
4082+
this.log.warn(`Migration dedup judgment failed: ${err}`);
39944083
}
39954084
}
39964085
}
@@ -4010,7 +4099,12 @@ export class ViewerServer {
40104099
if (embedding && dedupStatus === "active") this.store.upsertEmbedding(chunkId, embedding);
40114100

40124101
totalStored++;
4013-
send("item", { index: idx, total: totalMsgs, status: dedupStatus === "active" ? "stored" : dedupStatus, preview: content.slice(0, 120), summary: summary.slice(0, 80), source: file, agent: agentId, role: msgRole });
4102+
send("item", { index: idx, total: totalMsgs, status: dedupStatus === "active" ? "stored" : dedupStatus, preview: content.slice(0, 120), summary: summary.slice(0, 80), source: file, agent: agentId, role: msgRole, stepFailures });
4103+
if (stepFailures.length > 0) {
4104+
this.log.warn(`[MIGRATION] session item imported with step failures: ${stepFailures.join(",")}`);
4105+
} else {
4106+
this.log.info("[MIGRATION] session item imported successfully (all steps)");
4107+
}
40144108
} catch (err) {
40154109
totalErrors++;
40164110
send("item", { index: idx, total: totalMsgs, status: "error", preview: content.slice(0, 120), source: file, agent: agentId, error: String(err).slice(0, 200) });
@@ -4051,7 +4145,14 @@ export class ViewerServer {
40514145
}
40524146

40534147
send("progress", { total: totalProcessed, processed: totalProcessed, phase: "done" });
4054-
send("summary", { totalProcessed, totalStored, totalSkipped, totalErrors });
4148+
send("summary", {
4149+
totalProcessed,
4150+
totalStored,
4151+
totalSkipped,
4152+
totalErrors,
4153+
success: computeMigrationSuccess(this.migrationState),
4154+
stepFailures: this.migrationState.stepFailures,
4155+
});
40554156
}
40564157

40574158
// ─── Post-processing: independent task/skill generation ───
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { describe, it, expect } from "vitest";
2+
import {
3+
applyMigrationItemToState,
4+
computeMigrationSuccess,
5+
createInitialMigrationState,
6+
} from "../src/viewer/server";
7+
8+
describe("migration status step failure reporting", () => {
9+
it("reports success=false when step failures exist even if no fatal errors", () => {
10+
const state = createInitialMigrationState();
11+
12+
applyMigrationItemToState(state, {
13+
status: "stored",
14+
index: 1,
15+
total: 2,
16+
stepFailures: ["embedding"],
17+
});
18+
19+
expect(state.errors).toBe(0);
20+
expect(state.stepFailures.embedding).toBe(1);
21+
expect(state.success).toBe(false);
22+
expect(computeMigrationSuccess(state)).toBe(false);
23+
});
24+
25+
it("keeps success=true for clean imports and flips to false when dedup/summarization fail", () => {
26+
const state = createInitialMigrationState();
27+
28+
applyMigrationItemToState(state, {
29+
status: "stored",
30+
index: 1,
31+
total: 3,
32+
stepFailures: [],
33+
});
34+
35+
expect(state.success).toBe(true);
36+
37+
applyMigrationItemToState(state, {
38+
status: "stored",
39+
index: 2,
40+
total: 3,
41+
stepFailures: ["dedup", "summarization"],
42+
});
43+
44+
expect(state.stepFailures.dedup).toBe(1);
45+
expect(state.stepFailures.summarization).toBe(1);
46+
expect(state.success).toBe(false);
47+
});
48+
49+
it("counts explicit item errors and reports failure", () => {
50+
const state = createInitialMigrationState();
51+
52+
applyMigrationItemToState(state, {
53+
status: "error",
54+
index: 1,
55+
total: 1,
56+
stepFailures: [],
57+
});
58+
59+
expect(state.errors).toBe(1);
60+
expect(state.success).toBe(false);
61+
});
62+
});

0 commit comments

Comments
 (0)