mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Add support and test for concurrent TransitionTo
This commit is contained in:
parent
74d301a16f
commit
5256e7c580
|
@ -106,6 +106,11 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version)
|
||||||
, fMaxRunRuntimeInS(DefaultMaxRunTime)
|
, fMaxRunRuntimeInS(DefaultMaxRunTime)
|
||||||
, fInitializationTimeoutInS(DefaultInitTimeout)
|
, fInitializationTimeoutInS(DefaultInitTimeout)
|
||||||
, fRawCmdLineArgs()
|
, fRawCmdLineArgs()
|
||||||
|
, fStates()
|
||||||
|
, fStatesMtx()
|
||||||
|
, fStatesCV()
|
||||||
|
, fTransitionMtx()
|
||||||
|
, fTransitioning(false)
|
||||||
{
|
{
|
||||||
SubscribeToNewTransition("device", [&](Transition transition) {
|
SubscribeToNewTransition("device", [&](Transition transition) {
|
||||||
LOG(trace) << "device notified on new transition: " << transition;
|
LOG(trace) << "device notified on new transition: " << transition;
|
||||||
|
@ -187,6 +192,15 @@ void FairMQDevice::WaitForState(fair::mq::State state)
|
||||||
|
|
||||||
void FairMQDevice::TransitionTo(const fair::mq::State s)
|
void FairMQDevice::TransitionTo(const fair::mq::State s)
|
||||||
{
|
{
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock(fTransitionMtx);
|
||||||
|
if (fTransitioning) {
|
||||||
|
LOG(debug) << "Attempting a transition with TransitionTo() while another one is already in progress";
|
||||||
|
throw OngoingTransition("Attempting a transition with TransitionTo() while another one is already in progress");
|
||||||
|
}
|
||||||
|
fTransitioning = true;
|
||||||
|
}
|
||||||
|
|
||||||
using fair::mq::State;
|
using fair::mq::State;
|
||||||
State currentState = GetCurrentState();
|
State currentState = GetCurrentState();
|
||||||
|
|
||||||
|
@ -225,6 +239,11 @@ void FairMQDevice::TransitionTo(const fair::mq::State s)
|
||||||
|
|
||||||
currentState = WaitForNextState();
|
currentState = WaitForNextState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock(fTransitionMtx);
|
||||||
|
fTransitioning = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool FairMQDevice::ChangeState(const int transition)
|
bool FairMQDevice::ChangeState(const int transition)
|
||||||
|
|
|
@ -44,6 +44,14 @@ using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChann
|
||||||
using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
|
using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
|
||||||
using InputMultipartCallback = std::function<bool(FairMQParts&, int)>;
|
using InputMultipartCallback = std::function<bool(FairMQParts&, int)>;
|
||||||
|
|
||||||
|
namespace fair
|
||||||
|
{
|
||||||
|
namespace mq
|
||||||
|
{
|
||||||
|
struct OngoingTransition : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class FairMQDevice
|
class FairMQDevice
|
||||||
{
|
{
|
||||||
friend class FairMQChannel;
|
friend class FairMQChannel;
|
||||||
|
@ -584,6 +592,9 @@ class FairMQDevice
|
||||||
std::queue<fair::mq::State> fStates;
|
std::queue<fair::mq::State> fStates;
|
||||||
std::mutex fStatesMtx;
|
std::mutex fStatesMtx;
|
||||||
std::condition_variable fStatesCV;
|
std::condition_variable fStatesCV;
|
||||||
|
|
||||||
|
std::mutex fTransitionMtx;
|
||||||
|
bool fTransitioning;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* FAIRMQDEVICE_H_ */
|
#endif /* FAIRMQDEVICE_H_ */
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
********************************************************************************/
|
********************************************************************************/
|
||||||
|
|
||||||
#include <FairMQDevice.h>
|
#include <FairMQDevice.h>
|
||||||
|
#include <FairMQLogger.h>
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
@ -19,7 +20,19 @@ namespace
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace fair::mq;
|
using namespace fair::mq;
|
||||||
|
|
||||||
void transitionTo(const std::vector<State>& states, int numExpectedStates)
|
class SlowDevice : public FairMQDevice
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
SlowDevice() {}
|
||||||
|
|
||||||
|
protected:
|
||||||
|
void Init()
|
||||||
|
{
|
||||||
|
this_thread::sleep_for(chrono::milliseconds(100));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
void transitionTo(const vector<State>& states, int numExpectedStates)
|
||||||
{
|
{
|
||||||
FairMQDevice device;
|
FairMQDevice device;
|
||||||
|
|
||||||
|
@ -55,4 +68,33 @@ TEST(Transitions, TransitionTo)
|
||||||
transitionTo({State::Running, State::Exiting}, 16);
|
transitionTo({State::Running, State::Exiting}, 16);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(Transitions, ConcurrentTransitionTos)
|
||||||
|
{
|
||||||
|
fair::Logger::SetConsoleSeverity("debug");
|
||||||
|
SlowDevice slowDevice;
|
||||||
|
|
||||||
|
vector<State> states({State::Ready, State::Exiting});
|
||||||
|
|
||||||
|
thread t1([&] {
|
||||||
|
for (const auto& s : states) {
|
||||||
|
slowDevice.TransitionTo(s);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
thread t2([&] {
|
||||||
|
this_thread::sleep_for(chrono::milliseconds(50));
|
||||||
|
ASSERT_THROW(slowDevice.TransitionTo(State::Exiting), OngoingTransition);
|
||||||
|
});
|
||||||
|
|
||||||
|
slowDevice.RunStateMachine();
|
||||||
|
|
||||||
|
if (t1.joinable()) {
|
||||||
|
t1.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (t2.joinable()) {
|
||||||
|
t2.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
Loading…
Reference in New Issue
Block a user