From dc7edfd7f6ecc94fc5f9a016408d4f96c356a7ff Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Mon, 29 Jul 2024 03:55:07 -0400 Subject: [PATCH 1/4] * NEW [tls] Copy scram changes in tcp layer to tls layer. Signed-off-by: wanghaemq --- src/mqtt/transport/tcp/mqtt_tcp.c | 3 +- src/mqtt/transport/tls/mqtt_tls.c | 277 ++++++++++++++++++++++++++---- 2 files changed, 246 insertions(+), 34 deletions(-) diff --git a/src/mqtt/transport/tcp/mqtt_tcp.c b/src/mqtt/transport/tcp/mqtt_tcp.c index 55ac5734..0fe3563e 100644 --- a/src/mqtt/transport/tcp/mqtt_tcp.c +++ b/src/mqtt/transport/tcp/mqtt_tcp.c @@ -448,7 +448,8 @@ mqtt_tcptran_pipe_nego_cb(void *arg) } #endif property *prop = nni_mqtt_msg_get_connack_property(p->rxmsg); - property_dup((property **)&ep->property, prop); + if (property_dup((property **) &ep->property, prop) != 0) + goto mqtt_error; property_data *data; data = property_get_value(ep->property, RECEIVE_MAXIMUM); if (data) { diff --git a/src/mqtt/transport/tls/mqtt_tls.c b/src/mqtt/transport/tls/mqtt_tls.c index 63621613..100b314e 100644 --- a/src/mqtt/transport/tls/mqtt_tls.c +++ b/src/mqtt/transport/tls/mqtt_tls.c @@ -13,9 +13,17 @@ #include "core/nng_impl.h" #include "nng/mqtt/mqtt_client.h" +#include "nng/supplemental/nanolib/log.h" #include "nng/supplemental/tls/tls.h" #include "supplemental/mqtt/mqtt_msg.h" +#ifdef SUPP_SCRAM +#define SCRAM_ITERATION_CNT_DEFAULT 4096 +#define SCRAM_DIGEST_DEFAULT SCRAM_SHA256 +#define SCRAM_DIGEST_STR_DEFAULT "SCRAM-SHA-256" +#include "supplemental/scram/scram.h" +#endif + // TLS Over TCP transport. Platform specific TLS Over TCP operations must be // supplied as well. @@ -85,8 +93,13 @@ struct mqtts_tcptran_ep { nng_stream_dialer * dialer; nng_stream_listener *listener; nni_dialer * ndialer; - void * connmsg; void * property; // property + void * connmsg; + bool enable_scram; +#ifdef SUPP_SCRAM + void * scram_ctx; + nni_msg * authmsg; +#endif #ifdef NNG_ENABLE_STATS nni_stat_item st_rcv_max; @@ -153,11 +166,14 @@ static int mqtts_tcptran_pipe_init(void *arg, nni_pipe *npipe) { mqtts_tcptran_pipe *p = arg; - p->npipe = npipe; + + p->npipe = npipe; // nni_lmq_init(&p->rslmq, 1024); - p->packmax = 0xFFFF; - p->qosmax = 2; + // set max value by default + p->packmax == 0 ? p->packmax = (uint32_t)0xFFFFFFFF : p->packmax; + p->qosmax == 0 ? p->qosmax = 2 : p->qosmax; + return (0); } @@ -186,6 +202,7 @@ mqtts_tcptran_pipe_fini(void *arg) nni_msg_free(p->rxmsg); // nni_lmq_fini(&p->rslmq); nni_mtx_fini(&p->mtx); + NNI_FREE_STRUCT(p); } @@ -242,7 +259,6 @@ mqtts_tcptran_ep_match(mqtts_tcptran_ep *ep) nni_list_remove(&ep->waitpipes, p); nni_list_append(&ep->busypipes, p); ep->useraio = NULL; - nni_aio_set_output(aio, 0, p); nni_aio_finish(aio, 0, 0); } @@ -255,8 +271,8 @@ mqtts_tcptran_pipe_nego_cb(void *arg) nni_aio * aio = p->negoaio; nni_aio * uaio; int rv; - uint8_t pos = 0; int var_int; + uint8_t pos = 0; nni_mtx_lock(&ep->mtx); @@ -298,8 +314,10 @@ mqtts_tcptran_pipe_nego_cb(void *arg) nni_mtx_unlock(&ep->mtx); return; } - // only accept CONNACK msg - if ((p->rxlen[0] & CMD_CONNACK) != CMD_CONNACK) { + // only accept CONNACK/AUTH msg + if (((p->rxlen[0] & CMD_CONNACK) != CMD_CONNACK) && + ((p->rxlen[0] & CMD_AUTH_V5) != CMD_AUTH_V5)) { + log_error("Invalid type received %x %x", p->rxlen[0], p->rxlen[1]); rv = PROTOCOL_ERROR; goto error; } @@ -327,7 +345,7 @@ mqtts_tcptran_pipe_nego_cb(void *arg) goto error; } } - // remaining length + // got remaining length if (p->gotrxhead < p->wantrxhead) { nni_iov iov; iov.iov_len = p->wantrxhead - p->gotrxhead; @@ -337,57 +355,169 @@ mqtts_tcptran_pipe_nego_cb(void *arg) nni_mtx_unlock(&ep->mtx); return; } - // Connack + // Handle connack/auth if (p->gotrxhead >= p->wantrxhead) { if (p->proto == MQTT_PROTOCOL_VERSION_v5) { - rv = nni_mqttv5_msg_decode(p->rxmsg); + rv = nni_mqttv5_msg_decode(p->rxmsg); ep->reason_code = rv; if (rv != 0) goto mqtt_error; property_free(ep->property); - property *prop = - (void *) nni_mqtt_msg_get_connack_property( - p->rxmsg); +#ifdef SUPP_SCRAM + if (ep->scram_ctx && + nni_mqtt_msg_get_packet_type(p->rxmsg) == NNG_MQTT_AUTH) { + property *prop = nni_mqtt_msg_get_auth_property(p->rxmsg); + if (prop == NULL) { + ep->reason_code = MQTT_ERR_MALFORMED; + rv = MQTT_ERR_PROTOCOL; + log_error("No property found in AUTH msg"); + goto mqtt_error; + } + uint8_t rc = nni_mqtt_msg_get_auth_reason_code(p->rxmsg); + if (rc != 0x18) { + ep->reason_code = MQTT_ERR_MALFORMED; + rv = MQTT_ERR_PROTOCOL; + log_error("Reason code in AUTH msg is invalid"); + goto mqtt_error; + } + property_data *data = property_get_value(prop, AUTHENTICATION_DATA); + if (data == NULL || data->p_value.str.buf == NULL) { + ep->reason_code = MQTT_ERR_MALFORMED; + rv = MQTT_ERR_PROTOCOL; + log_error("No auth data property found in AUTH msg"); + goto mqtt_error; + } + printf("auth:server_first_msg:%.*s\n", + data->p_value.str.length, (char *)data->p_value.str.buf); + char *client_final_msg = scram_handle_server_first_msg( + ep->scram_ctx, (char *)data->p_value.str.buf, data->p_value.str.length); + if (client_final_msg == NULL) { + ep->reason_code = MQTT_ERR_MALFORMED; + rv = MQTT_ERR_PROTOCOL; + log_error("Error in handle scram server_first_msg"); + goto mqtt_error; + } + printf("auth:client_final_msg:%s\n", client_final_msg); + // TODO 0x19 Re-authenticate + // Prepare authmsg with client_final_msg + nni_msg *authmsg; + nni_mqtt_msg_alloc(&authmsg, 0); + nni_mqtt_msg_set_packet_type(authmsg, NNG_MQTT_AUTH); + nni_mqtt_msg_set_auth_reason_code(authmsg, 0x18); + property *props = mqtt_property_alloc(); + property *prop_auth_method = property_set_value_str( + AUTHENTICATION_METHOD, SCRAM_DIGEST_STR_DEFAULT, + strlen(SCRAM_DIGEST_STR_DEFAULT), true); + property *prop_auth_data = property_set_value_str( + AUTHENTICATION_DATA, client_final_msg, strlen(client_final_msg), true); + property_append(props, prop_auth_method); + property_append(props, prop_auth_data); + nni_mqtt_msg_set_auth_property(authmsg, props); + if (0 != nni_mqttv5_msg_encode(authmsg)) { + ep->reason_code = MQTT_ERR_MALFORMED; + rv = MQTT_ERR_PROTOCOL; + log_error("Error in encode auth msg with client_final_msg"); + goto mqtt_error; + } + if (ep->authmsg) + nng_msg_free(ep->authmsg); + ep->authmsg = authmsg; + nng_free(client_final_msg, 0); + // Update got/want to send client_final_msg and recv connack + nng_msg_free(p->rxmsg); + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 2; + p->wanttxhead = nni_msg_header_len(authmsg) + nni_msg_len(authmsg); + p->rxmsg = NULL; + + nni_iov iov[2]; + int niov = 0; + if (nni_msg_header_len(authmsg) > 0) { + iov[niov].iov_buf = nni_msg_header(authmsg); + iov[niov].iov_len = nni_msg_header_len(authmsg); + niov++; + } + if (nni_msg_len(authmsg) > 0) { + iov[niov].iov_buf = nni_msg_body(authmsg); + iov[niov].iov_len = nni_msg_len(authmsg); + niov++; + } + nni_aio_set_iov(aio, niov, iov); + nng_stream_send(p->conn, p->negoaio); + nni_mtx_unlock(&ep->mtx); + + return; + } +#endif + property *prop = nni_mqtt_msg_get_connack_property(p->rxmsg); if (property_dup((property **) &ep->property, prop) != 0) goto mqtt_error; property_data *data; data = property_get_value(ep->property, RECEIVE_MAXIMUM); if (data) { if (data->p_value.u16 == 0) { - rv = MQTT_ERR_PROTOCOL; + rv = MQTT_ERR_PROTOCOL; ep->reason_code = rv; goto mqtt_error; } else { p->sndmax = data->p_value.u16; } } - data = property_get_value( - ep->property, MAXIMUM_PACKET_SIZE); + data = property_get_value(ep->property, MAXIMUM_PACKET_SIZE); if (data) { if (data->p_value.u32 == 0) { - rv = MQTT_ERR_PROTOCOL; + rv = MQTT_ERR_PROTOCOL; ep->reason_code = rv; goto mqtt_error; } else { p->packmax = data->p_value.u32; + log_info("Set max packet size as %ld", p->packmax); } } - data = property_get_value( - ep->property, PUBLISH_MAXIMUM_QOS); + data = property_get_value(ep->property, PUBLISH_MAXIMUM_QOS); if (data) { p->qosmax = data->p_value.u8; } +#ifdef SUPP_SCRAM + data = property_get_value(ep->property, AUTHENTICATION_DATA); + if (data && data->p_value.str.buf && ep->scram_ctx) { + char *server_final_msg = (char *)data->p_value.str.buf; + printf("auth:server_final_msg:%.*s\n", + data->p_value.str.length, server_final_msg); + char *result = scram_handle_server_final_msg( + ep->scram_ctx, server_final_msg, data->p_value.str.length); + if (result == NULL) { + log_error("Enhanced Authentication failed"); + rv = MQTT_ERR_PROTOCOL; + ep->reason_code = rv; + // Failed so closed the connection + goto error; + } else { + log_info("Enhanced Authentication Passed"); + } + } else if (ep->scram_ctx) { + // We want a authenticate response. but not found + log_error("Enhanced Authentication failed"); + rv = MQTT_ERR_PROTOCOL; + ep->reason_code = rv; + goto error; + } else { + // No more action + } +#endif + + } else { - if ((rv = nni_mqtt_msg_decode(p->rxmsg)) != - MQTT_SUCCESS) { + if ((rv = nni_mqtt_msg_decode(p->rxmsg)) != MQTT_SUCCESS) { ep->reason_code = rv; goto mqtt_error; } ep->property = NULL; } - ep->reason_code = - nni_mqtt_msg_get_connack_return_code(p->rxmsg); + ep->reason_code = nni_mqtt_msg_get_connack_return_code(p->rxmsg); } + mqtt_error: // We are ready now. We put this in the wait list, and // then try to run the matcher. @@ -720,7 +850,7 @@ mqtts_tcptran_pipe_send_start(mqtts_tcptran_pipe *p) if (p->closed) { while ((aio = nni_list_first(&p->sendq)) != NULL) { nni_list_remove(&p->sendq, aio); - nni_aio_finish_error(aio, NNG_ECLOSED); + nni_aio_finish_error(aio, SERVER_SHUTTING_DOWN); } return; } @@ -740,19 +870,19 @@ mqtts_tcptran_pipe_send_start(mqtts_tcptran_pipe *p) if (qos > 0) p->sndmax --; if (qos > p->qosmax) { - p->qosmax == 1? (*header &= 0XF9) & (*header |= 0X02):*header; + p->qosmax == 1? (*header &= 0XF9) & (*header |= 0X02): NNI_ARG_UNUSED(*header); p->qosmax == 0? *header &= 0XF9:*header; } - - } - // check max packet size - if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) { - txaio = p->txaio; - nni_aio_finish_error(txaio, UNSPECIFIED_ERROR); - return; } } + // check max packet size + if (nni_msg_header_len(msg) + nni_msg_len(msg) > p->packmax) { + txaio = p->txaio; + nni_aio_finish_error(txaio, UNSPECIFIED_ERROR); + return; + } + txaio = p->txaio; niov = 0; @@ -915,6 +1045,49 @@ mqtts_tcptran_pipe_start( rv = nni_mqtt_msg_encode(connmsg); else if (mqtt_version == MQTT_PROTOCOL_VERSION_v5) { property *prop = nni_mqtt_msg_get_connect_property(connmsg); +#ifdef SUPP_SCRAM + if (prop == NULL) + prop = mqtt_property_alloc(); + char *pwd = NULL, *username = NULL; + char *pwd2 = NULL, *username2 = NULL; + int pwdsz, usernamesz; + if (ep->enable_scram == true && + ((pwd = (char *)nni_mqtt_msg_get_connect_password(connmsg)) != NULL) && + ((username = (char *)nni_mqtt_msg_get_connect_user_name(connmsg)) != NULL)) { + pwdsz = nni_mqtt_msg_get_connect_password_len(connmsg); + usernamesz = nni_mqtt_msg_get_connect_user_name_len(connmsg); + pwd2 = strndup(pwd, pwdsz); + username2 = strndup(username, usernamesz); + if (ep->scram_ctx) { + scram_ctx_free(ep->scram_ctx); + } + ep->scram_ctx = scram_ctx_create(pwd2, strlen(pwd2), + SCRAM_ITERATION_CNT_DEFAULT, SCRAM_DIGEST_DEFAULT, 0); + } + if (ep->scram_ctx) { + property *prop_auth_method = property_set_value_str( + AUTHENTICATION_METHOD, SCRAM_DIGEST_STR_DEFAULT, + strlen(SCRAM_DIGEST_STR_DEFAULT), true); + char *client_first_msg = scram_client_first_msg(ep->scram_ctx, username2); + property *prop_auth_data = property_set_value_str( + AUTHENTICATION_DATA, client_first_msg, strlen(client_first_msg), true); + property_append(prop, prop_auth_method); + property_append(prop, prop_auth_data); + nni_mqtt_msg_set_connect_property(connmsg, prop); + prop = NULL; + printf("auth:client_first_msg:%s\n", client_first_msg); + //property_free(prop_auth_method); + //property_free(prop_auth_data); + } + if (pwd2) + nng_free(pwd2, 0); + if (username2) + nng_free(username2, 0); + if (prop) { + mqtt_property_free(prop); + prop = NULL; + } +#endif property_data *data; data = property_get_value(prop, MAXIMUM_PACKET_SIZE); if (data) @@ -975,6 +1148,15 @@ mqtts_tcptran_ep_fini(void *arg) return; } nni_mtx_unlock(&ep->mtx); + +#ifdef SUPP_SCRAM + if (ep->authmsg) + nni_msg_free(ep->authmsg); + ep->authmsg = NULL; + if (ep->scram_ctx) + scram_ctx_free(ep->scram_ctx); +#endif + nni_aio_stop(ep->timeaio); nni_aio_stop(ep->connaio); nng_stream_dialer_free(ep->dialer); @@ -1250,6 +1432,11 @@ mqtts_tcptran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) } ep->ndialer = ndialer; ep->authmode = NNG_TLS_AUTH_MODE_REQUIRED; + ep->enable_scram = false; +#ifdef SUPP_SCRAM + ep->scram_ctx = NULL; + ep->authmsg = NULL; +#endif if ((rv != 0) || ((rv = nng_stream_dialer_alloc_url(&ep->dialer, &myurl)) != 0)) { @@ -1482,6 +1669,26 @@ mqtts_tcptran_ep_set_reconnect_backoff(void *arg, const void *v, size_t sz, nni_ return (rv); } +static int +mqtts_tcptran_ep_set_enable_scram(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + mqtts_tcptran_ep *ep = arg; + bool tmp; + int rv; + + if ((rv = nni_copyin_bool(&tmp, v, sz, t)) == 0) { + nni_mtx_lock(&ep->mtx); + ep->enable_scram = tmp; +#ifdef SUPP_SCRAM + log_info("Auth SCRAM status: %s", tmp == 1 ? "Enabled":"Disabled"); +#else + log_warn("Auth SCRAM Error. Try to compile with NNG_ENABLE_SCRAM"); +#endif + nni_mtx_unlock(&ep->mtx); + } + return (rv); +} + static int mqtts_tcptran_ep_bind(void *arg) { @@ -1563,6 +1770,10 @@ static const nni_option mqtts_tcptran_ep_opts[] = { .o_name = NNG_OPT_URL, .o_get = mqtts_tcptran_ep_get_url, }, + { + .o_name = NNG_OPT_MQTT_ENABLE_SCRAM, + .o_set = mqtts_tcptran_ep_set_enable_scram, + }, // terminate list { .o_name = NULL, From 696945ce24919b94e2a62ac859b7ffb518621a11 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Mon, 29 Jul 2024 04:01:50 -0400 Subject: [PATCH 2/4] * FIX [tls] Fix the error of niov. Signed-off-by: wanghaemq --- src/mqtt/transport/tls/mqtt_tls.c | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/mqtt/transport/tls/mqtt_tls.c b/src/mqtt/transport/tls/mqtt_tls.c index 100b314e..a8d1556a 100644 --- a/src/mqtt/transport/tls/mqtt_tls.c +++ b/src/mqtt/transport/tls/mqtt_tls.c @@ -387,7 +387,7 @@ mqtts_tcptran_pipe_nego_cb(void *arg) log_error("No auth data property found in AUTH msg"); goto mqtt_error; } - printf("auth:server_first_msg:%.*s\n", + log_debug("auth:server_first_msg:%.*s\n", data->p_value.str.length, (char *)data->p_value.str.buf); char *client_final_msg = scram_handle_server_first_msg( ep->scram_ctx, (char *)data->p_value.str.buf, data->p_value.str.length); @@ -397,7 +397,7 @@ mqtts_tcptran_pipe_nego_cb(void *arg) log_error("Error in handle scram server_first_msg"); goto mqtt_error; } - printf("auth:client_final_msg:%s\n", client_final_msg); + log_debug("auth:client_final_msg:%s\n", client_final_msg); // TODO 0x19 Re-authenticate // Prepare authmsg with client_final_msg nni_msg *authmsg; @@ -416,7 +416,7 @@ mqtts_tcptran_pipe_nego_cb(void *arg) if (0 != nni_mqttv5_msg_encode(authmsg)) { ep->reason_code = MQTT_ERR_MALFORMED; rv = MQTT_ERR_PROTOCOL; - log_error("Error in encode auth msg with client_final_msg"); + log_error("Error in encode auth msg with client_final_msg\n"); goto mqtt_error; } if (ep->authmsg) @@ -433,12 +433,9 @@ mqtts_tcptran_pipe_nego_cb(void *arg) nni_iov iov[2]; int niov = 0; - if (nni_msg_header_len(authmsg) > 0) { - iov[niov].iov_buf = nni_msg_header(authmsg); - iov[niov].iov_len = nni_msg_header_len(authmsg); - niov++; - } if (nni_msg_len(authmsg) > 0) { + nni_msg_insert(authmsg, nni_msg_header(authmsg), + nni_msg_header_len(authmsg)); iov[niov].iov_buf = nni_msg_body(authmsg); iov[niov].iov_len = nni_msg_len(authmsg); niov++; @@ -483,7 +480,7 @@ mqtts_tcptran_pipe_nego_cb(void *arg) data = property_get_value(ep->property, AUTHENTICATION_DATA); if (data && data->p_value.str.buf && ep->scram_ctx) { char *server_final_msg = (char *)data->p_value.str.buf; - printf("auth:server_final_msg:%.*s\n", + log_debug("auth:server_final_msg:%.*s\n", data->p_value.str.length, server_final_msg); char *result = scram_handle_server_final_msg( ep->scram_ctx, server_final_msg, data->p_value.str.length); @@ -1075,7 +1072,7 @@ mqtts_tcptran_pipe_start( property_append(prop, prop_auth_data); nni_mqtt_msg_set_connect_property(connmsg, prop); prop = NULL; - printf("auth:client_first_msg:%s\n", client_first_msg); + log_debug("auth:client_first_msg:%s\n", client_first_msg); //property_free(prop_auth_method); //property_free(prop_auth_data); } From bda52215bb326e3a84bb9789784b7c1f9d4bd99c Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Mon, 29 Jul 2024 04:03:07 -0400 Subject: [PATCH 3/4] * NEW [scram] add tls supported in scram demo. Signed-off-by: wanghaemq --- demo/mqttv5_scram/mqttv5_scram.c | 192 +++++++++++++++++++++++++++++-- 1 file changed, 185 insertions(+), 7 deletions(-) diff --git a/demo/mqttv5_scram/mqttv5_scram.c b/demo/mqttv5_scram/mqttv5_scram.c index 8c5f17bc..2609ea4b 100644 --- a/demo/mqttv5_scram/mqttv5_scram.c +++ b/demo/mqttv5_scram/mqttv5_scram.c @@ -28,6 +28,7 @@ // #include +#include #include #include #include @@ -38,10 +39,13 @@ #include #include #include +#include // Subcommands -#define PUBLISH "pub" -#define SUBSCRIBE "sub" +#define PUBLISH "pub" +#define SUBSCRIBE "sub" +#define TLS_PUBLISH "pubtls" +#define TLS_SUBSCRIBE "subtls" void fatal(const char *msg, int rv) @@ -87,9 +91,97 @@ connect_cb(nng_pipe p, nng_pipe_ev ev, void *arg) (void) arg; } +void loadfile(const char *path, void **datap, size_t *lenp) +{ + FILE * f; + size_t total_read = 0; + size_t allocation_size = BUFSIZ; + char * fdata; + char * realloc_result; + + if ((f = fopen(path, "rb")) == NULL) { + fprintf(stderr, "Cannot open file %s: %s", path, + strerror(errno)); + exit(1); + } + + if ((fdata = malloc(allocation_size + 1)) == NULL) { + fprintf(stderr, "Out of memory."); + } + + while (1) { + total_read += fread( + fdata + total_read, 1, allocation_size - total_read, f); + if (ferror(f)) { + if (errno == EINTR) { + continue; + } + fprintf(stderr, "Read from %s failed: %s", path, + strerror(errno)); + exit(1); + } + if (feof(f)) { + break; + } + if (total_read == allocation_size) { + if (allocation_size > SIZE_MAX / 2) { + fprintf(stderr, "Out of memory."); + } + allocation_size *= 2; + if ((realloc_result = realloc( + fdata, allocation_size + 1)) == NULL) { + free(fdata); + fprintf(stderr, "Out of memory."); + exit(1); + } + fdata = realloc_result; + } + } + if (f != stdin) { + fclose(f); + } + fdata[total_read] = '\0'; + *datap = fdata; + *lenp = total_read; +} + +int init_dialer_tls(nng_dialer d, const char *cacert, const char *cert, + const char *key, const char *pass) +{ + nng_tls_config *cfg; + int rv; + + if ((rv = nng_tls_config_alloc(&cfg, NNG_TLS_MODE_CLIENT)) != 0) { + return (rv); + } + + if (cert != NULL && key != NULL) { + nng_tls_config_auth_mode(cfg, NNG_TLS_AUTH_MODE_REQUIRED); + if ((rv = nng_tls_config_own_cert(cfg, cert, key, pass)) != + 0) { + goto out; + } + } else { + nng_tls_config_auth_mode(cfg, NNG_TLS_AUTH_MODE_NONE); + } + + if (cacert != NULL) { + if ((rv = nng_tls_config_ca_chain(cfg, cacert, NULL)) != 0) { + goto out; + } + } + + rv = nng_dialer_set_ptr(d, NNG_OPT_TLS_CONFIG, cfg); + +out: + nng_tls_config_free(cfg); + return (rv); +} + // Connect to the given address. int -client_connect(nng_socket *sock, const char *url, bool verbose) +client_connect(nng_socket *sock, const char *url, bool verbose, + int istls, const char *capath, const char *certpath, const char *keypath, const char *pwd) { nng_dialer dialer; int rv; @@ -111,7 +203,7 @@ client_connect(nng_socket *sock, const char *url, bool verbose) nng_msg *connmsg; nng_mqtt_msg_alloc(&connmsg, 0); nng_mqtt_msg_set_packet_type(connmsg, NNG_MQTT_CONNECT); - // To enable SCRAM, version be MQTTv5 + // To enable SCRAM, version must be MQTTv5 nng_mqtt_msg_set_connect_proto_version(connmsg, 5); nng_mqtt_msg_set_connect_keep_alive(connmsg, 600); nng_mqtt_msg_set_connect_user_name(connmsg, "admin"); @@ -144,6 +236,38 @@ client_connect(nng_socket *sock, const char *url, bool verbose) printf("%s\n", buff); } + // Init tls dialer + if(istls) + { + char *ca; size_t calen; + char *cert; size_t certlen; + char *key; size_t keylen; + if (capath) + loadfile(capath, (void**)&ca, &calen); + if (certpath) + loadfile(certpath, (void**)&cert, &certlen); + if (keypath) + loadfile(keypath, (void**)&key, &keylen); + if (capath && calen == 0) { + printf("init_dialer_tls: CA is unavailable"); + return -1; + } + if (certpath && certlen == 0) { + printf("init_dialer_tls: Cert is unavailable"); + return -1; + } + if (keypath && keylen == 0) { + printf("init_dialer_tls: Key is unavailable"); + return -1; + } + + if ((rv = init_dialer_tls(dialer, ca, cert, key, pwd)) != 0) + { + printf("init_dialer_tls: %s", nng_strerror(rv)); + return -1; + } + } + printf("Connecting to server ...\n"); nng_dialer_set_ptr(dialer, NNG_OPT_MQTT_CONNMSG, connmsg); nng_dialer_start(dialer, NNG_FLAG_NONBLOCK); @@ -352,11 +476,47 @@ main(const int argc, const char **argv) const char *exe = argv[0]; const char *cmd; + int istls = 0; + const char *ca = NULL, *cert = NULL, *key = NULL, *pwd = NULL; if (5 == argc && 0 == strcmp(argv[1], SUBSCRIBE)) { cmd = SUBSCRIBE; } else if (6 <= argc && 0 == strcmp(argv[1], PUBLISH)) { cmd = PUBLISH; + } else if (7 == argc && 0 == strcmp(argv[1], TLS_PUBLISH)) { + cmd = TLS_PUBLISH; + istls = 1; + ca = argv[6]; + } else if (9 == argc && 0 == strcmp(argv[1], TLS_PUBLISH)) { + cmd = TLS_PUBLISH; + istls = 1; + ca = argv[6]; + cert = argv[7]; + key = argv[8]; + } else if (10 == argc && 0 == strcmp(argv[1], TLS_PUBLISH)) { + cmd = TLS_PUBLISH; + istls = 1; + ca = argv[6]; + cert = argv[7]; + key = argv[8]; + pwd = argv[9]; + } else if (6 == argc && 0 == strcmp(argv[1], TLS_SUBSCRIBE)) { + cmd = TLS_SUBSCRIBE; + istls = 1; + ca = argv[5]; + } else if (8 == argc && 0 == strcmp(argv[1], TLS_SUBSCRIBE)) { + cmd = TLS_SUBSCRIBE; + istls = 1; + ca = argv[5]; + cert = argv[6]; + key = argv[7]; + } else if (9 == argc && 0 == strcmp(argv[1], TLS_SUBSCRIBE)) { + cmd = TLS_SUBSCRIBE; + istls = 1; + ca = argv[5]; + cert = argv[6]; + key = argv[7]; + pwd = argv[8]; } else { goto error; } @@ -370,7 +530,7 @@ main(const int argc, const char **argv) nng_duration retry = 10000; nng_socket_set_ms(sock, NNG_OPT_MQTT_RETRY_INTERVAL, retry); - client_connect(&sock, url, verbose); + client_connect(&sock, url, verbose, istls, ca, cert, key, pwd); nng_msleep(1000); signal(SIGINT, intHandler); @@ -471,7 +631,25 @@ main(const int argc, const char **argv) error: fprintf(stderr, "Usage: %s %s \n" - " %s %s \n", - exe, PUBLISH, exe, SUBSCRIBE); + " %s %s \n" + " %s %s \n" + " %s %s \n" + " %s %s \n" + " %s %s \n" + " %s %s \n" + " %s %s \n" + "Example: %s %s 'mqtt-tcp://127.0.0.1:1883' 1 topic\n" + " %s %s 'tls+mqtt-tcp://127.0.0.1:8883' 1 topic /etc/cert/ca.pem\n", + exe, PUBLISH, + exe, SUBSCRIBE, + exe, TLS_PUBLISH, + exe, TLS_PUBLISH, + exe, TLS_PUBLISH, + exe, TLS_SUBSCRIBE, + exe, TLS_SUBSCRIBE, + exe, TLS_SUBSCRIBE, + exe, SUBSCRIBE, + exe, TLS_SUBSCRIBE + ); return 1; } From 713dd3e2e276538745f55a890405ff0d879b6fe5 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Mon, 29 Jul 2024 04:13:32 -0400 Subject: [PATCH 4/4] * NEW [demo/scram] Add handler for demo tls_sub/tls_pub. Signed-off-by: wanghaemq --- demo/mqttv5_scram/mqttv5_scram.c | 83 ++++++++++++++++++++++++++++++-- 1 file changed, 80 insertions(+), 3 deletions(-) diff --git a/demo/mqttv5_scram/mqttv5_scram.c b/demo/mqttv5_scram/mqttv5_scram.c index 2609ea4b..5b03df31 100644 --- a/demo/mqttv5_scram/mqttv5_scram.c +++ b/demo/mqttv5_scram/mqttv5_scram.c @@ -560,6 +560,28 @@ main(const int argc, const char **argv) nng_thread_create(&threads[i], publish_cb, ¶ms); } + for (i = 0; i < nthread; i++) { + nng_thread_destroy(threads[i]); + } + } else if (strcmp(TLS_PUBLISH, cmd) == 0) { + const char *data = argv[5]; + uint32_t interval = 0; + uint32_t nthread = 1; + + nng_thread *threads[nthread]; + + params.sock = &sock, params.topic = topic; + params.data = (uint8_t *) data; + params.data_len = strlen(data); + params.qos = qos; + params.interval = interval; + params.verbose = verbose; + + size_t i = 0; + for (i = 0; i < nthread; i++) { + nng_thread_create(&threads[i], publish_cb, ¶ms); + } + for (i = 0; i < nthread; i++) { nng_thread_destroy(threads[i]); } @@ -567,9 +589,64 @@ main(const int argc, const char **argv) nng_mqtt_topic_qos subscriptions[] = { { .qos = qos, - .topic = { + .topic = { + .buf = (uint8_t *) topic, + .length = strlen(topic), + }, + .nolocal = 1, + .rap = 1, + .retain_handling = 0, + }, + }; + nng_mqtt_topic unsubscriptions[] = { + { + .buf = (uint8_t *) topic, + .length = strlen(topic), + }, + }; + + property *plist = mqtt_property_alloc(); + mqtt_property_append(plist, + mqtt_property_set_value_varint( + SUBSCRIPTION_IDENTIFIER, 120)); + property *unsub_plist = NULL; + mqtt_property_dup(&unsub_plist, plist); + + // Sync subscription + // rv = nng_mqtt_subscribe(sock, subscriptions, 1, plist); + + // Asynchronous subscription + nng_mqtt_client *client = nng_mqtt_client_alloc(sock, &send_callback, NULL, true); + nng_mqtt_subscribe_async(client, subscriptions, + sizeof(subscriptions) / sizeof(nng_mqtt_topic_qos), plist); + + printf("Start receiving loop:\n"); + while (true) { + nng_msg *msg; + if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) { + fatal("nng_recvmsg", rv); + continue; + } + + // we should only receive publish messages + assert(nng_mqtt_msg_get_packet_type(msg) == NNG_MQTT_PUBLISH); + msg_recv_deal(msg, verbose); + } + + // Sync unsubscription + // rv = nng_mqtt_unsubscribe(sock, subscriptions, 1, plist); + // Asynchronous unsubscription + nng_mqtt_unsubscribe_async(client, unsubscriptions, + sizeof(unsubscriptions) / sizeof(nng_mqtt_topic), + unsub_plist); + nng_mqtt_client_free(client, true); + } else if (strcmp(TLS_SUBSCRIBE, cmd) == 0) { + nng_mqtt_topic_qos subscriptions[] = { + { + .qos = qos, + .topic = { .buf = (uint8_t *) topic, - .length = strlen(topic), + .length = strlen(topic), }, .nolocal = 1, .rap = 1, @@ -620,7 +697,7 @@ main(const int argc, const char **argv) nng_mqtt_client_free(client, true); } - // disconnect + // disconnect property *plist = mqtt_property_alloc(); property *p = mqtt_property_set_value_strpair( USER_PROPERTY, "aaa", strlen("aaa"), "aaa", strlen("aaa"), true);