Skip to content

Commit

Permalink
mqtt 5
Browse files Browse the repository at this point in the history
  • Loading branch information
everesio committed Dec 29, 2022
1 parent c63766a commit 78fe285
Show file tree
Hide file tree
Showing 47 changed files with 2,156 additions and 85 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

.PHONY: clean build fmt test

TAG ?= v0.1.1
TAG ?= v0.2.0

BUILD_FLAGS ?=
BINARY ?= mqtt-proxy
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ MQTT Proxy allows MQTT clients to send messages to other messaging systems

* MQTT protocol
* [x] [MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html)
* [ ] [MQTT 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html)
* [x] [MQTT 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html)
* Publisher
* [x] Noop
* [x] [Apache Kafka](https://kafka.apache.org/)
Expand Down Expand Up @@ -95,6 +95,9 @@ prerequisites
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 0" --repeat 1 -q 0
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 1" --repeat 1 -q 1
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 2" --repeat 1 -q 2
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 0 / v5" --repeat 1 -q 0 -V mqttv5
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 1 / v5" --repeat 1 -q 1 -V mqttv5
docker exec -it mqtt-client mosquitto_pub -L mqtt://mqtt-proxy:1883/dummy -m "test qos 2 / v5" --repeat 1 -q 2 -V mqttv5
```
* proxy using Kafka SSL listener
Expand Down
2 changes: 1 addition & 1 deletion apis/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

const (
AuthAccepted = mqttproto.Accepted
AuthUnauthorized = mqttproto.RefusedNotAuthorized
AuthUnauthorized = mqttproto.RefusedBadUserNameOrPassword
)

type UserPasswordAuthRequest struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/mqtt/codec/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

mqttproto "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec/proto"
mqtt311 "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec/v311"
mqtt5 "github.com/grepplabs/mqtt-proxy/pkg/mqtt/codec/v5"
)

func ReadPacket(r io.Reader, protocolVersion byte) (mqttproto.ControlPacket, error) {
Expand All @@ -26,7 +27,7 @@ func ReadPacket(r io.Reader, protocolVersion byte) (mqttproto.ControlPacket, err
case mqttproto.MQTT_3_1_1:
return mqtt311.ReadPacket(r)
case mqttproto.MQTT_5:
return nil, mqtt311.NewConnAckError(mqttproto.RefusedUnacceptableProtocolVersion, "mqtt5 is not supported yet")
return mqtt5.ReadPacket(r)
default:
return nil, mqtt311.NewConnAckError(mqttproto.RefusedUnacceptableProtocolVersion, fmt.Sprintf("unsupported protocol version %v", protocolVersion))
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/mqtt/codec/proto/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const (

// MQTT 5 - 3.2.2.2 Connect Reason Code
const (
RefusedUnspecifiedError byte = 0x80 // 128
RefusedUnsupportedProtocolVersion byte = 0x84 // 132
RefusedV5UnspecifiedError byte = 0x80 // 128
RefusedV5UnsupportedProtocolVersion byte = 0x84 // 132
RefusedV5BadUserNameOrPassword byte = 0x86 // 134
)
19 changes: 15 additions & 4 deletions pkg/mqtt/codec/proto/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,20 @@ func EncodeUint16(v uint16) []byte {
return b
}

func DecodeLength(r io.Reader) (int, error) {
func DecodeUvarint(r io.Reader) (int, error) {
byteReader := newByteReader(r)
length, err := binary.ReadUvarint(byteReader)
if err != nil {
return 0, err
}
if byteReader.bytesRead > 4 {
return 0, fmt.Errorf("the maximum number of bytes in the length is 4, but was %d", byteReader.bytesRead)
return 0, fmt.Errorf("the maximum number of bytes in the variable byte integer is 4, but was %d", byteReader.bytesRead)
}
return int(length), nil
}

// WriteLength is a modified binary.PutUvarint
func WriteLength(buffer *bytes.Buffer, x uint32) int {
// WriteUvarint is a modified binary.PutUvarint
func WriteUvarint(buffer *bytes.Buffer, x uint32) int {
i := 0
for x >= 0x80 {
buffer.WriteByte(byte(x) | 0x80)
Expand Down Expand Up @@ -104,3 +104,14 @@ func (r *byteReader) ReadByte() (byte, error) {
r.bytesRead += n
return r.buf[0], nil
}

type CountingReader struct {
Reader io.Reader
BytesRead int
}

func (r *CountingReader) Read(p []byte) (n int, err error) {
n, err = r.Reader.Read(p)
r.BytesRead += n
return n, err
}
4 changes: 2 additions & 2 deletions pkg/mqtt/codec/proto/fixedheader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ func (fh *FixedHeader) Unpack(typeAndFlags byte, r io.Reader) (err error) {
fh.Dup = (typeAndFlags & 0x08) == 0x08
fh.Qos = (typeAndFlags & 0x06) >> 1
fh.Retain = (typeAndFlags & 0x01) != 0
fh.RemainingLength, err = DecodeLength(r)
fh.RemainingLength, err = DecodeUvarint(r)
return err
}

func (fh *FixedHeader) Pack() bytes.Buffer {
var header bytes.Buffer
header.WriteByte(fh.getFixedHeaderByte1())
WriteLength(&header, uint32(fh.RemainingLength))
WriteUvarint(&header, uint32(fh.RemainingLength))
return header
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/mqtt/codec/proto/fixedheader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ func TestFixedHeaderDecodeError(t *testing.T) {
{
name: "remaining length too long",
input: "008080808001",
expected: "the maximum number of bytes in the length is 4, but was 5",
expected: "the maximum number of bytes in the variable byte integer is 4, but was 5",
},
{
name: "remaining length too long",
input: "00808080808001",
expected: "the maximum number of bytes in the length is 4, but was 6",
expected: "the maximum number of bytes in the variable byte integer is 4, but was 6",
},
}
for _, tc := range tests {
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/connack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewConnackPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.CONNACK).(*ConnackPacket)
a.Equal(mqttproto.CONNACK, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -78,6 +79,7 @@ func TestConnackPacketCodec(t *testing.T) {
}
packet := decoded.(*ConnackPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewConnectPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.CONNECT).(*ConnectPacket)
a.Equal(mqttproto.CONNECT, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -115,6 +116,7 @@ func TestDecodeConnectPacket(t *testing.T) {
}
packet := decoded.(*ConnectPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/disconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewDisconnectPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.DISCONNECT).(*DisconnectPacket)
a.Equal(mqttproto.DISCONNECT, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -52,6 +53,7 @@ func TestDisconnectPacketCodec(t *testing.T) {
}
packet := decoded.(*DisconnectPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/pingreq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewPingreqPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.PINGREQ).(*PingreqPacket)
a.Equal(mqttproto.PINGREQ, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -52,6 +53,7 @@ func TestPingreqPacketCodec(t *testing.T) {
}
packet := decoded.(*PingreqPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/pingresp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewPingrespPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.PINGRESP).(*PingrespPacket)
a.Equal(mqttproto.PINGRESP, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -52,6 +53,7 @@ func TestPingrespPacketCodec(t *testing.T) {
}
packet := decoded.(*PingrespPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/puback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewPubackPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.PUBACK).(*PubackPacket)
a.Equal(mqttproto.PUBACK, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -64,6 +65,7 @@ func TestPubackPacketCodec(t *testing.T) {
}
packet := decoded.(*PubackPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/pubcomp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewPubcompPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.PUBCOMP).(*PubcompPacket)
a.Equal(mqttproto.PUBCOMP, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -64,6 +65,7 @@ func TestPubcompPacketCodec(t *testing.T) {
}
packet := decoded.(*PubcompPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
14 changes: 6 additions & 8 deletions pkg/mqtt/codec/v311/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,25 @@ func (p *PublishPacket) Write(w io.Writer) (err error) {
return err
}

func (p *PublishPacket) Unpack(b io.Reader) (err error) {
func (p *PublishPacket) Unpack(r io.Reader) (err error) {
var payloadLength = p.FixedHeader.RemainingLength

p.TopicName, err = mqttproto.DecodeString(b)
cr := &mqttproto.CountingReader{Reader: r}
p.TopicName, err = mqttproto.DecodeString(cr)
if err != nil {
return err
}

if p.Qos > 0 {
p.MessageID, err = mqttproto.DecodeUint16(b)
p.MessageID, err = mqttproto.DecodeUint16(cr)
if err != nil {
return err
}
payloadLength -= len(p.TopicName) + 4
} else {
payloadLength -= len(p.TopicName) + 2
}
payloadLength -= cr.BytesRead
if payloadLength < 0 {
return fmt.Errorf("error unpacking publish, payload length < 0")
}
p.Message = make([]byte, payloadLength)
_, err = io.ReadFull(b, p.Message)
_, err = io.ReadFull(cr, p.Message)
return err
}
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewPublishPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.PUBLISH).(*PublishPacket)
a.Equal(mqttproto.PUBLISH, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -124,6 +125,7 @@ func TestPublishPacketCodec(t *testing.T) {
}
packet := decoded.(*PublishPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/pubrec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewPubrecPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.PUBREC).(*PubrecPacket)
a.Equal(mqttproto.PUBREC, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -64,6 +65,7 @@ func TestPubrecPacketCodec(t *testing.T) {
}
packet := decoded.(*PubrecPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/pubrel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewPubrelPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.PUBREL).(*PubrelPacket)
a.Equal(mqttproto.PUBREL, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -66,6 +67,7 @@ func TestPubrelPacketCodec(t *testing.T) {
}
packet := decoded.(*PubrelPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/suback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewSubackPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.SUBACK).(*SubackPacket)
a.Equal(mqttproto.SUBACK, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -78,6 +79,7 @@ func TestSubackPacketCodec(t *testing.T) {
}
packet := decoded.(*SubackPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewSubscribePacket(t *testing.T) {
packet := NewControlPacket(mqttproto.SUBSCRIBE).(*SubscribePacket)
a.Equal(mqttproto.SUBSCRIBE, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -92,6 +93,7 @@ func TestSubscribePacketCodec(t *testing.T) {
}
packet := decoded.(*SubscribePacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/unsuback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewUnsubackPacket(t *testing.T) {
packet := NewControlPacket(mqttproto.UNSUBACK).(*UnsubackPacket)
a.Equal(mqttproto.UNSUBACK, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -64,6 +65,7 @@ func TestUnsubackPacketCodec(t *testing.T) {
}
packet := decoded.(*UnsubackPacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/mqtt/codec/v311/unsubscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestNewUnsubscribePacket(t *testing.T) {
packet := NewControlPacket(mqttproto.UNSUBSCRIBE).(*UnsubscribePacket)
a.Equal(mqttproto.UNSUBSCRIBE, packet.MessageType)
a.Equal(mqttproto.MqttMessageTypeNames[packet.MessageType], packet.Name())
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())
t.Log(packet)
}

Expand Down Expand Up @@ -65,6 +66,7 @@ func TestUnsubscribePacketCodec(t *testing.T) {
}
packet := decoded.(*UnsubscribePacket)
a.Equal(*tc.packet, *packet)
a.Equal(mqttproto.MQTT_3_1_1, packet.Version())

// encode
var output bytes.Buffer
Expand Down
Loading

0 comments on commit 78fe285

Please sign in to comment.