Skip to content

Commit df9ac96

Browse files
committed
Refactor to move idle connection handling to Connection
1 parent 7c3bd22 commit df9ac96

File tree

5 files changed

+644
-349
lines changed

5 files changed

+644
-349
lines changed

src/Io/Connection.php

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace React\Mysql\Io;
44

55
use Evenement\EventEmitter;
6+
use React\EventLoop\LoopInterface;
67
use React\Mysql\Commands\CommandInterface;
78
use React\Mysql\Commands\PingCommand;
89
use React\Mysql\Commands\QueryCommand;
@@ -29,26 +30,46 @@ class Connection extends EventEmitter
2930
private $executor;
3031

3132
/**
32-
* @var integer
33+
* @var int one of the state constants (may change, but should be used readonly from outside)
34+
* @see self::STATE_*
3335
*/
34-
private $state = self::STATE_AUTHENTICATED;
36+
public $state = self::STATE_AUTHENTICATED;
3537

3638
/**
3739
* @var SocketConnectionInterface
3840
*/
3941
private $stream;
4042

43+
/** @var LoopInterface */
44+
private $loop;
45+
46+
/** @var float */
47+
private $idlePeriod = 0.001;
48+
49+
/** @var ?\React\EventLoop\TimerInterface */
50+
private $idleTimer;
51+
52+
/** @var int */
53+
private $pending = 0;
54+
4155
/**
4256
* Connection constructor.
4357
*
4458
* @param SocketConnectionInterface $stream
4559
* @param Executor $executor
60+
* @param LoopInterface $loop
61+
* @param ?float $idlePeriod
4662
*/
47-
public function __construct(SocketConnectionInterface $stream, Executor $executor)
63+
public function __construct(SocketConnectionInterface $stream, Executor $executor, LoopInterface $loop, $idlePeriod)
4864
{
4965
$this->stream = $stream;
5066
$this->executor = $executor;
5167

68+
$this->loop = $loop;
69+
if ($idlePeriod !== null) {
70+
$this->idlePeriod = $idlePeriod;
71+
}
72+
5273
$stream->on('error', [$this, 'handleConnectionError']);
5374
$stream->on('close', [$this, 'handleConnectionClosed']);
5475
}
@@ -71,6 +92,7 @@ public function query($sql, array $params = [])
7192
return \React\Promise\reject($e);
7293
}
7394

95+
$this->awake();
7496
$deferred = new Deferred();
7597

7698
// store all result set rows until result set end
@@ -86,11 +108,13 @@ public function query($sql, array $params = [])
86108

87109
$rows = [];
88110

111+
$this->idle();
89112
$deferred->resolve($result);
90113
});
91114

92115
// resolve / reject status reply (response without result set)
93116
$command->on('error', function ($error) use ($deferred) {
117+
$this->idle();
94118
$deferred->reject($error);
95119
});
96120
$command->on('success', function () use ($command, $deferred) {
@@ -99,6 +123,7 @@ public function query($sql, array $params = [])
99123
$result->insertId = $command->insertId;
100124
$result->warningCount = $command->warningCount;
101125

126+
$this->idle();
102127
$deferred->resolve($result);
103128
});
104129

@@ -115,20 +140,30 @@ public function queryStream($sql, $params = [])
115140
$command = new QueryCommand();
116141
$command->setQuery($query);
117142
$this->_doCommand($command);
143+
$this->awake();
144+
145+
$stream = new QueryStream($command, $this->stream);
146+
$stream->on('close', function () {
147+
$this->idle();
148+
});
118149

119-
return new QueryStream($command, $this->stream);
150+
return $stream;
120151
}
121152

122153
public function ping()
123154
{
124155
return new Promise(function ($resolve, $reject) {
125-
$this->_doCommand(new PingCommand())
126-
->on('error', function ($reason) use ($reject) {
127-
$reject($reason);
128-
})
129-
->on('success', function () use ($resolve) {
130-
$resolve(null);
131-
});
156+
$command = $this->_doCommand(new PingCommand());
157+
$this->awake();
158+
159+
$command->on('success', function () use ($resolve) {
160+
$this->idle();
161+
$resolve(null);
162+
});
163+
$command->on('error', function ($reason) use ($reject) {
164+
$this->idle();
165+
$reject($reason);
166+
});
132167
});
133168
}
134169

@@ -137,6 +172,10 @@ public function quit()
137172
return new Promise(function ($resolve, $reject) {
138173
$command = $this->_doCommand(new QuitCommand());
139174
$this->state = self::STATE_CLOSING;
175+
176+
// mark connection as "awake" until it is closed, so never "idle"
177+
$this->awake();
178+
140179
$command->on('success', function () use ($resolve) {
141180
$resolve(null);
142181
$this->close();
@@ -158,6 +197,11 @@ public function close()
158197
$remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false;
159198
$this->stream->close();
160199

200+
if ($this->idleTimer !== null) {
201+
$this->loop->cancelTimer($this->idleTimer);
202+
$this->idleTimer = null;
203+
}
204+
161205
// reject all pending commands if connection is closed
162206
while (!$this->executor->isIdle()) {
163207
$command = $this->executor->dequeue();
@@ -223,4 +267,29 @@ protected function _doCommand(CommandInterface $command)
223267

224268
return $this->executor->enqueue($command);
225269
}
270+
271+
private function awake()
272+
{
273+
++$this->pending;
274+
275+
if ($this->idleTimer !== null) {
276+
$this->loop->cancelTimer($this->idleTimer);
277+
$this->idleTimer = null;
278+
}
279+
}
280+
281+
private function idle()
282+
{
283+
--$this->pending;
284+
285+
if ($this->pending < 1 && $this->idlePeriod >= 0 && $this->state === self::STATE_AUTHENTICATED) {
286+
$this->idleTimer = $this->loop->addTimer($this->idlePeriod, function () {
287+
// soft-close connection and emit close event afterwards both on success or on error
288+
$this->idleTimer = null;
289+
$this->quit()->then(null, function () {
290+
// ignore to avoid reporting unhandled rejection
291+
});
292+
});
293+
}
294+
}
226295
}

src/Io/Factory.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,12 @@ public function createConnection(
210210
$connecting->cancel();
211211
});
212212

213-
$connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri) {
213+
$idlePeriod = isset($args['idle']) ? (float) $args['idle'] : null;
214+
$connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri, $idlePeriod) {
214215
$executor = new Executor();
215216
$parser = new Parser($stream, $executor);
216217

217-
$connection = new Connection($stream, $executor);
218+
$connection = new Connection($stream, $executor, $this->loop, $idlePeriod);
218219
$command = $executor->enqueue($authCommand);
219220
$parser->start();
220221

0 commit comments

Comments
 (0)