mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
SDK: Track channel to task id association
This commit is contained in:
parent
008be36125
commit
fd282fa950
|
@ -20,7 +20,9 @@
|
||||||
|
|
||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
|
#include <mutex>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
#include <unordered_map>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
namespace fair {
|
namespace fair {
|
||||||
|
@ -134,6 +136,8 @@ struct DDSSession::Impl
|
||||||
dds::intercom_api::CCustomCmd fDDSCustomCmd;
|
dds::intercom_api::CCustomCmd fDDSCustomCmd;
|
||||||
Id fId;
|
Id fId;
|
||||||
bool fStopOnDestruction;
|
bool fStopOnDestruction;
|
||||||
|
mutable std::mutex fMtx;
|
||||||
|
std::unordered_map<DDSChannel::Id, DDSTask::Id> fTaskIdByChannelIdMap;
|
||||||
};
|
};
|
||||||
|
|
||||||
DDSSession::DDSSession(DDSEnvironment env)
|
DDSSession::DDSSession(DDSEnvironment env)
|
||||||
|
@ -316,6 +320,18 @@ void DDSSession::UnsubscribeFromCommands()
|
||||||
|
|
||||||
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
|
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
|
||||||
|
|
||||||
|
auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
||||||
|
fImpl->fTaskIdByChannelIdMap[channelId] = taskId;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto DDSSession::GetTaskId(DDSChannel::Id channelId) const -> DDSTask::Id
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
||||||
|
return fImpl->fTaskIdByChannelIdMap.at(channelId);
|
||||||
|
}
|
||||||
|
|
||||||
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
|
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
|
||||||
{
|
{
|
||||||
return os << "$DDS_SESSION_ID: " << session.GetId();
|
return os << "$DDS_SESSION_ID: " << session.GetId();
|
||||||
|
|
|
@ -43,6 +43,18 @@ auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&;
|
||||||
class DDSTopology;
|
class DDSTopology;
|
||||||
class DDSAgent;
|
class DDSAgent;
|
||||||
|
|
||||||
|
class DDSTask
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using Id = std::uint64_t;
|
||||||
|
};
|
||||||
|
|
||||||
|
class DDSChannel
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
using Id = std::uint64_t;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h>
|
* @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h>
|
||||||
* @brief Represents a DDS session
|
* @brief Represents a DDS session
|
||||||
|
@ -95,6 +107,8 @@ class DDSSession
|
||||||
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
|
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
|
||||||
void UnsubscribeFromCommands();
|
void UnsubscribeFromCommands();
|
||||||
void SendCommand(const std::string&);
|
void SendCommand(const std::string&);
|
||||||
|
auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void;
|
||||||
|
auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id;
|
||||||
|
|
||||||
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;
|
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||||
// LOG(debug) << "Adding device " << d;
|
// LOG(debug) << "Adding device " << d;
|
||||||
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
|
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
|
||||||
}
|
}
|
||||||
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) {
|
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
|
||||||
// LOG(debug) << "Received from " << senderId << ": " << msg;
|
// LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||||
std::vector<std::string> parts;
|
std::vector<std::string> parts;
|
||||||
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
|
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
|
||||||
|
@ -85,7 +85,9 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (parts[0] == "state-change") {
|
if (parts[0] == "state-change") {
|
||||||
AddNewStateEntry(std::stoull(parts[2]), parts[3]);
|
DDSTask::Id taskId(std::stoull(parts[2]));
|
||||||
|
fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId);
|
||||||
|
AddNewStateEntry(taskId, parts[3]);
|
||||||
} else if (parts[0] == "state-changes-subscription") {
|
} else if (parts[0] == "state-changes-subscription") {
|
||||||
LOG(debug) << "Received from " << senderId << ": " << msg;
|
LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||||
if (parts[2] != "OK") {
|
if (parts[2] != "OK") {
|
||||||
|
|
|
@ -56,7 +56,7 @@ struct DeviceStatus
|
||||||
DeviceState state;
|
DeviceState state;
|
||||||
};
|
};
|
||||||
|
|
||||||
using TopologyState = std::unordered_map<uint64_t, DeviceStatus>;
|
using TopologyState = std::unordered_map<DDSTask::Id, DeviceStatus>;
|
||||||
using TopologyTransition = fair::mq::Transition;
|
using TopologyTransition = fair::mq::Transition;
|
||||||
|
|
||||||
struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };
|
struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||||
|
|
Loading…
Reference in New Issue
Block a user