NATS
Key takeaways
NatsMessageQueue | Component used to send and receive messages via a NATS server. |
configure() | Method used to set the values of configuration parameters. |
send() | Method used to send messages to a NATS server. |
receive() | Method used to receive messages from a NATS server. |
Introduction
This tutorial will help you understand how the NatsMessageQueue component is used. First, we will see a brief description of this class and how to import it. Then, we will implement this component, connect it, and create and send a message to a NATS server from it. Finally, we will combine the code into a complete program and summarize what we have learned.
The NatsMessageQueue component
This component can be used to send and receive messages via a NATS server. In order to show how to use it, in the following sections, we will see in a stepwise manner how to create a custom component that sends messages to a NATS server located on our machine.
Pre-requisites
In order to use this component, we need to import it first. The following code shows how to do this:
import { NatsMessageQueue } from 'pip-services3-nats-nodex';
using PipServices3.Nats.Queues;
import nqueues "github.com/pip-services3-gox/pip-services3-nats-gox/queues"
Implementing our component
Once imported, we can create an instance of this class and configure it. In the example below, we consider a NATS server on our machine and port 4222. In addition, we declare autosubscribe as true, which, as the name suggests, creates a subscription to our topic automatically.
let queue = new NatsMessageQueue();
queue.configure(ConfigParams.fromTuples(
"topic", "mytopic",
"connection.protocol", "nats",
"connection.host", "localhost",
"connection.port", 4222,
"options.autosubscribe", true
));
var queue = new NatsMessageQueue();
queue.Configure(ConfigParams.FromTuples(
"topic", "mytopic",
"connection.protocol", "nats",
"connection.host", "localhost",
"connection.port", 4222,
"options.autosubscribe", true
));
queue := nqueues.NewNatsMessageQueue("mytopic")
queue.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
// "topic", "mytopic",
"connection.protocol", "nats",
"connection.host", "localhost",
"connection.port", 4222,
"options.autosubscribe", true,
))
Connecting to NATS
Now that we have defined our component, we connect to the NATS server via the open() method, which takes the correlation_id as an input parameter. The following code shows how to perform this operation:
await queue.open("123");
await queue.OpenAsync(null);
err := queue.Open(context.Background(), "123")
Creating and sending a message
Once connected, we send a message to our NATS server via the send() method. This method accepts a correlation_id and a MessageEnvelope object as inputs. And, this last object requires a correlation_id, a message type, and the data being sent as inputs. The code below exemplifies how to perform this task:
await queue.send("123", new MessageEnvelope("123", "mymessage", "ABC"));
await queue.SendAsync("123", new MessageEnvelope("123", "mymessage", "ABC"));
err = queue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
if err != nil {
panic(err)
}
Receiving a message
We can receive messages from our subscribed topic via the receive() method, which accepts a correlation_id and a timeout in milliseconds as input parameters. The code below gives an example of its usage:
let message = await queue.receive("123", 10000);
var message = await queue.ReceiveAsync("123", 10000);
message, err := queue.Receive(context.Background(), "123", 10000)
Closing the connection
After the task is concluded, we close the connection to free resources. The following command shows how to do this:
await queue.close("123")
await queue.CloseAsync("123");
err = queue.Close(context.Background(), "123")
Final code
Now, we can combine the code from the previous sections into a program that creates an instance of this class, configures and connects it to our NATS server, and sends and receives a message from it. The code below shows the result of this operation:
// Pre-requisites
import { ConfigParams } from 'pip-services3-commons-nodex';
import { NatsMessageQueue } from 'pip-services3-nats-nodex';
import { MessageEnvelope } from 'pip-services3-messaging-nodex';
export async function main() {
// Create and configure a component
let queue = new NatsMessageQueue();
queue.configure(ConfigParams.fromTuples(
"topic", "mytopic",
"connection.protocol", "nats",
"connection.host", "localhost",
"connection.port", 4222,
'options.autosubscribe', true
));
// Connect
await queue.open("123");
// Send a message
await queue.send("123", new MessageEnvelope("123", "mymessage", "ABC"));
// Receive a message
let message = await queue.receive("123", 10000);
console.log('my message is: ' + message)
// Close the connection
await queue.close("123")
console.log('Program executed');
}
// Pre-requisites
using PipServices3.Commons.Config;
using PipServices3.Nats.Queues;
using PipServices3.Messaging.Queues;
namespace ExampleApp
{
class Program
{
/// <summary>
/// Running the container
/// </summary>
/// <param name="args"></param>
static void Main(string[] args)
{
// Create and configure a component
var queue = new NatsMessageQueue();
queue.Configure(ConfigParams.FromTuples(
"topic", "mytopic",
"connection.protocol", "nats",
"connection.host", "localhost",
"connection.port", 4222,
"options.autosubscribe", true
));
// Connect
queue.OpenAsync(null).Wait();
// Send a message
queue.SendAsync("123", new MessageEnvelope("123", "mymessage", "ABC")).Wait();
// Receive a message
var message = queue.ReceiveAsync("123", 10000).Result;
Console.WriteLine("my message is: " + message.GetMessageAsString());
// Close the connection
queue.CloseAsync("123").Wait();
Console.WriteLine("Program executed");
}
}
}
// Pre-requisites
import (
"context"
"fmt"
cconf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
nqueues "github.com/pip-services3-gox/pip-services3-nats-gox/queues"
)
func main() {
// Create and configure a component
queue := nqueues.NewNatsMessageQueue("mytopic")
queue.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
// "topic", "mytopic",
"connection.protocol", "nats",
"connection.host", "localhost",
"connection.port", 4222,
"options.autosubscribe", true,
))
// Connect
err := queue.Open(context.Background(), "123")
if err != nil {
panic(err)
}
// Send a message
err = queue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
if err != nil {
panic(err)
}
// Receive a message
message, err := queue.Receive(context.Background(), "123", 10000)
if err != nil {
panic(err)
}
fmt.Printf("my message is: %s", message)
// Close the connection
err = queue.Close(context.Background(), "123")
if err != nil {
panic(err)
}
fmt.Println("Program executed")
}
Which, after running, will produce the following output
Thus, proving that the message was correctly received by the NATS server.
Wrapping up
In this tutorial, we have learned how to use the NatsMessageQueue component. First, we saw the necessary pre-requisites and how to create a configured instance of it. Then, we understood how to send and receive messages from our NATS server. And lastly, we combined the different sections into one comprehensive program.