From c1ee3684ee881bcf127994a6ff95ead766e22b8f Mon Sep 17 00:00:00 2001 From: Rafael Silva Date: Thu, 9 Dec 2021 16:04:20 +0000 Subject: [PATCH 1/4] cleanup: trim trailling whitespaces Signed-off-by: Rafael Silva --- examples/bearssl_publisher.c | 44 +- examples/bio_publisher.c | 28 +- examples/bio_publisher_win.c | 26 +- examples/mbedtls_publisher.c | 26 +- examples/openssl_publisher.c | 26 +- examples/openssl_publisher_win.c | 26 +- examples/templates/bearssl_sockets.h | 4 +- examples/templates/bio_sockets.h | 2 +- examples/templates/posix_sockets.h | 6 +- include/mqtt.h | 742 +++++++++++++-------------- include/mqtt_pal.h | 36 +- src/mqtt.c | 120 ++--- src/mqtt_pal.c | 10 +- tests.c | 32 +- 14 files changed, 564 insertions(+), 564 deletions(-) diff --git a/examples/bearssl_publisher.c b/examples/bearssl_publisher.c index 9f9e4d3..04cc1c2 100644 --- a/examples/bearssl_publisher.c +++ b/examples/bearssl_publisher.c @@ -24,13 +24,13 @@ int testCerts(br_x509_trust_anchor *anch); /** * @brief The function that would be called whenever a PUBLISH is received. - * - * @note This function is not used in this example. + * + * @note This function is not used in this example. */ static void publish_callback(void** unused, struct mqtt_response_publish *published); /** - * @brief Safely closes the socket in \p ctx before \c exit. + * @brief Safely closes the socket in \p ctx before \c exit. */ static void exit_example(int status, bearssl_context *ctx); @@ -38,7 +38,7 @@ static void exit_example(int status, bearssl_context *ctx); /** * @brief Callback function to accumulate data in a buffer - */ + */ static void vblob_append(void *cc, const void *data, size_t len); /** @@ -54,14 +54,14 @@ static int certificate_to_trust_anchor(br_x509_certificate *xc, br_x509_trust_an /** * @brief Generates trust anchors for BearSSL from the contents of \p ca_file and stores them * in the \p anchoOut array (based on code in BearSSL tools) - */ + */ static size_t get_trusted_anchors_from_file(const char *ca_file, br_x509_trust_anchor **anchOut); /** * @brief Generates trust anchors for BearSSL from the string \p ca and stores them * in the \p anchOut array (based on code in BearSSL tools) - * + * * @returns The number of trust anchors generated - */ + */ static size_t get_trusted_anchors(const unsigned char *ca, size_t ca_len, br_x509_trust_anchor **anchOut); // Global to return Ctrl-C event @@ -76,9 +76,9 @@ void signalHandler(int signum) { } /** - * A simple program to that publishes the current time until Ctrl-C is pressed. + * A simple program to that publishes the current time until Ctrl-C is pressed. */ -int main(int argc, const char *argv[]) +int main(int argc, const char *argv[]) { const char* addr; const char* port; @@ -126,20 +126,20 @@ int main(int argc, const char *argv[]) /* generate BearSSL trusted anchors - specifically kept out of open_nb_socket since it needs to malloc */ - /* - Generate BearSSL trusted anchors + /* + Generate BearSSL trusted anchors - This code converts the certificate into a format that is readable by the BearSSL library. Sadly there isn't + This code converts the certificate into a format that is readable by the BearSSL library. Sadly there isn't a way to accomplish this without the use of malloc thus I specifically kept this code out of open_nb_socket. The author of the bearSSL library offers two options: - 1) Do the conversion of the certificate in your code. There are examples of how to do this. The benefit of + 1) Do the conversion of the certificate in your code. There are examples of how to do this. The benefit of this is that you can run the same code against different servers by providing the appropriate trusted root pem file. The function get_trusted_anchors does exactly this. 2) Use the tool provided with BearSSL to generate the C code that will initialize the trusted anchor structures. - Essentially it simply generates initialized C structures that you can copy into your code. You will not need - to use malloc but you will lose some flexibility. For information on the tool see + Essentially it simply generates initialized C structures that you can copy into your code. You will not need + to use malloc but you will lose some flexibility. For information on the tool see this page: https://www.bearssl.org/api1.html */ ctx.ta_count = get_trusted_anchors_from_file(ca_file, &ctx.anchOut); @@ -196,7 +196,7 @@ int main(int argc, const char *argv[]) exit_example(EXIT_FAILURE, &ctx); } close_socket(&ctx); - + if (0 != open_nb_socket(&ctx, addr, port, bearssl_iobuf, sizeof(bearssl_iobuf))) { fprintf(stderr, "Unable to open socket: %d\n", errno); @@ -249,7 +249,7 @@ int main(int argc, const char *argv[]) return 4; } usleep(100000U); - } + } /* disconnect */ printf("\n%s disconnecting from %s\n", argv[0], addr); @@ -266,7 +266,7 @@ int main(int argc, const char *argv[]) sleep(1); - /* exit */ + /* exit */ exit_example(EXIT_SUCCESS, &ctx); } @@ -276,7 +276,7 @@ static void exit_example(int status, bearssl_context *ctx) exit(status); } -static void publish_callback(void** unused, struct mqtt_response_publish *published) +static void publish_callback(void** unused, struct mqtt_response_publish *published) { static const char *prelim = "Received publish('"; /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */ @@ -302,7 +302,7 @@ static void vblob_append(void *cc, const void *data, size_t len) static void free_ta_contents(br_x509_trust_anchor *ta) { free(ta->dn.data); - switch (ta->pkey.key_type) + switch (ta->pkey.key_type) { case BR_KEYTYPE_RSA: free(ta->pkey.key.rsa.n); @@ -342,7 +342,7 @@ static int certificate_to_trust_anchor(br_x509_certificate *xc, br_x509_trust_an ta->dn.len = vdn.data_length; ta->flags = 0; - if (br_x509_decoder_isCA(&dc)) + if (br_x509_decoder_isCA(&dc)) { ta->flags |= BR_X509_TA_CA; } @@ -403,7 +403,7 @@ static size_t get_trusted_anchors_from_file(const char *ca_file, br_x509_trust_a if (certs != NULL) { size_t read = fread(certs, 1, fsize, f); - + fclose(f); if (read == fsize) { diff --git a/examples/bio_publisher.c b/examples/bio_publisher.c index 56cbc69..562139a 100644 --- a/examples/bio_publisher.c +++ b/examples/bio_publisher.c @@ -1,7 +1,7 @@ /** * @file - * A simple program to that publishes the current time whenever ENTER is pressed. + * A simple program to that publishes the current time whenever ENTER is pressed. */ #include #include @@ -14,30 +14,30 @@ /** * @brief The function that would be called whenever a PUBLISH is received. - * - * @note This function is not used in this example. + * + * @note This function is not used in this example. */ void publish_callback(void** unused, struct mqtt_response_publish *published); /** - * @brief The client's refresher. This function triggers back-end routines to + * @brief The client's refresher. This function triggers back-end routines to * handle ingress/egress traffic to the broker. - * - * @note All this function needs to do is call \ref __mqtt_recv and - * \ref __mqtt_send every so often. I've picked 100 ms meaning that + * + * @note All this function needs to do is call \ref __mqtt_recv and + * \ref __mqtt_send every so often. I've picked 100 ms meaning that * client ingress/egress traffic will be handled every 100 ms. */ void* client_refresher(void* client); /** - * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. + * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. */ void exit_example(int status, BIO* sockfd, pthread_t *client_daemon); /** - * A simple program to that publishes the current time whenever ENTER is pressed. + * A simple program to that publishes the current time whenever ENTER is pressed. */ -int main(int argc, const char *argv[]) +int main(int argc, const char *argv[]) { const char* addr; const char* port; @@ -122,13 +122,13 @@ int main(int argc, const char *argv[]) fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); exit_example(EXIT_FAILURE, sockfd, &client_daemon); } - } + } /* disconnect */ printf("\n%s disconnecting from %s\n", argv[0], addr); sleep(1); - /* exit */ + /* exit */ exit_example(EXIT_SUCCESS, sockfd, &client_daemon); } @@ -141,14 +141,14 @@ void exit_example(int status, BIO* sockfd, pthread_t *client_daemon) -void publish_callback(void** unused, struct mqtt_response_publish *published) +void publish_callback(void** unused, struct mqtt_response_publish *published) { /* not used in this example */ } void* client_refresher(void* client) { - while(1) + while(1) { mqtt_sync((struct mqtt_client*) client); usleep(100000U); diff --git a/examples/bio_publisher_win.c b/examples/bio_publisher_win.c index cf13bc9..88e78f0 100644 --- a/examples/bio_publisher_win.c +++ b/examples/bio_publisher_win.c @@ -1,7 +1,7 @@ /** * @file - * A simple program to that publishes the current time whenever ENTER is pressed. + * A simple program to that publishes the current time whenever ENTER is pressed. */ #include #include @@ -13,30 +13,30 @@ /** * @brief The function that would be called whenever a PUBLISH is received. - * - * @note This function is not used in this example. + * + * @note This function is not used in this example. */ void publish_callback(void** unused, struct mqtt_response_publish *published); /** - * @brief The client's refresher. This function triggers back-end routines to + * @brief The client's refresher. This function triggers back-end routines to * handle ingress/egress traffic to the broker. - * - * @note All this function needs to do is call \ref __mqtt_recv and - * \ref __mqtt_send every so often. I've picked 100 ms meaning that + * + * @note All this function needs to do is call \ref __mqtt_recv and + * \ref __mqtt_send every so often. I've picked 100 ms meaning that * client ingress/egress traffic will be handled every 100 ms. */ void client_refresher(void* client); /** - * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. + * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. */ void exit_example(int status, BIO* sockfd); /** - * A simple program to that publishes the current time whenever ENTER is pressed. + * A simple program to that publishes the current time whenever ENTER is pressed. */ -int main(int argc, const char *argv[]) +int main(int argc, const char *argv[]) { const char* addr; const char* port; @@ -120,13 +120,13 @@ int main(int argc, const char *argv[]) fprintf(stderr, "\nerror: %s\n", mqtt_error_str(client.error)); exit_example(EXIT_FAILURE, sockfd); } - } + } /* disconnect */ printf("\n%s disconnecting from %s\n", argv[0], addr); Sleep(1000); - /* exit */ + /* exit */ exit_example(EXIT_SUCCESS, sockfd); } @@ -138,7 +138,7 @@ void exit_example(int status, BIO* sockfd) -void publish_callback(void** unused, struct mqtt_response_publish *published) +void publish_callback(void** unused, struct mqtt_response_publish *published) { /* not used in this example */ } diff --git a/examples/mbedtls_publisher.c b/examples/mbedtls_publisher.c index 00443f8..e9f0975 100644 --- a/examples/mbedtls_publisher.c +++ b/examples/mbedtls_publisher.c @@ -12,30 +12,30 @@ /** * @brief The function that would be called whenever a PUBLISH is received. - * - * @note This function is not used in this example. + * + * @note This function is not used in this example. */ void publish_callback(void** unused, struct mqtt_response_publish *published); /** - * @brief The client's refresher. This function triggers back-end routines to + * @brief The client's refresher. This function triggers back-end routines to * handle ingress/egress traffic to the broker. - * - * @note All this function needs to do is call \ref __mqtt_recv and - * \ref __mqtt_send every so often. I've picked 100 ms meaning that + * + * @note All this function needs to do is call \ref __mqtt_recv and + * \ref __mqtt_send every so often. I've picked 100 ms meaning that * client ingress/egress traffic will be handled every 100 ms. */ void* client_refresher(void* client); /** - * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. + * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. */ void exit_example(int status, mqtt_pal_socket_handle sockfd, pthread_t *client_daemon); /** - * A simple program to that publishes the current time whenever ENTER is pressed. + * A simple program to that publishes the current time whenever ENTER is pressed. */ -int main(int argc, const char *argv[]) +int main(int argc, const char *argv[]) { const char* addr; const char* port; @@ -127,13 +127,13 @@ int main(int argc, const char *argv[]) fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); exit_example(EXIT_FAILURE, sockfd, &client_daemon); } - } + } /* disconnect */ printf("\n%s disconnecting from %s\n", argv[0], addr); sleep(1); - /* exit */ + /* exit */ exit_example(EXIT_SUCCESS, sockfd, &client_daemon); } @@ -147,14 +147,14 @@ void exit_example(int status, mqtt_pal_socket_handle sockfd, pthread_t *client_d -void publish_callback(void** unused, struct mqtt_response_publish *published) +void publish_callback(void** unused, struct mqtt_response_publish *published) { /* not used in this example */ } void* client_refresher(void* client) { - while(1) + while(1) { mqtt_sync((struct mqtt_client*) client); usleep(100000U); diff --git a/examples/openssl_publisher.c b/examples/openssl_publisher.c index b7ff454..8051275 100644 --- a/examples/openssl_publisher.c +++ b/examples/openssl_publisher.c @@ -13,30 +13,30 @@ /** * @brief The function that would be called whenever a PUBLISH is received. - * - * @note This function is not used in this example. + * + * @note This function is not used in this example. */ void publish_callback(void** unused, struct mqtt_response_publish *published); /** - * @brief The client's refresher. This function triggers back-end routines to + * @brief The client's refresher. This function triggers back-end routines to * handle ingress/egress traffic to the broker. - * - * @note All this function needs to do is call \ref __mqtt_recv and - * \ref __mqtt_send every so often. I've picked 100 ms meaning that + * + * @note All this function needs to do is call \ref __mqtt_recv and + * \ref __mqtt_send every so often. I've picked 100 ms meaning that * client ingress/egress traffic will be handled every 100 ms. */ void* client_refresher(void* client); /** - * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. + * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. */ void exit_example(int status, BIO* sockfd, pthread_t *client_daemon); /** - * A simple program to that publishes the current time whenever ENTER is pressed. + * A simple program to that publishes the current time whenever ENTER is pressed. */ -int main(int argc, const char *argv[]) +int main(int argc, const char *argv[]) { const char* addr; const char* port; @@ -149,13 +149,13 @@ int main(int argc, const char *argv[]) fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); exit_example(EXIT_FAILURE, sockfd, &client_daemon); } - } + } /* disconnect */ printf("\n%s disconnecting from %s\n", argv[0], addr); sleep(1); - /* exit */ + /* exit */ exit_example(EXIT_SUCCESS, sockfd, &client_daemon); } @@ -168,14 +168,14 @@ void exit_example(int status, BIO* sockfd, pthread_t *client_daemon) -void publish_callback(void** unused, struct mqtt_response_publish *published) +void publish_callback(void** unused, struct mqtt_response_publish *published) { /* not used in this example */ } void* client_refresher(void* client) { - while(1) + while(1) { mqtt_sync((struct mqtt_client*) client); usleep(100000U); diff --git a/examples/openssl_publisher_win.c b/examples/openssl_publisher_win.c index 28b3a1f..26675df 100644 --- a/examples/openssl_publisher_win.c +++ b/examples/openssl_publisher_win.c @@ -12,30 +12,30 @@ /** * @brief The function that would be called whenever a PUBLISH is received. - * - * @note This function is not used in this example. + * + * @note This function is not used in this example. */ void publish_callback(void** unused, struct mqtt_response_publish *published); /** - * @brief The client's refresher. This function triggers back-end routines to + * @brief The client's refresher. This function triggers back-end routines to * handle ingress/egress traffic to the broker. - * - * @note All this function needs to do is call \ref __mqtt_recv and - * \ref __mqtt_send every so often. I've picked 100 ms meaning that + * + * @note All this function needs to do is call \ref __mqtt_recv and + * \ref __mqtt_send every so often. I've picked 100 ms meaning that * client ingress/egress traffic will be handled every 100 ms. */ void client_refresher(void* client); /** - * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. + * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. */ void exit_example(int status, BIO* sockfd); /** - * A simple program to that publishes the current time whenever ENTER is pressed. + * A simple program to that publishes the current time whenever ENTER is pressed. */ -int main(int argc, const char *argv[]) +int main(int argc, const char *argv[]) { const char* addr; const char* port; @@ -131,13 +131,13 @@ int main(int argc, const char *argv[]) fprintf(stderr, "error: %s\n", mqtt_error_str(client.error)); exit_example(EXIT_FAILURE, sockfd); } - } + } /* disconnect */ printf("\n%s disconnecting from %s\n", argv[0], addr); Sleep(1000); - /* exit */ + /* exit */ exit_example(EXIT_SUCCESS, sockfd); } @@ -149,14 +149,14 @@ void exit_example(int status, BIO* sockfd) -void publish_callback(void** unused, struct mqtt_response_publish *published) +void publish_callback(void** unused, struct mqtt_response_publish *published) { /* not used in this example */ } void client_refresher(void* client) { - while(1) + while(1) { mqtt_sync((struct mqtt_client*) client); Sleep(100); diff --git a/examples/templates/bearssl_sockets.h b/examples/templates/bearssl_sockets.h index e0c138c..0f21ab2 100644 --- a/examples/templates/bearssl_sockets.h +++ b/examples/templates/bearssl_sockets.h @@ -35,7 +35,7 @@ static int sock_read(void *ctx, unsigned char *buf, size_t len) { break; } } - + return (int)rlen; } @@ -44,7 +44,7 @@ static int sock_read(void *ctx, unsigned char *buf, size_t len) { */ static int sock_write(void *ctx, const unsigned char *buf, size_t len) { ssize_t wlen; - + for (;;) { wlen = write(*(int *)ctx, buf, len); diff --git a/examples/templates/bio_sockets.h b/examples/templates/bio_sockets.h index b5a146a..947174c 100644 --- a/examples/templates/bio_sockets.h +++ b/examples/templates/bio_sockets.h @@ -22,7 +22,7 @@ BIO* open_nb_socket(const char* addr, const char* port) { return NULL; } - return bio; + return bio; } #endif diff --git a/examples/templates/posix_sockets.h b/examples/templates/posix_sockets.h index 8de9990..ae41e86 100644 --- a/examples/templates/posix_sockets.h +++ b/examples/templates/posix_sockets.h @@ -48,7 +48,7 @@ int open_nb_socket(const char* addr, const char* port) { continue; } break; - } + } /* free servinfo */ freeaddrinfo(servinfo); @@ -63,12 +63,12 @@ int open_nb_socket(const char* addr, const char* port) { } #endif #if defined(__VMS) - /* + /* OpenVMS only partially implements fcntl. It works on file descriptors but silently fails on socket descriptors. So we need to fall back on to the older ioctl system to set non-blocking IO */ - int on = 1; + int on = 1; if (sockfd != -1) ioctl(sockfd, FIONBIO, &on); #endif diff --git a/include/mqtt.h b/include/mqtt.h index db9fa49..34c879d 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -46,53 +46,53 @@ extern "C" { /** * @file * @brief Declares all the MQTT-C functions and datastructures. - * + * * @note You should \#include . - * + * * @example simple_publisher.c - * A simple program to that publishes the current time whenever ENTER is pressed. - * + * A simple program to that publishes the current time whenever ENTER is pressed. + * * Usage: * \code{.sh} * ./bin/simple_publisher [address [port [topic]]] - * \endcode - * - * Where \c address is the address of the MQTT broker, \c port is the port number the + * \endcode + * + * Where \c address is the address of the MQTT broker, \c port is the port number the * MQTT broker is running on, and \c topic is the name of the topic to publish with. Note * that all these arguments are optional and the defaults are \c address = \c "test.mosquitto.org", * \c port = \c "1883", and \c topic = "datetime". - * + * * @example simple_subscriber.c * A simple program that subscribes to a single topic and prints all updates that are received. - * + * * Usage: * \code{.sh} * ./bin/simple_subscriber [address [port [topic]]] - * \endcode - * - * Where \c address is the address of the MQTT broker, \c port is the port number the + * \endcode + * + * Where \c address is the address of the MQTT broker, \c port is the port number the * MQTT broker is running on, and \c topic is the name of the topic subscribe to. Note * that all these arguments are optional and the defaults are \c address = \c "test.mosquitto.org", - * \c port = \c "1883", and \c topic = "datetime". - * + * \c port = \c "1883", and \c topic = "datetime". + * * @example reconnect_subscriber.c - * Same program as \ref simple_subscriber.c, but using the automatic reconnect functionality. - * + * Same program as \ref simple_subscriber.c, but using the automatic reconnect functionality. + * * @example bio_publisher.c * Same program as \ref simple_publisher.c, but uses a unencrypted BIO socket. * * @example openssl_publisher.c * Same program as \ref simple_publisher.c, but over an encrypted connection using OpenSSL. - * + * * Usage: * \code{.sh} * ./bin/openssl_publisher ca_file [address [port [topic]]] - * \endcode - * - * + * \endcode + * + * * @defgroup api API * @brief Documentation of everything you need to know to use the MQTT-C client. - * + * * This module contains everything you need to know to use MQTT-C in your application. * For usage examples see: * - @ref simple_publisher.c @@ -100,28 +100,28 @@ extern "C" { * - @ref reconnect_subscriber.c * - @ref bio_publisher.c * - @ref openssl_publisher.c - * - * @note MQTT-C can be used in both single-threaded and multi-threaded applications. All + * + * @note MQTT-C can be used in both single-threaded and multi-threaded applications. All * the functions in \ref api are thread-safe. - * + * * @defgroup packers Control Packet Serialization - * @brief Developer documentation of the functions and datastructures used for serializing MQTT + * @brief Developer documentation of the functions and datastructures used for serializing MQTT * control packets. - * + * * @defgroup unpackers Control Packet Deserialization - * @brief Developer documentation of the functions and datastructures used for deserializing MQTT + * @brief Developer documentation of the functions and datastructures used for deserializing MQTT * control packets. - * + * * @defgroup details Utilities * @brief Developer documentation for the utilities used to implement the MQTT-C client. * - * @note To deserialize a packet from a buffer use \ref mqtt_unpack_response (it's the only + * @note To deserialize a packet from a buffer use \ref mqtt_unpack_response (it's the only * function you need). */ /** - * @brief An enumeration of the MQTT control packet types. + * @brief An enumeration of the MQTT control packet types. * @ingroup unpackers * * @see @@ -148,7 +148,7 @@ extern "C" { /** * @brief The fixed header of an MQTT control packet. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: Fixed Header * @@ -167,15 +167,15 @@ struct mqtt_fixed_header { /** * @brief The protocol identifier for MQTT v3.1.1. * @ingroup packers - * + * * @see * MQTT v3.1.1: CONNECT Variable Header. - * + * */ #define MQTT_PROTOCOL_LEVEL 0x04 -/** - * @brief A macro used to declare the enum MQTTErrors and associated +/** + * @brief A macro used to declare the enum MQTTErrors and associated * error messages (the members of the num) at the same time. */ #define __ALL_MQTT_ERRORS(MQTT_ERROR) \ @@ -210,25 +210,25 @@ struct mqtt_fixed_header { /* todo: add more connection refused errors */ -/** - * @brief A macro used to generate the enum MQTTErrors from +/** + * @brief A macro used to generate the enum MQTTErrors from * \ref __ALL_MQTT_ERRORS * @see __ALL_MQTT_ERRORS */ #define GENERATE_ENUM(ENUM) ENUM, -/** - * @brief A macro used to generate the error messages associated with +/** + * @brief A macro used to generate the error messages associated with * MQTTErrors from \ref __ALL_MQTT_ERRORS * @see __ALL_MQTT_ERRORS */ #define GENERATE_STRING(STRING) #STRING, -/** +/** * @brief An enumeration of error codes. Error messages can be retrieved by calling \ref mqtt_error_str. * @ingroup api - * + * * @see mqtt_error_str */ enum MQTTErrors { @@ -237,47 +237,47 @@ enum MQTTErrors { MQTT_OK = 1 }; -/** +/** * @brief Returns an error message for error code, \p error. * @ingroup api - * + * * @param[in] error the error code. - * + * * @returns The associated error message. */ const char* mqtt_error_str(enum MQTTErrors error); /** * @brief Pack a MQTT 16 bit integer, given a native 16 bit integer . - * + * * @param[out] buf the buffer that the MQTT integer will be written to. * @param[in] integer the native integer to be written to \p buf. - * + * * @warning This function provides no error checking. - * + * * @returns 2 */ ssize_t __mqtt_pack_uint16(uint8_t *buf, uint16_t integer); /** * @brief Unpack a MQTT 16 bit integer to a native 16 bit integer. - * + * * @param[in] buf the buffer that the MQTT integer will be read from. - * + * * @warning This function provides no error checking and does not modify \p buf. - * + * * @returns The native integer */ uint16_t __mqtt_unpack_uint16(const uint8_t *buf); /** * @brief Pack a MQTT string, given a c-string \p str. - * + * * @param[out] buf the buffer that the MQTT string will be written to. * @param[in] str the c-string to be written to \p buf. - * + * * @warning This function provides no error checking. - * + * * @returns strlen(str) + 2 */ ssize_t __mqtt_pack_str(uint8_t *buf, const char* str); @@ -290,10 +290,10 @@ ssize_t __mqtt_pack_str(uint8_t *buf, const char* str); /** * @brief An enumeration of the return codes returned in a CONNACK packet. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: CONNACK return codes. - * + * */ enum MQTTConnackReturnCode { MQTT_CONNACK_ACCEPTED = 0u, @@ -307,21 +307,21 @@ enum MQTTConnackReturnCode { /** * @brief A connection response datastructure. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: CONNACK - Acknowledgement connection response. * */ struct mqtt_response_connack { - /** + /** * @brief Allows client and broker to check if they have a consistent view about whether there is * already a stored session state. */ uint8_t session_present_flag; - /** - * @brief The return code of the connection request. - * + /** + * @brief The return code of the connection request. + * * @see MQTTConnackReturnCode */ enum MQTTConnackReturnCode return_code; @@ -330,24 +330,24 @@ struct mqtt_response_connack { /** * @brief A publish packet received from the broker. * @ingroup unpackers - * - * A publish packet is received from the broker when a client publishes to a topic that the + * + * A publish packet is received from the broker when a client publishes to a topic that the * \em {local client} is subscribed to. * - * @see + * @see * MQTT v3.1.1: PUBLISH - Publish Message. - * + * */ struct mqtt_response_publish { - /** + /** * @brief The DUP flag. DUP flag is 0 if its the first attempt to send this publish packet. A DUP flag * of 1 means that this might be a re-delivery of the packet. */ uint8_t dup_flag; - /** + /** * @brief The quality of service level. - * + * * @see * MQTT v3.1.1: QoS Definitions * @@ -360,9 +360,9 @@ struct mqtt_response_publish { /** @brief Size of the topic name (number of characters). */ uint16_t topic_name_size; - /** - * @brief The topic name. - * @note topic_name is not null terminated. Therefore topic_name_size must be used to get the + /** + * @brief The topic name. + * @note topic_name is not null terminated. Therefore topic_name_size must be used to get the * string length. */ const void* topic_name; @@ -380,10 +380,10 @@ struct mqtt_response_publish { /** * @brief A publish acknowledgement for messages that were published with QoS level 1. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: PUBACK - Publish Acknowledgement. - * + * * */ struct mqtt_response_puback { @@ -394,10 +394,10 @@ struct mqtt_response_puback { /** * @brief The response packet to a PUBLISH packet with QoS level 2. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: PUBREC - Publish Received. - * + * * */ struct mqtt_response_pubrec { @@ -408,10 +408,10 @@ struct mqtt_response_pubrec { /** * @brief The response to a PUBREC packet. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: PUBREL - Publish Release. - * + * * */ struct mqtt_response_pubrel { @@ -422,10 +422,10 @@ struct mqtt_response_pubrel { /** * @brief The response to a PUBREL packet. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: PUBCOMP - Publish Complete. - * + * * */ struct mqtt_response_pubcomp { @@ -436,10 +436,10 @@ struct mqtt_response_pubcomp { /** * @brief An enumeration of subscription acknowledgement return codes. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: SUBACK Return Codes. - * + * */ enum MQTTSubackReturnCodes { MQTT_SUBACK_SUCCESS_MAX_QOS_0 = 0u, @@ -451,18 +451,18 @@ enum MQTTSubackReturnCodes { /** * @brief The response to a subscription request. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: SUBACK - Subscription Acknowledgement. - * + * */ struct mqtt_response_suback { /** @brief The published messages packet ID. */ uint16_t packet_id; - /** + /** * Array of return codes corresponding to the requested subscribe topics. - * + * * @see MQTTSubackReturnCodes */ const uint8_t *return_codes; @@ -474,10 +474,10 @@ struct mqtt_response_suback { /** * @brief The brokers response to a UNSUBSCRIBE request. * @ingroup unpackers - * + * * @see * MQTT v3.1.1: UNSUBACK - Unsubscribe Acknowledgement. - * + * */ struct mqtt_response_unsuback { /** @brief The published messages packet ID. */ @@ -487,12 +487,12 @@ struct mqtt_response_unsuback { /** * @brief The response to a ping request. * @ingroup unpackers - * + * * @note This response contains no members. - * + * * @see * MQTT v3.1.1: PINGRESP - Ping Response. - * + * */ struct mqtt_response_pingresp { int dummy; @@ -508,10 +508,10 @@ struct mqtt_response { /** * @brief A union of the possible responses from the broker. - * + * * @note The fixed_header contains the control type. This control type corresponds to the - * member of this union that should be accessed. For example if - * fixed_header#control_type == \c MQTT_CONTROL_PUBLISH then + * member of this union that should be accessed. For example if + * fixed_header#control_type == \c MQTT_CONTROL_PUBLISH then * decoded#publish should be accessed. */ union { @@ -530,15 +530,15 @@ struct mqtt_response { /** * @brief Deserialize the contents of \p buf into an mqtt_fixed_header object. * @ingroup unpackers - * + * * @note This function performs complete error checking and a positive return value * means the entire mqtt_response can be deserialized from \p buf. - * + * * @param[out] response the response who's \ref mqtt_response.fixed_header will be initialized. * @param[in] buf the buffer. * @param[in] bufsz the total number of bytes in the buffer. - * - * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough + * + * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough * bytes to parse the packet, or a negative value if there was a protocol violation. */ ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t *buf, size_t bufsz); @@ -546,17 +546,17 @@ ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t * /** * @brief Deserialize a CONNACK response from \p buf. * @ingroup unpackers - * + * * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the control packet type * must be \c MQTT_CONTROL_CONNACK. - * + * * @param[out] mqtt_response the mqtt_response that will be initialized. - * @param[in] buf the buffer that contains the variable header and payload of the packet. The + * @param[in] buf the buffer that contains the variable header and payload of the packet. The * first byte of \p buf should be the first byte of the variable header. - * - * @relates mqtt_response_connack - * - * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough + * + * @relates mqtt_response_connack + * + * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough * bytes to parse the packet, or a negative value if there was a protocol violation. */ ssize_t mqtt_unpack_connack_response (struct mqtt_response *mqtt_response, const uint8_t *buf); @@ -564,16 +564,16 @@ ssize_t mqtt_unpack_connack_response (struct mqtt_response *mqtt_response, const /** * @brief Deserialize a publish response from \p buf. * @ingroup unpackers - * + * * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must * have a control type of \c MQTT_CONTROL_PUBLISH. - * + * * @param[out] mqtt_response the response that is initialized from the contents of \p buf. * @param[in] buf the buffer with the incoming data. - * - * @relates mqtt_response_publish - * - * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough + * + * @relates mqtt_response_publish + * + * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough * bytes to parse the packet, or a negative value if there was a protocol violation. */ ssize_t mqtt_unpack_publish_response (struct mqtt_response *mqtt_response, const uint8_t *buf); @@ -581,17 +581,17 @@ ssize_t mqtt_unpack_publish_response (struct mqtt_response *mqtt_response, const /** * @brief Deserialize a PUBACK/PUBREC/PUBREL/PUBCOMP packet from \p buf. * @ingroup unpackers - * + * * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must * have a control type of \c MQTT_CONTROL_PUBACK, \c MQTT_CONTROL_PUBREC, \c MQTT_CONTROL_PUBREL * or \c MQTT_CONTROL_PUBCOMP. - * + * * @param[out] mqtt_response the response that is initialized from the contents of \p buf. * @param[in] buf the buffer with the incoming data. * * @relates mqtt_response_puback mqtt_response_pubrec mqtt_response_pubrel mqtt_response_pubcomp - * - * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough + * + * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough * bytes to parse the packet, or a negative value if there was a protocol violation. */ ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf); @@ -599,16 +599,16 @@ ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const u /** * @brief Deserialize a SUBACK packet from \p buf. * @ingroup unpacker - * + * * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must * have a control type of \c MQTT_CONTROL_SUBACK. - * + * * @param[out] mqtt_response the response that is initialized from the contents of \p buf. * @param[in] buf the buffer with the incoming data. * * @relates mqtt_response_suback - * - * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough + * + * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough * bytes to parse the packet, or a negative value if there was a protocol violation. */ ssize_t mqtt_unpack_suback_response(struct mqtt_response *mqtt_response, const uint8_t *buf); @@ -616,32 +616,32 @@ ssize_t mqtt_unpack_suback_response(struct mqtt_response *mqtt_response, const u /** * @brief Deserialize an UNSUBACK packet from \p buf. * @ingroup unpacker - * + * * @pre \ref mqtt_unpack_fixed_header must have returned a positive value and the mqtt_response must * have a control type of \c MQTT_CONTROL_UNSUBACK. - * + * * @param[out] mqtt_response the response that is initialized from the contents of \p buf. * @param[in] buf the buffer with the incoming data. * * @relates mqtt_response_unsuback - * - * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough + * + * @returns The number of bytes that were consumed, or 0 if the buffer does not contain enough * bytes to parse the packet, or a negative value if there was a protocol violation. - */ + */ ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf); /** * @brief Deserialize a packet from the broker. * @ingroup unpackers - * + * * @param[out] response the mqtt_response that will be initialize from \p buf. * @param[in] buf the incoming data buffer. * @param[in] bufsz the number of bytes available in the buffer. - * + * * @relates mqtt_response - * + * * @returns The number of bytes consumed on success, zero \p buf does not contain enough bytes - * to deserialize the packet, a negative value if a protocol violation was encountered. + * to deserialize the packet, a negative value if a protocol violation was encountered. */ ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, size_t bufsz); @@ -650,15 +650,15 @@ ssize_t mqtt_unpack_response(struct mqtt_response* response, const uint8_t *buf, /** * @brief Serialize an mqtt_fixed_header and write it to \p buf. * @ingroup packers - * + * * @note This function performs complete error checking and a positive return value * guarantees the entire packet will fit into the given buffer. - * + * * @param[out] buf the buffer to write to. * @param[in] bufsz the maximum number of bytes that can be put in to \p buf. * @param[in] fixed_header the fixed header that will be serialized. - * - * @returns The number of bytes written to \p buf, or 0 if \p buf is too small, or a + * + * @returns The number of bytes written to \p buf, or 0 if \p buf is too small, or a * negative value if there was a protocol violation. */ ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fixed_header *fixed_header); @@ -666,10 +666,10 @@ ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fix /** * @brief An enumeration of CONNECT packet flags. * @ingroup packers - * + * * @see * MQTT v3.1.1: CONNECT Variable Header. - * + * */ enum MQTTConnectFlags { MQTT_CONNECT_RESERVED = 1u, @@ -684,9 +684,9 @@ enum MQTTConnectFlags { }; /** - * @brief Serialize a connection request into a buffer. + * @brief Serialize a connection request into a buffer. * @ingroup packers - * + * * @param[out] buf the buffer to pack the connection request packet into. * @param[in] bufsz the number of bytes left in \p buf. * @param[in] client_id the ID that identifies the local client. \p client_id can be NULL or an empty @@ -695,33 +695,33 @@ enum MQTTConnectFlags { * Set to \c NULL for no will message. If \p will_topic is not \c NULL a * \p will_message must also be provided. * @param[in] will_message the will message to be published upon a unsuccessful disconnection of - * the local client. Set to \c NULL if \p will_topic is \c NULL. - * \p will_message must \em not be \c NULL if \p will_topic is not + * the local client. Set to \c NULL if \p will_topic is \c NULL. + * \p will_message must \em not be \c NULL if \p will_topic is not * \c NULL. * @param[in] will_message_size The size of \p will_message in bytes. - * @param[in] user_name the username to be used to connect to the broker with. Set to \c NULL if + * @param[in] user_name the username to be used to connect to the broker with. Set to \c NULL if * no username is required. * @param[in] password the password to be used to connect to the broker with. Set to \c NULL if * no password is required. * @param[in] connect_flags additional MQTTConnectFlags to be set. The only flags that need to be * set manually are \c MQTT_CONNECT_CLEAN_SESSION, - * \c MQTT_CONNECT_WILL_QOS_X (for \c X ∈ {0, 1, 2}), and - * \c MQTT_CONNECT_WILL_RETAIN. Set to 0 if no additional flags are + * \c MQTT_CONNECT_WILL_QOS_X (for \c X ∈ {0, 1, 2}), and + * \c MQTT_CONNECT_WILL_RETAIN. Set to 0 if no additional flags are * required. - * @param[in] keep_alive the keep alive time in seconds. It is the responsibility of the clinet + * @param[in] keep_alive the keep alive time in seconds. It is the responsibility of the clinet * to ensure packets are sent to the server \em {at least} this frequently. - * - * @note If there is a \p will_topic and no additional \p connect_flags are given, then by + * + * @note If there is a \p will_topic and no additional \p connect_flags are given, then by * default \p will_message will be published at QoS level 0. - * + * * @see * MQTT v3.1.1: CONNECT - Client Requests a Connection to a Server. * - * - * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the CONNECT + * + * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the CONNECT * packet, a negative value if there was a protocol violation. */ -ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, +ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, const char* client_id, const char* will_topic, const void* will_message, @@ -734,7 +734,7 @@ ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, /** * @brief An enumeration of the PUBLISH flags. * @ingroup packers - * + * * @see * MQTT v3.1.1: PUBLISH - Publish Message. * @@ -751,7 +751,7 @@ enum MQTTPublishFlags { /** * @brief Serialize a PUBLISH request and put it in \p buf. * @ingroup packers - * + * * @param[out] buf the buffer to put the PUBLISH packet in. * @param[in] bufsz the maximum number of bytes that can be put into \p buf. * @param[in] topic_name the topic to publish \p application_message under. @@ -759,16 +759,16 @@ enum MQTTPublishFlags { * @param[in] application_message the application message to be published. * @param[in] application_message_size the size of \p application_message in bytes. * @param[in] publish_flags The flags to publish \p application_message with. These include - * the \c MQTT_PUBLISH_DUP flag, \c MQTT_PUBLISH_QOS_X (\c X ∈ + * the \c MQTT_PUBLISH_DUP flag, \c MQTT_PUBLISH_QOS_X (\c X ∈ * {0, 1, 2}), and \c MQTT_PUBLISH_RETAIN flag. - * + * * @note The default QoS is level 0. - * + * * @see * MQTT v3.1.1: PUBLISH - Publish Message. * - * - * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBLISH + * + * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBLISH * packet, a negative value if there was a protocol violation. */ ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz, @@ -781,15 +781,15 @@ ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz, /** * @brief Serialize a PUBACK, PUBREC, PUBREL, or PUBCOMP packet and put it in \p buf. * @ingroup packers - * + * * @param[out] buf the buffer to put the PUBXXX packet in. * @param[in] bufsz the maximum number of bytes that can be put into \p buf. - * @param[in] control_type the type of packet. Must be one of: \c MQTT_CONTROL_PUBACK, - * \c MQTT_CONTROL_PUBREC, \c MQTT_CONTROL_PUBREL, + * @param[in] control_type the type of packet. Must be one of: \c MQTT_CONTROL_PUBACK, + * \c MQTT_CONTROL_PUBREC, \c MQTT_CONTROL_PUBREL, * or \c MQTT_CONTROL_PUBCOMP. * @param[in] packet_id the packet ID of the packet being acknowledged. - * - * + * + * * @see * MQTT v3.1.1: PUBACK - Publish Acknowledgement. * @@ -802,94 +802,94 @@ ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz, * @see * MQTT v3.1.1: PUBCOMP - Publish Complete. * - * - * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBXXX + * + * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PUBXXX * packet, a negative value if there was a protocol violation. */ -ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz, +ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz, enum MQTTControlPacketType control_type, uint16_t packet_id); -/** - * @brief The maximum number topics that can be subscribed to in a single call to +/** + * @brief The maximum number topics that can be subscribed to in a single call to * mqtt_pack_subscribe_request. * @ingroup packers - * + * * @see mqtt_pack_subscribe_request */ #define MQTT_SUBSCRIBE_REQUEST_MAX_NUM_TOPICS 8 -/** +/** * @brief Serialize a SUBSCRIBE packet and put it in \p buf. * @ingroup packers - * + * * @param[out] buf the buffer to put the SUBSCRIBE packet in. * @param[in] bufsz the maximum number of bytes that can be put into \p buf. * @param[in] packet_id the packet ID to be used. * @param[in] ... \c NULL terminated list of (\c {const char *topic_name}, \c {int max_qos_level}) * pairs. - * + * * @note The variadic arguments, \p ..., \em must be followed by a \c NULL. For example: * @code * ssize_t n = mqtt_pack_subscribe_request(buf, bufsz, 1234, "topic_1", 0, "topic_2", 2, NULL); * @endcode - * + * * @see * MQTT v3.1.1: SUBSCRIBE - Subscribe to Topics. * - * - * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the SUBSCRIBE + * + * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the SUBSCRIBE * packet, a negative value if there was a protocol violation. */ -ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, - unsigned int packet_id, +ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, + unsigned int packet_id, ...); /* null terminated */ -/** - * @brief The maximum number topics that can be subscribed to in a single call to +/** + * @brief The maximum number topics that can be subscribed to in a single call to * mqtt_pack_unsubscribe_request. * @ingroup packers - * + * * @see mqtt_pack_unsubscribe_request */ #define MQTT_UNSUBSCRIBE_REQUEST_MAX_NUM_TOPICS 8 -/** +/** * @brief Serialize a UNSUBSCRIBE packet and put it in \p buf. * @ingroup packers - * + * * @param[out] buf the buffer to put the UNSUBSCRIBE packet in. * @param[in] bufsz the maximum number of bytes that can be put into \p buf. * @param[in] packet_id the packet ID to be used. * @param[in] ... \c NULL terminated list of \c {const char *topic_name}'s to unsubscribe from. - * + * * @note The variadic arguments, \p ..., \em must be followed by a \c NULL. For example: * @code * ssize_t n = mqtt_pack_unsubscribe_request(buf, bufsz, 4321, "topic_1", "topic_2", NULL); * @endcode - * + * * @see * MQTT v3.1.1: UNSUBSCRIBE - Unsubscribe from Topics. * - * - * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the UNSUBSCRIBE + * + * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the UNSUBSCRIBE * packet, a negative value if there was a protocol violation. */ -ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, - unsigned int packet_id, +ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, + unsigned int packet_id, ...); /* null terminated */ /** * @brief Serialize a PINGREQ and put it into \p buf. * @ingroup packers - * + * * @param[out] buf the buffer to put the PINGREQ packet in. * @param[in] bufsz the maximum number of bytes that can be put into \p buf. - * + * * @see * MQTT v3.1.1: PINGREQ - Ping Request. * - * + * * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the PINGREQ * packet, a negative value if there was a protocol violation. */ @@ -898,22 +898,22 @@ ssize_t mqtt_pack_ping_request(uint8_t *buf, size_t bufsz); /** * @brief Serialize a DISCONNECT and put it into \p buf. * @ingroup packers - * + * * @param[out] buf the buffer to put the DISCONNECT packet in. * @param[in] bufsz the maximum number of bytes that can be put into \p buf. - * + * * @see * MQTT v3.1.1: DISCONNECT - Disconnect Notification. * - * - * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the DISCONNECT + * + * @returns The number of bytes put into \p buf, 0 if \p buf is too small to fit the DISCONNECT * packet, a negative value if there was a protocol violation. */ ssize_t mqtt_pack_disconnect(uint8_t *buf, size_t bufsz); /** - * @brief An enumeration of queued message states. + * @brief An enumeration of queued message states. * @ingroup details */ enum MQTTQueuedMessageState { @@ -937,9 +937,9 @@ struct mqtt_queued_message { /** @brief The state of the message. */ enum MQTTQueuedMessageState state; - /** + /** * @brief The time at which the message was sent.. - * + * * @note A timeout will only occur if the message is in * the MQTT_QUEUED_AWAITING_ACK \c state. */ @@ -950,10 +950,10 @@ struct mqtt_queued_message { */ enum MQTTControlPacketType control_type; - /** + /** * @brief The packet id of the message. - * - * @note This field is only used if the associate \c control_type has a + * + * @note This field is only used if the associate \c control_type has a * \c packet_id field. */ uint16_t packet_id; @@ -962,14 +962,14 @@ struct mqtt_queued_message { /** * @brief A message queue. * @ingroup details - * + * * @note This struct is used internally to manage sending messages. - * @note The only members the user should use are \c curr and \c curr_sz. + * @note The only members the user should use are \c curr and \c curr_sz. */ struct mqtt_message_queue { - /** - * @brief The start of the message queue's memory block. - * + /** + * @brief The start of the message queue's memory block. + * * @warning This member should \em not be manually changed. */ void *mem_start; @@ -979,7 +979,7 @@ struct mqtt_message_queue { /** * @brief A pointer to the position in the buffer you can pack bytes at. - * + * * @note Immediately after packing bytes at \c curr you \em must call * mqtt_mq_register. */ @@ -987,17 +987,17 @@ struct mqtt_message_queue { /** * @brief The number of bytes that can be written to \c curr. - * - * @note curr_sz will decrease by more than the number of bytes you write to - * \c curr. This is because the mqtt_queued_message structs share the - * same memory (and thus, a mqtt_queued_message must be allocated in - * the message queue's memory whenever a new message is registered). + * + * @note curr_sz will decrease by more than the number of bytes you write to + * \c curr. This is because the mqtt_queued_message structs share the + * same memory (and thus, a mqtt_queued_message must be allocated in + * the message queue's memory whenever a new message is registered). */ size_t curr_sz; - + /** * @brief The tail of the array of mqtt_queued_messages's. - * + * * @note This member should not be used manually. */ struct mqtt_queued_message *queue_tail; @@ -1006,11 +1006,11 @@ struct mqtt_message_queue { /** * @brief Initialize a message queue. * @ingroup details - * + * * @param[out] mq The message queue to initialize. * @param[in] buf The buffer for this message queue. - * @param[in] bufsz The number of bytes in the buffer. - * + * @param[in] bufsz The number of bytes in the buffer. + * * @relates mqtt_message_queue */ void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz); @@ -1018,11 +1018,11 @@ void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz); /** * @brief Clear as many messages from the front of the queue as possible. * @ingroup details - * + * * @note Calls to this function are the \em only way to remove messages from the queue. - * + * * @param mq The message queue. - * + * * @relates mqtt_message_queue */ void mqtt_mq_clean(struct mqtt_message_queue *mq); @@ -1030,17 +1030,17 @@ void mqtt_mq_clean(struct mqtt_message_queue *mq); /** * @brief Register a message that was just added to the buffer. * @ingroup details - * + * * @note This function should be called immediately following a call to a packer function * that returned a positive value. The positive value (number of bytes packed) should * be passed to this function. - * + * * @param mq The message queue. * @param[in] nbytes The number of bytes that were just packed. - * + * * @note This function will step mqtt_message_queue::curr and update mqtt_message_queue::curr_sz. * @relates mqtt_message_queue - * + * * @returns The newly added struct mqtt_queued_message. */ struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size_t nbytes); @@ -1048,12 +1048,12 @@ struct mqtt_queued_message* mqtt_mq_register(struct mqtt_message_queue *mq, size /** * @brief Find a message in the message queue. * @ingroup details - * + * * @param mq The message queue. * @param[in] control_type The control type of the message you want to find. - * @param[in] packet_id The packet ID of the message you want to find. Set to \c NULL if you + * @param[in] packet_id The packet ID of the message you want to find. Set to \c NULL if you * don't want to specify a packet ID. - * + * * @relates mqtt_message_queue * @returns The found message. \c NULL if the message was not found. */ @@ -1062,9 +1062,9 @@ struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQT /** * @brief Returns the mqtt_queued_message at \p index. * @ingroup details - * + * * @param mq_ptr A pointer to the message queue. - * @param index The index of the message. + * @param index The index of the message. * * @returns The mqtt_queued_message at \p index. */ @@ -1085,9 +1085,9 @@ struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQT /* CLIENT */ /** - * @brief An MQTT client. + * @brief An MQTT client. * @ingroup details - * + * * @note All members can be manipulated via the related functions. */ struct mqtt_client { @@ -1100,8 +1100,8 @@ struct mqtt_client { /** @brief The keep-alive time in seconds. */ uint16_t keep_alive; - /** - * @brief A counter counting pings that have been sent to keep the connection alive. + /** + * @brief A counter counting pings that have been sent to keep the connection alive. * @see keep_alive */ int number_of_keep_alives; @@ -1113,31 +1113,31 @@ struct mqtt_client { */ size_t send_offset; - /** + /** * @brief The timestamp of the last message sent to the buffer. - * + * * This is used to detect the need for keep-alive pings. - * + * * @see keep_alive */ mqtt_pal_time_t time_of_last_send; - /** - * @brief The error state of the client. - * + /** + * @brief The error state of the client. + * * error should be MQTT_OK for the entirety of the connection. - * + * * @note The error state will be MQTT_ERROR_CONNECT_NOT_CALLED until * you call mqtt_connect. */ enum MQTTErrors error; - /** + /** * @brief The timeout period in seconds. - * + * * If the broker doesn't return an ACK within response_timeout seconds a timeout - * will occur and the message will be retransmitted. - * + * will occur and the message will be retransmitted. + * * @note The default value is 30 [seconds] but you can change it at any time. */ int response_timeout; @@ -1146,30 +1146,30 @@ struct mqtt_client { int number_of_timeouts; /** - * @brief Approximately much time it has typically taken to receive responses from the + * @brief Approximately much time it has typically taken to receive responses from the * broker. - * + * * @note This is tracked using a exponential-averaging. */ double typical_response_time; /** * @brief The callback that is called whenever a publish is received from the broker. - * - * Any topics that you have subscribed to will be returned from the broker as - * mqtt_response_publish messages. All the publishes received from the broker will + * + * Any topics that you have subscribed to will be returned from the broker as + * mqtt_response_publish messages. All the publishes received from the broker will * be passed to this function. - * + * * @note A pointer to publish_response_callback_state is always passed to the callback. - * Use publish_response_callback_state to keep track of any state information you + * Use publish_response_callback_state to keep track of any state information you * need. */ void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish); /** * @brief A pointer to any publish_response_callback state information you need. - * - * @note A pointer to this pointer will always be publish_response_callback upon + * + * @note A pointer to this pointer will always be publish_response_callback upon * receiving a publish message from the broker. */ void* publish_response_callback_state; @@ -1178,30 +1178,30 @@ struct mqtt_client { * @brief A user-specified callback, triggered on each \ref mqtt_sync, allowing * the user to perform state inspections (and custom socket error detection) * on the client. - * + * * This callback is triggered on each call to \ref mqtt_sync. If it returns MQTT_OK * then \ref mqtt_sync will continue normally (performing reads and writes). If it * returns an error then \ref mqtt_sync will not call reads and writes. - * + * * This callback can be used to perform custom error detection, namely platform * specific socket error detection, and force the client into an error state. - * - * This member is always initialized to NULL but it can be manually set at any + * + * This member is always initialized to NULL but it can be manually set at any * time. */ enum MQTTErrors (*inspector_callback)(struct mqtt_client*); /** * @brief A callback that is called whenever the client is in an error state. - * + * * This callback is responsible for: application level error handling, closing - * previous sockets, and reestabilishing the connection to the broker and - * session configurations (i.e. subscriptions). + * previous sockets, and reestabilishing the connection to the broker and + * session configurations (i.e. subscriptions). */ void (*reconnect_callback)(struct mqtt_client*, void**); /** - * @brief A pointer to some state. A pointer to this member is passed to + * @brief A pointer to some state. A pointer to this member is passed to * \ref mqtt_client.reconnect_callback. */ void* reconnect_state; @@ -1223,9 +1223,9 @@ struct mqtt_client { size_t curr_sz; } recv_buffer; - /** + /** * @brief A variable passed to support thread-safety. - * + * * A pointer to this variable is passed to \c MQTT_PAL_MUTEX_LOCK, and * \c MQTT_PAL_MUTEX_UNLOCK. */ @@ -1238,11 +1238,11 @@ struct mqtt_client { /** * @brief Generate a new next packet ID. * @ingroup details - * + * * Packet ID's are generated using a max-length LFSR. - * + * * @param client The MQTT client. - * + * * @returns The new packet ID that should be used. */ uint16_t __mqtt_next_pid(struct mqtt_client *client); @@ -1250,80 +1250,80 @@ uint16_t __mqtt_next_pid(struct mqtt_client *client); /** * @brief Handles egress client traffic. * @ingroup details - * + * * @param client The MQTT client. - * - * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise. + * + * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise. */ ssize_t __mqtt_send(struct mqtt_client *client); /** * @brief Handles ingress client traffic. * @ingroup details - * + * * @param client The MQTT client. - * - * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise. + * + * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise. */ ssize_t __mqtt_recv(struct mqtt_client *client); /** - * @brief Function that does the actual sending and receiving of + * @brief Function that does the actual sending and receiving of * traffic from the network. * @ingroup api - * + * * All the other functions in the @ref api simply stage messages for * being sent to the broker. This function does the actual sending of - * those messages. Additionally this function receives traffic (responses and + * those messages. Additionally this function receives traffic (responses and * acknowledgements) from the broker and responds to that traffic accordingly. * Lastly this function also calls the \c publish_response_callback when * any \c MQTT_CONTROL_PUBLISH messages are received. - * + * * @pre mqtt_init must have been called. - * + * * @param[in,out] client The MQTT client. - * - * @attention It is the responsibility of the application programmer to + * + * @attention It is the responsibility of the application programmer to * call this function periodically. All functions in the @ref api are * thread-safe so it is perfectly reasonable to have a thread dedicated * to calling this function every 200 ms or so. MQTT-C can be used in single - * threaded application though by simply calling this functino periodically + * threaded application though by simply calling this functino periodically * inside your main thread. See @ref simple_publisher.c and @ref simple_subscriber.c * for examples (specifically the \c client_refresher functions). - * - * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise. + * + * @returns MQTT_OK upon success, an \ref MQTTErrors otherwise. */ enum MQTTErrors mqtt_sync(struct mqtt_client *client); /** * @brief Initializes an MQTT client. * @ingroup api - * + * * This function \em must be called before any other API function calls. - * + * * @pre None. - * + * * @param[out] client The MQTT client. - * @param[in] sockfd The socket file descriptor (or equivalent socket handle, e.g. BIO pointer + * @param[in] sockfd The socket file descriptor (or equivalent socket handle, e.g. BIO pointer * for OpenSSL sockets) connected to the MQTT broker. * @param[in] sendbuf A buffer that will be used for sending messages to the broker. * @param[in] sendbufsz The size of \p sendbuf in bytes. * @param[in] recvbuf A buffer that will be used for receiving messages from the broker. * @param[in] recvbufsz The size of \p recvbuf in bytes. * @param[in] publish_response_callback The callback to call whenever application messages - * are received from the broker. - * + * are received from the broker. + * * @post mqtt_connect must be called. - * + * * @note \p sockfd is a non-blocking TCP connection. * @note If \p sendbuf fills up completely during runtime a \c MQTT_ERROR_SEND_BUFFER_IS_FULL * error will be set. Similarly if \p recvbuf is ever to small to receive a message from * the broker an MQTT_ERROR_RECV_BUFFER_TOO_SMALL error will be set. - * @note A pointer to \ref mqtt_client.publish_response_callback_state is always passed as the - * \c state argument to \p publish_response_callback. Note that the second argument is + * @note A pointer to \ref mqtt_client.publish_response_callback_state is always passed as the + * \c state argument to \p publish_response_callback. Note that the second argument is * the mqtt_response_publish that was received from the broker. - * - * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or + * + * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or * \ref mqtt_init_reconnect more than once per client). * @attention \p sendbuf internally mapped to client's message-to-send queue that actively uses * pointer access. In the case of unaligned \p sendbuf, that may lead to @@ -1341,7 +1341,7 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client); * // ... * } * \endcode - * + * * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ enum MQTTErrors mqtt_init(struct mqtt_client *client, @@ -1353,44 +1353,44 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, /** * @brief Initializes an MQTT client and enables automatic reconnections. * @ingroup api - * - * An alternative to \ref mqtt_init that allows the client to automatically reconnect to the + * + * An alternative to \ref mqtt_init that allows the client to automatically reconnect to the * broker after an error occurs (e.g. socket error or internal buffer overflows). - * - * This is accomplished by calling the \p reconnect_callback whenever the client enters an error - * state. The job of the \p reconnect_callback is to: (1) perform error handling/logging, - * (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the - * client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other + * + * This is accomplished by calling the \p reconnect_callback whenever the client enters an error + * state. The job of the \p reconnect_callback is to: (1) perform error handling/logging, + * (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the + * client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other * API calls such as \ref mqtt_subscribe. - * - * The first argument to the \p reconnect_callback is the client (which will be in an error + * + * The first argument to the \p reconnect_callback is the client (which will be in an error * state) and the second argument is a pointer to a void pointer where you can store some state - * information. Internally, MQTT-C calls the reconnect callback like so: - * - * \code + * information. Internally, MQTT-C calls the reconnect callback like so: + * + * \code * client->reconnect_callback(client, &client->reconnect_state) * \endcode - * + * * Note that the \p reconnect_callback is also called to setup the initial session. After - * calling \ref mqtt_init_reconnect the client will be in the error state + * calling \ref mqtt_init_reconnect the client will be in the error state * \c MQTT_ERROR_INITIAL_RECONNECT. - * + * * @pre None. - * + * * @param[in,out] client The MQTT client that will be initialized. - * @param[in] reconnect_callback The callback that will be called to connect/reconnect the - * client to the broker and perform application level error handling. + * @param[in] reconnect_callback The callback that will be called to connect/reconnect the + * client to the broker and perform application level error handling. * @param[in] reconnect_state A pointer to some state data for your \p reconnect_callback. * If your \p reconnect_callback does not require any state information set this * to NULL. A pointer to the memory address where the client stores a copy of this - * pointer is passed as the second argumnet to \p reconnect_callback. + * pointer is passed as the second argumnet to \p reconnect_callback. * @param[in] publish_response_callback The callback to call whenever application messages - * are received from the broker. - * - * @post Call \p reconnect_callback yourself, or call \ref mqtt_sync + * are received from the broker. + * + * @post Call \p reconnect_callback yourself, or call \ref mqtt_sync * (which will trigger the call to \p reconnect_callback). - * - * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or + * + * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or * \ref mqtt_init_reconnect more than once per client). * */ @@ -1402,24 +1402,24 @@ void mqtt_init_reconnect(struct mqtt_client *client, /** * @brief Safely assign/reassign a socket and buffers to an new/existing client. * @ingroup api - * + * * This function also clears the \p client error state. Upon exiting this function * \c client->error will be \c MQTT_ERROR_CONNECT_NOT_CALLED (which will be cleared) * as soon as \ref mqtt_connect is called. - * - * @pre This function must be called BEFORE \ref mqtt_connect. - * + * + * @pre This function must be called BEFORE \ref mqtt_connect. + * * @param[in,out] client The MQTT client. - * @param[in] socketfd The new socket connected to the broker. + * @param[in] socketfd The new socket connected to the broker. * @param[in] sendbuf The buffer that will be used to buffer egress traffic to the broker. * @param[in] sendbufsz The size of \p sendbuf in bytes. * @param[in] recvbuf The buffer that will be used to buffer ingress traffic from the broker. * @param[in] recvbufsz The size of \p recvbuf in bytes. - * + * * @post Call \ref mqtt_connect. - * - * @attention This function should be used in conjunction with clients that have been - * initialzed with \ref mqtt_init_reconnect. + * + * @attention This function should be used in conjunction with clients that have been + * initialzed with \ref mqtt_init_reconnect. */ void mqtt_reinit(struct mqtt_client* client, mqtt_pal_socket_handle socketfd, @@ -1429,27 +1429,27 @@ void mqtt_reinit(struct mqtt_client* client, /** * @brief Establishes a session with the MQTT broker. * @ingroup api - * + * * @pre mqtt_init must have been called. - * + * * @param[in,out] client The MQTT client. * @param[in] client_id The unique name identifying the client. (or NULL) - * @param[in] will_topic The topic name of client's \p will_message. If no will message is + * @param[in] will_topic The topic name of client's \p will_message. If no will message is * desired set to \c NULL. - * @param[in] will_message The application message (data) to be published in the event the + * @param[in] will_message The application message (data) to be published in the event the * client ungracefully disconnects. Set to \c NULL if \p will_topic is \c NULL. * @param[in] will_message_size The size of \p will_message in bytes. * @param[in] user_name The username to use when establishing the session with the MQTT broker. * Set to \c NULL if a username is not required. * @param[in] password The password to use when establishing the session with the MQTT broker. * Set to \c NULL if a password is not required. - * @param[in] connect_flags Additional \ref MQTTConnectFlags to use when establishing the connection. - * These flags are for forcing the session to start clean, - * \c MQTT_CONNECT_CLEAN_SESSION, the QOS level to publish the \p will_message with - * (provided \c will_message != \c NULL), MQTT_CONNECT_WILL_QOS_[0,1,2], and whether + * @param[in] connect_flags Additional \ref MQTTConnectFlags to use when establishing the connection. + * These flags are for forcing the session to start clean, + * \c MQTT_CONNECT_CLEAN_SESSION, the QOS level to publish the \p will_message with + * (provided \c will_message != \c NULL), MQTT_CONNECT_WILL_QOS_[0,1,2], and whether * or not the broker should retain the \c will_message, MQTT_CONNECT_WILL_RETAIN. - * @param[in] keep_alive The keep-alive time in seconds. A reasonable value for this is 400 [seconds]. - * + * @param[in] keep_alive The keep-alive time in seconds. A reasonable value for this is 400 [seconds]. + * * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ enum MQTTErrors mqtt_connect(struct mqtt_client *client, @@ -1462,26 +1462,26 @@ enum MQTTErrors mqtt_connect(struct mqtt_client *client, uint8_t connect_flags, uint16_t keep_alive); -/* +/* todo: will_message should be a void* */ /** * @brief Publish an application message. * @ingroup api - * + * * Publishes an application message to the MQTT broker. - * + * * @pre mqtt_connect must have been called. - * + * * @param[in,out] client The MQTT client. * @param[in] topic_name The name of the topic. * @param[in] application_message The data to be published. * @param[in] application_message_size The size of \p application_message in bytes. - * @param[in] publish_flags \ref MQTTPublishFlags to be used, namely the QOS level to - * publish at (MQTT_PUBLISH_QOS_[0,1,2]) or whether or not the broker should + * @param[in] publish_flags \ref MQTTPublishFlags to be used, namely the QOS level to + * publish at (MQTT_PUBLISH_QOS_[0,1,2]) or whether or not the broker should * retain the publish (MQTT_PUBLISH_RETAIN). - * + * * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ enum MQTTErrors mqtt_publish(struct mqtt_client *client, @@ -1496,8 +1496,8 @@ enum MQTTErrors mqtt_publish(struct mqtt_client *client, * * @param[in,out] client The MQTT client. * @param[in] packet_id The packet ID of the ingress publish being acknowledged. - * - * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. + * + * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id); @@ -1507,8 +1507,8 @@ ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id); * * @param[in,out] client The MQTT client. * @param[in] packet_id The packet ID of the ingress publish being acknowledged. - * - * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. + * + * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id); @@ -1518,8 +1518,8 @@ ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id); * * @param[in,out] client The MQTT client. * @param[in] packet_id The packet ID of the ingress PUBREC being acknowledged. - * - * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. + * + * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id); @@ -1529,8 +1529,8 @@ ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id); * * @param[in,out] client The MQTT client. * @param[in] packet_id The packet ID of the ingress PUBREL being acknowledged. - * - * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. + * + * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id); @@ -1538,15 +1538,15 @@ ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id); /** * @brief Subscribe to a topic. * @ingroup api - * + * * @pre mqtt_connect must have been called. - * + * * @param[in,out] client The MQTT client. * @param[in] topic_name The name of the topic to subscribe to. * @param[in] max_qos_level The maximum QOS level with which the broker can send application * messages for this topic. - * - * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. + * + * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ enum MQTTErrors mqtt_subscribe(struct mqtt_client *client, const char* topic_name, @@ -1555,62 +1555,62 @@ enum MQTTErrors mqtt_subscribe(struct mqtt_client *client, /** * @brief Unsubscribe from a topic. * @ingroup api - * + * * @pre mqtt_connect must have been called. - * + * * @param[in,out] client The MQTT client. * @param[in] topic_name The name of the topic to unsubscribe from. - * - * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. + * + * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client, const char* topic_name); /** - * @brief Ping the broker. + * @brief Ping the broker. * @ingroup api - * + * * @pre mqtt_connect must have been called. - * + * * @param[in,out] client The MQTT client. - * + * * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ enum MQTTErrors mqtt_ping(struct mqtt_client *client); /** - * @brief Ping the broker without locking/unlocking the mutex. + * @brief Ping the broker without locking/unlocking the mutex. * @see mqtt_ping */ enum MQTTErrors __mqtt_ping(struct mqtt_client *client); /** - * @brief Terminate the session with the MQTT broker. + * @brief Terminate the session with the MQTT broker. * @ingroup api - * + * * @pre mqtt_connect must have been called. - * + * * @param[in,out] client The MQTT client. - * + * * @note To re-establish the session, mqtt_connect must be called. - * + * * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ enum MQTTErrors mqtt_disconnect(struct mqtt_client *client); /** * @brief Terminate the session with the MQTT broker and prepare to - * reconnect. Client code should call \ref mqtt_sync immediately + * reconnect. Client code should call \ref mqtt_sync immediately * after this call to prevent message loss. * @ingroup api - * - * @note The user must provide a reconnect callback function for this to + * + * @note The user must provide a reconnect callback function for this to * work as expected. See \r mqtt_client_reconnect. - * + * * @pre mqtt_connect must have been called -* +* * @param[in,out] client The MQTT client. - * + * * @returns \c MQTT_OK upon success, an \ref MQTTErrors otherwise. */ enum MQTTErrors mqtt_reconnect(struct mqtt_client *client); diff --git a/include/mqtt_pal.h b/include/mqtt_pal.h index 3a32a1f..31d693c 100644 --- a/include/mqtt_pal.h +++ b/include/mqtt_pal.h @@ -32,38 +32,38 @@ extern "C" { /** * @file * @brief Includes/supports the types/calls required by the MQTT-C client. - * - * @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore - * responsible for including/supporting all the required types and calls. - * + * + * @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore + * responsible for including/supporting all the required types and calls. + * * @defgroup pal Platform abstraction layer * @brief Documentation of the types and calls required to port MQTT-C to a new platform. - * - * mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a - * new platform the following types, functions, constants, and macros must be defined in + * + * mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a + * new platform the following types, functions, constants, and macros must be defined in * mqtt_pal.h: * - Types: * - \c size_t, \c ssize_t * - \c uint8_t, \c uint16_t, \c uint32_t * - \c va_list - * - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME() - * - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and + * - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME() + * - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and * \c MQTT_PAL_MUTEX_RELEASE * - Functions: * - \c memcpy, \c strlen * - \c va_start, \c va_arg, \c va_end * - Constants: * - \c INT_MIN - * + * * Additionally, three macro's are required: * - \c MQTT_PAL_HTONS(s) : host-to-network endian conversion for uint16_t. * - \c MQTT_PAL_NTOHS(s) : network-to-host endian conversion for uint16_t. - * - \c MQTT_PAL_TIME() : returns [type: \c mqtt_pal_time_t] current time in seconds. + * - \c MQTT_PAL_TIME() : returns [type: \c mqtt_pal_time_t] current time in seconds. * - \c MQTT_PAL_MUTEX_LOCK(mtx_pointer) : macro that locks the mutex pointed to by \c mtx_pointer. - * - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by + * - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by * \c mtx_pointer. - * - * Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c + * + * Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c * for sending and receiving data using the platforms socket calls. */ @@ -153,12 +153,12 @@ extern "C" { /** * @brief Sends all the bytes in a buffer. * @ingroup pal - * + * * @param[in] fd The file-descriptor (or handle) of the socket. * @param[in] buf A pointer to the first byte in the buffer to send. * @param[in] len The number of bytes to send (starting at \p buf). * @param[in] flags Flags which are passed to the underlying socket. - * + * * @returns The number of bytes sent if successful, an \ref MQTTErrors otherwise. * * Note about the error handling: @@ -173,12 +173,12 @@ ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, /** * @brief Non-blocking receive all the byte available. * @ingroup pal - * + * * @param[in] fd The file-descriptor (or handle) of the socket. * @param[in] buf A pointer to the receive buffer. * @param[in] bufsz The max number of bytes that can be put into \p buf. * @param[in] flags Flags which are passed to the underlying socket. - * + * * @returns The number of bytes received if successful, an \ref MQTTErrors otherwise. * * Note about the error handling: diff --git a/src/mqtt.c b/src/mqtt.c index eaac2fe..96c709c 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -24,11 +24,11 @@ SOFTWARE. #include -/** - * @file +/** + * @file * @brief Implements the functionality of MQTT-C. * @note The only files that are included are mqtt.h and mqtt_pal.h. - * + * * @cond Doxygen_Suppress */ @@ -50,7 +50,7 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client) { } /* Call inspector callback if necessary */ - + if (client->inspector_callback != NULL) { MQTT_PAL_MUTEX_LOCK(&client->mutex); err = client->inspector_callback(client); @@ -80,7 +80,7 @@ uint16_t __mqtt_next_pid(struct mqtt_client *client) { client->pid_lfsr = 163u; } /* LFSR taps taken from: https://en.wikipedia.org/wiki/Linear-feedback_shift_register */ - + do { struct mqtt_queued_message *curr; unsigned lsb = client->pid_lfsr & 1; @@ -187,7 +187,7 @@ void mqtt_reinit(struct mqtt_client* client, client->recv_buffer.curr_sz = client->recv_buffer.mem_size; } -/** +/** * A macro function that: * 1) Checks that the client isn't in an error state. * 2) Attempts to pack to client's message queue. @@ -241,15 +241,15 @@ enum MQTTErrors mqtt_connect(struct mqtt_client *client, if (client->error == MQTT_ERROR_CONNECT_NOT_CALLED) { client->error = MQTT_OK; } - + /* try to pack the message */ - MQTT_CLIENT_TRY_PACK(rv, msg, client, + MQTT_CLIENT_TRY_PACK(rv, msg, client, mqtt_pack_connection_request( client->mq.curr, client->mq.curr_sz, - client_id, will_topic, will_message, - will_message_size,user_name, password, + client_id, will_topic, will_message, + will_message_size,user_name, password, connect_flags, keep_alive - ), + ), 1 ); /* save the control type of the message */ @@ -274,7 +274,7 @@ enum MQTTErrors mqtt_publish(struct mqtt_client *client, /* try to pack the message */ MQTT_CLIENT_TRY_PACK( - rv, msg, client, + rv, msg, client, mqtt_pack_publish_request( client->mq.curr, client->mq.curr_sz, topic_name, @@ -282,7 +282,7 @@ enum MQTTErrors mqtt_publish(struct mqtt_client *client, application_message, application_message_size, publish_flags - ), + ), 1 ); /* save the control type and packet id of the message */ @@ -299,7 +299,7 @@ ssize_t __mqtt_puback(struct mqtt_client *client, uint16_t packet_id) { /* try to pack the message */ MQTT_CLIENT_TRY_PACK( - rv, msg, client, + rv, msg, client, mqtt_pack_pubxxx_request( client->mq.curr, client->mq.curr_sz, MQTT_CONTROL_PUBACK, @@ -320,7 +320,7 @@ ssize_t __mqtt_pubrec(struct mqtt_client *client, uint16_t packet_id) { /* try to pack the message */ MQTT_CLIENT_TRY_PACK( - rv, msg, client, + rv, msg, client, mqtt_pack_pubxxx_request( client->mq.curr, client->mq.curr_sz, MQTT_CONTROL_PUBREC, @@ -341,7 +341,7 @@ ssize_t __mqtt_pubrel(struct mqtt_client *client, uint16_t packet_id) { /* try to pack the message */ MQTT_CLIENT_TRY_PACK( - rv, msg, client, + rv, msg, client, mqtt_pack_pubxxx_request( client->mq.curr, client->mq.curr_sz, MQTT_CONTROL_PUBREL, @@ -362,7 +362,7 @@ ssize_t __mqtt_pubcomp(struct mqtt_client *client, uint16_t packet_id) { /* try to pack the message */ MQTT_CLIENT_TRY_PACK( - rv, msg, client, + rv, msg, client, mqtt_pack_pubxxx_request( client->mq.curr, client->mq.curr_sz, MQTT_CONTROL_PUBCOMP, @@ -389,14 +389,14 @@ enum MQTTErrors mqtt_subscribe(struct mqtt_client *client, /* try to pack the message */ MQTT_CLIENT_TRY_PACK( - rv, msg, client, + rv, msg, client, mqtt_pack_subscribe_request( client->mq.curr, client->mq.curr_sz, packet_id, topic_name, max_qos_level, (const char*)NULL - ), + ), 1 ); /* save the control type and packet id of the message */ @@ -417,13 +417,13 @@ enum MQTTErrors mqtt_unsubscribe(struct mqtt_client *client, /* try to pack the message */ MQTT_CLIENT_TRY_PACK( - rv, msg, client, + rv, msg, client, mqtt_pack_unsubscribe_request( client->mq.curr, client->mq.curr_sz, packet_id, topic_name, (const char*)NULL - ), + ), 1 ); /* save the control type and packet id of the message */ @@ -442,14 +442,14 @@ enum MQTTErrors mqtt_ping(struct mqtt_client *client) { return rv; } -enum MQTTErrors __mqtt_ping(struct mqtt_client *client) +enum MQTTErrors __mqtt_ping(struct mqtt_client *client) { ssize_t rv; struct mqtt_queued_message *msg; /* try to pack the message */ MQTT_CLIENT_TRY_PACK( - rv, msg, client, + rv, msg, client, mqtt_pack_ping_request( client->mq.curr, client->mq.curr_sz ), @@ -458,7 +458,7 @@ enum MQTTErrors __mqtt_ping(struct mqtt_client *client) /* save the control type and packet id of the message */ msg->control_type = MQTT_CONTROL_PINGREQ; - + return MQTT_OK; } @@ -474,7 +474,7 @@ enum MQTTErrors mqtt_reconnect(struct mqtt_client *client) return err; } -enum MQTTErrors mqtt_disconnect(struct mqtt_client *client) +enum MQTTErrors mqtt_disconnect(struct mqtt_client *client) { ssize_t rv; struct mqtt_queued_message *msg; @@ -482,10 +482,10 @@ enum MQTTErrors mqtt_disconnect(struct mqtt_client *client) /* try to pack the message */ MQTT_CLIENT_TRY_PACK( - rv, msg, client, + rv, msg, client, mqtt_pack_disconnect( client->mq.curr, client->mq.curr_sz - ), + ), 1 ); /* save the control type and packet id of the message */ @@ -495,15 +495,15 @@ enum MQTTErrors mqtt_disconnect(struct mqtt_client *client) return MQTT_OK; } -ssize_t __mqtt_send(struct mqtt_client *client) +ssize_t __mqtt_send(struct mqtt_client *client) { uint8_t inspected; ssize_t len; int inflight_qos2 = 0; int i = 0; - + MQTT_PAL_MUTEX_LOCK(&client->mutex); - + if (client->error < 0 && client->error != MQTT_ERROR_SEND_BUFFER_IS_FULL) { MQTT_PAL_MUTEX_UNLOCK(&client->mutex); return client->error; @@ -528,7 +528,7 @@ ssize_t __mqtt_send(struct mqtt_client *client) /* only send QoS 2 message if there are no inflight QoS 2 PUBLISH messages */ if (msg->control_type == MQTT_CONTROL_PUBLISH - && (msg->state == MQTT_QUEUED_UNSENT || msg->state == MQTT_QUEUED_AWAITING_ACK)) + && (msg->state == MQTT_QUEUED_UNSENT || msg->state == MQTT_QUEUED_AWAITING_ACK)) { inspected = 0x03 & ((msg->start[0]) >> 1); /* qos */ if (inspected == 2) { @@ -569,7 +569,7 @@ ssize_t __mqtt_send(struct mqtt_client *client) client->time_of_last_send = MQTT_PAL_TIME(); msg->time_sent = client->time_of_last_send; - /* + /* Determine the state to put the message in. Control Types: MQTT_CONTROL_CONNECT -> awaiting @@ -599,7 +599,7 @@ ssize_t __mqtt_send(struct mqtt_client *client) msg->state = MQTT_QUEUED_COMPLETE; } else if (inspected == 1) { msg->state = MQTT_QUEUED_AWAITING_ACK; - /*set DUP flag for subsequent sends [Spec MQTT-3.3.1-1] */ + /*set DUP flag for subsequent sends [Spec MQTT-3.3.1-1] */ msg->start[0] |= MQTT_PUBLISH_DUP; } else { msg->state = MQTT_QUEUED_AWAITING_ACK; @@ -971,7 +971,7 @@ static ssize_t mqtt_fixed_header_rule_violation(const struct mqtt_fixed_header * if (!mqtt_fixed_header_rules.control_type_is_valid[control_type]) { return MQTT_ERROR_CONTROL_FORBIDDEN_TYPE; } - + /* check that flags are appropriate */ if(MQTT_BITFIELD_RULE_VIOLOATION(control_flags, required_flags, mask_required_flags)) { return MQTT_ERROR_CONTROL_INVALID_FLAGS; @@ -985,7 +985,7 @@ ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t * const uint8_t *start = buf; int lshift; ssize_t errcode; - + /* check for null pointers or empty buffer */ if (response == NULL || buf == NULL) { return MQTT_ERROR_NULLPTR; @@ -1017,7 +1017,7 @@ ssize_t mqtt_unpack_fixed_header(struct mqtt_response *response, const uint8_t * /* parse next byte*/ fixed_header->remaining_length += (uint32_t) ((*buf & 0x7F) << lshift); lshift += 7; - } while(*buf & 0x80); /* while continue bit is set */ + } while(*buf & 0x80); /* while continue bit is set */ /* consume last byte */ --bufsz; @@ -1042,7 +1042,7 @@ ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fix const uint8_t *start = buf; ssize_t errcode; uint32_t remaining_length; - + /* check for null pointers or empty buffer */ if (fixed_header == NULL || buf == NULL) { return MQTT_ERROR_NULLPTR; @@ -1072,13 +1072,13 @@ ssize_t mqtt_pack_fixed_header(uint8_t *buf, size_t bufsz, const struct mqtt_fix --bufsz; ++buf; if (bufsz == 0) return 0; - + /* pack next byte */ *buf = remaining_length & 0x7F; if(remaining_length > 127) *buf |= 0x80; remaining_length = remaining_length >> 7; } while(*buf & 0x80); - + /* consume last byte */ --bufsz; ++buf; @@ -1102,7 +1102,7 @@ ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, const char* password, uint8_t connect_flags, uint16_t keep_alive) -{ +{ struct mqtt_fixed_header fixed_header; size_t remaining_length; const uint8_t *const start = buf; @@ -1131,7 +1131,7 @@ ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, /* there is a will */ connect_flags |= MQTT_CONNECT_WILL_FLAG; remaining_length += __mqtt_packed_cstrlen(will_topic); - + if (will_message == NULL) { /* if there's a will there MUST be a will message */ return MQTT_ERROR_CONNECT_NULL_WILL_MESSAGE; @@ -1139,7 +1139,7 @@ ssize_t mqtt_pack_connection_request(uint8_t* buf, size_t bufsz, remaining_length += 2 + will_message_size; /* size of will_message */ /* assert that the will QOS is valid (i.e. not 3) */ - temp = connect_flags & 0x18; /* mask to QOS */ + temp = connect_flags & 0x18; /* mask to QOS */ if (temp == 0x18) { /* bitwise equality with QoS 3 (invalid)*/ return MQTT_ERROR_CONNECT_FORBIDDEN_WILL_QOS; @@ -1223,7 +1223,7 @@ ssize_t mqtt_unpack_connack_response(struct mqtt_response *mqtt_response, const if (mqtt_response->fixed_header.remaining_length != 2) { return MQTT_ERROR_MALFORMED_RESPONSE; } - + response = &(mqtt_response->decoded.connack); /* unpack */ if (*buf & 0xFE) { @@ -1332,11 +1332,11 @@ ssize_t mqtt_pack_publish_request(uint8_t *buf, size_t bufsz, } ssize_t mqtt_unpack_publish_response(struct mqtt_response *mqtt_response, const uint8_t *buf) -{ +{ const uint8_t *const start = buf; struct mqtt_fixed_header *fixed_header; struct mqtt_response_publish *response; - + fixed_header = &(mqtt_response->fixed_header); response = &(mqtt_response->decoded.publish); @@ -1369,15 +1369,15 @@ ssize_t mqtt_unpack_publish_response(struct mqtt_response *mqtt_response, const response->application_message_size = fixed_header->remaining_length - response->topic_name_size - 4; } buf += response->application_message_size; - + /* return number of bytes consumed */ return buf - start; } /* PUBXXX */ -ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz, +ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz, enum MQTTControlPacketType control_type, - uint16_t packet_id) + uint16_t packet_id) { const uint8_t *const start = buf; struct mqtt_fixed_header fixed_header; @@ -1404,13 +1404,13 @@ ssize_t mqtt_pack_pubxxx_request(uint8_t *buf, size_t bufsz, if (bufsz < fixed_header.remaining_length) { return 0; } - + buf += __mqtt_pack_uint16(buf, packet_id); return buf - start; } -ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf) +ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const uint8_t *buf) { const uint8_t *const start = buf; uint16_t packet_id; @@ -1441,7 +1441,7 @@ ssize_t mqtt_unpack_pubxxx_response(struct mqtt_response *mqtt_response, const u ssize_t mqtt_unpack_suback_response (struct mqtt_response *mqtt_response, const uint8_t *buf) { const uint8_t *const start = buf; uint32_t remaining_length = mqtt_response->fixed_header.remaining_length; - + /* assert remaining length is at least 3 (for packet id and at least 1 topic) */ if (remaining_length < 3) { return MQTT_ERROR_MALFORMED_RESPONSE; @@ -1511,8 +1511,8 @@ ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, unsigned int pac if (bufsz < fixed_header.remaining_length) { return 0; } - - + + /* pack variable header */ buf += __mqtt_pack_uint16(buf, (uint16_t)packet_id); @@ -1527,7 +1527,7 @@ ssize_t mqtt_pack_subscribe_request(uint8_t *buf, size_t bufsz, unsigned int pac } /* UNSUBACK */ -ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf) +ssize_t mqtt_unpack_unsuback_response(struct mqtt_response *mqtt_response, const uint8_t *buf) { const uint8_t *const start = buf; @@ -1604,8 +1604,8 @@ ssize_t mqtt_pack_unsubscribe_request(uint8_t *buf, size_t bufsz, unsigned int p } /* MESSAGE QUEUE */ -void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz) -{ +void mqtt_mq_init(struct mqtt_message_queue *mq, void *buf, size_t bufsz) +{ if(buf != NULL) { mq->mem_start = buf; @@ -1637,7 +1637,7 @@ void mqtt_mq_clean(struct mqtt_message_queue *mq) { for(new_head = mqtt_mq_get(mq, 0); new_head >= mq->queue_tail; --new_head) { if (new_head->state != MQTT_QUEUED_COMPLETE) break; } - + /* check if everything can be removed */ if (new_head < mq->queue_tail) { mq->curr = (uint8_t *)mq->mem_start; @@ -1655,14 +1655,14 @@ void mqtt_mq_clean(struct mqtt_message_queue *mq) { size_t removing = (size_t) (new_head->start - (uint8_t*) mq->mem_start); memmove(mq->mem_start, new_head->start, n); mq->curr = (unsigned char*)mq->mem_start + n; - + /* move queue */ { ssize_t new_tail_idx = new_head - mq->queue_tail; memmove(mqtt_mq_get(mq, new_tail_idx), mq->queue_tail, sizeof(struct mqtt_queued_message) * (size_t) ((new_tail_idx + 1))); mq->queue_tail = mqtt_mq_get(mq, new_tail_idx); - + { /* bump back start's */ ssize_t i = 0; @@ -1759,7 +1759,7 @@ ssize_t __mqtt_pack_str(uint8_t *buf, const char* str) { for(; i < length; ++i) { *(buf++) = (uint8_t)str[i]; } - + /* return number of bytes consumed */ return length + 2; } diff --git a/src/mqtt_pal.c b/src/mqtt_pal.c index 27e8149..0daadbf 100644 --- a/src/mqtt_pal.c +++ b/src/mqtt_pal.c @@ -24,9 +24,9 @@ SOFTWARE. #include -/** - * @file - * @brief Implements @ref mqtt_pal_sendall and @ref mqtt_pal_recvall and +/** + * @file + * @brief Implements @ref mqtt_pal_sendall and @ref mqtt_pal_recvall and * any platform-specific helpers you'd like. * @cond Doxygen_Suppress */ @@ -196,7 +196,7 @@ static int do_rec_data(mqtt_pal_socket_handle fd, unsigned int status) { if ((rc = fd->low_read(&fd->fd, buffer, length)) < 0) { return MQTT_ERROR_SOCKET_ERROR; } - + br_ssl_engine_recvrec_ack(&fd->sc.eng, rc); } } @@ -306,7 +306,7 @@ ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, return MQTT_ERROR_SOCKET_ERROR; } } - + return (ssize_t)sent; } diff --git a/tests.c b/tests.c index 0b13771..e5233b5 100644 --- a/tests.c +++ b/tests.c @@ -115,7 +115,7 @@ static void TEST__framing__fixed_header(void** state) { correct_buf[0] = (MQTT_CONTROL_PUBLISH << 4) | 0xF; rv = mqtt_unpack_fixed_header(&response, correct_buf, sizeof(correct_buf)); assert_true(rv == 3); - + correct_buf[0] = (MQTT_CONTROL_PUBLISH << 4) | 3; rv = mqtt_unpack_fixed_header(&response, correct_buf, sizeof(correct_buf)); assert_true(rv == 3); @@ -187,14 +187,14 @@ static void TEST__framing__connect(void** state) { ssize_t rv; const uint8_t correct_bytes[] = { (MQTT_CONTROL_CONNECT << 4) | 0, 16, - 0, 4, 'M', 'Q', 'T', 'T', MQTT_PROTOCOL_LEVEL, 0, 0, 120u, + 0, 4, 'M', 'Q', 'T', 'T', MQTT_PROTOCOL_LEVEL, 0, 0, 120u, 0, 4, 'l', 'i', 'a', 'm' }; const uint8_t correct_bytes2[] = { (MQTT_CONTROL_CONNECT << 4) | 0, 51, - 0, 4, 'M', 'Q', 'T', 'T', MQTT_PROTOCOL_LEVEL, - MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_USER_NAME | MQTT_CONNECT_PASSWORD, - 0, 120u, + 0, 4, 'M', 'Q', 'T', 'T', MQTT_PROTOCOL_LEVEL, + MQTT_CONNECT_WILL_FLAG | MQTT_CONNECT_USER_NAME | MQTT_CONNECT_PASSWORD, + 0, 120u, 0, 4, 'l', 'i', 'a', 'm', 0, 9, 'w', 'i', 'l', 'l', 't', 'o', 'p', 'i', 'c', 0, 2, 'h', 'i', @@ -251,8 +251,8 @@ static void TEST__framing__publish(void** state) { struct mqtt_response mqtt_response; struct mqtt_response_publish *response; response = &(mqtt_response.decoded.publish); - - + + rv = mqtt_pack_publish_request(buf, 256, "topic1", 23, "0123456789", 10, MQTT_PUBLISH_RETAIN); assert_true(rv == 20); assert_true(memcmp(buf, correct_bytes, 20) == 0); @@ -301,7 +301,7 @@ static void TEST__utility__connect_disconnect(void** state) { static void TEST__framing__connack(void** state) { uint8_t buf[] = { - (MQTT_CONTROL_CONNACK << 4) | 0, 2, + (MQTT_CONTROL_CONNACK << 4) | 0, 2, 0, MQTT_CONNACK_ACCEPTED }; struct mqtt_response mqtt_response; @@ -458,14 +458,14 @@ static void TEST__framing__unsuback(void** state) { static void TEST__framing__disconnect(void** state) { uint8_t buf[2]; - assert_true(mqtt_pack_disconnect(buf, 2) == 2); + assert_true(mqtt_pack_disconnect(buf, 2) == 2); } static void TEST__framing__ping(void** state) { uint8_t buf[2]; struct mqtt_response response; struct mqtt_fixed_header *fixed_header = &response.fixed_header; - assert_true(mqtt_pack_ping_request(buf, 2) == 2); + assert_true(mqtt_pack_ping_request(buf, 2) == 2); assert_true(mqtt_unpack_fixed_header(&response, buf, 2) == 2); assert_true(fixed_header->control_type == MQTT_CONTROL_PINGREQ); assert_true(fixed_header->remaining_length == 0); @@ -593,7 +593,7 @@ static void TEST__utility__message_queue(void **unused) { /* remove the last two */ mqtt_mq_get(&mq, 0)->state = MQTT_QUEUED_COMPLETE; mqtt_mq_get(&mq, 1)->state = MQTT_QUEUED_COMPLETE; - mqtt_mq_clean(&mq); + mqtt_mq_clean(&mq); assert_true(mqtt_mq_length(&mq) == 0); assert_true(mq.curr_sz == 32 + 3*QM_SZ); assert_true((void*) mq.queue_tail == mq.mem_end); @@ -616,7 +616,7 @@ void publish_callback(void** state, struct mqtt_response_publish *publish) { /*char *name = (char*) malloc(publish->topic_name_size + 1); memcpy(name, publish->topic_name, publish->topic_name_size); name[publish->topic_name_size] = '\0'; - printf("Received a PUBLISH(topic=%s, DUP=%d, QOS=%d, RETAIN=%d, pid=%d) from the broker. Data='%s'\n", + printf("Received a PUBLISH(topic=%s, DUP=%d, QOS=%d, RETAIN=%d, pid=%d) from the broker. Data='%s'\n", name, publish->dup_flag, publish->qos_level, publish->retain_flag, publish->packet_id, (const char*) (publish->application_message) ); @@ -709,7 +709,7 @@ static void TEST__api__publish_subscribe__single(void **unused) { time_t start = time(NULL); while(state == 0 && time(NULL) < start + 10) { - assert_true(__mqtt_recv(&receiver) > 0); + assert_true(__mqtt_recv(&receiver) > 0); usleep(10000); } @@ -728,7 +728,7 @@ static void TEST__api__publish_subscribe__single(void **unused) { #define TEST_PACKET_SIZE (149) #define TEST_DATA_SIZE (128) static void TEST__api__publish_subscribe__multiple(void **unused) { - uint8_t sendmem1[TEST_PACKET_SIZE*4 + sizeof(struct mqtt_queued_message)*4], + uint8_t sendmem1[TEST_PACKET_SIZE*4 + sizeof(struct mqtt_queued_message)*4], sendmem2[TEST_PACKET_SIZE*4 + sizeof(struct mqtt_queued_message)*4]; uint8_t recvmem1[TEST_PACKET_SIZE], recvmem2[TEST_PACKET_SIZE]; struct mqtt_client sender, receiver; @@ -887,7 +887,7 @@ static void TEST__api__publish_subscribe__multiple(void **unused) { assert_true(rv > 0); } - /*sleep for 2 seconds while unsubscribe is sending */ + /*sleep for 2 seconds while unsubscribe is sending */ start = time(NULL); while(time(NULL) < start + 2) { if ((rv = __mqtt_recv(&receiver)) < 0) { @@ -913,7 +913,7 @@ static void TEST__api__publish_subscribe__multiple(void **unused) { printf("error: %s\n", mqtt_error_str(rv)); assert_true(rv > 0); } - /*sleep for 2 seconds to give the publish a chance */ + /*sleep for 2 seconds to give the publish a chance */ start = time(NULL); while(time(NULL) < start + 2) { if ((rv = __mqtt_recv(&receiver)) < 0) { From 08fff71a73fdf01a16ab2160dd1588ae4626b540 Mon Sep 17 00:00:00 2001 From: Rafael Silva Date: Tue, 14 Dec 2021 14:58:03 +0000 Subject: [PATCH 2/4] Add event callback system Signed-off-by: Rafael Silva --- include/mqtt.h | 155 +++++++++++++++++++++++++++++++------------------ src/mqtt.c | 101 ++++++++++++++++++++++++++------ tests.c | 11 ++-- 3 files changed, 188 insertions(+), 79 deletions(-) diff --git a/include/mqtt.h b/include/mqtt.h index 34c879d..07c7674 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -120,7 +120,7 @@ extern "C" { */ - /** +/** * @brief An enumeration of the MQTT control packet types. * @ingroup unpackers * @@ -128,7 +128,7 @@ extern "C" { * MQTT v3.1.1: MQTT Control Packet Types * */ - enum MQTTControlPacketType { +enum MQTTControlPacketType { MQTT_CONTROL_CONNECT=1u, MQTT_CONTROL_CONNACK=2u, MQTT_CONTROL_PUBLISH=3u, @@ -1084,6 +1084,34 @@ struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQT /* CLIENT */ +/** + * @brief An enumeration of callback events. + * @ingroup details + */ +enum MQTTCallbackEvent { + MQTT_EVENT_RECONNECT, + MQTT_EVENT_CONNECTION_REFUSED, + MQTT_EVENT_CONNECTED, + MQTT_EVENT_DISCONNECTED, + MQTT_EVENT_RECEIVE, + MQTT_EVENT_PUBLISH, + MQTT_EVENT_SUBSCRIBE, + MQTT_EVENT_UNSUBSCRIBE, + MQTT_EVENT_PING, + MQTT_EVENT_PUBLISH_TIMEOUT, + MQTT_EVENT_ERROR +}; + +/** + * @brief union to serve as proxy to multiple datatypes on one pointer. + * @ingroup details + */ +union MQTTCallbackData { + struct mqtt_response_publish *received_msg; + struct mqtt_queued_message *queued_msg; + enum MQTTErrors *error; +}; + /** * @brief An MQTT client. * @ingroup details @@ -1154,17 +1182,56 @@ struct mqtt_client { double typical_response_time; /** - * @brief The callback that is called whenever a publish is received from the broker. + * @brief The callback that is called whenever an event happens + * events happen when: + * - publish is received from the broker. + * + * Any topics that you have subscribed to will be returned from the broker as + * mqtt_response_publish messages. All the publishes received from the broker will + * be passed to this function on a MQTT_EVENT_RECEIVE. + * + * - reconnect is called whenever the client enters an error state + * that requires reinitialization. + * + * The job of the MQTT_EVENT_RECONNECT is to: (1) perform error handling/logging, + * (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the + * client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other + * API calls such as \ref mqtt_subscribe. + * + * - (dis)connect (refused) is called whenever a connection is complete, is refused, or disconnected. + * + * MQTT_EVENT_CONNECTION_REFUSED is called on a connection refused error + * MQTT_EVENT_CONNECTED is called whenever a connection is acknowledged and accepted + * MQTT_EVENT_DISCONNECTED is called whenver a disconnect is sent by the client * - * Any topics that you have subscribed to will be returned from the broker as - * mqtt_response_publish messages. All the publishes received from the broker will - * be passed to this function. + * - publish is called whenver a message WE published is successful, ie acknowledged * - * @note A pointer to publish_response_callback_state is always passed to the callback. - * Use publish_response_callback_state to keep track of any state information you + * MQTT_EVENT_PUBLISH is called when + * on QoS == 0: when the message is sent + * on QoS == 1: when the message is acknowledged by the broker + * on QoS == 2: when the message is acknowledged by the broker + * + * MQTT_EVENT_PUBLISH_TIMEOUT is called whenever a message that requires acknowledgement is not so + * for a response_timeout period, the message is requeued automatically + * + * - (un)subscribe is called when a (un)subscription is ackowledged + * + * MQTT_EVENT_SUBSCRIBE on sub + * MQTT_EVENT_UNSUBSCRIBE on unsub + * + * - ping is called when we get a ping response + * + * MQTT_EVENT_PING + * + * - error is called when an error state not handled by any of the other events happens + * + * MQTT_EVENT_ERROR + * + * @note A pointer to user_callback_state is always passed to the callback. + * Use user_callback_state to keep track of any state information you * need. */ - void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish); + void (*user_callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state); /** * @brief A pointer to any publish_response_callback state information you need. @@ -1172,7 +1239,7 @@ struct mqtt_client { * @note A pointer to this pointer will always be publish_response_callback upon * receiving a publish message from the broker. */ - void* publish_response_callback_state; + void* user_callback_state; /** * @brief A user-specified callback, triggered on each \ref mqtt_sync, allowing @@ -1191,21 +1258,6 @@ struct mqtt_client { */ enum MQTTErrors (*inspector_callback)(struct mqtt_client*); - /** - * @brief A callback that is called whenever the client is in an error state. - * - * This callback is responsible for: application level error handling, closing - * previous sockets, and reestabilishing the connection to the broker and - * session configurations (i.e. subscriptions). - */ - void (*reconnect_callback)(struct mqtt_client*, void**); - - /** - * @brief A pointer to some state. A pointer to this member is passed to - * \ref mqtt_client.reconnect_callback. - */ - void* reconnect_state; - /** * @brief The buffer where ingress data is temporarily stored. */ @@ -1276,8 +1328,8 @@ ssize_t __mqtt_recv(struct mqtt_client *client); * being sent to the broker. This function does the actual sending of * those messages. Additionally this function receives traffic (responses and * acknowledgements) from the broker and responds to that traffic accordingly. - * Lastly this function also calls the \c publish_response_callback when - * any \c MQTT_CONTROL_PUBLISH messages are received. + * Lastly this function also calls the \c user_callback when + * any \c MQTTCallbackEvent events happen. * * @pre mqtt_init must have been called. * @@ -1310,12 +1362,11 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client); * @param[in] sendbufsz The size of \p sendbuf in bytes. * @param[in] recvbuf A buffer that will be used for receiving messages from the broker. * @param[in] recvbufsz The size of \p recvbuf in bytes. - * @param[in] publish_response_callback The callback to call whenever application messages - * are received from the broker. + * @param[in] callback The callback to call whenever events happen. * * @post mqtt_connect must be called. * - * @note \p sockfd is a non-blocking TCP connection. + * @note \p sockfd is a non-blocking socket connection. * @note If \p sendbuf fills up completely during runtime a \c MQTT_ERROR_SEND_BUFFER_IS_FULL * error will be set. Similarly if \p recvbuf is ever to small to receive a message from * the broker an MQTT_ERROR_RECV_BUFFER_TOO_SMALL error will be set. @@ -1348,56 +1399,48 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, mqtt_pal_socket_handle sockfd, uint8_t *sendbuf, size_t sendbufsz, uint8_t *recvbuf, size_t recvbufsz, - void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)); + void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)); /** - * @brief Initializes an MQTT client and enables automatic reconnections. + * @brief Briefly initializes an MQTT client, expecting full init in a reconnect event. * @ingroup api * - * An alternative to \ref mqtt_init that allows the client to automatically reconnect to the - * broker after an error occurs (e.g. socket error or internal buffer overflows). - * - * This is accomplished by calling the \p reconnect_callback whenever the client enters an error - * state. The job of the \p reconnect_callback is to: (1) perform error handling/logging, - * (2) clean up the old connection (i.e. close client->socketfd), (3) \ref mqtt_reinit the - * client, and (4) reconfigure the MQTT session by calling \ref mqtt_connect followed by other - * API calls such as \ref mqtt_subscribe. + * An alternative to \ref mqtt_init that expects the client to automatically reconnect to the + * broker in the reconnect event after an error occurs (e.g. socket error or internal buffer overflows).. * - * The first argument to the \p reconnect_callback is the client (which will be in an error - * state) and the second argument is a pointer to a void pointer where you can store some state + * The first argument to the \p user_callback is the client (which will be in an error + * state) and the second argument is an MQTTCallbackEvent which identifies the event type + * here we care about the MQTT_EVENT_RECONNECT event + * lastly a pointer to a void pointer where you can store some state * information. Internally, MQTT-C calls the reconnect callback like so: * * \code - * client->reconnect_callback(client, &client->reconnect_state) + * client->user_callback(client, MQTT_EVENT_RECONNECT, NULL, &client->user_state) * \endcode * - * Note that the \p reconnect_callback is also called to setup the initial session. After + * Note that the \p user_callback is also called to setup the initial session. After * calling \ref mqtt_init_reconnect the client will be in the error state * \c MQTT_ERROR_INITIAL_RECONNECT. * * @pre None. * * @param[in,out] client The MQTT client that will be initialized. - * @param[in] reconnect_callback The callback that will be called to connect/reconnect the - * client to the broker and perform application level error handling. - * @param[in] reconnect_state A pointer to some state data for your \p reconnect_callback. - * If your \p reconnect_callback does not require any state information set this + * @param[in] callback_state A pointer to some state data for your \p user_callback. + * If your \p user_callback does not require any state information set this * to NULL. A pointer to the memory address where the client stores a copy of this - * pointer is passed as the second argumnet to \p reconnect_callback. - * @param[in] publish_response_callback The callback to call whenever application messages - * are received from the broker. + * pointer is passed as an argumnet to \p user_callback. + * @param[in] callback The callback that will be called to connect/reconnect the + * client and every other event. * - * @post Call \p reconnect_callback yourself, or call \ref mqtt_sync - * (which will trigger the call to \p reconnect_callback). + * @post Call \ref mqtt_sync (which will trigger the call to \p user_callback). * * @attention Only initialize an MQTT client once (i.e. don't call \ref mqtt_init or * \ref mqtt_init_reconnect more than once per client). * */ void mqtt_init_reconnect(struct mqtt_client *client, - void (*reconnect_callback)(struct mqtt_client *client, void** state), - void *reconnect_state, - void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)); + void *callback_state, + void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)); /** * @brief Safely assign/reassign a socket and buffers to an new/existing client. diff --git a/src/mqtt.c b/src/mqtt.c index 96c709c..9f13621 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -37,8 +37,8 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client) { enum MQTTErrors err; int reconnecting = 0; MQTT_PAL_MUTEX_LOCK(&client->mutex); - if (client->error != MQTT_ERROR_RECONNECTING && client->error != MQTT_OK && client->reconnect_callback != NULL) { - client->reconnect_callback(client, &client->reconnect_state); + if (client->error != MQTT_ERROR_RECONNECTING && client->error != MQTT_OK && client->user_callback != NULL) { + client->user_callback(client, MQTT_EVENT_RECONNECT, NULL, &client->user_callback_state); /* unlocked during CONNECT */ } else { /* mqtt_reconnect will have queued the disconnect packet - that needs to be sent and then call reconnect */ @@ -55,20 +55,30 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client) { MQTT_PAL_MUTEX_LOCK(&client->mutex); err = client->inspector_callback(client); MQTT_PAL_MUTEX_UNLOCK(&client->mutex); - if (err != MQTT_OK) return err; + if (err != MQTT_OK) goto ERR; } /* Call receive */ - err = (enum MQTTErrors)__mqtt_recv(client); - if (err != MQTT_OK) return err; + err = (enum MQTTErrors)__mqtt_recv(client); + if (err != MQTT_OK) goto ERR; /* Call send */ err = (enum MQTTErrors)__mqtt_send(client); /* mqtt_reconnect will essentially be a disconnect if there is no callback */ - if (reconnecting && client->reconnect_callback != NULL) { + if (reconnecting && client->user_callback != NULL) { MQTT_PAL_MUTEX_LOCK(&client->mutex); - client->reconnect_callback(client, &client->reconnect_state); + client->user_callback(client, MQTT_EVENT_RECONNECT, NULL, &client->user_callback_state); + } + + ERR: + + if (err != MQTT_ERROR_RECONNECTING && err != MQTT_ERROR_CONNECT_CLIENT_ID_REFUSED && + err != MQTT_ERROR_CONNECTION_REFUSED && err != MQTT_OK && + client->user_callback != NULL) { + /* call timeout callback */ + union MQTTCallbackData data = {.error = &err}; + client->user_callback(client, MQTT_EVENT_ERROR, &data, &client->user_callback_state); } return err; @@ -106,7 +116,7 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, mqtt_pal_socket_handle sockfd, uint8_t *sendbuf, size_t sendbufsz, uint8_t *recvbuf, size_t recvbufsz, - void (*publish_response_callback)(void** state,struct mqtt_response_publish *publish)) + void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)) { if (client == NULL || sendbuf == NULL || recvbuf == NULL) { return MQTT_ERROR_NULLPTR; @@ -130,21 +140,19 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, client->number_of_timeouts = 0; client->number_of_keep_alives = 0; client->typical_response_time = -1.0; - client->publish_response_callback = publish_response_callback; client->pid_lfsr = 0; client->send_offset = 0; client->inspector_callback = NULL; - client->reconnect_callback = NULL; - client->reconnect_state = NULL; + client->user_callback_state = NULL; + client->user_callback = callback; return MQTT_OK; } void mqtt_init_reconnect(struct mqtt_client *client, - void (*reconnect)(struct mqtt_client *, void**), - void *reconnect_state, - void (*publish_response_callback)(void** state, struct mqtt_response_publish *publish)) + void *callback_state, + void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)) { /* initialize mutex */ MQTT_PAL_MUTEX_INIT(&client->mutex); @@ -163,12 +171,11 @@ void mqtt_init_reconnect(struct mqtt_client *client, client->number_of_timeouts = 0; client->number_of_keep_alives = 0; client->typical_response_time = -1.0; - client->publish_response_callback = publish_response_callback; client->send_offset = 0; client->inspector_callback = NULL; - client->reconnect_callback = reconnect; - client->reconnect_state = reconnect_state; + client->user_callback_state = callback_state; + client->user_callback = callback; } void mqtt_reinit(struct mqtt_client* client, @@ -523,6 +530,12 @@ ssize_t __mqtt_send(struct mqtt_client *client) resend = 1; client->number_of_timeouts += 1; client->send_offset = 0; + + if (client->user_callback != NULL) { + /* call timeout callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PUBLISH_TIMEOUT, &data, &client->user_callback_state); + } } } @@ -590,13 +603,25 @@ ssize_t __mqtt_send(struct mqtt_client *client) switch (msg->control_type) { case MQTT_CONTROL_PUBACK: case MQTT_CONTROL_PUBCOMP: + msg->state = MQTT_QUEUED_COMPLETE; + break; case MQTT_CONTROL_DISCONNECT: msg->state = MQTT_QUEUED_COMPLETE; + if (client->user_callback != NULL) { + /* call disconnect callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_DISCONNECTED, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBLISH: inspected = ( MQTT_PUBLISH_QOS_MASK & (msg->start[0]) ) >> 1; /* qos */ if (inspected == 0) { msg->state = MQTT_QUEUED_COMPLETE; + if (client->user_callback != NULL) { + /* call publish callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + } } else if (inspected == 1) { msg->state = MQTT_QUEUED_AWAITING_ACK; /*set DUP flag for subsequent sends [Spec MQTT-3.3.1-1] */ @@ -731,8 +756,18 @@ ssize_t __mqtt_recv(struct mqtt_client *client) client->error = MQTT_ERROR_CONNECTION_REFUSED; mqtt_recv_ret = MQTT_ERROR_CONNECTION_REFUSED; } + if (client->user_callback != NULL) { + /* call connection refused callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_CONNECTION_REFUSED, &data, &client->user_callback_state); + } break; } + if (client->user_callback != NULL) { + /* call connected callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_CONNECTED, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBLISH: /* stage response, none if qos==0, PUBACK if qos==1, PUBREC if qos==2 */ @@ -756,8 +791,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) break; } } - /* call publish callback */ - client->publish_response_callback(&client->publish_response_callback_state, &response.decoded.publish); + if (client->user_callback != NULL) { + /* call receive callback */ + union MQTTCallbackData data = {.received_msg = &response.decoded.publish}; + client->user_callback(client, MQTT_EVENT_RECEIVE, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBACK: /* release associated PUBLISH */ @@ -770,6 +808,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); + if (client->user_callback != NULL) { + /* call publish callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBREC: /* check if this is a duplicate */ @@ -793,6 +836,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) mqtt_recv_ret = rv; break; } + if (client->user_callback != NULL) { + /* call publish callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PUBREL: /* release associated PUBREC */ @@ -842,6 +890,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) mqtt_recv_ret = MQTT_ERROR_SUBSCRIBE_FAILED; break; } + if (client->user_callback != NULL) { + /* call subscribed callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_SUBSCRIBE, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_UNSUBACK: /* release associated UNSUBSCRIBE */ @@ -854,6 +907,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); + if (client->user_callback != NULL) { + /* call unsubscribed callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_UNSUBSCRIBE, &data, &client->user_callback_state); + } break; case MQTT_CONTROL_PINGRESP: /* release associated PINGREQ */ @@ -866,6 +924,11 @@ ssize_t __mqtt_recv(struct mqtt_client *client) msg->state = MQTT_QUEUED_COMPLETE; /* update response time */ client->typical_response_time = 0.875 * (client->typical_response_time) + 0.125 * (double) (MQTT_PAL_TIME() - msg->time_sent); + if (client->user_callback != NULL) { + /* call ping callback */ + union MQTTCallbackData data = {.queued_msg = msg}; + client->user_callback(client, MQTT_EVENT_PING, &data, &client->user_callback_state); + } break; default: client->error = MQTT_ERROR_MALFORMED_RESPONSE; diff --git a/tests.c b/tests.c index e5233b5..0727111 100644 --- a/tests.c +++ b/tests.c @@ -612,7 +612,10 @@ static void TEST__utility__pid_lfsr(void **unused) { assert_true(period == 65535u); } -void publish_callback(void** state, struct mqtt_response_publish *publish) { +void publish_callback(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state) { + (void) client; + (void) event; + (void) data; /*char *name = (char*) malloc(publish->topic_name_size + 1); memcpy(name, publish->topic_name, publish->topic_name_size); name[publish->topic_name_size] = '\0'; @@ -621,7 +624,7 @@ void publish_callback(void** state, struct mqtt_response_publish *publish) { (const char*) (publish->application_message) ); free(name);*/ - **(int**)state += 1; + **(int**)user_state += 1; } static void TEST__api__connect_ping_disconnect(void **unused) { @@ -679,7 +682,7 @@ static void TEST__api__publish_subscribe__single(void **unused) { sockfd = open_nb_socket(addr, port); mqtt_init(&receiver, sockfd, sendmem2, sizeof(sendmem2), recvmem2, sizeof(recvmem2), publish_callback); - receiver.publish_response_callback_state = &state; + receiver.user_callback_state = &state; /* connect both */ assert_true(mqtt_connect(&sender, "liam-123", NULL, NULL, 0, NULL, NULL, 0, 30) > 0); @@ -742,7 +745,7 @@ static void TEST__api__publish_subscribe__multiple(void **unused) { sockfd = open_nb_socket(addr, port); mqtt_init(&receiver, sockfd, sendmem2, sizeof(sendmem2), recvmem2, sizeof(recvmem2), publish_callback); - receiver.publish_response_callback_state = &state; + receiver.user_callback_state = &state; /* connect both */ if ((rv = mqtt_connect(&sender, "liam-123", NULL, NULL, 0, NULL, NULL, MQTT_CONNECT_CLEAN_SESSION, 30)) <= 0) { From c589ff4d58dbf9908e0a3a26e50ed26ab0431770 Mon Sep 17 00:00:00 2001 From: Rafael Silva Date: Wed, 15 Dec 2021 12:45:11 +0000 Subject: [PATCH 3/4] fix #150 Signed-off-by: Rafael Silva --- include/mqtt.h | 1 + src/mqtt.c | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/include/mqtt.h b/include/mqtt.h index 07c7674..9513c6c 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -206,6 +206,7 @@ struct mqtt_fixed_header { MQTT_ERROR(MQTT_ERROR_INITIAL_RECONNECT) \ MQTT_ERROR(MQTT_ERROR_INVALID_REMAINING_LENGTH) \ MQTT_ERROR(MQTT_ERROR_CLEAN_SESSION_IS_REQUIRED) \ + MQTT_ERROR(MQTT_ERROR_RECONNECT_FAILED) \ MQTT_ERROR(MQTT_ERROR_RECONNECTING) /* todo: add more connection refused errors */ diff --git a/src/mqtt.c b/src/mqtt.c index 9f13621..1172133 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -39,7 +39,15 @@ enum MQTTErrors mqtt_sync(struct mqtt_client *client) { MQTT_PAL_MUTEX_LOCK(&client->mutex); if (client->error != MQTT_ERROR_RECONNECTING && client->error != MQTT_OK && client->user_callback != NULL) { client->user_callback(client, MQTT_EVENT_RECONNECT, NULL, &client->user_callback_state); - /* unlocked during CONNECT */ + if (client->error != MQTT_OK) { + client->error = MQTT_ERROR_RECONNECT_FAILED; + + /* normally unlocked during CONNECT */ + MQTT_PAL_MUTEX_UNLOCK(&client->mutex); + } + + err = client->error; + if (err != MQTT_OK) goto ERR; } else { /* mqtt_reconnect will have queued the disconnect packet - that needs to be sent and then call reconnect */ if (client->error == MQTT_ERROR_RECONNECTING) { From 38f31accc64446aa485381c8914be7d9f3a35015 Mon Sep 17 00:00:00 2001 From: Rafael Silva Date: Wed, 22 Dec 2021 15:28:33 +0000 Subject: [PATCH 4/4] Add event enable/disable feature Signed-off-by: Rafael Silva --- include/mqtt.h | 78 ++++++++++++++++++++++++++++++++++++++++---------- src/mqtt.c | 31 +++++++++++++++----- 2 files changed, 87 insertions(+), 22 deletions(-) diff --git a/include/mqtt.h b/include/mqtt.h index 9513c6c..5ff29bb 100644 --- a/include/mqtt.h +++ b/include/mqtt.h @@ -1090,19 +1090,36 @@ struct mqtt_queued_message* mqtt_mq_find(struct mqtt_message_queue *mq, enum MQT * @ingroup details */ enum MQTTCallbackEvent { - MQTT_EVENT_RECONNECT, - MQTT_EVENT_CONNECTION_REFUSED, - MQTT_EVENT_CONNECTED, - MQTT_EVENT_DISCONNECTED, - MQTT_EVENT_RECEIVE, - MQTT_EVENT_PUBLISH, - MQTT_EVENT_SUBSCRIBE, - MQTT_EVENT_UNSUBSCRIBE, - MQTT_EVENT_PING, - MQTT_EVENT_PUBLISH_TIMEOUT, - MQTT_EVENT_ERROR + MQTT_EVENT_RECONNECT = (1 << 0), // bit 0 + MQTT_EVENT_CONNECTION_REFUSED = (1 << 1), // bit 1 + MQTT_EVENT_CONNECTED = (1 << 2), // bit 2 + MQTT_EVENT_DISCONNECTED = (1 << 3), // bit 3 + MQTT_EVENT_RECEIVED = (1 << 4), // bit 4 + MQTT_EVENT_PUBLISHED = (1 << 5), // bit 5 + MQTT_EVENT_SUBSCRIBED = (1 << 6), // bit 6 + MQTT_EVENT_UNSUBSCRIBED = (1 << 7), // bit 7 + MQTT_EVENT_PING = (1 << 8), // bit 8 + MQTT_EVENT_PUBLISH_TIMEOUT = (1 << 9), // bit 9 + MQTT_EVENT_ERROR = (1 << 10), // bit 10 }; +//TODO: brief +/** + * @brief + * + */ +#define MQTT_EVENT_MASK (MQTT_EVENT_RECONNECT | \ + MQTT_EVENT_CONNECTION_REFUSED | \ + MQTT_EVENT_CONNECTED | \ + MQTT_EVENT_DISCONNECTED | \ + MQTT_EVENT_RECEIVED | \ + MQTT_EVENT_PUBLISHED | \ + MQTT_EVENT_SUBSCRIBED | \ + MQTT_EVENT_UNSUBSCRIBED | \ + MQTT_EVENT_PING | \ + MQTT_EVENT_PUBLISH_TIMEOUT | \ + MQTT_EVENT_ERROR) + /** * @brief union to serve as proxy to multiple datatypes on one pointer. * @ingroup details @@ -1189,7 +1206,7 @@ struct mqtt_client { * * Any topics that you have subscribed to will be returned from the broker as * mqtt_response_publish messages. All the publishes received from the broker will - * be passed to this function on a MQTT_EVENT_RECEIVE. + * be passed to this function on a MQTT_EVENT_RECEIVED. * * - reconnect is called whenever the client enters an error state * that requires reinitialization. @@ -1207,7 +1224,7 @@ struct mqtt_client { * * - publish is called whenver a message WE published is successful, ie acknowledged * - * MQTT_EVENT_PUBLISH is called when + * MQTT_EVENT_PUBLISHED is called when * on QoS == 0: when the message is sent * on QoS == 1: when the message is acknowledged by the broker * on QoS == 2: when the message is acknowledged by the broker @@ -1217,8 +1234,8 @@ struct mqtt_client { * * - (un)subscribe is called when a (un)subscription is ackowledged * - * MQTT_EVENT_SUBSCRIBE on sub - * MQTT_EVENT_UNSUBSCRIBE on unsub + * MQTT_EVENT_SUBSCRIBED on sub + * MQTT_EVENT_UNSUBSCRIBED on unsub * * - ping is called when we get a ping response * @@ -1259,6 +1276,14 @@ struct mqtt_client { */ enum MQTTErrors (*inspector_callback)(struct mqtt_client*); + /** + * @brief Event enable flag + * + * this is a bit field of the events, where each bit represents an event which is enabled when set to 1 + * the bit positions correspond to \ref enum MQTTCallbackEvent + */ + uint16_t event_enable; + /** * @brief The buffer where ingress data is temporarily stored. */ @@ -1348,6 +1373,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client); */ enum MQTTErrors mqtt_sync(struct mqtt_client *client); +//TODO: doc new fields /** * @brief Initializes an MQTT client. * @ingroup api @@ -1400,8 +1426,11 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, mqtt_pal_socket_handle sockfd, uint8_t *sendbuf, size_t sendbufsz, uint8_t *recvbuf, size_t recvbufsz, + uint16_t event_flags, + void *callback_state, void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)); +//TODO: doc new fields /** * @brief Briefly initializes an MQTT client, expecting full init in a reconnect event. * @ingroup api @@ -1440,6 +1469,7 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, * */ void mqtt_init_reconnect(struct mqtt_client *client, + uint16_t event_flags, void *callback_state, void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)); @@ -1510,6 +1540,24 @@ enum MQTTErrors mqtt_connect(struct mqtt_client *client, todo: will_message should be a void* */ +//TODO: docs +/** + * @brief + * + * @param event_flags + * @return enum MQTTErrors + */ +void mqtt_event_enable(struct mqtt_client *client, uint16_t event_flags); + +//TODO: docs +/** + * @brief + * + * @param event_flags + * @return enum MQTTErrors + */ +void mqtt_event_disable(struct mqtt_client *client, uint16_t event_flags); + /** * @brief Publish an application message. * @ingroup api diff --git a/src/mqtt.c b/src/mqtt.c index 1172133..868e176 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -124,6 +124,8 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, mqtt_pal_socket_handle sockfd, uint8_t *sendbuf, size_t sendbufsz, uint8_t *recvbuf, size_t recvbufsz, + uint16_t event_flags, + void *callback_state, void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)) { if (client == NULL || sendbuf == NULL || recvbuf == NULL) { @@ -152,13 +154,17 @@ enum MQTTErrors mqtt_init(struct mqtt_client *client, client->send_offset = 0; client->inspector_callback = NULL; - client->user_callback_state = NULL; + client->user_callback_state = callback_state; client->user_callback = callback; + /* RECEIVED event enabled by default */ + client->event_enable = MQTT_EVENT_RECEIVED | (event_flags & MQTT_EVENT_MASK); + return MQTT_OK; } void mqtt_init_reconnect(struct mqtt_client *client, + uint16_t event_flags, void *callback_state, void (*callback)(struct mqtt_client* client, enum MQTTCallbackEvent event, union MQTTCallbackData* data, void** user_state)) { @@ -184,6 +190,9 @@ void mqtt_init_reconnect(struct mqtt_client *client, client->inspector_callback = NULL; client->user_callback_state = callback_state; client->user_callback = callback; + + /* RECEIVED and RECONNECT event enabled by default */ + client->event_enable = MQTT_EVENT_RECEIVED | MQTT_EVENT_RECONNECT | (event_flags & MQTT_EVENT_MASK); } void mqtt_reinit(struct mqtt_client* client, @@ -274,6 +283,14 @@ enum MQTTErrors mqtt_connect(struct mqtt_client *client, return MQTT_OK; } +void mqtt_event_enable(struct mqtt_client *client, uint16_t event_flags) { + client->event_enable |= (event_flags & MQTT_EVENT_MASK); +} + +void mqtt_event_disable(struct mqtt_client *client, uint16_t event_flags) { + client->event_enable &= ~(event_flags & MQTT_EVENT_MASK); +} + enum MQTTErrors mqtt_publish(struct mqtt_client *client, const char* topic_name, const void* application_message, @@ -628,7 +645,7 @@ ssize_t __mqtt_send(struct mqtt_client *client) if (client->user_callback != NULL) { /* call publish callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_PUBLISHED, &data, &client->user_callback_state); } } else if (inspected == 1) { msg->state = MQTT_QUEUED_AWAITING_ACK; @@ -802,7 +819,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call receive callback */ union MQTTCallbackData data = {.received_msg = &response.decoded.publish}; - client->user_callback(client, MQTT_EVENT_RECEIVE, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_RECEIVED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_PUBACK: @@ -819,7 +836,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call publish callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_PUBLISHED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_PUBREC: @@ -847,7 +864,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call publish callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_PUBLISH, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_PUBLISHED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_PUBREL: @@ -901,7 +918,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call subscribed callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_SUBSCRIBE, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_SUBSCRIBED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_UNSUBACK: @@ -918,7 +935,7 @@ ssize_t __mqtt_recv(struct mqtt_client *client) if (client->user_callback != NULL) { /* call unsubscribed callback */ union MQTTCallbackData data = {.queued_msg = msg}; - client->user_callback(client, MQTT_EVENT_UNSUBSCRIBE, &data, &client->user_callback_state); + client->user_callback(client, MQTT_EVENT_UNSUBSCRIBED, &data, &client->user_callback_state); } break; case MQTT_CONTROL_PINGRESP: