diff --git a/kafka_health_checker.go b/kafka_health_checker.go index da17660..bb18e90 100644 --- a/kafka_health_checker.go +++ b/kafka_health_checker.go @@ -12,18 +12,24 @@ type KafkaHealthChecker struct { Timeout int64 } -func NewKafkaHealthChecker(brokers []string, options ...string) *KafkaHealthChecker { +func NewHealthChecker(brokers []string, options ...string) *KafkaHealthChecker { var name string if len(options) >= 1 && len(options[0]) > 0 { name = options[0] } else { name = "kafka" } - return NewKafkaHealthCheckerWithTimeout(brokers, name, 4) + return NewKafkaHealthChecker(brokers, name, 4) } -func NewKafkaHealthCheckerWithTimeout(brokers []string, name string, timeout int64) *KafkaHealthChecker { - return &KafkaHealthChecker{brokers, name, timeout} +func NewKafkaHealthChecker(brokers []string, name string, timeouts ...int64) *KafkaHealthChecker { + var timeout int64 + if len(timeouts) >= 1 { + timeout = timeouts[0] + } else { + timeout = 4 + } + return &KafkaHealthChecker{Brokers: brokers, Service: name, Timeout: timeout} } func (s *KafkaHealthChecker) Name() string { diff --git a/producer.go b/producer.go index 6121616..2ca2d24 100644 --- a/producer.go +++ b/producer.go @@ -20,7 +20,11 @@ func NewProducer(writer *kafka.Writer, generateKey bool) (*Producer, error) { return &Producer{Writer: writer, Key: generateKey}, nil } -func NewProducerByConfig(c ProducerConfig, generateKey bool) (*Producer, error) { +func NewProducerByConfig(c ProducerConfig) (*Producer, error) { + generateKey := true + if c.Key != nil { + generateKey = *c.Key + } dialer := GetDialer(c.Client.Username, c.Client.Password, scram.SHA512, &kafka.Dialer{ Timeout: 30 * time.Second, DualStack: true, diff --git a/producer_config.go b/producer_config.go index 240ab19..cb633b8 100644 --- a/producer_config.go +++ b/producer_config.go @@ -4,4 +4,5 @@ type ProducerConfig struct { Brokers []string `mapstructure:"brokers"` Topic string `mapstructure:"topic"` Client ClientConfig `mapstructure:"client"` + Key *bool `mapstructure:"key"` }