Skip to content

Commit

Permalink
examples: Add dup filtering to mqtt_relay (merbanan#3018)
Browse files Browse the repository at this point in the history
Keep information about the previous value sent.  If it's been 5
seconds, or new value is different (ignoring keys like snr and
frequency), then send it.  Otherwise, just don't.  This causes bursts
of e.g. 4 transmissions to result in one MQTT message, on the theory
that the 4 transmissions are not actually 4 messags, but a strategy to
transmit one message more reliably.

Define a new configuration option to enable duplicate filtering, and
default it to True.

Steal logging config from mqtt_filter.py, and add a configuration
option DEBUG that if True results in debug logging instead of info.
  • Loading branch information
gdt authored Aug 9, 2024
1 parent 09d6251 commit aed3b6f
Showing 1 changed file with 105 additions and 6 deletions.
111 changes: 105 additions & 6 deletions examples/rtl_433_mqtt_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,38 @@
from __future__ import print_function
from __future__ import with_statement

import socket
import json
import logging
import socket
import time

import paho.mqtt.client as mqtt


# The config class represents a config object. The constructor takes
# an optional pathname, and will switch on the suffix (.yaml for now)
# and read a dictionary.
class rtlconfig(object):

# Initialize with default values.
c = {
# Syslog socket configuration
# Log level info (False) or debug (True)
'DEBUG': False,

# Address to listen on for syslog/json messages from rtl_433
'UDP_IP': "127.0.0.1",
'UDP_PORT': 1433,

# MQTT broker configuration
# MQTT broker address and credentials
'MQTT_HOST': "127.0.0.1",
'MQTT_PORT': 1883,
'MQTT_USERNAME': None,
'MQTT_PASSWORD': None,
'MQTT_TLS': False,

# MQTT content
'MQTT_PREFIX': "sensor/rtl_433",
'MQTT_DEDUP': True,
'MQTT_INDIVIDUAL_TOPICS': True,
'MQTT_JSON_TOPIC': True,
}
Expand All @@ -68,17 +78,90 @@ def __init__(self, f=None):
def __getitem__(self, k):
return self.c[k]

class dedup(object):
""" A dedup class object supports deduping a stream of reports by
answering if a report is interesting relative to the history. While
more complicated deduping is allowed by the interface, for now it is
very simple, keeping track of only the previous interesting object.
For now, we more or less require that all reports have the same keys. """

# \todo Consider a cache with several entries.

def __init__(self):
# Make this long enough to skip repeats, but allow messages
# every 10s to come through.
self.duration = 5
# Exclude reception metadata (time and RF).
self.boring_keys = ('time', 'freq', 'freq1', 'freq2', 'rssi', 'snr', 'noise', 'raw_msg')
# Initialize storage for what was last sent.
(self.last_report, self.last_now) = (None, None)

def send_store(self, report, n):
""" Record report, n as the last report declared interesting, and
return True (to denote interesting). """
(self.last_report, self.last_now) = (report, n)
return True

def equiv(self, j1, j2):
""" Return True if j1 and j2 are the same, except for boring_keys. """
for (k, v) in j1.items():
# If in boring, we don't care.
if k not in self.boring_keys:
# If in j1 and not j2, they are different.
if k not in j2:
logging.debug("equiv: %s in j1 and not j2" % (k))
return False
if j1[k] != j2[k]:
logging.debug("equiv: %s differs j1=%s and j2=%s" % (k, j1[k], j2[k]))
return False
# If the lengths are different, they must be different.
if len(j1) != len(j2):
logging.debug("equiv: len(j1) %d != len(j2) %d" % (len(j1), len(j2)))
return False

# If we get here, then the lengths are the same, and all
# non-boring keys in j1 exist in j2, and have the same value.
# It could be that j2 is missing a boring key and also has a
# new non-boring key, but boring keys in particular should not
# be variable.
return True

# report is a python dictionary
def is_interesting(self, report):
""" If report is intersting, return True and update records of the
most recent interesting report. Otherwise return False. """
n = time.time()

# If previous interesting is missing or empty, accept this one.
if self.last_report is None or self.last_now is None:
logging.debug("interesting: no previous")
return self.send_store(report, n)

# If previous one was too long ago, accept this one.
if n - self.last_now > self.duration:
logging.debug("interesting: time")
return self.send_store(report, n)

if not self.equiv(self.last_report, report):
logging.debug("interesting: different")
return self.send_store(report, n)

return False

# Create a config object, defaults modified by the config file if present.
c = rtlconfig("rtl_433_mqtt_relay.yaml")

# Create a dedup object for later use, even if it's configured off.
d = dedup()

def mqtt_connect(client, userdata, flags, rc):
"""Handle MQTT connection callback."""
print("MQTT connected: " + mqtt.connack_string(rc))
logging.info("MQTT connected: " + mqtt.connack_string(rc))


def mqtt_disconnect(client, userdata, rc):
"""Handle MQTT disconnection callback."""
print("MQTT disconnected: " + mqtt.connack_string(rc))
logging.info("MQTT disconnected: " + mqtt.connack_string(rc))


# Create listener for incoming json string packets.
Expand All @@ -100,6 +183,14 @@ def sanitize(text):
def publish_sensor_to_mqtt(mqttc, data, line):
"""Publish rtl_433 sensor data to MQTT."""

if c['MQTT_DEDUP']:
# If this data is not novel relative to recent data, just skip it.
# Otherwise, send it via MQTT.
if not d.is_interesting(data):
logging.debug(" not interesting")
return
logging.debug( "INTERESTING")

# Construct a topic from the information that identifies which
# device this frame is from.
# NB: id is only used if channel is not present.
Expand Down Expand Up @@ -166,6 +257,7 @@ def rtl_433_probe():
try:
line = parse_syslog(line)
data = json.loads(line)
logging.debug("received %s" % line)
publish_sensor_to_mqtt(mqttc, data, line)

except ValueError:
Expand All @@ -179,8 +271,15 @@ def run():
# uid
# gid
# working_directory
rtl_433_probe()

# Set up logging at INFO, and change to DEBUG if config asks for that.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',datefmt='%Y-%m-%dT%H:%M:%S%z')
logging.getLogger().setLevel(logging.INFO)
if c['DEBUG']:
logging.getLogger().setLevel(logging.DEBUG)
logging.debug("DEBUG LOGGING ENABLED")

rtl_433_probe()

if __name__ == "__main__":
run()

0 comments on commit aed3b6f

Please sign in to comment.