diff --git a/bigbluebutton-html5/app/.meteor/packages b/bigbluebutton-html5/app/.meteor/packages index 77cc55261f..829fe7c205 100644 --- a/bigbluebutton-html5/app/.meteor/packages +++ b/bigbluebutton-html5/app/.meteor/packages @@ -28,3 +28,6 @@ maibaum:foundation-icons chriswessels:hammer@3.1.1 fastclick standard-minifiers +cfs:power-queue +cfs:reactive-list +cfs:micro-queue diff --git a/bigbluebutton-html5/app/.meteor/versions b/bigbluebutton-html5/app/.meteor/versions index e9380f0073..c96a7610d3 100644 --- a/bigbluebutton-html5/app/.meteor/versions +++ b/bigbluebutton-html5/app/.meteor/versions @@ -16,6 +16,10 @@ caching-compiler@1.0.0 caching-html-compiler@1.0.2 callback-hook@1.0.4 cfs:http-methods@0.0.30 +cfs:micro-queue@0.0.6 +cfs:power-queue@0.9.11 +cfs:reactive-list@0.0.9 +cfs:reactive-property@0.0.4 check@1.0.6 chriswessels:hammer@3.1.1 clinical:nightwatch@2.0.1 diff --git a/bigbluebutton-html5/app/config.coffee b/bigbluebutton-html5/app/config.coffee index bc41f22cdf..64062f71ca 100755 --- a/bigbluebutton-html5/app/config.coffee +++ b/bigbluebutton-html5/app/config.coffee @@ -20,8 +20,8 @@ config.lockOnJoin = true config.app = {} #default font sizes for mobile / desktop -config.app.mobileFont = 24 -config.app.desktopFont = 14 +config.app.mobileFont=16 +config.app.desktopFont=14 # Will offer the user to join the audio when entering the meeting config.app.autoJoinAudio = false diff --git a/bigbluebutton-html5/app/server/collection_methods/chat.coffee b/bigbluebutton-html5/app/server/collection_methods/chat.coffee index a043cd5da3..d93c70448a 100755 --- a/bigbluebutton-html5/app/server/collection_methods/chat.coffee +++ b/bigbluebutton-html5/app/server/collection_methods/chat.coffee @@ -68,8 +68,10 @@ Meteor.methods from_username: messageObject.from_username from_lang: messageObject.from_lang }, (err, numChanged) -> + if err? + Meteor.log.error "_error #{err} when adding chat to collection" if numChanged.insertedId? - Meteor.log.error "added chat id=[#{numChanged.insertedId}] + Meteor.log.info "_added chat id=[#{numChanged.insertedId}] #{messageObject.from_username} to #{'PUBLIC' if messageObject.to_username?}:#{messageObject.message}") # called on server start and meeting end diff --git a/bigbluebutton-html5/app/server/collection_methods/meetings.coffee b/bigbluebutton-html5/app/server/collection_methods/meetings.coffee index 248891567e..b8f5ad8ca4 100755 --- a/bigbluebutton-html5/app/server/collection_methods/meetings.coffee +++ b/bigbluebutton-html5/app/server/collection_methods/meetings.coffee @@ -1,39 +1,48 @@ # -------------------------------------------------------------------------------------------- # Private methods on server # -------------------------------------------------------------------------------------------- -@addMeetingToCollection = (meetingId, name, intendedForRecording, voiceConf, duration) -> - #check if the meeting is already in the collection - obj = Meteor.Meetings.upsert({meetingId:meetingId}, {$set: { - meetingName:name - intendedForRecording: intendedForRecording - currentlyBeingRecorded: false # defaut value - voiceConf: voiceConf - duration: duration - roomLockSettings: - # by default the lock settings will be disabled on meeting create - disablePrivateChat: false - disableCam: false - disableMic: false - lockOnJoin: Meteor.config.lockOnJoin - lockedLayout: false - disablePublicChat: false - lockOnJoinConfigurable: false # TODO - }}, (err, numChanged) -> - if numChanged.insertedId? - Meteor.log.error "added MEETING #{meetingId}") +@addMeetingToCollection = (meetingId, name, intendedForRecording, voiceConf, duration, callback) -> + #check if the meeting is already in the collection + + obj = Meteor.Meetings.upsert({meetingId:meetingId}, {$set: { + meetingName:name + intendedForRecording: intendedForRecording + currentlyBeingRecorded: false # default value + voiceConf: voiceConf + duration: duration + roomLockSettings: + # by default the lock settings will be disabled on meeting create + disablePrivateChat: false + disableCam: false + disableMic: false + lockOnJoin: Meteor.config.lockOnJoin + lockedLayout: false + disablePublicChat: false + lockOnJoinConfigurable: false # TODO + }}, (err, numChanged) => + if numChanged.insertedId? + funct = (cbk) -> + Meteor.log.info "added MEETING #{meetingId}" + cbk() + funct(callback) + else + Meteor.log.error "nothing happened" + callback() + ) @clearMeetingsCollection = (meetingId) -> if meetingId? - Meteor.Meetings.remove({meetingId: meetingId}, Meteor.log.info "cleared Meetings Collection (meetingId: #{meetingId}!") + Meteor.Meetings.remove({meetingId: meetingId}, + Meteor.log.info "cleared Meetings Collection (meetingId: #{meetingId}!") else Meteor.Meetings.remove({}, Meteor.log.info "cleared Meetings Collection (all meetings)!") #clean up upon a meeting's end -@removeMeetingFromCollection = (meetingId) -> +@removeMeetingFromCollection = (meetingId, callback) -> if Meteor.Meetings.findOne({meetingId: meetingId})? Meteor.log.info "end of meeting #{meetingId}. Clear the meeting data from all collections" # delete all users in the meeting @@ -53,6 +62,16 @@ # delete the meeting clearMeetingsCollection(meetingId) + + callback() + else + funct (localCallback) -> + Meteor.log.error ("Error! There was no such meeting #{meetingId}") + localCallback() + funct(callback) + + + # -------------------------------------------------------------------------------------------- # end Private methods on server # -------------------------------------------------------------------------------------------- diff --git a/bigbluebutton-html5/app/server/collection_methods/users.coffee b/bigbluebutton-html5/app/server/collection_methods/users.coffee index 5ed34302ef..7a2afc45b9 100755 --- a/bigbluebutton-html5/app/server/collection_methods/users.coffee +++ b/bigbluebutton-html5/app/server/collection_methods/users.coffee @@ -139,7 +139,8 @@ Meteor.methods # Received information from BBB-Apps that a user left # Need to update the collection # params: meetingid, userid as defined in BBB-Apps -@markUserOffline = (meetingId, userId) -> +# callback +@markUserOffline = (meetingId, userId, callback) -> # mark the user as offline. remove from the collection on meeting_end #TODO user = Meteor.Users.findOne({meetingId: meetingId, userId: userId}) if user?.clientType is "HTML5" @@ -153,16 +154,30 @@ Meteor.methods 'user.listenOnly': false }}, (err, numChanged) -> if err? - Meteor.log.error "_unsucc update (mark as offline) of user #{user?.user.name} #{userId} err=#{JSON.stringify err}" - if numChanged? - Meteor.log.info "_marking as offline html5 user #{user?.user.name} #{userId} numChanged=#{numChanged}" + Meteor.log.error "_unsucc update (mark as offline) of user #{user?.user.name} #{userId} + err=#{JSON.stringify err}" + callback() + else + funct = (cbk) -> + Meteor.log.info "_marking as offline html5 user #{user?.user.name} + #{userId} numChanged=#{numChanged}" + cbk() + + funct(callback) ) else Meteor.Users.remove({meetingId: meetingId, userId: userId}, (err, numDeletions) -> if err? - Meteor.log.error "_unsucc deletion of user #{user?.user.name} #{userId} err=#{JSON.stringify err}" - if numDeletions? - Meteor.log.info "_deleting info for user #{user?.user.name} #{userId} numDeletions=#{numDeletions}" + Meteor.log.error "_unsucc deletion of user #{user?.user.name} #{userId} + err=#{JSON.stringify err}" + callback() + else + funct = (cbk) -> + Meteor.log.info "_deleting info for user #{user?.user.name} #{userId} + numDeletions=#{numDeletions}" + cbk() + + funct(callback) ) @@ -204,28 +219,66 @@ Meteor.methods #update a voiceUser - a helper method -@updateVoiceUser = (meetingId, voiceUserObject) -> +@updateVoiceUser = (meetingId, voiceUserObject, callback) -> u = Meteor.Users.findOne userId: voiceUserObject.web_userid if u? if voiceUserObject.talking? - Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.voiceUser.talking':voiceUserObject.talking}}) # talking + Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, + {$set: {'user.voiceUser.talking':voiceUserObject.talking}}, + (err, numChanged) -> + if err? + Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid} + [talking] err=#{JSON.stringify err}" + callback() + ) # talking if voiceUserObject.joined? - Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.voiceUser.joined':voiceUserObject.joined}}) # joined + Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, + {$set: {'user.voiceUser.joined':voiceUserObject.joined}}, + (err, numChanged) -> + if err? + Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid} + [joined] err=#{JSON.stringify err}" + callback() + ) # joined if voiceUserObject.locked? - Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.voiceUser.locked':voiceUserObject.locked}}) # locked + Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, + {$set: {'user.voiceUser.locked':voiceUserObject.locked}}, + (err, numChanged) -> + if err? + Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid} + [locked] err=#{JSON.stringify err}" + callback() + ) # locked if voiceUserObject.muted? - Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.voiceUser.muted':voiceUserObject.muted}}) # muted + Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, + {$set: {'user.voiceUser.muted':voiceUserObject.muted}}, + (err, numChanged) -> + if err? + Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid} + [muted] err=#{JSON.stringify err}" + callback() + ) # muted if voiceUserObject.listen_only? - Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, {$set: {'user.listenOnly':voiceUserObject.listen_only}}) # listenOnly + Meteor.Users.update({meetingId: meetingId ,userId: voiceUserObject.web_userid}, + {$set: {'user.listenOnly':voiceUserObject.listen_only}}, + (err, numChanged) -> + if err? + Meteor.log.error "_unsucc update of voiceUser #{voiceUserObject.web_userid} + [listenOnly] err=#{JSON.stringify err}" + callback() + ) # listenOnly else Meteor.log.error "ERROR! did not find such voiceUser!" + callback() -@userJoined = (meetingId, user) -> +@userJoined = (meetingId, user, callback) -> userId = user.userid u = Meteor.Users.findOne({userId:user.userid, meetingId: meetingId}) - # the collection already contains an entry for this user because - # we added a dummy user on register_user_message (to save authToken) + # the collection already contains an entry for this user + # because the user is reconnecting OR + # in the case of an html5 client user we added a dummy user on + # register_user_message (to save authToken) if u? and u.authToken? Meteor.Users.update({userId:user.userid, meetingId: meetingId}, {$set:{ user: @@ -255,31 +308,48 @@ Meteor.methods locked: user.voiceUser.locked muted: user.voiceUser.muted webcam_stream: user.webcam_stream - }}, (err, numChanged) -> - Meteor.log.info "_(case1) UPDATING USER #{user.userid}, authToken=#{u.authToken}, - locked=#{user.locked}, username=#{user.name}" + }}, (err) -> + if err? + Meteor.log.error "_error #{err} when trying to insert user #{userId}" + callback() + else + funct = (cbk) -> + Meteor.log.info "_(case1) UPDATING USER #{user.userid}, authToken= + #{u.authToken}, locked=#{user.locked}, username=#{user.name}" + cbk() + + funct(callback) ) - # only add the welcome message if it's not there already - unless Meteor.Chat.findOne({"message.chat_type":'SYSTEM_MESSAGE', "message.to_userid": userId})? - welcomeMessage = Meteor.config.defaultWelcomeMessage - .replace /%%CONFNAME%%/, Meteor.Meetings.findOne({meetingId: meetingId})?.meetingName - welcomeMessage = welcomeMessage + Meteor.config.defaultWelcomeMessageFooter - - # store the welcome message in chat for easy display on the client side - Meteor.Chat.insert({ - meetingId: meetingId - message: - chat_type: "SYSTEM_MESSAGE" - message: welcomeMessage - from_color: '0x3399FF' - to_userid: userId - from_userid: "SYSTEM_MESSAGE" - from_username: "" - from_time: user.timeOfJoining?.toString() - }, (err) -> - Meteor.log.info "_added a system message in chat for user #{userId}" - ) + welcomeMessage = Meteor.config.defaultWelcomeMessage + .replace /%%CONFNAME%%/, Meteor.Meetings.findOne({meetingId: meetingId})?.meetingName + welcomeMessage = welcomeMessage + Meteor.config.defaultWelcomeMessageFooter + # add the welcome message if it's not there already OR update time_of_joining + Meteor.Chat.upsert({ + meetingId: meetingId + userId: userId + 'message.chat_type': 'SYSTEM_MESSAGE' + 'message.to_userid': userId + }, { + meetingId: meetingId + userId: userId + message: + chat_type: 'SYSTEM_MESSAGE' + message: welcomeMessage + from_color: '0x3399FF' + to_userid: userId + from_userid: 'SYSTEM_MESSAGE' + from_username: '' + from_time: user.timeOfJoining?.toString() + }, (err) -> + if err? + Meteor.log.error "_error #{err} when trying to insert welcome message for #{userId}" + else + Meteor.log.info "_added/updated a system message in chat for user #{userId}" + # note that we already called callback() when updating the user. Adding + # the welcome message in the chat is not as vital and we can afford to + # complete it when possible, without blocking the serial event messages processing + ) else # Meteor.log.info "NOTE: got user_joined_message #{user.name} #{user.userid}" @@ -314,14 +384,21 @@ Meteor.methods webcam_stream: user.webcam_stream }, (err, numChanged) -> if numChanged.insertedId? - Meteor.log.info "_joining user (case2) userid=[#{userId}]:#{user.name}. - Users.size is now #{Meteor.Users.find({meetingId: meetingId}).count()}") + funct = (cbk) -> + Meteor.log.info "_joining user (case2) userid=[#{userId}]:#{user.name}. + Users.size is now #{Meteor.Users.find({meetingId: meetingId}).count()}" + cbk() + + funct(callback) + else + callback() + ) @createDummyUser = (meetingId, userId, authToken) -> if Meteor.Users.findOne({userId:userId, meetingId: meetingId, authToken:authToken})? - Meteor.log.info "html5 user userid:[#{userId}] from [#{meetingId}] tried to revalidate token" + Meteor.log.info "html5 user userId:[#{userId}] from [#{meetingId}] tried to revalidate token" else Meteor.Users.insert({ meetingId: meetingId @@ -330,7 +407,7 @@ Meteor.methods clientType: "HTML5" validated: false #will be validated on validate_auth_token_reply }, (err, id) -> - Meteor.log.info "_added a dummy html5 user with: userid=[#{userId}], id=[#{id}] + Meteor.log.info "_added a dummy html5 user with: userId=[#{userId}] Users.size is now #{Meteor.Users.find({meetingId: meetingId}).count()}" ) diff --git a/bigbluebutton-html5/app/server/redispubsub.coffee b/bigbluebutton-html5/app/server/redispubsub.coffee index 3565f53e75..a2f995afbb 100755 --- a/bigbluebutton-html5/app/server/redispubsub.coffee +++ b/bigbluebutton-html5/app/server/redispubsub.coffee @@ -1,5 +1,4 @@ Meteor.methods - # Construct and send a message to bbb-web to validate the user validateAuthToken: (meetingId, userId, authToken) -> Meteor.log.info "sending a validate_auth_token with", @@ -23,6 +22,7 @@ Meteor.methods else Meteor.log.info "did not have enough information to send a validate_auth_token message" + class Meteor.RedisPubSub constructor: (callback) -> Meteor.log.info "constructor RedisPubSub" @@ -33,9 +33,10 @@ class Meteor.RedisPubSub Meteor.log.info("Subscribing message on channel: #{Meteor.config.redis.channels.fromBBBApps}") @subClient.on "psubscribe", Meteor.bindEnvironment(@_onSubscribe) - @subClient.on "pmessage", Meteor.bindEnvironment(@_onMessage) + @subClient.on "pmessage", Meteor.bindEnvironment(@_Q) @subClient.psubscribe(Meteor.config.redis.channels.fromBBBApps) + callback @ _onSubscribe: (channel, count) => @@ -48,331 +49,31 @@ class Meteor.RedisPubSub "payload": {} # I need this, otherwise bbb-apps won't recognize the message publish Meteor.config.redis.channels.toBBBApps.meeting, message + + _Q: (pattern, channel, jsonMsg) => + message = JSON.parse(jsonMsg) + eventName = message.header.name + console.log "Q #{eventName} #{Meteor.myQueue.total()}" + + + Meteor.myQueue.add({ + pattern: pattern + channel: channel + jsonMsg: jsonMsg + }) + _onMessage: (pattern, channel, jsonMsg) => # TODO: this has to be in a try/catch block, otherwise the server will # crash if the message has a bad format + console.log "_onMessage" message = JSON.parse(jsonMsg) - correlationId = message.payload?.reply_to or message.header?.reply_to - meetingId = message.payload?.meeting_id - - # just because it's common. we handle it anyway - notLoggedEventTypes = [ - "keep_alive_reply" - "page_resized_message" - "presentation_page_resized_message" - "presentation_cursor_updated_message" - "get_presentation_info_reply" - "get_users_reply" - "get_chat_history_reply" - "get_all_meetings_reply" - "presentation_shared_message" - "presentation_conversion_done_message" - "presentation_conversion_progress_message" - "presentation_page_generated_message" - # "presentation_page_changed_message" - "BbbPubSubPongMessage" - "bbb_apps_is_alive_message" - "user_voice_talking_message" - "meeting_state_message" - "get_recording_status_reply" - ] if message?.header? and message?.payload? unless message.header.name in notLoggedEventTypes Meteor.log.info "redis incoming message #{message.header.name} ", message: jsonMsg - # handle voice events - if message.header.name in ['user_left_voice_message', 'user_joined_voice_message', 'user_voice_talking_message', 'user_voice_muted_message'] - if message.payload.user? - updateVoiceUser meetingId, - 'web_userid': message.payload.user.voiceUser.web_userid - 'listen_only': message.payload.listen_only - 'talking': message.payload.user.voiceUser.talking - 'joined': message.payload.user.voiceUser.joined - 'locked': message.payload.user.voiceUser.locked - 'muted': message.payload.user.voiceUser.muted - return - - # listen only - if message.header.name is 'user_listening_only' - updateVoiceUser meetingId, {'web_userid': message.payload.userid, 'listen_only': message.payload.listen_only} - # most likely we don't need to ensure that the user's voiceUser's {talking, joined, muted, locked} are false by default #TODO? - return - - if message.header.name is "get_all_meetings_reply" - Meteor.log.info "Let's store some data for the running meetings so that when an HTML5 client joins everything is ready!" - listOfMeetings = message.payload.meetings - for meeting in listOfMeetings - # we currently do not have voice_conf or duration in this message. - addMeetingToCollection meeting.meetingID, meeting.meetingName, meeting.recorded, meeting.voiceBridge, meeting.duration - return - - if message.header.name is "get_users_reply" and message.payload.requester_id is "nodeJSapp" - unless Meteor.Meetings.findOne({MeetingId: message.payload.meeting_id})? - users = message.payload.users - for user in users - user.timeOfJoining = message.header.current_time # TODO this might need to be removed - if user.emoji_status isnt 'none' and typeof user.emoji_status is 'string' - user.set_emoji_time = new Date() - userJoined meetingId, user - return - - if message.header.name is "validate_auth_token_reply" - user = Meteor.Users.findOne({userId:message.payload.userid, meetingId: message.payload.meeting_id}) - validStatus = message.payload.valid - - # if the user already exists in the db - if user?.clientType is "HTML5" - #if the html5 client user was validated successfully, add a flag - Meteor.Users.update({userId:message.payload.userid, meetingId:message.payload.meeting_id}, {$set:{validated: validStatus}}) - Meteor.log.info "user.validated for user #{user.userId} in meeting #{user.meetingId} just - became #{Meteor.Users.findOne({userId:message.payload.userid, meetingId: message.payload.meeting_id})?.validated}" - else - Meteor.log.info "a non-html5 user got validate_auth_token_reply." - return - - if message.header.name is "user_registered_message" - #createDummyUser message.payload.meeting_id, message.payload.user - return - - if message.header.name is "user_joined_message" - userObj = message.payload.user - dbUser = Meteor.Users.findOne({userId: message.payload.user.userid, meetingId: message.payload.meeting_id}) - - # On attempting reconnection of Flash clients (in voiceBridge) we receive an extra user_joined_message - # Ignore it as it will add an extra user in the userlist, creating discrepancy with the list in the Flash client - if dbUser?.user?.connection_status is "offline" and message.payload.user?.phone_user - Meteor.log.error "offline AND phone user" - return # without joining the user - else - if dbUser?.clientType is "HTML5" # typically html5 users will be in the db [as a dummy user] before the joining message - status = dbUser?.validated - Meteor.log.info "in user_joined_message the validStatus of the user was #{status}" - userJoined meetingId, userObj - return - - if message.header.name is "user_left_message" - userId = message.payload.user?.userid - if userId? and meetingId? - markUserOffline meetingId, userId - return - - if message.header.name is "disconnect_user_message" - Meteor.log.info "a user(#{message.payload.userid}) was kicked out from meeting(#{message.payload.meeting_id})" - return - - if message.header.name is "get_chat_history_reply" and message.payload.requester_id is "nodeJSapp" - unless Meteor.Meetings.findOne({MeetingId: message.payload.meeting_id})? - for chatMessage in message.payload.chat_history - addChatToCollection meetingId, chatMessage - return - - if message.header.name is "send_public_chat_message" or message.header.name is "send_private_chat_message" - messageObject = message.payload.message - # use current_time instead of message.from_time so that the chats from Flash and HTML5 have uniform times - messageObject.from_time = message.header.current_time - addChatToCollection meetingId, messageObject - return - - if message.header.name is "meeting_created_message" - meetingName = message.payload.name - intendedForRecording = message.payload.recorded - voiceConf = message.payload.voice_conf - duration = message.payload.duration - addMeetingToCollection meetingId, meetingName, intendedForRecording, voiceConf, duration - return - - if message.header.name is "presentation_shared_message" - presentationId = message.payload.presentation?.id - # change the currently displayed presentation to presentation.current = false - Meteor.Presentations.update({"presentation.current": true, meetingId: meetingId},{$set: {"presentation.current": false}}) - - #update(if already present) entirely the presentation with the fresh data - removePresentationFromCollection meetingId, presentationId - addPresentationToCollection meetingId, message.payload.presentation - - for slide in message.payload.presentation?.pages - addSlideToCollection meetingId, message.payload.presentation?.id, slide - if slide.current - displayThisSlide meetingId, slide.id, slide - return - - if message.header.name is "get_presentation_info_reply" and message.payload.requester_id is "nodeJSapp" - for presentation in message.payload.presentations - addPresentationToCollection meetingId, presentation - - for page in presentation.pages - #add the slide to the collection - addSlideToCollection meetingId, presentation.id, page - - #request for shapes - whiteboardId = "#{presentation.id}/#{page.num}" # d2d9a672040fbde2a47a10bf6c37b6a4b5ae187f-1404411622872/1 - #Meteor.log.info "the whiteboard_id here is:" + whiteboardId - - message = - "payload": - "meeting_id": meetingId - "requester_id": "nodeJSapp" - "whiteboard_id": whiteboardId - "header": - "timestamp": new Date().getTime() - "name": "get_whiteboard_shapes_request" - "version": "0.0.1" - - if whiteboardId? and meetingId? - publish Meteor.config.redis.channels.toBBBApps.whiteboard, message #TODO - else - Meteor.log.info "did not have enough information to send a user_leaving_request" #TODO - return - - if message.header.name is "presentation_page_changed_message" - newSlide = message.payload.page - displayThisSlide meetingId, newSlide?.id, newSlide - return - - if message.header.name is "get_whiteboard_shapes_reply" and message.payload.requester_id is "nodeJSapp" - - # Create a whiteboard clean status or find one for the current meeting - if not Meteor.WhiteboardCleanStatus.findOne({meetingId: meetingId})? - Meteor.WhiteboardCleanStatus.insert({meetingId: meetingId, in_progress: false}) - - for shape in message.payload.shapes - whiteboardId = shape.wb_id - addShapeToCollection meetingId, whiteboardId, shape - return - - if message.header.name is "send_whiteboard_shape_message" - - #Meteor stringifies an array of JSONs (...shape.result) in this message - #parsing the String and reassigning the value - if message.payload.shape.shape_type is "poll_result" and typeof message.payload.shape.shape.result is 'string' - message.payload.shape.shape.result = JSON.parse message.payload.shape.shape.result - - shape = message.payload.shape - whiteboardId = shape?.wb_id - addShapeToCollection meetingId, whiteboardId, shape - return - - if message.header.name is "presentation_cursor_updated_message" - x = message.payload.x_percent - y = message.payload.y_percent - Meteor.Presentations.update({"presentation.current": true, meetingId: meetingId},{$set: {"pointer.x": x, "pointer.y": y}}) - return - - if message.header.name is "whiteboard_cleared_message" - whiteboardId = message.payload.whiteboard_id - Meteor.WhiteboardCleanStatus.update({meetingId: meetingId}, {$set: {'in_progress': true}}) - removeAllShapesFromSlide meetingId, whiteboardId - return - - if message.header.name is "undo_whiteboard_request" - whiteboardId = message.payload.whiteboard_id - shapeId = message.payload.shape_id - removeShapeFromSlide meetingId, whiteboardId, shapeId - return - - if message.header.name is "presenter_assigned_message" - newPresenterId = message.payload.new_presenter_id - if newPresenterId? - # reset the previous presenter - Meteor.Users.update({"user.presenter": true, meetingId: meetingId},{$set: {"user.presenter": false}}) - # set the new presenter - Meteor.Users.update({"user.userid": newPresenterId, meetingId: meetingId},{$set: {"user.presenter": true}}) - return - - if message.header.name is "presentation_page_resized_message" - slideId = message.payload.page?.id - heightRatio = message.payload.page?.height_ratio - widthRatio = message.payload.page?.width_ratio - xOffset = message.payload.page?.x_offset - yOffset = message.payload.page?.y_offset - presentationId = slideId.split("/")[0] - Meteor.Slides.update({presentationId: presentationId, "slide.current": true}, {$set: {"slide.height_ratio": heightRatio, "slide.width_ratio": widthRatio, "slide.x_offset": xOffset, "slide.y_offset": yOffset}}) - return - - if message.header.name is "user_emoji_status_message" - userId = message.payload.userid - meetingId = message.payload.meeting_id - emojiStatus = message.payload.emoji_status - if userId? and meetingId? - set_emoji_time = new Date() - Meteor.Users.update({"user.userid": userId},{$set: {"user.set_emoji_time": set_emoji_time, "user.emoji_status": emojiStatus}}) - return - - if message.header.name is "recording_status_changed_message" - intendedForRecording = message.payload.recorded - currentlyBeingRecorded = message.payload.recording - Meteor.Meetings.update({meetingId: meetingId, intendedForRecording: intendedForRecording}, {$set: {currentlyBeingRecorded: currentlyBeingRecorded}}) - return - - # -------------------------------------------------- - # lock settings ------------------------------------ - if message.header.name is "eject_voice_user_message" - return - - if message.header.name is "new_permission_settings" - oldSettings = Meteor.Meetings.findOne({meetingId:meetingId})?.roomLockSettings - newSettings = message.payload?.permissions - - # if the disableMic setting was turned on - if !oldSettings?.disableMic and newSettings.disableMic - handleLockingMic(meetingId, newSettings) - - # substitute with the new lock settings - Meteor.Meetings.update({meetingId: meetingId}, {$set: { - 'roomLockSettings.disablePrivateChat': newSettings.disablePrivateChat - 'roomLockSettings.disableCam': newSettings.disableCam - 'roomLockSettings.disableMic': newSettings.disableMic - 'roomLockSettings.lockOnJoin': newSettings.lockOnJoin - 'roomLockSettings.lockedLayout': newSettings.lockedLayout - 'roomLockSettings.disablePublicChat': newSettings.disablePublicChat - 'roomLockSettings.lockOnJoinConfigurable': newSettings.lockOnJoinConfigurable #TODO - }}) - return - - if message.header.name is "user_locked_message" or message.header.name is "user_unlocked_message" - userId = message.payload.userid - isLocked = message.payload.locked - setUserLockedStatus(meetingId, userId, isLocked) - return - - if message.header.name in ["meeting_ended_message", "meeting_destroyed_event", - "end_and_kick_all_message", "disconnect_all_users_message"] - if Meteor.Meetings.findOne({meetingId: meetingId})? - Meteor.log.info "there are #{Meteor.Users.find({meetingId: meetingId}).count()} users in the meeting" - for user in Meteor.Users.find({meetingId: meetingId}).fetch() - markUserOffline meetingId, user.userId - #TODO should we clear the chat messages for that meeting?! - unless message.header.name is "disconnect_all_users_message" - removeMeetingFromCollection meetingId - return - - if message.header.name is "poll_started_message" - if message.payload.meeting_id? and message.payload.requester_id? and message.payload.poll? - if Meteor.Meetings.findOne({meetingId: message.payload.meeting_id})? - #initializing the list of current users - users = Meteor.Users.find({meetingId: message.payload.meeting_id}, {fields:{"user.userid": 1, _id: 0}} ).fetch() - addPollToCollection message.payload.poll, message.payload.requester_id, users, message.payload.meeting_id - - if message.header.name is "poll_stopped_message" - meetingId = message.payload.meeting_id - poll_id = message.payload.poll_id - clearPollCollection meetingId, poll_id - - if message.header.name is "user_voted_poll_message" - if message.payload?.poll? and message.payload.meeting_id? and message.payload.presenter_id? - pollObj = message.payload.poll - meetingId = message.payload.meeting_id - requesterId = message.payload.presenter_id - updatePollCollection pollObj, meetingId, requesterId - - if message.header.name is "poll_show_result_message" - if message.payload.poll.id? and message.payload.meeting_id? - poll_id = message.payload.poll.id - meetingId = message.payload.meeting_id - clearPollCollection meetingId, poll_id # -------------------------------------------------------------------------------------------- # Private methods on server diff --git a/bigbluebutton-html5/app/server/server.coffee b/bigbluebutton-html5/app/server/server.coffee index 908851c98d..74d90ec536 100755 --- a/bigbluebutton-html5/app/server/server.coffee +++ b/bigbluebutton-html5/app/server/server.coffee @@ -14,3 +14,480 @@ Meteor.startup -> # create create a PubSub connection, start listening Meteor.redisPubSub = new Meteor.RedisPubSub(-> Meteor.log.info "created pubsub") + + + Meteor.myQueue = new PowerQueue({ + # autoStart:true + # isPaused: true + }) + Meteor.myQueue.taskHandler = (data, next, failures) -> + eventName = JSON.parse(data.jsonMsg)?.header.name + if failures > 0 + Meteor.log.error "got a failure on taskHandler #{eventName} #{failures}" + else + handleRedisMessage(data, ()-> + Meteor.log.error "in callback after handleRedisMessage #{eventName}" + next() + ) + + # To ensure that we process the redis json event messages serially we use a + # callback. This callback is to be called when the Meteor collection is + # updated with the information coming in the payload of the json message. The + # callback signalizes to the queue that we are done processing the current + # message in the queue and are ready to move on to the next one. If we do not + # use the callback mechanism we may encounter a race condition situation + # due to not following the order of events coming through the redis pubsub. + # for example: a user_left event reaching the collection before a user_joined + # for the same user. + @handleRedisMessage = (data, callback) -> + message = JSON.parse(data.jsonMsg) + # correlationId = message.payload?.reply_to or message.header?.reply_to + meetingId = message.payload?.meeting_id + + # Avoid cluttering the log with json messages carrying little or repetitive + # information. Comment out a message type in the array to be able to see it + # in the log upon restarting of the Meteor process. + notLoggedEventTypes = [ + "keep_alive_reply" + "page_resized_message" + "presentation_page_resized_message" + "presentation_cursor_updated_message" + "get_presentation_info_reply" + "get_users_reply" + "get_chat_history_reply" + "get_all_meetings_reply" + "get_whiteboard_shapes_reply" + "presentation_shared_message" + "presentation_conversion_done_message" + "presentation_conversion_progress_message" + "presentation_page_generated_message" + # "presentation_page_changed_message" + "BbbPubSubPongMessage" + "bbb_apps_is_alive_message" + "user_voice_talking_message" + "meeting_state_message" + "get_recording_status_reply" + ] + + # TODO check if message + eventName = message.header.name + meetingId = message.payload?.meeting_id + # we currently disregard the pattern and channel + # Meteor.log.info "in handleRedisMessage #{eventName}" + if message?.header? and message?.payload? + if eventName is 'meeting_created_message' + # Meteor.log.error JSON.stringify message + meetingName = message.payload.name + intendedForRecording = message.payload.recorded + voiceConf = message.payload.voice_conf + duration = message.payload.duration + addMeetingToCollection meetingId, meetingName, intendedForRecording, + voiceConf, duration, callback + + # handle voice events + else if message.payload.user? and eventName in [ + 'user_left_voice_message' + 'user_joined_voice_message' + 'user_voice_talking_message' + 'user_voice_muted_message'] + + voiceUserObj = { + 'web_userid': message.payload.user.voiceUser.web_userid + 'listen_only': message.payload.listen_only + 'talking': message.payload.user.voiceUser.talking + 'joined': message.payload.user.voiceUser.joined + 'locked': message.payload.user.voiceUser.locked + 'muted': message.payload.user.voiceUser.muted + } + updateVoiceUser meetingId, voiceUserObj, callback + + else if eventName is 'user_listening_only' + voiceUserObj = { + 'web_userid': message.payload.userid + 'listen_only': message.payload.listen_only + } + updateVoiceUser meetingId, voiceUserObj, callback + + else if eventName is 'get_all_meetings_reply' + Meteor.log.info "Let's store some data for the running meetings + so that when an HTML5 client joins everything is ready!" + Meteor.log.info JSON.stringify(message) + listOfMeetings = message.payload.meetings + + # Processing the meetings recursively with a callback to notify us, + # ensuring that we update the meeting collection serially + processMeeting = () -> + meeting = listOfMeetings.pop() + if meeting? + addMeetingToCollection meeting.meetingID, meeting.meetingName, + meeting.recorded, meeting.voiceBridge, meeting.duration, processMeeting + else + callback() # all meeting arrays (if any) have been processed + + processMeeting() + + else if eventName is 'user_joined_message' + Meteor.log.error "\n\n user_joined_message \n\n" + JSON.stringify message + userObj = message.payload.user + dbUser = Meteor.Users.findOne({userId: userObj.userid, meetingId: message.payload.meeting_id}) + + # On attempting reconnection of Flash clients (in voiceBridge) we receive + # an extra user_joined_message. Ignore it as it will add an extra user + # in the user list, creating discrepancy with the list in the Flash client + if dbUser?.user?.connection_status is "offline" and message.payload.user?.phone_user + Meteor.log.error "offline AND phone user" + callback() #return without joining the user + else + if dbUser?.clientType is "HTML5" # typically html5 users will be in + # the db [as a dummy user] before the joining message + status = dbUser?.validated + Meteor.log.info "in user_joined_message the validStatus + of the user was #{status}" + userObj.timeOfJoining = message.header.current_time + userJoined meetingId, userObj, callback + else + userJoined meetingId, userObj, callback + + + + # only process if requester is nodeJSapp means only process in the case when + # we explicitly request the users + else if eventName is 'get_users_reply' and message.payload.requester_id is 'nodeJSapp' + + Meteor.log.error JSON.stringify(message) + if !Meteor.Meetings.findOne({meetingId: meetingId})? #TODO consider removing this cond + users = message.payload.users + + #TODO make the serialization be split per meeting. This will allow us to + # use N threads vs 1 and we'll take advantage of Mongo's concurrency tricks + + # Processing the users recursively with a callback to notify us, + # ensuring that we update the users collection serially + processUser = () -> + console.log "1" + user = users.pop() + if user? + console.log "2" + user.timeOfJoining = message.header.current_time + if user.emoji_status isnt 'none' and typeof user.emoji_status is 'string' + console.log "3" + user.set_emoji_time = new Date() + userJoined meetingId, user, processUser + else + console.error("this is not supposed to happen") + userJoined meetingId, user, processUser + else + console.log "4" + callback() # all meeting arrays (if any) have been processed + + processUser() + else + callback() #TODO test if we get here + + + else if eventName is 'validate_auth_token_reply' + userId = message.payload.userid + user = Meteor.Users.findOne({userId:userId, meetingId: meetingId}) + validStatus = message.payload.valid + + # if the user already exists in the db + if user?.clientType is "HTML5" + #if the html5 client user was validated successfully, add a flag + Meteor.Users.update({userId:userId, meetingId:message.payload.meeting_id}, + {$set:{validated: validStatus}}, + (err, numChanged) -> + if numChanged.insertedId? + funct = (cbk) -> + val=Meteor.Users.findOne({userId:userId, meetingId: message.payload.meeting_id})?.validated + Meteor.log.info "user.validated for user #{userId} in meeting #{user.meetingId} just became #{val}" + cbk() + + funct(callback) + else + callback() + ) + else + Meteor.log.info "a non-html5 user got validate_auth_token_reply." + callback() + + + + else if eventName is 'user_left_message' + userId = message.payload.user?.userid + if userId? and meetingId? + markUserOffline meetingId, userId, callback + else + callback() #TODO check how to get these cases out and reuse code + + + + + + + + + + # for now not handling this serially #TODO + else if eventName is 'presenter_assigned_message' + newPresenterId = message.payload.new_presenter_id + if newPresenterId? + # reset the previous presenter + Meteor.Users.update({"user.presenter": true, meetingId: meetingId}, + {$set: {"user.presenter": false}}, + (err, numUpdated) -> + Meteor.log.info(" Updating old presenter numUpdated=#{numUpdated}, + err=#{err}") + ) + # set the new presenter + Meteor.Users.update({"user.userid": newPresenterId, meetingId: meetingId}, + {$set: {"user.presenter": true}}, + (err, numUpdated) -> + Meteor.log.info(" Updating new presenter numUpdated=#{numUpdated}, + err=#{err}") + ) + callback() + + # for now not handling this serially #TODO + else if eventName is 'user_emoji_status_message' + userId = message.payload.userid + meetingId = message.payload.meeting_id + emojiStatus = message.payload.emoji_status + if userId? and meetingId? + set_emoji_time = new Date() + Meteor.Users.update({"user.userid": userId}, + {$set:{"user.set_emoji_time":set_emoji_time,"user.emoji_status":emojiStatus}}, + (err, numUpdated) -> + Meteor.log.info(" Updating emoji numUpdated=#{numUpdated}, err=#{err}") + ) + callback() + + # for now not handling this serially #TODO + else if eventName in ['user_locked_message', 'user_unlocked_message'] + userId = message.payload.userid + isLocked = message.payload.locked + setUserLockedStatus(meetingId, userId, isLocked) + callback() + + # for now not handling this serially #TODO + else if eventName in ["meeting_ended_message", "meeting_destroyed_event", + "end_and_kick_all_message", "disconnect_all_users_message"] + Meteor.log.info("DESTROYING MEETING #{meetingId}") + removeMeetingFromCollection meetingId, callback + + ### + if Meteor.Meetings.findOne({meetingId: meetingId})? + count=Meteor.Users.find({meetingId: meetingId}).count() + Meteor.log.info "there are #{count} users in the meeting" + for user in Meteor.Users.find({meetingId: meetingId}).fetch() + markUserOffline meetingId, user.userId + #TODO should we clear the chat messages for that meeting?! + unless eventName is "disconnect_all_users_message" + removeMeetingFromCollection meetingId + ### + + # for now not handling this serially #TODO + else if eventName is "get_chat_history_reply" and message.payload.requester_id is "nodeJSapp" + unless Meteor.Meetings.findOne({MeetingId: message.payload.meeting_id})? + for chatMessage in message.payload.chat_history + addChatToCollection meetingId, chatMessage + callback() + + # for now not handling this serially #TODO + else if eventName is "send_public_chat_message" or eventName is "send_private_chat_message" + messageObject = message.payload.message + # use current_time instead of message.from_time so that the chats from Flash and HTML5 have uniform times + messageObject.from_time = message.header.current_time + addChatToCollection meetingId, messageObject + callback() + + # for now not handling this serially #TODO + else if eventName is "presentation_shared_message" + presentationId = message.payload.presentation?.id + # change the currently displayed presentation to presentation.current = false + Meteor.Presentations.update({"presentation.current": true, meetingId: meetingId}, + {$set: {"presentation.current": false}}) + + #update(if already present) entirely the presentation with the fresh data + removePresentationFromCollection meetingId, presentationId + addPresentationToCollection meetingId, message.payload.presentation + + for slide in message.payload.presentation?.pages + addSlideToCollection meetingId, message.payload.presentation?.id, slide + if slide.current + displayThisSlide meetingId, slide.id, slide + callback() + + # for now not handling this serially #TODO + else if eventName is "get_presentation_info_reply" and message.payload.requester_id is "nodeJSapp" + for presentation in message.payload.presentations + addPresentationToCollection meetingId, presentation + + for page in presentation.pages + #add the slide to the collection + addSlideToCollection meetingId, presentation.id, page + + #request for shapes + whiteboardId = "#{presentation.id}/#{page.num}" # d2d9a672040fbde2a47a10bf6c37b6a4b5ae187f-1404411622872/1 + #Meteor.log.info "the whiteboard_id here is:" + whiteboardId + + message = + "payload": + "meeting_id": meetingId + "requester_id": "nodeJSapp" + "whiteboard_id": whiteboardId + "header": + "timestamp": new Date().getTime() + "name": "get_whiteboard_shapes_request" + "version": "0.0.1" + + if whiteboardId? and meetingId? + publish Meteor.config.redis.channels.toBBBApps.whiteboard, message #TODO + else + Meteor.log.info "did not have enough information to send a user_leaving_request" #TODO + callback() + + # for now not handling this serially #TODO + else if eventName is "presentation_page_changed_message" + newSlide = message.payload.page + displayThisSlide meetingId, newSlide?.id, newSlide + callback() + + # for now not handling this serially #TODO + else if eventName is "get_whiteboard_shapes_reply" and message.payload.requester_id is "nodeJSapp" + # Create a whiteboard clean status or find one for the current meeting + if not Meteor.WhiteboardCleanStatus.findOne({meetingId: meetingId})? + Meteor.WhiteboardCleanStatus.insert({meetingId: meetingId, in_progress: false}) + + for shape in message.payload.shapes + whiteboardId = shape.wb_id + addShapeToCollection meetingId, whiteboardId, shape + callback() + + # for now not handling this serially #TODO + else if eventName is "send_whiteboard_shape_message" + #Meteor stringifies an array of JSONs (...shape.result) in this message + #parsing the String and reassigning the value + if message.payload.shape.shape_type is "poll_result" and typeof message.payload.shape.shape.result is 'string' + message.payload.shape.shape.result = JSON.parse message.payload.shape.shape.result + + shape = message.payload.shape + whiteboardId = shape?.wb_id + addShapeToCollection meetingId, whiteboardId, shape + callback() + + # for now not handling this serially #TODO + else if eventName is "presentation_cursor_updated_message" + x = message.payload.x_percent + y = message.payload.y_percent + Meteor.Presentations.update({"presentation.current": true, meetingId: meetingId}, + {$set: {"pointer.x": x, "pointer.y": y}}) + callback() + + # for now not handling this serially #TODO + else if eventName is "whiteboard_cleared_message" + whiteboardId = message.payload.whiteboard_id + Meteor.WhiteboardCleanStatus.update({meetingId: meetingId}, {$set: {'in_progress': true}}) + removeAllShapesFromSlide meetingId, whiteboardId + callback() + + # for now not handling this serially #TODO + else if eventName is "undo_whiteboard_request" + whiteboardId = message.payload.whiteboard_id + shapeId = message.payload.shape_id + removeShapeFromSlide meetingId, whiteboardId, shapeId + callback() + + + # for now not handling this serially #TODO + else if eventName is "presentation_page_resized_message" + slideId = message.payload.page?.id + heightRatio = message.payload.page?.height_ratio + widthRatio = message.payload.page?.width_ratio + xOffset = message.payload.page?.x_offset + yOffset = message.payload.page?.y_offset + presentationId = slideId.split("/")[0] + Meteor.Slides.update({presentationId: presentationId, "slide.current": true}, + {$set:{"slide.height_ratio": heightRatio,"slide.width_ratio": widthRatio,"slide.x_offset":xOffset,"slide.y_offset":yOffset}} + ) + callback() + + + # for now not handling this serially #TODO + else if eventName is "recording_status_changed_message" + intendedForRecording = message.payload.recorded + currentlyBeingRecorded = message.payload.recording + Meteor.Meetings.update({meetingId: meetingId, intendedForRecording: intendedForRecording}, + {$set: {currentlyBeingRecorded: currentlyBeingRecorded}} + ) + callback() + + # -------------------------------------------------- + # lock settings ------------------------------------ + # for now not handling this serially #TODO + else if eventName is "eject_voice_user_message" + callback() + + # for now not handling this serially #TODO + else if eventName is "new_permission_settings" + oldSettings = Meteor.Meetings.findOne({meetingId:meetingId})?.roomLockSettings + newSettings = message.payload?.permissions + + # if the disableMic setting was turned on + if !oldSettings?.disableMic and newSettings.disableMic + handleLockingMic(meetingId, newSettings) + + # substitute with the new lock settings + Meteor.Meetings.update({meetingId: meetingId}, {$set: { + 'roomLockSettings.disablePrivateChat': newSettings.disablePrivateChat + 'roomLockSettings.disableCam': newSettings.disableCam + 'roomLockSettings.disableMic': newSettings.disableMic + 'roomLockSettings.lockOnJoin': newSettings.lockOnJoin + 'roomLockSettings.lockedLayout': newSettings.lockedLayout + 'roomLockSettings.disablePublicChat': newSettings.disablePublicChat + 'roomLockSettings.lockOnJoinConfigurable': newSettings.lockOnJoinConfigurable #TODO + }}) + callback() + + + # for now not handling this serially #TODO + else if eventName is "poll_started_message" + if message.payload.meeting_id? and message.payload.requester_id? and message.payload.poll? + if Meteor.Meetings.findOne({meetingId: message.payload.meeting_id})? + #initializing the list of current users + users = Meteor.Users.find({meetingId: message.payload.meeting_id}, + {fields:{"user.userid": 1, _id: 0}} ).fetch() + addPollToCollection message.payload.poll, message.payload.requester_id, + users, message.payload.meeting_id + callback() + + # for now not handling this serially #TODO + else if eventName is "poll_stopped_message" + meetingId = message.payload.meeting_id + poll_id = message.payload.poll_id + clearPollCollection meetingId, poll_id + callback() + + # for now not handling this serially #TODO + else if eventName is "user_voted_poll_message" + if message.payload?.poll? and message.payload.meeting_id? and message.payload.presenter_id? + pollObj = message.payload.poll + meetingId = message.payload.meeting_id + requesterId = message.payload.presenter_id + updatePollCollection pollObj, meetingId, requesterId + callback() + + # for now not handling this serially #TODO + else if eventName is "poll_show_result_message" + if message.payload.poll.id? and message.payload.meeting_id? + poll_id = message.payload.poll.id + meetingId = message.payload.meeting_id + clearPollCollection meetingId, poll_id + callback() + + + else # keep moving in the queue + unless eventName in notLoggedEventTypes + Meteor.log.info "WARNING!!!\n + THE JSON MESSAGE WAS NOT OF TYPE SUPPORTED BY THIS APPLICATION\n + #{eventName} {JSON.stringify message}" + callback()