KafkaMessageQueue

Message queue that sends and receives messages via the Kafka message broker.

Implements: MessageQueue, IReferenceable, IUnreferenceable, IConfigurable, IOpenable, ICleanable

Description

The KafkaMessageQueue class allows you to create message queues that send and receive messages via the Kafka message broker.

Configuration parameters

  • topic: name of Kafka topic to subscribe
  • group_id: (optional) consumer group id (default: default)
  • from_beginning: (optional) restarts receiving messages from the beginning (default: false)
  • read_partitions: (optional) number of partitions to be consumed concurrently (default: 1)
  • autocommit: (optional) turns on/off autocommit (default: true)
  • connection(s):
    • discovery_key: (optional) key to retrieve the connection from IDiscovery
    • host: host name or IP address
    • port: port number
    • uri: resource URI or connection string with all parameters in it
  • credential(s):
    • store_key: (optional) key to retrieve the credentials from ICredentialStore
    • username: username
    • password: user’s password
  • options:
    • autosubscribe: (optional) true to automatically subscribe on option (default: false)
    • acks: (optional) control the number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
    • log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
    • connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
    • max_retries: (optional) maximum retry attempts (default: 5)
    • retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
    • request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)

References

  • *:logger:*:*:1.0 - (optional) ILogger components to pass log messages
  • *:counters:*:*:1.0 - (optional) ICounters components to pass collected measurements
  • *:discovery:*:*:1.0 - (optional) IDiscovery services
  • *:credential-store:*:*:1.0 - (optional) ICredentialStore to resolve credentials
  • *:connection:kafka:*:1.0 - (optional) shared connection to Kafka controller

Constructors

Creates a new instance of the message queue.

KafkaMessageQueue(name: str = None)

  • name: str - (optional) queue name.

Fields

_acks

Acknowledgements

_acks: int = -1

_auto_commit

Autocommit option

_auto_commit: bool = true

_auto_subscribe

Autosubscribe option

_auto_subscribe: bool

_connection

Kafka connection component.

_connection: KafkaConnection

_dependency_resolver

Dependency resolver.

_dependency_resolver: DependencyResolver

_group_id

Group id

_group_id: str

_logger

Logger

_logger: CompositeLogger

_messages

Messages

_messages: MessageEnvelope[]

_read_rartitions

Partition

_read_rartitions: int = 1

_receiver

Message receiver

_receiver: IMessageReceiver

_subscribed

Option to subscribe

_subscribed: bool

_topic

Topic

_topic: str

Instance methods

abandon

Returns a message into the queue and makes it available for all subscribers to receive it again. This method is usually used to return a message which could not be processed at the moment to repeat the attempt. Messages that cause unrecoverable errors shall be removed permanently or/and sent to dead letter queue.

abandon(message: MessageEnvelope)

clear

Clears a component’s state.

clear(context: Optional[IContext])

  • context: IContext - (optional) a context to trace execution through a call chain.

close

Closes a component and frees used resources.

close(context: Optional[IContext])

  • context: IContext - (optional) a context to trace execution through a call chain.

complete

Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

complete(message: MessageEnvelope)

configure

Configures a component by passing its configuration parameters.

configure(config: ConfigParams)

  • config:: ConfigParams - configuration parameters to be set.

endListen

Ends listening for incoming messages. When this method is called, listen unblocks the thread and execution continues.

endListen(context: Optional[IContext])

  • context: IContext - (optional) a context to trace execution through a call chain.

_from_message

Returns the content of a message and information about it.

_from_message(message: MessageEnvelope): Optional[dict]

  • message: MessageEnvelope - message
  • returns: Optional[dict] - content of the message and information about it.

_get_topic

Returns the topic.

_get_topic(): str

  • returns: str - topic

is_open

Checks if the component is open.

is_open(): bool

  • returns: bool - true if the component is open and false otherwise.

listen

Listens for incoming messages and blocks the current thread until the queue is closed.

See IMessageReceiver

listen(context: Optional[IContext], receiver: IMessageReceiver)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • receiver: IMessageReceiver - receiver used to receive incoming messages.

move_to_dead_letter

Permanently removes a message from the queue and sends it to dead letter queue.

  • Important: This method is not supported by Kafka.

move_to_dead_letter(message: MessageEnvelope)

on_message

Deserializes a message. Then, sends it to a receiver if its set or puts it into the queue.

on_message(topic: str, partition: int, msg: Message)

  • topic: str - topic
  • partition: int - partition number
  • msg: Message - message

open

Opens the component.

open(context: Optional[IContext])

  • context: IContext - (optional) a context to trace execution through a call chain.

peek

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns null.

peek(context: Optional[IContext]): MessageEnvelope

  • context: Optional[IContext] - (optional) transaction id used to trace execution through the call chain.
  • returns: MessageEnvelope - peeked message.

peek_batch

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list.

peek_batch(context: Optional[IContext], messageCount: number): List[MessageEnvelope]

  • context: IContext - (optional) a context to trace execution through a call chain.
  • messageCount: number - maximum number of messages to peek.
  • returns: List[MessageEnvelope] - list with peeked messages.

read_message_count

Reads the current number of messages in the queue to be delivered.

read_message_count(): int

  • *returns: int - number of messages in the queue.

receive

Receives an incoming message and removes it from the queue.

receive(context: Optional[IContext], waitTimeout: number): MessageEnvelope

  • context: IContext - (optional) a context to trace execution through a call chainn.
  • waitTimeout: number - timeout in milliseconds to wait for a message to come.
  • returns: MessageEnvelope - received message or null if nothing was received.

renew_lock

Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.

  • Important: This method is not supported by Kafka.

renew_lock(message: MessageEnvelope, lock_timeout: int)

  • message: MessageEnvelope - message to extend its lock.
  • lock_timeout: int - locking timeout in milliseconds.

send

Sends a message into the queue.

send(context: Optional[IContext], message: MessageEnvelope)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • message: MessageEnvelope - message envelop to be sent.

set_references

Sets references to dependent components.

set_references(references: IReferences)

  • references: IReferences - references to locate the component dependencies.

_subscribe

Subscribes to a topic.

_subscribe(context: Optional[IContext])

  • context: IContext - (optional) a context to trace execution through a call chain.

_to_message

Creates a new MessageEnvelope.

_to_message(msg: Message): Optional[MessageEnvelope]

unset_references

Unsets (clears) previously set references to dependent components.

unset_references()

Examples

queue = KafkaMessageQueue("myqueue")
queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic",
    'connection.protocol', 'tcp',
    "connection.host", "localhost",
    "connection.port", 9092,
))
        
queue.open("123")
queue.send("123", MessageEnvelope(None, "mymessage", "ABC"))
message = queue.receive("123", 10000)
        
print(message)
        
if message is not None:
    ...
    queue.complete(message)

See also