aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBlaise Thompson <blaise@untzag.com>2020-10-23 16:27:45 -0500
committerBlaise Thompson <blaise@untzag.com>2020-10-23 16:27:45 -0500
commit450cde3fbe417bcd1d846b64bc60f4633394b762 (patch)
tree36771987bd1a922e2b96981153ef9f7757477f25
initial commit
-rw-r--r--README.md18
-rw-r--r--write_influx.py91
2 files changed, 109 insertions, 0 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e2e925d
--- /dev/null
+++ b/README.md
@@ -0,0 +1,18 @@
+# mqtt
+
+documentation and source code for departmental mqtt server
+
+hosted at https://mosquitto.chem.wisc.edu
+
+## mosquitto
+
+https://mosquitto.org/
+
+## influxdb
+
+https://www.influxdata.com/
+
+## bridge daemon
+
+`write_influx.py`
+
diff --git a/write_influx.py b/write_influx.py
new file mode 100644
index 0000000..240b10b
--- /dev/null
+++ b/write_influx.py
@@ -0,0 +1,91 @@
+from influxdb import InfluxDBClient
+import os
+import paho.mqtt.client as mqtt
+import time
+import pathlib
+import datetime
+from dataclasses import dataclass, field
+from typing import List, Dict
+
+
+class Topic(dict):
+
+ def __init__(self):
+ super().__init__()
+ self["__value__"] = None
+
+ def __getitem__(self, key):
+ try:
+ return super().__getitem__(key)
+ except KeyError:
+ out = Topic()
+ self[key] = out
+ return out
+
+
+homie = Topic()
+
+
+def on_connect(client, userdata, flags, rc):
+ print("Connected with result code " + str(rc))
+ client.subscribe("homie/#")
+
+
+def on_message(client, userdata, msg):
+ topics = msg.topic.split("/")
+ payload = msg.payload.decode()
+ # fill topic tree
+ here = homie
+ for t in topics[1:]:
+ here = here[t]
+ here["__value__"] = payload
+ # tags
+ tags = {}
+ tags["device_id"] = topics[1]
+ # case of device attribute
+ device_attributes = ["$homie", "$name", "$state", "$nodes", "$extensions", "$implementation"]
+ if len(topics) == 3 and topics[2] in device_attributes:
+ measurement = topics[2]
+ fields = {"value": payload}
+ write_point(measurement, tags, fields)
+ # case of device node property
+ elif len(topics) == 4:
+ measurement = topics[3]
+ tags["node"] = topics[2]
+ datatype = homie[topics[1]][topics[2]][topics[3]]["$datatype"]["__value__"]
+ print(datatype, datatype=="float")
+ if datatype == "float":
+ fields = {"value": float(payload)}
+ else:
+ raise KeyError
+ write_point(measurement, tags, fields)
+
+
+influx_client = InfluxDBClient('localhost', 8086, 'root', 'root', "homie")
+influx_client.create_database("homie")
+
+
+def write_point(measurement, tags, fields):
+ json = {}
+ json["measurement"] = measurement
+ json["tags"] = tags
+ json["fields"] = fields
+ try:
+ influx_client.write_points([json])
+ except Exception as e:
+ print(e)
+ print(json)
+
+
+
+client = mqtt.Client()
+client.on_connect = on_connect
+client.on_message = on_message
+
+client.connect("mosquitto.chem.wisc.edu", 1883, 60)
+
+# Blocking call that processes network traffic, dispatches callbacks and
+# handles reconnecting.
+# Other loop*() functions are available that give a threaded interface and a
+# manual interface.
+client.loop_forever()