mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
First working version of SDK ChangeState
This commit is contained in:
parent
cf26dd6aa7
commit
8789664cfd
|
@ -36,6 +36,7 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta
|
||||||
, fService()
|
, fService()
|
||||||
, fDDSCustomCmd(fService)
|
, fDDSCustomCmd(fService)
|
||||||
, fDDSKeyValue(fService)
|
, fDDSKeyValue(fService)
|
||||||
|
, fDDSTaskId(dds::env_prop<dds::task_id>())
|
||||||
, fBindingChans()
|
, fBindingChans()
|
||||||
, fConnectingChans()
|
, fConnectingChans()
|
||||||
, fStopMutex()
|
, fStopMutex()
|
||||||
|
@ -68,9 +69,13 @@ auto DDS::HandleControl() -> void
|
||||||
LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME");
|
LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME");
|
||||||
LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX");
|
LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX");
|
||||||
LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX");
|
LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX");
|
||||||
|
LOG(debug) << "$DDS_TASK_ID: " << getenv("DDS_TASK_ID");
|
||||||
|
LOG(debug) << "$DDS_LOCATION: " << getenv("DDS_LOCATION");
|
||||||
string dds_session_id(getenv("DDS_SESSION_ID"));
|
string dds_session_id(getenv("DDS_SESSION_ID"));
|
||||||
LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id;
|
LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id;
|
||||||
|
|
||||||
|
LOG(info) << "DDS Task Id (from API): " << fDDSTaskId;
|
||||||
|
|
||||||
// subscribe for state changes from DDS (subscriptions start firing after fService.start() is called)
|
// subscribe for state changes from DDS (subscriptions start firing after fService.start() is called)
|
||||||
SubscribeForCustomCommands();
|
SubscribeForCustomCommands();
|
||||||
|
|
||||||
|
@ -95,7 +100,7 @@ auto DDS::HandleControl() -> void
|
||||||
fCurrentState = newState;
|
fCurrentState = newState;
|
||||||
for (auto subscriberId : fStateChangeSubscribers) {
|
for (auto subscriberId : fStateChangeSubscribers) {
|
||||||
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
|
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
|
||||||
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId));
|
fDDSCustomCmd.send("state-change: " + id + "," + ToString(fDDSTaskId) + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -309,7 +314,7 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
string id = GetProperty<string>("id");
|
string id = GetProperty<string>("id");
|
||||||
|
|
||||||
fDDSCustomCmd.subscribe([id, this](const string& cmd, const string& cond, uint64_t senderId) {
|
fDDSCustomCmd.subscribe([id, this](const string& cmd, const string& cond, uint64_t senderId) {
|
||||||
LOG(info) << "Received command: " << cmd;
|
LOG(info) << "Received command: '" << cmd << "' from " << senderId;
|
||||||
|
|
||||||
if (cmd == "check-state") {
|
if (cmd == "check-state") {
|
||||||
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
|
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
|
||||||
|
@ -366,7 +371,8 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||||
{
|
{
|
||||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||||
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId;
|
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId;
|
||||||
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId));
|
// fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId));
|
||||||
|
fDDSCustomCmd.send("state-change: " + id + "," + ToString(fDDSTaskId) + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId));
|
||||||
}
|
}
|
||||||
} else if (cmd == "unsubscribe-from-state-changes") {
|
} else if (cmd == "unsubscribe-from-state-changes") {
|
||||||
{
|
{
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
#include <fairmq/StateQueue.h>
|
#include <fairmq/StateQueue.h>
|
||||||
|
|
||||||
#include <DDS/dds_intercom.h>
|
#include <DDS/dds_intercom.h>
|
||||||
|
#include <DDS/dds_env_prop.h>
|
||||||
|
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
@ -79,6 +80,7 @@ class DDS : public Plugin
|
||||||
dds::intercom_api::CIntercomService fService;
|
dds::intercom_api::CIntercomService fService;
|
||||||
dds::intercom_api::CCustomCmd fDDSCustomCmd;
|
dds::intercom_api::CCustomCmd fDDSCustomCmd;
|
||||||
dds::intercom_api::CKeyValue fDDSKeyValue;
|
dds::intercom_api::CKeyValue fDDSKeyValue;
|
||||||
|
uint64_t fDDSTaskId;
|
||||||
|
|
||||||
std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
|
std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
|
||||||
std::unordered_map<std::string, DDSConfig> fConnectingChans;
|
std::unordered_map<std::string, DDSConfig> fConnectingChans;
|
||||||
|
|
|
@ -229,9 +229,9 @@ void DDSSession::SubscribeToCommands(std::function<void(const std::string& msg,
|
||||||
{
|
{
|
||||||
fImpl->fSession.unsubscribe(); // TODO REMOVE THIS HACK!!!!
|
fImpl->fSession.unsubscribe(); // TODO REMOVE THIS HACK!!!!
|
||||||
fImpl->fDDSCustomCmd.subscribe(cb);
|
fImpl->fDDSCustomCmd.subscribe(cb);
|
||||||
fImpl->fDDSCustomCmd.subscribeOnReply([](const std::string& reply) {
|
// fImpl->fDDSCustomCmd.subscribeOnReply([](const std::string& reply) {
|
||||||
LOG(debug) << reply;
|
// LOG(debug) << reply;
|
||||||
});
|
// });
|
||||||
}
|
}
|
||||||
|
|
||||||
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
|
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
|
||||||
|
|
|
@ -75,7 +75,7 @@ std::vector<uint64_t> DDSTopology::GetDeviceList()
|
||||||
});
|
});
|
||||||
|
|
||||||
for (auto& it = taskIt.first; it != taskIt.second; ++it) {
|
for (auto& it = taskIt.first; it != taskIt.second; ++it) {
|
||||||
LOG(debug) << "Found task " << it->first << " : " << it->second.m_task->getPath();
|
LOG(debug) << "Found task " << it->first << " : " << "Path: " << it->second.m_task->getPath() << "Name: " << it->second.m_task->getName();
|
||||||
taskIDs.push_back(it->first);
|
taskIDs.push_back(it->first);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,12 +69,18 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||||
fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
|
fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
|
||||||
}
|
}
|
||||||
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& condition, uint64_t senderId) {
|
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& condition, uint64_t senderId) {
|
||||||
LOG(info) << "Received from " << senderId << ": " << msg;
|
LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||||
std::vector<std::string> parts;
|
std::vector<std::string> parts;
|
||||||
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
|
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
|
||||||
|
|
||||||
|
for (unsigned int i = 0; i < parts.size(); ++i) {
|
||||||
|
boost::trim(parts.at(i));
|
||||||
|
LOG(info) << "parts[" << i << "]: " << parts.at(i);
|
||||||
|
}
|
||||||
|
|
||||||
if (parts[0] == "state-change") {
|
if (parts[0] == "state-change") {
|
||||||
boost::trim(parts[2]);
|
boost::trim(parts[3]);
|
||||||
AddNewStateEntry(senderId, parts[2]);
|
AddNewStateEntry(std::stoull(parts[2]), parts[3]);
|
||||||
} else if (parts[0] == "state-changes-subscription") {
|
} else if (parts[0] == "state-changes-subscription") {
|
||||||
if (parts[2] != "OK") {
|
if (parts[2] != "OK") {
|
||||||
LOG(error) << "state-changes-subscription failed with return code: " << parts[2];
|
LOG(error) << "state-changes-subscription failed with return code: " << parts[2];
|
||||||
|
@ -99,6 +105,7 @@ auto Topology::ChangeState(fair::mq::Transition transition, ChangeStateCallback
|
||||||
LOG(error) << "State change already in progress, concurrent requested not yet supported";
|
LOG(error) << "State change already in progress, concurrent requested not yet supported";
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
LOG(info) << "Initiating ChangeState with " << transition << " to " << fkExpectedState.at(transition);
|
||||||
fStateChangeOngoing = true;
|
fStateChangeOngoing = true;
|
||||||
fChangeStateCallback = cb;
|
fChangeStateCallback = cb;
|
||||||
fStateChangeTimeout = timeout;
|
fStateChangeTimeout = timeout;
|
||||||
|
@ -114,43 +121,61 @@ void Topology::WaitForState()
|
||||||
{
|
{
|
||||||
while (!fShutdown) {
|
while (!fShutdown) {
|
||||||
if (fStateChangeOngoing) {
|
if (fStateChangeOngoing) {
|
||||||
auto condition = [&] { return fShutdown || std::all_of(fTopologyState.cbegin(),
|
auto condition = [&] {
|
||||||
fTopologyState.cend(),
|
LOG(info) << "checking condition";
|
||||||
[&](TopologyState::value_type i) {
|
LOG(info) << "fShutdown: " << fShutdown;
|
||||||
return i.second.state == fTargetState;
|
LOG(info) << "condition: " << std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
|
||||||
});
|
return fShutdown || std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) {
|
||||||
|
return i.second.state == fTargetState;
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lock(fMtx);
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
|
|
||||||
|
// TODO Fix the timeout version
|
||||||
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
|
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
|
||||||
|
LOG(debug) << "initiating wait with timeout";
|
||||||
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
|
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
|
||||||
LOG(debug) << "timeout";
|
LOG(debug) << "timeout";
|
||||||
// TODO: catch this from another thread...
|
fStateChangeOngoing = false;
|
||||||
throw std::runtime_error("timeout");
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
LOG(debug) << "initiating wait without timeout";
|
||||||
fCV.wait(lock, condition);
|
fCV.wait(lock, condition);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fStateChangeOngoing = false;
|
||||||
if (fShutdown) {
|
if (fShutdown) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
fStateChangeOngoing = false;
|
|
||||||
fChangeStateCallback(ChangeStateResult{AsyncOpResult::Ok, fTopologyState});
|
fChangeStateCallback(ChangeStateResult{AsyncOpResult::Ok, fTopologyState});
|
||||||
} else {
|
} else {
|
||||||
std::unique_lock<std::mutex> lock(fExecutionMtx);
|
std::unique_lock<std::mutex> lock(fExecutionMtx);
|
||||||
fExecutionCV.wait(lock);
|
fExecutionCV.wait(lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG(debug) << "WaitForState shutting down";
|
||||||
};
|
};
|
||||||
|
|
||||||
void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state)
|
void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state)
|
||||||
{
|
{
|
||||||
|
std::size_t pos = state.find("->");
|
||||||
|
std::string endState = state.substr(pos + 2);
|
||||||
|
LOG(info) << "Adding new state entry: " << senderId << ", " << state << ", end state: " << endState;
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lock(fMtx);
|
try {
|
||||||
fTopologyState[senderId] = DeviceStatus{ true, fair::mq::GetState(state) };
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
|
fTopologyState[senderId] = DeviceStatus{ true, fair::mq::GetState(endState) };
|
||||||
|
} catch(const std::exception& e) {
|
||||||
|
LOG(error) << "Exception in AddNewStateEntry: " << e.what();
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG(info) << "fTopologyState after update: ";
|
||||||
|
for (auto& e : fTopologyState) {
|
||||||
|
LOG(info) << e.first << ": " << e.second.state;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
fCV.notify_one();
|
fCV.notify_one();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user