Implementation including multer, custom storage engine, and pg-copy,

but without turning on pg-copy yet, and demonstrating the missing 'sql'
parameter in the custom storage engine.
Paul Ramsey 2018-04-12 12:25:28 -07:00
parent c9bc11078d
commit d54e2f5a07
5 changed files with 194 additions and 2 deletions

View File

@@ -0,0 +1,98 @@
'use strict';
var _ = require('underscore');
var step = require('step');
var assert = require('assert');
var PSQL = require('cartodb-psql');
var CachedQueryTables = require('../services/cached-query-tables');
var queryMayWrite = require('../utils/query_may_write');
var formats = require('../models/formats');
var sanitize_filename = require('../utils/filename_sanitizer');
var getContentDisposition = require('../utils/content_disposition');
const userMiddleware = require('../middlewares/user');
const errorMiddleware = require('../middlewares/error');
const authorizationMiddleware = require('../middlewares/authorization');
const connectionParamsMiddleware = require('../middlewares/connection-params');
const timeoutLimitsMiddleware = require('../middlewares/timeout-limits');
const { initializeProfilerMiddleware } = require('../middlewares/profiler');
const rateLimitsMiddleware = require('../middlewares/rate-limit');
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
var ONE_YEAR_IN_SECONDS = 31536000; // 1 year time to live by default
// We need NPM body-parser so we can use multer and
// still decode the urlencoded 'sql' parameter from
// the POST body
var bodyParser = require('body-parser'); // NPM body-parser
// We need multer to support multi-part POST content
var multer = require('multer');
// The default multer storage engines (file/memory) don't
// do what we need, which is pipe the multer read stream
// straight into the pg-copy write stream, so we use
// a custom storage engine
var multerpgcopy = require('../utils/multer-pg-copy');
var upload = multer({ storage: multerpgcopy() });
// var upload = multer({ dest: '/tmp/' });
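// upload.single('file'), used in the route below, tells multer to
// expect exactly one file field named 'file' and to hand its stream
// to the storage engine's _handleFile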
function CopyController(metadataBackend, userDatabaseService, tableCache, statsd_client, userLimitsService) {
this.metadataBackend = metadataBackend;
this.statsd_client = statsd_client;
this.userDatabaseService = userDatabaseService;
this.queryTables = new CachedQueryTables(tableCache);
this.userLimitsService = userLimitsService;
}
CopyController.prototype.route = function (app) {
const { base_url } = global.settings;
const copyFromMiddlewares = endpointGroup => {
return [
initializeProfilerMiddleware('query'),
userMiddleware(),
rateLimitsMiddleware(this.userLimitsService, endpointGroup),
authorizationMiddleware(this.metadataBackend),
connectionParamsMiddleware(this.userDatabaseService),
timeoutLimitsMiddleware(this.metadataBackend),
// bodyParser.urlencoded({ extended: true }),
upload.single('file'),
this.handleCopyFrom.bind(this),
errorMiddleware()
];
};
app.post(`${base_url}/copyfrom`, copyFromMiddlewares(RATE_LIMIT_ENDPOINTS_GROUPS.QUERY));
};
// jshint maxcomplexity:21
CopyController.prototype.handleCopyFrom = function (req, res, next) {
// Why doesn't this function do much of anything?
// Because all the excitement is in the bodyParser and the upload
// middlewares, the first of which should fill out req.body.sql
// and the second of which should run that statement and stream
// the uploaded file into pgsql.
// All that's left here is to read the number of records inserted
// and to return some information to the caller on what exactly
// happened.
// Test with:
// curl --form file=@package.json --form sql="COPY this FROM STDIN" http://cdb.localhost.lan:8080/api/v2/copyfrom
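//
// A sketch (an assumption, not wired up yet) of what this handler
// could eventually send back, if the storage engine reports a row
// count on req.file:
//
//   res.send({ total_rows: req.file && req.file.total_rows });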
req.aborted = false;
req.on("close", function() {
if (req.formatter && _.isFunction(req.formatter.cancel)) {
req.formatter.cancel();
}
req.aborted = true; // TODO: there must be a builtin way to check this
});
console.debug("CopyController.prototype.handleCopyFrom: sql = '%s'", req.body.sql)
res.send('got into handleCopyFrom');
};
module.exports = CopyController;

View File

@@ -141,5 +141,5 @@ exports.parse['application/json'] = function(req, options, fn){
});
};
var multipartMiddleware = multer({ limits: { fieldSize: Infinity } });
exports.parse['multipart/form-data'] = multipartMiddleware.none();
// var multipartMiddleware = multer({ limits: { fieldSize: Infinity } });
// exports.parse['multipart/form-data'] = multipartMiddleware.none();
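// (Multipart bodies are now parsed by multer in the copy controller,
// so the generic multipart handler here is disabled.)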

View File

@@ -36,6 +36,7 @@ var cors = require('./middlewares/cors');
var GenericController = require('./controllers/generic_controller');
var QueryController = require('./controllers/query_controller');
var CopyController = require('./controllers/copy_controller');
var JobController = require('./controllers/job_controller');
var CacheStatusController = require('./controllers/cache_status_controller');
var HealthCheckController = require('./controllers/health_check_controller');
@@ -163,6 +164,15 @@ function App(statsClient) {
);
queryController.route(app);
var copyController = new CopyController(
metadataBackend,
userDatabaseService,
tableCache,
statsClient,
userLimitsService
);
copyController.route(app);
var jobController = new JobController(
metadataBackend,
userDatabaseService,

View File

@@ -0,0 +1,83 @@
// This is a multer "custom storage engine", see
// https://github.com/expressjs/multer/blob/master/StorageEngine.md
// for the contract.
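// In short: a storage engine implements _handleFile(req, file, cb),
// calling cb(err) on failure or cb(null, info) on success (the info
// properties get merged into req.file), and _removeFile(req, file, cb)
// to undo its work if a later error aborts the request.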
var _ = require('underscore');
var fs = require('fs');
var copyFrom = require('pg-copy-streams').from;
function PgCopyCustomStorage (opts) {
this.opts = opts || {};
}
PgCopyCustomStorage.prototype._handleFile = function _handleFile (req, file, cb) {
// Skip the pg-copy for now, just write to /tmp/
// so we can see what parameters are making it into
// this storage handler
var debug_customstorage = true;
// Hopefully the body-parser has extracted the 'sql' parameter
// Otherwise, this will be a short trip, as we won't be able
// to run the pg-copy-streams
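// (Note: multer only guarantees that form fields sent *before* the
// file part are in req.body by the time _handleFile runs, so a
// client that sends the file field first will leave req.body.sql
// undefined here; that is a likely culprit for the missing 'sql'
// this commit demonstrates.)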
var sql = req.body.sql;
sql = (sql === "" || _.isUndefined(sql)) ? null : sql;
console.debug("PgCopyCustomStorage.prototype._handleFile");
console.debug("PgCopyCustomStorage.prototype._handleFile: sql = '%s'", sql);
if (debug_customstorage) {
var tmpPath = '/tmp/sqlApiUploadExample';
var outStream = fs.createWriteStream(tmpPath);
file.stream.pipe(outStream);
outStream.on('error', cb);
outStream.on('finish', function () {
cb(null, {
// file.path is only set by the disk storage engine, so report
// the path we actually wrote to
path: tmpPath,
size: outStream.bytesWritten
});
});
} else {
// Throwing here would take down the process, so report
// errors through multer's callback instead
if (!_.isString(sql)) {
return cb(new Error("sql is not set"));
}
// We expect the pg-connect middleware to have
// set this by the time we are called via multer
if (!req.authDbConnection) {
return cb(new Error("req.authDbConnection is not set"));
}
var sessionPg = req.authDbConnection;
sessionPg.connect(function(err, client, done) {
if (err) {
return cb(err);
}
console.debug("XXX pg.connect");
// This is the magic part, see
// https://github.com/brianc/node-pg-copy-streams
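// copyFrom() turns a "COPY ... FROM STDIN" statement into a
// writable stream: whatever gets piped in is sent to the server
// as COPY data (per the pg-copy-streams README)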
var outStream = client.query(copyFrom(sql));
// Release the pg client and report back to multer exactly once
file.stream.on('error', function (err) {
done(err);
cb(err);
});
outStream.on('error', function (err) {
done(err);
cb(err);
});
outStream.on('end', function () {
done();
cb(null, { sql: sql });
});
file.stream.pipe(outStream);
});
}
};
PgCopyCustomStorage.prototype._removeFile = function _removeFile (req, file, cb) {
fs.unlink(file.path, cb);
};
module.exports = function (opts) {
return new PgCopyCustomStorage(opts);
};

View File

@@ -31,6 +31,7 @@
"node-statsd": "~0.0.7",
"node-uuid": "^1.4.7",
"oauth-client": "0.3.0",
"pg-copy-streams": "^1.2.0",
"qs": "~6.2.1",
"queue-async": "~1.0.7",
"redis-mpool": "0.5.0",