Skip to content

Fix iterate and greedyIterate methods #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 40 additions & 48 deletions src/clients/dataset-client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ActorRun, Dataset, DatasetClient } from 'apify-client';
import { DatasetClient } from 'apify-client';

import { RunsTracker } from '../tracker.js';
import { DatasetItem, GreedyIterateOptions, ExtendedDatasetClient, IterateOptions } from '../types.js';
Expand Down Expand Up @@ -26,82 +26,74 @@ export class ExtDatasetClient<T extends DatasetItem> extends DatasetClient<T> im
const { pageSize, ...listItemOptions } = options;
this.customLogger.info('Iterating Dataset', { pageSize }, { url: this.url });

let totalItems = 0;
const initialOffset = listItemOptions.offset ?? 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I feel weird about this being in the outer scope as it really doesn't matter when we don't have a pageSize - that would entail duplicating the log, but TBH I'm not against that as it's actually two implementations masquerading as one function..

let readItems = 0;

if (pageSize) {
let offset = 0;
let currentPage = await this.superClient.listItems({ ...listItemOptions, offset, limit: pageSize });
let currentPage = await this.superClient.listItems({
...listItemOptions,
limit: pageSize,
});
while (currentPage.items.length > 0) {
totalItems += currentPage.items.length;
readItems += currentPage.items.length;
for (const item of currentPage.items) {
yield item;
}

offset += pageSize;
currentPage = await this.superClient.listItems({ offset, limit: pageSize });
currentPage = await this.superClient.listItems({
...listItemOptions,
offset: initialOffset + readItems,
limit: pageSize,
});
}
} else {
const itemList = await this.superClient.listItems(listItemOptions);
totalItems += itemList.items.length;
readItems += itemList.items.length;
for (const item of itemList.items) {
yield item;
}
}

this.customLogger.info('Finished reading dataset', { totalItems }, { url: this.url });
this.customLogger.info('Finished reading dataset', { initialOffset, readItems }, { url: this.url });
}

async* greedyIterate(options: GreedyIterateOptions = {}): AsyncGenerator<T, void, void> {
const { pageSize = 100, itemsThreshold = 100, pollIntervalSecs = 10, ...listItemOptions } = options;
this.customLogger.info('Greedily iterating Dataset', { pageSize }, { url: this.url });
const { pollIntervalSecs = 10, ...iterateOptions } = options;
this.customLogger.info('Greedily iterating Dataset', { pageSize: iterateOptions.pageSize }, { url: this.url });

let readItemsCount = 0;
let currentOffset = iterateOptions.offset ?? 0;

let dataset: Dataset | undefined;
let run: ActorRun | undefined;
const runId = (await this.get())?.actRunId;

// TODO: breaking change - remove itemsThreshold and just listItems at every iteration
while (true) {
dataset = await this.get();
if (!dataset || !dataset.actRunId) {
this.customLogger.error('Error getting Dataset while iterating greedily', { id: this.id });
return;
}

run = await this.apifyClient.run(dataset.actRunId).get();
if (!run) {
this.customLogger.error('Error getting Run while iterating Dataset greedily', { id: this.id });
return;
}
let runStatus = runId ? (await this.apifyClient.run(runId).get())?.status : undefined;

if (run.status !== 'READY' && run.status !== 'RUNNING') {
break;
}

if (dataset.itemCount >= readItemsCount + itemsThreshold) {
const itemList = await this.superClient.listItems({ ...listItemOptions, offset: readItemsCount, limit: pageSize });
readItemsCount += itemList.count;
for (const item of itemList.items) {
if (runId) {
while (runStatus && ['READY', 'RUNNING'].includes(runStatus)) {
const datasetIterator = this.iterate({ ...iterateOptions, offset: currentOffset });
for await (const item of datasetIterator) {
currentOffset++;
Copy link
Contributor

@halvko halvko Mar 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this, and postponed adding my review till I had let it settle a bit. I ended up having to do pretty much the same when using the library to do "reasonable segmentation". I think this shows that our abstraction is leaky - whether there will actually be an await before we get to the next element. I think it would be better if we just returned AsyncGenerator<T[], void, void>, only yielding if there actually were any elements. I think iterate should have the same signature, even though it may often be used without a page size such it only actually yields once.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would simplify both this code, and most (if not all) usages

yield item;
}
}

await new Promise((resolve) => setTimeout(resolve, pollIntervalSecs * 1000));
await new Promise((resolve) => setTimeout(resolve, pollIntervalSecs * 1000));

runStatus = (await this.apifyClient.run(runId).get())?.status;
}
} else {
this.customLogger.error(
'Greedy iterate: error getting Dataset or associated run\'s ID; trying to read the remaining items.',
);
}

dataset = await this.get();
if (!dataset || !dataset.actRunId) {
this.customLogger.error('Error getting Dataset while iterating greedily', { id: this.id });
return;
if (runId && !runStatus) {
this.customLogger.error(
'Greedy iterate: error getting associated run\'s status: trying to read the remaining items.',
);
}

while (readItemsCount < dataset.itemCount) {
const itemList = await this.superClient.listItems({ ...listItemOptions, offset: readItemsCount, limit: pageSize });
if (itemList.count === 0) { break; }
readItemsCount += itemList.count;
for (const item of itemList.items) {
yield item;
}
const datasetIterator = this.iterate({ ...iterateOptions, offset: currentOffset });
for await (const item of datasetIterator) {
yield item;
}
}
}
7 changes: 0 additions & 7 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,6 @@ export type IterateOptions = DatasetClientListItemOptions & {
}

export type GreedyIterateOptions = IterateOptions & {
/**
* Download new items when they are more than the specified threshold, or when the Run terminates.\
* If zero, the new items are downloaded as soon as they are detected.
*
* @default 100
*/
itemsThreshold?: number
/**
* Check the run's status regularly at the specified interval, in seconds.
*
Expand Down
55 changes: 45 additions & 10 deletions test/clients/dataset-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe('dataset-client', () => {
expect(listItemsSpy).toHaveBeenCalledWith({});
});

it('iterates the items from the dataset, using pagination', async () => {
it('iterates the items from the dataset, using pagination, passing the given options', async () => {
const listItemsSpy = vi.spyOn(DatasetClient.prototype, 'listItems')
.mockImplementationOnce(async () => ({
count: 2,
Expand All @@ -102,26 +102,61 @@ describe('dataset-client', () => {
limit: 2,
desc: true,
}));
const datasetIterator = datasetClient.iterate({ pageSize: 2 });
const datasetIterator = datasetClient.iterate({ pageSize: 2, fields: ['title'] });
let index = 0;
for await (const item of datasetIterator) {
expect(item).toEqual(testItems[index]);
index++;
}
expect(index).toBe(3);
expect(listItemsSpy).toHaveBeenCalledTimes(3);
expect(listItemsSpy).toHaveBeenNthCalledWith(1, { offset: 0, limit: 2 });
expect(listItemsSpy).toHaveBeenNthCalledWith(2, { offset: 2, limit: 2 });
expect(listItemsSpy).toHaveBeenNthCalledWith(3, { offset: 4, limit: 2 });
expect(listItemsSpy).toHaveBeenNthCalledWith(1, { offset: 0, limit: 2, fields: ['title'] });
expect(listItemsSpy).toHaveBeenNthCalledWith(2, { offset: 2, limit: 2, fields: ['title'] });
expect(listItemsSpy).toHaveBeenNthCalledWith(3, { offset: 4, limit: 2, fields: ['title'] });
});
});

describe('greedyIterate', () => {
it('iterates the items from the dataset as soon as one batch is available, using pagination', () => {
// TODO: test
it('iterates the items using pagination and starting from the desired offset', async () => {
const listItemsSpy = vi.spyOn(DatasetClient.prototype, 'listItems')
.mockImplementationOnce(async () => ({
count: 2,
items: testItems.slice(0, 2),
total: 8,
offset: 5,
limit: 2,
desc: true,
}))
.mockImplementationOnce(async () => ({
count: 1,
items: testItems.slice(2, 3),
total: 8,
offset: 7,
limit: 2,
desc: true,
}))
.mockImplementationOnce(async () => ({
count: 0,
items: [],
total: 8,
offset: 9,
limit: 2,
desc: true,
}));
const datasetIterator = datasetClient.iterate({ pageSize: 2, offset: 5 });
let index = 0;
for await (const item of datasetIterator) {
expect(item).toEqual(testItems[index]);
index++;
}
expect(index).toBe(3);
expect(listItemsSpy).toHaveBeenCalledTimes(3);
expect(listItemsSpy).toHaveBeenNthCalledWith(1, { offset: 5, limit: 2 });
expect(listItemsSpy).toHaveBeenNthCalledWith(2, { offset: 7, limit: 2 });
expect(listItemsSpy).toHaveBeenNthCalledWith(3, { offset: 9, limit: 2 });
});
});

it('iterates the items from the dataset as soon as new items are available, setting pageSize to 0', () => {
describe('greedyIterate', () => {
it('iterates the items from the dataset until the run has finished', () => {
// TODO: test
});
});
Expand Down