using System.Text;

using Confluent.Kafka;

using PipServices3.Commons.Config;
using PipServices3.Kafka.Connect;


// Demo script: opens a Kafka connection to localhost:9092, creates three
// topics, attaches one listener to each of them, publishes one message per
// topic and finally closes the connection.
var kc = new KafkaConnection();
var config = ConfigParams.FromTuples(
    "connection.host", "localhost",
    "connection.port", 9092
);

kc.Configure(config);
await kc.OpenAsync(null);

try
{
    // A single list of names drives creation, subscription and publishing so
    // the topics that are created are the same ones that are actually used.
    string[] topics = { "my_topicA", "my_topicB", "my_topicC" };

    foreach (var topic in topics)
    {
        await kc.CreateQueueAsync(topic);
    }

    // Attach the listener before anything is published.
    // NOTE(review): with a null consumer config the subscription presumably
    // starts at the latest offset, so messages published before subscribing
    // would never reach the listener - confirm against the library defaults.
    var myListener = new MyListener();
    foreach (var topic in topics)
    {
        await kc.SubscribeAsync(topic, "My Computer", null, myListener);
    }

    // Publish "message 0", "message 1", "message 2" - one per topic.
    for (var i = 0; i < topics.Length; i++)
    {
        var message = new Message<byte[], byte[]>
        {
            Value = Encoding.ASCII.GetBytes($"message {i}")
        };
        await kc.PublishAsync(topics[i], message);
    }

    // NOTE(review): delivery to the listener is assumed to happen
    // asynchronously; closing right away would likely stop the consumers
    // before any callback runs. The grace period is a demo heuristic - tune
    // or replace it with an explicit completion signal as needed.
    var deliveryGracePeriod = TimeSpan.FromSeconds(5);
    await Task.Delay(deliveryGracePeriod);
}
finally
{
    // Always release the connection, even when one of the steps above throws.
    await kc.CloseAsync(null);
}


/// <summary>
/// Kafka message listener that writes the value of every received message
/// to the console.
/// </summary>
public class MyListener : IKafkaMessageListener
{
    /// <summary>
    /// Decodes the message value as ASCII text and prints it on its own line.
    /// Messages without a value are skipped instead of throwing.
    /// </summary>
    /// <param name="msg">The received Kafka message wrapper.</param>
    public void OnMessage(KafkaMessage msg)
    {
        // Encoding.GetString throws ArgumentNullException for a null buffer,
        // so guard against a missing wrapper, envelope or value.
        var payload = msg?.Message?.Value;
        if (payload is null)
        {
            return;
        }

        Console.WriteLine(Encoding.ASCII.GetString(payload));
    }
}