@@ -204,8 +204,8 @@ public protocol StoreProtocol {
204204public final class Store < Model> : ObservableObject , StoreProtocol
205205where Model: ModelProtocol
206206{
207- /// Cancellable for fx subscription.
208- private var cancelFx : AnyCancellable ?
207+ /// Stores cancellables by ID
208+ private( set ) var cancellables : [ UUID : AnyCancellable ] = [ : ]
209209
210210 /// Private for all actions sent to the store.
211211 private var _actions = PassthroughSubject < Model . Action , Never > ( )
@@ -215,17 +215,6 @@ where Model: ModelProtocol
215215 _actions. eraseToAnyPublisher ( )
216216 }
217217
218- /// Source publisher for batches of fx modeled as publishers.
219- private var _fxBatches = PassthroughSubject < Fx < Model . Action > , Never > ( )
220-
221- /// `fx` represents a flat stream of actions from all fx publishers.
222- private var fx : AnyPublisher < Model . Action , Never > {
223- _fxBatches
224- . flatMap ( { publisher in publisher } )
225- . receive ( on: DispatchQueue . main)
226- . eraseToAnyPublisher ( )
227- }
228-
229218 /// Publisher for updates performed on state
230219 private var _updates = PassthroughSubject < Model . UpdateType , Never > ( )
231220
@@ -273,11 +262,6 @@ where Model: ModelProtocol
273262 subsystem: " ObservableStore " ,
274263 category: " Store "
275264 )
276-
277- self . cancelFx = self . fx
278- . sink ( receiveValue: { [ weak self] action in
279- self ? . send ( action)
280- } )
281265 }
282266
283267 /// Initialize with a closure that receives environment.
@@ -318,12 +302,49 @@ where Model: ModelProtocol
318302 self . send ( action)
319303 }
320304
321- /// Subscribe to a publisher of actions, send the actions it publishes
322- /// to the store.
305+ /// Subscribe to a publisher of actions, piping them through to
306+ /// the store.
307+ ///
308+ /// Holds on to the cancellable until publisher completes.
309+ /// When publisher completes, removes cancellable.
323310 public func subscribe( to fx: Fx < Model . Action > ) {
324- self . _fxBatches. send ( fx)
311+ // Create a UUID for the cancellable.
312+ // Store cancellable in dictionary by UUID.
313+ // Remove cancellable from dictionary upon effect completion.
314+ // This retains the effect pipeline for as long as it takes to complete
315+ // the effect, and then removes it, so we don't have a cancellables
316+ // memory leak.
317+ let id = UUID ( )
318+
319+ // Receive Fx on main thread. This does two important things:
320+ //
321+ // First, SwiftUI requires that any state mutations that would change
322+ // views happen on the main thread. Receiving on main ensures that
323+ // all fx-driven state transitions happen on main, even if the
324+ // publisher is off-main-thread.
325+ //
326+ // Second, if we didn't schedule receive on main, it would be possible
327+ // for publishers to complete immediately, causing receiveCompletion
328+ // to attempt to remove the publisher from `cancellables` before
329+ // it is added. By scheduling to receive publisher on main,
330+ // we force publisher to complete on next tick, ensuring that it
331+ // is always first added, then removed from `cancellables`.
332+ let cancellable = fx
333+ . receive (
334+ on: DispatchQueue . main,
335+ options: . init( qos: . default)
336+ )
337+ . sink (
338+ receiveCompletion: { [ weak self] _ in
339+ self ? . cancellables. removeValue ( forKey: id)
340+ } ,
341+ receiveValue: { [ weak self] action in
342+ self ? . send ( action)
343+ }
344+ )
345+ self . cancellables [ id] = cancellable
325346 }
326-
347+
327348 /// Send an action to the store to update state and generate effects.
328349 /// Any effects generated are fed back into the store.
329350 ///
0 commit comments