In a typical IoT model, you have many IoT sensors publishing data to IoT broker. You may want to collect data from all these IoT nodes in a database for visualization and further analytics.
I have couple of IoT nodes installed in two houses in two different cities. And I collect data from all IoT nodes in a central database.
To achieve this, I am using a raspberry pi 3 running 24×7 at home. On this raspberry I have configured influxdb/grafana and a MQTT broker.
I wrote mqtt subscriber about an year ago to receive data for one topic. But now I have improved it further to accept messages from multiple “topics”.
“topics” and other configuration parameters are configured in a json file, config.json as following.
cat /opt/tools/mqttsub/config.json
{
"mqtt": { "broker": "127.0.0.1", "mqttuser": "username", "mqttpass": "password", "mqttport": 1883, "mqttclient": "Collector Node" },
"topics": ["weather", "dustdensity", "whatever",....],
"dataconn": {"dbhost": "127.0.0.1", "dbname": "dbName", "dbuser": "database user", "dbpass": "database password"},
"BaseLocation": {"city": "Gurgaon"}
}
Here is the code that reads above config.json and then keeps listing on listed “topics”.
This code accepts payload from IoT publishers in format {“tags”: {“tag1”: tag1value, “tag2”: tag2value….}, “fields”:{“field1” field1value, “field2”: field2value,…..}}
MQTT subscriber appends publisher “topic” as influxdb measurement in json payload. final database payloads looks like:
{‘fields’: {‘humidity’: 61.3, ‘temperature’: 23.5}, ‘tags’: {‘location’: ‘Gurgaon’}, ‘measurement’: ‘weather’}
MQTT subscriber will collect data only from the topics mentioned in config.json. No other “topic” will be processed.
cat /opt/tools/mqttsub/mqttcol.py
#!/usr/bin/python3
import paho.mqtt.client as paho
import datetime
import time
import logging
import json
import subprocess
from influxdb import InfluxDBClient
with open('/opt/tools/mqttsub/config.json') as cfg:
cfgdata = json.load(cfg)
broker = cfgdata.get('mqtt').get('broker')
port = cfgdata.get('mqtt').get('mqttport')
username = cfgdata.get('mqtt').get('mqttuser')
password = cfgdata.get('mqtt').get('mqttpass')
topics = cfgdata.get('topics')
dbhost = cfgdata.get('dataconn').get('dbhost')
dbname = cfgdata.get('dataconn').get('dbname')
dbuser = cfgdata.get('dataconn').get('dbuser')
dbpass = cfgdata.get('dataconn').get('dbpass')
print(broker, port, username, password, topics, dbhost, dbname, dbuser, dbpass)
dbclient = InfluxDBClient(dbhost, 8086, dbuser, dbpass, dbname)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler('/var/log/mqttcol.log') # create a file handler
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') # create a logging format
handler.setFormatter(formatter)
logger.addHandler(handler)# add the handlers to the logger
def on_subscribe(client, userdata, mid, granted_qos):
#print("Subscribed: "+str(mid)+" "+str(granted_qos))
print("Waiting for message...")
def on_message(client, userdata, mqttmsg):
print(mqttmsg.topic+" "+str(mqttmsg.qos)+" "+mqttmsg.payload.decode("utf-8"))
if mqttmsg.topic in topics:
dbmeasurement = mqttmsg.topic
jsondata=json.loads(mqttmsg.payload.decode("utf-8"))
jsondata['measurement']=dbmeasurement
#jsondata['time']=str(datetime.datetime.now())
payload=[jsondata]
print(payload)
dbclient.write_points(payload)
logger.info(payload)
client = paho.Client()
client.username_pw_set(username, password)
client.on_subscribe = on_subscribe
client.on_message = on_message
client.connect(broker, port)
for topic in topics:
client.subscribe(topic)
client.loop_forever()
This program will also write messages to a log file /var/log/mqttcol.log
Next, let us create systemctl service to manage start | stop | status operations on MQTT collector.
Create a file “/etc/systemd/system/mkaiot.service” like:
[Unit]
Description=MKA IoT
[Service]
Type=simple
ExecStart=/opt/tools/mqttsub/mqttcol.py
[Install]
WantedBy=multi-user.target
Set permissions
chmod 644 /etc/systemd/system/mkasiot.service
Enable service to start at boot up
systemctl enable mkaiot
Start service
systemctl start mkaiot
Let us check Status
systemctl status mkaiot
mkaiot.service - MKA IoT
Loaded: loaded (/etc/systemd/system/mkaiot.service; enabled; vendor preset: enabled)
Active: active (running) since Sat 2020-03-14 14:40:55 IST; 4h 38min ago
Main PID: 373 (mqttcol.py)
Tasks: 1 (limit: 4915)
CGroup: /system.slice/mkaiot.service
└─373 /usr/bin/python3 /opt/tools/mqttsub/mqttcol.py
Mar 14 18:14:07 homebox mqttcol.py[373]: weather 0 {"tags": {"location": "Gurgaon"}, "fields": {"humidity": 61.4, "temperature": 23.5}}
Mar 14 18:14:07 homebox mqttcol.py[373]: [{'fields': {'humidity': 61.4, 'temperature': 23.5}, 'tags': {'location': 'Gurgaon'}, 'measurement': 'weather'}]
Mar 14 18:14:07 homebox mqttcol.py[373]: dustdensity 0 {"tags": {"location": "Gurgaon"}, "fields": {"dust": 0.408558}}
Mar 14 18:14:07 homebox mqttcol.py[373]: [{'fields': {'dust': 0.408558}, 'tags': {'location': 'Gurgaon'}, 'measurement': 'dustdensity'}]
I hope you find this article useful.