How to handle messages in batches with Symfony Messenger.
A useful feature explained.
Symfony Messenger 5.4 introduced handling messages in batches at the end of 2021.
The benefit is obvious: you handle multiple messages at once instead of processing them one by one, which can be considerably more efficient in some cases (think of feeding a batch of documents into an external search engine like Solr with a single HTTP request instead of one request per message).
Unfortunately, apart from the announcement linked above, there is nothing to be found about this feature in the Messenger documentation. There is an unanswered question on Stack Overflow about how to actually use it, an open issue on Symfony’s GitHub repository asking for proper documentation, and some related threads in the Symfony Slack.
Let’s examine this feature in greater detail. If you just want to use it, you can skip straight to the summary.
Messenger component
I assume you’ve already added the symfony/messenger component to your project and have a basic understanding of how it works.
Among others, the following classes are involved in handling messages in batches with the Messenger component:
\Symfony\Component\Messenger\Worker
\Symfony\Component\Messenger\Middleware\HandleMessageMiddleware
\Symfony\Component\Messenger\Handler\BatchHandlerInterface
\Symfony\Component\Messenger\Handler\BatchHandlerTrait
\Symfony\Component\Messenger\Handler\Acknowledger
\Symfony\Component\Messenger\Stamp\AckStamp
\Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp
I won’t go into all the details, but you can take a look at the code yourself if you’re curious.
Example dissection
Let’s take a closer look at the example handler given in the announcement.
The handler class implements the BatchHandlerInterface interface, which declares only one public method, flush¹:
/**
* Flushes any pending buffers.
*
* @param bool $force Whether flushing is required
*/
public function flush(bool $force): void;
Next, there’s the BatchHandlerTrait trait, which implements the interface and provides some private methods to queue and handle the messages.
Then there’s the actual __invoke method, which is called by the middleware to handle the message MyMessage and takes a second, optional argument of type Acknowledger.
The __invoke method calls the private handle method (the default implementation lives in the trait) and passes in the given message MyMessage and the Acknowledger instance.
The handle method then either queues the message or, if the defined number of queued messages is reached or no Acknowledger instance is given, flushes the queue by calling the handler’s process method.
Acknowledger
An interesting part is the Acknowledger argument of the __invoke method. To understand where this argument comes from, take a look at the \Symfony\Component\Messenger\Middleware\HandleMessageMiddleware::handle method. It identifies your handler as a batch handler through the handler descriptor (because your handler implements the BatchHandlerInterface interface), and if the message’s envelope carries an AckStamp (more about that in a second), it instantiates a new Acknowledger and passes it in as the second argument of the handler call.
So whenever the batch handler rejects a message instead of acknowledging it (because of an exception; see the line with $ack->nack($e) in the announcement example), the exception is wrapped in a HandlerFailedException, which may cause the worker to retry the message.
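To make the ack/nack round trip more concrete, here is a minimal plain-PHP model. DemoAcknowledger and DemoHandlerFailedException are hypothetical stand-ins, not Symfony classes. The sketch assumes only that the acknowledger reports either a result or an error, once, to a callback supplied by the middleware, and that the middleware wraps errors before passing them on.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for Symfony's HandlerFailedException
final class DemoHandlerFailedException extends \RuntimeException
{
}

// Hypothetical, simplified stand-in for an acknowledger:
// it forwards either a result or an error to a callback, once.
final class DemoAcknowledger
{
    private bool $acknowledged = false;

    public function __construct(private \Closure $callback)
    {
    }

    public function ack(mixed $result = null): void
    {
        $this->finish(null, $result);
    }

    public function nack(\Throwable $error): void
    {
        $this->finish($error, null);
    }

    private function finish(?\Throwable $error, mixed $result): void
    {
        if ($this->acknowledged) {
            throw new \LogicException('The message has already been acknowledged.');
        }
        $this->acknowledged = true;
        ($this->callback)($error, $result);
    }
}

// Plays the middleware's role: a rejected message is recorded as a wrapped
// failure (in the real worker, this is what may trigger a retry).
$outcomes = [];
$makeAck = static function (string $id) use (&$outcomes): DemoAcknowledger {
    return new DemoAcknowledger(static function (?\Throwable $error, mixed $result) use ($id, &$outcomes): void {
        $outcomes[$id] = null === $error
            ? ['handled', $result]
            : ['failed', new DemoHandlerFailedException($error->getMessage(), 0, $error)];
    });
};

$makeAck('a')->ack('indexed');
$makeAck('b')->nack(new \RuntimeException('Solr is down'));
```

The original exception stays reachable through getPrevious(), so nothing is lost by wrapping it.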
The AckStamp is added to the message’s envelope automatically in the \Symfony\Component\Messenger\Worker::handleMessage method, when the worker dispatches the received message into the message bus.
This implies that the AckStamp is only added for messages consumed from asynchronous transports! Messages sent over the synchronous transport (sync://) are always processed immediately (not in batches).
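For batches to form at all, the message therefore has to be routed to an asynchronous transport and consumed by a worker (for example with bin/console messenger:consume async). Below is a minimal routing sketch in Symfony’s PHP config format (available since 5.3). The transport name async, the MESSENGER_TRANSPORT_DSN variable and the App\Message\MyMessage class are placeholders for your own setup; the equivalent YAML config works just as well.

```php
<?php
// config/packages/messenger.php
use Symfony\Config\FrameworkConfig;

return static function (FrameworkConfig $framework): void {
    $messenger = $framework->messenger();

    // The DSN decides which asynchronous transport is used (Doctrine, Redis, AMQP, ...)
    $messenger->transport('async')->dsn('%env(MESSENGER_TRANSPORT_DSN)%');

    // Route the message there, so the worker adds the AckStamp when consuming it
    $messenger->routing('App\Message\MyMessage')->senders(['async']);
};
```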
Batch Handling
Let’s get back to the handler.
The handle method must return the number of pending messages in the batch if $ack is not null (otherwise, a LogicException is thrown in the HandleMessageMiddleware middleware), or the result of handling the message if $ack is null. The trait implements this method for you.
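The first half of that contract can be pictured as a check on the value a batch handler returns when an Acknowledger was passed in. The function below is a hypothetical, simplified model of such a check, not the middleware’s actual code: it accepts only a non-negative integer, since the batch size may legitimately be 0 right after a flush.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical model of the check applied to a batch handler's return value
 * when an Acknowledger was passed in: it must be the batch size (an int >= 0).
 */
function checkBatchSize(mixed $result, string $handlerClass): int
{
    if (!\is_int($result) || $result < 0) {
        throw new \LogicException(sprintf(
            'A batch handler must return the size of the current batch, "%s" returned from "%s".',
            get_debug_type($result),
            $handlerClass
        ));
    }

    return $result;
}
```

In practice, this is why __invoke should simply return whatever $this->handle($message, $ack) returns.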
If no Acknowledger instance is passed, the message is processed immediately (the queue is forcibly flushed). Otherwise, the message is pushed to the internal queue (in fact, an array) and the shouldFlush method is called to decide whether the threshold (default = 10) has been reached. If so, the queue is flushed and the batch is processed by the handler’s process method.
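The queue-or-flush logic described above can be modeled in a few lines of plain PHP. This is a simplified re-implementation for illustration, not the trait itself: DemoBatchHandler, DemoAck, the string messages and the upper-casing "work" are all hypothetical, and the stand-in acknowledger simply stores the result it was acked with.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for an Acknowledger: remembers the acked result
final class DemoAck
{
    public mixed $result = null;

    public function ack(mixed $result = null): void
    {
        $this->result = $result;
    }
}

// Simplified model of a batch handler using the queue-or-flush logic
final class DemoBatchHandler
{
    /** @var list<array{0: string, 1: DemoAck}> */
    private array $jobs = [];

    /** @var list<list<string>> Batches passed to process(), kept for inspection */
    public array $processedBatches = [];

    public function __invoke(string $message, ?DemoAck $ack = null): mixed
    {
        return $this->handle($message, $ack);
    }

    public function flush(bool $force): void
    {
        if ($jobs = $this->jobs) {
            $this->jobs = [];
            $this->process($jobs);
        }
    }

    private function handle(string $message, ?DemoAck $ack): mixed
    {
        if (null === $ack) {
            // No acknowledger: flush right away and return the result
            $ack = new DemoAck();
            $this->jobs[] = [$message, $ack];
            $this->flush(true);

            return $ack->result;
        }

        // Acknowledger given: queue the job and report the pending count
        $this->jobs[] = [$message, $ack];
        if (!$this->shouldFlush()) {
            return \count($this->jobs);
        }

        $this->flush(true);

        return 0;
    }

    private function shouldFlush(): bool
    {
        return 10 <= \count($this->jobs);
    }

    private function process(array $jobs): void
    {
        $this->processedBatches[] = array_map(static fn (array $job): string => $job[0], $jobs);
        foreach ($jobs as [$message, $ack]) {
            $ack->ack(strtoupper($message));
        }
    }
}
```

Feeding it ten messages with acknowledgers yields the pending counts 1 to 9, then a single batch of ten and a return value of 0; a message without an acknowledger is handled on the spot.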
To define a different limit, simply add a method like the following to your handler²:
private function shouldFlush(): bool
{
    return 15 <= \count($this->jobs);
}
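This works because a method declared in the class takes precedence over a trait method of the same name. Here is a self-contained sketch, with a hypothetical DemoQueueTrait (and its hypothetical enqueue method) standing in for BatchHandlerTrait:

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for BatchHandlerTrait with its default threshold
trait DemoQueueTrait
{
    private array $jobs = [];

    // Queues a message and reports whether the batch should be flushed now
    public function enqueue(object $message): bool
    {
        $this->jobs[] = $message;

        return $this->shouldFlush();
    }

    private function shouldFlush(): bool
    {
        return 10 <= \count($this->jobs);
    }
}

final class DefaultSizeHandler
{
    use DemoQueueTrait;
}

final class CustomSizeHandler
{
    use DemoQueueTrait;

    // Declared in the class, so it overrides the trait's shouldFlush()
    private function shouldFlush(): bool
    {
        return 15 <= \count($this->jobs);
    }
}

// Counts how many messages a handler accepts before asking for a flush
function messagesUntilFlush(object $handler): int
{
    $count = 0;
    do {
        $count++;
    } while (!$handler->enqueue(new \stdClass()));

    return $count;
}
```

The trait’s own enqueue calls $this->shouldFlush(), yet CustomSizeHandler’s version is the one that runs.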
As said before, if the defined batch size is reached, the handler’s process method is called with the queued jobs as its argument. That argument is an array of message/Acknowledger pairs like
[
    [
        0 => MyMessage,
        1 => Acknowledger
    ],
    [
        0 => MyMessage,
        1 => Acknowledger
    ],
]
so you can destructure each job into its message and Acknowledger while iterating over it.
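A loop over those jobs might look like the following minimal sketch. DemoMessage, RecordingAck and the index() call are hypothetical (index() stands in for real work such as talking to Solr, and fails for empty IDs). The pattern worth copying is destructuring each job into [$message, $ack] and either acking or nacking every single job.

```php
<?php
declare(strict_types=1);

// Hypothetical message class
final class DemoMessage
{
    public function __construct(public string $documentId)
    {
    }
}

// Hypothetical stand-in for an Acknowledger that records the outcome
final class RecordingAck
{
    public ?string $status = null;
    public mixed $value = null;

    public function ack(mixed $result = null): void
    {
        $this->status = 'ack';
        $this->value = $result;
    }

    public function nack(\Throwable $error): void
    {
        $this->status = 'nack';
        $this->value = $error;
    }
}

final class DemoIndexHandler
{
    // Public only so the sketch can be called directly; in a real handler
    // this is the private process() method the trait calls on flush.
    /** @param list<array{0: DemoMessage, 1: RecordingAck}> $jobs */
    public function process(array $jobs): void
    {
        foreach ($jobs as [$message, $ack]) {
            try {
                $this->index($message);
                // No return value needed, so the message itself is the ack argument
                $ack->ack($message);
            } catch (\Throwable $e) {
                // In Symfony, a nack'ed job ends up in a HandlerFailedException and may be retried
                $ack->nack($e);
            }
        }
    }

    // Hypothetical indexing call that rejects empty document IDs
    private function index(DemoMessage $message): void
    {
        if ('' === $message->documentId) {
            throw new \InvalidArgumentException('Empty document ID');
        }
    }
}
```

Catching per job keeps one bad message from failing the whole batch; if your backend accepts only all-or-nothing bulk requests, you would instead ack or nack every job based on the single request’s outcome.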
The argument passed to the ack method is only relevant for synchronous command buses that expect a return value. I prefer not to return a value, so I simply pass the message itself as the argument.
TLDR
To cut a long story short, to handle messages in batches you have to
- create a message and a handler
- configure an asynchronous transport and route the messages to it
- implement the BatchHandlerInterface for your handler
- use the BatchHandlerTrait (or implement the methods yourself)
- expect an Acknowledger instance as the second argument of the handler’s __invoke method
- override the trait’s shouldFlush method to set a custom batch size (default = 10)
- implement a process method in your handler
- process the jobs and handle possible exceptions by acking or nacking them
- simply pass the job’s message as the ack argument if you don’t expect the handler to return a result
Working example
I’ve created a complete working example on GitHub.
I hope I’ve described everything correctly; if not, please leave a comment and I’ll be happy to adjust the article. Thanks for reading, and happy batch processing with Symfony!
[1] Interestingly, the trait never uses the $force boolean parameter of the flush method. Maybe it’s intended as an option for when you implement the flush method yourself.
[2] Remember that you can override a trait’s method at any time. The method implemented in the class that’s using the trait takes precedence.