What is RabbitMQ Used for in Magento 2

RabbitMQ is an open-source message broker that offers a reliable, highly available, scalable, and portable messaging system.

RabbitMQ over Cronjob

Using crontab to run asynchronous tasks is the basic approach, whereas a job/task queue manager is more elaborate and gives you more control, power, and scalability. Crontab is easy to work with but offers limited functionality; it is better suited to scheduled jobs than to asynchronous tasks.

Message queues provide an asynchronous communication mechanism in which the sender and the receiver of a message do not interact with each other directly, nor do they need to communicate with the message queue at the same time. When a sender places a message onto a queue, it is stored until the recipient retrieves it.

Let’s take a quick look at asynchronous messaging before we see how RabbitMQ works.

Asynchronous Messaging

Asynchronous messaging is a form of communication between two systems in which the initiating system puts a message in a queue and carries on with its other work without waiting for an immediate response. The sender and receiver do not need to be connected to each other continuously; instead they are loosely coupled through the rules of the message exchange. It is similar to dropping a letter in a postbox: the postman picks it up and eventually delivers it to the recipient. One of the most popular asynchronous messaging protocols is the Advanced Message Queuing Protocol (AMQP).
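
The idea can be sketched in a few lines of plain PHP. This is only an in-memory illustration, not RabbitMQ: an SplQueue stands in for the broker, and the function names sendOrderMessage and drainQueue are made up for this example.

```php
<?php
// Minimal in-memory illustration of asynchronous messaging (not RabbitMQ itself).
// $queue stands in for the broker; all function names here are hypothetical.

$queue = new SplQueue();

// Sender: store the message and return immediately, without waiting for a reply.
function sendOrderMessage(SplQueue $queue, int $orderId): void
{
    $queue->enqueue(json_encode(['order_id' => $orderId]));
}

// Receiver: runs later and works through whatever has accumulated.
function drainQueue(SplQueue $queue): array
{
    $processed = [];
    while (!$queue->isEmpty()) {
        $message = json_decode($queue->dequeue(), true);
        $processed[] = $message['order_id'];
    }
    return $processed;
}

sendOrderMessage($queue, 101);
sendOrderMessage($queue, 102);

// The sender is free to continue here; nothing has been processed yet.
$pending = count($queue);

// Some time later, the receiver picks the messages up in order.
$processed = drainQueue($queue);
```

The sender never calls the receiver directly; the only thing they share is the queue, which is what allows the two sides to run at different times.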

RabbitMQ for Asynchronous Messaging

In one project, a Magento application had to send information to multiple applications and return quickly to its main work. RabbitMQ, a lightweight yet feature-rich messaging solution that can also be deployed across distributed networks, was implemented for this. It reduced the load on the core Magento application and delivered the messages efficiently to the appropriate third-party applications. It also helped the application scale from processing 25,000 requests to processing 100,000 requests with ease.

What is RabbitMQ and how does it work?

RabbitMQ is a message broker that primarily uses AMQP to handle communication between two applications. It can be pictured as a middleman: queues are defined on the broker, and applications connect to those queues to pull messages. Some of the tasks an application receives may be time-consuming, and it may need to handle many requests at once. The message broker places these tasks in the corresponding queues so that another application can process them independently.

Message queuing allows web servers to respond quickly to incoming requests by handing resource-intensive work to another process instead of performing it on the spot. This helps in handling multiple resource-intensive requests more efficiently.

Components of RabbitMQ Architecture:

  • Producer – An application or component that takes requests from users and pushes them to the exchange. A request can be a task, a simple broadcast message, or a file.
  • Exchange – Receives messages and routes them to queues according to its exchange type (for example direct, topic, fanout, or headers) and the routing rules, called ‘bindings’.
  • Queue – Holds messages until a suitable subscriber requests them. Queues are bound to exchanges by a ‘binding key’.
  • Consumer – A system that pulls messages from the queue and processes each one independently.
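
How these four components fit together can be sketched with a toy model in plain PHP. This is not the RabbitMQ client API: the ToyExchange class, the queue names, and the routing keys are all hypothetical, and routing is done by exact key match only, whereas real topic exchanges also support wildcards.

```php
<?php
// Toy model of producer -> exchange -> queue -> consumer.
// Everything here is hypothetical and in-memory; it only mirrors the concepts.

final class ToyExchange
{
    /** @var array<string, string[]> binding key => names of bound queues */
    private array $bindings = [];

    /** @var array<string, string[]> queue name => stored messages */
    private array $queues = [];

    // Bind a queue to this exchange under a binding key.
    public function bind(string $bindingKey, string $queueName): void
    {
        $this->bindings[$bindingKey][] = $queueName;
        $this->queues[$queueName] ??= [];
    }

    // Producer side: copy the message into every queue bound to the routing key.
    public function publish(string $routingKey, string $message): void
    {
        foreach ($this->bindings[$routingKey] ?? [] as $queueName) {
            $this->queues[$queueName][] = $message;
        }
    }

    // Consumer side: pull the oldest message from a queue, or null if it is empty.
    public function consume(string $queueName): ?string
    {
        if (empty($this->queues[$queueName])) {
            return null;
        }
        return array_shift($this->queues[$queueName]);
    }
}

$exchange = new ToyExchange();
$exchange->bind('order.created', 'erp_export');
$exchange->bind('order.created', 'email_notifications');

$exchange->publish('order.created', '{"order_id":42}');
$exchange->publish('order.cancelled', '{"order_id":43}'); // no binding, so it is dropped

$erpMessage = $exchange->consume('erp_export');
$emailMessage = $exchange->consume('email_notifications');
$nothingLeft = $exchange->consume('erp_export');
```

Each bound queue receives its own copy of the message, so the two consumers can work at their own pace without knowing about each other.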

RabbitMQ in Magento 2

Magento 2 supports asynchronous messaging through its message queue framework, which can use RabbitMQ as the broker. The framework was first available in Magento Commerce; as of Magento release 2.3.0, RabbitMQ can also be used on Magento Open Source installations.

Magento previously relied on MySQL adapters for messaging and on cron jobs to ensure delivery of messages, but this was found to be neither very reliable nor very scalable.

RabbitMQ is especially useful when Magento needs to communicate with an ERP system or a dedicated logistics application. In a real-world scenario, RabbitMQ lets us decouple the ERP integration process from the customer order fulfilment process. For example, when Magento triggers an order creation event, the publisher can simply pass the order ID (the message) to the order export message queue and let the order complete normally. RabbitMQ then holds the order IDs until another process (the consumer) takes an order ID and processes that order in the ERP system. RabbitMQ can also be used to bring messages from other systems back into Magento 2.

With the consumer running as a background process, this setup offers better scalability, runtime flexibility, and error handling. Many scheduled tasks (cron jobs) in Magento can now be queued and processed asynchronously using RabbitMQ. As e-commerce grows and order volumes increase, message queuing helps by decoupling resource-intensive processes, which in turn improves frontend response time and the overall user experience.

We are done with the basics. Let’s move on to the code.

Define the AMQP connection settings in the environment file located at ‘app/etc/env.php’:

'queue' => [
   'amqp' => [
       'host' => '127.0.0.1',
       'port' => '5672',
       'user' => 'codilar',
       'password' => 'codilar',
       'virtualhost' => '/',
       'ssl' => false
   ]
]

Configuring the message queue topology

Now we will create a sample module called Codilar_QueueExample. A message queue requires four XML files in the module’s etc folder:

  • communication.xml – Defines aspects of the message queue system that all communication types have in common.
  • queue_consumer.xml – Defines the relationship between an existing queue and its consumer.
  • queue_topology.xml – Defines the message routing rules and declares queues and exchanges.
  • queue_publisher.xml – Defines the exchange where a topic is published.

communication.xml

We declare our topic as ‘queueexample.queue.order’:

  • Define its datatype as ‘string’. This is the datatype of the messages pushed through the queue.
  • Declare the handler class ‘Codilar\QueueExample\Model\Queue\Consumer’ with the ‘process’ method to handle input from the queue.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="queueexample.queue.order" request="string">
        <handler name="processAddOrderToQueue"  type="Codilar\QueueExample\Model\Queue\Consumer" method="process" />
    </topic>
</config>
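
Next up, the handler class Codilar\QueueExample\Model\Queue\Consumer.php:

<?php
namespace Codilar\QueueExample\Model\Queue;

class Consumer
{
    /** @var \Psr\Log\LoggerInterface */
    protected $_logger;

    public function __construct(\Psr\Log\LoggerInterface $logger)
    {
        $this->_logger = $logger;
    }

    /**
     * @param string $orders
     */
    public function process($orders)
    {
        try {
            // execute() handles saving the order IDs to a table
            $this->execute($orders);
        } catch (\Exception $e) {
            // log the error so that a failed message does not crash the consumer
            $this->_logger->critical($e->getMessage());
        }
    }

    private function execute($orders)
    {
        // placeholder, implemented in a later step
    }
}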

The execute function is a placeholder for now; we’ll fill it in when we process messages from the queue.

queue_consumer.xml

In this file, we define the consumer parameters. It’s also possible to declare a handler class in this file.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="queueexample.queue.order"
              queue="queueexample.queue.order"
              connection="db"
              maxMessages="5000"
              consumerInstance="Magento\Framework\MessageQueue\Consumer"
              handler="Codilar\QueueExample\Model\Queue\Consumer::process"/>
</config>

  • The name and queue attributes are required.
  • connection: for AMQP connections, the connection name must match the connection attribute in the queue_topology.xml file. Otherwise, the connection name must be ‘db’.

Note: a handler can be declared in either communication.xml or queue_consumer.xml. It is not required to declare it in both places.

queue_topology.xml

This file defines the message routing rules and declares queues and exchanges

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento-db" type="topic" connection="db">
        <binding id="processAddOrderToQueueBinding"
                 topic="queueexample.queue.order"
                 destinationType="queue"
                 destination="queueexample.queue.order"/>
    </exchange>
</config>
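
  • The exchange name, type, and connection attributes are required.
  • The exchange name should match the exchange attribute of the connection node in the queue_publisher.xml file.
  • The type ‘topic’ means the exchange routes messages to queues by matching the topic.
  • The connection is ‘db’ since this example uses MySQL for the queue system. To route messages through RabbitMQ instead, use the ‘amqp’ connection (configured in env.php above) in queue_consumer.xml, queue_topology.xml, and queue_publisher.xml.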

queue_publisher.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="queueexample.queue.order">
        <connection name="db" exchange="magento-db" />
    </publisher>
</config>

The topic attribute matches the one defined in communication.xml.

Sending a message from the publisher to a queue

In this step, we will create a controller that sends messages from the publisher to a queue. Since we’re using MySQL as the queue system, messages are stored in Magento’s database:

  • The queue table manages queues.
  • The queue_message table stores messages.
  • The queue_message_status table manages the message queue workload.

Our module has a button on the config page that sends an AJAX request to queueexample/queue/order to start adding orders to the queue. With message queue integration, the action class looks like this:

<?php
namespace Codilar\QueueExample\Controller\Adminhtml\Queue;

/**
 * Class Order
 * @package Codilar\QueueExample\Controller\Adminhtml\Queue
 */
class Order extends \Magento\Backend\App\Action
{
    /**
     * Authorization level of a basic admin session
     */
    const ADMIN_RESOURCE = 'Codilar_QueueExample::config_queueexample';

    const TOPIC_NAME = 'queueexample.queue.order';

    const SIZE = 5000;

    // Key under which each chunk of IDs is published; the consumer stores rows of this type
    const TYPE = 'order';

    /** @var \Magento\Sales\Model\ResourceModel\Order\CollectionFactory */
    protected $_orderColFactory;

    /** @var \Magento\Framework\Serialize\Serializer\Json */
    protected $_json;

    /** @var \Magento\Framework\MessageQueue\PublisherInterface */
    protected $_publisher;

    /**
     * Order constructor.
     *
     * @param \Magento\Sales\Model\ResourceModel\Order\CollectionFactory $orderColFactory
     * @param \Magento\Framework\MessageQueue\PublisherInterface $publisher
     * @param \Magento\Framework\Serialize\Serializer\Json $json
     * @param \Magento\Backend\App\Action\Context $context
     */
    public function __construct(
        \Magento\Sales\Model\ResourceModel\Order\CollectionFactory $orderColFactory,
        \Magento\Framework\MessageQueue\PublisherInterface $publisher,
        \Magento\Framework\Serialize\Serializer\Json $json,
        \Magento\Backend\App\Action\Context $context
    ) {
        // the parent constructor wires up the request, response and redirect helpers
        parent::__construct($context);
        $this->_orderColFactory = $orderColFactory;
        $this->_json = $json;
        $this->_publisher = $publisher;
    }

    /**
     * @return \Magento\Framework\App\ResponseInterface|\Magento\Framework\Controller\ResultInterface|void
     */
    public function execute()
    {
        if ($this->getRequest()->isAjax()) {
            try {
                // get the list of order IDs
                $orderIds = $this->_orderColFactory->create()->addFieldToSelect('entity_id')->getAllIds();
                // send the IDs to the publish function
                $this->publishData($orderIds, self::TYPE);
                $this->getResponse()->setBody($this->_json->serialize([
                    'error' => 0,
                    'message' => __('Orders are being added to queue')
                ]));
                return;
            } catch (\Exception $e) {
                $this->getResponse()->setBody($this->_json->serialize([
                    'error' => 1,
                    'message' => __(
                        'Something went wrong while adding record(s) to queue. Error: %1',
                        $e->getMessage()
                    )
                ]));
                return;
            }
        }
        return $this->_redirect('*/*/index');
    }

    /**
     * @param array $data
     * @param string $type
     */
    public function publishData($data, $type)
    {
        if (is_array($data)) {
            // split the list of IDs into arrays of 5000 IDs each
            $chunks = array_chunk($data, self::SIZE);
            foreach ($chunks as $chunk) {
                // publish each chunk of IDs to the queue as one message
                $rawData = [$type => $chunk];
                $this->_publisher->publish(self::TOPIC_NAME, $this->_json->serialize($rawData));
            }
        }
    }
}

In this action class, we fetch the list of order IDs and split them into chunks of 5000 IDs each, then add each chunk to the message queue. This solves two problems:

  • Unresponsive Magento backend
  • PHP timeout due to too many orders needing to be processed
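
The chunking step can be tried outside Magento with plain PHP. The sketch below is only an approximation of the controller logic: it uses json_encode in place of Magento’s Json serializer, and buildMessages is a hypothetical helper name.

```php
<?php
// Standalone sketch of the chunk-and-serialize step.
// json_encode stands in for Magento's Json serializer; buildMessages() is hypothetical.

function buildMessages(array $orderIds, string $type, int $chunkSize): array
{
    $messages = [];
    foreach (array_chunk($orderIds, $chunkSize) as $chunk) {
        // One message per chunk, shaped like {"order":[1,2,...]}
        $messages[] = json_encode([$type => $chunk]);
    }
    return $messages;
}

$orderIds = range(1, 12000); // pretend the store has 12,000 orders
$messages = buildMessages($orderIds, 'order', 5000);

// 12,000 IDs in chunks of 5000 give three messages: 5000 + 5000 + 2000 IDs.
$messageCount = count($messages);
$lastChunk = json_decode($messages[2], true)['order'];
```

The request that triggers the export only has to publish a handful of messages, however many orders exist; the heavy work is left to the consumer.
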

Processing message from queue

In this step we’ll expand the handler class declared earlier, Codilar\QueueExample\Model\Queue\Consumer.php:

<?php
namespace Codilar\QueueExample\Model\Queue;

/**
 * Class Consumer
 * @package Codilar\QueueExample\Model\Queue
 */
class Consumer
{
    // .... (other properties and the constructor are omitted here)

    /** @var \Magento\Framework\Serialize\Serializer\Json */
    protected $_json;

    /**
     * @param string $orders JSON-encoded message published by the controller
     */
    public function process($orders)
    {
        try {
            $this->execute($orders);
        } catch (\Exception $e) {
            $errorCode = $e->getCode();
            $message = __('Something went wrong while adding orders to queue');
            $this->_notifier->addCritical(
                $errorCode,
                $message
            );
            $this->_logger->critical($errorCode . ": " . $message);
        }
    }

    /**
     * @param string $orderItems
     *
     * @throws \Magento\Framework\Exception\LocalizedException
     */
    private function execute($orderItems)
    {
        $orderCollectionArr = [];
        /** @var \Codilar\QueueExample\Model\Queue $queue */
        $queue = $this->_queueFactory->create();
        $orderItems = $this->_json->unserialize($orderItems);
        if (is_array($orderItems)) {
            // each message has the shape [type => [orderId, orderId, ...]]
            foreach ($orderItems as $orderIds) {
                foreach ((array) $orderIds as $orderId) {
                    $orderCollectionArr[] = [
                        'type' => 'order',
                        'entity_id' => $orderId,
                        'priority' => 1,
                    ];
                }
            }
            // insert all collected rows into the QueueExample queue in one call
            $queue->add($orderCollectionArr);
        }
    }
}

You can find the full content of this class here
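
To see what the consumer does with a single message, the unpacking step can be reproduced in plain PHP. This is a sketch under assumptions: json_decode stands in for Magento’s Json serializer, messageToRows is a hypothetical helper, and the row layout simply mirrors the array built in execute above.

```php
<?php
// Standalone sketch of turning one queue message into rows for insertion.
// json_decode stands in for Magento's Json serializer; messageToRows() is hypothetical.

function messageToRows(string $message): array
{
    $rows = [];
    $payload = json_decode($message, true);
    if (!is_array($payload)) {
        return $rows; // malformed message: nothing to insert
    }
    // The payload has the shape [type => [orderId, orderId, ...]]
    foreach ($payload as $type => $orderIds) {
        foreach ((array) $orderIds as $orderId) {
            $rows[] = [
                'type' => $type,
                'entity_id' => (int) $orderId,
                'priority' => 1,
            ];
        }
    }
    return $rows;
}

$rows = messageToRows('{"order":[7,8,9]}');
$badRows = messageToRows('not json');
```

Each ID in the chunk becomes its own row, so one message carrying up to 5000 IDs can be written with a single multi-row insert.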

Executing message queue in Magento 2

For Magento to recognize our queue, run the following commands from the Magento root folder:

  • php bin/magento setup:upgrade
  • php bin/magento setup:di:compile

You can check whether the consumer is registered by running ‘php bin/magento queue:consumers:list’. Registered consumers will appear in the result.

Consumers are executed by cron, declared under Magento/MessageQueue/etc/crontab.xml

Aside from running via cron, consumers can be executed with the following command:

php bin/magento queue:consumers:start [--max-messages=<value>] [--batch-size=<value>] [--single-thread] [--area-code=<value>] <consumer_name>

Comments

  1. On
    php bin/magento setup:upgrade I get:
    Service method specified in the definition of handler “processAddOrderToQueue” for topic “queueexample.queue.order” is not available. Given “Codilar\QueueExample\Model\Queue\Consumer::process”


