From d54e2f5a073ac466b83021c4477c19d1d66499cf Mon Sep 17 00:00:00 2001
From: Paul Ramsey
Date: Thu, 12 Apr 2018 12:25:28 -0700
Subject: [PATCH] Implementation including multer, custom storage engine, and
 pg-copy, but without turning over pg-copy, and demonstrating the missing
 'sql' parameter in the custom storage engine.

---
 app/controllers/copy_controller.js | 107 +++++++++++++++++++++++++++++
 app/middlewares/body-parser.js     |   4 +-
 app/server.js                      |  10 +++
 app/utils/multer-pg-copy.js        | 105 ++++++++++++++++++++++++++++
 package.json                       |   1 +
 5 files changed, 225 insertions(+), 2 deletions(-)
 create mode 100644 app/controllers/copy_controller.js
 create mode 100644 app/utils/multer-pg-copy.js

diff --git a/app/controllers/copy_controller.js b/app/controllers/copy_controller.js
new file mode 100644
index 00000000..c3bf6578
--- /dev/null
+++ b/app/controllers/copy_controller.js
@@ -0,0 +1,107 @@
+'use strict';
+
+var _ = require('underscore');
+var step = require('step');
+var assert = require('assert');
+var PSQL = require('cartodb-psql');
+var CachedQueryTables = require('../services/cached-query-tables');
+var queryMayWrite = require('../utils/query_may_write');
+
+var formats = require('../models/formats');
+
+var sanitize_filename = require('../utils/filename_sanitizer');
+var getContentDisposition = require('../utils/content_disposition');
+const userMiddleware = require('../middlewares/user');
+const errorMiddleware = require('../middlewares/error');
+const authorizationMiddleware = require('../middlewares/authorization');
+const connectionParamsMiddleware = require('../middlewares/connection-params');
+const timeoutLimitsMiddleware = require('../middlewares/timeout-limits');
+const { initializeProfilerMiddleware } = require('../middlewares/profiler');
+const rateLimitsMiddleware = require('../middlewares/rate-limit');
+const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
+
+var ONE_YEAR_IN_SECONDS = 31536000; // 1 year time to live by default
+
+// We need NPM body-parser so we can use the multer and
+// still decode the urlencoded 'sql' parameter from
+// the POST body
+var bodyParser = require('body-parser'); // NPM body-parser
+
+// We need multer to support multi-part POST content
+var multer = require('multer');
+
+// The default multer storage engines (file/memory) don't
+// do what we need, which is pipe the multer read stream
+// straight into the pg-copy write stream, so we use
+// a custom storage engine
+var multerpgcopy = require('../utils/multer-pg-copy');
+var upload = multer({ storage: multerpgcopy() });
+
+// var upload = multer({ dest: '/tmp/' });
+
+// Controller for the /copyfrom endpoint; it only stores its collaborators,
+// the route is registered by route().
+function CopyController(metadataBackend, userDatabaseService, tableCache, statsd_client, userLimitsService) {
+    this.metadataBackend = metadataBackend;
+    this.statsd_client = statsd_client;
+    this.userDatabaseService = userDatabaseService;
+    this.queryTables = new CachedQueryTables(tableCache);
+    this.userLimitsService = userLimitsService;
+}
+
+// Registers POST {base_url}/copyfrom on the given express app. The
+// middlewares run in the listed order, with the multer upload last
+// before the handler.
+CopyController.prototype.route = function (app) {
+    const { base_url } = global.settings;
+    const copyFromMiddlewares = endpointGroup => {
+        return [
+            initializeProfilerMiddleware('query'),
+            userMiddleware(),
+            rateLimitsMiddleware(this.userLimitsService, endpointGroup),
+            authorizationMiddleware(this.metadataBackend),
+            connectionParamsMiddleware(this.userDatabaseService),
+            timeoutLimitsMiddleware(this.metadataBackend),
+            // bodyParser.urlencoded({ extended: true }),
+            upload.single('file'),
+            this.handleCopyFrom.bind(this),
+            errorMiddleware()
+        ];
+    };
+
+    app.post(`${base_url}/copyfrom`, copyFromMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.QUERY));
+};
+
+// jshint maxcomplexity:21
+CopyController.prototype.handleCopyFrom = function (req, res, next) {
+
+    // Why doesn't this function do much of anything?
+    // Because all the excitement is in the bodyParser and the upload
+    // middlewares, the first of which should fill out the body.params.sql
+    // statement and the second of which should run that statement and
+    // upload it into pgsql.
+    // All that's left here, is to read the number of records inserted
+    // and to return some information to the caller on what exactly
+    // happened.
+    // Test with (the 'sql' field goes first: multer only exposes the
+    // fields that arrived before the file to the storage engine):
+    // curl --form sql="COPY this FROM STDIN" --form file=@package.json http://cdb.localhost.lan:8080/api/v2/copyfrom
+
+    req.aborted = false;
+    req.on("close", function() {
+        if (req.formatter && _.isFunction(req.formatter.cancel)) {
+            req.formatter.cancel();
+        }
+        req.aborted = true; // TODO: there must be a builtin way to check this
+    });
+
+    // req.body may be missing when no earlier middleware has populated it
+    // (e.g. the request was not multipart), so don't dereference it blindly
+    var sql = req.body ? req.body.sql : undefined;
+    console.debug("CopyController.prototype.handleCopyFrom: sql = '%s'", sql);
+
+    res.send('got into handleCopyFrom');
+
+};
+
+module.exports = CopyController;
diff --git a/app/middlewares/body-parser.js b/app/middlewares/body-parser.js
index 6c44e295..8bd5ff87 100644
--- a/app/middlewares/body-parser.js
+++ b/app/middlewares/body-parser.js
@@ -141,5 +141,5 @@ exports.parse['application/json'] = function(req, options, fn){
   });
 };
 
-var multipartMiddleware = multer({ limits: { fieldSize: Infinity } });
-exports.parse['multipart/form-data'] = multipartMiddleware.none();
+// var multipartMiddleware = multer({ limits: { fieldSize: Infinity } });
+// exports.parse['multipart/form-data'] = multipartMiddleware.none();
diff --git a/app/server.js b/app/server.js
index 6223d4eb..ee262262 100644
--- a/app/server.js
+++ b/app/server.js
@@ -36,6 +36,7 @@ var cors = require('./middlewares/cors');
 
 var GenericController = require('./controllers/generic_controller');
 var QueryController = require('./controllers/query_controller');
+var CopyController = require('./controllers/copy_controller');
 var JobController = require('./controllers/job_controller');
 var CacheStatusController = require('./controllers/cache_status_controller');
 var HealthCheckController = require('./controllers/health_check_controller');
@@ -163,6 +164,15 @@ function App(statsClient) {
     );
     queryController.route(app);
 
+    var copyController = new CopyController(
+        metadataBackend,
+        userDatabaseService,
+        tableCache,
+        statsClient,
+        userLimitsService
+    );
+    copyController.route(app);
+
     var jobController = new JobController(
         metadataBackend,
         userDatabaseService,
diff --git a/app/utils/multer-pg-copy.js b/app/utils/multer-pg-copy.js
new file mode 100644
index 00000000..f645a652
--- /dev/null
+++ b/app/utils/multer-pg-copy.js
@@ -0,0 +1,105 @@
+// This is a multer "custom storage engine", see
+// https://github.com/expressjs/multer/blob/master/StorageEngine.md
+// for the contract.
+
+var _ = require('underscore');
+var fs = require('fs');
+var copyFrom = require('pg-copy-streams').from;
+
+// Scratch file the upload is written to while the pg-copy path is disabled
+var DEBUG_UPLOAD_PATH = '/tmp/sqlApiUploadExample';
+
+// Storage engine object; `opts` is kept but not used by the engine yet
+function PgCopyCustomStorage (opts) {
+    this.opts = opts || {};
+}
+
+// Called by multer for each uploaded file. The outcome (including every
+// failure) is reported through cb(err, info) instead of being thrown.
+PgCopyCustomStorage.prototype._handleFile = function _handleFile (req, file, cb) {
+
+    // Skip the pg-copy for now, just write to /tmp/
+    // so we can see what parameters are making it into
+    // this storage handler
+    var debug_customstorage = true;
+
+    // Hopefully the 'sql' parameter has already been parsed: multer adds
+    // fields to req.body as they arrive, so it is only here if the client
+    // sent it before the file. Otherwise, this will be a short trip, as
+    // we won't be able to run the pg-copy-streams
+    var sql = req.body ? req.body.sql : undefined;
+    sql = (sql === "" || _.isUndefined(sql)) ? null : sql;
+
+    console.debug("PgCopyCustomStorage.prototype._handleFile");
+    console.debug("PgCopyCustomStorage.prototype._handleFile: sql = '%s'", sql);
+
+    if (debug_customstorage) {
+        var debugStream = fs.createWriteStream(DEBUG_UPLOAD_PATH);
+        debugStream.on('error', cb);
+        debugStream.on('finish', function () {
+            cb(null, {
+                // Report where the bytes really went, so that
+                // _removeFile has something to unlink
+                path: DEBUG_UPLOAD_PATH,
+                size: debugStream.bytesWritten
+            });
+        });
+        file.stream.pipe(debugStream);
+        return;
+    }
+
+    if (!_.isString(sql)) {
+        return cb(new Error("sql is not set"));
+    }
+
+    // We expect the pg-connect middleware to have
+    // set this by the time we are called via multer
+    if (!req.authDbConnection) {
+        return cb(new Error("req.authDbConnection is not set"));
+    }
+    var sessionPg = req.authDbConnection;
+
+    sessionPg.connect(function(err, client, done) {
+        if (err) {
+            return cb(err);
+        }
+
+        console.debug("XXX pg.connect");
+
+        // Several stream events can report the end of the copy; release
+        // the pg client and answer multer only for the first one
+        var finished = false;
+        function finish(copyErr) {
+            if (finished) {
+                return;
+            }
+            finished = true;
+            done(copyErr);
+            cb(copyErr);
+        }
+
+        // This is the magic part, see
+        // https://github.com/brianc/node-pg-copy-streams
+        var outStream = client.query(copyFrom(sql));
+
+        file.stream.on('error', finish);
+        outStream.on('error', finish);
+        outStream.on('end', function () {
+            finish();
+        });
+        file.stream.pipe(outStream);
+    });
+};
+
+// Called by multer to discard a stored file when the request fails later
+// on. Uploads without a `path` left nothing on disk.
+PgCopyCustomStorage.prototype._removeFile = function _removeFile (req, file, cb) {
+    if (!file.path) {
+        return cb(null);
+    }
+    fs.unlink(file.path, cb);
+};
+
+module.exports = function (opts) {
+    return new PgCopyCustomStorage(opts);
+};
diff --git a/package.json b/package.json
index d2758511..5cef5e9a 100644
--- a/package.json
+++ b/package.json
@@ -31,6 +31,7 @@
     "node-statsd": "~0.0.7",
     "node-uuid": "^1.4.7",
     "oauth-client": "0.3.0",
+    "pg-copy-streams": "^1.2.0",
     "qs": "~6.2.1",
     "queue-async": "~1.0.7",
     "redis-mpool": "0.5.0",