Merge pull request #2885 from antobinary/audio-fix

html5client:serial handling of selected event messages to ensure correct order of db updates
This commit is contained in:
Richard Alam 2015-11-06 17:18:49 -05:00
commit 31d7887dfd
8 changed files with 667 additions and 384 deletions

View File

@ -28,3 +28,6 @@ maibaum:foundation-icons
chriswessels:hammer@3.1.1
fastclick
standard-minifiers
cfs:power-queue
cfs:reactive-list
cfs:micro-queue

View File

@ -16,6 +16,10 @@ caching-compiler@1.0.0
caching-html-compiler@1.0.2
callback-hook@1.0.4
cfs:http-methods@0.0.30
cfs:micro-queue@0.0.6
cfs:power-queue@0.9.11
cfs:reactive-list@0.0.9
cfs:reactive-property@0.0.4
check@1.0.6
chriswessels:hammer@3.1.1
clinical:nightwatch@2.0.1

View File

@ -20,8 +20,8 @@ config.lockOnJoin = true
config.app = {}
#default font sizes for mobile / desktop
config.app.mobileFont = 24
config.app.desktopFont = 14
config.app.mobileFont=16
config.app.desktopFont=14
# Will offer the user to join the audio when entering the meeting
config.app.autoJoinAudio = false

View File

@ -68,8 +68,10 @@ Meteor.methods
from_username: messageObject.from_username
from_lang: messageObject.from_lang
}, (err, numChanged) ->
if err?
Meteor.log.error "_error #{err} when adding chat to collection"
if numChanged.insertedId?
Meteor.log.error "added chat id=[#{numChanged.insertedId}]
Meteor.log.info "_added chat id=[#{numChanged.insertedId}]
#{messageObject.from_username} to #{'PUBLIC' if messageObject.to_username?}:#{messageObject.message}")
# called on server start and meeting end

View File

@ -1,39 +1,48 @@
# --------------------------------------------------------------------------------------------
# Private methods on server
# --------------------------------------------------------------------------------------------
@addMeetingToCollection = (meetingId, name, intendedForRecording, voiceConf, duration) ->
#check if the meeting is already in the collection
obj = Meteor.Meetings.upsert({meetingId:meetingId}, {$set: {
meetingName:name
intendedForRecording: intendedForRecording
currentlyBeingRecorded: false # defaut value
voiceConf: voiceConf
duration: duration
roomLockSettings:
# by default the lock settings will be disabled on meeting create
disablePrivateChat: false
disableCam: false
disableMic: false
lockOnJoin: Meteor.config.lockOnJoin
lockedLayout: false
disablePublicChat: false
lockOnJoinConfigurable: false # TODO
}}, (err, numChanged) ->
if numChanged.insertedId?
Meteor.log.error "added MEETING #{meetingId}")
@addMeetingToCollection = (meetingId, name, intendedForRecording, voiceConf, duration, callback) ->
#check if the meeting is already in the collection
obj = Meteor.Meetings.upsert({meetingId:meetingId}, {$set: {
meetingName:name
intendedForRecording: intendedForRecording
currentlyBeingRecorded: false # default value
voiceConf: voiceConf
duration: duration
roomLockSettings:
# by default the lock settings will be disabled on meeting create
disablePrivateChat: false
disableCam: false
disableMic: false
lockOnJoin: Meteor.config.lockOnJoin
lockedLayout: false
disablePublicChat: false
lockOnJoinConfigurable: false # TODO
}}, (err, numChanged) =>
if numChanged.insertedId?
funct = (cbk) ->
Meteor.log.info "added MEETING #{meetingId}"
cbk()
funct(callback)
else
Meteor.log.error "nothing happened"
callback()
)
@clearMeetingsCollection = (meetingId) ->
if meetingId?
Meteor.Meetings.remove({meetingId: meetingId}, Meteor.log.info "cleared Meetings Collection (meetingId: #{meetingId}!")
Meteor.Meetings.remove({meetingId: meetingId},
Meteor.log.info "cleared Meetings Collection (meetingId: #{meetingId}!")
else
Meteor.Meetings.remove({}, Meteor.log.info "cleared Meetings Collection (all meetings)!")
#clean up upon a meeting's end
@removeMeetingFromCollection = (meetingId) ->
@removeMeetingFromCollection = (meetingId, callback) ->
if Meteor.Meetings.findOne({meetingId: meetingId})?
Meteor.log.info "end of meeting #{meetingId}. Clear the meeting data from all collections"
# delete all users in the meeting
@ -53,6 +62,16 @@
# delete the meeting
clearMeetingsCollection(meetingId)
callback()
else
funct (localCallback) ->
Meteor.log.error ("Error! There was no such meeting #{meetingId}")
localCallback()
funct(callback)
# --------------------------------------------------------------------------------------------
# end Private methods on server
# --------------------------------------------------------------------------------------------

View File

@ -139,7 +139,8 @@ Meteor.methods
# Received information from BBB-Apps that a user left
# Need to update the collection
# params: meetingid, userid as defined in BBB-Apps
@markUserOffline = (meetingId, userId) ->
# callback
@markUserOffline = (meetingId, userId, callback) ->
# mark the user as offline. remove from the collection on meeting_end #TODO
user = Meteor.Users.findOne({meetingId: meetingId, userId: userId})
if user?.clientType is "HTML5"
@ -153,16 +154,30 @@ Meteor.methods
'user.listenOnly': false
}}, (err, numChanged) ->
if err?
Meteor.log.error "_unsucc update (mark as offline) of user #{user?.user.name} #{userId} err=#{JSON.stringify err}"
if numChanged?
Meteor.log.info "_marking as offline html5 user #{user?.user.name} #{userId} numChanged=#{numChanged}"
Meteor.log.error "_unsucc update (mark as offline) of user #{user?.user.name} #{userId}
err=#{JSON.stringify err}"
callback()
else
funct = (cbk) ->
Meteor.log.info "_marking as offline html5 user #{user?.user.name}
#{userId} numChanged=#{numChanged}"
cbk()
funct(callback)
)
else
Meteor.Users.remove({meetingId: meetingId, userId: userId}, (err, numDeletions) ->
if err?
Meteor.log.error "_unsucc deletion of user #{user?.user.name} #{userId} err=#{JSON.stringify err}"
if numDeletions?
Meteor.log.info "_deleting info for user #{user?.user.name} #{userId} numDeletions=#{numDeletions}"
Meteor.log.error "_unsucc deletion of user #{user?.user.name} #{userId}
err=#{JSON.stringify err}"
callback()
else
funct = (cbk) ->
Meteor.log.info "_deleting info for user #{user?.user.name} #{userId}
numDeletions=#{numDeletions}"
cbk()
funct(callback)
)
@ -204,28 +219,66 @@ Meteor.methods
#update a voiceUser - a helper method
@updateVoiceUser = (meetingId, voiceUserObject) ->
@updateVoiceUser = (meetingId, voiceUserObject, callback) ->
u = Meteor.Users.findOne userId: voiceUserObject.web_userid
if u?
if voiceUserObject.talking?
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.voiceUser.talking':voiceUserObject.talking}}) # talking
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid},
{$set: {'user.voiceUser.talking':voiceUserObject.talking}},
(err, numChanged) ->
if err?
Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid}
[talking] err=#{JSON.stringify err}"
callback()
) # talking
if voiceUserObject.joined?
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.voiceUser.joined':voiceUserObject.joined}}) # joined
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid},
{$set: {'user.voiceUser.joined':voiceUserObject.joined}},
(err, numChanged) ->
if err?
Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid}
[joined] err=#{JSON.stringify err}"
callback()
) # joined
if voiceUserObject.locked?
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.voiceUser.locked':voiceUserObject.locked}}) # locked
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid},
{$set: {'user.voiceUser.locked':voiceUserObject.locked}},
(err, numChanged) ->
if err?
Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid}
[locked] err=#{JSON.stringify err}"
callback()
) # locked
if voiceUserObject.muted?
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.voiceUser.muted':voiceUserObject.muted}}) # muted
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid},
{$set: {'user.voiceUser.muted':voiceUserObject.muted}},
(err, numChanged) ->
if err?
Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid}
[muted] err=#{JSON.stringify err}"
callback()
) # muted
if voiceUserObject.listen_only?
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.listenOnly':voiceUserObject.listen_only}}) # listenOnly
Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid},
{$set: {'user.listenOnly':voiceUserObject.listen_only}},
(err, numChanged) ->
if err?
Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid}
[listenOnly] err=#{JSON.stringify err}"
callback()
) # listenOnly
else
Meteor.log.error "ERROR! did not find such voiceUser!"
callback()
@userJoined = (meetingId, user) ->
@userJoined = (meetingId, user, callback) ->
userId = user.userid
u = Meteor.Users.findOne({userId:user.userid, meetingId: meetingId})
# the collection already contains an entry for this user because
# we added a dummy user on register_user_message (to save authToken)
# the collection already contains an entry for this user
# because the user is reconnecting OR
# in the case of an html5 client user we added a dummy user on
# register_user_message (to save authToken)
if u? and u.authToken?
Meteor.Users.update({userId:user.userid, meetingId: meetingId}, {$set:{
user:
@ -255,31 +308,48 @@ Meteor.methods
locked: user.voiceUser.locked
muted: user.voiceUser.muted
webcam_stream: user.webcam_stream
}}, (err, numChanged) ->
Meteor.log.info "_(case1) UPDATING USER #{user.userid}, authToken=#{u.authToken},
locked=#{user.locked}, username=#{user.name}"
}}, (err) ->
if err?
Meteor.log.error "_error #{err} when trying to insert user #{userId}"
callback()
else
funct = (cbk) ->
Meteor.log.info "_(case1) UPDATING USER #{user.userid}, authToken=
#{u.authToken}, locked=#{user.locked}, username=#{user.name}"
cbk()
funct(callback)
)
# only add the welcome message if it's not there already
unless Meteor.Chat.findOne({"message.chat_type":'SYSTEM_MESSAGE', "message.to_userid": userId})?
welcomeMessage = Meteor.config.defaultWelcomeMessage
.replace /%%CONFNAME%%/, Meteor.Meetings.findOne({meetingId: meetingId})?.meetingName
welcomeMessage = welcomeMessage + Meteor.config.defaultWelcomeMessageFooter
# store the welcome message in chat for easy display on the client side
Meteor.Chat.insert({
meetingId: meetingId
message:
chat_type: "SYSTEM_MESSAGE"
message: welcomeMessage
from_color: '0x3399FF'
to_userid: userId
from_userid: "SYSTEM_MESSAGE"
from_username: ""
from_time: user.timeOfJoining?.toString()
}, (err) ->
Meteor.log.info "_added a system message in chat for user #{userId}"
)
welcomeMessage = Meteor.config.defaultWelcomeMessage
.replace /%%CONFNAME%%/, Meteor.Meetings.findOne({meetingId: meetingId})?.meetingName
welcomeMessage = welcomeMessage + Meteor.config.defaultWelcomeMessageFooter
# add the welcome message if it's not there already OR update time_of_joining
Meteor.Chat.upsert({
meetingId: meetingId
userId: userId
'message.chat_type': 'SYSTEM_MESSAGE'
'message.to_userid': userId
}, {
meetingId: meetingId
userId: userId
message:
chat_type: 'SYSTEM_MESSAGE'
message: welcomeMessage
from_color: '0x3399FF'
to_userid: userId
from_userid: 'SYSTEM_MESSAGE'
from_username: ''
from_time: user.timeOfJoining?.toString()
}, (err) ->
if err?
Meteor.log.error "_error #{err} when trying to insert welcome message for #{userId}"
else
Meteor.log.info "_added/updated a system message in chat for user #{userId}"
# note that we already called callback() when updating the user. Adding
# the welcome message in the chat is not as vital and we can afford to
# complete it when possible, without blocking the serial event messages processing
)
else
# Meteor.log.info "NOTE: got user_joined_message #{user.name} #{user.userid}"
@ -314,14 +384,21 @@ Meteor.methods
webcam_stream: user.webcam_stream
}, (err, numChanged) ->
if numChanged.insertedId?
Meteor.log.info "_joining user (case2) userid=[#{userId}]:#{user.name}.
Users.size is now #{Meteor.Users.find({meetingId: meetingId}).count()}")
funct = (cbk) ->
Meteor.log.info "_joining user (case2) userid=[#{userId}]:#{user.name}.
Users.size is now #{Meteor.Users.find({meetingId: meetingId}).count()}"
cbk()
funct(callback)
else
callback()
)
@createDummyUser = (meetingId, userId, authToken) ->
if Meteor.Users.findOne({userId:userId, meetingId: meetingId, authToken:authToken})?
Meteor.log.info "html5 user userid:[#{userId}] from [#{meetingId}] tried to revalidate token"
Meteor.log.info "html5 user userId:[#{userId}] from [#{meetingId}] tried to revalidate token"
else
Meteor.Users.insert({
meetingId: meetingId
@ -330,7 +407,7 @@ Meteor.methods
clientType: "HTML5"
validated: false #will be validated on validate_auth_token_reply
}, (err, id) ->
Meteor.log.info "_added a dummy html5 user with: userid=[#{userId}], id=[#{id}]
Meteor.log.info "_added a dummy html5 user with: userId=[#{userId}]
Users.size is now #{Meteor.Users.find({meetingId: meetingId}).count()}"
)

View File

@ -1,5 +1,4 @@
Meteor.methods
# Construct and send a message to bbb-web to validate the user
validateAuthToken: (meetingId, userId, authToken) ->
Meteor.log.info "sending a validate_auth_token with",
@ -23,6 +22,7 @@ Meteor.methods
else
Meteor.log.info "did not have enough information to send a validate_auth_token message"
class Meteor.RedisPubSub
constructor: (callback) ->
Meteor.log.info "constructor RedisPubSub"
@ -33,9 +33,10 @@ class Meteor.RedisPubSub
Meteor.log.info("Subscribing message on channel: #{Meteor.config.redis.channels.fromBBBApps}")
@subClient.on "psubscribe", Meteor.bindEnvironment(@_onSubscribe)
@subClient.on "pmessage", Meteor.bindEnvironment(@_onMessage)
@subClient.on "pmessage", Meteor.bindEnvironment(@_Q)
@subClient.psubscribe(Meteor.config.redis.channels.fromBBBApps)
callback @
_onSubscribe: (channel, count) =>
@ -48,331 +49,31 @@ class Meteor.RedisPubSub
"payload": {} # I need this, otherwise bbb-apps won't recognize the message
publish Meteor.config.redis.channels.toBBBApps.meeting, message
_Q: (pattern, channel, jsonMsg) =>
message = JSON.parse(jsonMsg)
eventName = message.header.name
console.log "Q #{eventName} #{Meteor.myQueue.total()}"
Meteor.myQueue.add({
pattern: pattern
channel: channel
jsonMsg: jsonMsg
})
_onMessage: (pattern, channel, jsonMsg) =>
# TODO: this has to be in a try/catch block, otherwise the server will
# crash if the message has a bad format
console.log "_onMessage"
message = JSON.parse(jsonMsg)
correlationId = message.payload?.reply_to or message.header?.reply_to
meetingId = message.payload?.meeting_id
# just because it's common. we handle it anyway
notLoggedEventTypes = [
"keep_alive_reply"
"page_resized_message"
"presentation_page_resized_message"
"presentation_cursor_updated_message"
"get_presentation_info_reply"
"get_users_reply"
"get_chat_history_reply"
"get_all_meetings_reply"
"presentation_shared_message"
"presentation_conversion_done_message"
"presentation_conversion_progress_message"
"presentation_page_generated_message"
# "presentation_page_changed_message"
"BbbPubSubPongMessage"
"bbb_apps_is_alive_message"
"user_voice_talking_message"
"meeting_state_message"
"get_recording_status_reply"
]
if message?.header? and message?.payload?
unless message.header.name in notLoggedEventTypes
Meteor.log.info "redis incoming message #{message.header.name} ",
message: jsonMsg
# handle voice events
if message.header.name in ['user_left_voice_message', 'user_joined_voice_message', 'user_voice_talking_message', 'user_voice_muted_message']
if message.payload.user?
updateVoiceUser meetingId,
'web_userid': message.payload.user.voiceUser.web_userid
'listen_only': message.payload.listen_only
'talking': message.payload.user.voiceUser.talking
'joined': message.payload.user.voiceUser.joined
'locked': message.payload.user.voiceUser.locked
'muted': message.payload.user.voiceUser.muted
return
# listen only
if message.header.name is 'user_listening_only'
updateVoiceUser meetingId, {'web_userid': message.payload.userid, 'listen_only': message.payload.listen_only}
# most likely we don't need to ensure that the user's voiceUser's {talking, joined, muted, locked} are false by default #TODO?
return
if message.header.name is "get_all_meetings_reply"
Meteor.log.info "Let's store some data for the running meetings so that when an HTML5 client joins everything is ready!"
listOfMeetings = message.payload.meetings
for meeting in listOfMeetings
# we currently do not have voice_conf or duration in this message.
addMeetingToCollection meeting.meetingID, meeting.meetingName, meeting.recorded, meeting.voiceBridge, meeting.duration
return
if message.header.name is "get_users_reply" and message.payload.requester_id is "nodeJSapp"
unless Meteor.Meetings.findOne({MeetingId: message.payload.meeting_id})?
users = message.payload.users
for user in users
user.timeOfJoining = message.header.current_time # TODO this might need to be removed
if user.emoji_status isnt 'none' and typeof user.emoji_status is 'string'
user.set_emoji_time = new Date()
userJoined meetingId, user
return
if message.header.name is "validate_auth_token_reply"
user = Meteor.Users.findOne({userId:message.payload.userid, meetingId: message.payload.meeting_id})
validStatus = message.payload.valid
# if the user already exists in the db
if user?.clientType is "HTML5"
#if the html5 client user was validated successfully, add a flag
Meteor.Users.update({userId:message.payload.userid, meetingId:message.payload.meeting_id}, {$set:{validated: validStatus}})
Meteor.log.info "user.validated for user #{user.userId} in meeting #{user.meetingId} just
became #{Meteor.Users.findOne({userId:message.payload.userid, meetingId: message.payload.meeting_id})?.validated}"
else
Meteor.log.info "a non-html5 user got validate_auth_token_reply."
return
if message.header.name is "user_registered_message"
#createDummyUser message.payload.meeting_id, message.payload.user
return
if message.header.name is "user_joined_message"
userObj = message.payload.user
dbUser = Meteor.Users.findOne({userId: message.payload.user.userid, meetingId: message.payload.meeting_id})
# On attempting reconnection of Flash clients (in voiceBridge) we receive an extra user_joined_message
# Ignore it as it will add an extra user in the userlist, creating discrepancy with the list in the Flash client
if dbUser?.user?.connection_status is "offline" and message.payload.user?.phone_user
Meteor.log.error "offline AND phone user"
return # without joining the user
else
if dbUser?.clientType is "HTML5" # typically html5 users will be in the db [as a dummy user] before the joining message
status = dbUser?.validated
Meteor.log.info "in user_joined_message the validStatus of the user was #{status}"
userJoined meetingId, userObj
return
if message.header.name is "user_left_message"
userId = message.payload.user?.userid
if userId? and meetingId?
markUserOffline meetingId, userId
return
if message.header.name is "disconnect_user_message"
Meteor.log.info "a user(#{message.payload.userid}) was kicked out from meeting(#{message.payload.meeting_id})"
return
if message.header.name is "get_chat_history_reply" and message.payload.requester_id is "nodeJSapp"
unless Meteor.Meetings.findOne({MeetingId: message.payload.meeting_id})?
for chatMessage in message.payload.chat_history
addChatToCollection meetingId, chatMessage
return
if message.header.name is "send_public_chat_message" or message.header.name is "send_private_chat_message"
messageObject = message.payload.message
# use current_time instead of message.from_time so that the chats from Flash and HTML5 have uniform times
messageObject.from_time = message.header.current_time
addChatToCollection meetingId, messageObject
return
if message.header.name is "meeting_created_message"
meetingName = message.payload.name
intendedForRecording = message.payload.recorded
voiceConf = message.payload.voice_conf
duration = message.payload.duration
addMeetingToCollection meetingId, meetingName, intendedForRecording, voiceConf, duration
return
if message.header.name is "presentation_shared_message"
presentationId = message.payload.presentation?.id
# change the currently displayed presentation to presentation.current = false
Meteor.Presentations.update({"presentation.current": true, meetingId: meetingId},{$set: {"presentation.current": false}})
#update(if already present) entirely the presentation with the fresh data
removePresentationFromCollection meetingId, presentationId
addPresentationToCollection meetingId, message.payload.presentation
for slide in message.payload.presentation?.pages
addSlideToCollection meetingId, message.payload.presentation?.id, slide
if slide.current
displayThisSlide meetingId, slide.id, slide
return
if message.header.name is "get_presentation_info_reply" and message.payload.requester_id is "nodeJSapp"
for presentation in message.payload.presentations
addPresentationToCollection meetingId, presentation
for page in presentation.pages
#add the slide to the collection
addSlideToCollection meetingId, presentation.id, page
#request for shapes
whiteboardId = "#{presentation.id}/#{page.num}" # d2d9a672040fbde2a47a10bf6c37b6a4b5ae187f-1404411622872/1
#Meteor.log.info "the whiteboard_id here is:" + whiteboardId
message =
"payload":
"meeting_id": meetingId
"requester_id": "nodeJSapp"
"whiteboard_id": whiteboardId
"header":
"timestamp": new Date().getTime()
"name": "get_whiteboard_shapes_request"
"version": "0.0.1"
if whiteboardId? and meetingId?
publish Meteor.config.redis.channels.toBBBApps.whiteboard, message #TODO
else
Meteor.log.info "did not have enough information to send a user_leaving_request" #TODO
return
if message.header.name is "presentation_page_changed_message"
newSlide = message.payload.page
displayThisSlide meetingId, newSlide?.id, newSlide
return
if message.header.name is "get_whiteboard_shapes_reply" and message.payload.requester_id is "nodeJSapp"
# Create a whiteboard clean status or find one for the current meeting
if not Meteor.WhiteboardCleanStatus.findOne({meetingId: meetingId})?
Meteor.WhiteboardCleanStatus.insert({meetingId: meetingId, in_progress: false})
for shape in message.payload.shapes
whiteboardId = shape.wb_id
addShapeToCollection meetingId, whiteboardId, shape
return
if message.header.name is "send_whiteboard_shape_message"
#Meteor stringifies an array of JSONs (...shape.result) in this message
#parsing the String and reassigning the value
if message.payload.shape.shape_type is "poll_result" and typeof message.payload.shape.shape.result is 'string'
message.payload.shape.shape.result = JSON.parse message.payload.shape.shape.result
shape = message.payload.shape
whiteboardId = shape?.wb_id
addShapeToCollection meetingId, whiteboardId, shape
return
if message.header.name is "presentation_cursor_updated_message"
x = message.payload.x_percent
y = message.payload.y_percent
Meteor.Presentations.update({"presentation.current": true, meetingId: meetingId},{$set: {"pointer.x": x, "pointer.y": y}})
return
if message.header.name is "whiteboard_cleared_message"
whiteboardId = message.payload.whiteboard_id
Meteor.WhiteboardCleanStatus.update({meetingId: meetingId}, {$set: {'in_progress': true}})
removeAllShapesFromSlide meetingId, whiteboardId
return
if message.header.name is "undo_whiteboard_request"
whiteboardId = message.payload.whiteboard_id
shapeId = message.payload.shape_id
removeShapeFromSlide meetingId, whiteboardId, shapeId
return
if message.header.name is "presenter_assigned_message"
newPresenterId = message.payload.new_presenter_id
if newPresenterId?
# reset the previous presenter
Meteor.Users.update({"user.presenter": true, meetingId: meetingId},{$set: {"user.presenter": false}})
# set the new presenter
Meteor.Users.update({"user.userid": newPresenterId, meetingId: meetingId},{$set: {"user.presenter": true}})
return
if message.header.name is "presentation_page_resized_message"
slideId = message.payload.page?.id
heightRatio = message.payload.page?.height_ratio
widthRatio = message.payload.page?.width_ratio
xOffset = message.payload.page?.x_offset
yOffset = message.payload.page?.y_offset
presentationId = slideId.split("/")[0]
Meteor.Slides.update({presentationId: presentationId, "slide.current": true}, {$set: {"slide.height_ratio": heightRatio, "slide.width_ratio": widthRatio, "slide.x_offset": xOffset, "slide.y_offset": yOffset}})
return
if message.header.name is "user_emoji_status_message"
userId = message.payload.userid
meetingId = message.payload.meeting_id
emojiStatus = message.payload.emoji_status
if userId? and meetingId?
set_emoji_time = new Date()
Meteor.Users.update({"user.userid": userId},{$set: {"user.set_emoji_time": set_emoji_time, "user.emoji_status": emojiStatus}})
return
if message.header.name is "recording_status_changed_message"
intendedForRecording = message.payload.recorded
currentlyBeingRecorded = message.payload.recording
Meteor.Meetings.update({meetingId: meetingId, intendedForRecording: intendedForRecording}, {$set: {currentlyBeingRecorded: currentlyBeingRecorded}})
return
# --------------------------------------------------
# lock settings ------------------------------------
if message.header.name is "eject_voice_user_message"
return
if message.header.name is "new_permission_settings"
oldSettings = Meteor.Meetings.findOne({meetingId:meetingId})?.roomLockSettings
newSettings = message.payload?.permissions
# if the disableMic setting was turned on
if !oldSettings?.disableMic and newSettings.disableMic
handleLockingMic(meetingId, newSettings)
# substitute with the new lock settings
Meteor.Meetings.update({meetingId: meetingId}, {$set: {
'roomLockSettings.disablePrivateChat': newSettings.disablePrivateChat
'roomLockSettings.disableCam': newSettings.disableCam
'roomLockSettings.disableMic': newSettings.disableMic
'roomLockSettings.lockOnJoin': newSettings.lockOnJoin
'roomLockSettings.lockedLayout': newSettings.lockedLayout
'roomLockSettings.disablePublicChat': newSettings.disablePublicChat
'roomLockSettings.lockOnJoinConfigurable': newSettings.lockOnJoinConfigurable #TODO
}})
return
if message.header.name is "user_locked_message" or message.header.name is "user_unlocked_message"
userId = message.payload.userid
isLocked = message.payload.locked
setUserLockedStatus(meetingId, userId, isLocked)
return
if message.header.name in ["meeting_ended_message", "meeting_destroyed_event",
"end_and_kick_all_message", "disconnect_all_users_message"]
if Meteor.Meetings.findOne({meetingId: meetingId})?
Meteor.log.info "there are #{Meteor.Users.find({meetingId: meetingId}).count()} users in the meeting"
for user in Meteor.Users.find({meetingId: meetingId}).fetch()
markUserOffline meetingId, user.userId
#TODO should we clear the chat messages for that meeting?!
unless message.header.name is "disconnect_all_users_message"
removeMeetingFromCollection meetingId
return
if message.header.name is "poll_started_message"
if message.payload.meeting_id? and message.payload.requester_id? and message.payload.poll?
if Meteor.Meetings.findOne({meetingId: message.payload.meeting_id})?
#initializing the list of current users
users = Meteor.Users.find({meetingId: message.payload.meeting_id}, {fields:{"user.userid": 1, _id: 0}} ).fetch()
addPollToCollection message.payload.poll, message.payload.requester_id, users, message.payload.meeting_id
if message.header.name is "poll_stopped_message"
meetingId = message.payload.meeting_id
poll_id = message.payload.poll_id
clearPollCollection meetingId, poll_id
if message.header.name is "user_voted_poll_message"
if message.payload?.poll? and message.payload.meeting_id? and message.payload.presenter_id?
pollObj = message.payload.poll
meetingId = message.payload.meeting_id
requesterId = message.payload.presenter_id
updatePollCollection pollObj, meetingId, requesterId
if message.header.name is "poll_show_result_message"
if message.payload.poll.id? and message.payload.meeting_id?
poll_id = message.payload.poll.id
meetingId = message.payload.meeting_id
clearPollCollection meetingId, poll_id
# --------------------------------------------------------------------------------------------
# Private methods on server

View File

@ -14,3 +14,480 @@ Meteor.startup ->
# create create a PubSub connection, start listening
Meteor.redisPubSub = new Meteor.RedisPubSub(->
Meteor.log.info "created pubsub")
Meteor.myQueue = new PowerQueue({
# autoStart:true
# isPaused: true
})
Meteor.myQueue.taskHandler = (data, next, failures) ->
eventName = JSON.parse(data.jsonMsg)?.header.name
if failures > 0
Meteor.log.error "got a failure on taskHandler #{eventName} #{failures}"
else
handleRedisMessage(data, ()->
Meteor.log.error "in callback after handleRedisMessage #{eventName}"
next()
)
# To ensure that we process the redis json event messages serially we use a
# callback. This callback is to be called when the Meteor collection is
# updated with the information coming in the payload of the json message. The
# callback signalizes to the queue that we are done processing the current
# message in the queue and are ready to move on to the next one. If we do not
# use the callback mechanism we may encounter a race condition situation
# due to not following the order of events coming through the redis pubsub.
# for example: a user_left event reaching the collection before a user_joined
# for the same user.
@handleRedisMessage = (data, callback) ->
message = JSON.parse(data.jsonMsg)
# correlationId = message.payload?.reply_to or message.header?.reply_to
meetingId = message.payload?.meeting_id
# Avoid cluttering the log with json messages carrying little or repetitive
# information. Comment out a message type in the array to be able to see it
# in the log upon restarting of the Meteor process.
notLoggedEventTypes = [
"keep_alive_reply"
"page_resized_message"
"presentation_page_resized_message"
"presentation_cursor_updated_message"
"get_presentation_info_reply"
"get_users_reply"
"get_chat_history_reply"
"get_all_meetings_reply"
"get_whiteboard_shapes_reply"
"presentation_shared_message"
"presentation_conversion_done_message"
"presentation_conversion_progress_message"
"presentation_page_generated_message"
# "presentation_page_changed_message"
"BbbPubSubPongMessage"
"bbb_apps_is_alive_message"
"user_voice_talking_message"
"meeting_state_message"
"get_recording_status_reply"
]
# TODO check if message
eventName = message.header.name
meetingId = message.payload?.meeting_id
# we currently disregard the pattern and channel
# Meteor.log.info "in handleRedisMessage #{eventName}"
if message?.header? and message?.payload?
if eventName is 'meeting_created_message'
# Meteor.log.error JSON.stringify message
meetingName = message.payload.name
intendedForRecording = message.payload.recorded
voiceConf = message.payload.voice_conf
duration = message.payload.duration
addMeetingToCollection meetingId, meetingName, intendedForRecording,
voiceConf, duration, callback
# handle voice events
else if message.payload.user? and eventName in [
'user_left_voice_message'
'user_joined_voice_message'
'user_voice_talking_message'
'user_voice_muted_message']
voiceUserObj = {
'web_userid': message.payload.user.voiceUser.web_userid
'listen_only': message.payload.listen_only
'talking': message.payload.user.voiceUser.talking
'joined': message.payload.user.voiceUser.joined
'locked': message.payload.user.voiceUser.locked
'muted': message.payload.user.voiceUser.muted
}
updateVoiceUser meetingId, voiceUserObj, callback
else if eventName is 'user_listening_only'
voiceUserObj = {
'web_userid': message.payload.userid
'listen_only': message.payload.listen_only
}
updateVoiceUser meetingId, voiceUserObj, callback
else if eventName is 'get_all_meetings_reply'
Meteor.log.info "Let's store some data for the running meetings
so that when an HTML5 client joins everything is ready!"
Meteor.log.info JSON.stringify(message)
listOfMeetings = message.payload.meetings
# Processing the meetings recursively with a callback to notify us,
# ensuring that we update the meeting collection serially
processMeeting = () ->
meeting = listOfMeetings.pop()
if meeting?
addMeetingToCollection meeting.meetingID, meeting.meetingName,
meeting.recorded, meeting.voiceBridge, meeting.duration, processMeeting
else
callback() # all meeting arrays (if any) have been processed
processMeeting()
else if eventName is 'user_joined_message'
Meteor.log.error "\n\n user_joined_message \n\n" + JSON.stringify message
userObj = message.payload.user
dbUser = Meteor.Users.findOne({userId: userObj.userid, meetingId: message.payload.meeting_id})
# On attempting reconnection of Flash clients (in voiceBridge) we receive
# an extra user_joined_message. Ignore it as it will add an extra user
# in the user list, creating discrepancy with the list in the Flash client
if dbUser?.user?.connection_status is "offline" and message.payload.user?.phone_user
Meteor.log.error "offline AND phone user"
callback() #return without joining the user
else
if dbUser?.clientType is "HTML5" # typically html5 users will be in
# the db [as a dummy user] before the joining message
status = dbUser?.validated
Meteor.log.info "in user_joined_message the validStatus
of the user was #{status}"
userObj.timeOfJoining = message.header.current_time
userJoined meetingId, userObj, callback
else
userJoined meetingId, userObj, callback
# only process if requester is nodeJSapp means only process in the case when
# we explicitly request the users
else if eventName is 'get_users_reply' and message.payload.requester_id is 'nodeJSapp'
Meteor.log.error JSON.stringify(message)
if !Meteor.Meetings.findOne({meetingId: meetingId})? #TODO consider removing this cond
users = message.payload.users
#TODO make the serialization be split per meeting. This will allow us to
# use N threads vs 1 and we'll take advantage of Mongo's concurrency tricks
# Processing the users recursively with a callback to notify us,
# ensuring that we update the users collection serially
processUser = () ->
console.log "1"
user = users.pop()
if user?
console.log "2"
user.timeOfJoining = message.header.current_time
if user.emoji_status isnt 'none' and typeof user.emoji_status is 'string'
console.log "3"
user.set_emoji_time = new Date()
userJoined meetingId, user, processUser
else
console.error("this is not supposed to happen")
userJoined meetingId, user, processUser
else
console.log "4"
callback() # all meeting arrays (if any) have been processed
processUser()
else
callback() #TODO test if we get here
else if eventName is 'validate_auth_token_reply'
userId = message.payload.userid
user = Meteor.Users.findOne({userId:userId, meetingId: meetingId})
validStatus = message.payload.valid
# if the user already exists in the db
if user?.clientType is "HTML5"
#if the html5 client user was validated successfully, add a flag
Meteor.Users.update({userId:userId, meetingId:message.payload.meeting_id},
{$set:{validated: validStatus}},
(err, numChanged) ->
if numChanged.insertedId?
funct = (cbk) ->
val=Meteor.Users.findOne({userId:userId, meetingId: message.payload.meeting_id})?.validated
Meteor.log.info "user.validated for user #{userId} in meeting #{user.meetingId} just became #{val}"
cbk()
funct(callback)
else
callback()
)
else
Meteor.log.info "a non-html5 user got validate_auth_token_reply."
callback()
else if eventName is 'user_left_message'
userId = message.payload.user?.userid
if userId? and meetingId?
markUserOffline meetingId, userId, callback
else
callback() #TODO check how to get these cases out and reuse code
# for now not handling this serially #TODO
else if eventName is 'presenter_assigned_message'
newPresenterId = message.payload.new_presenter_id
if newPresenterId?
# reset the previous presenter
Meteor.Users.update({"user.presenter": true, meetingId: meetingId},
{$set: {"user.presenter": false}},
(err, numUpdated) ->
Meteor.log.info(" Updating old presenter numUpdated=#{numUpdated},
err=#{err}")
)
# set the new presenter
Meteor.Users.update({"user.userid": newPresenterId, meetingId: meetingId},
{$set: {"user.presenter": true}},
(err, numUpdated) ->
Meteor.log.info(" Updating new presenter numUpdated=#{numUpdated},
err=#{err}")
)
callback()
# for now not handling this serially #TODO
else if eventName is 'user_emoji_status_message'
userId = message.payload.userid
meetingId = message.payload.meeting_id
emojiStatus = message.payload.emoji_status
if userId? and meetingId?
set_emoji_time = new Date()
Meteor.Users.update({"user.userid": userId},
{$set:{"user.set_emoji_time":set_emoji_time,"user.emoji_status":emojiStatus}},
(err, numUpdated) ->
Meteor.log.info(" Updating emoji numUpdated=#{numUpdated}, err=#{err}")
)
callback()
# for now not handling this serially #TODO
else if eventName in ['user_locked_message', 'user_unlocked_message']
userId = message.payload.userid
isLocked = message.payload.locked
setUserLockedStatus(meetingId, userId, isLocked)
callback()
# for now not handling this serially #TODO
else if eventName in ["meeting_ended_message", "meeting_destroyed_event",
"end_and_kick_all_message", "disconnect_all_users_message"]
Meteor.log.info("DESTROYING MEETING #{meetingId}")
removeMeetingFromCollection meetingId, callback
###
if Meteor.Meetings.findOne({meetingId: meetingId})?
count=Meteor.Users.find({meetingId: meetingId}).count()
Meteor.log.info "there are #{count} users in the meeting"
for user in Meteor.Users.find({meetingId: meetingId}).fetch()
markUserOffline meetingId, user.userId
#TODO should we clear the chat messages for that meeting?!
unless eventName is "disconnect_all_users_message"
removeMeetingFromCollection meetingId
###
# for now not handling this serially #TODO
else if eventName is "get_chat_history_reply" and message.payload.requester_id is "nodeJSapp"
unless Meteor.Meetings.findOne({MeetingId: message.payload.meeting_id})?
for chatMessage in message.payload.chat_history
addChatToCollection meetingId, chatMessage
callback()
# for now not handling this serially #TODO
else if eventName is "send_public_chat_message" or eventName is "send_private_chat_message"
messageObject = message.payload.message
# use current_time instead of message.from_time so that the chats from Flash and HTML5 have uniform times
messageObject.from_time = message.header.current_time
addChatToCollection meetingId, messageObject
callback()
# for now not handling this serially #TODO
else if eventName is "presentation_shared_message"
presentationId = message.payload.presentation?.id
# change the currently displayed presentation to presentation.current = false
Meteor.Presentations.update({"presentation.current": true, meetingId: meetingId},
{$set: {"presentation.current": false}})
#update(if already present) entirely the presentation with the fresh data
removePresentationFromCollection meetingId, presentationId
addPresentationToCollection meetingId, message.payload.presentation
for slide in message.payload.presentation?.pages
addSlideToCollection meetingId, message.payload.presentation?.id, slide
if slide.current
displayThisSlide meetingId, slide.id, slide
callback()
# for now not handling this serially #TODO
else if eventName is "get_presentation_info_reply" and message.payload.requester_id is "nodeJSapp"
for presentation in message.payload.presentations
addPresentationToCollection meetingId, presentation
for page in presentation.pages
#add the slide to the collection
addSlideToCollection meetingId, presentation.id, page
#request for shapes
whiteboardId = "#{presentation.id}/#{page.num}" # d2d9a672040fbde2a47a10bf6c37b6a4b5ae187f-1404411622872/1
#Meteor.log.info "the whiteboard_id here is:" + whiteboardId
message =
"payload":
"meeting_id": meetingId
"requester_id": "nodeJSapp"
"whiteboard_id": whiteboardId
"header":
"timestamp": new Date().getTime()
"name": "get_whiteboard_shapes_request"
"version": "0.0.1"
if whiteboardId? and meetingId?
publish Meteor.config.redis.channels.toBBBApps.whiteboard, message #TODO
else
Meteor.log.info "did not have enough information to send a user_leaving_request" #TODO
callback()
# for now not handling this serially #TODO
else if eventName is "presentation_page_changed_message"
newSlide = message.payload.page
displayThisSlide meetingId, newSlide?.id, newSlide
callback()
# for now not handling this serially #TODO
else if eventName is "get_whiteboard_shapes_reply" and message.payload.requester_id is "nodeJSapp"
# Create a whiteboard clean status or find one for the current meeting
if not Meteor.WhiteboardCleanStatus.findOne({meetingId: meetingId})?
Meteor.WhiteboardCleanStatus.insert({meetingId: meetingId, in_progress: false})
for shape in message.payload.shapes
whiteboardId = shape.wb_id
addShapeToCollection meetingId, whiteboardId, shape
callback()
# for now not handling this serially #TODO
else if eventName is "send_whiteboard_shape_message"
#Meteor stringifies an array of JSONs (...shape.result) in this message
#parsing the String and reassigning the value
if message.payload.shape.shape_type is "poll_result" and typeof message.payload.shape.shape.result is 'string'
message.payload.shape.shape.result = JSON.parse message.payload.shape.shape.result
shape = message.payload.shape
whiteboardId = shape?.wb_id
addShapeToCollection meetingId, whiteboardId, shape
callback()
# for now not handling this serially #TODO
else if eventName is "presentation_cursor_updated_message"
x = message.payload.x_percent
y = message.payload.y_percent
Meteor.Presentations.update({"presentation.current": true, meetingId: meetingId},
{$set: {"pointer.x": x, "pointer.y": y}})
callback()
# for now not handling this serially #TODO
else if eventName is "whiteboard_cleared_message"
whiteboardId = message.payload.whiteboard_id
Meteor.WhiteboardCleanStatus.update({meetingId: meetingId}, {$set: {'in_progress': true}})
removeAllShapesFromSlide meetingId, whiteboardId
callback()
# for now not handling this serially #TODO
else if eventName is "undo_whiteboard_request"
whiteboardId = message.payload.whiteboard_id
shapeId = message.payload.shape_id
removeShapeFromSlide meetingId, whiteboardId, shapeId
callback()
# for now not handling this serially #TODO
else if eventName is "presentation_page_resized_message"
slideId = message.payload.page?.id
heightRatio = message.payload.page?.height_ratio
widthRatio = message.payload.page?.width_ratio
xOffset = message.payload.page?.x_offset
yOffset = message.payload.page?.y_offset
presentationId = slideId.split("/")[0]
Meteor.Slides.update({presentationId: presentationId, "slide.current": true},
{$set:{"slide.height_ratio": heightRatio,"slide.width_ratio": widthRatio,"slide.x_offset":xOffset,"slide.y_offset":yOffset}}
)
callback()
# for now not handling this serially #TODO
else if eventName is "recording_status_changed_message"
intendedForRecording = message.payload.recorded
currentlyBeingRecorded = message.payload.recording
Meteor.Meetings.update({meetingId: meetingId, intendedForRecording: intendedForRecording},
{$set: {currentlyBeingRecorded: currentlyBeingRecorded}}
)
callback()
# --------------------------------------------------
# lock settings ------------------------------------
# for now not handling this serially #TODO
else if eventName is "eject_voice_user_message"
callback()
# for now not handling this serially #TODO
else if eventName is "new_permission_settings"
oldSettings = Meteor.Meetings.findOne({meetingId:meetingId})?.roomLockSettings
newSettings = message.payload?.permissions
# if the disableMic setting was turned on
if !oldSettings?.disableMic and newSettings.disableMic
handleLockingMic(meetingId, newSettings)
# substitute with the new lock settings
Meteor.Meetings.update({meetingId: meetingId}, {$set: {
'roomLockSettings.disablePrivateChat': newSettings.disablePrivateChat
'roomLockSettings.disableCam': newSettings.disableCam
'roomLockSettings.disableMic': newSettings.disableMic
'roomLockSettings.lockOnJoin': newSettings.lockOnJoin
'roomLockSettings.lockedLayout': newSettings.lockedLayout
'roomLockSettings.disablePublicChat': newSettings.disablePublicChat
'roomLockSettings.lockOnJoinConfigurable': newSettings.lockOnJoinConfigurable #TODO
}})
callback()
# for now not handling this serially #TODO
else if eventName is "poll_started_message"
if message.payload.meeting_id? and message.payload.requester_id? and message.payload.poll?
if Meteor.Meetings.findOne({meetingId: message.payload.meeting_id})?
#initializing the list of current users
users = Meteor.Users.find({meetingId: message.payload.meeting_id},
{fields:{"user.userid": 1, _id: 0}} ).fetch()
addPollToCollection message.payload.poll, message.payload.requester_id,
users, message.payload.meeting_id
callback()
# for now not handling this serially #TODO
else if eventName is "poll_stopped_message"
meetingId = message.payload.meeting_id
poll_id = message.payload.poll_id
clearPollCollection meetingId, poll_id
callback()
# for now not handling this serially #TODO
else if eventName is "user_voted_poll_message"
if message.payload?.poll? and message.payload.meeting_id? and message.payload.presenter_id?
pollObj = message.payload.poll
meetingId = message.payload.meeting_id
requesterId = message.payload.presenter_id
updatePollCollection pollObj, meetingId, requesterId
callback()
# for now not handling this serially #TODO
else if eventName is "poll_show_result_message"
if message.payload.poll.id? and message.payload.meeting_id?
poll_id = message.payload.poll.id
meetingId = message.payload.meeting_id
clearPollCollection meetingId, poll_id
callback()
else # keep moving in the queue
unless eventName in notLoggedEventTypes
Meteor.log.info "WARNING!!!\n
THE JSON MESSAGE WAS NOT OF TYPE SUPPORTED BY THIS APPLICATION\n
#{eventName} {JSON.stringify message}"
callback()