diff --git a/README.md b/README.md index a4f74ef..add65cf 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ -Buffered processing of async iterables / generators in parallel to achieve comparable performance to `Promise.all()` +Buffered parallel processing of async iterables / generators. [![npm version](https://img.shields.io/npm/v/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable) [![npm downloads](https://img.shields.io/npm/dm/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable) @@ -16,7 +16,6 @@ Buffered processing of async iterables / generators in parallel to achieve compa [![js-semistandard-style](https://img.shields.io/badge/code%20style-semistandard-brightgreen.svg)](https://github.com/voxpelli/eslint-config) [![Follow @voxpelli@mastodon.social](https://img.shields.io/mastodon/follow/109247025527949675?domain=https%3A%2F%2Fmastodon.social&style=social)](https://mastodon.social/@voxpelli) -**WORK IN PROGRESS – early prerelease** ## Usage @@ -80,6 +79,7 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie #### Options * `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer. 
+* `ordered` – _optional_ – defaults to `false`. When `true`, results are yielded in the same order as the items in `input`, rather than in the order in which they finish processing. ## Similar modules diff --git a/index.js b/index.js index 37658a4..a6b1b83 100644 --- a/index.js +++ b/index.js @@ -5,25 +5,25 @@ // TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose // TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values // TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js -// TODO: Have option to persist order? To not use Promise.race()? // TODO: Make a proper merge for async iterables by accepting multiple input iterables, see: https://twitter.com/matteocollina/status/1392056092482576385 import { findLeastTargeted } from './lib/find-least-targeted.js'; -import { makeIterableAsync } from './lib/misc.js'; -import { isAsyncIterable, isIterable, isPartOfSet } from './lib/type-checks.js'; +import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js'; +import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js'; /** * @template T * @template R * @param {AsyncIterable | Iterable | T[]} input * @param {(item: T) => (Promise|AsyncIterable)} callback - * @param {{ bufferSize?: number|undefined }} [options] + * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options] * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]> }} */ export function bufferedAsyncMap (input, callback, options) { /** @typedef {Promise> & { bufferPromise: 
BufferPromise, fromSubIterator?: boolean, isSubIterator?: boolean, err?: unknown }>} BufferPromise */ const { bufferSize = 6, + ordered = false, } = options || {}; /** @type {AsyncIterable} */ @@ -39,11 +39,11 @@ export function bufferedAsyncMap (input, callback, options) { /** @type {AsyncIterator} */ const asyncIterator = asyncIterable[Symbol.asyncIterator](); - /** @type {Set>} */ - const subIterators = new Set(); + /** @type {AsyncIterator[]} */ + const subIterators = []; - /** @type {Set} */ - const bufferedPromises = new Set(); + /** @type {BufferPromise[]} */ + const bufferedPromises = []; /** @type {WeakMap|AsyncIterator>} */ const promisesToSourceIteratorMap = new WeakMap(); @@ -76,8 +76,8 @@ export function bufferedAsyncMap (input, callback, options) { ); // TODO: Could we use an AbortController to improve this? See eg. https://github.com/mcollina/hwp/pull/10 - bufferedPromises.clear(); - subIterators.clear(); + bufferedPromises.splice(0, bufferedPromises.length); + subIterators.splice(0, subIterators.length); if (throwAnyError && hasError) { throw hasError; @@ -90,14 +90,20 @@ export function bufferedAsyncMap (input, callback, options) { const fillQueue = () => { if (hasError || isDone) return; - // Check which iterator that has the least amount of queued promises right now - const iterator = findLeastTargeted( - mainReturnedDone ? subIterators : [...subIterators, asyncIterator], - bufferedPromises, - promisesToSourceIteratorMap - ); + /** @type {AsyncIterator|undefined} */ + let currentSubIterator; - const currentSubIterator = isPartOfSet(iterator, subIterators) ? iterator : undefined; + if (ordered) { + currentSubIterator = subIterators[0]; + } else { + const iterator = findLeastTargeted( + mainReturnedDone ? subIterators : [...subIterators, asyncIterator], + bufferedPromises, + promisesToSourceIteratorMap + ); + + currentSubIterator = isPartOfArray(iterator, subIterators) ? 
iterator : undefined; + } /** @type {BufferPromise} */ const bufferPromise = currentSubIterator @@ -110,7 +116,7 @@ export function bufferedAsyncMap (input, callback, options) { throw new TypeError('Expected an object value'); } if ('err' in result || result.done) { - subIterators.delete(currentSubIterator); + arrayDeleteInPlace(subIterators, currentSubIterator); } /** @type {Awaited} */ @@ -174,29 +180,43 @@ export function bufferedAsyncMap (input, callback, options) { }); promisesToSourceIteratorMap.set(bufferPromise, currentSubIterator || asyncIterator); - bufferedPromises.add(bufferPromise); - if (bufferedPromises.size < bufferSize) { + if (ordered && currentSubIterator) { + let i = 0; + + while (promisesToSourceIteratorMap.get(/** @type {BufferPromise} */ (bufferedPromises[i])) === currentSubIterator) { + i += 1; + } + + bufferedPromises.splice(i, 0, bufferPromise); + } else { + bufferedPromises.push(bufferPromise); + } + + if (bufferedPromises.length < bufferSize) { fillQueue(); } }; /** @type {AsyncIterator["next"]} */ const nextValue = async () => { - if (bufferedPromises.size === 0) return markAsEnded(true); + const nextBufferedPromise = bufferedPromises[0]; + + if (!nextBufferedPromise) return markAsEnded(true); if (isDone) return { done: true, value: undefined }; + /** @type {Awaited} */ + const resolvedPromise = await (ordered ? nextBufferedPromise : Promise.race(bufferedPromises)); + arrayDeleteInPlace(bufferedPromises, resolvedPromise.bufferPromise); + // Wait for some of the current promises to be finished const { - bufferPromise, done, err, fromSubIterator, isSubIterator, value, - } = await Promise.race(bufferedPromises); - - bufferedPromises.delete(bufferPromise); + } = resolvedPromise; // We are mandated by the spec to always do this return if the iterator is done if (isDone) { @@ -206,16 +226,16 @@ export function bufferedAsyncMap (input, callback, options) { hasError = err instanceof Error ? 
err : new Error('Unknown error'); } - if (fromSubIterator || subIterators.size !== 0) { + if (fromSubIterator || subIterators.length > 0) { fillQueue(); } - return bufferedPromises.size === 0 + return bufferedPromises.length === 0 ? markAsEnded(true) : nextValue(); } else if (isSubIterator && isAsyncIterable(value)) { // TODO: Handle possible error here? Or too obscure? - subIterators.add(value[Symbol.asyncIterator]()); + subIterators.unshift(value[Symbol.asyncIterator]()); fillQueue(); return nextValue(); } else { diff --git a/lib/find-least-targeted.js b/lib/find-least-targeted.js index 46af994..03eca81 100644 --- a/lib/find-least-targeted.js +++ b/lib/find-least-targeted.js @@ -1,7 +1,7 @@ /** * @template Target * @template {object} Item - * @param {Set} items + * @param {Iterable | Item[]} items * @param {WeakMap} itemTargets * @returns {Map} */ @@ -25,7 +25,7 @@ function countTargets (items, itemTargets) { * @template Target * @template {object} Item * @param {Iterable | Target[]} targets - * @param {Set} items + * @param {Iterable | Item[]} items * @param {WeakMap} itemTargets * @returns {Target|undefined} */ diff --git a/lib/misc.js b/lib/misc.js index d2af303..0f642ca 100644 --- a/lib/misc.js +++ b/lib/misc.js @@ -8,3 +8,17 @@ export async function * makeIterableAsync (input) { yield value; } } + +/** + * Similar to the .delete() on a set + * + * @template T + * @param {T[]} list + * @param {T} value + */ +export function arrayDeleteInPlace (list, value) { + const index = list.indexOf(value); + if (index !== -1) { + list.splice(index, 1); + } +} diff --git a/lib/type-checks.js b/lib/type-checks.js index 24a2bab..70909c3 100644 --- a/lib/type-checks.js +++ b/lib/type-checks.js @@ -11,9 +11,9 @@ export const isIterable = (value) => Boolean(value && typeof value === 'object' export const isAsyncIterable = (value) => Boolean(value && typeof value === 'object' && Symbol.asyncIterator in value); /** - * @template SetValue + * @template Values * @param {unknown} 
value - * @param {Set} set - * @returns {value is SetValue} + * @param {Values[]} list + * @returns {value is Values} */ -export const isPartOfSet = (value, set) => set.has(/** @type {SetValue} */ (value)); +export const isPartOfArray = (value, list) => list.includes(/** @type {Values} */ (value)); diff --git a/test/utils.js b/test/utils.js index cff5937..bc4b851 100644 --- a/test/utils.js +++ b/test/utils.js @@ -42,3 +42,17 @@ export async function * yieldValuesOverTimeWithPrefix (count, wait, prefix) { await promisableTimeout(waitCallback(i)); } } + +/** + * @param {number} count + * @param {number|((i: number) => number)} wait + * @param {(i: number) => AsyncGenerator} nested + * @returns {AsyncIterable} + */ +export async function * nestedYieldValuesOverTime (count, wait, nested) { + const waitCallback = typeof wait === 'number' ? () => wait : wait; + for (let i = 0; i < count; i++) { + yield * nested(i); + await promisableTimeout(waitCallback(i)); + } +} diff --git a/test/values.spec.js b/test/values.spec.js index 4c49ccc..91fcec1 100644 --- a/test/values.spec.js +++ b/test/values.spec.js @@ -10,6 +10,7 @@ import { import { isAsyncGenerator, + nestedYieldValuesOverTime, promisableTimeout, yieldValuesOverTime, yieldValuesOverTimeWithPrefix, @@ -44,6 +45,130 @@ function stubAsyncIterator () { }; } +/** + * @param {number} item + * @returns {AsyncGenerator} + */ +async function * nestedBufferedAsyncMapCallback (item) { + yield * nestedYieldValuesOverTime(3, (i) => i % 2 === 1 ? 2000 : 100, async function * (i) { + yield * yieldValuesOverTimeWithPrefix(3, (i) => i % 2 === 1 ? 2000 : 100, 'prefix-' + item + '-' + i + '-'); + }); + yield * nestedYieldValuesOverTime(3, (i) => i % 2 === 1 ? 2000 : 100, async function * (i) { + yield * yieldValuesOverTimeWithPrefix(3, (i) => i % 2 === 1 ? 
2000 : 100, '2-prefix-' + item + '-' + i + '-'); + }); +} + +const nestedBufferedAsyncMapOrderedResult = () => /** @type {const} */ ([ + 'prefix-0-0-0', + 'prefix-0-0-1', + 'prefix-0-0-2', + 'prefix-0-1-0', + 'prefix-0-1-1', + 'prefix-0-1-2', + 'prefix-0-2-0', + 'prefix-0-2-1', + 'prefix-0-2-2', + '2-prefix-0-0-0', + '2-prefix-0-0-1', + '2-prefix-0-0-2', + '2-prefix-0-1-0', + '2-prefix-0-1-1', + '2-prefix-0-1-2', + '2-prefix-0-2-0', + '2-prefix-0-2-1', + '2-prefix-0-2-2', + 'prefix-1-0-0', + 'prefix-1-0-1', + 'prefix-1-0-2', + 'prefix-1-1-0', + 'prefix-1-1-1', + 'prefix-1-1-2', + 'prefix-1-2-0', + 'prefix-1-2-1', + 'prefix-1-2-2', + '2-prefix-1-0-0', + '2-prefix-1-0-1', + '2-prefix-1-0-2', + '2-prefix-1-1-0', + '2-prefix-1-1-1', + '2-prefix-1-1-2', + '2-prefix-1-2-0', + '2-prefix-1-2-1', + '2-prefix-1-2-2', + 'prefix-2-0-0', + 'prefix-2-0-1', + 'prefix-2-0-2', + 'prefix-2-1-0', + 'prefix-2-1-1', + 'prefix-2-1-2', + 'prefix-2-2-0', + 'prefix-2-2-1', + 'prefix-2-2-2', + '2-prefix-2-0-0', + '2-prefix-2-0-1', + '2-prefix-2-0-2', + '2-prefix-2-1-0', + '2-prefix-2-1-1', + '2-prefix-2-1-2', + '2-prefix-2-2-0', + '2-prefix-2-2-1', + '2-prefix-2-2-2', + 'prefix-3-0-0', + 'prefix-3-0-1', + 'prefix-3-0-2', + 'prefix-3-1-0', + 'prefix-3-1-1', + 'prefix-3-1-2', + 'prefix-3-2-0', + 'prefix-3-2-1', + 'prefix-3-2-2', + '2-prefix-3-0-0', + '2-prefix-3-0-1', + '2-prefix-3-0-2', + '2-prefix-3-1-0', + '2-prefix-3-1-1', + '2-prefix-3-1-2', + '2-prefix-3-2-0', + '2-prefix-3-2-1', + '2-prefix-3-2-2', + 'prefix-4-0-0', + 'prefix-4-0-1', + 'prefix-4-0-2', + 'prefix-4-1-0', + 'prefix-4-1-1', + 'prefix-4-1-2', + 'prefix-4-2-0', + 'prefix-4-2-1', + 'prefix-4-2-2', + '2-prefix-4-0-0', + '2-prefix-4-0-1', + '2-prefix-4-0-2', + '2-prefix-4-1-0', + '2-prefix-4-1-1', + '2-prefix-4-1-2', + '2-prefix-4-2-0', + '2-prefix-4-2-1', + '2-prefix-4-2-2', + 'prefix-5-0-0', + 'prefix-5-0-1', + 'prefix-5-0-2', + 'prefix-5-1-0', + 'prefix-5-1-1', + 'prefix-5-1-2', + 'prefix-5-2-0', + 'prefix-5-2-1', + 
'prefix-5-2-2', + '2-prefix-5-0-0', + '2-prefix-5-0-1', + '2-prefix-5-0-2', + '2-prefix-5-1-0', + '2-prefix-5-1-1', + '2-prefix-5-1-2', + '2-prefix-5-2-0', + '2-prefix-5-2-1', + '2-prefix-5-2-2', +]); + describe('bufferedAsyncMap() values', () => { const count = 6; @@ -69,7 +194,7 @@ describe('bufferedAsyncMap() values', () => { }); describe('main', () => { - it('should return all values from the original AsyncIterable when looped over ', async () => { + it('should return all values from the original AsyncIterable when looped over', async () => { // Create the promise first, then have it be fully executed using clock.runAllAsync() const promisedResult = (async () => { /** @type {number[]} */ @@ -623,4 +748,146 @@ describe('bufferedAsyncMap() values', () => { await clock.runAllAsync(); nextSpy.should.have.callCount(11); }); + + describe('order', () => { + it('should be out of order as standard', async () => { + const asyncIterable = yieldValuesOverTime(10, i => i % 3 === 0 ? 2000 : 1); + + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + /** @type {number[]} */ + const rawResult = []; + + for await (const value of bufferedAsyncMap(asyncIterable, async (item) => { + await promisableTimeout(item % 2 === 0 ? 2000 : 1); + return item; + }, { bufferSize: 3 })) { + rawResult.push(value); + } + + /** @type {[number[], number]} */ + const result = [rawResult, Date.now()]; + + return result; + })(); + + await clock.runAllAsync(); + + const [result, duration] = await promisedResult; + + result.should.deep.equal([0, 1, 3, 2, 5, 4, 6, 7, 9, 8]); + duration.should.equal(8006); + }); + + it('should ensure in order when requested', async () => { + const asyncIterable = yieldValuesOverTime(10, i => i % 3 === 0 ? 
2000 : 1); + + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + /** @type {number[]} */ + const rawResult = []; + + for await (const value of bufferedAsyncMap(asyncIterable, async (item) => { + await promisableTimeout(item % 2 === 0 ? 2000 : 1); + return item; + }, { bufferSize: 3, ordered: true })) { + rawResult.push(value); + } + + /** @type {[number[], number]} */ + const result = [rawResult, Date.now()]; + + return result; + })(); + + await clock.runAllAsync(); + + const [result, duration] = await promisedResult; + + result.should.deep.equal([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + duration.should.equal(10004); + }); + + it('should handle nested async generator values out of order when looped over', async () => { + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + /** @type {string[]} */ + const rawResult = []; + + for await (const value of bufferedAsyncMap(baseAsyncIterable, nestedBufferedAsyncMapCallback)) { + rawResult.push(value); + } + + /** @type {[string[], number]} */ + const result = [rawResult, Date.now()]; + + return result; + })(); + + await clock.runAllAsync(); + + const [result, duration] = await promisedResult; + + result.should.be.an('array') + .that.has.members(nestedBufferedAsyncMapOrderedResult()) + .but.does.not.deep.equal(nestedBufferedAsyncMapOrderedResult()); + + duration.should.equal(21900); + }); + + it('should, when requested, handle nested async generator values in order when looped over', async () => { + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + /** @type {string[]} */ + const rawResult = []; + + for await (const value of bufferedAsyncMap(baseAsyncIterable, nestedBufferedAsyncMapCallback, { ordered: true })) { + rawResult.push(value); + } + + /** @type {[string[], number]} */ + const result = 
[rawResult, Date.now()]; + + return result; + })(); + + await clock.runAllAsync(); + + const [result, duration] = await promisedResult; + + result.should.be.an('array').that.deep.equals(nestedBufferedAsyncMapOrderedResult()); + + duration.should.equal(105600); + }); + + it('should be faster than ordered non-buffered iteration', async () => { + // eslint-disable-next-line unicorn/consistent-function-scoping + async function * nestedBaseSyncIterable () { + yield * nestedYieldValuesOverTime(count, (i) => i % 2 === 1 ? 2000 : 100, nestedBufferedAsyncMapCallback); + } + + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + /** @type {string[]} */ + const rawResult = []; + + for await (const value of nestedBaseSyncIterable()) { + rawResult.push(value); + } + + /** @type {[string[], number]} */ + const result = [rawResult, Date.now()]; + + return result; + })(); + + await clock.runAllAsync(); + + const [result, duration] = await promisedResult; + + result.should.be.an('array').that.deep.equals(nestedBufferedAsyncMapOrderedResult()); + + duration.should.equal(111900); + }); + }); });