Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion config.json_dist
Original file line number Diff line number Diff line change
Expand Up @@ -10,49 +10,67 @@
"influxenabled" : true,
"mqttenabled" : true,
"debug" : false,
"debugmqtt" : false
"debugmqtt" : false,
"debuggraphite" : false,
"graphiteurl": "plaintext://graphite.example.org:2003/",
"graphiteprefix": "solar.battery",
"graphiteenabled" : true
},
"3e" : {
"mqtt" : true,
"influx" : true,
"graphite" : false,
"tag" : "general",
"serie" : "generic"
},
"3f" : {
"mqtt" : true,
"influx" : true,
"graphite" : true,
"tag" : "general",
"serie" : "generic"
},
"32" : {
"mqtt" : true,
"influx" : true,
"graphite" : false,
"tag" : "general",
"serie" : "generic"
},
"78" : {
"mqtt" : false,
"influx" : false,
"graphite" : false,
"info" : "Dont use influx on message 7857",
"tag" : "general",
"serie" : "generic"
},
"57" : {
"mqtt" : false,
"influx" : false,
"graphite" : true,
"tag" : "general",
"serie" : "generic"
},
"42" : {
"mqtt" : false,
"influx" : true,
"graphite" : false,
"tag" : "general",
"serie" : "nodes",
"tagID" : "ID"
},
"54" : {
"mqtt" : false,
"influx" : true,
"graphite" : true,
"tag" : "general",
"serie" : "daily"
},
"41" : {
"mqtt" : true,
"influx" : false,
"graphite" : true,
"tag" : "general",
"serie" : "nodes2",
"info" : "Dont send this message to influx. its not supported. if you need the data stored in influx use MQTT and then parse it first"
Expand All @@ -62,6 +80,7 @@
"all" : {
"mqtt" : false,
"influx" :false,
"graphite" : false,
"info" : "Do not run every message to influx. you will kill the machine if running raspberry pi!"
}
}
Expand Down
158 changes: 108 additions & 50 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var mqtt = require('mqtt')
var server = udp.createSocket('udp4');
var Parser = require('binary-parser').Parser;
const Influx = require('influx');
const graphite = require('graphite');
var fs = require('fs');


Expand Down Expand Up @@ -43,62 +44,74 @@ catch (e) {
//});


var debug = (config.config.debug) ? config.config.debug : false;
var debugMQTT = (config.config.debugmqtt) ? config.config.debugmqtt : false;;
const debug = (config.config.debug) ? config.config.debug : false;
const debugMQTT = (config.config.debugmqtt) ? config.config.debugmqtt : false;;
const debugGraphite = (config.config.debuggraphite) ? config.config.debuggraphite : false;;



//MQTT server generally localhost
var mqtthost = (config.config.mqtthost) ? config.config.mqtthost : 'localhost';
var mqttenabled = (config.config.mqttenabled) ? config.config.mqttenabled : false;
var mqttusername = (config.config.mqttusername) ? config.config.mqttusername : '';
var mqttpassword = (config.config.mqttpassword) ? config.config.mqttpassword : '';
var influxhost = (config.config.influxhost) ? config.config.influxhost :'localhost';
var influxdatabase = (config.config.influxdatabase) ? config.config.influxdatabase :'localhost';
var influxusername = (config.config.influxusername) ? config.config.influxusername : '';
var influxpassword = (config.config.influxpassword) ? config.config.influxpassword : '';
var influxenabled = (config.config.influxenabled) ? config.config.influxenabled : false;


const mqtthost = (config.config.mqtthost) ? config.config.mqtthost : 'localhost';
const mqttenabled = (config.config.mqttenabled) ? config.config.mqttenabled : false;
const mqttusername = (config.config.mqttusername) ? config.config.mqttusername : '';
const mqttpassword = (config.config.mqttpassword) ? config.config.mqttpassword : '';
const influxhost = (config.config.influxhost) ? config.config.influxhost :'localhost';
const influxdatabase = (config.config.influxdatabase) ? config.config.influxdatabase :'localhost';
const influxusername = (config.config.influxusername) ? config.config.influxusername : '';
const influxpassword = (config.config.influxpassword) ? config.config.influxpassword : '';
const influxenabled = (config.config.influxenabled) ? config.config.influxenabled : false;
const graphiteurl = config.config.graphiteurl || '';
const graphiteprefix = config.config.graphiteprefix || '';
const graphiteenabled = (config.config.graphiteenabled) ? config.config.graphiteenabled : false;

var client;
//Setup MQTT
options={
clientId:"raspi",
username:mqttusername,
password:mqttpassword,
clean:true};

var client = mqtt.connect('mqtt://' + mqtthost, options)

client.on("error",function(error){
console.log("Can't connect to MQTT server" + error);
console.log(mqttenabled);
if (mqttenabled) { process.exit(1); }
});

if (mqttenabled) {
options={
clientId:"raspi",
username:mqttusername,
password:mqttpassword,
clean:true};

client = mqtt.connect('mqtt://' + mqtthost, options)

client.on("error",function(error){
console.log("Can't connect to MQTT server" + error);
console.log(mqttenabled);
if (mqttenabled) { process.exit(1); }
});
}
//console.log("connected flag "+client.connected);


const influx = new Influx.InfluxDB({
host: influxhost,
database: influxdatabase,
port: 8086,
username: influxusername,
password: influxpassword,
})

console.log('Influx host set to: ' + influxhost);

influx.ping(5000).then(hosts => {
hosts.forEach(host => {
if (host.online) {
console.log(`${host.url.host} responded in ${host.rtt}ms running ${host.version})`)
} else {
console.log(`InfluxDB: ${host.url.host} is offline so quitting`)
if (influxenabled) process.exit(1);
}
var influx;
if (influxenabled) {
influx = new Influx.InfluxDB({
host: influxhost,
database: influxdatabase,
port: 8086,
username: influxusername,
password: influxpassword,
})
})

console.log('Influx host set to: ' + influxhost);

influx.ping(5000).then(hosts => {
hosts.forEach(host => {
if (host.online) {
console.log(`${host.url.host} responded in ${host.rtt}ms running ${host.version})`)
} else {
console.log(`InfluxDB: ${host.url.host} is offline so quitting`)
if (influxenabled) process.exit(1);
}
})
})
}

var graphiteClient;
if (graphiteenabled) {
graphiteClient = graphite.createClient(graphiteurl);
}



Expand Down Expand Up @@ -149,6 +162,49 @@ function sendInflux(data, tag) {
);
};


function sendGraphite(systemId,messageId,data) {
const prefix = graphiteprefix + '.batrium' + systemId + '.';
var metrics = {};

switch (messageId) {
case "54":
metrics = {
[prefix + 'DailySessionCumulShuntkWhCharge']: data.DailySessionCumulShuntkWhCharge,
[prefix + 'DailySessionCumulShuntkWhDischg']: data.DailySessionCumulShuntkWhDischg,
};
break;
case "3f":
['ShuntVoltage', 'ShuntCurrent', 'ShuntPowerVA'].forEach((x) => {
metrics[prefix + x] = data[x];
});
break;
case "57":
['SystemOpStatus', 'ShuntSOC', 'ShuntVoltage', 'ShuntCurrent'].forEach((x) => {
metrics[prefix + x] = data[x];
});
break;
case "41":
data.nodes.forEach((node) => {
//Rename these two, so the naming is nicer
metrics[prefix + 'cells.' + node.ID + '.Volt'] = node['MinCellVolt'];
metrics[prefix + 'cells.' + node.ID + '.Temp'] = node['MinCellTemp'];
['BypassTemp', 'BypassAmp', 'Status'].forEach((x) => {
metrics[prefix + 'cells.' + node.ID + '.' + x] = node[x];
});
});
break;
}

if (debugGraphite) {
console.log(metrics);
}
if (metrics) {
graphiteClient.write(metrics);
}
}


function errorText(string) {
console.log('\x1b[31m%s\x1b[0m', string);
}
Expand Down Expand Up @@ -204,11 +260,13 @@ server.on('message',function(msg,info){
obj = Object.assign(payload, messages[payload.MessageId](msg));
if (debug) console.log(obj);
// check if the message id is present in the config. This dont care what version is there if file exist
if (config[messageID] && config[messageID].mqtt || config.all.mqtt) sendMqtt(payload.SystemId,payload.MessageId,obj);
if (config[messageID] && config[messageID].influx || config.all.influx) sendInflux(obj, tag);
if (mqttenabled && (config[messageID] && config[messageID].mqtt || config.all.mqtt)) sendMqtt(payload.SystemId,payload.MessageId,obj);
if (influxenabled && (config[messageID] && config[messageID].influx || config.all.influx)) sendInflux(obj, tag);
if (graphiteenabled && (config[messageID] && config[messageID].graphite || config.all.graphite)) sendGraphite(payload.SystemId,messageID,obj);
// Below is used if you use messageid and version in the configuration file
if (config[payload.MessageId] && config[payload.MessageId].mqtt || config.all.mqtt) sendMqtt(payload.SystemId,payload.MessageId,obj);
if (config[payload.MessageId] && config[payload.MessageId].influx || config.all.influx) sendInflux(obj, tag);
if (mqttenabled && (config[payload.MessageId] && config[payload.MessageId].mqtt || config.all.mqtt)) sendMqtt(payload.SystemId,payload.MessageId,obj);
if (influxenabled && (config[payload.MessageId] && config[payload.MessageId].influx || config.all.influx)) sendInflux(obj, tag);
if (graphiteenabled && (config[payload.MessageId] && config[payload.MessageId].graphite || config.all.graphite)) sendGraphite(payload.SystemId,messageID,obj);
} catch (e) {
errorText('Couldnt get payload for ' + payload.MessageId + ' Size: %s',msg.length);
console.log(e);
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"dependencies": {
"binary-parser": "latest",
"influx": "latest",
"mqtt": "latest"
"mqtt": "latest",
"graphite": "latest"
}
}