Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions pkg/dataobj/internal/dataset/reader_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type readerDownloader struct {
inner Dataset
targetCacheSize int

origColumns []Column
allColumns, primary, secondary []Column

dsetRanges rowRanges // Ranges of rows to _include_ in the download.
Expand Down Expand Up @@ -135,13 +136,15 @@ func newReaderDownloader(dset Dataset, targetCacheSize int) *readerDownloader {
// AddColumn registers a column for downloading. AddColumn must be called
// matching the order of columns in [ReaderOptions.Columns].
func (dl *readerDownloader) AddColumn(col Column, primary bool) {
	// Wrap the raw column so page downloads can be batched and cached by the
	// downloader instead of hitting the inner dataset directly.
	wrappedCol := newReaderColumn(dl, col, primary)

	// Retain the original (unwrapped) column as well: initColumnPages passes
	// origColumns to the inner dataset's ListPages so page listings for all
	// columns can be fetched in a single call.
	dl.origColumns = append(dl.origColumns, col)
	dl.allColumns = append(dl.allColumns, wrappedCol)

	if primary {
		dl.primary = append(dl.primary, wrappedCol)
	} else {
		dl.secondary = append(dl.secondary, wrappedCol)
	}
}

Expand Down Expand Up @@ -180,6 +183,25 @@ func (dl *readerDownloader) PrimaryColumns() []Column { return dl.primary }
// readerDownloader in the order they were added.
func (dl *readerDownloader) SecondaryColumns() []Column { return dl.secondary }

// initColumnPages populates the pages of all columns in the downloader.
//
// It lists pages for every original column in one ListPages call against the
// inner dataset, then hands each column's page set to the matching wrapped
// column. Results are assumed to arrive in the same order the columns were
// added via AddColumn — NOTE(review): confirm ListPages guarantees one result
// per requested column, in request order.
func (dl *readerDownloader) initColumnPages(ctx context.Context) error {
	var next int

	for res := range dl.inner.ListPages(ctx, dl.origColumns) {
		pages, err := res.Value()
		if err != nil {
			return err
		}

		// allColumns[i] is the readerColumn wrapping origColumns[i], so the
		// i-th result belongs to the i-th wrapped column.
		dl.allColumns[next].(*readerColumn).processPages(pages)
		next++
	}

	return nil
}

// downloadBatch downloads a batch of pages from the inner dataset.
func (dl *readerDownloader) downloadBatch(ctx context.Context, requestor *readerPage) error {
for _, col := range dl.allColumns {
Expand Down Expand Up @@ -360,7 +382,7 @@ func (dl *readerDownloader) iterColumnPages(ctx context.Context, primary bool) r
for _, col := range phaseColumns {
col := col.(*readerColumn)
if len(col.pages) == 0 {
if err := col.initPages(ctx); err != nil {
if err := dl.initColumnPages(ctx); err != nil {
return err
}
} else if pageIndex >= len(col.pages) {
Expand Down Expand Up @@ -436,6 +458,7 @@ func (dl *readerDownloader) Reset(dset Dataset, targetCacheSize int) {

dl.readRange = rowRange{}

dl.origColumns = sliceclear.Clear(dl.origColumns)
dl.allColumns = sliceclear.Clear(dl.allColumns)
dl.primary = sliceclear.Clear(dl.primary)
dl.secondary = sliceclear.Clear(dl.secondary)
Expand Down Expand Up @@ -473,7 +496,7 @@ func (col *readerColumn) ColumnInfo() *ColumnInfo {
func (col *readerColumn) ListPages(ctx context.Context) result.Seq[Page] {
return result.Iter(func(yield func(Page) bool) error {
if len(col.pages) == 0 {
err := col.initPages(ctx)
err := col.dl.initColumnPages(ctx)
if err != nil {
return err
}
Expand All @@ -489,15 +512,10 @@ func (col *readerColumn) ListPages(ctx context.Context) result.Seq[Page] {
})
}

func (col *readerColumn) initPages(ctx context.Context) error {
func (col *readerColumn) processPages(pages Pages) {
var startRow uint64

for result := range col.inner.ListPages(ctx) {
innerPage, err := result.Value()
if err != nil {
return err
}

for _, innerPage := range pages {
pageRange := rowRange{
Start: startRow,
End: startRow + uint64(innerPage.PageInfo().RowCount) - 1,
Expand All @@ -506,8 +524,6 @@ func (col *readerColumn) initPages(ctx context.Context) error {

col.pages = append(col.pages, newReaderPage(col, innerPage, pageRange))
}

return nil
}

// GC garbage collects cached data from pages which will no longer be read: any
Expand Down
Loading