Skip to content

Commit c8e0430

Browse files
authored
Merge pull request #3 from appwrite-labs/fix-memory
Cleanup memory on Swoole IO
2 parents bd380cb + 816a043 commit c8e0430

File tree

5 files changed

+67
-155
lines changed

5 files changed

+67
-155
lines changed

.github/workflows/changelog.yml

Lines changed: 0 additions & 51 deletions
This file was deleted.

.github/workflows/code-style.yml

Lines changed: 0 additions & 25 deletions
This file was deleted.

.github/workflows/documentation.yml

Lines changed: 0 additions & 44 deletions
This file was deleted.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ phpDocumentor.phar
1414
.phpactor.json
1515
.phpunit.result.cache
1616
.phpdoc
17+
.idea

PhpAmqpLib/Wire/IO/SwooleIO.php

Lines changed: 66 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,18 @@ class SwooleIO extends AbstractIO
2121
* @param int $port
2222
* @param float $connection_timeout
2323
* @param float $read_write_timeout
24-
* @param null $context
24+
* @param mixed $context
2525
* @param bool $keepalive
2626
* @param int $heartbeat
2727
*/
2828
public function __construct(
29-
$host,
30-
$port,
31-
$connection_timeout,
32-
$read_write_timeout,
33-
$context = null,
34-
$keepalive = false,
35-
$heartbeat = 0
29+
string $host,
30+
int $port,
31+
float $connection_timeout,
32+
float $read_write_timeout,
33+
mixed $context = null,
34+
bool $keepalive = false,
35+
int $heartbeat = 0
3636
) {
3737
/*
3838
TODO FUTURE enable this check
@@ -58,8 +58,11 @@ public function __construct(
5858
* @throws AMQPRuntimeException
5959
* @throws AMQPIOException
6060
*/
61-
public function connect()
61+
public function connect(): void
6262
{
63+
// Close any existing connection to prevent resource leaks
64+
$this->close();
65+
6366
$this->sock = new Client(SWOOLE_SOCK_TCP);
6467

6568
// Set socket options before connecting
@@ -71,9 +74,12 @@ public function connect()
7174
'open_tcp_nodelay' => true,
7275
'tcp_keepalive' => $this->keepalive,
7376
'package_max_length' => 2 * 1024 * 1024, // 2MB max package
77+
'socket_buffer_size' => 2 * 1024 * 1024,
78+
'buffer_output_size' => 2 * 1024 * 1024,
7479
]);
7580

7681
if (!$this->sock->connect($this->host, $this->port)) {
82+
$this->close();
7783
throw new AMQPIOException(
7884
sprintf(
7985
'Error Connecting to server(%s): %s ',
@@ -93,7 +99,7 @@ public function connect()
9399
* @throws AMQPTimeoutException
94100
* @throws AMQPConnectionClosedException
95101
*/
96-
public function read($len)
102+
public function read($len): string
97103
{
98104
if ($this->sock === null) {
99105
throw new AMQPConnectionClosedException('Socket connection is closed');
@@ -115,27 +121,28 @@ public function read($len)
115121
// Read remaining bytes from socket
116122
while ($remaining > 0) {
117123
if (!$this->sock->connected) {
124+
$this->close();
118125
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
119126
}
120127

121128
// Swoole recv() returns false on error, empty string on EOF
122129
$chunk = $this->sock->recv($remaining, $this->read_timeout);
123130

124-
if ($chunk === false) {
131+
if ($chunk === '' || $chunk === false) {
132+
$this->close();
125133
if ($this->sock->errCode == SOCKET_ETIMEDOUT) {
126134
throw new AMQPTimeoutException('Read timeout');
127135
}
128-
throw new AMQPIOException(
129-
sprintf('Error receiving data: %s', swoole_strerror($this->sock->errCode)),
130-
$this->sock->errCode
131-
);
132-
}
133-
134-
if ($chunk === '') {
136+
if ($this->sock->errCode !== 0) {
137+
throw new AMQPIOException(
138+
sprintf('Error receiving data: %s', swoole_strerror($this->sock->errCode)),
139+
$this->sock->errCode
140+
);
141+
}
135142
throw new AMQPConnectionClosedException('Connection closed by peer');
136143
}
137144

138-
$data .= $chunk;
145+
$data .= $chunk;
139146
$remaining -= strlen($chunk);
140147
}
141148

@@ -148,30 +155,44 @@ public function read($len)
148155
* @throws AMQPRuntimeException
149156
* @throws AMQPTimeoutException
150157
* @throws AMQPConnectionClosedException
158+
* @throws AMQPIOException
151159
*/
152-
public function write($data)
160+
public function write($data): void
153161
{
154162
if ($this->sock === null || !$this->sock->connected) {
163+
$this->close();
155164
throw new AMQPConnectionClosedException('Socket connection is closed');
156165
}
157166

158167
$this->checkBrokerHeartbeat();
159168

160-
// Swoole send() handles partial writes internally
161-
$result = $this->sock->send($data);
169+
$totalLength = strlen($data);
170+
$offset = 0;
162171

163-
if ($result === false) {
164-
if ($this->sock->errCode == SOCKET_ETIMEDOUT) {
165-
throw new AMQPTimeoutException('Write timeout');
172+
while ($offset < $totalLength) {
173+
// Send remaining bytes; avoid extra substr() call for the first chunk
174+
$chunk = $offset === 0 ? $data : substr($data, $offset);
175+
176+
$sent = $this->sock->send($chunk);
177+
178+
if ($sent === false) {
179+
$this->close();
180+
if ($this->sock->errCode == SOCKET_ETIMEDOUT) {
181+
throw new AMQPTimeoutException('Write timeout');
182+
}
183+
throw new AMQPIOException(
184+
sprintf('Error sending data: %s', swoole_strerror($this->sock->errCode)),
185+
$this->sock->errCode
186+
);
166187
}
167-
throw new AMQPIOException(
168-
sprintf('Error sending data: %s', swoole_strerror($this->sock->errCode)),
169-
$this->sock->errCode
170-
);
171-
}
172188

173-
if ($result !== strlen($data)) {
174-
throw new AMQPIOException('Could not write entire buffer');
189+
if ($sent === 0) {
190+
// Peer closed the connection
191+
$this->close();
192+
throw new AMQPConnectionClosedException('Connection closed by peer while writing');
193+
}
194+
195+
$offset += $sent;
175196
}
176197

177198
$this->last_write = microtime(true);
@@ -180,7 +201,7 @@ public function write($data)
180201
/**
181202
* @return void
182203
*/
183-
public function close()
204+
public function close(): void
184205
{
185206
if ($this->sock !== null && $this->sock->connected) {
186207
$this->sock->close();
@@ -191,13 +212,21 @@ public function close()
191212
$this->buffer = '';
192213
}
193214

215+
/**
216+
* Ensure the socket is closed when the object is destroyed.
217+
*/
218+
public function __destruct()
219+
{
220+
$this->close();
221+
}
222+
194223
/**
195224
* @param int|null $sec
196225
* @param int $usec
197226
* @return int|bool
198227
* @throws AMQPConnectionClosedException
199228
*/
200-
protected function do_select(?int $sec, int $usec)
229+
protected function do_select(?int $sec, int $usec): bool|int
201230
{
202231
if ($this->sock === null || !$this->sock->connected) {
203232
throw new AMQPConnectionClosedException('Socket connection is closed');
@@ -222,12 +251,14 @@ protected function do_select(?int $sec, int $usec)
222251
}
223252
// Connection error
224253
if ($this->sock->errCode == SOCKET_ECONNRESET || !$this->sock->connected) {
254+
$this->close();
225255
throw new AMQPConnectionClosedException('Connection reset by peer');
226256
}
227257
return false; // Other error
228258
}
229259

230260
if ($data === '') {
261+
$this->close();
231262
throw new AMQPConnectionClosedException('Connection closed by peer');
232263
}
233264

@@ -239,7 +270,7 @@ protected function do_select(?int $sec, int $usec)
239270
/**
240271
* @return Client|null
241272
*/
242-
public function getSocket()
273+
public function getSocket(): ?Client
243274
{
244275
return $this->sock;
245276
}

0 commit comments

Comments
 (0)