Skip to content

Commit d8c4e84

Browse files
committed
fix: Ensure teardown happens before notification of complete or error
Resolves #7443
1 parent 400e502 commit d8c4e84

File tree

16 files changed

+111
-132
lines changed

16 files changed

+111
-132
lines changed

packages/observable/src/observable.ts

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,6 @@ export interface SubscriberOverrides<T> {
220220
* will be handled and passed to the destination's `error` method.
221221
*/
222222
complete?: () => void;
223-
/**
224-
* If provided, this function will be called after all teardown has occurred
225-
* for this {@link Subscriber}. This is generally used for cleanup purposes
226-
* during operator development.
227-
*/
228-
finalize?: () => void;
229223
}
230224

231225
/**
@@ -248,8 +242,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
248242
protected readonly _errorOverride: ((err: any) => void) | null = null;
249243
/** @internal */
250244
protected readonly _completeOverride: (() => void) | null = null;
251-
/** @internal */
252-
protected readonly _onFinalize: (() => void) | null = null;
253245

254246
/**
255247
* @deprecated Do not create instances of `Subscriber` directly. Use {@link operate} instead.
@@ -283,7 +275,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
283275
this._nextOverride = overrides?.next ?? null;
284276
this._errorOverride = overrides?.error ?? null;
285277
this._completeOverride = overrides?.complete ?? null;
286-
this._onFinalize = overrides?.finalize ?? null;
287278

288279
// It's important - for performance reasons - that all of this class's
289280
// members are initialized and that they are always initialized in the same
@@ -355,7 +346,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
355346
if (!this.closed) {
356347
this.isStopped = true;
357348
super.unsubscribe();
358-
this._onFinalize?.();
359349
}
360350
}
361351

@@ -364,19 +354,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
364354
}
365355

366356
protected _error(err: any): void {
367-
try {
368-
this.destination.error(err);
369-
} finally {
370-
this.unsubscribe();
371-
}
357+
this.unsubscribe();
358+
this.destination.error(err);
372359
}
373360

374361
protected _complete(): void {
375-
try {
376-
this.destination.complete();
377-
} finally {
378-
this.unsubscribe();
379-
}
362+
this.unsubscribe();
363+
this.destination.complete();
380364
}
381365
}
382366

@@ -428,22 +412,20 @@ function overrideNext<T>(this: Subscriber<T>, value: T): void {
428412
}
429413

430414
function overrideError(this: Subscriber<unknown>, err: any): void {
415+
this.unsubscribe();
431416
try {
432417
this._errorOverride!(err);
433418
} catch (error) {
434419
this.destination.error(error);
435-
} finally {
436-
this.unsubscribe();
437420
}
438421
}
439422

440423
function overrideComplete(this: Subscriber<unknown>): void {
424+
this.unsubscribe();
441425
try {
442426
this._completeOverride!();
443427
} catch (error) {
444428
this.destination.error(error);
445-
} finally {
446-
this.unsubscribe();
447429
}
448430
}
449431

packages/rxjs/spec/observables/dom/webSocket-spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ describe('webSocket', () => {
669669
});
670670
socket.triggerClose({ wasClean: true });
671671

672-
expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B complete', 'B unsub']);
672+
expect(results).to.deep.equal(['A next', 'A unsub', 'B next', 'B next', 'B next', 'B unsub', 'B complete']);
673673
});
674674

675675
it('should not close the socket until all subscriptions complete', () => {

packages/rxjs/src/internal/observable/forkJoin.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ export function forkJoin(...args: any[]): Observable<any> {
165165
}
166166
values[sourceIndex] = value;
167167
},
168-
complete: () => remainingCompletions--,
169-
finalize: () => {
170-
if (!remainingCompletions || !hasValue) {
168+
complete: () => {
169+
remainingCompletions--;
170+
if (remainingCompletions <= 0 || !hasValue) {
171171
if (remainingEmissions === 0) {
172172
destination.next(keys ? createObject(keys, values) : values);
173173
destination.complete();

packages/rxjs/src/internal/operators/bufferCount.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
6363
let buffers: T[][] = [];
6464
let count = 0;
6565

66+
destination.add(() => {
67+
// Clean up our memory when we finalize
68+
buffers = null!;
69+
});
70+
6671
source.subscribe(
6772
operate({
6873
destination,
@@ -108,10 +113,6 @@ export function bufferCount<T>(bufferSize: number, startBufferEvery: number | nu
108113
}
109114
destination.complete();
110115
},
111-
finalize: () => {
112-
// Clean up our memory when we finalize
113-
buffers = null!;
114-
},
115116
})
116117
);
117118
});

packages/rxjs/src/internal/operators/bufferTime.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
8383
// this is only really used for when *just* the buffer time span is passed.
8484
let restartOnEmit = false;
8585

86+
destination.add(() => {
87+
bufferRecords = null;
88+
});
89+
8690
/**
8791
* Does the work of emitting the buffer from the record, ensuring that the
8892
* record is removed before the emission so reentrant code (from some custom scheduling, perhaps)
@@ -153,8 +157,6 @@ export function bufferTime<T>(bufferTimeSpan: number, ...otherArgs: any[]): Oper
153157
destination.complete();
154158
destination.unsubscribe();
155159
},
156-
// Clean up
157-
finalize: () => (bufferRecords = null),
158160
});
159161

160162
source.subscribe(bufferTimeSubscriber);

packages/rxjs/src/internal/operators/bufferWhen.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { Subscriber} from '@rxjs/observable';
1+
import type { Subscriber } from '@rxjs/observable';
22
import { operate, Observable, from } from '@rxjs/observable';
33
import type { ObservableInput, OperatorFunction } from '../types.js';
44
import { noop } from '../util/noop.js';
@@ -43,14 +43,18 @@ import { noop } from '../util/noop.js';
4343
*/
4444
export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]> {
4545
return (source) =>
46-
new Observable((subscriber) => {
46+
new Observable((destination) => {
4747
// The buffer we keep and emit.
4848
let buffer: T[] | null = null;
4949
// A reference to the subscriber used to subscribe to
5050
// the closing notifier. We need to hold this so we can
5151
// end the subscription after the first notification.
5252
let closingSubscriber: Subscriber<T> | null = null;
5353

54+
destination.add(() => {
55+
buffer = closingSubscriber = null!;
56+
});
57+
5458
// Ends the previous closing notifier subscription, so it
5559
// terminates after the first emission, then emits
5660
// the current buffer if there is one, starts a new buffer, and starts a
@@ -62,12 +66,12 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
6266
// emit the buffer if we have one, and start a new buffer.
6367
const b = buffer;
6468
buffer = [];
65-
b && subscriber.next(b);
69+
b && destination.next(b);
6670

6771
// Get a new closing notifier and subscribe to it.
6872
from(closingSelector()).subscribe(
6973
(closingSubscriber = operate({
70-
destination: subscriber,
74+
destination,
7175
next: openBuffer,
7276
complete: noop,
7377
}))
@@ -80,17 +84,15 @@ export function bufferWhen<T>(closingSelector: () => ObservableInput<any>): Oper
8084
// Subscribe to our source.
8185
source.subscribe(
8286
operate({
83-
destination: subscriber,
87+
destination,
8488
// Add every new value to the current buffer.
8589
next: (value) => buffer?.push(value),
8690
// When we complete, emit the buffer if we have one,
8791
// then complete the result.
8892
complete: () => {
89-
buffer && subscriber.next(buffer);
90-
subscriber.complete();
93+
buffer && destination.next(buffer);
94+
destination.complete();
9195
},
92-
// Release memory on finalization
93-
finalize: () => (buffer = closingSubscriber = null!),
9496
})
9597
);
9698
});

packages/rxjs/src/internal/operators/debounce.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { Subscriber} from '@rxjs/observable';
1+
import type { Subscriber } from '@rxjs/observable';
22
import { operate, Observable, from } from '@rxjs/observable';
33
import type { MonoTypeOperatorFunction, ObservableInput } from '../types.js';
44
import { noop } from '../util/noop.js';
@@ -69,6 +69,10 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
6969
// The subscriber/subscription for the current debounce, if there is one.
7070
let durationSubscriber: Subscriber<any> | null = null;
7171

72+
destination.add(() => {
73+
lastValue = durationSubscriber = null;
74+
});
75+
7276
const emit = () => {
7377
// Unsubscribe any current debounce subscription we have,
7478
// we only cared about the first notification from it, and we
@@ -106,10 +110,6 @@ export function debounce<T>(durationSelector: (value: T) => ObservableInput<any>
106110
emit();
107111
destination.complete();
108112
},
109-
finalize: () => {
110-
// Finalization.
111-
lastValue = durationSubscriber = null;
112-
},
113113
})
114114
);
115115
});

packages/rxjs/src/internal/operators/debounceTime.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
6666
let lastValue: T;
6767
let activeTask: Subscription | void;
6868

69+
destination.add(() => {
70+
lastValue = activeTask = null!;
71+
});
72+
6973
source.subscribe(
7074
operate({
7175
destination,
@@ -94,10 +98,6 @@ export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyn
9498
}
9599
destination.complete();
96100
},
97-
finalize: () => {
98-
// Finalization.
99-
lastValue = activeTask = null!;
100-
},
101101
})
102102
);
103103
});

packages/rxjs/src/internal/operators/groupBy.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,11 @@ export function groupBy<T, K, R>(
153153
// A lookup for the groups that we have so far.
154154
const groups = new Map<K, SubjectLike<any>>();
155155

156+
destination.add(() => {
157+
// Free up memory.
158+
groups.clear();
159+
});
160+
156161
// Used for notifying all groups and the subscriber in the same way.
157162
const notify = (cb: (group: Observer<any>) => void) => {
158163
groups.forEach(cb);
@@ -202,9 +207,18 @@ export function groupBy<T, K, R>(
202207
// Our duration notified! We can complete the group.
203208
// The group will be removed from the map in the finalization phase.
204209
group!.complete();
210+
groups.delete(key);
211+
durationSubscriber?.unsubscribe();
212+
},
213+
error: (err) => {
214+
group!.error(err);
215+
groups.delete(key);
216+
durationSubscriber?.unsubscribe();
217+
},
218+
complete: () => {
219+
groups.delete(key);
205220
durationSubscriber?.unsubscribe();
206221
},
207-
finalize: () => groups.delete(key),
208222
});
209223

210224
// Start our duration notifier.
@@ -222,11 +236,6 @@ export function groupBy<T, K, R>(
222236
error: handleError,
223237
// Source completes.
224238
complete: () => notify((consumer) => consumer.complete()),
225-
// Free up memory.
226-
// When the source subscription is _finally_ torn down, release the subjects and keys
227-
// in our groups Map, they may be quite large and we don't want to keep them around if we
228-
// don't have to.
229-
finalize: () => groups.clear(),
230239
});
231240

232241
// Subscribe to the source

packages/rxjs/src/internal/operators/mergeInternals.ts

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,6 @@ export function mergeInternals<T, R>(
5757
// against our concurrency limit later.
5858
active++;
5959

60-
// A flag used to show that the inner observable completed.
61-
// This is checked during finalization to see if we should
62-
// move to the next item in the buffer, if there is on.
63-
let innerComplete = false;
64-
6560
// Start our inner subscription.
6661
from(project(value, index++)).subscribe(
6762
operate({
@@ -81,37 +76,18 @@ export function mergeInternals<T, R>(
8176
}
8277
},
8378
complete: () => {
84-
// Flag that we have completed, so we know to check the buffer
85-
// during finalization.
86-
innerComplete = true;
87-
},
88-
finalize: () => {
89-
// During finalization, if the inner completed (it wasn't errored or
90-
// cancelled), then we want to try the next item in the buffer if
91-
// there is one.
92-
if (innerComplete) {
93-
// We have to wrap this in a try/catch because it happens during
94-
// finalization, possibly asynchronously, and we want to pass
95-
// any errors that happen (like in a projection function) to
96-
// the outer Subscriber.
97-
try {
98-
// INNER SOURCE COMPLETE
99-
// Decrement the active count to ensure that the next time
100-
// we try to call `doInnerSub`, the number is accurate.
101-
active--;
102-
// If we have more values in the buffer, try to process those
103-
// Note that this call will increment `active` ahead of the
104-
// next conditional, if there were any more inner subscriptions
105-
// to start.
106-
while (buffer.length && active < concurrent) {
107-
doInnerSub(buffer.shift()!);
108-
}
109-
// Check to see if we can complete, and complete if so.
110-
checkComplete();
111-
} catch (err) {
112-
destination.error(err);
113-
}
79+
// Decrement the active count to ensure that the next time
80+
// we try to call `doInnerSub`, the number is accurate.
81+
active--;
82+
// If we have more values in the buffer, try to process those
83+
// Note that this call will increment `active` ahead of the
84+
// next conditional, if there were any more inner subscriptions
85+
// to start.
86+
while (buffer.length && active < concurrent) {
87+
doInnerSub(buffer.shift()!);
11488
}
89+
// Check to see if we can complete, and complete if so.
90+
checkComplete();
11591
},
11692
})
11793
);

0 commit comments

Comments
 (0)