diff --git a/controller/app/cmdline/src/cmd_line.cpp b/controller/app/cmdline/src/cmd_line.cpp index 80075f3c..ebb19beb 100644 --- a/controller/app/cmdline/src/cmd_line.cpp +++ b/controller/app/cmdline/src/cmd_line.cpp @@ -90,7 +90,7 @@ cmd_line::cmd_line() cmd_line::cmd_line(void (*notification_callback) (void *, int32_t, uint64_t, uint16_t, uint16_t, uint16_t, uint32_t, void *), void (*log_callback) (void *, int32_t, const char *, int32_t), - bool test_mode, char *interface, int32_t log_level) + bool test_mode, char *interface, int32_t log_level, int32_t pacing) : test_mode(test_mode) , output_redirected(false) { @@ -108,6 +108,7 @@ cmd_line::cmd_line(void (*notification_callback) (void *, int32_t, uint64_t, uin atomic_cout << "AVDECC Controller version: " << controller_obj->get_version() << std::endl; print_interfaces_and_select(interface); + sys->discovery_pacing(pacing); sys->process_start(); } diff --git a/controller/app/cmdline/src/cmd_line.h b/controller/app/cmdline/src/cmd_line.h index 8fc258a3..4a2f9578 100644 --- a/controller/app/cmdline/src/cmd_line.h +++ b/controller/app/cmdline/src/cmd_line.h @@ -95,7 +95,7 @@ class cmd_line */ cmd_line(void (*notification_callback) (void *, int32_t, uint64_t, uint16_t, uint16_t, uint16_t, uint32_t, void *), void (*log_callback) (void *, int32_t, const char *, int32_t), - bool test_mode, char *interface, int32_t log_level); + bool test_mode, char *interface, int32_t log_level, int32_t pacing); ~cmd_line(); diff --git a/controller/app/cmdline/src/cmd_line_main.cpp b/controller/app/cmdline/src/cmd_line_main.cpp index 95d9eb45..add20304 100644 --- a/controller/app/cmdline/src/cmd_line_main.cpp +++ b/controller/app/cmdline/src/cmd_line_main.cpp @@ -214,6 +214,7 @@ static void usage(char *argv[]) std::cerr << " -t : Sets test mode which disables checks" << std::endl; std::cerr << " -i interface : Sets the name of the interface to use" << std::endl; std::cerr << " -l log_level : Sets the log level to use." << std::endl; + std::cerr << " -p pacing : Sets packets per second for startup endstation enumeration." << std::endl; std::cerr << log_level_help << std::endl; exit(1); } @@ -225,8 +226,9 @@ int main(int argc, char *argv[]) char *interface = NULL; int c = 0; int32_t log_level = avdecc_lib::LOGGING_LEVEL_ERROR; + int32_t pacing = -1; - while ((c = getopt(argc, argv, "ti:l:")) != -1) { + while ((c = getopt(argc, argv, "ti:l:p:")) != -1) { switch (c) { case 't': test_mode = true; @@ -237,6 +239,9 @@ int main(int argc, char *argv[]) case 'l': log_level = atoi(optarg); break; + case 'p': + pacing = atoi(optarg); + break; case ':': fprintf(stderr, "Option -%c requires an operand\n", optopt); error++; @@ -264,7 +269,7 @@ int main(int argc, char *argv[]) } cmd_line avdecc_cmd_line_ref(notification_callback, log_callback, - test_mode, interface, log_level); + test_mode, interface, log_level, pacing); std::vector input_argv; size_t pos = 0; diff --git a/controller/lib/include/system.h b/controller/lib/include/system.h index b585816b..46db5aef 100644 --- a/controller/lib/include/system.h +++ b/controller/lib/include/system.h @@ -61,6 +61,12 @@ namespace avdecc_lib */ AVDECC_CONTROLLER_LIB32_API virtual int STDCALL get_last_resp_status() = 0; + /** + * Limit background endstation enumeration packet rate. + * \param packets_per_second The number of packets per second. Minimum value is 10. + */ + AVDECC_CONTROLLER_LIB32_API virtual void STDCALL discovery_pacing(int packets_per_second) = 0; + /** * Start point of the system process, which calls the thread initialization function. */ diff --git a/controller/lib/src/discovery_pacer.h b/controller/lib/src/discovery_pacer.h new file mode 100644 index 00000000..d77c678d --- /dev/null +++ b/controller/lib/src/discovery_pacer.h @@ -0,0 +1,101 @@ +/* +* Licensed under the MIT License (MIT) +* +* Copyright (c) 2015 AudioScience Inc. +* +* Permission is hereby granted, free of charge, to any person obtaining a copy of +* this software and associated documentation files (the "Software"), to deal in +* the Software without restriction, including without limitation the rights to +* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +* the Software, and to permit persons to whom the Software is furnished to do so, +* subject to the following conditions: +* +* The above copyright notice and this permission notice shall be included in all +* copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +/** +* discovery pacer.h +* +* This class supports pacing the rate at which background enumeration sends packets. +* +*/ + +#pragma once + +#include "timer.h" + +namespace avdecc_lib +{ + class discovery_pacer + { + public: + static discovery_pacer& getInstance() { + // The only instance + // Guaranteed to be lazy initialized + // Guaranteed that it will be destroyed correctly + static discovery_pacer instance; + return instance; + } + + bool ok_to_send_packet(void) { + if (m_packets_per_second < 0) + return true; + if (m_packets_available > 0) + { + m_packets_available--; + return true; + } + else + { + return false; + } + }; + + void set_packets_per_second(int pps, int tick_rate_ms) + { + if (m_packets_per_second < 0) + { + m_timestamp = timer::clk_convert_to_ms(timer::clk_monotonic()); + } + m_packets_per_second = pps; + m_packets_per_tick = pps * 1000 / tick_rate_ms; + // never go as low as zero packets per tick + if (0 == m_packets_per_tick) + m_packets_per_tick = 1; + }; + + void tick(void) + { + uint32_t the_time = timer::clk_convert_to_ms(timer::clk_monotonic()); + if (the_time != m_timestamp) + { + m_packets_available += (m_packets_per_second * 1000) / (the_time - m_timestamp); + if (m_packets_available > m_packets_per_tick) + m_packets_available = m_packets_per_tick; + } + m_timestamp = the_time; + }; + + private: + // Private Constructor + discovery_pacer(){ m_packets_per_second = -1; }; // by default pacing is disabled + // Stop the compiler generating methods of copy the object + discovery_pacer(discovery_pacer const& copy); // Not Implemented + discovery_pacer& operator=(discovery_pacer const& copy); // Not Implemented + + int m_packets_per_second; // -1 implies no pacing + int m_packets_per_tick; // packets per timer tick + int m_packets_available; // current packets available to send + uint32_t m_timestamp; // last timestamp + + + }; +} \ No newline at end of file diff --git a/controller/lib/src/end_station_imp.cpp b/controller/lib/src/end_station_imp.cpp index 60acefc2..4f3d8329 100644 --- a/controller/lib/src/end_station_imp.cpp +++ b/controller/lib/src/end_station_imp.cpp @@ -40,6 +40,7 @@ #include "system_tx_queue.h" #include "jdksavdecc.h" #include "end_station_imp.h" +#include "discovery_pacer.h" namespace avdecc_lib { @@ -71,7 +72,8 @@ namespace avdecc_lib selected_entity_index = 0; selected_config_index = 0; - read_desc_init(JDKSAVDECC_DESCRIPTOR_ENTITY, 0); + queue_background_read_request(JDKSAVDECC_DESCRIPTOR_ENTITY, 0, 1); + background_read_submit_pending(); return 0; } @@ -407,9 +409,21 @@ namespace avdecc_lib // check inflight timeout if (b->m_timer.timeout()) { - log_imp_ref->post_log_msg(LOGGING_LEVEL_ERROR, "Background read timeout reading descriptor %s index %d\n", utility::aem_desc_value_to_name(b->m_type), b->m_index); ii = m_backbround_read_inflight.erase(ii); - delete b; + /* If read times out, resubmit it. Using 1 second timeouts set in background_read_submit_pending(), so no great load here. */ + b->m_resubmit_count++; + if (b->m_resubmit_count < 10) + { + log_imp_ref->post_log_msg(LOGGING_LEVEL_ERROR, "Background read timeout (resubmit %d) reading descriptor %s index %d from 0x%llx\n", + b->m_resubmit_count, utility::aem_desc_value_to_name(b->m_type), b->m_index, this->end_station_entity_id); + m_backbround_read_pending.push_back(b); + } + else + { + log_imp_ref->post_log_msg(LOGGING_LEVEL_ERROR, "Background read timeout (terminate) reading descriptor %s index %d from 0x%llx\n", + utility::aem_desc_value_to_name(b->m_type), b->m_index, this->end_station_entity_id); + delete b; + } } else { @@ -450,22 +464,16 @@ namespace avdecc_lib // empty submit the next set of read operations if (m_backbround_read_inflight.empty() && !m_backbround_read_pending.empty()) { - background_read_request *b_first = m_backbround_read_pending.front(); - m_backbround_read_pending.pop_front(); - log_imp_ref->post_log_msg(LOGGING_LEVEL_DEBUG, "Background read of %s index %d", utility::aem_desc_value_to_name(b_first->m_type), b_first->m_index); - read_desc_init(b_first->m_type, b_first->m_index); - b_first->m_timer.start(750); // 750 ms timeout (1722.1 timeout is 250ms) - m_backbround_read_inflight.push_back(b_first); - - if (!m_backbround_read_pending.empty()) + uint16_t first_type = m_backbround_read_pending.front()->m_type; + if (!m_backbround_read_pending.empty() && discovery_pacer::getInstance().ok_to_send_packet()) { background_read_request *b_next = m_backbround_read_pending.front(); - while (b_next->m_type == b_first->m_type) + while (b_next->m_type == first_type) { m_backbround_read_pending.pop_front(); log_imp_ref->post_log_msg(LOGGING_LEVEL_DEBUG, "Background read of %s index %d", utility::aem_desc_value_to_name(b_next->m_type), b_next->m_index); read_desc_init(b_next->m_type, b_next->m_index); - b_next->m_timer.start(750); // 750 ms timeout (1722.1 timeout is 250ms) + b_next->m_timer.start(1000); // 1000 ms timeout (1722.1 timeout is 250ms) m_backbround_read_inflight.push_back(b_next); if (m_backbround_read_pending.empty()) { diff --git a/controller/lib/src/end_station_imp.h b/controller/lib/src/end_station_imp.h index 4a81c738..16eae469 100644 --- a/controller/lib/src/end_station_imp.h +++ b/controller/lib/src/end_station_imp.h @@ -42,10 +42,11 @@ namespace avdecc_lib class background_read_request { public: - background_read_request(uint16_t t, uint16_t I) : - m_type(t), m_index(I) {}; - uint16_t m_type; - uint16_t m_index; + background_read_request(uint16_t t, uint16_t I) : + m_type(t), m_index(I) {m_resubmit_count = 0;}; + uint16_t m_type; + uint16_t m_index; + uint16_t m_resubmit_count; timer m_timer; }; diff --git a/controller/lib/src/linux/system_layer2_multithreaded_callback.cpp b/controller/lib/src/linux/system_layer2_multithreaded_callback.cpp index f5257994..737ac179 100644 --- a/controller/lib/src/linux/system_layer2_multithreaded_callback.cpp +++ b/controller/lib/src/linux/system_layer2_multithreaded_callback.cpp @@ -62,7 +62,7 @@ #include "system_message_queue.h" #include "system_tx_queue.h" #include "system_layer2_multithreaded_callback.h" - +#include "discovery_pacer.h" namespace avdecc_lib { @@ -185,6 +185,10 @@ namespace avdecc_lib return resp_status_for_cmd; } + void STDCALL system_layer2_multithreaded_callback::discovery_pacing(int packets_per_second) + { + discovery_pacer::getInstance().set_packets_per_second(packets_per_second, TIME_PERIOD_25_MILLISECONDS); + } int system_layer2_multithreaded_callback::timer_start_interval(int timerfd) { diff --git a/controller/lib/src/linux/system_layer2_multithreaded_callback.h b/controller/lib/src/linux/system_layer2_multithreaded_callback.h index 3508ce5f..dccbc771 100644 --- a/controller/lib/src/linux/system_layer2_multithreaded_callback.h +++ b/controller/lib/src/linux/system_layer2_multithreaded_callback.h @@ -73,6 +73,11 @@ namespace avdecc_lib */ int STDCALL get_last_resp_status(); + /** + * Limit background endstation enumeration packet rate. + */ + void STDCALL discovery_pacing(int packets_per_second); + /** * Start point of the system process, which calls the thread initialization function. */ diff --git a/controller/lib/src/msvc/net_interface_imp.cpp b/controller/lib/src/msvc/net_interface_imp.cpp index 9188ed61..0ee8c217 100644 --- a/controller/lib/src/msvc/net_interface_imp.cpp +++ b/controller/lib/src/msvc/net_interface_imp.cpp @@ -172,7 +172,7 @@ namespace avdecc_lib uint16_t ether_type[1]; ether_type[0] = JDKSAVDECC_AVTP_ETHERTYPE; - set_capture_ether_type(ether_type, 0); // Set the filter + set_capture_ether_type(ether_type, 1); // Set the filter free(AdapterInfo); return 0; diff --git a/controller/lib/src/msvc/system_layer2_multithreaded_callback.cpp b/controller/lib/src/msvc/system_layer2_multithreaded_callback.cpp index f7bf2a53..31ba1251 100644 --- a/controller/lib/src/msvc/system_layer2_multithreaded_callback.cpp +++ b/controller/lib/src/msvc/system_layer2_multithreaded_callback.cpp @@ -37,6 +37,7 @@ #include "system_message_queue.h" #include "system_tx_queue.h" #include "system_layer2_multithreaded_callback.h" +#include "discovery_pacer.h" namespace avdecc_lib { @@ -95,6 +96,7 @@ namespace avdecc_lib int system_layer2_multithreaded_callback::queue_tx_frame(void *notification_id, uint32_t notification_flag, uint8_t *frame, size_t frame_len) { struct poll_thread_data thread_data; + bool tx_queue_full; assert(frame_len < 2048); thread_data.frame = new uint8_t[2048]; @@ -106,7 +108,11 @@ namespace avdecc_lib memcpy(thread_data.frame, frame, frame_len); thread_data.notification_id = notification_id; thread_data.notification_flag = notification_flag; - poll_tx.tx_queue->queue_push(&thread_data); + tx_queue_full = poll_tx.tx_queue->queue_push(&thread_data); + if (!tx_queue_full) + { + log_imp_ref->post_log_msg(LOGGING_LEVEL_ERROR, "SYSTEM: Packet tx_queue full"); + } /** * Check for conditions that cause wait for completion. @@ -138,6 +144,11 @@ namespace avdecc_lib return resp_status_for_cmd; } + void STDCALL system_layer2_multithreaded_callback::discovery_pacing(int packets_per_second) + { + discovery_pacer::getInstance().set_packets_per_second(packets_per_second, NETIF_READ_TIMEOUT_MS); + } + DWORD WINAPI system_layer2_multithreaded_callback::proc_wpcap_thread(LPVOID lpParam) { return reinterpret_cast(lpParam)->proc_wpcap_thread_callback(); @@ -149,6 +160,7 @@ namespace avdecc_lib struct poll_thread_data thread_data; const uint8_t *frame; uint16_t length; + bool rx_queue_full; while (WaitForSingleObject(poll_rx.queue_thread.kill_sem, 0)) { @@ -158,7 +170,7 @@ namespace avdecc_lib { if (length > 2048) { - log_imp_ref->post_log_msg(LOGGING_LEVEL_ERROR, "wpcap returned packet larger than 1600 bytes"); + log_imp_ref->post_log_msg(LOGGING_LEVEL_ERROR, "wpcap returned packet larger than 2048 bytes"); continue; } thread_data.frame_len = length; @@ -168,7 +180,11 @@ namespace avdecc_lib exit(EXIT_FAILURE); } memcpy(thread_data.frame, frame, thread_data.frame_len); - poll_rx.rx_queue->queue_push(&thread_data); + rx_queue_full = poll_rx.rx_queue->queue_push(&thread_data); + if (!rx_queue_full) + { + log_imp_ref->post_log_msg(LOGGING_LEVEL_ERROR, "SYSTEM: Packet rx_queue full"); + } } else { @@ -217,7 +233,7 @@ namespace avdecc_lib int system_layer2_multithreaded_callback::init_wpcap_thread() { - poll_rx.rx_queue = new system_message_queue(256, sizeof(struct poll_thread_data)); + poll_rx.rx_queue = new system_message_queue(1024, sizeof(struct poll_thread_data)); poll_rx.queue_thread.kill_sem = CreateSemaphore(NULL, 0, 32767, NULL); poll_rx.timeout_event = CreateEvent(NULL, FALSE, FALSE, NULL); poll_events_array[WPCAP_TIMEOUT] = poll_rx.timeout_event; @@ -236,7 +252,7 @@ namespace avdecc_lib exit(EXIT_FAILURE); } - poll_tx.tx_queue = new system_message_queue(256, sizeof(struct poll_thread_data)); + poll_tx.tx_queue = new system_message_queue(1024, sizeof(struct poll_thread_data)); poll_tx.queue_thread.kill_sem = CreateSemaphore(NULL, 0, 32767, NULL); poll_tx.timeout_event = CreateEvent(NULL, FALSE, FALSE, NULL); poll_events_array[WPCAP_TX_PACKET] = poll_tx.tx_queue->queue_data_available_object(); @@ -336,6 +352,8 @@ namespace avdecc_lib { bool notification_id_incomplete = false; + discovery_pacer::getInstance().tick(); + if (wait_mgr->active_state()) { if (controller_obj_in_system->is_inflight_cmd_with_notification_id(wait_mgr->get_notify_id()) || diff --git a/controller/lib/src/msvc/system_layer2_multithreaded_callback.h b/controller/lib/src/msvc/system_layer2_multithreaded_callback.h index 5b580a9b..3e7ec521 100644 --- a/controller/lib/src/msvc/system_layer2_multithreaded_callback.h +++ b/controller/lib/src/msvc/system_layer2_multithreaded_callback.h @@ -109,6 +109,11 @@ namespace avdecc_lib */ int STDCALL get_last_resp_status(); + /** + * Limit background endstation enumeration packet rate. + */ + void STDCALL discovery_pacing(int packets_per_second); + /** * Start point of the system process, which calls the thread initialization function. */ diff --git a/controller/lib/src/msvc/system_message_queue.cpp b/controller/lib/src/msvc/system_message_queue.cpp index ab5db275..d10ae136 100644 --- a/controller/lib/src/msvc/system_message_queue.cpp +++ b/controller/lib/src/msvc/system_message_queue.cpp @@ -55,14 +55,66 @@ namespace avdecc_lib DeleteCriticalSection(&critical_section_obj); } - void system_message_queue::queue_push(void *thread_data) + bool system_message_queue::queue_push(void *thread_data) { - WaitForSingleObject(space_avail, INFINITE); + DWORD status; + unsigned int amount_to_grow = 256; + status = WaitForSingleObject(space_avail, 0); + + // if timeout waiting for room in the queue, grow the queue + if (status == WAIT_TIMEOUT) + { + size_t new_count = entry_count + amount_to_grow; + uint8_t * new_buf; + uint8_t * old_buf; + int new_out_pos = 0; + int old_out_pos = 0; + + new_buf = (uint8_t *)calloc(new_count, entry_size); + if (new_buf) + { + // Because buf is used as circular buffer, have + // to copy values carefully to the new buffer. + // We know that the old buffer is full, so have + // two copy operations to perform. + EnterCriticalSection(&critical_section_obj); + old_out_pos = out_pos; + // copy old buffer from out_pos to the end of the old buffer + memcpy(&new_buf[0 * entry_size], + &buf[old_out_pos * entry_size], + entry_size * (entry_count - old_out_pos)); + new_out_pos = (entry_count - old_out_pos); + old_out_pos = (old_out_pos + (entry_count - old_out_pos)) % entry_count; + // copy from start of old buffer to position out_pos + memcpy(&new_buf[new_out_pos * entry_size], + &buf[0 * entry_size], + entry_size * out_pos); + new_out_pos = new_out_pos + out_pos; + out_pos = 0; + in_pos = new_out_pos; + old_buf = buf; + buf = new_buf; + entry_count = new_count; + LeaveCriticalSection(&critical_section_obj); + free(old_buf); + // increase space available + ReleaseSemaphore(space_avail, amount_to_grow, NULL); + // use up a single entry + WaitForSingleObject(space_avail, INFINITE); + } + else + { + // alloc failed, so wait for space in que + WaitForSingleObject(space_avail, INFINITE); + } + } + EnterCriticalSection(&critical_section_obj); memcpy(&buf[in_pos * entry_size], thread_data, entry_size); in_pos = (in_pos + 1) % entry_count; LeaveCriticalSection(&critical_section_obj); ReleaseSemaphore(data_avail, 1, NULL); + return status != WAIT_TIMEOUT; } void system_message_queue::queue_pop_nowait(void *thread_data) diff --git a/controller/lib/src/msvc/system_message_queue.h b/controller/lib/src/msvc/system_message_queue.h index acefb751..d26d067b 100644 --- a/controller/lib/src/msvc/system_message_queue.h +++ b/controller/lib/src/msvc/system_message_queue.h @@ -53,7 +53,7 @@ namespace avdecc_lib ~system_message_queue(); - void queue_push(void *thread_data); + bool queue_push(void *thread_data); void queue_pop_nowait(void *thread_data); diff --git a/controller/lib/src/osx/system_layer2_multithreaded_callback.cpp b/controller/lib/src/osx/system_layer2_multithreaded_callback.cpp index f08c053f..960a7352 100644 --- a/controller/lib/src/osx/system_layer2_multithreaded_callback.cpp +++ b/controller/lib/src/osx/system_layer2_multithreaded_callback.cpp @@ -60,7 +60,7 @@ #include "system_message_queue.h" #include "system_tx_queue.h" #include "system_layer2_multithreaded_callback.h" - +#include "discovery_pacer.h" namespace avdecc_lib { @@ -188,6 +188,11 @@ namespace avdecc_lib return resp_status_for_cmd; } + void STDCALL system_layer2_multithreaded_callback::discovery_pacing(int packets_per_second) + { + discovery_pacer::getInstance().set_packets_per_second(packets_per_second, TIME_PERIOD_25_MILLISECONDS); + } + int system_layer2_multithreaded_callback::fn_timer_cb(struct kevent *priv) { return instance->fn_timer(priv); diff --git a/controller/lib/src/osx/system_layer2_multithreaded_callback.h b/controller/lib/src/osx/system_layer2_multithreaded_callback.h index c0cca92f..fe53bd4b 100644 --- a/controller/lib/src/osx/system_layer2_multithreaded_callback.h +++ b/controller/lib/src/osx/system_layer2_multithreaded_callback.h @@ -71,6 +71,11 @@ namespace avdecc_lib */ int STDCALL get_last_resp_status(); + /** + * Limit background endstation enumeration packet rate. + */ + void STDCALL discovery_pacing(int packets_per_second); + /** * Start point of the system process, which calls the thread initialization function. */ diff --git a/controller/lib/src/timer.h b/controller/lib/src/timer.h index f249c11b..170029aa 100644 --- a/controller/lib/src/timer.h +++ b/controller/lib/src/timer.h @@ -48,9 +48,9 @@ namespace avdecc_lib ~timer(); - avdecc_lib_os::aTimestamp clk_monotonic(void); + static avdecc_lib_os::aTimestamp clk_monotonic(void); - uint32_t clk_convert_to_ms(avdecc_lib_os::aTimestamp timestamp); + static uint32_t clk_convert_to_ms(avdecc_lib_os::aTimestamp timestamp); void start(int duration_ms);