
Message queue that sends and receives messages via the Kafka message broker.

Inherits: CachedMessageQueue, IKafkaMessageListener


The KafkaMessageQueue class allows you to create message queues that send and receive messages via the Kafka message broker.

Configuration parameters

  • topic: name of the Kafka topic to subscribe to
  • group_id: (optional) consumer group id (default: default)
  • from_beginning: (optional) restarts receiving messages from the beginning (default: false)
  • read_partitions: (optional) number of partitions to be consumed concurrently (default: 1)
  • autocommit: (optional) turns on/off autocommit (default: true)
  • connection(s):
    • discovery_key: (optional) key to retrieve the connection from IDiscovery
    • host: host name or IP address
    • port: port number
    • uri: resource URI or connection string with all parameters in it
  • credential(s):
    • store_key: (optional) a key to retrieve the credentials from ICredentialStore
    • username: username
    • password: user’s password
  • options:
    • autosubscribe: (optional) true to automatically subscribe on open (default: false)
    • acks: (optional) controls the number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
    • autocommit_timeout: (optional) interval in milliseconds between offset autocommits (default: 1000)
    • connect_timeout: (optional) number of milliseconds to wait when connecting to the broker (default: 1000)
    • max_retries: (optional) maximum number of retry attempts (default: 5)
    • retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
    • request_timeout: (optional) number of milliseconds to wait on broker request (default: 30000)
    • flush_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
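
As an illustration, the parameters above could be supplied in code as follows. This is only a sketch: it assumes the `ConfigParams.FromTuples` helper and the two namespaces shown, and the topic, group, host, and credential values are placeholders rather than values taken from this page.

```csharp
using PipServices4.Components.Config;   // assumed namespace for ConfigParams
using PipServices4.Kafka.Queues;        // assumed namespace for KafkaMessageQueue

var queue = new KafkaMessageQueue("myqueue");

// Nested sections are addressed with dotted keys.
// All values below are placeholders for illustration.
queue.Configure(ConfigParams.FromTuples(
    "topic", "mytopic",
    "group_id", "mygroup",
    "from_beginning", false,
    "autocommit", true,
    "connection.host", "localhost",
    "connection.port", 9092,
    "credential.username", "user",
    "credential.password", "pass",
    "options.autosubscribe", true,
    "options.max_retries", 5
));
```

Parameters that are left out fall back to the defaults listed above.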


References

  • *:logger:*:*:1.0 - (optional) ILogger components to pass log messages
  • *:counters:*:*:1.0 - (optional) ICounters components to pass collected measurements
  • *:discovery:*:*:1.0 - (optional) IDiscovery services
  • *:credential-store:*:*:1.0 - (optional) ICredentialStore to resolve credentials
  • *:connection:kafka:*:1.0 - (optional) shared connection to Kafka service


Creates a new instance of the message queue.

public KafkaMessageQueue(string name = null)

  • name: string - (optional) queue name.



Autocommit option

private _autoCommit: bool


Kafka connection component.

private _connection: KafkaConnection


Dependency resolver.

private _dependencyResolver: DependencyResolver


From beginning (Subscribe option)

private _fromBeginning: bool


Group id

private _groupId: string = "default"



Number of partitions to be consumed concurrently

private _readPartitions: int


Option to subscribe

private _subscribed: bool



Topic name

private _topic: string

Instance methods


Returns a message to the queue and makes it available for all subscribers to receive again. This method is usually used to return a message that could not be processed at the moment, so that the attempt can be repeated. Messages that cause unrecoverable errors should be removed permanently and/or sent to the dead letter queue.

public override async Task AbandonAsync(MessageEnvelope message)
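
The choice between abandoning, completing, and dead-lettering a message can be sketched as a small decision function. This is a hypothetical illustration, not part of the library: `Disposition` and `DispositionPolicy` are invented names, and treating `TimeoutException` as the only recoverable failure is an assumption made for the example.

```csharp
using System;

// Hypothetical outcome of processing a single message.
public enum Disposition { Complete, Abandon, DeadLetter }

public static class DispositionPolicy
{
    // Maps the result of a processing attempt to a queue action:
    //   no error            -> Complete (remove the message)
    //   recoverable error   -> Abandon (return it so it can be retried)
    //   unrecoverable error -> DeadLetter (or remove permanently)
    // Which exceptions count as recoverable is an assumption here.
    public static Disposition Decide(Exception error)
    {
        if (error == null) return Disposition.Complete;
        if (error is TimeoutException) return Disposition.Abandon;
        return Disposition.DeadLetter;
    }
}
```

Because this page marks MoveToDeadLetterAsync as not supported by Kafka, a caller using this queue would likely map the DeadLetter outcome to permanent removal instead.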


Note: this method is not implemented

Clears a component’s state.


Closes a component and frees used resources.

public override Task CloseAsync(IContext context)

  • context: IContext - (optional) a context to trace execution through a call chain.


Permanently removes a message from the queue. This method is usually used to remove the message after successful processing.

public override Task CompleteAsync(MessageEnvelope message)


Configures a component by passing its configuration parameters.

public override void Configure(ConfigParams config)

  • config: ConfigParams - configuration parameters to be set.


Note: this method is not implemented

Ends listening for incoming messages. When this method is called, Listen unblocks the thread and execution continues.


Returns the content of a message and information about it.

private Message<byte[], byte[]> FromMessage(MessageEnvelope message)

  • message: MessageEnvelope - message
  • returns: Message<byte[], byte[]> - content of the message and information about it.


Returns the topic.

private string GetTopic()

  • returns: string - topic


Checks if the component is open.

public override bool IsOpen()

  • returns: bool - true if the component is open and false otherwise.


Note: this method is not implemented

Listens for incoming messages and blocks the current thread until the queue is closed.

See IMessageReceiver


Permanently removes a message from the queue and sends it to the dead letter queue.

  • Important: This method is not supported by Kafka.

public override Task MoveToDeadLetterAsync(MessageEnvelope message)


Deserializes a message, then sends it to the receiver if one is set; otherwise, puts it into the queue.

public void OnMessage(KafkaMessage msg)
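
The dispatch rule described for OnMessage can be sketched in isolation. This is a minimal, self-contained illustration and not the library's implementation: `SketchQueue`, `Envelope`, `IReceiver`, and `CollectingReceiver` are hypothetical stand-ins, and deserialization is reduced to UTF-8 decoding.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for MessageEnvelope.
public sealed class Envelope
{
    public string Text { get; }
    public Envelope(string text) { Text = text; }
}

// Hypothetical stand-in for IMessageReceiver.
public interface IReceiver
{
    void Receive(Envelope envelope);
}

// Simple receiver that records the text of every message it is given.
public sealed class CollectingReceiver : IReceiver
{
    public List<string> Received { get; } = new List<string>();
    public void Receive(Envelope envelope) => Received.Add(envelope.Text);
}

// Hypothetical stand-in for the queue's incoming-message handling.
public sealed class SketchQueue
{
    private readonly Queue<Envelope> _cache = new Queue<Envelope>();

    // Set while a listener is attached; null otherwise.
    public IReceiver Receiver { get; set; }

    // Number of messages waiting in the local cache.
    public int CachedCount => _cache.Count;

    // Deserializes the payload, then hands the message to the receiver
    // if one is set; otherwise keeps it in the local cache.
    public void OnMessage(byte[] payload)
    {
        var envelope = new Envelope(Encoding.UTF8.GetString(payload));
        if (Receiver != null)
            Receiver.Receive(envelope);
        else
            _cache.Enqueue(envelope);
    }
}
```

In this sketch, a message that arrives before a receiver is attached stays in the cache, while later messages go straight to the receiver.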


Opens the component.

public override Task OpenAsync(IContext context)

  • context: IContext - (optional) a context to trace execution through a call chain.


Note: this method is not implemented

Peeks a single incoming message from the queue without removing it. If there are no messages available in the queue, it returns null.


Note: this method is not implemented

Peeks multiple incoming messages from the queue without removing them. If there are no messages available in the queue, it returns an empty list.


Note: this method is not implemented

Reads the current number of messages in the queue to be delivered.


Note: this method is not implemented

Receives an incoming message and removes it from the queue.


Renews a lock on a message that makes it invisible from other receivers in the queue. This method is usually used to extend the message processing time.

  • Important: This method is not supported by Kafka.

public override Task RenewLockAsync(MessageEnvelope message, long lockTimeout)

  • message: MessageEnvelope - message to extend its lock.
  • lockTimeout: long - locking timeout in milliseconds.


Sends a message into the queue.

public override async Task SendAsync(IContext context, MessageEnvelope message)

  • context: IContext - (optional) a context to trace execution through a call chain.
  • message: MessageEnvelope - message envelope to be sent.


Sets references to dependent components.

public override void SetReferences(IReferences references)

  • references: IReferences - references to locate the component’s dependencies.


Subscribes to a topic.

protected override Task SubscribeAsync(IContext context)

  • context: IContext - (optional) a context to trace execution through a call chain.


Creates a new MessageEnvelope.

private MessageEnvelope ToMessage(KafkaMessage msg)


Note: this method is not implemented

Unsets (clears) previously set references to dependent components.


var queue = new KafkaMessageQueue("myqueue");
// Pass the settings as key/value tuples; nested sections use dotted keys.
queue.Configure(ConfigParams.FromTuples(
  "topic", "mytopic",
  "connection.protocol", "tcp",
  "connection.host", "localhost",
  "connection.port", 9092
));

await queue.OpenAsync("123");
await queue.SendAsync("123", new MessageEnvelope("", "mymessage", "ABC"));
var message = await queue.ReceiveAsync("123");

if (message != null) {
    // Remove the message after successful processing.
    await queue.CompleteAsync(message);
}

See also