MQTT
Key takeaways
MqttMessageQueue: a component that provides a message queue that can send and receive messages via an MQTT message broker.
Introduction
In this tutorial, we will see how to create a message queue with MqttMessageQueue, a component available in the mqtt module. First, we will cover the pre-requisites and create a basic message queue. Then, we will learn how to create a message, send it to an MQTT app such as Eclipse Mosquitto, and read messages from such an app. Finally, we will summarize the concepts covered.
The MqttMessageQueue component
This component provides a message queue that can send and receive messages via an MQTT message broker such as Mosquitto. It contains a set of methods used to manage the communication between the queue and the message broker. The main ones are explained in the sections below.
Pre-requisites
To use this component, we must first import it with the following statement:
import { MqttMessageQueue } from "pip-services4-mqtt-node";
import (
mqttqueue "github.com/pip-services4/pip-services4-go/pip-services4-mqtt-go/queues"
)
from pip_services4_mqtt.queues import MqttMessageQueue
Implementing our component
Once we have imported our component, we can create an instance of it and configure our queue. There are two ways to define the topic. We can either define it in the constructor when we create the queue:
let queue = new MqttMessageQueue("mytopic");
queue := mqttqueue.NewMqttMessageQueue("mytopic")
queue = MqttMessageQueue("mytopic")
Or we can create our object without a topic:
let queue = new MqttMessageQueue();
queue := mqttqueue.NewMqttMessageQueue("")
queue = MqttMessageQueue()
and then define the topic as a configuration parameter:
import { ConfigParams } from "pip-services4-components-node";
queue.configure(ConfigParams.fromTuples(
"topic", "mytopic", // set topic
"connection.protocol", "mqtt",
"connection.host", "localhost",
"connection.port", 1883,
"options.qos", "1", // sets the qos level to "At least once"
"options.autosubscribe", true, // autosubscription on the topic
"options.serialize_envelope", true // converts object into mosquitto values
));
import conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
queue := mqttqueue.NewMqttMessageQueue("")
queue.Configure(context.Background(), conf.NewConfigParamsFromTuples(
"topic", "mytopic", // set topic
"connection.protocol", "mqtt",
"connection.host", "localhost",
"connection.port", 1883,
"options.qos", "1", // sets the qos level to "At least once"
"options.autosubscribe", true, // autosubscription on the topic
"options.serialize_envelope", true, // converts object into mosquitto values
))
from pip_services4_components.config import ConfigParams
queue.configure(ConfigParams.from_tuples(
"topic", "mytopic", # set topic
'connection.protocol', 'mqtt',
"connection.host", "localhost",
"connection.port", 1883,
'options.qos', '1', # sets the qos level to 'At least once'
'options.autosubscribe', True, # autosubscription on the topic
'options.serialize_envelope', True # converts object into mosquitto values
))
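To make the flat key/value layout above easier to read, the following sketch groups such a list of pairs by the dotted prefix of each key. Note that tuples_to_sections is a hypothetical helper written only for this illustration; it is not part of the toolkit and makes no claim about how ConfigParams works internally.

```python
from collections import defaultdict


def tuples_to_sections(*tuples):
    """Group flat ("section.key", value) pairs by their dotted prefix.

    Hypothetical helper for illustration; keys without a dot go under "".
    """
    if len(tuples) % 2 != 0:
        raise ValueError("expected an even number of arguments (key, value pairs)")
    sections = defaultdict(dict)
    # Even positions hold keys, odd positions hold the matching values.
    for key, value in zip(tuples[0::2], tuples[1::2]):
        section, _, name = key.rpartition(".")
        sections[section][name] = value
    return dict(sections)


config = tuples_to_sections(
    "topic", "mytopic",
    "connection.protocol", "mqtt",
    "connection.host", "localhost",
    "connection.port", 1883,
    "options.qos", "1",
    "options.autosubscribe", True,
    "options.serialize_envelope", True,
)

print(config["connection"])
```

Seen this way, the configuration has one top-level value (the topic), a connection section that says where the broker is, and an options section that controls the queue's behavior.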
Connecting to our app
Now that our component has been defined, we can connect it to our MQTT app through the open()/openAsync() method, which takes the context as its input parameter. The following example illustrates how to use it.
await queue.open(null);
queue.Open(context.Background())
queue.open(None)
Once the queue has been used, we can close it to free resources with the close()/closeAsync() method, which takes the context as its input parameter. The example below shows its usage.
await queue.close(null);
queue.Close(context.Background())
queue.close(None)
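Since an open queue holds resources, it can be useful to guarantee that close() runs even when an error occurs in between. The sketch below shows one possible way to do this in Python with a context manager. The opened helper and the StubQueue class are assumptions made for this illustration; StubQueue only mimics the open()/close() shape so that the snippet runs without a broker.

```python
from contextlib import contextmanager


@contextmanager
def opened(queue, ctx=None):
    """Open the queue, yield it, and always close it afterwards."""
    queue.open(ctx)
    try:
        yield queue
    finally:
        queue.close(ctx)


class StubQueue:
    """Stand-in with the same open()/close() shape, used only for this demo."""

    def __init__(self):
        self.is_open = False

    def open(self, ctx):
        self.is_open = True

    def close(self, ctx):
        self.is_open = False


stub = StubQueue()
try:
    with opened(stub) as q:
        state_inside = q.is_open  # True while the block runs
        raise RuntimeError("simulated failure while using the queue")
except RuntimeError:
    pass
state_after = stub.is_open  # False: close() ran despite the error
```

In principle the same helper could wrap any object that exposes open(ctx) and close(ctx), but that has not been verified here against a running broker.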
Creating and sending a message
After our connection is ready, we can create a message and send it to our app. To create a message, we use the MessageEnvelope component from the messaging module. This class takes the context, the message type, and the message as input parameters.
Once we have our message envelope, we can use the send()/sendAsync() method to send it to our app. This method takes the context and the message envelope as input parameters. The example below shows how to send a message of type “mymessage” containing the text “ABC123”.
import { MessageEnvelope } from "pip-services4-messaging-node";
await queue.send(null, new MessageEnvelope(null, "mymessage", "ABC123"));
import msgqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"
queue.Send(context.Background(), msgqueues.NewMessageEnvelope("", "mymessage", []byte("ABC123")))
from pip_services4_messaging.queues import MessageEnvelope
queue.send(None, MessageEnvelope(None, "mymessage", "ABC123"))
Receiving a message
This component contains several methods that can be used to read a message from an MQTT app. They are explained in the following sections.
receive()
This method receives an incoming message and removes it from the queue. It takes the context and a waiting timeout in milliseconds as input parameters. It returns a message envelope containing the received message.
let message = await queue.receive(null, 10000);
message, err := queue.Receive(context.Background(), 10000)
message = queue.receive(None, 10000)
Now, to obtain the text of the received message, we can use the getMessageAsString() method. The following example shows how to do this.
let msgText = message.getMessageAsString(); // Returns 'ABC123'
msgText := message.GetMessageAsString() // Returns 'ABC123'
message_text = message.get_message_as_string() # Returns 'ABC123'
peek()
This method gets a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns null. The received message is contained in a message envelope. The following example illustrates its usage.
let receivedPeek = await queue.peek(null);
receivedPeek, err := queue.Peek(context.Background())
receivedPeek = queue.peek(None)
Where
let msgText = receivedPeek.getMessageAsString(); // Returns 'ABC123'
msgText := receivedPeek.GetMessageAsString() // Returns 'ABC123'
message_text = receivedPeek.get_message_as_string() # Returns 'ABC123'
peekBatch()
This method peeks multiple incoming messages from the queue and returns them as a list, without removing them. If there are no messages available in the queue, it returns an empty list. As input parameters, it takes the context and the maximum number of messages to peek. The following example peeks up to three messages. Because we sent only one message, the returned list contains a single message envelope.
let receivedPeekBatch = await queue.peekBatch(null, 3);
receivedPeekBatch, err := queue.PeekBatch(context.Background(), 3)
receivedPeekBatch = queue.peek_batch(None, 3)
Where
let msgText = receivedPeekBatch[0].getMessageAsString(); // Returns 'ABC123'
msgText := receivedPeekBatch[0].GetMessageAsString() // Returns 'ABC123'
message_text = receivedPeekBatch[0].get_message_as_string() # Returns 'ABC123'
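The difference between receiving and peeking can be seen with a small in-memory model. The sketch below is a toy class written for this illustration only: it does not talk to MQTT, it ignores the context and timeout parameters, and InMemoryQueue is a hypothetical name. It only mirrors the behavior described above, where receive removes a message while peek and peek_batch leave the queue untouched.

```python
from collections import deque


class InMemoryQueue:
    """Toy model of the receive/peek semantics described above (not MQTT)."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        # Removes and returns the oldest message, or None if the queue is empty.
        return self._messages.popleft() if self._messages else None

    def peek(self):
        # Returns the oldest message without removing it, or None if empty.
        return self._messages[0] if self._messages else None

    def peek_batch(self, message_count):
        # Returns up to message_count messages without removing them.
        return list(self._messages)[:message_count]

    def read_message_count(self):
        return len(self._messages)


q = InMemoryQueue()
q.send("ABC123")

peeked = q.peek()                       # "ABC123", still in the queue
batch = q.peek_batch(3)                 # ["ABC123"]
count_before = q.read_message_count()   # 1
received = q.receive()                  # "ABC123", now removed
count_after = q.read_message_count()    # 0
empty_peek = q.peek()                   # None
empty_batch = q.peek_batch(3)           # []
```

Unlike this model, the real receive() waits up to the given timeout for a message to arrive.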
Auxiliary methods
Additionally, this class has some auxiliary methods.
readMessageCount()
This method returns the number of messages in the queue that are still waiting to be delivered.
let res = await queue.readMessageCount(); // Returns 1
res, err := queue.ReadMessageCount() // Returns 1
result = queue.read_message_count() # Returns 1
Other configuration parameters
qos
MQTT defines three quality of service (QoS) levels:
- At most once (0)
- At least once (1)
- Exactly once (2)
Based on the above, we can specify our preferred level via the options.qos field in our configuration by stating its numeric value, for example "options.qos", "1" for "At least once".
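As a small aid, the three levels can be given readable names and a configuration value can be validated before it is passed on. The sketch below is only an illustration: QosLevel and parse_qos are hypothetical names, not part of the MqttMessageQueue API.

```python
from enum import IntEnum


class QosLevel(IntEnum):
    """The three MQTT quality of service levels listed above."""
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


def parse_qos(value):
    """Convert a config value such as "1" or 1 into a QosLevel."""
    try:
        return QosLevel(int(value))
    except (TypeError, ValueError):
        raise ValueError(f"invalid QoS level: {value!r} (expected 0, 1, or 2)") from None


level = parse_qos("1")
print(level.name)  # AT_LEAST_ONCE
```

Validating the value up front turns a typo such as "3" into an immediate error instead of a confusing failure at connection time.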
autosubscribe
This is a Boolean field. By declaring it as true, our component will automatically subscribe to the specified topic. This field can be specified in our configuration.
The following example shows how to specify the above-mentioned fields.
queue.configure(ConfigParams.fromTuples(
"topic", "mytopic", // set topic
"connection.protocol", "mqtt",
"connection.host", "localhost",
"connection.port", 1883,
"options.qos", "1", // sets the qos level to "At least once"
"options.autosubscribe", true, // autosubscription on the topic
"options.serialize_envelope", true // converts object into Mosquitto values
));
queue.Configure(context.Background(), conf.NewConfigParamsFromTuples(
"topic", "mytopic", // set topic
"connection.protocol", "mqtt",
"connection.host", "localhost",
"connection.port", 1883,
"options.qos", "1", // sets the qos level to "At least once"
"options.autosubscribe", true, // autosubscription on the topic
"options.serialize_envelope", true, // converts object into Mosquitto values
))
queue.configure(ConfigParams.from_tuples(
"topic", "mytopic", # set topic
'connection.protocol', 'mqtt',
"connection.host", "localhost",
"connection.port", 1883,
'options.qos', '1', # sets the qos level to 'At least once'
'options.autosubscribe', True, # autosubscription on the topic
'options.serialize_envelope', True # converts object into Mosquitto values
))
Example
We can now put together a simple example that uses the methods covered so far. In it, we first create a message queue and connect it to Mosquitto. Then, we send a message, receive it, and print its content. Finally, we close the connection to free the used resources. The code is:
import { ConfigParams } from "pip-services4-components-node";
import { MqttMessageQueue } from "pip-services4-mqtt-node";
import { MessageEnvelope } from "pip-services4-messaging-node";
export async function main() {
    // Component creation and configuration
    const queue = new MqttMessageQueue();
    queue.configure(ConfigParams.fromTuples(
        "topic", "mytopic", // set topic
        "connection.protocol", "mqtt",
        "connection.host", "localhost",
        "connection.port", 1883,
        "options.autosubscribe", true, // autosubscription on the topic
        "options.serialize_envelope", true // converts object into Mosquitto values
    ));
    // Connection
    await queue.open(null);
    // Send a message
    await queue.send(null, new MessageEnvelope(null, "mymessage", "ABC1234"));
    // Receive a message
    const message = await queue.receive(null, 10000);
    console.log(message.getMessageAsString()); // Prints 'ABC1234'
    // Close the connection
    await queue.close(null);
}
import (
"context"
"fmt"
conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
msgqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"
mqttqueue "github.com/pip-services4/pip-services4-go/pip-services4-mqtt-go/queues"
)
func main() {
    // Component creation and configuration
    queue := mqttqueue.NewMqttMessageQueue("")
    queue.Configure(context.Background(), conf.NewConfigParamsFromTuples(
        "topic", "mytopic", // set topic
        "connection.protocol", "mqtt",
        "connection.host", "localhost",
        "connection.port", 1883,
        "options.autosubscribe", true, // autosubscription on the topic
        "options.serialize_envelope", true, // converts object into Mosquitto values
    ))
    // Connection
    queue.Open(context.Background())
    // Send a message
    queue.Send(context.Background(), msgqueues.NewMessageEnvelope("", "mymessage", []byte("ABC1234")))
    // Receive a message
    message, _ := queue.Receive(context.Background(), 10000)
    fmt.Println(message.GetMessageAsString()) // Prints 'ABC1234'
    // Close the connection
    queue.Close(context.Background())
}
# Pre-requisites
from pip_services4_mqtt.queues import MqttMessageQueue
from pip_services4_components.config import ConfigParams
from pip_services4_messaging.queues import MessageEnvelope
# Component creation and configuration
queue = MqttMessageQueue()
queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic", # set topic
    'connection.protocol', 'mqtt',
    "connection.host", "localhost",
    "connection.port", 1883,
    'options.autosubscribe', True, # autosubscription on the topic
    'options.serialize_envelope', True # converts object into Mosquitto values
))
# Connection
queue.open(None)
# Send a message
queue.send(None, MessageEnvelope(None, "mymessage", "ABC1234"))
# Receive a message
message = queue.receive(None, 10000)
print(message.get_message_as_string()) # Prints 'ABC1234'
# Close the connection
queue.close(None)
Wrapping up
In this tutorial, we have learned how to create and manage a message queue to communicate with an MQTT-based app using the MqttMessageQueue component.
We saw that this class can define the topic when an instance of it is created via the constructor. Alternatively, we can define it as a configuration parameter via the configure() method, which is used to set the values of the component’s parameters. Then, we learned several methods used to send and receive messages.
We also saw how to obtain the number of messages in the queue and how to set the quality of service level. Finally, we brought the main methods together in a complete example.