Add timeout test, disable for now

This commit is contained in:
Alexey Rybalchenko 2019-07-23 16:07:14 +02:00 committed by Dennis Klein
parent be022cfab8
commit 7d1ee82c6b
3 changed files with 60 additions and 29 deletions

View File

@ -68,7 +68,7 @@ Topology::Topology(DDSTopology topo, DDSSession session)
LOG(info) << "fair::mq::Topology Adding device " << d; LOG(info) << "fair::mq::Topology Adding device " << d;
fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
} }
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& condition, uint64_t senderId) { fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) {
LOG(debug) << "Received from " << senderId << ": " << msg; LOG(debug) << "Received from " << senderId << ": " << msg;
std::vector<std::string> parts; std::vector<std::string> parts;
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
@ -97,22 +97,21 @@ Topology::Topology(DDSTopology topo, DDSSession session)
fExecutionThread = std::thread(&Topology::WaitForState, this); fExecutionThread = std::thread(&Topology::WaitForState, this);
} }
auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback cb, const std::chrono::milliseconds& timeout) -> void auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback cb, std::chrono::milliseconds timeout) -> void
{ {
{ {
std::lock_guard<std::mutex> guard(fMtx); std::lock_guard<std::mutex> guard(fMtx);
if (fStateChangeOngoing) { if (fStateChangeOngoing) {
LOG(error) << "State change already in progress, concurrent requested not yet supported"; LOG(error) << "State change already in progress, concurrent requested not yet supported";
return; return; // TODO call the callback with error msg
} }
LOG(info) << "Initiating ChangeState with " << transition << " to " << fkExpectedState.at(transition); LOG(info) << "Initiating ChangeState with " << transition << " to " << fkExpectedState.at(transition);
fStateChangeOngoing = true; fStateChangeOngoing = true;
fChangeStateCallback = cb; fChangeStateCallback = cb;
fStateChangeTimeout = timeout; fStateChangeTimeout = timeout;
fTargetState = fkExpectedState.at(transition);
fDDSSession.SendCommand(GetTransitionName(transition)); fDDSSession.SendCommand(GetTransitionName(transition));
fTargetState = fkExpectedState.at(transition);
} }
fExecutionCV.notify_one(); fExecutionCV.notify_one();
} }
@ -121,33 +120,39 @@ void Topology::WaitForState()
{ {
while (!fShutdown) { while (!fShutdown) {
if (fStateChangeOngoing) { if (fStateChangeOngoing) {
auto condition = [&] { try {
LOG(info) << "checking condition"; auto condition = [&] {
LOG(info) << "fShutdown: " << fShutdown; // LOG(info) << "checking condition";
LOG(info) << "condition: " << std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); // LOG(info) << "fShutdown: " << fShutdown;
return fShutdown || std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { // LOG(info) << "condition: " << std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
return i.second.state == fTargetState; return fShutdown || std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) {
}); return i.second.state == fTargetState;
}; });
};
std::unique_lock<std::mutex> lock(fMtx); std::unique_lock<std::mutex> lock(fMtx);
// TODO Fix the timeout version // TODO Fix the timeout version
if (fStateChangeTimeout > std::chrono::milliseconds(0)) { if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
LOG(debug) << "initiating wait with timeout"; LOG(debug) << "initiating wait with timeout";
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
LOG(debug) << "timeout"; LOG(debug) << "timeout";
fStateChangeOngoing = false; fStateChangeOngoing = false;
break;
}
} else {
LOG(debug) << "initiating wait without timeout";
fCV.wait(lock, condition);
}
fStateChangeOngoing = false;
if (fShutdown) {
// TODO call the callback here with Aborted result
break; break;
} }
} else { } catch(std::exception& e) {
LOG(debug) << "initiating wait without timeout"; LOG(error) << "Error while processing state request: " << e.what();
fCV.wait(lock, condition); fChangeStateCallback(ChangeStateResult{AsyncOpResult::Error, fTopologyState});
}
fStateChangeOngoing = false;
if (fShutdown) {
break;
} }
fChangeStateCallback(ChangeStateResult{AsyncOpResult::Ok, fTopologyState}); fChangeStateCallback(ChangeStateResult{AsyncOpResult::Ok, fTopologyState});

View File

@ -26,6 +26,7 @@
namespace fair { namespace fair {
namespace mq { namespace mq {
// TODO make this a struct with a readable string error msg
enum class AsyncOpResult { enum class AsyncOpResult {
Ok, Ok,
Timeout, Timeout,
@ -70,7 +71,7 @@ class Topology
/// @brief Initiate state transition on all FairMQ devices in this topology /// @brief Initiate state transition on all FairMQ devices in this topology
/// @param t FairMQ device state machine transition /// @param t FairMQ device state machine transition
/// @param cb Completion callback /// @param cb Completion callback
auto ChangeState(TopologyTransition t, ChangeStateCallback cb, const std::chrono::milliseconds& timeout = std::chrono::milliseconds(0)) -> void; auto ChangeState(TopologyTransition t, ChangeStateCallback cb, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) -> void;
static const std::unordered_map<DeviceTransition, DeviceState, tools::HashEnum<DeviceTransition>> fkExpectedState; static const std::unordered_map<DeviceTransition, DeviceState, tools::HashEnum<DeviceTransition>> fkExpectedState;

View File

@ -27,12 +27,37 @@ TEST_F(Topology, ChangeState)
using fair::mq::sdk::TopologyTransition; using fair::mq::sdk::TopologyTransition;
Topology topo(mDDSTopo, mDDSSession); Topology topo(mDDSTopo, mDDSSession);
Topology::ChangeStateResult r;
fair::mq::tools::Semaphore blocker; fair::mq::tools::Semaphore blocker;
topo.ChangeState(TopologyTransition::Stop, [&](Topology::ChangeStateResult result) { topo.ChangeState(TopologyTransition::Stop, [&](Topology::ChangeStateResult result) {
LOG(info) << result; LOG(info) << result;
r = result;
blocker.Signal(); blocker.Signal();
}); });
blocker.Wait(); blocker.Wait();
EXPECT_EQ(r.rc, fair::mq::AsyncOpResult::Ok);
// TODO add the helper to check state consistency
for (const auto& e : r.state) {
EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ready);
}
} }
// TEST_F(Topology, Timeout)
// {
// using fair::mq::sdk::Topology;
// using fair::mq::sdk::TopologyTransition;
// Topology topo(mDDSTopo, mDDSSession);
// Topology::ChangeStateResult r;
// fair::mq::tools::Semaphore blocker;
// topo.ChangeState(TopologyTransition::End, [&](Topology::ChangeStateResult result) {
// LOG(info) << result;
// blocker.Signal();
// }, std::chrono::milliseconds(100));
// blocker.Wait();
// for (const auto& e : r.rc) {
// EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ok);
// }
// }
} // namespace } // namespace