From d3e6cae7ea36b695835c4b1f98a0a91c7e5c6ec2 Mon Sep 17 00:00:00 2001 From: Blaise Thompson Date: Wed, 28 Oct 2020 17:44:13 -0500 Subject: initial working docker-compose --- write_influx.py | 91 --------------------------------------------------------- 1 file changed, 91 deletions(-) delete mode 100644 write_influx.py (limited to 'write_influx.py') diff --git a/write_influx.py b/write_influx.py deleted file mode 100644 index 240b10b..0000000 --- a/write_influx.py +++ /dev/null @@ -1,91 +0,0 @@ -from influxdb import InfluxDBClient -import os -import paho.mqtt.client as mqtt -import time -import pathlib -import datetime -from dataclasses import dataclass, field -from typing import List, Dict - - -class Topic(dict): - - def __init__(self): - super().__init__() - self["__value__"] = None - - def __getitem__(self, key): - try: - return super().__getitem__(key) - except KeyError: - out = Topic() - self[key] = out - return out - - -homie = Topic() - - -def on_connect(client, userdata, flags, rc): - print("Connected with result code " + str(rc)) - client.subscribe("homie/#") - - -def on_message(client, userdata, msg): - topics = msg.topic.split("/") - payload = msg.payload.decode() - # fill topic tree - here = homie - for t in topics[1:]: - here = here[t] - here["__value__"] = payload - # tags - tags = {} - tags["device_id"] = topics[1] - # case of device attribute - device_attributes = ["$homie", "$name", "$state", "$nodes", "$extensions", "$implementation"] - if len(topics) == 3 and topics[2] in device_attributes: - measurement = topics[2] - fields = {"value": payload} - write_point(measurement, tags, fields) - # case of device node property - elif len(topics) == 4: - measurement = topics[3] - tags["node"] = topics[2] - datatype = homie[topics[1]][topics[2]][topics[3]]["$datatype"]["__value__"] - print(datatype, datatype=="float") - if datatype == "float": - fields = {"value": float(payload)} - else: - raise KeyError - write_point(measurement, tags, fields) - - -influx_client = InfluxDBClient('localhost', 8086, 'root', 'root', "homie") -influx_client.create_database("homie") - - -def write_point(measurement, tags, fields): - json = {} - json["measurement"] = measurement - json["tags"] = tags - json["fields"] = fields - try: - influx_client.write_points([json]) - except Exception as e: - print(e) - print(json) - - - -client = mqtt.Client() -client.on_connect = on_connect -client.on_message = on_message - -client.connect("mosquitto.chem.wisc.edu", 1883, 60) - -# Blocking call that processes network traffic, dispatches callbacks and -# handles reconnecting. -# Other loop*() functions are available that give a threaded interface and a -# manual interface. -client.loop_forever() -- cgit v1.2.3