Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix locks of speech_channel due to infinite loops on apr_thread_cond_timedwait() #66

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 44 additions & 33 deletions app-unimrcp/speech_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ const char *speech_channel_status_to_string(speech_channel_status_t status)
}

/* Use this function to set the current channel state without locking the
* speech channel. Do this if you already have the speech channel locked.
* speech channel. Do this if you already have the speech channel locked.
*/
void speech_channel_set_state_unlocked(speech_channel_t *schannel, speech_channel_state_t state)
{
Expand All @@ -118,19 +118,23 @@ void speech_channel_set_state_unlocked(speech_channel_t *schannel, speech_channe
void speech_channel_set_state(speech_channel_t *schannel, speech_channel_state_t state)
{
if (schannel) {
if (!schannel->mutex)
return;
apr_thread_mutex_lock(schannel->mutex);

speech_channel_set_state_unlocked(schannel, state);

if (!schannel->mutex)
return;
apr_thread_mutex_unlock(schannel->mutex);
}
}

/* Send BARGE-IN-OCCURRED. */
int speech_channel_bargeinoccurred(speech_channel_t *schannel)
int speech_channel_bargeinoccurred(speech_channel_t *schannel)
{
int status = 0;

if (!schannel)
return -1;

Expand All @@ -150,12 +154,12 @@ int speech_channel_bargeinoccurred(speech_channel_t *schannel)
ast_log(LOG_ERROR, "(%s) Failed to create BARGE_IN_OCCURRED message\n", schannel->name);
status = -1;
} else {
if (!mrcp_application_message_send(schannel->unimrcp_session, schannel->unimrcp_channel, mrcp_message))
if (!mrcp_application_message_send(schannel->unimrcp_session, schannel->unimrcp_channel, mrcp_message)) {
ast_log(LOG_WARNING, "(%s) [speech_channel_bargeinoccurred] Failed to send BARGE_IN_OCCURRED message\n", schannel->name);
else if (schannel->cond != NULL) {
while (schannel->state == SPEECH_CHANNEL_PROCESSING) {
if (apr_thread_cond_timedwait(schannel->cond, schannel->mutex, globals.speech_channel_timeout) == APR_TIMEUP) {
break;
} else if (schannel->cond != NULL) {
if (schannel->state == SPEECH_CHANNEL_PROCESSING) {
if ((apr_thread_cond_timedwait(schannel->cond, schannel->mutex, globals.speech_channel_timeout) == APR_TIMEUP) && (schannel->state == SPEECH_CHANNEL_PROCESSING)) {
ast_log(LOG_WARNING, "(%s) [speech_channel_bargeinoccurred] MRCP session has not closed after %" APR_TIME_T_FMT " ms\n", schannel->name, apr_time_as_msec(globals.speech_channel_timeout));
}
}
}
Expand Down Expand Up @@ -252,7 +256,7 @@ speech_channel_t *speech_channel_create(
ast_log(LOG_DEBUG, "Created speech channel: Name=%s, Type=%s, Codec=%s, Rate=%u on %s\n", schan->name, speech_channel_type_to_string(schan->type), schan->codec, schan->rate,
ast_channel_name(chan));
}

if (!ast_strlen_zero(rec_file_path)) {
schan->rec_file = fopen(rec_file_path, "wb");
if(!schan->rec_file) {
Expand Down Expand Up @@ -305,7 +309,7 @@ speech_channel_t *speech_channel_create(
}

static mpf_termination_t *speech_channel_create_mpf_termination(speech_channel_t *schannel)
{
{
mpf_stream_capabilities_t *capabilities = NULL;
int sample_rate;

Expand Down Expand Up @@ -336,7 +340,7 @@ int speech_channel_destroy(speech_channel_t *schannel)
ast_log(LOG_ERROR, "Speech channel structure pointer is NULL\n");
return -1;
}

ast_log(LOG_DEBUG, "Destroy speech channel: Name=%s, Type=%s, Codec=%s, Rate=%u\n", schannel->name, speech_channel_type_to_string(schannel->type), schannel->codec, schannel->rate);

if (schannel->mutex)
Expand All @@ -355,26 +359,23 @@ int speech_channel_destroy(speech_channel_t *schannel)

/* Destroy the channel and session if not already done. */
if (schannel->state != SPEECH_CHANNEL_CLOSED) {
int warned = 0;

if ((schannel->unimrcp_session != NULL) && (schannel->unimrcp_channel != NULL)) {
if (!mrcp_application_session_terminate(schannel->unimrcp_session))
ast_log(LOG_WARNING, "(%s) Unable to terminate application session\n", schannel->name);
}

ast_log(LOG_DEBUG, "(%s) Waiting for MRCP session to terminate\n", schannel->name);
while (schannel->state != SPEECH_CHANNEL_CLOSED) {
if (schannel->state != SPEECH_CHANNEL_CLOSED) {
if (schannel->cond != NULL) {
if ((apr_thread_cond_timedwait(schannel->cond, schannel->mutex, globals.speech_channel_timeout) == APR_TIMEUP) && (!warned)) {
warned = 1;
if ((apr_thread_cond_timedwait(schannel->cond, schannel->mutex, globals.speech_channel_timeout) == APR_TIMEUP) && (schannel->state != SPEECH_CHANNEL_CLOSED)) {
ast_log(LOG_WARNING, "(%s) MRCP session has not terminated after %" APR_TIME_T_FMT " ms\n", schannel->name, apr_time_as_msec(globals.speech_channel_timeout));
}
}
}
}

if (schannel->state != SPEECH_CHANNEL_CLOSED) {
ast_log(LOG_ERROR, "(%s) Failed to destroy channel. Continuing\n", schannel->name);
ast_log(LOG_ERROR, "(%s) Failed to destroy channel. Continuing\n", schannel->name);
}

if (schannel->rec_file) {
Expand All @@ -388,7 +389,7 @@ int speech_channel_destroy(speech_channel_t *schannel)

if (schannel->audio_queue != NULL) {
if (audio_queue_destroy(schannel->audio_queue) != 0)
ast_log(LOG_WARNING, "(%s) Unable to destroy channel audio queue\n",schannel->name);
ast_log(LOG_WARNING, "(%s) Unable to destroy channel audio queue\n", schannel->name);
}

if (schannel->mutex != NULL)
Expand Down Expand Up @@ -420,6 +421,10 @@ int speech_channel_destroy(speech_channel_t *schannel)
schannel->data = NULL;
schannel->chan = NULL;
schannel->rec_file = NULL;
schannel->format = NULL;
schannel->bits_per_sample = 0;
schannel->silence = 0;
schannel->rate = 0;

return 0;
}
Expand Down Expand Up @@ -452,7 +457,7 @@ int speech_channel_open(speech_channel_t *schannel, ast_mrcp_profile_t *profile)
apr_thread_mutex_unlock(schannel->mutex);
return 2;
}

/* Set session name for logging purposes. */
mrcp_application_session_name_set(schannel->unimrcp_session, schannel->name);

Expand Down Expand Up @@ -494,7 +499,7 @@ int speech_channel_open(speech_channel_t *schannel, ast_mrcp_profile_t *profile)
}

/* Wait for channel to be ready. */
while (schannel->state == SPEECH_CHANNEL_CLOSED)
if (schannel->state == SPEECH_CHANNEL_CLOSED)
apr_thread_cond_timedwait(schannel->cond, schannel->mutex, globals.speech_channel_timeout);

if (schannel->state == SPEECH_CHANNEL_READY) {
Expand Down Expand Up @@ -569,18 +574,18 @@ int speech_channel_stop(speech_channel_t *schannel)
ast_log(LOG_ERROR, "(%s) Failed to create STOP message\n", schannel->name);
status = -1;
} else {
if (!mrcp_application_message_send(schannel->unimrcp_session, schannel->unimrcp_channel, mrcp_message))
if (!mrcp_application_message_send(schannel->unimrcp_session, schannel->unimrcp_channel, mrcp_message)) {
ast_log(LOG_WARNING, "(%s) Failed to send STOP message\n", schannel->name);
else if (schannel->cond != NULL) {
while (schannel->state == SPEECH_CHANNEL_PROCESSING) {
if (apr_thread_cond_timedwait(schannel->cond, schannel->mutex, globals.speech_channel_timeout) == APR_TIMEUP) {
break;
} else if (schannel->cond != NULL) {
if (schannel->state == SPEECH_CHANNEL_PROCESSING) {
if ((apr_thread_cond_timedwait(schannel->cond, schannel->mutex, globals.speech_channel_timeout) == APR_TIMEUP) && (schannel->state == SPEECH_CHANNEL_PROCESSING)) {
ast_log(LOG_WARNING, "(%s) MRCP session has not closed after %" APR_TIME_T_FMT " ms\n", schannel->name, apr_time_as_msec(globals.speech_channel_timeout));
}
}
}

if (schannel->state == SPEECH_CHANNEL_PROCESSING) {
ast_log(LOG_ERROR, "(%s) Timed out waiting for session to close. Continuing\n", schannel->name);
ast_log(LOG_ERROR, "(%s) Timed out waiting for session to close. Continuing\n", schannel->name);
schannel->state = SPEECH_CHANNEL_ERROR;
status = -1;
} else if (schannel->state == SPEECH_CHANNEL_ERROR) {
Expand Down Expand Up @@ -640,14 +645,20 @@ int speech_channel_read(speech_channel_t *schannel, void *data, apr_size_t *len,
apr_size_t req_len = *len;
#endif
audio_queue_t *queue = schannel->audio_queue;
if (!queue)
return -1;

if (!schannel->mutex)
return -1;
apr_thread_mutex_lock(schannel->mutex);

if (schannel->state == SPEECH_CHANNEL_PROCESSING)
status = audio_queue_read(queue, data, len, block);
else
status = 1;

if (!schannel->mutex)
return -1;
apr_thread_mutex_unlock(schannel->mutex);

#if SPEECH_CHANNEL_DUMP
Expand All @@ -657,7 +668,7 @@ int speech_channel_read(speech_channel_t *schannel, void *data, apr_size_t *len,
#endif

#if SPEECH_CHANNEL_TRACE
ast_log(LOG_DEBUG, "(%s) channel_read() status=%d req=%"APR_SIZE_T_FMT" read=%"APR_SIZE_T_FMT"\n",
ast_log(LOG_DEBUG, "(%s) channel_read() status=%d req=%"APR_SIZE_T_FMT" read=%"APR_SIZE_T_FMT"\n",
schannel->name, status, req_len, *len);
#endif

Expand Down Expand Up @@ -696,7 +707,7 @@ int speech_channel_write(speech_channel_t *schannel, void *data, apr_size_t *len
apr_thread_mutex_unlock(schannel->mutex);

#if SPEECH_CHANNEL_TRACE
ast_log(LOG_DEBUG, "(%s) channel_write() status=%d req=%"APR_SIZE_T_FMT" written=%"APR_SIZE_T_FMT"\n",
ast_log(LOG_DEBUG, "(%s) channel_write() status=%d req=%"APR_SIZE_T_FMT" written=%"APR_SIZE_T_FMT"\n",
schannel->name, status, req_len, *len);
#endif

Expand Down Expand Up @@ -731,12 +742,12 @@ int speech_channel_ast_write(speech_channel_t *schannel, void *data, apr_size_t

if (schannel->rec_file)
fwrite(data, 1, len, schannel->rec_file);

if (ast_write(schannel->chan, &fr) < 0) {
ast_log(LOG_WARNING, "(%s) Unable to write frame to channel: %s\n", schannel->name, strerror(errno));
return -1;
}

return 0;
}

Expand All @@ -755,7 +766,7 @@ struct ast_filestream* astchan_stream_file(struct ast_channel *chan, const char
ast_log(LOG_NOTICE, "Stream file %s on %s length:%"APR_OFF_T_FMT"\n", filename, ast_channel_name(chan), filelength);
if (filelength_out)
*filelength_out = filelength;

if (ast_seekstream(fs, 0, SEEK_SET) != 0) {
ast_log(LOG_WARNING, "ast_seekstream failed on %s for %s\n", ast_channel_name(chan), filename);
}
Expand Down Expand Up @@ -825,7 +836,7 @@ static int text_starts_with(const char *text, const char *match)
/* Is there a match? */
result = (textlen > matchlen) && (strncmp(match, text, matchlen) == 0);
}

return result;
}

Expand Down