Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions config/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
return (function () {
$inContainer = in_container();
$progressTimeCheck = fn(int $v, int $d): int => 0 === $v || $v >= 180 ? $v : $d;
$progressToMS = fn(int $v): int => $v < 60 ? $v * 1000 : 60000;
$progressToMS = fn(int $v): int => $v < 60 ? $v * 1_000 : 60_000;
$tokenExpiry = max(1, (int) env('WS_AUTH_TOKEN_EXPIRY', 2 * 24 * 60 * 60));
$defaultRefreshWindow = max(60, min(24 * 60 * 60, max(60, intdiv($tokenExpiry, 4))));
$logsPruneAfter = (string) env('WS_LOGS_PRUNE_AFTER', '-7 DAYS');
Expand Down Expand Up @@ -298,7 +298,7 @@
'opcache.enable' => 1,
'opcache.memory_consumption' => 128,
'opcache.interned_strings_buffer' => 16,
'opcache.max_accelerated_files' => 10000,
'opcache.max_accelerated_files' => 10_000,
'opcache.max_wasted_percentage' => 5,
'opcache.validate_timestamps' => $inContainer ? 0 : 1,
'expose_php' => 0,
Expand Down Expand Up @@ -416,6 +416,19 @@

$config['events'] = [
'logfile' => ag($config, 'tmpDir') . '/logs/events.' . $logDateFormat . '.log',
'queue' => [
'driver' => env('WS_EVENTS_QUEUE_DRIVER', 'auto'),
'path' => env('WS_EVENTS_QUEUE_PATH', fn() => ag($config, 'tmpDir') . '/queue/events'),
'file' => [
'claim_after_seconds' => (int) env('WS_EVENTS_QUEUE_FILE_CLAIM_AFTER_SECONDS', 300),
],
'redis' => [
'stream' => env('WS_EVENTS_QUEUE_REDIS_STREAM', 'watchstate:events'),
'group' => env('WS_EVENTS_QUEUE_REDIS_GROUP', 'watchstate'),
'consumer' => env('WS_EVENTS_QUEUE_REDIS_CONSUMER', 'ws-worker'),
'claim_after_ms' => (int) env('WS_EVENTS_QUEUE_REDIS_CLAIM_AFTER_MS', 300_000),
],
],
'listeners' => [
'cache' => new DateInterval(env('WS_EVENTS_LISTENERS_CACHE', 'PT1M')),
'file' => env('APP_EVENTS_FILE', function () use ($config): ?string {
Expand Down
1 change: 1 addition & 0 deletions config/directories.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
'{path}/users',
'{tmp_dir}/logs',
'{tmp_dir}/cache',
'{tmp_dir}/queue/events',
'{tmp_dir}/profiler',
'{tmp_dir}/webhooks',
'{tmp_dir}/debug',
Expand Down
44 changes: 44 additions & 0 deletions config/env.spec.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?php

declare(strict_types=1);

/**
* Last update: 2025-07-13
*
Expand Down Expand Up @@ -195,6 +197,48 @@
'description' => 'Where to store cache data. This is usually used if the cache server is not available and/or experiencing issues.',
'type' => 'string',
],
[
'key' => 'WS_EVENTS_QUEUE_DRIVER',
'config' => 'events.queue.driver',
'description' => 'The event queue transport driver. Supported values are auto, redis, file.',
'type' => 'string',
],
[
'key' => 'WS_EVENTS_QUEUE_PATH',
'config' => 'events.queue.path',
'description' => 'Where to store filesystem event queue payloads.',
'type' => 'string',
],
[
'key' => 'WS_EVENTS_QUEUE_REDIS_STREAM',
'config' => 'events.queue.redis.stream',
'description' => 'Redis stream key used for queued events.',
'type' => 'string',
],
[
'key' => 'WS_EVENTS_QUEUE_FILE_CLAIM_AFTER_SECONDS',
'config' => 'events.queue.file.claim_after_seconds',
'description' => 'How old a claimed filesystem queued event must be before another dispatcher can reclaim it.',
'type' => 'int',
],
[
'key' => 'WS_EVENTS_QUEUE_REDIS_GROUP',
'config' => 'events.queue.redis.group',
'description' => 'Redis consumer group used for queued events.',
'type' => 'string',
],
[
'key' => 'WS_EVENTS_QUEUE_REDIS_CONSUMER',
'config' => 'events.queue.redis.consumer',
'description' => 'Redis consumer name used by this WatchState instance.',
'type' => 'string',
],
[
'key' => 'WS_EVENTS_QUEUE_REDIS_CLAIM_AFTER_MS',
'config' => 'events.queue.redis.claim_after_ms',
'description' => 'How old a pending Redis stream event must be before another dispatcher can reclaim it.',
'type' => 'int',
],
[
'key' => 'WS_LOGGER_SYSLOG_FACILITY',
'config' => 'logger.syslog.facility',
Expand Down
50 changes: 48 additions & 2 deletions config/services.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@

use App\Libs\Config;
use App\Libs\ConfigFile;
use App\Libs\Container;
use App\Libs\Database\DatabaseInterface as iDB;
use App\Libs\Database\DBLayer;
use App\Libs\Database\PDO\PDOAdapter;
use App\Libs\Database\PdoFactory;
use App\Libs\Entity\StateEntity;
use App\Libs\Entity\StateInterface;
use App\Libs\Events\EventQueue;
use App\Libs\Events\Queue\ArrayEventTransport;
use App\Libs\Events\Queue\EventTransportInterface;
use App\Libs\Events\Queue\FilesystemEventTransport;
use App\Libs\Events\Queue\NullEventTransport;
use App\Libs\Events\Queue\RedisStreamEventTransport;
use App\Libs\Exceptions\RuntimeException;
use App\Libs\Extends\ConsoleOutput;
use App\Libs\Extends\HttpClient;
Expand Down Expand Up @@ -105,10 +111,50 @@
'class' => fn() => new QueueRequests(),
],

EventTransportInterface::class => [
'class' => function (): EventTransportInterface {
$driver = strtolower((string) Config::get('events.queue.driver', 'auto'));

if ('null' === $driver) {
return new NullEventTransport();
}

if ('array' === $driver || true === (defined('IN_TEST_MODE') && true === IN_TEST_MODE) && 'auto' === $driver) {
return new ArrayEventTransport();
}

if ('file' === $driver) {
return new FilesystemEventTransport(
path: (string) Config::get('events.queue.path'),
claimAfterSeconds: (int) Config::get('events.queue.file.claim_after_seconds', 300),
);
}

try {
return new RedisStreamEventTransport(
redis: Container::get(Redis::class),
stream: (string) Config::get('events.queue.redis.stream'),
group: (string) Config::get('events.queue.redis.group'),
consumer: (string) Config::get('events.queue.redis.consumer'),
claimAfterMs: (int) Config::get('events.queue.redis.claim_after_ms', 300_000),
);
} catch (Throwable $e) {
if ('redis' === $driver) {
throw $e;
}

return new FilesystemEventTransport(
path: (string) Config::get('events.queue.path'),
claimAfterSeconds: (int) Config::get('events.queue.file.claim_after_seconds', 300),
);
}
},
],

EventQueue::class => [
'class' => fn(iCache $cache, EventsRepository $repo): EventQueue => new EventQueue($cache, $repo),
'class' => fn(EventTransportInterface $transport, EventsRepository $repo): EventQueue => new EventQueue($transport, $repo),
'args' => [
iCache::class,
EventTransportInterface::class,
EventsRepository::class,
],
],
Expand Down
5 changes: 3 additions & 2 deletions container/files/redis.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ loglevel nothing
# Persistence
dbfilename redis.rdb
dir /config/cache/
appendonly no
appendonly yes
appendfilename appendonly.aof
appendfsync everysec
save 900 1
save 300 10
save 60 10000

# Arbitrary Parameters
maxmemory-policy allkeys-lru
maxmemory-policy noeviction
slowlog-log-slower-than 10000
slowlog-max-len 128
notify-keyspace-events ""
Expand Down
20 changes: 8 additions & 12 deletions src/API/WebHook.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,24 @@

namespace App\API;

use App\Commands\System\TasksCommand;
use App\Libs\Attributes\Route\Route;
use App\Libs\Config;
use App\Libs\Enums\Http\Status;
use App\Libs\Middlewares\AuthorizationMiddleware;
use App\Libs\Options;
use App\Libs\Uri;
use App\Listeners\ProcessWebhookEvent;
use DateInterval;
use Psr\Http\Message\ResponseInterface as iResponse;
use Psr\Http\Message\ServerRequestInterface as iRequest;
use Psr\SimpleCache\CacheInterface as iCache;

final class WebHook
{
public const string URL = '%{api.prefix}/webhook';

public function __construct(
private readonly iCache $cache,
) {}
public function __construct()
{
set_time_limit(0);
}

/**
* Receive a webhook request from a backend.
Expand All @@ -48,12 +46,10 @@ public function __invoke(iRequest $request): iResponse
array_flip([AuthorizationMiddleware::KEY_NAME, AuthorizationMiddleware::TOKEN_NAME]),
);

$opts = [];

if (true === (bool) $this->cache->get(TasksCommand::CACHE_NAME, false)) {
$opts[Options::CACHE_ONLY] = true;
$opts[Options::CACHE_TTL] = new DateInterval('PT6H');
}
$opts = [
Options::FAIL_FAST_ON_LOCK => true,
Options::QUEUE_ONLY => true,
];

$post = $request->getParsedBody();
$data = [
Expand Down
53 changes: 53 additions & 0 deletions src/Commands/Events/DispatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
use App\Libs\Attributes\Route\Cli;
use App\Libs\Config;
use App\Libs\Events\DataEvent;
use App\Libs\Events\EventQueue;
use App\Libs\Events\Queue\EventEnvelope;
use App\Libs\Events\Queue\EventTransportInterface as iEventTransport;
use App\Libs\Extends\ProxyHandler;
use App\Libs\Options;
use App\Model\Events\Event;
Expand Down Expand Up @@ -49,6 +52,8 @@ final class DispatchCommand extends Command
public function __construct(
private readonly iDispatcher $dispatcher,
private readonly EventsRepository $repo,
private readonly EventQueue $queue,
private readonly iEventTransport $transport,
private readonly iCache $cache,
private iLogger $logger,
) {
Expand All @@ -67,6 +72,7 @@ protected function configure(): void

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->drainTransport((int) $input->getOption('limit'));
$this->unloadEvents();

register_events();
Expand All @@ -92,6 +98,53 @@ protected function execute(InputInterface $input, OutputInterface $output): int
return $this->runEvents(visibleLevel: $visibleLevel, limit: (int) $input->getOption('limit'), debug: $debug);
}

private function drainTransport(int $limit): void
{
foreach ($this->transport->dequeue($limit) as $envelope) {
try {
$item = $this->queue->materialize($envelope);
$this->transport->ack($envelope);
$this->logger->info("Materialized queued event '{event}'.", [
'event_name' => 'event.queue.materialized',
'subsystem' => 'events',
'operation' => 'queue.materialize',
'outcome' => 'success',
'event' => $envelope->event,
'queue_id' => $envelope->id,
'item_id' => $item->id,
]);
} catch (Throwable $e) {
$this->handleDrainFailure($envelope, $e);
}
}
}

private function handleDrainFailure(EventEnvelope $envelope, Throwable $e): void
{
$isLock = false !== stripos($e->getMessage(), 'database is locked');

if (true === $isLock) {
$this->transport->release($envelope);
$outcome = 'retry';
$reason = 'database_locked';
} else {
$this->transport->fail($envelope);
$outcome = 'failed';
$reason = 'materialize_failed';
}

$this->logger->error("Failed to materialize queued event '{event}'.", [
'event_name' => 'event.queue.materialize_failed',
'subsystem' => 'events',
'operation' => 'queue.materialize',
'outcome' => $outcome,
'reason' => $reason,
'event' => $envelope->event,
'queue_id' => $envelope->id,
...exception_log($e),
]);
}

protected function runEvents(Level $visibleLevel, int $limit, bool $debug = false): int
{
$repo = $this->repo
Expand Down
Loading
Loading