Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

kafka: Add topic_prefix and escape topic names #1570

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions plugins/kafka/kafka_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type KafkaOutputConfig struct {
HashVariable string `toml:"hash_variable"` // HashPartitioner key is extracted from a message variable
TopicVariable string `toml:"topic_variable"` // Topic extracted from a message variable
Topic string // Static topic
TopicPrefix string `toml:"topic_prefix"` // String to be prepended to the topic

RequiredAcks string `toml:"required_acks"` // NoResponse, WaitForLocal, WaitForAll
Timeout uint32
Expand Down Expand Up @@ -317,9 +318,10 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er
go k.processKafkaErrors(or, errChan, &wg)

var (
pack *pipeline.PipelinePack
topic = k.config.Topic
key sarama.Encoder
pack *pipeline.PipelinePack
topic = k.config.Topic
prefix = k.config.TopicPrefix
key sarama.Encoder
)

for pack = range inChan {
Expand All @@ -334,7 +336,7 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er

if msgBytes, err := or.Encode(pack); err == nil {
if msgBytes != nil {
err = k.producer.QueueMessage(topic, key, sarama.ByteEncoder(msgBytes))
err = k.producer.QueueMessage(topicEscape(prefix+topic), key, sarama.ByteEncoder(msgBytes))
if err != nil {
atomic.AddInt64(&k.processMessageFailures, 1)
or.LogError(err)
Expand Down Expand Up @@ -371,6 +373,53 @@ func (k *KafkaOutput) CleanupForRestart() {
return
}

// Escapes a topic name by encoding any non-valid kafka topic name
func topicEscape(s string) string {
hexCount := 0
for i := 0; i < len(s); i++ {
if shouldEscape(s[i]) {
hexCount++
}
}

if hexCount == 0 {
return s
}

t := make([]byte, len(s)+2*hexCount)
j := 0
for i := 0; i < len(s); i++ {
c := s[i]
if shouldEscape(c) {
t[j] = '-'
t[j+1] = "0123456789ABCDEF"[c>>4]
t[j+2] = "0123456789ABCDEF"[c&15]
j += 3
} else {
t[j] = s[i]
j++
}
}
return string(t)
}

func shouldEscape(c byte) bool {
// List of valid kafka topic characters can be found at:
// https://github.com/apache/kafka/blob/43b92f8b1ce8140c432edf11b0c842f5fbe04120/core/src/main/scala/kafka/common/Topic.scala#L25
// val legalChars = "[a-zA-Z0-9\\._\\-]"
if 'A' <= c && c <= 'Z' || 'a' <= c && c <= 'z' || '0' <= c && c <= '9' {
return false
}

switch c {
case '.', '_', '-':
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there is one problem here, is the - character needs to be escaped, since that's being used as the escape character above on L394.

This will prevent backwards compatibility though if we escape -. So unsure how to proceed best without changing the topic names on people.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand. Can't we use a different character for escaping? Back-slashes aren't allowed in a topic name, but they could still be used as an escape character, right? I might be missing something, though...

return false
}

// Everything else must be escaped.
return true
}

func init() {
pipeline.RegisterPlugin("KafkaOutput", func() interface{} {
return new(KafkaOutput)
Expand Down