From 6d63f2f2996dc71bf1b20e032f22dcea3018b1f4 Mon Sep 17 00:00:00 2001 From: Leonardo Crauss Daronco Date: Wed, 12 Nov 2014 16:35:25 -0200 Subject: [PATCH] Webhooks: persist data to redis Save hooks and meetingID mappings to redis and get them back when the application starts. Still missing a way to remove old data in case the app loses events (e.g. a hook for a specific meeting might stay on redis forever if the app lost the meeting_destroyed event). --- labs/bbb-callback/application.coffee | 8 +- labs/bbb-callback/config.coffee | 7 ++ labs/bbb-callback/hook.coffee | 97 ++++++++++++++++++------- labs/bbb-callback/meeting_id_map.coffee | 24 ++++++ labs/bbb-callback/web_hooks.coffee | 36 --------- 5 files changed, 107 insertions(+), 65 deletions(-) diff --git a/labs/bbb-callback/application.coffee b/labs/bbb-callback/application.coffee index aa25499c27..3d4378a720 100644 --- a/labs/bbb-callback/application.coffee +++ b/labs/bbb-callback/application.coffee @@ -1,4 +1,6 @@ config = require("./config") +Hook = require("./hook") +MeetingIDMap = require("./meeting_id_map") WebHooks = require("./web_hooks") WebServer = require("./web_server") @@ -11,5 +13,7 @@ module.exports = class Application @webServer = new WebServer() start: -> - @webServer.start(config.server.port) - @webHooks.start() + Hook.initialize => + MeetingIDMap.initialize => + @webServer.start(config.server.port) + @webHooks.start() diff --git a/labs/bbb-callback/config.coffee b/labs/bbb-callback/config.coffee index 1a13222266..e31f6f9e70 100644 --- a/labs/bbb-callback/config.coffee +++ b/labs/bbb-callback/config.coffee @@ -29,6 +29,13 @@ config.hooks.events = [ # { channel: "bigbluebutton:from-bbb-apps:meeting", name: "user_registered_message" }, ] +# Redis +config.redis = {} +config.redis.keys = {} +config.redis.keys.hook = (id) -> "bigbluebutton:webhooks:hook:#{id}" +config.redis.keys.hooks = "bigbluebutton:webhooks:hooks" +config.redis.keys.mappings = "bigbluebutton:webhooks:mappings" + config.api = {} config.api.responses = {} config.api.responses.failure = (key, msg) -> diff --git a/labs/bbb-callback/hook.coffee b/labs/bbb-callback/hook.coffee index 05a41996b9..d708702861 100644 --- a/labs/bbb-callback/hook.coffee +++ b/labs/bbb-callback/hook.coffee @@ -1,11 +1,14 @@ _ = require("lodash") +async = require("async") +redis = require("redis") +config = require("./config") CallbackEmitter = require("./callback_emitter") MeetingIDMap = require("./meeting_id_map") # The database of hooks. db = {} -nextId = 1 +nextID = 1 # The representation of a hook and its properties. Stored in memory and persisted # to redis. @@ -15,22 +18,34 @@ nextId = 1 # TODO: at some point the queue needs to be cleared, or we need a size limit on it module.exports = class Hook - # @initialize = -> - # # get hooks from redis - constructor: -> @id = null @callbackURL = null @externalMeetingID = null @queue = [] @emitter = null + @redisClient = redis.createClient() - saveSync: -> - db[@id] = this - db[@id] + save: (callback) -> + @redisClient.hmset config.redis.keys.hook(@id), @toRedis(), (error, reply) => + console.log "Hook: error saving hook to redis!", error, reply if error? + @redisClient.sadd config.redis.keys.hooks, @id, (error, reply) => + console.log "Hook: error saving hookID to the list of hooks!", error, reply if error? - destroySync: -> - Hook.destroySync @id + db[@id] = this + callback?(error, db[@id]) + + destroy: (callback) -> + @redisClient.srem config.redis.keys.hooks, @id, (error, reply) => + console.log "Hook: error removing hookID from the list of hooks!", error, reply if error? + @redisClient.del config.redis.keys.hook(@id), (error) => + console.log "Hook: error removing hook from redis!", error if error? + + if db[@id] + delete db[@id] + callback?(error, true) + else + callback?(error, false) # Is this a global hook? isGlobal: -> @@ -40,11 +55,6 @@ module.exports = class Hook targetMeetingID: -> @externalMeetingID - # mapFromRedis: (redisData) -> - # @callbackURL = redisData?.callbackURL - # @externalMeetingID = redisData?.externalMeetingID - # @id = redisData?.subscriptionID - # Puts a new message in the queue. Will also trigger a processing in the queue so this # message might be processed instantly. enqueue: (message) -> @@ -52,6 +62,18 @@ module.exports = class Hook @queue.push message @_processQueue() + toRedis: -> + { + "hookID": @id, + "externalMeetingID": @externalMeetingID, + "callbackURL": @callbackURL + } + + fromRedis: (redisData) -> + @callbackURL = redisData?.callbackURL + @externalMeetingID = redisData?.externalMeetingID + @id = parseInt(redisData?.hookID) + # Gets the first message in the queue and start an emitter to send it. Will only do it # if there is no emitter running already and if there is a message in the queue. _processQueue: -> @@ -82,12 +104,10 @@ module.exports = class Hook console.log msg hook = new Hook() - hook.id = nextId++ + hook.id = nextID++ hook.callbackURL = callbackURL hook.externalMeetingID = meetingID - hook.saveSync() - - callback?(null, hook) + hook.save (error, hook) -> callback?(error, hook) @removeSubscription = (hookID, callback) -> hook = Hook.getSync(hookID) @@ -96,8 +116,7 @@ module.exports = class Hook msg += " for the meeting [#{hook.externalMeetingID}]" if hook.externalMeetingID? console.log msg - hook.destroySync() - callback?(null, true) + hook.destroy (error, removed) -> callback?(error, removed) else callback?(null, false) @@ -131,13 +150,6 @@ module.exports = class Hook , []) arr - @destroySync = (id) -> - if db[id] - delete db[id] - true - else - false - @clearSync = -> for id of db delete db[id] @@ -147,3 +159,34 @@ module.exports = class Hook for id of db if db[id].callbackURL is callbackURL return db[id] + + @initialize = (callback) -> + Hook.resync(callback) + + # Gets all hooks from redis to populate the local database. + # Calls `callback()` when done. + @resync = (callback) -> + client = redis.createClient() + tasks = [] + + client.smembers config.redis.keys.hooks, (error, hooks) => + console.log "Hook: error getting list of hooks from redis", error if error? + + hooks.forEach (id) => + tasks.push (done) => + client.hgetall config.redis.keys.hook(id), (error, hookData) -> + console.log "Hook: error getting information for a hook from redis", error if error? + + if hookData? + hook = new Hook() + hook.fromRedis hookData + hook.save (error, hook) -> + nextID = hook.id + 1 if hook.id >= nextID + done(null, hook) + else + done(null, null) + + async.series tasks, (errors, result) -> + hooks = _.map(Hook.allSync(), (hook) -> "[#{hook.id}] #{hook.callbackURL}") + console.log "Hook: finished resync, hooks registered:", hooks + callback?() diff --git a/labs/bbb-callback/meeting_id_map.coffee b/labs/bbb-callback/meeting_id_map.coffee index ac06a13d30..70943d9eb7 100644 --- a/labs/bbb-callback/meeting_id_map.coffee +++ b/labs/bbb-callback/meeting_id_map.coffee @@ -1,4 +1,7 @@ _ = require("lodash") +redis = require("redis") + +config = require("./config") # The database of mappings. Format: # { internalMeetingID: externalMeetingID } @@ -11,12 +14,14 @@ module.exports = class MeetingIDMap unless internalMeetingID in _.keys(db) db[internalMeetingID] = externalMeetingID console.log "MeetingIDMap: added meeting mapping to the list { #{internalMeetingID}: #{db[internalMeetingID]} }" + MeetingIDMap.updateRedis() @removeMapping = (internalMeetingID) -> if internalMeetingID in _.keys(db) console.log "MeetingIDMap: removing meeting mapping from the list { #{internalMeetingID}: #{db[internalMeetingID]} }" delete db[internalMeetingID] db[internalMeetingID] = null + MeetingIDMap.updateRedis() @getInternalMeetingID = (externalMeetingID) -> for internal, external of db @@ -26,3 +31,22 @@ module.exports = class MeetingIDMap @getExternalMeetingID = (internalMeetingID) -> db[internalMeetingID] + + @initialize = (callback) -> + MeetingIDMap.resync(callback) + + # Gets all mappings from redis to populate the local database. + # Calls `callback()` when done. + @resync = (callback) -> + client = redis.createClient() + client.hgetall config.redis.keys.mappings, (error, mappings) => + MeetingIDMap.fromRedis(mappings) + callback?(error, mappings) + + @fromRedis = (mappings) -> + db = mappings + + @updateRedis = (callback) -> + client = redis.createClient() + client.hmset config.redis.keys.mappings, db, (error, reply) => + callback?(error) diff --git a/labs/bbb-callback/web_hooks.coffee b/labs/bbb-callback/web_hooks.coffee index e7745ace36..67f3b984e6 100644 --- a/labs/bbb-callback/web_hooks.coffee +++ b/labs/bbb-callback/web_hooks.coffee @@ -13,7 +13,6 @@ module.exports = class WebHooks constructor: -> @subscriberEvents = redis.createClient() - @client = redis.createClient() # To map internal and external meeting IDs @subscriberMeetings = redis.createClient() @@ -83,38 +82,3 @@ module.exports = class WebHooks console.log "WebHooks: error processing the message", JSON.stringify(message), ":", e @subscriberMeetings.subscribe config.hooks.meetingsChannel - - # TODO: enable the methods below again when we persist hooks to redis again - # # Gets all hooks from redis. - # # Calls `callback(errors, result)` when done. `result` is an array of `Hook` objects. - # _getHooksFromRedis: (callback) -> - # tasks = [] - # @meetings.forEach (meetingID) => - # console.log "WebHooks: checking hooks for the meeting", meetingID - # tasks.push (done) => - - # @client.lrange "meeting:#{meetingID}:subscriptions", 0, -1, (error, subscriptions) => - # # TODO: treat error - # @_getHooksFromRedisForSubscriptions meetingID, subscriptions, done - - # async.series tasks, (errors, result) -> - # result = _.flatten result - # console.log "Hooks#_getHooksFromRedis: returning", result - # callback?(errors, result) - - # # Get the hook URLs for a list of subscriptions. - # _getHooksFromRedisForSubscriptions: (meetingID, subscriptions, callback) -> - # tasks = [] - # subscriptions.forEach (sid, index) => - - # tasks.push (done) => - # @client.hgetall "meeting:#{meetingID}:subscription:#{sid}", (error, redisData) -> - # # TODO: treat error - # console.log "WebHooks: creating hook url for", redisData - # hook = new Hook() - # hook.mapFromRedis redisData - # done null, hook - - # async.series tasks, (errors, result) -> - # console.log "Hooks#_getHooksFromRedisForSubscriptions: returning", result - # callback?(errors, result)