diff --git a/src/plugins/janus_videoroom.c b/src/plugins/janus_videoroom.c index f6f7b2aeb8..c493c04014 100644 --- a/src/plugins/janus_videoroom.c +++ b/src/plugins/janus_videoroom.c @@ -2713,6 +2713,7 @@ static void janus_videoroom_publisher_dereference_nodebug(janus_videoroom_publis static void janus_videoroom_publisher_destroy(janus_videoroom_publisher *p) { if(p && g_atomic_int_compare_and_exchange(&p->destroyed, 0, 1)) { + janus_mutex_lock(&p->streams_mutex); /* Forwarders with RTCP support may have an extra reference, stop their source */ janus_mutex_lock(&p->rtp_forwarders_mutex); if(g_hash_table_size(p->rtp_forwarders) > 0) { @@ -2746,6 +2747,7 @@ static void janus_videoroom_publisher_destroy(janus_videoroom_publisher *p) { } } janus_mutex_unlock(&p->rtp_forwarders_mutex); + janus_mutex_unlock(&p->streams_mutex); janus_refcount_decrease(&p->ref); } } @@ -4534,6 +4536,7 @@ json_t *janus_videoroom_query_session(janus_plugin_session *handle) { if(participant->e2ee) json_object_set_new(info, "e2ee", json_true()); json_t *media = json_array(); + janus_mutex_lock(&participant->streams_mutex); GList *temp = participant->streams; while(temp) { janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; @@ -4576,6 +4579,7 @@ json_t *janus_videoroom_query_session(janus_plugin_session *handle) { json_array_append_new(media, m); temp = temp->next; } + janus_mutex_unlock(&participant->streams_mutex); json_object_set_new(info, "streams", media); } if(participant != NULL) @@ -5821,6 +5825,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi goto prepare_response; } janus_refcount_increase(&publisher->ref); /* This is just to handle the request for now */ + janus_mutex_lock(&publisher->streams_mutex); janus_mutex_lock(&publisher->rtp_forwarders_mutex); if(publisher->udp_sock <= 0) { publisher->udp_sock = socket(!ipv6_disabled ? AF_INET6 : AF_INET, SOCK_DGRAM, IPPROTO_UDP); @@ -5850,9 +5855,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi json_t *s = json_array_get(streams, i); json_t *stream_mid = json_object_get(s, "mid"); const char *mid = json_string_value(stream_mid); - janus_mutex_lock(&publisher->streams_mutex); ps = g_hash_table_lookup(publisher->streams_bymid, mid); - janus_mutex_unlock(&publisher->streams_mutex); if(ps == NULL) { /* FIXME Should we return an error instead? */ JANUS_LOG(LOG_WARN, "No such stream with mid '%s', skipping forwarder...\n", mid); @@ -6079,7 +6082,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi guint32 audio_handle = 0; guint32 video_handle[3] = {0, 0, 0}; guint32 data_handle = 0; - janus_mutex_lock(&publisher->streams_mutex); if(audio_port > 0) { /* FIXME Find the audio stream */ GList *temp = publisher->streams; @@ -6224,7 +6226,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi } } } - janus_mutex_unlock(&publisher->streams_mutex); if(audio_handle > 0) { json_object_set_new(rtp_stream, "audio_stream_id", json_integer(audio_handle)); json_object_set_new(rtp_stream, "audio", json_integer(audio_port)); @@ -6254,6 +6255,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi json_object_set_new(rtp_stream, "warning", json_string("deprecated_api")); } janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_mutex_unlock(&publisher->streams_mutex); janus_mutex_unlock(&videoroom->mutex); /* These two unrefs are related to the message handling */ janus_refcount_decrease(&publisher->ref); @@ -6351,6 +6353,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi goto prepare_response; } janus_refcount_increase(&publisher->ref); /* Just to handle the message now */ + janus_mutex_lock(&publisher->streams_mutex); janus_mutex_lock(&publisher->rtp_forwarders_mutex); /* Find the forwarder by iterating on all the streams */ gboolean found = FALSE; @@ -6380,6 +6383,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi temp = temp->next; } janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_mutex_unlock(&publisher->streams_mutex); janus_refcount_decrease(&publisher->ref); janus_mutex_unlock(&videoroom->mutex); janus_refcount_decrease(&videoroom->ref); @@ -6846,6 +6850,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi /* To see if the participant is talking, we need to find the audio stream(s) */ if(g_atomic_int_get(&p->session->started)) { gboolean found = FALSE, talking = FALSE; + janus_mutex_lock(&p->streams_mutex); GList *temp = p->streams; while(temp) { janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; @@ -6856,6 +6861,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi } temp = temp->next; } + janus_mutex_unlock(&p->streams_mutex); if(found) json_object_set_new(pl, "talking", talking ? json_true() : json_false()); } @@ -6908,9 +6914,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi g_hash_table_iter_init(&iter, videoroom->participants); while (!g_atomic_int_get(&videoroom->destroyed) && g_hash_table_iter_next(&iter, NULL, &value)) { janus_videoroom_publisher *p = value; + janus_mutex_lock(&p->streams_mutex); janus_mutex_lock(&p->rtp_forwarders_mutex); if(g_hash_table_size(p->rtp_forwarders) == 0) { janus_mutex_unlock(&p->rtp_forwarders_mutex); + janus_mutex_unlock(&p->streams_mutex); continue; } json_t *pl = json_object(); @@ -6948,6 +6956,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi temp = temp->next; } janus_mutex_unlock(&p->rtp_forwarders_mutex); + janus_mutex_unlock(&p->streams_mutex); json_object_set_new(pl, "forwarders", flist); json_array_append_new(list, pl); } @@ -7000,9 +7009,12 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi /* Something changed */ if(!participant->recording_active) { /* Not recording (anymore?) */ + janus_mutex_lock(&participant->streams_mutex); janus_videoroom_recorder_close(participant); + janus_mutex_unlock(&participant->streams_mutex); } else if(participant->recording_active && g_atomic_int_get(&participant->session->started)) { /* We've started recording, send a PLI and go on */ + janus_mutex_lock(&participant->streams_mutex); GList *temp = participant->streams; while(temp) { janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; @@ -7013,6 +7025,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi } temp = temp->next; } + janus_mutex_unlock(&participant->streams_mutex); } } janus_mutex_unlock(&participant->rec_mutex); @@ -7169,9 +7182,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi g_snprintf(error_cause, 512, "Only local publishers can be remotized"); goto prepare_response; } + janus_mutex_lock(&publisher->streams_mutex); janus_mutex_lock(&publisher->rtp_forwarders_mutex); if(g_hash_table_lookup(publisher->remote_recipients, remote_id) != NULL) { janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_mutex_unlock(&publisher->streams_mutex); janus_refcount_decrease(&publisher->ref); janus_refcount_decrease(&videoroom->ref); JANUS_LOG(LOG_ERR, "Remotization already exists (%s)\n", remote_id); @@ -7185,6 +7200,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi if(publisher->udp_sock <= 0 || (!ipv6_disabled && setsockopt(publisher->udp_sock, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != 0)) { janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_mutex_unlock(&publisher->streams_mutex); janus_refcount_decrease(&publisher->ref); janus_refcount_decrease(&videoroom->ref); JANUS_LOG(LOG_ERR, "Could not open UDP socket for RTP stream for publisher (%s), %d (%s)\n", @@ -7195,7 +7211,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi } } /* Add a new RTP forwarder for each of the publisher streams */ - janus_mutex_lock(&publisher->streams_mutex); janus_videoroom_publisher_stream *ps = NULL; janus_rtp_forwarder *f = NULL; gboolean rtcp_added = FALSE, add_rtcp = FALSE; @@ -7253,7 +7268,6 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi } temp = temp->next; } - janus_mutex_unlock(&publisher->streams_mutex); /* Keep track of this remotization */ janus_videoroom_remote_recipient *recipient = g_malloc(sizeof(janus_videoroom_remote_recipient)); recipient->remote_id = g_strdup(remote_id); @@ -7264,6 +7278,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi g_hash_table_insert(publisher->remote_recipients, g_strdup(remote_id), recipient); /* Done */ janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_mutex_unlock(&publisher->streams_mutex); response = json_object(); json_object_set_new(response, "videoroom", json_string("success")); json_object_set_new(response, "room", string_ids ? json_string(publisher->room_id_str) : json_integer(publisher->room_id)); @@ -7333,10 +7348,12 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi } janus_refcount_increase(&publisher->ref); janus_mutex_unlock(&videoroom->mutex); + janus_mutex_lock(&publisher->streams_mutex); janus_mutex_lock(&publisher->rtp_forwarders_mutex); /* Check if we know of this remotization */ if(g_hash_table_remove(publisher->remote_recipients, remote_id) == FALSE) { janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_mutex_unlock(&publisher->streams_mutex); janus_refcount_decrease(&publisher->ref); janus_refcount_decrease(&videoroom->ref); JANUS_LOG(LOG_ERR, "No such remotization (%s)\n", remote_id); @@ -7368,6 +7385,7 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi temp = temp->next; } janus_mutex_unlock(&publisher->rtp_forwarders_mutex); + janus_mutex_unlock(&publisher->streams_mutex); /* Done */ response = json_object(); json_object_set_new(response, "videoroom", json_string("success")); @@ -7801,9 +7819,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi janus_mutex_init(&ps->subscribers_mutex); janus_mutex_init(&ps->rtp_forwarders_mutex); ps->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_rtp_forwarder_destroy); + janus_mutex_lock(&publisher->streams_mutex); publisher->streams = g_list_append(publisher->streams, ps); g_hash_table_insert(publisher->streams_byid, GINT_TO_POINTER(ps->mindex), ps); g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps); + janus_mutex_unlock(&publisher->streams_mutex); mindex++; } /* Done, spawn a thread for this remote publisher */ @@ -8082,11 +8102,11 @@ static json_t *janus_videoroom_process_synchronous_request(janus_videoroom_sessi g_hash_table_insert(publisher->streams_bymid, g_strdup(ps->mid), ps); changes = TRUE; } - janus_mutex_unlock(&publisher->streams_mutex); if(changes) { /* Notify all other participants this publisher's media has changed */ janus_videoroom_notify_about_publisher(publisher, TRUE); } + janus_mutex_unlock(&publisher->streams_mutex); /* Done */ janus_refcount_decrease(&publisher->ref); janus_refcount_decrease(&videoroom->ref); @@ -8352,9 +8372,10 @@ void janus_videoroom_setup_media(janus_plugin_session *handle) { if(session->participant_type == janus_videoroom_p_type_publisher) { janus_videoroom_publisher *participant = janus_videoroom_session_get_publisher(session); /* Notify all other participants that there's a new boy in town */ + janus_mutex_lock(&participant->rec_mutex); + janus_mutex_lock(&participant->streams_mutex); janus_videoroom_notify_about_publisher(participant, FALSE); /* Check if we need to start recording */ - janus_mutex_lock(&participant->rec_mutex); if((participant->room && participant->room->record) || participant->recording_active) { GList *temp = participant->streams; while(temp) { @@ -8364,6 +8385,7 @@ void janus_videoroom_setup_media(janus_plugin_session *handle) { } participant->recording_active = TRUE; } + janus_mutex_unlock(&participant->streams_mutex); janus_mutex_unlock(&participant->rec_mutex); janus_refcount_decrease(&participant->ref); } else if(session->participant_type == janus_videoroom_p_type_subscriber) { @@ -9035,7 +9057,9 @@ static void janus_videoroom_hangup_media_internal(gpointer session_data) { janus_mutex_lock(&participant->rec_mutex); g_free(participant->recording_base); participant->recording_base = NULL; + janus_mutex_lock(&participant->streams_mutex); janus_videoroom_recorder_close(participant); + janus_mutex_unlock(&participant->streams_mutex) janus_mutex_unlock(&participant->rec_mutex); participant->acodec = JANUS_AUDIOCODEC_NONE; participant->vcodec = JANUS_VIDEOCODEC_NONE; @@ -9632,6 +9656,7 @@ static void *janus_videoroom_handler(void *data) { /* Add proper info on all the streams */ gboolean audio_added = FALSE, video_added = FALSE, talking_found = FALSE, talking = FALSE; json_t *media = json_array(); + janus_mutex_lock(&p->streams_mutex); GList *temp = p->streams; while(temp) { janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; @@ -9688,6 +9713,7 @@ static void *janus_videoroom_handler(void *data) { json_array_append_new(media, info); temp = temp->next; } + janus_mutex_unlock(&p->streams_mutex); json_object_set_new(pl, "streams", media); if(talking_found) json_object_set_new(pl, "talking", talking ? json_true() : json_false()); @@ -10731,9 +10757,12 @@ static void *janus_videoroom_handler(void *data) { /* Something changed */ if(!participant->recording_active) { /* Not recording (anymore?) */ + janus_mutex_lock(&participant->streams_mutex) janus_videoroom_recorder_close(participant); + janus_mutex_unlock(&participant->streams_mutex) } else if(participant->recording_active && g_atomic_int_get(&participant->session->started)) { /* We've started recording, send a PLI/FIR and go on */ + janus_mutex_lock(&participant->streams_mutex); GList *temp = participant->streams; while(temp) { janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; @@ -10744,6 +10773,7 @@ static void *janus_videoroom_handler(void *data) { } temp = temp->next; } + janus_mutex_unlock(&participant->streams_mutex); } } janus_mutex_unlock(&participant->rec_mutex); @@ -10787,10 +10817,10 @@ static void *janus_videoroom_handler(void *data) { } } } - janus_mutex_unlock(&participant->streams_mutex); /* If at least a description changed, notify everyone else about the publisher details */ if(desc_updated) janus_videoroom_notify_about_publisher(participant, TRUE); + janus_mutex_unlock(&participant->streams_mutex); } /* Done */ event = json_object(); @@ -11830,6 +11860,7 @@ static void *janus_videoroom_handler(void *data) { feeds = json_array(); json_object_set_new(root, "streams", feeds); janus_refcount_increase(&publisher->ref); + janus_mutex_lock(&publisher->streams_mutex); GList *temp = publisher->streams, *touched_already = NULL; while(temp) { janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; @@ -11857,6 +11888,7 @@ static void *janus_videoroom_handler(void *data) { } temp = temp->next; } + janus_mutex_unlock(&publisher->streams_mutex); g_list_free(touched_already); janus_refcount_decrease(&publisher->ref); /* Take note of the fact this is a legacy request */ @@ -12883,6 +12915,7 @@ static void *janus_videoroom_handler(void *data) { /* Is this room recorded, or are we recording this publisher already? */ janus_mutex_lock(&participant->rec_mutex); if(videoroom->record || participant->recording_active) { + janus_mutex_lock(&participant->streams_mutex); GList *temp = participant->streams; while(temp) { janus_videoroom_publisher_stream *ps = (janus_videoroom_publisher_stream *)temp->data; @@ -12890,6 +12923,7 @@ static void *janus_videoroom_handler(void *data) { temp = temp->next; } participant->recording_active = TRUE; + janus_mutex_unlock(&participant->streams_mutex); } janus_mutex_unlock(&participant->rec_mutex); /* Send the answer back to the publisher */ @@ -12910,7 +12944,9 @@ static void *janus_videoroom_handler(void *data) { /* If this is an update/renegotiation, notify participants about this */ if(sdp_update && g_atomic_int_get(&session->started)) { /* Notify all other participants this publisher's media has changed */ + janus_mutex_lock(&participant->streams_mutex); janus_videoroom_notify_about_publisher(participant, TRUE); + janus_mutex_unlock(&participant->streams_mutex); } /* Done */ if(res != JANUS_OK) { @@ -13223,6 +13259,7 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_rtp_forwarder *rf, janus_videoroom_publisher *p = ps->publisher; if(p == NULL || g_atomic_int_get(&p->destroyed)) return; + janus_mutex_lock(&p->streams_mutex); janus_mutex_lock(&p->rtp_forwarders_mutex); if(g_hash_table_size(p->rtp_forwarders) == 0) { janus_mutex_unlock(&p->rtp_forwarders_mutex); @@ -13256,6 +13293,7 @@ static void janus_videoroom_rtp_forwarder_rtcp_receive(janus_rtp_forwarder *rf, temp = temp->next; } janus_mutex_unlock(&p->rtp_forwarders_mutex); + janus_mutex_unlock(&p->streams_mutex); if(found) janus_videoroom_reqpli(ps, "RTCP from remotized forwarder"); } @@ -13468,7 +13506,9 @@ static void *janus_videoroom_remote_publisher_thread(void *user_data) { string_ids ? (gpointer)g_strdup(publisher->user_id_str) : (gpointer)janus_uint64_dup(publisher->user_id), publisher); /* Let's also notify all other participants that the publisher is here */ + janus_mutex_lock(&publisher->streams_mutex); janus_videoroom_notify_about_publisher(publisher, FALSE); + janus_mutex_unlock(&publisher->streams_mutex); /* Loop */ int num = 0, i = 0; @@ -13652,7 +13692,9 @@ static void *janus_videoroom_remote_publisher_thread(void *user_data) { janus_mutex_lock(&publisher->rec_mutex); g_free(publisher->recording_base); publisher->recording_base = NULL; + janus_mutex_lock(&publisher->streams_mutex) janus_videoroom_recorder_close(publisher); + janus_mutex_unlock(&publisher->streams_mutex) janus_mutex_unlock(&publisher->rec_mutex); publisher->acodec = JANUS_AUDIOCODEC_NONE; publisher->vcodec = JANUS_VIDEOCODEC_NONE;