|
6 | 6 | use Amp\Coroutine; |
7 | 7 | use Amp\Deferred; |
8 | 8 | use Amp\Promise; |
| 9 | +use function Amp\call; |
9 | 10 |
|
10 | 11 | abstract class AbstractPool implements Pool { |
11 | 12 | use CallableMaker; |
@@ -48,7 +49,7 @@ public function __construct() { |
48 | 49 | * {@inheritdoc} |
49 | 50 | */ |
50 | 51 | public function extractConnection(): Promise { |
51 | | - return \Amp\call(function () { |
| 52 | + return call(function () { |
52 | 53 | $connection = yield from $this->pop(); |
53 | 54 | $this->connections->detach($connection); |
54 | 55 | return $connection; |
@@ -86,8 +87,6 @@ protected function addConnection(Connection $connection) { |
86 | 87 | } |
87 | 88 |
|
88 | 89 | /** |
89 | | - * @coroutine |
90 | | - * |
91 | 90 | * @return \Generator |
92 | 91 | * |
93 | 92 | * @resolve \Amp\Postgres\Connection |
@@ -145,170 +144,158 @@ private function push(Connection $connection) { |
145 | 144 | * {@inheritdoc} |
146 | 145 | */ |
147 | 146 | public function query(string $sql): Promise { |
148 | | - return new Coroutine($this->doQuery($sql)); |
149 | | - } |
150 | | - |
151 | | - private function doQuery(string $sql): \Generator { |
152 | | - /** @var \Amp\Postgres\Connection $connection */ |
153 | | - $connection = yield from $this->pop(); |
| 147 | + return call(function () use ($sql) { |
| 148 | + /** @var \Amp\Postgres\Connection $connection */ |
| 149 | + $connection = yield from $this->pop(); |
154 | 150 |
|
155 | | - try { |
156 | | - $result = yield $connection->query($sql); |
157 | | - } catch (\Throwable $exception) { |
158 | | - $this->push($connection); |
159 | | - throw $exception; |
160 | | - } |
| 151 | + try { |
| 152 | + $result = yield $connection->query($sql); |
| 153 | + } catch (\Throwable $exception) { |
| 154 | + $this->push($connection); |
| 155 | + throw $exception; |
| 156 | + } |
161 | 157 |
|
162 | | - if ($result instanceof Operation) { |
163 | | - $result->onDestruct(function () use ($connection) { |
| 158 | + if ($result instanceof Operation) { |
| 159 | + $result->onDestruct(function () use ($connection) { |
| 160 | + $this->push($connection); |
| 161 | + }); |
| 162 | + } else { |
164 | 163 | $this->push($connection); |
165 | | - }); |
166 | | - } else { |
167 | | - $this->push($connection); |
168 | | - } |
| 164 | + } |
169 | 165 |
|
170 | | - return $result; |
| 166 | + return $result; |
| 167 | + }); |
171 | 168 | } |
172 | 169 |
|
173 | 170 | /** |
174 | 171 | * {@inheritdoc} |
175 | 172 | */ |
176 | 173 | public function execute(string $sql, array $params = []): Promise { |
177 | | - return new Coroutine($this->doExecute($sql, $params)); |
178 | | - } |
179 | | - |
180 | | - private function doExecute(string $sql, array $params): \Generator { |
181 | | - /** @var \Amp\Postgres\Connection $connection */ |
182 | | - $connection = yield from $this->pop(); |
| 174 | + return call(function () use ($sql, $params) { |
| 175 | + /** @var \Amp\Postgres\Connection $connection */ |
| 176 | + $connection = yield from $this->pop(); |
183 | 177 |
|
184 | | - try { |
185 | | - $result = yield $connection->execute($sql, $params); |
186 | | - } catch (\Throwable $exception) { |
187 | | - $this->push($connection); |
188 | | - throw $exception; |
189 | | - } |
| 178 | + try { |
| 179 | + $result = yield $connection->execute($sql, $params); |
| 180 | + } catch (\Throwable $exception) { |
| 181 | + $this->push($connection); |
| 182 | + throw $exception; |
| 183 | + } |
190 | 184 |
|
191 | | - if ($result instanceof Operation) { |
192 | | - $result->onDestruct(function () use ($connection) { |
| 185 | + if ($result instanceof Operation) { |
| 186 | + $result->onDestruct(function () use ($connection) { |
| 187 | + $this->push($connection); |
| 188 | + }); |
| 189 | + } else { |
193 | 190 | $this->push($connection); |
194 | | - }); |
195 | | - } else { |
196 | | - $this->push($connection); |
197 | | - } |
| 191 | + } |
198 | 192 |
|
199 | | - return $result; |
| 193 | + return $result; |
| 194 | + }); |
200 | 195 | } |
201 | 196 |
|
202 | 197 | /** |
203 | 198 | * {@inheritdoc} |
204 | 199 | */ |
205 | 200 | public function prepare(string $sql): Promise { |
206 | | - return new Coroutine($this->doPrepare($sql)); |
207 | | - } |
| 201 | + return call(function () use ($sql) { |
| 202 | + /** @var \Amp\Postgres\Connection $connection */ |
| 203 | + $connection = yield from $this->pop(); |
208 | 204 |
|
209 | | - private function doPrepare(string $sql): \Generator { |
210 | | - /** @var \Amp\Postgres\Connection $connection */ |
211 | | - $connection = yield from $this->pop(); |
| 205 | + try { |
| 206 | + /** @var \Amp\Postgres\Statement $statement */ |
| 207 | + $statement = yield $connection->prepare($sql); |
| 208 | + } catch (\Throwable $exception) { |
| 209 | + $this->push($connection); |
| 210 | + throw $exception; |
| 211 | + } |
212 | 212 |
|
213 | | - try { |
214 | | - /** @var \Amp\Postgres\Statement $statement */ |
215 | | - $statement = yield $connection->prepare($sql); |
216 | | - } catch (\Throwable $exception) { |
217 | | - $this->push($connection); |
218 | | - throw $exception; |
219 | | - } |
| 213 | + $statement->onDestruct(function () use ($connection) { |
| 214 | + $this->push($connection); |
| 215 | + }); |
220 | 216 |
|
221 | | - $statement->onDestruct(function () use ($connection) { |
222 | | - $this->push($connection); |
| 217 | + return $statement; |
223 | 218 | }); |
224 | | - |
225 | | - return $statement; |
226 | 219 | } |
227 | 220 |
|
228 | 221 | /** |
229 | 222 | * {@inheritdoc} |
230 | 223 | */ |
231 | 224 | public function notify(string $channel, string $payload = ""): Promise { |
232 | | - return new Coroutine($this->doNotify($channel, $payload)); |
233 | | - } |
234 | | - |
235 | | - private function doNotify(string $channel, string $payload): \Generator { |
236 | | - /** @var \Amp\Postgres\Connection $connection */ |
237 | | - $connection = yield from $this->pop(); |
| 225 | + return call(function () use ($channel, $payload) { |
| 226 | + /** @var \Amp\Postgres\Connection $connection */ |
| 227 | + $connection = yield from $this->pop(); |
238 | 228 |
|
239 | | - try { |
240 | | - $result = yield $connection->notify($channel, $payload); |
241 | | - } finally { |
242 | | - $this->push($connection); |
243 | | - } |
| 229 | + try { |
| 230 | + $result = yield $connection->notify($channel, $payload); |
| 231 | + } finally { |
| 232 | + $this->push($connection); |
| 233 | + } |
244 | 234 |
|
245 | | - return $result; |
| 235 | + return $result; |
| 236 | + }); |
246 | 237 | } |
247 | 238 |
|
248 | 239 | /** |
249 | 240 | * {@inheritdoc} |
250 | 241 | */ |
251 | 242 | public function listen(string $channel): Promise { |
252 | | - return new Coroutine($this->doListen($channel)); |
253 | | - } |
254 | | - |
255 | | - public function doListen(string $channel): \Generator { |
256 | | - ++$this->listenerCount; |
| 243 | + return call(function () use ($channel) { |
| 244 | + ++$this->listenerCount; |
257 | 245 |
|
258 | | - if ($this->listeningConnection === null) { |
259 | | - $this->listeningConnection = new Coroutine($this->pop()); |
260 | | - } |
261 | | - |
262 | | - if ($this->listeningConnection instanceof Promise) { |
263 | | - $this->listeningConnection = yield $this->listeningConnection; |
264 | | - } |
| 246 | + if ($this->listeningConnection === null) { |
| 247 | + $this->listeningConnection = new Coroutine($this->pop()); |
| 248 | + } |
265 | 249 |
|
266 | | - try { |
267 | | - /** @var \Amp\Postgres\Listener $listener */ |
268 | | - $listener = yield $this->listeningConnection->listen($channel); |
269 | | - } catch (\Throwable $exception) { |
270 | | - if (--$this->listenerCount === 0) { |
271 | | - $connection = $this->listeningConnection; |
272 | | - $this->listeningConnection = null; |
273 | | - $this->push($connection); |
| 250 | + if ($this->listeningConnection instanceof Promise) { |
| 251 | + $this->listeningConnection = yield $this->listeningConnection; |
274 | 252 | } |
275 | | - throw $exception; |
276 | | - } |
277 | 253 |
|
278 | | - $listener->onDestruct(function () { |
279 | | - if (--$this->listenerCount === 0) { |
280 | | - $connection = $this->listeningConnection; |
281 | | - $this->listeningConnection = null; |
282 | | - $this->push($connection); |
| 254 | + try { |
| 255 | + /** @var \Amp\Postgres\Listener $listener */ |
| 256 | + $listener = yield $this->listeningConnection->listen($channel); |
| 257 | + } catch (\Throwable $exception) { |
| 258 | + if (--$this->listenerCount === 0) { |
| 259 | + $connection = $this->listeningConnection; |
| 260 | + $this->listeningConnection = null; |
| 261 | + $this->push($connection); |
| 262 | + } |
| 263 | + throw $exception; |
283 | 264 | } |
284 | | - }); |
285 | 265 |
|
286 | | - return $listener; |
| 266 | + $listener->onDestruct(function () { |
| 267 | + if (--$this->listenerCount === 0) { |
| 268 | + $connection = $this->listeningConnection; |
| 269 | + $this->listeningConnection = null; |
| 270 | + $this->push($connection); |
| 271 | + } |
| 272 | + }); |
| 273 | + |
| 274 | + return $listener; |
| 275 | + }); |
287 | 276 | } |
288 | 277 |
|
289 | 278 | /** |
290 | 279 | * {@inheritdoc} |
291 | 280 | */ |
292 | 281 | public function transaction(int $isolation = Transaction::COMMITTED): Promise { |
293 | | - return new Coroutine($this->doTransaction($isolation)); |
294 | | - } |
| 282 | + return call(function () use ($isolation) { |
| 283 | + /** @var \Amp\Postgres\Connection $connection */ |
| 284 | + $connection = yield from $this->pop(); |
295 | 285 |
|
296 | | - private function doTransaction(int $isolation = Transaction::COMMITTED): \Generator { |
297 | | - /** @var \Amp\Postgres\Connection $connection */ |
298 | | - $connection = yield from $this->pop(); |
| 286 | + try { |
| 287 | + /** @var \Amp\Postgres\Transaction $transaction */ |
| 288 | + $transaction = yield $connection->transaction($isolation); |
| 289 | + } catch (\Throwable $exception) { |
| 290 | + $this->push($connection); |
| 291 | + throw $exception; |
| 292 | + } |
299 | 293 |
|
300 | | - try { |
301 | | - /** @var \Amp\Postgres\Transaction $transaction */ |
302 | | - $transaction = yield $connection->transaction($isolation); |
303 | | - } catch (\Throwable $exception) { |
304 | | - $this->push($connection); |
305 | | - throw $exception; |
306 | | - } |
| 294 | + $transaction->onDestruct(function () use ($connection) { |
| 295 | + $this->push($connection); |
| 296 | + }); |
307 | 297 |
|
308 | | - $transaction->onDestruct(function () use ($connection) { |
309 | | - $this->push($connection); |
| 298 | + return $transaction; |
310 | 299 | }); |
311 | | - |
312 | | - return $transaction; |
313 | 300 | } |
314 | 301 | } |
0 commit comments