diff --git a/.gitignore b/.gitignore index ed26bf95..147f2348 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ test/test.log test/acceptance/oauth/venv/* coverage/ npm-debug.log +log/*.log +yarn.lock \ No newline at end of file diff --git a/app/controllers/copy_controller.js b/app/controllers/copy_controller.js new file mode 100644 index 00000000..1bbc8bd9 --- /dev/null +++ b/app/controllers/copy_controller.js @@ -0,0 +1,152 @@ +'use strict'; + +const userMiddleware = require('../middlewares/user'); +const errorMiddleware = require('../middlewares/error'); +const authorizationMiddleware = require('../middlewares/authorization'); +const connectionParamsMiddleware = require('../middlewares/connection-params'); +const timeoutLimitsMiddleware = require('../middlewares/timeout-limits'); +const { initializeProfilerMiddleware } = require('../middlewares/profiler'); +const rateLimitsMiddleware = require('../middlewares/rate-limit'); +const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware; +const Logger = require('../services/logger'); +const errorHandlerFactory = require('../services/error_handler_factory'); +const streamCopy = require('../services/stream_copy'); + +function CopyController(metadataBackend, userDatabaseService, userLimitsService, statsClient) { + this.metadataBackend = metadataBackend; + this.userDatabaseService = userDatabaseService; + this.userLimitsService = userLimitsService; + this.statsClient = statsClient; + + this.logger = new Logger(global.settings.dataIngestionLogPath, 'data-ingestion'); +} + +CopyController.prototype.route = function (app) { + const { base_url } = global.settings; + + const copyFromMiddlewares = endpointGroup => { + return [ + initializeProfilerMiddleware('copyfrom'), + userMiddleware(this.metadataBackend), + rateLimitsMiddleware(this.userLimitsService, endpointGroup), + authorizationMiddleware(this.metadataBackend), + connectionParamsMiddleware(this.userDatabaseService), + timeoutLimitsMiddleware(this.metadataBackend), + validateCopyQuery(), + handleCopyFrom(this.logger), + errorHandler(), + errorMiddleware() + ]; + }; + + const copyToMiddlewares = endpointGroup => { + return [ + initializeProfilerMiddleware('copyto'), + userMiddleware(this.metadataBackend), + rateLimitsMiddleware(this.userLimitsService, endpointGroup), + authorizationMiddleware(this.metadataBackend), + connectionParamsMiddleware(this.userDatabaseService), + timeoutLimitsMiddleware(this.metadataBackend), + validateCopyQuery(), + handleCopyTo(this.logger), + errorHandler(), + errorMiddleware() + ]; + }; + + app.post(`${base_url}/sql/copyfrom`, copyFromMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_FROM)); + app.get(`${base_url}/sql/copyto`, copyToMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.COPY_TO)); +}; + + +function handleCopyTo (logger) { + return function handleCopyToMiddleware (req, res, next) { + const filename = req.query.filename || 'carto-sql-copyto.dmp'; + + res.header("Content-Disposition", `attachment; filename=${encodeURIComponent(filename)}`); + res.header("Content-Type", "application/octet-stream"); + + streamCopy.to( + res, + req.query.q, + res.locals.userDbParams, + res.locals.user, + logger, + function(err) { + if (err) { + return next(err); + } + + // this is a especial endpoint + // the data from postgres is streamed to response directly + } + ); + }; +} + +function handleCopyFrom (logger) { + return function handleCopyFromMiddleware (req, res, next) { + streamCopy.from( + req, + req.query.q, + res.locals.userDbParams, + res.locals.user, + 
req.get('content-encoding') === 'gzip', + logger, + function(err, metrics) { // TODO: remove when data-ingestion log works: {time, rows} + if (err) { + return next(err); + } + + // TODO: remove when data-ingestion log works + const { time, rows, type, format, gzip, size } = metrics; + + if (!time || !rows) { + return next(new Error("No rows copied")); + } + + // TODO: remove when data-ingestion log works + if (req.profiler) { + req.profiler.add({copyFrom: { type, format, gzip, size, rows, time }}); + res.header('X-SQLAPI-Profiler', req.profiler.toJSONString()); + } + + res.send({ + time, + total_rows: rows + }); + } + ); + }; +} + +function validateCopyQuery () { + return function validateCopyQueryMiddleware (req, res, next) { + const sql = req.query.q; + + if (!sql) { + next(new Error("SQL is missing")); + } + + // Only accept SQL that starts with 'COPY' + if (!sql.toUpperCase().startsWith("COPY ")) { + next(new Error("SQL must start with COPY")); + } + + next(); + }; +} + +function errorHandler () { + return function errorHandlerMiddleware (err, req, res, next) { + if (res.headersSent) { + const errorHandler = errorHandlerFactory(err); + res.write(JSON.stringify(errorHandler.getResponse())); + res.end(); + } else { + return next(err); + } + }; +} + +module.exports = CopyController; diff --git a/app/controllers/job_controller.js b/app/controllers/job_controller.js index ecfca95b..27da106e 100644 --- a/app/controllers/job_controller.js +++ b/app/controllers/job_controller.js @@ -1,5 +1,6 @@ const util = require('util'); +const bodyParserMiddleware = require('../middlewares/body-parser'); const userMiddleware = require('../middlewares/user'); const { initializeProfilerMiddleware, finishProfilerMiddleware } = require('../middlewares/profiler'); const authorizationMiddleware = require('../middlewares/authorization'); @@ -30,21 +31,25 @@ JobController.prototype.route = function (app) { app.get( `${base_url}/jobs-wip`, + bodyParserMiddleware(), listWorkInProgressJobs(this.jobService), sendResponse(), errorMiddleware() ); app.post( `${base_url}/sql/job`, + bodyParserMiddleware(), checkBodyPayloadSize(), jobMiddlewares('create', createJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_CREATE) ); app.get( `${base_url}/sql/job/:job_id`, + bodyParserMiddleware(), jobMiddlewares('retrieve', getJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_GET) ); app.delete( `${base_url}/sql/job/:job_id`, + bodyParserMiddleware(), jobMiddlewares('cancel', cancelJob, RATE_LIMIT_ENDPOINTS_GROUPS.JOB_DELETE) ); }; diff --git a/app/controllers/query_controller.js b/app/controllers/query_controller.js index 5a642423..3c959264 100644 --- a/app/controllers/query_controller.js +++ b/app/controllers/query_controller.js @@ -12,6 +12,7 @@ var formats = require('../models/formats'); var sanitize_filename = require('../utils/filename_sanitizer'); var getContentDisposition = require('../utils/content_disposition'); +const bodyParserMiddleware = require('../middlewares/body-parser'); const userMiddleware = require('../middlewares/user'); const errorMiddleware = require('../middlewares/error'); const authorizationMiddleware = require('../middlewares/authorization'); @@ -37,6 +38,7 @@ QueryController.prototype.route = function (app) { const queryMiddlewares = endpointGroup => { return [ + bodyParserMiddleware(), initializeProfilerMiddleware('query'), userMiddleware(this.metadataBackend), rateLimitsMiddleware(this.userLimitsService, endpointGroup), diff --git a/app/middlewares/rate-limit.js b/app/middlewares/rate-limit.js index 3e4d2f47..4bf7f4c2 100644 --- 
a/app/middlewares/rate-limit.js +++ b/app/middlewares/rate-limit.js @@ -5,7 +5,9 @@ const RATE_LIMIT_ENDPOINTS_GROUPS = { QUERY_FORMAT: 'query_format', JOB_CREATE: 'job_create', JOB_GET: 'job_get', - JOB_DELETE: 'job_delete' + JOB_DELETE: 'job_delete', + COPY_FROM: 'copy_from', + COPY_TO: 'copy_to' }; diff --git a/app/server.js b/app/server.js index 92a777a8..dbaedc78 100644 --- a/app/server.js +++ b/app/server.js @@ -15,7 +15,6 @@ // var express = require('express'); -var bodyParser = require('./middlewares/body-parser'); var Profiler = require('./stats/profiler-proxy'); var _ = require('underscore'); var TableCacheFactory = require('./utils/table_cache_factory'); @@ -34,6 +33,7 @@ var cors = require('./middlewares/cors'); var GenericController = require('./controllers/generic_controller'); var QueryController = require('./controllers/query_controller'); +var CopyController = require('./controllers/copy_controller'); var JobController = require('./controllers/job_controller'); var CacheStatusController = require('./controllers/cache_status_controller'); var HealthCheckController = require('./controllers/health_check_controller'); @@ -138,7 +138,6 @@ function App(statsClient) { }); } - app.use(bodyParser()); app.enable('jsonp callback'); app.set("trust proxy", true); app.disable('x-powered-by'); @@ -173,6 +172,13 @@ function App(statsClient) { ); queryController.route(app); + var copyController = new CopyController( + metadataBackend, + userDatabaseService, + userLimitsService + ); + copyController.route(app); + var jobController = new JobController( metadataBackend, userDatabaseService, diff --git a/app/services/logger.js b/app/services/logger.js new file mode 100644 index 00000000..87b6cba1 --- /dev/null +++ b/app/services/logger.js @@ -0,0 +1,33 @@ +'use strict'; + +const bunyan = require('bunyan'); + +class Logger { + constructor (path, name) { + const stream = { + level: process.env.NODE_ENV === 'test' ? 
'fatal' : 'info' + }; + + if (path) { + stream.path = path; + } else { + stream.stream = process.stdout; + } + + this.path = path; + this.logger = bunyan.createLogger({ + name, + streams: [stream] + }); + } + + info (log, message) { + this.logger.info(log, message); + } + + warn (log, message) { + this.logger.warn(log, message); + } +} + +module.exports = Logger; diff --git a/app/services/stream_copy.js b/app/services/stream_copy.js new file mode 100644 index 00000000..f9bd2a70 --- /dev/null +++ b/app/services/stream_copy.js @@ -0,0 +1,132 @@ +const zlib = require('zlib'); +const PSQL = require('cartodb-psql'); +const copyTo = require('pg-copy-streams').to; +const copyFrom = require('pg-copy-streams').from; +const StreamCopyMetrics = require('./stream_copy_metrics'); +const { Client } = require('pg'); + +module.exports = { + to (res, sql, userDbParams, user, logger, cb) { + let metrics = new StreamCopyMetrics(logger, 'copyto', sql, user); + + const pg = new PSQL(userDbParams); + pg.connect(function (err, client, done) { + if (err) { + return cb(err); + } + + let responseEnded = false; + let connectionClosedByClient = false; + const copyToStream = copyTo(sql); + const pgstream = client.query(copyToStream); + + res + .on('error', err => { + metrics.end(null, err); + pgstream.unpipe(res); + done(); + return cb(err); + }) + .on('close', () => { + if (!responseEnded) { + connectionClosedByClient = true; + // Cancel the running COPY TO query + // See https://www.postgresql.org/docs/9.5/static/protocol-flow.html#PROTOCOL-COPY + const runningClient = client; + const cancelingClient = new Client(runningClient.connectionParameters); + cancelingClient.cancel(runningClient, pgstream); + + const err = new Error('Connection closed by client'); + metrics.end(null, err); + pgstream.unpipe(res); + // see https://node-postgres.com/api/pool#releasecallback + done(err); + return cb(err); + } + }) + .on('end', () => responseEnded = true); + + pgstream + .on('error', err => { + if (!connectionClosedByClient) { + metrics.end(null, err); + pgstream.unpipe(res); + done(err); + return cb(err); + } + }) + .on('data', data => metrics.addSize(data.length)) + .on('end', () => { + metrics.end(copyToStream.rowCount); + done(); + return cb(null, metrics); + }) + .pipe(res); + }); + }, + + from (req, sql, userDbParams, user, gzip, logger, cb) { + let metrics = new StreamCopyMetrics(logger, 'copyfrom', sql, user, gzip); + + const pg = new PSQL(userDbParams); + pg.connect(function (err, client, done) { + if (err) { + return cb(err); + } + + let copyFromStream = copyFrom(sql); + const pgstream = client.query(copyFromStream); + pgstream + .on('error', err => { + metrics.end(null, err); + req.unpipe(pgstream); + done(); + return cb(err); + }) + .on('end', function () { + metrics.end(copyFromStream.rowCount); + done(); + return cb(null, metrics); + }); + + let requestEnded = false; + + req + .on('error', err => { + metrics.end(null, err); + req.unpipe(pgstream); + pgstream.end(); + done(); + return cb(err); + }) + .on('close', () => { + if (!requestEnded) { + const err = new Error('Connection closed by client'); + metrics.end(null, err); + const connection = client.connection; + connection.sendCopyFail('CARTO SQL API: Connection closed by client'); + req.unpipe(pgstream); + done(); + return cb(err); + } + }) + .on('data', data => { + if (gzip) { + metrics.addGzipSize(data.length); + } else { + metrics.addSize(data.length); + } + }) + .on('end', () => requestEnded = true); + + if (gzip) { + req + .pipe(zlib.createGunzip()) + 
.on('data', data => metrics.addSize(data.length)) + .pipe(pgstream); + } else { + req.pipe(pgstream); + } + }); + } +}; diff --git a/app/services/stream_copy_metrics.js b/app/services/stream_copy_metrics.js new file mode 100644 index 00000000..24143a0d --- /dev/null +++ b/app/services/stream_copy_metrics.js @@ -0,0 +1,79 @@ +const { getFormatFromCopyQuery } = require('../utils/query_info'); + +module.exports = class StreamCopyMetrics { + constructor(logger, type, sql, user, gzip = null) { + this.logger = logger; + + this.type = type; + this.format = getFormatFromCopyQuery(sql); + this.gzip = gzip; + this.username = user; + this.size = 0; + this.gzipSize = 0; + this.rows = 0; + + this.startTime = new Date(); + this.endTime = null; + this.time = null; + + this.error = null; + + this.ended = false; + } + + addSize(size) { + this.size += size; + } + + addGzipSize(size) { + this.gzipSize += size; + } + + end(rows = null, error = null) { + if (this.ended) { + return; + } + + this.ended = true; + + if (Number.isInteger(rows)) { + this.rows = rows; + } + + if (error instanceof Error) { + this.error = error; + } + + this.endTime = new Date(); + this.time = (this.endTime.getTime() - this.startTime.getTime()) / 1000; + + this._log( + this.startTime.toISOString(), + this.gzip && this.gzipSize ? this.gzipSize : null, + this.error ? this.error.message : null + ); + } + + _log(timestamp, gzipSize = null, errorMessage = null) { + let logData = { + type: this.type, + format: this.format, + size: this.size, + rows: this.rows, + gzip: this.gzip, + username: this.username, + time: this.time, + timestamp + }; + + if (gzipSize) { + logData.gzipSize = gzipSize; + } + + if (errorMessage) { + logData.error = errorMessage; + } + + this.logger.info(logData); + } +}; diff --git a/app/utils/query_info.js b/app/utils/query_info.js new file mode 100644 index 00000000..c605c785 --- /dev/null +++ b/app/utils/query_info.js @@ -0,0 +1,29 @@ +const COPY_FORMATS = ['TEXT', 'CSV', 'BINARY']; + +module.exports = { + getFormatFromCopyQuery(copyQuery) { + let format = 'TEXT'; // Postgres default format + + copyQuery = copyQuery.toUpperCase(); + + if (!copyQuery.startsWith("COPY ")) { + return false; + } + + if(copyQuery.includes(' WITH') && copyQuery.includes('FORMAT ')) { + const regex = /\bFORMAT\s+(\w+)/; + const result = regex.exec(copyQuery); + + if (result && result.length === 2) { + if (COPY_FORMATS.includes(result[1])) { + format = result[1]; + format = format.toUpperCase(); + } else { + format = false; + } + } + } + + return format; + } +}; diff --git a/batch/batch-logger.js b/batch/batch-logger.js index 05dc54f8..ef132bd2 100644 --- a/batch/batch-logger.js +++ b/batch/batch-logger.js @@ -1,29 +1,19 @@ 'use strict'; -var bunyan = require('bunyan'); +const Logger = require('../app/services/logger'); -function BatchLogger (path) { - var stream = { - level: process.env.NODE_ENV === 'test' ? 
'fatal' : 'info' - }; - if (path) { - stream.path = path; - } else { - stream.stream = process.stdout; +class BatchLogger extends Logger { + constructor (path, name) { + super(path, name); + } + + log (job) { + return job.log(this.logger); + } + + reopenFileStreams () { + this.logger.reopenFileStreams(); } - this.path = path; - this.logger = bunyan.createLogger({ - name: 'batch-queries', - streams: [stream] - }); } module.exports = BatchLogger; - -BatchLogger.prototype.log = function (job) { - return job.log(this.logger); -}; - -BatchLogger.prototype.reopenFileStreams = function () { - this.logger.reopenFileStreams(); -}; diff --git a/batch/index.js b/batch/index.js index 694dfb42..6ef5422d 100644 --- a/batch/index.js +++ b/batch/index.js @@ -24,7 +24,7 @@ module.exports = function batchFactory (metadataBackend, redisPool, name, statsd var jobCanceller = new JobCanceller(); var jobService = new JobService(jobBackend, jobCanceller); var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, metadataBackend, statsdClient); - var logger = new BatchLogger(loggerPath); + var logger = new BatchLogger(loggerPath, 'batch-queries'); return new Batch( name, diff --git a/config/environments/development.js.example b/config/environments/development.js.example index adc7c001..abaa2b94 100644 --- a/config/environments/development.js.example +++ b/config/environments/development.js.example @@ -122,3 +122,4 @@ module.exports.ratelimits = { } module.exports.validatePGEntitiesAccess = false; +module.exports.dataIngestionLogPath = 'logs/data-ingestion.log'; diff --git a/config/environments/production.js.example b/config/environments/production.js.example index 0ee13ad2..45e00732 100644 --- a/config/environments/production.js.example +++ b/config/environments/production.js.example @@ -126,3 +126,4 @@ module.exports.ratelimits = { } module.exports.validatePGEntitiesAccess = false; +module.exports.dataIngestionLogPath = 'logs/data-ingestion.log'; diff --git a/config/environments/staging.js.example b/config/environments/staging.js.example index 99b336bd..2c5929a6 100644 --- a/config/environments/staging.js.example +++ b/config/environments/staging.js.example @@ -123,3 +123,4 @@ module.exports.ratelimits = { } module.exports.validatePGEntitiesAccess = false; +module.exports.dataIngestionLogPath = 'logs/data-ingestion.log'; diff --git a/config/environments/test.js.example b/config/environments/test.js.example index e41f1d5b..42255fb2 100644 --- a/config/environments/test.js.example +++ b/config/environments/test.js.example @@ -123,3 +123,4 @@ module.exports.ratelimits = { } module.exports.validatePGEntitiesAccess = false; +module.exports.dataIngestionLogPath = 'logs/data-ingestion.log'; diff --git a/doc/API.md b/doc/API.md index aba951bc..f367730e 100644 --- a/doc/API.md +++ b/doc/API.md @@ -17,6 +17,7 @@ Remember that in order to access, read or modify data in private tables, you wil * [Making calls to the SQL API](making_calls.md) * [Creating Tables with the SQL API](creating_tables.md) * [Batch Queries](batch_queries.md) +* [Copy Queries](copy_queries.md) * [Handling Geospatial Data](handling_geospatial_data.md) * [Query Optimizations](query_optimizations.md) * [API Version Vumber](version.md) diff --git a/doc/copy_queries.md b/doc/copy_queries.md new file mode 100644 index 00000000..b08eec8c --- /dev/null +++ b/doc/copy_queries.md @@ -0,0 +1,193 @@ +# Copy Queries + +Copy queries allow you to use the [PostgreSQL copy command](https://www.postgresql.org/docs/10/static/sql-copy.html) for efficient streaming 
of data to and from CARTO. + +The support for copy is split across two API end points: + +* `http://{username}.carto.com/api/v2/sql/copyfrom` for uploading data to CARTO +* `http://{username}.carto.com/api/v2/sql/copyto` for exporting data out of CARTO + +## Copy From + +The PostgreSQL `COPY` command is extremely fast, but requires very precise inputs: + +* A `COPY` command that describes the table and columns of the upload file, and the format of the file. +* An upload file that exactly matches the `COPY` command. + +If the `COPY` command, the supplied file, and the target table do not all match, the upload will fail. + +"Copy from" copies data "from" your file, "to" CARTO. "Copy from" uses chunked encoding (`Transfer-Encoding: chunked`) to stream an upload file to the server. This avoids limitations around file size and any need for temporary storage: the data travels from your file straight into the database. + +Parameter | Description +--- | --- +`api_key` | a write-enabled key +`q` | the `COPY` command to load the data + +**The actual COPY file content must be sent as the body of the POST request.** + +Composing a chunked POST is moderately complicated, so most developers will use a tool or scripting language to upload data to CARTO via "copy from". + +### Example + +For a table to be readable by CARTO, it must have a minimum of three columns with specific data types: + +* `cartodb_id`, a `serial` primary key +* `the_geom`, a `geometry` in the EPSG:4326 projection (aka long/lat) +* `the_geom_webmercator`, a `geometry` in the EPSG:3857 projection (aka web mercator) + +Creating a new CARTO table with all the right triggers and columns can be tricky, so here is an example: + + -- create the table using the *required* columns and a + -- couple more + CREATE TABLE upload_example ( + the_geom geometry, + name text, + age integer + ); + + -- adds the 'cartodb_id' and 'the_geom_webmercator' + -- adds the required triggers and indexes + SELECT CDB_CartodbfyTable('upload_example'); + +Now you are ready to upload your file. Suppose you have a CSV file like this: + + the_geom,name,age + SRID=4326;POINT(-126 54),North West,89 + SRID=4326;POINT(-96 34),South East,99 + SRID=4326;POINT(-6 -25),Souther Easter,124 + +The `COPY` command to upload this file needs to specify the file format (CSV), the fact that there is a header line before the actual data begins, and to enumerate the columns that are in the file so they can be matched to the table columns. + + COPY upload_example (the_geom, name, age) + FROM STDIN WITH (FORMAT csv, HEADER true) + +The `FROM STDIN` option tells the database that the input will come from a data stream, and the SQL API will read our uploaded file and use it to feed the stream. + +To actually run the upload, you will need a tool or script that can generate a chunked POST, so here are a few examples in different languages. + +### CURL Example + +The [curl](https://curl.haxx.se/) utility makes it easy to run web requests from the command-line, and supports chunked POST upload, so it can feed the `copyfrom` end point.
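If you are scripting the upload from Node.js rather than shelling out to curl, the same chunked POST can be sketched with the built-in `http` module. This is only a minimal sketch, not official client code: `{username}` and `{api_key}` are placeholders, and it assumes the `upload_example` table and `upload_example.csv` file described above. Because the request is sent without a `Content-Length` header, Node falls back to `Transfer-Encoding: chunked`, which matches the streaming upload described above.

```javascript
// Minimal sketch: stream a local CSV into the copyfrom end point.
// {username} and {api_key} are placeholders for your own values.
const http = require('http');
const fs = require('fs');
const querystring = require('querystring');

const q = 'COPY upload_example (the_geom, name, age) FROM STDIN WITH (FORMAT csv, HEADER true)';

const req = http.request({
    host: '{username}.carto.com',
    path: '/api/v2/sql/copyfrom?' + querystring.stringify({ api_key: '{api_key}', q: q }),
    method: 'POST',
    headers: { 'Content-Type': 'application/octet-stream' }
    // no Content-Length header, so Node uses Transfer-Encoding: chunked
}, res => {
    let body = '';
    res.on('data', chunk => body += chunk);
    res.on('end', () => console.log(res.statusCode, body));
});

req.on('error', err => console.error(err));

// Pipe the file straight into the request body; pipe() ends the request when the file is done.
fs.createReadStream('upload_example.csv').pipe(req);
```

The curl equivalent of the same upload follows.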
+ Assuming that you have already created the table, and that the CSV file is named "upload_example.csv": + + curl -X POST \ + -H "Transfer-Encoding: chunked" \ + -H "Content-Type: application/octet-stream" \ + --data-binary @upload_example.csv \ + "http://{username}.carto.com/api/v2/sql/copyfrom?api_key={api_key}&q=COPY+upload_example+(the_geom,+name,+age)+FROM+STDIN+WITH+(FORMAT+csv,+HEADER+true)" + +To upload a larger file, using compression for a faster transfer, first compress the file, and then upload it with the content encoding set: + + curl -X POST \ + -H "Content-Encoding: gzip" \ + -H "Transfer-Encoding: chunked" \ + -H "Content-Type: application/octet-stream" \ + --data-binary @upload_example.csv.gz \ + "http://{username}.carto.com/api/v2/sql/copyfrom?api_key={api_key}&q=COPY+upload_example+(the_geom,+name,+age)+FROM+STDIN+WITH+(FORMAT+csv,+HEADER+true)" + + +### Python Example + +The [Requests](http://docs.python-requests.org/en/master/user/quickstart/) library for HTTP makes doing a file upload relatively terse. + +```python +import requests + +api_key = {api_key} +username = {username} +upload_file = 'upload_example.csv' +q = "COPY upload_example (the_geom, name, age) FROM STDIN WITH (FORMAT csv, HEADER true)" + +url = "http://%s.carto.com/api/v2/sql/copyfrom" % username +with open(upload_file, 'rb') as f: + r = requests.post(url, params={'api_key': api_key, 'q': q}, data=f, stream=True) + + if r.status_code != 200: + print(r.text) + else: + status = r.json() + print("Success: %s rows imported" % status['total_rows']) +``` + +A slightly more sophisticated script could read the headers from the CSV and compose the `COPY` command on the fly. + +### Response Format + +A successful upload will return with status code 200, and a small JSON with information about the upload. + + {"time":0.046,"total_rows":10} + +A failed upload will return with status code 400 and a larger JSON with the PostgreSQL error string, and a stack trace from the SQL API. + + {"error":["Unexpected field"]} + +## Copy To + +"Copy to" copies data "to" your desired output file, "from" CARTO. + +Using the `copyto` end point to extract data bypasses the usual JSON formatting applied by the SQL API, so it can dump more data, faster. However, it has the restriction that it will only output in a handful of formats: + +* PgSQL [text format](https://www.postgresql.org/docs/10/static/sql-copy.html#id-1.9.3.52.9.2), +* [CSV](https://www.postgresql.org/docs/10/static/sql-copy.html#id-1.9.3.52.9.3), and +* PgSQL [binary format](https://www.postgresql.org/docs/10/static/sql-copy.html#id-1.9.3.52.9.4). + +"Copy to" is a simple HTTP GET end point, so any tool or language can be easily used to download data, supplying the following parameters in the URL. + +Parameter | Description +--- | --- +`api_key` | your API key for reading non-public tables +`q` | the `COPY` command to extract the data +`filename` | filename to put in the "Content-disposition" HTTP header, useful for tools that automatically save the download to a file name + + +### CURL Example + +The SQL to start a "copy to" can specify: + +* a table to read, +* a table and subset of columns to read, or +* an arbitrary SQL query to execute and read.
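The walkthrough below uses curl and Python; if you prefer to stay in Node.js, a download can be sketched along the same lines with the built-in `http` module. Again, this is only a sketch: `{username}` and `{api_key}` are placeholders, and it assumes the `upload_example` table from the upload example. The important point is the same one made in the Python example further down: the response is streamed straight to disk rather than buffered in memory.

```javascript
// Minimal sketch: stream a COPY TO result straight to a local file.
// {username} and {api_key} are placeholders for your own values.
const http = require('http');
const fs = require('fs');
const querystring = require('querystring');

const q = 'COPY upload_example (the_geom, name, age) TO stdout WITH (FORMAT csv, HEADER true)';
const path = '/api/v2/sql/copyto?' + querystring.stringify({
    api_key: '{api_key}',
    q: q,
    filename: 'upload_example_dl.csv'
});

http.get({ host: '{username}.carto.com', path: path }, res => {
    if (res.statusCode !== 200) {
        console.error('Download failed with status', res.statusCode);
        res.resume(); // drain and discard the body
        return;
    }

    // Stream the COPY output to disk instead of buffering it in memory
    res.pipe(fs.createWriteStream('upload_example_dl.csv'))
        .on('finish', () => console.log('Downloaded to upload_example_dl.csv'));
});
```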
+ For our example, we'll read back just the three columns we originally loaded: + + COPY upload_example (the_geom, name, age) TO stdout WITH (FORMAT csv, HEADER true) + +The SQL needs to be URL-encoded before being embedded in the CURL command, so the final result looks like this: + + curl \ + --output upload_example_dl.csv \ + --compressed \ + "http://{username}.carto.com/api/v2/sql/copyto?q=COPY+upload_example+(the_geom,name,age)+TO+stdout+WITH(FORMAT+csv,HEADER+true)&api_key={api_key}" + +### Python Example + +The Python to "copy to" is very simple, because the HTTP call is a simple GET. The only complexity in this example is at the end, where the result is streamed back block-by-block, to avoid pulling the entire download into memory before writing to file. + +```python +import requests +import re + +api_key = {api_key} +username = {username} +download_file = 'upload_example_dl.csv' +q = "COPY upload_example (the_geom, name, age) TO stdout WITH (FORMAT csv, HEADER true)" + +# request the download, specifying desired file name +url = "http://%s.carto.com/api/v2/sql/copyto" % username +r = requests.get(url, params={'api_key': api_key, 'q': q, 'filename': download_file}, stream=True) +r.raise_for_status() + +# read save file name from response headers +d = r.headers['content-disposition'] +savefilename = re.findall("filename=(.+)", d) + +if len(savefilename) > 0: + with open(savefilename[0], 'wb') as handle: + for block in r.iter_content(1024): + handle.write(block) + print("Downloaded to: %s" % savefilename[0]) +else: + print("Error: could not read file name from headers") +``` + diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index e5e3abae..de663306 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -180,9 +180,9 @@ "resolved": "https://registry.npmjs.org/cartodb-query-tables/-/cartodb-query-tables-0.2.0.tgz" }, "cartodb-redis": { - "version": "1.0.0", - "from": "cartodb-redis@1.0.0", - "resolved": "https://registry.npmjs.org/cartodb-redis/-/cartodb-redis-1.0.0.tgz" + "version": "1.0.1", + "from": "git://github.com/CartoDB/node-cartodb-redis.git#remove-auth-fallback", + "resolved": "git://github.com/CartoDB/node-cartodb-redis.git#c25114a149ad3522e0e5d6850dca45e67332f98f" }, "caseless": { "version": "0.11.0", @@ -851,6 +851,11 @@ "from": "pg-connection-string@0.1.3", "resolved": "https://registry.npmjs.org/pg-connection-string/-/pg-connection-string-0.1.3.tgz" }, + "pg-copy-streams": { + "version": "1.2.0", + "from": "cartodb/node-pg-copy-streams#v1.2.0-carto.1", + "resolved": "git://github.com/cartodb/node-pg-copy-streams.git#d7e5c1383ff9b43890f2c37bc38cfed1afc2523f" + }, "pg-int8": { "version": "1.0.1", "from": "pg-int8@1.0.1", diff --git a/package.json b/package.json index a4613260..c6591248 100644 --- a/package.json +++ b/package.json @@ -28,10 +28,11 @@ "express": "~4.13.3", "log4js": "cartodb/log4js-node#cdb", "lru-cache": "~2.5.0", - "multer": "~1.2.0", + "multer": "~1.2.0", "node-statsd": "~0.0.7", "node-uuid": "^1.4.7", "oauth-client": "0.3.0", + "pg-copy-streams": "cartodb/node-pg-copy-streams#v1.2.0-carto.1", "qs": "~6.2.1", "queue-async": "~1.0.7", "redis-mpool": "0.5.0", diff --git a/test/acceptance/copy-endpoints.js b/test/acceptance/copy-endpoints.js new file mode 100644 index 00000000..a6bfdb2e --- /dev/null +++ b/test/acceptance/copy-endpoints.js @@ -0,0 +1,395 @@ +require('../helper'); + +const fs = require('fs'); +const querystring = require('querystring'); +const assert = require('../support/assert'); +const os = require('os'); +const { Client } =
require('pg'); + +const StatsClient = require('../../app/stats/client'); +if (global.settings.statsd) { + // Perform keyword substitution in statsd + if (global.settings.statsd.prefix) { + const hostToken = os.hostname().split('.').reverse().join('.'); + global.settings.statsd.prefix = global.settings.statsd.prefix.replace(/:host/, hostToken); + } +} +const statsClient = StatsClient.getInstance(global.settings.statsd); +const server = require('../../app/server')(statsClient); + + +describe('copy-endpoints', function() { + before(function(done) { + const client = new Client({ + user: 'postgres', + host: 'localhost', + database: 'cartodb_test_user_1_db', + port: 5432, + }); + client.connect(); + client.query('TRUNCATE copy_endpoints_test', (err/*, res */) => { + client.end(); + done(err); + }); + }); + + it('should work with copyfrom endpoint', function(done){ + assert.response(server, { + url: "/api/v1/sql/copyfrom?" + querystring.stringify({ + q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)" + }), + data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'POST' + },{}, function(err, res) { + assert.ifError(err); + const response = JSON.parse(res.body); + assert.equal(!!response.time, true); + assert.strictEqual(response.total_rows, 6); + + assert.ok(res.headers['x-sqlapi-profiler']); + const headers = JSON.parse(res.headers['x-sqlapi-profiler']); + assert.ok(headers.copyFrom); + const metrics = headers.copyFrom; + assert.equal(metrics.size, 57); + assert.equal(metrics.format, 'CSV'); + assert.equal(metrics.time, response.time); + assert.equal(metrics.rows, response.total_rows); + assert.equal(metrics.gzip, false); + + done(); + }); + }); + + it('should fail with copyfrom endpoint and unexisting table', function(done){ + assert.response(server, { + url: "/api/v1/sql/copyfrom?" + querystring.stringify({ + q: "COPY unexisting_table (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)" + }), + data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'POST' + },{}, function(err, res) { + assert.ifError(err); + assert.deepEqual( + JSON.parse(res.body), + { + error:['relation \"unexisting_table\" does not exist'] + } + ); + done(); + }); + }); + + it('should fail with copyfrom endpoint and without csv', function(done){ + assert.response(server, { + url: "/api/v1/sql/copyfrom?" + querystring.stringify({ + q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)" + }), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'POST' + },{}, function(err, res) { + assert.ifError(err); + assert.deepEqual( + JSON.parse(res.body), + { + error:['No rows copied'] + } + ); + done(); + }); + }); + + it('should fail with copyfrom endpoint and without q', function(done){ + assert.response(server, { + url: "/api/v1/sql/copyfrom", + data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'POST' + },{}, function(err, res) { + assert.ifError(err); + assert.deepEqual( + JSON.parse(res.body), + { + error:["SQL is missing"] + } + ); + done(); + }); + }); + + it('should work with copyto endpoint', function(done){ + assert.response(server, { + url: "/api/v1/sql/copyto?" 
+ querystring.stringify({ + q: 'COPY copy_endpoints_test TO STDOUT', + filename: '/tmp/output.dmp' + }), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'GET' + },{}, function(err, res) { + assert.ifError(err); + assert.strictEqual( + res.body, + '11\tPaul\t10\n12\tPeter\t10\n13\tMatthew\t10\n14\t\\N\t10\n15\tJames\t10\n16\tJohn\t10\n' + ); + + assert.equal(res.headers['content-disposition'], 'attachment; filename=%2Ftmp%2Foutput.dmp'); + assert.equal(res.headers['content-type'], 'application/octet-stream'); + + done(); + }); + }); + + it('should fail with copyto endpoint and without sql', function(done){ + assert.response(server, { + url: "/api/v1/sql/copyto?" + querystring.stringify({ + filename: '/tmp/output.dmp' + }), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'GET' + },{}, function(err, res) { + assert.ifError(err); + assert.deepEqual( + JSON.parse(res.body), + { + error:["SQL is missing"] + } + ); + done(); + }); + }); + + it('should work with copyfrom and gzip', function(done){ + assert.response(server, { + url: "/api/v1/sql/copyfrom?" + querystring.stringify({ + q: "COPY copy_endpoints_test2 (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)" + }), + data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv.gz'), + headers: { + host: 'vizzuality.cartodb.com', + 'content-encoding': 'gzip' + }, + method: 'POST' + },{}, function(err, res) { + assert.ifError(err); + const response = JSON.parse(res.body); + assert.equal(!!response.time, true); + assert.strictEqual(response.total_rows, 6); + + assert.ok(res.headers['x-sqlapi-profiler']); + const headers = JSON.parse(res.headers['x-sqlapi-profiler']); + assert.ok(headers.copyFrom); + const metrics = headers.copyFrom; + assert.equal(metrics.size, 57); + assert.equal(metrics.format, 'CSV'); + assert.equal(metrics.time, response.time); + assert.equal(metrics.rows, response.total_rows); + assert.equal(metrics.gzip, true); + + done(); + }); + }); + +}); + + +describe('copy-endpoints timeout', function() { + it('should fail with copyfrom and timeout', function(done){ + assert.response(server, { + url: '/api/v1/sql?q=set statement_timeout = 10', + headers: {host: 'vizzuality.cartodb.com'}, + method: 'GET' + }, + function(err) { + assert.ifError(err); + assert.response(server, { + url: "/api/v1/sql/copyfrom?" + querystring.stringify({ + q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)" + }), + data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'POST' + }, + { + status: 429, + headers: { 'Content-Type': 'application/json; charset=utf-8' } + }, + function(err, res) { + assert.ifError(err); + assert.deepEqual(JSON.parse(res.body), { + error: [ + 'You are over platform\'s limits. Please contact us to know more details' + ], + context: 'limit', + detail: 'datasource' + }); + + + assert.response(server, { + url: "/api/v1/sql?q=set statement_timeout = 2000", + headers: {host: 'vizzuality.cartodb.com'}, + method: 'GET' + }, + done); + }); + }); + }); + + it('should fail with copyto and timeout', function(done){ + assert.response(server, { + url: '/api/v1/sql?q=set statement_timeout = 20', + headers: {host: 'vizzuality.cartodb.com'}, + method: 'GET' + }, + function(err) { + assert.ifError(err); + assert.response(server, { + url: "/api/v1/sql/copyto?" 
+ querystring.stringify({ + q: 'COPY populated_places_simple_reduced TO STDOUT', + filename: '/tmp/output.dmp' + }), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'GET' + },{}, function(err, res) { + assert.ifError(err); + const error = { + error:["You are over platform's limits. Please contact us to know more details"], + context:"limit", + detail:"datasource" + }; + const expectedError = res.body.substring(res.body.length - JSON.stringify(error).length); + assert.deepEqual(JSON.parse(expectedError), error); + + assert.response(server, { + url: "/api/v1/sql?q=set statement_timeout = 2000", + headers: {host: 'vizzuality.cartodb.com'}, + method: 'GET' + }, + done); + }); + }); + }); +}); + + +describe('copy-endpoints db connections', function() { + before(function() { + this.db_pool_size = global.settings.db_pool_size; + global.settings.db_pool_size = 1; + }); + + after(function() { + global.settings.db_pool_size = this.db_pool_size; + }); + + it('copyfrom', function(done) { + const query = "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)"; + function doCopyFrom() { + return new Promise(resolve => { + assert.response(server, { + url: "/api/v1/sql/copyfrom?" + querystring.stringify({ + q: query + }), + data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'POST' + },{}, function(err, res) { + assert.ifError(err); + const response = JSON.parse(res.body); + assert.ok(response.time); + resolve(); + }); + }); + } + + Promise.all([doCopyFrom(), doCopyFrom(), doCopyFrom()]).then(function() { + done(); + }); + }); + + it('copyto', function(done) { + function doCopyTo() { + return new Promise(resolve => { + assert.response(server, { + url: "/api/v1/sql/copyto?" + querystring.stringify({ + q: 'COPY copy_endpoints_test TO STDOUT', + filename: '/tmp/output.dmp' + }), + headers: {host: 'vizzuality.cartodb.com'}, + method: 'GET' + },{}, function(err, res) { + assert.ifError(err); + assert.ok(res.body); + resolve(); + }); + }); + } + + Promise.all([doCopyTo(), doCopyTo(), doCopyTo()]).then(function() { + done(); + }); + }); +}); + +describe('copy-endpoints client disconnection', function() { + // Give it enough time to connect and issue the query + // but not too much so as to disconnect in the middle of the query. + const client_disconnect_timeout = 10; + + before(function() { + this.db_pool_size = global.settings.db_pool_size; + global.settings.db_pool_size = 1; + }); + + after(function() { + global.settings.db_pool_size = this.db_pool_size; + }); + + var assertCanReuseConnection = function (done) { + assert.response(server, { + url: '/api/v1/sql?' + querystring.stringify({ + q: 'SELECT 1', + }), + headers: { host: 'vizzuality.cartodb.com' }, + method: 'GET' + }, {}, function(err, res) { + assert.ifError(err); + assert.ok(res.statusCode === 200); + done(); + }); + }; + + it('COPY TO returns the connection to the pool if the client disconnects', function(done) { + assert.response(server, { + url: '/api/v1/sql/copyto?' 
+ querystring.stringify({ + q: 'COPY (SELECT * FROM generate_series(1, 100000)) TO STDOUT', + }), + headers: { host: 'vizzuality.cartodb.com' }, + method: 'GET', + timeout: client_disconnect_timeout + }, {}, function(err) { + // we're expecting a timeout error + assert.ok(err); + assert.ok(err.code === 'ETIMEDOUT' || err.code === 'ESOCKETTIMEDOUT'); + assertCanReuseConnection(done); + }); + }); + + it('COPY FROM returns the connection to the pool if the client disconnects', function(done) { + assert.response(server, { + url: '/api/v1/sql/copyfrom?' + querystring.stringify({ + q: "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)", + }), + headers: { host: 'vizzuality.cartodb.com' }, + method: 'POST', + data: fs.createReadStream(__dirname + '/../support/csv/copy_test_table.csv'), + timeout: client_disconnect_timeout + }, {}, function(err) { + // we're expecting a timeout error + assert.ok(err); + assert.ok(err.code === 'ETIMEDOUT' || err.code === 'ESOCKETTIMEDOUT'); + assertCanReuseConnection(done); + }); + }); + +}); diff --git a/test/acceptance/query-multipart.js b/test/acceptance/query-multipart.js new file mode 100644 index 00000000..27f4c3c6 --- /dev/null +++ b/test/acceptance/query-multipart.js @@ -0,0 +1,24 @@ +require('../helper'); + +const server = require('../../app/server')(); +const assert = require('../support/assert'); + +describe('query-multipart', function() { + it('make query from a multipart form', function(done){ + assert.response(server, { + url: '/api/v1/sql', + formData: { + q: 'SELECT 2 as n' + }, + headers: {host: 'vizzuality.cartodb.com'}, + method: 'POST' + },{}, function(err, res) { + assert.ifError(err); + const response = JSON.parse(res.body); + assert.equal(typeof(response.time) !== 'undefined', true); + assert.strictEqual(response.total_rows, 1); + assert.deepStrictEqual(response.rows, [{n:2}]); + done(); + }); + }); +}); diff --git a/test/support/assert.js b/test/support/assert.js index 47c2ae51..3541f278 100644 --- a/test/support/assert.js +++ b/test/support/assert.js @@ -48,6 +48,10 @@ assert.response = function(server, req, res, callback) { requestParams.body = req.body || req.data; } + if (req.formData) { + requestParams.formData = req.formData; + } + debug('Request params', requestParams); request(requestParams, function assert$response$requestHandler(error, response, body) { debug('Request response', error); diff --git a/test/support/csv/copy_test_table.csv b/test/support/csv/copy_test_table.csv new file mode 100644 index 00000000..79d2f3e7 --- /dev/null +++ b/test/support/csv/copy_test_table.csv @@ -0,0 +1,7 @@ +id,name +11,Paul +12,Peter +13,Matthew +14, +15,James +16,John diff --git a/test/support/csv/copy_test_table.csv.gz b/test/support/csv/copy_test_table.csv.gz new file mode 100644 index 00000000..bac1a2d1 Binary files /dev/null and b/test/support/csv/copy_test_table.csv.gz differ diff --git a/test/support/sql/test.sql b/test/support/sql/test.sql index 8f8f9316..711e6183 100644 --- a/test/support/sql/test.sql +++ b/test/support/sql/test.sql @@ -217,3 +217,21 @@ INSERT INTO CDB_TableMetadata (tabname, updated_at) VALUES ('scoped_table_1'::re GRANT SELECT ON CDB_TableMetadata TO :TESTUSER; GRANT SELECT ON CDB_TableMetadata TO test_cartodb_user_2; + +DROP TABLE IF EXISTS copy_endpoints_test; +CREATE TABLE copy_endpoints_test ( + id integer, + name text, + age integer default 10 +); +GRANT ALL ON TABLE copy_endpoints_test TO :TESTUSER; +GRANT ALL ON TABLE copy_endpoints_test TO :PUBLICUSER; + +DROP TABLE IF 
EXISTS copy_endpoints_test2; +CREATE TABLE copy_endpoints_test2 ( + id integer, + name text, + age integer default 10 +); +GRANT ALL ON TABLE copy_endpoints_test2 TO :TESTUSER; +GRANT ALL ON TABLE copy_endpoints_test2 TO :PUBLICUSER; diff --git a/test/unit/query_info.test.js b/test/unit/query_info.test.js new file mode 100644 index 00000000..d3216b99 --- /dev/null +++ b/test/unit/query_info.test.js @@ -0,0 +1,64 @@ +const assert = require('assert'); +const queryInfo = require('../../app/utils/query_info'); + +describe('query info', function () { + describe('copy format', function () { + describe('csv', function () { + const validQueries = [ + "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)", + "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER true)", + "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV , DELIMITER ',', HEADER true)", + "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT CSV)", + "COPY copy_endpoints_test FROM STDIN WITH(FORMAT csv,HEADER true)" + ]; + + validQueries.forEach(query => { + it(query, function() { + const result = queryInfo.getFormatFromCopyQuery(query); + assert.equal(result, 'CSV'); + }); + }); + }); + + describe('text', function() { + const validQueries = [ + "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT TEXT)", + "COPY copy_endpoints_test (id, name) FROM STDIN", + ]; + + validQueries.forEach(query => { + it(query, function() { + const result = queryInfo.getFormatFromCopyQuery(query); + assert.equal(result, 'TEXT'); + }); + }); + }); + + describe('binary', function() { + const validQueries = [ + "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT BINARY)", + ]; + + validQueries.forEach(query => { + it(query, function() { + const result = queryInfo.getFormatFromCopyQuery(query); + assert.equal(result, 'BINARY'); + }); + }); + }); + + describe('should fail', function() { + const validQueries = [ + "COPY copy_endpoints_test (id, name) FROM STDIN WITH (FORMAT ERROR)", + "SELECT * from copy_endpoints_test" + ]; + + validQueries.forEach(query => { + it(query, function() { + const result = queryInfo.getFormatFromCopyQuery(query); + assert.strictEqual(result, false); + }); + }); + }); + }); +});