Skip to content

Commit ab3bfee

Browse files
committed
Prepare to hand back connections when keep-alive is possible
1 parent 1c911d2 commit ab3bfee

File tree

5 files changed

+320
-3
lines changed

5 files changed

+320
-3
lines changed

src/Io/ClientConnectionManager.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,12 @@ public function connect(UriInterface $uri)
4242

4343
return $this->connector->connect(($scheme === 'https' ? 'tls://' : '') . $uri->getHost() . ':' . $port);
4444
}
45+
46+
/**
47+
* @return void
48+
*/
49+
public function handBack(ConnectionInterface $connection)
50+
{
51+
$connection->close();
52+
}
4553
}

src/Io/ClientRequestStream.php

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace React\Http\Io;
44

55
use Evenement\EventEmitter;
6+
use Psr\Http\Message\MessageInterface;
67
use Psr\Http\Message\RequestInterface;
78
use React\Http\Message\Response;
89
use React\Socket\ConnectionInterface;
@@ -171,11 +172,20 @@ public function handleData($data)
171172
$this->connection = null;
172173
$this->buffer = '';
173174

174-
// take control over connection handling and close connection once response body closes
175+
// take control over connection handling and check if we can reuse the connection once response body closes
175176
$that = $this;
177+
$request = $this->request;
178+
$connectionManager = $this->connectionManager;
179+
$successfulEndReceived = false;
176180
$input = $body = new CloseProtectionStream($connection);
177-
$input->on('close', function () use ($connection, $that) {
178-
$connection->close();
181+
$input->on('close', function () use ($connection, $that, $connectionManager, $request, $response, &$successfulEndReceived) {
182+
// only reuse connection after successful response and both request and response allow keep alive
183+
if ($successfulEndReceived && $connection->isReadable() && $that->hasMessageKeepAliveEnabled($response) && $that->hasMessageKeepAliveEnabled($request)) {
184+
$connectionManager->handBack($connection);
185+
} else {
186+
$connection->close();
187+
}
188+
179189
$that->close();
180190
});
181191

@@ -190,6 +200,9 @@ public function handleData($data)
190200
$length = (int) $response->getHeaderLine('Content-Length');
191201
}
192202
$response = $response->withBody($body = new ReadableBodyStream($body, $length));
203+
$body->on('end', function () use (&$successfulEndReceived) {
204+
$successfulEndReceived = true;
205+
});
193206

194207
// emit response with streaming response body (see `Sender`)
195208
$this->emit('response', array($response, $body));
@@ -249,4 +262,29 @@ public function close()
249262
$this->emit('close');
250263
$this->removeAllListeners();
251264
}
265+
266+
/**
267+
* @internal
268+
* @return bool
269+
* @link https://www.rfc-editor.org/rfc/rfc9112#section-9.3
270+
* @link https://www.rfc-editor.org/rfc/rfc7230#section-6.1
271+
*/
272+
public function hasMessageKeepAliveEnabled(MessageInterface $message)
273+
{
274+
$connectionOptions = \RingCentral\Psr7\normalize_header(\strtolower($message->getHeaderLine('Connection')));
275+
276+
if (\in_array('close', $connectionOptions, true)) {
277+
return false;
278+
}
279+
280+
if ($message->getProtocolVersion() === '1.1') {
281+
return true;
282+
}
283+
284+
if (\in_array('keep-alive', $connectionOptions, true)) {
285+
return true;
286+
}
287+
288+
return false;
289+
}
252290
}

tests/Client/FunctionalIntegrationTest.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use React\Http\Io\ClientConnectionManager;
88
use React\Http\Message\Request;
99
use React\Promise\Deferred;
10+
use React\Promise\Promise;
1011
use React\Promise\Stream;
1112
use React\Socket\ConnectionInterface;
1213
use React\Socket\Connector;
@@ -55,6 +56,30 @@ public function testRequestToLocalhostEmitsSingleRemoteConnection()
5556
\React\Async\await(\React\Promise\Timer\timeout($promise, self::TIMEOUT_LOCAL));
5657
}
5758

59+
public function testRequestToLocalhostWillConnectAndCloseConnectionAfterResponseUntilKeepAliveIsActuallySupported()
60+
{
61+
$socket = new SocketServer('127.0.0.1:0');
62+
$socket->on('connection', $this->expectCallableOnce());
63+
64+
$promise = new Promise(function ($resolve) use ($socket) {
65+
$socket->on('connection', function (ConnectionInterface $conn) use ($socket, $resolve) {
66+
$conn->write("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK");
67+
$conn->on('close', function () use ($resolve) {
68+
$resolve(null);
69+
});
70+
$socket->close();
71+
});
72+
});
73+
$port = parse_url($socket->getAddress(), PHP_URL_PORT);
74+
75+
$client = new Client(new ClientConnectionManager(new Connector()));
76+
$request = $client->request(new Request('GET', 'http://localhost:' . $port, array(), '', '1.1'));
77+
78+
$request->end();
79+
80+
\React\Async\await(\React\Promise\Timer\timeout($promise, self::TIMEOUT_LOCAL));
81+
}
82+
5883
public function testRequestLegacyHttpServerWithOnlyLineFeedReturnsSuccessfulResponse()
5984
{
6085
$socket = new SocketServer('127.0.0.1:0');

tests/Io/ClientConnectionManagerTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,16 @@ public function testConnectWithoutSchemeShouldRejectWithException()
8080
$this->assertInstanceOf('InvalidArgumentException', $exception);
8181
$this->assertEquals('Invalid request URL given', $exception->getMessage());
8282
}
83+
84+
public function testHandBackWillCloseGivenConnectionUntilKeepAliveIsActuallySupported()
85+
{
86+
$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
87+
88+
$connectionManager = new ClientConnectionManager($connector);
89+
90+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
91+
$connection->expects($this->once())->method('close');
92+
93+
$connectionManager->handBack($connection);
94+
}
8395
}

tests/Io/ClientRequestStreamTest.php

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,240 @@ public function testStreamShouldEmitResponseWithStreamingBodyUntilEndWhenRespons
479479
call_user_func($endEvent); // $endEvent() (PHP 5.4+)
480480
}
481481

482+
public function testStreamShouldReuseConnectionForHttp11ByDefault()
483+
{
484+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
485+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n");
486+
$connection->expects($this->once())->method('isReadable')->willReturn(true);
487+
$connection->expects($this->never())->method('close');
488+
489+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
490+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
491+
$connectionManager->expects($this->once())->method('handBack')->with($connection);
492+
493+
$requestData = new Request('GET', 'http://www.example.com', array(), '', '1.1');
494+
$request = new ClientRequestStream($connectionManager, $requestData);
495+
496+
$request->on('close', $this->expectCallableOnce());
497+
498+
$request->end();
499+
500+
$request->handleData("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n");
501+
}
502+
503+
public function testStreamShouldNotReuseConnectionWhenResponseContainsConnectionClose()
504+
{
505+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
506+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n");
507+
$connection->expects($this->once())->method('isReadable')->willReturn(true);
508+
$connection->expects($this->once())->method('close');
509+
510+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
511+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
512+
513+
$requestData = new Request('GET', 'http://www.example.com', array(), '', '1.1');
514+
$request = new ClientRequestStream($connectionManager, $requestData);
515+
516+
$request->on('close', $this->expectCallableOnce());
517+
518+
$request->end();
519+
520+
$request->handleData("HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
521+
}
522+
523+
public function testStreamShouldNotReuseConnectionWhenRequestContainsConnectionCloseWithAdditionalOptions()
524+
{
525+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
526+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: FOO, CLOSE, BAR\r\n\r\n");
527+
$connection->expects($this->once())->method('isReadable')->willReturn(true);
528+
$connection->expects($this->once())->method('close');
529+
530+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
531+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
532+
533+
$requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'FOO, CLOSE, BAR'), '', '1.1');
534+
$request = new ClientRequestStream($connectionManager, $requestData);
535+
536+
$request->on('close', $this->expectCallableOnce());
537+
538+
$request->end();
539+
540+
$request->handleData("HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: Foo, Close, Bar\r\n\r\n");
541+
}
542+
543+
public function testStreamShouldNotReuseConnectionForHttp10ByDefault()
544+
{
545+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
546+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.0\r\nHost: www.example.com\r\n\r\n");
547+
$connection->expects($this->once())->method('isReadable')->willReturn(true);
548+
$connection->expects($this->once())->method('close');
549+
550+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
551+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
552+
553+
$requestData = new Request('GET', 'http://www.example.com', array(), '', '1.0');
554+
$request = new ClientRequestStream($connectionManager, $requestData);
555+
556+
$request->on('close', $this->expectCallableOnce());
557+
558+
$request->end();
559+
560+
$request->handleData("HTTP/1.0 200 OK\r\nContent-Length: 0\r\n\r\n");
561+
}
562+
563+
public function testStreamShouldReuseConnectionForHttp10WhenBothRequestAndResponseContainConnectionKeepAlive()
564+
{
565+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
566+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.0\r\nHost: www.example.com\r\nConnection: keep-alive\r\n\r\n");
567+
$connection->expects($this->once())->method('isReadable')->willReturn(true);
568+
$connection->expects($this->never())->method('close');
569+
570+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
571+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
572+
$connectionManager->expects($this->once())->method('handBack')->with($connection);
573+
574+
$requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'keep-alive'), '', '1.0');
575+
$request = new ClientRequestStream($connectionManager, $requestData);
576+
577+
$request->on('close', $this->expectCallableOnce());
578+
579+
$request->end();
580+
581+
$request->handleData("HTTP/1.0 200 OK\r\nContent-Length: 0\r\nConnection: keep-alive\r\n\r\n");
582+
}
583+
584+
public function testStreamShouldReuseConnectionForHttp10WhenBothRequestAndResponseContainConnectionKeepAliveWithAdditionalOptions()
585+
{
586+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
587+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.0\r\nHost: www.example.com\r\nConnection: FOO, KEEP-ALIVE, BAR\r\n\r\n");
588+
$connection->expects($this->once())->method('isReadable')->willReturn(true);
589+
$connection->expects($this->never())->method('close');
590+
591+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
592+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
593+
$connectionManager->expects($this->once())->method('handBack')->with($connection);
594+
595+
$requestData = new Request('GET', 'http://www.example.com', array('Connection' => 'FOO, KEEP-ALIVE, BAR'), '', '1.0');
596+
$request = new ClientRequestStream($connectionManager, $requestData);
597+
598+
$request->on('close', $this->expectCallableOnce());
599+
600+
$request->end();
601+
602+
$request->handleData("HTTP/1.0 200 OK\r\nContent-Length: 0\r\nConnection: Foo, Keep-Alive, Bar\r\n\r\n");
603+
}
604+
605+
public function testStreamShouldNotReuseConnectionWhenResponseContainsNoContentLengthAndResponseBodyTerminatedByConnectionEndEvent()
606+
{
607+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
608+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n");
609+
$connection->expects($this->once())->method('isReadable')->willReturn(false);
610+
$connection->expects($this->once())->method('close');
611+
612+
$endEvent = null;
613+
$eventName = null;
614+
$connection->expects($this->any())->method('on')->with($this->callback(function ($name) use (&$eventName) {
615+
$eventName = $name;
616+
return true;
617+
}), $this->callback(function ($cb) use (&$endEvent, &$eventName) {
618+
if ($eventName === 'end') {
619+
$endEvent = $cb;
620+
}
621+
return true;
622+
}));
623+
624+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
625+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
626+
627+
$requestData = new Request('GET', 'http://www.example.com', array(), '', '1.1');
628+
$request = new ClientRequestStream($connectionManager, $requestData);
629+
630+
$request->on('close', $this->expectCallableOnce());
631+
632+
$request->end();
633+
634+
$request->handleData("HTTP/1.1 200 OK\r\n\r\n");
635+
636+
$this->assertNotNull($endEvent);
637+
call_user_func($endEvent); // $endEvent() (PHP 5.4+)
638+
}
639+
640+
public function testStreamShouldNotReuseConnectionWhenResponseContainsContentLengthButIsTerminatedByUnexpectedCloseEvent()
641+
{
642+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
643+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n");
644+
$connection->expects($this->atMost(1))->method('isReadable')->willReturn(false);
645+
$connection->expects($this->once())->method('close');
646+
647+
$closeEvent = null;
648+
$eventName = null;
649+
$connection->expects($this->any())->method('on')->with($this->callback(function ($name) use (&$eventName) {
650+
$eventName = $name;
651+
return true;
652+
}), $this->callback(function ($cb) use (&$closeEvent, &$eventName) {
653+
if ($eventName === 'close') {
654+
$closeEvent = $cb;
655+
}
656+
return true;
657+
}));
658+
659+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
660+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
661+
662+
$requestData = new Request('GET', 'http://www.example.com', array(), '', '1.1');
663+
$request = new ClientRequestStream($connectionManager, $requestData);
664+
665+
$request->on('close', $this->expectCallableOnce());
666+
667+
$request->end();
668+
669+
$request->handleData("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\n");
670+
671+
$this->assertNotNull($closeEvent);
672+
call_user_func($closeEvent); // $closeEvent() (PHP 5.4+)
673+
}
674+
675+
public function testStreamShouldReuseConnectionWhenResponseContainsTransferEncodingChunkedAndResponseBody()
676+
{
677+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
678+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n");
679+
$connection->expects($this->once())->method('isReadable')->willReturn(true);
680+
$connection->expects($this->never())->method('close');
681+
682+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
683+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
684+
$connectionManager->expects($this->once())->method('handBack')->with($connection);
685+
686+
$requestData = new Request('GET', 'http://www.example.com', array(), '', '1.1');
687+
$request = new ClientRequestStream($connectionManager, $requestData);
688+
689+
$request->on('close', $this->expectCallableOnce());
690+
691+
$request->end();
692+
693+
$request->handleData("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n2\r\nOK\r\n0\r\n\r\n");
694+
}
695+
696+
public function testStreamShouldNotReuseConnectionWhenResponseContainsTransferEncodingChunkedAndResponseBodyContainsInvalidData()
697+
{
698+
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
699+
$connection->expects($this->once())->method('write')->with("GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n");
700+
$connection->expects($this->atMost(1))->method('isReadable')->willReturn(true);
701+
$connection->expects($this->once())->method('close');
702+
703+
$connectionManager = $this->getMockBuilder('React\Http\Io\ClientConnectionManager')->disableOriginalConstructor()->getMock();
704+
$connectionManager->expects($this->once())->method('connect')->willReturn(\React\Promise\resolve($connection));
705+
706+
$requestData = new Request('GET', 'http://www.example.com', array(), '', '1.1');
707+
$request = new ClientRequestStream($connectionManager, $requestData);
708+
709+
$request->on('close', $this->expectCallableOnce());
710+
711+
$request->end();
712+
713+
$request->handleData("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\nINVALID\r\n");
714+
}
715+
482716
/** @test */
483717
public function postRequestShouldSendAPostRequest()
484718
{

0 commit comments

Comments
 (0)