Skip to content

Commit

Permalink
Refactor kafka wrapper API: replace mq.ConsumerCaller with a handler func in Consume, pass producer message attributes by value, and make ConsumerConfig.CommitInterval an optional pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
huydatabc committed Jan 31, 2021
1 parent 54ffe70 commit 17a4871
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 31 deletions.
27 changes: 14 additions & 13 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,23 @@ func NewConsumerByConfig(c ConsumerConfig, ackOnConsume bool) (*Consumer, error)
return NewConsumer(reader, ackOnConsume)
}

func (c *Consumer) Consume(ctx context.Context, caller mq.ConsumerCaller) {
func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq.Message, error) error) {
for {
msg, err := c.Reader.FetchMessage(ctx)
if err != nil {
caller.Call(ctx, nil, err)
handle(ctx, nil, err)
} else {
attributes := HeaderToMap(msg.Headers)
message := mq.Message{
Id: string(msg.Key),
Data: msg.Value,
Attributes: attributes,
Raw: msg,
}
if c.AckOnConsume {
c.Reader.CommitMessages(ctx, msg)
}
handle(ctx, &message, nil)
}
attributes := HeaderToMap(msg.Headers)
message := mq.Message{
Id: string(msg.Key),
Data: msg.Value,
Attributes: attributes,
Raw: msg,
}
if c.AckOnConsume {
c.Reader.CommitMessages(ctx, msg)
}
caller.Call(ctx, &message, nil)
}
}
14 changes: 7 additions & 7 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package kafka
import "time"

// ConsumerConfig holds the settings used to build a kafka.Reader for a
// consumer group subscription (see NewReader).
type ConsumerConfig struct {
	Brokers        []string       `mapstructure:"brokers"`  // bootstrap broker addresses
	GroupID        string         `mapstructure:"group_id"` // consumer group id
	Topic          string         `mapstructure:"topic"`    // topic to subscribe to
	Client         ClientConfig   `mapstructure:"client"`   // dialer/client settings (project type)
	MinBytes       *int           `mapstructure:"min_bytes"`
	MaxBytes       int            `mapstructure:"max_bytes"`
	// CommitInterval is optional; when nil the reader's default commit
	// behavior is left unchanged (see NewReader, which only sets it when
	// non-nil).
	CommitInterval *time.Duration `mapstructure:"commit_interval"`
}
4 changes: 2 additions & 2 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func NewProducerByConfig(c ProducerConfig, generateKey bool) (*Producer, error)
return NewProducer(writer, generateKey)
}

func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes *map[string]string) (string, error) {
func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes map[string]string) (string, error) {
msg := kafka.Message{Value: data}
if messageAttributes != nil {
msg.Headers = MapToHeader(*messageAttributes)
msg.Headers = MapToHeader(messageAttributes)
}
if p.Key {
id := strings.Replace(uuid.New().String(), "-", "", -1)
Expand Down
12 changes: 6 additions & 6 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import "github.com/segmentio/kafka-go"

func NewReader(c ConsumerConfig, dialer *kafka.Dialer) *kafka.Reader {
c2 := kafka.ReaderConfig{
Brokers: c.Brokers,
GroupID: c.GroupID,
Topic: c.Topic,
Dialer: dialer,
Brokers: c.Brokers,
GroupID: c.GroupID,
Topic: c.Topic,
Dialer: dialer,
}
if c.CommitInterval > 0 {
c2.CommitInterval = c.CommitInterval
if c.CommitInterval != nil {
c2.CommitInterval = *c.CommitInterval
}
if c.MinBytes != nil && *c.MinBytes >= 0 {
c2.MinBytes = *c.MinBytes
Expand Down
4 changes: 1 addition & 3 deletions writer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package kafka

import (
"github.com/segmentio/kafka-go"
)
import "github.com/segmentio/kafka-go"

func NewWriter(topic string, brokers []string, dialer *kafka.Dialer) *kafka.Writer {
writer := kafka.NewWriter(kafka.WriterConfig{
Expand Down

0 comments on commit 17a4871

Please sign in to comment.