Skip to content

Commit 1cb6e81

Browse files
authored
Merge pull request #5 from driftphp/feature/added-wait-until-stream-listener
Added function for waiting for stream listeners
2 parents c5a8e71 + 5a64fd7 commit 1cb6e81

File tree

4 files changed

+184
-8
lines changed

4 files changed

+184
-8
lines changed

.circleci/config.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
version: 2
22
jobs:
3-
test-php72:
3+
test-php73:
44
docker:
5-
- image: circleci/php:7.2-cli
5+
- image: circleci/php:7.3-cli
66

77
working_directory: ~/project
88
steps:
@@ -48,5 +48,5 @@ workflows:
4848
version: 2
4949
test:
5050
jobs:
51-
- test-php72
51+
- test-php73
5252
- test-php74

composer.json

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,26 @@
44
"keywords": ["blocking", "await", "sleep", "Event Loop", "Promise", "ReactPHP", "async"],
55
"homepage": "https://github.com/driftphp/reactphp-functions",
66
"license": "MIT",
7-
"autoload": {
8-
"files": [ "src/functions_include.php" ]
9-
},
7+
8+
"authors": [
9+
{
10+
"name": "Marc Morera",
11+
"email": "[email protected]"
12+
}
13+
],
14+
1015
"require": {
11-
"php": ">=7.1",
16+
"php": "^7.3",
1217
"react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5",
1318
"react/promise": "^2.7 || ^1.2.1",
1419
"react/child-process": "^0.6",
15-
"react/promise-timer": "^1.5"
20+
"react/promise-timer": "^1.5",
21+
"react/stream": "^1.0"
1622
},
1723
"require-dev": {
1824
"clue/block-react": "^1.3"
25+
},
26+
"autoload": {
27+
"files": [ "src/functions_include.php" ]
1928
}
2029
}

src/functions.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33

44
namespace Drift\React;
55

6+
use Evenement\EventEmitterInterface;
67
use React\ChildProcess\Process;
78
use React\EventLoop\LoopInterface;
9+
use React\EventLoop\TimerInterface;
810
use React\Promise\Deferred;
911
use React\Promise\PromiseInterface;
1012
use React\Promise\Timer;
13+
use React\Stream\ReadableStreamInterface;
14+
use function React\Promise\reject;
1115

1216
/**
1317
* Sleep for n seconds
@@ -74,4 +78,41 @@ function mime_content_type(string $fileName, LoopInterface $loop) : PromiseInter
7478
return $deferred->promise();
7579
}
7680

81+
/**
82+
* Wait until a number of listeners are aware of stream data.
83+
*
84+
* @param EventEmitterInterface $stream
85+
* @param LoopInterface $loop
86+
* @param int $minimumListeners
87+
* @param int $timeout
88+
*
89+
* @return PromiseInterface<ReadableStreamInterface>
90+
*/
91+
function wait_for_stream_listeners(
92+
EventEmitterInterface $stream,
93+
LoopInterface $loop,
94+
int $minimumListeners = 1,
95+
float $timeout = -1
96+
) : PromiseInterface
97+
{
98+
if ($minimumListeners < 0) {
99+
return reject(new \LogicException('You cannot expect negative amount of listeners in a stream.'));
100+
}
77101

102+
$deferred = new Deferred();
103+
$timer = $loop->addPeriodicTimer(0.001, function(TimerInterface $timer) use ($deferred, $stream, $minimumListeners, $loop) {
104+
if (count($stream->listeners('data')) >= $minimumListeners) {
105+
$loop->cancelTimer($timer);
106+
$deferred->resolve($stream);
107+
}
108+
});
109+
110+
if ($timeout>0) {
111+
$loop->addTimer($timeout, function() use ($timer, $loop, $deferred, $timeout) {
112+
$loop->cancelTimer($timer);
113+
$deferred->reject(new \RuntimeException("No listeners attached after $timeout seconds"));
114+
});
115+
}
116+
117+
return $deferred->promise();
118+
}

tests/WaitForStreamListenersTest.php

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
<?php
2+
3+
4+
namespace Drift\React\Tests;
5+
6+
use PHPUnit\Framework\TestCase;
7+
use React\EventLoop\Factory;
8+
use React\EventLoop\TimerInterface;
9+
use React\Stream\ThroughStream;
10+
use function Clue\React\Block\await;
11+
use function Drift\React\wait_for_stream_listeners;
12+
13+
/**
14+
* Class WaitForStreamListenersTest
15+
*/
16+
class WaitForStreamListenersTest extends TestCase
17+
{
18+
/**
19+
* Test without listeners
20+
*/
21+
public function testNoListeners()
22+
{
23+
$this->expectNotToPerformAssertions();
24+
$loop = Factory::create();
25+
$stream = new ThroughStream();
26+
await(wait_for_stream_listeners($stream, $loop, 0), $loop);
27+
}
28+
29+
/**
30+
* Test without listeners
31+
*/
32+
public function testNegativeListeners()
33+
{
34+
$this->expectException(\LogicException::class);
35+
$loop = Factory::create();
36+
$stream = new ThroughStream();
37+
await(wait_for_stream_listeners($stream, $loop, -1), $loop);
38+
}
39+
40+
/**
41+
* Test timeout
42+
*/
43+
public function testTimeout()
44+
{
45+
$loop = Factory::create();
46+
$stream = new ThroughStream();
47+
try {
48+
await(wait_for_stream_listeners($stream, $loop, 1, 1), $loop);
49+
$this->fail('Timeout should reject');
50+
} catch (\Exception $exception) {
51+
$this->assertTrue(true);
52+
}
53+
}
54+
55+
/**
56+
* Test one listener
57+
*/
58+
public function testOneListener()
59+
{
60+
$this->expectNotToPerformAssertions();
61+
$loop = Factory::create();
62+
$stream = new ThroughStream();
63+
$stream->on('data', function(){});
64+
await(wait_for_stream_listeners($stream, $loop, 1, 1), $loop);
65+
}
66+
67+
/**
68+
* Test two listener
69+
*/
70+
public function testTwoListeners()
71+
{
72+
$loop = Factory::create();
73+
$stream = new ThroughStream();
74+
$stream->on('data', function(){});
75+
76+
$from = time();
77+
try {
78+
await(wait_for_stream_listeners($stream, $loop, 2, 1), $loop);
79+
$this->fail('Timeout should reject');
80+
} catch (\Exception $exception) {
81+
$to = time();
82+
$this->assertTrue(intval($to-$from) == 1);
83+
}
84+
85+
$stream->on('data', function(){});
86+
await(wait_for_stream_listeners($stream, $loop, 2, 1), $loop);
87+
}
88+
89+
/**
90+
* Test future listener
91+
*/
92+
public function testFutureListener()
93+
{
94+
$this->expectNotToPerformAssertions();
95+
$loop = Factory::create();
96+
$stream = new ThroughStream();
97+
98+
$loop->addPeriodicTimer(0.1, function(TimerInterface $timer) use ($stream) {
99+
$stream->on('data', function() {});
100+
});
101+
102+
await(wait_for_stream_listeners($stream, $loop, 10, 1.01), $loop);
103+
}
104+
105+
/**
106+
* Test future listener
107+
*
108+
* @group X
109+
*/
110+
public function testFutureListenerTimeout()
111+
{
112+
$loop = Factory::create();
113+
$stream = new ThroughStream();
114+
115+
$loop->addPeriodicTimer(0.1, function(TimerInterface $timer) use ($stream) {
116+
$stream->on('data', function() {});
117+
});
118+
119+
try {
120+
await(wait_for_stream_listeners($stream, $loop, 10, 9), $loop);
121+
$this->fail('Timeout should reject');
122+
} catch (\Exception $exception) {
123+
// Great
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)