Inherits: IMessageQueueConnection, IReferenceable, IConfigurable, IOpenable
Description
The KafkaConnection class allows you to create connections to Kafka using the default driver.
Important points
- By defining a connection and sharing it across multiple message queues, you can reduce the number of Kafka connections in use.
Configuration parameters
- client_id: (optional) client identifier
- connection(s):
- discovery_key: (optional) a key to retrieve the connection from IDiscovery
- host: host name or IP address
- port: port number
- uri: resource URI or connection string with all parameters in it
- credential(s):
- store_key: (optional) a key to retrieve the credentials from ICredentialStore
- username: username
- password: user’s password
- options:
- log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
- acks: (optional) control the number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
- connect_timeout: (optional) number of milliseconds to wait while connecting to the broker (default: 1000)
- max_retries: (optional) maximum retry attempts (default: 5)
- retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
- request_timeout: (optional) number of milliseconds to wait on broker request (default: 30000)
- flush_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
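The parameters above can be supplied in code as a flat set of key/value pairs. The following is a minimal sketch, assuming the `ConfigParams.FromTuples` helper from PipServices3.Commons and assuming that `KafkaConnection` lives in the `PipServices3.Kafka.Connect` namespace. The class name, client id, host, and port are placeholders chosen for illustration.

```csharp
using PipServices3.Commons.Config;
using PipServices3.Kafka.Connect; // assumed namespace of KafkaConnection

public static class KafkaConnectionConfigExample
{
    // Builds a configuration from the parameter names listed above.
    // All values are placeholders, not recommended settings.
    public static ConfigParams BuildConfig()
    {
        return ConfigParams.FromTuples(
            "client_id", "sample-client",
            "connection.host", "localhost",
            "connection.port", 9092,
            "options.acks", -1,
            "options.connect_timeout", 1000,
            "options.request_timeout", 30000
        );
    }

    // Creates a connection and applies the configuration.
    // The connection is not opened here.
    public static KafkaConnection CreateConnection()
    {
        var connection = new KafkaConnection();
        connection.Configure(BuildConfig());
        return connection;
    }
}
```

Keeping the configuration in one place makes it easier to share a single connection between several message queues.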
References
- *:logger:*:*:1.0 - (optional) ILogger components to pass log messages
- *:discovery:*:*:1.0 - (optional) IDiscovery services
- *:credential-store:*:*:1.0 - (optional) credential stores to resolve credentials
Constructors
Creates a new instance of the connection component.
public
constructor()
Fields
Instance methods
CheckOpen
Checks if the connection is open.
Raises an error if the connection is closed.
private
void CheckOpen()
CloseAsync
Closes the component and frees used resources.
public
Task CloseAsync(string correlationId)
- correlationId: string - (optional) transaction id used to trace execution through the call chain.
Commit
Commits a message offset.
public
void Commit(string topic, string groupId, int partition, long offset, IKafkaMessageListener listener)
- topic: string - topic name
- groupId: string - (optional) consumer group id
- partition: int - partition number
- offset: long - message offset
- listener: IKafkaMessageListener - message listener
Configure
Configures the component by passing its configuration parameters.
public
void Configure(ConfigParams config)
- config: ConfigParams - configuration parameters to be set.
ConnectToAdmin
Connects an admin client on demand.
Note: this method is not implemented.
CreateQueueAsync
Creates a message queue. If the connection doesn’t support this function, it exits without error.
public
Task CreateQueueAsync(string name)
- name: string - name of the queue to be created.
DeleteQueueAsync
Deletes a message queue. If the connection doesn’t support this function, it exits without error.
public
Task DeleteQueueAsync(string name)
- name: string - name of the queue to be deleted.
GetConnection
Gets the connection.
public
IProducer<byte[], byte[]> GetConnection()
- returns: IProducer<byte[], byte[]> - connection to Kafka.
IsOpen
Checks if the component is open.
public
bool IsOpen()
- returns: bool - true if the component is open and false otherwise.
OpenAsync
Opens the component.
public
Task OpenAsync(string correlationId)
- correlationId: string - (optional) transaction id used to trace execution through the call chain.
PublishAsync
Publishes a message to the specified topic.
public
Task PublishAsync(string topic, Message<byte[], byte[]> message)
- topic: string - topic where the message will be published.
- message: Message<byte[], byte[]> - message to be published.
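A minimal sketch of publishing through the connection, using the `OpenAsync`, `PublishAsync`, and `CloseAsync` signatures documented on this page. It assumes that `Message<byte[], byte[]>` is the Confluent.Kafka type and that `KafkaConnection` lives in the `PipServices3.Kafka.Connect` namespace. The class name, helper names, key, payload, and correlation id are hypothetical.

```csharp
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using PipServices3.Kafka.Connect; // assumed namespace of KafkaConnection

public static class KafkaPublishExample
{
    // Encodes a key and a text payload as UTF-8 bytes,
    // because the connection works with raw byte[] keys and values.
    public static Message<byte[], byte[]> BuildMessage(string key, string text)
    {
        return new Message<byte[], byte[]>
        {
            Key = Encoding.UTF8.GetBytes(key),
            Value = Encoding.UTF8.GetBytes(text)
        };
    }

    // Opens an already configured connection, publishes one message,
    // and closes the connection even if publishing fails.
    public static async Task PublishOnceAsync(KafkaConnection connection, string topic)
    {
        const string correlationId = "example-correlation-id"; // placeholder
        await connection.OpenAsync(correlationId);
        try
        {
            await connection.PublishAsync(topic, BuildMessage("key-1", "Hello, Kafka"));
        }
        finally
        {
            await connection.CloseAsync(correlationId);
        }
    }
}
```

In a long-running service the connection would normally stay open between calls; opening and closing around a single publish is done here only to keep the sketch complete.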
ReadQueueNamesAsync
Reads a list of registered queue names. If the connection doesn’t support this function, it returns an empty list.
public
Task<List<string>> ReadQueueNamesAsync()
- returns: Task<List<string>> - queue names.
Seek
Seeks a message offset.
public
void Seek(string topic, string groupId, int partition, long offset, IKafkaMessageListener listener)
- topic: string - topic name
- groupId: string - (optional) consumer group id
- partition: int - partition number
- offset: long - message offset
- listener: IKafkaMessageListener - message listener
SetReferences
Sets references to dependent components.
public
void SetReferences(IReferences references)
- references: IReferences - references to locate the component’s dependencies.
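A minimal sketch of wiring an optional logger into the connection, matching the `*:logger:*:*:1.0` locator listed under References. It assumes `References.FromTuples` and `Descriptor` from PipServices3.Commons, `ConsoleLogger` from PipServices3.Components, and the `PipServices3.Kafka.Connect` namespace for `KafkaConnection`. The class name and the descriptor values are illustrative.

```csharp
using PipServices3.Commons.Refer;
using PipServices3.Components.Log;
using PipServices3.Kafka.Connect; // assumed namespace of KafkaConnection

public static class KafkaConnectionReferencesExample
{
    // Registers a console logger under a descriptor that the
    // "*:logger:*:*:1.0" locator is expected to match.
    public static IReferences BuildReferences()
    {
        return References.FromTuples(
            new Descriptor("pip-services", "logger", "console", "default", "1.0"),
            new ConsoleLogger()
        );
    }

    // Creates a connection and hands it the references
    // so it can locate its optional dependencies.
    public static KafkaConnection CreateConnection()
    {
        var connection = new KafkaConnection();
        connection.SetReferences(BuildReferences());
        return connection;
    }
}
```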
SubscribeAsync
Subscribes to a topic.
public
Task SubscribeAsync(string topic, string groupId, ConsumerConfig config, IKafkaMessageListener listener)
- topic: string - topic name
- groupId: string - (optional) consumer group id
- config: ConsumerConfig - subscription options
- listener: IKafkaMessageListener - message listener
UnsubscribeAsync
Unsubscribes from a previously subscribed topic.
public
Task UnsubscribeAsync(string topic, string groupId, IKafkaMessageListener listener)
- topic: string - topic name
- groupId: string - (optional) consumer group id
- listener: IKafkaMessageListener - message listener
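A minimal sketch of subscribing to a topic and later unsubscribing, using the `SubscribeAsync` and `UnsubscribeAsync` signatures documented above. It assumes that `ConsumerConfig` is the Confluent.Kafka type and that `KafkaConnection` and `IKafkaMessageListener` live in the `PipServices3.Kafka.Connect` namespace. The listener is passed in rather than implemented, because its members are not described on this page. The class name, topic, group id, and consumer options are placeholders.

```csharp
using System.Threading.Tasks;
using Confluent.Kafka;
using PipServices3.Kafka.Connect; // assumed namespace of KafkaConnection and IKafkaMessageListener

public static class KafkaSubscribeExample
{
    private const string Topic = "sample-topic";   // placeholder
    private const string GroupId = "sample-group"; // placeholder

    // Subscription options: start from the earliest offset and
    // leave offset commits to the caller (for example via Commit).
    public static ConsumerConfig BuildConsumerConfig()
    {
        return new ConsumerConfig
        {
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };
    }

    // Subscribes the listener on a connection that has already been opened.
    public static Task StartAsync(KafkaConnection connection, IKafkaMessageListener listener)
    {
        return connection.SubscribeAsync(Topic, GroupId, BuildConsumerConfig(), listener);
    }

    // Removes the subscription created by StartAsync.
    public static Task StopAsync(KafkaConnection connection, IKafkaMessageListener listener)
    {
        return connection.UnsubscribeAsync(Topic, GroupId, listener);
    }
}
```

The same topic, group id, and listener instance are passed to both calls so that the subscription being removed is the one that was created.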