mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-14 17:16:47 +00:00
SDK: refactor subscription to allow reuse
This commit is contained in:
parent
926ee743ed
commit
c3127f22e5
|
@ -183,6 +183,38 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||
throw RuntimeError("Given topology ", givenTopo, " is not activated (active: ", activeTopo, ")");
|
||||
}
|
||||
|
||||
SubscribeToCommands();
|
||||
|
||||
fDDSSession.StartDDSService();
|
||||
SubscribeToStateChanges();
|
||||
}
|
||||
|
||||
/// not copyable
|
||||
BasicTopology(const BasicTopology&) = delete;
|
||||
BasicTopology& operator=(const BasicTopology&) = delete;
|
||||
|
||||
/// movable
|
||||
BasicTopology(BasicTopology&&) = default;
|
||||
BasicTopology& operator=(BasicTopology&&) = default;
|
||||
|
||||
void SubscribeToStateChanges()
|
||||
{
|
||||
using namespace fair::mq::sdk::cmd;
|
||||
// FAIR_LOG(debug) << "Subscribing to state change";
|
||||
Cmds cmds(make<SubscribeToStateChange>());
|
||||
fDDSSession.SendCommand(cmds.Serialize());
|
||||
}
|
||||
|
||||
void UnsubscribeFromStateChanges()
|
||||
{
|
||||
using namespace fair::mq::sdk::cmd;
|
||||
fDDSSession.SendCommand(Cmds(make<UnsubscribeFromStateChange>()).Serialize());
|
||||
// give dds a chance to complete request, TODO: track each individual task and its subscription status
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
void SubscribeToCommands()
|
||||
{
|
||||
using namespace fair::mq::sdk::cmd;
|
||||
fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
|
||||
Cmds inCmds;
|
||||
|
@ -239,28 +271,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||
}
|
||||
}
|
||||
});
|
||||
|
||||
fDDSSession.StartDDSService();
|
||||
// FAIR_LOG(debug) << "Subscribing to state change";
|
||||
Cmds cmds(make<SubscribeToStateChange>());
|
||||
fDDSSession.SendCommand(cmds.Serialize());
|
||||
}
|
||||
|
||||
/// not copyable
|
||||
BasicTopology(const BasicTopology&) = delete;
|
||||
BasicTopology& operator=(const BasicTopology&) = delete;
|
||||
|
||||
/// movable
|
||||
BasicTopology(BasicTopology&&) = default;
|
||||
BasicTopology& operator=(BasicTopology&&) = default;
|
||||
|
||||
void UnsubscribeFromStateChanges()
|
||||
{
|
||||
using namespace fair::mq::sdk::cmd;
|
||||
|
||||
fDDSSession.SendCommand(Cmds(make<UnsubscribeFromStateChange>()).Serialize());
|
||||
// give dds a chance to complete request, TODO: track each individual task and its subscription status
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
~BasicTopology()
|
||||
|
|
Loading…
Reference in New Issue
Block a user