2017-10-12 10:00:28 +08:00
/* global PowerQueue */
import Redis from 'redis' ;
import { Meteor } from 'meteor/meteor' ;
import { EventEmitter2 } from 'eventemitter2' ;
import { check } from 'meteor/check' ;
2020-12-08 04:57:33 +08:00
import fs from 'fs' ;
2017-10-12 10:00:28 +08:00
import Logger from './logger' ;
2020-12-10 23:07:06 +08:00
import Metrics from './metrics' ;
2017-10-12 10:00:28 +08:00
// Fake meetingId used for messages that have no meetingId
// (system messages whose core.header carries no meetingId field).
const NO_MEETING_ID = '_';

// Feature flag from the server settings. When truthy, the queue reports every
// processed event to Metrics and the pub/sub instance starts the metrics dump.
// Falls back to an empty object so a settings file without a `metrics`
// section disables metrics instead of throwing while the module loads.
const { queueMetrics } = Meteor.settings.private.redis.metrics || {};
2020-12-08 04:57:33 +08:00
2019-04-06 06:32:21 +08:00
/**
 * Builds the JSON string that is published to Redis for one event.
 *
 * The result has two top-level sections: `envelope` (event name, routing
 * information and the creation timestamp in milliseconds) and `core`
 * (the message header and body exactly as supplied).
 *
 * @param {string} channel - Target channel. Not used when building the
 *   envelope; kept so every publish helper can pass its arguments through.
 * @param {string} eventName - Name stored in `envelope.name`.
 * @param {Object} header - Stored as `core.header`.
 * @param {Object} body - Stored as `core.body`.
 * @param {Object} [routing] - Stored as `envelope.routing`. When falsy, the
 *   default `{ sender: 'html5-server' }` is used.
 * @returns {string} The serialized envelope.
 */
const makeEnvelope = (channel, eventName, header, body, routing) => {
  const envelope = {
    envelope: {
      name: eventName,
      routing: routing || {
        sender: 'html5-server',
      },
      timestamp: Date.now(),
    },
    core: {
      header,
      body,
    },
  };

  return JSON.stringify(envelope);
};
2019-02-16 03:45:42 +08:00
/**
 * Serial task queue that re-emits parsed Redis messages on an event emitter.
 *
 * Each task is handed to `handleTask`, which emits the event through
 * `emitter.emitAsync` and signals the underlying PowerQueue (via `next`) when
 * the task is finished. Events (or channels) listed in `asyncMessages` release
 * the queue immediately instead of waiting for the handlers to settle.
 */
class MeetingMessageQueue {
  /**
   * @param {Object} eventEmitter - Emitter exposing `emitAsync(name, payload, meetingId)`
   *   that returns a promise.
   * @param {string[]} [asyncMessages=[]] - Channel names or event names that
   *   must not block the queue while their handlers run.
   * @param {boolean} [redisDebugEnabled=false] - Enables verbose debug logging.
   */
  constructor(eventEmitter, asyncMessages = [], redisDebugEnabled = false) {
    this.asyncMessages = asyncMessages;
    this.emitter = eventEmitter;
    this.queue = new PowerQueue();
    this.redisDebugEnabled = redisDebugEnabled;

    // The queue invokes the handler detached from the instance, so bind it.
    this.handleTask = this.handleTask.bind(this);
    this.queue.taskHandler = this.handleTask;
  }

  /**
   * Processes one queued task.
   *
   * @param {Object} data - Task as queued by `add`; must provide `channel` and
   *   `parsedMessage` (`{ envelope, core: { header, body } }`).
   * @param {Function} next - Callback that releases the queue for the next
   *   task. It is invoked exactly once per task.
   */
  handleTask(data, next) {
    const { channel, parsedMessage } = data;
    const { envelope, core } = parsedMessage;
    const { header, body } = core;
    const { meetingId, name: eventName } = header;

    const isAsync = this.asyncMessages.includes(channel)
      || this.asyncMessages.includes(eventName);
    const beginHandleTimestamp = Date.now();

    // Guards against releasing the queue twice (async events call callNext
    // up-front and again once the handlers settle).
    let called = false;

    const callNext = () => {
      if (called) return;

      if (this.redisDebugEnabled) {
        Logger.debug(`Redis: ${eventName} completed ${isAsync ? 'async' : 'sync'}`);
      }
      called = true;

      if (queueMetrics) {
        const queueId = meetingId || NO_MEETING_ID;
        const dataLength = JSON.stringify(data).length;
        Metrics.processEvent(queueId, eventName, dataLength, beginHandleTimestamp);
      }

      const queueLength = this.queue.length();
      if (queueLength > 100) {
        Logger.warn(`Redis: MeetingMessageQueue for meetingId=${meetingId} has queue size=${queueLength}`);
      }

      next();
    };

    const onError = (reason) => {
      // `reason` may be any rejected/thrown value, including null/undefined.
      Logger.error(`${eventName}: ${reason?.stack || reason}`);
      callNext();
    };

    try {
      // Validate inside the try block: a failed check is logged and the queue
      // advances, instead of the exception leaving the handler without
      // `next()` ever being called.
      check(eventName, String);
      check(body, Object);

      if (this.redisDebugEnabled) {
        Logger.debug(`Redis: ${JSON.stringify(data.parsedMessage.core)} emitted`);
      }

      if (isAsync) {
        callNext();
      }

      this.emitter
        .emitAsync(eventName, { envelope, header, body }, meetingId)
        .then(callNext)
        .catch(onError);
    } catch (reason) {
      onError(reason);
    }
  }

  /**
   * Appends a task to the underlying queue.
   *
   * @param {...*} args - Forwarded unchanged to the queue's `add`.
   * @returns {*} Whatever the underlying queue's `add` returns.
   */
  add(...args) {
    return this.queue.add(...args);
  }
}
2017-10-13 03:07:02 +08:00
/**
 * Bridge between Redis pub/sub and the server-side event handlers.
 *
 * Incoming messages are parsed in `handleMessage` and routed to a
 * MeetingMessageQueue: one queue per meeting plus the NO_MEETING_ID queue for
 * system messages. Outgoing messages are wrapped with `makeEnvelope` and
 * published through the `pub` client.
 */
class RedisPubSub {
  /**
   * Publish callback: logs the error when the publish failed.
   *
   * @param {*} err - Error passed by the Redis client, falsy on success.
   */
  static handlePublishError(err) {
    if (err) {
      Logger.error(err);
    }
  }

  /**
   * Creates the publisher/subscriber clients and the system message queue.
   *
   * @param {Object} [config={}] - Initial configuration; the fields read by
   *   this class are `async`, `debug`, `ignored` and `subscribeTo`.
   */
  constructor(config = {}) {
    this.config = config;
    this.redisDebugEnabled = Boolean(this.config.debug);
    this.didSendRequestEvent = false;

    const redisConf = Meteor.settings.private.redis;
    const host = process.env.REDIS_HOST || redisConf.host;

    // 1 also handles running in dev mode
    this.instanceId = parseInt(process.env.INSTANCE_ID, 10) || 1;
    this.role = process.env.BBB_HTML5_ROLE;
    this.customRedisChannel = `to-html5-redis-channel${this.instanceId}`;

    const { password, port } = redisConf;

    if (password) {
      this.pub = Redis.createClient({ host, port, password });
      this.sub = Redis.createClient({ host, port, password });
      this.pub.auth(password);
      this.sub.auth(password);
    } else {
      this.pub = Redis.createClient({ host, port });
      this.sub = Redis.createClient({ host, port });
    }

    if (queueMetrics) {
      Metrics.startDumpFile();
    }

    this.emitter = new EventEmitter2();
    this.meetingsQueues = {};
    // We create this _ meeting queue because we need to be able to handle
    // system messages (no meetingId in core.header).
    // NOTE(review): this queue keeps the `async` list and debug flag of the
    // constructor-time config; a later updateConfig() does not change them.
    this.meetingsQueues[NO_MEETING_ID] = new MeetingMessageQueue(
      this.emitter,
      this.config.async,
      this.redisDebugEnabled,
    );

    this.handleSubscribe = this.handleSubscribe.bind(this);
    this.handleMessage = this.handleMessage.bind(this);
  }

  /**
   * Registers the subscriber listeners and subscribes to the channels that
   * match this instance's role (`frontend`, `backend`, or both when the role
   * is anything else).
   */
  init() {
    this.sub.on('psubscribe', Meteor.bindEnvironment(this.handleSubscribe));
    this.sub.on('pmessage', Meteor.bindEnvironment(this.handleMessage));

    // Work on a copy: `config.subscribeTo` may be the shared settings array and
    // must not grow by one entry every time init() runs.
    const channelsToSubscribe = [
      ...(this.config.subscribeTo || []),
      this.customRedisChannel,
    ];

    switch (this.role) {
      case 'frontend':
        this.sub.psubscribe('from-akka-apps-frontend-redis-channel');
        if (this.redisDebugEnabled) {
          Logger.debug(`Redis: NodeJSPool:${this.instanceId} Role: frontend. Subscribed to 'from-akka-apps-frontend-redis-channel'`);
        }
        break;
      case 'backend':
        channelsToSubscribe.forEach((channel) => {
          this.sub.psubscribe(channel);
          if (this.redisDebugEnabled) {
            Logger.debug(`Redis: NodeJSPool:${this.instanceId} Role: backend. Subscribed to '${channelsToSubscribe}'`);
          }
        });
        break;
      default:
        this.sub.psubscribe('from-akka-apps-frontend-redis-channel');
        channelsToSubscribe.forEach((channel) => {
          this.sub.psubscribe(channel);
          if (this.redisDebugEnabled) {
            Logger.debug(`Redis: NodeJSPool:${this.instanceId} Role:${this.role} (likely only one nodejs running, doing both frontend and backend. Dev env? ). Subscribed to '${channelsToSubscribe}'`);
          }
        });
        break;
    }
  }

  /**
   * Merges `config` over the current configuration and refreshes the debug flag.
   *
   * @param {Object} config - Configuration entries to override.
   */
  updateConfig(config) {
    this.config = Object.assign({}, this.config, config);
    this.redisDebugEnabled = this.config.debug;
  }

  // TODO: Move this out of this class, maybe pass as a callback to init?
  /**
   * `psubscribe` listener. On the first call (and never for the frontend
   * role) publishes GetAllMeetingsReqMsg to populate collections with
   * pre-existing data.
   */
  handleSubscribe() {
    if (this.didSendRequestEvent || this.role === 'frontend') return;

    // populate collections with pre-existing data
    const REDIS_CONFIG = Meteor.settings.private.redis;
    const CHANNEL = REDIS_CONFIG.channels.toAkkaApps;
    const EVENT_NAME = 'GetAllMeetingsReqMsg';

    const body = {
      requesterId: 'nodeJSapp',
      html5InstanceId: this.instanceId,
    };

    this.publishSystemMessage(CHANNEL, EVENT_NAME, body);
    this.didSendRequestEvent = true;
  }

  /**
   * Registers a fresh queue for a meeting.
   *
   * Nothing is registered when the message did not carry a meeting id, so no
   * queue ends up stored under the key "undefined".
   *
   * @param {string} meetingId - Internal meeting id taken from the message body.
   * @param {string} eventName - Event that triggered the creation (for logs).
   * @param {string[]} asyncMessages - Async list handed to the new queue.
   * @param {string} roleLabel - 'frontend' or 'backend', used in the log text.
   */
  createMeetingQueue(meetingId, eventName, asyncMessages, roleLabel) {
    if (meetingId === undefined || meetingId === null) {
      Logger.warn(`Redis: ${eventName} has no meetingProp.intId, ${roleLabel} queue not created`);
      return;
    }

    this.meetingsQueues[meetingId] = new MeetingMessageQueue(
      this.emitter,
      asyncMessages,
      this.redisDebugEnabled,
    );

    if (this.redisDebugEnabled) {
      Logger.warn(`Created ${roleLabel} queue for meeting`, {
        date: new Date().toISOString(),
        eventName,
        meetingIdFromMessageMeetingProp: meetingId,
      });
    }
  }

  /**
   * `pmessage` listener: parses a raw Redis message and adds it to the queue
   * responsible for it.
   *
   * @param {string} pattern - Subscription pattern that matched.
   * @param {string} channel - Channel the message arrived on.
   * @param {string} message - Raw JSON payload.
   */
  handleMessage(pattern, channel, message) {
    let parsedMessage;
    try {
      parsedMessage = JSON.parse(message);
    } catch (error) {
      // A malformed payload must not take the listener down with it.
      Logger.error(`Redis: could not parse message received on ${channel}: ${error.message}`);
      return;
    }

    const header = parsedMessage?.core?.header;
    if (!header) {
      Logger.error(`Redis: message received on ${channel} has no core.header`);
      return;
    }

    const { ignored: ignoredMessages = [], async } = this.config;
    const eventName = header.name;

    if (ignoredMessages.includes(channel) || ignoredMessages.includes(eventName)) {
      // CheckAlivePongSysMsg is dropped without even a debug line.
      if (eventName !== 'CheckAlivePongSysMsg' && this.redisDebugEnabled) {
        Logger.debug(`Redis: ${eventName} skipped`);
      }
      return;
    }

    if (this.redisDebugEnabled) {
      Logger.warn('Received event to handle', { date: new Date().toISOString(), eventName });
    }

    // System messages like Create / Destroy Meeting, etc do not have core.header.meetingId.
    // Process them in MeetingQueue['_'] --- the NO_MEETING queueId
    const meetingIdFromMessageCoreHeader = header.meetingId || NO_MEETING_ID;
    const isSystemMessage = meetingIdFromMessageCoreHeader === NO_MEETING_ID;
    const createsMeetingQueue = eventName === 'MeetingCreatedEvtMsg'
      || eventName === 'SyncGetMeetingInfoRespMsg';

    const { body } = parsedMessage.core;
    const meetingIdFromMessageMeetingProp = body?.props?.meetingProp?.intId;
    const task = {
      pattern,
      channel,
      eventName,
      parsedMessage,
    };

    if (this.role === 'frontend') {
      // receiving this message means we need to look at it. Frontends do not have instanceId.
      if (isSystemMessage && createsMeetingQueue) {
        this.createMeetingQueue(meetingIdFromMessageMeetingProp, eventName, async, 'frontend');
      }

      const frontendQueue = this.meetingsQueues[meetingIdFromMessageCoreHeader];
      if (!frontendQueue) {
        Logger.warn(`Frontend meeting queue had not been initialized ${message}`, { eventName, meetingIdFromMessageCoreHeader });
        this.meetingsQueues[NO_MEETING_ID].add(task);
        return;
      }

      // process the event - whether it's a system message or not, the
      // meetingIdFromMessageCoreHeader value is adjusted
      frontendQueue.add(task);
      return;
    }

    if (!isSystemMessage) {
      // only handle message if we have a queue for the meeting. If we don't
      // have a queue, it means it's for a different instanceId
      const backendQueue = this.meetingsQueues[meetingIdFromMessageCoreHeader];
      if (backendQueue) {
        backendQueue.add(task);
      } else {
        Logger.warn('Backend meeting queue had not been initialized', { eventName, meetingIdFromMessageCoreHeader });
      }
      return;
    }

    // end meeting message does not seem to have systemProps
    const instanceIdFromMessage = body?.props?.systemProps?.html5InstanceId;

    if (this.instanceId === instanceIdFromMessage) {
      // create queue or destroy queue
      if (createsMeetingQueue) {
        this.createMeetingQueue(meetingIdFromMessageMeetingProp, eventName, async, 'backend');
      }
      this.meetingsQueues[NO_MEETING_ID].add(task);
      return;
    }

    if (eventName === 'MeetingEndedEvtMsg' || eventName === 'MeetingDestroyedEvtMsg') {
      // MeetingEndedEvtMsg does not follow the system message pattern for meetingId
      // but we still need to process it on the backend which is processing the rest
      // of the events for this meetingId (it does not contain instanceId either,
      // so we cannot compare that)
      const meetingIdForMeetingEnded = body?.meetingId;
      if (this.meetingsQueues[meetingIdForMeetingEnded]) {
        this.meetingsQueues[NO_MEETING_ID].add(task);
      }
    }
    // every other system message addressed to another instance is ignored
  }

  /**
   * Forgets the queue registered under `id`.
   *
   * @param {string} id - Key of the queue to remove.
   */
  destroyMeetingQueue(id) {
    delete this.meetingsQueues[id];
  }

  /**
   * Registers a listener on the internal emitter.
   *
   * @param {...*} args - Forwarded unchanged to `emitter.on`.
   * @returns {*} Whatever `emitter.on` returns.
   */
  on(...args) {
    return this.emitter.on(...args);
  }

  /**
   * Publishes a message whose header carries the voice conference id.
   *
   * @param {string} channel - Redis channel to publish on.
   * @param {string} eventName - Event name.
   * @param {string} voiceConf - Voice conference id placed in the header.
   * @param {Object} payload - Message body.
   * @returns {*} Result of the client's `publish` call.
   */
  publishVoiceMessage(channel, eventName, voiceConf, payload) {
    const header = {
      name: eventName,
      voiceConf,
    };

    const envelope = makeEnvelope(channel, eventName, header, payload);

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }

  /**
   * Publishes a system message (header carries only the event name).
   *
   * @param {string} channel - Redis channel to publish on.
   * @param {string} eventName - Event name.
   * @param {Object} payload - Message body.
   * @returns {*} Result of the client's `publish` call.
   */
  publishSystemMessage(channel, eventName, payload) {
    const header = {
      name: eventName,
    };

    const envelope = makeEnvelope(channel, eventName, header, payload);

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }

  /**
   * Publishes a message scoped to a meeting.
   *
   * @param {string} channel - Redis channel to publish on.
   * @param {string} eventName - Event name.
   * @param {string} meetingId - Meeting id placed in the header.
   * @param {Object} payload - Message body.
   * @returns {*} Result of the client's `publish` call.
   */
  publishMeetingMessage(channel, eventName, meetingId, payload) {
    const header = {
      name: eventName,
      meetingId,
    };

    const envelope = makeEnvelope(channel, eventName, header, payload);

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }

  /**
   * Publishes a message scoped to a user of a meeting. The envelope routing
   * is `{ meetingId, userId }` instead of the default sender routing.
   *
   * @param {string} channel - Redis channel to publish on.
   * @param {string} eventName - Event name.
   * @param {string} meetingId - Meeting id placed in header and routing.
   * @param {string} userId - User id placed in header and routing.
   * @param {Object} payload - Message body.
   * @returns {*} Result of the client's `publish` call.
   */
  publishUserMessage(channel, eventName, meetingId, userId, payload) {
    const header = {
      name: eventName,
      meetingId,
      userId,
    };

    // The message is still published; the warning only flags suspicious input.
    if (!meetingId || !userId) {
      Logger.warn(`Publishing ${eventName} with potentially missing data userId=${userId} meetingId=${meetingId}`);
    }

    const envelope = makeEnvelope(channel, eventName, header, payload, { meetingId, userId });

    return this.pub.publish(channel, envelope, RedisPubSub.handlePublishError);
  }
}
2017-10-13 03:07:02 +08:00
// Single shared instance for the whole server. It is created with an empty
// config at module load; the real Redis settings are applied at startup.
const RedisPubSubSingleton = new RedisPubSub();

Meteor.startup(() => {
  // Apply the configured settings first, then subscribe to the channels.
  const REDIS_CONFIG = Meteor.settings.private.redis;

  RedisPubSubSingleton.updateConfig(REDIS_CONFIG);
  RedisPubSubSingleton.init();
});

export default RedisPubSubSingleton;