commit 0ce6205127
Merge pull request #480 from CartoDB/pgcopy-stream

    /copyfrom and /copyto end points
.gitignore (vendored) | 2 +

@@ -12,3 +12,5 @@ test/test.log
 test/acceptance/oauth/venv/*
 coverage/
 npm-debug.log
+log/*.log
+yarn.lock
app/controllers/copy_controller.js (new file, 152 lines)

@@ -0,0 +1,152 @@
'use strict';

const userMiddleware = require('../middlewares/user');
const errorMiddleware = require('../middlewares/error');
const authorizationMiddleware = require('../middlewares/authorization');
const connectionParamsMiddleware = require('../middlewares/connection-params');
const timeoutLimitsMiddleware = require('../middlewares/timeout-limits');
const { initializeProfilerMiddleware } = require('../middlewares/profiler');
const rateLimitsMiddleware = require('../middlewares/rate-limit');
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
const Logger = require('../services/logger');
const errorHandlerFactory = require('../services/error_handler_factory');
const streamCopy = require('../services/stream_copy');

function CopyController(metadataBackend, userDatabaseService, userLimitsService, statsClient) {
    this.metadataBackend = metadataBackend;
    this.userDatabaseService = userDatabaseService;
    this.userLimitsService = userLimitsService;
    this.statsClient = statsClient;

    this.logger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion');
}

CopyController.prototype.route = function (app) {
    const { base_url } = global.settings;

    const copyFromMiddlewares = endpointGroup => {
        return [
            initializeProfilerMiddleware('copyfrom'),
            userMiddleware(this.metadataBackend),
            rateLimitsMiddleware(this.userLimitsService, endpointGroup),
            authorizationMiddleware(this.metadataBackend),
            connectionParamsMiddleware(this.userDatabaseService),
            timeoutLimitsMiddleware(this.metadataBackend),
            validateCopyQuery(),
            handleCopyFrom(this.logger),
            errorHandler(),
            errorMiddleware()
        ];
    };

    const copyToMiddlewares = endpointGroup => {
        return [
            initializeProfilerMiddleware('copyto'),
            userMiddleware(this.metadataBackend),
            rateLimitsMiddleware(this.userLimitsService, endpointGroup),
            authorizationMiddleware(this.metadataBackend),
            connectionParamsMiddleware(this.userDatabaseService),
            timeoutLimitsMiddleware(this.metadataBackend),
            validateCopyQuery(),
            handleCopyTo(this.logger),
            errorHandler(),
            errorMiddleware()
        ];
    };

    app.post(`${base_url}/sql/copyfrom`, copyFromMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_FROM));
    app.get(`${base_url}/sql/copyto`, copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO));
};

function handleCopyTo (logger) {
    return function handleCopyToMiddleware (req, res, next) {
        const filename = req.query.filename || 'carto-sql-copyto.dmp';

        res.header("Content-Disposition", `attachment; filename=${encodeURIComponent(filename)}`);
        res.header("Content-Type", "application/octet-stream");

        streamCopy.to(
            res,
            req.query.q,
            res.locals.userDbParams,
            res.locals.user,
            logger,
            function(err) {
                if (err) {
                    return next(err);
                }

                // this is a special endpoint:
                // the data from postgres is streamed to the response directly
            }
        );
    };
}

function handleCopyFrom (logger) {
    return function handleCopyFromMiddleware (req, res, next) {
        streamCopy.from(
            req,
            req.query.q,
            res.locals.userDbParams,
            res.locals.user,
            req.get('content-encoding') === 'gzip',
            logger,
            function(err, metrics) { // TODO: remove when data-ingestion log works: {time, rows}
                if (err) {
                    return next(err);
                }

                // TODO: remove when data-ingestion log works
                const { time, rows, type, format, gzip, size } = metrics;

                if (!time || !rows) {
                    return next(new Error("No rows copied"));
                }

                // TODO: remove when data-ingestion log works
                if (req.profiler) {
                    req.profiler.add({ copyFrom: { type, format, gzip, size, rows, time } });
                    res.header('X-SQLAPI-Profiler', req.profiler.toJSONString());
                }

                res.send({
                    time,
                    total_rows: rows
                });
            }
        );
    };
}

function validateCopyQuery () {
    return function validateCopyQueryMiddleware (req, res, next) {
        const sql = req.query.q;

        if (!sql) {
            next(new Error("SQL is missing"));
        }

        // Only accept SQL that starts with 'COPY'
        if (!sql.toUpperCase().startsWith("COPY ")) {
            next(new Error("SQL must start with COPY"));
        }

        next();
    };
}

function errorHandler () {
    return function errorHandlerMiddleware (err, req, res, next) {
        if (res.headersSent) {
            const errorHandler = errorHandlerFactory(err);
            res.write(JSON.stringify(errorHandler.getResponse()));
            res.end();
        } else {
            return next(err);
        }
    };
}

module.exports = CopyController;
app/controllers/job_controller.js

@@ -1,5 +1,6 @@
 const util = require('util');

+const bodyParserMiddleware = require('../middlewares/body-parser');
 const userMiddleware = require('../middlewares/user');
 const { initializeProfilerMiddleware, finishProfilerMiddleware } = require('../middlewares/profiler');
 const authorizationMiddleware = require('../middlewares/authorization');
@@ -30,21 +31,25 @@ JobController.prototype.route = function (app) {

     app.get(
         `${base_url}/jobs-wip`,
+        bodyParserMiddleware(),
         listWorkInProgressJobs(this.jobService),
         sendResponse(),
         errorMiddleware()
     );
     app.post(
         `${base_url}/sql/job`,
+        bodyParserMiddleware(),
         checkBodyPayloadSize(),
         jobMiddlewares('create', createJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_CREATE)
     );
     app.get(
         `${base_url}/sql/job/:job_id`,
+        bodyParserMiddleware(),
         jobMiddlewares('retrieve', getJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_GET)
     );
     app.delete(
         `${base_url}/sql/job/:job_id`,
+        bodyParserMiddleware(),
         jobMiddlewares('cancel', cancelJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_DELETE)
     );
 };
app/controllers/query_controller.js

@@ -12,6 +12,7 @@ var formats = require('../models/formats');

 var sanitize_filename = require('../utils/filename_sanitizer');
 var getContentDisposition = require('../utils/content_disposition');
+const bodyParserMiddleware = require('../middlewares/body-parser');
 const userMiddleware = require('../middlewares/user');
 const errorMiddleware = require('../middlewares/error');
 const authorizationMiddleware = require('../middlewares/authorization');
@@ -37,6 +38,7 @@ QueryController.prototype.route = function (app) {

     const queryMiddlewares = endpointGroup => {
         return [
+            bodyParserMiddleware(),
             initializeProfilerMiddleware('query'),
             userMiddleware(this.metadataBackend),
             rateLimitsMiddleware(this.userLimitsService, endpointGroup),
app/middlewares/rate-limit.js

@@ -5,7 +5,9 @@ const RATE_LIMIT_ENDPOINTS_GROUPS = {
     QUERY_FORMAT: 'query_format',
     JOB_CREATE: 'job_create',
     JOB_GET: 'job_get',
-    JOB_DELETE: 'job_delete'
+    JOB_DELETE: 'job_delete',
+    COPY_FROM: 'copy_from',
+    COPY_TO: 'copy_to'
 };

app/server.js

@@ -15,7 +15,6 @@
 //

 var express = require('express');
-var bodyParser = require('./middlewares/body-parser');
 var Profiler = require('./stats/profiler-proxy');
 var _ = require('underscore');
 var TableCacheFactory = require('./utils/table_cache_factory');
@@ -34,6 +33,7 @@ var cors = require('./middlewares/cors');

 var GenericController = require('./controllers/generic_controller');
 var QueryController = require('./controllers/query_controller');
+var CopyController = require('./controllers/copy_controller');
 var JobController = require('./controllers/job_controller');
 var CacheStatusController = require('./controllers/cache_status_controller');
 var HealthCheckController = require('./controllers/health_check_controller');
@@ -138,7 +138,6 @@ function App(statsClient) {
         });
     }

-    app.use(bodyParser());
     app.enable('jsonp callback');
     app.set("trust proxy", true);
     app.disable('x-powered-by');
@@ -173,6 +172,13 @@ function App(statsClient) {
     );
     queryController.route(app);

+    var copyController = new CopyController(
+        metadataBackend,
+        userDatabaseService,
+        userLimitsService
+    );
+    copyController.route(app);
+
     var jobController = new JobController(
         metadataBackend,
         userDatabaseService,
app/services/logger.js (new file, 33 lines)

@@ -0,0 +1,33 @@
'use strict';

const bunyan = require('bunyan');

class Logger {
    constructor (path, name) {
        const stream = {
            level: process.env.NODE_ENV === 'test' ? 'fatal' : 'info'
        };

        if (path) {
            stream.path = path;
        } else {
            stream.stream = process.stdout;
        }

        this.path = path;
        this.logger = bunyan.createLogger({
            name,
            streams: [stream]
        });
    }

    info (log, message) {
        this.logger.info(log, message);
    }

    warn (log, message) {
        this.logger.warn(log, message);
    }
}

module.exports = Logger;
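For orientation, a minimal sketch of how this service is meant to be used; the log path and the `data-ingestion` name mirror the `CopyController` constructor above, and the `(fields, message)` call shape follows bunyan's documented API:

```javascript
const Logger = require('./app/services/logger');

// With a path, records are appended to the file as JSON lines;
// with a falsy path they go to stdout instead.
const logger = new Logger('logs/data-ingestion.log', 'data-ingestion');

logger.info({ type: 'copyfrom', rows: 6 }, 'import finished');
logger.warn({ type: 'copyto' }, 'client disconnected mid-stream');
```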
app/services/stream_copy.js (new file, 132 lines)

@@ -0,0 +1,132 @@
const zlib = require('zlib');
const PSQL = require('cartodb-psql');
const copyTo = require('pg-copy-streams').to;
const copyFrom = require('pg-copy-streams').from;
const StreamCopyMetrics = require('./stream_copy_metrics');
const { Client } = require('pg');

module.exports = {
    to (res, sql, userDbParams, user, logger, cb) {
        let metrics = new StreamCopyMetrics(logger, 'copyto', sql, user);

        const pg = new PSQL(userDbParams);
        pg.connect(function (err, client, done) {
            if (err) {
                return cb(err);
            }

            let responseEnded = false;
            let connectionClosedByClient = false;
            const copyToStream = copyTo(sql);
            const pgstream = client.query(copyToStream);

            res
                .on('error', err => {
                    metrics.end(null, err);
                    pgstream.unpipe(res);
                    done();
                    return cb(err);
                })
                .on('close', () => {
                    if (!responseEnded) {
                        connectionClosedByClient = true;
                        // Cancel the running COPY TO query
                        // See https://www.postgresql.org/docs/9.5/static/protocol-flow.html#PROTOCOL-COPY
                        const runningClient = client;
                        const cancelingClient = new Client(runningClient.connectionParameters);
                        cancelingClient.cancel(runningClient, pgstream);

                        const err = new Error('Connection closed by client');
                        metrics.end(null, err);
                        pgstream.unpipe(res);
                        // see https://node-postgres.com/api/pool#releasecallback
                        done(err);
                        return cb(err);
                    }
                })
                .on('end', () => responseEnded = true);

            pgstream
                .on('error', err => {
                    if (!connectionClosedByClient) {
                        metrics.end(null, err);
                        pgstream.unpipe(res);
                        done(err);
                        return cb(err);
                    }
                })
                .on('data', data => metrics.addSize(data.length))
                .on('end', () => {
                    metrics.end(copyToStream.rowCount);
                    done();
                    return cb(null, metrics);
                })
                .pipe(res);
        });
    },

    from (req, sql, userDbParams, user, gzip, logger, cb) {
        let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip);

        const pg = new PSQL(userDbParams);
        pg.connect(function (err, client, done) {
            if (err) {
                return cb(err);
            }

            let copyFromStream = copyFrom(sql);
            const pgstream = client.query(copyFromStream);
            pgstream
                .on('error', err => {
                    metrics.end(null, err);
                    req.unpipe(pgstream);
                    done();
                    return cb(err);
                })
                .on('end', function () {
                    metrics.end(copyFromStream.rowCount);
                    done();
                    return cb(null, metrics);
                });

            let requestEnded = false;

            req
                .on('error', err => {
                    metrics.end(null, err);
                    req.unpipe(pgstream);
                    pgstream.end();
                    done();
                    return cb(err);
                })
                .on('close', () => {
                    if (!requestEnded) {
                        const err = new Error('Connection closed by client');
                        metrics.end(null, err);
                        const connection = client.connection;
                        connection.sendCopyFail('CARTO SQL API: Connection closed by client');
                        req.unpipe(pgstream);
                        done();
                        return cb(err);
                    }
                })
                .on('data', data => {
                    if (gzip) {
                        metrics.addGzipSize(data.length);
                    } else {
                        metrics.addSize(data.length);
                    }
                })
                .on('end', () => requestEnded = true);

            if (gzip) {
                req
                    .pipe(zlib.createGunzip())
                    .on('data', data => metrics.addSize(data.length))
                    .pipe(pgstream);
            } else {
                req.pipe(pgstream);
            }
        });
    }
};
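This service is essentially the `pg-copy-streams` pattern with pooling, metrics, and disconnect handling layered on top. A stripped-down sketch of the underlying mechanics, assuming a plain `pg` client, a local database reachable through the usual `PG*` environment variables, and a CSV like the fixture added in this PR (the `'end'` event and `rowCount` property follow the 1.x usage seen in the code above):

```javascript
const fs = require('fs');
const { Client } = require('pg');
const copyFrom = require('pg-copy-streams').from;

const client = new Client();
client.connect(err => {
    if (err) { throw err; }

    // Submitting a copy stream through client.query() returns a writable
    // stream; every byte piped into it feeds the COPY ... FROM STDIN.
    const pgstream = client.query(
        copyFrom("COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT csv, HEADER true)")
    );

    fs.createReadStream('copy_test_table.csv')
        .pipe(pgstream)
        .on('error', err => { console.error(err); client.end(); })
        .on('end', () => { // pg-copy-streams 1.x signals completion with 'end'
            console.log('rows copied:', pgstream.rowCount);
            client.end();
        });
});
```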
app/services/stream_copy_metrics.js (new file, 79 lines)

@@ -0,0 +1,79 @@
const { getFormatFromCopyQuery } = require('../utils/query_info');

module.exports = class StreamCopyMetrics {
    constructor(logger, type, sql, user, gzip = null) {
        this.logger = logger;

        this.type = type;
        this.format = getFormatFromCopyQuery(sql);
        this.gzip = gzip;
        this.username = user;
        this.size = 0;
        this.gzipSize = 0;
        this.rows = 0;

        this.startTime = new Date();
        this.endTime = null;
        this.time = null;

        this.error = null;

        this.ended = false;
    }

    addSize(size) {
        this.size += size;
    }

    addGzipSize(size) {
        this.gzipSize += size;
    }

    end(rows = null, error = null) {
        if (this.ended) {
            return;
        }

        this.ended = true;

        if (Number.isInteger(rows)) {
            this.rows = rows;
        }

        if (error instanceof Error) {
            this.error = error;
        }

        this.endTime = new Date();
        this.time = (this.endTime.getTime() - this.startTime.getTime()) / 1000;

        this._log(
            this.startTime.toISOString(),
            this.gzip && this.gzipSize ? this.gzipSize : null,
            this.error ? this.error.message : null
        );
    }

    _log(timestamp, gzipSize = null, errorMessage = null) {
        let logData = {
            type: this.type,
            format: this.format,
            size: this.size,
            rows: this.rows,
            gzip: this.gzip,
            username: this.username,
            time: this.time,
            timestamp
        };

        if (gzipSize) {
            logData.gzipSize = gzipSize;
        }

        if (errorMessage) {
            logData.error = errorMessage;
        }

        this.logger.info(logData);
    }
};
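The intended lifecycle, pieced together from how `stream_copy.js` drives it: construct once per request, accumulate sizes chunk by chunk, and call `end()` exactly once (repeat calls are no-ops). A small sketch:

```javascript
const Logger = require('./app/services/logger');
const StreamCopyMetrics = require('./app/services/stream_copy_metrics');

const logger = new Logger(null, 'data-ingestion'); // no path: log to stdout
const metrics = new StreamCopyMetrics(logger, 'copyto', 'COPY copy_endpoints_test TO STDOUT', 'vizzuality');

metrics.addSize(Buffer.from('11\tPaul\t10\n').length); // called for each streamed chunk
metrics.end(1); // computes elapsed seconds and emits one JSON log record
```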
app/utils/query_info.js (new file, 29 lines)

@@ -0,0 +1,29 @@
const COPY_FORMATS = ['TEXT', 'CSV', 'BINARY'];

module.exports = {
    getFormatFromCopyQuery(copyQuery) {
        let format = 'TEXT'; // Postgres default format

        copyQuery = copyQuery.toUpperCase();

        if (!copyQuery.startsWith("COPY ")) {
            return false;
        }

        if(copyQuery.includes(' WITH') && copyQuery.includes('FORMAT ')) {
            const regex = /\bFORMAT\s+(\w+)/;
            const result = regex.exec(copyQuery);

            if (result && result.length === 2) {
                if (COPY_FORMATS.includes(result[1])) {
                    format = result[1];
                    format = format.toUpperCase();
                } else {
                    format = false;
                }
            }
        }

        return format;
    }
};
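Its contract, as pinned down by the unit tests added later in this PR: a recognized format name, `'TEXT'` when none is specified (the Postgres default), and `false` for anything that is not a well-formed `COPY`:

```javascript
const { getFormatFromCopyQuery } = require('./app/utils/query_info');

getFormatFromCopyQuery("COPY t FROM STDIN WITH (FORMAT csv)"); // => 'CSV'
getFormatFromCopyQuery("COPY t FROM STDIN");                   // => 'TEXT'
getFormatFromCopyQuery("SELECT * FROM t");                     // => false
```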
batch/batch-logger.js

@@ -1,29 +1,19 @@
 'use strict';

-var bunyan = require('bunyan');
+const Logger = require('../app/services/logger');

-function BatchLogger (path) {
-    var stream = {
-        level: process.env.NODE_ENV === 'test' ? 'fatal' : 'info'
-    };
-    if (path) {
-        stream.path = path;
-    } else {
-        stream.stream = process.stdout;
-    }
-    this.path = path;
-    this.logger = bunyan.createLogger({
-        name: 'batch-queries',
-        streams: [stream]
-    });
-}
-
-module.exports = BatchLogger;
-
-BatchLogger.prototype.log = function (job) {
-    return job.log(this.logger);
-};
-
-BatchLogger.prototype.reopenFileStreams = function () {
-    this.logger.reopenFileStreams();
-};
+class BatchLogger extends Logger {
+    constructor (path, name) {
+        super(path, name);
+    }
+
+    log (job) {
+        return job.log(this.logger);
+    }
+
+    reopenFileStreams () {
+        this.logger.reopenFileStreams();
+    }
+}
+
+module.exports = BatchLogger;
batch/index.js

@@ -24,7 +24,7 @@ module.exports = function batchFactory (metadataBackend, redisPool, name, statsd
     var jobCanceller = new JobCanceller();
     var jobService = new JobService(jobBackend, jobCanceller);
     var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, metadataBackend, statsdClient);
-    var logger = new BatchLogger(loggerPath);
+    var logger = new BatchLogger(loggerPath, 'batch-queries');

     return new Batch(
         name,
environment configuration files (the same setting is added to each of the four environments):

@@ -122,3 +122,4 @@ module.exports.ratelimits = {
 }

 module.exports.validatePGEntitiesAccess = false;
+module.exports.dataIngestionLogPath = 'logs/data-ingestion.log';
@@ -126,3 +126,4 @@ module.exports.ratelimits = {
 }

 module.exports.validatePGEntitiesAccess = false;
+module.exports.dataIngestionLogPath = 'logs/data-ingestion.log';
@@ -123,3 +123,4 @@ module.exports.ratelimits = {
 }

 module.exports.validatePGEntitiesAccess = false;
+module.exports.dataIngestionLogPath = 'logs/data-ingestion.log';
@@ -123,3 +123,4 @@ module.exports.ratelimits = {
 }

 module.exports.validatePGEntitiesAccess = false;
+module.exports.dataIngestionLogPath = 'logs/data-ingestion.log';
documentation index:

@@ -17,6 +17,7 @@ Remember that in order to access, read or modify data in private tables, you wil
 * [Making calls to the SQL API](making_calls.md)
 * [Creating Tables with the SQL API](creating_tables.md)
 * [Batch Queries](batch_queries.md)
+* [Copy Queries](copy_queries.md)
 * [Handling Geospatial Data](handling_geospatial_data.md)
 * [Query Optimizations](query_optimizations.md)
 * [API Version Vumber](version.md)
doc/copy_queries.md (new file, 193 lines)

@@ -0,0 +1,193 @@
# Copy Queries

Copy queries allow you to use the [PostgreSQL copy command](https://www.postgresql.org/docs/10/static/sql-copy.html) for efficient streaming of data to and from CARTO.

The support for copy is split across two API end points:

* `http://{username}.carto.com/api/v2/sql/copyfrom` for uploading data to CARTO
* `http://{username}.carto.com/api/v2/sql/copyto` for exporting data out of CARTO

## Copy From

The PostgreSQL `COPY` command is extremely fast, but requires very precise inputs:

* A `COPY` command that describes the table and columns of the upload file, and the format of the file.
* An upload file that exactly matches the `COPY` command.

If the `COPY` command, the supplied file, and the target table do not all match, the upload will fail.

"Copy from" copies data "from" your file, "to" CARTO. "Copy from" uses chunked encoding (`Transfer-Encoding: chunked`) to stream an upload file to the server. This avoids limitations around file size and any need for temporary storage: the data travels from your file straight into the database.

Parameter | Description
--- | ---
`api_key` | a write-enabled key
`q` | the `COPY` command to load the data

**The actual COPY file content must be sent as the body of the POST request.**

Composing a chunked POST is moderately complicated, so most developers will use a tool or scripting language to upload data to CARTO via "copy from".

### Example

For a table to be readable by CARTO, it must have a minimum of three columns with specific data types:

* `cartodb_id`, a `serial` primary key
* `the_geom`, a `geometry` in the EPSG:4326 projection (aka long/lat)
* `the_geom_webmercator`, a `geometry` in the EPSG:3857 projection (aka web mercator)

Creating a new CARTO table with all the right triggers and columns can be tricky, so here is an example:

    -- create the table using the *required* columns and a
    -- couple more
    CREATE TABLE upload_example (
        the_geom geometry,
        name text,
        age integer
    );

    -- adds the 'cartodb_id' and 'the_geom_webmercator'
    -- adds the required triggers and indexes
    SELECT CDB_CartodbfyTable('upload_example');

Now you are ready to upload your file. Suppose you have a CSV file like this:

    the_geom,name,age
    SRID=4326;POINT(-126 54),North West,89
    SRID=4326;POINT(-96 34),South East,99
    SRID=4326;POINT(-6 -25),Souther Easter,124

The `COPY` command to upload this file needs to specify the file format (CSV), the fact that there is a header line before the actual data begins, and to enumerate the columns that are in the file so they can be matched to the table columns.

    COPY upload_example (the_geom, name, age)
    FROM STDIN WITH (FORMAT csv, HEADER true)

The `FROM STDIN` option tells the database that the input will come from a data stream, and the SQL API will read our uploaded file and use it to feed the stream.

To actually run the upload, you will need a tool or script that can generate a chunked POST, so here are a few examples in different languages.

### CURL Example

The [curl](https://curl.haxx.se/) utility makes it easy to run web requests from the command-line, and supports chunked POST upload, so it can feed the `copyfrom` end point.

Assuming that you have already created the table, and that the CSV file is named "upload_example.csv":

    curl -X POST \
        -H "Transfer-Encoding: chunked" \
        -H "Content-Type: application/octet-stream" \
        --data-binary @upload_example.csv \
        "http://{username}.carto.com/api/v2/sql/copyfrom?api_key={api_key}&q=COPY+upload_example+(the_geom,+name,+age)+FROM+STDIN+WITH+(FORMAT+csv,+HEADER+true)"

To upload a larger file, using compression for a faster transfer, first compress the file, and then upload it with the content encoding set:

    curl -X POST \
        -H "Content-Encoding: gzip" \
        -H "Transfer-Encoding: chunked" \
        -H "Content-Type: application/octet-stream" \
        --data-binary @upload_example.csv.gz \
        "http://{username}.carto.com/api/v2/sql/copyfrom?api_key={api_key}&q=COPY+upload_example+(the_geom,+name,+age)+FROM+STDIN+WITH+(FORMAT+csv,+HEADER+true)"

### Python Example

The [Requests](http://docs.python-requests.org/en/master/user/quickstart/) library for HTTP makes doing a file upload relatively terse.

```python
import requests

api_key = {api_key}
username = {username}
upload_file = 'upload_example.csv'
q = "COPY upload_example (the_geom, name, age) FROM STDIN WITH (FORMAT csv, HEADER true)"

url = "http://%s.carto.com/api/v2/sql/copyfrom" % username
with open(upload_file, 'rb') as f:
    r = requests.post(url, params={'api_key': api_key, 'q': q}, data=f, stream=True)

    if r.status_code != 200:
        print(r.text)
    else:
        status = r.json()
        print("Success: %s rows imported" % status['total_rows'])
```

A slightly more sophisticated script could read the headers from the CSV and compose the `COPY` command on the fly.
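A Node.js version of the same upload, for completeness; this is a sketch only, using nothing beyond the standard library. Node's `http.request` switches to `Transfer-Encoding: chunked` automatically when a body is piped in without a `Content-Length`, and the `{username}`/`{api_key}` placeholders are the same as in the curl example:

```javascript
const fs = require('fs');
const http = require('http');
const querystring = require('querystring');

const q = 'COPY upload_example (the_geom, name, age) FROM STDIN WITH (FORMAT csv, HEADER true)';

const req = http.request({
    host: '{username}.carto.com',
    path: '/api/v2/sql/copyfrom?' + querystring.stringify({ api_key: '{api_key}', q: q }),
    method: 'POST',
    headers: { 'Content-Type': 'application/octet-stream' }
}, res => {
    let body = '';
    res.on('data', chunk => body += chunk);
    res.on('end', () => console.log(res.statusCode, body));
});

// Piping the file streams the upload; the whole file is never buffered in memory.
fs.createReadStream('upload_example.csv').pipe(req);
```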
### Response Format

A successful upload will return with status code 200, and a small JSON with information about the upload.

    {"time":0.046,"total_rows":10}

A failed upload will return with status code 400 and a larger JSON with the PostgreSQL error string, and a stack trace from the SQL API.

    {"error":["Unexpected field"]}

## Copy To

"Copy to" copies data "to" your desired output file, "from" CARTO.

Using the `copyto` end point to extract data bypasses the usual JSON formatting applied by the SQL API, so it can dump more data, faster. However, it has the restriction that it will only output in a handful of formats:

* PgSQL [text format](https://www.postgresql.org/docs/10/static/sql-copy.html#id-1.9.3.52.9.2),
* [CSV](https://www.postgresql.org/docs/10/static/sql-copy.html#id-1.9.3.52.9.3), and
* PgSQL [binary format](https://www.postgresql.org/docs/10/static/sql-copy.html#id-1.9.3.52.9.4).

"Copy to" is a simple HTTP GET end point, so any tool or language can easily be used to download data, supplying the following parameters in the URL.

Parameter | Description
--- | ---
`api_key` | your API key for reading non-public tables
`q` | the `COPY` command to extract the data
`filename` | filename to put in the "Content-disposition" HTTP header, useful for tools that automatically save the download to a file name

### CURL Example

The SQL to start a "copy to" can specify

* a table to read,
* a table and subset of columns to read, or
* an arbitrary SQL query to execute and read.

For our example, we'll read back just the three columns we originally loaded:

    COPY upload_example (the_geom, name, age) TO stdout WITH (FORMAT csv, HEADER true)

The SQL needs to be URL-encoded before being embedded in the CURL command, so the final result looks like this:

    curl \
        --output upload_example_dl.csv \
        --compressed \
        "http://{username}.carto.com/api/v2/sql/copyto?q=COPY+upload_example+(the_geom,name,age)+TO+stdout+WITH(FORMAT+csv,HEADER+true)&api_key={api_key}"

### Python Example

The Python to "copy to" is very simple, because the HTTP call is a simple GET. The only complexity in this example is at the end, where the result is streamed back block-by-block, to avoid pulling the entire download into memory before writing to file.

```python
import requests
import re

api_key = {api_key}
username = {username}
download_file = 'upload_example_dl.csv'
q = "COPY upload_example (the_geom, name, age) TO stdout WITH (FORMAT csv, HEADER true)"

# request the download, specifying desired file name
url = "http://%s.carto.com/api/v2/sql/copyto" % username
r = requests.get(url, params={'api_key': api_key, 'q': q, 'filename': download_file}, stream=True)
r.raise_for_status()

# read save file name from response headers
d = r.headers['content-disposition']
savefilename = re.findall("filename=(.+)", d)

if len(savefilename) > 0:
    with open(savefilename[0], 'wb') as handle:
        for block in r.iter_content(1024):
            handle.write(block)
    print("Downloaded to: %s" % savefilename[0])
else:
    print("Error: could not read the file name from the response headers")
```
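And the equivalent download in Node.js, again a standard-library sketch: since `copyto` is a plain GET, the response stream can be piped straight to a file.

```javascript
const fs = require('fs');
const http = require('http');
const querystring = require('querystring');

const q = 'COPY upload_example (the_geom, name, age) TO stdout WITH (FORMAT csv, HEADER true)';
const url = 'http://{username}.carto.com/api/v2/sql/copyto?' +
    querystring.stringify({ api_key: '{api_key}', q: q, filename: 'upload_example_dl.csv' });

http.get(url, res => {
    // Stream the body to disk block by block, as in the Python example.
    res.pipe(fs.createWriteStream('upload_example_dl.csv'));
});
```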
npm-shrinkwrap.json (generated)

@@ -180,9 +180,9 @@
         "resolved": "https://registry.npmjs.org/cartodb-query-tables/-/cartodb-query-tables-0.2.0.tgz"
     },
     "cartodb-redis": {
-        "version": "1.0.0",
-        "from": "cartodb-redis@1.0.0",
-        "resolved": "https://registry.npmjs.org/cartodb-redis/-/cartodb-redis-1.0.0.tgz"
+        "version": "1.0.1",
+        "from": "git://github.com/CartoDB/node-cartodb-redis.git#remove-auth-fallback",
+        "resolved": "git://github.com/CartoDB/node-cartodb-redis.git#c25114a149ad3522e0e5d6850dca45e67332f98f"
     },
     "caseless": {
         "version": "0.11.0",
@@ -851,6 +851,11 @@
         "from": "pg-connection-string@0.1.3",
         "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-0.1.3.tgz"
     },
+    "pg-copy-streams": {
+        "version": "1.2.0",
+        "from": "cartodb/node-pg-copy-streams#v1.2.0-carto.1",
+        "resolved": "git://github.com/cartodb/node-pg-copy-streams.git#d7e5c1383ff9b43890f2c37bc38cfed1afc2523f"
+    },
     "pg-int8": {
         "version": "1.0.1",
         "from": "pg-int8@1.0.1",
package.json

@@ -28,10 +28,11 @@
     "express": "~4.13.3",
     "log4js": "cartodb/log4js-node#cdb",
     "lru-cache": "~2.5.0",
     "multer": "~1.2.0",
     "node-statsd": "~0.0.7",
     "node-uuid": "^1.4.7",
     "oauth-client": "0.3.0",
+    "pg-copy-streams": "cartodb/node-pg-copy-streams#v1.2.0-carto.1",
     "qs": "~6.2.1",
     "queue-async": "~1.0.7",
     "redis-mpool": "0.5.0",
test/acceptance/copy-endpoints.js (new file, 395 lines)

@@ -0,0 +1,395 @@
require('../helper');

const fs = require('fs');
const querystring = require('querystring');
const assert = require('../support/assert');
const os = require('os');
const { Client } = require('pg');

const StatsClient = require('../../app/stats/client');
if (global.settings.statsd) {
    // Perform keyword substitution in statsd
    if (global.settings.statsd.prefix) {
        const hostToken = os.hostname().split('.').reverse().join('.');
        global.settings.statsd.prefix = global.settings.statsd.prefix.replace(/:host/, hostToken);
    }
}
const statsClient = StatsClient.getInstance(global.settings.statsd);
const server = require('../../app/server')(statsClient);


describe('copy-endpoints', function() {
    before(function(done) {
        const client = new Client({
            user: 'postgres',
            host: 'localhost',
            database: 'cartodb_test_user_1_db',
            port: 5432,
        });
        client.connect();
        client.query('TRUNCATE copy_endpoints_test', (err/*, res */) => {
            client.end();
            done(err);
        });
    });

    it('should work with copyfrom endpoint', function(done){
        assert.response(server, {
            url: "/api/v1/sql/copyfrom?" + querystring.stringify({
                q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"
            }),
            data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'),
            headers: {host: 'vizzuality.cartodb.com'},
            method: 'POST'
        },{}, function(err, res) {
            assert.ifError(err);
            const response = JSON.parse(res.body);
            assert.equal(!!response.time, true);
            assert.strictEqual(response.total_rows, 6);

            assert.ok(res.headers['x-sqlapi-profiler']);
            const headers = JSON.parse(res.headers['x-sqlapi-profiler']);
            assert.ok(headers.copyFrom);
            const metrics = headers.copyFrom;
            assert.equal(metrics.size, 57);
            assert.equal(metrics.format, 'CSV');
            assert.equal(metrics.time, response.time);
            assert.equal(metrics.rows, response.total_rows);
            assert.equal(metrics.gzip, false);

            done();
        });
    });

    it('should fail with copyfrom endpoint and unexisting table', function(done){
        assert.response(server, {
            url: "/api/v1/sql/copyfrom?" + querystring.stringify({
                q: "COPY unexisting_table (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"
            }),
            data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'),
            headers: {host: 'vizzuality.cartodb.com'},
            method: 'POST'
        },{}, function(err, res) {
            assert.ifError(err);
            assert.deepEqual(
                JSON.parse(res.body),
                {
                    error:['relation \"unexisting_table\" does not exist']
                }
            );
            done();
        });
    });

    it('should fail with copyfrom endpoint and without csv', function(done){
        assert.response(server, {
            url: "/api/v1/sql/copyfrom?" + querystring.stringify({
                q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"
            }),
            headers: {host: 'vizzuality.cartodb.com'},
            method: 'POST'
        },{}, function(err, res) {
            assert.ifError(err);
            assert.deepEqual(
                JSON.parse(res.body),
                {
                    error:['No rows copied']
                }
            );
            done();
        });
    });

    it('should fail with copyfrom endpoint and without q', function(done){
        assert.response(server, {
            url: "/api/v1/sql/copyfrom",
            data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'),
            headers: {host: 'vizzuality.cartodb.com'},
            method: 'POST'
        },{}, function(err, res) {
            assert.ifError(err);
            assert.deepEqual(
                JSON.parse(res.body),
                {
                    error:["SQL is missing"]
                }
            );
            done();
        });
    });

    it('should work with copyto endpoint', function(done){
        assert.response(server, {
            url: "/api/v1/sql/copyto?" + querystring.stringify({
                q: 'COPY copy_endpoints_test TO STDOUT',
                filename: '/tmp/output.dmp'
            }),
            headers: {host: 'vizzuality.cartodb.com'},
            method: 'GET'
        },{}, function(err, res) {
            assert.ifError(err);
            assert.strictEqual(
                res.body,
                '11\tPaul\t10\n12\tPeter\t10\n13\tMatthew\t10\n14\t\\N\t10\n15\tJames\t10\n16\tJohn\t10\n'
            );

            assert.equal(res.headers['content-disposition'], 'attachment; filename=%2Ftmp%2Foutput.dmp');
            assert.equal(res.headers['content-type'], 'application/octet-stream');

            done();
        });
    });

    it('should fail with copyto endpoint and without sql', function(done){
        assert.response(server, {
            url: "/api/v1/sql/copyto?" + querystring.stringify({
                filename: '/tmp/output.dmp'
            }),
            headers: {host: 'vizzuality.cartodb.com'},
            method: 'GET'
        },{}, function(err, res) {
            assert.ifError(err);
            assert.deepEqual(
                JSON.parse(res.body),
                {
                    error:["SQL is missing"]
                }
            );
            done();
        });
    });

    it('should work with copyfrom and gzip', function(done){
        assert.response(server, {
            url: "/api/v1/sql/copyfrom?" + querystring.stringify({
                q: "COPY copy_endpoints_test2 (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"
            }),
            data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv.gz'),
            headers: {
                host: 'vizzuality.cartodb.com',
                'content-encoding': 'gzip'
            },
            method: 'POST'
        },{}, function(err, res) {
            assert.ifError(err);
            const response = JSON.parse(res.body);
            assert.equal(!!response.time, true);
            assert.strictEqual(response.total_rows, 6);

            assert.ok(res.headers['x-sqlapi-profiler']);
            const headers = JSON.parse(res.headers['x-sqlapi-profiler']);
            assert.ok(headers.copyFrom);
            const metrics = headers.copyFrom;
            assert.equal(metrics.size, 57);
            assert.equal(metrics.format, 'CSV');
            assert.equal(metrics.time, response.time);
            assert.equal(metrics.rows, response.total_rows);
            assert.equal(metrics.gzip, true);

            done();
        });
    });

});


describe('copy-endpoints timeout', function() {
    it('should fail with copyfrom and timeout', function(done){
        assert.response(server, {
            url: '/api/v1/sql?q=set statement_timeout = 10',
            headers: {host: 'vizzuality.cartodb.com'},
            method: 'GET'
        },
        function(err) {
            assert.ifError(err);
            assert.response(server, {
                url: "/api/v1/sql/copyfrom?" + querystring.stringify({
                    q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"
                }),
                data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'),
                headers: {host: 'vizzuality.cartodb.com'},
                method: 'POST'
            },
            {
                status: 429,
                headers: { 'Content-Type': 'application/json; charset=utf-8' }
            },
            function(err, res) {
                assert.ifError(err);
                assert.deepEqual(JSON.parse(res.body), {
                    error: [
                        'You are over platform\'s limits. Please contact us to know more details'
                    ],
                    context: 'limit',
                    detail: 'datasource'
                });

                assert.response(server, {
                    url: "/api/v1/sql?q=set statement_timeout = 2000",
                    headers: {host: 'vizzuality.cartodb.com'},
                    method: 'GET'
                },
                done);
            });
        });
    });

    it('should fail with copyto and timeout', function(done){
        assert.response(server, {
            url: '/api/v1/sql?q=set statement_timeout = 20',
            headers: {host: 'vizzuality.cartodb.com'},
            method: 'GET'
        },
        function(err) {
            assert.ifError(err);
            assert.response(server, {
                url: "/api/v1/sql/copyto?" + querystring.stringify({
                    q: 'COPY populated_places_simple_reduced TO STDOUT',
                    filename: '/tmp/output.dmp'
                }),
                headers: {host: 'vizzuality.cartodb.com'},
                method: 'GET'
            },{}, function(err, res) {
                assert.ifError(err);
                const error = {
                    error:["You are over platform's limits. Please contact us to know more details"],
                    context:"limit",
                    detail:"datasource"
                };
                const expectedError = res.body.substring(res.body.length - JSON.stringify(error).length);
                assert.deepEqual(JSON.parse(expectedError), error);

                assert.response(server, {
                    url: "/api/v1/sql?q=set statement_timeout = 2000",
                    headers: {host: 'vizzuality.cartodb.com'},
                    method: 'GET'
                },
                done);
            });
        });
    });
});


describe('copy-endpoints db connections', function() {
    before(function() {
        this.db_pool_size = global.settings.db_pool_size;
        global.settings.db_pool_size = 1;
    });

    after(function() {
        global.settings.db_pool_size = this.db_pool_size;
    });

    it('copyfrom', function(done) {
        const query = "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)";
        function doCopyFrom() {
            return new Promise(resolve => {
                assert.response(server, {
                    url: "/api/v1/sql/copyfrom?" + querystring.stringify({
                        q: query
                    }),
                    data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'),
                    headers: {host: 'vizzuality.cartodb.com'},
                    method: 'POST'
                },{}, function(err, res) {
                    assert.ifError(err);
                    const response = JSON.parse(res.body);
                    assert.ok(response.time);
                    resolve();
                });
            });
        }

        Promise.all([doCopyFrom(), doCopyFrom(), doCopyFrom()]).then(function() {
            done();
        });
    });

    it('copyto', function(done) {
        function doCopyTo() {
            return new Promise(resolve => {
                assert.response(server, {
                    url: "/api/v1/sql/copyto?" + querystring.stringify({
                        q: 'COPY copy_endpoints_test TO STDOUT',
                        filename: '/tmp/output.dmp'
                    }),
                    headers: {host: 'vizzuality.cartodb.com'},
                    method: 'GET'
                },{}, function(err, res) {
                    assert.ifError(err);
                    assert.ok(res.body);
                    resolve();
                });
            });
        }

        Promise.all([doCopyTo(), doCopyTo(), doCopyTo()]).then(function() {
            done();
        });
    });
});

describe('copy-endpoints client disconnection', function() {
    // Give it enough time to connect and issue the query
    // but not too much so as to disconnect in the middle of the query.
    const client_disconnect_timeout = 10;

    before(function() {
        this.db_pool_size = global.settings.db_pool_size;
        global.settings.db_pool_size = 1;
    });

    after(function() {
        global.settings.db_pool_size = this.db_pool_size;
    });

    var assertCanReuseConnection = function (done) {
        assert.response(server, {
            url: '/api/v1/sql?' + querystring.stringify({
                q: 'SELECT 1',
            }),
            headers: { host: 'vizzuality.cartodb.com' },
            method: 'GET'
        }, {}, function(err, res) {
            assert.ifError(err);
            assert.ok(res.statusCode === 200);
            done();
        });
    };

    it('COPY TO returns the connection to the pool if the client disconnects', function(done) {
        assert.response(server, {
            url: '/api/v1/sql/copyto?' + querystring.stringify({
                q: 'COPY (SELECT * FROM generate_series(1, 100000)) TO STDOUT',
            }),
            headers: { host: 'vizzuality.cartodb.com' },
            method: 'GET',
            timeout: client_disconnect_timeout
        }, {}, function(err) {
            // we're expecting a timeout error
            assert.ok(err);
            assert.ok(err.code === 'ETIMEDOUT' || err.code === 'ESOCKETTIMEDOUT');
            assertCanReuseConnection(done);
        });
    });

    it('COPY FROM returns the connection to the pool if the client disconnects', function(done) {
        assert.response(server, {
            url: '/api/v1/sql/copyfrom?' + querystring.stringify({
                q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)",
            }),
            headers: { host: 'vizzuality.cartodb.com' },
            method: 'POST',
            data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'),
            timeout: client_disconnect_timeout
        }, {}, function(err) {
            // we're expecting a timeout error
            assert.ok(err);
            assert.ok(err.code === 'ETIMEDOUT' || err.code === 'ESOCKETTIMEDOUT');
            assertCanReuseConnection(done);
        });
    });

});
test/acceptance/query-multipart.js (new file, 24 lines)

@@ -0,0 +1,24 @@
require('../helper');

const server = require('../../app/server')();
const assert = require('../support/assert');

describe('query-multipart', function() {
    it('make query from a multipart form', function(done){
        assert.response(server, {
            url: '/api/v1/sql',
            formData: {
                q: 'SELECT 2 as n'
            },
            headers: {host: 'vizzuality.cartodb.com'},
            method: 'POST'
        },{}, function(err, res) {
            assert.ifError(err);
            const response = JSON.parse(res.body);
            assert.equal(typeof(response.time) !== 'undefined', true);
            assert.strictEqual(response.total_rows, 1);
            assert.deepStrictEqual(response.rows, [{n:2}]);
            done();
        });
    });
});
test/support/assert.js

@@ -48,6 +48,10 @@ assert.response = function(server, req, res, callback) {
         requestParams.body = req.body || req.data;
     }

+    if (req.formData) {
+        requestParams.formData = req.formData;
+    }
+
     debug('Request params', requestParams);
     request(requestParams, function assert$response$requestHandler(error, response, body) {
         debug('Request response', error);
test/support/csv/copy_test_table.csv (new file, 7 lines)

@@ -0,0 +1,7 @@
id,name
11,Paul
12,Peter
13,Matthew
14,
15,James
16,John
test/support/csv/copy_test_table.csv.gz (new binary file, not shown)
test database fixture SQL

@@ -217,3 +217,21 @@ INSERT INTO CDB_TableMetadata (tabname, updated_at) VALUES ('scoped_table_1'::re

 GRANT SELECT ON CDB_TableMetadata TO :TESTUSER;
 GRANT SELECT ON CDB_TableMetadata TO test_cartodb_user_2;
+
+DROP TABLE IF EXISTS copy_endpoints_test;
+CREATE TABLE copy_endpoints_test (
+    id integer,
+    name text,
+    age integer default 10
+);
+GRANT ALL ON TABLE copy_endpoints_test TO :TESTUSER;
+GRANT ALL ON TABLE copy_endpoints_test TO :PUBLICUSER;
+
+DROP TABLE IF EXISTS copy_endpoints_test2;
+CREATE TABLE copy_endpoints_test2 (
+    id integer,
+    name text,
+    age integer default 10
+);
+GRANT ALL ON TABLE copy_endpoints_test2 TO :TESTUSER;
+GRANT ALL ON TABLE copy_endpoints_test2 TO :PUBLICUSER;
test/unit/query_info.test.js (new file, 64 lines)

@@ -0,0 +1,64 @@
const assert = require('assert');
const queryInfo = require('../../app/utils/query_info');

describe('query info', function () {
    describe('copy format', function () {
        describe('csv', function () {
            const validQueries = [
                "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)",
                "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)",
                "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV , DELIMITER ',', HEADER true)",
                "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV)",
                "COPY copy_endpoints_test FROM STDIN WITH(FORMAT csv,HEADER true)"
            ];

            validQueries.forEach(query => {
                it(query, function() {
                    const result = queryInfo.getFormatFromCopyQuery(query);
                    assert.equal(result, 'CSV');
                });
            });
        });

        describe('text', function() {
            const validQueries = [
                "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT TEXT)",
                "COPY copy_endpoints_test (id, name) FROM STDIN",
            ];

            validQueries.forEach(query => {
                it(query, function() {
                    const result = queryInfo.getFormatFromCopyQuery(query);
                    assert.equal(result, 'TEXT');
                });
            });
        });

        describe('binary', function() {
            const validQueries = [
                "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT BINARY)",
            ];

            validQueries.forEach(query => {
                it(query, function() {
                    const result = queryInfo.getFormatFromCopyQuery(query);
                    assert.equal(result, 'BINARY');
                });
            });
        });

        describe('should fail', function() {
            const validQueries = [
                "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT ERROR)",
                "SELECT * from copy_endpoints_test"
            ];

            validQueries.forEach(query => {
                it(query, function() {
                    const result = queryInfo.getFormatFromCopyQuery(query);
                    assert.strictEqual(result, false);
                });
            });
        });
    });
});