aboutsummaryrefslogtreecommitdiff
path: root/write-influx/write_influx.py
blob: b558b8401822623602f1d03a0d6f49b75cb13784 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from influxdb import InfluxDBClient
import os
import paho.mqtt.client as mqtt
import time
import pathlib
import datetime
from dataclasses import dataclass, field
from typing import List, Dict


class Topic(dict):
    def __init__(self):
        super().__init__()
        self["__value__"] = None

    def __getitem__(self, key):
        try:
            return super().__getitem__(key)
        except KeyError:
            out = Topic()
            self[key] = out
            return out


homie = Topic()


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("homie/#")


def on_message(client, userdata, msg):
    topics = msg.topic.split("/")
    payload = msg.payload.decode()
    # fill topic tree
    here = homie
    for t in topics[1:]:
        here = here[t]
    here["__value__"] = payload
    # tags
    tags = {}
    tags["device_id"] = topics[1]
    # case of device attribute
    device_attributes = [
        "$homie",
        "$name",
        "$state",
        "$nodes",
        "$extensions",
        "$implementation",
    ]
    if len(topics) == 3 and topics[2] in device_attributes:
        measurement = topics[2]
        fields = {"value": payload}
        write_point(measurement, tags, fields)
    # case of device node property
    elif len(topics) == 4:
        measurement = topics[3]
        tags["node"] = topics[2]
        datatype = homie[topics[1]][topics[2]][topics[3]]["$datatype"]["__value__"]
        if datatype == "float":
            fields = {"value": float(payload)}
        else:
            return
        write_point(measurement, tags, fields)


influx_client = InfluxDBClient(host="db",
                               port=8086,
                               username="admin",
                               password=os.environ["INFLUXDB_ADMIN_PASSWORD"],
                               database="homie")

influx_client.create_database("homie")
try:
    influx_client.create_retention_policy(
        name="two-years",
        database="homie",
        duration="18000h",
        default=True,
        replication=1,
    )
except:
    pass


def write_point(measurement, tags, fields):
    json = {}
    json["measurement"] = measurement
    json["tags"] = tags
    json["fields"] = fields
    try:
        influx_client.write_points([json])
    except Exception as e:
        print(e)
    print(json)


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("broker", 1883, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()