Skip to content

Commit 28943f4

Browse files
committed
Reuse existing connections for HTTP keep-alive
1 parent ab3bfee commit 28943f4

File tree

7 files changed

+448
-32
lines changed

7 files changed

+448
-32
lines changed

src/Io/ClientConnectionManager.php

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
namespace React\Http\Io;
44

55
use Psr\Http\Message\UriInterface;
6+
use React\EventLoop\LoopInterface;
7+
use React\EventLoop\TimerInterface;
68
use React\Promise\PromiseInterface;
79
use React\Socket\ConnectionInterface;
810
use React\Socket\ConnectorInterface;
@@ -18,9 +20,28 @@ class ClientConnectionManager
1820
/** @var ConnectorInterface */
1921
private $connector;
2022

21-
public function __construct(ConnectorInterface $connector)
23+
/** @var LoopInterface */
24+
private $loop;
25+
26+
/** @var string[] */
27+
private $idleUris = array();
28+
29+
/** @var ConnectionInterface[] */
30+
private $idleConnections = array();
31+
32+
/** @var TimerInterface[] */
33+
private $idleTimers = array();
34+
35+
/** @var \Closure[] */
36+
private $idleStreamHandlers = array();
37+
38+
/** @var float */
39+
private $maximumTimeToKeepAliveIdleConnection = 0.001;
40+
41+
public function __construct(ConnectorInterface $connector, LoopInterface $loop)
2242
{
2343
$this->connector = $connector;
44+
$this->loop = $loop;
2445
}
2546

2647
/**
@@ -39,15 +60,78 @@ public function connect(UriInterface $uri)
3960
if ($port === null) {
4061
$port = $scheme === 'https' ? 443 : 80;
4162
}
63+
$uri = ($scheme === 'https' ? 'tls://' : '') . $uri->getHost() . ':' . $port;
64+
65+
// Reuse idle connection for same URI if available
66+
foreach ($this->idleConnections as $id => $connection) {
67+
if ($this->idleUris[$id] === $uri) {
68+
assert($this->idleStreamHandlers[$id] instanceof \Closure);
69+
$connection->removeListener('close', $this->idleStreamHandlers[$id]);
70+
$connection->removeListener('data', $this->idleStreamHandlers[$id]);
71+
$connection->removeListener('error', $this->idleStreamHandlers[$id]);
72+
73+
assert($this->idleTimers[$id] instanceof TimerInterface);
74+
$this->loop->cancelTimer($this->idleTimers[$id]);
75+
unset($this->idleUris[$id], $this->idleConnections[$id], $this->idleTimers[$id], $this->idleStreamHandlers[$id]);
4276

43-
return $this->connector->connect(($scheme === 'https' ? 'tls://' : '') . $uri->getHost() . ':' . $port);
77+
return \React\Promise\resolve($connection);
78+
}
79+
}
80+
81+
// Create new connection if no idle connection to same URI is available
82+
return $this->connector->connect($uri);
4483
}
4584

4685
/**
86+
* Hands back an idle connection to the connection manager for possible future reuse.
87+
*
4788
* @return void
4889
*/
49-
public function handBack(ConnectionInterface $connection)
90+
public function keepAlive(UriInterface $uri, ConnectionInterface $connection)
5091
{
92+
$scheme = $uri->getScheme();
93+
assert($scheme === 'https' || $scheme === 'http');
94+
95+
$port = $uri->getPort();
96+
if ($port === null) {
97+
$port = $scheme === 'https' ? 443 : 80;
98+
}
99+
100+
$this->idleUris[] = ($scheme === 'https' ? 'tls://' : '') . $uri->getHost() . ':' . $port;
101+
$this->idleConnections[] = $connection;
102+
103+
$that = $this;
104+
$cleanUp = function () use ($connection, $that) {
105+
// call public method to support legacy PHP 5.3
106+
$that->cleanUpConnection($connection);
107+
};
108+
109+
// clean up and close connection when maximum time to keep-alive idle connection has passed
110+
$this->idleTimers[] = $this->loop->addTimer($this->maximumTimeToKeepAliveIdleConnection, $cleanUp);
111+
112+
// clean up and close connection when unexpected close/data/error event happens during idle time
113+
$this->idleStreamHandlers[] = $cleanUp;
114+
$connection->on('close', $cleanUp);
115+
$connection->on('data', $cleanUp);
116+
$connection->on('error', $cleanUp);
117+
}
118+
119+
/**
120+
* @internal
121+
* @return void
122+
*/
123+
public function cleanUpConnection(ConnectionInterface $connection) // private (PHP 5.4+)
124+
{
125+
$id = \array_search($connection, $this->idleConnections, true);
126+
if ($id === false) {
127+
return;
128+
}
129+
130+
assert(\is_int($id));
131+
assert($this->idleTimers[$id] instanceof TimerInterface);
132+
$this->loop->cancelTimer($this->idleTimers[$id]);
133+
unset($this->idleUris[$id], $this->idleConnections[$id], $this->idleTimers[$id], $this->idleStreamHandlers[$id]);
134+
51135
$connection->close();
52136
}
53137
}

src/Io/ClientRequestStream.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public function handleData($data)
181181
$input->on('close', function () use ($connection, $that, $connectionManager, $request, $response, &$successfulEndReceived) {
182182
// only reuse connection after successful response and both request and response allow keep alive
183183
if ($successfulEndReceived && $connection->isReadable() && $that->hasMessageKeepAliveEnabled($response) && $that->hasMessageKeepAliveEnabled($request)) {
184-
$connectionManager->handBack($connection);
184+
$connectionManager->keepAlive($request->getUri(), $connection);
185185
} else {
186186
$connection->close();
187187
}

src/Io/Sender.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public static function createFromLoop(LoopInterface $loop, ConnectorInterface $c
5454
$connector = new Connector(array(), $loop);
5555
}
5656

57-
return new self(new HttpClient(new ClientConnectionManager($connector)));
57+
return new self(new HttpClient(new ClientConnectionManager($connector, $loop)));
5858
}
5959

6060
private $http;

tests/Client/FunctionalIntegrationTest.php

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace React\Tests\Http\Client;
44

55
use Psr\Http\Message\ResponseInterface;
6+
use React\EventLoop\Loop;
67
use React\Http\Client\Client;
78
use React\Http\Io\ClientConnectionManager;
89
use React\Http\Message\Request;
@@ -47,7 +48,7 @@ public function testRequestToLocalhostEmitsSingleRemoteConnection()
4748
});
4849
$port = parse_url($socket->getAddress(), PHP_URL_PORT);
4950

50-
$client = new Client(new ClientConnectionManager(new Connector()));
51+
$client = new Client(new ClientConnectionManager(new Connector(), Loop::get()));
5152
$request = $client->request(new Request('GET', 'http://localhost:' . $port, array(), '', '1.0'));
5253

5354
$promise = Stream\first($request, 'close');
@@ -56,7 +57,7 @@ public function testRequestToLocalhostEmitsSingleRemoteConnection()
5657
\React\Async\await(\React\Promise\Timer\timeout($promise, self::TIMEOUT_LOCAL));
5758
}
5859

59-
public function testRequestToLocalhostWillConnectAndCloseConnectionAfterResponseUntilKeepAliveIsActuallySupported()
60+
public function testRequestToLocalhostWillConnectAndCloseConnectionAfterResponseWhenKeepAliveTimesOut()
6061
{
6162
$socket = new SocketServer('127.0.0.1:0');
6263
$socket->on('connection', $this->expectCallableOnce());
@@ -72,14 +73,42 @@ public function testRequestToLocalhostWillConnectAndCloseConnectionAfterResponse
7273
});
7374
$port = parse_url($socket->getAddress(), PHP_URL_PORT);
7475

75-
$client = new Client(new ClientConnectionManager(new Connector()));
76+
$client = new Client(new ClientConnectionManager(new Connector(), Loop::get()));
7677
$request = $client->request(new Request('GET', 'http://localhost:' . $port, array(), '', '1.1'));
7778

7879
$request->end();
7980

8081
\React\Async\await(\React\Promise\Timer\timeout($promise, self::TIMEOUT_LOCAL));
8182
}
8283

84+
public function testRequestToLocalhostWillReuseExistingConnectionForSecondRequest()
85+
{
86+
$socket = new SocketServer('127.0.0.1:0');
87+
$socket->on('connection', $this->expectCallableOnce());
88+
89+
$socket->on('connection', function (ConnectionInterface $connection) use ($socket) {
90+
$connection->on('data', function () use ($connection) {
91+
$connection->write("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK");
92+
});
93+
$socket->close();
94+
});
95+
$port = parse_url($socket->getAddress(), PHP_URL_PORT);
96+
97+
$client = new Client(new ClientConnectionManager(new Connector(), Loop::get()));
98+
99+
$request = $client->request(new Request('GET', 'http://localhost:' . $port, array(), '', '1.1'));
100+
$promise = Stream\first($request, 'close');
101+
$request->end();
102+
103+
\React\Async\await(\React\Promise\Timer\timeout($promise, self::TIMEOUT_LOCAL));
104+
105+
$request = $client->request(new Request('GET', 'http://localhost:' . $port, array(), '', '1.1'));
106+
$promise = Stream\first($request, 'close');
107+
$request->end();
108+
109+
\React\Async\await(\React\Promise\Timer\timeout($promise, self::TIMEOUT_LOCAL));
110+
}
111+
83112
public function testRequestLegacyHttpServerWithOnlyLineFeedReturnsSuccessfulResponse()
84113
{
85114
$socket = new SocketServer('127.0.0.1:0');
@@ -88,7 +117,7 @@ public function testRequestLegacyHttpServerWithOnlyLineFeedReturnsSuccessfulResp
88117
$socket->close();
89118
});
90119

91-
$client = new Client(new ClientConnectionManager(new Connector()));
120+
$client = new Client(new ClientConnectionManager(new Connector(), Loop::get()));
92121
$request = $client->request(new Request('GET', str_replace('tcp:', 'http:', $socket->getAddress()), array(), '', '1.0'));
93122

94123
$once = $this->expectCallableOnceWith('body');
@@ -108,7 +137,7 @@ public function testSuccessfulResponseEmitsEnd()
108137
// max_nesting_level was set to 100 for PHP Versions < 5.4 which resulted in failing test for legacy PHP
109138
ini_set('xdebug.max_nesting_level', 256);
110139

111-
$client = new Client(new ClientConnectionManager(new Connector()));
140+
$client = new Client(new ClientConnectionManager(new Connector(), Loop::get()));
112141

113142
$request = $client->request(new Request('GET', 'http://www.google.com/', array(), '', '1.0'));
114143

@@ -133,7 +162,7 @@ public function testPostDataReturnsData()
133162
// max_nesting_level was set to 100 for PHP Versions < 5.4 which resulted in failing test for legacy PHP
134163
ini_set('xdebug.max_nesting_level', 256);
135164

136-
$client = new Client(new ClientConnectionManager(new Connector()));
165+
$client = new Client(new ClientConnectionManager(new Connector(), Loop::get()));
137166

138167
$data = str_repeat('.', 33000);
139168
$request = $client->request(new Request('POST', 'https://' . (mt_rand(0, 1) === 0 ? 'eu.' : '') . 'httpbin.org/post', array('Content-Length' => strlen($data)), '', '1.0'));
@@ -165,7 +194,7 @@ public function testPostJsonReturnsData()
165194
$this->markTestSkipped('Not supported on HHVM');
166195
}
167196

168-
$client = new Client(new ClientConnectionManager(new Connector()));
197+
$client = new Client(new ClientConnectionManager(new Connector(), Loop::get()));
169198

170199
$data = json_encode(array('numbers' => range(1, 50)));
171200
$request = $client->request(new Request('POST', 'https://httpbin.org/post', array('Content-Length' => strlen($data), 'Content-Type' => 'application/json'), '', '1.0'));
@@ -195,7 +224,7 @@ public function testCancelPendingConnectionEmitsClose()
195224
// max_nesting_level was set to 100 for PHP Versions < 5.4 which resulted in failing test for legacy PHP
196225
ini_set('xdebug.max_nesting_level', 256);
197226

198-
$client = new Client(new ClientConnectionManager(new Connector()));
227+
$client = new Client(new ClientConnectionManager(new Connector(), Loop::get()));
199228

200229
$request = $client->request(new Request('GET', 'http://www.google.com/', array(), '', '1.0'));
201230
$request->on('error', $this->expectCallableNever());

0 commit comments

Comments
 (0)