How to handle messages in batches with Symfony Messenger.

Wolfgang Klinger
Aug 1, 2022


A useful feature explained.

Image by https://www.pexels.com/@renato-k-56319541/

Symfony Messenger 5.4 introduced handling messages in batches at the end of 2021.

The benefit is clear: you handle several messages at once instead of processing them one by one, which can be considerably more efficient. Think of feeding a batch of documents into an external search engine like Solr with one HTTP request instead of a separate request per message.

Unfortunately, the Messenger documentation offers nothing beyond the announcement of the feature. There is an unanswered question on Stack Overflow about how to actually use it, an open issue on Symfony’s GitHub page asking for proper documentation, and some related threads in the Slack channel.

Let’s examine this feature in greater detail. If you just want to use it, you can skip straight to the summary.

Messenger component

I assume you’ve already added the symfony/messenger component to your project and have a basic understanding of how it works.

Among others, the following parts are related to handling messages in batches with the messenger component:

  • \Symfony\Component\Messenger\Worker
  • \Symfony\Component\Messenger\Middleware\HandleMessageMiddleware
  • \Symfony\Component\Messenger\Handler\BatchHandlerInterface
  • \Symfony\Component\Messenger\Handler\BatchHandlerTrait
  • \Symfony\Component\Messenger\Handler\Acknowledger
  • \Symfony\Component\Messenger\Stamp\AckStamp
  • \Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp

I won’t go into all the details, but you can take a look at the code yourself if you’re curious.

Example dissection

Let’s take a closer look at the example given in the announcement, piece by piece.

The handler class implements the BatchHandlerInterface interface, which declares only one public method, flush [1]:

/**
 * Flushes any pending buffers.
 *
 * @param bool $force Whether flushing is required
 */
public function flush(bool $force): void;

Next, there’s the BatchHandlerTrait trait, which implements the interface’s flush method and adds some private methods to queue and handle the messages.

Then there’s the actual __invoke method, which the middleware calls to handle the message MyMessage. It takes a second, optional argument of type Acknowledger.

The __invoke method calls the private handle method (the default implementation is in the trait) and passes in the given MyMessage and the Acknowledger instance.

The handle method then either queues the message or flushes the queue (calling the handler’s process method) once the defined number of queued messages is reached or if no Acknowledger instance is given.
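Put together, these pieces look roughly like the sketch below. So that it runs without the component installed, it declares small stand-ins for BatchHandlerInterface, BatchHandlerTrait and Acknowledger. They model only the behaviour described here and are not Symfony’s code; in a real project you would import the real ones from Symfony\Component\Messenger\Handler instead. The return value of 0 after a flush and the $batchSizes property are assumptions made for illustration.

```php
<?php
// --- Stand-ins (assumptions), replace with the real Symfony classes ---
interface BatchHandlerInterface
{
    public function flush(bool $force): void;
}

final class Acknowledger
{
    private $result = null;

    public function ack($result = null): void
    {
        $this->result = $result;
    }

    public function getResult()
    {
        return $this->result;
    }
}

trait BatchHandlerTrait
{
    private array $jobs = [];

    public function flush(bool $force): void
    {
        if ($jobs = $this->jobs) {
            $this->jobs = [];
            $this->process($jobs);
        }
    }

    private function handle(object $message, ?Acknowledger $ack)
    {
        if (null === $ack) {
            // No Acknowledger: process immediately and return the result.
            $ack = new Acknowledger();
            $this->jobs[] = [$message, $ack];
            $this->flush(true);

            return $ack->getResult();
        }

        $this->jobs[] = [$message, $ack];
        if (!$this->shouldFlush()) {
            return \count($this->jobs); // number of pending messages
        }
        $this->flush(true);

        return 0; // assumption: nothing is pending after a flush
    }

    private function shouldFlush(): bool
    {
        return 10 <= \count($this->jobs);
    }
}

// --- The handler itself ---
final class MyMessage
{
}

final class MyBatchHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    /** Sizes of the batches handed to process(); only here for the demo. */
    public array $batchSizes = [];

    public function __invoke(MyMessage $message, ?Acknowledger $ack = null)
    {
        return $this->handle($message, $ack);
    }

    private function process(array $jobs): void
    {
        $this->batchSizes[] = \count($jobs);
        foreach ($jobs as [$message, $ack]) {
            $ack->ack($message);
        }
    }
}

// Ten messages with an Acknowledger end up in a single batch.
$handler = new MyBatchHandler();
$pending = [];
for ($i = 1; $i <= 10; ++$i) {
    $pending[] = $handler(new MyMessage(), new Acknowledger());
}
```

In this sketch the first nine calls only queue the message and report how many are pending; the tenth call triggers process with all ten jobs.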

Acknowledger

An interesting part is the Acknowledger argument of the __invoke method. To understand where this argument comes from, take a look at the \Symfony\Component\Messenger\Middleware\HandleMessageMiddleware::handle method. It identifies your handler as a batch handler through the handler descriptor (your handler implements the BatchHandlerInterface interface). If the message’s envelope also carries an AckStamp (more about that in a second), a new Acknowledger is instantiated and passed as the second argument to the handler call.
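The decision the middleware makes can be modelled in a few lines. This is a simplified stand-alone sketch, not the middleware’s actual code: the Acknowledger class is a stand-in, and the function callHandler, its boolean parameters and the $log array are hypothetical names chosen for illustration.

```php
<?php
// Stand-in (assumption): wraps a callback that receives either an error
// or a result, which is all this sketch needs.
final class Acknowledger
{
    private \Closure $callback;

    public function __construct(\Closure $callback)
    {
        $this->callback = $callback;
    }

    public function ack($result = null): void
    {
        ($this->callback)(null, $result);
    }

    public function nack(\Throwable $error): void
    {
        ($this->callback)($error, null);
    }
}

// Hypothetical reduction of the middleware's decision: only a batch handler
// whose envelope carries an AckStamp receives an Acknowledger.
function callHandler(callable $handler, object $message, bool $isBatchHandler, bool $hasAckStamp, array &$log)
{
    if ($isBatchHandler && $hasAckStamp) {
        $ack = new Acknowledger(static function (?\Throwable $e, $result) use (&$log): void {
            // The real middleware wraps $e in a HandlerFailedException here.
            $log[] = null === $e ? 'acked' : 'failed: ' . $e->getMessage();
        });

        return $handler($message, $ack);
    }

    return $handler($message);
}

$log = [];
$handler = static function (object $message, ?Acknowledger $ack = null) {
    if (null === $ack) {
        return 'handled immediately';
    }
    $ack->ack();

    return 0;
};
$failing = static function (object $message, ?Acknowledger $ack = null) {
    $ack->nack(new RuntimeException('boom'));

    return 0;
};

$async = callHandler($handler, new stdClass(), true, true, $log);
$sync = callHandler($handler, new stdClass(), true, false, $log);
callHandler($failing, new stdClass(), true, true, $log);
```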

So whenever the batch handler doesn’t acknowledge a message because of an exception (see the line with $ack->nack($e) in the announcement example), the exception is wrapped in a HandlerFailedException, which may lead to a new delivery attempt (retry) by the worker.

The AckStamp is added to the message’s envelope automatically in the \Symfony\Component\Messenger\Worker::handleMessage method, when the worker dispatches the message into the message bus.

This implies that the AckStamp is only added for asynchronous transports! Messages sent over the synchronous transport (sync://) are always processed immediately, not in batches.

Batch Handling

Let’s get back to the handler.

If $ack is not null, the handler must return the number of pending messages in the batch; otherwise it returns the result of handling the message. If a batch handler that received an Acknowledger does not return a number, the HandleMessageMiddleware throws a LogicException. The trait’s handle method takes care of this for you.

If no Acknowledger instance is passed, the message is processed immediately (the queue is forcibly flushed).

Otherwise, the message is pushed to the internal queue (an array in fact) and the method shouldFlush is called to decide whether the threshold (default = 10) has been reached. If so, the queue is flushed and the batch is processed by the handler’s process method.

To define another limit, you can simply add a method like

private function shouldFlush(): bool
{
    return 15 <= \count($this->jobs);
}

in your handler [2].
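The precedence rule behind this override can be checked in isolation. The sketch below uses a hypothetical trait, DefaultThreshold, in place of BatchHandlerTrait, and a helper method isFullAfter that exists only for this demonstration.

```php
<?php
// Hypothetical trait standing in for the threshold check of BatchHandlerTrait.
trait DefaultThreshold
{
    private array $jobs = [];

    private function shouldFlush(): bool
    {
        return 10 <= \count($this->jobs);
    }

    // Demo helper: pretend $count jobs are queued, then ask shouldFlush().
    public function isFullAfter(int $count): bool
    {
        $this->jobs = array_fill(0, $count, null);

        return $this->shouldFlush();
    }
}

final class UsesDefault
{
    use DefaultThreshold;
}

final class UsesCustom
{
    use DefaultThreshold;

    // Declared in the class, so it takes precedence over the trait's version.
    private function shouldFlush(): bool
    {
        return 15 <= \count($this->jobs);
    }
}

$defaultAtTen = (new UsesDefault())->isFullAfter(10);
$customAtTen = (new UsesCustom())->isFullAfter(10);
$customAtFifteen = (new UsesCustom())->isFullAfter(15);
```

With the default threshold, ten queued jobs are enough to flush; with the overridden method, the same ten jobs keep waiting until there are fifteen.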

As said before, if the defined batch size is reached, the handler’s process method is called with the queued jobs as argument. That argument is an array like

[
    [
        0 => MyMessage,
        1 => Acknowledger,
    ],
    [
        0 => MyMessage,
        1 => Acknowledger,
    ],
]

so you can iterate over it and unpack each job into its message and its Acknowledger.
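A minimal sketch of such a loop follows. The Acknowledger class here is a stand-in that only records the outcome, and indexDocument is a hypothetical placeholder for the real work; in a handler, process would be a private method rather than a plain function.

```php
<?php
// Stand-in (assumption) that just records what happened to the job.
final class Acknowledger
{
    public ?string $outcome = null;

    public function ack($result = null): void
    {
        $this->outcome = 'acked';
    }

    public function nack(\Throwable $error): void
    {
        $this->outcome = 'nacked: ' . $error->getMessage();
    }
}

final class MyMessage
{
    public string $document;

    public function __construct(string $document)
    {
        $this->document = $document;
    }
}

// Hypothetical placeholder for the actual work, e.g. a call to a search engine.
function indexDocument(MyMessage $message): void
{
    if ('' === $message->document) {
        throw new InvalidArgumentException('empty document');
    }
}

function process(array $jobs): void
{
    // Each job is a [message, Acknowledger] pair.
    foreach ($jobs as [$message, $ack]) {
        try {
            indexDocument($message);
            $ack->ack($message);
        } catch (\Throwable $e) {
            // A failed job is reported individually; the loop continues.
            $ack->nack($e);
        }
    }
}

$jobs = [
    [new MyMessage('first'), new Acknowledger()],
    [new MyMessage(''), new Acknowledger()],
];
process($jobs);
```

Catching the exception per job means one bad message does not prevent the rest of the batch from being acknowledged.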

The argument of the ack method is only relevant for synchronous command buses that expect a return value. I prefer not to return a value, so I simply pass the message itself as the argument.

TL;DR

To cut a long story short, to handle messages in batches you have to:

  • create a message and a handler
  • configure an asynchronous transport for the message
  • implement the BatchHandlerInterface in your handler
  • use the BatchHandlerTrait (or implement the methods yourself)
  • accept an Acknowledger instance as the second argument of the handler’s __invoke method
  • override the trait’s shouldFlush method to set a custom batch size (default = 10)
  • implement a process method in your handler
  • process the jobs and handle possible exceptions by acking or nacking
  • simply pass the job’s message as the ack argument if you don’t expect the handler to return a result

Working example

I’ve created a complete working example on GitHub.

I hope I’ve described everything correctly. If not, please leave a comment and I’ll be happy to adjust the article. Thanks for reading and happy batch processing with Symfony!

[1] What’s interesting is that the boolean $force parameter of the flush method is never used in the trait. Maybe it’s intended as an option for when you implement the flush method yourself.

[2] Remember that you can override a trait’s method at any time. The method implemented in the class that’s using the trait takes precedence.
