From e78232e900b6f302dd2f4f8dd0b26ebf45393a41 Mon Sep 17 00:00:00 2001 From: Lorenzo Miniero Date: Tue, 9 Jan 2024 15:27:47 +0100 Subject: [PATCH] Perform audio-level detection in AudioBridge participant thread (#3312) --- src/plugins/janus_audiobridge.c | 173 +++++++++++++++++--------------- src/plugins/plugin.c | 18 ++++ src/plugins/plugin.h | 9 +- 3 files changed, 117 insertions(+), 83 deletions(-) diff --git a/src/plugins/janus_audiobridge.c b/src/plugins/janus_audiobridge.c index 860e7efc8d..4f4fdb618e 100644 --- a/src/plugins/janus_audiobridge.c +++ b/src/plugins/janus_audiobridge.c @@ -1729,30 +1729,28 @@ typedef struct janus_audiobridge_rtp_relay_packet { /* Buffered audio/video packet */ typedef struct janus_audiobridge_buffer_packet { /* Pointer to the packet data, if RTP */ - char *buffer; - /* Size of the packet */ - int len; - /* Whether the packet contains silence, according to the RTP extension */ - gboolean silence; + janus_plugin_rtp *rtp; /* Monotonic insert time */ int64_t inserted; } janus_audiobridge_buffer_packet; -static janus_audiobridge_buffer_packet *janus_audiobridge_buffer_packet_create(char *buffer, int len, gboolean silence) { +static janus_audiobridge_buffer_packet *janus_audiobridge_buffer_packet_create(janus_plugin_rtp *rtp) { janus_audiobridge_buffer_packet *pkt = g_malloc(sizeof(janus_audiobridge_buffer_packet)); - pkt->buffer = g_malloc(len); - pkt->len = len; - pkt->silence = silence; - memcpy(pkt->buffer, buffer, len); + pkt->rtp = janus_plugin_rtp_duplicate(rtp); pkt->inserted = janus_get_monotonic_time(); return pkt; } static void janus_audiobridge_buffer_packet_destroy(janus_audiobridge_buffer_packet *pkt) { if(!pkt) return; - g_free(pkt->buffer); + if(pkt->rtp) + g_free(pkt->rtp->buffer); + g_free(pkt->rtp); g_free(pkt); } +static void janus_audiobridge_participant_istalking(janus_audiobridge_session *session, + janus_audiobridge_participant *participant, janus_plugin_rtp *packet, gboolean *silence); + static void janus_audiobridge_participant_clear_jitter_buffer(janus_audiobridge_participant *participant) { if(participant->jitter) { jitter_buffer_reset(participant->jitter); @@ -5891,74 +5889,9 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r rtp->type, participant->codec == JANUS_AUDIOCODEC_PCMA ? 8 : 0); return; } - /* Check the audio levels, in case we need to notify participants about who's talking */ - gboolean silence = FALSE; - if(participant->extmap_id > 0) { - /* Check the audio levels, in case we need to notify participants about who's talking */ - int level = packet->extensions.audio_level; - if(level != -1) { - /* Is this silence? */ - silence = (level == 127); - if(participant->room && participant->room->audiolevel_event) { - /* We also need to detect who's talking: update our monitoring stuff */ - int audio_active_packets = participant->room ? participant->room->audio_active_packets : 100; - int audio_level_average = participant->room ? participant->room->audio_level_average : 25; - /* Check if we need to override those with user specific properties */ - if(participant->user_audio_active_packets > 0) - audio_active_packets = participant->user_audio_active_packets; - if(participant->user_audio_level_average > 0) - audio_level_average = participant->user_audio_level_average; - participant->audio_dBov_sum += level; - participant->audio_active_packets++; - participant->dBov_level = level; - if(participant->audio_active_packets > 0 && participant->audio_active_packets == audio_active_packets) { - gboolean notify_talk_event = FALSE; - if((float) participant->audio_dBov_sum / (float) participant->audio_active_packets < audio_level_average) { - /* Participant talking, should we notify all participants? */ - if(!participant->talking) - notify_talk_event = TRUE; - participant->talking = TRUE; - } else { - /* Participant not talking anymore, should we notify all participants? */ - if(participant->talking) - notify_talk_event = TRUE; - participant->talking = FALSE; - } - participant->audio_active_packets = 0; - participant->audio_dBov_sum = 0; - /* Only notify in case of state changes */ - if(participant->room && notify_talk_event) { - janus_mutex_lock(&participant->room->mutex); - json_t *event = json_object(); - json_object_set_new(event, "audiobridge", json_string(participant->talking ? "talking" : "stopped-talking")); - json_object_set_new(event, "room", - string_ids ? json_string(participant->room ? participant->room->room_id_str : NULL) : - json_integer(participant->room ? participant->room->room_id : 0)); - json_object_set_new(event, "id", - string_ids ? json_string(participant->user_id_str) : json_integer(participant->user_id)); - /* Notify the speaker this event is related to as well */ - janus_audiobridge_notify_participants(participant, event, TRUE); - json_decref(event); - janus_mutex_unlock(&participant->room->mutex); - /* Also notify event handlers */ - if(notify_events && gateway->events_is_enabled()) { - json_t *info = json_object(); - json_object_set_new(info, "audiobridge", json_string(participant->talking ? "talking" : "stopped-talking")); - json_object_set_new(info, "room", - string_ids ? json_string(participant->room ? participant->room->room_id_str : NULL) : - json_integer(participant->room ? participant->room->room_id : 0)); - json_object_set_new(info, "id", - string_ids ? json_string(participant->user_id_str) : json_integer(participant->user_id)); - gateway->notify_event(&janus_audiobridge_plugin, session->handle, info); - } - } - } - } - } - } /* Queue the audio packet in the jitter buffer (we won't decode now, there might be buffering involved) */ if(participant->jitter) { - janus_audiobridge_buffer_packet *pkt = janus_audiobridge_buffer_packet_create(buf, len, silence); + janus_audiobridge_buffer_packet *pkt = janus_audiobridge_buffer_packet_create(packet); janus_mutex_lock(&participant->qmutex); /* Limit the size of the jitter buffer */ gint64 now = janus_get_monotonic_time(); @@ -7955,6 +7888,7 @@ static void *janus_audiobridge_mixer_thread(void *data) { janus_audiobridge_room *audiobridge = (janus_audiobridge_room *)data; if(!audiobridge) { JANUS_LOG(LOG_ERR, "Invalid room!\n"); + g_thread_unref(g_thread_self()); return NULL; } JANUS_LOG(LOG_VERB, "Thread is for mixing room %s (%s) at rate %"SCNu32"...\n", @@ -8578,6 +8512,7 @@ static void *janus_audiobridge_mixer_thread(void *data) { janus_refcount_decrease(&audiobridge->ref); + g_thread_unref(g_thread_self()); return NULL; } @@ -8635,7 +8570,11 @@ static void *janus_audiobridge_participant_thread(void *data) { if(participant->jitter) { ret = jitter_buffer_get(participant->jitter, &jbp, participant->codec == JANUS_AUDIOCODEC_OPUS ? 960 : 160, NULL); jitter_buffer_tick(participant->jitter); - if(ret == JITTER_BUFFER_OK) { + if(ret != JITTER_BUFFER_OK) { + /* No packet in the jitter buffer? Move on the talking detection, if needed */ + janus_audiobridge_participant_istalking(session, participant, NULL, NULL); + } else { + /* Decode the audio packet */ bpkt = (janus_audiobridge_buffer_packet *)jbp.data; janus_mutex_unlock(&participant->qmutex); locked = FALSE; @@ -8645,8 +8584,10 @@ static void *janus_audiobridge_participant_thread(void *data) { break; } /* Access the payload */ + char *buffer = bpkt->rtp ? bpkt->rtp->buffer : NULL; + uint16_t len = bpkt->rtp ? bpkt->rtp->length : 0; int plen = 0; - const unsigned char *payload = (const unsigned char *)janus_rtp_payload(bpkt->buffer, bpkt->len, &plen); + const unsigned char *payload = (const unsigned char *)janus_rtp_payload(buffer, len, &plen); if(!payload) { g_atomic_int_set(&participant->decoding, 0); JANUS_LOG(LOG_ERR, "[%s] Ops! got an error accessing the RTP payload\n", @@ -8654,7 +8595,7 @@ static void *janus_audiobridge_participant_thread(void *data) { janus_audiobridge_buffer_packet_destroy(bpkt); break; } - rtp = (janus_rtp_header *)bpkt->buffer; + rtp = (janus_rtp_header *)buffer; /* If this is Opus, check if there's a packet gap we should fix with FEC */ use_fec = FALSE; if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec) { @@ -8689,8 +8630,9 @@ static void *janus_audiobridge_participant_thread(void *data) { pkt->ssrc = 0; pkt->timestamp = ntohl(rtp->timestamp); pkt->seq_number = ntohs(rtp->seq_number); - /* We might check the audio level extension to see if this is silence */ - pkt->silence = bpkt->silence; + /* Check the audio level extension to see if this is silence */ + pkt->silence = FALSE; + janus_audiobridge_participant_istalking(session, participant, bpkt->rtp, &pkt->silence); pkt->length = 0; if(participant->codec == JANUS_AUDIOCODEC_OPUS) { /* Opus */ @@ -9080,3 +9022,70 @@ static void *janus_audiobridge_plainrtp_relay_thread(void *data) { g_thread_unref(g_thread_self()); return NULL; } + +static void janus_audiobridge_participant_istalking(janus_audiobridge_session *session, + janus_audiobridge_participant *participant, janus_plugin_rtp *packet, gboolean *silence) { + /* Check the audio levels, in case we need to notify participants about who's talking */ + if(participant == NULL || participant->extmap_id < 1) + return; + int level = packet ? packet->extensions.audio_level : 127; + if(level == -1) + return; + if(level == 127 && silence) + *silence = TRUE; + if(participant->room && participant->room->audiolevel_event) { + /* We need to detect who's talking: update our monitoring stuff */ + int audio_active_packets = participant->room ? participant->room->audio_active_packets : 100; + int audio_level_average = participant->room ? participant->room->audio_level_average : 25; + /* Check if we need to override those with user specific properties */ + if(participant->user_audio_active_packets > 0) + audio_active_packets = participant->user_audio_active_packets; + if(participant->user_audio_level_average > 0) + audio_level_average = participant->user_audio_level_average; + participant->audio_dBov_sum += level; + participant->audio_active_packets++; + participant->dBov_level = level; + if(participant->audio_active_packets > 0 && participant->audio_active_packets == audio_active_packets) { + gboolean notify_talk_event = FALSE; + if((float) participant->audio_dBov_sum / (float) participant->audio_active_packets < audio_level_average) { + /* Participant talking, should we notify all participants? */ + if(!participant->talking) + notify_talk_event = TRUE; + participant->talking = TRUE; + } else { + /* Participant not talking anymore, should we notify all participants? */ + if(participant->talking) + notify_talk_event = TRUE; + participant->talking = FALSE; + } + participant->audio_active_packets = 0; + participant->audio_dBov_sum = 0; + /* Only notify in case of state changes */ + if(participant->room && notify_talk_event) { + janus_mutex_lock(&participant->room->mutex); + json_t *event = json_object(); + json_object_set_new(event, "audiobridge", json_string(participant->talking ? "talking" : "stopped-talking")); + json_object_set_new(event, "room", + string_ids ? json_string(participant->room ? participant->room->room_id_str : NULL) : + json_integer(participant->room ? participant->room->room_id : 0)); + json_object_set_new(event, "id", + string_ids ? json_string(participant->user_id_str) : json_integer(participant->user_id)); + /* Notify the speaker this event is related to as well */ + janus_audiobridge_notify_participants(participant, event, TRUE); + json_decref(event); + janus_mutex_unlock(&participant->room->mutex); + /* Also notify event handlers */ + if(notify_events && gateway->events_is_enabled()) { + json_t *info = json_object(); + json_object_set_new(info, "audiobridge", json_string(participant->talking ? "talking" : "stopped-talking")); + json_object_set_new(info, "room", + string_ids ? json_string(participant->room ? participant->room->room_id_str : NULL) : + json_integer(participant->room ? participant->room->room_id : 0)); + json_object_set_new(info, "id", + string_ids ? json_string(participant->user_id_str) : json_integer(participant->user_id)); + gateway->notify_event(&janus_audiobridge_plugin, session->handle, info); + } + } + } + } +} diff --git a/src/plugins/plugin.c b/src/plugins/plugin.c index ac3f018cef..17eab46339 100644 --- a/src/plugins/plugin.c +++ b/src/plugins/plugin.c @@ -57,6 +57,24 @@ void janus_plugin_rtp_reset(janus_plugin_rtp *packet) { janus_plugin_rtp_extensions_reset(&packet->extensions); } } +janus_plugin_rtp *janus_plugin_rtp_duplicate(janus_plugin_rtp *packet) { + janus_plugin_rtp *p = NULL; + if(packet) { + p = g_malloc(sizeof(janus_plugin_rtp)); + p->mindex = packet->mindex; + p->video = packet->video; + if(packet->buffer == NULL || packet->length == 0) { + p->buffer = NULL; + p->length = 0; + } else { + p->buffer = g_malloc(packet->length); + memcpy(p->buffer, packet->buffer, packet->length); + p->length = packet->length; + } + p->extensions = packet->extensions; + } + return p; +} void janus_plugin_rtcp_reset(janus_plugin_rtcp *packet) { if(packet) { memset(packet, 0, sizeof(janus_plugin_rtcp)); diff --git a/src/plugins/plugin.h b/src/plugins/plugin.h index 93d234dc04..42d4712702 100644 --- a/src/plugins/plugin.h +++ b/src/plugins/plugin.h @@ -171,7 +171,7 @@ janus_plugin *create(void) { * Janus instance or it will crash. * */ -#define JANUS_PLUGIN_API_VERSION 103 +#define JANUS_PLUGIN_API_VERSION 104 /*! \brief Initialization of all plugin properties to NULL * @@ -610,6 +610,13 @@ struct janus_plugin_rtp { * @param[in] packet Pointer to the janus_plugin_rtp packet to reset */ void janus_plugin_rtp_reset(janus_plugin_rtp *packet); +/*! \brief Helper method to duplicate the RTP packet and its buffer + * @note The core will always pass non-allocated packets to plugins, which + * means they may have to duplicate them in case they need them for more time. + * @param[in] packet Pointer to the janus_plugin_rtp packet to duplicate + * @returns A pointer to the new janus_plugin_rtp, if successful, or NULL otherwise +*/ +janus_plugin_rtp *janus_plugin_rtp_duplicate(janus_plugin_rtp *packet); /*! \brief Janus plugin RTCP packet */ struct janus_plugin_rtcp {