// Windshaft-cartodb/lib/models/dataview/histograms/date-histogram.js

'use strict';
const BaseHistogram = require('./base-histogram');
const debug = require('debug')('windshaft:dataview:date-histogram');
const utils = require('../../../utils/query-utils');
/**
 * Builds the `__wd_tz` CTE, which selects the name of a single timezone from
 * pg_timezone_names whose `utc_offset` equals `offset` hours.
 *
 * A zone name is passed around instead of the raw offset because PostgreSQL's
 * POSIX-style offsets use the opposite sign convention from what one expects:
 * TIME ZONE 'CET' is equal to TIME ZONE 'UTC-1', not 'UTC+1'.
 *
 * Etc/GMT±N zones are preferred when several names share the offset. The
 * lookup itself also matches fractional offsets (e.g. '5.5 hours') if the
 * caller passes one.
 *
 * Working with names also makes it easier to later accept a zone name
 * (e.g. 'Europe/Madrid') instead of '+3600' / '+7200', leaving daylight-saving
 * handling to PostgreSQL.
 *
 * @param {object} ctx
 * @param {string|number} ctx.offset - UTC offset in hours, interpolated verbatim into the SQL
 * @returns {string} SQL fragment `WITH __wd_tz AS (...),` (note the trailing comma)
 */
const offsetNameQueryTpl = ({ offset }) => `
WITH __wd_tz AS
(
SELECT name
FROM pg_timezone_names
WHERE utc_offset = interval '${offset} hours'
ORDER BY CASE WHEN name LIKE 'Etc/GMT%' THEN 0 ELSE 1 END
LIMIT 1
),`;
/**
 * Builds the `__wd_buckets` CTE, which groups the source rows into bins by
 * truncating `ctx.column` to `ctx.aggregation`.
 *
 * The value is converted into the `__wd_tz` timezone before `date_trunc`, so
 * bins are cut at local boundaries rather than UTC ones.
 *
 * @param {object} ctx
 * @param {string} ctx.column - column holding the dates
 * @param {string} ctx.aggregation - date_trunc field ('day', 'month', ...)
 * @param {string} ctx.query - source SQL, wrapped as a subquery
 * @param {number} ctx.start - epoch seconds for the inclusive lower bound; 0 means no bound
 * @param {number} ctx.end - epoch seconds for the inclusive upper bound; 0 means no bound
 * @returns {string} SQL fragment `__wd_buckets AS (...),` (note the trailing comma)
 */
function dataBucketsQuery (ctx) {
    const filters = [];
    if (ctx.start !== 0) {
        filters.push(`${ctx.column} >= to_timestamp(${ctx.start})`);
    }
    if (ctx.end !== 0) {
        filters.push(`${ctx.column} <= to_timestamp(${ctx.end})`);
    }
    const whereClause = filters.length > 0 ? `WHERE ${filters.join(' and ')}` : '';

    return `
__wd_buckets AS
(
SELECT
date_trunc('${ctx.aggregation}', timezone(__wd_tz.name, ${ctx.column}::timestamptz)) as timestamp,
count(*) as freq,
${utils.countNULLs(ctx)} as nulls_count
FROM
(
${ctx.query}
) __source, __wd_tz
${whereClause}
GROUP BY 1, __wd_tz.name
),`;
}
/**
 * Builds the `__wd_all_buckets` CTE. It produces one row whose `bins` column is
 * an array of every bin start from the series start to the series end,
 * stepping by `ctx.interval`.
 *
 * Series bounds are chosen as follows:
 * - An explicit `ctx.start` / `ctx.end` (epoch seconds) is truncated to
 *   `ctx.aggregation` in the `__wd_tz` timezone.
 * - A bound of 0 means "not provided". The min/max bucket from `__wd_buckets`
 *   is used instead, which requires joining it and grouping by the timezone.
 *
 * @param {object} ctx
 * @param {string} ctx.aggregation - date_trunc field
 * @param {string} ctx.interval - PostgreSQL interval literal, e.g. '1 day' or '3 months'
 * @param {number} ctx.start - epoch seconds, or 0
 * @param {number} ctx.end - epoch seconds, or 0
 * @returns {string} SQL fragment `__wd_all_buckets AS (...)` (no trailing comma)
 */
function allBucketsArrayQuery (ctx) {
    const truncateEpoch = (epoch) =>
        `date_trunc('${ctx.aggregation}', timezone(__wd_tz.name, to_timestamp(${epoch})))`;

    const startFromData = ctx.start === 0;
    const endFromData = ctx.end === 0;

    const seriesStart = startFromData ? 'min(__wd_buckets.timestamp)' : truncateEpoch(ctx.start);
    const seriesEnd = endFromData ? 'max(__wd_buckets.timestamp)' : truncateEpoch(ctx.end);
    // min()/max() over the data buckets need __wd_buckets in FROM plus a GROUP BY
    const extraFrom = (startFromData || endFromData) ? ', __wd_buckets GROUP BY __wd_tz.name' : '';

    return `
__wd_all_buckets AS
(
SELECT ARRAY(
SELECT
generate_series(
${seriesStart},
${seriesEnd},
interval '${ctx.interval}') as bin_start
FROM __wd_tz${extraFrom}
) as bins
)`;
}
/**
 * Builds the query used by 'auto' aggregation. It measures the span between
 * the min and max of `column` and returns a single row with that span
 * expressed in every supported unit (millennium ... second). Units of a week
 * and coarser are approximated by dividing the day count by a fixed number of
 * days and rounding.
 *
 * @param {object} ctx
 * @param {string} ctx.column - date column to measure
 * @param {string} ctx.query - source SQL, wrapped as a subquery
 * @returns {string} complete SQL statement
 */
const dateIntervalQueryTpl = ({ column, query }) => `
WITH
__cdb_dates AS (
SELECT
MAX(${column}::timestamp) AS __cdb_end,
MIN(${column}::timestamp) AS __cdb_start
FROM (${query}) __cdb_source
),
__cdb_interval_in_days AS (
SELECT
DATE_PART('day', __cdb_end - __cdb_start) AS __cdb_days
FROM __cdb_dates
),
__cdb_interval_in_hours AS (
SELECT
__cdb_days * 24 + DATE_PART('hour', __cdb_end - __cdb_start) AS __cdb_hours
FROM __cdb_interval_in_days, __cdb_dates
),
__cdb_interval_in_minutes AS (
SELECT
__cdb_hours * 60 + DATE_PART('minute', __cdb_end - __cdb_start) AS __cdb_minutes
FROM __cdb_interval_in_hours, __cdb_dates
),
__cdb_interval_in_seconds AS (
SELECT
__cdb_minutes * 60 + DATE_PART('second', __cdb_end - __cdb_start) AS __cdb_seconds
FROM __cdb_interval_in_minutes, __cdb_dates
)
SELECT
ROUND(__cdb_days / 365243) AS millennium,
ROUND(__cdb_days / 36525) AS century,
ROUND(__cdb_days / 3652) AS decade,
ROUND(__cdb_days / 365) AS year,
ROUND(__cdb_days / 91) AS quarter,
ROUND(__cdb_days / 30) AS month,
ROUND(__cdb_days / 7) AS week,
__cdb_days AS day,
__cdb_hours AS hour,
__cdb_minutes AS minute,
__cdb_seconds AS second
FROM __cdb_interval_in_days, __cdb_interval_in_hours, __cdb_interval_in_minutes, __cdb_interval_in_seconds
`;
/**
 * Target bin count for 'auto' aggregation. The unit whose estimated count over
 * the data's date range is closest to this value, without exceeding it, wins.
 */
const MAX_INTERVAL_VALUE = 100;

/**
 * Lookup of accepted `aggregation` values ('auto' is resolved from the data).
 * Key order matters: it is the order listed in the invalid-aggregation error.
 */
const DATE_AGGREGATIONS = [
    'auto',
    'second',
    'minute',
    'hour',
    'day',
    'week',
    'month',
    'quarter',
    'year',
    'decade',
    'century',
    'millennium'
].reduce((lookup, name) => {
    lookup[name] = true;
    return lookup;
}, {});
/**
date_histogram: {
type: 'histogram',
options: {
column: 'date', // column data type: date
aggregation: 'day' // MANDATORY
offset: -7200 // OPTIONAL (UTC offset in seconds)
}
}
*/
module.exports = class DateHistogram extends BaseHistogram {
constructor (query, options, queries) {
super(query, options, queries);
this.aggregation = options.aggregation;
this.offset = options.offset;
}
_buildQueryTpl (ctx) {
return `
${offsetNameQueryTpl(ctx)}
${dataBucketsQuery(ctx)}
${allBucketsArrayQuery(ctx)}
SELECT
array_position(__wd_all_buckets.bins, __wd_buckets.timestamp) - 1 as bin,
date_part('epoch', timezone(__wd_tz.name, __wd_buckets.timestamp)) AS timestamp,
__wd_buckets.freq as freq,
date_part('epoch', timezone(__wd_tz.name, (__wd_all_buckets.bins)[1])) as timestamp_start,
array_length(__wd_all_buckets.bins, 1) as bins_number,
date_part('epoch', interval '${ctx.interval}') as bin_width,
__wd_buckets.nulls_count as nulls_count
FROM __wd_buckets, __wd_all_buckets, __wd_tz
GROUP BY __wd_tz.name, __wd_all_buckets.bins, __wd_buckets.timestamp, __wd_buckets.nulls_count, __wd_buckets.freq
ORDER BY bin ASC;
`;
}
_buildQuery (psql, override, callback) {
if (!this._isValidAggregation(override)) {
return callback(new Error('Invalid aggregation value. Valid ones: ' +
Object.keys(DATE_AGGREGATIONS).join(', ')
));
}
if (this._getAggregation(override) === 'auto') {
this._getAutomaticAggregation(psql, function (err, aggregation) {
if (err || aggregation === 'none') {
this.aggregation = 'day';
} else {
this.aggregation = aggregation;
}
override.aggregation = this.aggregation;
this._buildQuery(psql, override, callback);
}.bind(this));
return null;
}
var interval = this._getAggregation(override) === 'quarter'
? '3 months' : '1 ' + this._getAggregation(override);
const histogramSql = this._buildQueryTpl({
override: override,
query: this.query,
column: this.column,
aggregation: this._getAggregation(override),
start: this._getBinStart(override),
end: this._getBinEnd(override),
offset: this._parseOffset(override),
interval: interval
});
debug(histogramSql);
return callback(null, histogramSql);
}
_isValidAggregation (override) {
return Object.prototype.hasOwnProperty.call(DATE_AGGREGATIONS, this._getAggregation(override));
}
_getAutomaticAggregation (psql, callback) {
const dateIntervalQuery = dateIntervalQueryTpl({
query: this.query,
column: this.column
});
psql.query(dateIntervalQuery, function (err, result) {
if (err) {
return callback(err);
}
const aggregations = result.rows[0];
const aggregation = Object.keys(aggregations)
.map(key => ({ name: key, value: aggregations[key] }))
.reduce((closer, current) => {
if (current.value > MAX_INTERVAL_VALUE) {
return closer;
}
const closerDiff = MAX_INTERVAL_VALUE - closer.value;
const currentDiff = MAX_INTERVAL_VALUE - current.value;
if (Number.isFinite(current.value) && closerDiff > currentDiff) {
return current;
}
return closer;
}, { name: 'none', value: -1 });
callback(null, aggregation.name);
});
}
_getSummary (result, override) {
const firstRow = result.rows[0] || {};
return {
aggregation: this._getAggregation(override),
offset: this._getOffset(override),
timestamp_start: firstRow.timestamp_start,
bin_width: firstRow.bin_width || 0,
bins_count: firstRow.bins_number || 0,
bins_start: firstRow.timestamp,
nulls: firstRow.nulls_count,
infinities: firstRow.infinities_count,
nans: firstRow.nans_count,
avg: firstRow.avg_val
};
}
_getBuckets (result) {
result.rows.forEach(function (row) {
row.min = row.max = row.avg = row.timestamp;
});
return result.rows.map(({ bin, min, max, avg, freq, timestamp }) => ({ bin, min, max, avg, freq, timestamp }));
}
_getAggregation (override = {}) {
return override.aggregation ? override.aggregation : this.aggregation;
}
_getOffset (override = {}) {
return Number.isFinite(override.offset) ? override.offset : (this.offset || 0);
}
_parseOffset (override) {
if (this._shouldIgnoreOffset(override)) {
return '0';
}
const offsetInHours = Math.ceil(this._getOffset(override) / 3600);
return '' + offsetInHours;
}
_shouldIgnoreOffset (override) {
return (this._getAggregation(override) === 'hour' || this._getAggregation(override) === 'minute');
}
};