You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
5.7 KiB

/**
*
* yunkong2 mqtt Adapter
*
* (c) 2014-2018 bluefox
*
* MIT License
*
*/
'use strict';
const utils = require(__dirname + '/lib/utils'); // Get common adapter utils
const adapter = new utils.Adapter('mqtt');
let server = null;
let client = null;
let states = {};
const messageboxRegex = new RegExp('\.messagebox$');
function decrypt(key, value) {
let result = '';
for (let i = 0; i < value.length; ++i) {
result += String.fromCharCode(key[i % key.length].charCodeAt(0) ^ value.charCodeAt(i));
}
return result;
}
adapter.on('message', function (obj) {
if (obj) processMessage(obj);
processMessages();
});
adapter.on('ready', function () {
adapter.config.pass = decrypt('Zgfr56gFe87jJOM', adapter.config.pass);
adapter.config.maxTopicLength = adapter.config.maxTopicLength || 100;
if (adapter.config.ssl && adapter.config.type === 'server') {
// Load certificates
adapter.getCertificates(function (err, certificates) {
adapter.config.certificates = certificates;
main();
});
} else {
// Start
main();
}
});
adapter.on('unload', function () {
if (client) client.destroy();
if (server) server.destroy();
});
// is called if a subscribed state changes
adapter.on('stateChange', (id, state) => {
adapter.log.debug('stateChange ' + id + ': ' + JSON.stringify(state));
// State deleted
if (!state) {
delete states[id];
// If SERVER
if (server) server.onStateChange(id);
// if CLIENT
if (client) client.onStateChange(id);
} else
// you can use the ack flag to detect if state is desired or acknowledged
if ((adapter.config.sendAckToo || !state.ack) && !messageboxRegex.test(id)) {
const oldVal = states[id] ? states[id].val : null;
const oldAck = states[id] ? states[id].ack : null;
states[id] = state;
// If value really changed
if (!adapter.config.onchange || oldVal !== state.val || oldAck !== state.ack) {
// If SERVER
if (server) server.onStateChange(id, state);
// if CLIENT
if (client) client.onStateChange(id, state);
}
}
});
function processMessage(obj) {
if (!obj || !obj.command) return;
switch (obj.command) {
case 'test': {
// Try to connect to mqtt broker
if (obj.callback && obj.message) {
const mqtt = require('mqtt');
const _url = 'mqtt://' + (obj.message.user ? (obj.message.user + ':' + obj.message.pass + '@') : '') + obj.message.url + (obj.message.port ? (':' + obj.message.port) : '') + '?clientId=yunkong2.' + adapter.namespace;
const _client = mqtt.connect(_url);
// Set timeout for connection
const timeout = setTimeout(() => {
_client.end();
adapter.sendTo(obj.from, obj.command, 'timeout', obj.callback);
}, 2000);
// If connected, return success
_client.on('connect', () => {
_client.end();
clearTimeout(timeout);
adapter.sendTo(obj.from, obj.command, 'connected', obj.callback);
});
}
}
}
}
function processMessages() {
adapter.getMessage((err, obj) => {
if (obj) {
processMessage(obj.command, obj.message);
processMessages();
}
});
}
let cnt = 0;
function readStatesForPattern(pattern) {
adapter.getForeignStates(pattern, (err, res) => {
if (!err && res) {
if (!states) states = {};
for (const id in res) {
if (res.hasOwnProperty(id) && !messageboxRegex.test(id)) {
states[id] = res[id];
}
}
}
// If all patters answered, start client or server
if (!--cnt) {
if (adapter.config.type === 'client') {
client = new require(__dirname + '/lib/client')(adapter, states);
} else {
server = new require(__dirname + '/lib/server')(adapter, states);
}
}
});
}
function main() {
// Subscribe on own variables to publish it
if (adapter.config.publish) {
const parts = adapter.config.publish.split(',');
for (let t = 0; t < parts.length; t++) {
if (parts[t].indexOf('#') !== -1) {
adapter.log.warn('Used MQTT notation for yunkong2 in pattern "' + parts[t] + '": use "' + parts[t].replace(/#/g, '*') + ' notation');
parts[t] = parts[t].replace(/#/g, '*');
}
adapter.subscribeForeignStates(parts[t].trim());
cnt++;
readStatesForPattern(parts[t]);
}
} else {
// subscribe for all variables
adapter.subscribeForeignStates('*');
readStatesForPattern('*');
}
adapter.config.defaultQoS = parseInt(adapter.config.defaultQoS, 10) || 0;
adapter.config.retain = adapter.config.retain === 'true' || adapter.config.retain === true;
adapter.config.retransmitInterval = parseInt(adapter.config.retransmitInterval, 10) || 2000;
adapter.config.retransmitCount = parseInt(adapter.config.retransmitCount, 10) || 10;
if (adapter.config.retransmitInterval < adapter.config.sendInterval) {
adapter.config.retransmitInterval = adapter.config.sendInterval * 5;
}
// If no subscription, start client or server
if (!cnt) {
if (adapter.config.type === 'client') {
client = new require(__dirname + '/lib/client')(adapter, states);
} else {
server = new require(__dirname + '/lib/server')(adapter, states);
}
}
}