diff --git a/fairmq/States.cxx b/fairmq/States.cxx index a10666b8..2b8261ef 100644 --- a/fairmq/States.cxx +++ b/fairmq/States.cxx @@ -18,9 +18,10 @@ namespace fair namespace mq { -array stateNames = +array stateNames = { { + "UNDEFINED", "OK", "ERROR", "IDLE", @@ -41,6 +42,7 @@ array stateNames = unordered_map states = { + { "UNDEFINED", State::Undefined }, { "OK", State::Ok }, { "ERROR", State::Error }, { "IDLE", State::Idle }, diff --git a/fairmq/States.h b/fairmq/States.h index 8eafeef9..2d540ed6 100644 --- a/fairmq/States.h +++ b/fairmq/States.h @@ -20,6 +20,7 @@ namespace mq enum class State : int { + Undefined = 0, Ok, Error, Idle, @@ -39,7 +40,7 @@ enum class State : int enum class Transition : int { - Auto, + Auto = 0, InitDevice, CompleteInit, Bind, diff --git a/fairmq/sdk/Error.h b/fairmq/sdk/Error.h index b5a603ae..5f8ae5f2 100644 --- a/fairmq/sdk/Error.h +++ b/fairmq/sdk/Error.h @@ -25,11 +25,6 @@ struct RuntimeError : ::std::runtime_error {} }; -struct MixedStateError : RuntimeError -{ - using RuntimeError::RuntimeError; -}; - } /* namespace sdk */ enum class ErrorCode diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index a4138ac7..c530d79c 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -70,6 +70,37 @@ const std::map expectedState = { DeviceTransition::End, DeviceState::Exiting } }; +// mirrors DeviceState, but adds a "Mixed" state that represents a topology where devices are currently not in the same state. +enum class AggregatedTopologyState : int +{ + Undefined = static_cast(fair::mq::State::Undefined), + Ok = static_cast(fair::mq::State::Ok), + Error = static_cast(fair::mq::State::Error), + Idle = static_cast(fair::mq::State::Idle), + InitializingDevice = static_cast(fair::mq::State::InitializingDevice), + Initialized = static_cast(fair::mq::State::Initialized), + Binding = static_cast(fair::mq::State::Binding), + Bound = static_cast(fair::mq::State::Bound), + Connecting = static_cast(fair::mq::State::Connecting), + DeviceReady = static_cast(fair::mq::State::DeviceReady), + InitializingTask = static_cast(fair::mq::State::InitializingTask), + Ready = static_cast(fair::mq::State::Ready), + Running = static_cast(fair::mq::State::Running), + ResettingTask = static_cast(fair::mq::State::ResettingTask), + ResettingDevice = static_cast(fair::mq::State::ResettingDevice), + Exiting = static_cast(fair::mq::State::Exiting), + Mixed +}; + +inline std::ostream& operator<<(std::ostream& os, const AggregatedTopologyState& state) +{ + if (state == AggregatedTopologyState::Mixed) { + return os << "Mixed"; + } else { + return os << static_cast(state); + } +} + struct DeviceStatus { bool subscribed_to_state_changes; @@ -100,22 +131,22 @@ using TopologyStateByTask = std::unordered_map; using TopologyStateByCollection = std::unordered_map>; using TopologyTransition = fair::mq::Transition; -inline DeviceState AggregateState(const TopologyState& topologyState) +inline AggregatedTopologyState AggregateState(const TopologyState& topologyState) { DeviceState first = topologyState.begin()->state; if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) { return i.state == first; })) { - return first; + return static_cast(first); } - throw MixedStateError("State is not uniform"); + return AggregatedTopologyState::Mixed; } inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) { - return AggregateState(topologyState) == state; + return AggregateState(topologyState) == static_cast(state); } inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState) @@ -760,9 +791,8 @@ class BasicTopology : public AsioBase { fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) { if (ContainsTask(stateData.at(s.second).taskId)) { - return stateData.at(s.second).state == fTargetCurrentState - && - (stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Ok); + return stateData.at(s.second).state == fTargetCurrentState && + (stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined); } else { return false; } @@ -774,8 +804,7 @@ class BasicTopology : public AsioBase { if (!fOp.IsCompleted() && ContainsTask(taskId)) { if (currentState == fTargetCurrentState && - (lastState == fTargetLastState || - fTargetLastState == DeviceState::Ok)) { + (lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined)) { ++fCount; } TryCompletion(); @@ -879,7 +908,7 @@ class BasicTopology : public AsioBase template auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token) { - return AsyncWaitForState(DeviceState::Ok, targetCurrentState, "", Duration(0), std::move(token)); + return AsyncWaitForState(DeviceState::Undefined, targetCurrentState, "", Duration(0), std::move(token)); } /// @brief Wait for selected FairMQ devices to reach given last & current state in this topology @@ -909,7 +938,7 @@ class BasicTopology : public AsioBase auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0)) -> std::error_code { - return WaitForState(DeviceState::Ok, targetCurrentState, path, timeout); + return WaitForState(DeviceState::Undefined, targetCurrentState, path, timeout); } using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult); @@ -1254,7 +1283,7 @@ class BasicTopology : public AsioBase int index = 0; for (const auto& task : fDDSTopo.GetTasks()) { - fStateData.push_back(DeviceStatus{false, DeviceState::Ok, DeviceState::Ok, task.GetId(), task.GetCollectionId()}); + fStateData.push_back(DeviceStatus{false, DeviceState::Undefined, DeviceState::Undefined, task.GetId(), task.GetCollectionId()}); fStateIndex.emplace(task.GetId(), index); index++; } diff --git a/fairmq/sdk/commands/Commands.cxx b/fairmq/sdk/commands/Commands.cxx index a3fb56d6..ece53af2 100644 --- a/fairmq/sdk/commands/Commands.cxx +++ b/fairmq/sdk/commands/Commands.cxx @@ -70,9 +70,10 @@ array typeNames = } }; -array fbStateToMQState = +array fbStateToMQState = { { + fair::mq::State::Undefined, fair::mq::State::Ok, fair::mq::State::Error, fair::mq::State::Idle, @@ -91,9 +92,10 @@ array fbStateToMQState = } }; -array mqStateToFBState = +array mqStateToFBState = { { + sdk::cmd::FBState_Undefined, sdk::cmd::FBState_Ok, sdk::cmd::FBState_Error, sdk::cmd::FBState_Idle, diff --git a/fairmq/sdk/commands/CommandsFormat.fbs b/fairmq/sdk/commands/CommandsFormat.fbs index e4ab69fb..0a011553 100644 --- a/fairmq/sdk/commands/CommandsFormat.fbs +++ b/fairmq/sdk/commands/CommandsFormat.fbs @@ -6,6 +6,7 @@ enum FBResult:byte { } enum FBState:byte { + Undefined, Ok, Error, Idle, diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index c9746cdc..6f1220c0 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -139,6 +139,32 @@ TEST_F(Topology, ChangeState) EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true); } +TEST_F(Topology, MixedState) +{ + using namespace fair::mq; + + sdk::Topology topo(mDDSTopo, mDDSSession); + auto result1(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/Sampler.*")); + LOG(info) << result1.first; + + EXPECT_EQ(result1.first, std::error_code()); + EXPECT_EQ(sdk::AggregateState(result1.second), sdk::AggregatedTopologyState::Mixed); + EXPECT_EQ(sdk::StateEqualsTo(result1.second, sdk::DeviceState::InitializingDevice), false); + auto const currentState1 = topo.GetCurrentState(); + EXPECT_EQ(sdk::AggregateState(currentState1), sdk::AggregatedTopologyState::Mixed); + EXPECT_EQ(sdk::StateEqualsTo(currentState1, sdk::DeviceState::InitializingDevice), false); + + auto result2(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/SinkGroup/.*")); + LOG(info) << result2.first; + + EXPECT_EQ(result2.first, std::error_code()); + EXPECT_EQ(sdk::AggregateState(result2.second), sdk::AggregatedTopologyState::InitializingDevice); + EXPECT_EQ(sdk::StateEqualsTo(result2.second, sdk::DeviceState::InitializingDevice), true); + auto const currentState2 = topo.GetCurrentState(); + EXPECT_EQ(sdk::AggregateState(currentState2), sdk::AggregatedTopologyState::InitializingDevice); + EXPECT_EQ(sdk::StateEqualsTo(currentState2, sdk::DeviceState::InitializingDevice), true); +} + TEST_F(Topology, AsyncChangeStateConcurrent) { using namespace fair::mq; @@ -191,9 +217,9 @@ TEST_F(Topology, AsyncChangeStateCollectionView) ASSERT_EQ(cstate.size(), 5); for (const auto& c : cstate) { LOG(debug) << "\t" << c.first; - State s; + sdk::AggregatedTopologyState s; ASSERT_NO_THROW(s = sdk::AggregateState(c.second)); - ASSERT_EQ(s, State::InitializingDevice); + ASSERT_EQ(s, static_cast(State::InitializingDevice)); LOG(debug) << "\tAggregated state: " << s; for (const auto& ds : c.second) { LOG(debug) << "\t\t" << ds.state;