Skip to content

Commit 50a4c48

Browse files
authored
Ensure TransformOnObservable preserves ChangeSet order (#1008)
* Addresses #1007 by ensuring that all changes are processed in order instead of processing remove changes after the others. * Created KeyedDisposable helper class * More unit tests
1 parent ce443e9 commit 50a4c48

File tree

3 files changed

+248
-36
lines changed

3 files changed

+248
-36
lines changed

src/DynamicData.Tests/Cache/TransformOnObservableFixture.cs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System;
22
using System.Linq;
33
using System.Reactive;
4+
using System.Reactive.Concurrency;
45
using System.Reactive.Linq;
6+
using System.Reactive.Subjects;
57
using System.Threading.Tasks;
68
using Bogus;
79
using DynamicData.Kernel;
@@ -141,13 +143,27 @@ IObservable<string> CreateChildObs(Animal a, int id) =>
141143
results.IsCompleted.Should().Be(completeSource && completeChildren);
142144
}
143145

146+
[Fact]
147+
public void ResultFailsIfChildFails()
148+
{
149+
// Arrange
150+
var expectedError = new Exception("Expected");
151+
var throwObservable = Observable.Throw<IChangeSet<Animal, int>>(expectedError);
152+
153+
// Act
154+
using var results = _animalCache.Connect().TransformOnObservable(_ => throwObservable).AsAggregator();
155+
156+
// Assert
157+
results.Error.Should().Be(expectedError);
158+
}
159+
144160
[Fact]
145161
public void ResultFailsIfSourceFails()
146162
{
147163
// Arrange
148164
var expectedError = new Exception("Expected");
149165
var throwObservable = Observable.Throw<IChangeSet<Animal, int>>(expectedError);
150-
using var results = _animalCache.Connect().Concat(throwObservable).TransformOnObservable(animal => Observable.Return(animal)).AsAggregator();
166+
using var results = _animalCache.Connect().Concat(throwObservable).TransformOnObservable(Observable.Return).AsAggregator();
151167

152168
// Act
153169
_animalCache.Dispose();
@@ -156,6 +172,39 @@ public void ResultFailsIfSourceFails()
156172
results.Error.Should().Be(expectedError);
157173
}
158174

175+
[Theory]
176+
[InlineData(true)]
177+
[InlineData(false)]
178+
public void OrderOfChangesIsPreserved(bool removeFirst)
179+
{
180+
// Arrange
181+
using var results = _animalCache.Connect().TransformOnObservable(Observable.Return).AsAggregator();
182+
(var firstReason, var nextReason, var expectedChanges) = removeFirst
183+
? (ChangeReason.Remove, ChangeReason.Add, InitialCount * 2)
184+
: (ChangeReason.Add, ChangeReason.Remove, InitialCount * 3);
185+
186+
// Act
187+
_animalCache.Edit(updater =>
188+
{
189+
if (removeFirst)
190+
{
191+
updater.Clear();
192+
updater.AddOrUpdate(_animalFaker.Generate(InitialCount));
193+
}
194+
else
195+
{
196+
updater.AddOrUpdate(_animalFaker.Generate(InitialCount));
197+
updater.Clear();
198+
}
199+
});
200+
201+
// Assert
202+
results.Messages.Count.Should().Be(2);
203+
results.Messages[1].Count.Should().Be(expectedChanges);
204+
results.Messages[1].Take(InitialCount).All(change => change.Reason == firstReason).Should().BeTrue();
205+
results.Messages[1].Skip(InitialCount).All(change => change.Reason == nextReason).Should().BeTrue();
206+
}
207+
159208
public void Dispose()
160209
{
161210
_animalCache.Dispose();

src/DynamicData/Cache/Internal/TransformOnObservable.cs

Lines changed: 118 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,63 +2,146 @@
22
// Roland Pheasant licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for full license information.
44

5+
using System.Diagnostics;
56
using System.Reactive.Disposables;
67
using System.Reactive.Linq;
78
using DynamicData.Internal;
89

910
namespace DynamicData.Cache.Internal;
1011

11-
internal sealed class TransformOnObservable<TSource, TKey, TDestination>(IObservable<IChangeSet<TSource, TKey>> source, Func<TSource, TKey, IObservable<TDestination>> transform)
12+
internal sealed class TransformOnObservable<TSource, TKey, TDestination>(IObservable<IChangeSet<TSource, TKey>> source, Func<TSource, TKey, IObservable<TDestination>> transform, bool transformOnRefresh = false)
1213
where TSource : notnull
1314
where TKey : notnull
1415
where TDestination : notnull
1516
{
16-
public IObservable<IChangeSet<TDestination, TKey>> Run() => Observable.Create<IChangeSet<TDestination, TKey>>(observer =>
17+
public IObservable<IChangeSet<TDestination, TKey>> Run() =>
18+
Observable.Create<IChangeSet<TDestination, TKey>>(observer => new Subscription(source, transform, observer, transformOnRefresh));
19+
20+
// Maintains state for a single subscription
21+
private sealed class Subscription : IDisposable
1722
{
18-
var cache = new ChangeAwareCache<TDestination, TKey>();
19-
var locker = InternalEx.NewLock();
20-
var parentUpdate = false;
23+
#if NET9_0_OR_GREATER
24+
private readonly Lock _synchronize = new();
25+
#else
26+
private readonly object _synchronize = new();
27+
#endif
28+
private readonly ChangeAwareCache<TDestination, TKey> _cache = new();
29+
private readonly KeyedDisposable<TKey> _transformSubscriptions = new();
30+
private readonly Func<TSource, TKey, IObservable<TDestination>> _transform;
31+
private readonly IDisposable _sourceSubscription;
32+
private readonly IObserver<IChangeSet<TDestination, TKey>> _observer;
33+
private readonly bool _transformOnRefresh;
34+
private int _subscriptionCounter = 1;
35+
private int _updateCounter;
36+
37+
public Subscription(IObservable<IChangeSet<TSource, TKey>> source, Func<TSource, TKey, IObservable<TDestination>> transform, IObserver<IChangeSet<TDestination, TKey>> observer, bool transformOnRefresh)
38+
{
39+
_observer = observer;
40+
_transform = transform;
41+
_transformOnRefresh = transformOnRefresh;
42+
_sourceSubscription = source
43+
.Do(_ => IncrementUpdates())
44+
.Synchronize(_synchronize)
45+
.SubscribeSafe(ProcessSourceChangeSet, observer.OnError, CheckCompleted);
46+
}
47+
48+
public void Dispose()
49+
{
50+
lock (_synchronize)
51+
{
52+
_sourceSubscription.Dispose();
53+
_transformSubscriptions.Dispose();
54+
}
55+
}
2156

22-
// Helper to emit any pending changes when appropriate
23-
void EmitChanges(bool fromParent)
57+
private void ProcessSourceChangeSet(IChangeSet<TSource, TKey> changes)
2458
{
25-
if (fromParent || !parentUpdate)
59+
// Process all the changes at once to preserve the changeset order
60+
foreach (var change in changes.ToConcreteType())
2661
{
27-
var changes = cache!.CaptureChanges();
62+
switch (change.Reason)
63+
{
64+
// Shutdown existing sub (if any) and create a new one that
65+
// Will update the cache and emit the changes
66+
case ChangeReason.Add or ChangeReason.Update:
67+
CreateTransformSubscription(change.Current, change.Key);
68+
break;
69+
70+
// Shutdown the existing subscription and remove from the cache
71+
case ChangeReason.Remove:
72+
_transformSubscriptions.Remove(change.Key);
73+
_cache.Remove(change.Key);
74+
break;
75+
76+
case ChangeReason.Refresh:
77+
if (_transformOnRefresh)
78+
{
79+
CreateTransformSubscription(change.Current, change.Key);
80+
}
81+
else
82+
{
83+
// Let the downstream decide what this means
84+
_cache.Refresh(change.Key);
85+
}
86+
break;
87+
}
88+
}
89+
90+
// Emit any pending changes
91+
EmitChanges();
92+
}
93+
94+
private void IncrementUpdates() => Interlocked.Increment(ref _updateCounter);
95+
96+
private void EmitChanges()
97+
{
98+
if (Interlocked.Decrement(ref _updateCounter) == 0)
99+
{
100+
var changes = _cache.CaptureChanges();
28101
if (changes.Count > 0)
29102
{
30-
observer.OnNext(changes);
103+
_observer.OnNext(changes);
31104
}
105+
}
32106

33-
parentUpdate = false;
107+
Debug.Assert(_updateCounter >= 0, "Should never be negative");
108+
}
109+
110+
private void CheckCompleted()
111+
{
112+
if (Interlocked.Decrement(ref _subscriptionCounter) == 0)
113+
{
114+
_observer.OnCompleted();
34115
}
116+
117+
Debug.Assert(_subscriptionCounter >= 0, "Should never be negative");
35118
}
36119

37120
// Create the sub-observable that takes the result of the transformation,
38121
// filters out unchanged values, and then updates the cache
39-
IObservable<TDestination> CreateSubObservable(TSource obj, TKey key) =>
40-
transform(obj, key)
122+
private void CreateTransformSubscription(TSource obj, TKey key)
123+
{
124+
// Add a new subscription. Do first so cleanup of existing subs doesn't trigger OnCompleted.
125+
Interlocked.Increment(ref _subscriptionCounter);
126+
127+
// Create a container for the Disposable and add to the KeyedDisposable
128+
var disposableContainer = _transformSubscriptions.Add(key, new SingleAssignmentDisposable());
129+
130+
// Create the transformation observable for the source item, filter unchanged, and update the cache
131+
// Will Dispose immediately if OnCompleted fires upon subscription because OnCompleted disposes the container
132+
// Remove the TransformSubscription if it completes because its not needed anymore
133+
disposableContainer.Disposable = _transform(obj, key)
41134
.DistinctUntilChanged()
42-
.Synchronize(locker!)
43-
.Do(val => cache!.AddOrUpdate(val, key));
44-
45-
// Flag a parent update is happening once inside the lock
46-
var shared = source
47-
.Synchronize(locker!)
48-
.Do(_ => parentUpdate = true)
49-
.Publish();
50-
51-
// MergeMany automatically handles Add/Update/Remove and OnCompleted/OnError correctly
52-
var subMerged = shared
53-
.MergeMany(CreateSubObservable)
54-
.SubscribeSafe(_ => EmitChanges(fromParent: false), observer.OnError, observer.OnCompleted);
55-
56-
// Subscribe to the shared Observable to handle Remove events. MergeMany will unsubscribe from the sub-observable,
57-
// but the corresponding key value needs to be removed from the Cache so the remove is observed downstream.
58-
var subRemove = shared
59-
.OnItemRemoved((_, key) => cache!.Remove(key), invokeOnUnsubscribe: false)
60-
.SubscribeSafe(_ => EmitChanges(fromParent: true), observer.OnError);
61-
62-
return new CompositeDisposable(shared.Connect(), subMerged, subRemove);
63-
});
135+
.Do(_ => IncrementUpdates())
136+
.Synchronize(_synchronize)
137+
.Finally(CheckCompleted)
138+
.SubscribeSafe(val => TransformOnNext(val, key), _observer.OnError, () => _transformSubscriptions.Remove(key));
139+
}
140+
141+
private void TransformOnNext(TDestination latestValue, TKey key)
142+
{
143+
_cache.AddOrUpdate(latestValue, key);
144+
EmitChanges();
145+
}
146+
}
64147
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright (c) 2011-2023 Roland Pheasant. All rights reserved.
2+
// Roland Pheasant licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for full license information.
4+
5+
namespace DynamicData.Internal;
6+
7+
/// <summary>
8+
/// Manages Disposables by Key:
9+
/// 1) Adding a disposable with the same key will dispose/replace the previous one.
10+
/// 2) Adding when the container is Disposed will Dispose it immediately.
11+
/// </summary>
12+
/// <typeparam name="TKey">Type to use for the Key.</typeparam>
13+
internal sealed class KeyedDisposable<TKey> : IDisposable
14+
where TKey : notnull
15+
{
16+
private readonly Dictionary<TKey, IDisposable> _disposables = [];
17+
private bool _disposedValue;
18+
19+
public int Count => _disposables.Count;
20+
21+
public IEnumerable<TKey> Keys => _disposables.Keys;
22+
23+
public bool ContainsKey(TKey key) => _disposables.ContainsKey(key);
24+
25+
public bool IsDisposed => _disposedValue;
26+
27+
public TDisposable Add<TDisposable>(TKey key, TDisposable disposable)
28+
where TDisposable : IDisposable
29+
{
30+
disposable.ThrowArgumentNullExceptionIfNull(nameof(disposable));
31+
32+
if (!_disposedValue)
33+
{
34+
Remove(key);
35+
_disposables.Add(key, disposable);
36+
}
37+
else
38+
{
39+
disposable.Dispose();
40+
}
41+
42+
return disposable;
43+
}
44+
45+
public void Remove(TKey key)
46+
{
47+
#if NET6_0_OR_GREATER
48+
if (_disposables.Remove(key, out var disposable))
49+
{
50+
disposable.Dispose();
51+
}
52+
#else
53+
if (_disposables.TryGetValue(key, out var disposable))
54+
{
55+
disposable.Dispose();
56+
_disposables.Remove(key);
57+
}
58+
#endif
59+
}
60+
61+
public void Dispose()
62+
{
63+
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
64+
Dispose(disposing: true);
65+
GC.SuppressFinalize(this);
66+
}
67+
68+
private void Dispose(bool disposing)
69+
{
70+
if (!_disposedValue)
71+
{
72+
_disposedValue = true;
73+
if (disposing)
74+
{
75+
_disposables.Values.ForEach(d => d.Dispose());
76+
_disposables.Clear();
77+
}
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)