Skip to content

Commit 06a894e

Browse files
author
Oliver Stark
committed
Process pool implemented to restrict concurrency
1 parent 15633e1 commit 06a894e

File tree

6 files changed

+297
-124
lines changed

6 files changed

+297
-124
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [1.3.0] - 2018-02-22
6+
### Changed
7+
- Process pool implemented to restrict concurrency
8+
- Concurrency configurable via `ASYNC_QUEUE_CONCURRENCY` ENV var (default: 2)
9+
- Lifetime of pool configurable via `ASYNC_QUEUE_POOL_LIFETIME` ENV var (default: 3600 seconds)
10+
511
## [1.2.0] - 2018-02-19
612
### Changed
713
- Prevent multiple background processes

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "ostark/craft-async-queue",
33
"description": "A queue handler that moves queue execution to a non-blocking background process",
44
"type": "craft-plugin",
5-
"version": "1.2.0",
5+
"version": "1.3.0",
66
"keywords": [
77
"craft",
88
"cms",

src/Plugin.php

Lines changed: 63 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,20 @@
44
*
55
* A queue handler that moves queue execution to a non-blocking background process
66
*
7-
* @link http://www.fortrabbit.com
7+
* @link https://www.fortrabbit.com
88
* @copyright Copyright (c) 2017 Oliver Stark
99
*/
1010

1111
namespace ostark\AsyncQueue;
1212

13-
1413
use Craft;
1514
use craft\base\Plugin as BasePlugin;
1615
use craft\queue\BaseJob;
1716
use craft\queue\Command;
1817
use craft\queue\Queue;
19-
use Symfony\Component\Process\Process;
20-
use Symfony\Component\Process\PhpExecutableFinder;
2118
use yii\base\ActionEvent;
2219
use yii\base\Event;
20+
use yii\caching\CacheInterface;
2321
use yii\queue\PushEvent;
2422

2523

@@ -33,152 +31,94 @@
3331
*/
3432
class Plugin extends BasePlugin
3533
{
36-
37-
const LOCK_NAME = 'async-queue-lock';
38-
const LOCK_TIMEOUT = 60;
39-
40-
/**
41-
* @var bool mutex
42-
*/
43-
protected $inProgress = false;
44-
4534
/**
4635
* Init plugin
4736
*/
4837
public function init()
4938
{
50-
5139
parent::init();
5240

53-
Event::on(Command::class, Command::EVENT_AFTER_ACTION, function(ActionEvent $event) {
54-
if ('run' === $event->action->id) {
55-
$this->setInProgress(false);
41+
// Register plugin components
42+
$this->setComponents([
43+
'async_handler' => QueueHandler::class,
44+
'async_pool' => QueuePool::class,
45+
]);
46+
47+
// Tell yii about the concrete implementation of CacheInterface
48+
Craft::$container->set(CacheInterface::class, Craft::$app->getCache());
49+
50+
// EventHandlers
51+
// =========================================================================
52+
53+
PushEvent::on(
54+
Queue::class,
55+
Queue::EVENT_AFTER_PUSH,
56+
function (PushEvent $event) {
57+
// Disable frontend queue runner
58+
Craft::$app->getConfig()->getGeneral()->runQueueAutomatically = false;
59+
60+
// Run queue in the background
61+
if ($this->getPool()->canIUse()) {
62+
$this->getHandler()->startBackgroundProcess();
63+
$this->getPool()->increment();
64+
$handled = true;
65+
}
66+
67+
// Log what's going on
68+
$this->logPushEvent($event, $handled ?? false);
5669
}
57-
});
58-
59-
// Listen to
60-
PushEvent::on(Queue::class, Queue::EVENT_AFTER_PUSH, function (PushEvent $event) {
61-
62-
// Disable frontend queue runner
63-
Craft::$app->getConfig()->getGeneral()->runQueueAutomatically = false;
64-
65-
if ($event->job instanceof BaseJob) {
66-
Craft::trace(
67-
Craft::t(
68-
'async-queue',
69-
'Handling PushEvent for {job} job', ['job' => $event->job->getDescription()]
70-
),
71-
__METHOD__
72-
);
73-
}
74-
75-
// Run queue in the background
76-
$this->startBackgroundProcess();
77-
});
70+
);
7871

72+
Event::on(
73+
Command::class,
74+
Command::EVENT_AFTER_ACTION,
75+
function (ActionEvent $event) {
76+
if ('run' === $event->action->id) {
77+
$this->getPool()->decrement();
78+
}
79+
}
80+
);
7981
}
8082

8183

84+
// ServiceLocators
85+
// =========================================================================
86+
8287
/**
83-
* Runs craft queue/run in the background
84-
*
85-
* @return bool
88+
* @return \ostark\AsyncQueue\QueueHandler
8689
*/
87-
protected function startBackgroundProcess()
90+
public function getHandler(): QueueHandler
8891
{
89-
if ($this->isInProgress()) {
90-
return false;
91-
}
92-
93-
$cmd = $this->getCommand();
94-
$cwd = CRAFT_BASE_PATH;
95-
96-
$process = new Process($cmd, $cwd);
97-
98-
try {
99-
$process->run();
100-
$this->setInProgress(true);
101-
} catch (\Exception $e) {
102-
Craft::error($e, __METHOD__);
103-
}
104-
105-
Craft::trace(
106-
Craft::t(
107-
'async-queue',
108-
'Job status: {status}. Exit code: {code}', ['status' => $process->getStatus(), 'code' => $process->getExitCodeText()]
109-
),
110-
__METHOD__
111-
);
112-
113-
return true;
92+
return $this->get('async_handler');
11493
}
11594

116-
11795
/**
118-
* Construct queue command
119-
*
120-
* @return string
96+
* @return \ostark\AsyncQueue\QueuePool
12197
*/
122-
protected function getCommand()
98+
public function getPool(): QueuePool
12399
{
124-
$executableFinder = new PhpExecutableFinder();
125-
if (false === $php = $executableFinder->find(false)) {
126-
return null;
127-
} else {
128-
$cmd = array_merge(
129-
[$php],
130-
$executableFinder->findArguments(),
131-
['craft', 'queue/run -v']
132-
);
133-
134-
return $this->getBackgroundCommand(implode(' ', $cmd));
135-
}
100+
return $this->get('async_pool');
136101
}
137102

138103

139104
/**
140-
* Extend command with background syntax
141-
*
142-
* @param string $cmd
143-
*
144-
* @return string
105+
* @param \yii\queue\PushEvent $event
106+
* @param bool $handled
145107
*/
146-
protected function getBackgroundCommand(string $cmd): string
147-
{
148-
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
149-
return 'start /B ' . $cmd . ' > NUL';
150-
} else {
151-
return 'nice ' . $cmd . ' > /dev/null 2>&1 &';
152-
}
153-
}
154-
155-
protected function isInProgress()
156-
{
157-
if (\Craft::$app->getCache()->get(self::LOCK_NAME)) {
158-
Craft::trace(
159-
Craft::t(
160-
'async-queue',
161-
'Background process running'
162-
),
163-
__METHOD__
164-
);
165-
return true;
166-
}
167-
168-
return false;
169-
}
170-
171-
protected function setInProgress($progress = true)
108+
protected function logPushEvent(PushEvent $event, $handled = false)
172109
{
173-
// set
174-
if ($progress) {
175-
return \Craft::$app->getCache()->set(self::LOCK_NAME, self::LOCK_NAME, self::LOCK_TIMEOUT);
110+
if ($event->job instanceof BaseJob) {
111+
Craft::trace(
112+
Craft::t(
113+
'async-queue',
114+
'New PushEvent for {job} job - ({handled})', [
115+
'job' => $event->job->getDescription(),
116+
'handled' => $handled ? 'handled' : 'skipped'
117+
]
118+
),
119+
__METHOD__
120+
);
176121
}
177-
178-
// remove
179-
return \Craft::$app->getCache()->delete(self::LOCK_NAME);
180-
181122
}
182123

183-
184124
}

src/QueueHandler.php

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php namespace ostark\AsyncQueue;
2+
3+
use Craft;
4+
use Symfony\Component\Process\Process;
5+
use Symfony\Component\Process\PhpExecutableFinder;
6+
7+
/**
8+
* QueueHandler
9+
*
10+
* @author Oliver Stark
11+
* @package AsyncQueue
12+
* @since 1.3.0
13+
*
14+
*/
15+
class QueueHandler
16+
{
17+
18+
/**
19+
* Runs craft queue/run in the background
20+
*/
21+
public function startBackgroundProcess()
22+
{
23+
$cmd = $this->getCommand();
24+
$cwd = CRAFT_BASE_PATH;
25+
26+
$process = new Process($cmd, $cwd);
27+
28+
try {
29+
$process->run();
30+
} catch (\Exception $e) {
31+
Craft::error($e, __METHOD__);
32+
}
33+
34+
Craft::trace(
35+
Craft::t(
36+
'async-queue',
37+
'Job status: {status}. Exit code: {code}', ['status' => $process->getStatus(), 'code' => $process->getExitCodeText()]
38+
),
39+
__METHOD__
40+
);
41+
42+
}
43+
44+
45+
/**
46+
* Construct queue command
47+
*
48+
* @return string
49+
*/
50+
protected function getCommand()
51+
{
52+
$executableFinder = new PhpExecutableFinder();
53+
if (false === $php = $executableFinder->find(false)) {
54+
return null;
55+
} else {
56+
$cmd = array_merge(
57+
[$php],
58+
$executableFinder->findArguments(),
59+
['craft', 'queue/run -v']
60+
);
61+
62+
return $this->getBackgroundCommand(implode(' ', $cmd));
63+
}
64+
}
65+
66+
67+
/**
68+
* Extend command with background syntax
69+
*
70+
* @param string $cmd
71+
*
72+
* @return string
73+
*/
74+
protected function getBackgroundCommand(string $cmd): string
75+
{
76+
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
77+
return 'start /B ' . $cmd . ' > NUL';
78+
} else {
79+
return 'nice ' . $cmd . ' > /dev/null 2>&1 &';
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)