Messaging basics
Key takeaways
IMessageReceiver | Interface that defines the main methods for a message receiver component. |
IMessageQueue | Interface that defines the basic methods for a message queue. |
MemoryMessageQueue | In-memory implementation of a message queue that can be used to send and receive messages. |
listen() | Method used to listen for incoming messages. |
send() | Method used to send a message into a queue. |
open() | Method used to open a component (e.g. MemoryMessageQueue) before use. |
close() | Method used to close a component (e.g. MemoryMessageQueue) after use and free used resources. |
MessageEnvelope | Component that is used to add information to a message. |
Introduction
This tutorial will help you understand how and when to use the MemoryMessageQueue component. First, it explains two interfaces, namely, IMessageQueue and IMessageReceiver, which have to be implemented by message queues and message receivers, respectively. Next, it describes the basics of the MemoryMessageQueue component and its prerequisites. Following this, it explains how to create a message receiver and an instance of the MemoryMessageQueue, add a listener to the queue, send and receive a message, and close the component. It ends by grouping the various pieces into a single program and wrapping up the concepts seen in this tutorial.
IMessageQueue and IMessageReceiver
The IMessageQueue interface defines the basic methods for a message queue and needs to be implemented by all components of this type. The MemoryMessageQueue class implements it via its parent class - MessageQueue.
The IMessageReceiver interface defines the receiveMessage() method, and must be implemented by all message receiver components. In the example presented in the following sections, we define a class named MyMessageReciever, which implements this interface.
The diagrams below show the main relations between these interfaces and the components covered in this tutorial.
MemoryMessageQueue
This component provides an in-memory implementation of a message queue that can be used to send and receive messages. It is usually used for testing purposes.
Pre-requisites
In order to use this component, we need to import it first. The following command shows how to do this:
import { MemoryMessageQueue } from "pip-services3-messaging-nodex";
using PipServices3.Messaging.Queues;
import (
cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
)
import 'package:pip_services3_messaging/pip_services3_messaging.dart';
from pip_services3_messaging.queues import MemoryMessageQueue
Message receiver
Following this, we create a message receiver that prints a customized message once a message is received.
import { IMessageQueue } from "pip-services3-messaging-nodex";
export class MyMessageReceiver implements IMessageReceiver {
public async receiveMessage(envelope: MessageEnvelope, queue: IMessageQueue): Promise<void> {
console.log("Received message: " + envelope.getMessageAsString());
}
}
using PipServices3.Messaging.Queues;
class MyMessageReceiver : IMessageReceiver
{
public async Task ReceiveMessageAsync(MessageEnvelope envelope, IMessageQueue queue)
{
Console.WriteLine("Received message: " + envelope.GetMessageAsString());
}
}
import (
"context"
"fmt"
cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
)
type MyMessageReceiver struct {
}
func NewMyMessageReceiver() *MyMessageReceiver {
c := &MyMessageReceiver{}
return c
}
func (c *MyMessageReceiver) ReceiveMessage(ctx context.Context, envelope *cqueues.MessageEnvelope, queue cqueues.IMessageQueue) (err error) {
fmt.Println("Received message: " + envelope.GetMessageAsString())
return nil
}
import 'package:pip_services3_messaging/pip_services3_messaging.dart';
class MyMessageReceiver implements IMessageReceiver {
@override
Future receiveMessage(MessageEnvelope envelope, IMessageQueue queue) async {
print('Received message: ' + envelope.getMessageAsString());
}
}
from pip_services3_messaging.queues import IMessageReceiver
class MyMessageReceiver(IMessageReceiver):
def receive_message(self, envelop, queue):
print("Received message: " + envelop.get_message_as_string())
Creating a MemoryMessageQueue component
The next step is to create an instance of the MemoryMessageQueue class and open it. The following code shows how to do this:
let messageQueue = new MemoryMessageQueue();
await messageQueue.open("123") // correlationId = "123"
var messageQueue = new MemoryMessageQueue();
await messageQueue.OpenAsync("123"); // correlationId = "123"
messageQueue := cqueues.NewMemoryMessageQueue("my_queue")
err := messageQueue.Open(context.Background(), "123")
var messageQueue = MemoryMessageQueue();
await messageQueue.open('123'); // correlationId = "123"
messageQueue = MemoryMessageQueue()
messageQueue.open("123") # correlationId = "123"
Creating a listener
Now, we need to create a listener that waits for messages to arrive in the queue. In order for this process not to collide with the message sending, we create it in a separate thread. The following code does just that:
new Promise(() => messageQueue.listen("123", new MyMessageReceiver()));
new Thread(() =>
{
Thread.CurrentThread.IsBackground = true;
messageQueue.ListenAsync("123", new MyMessageReceiver()).Wait();
}).Start();
go messageQueue.Listen(context.Background(), "123", NewMyMessageReceiver())
Future(() {
messageQueue.listen('123', MyMessageReceiver());
});
Thread(target=messageQueue.listen, args=("123", MyMessageReceiver()), daemon=True).start()
Sending a message
After creating a listener, we send a message to the queue. For this, we use a MessageEnvelope component, which allows us to add extra information to the message, such as a message type. The following code explains how this can be done:
import { MessageEnvelope } from "pip-services3-messaging-nodex";
await messageQueue.send("123", new MessageEnvelope(null, "mymessage", "ABC"));
using PipServices3.Messaging.Queues;
await messageQueue.SendAsync("123", new MessageEnvelope(null, "mymessage", "ABC"));
import (
"context"
"fmt"
cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
)
err := messageQueue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
import 'package:pip_services3_messaging/pip_services3_messaging.dart';
await messageQueue.send('123', MessageEnvelope(null, 'mymessage', 'ABC'));
from pip_services3_messaging.queues import MessageEnvelope
messageQueue.send("123", MessageEnvelope(None, "mymessage", "ABC"))
Receiving a message
After sending the message, the listener receives it and the message receiver prints the following text:
Closing the MemoryMessageQueue
Once we’re done using the memory queue, we close it to free resources. The following code shows how to do this:
messageQueue.close('123') // correlationId = "123"
await messageQueue.CloseAsync("123"); // correlationId = "123"
err = messageQueue.Close(context.Background(), "123")
await messageQueue.close('123'); // correlationId = "123"
messageQueue.close('123') # correlationId = "123"
Final code
Now, let’s put all of these concepts together into a single program. The code below shows what the end results looks like:
import { IMessageQueue, IMessageReceiver, MemoryMessageQueue, MessageEnvelope } from "pip-services3-messaging-nodex";
export async function main() {
// Message queue
let messageQueue = new MemoryMessageQueue();
await messageQueue.open("123");
// Listener
new Promise(() => messageQueue.listen("123", new MyMessageReceiver()));
// Send message
await messageQueue.send("123", new MessageEnvelope(null, "mymessage", "ABC"));
// Close message queue
await messageQueue.close('123');
}
// Message receiver
export class MyMessageReceiver implements IMessageReceiver {
public async receiveMessage(envelope: MessageEnvelope, queue: IMessageQueue): Promise<void> {
console.log("Received message: " + envelope.getMessageAsString());
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using PipServices3.Messaging.Queues;
namespace ExampleApp
{
class Program
{
static void Main(string[] args)
{
// Message queue
var messageQueue = new MemoryMessageQueue();
messageQueue.OpenAsync("123").Wait();
// Listener
new Thread(() =>
{
Thread.CurrentThread.IsBackground = true;
messageQueue.ListenAsync("123", new MyMessageReceiver()).Wait();
}).Start();
// Send message
messageQueue.SendAsync("123", new MessageEnvelope(null, "mymessage", "ABC")).Wait();
// Close message queue
messageQueue.CloseAsync("123").Wait();
}
}
// Message receiver
class MyMessageReceiver : IMessageReceiver
{
public async Task ReceiveMessageAsync(MessageEnvelope envelope, IMessageQueue queue)
{
Console.WriteLine("Received message: " + envelope.GetMessageAsString());
}
}
}
import (
"context"
"fmt"
"time"
cqueues "github.com/pip-services3-gox/pip-services3-messaging-gox/queues"
)
func main() {
// Message queue
messageQueue := cqueues.NewMemoryMessageQueue("my_queue")
err := messageQueue.Open(context.Background(), "123")
if err != nil {
panic(err)
}
// Listener
go messageQueue.Listen(context.Background(), "123", NewMyMessageReceiver())
// Send message
err = messageQueue.Send(context.Background(), "123", cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
if err != nil {
panic(err)
}
<-time.After(100 * time.Millisecond)
// Close message queue
err = messageQueue.Close(context.Background(), "123")
if err != nil {
panic(err)
}
}
// Message receiver
type MyMessageReceiver struct {
}
func NewMyMessageReceiver() *MyMessageReceiver {
c := &MyMessageReceiver{}
return c
}
func (c *MyMessageReceiver) ReceiveMessage(ctx context.Context, envelope *cqueues.MessageEnvelope, queue cqueues.IMessageQueue) (err error) {
fmt.Println("Received message: " + envelope.GetMessageAsString())
return nil
}
import 'package:pip_services3_messaging/pip_services3_messaging.dart';
void main(List<String> argument) async {
// Message queue
var messageQueue = MemoryMessageQueue();
await messageQueue.open('123');
// Listener
// ignore: unawaited_futures
Future(() {
messageQueue.listen('123', MyMessageReceiver());
});
// Send message
await messageQueue.send('123', MessageEnvelope(null, 'mymessage', 'ABC'));
// Close message queue
await messageQueue.close('123');
}
// Message receiver
class MyMessageReceiver implements IMessageReceiver {
@override
Future receiveMessage(MessageEnvelope envelope, IMessageQueue queue) async {
print('Received message: ' + envelope.getMessageAsString());
}
}
# Pre-requisites
import time
from threading import Thread
from pip_services3_messaging.queues import IMessageReceiver, MemoryMessageQueue, MessageEnvelope
# Message receiver
class MyMessageReceiver(IMessageReceiver):
def receive_message(self, envelop, queue):
print("Received message: " + envelop.get_message_as_string())
# Message queue
messageQueue = MemoryMessageQueue()
messageQueue.open("123")
# Listener
Thread(target=messageQueue.listen, args=("123", MyMessageReceiver()), daemon=True).start()
# Send message
messageQueue.send("123", MessageEnvelope(None, "mymessage", "ABC"))
time.sleep(0.1) # wait message
# Close message queue
messageQueue.close('123')
Wrapping up
In this tutorial, we learned how to use the MemoryMessageQueue, which, as the name suggests, works by storing messages in memory.
In order to understand the dynamics of this component, we first explored the IMessageQueueInterface and IMessageReceiver interfaces. Next, we created a message receiver, a memory message queue, and a listener for the queue. Then, we sent a message and saw that it was received by our queue. Finally, we combined all of these concepts together to create a simple program that works with a queue.