Symfony Messenger: background task

Hello,

I would like to show you how to run a background task with the Symfony Messenger component.

Suppose you need to run a long script (a file import or export, for example) from your web application, and you want to be sure it gets processed. Symfony's Messenger component is well suited to this case.

Let's imagine that we have an AwesomeService that executes a big data import method, for example.

<?php
#src\Service\AwesomeService.php

namespace App\Service;

use Doctrine\ORM\EntityManagerInterface;

class AwesomeService
{
    private $em;

    public function __construct(EntityManagerInterface $em) 
    {
        $this->em = $em;
    }

    public function import(array $parameters): array 
    {
        // This method is to illustrate this example
        $results = [];

        // Do your awesome stuff...

        return $results;
    }
}

Messenger component installation

Install the component with Composer:

composer require symfony/messenger

[Recommended] The Doctrine transport lets you store messages in a messenger_messages database table:

composer require symfony/doctrine-messenger

This is very useful for managing retries and failures: failed messages are kept and can be replayed through another transport queue.

Message and handler creation

This component works in a similar way to the one covered in a previous article: Event listener.

Let's start by creating a message:

<?php
# src\Messenger\Message\AwesomeMessage.php

namespace App\Messenger\Message;

class AwesomeMessage
{
    private $userId;
    private $parameters;

    public function __construct(int $userId, ?array $parameters = [])
    {
        // Store only the user id: the handler can reload the current user from it
        $this->userId = $userId;
        $this->parameters = $parameters ?? [];
    }

    public function getUserId(): int
    {
        return $this->userId;
    }

    public function getParameters(): array
    {
        return $this->parameters;
    }
}
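
Keeping only scalars and arrays in the message matters once it is routed to an asynchronous transport, because the message is serialized when it is dispatched and rebuilt inside the worker. The sketch below imitates that round trip with PHP's native serialize(), which, as far as I know, is what Messenger's default transport serializer relies on. AwesomeMessageSketch is a hypothetical, namespace-free copy of the message above so that the snippet runs on its own; the 'file' parameter is an invented example value.

```php
<?php
// Hypothetical stand-in for App\Messenger\Message\AwesomeMessage,
// kept namespace-free so this snippet runs with plain PHP.
final class AwesomeMessageSketch
{
    private $userId;
    private $parameters;

    public function __construct(int $userId, array $parameters = [])
    {
        $this->userId = $userId;
        $this->parameters = $parameters;
    }

    public function getUserId(): int
    {
        return $this->userId;
    }

    public function getParameters(): array
    {
        return $this->parameters;
    }
}

// Imitate an asynchronous transport: encode at dispatch time, decode in the worker.
$original = new AwesomeMessageSketch(42, ['file' => 'import.csv']); // illustrative values
$payload = serialize($original);
$restored = unserialize($payload);

// The worker sees an equal copy of the message, not the original object.
var_dump($restored == $original);
var_dump($restored === $original);
```

This is also why the message carries a user id rather than a User entity: an id survives the round trip cheaply, and the handler reloads fresh data from the database.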

Now let's create the corresponding handler, which runs the long service method and then sends an e-mail or dispatches another message:

<?php
# src\Messenger\MessageHandler\AwesomeMessageHandler.php

namespace App\Messenger\MessageHandler;

use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Doctrine\ORM\EntityManagerInterface;

// To send an email
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\Email;
use Symfony\Component\Mailer\Exception\TransportExceptionInterface;

// To send another message
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;

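use App\Entity\User; // Assumes the User entity lives in App\Entity
use App\Messenger\Message\AwesomeMessage;
use App\Messenger\Message\AwesomeOtherMessage; // To be created
use App\Service\AwesomeService;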

class AwesomeMessageHandler implements MessageHandlerInterface
{
    private $awesome_service;
    private $em;
    private $message_bus;
    private $mailer;

    public function __construct( 
        AwesomeService $awesome_service, 
        EntityManagerInterface $em,
        MessageBusInterface $message_bus,
        MailerInterface $mailer
    ) {
        $this->awesome_service = $awesome_service;
        $this->em = $em;
        $this->message_bus = $message_bus;
        $this->mailer = $mailer;
    }

    public function __invoke(AwesomeMessage $message)
    {
        $user = $this->em->getRepository(User::class)->find($message->getUserId());
        $results = $this->awesome_service->import($message->getParameters());

        // If error, send an email, for example
        if (null !== $user && true === ($results['has_error'] ?? false)) {
            $email = (new Email())
                ->from('hello@example.com')
                ->to($user->getEmail())
                ->subject('Time for Symfony Mailer!')
                ->text('Sending emails is fun again!')
                ->html('<p>See Twig integration for better HTML integration!</p>');

            try {
                $this->mailer->send($email);
            } catch (TransportExceptionInterface $e) {
                // Delivery failed: log the error here instead of failing the whole message
            }
        }

        // We could also dispatch another message
        $new_message = new AwesomeOtherMessage($results); // To be created
        $this->message_bus->dispatch(
            (new Envelope($new_message))
                ->with(new DispatchAfterCurrentBusStamp())
        );

    }
}
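
The handler dispatches an AwesomeOtherMessage that is marked "To be created". A minimal sketch of what that class could look like follows; it is only an assumption about its shape, since the article does not define it. In the application it would live in src/Messenger/Message/AwesomeOtherMessage.php under the App\Messenger\Message namespace (omitted here so the snippet runs standalone). The hasError() helper and the 'imported' key are hypothetical additions for illustration.

```php
<?php
// Hypothetical follow-up message carrying the import results.
final class AwesomeOtherMessage
{
    private $results;

    public function __construct(array $results)
    {
        $this->results = $results;
    }

    public function getResults(): array
    {
        return $this->results;
    }

    // Hypothetical convenience accessor; 'has_error' is the key the handler checks.
    public function hasError(): bool
    {
        return true === ($this->results['has_error'] ?? false);
    }
}

// Illustrative values only.
$message = new AwesomeOtherMessage(['has_error' => false, 'imported' => 120]);
var_dump($message->hasError());
```

Like AwesomeMessage, it would need its own handler and, to run in the background, its own routing entry.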

Using the Messenger component

We're going to create a controller that dispatches our message:

// src/Controller/DefaultController.php
namespace App\Controller;

use App\Messenger\Message\AwesomeMessage;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $bus): Response
    {
        // will cause the AwesomeMessageHandler to be called with the current user id
        $bus->dispatch(new AwesomeMessage($this->getUser()->getId()));
        // ...
    }
}

Component configuration

Create the component's configuration file:

# config/packages/messenger.yaml
framework:
    messenger:
        # reset services after consuming messages
        reset_on_message: true
        failure_transport: failed 

        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
            failed: '%env(MESSENGER_TRANSPORT_DSN)%?queue_name=failed' # failed messages will be stored in your database table
            sync: 'sync://'

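        routing:
            # Route your messages to the transports.
            # A message class may appear only once as a key: switch the transport instead of duplicating the line.
            # 'App\Messenger\Message\AwesomeMessage': sync # Synchronous: for tests
            'App\Messenger\Message\AwesomeMessage': async # Background mode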

Test message delivery from the command line

The component includes a command to consume the messages waiting in a transport queue (here, async):

php bin/console messenger:consume async -vv 

Production deployment

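Create a systemd service to consume messages in the background. The unit below is a template (it uses %i), so save it as messenger-worker@.service to match the activation commands: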

[Unit]
Description=Symfony messenger-consume %i

[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
Restart=always
RestartSec=30

[Install]
WantedBy=default.target

Service activation

sudo systemctl enable messenger-worker@service
sudo systemctl start messenger-worker@service

For more information, see the Messenger component documentation.