MessageQueue

Abstract message queue that is used as a basis for specific message queue implementations.

Implements: IMessageQueue, IConfigurable, IReferenceable

Description

The MessageQueue class is an abstract base for specific message queue implementations. It holds the parts they share: configuration, logging, performance counters, and connection and credential resolution.

Configuration parameters

  • name: name of the message queue

connection(s):

  • discovery_key: key to retrieve parameters from discovery service
  • protocol: connection protocol like http, https, tcp, udp
  • host: host name or IP address
  • port: port number
  • uri: resource URI or connection string with all parameters in it

credential(s):

  • store_key: key to retrieve parameters from credential store
  • username: username
  • password: user’s password
  • access_id: application access id
  • access_key: application secret key
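The parameters above are grouped into sections. As a minimal sketch, assuming nested parameters are addressed with dotted keys such as connection.host (the usual Pip.Services convention), the grouping can be illustrated with a plain map. The sectionOf helper and all values are hypothetical and are not part of the library, which uses ConfigParams for this purpose.

```dart
// Hypothetical flat configuration using the parameter names listed above.
// The values are placeholders, not defaults of any real queue.
const Map<String, String> queueConfig = {
  'name': 'orders',
  'connection.protocol': 'tcp',
  'connection.host': 'localhost',
  'connection.port': '5672',
  'credential.username': 'guest',
  'credential.password': 'guest',
};

/// Returns the entries under [section] with the "section." prefix removed.
/// Illustrative helper only; not a library function.
Map<String, String> sectionOf(Map<String, String> config, String section) {
  final prefix = '$section.';
  return {
    for (final entry in config.entries)
      if (entry.key.startsWith(prefix))
        entry.key.substring(prefix.length): entry.value,
  };
}
```

With this layout, sectionOf(queueConfig, 'connection') yields the protocol, host and port entries, and the credential section can be extracted the same way.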

References

  • *:logger:*:*:1.0 - (optional) ILogger components to pass log messages
  • *:counters:*:*:1.0 - (optional) ICounters components to pass collected measurements
  • *:discovery:*:*:1.0 - (optional) IDiscovery components to discover connection(s)
  • *:credential-store:*:*:1.0 - (optional) ICredentialStore components to lookup credential(s)
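Each locator above has five colon-separated fields (group, type, kind, name, version), where * accepts any value. The following is a simplified sketch of that matching rule, not the library's own descriptor logic. The function name matchesLocator and the sample descriptors in the usage note are illustrative assumptions.

```dart
/// Returns true when [descriptor] satisfies [pattern].
/// Both are five colon-separated fields; '*' in the pattern matches any value.
/// Simplified illustration: wildcards are only honoured on the pattern side.
bool matchesLocator(String pattern, String descriptor) {
  final want = pattern.split(':');
  final have = descriptor.split(':');
  if (want.length != 5 || have.length != 5) return false;
  for (var i = 0; i < 5; i++) {
    if (want[i] != '*' && want[i] != have[i]) return false;
  }
  return true;
}
```

For example, the pattern *:logger:*:*:1.0 would accept a component registered as pip-services:logger:console:default:1.0 and reject one registered as pip-services:counters:log:default:1.0.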

Constructors

Creates a new instance of the message queue.

MessageQueue([String? name, MessagingCapabilities? capabilities])

  • name: String? - (optional) queue name
  • capabilities: MessagingCapabilities? - (optional) capabilities of this message queue

Fields

logger

Component used to pass log messages.

logger: CompositeLogger

counters

Component to pass collected measurements.

counters: CompositeCounters

connectionResolver

Component used to resolve connections.

connectionResolver: ConnectionResolver

credentialResolver

Component used to resolve credentials.

credentialResolver: CredentialResolver

name

Name of the message queue.

name: String

capabilities

Capabilities supported by the message queue.

capabilities: MessagingCapabilities

Instance methods

abandon

Returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment, so that processing can be retried later. Messages that cause unrecoverable errors should be removed permanently and/or sent to a dead letter queue.

@override

Future abandon(MessageEnvelope message)

  • message: MessageEnvelope - message to return to the queue.

clear

Clears a component’s state.

Future clear(IContext? context)

  • context: IContext - (optional) a context to trace execution through a call chain.

close

Closes a component and frees the used resources.

@override

Future close(IContext? context)

  • context: IContext - (optional) a context to trace execution through a call chain.

complete

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

@override

Future complete(MessageEnvelope message)

  • message: MessageEnvelope - message to remove.

endListen

Ends listening for incoming messages. Calling this method unblocks a pending listen call so that execution continues.

@override

void endListen(IContext? context)

  • context: IContext - (optional) a context to trace execution through a call chain.

isOpen

Checks if the component is opened.

@override

bool isOpen()

  • returns: bool - true if the component has been opened and false otherwise.

listen

Listens for incoming messages and blocks the current thread until the queue is closed.
See also IMessageReceiver, receive

@override

void listen(IContext? context, IMessageReceiver receiver)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • receiver: IMessageReceiver - receiver used to receive incoming messages.

moveToDeadLetter

Permanently removes a message from the queue and sends it to a dead letter queue.

@override

Future moveToDeadLetter(MessageEnvelope message)

  • message: MessageEnvelope - message to move to the dead letter queue.

peek

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns null.

@override

Future<MessageEnvelope?> peek(IContext? context)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • returns: Future<MessageEnvelope?> - peeked message or null.

peekBatch

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list.

@override

Future<List<MessageEnvelope?>> peekBatch(IContext? context, int messageCount)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • messageCount: int - maximum number of messages to peek.
  • returns: Future<List<MessageEnvelope?>> - list of peeked messages.
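The difference between peek and peekBatch on an empty queue (null versus an empty list) can be sketched with a toy in-memory queue. PeekableQueue is a hypothetical class over plain strings, written synchronously for brevity. It is not the library's implementation, where both methods return a Future and work with MessageEnvelope.

```dart
/// Toy queue over strings; a stand-in for a queue of MessageEnvelope objects.
class PeekableQueue {
  final List<String> _messages = [];

  void send(String message) => _messages.add(message);

  /// Next message without removing it, or null when the queue is empty.
  String? peek() => _messages.isEmpty ? null : _messages.first;

  /// Up to [messageCount] messages without removing them; may be empty.
  List<String> peekBatch(int messageCount) =>
      _messages.take(messageCount).toList();

  /// Number of messages still waiting in the queue.
  int get messageCount => _messages.length;
}
```

Neither call changes messageCount, which is what distinguishes peeking from receiving.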

readMessageCount

Reads the current number of messages in the queue to be delivered.

@override

Future<int> readMessageCount()

  • returns: Future<int> - number of messages in the queue.

receive

Receives an incoming message and removes it from the queue.

@override

Future<MessageEnvelope?> receive(IContext? context, int waitTimeout)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • waitTimeout: int - timeout in milliseconds to wait for a message to come.
  • returns: Future<MessageEnvelope?> - received message or null.
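A typical consumer receives a message, then either completes it on success or abandons it on a recoverable failure. The sketch below illustrates that lifecycle with a toy in-memory queue. Envelope, SketchQueue and processOne are hypothetical names. The sketch is synchronous and ignores waitTimeout, whereas the documented methods return a Future.

```dart
/// Hypothetical stand-in for MessageEnvelope; carries only an id and a payload.
class Envelope {
  Envelope(this.id, this.payload);
  final String id;
  final String payload;
}

/// Toy in-memory queue illustrating receive / complete / abandon semantics.
class SketchQueue {
  final List<Envelope> _pending = [];
  final Map<String, Envelope> _locked = {};

  void send(Envelope message) => _pending.add(message);

  /// Takes the next message off the queue and holds it until it is
  /// completed or abandoned. Returns null when nothing is pending.
  Envelope? receive() {
    if (_pending.isEmpty) return null;
    final message = _pending.removeAt(0);
    _locked[message.id] = message;
    return message;
  }

  /// Processing succeeded: forget the message for good.
  void complete(Envelope message) => _locked.remove(message.id);

  /// Processing failed for now: make the message receivable again.
  void abandon(Envelope message) {
    final held = _locked.remove(message.id);
    if (held != null) _pending.add(held);
  }

  int get pendingCount => _pending.length;
}

/// Processes one message, abandoning it when [handler] throws.
/// Returns false when the queue had nothing to process.
bool processOne(SketchQueue queue, void Function(Envelope) handler) {
  final message = queue.receive();
  if (message == null) return false;
  try {
    handler(message);
    queue.complete(message);
  } catch (_) {
    queue.abandon(message);
  }
  return true;
}
```

In this sketch a handler that throws leaves the message pending for another attempt, while a handler that returns normally removes it permanently.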

renewLock

Renews the lock on a message, which keeps it invisible to other receivers of the queue. This method is usually used to extend the message processing time.

@override

Future renewLock(MessageEnvelope message, int lockTimeout)

  • message: MessageEnvelope - message to extend its lock.
  • lockTimeout: int - locking timeout in milliseconds.
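One way to picture lock renewal is as a per-message deadline that is replaced on every call. The sketch below assumes that model. LockTable is a hypothetical class, and the explicit now argument exists only to keep the example deterministic. Actual queue implementations may track locks differently.

```dart
/// Tracks when each locked message becomes visible to other receivers again.
class LockTable {
  final Map<String, DateTime> _visibleAt = {};

  /// Locks [messageId] for [lockTimeout] milliseconds starting at [now].
  /// Calling it again for the same id replaces the deadline, which is the
  /// effect a lock renewal has in this model.
  void renewLock(String messageId, int lockTimeout, DateTime now) {
    _visibleAt[messageId] = now.add(Duration(milliseconds: lockTimeout));
  }

  /// True while the message is still hidden from other receivers.
  bool isLocked(String messageId, DateTime now) {
    final deadline = _visibleAt[messageId];
    return deadline != null && now.isBefore(deadline);
  }
}
```

A receiver that needs more processing time would call renewLock before the current deadline passes, so the message does not become visible to other receivers in the meantime.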

beginListen

Listens for incoming messages without blocking the current thread.
See also listen, IMessageReceiver

@override

void beginListen(IContext? context, IMessageReceiver receiver)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • receiver: IMessageReceiver - receiver used to receive incoming messages.
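The relationship between listen, beginListen and endListen can be sketched as one loop started in two ways. Everything here is hypothetical: SketchListener, the Receiver typedef and the 10 ms polling delay are illustration only. The sketch's listen returns a Future so the difference is observable, whereas the documented signature returns void.

```dart
import 'dart:async';

/// Hypothetical receiver callback, standing in for IMessageReceiver.
typedef Receiver = Future<void> Function(String message);

class SketchListener {
  final List<String> _pending = [];
  bool _listening = false;

  void send(String message) => _pending.add(message);

  /// Completes only after endListen() is called, so a caller that awaits it
  /// is suspended for as long as listening goes on.
  Future<void> listen(Receiver receiver) async {
    _listening = true;
    while (_listening) {
      if (_pending.isNotEmpty) {
        await receiver(_pending.removeAt(0));
      } else {
        // Yield to the event loop while idle.
        await Future<void>.delayed(const Duration(milliseconds: 10));
      }
    }
  }

  /// Starts the same loop but returns immediately.
  void beginListen(Receiver receiver) {
    unawaited(listen(receiver));
  }

  /// Makes the loop in listen() exit on its next iteration.
  void endListen() => _listening = false;
}

/// Sends two messages to a background listener and returns what it received.
Future<List<String>> demo() async {
  final queue = SketchListener();
  final seen = <String>[];
  queue.beginListen((m) async => seen.add(m));
  queue.send('a');
  queue.send('b');
  await Future<void>.delayed(const Duration(milliseconds: 50));
  queue.endListen();
  return seen;
}
```

In demo, beginListen returns at once, so the caller can keep sending messages while the receiver handles them in the background until endListen stops the loop.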

configure

Configures the component by passing its configuration parameters.

@override

void configure(ConfigParams config)

  • config: ConfigParams - configuration parameters to be set.

getCapabilities

Gets the queue’s capabilities.

@override

MessagingCapabilities getCapabilities()

  • returns: MessagingCapabilities - queue’s capabilities.

getName

Gets the queue’s name.

@override

String getName()

  • returns: String - queue’s name.

open

Opens the component.

@override

Future open(IContext? context)

  • context: IContext - (optional) a context to trace execution through a call chain.

sendAsObject

Sends an object into the queue. Before sending it, the object is converted into a JSON string and wrapped in a MessageEnvelope.

@override

Future sendAsObject(IContext? context, String messageType, message)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • messageType: String - message type
  • message: dynamic - object value to be sent
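The conversion described above (object to JSON string, then wrapped in an envelope) can be sketched with dart:convert. Envelope and wrapAsObject are hypothetical stand-ins for illustration. The real MessageEnvelope carries more fields, and this sketch does not send anything.

```dart
import 'dart:convert';

/// Hypothetical stand-in for MessageEnvelope with only the fields used here.
class Envelope {
  Envelope(this.messageType, this.body);
  final String messageType;
  final String body; // JSON text of the original object
}

/// Converts [message] to a JSON string and wraps it in an envelope.
/// [message] must be JSON-encodable (maps, lists, numbers, strings, ...).
Envelope wrapAsObject(String messageType, dynamic message) {
  return Envelope(messageType, jsonEncode(message));
}
```

A receiver would reverse the step with jsonDecode on the envelope body to get the original structure back.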

setReferences

Sets references to dependent components.

@override

void setReferences(IReferences references)

  • references: IReferences - references to locate the component’s dependencies.

toString

Gets a string representation of the object.

@override

String toString()

  • returns: String - string representation of the object.

openWithParams

Opens the component with the given connection and credential parameters.

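Future openWithParams(IContext? context, ConnectionParams connection, CredentialParams? credential)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • connection: ConnectionParams - connection parameters.
  • credential: CredentialParams - (optional) credential parameters.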