mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
SDK: Refactor StateChangeOp and add path parameter
This commit is contained in:
parent
24aabdb854
commit
a20ac7af08
|
@ -174,9 +174,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
, fDDSTopo(std::move(topo))
|
, fDDSTopo(std::move(topo))
|
||||||
, fStateData()
|
, fStateData()
|
||||||
, fStateIndex()
|
, fStateIndex()
|
||||||
, fChangeStateOp()
|
|
||||||
, fChangeStateOpTimer(ex)
|
|
||||||
, fChangeStateTarget(DeviceState::Idle)
|
|
||||||
{
|
{
|
||||||
makeTopologyState();
|
makeTopologyState();
|
||||||
|
|
||||||
|
@ -216,10 +213,13 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
auto _cmd = static_cast<TransitionStatus&>(*cmd);
|
auto _cmd = static_cast<TransitionStatus&>(*cmd);
|
||||||
if (_cmd.GetResult() != Result::Ok) {
|
if (_cmd.GetResult() != Result::Ok) {
|
||||||
FAIR_LOG(error) << _cmd.GetTransition() << " transition failed for " << _cmd.GetDeviceId();
|
FAIR_LOG(error) << _cmd.GetTransition() << " transition failed for " << _cmd.GetDeviceId();
|
||||||
|
DDSTask::Id id(_cmd.GetTaskId());
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
if (!fChangeStateOp.IsCompleted() && fStateData.at(fStateIndex.at(_cmd.GetTaskId())).state != fChangeStateTarget) {
|
for (auto& op : fChangeStateOps) {
|
||||||
fChangeStateOpTimer.cancel();
|
if (!op.second.IsCompleted() && op.second.ContainsTask(id) &&
|
||||||
fChangeStateOp.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed), fStateData);
|
fStateData.at(fStateIndex.at(id)).state != op.second.GetTargetState()) {
|
||||||
|
op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -270,15 +270,172 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
fDDSSession.UnsubscribeFromCommands();
|
fDDSSession.UnsubscribeFromCommands();
|
||||||
try {
|
try {
|
||||||
fChangeStateOp.Cancel(fStateData);
|
for (auto& op : fChangeStateOps) {
|
||||||
|
op.second.Complete(MakeErrorCode(ErrorCode::OperationCanceled));
|
||||||
|
}
|
||||||
} catch (...) {}
|
} catch (...) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto HandleCmd(cmd::StateChange const& cmd) -> void
|
||||||
|
{
|
||||||
|
DDSTask::Id taskId(cmd.GetTaskId());
|
||||||
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
|
|
||||||
|
try {
|
||||||
|
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||||
|
task.initialized = true;
|
||||||
|
task.lastState = cmd.GetLastState();
|
||||||
|
task.state = cmd.GetCurrentState();
|
||||||
|
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
||||||
|
|
||||||
|
for (auto& op : fChangeStateOps) {
|
||||||
|
op.second.Update(taskId, cmd.GetCurrentState());
|
||||||
|
}
|
||||||
|
for (auto& op : fWaitForStateOps) {
|
||||||
|
op.second.Update(taskId, cmd.GetLastState(), cmd.GetCurrentState());
|
||||||
|
}
|
||||||
|
} catch (const std::exception& e) {
|
||||||
|
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChange const&): " << e.what();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto HandleCmd(cmd::Properties const& cmd) -> void
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(fMtx);
|
||||||
|
try {
|
||||||
|
auto& op(fGetPropertiesOps.at(cmd.GetRequestId()));
|
||||||
|
lk.unlock();
|
||||||
|
op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps());
|
||||||
|
} catch (std::out_of_range& e) {
|
||||||
|
FAIR_LOG(debug) << "GetProperties operation (request id: " << cmd.GetRequestId()
|
||||||
|
<< ") not found (probably completed or timed out), "
|
||||||
|
<< "discarding reply of device " << cmd.GetDeviceId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto HandleCmd(cmd::PropertiesSet const& cmd) -> void
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(fMtx);
|
||||||
|
try {
|
||||||
|
auto& op(fSetPropertiesOps.at(cmd.GetRequestId()));
|
||||||
|
lk.unlock();
|
||||||
|
op.Update(cmd.GetDeviceId(), cmd.GetResult());
|
||||||
|
} catch (std::out_of_range& e) {
|
||||||
|
FAIR_LOG(debug) << "SetProperties operation (request id: " << cmd.GetRequestId()
|
||||||
|
<< ") not found (probably completed or timed out), "
|
||||||
|
<< "discarding reply of device " << cmd.GetDeviceId();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
using Duration = std::chrono::milliseconds;
|
using Duration = std::chrono::milliseconds;
|
||||||
using ChangeStateCompletionSignature = void(std::error_code, TopologyState);
|
using ChangeStateCompletionSignature = void(std::error_code, TopologyState);
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct ChangeStateOp
|
||||||
|
{
|
||||||
|
using Id = std::size_t;
|
||||||
|
using Count = unsigned int;
|
||||||
|
|
||||||
|
template<typename Handler>
|
||||||
|
ChangeStateOp(Id id,
|
||||||
|
const TopologyTransition transition,
|
||||||
|
std::vector<DDSTask> tasks,
|
||||||
|
TopologyState& stateData,
|
||||||
|
Duration timeout,
|
||||||
|
std::mutex& mutex,
|
||||||
|
Executor const & ex,
|
||||||
|
Allocator const & alloc,
|
||||||
|
Handler&& handler)
|
||||||
|
: fId(id)
|
||||||
|
, fOp(ex, alloc, std::move(handler))
|
||||||
|
, fStateData(stateData)
|
||||||
|
, fTimer(ex)
|
||||||
|
, fCount(0)
|
||||||
|
, fTasks(std::move(tasks))
|
||||||
|
, fTargetState(expectedState.at(transition))
|
||||||
|
, fMtx(mutex)
|
||||||
|
{
|
||||||
|
if (timeout > std::chrono::milliseconds(0)) {
|
||||||
|
fTimer.expires_after(timeout);
|
||||||
|
fTimer.async_wait([&](std::error_code ec) {
|
||||||
|
if (!ec) {
|
||||||
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
|
fOp.Timeout(fStateData);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ChangeStateOp() = delete;
|
||||||
|
ChangeStateOp(const ChangeStateOp&) = delete;
|
||||||
|
ChangeStateOp& operator=(const ChangeStateOp&) = delete;
|
||||||
|
ChangeStateOp(ChangeStateOp&&) = default;
|
||||||
|
ChangeStateOp& operator=(ChangeStateOp&&) = default;
|
||||||
|
~ChangeStateOp() = default;
|
||||||
|
|
||||||
|
/// precondition: fMtx is locked.
|
||||||
|
auto ResetCount(const TopologyStateIndex& stateIndex, const TopologyState& stateData) -> void
|
||||||
|
{
|
||||||
|
fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
|
||||||
|
if (ContainsTask(stateData.at(s.second).taskId)) {
|
||||||
|
return stateData.at(s.second).state == fTargetState;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// precondition: fMtx is locked.
|
||||||
|
auto Update(const DDSTask::Id taskId, const DeviceState currentState) -> void
|
||||||
|
{
|
||||||
|
if (!fOp.IsCompleted() && ContainsTask(taskId)) {
|
||||||
|
if (currentState == fTargetState) {
|
||||||
|
++fCount;
|
||||||
|
}
|
||||||
|
TryCompletion();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// precondition: fMtx is locked.
|
||||||
|
auto TryCompletion() -> void
|
||||||
|
{
|
||||||
|
if (!fOp.IsCompleted() && fCount == fTasks.size()) {
|
||||||
|
Complete(std::error_code());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// precondition: fMtx is locked.
|
||||||
|
auto Complete(std::error_code ec) -> void
|
||||||
|
{
|
||||||
|
fTimer.cancel();
|
||||||
|
fOp.Complete(ec, fStateData);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// precondition: fMtx is locked.
|
||||||
|
auto ContainsTask(DDSTask::Id id) -> bool
|
||||||
|
{
|
||||||
|
auto it = std::find_if(fTasks.begin(), fTasks.end(), [id](const DDSTask& t) { return t.GetId() == id; });
|
||||||
|
return it != fTasks.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool IsCompleted() { return fOp.IsCompleted(); }
|
||||||
|
|
||||||
|
auto GetTargetState() const -> DeviceState { return fTargetState; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
Id const fId;
|
||||||
|
AsioAsyncOp<Executor, Allocator, ChangeStateCompletionSignature> fOp;
|
||||||
|
TopologyState& fStateData;
|
||||||
|
asio::steady_timer fTimer;
|
||||||
|
Count fCount;
|
||||||
|
std::vector<DDSTask> fTasks;
|
||||||
|
DeviceState fTargetState;
|
||||||
|
std::mutex& fMtx;
|
||||||
|
};
|
||||||
|
|
||||||
|
public:
|
||||||
/// @brief Initiate state transition on all FairMQ devices in this topology
|
/// @brief Initiate state transition on all FairMQ devices in this topology
|
||||||
/// @param transition FairMQ device state machine transition
|
/// @param transition FairMQ device state machine transition
|
||||||
|
/// @param path Select a subset of FairMQ devices in this topology, empty selects all
|
||||||
/// @param timeout Timeout in milliseconds, 0 means no timeout
|
/// @param timeout Timeout in milliseconds, 0 means no timeout
|
||||||
/// @param token Asio completion token
|
/// @param token Asio completion token
|
||||||
/// @tparam CompletionToken Asio completion token type
|
/// @tparam CompletionToken Asio completion token type
|
||||||
|
@ -301,8 +458,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
/// // async operation canceled
|
/// // async operation canceled
|
||||||
/// case fair::mq::ErrorCode::DeviceChangeStateFailed:
|
/// case fair::mq::ErrorCode::DeviceChangeStateFailed:
|
||||||
/// // failed to change state of a fairmq device
|
/// // failed to change state of a fairmq device
|
||||||
/// case fair::mq::ErrorCode::OperationInProgress:
|
|
||||||
/// // async operation already in progress
|
|
||||||
/// default:
|
/// default:
|
||||||
/// }
|
/// }
|
||||||
/// }
|
/// }
|
||||||
|
@ -327,8 +482,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
/// // async operation canceled
|
/// // async operation canceled
|
||||||
/// case fair::mq::ErrorCode::DeviceChangeStateFailed:
|
/// case fair::mq::ErrorCode::DeviceChangeStateFailed:
|
||||||
/// // failed to change state of a fairmq device
|
/// // failed to change state of a fairmq device
|
||||||
/// case fair::mq::ErrorCode::OperationInProgress:
|
|
||||||
/// // async operation already in progress
|
|
||||||
/// default:
|
/// default:
|
||||||
/// }
|
/// }
|
||||||
/// }
|
/// }
|
||||||
|
@ -352,8 +505,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
/// // async operation canceled
|
/// // async operation canceled
|
||||||
/// case fair::mq::ErrorCode::DeviceChangeStateFailed:
|
/// case fair::mq::ErrorCode::DeviceChangeStateFailed:
|
||||||
/// // failed to change state of a fairmq device
|
/// // failed to change state of a fairmq device
|
||||||
/// case fair::mq::ErrorCode::OperationInProgress:
|
|
||||||
/// // async operation already in progress
|
|
||||||
/// default:
|
/// default:
|
||||||
/// }
|
/// }
|
||||||
/// }
|
/// }
|
||||||
|
@ -361,76 +512,112 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
/// @endcode
|
/// @endcode
|
||||||
template<typename CompletionToken>
|
template<typename CompletionToken>
|
||||||
auto AsyncChangeState(const TopologyTransition transition,
|
auto AsyncChangeState(const TopologyTransition transition,
|
||||||
|
const std::string& path,
|
||||||
Duration timeout,
|
Duration timeout,
|
||||||
CompletionToken&& token)
|
CompletionToken&& token)
|
||||||
{
|
{
|
||||||
return asio::async_initiate<CompletionToken, ChangeStateCompletionSignature>([&](auto handler) {
|
return asio::async_initiate<CompletionToken, ChangeStateCompletionSignature>([&](auto handler) {
|
||||||
|
typename ChangeStateOp::Id const id(tools::UuidHash());
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
std::lock_guard<std::mutex> lk(fMtx);
|
||||||
|
|
||||||
if (fChangeStateOp.IsCompleted()) {
|
for (auto it = begin(fChangeStateOps); it != end(fChangeStateOps);) {
|
||||||
fChangeStateOp = ChangeStateOp(AsioBase<Executor, Allocator>::GetExecutor(),
|
if (it->second.IsCompleted()) {
|
||||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
it = fChangeStateOps.erase(it);
|
||||||
std::move(handler));
|
} else {
|
||||||
fChangeStateTarget = expectedState.at(transition);
|
++it;
|
||||||
ResetTransitionedCount(fChangeStateTarget);
|
|
||||||
cmd::Cmds cmds(cmd::make<cmd::ChangeState>(transition));
|
|
||||||
fDDSSession.SendCommand(cmds.Serialize());
|
|
||||||
if (timeout > std::chrono::milliseconds(0)) {
|
|
||||||
fChangeStateOpTimer.expires_after(timeout);
|
|
||||||
fChangeStateOpTimer.async_wait([&](std::error_code ec) {
|
|
||||||
if (!ec) {
|
|
||||||
std::lock_guard<std::mutex> lk2(fMtx);
|
|
||||||
fChangeStateOp.Timeout(fStateData);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// TODO refactor to hide boiler plate
|
|
||||||
auto ex2(asio::get_associated_executor(handler, AsioBase<Executor, Allocator>::GetExecutor()));
|
|
||||||
auto alloc2(asio::get_associated_allocator(handler, AsioBase<Executor, Allocator>::GetAllocator()));
|
|
||||||
auto state(GetCurrentStateUnsafe());
|
|
||||||
|
|
||||||
ex2.post([h = std::move(handler), s = std::move(state)]() mutable {
|
|
||||||
try {
|
|
||||||
h(MakeErrorCode(ErrorCode::OperationInProgress), s);
|
|
||||||
} catch (const std::exception& e) {
|
|
||||||
FAIR_LOG(error) << "Uncaught exception in completion handler: " << e.what();
|
|
||||||
} catch (...) {
|
|
||||||
FAIR_LOG(error) << "Unknown uncaught exception in completion handler.";
|
|
||||||
}
|
|
||||||
},
|
|
||||||
alloc2);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto p = fChangeStateOps.emplace(
|
||||||
|
std::piecewise_construct,
|
||||||
|
std::forward_as_tuple(id),
|
||||||
|
std::forward_as_tuple(id,
|
||||||
|
transition,
|
||||||
|
fDDSTopo.GetTasks(path),
|
||||||
|
fStateData,
|
||||||
|
timeout,
|
||||||
|
fMtx,
|
||||||
|
AsioBase<Executor, Allocator>::GetExecutor(),
|
||||||
|
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||||
|
std::move(handler)));
|
||||||
|
|
||||||
|
cmd::Cmds cmds(cmd::make<cmd::ChangeState>(transition));
|
||||||
|
fDDSSession.SendCommand(cmds.Serialize(), path);
|
||||||
|
|
||||||
|
p.first->second.ResetCount(fStateIndex, fStateData);
|
||||||
|
// TODO: make sure following operation properly queues the completion and not doing it directly out of initiation call.
|
||||||
|
p.first->second.TryCompletion();
|
||||||
|
|
||||||
},
|
},
|
||||||
token);
|
token);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief Initiate state transition on all FairMQ devices in this topology
|
||||||
|
/// @param transition FairMQ device state machine transition
|
||||||
|
/// @param token Asio completion token
|
||||||
|
/// @tparam CompletionToken Asio completion token type
|
||||||
|
/// @throws std::system_error
|
||||||
template<typename CompletionToken>
|
template<typename CompletionToken>
|
||||||
auto AsyncChangeState(const TopologyTransition transition, CompletionToken&& token)
|
auto AsyncChangeState(const TopologyTransition transition, CompletionToken&& token)
|
||||||
{
|
{
|
||||||
return AsyncChangeState(transition, Duration(0), std::move(token));
|
return AsyncChangeState(transition, "", Duration(0), std::move(token));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief Perform state transition on all FairMQ devices in this topology
|
/// @brief Initiate state transition on all FairMQ devices in this topology with a timeout
|
||||||
/// @param transition FairMQ device state machine transition
|
/// @param transition FairMQ device state machine transition
|
||||||
/// @param timeout Timeout in milliseconds, 0 means no timeout
|
/// @param timeout Timeout in milliseconds, 0 means no timeout
|
||||||
|
/// @param token Asio completion token
|
||||||
|
/// @tparam CompletionToken Asio completion token type
|
||||||
/// @throws std::system_error
|
/// @throws std::system_error
|
||||||
auto ChangeState(const TopologyTransition transition, Duration timeout = Duration(0))
|
template<typename CompletionToken>
|
||||||
|
auto AsyncChangeState(const TopologyTransition transition, Duration timeout, CompletionToken&& token)
|
||||||
|
{
|
||||||
|
return AsyncChangeState(transition, "", timeout, std::move(token));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// @brief Initiate state transition on all FairMQ devices in this topology with a timeout
|
||||||
|
/// @param transition FairMQ device state machine transition
|
||||||
|
/// @param path Select a subset of FairMQ devices in this topology, empty selects all
|
||||||
|
/// @param token Asio completion token
|
||||||
|
/// @tparam CompletionToken Asio completion token type
|
||||||
|
/// @throws std::system_error
|
||||||
|
template<typename CompletionToken>
|
||||||
|
auto AsyncChangeState(const TopologyTransition transition, const std::string& path, CompletionToken&& token)
|
||||||
|
{
|
||||||
|
return AsyncChangeState(transition, path, Duration(0), std::move(token));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// @brief Perform state transition on FairMQ devices in this topology for a specified topology path
|
||||||
|
/// @param transition FairMQ device state machine transition
|
||||||
|
/// @param path Select a subset of FairMQ devices in this topology, empty selects all
|
||||||
|
/// @param timeout Timeout in milliseconds, 0 means no timeout
|
||||||
|
/// @throws std::system_error
|
||||||
|
auto ChangeState(const TopologyTransition transition, const std::string& path = "", Duration timeout = Duration(0))
|
||||||
-> std::pair<std::error_code, TopologyState>
|
-> std::pair<std::error_code, TopologyState>
|
||||||
{
|
{
|
||||||
tools::SharedSemaphore blocker;
|
tools::SharedSemaphore blocker;
|
||||||
std::error_code ec;
|
std::error_code ec;
|
||||||
TopologyState state;
|
TopologyState state;
|
||||||
AsyncChangeState(
|
AsyncChangeState(transition, path, timeout, [&, blocker](std::error_code _ec, TopologyState _state) mutable {
|
||||||
transition, timeout, [&, blocker](std::error_code _ec, TopologyState _state) mutable {
|
ec = _ec;
|
||||||
ec = _ec;
|
state = _state;
|
||||||
state = _state;
|
blocker.Signal();
|
||||||
blocker.Signal();
|
});
|
||||||
});
|
|
||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
return {ec, state};
|
return {ec, state};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @brief Perform state transition on all FairMQ devices in this topology with a timeout
|
||||||
|
/// @param transition FairMQ device state machine transition
|
||||||
|
/// @param timeout Timeout in milliseconds, 0 means no timeout
|
||||||
|
/// @throws std::system_error
|
||||||
|
auto ChangeState(const TopologyTransition transition, Duration timeout)
|
||||||
|
-> std::pair<std::error_code, TopologyState>
|
||||||
|
{
|
||||||
|
return ChangeState(transition, "", timeout);
|
||||||
|
}
|
||||||
|
|
||||||
/// @brief Returns the current state of the topology
|
/// @brief Returns the current state of the topology
|
||||||
/// @return map of id : DeviceStatus (initialized, state)
|
/// @return map of id : DeviceStatus (initialized, state)
|
||||||
auto GetCurrentState() const -> TopologyState
|
auto GetCurrentState() const -> TopologyState
|
||||||
|
@ -492,13 +679,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
{
|
{
|
||||||
fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
|
fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
|
||||||
if (ContainsTask(stateData.at(s.second).taskId)) {
|
if (ContainsTask(stateData.at(s.second).taskId)) {
|
||||||
if (stateData.at(s.second).state == fTargetCurrentState &&
|
return stateData.at(s.second).state == fTargetCurrentState
|
||||||
(stateData.at(s.second).lastState == fTargetLastState ||
|
&&
|
||||||
fTargetLastState == DeviceState::Ok)) {
|
(stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Ok);
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -547,17 +730,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto HandleCmd(cmd::StateChange const& cmd) -> void
|
|
||||||
{
|
|
||||||
DDSTask::Id taskId(cmd.GetTaskId());
|
|
||||||
UpdateStateEntry(taskId, cmd.GetLastState(), cmd.GetCurrentState());
|
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
|
||||||
for (auto& op : fWaitForStateOps) {
|
|
||||||
op.second.Update(taskId, cmd.GetLastState(), cmd.GetCurrentState());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/// @brief Initiate waiting for selected FairMQ devices to reach given last & current state in this topology
|
/// @brief Initiate waiting for selected FairMQ devices to reach given last & current state in this topology
|
||||||
/// @param targetLastState the target last device state to wait for
|
/// @param targetLastState the target last device state to wait for
|
||||||
|
@ -600,6 +772,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
AsioBase<Executor, Allocator>::GetAllocator(),
|
AsioBase<Executor, Allocator>::GetAllocator(),
|
||||||
std::move(handler)));
|
std::move(handler)));
|
||||||
p.first->second.ResetCount(fStateIndex, fStateData);
|
p.first->second.ResetCount(fStateIndex, fStateData);
|
||||||
|
// TODO: make sure following operation properly queues the completion and not doing it directly out of initiation call.
|
||||||
p.first->second.TryCompletion();
|
p.first->second.TryCompletion();
|
||||||
},
|
},
|
||||||
token);
|
token);
|
||||||
|
@ -736,20 +909,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto HandleCmd(cmd::Properties const& cmd) -> void
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lk(fMtx);
|
|
||||||
try {
|
|
||||||
auto& op(fGetPropertiesOps.at(cmd.GetRequestId()));
|
|
||||||
lk.unlock();
|
|
||||||
op.Update(cmd.GetDeviceId(), cmd.GetResult(), cmd.GetProps());
|
|
||||||
} catch (std::out_of_range& e) {
|
|
||||||
FAIR_LOG(debug) << "GetProperties operation (request id: " << cmd.GetRequestId()
|
|
||||||
<< ") not found (probably completed or timed out), "
|
|
||||||
<< "discarding reply of device " << cmd.GetDeviceId();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/// @brief Initiate property query on selected FairMQ devices in this topology
|
/// @brief Initiate property query on selected FairMQ devices in this topology
|
||||||
/// @param query Key(s) to be queried (regex)
|
/// @param query Key(s) to be queried (regex)
|
||||||
|
@ -903,20 +1062,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
auto HandleCmd(cmd::PropertiesSet const& cmd) -> void
|
|
||||||
{
|
|
||||||
std::unique_lock<std::mutex> lk(fMtx);
|
|
||||||
try {
|
|
||||||
auto& op(fSetPropertiesOps.at(cmd.GetRequestId()));
|
|
||||||
lk.unlock();
|
|
||||||
op.Update(cmd.GetDeviceId(), cmd.GetResult());
|
|
||||||
} catch (std::out_of_range& e) {
|
|
||||||
FAIR_LOG(debug) << "SetProperties operation (request id: " << cmd.GetRequestId()
|
|
||||||
<< ") not found (probably completed or timed out), "
|
|
||||||
<< "discarding reply of device " << cmd.GetDeviceId();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/// @brief Initiate property update on selected FairMQ devices in this topology
|
/// @brief Initiate property update on selected FairMQ devices in this topology
|
||||||
/// @param props Properties to set
|
/// @param props Properties to set
|
||||||
|
@ -1002,15 +1147,10 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
TopologyStateIndex fStateIndex;
|
TopologyStateIndex fStateIndex;
|
||||||
mutable std::mutex fMtx;
|
mutable std::mutex fMtx;
|
||||||
|
|
||||||
using ChangeStateOp = AsioAsyncOp<Executor, Allocator, ChangeStateCompletionSignature>;
|
std::unordered_map<typename ChangeStateOp::Id, ChangeStateOp> fChangeStateOps;
|
||||||
ChangeStateOp fChangeStateOp;
|
std::unordered_map<typename WaitForStateOp::Id, WaitForStateOp> fWaitForStateOps;
|
||||||
asio::steady_timer fChangeStateOpTimer;
|
|
||||||
DeviceState fChangeStateTarget;
|
|
||||||
TransitionedCount fTransitionedCount;
|
|
||||||
|
|
||||||
std::unordered_map<typename SetPropertiesOp::Id, SetPropertiesOp> fSetPropertiesOps;
|
std::unordered_map<typename SetPropertiesOp::Id, SetPropertiesOp> fSetPropertiesOps;
|
||||||
std::unordered_map<typename GetPropertiesOp::Id, GetPropertiesOp> fGetPropertiesOps;
|
std::unordered_map<typename GetPropertiesOp::Id, GetPropertiesOp> fGetPropertiesOps;
|
||||||
std::unordered_map<typename WaitForStateOp::Id, WaitForStateOp> fWaitForStateOps;
|
|
||||||
|
|
||||||
auto makeTopologyState() -> void
|
auto makeTopologyState() -> void
|
||||||
{
|
{
|
||||||
|
@ -1025,41 +1165,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto UpdateStateEntry(const DDSTask::Id taskId, const DeviceState lastState, const DeviceState currentState) -> void
|
|
||||||
{
|
|
||||||
try {
|
|
||||||
std::lock_guard<std::mutex> lk(fMtx);
|
|
||||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
|
||||||
task.initialized = true;
|
|
||||||
task.lastState = lastState;
|
|
||||||
task.state = currentState;
|
|
||||||
if (task.state == fChangeStateTarget) {
|
|
||||||
++fTransitionedCount;
|
|
||||||
}
|
|
||||||
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
|
||||||
TryChangeStateCompletion();
|
|
||||||
} catch (const std::exception& e) {
|
|
||||||
FAIR_LOG(error) << "Exception in UpdateStateEntry: " << e.what();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// precodition: fMtx is locked.
|
|
||||||
auto TryChangeStateCompletion() -> void
|
|
||||||
{
|
|
||||||
if (!fChangeStateOp.IsCompleted() && fTransitionedCount == fStateData.size()) {
|
|
||||||
fChangeStateOpTimer.cancel();
|
|
||||||
fChangeStateOp.Complete(fStateData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// precodition: fMtx is locked.
|
|
||||||
auto ResetTransitionedCount(DeviceState targetState) -> void
|
|
||||||
{
|
|
||||||
fTransitionedCount = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
|
|
||||||
return fStateData.at(s.second).state == targetState;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// precodition: fMtx is locked.
|
/// precodition: fMtx is locked.
|
||||||
auto GetCurrentStateUnsafe() const -> TopologyState
|
auto GetCurrentStateUnsafe() const -> TopologyState
|
||||||
{
|
{
|
||||||
|
|
|
@ -144,18 +144,21 @@ TEST_F(Topology, AsyncChangeStateConcurrent)
|
||||||
using namespace fair::mq;
|
using namespace fair::mq;
|
||||||
|
|
||||||
sdk::Topology topo(mDDSTopo, mDDSSession);
|
sdk::Topology topo(mDDSTopo, mDDSSession);
|
||||||
tools::SharedSemaphore blocker;
|
topo.AsyncChangeState(sdk::TopologyTransition::InitDevice, "main/Sampler.*",
|
||||||
topo.AsyncChangeState(sdk::TopologyTransition::InitDevice,
|
[](std::error_code ec, sdk::TopologyState) mutable {
|
||||||
[blocker](std::error_code ec, sdk::TopologyState) mutable {
|
LOG(info) << "ChangeState for Sampler: " << ec;
|
||||||
LOG(info) << "result for valid ChangeState: " << ec;
|
EXPECT_EQ(ec, std::error_code());
|
||||||
blocker.Signal();
|
|
||||||
});
|
});
|
||||||
topo.AsyncChangeState(sdk::TopologyTransition::Stop,
|
topo.AsyncChangeState(sdk::TopologyTransition::InitDevice, "main/SinkGroup/.*",
|
||||||
[](std::error_code ec, sdk::TopologyState) {
|
[](std::error_code ec, sdk::TopologyState) mutable {
|
||||||
LOG(ERROR) << "Expected error: " << ec;
|
LOG(info) << "ChangeState for Sinks: " << ec;
|
||||||
EXPECT_EQ(ec, MakeErrorCode(ErrorCode::OperationInProgress));
|
EXPECT_EQ(ec, std::error_code());
|
||||||
});
|
});
|
||||||
blocker.Wait();
|
|
||||||
|
topo.WaitForState(sdk::DeviceState::InitializingDevice);
|
||||||
|
auto const currentState = topo.GetCurrentState();
|
||||||
|
EXPECT_NO_THROW(sdk::AggregateState(currentState));
|
||||||
|
EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(Topology, AsyncChangeStateTimeout)
|
TEST_F(Topology, AsyncChangeStateTimeout)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user