Merge branch 'pgcopy-stream' of github.com:CartoDB/CartoDB-SQL-API into pgcopy-stream-q
This commit is contained in:
commit
ed696a96ec
@ -8,6 +8,7 @@ const timeoutLimitsMiddleware = require('../middlewares/timeout-limits');
|
||||
const { initializeProfilerMiddleware } = require('../middlewares/profiler');
|
||||
const rateLimitsMiddleware = require('../middlewares/rate-limit');
|
||||
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
|
||||
const { getFormatFromCopyQuery } = require('../utils/query_info');
|
||||
|
||||
const zlib = require('zlib');
|
||||
const PSQL = require('cartodb-psql');
|
||||
@ -15,10 +16,11 @@ const copyTo = require('pg-copy-streams').to;
|
||||
const copyFrom = require('pg-copy-streams').from;
|
||||
|
||||
|
||||
function CopyController(metadataBackend, userDatabaseService, userLimitsService) {
|
||||
function CopyController(metadataBackend, userDatabaseService, userLimitsService, statsClient) {
|
||||
this.metadataBackend = metadataBackend;
|
||||
this.userDatabaseService = userDatabaseService;
|
||||
this.userLimitsService = userLimitsService;
|
||||
this.statsClient = statsClient;
|
||||
}
|
||||
|
||||
CopyController.prototype.route = function (app) {
|
||||
@ -32,8 +34,8 @@ CopyController.prototype.route = function (app) {
|
||||
authorizationMiddleware(this.metadataBackend),
|
||||
connectionParamsMiddleware(this.userDatabaseService),
|
||||
timeoutLimitsMiddleware(this.metadataBackend),
|
||||
this.handleCopyFrom.bind(this),
|
||||
this.responseCopyFrom.bind(this),
|
||||
handleCopyFrom(),
|
||||
responseCopyFrom(),
|
||||
errorMiddleware()
|
||||
];
|
||||
};
|
||||
@ -46,7 +48,7 @@ CopyController.prototype.route = function (app) {
|
||||
authorizationMiddleware(this.metadataBackend),
|
||||
connectionParamsMiddleware(this.userDatabaseService),
|
||||
timeoutLimitsMiddleware(this.metadataBackend),
|
||||
this.handleCopyTo.bind(this),
|
||||
handleCopyTo(this.statsClient),
|
||||
errorMiddleware()
|
||||
];
|
||||
};
|
||||
@ -126,28 +128,92 @@ CopyController.prototype.handleCopyFrom = function (req, res, next) {
|
||||
total_rows: copyFromStream.rowCount
|
||||
};
|
||||
|
||||
return next();
|
||||
|
||||
});
|
||||
} catch (err) {
|
||||
next(err);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
function handleCopyFrom () {
|
||||
return function handleCopyFromMiddleware (req, res, next) {
|
||||
const { sql } = req.query;
|
||||
|
||||
if (!sql) {
|
||||
return next(new Error("Parameter 'sql' is missing, must be in URL or first field in POST"));
|
||||
}
|
||||
|
||||
// Only accept SQL that starts with 'COPY'
|
||||
if (!sql.toUpperCase().startsWith("COPY ")) {
|
||||
return next(new Error("SQL must start with COPY"));
|
||||
}
|
||||
|
||||
res.locals.copyFromSize = 0;
|
||||
|
||||
try {
|
||||
const startTime = Date.now();
|
||||
|
||||
// Connect and run the COPY
|
||||
const pg = new PSQL(res.locals.userDbParams);
|
||||
pg.connect(function (err, client) {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
|
||||
let copyFromStream = copyFrom(sql);
|
||||
const pgstream = client.query(copyFromStream);
|
||||
pgstream
|
||||
.on('error', next)
|
||||
.on('end', function () {
|
||||
res.body = {
|
||||
time: (Date.now() - startTime) / 1000,
|
||||
total_rows: copyFromStream.rowCount
|
||||
};
|
||||
|
||||
return next();
|
||||
});
|
||||
|
||||
if (req.get('content-encoding') === 'gzip') {
|
||||
req.pipe(zlib.createGunzip()).pipe(pgstream);
|
||||
req
|
||||
.pipe(zlib.createGunzip())
|
||||
.on('data', data => res.locals.copyFromSize += data.length)
|
||||
.pipe(pgstream);
|
||||
} else {
|
||||
req.pipe(pgstream);
|
||||
req
|
||||
.on('data', data => res.locals.copyFromSize += data.length)
|
||||
.pipe(pgstream);
|
||||
}
|
||||
});
|
||||
|
||||
} catch (err) {
|
||||
next(err);
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
CopyController.prototype.responseCopyFrom = function (req, res, next) {
|
||||
function responseCopyFrom () {
|
||||
return function responseCopyFromMiddleware (req, res, next) {
|
||||
if (!res.body || !res.body.total_rows) {
|
||||
return next(new Error("No rows copied"));
|
||||
}
|
||||
|
||||
res.send(res.body);
|
||||
if (req.profiler) {
|
||||
const copyFromMetrics = {
|
||||
size: res.locals.copyFromSize, //bytes
|
||||
format: getFormatFromCopyQuery(req.query.sql),
|
||||
time: res.body.time, //seconds
|
||||
total_rows: res.body.total_rows,
|
||||
gzip: req.get('content-encoding') === 'gzip'
|
||||
};
|
||||
|
||||
req.profiler.add({ copyFrom: copyFromMetrics });
|
||||
res.header('X-SQLAPI-Profiler', req.profiler.toJSONString());
|
||||
}
|
||||
|
||||
res.send(res.body);
|
||||
};
|
||||
}
|
||||
|
||||
module.exports = CopyController;
|
||||
|
@ -178,7 +178,8 @@ function App(statsClient) {
|
||||
var copyController = new CopyController(
|
||||
metadataBackend,
|
||||
userDatabaseService,
|
||||
userLimitsService
|
||||
userLimitsService,
|
||||
statsClient
|
||||
);
|
||||
copyController.route(app);
|
||||
|
||||
|
28
app/utils/query_info.js
Normal file
28
app/utils/query_info.js
Normal file
@ -0,0 +1,28 @@
|
||||
const COPY_FORMATS = ['TEXT', 'CSV', 'BINARY'];
|
||||
|
||||
module.exports = {
|
||||
getFormatFromCopyQuery(copyQuery) {
|
||||
let format = 'TEXT'; // Postgres default format
|
||||
|
||||
copyQuery = copyQuery.toUpperCase();
|
||||
|
||||
if (!copyQuery.startsWith("COPY ")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if(copyQuery.includes(' WITH ') && copyQuery.includes('FORMAT ')) {
|
||||
const regex = /\bFORMAT\s+(\w+)/;
|
||||
const result = regex.exec(copyQuery);
|
||||
|
||||
if (result && result.length === 2) {
|
||||
if (COPY_FORMATS.includes(result[1])) {
|
||||
format = result[1];
|
||||
} else {
|
||||
format = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return format;
|
||||
}
|
||||
};
|
@ -2,8 +2,20 @@ require('../helper');
|
||||
|
||||
const fs = require('fs');
|
||||
const querystring = require('querystring');
|
||||
const server = require('../../app/server')();
|
||||
const assert = require('../support/assert');
|
||||
const os = require('os');
|
||||
|
||||
const StatsClient = require('../../app/stats/client');
|
||||
if (global.settings.statsd) {
|
||||
// Perform keyword substitution in statsd
|
||||
if (global.settings.statsd.prefix) {
|
||||
const hostToken = os.hostname().split('.').reverse().join('.');
|
||||
global.settings.statsd.prefix = global.settings.statsd.prefix.replace(/:host/, hostToken);
|
||||
}
|
||||
}
|
||||
const statsClient = StatsClient.getInstance(global.settings.statsd);
|
||||
const server = require('../../app/server')(statsClient);
|
||||
|
||||
|
||||
describe('copy-endpoints', function() {
|
||||
it('should work with copyfrom endpoint', function(done){
|
||||
@ -19,6 +31,17 @@ describe('copy-endpoints', function() {
|
||||
const response = JSON.parse(res.body);
|
||||
assert.equal(!!response.time, true);
|
||||
assert.strictEqual(response.total_rows, 6);
|
||||
|
||||
assert.ok(res.headers['x-sqlapi-profiler']);
|
||||
const headers = JSON.parse(res.headers['x-sqlapi-profiler']);
|
||||
assert.ok(headers.copyFrom);
|
||||
const metrics = headers.copyFrom;
|
||||
assert.equal(metrics.size, 57);
|
||||
assert.equal(metrics.format, 'CSV');
|
||||
assert.equal(metrics.time, response.time);
|
||||
assert.equal(metrics.total_rows, response.total_rows);
|
||||
assert.equal(metrics.gzip, false);
|
||||
|
||||
done();
|
||||
});
|
||||
});
|
||||
@ -94,6 +117,10 @@ describe('copy-endpoints', function() {
|
||||
res.body,
|
||||
'11\tPaul\t10\n12\tPeter\t10\n13\tMatthew\t10\n14\t\\N\t10\n15\tJames\t10\n16\tJohn\t10\n'
|
||||
);
|
||||
|
||||
assert.equal(res.headers['content-disposition'], 'attachment; filename=%2Ftmp%2Foutput.dmp');
|
||||
assert.equal(res.headers['content-type'], 'application/octet-stream');
|
||||
|
||||
done();
|
||||
});
|
||||
});
|
||||
@ -114,6 +141,17 @@ describe('copy-endpoints', function() {
|
||||
const response = JSON.parse(res.body);
|
||||
assert.equal(!!response.time, true);
|
||||
assert.strictEqual(response.total_rows, 6);
|
||||
|
||||
assert.ok(res.headers['x-sqlapi-profiler']);
|
||||
const headers = JSON.parse(res.headers['x-sqlapi-profiler']);
|
||||
assert.ok(headers.copyFrom);
|
||||
const metrics = headers.copyFrom;
|
||||
assert.equal(metrics.size, 57);
|
||||
assert.equal(metrics.format, 'CSV');
|
||||
assert.equal(metrics.time, response.time);
|
||||
assert.equal(metrics.total_rows, response.total_rows);
|
||||
assert.equal(metrics.gzip, true);
|
||||
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
63
test/unit/query_info.test.js
Normal file
63
test/unit/query_info.test.js
Normal file
@ -0,0 +1,63 @@
|
||||
const assert = require('assert');
|
||||
const queryInfo = require('../../app/utils/query_info');
|
||||
|
||||
describe('query info', function () {
|
||||
describe('copy format', function () {
|
||||
describe('csv', function () {
|
||||
const validQueries = [
|
||||
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)",
|
||||
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)",
|
||||
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV , DELIMITER ',', HEADER true)",
|
||||
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV)",
|
||||
];
|
||||
|
||||
validQueries.forEach(query => {
|
||||
it(query, function() {
|
||||
const result = queryInfo.getFormatFromCopyQuery(query);
|
||||
assert.equal(result, 'CSV');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('text', function() {
|
||||
const validQueries = [
|
||||
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT TEXT)",
|
||||
"COPY copy_endpoints_test (id, name) FROM STDIN",
|
||||
];
|
||||
|
||||
validQueries.forEach(query => {
|
||||
it(query, function() {
|
||||
const result = queryInfo.getFormatFromCopyQuery(query);
|
||||
assert.equal(result, 'TEXT');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('binary', function() {
|
||||
const validQueries = [
|
||||
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT BINARY)",
|
||||
];
|
||||
|
||||
validQueries.forEach(query => {
|
||||
it(query, function() {
|
||||
const result = queryInfo.getFormatFromCopyQuery(query);
|
||||
assert.equal(result, 'BINARY');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('should fail', function() {
|
||||
const validQueries = [
|
||||
"COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT ERROR)",
|
||||
"SELECT * from copy_endpoints_test"
|
||||
];
|
||||
|
||||
validQueries.forEach(query => {
|
||||
it(query, function() {
|
||||
const result = queryInfo.getFormatFromCopyQuery(query);
|
||||
assert.strictEqual(result, false);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
Loading…
Reference in New Issue
Block a user