import (
"context"
"fmt"
"sync"
"time"
kafka "github.com/Shopify/sarama"
conf "github.com/pip-services3-gox/pip-services3-commons-gox/config"
kafkaconn "github.com/pip-services3-gox/pip-services3-kafka-gox/connect"
)
type MyListener struct {
ready chan bool
Lock sync.Mutex
}
func NewMyListener() *MyListener {
c := &MyListener{}
c.ready = make(chan bool)
return c
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *MyListener) Setup(kafka.ConsumerGroupSession) error {
// Mark the consumer as ready
c.ready <- true
close(c.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// but before the offsets are committed for the very last time.
func (c *MyListener) Cleanup(kafka.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (c *MyListener) ConsumeClaim(session kafka.ConsumerGroupSession, claim kafka.ConsumerGroupClaim) error {
for {
// if len(c.readablePartitions) == 0 || slices.Contains(c.readablePartitions, claim.Partition()) {
select {
case msg := <-claim.Messages():
if msg != nil {
fmt.Printf("%s, %s", msg.Topic, msg.Value)
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
}
// }
}
}
func (c *MyListener) SetReady(chFlag chan bool) {
c.Lock.Lock()
defer c.Lock.Unlock()
c.ready = chFlag
}
// Returns: channel with bool flag ready
func (c *MyListener) Ready() chan bool {
return c.ready
}
myListener := NewMyListener()
kc.Subscribe(context.Background(), "my_topicA", "", kafka.NewConfig(), myListener)