NATS

How to send and receive messages via a NATS server.

Key takeaways

NatsMessageQueue: Component used to send and receive messages via a NATS server.
configure(): Method used to set the values of configuration parameters.
send(): Method used to send messages to a NATS server.
receive(): Method used to receive messages from a NATS server.

Introduction

This tutorial explains how to use the NatsMessageQueue component. First, we briefly describe the class and show how to import it. Then, we create and configure an instance, connect it to a NATS server, and use it to send and receive a message. Finally, we combine the code into a complete program and summarize what we have learned.

The NatsMessageQueue component

This component sends and receives messages via a NATS server. The following sections show, step by step, how to create an instance that exchanges messages with a NATS server running on our machine.

Pre-requisites

In order to use this component, we need to import it first. The following code shows how to do this:

import { NatsMessageQueue } from 'pip-services4-nats-node';

import nqueues "github.com/pip-services4/pip-services4-go/pip-services4-nats-go/queues"

Implementing our component

Once imported, we can create an instance of this class and configure it. In the example below, we assume a NATS server running on our machine (localhost) and listening on port 4222. In addition, we set autosubscribe to true, which, as the name suggests, automatically creates a subscription to our topic.

let queue = new NatsMessageQueue(); 

queue.configure(ConfigParams.fromTuples(
    "topic", "mytopic",
    "connection.protocol", "nats",
    "connection.host", "localhost",
    "connection.port", 4222,
    "options.autosubscribe", true 
    ));

import (
	"context"

	cconf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
	nqueues "github.com/pip-services4/pip-services4-go/pip-services4-nats-go/queues"
)

queue := nqueues.NewNatsMessageQueue("mytopic")

queue.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
	// "topic", "mytopic",
	"connection.protocol", "nats",
	"connection.host", "localhost",
	"connection.port", 4222,
	"options.autosubscribe", true,
))
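
To make the role of the dotted keys more concrete, the following is a minimal sketch of how flat tuples such as "connection.host" can be grouped into sections and turned into a server address. The names fromTuples, getSection, and buildUri are hypothetical helpers written for this illustration; they are not the library's ConfigParams implementation, and the real component may resolve its connection differently.

```typescript
// Hypothetical stand-ins for illustration; these are not the library's classes.
type ConfigValue = string | number | boolean;
type FlatConfig = { [key: string]: ConfigValue };

// Build a flat key/value map from alternating key, value arguments.
function fromTuples(...tuples: ConfigValue[]): FlatConfig {
    const params: FlatConfig = {};
    for (let i = 0; i + 1 < tuples.length; i += 2) {
        params[String(tuples[i])] = tuples[i + 1];
    }
    return params;
}

// Collect the entries under a dotted prefix such as "connection",
// dropping the prefix from each key.
function getSection(params: FlatConfig, section: string): FlatConfig {
    const prefix = section + ".";
    const result: FlatConfig = {};
    for (const key of Object.keys(params)) {
        if (key.indexOf(prefix) === 0) {
            result[key.substring(prefix.length)] = params[key];
        }
    }
    return result;
}

// Compose the address a client would dial from the connection section.
function buildUri(connection: FlatConfig): string {
    return connection["protocol"] + "://" + connection["host"] + ":" + connection["port"];
}

const params = fromTuples(
    "topic", "mytopic",
    "connection.protocol", "nats",
    "connection.host", "localhost",
    "connection.port", 4222,
    "options.autosubscribe", true
);

console.log(buildUri(getSection(params, "connection"))); // nats://localhost:4222
console.log(getSection(params, "options"));              // { autosubscribe: true }
```

Grouping by prefix keeps connection settings and behavioural options separate, even though both are passed in a single flat list.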

Connecting to NATS

Now that we have defined our component, we connect to the NATS server via the open() method, which takes the context as an input parameter. The following code shows how to perform this operation:

await queue.open(ctx);

err := queue.Open(context.Background())

Creating and sending a message

Once connected, we send a message to our NATS server via the send() method. This method accepts a context and a MessageEnvelope object as inputs. The MessageEnvelope, in turn, requires a context, a message type, and the data being sent. The code below shows how to perform this task:

await queue.send(ctx, new MessageEnvelope(ctx, "mymessage", "ABC"));

import cqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"

err = queue.Send(context.Background(), cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
if err != nil {
	panic(err)
}

Receiving a message

We can receive messages from our subscribed topic via the receive() method, which accepts a context and a timeout in milliseconds as input parameters. The code below gives an example of its usage:

let message = await queue.receive(ctx, 10000);

message, err := queue.Receive(context.Background(), 10000)
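
The idea of receiving with a timeout can be illustrated without a server. The sketch below uses a hypothetical InMemoryQueue class that only mimics the waiting behaviour; it does not talk to NATS. It assumes that a receive call which times out yields null, which should be checked against the component's documentation before relying on it.

```typescript
// Hypothetical in-memory queue for illustration; it does not talk to NATS.
class InMemoryQueue<T> {
    private messages: T[] = [];
    private waiters: Array<(message: T | null) => void> = [];

    // Hand the message to a waiting receiver if there is one, otherwise buffer it.
    send(message: T): void {
        const waiter = this.waiters.shift();
        if (waiter) {
            waiter(message);
        } else {
            this.messages.push(message);
        }
    }

    // Resolve with the next message, or with null once timeoutMs elapses.
    receive(timeoutMs: number): Promise<T | null> {
        if (this.messages.length > 0) {
            return Promise.resolve(this.messages.shift() as T);
        }
        return new Promise<T | null>((resolve) => {
            const waiter = (message: T | null) => {
                clearTimeout(timer);
                resolve(message);
            };
            const timer = setTimeout(() => {
                // Give up: unregister the waiter and report that nothing arrived.
                this.waiters = this.waiters.filter((w) => w !== waiter);
                resolve(null);
            }, timeoutMs);
            this.waiters.push(waiter);
        });
    }
}

async function demoReceive(): Promise<void> {
    const demoQueue = new InMemoryQueue<string>();
    demoQueue.send("ABC");
    console.log(await demoQueue.receive(100)); // ABC
    console.log(await demoQueue.receive(100)); // null
}

void demoReceive();
```

The first call returns immediately because a message is already buffered; the second waits for the full timeout because nothing else was sent.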

Closing the connection

After the task is concluded, we close the connection to free resources. The following command shows how to do this:

await queue.close(ctx);

err = queue.Close(context.Background())
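
Because a failed send or receive would otherwise skip the close call, one common pattern is to wrap the work in try/finally. The sketch below shows this with a hypothetical Closeable interface and a FakeConnection stand-in; neither is part of the library, and with the real queue the close call would also take the context argument.

```typescript
// Hypothetical shape for illustration: anything with an async close().
interface Closeable {
    close(): Promise<void>;
}

// Run `work` with the resource and always close it afterwards,
// whether `work` resolves or throws.
async function useAndClose<T extends Closeable, R>(
    resource: T,
    work: (resource: T) => Promise<R>
): Promise<R> {
    try {
        return await work(resource);
    } finally {
        await resource.close();
    }
}

// Stand-in resource that only records whether close() was called.
class FakeConnection implements Closeable {
    closed = false;
    async close(): Promise<void> {
        this.closed = true;
    }
}

async function demoClose(): Promise<void> {
    const connection = new FakeConnection();
    try {
        await useAndClose(connection, async () => {
            throw new Error("send failed");
        });
    } catch (err) {
        console.log("closed after error:", connection.closed); // closed after error: true
    }
}

void demoClose();
```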

Final code

Now, we can combine the code from the previous sections into a program that creates an instance of this class, configures it, connects it to our NATS server, sends a message to the server, and receives a message from it. The code below shows the result:

import { ConfigParams, Context } from "pip-services4-components-node";
import { MessageEnvelope } from "pip-services4-messaging-node";
import { NatsMessageQueue } from "pip-services4-nats-node";

export async function main() {
    // Execution context passed to every call ("123" is an arbitrary trace id)
    const ctx = Context.fromTraceId("123");

    // Create and configure a component
    const queue = new NatsMessageQueue();

    queue.configure(ConfigParams.fromTuples(
        "topic", "mytopic",
        "connection.protocol", "nats",
        "connection.host", "localhost",
        "connection.port", 4222,
        "options.autosubscribe", true
    ));

    // Connect
    await queue.open(ctx);

    // Send a message
    await queue.send(ctx, new MessageEnvelope(ctx, "mymessage", "ABC"));

    // Receive a message, waiting up to 10000 ms
    const message = await queue.receive(ctx, 10000);
    console.log("my message is: " + message);

    // Close the connection
    await queue.close(ctx);
    console.log("Program executed");
}

import (
	"context"
	"fmt"

	cconf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
	cqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"
	nqueues "github.com/pip-services4/pip-services4-go/pip-services4-nats-go/queues"
)

func main() {
	// Create and configure a component
	queue := nqueues.NewNatsMessageQueue("mytopic")

	queue.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
		// "topic", "mytopic",
		"connection.protocol", "nats",
		"connection.host", "localhost",
		"connection.port", 4222,
		"options.autosubscribe", true,
	))

	// Connect
	err := queue.Open(context.Background())
	if err != nil {
		panic(err)
	}

	// Send a message
	err = queue.Send(context.Background(), cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
	if err != nil {
		panic(err)
	}

	// Receive a message
	message, err := queue.Receive(context.Background(), 10000)
	if err != nil {
		panic(err)
	}

	fmt.Printf("my message is: %s\n", message)

	// Close the connection
	err = queue.Close(context.Background())
	if err != nil {
		panic(err)
	}

	fmt.Println("Program executed")
}

Running the program produces the following output:

figure 1

This shows that the message was correctly sent to and received from the NATS server.

Wrapping up

In this tutorial, we have learned how to use the NatsMessageQueue component. First, we saw the prerequisites and how to create and configure an instance. Then, we saw how to connect to a NATS server, send messages to it, receive messages from it, and close the connection. Lastly, we combined the different sections into one complete program.