Skip to content

Commit

Permalink
Merge pull request #11 from mconf/develop
Browse files Browse the repository at this point in the history
chore: update from develop
  • Loading branch information
pedrobmarin authored Feb 4, 2022
2 parents be1d55c + c0228be commit 009f546
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 523 deletions.
2 changes: 1 addition & 1 deletion config/default.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ hooks:
- from-akka-apps-redis-channel
- from-bbb-web-redis-channel
- from-akka-apps-chat-redis-channel
- from-akka-apps-pres-redis-channel
- bigbluebutton:from-bbb-apps:meeting
- bigbluebutton:from-bbb-apps:users
- bigbluebutton:from-bbb-apps:chat
- bigbluebutton:from-rap
# IP where permanent hook will post data (more than 1 URL means more than 1 permanent hook)
permanentURLs: []
Expand Down
46 changes: 31 additions & 15 deletions hook.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ module.exports = class Hook {
this.id = null;
this.callbackURL = null;
this.externalMeetingID = null;
this.eventID = null;
this.queue = [];
this.emitter = null;
this.redisClient = Application.redisClient();
Expand Down Expand Up @@ -82,20 +83,26 @@ module.exports = class Hook {
// Puts a new message in the queue. Will also trigger a processing in the queue so this
// message might be processed instantly.
enqueue(message) {
this.redisClient.llen(config.get("redis.keys.eventsPrefix") + ":" + this.id, (error, reply) => {
const length = reply;
if (length < config.get("hooks.queueSize") && this.queue.length < config.get("hooks.queueSize")) {
Logger.info(`[Hook] ${this.callbackURL} enqueueing message:`, JSON.stringify(message));
// Add message to redis queue
this.redisClient.rpush(config.get("redis.keys.eventsPrefix") + ":" + this.id, JSON.stringify(message), (error,reply) => {
if (error != null) { Logger.error("[Hook] error pushing event to redis queue:", JSON.stringify(message), error); }
});
this.queue.push(JSON.stringify(message));
this._processQueue();
} else {
Logger.warn(`[Hook] ${this.callbackURL} queue size exceed, event:`, JSON.stringify(message));
}
});
// If the event is not in the hook's event list, skip it.
if ((this.eventID != null) && ((message == null) || (message.data == null) || (message.data.id == null) || (!this.eventID.some( ev => ev == message.data.id.toLowerCase() )))) {
Logger.info(`[Hook] ${this.callbackURL} skipping message from queue because not in event list for hook:`, JSON.stringify(message));
}
else {
this.redisClient.llen(config.get("redis.keys.eventsPrefix") + ":" + this.id, (error, reply) => {
const length = reply;
if (length < config.get("hooks.queueSize") && this.queue.length < config.get("hooks.queueSize")) {
Logger.info(`[Hook] ${this.callbackURL} enqueueing message:`, JSON.stringify(message));
// Add message to redis queue
this.redisClient.rpush(config.get("redis.keys.eventsPrefix") + ":" + this.id, JSON.stringify(message), (error,reply) => {
if (error != null) { Logger.error("[Hook] error pushing event to redis queue:", JSON.stringify(message), error); }
});
this.queue.push(JSON.stringify(message));
this._processQueue();
} else {
Logger.warn(`[Hook] ${this.callbackURL} queue size exceed, event:`, JSON.stringify(message));
}
});
}
}

toRedis() {
Expand All @@ -106,6 +113,7 @@ module.exports = class Hook {
"getRaw": this.getRaw
};
if (this.externalMeetingID != null) { r.externalMeetingID = this.externalMeetingID; }
if (this.eventID != null) { r.eventID = this.eventID.join(); }
return r;
}

Expand All @@ -119,6 +127,11 @@ module.exports = class Hook {
} else {
this.externalMeetingID = null;
}
if (redisData.eventID != null) {
this.eventID = redisData.eventID.toLowerCase().split(',');
} else {
this.eventID = null;
}
}

// Gets the first message in the queue and start an emitter to send it. Will only do it
Expand Down Expand Up @@ -155,7 +168,7 @@ module.exports = class Hook {
});
}

static addSubscription(callbackURL, meetingID, getRaw, callback) {
static addSubscription(callbackURL, meetingID, eventID, getRaw, callback) {
let hook = Hook.findByCallbackURLSync(callbackURL);
if (hook != null) {
return (typeof callback === 'function' ? callback(new Error("There is already a subscription for this callback URL"), hook) : undefined);
Expand All @@ -167,6 +180,9 @@ module.exports = class Hook {
hook = new Hook();
hook.callbackURL = callbackURL;
hook.externalMeetingID = meetingID;
if (eventID != null) {
hook.eventID = eventID.toLowerCase().split(',');
}
hook.getRaw = getRaw;
hook.permanent = config.get("hooks.permanentURLs").some( obj => {
return obj.url === callbackURL
Expand Down
78 changes: 61 additions & 17 deletions messageMapping.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ module.exports = class MessageMapping {
"UserDisconnectedFromTransferEvtMsg",
];
this.chatEvents = [
"SendPublicMessageEvtMsg",
"SendPrivateMessageEvtMsg",
"GroupChatMessageBroadcastEvtMsg",
];
this.rapEvents = [
"PublishedRecordingSysMsg",
Expand Down Expand Up @@ -70,6 +69,9 @@ module.exports = class MessageMapping {
"unpublished",
"deleted",
];
this.padEvents = [
"PadContentEvtMsg"
];
}

// Map internal message based on it's type
Expand All @@ -88,6 +90,8 @@ module.exports = class MessageMapping {
this.compUserTemplate(messageObj);
} else if (this.mappedEvent(messageObj,this.compRapEvents)) {
this.compRapTemplate(messageObj);
} else if (this.mappedEvent(messageObj,this.padEvents)) {
this.padTemplate(messageObj);
}
}

Expand Down Expand Up @@ -141,6 +145,15 @@ module.exports = class MessageMapping {
}
};
}
if (messageObj.envelope.name === "SetCurrentPresentationEvtMsg") {
this.mappedObject.data.attributes = {
"meeting":{
"internal-meeting-id": meetingId,
"external-meeting-id": IDMapping.getExternalMeetingID(meetingId),
"presentation-id": messageObj.core.body.presentationId
}
};
}
this.mappedMessage = JSON.stringify(this.mappedObject);
Logger.info("[MessageMapping] Mapped message:", this.mappedMessage);
}
Expand Down Expand Up @@ -212,6 +225,9 @@ module.exports = class MessageMapping {
"ts": Date.now()
}
};
if (msgBody.bot) {
this.mappedObject.data["attributes"]["user"]["role"] = 'BOT';
}
if (this.mappedObject.data["id"] === "user-audio-voice-enabled") {
this.mappedObject.data["attributes"]["user"]["listening-only"] = msgBody.listenOnly;
this.mappedObject.data["attributes"]["user"]["sharing-mic"] = ! msgBody.listenOnly;
Expand Down Expand Up @@ -299,7 +315,10 @@ module.exports = class MessageMapping {

// Map internal to external message for chat information
chatTemplate(messageObj) {
const message = messageObj.core.body.message;
const { body } = messageObj.core;
// Ignore private chats
if (body.chatId !== 'MAIN-PUBLIC-GROUP-CHAT') return;

this.mappedObject.data = {
"type": "event",
"id": this.mapInternalMessage(messageObj),
Expand All @@ -309,25 +328,20 @@ module.exports = class MessageMapping {
"external-meeting-id": IDMapping.getExternalMeetingID(messageObj.envelope.routing.meetingId)
},
"chat-message":{
"message": message.message,
"message": body.msg.message,
"sender":{
"internal-user-id": message.fromUserId,
"external-user-id": message.fromUsername,
"timezone-offset": message.fromTimezoneOffset,
"time": message.fromTime
"internal-user-id": body.msg.sender.id,
"external-user-id": body.msg.sender.name,
"timezone-offset": body.msg.fromTimezoneOffset,
"time": body.msg.timestamp
}
}
},
"chat-id": body.chatId
},
"event":{
"ts": Date.now()
}
};
if (messageObj.envelope.name.indexOf("Private") !== -1) {
this.mappedObject.data.attributes["chat-message"].receiver = {
"internal-user-id": message.toUserId,
"external-user-id": message.toUsername
};
}
this.mappedMessage = JSON.stringify(this.mappedObject);
Logger.info("[MessageMapping] Mapped message:", this.mappedMessage);
}
Expand Down Expand Up @@ -444,6 +458,36 @@ module.exports = class MessageMapping {
}
}

padTemplate(messageObj) {
const {
body,
header,
} = messageObj.core;
this.mappedObject.data = {
"type": "event",
"id": this.mapInternalMessage(messageObj),
"attributes":{
"meeting":{
"internal-meeting-id": header.meetingId,
"external-meeting-id": IDMapping.getExternalMeetingID(header.meetingId)
},
"pad":{
"id": body.padId,
"external-pad-id": body.externalId,
"rev": body.rev,
"start": body.start,
"end": body.end,
"text": body.text
}
},
"event":{
"ts": Date.now()
}
};
this.mappedMessage = JSON.stringify(this.mappedObject);
Logger.info("[MessageMapping] Mapped message:", this.mappedMessage);
}

mapInternalMessage(message) {
let name;
if (message.envelope) {
Expand All @@ -469,8 +513,7 @@ module.exports = class MessageMapping {
case "PresenterAssignedEvtMsg": return "user-presenter-assigned";
case "PresenterUnassignedEvtMsg": return "user-presenter-unassigned";
case "UserEmojiChangedEvtMsg": return "user-emoji-changed";
case "SendPublicMessageEvtMsg": return "chat-public-message-sent";
case "SendPrivateMessageEvtMsg": return "chat-private-message-sent";
case "GroupChatMessageBroadcastEvtMsg": return "chat-group-message-sent";
case "UserConnectedToTransferEvtMsg": return "user-joined";
case "UserDisconnectedFromTransferEvtMsg": return "user-left";
case "archive_started": return "rap-archive-started";
Expand Down Expand Up @@ -503,6 +546,7 @@ module.exports = class MessageMapping {
case "user_shared_webcam_message": return "user-cam-broadcast-start";
case "video_stream_unpublished": return "user-cam-broadcast-end";
case "user_status_changed_message": return this.handleUserStatusChanged(message);
case "PadContentEvtMsg": return "pad-content";
} })();
return mappedMsg;
}
Expand Down
Loading

0 comments on commit 009f546

Please sign in to comment.