RabbitMQMessageQueue

Message queue that sends and receives messages via RabbitMQ message broker.

Inherits: MessageQueue

Description

The RabbitMQMessageQueue class allows you to create message queues that send and receive messages via an RabbitMQ message broker.

Configuration parameters

  • topic: name of RabbitMQ topic to subscribe
  • connection(s):
    • discovery_key: (optional) a key to retrieve the connection from IDiscovery
    • host: host name or IP address
    • port: port number
    • uri: resource URI or connection string with all parameters in it
  • credential(s):
    • store_key: (optional) a key to retrieve the credentials from ICredentialStore
    • username: user name
    • password: user password

References

  • *:logger:*:*:1.0 - (optional) ILogger components to pass log messages
  • *:counters:*:*:1.0 - (optional) ICounters components to pass collected measurements
  • *:discovery:*:*:1.0 - (optional) IDiscovery services
  • *:credential-store:*:*:1.0 - (optional) ICredentialStore to resolve credentials

Constructors

Creates a new instance of the message queue.

public constructor(name?: string, config?: ConfigParams)

  • name: string - (optional) queue’s name.
  • config: ConfigParams - (optional) configuration parameters.

Instance methods

abandon

Returnes message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and send to dead letter queue.

public abandon(message: MessageEnvelope): Promise<void>

clear

Clears a component’s state.

public clear(correlationId: string): Promise<void>

  • correlationId: string - (optional) transaction id used to trace execution through the call chain.

close

Closes a component and frees used resources.

public close(correlationId: string): Promise<void>

  • correlationId: string - (optional) transaction id used to trace execution through the call chain.

complete

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

  • Important: This method is not supported by RabbitMQ.

public complete(message: MessageEnvelope): Promise<void>

configure

Configures the component by passing its configuration parameters.

public configure(config: ConfigParams): void

  • config: ConfigParams - configuration parameters to be set.

endListen

Ends listening for incoming messages. When this method is call, listen unblocks the thread and execution continues.

public endListen(correlationId: string): void

  • correlationId: string - (optional) transaction id used to trace execution through the call chain.

isOpen

Checks if the component is open.

public isOpen(): boolean

  • returns: boolean - true if the component is open and false otherwise.

listen

Listens for incoming messages and blocks the current thread until the queue is closed.

See IMessageReceiver

public listen(correlationId: string, receiver: IMessageReceiver): void

  • correlationId: string - (optional) transaction id used to trace execution through the call chain.
  • receiver: IMessageReceiver - receiver used to receive incoming messages.

moveToDeadLetter

Permanently removes a message from the queue and sends it to dead letter queue.

  • Important: This method is not supported by RabbitMQ.

public moveToDeadLetter(message: MessageEnvelope): Promise<void>

peek

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns null.

  • Important: This method are not supported in this release!

public peek(correlationId: string): Promise<MessageEnvelope>

  • correlationId: string - (optional) transaction id used to trace execution through the call chain.
  • returns: Promise<MessageEnvelope> - peeked message.

peekBatch

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list.

public peekBatch(correlationId: string, messageCount: number): Promise<MessageEnvelope[]>

  • correlationId: string - (optional) transaction id used to trace execution through the call chain.
  • messageCount: number - maximum number of messages to peek.
  • returns: Promise<MessageEnvelope[]> - list with peeked messages.

readMessageCount

Reads the current number of messages in the queue to be delivered.

public readMessageCount(): Promise<number>

  • *returns: Promise<number> - number of messages in the queue.

receive

Receives an incoming message and removes it from the queue.

public receive(correlationId: string, waitTimeout: number): Promise<MessageEnvelope>

  • correlationId: string - (optional) transaction id used to trace execution through the call chain.
  • waitTimeout: number - timeout in milliseconds to wait for a message to come.
  • returns: Promise<MessageEnvelope> - received message or null if nothing was received.

renewLock

Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.

  • Important: This method is not supported by RabbitMQ.

public renewLock(message: MessageEnvelope, lockTimeout: number): Promise<void> {

  • message: MessageEnvelope - message to extend its lock.
  • lockTimeout: number - locking timeout in milliseconds.

send

Sends a message into the queue.

public send(correlationId: string, message: MessageEnvelope): Promise<void>

  • correlationId: string - (optional) transaction id used to trace execution through the call chain.
  • message: MessageEnvelope - message envelop to be sent.

Examples

queue := new RabbitMQMessageQueue("myqueue")
queue.configure(ConfigParams.fromTuples(
	"exchange", "my_exchange",
	"queue", "my_exchange",
	"options.auto_create", true,
	"connection.host", "5672",
	"connection.port", "localhost",
	"credential.username", "user",
	"credential.password", "password",
));
await queue.open("123");
await queue.send("123", new MessageEnvelope("", "mymessage", "ABC"));
message := await queue.receive("123", 10000);
if (message != null) {
	// ...
	await queue.complete(message);
}

See also