@@ -32,7 +32,7 @@ type buildRequest struct {
3232
3333// buildResult represents the result of an index build operation
3434type buildResult struct {
35- indexPath string
35+ indexPath string // Path of the index object flushed by this build (presumably empty when nothing was flushed — confirm)
3636 records []* kgo.Record // Records to commit after successful build
3737 err error
3838}
@@ -65,6 +65,8 @@ type serialIndexer struct {
6565 // Download pipeline
6666 downloadQueue chan metastore.ObjectWrittenEvent
6767 downloadedObjects chan downloadedObject
68+ skipDownloads chan struct {} // Signal to skip downloading new objects
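69+ skipMode bool // True while downloadWorker discards queued events instead of downloading them. NOTE(review): written by downloadWorker and by flushIndex with no visible synchronization — confirm this is not a data race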
6870
6971 // Worker management
7072 buildRequestChan chan buildRequest
@@ -96,6 +98,7 @@ func newSerialIndexer(
9698 buildRequestChan : make (chan buildRequest , cfg .QueueSize ),
9799 downloadQueue : make (chan metastore.ObjectWrittenEvent , 32 ),
98100 downloadedObjects : make (chan downloadedObject , 1 ),
101+ skipDownloads : make (chan struct {}),
99102 }
100103
101104 // Initialize dskit Service
@@ -135,6 +138,9 @@ func (si *serialIndexer) stopping(_ error) error {
135138 close (si .downloadQueue )
136139 close (si .buildRequestChan )
137140
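141+ // Close the skip downloads channel: a receive on it is then always ready, so a download worker still in its select loop switches to skip mode. NOTE(review): buildIndex sends on this channel and a send on a closed channel panics — confirm no build can still be running at this point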
142+ close (si .skipDownloads )
143+
138144 // Wait for workers to finish
139145 si .downloadWorkerWg .Wait ()
140146 si .buildWorkerWg .Wait ()
@@ -196,6 +202,8 @@ func (si *serialIndexer) downloadWorker(ctx context.Context) {
196202 level .Debug (si .logger ).Log ("msg" , "download worker started" )
197203 defer level .Debug (si .logger ).Log ("msg" , "download worker stopped" )
198204
205+ si .skipMode = false // Start in normal mode: every queued event is downloaded
206+
199207 for {
200208 select {
201209 case event , ok := <- si .downloadQueue :
@@ -204,6 +212,12 @@ func (si *serialIndexer) downloadWorker(ctx context.Context) {
204212 return
205213 }
206214
215+ if si .skipMode {
216+ // Skip downloading, just drain the queue
217+ level .Debug (si .logger ).Log ("msg" , "skipping download due to skip signal" , "object_path" , event .ObjectPath )
218+ continue
219+ }
220+
207221 objLogger := log .With (si .logger , "object_path" , event .ObjectPath )
208222 downloadStart := time .Now ()
209223
@@ -247,6 +261,11 @@ func (si *serialIndexer) downloadWorker(ctx context.Context) {
247261 return
248262 }
249263
264+ case <- si .skipDownloads :
265+ level .Debug (si .logger ).Log ("msg" , "download worker received skip signal, entering skip mode" )
266+ si .skipMode = true
267+ // Continue in the loop to drain the downloadQueue
268+
250269 case <- ctx .Done ():
251270 return
252271 }
@@ -302,26 +321,26 @@ func (si *serialIndexer) processBuildRequest(req buildRequest) buildResult {
302321 }
303322
304323 // Build the index using internal method
305- indexPath , err := si .buildIndex (req .ctx , events , req .partition )
324+ indexPath , processed , err := si .buildIndex (req .ctx , events , req .partition )
306325
307326 // Update metrics
308327 buildTime := time .Since (start )
309328 si .updateMetrics (buildTime )
310329
311330 if err != nil {
312331 level .Error (si .logger ).Log ("msg" , "failed to build index" ,
313- "partition" , req .partition , "err" , err , "duration" , buildTime )
332+ "partition" , req .partition , "err" , err , "duration" , buildTime , "processed" , processed )
314333 return buildResult {err : err }
315334 }
316335
317336 level .Debug (si .logger ).Log ("msg" , "successfully built index" ,
318337 "partition" , req .partition , "index_path" , indexPath , "duration" , buildTime ,
319- "events" , len (events ))
338+ "events" , len (events ), "processed" , processed )
320339
321- // Extract records for committing
322- records := make ([]* kgo.Record , len ( req . events ) )
323- for i , buffered := range req . events {
324- records [i ] = buffered .record
340+ // Extract records for committing - only for processed events
341+ records := make ([]* kgo.Record , processed )
342+ for i := range processed {
343+ records [i ] = req . events [ i ] .record
325344 }
326345
327346 return buildResult {
@@ -331,39 +350,44 @@ func (si *serialIndexer) processBuildRequest(req buildRequest) buildResult {
331350 }
332351}
333352
334- // buildIndex is the core index building logic (moved from builder)
335- func (si * serialIndexer ) buildIndex (ctx context.Context , events []metastore.ObjectWrittenEvent , partition int32 ) (string , error ) {
353+ // buildIndex queues all metastore events for download, processes the downloaded objects and flushes
354+ // them to a single index object. It returns the index path, the number of events processed, and an error if the index object is not created.
355+ // The number of events processed is returned on error as well, and it can be less than len(events) if the builder becomes full
356+ // when the trigger is triggerTypeAppend.
357+ func (si * serialIndexer ) buildIndex (ctx context.Context , events []metastore.ObjectWrittenEvent , partition int32 ) (string , int , error ) {
336358 level .Debug (si .logger ).Log ("msg" , "building index" , "events" , len (events ), "partition" , partition )
337359 start := time .Now ()
338360
339361 // Observe processing delay
340362 writeTime , err := time .Parse (time .RFC3339 , events [0 ].WriteTime )
341363 if err != nil {
342364 level .Error (si .logger ).Log ("msg" , "failed to parse write time" , "err" , err )
343- return "" , err
365+ return "" , 0 , err
344366 }
345367 si .builderMetrics .setProcessingDelay (writeTime )
346368
347- // Trigger the downloads
348369 for _ , event := range events {
349370 select {
350371 case si .downloadQueue <- event :
351372 // Successfully sent event for download
352373 case <- ctx .Done ():
353- return "" , ctx .Err ()
374+ return "" , 0 , ctx .Err ()
354375 }
355376 }
356377
357- // Process the results as they are downloaded
378+ // Process downloaded objects, stopping early once the calculator reports it is full
358379 processingErrors := multierror .New ()
359- for range len (events ) {
380+
381+ processed := 0
382+ for processed < len (events ) {
360383 var obj downloadedObject
361384 select {
362385 case obj = <- si .downloadedObjects :
363386 case <- ctx .Done ():
364- return "" , ctx .Err ()
387+ return "" , processed , ctx .Err ()
365388 }
366389
390+ processed ++
367391 objLogger := log .With (si .logger , "object_path" , obj .event .ObjectPath )
368392 level .Debug (objLogger ).Log ("msg" , "processing object" )
369393
@@ -372,26 +396,78 @@ func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.Obje
372396 continue
373397 }
374398
375- reader , err := dataobj . FromReaderAt ( bytes . NewReader ( * obj . objectBytes ), int64 ( len ( * obj . objectBytes )))
376- if err != nil {
377- processingErrors .Add (fmt .Errorf ("failed to read object: %w" , err ))
399+ // Process this object
400+ if err := si . processObject ( ctx , objLogger , obj ); err != nil {
401+ processingErrors .Add (fmt .Errorf ("failed to process object: %w" , err ))
378402 continue
379403 }
380404
381- if err := si .calculator .Calculate (ctx , objLogger , reader , obj .event .ObjectPath ); err != nil {
382- processingErrors .Add (fmt .Errorf ("failed to calculate index: %w" , err ))
383- continue
405+ // Check if builder became full during processing
406+ if si .calculator .IsFull () {
407+ // Signal download worker to skip downloading new objects
408+ select {
409+ case si .skipDownloads <- struct {}{}:
410+ level .Debug (si .logger ).Log ("msg" , "sent skip signal to download worker" )
411+ default :
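412+ // No receiver is ready right now (the channel is unbuffered), so the signal is dropped and we continue anyway. A closed channel would NOT end up here: sending on a closed channel panics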
413+ level .Debug (si .logger ).Log ("msg" , "failed to send skip signal to download worker, channel full or closed" )
414+ }
415+
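416+ // Best-effort, non-blocking drain: discard objects already sitting in downloadedObjects. The loop exits as soon as the channel is empty, so an object delivered later is not drained here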
417+ drainLoop:
418+ for {
419+ select {
420+ case <- si .downloadedObjects :
421+ // Drain the channel, continue draining
422+ case <- ctx .Done ():
423+ break drainLoop
424+ default :
425+ break drainLoop // No more objects to drain
426+ }
427+ }
428+ break
384429 }
385430 }
386431
387432 if processingErrors .Err () != nil {
388- return "" , processingErrors .Err ()
433+ return "" , processed , processingErrors .Err ()
434+ }
435+
436+ // Flush the current index and start fresh. For each trigger type this means:
437+ // - append: Either the calculator became full and the remaining events will be processed by the next trigger
438+ // or all events have been processed, so we flush the current index and start fresh
439+ // - max-idle: All events have been processed, so we flush the current index and start fresh
440+ indexPath , flushErr := si .flushIndex (ctx , partition )
441+ if flushErr != nil {
442+ return "" , processed , fmt .Errorf ("failed to flush index after processing full object: %w" , flushErr )
389443 }
390444
445+ level .Debug (si .logger ).Log ("msg" , "finished building index files" , "partition" , partition ,
446+ "events" , len (events ), "processed" , processed , "index_path" , indexPath , "duration" , time .Since (start ))
447+
448+ return indexPath , processed , nil
449+ }
450+
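451+ // processObject wraps the downloaded bytes in a dataobj reader and passes it to the calculator. A failure to create the reader is wrapped; the calculator's error is returned as-is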
452+ func (si * serialIndexer ) processObject (ctx context.Context , objLogger log.Logger , obj downloadedObject ) error {
453+ reader , err := dataobj .FromReaderAt (bytes .NewReader (* obj .objectBytes ), int64 (len (* obj .objectBytes )))
454+ if err != nil {
455+ return fmt .Errorf ("failed to read object: %w" , err )
456+ }
457+
458+ return si .calculator .Calculate (ctx , objLogger , reader , obj .event .ObjectPath )
459+ }
460+
461+ // flushIndex flushes the current calculator state to an index object
462+ func (si * serialIndexer ) flushIndex (ctx context.Context , partition int32 ) (string , error ) {
391463 tenantTimeRanges := si .calculator .TimeRanges ()
464+ if len (tenantTimeRanges ) == 0 {
465+ return "" , nil // Nothing to flush
466+ }
467+
392468 obj , closer , err := si .calculator .Flush ()
393469 if err != nil {
394- return "" , fmt .Errorf ("failed to flush builder : %w" , err )
470+ return "" , fmt .Errorf ("failed to flush calculator : %w" , err )
395471 }
396472 defer closer .Close ()
397473
@@ -415,9 +491,11 @@ func (si *serialIndexer) buildIndex(ctx context.Context, events []metastore.Obje
415491 return "" , fmt .Errorf ("failed to update metastore ToC file: %w" , err )
416492 }
417493
418- level .Debug (si .logger ).Log ("msg" , "finished building new index file" , "partition" , partition ,
419- "events" , len (events ), "size" , obj .Size (), "duration" , time .Since (start ),
420- "tenants" , len (tenantTimeRanges ), "path" , key )
494+ si .calculator .Reset ()
495+ si .skipMode = false // Reset skip mode
496+
497+ level .Debug (si .logger ).Log ("msg" , "flushed index object" , "partition" , partition ,
498+ "path" , key , "size" , obj .Size (), "tenants" , len (tenantTimeRanges ))
421499
422500 return key , nil
423501}
0 commit comments