From a85a73469ed04fbbe10e0a6d139774c8289c21a5 Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Tue, 5 Dec 2023 10:33:06 +0100 Subject: [PATCH] Added suspend/resume participant API to AudioBridge (1.x) (#3301) --- conf/janus.jcfg.sample.in | 8 + html/audiobridgetest.html | 8 +- html/audiobridgetest.js | 57 ++- src/ice.c | 22 +- src/ice.h | 6 + src/janus.c | 4 + src/plugins/janus_audiobridge.c | 615 ++++++++++++++++++++++++++------ 7 files changed, 604 insertions(+), 116 deletions(-) diff --git a/conf/janus.jcfg.sample.in b/conf/janus.jcfg.sample.in index eb7c5bed70..02f3d91bc9 100644 --- a/conf/janus.jcfg.sample.in +++ b/conf/janus.jcfg.sample.in @@ -294,6 +294,14 @@ nat: { #ice_lite = true #ice_tcp = true + # By default, Janus implements a grace period when detecting ICE + # failures in PeerConnections, to give time to applications to react + # to that, e.g., by enforcing an ICE restart. If you want an ICE + # failure to result in the PeerConnection being closed right away + # (e.g., with the help of consent freshness) then you can do that + # by uncommenting the following property and set it to true + #hangup_on_failed = true + # By default Janus tries to resolve mDNS (.local) candidates: even # though this is now done asynchronously and shouldn't keep the API # busy, even in case mDNS resolution takes a long time to timeout, diff --git a/html/audiobridgetest.html b/html/audiobridgetest.html index 90abc151e3..d2800e6c70 100644 --- a/html/audiobridgetest.html +++ b/html/audiobridgetest.html @@ -83,8 +83,12 @@

Demo details

Participants - -

+
+ + + +
+
    diff --git a/html/audiobridgetest.js b/html/audiobridgetest.js index 9d9a8d95d5..2d07788bdf 100644 --- a/html/audiobridgetest.js +++ b/html/audiobridgetest.js @@ -24,7 +24,7 @@ if(getQueryStringValue("group") !== "") var myusername = null; var myid = null; var webrtcUp = false; -var audioenabled = false; +var audioenabled = false, audiosuspended = false; $(document).ready(function() { @@ -146,6 +146,7 @@ $(document).ready(function() { let display = escapeXmlTags(list[f]["display"]); let setup = list[f]["setup"]; let muted = list[f]["muted"]; + let suspended = list[f]["suspended"]; let spatial = list[f]["spatial_position"]; Janus.debug(" >> [" + id + "] " + display + " (setup=" + setup + ", muted=" + muted + ")"); if($('#rp' + id).length === 0) { @@ -156,8 +157,9 @@ $(document).ready(function() { $('#list').append('
  • ' + slider + display + - ' ' + - '
  • '); + ' ' + + ' ' + + ' '); if(spatial !== null && spatial !== undefined) { $('#sp' + id).slider({ min: 0, max: 100, step: 1, value: 50, handle: 'triangle', enabled: false }); $('#position').removeClass('hide').show(); @@ -172,6 +174,10 @@ $(document).ready(function() { $('#rp' + id + ' > i.absetup').hide(); else $('#rp' + id + ' > i.absetup').removeClass('hide').show(); + if(suspended === true) + $('#rp' + id + ' > i.absusp').removeClass('hide').show(); + else + $('#rp' + id + ' > i.absusp').hide(); if(spatial !== null && spatial !== undefined) $('#sp' + id).slider('setValue', spatial); } @@ -190,6 +196,7 @@ $(document).ready(function() { let display = escapeXmlTags(list[f]["display"]); let setup = list[f]["setup"]; let muted = list[f]["muted"]; + let suspended = list[f]["suspended"]; let spatial = list[f]["spatial_position"]; Janus.debug(" >> [" + id + "] " + display + " (setup=" + setup + ", muted=" + muted + ")"); if($('#rp' + id).length === 0) { @@ -200,8 +207,9 @@ $(document).ready(function() { $('#list').append('
  • ' + slider + display + - ' ' + - '
  • '); + ' ' + + ' ' + + ' '); if(spatial !== null && spatial !== undefined) { $('#sp' + id).slider({ min: 0, max: 100, step: 1, value: 50, handle: 'triangle', enabled: false }); $('#position').removeClass('hide').show(); @@ -216,6 +224,10 @@ $(document).ready(function() { $('#rp' + id + ' > i.absetup').hide(); else $('#rp' + id + ' > i.absetup').removeClass('hide').show(); + if(suspended === true) + $('#rp' + id + ' > i.absusp').removeClass('hide').show(); + else + $('#rp' + id + ' > i.absusp').hide(); if(spatial !== null && spatial !== undefined) $('#sp' + id).slider('setValue', spatial); } @@ -228,6 +240,10 @@ $(document).ready(function() { }); } else if(event === "event") { if(msg["participants"]) { + if(msg["resumed"]) { + // This is a full recap after a suspend: clear the list of participants + $('#list').empty(); + } let list = msg["participants"]; Janus.debug("Got a list of participants:", list); for(let f in list) { @@ -235,6 +251,7 @@ $(document).ready(function() { let display = escapeXmlTags(list[f]["display"]); let setup = list[f]["setup"]; let muted = list[f]["muted"]; + let suspended = list[f]["suspended"]; let spatial = list[f]["spatial_position"]; Janus.debug(" >> [" + id + "] " + display + " (setup=" + setup + ", muted=" + muted + ")"); if($('#rp' + id).length === 0) { @@ -245,8 +262,9 @@ $(document).ready(function() { $('#list').append('
  • ' + slider + display + - ' ' + - '
  • '); + ' ' + + ' ' + + ' '); if(spatial !== null && spatial !== undefined) { $('#sp' + id).slider({ min: 0, max: 100, step: 1, value: 50, handle: 'triangle', enabled: false }); $('#position').removeClass('hide').show(); @@ -261,9 +279,19 @@ $(document).ready(function() { $('#rp' + id + ' > i.absetup').hide(); else $('#rp' + id + ' > i.absetup').removeClass('hide').show(); + if(suspended === true) + $('#rp' + id + ' > i.absusp').removeClass('hide').show(); + else + $('#rp' + id + ' > i.absusp').hide(); if(spatial !== null && spatial !== undefined) $('#sp' + id).slider('setValue', spatial); } + } else if(msg["suspended"]) { + let id = msg["suspended"]; + $('#rp' + id + ' > i.absusp').removeClass('hide').show(); + } else if(msg["resumed"]) { + let id = msg["resumed"]; + $('#rp' + id + ' > i.absusp').hide(); } else if(msg["error"]) { if(msg["error_code"] === 485) { // This is a "no such room" error: give a more meaningful description @@ -330,6 +358,21 @@ $(document).ready(function() { $('#toggleaudio').html("Unmute").removeClass("btn-danger").addClass("btn-success"); mixertest.send({ message: { request: "configure", muted: !audioenabled }}); }).removeClass('hide').show(); + // Suspend button + audiosuspended = false; + $('#togglesuspend').click( + function() { + audiosuspended = !audiosuspended; + if(!audiosuspended) + $('#togglesuspend').html("Suspend").removeClass("btn-success").addClass("btn-danger"); + else + $('#togglesuspend').html("Resume").removeClass("btn-danger").addClass("btn-success"); + mixertest.send({ message: { + request: (audiosuspended ? "suspend" : "resume"), + room: myroom, + id: myid + }}); + }).removeClass('hide').show(); // Spatial position, if enabled $('#position').click( function() { diff --git a/src/ice.c b/src/ice.c index 7d334efa26..938c09bf47 100644 --- a/src/ice.c +++ b/src/ice.c @@ -175,6 +175,18 @@ gboolean janus_ice_is_keepalive_conncheck_enabled(void) { return janus_ice_keepalive_connchecks; } +/* How to react to ICE failures */ +static gboolean janus_ice_hangup_on_failed = FALSE; +void janus_ice_set_hangup_on_failed_enabled(gboolean enabled) { + janus_ice_hangup_on_failed = enabled; + if(janus_ice_hangup_on_failed) { + JANUS_LOG(LOG_INFO, "Will hangup PeerConnections immediately on ICE failures\n"); + } +} +gboolean janus_ice_is_hangup_on_failed_enabled(void) { + return janus_ice_hangup_on_failed; +} + /* Opaque IDs set by applications are by default only passed to event handlers * for correlation purposes, but not sent back to the user or application in * the related Janus API responses or events, unless configured otherwise */ @@ -2143,7 +2155,15 @@ static void janus_ice_cb_component_state_changed(NiceAgent *agent, guint stream_ if(prev_state == NICE_COMPONENT_STATE_CONNECTED || prev_state == NICE_COMPONENT_STATE_READY) { /* Failed after connected/ready means consent freshness detected something broken: * notify the user via a Janus API event and then fire the 'failed' timer as sual */ - janus_ice_notify_ice_failed(handle); + janus_ice_notify_ice_failed(handle); + /* Check if we need to hangup right away, rather than start the grace period */ + if(janus_ice_hangup_on_failed && pc->icefailed_detected == 0) { + /* We do, hangup the PeerConnection */ + JANUS_LOG(LOG_ERR, "[%"SCNu64"] ICE failed for component %d in stream %d...\n", + handle->handle_id, component_id, stream_id); + janus_ice_webrtc_hangup(handle, "ICE failed"); + return; + } } gboolean trickle_recv = (!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE) || janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ALL_TRICKLES)); gboolean answer_recv = janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_ANSWER); diff --git a/src/ice.h b/src/ice.h index e9d121f2eb..216dcb4d61 100644 --- a/src/ice.h +++ b/src/ice.h @@ -168,6 +168,12 @@ void janus_ice_set_keepalive_conncheck_enabled(gboolean enabled); /*! \brief Method to check whether connectivity checks will be used as keepalives * @returns true if enabled, false (default) otherwise */ gboolean janus_ice_is_keepalive_conncheck_enabled(void); +/*! \brief Method to enable/disable immediate hangups of PeerConnectionss on ICE failures. + * @param[in] enabled Whether the functionality should be enabled or disabled */ +void janus_ice_set_hangup_on_failed_enabled(gboolean enabled); +/*! \brief Method to check whether ICE failures will result in immediate hangups + * @returns true if enabled, false (default) otherwise */ +gboolean janus_ice_is_hangup_on_failed_enabled(void); /*! \brief Method to modify the min NACK value (i.e., the minimum time window of packets per handle to store for retransmissions) * @param[in] mnq The new min NACK value */ void janus_set_min_nack_queue(uint16_t mnq); diff --git a/src/janus.c b/src/janus.c index 11378ddd36..e0132eec25 100644 --- a/src/janus.c +++ b/src/janus.c @@ -368,6 +368,7 @@ static json_t *janus_info(const char *transaction) { #endif json_object_set_new(info, "ice-consent-freshness", janus_ice_is_consent_freshness_enabled() ? json_true() : json_false()); json_object_set_new(info, "ice-keepalive-conncheck", janus_ice_is_keepalive_conncheck_enabled() ? json_true() : json_false()); + json_object_set_new(info, "hangup-on-failed", janus_ice_is_hangup_on_failed_enabled() ? json_true() : json_false()); json_object_set_new(info, "full-trickle", janus_ice_is_full_trickle_enabled() ? json_true() : json_false()); json_object_set_new(info, "mdns-enabled", janus_ice_is_mdns_enabled() ? json_true() : json_false()); json_object_set_new(info, "min-nack-queue", json_integer(janus_get_min_nack_queue())); @@ -5230,6 +5231,9 @@ gint main(int argc, char *argv[]) { item = janus_config_get(config, config_nat, janus_config_type_item, "ice_keepalive_conncheck"); if(item && item->value) janus_ice_set_keepalive_conncheck_enabled(janus_is_true(item->value)); + item = janus_config_get(config, config_nat, janus_config_type_item, "hangup_on_failed"); + if(item && item->value) + janus_ice_set_hangup_on_failed_enabled(janus_is_true(item->value)); if(janus_ice_set_turn_server(turn_server, turn_port, turn_type, turn_user, turn_pwd) < 0) { if(!ignore_unreachable_ice_server) { JANUS_LOG(LOG_FATAL, "Invalid TURN address %s:%u\n", turn_server, turn_port); diff --git a/src/plugins/janus_audiobridge.c b/src/plugins/janus_audiobridge.c index ab060e49b0..45e9fdfe90 100644 --- a/src/plugins/janus_audiobridge.c +++ b/src/plugins/janus_audiobridge.c @@ -375,6 +375,100 @@ room-: { { "audiobridge" : "success", } +\endverbatim + * + * Another option available for administrators is suspending participants: + * in that case, participants are not kicked from the room (they remain + * in), but their contribution is not added to the mix, and they don't + * receive any audio from the mix either. This is a useful option to + * temporarily detach a participant from the room (e.g., because they'll + * be busy somewhere else) while still allowing them to keep the existing + * PeerConnection up and running, so that it can be quickly restored + * when they're back; since they're not part of the mix and don't receive + * any audio, the CPU resources to manage them are reduced as well. By + * default these suspended users participants will still receive events + * related to changes in the room (e.g., participants joining and leaving, + * mutes and unmutes, etc.), but these can be disabled too in case saving + * unnecessary signalling is desired: in that case, a suspended participant + * will only receive a recap of the current status when resumed. + * The \c suspend request must be formatted as follows: + * + * +\verbatim +{ + "request" : "suspend", + "secret" : "", + "room" : , + "id" : , + "pause_events" : + "stop_record" : +} +\endverbatim + * + * A successful request will result in a \c success response: + * +\verbatim +{ + "audiobridge" : "success", +} +\endverbatim + * + * Resuming a suspended participant means bringing them back in the audio + * mix, and allowing them to hear audio through the PeerConnection once more. + * In case events were paused, they'll be resumed and a recap will be sent. + * The \c resume request must be formatted as follows: + * +\verbatim +{ + "request" : "resume", + "secret" : "", + "room" : , + "id" : , + "record": , + "filename": "" +} +\endverbatim + * + * A successful request will result in a \c success response: + * +\verbatim +{ + "audiobridge" : "success", +} +\endverbatim + * + * Both \c suspend and \c resume on a participant will result in a + * notification to the other participants in the room, which means they'll + * all be notified about a participant being suspended or resumed. + * + * To get a list of the available rooms (excluded those configured or + * created as private rooms) you can make use of the \c list request, + * which has to be formatted as follows: + * +\verbatim +{ + "request" : "list" +} +\endverbatim + * + * A successful request will produce a list of rooms in a \c success response: + * +\verbatim +{ + "audiobridge" : "success", + "rooms" : [ // Array of room objects + { // Room #1 + "room" : , + "description" : "", + "pin_required" : , + "sampling_rate" : , + "spatial_audio" : , + "record" : , + "num_participants" : + }, + // Other rooms + ] +} \endverbatim * * To get a list of the available rooms (excluded those configured or @@ -431,6 +525,7 @@ room-: { "display" : "", "setup" : , "muted" : , + "suspended" : , "talking" : , "spatial_position" : , }, @@ -736,6 +831,8 @@ room-: { "display" : "", "token" : "", "muted" : , + "suspended" : , + "pause_events" : "codec" : "", "bitrate" : , "quality" : <0-10, Opus-related complexity to use, the higher the value, the better the quality (but more CPU); optional, default is 4>, @@ -745,7 +842,7 @@ room-: { "secret" : "", "audio_level_average" : "", "audio_active_packets" : "", - "record": , "filename": "" } \endverbatim @@ -829,7 +926,7 @@ room-: { "expected_loss" : "volume" : , "spatial_position" : , - "record": , "filename": "", "group" : "" } @@ -936,6 +1033,8 @@ room-: { "display" : "", "token" : "", "muted" : , + "suspended" : , + "pause_events" : "bitrate" : , "quality" : <0-10, Opus-related complexity to use, higher is higher quality; optional, default is 4>, "expected_loss" : <0-20, a percentage of the expected loss (capped at 20%), only needed in case FEC is used; optional, default is 0 (FEC disabled even when negotiated) or the room default> @@ -1202,6 +1301,8 @@ static struct janus_json_parameter join_parameters[] = { {"token", JSON_STRING, 0}, {"group", JSON_STRING, 0}, {"muted", JANUS_JSON_BOOL, 0}, + {"suspended", JANUS_JSON_BOOL, 0}, + {"pause_events", JANUS_JSON_BOOL, 0}, {"codec", JSON_STRING, 0}, {"bitrate", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"quality", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, @@ -1270,6 +1371,14 @@ static struct janus_json_parameter play_file_parameters[] = { static struct janus_json_parameter checkstop_file_parameters[] = { {"file_id", JSON_STRING, JANUS_JSON_PARAM_REQUIRED} }; +static struct janus_json_parameter suspend_parameters[] = { + {"pause_events", JANUS_JSON_BOOL, 0}, + {"stop_record", JANUS_JSON_BOOL, 0}, +}; +static struct janus_json_parameter resume_parameters[] = { + {"record", JANUS_JSON_BOOL, 0}, + {"filename", JSON_STRING, 0}, +}; /* Static configuration instance */ static janus_config *config = NULL; @@ -1564,6 +1673,7 @@ typedef struct janus_audiobridge_participant { int spatial_position; /* Panning of this participant in the mix */ /* RTP stuff */ JitterBuffer *jitter; /* Jitter buffer of incoming audio packets */ + gint64 jitter_next_check; /* Timestamp to perform next jitter buffer size check */ GList *inbuf; /* Decoded audio from this participant, to feed to the mixer */ GAsyncQueue *outbuf; /* Mixed audio to send to this participant */ janus_mutex qmutex; /* Incoming queue mutex */ @@ -1599,7 +1709,11 @@ typedef struct janus_audiobridge_participant { #endif uint group; /* Forwarding group index, if enabled in the room */ janus_mutex rec_mutex; /* Mutex to protect the recorder from race conditions */ - volatile gint destroyed; /* Whether this room has been destroyed */ + janus_mutex suspend_cond_mutex; + GCond suspend_cond; + volatile gint suspended; /* Whether this participant has been temporarily suspended */ + volatile gint paused_events;/* Whether sending events to this participant has been paused because they're suspended */ + volatile gint destroyed; /* Whether this participant has been destroyed */ janus_refcount ref; /* Reference counter for this participant */ } janus_audiobridge_participant; @@ -1639,6 +1753,35 @@ static void janus_audiobridge_buffer_packet_destroy(janus_audiobridge_buffer_pac g_free(pkt); } +static void janus_audiobridge_participant_clear_jitter_buffer(janus_audiobridge_participant *participant) { + if(participant->jitter) { + jitter_buffer_reset(participant->jitter); + } +} + +static void janus_audiobridge_participant_clear_inbuf(janus_audiobridge_participant *participant) { + while(participant->inbuf) { + GList *first = g_list_first(participant->inbuf); + janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; + participant->inbuf = g_list_delete_link(participant->inbuf, first); + first = NULL; + if(pkt == NULL) + continue; + g_free(pkt->data); + pkt->data = NULL; + g_free(pkt); + pkt = NULL; + } +} + +static void janus_audiobridge_participant_clear_outbuf(janus_audiobridge_participant *participant) { + while(participant->outbuf && g_async_queue_length(participant->outbuf) > 0) { + janus_audiobridge_rtp_relay_packet *pkt = g_async_queue_pop(participant->outbuf); + g_free(pkt->data); + g_free(pkt); + } +} + static void janus_audiobridge_participant_destroy(janus_audiobridge_participant *participant) { if(!participant) return; @@ -1666,20 +1809,9 @@ static void janus_audiobridge_participant_free(const janus_refcount *participant opus_decoder_destroy(participant->decoder); if(participant->jitter) jitter_buffer_destroy(participant->jitter); - while(participant->inbuf) { - GList *first = g_list_first(participant->inbuf); - janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; - participant->inbuf = g_list_delete_link(participant->inbuf, first); - if(pkt) - g_free(pkt->data); - g_free(pkt); - } + janus_audiobridge_participant_clear_inbuf(participant); if(participant->outbuf != NULL) { - while(g_async_queue_length(participant->outbuf) > 0) { - janus_audiobridge_rtp_relay_packet *pkt = g_async_queue_pop(participant->outbuf); - g_free(pkt->data); - g_free(pkt); - } + janus_audiobridge_participant_clear_outbuf(participant); g_async_queue_unref(participant->outbuf); } g_free(participant->mjr_base); @@ -1999,6 +2131,13 @@ static int janus_audiobridge_resample(int16_t *input, int input_num, int input_r #define DEFAULT_COMPLEXITY 4 +/* Jitter Buffer and queue-in settings */ +#define JITTER_BUFFER_MIN_PACKETS 2 +#define JITTER_BUFFER_MAX_PACKETS 40 +#define JITTER_BUFFER_CHECK_USECS 1*G_USEC_PER_SEC +#define QUEUE_IN_MAX_PACKETS 4 + + /* Error codes */ #define JANUS_AUDIOBRIDGE_ERROR_UNKNOWN_ERROR 499 #define JANUS_AUDIOBRIDGE_ERROR_NO_MESSAGE 480 @@ -2767,7 +2906,7 @@ static void janus_audiobridge_notify_participants(janus_audiobridge_participant g_hash_table_iter_init(&iter, participant->room->participants); while(!participant->room->destroyed && g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; - if(p && p->session && (p != participant || notify_source_participant)) { + if(p && p->session && (p != participant || notify_source_participant) && !g_atomic_int_get(&p->paused_events)) { JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??"); int ret = gateway->push_event(p->session->handle, &janus_audiobridge_plugin, NULL, msg, NULL); JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret)); @@ -2848,6 +2987,8 @@ json_t *janus_audiobridge_query_session(janus_plugin_session *handle) { json_object_set_new(rtp, "remote-ssrc", json_integer(participant->plainrtp_media.audio_ssrc_peer)); json_object_set_new(info, "plain-rtp", rtp); } + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(info, "suspended", json_true()); } if(session->plugin_offer) json_object_set_new(info, "plugin-offer", json_true()); @@ -3595,19 +3736,10 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s /* Get rid of queued packets */ janus_mutex_lock(&p->qmutex); g_atomic_int_set(&p->active, 0); - while(p->inbuf) { - GList *first = g_list_first(p->inbuf); - janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; - p->inbuf = g_list_delete_link(p->inbuf, first); - first = NULL; - if(pkt == NULL) - continue; - g_free(pkt->data); - pkt->data = NULL; - g_free(pkt); - pkt = NULL; - } + janus_audiobridge_participant_clear_jitter_buffer(p); + janus_audiobridge_participant_clear_inbuf(p); janus_mutex_unlock(&p->qmutex); + janus_audiobridge_participant_clear_outbuf(p); /* Request a WebRTC hangup */ gateway->close_pc(p->session->handle); } @@ -4054,20 +4186,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s if(participant->muted) { /* Clear the queued packets waiting to be handled */ janus_mutex_lock(&participant->qmutex); - if(participant->jitter) - jitter_buffer_reset(participant->jitter); - while(participant->inbuf) { - GList *first = g_list_first(participant->inbuf); - janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; - participant->inbuf = g_list_delete_link(participant->inbuf, first); - first = NULL; - if(pkt == NULL) - continue; - g_free(pkt->data); - pkt->data = NULL; - g_free(pkt); - pkt = NULL; - } + janus_audiobridge_participant_clear_jitter_buffer(participant); + janus_audiobridge_participant_clear_inbuf(participant); janus_mutex_unlock(&participant->qmutex); } @@ -4081,6 +4201,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false()); if(audiobridge->spatial_audio) json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(pl, "suspended", json_true()); json_array_append_new(list, pl); json_t *pub = json_object(); json_object_set_new(pub, "audiobridge", json_string("event")); @@ -4092,6 +4214,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s g_hash_table_iter_init(&iter, audiobridge->participants); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; + if(g_atomic_int_get(&p->paused_events)) + continue; JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??"); int ret = gateway->push_event(p->session->handle, &janus_audiobridge_plugin, NULL, pub, NULL); JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret)); @@ -4194,6 +4318,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s g_hash_table_iter_init(&iter, audiobridge->participants); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; + if(g_atomic_int_get(&p->paused_events)) + continue; JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??"); int ret = gateway->push_event(p->session->handle, &janus_audiobridge_plugin, NULL, event, NULL); JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret)); @@ -4478,6 +4604,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s json_object_set_new(pl, "talking", p->talking ? json_true() : json_false()); if(audiobridge->spatial_audio) json_object_set_new(pl, "spatial_position", json_integer(p->spatial_position)); + if(g_atomic_int_get(&p->suspended)) + json_object_set_new(pl, "suspended", json_true()); json_array_append_new(list, pl); } janus_refcount_decrease(&audiobridge->ref); @@ -5256,6 +5384,241 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s json_object_set_new(response, "file_id", json_string(file_id)); goto prepare_response; #endif + } else if(!strcasecmp(request_text, "suspend") || !strcasecmp(request_text, "resume")) { + gboolean suspend = !strcasecmp(request_text, "suspend"); + JANUS_VALIDATE_JSON_OBJECT(root, secret_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, room_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, roomstr_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + if(!string_ids) { + JANUS_VALIDATE_JSON_OBJECT(root, id_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + } else { + JANUS_VALIDATE_JSON_OBJECT(root, idstr_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + } + if(error_code != 0) + goto prepare_response; + json_t *room = json_object_get(root, "room"); + json_t *id = json_object_get(root, "id"); + guint64 room_id = 0; + char room_id_num[30], *room_id_str = NULL; + if(!string_ids) { + room_id = json_integer_value(room); + g_snprintf(room_id_num, sizeof(room_id_num), "%"SCNu64, room_id); + room_id_str = room_id_num; + } else { + room_id_str = (char *)json_string_value(room); + } + janus_mutex_lock(&rooms_mutex); + janus_audiobridge_room *audiobridge = g_hash_table_lookup(rooms, + string_ids ? (gpointer)room_id_str : (gpointer)&room_id); + if(audiobridge == NULL) { + janus_mutex_unlock(&rooms_mutex); + error_code = JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_ROOM; + JANUS_LOG(LOG_ERR, "No such room (%s)\n", room_id_str); + g_snprintf(error_cause, 512, "No such room (%s)", room_id_str); + goto prepare_response; + } + janus_refcount_increase(&audiobridge->ref); + janus_mutex_lock(&audiobridge->mutex); + janus_mutex_unlock(&rooms_mutex); + guint64 user_id = 0; + char user_id_num[30], *user_id_str = NULL; + if(!string_ids) { + user_id = json_integer_value(id); + g_snprintf(user_id_num, sizeof(user_id_num), "%"SCNu64, user_id); + user_id_str = user_id_num; + } else { + user_id_str = (char *)json_string_value(id); + } + janus_audiobridge_participant *participant = g_hash_table_lookup(audiobridge->participants, + string_ids ? (gpointer)user_id_str : (gpointer)&user_id); + if(participant == NULL) { + janus_mutex_unlock(&audiobridge->mutex); + janus_refcount_decrease(&audiobridge->ref); + JANUS_LOG(LOG_ERR, "No such user %s in room %s\n", user_id_str, room_id_str); + error_code = JANUS_AUDIOBRIDGE_ERROR_NO_SUCH_USER; + g_snprintf(error_cause, 512, "No such user %s in room %s", user_id_str, room_id_str); + goto prepare_response; + } + janus_refcount_increase(&participant->ref); + janus_mutex_unlock(&audiobridge->mutex); + /* A secret may be required for this action */ + if(session->participant != participant) { + JANUS_CHECK_SECRET(audiobridge->room_secret, root, "secret", error_code, error_cause, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_UNAUTHORIZED); + if(error_code != 0) { + janus_refcount_decrease(&participant->ref); + janus_refcount_decrease(&audiobridge->ref); + goto prepare_response; + } + } + /* Change the suspend status of this participant */ + gboolean notify_participant = FALSE, recap = FALSE; + if(suspend) { + /* Validate the request */ + JANUS_VALIDATE_JSON_OBJECT(root, suspend_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + if(error_code != 0) { + janus_refcount_decrease(&participant->ref); + janus_refcount_decrease(&audiobridge->ref); + goto prepare_response; + } + /* Suspend this participant */ + janus_mutex_lock(&participant->suspend_cond_mutex); + if(g_atomic_int_compare_and_exchange(&participant->suspended, 0, 1)) { + janus_mutex_unlock(&participant->suspend_cond_mutex); + json_t *pauseevs = json_object_get(root, "pause_events"); + if(pauseevs && json_is_true(pauseevs)) + g_atomic_int_set(&participant->paused_events, 1); + notify_participant = TRUE; + /* Participant is now suspended, so clear the queued packets waiting to be handled */ + janus_mutex_lock(&participant->qmutex); + janus_audiobridge_participant_clear_jitter_buffer(participant); + janus_audiobridge_participant_clear_inbuf(participant); + janus_mutex_unlock(&participant->qmutex); + janus_audiobridge_participant_clear_outbuf(participant); + /* Should we close the recording? */ + json_t *stoprec = json_object_get(root, "stop_record"); + if(stoprec && json_is_true(stoprec)) { + /* Stop recording (ignore if not recording) */ + janus_mutex_lock(&participant->rec_mutex); + janus_audiobridge_recorder_close(participant); + participant->mjr_active = FALSE; + janus_mutex_unlock(&participant->rec_mutex); + } + } else { + janus_mutex_unlock(&participant->suspend_cond_mutex); + } + } else { + /* Validate the request */ + JANUS_VALIDATE_JSON_OBJECT(root, resume_parameters, + error_code, error_cause, TRUE, + JANUS_AUDIOBRIDGE_ERROR_MISSING_ELEMENT, JANUS_AUDIOBRIDGE_ERROR_INVALID_ELEMENT); + if(error_code != 0) { + janus_refcount_decrease(&participant->ref); + janus_refcount_decrease(&audiobridge->ref); + goto prepare_response; + } + /* Resume this participant */ + janus_mutex_lock(&participant->suspend_cond_mutex); + if(g_atomic_int_compare_and_exchange(&participant->suspended, 1, 0)) { + g_cond_signal(&participant->suspend_cond); + janus_mutex_unlock(&participant->suspend_cond_mutex); + notify_participant = TRUE; + /* Should we create a new recording? */ + json_t *record = json_object_get(root, "record"); + json_t *recfile = json_object_get(root, "filename"); + janus_mutex_lock(&participant->rec_mutex); + if(record && json_is_true(record)) { + /* Start recording (ignore if recording already) */ + if(participant->arc != NULL) { + JANUS_LOG(LOG_WARN, "Already recording participant's audio (room %s, user %s)\n", + participant->room->room_id_str, participant->user_id_str); + } else { + JANUS_LOG(LOG_INFO, "Starting recording of participant's audio (room %s, user %s)\n", + participant->room->room_id_str, participant->user_id_str); + const char *recording_base = json_string_value(recfile); + if(recording_base) { + g_free(participant->mjr_base); + participant->mjr_base = g_strdup(recording_base); + } + janus_audiobridge_recorder_create(participant); + participant->mjr_active = TRUE; + } + } + janus_mutex_unlock(&participant->rec_mutex); + /* Should we send a full updated state of the room to the participant? */ + if(g_atomic_int_compare_and_exchange(&participant->paused_events, 1, 0)) { + /* Return a list of all available participants for the resumed participant now */ + recap = TRUE; + json_t *list = json_array(); + GHashTableIter iter; + gpointer value; + g_hash_table_iter_init(&iter, audiobridge->participants); + while(g_hash_table_iter_next(&iter, NULL, &value)) { + janus_audiobridge_participant *p = value; + if(p == participant) { + continue; + } + json_t *pl = json_object(); + json_object_set_new(pl, "id", string_ids ? json_string(p->user_id_str) : json_integer(p->user_id)); + if(p->display) + json_object_set_new(pl, "display", json_string(p->display)); + json_object_set_new(pl, "setup", g_atomic_int_get(&p->session->started) ? json_true() : json_false()); + json_object_set_new(pl, "muted", p->muted ? json_true() : json_false()); + if(p->extmap_id > 0) + json_object_set_new(pl, "talking", p->talking ? json_true() : json_false()); + if(audiobridge->spatial_audio) + json_object_set_new(pl, "spatial_position", json_integer(p->spatial_position)); + if(g_atomic_int_get(&p->suspended)) + json_object_set_new(pl, "suspended", json_true()); + json_array_append_new(list, pl); + } + json_t *event = json_object(); + json_object_set_new(event, "audiobridge", json_string("event")); + json_object_set_new(event, "room", string_ids ? json_string(room_id_str) : json_integer(room_id)); + json_object_set_new(event, "resumed", string_ids ? json_string(user_id_str) : json_integer(user_id)); + json_object_set_new(event, "participants", list); + int ret = gateway->push_event(participant->session->handle, &janus_audiobridge_plugin, NULL, event, NULL); + JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret)); + json_decref(event); + } + } else { + janus_mutex_unlock(&participant->suspend_cond_mutex); + } + } + if(notify_participant) { + /* Notify all participants about the change */ + json_t *event = json_object(); + json_object_set_new(event, "audiobridge", json_string("event")); + json_object_set_new(event, "room", string_ids ? json_string(room_id_str) : json_integer(room_id)); + json_object_set_new(event, suspend ? "suspended" : "resumed", + string_ids ? json_string(user_id_str) : json_integer(user_id)); + GHashTableIter iter; + gpointer value; + g_hash_table_iter_init(&iter, audiobridge->participants); + while(g_hash_table_iter_next(&iter, NULL, &value)) { + janus_audiobridge_participant *p = value; + if((p == participant && recap) || (p != participant && g_atomic_int_get(&p->paused_events))) + continue; + JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??"); + int ret = gateway->push_event(p->session->handle, &janus_audiobridge_plugin, NULL, event, NULL); + JANUS_LOG(LOG_VERB, " >> %d (%s)\n", ret, janus_get_api_error(ret)); + } + json_decref(event); + /* Also notify event handlers */ + if(notify_events && gateway->events_is_enabled()) { + json_t *info = json_object(); + json_object_set_new(info, "event", json_string(suspend ? "suspended" : "resumed")); + json_object_set_new(info, "room", string_ids ? json_string(room_id_str) : json_integer(room_id)); + json_object_set_new(info, "id", string_ids ? json_string(user_id_str) : json_integer(user_id)); + gateway->notify_event(&janus_audiobridge_plugin, session ? session->handle : NULL, info); + } + } + /* Prepare response */ + response = json_object(); + json_object_set_new(response, "audiobridge", json_string("success")); + /* Done */ + janus_refcount_decrease(&participant->ref); + janus_refcount_decrease(&audiobridge->ref); + goto prepare_response; } else { /* Not a request we recognize, don't do anything */ return NULL; @@ -5461,6 +5824,8 @@ void janus_audiobridge_setup_media(janus_plugin_session *handle) { json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false()); if(audiobridge->spatial_audio) json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(pl, "suspended", json_true()); json_array_append_new(list, pl); json_t *pub = json_object(); json_object_set_new(pub, "audiobridge", json_string("event")); @@ -5472,7 +5837,7 @@ void janus_audiobridge_setup_media(janus_plugin_session *handle) { g_hash_table_iter_init(&iter, audiobridge->participants); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; - if(p == participant) { + if(p == participant || g_atomic_int_get(&p->paused_events)) { continue; /* Skip the new participant itself */ } JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??"); @@ -5492,7 +5857,7 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r if(!session || g_atomic_int_get(&session->destroyed) || !session->participant) return; janus_audiobridge_participant *participant = (janus_audiobridge_participant *)session->participant; - if(!g_atomic_int_get(&participant->active) || participant->muted || + if(!g_atomic_int_get(&participant->active) || participant->muted || g_atomic_int_get(&participant->suspended) || (participant->codec == JANUS_AUDIOCODEC_OPUS && !participant->decoder) || !participant->room) return; if(participant->room && participant->room->muted && !participant->admin) @@ -5596,6 +5961,21 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r if(participant->jitter) { janus_audiobridge_buffer_packet *pkt = janus_audiobridge_buffer_packet_create(buf, len, silence); janus_mutex_lock(&participant->qmutex); + /* Limit the size of the jitter buffer */ + gint64 now = janus_get_monotonic_time(); + if(participant->jitter_next_check == 0) { + /* Schedule next check */ + participant->jitter_next_check = now + JITTER_BUFFER_CHECK_USECS; + } else if(now >= participant->jitter_next_check) { + spx_int32_t count = 0; + jitter_buffer_ctl(participant->jitter, JITTER_BUFFER_GET_AVALIABLE_COUNT, &count); + if(count > JITTER_BUFFER_MAX_PACKETS) { + JANUS_LOG(LOG_WARN, "Jitter buffer contains too many packets, clearing now (count=%d)\n", count); + janus_audiobridge_participant_clear_jitter_buffer(participant); + } + /* Schedule next check */ + participant->jitter_next_check = now + JITTER_BUFFER_CHECK_USECS; + } JitterBufferPacket jbp = {0}; jbp.data = (char *)pkt; jbp.len = 0; @@ -5757,21 +6137,14 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle g_free(participant->mjr_base); participant->mjr_base = NULL; /* Get rid of queued packets */ - if(participant->jitter) - jitter_buffer_reset(participant->jitter); - while(participant->inbuf) { - GList *first = g_list_first(participant->inbuf); - janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; - participant->inbuf = g_list_delete_link(participant->inbuf, first); - first = NULL; - if(pkt == NULL) - continue; - g_free(pkt->data); - pkt->data = NULL; - g_free(pkt); - pkt = NULL; - } + janus_audiobridge_participant_clear_jitter_buffer(participant); + janus_audiobridge_participant_clear_inbuf(participant); janus_mutex_unlock(&participant->qmutex); + janus_audiobridge_participant_clear_outbuf(participant); + janus_mutex_lock(&participant->suspend_cond_mutex); + if(g_atomic_int_compare_and_exchange(&participant->suspended, 1, 0)) + g_cond_signal(&participant->suspend_cond); + janus_mutex_unlock(&participant->suspend_cond_mutex); if(audiobridge != NULL) { janus_mutex_unlock(&audiobridge->mutex); if(removed) { @@ -6005,6 +6378,7 @@ static void *janus_audiobridge_handler(void *data) { json_t *display = json_object_get(root, "display"); const char *display_text = display ? json_string_value(display) : NULL; json_t *muted = json_object_get(root, "muted"); + json_t *suspended = json_object_get(root, "suspended"); json_t *gain = json_object_get(root, "volume"); json_t *spatial = json_object_get(root, "spatial_position"); json_t *bitrate = json_object_get(root, "bitrate"); @@ -6119,6 +6493,8 @@ static void *janus_audiobridge_handler(void *data) { participant->display = NULL; participant->jitter = jitter_buffer_init(participant->codec == JANUS_AUDIOCODEC_OPUS ? 960 : 160); jitter_buffer_ctl(participant->jitter, JITTER_BUFFER_SET_DESTROY_CALLBACK, &janus_audiobridge_buffer_packet_destroy); + spx_int32_t min_buffer_size = JITTER_BUFFER_MIN_PACKETS; + jitter_buffer_ctl(participant->jitter, JITTER_BUFFER_SET_MARGIN, &min_buffer_size); participant->inbuf = NULL; participant->outbuf = NULL; participant->encoder = NULL; @@ -6143,6 +6519,14 @@ static void *janus_audiobridge_handler(void *data) { participant->admin = admin; participant->display = display_text ? g_strdup(display_text) : NULL; participant->muted = muted ? json_is_true(muted) : FALSE; /* By default, everyone's unmuted when joining */ + if(suspended && json_is_true(suspended)) { + janus_mutex_lock(&participant->suspend_cond_mutex); + g_atomic_int_set(&participant->suspended, 1); + janus_mutex_unlock(&participant->suspend_cond_mutex); + json_t *pauseevs = json_object_get(root, "pause_events"); + if(pauseevs && json_is_true(pauseevs)) + g_atomic_int_set(&participant->paused_events, 1); + } participant->volume_gain = volume; participant->opus_complexity = complexity; participant->opus_bitrate = opus_bitrate; @@ -6387,6 +6771,8 @@ static void *janus_audiobridge_handler(void *data) { json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false()); if(audiobridge->spatial_audio) json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(pl, "suspended", json_true()); json_array_append_new(newuserlist, pl); json_object_set_new(newuser, "participants", newuserlist); GHashTableIter iter; @@ -6394,7 +6780,7 @@ static void *janus_audiobridge_handler(void *data) { g_hash_table_iter_init(&iter, audiobridge->participants); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; - if(p == participant) { + if(p == participant || g_atomic_int_get(&p->paused_events)) { continue; } JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??"); @@ -6407,7 +6793,7 @@ static void *janus_audiobridge_handler(void *data) { g_hash_table_iter_init(&iter, audiobridge->participants); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; - if(p == participant) { + if(p == participant || g_atomic_int_get(&p->paused_events)) { continue; } json_t *pl = json_object(); @@ -6420,6 +6806,8 @@ static void *janus_audiobridge_handler(void *data) { json_object_set_new(pl, "talking", p->talking ? json_true() : json_false()); if(audiobridge->spatial_audio) json_object_set_new(pl, "spatial_position", json_integer(p->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(pl, "suspended", json_true()); json_array_append_new(list, pl); } janus_mutex_unlock(&audiobridge->mutex); @@ -6449,6 +6837,8 @@ static void *janus_audiobridge_handler(void *data) { json_object_set_new(info, "muted", participant->muted ? json_true() : json_false()); if(participant->stereo) json_object_set_new(info, "spatial_position", json_integer(participant->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(info, "suspended", json_true()); gateway->notify_event(&janus_audiobridge_plugin, session->handle, info); } if(user_id_allocated) @@ -6541,21 +6931,8 @@ static void *janus_audiobridge_handler(void *data) { if(participant->muted) { /* Clear the queued packets waiting to be handled */ janus_mutex_lock(&participant->qmutex); - if(participant->jitter) - jitter_buffer_reset(participant->jitter); - while(participant->inbuf) { - GList *first = g_list_first(participant->inbuf); - janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; - participant->inbuf = g_list_delete_link(participant->inbuf, first); - first = NULL; - if(pkt == NULL) - continue; - if(pkt->data) - g_free(pkt->data); - pkt->data = NULL; - g_free(pkt); - pkt = NULL; - } + janus_audiobridge_participant_clear_jitter_buffer(participant); + janus_audiobridge_participant_clear_inbuf(participant); janus_mutex_unlock(&participant->qmutex); } } @@ -6588,6 +6965,8 @@ static void *janus_audiobridge_handler(void *data) { json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false()); if(audiobridge->spatial_audio) json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(pl, "suspended", json_true()); json_array_append_new(list, pl); json_t *pub = json_object(); json_object_set_new(pub, "audiobridge", json_string("event")); @@ -6599,7 +6978,7 @@ static void *janus_audiobridge_handler(void *data) { g_hash_table_iter_init(&iter, audiobridge->participants); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; - if(p == participant) { + if(p == participant || g_atomic_int_get(&p->paused_events)) { continue; /* Skip the new participant itself */ } JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", @@ -6661,6 +7040,8 @@ static void *janus_audiobridge_handler(void *data) { json_object_set_new(info, "quality", json_integer(participant->opus_complexity)); if(participant->stereo) json_object_set_new(info, "spatial_position", json_integer(participant->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(info, "suspended", json_true()); gateway->notify_event(&janus_audiobridge_plugin, session->handle, info); } /* If we need to generate an offer ourselves, do that */ @@ -6792,6 +7173,7 @@ static void *janus_audiobridge_handler(void *data) { json_t *display = json_object_get(root, "display"); const char *display_text = display ? json_string_value(display) : NULL; json_t *muted = json_object_get(root, "muted"); + json_t *suspended = json_object_get(root, "suspended"); json_t *gain = json_object_get(root, "volume"); json_t *spatial = json_object_get(root, "spatial_position"); json_t *bitrate = json_object_get(root, "bitrate"); @@ -6981,7 +7363,7 @@ static void *janus_audiobridge_handler(void *data) { g_hash_table_iter_init(&iter, old_audiobridge->participants); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; - if(p == participant) { + if(p == participant || g_atomic_int_get(&p->paused_events)) { continue; /* Skip the new participant itself */ } JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??"); @@ -7017,6 +7399,14 @@ static void *janus_audiobridge_handler(void *data) { participant->display = display_text ? g_strdup(display_text) : NULL; participant->room = audiobridge; participant->muted = muted ? json_is_true(muted) : FALSE; /* When switching to a new room, you're unmuted by default */ + if(suspended && json_is_true(suspended)) { + janus_mutex_lock(&participant->suspend_cond_mutex); + g_atomic_int_set(&participant->suspended, 1); + janus_mutex_unlock(&participant->suspend_cond_mutex); + json_t *pauseevs = json_object_get(root, "pause_events"); + if(pauseevs && json_is_true(pauseevs)) + g_atomic_int_set(&participant->paused_events, 1); + } participant->audio_active_packets = 0; participant->audio_dBov_sum = 0; participant->talking = FALSE; @@ -7057,12 +7447,14 @@ static void *janus_audiobridge_handler(void *data) { json_object_set_new(pl, "muted", participant->muted ? json_true() : json_false()); if(audiobridge->spatial_audio) json_object_set_new(pl, "spatial_position", json_integer(participant->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(pl, "suspended", json_true()); json_array_append_new(newuserlist, pl); json_object_set_new(newuser, "participants", newuserlist); g_hash_table_iter_init(&iter, audiobridge->participants); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; - if(p == participant) { + if(p == participant || g_atomic_int_get(&p->paused_events)) { continue; } JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??"); @@ -7088,6 +7480,8 @@ static void *janus_audiobridge_handler(void *data) { json_object_set_new(pl, "talking", p->talking ? json_true() : json_false()); if(audiobridge->spatial_audio) json_object_set_new(pl, "spatial_position", json_integer(p->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(pl, "suspended", json_true()); json_array_append_new(list, pl); } event = json_object(); @@ -7108,6 +7502,8 @@ static void *janus_audiobridge_handler(void *data) { json_object_set_new(info, "muted", participant->muted ? json_true() : json_false()); if(participant->stereo) json_object_set_new(info, "spatial_position", json_integer(participant->spatial_position)); + if(g_atomic_int_get(&participant->suspended)) + json_object_set_new(info, "suspended", json_true()); gateway->notify_event(&janus_audiobridge_plugin, session->handle, info); } if(user_id_allocated) @@ -7148,7 +7544,7 @@ static void *janus_audiobridge_handler(void *data) { g_hash_table_iter_init(&iter, audiobridge->participants); while(g_hash_table_iter_next(&iter, NULL, &value)) { janus_audiobridge_participant *p = value; - if(p == participant) { + if(p == participant || g_atomic_int_get(&p->paused_events)) { continue; /* Skip the new participant itself */ } JANUS_LOG(LOG_VERB, "Notifying participant %s (%s)\n", p->user_id_str, p->display ? p->display : "??"); @@ -7164,21 +7560,10 @@ static void *janus_audiobridge_handler(void *data) { /* Get rid of queued packets */ janus_mutex_lock(&participant->qmutex); g_atomic_int_set(&participant->active, 0); - if(participant->jitter) - jitter_buffer_reset(participant->jitter); - while(participant->inbuf) { - GList *first = g_list_first(participant->inbuf); - janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; - participant->inbuf = g_list_delete_link(participant->inbuf, first); - first = NULL; - if(pkt == NULL) - continue; - g_free(pkt->data); - pkt->data = NULL; - g_free(pkt); - pkt = NULL; - } + janus_audiobridge_participant_clear_jitter_buffer(participant); + janus_audiobridge_participant_clear_inbuf(participant); janus_mutex_unlock(&participant->qmutex); + janus_audiobridge_participant_clear_outbuf(participant); /* Stop recording, if we were */ janus_mutex_lock(&participant->rec_mutex); janus_audiobridge_recorder_close(participant); @@ -7730,7 +8115,8 @@ static void *janus_audiobridge_mixer_thread(void *data) { while(ps) { janus_audiobridge_participant *p = (janus_audiobridge_participant *)ps->data; janus_mutex_lock(&p->qmutex); - if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started) || !g_atomic_int_get(&p->active) || p->muted || !p->inbuf) { + if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started) || + !g_atomic_int_get(&p->active) || p->muted || g_atomic_int_get(&p->suspended) || !p->inbuf) { janus_mutex_unlock(&p->qmutex); ps = ps->next; continue; @@ -7974,7 +8360,8 @@ static void *janus_audiobridge_mixer_thread(void *data) { ps = participants_list; while(ps) { janus_audiobridge_participant *p = (janus_audiobridge_participant *)ps->data; - if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started)) { + if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started) || + g_atomic_int_get(&p->suspended)) { janus_refcount_decrease(&p->ref); ps = ps->next; continue; @@ -8229,6 +8616,16 @@ static void *janus_audiobridge_participant_thread(void *data) { /* Start working: check both the incoming queue (to decode and queue) and the outgoing one (to encode and send) */ while(!g_atomic_int_get(&stopping) && g_atomic_int_get(&session->destroyed) == 0) { + janus_mutex_lock(&participant->suspend_cond_mutex); + while(g_atomic_int_get(&participant->suspended)) { + g_cond_wait(&participant->suspend_cond, &participant->suspend_cond_mutex); + before = janus_get_monotonic_time(); + participant->context.seq_reset = TRUE; + first = TRUE; + /* Clear the output queue since it might contain old packets and break RTP sequence */ + janus_audiobridge_participant_clear_outbuf(participant); + } + janus_mutex_unlock(&participant->suspend_cond_mutex); /* Start with packets to decode and queue for the mixer */ now = janus_get_monotonic_time(); janus_mutex_lock(&participant->qmutex); @@ -8243,16 +8640,6 @@ static void *janus_audiobridge_participant_thread(void *data) { bpkt = (janus_audiobridge_buffer_packet *)jbp.data; janus_mutex_unlock(&participant->qmutex); locked = FALSE; - rtp = (janus_rtp_header *)bpkt->buffer; - /* If this is Opus, check if there's a packet gap we should fix with FEC */ - use_fec = FALSE; - if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec) { - if(ntohs(rtp->seq_number) == participant->expected_seq + 1) { - /* Lost a packet here? Use FEC to recover */ - use_fec = TRUE; - } - } - first = FALSE; if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) { /* This means we're cleaning up, so don't try to decode */ janus_audiobridge_buffer_packet_destroy(bpkt); @@ -8268,6 +8655,16 @@ static void *janus_audiobridge_participant_thread(void *data) { janus_audiobridge_buffer_packet_destroy(bpkt); break; } + rtp = (janus_rtp_header *)bpkt->buffer; + /* If this is Opus, check if there's a packet gap we should fix with FEC */ + use_fec = FALSE; + if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec) { + if(ntohs(rtp->seq_number) == (participant->expected_seq + 1)) { + /* Lost a packet here? Use FEC to recover */ + use_fec = TRUE; + } + } + first = FALSE; if(use_fec) { /* There was a gap, try to get decode from redundant info first */ pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); @@ -8339,6 +8736,12 @@ static void *janus_audiobridge_participant_thread(void *data) { /* Queue the decoded packet for the mixer */ janus_mutex_lock(&participant->qmutex); locked = TRUE; + /* Do not let queue-in grow too much */ + guint count = g_list_length(participant->inbuf); + if(count > QUEUE_IN_MAX_PACKETS) { + JANUS_LOG(LOG_WARN, "Participant queue-in contains too many packets, clearing now (count=%u)\n", count); + janus_audiobridge_participant_clear_inbuf(participant); + } participant->inbuf = g_list_append(participant->inbuf, pkt); } }