@@ -76,11 +76,13 @@ protected override bool ReceivePluginInternal(object message)
7676 {
7777 case SelectCurrentPersistenceIds msg :
7878 SelectAllPersistenceIdsAsync ( msg . Offset )
79- . PipeTo ( msg . ReplyTo , success : h => new CurrentPersistenceIds ( h . Ids , h . LastOrdering ) , failure : e => new Status . Failure ( e ) ) ;
79+ . PipeTo ( msg . ReplyTo , success : h => new CurrentPersistenceIds ( h . Ids , h . LastOrdering ) ,
80+ failure : e => new Status . Failure ( e ) ) ;
8081 return true ;
8182 case ReplayTaggedMessages replay :
8283 ReplayTaggedMessagesAsync ( replay )
83- . PipeTo ( replay . ReplyTo , success : h => new RecoverySuccess ( h ) , failure : e => new ReplayMessagesFailure ( e ) ) ;
84+ . PipeTo ( replay . ReplyTo , success : h => new RecoverySuccess ( h ) ,
85+ failure : e => new ReplayMessagesFailure ( e ) ) ;
8486 return true ;
8587 case ReplayAllEvents replay :
8688 ReplayAllEventsAsync ( replay )
@@ -122,11 +124,13 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
122124 else eventToTags . Add ( p , ImmutableHashSet < string > . Empty ) ;
123125
124126 if ( IsTagId ( p . PersistenceId ) )
125- throw new InvalidOperationException ( $ "Persistence Id { p . PersistenceId } must not start with { QueryExecutor . Configuration . TagsColumnName } ") ;
127+ throw new InvalidOperationException (
128+ $ "Persistence Id { p . PersistenceId } must not start with { QueryExecutor . Configuration . TagsColumnName } ") ;
126129 }
127130
128131 var batch = new WriteJournalBatch ( eventToTags ) ;
129- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
132+ using ( var cancellationToken =
133+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
130134 await QueryExecutor . InsertBatchAsync ( connection , cancellationToken . Token , batch ) ;
131135 }
132136 } ) . ToArray ( ) ;
@@ -149,15 +153,20 @@ protected virtual async Task<long> ReplayTaggedMessagesAsync(ReplayTaggedMessage
149153 using ( var connection = CreateDbConnection ( ) )
150154 {
151155 await connection . OpenAsync ( ) ;
152- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
156+ using ( var cancellationToken =
157+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
153158 {
154159 return await QueryExecutor
155- . SelectByTagAsync ( connection , cancellationToken . Token , replay . Tag , replay . FromOffset , replay . ToOffset , replay . Max , replayedTagged => {
156- foreach ( var adapted in AdaptFromJournal ( replayedTagged . Persistent ) )
157- {
158- replay . ReplyTo . Tell ( new ReplayedTaggedMessage ( adapted , replayedTagged . Tag , replayedTagged . Offset ) , ActorRefs . NoSender ) ;
159- }
160- } ) ;
160+ . SelectByTagAsync ( connection , cancellationToken . Token , replay . Tag , replay . FromOffset ,
161+ replay . ToOffset , replay . Max , replayedTagged =>
162+ {
163+ foreach ( var adapted in AdaptFromJournal ( replayedTagged . Persistent ) )
164+ {
165+ replay . ReplyTo . Tell (
166+ new ReplayedTaggedMessage ( adapted , replayedTagged . Tag , replayedTagged . Offset ) ,
167+ ActorRefs . NoSender ) ;
168+ }
169+ } ) ;
161170 }
162171 }
163172 }
@@ -167,34 +176,41 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
167176 using ( var connection = CreateDbConnection ( ) )
168177 {
169178 await connection . OpenAsync ( ) ;
170- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
179+ using ( var cancellationToken =
180+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
171181 {
172182 return await QueryExecutor
173183 . SelectAllEventsAsync (
174184 connection ,
175- cancellationToken . Token ,
176- replay . FromOffset ,
185+ cancellationToken . Token ,
186+ replay . FromOffset ,
177187 replay . ToOffset ,
178- replay . Max ,
179- replayedEvent => {
188+ replay . Max ,
189+ replayedEvent =>
190+ {
180191 foreach ( var adapted in AdaptFromJournal ( replayedEvent . Persistent ) )
181192 {
182- replay . ReplyTo . Tell ( new ReplayedEvent ( adapted , replayedEvent . Offset ) , ActorRefs . NoSender ) ;
193+ replay . ReplyTo . Tell ( new ReplayedEvent ( adapted , replayedEvent . Offset ) ,
194+ ActorRefs . NoSender ) ;
183195 }
184196 } ) ;
185197 }
186198 }
187199 }
188200
189- protected virtual async Task < ( IEnumerable < string > Ids , long LastOrdering ) > SelectAllPersistenceIdsAsync ( long offset )
201+ protected virtual async Task < ( IEnumerable < string > Ids , long LastOrdering ) > SelectAllPersistenceIdsAsync (
202+ long offset )
190203 {
191204 using ( var connection = CreateDbConnection ( ) )
192205 {
193206 await connection . OpenAsync ( ) ;
194- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
207+ using ( var cancellationToken =
208+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
195209 {
196- var lastOrdering = await QueryExecutor . SelectHighestSequenceNrAsync ( connection , cancellationToken . Token ) ;
197- var ids = await QueryExecutor . SelectAllPersistenceIdsAsync ( connection , cancellationToken . Token , offset ) ;
210+ var lastOrdering =
211+ await QueryExecutor . SelectHighestSequenceNrAsync ( connection , cancellationToken . Token ) ;
212+ var ids = await QueryExecutor . SelectAllPersistenceIdsAsync ( connection , cancellationToken . Token ,
213+ offset ) ;
198214 return ( ids , lastOrdering ) ;
199215 }
200216 }
@@ -210,15 +226,18 @@ protected virtual async Task<long> ReplayAllEventsAsync(ReplayAllEvents replay)
210226 /// <param name="max">TBD</param>
211227 /// <param name="recoveryCallback">TBD</param>
212228 /// <returns>TBD</returns>
213- public override async Task ReplayMessagesAsync ( IActorContext context , string persistenceId , long fromSequenceNr , long toSequenceNr , long max ,
229+ public override async Task ReplayMessagesAsync ( IActorContext context , string persistenceId , long fromSequenceNr ,
230+ long toSequenceNr , long max ,
214231 Action < IPersistentRepresentation > recoveryCallback )
215232 {
216233 using ( var connection = CreateDbConnection ( ) )
217234 {
218235 await connection . OpenAsync ( ) ;
219- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
236+ using ( var cancellationToken =
237+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
220238 {
221- await QueryExecutor . SelectByPersistenceIdAsync ( connection , cancellationToken . Token , persistenceId , fromSequenceNr , toSequenceNr , max , recoveryCallback ) ;
239+ await QueryExecutor . SelectByPersistenceIdAsync ( connection , cancellationToken . Token , persistenceId ,
240+ fromSequenceNr , toSequenceNr , max , recoveryCallback ) ;
222241 }
223242 }
224243 }
@@ -257,7 +276,7 @@ protected bool WaitingForInitialization(object message)
257276 return true ;
258277 case Status . Failure fail :
259278 Log . Error ( fail . Cause , "Failure during {0} initialization." , Self ) ;
260-
279+
261280 // trigger a restart so we have some hope of succeeding in the future even if initialization failed
262281 throw new ApplicationException ( "Failed to initialize SQL Journal." , fail . Cause ) ;
263282 default :
@@ -268,15 +287,16 @@ protected bool WaitingForInitialization(object message)
268287
269288 private async Task < object > Initialize ( )
270289 {
271- if ( ! Settings . AutoInitialize )
290+ if ( ! Settings . AutoInitialize )
272291 return new Status . Success ( NotUsed . Instance ) ;
273292
274293 try
275294 {
276295 using ( var connection = CreateDbConnection ( ) )
277296 {
278297 await connection . OpenAsync ( ) ;
279- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
298+ using ( var cancellationToken =
299+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
280300 {
281301 await QueryExecutor . CreateTablesAsync ( connection , cancellationToken . Token ) ;
282302 }
@@ -286,6 +306,7 @@ private async Task<object> Initialize()
286306 {
287307 return new Status . Failure ( e ) ;
288308 }
309+
289310 return new Status . Success ( NotUsed . Instance ) ;
290311 }
291312
@@ -328,9 +349,11 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t
328349 using ( var connection = CreateDbConnection ( ) )
329350 {
330351 await connection . OpenAsync ( ) ;
331- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
352+ using ( var cancellationToken =
353+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
332354 {
333- await QueryExecutor . DeleteBatchAsync ( connection , cancellationToken . Token , persistenceId , toSequenceNr ) ;
355+ await QueryExecutor . DeleteBatchAsync ( connection , cancellationToken . Token , persistenceId ,
356+ toSequenceNr ) ;
334357 }
335358 }
336359 }
@@ -346,9 +369,11 @@ public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId
346369 using ( var connection = CreateDbConnection ( ) )
347370 {
348371 await connection . OpenAsync ( ) ;
349- using ( var cancellationToken = CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
372+ using ( var cancellationToken =
373+ CancellationTokenSource . CreateLinkedTokenSource ( _pendingRequestsCancellation . Token ) )
350374 {
351- return await QueryExecutor . SelectHighestSequenceNrAsync ( connection , cancellationToken . Token , persistenceId ) ;
375+ return await QueryExecutor . SelectHighestSequenceNrAsync ( connection , cancellationToken . Token ,
376+ persistenceId ) ;
352377 }
353378 }
354379 }
@@ -361,15 +386,18 @@ protected virtual string GetConnectionString()
361386 {
362387 var connectionString = Settings . ConnectionString ;
363388
389+ #if NETSTANDARD
364390 if ( string . IsNullOrEmpty ( connectionString ) )
365391 {
366- connectionString = System . Configuration . ConfigurationManager . ConnectionStrings [ Settings . ConnectionStringName ] . ConnectionString ;
392+ connectionString =
393+ System . Configuration . ConfigurationManager . ConnectionStrings [ Settings . ConnectionStringName ] . ConnectionString ;
367394 }
395+ #endif
368396
369397 return connectionString ;
370398 }
371399
372400 protected ITimestampProvider GetTimestampProvider ( string typeName ) =>
373401 TimestampProviderProvider . GetTimestampProvider ( typeName , Context ) ;
374402 }
375- }
403+ }
0 commit comments