From ac8cd19915370e37c1ae5b7bb578f9edfa94adfd Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 24 Jul 2019 16:41:58 +0200 Subject: [PATCH] SDK: wait for devices to be in running --- fairmq/plugins/DDS/DDS.cxx | 12 ++++++------ fairmq/sdk/DDSSession.cxx | 2 +- fairmq/sdk/DDSSession.h | 1 + fairmq/sdk/Topology.cxx | 2 ++ test/sdk/_topology.cxx | 11 ++++++++++- 5 files changed, 20 insertions(+), 8 deletions(-) diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index c4f8cfa5..12a40367 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -320,23 +320,23 @@ auto DDS::SubscribeForCustomCommands() -> void fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); } else if (cmd == "INIT DEVICE") { if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { - fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId)); + fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId)); while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {} ChangeDeviceState(DeviceStateTransition::CompleteInit); } else { - fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId)); + fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId)); } } else if (fTransitions.find(cmd) != fTransitions.end()) { if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { - fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId)); + fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId)); } else { - fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId)); + fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId)); } } else if (cmd == "END") { if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { - fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId)); + fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId)); } else { - fDDSCustomCmd.send(id + ": could not queue " + cmd + " transition", to_string(senderId)); + fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId)); } if (ToStr(GetCurrentDeviceState()) == "EXITING") { unique_lock lock(fStopMutex); diff --git a/fairmq/sdk/DDSSession.cxx b/fairmq/sdk/DDSSession.cxx index 5b0df0f7..c3adf113 100644 --- a/fairmq/sdk/DDSSession.cxx +++ b/fairmq/sdk/DDSSession.cxx @@ -170,7 +170,7 @@ auto DDSSession::SubmitAgents(Quantity agents) -> void fImpl->fSession.sendRequest(submitRequest); blocker.Wait(); - // Not perfect, but best we can do + // perfect WaitForIdleAgents(agents); } diff --git a/fairmq/sdk/DDSSession.h b/fairmq/sdk/DDSSession.h index a77ed195..93156401 100644 --- a/fairmq/sdk/DDSSession.h +++ b/fairmq/sdk/DDSSession.h @@ -80,6 +80,7 @@ class DDSSession }; auto RequestCommanderInfo() -> CommanderInfo; auto WaitForIdleAgents(Quantity) -> void; + auto WaitForOnlyIdleAgents() -> void; auto WaitForExecutingAgents(Quantity) -> void; auto ActivateTopology(const Path& topoFile) -> void; auto ActivateTopology(DDSTopology) -> void; diff --git a/fairmq/sdk/Topology.cxx b/fairmq/sdk/Topology.cxx index 58a70400..3516d1ce 100644 --- a/fairmq/sdk/Topology.cxx +++ b/fairmq/sdk/Topology.cxx @@ -94,6 +94,8 @@ Topology::Topology(DDSTopology topo, DDSSession session) if (parts[2] != "OK") { LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2]; } + } else if (parts[1] == "could not queue") { + LOG(warn) << "Could not queue " << parts[2] << " transition on " << senderId; } }); fDDSSession.StartDDSService(); diff --git a/test/sdk/_topology.cxx b/test/sdk/_topology.cxx index a9542d08..b322c7e2 100644 --- a/test/sdk/_topology.cxx +++ b/test/sdk/_topology.cxx @@ -28,6 +28,14 @@ TEST_F(Topology, ChangeStateAsync) Topology topo(mDDSTopo, mDDSSession); fair::mq::tools::Semaphore blocker; + topo.ChangeState(TopologyTransition::Run, [&blocker, &topo](Topology::ChangeStateResult result) { + LOG(info) << result; + EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); + EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state)); + EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Running), true); + blocker.Signal(); + }); + blocker.Wait(); topo.ChangeState(TopologyTransition::Stop, [&blocker, &topo](Topology::ChangeStateResult result) { LOG(info) << result; EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); @@ -44,6 +52,7 @@ TEST_F(Topology, ChangeStateSync) using fair::mq::sdk::TopologyTransition; Topology topo(mDDSTopo, mDDSSession); + EXPECT_EQ(topo.ChangeState(TopologyTransition::Run).rc, fair::mq::AsyncOpResultCode::Ok); auto result(topo.ChangeState(TopologyTransition::Stop)); EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); @@ -58,7 +67,7 @@ TEST_F(Topology, ChangeStateConcurrent) Topology topo(mDDSTopo, mDDSSession); fair::mq::tools::Semaphore blocker; - topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) { + topo.ChangeState(TopologyTransition::Run, [&blocker](Topology::ChangeStateResult result) { LOG(info) << "result for valid ChangeState: " << result; blocker.Signal(); });