339 lines
14 KiB
CoffeeScript
339 lines
14 KiB
CoffeeScript
rack = require("hat").rack()
|
|
redis = require("redis")
|
|
sanitizer = require("sanitizer")
|
|
|
|
config = require("../config")
|
|
Logger = require("./logger")
|
|
RedisKeys = require("../lib/redis_keys")
|
|
Utils = require("./utils")
|
|
messageLib = require("bigbluebutton-messages")
|
|
|
|
moduleDeps = ["RedisAction", "RedisStore", "RedisPublisher"]
|
|
|
|
# Bridge between events in Redis and the clients connected via Websocket.
|
|
module.exports = class RedisWebsocketBridge
|
|
|
|
# Creates a new RedisWebsocketBridge instance
|
|
#
|
|
# @param io [SocketIO instance]
|
|
constructor: (@io) ->
|
|
config.modules.wait moduleDeps, =>
|
|
@redisAction = config.modules.get("RedisAction")
|
|
@redisPublisher = config.modules.get("RedisPublisher")
|
|
|
|
@sub = redis.createClient()
|
|
subscriptions = ["*"]
|
|
@sub.psubscribe.apply(@sub, subscriptions)
|
|
|
|
# @todo shouldn't need redis store here, shouldn't change redis
|
|
@redisStore = config.modules.get("RedisStore")
|
|
|
|
@_redis_registerListeners()
|
|
@_socket_registerListeners()
|
|
|
|
# Registers listeners to all socket IO events that can be emitted by the clients.
|
|
# The events will always pass content to the server and the server will usually
|
|
# treat this content and generate new events on redis.
|
|
#
|
|
# @private
|
|
_socket_registerListeners: () ->
|
|
@io.sockets.on "connection", (socket) =>
|
|
socket.on "user connect", () => @_socket_onUserConnected2(socket)
|
|
socket.on "disconnect", () => @_socket_onUserDisconnected(socket)
|
|
socket.on "msg", (msg) => @_socket_onChatMessage2(socket, msg)
|
|
socket.on "logout", () => @_socket_onLogout(socket)
|
|
socket.on "allShapes", () => @_socket_onAllShapes(socket)
|
|
|
|
# Listens for messages published to redis
|
|
#
|
|
# @param pattern [string] Matched pattern on Redis PubSub
|
|
# @param channel [string] Channel the message was published on (socket room)
|
|
# @param message [string] Message published (socket message data)
|
|
# @private
|
|
_redis_registerListeners: ->
|
|
@sub.on "pmessage", (pattern, channel, message) =>
|
|
###
|
|
using the message library instead of simply doing attributes = JSON.parse messages
|
|
###
|
|
eventType = null
|
|
messageLib.getEventType(message, #identify the name/type of the message
|
|
(text)->
|
|
eventType = text
|
|
console.log "assigned eventType="+eventType
|
|
,
|
|
(err)->
|
|
console.log err
|
|
)
|
|
|
|
if eventType is null
|
|
attributes = JSON.parse message # no change to this part yet as it's related to
|
|
#...the old style messages
|
|
else
|
|
attributes = null
|
|
|
|
messageLib["#{eventType}_to_javascript_object"](message,#convert to Javascript Object
|
|
(object)->
|
|
console.log "successfully converted (to js object) " + object.header.name
|
|
attributes = object
|
|
return
|
|
,
|
|
(err)->
|
|
console.log "not successful in converting to js object "+ eventType + " " + err
|
|
)
|
|
|
|
Logger.info "message from redis on channel:#{channel}, data:#{message}"
|
|
|
|
console.log "^^^CHANNEL=#{channel}, DATA=#{message}" unless attributes?.aliveID? or attributes?.aliveId?
|
|
|
|
#TEMPORARY ------start------
|
|
unless channel is "bigbluebutton:webhook_events"
|
|
if attributes?.header?.name is "whiteboard_draw_event" or attributes?.header?.name is "user_joined_event" or attributes?.header?.name is "user_left_event"
|
|
channel = "bigbluebutton:bridge"
|
|
#TEMPORARY ------end------
|
|
|
|
if channel is "bigbluebutton:bridge"
|
|
if attributes?.header?.name is "user_joined_event"
|
|
console.log "___#{attributes?.payload?.user?.name} is joining"
|
|
@_redis_onBigbluebuttonBridge2(attributes)
|
|
|
|
else if channel is "bigbluebutton:meeting:presentation"
|
|
@_redis_onBigbluebuttonMeetingPresentation2(attributes)
|
|
|
|
else
|
|
# value of pub channel is used as the name of the SocketIO room to send to
|
|
# apply the parameters to the socket event, and emit it on the channels
|
|
#console.log "\n from within Websocket bridge:" + message + "\n"
|
|
console.log "\nThe channel was strange:" + channel
|
|
@_emitToClients2(channel, attributes)
|
|
|
|
|
|
# When received a message on the channel "bigbluebutton:bridge"
|
|
#
|
|
# @param attributes [Array] follows the format of this example:
|
|
# `["183f0bf3a0982a127bdb8161e0c44eb696b3e75c-1377871468173","mvCur",0.608540925266904,0.298932384341637]`
|
|
# The first attribute is the meetingID
|
|
# @private
|
|
_redis_onBigbluebuttonBridge2: (attributes) ->
|
|
console.log "(_redis_onBigbluebuttonBridge2): " + attributes?.header?.name #for debugging
|
|
|
|
meetingID = attributes?.payload?.meeting?.id
|
|
console.log "*meetingID: " + meetingID
|
|
|
|
emit = =>
|
|
# apply the parameters to the socket event, and emit it on the channels
|
|
#attributes.splice(0, 1) # remove the meeting id from the params
|
|
@_emitToClients2(meetingID, attributes)
|
|
|
|
# When presenter in flex side sends the 'undo' event, remove the current shape from Redis
|
|
# and publish the rest shapes to html5 users
|
|
###if attributes[1] is "undo"
|
|
@redisAction.onUndo meetingID, (err, reply) =>
|
|
@redisPublisher.publishShapes2 meetingID, null, (err) -> emit()###
|
|
|
|
# When presenter in flex side sends the 'clrPaper' event, remove everything from Redis
|
|
###else if attributes[1] is "clrPaper"
|
|
@redisAction.onClearPaper meetingID, (err, reply) => emit()###
|
|
|
|
###else
|
|
emit()###
|
|
emit()
|
|
|
|
# When received a message on the channel "bigbluebutton:meeting:presentation"
|
|
#
|
|
# @private
|
|
_redis_onBigbluebuttonMeetingPresentation2: (attributes) ->
|
|
console.log "**_redis_onBigbluebuttonMeetingPresentation2"
|
|
|
|
if attributes.messageKey is "CONVERSION_COMPLETED"
|
|
meetingID = attributes.room
|
|
@redisPublisher.publishSlides2 meetingID, null, =>
|
|
@redisPublisher.publishViewBox meetingID
|
|
|
|
|
|
# Emits a message to all clients connected in the given channel.
|
|
#
|
|
# @private
|
|
_emitToClients2: (channel, message) -> #message is a JS Object
|
|
console.log "in _emitToClients2:" unless message?.aliveID? or message?.aliveId?
|
|
channelViewers = @io.sockets.in(channel) #channel is the same as meetingID
|
|
|
|
#if the message has "header":{"name":"some_event_name"} use that name
|
|
#otherwise look for "name":"some_event_name" in the top level of the message
|
|
eventName = message?.header?.name or message?.name
|
|
|
|
console.log "**message name**: " + eventName unless message?.aliveID? or message?.aliveId?
|
|
console.log "channelViewers" + channelViewers[0] unless message?.aliveID? or message?.aliveId?
|
|
#console.log "message" + message
|
|
channelViewers.emit.apply(channelViewers, [eventName, message])
|
|
|
|
|
|
# When a user connected to the web socket.
|
|
# Several methods have callbacks but we don't need to wait for them all to run, they
|
|
# can just be triggered and the messages will be sent sometime.
|
|
#
|
|
# @param socket [Object] the socket that generated the event
|
|
# @private
|
|
_socket_onUserConnected2: (socket) ->
|
|
console.log("**userConnected2")
|
|
sessionID = fromSocket2(socket, "sessionID")
|
|
meetingID = fromSocket2(socket, "meetingID")
|
|
console.log "__meetingID=" + meetingID
|
|
@redisAction.isValidSession meetingID, sessionID, (err, reply) =>
|
|
if !reply
|
|
Logger.error "got invalid session for meeting #{meetingID}, session #{sessionID}"
|
|
else
|
|
username = fromSocket(socket, "username")
|
|
socketID = socket.id
|
|
socket.join meetingID # join the socket room with value of the meetingID
|
|
socket.join sessionID # join the socket room with value of the sessionID
|
|
|
|
Logger.info "got a valid session for meeting #{meetingID}, session #{sessionID}, username is '#{username}'"
|
|
|
|
# add socket to list of sockets
|
|
@redisAction.getUserProperties meetingID, sessionID, (err, properties) =>
|
|
Logger.info "publishing the list of users for #{meetingID}"
|
|
@redisPublisher.publishLoadUsers2 meetingID, null, =>
|
|
@redisPublisher.publishPresenter(meetingID)
|
|
|
|
numOfSockets = parseInt(properties.sockets, 10)
|
|
numOfSockets += 1
|
|
@redisStore.hset RedisKeys.getUserString(meetingID, sessionID), "sockets", numOfSockets
|
|
|
|
# if the user is not refreshing, it means its the first time he's entering the session
|
|
# all users should be notified
|
|
# when the user is refreshing the page the other users don't have to be notified
|
|
Logger.info "publishing user join for #{meetingID}"
|
|
receivers = (if properties.refreshing is "false" then null else sessionID)
|
|
@redisStore.hset RedisKeys.getUserString(meetingID, sessionID), "refreshing", false
|
|
@redisPublisher.publishUserJoin2 meetingID, receivers, properties.pubID, properties.username, =>
|
|
@redisPublisher.publishPresenter(meetingID, receivers)
|
|
|
|
# publish everything else we need to update for the client
|
|
Logger.info "publishing messages, slides and shapes to #{meetingID}, #{sessionID}"
|
|
@redisPublisher.publishMessages2(meetingID, sessionID)
|
|
@redisPublisher.publishSlides2 meetingID, sessionID, =>
|
|
@redisPublisher.publishCurrentImagePath(meetingID)
|
|
@redisPublisher.publishTool(meetingID, sessionID)
|
|
@redisPublisher.publishShapes2(meetingID, sessionID)
|
|
@redisPublisher.publishViewBox(meetingID, sessionID)
|
|
|
|
|
|
|
|
# When a user disconnects from the socket
|
|
#
|
|
# @param socket [Object] the socket that generated the event
|
|
# @private
|
|
_socket_onUserDisconnected: (socket) ->
|
|
sessionID = fromSocket(socket, "sessionID")
|
|
meetingID = fromSocket(socket, "meetingID")
|
|
|
|
# check if user is still in database
|
|
@redisAction.isValidSession meetingID, sessionID, (err, isValid) =>
|
|
if isValid
|
|
username = fromSocket(socket, "username")
|
|
socketID = socket.id
|
|
@redisAction.updateUserProperties meetingID, sessionID, ["refreshing", true], (success) =>
|
|
setTimeout (=>
|
|
@redisAction.isValidSession meetingID, sessionID, (err, isValid) =>
|
|
if isValid
|
|
@redisAction.getUserProperties meetingID, sessionID, (err, properties) =>
|
|
numOfSockets = parseInt(properties.sockets, 10)
|
|
numOfSockets -= 1
|
|
if numOfSockets is 0
|
|
@redisAction.deleteUser meetingID, sessionID, (err, reply) =>
|
|
@redisPublisher.publishUserLeave meetingID, sessionID, properties.pubID
|
|
else
|
|
@redisAction.updateUserProperties meetingID, sessionID, ["sockets", numOfSockets]
|
|
else
|
|
@redisPublisher.publishUsernames(meetingID)
|
|
|
|
), 5000 # @todo a 5 sec timeout, really?? timeouts are bad, there must be a better solution.
|
|
|
|
# When a user sends a chat message
|
|
#
|
|
# @param socket [Object] the socket that generated the event
|
|
# @param msg [string] the message received
|
|
# @private
|
|
_socket_onChatMessage2: (socket, msg) ->
|
|
msg = sanitizer.escape(msg)
|
|
sessionID = fromSocket(socket, "sessionID")
|
|
meetingID = fromSocket(socket, "meetingID")
|
|
@redisAction.isValidSession meetingID, sessionID, (err, reply) =>
|
|
if reply
|
|
if msg.length > config.maxChatLength
|
|
@redisPublisher.publishChatMessageTooLong(meetingID, sessionID)
|
|
else
|
|
@redisAction.getUserProperties meetingID, sessionID, (err, properties) =>
|
|
username = fromSocket(socket, "username")
|
|
@redisPublisher.publishChatMessage2(meetingID, username, msg, properties.pubID)
|
|
|
|
messageID = rack() # get a randomly generated id for the message
|
|
@redisStore.rpush RedisKeys.getMessagesString(meetingID, null, null), messageID #store the messageID in the list of messages
|
|
@redisStore.hmset RedisKeys.getMessageString(meetingID, null, null, messageID), "message", msg, "username", username, "userID", properties.pubID
|
|
|
|
# When a user logs out
|
|
#
|
|
# @param socket [Object] the socket that generated the event
|
|
# @private
|
|
_socket_onLogout: (socket) ->
|
|
sessionID = fromSocket(socket, "sessionID")
|
|
meetingID = fromSocket(socket, "meetingID")
|
|
@redisAction.isValidSession meetingID, sessionID, (err, isValid) =>
|
|
if isValid
|
|
|
|
@redisAction.getUserProperties meetingID, sessionID, (err, properties) =>
|
|
@redisPublisher.publishUserLeave meetingID, null, properties.pubID
|
|
|
|
# remove the user from the list of users
|
|
@redisStore.srem RedisKeys.getUsersString(meetingID), sessionID, (numDeleted) =>
|
|
|
|
# delete key from database
|
|
@redisStore.del RedisKeys.getUserString(meetingID, sessionID), (reply) =>
|
|
@redisPublisher.publishLogout(sessionID)
|
|
socket.disconnect() # disconnect own socket
|
|
|
|
# If a user requests all the shapes, publish the shapes to everyone.
|
|
# Only reason this happens is when its fit changes.
|
|
#
|
|
# @param socket [Object] the socket that generated the event
|
|
# @private
|
|
# @todo review if we really need it
|
|
_socket_onAllShapes: (socket) ->
|
|
meetingID = fromSocket(socket, "meetingID")
|
|
@redisPublisher.publishShapes2(meetingID)
|
|
|
|
# Returns a given attribute `attr` registered in the `socket`.
|
|
#
|
|
# @return {string} the value of the attribute requested
|
|
# @internal
|
|
fromSocket = (socket, attr) ->
|
|
|
|
#console.log("\n***handshake**\n")
|
|
#console.log(socket.handshake)
|
|
#console.log("handshake end--\n")
|
|
|
|
socket?.handshake?[attr]
|
|
|
|
# Returns a given attribute `attr` registered in the `socket`.
|
|
#
|
|
# @return {string} the value of the attribute requested
|
|
# @internal
|
|
fromSocket2 = (socket, attr) ->
|
|
|
|
###console.log("\n***handshake**\n")
|
|
console.log(socket.handshake)
|
|
console.log("handshake end--\n")###
|
|
|
|
socket?.handshake?[attr]
|
|
|
|
# Returns whether the current user is the presenter or not.
|
|
#
|
|
# @return {boolean}
|
|
# @internal
|
|
isCurrentPresenter = (socket, presenterID) ->
|
|
id = fromSocket(socket, "sessionID")
|
|
id? and presenterID is id
|
|
|
|
registerResponse = (method, err, reply, message="") ->
|
|
Utils.registerResponse "RedisWebsocketBridge##{method}", err, reply, message
|