From 78de73a274128a781522fe767cee618a87f07bb1 Mon Sep 17 00:00:00 2001 From: Gareth Jones Date: Mon, 19 Dec 2011 16:58:21 +1100 Subject: [PATCH] Working version of fully-async log rolling file appender - tests need fixing though --- lib/appenders/file.js | 126 +++------------------- lib/streams.js | 221 ++++++++++++++++++++++++++++++++++++++ package.json | 1 + test/bufferedStream.js | 130 ++++++++++++++++++++++ test/rollingFileStream.js | 118 ++++++++++++++++++++ 5 files changed, 482 insertions(+), 114 deletions(-) create mode 100644 lib/streams.js create mode 100644 test/bufferedStream.js create mode 100644 test/rollingFileStream.js diff --git a/lib/appenders/file.js b/lib/appenders/file.js index c6be910..c76852a 100644 --- a/lib/appenders/file.js +++ b/lib/appenders/file.js @@ -1,6 +1,7 @@ var layouts = require('../layouts') , path = require('path') -, fs = require('fs'); +, fs = require('fs') +, streams = require('../streams'); /** * File Appender writing the logs to a text file. Supports rolling of logs by size. @@ -18,132 +19,29 @@ function fileAppender (file, layout, logSize, numBackups) { //there has to be at least one backup if logSize has been specified numBackups = numBackups === 0 ? 1 : numBackups; - function setupLogRolling () { - try { - var stat = fs.statSync(file); - bytesWritten = stat.size; - if (bytesWritten >= logSize) { - rollThatLog(); - } - } catch (e) { - //file does not exist - bytesWritten = 0; - } - } - - function rollThatLog () { - function index(filename) { - return parseInt(filename.substring((path.basename(file) + '.').length), 10) || 0; - } - - var nameMatcher = new RegExp('^' + path.basename(file)); - function justTheLogFiles (item) { - return nameMatcher.test(item); - } - - function byIndex(a, b) { - if (index(a) > index(b)) { - return 1; - } else if (index(a) < index(b) ) { - return -1; - } else { - return 0; - } - } - - function increaseFileIndex (fileToRename) { - var idx = index(fileToRename); - if (idx < numBackups) { - //on windows, you can get a EEXIST error if you rename a file to an existing file - //so, we'll try to delete the file we're renaming to first - try { - fs.unlinkSync(file + '.' + (idx+1)); - } catch (e) { - //couldn't delete, but that could be because it doesn't exist - //try renaming anyway - } - fs.renameSync(path.join(path.dirname(file), fileToRename), file + '.' + (idx + 1)); - } - } - - //roll the backups (rename file.n to file.n+1, where n <= numBackups) - fs.readdirSync(path.dirname(file)) - .filter(justTheLogFiles) - .sort(byIndex) - .reverse() - .forEach(increaseFileIndex); - - //let's make a new file - var newLogFileFD = fs.openSync(file, 'a', 0644) - , oldLogFileFD = logFile.fd; - logFile.fd = newLogFileFD; - fs.close(oldLogFileFD); - //reset the counter - bytesWritten = 0; - } - - function fileExists (filename) { - try { - fs.statSync(filename); - return true; - } catch (e) { - return false; - } - } - - function openTheStream() { - var stream = fs.createWriteStream(file, { flags: 'a', mode: 0644, encoding: 'utf8' }); - stream.on("open", function() { - canWrite = true; - flushBuffer(); - }); + function openTheStream(file, fileSize, numFiles) { + var stream = new streams.BufferedWriteStream( + new streams.RollingFileStream( + file, + fileSize, + numFiles + ) + ); stream.on("error", function (err) { console.error("log4js.fileAppender - Writing to file %s, error happened ", file, err); }); - stream.on("drain", function() { - canWrite = true; - flushBuffer(); - if (logEventBuffer.length > 0) { - writeToLog(logEventBuffer.shift()); - } - }); return stream; } - function flushBuffer() { - while (logEventBuffer.length > 0 && canWrite) { - writeToLog(logEventBuffer.shift()); - } - } - - var logEventBuffer = [] - , canWrite = false - , logFile = openTheStream(); - - if (logSize > 0) { - setupLogRolling(); - } + var logFile = openTheStream(file, logSize, numBackups); //close the file on process exit. process.on('exit', function() { - flushBuffer(); logFile.end(); - logFile.destroy(); }); - function writeToLog(loggingEvent) { - var logMessage = layout(loggingEvent)+'\n'; - //not entirely accurate, but it'll do. - bytesWritten += logMessage.length; - canWrite = logFile.write(logMessage, "utf8"); - if (bytesWritten >= logSize) { - rollThatLog(); - } - } - return function(loggingEvent) { - logEventBuffer.push(loggingEvent); - flushBuffer(); + logFile.write(layout(loggingEvent)+'\n', "utf8"); }; } diff --git a/lib/streams.js b/lib/streams.js new file mode 100644 index 0000000..40b95b9 --- /dev/null +++ b/lib/streams.js @@ -0,0 +1,221 @@ +var util = require('util') +, fs = require('fs') +, path = require('path') +, events = require('events') +, async = require('async'); + +function BufferedWriteStream(stream) { + var that = this; + this.stream = stream; + this.buffer = []; + this.canWrite = false; + this.bytes = 0; + + this.stream.on("open", function() { + that.canWrite = true; + that.flushBuffer(); + }); + + this.stream.on("error", function (err) { + that.emit("error", err); + }); + + this.stream.on("drain", function() { + that.canWrite = true; + that.flushBuffer(); + }); +} + +util.inherits(BufferedWriteStream, events.EventEmitter); + +Object.defineProperty( + BufferedWriteStream.prototype, + "fd", + { + get: function() { return this.stream.fd; }, + set: function(newFd) { + this.stream.fd = newFd; + this.bytes = 0; + } + } +); + +Object.defineProperty( + BufferedWriteStream.prototype, + "bytesWritten", + { + get: function() { return this.bytes; } + } +); + +BufferedWriteStream.prototype.write = function(data, encoding) { + this.buffer.push({ data: data, encoding: encoding }); + this.flushBuffer(); +} + +BufferedWriteStream.prototype.end = function(data, encoding) { + if (data) { + this.buffer.push({ data: data, encoding: encoding }); + } + this.flushBufferEvenIfCannotWrite(); +} + +BufferedWriteStream.prototype.writeToStream = function(toWrite) { + this.bytes += toWrite.data.length; + this.canWrite = this.stream.write(toWrite.data, toWrite.encoding); +} + +BufferedWriteStream.prototype.flushBufferEvenIfCannotWrite = function() { + while (this.buffer.length > 0) { + this.writeToStream(this.buffer.shift()); + } +} + +BufferedWriteStream.prototype.flushBuffer = function() { + while (this.buffer.length > 0 && this.canWrite) { + this.writeToStream(this.buffer.shift()); + } +} + +function RollingFileStream (filename, size, backups, options) { + this.filename = filename; + this.size = size; + this.backups = backups || 1; + this.options = options || { encoding: "utf8", mode: 0644, flags: 'a' }; + this.rolling = false; + this.writesWhileRolling = []; + + throwErrorIfArgumentsAreNotValid(); + + RollingFileStream.super_.call(this, this.filename, this.options); + this.bytesWritten = currentFileSize(this.filename); + + function currentFileSize(file) { + var fileSize = 0; + try { + fileSize = fs.statSync(file).size; + } catch (e) { + //file does not exist + } + return fileSize; + } + + function throwErrorIfArgumentsAreNotValid() { + if (!filename || !size || size <= 0) { + throw new Error("You must specify a filename and file size"); + } + } +} +util.inherits(RollingFileStream, fs.FileWriteStream); + +RollingFileStream.prototype.write = function(data, encoding) { + if (this.rolling) { + this.writesWhileRolling.push({ data: data, encoding: encoding }); + return false; + } else { + var canWrite = RollingFileStream.super_.prototype.write.call(this, data, encoding); + //this.bytesWritten += data.length; + console.log("bytesWritten: %d, max: %d", this.bytesWritten, this.size); + if (this.bytesWritten >= this.size) { + this.roll(); + } + return canWrite; + } +} + +RollingFileStream.prototype.roll = function () { + var that = this, + nameMatcher = new RegExp('^' + path.basename(this.filename)); + + function justTheseFiles (item) { + return nameMatcher.test(item); + } + + function index(filename) { + return parseInt(filename.substring((path.basename(that.filename) + '.').length), 10) || 0; + } + + function byIndex(a, b) { + if (index(a) > index(b)) { + return 1; + } else if (index(a) < index(b) ) { + return -1; + } else { + return 0; + } + } + + function increaseFileIndex (fileToRename, cb) { + var idx = index(fileToRename); + console.log("Index of %s is %d", fileToRename, idx); + if (idx < that.backups) { + //on windows, you can get a EEXIST error if you rename a file to an existing file + //so, we'll try to delete the file we're renaming to first + fs.unlink(that.filename + '.' + (idx+1), function (err) { + //ignore err: if we could not delete, it's most likely that it doesn't exist + console.log("Renaming %s -> %s", fileToRename, that.filename + '.' + (idx+1)); + fs.rename(path.join(path.dirname(that.filename), fileToRename), that.filename + '.' + (idx + 1), cb); + }); + } else { + cb(); + } + } + + function renameTheFiles(cb) { + //roll the backups (rename file.n to file.n+1, where n <= numBackups) + console.log("Renaming the old files"); + fs.readdir(path.dirname(that.filename), function (err, files) { + async.forEachSeries( + files.filter(justTheseFiles).sort(byIndex).reverse(), + increaseFileIndex, + cb + ); + }); + } + + function openANewFile(cb) { + console.log("Opening a new file"); + fs.open( + that.filename, + that.options.flags, + that.options.mode, + function (err, fd) { + console.log("opened new file"); + var oldLogFileFD = that.fd; + that.fd = fd; + that.writable = true; + fs.close(oldLogFileFD, function() { + that.bytesWritten = 0; + cb(); + }); + } + ); + } + + function emptyRollingQueue(cb) { + console.log("emptying the rolling queue"); + var toWrite; + while ((toWrite = that.writesWhileRolling.shift())) { + RollingFileStream.super_.prototype.write.call(that, toWrite.data, toWrite.encoding); + that.bytesWritten += toWrite.data.length; + } + that.rolling = false; + cb(); + } + + console.log("Starting roll"); + console.log("Queueing up data until we've finished rolling"); + this.rolling = true; + console.log("Flushing underlying stream"); + this.flush(); + + async.series([ + renameTheFiles, + openANewFile, + emptyRollingQueue + ]); + +} + +exports.RollingFileStream = RollingFileStream; +exports.BufferedWriteStream = BufferedWriteStream; \ No newline at end of file diff --git a/package.json b/package.json index e1ddda5..687f4e8 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ "lib": "lib" }, "dependencies": { + "async": "0.1.15" }, "devDependencies": { "vows": ">=0.5.2", diff --git a/test/bufferedStream.js b/test/bufferedStream.js new file mode 100644 index 0000000..4755b28 --- /dev/null +++ b/test/bufferedStream.js @@ -0,0 +1,130 @@ +var vows = require('vows') +, assert = require('assert') +, events = require('events') +, BufferedWriteStream = require('../lib/streams').BufferedWriteStream; + +function FakeStream() { + this.writes = []; + this.canWrite = false; + this.callbacks = {}; +} + +FakeStream.prototype.on = function(event, callback) { + this.callbacks[event] = callback; +} + +FakeStream.prototype.write = function(data, encoding) { + assert.equal("utf8", encoding); + this.writes.push(data); + return this.canWrite; +} + +FakeStream.prototype.emit = function(event, payload) { + this.callbacks[event](payload); +} + +FakeStream.prototype.block = function() { + this.canWrite = false; +} + +FakeStream.prototype.unblock = function() { + this.canWrite = true; + this.emit("drain"); +} + +vows.describe('BufferedWriteStream').addBatch({ + 'stream': { + topic: new BufferedWriteStream(new FakeStream()), + 'should take a stream as an argument and return a stream': function(stream) { + assert.instanceOf(stream, events.EventEmitter); + } + }, + 'before stream is open': { + topic: function() { + var fakeStream = new FakeStream(), + stream = new BufferedWriteStream(fakeStream); + stream.write("Some data", "utf8"); + stream.write("Some more data", "utf8"); + return fakeStream.writes; + }, + 'should buffer writes': function(writes) { + assert.equal(writes.length, 0); + } + }, + 'when stream is open': { + topic: function() { + var fakeStream = new FakeStream(), + stream = new BufferedWriteStream(fakeStream); + stream.write("Some data", "utf8"); + fakeStream.canWrite = true; + fakeStream.emit("open"); + stream.write("Some more data", "utf8"); + return fakeStream.writes; + }, + 'should write data to stream from before stream was open': function (writes) { + assert.equal(writes[0], "Some data"); + }, + 'should write data to stream from after stream was open': function (writes) { + assert.equal(writes[1], "Some more data"); + } + }, + 'when stream is blocked': { + topic: function() { + var fakeStream = new FakeStream(), + stream = new BufferedWriteStream(fakeStream); + fakeStream.emit("open"); + fakeStream.block(); + stream.write("will not know it is blocked until first write", "utf8"); + stream.write("so this one will be buffered, but not the previous one", "utf8"); + return fakeStream.writes; + }, + 'should buffer writes': function (writes) { + assert.equal(writes.length, 1); + assert.equal(writes[0], "will not know it is blocked until first write"); + } + }, + 'when stream is unblocked': { + topic: function() { + var fakeStream = new FakeStream(), + stream = new BufferedWriteStream(fakeStream); + fakeStream.emit("open"); + fakeStream.block(); + stream.write("will not know it is blocked until first write", "utf8"); + stream.write("so this one will be buffered, but not the previous one", "utf8"); + fakeStream.unblock(); + return fakeStream.writes; + }, + 'should send buffered data': function (writes) { + assert.equal(writes.length, 2); + assert.equal(writes[1], "so this one will be buffered, but not the previous one"); + } + }, + 'when stream is closed': { + topic: function() { + var fakeStream = new FakeStream(), + stream = new BufferedWriteStream(fakeStream); + fakeStream.emit("open"); + fakeStream.block(); + stream.write("first write to notice stream is blocked", "utf8"); + stream.write("data while blocked", "utf8"); + stream.end(); + return fakeStream.writes; + }, + 'should send any buffered writes to the stream': function (writes) { + assert.equal(writes.length, 2); + assert.equal(writes[1], "data while blocked"); + } + }, + 'when stream errors': { + topic: function() { + var fakeStream = new FakeStream(), + stream = new BufferedWriteStream(fakeStream); + stream.on("error", this.callback); + fakeStream.emit("error", "oh noes!"); + }, + 'should emit error': function(err, value) { + assert.equal(err, "oh noes!"); + } + } + +}).exportTo(module); \ No newline at end of file diff --git a/test/rollingFileStream.js b/test/rollingFileStream.js new file mode 100644 index 0000000..edec054 --- /dev/null +++ b/test/rollingFileStream.js @@ -0,0 +1,118 @@ +var vows = require('vows') +, assert = require('assert') +, events = require('events') +, fs = require('fs') +, RollingFileStream = require('../lib/streams').RollingFileStream; + +function remove(filename) { + try { + fs.unlinkSync(filename); + } catch (e) { + //doesn't really matter if it failed + } +} + +vows.describe('RollingFileStream').addBatch({ + 'arguments': { + topic: function() { + remove(__dirname + "/test-rolling-file-stream"); + return new RollingFileStream("test-rolling-file-stream", 1024, 5); + }, + 'should take a filename, file size in bytes, number of backups as arguments and return a FileWriteStream': function(stream) { + assert.instanceOf(stream, fs.FileWriteStream); + assert.equal(stream.filename, "test-rolling-file-stream"); + assert.equal(stream.size, 1024); + assert.equal(stream.backups, 5); + }, + 'with default settings for the underlying stream': function(stream) { + assert.equal(stream.mode, 420); + assert.equal(stream.flags, 'a'); + assert.equal(stream.encoding, 'utf8'); + } + }, + 'with stream arguments': { + topic: function() { + remove(__dirname + '/test-rolling-file-stream'); + return new RollingFileStream('test-rolling-file-stream', 1024, 5, { mode: 0666 }); + }, + 'should pass them to the underlying stream': function(stream) { + assert.equal(stream.mode, 0666); + } + }, + 'without size': { + topic: function() { + try { + new RollingFileStream(__dirname + "/test-rolling-file-stream"); + } catch (e) { + return e; + } + }, + 'should throw an error': function(err) { + assert.instanceOf(err, Error); + } + }, + 'without number of backups': { + topic: function() { + remove('test-rolling-file-stream'); + return new RollingFileStream(__dirname + "/test-rolling-file-stream", 1024); + }, + 'should default to 1 backup': function(stream) { + assert.equal(stream.backups, 1); + } + }, + 'writing less than the file size': { + topic: function() { + remove(__dirname + "/test-rolling-file-stream-write-less"); + var that = this, stream = new RollingFileStream(__dirname + "/test-rolling-file-stream-write-less", 100); + stream.on("open", function() { that.callback(null, stream); }); + }, + '(when open)': { + topic: function(stream) { + stream.write("cheese", "utf8"); + stream.end(); + fs.readFile(__dirname + "/test-rolling-file-stream-write-less", "utf8", this.callback); + }, + 'should write to the file': function(contents) { + assert.equal(contents, "cheese"); + }, + 'the number of files': { + topic: function() { + fs.readdir(__dirname, this.callback); + }, + 'should be one': function(files) { + assert.equal(files.filter(function(file) { return file.indexOf('test-rolling-file-stream-write-less') > -1; }).length, 1); + } + } + } + }, + 'writing more than the file size': { + topic: function() { + remove(__dirname + "/test-rolling-file-stream-write-more"); + remove(__dirname + "/test-rolling-file-stream-write-more.1"); + var that = this, stream = new RollingFileStream(__dirname + "/test-rolling-file-stream-write-more", 45); + stream.on("open", function() { that.callback(null, stream); }); + }, + '(when open)': { + topic: function(stream) { + var that = this; + for (var i=0; i < 7; i++) { + stream.write(i +".cheese\n", "utf8"); + } + stream.end(function (err) { + fs.readFile(__dirname + "/test-rolling-file-stream-write-more", "utf8", that.callback); + }); + }, + 'should write to the file': function(contents) { + assert.equal(contents, "5.cheese\n6.cheese\n"); + }, + 'the number of files': { + topic: function() { + fs.readdir(__dirname, this.callback); + }, + 'should be two': function(files) { + assert.equal(files.filter(function(file) { return file.indexOf('test-rolling-file-stream-write-more') > -1; }).length, 2); + } + } + } + } +}).exportTo(module);