Add sdk::GroupByCollectionId(TopologyState)

This commit is contained in:
Alexey Rybalchenko 2019-09-05 14:39:37 +02:00 committed by Dennis Klein
parent 2973ce0352
commit acbf57d6f3

View File

@ -21,8 +21,10 @@
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <fairmq/sdk/AsioAsyncOp.h> #include <fairmq/sdk/AsioAsyncOp.h>
#include <fairmq/sdk/AsioBase.h> #include <fairmq/sdk/AsioBase.h>
#include <fairmq/sdk/DDSCollection.h>
#include <fairmq/sdk/DDSInfo.h> #include <fairmq/sdk/DDSInfo.h>
#include <fairmq/sdk/DDSSession.h> #include <fairmq/sdk/DDSSession.h>
#include <fairmq/sdk/DDSTask.h>
#include <fairmq/sdk/DDSTopology.h> #include <fairmq/sdk/DDSTopology.h>
#include <fairmq/sdk/Error.h> #include <fairmq/sdk/Error.h>
#include <functional> #include <functional>
@ -61,23 +63,26 @@ struct DeviceStatus
{ {
bool initialized; bool initialized;
DeviceState state; DeviceState state;
DDSTask::Id taskId;
DDSCollection::Id collectionId;
}; };
using TopologyState = std::unordered_map<DDSTask::Id, DeviceStatus>; using TopologyState = std::vector<DeviceStatus>;
using TopologyStateByTask = std::unordered_map<DDSTask::Id, DeviceStatus>;
using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
using TopologyTransition = fair::mq::Transition; using TopologyTransition = fair::mq::Transition;
inline DeviceState AggregateState(const TopologyState& topologyState) inline DeviceState AggregateState(const TopologyState& topologyState)
{ {
DeviceState first = topologyState.begin()->second.state; DeviceState first = topologyState.begin()->state;
if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) { if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
return i.second.state == first; return i.state == first;
})) { })) {
return first; return first;
} }
throw MixedStateError("State is not uniform"); throw MixedStateError("State is not uniform");
} }
inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
@ -85,6 +90,18 @@ inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
return AggregateState(topologyState) == state; return AggregateState(topologyState) == state;
} }
inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState)
{
TopologyStateByCollection state;
for (const auto& ds : topologyState) {
if (ds.collectionId != 0) {
state[ds.collectionId].push_back(ds);
}
}
return state;
}
/** /**
* @class BasicTopology Topology.h <fairmq/sdk/Topology.h> * @class BasicTopology Topology.h <fairmq/sdk/Topology.h>
* @tparam Executor Associated I/O executor * @tparam Executor Associated I/O executor
@ -167,7 +184,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
&& fState.at(fDDSSession.GetTaskId(senderId)).state != fChangeStateTarget) { && fState.at(fDDSSession.GetTaskId(senderId)).state != fChangeStateTarget) {
fChangeStateOpTimer.cancel(); fChangeStateOpTimer.cancel();
fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed),
fState); MakeTopologyStateFromMap());
} }
} }
}); });
@ -189,7 +206,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
std::lock_guard<std::mutex> lk(fMtx); std::lock_guard<std::mutex> lk(fMtx);
fDDSSession.UnsubscribeFromCommands(); fDDSSession.UnsubscribeFromCommands();
try { try {
fChangeStateOp.Cancel(fState); fChangeStateOp.Cancel(MakeTopologyStateFromMap());
} catch (...) {} } catch (...) {}
} }
@ -298,7 +315,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
fChangeStateOpTimer.async_wait([&](std::error_code ec) { fChangeStateOpTimer.async_wait([&](std::error_code ec) {
if (!ec) { if (!ec) {
std::lock_guard<std::mutex> lk2(fMtx); std::lock_guard<std::mutex> lk2(fMtx);
fChangeStateOp.Timeout(fState); fChangeStateOp.Timeout(MakeTopologyStateFromMap());
} }
}); });
} }
@ -359,7 +376,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
auto GetCurrentState() const -> TopologyState auto GetCurrentState() const -> TopologyState
{ {
std::lock_guard<std::mutex> lk(fMtx); std::lock_guard<std::mutex> lk(fMtx);
return fState; return MakeTopologyStateFromMap();
} }
auto AggregateState() const -> DeviceState { return sdk::AggregateState(GetCurrentState()); } auto AggregateState() const -> DeviceState { return sdk::AggregateState(GetCurrentState()); }
@ -369,7 +386,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
private: private:
DDSSession fDDSSession; DDSSession fDDSSession;
DDSTopology fDDSTopo; DDSTopology fDDSTopo;
TopologyState fState; TopologyStateByTask fState;
mutable std::mutex fMtx; mutable std::mutex fMtx;
using ChangeStateOp = AsioAsyncOp<Executor, Allocator, ChangeStateCompletionSignature>; using ChangeStateOp = AsioAsyncOp<Executor, Allocator, ChangeStateCompletionSignature>;
@ -377,11 +394,11 @@ class BasicTopology : public AsioBase<Executor, Allocator>
asio::steady_timer fChangeStateOpTimer; asio::steady_timer fChangeStateOpTimer;
DeviceState fChangeStateTarget; DeviceState fChangeStateTarget;
static auto makeTopologyState(const DDSTopo& topo) -> TopologyState static auto makeTopologyState(const DDSTopo& topo) -> TopologyStateByTask
{ {
TopologyState state; TopologyStateByTask state;
for (const auto& task : topo.GetTasks()) { for (const auto& task : topo.GetTasks()) {
state.emplace(task.GetId(), DeviceStatus{false, DeviceState::Ok}); state.emplace(task.GetId(), DeviceStatus{false, DeviceState::Ok, task.GetId(), task.GetCollectionId()});
} }
return state; return state;
} }
@ -392,7 +409,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
std::string endState = state.substr(pos + 2); std::string endState = state.substr(pos + 2);
try { try {
std::lock_guard<std::mutex> lk(fMtx); std::lock_guard<std::mutex> lk(fMtx);
fState[taskId] = DeviceStatus{true, fair::mq::GetState(endState)}; DeviceStatus& task = fState.at(taskId);
task.initialized = true;
task.state = fair::mq::GetState(endState);
LOG(debug) << "Updated state entry: taskId=" << taskId << ",state=" << state; LOG(debug) << "Updated state entry: taskId=" << taskId << ",state=" << state;
TryChangeStateCompletion(); TryChangeStateCompletion();
} catch (const std::exception& e) { } catch (const std::exception& e) {
@ -404,20 +423,32 @@ class BasicTopology : public AsioBase<Executor, Allocator>
auto TryChangeStateCompletion() -> void auto TryChangeStateCompletion() -> void
{ {
bool targetStateReached( bool targetStateReached(
std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { std::all_of(fState.cbegin(), fState.cend(), [&](TopologyStateByTask::value_type i) {
return (i.second.state == fChangeStateTarget) && i.second.initialized; return (i.second.state == fChangeStateTarget) && i.second.initialized;
})); }));
if (!fChangeStateOp.IsCompleted() && targetStateReached) { if (!fChangeStateOp.IsCompleted() && targetStateReached) {
fChangeStateOpTimer.cancel(); fChangeStateOpTimer.cancel();
fChangeStateOp.Complete(fState); fChangeStateOp.Complete(MakeTopologyStateFromMap());
} }
} }
/// call only under locked fMtx! /// call only under locked fMtx!
auto GetCurrentStateUnsafe() const -> TopologyState auto GetCurrentStateUnsafe() const -> TopologyState
{ {
return fState; return MakeTopologyStateFromMap();
}
auto MakeTopologyStateFromMap() const -> TopologyState
{
TopologyState state;
state.reserve(fState.size());
for (const auto& e : fState) {
state.push_back(e.second);
}
return state;
} }
}; };