Implements: MessageQueue
Description
The MemoryMessageQueue class is used to create message queues that send and receive messages within the same process by using shared memory.
Important points
- This queue is typically used for testing to mock real queues.
Configuration parameters
- name: name of the message queue
References
- *:logger:*:*:1.0 - (optional) ILogger components to pass log messages
- *:counters:*:*:1.0 - (optional) ICounters components to pass collected measurements
Constructors
Creates a new instance of the message queue.
See also MessagingCapabilities
MemoryMessageQueue(name: str = None)
- name: str - (optional) a queue name.
Fields
Instance methods
abandon
Returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment, so that the attempt can be repeated later. Messages that cause unrecoverable errors should be removed permanently and/or sent to a dead letter queue.
abandon(message: MessageEnvelope)
- message: MessageEnvelope - message to return.
clear
Clears the component’s state.
clear(correlation_id: Optional[str])
- correlation_id: Optional[str] - (optional) transaction id used to trace execution through the call chain.
close
Closes the component and frees used resources.
close(correlation_id: Optional[str])
- correlation_id: Optional[str] - (optional) transaction id used to trace execution through the call chain.
complete
Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.
complete(message: MessageEnvelope)
- message: MessageEnvelope - message to remove.
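The difference between abandon and complete can be illustrated with a toy model. This is a minimal sketch, not the MemoryMessageQueue implementation: ToyQueue and its visible/locked containers are hypothetical names, and plain strings stand in for MessageEnvelope.

```python
from collections import deque
from typing import Deque, List, Optional


class ToyQueue:
    """Hypothetical stand-in used only to illustrate the semantics."""

    def __init__(self) -> None:
        self.visible: Deque[str] = deque()  # messages receivers can see
        self.locked: List[str] = []         # received, not yet settled

    def send(self, message: str) -> None:
        self.visible.append(message)

    def receive(self) -> Optional[str]:
        # Take the next message out of the visible queue and lock it.
        if not self.visible:
            return None
        message = self.visible.popleft()
        self.locked.append(message)
        return message

    def abandon(self, message: str) -> None:
        # Put the message back so it can be received again.
        if message in self.locked:
            self.locked.remove(message)
            self.visible.append(message)

    def complete(self, message: str) -> None:
        # Drop the message for good after successful processing.
        if message in self.locked:
            self.locked.remove(message)


toy = ToyQueue()
toy.send("order-1")
first = toy.receive()    # first attempt
toy.abandon(first)       # processing failed, retry later
second = toy.receive()   # the same message is delivered again
toy.complete(second)     # processed, remove permanently
remaining = len(toy.visible) + len(toy.locked)
```

After abandon the message is delivered a second time; after complete nothing is left in either container.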
configure
Configures the component by passing its configuration parameters.
configure(config: ConfigParams)
- config: ConfigParams - configuration parameters to be set.
end_listen
Ends listening for incoming messages. When this method is called, listen unblocks the thread and execution continues.
end_listen(correlation_id: Optional[str])
- correlation_id: Optional[str] - (optional) transaction id used to trace execution through the call chain.
is_open
Checks if the component is opened.
is_open(): bool
- returns: bool - True if the component is open and False otherwise.
listen
Listens for incoming messages and blocks the current thread until the queue is closed.
See also IMessageReceiver, receive
listen(correlation_id: Optional[str], receiver: IMessageReceiver)
- correlation_id: Optional[str] - (optional) transaction id used to trace execution through the call chain.
- receiver: IMessageReceiver - receiver used to receive incoming messages.
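The blocking contract between listen and end_listen can be sketched with the standard library alone. The ToyListener class, its polling interval and the plain callable used as a receiver are assumptions made for illustration; the real component takes an IMessageReceiver and may be implemented differently.

```python
import queue as stdlib_queue
import threading
from typing import Callable, List


class ToyListener:
    """Hypothetical stand-in showing the listen / end_listen contract."""

    def __init__(self) -> None:
        self._messages = stdlib_queue.Queue()
        self._stop = threading.Event()

    def send(self, message: str) -> None:
        self._messages.put(message)

    def listen(self, receiver: Callable[[str], None]) -> None:
        # Block the calling thread and hand each message to the receiver
        # until end_listen() sets the stop flag.
        while not self._stop.is_set():
            try:
                message = self._messages.get(timeout=0.05)
            except stdlib_queue.Empty:
                continue
            receiver(message)

    def end_listen(self) -> None:
        self._stop.set()


received: List[str] = []
handled = threading.Event()


def on_message(message: str) -> None:
    received.append(message)
    handled.set()


listener = ToyListener()
worker = threading.Thread(target=listener.listen, args=(on_message,))
worker.start()              # listen() blocks inside the worker thread
listener.send("hello")
handled.wait(timeout=2)     # wait until the receiver has seen the message
listener.end_listen()       # unblocks listen()
worker.join(timeout=2)
listener_stopped = not worker.is_alive()
```

Running listen in a separate thread keeps the caller free to send messages and to call end_listen when it is done.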
peek
Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns None.
peek(correlation_id: Optional[str]): MessageEnvelope
- correlation_id: Optional[str] - (optional) transaction id used to trace execution through the call chain.
- returns: MessageEnvelope - peeked message or None.
peek_batch
Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list.
peek_batch(correlation_id: Optional[str], message_count: int): List[MessageEnvelope]
- correlation_id: Optional[str] - (optional) transaction id used to trace execution through the call chain.
- message_count: int - maximum number of messages to peek.
- returns: List[MessageEnvelope] - list with peeked messages.
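The non-destructive behavior of peek and peek_batch can be sketched on a plain deque. The helper names toy_peek and toy_peek_batch are hypothetical, and strings stand in for MessageEnvelope.

```python
from collections import deque
from typing import Deque, List, Optional


def toy_peek(messages: Deque[str]) -> Optional[str]:
    # Look at the head of the queue without removing it.
    return messages[0] if messages else None


def toy_peek_batch(messages: Deque[str], message_count: int) -> List[str]:
    # Copy up to message_count messages from the head, leaving the queue intact.
    return list(messages)[:message_count]


inbox = deque(["a", "b", "c"])
head = toy_peek(inbox)
batch = toy_peek_batch(inbox, 2)
empty_head = toy_peek(deque())
empty_batch = toy_peek_batch(deque(), 5)
```

Both helpers leave inbox untouched, and an empty queue yields None for a single peek and an empty list for a batch.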
read_message_count
Reads the current number of messages in the queue to be delivered.
read_message_count(): int
- returns: int - number of messages in the queue.
receive
Receives an incoming message and removes it from the queue.
receive(correlation_id: Optional[str], wait_timeout: int): MessageEnvelope
- correlation_id: Optional[str] - (optional) transaction id used to trace execution through the call chain.
- wait_timeout: int - timeout in milliseconds to wait for a message to come.
- returns: MessageEnvelope - received message or None.
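One way to picture wait_timeout is a polling loop with a deadline. This is a sketch under stated assumptions: toy_receive and its polling interval are hypothetical, and the real queue may wait differently (for example by signaling instead of polling).

```python
import time
from collections import deque
from typing import Deque, Optional


def toy_receive(messages: Deque[str], wait_timeout: int) -> Optional[str]:
    # wait_timeout is in milliseconds, as in the signature above.
    deadline = time.monotonic() + wait_timeout / 1000.0
    while True:
        if messages:
            return messages.popleft()  # receiving removes the message
        if time.monotonic() >= deadline:
            return None                # nothing arrived in time
        time.sleep(0.005)


pending = deque(["ABC"])
first_received = toy_receive(pending, 0)    # a message is already waiting
second_received = toy_receive(pending, 50)  # queue is empty, gives up after ~50 ms
```

With a timeout of 0 the call returns immediately, either with a message that is already queued or with None.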
renew_lock
Renews a lock on a message, which keeps the message invisible to other receivers in the queue. This method is usually used to extend the message processing time.
renew_lock(message: MessageEnvelope, lock_timeout: int)
- message: MessageEnvelope - message whose lock is to be extended.
- lock_timeout: int - locking timeout in milliseconds.
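The effect of renewing a lock can be sketched with explicit timestamps. ToyLockTable, its message ids and the injected now_ms clock are all hypothetical; the sketch also assumes that a renewed lock expires lock_timeout milliseconds after the renewal, which this page does not state explicitly.

```python
from typing import Dict


class ToyLockTable:
    """Hypothetical stand-in that tracks lock expiry per message id."""

    def __init__(self) -> None:
        self._expires_at: Dict[str, int] = {}

    def lock(self, message_id: str, now_ms: int, lock_timeout: int) -> None:
        self._expires_at[message_id] = now_ms + lock_timeout

    def renew_lock(self, message_id: str, now_ms: int, lock_timeout: int) -> None:
        # Only a message that is currently tracked can have its lock extended.
        if message_id in self._expires_at:
            self._expires_at[message_id] = now_ms + lock_timeout

    def is_locked(self, message_id: str, now_ms: int) -> bool:
        expires_at = self._expires_at.get(message_id)
        return expires_at is not None and now_ms < expires_at


locks = ToyLockTable()
locks.lock("msg-1", now_ms=0, lock_timeout=1000)          # expires at 1000
locks.renew_lock("msg-1", now_ms=900, lock_timeout=1000)  # now expires at 1900
still_locked = locks.is_locked("msg-1", now_ms=1500)
expired = not locks.is_locked("msg-1", now_ms=2000)
```

Without the renewal at 900 ms the lock would have lapsed at 1000 ms; with it, the message stays hidden until 1900 ms.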
send
Sends a message into the queue.
send(correlation_id: Optional[str], envelope: MessageEnvelope)
- correlation_id: Optional[str] - (optional) transaction id used to trace execution through the call chain.
- envelope: MessageEnvelope - message envelope to be sent.
_open_with_params
Opens the component with given connection and credential parameters.
_open_with_params(correlation_id: Optional[str], connections: List[ConnectionParams], credentials: CredentialParams)
- correlation_id: Optional[str] - (optional) transaction id used to trace execution through the call chain.
- connections: List[ConnectionParams] - connection parameters
- credentials: CredentialParams - credential parameters
Examples
queue = MemoryMessageQueue("myqueue")
queue.send("123", MessageEnvelope(None, "mymessage", "ABC"))
message = queue.receive("123", 0)
if message is not None:
    # ...
    queue.complete(message)