MQTT

How to send and receive messages via an MQTT broker.

Key takeaways

MqttMessageQueue: This component provides a message queue that can send and receive messages via an MQTT message broker.

Introduction

In this tutorial, we will see how to create a message queue with MqttMessageQueue, a component available in the mqtt module. First, we will cover the necessary pre-requisites and create a basic message queue. Then, we will learn how to create a message, send it through an MQTT broker such as Eclipse Mosquitto, and read messages back from it. Finally, we will summarize the concepts covered.

The MqttMessageQueue component

This component provides a message queue that can send and receive messages via an MQTT message broker such as Mosquitto. It contains a set of methods used to manage the communication between the queue and the message broker. The main ones are explained in the sections below.

Pre-requisites

In order to use this component, we must first import it with the following command:

import { MqttMessageQueue } from "pip-services3-mqtt-nodex";
using PipServices3.Mqtt.Queues;
import (
    mqttqueue "github.com/pip-services3-gox/pip-services3-mqtt-gox/queues"
)
import 'package:pip_services3_mqtt/pip_services3_mqtt.dart';
from pip_services3_mqtt.queues import MqttMessageQueue
Not available

Implementing our component

Once we have imported our component, we can create an instance of it and configure our queue. There are two ways to define the topic: we can either define it in the constructor when we create the queue

let queue = new MqttMessageQueue("mytopic");
var queue = new MqttMessageQueue("mytopic");
queue := mqttqueue.NewMqttMessageQueue("mytopic")
var queue = MqttMessageQueue('mytopic');
queue = MqttMessageQueue("mytopic")
Not available

Or, we can create our object as

let queue = new MqttMessageQueue();
var queue = new MqttMessageQueue();
queue := mqttqueue.NewMqttMessageQueue("")
var queue = new MqttMessageQueue();
queue = MqttMessageQueue()
Not available

and then, define the topic as a configuration parameter

import { ConfigParams } from "pip-services3-commons-nodex";

queue.configure(ConfigParams.fromTuples(
    "topic", "mytopic",                 // set topic
    "connection.protocol", "mqtt",
    "connection.host", "localhost",
    "connection.port", 1883,
    "options.qos", "1",                 // sets the qos level to "At least once"
    "options.autosubscribe", true,      // autosubscription on the topic
    "options.serialize_envelope", true  // converts object into mosquitto values 
));

using PipServices3.Commons.Config;

queue.Configure(ConfigParams.FromTuples(
    "topic", "mytopic",                 // set topic
    "connection.protocol", "mqtt",
    "connection.host", "localhost",
    "connection.port", 1883,
    "options.qos", "1",                 // sets the qos level to "At least once"
    "options.autosubscribe", true,      // autosubscription on the topic
    "options.serialize_envelope", true  // converts object into mosquitto values 
));

import (
    "context"

    conf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
)

queue.Configure(context.Background(), conf.NewConfigParamsFromTuples(
	"topic", "mytopic",                 // set topic
	"connection.protocol", "mqtt",
	"connection.host", "localhost",
	"connection.port", 1883,
	"options.qos", "1",                 // sets the qos level to "At least once"
	"options.autosubscribe", true,      // autosubscription on the topic
	"options.serialize_envelope", true, // converts object into mosquitto values
))

import 'package:pip_services3_commons/pip_services3_commons.dart';

queue.configure(ConfigParams.fromTuples([
  'topic', 'mytopic', // set topic
  'connection.protocol', 'mqtt', 
  'connection.host', 'localhost',
  'connection.port', 1883,
  'options.qos', '1', // sets the qos level to "At least once"
  'options.autosubscribe', true, // autosubscription on the topic
  'options.serialize_envelope', true // converts object into mosquitto values
]));

from pip_services3_commons.config import ConfigParams

queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic",                 # set topic
    'connection.protocol', 'mqtt',
    "connection.host", "localhost",
    "connection.port", 1883,
    'options.qos', '1',                 # sets the qos level to 'At least once'
    'options.autosubscribe', True,      # autosubscription on the topic
    'options.serialize_envelope', True  # converts object into mosquitto values 
))
Not available
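
To make the flat key layout used above easier to read, here is a minimal Python sketch of how alternating key/value arguments can be paired up and then grouped by their dotted prefix ("connection", "options"). The helpers from_tuples and get_section are hypothetical stand-ins written for this illustration. They are not the library's ConfigParams implementation, which may store and convert values differently.

```python
def from_tuples(*tuples):
    """Hypothetical helper: pair up alternating keys and values into a dict."""
    if len(tuples) % 2 != 0:
        raise ValueError("expected an even number of arguments (key, value, ...)")
    # Assumption for this sketch: every value is stored as a string.
    return {str(tuples[i]): str(tuples[i + 1]) for i in range(0, len(tuples), 2)}


def get_section(config, section):
    """Hypothetical helper: return the keys under 'section.' with the prefix removed."""
    prefix = section + "."
    return {k[len(prefix):]: v for k, v in config.items() if k.startswith(prefix)}


config = from_tuples(
    "topic", "mytopic",
    "connection.protocol", "mqtt",
    "connection.host", "localhost",
    "connection.port", 1883,
    "options.qos", "1",
    "options.autosubscribe", True,
)

connection = get_section(config, "connection")
options = get_section(config, "options")

print(connection)  # connection settings without the "connection." prefix
print(options)     # option settings without the "options." prefix
```

Grouping by prefix is what lets a single flat list of tuples carry both the connection settings and the queue options.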

Connecting to our app

Now that our component has been defined, we can connect it to our MQTT app through the open()/openAsync() method, which takes the correlationId as its input parameter. The following example illustrates how to use it.

await queue.open(null);
await queue.OpenAsync("123");
queue.Open(context.Background(), "123")
await queue.open(null);
queue.open("123")
Not available

Once the queue has been used, we can close it to free resources with the close()/closeAsync() method, which takes the correlationId as its input parameter. The example below shows its usage.

await queue.close(null);
await queue.CloseAsync("123");
queue.Close(context.Background(), "123")
await queue.close(null);
queue.close(None)
Not available
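
Because closing the queue is what frees its resources, it can help to guarantee that close() runs even if the code using the queue fails. The sketch below shows that pattern with try/finally. StubQueue is a hypothetical stand-in that only tracks an open flag so the snippet runs without a broker. With the real component, the same structure would wrap the open() and close() calls shown above.

```python
class StubQueue:
    """Hypothetical stand-in for a queue component; it only tracks its open state."""

    def __init__(self):
        self.opened = False

    def open(self, correlation_id):
        self.opened = True

    def close(self, correlation_id):
        self.opened = False

    def is_open(self):
        return self.opened


queue = StubQueue()
queue.open("123")
try:
    # Work with the queue here; an exception raised in this block
    # still reaches the finally clause, so the queue gets closed.
    print(queue.is_open())  # True
finally:
    queue.close("123")

print(queue.is_open())  # False
```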

Creating and sending a message

After our connection is ready, we can create a message and send it to our app. To create a message, we use the MessageEnvelope component from the messaging module. This class takes the correlationId, the message type, and the message as input parameters.

Once we have our message envelope, we can use the send()/sendAsync() method to send it to our app. This method takes the correlationId and the message envelope as input parameters. The example below shows how to send a message of type “mymessage” containing the text “ABC123”.

import { MessageEnvelope } from "pip-services3-messaging-nodex";

await queue.send(null, new MessageEnvelope(null, "mymessage", "ABC123"));
using PipServices3.Messaging.Queues;

await queue.SendAsync("123", new MessageEnvelope(null, "mymessage", "ABC123"));
import (
	msgqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
)

queue.Send(context.Background(), "123", msgqueues.NewMessageEnvelope("", "mymessage", []byte("ABC123")))
import 'package:pip_services3_messaging/pip_services3_messaging.dart';

await queue.send(null, MessageEnvelope(null, 'mymessage', 'ABC123'));
from pip_services3_messaging.queues import MessageEnvelope

queue.send("123", MessageEnvelope(None, "mymessage", "ABC123"))
Not available
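
As a rough mental model of what a message envelope carries, the following Python sketch defines a simplified envelope with the three inputs described above: a correlation id, a message type, and the message itself. SimpleEnvelope and from_text are hypothetical names used only for illustration. Storing the payload as UTF-8 bytes is an assumption of this sketch, and the library's MessageEnvelope holds additional fields and may behave differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SimpleEnvelope:
    """Hypothetical, simplified envelope; not the library's MessageEnvelope."""
    correlation_id: Optional[str]
    message_type: str
    message: bytes

    @classmethod
    def from_text(cls, correlation_id, message_type, text):
        # Assumption: text payloads are stored as UTF-8 bytes.
        return cls(correlation_id, message_type, text.encode("utf-8"))

    def get_message_as_string(self):
        # Decode the stored bytes back into text.
        return self.message.decode("utf-8")


envelope = SimpleEnvelope.from_text("123", "mymessage", "ABC123")
print(envelope.message_type)             # mymessage
print(envelope.get_message_as_string())  # ABC123
```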

Receiving a message

This component contains several methods that can be used to read a message from an MQTT app. They are explained in the following sections.

receive()

This method receives an incoming message and removes it from the queue. It takes the correlationId and a waiting timeout in milliseconds as input parameters. It returns a message envelope containing the received message.

let message = await queue.receive(null, 10000);
var message = await queue.ReceiveAsync("123", 10000);
message, err := queue.Receive(context.Background(), "123", 10000)
var message = await queue.receive(null, 10000);
message = queue.receive("123", 10000)
Not available

Now, to obtain the text of the received message, we can use the getMessageAsString() method. The following example shows how to do this.

let msgText = message.getMessageAsString(); // Returns 'ABC123'
var msgText = message.GetMessageAsString(); // Returns 'ABC123'
msgText := message.GetMessageAsString() // Returns 'ABC123'
var msgText = message?.getMessageAsString(); // Returns 'ABC123'
message_text = message.get_message_as_string() # Returns 'ABC123'
Not available

peek()

This method gets a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns null. The received message is contained in a message envelope. The following example illustrates its usage.

let receivedPeek = await queue.peek(null);
var receivedPeek = await queue.PeekAsync(null);
receivedPeek, err := queue.Peek(context.Background(), "123")
var receivedPeek = await queue.peek(null);
receivedPeek = queue.peek(None)
Not available

Where

let msgText = receivedPeek.getMessageAsString(); // Returns 'ABC123'
var msgText = receivedPeek.GetMessageAsString(); // Returns 'ABC123'
msgText := receivedPeek.GetMessageAsString() // Returns 'ABC123'
var msgText = receivedPeek?.getMessageAsString(); // Returns 'ABC123'
message_text = receivedPeek.get_message_as_string() # Returns 'ABC123'
Not available

peekBatch()

This method peeks multiple incoming messages from the queue in the form of a list and without removing them. If there are no messages available in the queue, it returns an empty list. As input parameters, it takes the correlationId and the maximum number of messages to peek. The following example peeks up to three messages. As we had sent only one message, it retrieves it and then stores its content in a message envelope contained in a list.

let receivedPeekBatch = await queue.peekBatch(null, 3);
var receivedPeekBatch = await queue.PeekBatchAsync(null, 3);
receivedPeekBatch, err := queue.PeekBatch(context.Background(), "123", 3)
var receivedPeekBatch = await queue.peekBatch(null, 3);
receivedPeekBatch = queue.peek_batch(None, 3)
Not available

Where

let msgText = receivedPeekBatch[0].getMessageAsString(); // Returns 'ABC123'
var msgText = receivedPeekBatch[0].GetMessageAsString(); // Returns 'ABC123'
msgText := receivedPeekBatch[0].GetMessageAsString() // Returns 'ABC123'
var msgText = receivedPeekBatch[0].getMessageAsString(); // Returns 'ABC123'
message_text = receivedPeekBatch[0].get_message_as_string() # Returns 'ABC123'
Not available
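
The difference between receiving and peeking can be illustrated without a broker. The sketch below uses a hypothetical in-memory class, InMemoryQueue, that mimics only the semantics described above: receive removes the message, while peek and peek_batch leave it in place. It is not the MqttMessageQueue implementation, and it omits the correlationId and timeout parameters for brevity.

```python
from collections import deque


class InMemoryQueue:
    """Hypothetical in-memory queue used only to illustrate the semantics."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def peek(self):
        # Return the first message without removing it, or None if empty.
        return self._messages[0] if self._messages else None

    def peek_batch(self, message_count):
        # Return up to message_count messages without removing them.
        return list(self._messages)[:message_count]

    def receive(self):
        # Return the first message and remove it, or None if empty.
        return self._messages.popleft() if self._messages else None

    def read_message_count(self):
        return len(self._messages)


queue = InMemoryQueue()
queue.send("ABC123")

print(queue.peek())                # ABC123
print(queue.peek_batch(3))         # ['ABC123']
print(queue.read_message_count())  # 1 (peeking removed nothing)
print(queue.receive())             # ABC123
print(queue.read_message_count())  # 0 (receive removed the message)
print(queue.peek())                # None
print(queue.peek_batch(3))         # []
```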

Auxiliary methods

Additionally, this class has some auxiliary methods.

readMessageCount()

This method allows us to obtain the number of messages in a queue that need to be delivered.

let res = await queue.readMessageCount();  // Returns 1
var res = await queue.ReadMessageCountAsync();  // Returns 1
res, err := queue.ReadMessageCount() // Returns 1
var res = await queue.readMessageCount(); // Returns 1
result = queue.read_message_count()  # Returns 1
Not available

Other configuration parameters

qos

MQTT supports three quality of service (QoS) levels, which are:

  • At most once (0)
  • At least once (1)
  • Exactly once (2)

Based on the above, we can specify our preferred level via the options.qos field in our configuration by stating its integer value, as shown in the configuration example in the "Implementing our component" section.
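
To keep the three levels readable in code, their integer values can be given names. The following Python sketch does this with an IntEnum and converts a configuration value such as "1" into a named level. QoS and parse_qos are hypothetical names introduced for this example and are not part of the library.

```python
from enum import IntEnum


class QoS(IntEnum):
    """The three MQTT quality of service levels and their integer values."""
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


def parse_qos(value):
    """Hypothetical helper: convert a config value such as "1" into a QoS level."""
    return QoS(int(value))  # raises ValueError for anything other than 0, 1 or 2


level = parse_qos("1")
print(level.name, int(level))  # AT_LEAST_ONCE 1
```

Validating the value up front means a typo such as "3" fails immediately rather than when the connection is opened.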

autosubscribe

This is a Boolean field. By declaring it as true, our component will automatically subscribe to the specified topic. This field can be specified in our configuration.

The following example shows a configuration that enables autosubscribe. The qos level can be set in the same way through the options.qos key.

queue.configure(ConfigParams.fromTuples(
    "topic", "mytopic",                 // set topic
    "connection.protocol", "mqtt",
    "connection.host", "localhost",
    "connection.port", 1883,
    "options.autosubscribe", true,      // autosubscription on the topic
    "options.serialize_envelope", true  // converts object into mosquitto values
));

queue.Configure(ConfigParams.FromTuples(
    "topic", "mytopic",                 // set topic
    "connection.protocol", "mqtt",
    "connection.host", "localhost",
    "connection.port", 1883,
    "options.autosubscribe", true,      // autosubscription on the topic
    "options.serialize_envelope", true  // converts object into mosquitto values
));

queue.Configure(context.Background(), conf.NewConfigParamsFromTuples(
	"topic", "mytopic",                 // set topic
	"connection.protocol", "mqtt",
	"connection.host", "localhost",
	"connection.port", 1883,
	"options.autosubscribe", true,      // autosubscription on the topic
	"options.serialize_envelope", true, // converts object into mosquitto values
))

queue.configure(ConfigParams.fromTuples([
  'topic', 'mytopic', // set topic
  'connection.protocol', 'mqtt',
  'connection.host', 'localhost',
  'connection.port', 1883,
  'options.autosubscribe', true, // autosubscription on the topic
  'options.serialize_envelope', true // converts object into mosquitto values
]));

queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic",                 # set topic
    'connection.protocol', 'mqtt',
    "connection.host", "localhost",
    "connection.port", 1883,
    'options.autosubscribe', True,      # autosubscription on the topic
    'options.serialize_envelope', True  # converts object into mosquitto values
))
Not available

Example

We can now put together a simple example that uses the methods covered above. In it, we first create a message queue and connect it to Mosquitto. Then, we send a message, receive it, and print its content. Finally, we close the connection to free the used resources. The code is:

// Pre-requisites
import { ConfigParams } from "pip-services3-commons-nodex";
import { MessageEnvelope } from "pip-services3-messaging-nodex";
import { MqttMessageQueue } from "pip-services3-mqtt-nodex";

export async function main() { 
    // Component creation and configuration
    var queue = new MqttMessageQueue();

    queue.configure(ConfigParams.fromTuples(
        "topic", "mytopic",                 // set topic
        "connection.protocol", "mqtt",
        "connection.host", "localhost",
        "connection.port", 1883,
        "options.autosubscribe", true,      // autosubscription on the topic
        "options.serialize_envelope", true  // converts object into mosquitto values
    ));

    // Connection
    await queue.open("123");

    // Send a message
    await queue.send("123", new MessageEnvelope(null, "mymessage", "ABC1234"));

    // Receive a message
    var message = await queue.receive("123", 10000);
    console.log(message.getMessageAsString()); // Prints 'ABC1234'

    // Close the connection
    await queue.close("123");
}
// Pre-requisites
using System;
using System.Threading.Tasks;

using PipServices3.Commons.Config;
using PipServices3.Messaging.Queues;
using PipServices3.Mqtt.Queues;

// Component creation and configuration
var queue = new MqttMessageQueue();

queue.Configure(ConfigParams.FromTuples(
    "topic", "mytopic",                 // set topic
    "connection.protocol", "mqtt",
    "connection.host", "localhost",
    "connection.port", 1883,
    "options.autosubscribe", true,      // autosubscription on the topic
    "options.serialize_envelope", true  // converts object into mosquitto values
));

// Connection
await queue.OpenAsync("123");

// Send a message
await queue.SendAsync("123", new MessageEnvelope(null, "mymessage", "ABC1234"));

// Receive a message
var message = await queue.ReceiveAsync("123", 10000);
Console.WriteLine(message.GetMessageAsString()); // Prints 'ABC1234'

// Close the connection
await queue.CloseAsync("123");

import (
	"context"
	"fmt"

	conf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
	msgqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
	mqttqueue "github.com/pip-services3-gox/pip-services3-mqtt-gox/queues"
)

func main() {
	// Component creation and configuration
	queue := mqttqueue.NewMqttMessageQueue("")

	queue.Configure(context.Background(), conf.NewConfigParamsFromTuples(
		"topic", "mytopic", // set topic
		"connection.protocol", "mqtt",
		"connection.host", "localhost",
		"connection.port", 1883,
		"options.autosubscribe", true, // autosubscription on the topic
		"options.serialize_envelope", true, // converts object into mosquitto values
	))

	// Connection
	queue.Open(context.Background(), "123")

	// Send a message
	queue.Send(context.Background(), "123", msgqueues.NewMessageEnvelope("", "mymessage", []byte("ABC1234")))

	// Receive a message
	message, _ := queue.Receive(context.Background(), "123", 10000)
	fmt.Println(message.GetMessageAsString())

	// Close the connection
	queue.Close(context.Background(), "123")
}

// Pre-requisites
import 'package:pip_services3_commons/pip_services3_commons.dart';
import 'package:pip_services3_messaging/pip_services3_messaging.dart';
import 'package:pip_services3_mqtt/pip_services3_mqtt.dart';

void main(List<String> arguments) async {
  // Component creation and configuration
  var queue = MqttMessageQueue();

  queue.configure(ConfigParams.fromTuples([
    'topic', 'mytopic', // set topic
    'connection.protocol', 'mqtt',
    'connection.host', 'localhost',
    'connection.port', 1883,
    'options.autosubscribe', true, // autosubscription on the topic
    'options.serialize_envelope', true // converts object into mosquitto values
  ]));

  // Connection
  await queue.open('123');

  // Send a message
  await queue.send('123', MessageEnvelope(null, 'mymessage', 'ABC1234'));

  // Receive a message
  var message = await queue.receive('123', 10000);
  print(message?.getMessageAsString()); // Prints 'ABC1234'

  // Close the connection
  await queue.close('123');
}

# Pre-requisites
from pip_services3_mqtt.queues import MqttMessageQueue
from pip_services3_commons.config import ConfigParams
from pip_services3_messaging.queues import MessageEnvelope

# Component creation and configuration
queue = MqttMessageQueue() 

queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic",                 # set topic
    'connection.protocol', 'mqtt',
    "connection.host", "localhost",
    "connection.port", 1883,
    'options.autosubscribe', True,      # autosubscription on the topic
    'options.serialize_envelope', True  # converts object into mosquitto values
))

# Connection
queue.open("123")

# Send a message
queue.send("123", MessageEnvelope(None, "mymessage", "ABC1234"))

# Receive a message
message = queue.receive("123", 10000)
print(message.get_message_as_string()) # Prints 'ABC1234'

# Close the connection
queue.close(None)
Not available

Wrapping up

In this tutorial, we have learned how to create and manage a message queue to communicate with an MQTT-based app using the MqttMessageQueue component.

We saw that the topic can be defined in the constructor when an instance of this class is created. Alternatively, it can be defined as a configuration parameter via the configure() method, which sets the values of the component’s parameters. Then, we learned several methods used to send and receive messages.

We also saw how to obtain the number of messages in the queue and how to set the quality of service level. Finally, we combined the main methods in a complete example.