from pip_services3_kafka.connect import KafkaConnection
from pip_services3_commons.config import ConfigParams
from pip_services3_kafka.connect import IKafkaMessageListener

kc = KafkaConnection()
config = ConfigParams.from_tuples(
    'connection.host', 'localhost',
    'connection.port', 9092
)

kc.configure(config)
kc.open(None)

kc.create_queue("my_queueA")
kc.create_queue("my_queueB")
kc.create_queue("my_queueC")

list_of_messages1 = [{'value': "message 1"}, {'value': "message 2"}]
list_of_messages2 = [{'value': "message 3"}, {'value': "message 4"}]
list_of_messages3 = [{'value': "message 5"}, {'value': "message 6"}]

kc.publish("my_topicA", list_of_messages1, {})
kc.publish("my_topicB", list_of_messages2, {})
kc.publish("my_topicC", list_of_messages3, {})

class MyListener(IKafkaMessageListener):
    def on_message(self, topic, partition, message):
        print(message.value().decode('utf-8'), message.offset())

my_listener = MyListener()

kc.subscribe("my_topicA", None, {}, my_listener)
kc.subscribe("my_topicB", None, {}, my_listener)
kc.subscribe("my_topicC", None, {}, my_listener)

kc.close(None)

print("Execution completed!")