Shared cache for affected tables in layergroup and map controllers
This commit is contained in:
parent
c295584864
commit
e8b5845174
@ -48,7 +48,7 @@ QueryTablesApi.prototype.getAffectedTablesAndLastUpdatedTime = function (usernam
|
||||
function handleAffectedTablesAndLastUpdatedTimeRows(err, rows, callback) {
|
||||
if (err || rows.length === 0) {
|
||||
var msg = err.message ? err.message : err;
|
||||
callback(new Error('could not fetch affected tables and last updated time: ' + msg));
|
||||
callback(new Error('could not fetch affected tables or last updated time: ' + msg));
|
||||
return;
|
||||
}
|
||||
|
||||
@ -65,6 +65,35 @@ function handleAffectedTablesAndLastUpdatedTimeRows(err, rows, callback) {
|
||||
});
|
||||
}
|
||||
|
||||
QueryTablesApi.prototype.getLastUpdatedTime = function (username, tableNames, callback) {
|
||||
if (!Array.isArray(tableNames) || tableNames.length === 0) {
|
||||
return callback(null, 0);
|
||||
}
|
||||
|
||||
var query = [
|
||||
'SELECT EXTRACT(EPOCH FROM max(updated_at)) as max',
|
||||
'FROM CDB_TableMetadata m WHERE m.tabname = any (ARRAY[',
|
||||
tableNames.map(function(t) { return "'" + t + "'::regclass"; }).join(','),
|
||||
'])'
|
||||
].join(' ');
|
||||
|
||||
this.pgQueryRunner.run(username, query, handleLastUpdatedTimeRows, callback);
|
||||
};
|
||||
|
||||
function handleLastUpdatedTimeRows(err, rows, callback) {
|
||||
if (err) {
|
||||
var msg = err.message ? err.message : err;
|
||||
return callback(new Error('could not fetch affected tables or last updated time: ' + msg));
|
||||
}
|
||||
// when the table has not updated_at means it hasn't been changed so a default last_updated is set
|
||||
var lastUpdated = 0;
|
||||
if (rows.length !== 0) {
|
||||
lastUpdated = rows[0].max || 0;
|
||||
}
|
||||
|
||||
return callback(null, lastUpdated*1000);
|
||||
}
|
||||
|
||||
function prepareSql(sql) {
|
||||
return sql
|
||||
.replace(affectedTableRegexCache.bbox, 'ST_MakeEnvelope(0,0,0,0)')
|
||||
|
22
lib/cartodb/cache/layergroup_affected_tables.js
vendored
Normal file
22
lib/cartodb/cache/layergroup_affected_tables.js
vendored
Normal file
@ -0,0 +1,22 @@
|
||||
function LayergroupAffectedTables() {
|
||||
// layergroupId -> affected tables cache
|
||||
this.cache = {};
|
||||
}
|
||||
|
||||
module.exports = LayergroupAffectedTables;
|
||||
|
||||
LayergroupAffectedTables.prototype.hasAffectedTables = function(dbName, layergroupId) {
|
||||
return this.cache.hasOwnProperty(createKey(dbName, layergroupId));
|
||||
};
|
||||
|
||||
LayergroupAffectedTables.prototype.set = function(dbName, layergroupId, affectedTables) {
|
||||
this.cache[createKey(dbName, layergroupId)] = affectedTables;
|
||||
};
|
||||
|
||||
LayergroupAffectedTables.prototype.get = function(dbName, layergroupId) {
|
||||
return this.cache[createKey(dbName, layergroupId)];
|
||||
};
|
||||
|
||||
function createKey(dbName, layergroupId) {
|
||||
return dbName + ':' + layergroupId;
|
||||
}
|
@ -15,6 +15,10 @@ DatabaseTables.prototype.key = function() {
|
||||
}.bind(this));
|
||||
};
|
||||
|
||||
DatabaseTables.prototype.getCacheChannel = function() {
|
||||
return this.dbName + ':' + this.tableNames.join(',');
|
||||
};
|
||||
|
||||
function shortHashKey(target) {
|
||||
return crypto.createHash('sha256').update(target).digest('base64').substring(0,6);
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ var step = require('step');
|
||||
var cors = require('../middleware/cors');
|
||||
|
||||
var MapStoreMapConfigProvider = require('../models/mapconfig/map_store_provider');
|
||||
var TablesCacheEntry = require('../cache/model/database_tables_entry');
|
||||
|
||||
/**
|
||||
* @param app
|
||||
@ -11,19 +12,23 @@ var MapStoreMapConfigProvider = require('../models/mapconfig/map_store_provider'
|
||||
* @param {TileBackend} tileBackend
|
||||
* @param {PreviewBackend} previewBackend
|
||||
* @param {AttributesBackend} attributesBackend
|
||||
* @param {SurrogateKeysCache} surrogateKeysCache
|
||||
* @param {UserLimitsApi} userLimitsApi
|
||||
* @param {QueryTablesApi} queryTablesApi
|
||||
* @param {LayergroupAffectedTables} layergroupAffectedTables
|
||||
* @constructor
|
||||
*/
|
||||
function LayergroupController(app, mapStore, tileBackend, previewBackend, attributesBackend, userLimitsApi,
|
||||
queryTablesApi) {
|
||||
function LayergroupController(app, mapStore, tileBackend, previewBackend, attributesBackend, surrogateKeysCache,
|
||||
userLimitsApi, queryTablesApi, layergroupAffectedTables) {
|
||||
this.app = app;
|
||||
this.mapStore = mapStore;
|
||||
this.tileBackend = tileBackend;
|
||||
this.previewBackend = previewBackend;
|
||||
this.attributesBackend = attributesBackend;
|
||||
this.surrogateKeysCache = surrogateKeysCache;
|
||||
this.userLimitsApi = userLimitsApi;
|
||||
this.queryTablesApi = queryTablesApi;
|
||||
this.layergroupAffectedTables = layergroupAffectedTables;
|
||||
|
||||
this.channelCache = {};
|
||||
}
|
||||
@ -232,16 +237,19 @@ LayergroupController.prototype.sendResponse = function(req, res, args) {
|
||||
}
|
||||
res.header('Last-Modified', lastUpdated.toUTCString());
|
||||
|
||||
var dbName = req.params.dbname;
|
||||
step(
|
||||
function getCacheChannel() {
|
||||
self.cacheChannel(req, this);
|
||||
function getAffectedTables() {
|
||||
self.getAffectedTables(req.context.user, dbName, req.params.token, this);
|
||||
},
|
||||
function sendResponse(err, cacheChannel) {
|
||||
function sendResponse(err, affectedTables) {
|
||||
if (err) {
|
||||
console.log('ERROR generating cache channel: ' + err);
|
||||
}
|
||||
if (!!cacheChannel) {
|
||||
res.header('X-Cache-Channel', cacheChannel);
|
||||
if (!!affectedTables) {
|
||||
var tablesCacheEntry = new TablesCacheEntry(dbName, affectedTables);
|
||||
res.header('X-Cache-Channel', tablesCacheEntry.getCacheChannel());
|
||||
self.surrogateKeysCache.tag(res, tablesCacheEntry);
|
||||
}
|
||||
self.app.sendResponse(res, args);
|
||||
}
|
||||
@ -249,30 +257,13 @@ LayergroupController.prototype.sendResponse = function(req, res, args) {
|
||||
|
||||
};
|
||||
|
||||
LayergroupController.prototype.buildCacheChannel = function (dbName, tableNames){
|
||||
return dbName + ':' + tableNames.join(',');
|
||||
};
|
||||
|
||||
LayergroupController.prototype.cacheChannel = function(req, callback) {
|
||||
if (req.profiler) {
|
||||
req.profiler.start('addCacheChannel');
|
||||
}
|
||||
|
||||
var dbName = req.params.dbname;
|
||||
|
||||
// no token means no tables associated
|
||||
if (!req.params.token) {
|
||||
return callback(null, this.buildCacheChannel(dbName, []));
|
||||
}
|
||||
|
||||
LayergroupController.prototype.getAffectedTables = function(user, dbName, layergroupId, callback) {
|
||||
var self = this;
|
||||
|
||||
var cacheKey = [ dbName, req.params.token ].join(':');
|
||||
|
||||
step(
|
||||
function checkCached() {
|
||||
if ( self.channelCache.hasOwnProperty(cacheKey) ) {
|
||||
return callback(null, self.channelCache[cacheKey]);
|
||||
if (self.layergroupAffectedTables.hasAffectedTables(dbName, layergroupId)) {
|
||||
return callback(null, self.layergroupAffectedTables.get(dbName, layergroupId));
|
||||
}
|
||||
return null;
|
||||
},
|
||||
@ -281,12 +272,9 @@ LayergroupController.prototype.cacheChannel = function(req, callback) {
|
||||
|
||||
step(
|
||||
function loadFromStore() {
|
||||
self.mapStore.load(req.params.token, this);
|
||||
self.mapStore.load(layergroupId, this);
|
||||
},
|
||||
function getSQL(err, mapConfig) {
|
||||
if (req.profiler) {
|
||||
req.profiler.done('mapStore_load');
|
||||
}
|
||||
assert.ifError(err);
|
||||
|
||||
var queries = mapConfig.getLayers()
|
||||
@ -309,22 +297,17 @@ LayergroupController.prototype.cacheChannel = function(req, callback) {
|
||||
throw new Error("this request doesn't need an X-Cache-Channel generated");
|
||||
}
|
||||
|
||||
self.queryTablesApi.getAffectedTablesInQuery(req.context.user, sql, this); // in addCacheChannel
|
||||
self.queryTablesApi.getAffectedTablesInQuery(user, sql, this); // in addCacheChannel
|
||||
},
|
||||
function buildCacheChannel(err, tableNames) {
|
||||
assert.ifError(err);
|
||||
|
||||
if (req.profiler) {
|
||||
req.profiler.done('affectedTables');
|
||||
}
|
||||
self.layergroupAffectedTables.set(dbName, layergroupId, tableNames);
|
||||
|
||||
var cacheChannel = self.buildCacheChannel(dbName, tableNames);
|
||||
self.channelCache[cacheKey] = cacheChannel;
|
||||
|
||||
return cacheChannel;
|
||||
return tableNames;
|
||||
},
|
||||
function finish(err, cacheChannel) {
|
||||
callback(err, cacheChannel);
|
||||
function finish(err, affectedTables) {
|
||||
callback(err, affectedTables);
|
||||
}
|
||||
);
|
||||
};
|
||||
|
@ -23,11 +23,12 @@ var CreateLayergroupMapConfigProvider = require('../models/mapconfig/create_laye
|
||||
* @param metadataBackend
|
||||
* @param {QueryTablesApi} queryTablesApi
|
||||
* @param {SurrogateKeysCache} surrogateKeysCache
|
||||
* @param {{UserLimitsApi}} userLimitsApi
|
||||
* @param {UserLimitsApi} userLimitsApi
|
||||
* @param {LayergroupAffectedTables} layergroupAffectedTables
|
||||
* @constructor
|
||||
*/
|
||||
function MapController(app, pgConnection, templateMaps, mapBackend, metadataBackend, queryTablesApi,
|
||||
surrogateKeysCache, userLimitsApi) {
|
||||
surrogateKeysCache, userLimitsApi, layergroupAffectedTables) {
|
||||
this.app = app;
|
||||
this.pgConnection = pgConnection;
|
||||
this.templateMaps = templateMaps;
|
||||
@ -36,9 +37,9 @@ function MapController(app, pgConnection, templateMaps, mapBackend, metadataBack
|
||||
this.queryTablesApi = queryTablesApi;
|
||||
this.surrogateKeysCache = surrogateKeysCache;
|
||||
this.userLimitsApi = userLimitsApi;
|
||||
this.namedLayersAdapter = new MapConfigNamedLayersAdapter(templateMaps);
|
||||
this.layergroupAffectedTables = layergroupAffectedTables;
|
||||
|
||||
this.channelCache = {};
|
||||
this.namedLayersAdapter = new MapConfigNamedLayersAdapter(templateMaps);
|
||||
}
|
||||
|
||||
module.exports = MapController;
|
||||
@ -265,31 +266,46 @@ MapController.prototype.afterLayergroupCreate = function(req, res, mapconfig, la
|
||||
}).join(';');
|
||||
|
||||
var dbName = req.params.dbname;
|
||||
var cacheKey = dbName + ':' + layergroup.layergroupid;
|
||||
var layergroupId = layergroup.layergroupid;
|
||||
|
||||
step(
|
||||
function getAffectedTablesAndLastUpdatedTime() {
|
||||
self.queryTablesApi.getAffectedTablesAndLastUpdatedTime(username, sql, this);
|
||||
function checkCachedAffectedTables() {
|
||||
return self.layergroupAffectedTables.hasAffectedTables(dbName, layergroupId);
|
||||
},
|
||||
function getAffectedTablesAndLastUpdatedTime(err, hasCache) {
|
||||
assert.ifError(err);
|
||||
if (hasCache) {
|
||||
var next = this;
|
||||
var affectedTables = self.layergroupAffectedTables.get(dbName, layergroupId);
|
||||
self.queryTablesApi.getLastUpdatedTime(username, affectedTables, function(err, lastUpdatedTime) {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
return next(null, { affectedTables: affectedTables, lastUpdatedTime: lastUpdatedTime });
|
||||
});
|
||||
} else {
|
||||
self.queryTablesApi.getAffectedTablesAndLastUpdatedTime(username, sql, this);
|
||||
}
|
||||
},
|
||||
function handleAffectedTablesAndLastUpdatedTime(err, result) {
|
||||
if (req.profiler) {
|
||||
req.profiler.done('queryTablesAndLastUpdated');
|
||||
}
|
||||
assert.ifError(err);
|
||||
var cacheChannel = self.buildCacheChannel(dbName, result.affectedTables);
|
||||
self.channelCache[cacheKey] = cacheChannel;
|
||||
self.layergroupAffectedTables.set(dbName, layergroupId, result.affectedTables);
|
||||
|
||||
// last update for layergroup cache buster
|
||||
layergroup.layergroupid = layergroup.layergroupid + ':' + result.lastUpdatedTime;
|
||||
layergroup.last_updated = new Date(result.lastUpdatedTime).toISOString();
|
||||
|
||||
if (req.method === 'GET') {
|
||||
var tableCacheEntry = new TablesCacheEntry(dbName, result.affectedTables);
|
||||
var ttl = global.environment.varnish.layergroupTtl || 86400;
|
||||
res.header('Cache-Control', 'public,max-age='+ttl+',must-revalidate');
|
||||
res.header('Last-Modified', (new Date()).toUTCString());
|
||||
res.header('X-Cache-Channel', cacheChannel);
|
||||
res.header('X-Cache-Channel', tableCacheEntry.getCacheChannel());
|
||||
if (result.affectedTables && result.affectedTables.length > 0) {
|
||||
self.surrogateKeysCache.tag(res, new TablesCacheEntry(dbName, result.affectedTables));
|
||||
self.surrogateKeysCache.tag(res, tableCacheEntry);
|
||||
}
|
||||
}
|
||||
|
||||
@ -300,7 +316,3 @@ MapController.prototype.afterLayergroupCreate = function(req, res, mapconfig, la
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
MapController.prototype.buildCacheChannel = function (dbName, tableNames){
|
||||
return dbName + ':' + tableNames.join(',');
|
||||
};
|
||||
|
@ -20,6 +20,7 @@ var TemplateMaps = require('./backends/template_maps.js');
|
||||
var QueryTablesApi = require('./api/query_tables_api');
|
||||
var UserLimitsApi = require('./api/user_limits_api');
|
||||
var AuthApi = require('./api/auth_api');
|
||||
var LayergroupAffectedTablesCache = require('./cache/layergroup_affected_tables');
|
||||
var PgQueryRunner = require('./backends/pg_query_runner');
|
||||
var PgConnection = require('./backends/pg_connection');
|
||||
|
||||
@ -161,6 +162,9 @@ module.exports = function(serverOptions) {
|
||||
var mapValidatorBackend = new windshaft.backend.MapValidator(tileBackend, attributesBackend);
|
||||
var mapBackend = new windshaft.backend.Map(rendererCache, mapStore, mapValidatorBackend);
|
||||
|
||||
var layergroupAffectedTablesCache = new LayergroupAffectedTablesCache();
|
||||
app.layergroupAffectedTablesCache = layergroupAffectedTablesCache;
|
||||
|
||||
var authApi = new AuthApi(pgConnection, metadataBackend, mapStore, templateMaps);
|
||||
|
||||
app.findStatusCode = function(err) {
|
||||
@ -191,8 +195,10 @@ module.exports = function(serverOptions) {
|
||||
tileBackend,
|
||||
previewBackend,
|
||||
attributesBackend,
|
||||
surrogateKeysCache,
|
||||
userLimitsApi,
|
||||
queryTablesApi
|
||||
queryTablesApi,
|
||||
layergroupAffectedTablesCache
|
||||
).register(app);
|
||||
|
||||
new controller.Map(
|
||||
@ -203,7 +209,8 @@ module.exports = function(serverOptions) {
|
||||
metadataBackend,
|
||||
queryTablesApi,
|
||||
surrogateKeysCache,
|
||||
userLimitsApi
|
||||
userLimitsApi,
|
||||
layergroupAffectedTablesCache
|
||||
).register(app);
|
||||
|
||||
new controller.NamedMaps(
|
||||
|
@ -318,7 +318,7 @@ describe('tests from old api translated to multilayer', function() {
|
||||
|
||||
var parsed = JSON.parse(res.body);
|
||||
assert.deepEqual(parsed, {
|
||||
errors: ["Error: could not fetch affected tables and last updated time: fake error message"]
|
||||
errors: ["Error: could not fetch affected tables or last updated time: fake error message"]
|
||||
});
|
||||
|
||||
done();
|
||||
@ -346,7 +346,7 @@ describe('tests from old api translated to multilayer', function() {
|
||||
};
|
||||
|
||||
// reset internal cacheChannel cache
|
||||
server.channelCache = {};
|
||||
server.layergroupAffectedTablesCache.cache = {};
|
||||
|
||||
assert.response(server,
|
||||
{
|
||||
|
@ -21,7 +21,7 @@ var server = new CartodbWindshaft(serverOptions);
|
||||
server.setMaxListeners(0);
|
||||
|
||||
describe('template_api', function() {
|
||||
server.channelCache = {};
|
||||
server.layergroupAffectedTablesCache.cache = {};
|
||||
|
||||
var redis_client = redis.createClient(global.environment.redis.port);
|
||||
|
||||
@ -1155,7 +1155,7 @@ describe('template_api', function() {
|
||||
assert.ok(cc);
|
||||
assert.ok(cc.match, /ciao/, cc);
|
||||
// hack simulating restart...
|
||||
server.channelCache = {}; // need to clean channel cache
|
||||
server.layergroupAffectedTablesCache.cache = {}; // need to clean channel cache
|
||||
var get_request = {
|
||||
url: '/api/v1/map/' + layergroupid + ':cb1/0/0/0/1.json.torque?auth_token=valid1',
|
||||
method: 'GET',
|
||||
|
Loading…
Reference in New Issue
Block a user