Merge branch 'master' into pgcopy-stream-refactor2-dgaubert
This commit is contained in:
commit
6955d254f8
6
NEWS.md
6
NEWS.md
@ -1,8 +1,12 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
## 2.1.0
|
## 2.1.1
|
||||||
Released 2018-mm-dd
|
Released 2018-mm-dd
|
||||||
|
|
||||||
|
|
||||||
|
## 2.1.0
|
||||||
|
Released 2018-06-13
|
||||||
|
|
||||||
New features:
|
New features:
|
||||||
* CI tests with Ubuntu Xenial + PostgreSQL 10.1 and Ubuntu Precise + PostgreSQL 9.5
|
* CI tests with Ubuntu Xenial + PostgreSQL 10.1 and Ubuntu Precise + PostgreSQL 9.5
|
||||||
* Making version 2.0.0 configuration parameters backwards compatible
|
* Making version 2.0.0 configuration parameters backwards compatible
|
||||||
|
4
app.js
4
app.js
@ -115,6 +115,10 @@ process.on('SIGHUP', function() {
|
|||||||
if (server.batch && server.batch.logger) {
|
if (server.batch && server.batch.logger) {
|
||||||
server.batch.logger.reopenFileStreams();
|
server.batch.logger.reopenFileStreams();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (server.dataIngestionLogger) {
|
||||||
|
server.dataIngestionLogger.reopenFileStreams();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
process.on('SIGTERM', function () {
|
process.on('SIGTERM', function () {
|
||||||
|
@ -15,13 +15,11 @@ const Logger = require('../services/logger');
|
|||||||
const { Client } = require('pg');
|
const { Client } = require('pg');
|
||||||
const zlib = require('zlib');
|
const zlib = require('zlib');
|
||||||
|
|
||||||
function CopyController(metadataBackend, userDatabaseService, userLimitsService, statsClient) {
|
function CopyController(metadataBackend, userDatabaseService, userLimitsService, logger) {
|
||||||
this.metadataBackend = metadataBackend;
|
this.metadataBackend = metadataBackend;
|
||||||
this.userDatabaseService = userDatabaseService;
|
this.userDatabaseService = userDatabaseService;
|
||||||
this.userLimitsService = userLimitsService;
|
this.userLimitsService = userLimitsService;
|
||||||
this.statsClient = statsClient;
|
this.logger = logger;
|
||||||
|
|
||||||
this.logger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CopyController.prototype.route = function (app) {
|
CopyController.prototype.route = function (app) {
|
||||||
|
@ -28,6 +28,7 @@ var JobQueue = require('../batch/job_queue');
|
|||||||
var JobBackend = require('../batch/job_backend');
|
var JobBackend = require('../batch/job_backend');
|
||||||
var JobCanceller = require('../batch/job_canceller');
|
var JobCanceller = require('../batch/job_canceller');
|
||||||
var JobService = require('../batch/job_service');
|
var JobService = require('../batch/job_service');
|
||||||
|
const Logger = require('./services/logger');
|
||||||
|
|
||||||
var cors = require('./middlewares/cors');
|
var cors = require('./middlewares/cors');
|
||||||
|
|
||||||
@ -154,6 +155,9 @@ function App(statsClient) {
|
|||||||
};
|
};
|
||||||
const userLimitsService = new UserLimitsService(metadataBackend, userLimitsServiceOptions);
|
const userLimitsService = new UserLimitsService(metadataBackend, userLimitsServiceOptions);
|
||||||
|
|
||||||
|
const dataIngestionLogger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion');
|
||||||
|
app.dataIngestionLogger = dataIngestionLogger;
|
||||||
|
|
||||||
var jobPublisher = new JobPublisher(redisPool);
|
var jobPublisher = new JobPublisher(redisPool);
|
||||||
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
|
var jobQueue = new JobQueue(metadataBackend, jobPublisher);
|
||||||
var jobBackend = new JobBackend(metadataBackend, jobQueue);
|
var jobBackend = new JobBackend(metadataBackend, jobQueue);
|
||||||
@ -175,7 +179,8 @@ function App(statsClient) {
|
|||||||
var copyController = new CopyController(
|
var copyController = new CopyController(
|
||||||
metadataBackend,
|
metadataBackend,
|
||||||
userDatabaseService,
|
userDatabaseService,
|
||||||
userLimitsService
|
userLimitsService,
|
||||||
|
dataIngestionLogger
|
||||||
);
|
);
|
||||||
copyController.route(app);
|
copyController.route(app);
|
||||||
|
|
||||||
|
@ -28,6 +28,11 @@ class Logger {
|
|||||||
warn (log, message) {
|
warn (log, message) {
|
||||||
this.logger.warn(log, message);
|
this.logger.warn(log, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reopenFileStreams () {
|
||||||
|
console.log('Reloading log file', this.path);
|
||||||
|
this.logger.reopenFileStreams();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = Logger;
|
module.exports = Logger;
|
||||||
|
@ -16,6 +16,7 @@ module.exports = class StreamCopyMetrics {
|
|||||||
this.endTime = null;
|
this.endTime = null;
|
||||||
this.time = null;
|
this.time = null;
|
||||||
|
|
||||||
|
this.success = true;
|
||||||
this.error = null;
|
this.error = null;
|
||||||
|
|
||||||
this.ended = false;
|
this.ended = false;
|
||||||
@ -61,7 +62,7 @@ module.exports = class StreamCopyMetrics {
|
|||||||
size: this.size,
|
size: this.size,
|
||||||
rows: this.rows,
|
rows: this.rows,
|
||||||
gzip: this.isGzip,
|
gzip: this.isGzip,
|
||||||
username: this.username,
|
'cdb-user': this.username,
|
||||||
time: this.time,
|
time: this.time,
|
||||||
timestamp
|
timestamp
|
||||||
};
|
};
|
||||||
@ -72,8 +73,11 @@ module.exports = class StreamCopyMetrics {
|
|||||||
|
|
||||||
if (errorMessage) {
|
if (errorMessage) {
|
||||||
logData.error = errorMessage;
|
logData.error = errorMessage;
|
||||||
|
this.success = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logData.success = this.success;
|
||||||
|
|
||||||
this.logger.info(logData);
|
this.logger.info(logData);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -67,7 +67,7 @@ UserDatabaseService.prototype.getConnectionParams = function (username, apikeyTo
|
|||||||
commonDBConfiguration);
|
commonDBConfiguration);
|
||||||
|
|
||||||
if (isOauthAuthorization({ apikeyToken, authorizationLevel})) {
|
if (isOauthAuthorization({ apikeyToken, authorizationLevel})) {
|
||||||
callback(null, masterDBConfiguration, masterDBConfiguration);
|
return callback(null, masterDBConfiguration, masterDBConfiguration);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Default Api key fallback
|
// Default Api key fallback
|
||||||
|
@ -11,9 +11,6 @@ class BatchLogger extends Logger {
|
|||||||
return job.log(this.logger);
|
return job.log(this.logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
reopenFileStreams () {
|
|
||||||
this.logger.reopenFileStreams();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = BatchLogger;
|
module.exports = BatchLogger;
|
||||||
|
@ -5,7 +5,7 @@
|
|||||||
"keywords": [
|
"keywords": [
|
||||||
"cartodb"
|
"cartodb"
|
||||||
],
|
],
|
||||||
"version": "2.1.0",
|
"version": "2.1.1",
|
||||||
"repository": {
|
"repository": {
|
||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "git://github.com/CartoDB/CartoDB-SQL-API.git"
|
"url": "git://github.com/CartoDB/CartoDB-SQL-API.git"
|
||||||
|
231
test/acceptance/copy-abort.js
Normal file
231
test/acceptance/copy-abort.js
Normal file
@ -0,0 +1,231 @@
|
|||||||
|
const querystring = require('querystring');
|
||||||
|
const StatsClient = require('../../app/stats/client');
|
||||||
|
const statsClient = StatsClient.getInstance(global.settings.statsd);
|
||||||
|
const server = require('../../app/server')(statsClient);
|
||||||
|
const request = require('request');
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
|
const copyQuery = `COPY (
|
||||||
|
INSERT INTO copy_to_test
|
||||||
|
SELECT updated_at
|
||||||
|
FROM generate_series(
|
||||||
|
'1984-06-14 01:00:00'::timestamp,
|
||||||
|
'2018-06-14 01:00:00'::timestamp,
|
||||||
|
'1 hour'::interval
|
||||||
|
) updated_at
|
||||||
|
RETURNING updated_at
|
||||||
|
) TO STDOUT`;
|
||||||
|
|
||||||
|
const createTableQuery = `CREATE TABLE copy_to_test AS
|
||||||
|
(SELECT '2018-06-15 14:49:05.126415+00'::timestamp AS updated_at)`;
|
||||||
|
|
||||||
|
const dropTableQuery = `DROP TABLE copy_to_test`;
|
||||||
|
|
||||||
|
const countQuery = `SELECT count(1) as count FROM copy_to_test`;
|
||||||
|
|
||||||
|
function countInsertedRows (host, port, callback) {
|
||||||
|
setTimeout(function () {
|
||||||
|
const count = querystring.stringify({ q: countQuery, api_key: 1234 });
|
||||||
|
|
||||||
|
const options = {
|
||||||
|
url: `http://${host}:${port}/api/v1/sql?${count}`,
|
||||||
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
|
method: 'GET'
|
||||||
|
};
|
||||||
|
|
||||||
|
request(options, function (err, res, body) {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(res.statusCode, 200);
|
||||||
|
const result = JSON.parse(body);
|
||||||
|
callback(null, result);
|
||||||
|
});
|
||||||
|
}, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('Cancel "copy to" commands', function () {
|
||||||
|
|
||||||
|
beforeEach(function (done) {
|
||||||
|
this.listener = server.listen(0, '127.0.0.1');
|
||||||
|
|
||||||
|
this.listener.on('error', done);
|
||||||
|
|
||||||
|
this.listener.on('listening', () => {
|
||||||
|
const { address, port } = this.listener.address();
|
||||||
|
|
||||||
|
this.host = address;
|
||||||
|
this.port = port;
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
beforeEach(function (done) {
|
||||||
|
const { host, port } = this;
|
||||||
|
|
||||||
|
const createTable = querystring.stringify({ q: createTableQuery, api_key: 1234});
|
||||||
|
|
||||||
|
const createTableOptions = {
|
||||||
|
url: `http://${host}:${port}/api/v1/sql?${createTable}`,
|
||||||
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
|
method: 'GET'
|
||||||
|
};
|
||||||
|
|
||||||
|
request(createTableOptions, function (err, res) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(res.statusCode, 200);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(function (done) {
|
||||||
|
const { host, port } = this;
|
||||||
|
|
||||||
|
const dropTable = querystring.stringify({ q: dropTableQuery, api_key: 1234 });
|
||||||
|
|
||||||
|
const dropTableOptions = {
|
||||||
|
url: `http://${host}:${port}/api/v1/sql?${dropTable}`,
|
||||||
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
|
method: 'GET'
|
||||||
|
};
|
||||||
|
|
||||||
|
request(dropTableOptions, function (err, res) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(res.statusCode, 200);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(function (done) {
|
||||||
|
this.listener.close(done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('abort on response', function (done) {
|
||||||
|
const { host, port } = this;
|
||||||
|
|
||||||
|
const copy = querystring.stringify({ q: copyQuery, api_key: 1234 });
|
||||||
|
|
||||||
|
const options = {
|
||||||
|
url: `http://${host}:${port}/api/v1/sql/copyto?${copy}`,
|
||||||
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
|
method: 'GET'
|
||||||
|
};
|
||||||
|
|
||||||
|
const req = request(options);
|
||||||
|
|
||||||
|
req.on('response', function () {
|
||||||
|
req.abort();
|
||||||
|
|
||||||
|
countInsertedRows(host, port, function (err, result) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(result.rows[0].count, 1);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('abort on data', function (done) {
|
||||||
|
const { host, port } = this;
|
||||||
|
|
||||||
|
const copy = querystring.stringify({ q: copyQuery, api_key: 1234 });
|
||||||
|
|
||||||
|
const options = {
|
||||||
|
url: `http://${host}:${port}/api/v1/sql/copyto?${copy}`,
|
||||||
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
|
method: 'GET'
|
||||||
|
};
|
||||||
|
|
||||||
|
const req = request(options);
|
||||||
|
|
||||||
|
req.once('data', function () {
|
||||||
|
req.abort();
|
||||||
|
|
||||||
|
countInsertedRows(host, port, function (err, result) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(result.rows[0].count, 1);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
it('destroy on data', function (done) {
|
||||||
|
const { host, port } = this;
|
||||||
|
|
||||||
|
const copy = querystring.stringify({ q: copyQuery, api_key: 1234 });
|
||||||
|
|
||||||
|
const options = {
|
||||||
|
url: `http://${host}:${port}/api/v1/sql/copyto?${copy}`,
|
||||||
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
|
method: 'GET'
|
||||||
|
};
|
||||||
|
|
||||||
|
const req = request(options);
|
||||||
|
|
||||||
|
let response;
|
||||||
|
|
||||||
|
req.on('response', function (res) {
|
||||||
|
response = res;
|
||||||
|
});
|
||||||
|
|
||||||
|
req.once('data', function () {
|
||||||
|
response.destroy();
|
||||||
|
|
||||||
|
countInsertedRows(host, port, function (err, result) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(result.rows[0].count, 1);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('destroy on response', function (done) {
|
||||||
|
const { host, port } = this;
|
||||||
|
|
||||||
|
const copy = querystring.stringify({ q: copyQuery, api_key: 1234 });
|
||||||
|
|
||||||
|
const options = {
|
||||||
|
url: `http://${host}:${port}/api/v1/sql/copyto?${copy}`,
|
||||||
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
|
method: 'GET'
|
||||||
|
};
|
||||||
|
|
||||||
|
const req = request(options);
|
||||||
|
|
||||||
|
req.on('response', function (response) {
|
||||||
|
response.destroy();
|
||||||
|
|
||||||
|
countInsertedRows(host, port, function (err, result) {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(result.rows[0].count, 1);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
@ -5,6 +5,7 @@ const querystring = require('querystring');
|
|||||||
const assert = require('../support/assert');
|
const assert = require('../support/assert');
|
||||||
const os = require('os');
|
const os = require('os');
|
||||||
const { Client } = require('pg');
|
const { Client } = require('pg');
|
||||||
|
const request = require('request');
|
||||||
|
|
||||||
const StatsClient = require('../../app/stats/client');
|
const StatsClient = require('../../app/stats/client');
|
||||||
if (global.settings.statsd) {
|
if (global.settings.statsd) {
|
||||||
@ -19,21 +20,27 @@ const server = require('../../app/server')(statsClient);
|
|||||||
|
|
||||||
|
|
||||||
describe('copy-endpoints', function() {
|
describe('copy-endpoints', function() {
|
||||||
describe('copy-endpoints', function() {
|
before(function() {
|
||||||
before(function(done) {
|
this.client = new Client({
|
||||||
const client = new Client({
|
|
||||||
user: 'postgres',
|
user: 'postgres',
|
||||||
host: 'localhost',
|
host: 'localhost',
|
||||||
database: 'cartodb_test_user_1_db',
|
database: 'cartodb_test_user_1_db',
|
||||||
port: 5432,
|
port: 5432,
|
||||||
});
|
});
|
||||||
client.connect();
|
this.client.connect();
|
||||||
client.query('TRUNCATE copy_endpoints_test', (err/*, res */) => {
|
});
|
||||||
client.end();
|
|
||||||
|
after(function() {
|
||||||
|
this.client.end();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(function (done) {
|
||||||
|
this.client.query('TRUNCATE copy_endpoints_test', err => {
|
||||||
done(err);
|
done(err);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('general', function() {
|
||||||
it('should work with copyfrom endpoint', function(done){
|
it('should work with copyfrom endpoint', function(done){
|
||||||
assert.response(server, {
|
assert.response(server, {
|
||||||
url: "/api/v1/sql/copyfrom?" + querystring.stringify({
|
url: "/api/v1/sql/copyfrom?" + querystring.stringify({
|
||||||
@ -109,6 +116,15 @@ describe('copy-endpoints', function() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should work with copyto endpoint', function(done){
|
it('should work with copyto endpoint', function(done){
|
||||||
|
assert.response(server, {
|
||||||
|
url: "/api/v1/sql/copyfrom?" + querystring.stringify({
|
||||||
|
q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"
|
||||||
|
}),
|
||||||
|
headers: {host: 'vizzuality.cartodb.com'},
|
||||||
|
method: 'POST'
|
||||||
|
},{}, function(err) {
|
||||||
|
assert.ifError(err);
|
||||||
|
|
||||||
assert.response(server, {
|
assert.response(server, {
|
||||||
url: "/api/v1/sql/copyto?" + querystring.stringify({
|
url: "/api/v1/sql/copyto?" + querystring.stringify({
|
||||||
q: 'COPY copy_endpoints_test TO STDOUT',
|
q: 'COPY copy_endpoints_test TO STDOUT',
|
||||||
@ -129,6 +145,7 @@ describe('copy-endpoints', function() {
|
|||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it('should fail with copyto endpoint and without sql', function(done){
|
it('should fail with copyto endpoint and without sql', function(done){
|
||||||
assert.response(server, {
|
assert.response(server, {
|
||||||
@ -152,7 +169,7 @@ describe('copy-endpoints', function() {
|
|||||||
it('should work with copyfrom and gzip', function(done){
|
it('should work with copyfrom and gzip', function(done){
|
||||||
assert.response(server, {
|
assert.response(server, {
|
||||||
url: "/api/v1/sql/copyfrom?" + querystring.stringify({
|
url: "/api/v1/sql/copyfrom?" + querystring.stringify({
|
||||||
q: "COPY copy_endpoints_test2 (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"
|
q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"
|
||||||
}),
|
}),
|
||||||
data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv.gz'),
|
data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv.gz'),
|
||||||
headers: {
|
headers: {
|
||||||
@ -172,7 +189,7 @@ describe('copy-endpoints', function() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
describe('copy-endpoints timeout', function() {
|
describe('timeout', function() {
|
||||||
it('should fail with copyfrom and timeout', function(done){
|
it('should fail with copyfrom and timeout', function(done){
|
||||||
assert.response(server, {
|
assert.response(server, {
|
||||||
url: '/api/v1/sql?q=set statement_timeout = 10',
|
url: '/api/v1/sql?q=set statement_timeout = 10',
|
||||||
@ -214,7 +231,6 @@ describe('copy-endpoints', function() {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should fail with copyto and timeout', function(done){
|
it('should fail with copyto and timeout', function(done){
|
||||||
assert.response(server, {
|
assert.response(server, {
|
||||||
url: '/api/v1/sql?q=set statement_timeout = 20',
|
url: '/api/v1/sql?q=set statement_timeout = 20',
|
||||||
@ -239,7 +255,6 @@ describe('copy-endpoints', function() {
|
|||||||
};
|
};
|
||||||
const expectedError = res.body.substring(res.body.length - JSON.stringify(error).length);
|
const expectedError = res.body.substring(res.body.length - JSON.stringify(error).length);
|
||||||
assert.deepEqual(JSON.parse(expectedError), error);
|
assert.deepEqual(JSON.parse(expectedError), error);
|
||||||
|
|
||||||
assert.response(server, {
|
assert.response(server, {
|
||||||
url: "/api/v1/sql?q=set statement_timeout = 2000",
|
url: "/api/v1/sql?q=set statement_timeout = 2000",
|
||||||
headers: {host: 'vizzuality.cartodb.com'},
|
headers: {host: 'vizzuality.cartodb.com'},
|
||||||
@ -252,7 +267,7 @@ describe('copy-endpoints', function() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
describe('copy-endpoints db connections', function() {
|
describe('db connections', function() {
|
||||||
before(function() {
|
before(function() {
|
||||||
this.db_pool_size = global.settings.db_pool_size;
|
this.db_pool_size = global.settings.db_pool_size;
|
||||||
global.settings.db_pool_size = 1;
|
global.settings.db_pool_size = 1;
|
||||||
@ -305,16 +320,27 @@ describe('copy-endpoints', function() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert.response(server, {
|
||||||
|
url: "/api/v1/sql/copyfrom?" + querystring.stringify({
|
||||||
|
q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"
|
||||||
|
}),
|
||||||
|
data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'),
|
||||||
|
headers: {host: 'vizzuality.cartodb.com'},
|
||||||
|
method: 'POST'
|
||||||
|
},{}, function(err) {
|
||||||
|
assert.ifError(err);
|
||||||
|
|
||||||
Promise.all([doCopyTo(), doCopyTo(), doCopyTo()]).then(function() {
|
Promise.all([doCopyTo(), doCopyTo(), doCopyTo()]).then(function() {
|
||||||
done();
|
done();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('copy-endpoints client disconnection', function() {
|
describe('client disconnection', function() {
|
||||||
// Give it enough time to connect and issue the query
|
// Give it enough time to connect and issue the query
|
||||||
// but not too much so as to disconnect in the middle of the query.
|
// but not too much so as to disconnect in the middle of the query.
|
||||||
const client_disconnect_timeout = 10;
|
const CLIENT_DISCONNECT_TIMEOUT = 100;
|
||||||
|
|
||||||
before(function() {
|
before(function() {
|
||||||
this.db_pool_size = global.settings.db_pool_size;
|
this.db_pool_size = global.settings.db_pool_size;
|
||||||
@ -325,7 +351,7 @@ describe('copy-endpoints', function() {
|
|||||||
global.settings.db_pool_size = this.db_pool_size;
|
global.settings.db_pool_size = this.db_pool_size;
|
||||||
});
|
});
|
||||||
|
|
||||||
var assertCanReuseConnection = function (done) {
|
const assertCanReuseConnection = function (done) {
|
||||||
assert.response(server, {
|
assert.response(server, {
|
||||||
url: '/api/v1/sql?' + querystring.stringify({
|
url: '/api/v1/sql?' + querystring.stringify({
|
||||||
q: 'SELECT 1',
|
q: 'SELECT 1',
|
||||||
@ -339,36 +365,74 @@ describe('copy-endpoints', function() {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
it('COPY TO returns the connection to the pool if the client disconnects', function(done) {
|
const assertCanReuseCanceledConnection = function (done) {
|
||||||
assert.response(server, {
|
assert.response(server, {
|
||||||
url: '/api/v1/sql/copyto?' + querystring.stringify({
|
url: '/api/v1/sql?' + querystring.stringify({
|
||||||
q: 'COPY (SELECT * FROM generate_series(1, 100000)) TO STDOUT',
|
q: 'SELECT count(*) FROM copy_endpoints_test',
|
||||||
}),
|
}),
|
||||||
headers: { host: 'vizzuality.cartodb.com' },
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
method: 'GET',
|
method: 'GET'
|
||||||
timeout: client_disconnect_timeout
|
}, {}, function(err, res) {
|
||||||
}, {}, function(err) {
|
assert.ifError(err);
|
||||||
// we're expecting a timeout error
|
assert.ok(res.statusCode === 200);
|
||||||
assert.ok(err);
|
const result = JSON.parse(res.body);
|
||||||
assert.ok(err.code === 'ETIMEDOUT' || err.code === 'ESOCKETTIMEDOUT');
|
assert.strictEqual(result.rows[0].count, 0);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
it('COPY TO returns the connection to the pool if the client disconnects', function(done) {
|
||||||
|
const listener = server.listen(0, '127.0.0.1');
|
||||||
|
|
||||||
|
listener.on('error', done);
|
||||||
|
listener.on('listening', function onServerListening () {
|
||||||
|
|
||||||
|
const { address, port } = listener.address();
|
||||||
|
const query = querystring.stringify({
|
||||||
|
q: `COPY (SELECT * FROM generate_series(1, 1000)) TO STDOUT`
|
||||||
|
});
|
||||||
|
|
||||||
|
const options = {
|
||||||
|
url: `http://${address}:${port}/api/v1/sql/copyto?${query}`,
|
||||||
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
|
method: 'GET'
|
||||||
|
};
|
||||||
|
|
||||||
|
const req = request(options);
|
||||||
|
|
||||||
|
req.once('data', () => req.abort());
|
||||||
|
req.on('response', response => {
|
||||||
|
response.on('end', () => {
|
||||||
assertCanReuseConnection(done);
|
assertCanReuseConnection(done);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it('COPY FROM returns the connection to the pool if the client disconnects', function(done) {
|
it('COPY FROM returns the connection to the pool if the client disconnects', function(done) {
|
||||||
assert.response(server, {
|
const listener = server.listen(0, '127.0.0.1');
|
||||||
url: '/api/v1/sql/copyfrom?' + querystring.stringify({
|
|
||||||
q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)",
|
listener.on('error', done);
|
||||||
}),
|
listener.on('listening', function onServerListening () {
|
||||||
|
|
||||||
|
const { address, port } = listener.address();
|
||||||
|
const query = querystring.stringify({
|
||||||
|
q: `COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)`
|
||||||
|
});
|
||||||
|
|
||||||
|
const options = {
|
||||||
|
url: `http://${address}:${port}/api/v1/sql/copyfrom?${query}`,
|
||||||
headers: { host: 'vizzuality.cartodb.com' },
|
headers: { host: 'vizzuality.cartodb.com' },
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'),
|
data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv')
|
||||||
timeout: client_disconnect_timeout
|
};
|
||||||
}, {}, function(err) {
|
|
||||||
// we're expecting a timeout error
|
const req = request(options);
|
||||||
assert.ok(err);
|
|
||||||
assert.ok(err.code === 'ETIMEDOUT' || err.code === 'ESOCKETTIMEDOUT');
|
setTimeout(() => {
|
||||||
assertCanReuseConnection(done);
|
req.abort();
|
||||||
|
assertCanReuseCanceledConnection(done);
|
||||||
|
}, CLIENT_DISCONNECT_TIMEOUT);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -226,12 +226,3 @@ CREATE TABLE copy_endpoints_test (
|
|||||||
);
|
);
|
||||||
GRANT ALL ON TABLE copy_endpoints_test TO :TESTUSER;
|
GRANT ALL ON TABLE copy_endpoints_test TO :TESTUSER;
|
||||||
GRANT ALL ON TABLE copy_endpoints_test TO :PUBLICUSER;
|
GRANT ALL ON TABLE copy_endpoints_test TO :PUBLICUSER;
|
||||||
|
|
||||||
DROP TABLE IF EXISTS copy_endpoints_test2;
|
|
||||||
CREATE TABLE copy_endpoints_test2 (
|
|
||||||
id integer,
|
|
||||||
name text,
|
|
||||||
age integer default 10
|
|
||||||
);
|
|
||||||
GRANT ALL ON TABLE copy_endpoints_test2 TO :TESTUSER;
|
|
||||||
GRANT ALL ON TABLE copy_endpoints_test2 TO :PUBLICUSER;
|
|
||||||
|
Loading…
Reference in New Issue
Block a user