diff --git a/.github/workflows/mosq__build.yml b/.github/workflows/mosq__build.yml index abf9dbb4bc..27ff2581c1 100644 --- a/.github/workflows/mosq__build.yml +++ b/.github/workflows/mosq__build.yml @@ -17,7 +17,8 @@ jobs: runs-on: ubuntu-22.04 container: espressif/idf:${{ matrix.idf_ver }} env: - TEST_DIR: components/mosquitto/examples/broker + TEST_DIR: components/mosquitto/examples + TARGET_TEST: broker TARGET_TEST_DIR: build_esp32_default steps: - name: Checkout esp-protocols @@ -29,14 +30,15 @@ jobs: run: | . ${IDF_PATH}/export.sh pip install idf-component-manager idf-build-apps --upgrade - python ci/build_apps.py ${TEST_DIR} - cd ${TEST_DIR} + python ci/build_apps.py -c ${TEST_DIR} -m components/mosquitto/.build-test-rules.yml + # upload only the target test artifacts + cd ${TEST_DIR}/${TARGET_TEST} ${GITHUB_WORKSPACE}/ci/clean_build_artifacts.sh `pwd`/${TARGET_TEST_DIR} zip -qur artifacts.zip ${TARGET_TEST_DIR} - uses: actions/upload-artifact@v4 with: name: mosq_target_esp32_${{ matrix.idf_ver }} - path: ${{ env.TEST_DIR }}/artifacts.zip + path: ${{ env.TEST_DIR }}/${{ env.TARGET_TEST }}/artifacts.zip if-no-files-found: error test_mosq: diff --git a/ci/check_copyright_ignore.txt b/ci/check_copyright_ignore.txt index e69de29bb2..1cd8798f5b 100644 --- a/ci/check_copyright_ignore.txt +++ b/ci/check_copyright_ignore.txt @@ -0,0 +1 @@ +components/mosquitto/examples/serverless_mqtt/components/libjuice/port/juice_random.c diff --git a/components/mosquitto/.build-test-rules.yml b/components/mosquitto/.build-test-rules.yml new file mode 100644 index 0000000000..e1f5846485 --- /dev/null +++ b/components/mosquitto/.build-test-rules.yml @@ -0,0 +1,3 @@ +components/mosquitto/examples/serverless_mqtt: + disable: + - if: IDF_TARGET not in ["esp32", "esp32s3", "esp32c3"] diff --git a/components/mosquitto/.cz.yaml b/components/mosquitto/.cz.yaml index f9b741cd0b..0b6507a505 100644 --- a/components/mosquitto/.cz.yaml +++ b/components/mosquitto/.cz.yaml @@ -3,6 +3,6 @@ commitizen: bump_message: 'bump(mosq): $current_version -> $new_version' pre_bump_hooks: python ../../ci/changelog.py mosquitto tag_format: mosq-v$version - version: 2.0.28~0 + version: 2.0.20 version_files: - idf_component.yml diff --git a/components/mosquitto/CHANGELOG.md b/components/mosquitto/CHANGELOG.md index bcc1c9a553..5c24047c45 100644 --- a/components/mosquitto/CHANGELOG.md +++ b/components/mosquitto/CHANGELOG.md @@ -1,5 +1,23 @@ +# Changelog + +## [2.0.20](https://github.com/espressif/esp-protocols/commits/mosq-v2.0.20) + +### Features + +- Upgrade to mosquitto v2.0.20 ([3b2c614d](https://github.com/espressif/esp-protocols/commit/3b2c614d)) +- Add support for on-message callback ([cdeab8f5](https://github.com/espressif/esp-protocols/commit/cdeab8f5)) +- Add example with two brokers synced on P2P ([d57b8c5b](https://github.com/espressif/esp-protocols/commit/d57b8c5b)) + +### Bug Fixes + +- Fix dependency issues moving esp-tls to public deps ([6cce87e4](https://github.com/espressif/esp-protocols/commit/6cce87e4)) + ## [2.0.28~0](https://github.com/espressif/esp-protocols/commits/mosq-v2.0.28_0) +### Warning + +Incorrect version number! This version published under `2.0.28~0` is based on upstream v2.0.18 + ### Features - Added support for TLS transport using ESP-TLS ([1af4bbe1](https://github.com/espressif/esp-protocols/commit/1af4bbe1)) diff --git a/components/mosquitto/CMakeLists.txt b/components/mosquitto/CMakeLists.txt index 1db843ec19..47e5d551d9 100644 --- a/components/mosquitto/CMakeLists.txt +++ b/components/mosquitto/CMakeLists.txt @@ -81,7 +81,8 @@ idf_component_register(SRCS ${m_srcs} PRIV_INCLUDE_DIRS port/priv_include port/priv_include/sys ${m_dir} ${m_src_dir} ${m_incl_dir} ${m_lib_dir} ${m_deps_dir} INCLUDE_DIRS ${m_incl_dir} port/include - PRIV_REQUIRES newlib esp-tls + REQUIRES esp-tls + PRIV_REQUIRES newlib ) target_compile_definitions(${COMPONENT_LIB} PRIVATE "WITH_BROKER") diff --git a/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt new file mode 100644 index 0000000000..c9935c9567 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/CMakeLists.txt @@ -0,0 +1,6 @@ +# The following five lines of boilerplate have to be in your project's +# CMakeLists in this exact order for cmake to work correctly +cmake_minimum_required(VERSION 3.16) + +include($ENV{IDF_PATH}/tools/cmake/project.cmake) +project(serverless_mqtt) diff --git a/components/mosquitto/examples/serverless_mqtt/README.md b/components/mosquitto/examples/serverless_mqtt/README.md new file mode 100644 index 0000000000..5ee8369e67 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/README.md @@ -0,0 +1,53 @@ +# Brokerless MQTT Example + +MQTT served by (two) mosquitto's running on two ESP chips. + +* Leverages MQTT connectivity between two private networks without cloud premisses. +* Creates two local MQTT servers (on ESP32x's) which are being synchronized over peer to peer connection (established via ICE protocol, by [libjuice](https://github.com/paullouisageneau/libjuice)). + +## How it works + +This example needs two ESP32 chipsets, that will create two separate Wi-Fi networks (IoT networks) used for IoT devices. +Each IoT network is served by an MQTT server (using mosquitto component). +This example will also synchronize these two MQTT brokers, as if there was only one IoT network with one broker. +This example creates a peer to peer connection between two chipsets to keep them synchronize. This connection utilizes libjuice (which implements a simplified ICE-UDP) to traverse NATs, which enabling direct connection between two private networks behind NATs. + +* Diagram + +![demo](serverless.png) + +Here's a step-by-step procedure of establishing this remote connection: +1) Initialize and start Wi-Fi AP (for IoT networks) and Wi-Fi station (for internet connection) +2) Start mosquitto broker on IoT network +3) Start libjuice to gather connection candidates +4) Synchronize using a public MQTT broker and exchange ICE descriptors +5) Establish ICE UDP connection between the two ESP32 chipsets +6) Start forwarding mqtt messages + - Each remote datagram (received from ICE-UDP channel) is re-published to the local MQTT server + - Each local MQTT message (received from mosquitto on_message callback) is sent in ICE-UDP datagram + +## How to use this example + +You need two ESP32 devices that support Wi-Fi station and Wi-Fi software access point. + +* Configure Wi-Fi credentials for both devices on both interfaces + * These devices would be deployed in distinct Wi-Fi environments, so the Wi-Fi station credentials would likely be different. + * They also create their own IoT network (on the soft-AP interface) Wi-Fi, so the AP credentials would likely be the same, suggesting the IoT networks will be keep synchronized (even though these are two distict Wi-Fi networks). +* Choose `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1` for one device and `CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2` for another. It's not important which device is PEER1, since the code is symmetric, but these two devices need to have different role. +* Optionally: You can use `idf.py` `-D` and `-B` flag to keep separate build directories and sdkconfigs for these two roles +``` +idf.py -B build1 -DSDKCONFIG=build1/sdkconfig menuconfig build flash monitor +``` +* Flash and run the two devices and wait for them to connect and synchronize. +* Now you can test MQTT connectivity, for example: + * Join PEER1 device's AP and connect to the MQTT broker with one or more clients, subscribing to one or more topics. + * Join PEER2 device's AP and connect to the MQTT broker with one or more clients, subscribing to one or more topics. + * Whenever you publish to a topic, all subscribed clients should receive the message, no matter which Wi-Fi network they're connected to. + +## Warning + +This example uses libjuice as a dependency: + +* libjuice (UDP Interactive Connectivity Establishment): https://github.com/paullouisageneau/libjuice + +which is distributed under Mozilla Public License v2.0. diff --git a/components/mosquitto/examples/serverless_mqtt/components/libjuice/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/components/libjuice/CMakeLists.txt new file mode 100644 index 0000000000..f7cf012d1b --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/components/libjuice/CMakeLists.txt @@ -0,0 +1,44 @@ +set(LIBJUICE_VERSION "73785387eafe15c02b6a210edb10f722474e8e14") +set(LIBJUICE_URL "https://github.com/paullouisageneau/libjuice/archive/${LIBJUICE_VERSION}.zip") + +set(libjuice_dir ${CMAKE_BINARY_DIR}/libjuice/libjuice-${LIBJUICE_VERSION}) + +# Fetch the library +if(NOT EXISTS ${libjuice_dir}) + message(STATUS "Downloading libjuice ${LIBJUICE_VERSION}...") + file(DOWNLOAD ${LIBJUICE_URL} ${CMAKE_BINARY_DIR}/libjuice.zip SHOW_PROGRESS) + execute_process(COMMAND unzip -o ${CMAKE_BINARY_DIR}/libjuice.zip -d ${CMAKE_BINARY_DIR}/libjuice + WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +endif() + +set(JUICE_SOURCES ${libjuice_dir}/src/addr.c + ${libjuice_dir}/src/agent.c + ${libjuice_dir}/src/base64.c + ${libjuice_dir}/src/conn.c + ${libjuice_dir}/src/conn_mux.c + ${libjuice_dir}/src/conn_poll.c + ${libjuice_dir}/src/conn_thread.c + ${libjuice_dir}/src/const_time.c + ${libjuice_dir}/src/crc32.c + ${libjuice_dir}/src/hash.c + ${libjuice_dir}/src/ice.c + ${libjuice_dir}/src/juice.c + ${libjuice_dir}/src/log.c + ${libjuice_dir}/src/server.c + ${libjuice_dir}/src/stun.c + ${libjuice_dir}/src/timestamp.c + ${libjuice_dir}/src/turn.c + ${libjuice_dir}/src/udp.c +# Use hmac from mbedtls and random numbers from esp_random: +# ${libjuice_dir}/src/hmac.c +# ${libjuice_dir}/src/random.c + ) + +idf_component_register(SRCS port/juice_random.c + ${JUICE_SOURCES} + INCLUDE_DIRS "include" "${libjuice_dir}/include" "${libjuice_dir}/include/juice" + REQUIRES esp_netif + PRIV_REQUIRES sock_utils) + +target_compile_options(${COMPONENT_LIB} PRIVATE "-Wno-format") +set_source_files_properties(${libjuice_dir}/src/udp.c PROPERTIES COMPILE_FLAGS -Wno-unused-variable) diff --git a/components/mosquitto/examples/serverless_mqtt/components/libjuice/include/ifaddrs.h b/components/mosquitto/examples/serverless_mqtt/components/libjuice/include/ifaddrs.h new file mode 100644 index 0000000000..ba92bc72b8 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/components/libjuice/include/ifaddrs.h @@ -0,0 +1,13 @@ +/* + * SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#pragma once + +// Purpose of this header is to replace udp_sendto() to avoid name conflict with lwip +// added here since ifaddrs.h is included from juice_udp sources +#define udp_sendto juice_udp_sendto + +// other than that, let's just include the ifaddrs (from sock_utils) +#include_next "ifaddrs.h" diff --git a/components/mosquitto/examples/serverless_mqtt/components/libjuice/port/juice_random.c b/components/mosquitto/examples/serverless_mqtt/components/libjuice/port/juice_random.c new file mode 100644 index 0000000000..89c1c6bdaa --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/components/libjuice/port/juice_random.c @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2020 Paul-Louis Ageneau + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ +#include "esp_random.h" + +void juice_random(void *buf, size_t size) +{ + esp_fill_random(buf, size); +} + +void juice_random_str64(char *buf, size_t size) +{ + static const char chars64[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + size_t i = 0; + for (i = 0; i + 1 < size; ++i) { + uint8_t byte = 0; + juice_random(&byte, 1); + buf[i] = chars64[byte & 0x3F]; + } + buf[i] = '\0'; +} + +uint32_t juice_rand32(void) +{ + uint32_t r = 0; + juice_random(&r, sizeof(r)); + return r; +} + +uint64_t juice_rand64(void) +{ + uint64_t r = 0; + juice_random(&r, sizeof(r)); + return r; +} diff --git a/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt new file mode 100644 index 0000000000..b757b72863 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/CMakeLists.txt @@ -0,0 +1,4 @@ +idf_component_register(SRCS "serverless_mqtt.c" + "wifi_connect.c" + INCLUDE_DIRS "." + REQUIRES libjuice nvs_flash mqtt json esp_wifi) diff --git a/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild new file mode 100644 index 0000000000..7e0e97de03 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/Kconfig.projbuild @@ -0,0 +1,85 @@ +menu "Example Configuration" + + menu "AP Configuration" + comment "AP Configuration" + + config EXAMPLE_AP_SSID + string "Wi-Fi SSID" + default "myssid" + help + Set the SSID of Wi-Fi ap interface. + + config EXAMPLE_AP_PASSWORD + string "Wi-Fi Password" + default "12345678" + help + Set the password of Wi-Fi ap interface. + + endmenu + + menu "STA Configuration" + comment "STA Configuration" + + config EXAMPLE_STA_SSID + string "WiFi Station SSID" + default "mystationssid" + help + SSID for the example's sta to connect to. + + config EXAMPLE_STA_PASSWORD + string "WiFi Station Password" + default "mystationpassword" + help + WiFi station password for the example to use. + endmenu + + config EXAMPLE_MQTT_BROKER_URI + string "MQTT Broker URL" + default "mqtt://mqtt.eclipseprojects.io" + help + URL of the mqtt broker use for synchronisation and exchanging + ICE connect info (description and candidates). + + config EXAMPLE_MQTT_SYNC_TOPIC + string "MQTT topic for synchronisation" + default "/topic/serverless_mqtt" + help + MQTT topic used fo synchronisation. + + config EXAMPLE_STUN_SERVER + string "Hostname of STUN server" + default "stun.l.google.com" + help + STUN server hostname. + + config EXAMPLE_MQTT_CLIENT_STACK_SIZE + int "Stack size for mqtt client" + default 16384 + help + Set stack size for the mqtt client. + Need more stack, since calling juice API from the handler. + + config EXAMPLE_MQTT_BROKER_PORT + int "port for the mosquitto to listen to" + default 1883 + help + This is a port which the local mosquitto uses. + + choice EXAMPLE_SERVERLESS_ROLE + prompt "Choose your role" + default EXAMPLE_SERVERLESS_ROLE_PEER1 + help + Choose either peer1 or peer2. + It's not very important which device is peer1 + (peer-1 sends sync messages, peer2 listens for them) + It is important that we have two peers, + one with peer1 config, another one with peer2 config + + config EXAMPLE_SERVERLESS_ROLE_PEER1 + bool "peer1" + + config EXAMPLE_SERVERLESS_ROLE_PEER2 + bool "peer2" + endchoice + +endmenu diff --git a/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml b/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml new file mode 100644 index 0000000000..e3297d5351 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/idf_component.yml @@ -0,0 +1,5 @@ +## IDF Component Manager Manifest File +dependencies: + espressif/mosquitto: + override_path: ../../.. + espressif/sock_utils: "*" diff --git a/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c b/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c new file mode 100644 index 0000000000..8dd0bee7c7 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/serverless_mqtt.c @@ -0,0 +1,374 @@ +/* + * SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include +#include "freertos/FreeRTOS.h" +#include "freertos/event_groups.h" +#include "mqtt_client.h" +#include "esp_wifi.h" +#include "esp_log.h" +#include "esp_random.h" +#include "esp_check.h" +#include "esp_sleep.h" +#include "mosq_broker.h" +#include "juice/juice.h" +#include "cJSON.h" + +#if defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1) +#define OUR_PEER "1" +#define THEIR_PEER "2" +#elif defined(CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER2) +#define OUR_PEER "2" +#define THEIR_PEER "1" +#endif + +#define PEER_SYNC0 BIT(0) +#define PEER_SYNC1 BIT(1) +#define PEER_SYNC2 BIT(2) +#define PEER_FAIL BIT(3) +#define PEER_GATHER_DONE BIT(4) +#define PEER_DESC_PUBLISHED BIT(5) +#define PEER_CONNECTED BIT(6) + +#define SYNC_BITS (PEER_SYNC1 | PEER_SYNC2 | PEER_FAIL) + +#define PUBLISH_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC OUR_PEER +#define SUBSCRIBE_SYNC_TOPIC CONFIG_EXAMPLE_MQTT_SYNC_TOPIC THEIR_PEER +#define MAX_BUFFER_SIZE JUICE_MAX_SDP_STRING_LEN + +typedef struct message_wrap { + uint16_t topic_len; + uint16_t data_len; + char data[]; +} __attribute__((packed)) message_wrap_t; + +static const char *TAG = "serverless_mqtt" OUR_PEER; +static char s_buffer[MAX_BUFFER_SIZE]; +static EventGroupHandle_t s_state = NULL; +static juice_agent_t *s_agent = NULL; +static cJSON *s_peer_desc_json = NULL; +static char *s_peer_desc = NULL; +static esp_mqtt_client_handle_t s_local_mqtt = NULL; + +char *wifi_get_ipv4(wifi_interface_t interface); +esp_err_t wifi_connect(void); +static esp_err_t sync_peers(void); +static esp_err_t create_candidates(void); +static esp_err_t create_local_client(void); +static esp_err_t create_local_broker(void); + +void app_main(void) +{ + __attribute__((__unused__)) esp_err_t ret; + ESP_GOTO_ON_ERROR(wifi_connect(), err, TAG, "Failed to initialize WiFi"); + ESP_GOTO_ON_ERROR(create_local_broker(), err, TAG, "Failed to create local broker"); + ESP_GOTO_ON_ERROR(create_candidates(), err, TAG, "Failed to create juice candidates"); + ESP_GOTO_ON_ERROR(sync_peers(), err, TAG, "Failed to sync with the other peer"); + EventBits_t bits = xEventGroupWaitBits(s_state, PEER_FAIL | PEER_CONNECTED, pdFALSE, pdFALSE, pdMS_TO_TICKS(90000)); + if (bits & PEER_CONNECTED) { + ESP_LOGI(TAG, "Peer is connected!"); + ESP_GOTO_ON_ERROR(create_local_client(), err, TAG, "Failed to create forwarding mqtt client"); + ESP_LOGI(TAG, "Everything is ready, exiting main task"); + return; + } +err: + ESP_LOGE(TAG, "Non recoverable error, going to sleep for some time (random, max 20s)"); + esp_deep_sleep(1000000LL * (esp_random() % 20)); +} + +static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) +{ + esp_mqtt_event_handle_t event = event_data; + esp_mqtt_client_handle_t client = event->client; + switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_CONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + if (esp_mqtt_client_subscribe(client, SUBSCRIBE_SYNC_TOPIC, 1) < 0) { + ESP_LOGE(TAG, "Failed to subscribe to the sync topic"); + } + xEventGroupSetBits(s_state, PEER_SYNC0); + break; + case MQTT_EVENT_DISCONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); + xEventGroupSetBits(s_state, PEER_FAIL); + break; + + case MQTT_EVENT_DATA: + ESP_LOGI(TAG, "MQTT_EVENT_DATA"); + printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); + printf("DATA=%.*s\r\n", event->data_len, event->data); + if (s_state == NULL || memcmp(event->topic, SUBSCRIBE_SYNC_TOPIC, event->topic_len) != 0) { + break; + } + EventBits_t bits = xEventGroupGetBits(s_state); + if (event->data_len > 1 && s_agent) { + cJSON *root = cJSON_Parse(event->data); + if (root == NULL) { + break; + } + cJSON *desc = cJSON_GetObjectItem(root, "desc"); + if (desc == NULL) { + cJSON_Delete(root); + break; + } + printf("desc->valuestring:%s\n", desc->valuestring); + juice_set_remote_description(s_agent, desc->valuestring); + char cand_name[] = "cand0"; + while (true) { + cJSON *cand = cJSON_GetObjectItem(root, cand_name); + if (cand == NULL) { + break; + } + printf("%s: cand->valuestring:%s\n", cand_name, cand->valuestring); + juice_add_remote_candidate(s_agent, cand->valuestring); + cand_name[4]++; + } + cJSON_Delete(root); + xEventGroupSetBits(s_state, PEER_DESC_PUBLISHED); // this will complete the sync process + // and destroy the mqtt client + } +#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 + if (event->data_len == 1 && event->data[0] == '1' && (bits & PEER_SYNC2) == 0) { + if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "2", 1, 1, 0) >= 0) { + xEventGroupSetBits(s_state, PEER_SYNC2); + } else { + xEventGroupSetBits(s_state, PEER_FAIL); + } + } +#else + if (event->data_len == 1 && event->data[0] == '0' && (bits & PEER_SYNC1) == 0) { + if (esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "1", 1, 1, 0) >= 0) { + xEventGroupSetBits(s_state, PEER_SYNC1); + } else { + xEventGroupSetBits(s_state, PEER_FAIL); + } + } else if (event->data_len == 1 && event->data[0] == '2' && (bits & PEER_SYNC2) == 0) { + xEventGroupSetBits(s_state, PEER_SYNC2); + } +#endif + break; + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + xEventGroupSetBits(s_state, PEER_FAIL); + break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; + } +} + +static esp_err_t sync_peers(void) +{ + esp_err_t ret = ESP_OK; + esp_mqtt_client_config_t mqtt_cfg = { + .broker.address.uri = CONFIG_EXAMPLE_MQTT_BROKER_URI, + .task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE, + }; + esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); + ESP_GOTO_ON_FALSE(client, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL), + err, TAG, "Failed to register mqtt event handler"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_start(client), err, TAG, "Failed to start mqtt client"); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_SYNC0, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), + ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); + ESP_LOGI(TAG, "Waiting for the other peer..."); + const int max_sync_retry = 60; + int retry = 0; + while (true) { + EventBits_t bits = xEventGroupWaitBits(s_state, SYNC_BITS, pdTRUE, pdFALSE, pdMS_TO_TICKS(1000)); + if (bits & PEER_SYNC2) { + break; + } + if (bits & PEER_SYNC1) { + continue; + } + ESP_GOTO_ON_FALSE((bits & PEER_FAIL) == 0, ESP_FAIL, err, TAG, "Failed to sync with the other peer"); + ESP_GOTO_ON_FALSE(retry++ < max_sync_retry, ESP_FAIL, err, TAG, "Failed to sync after %d seconds", retry); +#ifdef CONFIG_EXAMPLE_SERVERLESS_ROLE_PEER1 + ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, "0", 1, 1, 0) >= 0, + ESP_FAIL, TAG, "Failed to publish mqtt message"); +#endif + } + ESP_LOGI(TAG, "Sync done"); + ESP_RETURN_ON_FALSE(esp_mqtt_client_publish(client, PUBLISH_SYNC_TOPIC, s_peer_desc, 0, 1, 0) >= 0, + ESP_FAIL, TAG, "Failed to publish peer's description"); + ESP_LOGI(TAG, "Waiting for the other peer description and candidates..."); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_DESC_PUBLISHED, pdTRUE, pdTRUE, pdMS_TO_TICKS(10000)), + ESP_FAIL, err, TAG, "Timeout in waiting for the other peer candidates"); +err: + free(s_peer_desc); + esp_mqtt_client_destroy(client); + return ret; +} + +static void juice_state(juice_agent_t *agent, juice_state_t state, void *user_ptr) +{ + ESP_LOGI(TAG, "JUICE state change: %s", juice_state_to_string(state)); + if (state == JUICE_STATE_CONNECTED) { + xEventGroupSetBits(s_state, PEER_CONNECTED); + } else if (state == JUICE_STATE_FAILED || state == JUICE_STATE_DISCONNECTED) { + esp_restart(); + } +} + +static void juice_candidate(juice_agent_t *agent, const char *sdp, void *user_ptr) +{ + static uint8_t cand_nr = 0; + if (s_peer_desc_json && cand_nr < 10) { // supporting only 10 candidates + char cand_name[] = "cand0"; + cand_name[4] += cand_nr++; + cJSON_AddStringToObject(s_peer_desc_json, cand_name, sdp); + } +} + +static void juice_gathering_done(juice_agent_t *agent, void *user_ptr) +{ + ESP_LOGI(TAG, "Gathering done"); + if (s_state) { + xEventGroupSetBits(s_state, PEER_GATHER_DONE); + } +} + +#define ALIGN(size) (((size) + 3U) & ~(3U)) + +static void juice_recv(juice_agent_t *agent, const char *data, size_t size, void *user_ptr) +{ + if (s_local_mqtt) { + message_wrap_t *message = (message_wrap_t *)data; + int topic_len = message->topic_len; + int payload_len = message->data_len; + int topic_len_aligned = ALIGN(topic_len); + char *topic = message->data; + char *payload = message->data + topic_len_aligned; + if (topic_len + topic_len_aligned + 4 > size) { + ESP_LOGE(TAG, "Received invalid message"); + return; + } + ESP_LOGI(TAG, "forwarding remote message: topic:%s", topic); + ESP_LOGI(TAG, "forwarding remote message: payload:%.*s", payload_len, payload); + esp_mqtt_client_publish(s_local_mqtt, topic, payload, payload_len, 0, 0); + } +} + +static esp_err_t create_candidates(void) +{ + ESP_RETURN_ON_FALSE(s_state = xEventGroupCreate(), ESP_ERR_NO_MEM, TAG, "Failed to create state event group"); + s_peer_desc_json = cJSON_CreateObject(); + esp_err_t ret = ESP_OK; + juice_set_log_level(JUICE_LOG_LEVEL_INFO); + juice_config_t config = { .stun_server_host = CONFIG_EXAMPLE_STUN_SERVER, + .bind_address = wifi_get_ipv4(WIFI_IF_STA), + .stun_server_port = 19302, + .cb_state_changed = juice_state, + .cb_candidate = juice_candidate, + .cb_gathering_done = juice_gathering_done, + .cb_recv = juice_recv, + }; + + s_agent = juice_create(&config); + ESP_RETURN_ON_FALSE(s_agent, ESP_FAIL, TAG, "Failed to create juice agent"); + ESP_GOTO_ON_FALSE(juice_get_local_description(s_agent, s_buffer, MAX_BUFFER_SIZE) == JUICE_ERR_SUCCESS, + ESP_FAIL, err, TAG, "Failed to get local description"); + ESP_LOGI(TAG, "desc: %s", s_buffer); + cJSON_AddStringToObject(s_peer_desc_json, "desc", s_buffer); + + ESP_GOTO_ON_FALSE(juice_gather_candidates(s_agent) == JUICE_ERR_SUCCESS, + ESP_FAIL, err, TAG, "Failed to start gathering candidates"); + ESP_GOTO_ON_FALSE(xEventGroupWaitBits(s_state, PEER_GATHER_DONE, pdTRUE, pdTRUE, pdMS_TO_TICKS(30000)), + ESP_FAIL, err, TAG, "Failed to connect to the sync broker"); + s_peer_desc = cJSON_Print(s_peer_desc_json); + ESP_LOGI(TAG, "desc: %s", s_peer_desc); + cJSON_Delete(s_peer_desc_json); + return ESP_OK; + +err: + juice_destroy(s_agent); + s_agent = NULL; + cJSON_Delete(s_peer_desc_json); + s_peer_desc_json = NULL; + return ret; +} + +static void local_handler(void *args, esp_event_base_t base, int32_t id, void *data) +{ + switch (id) { + case MQTT_EVENT_CONNECTED: + ESP_LOGI(TAG, "local client connected"); + break; + case MQTT_EVENT_DISCONNECTED: + ESP_LOGI(TAG, "local client disconnected"); + break; + + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "local client error"); + break; + default: + ESP_LOGI(TAG, "local client event id:%d", (int)id); + break; + } +} + +static esp_err_t create_local_client(void) +{ + esp_err_t ret = ESP_OK; + esp_mqtt_client_config_t mqtt_cfg = { + .broker.address.transport = MQTT_TRANSPORT_OVER_TCP, + .broker.address.hostname = wifi_get_ipv4(WIFI_IF_AP), + .broker.address.port = CONFIG_EXAMPLE_MQTT_BROKER_PORT, + .task.stack_size = CONFIG_EXAMPLE_MQTT_CLIENT_STACK_SIZE, + .credentials.client_id = "local_mqtt" + }; + s_local_mqtt = esp_mqtt_client_init(&mqtt_cfg); + ESP_GOTO_ON_FALSE(s_local_mqtt, ESP_ERR_NO_MEM, err, TAG, "Failed to create mqtt client"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_register_event(s_local_mqtt, ESP_EVENT_ANY_ID, local_handler, NULL), + err, TAG, "Failed to register mqtt event handler"); + ESP_GOTO_ON_ERROR(esp_mqtt_client_start(s_local_mqtt), err, TAG, "Failed to start mqtt client"); + + return ESP_OK; +err: + esp_mqtt_client_destroy(s_local_mqtt); + s_local_mqtt = NULL; + return ret; +} + +static void handle_message(char *client, char *topic, char *payload, int len, int qos, int retain) +{ + if (client && strcmp(client, "local_mqtt") == 0 ) { + // This is our little local client -- do not forward + return; + } + ESP_LOGI(TAG, "handle_message topic:%s", topic); + ESP_LOGI(TAG, "handle_message data:%.*s", len, payload); + ESP_LOGI(TAG, "handle_message qos=%d, retain=%d", qos, retain); + if (s_local_mqtt && s_agent) { + int topic_len = strlen(topic) + 1; // null term + int topic_len_aligned = ALIGN(topic_len); + int total_msg_len = 2 + 2 /* msg_wrap header */ + topic_len_aligned + len; + if (total_msg_len > MAX_BUFFER_SIZE) { + ESP_LOGE(TAG, "Fail to forward, message too long"); + return; + } + message_wrap_t *message = (message_wrap_t *)s_buffer; + message->topic_len = topic_len; + message->data_len = len; + + memcpy(s_buffer + 4, topic, topic_len); + memcpy(s_buffer + 4 + topic_len_aligned, payload, len); + juice_send(s_agent, s_buffer, total_msg_len); + } +} + +static void broker_task(void *ctx) +{ + struct mosq_broker_config config = { .host = wifi_get_ipv4(WIFI_IF_AP), .port = CONFIG_EXAMPLE_MQTT_BROKER_PORT, .handle_message_cb = handle_message }; + mosq_broker_run(&config); + vTaskDelete(NULL); +} + +static esp_err_t create_local_broker(void) +{ + return xTaskCreate(broker_task, "mqtt_broker_task", 1024 * 32, NULL, 5, NULL) == pdTRUE ? + ESP_OK : ESP_FAIL; +} diff --git a/components/mosquitto/examples/serverless_mqtt/main/wifi_connect.c b/components/mosquitto/examples/serverless_mqtt/main/wifi_connect.c new file mode 100644 index 0000000000..aeb3af7599 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/main/wifi_connect.c @@ -0,0 +1,122 @@ +/* + * SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include "nvs_flash.h" +#include "esp_event.h" +#include "esp_netif.h" +#include "esp_check.h" +#include "esp_wifi.h" +#include "esp_mac.h" + +#define WIFI_CONNECTED_BIT BIT0 +#define WIFI_FAIL_BIT BIT1 + +static const char *TAG = "serverless_wifi"; +static EventGroupHandle_t s_wifi_events; +static int s_retry_num = 0; +static const int s_max_retry = 30; + +static void wifi_event_handler(void *arg, esp_event_base_t event_base, + int32_t event_id, void *event_data) +{ + if (event_id == WIFI_EVENT_AP_STACONNECTED) { + wifi_event_ap_staconnected_t *event = (wifi_event_ap_staconnected_t *) event_data; + ESP_LOGI(TAG, "station "MACSTR" join, AID=%d", + MAC2STR(event->mac), event->aid); + } else if (event_id == WIFI_EVENT_AP_STADISCONNECTED) { + wifi_event_ap_stadisconnected_t *event = (wifi_event_ap_stadisconnected_t *) event_data; + ESP_LOGI(TAG, "station "MACSTR" leave, AID=%d", + MAC2STR(event->mac), event->aid); + } else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_START) { + esp_wifi_connect(); + } else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) { + if (s_retry_num < s_max_retry) { + esp_wifi_connect(); + s_retry_num++; + ESP_LOGI(TAG, "retry to connect to the AP"); + } else { + xEventGroupSetBits(s_wifi_events, WIFI_FAIL_BIT); + } + ESP_LOGI(TAG, "Connect to the AP fail"); + } else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) { + ip_event_got_ip_t *event = (ip_event_got_ip_t *) event_data; + ESP_LOGI(TAG, "Got ip:" IPSTR, IP2STR(&event->ip_info.ip)); + s_retry_num = 0; + xEventGroupSetBits(s_wifi_events, WIFI_CONNECTED_BIT); + } +} + +esp_err_t wifi_connect(void) +{ + esp_err_t ret = ESP_OK; + ESP_GOTO_ON_FALSE(s_wifi_events = xEventGroupCreate(), ESP_ERR_NO_MEM, err, TAG, "Failed to create wifi_events"); + ESP_GOTO_ON_ERROR(nvs_flash_init(), err, TAG, "Failed to init nvs flash"); + ESP_GOTO_ON_ERROR(esp_netif_init(), err, TAG, "Failed to init esp_netif"); + ESP_GOTO_ON_ERROR(esp_event_loop_create_default(), err, TAG, "Failed to create default event loop"); + ESP_GOTO_ON_ERROR(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID, wifi_event_handler, NULL), + err, TAG, "Failed to register WiFi event handler"); + ESP_GOTO_ON_ERROR(esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP, wifi_event_handler, NULL), + err, TAG, "Failed to register IP event handler"); + + // Initialize WiFi + wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT(); + ESP_GOTO_ON_ERROR(esp_wifi_init(&cfg), err, TAG, "Failed to initialize WiFi"); + ESP_GOTO_ON_ERROR(esp_wifi_set_mode(WIFI_MODE_APSTA), err, TAG, "Failed to set STA+AP mode"); + + // Initialize AP + esp_netif_t *ap = esp_netif_create_default_wifi_ap(); + ESP_GOTO_ON_FALSE(ap, ESP_FAIL, err, TAG, "Failed to create AP network interface"); + wifi_config_t wifi_ap_config = { + .ap = { + .ssid = CONFIG_EXAMPLE_AP_SSID, + .password = CONFIG_EXAMPLE_AP_PASSWORD, + .authmode = WIFI_AUTH_WPA2_PSK, + .max_connection = 4, + }, + }; + ESP_GOTO_ON_ERROR(esp_wifi_set_config(WIFI_IF_AP, &wifi_ap_config), err, TAG, "Failed to set AP config"); + + + // Initialize STA + esp_netif_t *sta = esp_netif_create_default_wifi_sta(); + ESP_GOTO_ON_FALSE(sta, ESP_FAIL, err, TAG, "Failed to create WiFi station network interface"); + wifi_config_t wifi_sta_config = { + .sta = { + .ssid = CONFIG_EXAMPLE_STA_SSID, + .password = CONFIG_EXAMPLE_STA_PASSWORD, + }, + }; + ESP_GOTO_ON_ERROR(esp_wifi_set_config(WIFI_IF_STA, &wifi_sta_config), err, TAG, "Failed to set STA config"); + + // Start WiFi + ESP_GOTO_ON_ERROR(esp_wifi_start(), err, TAG, "Failed to start WiFi"); + + // Wait for connection + EventBits_t bits = xEventGroupWaitBits(s_wifi_events, WIFI_CONNECTED_BIT | WIFI_FAIL_BIT, + pdFALSE, pdFALSE, pdMS_TO_TICKS(30000)); + ESP_GOTO_ON_FALSE((bits & WIFI_CONNECTED_BIT) == WIFI_CONNECTED_BIT, ESP_FAIL, err, + TAG, "Failed to obtain IP address from WiFi station"); + return ESP_OK; +err: + esp_wifi_stop(); + esp_wifi_deinit(); + nvs_flash_deinit(); + esp_netif_deinit(); + esp_event_loop_delete_default(); + return ret; + +} + +_Thread_local char s_ipv4_addr[4 * 4]; // 4 octets + '.'/term + +char *wifi_get_ipv4(wifi_interface_t interface) +{ + esp_netif_t *netif = esp_netif_get_handle_from_ifkey(interface == WIFI_IF_AP ? "WIFI_AP_DEF" : "WIFI_STA_DEF"); + ESP_RETURN_ON_FALSE(netif, NULL, TAG, "Failed to find default Wi-Fi netif"); + esp_netif_ip_info_t ip_info; + ESP_RETURN_ON_FALSE(esp_netif_get_ip_info(netif, &ip_info) == ESP_OK, NULL, TAG, "Failed to get IP from netif"); + ESP_RETURN_ON_FALSE(esp_ip4addr_ntoa(&ip_info.ip, s_ipv4_addr, sizeof(s_ipv4_addr)) != NULL, NULL, TAG, "Failed to convert IP"); + return s_ipv4_addr; +} diff --git a/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults b/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults new file mode 100644 index 0000000000..037b680153 --- /dev/null +++ b/components/mosquitto/examples/serverless_mqtt/sdkconfig.defaults @@ -0,0 +1,3 @@ +CONFIG_PARTITION_TABLE_SINGLE_APP_LARGE=y +CONFIG_ESP_MAIN_TASK_STACK_SIZE=16384 +CONFIG_PTHREAD_TASK_STACK_SIZE_DEFAULT=32768 diff --git a/components/mosquitto/examples/serverless_mqtt/serverless.png b/components/mosquitto/examples/serverless_mqtt/serverless.png new file mode 100644 index 0000000000..6822526427 Binary files /dev/null and b/components/mosquitto/examples/serverless_mqtt/serverless.png differ diff --git a/components/mosquitto/idf_component.yml b/components/mosquitto/idf_component.yml index 5ef670047f..d8aee0e68a 100644 --- a/components/mosquitto/idf_component.yml +++ b/components/mosquitto/idf_component.yml @@ -1,4 +1,4 @@ -version: "2.0.28~0" +version: "2.0.20~0" url: https://github.com/espressif/esp-protocols/tree/master/components/mosquitto description: The component provides a simple ESP32 port of mosquitto broker dependencies: diff --git a/components/mosquitto/mosquitto b/components/mosquitto/mosquitto index 3923526c6b..a196c2b244 160000 --- a/components/mosquitto/mosquitto +++ b/components/mosquitto/mosquitto @@ -1 +1 @@ -Subproject commit 3923526c6b4c048bbecad2506c4c9963bc46cd36 +Subproject commit a196c2b244f248072a6b3ac8fb3f00ce0ff63dea diff --git a/components/mosquitto/port/broker.c b/components/mosquitto/port/broker.c index 62cfb29bce..f78f651790 100644 --- a/components/mosquitto/port/broker.c +++ b/components/mosquitto/port/broker.c @@ -101,6 +101,8 @@ void mosq_broker_stop(void) run = 0; } +extern mosq_message_cb_t g_mosq_message_callback; + int mosq_broker_run(struct mosq_broker_config *broker_config) { @@ -125,6 +127,9 @@ int mosq_broker_run(struct mosq_broker_config *broker_config) if (broker_config->tls_cfg) { net__set_tls_config(broker_config->tls_cfg); } + if (broker_config->handle_message_cb) { + g_mosq_message_callback = broker_config->handle_message_cb; + } db.config = &config; diff --git a/components/mosquitto/port/callbacks.c b/components/mosquitto/port/callbacks.c index cf6e5f9a29..dc0f2ad1b5 100644 --- a/components/mosquitto/port/callbacks.c +++ b/components/mosquitto/port/callbacks.c @@ -13,7 +13,9 @@ #include "util_mosq.h" #include "utlist.h" #include "lib_load.h" +#include "mosq_broker.h" +mosq_message_cb_t g_mosq_message_callback = NULL; int mosquitto_callback_register( mosquitto_plugin_id_t *identifier, @@ -44,5 +46,8 @@ void plugin__handle_disconnect(struct mosquitto *context, int reason) int plugin__handle_message(struct mosquitto *context, struct mosquitto_msg_store *stored) { + if (g_mosq_message_callback) { + g_mosq_message_callback(context->id, stored->topic, stored->payload, stored->payloadlen, stored->qos, stored->retain); + } return MOSQ_ERR_SUCCESS; } diff --git a/components/mosquitto/port/include/mosq_broker.h b/components/mosquitto/port/include/mosq_broker.h index 362943557f..d9b858b8c4 100644 --- a/components/mosquitto/port/include/mosq_broker.h +++ b/components/mosquitto/port/include/mosq_broker.h @@ -9,6 +9,7 @@ struct mosquitto__config; +typedef void (*mosq_message_cb_t)(char *client, char *topic, char *data, int len, int qos, int retain); /** * @brief Mosquitto configuration structure * @@ -24,6 +25,10 @@ struct mosq_broker_config { * You can open the respective docs with this idf.py command: * `idf.py docs -sp api-reference/protocols/esp_tls.html` */ + void (*handle_message_cb)(char *client, char *topic, char *data, int len, int qos, int retain); /*!< + * On message callback. If configured, user function is called + * whenever mosquitto processes a message. + */ }; /**