From ae9048562589f80d5d9ceb2f14823a0a66898bfd Mon Sep 17 00:00:00 2001 From: Christian Antila Date: Sun, 6 Oct 2024 11:15:16 +0200 Subject: [PATCH 1/2] Added dynamic rate limiting to SCPITransport --- scopehal/SCPITransport.cpp | 59 ++++++++++++++++++++++++++------------ scopehal/SCPITransport.h | 27 ++++++++++++----- 2 files changed, 61 insertions(+), 25 deletions(-) diff --git a/scopehal/SCPITransport.cpp b/scopehal/SCPITransport.cpp index 7b93cc91..f9ab6d7c 100644 --- a/scopehal/SCPITransport.cpp +++ b/scopehal/SCPITransport.cpp @@ -79,8 +79,9 @@ SCPITransport* SCPITransport::CreateTransport(const string& transport, const str @brief Pushes a command into the transmit FIFO then returns immediately. This command will actually be sent the next time FlushCommandQueue() is called. + @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ */ -void SCPITransport::SendCommandQueued(const string& cmd) +void SCPITransport::SendCommandQueued(const string& cmd, std::chrono::milliseconds settle_time) { lock_guard lock(m_queueMutex); @@ -116,7 +117,8 @@ void SCPITransport::SendCommandQueued(const string& cmd) auto it = m_txQueue.begin(); while(it != m_txQueue.end()) { - tmp = *it; + auto pair = *it; + tmp = pair.first; //Split off subject, if we have one //(ignore leading colon) @@ -142,7 +144,7 @@ void SCPITransport::SendCommandQueued(const string& cmd) { LogTrace("Deduplicating redundant %s command %s and pushing new command %s\n", ncmd.c_str(), - (*it).c_str(), + pair.first.c_str(), cmd.c_str()); auto oldit = it; @@ -159,18 +161,35 @@ void SCPITransport::SendCommandQueued(const string& cmd) } - m_txQueue.push_back(cmd); + // Create a pair with cmd and settle_time + std::pair pair; + pair = make_pair(cmd, settle_time); + + // Push to queue + m_txQueue.push_back(pair); LogTrace("%zu commands now queued\n", m_txQueue.size()); } /** @brief Block until it's time to send the next command when rate limiting. + + @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ */ -void SCPITransport::RateLimitingWait() +void SCPITransport::RateLimitingWait(std::chrono::milliseconds settle_time) { this_thread::sleep_until(m_nextCommandReady); - m_nextCommandReady = chrono::system_clock::now() + m_rateLimitingInterval; + + if(settle_time == std::chrono::milliseconds(0)) + { + // Use the configured rate limit + m_nextCommandReady = chrono::system_clock::now() + m_rateLimitingInterval; + } + else + { + // Use the specified settle_time + m_nextCommandReady = chrono::system_clock::now() + settle_time; + } } /** @@ -179,7 +198,7 @@ void SCPITransport::RateLimitingWait() bool SCPITransport::FlushCommandQueue() { //Grab the queue, then immediately release the mutex so we can do more queued sends - list tmp; + std::list> tmp; { lock_guard lock(m_queueMutex); tmp = std::move(m_txQueue); @@ -190,11 +209,11 @@ bool SCPITransport::FlushCommandQueue() LogTrace("%zu commands being flushed\n", tmp.size()); lock_guard lock(m_netMutex); - for(auto str : tmp) + for(auto pair : tmp) { if(m_rateLimitingEnabled) - RateLimitingWait(); - SendCommand(str); + RateLimitingWait(pair.second); + SendCommand(pair.first); } return true; } @@ -203,24 +222,26 @@ bool SCPITransport::FlushCommandQueue() @brief Sends a command (flushing any pending/queued commands first), then returns the response. This is an atomic operation requiring no mutexing at the caller side. + @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ */ -string SCPITransport::SendCommandQueuedWithReply(string cmd, bool endOnSemicolon) +string SCPITransport::SendCommandQueuedWithReply(string cmd, bool endOnSemicolon, std::chrono::milliseconds settle_time) { FlushCommandQueue(); - return SendCommandImmediateWithReply(cmd, endOnSemicolon); + return SendCommandImmediateWithReply(cmd, endOnSemicolon, settle_time); } /** @brief Sends a command (jumping ahead of the queue), then returns the response. This is an atomic operation requiring no mutexing at the caller side. + @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ */ -string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemicolon) +string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemicolon, std::chrono::milliseconds settle_time) { lock_guard lock(m_netMutex); if(m_rateLimitingEnabled) - RateLimitingWait(); + RateLimitingWait(settle_time); SendCommand(cmd); @@ -229,26 +250,28 @@ string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemico /** @brief Sends a command (jumping ahead of the queue) which does not require a response. + @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ */ -void SCPITransport::SendCommandImmediate(string cmd) +void SCPITransport::SendCommandImmediate(string cmd, std::chrono::milliseconds settle_time) { lock_guard lock(m_netMutex); if(m_rateLimitingEnabled) - RateLimitingWait(); + RateLimitingWait(settle_time); SendCommand(cmd); } /** @brief Sends a command (jumping ahead of the queue) which reads a binary block response + @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ */ -void* SCPITransport::SendCommandImmediateWithRawBlockReply(string cmd, size_t& len) +void* SCPITransport::SendCommandImmediateWithRawBlockReply(string cmd, size_t& len, std::chrono::milliseconds settle_time) { lock_guard lock(m_netMutex); if(m_rateLimitingEnabled) - RateLimitingWait(); + RateLimitingWait(settle_time); SendCommand(cmd); //Read the length diff --git a/scopehal/SCPITransport.h b/scopehal/SCPITransport.h index ea5639ed..988613e7 100644 --- a/scopehal/SCPITransport.h +++ b/scopehal/SCPITransport.h @@ -60,11 +60,11 @@ class SCPITransport TODO: look into a background thread or something that's automatically launched by the transport to do this after some kind of fixed timeout? */ - void SendCommandQueued(const std::string& cmd); - std::string SendCommandQueuedWithReply(std::string cmd, bool endOnSemicolon = true); - void SendCommandImmediate(std::string cmd); - std::string SendCommandImmediateWithReply(std::string cmd, bool endOnSemicolon = true); - void* SendCommandImmediateWithRawBlockReply(std::string cmd, size_t& len); + void SendCommandQueued(const std::string& cmd, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0)); + std::string SendCommandQueuedWithReply(std::string cmd, bool endOnSemicolon = true, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0)); + void SendCommandImmediate(std::string cmd, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0)); + std::string SendCommandImmediateWithReply(std::string cmd, bool endOnSemicolon = true, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0)); + void* SendCommandImmediateWithRawBlockReply(std::string cmd, size_t& len, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0)); bool FlushCommandQueue(); //Manual mutex locking for ReadRawData() etc @@ -89,6 +89,19 @@ class SCPITransport should be used if at all possible. Once rate limiting is enabled on a transport, it cannot be disabled. + + Invidual commands can be rate limited with the parameter `settle_time` in each Send*() call. If `settle_time` + is set to 0 (default value) it will default to the time specified in the rate limiting (if enabled). If + `settle_time` is set to anything else than 0, then this time will be used to block all subsequent message for + the specified amount of time. + + Note that `settle_time` will always override the rate limit, even when a lower value is used. + + When using `settle_time` on a write only call, it will block for the specified amount of time after the command + is sent. + + When using `settle_time` on a request, the message will be sent, a reply will be read back immidiately, and + then the blocking will take place as the last step. */ void EnableRateLimiting(std::chrono::milliseconds interval) { @@ -126,7 +139,7 @@ class SCPITransport static SCPITransport* CreateTransport(const std::string& transport, const std::string& args); protected: - void RateLimitingWait(); + void RateLimitingWait(std::chrono::milliseconds settle_time = std::chrono::milliseconds(0)); //Class enumeration typedef std::map< std::string, CreateProcType > CreateMapType; @@ -135,7 +148,7 @@ class SCPITransport //Queued commands waiting to be sent std::mutex m_queueMutex; std::recursive_mutex m_netMutex; - std::list m_txQueue; + std::list> m_txQueue; //Set of commands that are OK to deduplicate std::set m_dedupCommands; From 769829596f1d842c86112e09a9d494eca4b7c5b1 Mon Sep 17 00:00:00 2001 From: Christian Antila Date: Sun, 6 Oct 2024 10:59:51 +0200 Subject: [PATCH 2/2] Some documentation for SCPITransport --- scopehal/SCPITransport.cpp | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/scopehal/SCPITransport.cpp b/scopehal/SCPITransport.cpp index f9ab6d7c..64178437 100644 --- a/scopehal/SCPITransport.cpp +++ b/scopehal/SCPITransport.cpp @@ -79,6 +79,8 @@ SCPITransport* SCPITransport::CreateTransport(const string& transport, const str @brief Pushes a command into the transmit FIFO then returns immediately. This command will actually be sent the next time FlushCommandQueue() is called. + + @param cmd Command to be sent @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ */ void SCPITransport::SendCommandQueued(const string& cmd, std::chrono::milliseconds settle_time) @@ -222,7 +224,11 @@ bool SCPITransport::FlushCommandQueue() @brief Sends a command (flushing any pending/queued commands first), then returns the response. This is an atomic operation requiring no mutexing at the caller side. + + @param cmd Command to be sent @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ + + @return A string with the reply */ string SCPITransport::SendCommandQueuedWithReply(string cmd, bool endOnSemicolon, std::chrono::milliseconds settle_time) { @@ -234,7 +240,11 @@ string SCPITransport::SendCommandQueuedWithReply(string cmd, bool endOnSemicolon @brief Sends a command (jumping ahead of the queue), then returns the response. This is an atomic operation requiring no mutexing at the caller side. + + @param cmd Command to be sent @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ + + @return A string with the reply */ string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemicolon, std::chrono::milliseconds settle_time) { @@ -250,6 +260,8 @@ string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemico /** @brief Sends a command (jumping ahead of the queue) which does not require a response. + + @param cmd Command to be sent @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ */ void SCPITransport::SendCommandImmediate(string cmd, std::chrono::milliseconds settle_time) @@ -264,7 +276,12 @@ void SCPITransport::SendCommandImmediate(string cmd, std::chrono::milliseconds s /** @brief Sends a command (jumping ahead of the queue) which reads a binary block response + + @param cmd Command to be sent + @param len A reference to a size_t that will get the number of bytes received written to it. @param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´ + + @return A pointer to the reply buffer. This will need to be deleted manually. */ void* SCPITransport::SendCommandImmediateWithRawBlockReply(string cmd, size_t& len, std::chrono::milliseconds settle_time) {