diff --git a/batch/job_status.js b/batch/job_status.js index d212679f..889f1b43 100644 --- a/batch/job_status.js +++ b/batch/job_status.js @@ -6,6 +6,7 @@ var JOB_STATUS_ENUM = { DONE: 'done', CANCELLED: 'cancelled', FAILED: 'failed', + SKIPPED: 'skipped', UNKNOWN: 'unknown' }; diff --git a/batch/models/job_base.js b/batch/models/job_base.js index 03c3451d..ab6da063 100644 --- a/batch/models/job_base.js +++ b/batch/models/job_base.js @@ -1,18 +1,9 @@ 'use strict'; -var assert = require('assert'); +var util = require('util'); var uuid = require('node-uuid'); +var JobStateMachine = require('./job_state_machine'); var jobStatus = require('../job_status'); -var validStatusTransitions = [ - [jobStatus.PENDING, jobStatus.RUNNING], - [jobStatus.PENDING, jobStatus.CANCELLED], - [jobStatus.PENDING, jobStatus.UNKNOWN], - [jobStatus.RUNNING, jobStatus.DONE], - [jobStatus.RUNNING, jobStatus.FAILED], - [jobStatus.RUNNING, jobStatus.CANCELLED], - [jobStatus.RUNNING, jobStatus.PENDING], - [jobStatus.RUNNING, jobStatus.UNKNOWN] -]; var mandatoryProperties = [ 'job_id', 'status', @@ -24,6 +15,8 @@ var mandatoryProperties = [ ]; function JobBase(data) { + JobStateMachine.call(this); + var now = new Date().toISOString(); this.data = data; @@ -40,24 +33,10 @@ function JobBase(data) { this.data.updated_at = now; } } +util.inherits(JobBase, JobStateMachine); module.exports = JobBase; -JobBase.prototype.isValidStatusTransition = function (initialStatus, finalStatus) { - var transition = [ initialStatus, finalStatus ]; - - for (var i = 0; i < validStatusTransitions.length; i++) { - try { - assert.deepEqual(transition, validStatusTransitions[i]); - return true; - } catch (e) { - continue; - } - } - - return false; -}; - // should be implemented by childs JobBase.prototype.getNextQuery = function () { throw new Error('Unimplemented method'); @@ -105,7 +84,7 @@ JobBase.prototype.setQuery = function (query) { JobBase.prototype.setStatus = function (finalStatus, errorMesssage) { var now = new Date().toISOString(); var initialStatus = this.data.status; - var isValid = this.isValidStatusTransition(initialStatus, finalStatus); + var isValid = this.isValidTransition(initialStatus, finalStatus); if (!isValid) { throw new Error('Cannot set status from ' + initialStatus + ' to ' + finalStatus); diff --git a/batch/models/job_fallback.js b/batch/models/job_fallback.js index 22d08547..6117cb72 100644 --- a/batch/models/job_fallback.js +++ b/batch/models/job_fallback.js @@ -3,34 +3,29 @@ var util = require('util'); var JobBase = require('./job_base'); var jobStatus = require('../job_status'); -var breakStatus = [ - jobStatus.CANCELLED, - jobStatus.FAILED, - jobStatus.UNKNOWN -]; -function isBreakStatus(status) { - return breakStatus.indexOf(status) !== -1; -} -var finalStatus = [ - jobStatus.CANCELLED, - jobStatus.DONE, - jobStatus.FAILED, - jobStatus.UNKNOWN -]; -function isFinalStatus(status) { - return finalStatus.indexOf(status) !== -1; -} +var QueryFallback = require('./query/query_fallback'); +var MainFallback = require('./query/main_fallback'); +var QueryFactory = require('./query/query_factory'); function JobFallback(jobDefinition) { JobBase.call(this, jobDefinition); this.init(); + + this.queries = []; + for (var i = 0; i < this.data.query.query.length; i++) { + this.queries[i] = QueryFactory.create(this.data, i); + } + + if (MainFallback.is(this.data)) { + this.fallback = new MainFallback(); + } } util.inherits(JobFallback, JobBase); module.exports = JobFallback; -// from user: { +// 1. from user: { // query: { // query: [{ // query: 'select ...', @@ -39,7 +34,8 @@ module.exports = JobFallback; // onerror: 'select ...' // } // } -// from redis: { +// +// 2. from redis: { // status: 'pending', // fallback_status: 'pending' // query: { @@ -63,11 +59,7 @@ JobFallback.is = function (query) { } for (var i = 0; i < query.query.length; i++) { - if (!query.query[i].query) { - return false; - } - - if (typeof query.query[i].query !== 'string') { + if (!QueryFallback.is(query.query[i])) { return false; } } @@ -76,98 +68,65 @@ JobFallback.is = function (query) { }; JobFallback.prototype.init = function () { - // jshint maxcomplexity: 8 for (var i = 0; i < this.data.query.query.length; i++) { - if ((this.data.query.query[i].onsuccess || this.data.query.query[i].onerror) && - !this.data.query.query[i].status) { + if (shouldInitStatus(this.data.query.query[i])){ this.data.query.query[i].status = jobStatus.PENDING; + } + if (shouldInitQueryFallbackStatus(this.data.query.query[i])) { this.data.query.query[i].fallback_status = jobStatus.PENDING; - } else if (!this.data.query.query[i].status){ - this.data.query.query[i].status = jobStatus.PENDING; } } - if ((this.data.query.onsuccess || this.data.query.onerror) && !this.data.status) { + if (shouldInitStatus(this.data)) { this.data.status = jobStatus.PENDING; - this.data.fallback_status = jobStatus.PENDING; + } - } else if (!this.data.status) { - this.data.status = jobStatus.PENDING; + if (shouldInitFallbackStatus(this.data)) { + this.data.fallback_status = jobStatus.PENDING; + } +}; + +function shouldInitStatus(jobOrQuery) { + return !jobOrQuery.status; +} + +function shouldInitQueryFallbackStatus(query) { + return (query.onsuccess || query.onerror) && !query.fallback_status; +} + +function shouldInitFallbackStatus(job) { + return (job.query.onsuccess || job.query.onerror) && !job.fallback_status; +} + +JobFallback.prototype.getNextQueryFromQueries = function () { + for (var i = 0; i < this.queries.length; i++) { + if (this.queries[i].hasNextQuery(this.data)) { + return this.queries[i].getNextQuery(this.data); + } + } +}; + +JobFallback.prototype.hasNextQueryFromQueries = function () { + return !!this.getNextQueryFromQueries(); +}; + +JobFallback.prototype.getNextQueryFromFallback = function () { + if (this.fallback && this.fallback.hasNextQuery(this.data)) { + + return this.fallback.getNextQuery(this.data); } }; JobFallback.prototype.getNextQuery = function () { - var query = this._getNextQueryFromQuery(); + var query = this.getNextQueryFromQueries(); if (!query) { - query = this._getNextQueryFromJobFallback(); + query = this.getNextQueryFromFallback(); } return query; }; -JobFallback.prototype._hasNextQueryFromQuery = function () { - return !!this._getNextQueryFromQuery(); -}; - -JobFallback.prototype._getNextQueryFromQuery = function () { - // jshint maxcomplexity: 8 - for (var i = 0; i < this.data.query.query.length; i++) { - - if (this.data.query.query[i].fallback_status) { - if (this._isNextQuery(i)) { - return this.data.query.query[i].query; - } else if (this._isNextQueryOnSuccess(i)) { - return this.data.query.query[i].onsuccess; - } else if (this._isNextQueryOnError(i)) { - return this.data.query.query[i].onerror; - } else if (isBreakStatus(this.data.query.query[i].status)) { - return; - } - } else if (this.data.query.query[i].status === jobStatus.PENDING) { - return this.data.query.query[i].query; - } - } -}; - -JobFallback.prototype._getNextQueryFromJobFallback = function () { - if (this.data.fallback_status) { - if (this._isNextQueryOnSuccessJob()) { - return this.data.query.onsuccess; - } else if (this._isNextQueryOnErrorJob()) { - return this.data.query.onerror; - } - } -}; - -JobFallback.prototype._isNextQuery = function (index) { - return this.data.query.query[index].status === jobStatus.PENDING; -}; - -JobFallback.prototype._isNextQueryOnSuccess = function (index) { - return this.data.query.query[index].status === jobStatus.DONE && - this.data.query.query[index].onsuccess && - this.data.query.query[index].fallback_status === jobStatus.PENDING; -}; - -JobFallback.prototype._isNextQueryOnError = function (index) { - return this.data.query.query[index].status === jobStatus.FAILED && - this.data.query.query[index].onerror && - this.data.query.query[index].fallback_status === jobStatus.PENDING; -}; - -JobFallback.prototype._isNextQueryOnSuccessJob = function () { - return this.data.status === jobStatus.DONE && - this.data.query.onsuccess && - this.data.fallback_status === jobStatus.PENDING; -}; - -JobFallback.prototype._isNextQueryOnErrorJob = function () { - return this.data.status === jobStatus.FAILED && - this.data.query.onerror && - this.data.fallback_status === jobStatus.PENDING; -}; - JobFallback.prototype.setQuery = function (query) { if (!JobFallback.is(query)) { throw new Error('You must indicate a valid SQL'); @@ -178,126 +137,72 @@ JobFallback.prototype.setQuery = function (query) { JobFallback.prototype.setStatus = function (status, errorMesssage) { var now = new Date().toISOString(); - var resultFromQuery = this._setQueryStatus(status, errorMesssage); - var resultFromJob = this._setJobStatus(status, resultFromQuery.isChangeAppliedToQueryFallback, errorMesssage); - if (!resultFromJob.isValid && !resultFromQuery.isValid) { - throw new Error('Cannot set status from ' + this.data.status + ' to ' + status); + var hasChanged = this.setQueryStatus(status, this.data, errorMesssage); + hasChanged = this.setJobStatus(status, this.data, hasChanged, errorMesssage); + hasChanged = this.setFallbackStatus(status, this.data, hasChanged); + + if (!hasChanged.isValid) { + throw new Error('Cannot set status to ' + status); } this.data.updated_at = now; }; -JobFallback.prototype._getLastStatusFromFinishedQuery = function () { - var lastStatus = jobStatus.DONE; - - for (var i = 0; i < this.data.query.query.length; i++) { - if (this.data.query.query[i].fallback_status) { - if (isFinalStatus(this.data.query.query[i].status)) { - lastStatus = this.data.query.query[i].status; - } else { - break; - } - } else { - if (isFinalStatus(this.data.query.query[i].status)) { - lastStatus = this.data.query.query[i].status; - } else { - break; - } - } - } - - return lastStatus; +JobFallback.prototype.setQueryStatus = function (status, job, errorMesssage) { + return this.queries.reduce(function (hasChanged, query) { + var result = query.setStatus(status, this.data, hasChanged, errorMesssage); + return result.isValid ? result : hasChanged; + }.bind(this), { isValid: false, appliedToFallback: false }); }; -JobFallback.prototype._setJobStatus = function (status, isChangeAppliedToQueryFallback, errorMesssage) { - var isValid = false; - - status = this._shiftJobStatus(status, isChangeAppliedToQueryFallback); - - isValid = this.isValidStatusTransition(this.data.status, status); - - if (isValid) { - this.data.status = status; - } else if (this.data.fallback_status) { - - isValid = this.isValidStatusTransition(this.data.fallback_status, status); - - if (isValid) { - this.data.fallback_status = status; - } - } - - if (status === jobStatus.FAILED && errorMesssage && !isChangeAppliedToQueryFallback) { - this.data.failed_reason = errorMesssage; - } - - return { - isValid: isValid +JobFallback.prototype.setJobStatus = function (status, job, hasChanged, errorMesssage) { + var result = { + isValid: false, + appliedToFallback: false }; + + status = this.shiftStatus(status, hasChanged); + + result.isValid = this.isValidTransition(job.status, status); + if (result.isValid) { + job.status = status; + if (status === jobStatus.FAILED && errorMesssage && !hasChanged.appliedToFallback) { + job.failed_reason = errorMesssage; + } + } + + return result.isValid ? result : hasChanged; }; -JobFallback.prototype._shiftJobStatus = function (status, isChangeAppliedToQueryFallback) { +JobFallback.prototype.setFallbackStatus = function (status, job, hasChanged) { + var result = hasChanged; + + if (this.fallback && !this.hasNextQueryFromQueries()) { + result = this.fallback.setStatus(status, job, hasChanged); + } + + return result.isValid ? result : hasChanged; +}; + +JobFallback.prototype.shiftStatus = function (status, hasChanged) { // jshint maxcomplexity: 7 - - // In some scenarios we have to change the normal flow in order to keep consistency - // between query's status and job's status. - - if (isChangeAppliedToQueryFallback) { - if (!this._hasNextQueryFromQuery() && (status === jobStatus.DONE || status === jobStatus.FAILED)) { - status = this._getLastStatusFromFinishedQuery(); + if (hasChanged.appliedToFallback) { + if (!this.hasNextQueryFromQueries() && (status === jobStatus.DONE || status === jobStatus.FAILED)) { + status = this.getLastFinishedStatus(); } else if (status === jobStatus.DONE || status === jobStatus.FAILED){ status = jobStatus.PENDING; } - } else if (this._hasNextQueryFromQuery() && status !== jobStatus.RUNNING) { + } else if (this.hasNextQueryFromQueries() && status !== jobStatus.RUNNING) { status = jobStatus.PENDING; } return status; }; - -JobFallback.prototype._shouldTryToApplyStatusTransitionToQueryFallback = function (index) { - return (this.data.query.query[index].status === jobStatus.DONE && this.data.query.query[index].onsuccess) || - (this.data.query.query[index].status === jobStatus.FAILED && this.data.query.query[index].onerror); -}; - -JobFallback.prototype._setQueryStatus = function (status, errorMesssage) { - // jshint maxcomplexity: 7 - var isValid = false; - var isChangeAppliedToQueryFallback = false; - - for (var i = 0; i < this.data.query.query.length; i++) { - isValid = this.isValidStatusTransition(this.data.query.query[i].status, status); - - if (isValid) { - this.data.query.query[i].status = status; - - if (status === jobStatus.FAILED && errorMesssage) { - this.data.query.query[i].failed_reason = errorMesssage; - } - - break; - } - - if (this._shouldTryToApplyStatusTransitionToQueryFallback(i)) { - isValid = this.isValidStatusTransition(this.data.query.query[i].fallback_status, status); - - if (isValid) { - this.data.query.query[i].fallback_status = status; - - if (status === jobStatus.FAILED && errorMesssage) { - this.data.query.query[i].failed_reason = errorMesssage; - } - - isChangeAppliedToQueryFallback = true; - break; - } - } - } - - return { - isValid: isValid, - isChangeAppliedToQueryFallback: isChangeAppliedToQueryFallback - }; +JobFallback.prototype.getLastFinishedStatus = function () { + return this.queries.reduce(function (lastFinished, query) { + var status = query.getStatus(this.data); + return this.isFinalStatus(status) ? status : lastFinished; + }.bind(this), jobStatus.DONE); }; diff --git a/batch/models/job_multiple.js b/batch/models/job_multiple.js index a1fb8317..85cf1d87 100644 --- a/batch/models/job_multiple.js +++ b/batch/models/job_multiple.js @@ -76,7 +76,7 @@ JobMultiple.prototype.setStatus = function (finalStatus, errorMesssage) { } for (var i = 0; i < this.data.query.length; i++) { - var isValid = JobMultiple.super_.prototype.isValidStatusTransition(this.data.query[i].status, finalStatus); + var isValid = JobMultiple.super_.prototype.isValidTransition(this.data.query[i].status, finalStatus); if (isValid) { this.data.query[i].status = finalStatus; diff --git a/batch/models/job_state_machine.js b/batch/models/job_state_machine.js new file mode 100644 index 00000000..21ee60e5 --- /dev/null +++ b/batch/models/job_state_machine.js @@ -0,0 +1,46 @@ +'use strict'; + +var assert = require('assert'); +var jobStatus = require('../job_status'); +var finalStatus = [ + jobStatus.CANCELLED, + jobStatus.DONE, + jobStatus.FAILED, + jobStatus.UNKNOWN +]; + +var validStatusTransitions = [ + [jobStatus.PENDING, jobStatus.RUNNING], + [jobStatus.PENDING, jobStatus.CANCELLED], + [jobStatus.PENDING, jobStatus.UNKNOWN], + [jobStatus.PENDING, jobStatus.SKIPPED], + [jobStatus.RUNNING, jobStatus.DONE], + [jobStatus.RUNNING, jobStatus.FAILED], + [jobStatus.RUNNING, jobStatus.CANCELLED], + [jobStatus.RUNNING, jobStatus.PENDING], + [jobStatus.RUNNING, jobStatus.UNKNOWN] +]; + +function JobStateMachine () { +} + +module.exports = JobStateMachine; + +JobStateMachine.prototype.isValidTransition = function (initialStatus, finalStatus) { + var transition = [ initialStatus, finalStatus ]; + + for (var i = 0; i < validStatusTransitions.length; i++) { + try { + assert.deepEqual(transition, validStatusTransitions[i]); + return true; + } catch (e) { + continue; + } + } + + return false; +}; + +JobStateMachine.prototype.isFinalStatus = function (status) { + return finalStatus.indexOf(status) !== -1; +}; diff --git a/batch/models/query/fallback.js b/batch/models/query/fallback.js new file mode 100644 index 00000000..abfa89df --- /dev/null +++ b/batch/models/query/fallback.js @@ -0,0 +1,69 @@ +'use strict'; + +var util = require('util'); +var QueryBase = require('./query_base'); +var jobStatus = require('../../job_status'); + +function Fallback(index) { + QueryBase.call(this, index); +} +util.inherits(Fallback, QueryBase); + +module.exports = Fallback; + +Fallback.is = function (query) { + if (query.onsuccess || query.onerror) { + return true; + } + return false; +}; + +Fallback.prototype.getNextQuery = function (job) { + if (this.hasOnSuccess(job)) { + return this.getOnSuccess(job); + } + if (this.hasOnError(job)) { + return this.getOnError(job); + } +}; + +Fallback.prototype.getOnSuccess = function (job) { + if (job.query.query[this.index].status === jobStatus.DONE && + job.query.query[this.index].fallback_status === jobStatus.PENDING) { + return job.query.query[this.index].onsuccess; + } +}; + +Fallback.prototype.hasOnSuccess = function (job) { + return !!this.getOnSuccess(job); +}; + +Fallback.prototype.getOnError = function (job) { + if (job.query.query[this.index].status === jobStatus.FAILED && + job.query.query[this.index].fallback_status === jobStatus.PENDING) { + return job.query.query[this.index].onerror; + } +}; + +Fallback.prototype.hasOnError = function (job) { + return !!this.getOnError(job); +}; + +Fallback.prototype.setStatus = function (status, job, errorMessage) { + var isValid = false; + + isValid = this.isValidTransition(job.query.query[this.index].fallback_status, status); + + if (isValid) { + job.query.query[this.index].fallback_status = status; + if (status === jobStatus.FAILED && errorMessage) { + job.query.query[this.index].failed_reason = errorMessage; + } + } + + return isValid; +}; + +Fallback.prototype.getStatus = function (job) { + return job.query.query[this.index].fallback_status; +}; diff --git a/batch/models/query/main_fallback.js b/batch/models/query/main_fallback.js new file mode 100644 index 00000000..7c52194b --- /dev/null +++ b/batch/models/query/main_fallback.js @@ -0,0 +1,74 @@ +'use strict'; + +var util = require('util'); +var QueryBase = require('./query_base'); +var jobStatus = require('../../job_status'); + +function MainFallback() { + QueryBase.call(this); +} +util.inherits(MainFallback, QueryBase); + +module.exports = MainFallback; + +MainFallback.is = function (job) { + if (job.query.onsuccess || job.query.onerror) { + return true; + } + return false; +}; + +MainFallback.prototype.getNextQuery = function (job) { + if (this.hasOnSuccess(job)) { + return this.getOnSuccess(job); + } + + if (this.hasOnError(job)) { + return this.getOnError(job); + } +}; + +MainFallback.prototype.getOnSuccess = function (job) { + if (job.status === jobStatus.DONE && job.fallback_status === jobStatus.PENDING) { + return job.query.onsuccess; + } +}; + +MainFallback.prototype.hasOnSuccess = function (job) { + return !!this.getOnSuccess(job); +}; + +MainFallback.prototype.getOnError = function (job) { + if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.PENDING) { + return job.query.onerror; + } +}; + +MainFallback.prototype.hasOnError = function (job) { + return !!this.getOnError(job); +}; + +MainFallback.prototype.setStatus = function (status, job, previous) { + var isValid = false; + var appliedToFallback = false; + + if (previous.isValid && !previous.appliedToFallback) { + if (this.isFinalStatus(status) && !this.hasNextQuery(job)) { + isValid = this.isValidTransition(job.fallback_status, jobStatus.SKIPPED); + + if (isValid) { + job.fallback_status = jobStatus.SKIPPED; + appliedToFallback = true; + } + } + } else if (!previous.isValid) { + isValid = this.isValidTransition(job.fallback_status, status); + + if (isValid) { + job.fallback_status = status; + appliedToFallback = true; + } + } + + return { isValid: isValid, appliedToFallback: appliedToFallback }; +}; diff --git a/batch/models/query/query.js b/batch/models/query/query.js new file mode 100644 index 00000000..37fb9cef --- /dev/null +++ b/batch/models/query/query.js @@ -0,0 +1,45 @@ +'use strict'; + +var util = require('util'); +var QueryBase = require('./query_base'); +var jobStatus = require('../../job_status'); + +function Query(index) { + QueryBase.call(this, index); +} +util.inherits(Query, QueryBase); + +module.exports = Query; + +Query.is = function (query) { + if (query.query && typeof query.query === 'string') { + return true; + } + + return false; +}; + +Query.prototype.getNextQuery = function (job) { + if (job.query.query[this.index].status === jobStatus.PENDING) { + return job.query.query[this.index].query; + } +}; + +Query.prototype.setStatus = function (status, job, errorMesssage) { + var isValid = false; + + isValid = this.isValidTransition(job.query.query[this.index].status, status); + + if (isValid) { + job.query.query[this.index].status = status; + if (status === jobStatus.FAILED && errorMesssage) { + job.query.query[this.index].failed_reason = errorMesssage; + } + } + + return isValid; +}; + +Query.prototype.getStatus = function (job) { + return job.query.query[this.index].status; +}; diff --git a/batch/models/query/query_base.js b/batch/models/query/query_base.js new file mode 100644 index 00000000..737e1bf5 --- /dev/null +++ b/batch/models/query/query_base.js @@ -0,0 +1,31 @@ +'use strict'; + +var util = require('util'); +var JobStateMachine = require('../job_state_machine'); + +function QueryBase(index) { + JobStateMachine.call(this); + + this.index = index; +} +util.inherits(QueryBase, JobStateMachine); + +module.exports = QueryBase; + +// should be implemented +QueryBase.prototype.setStatus = function () { + throw new Error('Unimplemented method'); +}; + +// should be implemented +QueryBase.prototype.getNextQuery = function () { + throw new Error('Unimplemented method'); +}; + +QueryBase.prototype.hasNextQuery = function (job) { + return !!this.getNextQuery(job); +}; + +QueryBase.prototype.getStatus = function () { + throw new Error('Unimplemented method'); +}; diff --git a/batch/models/query/query_factory.js b/batch/models/query/query_factory.js new file mode 100644 index 00000000..c33534e0 --- /dev/null +++ b/batch/models/query/query_factory.js @@ -0,0 +1,16 @@ +'use strict'; + +var QueryFallback = require('./query_fallback'); + +function QueryFactory() { +} + +module.exports = QueryFactory; + +QueryFactory.create = function (job, index) { + if (QueryFallback.is(job.query.query[index])) { + return new QueryFallback(job, index); + } + + throw new Error('there is no query class for the provided query'); +}; diff --git a/batch/models/query/query_fallback.js b/batch/models/query/query_fallback.js new file mode 100644 index 00000000..cb0579a3 --- /dev/null +++ b/batch/models/query/query_fallback.js @@ -0,0 +1,75 @@ +'use strict'; + +var util = require('util'); +var QueryBase = require('./query_base'); +var Query = require('./query'); +var Fallback = require('./fallback'); +var jobStatus = require('../../job_status'); + +function QueryFallback(job, index) { + QueryBase.call(this, index); + + this.init(job, index); +} + +util.inherits(QueryFallback, QueryBase); + +QueryFallback.is = function (query) { + if (Query.is(query)) { + return true; + } + return false; +}; + +QueryFallback.prototype.init = function (job, index) { + this.query = new Query(index); + + if (Fallback.is(job.query.query[index])) { + this.fallback = new Fallback(index); + } +}; + +QueryFallback.prototype.getNextQuery = function (job) { + if (this.query.hasNextQuery(job)) { + return this.query.getNextQuery(job); + } + + if (this.fallback && this.fallback.hasNextQuery(job)) { + return this.fallback.getNextQuery(job); + } +}; + +QueryFallback.prototype.setStatus = function (status, job, previous, errorMesssage) { + // jshint maxcomplexity: 9 + var isValid = false; + var appliedToFallback = false; + + if (previous.isValid && !previous.appliedToFallback) { + if (status === jobStatus.FAILED || status === jobStatus.CANCELLED) { + this.query.setStatus(jobStatus.SKIPPED, job, errorMesssage); + + if (this.fallback) { + this.fallback.setStatus(jobStatus.SKIPPED, job); + } + } + } else if (!previous.isValid) { + isValid = this.query.setStatus(status, job, errorMesssage); + + if (this.fallback) { + if (!isValid) { + isValid = this.fallback.setStatus(status, job, errorMesssage); + appliedToFallback = true; + } else if (isValid && this.isFinalStatus(status) && !this.fallback.hasNextQuery(job)) { + this.fallback.setStatus(jobStatus.SKIPPED, job); + } + } + } + + return { isValid: isValid, appliedToFallback: appliedToFallback }; +}; + +QueryFallback.prototype.getStatus = function (job) { + return this.query.getStatus(job); +}; + +module.exports = QueryFallback; diff --git a/test/acceptance/job.fallback.test.js b/test/acceptance/job.fallback.test.js index 9a13491e..39fd0128 100644 --- a/test/acceptance/job.fallback.test.js +++ b/test/acceptance/job.fallback.test.js @@ -31,7 +31,6 @@ describe('Batch API fallback job', function () { describe('"onsuccess" on first query should be triggered', function () { var fallbackJob = {}; - it('should create a job', function (done) { assert.response(app, { url: '/api/v2/sql/job?api_key=1234', @@ -133,7 +132,7 @@ describe('Batch API fallback job', function () { "query": "SELECT * FROM untitle_table_4", "onerror": "SELECT * FROM untitle_table_4 limit 1", "status": "done", - "fallback_status": "pending" + "fallback_status": "skipped" }] }; var interval = setInterval(function () { @@ -268,7 +267,7 @@ describe('Batch API fallback job', function () { query: 'SELECT * FROM nonexistent_table /* query should fail */', onsuccess: 'SELECT * FROM untitle_table_4 limit 1', status: 'failed', - fallback_status: 'pending', + fallback_status: 'skipped', failed_reason: 'relation "nonexistent_table" does not exist' }] }; @@ -424,7 +423,7 @@ describe('Batch API fallback job', function () { return done(err); } var job = JSON.parse(res.body); - if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.PENDING) { + if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.SKIPPED) { clearInterval(interval); assert.deepEqual(job.query, expectedQuery); done(); @@ -560,7 +559,7 @@ describe('Batch API fallback job', function () { return done(err); } var job = JSON.parse(res.body); - if (job.status === jobStatus.DONE && job.fallback_status === jobStatus.PENDING) { + if (job.status === jobStatus.DONE && job.fallback_status === jobStatus.SKIPPED) { clearInterval(interval); assert.deepEqual(job.query, expectedQuery); done(); @@ -759,13 +758,13 @@ describe('Batch API fallback job', function () { "query": "SELECT * FROM nonexistent_table /* should fail */", "onsuccess": "SELECT * FROM untitle_table_4 limit 1", "status": "failed", - "fallback_status": "pending", + "fallback_status": "skipped", "failed_reason": 'relation "nonexistent_table" does not exist' }, { "query": "SELECT * FROM untitle_table_4 limit 2", "onsuccess": "SELECT * FROM untitle_table_4 limit 3", - "status": "pending", - "fallback_status": "pending" + "status": "skipped", + "fallback_status": "skipped" }] }; @@ -842,7 +841,7 @@ describe('Batch API fallback job', function () { "query": "SELECT * FROM nonexistent_table /* should fail */", "onsuccess": "SELECT * FROM untitle_table_4 limit 3", "status": "failed", - "fallback_status": "pending", + "fallback_status": "skipped", "failed_reason": 'relation "nonexistent_table" does not exist' }] }; @@ -875,7 +874,7 @@ describe('Batch API fallback job', function () { }); }); - describe('"onerror" should not be triggered for any query', function () { + describe('"onerror" should not be triggered for any query and "skipped"', function () { var fallbackJob = {}; it('should create a job', function (done) { @@ -914,12 +913,12 @@ describe('Batch API fallback job', function () { query: 'SELECT * FROM untitle_table_4 limit 1', onerror: 'SELECT * FROM untitle_table_4 limit 2', status: 'done', - fallback_status: 'pending' + fallback_status: 'skipped' }, { query: 'SELECT * FROM untitle_table_4 limit 3', onerror: 'SELECT * FROM untitle_table_4 limit 4', status: 'done', - fallback_status: 'pending' + fallback_status: 'skipped' }] }; @@ -943,6 +942,144 @@ describe('Batch API fallback job', function () { assert.deepEqual(job.query, expectedQuery); done(); } else if (job.status === jobStatus.FAILED || job.status === jobStatus.CANCELLED) { + clearInterval(interval); + done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be done')); + } + }); + }, 50); + }); + }); + + describe('"onsuccess" should be "skipped"', function () { + var fallbackJob = {}; + + it('should create a job', function (done) { + assert.response(app, { + url: '/api/v2/sql/job?api_key=1234', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + 'host': 'vizzuality.cartodb.com' + }, + method: 'POST', + data: querystring.stringify({ + query: { + query: [{ + query: "SELECT * FROM untitle_table_4 limit 1, /* should fail */", + onsuccess: "SELECT * FROM untitle_table_4 limit 2" + }] + } + }) + }, { + status: 201 + }, function (res, err) { + if (err) { + return done(err); + } + fallbackJob = JSON.parse(res.body); + done(); + }); + }); + + it('job should be failed', function (done) { + var expectedQuery = { + query: [{ + query: 'SELECT * FROM untitle_table_4 limit 1, /* should fail */', + onsuccess: 'SELECT * FROM untitle_table_4 limit 2', + status: 'failed', + fallback_status: 'skipped', + failed_reason: 'syntax error at end of input' + }] + }; + + var interval = setInterval(function () { + assert.response(app, { + url: '/api/v2/sql/job/' + fallbackJob.job_id + '?api_key=1234&', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + 'host': 'vizzuality.cartodb.com' + }, + method: 'GET' + }, { + status: 200 + }, function (res, err) { + if (err) { + return done(err); + } + var job = JSON.parse(res.body); + if (job.status === jobStatus.FAILED) { + clearInterval(interval); + assert.deepEqual(job.query, expectedQuery); + done(); + } else if (job.status === jobStatus.DONE || job.status === jobStatus.CANCELLED) { + clearInterval(interval); + done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be failed')); + } + }); + }, 50); + }); + }); + + + describe('"onsuccess" should not be triggered and "skipped"', function () { + var fallbackJob = {}; + + it('should create a job', function (done) { + assert.response(app, { + url: '/api/v2/sql/job?api_key=1234', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + 'host': 'vizzuality.cartodb.com' + }, + method: 'POST', + data: querystring.stringify({ + query: { + query: [{ + query: "SELECT * FROM untitle_table_4 limit 1, /* should fail */", + }], + onsuccess: "SELECT * FROM untitle_table_4 limit 2" + } + }) + }, { + status: 201 + }, function (res, err) { + if (err) { + return done(err); + } + fallbackJob = JSON.parse(res.body); + done(); + }); + }); + + it('job should be failed', function (done) { + var expectedQuery = { + query: [{ + query: 'SELECT * FROM untitle_table_4 limit 1, /* should fail */', + status: 'failed', + failed_reason: 'syntax error at end of input' + }], + onsuccess: 'SELECT * FROM untitle_table_4 limit 2' + }; + + var interval = setInterval(function () { + assert.response(app, { + url: '/api/v2/sql/job/' + fallbackJob.job_id + '?api_key=1234&', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + 'host': 'vizzuality.cartodb.com' + }, + method: 'GET' + }, { + status: 200 + }, function (res, err) { + if (err) { + return done(err); + } + var job = JSON.parse(res.body); + if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.SKIPPED) { + clearInterval(interval); + assert.deepEqual(job.query, expectedQuery); + done(); + } else if (job.status === jobStatus.DONE || job.status === jobStatus.CANCELLED) { clearInterval(interval); done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be failed')); } @@ -1329,7 +1466,7 @@ describe('Batch API fallback job', function () { job.status === jobStatus.FAILED || job.status === jobStatus.CANCELLED) { clearInterval(interval); - done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be done')); + done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be running')); } }); }, 50); @@ -1341,7 +1478,7 @@ describe('Batch API fallback job', function () { "query": "SELECT pg_sleep(3)", "onsuccess": "SELECT pg_sleep(0)", "status": "cancelled", - "fallback_status": "pending" + "fallback_status": "skipped" }], "onsuccess": "SELECT pg_sleep(0)" }; @@ -1360,7 +1497,7 @@ describe('Batch API fallback job', function () { return done(err); } var job = JSON.parse(res.body); - if (job.status === jobStatus.CANCELLED && job.fallback_status === jobStatus.PENDING) { + if (job.status === jobStatus.CANCELLED && job.fallback_status === jobStatus.SKIPPED) { assert.deepEqual(job.query, expectedQuery); done(); } else if (job.status === jobStatus.DONE || job.status === jobStatus.FAILED) { @@ -1469,7 +1606,7 @@ describe('Batch API fallback job', function () { return done(err); } var job = JSON.parse(res.body); - if (job.status === jobStatus.CANCELLED && job.fallback_status === jobStatus.PENDING) { + if (job.status === jobStatus.CANCELLED && job.fallback_status === jobStatus.SKIPPED) { assert.deepEqual(job.query, expectedQuery); done(); } else if (job.status === jobStatus.DONE || job.status === jobStatus.FAILED) { @@ -1478,4 +1615,166 @@ describe('Batch API fallback job', function () { }); }); }); + + describe('should run first "onerror" and job "onerror" and skip the other ones', function () { + var fallbackJob = {}; + + it('should create a job', function (done) { + assert.response(app, { + url: '/api/v2/sql/job?api_key=1234', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + 'host': 'vizzuality.cartodb.com' + }, + method: 'POST', + data: querystring.stringify({ + "query": { + "query": [{ + "query": "SELECT * FROM untitle_table_4 limit 1, should fail", + "onerror": "SELECT * FROM untitle_table_4 limit 2" + }, { + "query": "SELECT * FROM untitle_table_4 limit 3", + "onerror": "SELECT * FROM untitle_table_4 limit 4" + }], + "onerror": "SELECT * FROM untitle_table_4 limit 5" + } + }) + }, { + status: 201 + }, function (res, err) { + if (err) { + return done(err); + } + fallbackJob = JSON.parse(res.body); + done(); + }); + }); + + it('job should fail', function (done) { + var expectedQuery = { + "query": [ + { + "query": "SELECT * FROM untitle_table_4 limit 1, should fail", + "onerror": "SELECT * FROM untitle_table_4 limit 2", + "status": "failed", + "fallback_status": "done", + "failed_reason": "LIMIT #,# syntax is not supported" + }, + { + "query": "SELECT * FROM untitle_table_4 limit 3", + "onerror": "SELECT * FROM untitle_table_4 limit 4", + "status": "skipped", + "fallback_status": "skipped" + } + ], + "onerror": "SELECT * FROM untitle_table_4 limit 5" + }; + + var interval = setInterval(function () { + assert.response(app, { + url: '/api/v2/sql/job/' + fallbackJob.job_id + '?api_key=1234&', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + 'host': 'vizzuality.cartodb.com' + }, + method: 'GET' + }, { + status: 200 + }, function (res, err) { + if (err) { + return done(err); + } + var job = JSON.parse(res.body); + if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.DONE) { + clearInterval(interval); + assert.deepEqual(job.query, expectedQuery); + done(); + } else if (job.status === jobStatus.DONE || job.status === jobStatus.CANCELLED) { + clearInterval(interval); + done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be failed')); + } + }); + }, 50); + }); + }); + + + describe('should fail first "onerror" and job "onerror" and skip the other ones', function () { + var fallbackJob = {}; + + it('should create a job', function (done) { + assert.response(app, { + url: '/api/v2/sql/job?api_key=1234', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + 'host': 'vizzuality.cartodb.com' + }, + method: 'POST', + data: querystring.stringify({ + "query": { + "query": [{ + "query": "SELECT * FROM atm_madrid limit 1, should fail", + "onerror": "SELECT * FROM atm_madrid limit 2" + }, { + "query": "SELECT * FROM atm_madrid limit 3", + "onerror": "SELECT * FROM atm_madrid limit 4" + }], + "onerror": "SELECT * FROM atm_madrid limit 5" + } + }) + }, { + status: 201 + }, function (res, err) { + if (err) { + return done(err); + } + fallbackJob = JSON.parse(res.body); + done(); + }); + }); + + it('job should fail', function (done) { + var expectedQuery = { + query: [{ + query: 'SELECT * FROM atm_madrid limit 1, should fail', + onerror: 'SELECT * FROM atm_madrid limit 2', + status: 'failed', + fallback_status: 'failed', + failed_reason: 'relation "atm_madrid" does not exist' + }, { + query: 'SELECT * FROM atm_madrid limit 3', + onerror: 'SELECT * FROM atm_madrid limit 4', + status: 'skipped', + fallback_status: 'skipped' + }], + onerror: 'SELECT * FROM atm_madrid limit 5' + }; + + var interval = setInterval(function () { + assert.response(app, { + url: '/api/v2/sql/job/' + fallbackJob.job_id + '?api_key=1234&', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + 'host': 'vizzuality.cartodb.com' + }, + method: 'GET' + }, { + status: 200 + }, function (res, err) { + if (err) { + return done(err); + } + var job = JSON.parse(res.body); + if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.FAILED) { + clearInterval(interval); + assert.deepEqual(job.query, expectedQuery); + done(); + } else if (job.status === jobStatus.DONE || job.status === jobStatus.CANCELLED) { + clearInterval(interval); + done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be failed')); + } + }); + }, 50); + }); + }); });