Skip to content

Commit b46eb59

Browse files
committed
Update Propulsion.EventStore to 4.0.0
1 parent 9d762f7 commit b46eb59

File tree

6 files changed

+10
-30
lines changed

6 files changed

+10
-30
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,11 @@ clients to current ones by means of adjusting package references while retaining
107107

108108
(Reading and position metrics are exposed via `Propulsion.CosmosStore.Prometheus`)
109109

110-
- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore` v `3.0.7`, `Serilog`
110+
- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore` v `4.0.0`, `Serilog`
111111

112112
- **Deprecated as reading (and writing) relies on the legacy EventStoreDB TCP interface**
113113
- Contains ultra-high throughput striped reader implementation
114+
- Presently Used by [`proSync` template](https://github.com/jet/dotnet-templates/tree/master/propulsion-sync)
114115

115116
(Reading and position metrics are emitted to Console / Serilog; no Prometheus support)
116117

src/Propulsion.EventStore/Checkpoint.fs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,25 +93,25 @@ type Service internal (resolve : CheckpointSeriesId -> Equinox.Decider<Events.Ev
9393
/// Determines the present state of the CheckpointSequence
9494
member _.Read(series) =
9595
let stream = resolve series
96-
stream.Query id
96+
stream.Query(id, load = Equinox.AllowStale)
9797

9898
/// Start a checkpointing series with the supplied parameters
9999
/// NB will fail if already existing; caller should select to `Start` or `Override` based on whether Read indicates state is Running Or NotStarted
100100
member _.Start(series, freq : TimeSpan, pos : int64) =
101101
let stream = resolve series
102-
stream.Transact(interpret (Command.Start(DateTimeOffset.UtcNow, freq, pos)))
102+
stream.Transact(interpret (Command.Start(DateTimeOffset.UtcNow, freq, pos)), load = Equinox.AllowStale)
103103

104104
/// Override a checkpointing series with the supplied parameters
105105
/// NB fails if not already initialized; caller should select to `Start` or `Override` based on whether Read indicates state is Running Or NotStarted
106106
member _.Override(series, freq : TimeSpan, pos : int64) =
107107
let stream = resolve series
108-
stream.Transact(interpret (Command.Override(DateTimeOffset.UtcNow, freq, pos)))
108+
stream.Transact(interpret (Command.Override(DateTimeOffset.UtcNow, freq, pos)), load = Equinox.AllowStale)
109109

110110
/// Ingest a position update
111111
/// NB fails if not already initialized; caller should ensure correct initialization has taken place via Read -> Start
112112
member _.Commit(series, pos : int64) =
113113
let stream = resolve series
114-
stream.Transact(interpret (Command.Update(DateTimeOffset.UtcNow, pos)))
114+
stream.Transact(interpret (Command.Update(DateTimeOffset.UtcNow, pos)), load = Equinox.AllowStale)
115115

116116
let create resolve = Service(streamName >> resolve)
117117

src/Propulsion.EventStore/EventStoreSink.fs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,6 @@ open System.Collections.Generic
1616
open System
1717
open System.Threading
1818

19-
module private StreamSpan =
20-
21-
#if EVENTSTORE_LEGACY
22-
let private nativeToDefault_ x = FsCodec.Core.TimelineEvent.Map<ReadOnlyMemory<byte>> (fun (xs : byte array) -> ReadOnlyMemory xs) x
23-
let inline nativeToDefault span = Array.map nativeToDefault_ span
24-
let defaultToNative_ = FsCodec.Core.TimelineEvent.Map<byte array> (fun (xs : ReadOnlyMemory<byte>) -> xs.ToArray())
25-
let inline defaultToNative span = Array.map defaultToNative_ span
26-
#else
27-
let nativeToDefault = id
28-
let defaultToNative_ = id
29-
let defaultToNative = id
30-
#endif
31-
3219
module Internal =
3320

3421
[<AutoOpen>]
@@ -55,21 +42,13 @@ module Internal =
5542

5643
let write (log : ILogger) (context : EventStoreContext) stream (span : Default.StreamSpan) = async {
5744
log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length)
58-
let! res = context.Sync(log, stream, span[0].Index - 1L, (span |> Array.map (fun span -> StreamSpan.defaultToNative_ span :> _)))
45+
let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _))
5946
let ress =
6047
match res with
6148
| GatewaySyncResult.Written (Token.Unpack pos') ->
62-
#if EVENTSTORE_LEGACY
63-
Ok (pos'.pos.streamVersion + 1L)
64-
#else
6549
Ok (pos'.streamVersion + 1L)
66-
#endif
6750
| GatewaySyncResult.ConflictUnknown (Token.Unpack pos) ->
68-
#if EVENTSTORE_LEGACY
69-
match pos.pos.streamVersion + 1L with
70-
#else
7151
match pos.streamVersion + 1L with
72-
#endif
7352
| actual when actual < span[0].Index -> PrefixMissing (span, actual)
7453
| actual when actual >= span[0].Index + span.LongLength -> Duplicate actual
7554
| actual -> PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int))

src/Propulsion.EventStore/EventStoreSource.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ module Mapping =
3636
member x.Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(x.CreatedEpoch)
3737

3838
let (|PropulsionTimelineEvent|) (x : RecordedEvent) : FsCodec.ITimelineEvent<_> =
39-
let inline len0ToNull (x : _[]) = match x with null -> null | x when x.Length = 0 -> null | x -> x
39+
let inline len0ToNull (x : _[]) = match x with null -> ReadOnlyMemory.Empty | x when x.Length = 0 -> ReadOnlyMemory.Empty | x -> ReadOnlyMemory x
4040
FsCodec.Core.TimelineEvent.Create(x.EventNumber, x.EventType, len0ToNull x.Data, len0ToNull x.Metadata, timestamp = x.Timestamp) :> _
4141

4242
let (|PropulsionStreamEvent|) (x : RecordedEvent) : StreamEvent<_> =

src/Propulsion.EventStore/Propulsion.EventStore.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
<ItemGroup>
2020
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />
2121

22-
<PackageReference Include="Equinox.EventStore" Version="[3.0.7, 3.99.0]" />
22+
<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.1" />
2323
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.7.1" />
2424
</ItemGroup>
2525

src/Propulsion.Kafka/Propulsion.Kafka.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
<ItemGroup>
1818
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />
1919

20-
<PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0-rc.2.2" />
20+
<PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0-rc.7.1" />
2121
<PackageReference Include="FsKafka" Version="[1.7.0, 1.9.99)" />
2222
</ItemGroup>
2323

0 commit comments

Comments
 (0)