/** * * yunkong2 mqtt Adapter * * (c) 2014-2018 bluefox * * MIT License * */ 'use strict'; const utils = require(__dirname + '/lib/utils'); // Get common adapter utils const adapter = new utils.Adapter('mqtt'); let server = null; let client = null; let states = {}; const messageboxRegex = new RegExp('\.messagebox$'); function decrypt(key, value) { let result = ''; for (let i = 0; i < value.length; ++i) { result += String.fromCharCode(key[i % key.length].charCodeAt(0) ^ value.charCodeAt(i)); } return result; } adapter.on('message', function (obj) { if (obj) processMessage(obj); processMessages(); }); adapter.on('ready', function () { adapter.config.pass = decrypt('Zgfr56gFe87jJOM', adapter.config.pass); adapter.config.maxTopicLength = adapter.config.maxTopicLength || 100; if (adapter.config.ssl && adapter.config.type === 'server') { // Load certificates adapter.getCertificates(function (err, certificates) { adapter.config.certificates = certificates; main(); }); } else { // Start main(); } }); adapter.on('unload', function () { if (client) client.destroy(); if (server) server.destroy(); }); // is called if a subscribed state changes adapter.on('stateChange', (id, state) => { adapter.log.debug('stateChange ' + id + ': ' + JSON.stringify(state)); // State deleted if (!state) { delete states[id]; // If SERVER if (server) server.onStateChange(id); // if CLIENT if (client) client.onStateChange(id); } else // you can use the ack flag to detect if state is desired or acknowledged if ((adapter.config.sendAckToo || !state.ack) && !messageboxRegex.test(id)) { const oldVal = states[id] ? states[id].val : null; const oldAck = states[id] ? states[id].ack : null; states[id] = state; // If value really changed if (!adapter.config.onchange || oldVal !== state.val || oldAck !== state.ack) { // If SERVER if (server) server.onStateChange(id, state); // if CLIENT if (client) client.onStateChange(id, state); } } }); function processMessage(obj) { if (!obj || !obj.command) return; switch (obj.command) { case 'test': { // Try to connect to mqtt broker if (obj.callback && obj.message) { const mqtt = require('mqtt'); const _url = 'mqtt://' + (obj.message.user ? (obj.message.user + ':' + obj.message.pass + '@') : '') + obj.message.url + (obj.message.port ? (':' + obj.message.port) : '') + '?clientId=yunkong2.' + adapter.namespace; const _client = mqtt.connect(_url); // Set timeout for connection const timeout = setTimeout(() => { _client.end(); adapter.sendTo(obj.from, obj.command, 'timeout', obj.callback); }, 2000); // If connected, return success _client.on('connect', () => { _client.end(); clearTimeout(timeout); adapter.sendTo(obj.from, obj.command, 'connected', obj.callback); }); } } } } function processMessages() { adapter.getMessage((err, obj) => { if (obj) { processMessage(obj.command, obj.message); processMessages(); } }); } let cnt = 0; function readStatesForPattern(pattern) { adapter.getForeignStates(pattern, (err, res) => { if (!err && res) { if (!states) states = {}; for (const id in res) { if (res.hasOwnProperty(id) && !messageboxRegex.test(id)) { states[id] = res[id]; } } } // If all patters answered, start client or server if (!--cnt) { if (adapter.config.type === 'client') { client = new require(__dirname + '/lib/client')(adapter, states); } else { server = new require(__dirname + '/lib/server')(adapter, states); } } }); } function main() { // Subscribe on own variables to publish it if (adapter.config.publish) { const parts = adapter.config.publish.split(','); for (let t = 0; t < parts.length; t++) { if (parts[t].indexOf('#') !== -1) { adapter.log.warn('Used MQTT notation for yunkong2 in pattern "' + parts[t] + '": use "' + parts[t].replace(/#/g, '*') + ' notation'); parts[t] = parts[t].replace(/#/g, '*'); } adapter.subscribeForeignStates(parts[t].trim()); cnt++; readStatesForPattern(parts[t]); } } else { // subscribe for all variables adapter.subscribeForeignStates('*'); readStatesForPattern('*'); } adapter.config.defaultQoS = parseInt(adapter.config.defaultQoS, 10) || 0; adapter.config.retain = adapter.config.retain === 'true' || adapter.config.retain === true; adapter.config.retransmitInterval = parseInt(adapter.config.retransmitInterval, 10) || 2000; adapter.config.retransmitCount = parseInt(adapter.config.retransmitCount, 10) || 10; if (adapter.config.retransmitInterval < adapter.config.sendInterval) { adapter.config.retransmitInterval = adapter.config.sendInterval * 5; } // If no subscription, start client or server if (!cnt) { if (adapter.config.type === 'client') { client = new require(__dirname + '/lib/client')(adapter, states); } else { server = new require(__dirname + '/lib/server')(adapter, states); } } }