From aa0ce62a856297f404a18ed29157bec284062727 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Thu, 29 Sep 2016 15:09:36 +0200 Subject: [PATCH] Implement batch logger to log query times when queries are defined with id --- app.js | 2 + batch/batch-logger.js | 86 +++++++++++++++++++++++++++++++++++++++++++ batch/batch.js | 5 ++- batch/index.js | 7 ++-- package.json | 1 + 5 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 batch/batch-logger.js diff --git a/app.js b/app.js index 10261bfb..bf0719ab 100755 --- a/app.js +++ b/app.js @@ -99,6 +99,8 @@ process.on('SIGHUP', function() { global.logger = global.log4js.getLogger(); console.log('Log files reloaded'); }); + + server.batch.logger.reopenFileStreams(); }); process.on('SIGTERM', function () { diff --git a/batch/batch-logger.js b/batch/batch-logger.js new file mode 100644 index 00000000..0948f94b --- /dev/null +++ b/batch/batch-logger.js @@ -0,0 +1,86 @@ +'use strict'; + +var bunyan = require('bunyan'); +var fs = require('fs'); + +var debug = require('./util/debug')('batch-logger'); + +var JobUtils = require('./models/job_state_machine'); +var jobUtils = new JobUtils(); +var JobFallback = require('./models/job_fallback'); + +function BatchLogger (path) { + this.path = path; + this.stream = this.path ? fs.createWriteStream(this.path, { flags: 'a', encoding: 'utf8' }) : process.stdout; + this.logger = bunyan.createLogger({ + name: 'batch-queries', + streams: [{ + level: 'info', + stream: this.stream + }] + }); +} + +module.exports = BatchLogger; + +BatchLogger.prototype.log = function (job) { + if (!isFinished(job)) { + return; + } + + var queries = job.data.query.query; + + for (var i = 0; i < queries.length; i++) { + var query = queries[i]; + + if (!query.id) { + continue; + } + + var node = parseQueryId(query.id); + var output = { + username: job.data.user, + job: job.data.job_id, + analysis: node.analysis, + node: node.id, + type: node.type, + elapsedTime: calculateElpasedTime(query.started_at, query.ended_at) + }; + + debug('analysis %j', output); + + this.logger.info(output); + } +}; + +function isFinished (job) { + return job instanceof JobFallback && + jobUtils.isFinalStatus(job.data.status) && + (!job.data.fallback_status || jobUtils.isFinalStatus(job.data.fallback_status)); +} + +BatchLogger.prototype.reopenFileStreams = function () { + this.logger.reopenFileStreams(); +}; + +function parseQueryId (queryId) { + var data = queryId.split(':'); + + return { + analysis: data[0], + id: data[1], + type: data[2] + }; +} + +function calculateElpasedTime (started_at, ended_at) { + if (!started_at || !ended_at) { + return; + } + + var start = new Date(started_at); + var end = new Date(ended_at); + var elapsedTimeMilliseconds = end.getTime() - start.getTime(); + + return elapsedTimeMilliseconds; +} diff --git a/batch/batch.js b/batch/batch.js index 3edcec0f..76ea6b39 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -7,12 +7,13 @@ var forever = require('./util/forever'); var queue = require('queue-async'); var jobStatus = require('./job_status'); -function Batch(jobSubscriber, jobQueuePool, jobRunner, jobService) { +function Batch(jobSubscriber, jobQueuePool, jobRunner, jobService, logger) { EventEmitter.call(this); this.jobSubscriber = jobSubscriber; this.jobQueuePool = jobQueuePool; this.jobRunner = jobRunner; this.jobService = jobService; + this.logger = logger; } util.inherits(Batch, EventEmitter); @@ -90,6 +91,8 @@ Batch.prototype._consumeJobs = function (host, queue, callback) { debug('Job %s %s in %s', job_id, job.data.status, host); } + self.logger.log(job); + self.emit('job:' + job.data.status, job_id); callback(); diff --git a/batch/index.js b/batch/index.js index afe9ea00..53a0e479 100644 --- a/batch/index.js +++ b/batch/index.js @@ -1,6 +1,5 @@ 'use strict'; - var RedisPool = require('redis-mpool'); var _ = require('underscore'); var JobRunner = require('./job_runner'); @@ -14,9 +13,10 @@ var JobPublisher = require('./job_publisher'); var JobQueue = require('./job_queue'); var JobBackend = require('./job_backend'); var JobService = require('./job_service'); +var BatchLogger = require('./batch-logger'); var Batch = require('./batch'); -module.exports = function batchFactory (metadataBackend, redisConfig, statsdClient) { +module.exports = function batchFactory (metadataBackend, redisConfig, statsdClient, loggerPath) { var redisPoolSubscriber = new RedisPool(_.extend(redisConfig, { name: 'batch-subscriber'})); var redisPoolPublisher = new RedisPool(_.extend(redisConfig, { name: 'batch-publisher'})); var queueSeeker = new QueueSeeker(metadataBackend); @@ -30,6 +30,7 @@ module.exports = function batchFactory (metadataBackend, redisConfig, statsdClie var jobCanceller = new JobCanceller(userDatabaseMetadataService); var jobService = new JobService(jobBackend, jobCanceller); var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, statsdClient); + var logger = new BatchLogger(loggerPath); - return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService); + return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService, logger); }; diff --git a/package.json b/package.json index 7650cbe0..6c974713 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,7 @@ "Sandro Santilli " ], "dependencies": { + "bunyan": "1.8.1", "cartodb-psql": "~0.6.0", "cartodb-query-tables": "0.2.0", "cartodb-redis": "0.13.1",