diff --git a/examples/Client/Client.ino b/examples/Client/Client.ino index f2f7c28..22cb133 100644 --- a/examples/Client/Client.ino +++ b/examples/Client/Client.ino @@ -12,11 +12,11 @@ // #define HOST "homeassistant.local" // #define PORT 8123 -// #define HOST "www.google.com" -// #define PORT 80 +#define HOST "www.google.com" +#define PORT 80 -#define HOST "192.168.125.118" -#define PORT 4000 +// #define HOST "192.168.125.118" +// #define PORT 4000 // 16 slots on esp32 (CONFIG_LWIP_MAX_ACTIVE_TCP) #define MAX_CLIENTS CONFIG_LWIP_MAX_ACTIVE_TCP diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 84a87ac..1f321f8 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -105,6 +105,7 @@ typedef struct { } lwip_event_packet_t; static QueueHandle_t _async_queue; +static QueueHandle_t _poll_queue; static TaskHandle_t _async_service_task_handle = NULL; @@ -143,16 +144,34 @@ static inline bool _get_async_event(lwip_event_packet_t ** e){ return _async_queue && xQueueReceive(_async_queue, e, portMAX_DELAY) == pdPASS; } -static bool _remove_events_with_arg(void * arg){ +static inline bool _init_poll_queue(){ + if(!_poll_queue){ + _poll_queue = xQueueCreate(CONFIG_ASYNC_TCP_POLL_QUEUE_SIZE, sizeof(lwip_event_packet_t *)); + if(!_poll_queue){ + return false; + } + } + return true; +} + +static inline bool _send_async_poll(lwip_event_packet_t ** e){ + return _poll_queue && xQueueSend(_poll_queue, e, portMAX_DELAY) == pdPASS; +} + +static inline bool _get_async_poll(lwip_event_packet_t ** e){ + return _poll_queue && xQueueReceive(_poll_queue, e, portMAX_DELAY) == pdPASS; +} + +static bool _remove_events_with_arg(QueueHandle_t queue, void * arg){ lwip_event_packet_t * first_packet = NULL; lwip_event_packet_t * packet = NULL; - if(!_async_queue){ + if(!queue){ return false; } //figure out which is the first packet so we can keep the order while(!first_packet){ - if(xQueueReceive(_async_queue, &first_packet, 0) != pdPASS){ + if(xQueueReceive(queue, &first_packet, 0) != pdPASS){ return false; } //discard packet if matching @@ -160,19 +179,19 @@ static bool _remove_events_with_arg(void * arg){ free(first_packet); first_packet = NULL; //return first packet to the back of the queue - } else if(xQueueSend(_async_queue, &first_packet, portMAX_DELAY) != pdPASS){ + } else if(xQueueSend(queue, &first_packet, portMAX_DELAY) != pdPASS){ return false; } } - while(xQueuePeek(_async_queue, &packet, 0) == pdPASS && packet != first_packet){ - if(xQueueReceive(_async_queue, &packet, 0) != pdPASS){ + while(xQueuePeek(queue, &packet, 0) == pdPASS && packet != first_packet){ + if(xQueueReceive(queue, &packet, 0) != pdPASS){ return false; } if((int)packet->arg == (int)arg){ free(packet); packet = NULL; - } else if(xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS){ + } else if(xQueueSend(queue, &packet, portMAX_DELAY) != pdPASS){ return false; } } @@ -184,7 +203,8 @@ static void _handle_async_event(lwip_event_packet_t * e){ // do nothing when arg is NULL //ets_printf("event arg == NULL: 0x%08x\n", e->recv.pcb); } else if(e->event == LWIP_TCP_CLEAR){ - _remove_events_with_arg(e->arg); + _remove_events_with_arg(_async_queue, e->arg); + _remove_events_with_arg(_poll_queue, e->arg); } else if(e->event == LWIP_TCP_RECV){ //ets_printf("-R: 0x%08x\n", e->recv.pcb); AsyncClient::_s_recv(e->arg, e->recv.pcb, e->recv.pb, e->recv.err); @@ -216,7 +236,7 @@ static void _handle_async_event(lwip_event_packet_t * e){ static void _async_service_task(void *pvParameters){ lwip_event_packet_t * packet = NULL; for (;;) { - if(_get_async_event(&packet)){ + if(_get_async_event(&packet) || _get_async_poll(&packet)){ #if CONFIG_ASYNC_TCP_USE_WDT if(esp_task_wdt_add(NULL) != ESP_OK){ log_e("Failed to add async task to WDT"); @@ -262,7 +282,7 @@ static bool customTaskCreateUniversal( } static bool _start_async_task(){ - if(!_init_async_event_queue()){ + if(!_init_async_event_queue() || !_init_poll_queue()){ return false; } if(!_async_service_task_handle){ @@ -302,12 +322,16 @@ static int8_t _tcp_connected(void * arg, tcp_pcb * pcb, int8_t err) { } static int8_t _tcp_poll(void * arg, struct tcp_pcb * pcb) { + // inhibit polling when event queue is getting filled up, let it handle _onack's + if (uxQueueMessagesWaiting(_poll_queue) >= CONFIG_ASYNC_TCP_POLL_QUEUE_SIZE) + return ERR_OK; + //ets_printf("+P: 0x%08x\n", pcb); lwip_event_packet_t * e = (lwip_event_packet_t *)malloc(sizeof(lwip_event_packet_t)); e->event = LWIP_TCP_POLL; e->arg = arg; e->poll.pcb = pcb; - if (!_send_async_event(&e)) { + if (!_send_async_poll(&e)) { free((void*)(e)); } return ERR_OK; diff --git a/src/AsyncTCP.h b/src/AsyncTCP.h index 25fb462..7cbdefe 100644 --- a/src/AsyncTCP.h +++ b/src/AsyncTCP.h @@ -69,6 +69,10 @@ extern "C" { #define CONFIG_ASYNC_TCP_QUEUE_SIZE 64 #endif +#ifndef CONFIG_ASYNC_TCP_POLL_QUEUE_SIZE +#define CONFIG_ASYNC_TCP_POLL_QUEUE_SIZE CONFIG_ASYNC_TCP_QUEUE_SIZE +#endif + #ifndef CONFIG_ASYNC_TCP_MAX_ACK_TIME #define CONFIG_ASYNC_TCP_MAX_ACK_TIME 5000 #endif