Merge pull request #745 from CartoDB/aggregation-dataview-refactor

Aggregation dataview refactor
This commit is contained in:
Simon Martín 2017-10-02 17:24:12 +02:00 committed by GitHub
commit 1e4c63a6dc

View File

@ -1,95 +1,178 @@
var _ = require('underscore'); const BaseDataview = require('./base');
var BaseWidget = require('./base'); const debug = require('debug')('windshaft:dataview:aggregation');
var debug = require('debug')('windshaft:widget:aggregation');
var dot = require('dot'); const filteredQueryTpl = ctx => `
dot.templateSettings.strip = false; filtered_source AS (
SELECT *
FROM (${ctx.query}) _cdb_filtered_source
${ctx.aggregationColumn && ctx.isFloatColumn ? `
WHERE
${ctx.aggregationColumn} != 'infinity'::float
AND
${ctx.aggregationColumn} != '-infinity'::float
AND
${ctx.aggregationColumn} != 'NaN'::float` :
''
}
)
`;
var filteredQueryTpl = dot.template([ const summaryQueryTpl = ctx => `
'filtered_source AS (', summary AS (
' SELECT *', SELECT
' FROM ({{=it._query}}) _cdb_filtered_source', count(1) AS count,
' {{?it._aggregationColumn && it._isFloatColumn}}WHERE', sum(CASE WHEN ${ctx.column} IS NULL THEN 1 ELSE 0 END) AS nulls_count
' {{=it._aggregationColumn}} != \'infinity\'::float', ${ctx.isFloatColumn ? `,
' AND', sum(
' {{=it._aggregationColumn}} != \'-infinity\'::float', CASE
' AND', WHEN ${ctx.aggregationColumn} = 'infinity'::float OR ${ctx.aggregationColumn} = '-infinity'::float
' {{=it._aggregationColumn}} != \'NaN\'::float{{?}}', THEN 1
')' ELSE 0
].join(' \n')); END
) AS infinities_count,
sum(CASE WHEN ${ctx.aggregationColumn} = 'NaN'::float THEN 1 ELSE 0 END) AS nans_count` :
''
}
FROM (${ctx.query}) _cdb_aggregation_nulls
)
`;
var summaryQueryTpl = dot.template([ const rankedCategoriesQueryTpl = ctx => `
'summary AS (', categories AS(
' SELECT', SELECT
' count(1) AS count,', ${ctx.column} AS category,
' sum(CASE WHEN {{=it._column}} IS NULL THEN 1 ELSE 0 END) AS nulls_count', ${ctx.aggregationFn} AS value,
' {{?it._isFloatColumn}},sum(', row_number() OVER (ORDER BY ${ctx.aggregationFn} desc) as rank
' CASE', FROM filtered_source
' WHEN {{=it._aggregationColumn}} = \'infinity\'::float OR {{=it._aggregationColumn}} = \'-infinity\'::float', ${ctx.aggregationColumn !== null ? `WHERE ${ctx.aggregationColumn} IS NOT NULL` : ''}
' THEN 1', GROUP BY ${ctx.column}
' ELSE 0', ORDER BY 2 DESC
' END', )
' ) AS infinities_count,', `;
' sum(CASE WHEN {{=it._aggregationColumn}} = \'NaN\'::float THEN 1 ELSE 0 END) AS nans_count{{?}}',
' FROM ({{=it._query}}) _cdb_aggregation_nulls',
')'
].join('\n'));
var rankedCategoriesQueryTpl = dot.template([ const categoriesSummaryMinMaxQueryTpl = () => `
'categories AS(', categories_summary_min_max AS(
' SELECT {{=it._column}} AS category, {{=it._aggregation}} AS value,', SELECT
' row_number() OVER (ORDER BY {{=it._aggregation}} desc) as rank', max(value) max_val,
' FROM filtered_source', min(value) min_val
' {{?it._aggregationColumn!==null}}WHERE {{=it._aggregationColumn}} IS NOT NULL{{?}}', FROM categories
' GROUP BY {{=it._column}}', )
' ORDER BY 2 DESC', `;
')'
].join('\n'));
var categoriesSummaryMinMaxQueryTpl = dot.template([ const categoriesSummaryCountQueryTpl = ctx => `
'categories_summary_min_max AS(', categories_summary_count AS(
' SELECT max(value) max_val, min(value) min_val', SELECT count(1) AS categories_count
' FROM categories', FROM (
')' SELECT ${ctx.column} AS category
].join('\n')); FROM (${ctx.query}) _cdb_categories
GROUP BY ${ctx.column}
) _cdb_categories_count
)
`;
var categoriesSummaryCountQueryTpl = dot.template([ const specialNumericValuesColumns = () => `, nans_count, infinities_count`;
'categories_summary_count AS(',
' SELECT count(1) AS categories_count',
' FROM (',
' SELECT {{=it._column}} AS category',
' FROM ({{=it._query}}) _cdb_categories',
' GROUP BY {{=it._column}}',
' ) _cdb_categories_count',
')'
].join('\n'));
var rankedAggregationQueryTpl = dot.template([ const rankedAggregationQueryTpl = ctx => `
'SELECT CAST(category AS text), value, false as agg, nulls_count, min_val, max_val,', SELECT
' count, categories_count{{?it._isFloatColumn}}, nans_count, infinities_count{{?}}', CAST(category AS text),
' FROM categories, summary, categories_summary_min_max, categories_summary_count', value,
' WHERE rank < {{=it._limit}}', false as agg,
'UNION ALL', nulls_count,
'SELECT \'Other\' category, {{=it._aggregationFn}}(value) as value, true as agg, nulls_count,', min_val,
' min_val, max_val, count, categories_count{{?it._isFloatColumn}}, nans_count, infinities_count{{?}}', max_val,
' FROM categories, summary, categories_summary_min_max, categories_summary_count', count,
' WHERE rank >= {{=it._limit}}', categories_count
'GROUP BY nulls_count, min_val, max_val, count,', ${ctx.isFloatColumn ? `${specialNumericValuesColumns(ctx)}` : '' }
' categories_count{{?it._isFloatColumn}}, nans_count, infinities_count{{?}}' FROM categories, summary, categories_summary_min_max, categories_summary_count
].join('\n')); WHERE rank < ${ctx.limit}
UNION ALL
SELECT
'Other' category,
${ctx.aggregation !== 'count' ? ctx.aggregation : 'sum'}(value) as value,
true as agg,
nulls_count,
min_val,
max_val,
count,
categories_count
${ctx.isFloatColumn ? `${specialNumericValuesColumns(ctx)}` : '' }
FROM categories, summary, categories_summary_min_max, categories_summary_count
WHERE rank >= ${ctx.limit}
GROUP BY
nulls_count,
min_val,
max_val,
count,
categories_count
${ctx.isFloatColumn ? `${specialNumericValuesColumns(ctx)}` : '' }
`;
var aggregationQueryTpl = dot.template([ const aggregationQueryTpl = ctx => `
'SELECT CAST({{=it._column}} AS text) AS category, {{=it._aggregation}} AS value, false as agg,', SELECT
' nulls_count, min_val, max_val, count, categories_count{{?it._isFloatColumn}}, nans_count, infinities_count{{?}}', CAST(${ctx.column} AS text) AS category,
'FROM ({{=it._query}}) _cdb_aggregation_all, summary, categories_summary_min_max, categories_summary_count', ${ctx.aggregationFn} AS value,
'GROUP BY category, nulls_count, min_val, max_val, count,', false as agg,
' categories_count{{?it._isFloatColumn}}, nans_count, infinities_count{{?}}', nulls_count,
'ORDER BY value DESC' min_val,
].join('\n')); max_val,
count,
categories_count
${ctx.isFloatColumn ? `${specialNumericValuesColumns(ctx)}` : '' }
FROM (${ctx.query}) _cdb_aggregation_all, summary, categories_summary_min_max, categories_summary_count
GROUP BY
category,
nulls_count,
min_val,
max_val,
count,
categories_count
${ctx.isFloatColumn ? `${specialNumericValuesColumns(ctx)}` : '' }
ORDER BY value DESC
`;
var CATEGORIES_LIMIT = 6; const aggregationFnQueryTpl = ctx => `${ctx.aggregation}(${ctx.aggregationColumn})`;
var VALID_OPERATIONS = { const aggregationDataviewQueryTpl = ctx => `
WITH
${filteredQueryTpl(ctx)},
${summaryQueryTpl(ctx)},
${rankedCategoriesQueryTpl(ctx)},
${categoriesSummaryMinMaxQueryTpl(ctx)},
${categoriesSummaryCountQueryTpl(ctx)}
${!!ctx.override.ownFilter ? `${aggregationQueryTpl(ctx)}` : `${rankedAggregationQueryTpl(ctx)}`}
`;
/**
 * Builds the SELECT used by category search: groups the rows of `query` by
 * `column`, keeps the categories whose text form matches `userQuery` through
 * ILIKE, and exposes `value` as each category's value expression.
 *
 * Every field is interpolated verbatim; no quoting or escaping happens here.
 *
 * @param {Object} ctx - `column`, `value`, `query` and `userQuery` SQL fragments
 * @returns {string} SQL text, starting and ending with a newline
 */
const filterCategoriesQueryTpl = ({ column, value, query, userQuery }) => [
    '',
    'SELECT',
    `${column} AS category,`,
    `${value} AS value`,
    `FROM (${query}) _cdb_aggregation_search`,
    `WHERE CAST(${column} as text) ILIKE ${userQuery}`,
    `GROUP BY ${column}`,
    ''
].join('\n');
const searchQueryTpl = ctx => `
WITH
search_unfiltered AS (
${ctx.searchUnfiltered}
),
search_filtered AS (
${ctx.searchFiltered}
),
search_union AS (
SELECT * FROM search_unfiltered
UNION ALL
SELECT * FROM search_filtered
)
SELECT category, sum(value) AS value
FROM search_union
GROUP BY category
ORDER BY value desc
`;
// Rank cut-off handed to the dataview query as `limit`: categories ranked
// below it are returned one by one, the remaining ones are folded into the
// single 'Other' row.
const CATEGORIES_LIMIT = 6;
const VALID_OPERATIONS = {
count: [], count: [],
sum: ['aggregationColumn'], sum: ['aggregationColumn'],
avg: ['aggregationColumn'], avg: ['aggregationColumn'],
@ -97,7 +180,7 @@ var VALID_OPERATIONS = {
max: ['aggregationColumn'] max: ['aggregationColumn']
}; };
var TYPE = 'aggregation'; const TYPE = 'aggregation';
/** /**
{ {
@ -108,256 +191,150 @@ var TYPE = 'aggregation';
} }
} }
*/ */
function Aggregation(query, options, queries) { module.exports = class Aggregation extends BaseDataview {
if (!_.isString(options.column)) { constructor (query, options = {}, queries = {}) {
throw new Error('Aggregation expects `column` in widget options'); super();
this._checkOptions(options);
this.query = query;
this.queries = queries;
this.column = options.column;
this.aggregation = options.aggregation;
this.aggregationColumn = options.aggregationColumn;
this._isFloatColumn = null;
} }
if (!_.isString(options.aggregation)) { _checkOptions (options) {
throw new Error('Aggregation expects `aggregation` operation in widget options'); if (typeof options.column !== 'string') {
} throw new Error(`Aggregation expects 'column' in dataview options`);
if (!VALID_OPERATIONS[options.aggregation]) {
throw new Error("Aggregation does not support '" + options.aggregation + "' operation");
}
var requiredOptions = VALID_OPERATIONS[options.aggregation];
var missingOptions = _.difference(requiredOptions, Object.keys(options));
if (missingOptions.length > 0) {
throw new Error(
"Aggregation '" + options.aggregation + "' is missing some options: " + missingOptions.join(',')
);
}
BaseWidget.apply(this);
this.query = query;
this.queries = queries;
this.column = options.column;
this.aggregation = options.aggregation;
this.aggregationColumn = options.aggregationColumn;
this._isFloatColumn = null;
}
Aggregation.prototype = new BaseWidget();
Aggregation.prototype.constructor = Aggregation;
module.exports = Aggregation;
/**
 * Produces the SQL for this aggregation and hands it to `callback(null, sql)`.
 *
 * May be called as `sql(psql, callback)`; `override` then defaults to `{}`.
 * On the first call with an aggregation column, the column type is looked up
 * (against `queries.no_filters`) before re-entering this method; that call
 * returns `null` and the callback fires later. A failed lookup leaves the
 * column flagged as non-float.
 *
 * With `override.ownFilter` set, every category is listed; otherwise the
 * ranked template is used, with `count` re-aggregated through `sum`.
 */
Aggregation.prototype.sql = function(psql, override, callback) {
    if (!callback) {
        callback = override;
        override = {};
    }

    const columnTypeUnknown = this.aggregationColumn && this._isFloatColumn === null;
    if (columnTypeUnknown) {
        // Flag first so the re-entrant call below cannot loop.
        this._isFloatColumn = false;
        this.getColumnType(psql, this.aggregationColumn, this.queries.no_filters, (err, type) => {
            if (!err && !!type) {
                this._isFloatColumn = type.float;
            }
            this.sql(psql, override, callback);
        });
        return null;
    }

    const categoriesCTESql = this.getCategoriesCTESql(
        this.query,
        this.column,
        this.aggregation,
        this.aggregationColumn,
        this._isFloatColumn
    );

    const selectSql = override.ownFilter ?
        aggregationQueryTpl({
            _isFloatColumn: this._isFloatColumn,
            _query: this.query,
            _column: this.column,
            _aggregation: this.getAggregationSql(),
            _limit: CATEGORIES_LIMIT
        }) :
        rankedAggregationQueryTpl({
            _isFloatColumn: this._isFloatColumn,
            _query: this.query,
            _column: this.column,
            _aggregationFn: this.aggregation === 'count' ? 'sum' : this.aggregation,
            _limit: CATEGORIES_LIMIT
        });

    const aggregationSql = [categoriesCTESql, selectSql].join('\n');

    debug(aggregationSql);

    return callback(null, aggregationSql);
};
/**
 * Renders the `WITH ...` prefix shared by both aggregation queries: the
 * filtered source, the summary, the ranked categories, their min/max and the
 * category count, joined with commas.
 *
 * The aggregation column is passed to the templates as `null` for `count`.
 * Note that the filtered-source CTE reads `this.query` / `this.column`
 * rather than the `query` / `column` arguments.
 */
Aggregation.prototype.getCategoriesCTESql = function(query, column, aggregation, aggregationColumn, isFloatColumn) {
    const effectiveAggregationColumn = aggregation === 'count' ? null : aggregationColumn;

    const filteredSource = filteredQueryTpl({
        _isFloatColumn: isFloatColumn,
        _query: this.query,
        _column: this.column,
        _aggregationColumn: effectiveAggregationColumn
    });
    const summary = summaryQueryTpl({
        _isFloatColumn: isFloatColumn,
        _query: query,
        _column: column,
        _aggregationColumn: effectiveAggregationColumn
    });
    const rankedCategories = rankedCategoriesQueryTpl({
        _query: query,
        _column: column,
        _aggregation: this.getAggregationSql(),
        _aggregationColumn: effectiveAggregationColumn
    });
    const minMax = categoriesSummaryMinMaxQueryTpl({ _query: query, _column: column });
    const categoriesCount = categoriesSummaryCountQueryTpl({ _query: query, _column: column });

    const ctes = [filteredSource, summary, rankedCategories, minMax, categoriesCount];

    return ['WITH', ctes.join(',\n')].join('\n');
};
// Renders `<aggregation function>(<column>)`.
var aggregationFnQueryTpl = dot.template('{{=it._aggregationFn}}({{=it._aggregationColumn}})');

/**
 * Returns the aggregate expression for this dataview, e.g. `sum(col)`.
 * When no aggregation column is set the literal `1` is used as the argument.
 */
Aggregation.prototype.getAggregationSql = function() {
    var templateContext = {
        _aggregationFn: this.aggregation,
        _aggregationColumn: this.aggregationColumn ? this.aggregationColumn : 1
    };

    return aggregationFnQueryTpl(templateContext);
};
/**
 * Converts a query result into the dataview response.
 *
 * The summary figures are read from the first row only; with no rows they
 * are all 0. Each row becomes a category entry with the summary columns
 * stripped.
 */
Aggregation.prototype.format = function(result) {
    const rows = result.rows;
    const summaryColumns = [
        'count', 'nulls_count', 'min_val', 'max_val',
        'categories_count', 'nans_count', 'infinities_count'
    ];
    const emptySummary = {
        count: 0,
        nulls_count: 0,
        nans_count: 0,
        infinities_count: 0,
        min_val: 0,
        max_val: 0,
        categories_count: 0
    };
    const summary = rows.length ? rows[0] : emptySummary;

    return {
        aggregation: this.aggregation,
        count: summary.count,
        nulls: summary.nulls_count,
        nans: summary.nans_count,
        infinities: summary.infinities_count,
        min: summary.min_val,
        max: summary.max_val,
        categoriesCount: summary.categories_count,
        categories: rows.map(row => _.omit(row, ...summaryColumns))
    };
};
// Template for one side of the category search: groups `_query` by `_column`,
// keeps categories whose text form matches `_userQuery` via ILIKE and exposes
// `_value` as the value expression. Values are inserted with `{{= }}`, and
// `_userQuery` is placed as-is after ILIKE with no quotes added here.
var filterCategoriesQueryTpl = dot.template([
'SELECT {{=it._column}} AS category, {{=it._value}} AS value',
'FROM ({{=it._query}}) _cdb_aggregation_search',
'WHERE CAST({{=it._column}} as text) ILIKE {{=it._userQuery}}',
'GROUP BY {{=it._column}}'
].join('\n'));
// Template for the full search statement: `_searchUnfiltered` and
// `_searchFiltered` become CTEs, are concatenated with UNION ALL, and the
// values are summed per category, ordered by value descending.
var searchQueryTpl = dot.template([
'WITH',
'search_unfiltered AS (',
' {{=it._searchUnfiltered}}',
'),',
'search_filtered AS (',
' {{=it._searchFiltered}}',
'),',
'search_union AS (',
' SELECT * FROM search_unfiltered',
' UNION ALL',
' SELECT * FROM search_filtered',
')',
'SELECT category, sum(value) AS value',
'FROM search_union',
'GROUP BY category',
'ORDER BY value desc'
].join('\n'));
Aggregation.prototype.search = function(psql, userQuery, callback) {
var self = this;
var _userQuery = psql.escapeLiteral('%' + userQuery + '%');
var _value = this.aggregation !== 'count' && this.aggregationColumn ?
this.aggregation + '(' + this.aggregationColumn + ')' : 'count(1)';
// TODO unfiltered will be wrong as filters are already applied at this point
var query = searchQueryTpl({
_searchUnfiltered: filterCategoriesQueryTpl({
_query: this.query,
_column: this.column,
_value: '0',
_userQuery: _userQuery
}),
_searchFiltered: filterCategoriesQueryTpl({
_query: this.query,
_column: this.column,
_value: _value,
_userQuery: _userQuery
})
});
psql.query(query, function(err, result) {
if (err) {
return callback(err, result);
} }
return callback(null, {type: self.getType(), categories: result.rows }); if (typeof options.aggregation !== 'string') {
}, true); // use read-only transaction throw new Error(`Aggregation expects 'aggregation' operation in dataview options`);
}; }
Aggregation.prototype.getType = function() { if (!VALID_OPERATIONS[options.aggregation]) {
return TYPE; throw new Error(`Aggregation does not support '${options.aggregation}' operation`);
}; }
Aggregation.prototype.toString = function() { const requiredOptions = VALID_OPERATIONS[options.aggregation];
return JSON.stringify({ const missingOptions = requiredOptions.filter(requiredOption => !options.hasOwnProperty(requiredOption));
_type: TYPE,
_query: this.query, if (missingOptions.length > 0) {
_column: this.column, throw new Error(
_aggregation: this.aggregation `Aggregation '${options.aggregation}' is missing some options: ${missingOptions.join(',')}`
}); );
}
}
sql (psql, override, callback) {
if (!callback) {
callback = override;
override = {};
}
if (this._shouldCheckColumnType()) {
this._isFloatColumn = false;
this.getColumnType(psql, this.aggregationColumn, this.queries.no_filters, (err, type) => {
if (!err && !!type) {
this._isFloatColumn = type.float;
}
this.sql(psql, override, callback);
});
return null;
}
const aggregationSql = aggregationDataviewQueryTpl({
override: override,
query: this.query,
column: this.column,
aggregation: this.aggregation,
aggregationColumn: this.aggregation !== 'count' ? this.aggregationColumn : null,
aggregationFn: aggregationFnQueryTpl({
aggregation: this.aggregation,
aggregationColumn: this.aggregationColumn || 1
}),
isFloatColumn: this._isFloatColumn,
limit: CATEGORIES_LIMIT
});
debug(aggregationSql);
return callback(null, aggregationSql);
}
_shouldCheckColumnType () {
return this.aggregationColumn && this._isFloatColumn === null;
}
format (result) {
const {
count = 0,
nulls_count = 0,
nans_count = 0,
infinities_count = 0,
min_val = 0,
max_val = 0,
categories_count = 0
} = result.rows[0] || {};
return {
aggregation: this.aggregation,
count: count,
nulls: nulls_count,
nans: nans_count,
infinities: infinities_count,
min: min_val,
max: max_val,
categoriesCount: categories_count,
categories: result.rows.map(({ category, value, agg }) => ({ category, value, agg }))
};
}
search (psql, userQuery, callback) {
const escapedUserQuery = psql.escapeLiteral(`%${userQuery}%`);
const value = this.aggregation !== 'count' && this.aggregationColumn ?
`${this.aggregation}(${this.aggregationColumn})` :
'count(1)';
// TODO unfiltered will be wrong as filters are already applied at this point
const query = searchQueryTpl({
searchUnfiltered: filterCategoriesQueryTpl({
query: this.query,
column: this.column,
value: '0',
userQuery: escapedUserQuery
}),
searchFiltered: filterCategoriesQueryTpl({
query: this.query,
column: this.column,
value: value,
userQuery: escapedUserQuery
})
});
debug(query);
psql.query(query, (err, result) => {
if (err) {
return callback(err, result);
}
return callback(null, {type: this.getType(), categories: result.rows });
}, true); // use read-only transaction
}
/**
 * Returns the dataview type identifier (the module-level `TYPE` constant).
 */
getType () {
return TYPE;
}
toString () {
return JSON.stringify({
_type: TYPE,
_query: this.query,
_column: this.column,
_aggregation: this.aggregation
});
}
}; };