NATS

How to send and receive messages via a NATS server.

Key takeaways

NatsMessageQueue Component used to send and receive messages via a NATS server.
configure() Method used to set the values of configuration parameters.
send() Method used to send messages to a NATS server.
receive() Method used to receive messages from a NATS server.

Introduction

This tutorial will help you understand how the NatsMessageQueue component is used. First, we will see a brief description of this class and how to import it. Then, we will implement this component, connect it, and create and send a message to a NATS server from it. Finally, we will combine the code into a complete program and summarize what we have learned.

The NatsMessageQueue component

This component can be used to send and receive messages via a NATS server. In order to show how to use it, in the following sections, we will see in a stepwise manner how to create a custom component that sends messages to a NATS server located on our machine.

Pre-requisites

In order to use this component, we need to import it first. The following code shows how to do this:

import { NatsMessageQueue } from 'pip-services3-nats-nodex';
using PipServices3.Nats.Queues;
import nqueues "github.com/pip-services3-gox/pip-services3-nats-gox/queues"
Not available
Not available
Not available

Implementing our component

Once imported, we can create an instance of this class and configure it. In the example below, we consider a NATS server on our machine and port 4222. In addition, we declare autosubscribe as true, which, as the name suggests, creates a subscription to our topic automatically.

let queue = new NatsMessageQueue(); 

queue.configure(ConfigParams.fromTuples(
    "topic", "mytopic",
    "connection.protocol", "nats",
    "connection.host", "localhost",
    "connection.port", 4222,
    "options.autosubscribe", true 
    ));
var queue = new NatsMessageQueue();

queue.Configure(ConfigParams.FromTuples(
    "topic", "mytopic",
    "connection.protocol", "nats",
    "connection.host", "localhost",
    "connection.port", 4222,
    "options.autosubscribe", true
));

queue := nqueues.NewNatsMessageQueue("mytopic")

queue.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
	// "topic", "mytopic",
	"connection.protocol", "nats",
	"connection.host", "localhost",
	"connection.port", 4222,
	"options.autosubscribe", true,
))
Not available
Not available
Not available

Connecting to NATS

Now that we have defined our component, we connect to the NATS server via the open() method, which takes the correlation_id as an input parameter. The following code shows how to perform this operation:

 await queue.open("123");
await queue.OpenAsync(null);
err := queue.Open(context.Background(), "123")
Not available
Not available
Not available

Creating and sending a message

Once connected, we send a message to our NATS server via the send() method. This method accepts a correlation_id and a MessageEnvelope object as inputs. And, this last object requires a correlation_id, a message type, and the data being sent as inputs. The code below exemplifies how to perform this task:

await queue.send("123", new MessageEnvelope("123", "mymessage", "ABC"));
await queue.SendAsync("123", new MessageEnvelope("123", "mymessage", "ABC"));
err = queue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
if err != nil {
	panic(err)
}
Not available
Not available
Not available

Receiving a message

We can receive messages from our subscribed topic via the receive() method, which accepts a correlation_id and a timeout in milliseconds as input parameters. The code below gives an example of its usage:

let message = await queue.receive("123", 10000); 
var message = await queue.ReceiveAsync("123", 10000); 
message, err := queue.Receive(context.Background(), "123", 10000)
Not available
Not available
Not available

Closing the connection

After the task is concluded, we close the connection to free resources. The following command shows how to do this:

await queue.close("123")
await queue.CloseAsync("123");
err = queue.Close(context.Background(), "123")
Not available
Not available
Not available

Final code

Now, we can combine the code from the previous sections into a program that creates an instance of this class, configures and connects it to our NATS server, and sends and receives a message from it. The code below shows the result of this operation:

// Pre-requisites
import { ConfigParams } from 'pip-services3-commons-nodex';
import { NatsMessageQueue } from 'pip-services3-nats-nodex';

import { MessageEnvelope } from 'pip-services3-messaging-nodex';

export async function main() {

    // Create and configure a component
    let queue = new NatsMessageQueue(); 

    queue.configure(ConfigParams.fromTuples(
        "topic", "mytopic",
        "connection.protocol", "nats",
        "connection.host", "localhost",
        "connection.port", 4222,
        'options.autosubscribe', true 
    ));

    // Connect
    await queue.open("123");

    // Send a message
    await queue.send("123", new MessageEnvelope("123", "mymessage", "ABC"));

    // Receive a message
    let message = await queue.receive("123", 10000); 
    console.log('my message is: ' + message)

    // Close the connection
    await queue.close("123")
    console.log('Program executed');
}
// Pre-requisites
using PipServices3.Commons.Config;
using PipServices3.Nats.Queues;
using PipServices3.Messaging.Queues;

namespace ExampleApp
{
    class Program
    {
        /// <summary>
        /// Running the container
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            // Create and configure a component
            var queue = new NatsMessageQueue();

            queue.Configure(ConfigParams.FromTuples(
                "topic", "mytopic",
                "connection.protocol", "nats",
                "connection.host", "localhost",
                "connection.port", 4222,
                "options.autosubscribe", true
            ));

            // Connect
            queue.OpenAsync(null).Wait();

            // Send a message
            queue.SendAsync("123", new MessageEnvelope("123", "mymessage", "ABC")).Wait();

            // Receive a message
            var message = queue.ReceiveAsync("123", 10000).Result;
            Console.WriteLine("my message is: " + message.GetMessageAsString());

            // Close the connection
            queue.CloseAsync("123").Wait();
            Console.WriteLine("Program executed");
        }
    }
}
// Pre-requisites
import (
	"context"
	"fmt"

	cconf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
	cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
	nqueues "github.com/pip-services3-gox/pip-services3-nats-gox/queues"
)

func main() {
	// Create and configure a component
	queue := nqueues.NewNatsMessageQueue("mytopic")

	queue.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
		// "topic", "mytopic",
		"connection.protocol", "nats",
		"connection.host", "localhost",
		"connection.port", 4222,
		"options.autosubscribe", true,
	))

	// Connect
	err := queue.Open(context.Background(), "123")
	if err != nil {
		panic(err)
	}

	// Send a message
	err = queue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
	if err != nil {
		panic(err)
	}

	// Receive a message
	message, err := queue.Receive(context.Background(), "123", 10000)
	if err != nil {
		panic(err)
	}

	fmt.Printf("my message is: %s", message)

	// Close the connection
	err = queue.Close(context.Background(), "123")
	if err != nil {
		panic(err)
	}

	fmt.Println("Program executed")
}
Not available
Not available
Not available

Which, after running, will produce the following output

figure 1

Thus, proving that the message was correctly received by the NATS server.

Wrapping up

In this tutorial, we have learned how to use the NatsMessageQueue component. First, we saw the necessary pre-requisites and how to create a configured instance of it. Then, we understood how to send and receive messages from our NATS server. And lastly, we combined the different sections into one comprehensive program.