1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
from influxdb import InfluxDBClient
import os
import paho.mqtt.client as mqtt
import time
import pathlib
import datetime
from dataclasses import dataclass, field
from typing import List, Dict
class Topic(dict):
    """Dictionary node of a topic tree that creates missing children on read.

    Every node starts with a ``"__value__"`` entry set to ``None``. Reading a
    key that is absent stores a fresh ``Topic`` under that key and returns
    it, so chained lookups such as ``tree["a"]["b"]`` never raise
    ``KeyError``.
    """

    def __init__(self):
        super().__init__({"__value__": None})

    def __missing__(self, key):
        # dict's item lookup calls this hook only when ``key`` is absent.
        child = Topic()
        self[key] = child
        return child
# Root of the in-memory topic tree. on_message stores each received payload
# under the "__value__" key of the node reached by following the topic levels
# after the first one.
homie = Topic()
def on_connect(client, userdata, flags, rc):
    """Report the connection result and subscribe to every ``homie/`` topic.

    ``userdata`` and ``flags`` are accepted for the callback signature but
    are not used.
    """
    status_line = "Connected with result code " + str(rc)
    print(status_line)
    # "#" is the multi-level wildcard: everything below "homie" is received.
    client.subscribe("homie/#")
# Device-level attributes whose payload is stored as a string-valued point.
_DEVICE_ATTRIBUTES = frozenset(
    (
        "$homie",
        "$name",
        "$state",
        "$nodes",
        "$extensions",
        "$implementation",
    )
)


def on_message(client, userdata, msg):
    """Record one MQTT message in the topic tree and, if relevant, in InfluxDB.

    The payload is always stored in the module-level ``homie`` tree under the
    node addressed by the topic levels after the first one. A point is then
    written in two cases:

    * ``<root>/<device>/<attribute>`` where the attribute is one of
      ``_DEVICE_ATTRIBUTES``: the raw payload string is written with the
      attribute name as measurement.
    * ``<root>/<device>/<node>/<property>`` whose ``$datatype`` recorded in
      the tree is ``"float"``: the payload is parsed as a float and written
      with the property name as measurement and the node as an extra tag.
      Properties whose ``$datatype`` has not been received yet, or is not
      ``"float"``, are skipped.

    Malformed input (non-UTF-8 payload, unparsable float, topic without a
    device level) is reported with ``print`` and ignored instead of raising
    out of the callback.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; ``msg.topic`` (str) and ``msg.payload``
            (bytes) are read.
    """
    topics = msg.topic.split("/")
    try:
        payload = msg.payload.decode()
    except UnicodeDecodeError as exc:
        print("Ignoring non-UTF-8 payload on " + msg.topic + ": " + str(exc))
        return

    # fill topic tree
    here = homie
    for level in topics[1:]:
        here = here[level]
    here["__value__"] = payload

    # The "homie/#" subscription also matches the bare root topic, which has
    # no device level to tag a point with.
    if len(topics) < 2:
        return

    tags = {"device_id": topics[1]}

    # case of device attribute
    if len(topics) == 3 and topics[2] in _DEVICE_ATTRIBUTES:
        write_point(topics[2], tags, {"value": payload})
        return

    # case of device node property
    if len(topics) == 4:
        datatype = homie[topics[1]][topics[2]][topics[3]]["$datatype"]["__value__"]
        if datatype != "float":
            return
        try:
            value = float(payload)
        except ValueError:
            print("Ignoring non-float payload on " + msg.topic + ": " + repr(payload))
            return
        tags["node"] = topics[2]
        write_point(topics[3], tags, {"value": value})
# Client used by write_point for every point this script stores. The admin
# password is read from the environment; a missing INFLUXDB_ADMIN_PASSWORD
# variable raises KeyError here, at start-up.
influx_client = InfluxDBClient(
    host="db",
    port=8086,
    username="admin",
    password=os.environ["INFLUXDB_ADMIN_PASSWORD"],
    database="homie",
)
influx_client.create_database("homie")

# Best effort: a failure to create the retention policy must not stop the
# script, but it is reported instead of being silently discarded, and
# KeyboardInterrupt / SystemExit are no longer swallowed.
try:
    influx_client.create_retention_policy(
        name="two-years",
        database="homie",
        duration="18000h",  # 18000 h = 750 days
        default=True,
        replication=1,
    )
except Exception as exc:
    print("Could not create retention policy 'two-years': " + str(exc))
def write_point(measurement, tags, fields):
    """Write a single point to InfluxDB through ``influx_client``.

    Args:
        measurement: Measurement name of the point.
        tags: Mapping stored as the point's tags.
        fields: Mapping stored as the point's fields.

    Any exception raised while writing is caught; the exception and the
    point that could not be written are printed and nothing is re-raised.
    """
    point = {
        "measurement": measurement,
        "tags": tags,
        "fields": fields,
    }
    try:
        influx_client.write_points([point])
    except Exception as err:
        # Best effort: report the failure and the rejected point, carry on.
        print(err)
        print(point)
# Create the MQTT client and register the on_connect / on_message callbacks
# before connecting, so they are in place when the connection is established.
# NOTE(review): paho-mqtt 2.x changed the Client constructor and callback
# signatures (callback API version) — confirm the pinned paho-mqtt version.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# Positional arguments: broker host name, port, keepalive interval in seconds.
client.connect("broker", 1883, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
|