How to create a custom message queue in Magento 2

Magento 2 ships with the Message Queue Framework (MQF), a system that lets a module publish messages to queues. It also defines the consumers that receive those messages asynchronously.

For example, after an order is saved you may need to send a notification to the customer. (For this example, set aside Magento's built-in asynchronous email feature and focus on work that can be processed asynchronously.)

The following diagram illustrates the Message Queue Framework

In this post, I will show you how to create a custom consumer.

#1, create a custom module. In this tutorial, the module is named Hidro_MyMessageQueue.
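
Magento only loads a module once it has a registration.php (plus an etc/module.xml declaring the module name). As a minimal sketch, assuming the module lives in app/code/Hidro/MyMessageQueue, the registration file could look like this:

```php
<?php
// app/code/Hidro/MyMessageQueue/registration.php
// (the path is an assumption about where the module lives)

use Magento\Framework\Component\ComponentRegistrar;

// Register the module under the name used throughout this tutorial.
ComponentRegistrar::register(
    ComponentRegistrar::MODULE,
    'Hidro_MyMessageQueue',
    __DIR__
);
```

After adding the files, the module is typically enabled with `bin/magento module:enable Hidro_MyMessageQueue` followed by `bin/magento setup:upgrade`.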

#2, define a file named communication.xml under the module's etc directory.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
<!--    request and response: could be an interface -->
    <topic name="updateproduct.topic" request="Hidro\MyMessageQueue\App\TopicDataInterface" response="boolean"/>
</config>
  • name: The topic name.
  • request: The request type, that is, the type of the parameter passed to the handler's execute function.
  • response: The response type.

#3, create the request interface TopicDataInterface and a class that implements it.

  • TopicDataInterface
<?php
/**
 * Created by Hidro Le.
 * Job Title: Magento Developer
 */

namespace Hidro\MyMessageQueue\App;

/**
 * Data object carried by messages published to updateproduct.topic.
 */
interface TopicDataInterface
{
    /**
     * @return string[]
     */
    public function getData();

    /**
     * @param string[] $data
     * @return TopicDataInterface
     */
    public function setData($data);
}
  • TopicData
<?php

/**
 * Created by Hidro Le.
 * Job Title: Magento Developer
 */

namespace Hidro\MyMessageQueue\Model;

use Hidro\MyMessageQueue\App\TopicDataInterface;

/**
 * Default implementation of TopicDataInterface.
 */
class TopicData implements TopicDataInterface
{
    /**
     * @var string[]
     */
    protected $data;

    /**
     * @param string[] $data
     * @return $this|TopicDataInterface
     */
    public function setData($data)
    {
        $this->data = $data;
        return $this;
    }

    /**
     * @return string[]
     */
    public function getData()
    {
        return $this->data;
    }
}
  • Define interface preference (etc/di.xml)
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <preference for="Hidro\MyMessageQueue\App\TopicDataInterface" type="Hidro\MyMessageQueue\Model\TopicData"/>
</config>

#4, create queue_consumer.xml, the configuration file that determines which class and method handle the consumer's messages.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="updateproduct.cunsomer" queue="updateproduct.cunsomer" connection="db" maxMessages="100"
              handler="Hidro\MyMessageQueue\Model\Queue\Handler\UpdateProductHandler::execute"/>
</config>
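  • name: The consumer name.
  • queue: The name of the queue the consumer reads from. It must match the binding destination in the queue_topology.xml file.
  • connection: The connection type (db or amqp). It must match the connection attribute in the queue_topology.xml file.
  • maxMessages: The maximum number of messages to consume.
  • handler: The class name and method name, separated by "::".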

#5, create the class UpdateProductHandler.

<?php
/**
 * Created by Hidro Le.
 * Job Title: Magento Developer
 */

namespace Hidro\MyMessageQueue\Model\Queue\Handler;

use Hidro\MyMessageQueue\App\TopicDataInterface;

class UpdateProductHandler
{
    public function execute(TopicDataInterface $requestData)
    {
        // Write the message data to a log file without injecting a LoggerInterface.
        $writer = new \Zend\Log\Writer\Stream(BP . '/var/log/' . 'hidro_debug_message_queue.log');
        $logger = new \Zend\Log\Logger();
        $logger->addWriter($writer);
        $logger->debug(json_encode($requestData->getData()));
        return true;
    }
}
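
The handler above builds a Zend logger by hand, and the Zend\Log classes may not be available on newer Magento releases. As an alternative sketch (not the original author's code), the handler could receive a Psr\Log\LoggerInterface through constructor injection, which Magento resolves to its default logger. The class name UpdateProductLoggingHandler is hypothetical; to use it, the handler attribute in queue_consumer.xml would have to point at it.

```php
<?php

namespace Hidro\MyMessageQueue\Model\Queue\Handler;

use Hidro\MyMessageQueue\App\TopicDataInterface;
use Psr\Log\LoggerInterface;

/**
 * Hypothetical variant of UpdateProductHandler that logs through PSR-3.
 */
class UpdateProductLoggingHandler
{
    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param LoggerInterface $logger Injected by Magento's object manager.
     */
    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Called by the consumer for every message read from the queue.
     *
     * @param TopicDataInterface $requestData
     * @return bool
     */
    public function execute(TopicDataInterface $requestData)
    {
        // Pass the payload as log context; replace this with the real product update.
        $this->logger->info(
            'updateproduct.topic message received',
            ['payload' => $requestData->getData()]
        );
        return true;
    }
}
```

With a default configuration, info-level entries usually end up in var/log/system.log rather than in a custom file.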

#6, define the publisher's connection and exchange in queue_publisher.xml.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="updateproduct.topic">
        <connection name="db" exchange="magento-db"/>
    </publisher>
</config>

#7, Define queue_topology.xml. This file defines the message routing rules and declares queues and exchanges.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento-db" type="topic" connection="db">
        <binding id="UpdateProductBinding" topic="updateproduct.topic" destinationType="queue"
                 destination="updateproduct.cunsomer"/>
    </exchange>
</config>
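
To make the relationship between the publisher, topology, and consumer files more concrete, here is a toy in-memory model of the routing in plain PHP. It is not how Magento implements queues: the ToyBroker class and its methods are invented for illustration. It only mirrors the idea that a publisher sends to a topic, a binding maps that topic to a queue, and a consumer drains that queue into its handler.

```php
<?php

/**
 * Toy stand-in for an exchange plus its queues (hypothetical, not a Magento class).
 */
class ToyBroker
{
    /** @var array<string, string[]> Topic name => destination queue names. */
    private $bindings = [];

    /** @var array<string, array> Queue name => pending messages. */
    private $queues = [];

    // Mirrors a <binding topic="..." destination="..."/> entry in queue_topology.xml.
    public function bind(string $topic, string $queue): void
    {
        $this->bindings[$topic][] = $queue;
        if (!isset($this->queues[$queue])) {
            $this->queues[$queue] = [];
        }
    }

    // Mirrors publishing: the caller only knows the topic, never the queue.
    public function publish(string $topic, array $data): void
    {
        foreach ($this->bindings[$topic] ?? [] as $queue) {
            $this->queues[$queue][] = $data;
        }
    }

    // Mirrors a consumer: pass up to $maxMessages from one queue to a handler.
    public function consume(string $queue, callable $handler, int $maxMessages): int
    {
        $processed = 0;
        while ($processed < $maxMessages && !empty($this->queues[$queue])) {
            $handler(array_shift($this->queues[$queue]));
            $processed++;
        }
        return $processed;
    }
}

$broker = new ToyBroker();
$broker->bind('updateproduct.topic', 'updateproduct.cunsomer');
$broker->publish('updateproduct.topic', ['product' => 123]);

// The handler here just collects what it receives.
$received = [];
$count = $broker->consume(
    'updateproduct.cunsomer',
    function (array $data) use (&$received) {
        $received[] = $data;
    },
    100
);
```

The point of the indirection is that the publisher and the handler never reference each other directly, so either side can change as long as the topic and queue names stay consistent across the XML files.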

#8, create a publisher. This class uses PublisherInterface to publish a message to the queue.

<?php

/**
 * Created by Hidro Le.
 * Job Title: Magento Developer
 */

namespace Hidro\MyMessageQueue\Publisher;

use Magento\Framework\MessageQueue\PublisherInterface;
use Hidro\MyMessageQueue\App\TopicDataInterface;

/**
 * Publishes product update messages to updateproduct.topic.
 */
class UpdateProduct
{
    /**
     * @var string
     */
    const TOPIC_NAME = 'updateproduct.topic';

    /**
     * @var PublisherInterface
     */
    protected $publisher;

    /**
     * @param PublisherInterface  $publisher
     */
    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    /**
     * @param TopicDataInterface $productData
     * @throws \InvalidArgumentException
     */
    public function publish(TopicDataInterface $productData)
    {
        $this->publisher->publish(self::TOPIC_NAME, $productData);
    }
}

Finally, test the queue with the following code block.

// Quick test only: in production code, inject these dependencies through the constructor.
$objectManager = \Magento\Framework\App\ObjectManager::getInstance();

$topicData = $objectManager->create(\Hidro\MyMessageQueue\App\TopicDataInterface::class);
$topicData->setData(['product' => 123]);

$publisher = $objectManager->get(\Hidro\MyMessageQueue\Publisher\UpdateProduct::class);
$publisher->publish($topicData);
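
Outside of a quick test, the publisher would normally be injected rather than pulled from the object manager. The sketch below assumes an observer (hypothetical class name ProductSaveAfter) that has been registered for the catalog_product_save_after event in etc/events.xml. It relies on the factory that Magento generates for TopicDataInterface so that each message gets a fresh object.

```php
<?php

namespace Hidro\MyMessageQueue\Observer;

use Hidro\MyMessageQueue\App\TopicDataInterfaceFactory;
use Hidro\MyMessageQueue\Publisher\UpdateProduct;
use Magento\Framework\Event\Observer;
use Magento\Framework\Event\ObserverInterface;

/**
 * Hypothetical observer: publishes a message whenever a product is saved.
 */
class ProductSaveAfter implements ObserverInterface
{
    /**
     * @var TopicDataInterfaceFactory
     */
    private $topicDataFactory;

    /**
     * @var UpdateProduct
     */
    private $publisher;

    /**
     * @param TopicDataInterfaceFactory $topicDataFactory Auto-generated factory.
     * @param UpdateProduct $publisher Publisher class from step #8.
     */
    public function __construct(
        TopicDataInterfaceFactory $topicDataFactory,
        UpdateProduct $publisher
    ) {
        $this->topicDataFactory = $topicDataFactory;
        $this->publisher = $publisher;
    }

    /**
     * @param Observer $observer
     * @return void
     */
    public function execute(Observer $observer)
    {
        // catalog_product_save_after exposes the saved product on the event.
        $product = $observer->getEvent()->getProduct();

        // The interface declares string[] data, so cast the ID to a string.
        $topicData = $this->topicDataFactory->create();
        $topicData->setData(['product' => (string) $product->getId()]);

        $this->publisher->publish($topicData);
    }
}
```

A published message waits in the queue until the consumer runs, for example via `bin/magento queue:consumers:start updateproduct.cunsomer`.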
