Skip to content

Commit a348f4a

Browse files
committed
Changed AsyncTransformer to work with collections instead of records.
Added collection types for async collections.
1 parent bb363bb commit a348f4a

13 files changed

+248
-58
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Collection;
5+
6+
use Amp\Iterator;
7+
8+
class AsyncFilteredRecords extends AsyncRecordCollection
9+
{
10+
private $filter;
11+
12+
public function __construct(Iterator $records, AsyncRecordCollection $previousCollection, callable $filter)
13+
{
14+
parent::__construct($records, $previousCollection);
15+
16+
$this->filter = $filter;
17+
}
18+
19+
public function getFilter(): callable
20+
{
21+
return $this->filter;
22+
}
23+
}

src/Collection/AsyncPorterRecords.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Collection;
5+
6+
use Amp\Iterator;
7+
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
8+
9+
class AsyncPorterRecords extends AsyncRecordCollection
10+
{
11+
private $specification;
12+
13+
public function __construct(Iterator $records, AsyncImportSpecification $specification)
14+
{
15+
parent::__construct($records, $records);
16+
17+
$this->specification = $specification;
18+
}
19+
20+
public function getSpecification(): AsyncImportSpecification
21+
{
22+
return $this->specification;
23+
}
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Collection;
5+
6+
use Amp\Iterator;
7+
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;
8+
9+
class AsyncProviderRecords extends AsyncRecordCollection
10+
{
11+
private $resource;
12+
13+
public function __construct(Iterator $records, AsyncResource $resource)
14+
{
15+
parent::__construct($records);
16+
17+
$this->resource = $resource;
18+
}
19+
20+
public function getResource(): AsyncResource
21+
{
22+
return $this->resource;
23+
}
24+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Collection;
5+
6+
use Amp\Iterator;
7+
use Amp\Promise;
8+
9+
abstract class AsyncRecordCollection implements Iterator
10+
{
11+
private $records;
12+
13+
private $previousCollection;
14+
15+
public function __construct(Iterator $records, self $previousCollection = null)
16+
{
17+
$this->records = $records;
18+
$this->previousCollection = $previousCollection;
19+
}
20+
21+
public function advance(): Promise
22+
{
23+
return $this->records->advance();
24+
}
25+
26+
public function getCurrent(): array
27+
{
28+
return $this->records->getCurrent();
29+
}
30+
31+
public function getPreviousCollection(): ?AsyncRecordCollection
32+
{
33+
return $this->previousCollection;
34+
}
35+
36+
public function findFirstCollection(): ?AsyncRecordCollection
37+
{
38+
do {
39+
$previous = $nextPrevious ?? $this->getPreviousCollection();
40+
} while ($previous && $nextPrevious = $previous->getPreviousCollection());
41+
42+
return $previous ?: $this;
43+
}
44+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Collection;
5+
6+
use ScriptFUSION\Porter\Specification\AsyncImportSpecification;
7+
8+
class CountableAsyncPorterRecords extends AsyncPorterRecords implements \Countable
9+
{
10+
use CountableRecordsTrait;
11+
12+
public function __construct(AsyncRecordCollection $records, int $count, AsyncImportSpecification $specification)
13+
{
14+
parent::__construct($records, $specification);
15+
16+
$this->setCount($count);
17+
}
18+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace ScriptFUSION\Porter\Collection;
5+
6+
use Amp\Iterator;
7+
use ScriptFUSION\Porter\Provider\Resource\AsyncResource;
8+
9+
class CountableAsyncProviderRecords extends AsyncProviderRecords implements \Countable
10+
{
11+
use CountableRecordsTrait;
12+
13+
public function __construct(Iterator $records, int $count, AsyncResource $resource)
14+
{
15+
parent::__construct($records, $resource);
16+
17+
$this->setCount($count);
18+
}
19+
}

src/Collection/CountablePorterRecords.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,6 @@ class CountablePorterRecords extends PorterRecords implements \Countable
99
{
1010
use CountableRecordsTrait;
1111

12-
/**
13-
* @param RecordCollection $records
14-
* @param int $count
15-
* @param ImportSpecification $specification
16-
*/
1712
public function __construct(RecordCollection $records, int $count, ImportSpecification $specification)
1813
{
1914
parent::__construct($records, $specification);

src/Collection/RecordCollection.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public function __construct(\Iterator $records, self $previousCollection = null)
1818
$this->previousCollection = $previousCollection;
1919
}
2020

21+
// TODO: Consider throwing our own exception type for clarity, instead of relying on PHP's TypeError.
2122
public function current(): array
2223
{
2324
return $this->records->current();

src/Porter.php

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44
namespace ScriptFUSION\Porter;
55

66
use Amp\Iterator;
7-
use Amp\Producer;
87
use Amp\Promise;
98
use Psr\Container\ContainerInterface;
9+
use ScriptFUSION\Porter\Collection\AsyncPorterRecords;
10+
use ScriptFUSION\Porter\Collection\AsyncProviderRecords;
11+
use ScriptFUSION\Porter\Collection\AsyncRecordCollection;
12+
use ScriptFUSION\Porter\Collection\CountableAsyncPorterRecords;
1013
use ScriptFUSION\Porter\Collection\CountablePorterRecords;
1114
use ScriptFUSION\Porter\Collection\CountableProviderRecords;
1215
use ScriptFUSION\Porter\Collection\PorterRecords;
@@ -125,13 +128,19 @@ private function fetch(ImportSpecification $specification): \Iterator
125128
return $resource->fetch(ImportConnectorFactory::create($connector, $specification));
126129
}
127130

128-
public function importAsync(AsyncImportSpecification $specification): Iterator
131+
public function importAsync(AsyncImportSpecification $specification): AsyncRecordCollection
129132
{
130133
$specification = clone $specification;
131134

132135
$records = $this->fetchAsync($specification);
133136

134-
return $this->transformAsync($records, $specification->getTransformers(), $specification->getContext());
137+
if (!$records instanceof AsyncProviderRecords) {
138+
$records = new AsyncProviderRecords($records, $specification->getAsyncResource());
139+
}
140+
141+
$records = $this->transformAsync($records, $specification->getTransformers(), $specification->getContext());
142+
143+
return $this->createAsyncPorterRecords($records, $specification);
135144
}
136145

137146
public function importOneAsync(AsyncImportSpecification $specification): Promise
@@ -201,38 +210,25 @@ private function transformRecords(RecordCollection $records, array $transformers
201210
return $records;
202211
}
203212

204-
private function transformAsync(Iterator $records, array $transformers, $context): Producer
205-
{
206-
return new Producer(function (\Closure $emit) use ($records, $transformers, $context) {
207-
while (yield $records->advance()) {
208-
$record = $records->getCurrent();
209-
210-
foreach ($transformers as $transformer) {
211-
if (!$transformer instanceof AsyncTransformer) {
212-
// TODO: Proper exception or separate async stack.
213-
throw new \RuntimeException('Cannot use sync transformer.');
214-
}
215-
if ($transformer instanceof PorterAware) {
216-
$transformer->setPorter($this);
217-
}
218-
219-
$record = yield $transformer->transformAsync($record, $context);
220-
221-
if ($record === null) {
222-
// Do not process more transformers.
223-
break;
224-
}
225-
}
226-
227-
if ($record !== null) {
228-
if (!\is_array($record)) {
229-
throw new \RuntimeException('Unexpected type: record must be array or null.');
230-
}
231-
232-
yield $emit($record);
233-
}
213+
private function transformAsync(
214+
AsyncRecordCollection $records,
215+
array $transformers,
216+
$context
217+
): AsyncRecordCollection {
218+
foreach ($transformers as $transformer) {
219+
if (!$transformer instanceof AsyncTransformer) {
220+
// TODO: Proper exception or separate async stack.
221+
throw new \RuntimeException('Cannot use sync transformer.');
234222
}
235-
});
223+
224+
if ($transformer instanceof PorterAware) {
225+
$transformer->setPorter($this);
226+
}
227+
228+
$records = $transformer->transformAsync($records, $context);
229+
}
230+
231+
return $records;
236232
}
237233

238234
private function createProviderRecords(\Iterator $records, ProviderResource $resource): ProviderRecords
@@ -253,6 +249,17 @@ private function createPorterRecords(RecordCollection $records, ImportSpecificat
253249
return new PorterRecords($records, $specification);
254250
}
255251

252+
private function createAsyncPorterRecords(
253+
AsyncRecordCollection $records,
254+
AsyncImportSpecification $specification
255+
): AsyncPorterRecords {
256+
if ($records instanceof \Countable) {
257+
return new CountableAsyncPorterRecords($records, \count($records), $specification);
258+
}
259+
260+
return new AsyncPorterRecords($records, $specification);
261+
}
262+
256263
/**
257264
* Gets the provider matching the specified name.
258265
*
@@ -269,7 +276,7 @@ private function getProvider(string $name)
269276
}
270277

271278
try {
272-
return $this->getOrCreateProviderFactory()->createProvider("$name");
279+
return $this->getOrCreateProviderFactory()->createProvider($name);
273280
} catch (ObjectNotCreatedException $exception) {
274281
throw new ProviderNotFoundException("No such provider registered: \"$name\".", $exception);
275282
}

src/Transform/AsyncTransformer.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33

44
namespace ScriptFUSION\Porter\Transform;
55

6-
use Amp\Promise;
6+
use ScriptFUSION\Porter\Collection\AsyncRecordCollection;
77

88
interface AsyncTransformer
99
{
10-
public function transformAsync(array $record, $context): Promise;
10+
public function transformAsync(AsyncRecordCollection $records, $context): AsyncRecordCollection;
1111
}

0 commit comments

Comments
 (0)