From af971c6ab1711d620c4ad899d303a479f625bc9f Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 18 May 2016 16:00:26 +0200 Subject: [PATCH] Allow rate limiting in the Shared Memory example --- fairmq/devices/FairMQBenchmarkSampler.cxx | 41 +++++++++++++++++++++++ fairmq/devices/FairMQBenchmarkSampler.h | 4 +++ fairmq/run/runBenchmarkSampler.cxx | 5 ++- fairmq/run/startFairMQBenchmark.sh.in | 1 + 4 files changed, 50 insertions(+), 1 deletion(-) diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 4e7bd991..9cb19ce1 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -26,6 +26,8 @@ using namespace std; FairMQBenchmarkSampler::FairMQBenchmarkSampler() : fMsgSize(10000) , fNumMsgs(0) + , fMsgCounter(0) + , fMsgRate(1) { } @@ -35,6 +37,8 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler() void FairMQBenchmarkSampler::Run() { + boost::thread resetMsgCounter(boost::bind(&FairMQBenchmarkSampler::ResetMsgCounter, this)); + int numSentMsgs = 0; unique_ptr baseMsg(fTransportFactory->CreateMessage(fMsgSize)); @@ -61,10 +65,40 @@ void FairMQBenchmarkSampler::Run() } } } + + --fMsgCounter; + + while (fMsgCounter == 0) { + boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + } } LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state."; LOG(INFO) << "Sending time: "; + + try + { + resetMsgCounter.interrupt(); + resetMsgCounter.join(); + } + catch(boost::thread_resource_error& e) + { + LOG(ERROR) << e.what(); + exit(EXIT_FAILURE); + } +} + +void FairMQBenchmarkSampler::ResetMsgCounter() +{ + while (true) { + try { + fMsgCounter = fMsgRate / 100; + boost::this_thread::sleep(boost::posix_time::milliseconds(10)); + } catch (boost::thread_interrupted&) { + LOG(DEBUG) << "Event rate limiter thread interrupted"; + break; + } + } } void FairMQBenchmarkSampler::SetProperty(const int key, const string& value) @@ -93,6 +127,9 @@ void FairMQBenchmarkSampler::SetProperty(const int key, const int value) case MsgSize: fMsgSize = value; break; + case MsgRate: + fMsgRate = value; + break; case NumMsgs: fNumMsgs = value; break; @@ -108,6 +145,8 @@ int FairMQBenchmarkSampler::GetProperty(const int key, const int default_ /*= 0* { case MsgSize: return fMsgSize; + case MsgRate: + return fMsgRate; case NumMsgs: return fNumMsgs; default: @@ -123,6 +162,8 @@ string FairMQBenchmarkSampler::GetPropertyDescription(const int key) return "MsgSize: Size of the transfered message buffer."; case NumMsgs: return "NumMsgs: Number of messages to send."; + case MsgRate: + return "MsgRate: Maximum msg rate."; default: return FairMQDevice::GetPropertyDescription(key); } diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index 4e9afd40..4566f29d 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -37,6 +37,8 @@ class FairMQBenchmarkSampler : public FairMQDevice FairMQBenchmarkSampler(); virtual ~FairMQBenchmarkSampler(); + void ResetMsgCounter(); + virtual void SetProperty(const int key, const std::string& value); virtual std::string GetProperty(const int key, const std::string& default_ = ""); virtual void SetProperty(const int key, const int value); @@ -48,6 +50,8 @@ class FairMQBenchmarkSampler : public FairMQDevice protected: int fMsgSize; int fNumMsgs; + int fMsgCounter; + int fMsgRate; virtual void Run(); }; diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 3f26a011..a5c3e616 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -29,11 +29,13 @@ int main(int argc, char** argv) { int msgSize; int numMsgs; + int msgRate; options_description samplerOptions("Sampler options"); samplerOptions.add_options() ("msg-size", value(&msgSize)->default_value(1000), "Message size in bytes") - ("num-msgs", value(&numMsgs)->default_value(0), "Number of messages to send"); + ("num-msgs", value(&numMsgs)->default_value(0), "Number of messages to send") + ("msg-rate", value(&msgRate)->default_value(0), "Msg rate limit in maximum number of messages per second"); FairMQProgOptions config; config.AddToCmdLineOptions(samplerOptions); @@ -42,6 +44,7 @@ int main(int argc, char** argv) FairMQBenchmarkSampler sampler; sampler.SetProperty(FairMQBenchmarkSampler::MsgSize, msgSize); sampler.SetProperty(FairMQBenchmarkSampler::NumMsgs, numMsgs); + sampler.SetProperty(FairMQBenchmarkSampler::MsgRate, msgRate); runStateMachine(sampler, config); } diff --git a/fairmq/run/startFairMQBenchmark.sh.in b/fairmq/run/startFairMQBenchmark.sh.in index d9a2e47a..5648db56 100755 --- a/fairmq/run/startFairMQBenchmark.sh.in +++ b/fairmq/run/startFairMQBenchmark.sh.in @@ -28,6 +28,7 @@ SAMPLER+=" --id bsampler1" #SAMPLER+=" --control static" #SAMPLER+=" --transport nanomsg" SAMPLER+=" --msg-size $msgSize" +# SAMPLER+=" --msg-rate 1000" SAMPLER+=" --num-msgs $numMsgs" SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &