Message bus
New in version 12.2
TYPO3 provides a message bus solution based on symfony/messenger. It has the ability to send messages and then handle them immediately (synchronous) or send them through transports (asynchronous, for example, queues) to be handled later.
For backwards compatibility, the default implementation uses the synchronous transport. This means that the message bus will behave exactly as before, but it will be possible to switch to a different (asynchronous) transport on a per-project basis.
To offer asynchronicity, TYPO3 also provides a transport implementation based on the Doctrine DBAL messenger transport from Symfony and a basic implementation of a consumer command.
See also
To familiarize yourself with the concept, please also read the following resources:
More details and an example implementation are described in this blog post:
"Everyday" usage - as a developer
Dispatch a message
- Add a PHP class for your message object (which is an arbitrary PHP class)
- Inject the MessageBusInterface into your class and call the dispatch() method

<?php

declare(strict_types=1);

namespace MyVendor\MyExtension;

use MyVendor\MyExtension\Queue\Message\DemoMessage;
use Symfony\Component\Messenger\MessageBusInterface;

final class MyClass
{
    public function __construct(
        private readonly MessageBusInterface $bus,
    ) {}

    public function doSomething(): void
    {
        // ...
        $this->bus->dispatch(new DemoMessage('test'));
        // ...
    }
}
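The message class itself is not shown above. A minimal sketch of what DemoMessage could look like follows; the property name `content` is an assumption chosen for illustration, only the class name and the single string constructor argument are taken from the dispatch call.

```php
<?php

declare(strict_types=1);

namespace MyVendor\MyExtension\Queue\Message;

/**
 * Hypothetical message object: a plain value holder without behavior.
 * The property name "content" is an assumption, not part of any TYPO3 API.
 */
final class DemoMessage
{
    public function __construct(
        public readonly string $content,
    ) {}
}
```

Keeping the message limited to simple, serializable data is a reasonable default, because a message routed to an asynchronous transport has to be stored and restored before it is handled.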
Register a handler
Changed in version 13.0
A message handler can be registered using the Symfony PHP attribute
\Symfony\Component\Messenger\Attribute\AsMessageHandler.
Implement the handler class
<?php

declare(strict_types=1);

namespace MyVendor\MyExtension\Queue\Handler;

use MyVendor\MyExtension\Queue\Message\DemoMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
final class DemoHandler
{
    public function __invoke(DemoMessage $message): void
    {
        // do something with $message
    }
}
If your extension needs to be compatible with both TYPO3 v13 and v12, use a tag
to register the handler. A Services.yaml entry is also needed if you want to use
before/after to define an order.
MyVendor\MyExtension\Queue\Handler\DemoHandler:
  tags:
    - name: 'messenger.message_handler'

# Define another handler which should be called before DemoHandler:
MyVendor\MyExtension\Queue\Handler\DemoHandler2:
  tags:
    - name: 'messenger.message_handler'
      before: 'MyVendor\MyExtension\Queue\Handler\DemoHandler'
"Everyday" usage - as a system administrator/integrator
By default, TYPO3 will behave like in versions before TYPO3 v12. This means that the message bus will use the synchronous transport and all messages will be handled immediately. To benefit from the message bus, it is recommended to switch to an asynchronous transport. Using asynchronous transports increases the resilience of the system by decoupling external dependencies even further.
Currently, the TYPO3 Core provides an asynchronous transport based on the Doctrine DBAL messenger transport. This transport is configured to use the default TYPO3 database connection. It is pre-configured and can be used by changing the settings:
$GLOBALS['TYPO3_CONF_VARS']['SYS']['messenger']['routing']['*'] = 'doctrine';
This will route all messages to the asynchronous transport (mind the *).
Attention
If you are using the Doctrine transport, make sure to take care of running the consume command.
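Routing can also be narrowed to a single message class instead of the wildcard. The following sketch assumes the DemoMessage class from the developer section and leaves any existing '*' entry untouched, so all other messages keep their current transport. Placing it in additional.php or an extension's ext_localconf.php is an assumption about where such settings usually live.

```php
<?php

declare(strict_types=1);

// Send only DemoMessage through the pre-configured "doctrine" transport.
// Other routing entries (including '*') are not modified by this line.
$GLOBALS['TYPO3_CONF_VARS']['SYS']['messenger']['routing'][
    \MyVendor\MyExtension\Queue\Message\DemoMessage::class
] = 'doctrine';
```

With this setting, only DemoMessage instances wait in the queue until the consume command picks them up.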
Async message handling - The consume command
To consume messages, run the command:
vendor/bin/typo3 messenger:consume <receiver-name>
typo3/sysext/core/bin/typo3 messenger:consume <receiver-name>
By default, you should run:
vendor/bin/typo3 messenger:consume doctrine
typo3/sysext/core/bin/typo3 messenger:consume doctrine
The command is a slimmed-down wrapper for the Symfony command
messenger:consume; it only provides the basic consumption functionality. As
this command runs as a worker, it is stopped after 1 hour to avoid memory
leaks. Therefore, the command should be run from a service manager like
systemd, so that it is restarted automatically after it exits due to the time
limit.
The following code provides an example for a service. Create the following file on your server:
[Unit]
Description=Run the TYPO3 message consumer
Requires=mariadb.service
After=mariadb.service
[Service]
Type=simple
User=www-data
Group=www-data
ExecStart=/usr/bin/php8.1 /var/www/myproject/vendor/bin/typo3 messenger:consume doctrine --exit-code-on-limit 133
# Generally restart on error
Restart=on-failure
# Restart on exit code 133 (which is returned by the command when limits are reached)
RestartForceExitStatus=133
# ..but do not interpret exit code 133 as an error (as it's just a restart request)
SuccessExitStatus=133
[Install]
WantedBy=multi-user.target
Advanced usage
Configure a custom transport (Senders/Receivers)
Transports are configured in the services configuration. To allow the
configuration of a transport per message, the TYPO3 configuration
(settings.php or additional.php on system level, or ext_localconf.php in an
extension) is utilized. The transport/sender name used in the settings is
resolved to a service that has been tagged with messenger.sender and the
respective identifier.
$GLOBALS['TYPO3_CONF_VARS']['SYS']['messenger'] = [
    'routing' => [
        // Use "messenger.transport.demo" as transport for DemoMessage
        \MyVendor\MyExtension\Queue\Message\DemoMessage::class => 'demo',
        // Use "messenger.transport.default" as transport for all other messages
        '*' => 'default',
    ],
];
messenger.transport.demo:
  factory: [ '@TYPO3\CMS\Core\Messenger\DoctrineTransportFactory', 'createTransport' ]
  class: 'Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport'
  arguments:
    $options:
      queue_name: 'demo'
  tags:
    - name: 'messenger.sender'
      identifier: 'demo'
    - name: 'messenger.receiver'
      identifier: 'demo'

messenger.transport.default:
  factory: [ '@Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory', 'createTransport' ]
  class: 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport'
  arguments:
    $dsn: 'in-memory://default'
    $options: [ ]
  tags:
    - name: 'messenger.sender'
      identifier: 'default'
    - name: 'messenger.receiver'
      identifier: 'default'
The TYPO3 Core has been tested with three transports:
- \Symfony\Component\Messenger\Transport\Sync\SyncTransport (default)
- \Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport (using the Doctrine DBAL messenger transport)
- \Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport (for testing)
InMemoryTransport for testing
The InMemoryTransport is a transport that should only be used while
testing.
messenger.transport.default:
  factory: [ '@Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory', 'createTransport' ]
  class: 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport'
  public: true
  arguments:
    $dsn: 'in-memory://default'
    $options: [ ]
  tags:
    - name: 'messenger.sender'
      identifier: 'default'
    - name: 'messenger.receiver'
      identifier: 'default'
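To illustrate why this transport is convenient in tests, here is a minimal sketch that uses the Symfony InMemoryTransport directly, outside of the TYPO3 service container. It assumes that symfony/messenger is installed and the Composer autoloader is already loaded (as in a PHPUnit run); \stdClass stands in for a real message class such as DemoMessage.

```php
<?php

declare(strict_types=1);

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;

// Assumption: Composer autoloading is available, so no require is needed here.
$transport = new InMemoryTransport();

// Messages travel wrapped in an Envelope; \stdClass is a placeholder message.
$transport->send(new Envelope(new \stdClass()));

// The transport keeps everything it was asked to send in memory,
// which lets a test inspect what would have been queued.
$sent = $transport->getSent();
```

In a functional test, the same checks could be made against the service registered above, which is presumably why the example marks it as public.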
Configure a custom middleware
The middleware is set up in the services configuration. By default, the
\Symfony\Component\Messenger\Middleware\SendMessageMiddleware and the
\Symfony\Component\Messenger\Middleware\HandleMessageMiddleware are
registered. See also the Custom middleware section in the Symfony
documentation.
To add your own middleware, tag it as messenger.middleware and set the
order using TYPO3's before and after ordering mechanism:
Symfony\Component\Messenger\Middleware\SendMessageMiddleware:
  arguments:
    $sendersLocator: '@Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface'
    $eventDispatcher: '@Psr\EventDispatcher\EventDispatcherInterface'
  tags:
    - { name: 'messenger.middleware' }

Symfony\Component\Messenger\Middleware\HandleMessageMiddleware:
  arguments:
    $handlersLocator: '@Symfony\Component\Messenger\Handler\HandlersLocatorInterface'
  tags:
    - name: 'messenger.middleware'
      after: 'Symfony\Component\Messenger\Middleware\SendMessageMiddleware'
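The configuration above registers the two default middlewares; a custom middleware class is not shown. A minimal sketch could look like the following, assuming symfony/messenger is available. The class name DemoMiddleware and its namespace are hypothetical; the interface and method signature are those of the Symfony Messenger component.

```php
<?php

declare(strict_types=1);

namespace MyVendor\MyExtension\Queue\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;

/**
 * Hypothetical pass-through middleware (name and namespace are assumptions).
 */
final class DemoMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // Code placed here runs before the remaining middlewares.

        // Hand the envelope to the next middleware in the stack.
        $envelope = $stack->next()->handle($envelope, $stack);

        // Code placed here runs after the remaining middlewares have returned.
        return $envelope;
    }
}
```

Such a class would then be tagged with messenger.middleware in the services configuration, with before or after pointing at one of the middlewares listed above to fix its position.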