Kafka

How to communicate with Apache Kafka.

Key takeaways

KafkaConnection: component used to create and manage a Kafka connection that can be shared by multiple queues.
KafkaMessageQueue: component used to create a queue that can send messages to and receive messages from Kafka.

Introduction

This tutorial will help you understand how to use two components designed to communicate with Apache Kafka: KafkaConnection and KafkaMessageQueue. The first connects to Apache Kafka and provides a publish/subscribe communication mechanism. The second creates a queue that can send messages to and receive messages from Kafka. The main methods of both classes are explained with examples.

Communicating with Kafka

Pip.Services offers several components to connect to Kafka and to send and receive messages through it. In this section, we look at the KafkaConnection and KafkaMessageQueue classes and their most important methods.

KafkaConnection

The KafkaConnection class allows us to define a connection to Kafka. Its main advantage is that this connection can be shared by multiple message queues, which reduces the number of connections in use. The main operations offered by this class are explained below.

Connect to Kafka

To connect to Kafka, we first need to configure the connection. For this, we can use the ConfigParams class, which lets us specify connection parameters such as the host and port. Once the parameters are defined, we use the configure() method to apply them and the open() method to open the connection.

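from pip_services3_kafka.connect import KafkaConnection
from pip_services3_commons.config import ConfigParams

# Connection parameters: host and port of the Kafka broker
kc = KafkaConnection()
config = ConfigParams.from_tuples('connection.host', 'localhost', 'connection.port', 9092)

kc.configure(config)  # apply the configuration
kc.open(None)         # open the connection (None = no correlation ID)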
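Hard-coding the host and port works for a tutorial. The following is a minimal sketch that assumes you want to read them from environment variables instead. The function kafka_connection_tuples and the variable names KAFKA_HOST and KAFKA_PORT are hypothetical choices for this example, not part of the library. The function builds the alternating key/value sequence that ConfigParams.from_tuples() expects.

```python
import os
from typing import Tuple


def kafka_connection_tuples(default_host: str = "localhost",
                            default_port: int = 9092) -> Tuple:
    """Build the key/value sequence passed to ConfigParams.from_tuples().

    KAFKA_HOST and KAFKA_PORT are hypothetical variable names used in this sketch.
    """
    host = os.environ.get("KAFKA_HOST", default_host)
    port = int(os.environ.get("KAFKA_PORT", default_port))
    return ("connection.host", host, "connection.port", port)


# Usage with the library (not executed here):
# config = ConfigParams.from_tuples(*kafka_connection_tuples())
# kc.configure(config)
```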
Check if a connection is open

To check whether a connection is open, we can use the is_open() method, which returns True if the component is open and False otherwise. The example below illustrates its usage.

kc.is_open()

This class also provides the _check_open() method, which returns None if the connection is open and raises an error if it is closed. The example below shows how to use it.

kc._check_open()
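is_open() is useful for making setup code safe to run more than once. The helper below is a hypothetical sketch (open_if_closed is not a library function). It relies only on the is_open() and open(correlation_id) methods shown above, and it calls open() only when the component is not already open.

```python
from typing import Optional


def open_if_closed(component, correlation_id: Optional[str] = None) -> bool:
    """Open the component only if it is not open yet.

    Returns True if open() was called, False if the component was already open.
    """
    if component.is_open():
        return False
    component.open(correlation_id)
    return True


# Usage with the library (not executed here):
# open_if_closed(kc)
```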
Create a queue

To create a queue, this class offers the create_queue() method, which takes the queue's name as an input parameter. The following example shows how to use it.

kc.create_queue("my_queueA")
Delete a queue

To delete a queue, this class offers the delete_queue() method, which takes the queue's name as an input parameter. The example below shows how to use it.

kc.delete_queue('my_queueC')
List all available queues

To obtain a list of all available queues, this class has the read_queue_names() method. It returns a list with the queue names or, if the connection doesn't support this function, an empty list. The following example shows how to use it.

kc.read_queue_names()
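read_queue_names() and create_queue() can be combined so that setup code creates only the queues that are missing. The following is a hypothetical helper (ensure_queues is not part of the library) that uses only the two methods described above. If the connection does not support listing queues, read_queue_names() returns an empty list, so the helper treats every name as missing and calls create_queue() for each one.

```python
from typing import Iterable, List


def ensure_queues(connection, names: Iterable[str]) -> List[str]:
    """Create each queue in `names` that read_queue_names() does not report.

    Returns the names of the queues that were created.
    """
    existing = set(connection.read_queue_names())
    created: List[str] = []
    for name in names:
        if name not in existing:
            connection.create_queue(name)
            existing.add(name)  # avoid creating duplicates listed twice in `names`
            created.append(name)
    return created


# Usage with the library (not executed here):
# ensure_queues(kc, ["my_queueA", "my_queueB", "my_queueC"])
```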
Set the offset

The seek() method sets the offset of a partition to a specified value. It requires five input parameters, namely the topic name, group ID, partition, offset, and listener. The example below shows how to use it.

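# my_listener is the listener instance defined in the Subscribe section below
offset = 60
kc.seek("my_topicA", None, 0, offset, my_listener)  # topic, group ID, partition, offset, listener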
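To make the effect of seek() more concrete, here is a toy, in-memory model of a single partition. It is an illustration only, not Kafka and not the library. A partition is an append-only log, a message's offset is its position in that log, and seeking to offset n means the next message delivered is the one stored at position n. Unlike real Kafka, this toy simply rejects offsets past the end of the log.

```python
from typing import List, Tuple


class ToyPartition:
    """In-memory stand-in for one partition (illustration only)."""

    def __init__(self) -> None:
        self._log: List[str] = []
        self._position = 0  # offset of the next message to deliver

    def append(self, value: str) -> int:
        """Append a message and return the offset it was stored at."""
        self._log.append(value)
        return len(self._log) - 1

    def seek(self, offset: int) -> None:
        """Move the read position, analogous to seeking a real partition."""
        if not 0 <= offset <= len(self._log):
            raise ValueError(f"offset {offset} is out of range")
        self._position = offset

    def poll(self) -> List[Tuple[int, str]]:
        """Return all (offset, value) pairs from the current position onwards."""
        batch = [(i, self._log[i]) for i in range(self._position, len(self._log))]
        self._position = len(self._log)
        return batch


p = ToyPartition()
for i in range(5):
    p.append(f"message {i}")
p.poll()              # consume everything once
p.seek(3)             # rewind to offset 3
replayed = p.poll()   # messages at offsets 3 and 4 are delivered again
```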
Publish

To publish messages to a topic, this class has the publish() method, which accepts the topic name, a list of messages, and a dictionary of options (empty in this example). The following example shows its usage.

list_of_messages1 = ["message 1", "message 2"]
kc.publish("my_topicA", list_of_messages1, {})
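When a list of messages grows large, you may prefer to send it in several smaller publish() calls. The following is a hypothetical helper (publish_in_batches is not a library function). It calls publish(topic, messages, options) with an empty options dictionary, exactly as in the example above. The batch size is an arbitrary choice for this sketch.

```python
from typing import Any, List


def publish_in_batches(connection, topic: str, messages: List[Any],
                       batch_size: int = 100) -> int:
    """Publish messages in chunks of at most batch_size; return the number of calls."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    calls = 0
    for start in range(0, len(messages), batch_size):
        connection.publish(topic, messages[start:start + batch_size], {})
        calls += 1
    return calls


# Usage with the library (not executed here):
# publish_in_batches(kc, "my_topicA", ["message 1", "message 2", "message 3"], batch_size=2)
```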
Subscribe

To subscribe to a topic, this class offers the subscribe() method, which accepts four parameters, namely the name of a topic, a group ID, a dictionary containing options such as timeout and compression, and a listener.

The listener must implement the IKafkaMessageListener interface, which contains the on_message() method. This method receives three parameters: the topic, the partition, and the message object. The example below shows how to use it.

from pip_services3_kafka.connect import IKafkaMessageListener

class MyListener(IKafkaMessageListener):

    def on_message(self, topic, partition, message):
        # Print the message payload (bytes decoded as UTF-8) and its offset
        print(message.value().decode('utf-8'), message.offset())

my_listener = MyListener()
kc.subscribe("my_topicA", None, {}, my_listener)
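Printing is fine for a demo. When testing, it is often handier to keep what arrives. The listener below is a hypothetical sketch that stores each message as a (topic, partition, offset, text) tuple. In real code it would inherit from IKafkaMessageListener, as in the example above. The base class is left out here only so the sketch runs without pip_services3_kafka installed. Like the example, it assumes message objects expose value() and offset(). The lock guards the list in case on_message() is called from a background thread.

```python
import threading
from typing import List, Tuple


class CollectingListener:
    """Listener sketch that records every message it receives."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.received: List[Tuple[str, int, int, str]] = []

    def on_message(self, topic: str, partition: int, message) -> None:
        text = message.value().decode("utf-8")
        with self._lock:
            self.received.append((topic, partition, message.offset(), text))


# Usage with the library (not executed here):
# collector = CollectingListener()
# kc.subscribe("my_topicA", None, {}, collector)
```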
Unsubscribe

To unsubscribe, this class offers the unsubscribe() method, which takes the topic name, group ID, and listener as input parameters. The following example shows how to use it.

kc.unsubscribe("my_topicA", None, my_listener)
Close the connection

To close an open connection, we can use the close() method, which is used as follows:

kc.close(None)
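Because every subscribe() should eventually be matched by an unsubscribe(), one option is to wrap the pair in a context manager so the listener is removed even if an error occurs. The following is a hypothetical sketch (subscription is not a library function). It uses only subscribe() and unsubscribe() with the argument order shown above, and the commented usage pairs it with close() in a finally block.

```python
from contextlib import contextmanager


@contextmanager
def subscription(connection, topic, listener, group_id=None, options=None):
    """Subscribe for the duration of a with-block, then always unsubscribe."""
    connection.subscribe(topic, group_id, options or {}, listener)
    try:
        yield connection
    finally:
        connection.unsubscribe(topic, group_id, listener)


# Usage with the library (not executed here):
# kc.open(None)
# try:
#     with subscription(kc, "my_topicA", my_listener):
#         ...  # messages are delivered to my_listener here
# finally:
#     kc.close(None)
```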
Example

The following example shows how to use this class to create a connection and a set of queues, publish messages to Kafka, and subscribe to the topics containing the published messages.

from pip_services3_kafka.connect import KafkaConnection, IKafkaMessageListener
from pip_services3_commons.config import ConfigParams

# Configure and open the connection
kc = KafkaConnection()
config = ConfigParams.from_tuples('connection.host', 'localhost', 'connection.port', 9092)

kc.configure(config)
kc.open(None)

# Create three queues
kc.create_queue("my_queueA")
kc.create_queue("my_queueB")
kc.create_queue("my_queueC")

# Publish two messages to each topic
list_of_messages1 = ["message 1", "message 2"]
list_of_messages2 = ["message 3", "message 4"]
list_of_messages3 = ["message 5", "message 6"]

kc.publish("my_topicA", list_of_messages1, {})
kc.publish("my_topicB", list_of_messages2, {})
kc.publish("my_topicC", list_of_messages3, {})

# Listener that prints each message's payload and offset
class MyListener(IKafkaMessageListener):
    def on_message(self, topic, partition, message):
        print(message.value().decode('utf-8'), message.offset())

my_listener = MyListener()

# Subscribe the same listener to all three topics
kc.subscribe("my_topicA", None, {}, my_listener)
kc.subscribe("my_topicB", None, {}, my_listener)
kc.subscribe("my_topicC", None, {}, my_listener)

print("Execution completed!")
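The example above prints "Execution completed!" right after subscribing, so a standalone script may finish before every message has been delivered. One way to wait is a listener that sets a threading.Event once an expected number of messages has arrived. The following is a hypothetical sketch (CountingListener is not part of the library). In real code it would inherit from IKafkaMessageListener, and the expected count of 6 in the usage comment simply matches the six messages published above.

```python
import threading
from typing import List


class CountingListener:
    """Listener sketch that signals `done` after `expected` messages."""

    def __init__(self, expected: int) -> None:
        self.expected = expected
        self.messages: List[str] = []
        self.done = threading.Event()
        self._lock = threading.Lock()

    def on_message(self, topic, partition, message) -> None:
        with self._lock:
            self.messages.append(message.value().decode("utf-8"))
            if len(self.messages) >= self.expected:
                self.done.set()


# Usage with the library (not executed here):
# listener = CountingListener(expected=6)
# for topic in ("my_topicA", "my_topicB", "my_topicC"):
#     kc.subscribe(topic, None, {}, listener)
# if not listener.done.wait(timeout=10):
#     print("Timed out waiting for messages")
```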

After running it, we get an output similar to the following:

figure 1

KafkaMessageQueue

The KafkaMessageQueue component allows us to create message queues that send and receive messages via the Kafka message broker. This class has several methods that allow us to create a queue, send and receive messages, remove a message from a queue, close the queue and more. Below is an explanation with examples of its main methods.

Create a Kafka message queue

To create a queue, we first need to configure it. We can use the ConfigParams component to define the queue's parameters, such as the topic, and add the connection parameters to the same configuration.

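from pip_services3_commons.config import ConfigParams
from pip_services3_kafka.queues import KafkaMessageQueue

# The queue is named "myqueue" and uses the Kafka topic "mytopic"
queue = KafkaMessageQueue("myqueue")
queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic",
    "connection.protocol", "tcp",
    "connection.host", "localhost",
    "connection.port", 9092,
))

queue.open("123")  # "123" is the correlation ID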
Send a message to Kafka

To send a message to Kafka, we can use the send() method, which takes a MessageEnvelope as its message parameter. A message envelope stores the message as a byte buffer. If a string is passed, it is converted to bytes using UTF-8 encoding.

from pip_services3_messaging.queues import MessageEnvelope

# MessageEnvelope(correlation_id, message_type, message)
queue.send("123", MessageEnvelope(None, "mymessage", "ABC"))
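Since the envelope carries UTF-8 bytes, structured data needs to be turned into text first. One option, chosen for this sketch and not required by the library, is JSON. The hypothetical helpers to_payload and from_payload show the round trip in plain Python. The commented usage relies only on the send(), receive(), and envelope .message attribute already used in this tutorial.

```python
import json
from typing import Any


def to_payload(obj: Any) -> bytes:
    """Serialize obj to JSON text and encode it as UTF-8 bytes."""
    return json.dumps(obj).encode("utf-8")


def from_payload(buffer: bytes) -> Any:
    """Decode UTF-8 bytes and parse the JSON text they contain."""
    return json.loads(buffer.decode("utf-8"))


# Usage with the library (not executed here):
# queue.send("123", MessageEnvelope(None, "order", json.dumps({"id": 1})))
# envelope = queue.receive("123", 10000)
# order = from_payload(envelope.message)
```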
Receive a message from Kafka

To receive an incoming message, we can use the receive() method. It waits up to the specified timeout (in milliseconds) for a message and removes the message from the queue once it has been received.

message = queue.receive("123", 10000)  # wait up to 10,000 ms for a message

Alternatively, we can use the peek() method, which retrieves a single incoming message from the queue without removing it. If there is no message available in the queue, it returns None.

envelope2 = queue.peek(None)
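The difference between peek() and receive() can be illustrated with a toy, in-memory queue. This is not the library's implementation. It also simplifies receive(): the real method waits up to a timeout, while the toy returns at once. peek() looks at the oldest message and leaves it in place, whereas receive() takes it out. Both return None when the queue is empty.

```python
from collections import deque
from typing import Deque, Optional


class ToyQueue:
    """In-memory illustration of peek() vs receive() semantics."""

    def __init__(self) -> None:
        self._items: Deque[str] = deque()

    def send(self, message: str) -> None:
        self._items.append(message)

    def peek(self) -> Optional[str]:
        # Oldest message, left in the queue; None if the queue is empty
        return self._items[0] if self._items else None

    def receive(self) -> Optional[str]:
        # Oldest message, removed from the queue; None if the queue is empty
        return self._items.popleft() if self._items else None


q = ToyQueue()
q.send("ABC")
first_peek = q.peek()      # "ABC", still queued
second_peek = q.peek()     # "ABC" again
received = q.receive()     # "ABC", now removed
after_receive = q.peek()   # None
```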
Remove a message

To remove a message from a queue, we can use the complete() method, as follows:

queue.complete(message)
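receive() and complete() are often used together in a processing loop: take a message, handle it, and mark it as completed only after handling succeeds. The following is a hypothetical sketch (drain is not a library function). It assumes that receive(correlation_id, wait_timeout) returns None when no message arrives within the timeout, and it stops at that point.

```python
from typing import Callable, Optional


def drain(queue, handler: Callable, correlation_id: Optional[str] = None,
          wait_timeout: int = 1000) -> int:
    """Process messages until none arrives within wait_timeout ms.

    complete() is called only after handler() returns without raising.
    Returns the number of messages handled.
    """
    handled = 0
    while True:
        message = queue.receive(correlation_id, wait_timeout)
        if message is None:
            return handled
        handler(message)
        queue.complete(message)
        handled += 1


# Usage with the library (not executed here):
# drain(queue, lambda envelope: print(envelope.message.decode("utf-8")))
```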
Close the connection

To close a queue and free the resources it uses, we can use the close() method, as follows:

queue.close("123")
Example

We can now assemble everything in one example. First, we create a custom message receiver, which handles incoming messages according to our needs. This class implements the IMessageReceiver interface, whose receive_message() method is called for each incoming message. It also implements the ICleanable interface, which requires a clear() method for removing the stored messages.

Then, we define our connection and create an instance of our message receiver. We use the begin_listen() method, defined by the IMessageQueue interface, to start listening, and then send a message.

We then wait until the message arrives, capture it, and print whether the received message equals the sent one. Finally, we stop listening with the end_listen() method and close the queue.

Our final code will look like this:

import threading
import time
from typing import List, Optional
from pip_services3_commons.config import ConfigParams
from pip_services3_commons.run import ICleanable
from pip_services3_messaging.queues import IMessageReceiver, MessageEnvelope, IMessageQueue
from pip_services3_kafka.queues import KafkaMessageQueue

class MyMessageReceiver(IMessageReceiver, ICleanable):

    def __init__(self):
        self.__messages: List[MessageEnvelope] = []
        self.__lock = threading.Lock()

    @property
    def messages(self) -> List[MessageEnvelope]:
        return self.__messages

    @property
    def message_count(self) -> int:
        return len(self.__messages)

    def receive_message(self, message: MessageEnvelope, queue: IMessageQueue):
        with self.__lock:
            self.__messages.append(message)

    def clear(self, correlation_id: Optional[str]):
        with self.__lock:
            self.__messages = []
                      
queue = KafkaMessageQueue("myqueue2")
queue.configure(ConfigParams.from_tuples(
    "topic", "mytopic2",
    'connection.protocol', 'tcp',
    "connection.host", "localhost",
    "connection.port", 9092,
))
queue.open(None)

message_receiver = MyMessageReceiver()
queue.begin_listen(None, message_receiver)

envelope1 = MessageEnvelope("123", "Test", "Test message")
queue.send(None, envelope1)

# await message
for i in range(15):
    if len(message_receiver.messages) > 0:
        break
    time.sleep(0.5)

envelope2 = message_receiver.messages[0]

print(envelope1.message.decode('utf-8') == envelope2.message.decode('utf-8'))

queue.end_listen(None)

queue.close(None)
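The fixed loop of 15 checks, 0.5 seconds apart, can be factored into a small polling helper that reports whether the condition was met. The following is a hypothetical sketch (wait_until is not part of the library). Its default timeout of 7.5 seconds matches the 15 × 0.5 s of the loop above. With it, a missing message becomes an explicit timeout instead of an IndexError on messages[0].

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool], timeout: float = 7.5,
               interval: float = 0.5) -> bool:
    """Poll condition() every `interval` seconds for up to `timeout` seconds.

    Returns True as soon as the condition holds, False if the timeout elapses.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Usage in the example above (not executed here):
# if not wait_until(lambda: message_receiver.message_count > 0):
#     raise TimeoutError("No message received")
# envelope2 = message_receiver.messages[0]
```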

If the code runs successfully, we get the following output:

figure 2

Wrapping up

In this tutorial, we have seen how to use two components that offer different ways of communicating with Kafka. The first is KafkaConnection, whose main advantage is that one connection can be shared by multiple queues and used through the publish/subscribe mechanism. The second is KafkaMessageQueue, which allows us to create a queue and send messages to and receive messages from Kafka.