diff --git a/src/Client/ScheduleClient.php b/src/Client/ScheduleClient.php index d97a0002c..9df8f95ea 100644 --- a/src/Client/ScheduleClient.php +++ b/src/Client/ScheduleClient.php @@ -32,6 +32,11 @@ use Temporal\DataConverter\DataConverter; use Temporal\DataConverter\DataConverterInterface; use Temporal\Internal\Mapper\ScheduleMapper; +use Temporal\Plugin\ConnectionPluginContext; +use Temporal\Plugin\ConnectionPluginInterface; +use Temporal\Plugin\PluginRegistry; +use Temporal\Plugin\ScheduleClientPluginContext; +use Temporal\Plugin\ScheduleClientPluginInterface; use Temporal\Internal\Marshaller\Mapper\AttributeMapperFactory; use Temporal\Internal\Marshaller\Marshaller; use Temporal\Internal\Marshaller\MarshallerInterface; @@ -45,14 +50,37 @@ final class ScheduleClient implements ScheduleClientInterface private DataConverterInterface $converter; private MarshallerInterface $marshaller; private ProtoToArrayConverter $protoConverter; + private PluginRegistry $pluginRegistry; public function __construct( ServiceClientInterface $serviceClient, ?ClientOptions $options = null, ?DataConverterInterface $converter = null, + ?PluginRegistry $pluginRegistry = null, ) { $this->clientOptions = $options ?? new ClientOptions(); $this->converter = $converter ?? DataConverter::createDefault(); + $this->pluginRegistry = $pluginRegistry ?? new PluginRegistry(); + + // Apply connection plugins (before client-level configuration) + $connectionContext = new ConnectionPluginContext($serviceClient); + foreach ($this->pluginRegistry->getPlugins(ConnectionPluginInterface::class) as $plugin) { + $plugin->configureServiceClient($connectionContext); + } + $serviceClient = $connectionContext->getServiceClient(); + + $pluginContext = new ScheduleClientPluginContext( + clientOptions: $this->clientOptions, + dataConverter: $this->converter, + ); + foreach ($this->pluginRegistry->getPlugins(ScheduleClientPluginInterface::class) as $plugin) { + $plugin->configureScheduleClient($pluginContext); + } + $this->clientOptions = $pluginContext->getClientOptions(); + if ($pluginContext->getDataConverter() !== null) { + $this->converter = $pluginContext->getDataConverter(); + } + $this->marshaller = new Marshaller( new AttributeMapperFactory(new AttributeReader()), ); @@ -71,8 +99,9 @@ public static function create( ServiceClientInterface $serviceClient, ?ClientOptions $options = null, ?DataConverterInterface $converter = null, + ?PluginRegistry $pluginRegistry = null, ): ScheduleClientInterface { - return new self($serviceClient, $options, $converter); + return new self($serviceClient, $options, $converter, $pluginRegistry); } public function createSchedule( diff --git a/src/Client/WorkflowClient.php b/src/Client/WorkflowClient.php index e982fa5e9..066a51e0f 100644 --- a/src/Client/WorkflowClient.php +++ b/src/Client/WorkflowClient.php @@ -38,6 +38,14 @@ use Temporal\Interceptor\WorkflowClientCallsInterceptor; use Temporal\Internal\Client\ActivityCompletionClient; use Temporal\Internal\Client\WorkflowProxy; +use Temporal\Plugin\ClientPluginContext; +use Temporal\Plugin\ClientPluginInterface; +use Temporal\Plugin\CompositePipelineProvider; +use Temporal\Plugin\ConnectionPluginContext; +use Temporal\Plugin\ConnectionPluginInterface; +use Temporal\Plugin\PluginRegistry; +use Temporal\Plugin\ScheduleClientPluginInterface; +use Temporal\Plugin\WorkerPluginInterface; use Temporal\Internal\Client\WorkflowRun; use Temporal\Internal\Client\WorkflowStarter; use Temporal\Internal\Client\WorkflowStub; @@ -63,6 +71,7 @@ class WorkflowClient implements WorkflowClientInterface private DataConverterInterface $converter; private ?WorkflowStarter $starter = null; private WorkflowReader $reader; + private PluginRegistry $pluginRegistry; /** @var Pipeline */ private Pipeline $interceptorPipeline; @@ -72,11 +81,39 @@ public function __construct( ?ClientOptions $options = null, ?DataConverterInterface $converter = null, ?PipelineProvider $interceptorProvider = null, + ?PluginRegistry $pluginRegistry = null, ) { - $this->interceptorPipeline = ($interceptorProvider ?? new SimplePipelineProvider()) - ->getPipeline(WorkflowClientCallsInterceptor::class); + $this->pluginRegistry = $pluginRegistry ?? new PluginRegistry(); $this->clientOptions = $options ?? new ClientOptions(); $this->converter = $converter ?? DataConverter::createDefault(); + + // Apply connection plugins (before client-level configuration) + $connectionContext = new ConnectionPluginContext($serviceClient); + foreach ($this->pluginRegistry->getPlugins(ConnectionPluginInterface::class) as $plugin) { + $plugin->configureServiceClient($connectionContext); + } + $serviceClient = $connectionContext->getServiceClient(); + + $pluginContext = new ClientPluginContext( + clientOptions: $this->clientOptions, + dataConverter: $this->converter, + ); + foreach ($this->pluginRegistry->getPlugins(ClientPluginInterface::class) as $plugin) { + $plugin->configureClient($pluginContext); + } + + $this->clientOptions = $pluginContext->getClientOptions(); + if ($pluginContext->getDataConverter() !== null) { + $this->converter = $pluginContext->getDataConverter(); + } + + // Build interceptor pipeline: merge plugin-contributed interceptors with user-provided ones + $provider = new CompositePipelineProvider( + $pluginContext->getInterceptors(), + $interceptorProvider ?? new SimplePipelineProvider(), + ); + + $this->interceptorPipeline = $provider->getPipeline(WorkflowClientCallsInterceptor::class); $this->reader = new WorkflowReader($this->createReader()); // Set Temporal-Namespace metadata @@ -96,8 +133,29 @@ public static function create( ?ClientOptions $options = null, ?DataConverterInterface $converter = null, ?PipelineProvider $interceptorProvider = null, + ?PluginRegistry $pluginRegistry = null, ): self { - return new self($serviceClient, $options, $converter, $interceptorProvider); + return new self($serviceClient, $options, $converter, $interceptorProvider, $pluginRegistry); + } + + /** + * Get plugins that also implement WorkerPluginInterface for propagation to workers. + * + * @return list + */ + public function getWorkerPlugins(): array + { + return $this->pluginRegistry->getPlugins(WorkerPluginInterface::class); + } + + /** + * Get plugins that also implement ScheduleClientPluginInterface for propagation to schedule clients. + * + * @return list + */ + public function getScheduleClientPlugins(): array + { + return $this->pluginRegistry->getPlugins(ScheduleClientPluginInterface::class); } public function getServiceClient(): ServiceClientInterface diff --git a/src/Interceptor/SimplePipelineProvider.php b/src/Interceptor/SimplePipelineProvider.php index 56360f6a1..ba0f1ab1d 100644 --- a/src/Interceptor/SimplePipelineProvider.php +++ b/src/Interceptor/SimplePipelineProvider.php @@ -22,14 +22,28 @@ class SimplePipelineProvider implements PipelineProvider * @param array $interceptors */ public function __construct( - private iterable $interceptors = [], + private readonly iterable $interceptors = [], ) {} + /** + * Create a new provider with additional interceptors prepended. + * + * @param list $interceptors Interceptors to prepend before existing ones. + */ + public function withPrependedInterceptors(array $interceptors): self + { + if ($interceptors === []) { + return $this; + } + + return new self(\array_merge($interceptors, [...$this->interceptors])); + } + public function getPipeline(string $interceptorClass): Pipeline { return $this->cache[$interceptorClass] ??= Pipeline::prepare( \array_filter( - $this->interceptors, + [...$this->interceptors], static fn(Interceptor $i): bool => $i instanceof $interceptorClass, ), ); diff --git a/src/Internal/Transport/Router/GetWorkerInfo.php b/src/Internal/Transport/Router/GetWorkerInfo.php index 915141de7..596c214d4 100644 --- a/src/Internal/Transport/Router/GetWorkerInfo.php +++ b/src/Internal/Transport/Router/GetWorkerInfo.php @@ -18,6 +18,8 @@ use Temporal\Internal\Declaration\Prototype\WorkflowPrototype; use Temporal\Internal\Marshaller\MarshallerInterface; use Temporal\Internal\Repository\RepositoryInterface; +use Temporal\Plugin\PluginInterface; +use Temporal\Plugin\PluginRegistry; use Temporal\Worker\ServiceCredentials; use Temporal\Worker\Transport\Command\ServerRequestInterface; use Temporal\Worker\WorkerInterface; @@ -28,6 +30,7 @@ public function __construct( private readonly RepositoryInterface $queues, private readonly MarshallerInterface $marshaller, private readonly ServiceCredentials $credentials, + private readonly PluginRegistry $pluginRegistry, ) {} public function handle(ServerRequestInterface $request, array $headers, Deferred $resolver): void @@ -54,6 +57,10 @@ private function workerToArray(WorkerInterface $worker): array 'Name' => $activity->getID(), ]; + $map = $this->map($this->pluginRegistry->getPlugins(PluginInterface::class), static fn(PluginInterface $plugin): array => [ + 'Name' => $plugin->getName(), + 'Version' => null, + ]); return [ 'TaskQueue' => $worker->getID(), 'Options' => $this->marshaller->marshal($worker->getOptions()), @@ -62,6 +69,7 @@ private function workerToArray(WorkerInterface $worker): array // ActivityInfo[] 'Activities' => $this->map($worker->getActivities(), $activityMap), 'PhpSdkVersion' => SdkVersion::getSdkVersion(), + 'Plugins' => $map, 'Flags' => (object) $this->prepareFlags(), ]; } diff --git a/src/Plugin/AbstractPlugin.php b/src/Plugin/AbstractPlugin.php new file mode 100644 index 000000000..2b2e6d10a --- /dev/null +++ b/src/Plugin/AbstractPlugin.php @@ -0,0 +1,34 @@ +name; + } +} diff --git a/src/Plugin/ClientPluginContext.php b/src/Plugin/ClientPluginContext.php new file mode 100644 index 000000000..fad098ecd --- /dev/null +++ b/src/Plugin/ClientPluginContext.php @@ -0,0 +1,81 @@ + */ + private array $interceptors = []; + + public function __construct( + private ClientOptions $clientOptions, + private ?DataConverterInterface $dataConverter = null, + ) {} + + public function getClientOptions(): ClientOptions + { + return $this->clientOptions; + } + + public function setClientOptions(ClientOptions $clientOptions): self + { + $this->clientOptions = $clientOptions; + return $this; + } + + public function getDataConverter(): ?DataConverterInterface + { + return $this->dataConverter; + } + + public function setDataConverter(?DataConverterInterface $dataConverter): self + { + $this->dataConverter = $dataConverter; + return $this; + } + + /** + * @return list + */ + public function getInterceptors(): array + { + return $this->interceptors; + } + + /** + * @param list $interceptors + */ + public function setInterceptors(array $interceptors): self + { + $this->interceptors = $interceptors; + return $this; + } + + /** + * Add an interceptor to the client pipeline. + */ + public function addInterceptor(Interceptor $interceptor): self + { + $this->interceptors[] = $interceptor; + return $this; + } +} diff --git a/src/Plugin/ClientPluginInterface.php b/src/Plugin/ClientPluginInterface.php new file mode 100644 index 000000000..c7eb36878 --- /dev/null +++ b/src/Plugin/ClientPluginInterface.php @@ -0,0 +1,30 @@ + $pluginInterceptors Interceptors contributed by plugins. + * @param PipelineProvider $baseProvider The original user-provided pipeline provider. + */ + public function __construct( + array $pluginInterceptors, + PipelineProvider $baseProvider, + ) { + $this->delegate = match (true) { + $pluginInterceptors === [] => $baseProvider, + $baseProvider instanceof SimplePipelineProvider => $baseProvider->withPrependedInterceptors($pluginInterceptors), + default => new class($pluginInterceptors, $baseProvider) implements PipelineProvider { + /** @var array */ + private array $cache = []; + + /** + * @param list $pluginInterceptors + */ + public function __construct( + private readonly array $pluginInterceptors, + private readonly PipelineProvider $baseProvider, + ) {} + + public function getPipeline(string $interceptorClass): Pipeline + { + if (isset($this->cache[$interceptorClass])) { + return $this->cache[$interceptorClass]; + } + + $filtered = \array_filter( + $this->pluginInterceptors, + static fn(Interceptor $i): bool => $i instanceof $interceptorClass, + ); + + if ($filtered === []) { + return $this->cache[$interceptorClass] = $this->baseProvider->getPipeline($interceptorClass); + } + + // Use only plugin interceptors - the base pipeline is lost in this edge case. + // Users should either use plugins OR a custom PipelineProvider, not both. + return $this->cache[$interceptorClass] = Pipeline::prepare($filtered); + } + }, + }; + } + + public function getPipeline(string $interceptorClass): Pipeline + { + return $this->delegate->getPipeline($interceptorClass); + } +} diff --git a/src/Plugin/ConnectionPluginContext.php b/src/Plugin/ConnectionPluginContext.php new file mode 100644 index 000000000..0108e1581 --- /dev/null +++ b/src/Plugin/ConnectionPluginContext.php @@ -0,0 +1,39 @@ +serviceClient; + } + + public function setServiceClient(ServiceClientInterface $serviceClient): self + { + $this->serviceClient = $serviceClient; + return $this; + } +} diff --git a/src/Plugin/ConnectionPluginInterface.php b/src/Plugin/ConnectionPluginInterface.php new file mode 100644 index 000000000..4caa6f7fe --- /dev/null +++ b/src/Plugin/ConnectionPluginInterface.php @@ -0,0 +1,36 @@ + */ + private array $plugins = []; + + /** + * @param iterable $plugins + */ + public function __construct(iterable $plugins = []) + { + foreach ($plugins as $plugin) { + $this->add($plugin); + } + } + + public function add(ConnectionPluginInterface|ClientPluginInterface|ScheduleClientPluginInterface|WorkerPluginInterface $plugin): void + { + $name = $plugin->getName(); + if (isset($this->plugins[$name])) { + throw new \RuntimeException(\sprintf( + 'Duplicate plugin "%s": a plugin with this name is already registered.', + $name, + )); + } + $this->plugins[$name] = $plugin; + } + + /** + * Merge another set of plugins. Throws on duplicate names. + * + * @param iterable $plugins + */ + public function merge(iterable $plugins): void + { + foreach ($plugins as $plugin) { + $this->add($plugin); + } + } + + /** + * Get all plugins implementing a given interface. + * + * @template T of TPlugin + * @param class-string $interface + * @return list + */ + public function getPlugins(string $interface): array + { + $result = []; + foreach ($this->plugins as $plugin) { + if ($plugin instanceof $interface) { + $result[] = $plugin; + } + } + return $result; + } +} diff --git a/src/Plugin/ScheduleClientPluginContext.php b/src/Plugin/ScheduleClientPluginContext.php new file mode 100644 index 000000000..c38ce683e --- /dev/null +++ b/src/Plugin/ScheduleClientPluginContext.php @@ -0,0 +1,51 @@ +clientOptions; + } + + public function setClientOptions(ClientOptions $clientOptions): self + { + $this->clientOptions = $clientOptions; + return $this; + } + + public function getDataConverter(): ?DataConverterInterface + { + return $this->dataConverter; + } + + public function setDataConverter(?DataConverterInterface $dataConverter): self + { + $this->dataConverter = $dataConverter; + return $this; + } +} diff --git a/src/Plugin/ScheduleClientPluginInterface.php b/src/Plugin/ScheduleClientPluginInterface.php new file mode 100644 index 000000000..c070d5665 --- /dev/null +++ b/src/Plugin/ScheduleClientPluginInterface.php @@ -0,0 +1,30 @@ +dataConverter; + } + + public function setDataConverter(?DataConverterInterface $dataConverter): self + { + $this->dataConverter = $dataConverter; + return $this; + } +} diff --git a/src/Plugin/WorkerPluginContext.php b/src/Plugin/WorkerPluginContext.php new file mode 100644 index 000000000..9c36311d2 --- /dev/null +++ b/src/Plugin/WorkerPluginContext.php @@ -0,0 +1,87 @@ + */ + private array $interceptors = []; + + public function __construct( + private readonly string $taskQueue, + private WorkerOptions $workerOptions, + private ?ExceptionInterceptorInterface $exceptionInterceptor = null, + ) {} + + public function getTaskQueue(): string + { + return $this->taskQueue; + } + + public function getWorkerOptions(): WorkerOptions + { + return $this->workerOptions; + } + + public function setWorkerOptions(WorkerOptions $workerOptions): self + { + $this->workerOptions = $workerOptions; + return $this; + } + + public function getExceptionInterceptor(): ?ExceptionInterceptorInterface + { + return $this->exceptionInterceptor; + } + + public function setExceptionInterceptor(?ExceptionInterceptorInterface $exceptionInterceptor): self + { + $this->exceptionInterceptor = $exceptionInterceptor; + return $this; + } + + /** + * @return list + */ + public function getInterceptors(): array + { + return $this->interceptors; + } + + /** + * @param list $interceptors + */ + public function setInterceptors(array $interceptors): self + { + $this->interceptors = $interceptors; + return $this; + } + + /** + * Add an interceptor to the worker pipeline. + */ + public function addInterceptor(Interceptor $interceptor): self + { + $this->interceptors[] = $interceptor; + return $this; + } +} diff --git a/src/Plugin/WorkerPluginInterface.php b/src/Plugin/WorkerPluginInterface.php new file mode 100644 index 000000000..e496d6979 --- /dev/null +++ b/src/Plugin/WorkerPluginInterface.php @@ -0,0 +1,74 @@ +close(); + * } + * } + * ``` + * + * @param callable(WorkerFactoryInterface): int $next Calls the next plugin or the actual run loop. + */ + public function run(WorkerFactoryInterface $factory, callable $next): int; +} diff --git a/src/Plugin/WorkerPluginTrait.php b/src/Plugin/WorkerPluginTrait.php new file mode 100644 index 000000000..a2750f4f6 --- /dev/null +++ b/src/Plugin/WorkerPluginTrait.php @@ -0,0 +1,43 @@ +converter = $dataConverter; + $this->pluginRegistry = $pluginRegistry ?? new PluginRegistry(); + // Propagate worker plugins from the client + if ($client !== null) { + $this->pluginRegistry->merge($client->getWorkerPlugins()); + } + + // Apply worker factory plugins + $factoryContext = new WorkerFactoryPluginContext( + dataConverter: $dataConverter, + ); + foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) { + $plugin->configureWorkerFactory($factoryContext); + } + + $this->converter = $factoryContext->getDataConverter() ?? $dataConverter; $this->boot($credentials ?? ServiceCredentials::create()); } @@ -118,11 +142,15 @@ public static function create( ?DataConverterInterface $converter = null, ?RPCConnectionInterface $rpc = null, ?ServiceCredentials $credentials = null, + ?PluginRegistry $pluginRegistry = null, + ?WorkflowClient $client = null, ): static { return new static( $converter ?? DataConverter::createDefault(), $rpc ?? Goridge::create(), $credentials, + $pluginRegistry ?? new PluginRegistry(), + $client, ); } @@ -134,13 +162,32 @@ public function newWorker( ?LoggerInterface $logger = null, ): WorkerInterface { $options ??= WorkerOptions::new(); + + // Apply worker plugins + $workerContext = new WorkerPluginContext( + taskQueue: $taskQueue, + workerOptions: $options, + exceptionInterceptor: $exceptionInterceptor, + ); + foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) { + $plugin->configureWorker($workerContext); + } + + $options = $workerContext->getWorkerOptions(); + + // Merge plugin-contributed interceptors with user-provided ones + $provider = new CompositePipelineProvider( + $workerContext->getInterceptors(), + $interceptorProvider ?? new SimplePipelineProvider(), + ); + $worker = new Worker( $taskQueue, $options, ServiceContainer::fromWorkerFactory( $this, - $exceptionInterceptor ?? ExceptionInterceptor::createDefault(), - $interceptorProvider ?? new SimplePipelineProvider(), + $workerContext->getExceptionInterceptor() ?? ExceptionInterceptor::createDefault(), + $provider, new Logger( $logger ?? new StderrLogger(), $options->enableLoggingInReplay, @@ -149,11 +196,22 @@ public function newWorker( ), $this->rpc, ); + + // Call initializeWorker hooks (forward order) + foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) { + $plugin->initializeWorker($worker); + } + $this->queues->add($worker); return $worker; } + public function getPluginRegistry(): PluginRegistry + { + return $this->pluginRegistry; + } + public function getReader(): ReaderInterface { return $this->reader; @@ -192,15 +250,22 @@ public function run(?HostConnectionInterface $host = null): int $host ??= RoadRunner::create(); $this->codec = $this->createCodec(); - while ($msg = $host->waitBatch()) { - try { - $host->send($this->dispatch($msg->messages, $msg->context)); - } catch (\Throwable $e) { - $host->error($e); + $pipeline = Pipeline::prepare( + $this->pluginRegistry->getPlugins(WorkerPluginInterface::class), + ); + + return $pipeline->with(function () use ($host): int { + while ($msg = $host->waitBatch()) { + try { + $host->send($this->dispatch($msg->messages, $msg->context)); + } catch (\Throwable $e) { + $host->error($e); + } } - } - return 0; + return 0; + /** @see WorkerPluginInterface::run() */ + }, 'run')($this); } public function tick(): void @@ -232,7 +297,12 @@ protected function createTaskQueue(): RepositoryInterface protected function createRouter(ServiceCredentials $credentials): RouterInterface { $router = new Router(); - $router->add(new Router\GetWorkerInfo($this->queues, $this->marshaller, $credentials)); + $router->add(new Router\GetWorkerInfo( + $this->queues, + $this->marshaller, + $credentials, + $this->pluginRegistry, + )); return $router; } diff --git a/testing/src/Environment.php b/testing/src/Environment.php index 2fa8119ee..6260f473a 100644 --- a/testing/src/Environment.php +++ b/testing/src/Environment.php @@ -21,12 +21,14 @@ final class Environment private ?Process $temporalTestServerProcess = null; private ?Process $temporalServerProcess = null; private ?Process $roadRunnerProcess = null; + private bool $externalTemporalProcessActive = false; public function __construct( OutputInterface $output, private Downloader $downloader, private SystemInfo $systemInfo, ?Command $command = null, + private bool $allowExternalTemporalProcess = false, ) { $this->io = $output instanceof SymfonyStyle ? $output @@ -37,6 +39,7 @@ public function __construct( public static function create(?Command $command = null): self { $token = \getenv('GITHUB_TOKEN'); + $allowExternalTemporalProcess = \getenv('ALLOW_EXTERNAL_TEMPORAL_PROCESS') === 'true'; $systemInfo = SystemInfo::detect(); \is_string(\getenv('ROADRUNNER_BINARY')) and $systemInfo->rrExecutable = \getenv('ROADRUNNER_BINARY'); @@ -50,6 +53,7 @@ public static function create(?Command $command = null): self ])), $systemInfo, $command, + $allowExternalTemporalProcess, ); } @@ -139,17 +143,23 @@ public function startTemporalServer( } if (!$temporalStarted || !$this->temporalServerProcess->isRunning()) { - $this->io->error([ - \sprintf( - 'Error starting Temporal server: %s.', - !$temporalStarted ? "Health check failed" : $this->temporalServerProcess->getErrorOutput(), - ), - \sprintf( - 'Command: `%s`.', - $this->serializeProcess($this->temporalServerProcess), - ), - ]); - exit(1); + $errorOutput = $this->temporalServerProcess->getErrorOutput(); + if (!$this->allowExternalTemporalProcess || !\str_contains($errorOutput, 'address already in use')) { + $this->io->error([ + \sprintf( + 'Error starting Temporal server: %s.', + !$temporalStarted ? "Health check failed" : $errorOutput, + ), + \sprintf( + 'Command: `%s`.', + $this->serializeProcess($this->temporalServerProcess), + ), + ]); + exit(1); + } + $this->io->warning('Using external Temporal Server'); + + $this->externalTemporalProcessActive = true; } $this->io->info('Temporal server started.'); } @@ -174,17 +184,21 @@ public function startTemporalTestServer(int $commandTimeout = 10): void \sleep(1); if (!$this->temporalTestServerProcess->isRunning()) { - $this->io->error([ - \sprintf( - 'Error starting Temporal Test server: %s.', - $this->temporalTestServerProcess->getErrorOutput(), - ), - \sprintf( - 'Command: `%s`.', - $this->serializeProcess($this->temporalTestServerProcess), - ), - ]); - exit(1); + $errorOutput = $this->temporalTestServerProcess->getErrorOutput(); + if (!$this->allowExternalTemporalProcess || !\str_contains($errorOutput, 'address already in use')) { + $this->io->error([ + \sprintf( + 'Error starting Temporal Test server: %s.', + $errorOutput, + ), + \sprintf( + 'Command: `%s`.', + $this->serializeProcess($this->temporalTestServerProcess), + ), + ]); + exit(1); + } + $this->io->warning('Using external Temporal Test Server'); } $this->io->info('Temporal Test server started.'); } @@ -263,8 +277,7 @@ public function stopTemporalServer(): void { if ($this->isTemporalRunning()) { $this->io->info('Stopping Temporal server... '); - $this->temporalServerProcess->stop(); - $this->temporalServerProcess = null; + $this->stopTemporalServerProcess(); $this->io->info('Temporal server stopped.'); } } @@ -273,8 +286,7 @@ public function stopTemporalTestServer(): void { if ($this->isTemporalTestRunning()) { $this->io->info('Stopping Temporal Test server... '); - $this->temporalTestServerProcess->stop(); - $this->temporalTestServerProcess = null; + $this->stopTemporalTestServerProcess(); $this->io->info('Temporal Test server stopped.'); } } @@ -291,7 +303,8 @@ public function stopRoadRunner(): void public function isTemporalRunning(): bool { - return $this->temporalServerProcess?->isRunning() === true; + return ($this->allowExternalTemporalProcess && $this->externalTemporalProcessActive) || + $this->temporalServerProcess?->isRunning() === true; } public function isRoadRunnerRunning(): bool @@ -301,7 +314,28 @@ public function isRoadRunnerRunning(): bool public function isTemporalTestRunning(): bool { - return $this->temporalTestServerProcess?->isRunning() === true; + return ($this->allowExternalTemporalProcess && $this->externalTemporalProcessActive) || + $this->temporalTestServerProcess?->isRunning() === true; + } + + private function stopTemporalTestServerProcess(): void + { + if ($this->externalTemporalProcessActive) { + $this->externalTemporalProcessActive = false; + return; + } + $this->temporalTestServerProcess->stop(); + $this->temporalTestServerProcess = null; + } + + private function stopTemporalServerProcess(): void + { + if ($this->externalTemporalProcessActive) { + $this->externalTemporalProcessActive = false; + return; + } + $this->temporalServerProcess->stop(); + $this->temporalServerProcess = null; } private function serializeProcess(?Process $temporalServerProcess): string|array diff --git a/testing/src/WorkerFactory.php b/testing/src/WorkerFactory.php index 181b3bca0..41691c980 100644 --- a/testing/src/WorkerFactory.php +++ b/testing/src/WorkerFactory.php @@ -6,6 +6,7 @@ use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; +use Temporal\Client\WorkflowClient; use Temporal\DataConverter\DataConverter; use Temporal\DataConverter\DataConverterInterface; use Temporal\Exception\ExceptionInterceptor; @@ -14,6 +15,10 @@ use Temporal\Interceptor\SimplePipelineProvider; use Temporal\Internal\ServiceContainer; use Temporal\Internal\Workflow\Logger; +use Temporal\Plugin\CompositePipelineProvider; +use Temporal\Plugin\PluginRegistry; +use Temporal\Plugin\WorkerPluginContext; +use Temporal\Plugin\WorkerPluginInterface; use Temporal\Worker\ActivityInvocationCache\ActivityInvocationCacheInterface; use Temporal\Worker\ActivityInvocationCache\RoadRunnerActivityInvocationCache; use Temporal\Worker\ServiceCredentials; @@ -32,16 +37,20 @@ public function __construct( RPCConnectionInterface $rpc, ActivityInvocationCacheInterface $activityCache, ?ServiceCredentials $credentials = null, + ?PluginRegistry $pluginRegistry = null, + ?WorkflowClient $client = null, ) { $this->activityCache = $activityCache; - parent::__construct($dataConverter, $rpc, $credentials ?? ServiceCredentials::create()); + parent::__construct($dataConverter, $rpc, $credentials ?? ServiceCredentials::create(), $pluginRegistry, $client); } public static function create( ?DataConverterInterface $converter = null, ?RPCConnectionInterface $rpc = null, ?ServiceCredentials $credentials = null, + ?PluginRegistry $pluginRegistry = null, + ?WorkflowClient $client = null, ?ActivityInvocationCacheInterface $activityCache = null, ): static { return new static( @@ -49,6 +58,8 @@ public static function create( $rpc ?? Goridge::create(), $activityCache ?? RoadRunnerActivityInvocationCache::create($converter), $credentials, + $pluginRegistry ?? new PluginRegistry(), + $client, ); } @@ -60,14 +71,32 @@ public function newWorker( ?LoggerInterface $logger = null, ): WorkerInterface { $options ??= WorkerOptions::new(); + + $workerContext = new WorkerPluginContext( + taskQueue: $taskQueue, + workerOptions: $options, + exceptionInterceptor: $exceptionInterceptor, + ); + foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) { + $plugin->configureWorker($workerContext); + } + + $options = $workerContext->getWorkerOptions(); + + // Merge plugin-contributed interceptors with user-provided ones + $provider = new CompositePipelineProvider( + $workerContext->getInterceptors(), + $interceptorProvider ?? new SimplePipelineProvider(), + ); + $worker = new WorkerMock( new Worker( $taskQueue, - $options ?? WorkerOptions::new(), + $options, ServiceContainer::fromWorkerFactory( $this, - $exceptionInterceptor ?? ExceptionInterceptor::createDefault(), - $interceptorProvider ?? new SimplePipelineProvider(), + $workerContext->getExceptionInterceptor() ?? ExceptionInterceptor::createDefault(), + $provider, new Logger( $logger ?? new NullLogger(), $options->enableLoggingInReplay, @@ -78,6 +107,12 @@ public function newWorker( ), $this->activityCache, ); + + // Call initializeWorker hooks (forward order) + foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) { + $plugin->initializeWorker($worker); + } + $this->queues->add($worker); return $worker; diff --git a/tests/Acceptance/App/Attribute/Worker.php b/tests/Acceptance/App/Attribute/Worker.php index 819806921..87e572313 100644 --- a/tests/Acceptance/App/Attribute/Worker.php +++ b/tests/Acceptance/App/Attribute/Worker.php @@ -6,6 +6,7 @@ use Psr\Log\LoggerInterface; use Temporal\Interceptor\PipelineProvider; +use Temporal\Plugin\PluginInterface; use Temporal\Worker\WorkerOptions; /** @@ -22,10 +23,12 @@ final class Worker * @param array|null $options Callable that returns {@see WorkerOptions} * @param array|null $pipelineProvider Callable that returns {@see PipelineProvider} * @param array|null $logger Callable that returns {@see LoggerInterface} + * @param array|null $plugins */ public function __construct( public readonly ?array $options = null, public readonly ?array $pipelineProvider = null, public readonly ?array $logger = null, + public readonly ?array $plugins = null, ) {} } diff --git a/tests/Acceptance/App/Feature/WorkerFactory.php b/tests/Acceptance/App/Feature/WorkerFactory.php index b6c007a71..c2c14f741 100644 --- a/tests/Acceptance/App/Feature/WorkerFactory.php +++ b/tests/Acceptance/App/Feature/WorkerFactory.php @@ -35,16 +35,19 @@ public function createWorker( ...$feature->workflows, ...$feature->activities, ); - if ($attr !== null) { - $attr->options === null or $options = $this->invoker->invoke($attr->options); - $attr->pipelineProvider === null or $interceptorProvider = $this->invoker->invoke($attr->pipelineProvider); - $attr->logger === null or $logger = $this->invoker->invoke($attr->logger); + $options = $attr?->options === null ? null : $this->invoker->invoke($attr->options); + $interceptorProvider = $attr?->pipelineProvider === null ? null : $this->invoker->invoke($attr->pipelineProvider); + $logger = $attr?->logger === null ? null : $this->invoker->invoke($attr->logger); + + // Add plugins from the attribute to the factory's registry (already instantiated, no invoker needed) + if ($attr?->plugins !== null) { + $this->workerFactory->getPluginRegistry()->merge($attr->plugins); } return $this->workerFactory->newWorker( $feature->taskQueue, $options ?? WorkerOptions::new()->withMaxConcurrentActivityExecutionSize(10), - interceptorProvider: $interceptorProvider ?? null, + interceptorProvider: $interceptorProvider, logger: $logger ?? LoggerFactory::createServerLogger($feature->taskQueue), ); } @@ -53,7 +56,7 @@ public function createWorker( * Find {@see Worker} attribute in the classes collection. * If more than one attribute is found, an exception is thrown. */ - private static function findAttribute(string ...$classes): ?Worker + public static function findAttribute(string ...$classes): ?Worker { $classes = \array_unique($classes); /** @var array $found */ diff --git a/tests/Acceptance/App/TestCase.php b/tests/Acceptance/App/TestCase.php index 7f3c726ff..e6f4675a6 100644 --- a/tests/Acceptance/App/TestCase.php +++ b/tests/Acceptance/App/TestCase.php @@ -11,9 +11,15 @@ use Spiral\Core\Scope; use Temporal\Api\Enums\V1\EventType; use Temporal\Api\Failure\V1\Failure; +use Temporal\Client\ClientOptions; +use Temporal\Client\WorkflowClient; use Temporal\Client\WorkflowClientInterface; use Temporal\Client\WorkflowStubInterface; use Temporal\Exception\TemporalException; +use Temporal\Plugin\ClientPluginInterface; +use Temporal\Plugin\PluginRegistry; +use Temporal\Tests\Acceptance\App\Attribute\Worker; +use Temporal\Tests\Acceptance\App\Feature\WorkerFactory; use Temporal\Tests\Acceptance\App\Logger\ClientLogger; use Temporal\Tests\Acceptance\App\Logger\LoggerFactory; use Temporal\Tests\Acceptance\App\Runtime\ContainerFacade; @@ -45,14 +51,33 @@ protected function runTest(): mixed $logger = LoggerFactory::createClientLogger($feature->taskQueue); $logger->clear(); + // Build scope bindings + $bindings = [ + Feature::class => $feature, + static::class => $this, + State::class => $runtime, + LoggerInterface::class => ClientLogger::class, + ClientLogger::class => $logger, + ]; + + // Auto-inject plugin-configured client from #[Worker(plugins: [...])] attribute + $workerAttr = WorkerFactory::findAttribute(static::class); + if ($workerAttr?->plugins !== null) { + $pluginRegistry = new PluginRegistry($workerAttr->plugins); + $clientPlugins = $pluginRegistry->getPlugins(ClientPluginInterface::class); + if ($clientPlugins !== []) { + $existingClient = $container->get(WorkflowClientInterface::class); + $pluginClient = WorkflowClient::create( + serviceClient: $existingClient->getServiceClient(), + options: (new ClientOptions())->withNamespace($runtime->namespace), + pluginRegistry: new PluginRegistry($workerAttr->plugins), + ); + $bindings[WorkflowClientInterface::class] = $pluginClient; + } + } + return $container->runScope( - new Scope(name: 'feature', bindings: [ - Feature::class => $feature, - static::class => $this, - State::class => $runtime, - LoggerInterface::class => ClientLogger::class, - ClientLogger::class => $logger, - ]), + new Scope(name: 'feature', bindings: $bindings), function (Container $container): mixed { $reflection = new \ReflectionMethod($this, $this->name()); $args = $container->resolveArguments($reflection); diff --git a/tests/Acceptance/Extra/Plugin/ClientPluginTest.php b/tests/Acceptance/Extra/Plugin/ClientPluginTest.php new file mode 100644 index 000000000..53f460164 --- /dev/null +++ b/tests/Acceptance/Extra/Plugin/ClientPluginTest.php @@ -0,0 +1,180 @@ +getServiceClient(), + options: (new ClientOptions())->withNamespace($runtime->namespace), + pluginRegistry: new PluginRegistry([new PrefixPlugin()]), + )->withTimeout(5); + + $stub = $pluginClient->newUntypedWorkflowStub( + 'Extra_Plugin_ClientPlugin', + WorkflowOptions::new()->withTaskQueue($feature->taskQueue), + ); + $pluginClient->start($stub, 'hello'); + + $result = $stub->getResult('string'); + self::assertSame('plugin:hello', $result); + } + + /** + * Multiple plugins apply interceptors in registration order. + */ + #[Test] + public function multiplePluginsApplyInOrder( + WorkflowClientInterface $client, + Feature $feature, + State $runtime, + ): void { + $pluginClient = WorkflowClient::create( + serviceClient: $client->getServiceClient(), + options: (new ClientOptions())->withNamespace($runtime->namespace), + pluginRegistry: new PluginRegistry([new PrefixPlugin('A:'), new PrefixPlugin2('B:')]), + )->withTimeout(5); + + $stub = $pluginClient->newUntypedWorkflowStub( + 'Extra_Plugin_ClientPlugin', + WorkflowOptions::new()->withTaskQueue($feature->taskQueue), + ); + $pluginClient->start($stub, 'test'); + + $result = $stub->getResult('string'); + // Plugin interceptors prepend, so A runs first, then B + self::assertSame('B:A:test', $result); + } + + /** + * Duplicate plugin names throw exception. + */ + #[Test] + public function duplicatePluginThrowsException( + WorkflowClientInterface $client, + State $runtime, + ): void { + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Duplicate plugin "prefix-plugin"'); + + WorkflowClient::create( + serviceClient: $client->getServiceClient(), + options: (new ClientOptions())->withNamespace($runtime->namespace), + pluginRegistry: new PluginRegistry([new PrefixPlugin(), new PrefixPlugin()]), + ); + } + + /** + * Plugin from #[Worker(plugins: [...])] is also applied via #[Stub] attribute. + */ + #[Test] + public function pluginAppliedViaWorkerAttribute( + #[Stub('Extra_Plugin_ClientPlugin', args: ['world'])] + WorkflowStubInterface $stub, + ): void { + self::assertSame('plugin:world', $stub->getResult('string')); + } +} + + +#[WorkflowInterface] +class TestWorkflow +{ + #[WorkflowMethod(name: 'Extra_Plugin_ClientPlugin')] + public function handle(string $input) + { + return $input; + } +} + + +class PrefixPlugin implements ClientPluginInterface +{ + public function __construct( + private readonly string $prefix = 'plugin:', + ) {} + + public function getName(): string + { + return 'prefix-plugin'; + } + + public function configureClient(ClientPluginContext $context): void + { + $context->addInterceptor(new PrefixInterceptor($this->prefix)); + } +} + + +class PrefixPlugin2 implements ClientPluginInterface +{ + public function __construct( + private readonly string $prefix = 'plugin2:', + ) {} + + public function getName(): string + { + return 'prefix-plugin-2'; + } + + public function configureClient(ClientPluginContext $context): void + { + $context->addInterceptor(new PrefixInterceptor($this->prefix)); + } +} + + +class PrefixInterceptor implements WorkflowClientCallsInterceptor +{ + use WorkflowClientCallsInterceptorTrait; + + public function __construct( + private readonly string $prefix, + ) {} + + public function start(StartInput $input, callable $next): WorkflowExecution + { + $original = $input->arguments->getValue(0, 'string'); + + return $next($input->with( + arguments: EncodedValues::fromValues([$this->prefix . $original], DataConverter::createDefault()), + )); + } +} diff --git a/tests/Unit/Framework/WorkerFactoryMock.php b/tests/Unit/Framework/WorkerFactoryMock.php index 07b1779e9..6006a486f 100644 --- a/tests/Unit/Framework/WorkerFactoryMock.php +++ b/tests/Unit/Framework/WorkerFactoryMock.php @@ -31,6 +31,7 @@ use Temporal\Internal\Transport\RouterInterface; use Temporal\Internal\Transport\Server; use Temporal\Internal\Transport\ServerInterface; +use Temporal\Plugin\PluginRegistry; use Temporal\Internal\Workflow\Logger; use Temporal\Worker\Environment\Environment; use Temporal\Worker\Environment\EnvironmentInterface; @@ -69,6 +70,7 @@ class WorkerFactoryMock implements WorkerFactoryInterface, LoopInterface private QueueInterface $responses; private MarshallerInterface $marshaller; private EnvironmentInterface $env; + private PluginRegistry $pluginRegistry; public function __construct(DataConverterInterface $dataConverter) { @@ -112,6 +114,11 @@ public function newWorker( return $worker; } + public function getPluginRegistry(): PluginRegistry + { + return $this->pluginRegistry; + } + public function getReader(): ReaderInterface { return $this->reader; @@ -168,6 +175,7 @@ public function tick(): void private function boot(): void { + $this->pluginRegistry = new PluginRegistry(); $this->reader = $this->createReader(); $this->marshaller = $this->createMarshaller($this->reader); $this->queues = new ArrayRepository(); @@ -187,29 +195,24 @@ private function createReader(): ReaderInterface return new AttributeReader(); } - /** - * @return RouterInterface - */ private function createRouter(): RouterInterface { $router = new Router(); - $router->add(new Router\GetWorkerInfo($this->queues, $this->marshaller, ServiceCredentials::create())); + $router->add(new Router\GetWorkerInfo( + $this->queues, + $this->marshaller, + ServiceCredentials::create(), + new PluginRegistry(), + )); return $router; } - /** - * @return ServerInterface - */ private function createServer(): ServerInterface { return new Server($this->responses, \Closure::fromCallable([$this, 'onRequest'])); } - /** - * @param ReaderInterface $reader - * @return MarshallerInterface - */ private function createMarshaller(ReaderInterface $reader): MarshallerInterface { return new Marshaller(new AttributeMapperFactory($reader)); diff --git a/tests/Unit/Plugin/AbstractPluginTestCase.php b/tests/Unit/Plugin/AbstractPluginTestCase.php new file mode 100644 index 000000000..84f2dbac3 --- /dev/null +++ b/tests/Unit/Plugin/AbstractPluginTestCase.php @@ -0,0 +1,86 @@ +getName()); + } + + public function testConfigureClientPassthrough(): void + { + $plugin = new class('noop') extends AbstractPlugin {}; + $context = new ClientPluginContext(new ClientOptions()); + + $clone = clone $context; + $plugin->configureClient($context); + + self::assertSame($clone->getClientOptions(), $context->getClientOptions()); + self::assertSame($clone->getDataConverter(), $context->getDataConverter()); + } + + public function testConfigureScheduleClientPassthrough(): void + { + $plugin = new class('noop') extends AbstractPlugin {}; + $context = new ScheduleClientPluginContext(new ClientOptions()); + + $clone = clone $context; + $plugin->configureScheduleClient($context); + + self::assertSame($clone->getClientOptions(), $context->getClientOptions()); + self::assertSame($clone->getDataConverter(), $context->getDataConverter()); + } + + public function testConfigureWorkerFactoryPassthrough(): void + { + $plugin = new class('noop') extends AbstractPlugin {}; + $context = new WorkerFactoryPluginContext(); + + $clone = clone $context; + $plugin->configureWorkerFactory($context); + + self::assertSame($clone->getDataConverter(), $context->getDataConverter()); + } + + public function testConfigureWorkerPassthrough(): void + { + $plugin = new class('noop') extends AbstractPlugin {}; + $context = new WorkerPluginContext('test-queue', WorkerOptions::new()); + + $clone = clone $context; + $plugin->configureWorker($context); + + self::assertSame($clone->getWorkerOptions(), $context->getWorkerOptions()); + self::assertSame($clone->getExceptionInterceptor(), $context->getExceptionInterceptor()); + } + + public function testInitializeWorkerNoop(): void + { + $plugin = new class('noop') extends AbstractPlugin {}; + $worker = $this->createMock(WorkerInterface::class); + + // Should not throw + $plugin->initializeWorker($worker); + self::assertTrue(true); + } +} diff --git a/tests/Unit/Plugin/ClientPluginContextTestCase.php b/tests/Unit/Plugin/ClientPluginContextTestCase.php new file mode 100644 index 000000000..bd2bdf644 --- /dev/null +++ b/tests/Unit/Plugin/ClientPluginContextTestCase.php @@ -0,0 +1,70 @@ +getClientOptions()); + self::assertNull($context->getDataConverter()); + self::assertSame([], $context->getInterceptors()); + } + + public function testSetters(): void + { + $context = new ClientPluginContext(new ClientOptions()); + $newOptions = new ClientOptions(); + $converter = $this->createMock(DataConverterInterface::class); + + $result = $context + ->setClientOptions($newOptions) + ->setDataConverter($converter); + + self::assertSame($context, $result); + self::assertSame($newOptions, $context->getClientOptions()); + self::assertSame($converter, $context->getDataConverter()); + } + + public function testAddInterceptor(): void + { + $context = new ClientPluginContext(new ClientOptions()); + + $interceptor = new class implements WorkflowClientCallsInterceptor { + use WorkflowClientCallsInterceptorTrait; + }; + $result = $context->addInterceptor($interceptor); + + self::assertSame($context, $result); + self::assertCount(1, $context->getInterceptors()); + self::assertSame($interceptor, $context->getInterceptors()[0]); + } + + public function testSetInterceptors(): void + { + $context = new ClientPluginContext(new ClientOptions()); + + $interceptor = new class implements WorkflowClientCallsInterceptor { + use WorkflowClientCallsInterceptorTrait; + }; + $context->setInterceptors([$interceptor]); + + self::assertCount(1, $context->getInterceptors()); + } +} diff --git a/tests/Unit/Plugin/ClientPluginTestCase.php b/tests/Unit/Plugin/ClientPluginTestCase.php new file mode 100644 index 000000000..8af441763 --- /dev/null +++ b/tests/Unit/Plugin/ClientPluginTestCase.php @@ -0,0 +1,252 @@ +called = true; + } + }; + + new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertTrue($called); + } + + public function testPluginModifiesClientOptions(): void + { + $plugin = new class implements ClientPluginInterface { + use ClientPluginTrait; + + public function getName(): string + { + return 'test.namespace'; + } + + public function configureClient(ClientPluginContext $context): void + { + $context->setClientOptions( + (new ClientOptions())->withNamespace('plugin-namespace'), + ); + } + }; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + // The namespace metadata is set from plugin-modified options + self::assertNotNull($client->getServiceClient()); + } + + public function testPluginModifiesDataConverter(): void + { + $customConverter = $this->createMock(DataConverterInterface::class); + + $plugin = new class($customConverter) implements ClientPluginInterface { + use ClientPluginTrait; + + public function __construct(private DataConverterInterface $converter) {} + + public function getName(): string + { + return 'test.converter'; + } + + public function configureClient(ClientPluginContext $context): void + { + $context->setDataConverter($this->converter); + } + }; + + // Should not throw — converter is applied + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + self::assertNotNull($client); + } + + public function testPluginAddsInterceptor(): void + { + $interceptor = new class implements WorkflowClientCallsInterceptor { + use WorkflowClientCallsInterceptorTrait; + }; + + $plugin = new class($interceptor) implements ClientPluginInterface { + use ClientPluginTrait; + + public function __construct(private WorkflowClientCallsInterceptor $interceptor) {} + + public function getName(): string + { + return 'test.interceptor'; + } + + public function configureClient(ClientPluginContext $context): void + { + $context->addInterceptor($this->interceptor); + } + }; + + // Should not throw — interceptor pipeline is built with plugin interceptor + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + self::assertNotNull($client); + } + + public function testMultiplePluginsCalledInOrder(): void + { + $order = []; + + $plugin1 = new class($order) implements ClientPluginInterface { + use ClientPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.first'; + } + + public function configureClient(ClientPluginContext $context): void + { + $this->order[] = 'first'; + } + }; + + $plugin2 = new class($order) implements ClientPluginInterface { + use ClientPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.second'; + } + + public function configureClient(ClientPluginContext $context): void + { + $this->order[] = 'second'; + } + }; + + new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin1, $plugin2])); + + self::assertSame(['first', 'second'], $order); + } + + public function testDuplicatePluginThrowsException(): void + { + $plugin1 = new class('dup') extends AbstractPlugin {}; + $plugin2 = new class('dup') extends AbstractPlugin {}; + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Duplicate plugin "dup"'); + + new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin1, $plugin2])); + } + + public function testGetWorkerPluginsPropagation(): void + { + $plugin = new class('combo') extends AbstractPlugin {}; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + $workerPlugins = $client->getWorkerPlugins(); + self::assertCount(1, $workerPlugins); + self::assertSame($plugin, $workerPlugins[0]); + } + + public function testGetScheduleClientPluginsPropagation(): void + { + $plugin = new class('combo') extends AbstractPlugin {}; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + $schedulePlugins = $client->getScheduleClientPlugins(); + self::assertCount(1, $schedulePlugins); + self::assertSame($plugin, $schedulePlugins[0]); + } + + public function testClientOnlyPluginNotPropagatedToWorkers(): void + { + $plugin = new class implements ClientPluginInterface { + use ClientPluginTrait; + + public function getName(): string + { + return 'client-only'; + } + }; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertCount(0, $client->getWorkerPlugins()); + self::assertCount(0, $client->getScheduleClientPlugins()); + } + + public function testWorkerOnlyPluginNotPropagatedToScheduleClient(): void + { + $plugin = new class implements ClientPluginInterface, WorkerPluginInterface { + use ClientPluginTrait; + use WorkerPluginTrait; + + public function getName(): string + { + return 'client-worker'; + } + }; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertCount(1, $client->getWorkerPlugins()); + self::assertCount(0, $client->getScheduleClientPlugins()); + } + + private function mockServiceClient(): ServiceClientInterface + { + $context = $this->createMock(ContextInterface::class); + $context->method('getMetadata')->willReturn([]); + $context->method('withMetadata')->willReturn($context); + + $client = $this->createMock(ServiceClientInterface::class); + $client->method('getContext')->willReturn($context); + $client->method('withContext')->willReturn($client); + + return $client; + } +} diff --git a/tests/Unit/Plugin/CompositePipelineProviderTestCase.php b/tests/Unit/Plugin/CompositePipelineProviderTestCase.php new file mode 100644 index 000000000..4a8bb3673 --- /dev/null +++ b/tests/Unit/Plugin/CompositePipelineProviderTestCase.php @@ -0,0 +1,124 @@ +getPipeline(TestInvokeInterceptor::class); + /** @see TestInvokeInterceptor::__invoke() */ + $result = $pipeline->with(static fn(string $s) => $s, '__invoke')('_'); + + self::assertSame('_A', $result); + } + + public function testPluginInterceptorsPrependedToSimpleProvider(): void + { + $first = new TestOrderInterceptor('1'); + $second = new TestOrderInterceptor('2'); + + $baseProvider = new SimplePipelineProvider([$second]); + $composite = new CompositePipelineProvider([$first], $baseProvider); + + $pipeline = $composite->getPipeline(TestOrderInterceptor::class); + /** @see TestOrderInterceptor::handle() */ + $result = $pipeline->with(static fn(string $s) => $s, 'handle')('_'); + + // Plugin interceptor ($first) runs before base ($second) + self::assertSame('_12', $result); + } + + public function testPipelineCaching(): void + { + $composite = new CompositePipelineProvider([], new SimplePipelineProvider()); + + $pipeline1 = $composite->getPipeline(TestOrderInterceptor::class); + $pipeline2 = $composite->getPipeline(TestOrderInterceptor::class); + + self::assertSame($pipeline1, $pipeline2); + } + + public function testCustomPipelineProviderWithPluginInterceptors(): void + { + // Custom provider that doesn't extend SimplePipelineProvider + $customProvider = new class implements PipelineProvider { + public function getPipeline(string $interceptorClass): Pipeline + { + return Pipeline::prepare([]); + } + }; + + $interceptor = new TestOrderInterceptor('P'); + $composite = new CompositePipelineProvider([$interceptor], $customProvider); + + $pipeline = $composite->getPipeline(TestOrderInterceptor::class); + /** @see TestOrderInterceptor::handle() */ + $result = $pipeline->with(static fn(string $s) => $s, 'handle')('_'); + + self::assertSame('_P', $result); + } + + public function testEmptyPluginInterceptorsWithCustomProvider(): void + { + $customProvider = new class implements PipelineProvider { + public function getPipeline(string $interceptorClass): Pipeline + { + return Pipeline::prepare([new TestOrderInterceptor('X')]); + } + }; + + $composite = new CompositePipelineProvider([], $customProvider); + $pipeline = $composite->getPipeline(TestOrderInterceptor::class); + /** @see TestOrderInterceptor::handle() */ + $result = $pipeline->with(static fn(string $s) => $s, 'handle')('_'); + + self::assertSame('_X', $result); + } +} + +/** + * Test interceptor that appends a tag to the input string. + * @internal + */ +class TestOrderInterceptor implements Interceptor +{ + public function __construct(private readonly string $tag) {} + + public function handle(string $s, callable $next): string + { + return $next($s . $this->tag); + } +} + +/** + * Test interceptor using __invoke. + * @internal + */ +class TestInvokeInterceptor implements Interceptor +{ + public function __construct(private readonly string $tag) {} + + public function __invoke(string $s, callable $next): string + { + return $next($s . $this->tag); + } +} diff --git a/tests/Unit/Plugin/ConnectionPluginContextTestCase.php b/tests/Unit/Plugin/ConnectionPluginContextTestCase.php new file mode 100644 index 000000000..f52c8443f --- /dev/null +++ b/tests/Unit/Plugin/ConnectionPluginContextTestCase.php @@ -0,0 +1,48 @@ +createMock(ServiceClientInterface::class); + $context = new ConnectionPluginContext($serviceClient); + + self::assertSame($serviceClient, $context->getServiceClient()); + } + + public function testSetServiceClientReplacesValue(): void + { + $original = $this->createMock(ServiceClientInterface::class); + $replacement = $this->createMock(ServiceClientInterface::class); + + $context = new ConnectionPluginContext($original); + $context->setServiceClient($replacement); + + self::assertSame($replacement, $context->getServiceClient()); + } + + public function testSetServiceClientReturnsSelf(): void + { + $context = new ConnectionPluginContext( + $this->createMock(ServiceClientInterface::class), + ); + + $result = $context->setServiceClient( + $this->createMock(ServiceClientInterface::class), + ); + + self::assertSame($context, $result); + } +} diff --git a/tests/Unit/Plugin/ConnectionPluginTestCase.php b/tests/Unit/Plugin/ConnectionPluginTestCase.php new file mode 100644 index 000000000..a7e19d090 --- /dev/null +++ b/tests/Unit/Plugin/ConnectionPluginTestCase.php @@ -0,0 +1,307 @@ +called = true; + } + }; + + new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertTrue($called); + } + + public function testConfigureServiceClientCalledFromScheduleClient(): void + { + $called = false; + $plugin = new class($called) implements ConnectionPluginInterface, ScheduleClientPluginInterface { + use ConnectionPluginTrait; + use ScheduleClientPluginTrait; + + public function __construct(private bool &$called) {} + + public function getName(): string + { + return 'test.connection'; + } + + public function configureServiceClient(ConnectionPluginContext $context): void + { + $this->called = true; + } + }; + + new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertTrue($called); + } + + public function testPluginModifiesServiceClientViaWithAuthKey(): void + { + $authedClient = $this->mockServiceClient(); + + $originalClient = $this->createMock(ServiceClientInterface::class); + $originalClient->method('withAuthKey')->willReturn($authedClient); + // Allow context calls for the client constructor + $context = $this->createMock(ContextInterface::class); + $context->method('getMetadata')->willReturn([]); + $context->method('withMetadata')->willReturn($context); + $originalClient->method('getContext')->willReturn($context); + $originalClient->method('withContext')->willReturn($originalClient); + + $plugin = new class implements ConnectionPluginInterface, ClientPluginInterface { + use ConnectionPluginTrait; + use ClientPluginTrait; + + public function getName(): string + { + return 'test.auth'; + } + + public function configureServiceClient(ConnectionPluginContext $context): void + { + $context->setServiceClient( + $context->getServiceClient()->withAuthKey('my-api-key'), + ); + } + }; + + $client = new WorkflowClient($originalClient, pluginRegistry: new PluginRegistry([$plugin])); + + // The service client should be the authed version + self::assertSame($authedClient, $client->getServiceClient()); + } + + public function testPluginAddsMetadataViaContext(): void + { + $metadataSet = null; + + $context = $this->createMock(ContextInterface::class); + $context->method('getMetadata')->willReturn([]); + $context->method('withMetadata')->willReturnCallback( + static function (array $metadata) use ($context, &$metadataSet) { + $metadataSet = $metadata; + return $context; + }, + ); + + $serviceClient = $this->createMock(ServiceClientInterface::class); + $serviceClient->method('getContext')->willReturn($context); + $serviceClient->method('withContext')->willReturn($serviceClient); + + $plugin = new class implements ConnectionPluginInterface, ClientPluginInterface { + use ConnectionPluginTrait; + use ClientPluginTrait; + + public function getName(): string + { + return 'test.metadata'; + } + + public function configureServiceClient(ConnectionPluginContext $context): void + { + $client = $context->getServiceClient(); + $ctx = $client->getContext(); + $context->setServiceClient( + $client->withContext( + $ctx->withMetadata(['x-custom-header' => ['value']] + $ctx->getMetadata()), + ), + ); + } + }; + + new WorkflowClient($serviceClient, pluginRegistry: new PluginRegistry([$plugin])); + + // Metadata should have been set (by plugin and then by WorkflowClient for namespace) + self::assertNotNull($metadataSet); + } + + public function testMultipleConnectionPluginsCalledInOrder(): void + { + $order = []; + + $plugin1 = new class($order) implements ConnectionPluginInterface, ClientPluginInterface { + use ConnectionPluginTrait; + use ClientPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.first'; + } + + public function configureServiceClient(ConnectionPluginContext $context): void + { + $this->order[] = 'first'; + } + }; + + $plugin2 = new class($order) implements ConnectionPluginInterface, ClientPluginInterface { + use ConnectionPluginTrait; + use ClientPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.second'; + } + + public function configureServiceClient(ConnectionPluginContext $context): void + { + $this->order[] = 'second'; + } + }; + + new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin1, $plugin2])); + + self::assertSame(['first', 'second'], $order); + } + + public function testConnectionPluginRunsBeforeClientPlugin(): void + { + $order = []; + + $plugin = new class($order) implements ConnectionPluginInterface, ClientPluginInterface { + use ConnectionPluginTrait; + use ClientPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.order'; + } + + public function configureServiceClient(ConnectionPluginContext $context): void + { + $this->order[] = 'connection'; + } + + public function configureClient(ClientPluginContext $context): void + { + $this->order[] = 'client'; + } + }; + + new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertSame(['connection', 'client'], $order); + } + + public function testDefaultTraitIsNoOp(): void + { + $plugin = new class('test.noop') extends AbstractPlugin {}; + + // Should not throw — all trait methods are no-ops + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + self::assertNotNull($client); + } + + public function testAbstractPluginWorksWithConnectionPlugin(): void + { + $called = false; + + $plugin = new class($called) extends AbstractPlugin { + private bool $ref; + + public function __construct(bool &$called) + { + parent::__construct('test.abstract'); + $this->ref = &$called; + } + + public function configureServiceClient(ConnectionPluginContext $context): void + { + $this->ref = true; + } + }; + + new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertTrue($called); + } + + public function testConnectionOnlyPluginNotRegisteredAsClientPlugin(): void + { + // A plugin implementing only ConnectionPluginInterface + // should still work when passed to WorkflowClient + $called = false; + $plugin = new class($called) implements ConnectionPluginInterface { + use ConnectionPluginTrait; + + public function __construct(private bool &$called) {} + + public function getName(): string + { + return 'test.conn-only'; + } + + public function configureServiceClient(ConnectionPluginContext $context): void + { + $this->called = true; + } + }; + + new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertTrue($called); + } + + private function mockServiceClient(): ServiceClientInterface + { + $context = $this->createMock(ContextInterface::class); + $context->method('getMetadata')->willReturn([]); + $context->method('withMetadata')->willReturn($context); + + $client = $this->createMock(ServiceClientInterface::class); + $client->method('getContext')->willReturn($context); + $client->method('withContext')->willReturn($client); + $client->method('withAuthKey')->willReturn($client); + + return $client; + } +} diff --git a/tests/Unit/Plugin/PluginPropagationTestCase.php b/tests/Unit/Plugin/PluginPropagationTestCase.php new file mode 100644 index 000000000..56581d862 --- /dev/null +++ b/tests/Unit/Plugin/PluginPropagationTestCase.php @@ -0,0 +1,239 @@ +order[] = 'configureClient'; + } + + public function configureWorkerFactory(WorkerFactoryPluginContext $context): void + { + $this->order[] = 'configureWorkerFactory'; + } + + public function configureWorker(WorkerPluginContext $context): void + { + $this->order[] = 'configureWorker'; + } + + public function initializeWorker(WorkerInterface $worker): void + { + $this->order[] = 'initializeWorker'; + } + }; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertSame(['configureClient'], $order); + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + client: $client, + ); + + self::assertSame(['configureClient', 'configureWorkerFactory'], $order); + + $factory->newWorker('test-queue'); + + self::assertSame([ + 'configureClient', + 'configureWorkerFactory', + 'configureWorker', + 'initializeWorker', + ], $order); + } + + public function testPluginFromClientMergesWithFactoryPlugins(): void + { + $order = []; + + $clientPlugin = new class($order) extends AbstractPlugin { + public function __construct(private array &$order) + { + parent::__construct('test.from-client'); + } + + public function configureWorker(WorkerPluginContext $context): void + { + $this->order[] = 'from-client'; + } + }; + + $factoryPlugin = new class($order) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.from-factory'; + } + + public function configureWorker(WorkerPluginContext $context): void + { + $this->order[] = 'from-factory'; + } + }; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$clientPlugin])); + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$factoryPlugin]), + client: $client, + ); + $factory->newWorker(); + + self::assertSame(['from-factory', 'from-client'], $order); + } + + public function testDuplicateAcrossClientAndFactoryThrows(): void + { + $clientPlugin = new class('shared-name') extends AbstractPlugin {}; + + $factoryPlugin = new class implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function getName(): string + { + return 'shared-name'; + } + }; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$clientPlugin])); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Duplicate plugin "shared-name"'); + + new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$factoryPlugin]), + client: $client, + ); + } + + public function testClientOnlyPluginNotPropagatedToFactory(): void + { + $factoryConfigureCalled = false; + + $plugin = new class($factoryConfigureCalled) implements ClientPluginInterface { + use ClientPluginTrait; + + public function __construct(private bool &$called) {} + + public function getName(): string + { + return 'test.client-only'; + } + }; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + // Client-only plugin should NOT appear in getWorkerPlugins + self::assertCount(0, $client->getWorkerPlugins()); + + // Factory should work fine without this plugin + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + client: $client, + ); + $factory->newWorker(); + + self::assertCount(0, $factory->getPluginRegistry()->getPlugins(PluginInterface::class)); + } + + public function testScheduleClientPluginPropagation(): void + { + $called = false; + + $plugin = new class($called) implements ClientPluginInterface, ScheduleClientPluginInterface { + use ClientPluginTrait; + use ScheduleClientPluginTrait; + + public function __construct(private bool &$called) {} + + public function getName(): string + { + return 'test.schedule-combo'; + } + + public function configureScheduleClient(ScheduleClientPluginContext $context): void + { + $this->called = true; + } + }; + + $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + $schedulePlugins = $client->getScheduleClientPlugins(); + self::assertCount(1, $schedulePlugins); + self::assertSame($plugin, $schedulePlugins[0]); + } + + private function mockServiceClient(): ServiceClientInterface + { + $context = $this->createMock(ContextInterface::class); + $context->method('getMetadata')->willReturn([]); + $context->method('withMetadata')->willReturn($context); + + $client = $this->createMock(ServiceClientInterface::class); + $client->method('getContext')->willReturn($context); + $client->method('withContext')->willReturn($client); + + return $client; + } + + private function mockRpc(): RPCConnectionInterface + { + return $this->createMock(RPCConnectionInterface::class); + } +} diff --git a/tests/Unit/Plugin/PluginRegistryTestCase.php b/tests/Unit/Plugin/PluginRegistryTestCase.php new file mode 100644 index 000000000..b8dde7e79 --- /dev/null +++ b/tests/Unit/Plugin/PluginRegistryTestCase.php @@ -0,0 +1,123 @@ +createPlugin('plugin-1'); + $plugin2 = $this->createPlugin('plugin-2'); + + $registry = new PluginRegistry([$plugin1, $plugin2]); + + // AbstractPlugin implements all three interfaces (via TemporalPluginInterface), + // so we can retrieve both via any of them + $plugins = $registry->getPlugins(ClientPluginInterface::class); + self::assertCount(2, $plugins); + self::assertSame($plugin1, $plugins[0]); + self::assertSame($plugin2, $plugins[1]); + } + + public function testDuplicateThrowsException(): void + { + $plugin1 = $this->createPlugin('my-plugin'); + $plugin2 = $this->createPlugin('my-plugin'); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Duplicate plugin "my-plugin"'); + + new PluginRegistry([$plugin1, $plugin2]); + } + + public function testDuplicateViaAddThrowsException(): void + { + $registry = new PluginRegistry(); + $registry->add($this->createPlugin('dup-plugin')); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Duplicate plugin "dup-plugin"'); + + $registry->add($this->createPlugin('dup-plugin')); + } + + public function testMergeThrowsOnDuplicates(): void + { + $plugin1 = $this->createPlugin('plugin-a'); + $plugin2 = $this->createPlugin('plugin-b'); + $plugin3 = $this->createPlugin('plugin-a'); // duplicate + + $registry = new PluginRegistry([$plugin1]); + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Duplicate plugin "plugin-a"'); + + $registry->merge([$plugin2, $plugin3]); + } + + public function testGetPluginsByInterface(): void + { + $clientPlugin = new class implements ClientPluginInterface { + public function getName(): string + { + return 'client-only'; + } + + public function configureClient(ClientPluginContext $context): void {} + }; + + $workerPlugin = new class implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function getName(): string + { + return 'worker-only'; + } + }; + + $bothPlugin = $this->createPlugin('both'); + + $registry = new PluginRegistry([$clientPlugin, $workerPlugin, $bothPlugin]); + + $clientPlugins = $registry->getPlugins(ClientPluginInterface::class); + self::assertCount(2, $clientPlugins); + self::assertSame($clientPlugin, $clientPlugins[0]); + self::assertSame($bothPlugin, $clientPlugins[1]); + + $workerPlugins = $registry->getPlugins(WorkerPluginInterface::class); + self::assertCount(2, $workerPlugins); + self::assertSame($workerPlugin, $workerPlugins[0]); + self::assertSame($bothPlugin, $workerPlugins[1]); + + $schedulePlugins = $registry->getPlugins(ScheduleClientPluginInterface::class); + self::assertCount(1, $schedulePlugins); + self::assertSame($bothPlugin, $schedulePlugins[0]); + } + + public function testEmptyRegistry(): void + { + $registry = new PluginRegistry(); + + self::assertSame([], $registry->getPlugins(ClientPluginInterface::class)); + } + + private function createPlugin(string $name): AbstractPlugin + { + return new class($name) extends AbstractPlugin {}; + } +} diff --git a/tests/Unit/Plugin/ScheduleClientPluginContextTestCase.php b/tests/Unit/Plugin/ScheduleClientPluginContextTestCase.php new file mode 100644 index 000000000..32133950c --- /dev/null +++ b/tests/Unit/Plugin/ScheduleClientPluginContextTestCase.php @@ -0,0 +1,41 @@ +getClientOptions()); + self::assertNull($context->getDataConverter()); + } + + public function testSetters(): void + { + $context = new ScheduleClientPluginContext(new ClientOptions()); + $newOptions = new ClientOptions(); + $converter = $this->createMock(DataConverterInterface::class); + + $result = $context + ->setClientOptions($newOptions) + ->setDataConverter($converter); + + self::assertSame($context, $result); + self::assertSame($newOptions, $context->getClientOptions()); + self::assertSame($converter, $context->getDataConverter()); + } +} diff --git a/tests/Unit/Plugin/ScheduleClientPluginTestCase.php b/tests/Unit/Plugin/ScheduleClientPluginTestCase.php new file mode 100644 index 000000000..8c220b9f0 --- /dev/null +++ b/tests/Unit/Plugin/ScheduleClientPluginTestCase.php @@ -0,0 +1,194 @@ +called = true; + } + }; + + new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + + self::assertTrue($called); + } + + public function testPluginModifiesClientOptions(): void + { + $plugin = new class implements ScheduleClientPluginInterface { + use ScheduleClientPluginTrait; + + public function getName(): string + { + return 'test.namespace'; + } + + public function configureScheduleClient(ScheduleClientPluginContext $context): void + { + $context->setClientOptions( + (new ClientOptions())->withNamespace('schedule-namespace'), + ); + } + }; + + $client = new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + self::assertNotNull($client); + } + + public function testPluginModifiesDataConverter(): void + { + $customConverter = $this->createMock(DataConverterInterface::class); + + $plugin = new class($customConverter) implements ScheduleClientPluginInterface { + use ScheduleClientPluginTrait; + + public function __construct(private DataConverterInterface $converter) {} + + public function getName(): string + { + return 'test.converter'; + } + + public function configureScheduleClient(ScheduleClientPluginContext $context): void + { + $context->setDataConverter($this->converter); + } + }; + + $client = new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin])); + self::assertNotNull($client); + } + + public function testMultiplePluginsCalledInOrder(): void + { + $order = []; + + $plugin1 = new class($order) implements ScheduleClientPluginInterface { + use ScheduleClientPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.first'; + } + + public function configureScheduleClient(ScheduleClientPluginContext $context): void + { + $this->order[] = 'first'; + } + }; + + $plugin2 = new class($order) implements ScheduleClientPluginInterface { + use ScheduleClientPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.second'; + } + + public function configureScheduleClient(ScheduleClientPluginContext $context): void + { + $this->order[] = 'second'; + } + }; + + new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin1, $plugin2])); + + self::assertSame(['first', 'second'], $order); + } + + public function testNoPluginsDoesNotBreak(): void + { + $client = new ScheduleClient($this->mockServiceClient()); + self::assertNotNull($client); + } + + public function testPluginReceivesCorrectInitialContext(): void + { + $initialOptions = (new ClientOptions())->withNamespace('initial-ns'); + $initialConverter = $this->createMock(DataConverterInterface::class); + + $receivedOptions = null; + $receivedConverter = null; + + $plugin = new class($receivedOptions, $receivedConverter) implements ScheduleClientPluginInterface { + use ScheduleClientPluginTrait; + + public function __construct( + private ?ClientOptions &$receivedOptions, + private ?DataConverterInterface &$receivedConverter, + ) {} + + public function getName(): string + { + return 'test.inspector'; + } + + public function configureScheduleClient(ScheduleClientPluginContext $context): void + { + $this->receivedOptions = $context->getClientOptions(); + $this->receivedConverter = $context->getDataConverter(); + } + }; + + new ScheduleClient( + $this->mockServiceClient(), + options: $initialOptions, + converter: $initialConverter, + pluginRegistry: new PluginRegistry([$plugin]), + ); + + self::assertSame($initialOptions, $receivedOptions); + self::assertSame($initialConverter, $receivedConverter); + } + + private function mockServiceClient(): ServiceClientInterface + { + $context = $this->createMock(ContextInterface::class); + $context->method('getMetadata')->willReturn([]); + $context->method('withMetadata')->willReturn($context); + + $client = $this->createMock(ServiceClientInterface::class); + $client->method('getContext')->willReturn($context); + $client->method('withContext')->willReturn($client); + + return $client; + } +} diff --git a/tests/Unit/Plugin/WorkerFactoryPluginTestCase.php b/tests/Unit/Plugin/WorkerFactoryPluginTestCase.php new file mode 100644 index 000000000..7489dc95d --- /dev/null +++ b/tests/Unit/Plugin/WorkerFactoryPluginTestCase.php @@ -0,0 +1,679 @@ +called = true; + } + }; + + new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + + self::assertTrue($called); + } + + public function testConfigureWorkerFactoryModifiesDataConverter(): void + { + $customConverter = $this->createMock(DataConverterInterface::class); + + $plugin = new class($customConverter) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private DataConverterInterface $converter) {} + + public function getName(): string + { + return 'test.converter'; + } + + public function configureWorkerFactory(WorkerFactoryPluginContext $context): void + { + $context->setDataConverter($this->converter); + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + + self::assertSame($customConverter, $factory->getDataConverter()); + } + + public function testConfigureWorkerIsCalled(): void + { + $called = false; + $receivedTaskQueue = null; + + $plugin = new class($called, $receivedTaskQueue) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct( + private bool &$called, + private ?string &$receivedTaskQueue, + ) {} + + public function getName(): string + { + return 'test.spy'; + } + + public function configureWorker(WorkerPluginContext $context): void + { + $this->called = true; + $this->receivedTaskQueue = $context->getTaskQueue(); + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + $factory->newWorker('my-queue'); + + self::assertTrue($called); + self::assertSame('my-queue', $receivedTaskQueue); + } + + public function testConfigureWorkerModifiesWorkerOptions(): void + { + $customOptions = WorkerOptions::new()->withMaxConcurrentActivityExecutionSize(42); + + $plugin = new class($customOptions) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private WorkerOptions $opts) {} + + public function getName(): string + { + return 'test.options'; + } + + public function configureWorker(WorkerPluginContext $context): void + { + $context->setWorkerOptions($this->opts); + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + $worker = $factory->newWorker('test-queue'); + + self::assertSame(42, $worker->getOptions()->maxConcurrentActivityExecutionSize); + } + + public function testInitializeWorkerIsCalled(): void + { + $receivedWorker = null; + + $plugin = new class($receivedWorker) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private ?WorkerInterface &$receivedWorker) {} + + public function getName(): string + { + return 'test.init'; + } + + public function initializeWorker(WorkerInterface $worker): void + { + $this->receivedWorker = $worker; + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + $worker = $factory->newWorker('test-queue'); + + self::assertSame($worker, $receivedWorker); + } + + public function testInitializeWorkerReceivesCorrectTaskQueue(): void + { + $receivedTaskQueue = null; + + $plugin = new class($receivedTaskQueue) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private ?string &$receivedTaskQueue) {} + + public function getName(): string + { + return 'test.tq'; + } + + public function initializeWorker(WorkerInterface $worker): void + { + $this->receivedTaskQueue = $worker->getID(); + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + $factory->newWorker('my-task-queue'); + + self::assertSame('my-task-queue', $receivedTaskQueue); + } + + public function testPluginHookOrder(): void + { + $order = []; + + $plugin = new class($order) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.order'; + } + + public function configureWorkerFactory(WorkerFactoryPluginContext $context): void + { + $this->order[] = 'configureWorkerFactory'; + } + + public function configureWorker(WorkerPluginContext $context): void + { + $this->order[] = 'configureWorker'; + } + + public function initializeWorker(WorkerInterface $worker): void + { + $this->order[] = 'initializeWorker'; + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + + self::assertSame(['configureWorkerFactory'], $order); + + $factory->newWorker(); + + self::assertSame([ + 'configureWorkerFactory', + 'configureWorker', + 'initializeWorker', + ], $order); + } + + public function testMultiplePluginsCalledInOrder(): void + { + $order = []; + + $plugin1 = new class($order) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.first'; + } + + public function configureWorker(WorkerPluginContext $context): void + { + $this->order[] = 'first'; + } + }; + + $plugin2 = new class($order) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.second'; + } + + public function configureWorker(WorkerPluginContext $context): void + { + $this->order[] = 'second'; + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin1, $plugin2]), + ); + $factory->newWorker(); + + self::assertSame(['first', 'second'], $order); + } + + public function testConfigureWorkerCalledPerWorker(): void + { + $taskQueues = []; + + $plugin = new class($taskQueues) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private array &$taskQueues) {} + + public function getName(): string + { + return 'test.per-worker'; + } + + public function configureWorker(WorkerPluginContext $context): void + { + $this->taskQueues[] = $context->getTaskQueue(); + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + $factory->newWorker('queue-a'); + $factory->newWorker('queue-b'); + + self::assertSame(['queue-a', 'queue-b'], $taskQueues); + } + + public function testGetWorkerPluginsReturnsRegistered(): void + { + $plugin1 = new class('p1') extends AbstractPlugin {}; + $plugin2 = new class('p2') extends AbstractPlugin {}; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin1, $plugin2]), + ); + + $plugins = $factory->getPluginRegistry()->getPlugins(PluginInterface::class); + self::assertCount(2, $plugins); + self::assertSame($plugin1, $plugins[0]); + self::assertSame($plugin2, $plugins[1]); + } + + public function testDuplicatePluginThrowsException(): void + { + $plugin1 = new class('dup') extends AbstractPlugin {}; + $plugin2 = new class('dup') extends AbstractPlugin {}; + + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('Duplicate plugin "dup"'); + + new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin1, $plugin2]), + ); + } + + public function testRunHookIsCalled(): void + { + $called = false; + $plugin = new class($called) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private bool &$called) {} + + public function getName(): string + { + return 'test.run'; + } + + public function run(WorkerFactoryInterface $factory, callable $next): int + { + $this->called = true; + return $next($factory); + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + + $factory->run($this->mockHost()); + + self::assertTrue($called); + } + + public function testRunHookReceivesFactoryInstance(): void + { + $receivedFactory = null; + + $plugin = new class($receivedFactory) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private ?WorkerFactoryInterface &$receivedFactory) {} + + public function getName(): string + { + return 'test.factory-ref'; + } + + public function run(WorkerFactoryInterface $factory, callable $next): int + { + $this->receivedFactory = $factory; + return $next($factory); + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + + $factory->run($this->mockHost()); + + self::assertSame($factory, $receivedFactory); + } + + public function testRunHookChainOrder(): void + { + $order = []; + + $plugin1 = new class($order) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.first'; + } + + public function run(WorkerFactoryInterface $factory, callable $next): int + { + $this->order[] = 'first:before'; + try { + return $next($factory); + } finally { + $this->order[] = 'first:after'; + } + } + }; + + $plugin2 = new class($order) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.second'; + } + + public function run(WorkerFactoryInterface $factory, callable $next): int + { + $this->order[] = 'second:before'; + try { + return $next($factory); + } finally { + $this->order[] = 'second:after'; + } + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin1, $plugin2]), + ); + + $factory->run($this->mockHost()); + + // First plugin is outermost: before in forward order, after in reverse (LIFO) + self::assertSame([ + 'first:before', + 'second:before', + 'second:after', + 'first:after', + ], $order); + } + + public function testRunHookCanWrapWithTryFinally(): void + { + $cleanupCalled = false; + + $plugin = new class($cleanupCalled) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private bool &$cleanupCalled) {} + + public function getName(): string + { + return 'test.cleanup'; + } + + public function run(WorkerFactoryInterface $factory, callable $next): int + { + try { + return $next($factory); + } finally { + $this->cleanupCalled = true; + } + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + + $factory->run($this->mockHost()); + + self::assertTrue($cleanupCalled); + } + + public function testRunHookCanSkipNext(): void + { + $innerCalled = false; + + $outerPlugin = new class() implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function getName(): string + { + return 'test.outer'; + } + + public function run(WorkerFactoryInterface $factory, callable $next): int + { + // Intentionally skip $next() + return 42; + } + }; + + $innerPlugin = new class($innerCalled) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private bool &$innerCalled) {} + + public function getName(): string + { + return 'test.inner'; + } + + public function run(WorkerFactoryInterface $factory, callable $next): int + { + $this->innerCalled = true; + return $next($factory); + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$outerPlugin, $innerPlugin]), + ); + + $result = $factory->run($this->mockHost()); + + self::assertSame(42, $result); + self::assertFalse($innerCalled); + } + + public function testRunHookFullLifecycleOrder(): void + { + $order = []; + + $plugin = new class($order) implements WorkerPluginInterface { + use WorkerPluginTrait; + + public function __construct(private array &$order) {} + + public function getName(): string + { + return 'test.lifecycle'; + } + + public function configureWorkerFactory(WorkerFactoryPluginContext $context): void + { + $this->order[] = 'configureWorkerFactory'; + } + + public function configureWorker(WorkerPluginContext $context): void + { + $this->order[] = 'configureWorker'; + } + + public function initializeWorker(WorkerInterface $worker): void + { + $this->order[] = 'initializeWorker'; + } + + public function run(WorkerFactoryInterface $factory, callable $next): int + { + $this->order[] = 'run:before'; + try { + return $next($factory); + } finally { + $this->order[] = 'run:after'; + } + } + }; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + $factory->newWorker(); + $factory->run($this->mockHost()); + + self::assertSame([ + 'configureWorkerFactory', + 'configureWorker', + 'initializeWorker', + 'run:before', + 'run:after', + ], $order); + } + + public function testRunHookReturnsValueFromRunLoop(): void + { + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + ); + + $result = $factory->run($this->mockHost()); + + self::assertSame(0, $result); + } + + public function testDefaultTraitRunPassesThrough(): void + { + $plugin = new class('test.noop') extends AbstractPlugin {}; + + $factory = new WorkerFactory( + DataConverter::createDefault(), + $this->mockRpc(), + pluginRegistry: new PluginRegistry([$plugin]), + ); + + $result = $factory->run($this->mockHost()); + + self::assertSame(0, $result); + } + + private function mockRpc(): RPCConnectionInterface + { + return $this->createMock(RPCConnectionInterface::class); + } + + /** + * Create a mock host that immediately returns null (empty run loop). + */ + private function mockHost(): HostConnectionInterface + { + $host = $this->createMock(HostConnectionInterface::class); + $host->method('waitBatch')->willReturn(null); + + return $host; + } +} diff --git a/tests/Unit/Plugin/WorkerPluginContextTestCase.php b/tests/Unit/Plugin/WorkerPluginContextTestCase.php new file mode 100644 index 000000000..22cd5709b --- /dev/null +++ b/tests/Unit/Plugin/WorkerPluginContextTestCase.php @@ -0,0 +1,62 @@ +getDataConverter()); + } + + public function testFactoryContextSetters(): void + { + $context = new WorkerFactoryPluginContext(); + $converter = $this->createMock(DataConverterInterface::class); + + $result = $context->setDataConverter($converter); + + self::assertSame($context, $result); + self::assertSame($converter, $context->getDataConverter()); + } + + // --- WorkerPluginContext --- + + public function testWorkerContextBuilderPattern(): void + { + $options = WorkerOptions::new(); + $context = new WorkerPluginContext('test-queue', $options); + + self::assertSame('test-queue', $context->getTaskQueue()); + self::assertSame($options, $context->getWorkerOptions()); + self::assertNull($context->getExceptionInterceptor()); + self::assertSame([], $context->getInterceptors()); + } + + public function testWorkerContextSetWorkerOptions(): void + { + $context = new WorkerPluginContext('test-queue', WorkerOptions::new()); + $newOptions = WorkerOptions::new(); + + $result = $context->setWorkerOptions($newOptions); + + self::assertSame($context, $result); + self::assertSame($newOptions, $context->getWorkerOptions()); + } +}