diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 510c116f..69fef3ca 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -68,7 +68,7 @@ Topology::Topology(DDSTopology topo, DDSSession session) LOG(info) << "fair::mq::Topology Adding device " << d; fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); } - fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& condition, uint64_t senderId) { + fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) { LOG(debug) << "Received from " << senderId << ": " << msg; std::vector parts; boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); @@ -97,22 +97,21 @@ Topology::Topology(DDSTopology topo, DDSSession session) fExecutionThread = std::thread(&Topology::WaitForState, this); } -auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback cb, const std::chrono::milliseconds& timeout) -> void +auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback cb, std::chrono::milliseconds timeout) -> void { { std::lock_guard guard(fMtx); if (fStateChangeOngoing) { LOG(error) << "State change already in progress, concurrent requested not yet supported"; - return; + return; // TODO call the callback with error msg } LOG(info) << "Initiating ChangeState with " << transition << " to " << fkExpectedState.at(transition); fStateChangeOngoing = true; fChangeStateCallback = cb; fStateChangeTimeout = timeout; + fTargetState = fkExpectedState.at(transition); fDDSSession.SendCommand(GetTransitionName(transition)); - - fTargetState = fkExpectedState.at(transition); } fExecutionCV.notify_one(); } @@ -121,33 +120,39 @@ void Topology::WaitForState() { while (!fShutdown) { if (fStateChangeOngoing) { - auto condition = [&] { - LOG(info) << "checking condition"; - LOG(info) << "fShutdown: " << fShutdown; - LOG(info) << "condition: " << std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); - return fShutdown || std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { - return i.second.state == fTargetState; - }); - }; + try { + auto condition = [&] { + // LOG(info) << "checking condition"; + // LOG(info) << "fShutdown: " << fShutdown; + // LOG(info) << "condition: " << std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); + return fShutdown || std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { + return i.second.state == fTargetState; + }); + }; - std::unique_lock lock(fMtx); + std::unique_lock lock(fMtx); - // TODO Fix the timeout version - if (fStateChangeTimeout > std::chrono::milliseconds(0)) { - LOG(debug) << "initiating wait with timeout"; - if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { - LOG(debug) << "timeout"; - fStateChangeOngoing = false; + // TODO Fix the timeout version + if (fStateChangeTimeout > std::chrono::milliseconds(0)) { + LOG(debug) << "initiating wait with timeout"; + if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { + LOG(debug) << "timeout"; + fStateChangeOngoing = false; + break; + } + } else { + LOG(debug) << "initiating wait without timeout"; + fCV.wait(lock, condition); + } + + fStateChangeOngoing = false; + if (fShutdown) { + // TODO call the callback here with Aborted result break; } - } else { - LOG(debug) << "initiating wait without timeout"; - fCV.wait(lock, condition); - } - - fStateChangeOngoing = false; - if (fShutdown) { - break; + } catch(std::exception& e) { + LOG(error) << "Error while processing state request: " << e.what(); + fChangeStateCallback(ChangeStateResult{AsyncOpResult::Error, fTopologyState}); } fChangeStateCallback(ChangeStateResult{AsyncOpResult::Ok, fTopologyState}); diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index 675f8f3f..47efb9fb 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -26,6 +26,7 @@ namespace fair { namespace mq { +// TODO make this a struct with a readable string error msg enum class AsyncOpResult { Ok, Timeout, @@ -70,7 +71,7 @@ class Topology /// @brief Initiate state transition on all FairMQ devices in this topology /// @param t FairMQ device state machine transition /// @param cb Completion callback - auto ChangeState(TopologyTransition t, ChangeStateCallback cb, const std::chrono::milliseconds& timeout = std::chrono::milliseconds(0)) -> void; + auto ChangeState(TopologyTransition t, ChangeStateCallback cb, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) -> void; static const std::unordered_map> fkExpectedState; diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index a544943f..8b67b6ba 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -27,12 +27,37 @@ TEST_F(Topology, ChangeState) using fair::mq::sdk::TopologyTransition; Topology topo(mDDSTopo, mDDSSession); + Topology::ChangeStateResult r; fair::mq::tools::Semaphore blocker; topo.ChangeState(TopologyTransition::Stop, [&](Topology::ChangeStateResult result) { LOG(info) << result; + r = result; blocker.Signal(); }); blocker.Wait(); + EXPECT_EQ(r.rc, fair::mq::AsyncOpResult::Ok); + // TODO add the helper to check state consistency + for (const auto& e : r.state) { + EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ready); + } } +// TEST_F(Topology, Timeout) +// { +// using fair::mq::sdk::Topology; +// using fair::mq::sdk::TopologyTransition; + +// Topology topo(mDDSTopo, mDDSSession); +// Topology::ChangeStateResult r; +// fair::mq::tools::Semaphore blocker; +// topo.ChangeState(TopologyTransition::End, [&](Topology::ChangeStateResult result) { +// LOG(info) << result; +// blocker.Signal(); +// }, std::chrono::milliseconds(100)); +// blocker.Wait(); +// for (const auto& e : r.rc) { +// EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ok); +// } +// } + } // namespace