Kafka

How to communicate with Apache Kafka.

Key takeaways

KafkaConnection: Component used to create and manage a Kafka connection that can be shared by multiple queues.
KafkaMessageQueue: Component used to create a queue that can send and receive messages to and from Kafka.

Introduction

This tutorial will help you understand how to use two components designed to communicate with Apache Kafka. They are KafkaConnection and KafkaMessageQueue. The first can be used to connect to Apache Kafka and create a publish/subscribe communication mechanism. The second allows for the creation of a queue that can send and receive messages to and from Kafka. The main methods available in both classes are described and explained with examples.

Communicating with Kafka

Pip.Services offers several components for connecting to Kafka and sending and receiving messages from it. In this section, we will look at the KafkaConnection and KafkaMessageQueue classes and their most important methods.

KafkaConnection

The KafkaConnection class allows us to define a connection to Kafka. The main advantage of using this class is that it allows us to define a connection that can be shared by multiple message queues, thus reducing the number of used connections. The following is an explanation of the main operations offered by this class.
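
For example, a single KafkaConnection can be registered in a set of references and reused by several KafkaMessageQueue instances. The TypeScript sketch below illustrates the idea; it assumes the queue resolves its connection from the conventional "pip-services:connection:kafka" descriptor and that Descriptor and References are exported from pip-services4-components-node, so check the DefaultKafkaFactory of your library version for the exact locator.

import { ConfigParams, Descriptor, References } from "pip-services4-components-node";
import { KafkaConnection, KafkaMessageQueue } from "pip-services4-kafka-node";

export async function shareConnection(): Promise<void> {
    // One connection for the whole process
    let connection = new KafkaConnection();
    connection.configure(ConfigParams.fromTuples(
        'connection.host', 'localhost',
        'connection.port', 9092
    ));
    await connection.open(null);

    // Assumed descriptor; verify against the factory of your pip-services4-kafka version
    let references = References.fromTuples(
        new Descriptor("pip-services", "connection", "kafka", "default", "1.0"), connection
    );

    // Both queues reuse the same underlying Kafka connection
    let queueA = new KafkaMessageQueue("my_topicA");
    queueA.setReferences(references);
    await queueA.open(null);

    let queueB = new KafkaMessageQueue("my_topicB");
    queueB.setReferences(references);
    await queueB.open(null);
}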

Connect to Kafka

To connect to Kafka, we first need to configure our connection. For this, we can use the ConfigParams class, which allows us to specify our connection parameters, such as host and port. Once the parameters are defined, we can use the configure() method to define a connection and the open() method to start it.

import { ConfigParams } from "pip-services4-components-node";
import { KafkaConnection } from "pip-services4-kafka-node";

let kc = new KafkaConnection();
let config = ConfigParams.fromTuples(
    'connection.host', 'localhost',
    'connection.port', 9092
);

kc.configure(config);

await kc.open(null);

import (
	"context"

	conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
	kafkaconn "github.com/pip-services4/pip-services4-go/pip-services4-kafka-go/connect"
)

func main() {
	kc := kafkaconn.NewKafkaConnection()
	config := conf.NewConfigParamsFromTuples(
		"connection.host", "localhost",
		"connection.port", 9092,
	)

	kc.Configure(context.Background(), config)

	kc.Open(context.Background())
}
Not available
from pip_services4_kafka.connect import KafkaConnection
from pip_services4_components.config import ConfigParams

kc = KafkaConnection()
config = ConfigParams.from_tuples(
    'connection.host', 'localhost', 
    'connection.port', 9092
)

kc.configure(config)
kc.open(None)
Not available
Check if a connection is open

To check if a connection is open, we can use the isOpen() method, which returns true if the component is open and false otherwise. The example below illustrates its usage.

kc.isOpen();

ok := kc.IsOpen()
Not available
kc.is_open()
Not available

This class also provides the checkOpen() method, which simply returns if the connection is open and throws an error if it is closed. The example below shows how to use it.

kc.checkOpen();


Note: kc.checkOpen() is not supported in the current version
Not available
kc._check_open()
Not available

Note: Both methods are protected. Therefore, they can only be called from within a subclass of this component.
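
As an illustration, here is a minimal, hypothetical TypeScript sketch of how the protected helper could be called from a subclass; MyKafkaConnection and ensureOpen() are illustrative names, not part of the library.

import { KafkaConnection } from "pip-services4-kafka-node";

// Hypothetical subclass that exposes the protected checkOpen() helper
class MyKafkaConnection extends KafkaConnection {
    public ensureOpen(): void {
        // Throws an error when the connection has not been opened yet
        this.checkOpen();
    }
}

let connection = new MyKafkaConnection();
// ... configure and open the connection, then:
connection.ensureOpen();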

Create a queue

To create a queue, this class offers the createQueue() method. This method accepts the queue’s name as an input parameter. The following example shows how to use it.

await kc.createQueue("my_queueA");


err := kc.CreateQueue("my_queueA")
if err != nil {
	panic(err)
}
Not available
kc.create_queue("my_queueA")
Not available
Delete a queue

To delete a queue, this class offers the deleteQueue() method, which requires the queue’s name as an input parameter. The example below shows how to use it.

await kc.deleteQueue('my_queueC');


err = kc.DeleteQueue("my_queueC")
if err != nil {
	panic(err)
}
Not available
kc.delete_queue('my_queueC')
Not available
List all available queues

To obtain a list of all available queues, this class has the readQueueNames() method. Once run, it returns a list with the queue names. If the connection doesn’t support this function, it returns an empty list. The following example shows how to use it.

await kc.readQueueNames();


names, err := kc.ReadQueueNames()
Not available
kc.read_queue_names()
Not available
Set the offset

The seek() method sets the offset to a specified value. It requires five input parameters, namely the topic name, group id, partition, offset, and listener. The example below shows how to use it.

let offset = 60;
await kc.seek("my_topicA", null, 0, offset, myListener);


Note: Seek is not supported in the current version.
Not available
offset = 60
kc.seek("my_topicA", None, 0, offset, my_listener)
Not available
Publish

To publish messages on a queue, this class has the publish() method, which accepts the name of the queue and a list of messages as inputs. The following example explains its usage.

let listOfMessages = [{ value: "message 1" }, { value: "message 2" }];
await kc.publish("my_topicA", listOfMessages, {});


var listOfMessages []*kafka.ProducerMessage
for i, val := range []string{"message 1", "message 2"} {
	msg := &kafka.ProducerMessage{}
	msg.Key = kafka.StringEncoder(i)
	msg.Value = kafka.ByteEncoder(val)
	msg.Timestamp = time.Now()
	listOfMessages = append(listOfMessages, msg)
}
kc.Publish(context.Background(), "my_topicA", listOfMessages)
Not available
list_of_messages = [{'value': "message 1"}, {'value': "message 2"}]
kc.publish("my_topicA", list_of_messages, {})
Not available
Subscribe

To subscribe to a topic, this class offers the subscribe() method, which accepts four parameters, namely the name of a topic, a group id, a dictionary containing options such as timeout and compression, and a listener.

The listener must implement the IKafkaMessageListener interface, which contains the onMessage() method (on_message() in Python). This method requires three parameters: topic, partition and message object. The example below shows how to use it.

import { IKafkaMessageListener } from "pip-services4-kafka-node";

class MyListener implements IKafkaMessageListener {

    public async onMessage(topic: string, partition: number, message: any): Promise<void> {
        console.log(`${topic}, ${message}`);
    }
    
}

let myListener = new MyListener();

await kc.subscribe("my_topicA", null, {}, myListener);

import (
	"fmt"
	"sync"

	kafka "github.com/Shopify/sarama"
)

type MyListener struct {
	ready chan bool
	Lock  sync.Mutex
}

func NewMyListener() *MyListener {
	c := &MyListener{}
	c.ready = make(chan bool)
	return c
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *MyListener) Setup(kafka.ConsumerGroupSession) error {
	// Mark the consumer as ready
	c.ready <- true
	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// but before the offsets are committed for the very last time.
func (c *MyListener) Cleanup(kafka.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *MyListener) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error {
	for {
		// if len(c.readablePartitions) == 0 || slices.Contains(c.readablePartitions, claim.Partition()) {
		select {
		case msg := <-claim.Messages():
			if msg != nil {
				fmt.Printf("%s, %s", msg.Topic, msg.Value)
			}
		// Should return when `session.Context()` is done.
		// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
		// https://github.com/Shopify/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		}
		// }
	}
}

func (c *MyListener) SetReady(chFlag chan bool) {
	c.Lock.Lock()
	defer c.Lock.Unlock()
	c.ready = chFlag
}

// Returns: channel with bool flag ready
func (c *MyListener) Ready() chan bool {
	return c.ready
}

myListener := NewMyListener()

kc.Subscribe(context.Background(), "my_topicA", "", kafka.NewConfig(), myListener)
Not available
from pip_services4_kafka.connect import IKafkaMessageListener

class MyListener(IKafkaMessageListener):
    
    def on_message(self, topic, partition, message):
        print(message.value().decode('utf-8'), message.offset())

my_listener = MyListener()
kc.subscribe("my_topicA", None, {}, my_listener)
Not available
Unsubscribe

To unsubscribe, this class offers the unsubscribe() method. This method has the topic name, group id and listener as input parameters. The following example shows how to use it.

await kc.unsubscribe("my_topicA", null, myListener);


err = kc.Unsubscribe(context.Background(), "my_topicA", "test", myListener)
Not available
kc.unsubscribe("my_topicA", None, my_listener)
Not available
Close the connection

To close an open connection, we can use the close() method, which is used as follows:

await kc.close(null);


_ = kc.Close(context.Background())
Not available
kc.close(None)
Not available
Example

The following example shows how to use this class to create a connection and a set of queues, publish messages to Kafka, and subscribe to the topics containing the published messages.

import { ConfigParams } from "pip-services4-components-node";
import { IKafkaMessageListener, KafkaConnection } from "pip-services4-kafka-node";

export async function main() {
    let kc = new KafkaConnection();
    let config = ConfigParams.fromTuples(
        'connection.host', 'localhost',
        'connection.port', 9092
    )

    kc.configure(config);

    await kc.open(null);

    await kc.createQueue("my_queueA");
    await kc.createQueue("my_queueB");
    await kc.createQueue("my_queueC");

    let listOfMessages1 = [{ value: "message 1" }, { value: "message 2" }];
    let listOfMessages2 = [{ value: "message 3" }, { value: "message 4" }];
    let listOfMessages3 = [{ value: "message 5" }, { value: "message 6" }];

    await kc.publish("my_topicA", listOfMessages1, null);
    await kc.publish("my_topicB", listOfMessages2, null);
    await kc.publish("my_topicC", listOfMessages3, null);

    let myListener = new MyListener();

    await kc.subscribe("my_topicA", null, null, myListener);
    await kc.subscribe("my_topicB", null, null, myListener);
    await kc.subscribe("my_topicC", null, null, myListener);

    await kc.close(null);

    console.log('Execution completed!');
}

class MyListener implements IKafkaMessageListener {

    public async onMessage(topic: string, partition: number, message: any): Promise<void> {
        console.log(`${topic}, ${message}`);
    }
}

import (
	"context"
	"fmt"
	"sync"
	"time"

	kafka "github.com/Shopify/sarama"
	conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
	kafkaconn "github.com/pip-services4/pip-services4-go/pip-services4-kafka-go/connect"
)

func main() {
	ctx := context.Background()

	kc := kafkaconn.NewKafkaConnection()
	config := conf.NewConfigParamsFromTuples(
		"connection.host", "localhost",
		"connection.port", 9092,
	)

	kc.Configure(context.Background(), config)

	kc.Open(context.Background())

	err := kc.CreateQueue("my_queueA")
	if err != nil {
		panic(err)
	}

	err = kc.DeleteQueue("my_queueA")
	if err != nil {
		panic(err)
	}

	_, _ = kc.ReadQueueNames()

	var listOfMessages1 []*kafka.ProducerMessage
	var listOfMessages2 []*kafka.ProducerMessage
	var listOfMessages3 []*kafka.ProducerMessage

	for i, val := range []string{"message 1", "message 2"} {
		msg := &kafka.ProducerMessage{}
		msg.Key = kafka.StringEncoder(rune(i))
		msg.Value = kafka.ByteEncoder(val)
		msg.Timestamp = time.Now()
		listOfMessages1 = append(listOfMessages1, msg)
	}

	for i, val := range []string{"message 3", "message 4"} {
		msg := &kafka.ProducerMessage{}
		msg.Key = kafka.StringEncoder(rune(i))
		msg.Value = kafka.ByteEncoder(val)
		msg.Timestamp = time.Now()
		listOfMessages2 = append(listOfMessages2, msg)
	}

	for i, val := range []string{"message 5", "message 6"} {
		msg := &kafka.ProducerMessage{}
		msg.Key = kafka.StringEncoder(rune(i))
		msg.Value = kafka.ByteEncoder(val)
		msg.Timestamp = time.Now()
		listOfMessages3 = append(listOfMessages3, msg)
	}

	err = kc.Publish(ctx, "my_topicA", listOfMessages1)
	if err != nil {
		panic(err)
	}

	err = kc.Publish(ctx, "my_topicB", listOfMessages2)
	if err != nil {
		panic(err)
	}

	err = kc.Publish(ctx, "my_topicC", listOfMessages3)
	if err != nil {
		panic(err)
	}

	myListener := NewMyListener()

	err = kc.Subscribe(ctx, "my_topicA", "test", kafka.NewConfig(), myListener)
	if err != nil {
		panic(err)
	}

	err = kc.Subscribe(ctx, "my_topicB", "test", kafka.NewConfig(), myListener)
	if err != nil {
		panic(err)
	}

	err = kc.Subscribe(ctx, "my_topicC", "test", kafka.NewConfig(), myListener)
	if err != nil {
		panic(err)
	}

	<-time.After(1000 * time.Microsecond)

	err = kc.Close(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Println("Execution completed!")

}

type MyListener struct {
	ready chan bool
	Lock  sync.Mutex
}

func NewMyListener() *MyListener {
	c := &MyListener{}
	c.ready = make(chan bool)
	return c
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *MyListener) Setup(kafka.ConsumerGroupSession) error {
	// Mark the consumer as ready
	c.ready <- true
	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// but before the offsets are committed for the very last time.
func (c *MyListener) Cleanup(kafka.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *MyListener) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error {
	for {
		// if len(c.readablePartitions) == 0 || slices.Contains(c.readablePartitions, claim.Partition()) {
		select {
		case msg := <-claim.Messages():
			if msg != nil {
				fmt.Printf("%s, %s", msg.Topic, msg.Value)
			}
		// Should return when `session.Context()` is done.
		// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
		// https://github.com/Shopify/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		}
		// }
	}
}

func (c *MyListener) SetReady(chFlag chan bool) {
	c.Lock.Lock()
	defer c.Lock.Unlock()
	c.ready = chFlag
}

// Returns: channel with bool flag ready
func (c *MyListener) Ready() chan bool {
	return c.ready
}

Not available
from pip_services4_kafka.connect import KafkaConnection
from pip_services4_components.config import ConfigParams
from pip_services4_kafka.connect import IKafkaMessageListener

kc = KafkaConnection()
config = ConfigParams.from_tuples(
    'connection.host', 'localhost',
    'connection.port', 9092
)

kc.configure(config)
kc.open(None)

kc.create_queue("my_queueA")
kc.create_queue("my_queueB")
kc.create_queue("my_queueC")

list_of_messages1 = [{'value': "message 1"}, {'value': "message 2"}]
list_of_messages2 = [{'value': "message 3"}, {'value': "message 4"}]
list_of_messages3 = [{'value': "message 5"}, {'value': "message 6"}]

kc.publish("my_topicA", list_of_messages1, {})
kc.publish("my_topicB", list_of_messages2, {})
kc.publish("my_topicC", list_of_messages3, {})

class MyListener(IKafkaMessageListener):
    def on_message(self, topic, partition, message):
        print(message.value().decode('utf-8'), message.offset())

my_listener = MyListener()

kc.subscribe("my_topicA", None, {}, my_listener)
kc.subscribe("my_topicB", None, {}, my_listener)
kc.subscribe("my_topicC", None, {}, my_listener)

kc.close(None)

print("Execution completed!")
Not available

After running it, we get an output similar to the following:

figure 1

KafkaMessageQueue

The KafkaMessageQueue component allows us to create message queues that send and receive messages via the Kafka message broker. This class has several methods that allow us to create a queue, send and receive messages, remove a message from a queue, close the queue and more. Below is an explanation with examples of its main methods.

Create a Kafka message queue

To create a queue, we need to configure it first. We can use the ConfigParams component to define the queue’s parameters, such as the topic. Additionally, we can add the connection parameters.


import { ConfigParams } from "pip-services4-components-node";
import { KafkaMessageQueue } from "pip-services4-kafka-node";

let queue = new KafkaMessageQueue();
queue.configure(ConfigParams.fromTuples(
    "topic", "mytopic",
    'connection.protocol', 'tcp',
    "connection.host", "localhost",
    "connection.port", 9092,
    "options.autosubscribe", true
));

await queue.open(null);

import (
	"context"

	conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
	kafkaqueue "github.com/pip-services4/pip-services4-go/pip-services4-kafka-go/queues"
)


ctx := context.Background()

queue := kafkaqueue.NewKafkaMessageQueue("")
queue.Configure(ctx, conf.NewConfigParamsFromTuples(
	"topic", "mytopic",
	"connection.protocol", "tcp",
	"connection.host", "localhost",
	"connection.port", 9092,
	"options.autosubscribe", true,
))

queue.Open(ctx)
Not available

from pip_services4_components.config import ConfigParams
from pip_services4_kafka.queues import KafkaMessageQueue

queue = KafkaMessageQueue()
queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic",
    'connection.protocol', 'tcp',
    "connection.host", "localhost",
    "connection.port", 9092,
    "options.autosubscribe", True
))

queue.open(None)
Not available
Send a message to Kafka

To send a message to Kafka, we can use the send() method. This method accepts a MessageEnvelope as its message parameter. Inside the envelope, the message is stored as a buffer; a string is converted to a buffer using a UTF-8 transformation.

import { MessageEnvelope } from "pip-services4-messaging-node";

await queue.send(null, new MessageEnvelope(null, 'mymessage', 'ABC'));

import cqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"

err := queue.Send(ctx, cqueues.NewMessageEnvelope("123", "mymessage", []byte("ABC")))
Not available

from pip_services4_messaging.queues import MessageEnvelope

queue.send(None, MessageEnvelope(None, "mymessage", "ABC"))
Not available
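
To make the UTF-8 conversion above concrete, here is a small TypeScript sketch that creates an envelope from a string and reads it back. It relies only on the message property and the getMessageAsString() method, both of which also appear later in this tutorial.

import { MessageEnvelope } from "pip-services4-messaging-node";

let envelope = new MessageEnvelope(null, 'mymessage', 'ABC');

// The string payload was converted to a UTF-8 encoded buffer
console.log(envelope.message);               // e.g. <Buffer 41 42 43>
console.log(envelope.getMessageAsString());  // "ABC"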
Receive a message from Kafka

To receive an incoming message, we can use the receive() method, which removes the message from the queue after it has been received.

let message = await queue.receive(null, 10000);


message, err := queue.Receive(ctx, 10000)
Not available
message = queue.receive(None, 10000)
Not available

Alternatively, we can use the peek() method, which retrieves a single incoming message from the queue without removing it. If there is no message available in the queue, it returns null.

let envelope = await queue.peek(null);


envelope, err := queue.Peek(ctx)
Not available
envelope = queue.peek(None)
Not available
Remove a message

To remove a message from a queue, we can use the complete() method. The code is as follows

await queue.complete(message);


err := queue.Complete(ctx, message)
Not available
queue.complete(message)
Not available
Close the connection

To close a queue and free used resources, we can use the close() method, which is used in this way.

await queue.close(null);


err := queue.Close(ctx)
Not available
queue.close(None)
Not available
Example

We can now assemble everything in one example. First, we create a custom message receiver, which will manage the reception of messages according to our needs. This class will implement the IMessageReceiver interface and expand it with helper properties. It will also implement the ICleanable interface, which will help us to define our clear() method.

Then, we define our connection and create an instance of our message receiver. We use the beginListen() method, defined by the IMessageQueue interface, to start our listener, and then we send a message.

Once we have received our message, we capture it, close our listener and unlock the thread with the endListen() method, and print a message verifying that the received message equals the sent one.

Our final code will look like this:

import { ConfigParams, ICleanable, Context } from "pip-services4-components-node";
import { KafkaMessageQueue } from "pip-services4-kafka-node";
import { IMessageQueue, IMessageReceiver, MessageEnvelope } from "pip-services4-messaging-node";

class MyMessageReceiver implements IMessageReceiver, ICleanable {
    private _messages: MessageEnvelope[] = [];

    constructor() { }

    public get messages(): MessageEnvelope[] {
        return this._messages;
    }

    public get messageCount(): number {
        return this._messages.length;
    }

    public async receiveMessage(envelope: MessageEnvelope, queue: IMessageQueue): Promise<void> {
        this._messages.push(envelope);
    }

    public async clear(ctx: Context): Promise<void> {
        this._messages = [];
    }
}
                      
export async function main() {

    let queue = new KafkaMessageQueue();
    queue.configure(ConfigParams.fromTuples(
        "topic", "mytopic2",
        'connection.protocol', 'tcp',
        "connection.host", "localhost",
        "connection.port", 9092,
        "options.autosubscribe", true
    ));

    await queue.open(null);

    let messageReceiver = new MyMessageReceiver();
    await queue.beginListen(null, messageReceiver);

    let envelope1 = new MessageEnvelope(null, 'Test', 'Test message');
    await queue.send(null, envelope1);

    // await message
    for (let i = 0; i < 15; i++) {
        if (messageReceiver.messages.length > 0) {
            break;
        }
        await new Promise<void>((resolve, reject) => {
            setTimeout(resolve, 500);
        });
    }

    let envelope2 = messageReceiver.messages[0];
    console.log(envelope1.getMessageAsString() == envelope2.getMessageAsString());

    await queue.endListen(null);
    await queue.close(null);
}

import (
	"context"
	"fmt"
	"sync"
	"time"

	conf "github.com/pip-services4/pip-services4-go/pip-services4-components-go/config"
	kafkaqueue "github.com/pip-services4/pip-services4-go/pip-services4-kafka-go/queues"
	cqueues "github.com/pip-services4/pip-services4-go/pip-services4-messaging-go/queues"
)

func main() {
	ctx := context.Background()

	queue := kafkaqueue.NewKafkaMessageQueue("my_queue")
	queue.Configure(ctx, conf.NewConfigParamsFromTuples(
		// "topic", "my_queue",
		"connection.protocol", "tcp",
		"connection.host", "localhost",
		"connection.port", 9092,
		"options.autosubscribe", true,
	))

	err := queue.Open(ctx)
	if err != nil {
		panic(err)
	}

	messageReceiver := NewMyMessageReceiver()
	queue.BeginListen(ctx, messageReceiver)

	envelope1 := cqueues.NewMessageEnvelope("123", "Test", []byte("Test message"))
	err = queue.Send(ctx, envelope1)
	if err != nil {
		panic(err)
	}

	// wait message
	for i := 0; i < 15; i++ {
		if messageReceiver.MessageCount() > 0 {
			fmt.Println(messageReceiver.MessageCount())
			break
		}
		<-time.After(500 * time.Millisecond)
	}

	envelope2 := messageReceiver.Messages()[0]
	fmt.Println(envelope1.GetMessageAsString() == envelope2.GetMessageAsString())

	queue.EndListen(ctx)
	err = queue.Close(ctx)
	if err != nil {
		panic(err)
	}
}

type MyMessageReceiver struct {
	_messages []*cqueues.MessageEnvelope
	lock      sync.Mutex
}

func NewMyMessageReceiver() *MyMessageReceiver {
	c := &MyMessageReceiver{
		_messages: make([]*cqueues.MessageEnvelope, 0),
	}
	return c
}

func (c *MyMessageReceiver) Messages() []*cqueues.MessageEnvelope {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c._messages
}

func (c *MyMessageReceiver) MessageCount() int {
	c.lock.Lock()
	defer c.lock.Unlock()
	return len(c._messages)
}

func (c *MyMessageReceiver) Clear(ctx context.Context, correlationId string) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	c._messages = make([]*cqueues.MessageEnvelope, 0)
	return nil
}

func (c *MyMessageReceiver) ReceiveMessage(ctx context.Context, envelope *cqueues.MessageEnvelope, queue cqueues.IMessageQueue) (err error) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c._messages = append(c._messages, envelope)
	return nil
}
Not available
import threading
import time
from typing import List, Optional
from pip_services4_components.config import ConfigParams
from pip_services4_components.run import ICleanable
from pip_services4_messaging.queues import IMessageReceiver, MessageEnvelope, IMessageQueue
from pip_services4_kafka.queues import KafkaMessageQueue

class MyMessageReceiver(IMessageReceiver, ICleanable):

    def __init__(self):
        self.__messages: List[MessageEnvelope] = []
        self.__lock = threading.Lock()

    @property
    def messages(self) -> List[MessageEnvelope]:
        return self.__messages

    @property
    def message_count(self) -> int:
        return len(self.__messages)

    def receive_message(self, message: MessageEnvelope, queue: IMessageQueue):
        with self.__lock:
            self.__messages.append(message)

    def clear(self, correlation_id: Optional[str]):
        with self.__lock:
            self.__messages = []
                      
queue = KafkaMessageQueue()
queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic2",
    'connection.protocol', 'tcp',
    "connection.host", "localhost",
    "connection.port", 9092,
    "options.autosubscribe", True
))
queue.open(None)

message_receiver = MyMessageReceiver()
queue.begin_listen(None, message_receiver)

envelope1 = MessageEnvelope(None, "Test", "Test message")
queue.send(None, envelope1)

# await message
for i in range(15):
    if len(message_receiver.messages) > 0:
        break
    time.sleep(0.5)

envelope2 = message_receiver.messages[0]

print(envelope1.message.decode('utf-8') == envelope2.message.decode('utf-8'))

queue.end_listen(None)

queue.close(None)

Not available

If successfully run, we will get the following output.

figure 2

Wrapping up

In this tutorial, we have seen how to use two components that offer two different ways to communicate with Kafka. The first is KafkaConnection. This class has the advantage of allowing one connection to be shared by multiple queues and works through the publish/subscribe mechanism. The second is KafkaMessageQueue, which allows us to create a queue and send and receive messages to and from Kafka.