Inherits: IMessageQueueConnection, IReferenceable, IConfigurable, IOpenable
Description
The KafkaConnection class allows you to create connections to Kafka using the default driver.
Important points
- By defining a connection and sharing it through multiple message queues you can reduce the number of used database connections.
Configuration parameters
- client_id: (optional) name of the client id
- connection(s):
- discovery_key: (optional) a key to retrieve the connection from IDiscovery
- host: host name or IP address
- port: port number
- uri: resource URI or connection string with all parameters in it
- credential(s):
- store_key: (optional) a key to retrieve the credentials from ICredentialStore
- username: username
- password: user’s password
- options:
- log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
- acks: (optional) control the number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
- connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
- max_retries: (optional) maximum retry attempts (default: 5)
- retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
- request_timeout: (optional) number of milliseconds to wait on broker request (default: 30000)
- flush_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
References
- *:logger:*:*:1.0 - (optional) ILogger components to pass log messages
- *:discovery:*:*:1.0 - (optional) IDiscovery services
- *:credential-store:*:*:1.0 (optional) credential stores to resolve credentials
Constructors
Creates a new instance of the connection component.
publicconstructor()
Fields
Instance methods
СheckOpen
Checks if the connection is open.
Raises an error is the connection is closed.
privatevoid CheckOpen()
CloseAsync
Closes a component and frees used resources.
publicTask CloseAsync(string correlationId)
- correlationId: string - (optional) transaction id used to trace execution through the call chain.
Commit
Commit a message offset.
publicvoid Commit(string topic, string groupId, int partition, long offset, IKafkaMessageListener listener)
- topic: string - topic name
- group_id: string - (optional) consumer group id
- partition: int - partition number
- offset: long - message offset
- listener: IKafkaMessageListener - message listener
Configure
Configures the component by passing its configuration parameters.
publicvoid Configure(ConfigParams config)
- config: ConfigParams - configuration parameters to be set.
ConnectToAdmin!
Note: this method is not implemented
Connects an admin client on demand.
CreateQueueAsync
Creates a message queue. If the connection doesn’t support this function, it exists without error.
publicTask CreateQueueAsync(string name)
- name: string - name of the queue to be created.
DeleteQueueAsync
Deletes a message queue. If the connection doesn’t support this function, it exists without error.
publicTask DeleteQueueAsync(string name)
- name: string - name of the queue to be deleted.
GetConnection
Gets the connection.
publicIProducer<byte[], byte[]> GetConnection()
- returns: IProducer<byte[], byte[]> - connection to Kafka.
IsOpen
Checks if the component is open.
publicbool IsOpen()
- returns: bool - true if the component is open and false otherwise.
OpenAsync
Opens the component.
publicTask OpenAsync(string correlationId)
- correlationId: string - (optional) transaction id used to trace execution through the call chain.
PublishAsync
Publish a message to a specified topic.
publicTask PublishAsync(string topic, Message<byte[], byte[]> message)
- topic: string - topic where the message will be placed.
- messages: Message<byte[], byte[]> - list of messages to be published.
ReadQueueNamesAsync
Reads a list of registered queue names. If the connection doesn’t support this function, it returns an empty list.
publicTask<List<string>> ReadQueueNamesAsync()
- returns: Task<List<string>> - queue names.
Seek
Seeks a message offset.
publicvoid Seek(string topic, string groupId, int partition, long offset, IKafkaMessageListener listener)
- topic: string - topic name
- groupId: string - (optional) consumer group id
- partition: int - partition number
- offset: long - message offset
- listener: IKafkaMessageListener - message listener
SetReferences
Sets references to dependent components.
publicvoid SetReferences(IReferences references)
- references: IReferences - references to locate the component’s dependencies.
SubscribeAsync
Subscribes to a topic.
publicTask SubscribeAsync(string topic, string groupId, ConsumerConfig config, IKafkaMessageListener listener)
- topic: string - subject(topic) name
- groupId: string - (optional) consumer group id
- config: ConsumerConfig - subscription options
- listener: IKafkaMessageListener - message listener
UnsubscribeAsync
Unsubscribes from a previously subscribed topic.
publicTask UnsubscribeAsync(string topic, string groupId, IKafkaMessageListener listener)
- topic: string - topic name
- groupId: string - (optional) consumer group id
- listener: IKafkaMessageListener - message listener