
Commit 3feac12

Stopwatch polishing
1 parent b46eb59 commit 3feac12


7 files changed: +45 -43 lines changed


src/Propulsion.CosmosStore/CosmosStoreSource.fs

Lines changed: 1 addition & 1 deletion
@@ -88,7 +88,7 @@ type CosmosStoreSource =
 token = ctx.epoch; latency = readElapsed; rc = ctx.requestCharge; age = age; docs = docs.Count
 ingestLatency = pt.Elapsed; ingestQueued = cur }
 (log |> Log.withMetric m).Information("Reader {partitionId} {token,9} age {age:dd\.hh\:mm\:ss} {count,4} docs {requestCharge,6:f1}RU {l,5:f1}s Wait {pausedS:f3}s Ahead {cur}/{max}",
- ctx.rangeId, ctx.epoch, age, docs.Count, ctx.requestCharge, readElapsed.TotalSeconds, pt.ElapsedMilliseconds, cur, max)
+ ctx.rangeId, ctx.epoch, age, docs.Count, ctx.requestCharge, readElapsed.TotalSeconds, pt.ElapsedSeconds, cur, max)
 sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor
 }

src/Propulsion.DynamoStore/DynamoStoreIndex.fs

Lines changed: 2 additions & 2 deletions
@@ -96,9 +96,9 @@ module Reader =
 // Returns flattened list of all spans, and flag indicating whether tail reached
 let private loadIndexEpoch (log : Serilog.ILogger) (epochs : AppendsEpoch.Reader.Service) trancheId epochId
 : Async<AppendsEpoch.Events.StreamSpan array * bool * int64> = async {
- let sw = Stopwatch.start ()
+ let ts = Stopwatch.timestamp ()
 let! maybeStreamBytes, _version, state = epochs.Read(trancheId, epochId, 0)
- let sizeB, loadS = defaultValueArg maybeStreamBytes 0L, sw.ElapsedSeconds
+ let sizeB, loadS = defaultValueArg maybeStreamBytes 0L, Stopwatch.elapsedSeconds ts
 let spans = state.changes |> Array.collect (fun struct (_i, spans) -> spans)
 let totalEvents = spans |> Array.sumBy (fun x -> x.c.Length)
 let totalStreams = spans |> AppendsEpoch.flatten |> Seq.length

src/Propulsion.EventStore/EventStoreReader.fs

Lines changed: 2 additions & 2 deletions
@@ -150,7 +150,7 @@ let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn
 let rec aux () = async {
 let! currentSlice = conn.ReadAllEventsForwardAsync(range.Current, batchSize, resolveLinkTos=false) |> Async.AwaitTaskCorrect
 sw.Stop() // Stop the clock after the read call completes; transition to measuring time to traverse / filter / submit
- let postSw = Stopwatch.start ()
+ let postTs = Stopwatch.timestamp ()
 let batchEvents, batchBytes = slicesStats.Ingest currentSlice in overallStats.Ingest(int64 batchEvents, batchBytes)
 let events = currentSlice.Events |> Seq.choose tryMapEvent |> Array.ofSeq
 streams.Clear(); cats.Clear()
@@ -160,7 +160,7 @@ let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn
 let! cur, max = postBatch currentSlice.NextPosition events
 Log.Information("Read {pos,10} {pct:p1} {ft:n3}s {mb:n1}MB {count,4} {categories,4}c {streams,4}s {events,4}e Post {pt:n3}s {cur}/{max}",
 range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), Log.miB batchBytes,
- batchEvents, cats.Count, streams.Count, events.Length, (let e = postSw.Elapsed in e.TotalSeconds), cur, max)
+ batchEvents, cats.Count, streams.Count, events.Length, Stopwatch.elapsedSeconds postTs, cur, max)
 if not (range.TryNext currentSlice.NextPosition && not once && not currentSlice.IsEndOfStream) then
 if currentSlice.IsEndOfStream then return Eof
 else return EndOfTranche

src/Propulsion.Feed/FeedSource.fs

Lines changed: 2 additions & 2 deletions
@@ -278,10 +278,10 @@ type FeedSource
 fun (wasLast, pos) -> asyncSeq {
 if wasLast then
 do! Async.Sleep(TimeSpan.toMs tailSleepInterval)
- let sw = Stopwatch.start ()
+ let readTs = Stopwatch.timestamp ()
 let! page = readPage (trancheId, pos)
 let items' = page.items |> Array.map (fun x -> struct (streamName, x))
- yield struct (sw.Elapsed, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>))
+ yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>))
 }

 /// Drives the continual loop of reading and checkpointing each tranche until a fault occurs. <br/>

src/Propulsion/Internal.fs

Lines changed: 7 additions & 0 deletions
@@ -9,6 +9,13 @@ module TimeSpan =
 module Stopwatch =

 let inline start () = System.Diagnostics.Stopwatch.StartNew()
+ let inline timestamp () = System.Diagnostics.Stopwatch.GetTimestamp()
+ let inline ticksToSeconds ticks = double ticks / double System.Diagnostics.Stopwatch.Frequency
+ let inline ticksToTimeSpan ticks = ticksToSeconds ticks |> TimeSpan.FromSeconds
+
+ let inline elapsedTicks (ts : int64) = timestamp () - ts
+ let inline elapsedSeconds (ts : int64) = elapsedTicks ts |> ticksToSeconds
+ let inline elapsed (ts : int64) = elapsedTicks ts |> ticksToTimeSpan // equivalent to .NET 7 System.Diagnostics.Stopwatch.GetElapsedTime()

 type System.Diagnostics.Stopwatch with
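
These helpers replace per-measurement Stopwatch instances with a raw int64 timestamp plus a conversion at the point of use. A minimal usage sketch (the readWithTiming/loadPage caller is hypothetical, not part of this commit; it assumes the Stopwatch module above is in scope):

let readWithTiming (log : Serilog.ILogger) (loadPage : unit -> Async<'page>) = async {
    let ts = Stopwatch.timestamp ()         // capture an int64 timestamp; no Stopwatch allocation per call
    let! page = loadPage ()                 // the operation being measured (hypothetical)
    let loadS = Stopwatch.elapsedSeconds ts // elapsed interval as float seconds, for metrics fields
    let loadT = Stopwatch.elapsed ts        // the same interval as a System.TimeSpan, for log output
    log.Information("Read took {loadS:n3}s ({loadT})", loadS, loadT)
    return page }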

src/Propulsion/Parallel.fs

Lines changed: 4 additions & 4 deletions
@@ -62,12 +62,12 @@ module Scheduling =
 let x = { elapsedMs = 0L; remaining = batch.messages.Length; faults = ConcurrentStack(); batch = batch }
 x, seq {
 for item in batch.messages -> async {
- let sw = Stopwatch.start ()
+ let ts = Stopwatch.timestamp ()
 try match! handle item with
- | Choice1Of2 () -> x.RecordOk sw.Elapsed
- | Choice2Of2 exn -> x.RecordExn(sw.Elapsed, exn)
+ | Choice1Of2 () -> x.RecordOk(Stopwatch.elapsed ts)
+ | Choice2Of2 exn -> x.RecordExn(Stopwatch.elapsed ts, exn)
 // This exception guard _should_ technically not be necessary per the interface contract, but cannot risk an orphaned batch
- with exn -> x.RecordExn(sw.Elapsed, exn) } }
+ with exn -> x.RecordExn(Stopwatch.elapsed ts, exn) } }

 /// Infers whether a WipBatch is in a terminal state
 let (|Busy|Completed|Faulted|) = function

src/Propulsion/Streams.fs

Lines changed: 27 additions & 32 deletions
@@ -277,14 +277,14 @@ module Dispatch =
 // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
 let tryFillDispatcher (potential : seq<Item<'F>>) markStarted project markBusy =
 let xs = potential.GetEnumerator()
- let startTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
+ let startTs = Stopwatch.timestamp ()
 let mutable hasCapacity, dispatched = true, false
 while xs.MoveNext() && hasCapacity do
 let item = xs.Current
- let succeeded = inner.TryAdd(project struct (startTimestamp, item))
+ let succeeded = inner.TryAdd(project struct (startTs, item))
 if succeeded then
 markBusy item.stream
- markStarted (item.stream, startTimestamp)
+ markStarted (item.stream, startTs)
 hasCapacity <- succeeded
 dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping
 struct (dispatched, hasCapacity)
@@ -434,11 +434,6 @@ module Scheduling =

 type [<Struct; NoEquality; NoComparison>] BufferState = Idle | Active | Full | Slipstreaming

- module StopwatchTicks =
-
- let inline elapsed (sw : System.Diagnostics.Stopwatch) = sw.ElapsedTicks
- let inline toTimeSpan ticks = TimeSpan.FromSeconds(double ticks / double System.Diagnostics.Stopwatch.Frequency)
-
 module Stats =

 /// Manages state used to generate metrics (and summary logs) regarding streams currently being processed by a Handler
@@ -447,8 +442,8 @@ module Scheduling =
 type private StreamState = { ts : int64; mutable count : int }
 let private walkAges (state : Dictionary<_, _>) =
 if state.Count = 0 then Seq.empty else
- let currentTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
- seq { for x in state.Values -> struct (currentTimestamp - x.ts, x.count) }
+ let currentTs = Stopwatch.timestamp ()
+ seq { for x in state.Values -> struct (currentTs - x.ts, x.count) }
 let private renderState agesAndCounts =
 let mutable oldest, newest, streams, attempts = Int64.MinValue, Int64.MaxValue, 0, 0
 for struct (diff, count) in agesAndCounts do
@@ -457,7 +452,7 @@ module Scheduling =
 streams <- streams + 1
 attempts <- attempts + count
 if streams = 0 then oldest <- 0L; newest <- 0L
- struct (streams, attempts), struct (StopwatchTicks.toTimeSpan oldest, StopwatchTicks.toTimeSpan newest)
+ struct (streams, attempts), struct (Stopwatch.ticksToTimeSpan oldest, Stopwatch.ticksToTimeSpan newest)
 /// Manages the list of currently dispatched Handlers
 /// NOTE we are guaranteed we'll hear about a Start before a Finish (or another Start) per stream by the design of the Dispatcher
 type private Active() =
@@ -507,17 +502,17 @@ module Scheduling =
 type [<NoComparison; NoEquality>] Timers() =
 let mutable results, dispatch, merge, ingest, stats, sleep = 0L, 0L, 0L, 0L, 0L, 0L
 let sw = Stopwatch.start()
- member _.RecordResults sw = results <- results + StopwatchTicks.elapsed sw
- member _.RecordDispatch sw = dispatch <- dispatch + StopwatchTicks.elapsed sw
- member _.RecordMerge sw = merge <- merge + StopwatchTicks.elapsed sw
- member _.RecordIngest sw = ingest <- ingest + StopwatchTicks.elapsed sw
- member _.RecordStats sw = stats <- stats + StopwatchTicks.elapsed sw
- member _.RecordSleep sw = sleep <- sleep + StopwatchTicks.elapsed sw
+ member _.RecordResults ts = results <- results + Stopwatch.elapsedTicks ts
+ member _.RecordDispatch ts = dispatch <- dispatch + Stopwatch.elapsedTicks ts
+ member _.RecordMerge ts = merge <- merge + Stopwatch.elapsedTicks ts
+ member _.RecordIngest ts = ingest <- ingest + Stopwatch.elapsedTicks ts
+ member _.RecordStats ts = stats <- stats + Stopwatch.elapsedTicks ts
+ member _.RecordSleep ts = sleep <- sleep + Stopwatch.elapsedTicks ts
 member _.Dump(log : ILogger) =
- let dt, ft, mt = StopwatchTicks.toTimeSpan results, StopwatchTicks.toTimeSpan dispatch, StopwatchTicks.toTimeSpan merge
- let it, st, zt = StopwatchTicks.toTimeSpan ingest, StopwatchTicks.toTimeSpan stats, StopwatchTicks.toTimeSpan sleep
+ let dt, ft, mt = Stopwatch.ticksToTimeSpan results, Stopwatch.ticksToTimeSpan dispatch, Stopwatch.ticksToTimeSpan merge
+ let it, st, zt = Stopwatch.ticksToTimeSpan ingest, Stopwatch.ticksToTimeSpan stats, Stopwatch.ticksToTimeSpan sleep
 let m = Log.Metric.SchedulerCpu (mt, it, ft, dt, st)
- let tot = StopwatchTicks.toTimeSpan (results + dispatch + merge + ingest + stats + sleep)
+ let tot = Stopwatch.ticksToTimeSpan (results + dispatch + merge + ingest + stats + sleep)
 (log |> Log.withMetric m).Information(" Cpu Streams {mt:n1}s Batches {it:n1}s Dispatch {ft:n1}s Results {dt:n1}s Stats {st:n1}s Sleep {zt:n1}s Total {tot:n1}s Interval {int:n1}s",
 mt.TotalSeconds, it.TotalSeconds, ft.TotalSeconds, dt.TotalSeconds, st.TotalSeconds, zt.TotalSeconds, tot.TotalSeconds, sw.ElapsedSeconds)
 results <- 0L; dispatch <- 0L; merge <- 0L; ingest <- 0L; stats <- 0L; sleep <- 0L
@@ -632,9 +627,9 @@ module Scheduling =
 static member Create(inner,
 project : struct (FsCodec.StreamName * StreamSpan<'F>) -> CancellationToken -> Task<struct (bool * Choice<'P, 'E>)>,
 interpretProgress, stats, dumpStreams) =
- let project struct (startTicks, item : Dispatch.Item<'F>) (ct : CancellationToken) = task {
+ let project struct (startTs, item : Dispatch.Item<'F>) (ct : CancellationToken) = task {
 let! progressed, res = project (item.stream, item.span) ct
- return struct (System.Diagnostics.Stopwatch.GetTimestamp() - startTicks |> StopwatchTicks.toTimeSpan, item.stream, progressed, res) }
+ return struct (Stopwatch.elapsed startTs, item.stream, progressed, res) }
 MultiDispatcher<_, _, _, _>(inner, project, interpretProgress, stats, dumpStreams)
 static member Create(inner, handle, interpret, toIndex, stats, dumpStreams) =
 let project item ct = task {
@@ -877,17 +872,17 @@ module Scheduling =

 member _.Pump(ct : CancellationToken) = task {
 use _ = dispatcher.Result.Subscribe(fun struct (t, s, pr, r) -> writeResult (Result (t, s, pr, r)))
- let inline ssw () = Stopwatch.start ()
+ let inline ts () = Stopwatch.timestamp ()
 while not ct.IsCancellationRequested do
 let mutable s = { idle = true; dispatcherState = Idle; remaining = maxCycles; waitForPending = false; waitForCapacity = false }
 let t = dispatcher.Timers
 while s.remaining <> 0 do
 s.remaining <- s.remaining - 1
 // 1. propagate write outcomes to buffer (can mark batches completed etc)
- let processedResults = let sw = ssw () in let r = tryHandleResults () in t.RecordResults sw; r
+ let processedResults = let ts = ts () in let r = tryHandleResults () in t.RecordResults ts; r
 // 2. top up provisioning of writers queue
 // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
- let struct (dispatched, hasCapacity) = let sw = ssw () in let r = tryDispatch s.IsSlipStreaming in t.RecordDispatch sw; r
+ let struct (dispatched, hasCapacity) = let ts = ts () in let r = tryDispatch s.IsSlipStreaming in t.RecordDispatch ts; r
 s.idle <- s.idle && not processedResults && not dispatched
 match s.dispatcherState with
 | Idle when not hasCapacity ->
@@ -899,8 +894,8 @@ module Scheduling =
 | Idle -> // need to bring more work into the pool as we can't fill the work queue from what we have
 // If we're going to fill the write queue with random work, we should bring all read events into the state first
 // Hence we potentially take more than one batch at a time based on maxBatches (but less buffered work is more optimal)
- let mergeStreams batchStreams = let sw = ssw () in streams.Merge batchStreams; t.RecordMerge sw
- let ingestBatch batch = let sw = ssw () in ingestBatch batch; t.RecordIngest sw
+ let mergeStreams batchStreams = let ts = ts () in streams.Merge batchStreams; t.RecordMerge ts
+ let ingestBatch batch = let ts = ts () in ingestBatch batch; t.RecordIngest ts
 let struct (ingested, filled) = ingestBatches mergeStreams ingestBatch maxBatches
 if ingested then s.waitForPending <- not filled // no need to wait if we ingested as many as needed
 elif slipstreamingEnabled then s.dispatcherState <- Slipstreaming; s.waitForPending <- true // try some slip-streaming, but wait for proper items too
@@ -911,17 +906,17 @@ module Scheduling =
 if s.remaining = 0 && hasCapacity then s.waitForPending <- true
 if s.remaining = 0 && not hasCapacity && not wakeForResults then s.waitForCapacity <- true
 // While the loop can take a long time, we don't attempt logging of stats per iteration on the basis that the maxCycles should be low
- let sw = ssw () in dispatcher.RecordStats(pendingCount()); t.RecordStats sw
+ let ts = ts () in dispatcher.RecordStats(pendingCount()); t.RecordStats ts
 // 4. Do a minimal sleep so we don't run completely hot when empty (unless we did something non-trivial)
 if s.idle then
- let sleepSw = ssw ()
+ let sleepTs = Stopwatch.timestamp ()
 let wakeConditions : Task array = [|
 if wakeForResults then awaitResults ct
 elif s.waitForCapacity then dispatcher.AwaitCapacity()
 if s.waitForPending then awaitPending ct
 Task.Delay(int sleepIntervalMs) |]
 do! Task.WhenAny(wakeConditions) :> Task
- t.RecordSleep sleepSw
+ t.RecordSleep sleepTs
 // 3. Record completion state once per full iteration; dumping streams is expensive so needs to be done infrequently
 if dispatcher.RecordState(s.dispatcherState, streams, totalPurged) && purgeDue () then
 purge () }
@@ -1222,11 +1217,11 @@ module Sync =

 let attemptWrite struct (stream, span : FsCodec.ITimelineEvent<'F> array) ct = task {
 let struct (met, span') = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) span
- let prepareSw = Stopwatch.start ()
+ let prepareTs = Stopwatch.timestamp ()
 try let req = struct (stream, span')
 let! res, outcome = Async.StartImmediateAsTask(handle req, cancellationToken = ct)
 let index' = SpanResult.toIndex span' res
- return struct (index' > span[0].Index, Choice1Of2 struct (index', struct (met, prepareSw.Elapsed), outcome))
+ return struct (index' > span[0].Index, Choice1Of2 struct (index', struct (met, Stopwatch.elapsed prepareTs), outcome))
 with e -> return struct (false, Choice2Of2 struct (met, e)) }

 let interpretWriteResultProgress _streams (stream : FsCodec.StreamName) = function
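
The Timers conversion above follows the accumulation pattern the commit converges on: the caller captures a timestamp before a phase, the accumulator adds the raw tick delta, and conversion to TimeSpan is deferred until the periodic Dump. A self-contained sketch of that pattern, using illustrative names rather than the actual Propulsion types:

open System
open System.Diagnostics

type PhaseTimers() = // illustrative accumulator, not a Propulsion type
    let mutable work, idle = 0L, 0L
    // callers pass the timestamp they captured before entering the phase
    member _.RecordWork (startTs : int64) = work <- work + (Stopwatch.GetTimestamp() - startTs)
    member _.RecordIdle (startTs : int64) = idle <- idle + (Stopwatch.GetTimestamp() - startTs)
    member _.Dump() =
        // ticks are converted to TimeSpan only at reporting time
        let toTimeSpan ticks = TimeSpan.FromSeconds(double ticks / double Stopwatch.Frequency)
        printfn "work %O idle %O" (toTimeSpan work) (toTimeSpan idle)

let demo () =
    let timers = PhaseTimers()
    let ts = Stopwatch.GetTimestamp()
    System.Threading.Thread.Sleep 10        // stand-in for a unit of work
    timers.RecordWork ts
    timers.Dump()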
