diff --git a/conf/janus.jcfg.sample.in b/conf/janus.jcfg.sample.in
index eb7c5bed70..02f3d91bc9 100644
--- a/conf/janus.jcfg.sample.in
+++ b/conf/janus.jcfg.sample.in
@@ -294,6 +294,14 @@ nat: {
#ice_lite = true
#ice_tcp = true
+ # By default, Janus implements a grace period when detecting ICE
+ # failures in PeerConnections, to give time to applications to react
+ # to that, e.g., by enforcing an ICE restart. If you want an ICE
+ # failure to result in the PeerConnection being closed right away
+ # (e.g., with the help of consent freshness) then you can do that
+ # by uncommenting the following property and set it to true
+ #hangup_on_failed = true
+
# By default Janus tries to resolve mDNS (.local) candidates: even
# though this is now done asynchronously and shouldn't keep the API
# busy, even in case mDNS resolution takes a long time to timeout,
diff --git a/html/audiobridgetest.html b/html/audiobridgetest.html
index 90abc151e3..d2800e6c70 100644
--- a/html/audiobridgetest.html
+++ b/html/audiobridgetest.html
@@ -83,8 +83,12 @@
Demo details
Participants
- Mute
- Position
+
+ Position
+ Mute
+ Suspend
+
+
diff --git a/html/audiobridgetest.js b/html/audiobridgetest.js
index 9d9a8d95d5..2d07788bdf 100644
--- a/html/audiobridgetest.js
+++ b/html/audiobridgetest.js
@@ -24,7 +24,7 @@ if(getQueryStringValue("group") !== "")
var myusername = null;
var myid = null;
var webrtcUp = false;
-var audioenabled = false;
+var audioenabled = false, audiosuspended = false;
$(document).ready(function() {
@@ -146,6 +146,7 @@ $(document).ready(function() {
let display = escapeXmlTags(list[f]["display"]);
let setup = list[f]["setup"];
let muted = list[f]["muted"];
+ let suspended = list[f]["suspended"];
let spatial = list[f]["spatial_position"];
Janus.debug(" >> [" + id + "] " + display + " (setup=" + setup + ", muted=" + muted + ")");
if($('#rp' + id).length === 0) {
@@ -156,8 +157,9 @@ $(document).ready(function() {
$('#list').append('' +
slider +
display +
- ' ' +
- ' ');
+ ' ' +
+ ' ' +
+ ' ');
if(spatial !== null && spatial !== undefined) {
$('#sp' + id).slider({ min: 0, max: 100, step: 1, value: 50, handle: 'triangle', enabled: false });
$('#position').removeClass('hide').show();
@@ -172,6 +174,10 @@ $(document).ready(function() {
$('#rp' + id + ' > i.absetup').hide();
else
$('#rp' + id + ' > i.absetup').removeClass('hide').show();
+ if(suspended === true)
+ $('#rp' + id + ' > i.absusp').removeClass('hide').show();
+ else
+ $('#rp' + id + ' > i.absusp').hide();
if(spatial !== null && spatial !== undefined)
$('#sp' + id).slider('setValue', spatial);
}
@@ -190,6 +196,7 @@ $(document).ready(function() {
let display = escapeXmlTags(list[f]["display"]);
let setup = list[f]["setup"];
let muted = list[f]["muted"];
+ let suspended = list[f]["suspended"];
let spatial = list[f]["spatial_position"];
Janus.debug(" >> [" + id + "] " + display + " (setup=" + setup + ", muted=" + muted + ")");
if($('#rp' + id).length === 0) {
@@ -200,8 +207,9 @@ $(document).ready(function() {
$('#list').append('' +
slider +
display +
- ' ' +
- ' ');
+ ' ' +
+ ' ' +
+ ' ');
if(spatial !== null && spatial !== undefined) {
$('#sp' + id).slider({ min: 0, max: 100, step: 1, value: 50, handle: 'triangle', enabled: false });
$('#position').removeClass('hide').show();
@@ -216,6 +224,10 @@ $(document).ready(function() {
$('#rp' + id + ' > i.absetup').hide();
else
$('#rp' + id + ' > i.absetup').removeClass('hide').show();
+ if(suspended === true)
+ $('#rp' + id + ' > i.absusp').removeClass('hide').show();
+ else
+ $('#rp' + id + ' > i.absusp').hide();
if(spatial !== null && spatial !== undefined)
$('#sp' + id).slider('setValue', spatial);
}
@@ -228,6 +240,10 @@ $(document).ready(function() {
});
} else if(event === "event") {
if(msg["participants"]) {
+ if(msg["resumed"]) {
+ // This is a full recap after a suspend: clear the list of participants
+ $('#list').empty();
+ }
let list = msg["participants"];
Janus.debug("Got a list of participants:", list);
for(let f in list) {
@@ -235,6 +251,7 @@ $(document).ready(function() {
let display = escapeXmlTags(list[f]["display"]);
let setup = list[f]["setup"];
let muted = list[f]["muted"];
+ let suspended = list[f]["suspended"];
let spatial = list[f]["spatial_position"];
Janus.debug(" >> [" + id + "] " + display + " (setup=" + setup + ", muted=" + muted + ")");
if($('#rp' + id).length === 0) {
@@ -245,8 +262,9 @@ $(document).ready(function() {
$('#list').append('' +
slider +
display +
- ' ' +
- ' ');
+ ' ' +
+ ' ' +
+ ' ');
if(spatial !== null && spatial !== undefined) {
$('#sp' + id).slider({ min: 0, max: 100, step: 1, value: 50, handle: 'triangle', enabled: false });
$('#position').removeClass('hide').show();
@@ -261,9 +279,19 @@ $(document).ready(function() {
$('#rp' + id + ' > i.absetup').hide();
else
$('#rp' + id + ' > i.absetup').removeClass('hide').show();
+ if(suspended === true)
+ $('#rp' + id + ' > i.absusp').removeClass('hide').show();
+ else
+ $('#rp' + id + ' > i.absusp').hide();
if(spatial !== null && spatial !== undefined)
$('#sp' + id).slider('setValue', spatial);
}
+ } else if(msg["suspended"]) {
+ let id = msg["suspended"];
+ $('#rp' + id + ' > i.absusp').removeClass('hide').show();
+ } else if(msg["resumed"]) {
+ let id = msg["resumed"];
+ $('#rp' + id + ' > i.absusp').hide();
} else if(msg["error"]) {
if(msg["error_code"] === 485) {
// This is a "no such room" error: give a more meaningful description
@@ -330,6 +358,21 @@ $(document).ready(function() {
$('#toggleaudio').html("Unmute").removeClass("btn-danger").addClass("btn-success");
mixertest.send({ message: { request: "configure", muted: !audioenabled }});
}).removeClass('hide').show();
+ // Suspend button
+ audiosuspended = false;
+ $('#togglesuspend').click(
+ function() {
+ audiosuspended = !audiosuspended;
+ if(!audiosuspended)
+ $('#togglesuspend').html("Suspend").removeClass("btn-success").addClass("btn-danger");
+ else
+ $('#togglesuspend').html("Resume").removeClass("btn-danger").addClass("btn-success");
+ mixertest.send({ message: {
+ request: (audiosuspended ? "suspend" : "resume"),
+ room: myroom,
+ id: myid
+ }});
+ }).removeClass('hide').show();
// Spatial position, if enabled
$('#position').click(
function() {
diff --git a/src/ice.c b/src/ice.c
index 7d334efa26..938c09bf47 100644
--- a/src/ice.c
+++ b/src/ice.c
@@ -175,6 +175,18 @@ gboolean janus_ice_is_keepalive_conncheck_enabled(void) {
return janus_ice_keepalive_connchecks;
}
+/* How to react to ICE failures */
+static gboolean janus_ice_hangup_on_failed = FALSE;
+void janus_ice_set_hangup_on_failed_enabled(gboolean enabled) {
+ janus_ice_hangup_on_failed = enabled;
+ if(janus_ice_hangup_on_failed) {
+ JANUS_LOG(LOG_INFO, "Will hangup PeerConnections immediately on ICE failures\n");
+ }
+}
+gboolean janus_ice_is_hangup_on_failed_enabled(void) {
+ return janus_ice_hangup_on_failed;
+}
+
/* Opaque IDs set by applications are by default only passed to event handlers
* for correlation purposes, but not sent back to the user or application in
* the related Janus API responses or events, unless configured otherwise */
@@ -2143,7 +2155,15 @@ static void janus_ice_cb_component_state_changed(NiceAgent *agent, guint stream_
if(prev_state == NICE_COMPONENT_STATE_CONNECTED || prev_state == NICE_COMPONENT_STATE_READY) {
/* Failed after connected/ready means consent freshness detected something broken:
* notify the user via a Janus API event and then fire the 'failed' timer as sual */
- janus_ice_notify_ice_failed(handle);
+ janus_ice_notify_ice_failed(handle);
+ /* Check if we need to hangup right away, rather than start the grace period */
+ if(janus_ice_hangup_on_failed && pc->icefailed_detected == 0) {
+ /* We do, hangup the PeerConnection */
+ JANUS_LOG(LOG_ERR, "[%"SCNu64"] ICE failed for component %d in stream %d...\n",
+ handle->handle_id, component_id, stream_id);
+ janus_ice_webrtc_hangup(handle, "ICE failed");
+ return;
+ }
}
gboolean trickle_recv = (!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE) || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES));
gboolean answer_recv = janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_ANSWER);
diff --git a/src/ice.h b/src/ice.h
index e9d121f2eb..216dcb4d61 100644
--- a/src/ice.h
+++ b/src/ice.h
@@ -168,6 +168,12 @@ void janus_ice_set_keepalive_conncheck_enabled(gboolean enabled);
/*! \brief Method to check whether connectivity checks will be used as keepalives
* @returns true if enabled, false (default) otherwise */
gboolean janus_ice_is_keepalive_conncheck_enabled(void);
+/*! \brief Method to enable/disable immediate hangups of PeerConnectionss on ICE failures.
+ * @param[in] enabled Whether the functionality should be enabled or disabled */
+void janus_ice_set_hangup_on_failed_enabled(gboolean enabled);
+/*! \brief Method to check whether ICE failures will result in immediate hangups
+ * @returns true if enabled, false (default) otherwise */
+gboolean janus_ice_is_hangup_on_failed_enabled(void);
/*! \brief Method to modify the min NACK value (i.e., the minimum time window of packets per handle to store for retransmissions)
* @param[in] mnq The new min NACK value */
void janus_set_min_nack_queue(uint16_t mnq);
diff --git a/src/janus.c b/src/janus.c
index 11378ddd36..e0132eec25 100644
--- a/src/janus.c
+++ b/src/janus.c
@@ -368,6 +368,7 @@ static json_t *janus_info(const char *transaction) {
#endif
json_object_set_new(info, "ice-consent-freshness", janus_ice_is_consent_freshness_enabled() ? json_true() : json_false());
json_object_set_new(info, "ice-keepalive-conncheck", janus_ice_is_keepalive_conncheck_enabled() ? json_true() : json_false());
+ json_object_set_new(info, "hangup-on-failed", janus_ice_is_hangup_on_failed_enabled() ? json_true() : json_false());
json_object_set_new(info, "full-trickle", janus_ice_is_full_trickle_enabled() ? json_true() : json_false());
json_object_set_new(info, "mdns-enabled", janus_ice_is_mdns_enabled() ? json_true() : json_false());
json_object_set_new(info, "min-nack-queue", json_integer(janus_get_min_nack_queue()));
@@ -5230,6 +5231,9 @@ gint main(int argc, char *argv[]) {
item = janus_config_get(config, config_nat, janus_config_type_item, "ice_keepalive_conncheck");
if(item && item->value)
janus_ice_set_keepalive_conncheck_enabled(janus_is_true(item->value));
+ item = janus_config_get(config, config_nat, janus_config_type_item, "hangup_on_failed");
+ if(item && item->value)
+ janus_ice_set_hangup_on_failed_enabled(janus_is_true(item->value));
if(janus_ice_set_turn_server(turn_server, turn_port, turn_type, turn_user, turn_pwd) < 0) {
if(!ignore_unreachable_ice_server) {
JANUS_LOG(LOG_FATAL, "Invalid TURN address %s:%u\n", turn_server, turn_port);
diff --git a/src/plugins/janus_audiobridge.c b/src/plugins/janus_audiobridge.c
index ab060e49b0..45e9fdfe90 100644
--- a/src/plugins/janus_audiobridge.c
+++ b/src/plugins/janus_audiobridge.c
@@ -375,6 +375,100 @@ room-: {
{
"audiobridge" : "success",
}
+\endverbatim
+ *
+ * Another option available for administrators is suspending participants:
+ * in that case, participants are not kicked from the room (they remain
+ * in), but their contribution is not added to the mix, and they don't
+ * receive any audio from the mix either. This is a useful option to
+ * temporarily detach a participant from the room (e.g., because they'll
+ * be busy somewhere else) while still allowing them to keep the existing
+ * PeerConnection up and running, so that it can be quickly restored
+ * when they're back; since they're not part of the mix and don't receive
+ * any audio, the CPU resources to manage them are reduced as well. By
+ * default these suspended users participants will still receive events
+ * related to changes in the room (e.g., participants joining and leaving,
+ * mutes and unmutes, etc.), but these can be disabled too in case saving
+ * unnecessary signalling is desired: in that case, a suspended participant
+ * will only receive a recap of the current status when resumed.
+ * The \c suspend request must be formatted as follows:
+ *
+ *
+\verbatim
+{
+ "request" : "suspend",
+ "secret" : "",
+ "room" : ,
+ "id" : ,
+ "pause_events" :
+ "stop_record" :
+}
+\endverbatim
+ *
+ * A successful request will result in a \c success response:
+ *
+\verbatim
+{
+ "audiobridge" : "success",
+}
+\endverbatim
+ *
+ * Resuming a suspended participant means bringing them back in the audio
+ * mix, and allowing them to hear audio through the PeerConnection once more.
+ * In case events were paused, they'll be resumed and a recap will be sent.
+ * The \c resume request must be formatted as follows:
+ *
+\verbatim
+{
+ "request" : "resume",
+ "secret" : "",
+ "room" : ,
+ "id" : ,
+ "record": ,
+ "filename": ""
+}
+\endverbatim
+ *
+ * A successful request will result in a \c success response:
+ *
+\verbatim
+{
+ "audiobridge" : "success",
+}
+\endverbatim
+ *
+ * Both \c suspend and \c resume on a participant will result in a
+ * notification to the other participants in the room, which means they'll
+ * all be notified about a participant being suspended or resumed.
+ *
+ * To get a list of the available rooms (excluded those configured or
+ * created as private rooms) you can make use of the \c list request,
+ * which has to be formatted as follows:
+ *
+\verbatim
+{
+ "request" : "list"
+}
+\endverbatim
+ *
+ * A successful request will produce a list of rooms in a \c success response:
+ *
+\verbatim
+{
+ "audiobridge" : "success",
+ "rooms" : [ // Array of room objects
+ { // Room #1
+ "room" : ,
+ "description" : "",
+ "pin_required" : ,
+ "sampling_rate" : ,
+ "spatial_audio" : ,
+ "record" : ,
+ "num_participants" :
+ },
+ // Other rooms
+ ]
+}
\endverbatim
*
* To get a list of the available rooms (excluded those configured or
@@ -431,6 +525,7 @@ room-: {
"display" : "",
"setup" : ,
"muted" : ,
+ "suspended" : ,
"talking" : ,
"spatial_position" : ,
},
@@ -736,6 +831,8 @@ room-: {
"display" : "",
"token" : "",
"muted" : ,
+ "suspended" : ,
+ "pause_events" :
"codec" : "",
"bitrate" : ,
"quality" : <0-10, Opus-related complexity to use, the higher the value, the better the quality (but more CPU); optional, default is 4>,
@@ -745,7 +842,7 @@ room-: {
"secret" : "",
"audio_level_average" : "",
"audio_active_packets" : "",
- "record": ,
"filename": ""
}
\endverbatim
@@ -829,7 +926,7 @@ room-: {
"expected_loss" :
"volume" : ,
"spatial_position" : ,
- "record": ,
"filename": "",
"group" : ""
}
@@ -936,6 +1033,8 @@ room-: {
"display" : "",
"token" : "",
"muted" : ,
+ "suspended" : ,
+ "pause_events" :
"bitrate" : ,
"quality" : <0-10, Opus-related complexity to use, higher is higher quality; optional, default is 4>,
"expected_loss" : <0-20, a percentage of the expected loss (capped at 20%), only needed in case FEC is used; optional, default is 0 (FEC disabled even when negotiated) or the room default>
@@ -1202,6 +1301,8 @@ static struct janus_json_parameter join_parameters[] = {
{"token", JSON_STRING, 0},
{"group", JSON_STRING, 0},
{"muted", JANUS_JSON_BOOL, 0},
+ {"suspended", JANUS_JSON_BOOL, 0},
+ {"pause_events", JANUS_JSON_BOOL, 0},
{"codec", JSON_STRING, 0},
{"bitrate", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
{"quality", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE},
@@ -1270,6 +1371,14 @@ static struct janus_json_parameter play_file_parameters[] = {
static struct janus_json_parameter checkstop_file_parameters[] = {
{"file_id", JSON_STRING, JANUS_JSON_PARAM_REQUIRED}
};
+static struct janus_json_parameter suspend_parameters[] = {
+ {"pause_events", JANUS_JSON_BOOL, 0},
+ {"stop_record", JANUS_JSON_BOOL, 0},
+};
+static struct janus_json_parameter resume_parameters[] = {
+ {"record", JANUS_JSON_BOOL, 0},
+ {"filename", JSON_STRING, 0},
+};
/* Static configuration instance */
static janus_config *config = NULL;
@@ -1564,6 +1673,7 @@ typedef struct janus_audiobridge_participant {
int spatial_position; /* Panning of this participant in the mix */
/* RTP stuff */
JitterBuffer *jitter; /* Jitter buffer of incoming audio packets */
+ gint64 jitter_next_check; /* Timestamp to perform next jitter buffer size check */
GList *inbuf; /* Decoded audio from this participant, to feed to the mixer */
GAsyncQueue *outbuf; /* Mixed audio to send to this participant */
janus_mutex qmutex; /* Incoming queue mutex */
@@ -1599,7 +1709,11 @@ typedef struct janus_audiobridge_participant {
#endif
uint group; /* Forwarding group index, if enabled in the room */
janus_mutex rec_mutex; /* Mutex to protect the recorder from race conditions */
- volatile gint destroyed; /* Whether this room has been destroyed */
+ janus_mutex suspend_cond_mutex;
+ GCond suspend_cond;
+ volatile gint suspended; /* Whether this participant has been temporarily suspended */
+ volatile gint paused_events;/* Whether sending events to this participant has been paused because they're suspended */
+ volatile gint destroyed; /* Whether this participant has been destroyed */
janus_refcount ref; /* Reference counter for this participant */
} janus_audiobridge_participant;
@@ -1639,6 +1753,35 @@ static void janus_audiobridge_buffer_packet_destroy(janus_audiobridge_buffer_pac
g_free(pkt);
}
+static void janus_audiobridge_participant_clear_jitter_buffer(janus_audiobridge_participant *participant) {
+ if(participant->jitter) {
+ jitter_buffer_reset(participant->jitter);
+ }
+}
+
+static void janus_audiobridge_participant_clear_inbuf(janus_audiobridge_participant *participant) {
+ while(participant->inbuf) {
+ GList *first = g_list_first(participant->inbuf);
+ janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
+ participant->inbuf = g_list_delete_link(participant->inbuf, first);
+ first = NULL;
+ if(pkt == NULL)
+ continue;
+ g_free(pkt->data);
+ pkt->data = NULL;
+ g_free(pkt);
+ pkt = NULL;
+ }
+}
+
+static void janus_audiobridge_participant_clear_outbuf(janus_audiobridge_participant *participant) {
+ while(participant->outbuf && g_async_queue_length(participant->outbuf) > 0) {
+ janus_audiobridge_rtp_relay_packet *pkt = g_async_queue_pop(participant->outbuf);
+ g_free(pkt->data);
+ g_free(pkt);
+ }
+}
+
static void janus_audiobridge_participant_destroy(janus_audiobridge_participant *participant) {
if(!participant)
return;
@@ -1666,20 +1809,9 @@ static void janus_audiobridge_participant_free(const janus_refcount *participant
opus_decoder_destroy(participant->decoder);
if(participant->jitter)
jitter_buffer_destroy(participant->jitter);
- while(participant->inbuf) {
- GList *first = g_list_first(participant->inbuf);
- janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
- participant->inbuf = g_list_delete_link(participant->inbuf, first);
- if(pkt)
- g_free(pkt->data);
- g_free(pkt);
- }
+ janus_audiobridge_participant_clear_inbuf(participant);
if(participant->outbuf != NULL) {
- while(g_async_queue_length(participant->outbuf) > 0) {
- janus_audiobridge_rtp_relay_packet *pkt = g_async_queue_pop(participant->outbuf);
- g_free(pkt->data);
- g_free(pkt);
- }
+ janus_audiobridge_participant_clear_outbuf(participant);
g_async_queue_unref(participant->outbuf);
}
g_free(participant->mjr_base);
@@ -1999,6 +2131,13 @@ static int janus_audiobridge_resample(int16_t *input, int input_num, int input_r
#define DEFAULT_COMPLEXITY 4
+/* Jitter Buffer and queue-in settings */
+#define JITTER_BUFFER_MIN_PACKETS 2
+#define JITTER_BUFFER_MAX_PACKETS 40
+#define JITTER_BUFFER_CHECK_USECS 1*G_USEC_PER_SEC
+#define QUEUE_IN_MAX_PACKETS 4
+
+
/* Error codes */
#define JANUS_AUDIOBRIDGE_ERROR_UNKNOWN_ERROR 499
#define JANUS_AUDIOBRIDGE_ERROR_NO_MESSAGE 480
@@ -2767,7 +2906,7 @@ static void janus_audiobridge_notify_participants(janus_audiobridge_participant
g_hash_table_iter_init(&iter, participant->room->participants);
while(!participant->room->destroyed && g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
- if(p && p->session && (p != participant || notify_source_participant)) {
+ if(p && p->session && (p != participant || notify_source_participant) && !g_atomic_int_get(&p->paused_events)) {
JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??");
int ret = gateway->push_event(p->session->handle, &janus_audiobridge_plugin, NULL, msg, NULL);
JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret));
@@ -2848,6 +2987,8 @@ json_t *janus_audiobridge_query_session(janus_plugin_session *handle) {
json_object_set_new(rtp, "remote-ssrc", json_integer(participant->plainrtp_media.audio_ssrc_peer));
json_object_set_new(info, "plain-rtp", rtp);
}
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(info, "suspended", json_true());
}
if(session->plugin_offer)
json_object_set_new(info, "plugin-offer", json_true());
@@ -3595,19 +3736,10 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
/* Get rid of queued packets */
janus_mutex_lock(&p->qmutex);
g_atomic_int_set(&p->active, 0);
- while(p->inbuf) {
- GList *first = g_list_first(p->inbuf);
- janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
- p->inbuf = g_list_delete_link(p->inbuf, first);
- first = NULL;
- if(pkt == NULL)
- continue;
- g_free(pkt->data);
- pkt->data = NULL;
- g_free(pkt);
- pkt = NULL;
- }
+ janus_audiobridge_participant_clear_jitter_buffer(p);
+ janus_audiobridge_participant_clear_inbuf(p);
janus_mutex_unlock(&p->qmutex);
+ janus_audiobridge_participant_clear_outbuf(p);
/* Request a WebRTC hangup */
gateway->close_pc(p->session->handle);
}
@@ -4054,20 +4186,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
if(participant->muted) {
/* Clear the queued packets waiting to be handled */
janus_mutex_lock(&participant->qmutex);
- if(participant->jitter)
- jitter_buffer_reset(participant->jitter);
- while(participant->inbuf) {
- GList *first = g_list_first(participant->inbuf);
- janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
- participant->inbuf = g_list_delete_link(participant->inbuf, first);
- first = NULL;
- if(pkt == NULL)
- continue;
- g_free(pkt->data);
- pkt->data = NULL;
- g_free(pkt);
- pkt = NULL;
- }
+ janus_audiobridge_participant_clear_jitter_buffer(participant);
+ janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
}
@@ -4081,6 +4201,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false());
if(audiobridge->spatial_audio)
json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(pl, "suspended", json_true());
json_array_append_new(list, pl);
json_t *pub = json_object();
json_object_set_new(pub, "audiobridge", json_string("event"));
@@ -4092,6 +4214,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
g_hash_table_iter_init(&iter, audiobridge->participants);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
+ if(g_atomic_int_get(&p->paused_events))
+ continue;
JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??");
int ret = gateway->push_event(p->session->handle, &janus_audiobridge_plugin, NULL, pub, NULL);
JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret));
@@ -4194,6 +4318,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
g_hash_table_iter_init(&iter, audiobridge->participants);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
+ if(g_atomic_int_get(&p->paused_events))
+ continue;
JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??");
int ret = gateway->push_event(p->session->handle, &janus_audiobridge_plugin, NULL, event, NULL);
JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret));
@@ -4478,6 +4604,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
json_object_set_new(pl, "talking", p->talking ? json_true() : json_false());
if(audiobridge->spatial_audio)
json_object_set_new(pl, "spatial_position", json_integer(p->spatial_position));
+ if(g_atomic_int_get(&p->suspended))
+ json_object_set_new(pl, "suspended", json_true());
json_array_append_new(list, pl);
}
janus_refcount_decrease(&audiobridge->ref);
@@ -5256,6 +5384,241 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s
json_object_set_new(response, "file_id", json_string(file_id));
goto prepare_response;
#endif
+ } else if(!strcasecmp(request_text, "suspend") || !strcasecmp(request_text, "resume")) {
+ gboolean suspend = !strcasecmp(request_text, "suspend");
+ JANUS_VALIDATE_JSON_OBJECT(root, secret_parameters,
+ error_code, error_cause, TRUE,
+ JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT);
+ if(error_code != 0)
+ goto prepare_response;
+ if(!string_ids) {
+ JANUS_VALIDATE_JSON_OBJECT(root, room_parameters,
+ error_code, error_cause, TRUE,
+ JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT);
+ } else {
+ JANUS_VALIDATE_JSON_OBJECT(root, roomstr_parameters,
+ error_code, error_cause, TRUE,
+ JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT);
+ }
+ if(error_code != 0)
+ goto prepare_response;
+ if(!string_ids) {
+ JANUS_VALIDATE_JSON_OBJECT(root, id_parameters,
+ error_code, error_cause, TRUE,
+ JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT);
+ } else {
+ JANUS_VALIDATE_JSON_OBJECT(root, idstr_parameters,
+ error_code, error_cause, TRUE,
+ JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT);
+ }
+ if(error_code != 0)
+ goto prepare_response;
+ json_t *room = json_object_get(root, "room");
+ json_t *id = json_object_get(root, "id");
+ guint64 room_id = 0;
+ char room_id_num[30], *room_id_str = NULL;
+ if(!string_ids) {
+ room_id = json_integer_value(room);
+ g_snprintf(room_id_num, sizeof(room_id_num), "%"SCNu64, room_id);
+ room_id_str = room_id_num;
+ } else {
+ room_id_str = (char *)json_string_value(room);
+ }
+ janus_mutex_lock(&rooms_mutex);
+ janus_audiobridge_room *audiobridge = g_hash_table_lookup(rooms,
+ string_ids ? (gpointer)room_id_str : (gpointer)&room_id);
+ if(audiobridge == NULL) {
+ janus_mutex_unlock(&rooms_mutex);
+ error_code = JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_ROOM;
+ JANUS_LOG(LOG_ERR, "No such room (%s)\n", room_id_str);
+ g_snprintf(error_cause, 512, "No such room (%s)", room_id_str);
+ goto prepare_response;
+ }
+ janus_refcount_increase(&audiobridge->ref);
+ janus_mutex_lock(&audiobridge->mutex);
+ janus_mutex_unlock(&rooms_mutex);
+ guint64 user_id = 0;
+ char user_id_num[30], *user_id_str = NULL;
+ if(!string_ids) {
+ user_id = json_integer_value(id);
+ g_snprintf(user_id_num, sizeof(user_id_num), "%"SCNu64, user_id);
+ user_id_str = user_id_num;
+ } else {
+ user_id_str = (char *)json_string_value(id);
+ }
+ janus_audiobridge_participant *participant = g_hash_table_lookup(audiobridge->participants,
+ string_ids ? (gpointer)user_id_str : (gpointer)&user_id);
+ if(participant == NULL) {
+ janus_mutex_unlock(&audiobridge->mutex);
+ janus_refcount_decrease(&audiobridge->ref);
+ JANUS_LOG(LOG_ERR, "No such user %s in room %s\n", user_id_str, room_id_str);
+ error_code = JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_USER;
+ g_snprintf(error_cause, 512, "No such user %s in room %s", user_id_str, room_id_str);
+ goto prepare_response;
+ }
+ janus_refcount_increase(&participant->ref);
+ janus_mutex_unlock(&audiobridge->mutex);
+ /* A secret may be required for this action */
+ if(session->participant != participant) {
+ JANUS_CHECK_SECRET(audiobridge->room_secret, root, "secret", error_code, error_cause,
+ JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_UNAUTHORIZED);
+ if(error_code != 0) {
+ janus_refcount_decrease(&participant->ref);
+ janus_refcount_decrease(&audiobridge->ref);
+ goto prepare_response;
+ }
+ }
+ /* Change the suspend status of this participant */
+ gboolean notify_participant = FALSE, recap = FALSE;
+ if(suspend) {
+ /* Validate the request */
+ JANUS_VALIDATE_JSON_OBJECT(root, suspend_parameters,
+ error_code, error_cause, TRUE,
+ JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT);
+ if(error_code != 0) {
+ janus_refcount_decrease(&participant->ref);
+ janus_refcount_decrease(&audiobridge->ref);
+ goto prepare_response;
+ }
+ /* Suspend this participant */
+ janus_mutex_lock(&participant->suspend_cond_mutex);
+ if(g_atomic_int_compare_and_exchange(&participant->suspended, 0, 1)) {
+ janus_mutex_unlock(&participant->suspend_cond_mutex);
+ json_t *pauseevs = json_object_get(root, "pause_events");
+ if(pauseevs && json_is_true(pauseevs))
+ g_atomic_int_set(&participant->paused_events, 1);
+ notify_participant = TRUE;
+ /* Participant is now suspended, so clear the queued packets waiting to be handled */
+ janus_mutex_lock(&participant->qmutex);
+ janus_audiobridge_participant_clear_jitter_buffer(participant);
+ janus_audiobridge_participant_clear_inbuf(participant);
+ janus_mutex_unlock(&participant->qmutex);
+ janus_audiobridge_participant_clear_outbuf(participant);
+ /* Should we close the recording? */
+ json_t *stoprec = json_object_get(root, "stop_record");
+ if(stoprec && json_is_true(stoprec)) {
+ /* Stop recording (ignore if not recording) */
+ janus_mutex_lock(&participant->rec_mutex);
+ janus_audiobridge_recorder_close(participant);
+ participant->mjr_active = FALSE;
+ janus_mutex_unlock(&participant->rec_mutex);
+ }
+ } else {
+ janus_mutex_unlock(&participant->suspend_cond_mutex);
+ }
+ } else {
+ /* Validate the request */
+ JANUS_VALIDATE_JSON_OBJECT(root, resume_parameters,
+ error_code, error_cause, TRUE,
+ JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT);
+ if(error_code != 0) {
+ janus_refcount_decrease(&participant->ref);
+ janus_refcount_decrease(&audiobridge->ref);
+ goto prepare_response;
+ }
+ /* Resume this participant */
+ janus_mutex_lock(&participant->suspend_cond_mutex);
+ if(g_atomic_int_compare_and_exchange(&participant->suspended, 1, 0)) {
+ g_cond_signal(&participant->suspend_cond);
+ janus_mutex_unlock(&participant->suspend_cond_mutex);
+ notify_participant = TRUE;
+ /* Should we create a new recording? */
+ json_t *record = json_object_get(root, "record");
+ json_t *recfile = json_object_get(root, "filename");
+ janus_mutex_lock(&participant->rec_mutex);
+ if(record && json_is_true(record)) {
+ /* Start recording (ignore if recording already) */
+ if(participant->arc != NULL) {
+ JANUS_LOG(LOG_WARN, "Already recording participant's audio (room %s, user %s)\n",
+ participant->room->room_id_str, participant->user_id_str);
+ } else {
+ JANUS_LOG(LOG_INFO, "Starting recording of participant's audio (room %s, user %s)\n",
+ participant->room->room_id_str, participant->user_id_str);
+ const char *recording_base = json_string_value(recfile);
+ if(recording_base) {
+ g_free(participant->mjr_base);
+ participant->mjr_base = g_strdup(recording_base);
+ }
+ janus_audiobridge_recorder_create(participant);
+ participant->mjr_active = TRUE;
+ }
+ }
+ janus_mutex_unlock(&participant->rec_mutex);
+ /* Should we send a full updated state of the room to the participant? */
+ if(g_atomic_int_compare_and_exchange(&participant->paused_events, 1, 0)) {
+ /* Return a list of all available participants for the resumed participant now */
+ recap = TRUE;
+ json_t *list = json_array();
+ GHashTableIter iter;
+ gpointer value;
+ g_hash_table_iter_init(&iter, audiobridge->participants);
+ while(g_hash_table_iter_next(&iter, NULL, &value)) {
+ janus_audiobridge_participant *p = value;
+ if(p == participant) {
+ continue;
+ }
+ json_t *pl = json_object();
+ json_object_set_new(pl, "id", string_ids ? json_string(p->user_id_str) : json_integer(p->user_id));
+ if(p->display)
+ json_object_set_new(pl, "display", json_string(p->display));
+ json_object_set_new(pl, "setup", g_atomic_int_get(&p->session->started) ? json_true() : json_false());
+ json_object_set_new(pl, "muted", p->muted ? json_true() : json_false());
+ if(p->extmap_id > 0)
+ json_object_set_new(pl, "talking", p->talking ? json_true() : json_false());
+ if(audiobridge->spatial_audio)
+ json_object_set_new(pl, "spatial_position", json_integer(p->spatial_position));
+ if(g_atomic_int_get(&p->suspended))
+ json_object_set_new(pl, "suspended", json_true());
+ json_array_append_new(list, pl);
+ }
+ json_t *event = json_object();
+ json_object_set_new(event, "audiobridge", json_string("event"));
+ json_object_set_new(event, "room", string_ids ? json_string(room_id_str) : json_integer(room_id));
+ json_object_set_new(event, "resumed", string_ids ? json_string(user_id_str) : json_integer(user_id));
+ json_object_set_new(event, "participants", list);
+ int ret = gateway->push_event(participant->session->handle, &janus_audiobridge_plugin, NULL, event, NULL);
+ JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret));
+ json_decref(event);
+ }
+ } else {
+ janus_mutex_unlock(&participant->suspend_cond_mutex);
+ }
+ }
+ if(notify_participant) {
+ /* Notify all participants about the change */
+ json_t *event = json_object();
+ json_object_set_new(event, "audiobridge", json_string("event"));
+ json_object_set_new(event, "room", string_ids ? json_string(room_id_str) : json_integer(room_id));
+ json_object_set_new(event, suspend ? "suspended" : "resumed",
+ string_ids ? json_string(user_id_str) : json_integer(user_id));
+ GHashTableIter iter;
+ gpointer value;
+ g_hash_table_iter_init(&iter, audiobridge->participants);
+ while(g_hash_table_iter_next(&iter, NULL, &value)) {
+ janus_audiobridge_participant *p = value;
+ if((p == participant && recap) || (p != participant && g_atomic_int_get(&p->paused_events)))
+ continue;
+ JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??");
+ int ret = gateway->push_event(p->session->handle, &janus_audiobridge_plugin, NULL, event, NULL);
+ JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret));
+ }
+ json_decref(event);
+ /* Also notify event handlers */
+ if(notify_events && gateway->events_is_enabled()) {
+ json_t *info = json_object();
+ json_object_set_new(info, "event", json_string(suspend ? "suspended" : "resumed"));
+ json_object_set_new(info, "room", string_ids ? json_string(room_id_str) : json_integer(room_id));
+ json_object_set_new(info, "id", string_ids ? json_string(user_id_str) : json_integer(user_id));
+ gateway->notify_event(&janus_audiobridge_plugin, session ? session->handle : NULL, info);
+ }
+ }
+ /* Prepare response */
+ response = json_object();
+ json_object_set_new(response, "audiobridge", json_string("success"));
+ /* Done */
+ janus_refcount_decrease(&participant->ref);
+ janus_refcount_decrease(&audiobridge->ref);
+ goto prepare_response;
} else {
/* Not a request we recognize, don't do anything */
return NULL;
@@ -5461,6 +5824,8 @@ void janus_audiobridge_setup_media(janus_plugin_session *handle) {
json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false());
if(audiobridge->spatial_audio)
json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(pl, "suspended", json_true());
json_array_append_new(list, pl);
json_t *pub = json_object();
json_object_set_new(pub, "audiobridge", json_string("event"));
@@ -5472,7 +5837,7 @@ void janus_audiobridge_setup_media(janus_plugin_session *handle) {
g_hash_table_iter_init(&iter, audiobridge->participants);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
- if(p == participant) {
+ if(p == participant || g_atomic_int_get(&p->paused_events)) {
continue; /* Skip the new participant itself */
}
JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??");
@@ -5492,7 +5857,7 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r
if(!session || g_atomic_int_get(&session->destroyed) || !session->participant)
return;
janus_audiobridge_participant *participant = (janus_audiobridge_participant *)session->participant;
- if(!g_atomic_int_get(&participant->active) || participant->muted ||
+ if(!g_atomic_int_get(&participant->active) || participant->muted || g_atomic_int_get(&participant->suspended) ||
(participant->codec == JANUS_AUDIOCODEC_OPUS && !participant->decoder) || !participant->room)
return;
if(participant->room && participant->room->muted && !participant->admin)
@@ -5596,6 +5961,21 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r
if(participant->jitter) {
janus_audiobridge_buffer_packet *pkt = janus_audiobridge_buffer_packet_create(buf, len, silence);
janus_mutex_lock(&participant->qmutex);
+ /* Limit the size of the jitter buffer */
+ gint64 now = janus_get_monotonic_time();
+ if(participant->jitter_next_check == 0) {
+ /* Schedule next check */
+ participant->jitter_next_check = now + JITTER_BUFFER_CHECK_USECS;
+ } else if(now >= participant->jitter_next_check) {
+ spx_int32_t count = 0;
+ jitter_buffer_ctl(participant->jitter, JITTER_BUFFER_GET_AVALIABLE_COUNT, &count);
+ if(count > JITTER_BUFFER_MAX_PACKETS) {
+ JANUS_LOG(LOG_WARN, "Jitter buffer contains too many packets, clearing now (count=%d)\n", count);
+ janus_audiobridge_participant_clear_jitter_buffer(participant);
+ }
+ /* Schedule next check */
+ participant->jitter_next_check = now + JITTER_BUFFER_CHECK_USECS;
+ }
JitterBufferPacket jbp = {0};
jbp.data = (char *)pkt;
jbp.len = 0;
@@ -5757,21 +6137,14 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle
g_free(participant->mjr_base);
participant->mjr_base = NULL;
/* Get rid of queued packets */
- if(participant->jitter)
- jitter_buffer_reset(participant->jitter);
- while(participant->inbuf) {
- GList *first = g_list_first(participant->inbuf);
- janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
- participant->inbuf = g_list_delete_link(participant->inbuf, first);
- first = NULL;
- if(pkt == NULL)
- continue;
- g_free(pkt->data);
- pkt->data = NULL;
- g_free(pkt);
- pkt = NULL;
- }
+ janus_audiobridge_participant_clear_jitter_buffer(participant);
+ janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
+ janus_audiobridge_participant_clear_outbuf(participant);
+ janus_mutex_lock(&participant->suspend_cond_mutex);
+ if(g_atomic_int_compare_and_exchange(&participant->suspended, 1, 0))
+ g_cond_signal(&participant->suspend_cond);
+ janus_mutex_unlock(&participant->suspend_cond_mutex);
if(audiobridge != NULL) {
janus_mutex_unlock(&audiobridge->mutex);
if(removed) {
@@ -6005,6 +6378,7 @@ static void *janus_audiobridge_handler(void *data) {
json_t *display = json_object_get(root, "display");
const char *display_text = display ? json_string_value(display) : NULL;
json_t *muted = json_object_get(root, "muted");
+ json_t *suspended = json_object_get(root, "suspended");
json_t *gain = json_object_get(root, "volume");
json_t *spatial = json_object_get(root, "spatial_position");
json_t *bitrate = json_object_get(root, "bitrate");
@@ -6119,6 +6493,8 @@ static void *janus_audiobridge_handler(void *data) {
participant->display = NULL;
participant->jitter = jitter_buffer_init(participant->codec == JANUS_AUDIOCODEC_OPUS ? 960 : 160);
jitter_buffer_ctl(participant->jitter, JITTER_BUFFER_SET_DESTROY_CALLBACK, &janus_audiobridge_buffer_packet_destroy);
+ spx_int32_t min_buffer_size = JITTER_BUFFER_MIN_PACKETS;
+ jitter_buffer_ctl(participant->jitter, JITTER_BUFFER_SET_MARGIN, &min_buffer_size);
participant->inbuf = NULL;
participant->outbuf = NULL;
participant->encoder = NULL;
@@ -6143,6 +6519,14 @@ static void *janus_audiobridge_handler(void *data) {
participant->admin = admin;
participant->display = display_text ? g_strdup(display_text) : NULL;
participant->muted = muted ? json_is_true(muted) : FALSE; /* By default, everyone's unmuted when joining */
+ if(suspended && json_is_true(suspended)) {
+ janus_mutex_lock(&participant->suspend_cond_mutex);
+ g_atomic_int_set(&participant->suspended, 1);
+ janus_mutex_unlock(&participant->suspend_cond_mutex);
+ json_t *pauseevs = json_object_get(root, "pause_events");
+ if(pauseevs && json_is_true(pauseevs))
+ g_atomic_int_set(&participant->paused_events, 1);
+ }
participant->volume_gain = volume;
participant->opus_complexity = complexity;
participant->opus_bitrate = opus_bitrate;
@@ -6387,6 +6771,8 @@ static void *janus_audiobridge_handler(void *data) {
json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false());
if(audiobridge->spatial_audio)
json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(pl, "suspended", json_true());
json_array_append_new(newuserlist, pl);
json_object_set_new(newuser, "participants", newuserlist);
GHashTableIter iter;
@@ -6394,7 +6780,7 @@ static void *janus_audiobridge_handler(void *data) {
g_hash_table_iter_init(&iter, audiobridge->participants);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
- if(p == participant) {
+ if(p == participant || g_atomic_int_get(&p->paused_events)) {
continue;
}
JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??");
@@ -6407,7 +6793,7 @@ static void *janus_audiobridge_handler(void *data) {
g_hash_table_iter_init(&iter, audiobridge->participants);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
- if(p == participant) {
+ if(p == participant || g_atomic_int_get(&p->paused_events)) {
continue;
}
json_t *pl = json_object();
@@ -6420,6 +6806,8 @@ static void *janus_audiobridge_handler(void *data) {
json_object_set_new(pl, "talking", p->talking ? json_true() : json_false());
if(audiobridge->spatial_audio)
json_object_set_new(pl, "spatial_position", json_integer(p->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(pl, "suspended", json_true());
json_array_append_new(list, pl);
}
janus_mutex_unlock(&audiobridge->mutex);
@@ -6449,6 +6837,8 @@ static void *janus_audiobridge_handler(void *data) {
json_object_set_new(info, "muted", participant->muted ? json_true() : json_false());
if(participant->stereo)
json_object_set_new(info, "spatial_position", json_integer(participant->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(info, "suspended", json_true());
gateway->notify_event(&janus_audiobridge_plugin, session->handle, info);
}
if(user_id_allocated)
@@ -6541,21 +6931,8 @@ static void *janus_audiobridge_handler(void *data) {
if(participant->muted) {
/* Clear the queued packets waiting to be handled */
janus_mutex_lock(&participant->qmutex);
- if(participant->jitter)
- jitter_buffer_reset(participant->jitter);
- while(participant->inbuf) {
- GList *first = g_list_first(participant->inbuf);
- janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
- participant->inbuf = g_list_delete_link(participant->inbuf, first);
- first = NULL;
- if(pkt == NULL)
- continue;
- if(pkt->data)
- g_free(pkt->data);
- pkt->data = NULL;
- g_free(pkt);
- pkt = NULL;
- }
+ janus_audiobridge_participant_clear_jitter_buffer(participant);
+ janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
}
}
@@ -6588,6 +6965,8 @@ static void *janus_audiobridge_handler(void *data) {
json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false());
if(audiobridge->spatial_audio)
json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(pl, "suspended", json_true());
json_array_append_new(list, pl);
json_t *pub = json_object();
json_object_set_new(pub, "audiobridge", json_string("event"));
@@ -6599,7 +6978,7 @@ static void *janus_audiobridge_handler(void *data) {
g_hash_table_iter_init(&iter, audiobridge->participants);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
- if(p == participant) {
+ if(p == participant || g_atomic_int_get(&p->paused_events)) {
continue; /* Skip the new participant itself */
}
JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n",
@@ -6661,6 +7040,8 @@ static void *janus_audiobridge_handler(void *data) {
json_object_set_new(info, "quality", json_integer(participant->opus_complexity));
if(participant->stereo)
json_object_set_new(info, "spatial_position", json_integer(participant->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(info, "suspended", json_true());
gateway->notify_event(&janus_audiobridge_plugin, session->handle, info);
}
/* If we need to generate an offer ourselves, do that */
@@ -6792,6 +7173,7 @@ static void *janus_audiobridge_handler(void *data) {
json_t *display = json_object_get(root, "display");
const char *display_text = display ? json_string_value(display) : NULL;
json_t *muted = json_object_get(root, "muted");
+ json_t *suspended = json_object_get(root, "suspended");
json_t *gain = json_object_get(root, "volume");
json_t *spatial = json_object_get(root, "spatial_position");
json_t *bitrate = json_object_get(root, "bitrate");
@@ -6981,7 +7363,7 @@ static void *janus_audiobridge_handler(void *data) {
g_hash_table_iter_init(&iter, old_audiobridge->participants);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
- if(p == participant) {
+ if(p == participant || g_atomic_int_get(&p->paused_events)) {
continue; /* Skip the new participant itself */
}
JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??");
@@ -7017,6 +7399,14 @@ static void *janus_audiobridge_handler(void *data) {
participant->display = display_text ? g_strdup(display_text) : NULL;
participant->room = audiobridge;
participant->muted = muted ? json_is_true(muted) : FALSE; /* When switching to a new room, you're unmuted by default */
+ if(suspended && json_is_true(suspended)) {
+ janus_mutex_lock(&participant->suspend_cond_mutex);
+ g_atomic_int_set(&participant->suspended, 1);
+ janus_mutex_unlock(&participant->suspend_cond_mutex);
+ json_t *pauseevs = json_object_get(root, "pause_events");
+ if(pauseevs && json_is_true(pauseevs))
+ g_atomic_int_set(&participant->paused_events, 1);
+ }
participant->audio_active_packets = 0;
participant->audio_dBov_sum = 0;
participant->talking = FALSE;
@@ -7057,12 +7447,14 @@ static void *janus_audiobridge_handler(void *data) {
json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false());
if(audiobridge->spatial_audio)
json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(pl, "suspended", json_true());
json_array_append_new(newuserlist, pl);
json_object_set_new(newuser, "participants", newuserlist);
g_hash_table_iter_init(&iter, audiobridge->participants);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
- if(p == participant) {
+ if(p == participant || g_atomic_int_get(&p->paused_events)) {
continue;
}
JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??");
@@ -7088,6 +7480,8 @@ static void *janus_audiobridge_handler(void *data) {
json_object_set_new(pl, "talking", p->talking ? json_true() : json_false());
if(audiobridge->spatial_audio)
json_object_set_new(pl, "spatial_position", json_integer(p->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(pl, "suspended", json_true());
json_array_append_new(list, pl);
}
event = json_object();
@@ -7108,6 +7502,8 @@ static void *janus_audiobridge_handler(void *data) {
json_object_set_new(info, "muted", participant->muted ? json_true() : json_false());
if(participant->stereo)
json_object_set_new(info, "spatial_position", json_integer(participant->spatial_position));
+ if(g_atomic_int_get(&participant->suspended))
+ json_object_set_new(info, "suspended", json_true());
gateway->notify_event(&janus_audiobridge_plugin, session->handle, info);
}
if(user_id_allocated)
@@ -7148,7 +7544,7 @@ static void *janus_audiobridge_handler(void *data) {
g_hash_table_iter_init(&iter, audiobridge->participants);
while(g_hash_table_iter_next(&iter, NULL, &value)) {
janus_audiobridge_participant *p = value;
- if(p == participant) {
+ if(p == participant || g_atomic_int_get(&p->paused_events)) {
continue; /* Skip the new participant itself */
}
JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??");
@@ -7164,21 +7560,10 @@ static void *janus_audiobridge_handler(void *data) {
/* Get rid of queued packets */
janus_mutex_lock(&participant->qmutex);
g_atomic_int_set(&participant->active, 0);
- if(participant->jitter)
- jitter_buffer_reset(participant->jitter);
- while(participant->inbuf) {
- GList *first = g_list_first(participant->inbuf);
- janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data;
- participant->inbuf = g_list_delete_link(participant->inbuf, first);
- first = NULL;
- if(pkt == NULL)
- continue;
- g_free(pkt->data);
- pkt->data = NULL;
- g_free(pkt);
- pkt = NULL;
- }
+ janus_audiobridge_participant_clear_jitter_buffer(participant);
+ janus_audiobridge_participant_clear_inbuf(participant);
janus_mutex_unlock(&participant->qmutex);
+ janus_audiobridge_participant_clear_outbuf(participant);
/* Stop recording, if we were */
janus_mutex_lock(&participant->rec_mutex);
janus_audiobridge_recorder_close(participant);
@@ -7730,7 +8115,8 @@ static void *janus_audiobridge_mixer_thread(void *data) {
while(ps) {
janus_audiobridge_participant *p = (janus_audiobridge_participant *)ps->data;
janus_mutex_lock(&p->qmutex);
- if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started) || !g_atomic_int_get(&p->active) || p->muted || !p->inbuf) {
+ if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started) ||
+ !g_atomic_int_get(&p->active) || p->muted || g_atomic_int_get(&p->suspended) || !p->inbuf) {
janus_mutex_unlock(&p->qmutex);
ps = ps->next;
continue;
@@ -7974,7 +8360,8 @@ static void *janus_audiobridge_mixer_thread(void *data) {
ps = participants_list;
while(ps) {
janus_audiobridge_participant *p = (janus_audiobridge_participant *)ps->data;
- if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started)) {
+ if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started) ||
+ g_atomic_int_get(&p->suspended)) {
janus_refcount_decrease(&p->ref);
ps = ps->next;
continue;
@@ -8229,6 +8616,16 @@ static void *janus_audiobridge_participant_thread(void *data) {
/* Start working: check both the incoming queue (to decode and queue) and the outgoing one (to encode and send) */
while(!g_atomic_int_get(&stopping) && g_atomic_int_get(&session->destroyed) == 0) {
+ janus_mutex_lock(&participant->suspend_cond_mutex);
+ while(g_atomic_int_get(&participant->suspended)) {
+ g_cond_wait(&participant->suspend_cond, &participant->suspend_cond_mutex);
+ before = janus_get_monotonic_time();
+ participant->context.seq_reset = TRUE;
+ first = TRUE;
+ /* Clear the output queue since it might contain old packets and break RTP sequence */
+ janus_audiobridge_participant_clear_outbuf(participant);
+ }
+ janus_mutex_unlock(&participant->suspend_cond_mutex);
/* Start with packets to decode and queue for the mixer */
now = janus_get_monotonic_time();
janus_mutex_lock(&participant->qmutex);
@@ -8243,16 +8640,6 @@ static void *janus_audiobridge_participant_thread(void *data) {
bpkt = (janus_audiobridge_buffer_packet *)jbp.data;
janus_mutex_unlock(&participant->qmutex);
locked = FALSE;
- rtp = (janus_rtp_header *)bpkt->buffer;
- /* If this is Opus, check if there's a packet gap we should fix with FEC */
- use_fec = FALSE;
- if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec) {
- if(ntohs(rtp->seq_number) == participant->expected_seq + 1) {
- /* Lost a packet here? Use FEC to recover */
- use_fec = TRUE;
- }
- }
- first = FALSE;
if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) {
/* This means we're cleaning up, so don't try to decode */
janus_audiobridge_buffer_packet_destroy(bpkt);
@@ -8268,6 +8655,16 @@ static void *janus_audiobridge_participant_thread(void *data) {
janus_audiobridge_buffer_packet_destroy(bpkt);
break;
}
+ rtp = (janus_rtp_header *)bpkt->buffer;
+ /* If this is Opus, check if there's a packet gap we should fix with FEC */
+ use_fec = FALSE;
+ if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec) {
+ if(ntohs(rtp->seq_number) == (participant->expected_seq + 1)) {
+ /* Lost a packet here? Use FEC to recover */
+ use_fec = TRUE;
+ }
+ }
+ first = FALSE;
if(use_fec) {
/* There was a gap, try to get decode from redundant info first */
pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet));
@@ -8339,6 +8736,12 @@ static void *janus_audiobridge_participant_thread(void *data) {
/* Queue the decoded packet for the mixer */
janus_mutex_lock(&participant->qmutex);
locked = TRUE;
+ /* Do not let queue-in grow too much */
+ guint count = g_list_length(participant->inbuf);
+ if(count > QUEUE_IN_MAX_PACKETS) {
+ JANUS_LOG(LOG_WARN, "Participant queue-in contains too many packets, clearing now (count=%u)\n", count);
+ janus_audiobridge_participant_clear_inbuf(participant);
+ }
participant->inbuf = g_list_append(participant->inbuf, pkt);
}
}