Skip to content

Commit 9660313

Browse files
committed
Take advantage of iterators instead of converting to array first
1 parent a15a9e7 commit 9660313

File tree

4 files changed

+141
-26
lines changed

4 files changed

+141
-26
lines changed

src/functions.php

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -297,19 +297,10 @@ function parallel(iterable $tasks): PromiseInterface
297297
$pending = [];
298298
});
299299
$results = [];
300-
$errored = false;
300+
$continue = true;
301301

302-
if (!\is_array($tasks)) {
303-
$tasks = \iterator_to_array($tasks);
304-
}
305-
306-
$numTasks = count($tasks);
307-
if (0 === $numTasks) {
308-
$deferred->resolve($results);
309-
}
310-
311-
$taskErrback = function ($error) use (&$pending, $deferred, &$errored) {
312-
$errored = true;
302+
$taskErrback = function ($error) use (&$pending, $deferred, &$continue) {
303+
$continue = false;
313304
$deferred->reject($error);
314305

315306
foreach ($pending as $promise) {
@@ -321,25 +312,31 @@ function parallel(iterable $tasks): PromiseInterface
321312
};
322313

323314
foreach ($tasks as $i => $task) {
324-
$taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) {
315+
$taskCallback = function ($result) use (&$results, &$pending, &$continue, $i, $deferred) {
325316
$results[$i] = $result;
317+
unset($pending[$i]);
326318

327-
if (count($results) === $numTasks) {
319+
if (!$pending && !$continue) {
328320
$deferred->resolve($results);
329321
}
330322
};
331323

332-
$promise = call_user_func($task);
324+
$promise = \call_user_func($task);
333325
assert($promise instanceof PromiseInterface);
334326
$pending[$i] = $promise;
335327

336328
$promise->then($taskCallback, $taskErrback);
337329

338-
if ($errored) {
330+
if (!$continue) {
339331
break;
340332
}
341333
}
342334

335+
$continue = false;
336+
if (!$pending) {
337+
$deferred->resolve($results);
338+
}
339+
343340
return $deferred->promise();
344341
}
345342

@@ -358,8 +355,9 @@ function series(iterable $tasks): PromiseInterface
358355
});
359356
$results = [];
360357

361-
if (!\is_array($tasks)) {
362-
$tasks = \iterator_to_array($tasks);
358+
if ($tasks instanceof \IteratorAggregate) {
359+
$tasks = $tasks->getIterator();
360+
assert($tasks instanceof \Iterator);
363361
}
364362

365363
/** @var callable():void $next */
@@ -369,13 +367,19 @@ function series(iterable $tasks): PromiseInterface
369367
};
370368

371369
$next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) {
372-
if (0 === count($tasks)) {
370+
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
373371
$deferred->resolve($results);
374372
return;
375373
}
376374

377-
$task = array_shift($tasks);
378-
$promise = call_user_func($task);
375+
if ($tasks instanceof \Iterator) {
376+
$task = $tasks->current();
377+
$tasks->next();
378+
} else {
379+
$task = \array_shift($tasks);
380+
}
381+
382+
$promise = \call_user_func($task);
379383
assert($promise instanceof PromiseInterface);
380384
$pending = $promise;
381385

@@ -401,19 +405,26 @@ function waterfall(iterable $tasks): PromiseInterface
401405
$pending = null;
402406
});
403407

404-
if (!\is_array($tasks)) {
405-
$tasks = \iterator_to_array($tasks);
408+
if ($tasks instanceof \IteratorAggregate) {
409+
$tasks = $tasks->getIterator();
410+
assert($tasks instanceof \Iterator);
406411
}
407412

408413
/** @var callable $next */
409414
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
410-
if (0 === count($tasks)) {
415+
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
411416
$deferred->resolve($value);
412417
return;
413418
}
414419

415-
$task = array_shift($tasks);
416-
$promise = call_user_func_array($task, func_get_args());
420+
if ($tasks instanceof \Iterator) {
421+
$task = $tasks->current();
422+
$tasks->next();
423+
} else {
424+
$task = \array_shift($tasks);
425+
}
426+
427+
$promise = \call_user_func_array($task, func_get_args());
417428
assert($promise instanceof PromiseInterface);
418429
$pending = $promise;
419430

tests/ParallelTest.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class ParallelTest extends TestCase
1011
{
@@ -126,6 +127,25 @@ function () use (&$called) {
126127
$this->assertSame(2, $called);
127128
}
128129

130+
public function testParallelWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
131+
{
132+
$called = 0;
133+
134+
$tasks = (function () use (&$called) {
135+
while (true) {
136+
yield function () use (&$called) {
137+
return reject(new \RuntimeException('Rejected ' . ++$called));
138+
};
139+
}
140+
})();
141+
142+
$promise = React\Async\parallel($tasks);
143+
144+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
145+
146+
$this->assertSame(1, $called);
147+
}
148+
129149
public function testParallelWithErrorWillCancelPendingPromises()
130150
{
131151
$cancelled = 0;

tests/SeriesTest.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class SeriesTest extends TestCase
1011
{
@@ -125,6 +126,47 @@ function () use (&$called) {
125126
$this->assertSame(1, $called);
126127
}
127128

129+
public function testSeriesWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
130+
{
131+
$called = 0;
132+
133+
$tasks = (function () use (&$called) {
134+
while (true) {
135+
yield function () use (&$called) {
136+
return reject(new \RuntimeException('Rejected ' . ++$called));
137+
};
138+
}
139+
})();
140+
141+
$promise = React\Async\series($tasks);
142+
143+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
144+
145+
$this->assertSame(1, $called);
146+
}
147+
148+
public function testSeriesWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
149+
{
150+
$tasks = new class() implements \IteratorAggregate {
151+
public $called = 0;
152+
153+
public function getIterator(): \Iterator
154+
{
155+
while (true) {
156+
yield function () {
157+
return reject(new \RuntimeException('Rejected ' . ++$this->called));
158+
};
159+
}
160+
}
161+
};
162+
163+
$promise = React\Async\series($tasks);
164+
165+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
166+
167+
$this->assertSame(1, $tasks->called);
168+
}
169+
128170
public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
129171
{
130172
$cancelled = 0;

tests/WaterfallTest.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React;
66
use React\EventLoop\Loop;
77
use React\Promise\Promise;
8+
use function React\Promise\reject;
89

910
class WaterfallTest extends TestCase
1011
{
@@ -139,6 +140,47 @@ function () use (&$called) {
139140
$this->assertSame(1, $called);
140141
}
141142

143+
public function testWaterfallWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
144+
{
145+
$called = 0;
146+
147+
$tasks = (function () use (&$called) {
148+
while (true) {
149+
yield function () use (&$called) {
150+
return reject(new \RuntimeException('Rejected ' . ++$called));
151+
};
152+
}
153+
})();
154+
155+
$promise = React\Async\waterfall($tasks);
156+
157+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
158+
159+
$this->assertSame(1, $called);
160+
}
161+
162+
public function testWaterfallWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
163+
{
164+
$tasks = new class() implements \IteratorAggregate {
165+
public $called = 0;
166+
167+
public function getIterator(): \Iterator
168+
{
169+
while (true) {
170+
yield function () {
171+
return reject(new \RuntimeException('Rejected ' . ++$this->called));
172+
};
173+
}
174+
}
175+
};
176+
177+
$promise = React\Async\waterfall($tasks);
178+
179+
$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));
180+
181+
$this->assertSame(1, $tasks->called);
182+
}
183+
142184
public function testWaterfallWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
143185
{
144186
$cancelled = 0;

0 commit comments

Comments
 (0)