Merge pull request #1083 from CartoDB/cluster-refactor

Cluster refactor
This commit is contained in:
Daniel G. Aubert 2019-03-11 19:10:40 +01:00 committed by GitHub
commit 2821d797a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -6,8 +6,6 @@ const debug = require('debug')('backend:cluster');
const AggregationMapConfig = require('../models/aggregation/aggregation-mapconfig');
module.exports = class ClusterBackend {
// TODO: reduce complexity
// jshint maxcomplexity: 17
getClusterFeatures (mapConfigProvider, params, callback) {
mapConfigProvider.getMapConfig((err, _mapConfig) => {
if (err) {
@ -18,16 +16,22 @@ module.exports = class ClusterBackend {
try {
pg = new PSQL(dbParamsFromReqParams(params));
} catch (error) {
debug(error);
return callback(error);
}
const { user, token, layer: layerIndex } = params;
const { user, layer: layerIndex } = params;
const mapConfig = new AggregationMapConfig(user, _mapConfig.obj(), pg);
const layer = mapConfig.getLayer(layerIndex);
if (layer.options.aggregation === false || !mapConfig.isAggregationLayer(layerIndex)) {
const error = new Error(`Map ${token} has no aggregation defined for layer ${layerIndex}`);
let { aggregation } = params;
try {
validateAggregationLayer(mapConfig, layerIndex);
aggregation = parseAggregation(aggregation);
validateAggregation(aggregation);
} catch (error) {
error.http_status = 400;
error.type = 'layer';
error.subtype = 'aggregation';
@ -37,123 +41,39 @@ module.exports = class ClusterBackend {
};
debug(error);
return callback(error);
}
let { aggregation } = params;
params.aggregation = aggregation;
if ( aggregation !== undefined) {
try {
aggregation = JSON.parse(aggregation);
} catch (err) {
const error = new Error(`Invalid aggregation input, should be a a valid JSON`);
error.http_status = 400;
error.type = 'layer';
error.subtype = 'aggregation';
error.layer = {
index: layerIndex,
type: layer.type
};
return callback(error);
}
const { columns, expressions } = aggregation;
if (!Array.isArray(columns) || !columns.length) {
const error = new Error(
`Invalid aggregation input, columns should be and array of column names`
);
error.http_status = 400;
error.type = 'layer';
error.subtype = 'aggregation';
error.layer = {
index: layerIndex,
type: layer.type
};
return callback(error);
}
if (expressions !== undefined) {
if (expressions === null ||
Array.isArray(expressions) ||
['string', 'number', 'boolean'].includes(typeof expressions)) {
const error = new Error(
`Invalid aggregation input, expressions should be and object with valid functions`
);
error.http_status = 400;
error.type = 'layer';
error.subtype = 'aggregation';
error.layer = {
index: layerIndex,
type: layer.type
};
return callback(error);
}
for (const { aggregate_function, aggregated_column } of Object.values(expressions)) {
if (typeof aggregated_column !== 'string') {
const error = new Error(`Invalid aggregation input, aggregated column should be an string`);
error.http_status = 400;
error.type = 'layer';
error.subtype = 'aggregation';
error.layer = {
index: layerIndex,
type: layer.type
};
return callback(error);
}
if (typeof aggregate_function !== 'string') {
const error = new Error(
`Invalid aggregation input, aggregate function should be an string`
);
error.http_status = 400;
error.type = 'layer';
error.subtype = 'aggregation';
error.layer = {
index: layerIndex,
type: layer.type
};
return callback(error);
}
}
}
}
const query = layer.options.sql_raw;
const resolution = layer.options.aggregation.resolution || 1;
getColumnsName(pg, query, (err, columns) => {
if (err) {
return callback(err);
}
const { zoom, clusterId } = params;
getClusterFeatures(pg, zoom, clusterId, columns, query, resolution, aggregation, (err, features) => {
if (err) {
return callback(err);
}
return callback(null, features);
});
});
getFeatures(pg, layer, params, callback);
});
}
};
function getFeatures (pg, layer, params, callback) {
const query = layer.options.sql_raw;
const resolution = layer.options.aggregation.resolution || 1;
getColumnsName(pg, query, (err, columns) => {
if (err) {
return callback(err);
}
const { zoom, clusterId, aggregation } = params;
getClusterFeatures(pg, zoom, clusterId, columns, query, resolution, aggregation, callback);
});
}
const SKIP_COLUMNS = {
'the_geom': true,
'the_geom_webmercator': true
};
function getColumnsName (pg, query, callback) {
const sql = limitedQuery({
const sql = schemaQuery({
query: query
});
@ -207,7 +127,7 @@ function getClusterFeatures (pg, zoom, clusterId, columns, query, resolution, ag
} , true); // use read-only transaction
}
const limitedQuery = ctx => `SELECT * FROM (${ctx.query}) __cdb_schema LIMIT 0`;
const schemaQuery = ctx => `SELECT * FROM (${ctx.query}) __cdb_schema LIMIT 0`;
const clusterFeaturesQuery = ctx => `
WITH
_cdb_params AS (
@ -249,3 +169,73 @@ const aggregationQuery = ctx => `
FROM (${ctx.query}) __cdb_aggregation
${ctx.columns.length ? `GROUP BY ${ctx.columns.join(', ')}` : ''}
`;
function validateAggregationLayer (mapConfig, layerIndex) {
if (!hasAggregationLayer(mapConfig, layerIndex)) {
throw new Error(`Map ${mapConfig.id()} has no aggregation defined for layer ${layerIndex}`);
}
}
// TODO: update when https://github.com/CartoDB/Windshaft-cartodb/pull/1082 is merged
function hasAggregationLayer (mapConfig, layerIndex) {
const layer = mapConfig.getLayer(layerIndex);
if (typeof layer.options.aggregation === 'boolean') {
return layer.options.aggregation;
}
return mapConfig.isAggregationLayer(layerIndex);
}
function parseAggregation (aggregation) {
if (aggregation !== undefined) {
try {
aggregation = JSON.parse(aggregation);
} catch (err) {
throw new Error(`Invalid aggregation input, should be a a valid JSON`);
}
}
return aggregation;
}
function validateAggregation (aggregation) {
if (aggregation !== undefined) {
const { columns, expressions } = aggregation;
if (!hasColumns(columns)) {
throw new Error(`Invalid aggregation input, columns should be and array of column names`);
}
validateExpressions(expressions);
}
}
function hasColumns (columns) {
return Array.isArray(columns) && columns.length;
}
function validateExpressions (expressions) {
if (expressions !== undefined) {
if (!isValidExpression(expressions)) {
throw new Error(`Invalid aggregation input, expressions should be and object with valid functions`);
}
for (const { aggregate_function, aggregated_column } of Object.values(expressions)) {
if (typeof aggregated_column !== 'string') {
throw new Error(`Invalid aggregation input, aggregated column should be an string`);
}
if (typeof aggregate_function !== 'string') {
throw new Error(`Invalid aggregation input, aggregate function should be an string`);
}
}
}
}
function isValidExpression (expressions) {
const invalidTypes = ['string', 'number', 'boolean'];
return expressions !== null && !Array.isArray(expressions) && !invalidTypes.includes(typeof expressions);
}