2015-12-10 03:17:45 +08:00
|
|
|
'use strict';
|
|
|
|
|
|
|
|
var PSQL = require('cartodb-psql');
|
|
|
|
|
|
|
|
function JobService() {
|
|
|
|
}
|
|
|
|
|
|
|
|
JobService.prototype.run = function (userDatabaseMetada, callback) {
|
|
|
|
var self = this;
|
|
|
|
|
|
|
|
var pg = new PSQL(userDatabaseMetada, {}, { destroyOnError: true });
|
|
|
|
|
|
|
|
this.getJob(pg, function (err, job) {
|
|
|
|
if (err) {
|
|
|
|
return callback(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
self.setJobRunning(pg, job, function (err) {
|
|
|
|
if (err) {
|
|
|
|
return callback(err);
|
|
|
|
}
|
|
|
|
|
2015-12-10 05:05:04 +08:00
|
|
|
self.runJob(pg, job, function (err, jobResult) {
|
2015-12-10 03:17:45 +08:00
|
|
|
if (err) {
|
2015-12-10 22:08:31 +08:00
|
|
|
|
2015-12-10 03:17:45 +08:00
|
|
|
self.setJobFailed(pg, job, err.message, function (err) {
|
|
|
|
if (err) {
|
|
|
|
return callback(err);
|
|
|
|
}
|
|
|
|
callback(null, jobResult);
|
|
|
|
});
|
2015-12-10 22:08:31 +08:00
|
|
|
|
2015-12-10 03:17:45 +08:00
|
|
|
} else {
|
2015-12-10 22:08:31 +08:00
|
|
|
|
2015-12-10 03:17:45 +08:00
|
|
|
self.setJobDone(pg, job, function (err) {
|
|
|
|
if (err) {
|
|
|
|
return callback(err);
|
|
|
|
}
|
|
|
|
callback(null, jobResult);
|
|
|
|
});
|
2015-12-10 22:08:31 +08:00
|
|
|
|
2015-12-10 03:17:45 +08:00
|
|
|
}
|
|
|
|
});
|
|
|
|
});
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
2015-12-11 01:09:43 +08:00
|
|
|
JobService.prototype.getJob = function (pg, callback) {
|
|
|
|
var getNextJob = "SELECT * FROM cdb_jobs WHERE status='pending' ORDER BY updated_at ASC LIMIT 1";
|
|
|
|
|
|
|
|
pg.query(getNextJob, function (err, result) {
|
|
|
|
if (err) {
|
|
|
|
return callback(err);
|
|
|
|
}
|
|
|
|
|
|
|
|
callback(null, result.rows[0]);
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
2015-12-10 05:05:04 +08:00
|
|
|
JobService.prototype.runJob = function (pg, job, callback) {
|
|
|
|
var query = job.query;
|
|
|
|
|
|
|
|
if (job.query.match(/SELECT\s.*FROM\s.*/i)) {
|
2015-12-10 22:08:31 +08:00
|
|
|
query = 'SELECT * INTO "job_' + job.job_id + '" FROM (' + job.query + ') AS j';
|
2015-12-10 05:05:04 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
pg.query(query, function (err, jobResult) {
|
2015-12-10 03:17:45 +08:00
|
|
|
if (err) {
|
|
|
|
return callback(err);
|
|
|
|
}
|
|
|
|
callback(null, jobResult);
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
|
|
|
JobService.prototype.setJobRunning = function (pg, job, callback) {
|
|
|
|
var runningJobQuery = [
|
|
|
|
'UPDATE cdb_jobs SET ',
|
|
|
|
'status = \'running\', ',
|
|
|
|
'updated_at = now() ',
|
|
|
|
' WHERE ',
|
|
|
|
'job_id = \'' + job.job_id + '\' ',
|
|
|
|
' RETURNING job_id;'
|
|
|
|
].join('\n');
|
|
|
|
|
|
|
|
pg.query(runningJobQuery, function (err, result) {
|
|
|
|
if (err) {
|
|
|
|
return callback(err);
|
|
|
|
}
|
|
|
|
callback(null, result);
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
|
|
|
JobService.prototype.setJobDone = function (pg, job, callback) {
|
|
|
|
var doneJobQuery = [
|
|
|
|
'UPDATE cdb_jobs SET ',
|
|
|
|
'status = \'done\', ',
|
|
|
|
'updated_at = now() ',
|
|
|
|
' WHERE ',
|
|
|
|
'job_id = \'' + job.job_id + '\' ',
|
|
|
|
' RETURNING job_id;'
|
|
|
|
].join('\n');
|
|
|
|
|
|
|
|
pg.query(doneJobQuery, function (err, result) {
|
|
|
|
if (err) {
|
|
|
|
return callback(err);
|
|
|
|
}
|
|
|
|
callback(null, result);
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
|
|
|
JobService.prototype.setJobFailed = function (pg, job, message, callback) {
|
|
|
|
var failedJobQuery = [
|
|
|
|
'UPDATE cdb_jobs SET ',
|
|
|
|
'status = \'failed\', ',
|
|
|
|
'failed_reason = \'' + message + '\', ',
|
|
|
|
'updated_at = now() ',
|
|
|
|
' WHERE ',
|
|
|
|
'job_id = \'' + job.job_id + '\' ',
|
|
|
|
' RETURNING job_id;'
|
|
|
|
].join('\n');
|
|
|
|
|
|
|
|
pg.query(failedJobQuery, function (err, result) {
|
|
|
|
if (err) {
|
|
|
|
return callback(err);
|
|
|
|
}
|
|
|
|
callback(null, result);
|
|
|
|
});
|
|
|
|
};
|
|
|
|
|
|
|
|
module.exports = JobService;
|