From 20f00d58d9c999af29bfb0cc9e3b074d13b4ed28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa=20Aubert?= Date: Fri, 8 Jan 2016 15:47:59 +0100 Subject: [PATCH] Refactored batch service to avoid event noise, doing in callback way --- app/controllers/job_controller.js | 21 +++---- batch/batch.js | 30 ++++------ batch/index.js | 14 ++--- batch/job_backend.js | 71 +++++++++++++---------- batch/job_canceller.js | 94 +++++++++++++++---------------- batch/job_runner.js | 90 ++++++++++++++--------------- test/acceptance/batch.test.js | 1 + 7 files changed, 158 insertions(+), 163 deletions(-) diff --git a/app/controllers/job_controller.js b/app/controllers/job_controller.js index 8678e9e2..f54802bc 100644 --- a/app/controllers/job_controller.js +++ b/app/controllers/job_controller.js @@ -28,6 +28,7 @@ function JobController(metadataBackend, tableCache, statsd_client) { this.statsd_client = statsd_client; this.jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); this.userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + this.jobCanceller = new JobCanceller(this.metadataBackend, this.userDatabaseMetadataService, this.jobBackend); } JobController.prototype.route = function (app) { @@ -102,21 +103,17 @@ JobController.prototype.cancelJob = function (req, res) { req.profiler.done('setDBAuth'); } - var jobCanceller = new JobCanceller(self.metadataBackend, self.userDatabaseMetadataService); - jobCanceller.cancel(job_id) - .on('cancelled', function (job) { - // job is cancelled but surelly jobRunner has not deal whith it yet and it's not saved - job.status = 'cancelled'; + self.jobCanceller.cancel(job_id, function (err, job) { + if (err) { + return next(err); + } - next(null, { - job: job, - host: userDatabase.host - }); - }) - .on('error', function (err) { - next(err); + next(null, { + job: job, + host: userDatabase.host }); + }); }, function handleResponse(err, result) { if ( err ) { diff --git a/batch/batch.js b/batch/batch.js index 1b9d87b4..88d7693e 100644 --- a/batch/batch.js +++ b/batch/batch.js @@ -27,7 +27,7 @@ Batch.prototype.start = function () { // do forever, it does not cause a stack overflow forever(function (next) { - self._consume(host, queue, next); + self._consumeJobs(host, queue, next); }, function (err) { self.jobQueuePool.remove(host); @@ -44,7 +44,7 @@ Batch.prototype.stop = function () { this.jobSubscriber.unsubscribe(); }; -Batch.prototype._consume = function consume(host, queue, callback) { +Batch.prototype._consumeJobs = function (host, queue, callback) { var self = this; queue.dequeue(host, function (err, job_id) { @@ -58,22 +58,16 @@ Batch.prototype._consume = function consume(host, queue, callback) { return callback(emptyQueueError); } - self.jobRunner.run(job_id) - .on('done', function (job) { - console.log('Job %s done in %s', job_id, host); - self.emit('job:done', job.job_id); - callback(); - }) - .on('failed', function (job) { - console.log('Job %s failed in %s', job_id, host); - self.emit('job:failed', job.job_id); - callback(); - }) - .on('error', function (err) { - console.error('Error in job %s due to:', job_id, err.message || err); - self.emit('job:failed', job_id); - callback(); - }); + self.jobRunner.run(job_id, function (err, job) { + if (err) { + return callback(err); + } + + console.log('Job %s %s in %s', job_id, job.status, host); + self.emit('job:' + job.status, job_id); + + callback(); + }); }); }; diff --git a/batch/index.js b/batch/index.js index cae48c61..48ce3cde 100644 --- a/batch/index.js +++ b/batch/index.js @@ -7,25 +7,19 @@ var UserDatabaseMetadataService = require('./user_database_metadata_service'); var JobPublisher = require('./job_publisher'); var JobQueue = require('./job_queue'); var UserIndexer = require('./user_indexer'); +var JobBackend = require('./job_backend'); var Batch = require('./batch'); module.exports = function batchFactory (metadataBackend) { var jobSubscriber = new JobSubscriber(); var jobQueuePool = new JobQueuePool(metadataBackend); - - var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); var jobPublisher = new JobPublisher(); var jobQueue = new JobQueue(metadataBackend); var userIndexer = new UserIndexer(metadataBackend); - - var jobRunner = new JobRunner( - metadataBackend, - userDatabaseMetadataService, - jobPublisher, - jobQueue, - userIndexer - ); + var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); + var userDatabaseMetadataService = new UserDatabaseMetadataService(metadataBackend); + var jobRunner = new JobRunner(jobBackend, userDatabaseMetadataService); return new Batch(jobSubscriber, jobQueuePool, jobRunner); }; diff --git a/batch/job_backend.js b/batch/job_backend.js index 63fe26fb..18573ecf 100644 --- a/batch/job_backend.js +++ b/batch/job_backend.js @@ -1,21 +1,17 @@ 'use strict'; -var util = require('util'); -var EventEmitter = require('events').EventEmitter; var uuid = require('node-uuid'); var queue = require('queue-async'); -var JOBS_TTL_AFTER_RESOLUTION = 48 * 3600; +var JOBS_TTL_IN_SECONDS = 48 * 3600; function JobBackend(metadataBackend, jobQueueProducer, jobPublisher, userIndexer) { - EventEmitter.call(this); + this.db = 5; + this.redisPrefix = 'batch:jobs:'; this.metadataBackend = metadataBackend; this.jobQueueProducer = jobQueueProducer; this.jobPublisher = jobPublisher; this.userIndexer = userIndexer; - this.db = 5; - this.redisPrefix = 'batch:jobs:'; } -util.inherits(JobBackend, EventEmitter); JobBackend.prototype.create = function (username, sql, host, callback) { var self = this; @@ -188,92 +184,107 @@ JobBackend.prototype.get = function (job_id, callback) { }); }; -JobBackend.prototype.setRunning = function (job) { - var self = this; +JobBackend.prototype.setRunning = function (job, callback) { + var now = new Date().toISOString(); var redisParams = [ this.redisPrefix + job.job_id, 'status', 'running', - 'updated_at', new Date().toISOString() + 'updated_at', now ]; this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams, function (err) { if (err) { - return self.emit('error', err); + return callback(err); } - self.emit('running', job); + job.status = 'running'; + job.updated_at = now; + + callback(null, job); }); }; -JobBackend.prototype.setDone = function (job) { +JobBackend.prototype.setDone = function (job, callback) { var self = this; + var now = new Date().toISOString(); var redisKey = this.redisPrefix + job.job_id; var redisParams = [ redisKey, 'status', 'done', - 'updated_at', new Date().toISOString() + 'updated_at', now ]; this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { if (err) { - return self.emit('error', err); + return callback(err); } - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) { + self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) { if (err) { - return self.emit('error', err); + return callback(err); } - self.emit('done', job); + job.status = 'done'; + job.updated_at = now; + + callback(null, job); }); }); }; -JobBackend.prototype.setFailed = function (job, err) { +JobBackend.prototype.setFailed = function (job, err, callback) { var self = this; + var now = new Date().toISOString(); var redisKey = this.redisPrefix + job.job_id; var redisParams = [ redisKey, 'status', 'failed', 'failed_reason', err.message, - 'updated_at', new Date().toISOString() + 'updated_at', now ]; this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { if (err) { - return self.emit('error', err); + return callback(err); } - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) { + self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) { if (err) { - return self.emit('error', err); + return callback(err); } - self.emit('failed', job); + job.status = 'failed'; + job.updated_at = now; + + callback(null, job); }); }); }; -JobBackend.prototype.setCancelled = function (job) { +JobBackend.prototype.setCancelled = function (job, callback) { var self = this; + var now = new Date().toISOString(); var redisKey = this.redisPrefix + job.job_id; var redisParams = [ redisKey, 'status', 'cancelled', - 'updated_at', new Date().toISOString() + 'updated_at', now ]; this.metadataBackend.redisCmd(this.db, 'HMSET', redisParams , function (err) { if (err) { - return self.emit('error', err); + return callback(err); } - self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_AFTER_RESOLUTION ], function (err) { + self.metadataBackend.redisCmd(self.db, 'EXPIRE', [ redisKey, JOBS_TTL_IN_SECONDS ], function (err) { if (err) { - return self.emit('error', err); + return callback(err); } - self.emit('cancelled', job); + job.status = 'cancelled'; + job.updated_at = now; + + callback(null, job); }); }); diff --git a/batch/job_canceller.js b/batch/job_canceller.js index e74437dd..8e4b4ab8 100644 --- a/batch/job_canceller.js +++ b/batch/job_canceller.js @@ -1,77 +1,75 @@ 'use strict'; -var JobBackend = require('./job_backend'); var PSQL = require('cartodb-psql'); -var JobPublisher = require('./job_publisher'); -var JobQueue = require('./job_queue'); -var UserIndexer = require('./user_indexer'); -function JobCanceller(metadataBackend, userDatabaseMetadataService) { +function JobCanceller(metadataBackend, userDatabaseMetadataService, jobBackend) { this.metadataBackend = metadataBackend; this.userDatabaseMetadataService = userDatabaseMetadataService; + this.jobBackend = jobBackend; } -JobCanceller.prototype.cancel = function (job_id) { +JobCanceller.prototype.cancel = function (job_id, callback) { var self = this; - var jobQueue = new JobQueue(this.metadataBackend); - var jobPublisher = new JobPublisher(); - var userIndexer = new UserIndexer(this.metadataBackend); - var jobBackend = new JobBackend(this.metadataBackend, jobQueue, jobPublisher, userIndexer); - jobBackend.get(job_id, function (err, job) { + self.jobBackend.get(job_id, function (err, job) { if (err) { - return jobBackend.emit('error', err); + return callback(err); } if (job.status === 'pending') { - return jobBackend.setCancelled(job); + return self.jobBackend.setCancelled(job, callback); } if (job.status !== 'running') { - return jobBackend.emit('error', new Error('Job is ' + job.status + ' nothing to do')); + return callback(new Error('Job is ' + job.status + ' nothing to do')); } self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) { if (err) { - return jobBackend.emit('error', err); + return callback(err); } - var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); - - var getPIDQuery = 'SELECT pid FROM pg_stat_activity WHERE query = \'' + - job.query + - ' /* ' + job.job_id + ' */\''; - - pg.query(getPIDQuery, function(err, result) { - if(err) { - return jobBackend.emit('error', err); - } - - if (!result.rows[0] || !result.rows[0].pid) { - return jobBackend.emit('error', new Error('Query not running currently')); - } - - var pid = result.rows[0].pid; - var cancelQuery = 'SELECT pg_cancel_backend(' + pid +')'; - - pg.query(cancelQuery, function (err, result) { - if (err) { - return jobBackend.emit('error', err); - } - - var isCancelled = result.rows[0].pg_cancel_backend; - - if (!isCancelled) { - return jobBackend.emit('error', new Error('Query has not been cancelled')); - } - - jobBackend.emit('cancelled', job); - }); - }); + self._query(job, userDatabaseMetadata, callback); }); }); +}; - return jobBackend; +JobCanceller.prototype._query = function (job, userDatabaseMetadata, callback) { + var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); + var getPIDQuery = 'SELECT pid FROM pg_stat_activity WHERE query = \'' + job.query + + ' /* ' + job.job_id + ' */\''; + + pg.query(getPIDQuery, function(err, result) { + if(err) { + return callback(err); + } + + if (!result.rows[0] || !result.rows[0].pid) { + return callback(new Error('Query not running currently')); + } + + var pid = result.rows[0].pid; + var cancelQuery = 'SELECT pg_cancel_backend(' + pid +')'; + + pg.query(cancelQuery, function (err, result) { + if (err) { + return callback(err); + } + + var isCancelled = result.rows[0].pg_cancel_backend; + + if (!isCancelled) { + return callback(new Error('Query has not been cancelled')); + } + + // JobRunner handles job status through the PG's client error handler (see JobRunner.run:48) + // Due to user needs feedback, this modifies to the current status and updated dat + job.updated_at = new Date().toISOString(); + job.status = 'cancelled'; + + callback(null, job); + }); + }); }; diff --git a/batch/job_runner.js b/batch/job_runner.js index 835506ee..762f3984 100644 --- a/batch/job_runner.js +++ b/batch/job_runner.js @@ -1,74 +1,74 @@ 'use strict'; -var JobBackend = require('./job_backend'); var PSQL = require('cartodb-psql'); var QUERY_CANCELED = '57014'; -function JobRunner(metadataBackend, userDatabaseMetadataService, jobPublisher, jobQueue, userIndexer) { - this.metadataBackend = metadataBackend; +function JobRunner(jobBackend, userDatabaseMetadataService) { + this.jobBackend = jobBackend; this.userDatabaseMetadataService = userDatabaseMetadataService; - this.jobPublisher = jobPublisher; - this.jobQueue = jobQueue; - this.userIndexer = userIndexer; } -JobRunner.prototype.run = function (job_id) { +JobRunner.prototype.run = function (job_id, callback) { var self = this; - var jobBackend = new JobBackend(this.metadataBackend, this.jobQueue, this.jobPublisher, this.userIndexer); - - jobBackend.get(job_id, function (err, job) { + self.jobBackend.get(job_id, function (err, job) { if (err) { - return jobBackend.emit('error', err); + return callback(err); } if (job.status !== 'pending') { - return jobBackend.emit('error', - new Error('Cannot run job ' + job.job_id + ' due to its status is ' + job.status)); + return callback(new Error('Cannot run job ' + job.job_id + ' due to its status is ' + job.status)); } self.userDatabaseMetadataService.getUserMetadata(job.user, function (err, userDatabaseMetadata) { if (err) { - return jobBackend.emit('error', err); + return callback(err); } - var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); - - jobBackend.setRunning(job); - - pg.query('SET statement_timeout=0', function(err) { - if(err) { - return jobBackend.setFailed(job, err); + self.jobBackend.setRunning(job, function (err, job) { + if (err) { + return callback(err); } - // mark query to allow to users cancel their queries whether users request for it - var sql = job.query + ' /* ' + job.job_id + ' */'; - - pg.eventedQuery(sql, function (err, query /* , queryCanceller */) { - if (err) { - return jobBackend.setFailed(job, err); - } - - query.on('error', function (err) { - if (err.code === QUERY_CANCELED) { - return jobBackend.setCancelled(job); - } - - jobBackend.setFailed(job, err); - }); - - query.on('end', function (result) { - if (result) { - jobBackend.setDone(job); - } - }); - }); + self._query(job, userDatabaseMetadata, callback); }); }); }); - - return jobBackend; }; +JobRunner.prototype._query = function (job, userDatabaseMetadata, callback) { + var self = this; + + var pg = new PSQL(userDatabaseMetadata, {}, { destroyOnError: true }); + + pg.query('SET statement_timeout=0', function (err) { + if(err) { + return self.jobBackend.setFailed(job, err, callback); + } + + // mark query to allow to users cancel their queries whether users request for it + var sql = job.query + ' /* ' + job.job_id + ' */'; + + pg.eventedQuery(sql, function (err, query) { + if (err) { + return self.jobBackend.setFailed(job, err, callback); + } + + query.on('error', function (err) { + if (err.code === QUERY_CANCELED) { + return self.jobBackend.setCancelled(job, callback); + } + + self.jobBackend.setFailed(job, err, callback); + }); + + query.on('end', function (result) { + if (result) { + self.jobBackend.setDone(job, callback); + } + }); + }); + }); +}; module.exports = JobRunner; diff --git a/test/acceptance/batch.test.js b/test/acceptance/batch.test.js index 4f3af8ae..2e480201 100644 --- a/test/acceptance/batch.test.js +++ b/test/acceptance/batch.test.js @@ -21,6 +21,7 @@ describe('batch module', function() { var jobPublisher = new JobPublisher(); var userIndexer = new UserIndexer(metadataBackend); var jobBackend = new JobBackend(metadataBackend, jobQueue, jobPublisher, userIndexer); + var batch = new Batch(metadataBackend); before(function () {