Skip to content

Commit bcbab44

Browse files
authored
Merge pull request #18 from clue-labs/cleanup-refs
Clean up unneeded references for unwrapped streams when closing
2 parents 2d47cc2 + bb8527f commit bcbab44

File tree

4 files changed

+117
-10
lines changed

4 files changed

+117
-10
lines changed

src/UnwrapReadableStream.php

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public function __construct(PromiseInterface $promise)
3131

3232
$this->promise = $promise->then(
3333
function ($stream) {
34-
if (!($stream instanceof ReadableStreamInterface)) {
34+
if (!$stream instanceof ReadableStreamInterface) {
3535
throw new InvalidArgumentException('Not a readable stream');
3636
}
3737
return $stream;
@@ -80,6 +80,9 @@ function ($e) use ($out, &$closed) {
8080
$out->emit('error', array($e, $out));
8181
$out->close();
8282
}
83+
84+
// resume() and pause() may attach to this promise, so ensure we actually reject here
85+
throw $e;
8386
}
8487
);
8588
}
@@ -91,16 +94,20 @@ public function isReadable()
9194

9295
public function pause()
9396
{
94-
$this->promise->then(function (ReadableStreamInterface $stream) {
95-
$stream->pause();
96-
});
97+
if ($this->promise !== null) {
98+
$this->promise->then(function (ReadableStreamInterface $stream) {
99+
$stream->pause();
100+
});
101+
}
97102
}
98103

99104
public function resume()
100105
{
101-
$this->promise->then(function (ReadableStreamInterface $stream) {
102-
$stream->resume();
103-
});
106+
if ($this->promise !== null) {
107+
$this->promise->then(function (ReadableStreamInterface $stream) {
108+
$stream->resume();
109+
});
110+
}
104111
}
105112

106113
public function pipe(WritableStreamInterface $dest, array $options = array())
@@ -122,7 +129,9 @@ public function close()
122129
if ($this->promise instanceof CancellablePromiseInterface) {
123130
$this->promise->cancel();
124131
}
132+
$this->promise = null;
125133

126-
$this->emit('close', array($this));
134+
$this->emit('close');
135+
$this->removeAllListeners();
127136
}
128137
}

src/UnwrapWritableStream.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public function __construct(PromiseInterface $promise)
3535

3636
$this->promise = $promise->then(
3737
function ($stream) {
38-
if (!($stream instanceof WritableStreamInterface)) {
38+
if (!$stream instanceof WritableStreamInterface) {
3939
throw new InvalidArgumentException('Not a writable stream');
4040
}
4141
return $stream;
@@ -156,7 +156,9 @@ public function close()
156156
if ($this->promise instanceof CancellablePromiseInterface) {
157157
$this->promise->cancel();
158158
}
159+
$this->promise = $this->stream = null;
159160

160-
$this->emit('close', array($this));
161+
$this->emit('close');
162+
$this->removeAllListeners();
161163
}
162164
}

tests/UnwrapReadableTest.php

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,30 @@ public function testForwardsPauseToInputStream()
200200
$stream->pause();
201201
}
202202

203+
/**
204+
* @doesNotPerformAssertions
205+
*/
206+
public function testPauseAfterCloseHasNoEffect()
207+
{
208+
$promise = new \React\Promise\Promise(function () { });
209+
$stream = Stream\unwrapReadable($promise);
210+
211+
$stream->close();
212+
$stream->pause();
213+
}
214+
215+
216+
/**
217+
* @doesNotPerformAssertions
218+
*/
219+
public function testPauseAfterErrorDueToInvalidInputHasNoEffect()
220+
{
221+
$promise = \React\Promise\reject(new \RuntimeException());
222+
$stream = Stream\unwrapReadable($promise);
223+
224+
$stream->pause();
225+
}
226+
203227
public function testForwardsResumeToInputStream()
204228
{
205229
$input = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
@@ -211,6 +235,18 @@ public function testForwardsResumeToInputStream()
211235
$stream->resume();
212236
}
213237

238+
/**
239+
* @doesNotPerformAssertions
240+
*/
241+
public function testResumeAfterCloseHasNoEffect()
242+
{
243+
$promise = new \React\Promise\Promise(function () { });
244+
$stream = Stream\unwrapReadable($promise);
245+
246+
$stream->close();
247+
$stream->resume();
248+
}
249+
214250
public function testPipingStreamWillForwardDataEvents()
215251
{
216252
$input = new ThroughStream();
@@ -279,4 +315,31 @@ public function testClosingStreamWillCloseStreamFromCancellationHandler()
279315

280316
$this->assertFalse($input->isReadable());
281317
}
318+
319+
public function testCloseShouldRemoveAllListenersAfterCloseEvent()
320+
{
321+
$promise = new \React\Promise\Promise(function () { });
322+
$stream = Stream\unwrapReadable($promise);
323+
324+
$stream->on('close', $this->expectCallableOnce());
325+
$this->assertCount(1, $stream->listeners('close'));
326+
327+
$stream->close();
328+
329+
$this->assertCount(0, $stream->listeners('close'));
330+
}
331+
332+
public function testCloseShouldRemoveReferenceToPromiseToAvoidGarbageReferences()
333+
{
334+
$promise = new \React\Promise\Promise(function () { });
335+
$stream = Stream\unwrapReadable($promise);
336+
337+
$stream->close();
338+
339+
$ref = new \ReflectionProperty($stream, 'promise');
340+
$ref->setAccessible(true);
341+
$value = $ref->getValue($stream);
342+
343+
$this->assertNull($value);
344+
}
282345
}

tests/UnwrapWritableTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,4 +352,37 @@ public function testClosingStreamWillCloseStreamFromCancellationHandler()
352352

353353
$this->assertFalse($input->isWritable());
354354
}
355+
356+
public function testCloseShouldRemoveAllListenersAfterCloseEvent()
357+
{
358+
$promise = new \React\Promise\Promise(function () { });
359+
$stream = Stream\unwrapWritable($promise);
360+
361+
$stream->on('close', $this->expectCallableOnce());
362+
$this->assertCount(1, $stream->listeners('close'));
363+
364+
$stream->close();
365+
366+
$this->assertCount(0, $stream->listeners('close'));
367+
}
368+
369+
public function testCloseShouldRemoveReferenceToPromiseAndStreamToAvoidGarbageReferences()
370+
{
371+
$promise = \React\Promise\resolve(new ThroughStream());
372+
$stream = Stream\unwrapWritable($promise);
373+
374+
$stream->close();
375+
376+
$ref = new \ReflectionProperty($stream, 'promise');
377+
$ref->setAccessible(true);
378+
$value = $ref->getValue($stream);
379+
380+
$this->assertNull($value);
381+
382+
$ref = new \ReflectionProperty($stream, 'stream');
383+
$ref->setAccessible(true);
384+
$value = $ref->getValue($stream);
385+
386+
$this->assertNull($value);
387+
}
355388
}

0 commit comments

Comments
 (0)