From 4b23a80bd14dcf509ebe8de22f26906d34e0b079 Mon Sep 17 00:00:00 2001 From: Razvan Crainea Date: Thu, 28 Mar 2024 11:46:18 +0200 Subject: [PATCH] event_rabbitmq: add timeout support Add RPC timeout support for any command sent to the rabbitmq server --- modules/rabbitmq/doc/rabbitmq_admin.xml | 27 +++++++++++++++++++++++++ modules/rabbitmq/rabbitmq.c | 17 ++++++++++++++++ modules/rabbitmq/rmq_servers.c | 5 +++++ modules/rabbitmq/rmq_servers.h | 3 +++ 4 files changed, 52 insertions(+) diff --git a/modules/rabbitmq/doc/rabbitmq_admin.xml b/modules/rabbitmq/doc/rabbitmq_admin.xml index 6e776d0af47..506ad2e8354 100644 --- a/modules/rabbitmq/doc/rabbitmq_admin.xml +++ b/modules/rabbitmq/doc/rabbitmq_admin.xml @@ -210,6 +210,33 @@ modparam("rabbitmq", "use_tls", 1) aram("rabbitmq", "connect_timeout", 1000) + + + + +
+ <varname>timeout</varname> (integer) + + Indicates the timeout (in milliseconds) of any command (i.e. publish) + sent to the RabbitMQ server. + + + NOTE that this parameter is available only starting with + RabbitMQ library version 0.9.0; setting it when using an + earlier version will have no effect, and the publish command will run in + blocking mode. + + + + Default value is 0 (no timeout - blocking mode) + + + + Set the <varname>timeout</varname> parameter + +... +modparam("rabbitmq", "timeout", 1000) # timeout after 1s +...
diff --git a/modules/rabbitmq/rabbitmq.c b/modules/rabbitmq/rabbitmq.c index 100c2741dc1..f67b633ad59 100644 --- a/modules/rabbitmq/rabbitmq.c +++ b/modules/rabbitmq/rabbitmq.c @@ -45,8 +45,12 @@ int use_tls; struct openssl_binds openssl_api; struct tls_mgm_binds tls_api; static int rmq_connect_timeout = RMQ_DEFAULT_CONNECT_TIMEOUT; +static int rmq_timeout = 0; struct timeval conn_timeout_tv; +#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000 +struct timeval rpc_timeout_tv; +#endif #if AMQP_VERSION < AMQP_VERSION_CODE(0, 10, 0, 0) gen_lock_t *ssl_lock; @@ -57,6 +61,7 @@ static const param_export_t params[]={ (void *)rmq_server_add}, {"use_tls", INT_PARAM, &use_tls}, {"connect_timeout", INT_PARAM, &rmq_connect_timeout}, + {"timeout", INT_PARAM, &rmq_timeout}, {0,0,0} }; @@ -165,6 +170,18 @@ static int mod_init(void) conn_timeout_tv.tv_sec = rmq_connect_timeout/1000; conn_timeout_tv.tv_usec = (rmq_connect_timeout%1000)*1000; +#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000 + if (rmq_timeout < 0) { + LM_WARN("invalid value for 'timeout' %d; fallback to blocking mode\n", rmq_timeout); + rmq_timeout = 0; + } + rpc_timeout_tv.tv_sec = rmq_timeout/1000; + rpc_timeout_tv.tv_usec = (rmq_timeout%1000)*1000; +#else + if (rmq_timeout != 0) + LM_WARN("setting the timeout without support for it; fallback to blocking mode\n"); +#endif + return 0; } diff --git a/modules/rabbitmq/rmq_servers.c b/modules/rabbitmq/rmq_servers.c index ba3eec0ee78..8e1b1c0d100 100644 --- a/modules/rabbitmq/rmq_servers.c +++ b/modules/rabbitmq/rmq_servers.c @@ -341,6 +341,11 @@ int rmq_reconnect(struct rmq_server *srv) LM_ERR("cannot open AMQP socket\n"); goto clean_rmq_conn; } +#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000 + if (rpc_timeout_tv.tv_sec > 0 && + amqp_set_rpc_timeout(srv->conn, &rpc_timeout_tv) < 0) + LM_ERR("setting RPC timeout - going blocking\n"); +#endif #else socket = amqp_open_socket_noblock(srv->uri.host, srv->uri.port, diff --git a/modules/rabbitmq/rmq_servers.h b/modules/rabbitmq/rmq_servers.h index bdbf17dbe0a..bc3fb2c51ee 100644 --- a/modules/rabbitmq/rmq_servers.h +++ b/modules/rabbitmq/rmq_servers.h @@ -108,5 +108,8 @@ extern int use_tls; extern struct openssl_binds openssl_api; extern struct tls_mgm_binds tls_api; extern struct timeval conn_timeout_tv; +#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000 +extern struct timeval rpc_timeout_tv; +#endif #endif /* _RMQ_SERVERS_H_ */