diff --git a/pkg/dataobj/internal/dataset/reader_downloader.go b/pkg/dataobj/internal/dataset/reader_downloader.go
index 271e084744b87..21ff050f21cd0 100644
--- a/pkg/dataobj/internal/dataset/reader_downloader.go
+++ b/pkg/dataobj/internal/dataset/reader_downloader.go
@@ -87,6 +87,7 @@ type readerDownloader struct {
 	inner           Dataset
 	targetCacheSize int
 
+	origColumns                    []Column
 	allColumns, primary, secondary []Column
 
 	dsetRanges rowRanges // Ranges of rows to _include_ in the download.
@@ -135,13 +136,15 @@ func newReaderDownloader(dset Dataset, targetCacheSize int) *readerDownloader {
 // AddColumn must be called matching the order of columns in
 // [ReaderOptions.Columns].
 func (dl *readerDownloader) AddColumn(col Column, primary bool) {
-	col = newReaderColumn(dl, col, primary)
+	wrappedCol := newReaderColumn(dl, col, primary)
+
+	dl.origColumns = append(dl.origColumns, col)
+	dl.allColumns = append(dl.allColumns, wrappedCol)
 
-	dl.allColumns = append(dl.allColumns, col)
 	if primary {
-		dl.primary = append(dl.primary, col)
+		dl.primary = append(dl.primary, wrappedCol)
 	} else {
-		dl.secondary = append(dl.secondary, col)
+		dl.secondary = append(dl.secondary, wrappedCol)
 	}
 }
 
@@ -180,6 +183,25 @@ func (dl *readerDownloader) PrimaryColumns() []Column { return dl.primary }
 // readerDownloader in the order they were added.
 func (dl *readerDownloader) SecondaryColumns() []Column { return dl.secondary }
 
+// initColumnPages populates the pages of all columns in the downloader.
+func (dl *readerDownloader) initColumnPages(ctx context.Context) error {
+	columns := dl.allColumns
+
+	var idx int
+
+	for result := range dl.inner.ListPages(ctx, dl.origColumns) {
+		pages, err := result.Value()
+		if err != nil {
+			return err
+		}
+
+		columns[idx].(*readerColumn).processPages(pages)
+		idx++
+	}
+
+	return nil
+}
+
 // downloadBatch downloads a batch of pages from the inner dataset.
 func (dl *readerDownloader) downloadBatch(ctx context.Context, requestor *readerPage) error {
 	for _, col := range dl.allColumns {
@@ -360,7 +382,7 @@ func (dl *readerDownloader) iterColumnPages(ctx context.Context, primary bool) r
 	for _, col := range phaseColumns {
 		col := col.(*readerColumn)
 		if len(col.pages) == 0 {
-			if err := col.initPages(ctx); err != nil {
+			if err := dl.initColumnPages(ctx); err != nil {
 				return err
 			}
 		} else if pageIndex >= len(col.pages) {
@@ -436,6 +458,7 @@ func (dl *readerDownloader) Reset(dset Dataset, targetCacheSize int) {
 
 	dl.readRange = rowRange{}
 
+	dl.origColumns = sliceclear.Clear(dl.origColumns)
 	dl.allColumns = sliceclear.Clear(dl.allColumns)
 	dl.primary = sliceclear.Clear(dl.primary)
 	dl.secondary = sliceclear.Clear(dl.secondary)
@@ -473,7 +496,7 @@ func (col *readerColumn) ColumnInfo() *ColumnInfo {
 func (col *readerColumn) ListPages(ctx context.Context) result.Seq[Page] {
 	return result.Iter(func(yield func(Page) bool) error {
 		if len(col.pages) == 0 {
-			err := col.initPages(ctx)
+			err := col.dl.initColumnPages(ctx)
 			if err != nil {
 				return err
 			}
@@ -489,15 +512,10 @@ func (col *readerColumn) ListPages(ctx context.Context) result.Seq[Page] {
 	})
 }
 
-func (col *readerColumn) initPages(ctx context.Context) error {
+func (col *readerColumn) processPages(pages Pages) {
 	var startRow uint64
 
-	for result := range col.inner.ListPages(ctx) {
-		innerPage, err := result.Value()
-		if err != nil {
-			return err
-		}
-
+	for _, innerPage := range pages {
 		pageRange := rowRange{
 			Start: startRow,
 			End:   startRow + uint64(innerPage.PageInfo().RowCount) - 1,
@@ -506,8 +524,6 @@ func (col *readerColumn) initPages(ctx context.Context) error {
 
 		col.pages = append(col.pages, newReaderPage(col, innerPage, pageRange))
 	}
-
-	return nil
 }
 
 // GC garbage collects cached data from pages which will no longer be read: any