How to create a custom message queue in Magento 2

Magento 2 ships with the Message Queue Framework (MQF) by default. The MQF lets a module publish messages to queues, and it defines the consumers that receive those messages asynchronously.

For example, after an order is saved you may need to send a notification to the customer. (For this tutorial, set aside Magento's built-in asynchronous email feature and focus on any process that can run asynchronously.)
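To make the idea concrete before touching Magento's configuration, here is a minimal plain-PHP sketch of the publish-now, process-later pattern. It does not use the MQF at all: `InMemoryQueue`, the `order.notification` topic, and the payload fields are hypothetical names chosen only for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a message queue; not a Magento class.
final class InMemoryQueue
{
    /** @var array<string, string[]> JSON-encoded messages grouped by topic */
    private array $messages = [];

    // Publishing only stores the message; no slow work happens here.
    public function publish(string $topic, array $payload): void
    {
        $this->messages[$topic][] = json_encode($payload, JSON_THROW_ON_ERROR);
    }

    // A consumer drains the topic later and passes each message to a handler.
    public function consume(string $topic, callable $handler): int
    {
        $processed = 0;
        while (!empty($this->messages[$topic])) {
            $message = array_shift($this->messages[$topic]);
            $handler(json_decode($message, true, 512, JSON_THROW_ON_ERROR));
            $processed++;
        }
        return $processed;
    }
}

$queue = new InMemoryQueue();

// "Order save" path: returns as soon as the notification is queued.
$queue->publish('order.notification', ['order_id' => 1001, 'email' => 'customer@example.com']);

// "Consumer" path: runs separately and does the slow work.
$sent = [];
$processed = $queue->consume('order.notification', function (array $data) use (&$sent): void {
    $sent[] = sprintf('Notify %s about order %d', $data['email'], $data['order_id']);
});
```

In the real framework the queue lives in the database or in an AMQP broker and the consumer is a separate process, but the split between publishing and handling is the same.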

The following diagram illustrates the Message Queue Framework

In this post, I will show you how to create a custom consumer.

#1, you have to create a custom module. In this tutorial, I made a module named


#2, you have to define a file named communication.xml under etc.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <!-- request and response: could be an interface -->
    <topic name="updateproduct.topic"
           request="Hidro\MyMessageQueue\App\TopicDataInterface"
           response="boolean"/>
</config>
  • name: The topic name.
  • request: The request data type, that is, the type of the parameter passed to the handler's execute function.
  • response: The response data type.
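As a quick sanity check of those three attributes, the following sketch reads a topic declaration with PHP's SimpleXML extension and lists what it finds. This is not how Magento itself parses the file; the XML is an inline copy of the topic above with the schema attributes left out for brevity, and the `$topics` array shape is my own choice.

```php
<?php
declare(strict_types=1);

// Inline copy of the topic declaration from communication.xml (schema attributes omitted).
$xml = <<<'XML'
<?xml version="1.0"?>
<config>
    <topic name="updateproduct.topic"
           request="Hidro\MyMessageQueue\App\TopicDataInterface"
           response="boolean"/>
</config>
XML;

// Collect each topic's request and response types, keyed by topic name.
$topics = [];
foreach (simplexml_load_string($xml)->topic as $topic) {
    $topics[(string) $topic['name']] = [
        'request'  => (string) $topic['request'],
        'response' => (string) $topic['response'],
    ];
}

print_r($topics);
```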

#3, Create the request interface TopicDataInterface and a class that implements it.

  • TopicDataInterface
<?php
/**
 * Created by Hidro Le.
 * Job Title: Magento Developer
 */
namespace Hidro\MyMessageQueue\App;

/**
 * Data carried by messages on the updateproduct.topic topic.
 */
interface TopicDataInterface
{
    /**
     * @return string[]
     */
    public function getData();

    /**
     * @param string[] $data
     * @return TopicDataInterface
     */
    public function setData($data);
}
  • TopicData
<?php
/**
 * Created by Hidro Le.
 * Job Title: Magento Developer
 */
namespace Hidro\MyMessageQueue\Model;

use Hidro\MyMessageQueue\App\TopicDataInterface;

/**
 * Default implementation of TopicDataInterface.
 */
class TopicData implements TopicDataInterface
{
    /**
     * @var string[]
     */
    protected $data;

    /**
     * @param string[] $data
     * @return $this|TopicDataInterface
     */
    public function setData($data)
    {
        $this->data = $data;
        return $this;
    }

    /**
     * @return string[]
     */
    public function getData()
    {
        return $this->data;
    }
}
  • Define interface preference (etc/di.xml)
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <preference for="Hidro\MyMessageQueue\App\TopicDataInterface"
                type="Hidro\MyMessageQueue\Model\TopicData"/>
</config>
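If the role of the preference is unclear, this plain-PHP sketch shows the effect it has: asking for the interface yields the configured concrete class. `TinyResolver` is a hypothetical, heavily simplified stand-in for Magento's object manager, and the interface and model are redeclared here without namespaces (and with type hints added) so the snippet runs on its own.

```php
<?php
declare(strict_types=1);

// Stand-ins for the interface and model from this post (namespaces omitted).
interface TopicDataInterface
{
    public function getData(): array;
    public function setData(array $data): TopicDataInterface;
}

final class TopicData implements TopicDataInterface
{
    private array $data = [];

    public function getData(): array
    {
        return $this->data;
    }

    public function setData(array $data): TopicDataInterface
    {
        $this->data = $data;
        return $this;
    }
}

// Hypothetical resolver: maps a requested type to its preferred class.
final class TinyResolver
{
    private array $preferences;

    public function __construct(array $preferences)
    {
        $this->preferences = $preferences;
    }

    public function create(string $type): object
    {
        // Fall back to the requested type when no preference is configured.
        $class = $this->preferences[$type] ?? $type;
        return new $class();
    }
}

// Equivalent of the <preference for="..." type="..."/> entry in di.xml.
$resolver = new TinyResolver([TopicDataInterface::class => TopicData::class]);
$topicData = $resolver->create(TopicDataInterface::class)->setData(['product' => '123']);
```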

#4, Create a configuration file that declares the consumer and the class and method that handle its messages:

  • queue_consumer.xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="updateproduct.consumer"
              queue="updateproduct.consumer"
              connection="db"
              maxMessages="100"
              handler="Hidro\MyMessageQueue\Model\Queue\Handler\UpdateProductHandler::execute"/>
</config>
  • name: The consumer name.
  • connection: The connection type (db or amqp). It must match the connection attribute in the queue_topology.xml file.
  • handler: The class and method that process each message, written as Class::method.
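The handler value is just a string in Class::method form. The sketch below shows one way such a string can be split and invoked; it is an illustration of the format, not Magento's internal dispatch code. `invokeHandler` is a hypothetical helper, and the handler class is a namespace-free stand-in that takes a plain array.

```php
<?php
declare(strict_types=1);

// Stand-in for the handler class from this post (namespace omitted, array payload).
final class UpdateProductHandler
{
    public function execute(array $data): bool
    {
        return isset($data['product']);
    }
}

// Hypothetical helper: split "Class::method" and call the method on a new instance.
function invokeHandler(string $handler, array $payload)
{
    // array_pad keeps the destructuring safe when the "::" separator is missing.
    [$class, $method] = array_pad(explode('::', $handler, 2), 2, '');
    if (!method_exists($class, $method)) {
        throw new InvalidArgumentException("Handler {$handler} is not callable");
    }
    return (new $class())->$method($payload);
}

$result = invokeHandler('UpdateProductHandler::execute', ['product' => '123']);
```

A misspelled class or method name in queue_consumer.xml is the kind of mistake this check would surface early.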

#5, Create class UpdateProductHandler.

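<?php
/**
 * Created by Hidro Le.
 * Job Title: Magento Developer
 */
namespace Hidro\MyMessageQueue\Model\Queue\Handler;

use Hidro\MyMessageQueue\App\TopicDataInterface;

class UpdateProductHandler
{
    /**
     * Called by the consumer for every message on the queue.
     *
     * @param TopicDataInterface $requestData
     * @return bool
     */
    public function execute(TopicDataInterface $requestData)
    {
        // Write the data to a file without injecting a LoggerInterface.
        // Note: Magento 2.4 replaced the Zend Framework packages with Laminas,
        // so on newer versions these classes may need to be
        // \Laminas\Log\Writer\Stream and \Laminas\Log\Logger.
        $writer = new \Zend\Log\Writer\Stream(BP . '/var/log/' . 'hidro_debug_message_queue.log');
        $logger = new \Zend\Log\Logger();
        $logger->addWriter($writer);
        $logger->debug(json_encode($requestData->getData()));

        return true;
    }
}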

#6, Define the publisher connection via queue_publisher.xml.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="updateproduct.topic">
        <connection name="db" exchange="updateproduct-queue-exchange"/>
    </publisher>
</config>

#7, Define queue_topology.xml. This file defines the message routing rules and declares queues and exchanges.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="updateproduct-queue-exchange" type="topic" connection="db">
        <binding id="updateProductBinding"
                 topic="updateproduct.topic"
                 destinationType="queue"
                 destination="updateproduct.consumer"/>
    </exchange>
</config>
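To illustrate what a binding on a topic exchange does, here is a simplified sketch of AMQP-style topic routing in plain PHP, where `*` stands for exactly one dot-separated word and `#` for zero or more. The functions and the second binding are hypothetical, and this is not Magento's routing code; whether the db connection honors wildcards the same way is outside the scope of this post.

```php
<?php
declare(strict_types=1);

// Recursively match pattern words against topic words.
function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        return $words === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // '#' may absorb zero or more words: try every possible split.
        for ($i = 0; $i <= count($words); $i++) {
            if (matchWords($pattern, array_slice($words, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($words === []) {
        return false;
    }
    $word = array_shift($words);
    return ($head === '*' || $head === $word) && matchWords($pattern, $words);
}

function topicMatches(string $pattern, string $topic): bool
{
    return matchWords(explode('.', $pattern), explode('.', $topic));
}

// Return every queue whose binding pattern matches the published topic.
function routeTopic(array $bindings, string $topic): array
{
    $queues = [];
    foreach ($bindings as [$pattern, $queue]) {
        if (topicMatches($pattern, $topic)) {
            $queues[] = $queue;
        }
    }
    return $queues;
}

$bindings = [
    ['updateproduct.topic', 'updateproduct.consumer'], // the binding from this post
    ['updateproduct.*', 'hypothetical.audit.queue'],   // hypothetical wildcard binding
];
$queues = routeTopic($bindings, 'updateproduct.topic');
```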

#8, Create a publisher. This class uses PublisherInterface to publish a message to the queue.

<?php
/**
 * Created by Hidro Le.
 * Job Title: Magento Developer
 */
namespace Hidro\MyMessageQueue\Publisher;

use Magento\Framework\MessageQueue\PublisherInterface;
use Hidro\MyMessageQueue\App\TopicDataInterface;

/**
 * Publishes product update messages to the updateproduct.topic topic.
 */
class UpdateProduct
{
    /**
     * @var string
     */
    const TOPIC_NAME = 'updateproduct.topic';

    /**
     * @var PublisherInterface
     */
    protected $publisher;

    /**
     * @param PublisherInterface $publisher
     */
    public function __construct(
        PublisherInterface $publisher
    ) {
        $this->publisher = $publisher;
    }

    /**
     * @param TopicDataInterface $productData
     * @throws \InvalidArgumentException
     */
    public function publish(TopicDataInterface $productData)
    {
        $this->publisher->publish(self::TOPIC_NAME, $productData);
    }
}
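Because the publisher class only depends on an interface, it can be exercised without a running queue. The sketch below uses a hypothetical `RecordingPublisher` test double. To keep it runnable outside Magento, it redeclares a local `PublisherInterface` that mirrors the `publish($topicName, $data)` method of `Magento\Framework\MessageQueue\PublisherInterface`, and it relaxes the payload type to a plain array; both are assumptions of this sketch.

```php
<?php
declare(strict_types=1);

// Local stand-in mirroring publish($topicName, $data) from
// Magento\Framework\MessageQueue\PublisherInterface, so the sketch runs alone.
interface PublisherInterface
{
    public function publish($topicName, $data);
}

// Hypothetical test double: records what would have been queued.
final class RecordingPublisher implements PublisherInterface
{
    public array $published = [];

    public function publish($topicName, $data)
    {
        $this->published[] = [$topicName, $data];
        return null;
    }
}

// Trimmed copy of the UpdateProduct publisher (payload type relaxed to array).
final class UpdateProduct
{
    const TOPIC_NAME = 'updateproduct.topic';

    private PublisherInterface $publisher;

    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    public function publish(array $productData): void
    {
        $this->publisher->publish(self::TOPIC_NAME, $productData);
    }
}

$recorder = new RecordingPublisher();
(new UpdateProduct($recorder))->publish(['product' => '123']);
```

Inside a Magento unit test, the same idea would typically be expressed with a PHPUnit mock of the real interface.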

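Finally, test it with the following code block. After publishing, start the consumer with bin/magento queue:consumers:start updateproduct.consumer so that the queued message is processed.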

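// Quick test only, assuming it runs inside a bootstrapped Magento context.
// In production code, inject dependencies through the constructor instead.
$objectManager = \Magento\Framework\App\ObjectManager::getInstance();

$topicData = $objectManager->create(\Hidro\MyMessageQueue\App\TopicDataInterface::class);
$topicData->setData(['product' => 123]);

$publisher = $objectManager->get(\Hidro\MyMessageQueue\Publisher\UpdateProduct::class);
$publisher->publish($topicData);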

