diff --git a/consumer.go b/consumer.go index 5c5677b..1af6205 100644 --- a/consumer.go +++ b/consumer.go @@ -29,22 +29,23 @@ func NewConsumerByConfig(c ConsumerConfig, ackOnConsume bool) (*Consumer, error) return NewConsumer(reader, ackOnConsume) } -func (c *Consumer) Consume(ctx context.Context, caller mq.ConsumerCaller) { +func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq.Message, error) error) { for { msg, err := c.Reader.FetchMessage(ctx) if err != nil { - caller.Call(ctx, nil, err) + handle(ctx, nil, err) + } else { + attributes := HeaderToMap(msg.Headers) + message := mq.Message{ + Id: string(msg.Key), + Data: msg.Value, + Attributes: attributes, + Raw: msg, + } + if c.AckOnConsume { + c.Reader.CommitMessages(ctx, msg) + } + handle(ctx, &message, nil) } - attributes := HeaderToMap(msg.Headers) - message := mq.Message{ - Id: string(msg.Key), - Data: msg.Value, - Attributes: attributes, - Raw: msg, - } - if c.AckOnConsume { - c.Reader.CommitMessages(ctx, msg) - } - caller.Call(ctx, &message, nil) } } diff --git a/consumer_config.go b/consumer_config.go index 709d3f4..e47b589 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -3,11 +3,11 @@ package kafka import "time" type ConsumerConfig struct { - Brokers []string `mapstructure:"brokers"` - GroupID string `mapstructure:"group_id"` - Topic string `mapstructure:"topic"` - Client ClientConfig `mapstructure:"client"` - MinBytes *int `mapstructure:"min_bytes"` - MaxBytes int `mapstructure:"max_bytes"` - CommitInterval time.Duration `mapstructure:"commit_interval"` + Brokers []string `mapstructure:"brokers"` + GroupID string `mapstructure:"group_id"` + Topic string `mapstructure:"topic"` + Client ClientConfig `mapstructure:"client"` + MinBytes *int `mapstructure:"min_bytes"` + MaxBytes int `mapstructure:"max_bytes"` + CommitInterval *time.Duration `mapstructure:"commit_interval"` } diff --git a/producer.go b/producer.go index 7bca1b4..6121616 100644 --- a/producer.go +++ b/producer.go @@ -30,10 +30,10 @@ func NewProducerByConfig(c ProducerConfig, generateKey bool) (*Producer, error) return NewProducer(writer, generateKey) } -func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes *map[string]string) (string, error) { +func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes map[string]string) (string, error) { msg := kafka.Message{Value: data} if messageAttributes != nil { - msg.Headers = MapToHeader(*messageAttributes) + msg.Headers = MapToHeader(messageAttributes) } if p.Key { id := strings.Replace(uuid.New().String(), "-", "", -1) diff --git a/reader.go b/reader.go index d118ef7..021379f 100644 --- a/reader.go +++ b/reader.go @@ -4,13 +4,13 @@ import "github.com/segmentio/kafka-go" func NewReader(c ConsumerConfig, dialer *kafka.Dialer) *kafka.Reader { c2 := kafka.ReaderConfig{ - Brokers: c.Brokers, - GroupID: c.GroupID, - Topic: c.Topic, - Dialer: dialer, + Brokers: c.Brokers, + GroupID: c.GroupID, + Topic: c.Topic, + Dialer: dialer, } - if c.CommitInterval > 0 { - c2.CommitInterval = c.CommitInterval + if c.CommitInterval != nil { + c2.CommitInterval = *c.CommitInterval } if c.MinBytes != nil && *c.MinBytes >= 0 { c2.MinBytes = *c.MinBytes diff --git a/writer.go b/writer.go index c703794..6ae6787 100644 --- a/writer.go +++ b/writer.go @@ -1,8 +1,6 @@ package kafka -import ( - "github.com/segmentio/kafka-go" -) +import "github.com/segmentio/kafka-go" func NewWriter(topic string, brokers []string, dialer *kafka.Dialer) *kafka.Writer { writer := kafka.NewWriter(kafka.WriterConfig{