How to handle messages in batches with Symfony Messenger.
A useful feature explained.
Symfony Messenger 5.4 introduced handling messages in batches at the end of 2021.
The benefits are obvious — you handle multiple messages at once instead of processing them one by one, which is more useful and efficient in some cases (think of feeding a batch of documents into an external search engine like Solr with one HTTP request instead of multiple single requests per message).
Unfortunately, there’s nothing more to find in the Messenger documentation than the announcement I linked above. There is an unanswered question on Stack Overflow on how to actually use it, an open issue on Symfony’s GitHub page to add proper documentation, and some related threads in the Slack channel.
Let’s examine this feature in greater detail. If you just want to use it, you can skip straight to the summary.
I assume you’ve already added the
symfony/messenger component to your project and have a basic understanding of how it works.
Among others, the following parts are related to handling messages in batches with the messenger component:
I won’t go into all the details, but you can take a look at the code yourself if you’re curious.
Let’s take a closer look at the example given in the announcement:
The handler class implements the
BatchHandlerInterface interface that declares only one public method
* Flushes any pending buffers.
* @param bool $force Whether flushing is required
public function flush(bool $force): void;
Next there’s the
BatchHandlerTrait trait which implements the interface and some private methods to queue and handle the messages.
Then there’s the actual
__invoke Method that’s called by the middleware to handle the message
MyMessage which takes another (optional) argument of type
__invoke Method calls the private
handle Method (the default implementation is in the trait) and passes in the given message
MyMessage and the instance of the
handle method either queues the messages or flushes the queue (calling the handler’s
process method) if the defined amount of queued messages is reached or no
Acknowledger instance is given.
An interesting part is the
Acknowledger argument for the
__invoke method. To understand what’s happening here (aka where this argument comes from), take a look at the
\Symfony\Component\Messenger\Middleware\ HandleMessageMiddleware::handle method which identifies your handler as a batch handler through the handler descriptor (your handler implements the
BatchHandlerInterface interface) and if the message has an
AckStamp on the message’s envelope (more about that in a second), a new
Acknowledger is instantiated and passed in as the second argument to the handler call:
So whenever the batch handler doesn’t acknowledge a message (because of an exception; see the line with
$ack->nack($e) in the announcement example above) the exception is wrapped in a
HandlerFailedException which may lead to a new delivery attempt (retry) of the worker.
This therefore implies that the
AckStamp is only added for asynchronous transports! Messages over the synchronized transport (
sync://) are always processed immediately (not in batches).
Let’s get back to the handler.
handle Method must either return the number of pending messages in the batch if
$ack is not null or the result of handling the message (otherwise, a
LogicException is thrown in the
HandleMessageMiddleware middleware). The trait is implementing this method for you.
Acknowledger instance is passed, the message is processed immediately (the stack is forcibly flushed).
Otherwise, the message is pushed to the internal queue (an array in fact) and the method
shouldFlush is called to decide whether the threshold (default = 10) has been reached. If so, the queue is flushed and the batch is processed by the handler’s
To define another limit, you can simply add a method like
private function shouldFlush(): bool
return 15 <= \count($this->jobs);
in your handler².
As said before, if the defined batch size is reached, the handler’s
process method is called with the queued jobs as argument. That argument is an array like
so you can iterate over it like:
The argument for the
ack method is only relevant for synchronous command buses expecting a return value. I prefer not to return a value, so we simply use the message itself as argument.
To cut a long story short, to handle messages in batches you have to
- create a message and handler
- configure asynchronous transport for the messages and handler
- implement the
BatchHandlerInterfacefor your handler
- use the
BatchHandlerTrait(or implement the methods yourself)
- expect an
Acknowledgerinstance as second argument of the handler’s
- overwrite the trait’s
shouldFlushmethod to set a custom batch size (default = 10)
- implement a
processmethod in your handler
- process the jobs and handle possible exceptions by
- simply set the job’s message as
ackargument if you don’t expect the handler to return a result
I’ve created a complete working example on GitHub.
I hope I’ve described everything correctly, if not, please leave a comment and I’ll be happy to adjust the article. Thanks for reading and happy batch processing with Symfony!
 What’s interesting is that the
$force boolean variable of the
flush method is never used in the trait. But maybe that’s intended as some kind of option if you implement the
flush method yourself.
 Remember that you can override a trait’s method at any time. The method implemented in the class that’s using the trait takes precedence.