-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.js
74 lines (64 loc) · 1.72 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
const mqtt = require("async-mqtt");
const influx = require("influx");
// Create the InfluxDB client; every connection setting is read from an
// INFLUXDB_* environment variable.
const {
  INFLUXDB_HOST,
  INFLUXDB_PORT,
  INFLUXDB_USERNAME,
  INFLUXDB_PASSWORD,
  INFLUXDB_DATABASE
} = process.env;

const influxDb = new influx.InfluxDB({
  host: INFLUXDB_HOST,
  port: INFLUXDB_PORT,
  username: INFLUXDB_USERNAME,
  password: INFLUXDB_PASSWORD,
  database: INFLUXDB_DATABASE
});
// Create the MQTT client for the broker at MQTT_URI; port and credentials
// are read from the remaining MQTT_* environment variables.
const { MQTT_URI, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD } = process.env;

const mqttClient = mqtt.connect(MQTT_URI, {
  port: MQTT_PORT,
  username: MQTT_USERNAME,
  password: MQTT_PASSWORD
});
/**
 * Derives the measurement name from an MQTT topic: every "/" becomes "_" and
 * a single trailing "_" is dropped.
 *
 * @param {string} topic - Topic the message was published on.
 * @returns {string} Name used as the `measurement` of the written point.
 */
function toMeasurementName(topic) {
  const name = topic.replace(/\//g, "_");
  return name.endsWith("_") ? name.slice(0, -1) : name;
}

/**
 * Builds the point handed to `influxDb.writePoints` from a decoded payload.
 *
 * - A JSON object is copied and its `measurement` property is set (replacing
 *   any `measurement` the payload carried); its other properties are passed
 *   through unchanged.
 * - A JSON string, number or boolean is wrapped as `fields.value`.
 * - `null` and arrays yield `null`, meaning "nothing to write".
 *
 * @param {string} measurementName - Measurement the point belongs to.
 * @param {*} payload - Result of `JSON.parse` on the message body.
 * @returns {?object} The point to write, or `null` if the payload is unusable.
 */
function buildPoint(measurementName, payload) {
  // `typeof null` and `typeof []` are both "object", so rule them out first.
  if (payload === null || Array.isArray(payload)) {
    return null;
  }
  if (typeof payload === "object") {
    // Copy instead of mutating the parsed payload.
    return Object.assign({}, payload, { measurement: measurementName });
  }
  return {
    measurement: measurementName,
    fields: {
      value: payload
    }
  };
}

/**
 * Handles one incoming MQTT message: logs it, decodes the JSON body and
 * writes the resulting point to InfluxDB. Failures are logged and the
 * message is dropped, so a single bad message does not reject the listener's
 * promise.
 *
 * @param {string} topic - Topic the message was published on.
 * @param {Buffer} message - Raw message payload.
 * @returns {Promise<void>}
 */
async function handleMessage(topic, message) {
  const payloadText = message.toString();
  console.log(topic, payloadText);

  let payload;
  try {
    payload = JSON.parse(payloadText);
  } catch (e) {
    console.error(`Ignoring message on ${topic}: payload is not valid JSON (${e.message})`);
    return;
  }

  const point = buildPoint(toMeasurementName(topic), payload);
  // only publish if data was available
  if (!point) {
    console.error(`Ignoring message on ${topic}: payload is null or an array`);
    return;
  }

  try {
    await influxDb.writePoints([point]);
  } catch (e) {
    console.error(`Failed to write point for ${topic}: ${e.stack}`);
  }
}

// Register the message listener exactly once. "connect" is emitted again on
// every reconnect, so registering it inside that handler would stack up
// listeners and write each message several times.
mqttClient.on("message", handleMessage);

mqttClient.on("connect", async () => {
  console.log("Connected to MQTT broker");
  try {
    const prefix = process.env.MQTT_TOPIC_PREFIX;
    if (typeof prefix !== "string") {
      throw new Error("MQTT_TOPIC_PREFIX is not set");
    }
    // make sure the topic name has a trailing slash
    const topicPrefix = prefix.endsWith("/") ? prefix : `${prefix}/`;
    // subscribe to all topics below the prefix with a quality of service of 2 = "exactly once"
    await mqttClient.subscribe(`${topicPrefix}#`, {
      qos: 2
    });
  } catch (e) {
    // log error and end process with a failure status
    console.error(e.stack);
    process.exit(1);
  }
});