Windshaft-cartodb/lib/backends/cluster.js

244 lines
7.3 KiB
JavaScript
Raw Normal View History

2019-02-27 02:19:44 +08:00
'use strict';
const PSQL = require('cartodb-psql');
const dbParamsFromReqParams = require('../utils/database-params');
const debug = require('debug')('backend:cluster');
const AggregationMapConfig = require('../models/aggregation/aggregation-mapconfig');
2019-02-27 02:19:44 +08:00
2019-09-13 22:32:37 +08:00
const WebMercatorHelper = require('cartodb-query-tables').utils.webMercatorHelper;
const webmercator = new WebMercatorHelper();
2019-07-09 00:32:04 +08:00
2019-02-27 02:19:44 +08:00
module.exports = class ClusterBackend {
getClusterFeatures (mapConfigProvider, params, callback) {
mapConfigProvider.getMapConfig((err, _mapConfig) => {
2019-02-27 02:19:44 +08:00
if (err) {
return callback(err);
}
let pg;
try {
pg = new PSQL(dbParamsFromReqParams(params));
} catch (error) {
2019-03-12 01:09:41 +08:00
debug(error);
2019-02-27 02:19:44 +08:00
return callback(error);
}
2019-03-12 01:04:47 +08:00
const { user, layer: layerIndex } = params;
const mapConfig = new AggregationMapConfig(user, _mapConfig.obj(), pg);
2019-02-28 19:13:15 +08:00
const layer = mapConfig.getLayer(layerIndex);
2019-03-12 01:53:47 +08:00
2019-03-01 22:17:22 +08:00
let { aggregation } = params;
2019-03-12 00:53:34 +08:00
try {
2019-03-12 01:04:47 +08:00
validateAggregationLayer(mapConfig, layerIndex);
2019-03-12 00:53:34 +08:00
aggregation = parseAggregation(aggregation);
validateAggregation(aggregation);
} catch (error) {
error.http_status = 400;
error.type = 'layer';
error.subtype = 'aggregation';
error.layer = {
index: layerIndex,
type: layer.type
};
2019-03-01 22:17:22 +08:00
2019-03-12 01:04:47 +08:00
debug(error);
2019-03-12 00:53:34 +08:00
return callback(error);
2019-03-01 22:17:22 +08:00
}
2019-03-12 01:53:47 +08:00
params.aggregation = aggregation;
2019-02-27 02:19:44 +08:00
2019-03-12 01:53:47 +08:00
getFeatures(pg, layer, params, callback);
2019-02-27 02:19:44 +08:00
});
}
};
2019-03-12 01:53:47 +08:00
/**
 * Resolves the column names exposed by the layer query and then fetches the
 * features of the requested cluster.
 *
 * @param {Object} pg - database client, forwarded to the query helpers
 * @param {Object} layer - layer whose `options.sql_raw` and
 *     `options.aggregation.resolution` (falls back to 1 when falsy) are used
 * @param {Object} params - `zoom`, `clusterId` and `aggregation` are read
 * @param {Function} callback - node-style `(err, data)`
 */
function getFeatures (pg, layer, params, callback) {
    const { sql_raw: sourceSql, aggregation: layerAggregation } = layer.options;
    const gridSize = layerAggregation.resolution || 1;

    getColumnsName(pg, sourceSql, function onColumnNames (columnsError, columnNames) {
        if (columnsError) {
            return callback(columnsError);
        }

        getClusterFeatures(
            pg,
            params.zoom,
            params.clusterId,
            columnNames,
            sourceSql,
            gridSize,
            params.aggregation,
            callback
        );
    });
}
2019-02-27 02:19:44 +08:00
// Column names that are left out of the cluster feature listing.
// The map has no prototype so that `SKIP_COLUMNS[name]` only ever matches the
// keys listed here; with a plain object literal, names such as `constructor`
// or `toString` resolve to inherited members and would be treated as skipped.
const SKIP_COLUMNS = Object.assign(Object.create(null), {
    the_geom: true,
    the_geom_webmercator: true
});
/**
 * Retrieves the names of the columns produced by `query`, leaving out the
 * ones listed in SKIP_COLUMNS.
 *
 * The schema is obtained by running the statement built by `schemaQuery`;
 * only the `fields` metadata of the result is used.
 *
 * @param {Object} pg - database client exposing `query(sql, callback, readOnly)`
 * @param {string} query - SQL whose output columns are inspected
 * @param {Function} callback - node-style `(err, columnNames)`
 */
function getColumnsName (pg, query, callback) {
    const sql = schemaQuery({
        query: query
    });

    debug('> getColumnsName:', sql);

    pg.query(sql, function (err, resultSet) {
        if (err) {
            return callback(err);
        }

        const fields = (resultSet && resultSet.fields) || [];
        // Own-property check: a bare `SKIP_COLUMNS[name]` lookup also matches
        // inherited members, which would drop columns named e.g. `constructor`.
        const isSkipped = columnName =>
            Object.prototype.hasOwnProperty.call(SKIP_COLUMNS, columnName) && Boolean(SKIP_COLUMNS[columnName]);

        const columnNames = fields
            .map(field => field.name)
            .filter(columnName => !isSkipped(columnName));

        return callback(null, columnNames);
    }, true); // use read-only transaction
}
2019-03-01 18:21:18 +08:00
/**
 * Builds and runs the query returning the features that fall in the same grid
 * cell as the feature identified by `clusterId`. When `aggregation` is given,
 * the result is wrapped in an aggregation query (feature count plus the
 * requested group-by columns and aggregate expressions).
 *
 * @param {Object} pg - database client exposing `query(sql, callback, readOnly)`
 * @param {number} zoom - zoom level used to derive the grid cell size
 * @param {number|string} clusterId - `cartodb_id` of the cluster feature
 * @param {string[]} columns - columns to select from the layer query
 * @param {string} query - layer SQL
 * @param {number} resolution - aggregation resolution of the layer
 * @param {Object} [aggregation] - `{ columns, expressions }`; `expressions`
 *     maps an output name to `{ aggregate_function, aggregated_column }`
 * @param {Function} callback - node-style `(err, data)`
 */
function getClusterFeatures (pg, zoom, clusterId, columns, query, resolution, aggregation, callback) {
    let sql = clusterFeaturesQuery({
        zoom: zoom,
        id: clusterId,
        query: query,
        res: 256 / resolution,
        columns: columns
    });

    if (aggregation !== undefined) {
        // Distinct local names: the original re-declared `columns` here,
        // shadowing the function parameter of the same name.
        const { columns: groupByColumns = [], expressions: expressionsByName } = aggregation;

        // NOTE(review): function, column and alias are interpolated verbatim
        // into the SQL text; within this file only their type (string) is
        // checked. Confirm this is acceptable for the role running the query.
        const aggregateExpressions = Object.entries(expressionsByName || {})
            .map(([columnName, exp]) => `${exp.aggregate_function}(${exp.aggregated_column}) AS ${columnName}`);

        sql = aggregationQuery({
            columns: groupByColumns,
            expressions: aggregateExpressions,
            query: sql
        });
    }

    debug('> getClusterFeatures:', sql);

    pg.query(sql, (err, data) => {
        if (err) {
            return callback(err);
        }
        return callback(null, data);
    }, true); // use read-only transaction
}
2019-06-27 00:03:02 +08:00
/**
 * Wraps `query` in a statement that returns no rows (`LIMIT 0`), so that only
 * the column metadata of the result is produced.
 *
 * @param {{query: string}} ctx
 * @returns {string} SQL text
 */
const schemaQuery = ({ query }) => {
    return `SELECT * FROM (${query}) __cdb_cluster_schema LIMIT 0`;
};
2019-02-27 02:19:44 +08:00
/**
 * Builds the SQL that lists the features sharing a grid cell with the feature
 * whose `cartodb_id` equals `ctx.id`.
 *
 * The statement first computes the cell size (`gridResolution(ctx)`), then the
 * envelope (SRID 3857) of the cell containing the reference feature, and
 * finally selects `ctx.columns` from every row of `ctx.query` whose
 * `the_geom_webmercator` intersects that envelope.
 *
 * NOTE(review): `ctx.id`, `ctx.query` and `ctx.columns` are interpolated
 * verbatim into the SQL text; confirm callers validate `ctx.id` upstream.
 *
 * @param {{zoom: number, id: (number|string), query: string, res: number, columns: string[]}} ctx
 * @returns {string} SQL text
 */
const clusterFeaturesQuery = ctx => `
    WITH
    _cdb_params AS (
        SELECT
            ${gridResolution(ctx)} AS res
    ),
    _cell AS (
        SELECT
            ST_MakeEnvelope(
                Floor(ST_X(_cdb_query.the_geom_webmercator)/_cdb_params.res)*_cdb_params.res,
                Floor(ST_Y(_cdb_query.the_geom_webmercator)/_cdb_params.res)*_cdb_params.res,
                Floor(ST_X(_cdb_query.the_geom_webmercator)/_cdb_params.res + 1)*_cdb_params.res,
                Floor(ST_Y(_cdb_query.the_geom_webmercator)/_cdb_params.res + 1)*_cdb_params.res,
                3857
            ) AS bbox
        FROM (${ctx.query}) _cdb_query, _cdb_params
        WHERE _cdb_query.cartodb_id = ${ctx.id}
    )
    SELECT
        ${ctx.columns.join(', ')}
    FROM (
        SELECT _cdb_query.*
        FROM _cell, (${ctx.query}) _cdb_query
        WHERE ST_Intersects(_cdb_query.the_geom_webmercator, _cell.bbox)
    ) __cdb_non_geoms_query
`;
/**
 * Builds the SQL expression for the size of one aggregation grid cell:
 * `256 / ctx.res` multiplied by the resolution that the web-mercator helper
 * reports for the zoom level. The zoom passed to the helper is capped at 38.
 *
 * @param {{zoom: number, res: number}} ctx
 * @returns {string} SQL expression
 */
const gridResolution = ctx => {
    const zoomResolution = webmercator.getResolution({ z: Math.min(38, ctx.zoom) });
    return `${256 / ctx.res} * (${zoomResolution})::double precision`;
};
2019-03-01 18:21:18 +08:00
/**
 * Wraps `ctx.query` in an aggregation: it always selects
 * `count(1) as _cdb_feature_count`, followed by the group-by columns and the
 * aggregate expressions when present. A `GROUP BY` clause is emitted only
 * when `ctx.columns` is not empty.
 *
 * @param {{columns: string[], expressions: string[], query: string}} ctx -
 *     `expressions` are ready-made SQL fragments (`fn(col) AS alias`)
 * @returns {string} SQL text
 */
const aggregationQuery = ctx => `
    SELECT
        count(1) as _cdb_feature_count
        ${ctx.columns.length ? `,${ctx.columns.join(', ')}` : ''}
        ${ctx.expressions.length ? `,${ctx.expressions.join(', ')}` : ''}
    FROM (${ctx.query}) __cdb_aggregation
    ${ctx.columns.length ? `GROUP BY ${ctx.columns.join(', ')}` : ''}
`;
2019-03-12 01:04:47 +08:00
/**
 * Ensures the given layer has aggregation defined.
 *
 * @param {Object} mapConfig - map configuration; `id()` is used for the message
 * @param {number|string} layerIndex - index of the layer to check
 * @throws {Error} when `hasAggregationLayer` reports no aggregation
 */
function validateAggregationLayer (mapConfig, layerIndex) {
    if (hasAggregationLayer(mapConfig, layerIndex)) {
        return;
    }

    const message = `Map ${mapConfig.id()} has no aggregation defined for layer ${layerIndex}`;
    throw new Error(message);
}
// TODO: update when https://github.com/CartoDB/Windshaft-cartodb/pull/1082 is merged
/**
 * Tells whether the layer at `layerIndex` has aggregation.
 *
 * A boolean `options.aggregation` on the layer is returned as is; any other
 * value defers to `mapConfig.isAggregationLayer(layerIndex)`. An index that
 * does not resolve to a layer yields `false` instead of throwing a TypeError
 * while reading `layer.options`.
 *
 * @param {Object} mapConfig - exposes `getLayer(index)` and `isAggregationLayer(index)`
 * @param {number|string} layerIndex
 * @returns {boolean}
 */
function hasAggregationLayer (mapConfig, layerIndex) {
    const layer = mapConfig.getLayer(layerIndex);

    if (!layer) {
        return false;
    }

    const aggregation = layer.options ? layer.options.aggregation : undefined;

    if (typeof aggregation === 'boolean') {
        return aggregation;
    }

    return mapConfig.isAggregationLayer(layerIndex);
}
2019-03-12 00:53:34 +08:00
/**
 * Parses the `aggregation` request parameter.
 *
 * @param {string} [aggregation] - JSON text; `undefined` is passed through
 * @returns {*} the parsed value, or `undefined` when no input was given
 * @throws {Error} when the input is not valid JSON
 */
function parseAggregation (aggregation) {
    if (aggregation === undefined) {
        return aggregation;
    }

    try {
        return JSON.parse(aggregation);
    } catch (err) {
        throw new Error(`Invalid aggregation input, should be a a valid JSON`);
    }
}
/**
 * Validates the parsed `aggregation` parameter: it must carry a non-empty
 * `columns` array and, optionally, a valid `expressions` object.
 * `undefined` (parameter not provided) is accepted.
 *
 * @param {*} aggregation - value returned by parseAggregation
 * @throws {Error} when `columns` or `expressions` are invalid
 */
function validateAggregation (aggregation) {
    if (aggregation === undefined) {
        return;
    }

    // JSON.parse('null') yields null, which cannot be destructured; treat it
    // as an input without columns so the caller gets the validation message
    // rather than an engine TypeError.
    const { columns, expressions } = aggregation === null ? {} : aggregation;

    if (!hasColumns(columns)) {
        throw new Error(`Invalid aggregation input, columns should be and array of column names`);
    }

    validateExpressions(expressions);
}
/**
 * Tells whether `columns` is a non-empty array of column names.
 *
 * Always returns a boolean (the previous form leaked the array length), and
 * requires every entry to be a string, since the entries are used as column
 * names when the aggregation SQL is built.
 *
 * @param {*} columns
 * @returns {boolean}
 */
function hasColumns (columns) {
    return Array.isArray(columns) &&
        columns.length > 0 &&
        columns.every(column => typeof column === 'string');
}
/**
 * Validates the optional `expressions` member of the aggregation parameter.
 * It must be an object (see isValidExpression) whose values each carry string
 * `aggregate_function` and `aggregated_column` members.
 *
 * @param {*} expressions - `undefined` is accepted (no expressions requested)
 * @throws {Error} when the container or any of its entries is invalid
 */
function validateExpressions (expressions) {
    if (expressions === undefined) {
        return;
    }

    if (!isValidExpression(expressions)) {
        throw new Error(`Invalid aggregation input, expressions should be and object with valid functions`);
    }

    for (const expression of Object.values(expressions)) {
        // A null entry (e.g. `{"total": null}`) cannot be destructured; fall
        // back to an empty object so it fails the string checks below with the
        // regular validation message.
        const { aggregate_function, aggregated_column } = expression || {};

        if (typeof aggregated_column !== 'string') {
            throw new Error(`Invalid aggregation input, aggregated column should be an string`);
        }

        if (typeof aggregate_function !== 'string') {
            throw new Error(`Invalid aggregation input, aggregate function should be an string`);
        }
    }
}
/**
 * Tells whether `expressions` can be used as the expressions container:
 * `null`, arrays, strings, numbers and booleans are rejected; every other
 * value is accepted.
 *
 * @param {*} expressions
 * @returns {boolean}
 */
function isValidExpression (expressions) {
    if (expressions === null || Array.isArray(expressions)) {
        return false;
    }

    switch (typeof expressions) {
        case 'string':
        case 'number':
        case 'boolean':
            return false;
        default:
            return true;
    }
}