504 lines
22 KiB
CoffeeScript
Executable File
504 lines
22 KiB
CoffeeScript
Executable File
Meteor.startup ->
|
|
Meteor.log.info "server start"
|
|
|
|
#remove all data
|
|
Meteor.WhiteboardCleanStatus.remove({})
|
|
clearUsersCollection()
|
|
clearChatCollection()
|
|
clearMeetingsCollection()
|
|
clearShapesCollection()
|
|
clearSlidesCollection()
|
|
clearPresentationsCollection()
|
|
clearPollCollection()
|
|
clearCursorCollection()
|
|
|
|
# create create a PubSub connection, start listening
|
|
Meteor.redisPubSub = new Meteor.RedisPubSub(->
|
|
Meteor.log.info "created pubsub")
|
|
|
|
Meteor.myQueue = new PowerQueue({
|
|
# autoStart:true
|
|
# isPaused: true
|
|
})
|
|
|
|
Meteor.myQueue.taskHandler = (data, next, failures) ->
|
|
eventName = JSON.parse(data.jsonMsg)?.header.name
|
|
if failures > 0
|
|
Meteor.log.error "got a failure on taskHandler #{eventName} #{failures}"
|
|
else
|
|
handleRedisMessage(data, ()->
|
|
length = Meteor.myQueue.length()
|
|
lengthString = ->
|
|
if length>0
|
|
"In the queue we have #{length} event(s) to process."
|
|
else ""
|
|
|
|
Meteor.log.info "in callback after handleRedisMessage #{eventName}.
|
|
#{lengthString()}"
|
|
next()
|
|
)
|
|
|
|
|
|
# To ensure that we process the redis json event messages serially we use a
|
|
# callback. This callback is to be called when the Meteor collection is
|
|
# updated with the information coming in the payload of the json message. The
|
|
# callback signalizes to the queue that we are done processing the current
|
|
# message in the queue and are ready to move on to the next one. If we do not
|
|
# use the callback mechanism we may encounter a race condition situation
|
|
# due to not following the order of events coming through the redis pubsub.
|
|
# for example: a user_left event reaching the collection before a user_joined
|
|
# for the same user.
|
|
@handleRedisMessage = (data, callback) ->
|
|
message = JSON.parse(data.jsonMsg)
|
|
# correlationId = message.payload?.reply_to or message.header?.reply_to
|
|
meetingId = message.payload?.meeting_id
|
|
|
|
# Avoid cluttering the log with json messages carrying little or repetitive
|
|
# information. Comment out a message type in the array to be able to see it
|
|
# in the log upon restarting of the Meteor process.
|
|
notLoggedEventTypes = [
|
|
"keep_alive_reply"
|
|
"page_resized_message"
|
|
"presentation_page_resized_message"
|
|
"presentation_cursor_updated_message"
|
|
"get_presentation_info_reply"
|
|
#"get_users_reply"
|
|
"get_chat_history_reply"
|
|
#"get_all_meetings_reply"
|
|
"get_whiteboard_shapes_reply"
|
|
"presentation_shared_message"
|
|
"presentation_conversion_done_message"
|
|
"presentation_conversion_progress_message"
|
|
"presentation_page_generated_message"
|
|
#"presentation_page_changed_message"
|
|
"BbbPubSubPongMessage"
|
|
"bbb_apps_is_alive_message"
|
|
"user_voice_talking_message"
|
|
"meeting_state_message"
|
|
"get_recording_status_reply"
|
|
]
|
|
|
|
eventName = message.header.name
|
|
meetingId = message.payload?.meeting_id
|
|
|
|
unless message?.header? and message.payload?
|
|
Meteor.log.error "ERROR!! No header or payload"
|
|
callback()
|
|
|
|
unless message.header.name in notLoggedEventTypes
|
|
Meteor.log.info "redis incoming message #{eventName} ",
|
|
message: data.jsonMsg
|
|
|
|
# we currently disregard the pattern and channel
|
|
if message?.header? and message.payload?
|
|
if eventName is 'meeting_created_message'
|
|
# Meteor.log.error JSON.stringify message
|
|
meetingName = message.payload.name
|
|
intendedForRecording = message.payload.recorded
|
|
voiceConf = message.payload.voice_conf
|
|
duration = message.payload.duration
|
|
addMeetingToCollection meetingId, meetingName, intendedForRecording,
|
|
voiceConf, duration, callback
|
|
|
|
# handle voice events
|
|
else if message.payload.user? and eventName in [
|
|
'user_left_voice_message'
|
|
'user_joined_voice_message'
|
|
'user_voice_talking_message'
|
|
'user_voice_muted_message']
|
|
|
|
voiceUserObj = {
|
|
'web_userid': message.payload.user.voiceUser.web_userid
|
|
'listen_only': message.payload.listen_only
|
|
'talking': message.payload.user.voiceUser.talking
|
|
'joined': message.payload.user.voiceUser.joined
|
|
'locked': message.payload.user.voiceUser.locked
|
|
'muted': message.payload.user.voiceUser.muted
|
|
}
|
|
updateVoiceUser meetingId, voiceUserObj, callback
|
|
|
|
else if eventName is 'user_listening_only'
|
|
voiceUserObj = {
|
|
'web_userid': message.payload.userid
|
|
'listen_only': message.payload.listen_only
|
|
}
|
|
updateVoiceUser meetingId, voiceUserObj, callback
|
|
|
|
else if eventName is 'get_all_meetings_reply'
|
|
Meteor.log.info "Let's store some data for the running meetings
|
|
so that when an HTML5 client joins everything is ready!"
|
|
Meteor.log.info JSON.stringify(message)
|
|
listOfMeetings = message.payload.meetings
|
|
|
|
# Processing the meetings recursively with a callback to notify us,
|
|
# ensuring that we update the meeting collection serially
|
|
processMeeting = () ->
|
|
meeting = listOfMeetings.pop()
|
|
if meeting?
|
|
addMeetingToCollection meeting.meetingID, meeting.meetingName,
|
|
meeting.recorded, meeting.voiceBridge, meeting.duration, processMeeting
|
|
else
|
|
callback() # all meeting arrays (if any) have been processed
|
|
|
|
processMeeting()
|
|
|
|
else if eventName is 'user_joined_message'
|
|
userObj = message.payload.user
|
|
dbUser = Meteor.Users.findOne({userId: userObj.userid, meetingId: message.payload.meeting_id})
|
|
|
|
# On attempting reconnection of Flash clients (in voiceBridge) we receive
|
|
# an extra user_joined_message. Ignore it as it will add an extra user
|
|
# in the user list, creating discrepancy with the list in the Flash client
|
|
if dbUser?.user?.connection_status is "offline" and message.payload.user?.phone_user
|
|
Meteor.log.error "offline AND phone user"
|
|
callback() #return without joining the user
|
|
else
|
|
if dbUser?.clientType is "HTML5" # typically html5 users will be in
|
|
# the db [as a dummy user] before the joining message
|
|
status = dbUser?.validated
|
|
Meteor.log.info "in user_joined_message the validStatus
|
|
of the user was #{status}"
|
|
userObj.timeOfJoining = message.header.current_time
|
|
userJoined meetingId, userObj, callback
|
|
else
|
|
userJoined meetingId, userObj, callback
|
|
|
|
|
|
# only process if requester is nodeJSapp means only process in the case when
|
|
# we explicitly request the users
|
|
else if eventName is 'get_users_reply' and message.payload.requester_id is 'nodeJSapp'
|
|
users = message.payload.users
|
|
|
|
#TODO make the serialization be split per meeting. This will allow us to
|
|
# use N threads vs 1 and we'll take advantage of Mongo's concurrency tricks
|
|
|
|
# Processing the users recursively with a callback to notify us,
|
|
# ensuring that we update the users collection serially
|
|
processUser = () ->
|
|
user = users.pop()
|
|
if user?
|
|
user.timeOfJoining = message.header.current_time
|
|
if user.emoji_status isnt 'none' and typeof user.emoji_status is 'string'
|
|
console.log "3"
|
|
user.set_emoji_time = new Date()
|
|
userJoined meetingId, user, processUser
|
|
else
|
|
# console.error("this is not supposed to happen")
|
|
userJoined meetingId, user, processUser
|
|
else
|
|
callback() # all meeting arrays (if any) have been processed
|
|
|
|
processUser()
|
|
|
|
|
|
else if eventName is 'validate_auth_token_reply'
|
|
userId = message.payload.userid
|
|
user = Meteor.Users.findOne({userId:userId, meetingId: meetingId})
|
|
validStatus = message.payload.valid
|
|
|
|
# if the user already exists in the db
|
|
if user?.clientType is "HTML5"
|
|
#if the html5 client user was validated successfully, add a flag
|
|
Meteor.Users.update({userId:userId, meetingId:message.payload.meeting_id},
|
|
{$set:{validated: validStatus}},
|
|
(err, numChanged) ->
|
|
if numChanged.insertedId?
|
|
funct = (cbk) ->
|
|
val=Meteor.Users.findOne({userId:userId, meetingId: message.payload.meeting_id})?.validated
|
|
Meteor.log.info "user.validated for user #{userId} in meeting #{user.meetingId} just became #{val}"
|
|
cbk()
|
|
|
|
funct(callback)
|
|
else
|
|
callback()
|
|
)
|
|
else
|
|
Meteor.log.info "a non-html5 user got validate_auth_token_reply."
|
|
callback()
|
|
|
|
|
|
else if eventName is 'user_left_message'
|
|
userId = message.payload.user?.userid
|
|
if userId? and meetingId?
|
|
markUserOffline meetingId, userId, callback
|
|
else
|
|
callback() #TODO check how to get these cases out and reuse code
|
|
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is 'presenter_assigned_message'
|
|
newPresenterId = message.payload.new_presenter_id
|
|
if newPresenterId?
|
|
# reset the previous presenter
|
|
Meteor.Users.update({"user.presenter": true, meetingId: meetingId},
|
|
{$set: {"user.presenter": false}},
|
|
(err, numUpdated) ->
|
|
Meteor.log.info(" Updating old presenter numUpdated=#{numUpdated},
|
|
err=#{err}")
|
|
)
|
|
# set the new presenter
|
|
Meteor.Users.update({"user.userid": newPresenterId, meetingId: meetingId},
|
|
{$set: {"user.presenter": true}},
|
|
(err, numUpdated) ->
|
|
Meteor.log.info(" Updating new presenter numUpdated=#{numUpdated},
|
|
err=#{err}")
|
|
)
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is 'user_emoji_status_message'
|
|
userId = message.payload.userid
|
|
meetingId = message.payload.meeting_id
|
|
emojiStatus = message.payload.emoji_status
|
|
if userId? and meetingId?
|
|
set_emoji_time = new Date()
|
|
Meteor.Users.update({"user.userid": userId},
|
|
{$set:{"user.set_emoji_time":set_emoji_time,"user.emoji_status":emojiStatus}},
|
|
(err, numUpdated) ->
|
|
Meteor.log.info(" Updating emoji numUpdated=#{numUpdated}, err=#{err}")
|
|
)
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName in ['user_locked_message', 'user_unlocked_message']
|
|
userId = message.payload.userid
|
|
isLocked = message.payload.locked
|
|
setUserLockedStatus(meetingId, userId, isLocked)
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName in ["meeting_ended_message", "meeting_destroyed_event",
|
|
"end_and_kick_all_message", "disconnect_all_users_message"]
|
|
Meteor.log.info("DESTROYING MEETING #{meetingId}")
|
|
removeMeetingFromCollection meetingId, callback
|
|
|
|
###
|
|
if Meteor.Meetings.findOne({meetingId: meetingId})?
|
|
count=Meteor.Users.find({meetingId: meetingId}).count()
|
|
Meteor.log.info "there are #{count} users in the meeting"
|
|
for user in Meteor.Users.find({meetingId: meetingId}).fetch()
|
|
markUserOffline meetingId, user.userId
|
|
#TODO should we clear the chat messages for that meeting?!
|
|
unless eventName is "disconnect_all_users_message"
|
|
removeMeetingFromCollection meetingId
|
|
###
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "get_chat_history_reply" and message.payload.requester_id is "nodeJSapp"
|
|
unless Meteor.Meetings.findOne({MeetingId: message.payload.meeting_id})?
|
|
for chatMessage in message.payload.chat_history
|
|
addChatToCollection meetingId, chatMessage
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "send_public_chat_message" or eventName is "send_private_chat_message"
|
|
messageObject = message.payload.message
|
|
# use current_time instead of message.from_time so that the chats from Flash and HTML5 have uniform times
|
|
messageObject.from_time = message.header.current_time
|
|
addChatToCollection meetingId, messageObject
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "presentation_shared_message"
|
|
presentationId = message.payload.presentation?.id
|
|
# change the currently displayed presentation to presentation.current = false
|
|
Meteor.Presentations.update({"presentation.current": true, meetingId: meetingId},
|
|
{$set: {"presentation.current": false}})
|
|
|
|
#update(if already present) entirely the presentation with the fresh data
|
|
removePresentationFromCollection meetingId, presentationId
|
|
addPresentationToCollection meetingId, message.payload.presentation
|
|
|
|
for slide in message.payload.presentation?.pages
|
|
addSlideToCollection meetingId, message.payload.presentation?.id, slide
|
|
if slide.current
|
|
displayThisSlide meetingId, slide.id, slide
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "get_presentation_info_reply" and message.payload.requester_id is "nodeJSapp"
|
|
for presentation in message.payload.presentations
|
|
addPresentationToCollection meetingId, presentation
|
|
|
|
for page in presentation.pages
|
|
#add the slide to the collection
|
|
addSlideToCollection meetingId, presentation.id, page
|
|
|
|
#request for shapes
|
|
whiteboardId = "#{presentation.id}/#{page.num}" # d2d9a672040fbde2a47a10bf6c37b6a4b5ae187f-1404411622872/1
|
|
#Meteor.log.info "the whiteboard_id here is:" + whiteboardId
|
|
|
|
replyTo = "#{meetingId}/nodeJSapp"
|
|
message =
|
|
"payload":
|
|
"meeting_id": meetingId
|
|
"requester_id": "nodeJSapp"
|
|
"whiteboard_id": whiteboardId
|
|
"reply_to": replyTo
|
|
"header":
|
|
"timestamp": new Date().getTime()
|
|
"name": "request_whiteboard_annotation_history_request"
|
|
|
|
if whiteboardId? and meetingId?
|
|
publish Meteor.config.redis.channels.toBBBApps.whiteboard, message #TODO
|
|
else
|
|
Meteor.log.info "did not have enough information to send a user_leaving_request" #TODO
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "presentation_page_changed_message"
|
|
newSlide = message.payload.page
|
|
displayThisSlide meetingId, newSlide?.id, newSlide
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "presentation_removed_message"
|
|
presentationId = message.payload.presentation_id
|
|
meetingId = message.payload.meeting_id
|
|
removePresentationFromCollection meetingId, presentationId
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "get_whiteboard_shapes_reply" and message.payload.requester_id is "nodeJSapp"
|
|
# Create a whiteboard clean status or find one for the current meeting
|
|
if not Meteor.WhiteboardCleanStatus.findOne({meetingId: meetingId})?
|
|
Meteor.WhiteboardCleanStatus.insert({meetingId: meetingId, in_progress: false})
|
|
|
|
for shape in message.payload.shapes
|
|
whiteboardId = shape.wb_id
|
|
addShapeToCollection meetingId, whiteboardId, shape
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "send_whiteboard_shape_message"
|
|
#Meteor stringifies an array of JSONs (...shape.result) in this message
|
|
#parsing the String and reassigning the value
|
|
if message.payload.shape.shape_type is "poll_result" and typeof message.payload.shape.shape.result is 'string'
|
|
message.payload.shape.shape.result = JSON.parse message.payload.shape.shape.result
|
|
|
|
shape = message.payload.shape
|
|
whiteboardId = shape?.wb_id
|
|
addShapeToCollection meetingId, whiteboardId, shape
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "presentation_cursor_updated_message"
|
|
cursor =
|
|
x: message.payload.x_percent
|
|
y: message.payload.y_percent
|
|
|
|
# update the location of the cursor on the whiteboard
|
|
updateCursorLocation(meetingId, cursor)
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "whiteboard_cleared_message"
|
|
whiteboardId = message.payload.whiteboard_id
|
|
Meteor.WhiteboardCleanStatus.update({meetingId: meetingId}, {$set: {'in_progress': true}})
|
|
removeAllShapesFromSlide meetingId, whiteboardId
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "undo_whiteboard_request"
|
|
whiteboardId = message.payload.whiteboard_id
|
|
shapeId = message.payload.shape_id
|
|
removeShapeFromSlide meetingId, whiteboardId, shapeId
|
|
callback()
|
|
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "presentation_page_resized_message"
|
|
slideId = message.payload.page?.id
|
|
heightRatio = message.payload.page?.height_ratio
|
|
widthRatio = message.payload.page?.width_ratio
|
|
xOffset = message.payload.page?.x_offset
|
|
yOffset = message.payload.page?.y_offset
|
|
presentationId = slideId.split("/")[0]
|
|
Meteor.Slides.update({presentationId: presentationId, "slide.current": true},
|
|
{$set:{"slide.height_ratio": heightRatio,"slide.width_ratio": widthRatio,"slide.x_offset":xOffset,"slide.y_offset":yOffset}}
|
|
)
|
|
callback()
|
|
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "recording_status_changed_message"
|
|
intendedForRecording = message.payload.recorded
|
|
currentlyBeingRecorded = message.payload.recording
|
|
Meteor.Meetings.update({meetingId: meetingId, intendedForRecording: intendedForRecording},
|
|
{$set: {currentlyBeingRecorded: currentlyBeingRecorded}}
|
|
)
|
|
callback()
|
|
|
|
# --------------------------------------------------
|
|
# lock settings ------------------------------------
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "eject_voice_user_message"
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "new_permission_settings"
|
|
oldSettings = Meteor.Meetings.findOne({meetingId:meetingId})?.roomLockSettings
|
|
newSettings = message.payload?.permissions
|
|
|
|
# if the disableMic setting was turned on
|
|
if !oldSettings?.disableMic and newSettings.disableMic
|
|
handleLockingMic(meetingId, newSettings)
|
|
|
|
# substitute with the new lock settings
|
|
Meteor.Meetings.update({meetingId: meetingId}, {$set: {
|
|
'roomLockSettings.disablePrivateChat': newSettings.disablePrivateChat
|
|
'roomLockSettings.disableCam': newSettings.disableCam
|
|
'roomLockSettings.disableMic': newSettings.disableMic
|
|
'roomLockSettings.lockOnJoin': newSettings.lockOnJoin
|
|
'roomLockSettings.lockedLayout': newSettings.lockedLayout
|
|
'roomLockSettings.disablePublicChat': newSettings.disablePublicChat
|
|
'roomLockSettings.lockOnJoinConfigurable': newSettings.lockOnJoinConfigurable #TODO
|
|
}})
|
|
callback()
|
|
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "poll_started_message"
|
|
if message.payload.meeting_id? and message.payload.requester_id? and message.payload.poll?
|
|
if Meteor.Meetings.findOne({meetingId: message.payload.meeting_id})?
|
|
#initializing the list of current users
|
|
users = Meteor.Users.find({meetingId: message.payload.meeting_id},
|
|
{fields:{"user.userid": 1, _id: 0}} ).fetch()
|
|
addPollToCollection message.payload.poll, message.payload.requester_id,
|
|
users, message.payload.meeting_id
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "poll_stopped_message"
|
|
meetingId = message.payload.meeting_id
|
|
poll_id = message.payload.poll_id
|
|
clearPollCollection meetingId, poll_id
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "user_voted_poll_message"
|
|
if message.payload?.poll? and message.payload.meeting_id? and message.payload.presenter_id?
|
|
pollObj = message.payload.poll
|
|
meetingId = message.payload.meeting_id
|
|
requesterId = message.payload.presenter_id
|
|
updatePollCollection pollObj, meetingId, requesterId
|
|
callback()
|
|
|
|
# for now not handling this serially #TODO
|
|
else if eventName is "poll_show_result_message"
|
|
if message.payload.poll.id? and message.payload.meeting_id?
|
|
poll_id = message.payload.poll.id
|
|
meetingId = message.payload.meeting_id
|
|
clearPollCollection meetingId, poll_id
|
|
callback()
|
|
|
|
|
|
else # keep moving in the queue
|
|
unless eventName in notLoggedEventTypes
|
|
Meteor.log.info "WARNING!!!
|
|
THE JSON MESSAGE WAS NOT OF TYPE SUPPORTED BY THIS APPLICATION\n
|
|
#{eventName} {JSON.stringify message}"
|
|
callback()
|
|
else
|
|
callback()
|