Skip to content

Commit

Permalink
Add group call
Browse files Browse the repository at this point in the history
  • Loading branch information
borbuevbeksultan committed Nov 25, 2024
1 parent 715d502 commit f1e80c5
Show file tree
Hide file tree
Showing 11 changed files with 1,405 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.openfuture.openmessenger.kurento

import io.openfuture.openmessenger.kurento.groupcall.CallHandler
import io.openfuture.openmessenger.kurento.groupcall.RoomManager
import io.openfuture.openmessenger.kurento.groupcall.UserRegistry
import org.kurento.client.KurentoClient
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
Expand Down Expand Up @@ -33,7 +36,23 @@ class KurentoWebsocketConfigurer(
}

override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
registry.addHandler(groupCallHandler(), "/groupcall").setAllowedOriginPatterns("*")
registry.addHandler(handler(), "/helloworld").setAllowedOriginPatterns("*")
}
@Bean
fun registry(): UserRegistry {
return UserRegistry()
}

@Bean
fun roomManager(): RoomManager {
return RoomManager()
}

@Bean
fun groupCallHandler(): CallHandler {
return CallHandler()
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.openfuture.openmessenger.kurento.groupcall

import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.JsonObject
import org.kurento.client.IceCandidate
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.socket.CloseStatus
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler
import java.io.IOException

class CallHandler : TextWebSocketHandler() {
@Autowired
private val roomManager: RoomManager? = null

@Autowired
private val registry: UserRegistry? = null

@Throws(Exception::class)
public override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
val jsonMessage = gson.fromJson(message.payload, JsonObject::class.java)

val user = registry!!.getBySession(session)

if (user != null) {
log.debug("Incoming message from user '{}': {}", user.name, jsonMessage)
} else {
log.debug("Incoming message from new user: {}", jsonMessage)
}

when (jsonMessage["id"].asString) {
"joinRoom" -> joinRoom(jsonMessage, session)
"receiveVideoFrom" -> {
val senderName = jsonMessage["sender"].asString
val sender = registry.getByName(senderName)
val sdpOffer = jsonMessage["sdpOffer"].asString
user!!.receiveVideoFrom(sender!!, sdpOffer)
}

"leaveRoom" -> leaveRoom(user!!)
"onIceCandidate" -> {
val candidate = jsonMessage["candidate"].asJsonObject

if (user != null) {
val cand = IceCandidate(
candidate["candidate"].asString,
candidate["sdpMid"].asString, candidate["sdpMLineIndex"].asInt
)
user.addCandidate(cand, jsonMessage["name"].asString)
}
}

else -> {}
}
}

@Throws(Exception::class)
override fun afterConnectionClosed(session: WebSocketSession, status: CloseStatus) {
val user = registry!!.removeBySession(session)
roomManager!!.getRoom(user.roomName).leave(user)
}

@Throws(IOException::class)
private fun joinRoom(params: JsonObject, session: WebSocketSession) {
val roomName = params["room"].asString
val name = params["name"].asString
log.info("PARTICIPANT {}: trying to join room {}", name, roomName)

val room = roomManager!!.getRoom(roomName)
val user = room.join(name, session)
registry!!.register(user)
}

@Throws(IOException::class)
private fun leaveRoom(user: UserSession) {
val room = roomManager!!.getRoom(user.roomName)
room.leave(user)
if (room.participants.isEmpty()) {
roomManager.removeRoom(room)
}
}

companion object {
private val log: Logger = LoggerFactory.getLogger(CallHandler::class.java)

private val gson: Gson = GsonBuilder().create()
}
}
152 changes: 152 additions & 0 deletions src/main/kotlin/io/openfuture/openmessenger/kurento/groupcall/Room.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package io.openfuture.openmessenger.kurento.groupcall

import com.google.gson.*
import org.kurento.client.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.web.socket.WebSocketSession
import java.io.Closeable
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import javax.annotation.PreDestroy

class Room(val name: String?, private val pipeline: MediaPipeline) : Closeable {
private val log: Logger = LoggerFactory.getLogger(Room::class.java)

val participants: ConcurrentMap<String?, UserSession> = ConcurrentHashMap()

init {
log.info("ROOM {} has been created", name)
}

@PreDestroy
private fun shutdown() {
this.close()
}

@Throws(IOException::class)
fun join(userName: String, session: WebSocketSession): UserSession {
log.info("ROOM {}: adding participant {}", this.name, userName)
val participant = UserSession(userName, this.name, session, this.pipeline)
joinRoom(participant)
participants[participant.name] = participant
sendParticipantNames(participant)
return participant
}

@Throws(IOException::class)
fun leave(user: UserSession) {
log.debug("PARTICIPANT {}: Leaving room {}", user.name, this.name)
this.removeParticipant(user.name)
user.close()
}

@Throws(IOException::class)
private fun joinRoom(newParticipant: UserSession): Collection<String?> {
val newParticipantMsg = JsonObject()
newParticipantMsg.addProperty("id", "newParticipantArrived")
newParticipantMsg.addProperty("name", newParticipant.name)

val participantsList: MutableList<String?> = ArrayList(participants.values.size)
log.debug(
"ROOM {}: notifying other participants of new participant {}", name,
newParticipant.name
)

for (participant in participants.values) {
try {
participant.sendMessage(newParticipantMsg)
} catch (e: IOException) {
log.debug("ROOM {}: participant {} could not be notified", name, participant.name, e)
}
participantsList.add(participant.name)
}

return participantsList
}

@Throws(IOException::class)
private fun removeParticipant(name: String?) {
participants.remove(name)

log.debug("ROOM {}: notifying all users that {} is leaving the room", this.name, name)

val unnotifiedParticipants: MutableList<String?> = ArrayList()
val participantLeftJson = JsonObject()
participantLeftJson.addProperty("id", "participantLeft")
participantLeftJson.addProperty("name", name)
for (participant in participants.values) {
try {
participant.cancelVideoFrom(name)
participant.sendMessage(participantLeftJson)
} catch (e: IOException) {
unnotifiedParticipants.add(participant.name)
}
}

if (!unnotifiedParticipants.isEmpty()) {
log.debug(
"ROOM {}: The users {} could not be notified that {} left the room", this.name,
unnotifiedParticipants, name
)
}
}

@Throws(IOException::class)
fun sendParticipantNames(user: UserSession) {
val participantsArray = JsonArray()
for (participant in this.getParticipants()) {
if (participant != user) {
val participantName: JsonElement = JsonPrimitive(participant.name)
participantsArray.add(participantName)
}
}

val existingParticipantsMsg = JsonObject()
existingParticipantsMsg.addProperty("id", "existingParticipants")
existingParticipantsMsg.add("data", participantsArray)
log.debug(
"PARTICIPANT {}: sending a list of {} participants", user.name,
participantsArray.size()
)
user.sendMessage(existingParticipantsMsg)
}

fun getParticipants(): Collection<UserSession> {
return participants.values
}

fun getParticipant(name: String?): UserSession? {
return participants[name]
}

override fun close() {
for (user in participants.values) {
try {
user.close()
} catch (e: IOException) {
log.debug(
"ROOM {}: Could not invoke close on participant {}", this.name, user.name,
e
)
}
}

participants.clear()

pipeline.release(object : Continuation<Void?> {
@Throws(Exception::class)
override fun onSuccess(result: Void?) {
log.trace("ROOM {}: Released Pipeline", this@Room.name)
}

@Throws(Exception::class)
override fun onError(cause: Throwable) {
log.warn("PARTICIPANT {}: Could not release Pipeline", this@Room.name)
}
})

log.debug("Room {} closed", this.name)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* (C) Copyright 2014 Kurento (http://kurento.org/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.openfuture.openmessenger.kurento.groupcall

import org.kurento.client.KurentoClient
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

class RoomManager {
private val log: Logger = LoggerFactory.getLogger(RoomManager::class.java)

@Autowired
private val kurento: KurentoClient? = null

private val rooms: ConcurrentMap<String?, Room> = ConcurrentHashMap()

fun getRoom(roomName: String?): Room {
log.debug("Searching for room {}", roomName)
var room = rooms[roomName]

if (room == null) {
log.debug("Room {} not existent. Will create now!", roomName)
room = Room(roomName, kurento!!.createMediaPipeline())
rooms[roomName] = room
}
log.debug("Room {} found!", roomName)
return room
}

fun removeRoom(room: Room) {
rooms.remove(room.name)
room.close()
log.info("Room {} removed and closed", room.name)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.openfuture.openmessenger.kurento.groupcall

import org.springframework.web.socket.WebSocketSession
import java.util.concurrent.ConcurrentHashMap

class UserRegistry {
private val usersByName = ConcurrentHashMap<String?, UserSession>()
private val usersBySessionId = ConcurrentHashMap<String, UserSession>()

fun register(user: UserSession) {
usersByName[user.name] = user
usersBySessionId[user.session.id] = user
}

fun getByName(name: String?): UserSession? {
return usersByName[name]
}

fun getBySession(session: WebSocketSession): UserSession? {
return usersBySessionId[session.id]
}

fun exists(name: String?): Boolean {
return usersByName.keys.contains(name)
}

fun removeBySession(session: WebSocketSession): UserSession {
val user = getBySession(session)!!
usersByName.remove(user.name)
usersBySessionId.remove(session.id)
return user
}
}
Loading

0 comments on commit f1e80c5

Please sign in to comment.