|
4 | 4 | *
|
5 | 5 | * A queue handler that moves queue execution to a non-blocking background process
|
6 | 6 | *
|
7 |
| - * @link http://www.fortrabbit.com |
| 7 | + * @link https://www.fortrabbit.com |
8 | 8 | * @copyright Copyright (c) 2017 Oliver Stark
|
9 | 9 | */
|
10 | 10 |
|
11 | 11 | namespace ostark\AsyncQueue;
|
12 | 12 |
|
13 |
| - |
14 | 13 | use Craft;
|
15 | 14 | use craft\base\Plugin as BasePlugin;
|
16 | 15 | use craft\queue\BaseJob;
|
17 | 16 | use craft\queue\Command;
|
18 | 17 | use craft\queue\Queue;
|
19 |
| -use Symfony\Component\Process\Process; |
20 |
| -use Symfony\Component\Process\PhpExecutableFinder; |
21 | 18 | use yii\base\ActionEvent;
|
22 | 19 | use yii\base\Event;
|
| 20 | +use yii\caching\CacheInterface; |
23 | 21 | use yii\queue\PushEvent;
|
24 | 22 |
|
25 | 23 |
|
|
33 | 31 | */
|
34 | 32 | class Plugin extends BasePlugin
|
35 | 33 | {
|
36 |
| - |
37 |
| - const LOCK_NAME = 'async-queue-lock'; |
38 |
| - const LOCK_TIMEOUT = 60; |
39 |
| - |
40 |
| - /** |
41 |
| - * @var bool mutex |
42 |
| - */ |
43 |
| - protected $inProgress = false; |
44 |
| - |
45 | 34 | /**
|
46 | 35 | * Init plugin
|
47 | 36 | */
|
48 | 37 | public function init()
|
49 | 38 | {
|
50 |
| - |
51 | 39 | parent::init();
|
52 | 40 |
|
53 |
| - Event::on(Command::class, Command::EVENT_AFTER_ACTION, function(ActionEvent $event) { |
54 |
| - if ('run' === $event->action->id) { |
55 |
| - $this->setInProgress(false); |
| 41 | + // Register plugin components |
| 42 | + $this->setComponents([ |
| 43 | + 'async_handler' => QueueHandler::class, |
| 44 | + 'async_pool' => QueuePool::class, |
| 45 | + ]); |
| 46 | + |
| 47 | + // Tell yii about the concrete implementation of CacheInterface |
| 48 | + Craft::$container->set(CacheInterface::class, Craft::$app->getCache()); |
| 49 | + |
| 50 | + // EventHandlers |
| 51 | + // ========================================================================= |
| 52 | + |
| 53 | + PushEvent::on( |
| 54 | + Queue::class, |
| 55 | + Queue::EVENT_AFTER_PUSH, |
| 56 | + function (PushEvent $event) { |
| 57 | + // Disable frontend queue runner |
| 58 | + Craft::$app->getConfig()->getGeneral()->runQueueAutomatically = false; |
| 59 | + |
| 60 | + // Run queue in the background |
| 61 | + if ($this->getPool()->canIUse()) { |
| 62 | + $this->getHandler()->startBackgroundProcess(); |
| 63 | + $this->getPool()->increment(); |
| 64 | + $handled = true; |
| 65 | + } |
| 66 | + |
| 67 | + // Log what's going on |
| 68 | + $this->logPushEvent($event, $handled ?? false); |
56 | 69 | }
|
57 |
| - }); |
58 |
| - |
59 |
| - // Listen to |
60 |
| - PushEvent::on(Queue::class, Queue::EVENT_AFTER_PUSH, function (PushEvent $event) { |
61 |
| - |
62 |
| - // Disable frontend queue runner |
63 |
| - Craft::$app->getConfig()->getGeneral()->runQueueAutomatically = false; |
64 |
| - |
65 |
| - if ($event->job instanceof BaseJob) { |
66 |
| - Craft::trace( |
67 |
| - Craft::t( |
68 |
| - 'async-queue', |
69 |
| - 'Handling PushEvent for {job} job', ['job' => $event->job->getDescription()] |
70 |
| - ), |
71 |
| - __METHOD__ |
72 |
| - ); |
73 |
| - } |
74 |
| - |
75 |
| - // Run queue in the background |
76 |
| - $this->startBackgroundProcess(); |
77 |
| - }); |
| 70 | + ); |
78 | 71 |
|
| 72 | + Event::on( |
| 73 | + Command::class, |
| 74 | + Command::EVENT_AFTER_ACTION, |
| 75 | + function (ActionEvent $event) { |
| 76 | + if ('run' === $event->action->id) { |
| 77 | + $this->getPool()->decrement(); |
| 78 | + } |
| 79 | + } |
| 80 | + ); |
79 | 81 | }
|
80 | 82 |
|
81 | 83 |
|
| 84 | + // ServiceLocators |
| 85 | + // ========================================================================= |
| 86 | + |
82 | 87 | /**
|
83 |
| - * Runs craft queue/run in the background |
84 |
| - * |
85 |
| - * @return bool |
| 88 | + * @return \ostark\AsyncQueue\QueueHandler |
86 | 89 | */
|
87 |
| - protected function startBackgroundProcess() |
| 90 | + public function getHandler(): QueueHandler |
88 | 91 | {
|
89 |
| - if ($this->isInProgress()) { |
90 |
| - return false; |
91 |
| - } |
92 |
| - |
93 |
| - $cmd = $this->getCommand(); |
94 |
| - $cwd = CRAFT_BASE_PATH; |
95 |
| - |
96 |
| - $process = new Process($cmd, $cwd); |
97 |
| - |
98 |
| - try { |
99 |
| - $process->run(); |
100 |
| - $this->setInProgress(true); |
101 |
| - } catch (\Exception $e) { |
102 |
| - Craft::error($e, __METHOD__); |
103 |
| - } |
104 |
| - |
105 |
| - Craft::trace( |
106 |
| - Craft::t( |
107 |
| - 'async-queue', |
108 |
| - 'Job status: {status}. Exit code: {code}', ['status' => $process->getStatus(), 'code' => $process->getExitCodeText()] |
109 |
| - ), |
110 |
| - __METHOD__ |
111 |
| - ); |
112 |
| - |
113 |
| - return true; |
| 92 | + return $this->get('async_handler'); |
114 | 93 | }
|
115 | 94 |
|
116 |
| - |
117 | 95 | /**
|
118 |
| - * Construct queue command |
119 |
| - * |
120 |
| - * @return string |
| 96 | + * @return \ostark\AsyncQueue\QueuePool |
121 | 97 | */
|
122 |
| - protected function getCommand() |
| 98 | + public function getPool(): QueuePool |
123 | 99 | {
|
124 |
| - $executableFinder = new PhpExecutableFinder(); |
125 |
| - if (false === $php = $executableFinder->find(false)) { |
126 |
| - return null; |
127 |
| - } else { |
128 |
| - $cmd = array_merge( |
129 |
| - [$php], |
130 |
| - $executableFinder->findArguments(), |
131 |
| - ['craft', 'queue/run -v'] |
132 |
| - ); |
133 |
| - |
134 |
| - return $this->getBackgroundCommand(implode(' ', $cmd)); |
135 |
| - } |
| 100 | + return $this->get('async_pool'); |
136 | 101 | }
|
137 | 102 |
|
138 | 103 |
|
139 | 104 | /**
|
140 |
| - * Extend command with background syntax |
141 |
| - * |
142 |
| - * @param string $cmd |
143 |
| - * |
144 |
| - * @return string |
| 105 | + * @param \yii\queue\PushEvent $event |
| 106 | + * @param bool $handled |
145 | 107 | */
|
146 |
| - protected function getBackgroundCommand(string $cmd): string |
147 |
| - { |
148 |
| - if (defined('PHP_WINDOWS_VERSION_BUILD')) { |
149 |
| - return 'start /B ' . $cmd . ' > NUL'; |
150 |
| - } else { |
151 |
| - return 'nice ' . $cmd . ' > /dev/null 2>&1 &'; |
152 |
| - } |
153 |
| - } |
154 |
| - |
155 |
| - protected function isInProgress() |
156 |
| - { |
157 |
| - if (\Craft::$app->getCache()->get(self::LOCK_NAME)) { |
158 |
| - Craft::trace( |
159 |
| - Craft::t( |
160 |
| - 'async-queue', |
161 |
| - 'Background process running' |
162 |
| - ), |
163 |
| - __METHOD__ |
164 |
| - ); |
165 |
| - return true; |
166 |
| - } |
167 |
| - |
168 |
| - return false; |
169 |
| - } |
170 |
| - |
171 |
| - protected function setInProgress($progress = true) |
| 108 | + protected function logPushEvent(PushEvent $event, $handled = false) |
172 | 109 | {
|
173 |
| - // set |
174 |
| - if ($progress) { |
175 |
| - return \Craft::$app->getCache()->set(self::LOCK_NAME, self::LOCK_NAME, self::LOCK_TIMEOUT); |
| 110 | + if ($event->job instanceof BaseJob) { |
| 111 | + Craft::trace( |
| 112 | + Craft::t( |
| 113 | + 'async-queue', |
| 114 | + 'New PushEvent for {job} job - ({handled})', [ |
| 115 | + 'job' => $event->job->getDescription(), |
| 116 | + 'handled' => $handled ? 'handled' : 'skipped' |
| 117 | + ] |
| 118 | + ), |
| 119 | + __METHOD__ |
| 120 | + ); |
176 | 121 | }
|
177 |
| - |
178 |
| - // remove |
179 |
| - return \Craft::$app->getCache()->delete(self::LOCK_NAME); |
180 |
| - |
181 | 122 | }
|
182 | 123 |
|
183 |
| - |
184 | 124 | }
|
0 commit comments