Description
KafkaConnection is used to create Kafka connections using the default driver.
Important points
By defining a connection and sharing it across multiple message queues, you can reduce the number of Kafka connections in use.
Configuration parameters
- client_id: (optional) name of the client id
- connection(s):
- discovery_key: (optional) key to retrieve the connection from IDiscovery
- host: host name or IP address
- port: port number (default: 27017)
- uri: resource URI or connection string with all parameters in it
- credential(s):
- store_key: (optional) key to retrieve the credentials from ICredentialStore
- username: username
- password: user’s password
- options:
- acks: (optional) control the number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
- num_partitions: (optional) number of partitions of the created topic (default: 1)
- replication_factor: (optional) Kafka replication factor of the created topic (default: 1)
- log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
- connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
- max_retries: (optional) maximum retry attempts (default: 5)
- retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
- request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
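The option defaults listed above can be pictured as a flat set of key/value pairs. The following is a minimal sketch using only the Go standard library. The dotted key names (such as options.acks) and the helper withDefaults are assumptions made for this illustration, not the component's own API, and the host and port are placeholder values.

```go
package main

import "fmt"

// defaultOptions mirrors the defaults documented for the options section.
// The dotted key layout is an assumption made for this sketch.
var defaultOptions = map[string]string{
	"options.acks":               "-1",
	"options.num_partitions":     "1",
	"options.replication_factor": "1",
	"options.log_level":          "1",
	"options.connect_timeout":    "1000",
	"options.max_retries":        "5",
	"options.retry_timeout":      "30000",
	"options.request_timeout":    "30000",
}

// withDefaults (hypothetical helper) returns a copy of cfg in which every
// option that the caller did not set is filled in with its documented default.
func withDefaults(cfg map[string]string) map[string]string {
	out := make(map[string]string, len(cfg)+len(defaultOptions))
	for k, v := range defaultOptions {
		out[k] = v
	}
	for k, v := range cfg {
		out[k] = v // explicit settings override defaults
	}
	return out
}

func main() {
	cfg := withDefaults(map[string]string{
		"connection.host": "localhost", // placeholder value
		"connection.port": "9092",      // placeholder value
		"options.acks":    "1",         // wait for the leader only
	})
	fmt.Println("acks:", cfg["options.acks"])
	fmt.Println("max_retries:", cfg["options.max_retries"])
}
```

Only the keys that differ from the defaults need to be supplied; everything else falls back to the values in the list.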
References
- *:logger:*:*:1.0 - (optional) ILogger components to pass log messages
- *:discovery:*:*:1.0 - (optional) IDiscovery services
- *:credential-store:*:*:1.0 - (optional) credential stores to resolve credentials
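Each reference above is a locator with five colon-separated fields (group, type, kind, name, version), where * stands for any value. The sketch below shows one way such a pattern could be matched against a concrete descriptor. The function matches and the sample descriptors are hypothetical, and the real library's matching rules may be more permissive than this exact comparison.

```go
package main

import (
	"fmt"
	"strings"
)

// matches (hypothetical helper) reports whether a concrete descriptor
// satisfies a pattern. Both must have exactly five colon-separated fields;
// a "*" field in the pattern accepts any value in that position.
func matches(pattern, descriptor string) bool {
	p := strings.Split(pattern, ":")
	d := strings.Split(descriptor, ":")
	if len(p) != 5 || len(d) != 5 {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != d[i] {
			return false
		}
	}
	return true
}

func main() {
	// The descriptors below are illustrative values only.
	fmt.Println(matches("*:logger:*:*:1.0", "example:logger:console:default:1.0"))
	fmt.Println(matches("*:logger:*:*:1.0", "example:discovery:memory:default:1.0"))
}
```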
Constructors
NewKafkaConnection
Creates a new instance of the connection component.
NewKafkaConnection() *KafkaConnection
Fields
Methods
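checkOpen
Checks if the connection is open.
Returns an error if the connection is closed.
(c *KafkaConnection) checkOpen(correlationId string) error
- correlationId: string - (optional) transaction id used to trace execution through the call chain.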
- returns: error - error or nil if no errors occurred.
Close
Closes a component and frees used resources.
(c *KafkaConnection) Close(ctx context.Context, correlationId string) error
- ctx: context.Context - operation context
- correlationId: string - (optional) transaction id used to trace execution through the call chain.
- returns: error - error or nil if no errors occurred.
Commit
Note: this method is not implemented.
Commits a message offset.
Configure
Configures the component by passing its configuration parameters.
(c *KafkaConnection) Configure(ctx context.Context, config *ConfigParams)
- ctx: context.Context - operation context
- config: *ConfigParams - configuration parameters to be set.
connectToAdmin
Connects an admin client on demand.
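(c *KafkaConnection) connectToAdmin(correlationId string) error
- correlationId: string - (optional) transaction id used to trace execution through the call chain.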
- returns: error - error or nil if no errors occurred.
CreateQueue
Creates a message queue. If the connection doesn’t support this function, it exits without error.
(c *KafkaConnection) CreateQueue(name string) error
- name: string - name of the queue to be created.
- returns: error - error or nil if no errors occurred.
DeleteQueue
Deletes a message queue. If the connection doesn’t support this function, it exits without error.
(c *KafkaConnection) DeleteQueue(name string) error
- name: string - name of the queue to be deleted.
- returns: error - error or nil if no errors occurred.
GetConnection
Gets the connection.
(c *KafkaConnection) GetConnection() kafka.SyncProducer
- returns: kafka.SyncProducer - connection to Kafka
IsOpen
Checks if the component is open.
(c *KafkaConnection) IsOpen() bool
- returns: bool - true if the component is open and false otherwise.
Open
Opens the component.
(c *KafkaConnection) Open(ctx context.Context, correlationId string) error
- ctx: context.Context - operation context
- correlationId: string - (optional) transaction id used to trace execution through the call chain.
- returns: error - error or nil if no errors occurred.
Publish
Publishes messages to a specified topic.
(c *KafkaConnection) Publish(ctx context.Context, topic string, messages []*kafka.ProducerMessage, config *kafka.Config) error
- ctx: context.Context - operation context.
- topic: string - topic where the message will be placed.
- messages: []*kafka.ProducerMessage - list of messages to be published.
- config: *kafka.Config - publishing options.
- returns: error - error or nil if no errors occurred.
ReadQueueNames
Reads a list of registered queue names. If the connection doesn’t support this function, it returns an empty list.
(c *KafkaConnection) ReadQueueNames() ([]string, error)
- returns: ([]string, error) - list of queue names, and an error or nil if no errors occurred.
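ReadQueueNames and CreateQueue can be combined to create a queue only when it does not exist yet. The sketch below shows that idea against the documented signatures. The interface queueAdmin, the type memoryAdmin and the helper ensureQueue are hypothetical and exist only so that the example runs without a broker.

```go
package main

import "fmt"

// queueAdmin lists the two queue methods used in this sketch, with the
// signatures documented for KafkaConnection.
type queueAdmin interface {
	ReadQueueNames() ([]string, error)
	CreateQueue(name string) error
}

// memoryAdmin is an in-memory stand-in used only for this sketch.
type memoryAdmin struct{ names []string }

func (m *memoryAdmin) ReadQueueNames() ([]string, error) {
	// Return a copy so callers cannot modify the internal list.
	return append([]string(nil), m.names...), nil
}

func (m *memoryAdmin) CreateQueue(name string) error {
	m.names = append(m.names, name)
	return nil
}

// ensureQueue creates the queue only if it is not already registered.
// It reports whether a new queue was created.
func ensureQueue(admin queueAdmin, name string) (bool, error) {
	names, err := admin.ReadQueueNames()
	if err != nil {
		return false, err
	}
	for _, n := range names {
		if n == name {
			return false, nil // already present, nothing to do
		}
	}
	if err := admin.CreateQueue(name); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	admin := &memoryAdmin{}
	created, err := ensureQueue(admin, "orders")
	fmt.Println(created, err)
	created, err = ensureQueue(admin, "orders")
	fmt.Println(created, err)
}
```

Because ReadQueueNames may return an empty list when the connection does not support listing, a helper like this would then attempt to create the queue on every call.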
Seek
Note: this method is not implemented.
Seeks a message offset.
SetReferences
Sets references to dependent components.
(c *KafkaConnection) SetReferences(ctx context.Context, references IReferences)
- ctx: context.Context - operation context.
- references: IReferences - references to locate the component dependencies.
Subscribe
Subscribes to a topic.
(c *KafkaConnection) Subscribe(ctx context.Context, topic string, groupId string, config *kafka.Config, listener IKafkaMessageListener) error
- ctx: context.Context - operation context.
- topic: string - topic name
- groupId: string - (optional) consumer group id
- config: *kafka.Config - subscription options
- listener: IKafkaMessageListener - message listener
- returns: error - error or nil if no errors occurred.
Unsubscribe
Unsubscribes from a previously subscribed topic.
(c *KafkaConnection) Unsubscribe(ctx context.Context, topic string, groupId string, listener IKafkaMessageListener) error
- ctx: context.Context - operation context.
- topic: string - topic name
- groupId: string - (optional) consumer group id
- listener: IKafkaMessageListener - message listener
- returns: error - error or nil if no errors occurred.
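Subscribe and Unsubscribe take the same topic, groupId and listener, so a subscription can be thought of as an entry keyed by those values. The sketch below illustrates that pairing with an in-memory registry. The types listener, registry and counter are hypothetical stand-ins, the listener method shown is an assumption rather than the actual IKafkaMessageListener interface, and no Kafka client is involved.

```go
package main

import "fmt"

// listener is a hypothetical stand-in for IKafkaMessageListener.
type listener interface {
	OnMessage(topic string, payload []byte)
}

// key identifies a subscription by topic and consumer group.
type key struct{ topic, groupId string }

// registry keeps the listeners registered for each (topic, groupId) pair.
type registry struct{ subs map[key][]listener }

func newRegistry() *registry { return &registry{subs: map[key][]listener{}} }

// Subscribe registers l for the given topic and group.
func (r *registry) Subscribe(topic, groupId string, l listener) {
	k := key{topic, groupId}
	r.subs[k] = append(r.subs[k], l)
}

// Unsubscribe removes l from the given topic and group. Listeners are
// compared by identity, so they should be pointers, as in this sketch.
func (r *registry) Unsubscribe(topic, groupId string, l listener) {
	k := key{topic, groupId}
	kept := r.subs[k][:0]
	for _, s := range r.subs[k] {
		if s != l {
			kept = append(kept, s)
		}
	}
	if len(kept) == 0 {
		delete(r.subs, k)
		return
	}
	r.subs[k] = kept
}

// dispatch delivers payload to every registered listener and returns
// how many listeners received it.
func (r *registry) dispatch(topic, groupId string, payload []byte) int {
	n := 0
	for _, s := range r.subs[key{topic, groupId}] {
		s.OnMessage(topic, payload)
		n++
	}
	return n
}

// counter is a listener that counts the messages it receives.
type counter struct{ seen int }

func (c *counter) OnMessage(topic string, payload []byte) { c.seen++ }

func main() {
	r := newRegistry()
	c := &counter{}
	r.Subscribe("orders", "group-1", c)
	fmt.Println("delivered:", r.dispatch("orders", "group-1", []byte("hello")))
	r.Unsubscribe("orders", "group-1", c)
	fmt.Println("delivered:", r.dispatch("orders", "group-1", []byte("again")))
	fmt.Println("seen:", c.seen)
}
```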