moved cluster support into core, removed clustered appender, multiprocess appender

This commit is contained in:
Gareth Jones 2013-08-24 20:46:10 +10:00
parent d8cf8cb2dc
commit eabcaf8aef
5 changed files with 185 additions and 783 deletions

View File

@ -1,118 +0,0 @@
"use strict";
var cluster = require('cluster');
var log4js = require('../log4js');
/**
* Takes a loggingEvent object, returns string representation of it.
*/
function serializeLoggingEvent(loggingEvent) {
return JSON.stringify(loggingEvent);
}
/**
* Takes a string, returns an object with
* the correct log properties.
*
* This method has been "borrowed" from the `multiprocess` appender
* by `nomiddlename` (https://github.com/nomiddlename/log4js-node/blob/master/lib/appenders/multiprocess.js)
*
* Apparently, node.js serializes everything to strings when using `process.send()`,
* so we need smart deserialization that will recreate log date and level for further processing by log4js internals.
*/
function deserializeLoggingEvent(loggingEventString) {
var loggingEvent;
try {
loggingEvent = JSON.parse(loggingEventString);
loggingEvent.startTime = new Date(loggingEvent.startTime);
loggingEvent.level = log4js.levels.toLevel(loggingEvent.level.levelStr);
} catch (e) {
// JSON.parse failed, just log the contents probably a naughty.
loggingEvent = {
startTime: new Date(),
categoryName: 'log4js',
level: log4js.levels.ERROR,
data: [ 'Unable to parse log:', loggingEventString ]
};
}
return loggingEvent;
}
/**
* Creates an appender.
*
* If the current process is a master (`cluster.isMaster`), then this will be a "master appender".
* Otherwise this will be a worker appender, that just sends loggingEvents to the master process.
*
* If you are using this method directly, make sure to provide it with `config.actualAppenders` array
* of actual appender instances.
*
* Or better use `configure(config, options)`
*/
function createAppender(config) {
if (cluster.isMaster) {
var masterAppender = function(loggingEvent) {
if (config.actualAppenders) {
var size = config.actualAppenders.length;
for(var i = 0; i < size; i++) {
config.actualAppenders[i](loggingEvent);
}
}
}
// Listen on new workers
cluster.on('fork', function(worker) {
worker.on('message', function(message) {
if (message.type && message.type === '::log-message') {
// console.log("master : " + cluster.isMaster + " received message: " + JSON.stringify(message.event));
var loggingEvent = deserializeLoggingEvent(message.event);
masterAppender(loggingEvent);
}
});
});
return masterAppender;
} else {
return function(loggingEvent) {
// If inside the worker process, then send the logger event to master.
if (cluster.isWorker) {
// console.log("worker " + cluster.worker.id + " is sending message");
process.send({ type: '::log-message', event: serializeLoggingEvent(loggingEvent)});
}
}
}
}
function configure(config, options) {
if (config.appenders && cluster.isMaster) {
var size = config.appenders.length;
config.actualAppenders = new Array(size);
for(var i = 0; i < size; i++) {
log4js.loadAppender(config.appenders[i].type);
config.actualAppenders[i] = log4js.appenderMakers[config.appenders[i].type](config.appenders[i], options);
}
}
return createAppender(config);
}
exports.appender = createAppender;
exports.configure = configure;

View File

@ -1,129 +0,0 @@
"use strict";
var log4js = require('../log4js')
, net = require('net')
, END_MSG = '__LOG4JS__';
/**
* Creates a server, listening on config.loggerPort, config.loggerHost.
* Output goes to config.actualAppender (config.appender is used to
* set up that appender).
*/
function logServer(config) {
/**
* Takes a utf-8 string, returns an object with
* the correct log properties.
*/
function deserializeLoggingEvent(clientSocket, msg) {
var loggingEvent;
try {
loggingEvent = JSON.parse(msg);
loggingEvent.startTime = new Date(loggingEvent.startTime);
loggingEvent.level = log4js.levels.toLevel(loggingEvent.level.levelStr);
} catch (e) {
// JSON.parse failed, just log the contents probably a naughty.
loggingEvent = {
startTime: new Date(),
categoryName: 'log4js',
level: log4js.levels.ERROR,
data: [ 'Unable to parse log:', msg ]
};
}
loggingEvent.remoteAddress = clientSocket.remoteAddress;
loggingEvent.remotePort = clientSocket.remotePort;
return loggingEvent;
}
var actualAppender = config.actualAppender,
server = net.createServer(function serverCreated(clientSocket) {
clientSocket.setEncoding('utf8');
var logMessage = '';
function logTheMessage(msg) {
if (logMessage.length > 0) {
actualAppender(deserializeLoggingEvent(clientSocket, msg));
}
}
function chunkReceived(chunk) {
var event;
logMessage += chunk || '';
if (logMessage.indexOf(END_MSG) > -1) {
event = logMessage.substring(0, logMessage.indexOf(END_MSG));
logTheMessage(event);
logMessage = logMessage.substring(event.length + END_MSG.length) || '';
//check for more, maybe it was a big chunk
chunkReceived();
}
}
clientSocket.on('data', chunkReceived);
clientSocket.on('end', chunkReceived);
});
server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost');
return actualAppender;
}
function workerAppender(config) {
var canWrite = false,
buffer = [],
socket;
createSocket();
function createSocket() {
socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
socket.on('connect', function() {
emptyBuffer();
canWrite = true;
});
socket.on('timeout', socket.end.bind(socket));
//don't bother listening for 'error', 'close' gets called after that anyway
socket.on('close', createSocket);
}
function emptyBuffer() {
var evt;
while ((evt = buffer.shift())) {
write(evt);
}
}
function write(loggingEvent) {
socket.write(JSON.stringify(loggingEvent), 'utf8');
socket.write(END_MSG, 'utf8');
}
return function log(loggingEvent) {
if (canWrite) {
write(loggingEvent);
} else {
buffer.push(loggingEvent);
}
};
}
function createAppender(config) {
if (config.mode === 'master') {
return logServer(config);
} else {
return workerAppender(config);
}
}
function configure(config, options) {
var actualAppender;
if (config.appender && config.mode === 'master') {
log4js.loadAppender(config.appender.type);
actualAppender = log4js.appenderMakers[config.appender.type](config.appender, options);
config.actualAppender = actualAppender;
}
return createAppender(config);
}
exports.appender = createAppender;
exports.configure = configure;

View File

@ -48,6 +48,7 @@
*/
var debug = require('./debug')('core')
, fs = require('fs')
, cluster = require('cluster')
, util = require('util')
, layouts = require('./layouts')
, levels = require('./levels')
@ -64,16 +65,57 @@ var debug = require('./debug')('core')
}
};
function serialise(event) {
return JSON.stringify(event);
}
function deserialise(serialised) {
var event;
try {
event = JSON.parse(serialised);
event.startTime = new Date(event.startTime);
event.level = levels.toLevel(event.level.levelStr);
} catch(e) {
event = {
startTime: new Date(),
category: 'log4js',
level: levels.ERROR,
data: [ 'Unable to parse log:', serialised ]
};
}
return event;
}
//in a multi-process node environment, worker loggers will use
//process.send
cluster.on('fork', function(worker) {
debug('listening to worker: ' + worker);
worker.on('message', function(message) {
if (message.type && message.type === '::log4js-message') {
debug("received message: " + message.event);
dispatch(deserialise(message.event));
}
});
});
/**
* Get a logger instance.
* @param {String} categoryName name of category to log to.
* @param {String} category to log to.
* @return {Logger} instance of logger for the category
* @static
*/
function getLogger (categoryName) {
debug("getLogger(" + categoryName + ")");
function getLogger (category) {
debug("getLogger(" + category + ")");
return new Logger(dispatch, categoryName || 'default');
return new Logger(
cluster.isMaster ? dispatch : workerDispatch,
category || 'default'
);
}
function workerDispatch(event) {
process.send({ type: "::log4js-message", event: serialise(event) });
}
/**
@ -92,67 +134,6 @@ function dispatch(event) {
}
}
/*
var configState = {};
function loadConfigurationFile(filename) {
if (filename) {
return JSON.parse(fs.readFileSync(filename, "utf8"));
}
return undefined;
}
function configureOnceOff(config, options) {
if (config) {
try {
configureAppenders(config.appenders, options);
configureLevels(config.levels);
if (config.replaceConsole) {
replaceConsole();
} else {
restoreConsole();
}
} catch (e) {
throw new Error(
"Problem reading log4js config " + util.inspect(config) +
". Error was \"" + e.message + "\" (" + e.stack + ")"
);
}
}
}
function reloadConfiguration() {
var mtime = getMTime(configState.filename);
if (!mtime) return;
if (configState.lastMTime && (mtime.getTime() > configState.lastMTime.getTime())) {
configureOnceOff(loadConfigurationFile(configState.filename));
}
configState.lastMTime = mtime;
}
function getMTime(filename) {
var mtime;
try {
mtime = fs.statSync(configState.filename).mtime;
} catch (e) {
getLogger('log4js').warn('Failed to load configuration file ' + filename);
}
return mtime;
}
function initReloadConfiguration(filename, options) {
if (configState.timerId) {
clearInterval(configState.timerId);
delete configState.timerId;
}
configState.filename = filename;
configState.lastMTime = getMTime(filename);
configState.timerId = setInterval(reloadConfiguration, options.reloadSecs*1000);
}
*/
function load(file) {
return JSON.parse(fs.readFileSync(file, "utf-8"));
}
@ -178,25 +159,6 @@ function configure(configurationFileOrObject) {
validateCategories(config.categories);
categories = config.categories;
/*
var config = configurationFileOrObject;
config = config || process.env.LOG4JS_CONFIG;
options = options || {};
if (config === undefined || config === null || typeof(config) === 'string') {
if (options.reloadSecs) {
initReloadConfiguration(config, options);
}
config = loadConfigurationFile(config) || defaultConfig;
} else {
if (options.reloadSecs) {
getLogger('log4js').warn(
'Ignoring configuration reload parameter for "object" configuration.'
);
}
}
configureOnceOff(config, options);
*/
}
function validateCategories(cats) {
@ -259,48 +221,9 @@ function loadAppender(appender) {
appenderMakers[appender] = appenderModule.configure.bind(appenderModule);
}
/*
var originalConsoleFunctions = {
log: console.log,
debug: console.debug,
info: console.info,
warn: console.warn,
error: console.error
};
function replaceConsole(logger) {
function replaceWith(fn) {
return function() {
fn.apply(logger, arguments);
};
}
logger = logger || getLogger("console");
['log','debug','info','warn','error'].forEach(function (item) {
console[item] = replaceWith(item === 'log' ? logger.info : logger[item]);
});
}
function restoreConsole() {
['log', 'debug', 'info', 'warn', 'error'].forEach(function (item) {
console[item] = originalConsoleFunctions[item];
});
}
*/
module.exports = {
getLogger: getLogger,
configure: configure,
/*
replaceConsole: replaceConsole,
restoreConsole: restoreConsole,
levels: levels,
layouts: layouts,
appenders: {},
appenderMakers: appenderMakers,
connectLogger: require('./connect-logger').connectLogger
*/
};
//set ourselves up

View File

@ -1,116 +1,145 @@
"use strict";
var assert = require('assert');
var vows = require('vows');
var layouts = require('../lib/layouts');
var sandbox = require('sandboxed-module');
var LoggingEvent = require('../lib/logger').LoggingEvent;
var cluster = require('cluster');
var should = require('should')
, sandbox = require('sandboxed-module');
vows.describe('log4js cluster appender').addBatch({
'when in master mode': {
topic: function() {
var registeredClusterEvents = [];
var loggingEvents = [];
// Fake cluster module, so no cluster listeners be really added
var fakeCluster = {
on: function(event, callback) {
registeredClusterEvents.push(event);
},
isMaster: true,
isWorker: false,
};
var fakeActualAppender = function(loggingEvent) {
loggingEvents.push(loggingEvent);
}
// Load appender and fake modules in it
var appenderModule = sandbox.require('../lib/appenders/clustered', {
requires: {
'cluster': fakeCluster,
}
});
var masterAppender = appenderModule.appender({
actualAppenders: [ fakeActualAppender ]
});
describe('log4js in a cluster', function() {
describe('when in master mode', function() {
// Actual test - log message using masterAppender
masterAppender(new LoggingEvent('wovs', 'Info', ['masterAppender test']));
var returnValue = {
registeredClusterEvents: registeredClusterEvents,
loggingEvents: loggingEvents,
};
return returnValue;
},
"should register 'fork' event listener on 'cluster'": function(topic) {
assert.equal(topic.registeredClusterEvents[0], 'fork');
},
"should log using actual appender": function(topic) {
assert.equal(topic.loggingEvents[0].data[0], 'masterAppender test');
},
},
'when in worker mode': {
topic: function() {
var registeredProcessEvents = [];
// Fake cluster module, to fake we're inside a worker process
var fakeCluster = {
isMaster: false,
isWorker: true,
};
var fakeProcess = {
send: function(data) {
registeredProcessEvents.push(data);
},
};
// Load appender and fake modules in it
var appenderModule = sandbox.require('../lib/appenders/clustered', {
requires: {
'cluster': fakeCluster,
},
globals: {
'process': fakeProcess,
}
});
var workerAppender = appenderModule.appender();
var log4js
, clusterOnFork = false
, workerCb
, events = []
, worker = {
on: function(evt, cb) {
evt.should.eql('message');
this.cb = cb;
}
};
// Actual test - log message using masterAppender
workerAppender(new LoggingEvent('wovs', 'Info', ['workerAppender test']));
var returnValue = {
registeredProcessEvents: registeredProcessEvents,
};
return returnValue;
},
"worker appender should call process.send" : function(topic) {
assert.equal(topic.registeredProcessEvents[0].type, '::log-message');
assert.equal(JSON.parse(topic.registeredProcessEvents[0].event).data[0], "workerAppender test");
}
}
before(function() {
log4js = sandbox.require(
'../lib/log4js',
{
requires: {
'cluster': {
isMaster: true,
on: function(evt, cb) {
evt.should.eql('fork');
clusterOnFork = true;
cb(worker);
}
},
'./appenders/console': {
configure: function() {
return function(event) {
events.push(event);
};
}
}
}
}
);
});
}).exportTo(module);
it('should listen for fork events', function() {
clusterOnFork.should.be.true;
});
it('should listen for messages from workers', function() {
//workerCb was created in a different context to the test
//(thanks to sandbox.require), so doesn't pick up the should prototype
(typeof worker.cb).should.eql('function');
});
it('should log valid ::log4js-message events', function() {
worker.cb({
type: '::log4js-message',
event: JSON.stringify({
startTime: '2010-10-10 18:54:06',
category: 'cheese',
level: { levelStr: 'DEBUG' },
data: [ "blah" ]
})
});
events.should.have.length(1);
events[0].data[0].should.eql("blah");
events[0].category.should.eql('cheese');
//startTime was created in a different context to the test
//(thanks to sandbox.require), so instanceof doesn't think
//it's a Date.
events[0].startTime.constructor.name.should.eql('Date');
events[0].level.toString().should.eql('DEBUG');
});
it('should handle invalid ::log4js-message events', function() {
worker.cb({
type: '::log4js-message',
event: "biscuits"
});
worker.cb({
type: '::log4js-message',
event: JSON.stringify({
startTime: 'whatever'
})
});
events.should.have.length(3);
events[1].data[0].should.eql('Unable to parse log:');
events[1].data[1].should.eql('biscuits');
events[1].category.should.eql('log4js');
events[1].level.toString().should.eql('ERROR');
events[2].data[0].should.eql('Unable to parse log:');
events[2].data[1].should.eql(JSON.stringify({ startTime: 'whatever'}));
});
it('should ignore other events', function() {
worker.cb({
type: "::blah-blah",
event: "blah"
});
events.should.have.length(3);
});
});
describe('when in worker mode', function() {
var log4js, events = [];
before(function() {
log4js = sandbox.require(
'../lib/log4js',
{
requires: {
'cluster': {
isMaster: false,
on: function() {}
}
},
globals: {
'process': {
'send': function(event) {
events.push(event);
}
}
}
}
);
log4js.getLogger('test').debug("just testing");
});
it('should emit ::log4js-message events', function() {
events.should.have.length(1);
events[0].type.should.eql('::log4js-message');
events[0].event.should.be.a('string');
var evt = JSON.parse(events[0].event);
evt.category.should.eql('test');
evt.level.levelStr.should.eql('DEBUG');
evt.data[0].should.eql('just testing');
});
});
});

View File

@ -1,303 +0,0 @@
"use strict";
var vows = require('vows')
, sandbox = require('sandboxed-module')
, assert = require('assert')
;
function makeFakeNet() {
return {
logEvents: [],
data: [],
cbs: {},
createConnectionCalled: 0,
fakeAppender: function(logEvent) {
this.logEvents.push(logEvent);
},
createConnection: function(port, host) {
var fakeNet = this;
this.port = port;
this.host = host;
this.createConnectionCalled += 1;
return {
on: function(evt, cb) {
fakeNet.cbs[evt] = cb;
},
write: function(data, encoding) {
fakeNet.data.push(data);
fakeNet.encoding = encoding;
},
end: function() {
fakeNet.closeCalled = true;
}
};
},
createServer: function(cb) {
var fakeNet = this;
cb({
remoteAddress: '1.2.3.4',
remotePort: '1234',
setEncoding: function(encoding) {
fakeNet.encoding = encoding;
},
on: function(event, cb) {
fakeNet.cbs[event] = cb;
}
});
return {
listen: function(port, host) {
fakeNet.port = port;
fakeNet.host = host;
}
};
}
};
}
vows.describe('Multiprocess Appender').addBatch({
'worker': {
topic: function() {
var fakeNet = makeFakeNet(),
appender = sandbox.require(
'../lib/appenders/multiprocess',
{
requires: {
'net': fakeNet
}
}
).appender({ mode: 'worker', loggerPort: 1234, loggerHost: 'pants' });
//don't need a proper log event for the worker tests
appender('before connect');
fakeNet.cbs.connect();
appender('after connect');
fakeNet.cbs.close(true);
appender('after error, before connect');
fakeNet.cbs.connect();
appender('after error, after connect');
return fakeNet;
},
'should open a socket to the loggerPort and loggerHost': function(net) {
assert.equal(net.port, 1234);
assert.equal(net.host, 'pants');
},
'should buffer messages written before socket is connected': function(net) {
assert.equal(net.data[0], JSON.stringify('before connect'));
},
'should write log messages to socket as json strings with a terminator string': function(net) {
assert.equal(net.data[0], JSON.stringify('before connect'));
assert.equal(net.data[1], '__LOG4JS__');
assert.equal(net.data[2], JSON.stringify('after connect'));
assert.equal(net.data[3], '__LOG4JS__');
assert.equal(net.encoding, 'utf8');
},
'should attempt to re-open the socket on error': function(net) {
assert.equal(net.data[4], JSON.stringify('after error, before connect'));
assert.equal(net.data[5], '__LOG4JS__');
assert.equal(net.data[6], JSON.stringify('after error, after connect'));
assert.equal(net.data[7], '__LOG4JS__');
assert.equal(net.createConnectionCalled, 2);
}
},
'worker with timeout': {
topic: function() {
var fakeNet = makeFakeNet(),
appender = sandbox.require(
'../lib/appenders/multiprocess',
{
requires: {
'net': fakeNet
}
}
).appender({ mode: 'worker' });
//don't need a proper log event for the worker tests
appender('before connect');
fakeNet.cbs.connect();
appender('after connect');
fakeNet.cbs.timeout();
appender('after timeout, before close');
fakeNet.cbs.close();
appender('after close, before connect');
fakeNet.cbs.connect();
appender('after close, after connect');
return fakeNet;
},
'should attempt to re-open the socket': function(net) {
//skipping the __LOG4JS__ separators
assert.equal(net.data[0], JSON.stringify('before connect'));
assert.equal(net.data[2], JSON.stringify('after connect'));
assert.equal(net.data[4], JSON.stringify('after timeout, before close'));
assert.equal(net.data[6], JSON.stringify('after close, before connect'));
assert.equal(net.data[8], JSON.stringify('after close, after connect'));
assert.equal(net.createConnectionCalled, 2);
}
},
'worker defaults': {
topic: function() {
var fakeNet = makeFakeNet(),
appender = sandbox.require(
'../lib/appenders/multiprocess',
{
requires: {
'net': fakeNet
}
}
).appender({ mode: 'worker' });
return fakeNet;
},
'should open a socket to localhost:5000': function(net) {
assert.equal(net.port, 5000);
assert.equal(net.host, 'localhost');
}
},
'master': {
topic: function() {
var fakeNet = makeFakeNet(),
appender = sandbox.require(
'../lib/appenders/multiprocess',
{
requires: {
'net': fakeNet
}
}
).appender({ mode: 'master',
loggerHost: 'server',
loggerPort: 1234,
actualAppender: fakeNet.fakeAppender.bind(fakeNet)
});
appender('this should be sent to the actual appender directly');
return fakeNet;
},
'should listen for log messages on loggerPort and loggerHost': function(net) {
assert.equal(net.port, 1234);
assert.equal(net.host, 'server');
},
'should return the underlying appender': function(net) {
assert.equal(net.logEvents[0], 'this should be sent to the actual appender directly');
},
'when a client connects': {
topic: function(net) {
var logString = JSON.stringify(
{ level: { level: 10000, levelStr: 'DEBUG' }
, data: ['some debug']}
) + '__LOG4JS__';
net.cbs.data(
JSON.stringify(
{ level: { level: 40000, levelStr: 'ERROR' }
, data: ['an error message'] }
) + '__LOG4JS__'
);
net.cbs.data(logString.substring(0, 10));
net.cbs.data(logString.substring(10));
net.cbs.data(logString + logString + logString);
net.cbs.end(
JSON.stringify(
{ level: { level: 50000, levelStr: 'FATAL' }
, data: ["that's all folks"] }
) + '__LOG4JS__'
);
net.cbs.data('bad message__LOG4JS__');
return net;
},
'should parse log messages into log events and send to appender': function(net) {
assert.equal(net.logEvents[1].level.toString(), 'ERROR');
assert.equal(net.logEvents[1].data[0], 'an error message');
assert.equal(net.logEvents[1].remoteAddress, '1.2.3.4');
assert.equal(net.logEvents[1].remotePort, '1234');
},
'should parse log messages split into multiple chunks': function(net) {
assert.equal(net.logEvents[2].level.toString(), 'DEBUG');
assert.equal(net.logEvents[2].data[0], 'some debug');
assert.equal(net.logEvents[2].remoteAddress, '1.2.3.4');
assert.equal(net.logEvents[2].remotePort, '1234');
},
'should parse multiple log messages in a single chunk': function(net) {
assert.equal(net.logEvents[3].data[0], 'some debug');
assert.equal(net.logEvents[4].data[0], 'some debug');
assert.equal(net.logEvents[5].data[0], 'some debug');
},
'should handle log messages sent as part of end event': function(net) {
assert.equal(net.logEvents[6].data[0], "that's all folks");
},
'should handle unparseable log messages': function(net) {
assert.equal(net.logEvents[7].level.toString(), 'ERROR');
assert.equal(net.logEvents[7].categoryName, 'log4js');
assert.equal(net.logEvents[7].data[0], 'Unable to parse log:');
assert.equal(net.logEvents[7].data[1], 'bad message');
}
}
},
'master defaults': {
topic: function() {
var fakeNet = makeFakeNet(),
appender = sandbox.require(
'../lib/appenders/multiprocess',
{
requires: {
'net': fakeNet
}
}
).appender({ mode: 'master' });
return fakeNet;
},
'should listen for log messages on localhost:5000': function(net) {
assert.equal(net.port, 5000);
assert.equal(net.host, 'localhost');
}
}
}).addBatch({
'configure': {
topic: function() {
var results = {}
, fakeNet = makeFakeNet()
, appender = sandbox.require(
'../lib/appenders/multiprocess',
{
requires: {
'net': fakeNet,
'../log4js': {
loadAppender: function(app) {
results.appenderLoaded = app;
},
appenderMakers: {
'madeupappender': function(config, options) {
results.config = config;
results.options = options;
}
}
}
}
}
).configure(
{
mode: 'master',
appender: {
type: 'madeupappender',
cheese: 'gouda'
}
},
{ crackers: 'jacobs' }
);
return results;
},
'should load underlying appender for master': function(results) {
assert.equal(results.appenderLoaded, 'madeupappender');
},
'should pass config to underlying appender': function(results) {
assert.equal(results.config.cheese, 'gouda');
},
'should pass options to underlying appender': function(results) {
assert.equal(results.options.crackers, 'jacobs');
}
}
}).exportTo(module);