From 383047d7cacb51e08a49634d16b4003f33b7c0ae Mon Sep 17 00:00:00 2001 From: Manfred Steyer Date: Fri, 8 Aug 2025 23:15:41 +0200 Subject: [PATCH] feat: add mutation api rxMutation creates a mutation function in the store delegating to a function/method returning an observable and takes care of race conditions by internally using mergeMap (default), switchMap, concatMap, or exhaustMap. mutation also creates an async mutation function delegating to a function/method returning an observable. When the async function is awaited without throwing an exception, the caller can safely assume that it has been executed. However, here, the consumer is responsible for preventing race conditions. --- libs/ngrx-toolkit/src/lib/mutation.spec.ts | 96 +++++++++++++ libs/ngrx-toolkit/src/lib/mutation.ts | 156 +++++++++++++++++++++ 2 files changed, 252 insertions(+) create mode 100644 libs/ngrx-toolkit/src/lib/mutation.spec.ts create mode 100644 libs/ngrx-toolkit/src/lib/mutation.ts diff --git a/libs/ngrx-toolkit/src/lib/mutation.spec.ts b/libs/ngrx-toolkit/src/lib/mutation.spec.ts new file mode 100644 index 00000000..f6259e70 --- /dev/null +++ b/libs/ngrx-toolkit/src/lib/mutation.spec.ts @@ -0,0 +1,96 @@ +import { fakeAsync, TestBed, tick } from '@angular/core/testing'; +import { patchState, signalStore, withMethods, withState } from '@ngrx/signals'; +import { delay, Observable, of } from 'rxjs'; +import { mutation, rxMutation, withMutations } from './mutation'; + +function calcDouble(value: number): Observable { + return of(value * 2).pipe(delay(1000)); +} + +describe('mutation', () => { + it('rxMutation should update the state', fakeAsync(() => { + TestBed.runInInjectionContext(() => { + const Store = signalStore( + withState({ counter: 3 }), + withMutations(), + withMethods((store) => ({ + increment: rxMutation(store, { + operation: (value: number) => calcDouble(value), + onSuccess: (_params, result) => { + patchState(store, (state) => ({ + counter: state.counter + result, + })); + }, + }), + })), + ); + const store = new Store(); + + store.increment(2); + tick(1000); + expect(store.counter()).toEqual(7); + }); + })); + + it('rxMutation deals with race conditions', fakeAsync(() => { + TestBed.runInInjectionContext(() => { + const Store = signalStore( + withState({ counter: 3 }), + withMutations(), + withMethods((store) => ({ + increment: rxMutation(store, { + operation: (value: number) => calcDouble(value), + onSuccess: (_params, result) => { + patchState(store, (state) => ({ + counter: state.counter + result, + })); + }, + operator: 'switch', + }), + })), + ); + + const store = new Store(); + + const successSpy = jest.fn(); + const errorSpy = jest.fn(); + + store.increment.success.subscribe(successSpy); + store.increment.error.subscribe(errorSpy); + + store.increment(1); + tick(500); + store.increment(2); + tick(1000); + + expect(store.counter()).toEqual(7); + expect(successSpy).toHaveBeenCalledTimes(1); + expect(errorSpy).toHaveBeenCalledTimes(0); + expect(successSpy).toHaveBeenCalledWith({ params: 2, result: 4 }); + }); + })); + + it('mutation should update the state', fakeAsync(() => { + TestBed.runInInjectionContext(() => { + const Store = signalStore( + withState({ counter: 3 }), + withMutations(), + withMethods((store) => ({ + increment: mutation(store, { + operation: (value: number) => calcDouble(value), + onSuccess: (_params, result) => { + patchState(store, (state) => ({ + counter: state.counter + result, + })); + }, + }), + })), + ); + const store = new Store(); + + store.increment(2); + tick(1000); + expect(store.counter()).toEqual(7); + }); + })); +}); diff --git a/libs/ngrx-toolkit/src/lib/mutation.ts b/libs/ngrx-toolkit/src/lib/mutation.ts new file mode 100644 index 00000000..531729b0 --- /dev/null +++ b/libs/ngrx-toolkit/src/lib/mutation.ts @@ -0,0 +1,156 @@ +import { computed, DestroyRef, inject, Injector } from '@angular/core'; +import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; +import { + patchState, + signalStoreFeature, + withComputed, + withState, + WritableStateSource, +} from '@ngrx/signals'; +import { + catchError, + concatMap, + exhaustMap, + firstValueFrom, + mergeMap, + Observable, + of, + OperatorFunction, + Subject, + switchMap, + tap, +} from 'rxjs'; + +// +// CREDITS: This implementation is highly influenced by Marko Stanimirović' prototype: +// https://github.com/markostanimirovic/rx-resource-proto +// +// Big thanks to Marko for sharing his knowledge and for his great work! +// + +export interface MutationState { + _mutationCount: number; + mutationError: string | null; +} + +const incrementCounter = (state: MutationState) => ({ + ...state, + _mutationCount: state._mutationCount + 1, +}); +const decrementCounter = (state: MutationState) => ({ + ...state, + _mutationCount: state._mutationCount - 1, +}); + +export const clearMutationError = (state: MutationState) => ({ + ...state, + mutationError: null, +}); + +export type FlatteningOperator = 'merge' | 'concat' | 'switch' | 'exhaust'; + +export interface MutationOptions { + operation: (params: P) => Observable; + onSuccess?: (params: P, result: R) => void; + onError?: (params: P, error: unknown) => string | void; + operator?: FlatteningOperator; + injector?: Injector; +} + +function flatten( + operation: (params: P) => Observable, + operator: FlatteningOperator, +): OperatorFunction { + switch (operator) { + case 'concat': + return concatMap(operation); + case 'switch': + return switchMap(operation); + case 'exhaust': + return exhaustMap(operation); + case 'merge': + default: + return mergeMap(operation); + } +} + +export function rxMutation( + store: WritableStateSource, + options: MutationOptions, +) { + const destroyRef = options.injector?.get(DestroyRef) || inject(DestroyRef); + const mutationSubject = new Subject

(); + + const successSubject = new Subject<{ params: P; result: R }>(); + const errorSubject = new Subject<{ params: P; error: unknown }>(); + + const operator = options.operator || 'merge'; + const flatteningOp = flatten((params: P) => { + return options.operation(params).pipe( + tap((result) => { + options.onSuccess?.(params, result); + patchState(store, decrementCounter); + successSubject.next({ params, result }); + }), + catchError((error) => { + console.error('Mutation error:', error); + const mutationError = + options.onError?.(params, error) ?? + error.message ?? + 'Mutation failed'; + patchState(store, mutationError, decrementCounter); + errorSubject.next({ params, error }); + return of(null); + }), + ); + }, operator); + + mutationSubject + .pipe(flatteningOp, takeUntilDestroyed(destroyRef)) + .subscribe(); + + const result = (params: P) => { + patchState(store, incrementCounter); + mutationSubject.next(params); + }; + + result.success = successSubject.asObservable(); + result.error = errorSubject.asObservable(); + + return result; +} + +export function mutation( + store: WritableStateSource, + options: MutationOptions, +): (params: P) => Promise { + return async (params: P): Promise => { + patchState(store, incrementCounter); + try { + const result = await firstValueFrom(options.operation(params)); + options.onSuccess?.(params, result); + return result; + } catch (error) { + console.error('Mutation error:', error); + const mutationError = + options.onError?.(params, error) ?? + (error instanceof Error ? error.message : 'Mutation failed'); + patchState(store, { mutationError }); + throw error; + } finally { + patchState(store, decrementCounter); + } + }; +} + +export function withMutations() { + return signalStoreFeature( + withState({ + _mutationCount: 0, + mutationError: null, + }), + withComputed((state) => ({ + isProcessing: computed(() => state._mutationCount() > 0), + })), + ); +}