var _ = require('underscore') , Step = require('step') , cartoData = require('./carto_data') , Cache = require('./cache_validator') , mapnik = require('mapnik') , crypto = require('crypto') , request = require('request') ; module.exports = function(){ var rendererConfig = _.defaults(global.environment.renderer || {}, { cache_ttl: 60000, metatile: 4, bufferSize: 64 }); var me = { base_url: '/tiles/:table', base_url_notable: '/tiles', grainstore: { datasource: global.environment.postgres, cachedir: global.environment.millstone.cache_basedir, mapnik_version: global.environment.mapnik_version || mapnik.versions.mapnik }, mapnik: { metatile: rendererConfig.metatile, bufferSize: rendererConfig.bufferSize }, renderCache: { ttl: rendererConfig.cache_ttl }, redis: global.environment.redis, enable_cors: global.environment.enable_cors, varnish_host: global.environment.varnish.host, varnish_port: global.environment.varnish.port, cache_enabled: global.environment.cache_enabled, log_format: global.environment.log_format }; // Be nice and warn if configured mapnik version // is != instaled mapnik version if ( mapnik.versions.mapnik != me.grainstore.mapnik_version ) { console.warn("WARNING: detected mapnik version (" + mapnik.versions.mapnik + ") != configured mapnik version (" + me.grainstore.mapnik_version + ")"); } /* This whole block is about generating X-Cache-Channel { */ // TODO: review lifetime of elements of this cache // NOTE: by-token indices should only be dropped when // the corresponding layegroup is dropped, because // we have no SQL after layer creation. me.channelCache = {}; // Run a query through the SQL api me.sqlQuery = function (username, api_key, sql, callback) { var api = global.environment.sqlapi; // build up api string var sqlapi = api.protocol + '://' + username + '.' + api.host + ':' + api.port + '/api/' + api.version + '/sql' var qs = { q: sql } // add api_key if given if (_.isString(api_key) && api_key != '') { qs.api_key = api_key; } // call sql api request.get({url:sqlapi, qs:qs, json:true}, function(err, res, body){ if (err){ callback(err); return; } if (res.statusCode != 200) { var msg = res.body.error ? res.body.error : res.body; callback(new Error('unexpected response status (' + res.statusCode + ') for sql query: ' + sql)); return; } callback(null, body.rows); }); }; me.findLastUpdated = function (username, api_key, tableNames, callback) { var sql = 'SELECT EXTRACT(EPOCH FROM max(updated_at)) FROM CDB_TableMetadata WHERE m.tabname::name = any ({' + tableNames.join(',') + '})'; // call sql api me.sqlQuery(username, api_key, sql, function(err, rows){ if (err){ var msg = err.message ? err.message : err; callback(new Error('could not find last updated timestamp: ' + msg)); return; } var last_updated = rows[0].max; callback(null, last_updated); }); }; me.affectedTables = function (username, api_key, sql, callback) { var sql = 'SELECT CDB_QueryTables($windshaft$' + sql + '$windshaft$)'; // call sql api me.sqlQuery(username, api_key, sql, function(err, rows){ if (err){ var msg = err.message ? err.message : err; callback(new Error('could not fetch source tables: ' + msg)); return; } var qtables = rows[0].cdb_querytables; var tableNames = qtables.split(/^\{(.*)\}$/)[1]; tableNames = tableNames.split(','); callback(null, tableNames); }); }; me.buildCacheChannel = function (dbName, tableNames){ return dbName + ':' + tableNames.join(','); }; me.generateMD5 = function(data){ var hash = crypto.createHash('md5'); hash.update(data); return hash.digest('hex'); } me.generateCacheChannel = function(req, callback){ // use key to call sql api with sql request if present, else // just return dbname and table name base key var dbName = req.params.dbname; var cacheKey = [ dbName ]; if ( req.params.token ) cacheKey.push(req.params.token); else if ( req.params.sql ) cacheKey.push( me.generateMD5(req.params.sql) ); cacheKey = cacheKey.join(':'); if ( me.channelCache.hasOwnProperty(cacheKey) ) { callback(null, me.channelCache[cacheKey]); return; } if ( req.params.token ) { if ( ! me.channelCache.hasOwnProperty(cacheKey) ) { callback(new Error('missing channel cache for token ' + req.params.token)); } else { callback(null, me.channelCache[cacheKey]); } return; } if ( ! req.params.sql && ! req.params.token ) { var cacheChannel = me.buildCacheChannel(dbName, [req.params.table]); // not worth caching this callback(null, cacheChannel); return; } if ( ! req.params.sql ) { callback(new Error("this request doesn't need an X-Cache-Channel generated")); return; } var dbName = req.params.dbname; var username = req.headers.host.split('.')[0]; // strip out windshaft/mapnik inserted sql if present var sql = req.params.sql.match(/^\((.*)\)\sas\scdbq$/); sql = (sql != null) ? sql[1] : req.params.sql; me.affectedTables(username, req.params.map_key, sql, function(err, tableNames) { if ( err ) { callback(err); return; } var cacheChannel = me.buildCacheChannel(dbName,tableNames); me.channelCache[cacheKey] = cacheChannel; // store for caching callback(null, cacheChannel); }); }; // Set the cache chanel info to invalidate the cache on the frontend server // // @param req The request object. // The function will have no effect unless req.res exists. // It is expected that req.params contains 'table' and 'dbname' // // @param cb function(err, channel) will be called when ready. // the channel parameter will be null if nothing was added // me.addCacheChannel = function(req, cb) { // skip non-GET requests, or requests for which there's no response if ( req.method != 'GET' || ! req.res ) { cb(null, null); return; } var res = req.res; var cache_policy = req.query.cache_policy; if ( cache_policy == 'persist' ) { res.header('Cache-Control', 'public,max-age=31536000'); // 1 year } else { var ttl = global.environment.varnish.ttl || 86400; res.header('Last-Modified', new Date().toUTCString()); res.header('Cache-Control', 'no-cache,max-age='+ttl+',must-revalidate, public'); } me.generateCacheChannel(req, function(err, channel){ if ( ! err ) { res.header('X-Cache-Channel', channel); cb(null, channel); } else { console.log('ERROR generating cache channel: ' + ( err.message ? err.message : err )); // TODO: evaluate if we should bubble up the error instead cb(null, 'ERROR'); } }); }; me.afterLayergroupCreate = function(req, response, callback) { var token = response.layergroupid; var mapconfig = req.body; var sql = []; _.each(mapconfig.layers, function(lyr) { sql.push(lyr.options.sql); }); sql = sql.join(';'); var dbName = req.params.dbname; var usr = req.headers.host.split('.')[0]; var key = req.params.map_key; var cacheKey = dbName + ':' + token; me.affectedTables(usr, key, sql, function(err, tableNames) { if ( err ) { callback(err); return; } var cacheChannel = me.buildCacheChannel(dbName,tableNames); me.channelCache[cacheKey] = cacheChannel; // store for caching // find last updated me.findLastUpdated(usr, key, tableNames, function(err, lastUpdated) { if ( err ) { callback(err); return; } response.last_updated = lastUpdated; callback(null); }); }); }; /* X-Cache-Channel generation } */ /** * Whitelist input and get database name & default geometry type from * subdomain/user metadata held in CartoDB Redis * @param req - standard express request obj. Should have host & table * @param callback */ me.req2params = function(req, callback){ // Whitelist query parameters and attach format var good_query = ['sql', 'geom_type', 'cache_buster', 'cache_policy', 'callback', 'interactivity', 'map_key', 'api_key', 'style', 'style_version', 'style_convert' ]; var bad_query = _.difference(_.keys(req.query), good_query); _.each(bad_query, function(key){ delete req.query[key]; }); req.params = _.extend({}, req.params); // shuffle things as request is a strange array/object // bring all query values onto req.params object _.extend(req.params, req.query); // for cartodb, ensure interactivity is cartodb_id or user specified req.params.interactivity = req.params.interactivity || 'cartodb_id'; req.params.processXML = function(req, xml, callback) { var dbuser = req.dbuser ? req.dbuser : global.settings.postgres.user; if ( ! me.rx_dbuser ) me.rx_dbuser = /(<\/Parameter>)/; xml = xml.replace(me.rx_dbuser, "$1" + dbuser + "$2"); callback(null, xml); } var that = this; Step( function getPrivacy(){ cartoData.authorize(req, this); }, function gatekeep(err, data){ if(err) throw err; if(data === "0") throw new Error("Sorry, you are unauthorized (permission denied)"); return data; }, function getDatabase(err, data){ if(err) throw err; cartoData.getDatabase(req, this); }, function getGeometryType(err, data){ if (err) throw err; _.extend(req.params, {dbname:data}); cartoData.getGeometryType(req, this); }, function finishSetup(err, data){ if ( err ) { callback(err, req); return; } if (!_.isNull(data)) _.extend(req.params, {geom_type: data}); that.addCacheChannel(req, function(err) { callback(err, req); }); } ); }; /** * Little helper method to get the current list of infowindow variables and return to client * @param req * @param callback */ me.getInfowindow = function(req, callback){ var that = this; Step( function(){ that.req2params(req, this); }, function(err, data){ if (err) callback(err, null); else cartoData.getInfowindow(data, callback); } ); }; /** * Little helper method to get map metadata and return to client * @param req * @param callback */ me.getMapMetadata = function(req, callback){ var that = this; Step( function(){ that.req2params(req, this); }, function(err, data){ if (err) callback(err, null); else cartoData.getMapMetadata(data, callback); } ); }; /** * Helper to clear out tile cache on request * @param req * @param callback */ me.flushCache = function(req, Cache, callback){ var that = this; Step( function getParams(){ // this is mostly to compute req.params.dbname that.req2params(req, this); }, function flushInternalCache(err){ // TODO: implement this, see // http://github.com/Vizzuality/Windshaft-cartodb/issues/73 return true; }, function flushVarnishCache(err){ if (err) { callback(err); return; } if(Cache) { Cache.invalidate_db(req.params.dbname, req.params.table); } callback(null, true); } ); }; return me; }();