event emitter for redis messages

This commit is contained in:
Anton Georgiev 2016-04-18 21:58:36 +00:00
parent 4f8b816b3e
commit 3f45800e7d
3 changed files with 786 additions and 566 deletions

View File

@ -34,3 +34,4 @@ reactive-var
ecmascript
react
react-meteor-data
raix:eventemitter

View File

@ -65,7 +65,10 @@ Meteor.RedisPubSub = (function () {
let eventName, message, messagesWeIgnore;
message = JSON.parse(jsonMsg);
eventName = message.header.name;
messagesWeIgnore = ['BbbPubSubPongMessage', 'bbb_apps_is_alive_message', 'broadcast_layout_message'];
messagesWeIgnore = [
'BbbPubSubPongMessage',
'bbb_apps_is_alive_message',
'broadcast_layout_message',];
if (indexOf.call(messagesWeIgnore, eventName) < 0) {
console.log(`Q ${eventName} ${Meteor.myQueue.total()}`);
return Meteor.myQueue.add({
@ -102,3 +105,743 @@ this.publish = function (channel, message) {
return Meteor.log.info('ERROR!! Meteor.redisPubSub was undefined');
}
};
var handleVoiceEvent = function(arg) {
let _voiceUser, meetingId;
meetingId = arg.payload.meeting_id;
_voiceUser = payload.user.voiceUser;
voiceUserObj = {
web_userid: _voiceUser.web_userid,
listen_only: arg.payload.listen_only,
talking: _voiceUser.talking,
joined: _voiceUser.joined,
locked: _voiceUser.locked,
muted: _voiceUser.muted,
};
return updateVoiceUser(meetingId, voiceUserObj, arg.callback);
}, userId;
var handleLockEvent = function(arg) {
let userId, isLocked;
userId = arg.payload.userid;
isLocked = arg.payload.locked;
setUserLockedStatus(meetingId, userId, isLocked);
return arg.callback();
};
var handleEndOfMeeting = function(arg) {
let meetingId;
meetingId = arg.payload.meeting_id;
Meteor.log.info(`DESTROYING MEETING ${meetingId}`);
return removeMeetingFromCollection(meetingId, arg.callback);
};
var handleChatEvent = function (arg) {
let messageObject, meetingId;
messageObject = arg.payload.message;
meetingId = arg.payload.meeting_id;
// use current_time instead of message.from_time so that the chats from Flash and HTML5 have uniform times
messageObject.from_time = arg.header.current_time;
addChatToCollection(meetingId, messageObject);
return arg.callback();
};
registerHandlers = function (emitter) {
console.log("REGISTER HANDLERS", emitter);
emitter.on('blah', function(arg) {
console.log("got " + "blah", arg.callback);
arg.callback();
});
emitter.on('get_users_reply', function (arg) {
if (arg.payload.requester_id === 'nodeJSapp') {
let users, processUser, meetingId;
console.log("get_users_reply handling");
users = arg.payload.users;
meetingId = arg.payload.meeting_id;
//TODO make the serialization be split per meeting. This will allow us to
// use N threads vs 1 and we'll take advantage of Mongo's concurrency tricks
// Processing the users recursively with a callback to notify us,
// ensuring that we update the users collection serially
processUser = function () {
let user;
user = users.pop();
if (user != null) {
user.timeOfJoining = arg.header.current_time;
if (user.emoji_status !== 'none' && typeof user.emoji_status === 'string') {
user.set_emoji_time = new Date();
return userJoined(meetingId, user, processUser);
} else {
return userJoined(meetingId, user, processUser);
}
} else {
return arg.callback(); // all meeting arrays (if any) have been processed
}
};
return processUser();
}
else {
console.log("NAAAAH");
arg.callback();
}
});
emitter.on('meeting_created_message', function(arg) {
meetingName = arg.payload.name;
intendedForRecording = arg.payload.recorded;
voiceConf = arg.payload.voice_conf;
duration = arg.payload.duration;
meetingId = arg.payload.meeting_id;
return addMeetingToCollection(meetingId, meetingName, intendedForRecording, voiceConf, duration, arg.callback);
});
emitter.on('get_all_meetings_reply', function(arg) {
let listOfMeetings, processMeeting;
Meteor.log.info("Let's store some data for the running meetings so that when an HTML5 client joins everything is ready!");
Meteor.log.info(JSON.stringify(arg.payload));
listOfMeetings = arg.payload.meetings;
// Processing the meetings recursively with a callback to notify us,
// ensuring that we update the meeting collection serially
processMeeting = function () {
let meeting;
meeting = listOfMeetings.pop();
if (meeting != null) {
return addMeetingToCollection(meeting.meetingID, meeting.meetingName, meeting.recorded, meeting.voiceBridge, meeting.duration, processMeeting);
} else {
return arg.callback(); // all meeting arrays (if any) have been processed
}
};
return processMeeting();
});
// VOICE EVENTS
emitter.on('user_left_voice_message', function(arg) {
handleVoiceEvent(arg);
});
emitter.on('user_joined_voice_message', function(arg) {
handleVoiceEvent(arg);
});
emitter.on('user_voice_talking_message', function(arg) {
handleVoiceEvent(arg);
});
emitter.on('user_voice_muted_message', function(arg) {
handleVoiceEvent(arg);
});
emitter.on('user_listening_only', function(arg) {
let voiceUserObj, meetingId;
voiceUserObj = {
web_userid: arg.payload.userid,
listen_only: arg.payload.listen_only,
};
meetingId = arg.payload.meeting_id;
return updateVoiceUser(meetingId, voiceUserObj, arg.callback);
});
emitter.on('user_left_message', function(arg) {
if (arg.payload.user != null && arg.payload.user.userid != null && arg.payload.meeting_id != null) {
let userId, meetingId;
meetingId = arg.payload.meeting_id;
userId = arg.payload.user.userid;
return markUserOffline(meetingId, userId, arg.callback);
} else {
return arg.callback();
}
});
emitter.on('validate_auth_token_reply', function(arg) {
let userId, user, validStatus, payload, meetingId;
meetingId = arg.payload.meeting_id;
userId = arg.payload.userid;
user = Meteor.Users.findOne({
userId: userId,
meetingId: meetingId,
});
validStatus = arg.payload.valid;
// if the user already exists in the db
if (user != null && user.clientType === 'HTML5') {
//if the html5 client user was validated successfully, add a flag
return Meteor.Users.update({
userId: userId,
meetingId: meetingId,
}, {
$set: {
validated: validStatus,
},
}, (err, numChanged) => {
let funct;
if (numChanged.insertedId != null) {
funct = function (cbk) {
let user, val;
user = Meteor.Users.findOne({
userId: userId,
meetingId: meetingId,
});
if (user != null) {
val = user.validated;
}
Meteor.log.info(`user.validated for user ${userId} in meeting ${user.meetingId} just became ${val}`);
return cbk();
};
return funct(arg.callback);
} else {
return arg.callback();
}
});
} else {
Meteor.log.info('a non-html5 user got validate_auth_token_reply.');
return arg.callback();
}
});
emitter.on('user_joined_message', function(arg) {
let userObj, dbUser, meetingId, payload;
meetingId = arg.payload.meeting_id;
payload = arg.payload;
userObj = payload.user;
dbUser = Meteor.Users.findOne({
userId: userObj.userid,
meetingId: meetingId,
});
// On attempting reconnection of Flash clients (in voiceBridge) we receive
// an extra user_joined_message. Ignore it as it will add an extra user
// in the user list, creating discrepancy with the list in the Flash client
if ((dbUser != null && dbUser.user != null && dbUser.user.connection_status === 'offline') && (payload.user != null && payload.user.phone_user)) {
Meteor.log.error('offline AND phone user');
return arg.callback(); //return without joining the user
} else {
if (dbUser != null && dbUser.clientType === 'HTML5') {
let status;
// typically html5 users will be in
// the db [as a dummy user] before the joining message
status = dbUser.validated;
Meteor.log.info(`in user_joined_message the validStatus of the user was ${status}`);
userObj.timeOfJoining = arg.header.current_time;
return userJoined(meetingId, userObj, arg.callback);
} else {
return userJoined(meetingId, userObj, arg.callback);
}
}
});
// for now not handling these serially #TODO
emitter.on('presenter_assigned_message', function(arg) {
let newPresenterId, meetingId;
meetingId = arg.payload.meeting_id;
newPresenterId = arg.payload.new_presenter_id;
if (newPresenterId != null) {
// reset the previous presenter
Meteor.Users.update({
'user.presenter': true,
meetingId: meetingId,
}, {
$set: {
'user.presenter': false,
},
}, (err, numUpdated) => {
return Meteor.log.info(` Updating old presenter numUpdated=${numUpdated}, err=${err}`);
});
// set the new presenter
Meteor.Users.update({
'user.userid': newPresenterId,
meetingId: meetingId,
}, {
$set: {
'user.presenter': true,
},
}, (err, numUpdated) => {
return Meteor.log.info(` Updating new presenter numUpdated=${numUpdated}, err=${err}`);
});
}
return arg.callback();
});
emitter.on('user_emoji_status_message', function(arg) {
let userId, meetingId, emojiStatus;
userId = arg.payload.userid;
meetingId = arg.payload.meeting_id;
emojiStatus = arg.payload.emoji_status;
if (userId != null && meetingId != null) {
let set_emoji_time;
set_emoji_time = new Date();
Meteor.Users.update({
'user.userid': userId,
}, {
$set: {
'user.set_emoji_time': set_emoji_time,
'user.emoji_status': emojiStatus,
},
}, (err, numUpdated) => {
return Meteor.log.info(` Updating emoji numUpdated=${numUpdated}, err=${err}`);
});
}
return arg.callback();
});
emitter.on('user_locked_message', function (arg) {
handleLockEvent(arg);
});
emitter.on('user_unlocked_message', function (arg) {
handleLockEvent(arg);
});
emitter.on('meeting_ended_message', function (arg) {
handleEndOfMeeting(arg);
});
emitter.on('meeting_destroyed_event', function (arg) {
handleEndOfMeeting(arg);
});
emitter.on('end_and_kick_all_message', function (arg) {
handleEndOfMeeting(arg);
});
emitter.on('disconnect_all_users_message', function (arg) {
handleEndOfMeeting(arg);
});
emitter.on('get_chat_history_reply', function (arg) {
if (arg.payload.requester_id === 'nodeJSapp') { //TODO extract this check
let meetingId;
meetingId = arg.payload.meeting_id;
if (Meteor.Meetings.findOne({
MeetingId: meetingId,
}) == null) {
let chatHistory, _chat_history_length, chatMessage;
chatHistory = arg.payload.chat_history;
_chat_history_length = chatHistory.length;
for (i = 0; i < _chat_history_length; i++) {
chatMessage = chatHistory[i];
addChatToCollection(meetingId, chatMessage);
}
}
}
return arg.callback();
});
emitter.on('send_public_chat_message', function (arg) {
handleChatEvent(arg);
});
emitter.on('send_private_chat_message', function (arg) {
handleChatEvent(arg);
});
emitter.on('presentation_shared_message', function (arg) {
let payload, meetingId;
payload = arg.payload;
meetingId = payload.meeting_id;
if (payload.presentation != null && payload.presentation.id != null && meetingId != null) {
let presentationId, pages, slide;
presentationId = payload.presentation.id;
// change the currently displayed presentation to presentation.current = false
Meteor.Presentations.update({
'presentation.current': true,
meetingId: meetingId,
}, {
$set: {
'presentation.current': false,
},
});
//update(if already present) entirely the presentation with the fresh data
removePresentationFromCollection(meetingId, presentationId);
addPresentationToCollection(meetingId, payload.presentation);
pages = payload.presentation.pages;
for (j = 0; j < pages.length; j++) {
slide = pages[j];
addSlideToCollection(
meetingId,
presentationId,
slide
);
}
}
return arg.callback();
});
emitter.on('get_presentation_info_reply', function (arg) {
if (arg.payload.requester_id === 'nodeJSapp') {
let presentations, payload, k, presentation, pages, page, l, meetingId, whiteboardId, replyTo, message;
payload = arg.payload;
meetingId = payload.meeting_id;
presentations = payload.presentations;
for (k = 0; k < payload.presentations.length; k++) {
presentation = presentations[k];
addPresentationToCollection(meetingId, presentation);
pages = presentation.pages;
for (l = 0; l < pages.length; l++) {
page = pages[l];
//add the slide to the collection
addSlideToCollection(meetingId, presentation.id, page);
//request for shapes
whiteboardId = `${presentation.id}/${page.num}`;
//Meteor.log.info "the whiteboard_id here is:" + whiteboardId
replyTo = `${meetingId}/nodeJSapp`;
message = {
payload: {
meeting_id: meetingId,
requester_id: 'nodeJSapp',
whiteboard_id: whiteboardId,
reply_to: replyTo,
},
header: {
timestamp: new Date().getTime(),
name: 'request_whiteboard_annotation_history_request',
},
};
if (whiteboardId != null && meetingId != null) {
publish(Meteor.config.redis.channels.toBBBApps.whiteboard, message);
} else {
Meteor.log.info('did not have enough information to send a user_leaving_request');
}
}
}
return arg.callback();
}
});
emitter.on('presentation_page_changed_message', function (arg) {
let newSlide, meetingId;
newSlide = arg.payload.page;
meetingId = arg.payload.meeting_id;
if (newSlide != null && newSlide.id != null && meetingId != null) {
displayThisSlide(meetingId, newSlide.id, newSlide);
}
return arg.callback();
});
emitter.on('presentation_removed_message', function (arg) {
let presentationId, meetingId;
meetingId = arg.payload.meeting_id;
presentationId = arg.payload.presentation_id;
if (meetingId != null && presentationId != null) {
removePresentationFromCollection(meetingId, presentationId);
}
return arg.callback();
});
emitter.on('get_whiteboard_shapes_reply', function (arg) {
if (arg.payload.requester_id === 'nodeJSapp') {
let meetingId, shapes, shapes_length, m, shape, whiteboardId;
meetingId = arg.payload.meeting_id;
// Create a whiteboard clean status or find one for the current meeting
if (Meteor.WhiteboardCleanStatus.findOne({
meetingId: meetingId,
}) == null) {
Meteor.WhiteboardCleanStatus.insert({
meetingId: meetingId,
in_progress: false,
});
}
shapes = payload.shapes;
shapes_length = shapes.length;
for (m = 0; m < shapes_length; m++) {
shape = shapes[m];
whiteboardId = shape.wb_id;
addShapeToCollection(meetingId, whiteboardId, shape);
}
return arg.callback();
}
});
emitter.on('send_whiteboard_shape_message', function (arg) {
let payload, shape, whiteboardId, meetingId;
payload = arg.payload;
meetingId = payload.meeting_id;
//Meteor stringifies an array of JSONs (...shape.result) in this message
//parsing the String and reassigning the value
if (payload.shape.shape_type === 'poll_result' && typeof payload.shape.shape.result === 'string') {
payload.shape.shape.result = JSON.parse(payload.shape.shape.result);
}
shape = payload.shape;
if (shape != null && shape.wb_id != null) {
whiteboardId = shape.wb_id;
}
addShapeToCollection(meetingId, whiteboardId, shape);
return arg.callback();
});
emitter.on('presentation_cursor_updated_message', function (arg) {
let cursor, meetingId;
meetingId = arg.payload.meeting_id;
cursor = {
x: arg.payload.x_percent,
y: arg.payload.y_percent,
};
// update the location of the cursor on the whiteboard
updateCursorLocation(meetingId, cursor);
return arg.callback();
});
emitter.on('whiteboard_cleared_message', function (arg) {
let whiteboardId, meetingId;
meetingId = arg.payload.meeting_id;
whiteboardId = arg.payload.whiteboard_id;
Meteor.WhiteboardCleanStatus.update({
meetingId: meetingId,
}, {
$set: {
in_progress: true,
},
});
removeAllShapesFromSlide(meetingId, whiteboardId);
return arg.callback();
});
emitter.on('undo_whiteboard_request', function (arg) {
let whiteboardId, meetingId, shapeId;
meetingId = arg.payload.meeting_id;
whiteboardId = arg.payload.whiteboard_id;
shapeId = arg.payload.shape_id;
removeShapeFromSlide(meetingId, whiteboardId, shapeId);
return arg.callback();
});
emitter.on('presentation_page_resized_message', function (arg) {
let page, payload;
payload = arg.payload;
page = payload.page;
if (page != null && page.id != null && page.height_ratio != null
&& page.width_ratio != null && page.x_offset != null && page.y_offset != null) {
let slideId, heightRatio, widthRatio, xOffset, yOffset, presentationId, currentSlide;
slideId = page.id;
heightRatio = page.height_ratio;
widthRatio = page.width_ratio;
xOffset = page.x_offset;
yOffset = page.y_offset;
presentationId = slideId.split('/')[0];
// In the case when we don't resize, but switch a slide, this message
// follows a 'presentation_page_changed' and all these properties are already set.
currentSlide = Meteor.Slides.findOne(
{ presentationId: presentationId,
'slide.current': true, });
if (currentSlide) {
currentSlide = currentSlide.slide;
}
if (currentSlide != null && (currentSlide.height_ratio != heightRatio || currentSlide.width_ratio != widthRatio
|| currentSlide.x_offset != xOffset || currentSlide.y_offset != yOffset)) {
Meteor.Slides.update({
presentationId: presentationId,
'slide.current': true,
}, {
$set: {
'slide.height_ratio': heightRatio,
'slide.width_ratio': widthRatio,
'slide.x_offset': xOffset,
'slide.y_offset': yOffset,
},
});
}
}
return arg.callback();
});
emitter.on('recording_status_changed_message', function (arg) {
let intendedForRecording, currentlyBeingRecorded, meetingId;
intendedForRecording = arg.payload.recorded;
currentlyBeingRecorded = arg.payload.recording;
meetingId = arg.payload.meeting_id;
Meteor.Meetings.update({
meetingId: meetingId,
intendedForRecording: intendedForRecording,
}, {
$set: {
currentlyBeingRecorded: currentlyBeingRecorded,
},
});
return arg.callback();
});
emitter.on('new_permission_settings', function (arg) {
let meetingObject, meetingId, oldSettings, newSettings, payload;
meetingId = arg.payload.meeting_id;
payload = arg.payload;
meetingObject = Meteor.Meetings.findOne({
meetingId: meetingId,
});
if (meetingObject != null && payload != null) {
oldSettings = meetingObject.roomLockSettings;
newSettings = payload.permissions;
// if the disableMic setting was turned on
if (oldSettings != null && !oldSettings.disableMic && newSettings.disableMic) {
handleLockingMic(meetingId, newSettings);
}
// substitute with the new lock settings
Meteor.Meetings.update({
meetingId: meetingId,
}, {
$set: {
'roomLockSettings.disablePrivateChat': newSettings.disablePrivateChat,
'roomLockSettings.disableCam': newSettings.disableCam,
'roomLockSettings.disableMic': newSettings.disableMic,
'roomLockSettings.lockOnJoin': newSettings.lockOnJoin,
'roomLockSettings.lockedLayout': newSettings.lockedLayout,
'roomLockSettings.disablePublicChat': newSettings.disablePublicChat,
'roomLockSettings.lockOnJoinConfigurable': newSettings.lockOnJoinConfigurable,
},
});
}
return arg.callback();
});
emitter.on('poll_show_result_message', function (arg) {
let payload, meetingId, poll_id;
payload = arg.payload;
meetingId = payload.meeting_id;
if (payload != null && payload.poll != null && payload.poll.id != null && meetingId != null) {
poll_id = payload.poll.id;
clearPollCollection(meetingId, poll_id);
}
return arg.callback();
});
emitter.on('poll_started_message', function (arg) {
let payload, meetingId, users;
payload = arg.payload;
meetingId = payload.meeting_id;
if (payload != null && meetingId != null && payload.requester_id != null && payload.poll != null) {
if (Meteor.Meetings.findOne({
meetingId: meetingId,
}) != null) {
users = Meteor.Users.find({
meetingId: meetingId,
}, {
fields: {
'user.userid': 1,
_id: 0,
},
}).fetch();
addPollToCollection(
payload.poll,
payload.requester_id,
users,
meetingId
);
}
}
return arg.callback();
});
emitter.on('poll_stopped_message', function (arg) {
let meetingId, payload, poll_id;
payload = arg.payload;
meetingId = payload.meeting_id;
if (meetingId != null && payload != null && payload.poll_id != null) {
poll_id = payload.poll_id;
clearPollCollection(meetingId, poll_id);
}
return arg.callback();
});
emitter.on('user_voted_poll_message', function (arg) {
let payload, meetingId, pollObj, requesterId;
payload = arg.payload;
meetingId = payload.meeting_id;
if (payload != null && payload.poll != null && meetingId != null && payload.presenter_id != null) {
pollObj = payload.poll;
requesterId = payload.presenter_id;
updatePollCollection(pollObj, meetingId, requesterId);
return arg.callback();
}
});
// TODO how to handle the rest of the messages - is there a wild card?
// we need a way of calling the callback
// emitter.on('' , function (arg) {
// console.log("**********************************" + arg.eventName);
// arg.callback();
// });
emitter.on('meeting_state_message' , function (arg) {
// do nothing
arg.callback();
});
emitter.on('user_registered_message' , function (arg) {
// do nothing
arg.callback();
});
//eject_voice_user_message
/*
// --------------------------------------------------
// lock settings ------------------------------------
// for now not handling this serially #TODO
} else if (eventName === 'eject_voice_user_message') {
return callback();
}
else { // keep moving in the queue
if (indexOf.call(notLoggedEventTypes, eventName) < 0) {
Meteor.log.info(`WARNING!!! THE JSON MESSAGE WAS NOT OF TYPE SUPPORTED BY THIS APPLICATION
${eventName}
{JSON.stringify(message)}` );
}
return callback();
}
}
*/
};

View File

@ -16,6 +16,10 @@ Meteor.startup(() => {
clearPollCollection();
clearCursorCollection();
var eventEmitter = new EventEmitter();
registerHandlers(eventEmitter);
// create create a PubSub connection, start listening
Meteor.redisPubSub = new Meteor.RedisPubSub(function () {
return Meteor.log.info('created pubsub');
@ -25,29 +29,36 @@ Meteor.startup(() => {
// autoStart:true
// isPaused: true
});
Meteor.myQueue.taskHandler = function (data, next, failures) {
let eventName, parsedMsg;
let eventName, parsedMsg, length, lengthString;
parsedMsg = JSON.parse(data.jsonMsg);
// length = Meteor.myQueue.length();
// lengthString = '' + function () {
// if (length > 0) {
// return `In the queue we have ${length} event(s) to process.`;
// }
// };
//
// Meteor.log.info(`in callback after handleRedisMessage ${eventName}. ${lengthString()}`);
if (parsedMsg != null) {
eventName = parsedMsg.header.name;
}
console.log("in taskHandler:" + eventName);
if (failures > 0) {
return Meteor.log.error(`got a failure on taskHandler ${eventName} ${failures}`);
// TODO should we stop or instead return next?
} else {
return handleRedisMessage(data, () => {
let length, lengthString;
length = Meteor.myQueue.length();
lengthString = function () {
if (length > 0) {
return `In the queue we have ${length} event(s) to process.`;
} else {
return '';
}
};
console.log("^^^^^^^^", parsedMsg);
return eventEmitter.emit(eventName, {
payload: parsedMsg.payload,
header: parsedMsg.header,
Meteor.log.info(`in callback after handleRedisMessage ${eventName}. ${lengthString()}`);
return next();
callback: () => {
console.log("ready for next message");
return next();
},
});
}
};
@ -61,7 +72,7 @@ Meteor.startup(() => {
// due to not following the order of events coming through the redis pubsub.
// for example: a user_left event reaching the collection before a user_joined
// for the same user.
return this.handleRedisMessage = function (data, callback) {
/*return this.handleRedisMessage = function (data, bigCallback) {
let chatMessage, currentlyBeingRecorded, cursor, dbUser, duration, emojiStatus, eventName, heightRatio, i, intendedForRecording;
let isLocked, j, k, l, listOfMeetings, m, meetingId, meetingName, meetingObject, message, messageObject, newPresenterId, newSettings;
let newSlide, notLoggedEventTypes, oldSettings, page, pages, pollObj, poll_id, presentation, presentationId, processMeeting, processUser;
@ -100,568 +111,33 @@ Meteor.startup(() => {
'bbb_apps_is_alive_message',
'user_voice_talking_message',
'meeting_state_message',
'get_recording_status_reply', ];
'get_recording_status_reply',];
if (message == null || message.header == null || payload == null) {
Meteor.log.error('ERROR!! No header or payload');
callback();
bigCallback();
} else {
Meteor.log.info("_________________1");
eventEmitter.emit("blah", {
// eventEmitter.emit(eventName, {
meetingId: meetingId,
message: message, //this is the JSON message
c: function() {
console.log("in c:", bigCallback);
// bigCallback();
},
});
}
// LOG in the meteor console
if (eventName, indexOf.call(notLoggedEventTypes, eventName) < 0) {
Meteor.log.info(`redis incoming message ${eventName} `, {
message: data.jsonMsg,
});
}
// we currently disregard the pattern and channel
if (message != null && message.header != null && payload != null) {
if (eventName === 'meeting_created_message') {
// Meteor.log.error JSON.stringify message
meetingName = payload.name;
intendedForRecording = payload.recorded;
voiceConf = payload.voice_conf;
duration = payload.duration;
return addMeetingToCollection(meetingId, meetingName, intendedForRecording, voiceConf, duration, callback);
// TODO should I call callback here?!
// handle voice events
} else if ((payload.user != null) && (eventName === 'user_left_voice_message' || eventName === 'user_joined_voice_message' || eventName === 'user_voice_talking_message' || eventName === 'user_voice_muted_message')) {
_voiceUser = payload.user.voiceUser;
voiceUserObj = {
web_userid: _voiceUser.web_userid,
listen_only: payload.listen_only,
talking: _voiceUser.talking,
joined: _voiceUser.joined,
locked: _voiceUser.locked,
muted: _voiceUser.muted,
};
return updateVoiceUser(meetingId, voiceUserObj, callback);
} else if (eventName === 'user_listening_only') {
voiceUserObj = {
web_userid: payload.userid,
listen_only: payload.listen_only,
};
return updateVoiceUser(meetingId, voiceUserObj, callback);
} else if (eventName === 'get_all_meetings_reply') {
Meteor.log.info("Let's store some data for the running meetings so that when an HTML5 client joins everything is ready!");
Meteor.log.info(JSON.stringify(message));
listOfMeetings = payload.meetings;
// Processing the meetings recursively with a callback to notify us,
// ensuring that we update the meeting collection serially
processMeeting = function () {
let meeting;
meeting = listOfMeetings.pop();
if (meeting != null) {
return addMeetingToCollection(meeting.meetingID, meeting.meetingName, meeting.recorded, meeting.voiceBridge, meeting.duration, processMeeting);
} else {
return callback(); // all meeting arrays (if any) have been processed
}
};
return processMeeting();
} else if (eventName === 'user_joined_message') {
userObj = payload.user;
dbUser = Meteor.Users.findOne({
userId: userObj.userid,
meetingId: meetingId,
});
// On attempting reconnection of Flash clients (in voiceBridge) we receive
// an extra user_joined_message. Ignore it as it will add an extra user
// in the user list, creating discrepancy with the list in the Flash client
if ((dbUser != null && dbUser.user != null && dbUser.user.connection_status === 'offline') && (payload.user != null && payload.user.phone_user)) {
Meteor.log.error('offline AND phone user');
return callback(); //return without joining the user
} else {
if (dbUser != null && dbUser.clientType === 'HTML5') {
// typically html5 users will be in
// the db [as a dummy user] before the joining message
status = dbUser.validated;
Meteor.log.info(`in user_joined_message the validStatus of the user was ${status}`);
userObj.timeOfJoining = message.header.current_time;
return userJoined(meetingId, userObj, callback);
} else {
return userJoined(meetingId, userObj, callback);
}
}
// only process if requester is nodeJSapp means only process in the case when
// we explicitly request the users
} else if (eventName === 'get_users_reply' && payload.requester_id === 'nodeJSapp') {
users = payload.users;
//TODO make the serialization be split per meeting. This will allow us to
// use N threads vs 1 and we'll take advantage of Mongo's concurrency tricks
// Processing the users recursively with a callback to notify us,
// ensuring that we update the users collection serially
processUser = function () {
let user;
user = users.pop();
if (user != null) {
user.timeOfJoining = message.header.current_time;
if (user.emoji_status !== 'none' && typeof user.emoji_status === 'string') {
user.set_emoji_time = new Date();
return userJoined(meetingId, user, processUser);
} else {
return userJoined(meetingId, user, processUser);
}
} else {
return callback(); // all meeting arrays (if any) have been processed
}
};
return processUser();
} else if (eventName === 'validate_auth_token_reply') {
userId = payload.userid;
user = Meteor.Users.findOne({
userId: userId,
meetingId: meetingId,
});
validStatus = payload.valid;
// if the user already exists in the db
if (user != null && user.clientType === 'HTML5') {
//if the html5 client user was validated successfully, add a flag
return Meteor.Users.update({
userId: userId,
meetingId: meetingId,
}, {
$set: {
validated: validStatus,
},
}, (err, numChanged) => {
let funct;
if (numChanged.insertedId != null) {
funct = function (cbk) {
let user, val;
user = Meteor.Users.findOne({
userId: userId,
meetingId: meetingId,
});
if (user != null) {
val = user.validated;
}
Meteor.log.info(`user.validated for user ${userId} in meeting ${user.meetingId} just became ${val}`);
return cbk();
};
return funct(callback);
} else {
return callback();
}
});
} else {
Meteor.log.info('a non-html5 user got validate_auth_token_reply.');
return callback();
}
} else if (eventName === 'user_left_message') {
if (payload.user != null && payload.user.userid != null && meetingId != null) {
userId = payload.user.userid;
return markUserOffline(meetingId, userId, callback);
} else {
return callback(); //TODO check how to get these cases out and reuse code
}
// for now not handling this serially #TODO
} else if (eventName === 'presenter_assigned_message') {
newPresenterId = payload.new_presenter_id;
if (newPresenterId != null) {
// reset the previous presenter
Meteor.Users.update({
'user.presenter': true,
meetingId: meetingId,
}, {
$set: {
'user.presenter': false,
},
}, (err, numUpdated) => {
return Meteor.log.info(` Updating old presenter numUpdated=${numUpdated}, err=${err}`);
});
// set the new presenter
Meteor.Users.update({
'user.userid': newPresenterId,
meetingId: meetingId,
}, {
$set: {
'user.presenter': true,
},
}, (err, numUpdated) => {
return Meteor.log.info(` Updating new presenter numUpdated=${numUpdated}, err=${err}`);
});
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'user_emoji_status_message') {
userId = payload.userid;
emojiStatus = payload.emoji_status;
if (userId != null && meetingId != null) {
set_emoji_time = new Date();
Meteor.Users.update({
'user.userid': userId,
}, {
$set: {
'user.set_emoji_time': set_emoji_time,
'user.emoji_status': emojiStatus,
},
}, (err, numUpdated) => {
return Meteor.log.info(` Updating emoji numUpdated=${numUpdated}, err=${err}`);
});
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'user_locked_message' || eventName === 'user_unlocked_message') {
userId = payload.userid;
isLocked = payload.locked;
setUserLockedStatus(meetingId, userId, isLocked);
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'meeting_ended_message' || eventName === 'meeting_destroyed_event' || eventName === 'end_and_kick_all_message' || eventName === 'disconnect_all_users_message') {
Meteor.log.info(`DESTROYING MEETING ${meetingId}`);
return removeMeetingFromCollection(meetingId, callback);
// for now not handling this serially #TODO
} else if (eventName === 'get_chat_history_reply' && payload.requester_id === 'nodeJSapp') {
if (Meteor.Meetings.findOne({
MeetingId: meetingId,
}) == null) {
chatHistory = payload.chat_history;
_chat_history_length = chatHistory.length;
for (i = 0; i < _chat_history_length; i++) {
chatMessage = chatHistory[i];
addChatToCollection(meetingId, chatMessage);
}
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'send_public_chat_message' || eventName === 'send_private_chat_message') {
messageObject = payload.message;
// use current_time instead of message.from_time so that the chats from Flash and HTML5 have uniform times
messageObject.from_time = message.header.current_time;
addChatToCollection(meetingId, messageObject);
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'presentation_shared_message') {
if (payload.presentation != null && payload.presentation.id != null && meetingId != null) {
presentationId = payload.presentation.id;
// change the currently displayed presentation to presentation.current = false
Meteor.Presentations.update({
'presentation.current': true,
meetingId: meetingId,
}, {
$set: {
'presentation.current': false,
},
});
//update(if already present) entirely the presentation with the fresh data
removePresentationFromCollection(meetingId, presentationId);
addPresentationToCollection(meetingId, payload.presentation);
pages = payload.presentation.pages;
for (j = 0; j < pages.length; j++) {
slide = pages[j];
addSlideToCollection(
meetingId,
presentationId,
slide
);
}
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'get_presentation_info_reply' && payload.requester_id === 'nodeJSapp') {
presentations = payload.presentations;
for (k = 0; k < payload.presentations.length; k++) {
presentation = presentations[k];
addPresentationToCollection(meetingId, presentation);
pages = presentation.pages;
for (l = 0; l < pages.length; l++) {
page = pages[l];
//add the slide to the collection
addSlideToCollection(meetingId, presentation.id, page);
//request for shapes
whiteboardId = `${presentation.id}/${page.num}`;
//Meteor.log.info "the whiteboard_id here is:" + whiteboardId
replyTo = `${meetingId}/nodeJSapp`;
message = {
payload: {
meeting_id: meetingId,
requester_id: 'nodeJSapp',
whiteboard_id: whiteboardId,
reply_to: replyTo,
},
header: {
timestamp: new Date().getTime(),
name: 'request_whiteboard_annotation_history_request',
},
};
if (whiteboardId != null && meetingId != null) {
publish(Meteor.config.redis.channels.toBBBApps.whiteboard, message);
} else {
Meteor.log.info('did not have enough information to send a user_leaving_request');
}
}
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'presentation_page_changed_message') {
newSlide = payload.page;
if (newSlide != null && newSlide.id != null && meetingId != null) {
displayThisSlide(meetingId, newSlide.id, newSlide);
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'presentation_removed_message') {
presentationId = payload.presentation_id;
if (meetingId != null && presentationId != null) {
removePresentationFromCollection(meetingId, presentationId);
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'get_whiteboard_shapes_reply' && payload.requester_id === 'nodeJSapp') {
// Create a whiteboard clean status or find one for the current meeting
if (Meteor.WhiteboardCleanStatus.findOne({
meetingId: meetingId,
}) == null) {
Meteor.WhiteboardCleanStatus.insert({
meetingId: meetingId,
in_progress: false,
});
}
shapes = payload.shapes;
shapes_length = shapes.length;
for (m = 0; m < shapes_length; m++) {
shape = shapes[m];
whiteboardId = shape.wb_id;
addShapeToCollection(meetingId, whiteboardId, shape);
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'send_whiteboard_shape_message') {
//Meteor stringifies an array of JSONs (...shape.result) in this message
//parsing the String and reassigning the value
if (payload.shape.shape_type === 'poll_result' && typeof payload.shape.shape.result === 'string') {
payload.shape.shape.result = JSON.parse(payload.shape.shape.result);
}
shape = payload.shape;
if (shape != null && shape.wb_id != null) {
whiteboardId = shape.wb_id;
}
addShapeToCollection(meetingId, whiteboardId, shape);
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'presentation_cursor_updated_message') {
cursor = {
x: payload.x_percent,
y: payload.y_percent,
};
// update the location of the cursor on the whiteboard
updateCursorLocation(meetingId, cursor);
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'whiteboard_cleared_message') {
whiteboardId = payload.whiteboard_id;
Meteor.WhiteboardCleanStatus.update({
meetingId: meetingId,
}, {
$set: {
in_progress: true,
},
});
removeAllShapesFromSlide(meetingId, whiteboardId);
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'undo_whiteboard_request') {
whiteboardId = payload.whiteboard_id;
shapeId = payload.shape_id;
removeShapeFromSlide(meetingId, whiteboardId, shapeId);
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'presentation_page_resized_message') {
page = payload.page;
if (page != null && page.id != null && page.height_ratio != null && page.width_ratio != null && page.x_offset != null && page.y_offset != null) {
slideId = page.id;
heightRatio = page.height_ratio;
widthRatio = page.width_ratio;
xOffset = page.x_offset;
yOffset = page.y_offset;
presentationId = slideId.split('/')[0];
/*In the case when we don't resize, but switch a slide, this message
follows a 'presentation_page_changed' and all these properties are already set. */
var currentSlide = Meteor.Slides.findOne(
{ presentationId: presentationId,
'slide.current': true, });
if (currentSlide) {
currentSlide = currentSlide.slide;
}
if (currentSlide != null && (currentSlide.height_ratio != heightRatio || currentSlide.width_ratio != widthRatio
|| currentSlide.x_offset != xOffset || currentSlide.y_offset != yOffset)) {
Meteor.Slides.update({
presentationId: presentationId,
'slide.current': true,
}, {
$set: {
'slide.height_ratio': heightRatio,
'slide.width_ratio': widthRatio,
'slide.x_offset': xOffset,
'slide.y_offset': yOffset,
},
});
}
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'recording_status_changed_message') {
intendedForRecording = payload.recorded;
currentlyBeingRecorded = payload.recording;
Meteor.Meetings.update({
meetingId: meetingId,
intendedForRecording: intendedForRecording,
}, {
$set: {
currentlyBeingRecorded: currentlyBeingRecorded,
},
});
return callback();
// --------------------------------------------------
// lock settings ------------------------------------
// for now not handling this serially #TODO
} else if (eventName === 'eject_voice_user_message') {
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'new_permission_settings') {
meetingObject = Meteor.Meetings.findOne({
meetingId: meetingId,
});
if (meetingObject != null && payload != null) {
oldSettings = meetingObject.roomLockSettings;
newSettings = payload.permissions;
// if the disableMic setting was turned on
if (oldSettings != null && !oldSettings.disableMic && newSettings.disableMic) {
handleLockingMic(meetingId, newSettings);
}
// substitute with the new lock settings
Meteor.Meetings.update({
meetingId: meetingId,
}, {
$set: {
'roomLockSettings.disablePrivateChat': newSettings.disablePrivateChat,
'roomLockSettings.disableCam': newSettings.disableCam,
'roomLockSettings.disableMic': newSettings.disableMic,
'roomLockSettings.lockOnJoin': newSettings.lockOnJoin,
'roomLockSettings.lockedLayout': newSettings.lockedLayout,
'roomLockSettings.disablePublicChat': newSettings.disablePublicChat,
'roomLockSettings.lockOnJoinConfigurable': newSettings.lockOnJoinConfigurable,
},
});
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'poll_started_message') {
if (payload != null && meetingId != null && payload.requester_id != null && payload.poll != null) {
if (Meteor.Meetings.findOne({
meetingId: meetingId,
}) != null) {
users = Meteor.Users.find({
meetingId: meetingId,
}, {
fields: {
'user.userid': 1,
_id: 0,
},
}).fetch();
addPollToCollection(
payload.poll,
payload.requester_id,
users,
meetingId
);
}
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'poll_stopped_message') {
if (meetingId != null && payload != null && payload.poll_id != null) {
poll_id = payload.poll_id;
clearPollCollection(meetingId, poll_id);
}
return callback();
// for now not handling this serially #TODO
} else if (eventName === 'user_voted_poll_message') {
if (payload != null && payload.poll != null && meetingId != null && payload.presenter_id != null) {
pollObj = payload.poll;
requesterId = payload.presenter_id;
updatePollCollection(pollObj, meetingId, requesterId);
return callback();
}
// for now not handling this serially #TODO
} else if (eventName === 'poll_show_result_message') {
if (payload != null && payload.poll != null && payload.poll.id != null && meetingId != null) {
poll_id = payload.poll.id;
clearPollCollection(meetingId, poll_id);
}
return callback();
} else { // keep moving in the queue
if (indexOf.call(notLoggedEventTypes, eventName) < 0) {
Meteor.log.info(`WARNING!!! THE JSON MESSAGE WAS NOT OF TYPE SUPPORTED BY THIS APPLICATION
${eventName}
{JSON.stringify(message)}` );
}
return callback();
}
} else {
return callback();
}
};
*/
});