v3.0.0 preview | Streaming indicators and quotes #1018
Replies: 14 comments 31 replies
This comment has been hidden.
This comment has been hidden.
-
What do you think about using Rx.NET for streaming data and processing indicators?
-
Hi, I'm currently trying out the v3.0.0 preview and wanted to ask whether there is, or will be, an option to limit the size of the data collections held by both the observable and the observer, as I couldn't find one yet. Thanks!
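Until such an option exists, a cap can be emulated on the consumer side by trimming a rolling window after each update. A minimal sketch; the `maxSize` cap and `AddCapped` helper are illustrative, not part of the library:

```csharp
using System;
using System.Collections.Generic;

// Illustrative workaround: keep only the last N values yourself.
// "maxSize" is a hypothetical cap, not a library parameter.
int maxSize = 3;
var window = new Queue<double>();

void AddCapped(double value)
{
    window.Enqueue(value);
    while (window.Count > maxSize)
        window.Dequeue(); // drop the oldest value once over the cap
}
```

The same trimming could be applied to indicator result lists after each recalculation.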
-
Just wanted to share a note on "determine best approach for handling out-of-sequence quotes" and the "auto-aggregator": you may need a parameter to determine what to do with missing candles. I found that in premarket, or sometimes just after the open, some candles are missing because of extremely limited volume. I believe the correct approach is to use the last available value as that candle's value. For instance, if you have candles for 10:01, 10:02, 10:04, and 10:05, the missing 10:03 could potentially mess up calculations, so 10:03 can simply take the same value as 10:02. Averaging 10:02 and 10:04 is another approach I've seen. Great library, and great work!
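The forward-fill option described here can be sketched as a pre-processing step. This sketch uses simple (Time, Close, Volume) tuples rather than the library's quote type, and synthesizes flat, zero-volume candles for each gap:

```csharp
using System;
using System.Collections.Generic;

// Forward-fill gaps in a minute-candle series by repeating the prior close
// as a flat, zero-volume candle. Candles are (Time, Close, Volume) tuples
// purely for illustration; a real implementation would carry full OHLCV.
List<(DateTime Time, decimal Close, decimal Volume)> ForwardFill(
    IReadOnlyList<(DateTime Time, decimal Close, decimal Volume)> candles,
    TimeSpan interval)
{
    var filled = new List<(DateTime, decimal, decimal)>();
    foreach (var c in candles)
    {
        if (filled.Count > 0)
        {
            var prev = filled[^1];
            for (var t = prev.Item1 + interval; t < c.Time; t += interval)
                filled.Add((t, prev.Item2, 0m)); // synthetic flat candle
        }
        filled.Add(c);
    }
    return filled;
}
```

The averaging variant would instead interpolate between the candles on each side of the gap, at the cost of needing one candle of lookahead.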
-
Let me share some modeling I did for a series of data I had before. This design should help you write indicators for real-time series easily and quickly, handle series size, and improve performance by reducing array allocations.

Cache method results. This interface caches methods using their names and an object that represents the method parameters:

public interface IMethodCacher
{
bool TryGetFromCache<TRet>(string key, object parameters, out TRet result);
void SetCache<TRet>(string key, object parameters, TRet value);
TRet ExecuteAndCache<TRet>(object obj, Func<TRet> func, [CallerMemberName] string methodName = "");
Task<TRet> ExecuteAndCacheAsync<TRet>(object parameters, Func<Task<TRet>> func, [CallerMemberName] string methodName = "");
}

Implement it. I chose ConcurrentDictionary here: two dictionaries, the outer keyed by method name, the inner keyed by that method's parameters.

public abstract class MethodCacher : IMethodCacher
{
readonly ConcurrentDictionary<string, ConcurrentDictionary<object, object>> _c =
new ConcurrentDictionary<string, ConcurrentDictionary<object, object>>();
public bool TryGetFromCache<TRet>(string key, object parameters, out TRet result)
{
result = default;
if (!_c.TryGetValue(key, out var dic))
return false;
if (!dic.TryGetValue(parameters, out var obj))
return false;
result = (TRet)obj;
return true;
}
public void SetCache<TRet>(string key, object parameters, TRet value)
{
if (!_c.TryGetValue(key, out var dic))
dic = _c.AddOrUpdate(key, new ConcurrentDictionary<object, object>(), (k, v) => v);
dic.AddOrUpdate(parameters, value, (k, v) => value); // update factory must return the new value, or existing entries never change
}
public async Task<TRet> ExecuteAndCacheAsync<TRet>(object parameters, Func<Task<TRet>> func,
[CallerMemberName] string caller = "")
{
if (!_c.TryGetValue(caller, out var dic))
{
dic = _c.AddOrUpdate(caller, new ConcurrentDictionary<object, object>(), (k, v) => v);
}
if (!dic.TryGetValue(parameters, out var result))
{
result = await func();
dic.AddOrUpdate(parameters, result, (k, v) => v);
}
return (TRet)result;
}
public TRet ExecuteAndCache<TRet>(object obj, Func<TRet> func, [CallerMemberName] string caller = "")
{
if(!_c.TryGetValue(caller, out var dic))
{
dic = _c.AddOrUpdate(caller, new ConcurrentDictionary<object, object>(), (k, v) => v);
}
if(!dic.TryGetValue(obj, out var result))
{
result = func();
dic.AddOrUpdate(obj, result, (k, v) => v);
}
return (TRet)result;
}
public bool TryGetFromCache<TRet>(string key, out TRet result) =>
TryGetFromCache(key, new MethodCacheParams(), out result);
public void SetCache<TRet>(string key, TRet value) =>
SetCache(key, new MethodCacheParams(), value);
}

Now extend it so it reads like a method call. Define IEquatable record structs to compare parameters and act as dictionary keys for the method parameters; go up to 6 parameters:

record struct MethodCacheParams();
record struct MethodCacheParams<T>(T a) where T : struct, INumber<T>;
record struct MethodCacheParams<T1, T2>(T1 a1, T2 a2)
where T1 : struct, INumber<T1>
where T2 : struct, INumber<T2>;
record struct MethodCacheParams<T1, T2, T3>(T1 a1, T2 a2, T3 a3)
where T1 : struct, INumber<T1>
where T2 : struct, INumber<T2>
where T3 : struct, INumber<T3>;
// and so on until T6

Write the extensions:

public static class MethodCacherExtensions
{
public static TRet ExecuteAndCache<TRet>(this IMethodCacher mc, Func<TRet> func, [CallerMemberName] string caller = "")
=> mc.ExecuteAndCache(new MethodCacheParams(), func, caller);
public static TRet ExecuteAndCache<TRet, T1>(this IMethodCacher mc, T1 a1, Func<TRet> func, [CallerMemberName] string caller = "") where T1 : struct, INumber<T1>
=> mc.ExecuteAndCache(new MethodCacheParams<T1>(a1), func, caller);
public static TRet ExecuteAndCache<TRet, T1, T2>(this IMethodCacher mc, T1 a1, T2 a2, Func<TRet> func, [CallerMemberName] string caller = "")
where T1 : struct, INumber<T1>
where T2 : struct, INumber<T2>
=> mc.ExecuteAndCache(new MethodCacheParams<T1, T2>(a1, a2), func, caller);
// and so on until T6
. . .
}

Usage example; this is how the cached results behave in use:

static void Main(string[] args)
{
var obj = new MyObj();
obj.X = "Hello world!";
var x = obj.GetX(1, 2, 3);
Console.WriteLine(x);
obj.X = "Good bye";
x = obj.GetX(2, 3, 4);
Console.WriteLine(x);
x = obj.GetX(1, 2, 3);
Console.WriteLine(x);
Console.ReadLine();
}
public class MyObj : MethodCacher
{
public string X { get; set; }
public MyResult GetX(int a, int b, int c)
{
var func = () => new MyResult(X);
return this.ExecuteAndCache(a, b, c, func);
}
}

Implement series navigation. Define the series interfaces:
public interface IReadOnlySeries<T> : IReadOnlyList<T>
{
IEnumerable<T> EnumeratePrevious(T item, bool includeItem);
IEnumerable<T> EnumerateNext(T item, bool includeItem);
T Prev(T item);
T Next(T item);
int IndexOf(T item);
}
public interface ISeries<T> : IReadOnlySeries<T>
{
void Add(T item);
}

public class Series<T> : ISeries<T>
{
readonly int _maxItems;
readonly List<T> _items;
readonly bool _isCapped;
readonly object _lock;
public Series(int max = -1)
{
_maxItems = max;
_isCapped = max > 0;
if(_isCapped)
{
_items = new List<T>(max);
_lock = new object();
}
else
{
_items = new List<T>();
}
}
public T this[int index] => _items[index];
public int Count => _items.Count;
public void Add(T item)
{
if(_lock != null)
Monitor.Enter(_lock);
try
{
if (_isCapped && _items.Count == _maxItems)
{
var first = _items[0];
_items.RemoveAt(0);
OnItemOutdated(first);
}
_items.Add(item);
OnItemAdded(item);
}
finally
{
if(_lock != null)
Monitor.Exit(_lock);
}
}
public int IndexOf(T item) => _items.IndexOf(item);
public IEnumerator<T> GetEnumerator() => _items.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => _items.GetEnumerator();
public IEnumerable<T> EnumeratePrevious(T item, bool includeItem)
{
var index = _items.IndexOf(item);
index += includeItem ? 1 : 0;
return _items.Take(index);
}
public IEnumerable<T> EnumerateNext(T item, bool includeItem)
{
var index = _items.IndexOf(item);
index += includeItem ? 0 : 1;
return _items.Skip(index);
}
public T Prev(T item)
{
var index = _items.IndexOf(item);
if (index <= 0)
return default;
return _items[index - 1];
}
public T Next(T item)
{
var index = _items.IndexOf(item);
if (index < 0 || index >= _items.Count - 1)
return default;
return _items[index + 1];
}
protected virtual void OnItemAdded(T item) { }
protected virtual void OnItemOutdated(T item) { }
}

Define the candle model:
public interface ICandle
{
public int Index { get; } // => _series.IndexOf(this);
public ICandle Prev(); // => _series.Prev(this);
public ICandle Next(); // => _series.Next(this);
public IEnumerable<ICandle> EnumeratePrev(bool includeSelf = true); // => _series.EnumeratePrevious(this, includeSelf);
public IEnumerable<ICandle> EnumerateNext(bool includeSelf = true); // => _series.EnumerateNext(this, includeSelf);
public void SetSeries(ISeries<ICandle> series);
}

Create the candle series. This holds candles, sets a cap so old ones are removed, and lets them navigate via the Next() and Prev() methods:

public interface ICandleSeries : ISeries<ICandle>
{
}
public class CandleSeries : Series<ICandle>, ICandleSeries
{
public CandleSeries(int max) : base(max)
{
}
protected override void OnItemAdded(ICandle c)
{
c.SetSeries(this); // so in the Indicators we can do c.Prev() and c.Next()
. . .

Implement the indicators as cacheable methods that return indicator results, and implement the navigation in the candle as well, all in the Candle model:

public class Candle : MethodCacher, ICandle
{
ICandleSeries _series;
//cached method
public EmaResult GetEma(int period)
{
var func = () => EmaCalculation(period);
//use method cacher
return ExecuteAndCache(period, func);
}
private EmaResult EmaCalculation(int period)
{
//get previous item
var prev = this.Prev();
if(prev == null)
. . .
var prevEma = prev.GetEma(period); // This will come from memory, not recalculated
//Calculate
}
//series methods
public void SetSeries(ICandleSeries series) => _series = series;
public ICandle Prev() => _series.Prev(this);
public ICandle Next() => _series.Next(this);
public IEnumerable<ICandle> EnumeratePrev(bool includeSelf = true) => _series.EnumeratePrevious(this, includeSelf);
}

Or write an indicator:

public static class SomeIndicator
{
public static SomeResult Calculate(ICandle c, int p1, double p2, ...)
{
//prev candles
var prevCandles = c.EnumeratePrev().TakeLast(p1) ...
//prev
var prev = c.Prev();
//prev of prev
var prev_prev = c.Prev().Prev();
//cached result
var r = prev.GetResult(p1, p2, ...); // cached according to method name and parameters
// and so on.
}
}

Hope it helps. I might have copied inconsistent pieces; let me know if anything is unclear.
-
Hi there! This might be a bit late, but I really like your library and wanted to share an idea: a small refactoring of the indicators using IEnumerable<T> with yield return. Here's an example of how SMA and EMA could look when implemented this way:

public static IEnumerable<SmaResult> CalcSma(this IEnumerable<double> data, int period)
{
if (period <= 0)
throw new ArgumentException("Period must be greater than zero.", nameof(period));
double sum = 0;
Queue<double> window = new Queue<double>(period);
foreach (var item in data)
{
if (window.Count == period)
sum -= window.Dequeue();
window.Enqueue(item);
sum += item;
double sma = sum / window.Count;
yield return new SmaResult(sma);
}
}
public record SmaResult(double Sma);
public static IEnumerable<EmaResult> CalcEma(this IEnumerable<double> data, int period)
{
if (period <= 0)
throw new ArgumentException("Period must be greater than zero.", nameof(period));
double multiplier = 2.0 / (period + 1);
var sma = new Incrementor<double, SmaResult>(s => CalcSma(s, period));
double ema = 0;
int i = 0;
foreach (var item in data)
{
if (i++ < period)
{
ema = sma.CalcNext(item).Sma;
}
else
{
ema += (item - ema) * multiplier;
}
yield return new EmaResult(ema);
}
}
public record EmaResult(double Ema);

Example of batch processing:

List<double> BatchExample(List<double> data, int period)
{
var result = data.CalcSma(period)
.Select(x => x.Sma)
.ToList();
return result;
}

Example of incremental processing:

void IncrementalExample(int period)
{
var ema = new Incrementor<double, EmaResult>(s => s.CalcEma(period));
while(true)
{
double x = GetNextValue();
double currentEma = ema.CalcNext(x).Ema;
// Do something with currentEma
}
}

The transformation of a method that accepts an IEnumerable into an incremental calculator is handled by the Incrementor class. Such a class could be implemented like this, for example:

internal class Incrementor<TSource, TValue> where TSource : notnull
{
private readonly TASingleItemDataSource<TSource> _source = new TASingleItemDataSource<TSource>();
private readonly IEnumerator<TValue> _result;
public TValue Current => _result.Current;
public Incrementor(Func<IEnumerable<TSource>, IEnumerable<TValue>> calc)
{
_result = calc(_source).GetEnumerator();
}
public TValue CalcNext(TSource value)
{
_source.SetNext(value);
_result.MoveNext();
return _result.Current;
}
}
internal class TASingleItemDataSource<TValue> : IEnumerable<TValue> where TValue : notnull
{
private readonly Enumerator _enumerator = new();
public void SetNext(TValue item)
{
_enumerator.SetNext(item);
}
public IEnumerator<TValue> GetEnumerator()
{
return _enumerator;
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
private class Enumerator : IEnumerator<TValue>
{
private bool _hasNext;
private TValue? _next;
private bool _hasCurrent;
private TValue? _current;
public TValue Current => _hasCurrent ? _current! : throw new InvalidOperationException("No value is set");
object IEnumerator.Current => Current;
public void SetNext(TValue item)
{
if (_hasNext)
throw new InvalidOperationException("Next item is already set.");
_next = item;
_hasNext = true;
}
public bool MoveNext()
{
if (_hasNext)
{
_current = _next;
_hasCurrent = true;
_next = default;
_hasNext = false;
return true;
}
return false;
}
public void Dispose()
{
}
public void Reset()
{
throw new NotImplementedException();
}
}
}

I believe that the performance of both batch and incremental processing would remain acceptable. At the same time, the code would stay readable and easy to maintain.
This comment has been hidden.
This comment has been hidden.
-
Hello @DaveSkender, I've been testing the streaming features lately and have been struggling with a particular scenario where a quote added to a QuoteHub is still live and will be updated as new ticks arrive (for example, if you are aggregating data for the current day into a 1-day quote). I couldn't find any straightforward way to access the quote and overwrite it with the new values, other than removing it and inserting a new updated instance into the QuoteHub, but this causes any indicator subscribed to the QuoteHub to immediately throw an index-out-of-range exception. Are you familiar with the issue, or am I doing something wrong?
-
@DaveSkender, since you provided backward compatibility like a pro, my code wasn't broken when I came back around to this, and I didn't go investigating the streaming features until a couple of days ago. Gives me a Pine vibe, but it's a major leap. Thank you! I have not pondered every edge case that could complicate this, but I bet you'll appreciate the risk/reward of this: currently, RollbackState() must duplicate the indicator logic to reset the state. Memory is cheaper than compute, so let's cache the state instead. That makes RollbackState(rollbackDate) an O(1) op for every indicator, which allows incremental updates to the current/last quote very quickly... ish. RSI, for example. I volunteer.
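The state-caching idea can be sketched generically. This assumes an EMA-style indicator; the `OnBar` and `RollbackState` names are hypothetical here, not the library's API:

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: cache the indicator's internal state after each bar so a
// rollback is a lookup instead of a recompute. Names are hypothetical.
double k = 2.0 / (10 + 1);                        // EMA smoothing factor for period 10
var snapshots = new SortedList<DateTime, double>(); // bar time -> EMA value after that bar
double? ema = null;

void OnBar(DateTime time, double close)
{
    ema = ema is null ? close : ema + (close - ema) * k;
    snapshots[time] = ema.Value; // snapshot the state per bar
}

void RollbackState(DateTime rollbackDate)
{
    // Restore the last snapshot at or before rollbackDate, discarding newer ones.
    int i = snapshots.Count - 1;
    while (i >= 0 && snapshots.Keys[i] > rollbackDate)
        snapshots.RemoveAt(i--);
    ema = i >= 0 ? snapshots.Values[i] : (double?)null;
}
```

For indicators with larger state (e.g. RSI's average gain/loss pair), the snapshot value would be a small state struct rather than one double; the lookup cost stays the same.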
-
Hi @DaveSkender, can you give me a rough estimate of when you'll release the next v3 preview? I'd like to test it.
-
@DaveSkender, using the static series, I noticed there were just a few indicators whose results implement ISeries instead of IReusable, so they lacked the all-powerful Value property. Working on what I consider a powerful upgrade to StreamHub, those same indicators are throwing a fit. Is there a reason some results implement ISeries instead of IReusable? Some are less obvious than SMA, but each result has a definite winner for where to point Value. If this was an oversight, fix incoming... 😁
-
Hi @DaveSkender, thanks first for the new preview release; I will test it soon. :) One thing is not clear to me: whether the following scenario is supported. I plan to (re)calculate my indicators every X seconds; my quotes/candles are per minute. I would like to always update the latest quote/candle until it is closed. Is that scenario already supported? If yes, is it supported by buffer lists or just by quote hubs? Or do I need to use only closed quotes/candles? Thanks in the meantime.
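For context, the "update the forming candle until it closes" pattern, independent of any library API, can be sketched like this; the tuple candle type and the `OnTick` name are purely illustrative:

```csharp
using System;
using System.Collections.Generic;

// Illustrative tick-to-minute aggregation: the last candle in the list is
// "forming" and is overwritten in place until its minute closes.
var candles = new List<(DateTime Minute, decimal Open, decimal High, decimal Low, decimal Close)>();

void OnTick(DateTime time, decimal price)
{
    var minute = new DateTime(time.Year, time.Month, time.Day, time.Hour, time.Minute, 0);
    if (candles.Count > 0 && candles[^1].Minute == minute)
    {
        var c = candles[^1]; // update the forming candle in place
        candles[^1] = (minute, c.Open, Math.Max(c.High, price), Math.Min(c.Low, price), price);
    }
    else
    {
        candles.Add((minute, price, price, price, price)); // prior candle is now closed
    }
}
```

The open question for the library is whether its hubs accept such in-place replacement of the last quote, or only append-style updates.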
-
With the static types, my flow was: With the streamhubs, I'm at a fork and need to decide:
-
@DaveSkender Neither library contains synchronization primitives, allowing/forcing me to choose my own. In the end, I realized that synchronizing the upstream-most quote provider synchronizes everything downstream. If QuoteHub is the primary upstream source (for not just me), you might consider adding a boolean to the QuoteHub ctor enabling your favorite flavor of internal sync. Aggregators would want this too. This would make all downstream chains optionally thread-safe, out of the box.
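The kind of internal sync suggested here amounts to a lock around the publish path, so that everything downstream observes one consistent ordering. A sketch with illustrative names only, not QuoteHub's actual internals:

```csharp
using System;
using System.Collections.Generic;

// Illustrative sketch: serialize all quote publication through one lock so
// every downstream observer sees the same ordering. Not the library's API.
var gate = new object();
var observers = new List<Action<(DateTime Time, decimal Price)>>();

void Subscribe(Action<(DateTime Time, decimal Price)> observer)
{
    lock (gate) observers.Add(observer);
}

void Publish((DateTime Time, decimal Price) quote)
{
    lock (gate)
    {
        foreach (var o in observers)
            o(quote); // downstream chains run under the same lock, so they stay ordered
    }
}
```

Making the lock opt-in via a ctor flag, as suggested, keeps the single-threaded path free of contention.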
-
We’ll be releasing a series of progressive pre-release preview versions to evaluate and stabilize features for streaming indicators from live WebSockets and other incremental quote sources. As we evolve our approach, your feedback is appreciated. Runnable examples are available.