diff --git a/fairmq/sdk/Topology.h b/fairmq/sdk/Topology.h index fbaa451d..a9f70aaf 100644 --- a/fairmq/sdk/Topology.h +++ b/fairmq/sdk/Topology.h @@ -183,6 +183,38 @@ class BasicTopology : public AsioBase throw RuntimeError("Given topology ", givenTopo, " is not activated (active: ", activeTopo, ")"); } + SubscribeToCommands(); + + fDDSSession.StartDDSService(); + SubscribeToStateChanges(); + } + + /// not copyable + BasicTopology(const BasicTopology&) = delete; + BasicTopology& operator=(const BasicTopology&) = delete; + + /// movable + BasicTopology(BasicTopology&&) = default; + BasicTopology& operator=(BasicTopology&&) = default; + + void SubscribeToStateChanges() + { + using namespace fair::mq::sdk::cmd; + // FAIR_LOG(debug) << "Subscribing to state change"; + Cmds cmds(make()); + fDDSSession.SendCommand(cmds.Serialize()); + } + + void UnsubscribeFromStateChanges() + { + using namespace fair::mq::sdk::cmd; + fDDSSession.SendCommand(Cmds(make()).Serialize()); + // give dds a chance to complete request, TODO: track each individual task and its subscription status + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + void SubscribeToCommands() + { using namespace fair::mq::sdk::cmd; fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) { Cmds inCmds; @@ -239,28 +271,6 @@ class BasicTopology : public AsioBase } } }); - - fDDSSession.StartDDSService(); - // FAIR_LOG(debug) << "Subscribing to state change"; - Cmds cmds(make()); - fDDSSession.SendCommand(cmds.Serialize()); - } - - /// not copyable - BasicTopology(const BasicTopology&) = delete; - BasicTopology& operator=(const BasicTopology&) = delete; - - /// movable - BasicTopology(BasicTopology&&) = default; - BasicTopology& operator=(BasicTopology&&) = default; - - void UnsubscribeFromStateChanges() - { - using namespace fair::mq::sdk::cmd; - - fDDSSession.SendCommand(Cmds(make()).Serialize()); - // give dds a chance to complete request, TODO: track each individual task and its subscription status - std::this_thread::sleep_for(std::chrono::milliseconds(100)); } ~BasicTopology()