Webhooks: persist data to redis

Save hooks and meetingID mappings to redis and get them back when the
application starts.
Still missing a way to remove old data in case the app loses events (e.g.
a hook for a specific meeting might stay on redis forever if the app
lost the meeting_destroyed event).
This commit is contained in:
Leonardo Crauss Daronco 2014-11-12 16:35:25 -02:00
parent bfdae8f204
commit 6d63f2f299
5 changed files with 107 additions and 65 deletions

View File

@ -1,4 +1,6 @@
config = require("./config")
Hook = require("./hook")
MeetingIDMap = require("./meeting_id_map")
WebHooks = require("./web_hooks")
WebServer = require("./web_server")
@ -11,5 +13,7 @@ module.exports = class Application
@webServer = new WebServer()
start: ->
@webServer.start(config.server.port)
@webHooks.start()
Hook.initialize =>
MeetingIDMap.initialize =>
@webServer.start(config.server.port)
@webHooks.start()

View File

@ -29,6 +29,13 @@ config.hooks.events = [
# { channel: "bigbluebutton:from-bbb-apps:meeting", name: "user_registered_message" },
]
# Redis
config.redis = {}
config.redis.keys = {}
config.redis.keys.hook = (id) -> "bigbluebutton:webhooks:hook:#{id}"
config.redis.keys.hooks = "bigbluebutton:webhooks:hooks"
config.redis.keys.mappings = "bigbluebutton:webhooks:mappings"
config.api = {}
config.api.responses = {}
config.api.responses.failure = (key, msg) ->

View File

@ -1,11 +1,14 @@
_ = require("lodash")
async = require("async")
redis = require("redis")
config = require("./config")
CallbackEmitter = require("./callback_emitter")
MeetingIDMap = require("./meeting_id_map")
# The database of hooks.
db = {}
nextId = 1
nextID = 1
# The representation of a hook and its properties. Stored in memory and persisted
# to redis.
@ -15,22 +18,34 @@ nextId = 1
# TODO: at some point the queue needs to be cleared, or we need a size limit on it
module.exports = class Hook
# @initialize = ->
# # get hooks from redis
constructor: ->
@id = null
@callbackURL = null
@externalMeetingID = null
@queue = []
@emitter = null
@redisClient = redis.createClient()
saveSync: ->
db[@id] = this
db[@id]
save: (callback) ->
@redisClient.hmset config.redis.keys.hook(@id), @toRedis(), (error, reply) =>
console.log "Hook: error saving hook to redis!", error, reply if error?
@redisClient.sadd config.redis.keys.hooks, @id, (error, reply) =>
console.log "Hook: error saving hookID to the list of hooks!", error, reply if error?
destroySync: ->
Hook.destroySync @id
db[@id] = this
callback?(error, db[@id])
destroy: (callback) ->
@redisClient.srem config.redis.keys.hooks, @id, (error, reply) =>
console.log "Hook: error removing hookID from the list of hooks!", error, reply if error?
@redisClient.del config.redis.keys.hook(@id), (error) =>
console.log "Hook: error removing hook from redis!", error if error?
if db[@id]
delete db[@id]
callback?(error, true)
else
callback?(error, false)
# Is this a global hook?
isGlobal: ->
@ -40,11 +55,6 @@ module.exports = class Hook
targetMeetingID: ->
@externalMeetingID
# mapFromRedis: (redisData) ->
# @callbackURL = redisData?.callbackURL
# @externalMeetingID = redisData?.externalMeetingID
# @id = redisData?.subscriptionID
# Puts a new message in the queue. Will also trigger a processing in the queue so this
# message might be processed instantly.
enqueue: (message) ->
@ -52,6 +62,18 @@ module.exports = class Hook
@queue.push message
@_processQueue()
toRedis: ->
{
"hookID": @id,
"externalMeetingID": @externalMeetingID,
"callbackURL": @callbackURL
}
fromRedis: (redisData) ->
@callbackURL = redisData?.callbackURL
@externalMeetingID = redisData?.externalMeetingID
@id = parseInt(redisData?.hookID)
# Gets the first message in the queue and start an emitter to send it. Will only do it
# if there is no emitter running already and if there is a message in the queue.
_processQueue: ->
@ -82,12 +104,10 @@ module.exports = class Hook
console.log msg
hook = new Hook()
hook.id = nextId++
hook.id = nextID++
hook.callbackURL = callbackURL
hook.externalMeetingID = meetingID
hook.saveSync()
callback?(null, hook)
hook.save (error, hook) -> callback?(error, hook)
@removeSubscription = (hookID, callback) ->
hook = Hook.getSync(hookID)
@ -96,8 +116,7 @@ module.exports = class Hook
msg += " for the meeting [#{hook.externalMeetingID}]" if hook.externalMeetingID?
console.log msg
hook.destroySync()
callback?(null, true)
hook.destroy (error, removed) -> callback?(error, removed)
else
callback?(null, false)
@ -131,13 +150,6 @@ module.exports = class Hook
, [])
arr
@destroySync = (id) ->
if db[id]
delete db[id]
true
else
false
@clearSync = ->
for id of db
delete db[id]
@ -147,3 +159,34 @@ module.exports = class Hook
for id of db
if db[id].callbackURL is callbackURL
return db[id]
@initialize = (callback) ->
Hook.resync(callback)
# Gets all hooks from redis to populate the local database.
# Calls `callback()` when done.
@resync = (callback) ->
client = redis.createClient()
tasks = []
client.smembers config.redis.keys.hooks, (error, hooks) =>
console.log "Hook: error getting list of hooks from redis", error if error?
hooks.forEach (id) =>
tasks.push (done) =>
client.hgetall config.redis.keys.hook(id), (error, hookData) ->
console.log "Hook: error getting information for a hook from redis", error if error?
if hookData?
hook = new Hook()
hook.fromRedis hookData
hook.save (error, hook) ->
nextID = hook.id + 1 if hook.id >= nextID
done(null, hook)
else
done(null, null)
async.series tasks, (errors, result) ->
hooks = _.map(Hook.allSync(), (hook) -> "[#{hook.id}] #{hook.callbackURL}")
console.log "Hook: finished resync, hooks registered:", hooks
callback?()

View File

@ -1,4 +1,7 @@
_ = require("lodash")
redis = require("redis")
config = require("./config")
# The database of mappings. Format:
# { internalMeetingID: externalMeetingID }
@ -11,12 +14,14 @@ module.exports = class MeetingIDMap
unless internalMeetingID in _.keys(db)
db[internalMeetingID] = externalMeetingID
console.log "MeetingIDMap: added meeting mapping to the list { #{internalMeetingID}: #{db[internalMeetingID]} }"
MeetingIDMap.updateRedis()
@removeMapping = (internalMeetingID) ->
if internalMeetingID in _.keys(db)
console.log "MeetingIDMap: removing meeting mapping from the list { #{internalMeetingID}: #{db[internalMeetingID]} }"
delete db[internalMeetingID]
db[internalMeetingID] = null
MeetingIDMap.updateRedis()
@getInternalMeetingID = (externalMeetingID) ->
for internal, external of db
@ -26,3 +31,22 @@ module.exports = class MeetingIDMap
@getExternalMeetingID = (internalMeetingID) ->
db[internalMeetingID]
@initialize = (callback) ->
MeetingIDMap.resync(callback)
# Gets all mappings from redis to populate the local database.
# Calls `callback()` when done.
@resync = (callback) ->
client = redis.createClient()
client.hgetall config.redis.keys.mappings, (error, mappings) =>
MeetingIDMap.fromRedis(mappings)
callback?(error, mappings)
@fromRedis = (mappings) ->
db = mappings
@updateRedis = (callback) ->
client = redis.createClient()
client.hmset config.redis.keys.mappings, db, (error, reply) =>
callback?(error)

View File

@ -13,7 +13,6 @@ module.exports = class WebHooks
constructor: ->
@subscriberEvents = redis.createClient()
@client = redis.createClient()
# To map internal and external meeting IDs
@subscriberMeetings = redis.createClient()
@ -83,38 +82,3 @@ module.exports = class WebHooks
console.log "WebHooks: error processing the message", JSON.stringify(message), ":", e
@subscriberMeetings.subscribe config.hooks.meetingsChannel
# TODO: enable the methods below again when we persist hooks to redis again
# # Gets all hooks from redis.
# # Calls `callback(errors, result)` when done. `result` is an array of `Hook` objects.
# _getHooksFromRedis: (callback) ->
# tasks = []
# @meetings.forEach (meetingID) =>
# console.log "WebHooks: checking hooks for the meeting", meetingID
# tasks.push (done) =>
# @client.lrange "meeting:#{meetingID}:subscriptions", 0, -1, (error, subscriptions) =>
# # TODO: treat error
# @_getHooksFromRedisForSubscriptions meetingID, subscriptions, done
# async.series tasks, (errors, result) ->
# result = _.flatten result
# console.log "Hooks#_getHooksFromRedis: returning", result
# callback?(errors, result)
# # Get the hook URLs for a list of subscriptions.
# _getHooksFromRedisForSubscriptions: (meetingID, subscriptions, callback) ->
# tasks = []
# subscriptions.forEach (sid, index) =>
# tasks.push (done) =>
# @client.hgetall "meeting:#{meetingID}:subscription:#{sid}", (error, redisData) ->
# # TODO: treat error
# console.log "WebHooks: creating hook url for", redisData
# hook = new Hook()
# hook.mapFromRedis redisData
# done null, hook
# async.series tasks, (errors, result) ->
# console.log "Hooks#_getHooksFromRedisForSubscriptions: returning", result
# callback?(errors, result)