mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
FairMQ: Implement PluginServices - Config
This commit is contained in:
parent
739460b2fe
commit
ad0f050c99
|
@ -94,6 +94,7 @@ set(FAIRMQ_HEADER_FILES
|
|||
tools/CppSTL.h
|
||||
tools/Network.h
|
||||
tools/runSimpleMQStateMachine.h
|
||||
tools/Strings.h
|
||||
zeromq/FairMQMessageZMQ.h
|
||||
zeromq/FairMQPollerZMQ.h
|
||||
zeromq/FairMQSocketZMQ.h
|
||||
|
|
|
@ -226,13 +226,13 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(std::string event, int durationI
|
|||
return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs);
|
||||
}
|
||||
|
||||
void FairMQStateMachine::OnStateChange(const std::string& key, std::function<void(const State)> callback)
|
||||
void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback)
|
||||
{
|
||||
fStateChangeCallbacksMap.insert({key, fStateChangeCallback.connect(callback)});
|
||||
fStateChangeCallbacks.insert({key, fStateChangeCallback.connect(callback)});
|
||||
}
|
||||
|
||||
void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key)
|
||||
{
|
||||
fStateChangeCallbacksMap.at(key).disconnect();
|
||||
//fStateChangeCallbacksMap.erase(key);
|
||||
fStateChangeCallbacks.at(key).disconnect();
|
||||
fStateChangeCallbacks.erase(key);
|
||||
}
|
||||
|
|
|
@ -86,7 +86,7 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
|
|||
, fState()
|
||||
, fChangeStateMutex()
|
||||
, fStateChangeCallback()
|
||||
, fStateChangeCallbacksMap()
|
||||
, fStateChangeCallbacks()
|
||||
{}
|
||||
|
||||
// Destructor
|
||||
|
@ -134,7 +134,7 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
|
|||
{
|
||||
LOG(STATE) << "Entering IDLE state";
|
||||
fsm.fState = IDLE;
|
||||
if (!fsm.fStateChangeCallback.empty())
|
||||
if (!fsm.fStateChangeCallbacks.empty())
|
||||
{
|
||||
fsm.fStateChangeCallback(IDLE);
|
||||
}
|
||||
|
@ -519,39 +519,10 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
|
|||
}
|
||||
}
|
||||
|
||||
std::string GetCurrentStateName() const
|
||||
{
|
||||
return GetStateName(fState);
|
||||
}
|
||||
|
||||
int GetCurrentState() const
|
||||
{
|
||||
return fState;
|
||||
}
|
||||
|
||||
bool CheckCurrentState(int state) const
|
||||
{
|
||||
if (state == fState)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
bool CheckCurrentState(std::string state) const
|
||||
{
|
||||
if (state == GetCurrentStateName())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
std::string GetCurrentStateName() const { return GetStateName(fState); }
|
||||
int GetCurrentState() const { return fState; }
|
||||
bool CheckCurrentState(int state) const { return state == fState; }
|
||||
bool CheckCurrentState(std::string state) const { return state == GetCurrentStateName(); }
|
||||
|
||||
// this is to run certain functions in a separate thread
|
||||
std::thread fWorkerThread;
|
||||
|
@ -570,7 +541,7 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
|
|||
std::mutex fChangeStateMutex;
|
||||
|
||||
boost::signals2::signal<void(const State)> fStateChangeCallback;
|
||||
std::unordered_map<std::string, boost::signals2::connection> fStateChangeCallbacksMap;
|
||||
std::unordered_map<std::string, boost::signals2::connection> fStateChangeCallbacks;
|
||||
};
|
||||
|
||||
// reactivate the warning for non-virtual destructor
|
||||
|
@ -619,8 +590,8 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM
|
|||
bool WaitForEndOfStateForMs(int state, int durationInMs);
|
||||
bool WaitForEndOfStateForMs(std::string state, int durationInMs);
|
||||
|
||||
void OnStateChange(const std::string&, std::function<void(const State)> callback);
|
||||
void UnsubscribeFromStateChange(const std::string&);
|
||||
void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback);
|
||||
void UnsubscribeFromStateChange(const std::string& key);
|
||||
};
|
||||
|
||||
#endif /* FAIRMQSTATEMACHINE_H_ */
|
||||
|
|
|
@ -36,6 +36,8 @@ class Plugin
|
|||
{
|
||||
public:
|
||||
|
||||
using ProgOptions = boost::optional<boost::program_options::options_description>;
|
||||
|
||||
struct Version
|
||||
{
|
||||
const int fkMajor, fkMinor, fkPatch;
|
||||
|
@ -67,9 +69,10 @@ class Plugin
|
|||
<< "maintainer '" << p.GetMaintainer() << "', "
|
||||
<< "homepage '" << p.GetHomepage() << "'";
|
||||
}
|
||||
static auto NoProgramOptions() -> const boost::optional<boost::program_options::options_description> { return boost::none; }
|
||||
static auto NoProgramOptions() -> ProgOptions { return boost::none; }
|
||||
|
||||
// device control API
|
||||
// see <fairmq/PluginServices.h> for docs
|
||||
using DeviceState = fair::mq::PluginServices::DeviceState;
|
||||
using DeviceStateTransition = fair::mq::PluginServices::DeviceStateTransition;
|
||||
auto ToDeviceState(const std::string& state) const -> fair::mq::PluginServices::DeviceState { return fPluginServices.ToDeviceState(state); }
|
||||
|
@ -79,6 +82,18 @@ class Plugin
|
|||
auto SubscribeToDeviceStateChange(std::function<void(fair::mq::PluginServices::DeviceState)> callback) -> void { fPluginServices.SubscribeToDeviceStateChange(fkName, callback); }
|
||||
auto UnsubscribeFromDeviceStateChange() -> void { fPluginServices.UnsubscribeFromDeviceStateChange(fkName); }
|
||||
|
||||
// device config API
|
||||
// see <fairmq/PluginServices.h> for docs
|
||||
template<typename T>
|
||||
auto SetProperty(const std::string& key, T val) -> void { fPluginServices.SetProperty(key, val); }
|
||||
template<typename T>
|
||||
auto GetProperty(const std::string& key) const -> T { return fPluginServices.GetProperty<T>(key); }
|
||||
auto GetPropertyAsString(const std::string& key) const -> std::string { return fPluginServices.GetPropertyAsString(key); }
|
||||
template<typename T>
|
||||
auto SubscribeToPropertyChange(std::function<void(const std::string& /*key*/, const T /*newValue*/)> callback) const -> void { fPluginServices.SubscribeToPropertyChange(fkName, callback); }
|
||||
template<typename T>
|
||||
auto UnsubscribeFromPropertyChange() -> void { fPluginServices.UnsubscribeFromPropertyChange<T>(fkName); }
|
||||
|
||||
private:
|
||||
|
||||
const std::string fkName;
|
||||
|
|
|
@ -57,7 +57,7 @@ auto fair::mq::PluginManager::PrependSearchPath(const fs::path& path) -> void
|
|||
fSearchPaths.insert(begin(fSearchPaths), path);
|
||||
}
|
||||
|
||||
auto fair::mq::PluginManager::ProgramOptions() -> const po::options_description
|
||||
auto fair::mq::PluginManager::ProgramOptions() -> po::options_description
|
||||
{
|
||||
auto plugin_options = po::options_description{"Plugin Manager"};
|
||||
plugin_options.add_options()
|
||||
|
|
|
@ -49,7 +49,6 @@ class PluginManager
|
|||
public:
|
||||
|
||||
using PluginFactory = std::shared_ptr<fair::mq::Plugin>(PluginServices&);
|
||||
using PluginProgOptions = const boost::optional<boost::program_options::options_description>();
|
||||
|
||||
PluginManager();
|
||||
|
||||
|
@ -65,14 +64,14 @@ class PluginManager
|
|||
auto InstantiatePlugins() -> void;
|
||||
struct PluginInstantiationError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
|
||||
static auto ProgramOptions() -> const boost::program_options::options_description;
|
||||
static auto ProgramOptions() -> boost::program_options::options_description;
|
||||
static auto MakeFromCommandLineOptions(const std::vector<std::string>) -> std::shared_ptr<PluginManager>;
|
||||
struct ProgramOptionsParseError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
|
||||
static auto LibPrefix() -> const std::string& { return fgkLibPrefix; }
|
||||
|
||||
auto ForEachPlugin(std::function<void (Plugin&)> func) -> void { for(const auto& p : fPluginOrder) { func(*fPlugins[p]); } }
|
||||
auto ForEachPluginProgOptions(std::function<void (const boost::program_options::options_description&)> func) const -> void { for(const auto& pair : fPluginProgOptions) { func(pair.second); } }
|
||||
auto ForEachPluginProgOptions(std::function<void (boost::program_options::options_description)> func) const -> void { for(const auto& pair : fPluginProgOptions) { func(pair.second); } }
|
||||
|
||||
template<typename... Args>
|
||||
auto EmplacePluginServices(Args&&... args) -> void { fPluginServices = fair::mq::tools::make_unique<PluginServices>(std::forward<Args>(args)...); };
|
||||
|
@ -100,7 +99,7 @@ class PluginManager
|
|||
{
|
||||
fPluginProgOptions.insert({
|
||||
pluginName,
|
||||
lib.get_alias<PluginProgOptions>(ToString("get_", pluginName, "_plugin_progoptions"))().value()
|
||||
lib.get_alias<Plugin::ProgOptions()>(ToString("get_", pluginName, "_plugin_progoptions"))().value()
|
||||
});
|
||||
}
|
||||
catch (const boost::bad_optional_access& e) { /* just ignore, if no prog options are declared */ }
|
||||
|
|
|
@ -8,4 +8,80 @@
|
|||
|
||||
#include <fairmq/PluginServices.h>
|
||||
|
||||
using namespace std;
|
||||
using fair::mq::PluginServices;
|
||||
|
||||
const std::unordered_map<std::string, PluginServices::DeviceState> PluginServices::fkDeviceStateStrMap = {
|
||||
{"OK", DeviceState::Ok},
|
||||
{"ERROR", DeviceState::Error},
|
||||
{"IDLE", DeviceState::Idle},
|
||||
{"INITIALIZING DEVICE", DeviceState::InitializingDevice},
|
||||
{"DEVICE READY", DeviceState::DeviceReady},
|
||||
{"INITIALIZING TASK", DeviceState::InitializingTask},
|
||||
{"READY", DeviceState::Ready},
|
||||
{"RUNNING", DeviceState::Running},
|
||||
{"PAUSED", DeviceState::Paused},
|
||||
{"RESETTING TASK", DeviceState::ResettingTask},
|
||||
{"RESETTING DEVICE", DeviceState::ResettingDevice},
|
||||
{"EXITING", DeviceState::Exiting}
|
||||
};
|
||||
const std::unordered_map<PluginServices::DeviceState, std::string> PluginServices::fkStrDeviceStateMap = {
|
||||
{DeviceState::Ok, "OK"},
|
||||
{DeviceState::Error, "ERROR"},
|
||||
{DeviceState::Idle, "IDLE"},
|
||||
{DeviceState::InitializingDevice, "INITIALIZING DEVICE"},
|
||||
{DeviceState::DeviceReady, "DEVICE READY"},
|
||||
{DeviceState::InitializingTask, "INITIALIZING TASK"},
|
||||
{DeviceState::Ready, "READY"},
|
||||
{DeviceState::Running, "RUNNING"},
|
||||
{DeviceState::Paused, "PAUSED"},
|
||||
{DeviceState::ResettingTask, "RESETTING TASK"},
|
||||
{DeviceState::ResettingDevice, "RESETTING DEVICE"},
|
||||
{DeviceState::Exiting, "EXITING"}
|
||||
};
|
||||
const std::unordered_map<std::string, PluginServices::DeviceStateTransition> PluginServices::fkDeviceStateTransitionStrMap = {
|
||||
{"INIT DEVICE", DeviceStateTransition::InitDevice},
|
||||
{"INIT TASK", DeviceStateTransition::InitTask},
|
||||
{"RUN", DeviceStateTransition::Run},
|
||||
{"PAUSE", DeviceStateTransition::Pause},
|
||||
{"STOP", DeviceStateTransition::Stop},
|
||||
{"RESET TASK", DeviceStateTransition::ResetTask},
|
||||
{"RESET DEVICE", DeviceStateTransition::ResetDevice},
|
||||
{"END", DeviceStateTransition::End},
|
||||
{"ERROR FOUND", DeviceStateTransition::ErrorFound},
|
||||
};
|
||||
const std::unordered_map<PluginServices::DeviceStateTransition, std::string> PluginServices::fkStrDeviceStateTransitionMap = {
|
||||
{DeviceStateTransition::InitDevice, "INIT DEVICE"},
|
||||
{DeviceStateTransition::InitTask, "INIT TASK"},
|
||||
{DeviceStateTransition::Run, "RUN"},
|
||||
{DeviceStateTransition::Pause, "PAUSE"},
|
||||
{DeviceStateTransition::Stop, "STOP"},
|
||||
{DeviceStateTransition::ResetTask, "RESET TASK"},
|
||||
{DeviceStateTransition::ResetDevice, "RESET DEVICE"},
|
||||
{DeviceStateTransition::End, "END"},
|
||||
{DeviceStateTransition::ErrorFound, "ERROR FOUND"},
|
||||
};
|
||||
const std::unordered_map<FairMQDevice::State, PluginServices::DeviceState> PluginServices::fkDeviceStateMap = {
|
||||
{FairMQDevice::OK, DeviceState::Ok},
|
||||
{FairMQDevice::ERROR, DeviceState::Error},
|
||||
{FairMQDevice::IDLE, DeviceState::Idle},
|
||||
{FairMQDevice::INITIALIZING_DEVICE, DeviceState::InitializingDevice},
|
||||
{FairMQDevice::DEVICE_READY, DeviceState::DeviceReady},
|
||||
{FairMQDevice::INITIALIZING_TASK, DeviceState::InitializingTask},
|
||||
{FairMQDevice::READY, DeviceState::Ready},
|
||||
{FairMQDevice::RUNNING, DeviceState::Running},
|
||||
{FairMQDevice::PAUSED, DeviceState::Paused},
|
||||
{FairMQDevice::RESETTING_TASK, DeviceState::ResettingTask},
|
||||
{FairMQDevice::RESETTING_DEVICE, DeviceState::ResettingDevice},
|
||||
{FairMQDevice::EXITING, DeviceState::Exiting}
|
||||
};
|
||||
const std::unordered_map<PluginServices::DeviceStateTransition, FairMQDevice::Event> PluginServices::fkDeviceStateTransitionMap = {
|
||||
{DeviceStateTransition::InitDevice, FairMQDevice::INIT_DEVICE},
|
||||
{DeviceStateTransition::InitTask, FairMQDevice::INIT_TASK},
|
||||
{DeviceStateTransition::Run, FairMQDevice::RUN},
|
||||
{DeviceStateTransition::Pause, FairMQDevice::PAUSE},
|
||||
{DeviceStateTransition::Stop, FairMQDevice::STOP},
|
||||
{DeviceStateTransition::ResetTask, FairMQDevice::RESET_TASK},
|
||||
{DeviceStateTransition::ResetDevice, FairMQDevice::RESET_DEVICE},
|
||||
{DeviceStateTransition::End, FairMQDevice::END},
|
||||
{DeviceStateTransition::ErrorFound, FairMQDevice::ERROR_FOUND}
|
||||
};
|
||||
|
|
|
@ -9,9 +9,9 @@
|
|||
#ifndef FAIR_MQ_PLUGINSERVICES_H
|
||||
#define FAIR_MQ_PLUGINSERVICES_H
|
||||
|
||||
#include <fairmq/Tools.h>
|
||||
#include <FairMQDevice.h>
|
||||
#include <options/FairMQProgOptions.h>
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
@ -65,144 +65,141 @@ class PluginServices
|
|||
PluginServices(FairMQProgOptions& config, FairMQDevice& device)
|
||||
: fDevice{device}
|
||||
, fConfig{config}
|
||||
, fConfigEnabled{false}
|
||||
, fkDeviceStateStrMap{
|
||||
{"Ok", DeviceState::Ok},
|
||||
{"Error", DeviceState::Error},
|
||||
{"Idle", DeviceState::Idle},
|
||||
{"InitializingDevice", DeviceState::InitializingDevice},
|
||||
{"DeviceReady", DeviceState::DeviceReady},
|
||||
{"InitializingTask", DeviceState::InitializingTask},
|
||||
{"Ready", DeviceState::Ready},
|
||||
{"Running", DeviceState::Running},
|
||||
{"Paused", DeviceState::Paused},
|
||||
{"ResettingTask", DeviceState::ResettingTask},
|
||||
{"ResettingDevice", DeviceState::ResettingDevice},
|
||||
{"Exiting", DeviceState::Exiting}
|
||||
}
|
||||
, fkStrDeviceStateMap{
|
||||
{DeviceState::Ok, "Ok"},
|
||||
{DeviceState::Error, "Error"},
|
||||
{DeviceState::Idle, "Idle"},
|
||||
{DeviceState::InitializingDevice, "InitializingDevice"},
|
||||
{DeviceState::DeviceReady, "DeviceReady"},
|
||||
{DeviceState::InitializingTask, "InitializingTask"},
|
||||
{DeviceState::Ready, "Ready"},
|
||||
{DeviceState::Running, "Running"},
|
||||
{DeviceState::Paused, "Paused"},
|
||||
{DeviceState::ResettingTask, "ResettingTask"},
|
||||
{DeviceState::ResettingDevice, "ResettingDevice"},
|
||||
{DeviceState::Exiting, "Exiting"}
|
||||
}
|
||||
, fkDeviceStateMap{
|
||||
{FairMQDevice::OK, DeviceState::Ok},
|
||||
{FairMQDevice::ERROR, DeviceState::Error},
|
||||
{FairMQDevice::IDLE, DeviceState::Idle},
|
||||
{FairMQDevice::INITIALIZING_DEVICE, DeviceState::InitializingDevice},
|
||||
{FairMQDevice::DEVICE_READY, DeviceState::DeviceReady},
|
||||
{FairMQDevice::INITIALIZING_TASK, DeviceState::InitializingTask},
|
||||
{FairMQDevice::READY, DeviceState::Ready},
|
||||
{FairMQDevice::RUNNING, DeviceState::Running},
|
||||
{FairMQDevice::PAUSED, DeviceState::Paused},
|
||||
{FairMQDevice::RESETTING_TASK, DeviceState::ResettingTask},
|
||||
{FairMQDevice::RESETTING_DEVICE, DeviceState::ResettingDevice},
|
||||
{FairMQDevice::EXITING, DeviceState::Exiting}
|
||||
}
|
||||
, fkDeviceStateTransitionMap{
|
||||
{DeviceStateTransition::InitDevice, FairMQDevice::INIT_DEVICE},
|
||||
{DeviceStateTransition::InitTask, FairMQDevice::INIT_TASK},
|
||||
{DeviceStateTransition::Run, FairMQDevice::RUN},
|
||||
{DeviceStateTransition::Pause, FairMQDevice::PAUSE},
|
||||
{DeviceStateTransition::Stop, FairMQDevice::STOP},
|
||||
{DeviceStateTransition::ResetTask, FairMQDevice::RESET_TASK},
|
||||
{DeviceStateTransition::ResetDevice, FairMQDevice::RESET_DEVICE},
|
||||
{DeviceStateTransition::End, FairMQDevice::END},
|
||||
{DeviceStateTransition::ErrorFound, FairMQDevice::ERROR_FOUND}
|
||||
}
|
||||
{
|
||||
}
|
||||
|
||||
// Control
|
||||
// Control API
|
||||
|
||||
/// @brief Convert string to DeviceState
|
||||
/// @param state to convert
|
||||
/// @return DeviceState enum entry
|
||||
/// @throw std::out_of_range if a string cannot be resolved to a DeviceState
|
||||
auto ToDeviceState(const std::string& state) const -> DeviceState
|
||||
{
|
||||
return fkDeviceStateStrMap.at(state);
|
||||
}
|
||||
static auto ToDeviceState(const std::string& state) -> DeviceState { return fkDeviceStateStrMap.at(state); }
|
||||
|
||||
/// @brief Convert string to DeviceStateTransition
|
||||
/// @param transition to convert
|
||||
/// @return DeviceStateTransition enum entry
|
||||
/// @throw std::out_of_range if a string cannot be resolved to a DeviceStateTransition
|
||||
static auto ToDeviceStateTransition(const std::string& transition) -> DeviceStateTransition { return fkDeviceStateTransitionStrMap.at(transition); }
|
||||
|
||||
/// @brief Convert DeviceState to string
|
||||
/// @param string to convert
|
||||
/// @param state to convert
|
||||
/// @return string representation of DeviceState enum entry
|
||||
auto ToStr(DeviceState state) const -> std::string
|
||||
{
|
||||
return fkStrDeviceStateMap.at(state);
|
||||
}
|
||||
static auto ToStr(DeviceState state) -> std::string { return fkStrDeviceStateMap.at(state); }
|
||||
friend auto operator<<(std::ostream& os, const DeviceState& state) -> std::ostream& { return os << ToStr(state); }
|
||||
|
||||
/// @brief Convert DeviceStateTransition to string
|
||||
/// @param transition to convert
|
||||
/// @return string representation of DeviceStateTransition enum entry
|
||||
static auto ToStr(DeviceStateTransition transition) -> std::string { return fkStrDeviceStateTransitionMap.at(transition); }
|
||||
friend auto operator<<(std::ostream& os, const DeviceStateTransition& transition) -> std::ostream& { return os << ToStr(transition); }
|
||||
|
||||
/// @return current device state
|
||||
auto GetCurrentDeviceState() const -> DeviceState
|
||||
{
|
||||
return fkDeviceStateMap.at(static_cast<FairMQDevice::State>(fDevice.GetCurrentState()));
|
||||
}
|
||||
auto GetCurrentDeviceState() const -> DeviceState { return fkDeviceStateMap.at(static_cast<FairMQDevice::State>(fDevice.GetCurrentState())); }
|
||||
|
||||
/// @brief Trigger a device state transition
|
||||
/// @brief Request a device state transition
|
||||
/// @param next state transition
|
||||
///
|
||||
/// The state transition may not happen immediately, but when the current state evaluates the
|
||||
/// pending transition event and terminates. In other words, the device states are scheduled cooperatively.
|
||||
auto ChangeDeviceState(const DeviceStateTransition next) -> void
|
||||
{
|
||||
fDevice.ChangeState(fkDeviceStateTransitionMap.at(next));
|
||||
}
|
||||
auto ChangeDeviceState(const DeviceStateTransition next) -> void { fDevice.ChangeState(fkDeviceStateTransitionMap.at(next)); }
|
||||
|
||||
/// @brief Subscribe with a callback to device state changes
|
||||
/// @param InputMsgCallback
|
||||
/// @param subscriber id
|
||||
/// @param callback
|
||||
///
|
||||
/// The callback will be called at the beginning of a new state. The callback is called from the thread
|
||||
/// the state is running in.
|
||||
auto SubscribeToDeviceStateChange(const std::string& key, std::function<void(DeviceState /*newState*/)> callback) -> void
|
||||
auto SubscribeToDeviceStateChange(const std::string& subscriber, std::function<void(DeviceState /*newState*/)> callback) -> void
|
||||
{
|
||||
fDevice.OnStateChange(key, [&,callback](FairMQDevice::State newState){
|
||||
fDevice.SubscribeToStateChange(subscriber, [&,callback](FairMQDevice::State newState){
|
||||
callback(fkDeviceStateMap.at(newState));
|
||||
});
|
||||
}
|
||||
|
||||
auto UnsubscribeFromDeviceStateChange(const std::string& key) -> void
|
||||
/// @brief Unsubscribe from device state changes
|
||||
/// @param subscriber id
|
||||
auto UnsubscribeFromDeviceStateChange(const std::string& subscriber) -> void { fDevice.UnsubscribeFromStateChange(subscriber); }
|
||||
|
||||
|
||||
// Config API
|
||||
|
||||
/// @brief Set config property
|
||||
/// @param key
|
||||
/// @param val
|
||||
/// @throws fair::mq::PluginServices::InvalidStateError if method is called in unsupported device states
|
||||
///
|
||||
/// Setting a config property will store the value in the FairMQ internal config store and notify any subscribers about the update.
|
||||
/// It is property dependent, if the call to this method will have an immediate, delayed or any effect at all.
|
||||
template<typename T>
|
||||
auto SetProperty(const std::string& key, T val) -> void
|
||||
{
|
||||
fDevice.UnsubscribeFromStateChange(key);
|
||||
auto currentState = GetCurrentDeviceState();
|
||||
if (currentState == DeviceState::InitializingDevice)
|
||||
{
|
||||
fConfig.SetValue(key, val);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw InvalidStateError{tools::ToString("PluginServices::SetProperty is not supported in device state ", currentState, ". Supported state is ", DeviceState::InitializingDevice, ".")};
|
||||
}
|
||||
}
|
||||
struct InvalidStateError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
|
||||
/// @brief Read config property
|
||||
/// @param key
|
||||
/// @return config property value
|
||||
///
|
||||
/// TODO Currently, if a non-existing key is requested and a default constructed object is returned.
|
||||
/// This behaviour will be changed in the future to throw an exception in that case to provide a proper sentinel.
|
||||
template<typename T>
|
||||
auto GetProperty(const std::string& key) const -> T { return fConfig.GetValue<T>(key); }
|
||||
|
||||
//// Configuration
|
||||
/// @brief Read config property as string
|
||||
/// @param key
|
||||
/// @return config property value converted to string
|
||||
///
|
||||
/// If a type is not supported, the user can provide support by overloading the ostream operator for this type
|
||||
auto GetPropertyAsString(const std::string& key) const -> std::string { return fConfig.GetStringValue(key); }
|
||||
|
||||
//// Writing only works during Initializing_device state
|
||||
//template<typename T>
|
||||
//auto SetProperty(const std::string& key, T val) -> void;
|
||||
/// @brief Subscribe to property updates of type T
|
||||
/// @param subscriber
|
||||
/// @param callback function
|
||||
///
|
||||
/// While PluginServices provides the SetProperty method which can update properties only during certain device states, there are
|
||||
/// other methods in a FairMQ device that can update properties at any time. Therefore, the callback implementation should expect to be called in any
|
||||
/// device state.
|
||||
// template<typename T>
|
||||
// auto SubscribeToPropertyChange(
|
||||
// const std::string& subscriber,
|
||||
// std::function<void(const std::string& [>key*/, const T /*newValue<])> callback
|
||||
// ) const -> void
|
||||
// {
|
||||
// fConfig.Subscribe(subscriber, callback);
|
||||
// }
|
||||
//
|
||||
// /// @brief Unsubscribe from property updates of type T
|
||||
// /// @param subscriber
|
||||
// template<typename T>
|
||||
// auto UnsubscribeFromPropertyChange(const std::string& subscriber) -> void { fConfig.Unsubscribe<T>(subscriber); }
|
||||
//
|
||||
// TODO Fix property subscription
|
||||
// TODO Property iterator
|
||||
|
||||
//template<typename T>
|
||||
//auto GetProperty(const std::string& key) const -> T;
|
||||
//auto GetPropertyAsString(const std::string& key) const -> std::string;
|
||||
|
||||
//template<typename T>
|
||||
//auto SubscribeToPropertyChange(
|
||||
//const std::string& key,
|
||||
//std::function<void(const std::string& [>key*/, const T /*newValue<])> callback
|
||||
//) const -> void;
|
||||
//auto UnsubscribeFromPropertyChange(const std::string& key) -> void;
|
||||
static const std::unordered_map<std::string, DeviceState> fkDeviceStateStrMap;
|
||||
static const std::unordered_map<DeviceState, std::string> fkStrDeviceStateMap;
|
||||
static const std::unordered_map<std::string, DeviceStateTransition> fkDeviceStateTransitionStrMap;
|
||||
static const std::unordered_map<DeviceStateTransition, std::string> fkStrDeviceStateTransitionMap;
|
||||
static const std::unordered_map<FairMQDevice::State, DeviceState> fkDeviceStateMap;
|
||||
static const std::unordered_map<DeviceStateTransition, FairMQDevice::Event> fkDeviceStateTransitionMap;
|
||||
|
||||
private:
|
||||
|
||||
FairMQProgOptions& fConfig;
|
||||
FairMQDevice& fDevice;
|
||||
std::atomic<bool> fConfigEnabled;
|
||||
const std::unordered_map<std::string, DeviceState> fkDeviceStateStrMap;
|
||||
const std::unordered_map<DeviceState, std::string> fkStrDeviceStateMap;
|
||||
const std::unordered_map<FairMQDevice::State, DeviceState> fkDeviceStateMap;
|
||||
const std::unordered_map<DeviceStateTransition, FairMQDevice::Event> fkDeviceStateTransitionMap;
|
||||
}; /* class PluginServices */
|
||||
|
||||
|
||||
} /* namespace mq */
|
||||
} /* namespace fair */
|
||||
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
#ifndef FAIRMQEVENTMANAGER_H
|
||||
#define FAIRMQEVENTMANAGER_H
|
||||
|
||||
#include <FairMQLogger.h>
|
||||
|
||||
#include <map>
|
||||
#include <utility>
|
||||
#include <string>
|
||||
|
@ -97,7 +99,7 @@ class FairMQEventManager
|
|||
}
|
||||
|
||||
template <EventId event, typename... ValueType>
|
||||
void Disonnect(const std::string& key)
|
||||
void Disconnect(const std::string& key)
|
||||
{
|
||||
GetSlot<event, ValueType...>(key).disconnect();
|
||||
}
|
||||
|
@ -111,14 +113,7 @@ class FairMQEventManager
|
|||
template <EventId event>
|
||||
bool EventKeyFound(const std::string& key)
|
||||
{
|
||||
if (fEventMap.find(std::pair<EventId, std::string>(event, key)) != fEventMap.end())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
return fEventMap.find(std::pair<EventId, std::string>(event, key)) != fEventMap.end();
|
||||
}
|
||||
|
||||
private:
|
||||
|
|
|
@ -205,6 +205,37 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
|
|||
return 0;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
int SetValue(const std::string& key, T val)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(fConfigMutex);
|
||||
|
||||
// update variable map
|
||||
UpdateVarMap<typename std::decay<T>::type>(key, val);
|
||||
|
||||
// update FairMQChannel map, check first if data are int or string
|
||||
if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)
|
||||
{
|
||||
if (fMQKeyMap.count(key))
|
||||
{
|
||||
std::string channelName;
|
||||
int index = 0;
|
||||
std::string member;
|
||||
std::tie(channelName, index, member) = fMQKeyMap.at(key);
|
||||
UpdateChannelMap(channelName, index, member, val);
|
||||
}
|
||||
}
|
||||
|
||||
// execute stored function of a given key if exist
|
||||
//if (std::is_same<T, int>::value || std::is_same<T, std::string>::value)//if one wants to restrict type
|
||||
if (EventKeyFound(key))
|
||||
{
|
||||
EmitUpdate<typename std::decay<T>::type>(key, val);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
template <typename T, typename F>
|
||||
void Subscribe(const std::string& key, F&& func) const
|
||||
{
|
||||
|
@ -215,10 +246,19 @@ class FairMQProgOptions : public FairProgOptions , public FairMQEventManager
|
|||
|
||||
if (fVarMap.count(key))
|
||||
{
|
||||
FairMQEventManager::Connect<EventId::UpdateParam, T>(key, std::forward<F>(func));
|
||||
Connect<EventId::UpdateParam, T>(key, std::forward<F>(func));
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void Unsubscribe(const std::string& key) const
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(fConfigMutex);
|
||||
|
||||
Disconnect<EventId::UpdateParam, T>(key);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
template <typename F>
|
||||
void Subscribe(const std::string& key, F&& func)
|
||||
|
|
|
@ -59,7 +59,7 @@ FairProgOptions::~FairProgOptions()
|
|||
|
||||
/// //////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
/// Add option descriptions
|
||||
int FairProgOptions::AddToCmdLineOptions(const po::options_description& optDesc, bool visible)
|
||||
int FairProgOptions::AddToCmdLineOptions(const po::options_description optDesc, bool visible)
|
||||
{
|
||||
fCmdLineOptions.add(optDesc);
|
||||
if (visible)
|
||||
|
@ -69,7 +69,7 @@ int FairProgOptions::AddToCmdLineOptions(const po::options_description& optDesc,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int FairProgOptions::AddToCfgFileOptions(const po::options_description& optDesc, bool visible)
|
||||
int FairProgOptions::AddToCfgFileOptions(const po::options_description optDesc, bool visible)
|
||||
{
|
||||
//if UseConfigFile() not yet called, then enable it with required file name to be provided by command line
|
||||
if (!fUseConfigFile)
|
||||
|
@ -100,7 +100,7 @@ po::options_description& FairProgOptions::GetEnvironmentOptions()
|
|||
return fEnvironmentDesc;
|
||||
}
|
||||
|
||||
int FairProgOptions::AddToEnvironmentOptions(const po::options_description& optDesc)
|
||||
int FairProgOptions::AddToEnvironmentOptions(const po::options_description optDesc)
|
||||
{
|
||||
fEnvironmentDesc.add(optDesc);
|
||||
return 0;
|
||||
|
|
|
@ -65,9 +65,9 @@ class FairProgOptions
|
|||
virtual ~FairProgOptions();
|
||||
|
||||
// add options_description
|
||||
int AddToCmdLineOptions(const po::options_description& optDesc, bool visible = true);
|
||||
int AddToCfgFileOptions(const po::options_description& optDesc, bool visible = true);
|
||||
int AddToEnvironmentOptions(const po::options_description& optDesc);
|
||||
int AddToCmdLineOptions(const po::options_description optDesc, bool visible = true);
|
||||
int AddToCfgFileOptions(const po::options_description optDesc, bool visible = true);
|
||||
int AddToEnvironmentOptions(const po::options_description optDesc);
|
||||
po::options_description& GetCmdLineOptions();
|
||||
po::options_description& GetCfgFileOptions();
|
||||
po::options_description& GetEnvironmentOptions();
|
||||
|
|
|
@ -47,14 +47,16 @@ int main(int argc, const char** argv)
|
|||
boost::program_options::options_description customOptions("Custom options");
|
||||
addCustomOptions(customOptions);
|
||||
|
||||
// Plugin manager needs to be destroyed after config !
|
||||
// TODO Investigate, why
|
||||
auto pluginManager = fair::mq::PluginManager::MakeFromCommandLineOptions(fair::mq::tools::ToStrVector(argc, argv));
|
||||
FairMQProgOptions config;
|
||||
config.AddToCmdLineOptions(customOptions);
|
||||
|
||||
auto pluginManager = fair::mq::PluginManager::MakeFromCommandLineOptions(fair::mq::tools::ToStrVector(argc, argv));
|
||||
config.AddToCmdLineOptions(pluginManager->ProgramOptions());
|
||||
pluginManager->ForEachPluginProgOptions([&config](const boost::program_options::options_description& options){
|
||||
pluginManager->ForEachPluginProgOptions([&config](boost::program_options::options_description options){
|
||||
config.AddToCmdLineOptions(options);
|
||||
});
|
||||
config.AddToCmdLineOptions(pluginManager->ProgramOptions());
|
||||
|
||||
config.ParseAll(argc, argv, true);
|
||||
|
||||
|
|
|
@ -47,12 +47,12 @@ class DummyPlugin : public fair::mq::Plugin
|
|||
}
|
||||
}; /* class DummyPlugin */
|
||||
|
||||
auto DummyPluginProgramOptions() -> const boost::optional<boost::program_options::options_description>
|
||||
auto DummyPluginProgramOptions() -> Plugin::ProgOptions
|
||||
{
|
||||
auto plugin_options = boost::program_options::options_description{"Dummy Plugin"};
|
||||
plugin_options.add_options()
|
||||
("custom-dummy-option", value<std::string>(), "Cool custom option.");
|
||||
("custom-dummy-option2", value<std::string>(), "Another cool custom option.");
|
||||
("custom-dummy-option", boost::program_options::value<std::string>(), "Cool custom option.");
|
||||
("custom-dummy-option2", boost::program_options::value<std::string>(), "Another cool custom option.");
|
||||
return plugin_options;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user