mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
SDK: wait for devices to be in running
This commit is contained in:
parent
5d535163f1
commit
ac8cd19915
|
@ -320,23 +320,23 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
|
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
|
||||||
} else if (cmd == "INIT DEVICE") {
|
} else if (cmd == "INIT DEVICE") {
|
||||||
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
|
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
|
||||||
fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId));
|
fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId));
|
||||||
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
|
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
|
||||||
ChangeDeviceState(DeviceStateTransition::CompleteInit);
|
ChangeDeviceState(DeviceStateTransition::CompleteInit);
|
||||||
} else {
|
} else {
|
||||||
fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId));
|
fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId));
|
||||||
}
|
}
|
||||||
} else if (fTransitions.find(cmd) != fTransitions.end()) {
|
} else if (fTransitions.find(cmd) != fTransitions.end()) {
|
||||||
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
|
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
|
||||||
fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId));
|
fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId));
|
||||||
} else {
|
} else {
|
||||||
fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId));
|
fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId));
|
||||||
}
|
}
|
||||||
} else if (cmd == "END") {
|
} else if (cmd == "END") {
|
||||||
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
|
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
|
||||||
fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId));
|
fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId));
|
||||||
} else {
|
} else {
|
||||||
fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId));
|
fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId));
|
||||||
}
|
}
|
||||||
if (ToStr(GetCurrentDeviceState()) == "EXITING") {
|
if (ToStr(GetCurrentDeviceState()) == "EXITING") {
|
||||||
unique_lock<mutex> lock(fStopMutex);
|
unique_lock<mutex> lock(fStopMutex);
|
||||||
|
|
|
@ -170,7 +170,7 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void
|
||||||
fImpl->fSession.sendRequest<dds::tools_api::SSubmitRequest>(submitRequest);
|
fImpl->fSession.sendRequest<dds::tools_api::SSubmitRequest>(submitRequest);
|
||||||
blocker.Wait();
|
blocker.Wait();
|
||||||
|
|
||||||
// Not perfect, but best we can do
|
// perfect
|
||||||
WaitForIdleAgents(agents);
|
WaitForIdleAgents(agents);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -80,6 +80,7 @@ class DDSSession
|
||||||
};
|
};
|
||||||
auto RequestCommanderInfo() -> CommanderInfo;
|
auto RequestCommanderInfo() -> CommanderInfo;
|
||||||
auto WaitForIdleAgents(Quantity) -> void;
|
auto WaitForIdleAgents(Quantity) -> void;
|
||||||
|
auto WaitForOnlyIdleAgents() -> void;
|
||||||
auto WaitForExecutingAgents(Quantity) -> void;
|
auto WaitForExecutingAgents(Quantity) -> void;
|
||||||
auto ActivateTopology(const Path& topoFile) -> void;
|
auto ActivateTopology(const Path& topoFile) -> void;
|
||||||
auto ActivateTopology(DDSTopology) -> void;
|
auto ActivateTopology(DDSTopology) -> void;
|
||||||
|
|
|
@ -94,6 +94,8 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||||
if (parts[2] != "OK") {
|
if (parts[2] != "OK") {
|
||||||
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
|
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
|
||||||
}
|
}
|
||||||
|
} else if (parts[1] == "could not queue") {
|
||||||
|
LOG(warn) << "Could not queue " << parts[2] << " transition on " << senderId;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
fDDSSession.StartDDSService();
|
fDDSSession.StartDDSService();
|
||||||
|
|
|
@ -28,6 +28,14 @@ TEST_F(Topology, ChangeStateAsync)
|
||||||
|
|
||||||
Topology topo(mDDSTopo, mDDSSession);
|
Topology topo(mDDSTopo, mDDSSession);
|
||||||
fair::mq::tools::Semaphore blocker;
|
fair::mq::tools::Semaphore blocker;
|
||||||
|
topo.ChangeState(TopologyTransition::Run, [&blocker, &topo](Topology::ChangeStateResult result) {
|
||||||
|
LOG(info) << result;
|
||||||
|
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
|
||||||
|
EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state));
|
||||||
|
EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Running), true);
|
||||||
|
blocker.Signal();
|
||||||
|
});
|
||||||
|
blocker.Wait();
|
||||||
topo.ChangeState(TopologyTransition::Stop, [&blocker, &topo](Topology::ChangeStateResult result) {
|
topo.ChangeState(TopologyTransition::Stop, [&blocker, &topo](Topology::ChangeStateResult result) {
|
||||||
LOG(info) << result;
|
LOG(info) << result;
|
||||||
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
|
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
|
||||||
|
@ -44,6 +52,7 @@ TEST_F(Topology, ChangeStateSync)
|
||||||
using fair::mq::sdk::TopologyTransition;
|
using fair::mq::sdk::TopologyTransition;
|
||||||
|
|
||||||
Topology topo(mDDSTopo, mDDSSession);
|
Topology topo(mDDSTopo, mDDSSession);
|
||||||
|
EXPECT_EQ(topo.ChangeState(TopologyTransition::Run).rc, fair::mq::AsyncOpResultCode::Ok);
|
||||||
auto result(topo.ChangeState(TopologyTransition::Stop));
|
auto result(topo.ChangeState(TopologyTransition::Stop));
|
||||||
|
|
||||||
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
|
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
|
||||||
|
@ -58,7 +67,7 @@ TEST_F(Topology, ChangeStateConcurrent)
|
||||||
|
|
||||||
Topology topo(mDDSTopo, mDDSSession);
|
Topology topo(mDDSTopo, mDDSSession);
|
||||||
fair::mq::tools::Semaphore blocker;
|
fair::mq::tools::Semaphore blocker;
|
||||||
topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) {
|
topo.ChangeState(TopologyTransition::Run, [&blocker](Topology::ChangeStateResult result) {
|
||||||
LOG(info) << "result for valid ChangeState: " << result;
|
LOG(info) << "result for valid ChangeState: " << result;
|
||||||
blocker.Signal();
|
blocker.Signal();
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue
Block a user