Changed multiprocess appender to use a single socket per client

This commit is contained in:
Gareth Jones 2012-07-04 08:44:50 +10:00
parent df491c0b14
commit a33e48cb07
3 changed files with 140 additions and 83 deletions

View File

@ -1,62 +1,120 @@
var log4js = require('../log4js'); var log4js = require('../log4js'),
var layouts = require('../layouts'); net = require('net'),
var net = require('net'); END_MSG = '__LOG4JS__';
var util = require('util');
var LogServer = function createLogServer(config) { /**
var actualAppender = config.actualAppender; * Creates a server, listening on config.loggerPort, config.loggerHost.
var server = net.createServer(function serverCreated(clientSocket) { * Output goes to config.actualAppender (config.appender is used to
clientSocket.on('connect', function clientConnected() { * set up that appender).
var logMessage = ''; */
clientSocket.on('data', function chunkReceived(chunk) { function logServer(config) {
logMessage += chunk;
}); /**
clientSocket.on('end', function gotChunks() { * Takes a utf-8 string, returns an object with
* the correct log properties.
*/
function deserializeLoggingEvent(clientSocket, msg) {
var loggingEvent;
try { try {
var loggingEvent = JSON.parse(logMessage); loggingEvent = JSON.parse(msg);
deserializeLoggingEvent(loggingEvent);
actualAppender(loggingEvent);
} catch (e) {
// JSON.parse failed, just log the contents probably a naughty.
actualAppender(createLoggingEvent('ERROR', 'Unable to parse log: ' + logMessage));
}
});
});
});
server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost');
}
function createLoggingEvent(level, message) {
return {
startTime: new Date(),
categoryName: 'log4js',
level: { toString: function () {
return level;
}},
data: [ message ]
};
}
function deserializeLoggingEvent(loggingEvent) {
loggingEvent.startTime = new Date(loggingEvent.startTime); loggingEvent.startTime = new Date(loggingEvent.startTime);
loggingEvent.level.toString = function levelToString() { loggingEvent.level.toString = function levelToString() {
return loggingEvent.level.levelStr; return loggingEvent.level.levelStr;
}; };
} catch (e) {
// JSON.parse failed, just log the contents probably a naughty.
loggingEvent = {
startTime: new Date(),
categoryName: 'log4js',
level: { toString: function () {
return 'ERROR';
}
},
data: [ 'Unable to parse log:', msg ]
};
}
loggingEvent.remoteAddress = clientSocket.remoteAddress;
loggingEvent.remotePort = clientSocket.remotePort;
return loggingEvent;
}
var actualAppender = config.actualAppender,
server = net.createServer(function serverCreated(clientSocket) {
clientSocket.setEncoding('utf8');
clientSocket.on('connect', function clientConnected() {
var logMessage = '';
function logTheMessage(msg) {
if (logMessage.length > 0) {
actualAppender(deserializeLoggingEvent(clientSocket, msg));
}
}
function chunkReceived(chunk) {
var event;
logMessage += chunk || '';
if (logMessage.indexOf(END_MSG) > -1) {
event = logMessage.substring(0, logMessage.indexOf(END_MSG));
logTheMessage(event);
logMessage = logMessage.substring(event.length + END_MSG.length) || '';
//check for more, maybe it was a big chunk
chunkReceived();
}
}
clientSocket.on('data', chunkReceived);
clientSocket.on('end', chunkReceived);
});
});
server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost');
return actualAppender;
} }
function workerAppender(config) { function workerAppender(config) {
return function log(loggingEvent) { var canWrite = false,
var socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost'); buffer = [],
socket.on('connect', function socketConnected() { socket;
socket.end(JSON.stringify(loggingEvent), 'utf8');
createSocket();
function createSocket() {
socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
socket.on('connect', function() {
emptyBuffer();
canWrite = true;
}); });
socket.on('timeout', socket.end.bind(socket));
//don't bother listening for 'error', 'close' gets called after that anyway
socket.on('close', createSocket);
}
function emptyBuffer() {
var evt;
while ((evt = buffer.shift())) {
write(evt);
}
}
function write(loggingEvent) {
socket.write(JSON.stringify(loggingEvent), 'utf8');
socket.write(END_MSG, 'utf8');
}
return function log(loggingEvent) {
if (canWrite) {
write(loggingEvent);
} else {
buffer.push(loggingEvent);
}
}; };
} }
function createAppender(config) { function createAppender(config) {
if (config.mode === 'master') { if (config.mode === 'master') {
var server = new LogServer(config); return logServer(config);
return config.actualAppender;
} else { } else {
return workerAppender(config); return workerAppender(config);
} }

View File

@ -576,5 +576,4 @@ vows.describe('log4js').addBatch({
assert.equal(logEvents[1].data[0], 'info3'); assert.equal(logEvents[1].data[0], 'info3');
} }
} }
}).export(module); }).export(module);