From 14641d1a3b166f086af13c129e04b2ee5ca13c29 Mon Sep 17 00:00:00 2001 From: Leonardo Crauss Daronco Date: Thu, 13 Nov 2014 12:03:42 -0200 Subject: [PATCH] Webhooks: expire mappings for inactive meetings If a meeting has no events for too long (24 hours), remove the meeting mapping to prevent too much data from being stored forever on redis. --- labs/bbb-callback/config.coffee | 6 + labs/bbb-callback/hook.coffee | 9 +- labs/bbb-callback/meeting_id_map.coffee | 163 +++++++++++++++++++----- labs/bbb-callback/web_hooks.coffee | 12 +- 4 files changed, 156 insertions(+), 34 deletions(-) diff --git a/labs/bbb-callback/config.coffee b/labs/bbb-callback/config.coffee index 8a973502d8..20c72422f7 100644 --- a/labs/bbb-callback/config.coffee +++ b/labs/bbb-callback/config.coffee @@ -35,12 +35,18 @@ config.hooks.retryIntervals = [ 100, 500, 1000, 2000, 4000, 8000, 10000, 30000, 60000, 60000, 60000, 60000 ] +# Mappings of internal to external meeting IDs +config.mappings = {} +config.mappings.cleanupInterval = 1000 # in ms +config.mappings.timeout = 1000*60*60*24 # 24 hours, in ms + # Redis config.redis = {} config.redis.keys = {} config.redis.keys.hook = (id) -> "bigbluebutton:webhooks:hook:#{id}" config.redis.keys.hooks = "bigbluebutton:webhooks:hooks" config.redis.keys.mappings = "bigbluebutton:webhooks:mappings" +config.redis.keys.mapping = (id) -> "bigbluebutton:webhooks:mapping:#{id}" config.api = {} config.api.responses = {} diff --git a/labs/bbb-callback/hook.coffee b/labs/bbb-callback/hook.coffee index 67f85e46ee..947a390ceb 100644 --- a/labs/bbb-callback/hook.coffee +++ b/labs/bbb-callback/hook.coffee @@ -7,6 +7,13 @@ CallbackEmitter = require("./callback_emitter") MeetingIDMap = require("./meeting_id_map") # The database of hooks. +# Used always from memory, but saved to redis for persistence. +# +# Format: +# { id: Hook } +# Format on redis: +# * a SET "...:hooks" with all ids +# * a HASH "...:hook:" for each hook with some of its attributes db = {} nextID = 1 @@ -181,7 +188,7 @@ module.exports = class Hook if hookData? hook = new Hook() - hook.fromRedis hookData + hook.fromRedis(hookData) hook.save (error, hook) -> nextID = hook.id + 1 if hook.id >= nextID done(null, hook) diff --git a/labs/bbb-callback/meeting_id_map.coffee b/labs/bbb-callback/meeting_id_map.coffee index adbc38d422..12fe29c809 100644 --- a/labs/bbb-callback/meeting_id_map.coffee +++ b/labs/bbb-callback/meeting_id_map.coffee @@ -1,57 +1,162 @@ _ = require("lodash") +async = require("async") redis = require("redis") config = require("./config") # The database of mappings. Uses the externalID as key because it changes less than # the internal ID (e.g. the internalID can change for different meetings in the same -# room). Format: -# { externalMeetingID: internalMeetingID } +# room). Used always from memory, but saved to redis for persistence. +# +# Format: +# { +# externalMeetingID: { +# id: @id +# externalMeetingID: @exnternalMeetingID +# internalMeetingID: @internalMeetingID +# lastActivity: @lastActivity +# } +# } +# Format on redis: +# * a SET "...:mappings" with all ids (not meeting ids, the object id) +# * a HASH "...:mapping:" for each mapping with all its attributes db = {} +nextID = 1 # A simple model to store mappings for meeting IDs. module.exports = class MeetingIDMap - @addOrUpdateMapping = (internalMeetingID, externalMeetingID) -> - db[externalMeetingID] = internalMeetingID - console.log "MeetingIDMap: added or changed meeting mapping to the list { #{externalMeetingID}: #{db[externalMeetingID]} }" - MeetingIDMap.updateRedis() + constructor: -> + @id = null + @externalMeetingID = null + @internalMeetingID = null + @lastActivity = null + @redisClient = redis.createClient() - @removeMapping = (internalMeetingID) -> - for external, internal of db - if internalMeetingID is internal - console.log "MeetingIDMap: removing meeting mapping from the list { #{external}: #{db[external]} }" - delete db[external] - db[external] = null - MeetingIDMap.updateRedis() + save: (callback) -> + @redisClient.hmset config.redis.keys.mapping(@id), @toRedis(), (error, reply) => + console.log "Hook: error saving mapping to redis!", error, reply if error? + @redisClient.sadd config.redis.keys.mappings, @id, (error, reply) => + console.log "Hook: error saving mapping ID to the list of mappings!", error, reply if error? + + db[@externalMeetingID] = this + callback?(error, db[@externalMeetingID]) + + destroy: (callback) -> + @redisClient.srem config.redis.keys.mappings, @id, (error, reply) => + console.log "Hook: error removing mapping ID from the list of mappings!", error, reply if error? + @redisClient.del config.redis.keys.mapping(@id), (error) => + console.log "Hook: error removing mapping from redis!", error if error? + + if db[@externalMeetingID] + delete db[@externalMeetingID] + callback?(error, true) + else + callback?(error, false) + + toRedis: -> + r = + "id": @id, + "internalMeetingID": @internalMeetingID + "externalMeetingID": @externalMeetingID + "lastActivity": @lastActivity + r + + fromRedis: (redisData) -> + @id = parseInt(redisData.id) + @externalMeetingID = redisData.externalMeetingID + @internalMeetingID = redisData.internalMeetingID + @lastActivity = redisData.lastActivity + + print: -> + JSON.stringify(@toRedis()) + + @addOrUpdateMapping = (internalMeetingID, externalMeetingID, callback) -> + mapping = new MeetingIDMap() + mapping.id = nextID++ + mapping.internalMeetingID = internalMeetingID + mapping.externalMeetingID = externalMeetingID + mapping.lastActivity = new Date().getTime() + mapping.save (error, result) -> + console.log "MeetingIDMap: added or changed meeting mapping to the list #{externalMeetingID}:", mapping.print() + callback?(error, result) + + @removeMapping = (internalMeetingID, callback) -> + for external, mapping of db + if mapping.internalMeetingID is internalMeetingID + mapping.destroy (error, result) -> + console.log "MeetingIDMap: removing meeting mapping from the list #{external}:", mapping.print() + callback?(error, result) @getInternalMeetingID = (externalMeetingID) -> - db[externalMeetingID] + db[externalMeetingID].internalMeetingID @getExternalMeetingID = (internalMeetingID) -> - for external, internal of db - if internal is internalMeetingID - return external + mapping = MeetingIDMap.findByInternalMeetingID(internalMeetingID) + mapping?.externalMeetingID + + @findByInternalMeetingID = (internalMeetingID) -> + if internalMeetingID? + for external, mapping of db + if mapping.internalMeetingID is internalMeetingID + return mapping null + @allSync = -> + arr = Object.keys(db).reduce((arr, id) -> + arr.push db[id] + arr + , []) + arr + + # Sets the last activity of the mapping for `internalMeetingID` to now. + @reportActivity = (internalMeetingID) -> + mapping = MeetingIDMap.findByInternalMeetingID(internalMeetingID) + if mapping? + mapping.lastActivity = new Date().getTime() + mapping.save() + + # Checks all current mappings for their last activity and removes the ones that + # are "expired", that had their last activity too long ago. + @cleanup = -> + now = new Date().getTime() + all = MeetingIDMap.allSync() + toRemove = _.filter(all, (mapping) -> + mapping.lastActivity < now - config.mappings.timeout + ) + unless _.isEmpty(toRemove) + console.log "MeetingIDMap: expiring the mappings:", _.map(toRemove, (map) -> map.print()) + toRemove.forEach (mapping) -> mapping.destroy() + + # Initializes global methods for this model. @initialize = (callback) -> MeetingIDMap.resync(callback) + MeetingIDMap.cleanupInterval = setInterval(MeetingIDMap.cleanup, config.mappings.cleanupInterval) # Gets all mappings from redis to populate the local database. # Calls `callback()` when done. @resync = (callback) -> client = redis.createClient() - client.hgetall config.redis.keys.mappings, (error, mappings) => - MeetingIDMap.fromRedis(mappings) - callback?(error, mappings) + tasks = [] - @fromRedis = (mappings) -> - if mappings? - db = mappings - else - db = {} + client.smembers config.redis.keys.mappings, (error, mappings) => + console.log "Hook: error getting list of mappings from redis", error if error? - @updateRedis = (callback) -> - client = redis.createClient() - client.hmset config.redis.keys.mappings, db, (error, reply) => - callback?(error) + mappings.forEach (id) => + tasks.push (done) => + client.hgetall config.redis.keys.mapping(id), (error, mappingData) -> + console.log "Hook: error getting information for a mapping from redis", error if error? + + if mappingData? + mapping = new MeetingIDMap() + mapping.fromRedis(mappingData) + mapping.save (error, hook) -> + nextID = mapping.id + 1 if mapping.id >= nextID + done(null, mapping) + else + done(null, null) + + async.series tasks, (errors, result) -> + mappings = _.map(MeetingIDMap.allSync(), (m) -> m.print()) + console.log "MeetingIDMap: finished resync, mappings registered:", mappings + callback?() diff --git a/labs/bbb-callback/web_hooks.coffee b/labs/bbb-callback/web_hooks.coffee index bc43366055..43a79f1ce4 100644 --- a/labs/bbb-callback/web_hooks.coffee +++ b/labs/bbb-callback/web_hooks.coffee @@ -28,10 +28,14 @@ module.exports = class WebHooks @subscriberEvents.on "pmessage", (pattern, channel, message) => try - message = JSON.parse message - if message? and @_filterMessage channel, message - console.log "WebHooks: processing message on [#{channel}]:", JSON.stringify(message) - @_processEvent message + message = JSON.parse(message) + if message? + id = message.payload?.meeting_id + MeetingIDMap.reportActivity(id) + + if @_filterMessage(channel, message) + console.log "WebHooks: processing message on [#{channel}]:", JSON.stringify(message) + @_processEvent(message) catch e console.log "WebHooks: error processing the message", message, ":", e