From 715202094891b7c26987f16048e59e79effe9ef5 Mon Sep 17 00:00:00 2001 From: Matt Robenolt Date: Tue, 9 Jun 2015 19:02:03 -0700 Subject: [PATCH] kafka: Add `topic_prefix` and escape topic names These are especially useful when paired with `topic_variable`. Since the variable is being extracted from a field, we can't guarantee that the characters being used for the topic name are valid, so we implement a slightly modified version of stdlib's `QueryEncode` which sanitizes the name to the range of valid topic chars. Example: ```toml [KafkaOutput] topic_prefix = "heka-" topic_variable = "Fields[ContainerName]" ``` --- plugins/kafka/kafka_output.go | 57 ++++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/plugins/kafka/kafka_output.go b/plugins/kafka/kafka_output.go index 68311d26c..3dac890ca 100644 --- a/plugins/kafka/kafka_output.go +++ b/plugins/kafka/kafka_output.go @@ -48,6 +48,7 @@ type KafkaOutputConfig struct { HashVariable string `toml:"hash_variable"` // HashPartitioner key is extracted from a message variable TopicVariable string `toml:"topic_variable"` // Topic extracted from a message variable Topic string // Static topic + TopicPrefix string `toml:"topic_prefix"` // String to be prepended to the topic RequiredAcks string `toml:"required_acks"` // NoResponse, WaitForLocal, WaitForAll Timeout uint32 @@ -317,9 +318,10 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er go k.processKafkaErrors(or, errChan, &wg) var ( - pack *pipeline.PipelinePack - topic = k.config.Topic - key sarama.Encoder + pack *pipeline.PipelinePack + topic = k.config.Topic + prefix = k.config.TopicPrefix + key sarama.Encoder ) for pack = range inChan { @@ -334,7 +336,7 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er if msgBytes, err := or.Encode(pack); err == nil { if msgBytes != nil { - err = k.producer.QueueMessage(topic, key, sarama.ByteEncoder(msgBytes)) + err = k.producer.QueueMessage(topicEscape(prefix+topic), key, sarama.ByteEncoder(msgBytes)) if err != nil { atomic.AddInt64(&k.processMessageFailures, 1) or.LogError(err) @@ -371,6 +373,53 @@ func (k *KafkaOutput) CleanupForRestart() { return } +// Escapes a topic name by encoding any non-valid kafka topic name +func topicEscape(s string) string { + hexCount := 0 + for i := 0; i < len(s); i++ { + if shouldEscape(s[i]) { + hexCount++ + } + } + + if hexCount == 0 { + return s + } + + t := make([]byte, len(s)+2*hexCount) + j := 0 + for i := 0; i < len(s); i++ { + c := s[i] + if shouldEscape(c) { + t[j] = '-' + t[j+1] = "0123456789ABCDEF"[c>>4] + t[j+2] = "0123456789ABCDEF"[c&15] + j += 3 + } else { + t[j] = s[i] + j++ + } + } + return string(t) +} + +func shouldEscape(c byte) bool { + // List of valid kafka topic characters can be found at: + // https://github.com/apache/kafka/blob/43b92f8b1ce8140c432edf11b0c842f5fbe04120/core/src/main/scala/kafka/common/Topic.scala#L25 + // val legalChars = "[a-zA-Z0-9\\._\\-]" + if 'A' <= c && c <= 'Z' || 'a' <= c && c <= 'z' || '0' <= c && c <= '9' { + return false + } + + switch c { + case '.', '_', '-': + return false + } + + // Everything else must be escaped. + return true +} + func init() { pipeline.RegisterPlugin("KafkaOutput", func() interface{} { return new(KafkaOutput)