Skip to content

Commit 17e3857

Browse files
authored
Kafka: Fix Progress Marking where consuming >1 Topic (#63)
1 parent ddbcf41 commit 17e3857

File tree

6 files changed

+56
-54
lines changed

6 files changed

+56
-54
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
1313
### Removed
1414
### Fixed
1515

16+
- `Kafka`: Change buffer grouping to include `Topic` alongside `PartitionId` - existing implementation did not guarantee marking progress where consuming from more than one Topic concurrently [#63](https://github.com/jet/propulsion/pull/63)
17+
1618
<a name="2.4.3"></a>
1719
## [2.4.3] - 2019-04-27
1820

src/Propulsion.EventStore/StripedIngester.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ open StripedIngesterImpl
4848

4949
/// Holds batches away from Core processing to limit in-flight processing
5050
type StripedIngester
51-
( log : ILogger, inner : Propulsion.Ingestion.Ingester<seq<StreamEvent<byte[]>>, Propulsion.Submission.SubmissionBatch<StreamEvent<byte[]>>>,
51+
( log : ILogger, inner : Propulsion.Ingestion.Ingester<seq<StreamEvent<byte[]>>, Propulsion.Submission.SubmissionBatch<_, StreamEvent<byte[]>>>,
5252
maxInFlightBatches, initialSeriesIndex, statsInterval : TimeSpan, ?pumpInterval) =
5353
let cts = new CancellationTokenSource()
5454
let pumpInterval = defaultArg pumpInterval (TimeSpan.FromMilliseconds 5.)

src/Propulsion.Kafka/Consumers.fs

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ module private Impl =
5454
/// Pauses if in-flight upper threshold is breached until such time as it drops below that the lower limit
5555
type KafkaIngestionEngine<'Info>
5656
( log : ILogger, counter : Core.InFlightMessageCounter, consumer : IConsumer<_, _>, closeConsumer,
57-
mapMessage : ConsumeResult<_, _> -> 'Info, emit : Submission.SubmissionBatch<'Info>[] -> unit,
57+
mapMessage : ConsumeResult<_, _> -> 'Info, emit : Submission.SubmissionBatch<TopicPartition, 'Info>[] -> unit,
5858
maxBatchSize, emitInterval, statsInterval) =
59-
let acc = Dictionary<int, _>()
59+
let acc = Dictionary<TopicPartition, _>()
6060
let remainingIngestionWindow = intervalTimer emitInterval
6161
let mutable intervalMsgs, intervalChars, totalMessages, totalChars = 0L, 0L, 0L, 0L
6262
let dumpStats () =
@@ -67,33 +67,33 @@ type KafkaIngestionEngine<'Info>
6767
let maybeLogStats =
6868
let due = intervalCheck statsInterval
6969
fun () -> if due () then dumpStats ()
70-
let mkSubmission partitionId span : Submission.SubmissionBatch<'M> =
70+
let mkSubmission topicPartition span : Submission.SubmissionBatch<'S, 'M> =
7171
let checkpoint () =
7272
counter.Delta(-span.reservation) // counterbalance Delta(+) per ingest, below
7373
Bindings.storeOffset log consumer span.highWaterMark
74-
{ partitionId = partitionId; onCompletion = checkpoint; messages = span.messages.ToArray() }
74+
{ source = topicPartition; onCompletion = checkpoint; messages = span.messages.ToArray() }
7575
let ingest result =
7676
let message = Bindings.mapMessage result
7777
let sz = approximateMessageBytes message
7878
counter.Delta(+sz) // counterbalanced by Delta(-) in checkpoint(), below
7979
intervalMsgs <- intervalMsgs + 1L
8080
let inline stringLen (s : string) = match s with null -> 0 | x -> x.Length
8181
intervalChars <- intervalChars + int64 (stringLen message.Key + stringLen message.Value)
82-
let partitionId = Bindings.partitionId result
82+
let tp = result.TopicPartition
8383
let span =
84-
match acc.TryGetValue partitionId with
85-
| false, _ -> let span = PartitionBuffer<'Info>.Create(sz, result, mapMessage) in acc.[partitionId] <- span; span
84+
match acc.TryGetValue tp with
85+
| false, _ -> let span = PartitionBuffer<'Info>.Create(sz, result, mapMessage) in acc.[tp] <- span; span
8686
| true, span -> span.Enqueue(sz, result, mapMessage); span
8787
if span.messages.Count = maxBatchSize then
88-
acc.Remove partitionId |> ignore
89-
emit [| mkSubmission partitionId span |]
88+
acc.Remove tp |> ignore
89+
emit [| mkSubmission tp span |]
9090
let submit () =
9191
match acc.Count with
9292
| 0 -> ()
93-
| partitionsWithMessagesThisInterval ->
94-
let tmp = ResizeArray<Submission.SubmissionBatch<'Info>>(partitionsWithMessagesThisInterval)
95-
for KeyValue(partitionIndex, span) in acc do
96-
tmp.Add(mkSubmission partitionIndex span)
93+
| topicPartitionsWithMessagesThisInterval ->
94+
let tmp = ResizeArray<Submission.SubmissionBatch<_, 'Info>>(topicPartitionsWithMessagesThisInterval)
95+
for KeyValue(tp, span) in acc do
96+
tmp.Add(mkSubmission tp span)
9797
acc.Clear()
9898
emit <| tmp.ToArray()
9999
member __.Pump() = async {
@@ -187,12 +187,12 @@ type ParallelConsumer private () =
187187
let pumpInterval = defaultArg pumpInterval (TimeSpan.FromMilliseconds 5.)
188188

189189
let dispatcher = Parallel.Scheduling.Dispatcher maxDop
190-
let scheduler = Parallel.Scheduling.PartitionedSchedulingEngine<'Msg>(log, handle, dispatcher.TryAdd, statsInterval, ?logExternalStats=logExternalStats)
190+
let scheduler = Parallel.Scheduling.PartitionedSchedulingEngine<_, 'Msg>(log, handle, dispatcher.TryAdd, statsInterval, ?logExternalStats=logExternalStats)
191191
let maxSubmissionsPerPartition = defaultArg maxSubmissionsPerPartition 5
192-
let mapBatch onCompletion (x : Submission.SubmissionBatch<_>) : Parallel.Scheduling.Batch<'Msg> =
192+
let mapBatch onCompletion (x : Submission.SubmissionBatch<_, _>) : Parallel.Scheduling.Batch<_, 'Msg> =
193193
let onCompletion' () = x.onCompletion(); onCompletion()
194-
{ partitionId = x.partitionId; messages = x.messages; onCompletion = onCompletion'; }
195-
let submitBatch (x : Parallel.Scheduling.Batch<_>) : int =
194+
{ source = x.source; messages = x.messages; onCompletion = onCompletion'; }
195+
let submitBatch (x : Parallel.Scheduling.Batch<_, _>) : int =
196196
scheduler.Submit x
197197
x.messages.Length
198198
let submitter = Submission.SubmissionEngine(log, maxSubmissionsPerPartition, mapBatch, submitBatch, statsInterval, pumpInterval)
@@ -255,7 +255,7 @@ module Core =
255255
logExternalState |> Option.iter (fun f -> f log)
256256
streams.Dump(log, Streams.Buffering.StreamState.eventsSize)
257257
let streamsScheduler = Streams.Scheduling.StreamSchedulingEngine.Create<_, _, _, _>(dispatcher, stats, prepare, handle, Streams.SpanResult.toIndex, dumpStreams, ?idleDelay=idleDelay, ?maxBatches=maxBatches)
258-
let mapConsumedMessagesToStreamsBatch onCompletion (x : Submission.SubmissionBatch<'Info>) : Streams.Scheduling.StreamsBatch<_> =
258+
let mapConsumedMessagesToStreamsBatch onCompletion (x : Submission.SubmissionBatch<TopicPartition, 'Info>) : Streams.Scheduling.StreamsBatch<_> =
259259
let onCompletion () = x.onCompletion(); onCompletion()
260260
Streams.Scheduling.StreamsBatch.Create(onCompletion, Seq.collect infoToStreamEvents x.messages) |> fst
261261
let submitter =
@@ -482,7 +482,7 @@ type BatchesConsumer =
482482
x.stream, Choice2Of2 (s, e) |] }
483483
let dispatcher = Streams.Scheduling.BatchedDispatcher(select, handle, stats, dumpStreams)
484484
let streamsScheduler = Streams.Scheduling.StreamSchedulingEngine.Create(dispatcher, ?idleDelay=idleDelay, maxBatches=maxBatches)
485-
let mapConsumedMessagesToStreamsBatch onCompletion (x : Submission.SubmissionBatch<'Info>) : Streams.Scheduling.StreamsBatch<_> =
485+
let mapConsumedMessagesToStreamsBatch onCompletion (x : Submission.SubmissionBatch<TopicPartition, 'Info>) : Streams.Scheduling.StreamsBatch<_> =
486486
let onCompletion () = x.onCompletion(); onCompletion()
487487
Streams.Scheduling.StreamsBatch.Create(onCompletion, Seq.collect infoToStreamEvents x.messages) |> fst
488488
let submitter =

src/Propulsion/Parallel.fs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,17 @@ module Scheduling =
4141

4242
/// Batch of work as passed from the Submitter to the Scheduler comprising messages with their associated checkpointing/completion callback
4343
[<NoComparison; NoEquality>]
44-
type Batch<'M> = { partitionId : int; messages: 'M []; onCompletion: unit -> unit }
44+
type Batch<'S, 'M> = { source : 'S; messages: 'M []; onCompletion: unit -> unit }
4545

4646
/// Thread-safe/lock-free batch-level processing state
4747
/// - referenced [indirectly, see `mkDispatcher`] among all task invocations for a given batch
4848
/// - scheduler loop continuously inspects oldest active instance per partition in order to infer attainment of terminal (completed or faulted) state
4949
[<NoComparison; NoEquality>]
50-
type WipBatch<'M> =
50+
type WipBatch<'S, 'M> =
5151
{ mutable elapsedMs : int64 // accumulated processing time for stats
5252
mutable remaining : int // number of outstanding completions; 0 => batch is eligible for completion
5353
mutable faults : ConcurrentStack<exn> // exceptions, order is not relevant and use is infrequent hence ConcurrentStack
54-
batch: Batch<'M> }
54+
batch: Batch<'S, 'M> }
5555

5656
member private __.RecordOk(duration : TimeSpan) =
5757
// need to record stats first as remaining = 0 is used as completion gate
@@ -61,7 +61,7 @@ module Scheduling =
6161
__.faults.Push exn
6262

6363
/// Prepares an initial set of shared state for a batch of tasks, together with the Async<unit> computations that will feed their results into it
64-
static member Create(batch : Batch<'M>, handle) : WipBatch<'M> * seq<Async<unit>> =
64+
static member Create(batch : Batch<'S, 'M>, handle) : WipBatch<'S, 'M> * seq<Async<unit>> =
6565
let x = { elapsedMs = 0L; remaining = batch.messages.Length; faults = ConcurrentStack(); batch = batch }
6666
x, seq {
6767
for item in batch.messages -> async {
@@ -84,13 +84,13 @@ module Scheduling =
8484
/// - replenishing the Dispatcher
8585
/// - determining when WipBatches attain terminal state in order to trigger completion callbacks at the earliest possible opportunity
8686
/// - triggering abend of the processing should any dispatched tasks start to fault
87-
type PartitionedSchedulingEngine<'M>(log : ILogger, handle, tryDispatch : (Async<unit>) -> bool, statsInterval, ?logExternalStats) =
87+
type PartitionedSchedulingEngine<'S, 'M when 'S : equality>(log : ILogger, handle, tryDispatch : (Async<unit>) -> bool, statsInterval, ?logExternalStats) =
8888
// Submitters dictate batch commencement order by supply batches in a fair order; should never be empty if there is work in the system
89-
let incoming = ConcurrentQueue<Batch<'M>>()
89+
let incoming = ConcurrentQueue<Batch<'S, 'M>>()
9090
// Prepared work items ready to feed to Dispatcher (only created on demand in order to ensure we maximize overall progress and fairness)
9191
let waiting = Queue<Async<unit>>(1024)
9292
// Index of batches that have yet to attain terminal state (can be >1 per partition)
93-
let active = Dictionary<int(*partitionId*),Queue<WipBatch<'M>>>()
93+
let active = Dictionary<'S(*partitionId*),Queue<WipBatch<'S, 'M>>>()
9494
(* accumulators for periodically emitted statistics info *)
9595
let mutable cycles, processingDuration = 0, TimeSpan.Zero
9696
let startedBatches, completedBatches, startedItems, completedItems = PartitionStats(), PartitionStats(), PartitionStats(), PartitionStats()
@@ -130,7 +130,7 @@ module Scheduling =
130130
abend (AggregateException(exns))
131131
| Some (Completed batchProcessingDuration) -> // call completion function asap
132132
let partitionId, markCompleted, itemCount =
133-
let { batch = { partitionId = p; onCompletion = f; messages = msgs } } = queue.Dequeue()
133+
let { batch = { source = p; onCompletion = f; messages = msgs } } = queue.Dequeue()
134134
p, f, msgs.LongLength
135135
completedBatches.Record partitionId
136136
completedItems.Record(partitionId, itemCount)
@@ -144,7 +144,7 @@ module Scheduling =
144144
let tryPrepareNext () =
145145
match incoming.TryDequeue() with
146146
| false, _ -> false
147-
| true, ({ partitionId = pid; messages = msgs} as batch) ->
147+
| true, ({ source = pid; messages = msgs} as batch) ->
148148
startedBatches.Record(pid)
149149
startedItems.Record(pid, msgs.LongLength)
150150
let wipBatch, runners = WipBatch.Create(batch, handle)
@@ -180,34 +180,34 @@ module Scheduling =
180180
Thread.Sleep 1 } // not Async.Sleep, we like this context and/or cache state if nobody else needs it
181181

182182
/// Feeds a batch of work into the queue; the caller is expected to ensure submissions are timely to avoid starvation, but throttled to ensure fair ordering
183-
member __.Submit(batches : Batch<'M>) =
183+
member __.Submit(batches : Batch<'S, 'M>) =
184184
incoming.Enqueue batches
185185

186186
type ParallelIngester<'Item> =
187187
static member Start(log, partitionId, maxRead, submit, ?statsInterval, ?sleepInterval) =
188188
let makeBatch onCompletion (items : 'Item seq) =
189189
let items = Array.ofSeq items
190-
let batch : Submission.SubmissionBatch<'Item> = { partitionId = partitionId; onCompletion = onCompletion; messages = items }
190+
let batch : Submission.SubmissionBatch<_, 'Item> = { source = partitionId; onCompletion = onCompletion; messages = items }
191191
batch,(items.Length,items.Length)
192-
Ingestion.Ingester<'Item seq,Submission.SubmissionBatch<'Item>>.Start(log, maxRead, makeBatch, submit, ?statsInterval=statsInterval, ?sleepInterval=sleepInterval)
192+
Ingestion.Ingester<'Item seq,Submission.SubmissionBatch<_, 'Item>>.Start(log, maxRead, makeBatch, submit, ?statsInterval=statsInterval, ?sleepInterval=sleepInterval)
193193

194194
type ParallelProjector =
195195
static member Start(log : ILogger, maxReadAhead, maxDop, handle, ?statsInterval, ?maxSubmissionsPerPartition, ?logExternalStats)
196196
: ProjectorPipeline<_> =
197197

198198
let statsInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.)
199199
let dispatcher = Scheduling.Dispatcher maxDop
200-
let scheduler = Scheduling.PartitionedSchedulingEngine<'Item>(log, handle, dispatcher.TryAdd, statsInterval, ?logExternalStats=logExternalStats)
200+
let scheduler = Scheduling.PartitionedSchedulingEngine<_, 'Item>(log, handle, dispatcher.TryAdd, statsInterval, ?logExternalStats=logExternalStats)
201201
let maxSubmissionsPerPartition = defaultArg maxSubmissionsPerPartition 5
202202

203-
let mapBatch onCompletion (x : Submission.SubmissionBatch<'Item>) : Scheduling.Batch<'Item> =
203+
let mapBatch onCompletion (x : Submission.SubmissionBatch<_, 'Item>) : Scheduling.Batch<_, 'Item> =
204204
let onCompletion () = x.onCompletion(); onCompletion()
205-
{ partitionId = x.partitionId; onCompletion = onCompletion; messages = x.messages}
205+
{ source = x.source; onCompletion = onCompletion; messages = x.messages}
206206

207-
let submitBatch (x : Scheduling.Batch<'Item>) : int =
207+
let submitBatch (x : Scheduling.Batch<_, 'Item>) : int =
208208
scheduler.Submit x
209209
0
210210

211-
let submitter = Submission.SubmissionEngine<_, _>(log, maxSubmissionsPerPartition, mapBatch, submitBatch, statsInterval)
211+
let submitter = Submission.SubmissionEngine<_, _, _>(log, maxSubmissionsPerPartition, mapBatch, submitBatch, statsInterval)
212212
let startIngester (rangeLog, partitionId) = ParallelIngester<'Item>.Start(rangeLog, partitionId, maxReadAhead, submitter.Ingest)
213213
ProjectorPipeline.Start(log, dispatcher.Pump(), scheduler.Pump, submitter.Pump(), startIngester)

src/Propulsion/Streams.fs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -804,9 +804,9 @@ module Projector =
804804
let makeBatch onCompletion (items : StreamEvent<_> seq) =
805805
let items = Array.ofSeq items
806806
let streams = HashSet(seq { for x in items -> x.stream })
807-
let batch : Submission.SubmissionBatch<_> = { partitionId = partitionId; onCompletion = onCompletion; messages = items }
807+
let batch : Submission.SubmissionBatch<_, _> = { source = partitionId; onCompletion = onCompletion; messages = items }
808808
batch, (streams.Count, items.Length)
809-
Ingestion.Ingester<StreamEvent<_> seq, Submission.SubmissionBatch<StreamEvent<_>>>.Start(log, maxRead, makeBatch, submit, ?statsInterval=statsInterval, ?sleepInterval=sleepInterval)
809+
Ingestion.Ingester<StreamEvent<_> seq, Submission.SubmissionBatch<_, StreamEvent<_>>>.Start(log, maxRead, makeBatch, submit, ?statsInterval=statsInterval, ?sleepInterval=sleepInterval)
810810

811811
type StreamsSubmitter =
812812
static member Create
@@ -824,13 +824,13 @@ module Projector =
824824
| Some a -> if a.TryMerge x then worked <- true
825825
worked
826826
let tryCompactQueue = if defaultArg disableCompaction false then None else Some tryCompactQueueImpl
827-
Submission.SubmissionEngine<_, _>(log, maxSubmissionsPerPartition, mapBatch, submitBatch, statsInterval, ?tryCompactQueue=tryCompactQueue, ?pumpInterval=pumpInterval)
827+
Submission.SubmissionEngine<_, _, _>(log, maxSubmissionsPerPartition, mapBatch, submitBatch, statsInterval, ?tryCompactQueue=tryCompactQueue, ?pumpInterval=pumpInterval)
828828

829829
type StreamsProjectorPipeline =
830830
static member Start
831831
( log : Serilog.ILogger, pumpDispatcher, pumpScheduler, maxReadAhead, submitStreamsBatch, statsInterval,
832832
?ingesterStatsInterval, ?maxSubmissionsPerPartition) =
833-
let mapBatch onCompletion (x : Submission.SubmissionBatch<StreamEvent<_>>) : Scheduling.StreamsBatch<_> =
833+
let mapBatch onCompletion (x : Submission.SubmissionBatch<_, StreamEvent<_>>) : Scheduling.StreamsBatch<_> =
834834
let onCompletion () = x.onCompletion(); onCompletion()
835835
Scheduling.StreamsBatch.Create(onCompletion, x.messages) |> fst
836836
let submitter = StreamsSubmitter.Create(log, mapBatch, submitStreamsBatch, statsInterval, ?maxSubmissionsPerPartition=maxSubmissionsPerPartition)

0 commit comments

Comments
 (0)