// Windshaft-cartodb/lib/models/dataview/histograms/date-histogram.js

'use strict';

const BaseHistogram = require('./base-histogram');
const debug = require('debug')('windshaft:dataview:date-histogram');
const utils = require('../../../utils/query-utils');
/**
 * Builds the opening of the WITH clause: the `__wd_tz` CTE, which picks the
 * name of one timezone from pg_timezone_names whose UTC offset equals the
 * requested offset (given in hours).
 *
 * A name is passed instead of the raw offset because PostgreSQL's POSIX-style
 * offsets use the opposite sign convention: TIME ZONE 'CET' equals
 * TIME ZONE 'UTC-1', not 'UTC+1' as one might expect.
 * Etc/GMT±N names are preferred. Zones with non-whole-hour offsets can still
 * be matched (e.g. 5.5 hours for Asia/Kolkata). Note, however, that
 * DateHistogram#_parseOffset currently rounds the offset up to whole hours.
 * Working with names would also make it easy to accept a zone such as
 * 'Europe/Madrid' instead of '+3600' / '+7200' in the future, letting
 * PostgreSQL handle daylight saving time.
 *
 * @param {Object} ctx
 * @param {string|number} ctx.offset - UTC offset in hours, interpolated verbatim
 * @returns {string} SQL fragment ending with a comma (more CTEs follow)
 */
const offsetNameQueryTpl = ({ offset }) => {
    const hoursInterval = `interval '${offset} hours'`;
    return `
WITH __wd_tz AS
(
SELECT name
FROM pg_timezone_names
WHERE utc_offset = ${hoursInterval}
ORDER BY CASE WHEN name LIKE 'Etc/GMT%' THEN 0 ELSE 1 END
LIMIT 1
),`;
};
/**
 * Builds the `__wd_buckets` CTE, which places every row of the source query
 * in its bin.
 *
 * The column is cast to timestamptz and converted to the `__wd_tz` timezone
 * before `date_trunc`, so each row lands in the bin of its local date rather
 * than its UTC date. The optional start/end bounds are compared against the
 * raw column value.
 *
 * @param {Object} ctx
 * @param {string} ctx.column - column to bin (interpolated verbatim; must be trusted)
 * @param {string} ctx.aggregation - date_trunc field, e.g. 'day' or 'month'
 * @param {string} ctx.query - source SQL, used as a subquery
 * @param {number} ctx.start - lower bound in epoch seconds, 0 for none
 * @param {number} ctx.end - upper bound in epoch seconds, 0 for none
 * @returns {string} SQL fragment ending with a comma (more CTEs follow)
 */
function dataBucketsQuery (ctx) {
    const conditions = [];
    if (ctx.start !== 0) {
        conditions.push(`${ctx.column} >= to_timestamp(${ctx.start})`);
    }
    if (ctx.end !== 0) {
        conditions.push(`${ctx.column} <= to_timestamp(${ctx.end})`);
    }
    const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' and ')}` : '';

    // utils.countNULLs(ctx) is assumed to yield the SQL expression counting NULL
    // values of ctx.column; see query-utils.
    return `
__wd_buckets AS
(
SELECT
date_trunc('${ctx.aggregation}', timezone(__wd_tz.name, ${ctx.column}::timestamptz)) as timestamp,
count(*) as freq,
${utils.countNULLs(ctx)} as nulls_count
FROM
(
${ctx.query}
) __source, __wd_tz
${whereClause}
GROUP BY 1, __wd_tz.name
),`;
}
/**
 * Builds the `__wd_all_buckets` CTE: a single-row ARRAY named `bins` holding
 * every bin start from the start date to the end date, one `ctx.interval`
 * apart.
 *
 * When `ctx.start` / `ctx.end` is 0 (not provided), the min / max bucket
 * timestamp produced by dataBucketsQuery is used instead. That requires
 * joining `__wd_buckets` and grouping by the timezone name, because min()
 * and max() are aggregates.
 *
 * @param {Object} ctx
 * @param {string} ctx.aggregation - date_trunc field, e.g. 'day' or 'month'
 * @param {number} ctx.start - start in epoch seconds, 0 to use the data minimum
 * @param {number} ctx.end - end in epoch seconds, 0 to use the data maximum
 * @param {string} ctx.interval - step between bins, e.g. '1 day' or '3 months'
 * @returns {string} SQL fragment without a trailing comma (last CTE)
 */
function allBucketsArrayQuery (ctx) {
    // Truncate an explicit epoch bound to its bin in the target timezone.
    const truncatedBound = epoch =>
        `date_trunc('${ctx.aggregation}', timezone(__wd_tz.name, to_timestamp(${epoch})))`;

    const seriesStart = ctx.start === 0 ? 'min(__wd_buckets.timestamp)' : truncatedBound(ctx.start);
    const seriesEnd = ctx.end === 0 ? 'max(__wd_buckets.timestamp)' : truncatedBound(ctx.end);
    const usesBucketAggregates = ctx.start === 0 || ctx.end === 0;
    const extraFrom = usesBucketAggregates ? ', __wd_buckets GROUP BY __wd_tz.name' : '';

    return `
__wd_all_buckets AS
(
SELECT ARRAY(
SELECT
generate_series(
${seriesStart},
${seriesEnd},
interval '${ctx.interval}') as bin_start
FROM __wd_tz${extraFrom}
) as bins
)`;
}
/**
 * Builds the query that auto aggregation uses to measure the data range.
 *
 * It takes the MIN and MAX of `column` over `query` and returns a single row
 * with one column per aggregation name (millennium ... second). Each column
 * holds how many units of that aggregation span the range. Units of a week or
 * coarser are approximated from the day count and rounded
 * (e.g. month = days / 30).
 *
 * @param {Object} ctx
 * @param {string} ctx.query - source SQL, used as a subquery
 * @param {string} ctx.column - column to measure (cast to timestamp)
 * @returns {string} SQL text
 */
const dateIntervalQueryTpl = ({ query, column }) => {
    const sql = `
WITH
__cdb_dates AS (
SELECT
MAX(${column}::timestamp) AS __cdb_end,
MIN(${column}::timestamp) AS __cdb_start
FROM (${query}) __cdb_source
),
__cdb_interval_in_days AS (
SELECT
DATE_PART('day', __cdb_end - __cdb_start) AS __cdb_days
FROM __cdb_dates
),
__cdb_interval_in_hours AS (
SELECT
__cdb_days * 24 + DATE_PART('hour', __cdb_end - __cdb_start) AS __cdb_hours
FROM __cdb_interval_in_days, __cdb_dates
),
__cdb_interval_in_minutes AS (
SELECT
__cdb_hours * 60 + DATE_PART('minute', __cdb_end - __cdb_start) AS __cdb_minutes
FROM __cdb_interval_in_hours, __cdb_dates
),
__cdb_interval_in_seconds AS (
SELECT
__cdb_minutes * 60 + DATE_PART('second', __cdb_end - __cdb_start) AS __cdb_seconds
FROM __cdb_interval_in_minutes, __cdb_dates
)
SELECT
ROUND(__cdb_days / 365243) AS millennium,
ROUND(__cdb_days / 36525) AS century,
ROUND(__cdb_days / 3652) AS decade,
ROUND(__cdb_days / 365) AS year,
ROUND(__cdb_days / 91) AS quarter,
ROUND(__cdb_days / 30) AS month,
ROUND(__cdb_days / 7) AS week,
__cdb_days AS day,
__cdb_hours AS hour,
__cdb_minutes AS minute,
__cdb_seconds AS second
FROM __cdb_interval_in_days, __cdb_interval_in_hours, __cdb_interval_in_minutes, __cdb_interval_in_seconds
`;
    return sql;
};
/**
 * Upper bound used by auto mode. The chosen aggregation is the one whose unit
 * count over the data range is closest to this value without exceeding it.
 */
const MAX_INTERVAL_VALUE = 100;

/**
 * Accepted values for the `aggregation` option. Only the keys matter (it is
 * used as a set). Key order is preserved because the keys are listed in the
 * "Invalid aggregation value" error message.
 */
const DATE_AGGREGATIONS = Object.freeze({
    auto: true,
    second: true,
    minute: true,
    hour: true,
    day: true,
    week: true,
    month: true,
    quarter: true,
    year: true,
    decade: true,
    century: true,
    millennium: true
});
/**
date_histogram: {
type: 'histogram',
options: {
column: 'date', // column data type: date
2017-09-12 01:26:28 +08:00
aggregation: 'day' // MANDATORY
offset: -7200 // OPTIONAL (UTC offset in seconds)
}
}
*/
2017-09-11 19:54:46 +08:00
module.exports = class DateHistogram extends BaseHistogram {
constructor (query, options, queries) {
super(query, options, queries);
2017-09-11 19:54:46 +08:00
this.aggregation = options.aggregation;
this.offset = options.offset;
}
_buildQueryTpl (ctx) {
return `
2017-12-05 00:03:31 +08:00
${offsetNameQueryTpl(ctx)}
${dataBucketsQuery(ctx)}
${allBucketsArrayQuery(ctx)}
SELECT
array_position(__wd_all_buckets.bins, __wd_buckets.timestamp) - 1 as bin,
date_part('epoch', timezone(__wd_tz.name, __wd_buckets.timestamp)) AS timestamp,
__wd_buckets.freq as freq,
date_part('epoch', timezone(__wd_tz.name, (__wd_all_buckets.bins)[1])) as timestamp_start,
array_length(__wd_all_buckets.bins, 1) as bins_number,
date_part('epoch', interval '${ctx.interval}') as bin_width,
__wd_buckets.nulls_count as nulls_count
FROM __wd_buckets, __wd_all_buckets, __wd_tz
GROUP BY __wd_tz.name, __wd_all_buckets.bins, __wd_buckets.timestamp, __wd_buckets.nulls_count, __wd_buckets.freq
ORDER BY bin ASC;
`;
}
_buildQuery (psql, override, callback) {
2017-09-12 00:57:50 +08:00
if (!this._isValidAggregation(override)) {
return callback(new Error('Invalid aggregation value. Valid ones: ' +
Object.keys(DATE_AGGREGATIONS).join(', ')
));
}
2017-09-12 00:44:14 +08:00
if (this._getAggregation(override) === 'auto') {
2017-09-08 18:01:15 +08:00
this._getAutomaticAggregation(psql, function (err, aggregation) {
if (err || aggregation === 'none') {
this.aggregation = 'day';
} else {
this.aggregation = aggregation;
}
override.aggregation = this.aggregation;
2017-09-08 17:55:28 +08:00
this._buildQuery(psql, override, callback);
}.bind(this));
return null;
}
2019-10-22 01:07:24 +08:00
var interval = this._getAggregation(override) === 'quarter'
? '3 months' : '1 ' + this._getAggregation(override);
2017-12-05 00:03:31 +08:00
const histogramSql = this._buildQueryTpl({
override: override,
query: this.query,
column: this.column,
aggregation: this._getAggregation(override),
start: this._getBinStart(override),
end: this._getBinEnd(override),
2017-12-05 00:03:31 +08:00
offset: this._parseOffset(override),
interval: interval
});
2017-09-12 00:44:14 +08:00
debug(histogramSql);
return callback(null, histogramSql);
}
_isValidAggregation (override) {
return Object.prototype.hasOwnProperty.call(DATE_AGGREGATIONS, this._getAggregation(override));
}
2017-09-08 18:01:15 +08:00
_getAutomaticAggregation (psql, callback) {
2017-09-08 23:43:10 +08:00
const dateIntervalQuery = dateIntervalQueryTpl({
query: this.query,
column: this.column
});
psql.query(dateIntervalQuery, function (err, result) {
if (err) {
return callback(err);
}
const aggregations = result.rows[0];
const aggregation = Object.keys(aggregations)
.map(key => ({ name: key, value: aggregations[key] }))
2017-09-12 00:57:50 +08:00
.reduce((closer, current) => {
if (current.value > MAX_INTERVAL_VALUE) {
return closer;
}
2017-09-11 17:48:33 +08:00
const closerDiff = MAX_INTERVAL_VALUE - closer.value;
const currentDiff = MAX_INTERVAL_VALUE - current.value;
if (Number.isFinite(current.value) && closerDiff > currentDiff) {
return current;
}
return closer;
}, { name: 'none', value: -1 });
callback(null, aggregation.name);
});
}
_getSummary (result, override) {
const firstRow = result.rows[0] || {};
return {
aggregation: this._getAggregation(override),
offset: this._getOffset(override),
timestamp_start: firstRow.timestamp_start,
bin_width: firstRow.bin_width || 0,
bins_count: firstRow.bins_number || 0,
bins_start: firstRow.timestamp,
nulls: firstRow.nulls_count,
infinities: firstRow.infinities_count,
nans: firstRow.nans_count,
avg: firstRow.avg_val
};
}
_getBuckets (result) {
2019-10-22 01:07:24 +08:00
result.rows.forEach(function (row) {
2017-12-05 00:03:31 +08:00
row.min = row.max = row.avg = row.timestamp;
});
return result.rows.map(({ bin, min, max, avg, freq, timestamp }) => ({ bin, min, max, avg, freq, timestamp }));
}
_getAggregation (override = {}) {
2017-09-12 00:44:14 +08:00
return override.aggregation ? override.aggregation : this.aggregation;
}
_getOffset (override = {}) {
2017-09-12 00:44:14 +08:00
return Number.isFinite(override.offset) ? override.offset : (this.offset || 0);
}
2017-09-12 00:44:14 +08:00
_parseOffset (override) {
if (this._shouldIgnoreOffset(override)) {
return '0';
}
2017-09-12 00:44:14 +08:00
const offsetInHours = Math.ceil(this._getOffset(override) / 3600);
return '' + offsetInHours;
}
2017-09-12 00:44:14 +08:00
_shouldIgnoreOffset (override) {
return (this._getAggregation(override) === 'hour' || this._getAggregation(override) === 'minute');
}
};