Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8469a84
Improved index queue processing
mcop1 Sep 9, 2025
19cf2c1
Added migration
mcop1 Sep 9, 2025
ae1b417
Apply php-cs-fixer changes
mcop1 Sep 10, 2025
016d2f3
Async adding items for update/add operation
mcop1 Sep 10, 2025
6f85435
Apply php-cs-fixer changes
mcop1 Sep 10, 2025
f754424
Removed auto-generated stub
mcop1 Sep 10, 2025
bb95410
Apply php-cs-fixer changes
mcop1 Sep 10, 2025
9845ca3
Improved sql statements, avoided bc break
mcop1 Sep 11, 2025
6547e24
Added message to transport, improved reading values from sql result
mcop1 Sep 11, 2025
b454883
Apply php-cs-fixer changes
mcop1 Sep 11, 2025
d7bb9a0
Reverted changes to generateSelectQuery
mcop1 Sep 11, 2025
b0a7d07
Apply php-cs-fixer changes
mcop1 Sep 11, 2025
9dd4285
Add processing of messages to tests
mcop1 Sep 11, 2025
139a547
Using consume method
mcop1 Sep 11, 2025
a417413
Revert "Using consume method"
mcop1 Sep 11, 2025
4a86734
Revert "Add processing of messages to tests"
mcop1 Sep 11, 2025
ddbffd1
Implemented SynchronousProcessingRelatedIdsService
mcop1 Sep 11, 2025
a894e34
Apply php-cs-fixer changes
mcop1 Sep 11, 2025
1f656d0
Refactor deleteQueueEntries
mcop1 Sep 16, 2025
01d6c47
Apply php-cs-fixer changes
mcop1 Sep 16, 2025
952bde4
Added documentation
mcop1 Sep 16, 2025
00f4cf2
Added upgrade note
mcop1 Sep 18, 2025
a1462bf
Changing runners for codeception tests
mcop1 Sep 18, 2025
821a39c
Reverting to default runners
mcop1 Sep 18, 2025
1d0d135
Adapted installer to create new table layout
mcop1 Sep 18, 2025
dcfe9a8
Apply php-cs-fixer changes
mcop1 Sep 18, 2025
0d8439d
Cast id to string for compatibility with lower versions
mcop1 Sep 18, 2025
f0ca860
Merge branch '2.x' into 337-rethink-index-queue-services-with-workers
mcop1 Sep 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/pimcore/messenger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ framework:
Pimcore\Bundle\GenericDataIndexBundle\Message\DispatchQueueMessagesMessage: pimcore_generic_data_index_queue
Pimcore\Bundle\GenericDataIndexBundle\Message\UpdateLanguageSettingsMessage: pimcore_generic_data_index_queue
Pimcore\Bundle\GenericDataIndexBundle\Message\UpdateClassMappingMessage: pimcore_generic_data_index_queue
Pimcore\Bundle\GenericDataIndexBundle\Message\EnqueueRelatedIdsMessage: pimcore_generic_data_index_queue
buses:
messenger.bus.pimcore-generic-data-index:
middleware:
Expand Down
3 changes: 3 additions & 0 deletions config/services/search/index.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ services:
Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingServiceInterface:
class: Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingService

Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingRelatedIdsServiceInterface:
class: Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingRelatedIdsService

Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\EnqueueServiceInterface:
class: Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\EnqueueService

Expand Down
3 changes: 2 additions & 1 deletion doc/01_Installation/02_Upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
Following steps are necessary during updating to newer versions.

## Upgrade to 2.2.0
- Added `trackTotalHits` parameter to `DefaultSearchService` and `SearchExecutionService`. The default value is true, which means that total hits will always be computed accurately, even if they exceed the search engines threshold for accurate hit calculation. Change this parameter to `null`, to use the default threshold, pass an integer value to set a specific one.
- [Indexing] Added `id` column as new primary key to `generic_data_index_queue`. Please make sure to execute migrations.
- [Searching] Added `trackTotalHits` parameter to `DefaultSearchService` and `SearchExecutionService`. The default value is true, which means that total hits will always be computed accurately, even if they exceed the search engines threshold for accurate hit calculation. Change this parameter to `null`, to use the default threshold, pass an integer value to set a specific one.

## Upgrade to 2.1.0
- Added support for Symfony 7
Expand Down
21 changes: 21 additions & 0 deletions doc/02_Configuration/03_Index_Management.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,27 @@ pimcore_generic_data_index:
max_batch_size: 400
```

#### Related elements

The indexing queue is automatically populated whenever an element undergoes an update operation. This process includes not only the modified element itself but also any related elements. By default, this indexing occurs asynchronously through Symfony Messenger.

For scenarios requiring immediate processing, you can temporarily switch to synchronous mode by utilizing the `SynchronousProcessingRelatedIdsServiceInterface`.

Available methods are:

| Method | Description |
|--------|-------------|
| `enable()` | Activates synchronous processing mode |
| `disable()` | Reverts to asynchronous processing mode |
| `isEnabled()` | Returns the current processing mode status |

:::info

Currently the `SynchronousProcessingRelatedIdsServiceInterface` interface does not influence the behavior of delete operations. They are always processed synchronously.

:::


### Repairing Indices

Sometimes it might be needed to delete and recreate the index from the Pimcore database
Expand Down
16 changes: 15 additions & 1 deletion src/Entity/IndexQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ class IndexQueue
public const TABLE = 'generic_data_index_queue';

#[ORM\Id]
#[ORM\Column(type: 'bigint')]
private int $id;

#[ORM\Column(type: 'integer', options: ['unsigned' => true])]
private int $elementId;

#[ORM\Id]
#[ORM\Column(type: 'string', length: 20)]
private string $elementType;

Expand All @@ -48,6 +50,18 @@ class IndexQueue
#[ORM\Column(type: 'bigint', options: ['unsigned' => true, 'default' => 0])]
private string $dispatched;

public function getId(): int
{
return $this->id;
}

public function setId(int $id): IndexQueue
{
$this->id = $id;

return $this;
}

public function getElementId(): int
{
return $this->elementId;
Expand Down
7 changes: 5 additions & 2 deletions src/EventSubscriber/AssetIndexUpdateSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\IndexQueueOperation;
use Pimcore\Bundle\GenericDataIndexBundle\Installer;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\QueueMessagesDispatcher;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingRelatedIdsServiceInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingServiceInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueueServiceInterface;
use Pimcore\Event\AssetEvents;
Expand All @@ -31,7 +32,8 @@ public function __construct(
private IndexQueueServiceInterface $indexQueueService,
private Installer $installer,
private QueueMessagesDispatcher $queueMessagesDispatcher,
private SynchronousProcessingServiceInterface $synchronousProcessing
private SynchronousProcessingServiceInterface $synchronousProcessing,
private SynchronousProcessingRelatedIdsServiceInterface $synchronousProcessingRelatedIds
) {
}

Expand All @@ -54,7 +56,8 @@ public function updateAsset(AssetEvent $event): void
->updateIndexQueue(
element: $event->getAsset(),
operation: IndexQueueOperation::UPDATE->value,
processSynchronously: $this->synchronousProcessing->isEnabled()
processSynchronously: $this->synchronousProcessing->isEnabled(),
enqueueRelatedItemsAsync: $this->synchronousProcessingRelatedIds->isEnabled() === false
)
->commit();

Expand Down
37 changes: 21 additions & 16 deletions src/EventSubscriber/DataObjectIndexUpdateSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\IndexQueueOperation;
use Pimcore\Bundle\GenericDataIndexBundle\Installer;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\QueueMessagesDispatcher;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingRelatedIdsServiceInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingServiceInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueueServiceInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Traits\LoggerAwareTrait;
use Pimcore\Event\DataObjectEvents;
use Pimcore\Event\Model\DataObjectEvent;
use Pimcore\Model\DataObject\AbstractObject;
use Pimcore\Model\DataObject\Service;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

/**
Expand All @@ -35,7 +36,8 @@ public function __construct(
private readonly Installer $installer,
private readonly IndexQueueServiceInterface $indexQueueService,
private readonly QueueMessagesDispatcher $queueMessagesDispatcher,
private readonly SynchronousProcessingServiceInterface $synchronousProcessing
private readonly SynchronousProcessingServiceInterface $synchronousProcessing,
private readonly SynchronousProcessingRelatedIdsServiceInterface $synchronousProcessingRelatedIds
) {
}

Expand All @@ -62,19 +64,20 @@ public function updateDataObject(DataObjectEvent $event): void
return;
}

$inheritanceBackup = AbstractObject::getGetInheritedValues();
AbstractObject::setGetInheritedValues(true);
$dataObject = $event->getObject();
Service::useInheritedValues(true, fn () =>
$this->indexQueueService
->updateIndexQueue(
$dataObject,
IndexQueueOperation::UPDATE->value,
$this->synchronousProcessing->isEnabled(),
$dataObject->hasChildren(includingUnpublished: true),
$this->synchronousProcessingRelatedIds->isEnabled() === false
)
->commit()
);

$this->indexQueueService
->updateIndexQueue(
element: $event->getObject(),
operation: IndexQueueOperation::UPDATE->value,
processSynchronously: $this->synchronousProcessing->isEnabled()
)
->commit();
$this->queueMessagesDispatcher->dispatchQueueMessages();

AbstractObject::setGetInheritedValues($inheritanceBackup);
}

public function deleteDataObject(DataObjectEvent $event): void
Expand All @@ -83,11 +86,13 @@ public function deleteDataObject(DataObjectEvent $event): void
return;
}

$dataObject = $event->getObject();
$this->indexQueueService
->updateIndexQueue(
element: $event->getObject(),
operation: IndexQueueOperation::DELETE->value,
processSynchronously: $this->synchronousProcessing->isEnabled()
$dataObject,
IndexQueueOperation::DELETE->value,
$this->synchronousProcessing->isEnabled(),
$dataObject->hasChildren(includingUnpublished: true)
)
->commit();
$this->queueMessagesDispatcher->dispatchQueueMessages();
Expand Down
7 changes: 5 additions & 2 deletions src/EventSubscriber/DocumentIndexUpdateSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\IndexQueueOperation;
use Pimcore\Bundle\GenericDataIndexBundle\Installer;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\QueueMessagesDispatcher;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingRelatedIdsServiceInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\SynchronousProcessingServiceInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueueServiceInterface;
use Pimcore\Event\DocumentEvents;
Expand All @@ -31,7 +32,8 @@ public function __construct(
private IndexQueueServiceInterface $indexQueueService,
private Installer $installer,
private QueueMessagesDispatcher $queueMessagesDispatcher,
private SynchronousProcessingServiceInterface $synchronousProcessing
private SynchronousProcessingServiceInterface $synchronousProcessing,
private SynchronousProcessingRelatedIdsServiceInterface $synchronousProcessingRelatedIds
) {
}

Expand All @@ -54,7 +56,8 @@ public function updateDocument(DocumentEvent $event): void
->updateIndexQueue(
element: $event->getDocument(),
operation: IndexQueueOperation::UPDATE->value,
processSynchronously: $this->synchronousProcessing->isEnabled()
processSynchronously: $this->synchronousProcessing->isEnabled(),
enqueueRelatedItemsAsync: $this->synchronousProcessingRelatedIds->isEnabled() === false
)
->commit();

Expand Down
8 changes: 5 additions & 3 deletions src/Installer.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use Doctrine\DBAL\Schema\SchemaException;
use Pimcore;
use Pimcore\Bundle\GenericDataIndexBundle\Entity\IndexQueue;
use Pimcore\Bundle\GenericDataIndexBundle\Migrations\Version20240325081139;
use Pimcore\Bundle\GenericDataIndexBundle\Migrations\Version20251009110653;
use Pimcore\Extension\Bundle\Installer\Exception\InstallationException;
use Symfony\Component\HttpKernel\Bundle\BundleInterface;

Expand All @@ -40,7 +40,7 @@ public function __construct(

public function getLastMigrationVersionClassName(): ?string
{
return Version20240325081139::class;
return Version20251009110653::class;
}

/**
Expand Down Expand Up @@ -89,6 +89,7 @@ private function installIndexQueueTable(Schema $schema): void
{
if (!$schema->hasTable(IndexQueue::TABLE)) {
$queueTable = $schema->createTable(IndexQueue::TABLE);
$queueTable->addColumn('id', 'bigint', ['autoincrement' => true]);
$queueTable->addColumn('elementId', 'integer', ['notnull' => true, 'unsigned' => true]);
$queueTable->addColumn('elementType', 'string', ['notnull' => true, 'length' => 20]);
$queueTable->addColumn('elementIndexName', 'string', ['notnull' => true, 'length' => 255]);
Expand All @@ -100,7 +101,8 @@ private function installIndexQueueTable(Schema $schema): void
'default' => 0,
]);

$queueTable->setPrimaryKey(['elementId', 'elementType']);
$queueTable->setPrimaryKey(['id']);
$queueTable->addUniqueIndex(['elementId', 'elementType'], IndexQueue::TABLE . '_element_id_type');
$queueTable->addIndex(['dispatched'], IndexQueue::TABLE . '_dispatched');
$queueTable->addIndex(['operationTime'], IndexQueue::TABLE . '_operation_time');
}
Expand Down
50 changes: 50 additions & 0 deletions src/Message/EnqueueRelatedIdsMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php
declare(strict_types=1);

/**
* This source file is available under the terms of the
* Pimcore Open Core License (POCL)
* Full copyright and license information is available in
* LICENSE.md which is distributed with this source code.
*
* @copyright Copyright (c) Pimcore GmbH (https://www.pimcore.com)
* @license Pimcore Open Core License (POCL)
*/

namespace Pimcore\Bundle\GenericDataIndexBundle\Message;

use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\ElementType;

/**
* @internal
*/
final readonly class EnqueueRelatedIdsMessage
{
public function __construct(
private int $elementId,
private ElementType $elementType,
private string $operation,
private bool $addParentElement
) {
}

public function getElementId(): int
{
return $this->elementId;
}

public function getElementType(): ElementType
{
return $this->elementType;
}

public function getOperation(): string
{
return $this->operation;
}

public function getAddParentElement(): bool
{
return $this->addParentElement;
}
}
62 changes: 62 additions & 0 deletions src/MessageHandler/EnqueueRelatedIdsHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php
declare(strict_types=1);

/**
* This source file is available under the terms of the
* Pimcore Open Core License (POCL)
* Full copyright and license information is available in
* LICENSE.md which is distributed with this source code.
*
* @copyright Copyright (c) Pimcore GmbH (https://www.pimcore.com)
* @license Pimcore Open Core License (POCL)
*/

namespace Pimcore\Bundle\GenericDataIndexBundle\MessageHandler;

use Pimcore\Bundle\GenericDataIndexBundle\Enum\SearchIndex\ElementType;
use Pimcore\Bundle\GenericDataIndexBundle\Message\EnqueueRelatedIdsMessage;
use Pimcore\Bundle\GenericDataIndexBundle\Service\ElementServiceInterface;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueue\QueueMessagesDispatcher;
use Pimcore\Bundle\GenericDataIndexBundle\Service\SearchIndex\IndexQueueServiceInterface;
use Pimcore\Model\DataObject\AbstractObject;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

/**
* @internal
*/
#[AsMessageHandler]
final readonly class EnqueueRelatedIdsHandler
{
public function __construct(
private IndexQueueServiceInterface $indexQueueService,
private QueueMessagesDispatcher $queueMessagesDispatcher,
private ElementServiceInterface $elementService
) {
}

public function __invoke(EnqueueRelatedIdsMessage $message): void
{
$inheritanceBackup = null;
$elementType = $message->getElementType();
$element = $this->elementService->getElementByType($message->getElementId(), $elementType->value);

if ($elementType === ElementType::DATA_OBJECT) {
$inheritanceBackup = AbstractObject::getGetInheritedValues();
AbstractObject::setGetInheritedValues(true);
}

$this->indexQueueService->updateIndexQueue(
$element,
$message->getOperation(),
!$message->getAddParentElement(),
true,
false
)->commit();

if ($inheritanceBackup !== null) {
AbstractObject::setGetInheritedValues($inheritanceBackup);
}

$this->queueMessagesDispatcher->dispatchQueueMessages();
}
}
Loading