aboutsummaryrefslogtreecommitdiff
path: root/write-influx
diff options
context:
space:
mode:
authorBlaise Thompson <blaise@untzag.com>2020-10-28 17:44:13 -0500
committerBlaise Thompson <blaise@untzag.com>2020-10-28 17:44:13 -0500
commitd3e6cae7ea36b695835c4b1f98a0a91c7e5c6ec2 (patch)
tree68f98d5c926689ee22d7fb796e3d9ce5ebf68783 /write-influx
parente099ed460a4e0be3ff99b684e537b9678cf3f449 (diff)
initial working docker-compose
Diffstat (limited to 'write-influx')
-rw-r--r--write-influx/dockerfile4
-rw-r--r--write-influx/requirements.txt2
-rw-r--r--write-influx/write_influx.py90
3 files changed, 96 insertions, 0 deletions
diff --git a/write-influx/dockerfile b/write-influx/dockerfile
new file mode 100644
index 0000000..fa509f8
--- /dev/null
+++ b/write-influx/dockerfile
@@ -0,0 +1,4 @@
+FROM python:3.8
+COPY . ./
+RUN pip install --no-cache-dir -r requirements.txt
+CMD ["python", "-u", "write_influx.py"]
diff --git a/write-influx/requirements.txt b/write-influx/requirements.txt
new file mode 100644
index 0000000..84d08b3
--- /dev/null
+++ b/write-influx/requirements.txt
@@ -0,0 +1,2 @@
+influxdb
+paho-mqtt
diff --git a/write-influx/write_influx.py b/write-influx/write_influx.py
new file mode 100644
index 0000000..17b4ce7
--- /dev/null
+++ b/write-influx/write_influx.py
@@ -0,0 +1,90 @@
+from influxdb import InfluxDBClient
+import os
+import paho.mqtt.client as mqtt
+import time
+import pathlib
+import datetime
+from dataclasses import dataclass, field
+from typing import List, Dict
+
+
+class Topic(dict):
+
+ def __init__(self):
+ super().__init__()
+ self["__value__"] = None
+
+ def __getitem__(self, key):
+ try:
+ return super().__getitem__(key)
+ except KeyError:
+ out = Topic()
+ self[key] = out
+ return out
+
+
+homie = Topic()
+
+
+def on_connect(client, userdata, flags, rc):
+ print("Connected with result code " + str(rc))
+ client.subscribe("homie/#")
+
+
+def on_message(client, userdata, msg):
+ topics = msg.topic.split("/")
+ payload = msg.payload.decode()
+ # fill topic tree
+ here = homie
+ for t in topics[1:]:
+ here = here[t]
+ here["__value__"] = payload
+ # tags
+ tags = {}
+ tags["device_id"] = topics[1]
+ # case of device attribute
+ device_attributes = ["$homie", "$name", "$state", "$nodes", "$extensions", "$implementation"]
+ if len(topics) == 3 and topics[2] in device_attributes:
+ measurement = topics[2]
+ fields = {"value": payload}
+ write_point(measurement, tags, fields)
+ # case of device node property
+ elif len(topics) == 4:
+ measurement = topics[3]
+ tags["node"] = topics[2]
+ datatype = homie[topics[1]][topics[2]][topics[3]]["$datatype"]["__value__"]
+ if datatype == "float":
+ fields = {"value": float(payload)}
+ else:
+ return
+ write_point(measurement, tags, fields)
+
+
+influx_client = InfluxDBClient("db", 8086, 'root', 'root', "homie")
+influx_client.create_database("homie")
+
+
+def write_point(measurement, tags, fields):
+ json = {}
+ json["measurement"] = measurement
+ json["tags"] = tags
+ json["fields"] = fields
+ try:
+ influx_client.write_points([json])
+ except Exception as e:
+ print(e)
+ print(json)
+
+
+
+client = mqtt.Client()
+client.on_connect = on_connect
+client.on_message = on_message
+
+client.connect("broker", 1883, 60)
+
+# Blocking call that processes network traffic, dispatches callbacks and
+# handles reconnecting.
+# Other loop*() functions are available that give a threaded interface and a
+# manual interface.
+client.loop_forever()