Skip to content

Commit a15a9e7

Browse files
committed
Support iterable type for parallel() + series() + waterfall()
1 parent a1a0059 commit a15a9e7

File tree

5 files changed

+163
-9
lines changed

5 files changed

+163
-9
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ $promise->then(function (int $bytes) {
208208

209209
### parallel()
210210

211-
The `parallel(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
211+
The `parallel(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
212212
like this:
213213

214214
```php
@@ -250,7 +250,7 @@ React\Async\parallel([
250250

251251
### series()
252252

253-
The `series(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
253+
The `series(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
254254
like this:
255255

256256
```php
@@ -292,7 +292,7 @@ React\Async\series([
292292

293293
### waterfall()
294294

295-
The `waterfall(array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
295+
The `waterfall(iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
296296
like this:
297297

298298
```php

src/functions.php

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,10 @@ function coroutine(callable $function, ...$args): PromiseInterface
282282
}
283283

284284
/**
285-
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
285+
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
286286
* @return PromiseInterface<array<mixed>,Exception>
287287
*/
288-
function parallel(array $tasks): PromiseInterface
288+
function parallel(iterable $tasks): PromiseInterface
289289
{
290290
$pending = [];
291291
$deferred = new Deferred(function () use (&$pending) {
@@ -299,6 +299,10 @@ function parallel(array $tasks): PromiseInterface
299299
$results = [];
300300
$errored = false;
301301

302+
if (!\is_array($tasks)) {
303+
$tasks = \iterator_to_array($tasks);
304+
}
305+
302306
$numTasks = count($tasks);
303307
if (0 === $numTasks) {
304308
$deferred->resolve($results);
@@ -340,10 +344,10 @@ function parallel(array $tasks): PromiseInterface
340344
}
341345

342346
/**
343-
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
347+
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
344348
* @return PromiseInterface<array<mixed>,Exception>
345349
*/
346-
function series(array $tasks): PromiseInterface
350+
function series(iterable $tasks): PromiseInterface
347351
{
348352
$pending = null;
349353
$deferred = new Deferred(function () use (&$pending) {
@@ -354,6 +358,10 @@ function series(array $tasks): PromiseInterface
354358
});
355359
$results = [];
356360

361+
if (!\is_array($tasks)) {
362+
$tasks = \iterator_to_array($tasks);
363+
}
364+
357365
/** @var callable():void $next */
358366
$taskCallback = function ($result) use (&$results, &$next) {
359367
$results[] = $result;
@@ -380,10 +388,10 @@ function series(array $tasks): PromiseInterface
380388
}
381389

382390
/**
383-
* @param array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
391+
* @param iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
384392
* @return PromiseInterface<mixed,Exception>
385393
*/
386-
function waterfall(array $tasks): PromiseInterface
394+
function waterfall(iterable $tasks): PromiseInterface
387395
{
388396
$pending = null;
389397
$deferred = new Deferred(function () use (&$pending) {
@@ -393,6 +401,10 @@ function waterfall(array $tasks): PromiseInterface
393401
$pending = null;
394402
});
395403

404+
if (!\is_array($tasks)) {
405+
$tasks = \iterator_to_array($tasks);
406+
}
407+
396408
/** @var callable $next */
397409
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
398410
if (0 === count($tasks)) {

tests/ParallelTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ public function testParallelWithoutTasks()
1717
$promise->then($this->expectCallableOnceWith(array()));
1818
}
1919

20+
public function testParallelWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
21+
{
22+
$tasks = (function () {
23+
if (false) {
24+
yield;
25+
}
26+
})();
27+
28+
$promise = React\Async\parallel($tasks);
29+
30+
$promise->then($this->expectCallableOnceWith([]));
31+
}
32+
2033
public function testParallelWithTasks()
2134
{
2235
$tasks = array(
@@ -49,6 +62,38 @@ function () {
4962
$timer->assertInRange(0.1, 0.2);
5063
}
5164

65+
public function testParallelWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
66+
{
67+
$tasks = (function () {
68+
yield function () {
69+
return new Promise(function ($resolve) {
70+
Loop::addTimer(0.1, function () use ($resolve) {
71+
$resolve('foo');
72+
});
73+
});
74+
};
75+
yield function () {
76+
return new Promise(function ($resolve) {
77+
Loop::addTimer(0.11, function () use ($resolve) {
78+
$resolve('bar');
79+
});
80+
});
81+
};
82+
})();
83+
84+
$promise = React\Async\parallel($tasks);
85+
86+
$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));
87+
88+
$timer = new Timer($this);
89+
$timer->start();
90+
91+
Loop::run();
92+
93+
$timer->stop();
94+
$timer->assertInRange(0.1, 0.2);
95+
}
96+
5297
public function testParallelWithErrorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
5398
{
5499
$called = 0;

tests/SeriesTest.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ public function testSeriesWithoutTasks()
1717
$promise->then($this->expectCallableOnceWith(array()));
1818
}
1919

20+
public function testSeriesWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
21+
{
22+
$tasks = (function () {
23+
if (false) {
24+
yield;
25+
}
26+
})();
27+
28+
$promise = React\Async\series($tasks);
29+
30+
$promise->then($this->expectCallableOnceWith([]));
31+
}
32+
2033
public function testSeriesWithTasks()
2134
{
2235
$tasks = array(
@@ -49,6 +62,38 @@ function () {
4962
$timer->assertInRange(0.10, 0.20);
5063
}
5164

65+
public function testSeriesWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
66+
{
67+
$tasks = (function () {
68+
yield function () {
69+
return new Promise(function ($resolve) {
70+
Loop::addTimer(0.051, function () use ($resolve) {
71+
$resolve('foo');
72+
});
73+
});
74+
};
75+
yield function () {
76+
return new Promise(function ($resolve) {
77+
Loop::addTimer(0.051, function () use ($resolve) {
78+
$resolve('bar');
79+
});
80+
});
81+
};
82+
})();
83+
84+
$promise = React\Async\series($tasks);
85+
86+
$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));
87+
88+
$timer = new Timer($this);
89+
$timer->start();
90+
91+
Loop::run();
92+
93+
$timer->stop();
94+
$timer->assertInRange(0.10, 0.20);
95+
}
96+
5297
public function testSeriesWithError()
5398
{
5499
$called = 0;

tests/WaterfallTest.php

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@ public function testWaterfallWithoutTasks()
1717
$promise->then($this->expectCallableOnceWith(null));
1818
}
1919

20+
public function testWaterfallWithoutTasksFromEmptyGeneratorResolvesWithNull()
21+
{
22+
$tasks = (function () {
23+
if (false) {
24+
yield;
25+
}
26+
})();
27+
28+
$promise = React\Async\waterfall($tasks);
29+
30+
$promise->then($this->expectCallableOnceWith(null));
31+
}
32+
2033
public function testWaterfallWithTasks()
2134
{
2235
$tasks = array(
@@ -56,6 +69,45 @@ function ($bar) {
5669
$timer->assertInRange(0.15, 0.30);
5770
}
5871

72+
public function testWaterfallWithTasksFromGeneratorResolvesWithFinalFulfillmentValue()
73+
{
74+
$tasks = (function () {
75+
yield function ($foo = 'foo') {
76+
return new Promise(function ($resolve) use ($foo) {
77+
Loop::addTimer(0.05, function () use ($resolve, $foo) {
78+
$resolve($foo);
79+
});
80+
});
81+
};
82+
yield function ($foo) {
83+
return new Promise(function ($resolve) use ($foo) {
84+
Loop::addTimer(0.05, function () use ($resolve, $foo) {
85+
$resolve($foo . 'bar');
86+
});
87+
});
88+
};
89+
yield function ($bar) {
90+
return new Promise(function ($resolve) use ($bar) {
91+
Loop::addTimer(0.05, function () use ($resolve, $bar) {
92+
$resolve($bar . 'baz');
93+
});
94+
});
95+
};
96+
})();
97+
98+
$promise = React\Async\waterfall($tasks);
99+
100+
$promise->then($this->expectCallableOnceWith('foobarbaz'));
101+
102+
$timer = new Timer($this);
103+
$timer->start();
104+
105+
Loop::run();
106+
107+
$timer->stop();
108+
$timer->assertInRange(0.15, 0.30);
109+
}
110+
59111
public function testWaterfallWithError()
60112
{
61113
$called = 0;

0 commit comments

Comments
 (0)