Drupal 11: The Queues API

I've talked a lot about the Batch API in Drupal recently, and I've mentioned that it is built upon the Queue API, but I haven't gone any deeper than that. I wrote about the Queues API in Drupal 7, but thought I would bring my understanding up to date.

A queue is a data construct that uses a  "first in, first out" (or FIFO) flow where items are processed in the order that they were added to the queue. This system has a lot of different uses, but is most important when it comes to asynchronous data processing. Drupal and many modules make use of the queue system to process information behind the scenes.

The difference between a queue and a batch is that the batch is for time sensitive things where the user is expecting something to happen. A queue, on the other hand, is more for data processing that needs to happen behind the scenes or without any user triggering the process.

Batches also tend to be stateless, meaning that if the batch fails half way through it is sometimes difficult to re-start the batch from the same point. It is possible if you create your batches in just the right way, but this is actually a little rate. A queue manages this much better by having all of the items in the queue and then giving you options about what you can do with each item as you process it. This means that you might pop a queue item back into the queue for later processing if it failed.

In this article I will look at the Queue API in Drupal 11, how it is used and what sort of best practices are used when using the API.

Creating A Queue

To create a queue in Drupal you need to create an instance of the 'queue' service. This is a factory that can be used to create and manage your queues inside Drupal. By default, all queues in Drupal are database queues (handled via the queue.database default queue factory), although this can be changed with configuration settings.

We'll visit the different types of queue in a later article, but for now we can assume that our queue is stored and manged in a database table called "queue".

Once you have the factory created you can then create a simple queue using the get() method, passing in the name that you want your queue to be. Here is how to get hold of a queue in Drupal.

/** @var \Drupal\Core\Queue\QueueFactoryInterface $queueFactory */
$queueFactory = \Drupal::service('queue');

/** @var \Drupal\Core\Queue\QueueInterface $queue */
$queue = $this->queueFactory->get('queue_simple_example');

You can simplify this using the following.

/** @var \Drupal\Core\Queue\QueueInterface $queue */
$queue = \Drupal::queue('queue_simple_example');

Now that we have a queue we can add an item to it using the createItem() method. The information we pass to the createItem() method is stored in the queue table as serialised PHP data, so as long as we can reconstruct the data afterwards then we are free to pass anything we want to the queue.

Here is an example of creating a queue item using a PHP stdClass object to store the data in an object.

$item = new \stdClass();
$item->id = 123;
$queue->createItem($item);

If we look at the "queue" table now we can see that our queue item is now present.

> select * from queue;
+---------+--------------------------------+------------------------------------+--------+------------+
| item_id | name                           | data                               | expire | created    |
+---------+--------------------------------+------------------------------------+--------+------------+
|    5156 | queue_simple_example           | O:8:"stdClass":1:{s:2:"id";i:123;} |      0 | 1735664606 |
+---------+--------------------------------+------------------------------------+--------+------------+

A more advanced method of using this system is to create a queue DTO class that can be used to store data in the queue. This is better than using the stdClass object to set things up as you get more control over how the object stores information and what behaviors it has when sleeping and waking (i.e. using the serialization process). I will go over this technique in a future article, but for now, we have now created an item in the queue.

If you want to check to see how many items are now in the queue you can use the numberOfItems() method to return this information.

$number = $queue->numberOfItems()

After running the createItem() method once we should have a single item in our queue.

Let's go onto doing something with this queue item.

The Queue Worker

Once you have added items to the queue you will need a way to perform work with them. Queue workers are a type of Drupal plugin that you create to process the data in your queue items.

To create a queue worker you need to create a class that extends the class Drupal\Core\Queue\QueueWorkerBase in the src/Plugin/QueueWorker directory in your custom module. The definition of the class needs to contain an annotation that tells Drupal that this is a queue worker and what queue it acts upon.

Following on from our "queue_simple_example" queue above, we would create the annotation like this.

/**
 * Queue worker for the queue_simple_example.
 *
 * @QueueWorker(
 *   id = "queue_simple_example",
 *   title = @Translation("Queue worker for the simple queue example."),
 *   cron = {"time" = 60}
 * )
 */

The "title" is the human readable title of the queue, which isn't usually printed out to users. The "cron" part details the amount of time that the queue can be processed for during a cron run, which in this case is 60 seconds.

The class itself must contain a method called processItem(), which accepts a $data parameter. This parameter is the data we added to the queue when creating it, which you then process in this method. As the queue processor works through the queue it calls a claimItem() method on the queue, which will return an item to process and set a lease time of the queue item as it is pulled out of the database. This means that individual items are not processed twice once a queue processor has claimed it.

Here is a very simple implementation of a queue worker class that accepts items from the "queue_simple_example" queue. We are also implementing the ContainerFactoryPluginInterface interface so that we can inject different services into the plugin so that we can perform the work we need.

<?php

namespace Drupal\queue_simple_example\Plugin\QueueWorker;

use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Queue worker for the queue_simple_example.
 *
 * @QueueWorker(
 *   id = "queue_simple_example",
 *   title = @Translation("Queue worker for the simple queue example."),
 *   cron = {"time" = 60}
 * )
 */
class QueueExampleWorker extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  use StringTranslationTrait;

  /**
   * Logger factory.
   *
   * @var \Drupal\Core\Logger\LoggerChannelInterface
   */
  protected $logger;

  /**
   * {@inheritDoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    $instance = new self($configuration, $plugin_id, $plugin_definition);
    $instance->logger = $container->get('logger.channel.queue_simple_example');
    return $instance;
  }

  /**
   * {@inheritDoc}
   */
  public function processItem($data) {
    // Process the queue item here, and then create a log message.
    $this->logger->info($this->t('Processed simple queue item @id', ['@id' => $data->id]));
  }

}

What you do in the processItem() method depends on what you are trying to do, you will probably need to add in the related services that you need to process the queue item in the appropriate way.

The above processItem() method does nothing, but succeeds the processing and returns without any error. This means that the upstream queue processing code will assume that everything went well and will remove the item from the queue. If something does go wrong then you can throw an exception to get the queue processor to return the item to the queue for later processing.

Throwing different types of exception in your processItem() method have different effects on the queue item.

  • \Drupal\Core\Queue\DelayedRequeueException - If the queue implements the \Drupal\Core\Queue\DelayableQueueInterface interface then the delayItem() method is used to delay the execution of the queue item for a set period of time (which is passed as part of the exception). If the queue doesn't support the DelayableQueueInterface interface then the item is simply held back in the queue for a while, which defaults to an hour.
  • \Drupal\Core\Queue\RequeueException - The item is added back into the queue and the lease time is reset, which means that it will be picked up as soon as the queue manager comes to that item.
  • \Drupal\Core\Queue\SuspendQueueException - The item is added back into the queue and the queue execution is stopped.
  • \Exception - The item is added back into the queue for later processing and an error is logged.

As the queue processes through the items it contains you can throw the above exceptions to do different things with the queue items. All of these exceptions will put the item back into the queue, so if you do not want to do that you should not throw an exception at all. This will allow the queue processor to delete the item from the queue and continue with the other items.

I've talked about processing a queue, but exactly how can we do that?

Processing A Queue

All queues are processed by Drupal automatically during a cron run.

This means that you can process a queue in the following ways:

  • Via the cron processor form at "/admin/config/system/cron".
  • Via the "Run cron" link on the status page.
  • Via Drush using the command "drush cron".

Each of these methods will trigger Drupal to pick up any currently pending items in the queue and process them using the appropriate queue worker. The time parameter set in the queue worker annotation is used to dictate how long the queue will be processed for, and Drupal will automatically shut down the queue processing once that limit has been reached. 

You can also use the Queue UI module to inspect and process your queue items. This module is a really handy way of inspecting the counts of all of the queues in your system and also allows you to process the queue items using a batch process.

Uses Of A Queue

There are many reasons why you would want to use a queue in Drupal.

The core systems in Drupal make use of the Queue API when checking for updates as this check doesn't necessarily need to be performed by a user and can be added to the queue for processing on a daily or weekly schedule (as configured in the update manager settings).

You can also spot lots of third party modules making use of the queue system in order to perform tasks. For example, the Warmer module will make use of the queue system to create caches for different objects on the site by filling the queue with the items that need to be "warmed" and then processing them during cron runs.

Here are some ideas for processing items in a queue:

  • Add any potentially lengthy operations to the queue for later processing. For example, if you wanted to delete all taxonomy terms on the site you would add all of the term IDs to a queue and then process the term deletions during the queue processing. Depending on how many terms are on the site this process might take seconds or days, but the use of a queue means that every item will be processed in turn.
  • Synchronism of items from (or to) an API is really useful to do in a queue, especially if that synchronization doesn't require any user input. For example, let's say that you have an API that you need to post details to when the user updates their profile, which might involve something like pushing newsletter settings to a third party service. When the user saves their preferences you can pop an item into the queue so that their newsletter settings can be picked up and synchronized during the next cron run. Doing this means that the user doesn't need to wait for the preferences to be saved on the fly, but it also builds in resiliency. If you API goes down then the items in the queue will throw exceptions during processing and be put back into the queue. When your API comes back online the queue will process the updates correctly and your users will never know anything was wrong.

I have used the queue system in Drupal since it was available in Drupal 7. In fact, one of my more complex queue implementations was for a client who had to interface with a (very) slow API system. The system in question was a billing platform that would pull details from an API and gave users the ability to pay their bills online. The tricky part was that some users had lots of accounts on the system, and pulling in that information on the fly would take up to 25 minutes for some of the largest accounts. Clearly, this was not an acceptable amount of time for a user to wait for their account page to load, so we had to think of a workaround.

The solution to this problem was to identify users with the largest number of accounts on the system and pre-cache their account details overnight using a series of queues. This meant that we could fetch the information from the API for those users and store it in the Drupal cache system so that when the user loaded their account their data was pulled from the cache, rather than directly from the API. When the user changed something on the account we would invalidate the cache for the item they changed and re-synchronise it straight away, without having to fetch all the data again. I worked with the client to fine tune the threshold of the number of accounts that would trigger this pre-cache step, but this solved a potentially big issue and allowed the platform to load quickly.

The basic rule of thumb is that any list of things that can to be processed asynchronously can be processed by a queue worker.

Conclusion

What I have shown here are the basics of using the Queue API in Drupal. There are a number of different details that I have skipped over here and will address in future articles on this subject.

Both batches and queues have their use cases in Drupal. The queue system should be used for data that you don't need to process immediately, but still need to process in a timely manner. The batch system is used when you have to process the data immediately, and a user is watching (and waiting) for the data to be processed. If you aren't careful, the batch system can start to creak a little when you have lots of data to process. It's often better to use a queue to store and process data when you want to be sure that everything is processed and you don't need to have the results straight away.

All of the code that you see here is available in a GitHub repository that contains a series of examples of the queue system in action. This example module works just like the Batch API examples module, where you can pick and chose what example you want to look at. The code in this article is available in the sub module called queue_simple_example. Nothing in the module will delete or change data without telling you about it first, so feel free to download and have a play.

Add new comment

The content of this field is kept private and will not be shown publicly.