Messaging basics

How to use the MemoryMessageQueue component.

Key takeaways

IMessageReceiver Interface that defines the main methods for a message receiver component.
IMessageQueue Interface that defines the basic methods for a message queue.
MemoryMessageQueue In-memory implementation of a message queue that can be used to send and receive messages.
listen() Method used to listen for incoming messages.
send() Method used to send a message into a queue.
open() Method used to open a component (e.g. MemoryMessageQueue) before use.
close() Method used to close a component (e.g. MemoryMessageQueue) after use and free used resources.
MessageEnvelope Component that is used to add information to a message.

Introduction

This tutorial will help you understand how and when to use the MemoryMessageQueue component. First, it explains two interfaces, namely, IMessageQueue and IMessageReceiver, which have to be implemented by message queues and message receivers, respectively. Next, it describes the basics of the MemoryMessageQueue component and its prerequisites. Following this, it explains how to create a message receiver and an instance of the MemoryMessageQueue, add a listener to the queue, send and receive a message, and close the component. It ends by grouping the various pieces into a single program and wrapping up the concepts seen in this tutorial.

IMessageQueue and IMessageReceiver

The IMessageQueue interface defines the basic methods for a message queue and needs to be implemented by all components of this type. The MemoryMessageQueue class implements it via its parent class MessageQueue.

The IMessageReceiver interface defines the receiveMessage() method, and must be implemented by all message receiver components. In the example presented in the following sections, we define a class named MyMessageReciever, which implements this interface.

The diagrams below show the main relations between these interfaces and the components covered in this tutorial.

figure 1

MemoryMessageQueue

This component provides an in-memory implementation of a message queue that can be used to send and receive messages. It is usually used for testing purposes.

Pre-requisites

In order to use this component, we need to import it first. The following command shows how to do this:

import { MemoryMessageQueue } from "pip-services4-messaging-node";
Not available
import cqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"
Not available
from pip_services4_messaging.queues import MemoryMessageQueue
Not available

Message receiver

Following this, we create a message receiver that prints a customized message once a message is received.

import { IMessageQueue, IMessageReceiver, MessageEnvelope } from "pip-services4-messaging-node";

export class MyMessageReceiver implements IMessageReceiver {
    public async receiveMessage(envelope: MessageEnvelope, queue: IMessageQueue): Promise<void> {
        console.log("Received message: " + envelope.getMessageAsString());
    }
}
Not available
import (
	"context"
	"fmt"

	cqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"
)

type MyMessageReceiver struct {
}

func NewMyMessageReceiver() *MyMessageReceiver {
	c := &MyMessageReceiver{}
	return c
}

func (c *MyMessageReceiver) ReceiveMessage(ctx context.Context, envelope *cqueues.MessageEnvelope, queue cqueues.IMessageQueue) (err error) {
	fmt.Println("Received message: " + envelope.GetMessageAsString())
	return nil
}
Not available
from pip_services4_messaging.queues import IMessageReceiver

class MyMessageReceiver(IMessageReceiver):
    def receive_message(self, envelop, queue):
        print("Received message: " + envelop.get_message_as_string())
Not available

Creating a MemoryMessageQueue component

The next step is to create an instance of the MemoryMessageQueue class and open it. The following code shows how to do this:

let messageQueue = new MemoryMessageQueue();
await messageQueue.open(ctx)  
Not available
messageQueue := cqueues.NewMemoryMessageQueue("my_queue")
err := messageQueue.Open(context.Background())
Not available
from pip_services4_components.context import Context

context_data = {
  "traceId": "123",
}

my_context = Context(context_data)

messageQueue = MemoryMessageQueue()
messageQueue.open(my_context) 
Not available

Creating a listener

Now, we need to create a listener that waits for messages to arrive in the queue. In order for this process not to collide with the message sending, we create it in a separate thread. The following code does just that:

new Promise(() => messageQueue.listen(ctx, new MyMessageReceiver()));

Not available
go messageQueue.Listen(context.Background(), NewMyMessageReceiver())
Not available
from threading import Thread

Thread(target=messageQueue.listen, args=(my_context, MyMessageReceiver()), daemon=True).start()
Not available

Sending a message

After creating a listener, we send a message to the queue using a MessageEnvelope component. This component allows us to add extra information to the message, such as a message type. The following code explains how this can be done:

import { MessageEnvelope } from "pip-services4-messaging-node";

await messageQueue.send(ctx, new MessageEnvelope(null, "mymessage", "ABC"));
Not available
err := messageQueue.Send(context.Background(), cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
Not available
from threading import Thread

Thread(target=messageQueue.listen, args=(my_context, MyMessageReceiver()), daemon=True).start()
Not available

Receiving a message

After sending the message, the listener receives it and the message receiver prints the following text:

figure 1

Closing the MemoryMessageQueue

Once we’re done using the memory queue, we close it to free resources. The following code shows how to do this:

messageQueue.close(ctx) 
Not available
err = messageQueue.Close(context.Background())
Not available
messageQueue.close(my_context)  
Not available

Final code

Now, let’s put all of these concepts together into a single program. The code below shows what the end results looks like:

import { IMessageQueue, IMessageReceiver, MemoryMessageQueue, MessageEnvelope } from "pip-services4-messaging-node";

export async function main() {
    // Message queue
    let messageQueue = new MemoryMessageQueue();
    await messageQueue.open(ctx);

    // Listener
    new Promise(() => messageQueue.listen(ctx, new MyMessageReceiver()));
    
    // Send message
    await messageQueue.send(ctx, new MessageEnvelope(null, "mymessage", "ABC"));

    // Close message queue
    await messageQueue.close(ctx);
}

// Message receiver
export class MyMessageReceiver implements IMessageReceiver {
    public async receiveMessage(envelope: MessageEnvelope, queue: IMessageQueue): Promise<void> {
        console.log("Received message: " + envelope.getMessageAsString());
    }
}
Not available
import (
	"context"
	"fmt"
	"time"

	cqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"
)

func main() {
	// Message queue
	messageQueue := cqueues.NewMemoryMessageQueue("my_queue")
	err := messageQueue.Open(context.Background())
	if err != nil {
		panic(err)
	}

	// Listener
	go messageQueue.Listen(context.Background(), NewMyMessageReceiver())

	// Send message
	err = messageQueue.Send(context.Background(), cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
	if err != nil {
		panic(err)
	}

	<-time.After(100 * time.Millisecond)

	// Close message queue
	err = messageQueue.Close(context.Background())
	if err != nil {
		panic(err)
	}
}

// Message receiver
type MyMessageReceiver struct {
}

func NewMyMessageReceiver() *MyMessageReceiver {
	c := &MyMessageReceiver{}
	return c
}

func (c *MyMessageReceiver) ReceiveMessage(ctx context.Context, envelope *cqueues.MessageEnvelope, queue cqueues.IMessageQueue) (err error) {
	fmt.Println("Received message: " + envelope.GetMessageAsString())
	return nil
}
Not available
# Pre-requisites
import time
from threading import Thread
from pip_services4_messaging.queues import IMessageReceiver, MemoryMessageQueue, MessageEnvelope

from pip_services4_components.context import IContext

my_context = IContext()

# Message receiver
class MyMessageReceiver(IMessageReceiver):
    def receive_message(self, envelop, queue):
        print("Received message: " + envelop.get_message_as_string())

# Message queue
messageQueue = MemoryMessageQueue()
messageQueue.open(my_context)

# Listener
Thread(target=messageQueue.listen, args=(my_context, MyMessageReceiver()), daemon=True).start()

# Send message
messageQueue.send(my_context, MessageEnvelope(None, "mymessage", "ABC")) 
time.sleep(0.1)  # wait message

# Close message queue
messageQueue.close(my_context)

Not available

Wrapping up

In this tutorial, we learned how to use the MemoryMessageQueue, which, as the name suggests, works by storing messages in memory.

In order to understand the dynamics of this component, we first explored the IMessageQueueInterface and IMessageReceiver interfaces. Next, we created a message receiver, a memory message queue, and a listener for the queue. Then, we sent a message and saw that it was received by our queue. Finally, we combined all of these concepts together to create a simple program that works with a queue.