RabbitMQ

How to send and receive messages via RabbitMQ.

Key takeaways

RabbitMQMessageQueue: Component used to send and receive messages via RabbitMQ.
OpenAsync(): Method used to connect to RabbitMQ.
SendAsync(): Method used to send messages to RabbitMQ.
ReceiveAsync(): Method used to receive messages from RabbitMQ.
queue, exchange, host, port (or uri): Required configuration parameters.

Introduction

In this tutorial, you will learn how to send and receive messages via RabbitMQ. We first cover the prerequisites, then create and configure the component, connect to RabbitMQ, and send and receive messages. Finally, we combine these steps into a single program and summarize the main concepts.

The RabbitMQMessageQueue component

This component is part of the RabbitMQ module and represents a message queue that sends and receives messages via the RabbitMQ message broker. Furthermore, it is a subclass of MessageQueue. The following sections explain how to use it.

Pre-requisites

In order to use this component, we need to import it first. The following code shows how to do this:

import { RabbitMQMessageQueue } from 'pip-services3-rabbitmq-nodex';
using PipServices3.RabbitMQ.Queues;
import rqueues "github.com/pip-services3-gox/pip-services3-rabbitmq-gox/queues"
import 'package:pip_services3_rabbitmq/pip_services3_rabbitmq.dart';

Implementing our component

After importing the component, we create an instance of it:

let queue = new RabbitMQMessageQueue();
var queue = new RabbitMQMessageQueue();
queue := rqueues.NewEmptyRabbitMQMessageQueue("my_topic")
var queue = RabbitMQMessageQueue('my_test');

Configuring our component

After creating an instance of our component, we need to configure it. Here, an important point is to understand the different parameters involved in this operation. The following table summarizes them:

figure 1

To configure our object, we use the configure() method, which accepts a ConfigParams object as input. In our example, we define the RabbitMQ exchange, the queue name, the host, and the port. We also set auto_create to true, so that the queue is created if it doesn’t already exist in RabbitMQ. For the purposes of this example, we connect as the guest user. To connect as a different user, we also need to specify that user’s credentials (username and password).

import { ConfigParams } from 'pip-services3-commons-nodex';

queue.configure(ConfigParams.fromTuples(
    "exchange", "myqueue", // RabbitMQ exchange name
    "queue", "myqueue", // queue name
    "options.auto_create", true, // create the queue if it doesn't exist
    "connection.host", "localhost",
    "connection.port", 5672
    // if credentials are needed, add a comma above and uncomment:
    //"credential.username", "user",
    //"credential.password", "pass123"
));
using PipServices3.Commons.Config;

queue.Configure(ConfigParams.FromTuples(
    "exchange", "myqueue", // RabbitMQ exchange name
    "queue", "myqueue", // queue name
    "options.auto_create", true, // create the queue if it doesn't exist
    "connection.host", "localhost",
    "connection.port", 5672
    // if credentials are needed, add a comma above and uncomment:
    //"credential.username", "user",
    //"credential.password", "pass123"
));
import (
	"context"

	cconf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
	rqueues "github.com/pip-services3-gox/pip-services3-rabbitmq-gox/queues"
)

queue.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
	"exchange", "myqueue", // RabbitMQ exchange name
	"queue", "myqueue", // queue name
	"options.auto_create", true, // create the queue if it doesn't exist
	"connection.host", "localhost",
	"connection.port", 5672,
	// if credentials are needed, uncomment:
	//"credential.username", "user",
	//"credential.password", "pass123"
))
import 'package:pip_services3_commons/pip_services3_commons.dart';

queue.configure(ConfigParams.fromTuples([
  'exchange',
  'myqueue', // RabbitMQ exchange name
  'queue',
  'myqueue', // queue name
  'options.auto_create',
  true, // create the queue if it doesn't exist
  'connection.host',
  'localhost',
  'connection.port',
  5672
  // if credentials are needed, add a comma above and uncomment:
  //"credential.username", "user",
  //"credential.password", "pass123"
]));
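To make the key/value layout of these settings more concrete, here is a minimal TypeScript sketch that does not use the library at all. The helpers tuplesToConfig and buildAmqpUri are hypothetical names introduced only for illustration; they are not part of Pip.Services. The sketch turns a flat list of tuples into a lookup object and then combines host, port, and credentials into a single AMQP URI, falling back to RabbitMQ's default guest account. Whether the component accepts a URI in exactly this form is an assumption that should be checked against the module's reference.

```typescript
// Hypothetical helpers for illustration only; not part of Pip.Services.
type ConfigValue = string | number | boolean;

// Turn a flat key/value tuple list into a lookup object,
// similar in spirit to the tuple-based configuration calls.
function tuplesToConfig(...tuples: ConfigValue[]): Record<string, string> {
    if (tuples.length % 2 !== 0) {
        throw new Error("Tuples must come in key/value pairs");
    }
    const result: Record<string, string> = {};
    for (let i = 0; i < tuples.length; i += 2) {
        result[String(tuples[i])] = String(tuples[i + 1]);
    }
    return result;
}

// Combine separate connection settings into one AMQP URI.
// Falls back to RabbitMQ's default guest/guest account when no credentials are set.
function buildAmqpUri(settings: Record<string, string>): string {
    const host = settings["connection.host"] || "localhost";
    const port = settings["connection.port"] || "5672";
    const user = encodeURIComponent(settings["credential.username"] || "guest");
    const pass = encodeURIComponent(settings["credential.password"] || "guest");
    return "amqp://" + user + ":" + pass + "@" + host + ":" + port;
}

const sketchConfig = tuplesToConfig(
    "exchange", "myqueue",
    "queue", "myqueue",
    "options.auto_create", true,
    "connection.host", "localhost",
    "connection.port", 5672
);
const sketchUri = buildAmqpUri(sketchConfig);
```

With the values from this tutorial, sketchUri holds amqp://guest:guest@localhost:5672. User names and passwords are percent-encoded so that characters such as @ do not break the URI.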

Connecting to RabbitMQ

To connect to RabbitMQ, we use the OpenAsync() method (open() or Open() in some of the examples below), which takes the correlation_id as an input parameter. The following example shows how to open our previously configured queue:

await queue.open("123");
await queue.OpenAsync("123");
err := queue.Open(context.Background(), "123")
if err != nil {
	panic(err)
}
await queue.open('123');

Creating and sending a message

Once connected, we can send a message to RabbitMQ. For this, we use the SendAsync() method (send() or Send() in some of the examples below), which accepts the correlation_id and a MessageEnvelope object as inputs. The envelope is built from a correlation_id, a message type, and the message content. The following code shows how to do this:

import { MessageEnvelope } from 'pip-services3-messaging-nodex';

await queue.send("123", new MessageEnvelope(null, "mymessage", "ABC"));

using PipServices3.Messaging.Queues;

await queue.SendAsync("123", new MessageEnvelope(null, "mymessage", "ABC"));
import cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"

err = queue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
import 'package:pip_services3_messaging/pip_services3_messaging.dart';

await queue.send('123', MessageEnvelope(null, 'mymessage', 'ABC'));
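The examples above send the plain string "ABC". When the content is structured data, one common approach is to serialize it to JSON before placing it in the envelope and to parse it again after receiving. The following TypeScript sketch illustrates that idea with a hypothetical SketchEnvelope type that mirrors the three envelope fields (correlation_id, message type, content). It is a stand-in for illustration, not the library's MessageEnvelope class, and the OrderCreated payload is an invented example.

```typescript
// Hypothetical stand-in mirroring the three envelope fields; not the library class.
interface SketchEnvelope {
    correlationId: string | null;
    messageType: string;
    message: string;
}

// Invented payload type used only for this example.
interface OrderCreated {
    orderId: string;
    total: number;
}

// Serialize any payload to JSON and wrap it in an envelope.
function toEnvelope(correlationId: string | null, messageType: string, payload: unknown): SketchEnvelope {
    return {
        correlationId: correlationId,
        messageType: messageType,
        message: JSON.stringify(payload)
    };
}

// Parse the JSON content of an envelope back into a typed object.
function fromEnvelope<T>(envelope: SketchEnvelope): T {
    return JSON.parse(envelope.message) as T;
}

const outgoing = toEnvelope("123", "order.created", { orderId: "A-1", total: 42 });
const incoming = fromEnvelope<OrderCreated>(outgoing);
```

Here outgoing.message is a JSON string that could be passed as the content of a real envelope, and incoming is the object recovered from it. The message type gives the receiver a way to decide how to parse the content.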

Receiving a message

To receive a message, we use the ReceiveAsync() method (receive() or Receive() in some of the examples below), which takes the correlation_id and the waiting time in milliseconds as input parameters. The following example shows how to use it:

let received = await queue.receive("123", 10000);
var received = await queue.ReceiveAsync("123", 0);
received, err := queue.Receive(context.Background(), "123", 0)
var received = await queue.receive('123', 0);
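Because the receive call waits only for a limited time, it is worth guarding against the case where no message arrives. The TypeScript sketch below assumes that the call can complete with null (or undefined) once the waiting time elapses; this is an assumption to confirm against the library's reference. The Received interface and the textOrFallback helper are hypothetical names used only for illustration.

```typescript
// Minimal shape of a received message: only the accessor this sketch needs.
// Hypothetical interface for illustration; not the library's type.
interface Received {
    getMessageAsString(): string;
}

// Return the message text, or a fallback when nothing arrived in time.
function textOrFallback(received: Received | null | undefined, fallback: string): string {
    return received ? received.getMessageAsString() : fallback;
}

// Simulated results of a receive call: one with a message, one without.
const arrived: Received = { getMessageAsString: () => "ABC" };
const withMessage = textOrFallback(arrived, "<no message>");
const withoutMessage = textOrFallback(null, "<no message>");
```

In a real program, the value returned by the receive call would be passed to textOrFallback instead of the simulated arrived object, so that a timeout results in a readable fallback rather than an error on a missing message.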

Final code

Now, we assemble all that was learned into one program. The result is:

import { ConfigParams } from 'pip-services3-commons-nodex';
import { MessageEnvelope } from 'pip-services3-messaging-nodex';
import { RabbitMQMessageQueue } from 'pip-services3-rabbitmq-nodex';

export async function main() {

    let queue = new RabbitMQMessageQueue();

    queue.configure(ConfigParams.fromTuples(
        "exchange", "myqueue", // RabbitMQ exchange name
        "queue", "myqueue", // queue name
        "options.auto_create", true, // create the queue if it doesn't exist

        "connection.host", "localhost",
        "connection.port", 5672
        // if credentials are needed, add a comma above and uncomment:
        //"credential.username", "user",
        //"credential.password", "pass123"
    ));

    await queue.open("123");

    await queue.send("123", new MessageEnvelope(null, "mymessage", "ABC"));

    let received = await queue.receive("123", 10000);

    console.log(received.getMessageAsString());
    console.log("Task Completed");
}
using System;
using System.Threading.Tasks;

using PipServices3.Commons.Config;
using PipServices3.RabbitMQ.Queues;
using PipServices3.Messaging.Queues;


namespace ConsoleApp1
{
    internal class Program
    {
        static void Main(string[] args)
        {
            ExampleStart().Wait();
        }

        public static async Task ExampleStart()
        {
            var queue = new RabbitMQMessageQueue();

            queue.Configure(ConfigParams.FromTuples(
                "exchange", "myqueue", // RabbitMQ exchange name
                "queue", "myqueue", // queue name
                "options.auto_create", true, // create the queue if it doesn't exist

                "connection.host", "localhost",
                "connection.port", 5672
                // if credentials are needed, add a comma above and uncomment:
                //"credential.username", "user",
                //"credential.password", "pass123"
            ));

            await queue.OpenAsync("123");

            await queue.SendAsync("123", new MessageEnvelope(null, "mymessage", "ABC"));

            var received = await queue.ReceiveAsync("123", 10000);

            Console.WriteLine(received.GetMessageAsString());
            Console.WriteLine("Task completed");
        }
    }
}
import (
	"context"
	"fmt"

	cconf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
	cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
	rqueues "github.com/pip-services3-gox/pip-services3-rabbitmq-gox/queues"
)

func main() {
	// Create and configure a component
	queue := rqueues.NewEmptyRabbitMQMessageQueue("my_topic")

	queue.Configure(context.Background(), cconf.NewConfigParamsFromTuples(
		"exchange", "myqueue", // RabbitMQ exchange name
		"queue", "myqueue", // queue name
		"options.auto_create", true, // create the queue if it doesn't exist
		"connection.host", "localhost",
		"connection.port", 5672,
		// credentials, needed only for users other than guest
		"credential.username", "user",
		"credential.password", "password",
	))

	err := queue.Open(context.Background(), "123")
	if err != nil {
		panic(err)
	}

	err = queue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
	if err != nil {
		panic(err)
	}

	received, err := queue.Receive(context.Background(), "123", 10000)
	if err != nil {
		panic(err)
	}

	fmt.Println(received.GetMessageAsString())
	fmt.Println("Task completed")
}
import 'package:pip_services3_commons/pip_services3_commons.dart';
import 'package:pip_services3_messaging/pip_services3_messaging.dart';
import 'package:pip_services3_rabbitmq/pip_services3_rabbitmq.dart';

/// Running the container
void main(List<String> argument) async {
  var queue = RabbitMQMessageQueue('my_test');

  queue.configure(ConfigParams.fromTuples([
    'exchange',
    'myqueue', // RabbitMQ exchange name
    'queue',
    'myqueue', // queue name
    'options.auto_create',
    true, // create the queue if it doesn't exist
    'connection.host',
    'localhost',
    'connection.port',
    5672,
    // credentials, needed only for users other than guest
    'credential.username', 'user',
    'credential.password', 'pass123'
  ]));

  await queue.open('123');

  await queue.send('123', MessageEnvelope(null, 'mymessage', 'ABC'));

  var received = await queue.receive('123', 10000); // receive is not supported in dart

  print(received.getMessageAsString());
  print('Task completed');
}


After running, the program produces the following output, which confirms that the message was sent to and received from RabbitMQ:

figure 2

Wrapping up

In this tutorial, we have seen how to communicate with RabbitMQ by sending and receiving messages. First, we learned the basics of the RabbitMQMessageQueue component and how to import, create, and configure it, including its configuration parameters. Then, we saw how to connect to RabbitMQ and how to send and receive messages. Finally, we created a program that combines all of these steps.