Skip to content

Commit ae90485

Browse files
committed
Added dynamic rate limiting to SCPITransport
1 parent 7f13107 commit ae90485

File tree

2 files changed

+61
-25
lines changed

2 files changed

+61
-25
lines changed

scopehal/SCPITransport.cpp

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ SCPITransport* SCPITransport::CreateTransport(const string& transport, const str
7979
@brief Pushes a command into the transmit FIFO then returns immediately.
8080
8181
This command will actually be sent the next time FlushCommandQueue() is called.
82+
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´
8283
*/
83-
void SCPITransport::SendCommandQueued(const string& cmd)
84+
void SCPITransport::SendCommandQueued(const string& cmd, std::chrono::milliseconds settle_time)
8485
{
8586
lock_guard<mutex> lock(m_queueMutex);
8687

@@ -116,7 +117,8 @@ void SCPITransport::SendCommandQueued(const string& cmd)
116117
auto it = m_txQueue.begin();
117118
while(it != m_txQueue.end())
118119
{
119-
tmp = *it;
120+
auto pair = *it;
121+
tmp = pair.first;
120122

121123
//Split off subject, if we have one
122124
//(ignore leading colon)
@@ -142,7 +144,7 @@ void SCPITransport::SendCommandQueued(const string& cmd)
142144
{
143145
LogTrace("Deduplicating redundant %s command %s and pushing new command %s\n",
144146
ncmd.c_str(),
145-
(*it).c_str(),
147+
pair.first.c_str(),
146148
cmd.c_str());
147149

148150
auto oldit = it;
@@ -159,18 +161,35 @@ void SCPITransport::SendCommandQueued(const string& cmd)
159161

160162
}
161163

162-
m_txQueue.push_back(cmd);
164+
// Create a pair with cmd and settle_time
165+
std::pair<std::string, std::chrono::milliseconds> pair;
166+
pair = make_pair(cmd, settle_time);
167+
168+
// Push to queue
169+
m_txQueue.push_back(pair);
163170

164171
LogTrace("%zu commands now queued\n", m_txQueue.size());
165172
}
166173

167174
/**
168175
@brief Block until it's time to send the next command when rate limiting.
176+
177+
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´
169178
*/
170-
void SCPITransport::RateLimitingWait()
179+
void SCPITransport::RateLimitingWait(std::chrono::milliseconds settle_time)
171180
{
172181
this_thread::sleep_until(m_nextCommandReady);
173-
m_nextCommandReady = chrono::system_clock::now() + m_rateLimitingInterval;
182+
183+
if(settle_time == std::chrono::milliseconds(0))
184+
{
185+
// Use the configured rate limit
186+
m_nextCommandReady = chrono::system_clock::now() + m_rateLimitingInterval;
187+
}
188+
else
189+
{
190+
// Use the specified settle_time
191+
m_nextCommandReady = chrono::system_clock::now() + settle_time;
192+
}
174193
}
175194

176195
/**
@@ -179,7 +198,7 @@ void SCPITransport::RateLimitingWait()
179198
bool SCPITransport::FlushCommandQueue()
180199
{
181200
//Grab the queue, then immediately release the mutex so we can do more queued sends
182-
list<string> tmp;
201+
std::list<std::pair<std::string, std::chrono::milliseconds>> tmp;
183202
{
184203
lock_guard<mutex> lock(m_queueMutex);
185204
tmp = std::move(m_txQueue);
@@ -190,11 +209,11 @@ bool SCPITransport::FlushCommandQueue()
190209
LogTrace("%zu commands being flushed\n", tmp.size());
191210

192211
lock_guard<recursive_mutex> lock(m_netMutex);
193-
for(auto str : tmp)
212+
for(auto pair : tmp)
194213
{
195214
if(m_rateLimitingEnabled)
196-
RateLimitingWait();
197-
SendCommand(str);
215+
RateLimitingWait(pair.second);
216+
SendCommand(pair.first);
198217
}
199218
return true;
200219
}
@@ -203,24 +222,26 @@ bool SCPITransport::FlushCommandQueue()
203222
@brief Sends a command (flushing any pending/queued commands first), then returns the response.
204223
205224
This is an atomic operation requiring no mutexing at the caller side.
225+
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´
206226
*/
207-
string SCPITransport::SendCommandQueuedWithReply(string cmd, bool endOnSemicolon)
227+
string SCPITransport::SendCommandQueuedWithReply(string cmd, bool endOnSemicolon, std::chrono::milliseconds settle_time)
208228
{
209229
FlushCommandQueue();
210-
return SendCommandImmediateWithReply(cmd, endOnSemicolon);
230+
return SendCommandImmediateWithReply(cmd, endOnSemicolon, settle_time);
211231
}
212232

213233
/**
214234
@brief Sends a command (jumping ahead of the queue), then returns the response.
215235
216236
This is an atomic operation requiring no mutexing at the caller side.
237+
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´
217238
*/
218-
string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemicolon)
239+
string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemicolon, std::chrono::milliseconds settle_time)
219240
{
220241
lock_guard<recursive_mutex> lock(m_netMutex);
221242

222243
if(m_rateLimitingEnabled)
223-
RateLimitingWait();
244+
RateLimitingWait(settle_time);
224245

225246
SendCommand(cmd);
226247

@@ -229,26 +250,28 @@ string SCPITransport::SendCommandImmediateWithReply(string cmd, bool endOnSemico
229250

230251
/**
231252
@brief Sends a command (jumping ahead of the queue) which does not require a response.
253+
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´
232254
*/
233-
void SCPITransport::SendCommandImmediate(string cmd)
255+
void SCPITransport::SendCommandImmediate(string cmd, std::chrono::milliseconds settle_time)
234256
{
235257
lock_guard<recursive_mutex> lock(m_netMutex);
236258

237259
if(m_rateLimitingEnabled)
238-
RateLimitingWait();
260+
RateLimitingWait(settle_time);
239261

240262
SendCommand(cmd);
241263
}
242264

243265
/**
244266
@brief Sends a command (jumping ahead of the queue) which reads a binary block response
267+
@param settle_time Rate limiting time for this specific command, see `EnableRateLimiting()´
245268
*/
246-
void* SCPITransport::SendCommandImmediateWithRawBlockReply(string cmd, size_t& len)
269+
void* SCPITransport::SendCommandImmediateWithRawBlockReply(string cmd, size_t& len, std::chrono::milliseconds settle_time)
247270
{
248271
lock_guard<recursive_mutex> lock(m_netMutex);
249272

250273
if(m_rateLimitingEnabled)
251-
RateLimitingWait();
274+
RateLimitingWait(settle_time);
252275
SendCommand(cmd);
253276

254277
//Read the length

scopehal/SCPITransport.h

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,11 @@ class SCPITransport
6060
TODO: look into a background thread or something that's automatically launched by the transport to do this
6161
after some kind of fixed timeout?
6262
*/
63-
void SendCommandQueued(const std::string& cmd);
64-
std::string SendCommandQueuedWithReply(std::string cmd, bool endOnSemicolon = true);
65-
void SendCommandImmediate(std::string cmd);
66-
std::string SendCommandImmediateWithReply(std::string cmd, bool endOnSemicolon = true);
67-
void* SendCommandImmediateWithRawBlockReply(std::string cmd, size_t& len);
63+
void SendCommandQueued(const std::string& cmd, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
64+
std::string SendCommandQueuedWithReply(std::string cmd, bool endOnSemicolon = true, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
65+
void SendCommandImmediate(std::string cmd, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
66+
std::string SendCommandImmediateWithReply(std::string cmd, bool endOnSemicolon = true, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
67+
void* SendCommandImmediateWithRawBlockReply(std::string cmd, size_t& len, std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
6868
bool FlushCommandQueue();
6969

7070
//Manual mutex locking for ReadRawData() etc
@@ -89,6 +89,19 @@ class SCPITransport
8989
should be used if at all possible.
9090
9191
Once rate limiting is enabled on a transport, it cannot be disabled.
92+
93+
Invidual commands can be rate limited with the parameter `settle_time` in each Send*() call. If `settle_time`
94+
is set to 0 (default value) it will default to the time specified in the rate limiting (if enabled). If
95+
`settle_time` is set to anything else than 0, then this time will be used to block all subsequent message for
96+
the specified amount of time.
97+
98+
Note that `settle_time` will always override the rate limit, even when a lower value is used.
99+
100+
When using `settle_time` on a write only call, it will block for the specified amount of time after the command
101+
is sent.
102+
103+
When using `settle_time` on a request, the message will be sent, a reply will be read back immidiately, and
104+
then the blocking will take place as the last step.
92105
*/
93106
void EnableRateLimiting(std::chrono::milliseconds interval)
94107
{
@@ -126,7 +139,7 @@ class SCPITransport
126139
static SCPITransport* CreateTransport(const std::string& transport, const std::string& args);
127140

128141
protected:
129-
void RateLimitingWait();
142+
void RateLimitingWait(std::chrono::milliseconds settle_time = std::chrono::milliseconds(0));
130143

131144
//Class enumeration
132145
typedef std::map< std::string, CreateProcType > CreateMapType;
@@ -135,7 +148,7 @@ class SCPITransport
135148
//Queued commands waiting to be sent
136149
std::mutex m_queueMutex;
137150
std::recursive_mutex m_netMutex;
138-
std::list<std::string> m_txQueue;
151+
std::list<std::pair<std::string, std::chrono::milliseconds>> m_txQueue;
139152

140153
//Set of commands that are OK to deduplicate
141154
std::set<std::string> m_dedupCommands;

0 commit comments

Comments
 (0)