Messaging basics

How to use the MemoryMessageQueue component.

Key takeaways

IMessageReceiver Interface that defines the main methods for a message receiver component.
IMessageQueue Interface that defines the basic methods for a message queue.
MemoryMessageQueue In-memory implementation of a message queue that can be used to send and receive messages.
listen() Method used to listen for incoming messages.
send() Method used to send a message into a queue.
open() Method used to open a component (e.g. MemoryMessageQueue) before use.
close() Method used to close a component (e.g. MemoryMessageQueue) after use and free used resources.
MessageEnvelope Component that is used to add information to a message.

Introduction

This tutorial will help you understand how and when to use the MemoryMessageQueue component. First, it explains two interfaces, namely, IMessageQueue and IMessageReceiver, which have to be implemented by message queues and message receivers, respectively. Next, it describes the basics of the MemoryMessageQueue component and its prerequisites. Following this, it explains how to create a message receiver and an instance of the MemoryMessageQueue, add a listener to the queue, send and receive a message, and close the component. It ends by grouping the various pieces into a single program and wrapping up the concepts seen in this tutorial.

IMessageQueue and IMessageReceiver

The IMessageQueue interface defines the basic methods for a message queue and needs to be implemented by all components of this type. The MemoryMessageQueue class implements it via its parent class - MessageQueue.

The IMessageReceiver interface defines the receiveMessage() method, and must be implemented by all message receiver components. In the example presented in the following sections, we define a class named MyMessageReciever, which implements this interface.

The diagrams below show the main relations between these interfaces and the components covered in this tutorial.

figure 1

MemoryMessageQueue

This component provides an in-memory implementation of a message queue that can be used to send and receive messages. It is usually used for testing purposes.

Pre-requisites

In order to use this component, we need to import it first. The following command shows how to do this:

import { MemoryMessageQueue } from "pip-services3-messaging-nodex";
using PipServices3.Messaging.Queues;
import (
	cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
)
import 'package:pip_services3_messaging/pip_services3_messaging.dart';
from pip_services3_messaging.queues import MemoryMessageQueue
Not available

Message receiver

Following this, we create a message receiver that prints a customized message once a message is received.

import { IMessageQueue } from "pip-services3-messaging-nodex";

export class MyMessageReceiver implements IMessageReceiver {
    public async receiveMessage(envelope: MessageEnvelope, queue: IMessageQueue): Promise<void> {
        console.log("Received message: " + envelope.getMessageAsString());
    }
}
using PipServices3.Messaging.Queues;

class MyMessageReceiver : IMessageReceiver
{
    public async Task ReceiveMessageAsync(MessageEnvelope envelope, IMessageQueue queue)
    {
        Console.WriteLine("Received message: " + envelope.GetMessageAsString());
    }
}

import (
	"context"
	"fmt"

	cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
)

type MyMessageReceiver struct {
}

func NewMyMessageReceiver() *MyMessageReceiver {
	c := &MyMessageReceiver{}
	return c
}

func (c *MyMessageReceiver) ReceiveMessage(ctx context.Context, envelope *cqueues.MessageEnvelope, queue cqueues.IMessageQueue) (err error) {
	fmt.Println("Received message: " + envelope.GetMessageAsString())
	return nil
}
import 'package:pip_services3_messaging/pip_services3_messaging.dart';

class MyMessageReceiver implements IMessageReceiver {
  @override
  Future receiveMessage(MessageEnvelope envelope, IMessageQueue queue) async {
    print('Received message: ' + envelope.getMessageAsString());
  }
}
from pip_services3_messaging.queues import IMessageReceiver

class MyMessageReceiver(IMessageReceiver):
    def receive_message(self, envelop, queue):
        print("Received message: " + envelop.get_message_as_string())
Not available

Creating a MemoryMessageQueue component

The next step is to create an instance of the MemoryMessageQueue class and open it. The following code shows how to do this:

let messageQueue = new MemoryMessageQueue();
await messageQueue.open("123")    // correlationId = "123"
var messageQueue = new MemoryMessageQueue();
await messageQueue.OpenAsync("123");   // correlationId = "123"

messageQueue := cqueues.NewMemoryMessageQueue("my_queue")
err := messageQueue.Open(context.Background(), "123")
var messageQueue = MemoryMessageQueue();
await messageQueue.open('123');  // correlationId = "123"
messageQueue = MemoryMessageQueue()
messageQueue.open("123")   # correlationId = "123"
Not available

Creating a listener

Now, we need to create a listener that waits for messages to arrive in the queue. In order for this process not to collide with the message sending, we create it in a separate thread. The following code does just that:

new Promise(() => messageQueue.listen("123", new MyMessageReceiver()));
new Thread(() =>
{
    Thread.CurrentThread.IsBackground = true;
    messageQueue.ListenAsync("123", new MyMessageReceiver()).Wait();
}).Start();
go messageQueue.Listen(context.Background(), "123", NewMyMessageReceiver())
Future(() {
  messageQueue.listen('123', MyMessageReceiver());
});
Thread(target=messageQueue.listen, args=("123", MyMessageReceiver()), daemon=True).start()
Not available

Sending a message

After creating a listener, we send a message to the queue. For this, we use a MessageEnvelope component, which allows us to add extra information to the message, such as a message type. The following code explains how this can be done:

import { MessageEnvelope } from "pip-services3-messaging-nodex";

await messageQueue.send("123", new MessageEnvelope(null, "mymessage", "ABC"));
using PipServices3.Messaging.Queues;

await messageQueue.SendAsync("123", new MessageEnvelope(null, "mymessage", "ABC"));
import (
	"context"
	"fmt"

	cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
)

err := messageQueue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
import 'package:pip_services3_messaging/pip_services3_messaging.dart';

await messageQueue.send('123', MessageEnvelope(null, 'mymessage', 'ABC'));
from pip_services3_messaging.queues import MessageEnvelope

messageQueue.send("123", MessageEnvelope(None, "mymessage", "ABC")) 
Not available

Receiving a message

After sending the message, the listener receives it and the message receiver prints the following text:

figure 1

Closing the MemoryMessageQueue

Once we’re done using the memory queue, we close it to free resources. The following code shows how to do this:

messageQueue.close('123')   // correlationId = "123"
await messageQueue.CloseAsync("123");  // correlationId = "123"
err = messageQueue.Close(context.Background(), "123")
await messageQueue.close('123');    // correlationId = "123"
messageQueue.close('123')  # correlationId = "123"
Not available

Final code

Now, let’s put all of these concepts together into a single program. The code below shows what the end results looks like:

import { IMessageQueue, IMessageReceiver, MemoryMessageQueue, MessageEnvelope } from "pip-services3-messaging-nodex";

export async function main() {
    // Message queue
    let messageQueue = new MemoryMessageQueue();
    await messageQueue.open("123");

    // Listener
    new Promise(() => messageQueue.listen("123", new MyMessageReceiver()));
    
    // Send message
    await messageQueue.send("123", new MessageEnvelope(null, "mymessage", "ABC"));

    // Close message queue
    await messageQueue.close('123');
}

// Message receiver
export class MyMessageReceiver implements IMessageReceiver {
    public async receiveMessage(envelope: MessageEnvelope, queue: IMessageQueue): Promise<void> {
        console.log("Received message: " + envelope.getMessageAsString());
    }
}

using System;
using System.Threading;
using System.Threading.Tasks;
using PipServices3.Messaging.Queues;

namespace ExampleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            // Message queue
            var messageQueue = new MemoryMessageQueue();
            messageQueue.OpenAsync("123").Wait();

            // Listener
            new Thread(() =>
            {
                Thread.CurrentThread.IsBackground = true;
                messageQueue.ListenAsync("123", new MyMessageReceiver()).Wait();
            }).Start();

            // Send message
            messageQueue.SendAsync("123", new MessageEnvelope(null, "mymessage", "ABC")).Wait();
            // Close message queue
            messageQueue.CloseAsync("123").Wait();
        }
    }

    // Message receiver
    class MyMessageReceiver : IMessageReceiver
    {
        public async Task ReceiveMessageAsync(MessageEnvelope envelope, IMessageQueue queue)
        {
            Console.WriteLine("Received message: " + envelope.GetMessageAsString());
        }
    }
}

import (
	"context"
	"fmt"
	"time"

	cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
)

func main() {
	// Message queue
	messageQueue := cqueues.NewMemoryMessageQueue("my_queue")
	err := messageQueue.Open(context.Background(), "123")
	if err != nil {
		panic(err)
	}

	// Listener
	go messageQueue.Listen(context.Background(), "123", NewMyMessageReceiver())

	// Send message
	err = messageQueue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
	if err != nil {
		panic(err)
	}

	<-time.After(100 * time.Millisecond)

	// Close message queue
	err = messageQueue.Close(context.Background(), "123")
	if err != nil {
		panic(err)
	}
}

// Message receiver
type MyMessageReceiver struct {
}

func NewMyMessageReceiver() *MyMessageReceiver {
	c := &MyMessageReceiver{}
	return c
}

func (c *MyMessageReceiver) ReceiveMessage(ctx context.Context, envelope *cqueues.MessageEnvelope, queue cqueues.IMessageQueue) (err error) {
	fmt.Println("Received message: " + envelope.GetMessageAsString())
	return nil
}

import 'package:pip_services3_messaging/pip_services3_messaging.dart';

void main(List<String> argument) async {
  // Message queue
  var messageQueue = MemoryMessageQueue();
  await messageQueue.open('123');

  // Listener
  // ignore: unawaited_futures
  Future(() {
    messageQueue.listen('123', MyMessageReceiver());
  });

  // Send message
  await messageQueue.send('123', MessageEnvelope(null, 'mymessage', 'ABC'));

  // Close message queue
  await messageQueue.close('123');
}

// Message receiver
class MyMessageReceiver implements IMessageReceiver {
  @override
  Future receiveMessage(MessageEnvelope envelope, IMessageQueue queue) async {
    print('Received message: ' + envelope.getMessageAsString());
  }
}

# Pre-requisites
import time
from threading import Thread
from pip_services3_messaging.queues import IMessageReceiver, MemoryMessageQueue, MessageEnvelope

# Message receiver
class MyMessageReceiver(IMessageReceiver):
    def receive_message(self, envelop, queue):
        print("Received message: " + envelop.get_message_as_string())

# Message queue
messageQueue = MemoryMessageQueue()
messageQueue.open("123")

# Listener
Thread(target=messageQueue.listen, args=("123", MyMessageReceiver()), daemon=True).start()

# Send message
messageQueue.send("123", MessageEnvelope(None, "mymessage", "ABC")) 
time.sleep(0.1)  # wait message

# Close message queue
messageQueue.close('123')

Not available

Wrapping up

In this tutorial, we learned how to use the MemoryMessageQueue, which, as the name suggests, works by storing messages in memory.

In order to understand the dynamics of this component, we first explored the IMessageQueueInterface and IMessageReceiver interfaces. Next, we created a message receiver, a memory message queue, and a listener for the queue. Then, we sent a message and saw that it was received by our queue. Finally, we combined all of these concepts together to create a simple program that works with a queue.