Kafka
Key takeaways
KafkaConnection: Component used to create and manage a Kafka connection that can be shared by multiple queues.
KafkaMessageQueue: Component used to create a queue that can send and receive messages to and from Kafka.
Introduction
This tutorial will help you understand how to use two components designed to communicate with Apache Kafka: KafkaConnection and KafkaMessageQueue. The first can be used to connect to Apache Kafka and create a publish/subscribe communication mechanism. The second allows for the creation of a queue that can send and receive messages to and from Kafka. The main methods available in both classes are described and explained with examples.
Communicating with Kafka
Pip.Services offers several components to connect to Kafka and to send and receive messages from it. In this section, we will look at the KafkaConnection and KafkaMessageQueue classes and their most important methods.
KafkaConnection
The KafkaConnection class allows us to define a connection to Kafka. The main advantage of using this class is that it allows us to define a connection that can be shared by multiple message queues, thus reducing the number of used connections. The following is an explanation of the main operations offered by this class.
Connect to Kafka
To connect to Kafka, we first need to configure our connection. For this, we can use the ConfigParams class, which allows us to specify our connection parameters, such as host and port. Once the parameters are defined, we can use the configure() method to define a connection and the open() method to start it.
import { ConfigParams } from "pip-services4-components-node";
import { KafkaConnection } from "pip-services4-kafka-node";
let kc = new KafkaConnection();
let config = ConfigParams.fromTuples(
'connection.host', 'localhost',
'connection.port', 9092
);
kc.configure(config);
await kc.open(null);
import (
"context"
conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
kafkaconn "github.com/pip-services4/pip-services4-go/pip-services4-kafka-go/connect"
)
func main() {
kc := kafkaconn.NewKafkaConnection()
config := conf.NewConfigParamsFromTuples(
"connection.host", "localhost",
"connection.port", 9092,
)
kc.Configure(context.Background(), config)
kc.Open(context.Background())
}
from pip_services4_kafka.connect import KafkaConnection
from pip_services4_components.config import ConfigParams
kc = KafkaConnection()
config = ConfigParams.from_tuples(
'connection.host', 'localhost',
'connection.port', 9092
)
kc.configure(config)
kc.open(None)
Check if a connection is open
To check if a connection is open, we can use the isOpen() method, which returns true if the component is open and false otherwise. The example below illustrates its usage.
kc.isOpen();
ok := kc.IsOpen()
kc.is_open()
This class also provides the checkOpen() method, which returns None if the connection is open and raises an error if it is closed. The example below shows how to use it.
kc.checkOpen();
Note: kc.checkOpen() is not supported in the current version
kc._check_open()
Note: Both methods are protected. Therefore, they must be defined in a subclass of this component.
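As a minimal sketch of this subclassing pattern, the following uses a simplified stand-in class rather than the real pip_services4_kafka component, only to illustrate how a subclass can call the protected check:

```python
# Minimal sketch of the subclassing pattern. KafkaConnection here is a
# simplified stand-in for the real pip_services4_kafka class, used only
# to illustrate how a subclass can call the protected _check_open().
class KafkaConnection:
    def __init__(self):
        self.__opened = False

    def open(self, context):
        self.__opened = True

    def is_open(self):
        return self.__opened

    def _check_open(self):
        # protected: raises if the connection has not been opened yet
        if not self.is_open():
            raise Exception("Connection is not open")


class MyKafkaConnection(KafkaConnection):
    def ensure_open(self):
        # the protected method is accessible from inside the subclass
        self._check_open()


kc = MyKafkaConnection()
kc.open(None)
kc.ensure_open()  # no error once the connection is open
```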
Create a queue
To create a queue, this class offers the createQueue() method, which accepts the queue’s name as an input parameter. The following example shows how to use it.
await kc.createQueue("my_queueA");
err := kc.CreateQueue("my_queueA")
if err != nil {
panic(err)
}
kc.create_queue("my_queueA")
Delete a queue
To delete a queue, this class offers the deleteQueue() method, which requires the queue’s name as an input parameter. The example below shows how to use it.
await kc.deleteQueue('my_queueC');
err = kc.DeleteQueue("my_queueC")
if err != nil {
panic(err)
}
kc.delete_queue('my_queueC')
List all available queues
To obtain a list of all available queues, this class has the readQueueNames() method. Once run, it returns a list with the queue names. If the connection doesn’t support this function, it returns an empty list. The following example shows how to use it.
await kc.readQueueNames();
names, err := kc.ReadQueueNames()
kc.read_queue_names()
Set the offset
The seek() method sets the offset to a specified value. It requires five input parameters, namely the topic name, group id, partition, offset, and listener. The example below shows how to use it.
let offset = 60;
await kc.seek("my_topicA", null, 0, offset, myListener);
Note: Seek is not supported in the current version.
offset = 60
kc.seek("my_topicA", None, 0, offset, my_listener)
Publish
To publish messages on a queue, this class has the publish() method, which accepts the name of the queue and a list of messages as inputs. The following example explains its usage.
let listOfMessages = ["message 1", "message 2"];
await kc.publish("my_topicA", listOfMessages, {});
var listOfMessages []*kafka.ProducerMessage
for i, val := range []string{"message 1", "message 2"} {
msg := &kafka.ProducerMessage{}
msg.Key = kafka.StringEncoder(rune(i))
msg.Value = kafka.ByteEncoder(val)
msg.Timestamp = time.Now()
listOfMessages = append(listOfMessages, msg)
}
kc.Publish(context.Background(), "my_topicA", listOfMessages)
list_of_messages = [{'value': "message 1"}, {'value': "message 2"}]
kc.publish("my_topicA", list_of_messages, {})
Subscribe
To subscribe to a topic, this class offers the subscribe() method, which accepts four parameters, namely the name of a topic, a group id, a dictionary containing options such as timeout and compression, and a listener.
The listener must implement the IKafkaMessageListener interface, which defines the onMessage() method (on_message() in Python). This method requires three parameters: topic, partition, and message object. The example below shows how to use it.
import { IKafkaMessageListener } from "pip-services4-kafka-node";
class MyListener implements IKafkaMessageListener {
public async onMessage(topic: string, partition: number, message: any): Promise<void> {
console.log(`${topic}, ${message}`);
}
}
let myListener = new MyListener();
await kc.subscribe("my_topicA", null, {}, myListener);
import (
"fmt"
"sync"
kafka "github.com/Shopify/sarama"
)
type MyListener struct {
ready chan bool
Lock sync.Mutex
}
func NewMyListener() *MyListener {
c := &MyListener{}
c.ready = make(chan bool)
return c
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *MyListener) Setup(kafka.ConsumerGroupSession) error {
// Mark the consumer as ready
c.ready <- true
close(c.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// but before the offsets are committed for the very last time.
func (c *MyListener) Cleanup(kafka.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *MyListener) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error {
for {
// if len(c.readablePartitions) == 0 || slices.Contains(c.readablePartitions, claim.Partition()) {
select {
case msg := <-claim.Messages():
if msg != nil {
fmt.Printf("%s, %s", msg.Topic, msg.Value)
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
}
// }
}
}
func (c *MyListener) SetReady(chFlag chan bool) {
c.Lock.Lock()
defer c.Lock.Unlock()
c.ready = chFlag
}
// Returns: channel with bool flag ready
func (c *MyListener) Ready() chan bool {
return c.ready
}
myListener := NewMyListener()
kc.Subscribe(context.Background(), "my_topicA", "", kafka.NewConfig(), myListener)
from pip_services4_kafka.connect import IKafkaMessageListener
class MyListener(IKafkaMessageListener):
def on_message(self, topic, partition, message):
print(message.value().decode('utf-8'), message.offset())
my_listener = MyListener()
kc.subscribe("my_topicA", None, {}, my_listener)
Unsubscribe
To unsubscribe, this class offers the unsubscribe() method, which takes the topic name, group id, and listener as input parameters. The following example shows how to use it.
await kc.unsubscribe("my_topicA", null, myListener);
err = kc.Unsubscribe(context.Background(), "my_topicA", "test", myListener)
kc.unsubscribe("my_topicA", None, my_listener)
Close the connection
To close an open connection, we can use the close() method, which is used as follows:
await kc.close(null);
_ = kc.Close(context.Background())
kc.close(None)
Example
The following example shows how to use this class to create a connection and a set of queues, publish messages to Kafka, and subscribe to the topics containing the published messages.
import { ConfigParams } from "pip-services4-components-node";
import { IKafkaMessageListener, KafkaConnection } from "pip-services4-kafka-node";
export async function main() {
let kc = new KafkaConnection();
let config = ConfigParams.fromTuples(
'connection.host', 'localhost',
'connection.port', 9092
)
kc.configure(config);
await kc.open(null);
await kc.createQueue("my_queueA");
await kc.createQueue("my_queueB");
await kc.createQueue("my_queueC");
let listOfMessages1 = [{ value: "message 1" }, { value: "message 2" }];
let listOfMessages2 = [{ value: "message 3" }, { value: "message 4" }];
let listOfMessages3 = [{ value: "message 5" }, { value: "message 6" }];
await kc.publish("my_topicA", listOfMessages1, null);
await kc.publish("my_topicB", listOfMessages2, null);
await kc.publish("my_topicC", listOfMessages3, null);
let myListener = new MyListener();
await kc.subscribe("my_topicA", null, null, myListener);
await kc.subscribe("my_topicB", null, null, myListener);
await kc.subscribe("my_topicC", null, null, myListener);
await kc.close(null);
console.log('Execution completed!');
}
class MyListener implements IKafkaMessageListener {
public async onMessage(topic: string, partition: number, message: any): Promise<void> {
console.log(`${topic}, ${message}`);
}
}
import (
"context"
"fmt"
"sync"
"time"
kafka "github.com/Shopify/sarama"
conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
kafkaconn "github.com/pip-services4/pip-services4-go/pip-services4-kafka-go/connect"
)
func main() {
ctx := context.Background()
kc := kafkaconn.NewKafkaConnection()
config := conf.NewConfigParamsFromTuples(
"connection.host", "localhost",
"connection.port", 9092,
)
kc.Configure(context.Background(), config)
kc.Open(context.Background())
err := kc.CreateQueue("my_queueA")
if err != nil {
panic(err)
}
err = kc.DeleteQueue("my_queueA")
if err != nil {
panic(err)
}
_, _ = kc.ReadQueueNames()
var listOfMessages1 []*kafka.ProducerMessage
var listOfMessages2 []*kafka.ProducerMessage
var listOfMessages3 []*kafka.ProducerMessage
for i, val := range []string{"message 1", "message 2"} {
msg := &kafka.ProducerMessage{}
msg.Key = kafka.StringEncoder(rune(i))
msg.Value = kafka.ByteEncoder(val)
msg.Timestamp = time.Now()
listOfMessages1 = append(listOfMessages1, msg)
}
for i, val := range []string{"message 3", "message 4"} {
msg := &kafka.ProducerMessage{}
msg.Key = kafka.StringEncoder(rune(i))
msg.Value = kafka.ByteEncoder(val)
msg.Timestamp = time.Now()
listOfMessages2 = append(listOfMessages2, msg)
}
for i, val := range []string{"message 5", "message 6"} {
msg := &kafka.ProducerMessage{}
msg.Key = kafka.StringEncoder(rune(i))
msg.Value = kafka.ByteEncoder(val)
msg.Timestamp = time.Now()
listOfMessages3 = append(listOfMessages3, msg)
}
err = kc.Publish(ctx, "my_topicA", listOfMessages1)
if err != nil {
panic(err)
}
err = kc.Publish(ctx, "my_topicB", listOfMessages2)
if err != nil {
panic(err)
}
err = kc.Publish(ctx, "my_topicC", listOfMessages3)
if err != nil {
panic(err)
}
myListener := NewMyListener()
err = kc.Subscribe(ctx, "my_topicA", "test", kafka.NewConfig(), myListener)
if err != nil {
panic(err)
}
err = kc.Subscribe(ctx, "my_topicB", "test", kafka.NewConfig(), myListener)
if err != nil {
panic(err)
}
err = kc.Subscribe(ctx, "my_topicC", "test", kafka.NewConfig(), myListener)
if err != nil {
panic(err)
}
<-time.After(1000 * time.Millisecond)
err = kc.Close(ctx)
if err != nil {
panic(err)
}
fmt.Println("Execution completed!")
}
type MyListener struct {
ready chan bool
Lock sync.Mutex
}
func NewMyListener() *MyListener {
c := &MyListener{}
c.ready = make(chan bool)
return c
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *MyListener) Setup(kafka.ConsumerGroupSession) error {
// Mark the consumer as ready
c.ready <- true
close(c.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// but before the offsets are committed for the very last time.
func (c *MyListener) Cleanup(kafka.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *MyListener) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error {
for {
// if len(c.readablePartitions) == 0 || slices.Contains(c.readablePartitions, claim.Partition()) {
select {
case msg := <-claim.Messages():
if msg != nil {
fmt.Printf("%s, %s", msg.Topic, msg.Value)
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
}
// }
}
}
func (c *MyListener) SetReady(chFlag chan bool) {
c.Lock.Lock()
defer c.Lock.Unlock()
c.ready = chFlag
}
// Returns: channel with bool flag ready
func (c *MyListener) Ready() chan bool {
return c.ready
}
from pip_services4_kafka.connect import KafkaConnection
from pip_services4_components.config import ConfigParams
from pip_services4_kafka.connect import IKafkaMessageListener
kc = KafkaConnection()
config = ConfigParams.from_tuples(
'connection.host', 'localhost',
'connection.port', 9092
)
kc.configure(config)
kc.open(None)
kc.create_queue("my_queueA")
kc.create_queue("my_queueB")
kc.create_queue("my_queueC")
list_of_messages1 = [{'value': "message 1"}, {'value': "message 2"}]
list_of_messages2 = [{'value': "message 3"}, {'value': "message 4"}]
list_of_messages3 = [{'value': "message 5"}, {'value': "message 6"}]
kc.publish("my_topicA", list_of_messages1, {})
kc.publish("my_topicB", list_of_messages2, {})
kc.publish("my_topicC", list_of_messages3, {})
class MyListener(IKafkaMessageListener):
def on_message(self, topic, partition, message):
print(message.value().decode('utf-8'), message.offset())
my_listener = MyListener()
kc.subscribe("my_topicA", None, {}, my_listener)
kc.subscribe("my_topicB", None, {}, my_listener)
kc.subscribe("my_topicC", None, {}, my_listener)
kc.close(None)
print("Execution completed!")
After running it, the listeners print the received messages to the console.
KafkaMessageQueue
The KafkaMessageQueue component allows us to create message queues that send and receive messages via the Kafka message broker. This class has several methods that allow us to create a queue, send and receive messages, remove a message from a queue, close the queue and more. Below is an explanation with examples of its main methods.
Create a Kafka message queue
To create a queue, we need to configure it first. We can use the ConfigParams component to define the queue’s parameters, such as the topic. Additionally, we can add the connection parameters.
import { ConfigParams } from "pip-services4-components-node";
import { KafkaMessageQueue } from "pip-services4-kafka-node";
let queue = new KafkaMessageQueue();
queue.configure(ConfigParams.fromTuples(
"topic", "mytopic",
'connection.protocol', 'tcp',
"connection.host", "localhost",
"connection.port", 9092,
"options.autosubscribe", true
));
await queue.open(null);
import (
"context"
conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
kafkaqueue "github.com/pip-services4/pip-services4-go/pip-services4-kafka-go/queues"
)
ctx := context.Background()
queue := kafkaqueue.NewKafkaMessageQueue("")
queue.Configure(ctx, conf.NewConfigParamsFromTuples(
"topic", "mytopic",
"connection.protocol", "tcp",
"connection.host", "localhost",
"connection.port", 9092,
"options.autosubscribe", true,
))
queue.Open(ctx)
from pip_services4_kafka.queues import KafkaMessageQueue
queue = KafkaMessageQueue()
queue.configure(ConfigParams.from_tuples(
"topic", "mytopic",
'connection.protocol', 'tcp',
"connection.host", "localhost",
"connection.port", 9092,
"options.autosubscribe", True
))
queue.open(None)
Send a message to Kafka
To send a message to Kafka, we can use the send() method, which accepts a MessageEnvelope as its message parameter. Inside the envelope, the message is stored as a byte buffer; strings are converted to it using UTF-8 encoding.
import { MessageEnvelope } from "pip-services4-messaging-node";
await queue.send(null, new MessageEnvelope(null, 'mymessage', 'ABC'));
import cqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"
err := queue.Send(ctx, cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
from pip_services4_messaging.queues import MessageEnvelope
queue.send(None, MessageEnvelope(None, "mymessage", "ABC"))
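The string-to-buffer conversion mentioned above can be sketched in plain Python, independent of the library:

```python
# Sketch of the UTF-8 conversion applied to string messages:
# the envelope stores the message as a byte buffer.
message_text = "ABC"
message_buffer = message_text.encode("utf-8")  # bytes stored in the envelope
restored = message_buffer.decode("utf-8")      # decoding yields the original string
```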
Receive a message from Kafka
To receive an incoming message, we can use the receive() method, which removes the message from the queue once it has been received.
let message = queue.receive(null, 10000);
message, err := queue.Receive(ctx, 10000)
message = queue.receive(None, 10000)
Alternatively, we can use the peek() method, which retrieves a single incoming message from the queue without removing it. If there is no message available in the queue, it returns null.
let envelope = await queue.peek(null);
envelope, err := queue.Peek(ctx)
envelope = queue.peek(None)
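The difference between receive() and peek() can be illustrated with a toy in-memory queue; this is a stand-in for demonstration purposes, not the real KafkaMessageQueue:

```python
from collections import deque

# Toy queue illustrating receive() vs peek() semantics
# (a stand-in, not the real KafkaMessageQueue).
class ToyMessageQueue:
    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        # removes and returns the next message, or None if the queue is empty
        return self._messages.popleft() if self._messages else None

    def peek(self):
        # returns the next message without removing it, or None if empty
        return self._messages[0] if self._messages else None


queue = ToyMessageQueue()
queue.send("mymessage")
first = queue.peek()      # the message stays in the queue
second = queue.receive()  # the message is removed
```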
Remove a message
To remove a message from a queue, we can use the complete() method. The code is as follows
await queue.complete(message);
err := queue.Complete(ctx, message)
queue.complete(message)
Close the connection
To close a queue and free used resources, we can use the close() method, which is used in this way.
await queue.close(null);
err := queue.Close(ctx)
queue.close(None)
Example
We can now assemble everything into one example. First, we create a custom message receiver, which will manage the reception of messages according to our needs. This class implements the IMessageReceiver interface and the ICleanable interface, which helps us define our clear() method.
Then, we define our connection and create an instance of our message receiver. We use the beginListen() method, defined in the IMessageQueue interface, to start our listener and send a message.
Once we receive our message, we capture it, stop the listener and unlock the thread with the endListen() method, and print a message verifying whether the received message equals the sent message.
Our final code will look like this:
import { ConfigParams, ICleanable, Context } from "pip-services4-components-node";
import { KafkaMessageQueue } from "pip-services4-kafka-node";
import { IMessageQueue, IMessageReceiver, MessageEnvelope } from "pip-services4-messaging-node";
class MyMessageReceiver implements IMessageReceiver, ICleanable {
private _messages: MessageEnvelope[] = [];
constructor() { }
public get messages(): MessageEnvelope[] {
return this._messages;
}
public get messageCount(): number {
return this._messages.length;
}
public async receiveMessage(envelope: MessageEnvelope, queue: IMessageQueue): Promise<void> {
this._messages.push(envelope);
}
public async clear(ctx: Context): Promise<void> {
this._messages = [];
}
}
export async function main() {
let queue = new KafkaMessageQueue();
queue.configure(ConfigParams.fromTuples(
"topic", "mytopic2",
'connection.protocol', 'tcp',
"connection.host", "localhost",
"connection.port", 9092,
"options.autosubscribe", true
));
await queue.open(null);
let messageReceiver = new MyMessageReceiver();
await queue.beginListen(null, messageReceiver);
let envelope1 = new MessageEnvelope(null, 'Test', 'Test message');
await queue.send(null, envelope1);
// await message
for (let i = 0; i < 15; i++) {
if (messageReceiver.messages.length > 0) {
break;
}
await new Promise<void>((resolve, reject) => {
setTimeout(resolve, 500);
});
}
let envelope2 = messageReceiver.messages[0];
console.log(envelope1.getMessageAsString() == envelope2.getMessageAsString());
await queue.endListen(null);
await queue.close(null);
}
import (
"context"
"fmt"
"sync"
"time"
conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
kafkaqueue "github.com/pip-services4/pip-services4-go/pip-services4-kafka-go/queues"
cqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"
)
func main() {
ctx := context.Background()
queue := kafkaqueue.NewKafkaMessageQueue("my_queue")
queue.Configure(ctx, conf.NewConfigParamsFromTuples(
// "topic", "my_queue",
"connection.protocol", "tcp",
"connection.host", "localhost",
"connection.port", 9092,
"options.autosubscribe", true,
))
err := queue.Open(ctx)
if err != nil {
panic(err)
}
messageReceiver := NewMyMessageReceiver()
queue.BeginListen(ctx, messageReceiver)
envelope1 := cqueues.NewMessageEnvelope("123", "Test", []byte("Test message"))
err = queue.Send(ctx, envelope1)
if err != nil {
panic(err)
}
// wait message
for i := 0; i < 15; i++ {
if messageReceiver.MessageCount() > 0 {
fmt.Println(messageReceiver.MessageCount())
break
}
<-time.After(500 * time.Millisecond)
}
envelope2 := messageReceiver.Messages()[0]
fmt.Println(envelope1.GetMessageAsString() == envelope2.GetMessageAsString())
queue.EndListen(ctx)
err = queue.Close(ctx)
if err != nil {
panic(err)
}
}
type MyMessageReceiver struct {
_messages []*cqueues.MessageEnvelope
lock sync.Mutex
}
func NewMyMessageReceiver() *MyMessageReceiver {
c := &MyMessageReceiver{
_messages: make([]*cqueues.MessageEnvelope, 0),
}
return c
}
func (c *MyMessageReceiver) Messages() []*cqueues.MessageEnvelope {
c.lock.Lock()
defer c.lock.Unlock()
return c._messages
}
func (c *MyMessageReceiver) MessageCount() int {
c.lock.Lock()
defer c.lock.Unlock()
return len(c._messages)
}
func (c *MyMessageReceiver) Clear(ctx context.Context, correlationId string) error {
c.lock.Lock()
defer c.lock.Unlock()
c._messages = make([]*cqueues.MessageEnvelope, 0)
return nil
}
func (c *MyMessageReceiver) ReceiveMessage(ctx context.Context, envelope *cqueues.MessageEnvelope, queue cqueues.IMessageQueue) (err error) {
c.lock.Lock()
defer c.lock.Unlock()
c._messages = append(c._messages, envelope)
return nil
}
import threading
import time
from typing import List, Optional
from pip_services4_components.config import ConfigParams
from pip_services4_components.run import ICleanable
from pip_services4_messaging.queues import IMessageReceiver, MessageEnvelope, IMessageQueue
from pip_services4_kafka.queues import KafkaMessageQueue
class MyMessageReceiver(IMessageReceiver, ICleanable):
def __init__(self):
self.__messages: List[MessageEnvelope] = []
self.__lock = threading.Lock()
@property
def messages(self) -> List[MessageEnvelope]:
return self.__messages
@property
def message_count(self) -> int:
return len(self.__messages)
def receive_message(self, message: MessageEnvelope, queue: IMessageQueue):
with self.__lock:
self.__messages.append(message)
def clear(self, correlation_id: Optional[str]):
with self.__lock:
self.__messages = []
queue = KafkaMessageQueue()
queue.configure(ConfigParams.from_tuples(
"topic", "mytopic2",
'connection.protocol', 'tcp',
"connection.host", "localhost",
"connection.port", 9092,
"options.autosubscribe", True
))
queue.open(None)
message_receiver = MyMessageReceiver()
queue.begin_listen(None, message_receiver)
envelope1 = MessageEnvelope(None, "Test", "Test message")
queue.send(None, envelope1)
# await message
for i in range(15):
if len(message_receiver.messages) > 0:
break
time.sleep(0.5)
envelope2 = message_receiver.messages[0]
print(envelope1.message.decode('utf-8') == envelope2.message.decode('utf-8'))
queue.end_listen(None)
queue.close(None)
If successfully run, the program prints True, confirming that the received message equals the sent one.
Wrapping up
In this tutorial, we have seen how to use two components that offer two different ways to communicate with Kafka. The first is KafkaConnection. This class has the advantage of allowing a single connection to be shared by multiple queues, and it works through the publish/subscribe mechanism. The second is KafkaMessageQueue, which allows us to create a queue and send/receive messages to/from Kafka.