Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
kafka: Add topic_prefix and escape topic names
Browse files Browse the repository at this point in the history
These are especially useful when paired with `topic_variable`.
Since the variable is being extracted from a field, we can't guarantee
that the characters being used for the topic name are valid, so
we implement a slightly modified version of stdlib's `QueryEncode`
which sanitizes the name to the range of valid topic chars.

Example:

```toml
[KafkaOutput]
topic_prefix = "heka-"
topic_variable = "Fields[ContainerName]"
```
  • Loading branch information
mattrobenolt committed Jun 10, 2015
1 parent 32cadf8 commit 8a015b6
Showing 1 changed file with 53 additions and 4 deletions.
57 changes: 53 additions & 4 deletions plugins/kafka/kafka_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type KafkaOutputConfig struct {
HashVariable string `toml:"hash_variable"` // HashPartitioner key is extracted from a message variable
TopicVariable string `toml:"topic_variable"` // Topic extracted from a message variable
Topic string // Static topic
TopicPrefix string `toml:"topic_prefix"` // String to be prepended to the topic

RequiredAcks string `toml:"required_acks"` // NoResponse, WaitForLocal, WaitForAll
Timeout uint32
Expand Down Expand Up @@ -317,9 +318,10 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er
go k.processKafkaErrors(or, errChan, &wg)

var (
pack *pipeline.PipelinePack
topic = k.config.Topic
key sarama.Encoder
pack *pipeline.PipelinePack
topic = k.config.Topic
prefix = k.config.TopicPrefix
key sarama.Encoder
)

for pack = range inChan {
Expand All @@ -334,7 +336,7 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er

if msgBytes, err := or.Encode(pack); err == nil {
if msgBytes != nil {
err = k.producer.QueueMessage(topic, key, sarama.ByteEncoder(msgBytes))
err = k.producer.QueueMessage(topicEscape(prefix+topic), key, sarama.ByteEncoder(msgBytes))
if err != nil {
atomic.AddInt64(&k.processMessageFailures, 1)
or.LogError(err)
Expand Down Expand Up @@ -371,6 +373,53 @@ func (k *KafkaOutput) CleanupForRestart() {
return
}

// Escapes a topic name
func topicEscape(s string) string {
spaceCount, hexCount := 0, 0
for i := 0; i < len(s); i++ {
c := s[i]
if shouldEscape(c) {
if c == ' ' {
spaceCount++
} else {
hexCount++
}
}
}

if spaceCount == 0 && hexCount == 0 {
return s
}

t := make([]byte, len(s)+2*hexCount)
j := 0
for i := 0; i < len(s); i++ {
switch c := s[i]; {
case c == ' ':
t[j] = '+'
j++
case shouldEscape(c):
t[j] = '-'
t[j+1] = "0123456789ABCDEF"[c>>4]
t[j+2] = "0123456789ABCDEF"[c&15]
j += 3
default:
t[j] = s[i]
j++
}
}
return string(t)
}

func shouldEscape(c byte) bool {
// §2.3 Unreserved characters (alphanum)
if 'A' <= c && c <= 'Z' || 'a' <= c && c <= 'z' || '0' <= c && c <= '9' {
return false
}
// Everything else must be escaped.
return true
}

func init() {
pipeline.RegisterPlugin("KafkaOutput", func() interface{} {
return new(KafkaOutput)
Expand Down

0 comments on commit 8a015b6

Please sign in to comment.