import (
"context"
"fmt"
"sync"
"time"
kafka "github.com/Shopify/sarama"
conf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
kafkaconn "github.com/pip-services3-gox/pip-services3-kafka-gox/connect"
)
func main() {
ctx := context.Background()
kc := kafkaconn.NewKafkaConnection()
config := conf.NewConfigParamsFromTuples(
"connection.host", "localhost",
"connection.port", 9092,
)
kc.Configure(context.Background(), config)
kc.Open(context.Background(), "")
err := kc.CreateQueue("my_queueA")
if err != nil {
panic(err)
}
err = kc.DeleteQueue("my_queueA")
if err != nil {
panic(err)
}
_, _ = kc.ReadQueueNames()
var listOfMessages1 []*kafka.ProducerMessage
var listOfMessages2 []*kafka.ProducerMessage
var listOfMessages3 []*kafka.ProducerMessage
for i, val := range []string{"message 1", "message 2"} {
msg := &kafka.ProducerMessage{}
msg.Key = kafka.StringEncoder(rune(i))
msg.Value = kafka.ByteEncoder(val)
msg.Timestamp = time.Now()
listOfMessages1 = append(listOfMessages1, msg)
}
for i, val := range []string{"message 3", "message 4"} {
msg := &kafka.ProducerMessage{}
msg.Key = kafka.StringEncoder(rune(i))
msg.Value = kafka.ByteEncoder(val)
msg.Timestamp = time.Now()
listOfMessages2 = append(listOfMessages2, msg)
}
for i, val := range []string{"message 5", "message 6"} {
msg := &kafka.ProducerMessage{}
msg.Key = kafka.StringEncoder(rune(i))
msg.Value = kafka.ByteEncoder(val)
msg.Timestamp = time.Now()
listOfMessages3 = append(listOfMessages3, msg)
}
err = kc.Publish(ctx, "my_topicA", listOfMessages1)
if err != nil {
panic(err)
}
err = kc.Publish(ctx, "my_topicB", listOfMessages2)
if err != nil {
panic(err)
}
err = kc.Publish(ctx, "my_topicC", listOfMessages3)
if err != nil {
panic(err)
}
myListener := NewMyListener()
err = kc.Subscribe(ctx, "my_topicA", "test", kafka.NewConfig(), myListener)
if err != nil {
panic(err)
}
err = kc.Subscribe(ctx, "my_topicB", "test", kafka.NewConfig(), myListener)
if err != nil {
panic(err)
}
err = kc.Subscribe(ctx, "my_topicC", "test", kafka.NewConfig(), myListener)
if err != nil {
panic(err)
}
<- time.After(1000 * time.Microsecond)
err = kc.Close(ctx, "")
if err != nil {
panic(err)
}
fmt.Println("Execution completed!")
}
// MyListener implements sarama's ConsumerGroupHandler interface
// (Setup/Cleanup/ConsumeClaim) and is passed to Subscribe to receive
// messages from the example topics.
type MyListener struct {
	// ready signals session readiness: Setup sends one true value and then
	// closes the channel; see Setup, SetReady and Ready.
	ready chan bool
	// Lock guards replacement of the ready channel in SetReady.
	// NOTE(review): Ready reads c.ready without taking this lock — verify
	// callers never race SetReady against Ready.
	Lock sync.Mutex
}
// NewMyListener creates a listener with a freshly allocated, unbuffered
// readiness channel.
func NewMyListener() *MyListener {
	return &MyListener{ready: make(chan bool)}
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *MyListener) Setup(kafka.ConsumerGroupSession) error {
	// Mark the consumer as ready
	// NOTE(review): this send blocks until some goroutine receives from
	// Ready(); if no reader exists, Setup never returns — confirm a reader
	// is always waiting. Also, if a rebalance starts a second session and
	// SetReady has not swapped in a fresh channel, the close below panics
	// (close of already-closed channel).
	c.ready <- true
	close(c.ready)
	return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited but before the offsets are committed for the very last time.
// This example has no per-session state to release, so it is a no-op.
func (c *MyListener) Cleanup(kafka.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *MyListener) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			// When Messages() is closed the receive yields (nil, false).
			// The original tested only msg != nil, so after channel close it
			// busy-spun on nil receives until the session context was
			// cancelled; sarama's contract requires returning here.
			if !ok {
				return nil
			}
			if msg != nil {
				fmt.Printf("%s, %s", msg.Topic, msg.Value)
			}
		// Should return when `session.Context()` is done.
		// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
		// https://github.com/Shopify/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		}
	}
}
// SetReady swaps in a replacement readiness channel (e.g. a fresh one for a
// new consumer session). The swap is serialized through Lock.
func (c *MyListener) SetReady(chFlag chan bool) {
	c.Lock.Lock()
	c.ready = chFlag
	c.Lock.Unlock()
}
// Ready returns the channel with the bool ready flag: Setup sends true on it
// and then closes it once the consumer session is established.
func (c *MyListener) Ready() chan bool {
	// Take the lock: SetReady replaces c.ready under Lock, so an unguarded
	// read here was a data race (go test -race would flag it).
	c.Lock.Lock()
	defer c.Lock.Unlock()
	return c.ready
}