Skip to content

Commit b437977

Browse files
🤖 feat: add backend support for soft-interrupts (#767)
Adds a `soft: boolean` option to `interruptStream()` that sets a pending flag instead of immediately aborting. The stream manager checks this flag at content boundaries (end of text blocks, tool results, reasoning sections) and gracefully terminates at the next one. This lays the groundwork for automatic context management—when the agent approaches the context window limit, we can issue a soft interrupt to let the current thought complete cleanly before triggering compaction. This avoids cutting off mid-sentence or mid-tool-call, producing cleaner conversation history for the compaction summary. The frontend currently always uses hard interrupts; the soft interrupt path is exercised only by tests for now. A following PR will use this code. I have an integration test to add back once the sendMessage integration test suite is restored..... --- _Generated with `mux`_
1 parent df30cbc commit b437977

File tree

6 files changed

+121
-43
lines changed

6 files changed

+121
-43
lines changed

bun.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"lockfileVersion": 1,
3+
"configVersion": 0,
34
"workspaces": {
45
"": {
56
"name": "mux",

src/common/orpc/schemas/api.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,12 @@ export const workspace = {
185185
interruptStream: {
186186
input: z.object({
187187
workspaceId: z.string(),
188-
options: z.object({ abandonPartial: z.boolean().optional() }).optional(),
188+
options: z
189+
.object({
190+
soft: z.boolean().optional(),
191+
abandonPartial: z.boolean().optional(),
192+
})
193+
.optional(),
189194
}),
190195
output: ResultSchema(z.void(), z.string()),
191196
},

src/node/services/agentSession.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ export class AgentSession {
293293
if (this.aiService.isStreaming(this.workspaceId)) {
294294
// MUST use abandonPartial=true to prevent handleAbort from performing partial compaction
295295
// with mismatched history (since we're about to truncate it)
296-
const stopResult = await this.interruptStream(/* abandonPartial */ true);
296+
const stopResult = await this.interruptStream({ abandonPartial: true });
297297
if (!stopResult.success) {
298298
return Err(createUnknownSendMessageError(stopResult.error));
299299
}
@@ -405,24 +405,27 @@ export class AgentSession {
405405
return this.streamWithHistory(model, options);
406406
}
407407

408-
async interruptStream(abandonPartial?: boolean): Promise<Result<void>> {
408+
async interruptStream(options?: {
409+
soft?: boolean;
410+
abandonPartial?: boolean;
411+
}): Promise<Result<void>> {
409412
this.assertNotDisposed("interruptStream");
410413

411414
if (!this.aiService.isStreaming(this.workspaceId)) {
412415
return Ok(undefined);
413416
}
414417

415-
// Delete partial BEFORE stopping to prevent abort handler from committing it
416-
// The abort handler in aiService.ts runs immediately when stopStream is called,
417-
// so we must delete first to ensure it finds no partial to commit
418-
if (abandonPartial) {
418+
// For hard interrupts, delete partial BEFORE stopping to prevent abort handler
419+
// from committing it. For soft interrupts, defer to stream-abort handler since
420+
// the stream continues running and would recreate the partial.
421+
if (options?.abandonPartial && !options?.soft) {
419422
const deleteResult = await this.partialService.deletePartial(this.workspaceId);
420423
if (!deleteResult.success) {
421424
return Err(deleteResult.error);
422425
}
423426
}
424427

425-
const stopResult = await this.aiService.stopStream(this.workspaceId, abandonPartial);
428+
const stopResult = await this.aiService.stopStream(this.workspaceId, options);
426429
if (!stopResult.success) {
427430
return Err(stopResult.error);
428431
}

src/node/services/aiService.ts

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -197,17 +197,20 @@ export class AIService extends EventEmitter {
197197
this.streamManager.on("stream-delta", (data) => this.emit("stream-delta", data));
198198
this.streamManager.on("stream-end", (data) => this.emit("stream-end", data));
199199

200-
// Handle stream-abort: commit partial to history before forwarding
201-
// Note: If abandonPartial option was used, partial is already deleted by IPC handler
200+
// Handle stream-abort: dispose of partial based on abandonPartial flag
202201
this.streamManager.on("stream-abort", (data: StreamAbortEvent) => {
203202
void (async () => {
204-
// Check if partial still exists (not abandoned)
205-
const partial = await this.partialService.readPartial(data.workspaceId);
206-
if (partial) {
203+
if (data.abandonPartial) {
204+
// Caller requested discarding partial - delete without committing
205+
await this.partialService.deletePartial(data.workspaceId);
206+
} else {
207207
// Commit interrupted message to history with partial:true metadata
208208
// This ensures /clear and /truncate can clean up interrupted messages
209-
await this.partialService.commitToHistory(data.workspaceId);
210-
await this.partialService.deletePartial(data.workspaceId);
209+
const partial = await this.partialService.readPartial(data.workspaceId);
210+
if (partial) {
211+
await this.partialService.commitToHistory(data.workspaceId);
212+
await this.partialService.deletePartial(data.workspaceId);
213+
}
211214
}
212215

213216
// Forward abort event to consumers
@@ -1084,12 +1087,15 @@ export class AIService extends EventEmitter {
10841087
}
10851088
}
10861089

1087-
async stopStream(workspaceId: string, abandonPartial?: boolean): Promise<Result<void>> {
1090+
async stopStream(
1091+
workspaceId: string,
1092+
options?: { soft?: boolean; abandonPartial?: boolean }
1093+
): Promise<Result<void>> {
10881094
if (this.mockModeEnabled && this.mockScenarioPlayer) {
10891095
this.mockScenarioPlayer.stop(workspaceId);
10901096
return Ok(undefined);
10911097
}
1092-
return this.streamManager.stopStream(workspaceId, abandonPartial);
1098+
return this.streamManager.stopStream(workspaceId, options);
10931099
}
10941100

10951101
/**

src/node/services/streamManager.ts

Lines changed: 84 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ interface WorkspaceStreamInfo {
112112
partialWritePromise?: Promise<void>;
113113
// Track background processing promise for guaranteed cleanup
114114
processingPromise: Promise<void>;
115+
// Soft-interrupt state: when pending, stream will end at next block boundary
116+
softInterrupt: { pending: false } | { pending: true; abandonPartial: boolean };
115117
// Temporary directory for tool outputs (auto-cleaned when stream ends)
116118
runtimeTempDir: string;
117119
// Runtime for temp directory cleanup
@@ -418,38 +420,73 @@ export class StreamManager extends EventEmitter {
418420
): Promise<void> {
419421
try {
420422
streamInfo.state = StreamState.STOPPING;
421-
422423
// Flush any pending partial write immediately (preserves work on interruption)
423424
await this.flushPartialWrite(workspaceId, streamInfo);
424425

425426
streamInfo.abortController.abort();
426427

427-
// CRITICAL: Wait for processing to fully complete before cleanup
428-
// This prevents race conditions where the old stream is still running
429-
// while a new stream starts (e.g., old stream writing to partial.json)
430-
await streamInfo.processingPromise;
428+
await this.cleanupStream(workspaceId, streamInfo, abandonPartial);
429+
} catch (error) {
430+
console.error("Error during stream cancellation:", error);
431+
// Force cleanup even if cancellation fails
432+
this.workspaceStreams.delete(workspaceId);
433+
}
434+
}
431435

432-
// Get usage and duration metadata (usage may be undefined if aborted early)
433-
const { usage, duration } = await this.getStreamMetadata(streamInfo);
436+
// Checks if a soft interrupt is necessary, and performs one if so
437+
// Similar to cancelStreamSafely but performs cleanup without blocking
438+
private async checkSoftCancelStream(
439+
workspaceId: WorkspaceId,
440+
streamInfo: WorkspaceStreamInfo
441+
): Promise<void> {
442+
if (!streamInfo.softInterrupt.pending) return;
443+
try {
444+
streamInfo.state = StreamState.STOPPING;
434445

435-
// Emit abort event with usage if available
436-
this.emit("stream-abort", {
437-
type: "stream-abort",
438-
workspaceId: workspaceId as string,
439-
messageId: streamInfo.messageId,
440-
metadata: { usage, duration },
441-
abandonPartial,
442-
});
446+
// Flush any pending partial write immediately (preserves work on interruption)
447+
await this.flushPartialWrite(workspaceId, streamInfo);
443448

444-
// Clean up immediately
445-
this.workspaceStreams.delete(workspaceId);
449+
streamInfo.abortController.abort();
450+
451+
// Return back to the stream loop so we can wait for it to finish before
452+
// sending the stream abort event.
453+
const abandonPartial = streamInfo.softInterrupt.pending
454+
? streamInfo.softInterrupt.abandonPartial
455+
: false;
456+
void this.cleanupStream(workspaceId, streamInfo, abandonPartial);
446457
} catch (error) {
447458
console.error("Error during stream cancellation:", error);
448459
// Force cleanup even if cancellation fails
449460
this.workspaceStreams.delete(workspaceId);
450461
}
451462
}
452463

464+
private async cleanupStream(
465+
workspaceId: WorkspaceId,
466+
streamInfo: WorkspaceStreamInfo,
467+
abandonPartial?: boolean
468+
): Promise<void> {
469+
// CRITICAL: Wait for processing to fully complete before cleanup
470+
// This prevents race conditions where the old stream is still running
471+
// while a new stream starts (e.g., old stream writing to partial.json)
472+
await streamInfo.processingPromise;
473+
474+
// Get usage and duration metadata (usage may be undefined if aborted early)
475+
const { usage, duration } = await this.getStreamMetadata(streamInfo);
476+
477+
// Emit abort event with usage if available
478+
this.emit("stream-abort", {
479+
type: "stream-abort",
480+
workspaceId: workspaceId as string,
481+
messageId: streamInfo.messageId,
482+
metadata: { usage, duration },
483+
abandonPartial,
484+
});
485+
486+
// Clean up immediately
487+
this.workspaceStreams.delete(workspaceId);
488+
}
489+
453490
/**
454491
* Atomically creates a new stream with all necessary setup
455492
*/
@@ -555,6 +592,7 @@ export class StreamManager extends EventEmitter {
555592
lastPartialWriteTime: 0, // Initialize to 0 to allow immediate first write
556593
partialWritePromise: undefined, // No write in flight initially
557594
processingPromise: Promise.resolve(), // Placeholder, overwritten in startStream
595+
softInterrupt: { pending: false },
558596
runtimeTempDir, // Stream-scoped temp directory for tool outputs
559597
runtime, // Runtime for temp directory cleanup
560598
};
@@ -718,6 +756,7 @@ export class StreamManager extends EventEmitter {
718756
workspaceId: workspaceId as string,
719757
messageId: streamInfo.messageId,
720758
});
759+
await this.checkSoftCancelStream(workspaceId, streamInfo);
721760
break;
722761
}
723762

@@ -772,6 +811,7 @@ export class StreamManager extends EventEmitter {
772811
strippedOutput
773812
);
774813
}
814+
await this.checkSoftCancelStream(workspaceId, streamInfo);
775815
break;
776816
}
777817

@@ -808,6 +848,7 @@ export class StreamManager extends EventEmitter {
808848
toolErrorPart.toolName,
809849
errorOutput
810850
);
851+
await this.checkSoftCancelStream(workspaceId, streamInfo);
811852
break;
812853
}
813854

@@ -852,6 +893,7 @@ export class StreamManager extends EventEmitter {
852893
case "start":
853894
case "start-step":
854895
case "text-start":
896+
case "finish":
855897
// These events can be logged or handled if needed
856898
break;
857899

@@ -869,13 +911,14 @@ export class StreamManager extends EventEmitter {
869911
usage: finishStepPart.usage,
870912
};
871913
this.emit("usage-delta", usageEvent);
914+
await this.checkSoftCancelStream(workspaceId, streamInfo);
872915
break;
873916
}
874917

875-
case "finish":
876-
// No usage-delta here - totalUsage sums all steps, not current context.
877-
// Last finish-step already has correct context window usage.
918+
case "text-end": {
919+
await this.checkSoftCancelStream(workspaceId, streamInfo);
878920
break;
921+
}
879922
}
880923
}
881924

@@ -1363,14 +1406,32 @@ export class StreamManager extends EventEmitter {
13631406

13641407
/**
13651408
* Stops an active stream for a workspace
1409+
* First call: Sets soft interrupt and emits delta event → frontend shows "Interrupting..."
1410+
* Second call: Hard aborts the stream immediately
13661411
*/
1367-
async stopStream(workspaceId: string, abandonPartial?: boolean): Promise<Result<void>> {
1412+
async stopStream(
1413+
workspaceId: string,
1414+
options?: { soft?: boolean; abandonPartial?: boolean }
1415+
): Promise<Result<void>> {
13681416
const typedWorkspaceId = workspaceId as WorkspaceId;
13691417

13701418
try {
13711419
const streamInfo = this.workspaceStreams.get(typedWorkspaceId);
1372-
if (streamInfo) {
1373-
await this.cancelStreamSafely(typedWorkspaceId, streamInfo, abandonPartial);
1420+
if (!streamInfo) {
1421+
return Ok(undefined); // No active stream
1422+
}
1423+
1424+
const soft = options?.soft ?? false;
1425+
1426+
if (soft) {
1427+
// Soft interrupt: set flag, will cancel at next block boundary
1428+
streamInfo.softInterrupt = {
1429+
pending: true,
1430+
abandonPartial: options?.abandonPartial ?? false,
1431+
};
1432+
} else {
1433+
// Hard interrupt: cancel immediately
1434+
await this.cancelStreamSafely(typedWorkspaceId, streamInfo, options?.abandonPartial);
13741435
}
13751436
return Ok(undefined);
13761437
} catch (error) {

src/node/services/workspaceService.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -875,17 +875,19 @@ export class WorkspaceService extends EventEmitter {
875875

876876
async interruptStream(
877877
workspaceId: string,
878-
options?: { abandonPartial?: boolean }
878+
options?: { soft?: boolean; abandonPartial?: boolean }
879879
): Promise<Result<void>> {
880880
try {
881881
const session = this.getOrCreateSession(workspaceId);
882-
const stopResult = await session.interruptStream(options?.abandonPartial);
882+
const stopResult = await session.interruptStream(options);
883883
if (!stopResult.success) {
884884
log.error("Failed to stop stream:", stopResult.error);
885885
return Err(stopResult.error);
886886
}
887887

888-
if (options?.abandonPartial) {
888+
// For hard interrupts, delete partial immediately. For soft interrupts,
889+
// defer to stream-abort handler (stream is still running and may recreate partial).
890+
if (options?.abandonPartial && !options?.soft) {
889891
log.debug("Abandoning partial for workspace:", workspaceId);
890892
await this.partialService.deletePartial(workspaceId);
891893
}

0 commit comments

Comments
 (0)