Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Add decoder for Docker Fluentd [wip] #1666

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmake/externals.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ git_clone(https://github.com/golang/snappy eaa750b9bf4dcb7cb20454be850613b66cda3
git_clone(https://github.com/rafrombrc/sarama fda3e239249dd96f4a2c446aea39dfc823f4030a)
add_dependencies(sarama snappy)

git_clone(https://github.com/tinylib/msgp cd4fb1548c31d88af25205dc021be20935aec720)

if (INCLUDE_GEOIP)
add_external_plugin(git https://github.com/abh/geoip da130741c8ed2052f5f455d56e552f2e997e1ce9)
endif()
Expand Down
13 changes: 13 additions & 0 deletions docs/source/config/decoders/docker_fluentd.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.. _config_docker_fluentd_decoder:

Docker Fluentd Decoder
===================================

.. versionadded:: 0.10

| Plugin Name: **SandboxDecoder**
| File Name: **lua_decoders/docker_fluentd.lua**

.. include:: /../../sandbox/lua/decoders/docker_fluentd.lua
:start-after: --[[
:end-before: --]]
1 change: 1 addition & 0 deletions docs/source/config/decoders/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Available Decoder Plugins
:maxdepth: 1

apache_access
docker_fluentd
geoip
graylog_extended
linux_cpu_stats
Expand Down
3 changes: 3 additions & 0 deletions docs/source/config/decoders/index_noref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ Decoders
.. include:: /config/decoders/apache_access.rst
:start-line: 1

.. include:: /config/decoders/docker_fluentd.rst
:start-line: 1

.. include:: /config/decoders/graylog_extended.rst
:start-line: 1

Expand Down
6 changes: 6 additions & 0 deletions docs/source/sandbox/decoder.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ Graylog Extended Log Format Decoder
:start-after: --[[
:end-before: --]]

Docker Fluentd Decoder
^^^^^^^^^^^^^^^^^^^^^^
.. include:: ../../../sandbox/lua/decoders/docker_fluentd.lua
:start-after: --[[
:end-before: --]]

Linux CPU Stats Decoder
^^^^^^^^^^^^^^^^^^^^^^^
.. include:: /../../sandbox/lua/decoders/linux_procstat.lua
Expand Down
11 changes: 11 additions & 0 deletions docs/source/sandbox/module.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ Message Interpolation Module
:start-after: --[[
:end-before: --]]

.. _sandbox_msgpack_module:

MessagePack Module
------------------

.. versionadded:: 0.10

.. include:: ../../../sandbox/lua/modules/msgpack.lua
:start-after: --[=[
:end-before: --]=]

.. _sandbox_elasticsearch_module:

ElasticSearch Module
Expand Down
1 change: 1 addition & 0 deletions pipeline/all_specs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestAllSpecs(t *testing.T) {
r.AddSpec(SplitterRunnerSpec)
r.AddSpec(StatAccumInputSpec)
r.AddSpec(TokenSpec)
r.AddSpec(MessagePackSpec)

gospec.MainGoTest(r, t)
}
Expand Down
44 changes: 44 additions & 0 deletions pipeline/splitters.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"github.com/mozilla-services/heka/message"
"github.com/tinylib/msgp/msgp"
"hash"
"regexp"
)
Expand Down Expand Up @@ -273,6 +274,46 @@ func (h *HekaFramingSplitter) UnframeRecord(framed []byte, pack *PipelinePack) [
return unframed
}

// MessagePackSplitter is a splitter plugin whose FindRecord method uses
// msgp.Skip to locate record boundaries in a buffer of MessagePack data.
type MessagePackSplitter struct {
// Configuration stored by Init; embedded, so its fields are promoted.
*MessagePackSplitterConfig
// Runner stored by SetSplitterRunner; FindRecord logs errors through it.
sr SplitterRunner
}

// MessagePackSplitterConfig is the configuration struct that ConfigStruct
// returns for the MessagePackSplitter.
type MessagePackSplitterConfig struct {
// Populated from the "use_message_bytes" TOML key.
UseMsgBytes bool `toml:"use_message_bytes"`
}

// Init stores the supplied configuration on the splitter.
//
// config is type-asserted to *MessagePackSplitterConfig (the type that
// ConfigStruct returns); the assertion is unchecked, so any other type
// panics. The returned error is always nil.
func (m *MessagePackSplitter) Init(config interface{}) error {
	conf := config.(*MessagePackSplitterConfig)
	m.MessagePackSplitterConfig = conf
	return nil
}

// SetSplitterRunner records the runner that drives this splitter so that
// FindRecord can report errors through it.
func (m *MessagePackSplitter) SetSplitterRunner(sr SplitterRunner) {
	m.sr = sr
}

func (m *MessagePackSplitter) ConfigStruct() interface{} {
return &MessagePackSplitterConfig{
UseMsgBytes: false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UseMsgBytes config option is automatically supported by Heka for every splitter, and you're not changing the default, so I don't think there's any reason for you to have a UseMsgBytes option. Which in turn means you don't even need a custom config struct.

}
}

// FindRecord looks for the first complete MessagePack object in buf.
//
// msgp.Skip is applied at the current position:
//   - on success the skipped-over bytes are returned as the record, and
//     bytesRead covers that record plus any bytes discarded before it;
//   - on msgp.ErrShortBytes a nil record is returned together with the number
//     of bytes discarded so far, signalling that more data is needed;
//   - on any other error the error is logged through the splitter runner, one
//     byte is discarded, and the scan is retried from the next byte.
func (m *MessagePackSplitter) FindRecord(buf []byte) (bytesRead int, record []byte) {
	for {
		rest, err := msgp.Skip(buf)
		switch err {
		case nil:
			// The record is whatever Skip consumed from the front of buf.
			size := len(buf) - len(rest)
			return bytesRead + size, buf[:size]
		case msgp.ErrShortBytes:
			return bytesRead, nil // read more data
		default:
			m.sr.LogError(err)
			buf = buf[1:]
			bytesRead++
		}
	}
}

func init() {
RegisterPlugin("NullSplitter", func() interface{} {
return &NullSplitter{}
Expand All @@ -286,4 +327,7 @@ func init() {
RegisterPlugin("HekaFramingSplitter", func() interface{} {
return &HekaFramingSplitter{}
})
RegisterPlugin("MessagePackSplitter", func() interface{} {
return &MessagePackSplitter{}
})
}
40 changes: 40 additions & 0 deletions pipeline/splitters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,3 +426,43 @@ func HekaFramingSpec(c gs.Context) {
})
})
}

// MessagePackSpec exercises MessagePackSplitter by pulling records from an
// in-memory byte stream through a splitter runner's GetRecordFromStream.
func MessagePackSpec(c gs.Context) {
c.Specify("A MessagePack splitter", func() {
splitter := &MessagePackSplitter{}
config := splitter.ConfigStruct().(*MessagePackSplitterConfig)
sRunner := makeSplitterRunner("MessagePackSplitter", splitter)

c.Specify("splits records", func() {
// Fixture layout, as the assertions below rely on it:
//   b[0:67]    first record
//   b[67:134]  second record
//   b[134:136] two 0xc1 bytes that the splitter is expected to skip
//   b[136:203] third record
//   b[203:]    0x9b followed by ten single bytes; the assertions expect
//              no record to be produced from this tail
b := []byte("\x95\x81\xa4\x74\x68\x69\x73\xa2\x69\x73\x81\xa4\x6a\x75\x73\x74\xa4\x74\x65\x73\x74\x81\xa4\x64\x61\x74\x61\xa3\x6c\x6f\x6c\x81\xaa\x6c\x65\x74\x27\x73\x20\x68\x61\x76\x65\xb4\x73\x6f\x6d\x65\x20\x6d\x6f\x72\x65\x20\x62\x79\x74\x65\x73\x21\x21\x21\x21\x21\xcd\x04\xd2\x95\x81\xa4\x74\x68\x69\x73\xa2\x69\x73\x81\xa4\x6a\x75\x73\x74\xa4\x74\x65\x73\x74\x81\xa4\x64\x61\x74\x61\xa3\x6c\x6f\x6c\x81\xaa\x6c\x65\x74\x27\x73\x20\x68\x61\x76\x65\xb4\x73\x6f\x6d\x65\x20\x6d\x6f\x72\x65\x20\x62\x79\x74\x65\x73\x21\x21\x21\x21\x21\xcd\x04\xd2\xc1\xc1\x95\x81\xa4\x74\x68\x69\x73\xa2\x69\x73\x81\xa4\x6a\x75\x73\x74\xa4\x74\x65\x73\x74\x81\xa4\x64\x61\x74\x61\xa3\x6c\x6f\x6c\x81\xaa\x6c\x65\x74\x27\x73\x20\x68\x61\x76\x65\xb4\x73\x6f\x6d\x65\x20\x6d\x6f\x72\x65\x20\x62\x79\x74\x65\x73\x21\x21\x21\x21\x21\xcd\x04\xd2\x9b\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09")
reader := bytes.NewReader(b)
err := splitter.Init(config)
c.Assume(err, gs.IsNil)

// First record: 67 bytes consumed, record equals b[:67].
n, record, err := sRunner.GetRecordFromStream(reader)
c.Expect(n, gs.Equals, 67)
c.Expect(err, gs.IsNil)
c.Expect(string(record), gs.Equals, string(b[:67]))

// Second record: the next 67 bytes.
n, record, err = sRunner.GetRecordFromStream(reader)
c.Expect(n, gs.Equals, 67)
c.Expect(err, gs.IsNil)
c.Expect(string(record), gs.Equals, string(b[67:134]))

// Third record: 69 bytes consumed (2 skipped + 67), but the record
// itself starts after the two skipped bytes.
n, record, err = sRunner.GetRecordFromStream(reader)
c.Expect(n, gs.Equals, 69) // skips the invalid data (\xc1\xc1)
c.Expect(err, gs.IsNil)
c.Expect(string(record), gs.Equals, string(b[136:203]))

// The remaining bytes do not form a complete record, so nothing is
// returned and no error is reported yet.
n, record, err = sRunner.GetRecordFromStream(reader) // trigger the need to read more data
c.Expect(n, gs.Equals, 0)
c.Expect(err, gs.IsNil)
c.Expect(len(record), gs.Equals, 0)

// With the reader exhausted, the next call surfaces io.EOF.
n, record, err = sRunner.GetRecordFromStream(reader) // hit the EOF
c.Expect(n, gs.Equals, 0)
c.Expect(err, gs.Equals, io.EOF)
c.Expect(len(record), gs.Equals, 0)
})
})
}
113 changes: 113 additions & 0 deletions sandbox/lua/decoders/docker_fluentd.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

--[[
Decode data from Docker's Fluentd logging driver.

**Note**: The Fluentd logging driver is available in Docker 1.8.0rc1 and later.

Config:

- type (string, optional, default nil):
Sets the message 'Type' header to the specified value

*Example Heka Configuration*

.. code-block:: ini

[FluentdInput]
type = "TcpInput"
address = ":24224"
splitter = "MessagePackSplitter"
decoder = "DockerFluentdDecoder"

[MessagePackSplitter]
# No config

[DockerFluentdDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/docker_fluentd.lua"

[DockerFluentdDecoder.config]
type = "docker-fluentd"

*Example Heka Message*

.. code-block:: bash

docker run \
--log-driver fluentd \
--log-opt fluentd-address=YOUR_HEKA:24224 \
-d busybox \
echo Hello world

The command above should produce a Heka message similar to this:

:Timestamp: 2015-08-03 20:41:06 +0000 UTC
:Type: docker-fluentd
:Hostname: 192.168.59.103:60088
:Pid: 0
:Uuid: da45d947-037d-4870-abbe-671d820ebe8d
:Logger: stdout
:Payload: Hello world
:EnvVersion:
:Severity: 7
:Fields:
| name:"container_name" type:string value:"/suspicious_meitner"
| name:"tag" type:string value:"docker.7dc19982364b"
| name:"container_id" type:string value:"7dc19982364ba459958041d2fe85e8bdc3825d06397296ddd981c51e5f15cb89"

--]]

local mp = require "msgpack"

-- Optional 'type' setting from the plugin configuration (nil when unset).
local msg_type = read_config("type")

-- Message table that process_message fills in and injects. Only Type has a
-- value at load time; the nil entries add nothing to the table and merely
-- list the headers this decoder works with.
--
-- NOTE(review): a reviewer of this change asked that Hostname stay listed here
-- and be propagated explicitly from the incoming message, in case the input
-- has already set it to a useful value by the time splitting happens.
local msg = {
    Type       = msg_type,
    Timestamp  = nil,
    Hostname   = nil,
    Payload    = nil,
    Severity   = nil,
    EnvVersion = nil,
    Fields     = nil
}

-- Unpack and validate a Fluentd MessagePack event.
--
-- mpac: the MessagePack-encoded data handed to mp.unpack.
--
-- The unpacked value must be an array-like table whose first three elements
-- are a string tag, a numeric timestamp and a record table.
--
-- Returns nil, tag, timestamp, record on success.
-- Returns a single error string ("MessagePack decode error" or
-- "Wrong format") on failure.
function decode(mpac)
    local ok, data = pcall(mp.unpack, mpac)
    if not ok then
        return "MessagePack decode error"
    end

    if type(data) ~= "table" then
        return "Wrong format"
    end

    -- Keep these local so nothing leaks into the global environment and
    -- lingers between calls.
    local tag, timestamp, record = data[1], data[2], data[3]

    if type(tag) ~= "string" or type(timestamp) ~= "number" or type(record) ~= "table" then
        return "Wrong format"
    end

    return nil, tag, timestamp, record
end


-- Decode the Fluentd event held in the current message's Payload and inject
-- it as a new message.
--
-- The record's "log" entry becomes the Payload and its "source" entry becomes
-- the Logger; both are removed from the record. The tag is then stored in the
-- record under "tag", and the record becomes the message Fields.
--
-- Returns 0 on success, or -1 followed by an error message when decoding or
-- injection fails.
function process_message()
    local err, tag, timestamp, record = decode(read_message("Payload"))
    if err ~= nil then return -1, err end

    -- Requested in review: carry over the Hostname of the incoming message,
    -- in case the input has already set it to a useful value.
    msg.Hostname = read_message("Hostname")

    -- Scaled by 1e9; presumably converts a Fluentd time in seconds to the
    -- nanosecond timestamps Heka expects -- confirm.
    msg.Timestamp = timestamp * 1e9
    msg.Payload = record["log"]
    msg.Logger = record["source"]
    record["source"] = nil
    record["log"] = nil

    record["tag"] = tag
    msg.Fields = record

    -- Report why injection failed instead of dropping the error.
    local ok, inject_err = pcall(inject_message, msg)
    if not ok then return -1, tostring(inject_err) end
    return 0
end
Loading