mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Add Undefined and Mixed state for use in SDK
This commit is contained in:
parent
ba3a82b1df
commit
a6da208e79
|
@ -18,9 +18,10 @@ namespace fair
|
||||||
namespace mq
|
namespace mq
|
||||||
{
|
{
|
||||||
|
|
||||||
array<string, 15> stateNames =
|
array<string, 16> stateNames =
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
|
"UNDEFINED",
|
||||||
"OK",
|
"OK",
|
||||||
"ERROR",
|
"ERROR",
|
||||||
"IDLE",
|
"IDLE",
|
||||||
|
@ -41,6 +42,7 @@ array<string, 15> stateNames =
|
||||||
|
|
||||||
unordered_map<string, State> states =
|
unordered_map<string, State> states =
|
||||||
{
|
{
|
||||||
|
{ "UNDEFINED", State::Undefined },
|
||||||
{ "OK", State::Ok },
|
{ "OK", State::Ok },
|
||||||
{ "ERROR", State::Error },
|
{ "ERROR", State::Error },
|
||||||
{ "IDLE", State::Idle },
|
{ "IDLE", State::Idle },
|
||||||
|
|
|
@ -20,6 +20,7 @@ namespace mq
|
||||||
|
|
||||||
enum class State : int
|
enum class State : int
|
||||||
{
|
{
|
||||||
|
Undefined = 0,
|
||||||
Ok,
|
Ok,
|
||||||
Error,
|
Error,
|
||||||
Idle,
|
Idle,
|
||||||
|
@ -39,7 +40,7 @@ enum class State : int
|
||||||
|
|
||||||
enum class Transition : int
|
enum class Transition : int
|
||||||
{
|
{
|
||||||
Auto,
|
Auto = 0,
|
||||||
InitDevice,
|
InitDevice,
|
||||||
CompleteInit,
|
CompleteInit,
|
||||||
Bind,
|
Bind,
|
||||||
|
|
|
@ -25,11 +25,6 @@ struct RuntimeError : ::std::runtime_error
|
||||||
{}
|
{}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct MixedStateError : RuntimeError
|
|
||||||
{
|
|
||||||
using RuntimeError::RuntimeError;
|
|
||||||
};
|
|
||||||
|
|
||||||
} /* namespace sdk */
|
} /* namespace sdk */
|
||||||
|
|
||||||
enum class ErrorCode
|
enum class ErrorCode
|
||||||
|
|
|
@ -70,6 +70,37 @@ const std::map<DeviceTransition, DeviceState> expectedState =
|
||||||
{ DeviceTransition::End, DeviceState::Exiting }
|
{ DeviceTransition::End, DeviceState::Exiting }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// mirrors DeviceState, but adds a "Mixed" state that represents a topology where devices are currently not in the same state.
|
||||||
|
enum class AggregatedTopologyState : int
|
||||||
|
{
|
||||||
|
Undefined = static_cast<int>(fair::mq::State::Undefined),
|
||||||
|
Ok = static_cast<int>(fair::mq::State::Ok),
|
||||||
|
Error = static_cast<int>(fair::mq::State::Error),
|
||||||
|
Idle = static_cast<int>(fair::mq::State::Idle),
|
||||||
|
InitializingDevice = static_cast<int>(fair::mq::State::InitializingDevice),
|
||||||
|
Initialized = static_cast<int>(fair::mq::State::Initialized),
|
||||||
|
Binding = static_cast<int>(fair::mq::State::Binding),
|
||||||
|
Bound = static_cast<int>(fair::mq::State::Bound),
|
||||||
|
Connecting = static_cast<int>(fair::mq::State::Connecting),
|
||||||
|
DeviceReady = static_cast<int>(fair::mq::State::DeviceReady),
|
||||||
|
InitializingTask = static_cast<int>(fair::mq::State::InitializingTask),
|
||||||
|
Ready = static_cast<int>(fair::mq::State::Ready),
|
||||||
|
Running = static_cast<int>(fair::mq::State::Running),
|
||||||
|
ResettingTask = static_cast<int>(fair::mq::State::ResettingTask),
|
||||||
|
ResettingDevice = static_cast<int>(fair::mq::State::ResettingDevice),
|
||||||
|
Exiting = static_cast<int>(fair::mq::State::Exiting),
|
||||||
|
Mixed
|
||||||
|
};
|
||||||
|
|
||||||
|
inline std::ostream& operator<<(std::ostream& os, const AggregatedTopologyState& state)
|
||||||
|
{
|
||||||
|
if (state == AggregatedTopologyState::Mixed) {
|
||||||
|
return os << "Mixed";
|
||||||
|
} else {
|
||||||
|
return os << static_cast<DeviceState>(state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct DeviceStatus
|
struct DeviceStatus
|
||||||
{
|
{
|
||||||
bool subscribed_to_state_changes;
|
bool subscribed_to_state_changes;
|
||||||
|
@ -100,22 +131,22 @@ using TopologyStateByTask = std::unordered_map<DDSTask::Id, DeviceStatus>;
|
||||||
using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
|
using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
|
||||||
using TopologyTransition = fair::mq::Transition;
|
using TopologyTransition = fair::mq::Transition;
|
||||||
|
|
||||||
inline DeviceState AggregateState(const TopologyState& topologyState)
|
inline AggregatedTopologyState AggregateState(const TopologyState& topologyState)
|
||||||
{
|
{
|
||||||
DeviceState first = topologyState.begin()->state;
|
DeviceState first = topologyState.begin()->state;
|
||||||
|
|
||||||
if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
|
if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
|
||||||
return i.state == first;
|
return i.state == first;
|
||||||
})) {
|
})) {
|
||||||
return first;
|
return static_cast<AggregatedTopologyState>(first);
|
||||||
}
|
}
|
||||||
|
|
||||||
throw MixedStateError("State is not uniform");
|
return AggregatedTopologyState::Mixed;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
|
inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
|
||||||
{
|
{
|
||||||
return AggregateState(topologyState) == state;
|
return AggregateState(topologyState) == static_cast<AggregatedTopologyState>(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState)
|
inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState)
|
||||||
|
@ -760,9 +791,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
{
|
{
|
||||||
fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
|
fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
|
||||||
if (ContainsTask(stateData.at(s.second).taskId)) {
|
if (ContainsTask(stateData.at(s.second).taskId)) {
|
||||||
return stateData.at(s.second).state == fTargetCurrentState
|
return stateData.at(s.second).state == fTargetCurrentState &&
|
||||||
&&
|
(stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined);
|
||||||
(stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Ok);
|
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -774,8 +804,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
{
|
{
|
||||||
if (!fOp.IsCompleted() && ContainsTask(taskId)) {
|
if (!fOp.IsCompleted() && ContainsTask(taskId)) {
|
||||||
if (currentState == fTargetCurrentState &&
|
if (currentState == fTargetCurrentState &&
|
||||||
(lastState == fTargetLastState ||
|
(lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined)) {
|
||||||
fTargetLastState == DeviceState::Ok)) {
|
|
||||||
++fCount;
|
++fCount;
|
||||||
}
|
}
|
||||||
TryCompletion();
|
TryCompletion();
|
||||||
|
@ -879,7 +908,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
template<typename CompletionToken>
|
template<typename CompletionToken>
|
||||||
auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token)
|
auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token)
|
||||||
{
|
{
|
||||||
return AsyncWaitForState(DeviceState::Ok, targetCurrentState, "", Duration(0), std::move(token));
|
return AsyncWaitForState(DeviceState::Undefined, targetCurrentState, "", Duration(0), std::move(token));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief Wait for selected FairMQ devices to reach given last & current state in this topology
|
/// @brief Wait for selected FairMQ devices to reach given last & current state in this topology
|
||||||
|
@ -909,7 +938,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
|
auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
|
||||||
-> std::error_code
|
-> std::error_code
|
||||||
{
|
{
|
||||||
return WaitForState(DeviceState::Ok, targetCurrentState, path, timeout);
|
return WaitForState(DeviceState::Undefined, targetCurrentState, path, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult);
|
using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult);
|
||||||
|
@ -1254,7 +1283,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
int index = 0;
|
int index = 0;
|
||||||
|
|
||||||
for (const auto& task : fDDSTopo.GetTasks()) {
|
for (const auto& task : fDDSTopo.GetTasks()) {
|
||||||
fStateData.push_back(DeviceStatus{false, DeviceState::Ok, DeviceState::Ok, task.GetId(), task.GetCollectionId()});
|
fStateData.push_back(DeviceStatus{false, DeviceState::Undefined, DeviceState::Undefined, task.GetId(), task.GetCollectionId()});
|
||||||
fStateIndex.emplace(task.GetId(), index);
|
fStateIndex.emplace(task.GetId(), index);
|
||||||
index++;
|
index++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,9 +70,10 @@ array<string, 17> typeNames =
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
array<fair::mq::State, 15> fbStateToMQState =
|
array<fair::mq::State, 16> fbStateToMQState =
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
|
fair::mq::State::Undefined,
|
||||||
fair::mq::State::Ok,
|
fair::mq::State::Ok,
|
||||||
fair::mq::State::Error,
|
fair::mq::State::Error,
|
||||||
fair::mq::State::Idle,
|
fair::mq::State::Idle,
|
||||||
|
@ -91,9 +92,10 @@ array<fair::mq::State, 15> fbStateToMQState =
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
array<sdk::cmd::FBState, 15> mqStateToFBState =
|
array<sdk::cmd::FBState, 16> mqStateToFBState =
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
|
sdk::cmd::FBState_Undefined,
|
||||||
sdk::cmd::FBState_Ok,
|
sdk::cmd::FBState_Ok,
|
||||||
sdk::cmd::FBState_Error,
|
sdk::cmd::FBState_Error,
|
||||||
sdk::cmd::FBState_Idle,
|
sdk::cmd::FBState_Idle,
|
||||||
|
|
|
@ -6,6 +6,7 @@ enum FBResult:byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
enum FBState:byte {
|
enum FBState:byte {
|
||||||
|
Undefined,
|
||||||
Ok,
|
Ok,
|
||||||
Error,
|
Error,
|
||||||
Idle,
|
Idle,
|
||||||
|
|
|
@ -139,6 +139,32 @@ TEST_F(Topology, ChangeState)
|
||||||
EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true);
|
EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(Topology, MixedState)
|
||||||
|
{
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
|
sdk::Topology topo(mDDSTopo, mDDSSession);
|
||||||
|
auto result1(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/Sampler.*"));
|
||||||
|
LOG(info) << result1.first;
|
||||||
|
|
||||||
|
EXPECT_EQ(result1.first, std::error_code());
|
||||||
|
EXPECT_EQ(sdk::AggregateState(result1.second), sdk::AggregatedTopologyState::Mixed);
|
||||||
|
EXPECT_EQ(sdk::StateEqualsTo(result1.second, sdk::DeviceState::InitializingDevice), false);
|
||||||
|
auto const currentState1 = topo.GetCurrentState();
|
||||||
|
EXPECT_EQ(sdk::AggregateState(currentState1), sdk::AggregatedTopologyState::Mixed);
|
||||||
|
EXPECT_EQ(sdk::StateEqualsTo(currentState1, sdk::DeviceState::InitializingDevice), false);
|
||||||
|
|
||||||
|
auto result2(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/SinkGroup/.*"));
|
||||||
|
LOG(info) << result2.first;
|
||||||
|
|
||||||
|
EXPECT_EQ(result2.first, std::error_code());
|
||||||
|
EXPECT_EQ(sdk::AggregateState(result2.second), sdk::AggregatedTopologyState::InitializingDevice);
|
||||||
|
EXPECT_EQ(sdk::StateEqualsTo(result2.second, sdk::DeviceState::InitializingDevice), true);
|
||||||
|
auto const currentState2 = topo.GetCurrentState();
|
||||||
|
EXPECT_EQ(sdk::AggregateState(currentState2), sdk::AggregatedTopologyState::InitializingDevice);
|
||||||
|
EXPECT_EQ(sdk::StateEqualsTo(currentState2, sdk::DeviceState::InitializingDevice), true);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(Topology, AsyncChangeStateConcurrent)
|
TEST_F(Topology, AsyncChangeStateConcurrent)
|
||||||
{
|
{
|
||||||
using namespace fair::mq;
|
using namespace fair::mq;
|
||||||
|
@ -191,9 +217,9 @@ TEST_F(Topology, AsyncChangeStateCollectionView)
|
||||||
ASSERT_EQ(cstate.size(), 5);
|
ASSERT_EQ(cstate.size(), 5);
|
||||||
for (const auto& c : cstate) {
|
for (const auto& c : cstate) {
|
||||||
LOG(debug) << "\t" << c.first;
|
LOG(debug) << "\t" << c.first;
|
||||||
State s;
|
sdk::AggregatedTopologyState s;
|
||||||
ASSERT_NO_THROW(s = sdk::AggregateState(c.second));
|
ASSERT_NO_THROW(s = sdk::AggregateState(c.second));
|
||||||
ASSERT_EQ(s, State::InitializingDevice);
|
ASSERT_EQ(s, static_cast<sdk::AggregatedTopologyState>(State::InitializingDevice));
|
||||||
LOG(debug) << "\tAggregated state: " << s;
|
LOG(debug) << "\tAggregated state: " << s;
|
||||||
for (const auto& ds : c.second) {
|
for (const auto& ds : c.second) {
|
||||||
LOG(debug) << "\t\t" << ds.state;
|
LOG(debug) << "\t\t" << ds.state;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user