Fixes #309, added skipped status to fallback-jobs

This commit is contained in:
Daniel García Aubert 2016-05-26 17:37:37 +02:00
parent d9f5ac67f5
commit 1a0e2b681b
4 changed files with 388 additions and 16 deletions

View File

@ -6,6 +6,7 @@ var JOB_STATUS_ENUM = {
DONE: 'done', DONE: 'done',
CANCELLED: 'cancelled', CANCELLED: 'cancelled',
FAILED: 'failed', FAILED: 'failed',
SKIPPED: 'skipped',
UNKNOWN: 'unknown' UNKNOWN: 'unknown'
}; };

View File

@ -7,6 +7,7 @@ var validStatusTransitions = [
[jobStatus.PENDING, jobStatus.RUNNING], [jobStatus.PENDING, jobStatus.RUNNING],
[jobStatus.PENDING, jobStatus.CANCELLED], [jobStatus.PENDING, jobStatus.CANCELLED],
[jobStatus.PENDING, jobStatus.UNKNOWN], [jobStatus.PENDING, jobStatus.UNKNOWN],
[jobStatus.PENDING, jobStatus.SKIPPED],
[jobStatus.RUNNING, jobStatus.DONE], [jobStatus.RUNNING, jobStatus.DONE],
[jobStatus.RUNNING, jobStatus.FAILED], [jobStatus.RUNNING, jobStatus.FAILED],
[jobStatus.RUNNING, jobStatus.CANCELLED], [jobStatus.RUNNING, jobStatus.CANCELLED],

View File

@ -185,9 +185,79 @@ JobFallback.prototype.setStatus = function (status, errorMesssage) {
throw new Error('Cannot set status from ' + this.data.status + ' to ' + status); throw new Error('Cannot set status from ' + this.data.status + ' to ' + status);
} }
if (!resultFromQuery.isChangeAppliedToQueryFallback || status === jobStatus.CANCELLED) {
this._setSkipped(status);
}
this.data.updated_at = now; this.data.updated_at = now;
}; };
JobFallback.prototype._setSkipped = function (status) {
this._setSkippedQueryStatus();
this._setSkippedJobStatus();
if (status === jobStatus.CANCELLED || status === jobStatus.FAILED) {
this._setRestPendingToSkipped(status);
}
};
JobFallback.prototype._setSkippedQueryStatus = function () {
// jshint maxcomplexity: 8
for (var i = 0; i < this.data.query.query.length; i++) {
if (this.data.query.query[i].status === jobStatus.FAILED && this.data.query.query[i].onsuccess) {
if (this.isValidStatusTransition(this.data.query.query[i].fallback_status, jobStatus.SKIPPED)) {
this.data.query.query[i].fallback_status = jobStatus.SKIPPED;
}
}
if (this.data.query.query[i].status === jobStatus.DONE && this.data.query.query[i].onerror) {
if (this.isValidStatusTransition(this.data.query.query[i].fallback_status, jobStatus.SKIPPED)) {
this.data.query.query[i].fallback_status = jobStatus.SKIPPED;
}
}
if (this.data.query.query[i].status === jobStatus.CANCELLED && this.data.query.query[i].fallback_status) {
if (this.isValidStatusTransition(this.data.query.query[i].fallback_status, jobStatus.SKIPPED)) {
this.data.query.query[i].fallback_status = jobStatus.SKIPPED;
}
}
}
};
JobFallback.prototype._setSkippedJobStatus = function () {
// jshint maxcomplexity: 7
if (this.data.status === jobStatus.FAILED && this.data.query.onsuccess) {
if (this.isValidStatusTransition(this.data.fallback_status, jobStatus.SKIPPED)) {
this.data.fallback_status = jobStatus.SKIPPED;
}
}
if (this.data.status === jobStatus.DONE && this.data.query.onerror) {
if (this.isValidStatusTransition(this.data.fallback_status, jobStatus.SKIPPED)) {
this.data.fallback_status = jobStatus.SKIPPED;
}
}
if (this.data.status === jobStatus.CANCELLED && this.data.fallback_status) {
if (this.isValidStatusTransition(this.data.fallback_status, jobStatus.SKIPPED)) {
this.data.fallback_status = jobStatus.SKIPPED;
}
}
};
JobFallback.prototype._setRestPendingToSkipped = function (status) {
for (var i = 0; i < this.data.query.query.length; i++) {
if (this.data.query.query[i].status === jobStatus.PENDING) {
this.data.query.query[i].status = jobStatus.SKIPPED;
}
if (this.data.query.query[i].status !== status &&
this.data.query.query[i].fallback_status === jobStatus.PENDING) {
this.data.query.query[i].fallback_status = jobStatus.SKIPPED;
}
}
};
JobFallback.prototype._getLastStatusFromFinishedQuery = function () { JobFallback.prototype._getLastStatusFromFinishedQuery = function () {
var lastStatus = jobStatus.DONE; var lastStatus = jobStatus.DONE;
@ -263,7 +333,7 @@ JobFallback.prototype._shouldTryToApplyStatusTransitionToQueryFallback = functio
}; };
JobFallback.prototype._setQueryStatus = function (status, errorMesssage) { JobFallback.prototype._setQueryStatus = function (status, errorMesssage) {
// jshint maxcomplexity: 7 // jshint maxcomplexity: 8
var isValid = false; var isValid = false;
var isChangeAppliedToQueryFallback = false; var isChangeAppliedToQueryFallback = false;

View File

@ -133,7 +133,7 @@ describe('Batch API fallback job', function () {
"query": "SELECT * FROM untitle_table_4", "query": "SELECT * FROM untitle_table_4",
"onerror": "SELECT * FROM untitle_table_4 limit 1", "onerror": "SELECT * FROM untitle_table_4 limit 1",
"status": "done", "status": "done",
"fallback_status": "pending" "fallback_status": "skipped"
}] }]
}; };
var interval = setInterval(function () { var interval = setInterval(function () {
@ -268,7 +268,7 @@ describe('Batch API fallback job', function () {
query: 'SELECT * FROM nonexistent_table /* query should fail */', query: 'SELECT * FROM nonexistent_table /* query should fail */',
onsuccess: 'SELECT * FROM untitle_table_4 limit 1', onsuccess: 'SELECT * FROM untitle_table_4 limit 1',
status: 'failed', status: 'failed',
fallback_status: 'pending', fallback_status: 'skipped',
failed_reason: 'relation "nonexistent_table" does not exist' failed_reason: 'relation "nonexistent_table" does not exist'
}] }]
}; };
@ -424,7 +424,7 @@ describe('Batch API fallback job', function () {
return done(err); return done(err);
} }
var job = JSON.parse(res.body); var job = JSON.parse(res.body);
if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.PENDING) { if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.SKIPPED) {
clearInterval(interval); clearInterval(interval);
assert.deepEqual(job.query, expectedQuery); assert.deepEqual(job.query, expectedQuery);
done(); done();
@ -560,7 +560,7 @@ describe('Batch API fallback job', function () {
return done(err); return done(err);
} }
var job = JSON.parse(res.body); var job = JSON.parse(res.body);
if (job.status === jobStatus.DONE && job.fallback_status === jobStatus.PENDING) { if (job.status === jobStatus.DONE && job.fallback_status === jobStatus.SKIPPED) {
clearInterval(interval); clearInterval(interval);
assert.deepEqual(job.query, expectedQuery); assert.deepEqual(job.query, expectedQuery);
done(); done();
@ -759,13 +759,13 @@ describe('Batch API fallback job', function () {
"query": "SELECT * FROM nonexistent_table /* should fail */", "query": "SELECT * FROM nonexistent_table /* should fail */",
"onsuccess": "SELECT * FROM untitle_table_4 limit 1", "onsuccess": "SELECT * FROM untitle_table_4 limit 1",
"status": "failed", "status": "failed",
"fallback_status": "pending", "fallback_status": "skipped",
"failed_reason": 'relation "nonexistent_table" does not exist' "failed_reason": 'relation "nonexistent_table" does not exist'
}, { }, {
"query": "SELECT * FROM untitle_table_4 limit 2", "query": "SELECT * FROM untitle_table_4 limit 2",
"onsuccess": "SELECT * FROM untitle_table_4 limit 3", "onsuccess": "SELECT * FROM untitle_table_4 limit 3",
"status": "pending", "status": "skipped",
"fallback_status": "pending" "fallback_status": "skipped"
}] }]
}; };
@ -842,7 +842,7 @@ describe('Batch API fallback job', function () {
"query": "SELECT * FROM nonexistent_table /* should fail */", "query": "SELECT * FROM nonexistent_table /* should fail */",
"onsuccess": "SELECT * FROM untitle_table_4 limit 3", "onsuccess": "SELECT * FROM untitle_table_4 limit 3",
"status": "failed", "status": "failed",
"fallback_status": "pending", "fallback_status": "skipped",
"failed_reason": 'relation "nonexistent_table" does not exist' "failed_reason": 'relation "nonexistent_table" does not exist'
}] }]
}; };
@ -875,7 +875,7 @@ describe('Batch API fallback job', function () {
}); });
}); });
describe('"onerror" should not be triggered for any query', function () { describe('"onerror" should not be triggered for any query and "skipped"', function () {
var fallbackJob = {}; var fallbackJob = {};
it('should create a job', function (done) { it('should create a job', function (done) {
@ -914,12 +914,12 @@ describe('Batch API fallback job', function () {
query: 'SELECT * FROM untitle_table_4 limit 1', query: 'SELECT * FROM untitle_table_4 limit 1',
onerror: 'SELECT * FROM untitle_table_4 limit 2', onerror: 'SELECT * FROM untitle_table_4 limit 2',
status: 'done', status: 'done',
fallback_status: 'pending' fallback_status: 'skipped'
}, { }, {
query: 'SELECT * FROM untitle_table_4 limit 3', query: 'SELECT * FROM untitle_table_4 limit 3',
onerror: 'SELECT * FROM untitle_table_4 limit 4', onerror: 'SELECT * FROM untitle_table_4 limit 4',
status: 'done', status: 'done',
fallback_status: 'pending' fallback_status: 'skipped'
}] }]
}; };
@ -943,6 +943,144 @@ describe('Batch API fallback job', function () {
assert.deepEqual(job.query, expectedQuery); assert.deepEqual(job.query, expectedQuery);
done(); done();
} else if (job.status === jobStatus.FAILED || job.status === jobStatus.CANCELLED) { } else if (job.status === jobStatus.FAILED || job.status === jobStatus.CANCELLED) {
clearInterval(interval);
done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be done'));
}
});
}, 50);
});
});
describe('"onsuccess" should not be triggered for any query and "skipped"', function () {
var fallbackJob = {};
it('should create a job', function (done) {
assert.response(app, {
url: '/api/v2/sql/job?api_key=1234',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'host': 'vizzuality.cartodb.com'
},
method: 'POST',
data: querystring.stringify({
query: {
query: [{
query: "SELECT * FROM untitle_table_4 limit 1, /* should fail */",
onsuccess: "SELECT * FROM untitle_table_4 limit 2"
}]
}
})
}, {
status: 201
}, function (res, err) {
if (err) {
return done(err);
}
fallbackJob = JSON.parse(res.body);
done();
});
});
it('job should be failed', function (done) {
var expectedQuery = {
query: [{
query: 'SELECT * FROM untitle_table_4 limit 1, /* should fail */',
onsuccess: 'SELECT * FROM untitle_table_4 limit 2',
status: 'failed',
fallback_status: 'skipped',
failed_reason: 'syntax error at end of input'
}]
};
var interval = setInterval(function () {
assert.response(app, {
url: '/api/v2/sql/job/' + fallbackJob.job_id + '?api_key=1234&',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'host': 'vizzuality.cartodb.com'
},
method: 'GET'
}, {
status: 200
}, function (res, err) {
if (err) {
return done(err);
}
var job = JSON.parse(res.body);
if (job.status === jobStatus.FAILED) {
clearInterval(interval);
assert.deepEqual(job.query, expectedQuery);
done();
} else if (job.status === jobStatus.DONE || job.status === jobStatus.CANCELLED) {
clearInterval(interval);
done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be failed'));
}
});
}, 50);
});
});
describe('"onsuccess" should not be triggered and "skipped"', function () {
var fallbackJob = {};
it('should create a job', function (done) {
assert.response(app, {
url: '/api/v2/sql/job?api_key=1234',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'host': 'vizzuality.cartodb.com'
},
method: 'POST',
data: querystring.stringify({
query: {
query: [{
query: "SELECT * FROM untitle_table_4 limit 1, /* should fail */",
}],
onsuccess: "SELECT * FROM untitle_table_4 limit 2"
}
})
}, {
status: 201
}, function (res, err) {
if (err) {
return done(err);
}
fallbackJob = JSON.parse(res.body);
done();
});
});
it('job should be failed', function (done) {
var expectedQuery = {
query: [{
query: 'SELECT * FROM untitle_table_4 limit 1, /* should fail */',
status: 'failed',
failed_reason: 'syntax error at end of input'
}],
onsuccess: 'SELECT * FROM untitle_table_4 limit 2'
};
var interval = setInterval(function () {
assert.response(app, {
url: '/api/v2/sql/job/' + fallbackJob.job_id + '?api_key=1234&',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'host': 'vizzuality.cartodb.com'
},
method: 'GET'
}, {
status: 200
}, function (res, err) {
if (err) {
return done(err);
}
var job = JSON.parse(res.body);
if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.SKIPPED) {
clearInterval(interval);
assert.deepEqual(job.query, expectedQuery);
done();
} else if (job.status === jobStatus.DONE || job.status === jobStatus.CANCELLED) {
clearInterval(interval); clearInterval(interval);
done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be failed')); done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be failed'));
} }
@ -1329,7 +1467,7 @@ describe('Batch API fallback job', function () {
job.status === jobStatus.FAILED || job.status === jobStatus.FAILED ||
job.status === jobStatus.CANCELLED) { job.status === jobStatus.CANCELLED) {
clearInterval(interval); clearInterval(interval);
done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be done')); done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be running'));
} }
}); });
}, 50); }, 50);
@ -1341,7 +1479,7 @@ describe('Batch API fallback job', function () {
"query": "SELECT pg_sleep(3)", "query": "SELECT pg_sleep(3)",
"onsuccess": "SELECT pg_sleep(0)", "onsuccess": "SELECT pg_sleep(0)",
"status": "cancelled", "status": "cancelled",
"fallback_status": "pending" "fallback_status": "skipped"
}], }],
"onsuccess": "SELECT pg_sleep(0)" "onsuccess": "SELECT pg_sleep(0)"
}; };
@ -1360,7 +1498,7 @@ describe('Batch API fallback job', function () {
return done(err); return done(err);
} }
var job = JSON.parse(res.body); var job = JSON.parse(res.body);
if (job.status === jobStatus.CANCELLED && job.fallback_status === jobStatus.PENDING) { if (job.status === jobStatus.CANCELLED && job.fallback_status === jobStatus.SKIPPED) {
assert.deepEqual(job.query, expectedQuery); assert.deepEqual(job.query, expectedQuery);
done(); done();
} else if (job.status === jobStatus.DONE || job.status === jobStatus.FAILED) { } else if (job.status === jobStatus.DONE || job.status === jobStatus.FAILED) {
@ -1469,7 +1607,7 @@ describe('Batch API fallback job', function () {
return done(err); return done(err);
} }
var job = JSON.parse(res.body); var job = JSON.parse(res.body);
if (job.status === jobStatus.CANCELLED && job.fallback_status === jobStatus.PENDING) { if (job.status === jobStatus.CANCELLED && job.fallback_status === jobStatus.SKIPPED) {
assert.deepEqual(job.query, expectedQuery); assert.deepEqual(job.query, expectedQuery);
done(); done();
} else if (job.status === jobStatus.DONE || job.status === jobStatus.FAILED) { } else if (job.status === jobStatus.DONE || job.status === jobStatus.FAILED) {
@ -1478,4 +1616,166 @@ describe('Batch API fallback job', function () {
}); });
}); });
}); });
describe('should run first "onerror" and job "onerror" and skip the other ones', function () {
var fallbackJob = {};
it('should create a job', function (done) {
assert.response(app, {
url: '/api/v2/sql/job?api_key=1234',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'host': 'vizzuality.cartodb.com'
},
method: 'POST',
data: querystring.stringify({
"query": {
"query": [{
"query": "SELECT * FROM untitle_table_4 limit 1, should fail",
"onerror": "SELECT * FROM untitle_table_4 limit 2"
}, {
"query": "SELECT * FROM untitle_table_4 limit 3",
"onerror": "SELECT * FROM untitle_table_4 limit 4"
}],
"onerror": "SELECT * FROM untitle_table_4 limit 5"
}
})
}, {
status: 201
}, function (res, err) {
if (err) {
return done(err);
}
fallbackJob = JSON.parse(res.body);
done();
});
});
it('job should fail', function (done) {
var expectedQuery = {
"query": [
{
"query": "SELECT * FROM untitle_table_4 limit 1, should fail",
"onerror": "SELECT * FROM untitle_table_4 limit 2",
"status": "failed",
"fallback_status": "done",
"failed_reason": "LIMIT #,# syntax is not supported"
},
{
"query": "SELECT * FROM untitle_table_4 limit 3",
"onerror": "SELECT * FROM untitle_table_4 limit 4",
"status": "skipped",
"fallback_status": "skipped"
}
],
"onerror": "SELECT * FROM untitle_table_4 limit 5"
};
var interval = setInterval(function () {
assert.response(app, {
url: '/api/v2/sql/job/' + fallbackJob.job_id + '?api_key=1234&',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'host': 'vizzuality.cartodb.com'
},
method: 'GET'
}, {
status: 200
}, function (res, err) {
if (err) {
return done(err);
}
var job = JSON.parse(res.body);
if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.DONE) {
clearInterval(interval);
assert.deepEqual(job.query, expectedQuery);
done();
} else if (job.status === jobStatus.DONE || job.status === jobStatus.CANCELLED) {
clearInterval(interval);
done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be done'));
}
});
}, 50);
});
});
describe('should fail first "onerror" and job "onerror" and skip the other ones', function () {
var fallbackJob = {};
it('should create a job', function (done) {
assert.response(app, {
url: '/api/v2/sql/job?api_key=1234',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'host': 'vizzuality.cartodb.com'
},
method: 'POST',
data: querystring.stringify({
"query": {
"query": [{
"query": "SELECT * FROM atm_madrid limit 1, should fail",
"onerror": "SELECT * FROM atm_madrid limit 2"
}, {
"query": "SELECT * FROM atm_madrid limit 3",
"onerror": "SELECT * FROM atm_madrid limit 4"
}],
"onerror": "SELECT * FROM atm_madrid limit 5"
}
})
}, {
status: 201
}, function (res, err) {
if (err) {
return done(err);
}
fallbackJob = JSON.parse(res.body);
done();
});
});
it('job should fail', function (done) {
var expectedQuery = {
query: [{
query: 'SELECT * FROM atm_madrid limit 1, should fail',
onerror: 'SELECT * FROM atm_madrid limit 2',
status: 'failed',
fallback_status: 'failed',
failed_reason: 'relation "atm_madrid" does not exist'
}, {
query: 'SELECT * FROM atm_madrid limit 3',
onerror: 'SELECT * FROM atm_madrid limit 4',
status: 'skipped',
fallback_status: 'skipped'
}],
onerror: 'SELECT * FROM atm_madrid limit 5'
};
var interval = setInterval(function () {
assert.response(app, {
url: '/api/v2/sql/job/' + fallbackJob.job_id + '?api_key=1234&',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'host': 'vizzuality.cartodb.com'
},
method: 'GET'
}, {
status: 200
}, function (res, err) {
if (err) {
return done(err);
}
var job = JSON.parse(res.body);
if (job.status === jobStatus.FAILED && job.fallback_status === jobStatus.FAILED) {
clearInterval(interval);
assert.deepEqual(job.query, expectedQuery);
done();
} else if (job.status === jobStatus.DONE || job.status === jobStatus.CANCELLED) {
clearInterval(interval);
done(new Error('Job ' + job.job_id + ' is ' + job.status + ', expected to be done'));
}
});
}, 50);
});
});
}); });