Webhooks: expire mappings for inactive meetings

If a meeting has no events for too long (24 hours), remove the meeting
mapping to prevent too much data from being stored forever on redis.
This commit is contained in:
Leonardo Crauss Daronco 2014-11-13 12:03:42 -02:00
parent 66bfbeb1bc
commit 14641d1a3b
4 changed files with 156 additions and 34 deletions

View File

@ -35,12 +35,18 @@ config.hooks.retryIntervals = [
100, 500, 1000, 2000, 4000, 8000, 10000, 30000, 60000, 60000, 60000, 60000
]
# Mappings of internal to external meeting IDs
config.mappings = {}
config.mappings.cleanupInterval = 1000 # in ms
config.mappings.timeout = 1000*60*60*24 # 24 hours, in ms
# Redis
config.redis = {}
config.redis.keys = {}
config.redis.keys.hook = (id) -> "bigbluebutton:webhooks:hook:#{id}"
config.redis.keys.hooks = "bigbluebutton:webhooks:hooks"
config.redis.keys.mappings = "bigbluebutton:webhooks:mappings"
config.redis.keys.mapping = (id) -> "bigbluebutton:webhooks:mapping:#{id}"
config.api = {}
config.api.responses = {}

View File

@ -7,6 +7,13 @@ CallbackEmitter = require("./callback_emitter")
MeetingIDMap = require("./meeting_id_map")
# The database of hooks.
# Used always from memory, but saved to redis for persistence.
#
# Format:
# { id: Hook }
# Format on redis:
# * a SET "...:hooks" with all ids
# * a HASH "...:hook:<id>" for each hook with some of its attributes
db = {}
nextID = 1
@ -181,7 +188,7 @@ module.exports = class Hook
if hookData?
hook = new Hook()
hook.fromRedis hookData
hook.fromRedis(hookData)
hook.save (error, hook) ->
nextID = hook.id + 1 if hook.id >= nextID
done(null, hook)

View File

@ -1,57 +1,162 @@
_ = require("lodash")
async = require("async")
redis = require("redis")
config = require("./config")
# The database of mappings. Uses the externalID as key because it changes less than
# the internal ID (e.g. the internalID can change for different meetings in the same
# room). Format:
# { externalMeetingID: internalMeetingID }
# room). Used always from memory, but saved to redis for persistence.
#
# Format:
# {
# externalMeetingID: {
# id: @id
# externalMeetingID: @exnternalMeetingID
# internalMeetingID: @internalMeetingID
# lastActivity: @lastActivity
# }
# }
# Format on redis:
# * a SET "...:mappings" with all ids (not meeting ids, the object id)
# * a HASH "...:mapping:<id>" for each mapping with all its attributes
db = {}
nextID = 1
# A simple model to store mappings for meeting IDs.
module.exports = class MeetingIDMap
@addOrUpdateMapping = (internalMeetingID, externalMeetingID) ->
db[externalMeetingID] = internalMeetingID
console.log "MeetingIDMap: added or changed meeting mapping to the list { #{externalMeetingID}: #{db[externalMeetingID]} }"
MeetingIDMap.updateRedis()
constructor: ->
@id = null
@externalMeetingID = null
@internalMeetingID = null
@lastActivity = null
@redisClient = redis.createClient()
@removeMapping = (internalMeetingID) ->
for external, internal of db
if internalMeetingID is internal
console.log "MeetingIDMap: removing meeting mapping from the list { #{external}: #{db[external]} }"
delete db[external]
db[external] = null
MeetingIDMap.updateRedis()
save: (callback) ->
@redisClient.hmset config.redis.keys.mapping(@id), @toRedis(), (error, reply) =>
console.log "Hook: error saving mapping to redis!", error, reply if error?
@redisClient.sadd config.redis.keys.mappings, @id, (error, reply) =>
console.log "Hook: error saving mapping ID to the list of mappings!", error, reply if error?
db[@externalMeetingID] = this
callback?(error, db[@externalMeetingID])
destroy: (callback) ->
@redisClient.srem config.redis.keys.mappings, @id, (error, reply) =>
console.log "Hook: error removing mapping ID from the list of mappings!", error, reply if error?
@redisClient.del config.redis.keys.mapping(@id), (error) =>
console.log "Hook: error removing mapping from redis!", error if error?
if db[@externalMeetingID]
delete db[@externalMeetingID]
callback?(error, true)
else
callback?(error, false)
toRedis: ->
r =
"id": @id,
"internalMeetingID": @internalMeetingID
"externalMeetingID": @externalMeetingID
"lastActivity": @lastActivity
r
fromRedis: (redisData) ->
@id = parseInt(redisData.id)
@externalMeetingID = redisData.externalMeetingID
@internalMeetingID = redisData.internalMeetingID
@lastActivity = redisData.lastActivity
print: ->
JSON.stringify(@toRedis())
@addOrUpdateMapping = (internalMeetingID, externalMeetingID, callback) ->
mapping = new MeetingIDMap()
mapping.id = nextID++
mapping.internalMeetingID = internalMeetingID
mapping.externalMeetingID = externalMeetingID
mapping.lastActivity = new Date().getTime()
mapping.save (error, result) ->
console.log "MeetingIDMap: added or changed meeting mapping to the list #{externalMeetingID}:", mapping.print()
callback?(error, result)
@removeMapping = (internalMeetingID, callback) ->
for external, mapping of db
if mapping.internalMeetingID is internalMeetingID
mapping.destroy (error, result) ->
console.log "MeetingIDMap: removing meeting mapping from the list #{external}:", mapping.print()
callback?(error, result)
@getInternalMeetingID = (externalMeetingID) ->
db[externalMeetingID]
db[externalMeetingID].internalMeetingID
@getExternalMeetingID = (internalMeetingID) ->
for external, internal of db
if internal is internalMeetingID
return external
mapping = MeetingIDMap.findByInternalMeetingID(internalMeetingID)
mapping?.externalMeetingID
@findByInternalMeetingID = (internalMeetingID) ->
if internalMeetingID?
for external, mapping of db
if mapping.internalMeetingID is internalMeetingID
return mapping
null
@allSync = ->
arr = Object.keys(db).reduce((arr, id) ->
arr.push db[id]
arr
, [])
arr
# Sets the last activity of the mapping for `internalMeetingID` to now.
@reportActivity = (internalMeetingID) ->
mapping = MeetingIDMap.findByInternalMeetingID(internalMeetingID)
if mapping?
mapping.lastActivity = new Date().getTime()
mapping.save()
# Checks all current mappings for their last activity and removes the ones that
# are "expired", that had their last activity too long ago.
@cleanup = ->
now = new Date().getTime()
all = MeetingIDMap.allSync()
toRemove = _.filter(all, (mapping) ->
mapping.lastActivity < now - config.mappings.timeout
)
unless _.isEmpty(toRemove)
console.log "MeetingIDMap: expiring the mappings:", _.map(toRemove, (map) -> map.print())
toRemove.forEach (mapping) -> mapping.destroy()
# Initializes global methods for this model.
@initialize = (callback) ->
MeetingIDMap.resync(callback)
MeetingIDMap.cleanupInterval = setInterval(MeetingIDMap.cleanup, config.mappings.cleanupInterval)
# Gets all mappings from redis to populate the local database.
# Calls `callback()` when done.
@resync = (callback) ->
client = redis.createClient()
client.hgetall config.redis.keys.mappings, (error, mappings) =>
MeetingIDMap.fromRedis(mappings)
callback?(error, mappings)
tasks = []
@fromRedis = (mappings) ->
if mappings?
db = mappings
else
db = {}
client.smembers config.redis.keys.mappings, (error, mappings) =>
console.log "Hook: error getting list of mappings from redis", error if error?
@updateRedis = (callback) ->
client = redis.createClient()
client.hmset config.redis.keys.mappings, db, (error, reply) =>
callback?(error)
mappings.forEach (id) =>
tasks.push (done) =>
client.hgetall config.redis.keys.mapping(id), (error, mappingData) ->
console.log "Hook: error getting information for a mapping from redis", error if error?
if mappingData?
mapping = new MeetingIDMap()
mapping.fromRedis(mappingData)
mapping.save (error, hook) ->
nextID = mapping.id + 1 if mapping.id >= nextID
done(null, mapping)
else
done(null, null)
async.series tasks, (errors, result) ->
mappings = _.map(MeetingIDMap.allSync(), (m) -> m.print())
console.log "MeetingIDMap: finished resync, mappings registered:", mappings
callback?()

View File

@ -28,10 +28,14 @@ module.exports = class WebHooks
@subscriberEvents.on "pmessage", (pattern, channel, message) =>
try
message = JSON.parse message
if message? and @_filterMessage channel, message
console.log "WebHooks: processing message on [#{channel}]:", JSON.stringify(message)
@_processEvent message
message = JSON.parse(message)
if message?
id = message.payload?.meeting_id
MeetingIDMap.reportActivity(id)
if @_filterMessage(channel, message)
console.log "WebHooks: processing message on [#{channel}]:", JSON.stringify(message)
@_processEvent(message)
catch e
console.log "WebHooks: error processing the message", message, ":", e