WIP on /copyfrom
This commit is contained in:
parent
2229d0ee57
commit
72bce5732b
@ -3,7 +3,6 @@
|
|||||||
var _ = require('underscore');
|
var _ = require('underscore');
|
||||||
var CachedQueryTables = require('../services/cached-query-tables');
|
var CachedQueryTables = require('../services/cached-query-tables');
|
||||||
|
|
||||||
|
|
||||||
const userMiddleware = require('../middlewares/user');
|
const userMiddleware = require('../middlewares/user');
|
||||||
const errorMiddleware = require('../middlewares/error');
|
const errorMiddleware = require('../middlewares/error');
|
||||||
const authorizationMiddleware = require('../middlewares/authorization');
|
const authorizationMiddleware = require('../middlewares/authorization');
|
||||||
@ -13,6 +12,9 @@ const { initializeProfilerMiddleware } = require('../middlewares/profiler');
|
|||||||
const rateLimitsMiddleware = require('../middlewares/rate-limit');
|
const rateLimitsMiddleware = require('../middlewares/rate-limit');
|
||||||
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
|
const { RATE_LIMIT_ENDPOINTS_GROUPS } = rateLimitsMiddleware;
|
||||||
|
|
||||||
|
// Database requirements
|
||||||
|
var PSQL = require('cartodb-psql');
|
||||||
|
var fs = require('fs');
|
||||||
|
|
||||||
// We need NPM body-parser so we can use the multer and
|
// We need NPM body-parser so we can use the multer and
|
||||||
// still decode the urlencoded 'sql' parameter from
|
// still decode the urlencoded 'sql' parameter from
|
||||||
@ -26,10 +28,13 @@ var multer = require('multer');
|
|||||||
// do what we need, which is pipe the multer read stream
|
// do what we need, which is pipe the multer read stream
|
||||||
// straight into the pg-copy write stream, so we use
|
// straight into the pg-copy write stream, so we use
|
||||||
// a custom storage engine
|
// a custom storage engine
|
||||||
var multerpgcopy = require('../utils/multer-pg-copy');
|
// var multerpgcopy = require('../utils/multer-pg-copy');
|
||||||
var upload = multer({ storage: multerpgcopy() });
|
// var upload = multer({ storage: multerpgcopy() });
|
||||||
|
|
||||||
// var upload = multer({ dest: '/tmp/' });
|
// Store the uploaded file in the tmp directory, with limits on the
|
||||||
|
// size of acceptable uploads
|
||||||
|
var uploadLimits = { fileSize: 1024*1024*1024, fields: 10, files: 1 };
|
||||||
|
var upload = multer({ storage: multer.diskStorage({}), limits: uploadLimits });
|
||||||
|
|
||||||
function CopyController(metadataBackend, userDatabaseService, tableCache, statsd_client, userLimitsService) {
|
function CopyController(metadataBackend, userDatabaseService, tableCache, statsd_client, userLimitsService) {
|
||||||
this.metadataBackend = metadataBackend;
|
this.metadataBackend = metadataBackend;
|
||||||
@ -60,31 +65,64 @@ CopyController.prototype.route = function (app) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// jshint maxcomplexity:21
|
// jshint maxcomplexity:21
|
||||||
CopyController.prototype.handleCopyFrom = function (req, res) {
|
CopyController.prototype.handleCopyFrom = function (req, res, next) {
|
||||||
|
|
||||||
// Why doesn't this function do much of anything?
|
|
||||||
// Because all the excitement is in the bodyParser and the upload
|
|
||||||
// middlewards, the first of which should fill out the body.params.sql
|
|
||||||
// statement and the second of which should run that statement and
|
|
||||||
// upload it into pgsql.
|
|
||||||
// All that's left here, is to read the number of records inserted
|
|
||||||
// and to return some information to the caller on what exactly
|
|
||||||
// happened.
|
|
||||||
// Test with:
|
// Test with:
|
||||||
// curl --form file=@package.json --form sql="COPY this FROM STDOUT" http://cdb.localhost.lan:8080/api/v2/copyfrom
|
// curl --form file=@package.json --form sql="COPY this FROM STDOUT" http://cdb.localhost.lan:8080/api/v2/copyfrom
|
||||||
|
|
||||||
req.aborted = false;
|
|
||||||
req.on("close", function() {
|
|
||||||
if (req.formatter && _.isFunction(req.formatter.cancel)) {
|
|
||||||
req.formatter.cancel();
|
|
||||||
}
|
|
||||||
req.aborted = true; // TODO: there must be a builtin way to check this
|
|
||||||
});
|
|
||||||
|
|
||||||
console.debug("CopyController.prototype.handleCopyFrom: sql = '%s'", req.body.sql);
|
var sql = req.body.sql;
|
||||||
|
sql = (sql === "" || _.isUndefined(sql)) ? null : sql;
|
||||||
|
|
||||||
res.send('got into handleCopyFrom');
|
// Ensure SQL parameter is not missing
|
||||||
|
if (!_.isString(sql)) {
|
||||||
|
throw new Error("Parameter 'sql' is missing");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only accept SQL that starts with 'COPY'
|
||||||
|
if (!sql.toUpperCase().startsWith("COPY ")) {
|
||||||
|
throw new Error("SQL must start with COPY");
|
||||||
|
}
|
||||||
|
|
||||||
|
// If SQL doesn't include "from stdin", add it
|
||||||
|
if (!sql.toUpperCase().endsWith(" FROM STDOUT")) {
|
||||||
|
sql += " FROM STDOUT";
|
||||||
|
}
|
||||||
|
|
||||||
|
console.debug("CopyController.handleCopyFrom: sql = '%s'", sql);
|
||||||
|
|
||||||
|
// The multer middleware should have filled 'req.file' in
|
||||||
|
// with the name, size, path, etc of the file
|
||||||
|
if (typeof req.file === 'undefined') {
|
||||||
|
throw new Error("Parameter 'file' is missing");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Open pgsql COPY pipe and stream file into it
|
||||||
|
const { user: username, userDbParams: dbopts, authDbParams, userLimits, authenticated } = res.locals;
|
||||||
|
var pg = new PSQL(authDbParams);
|
||||||
|
var copyFrom = require('pg-copy-streams').from;
|
||||||
|
var copyFromStream = copyFrom(sql);
|
||||||
|
// console.debug("XXX connect " + sql);
|
||||||
|
pg.dbConnect(pg.getConnectionConfig(), function(err, client, next) {
|
||||||
|
var stream = client.query(copyFromStream);
|
||||||
|
var fileStream = fs.createReadStream(req.file.path);
|
||||||
|
fileStream.on('error', next);
|
||||||
|
stream.on('error', next);
|
||||||
|
stream.on('end', next);
|
||||||
|
fileStream.pipe(stream)
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
console.debug("ERRROR!!!!")
|
||||||
|
next(err);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return some useful information about the process,
|
||||||
|
// pg-copy-streams fills in the rowCount after import is complete
|
||||||
|
var result = "handleCopyFrom completed with row count = " + copyFromStream.rowCount;
|
||||||
|
console.debug("CopyController.handleCopyFrom: rowCount = '%s'", copyFromStream.rowCount);
|
||||||
|
|
||||||
|
res.send(result + "\n");
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports = CopyController;
|
module.exports = CopyController;
|
||||||
|
@ -18,7 +18,7 @@
|
|||||||
],
|
],
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"basic-auth": "^2.0.0",
|
"basic-auth": "^2.0.0",
|
||||||
"body-parser": "1.18.2",
|
"body-parser": "~1.18.2",
|
||||||
"bintrees": "1.0.1",
|
"bintrees": "1.0.1",
|
||||||
"bunyan": "1.8.1",
|
"bunyan": "1.8.1",
|
||||||
"cartodb-psql": "0.10.1",
|
"cartodb-psql": "0.10.1",
|
||||||
@ -28,7 +28,7 @@
|
|||||||
"express": "~4.13.3",
|
"express": "~4.13.3",
|
||||||
"log4js": "cartodb/log4js-node#cdb",
|
"log4js": "cartodb/log4js-node#cdb",
|
||||||
"lru-cache": "~2.5.0",
|
"lru-cache": "~2.5.0",
|
||||||
"multer": "~1.2.0",
|
"multer": "~1.3.0",
|
||||||
"node-statsd": "~0.0.7",
|
"node-statsd": "~0.0.7",
|
||||||
"node-uuid": "^1.4.7",
|
"node-uuid": "^1.4.7",
|
||||||
"oauth-client": "0.3.0",
|
"oauth-client": "0.3.0",
|
||||||
|
Loading…
Reference in New Issue
Block a user