mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 01:51:45 +00:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
3eca8e9def | ||
|
beb7766fca | ||
|
bf909f94dc | ||
|
1140c4c6ab | ||
|
a6da208e79 | ||
|
ba3a82b1df | ||
|
e8cc104344 | ||
|
d5d5c27958 | ||
|
5a7dcd9fc1 |
@@ -51,7 +51,7 @@ fairmq-dds-command-ui -c k
|
|||||||
fairmq-dds-command-ui -c b
|
fairmq-dds-command-ui -c b
|
||||||
fairmq-dds-command-ui -c x
|
fairmq-dds-command-ui -c x
|
||||||
fairmq-dds-command-ui -c j
|
fairmq-dds-command-ui -c j
|
||||||
allexceptqctasks="main/(Sampler|QCDispatcher|Sink)"
|
allexceptqctasks="main/(Sampler|QCDispatcher|Sink).*"
|
||||||
fairmq-dds-command-ui -c r -p $allexceptqctasks
|
fairmq-dds-command-ui -c r -p $allexceptqctasks
|
||||||
qctask="main/QCTask.*"
|
qctask="main/QCTask.*"
|
||||||
qcdispatcher="main/QCDispatcher.*"
|
qcdispatcher="main/QCDispatcher.*"
|
||||||
|
@@ -38,6 +38,10 @@ class FairMQSocket
|
|||||||
virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0;
|
virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0;
|
||||||
virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0;
|
virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0;
|
||||||
|
|
||||||
|
/// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value
|
||||||
|
/// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in
|
||||||
|
/// the future.
|
||||||
|
virtual void Events(uint32_t* events) = 0;
|
||||||
virtual void SetLinger(const int value) = 0;
|
virtual void SetLinger(const int value) = 0;
|
||||||
virtual int GetLinger() const = 0;
|
virtual int GetLinger() const = 0;
|
||||||
virtual void SetSndBufSize(const int value) = 0;
|
virtual void SetSndBufSize(const int value) = 0;
|
||||||
|
@@ -18,9 +18,10 @@ namespace fair
|
|||||||
namespace mq
|
namespace mq
|
||||||
{
|
{
|
||||||
|
|
||||||
array<string, 15> stateNames =
|
array<string, 16> stateNames =
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
|
"UNDEFINED",
|
||||||
"OK",
|
"OK",
|
||||||
"ERROR",
|
"ERROR",
|
||||||
"IDLE",
|
"IDLE",
|
||||||
@@ -41,6 +42,7 @@ array<string, 15> stateNames =
|
|||||||
|
|
||||||
unordered_map<string, State> states =
|
unordered_map<string, State> states =
|
||||||
{
|
{
|
||||||
|
{ "UNDEFINED", State::Undefined },
|
||||||
{ "OK", State::Ok },
|
{ "OK", State::Ok },
|
||||||
{ "ERROR", State::Error },
|
{ "ERROR", State::Error },
|
||||||
{ "IDLE", State::Idle },
|
{ "IDLE", State::Idle },
|
||||||
|
@@ -20,6 +20,7 @@ namespace mq
|
|||||||
|
|
||||||
enum class State : int
|
enum class State : int
|
||||||
{
|
{
|
||||||
|
Undefined = 0,
|
||||||
Ok,
|
Ok,
|
||||||
Error,
|
Error,
|
||||||
Idle,
|
Idle,
|
||||||
@@ -39,7 +40,7 @@ enum class State : int
|
|||||||
|
|
||||||
enum class Transition : int
|
enum class Transition : int
|
||||||
{
|
{
|
||||||
Auto,
|
Auto = 0,
|
||||||
InitDevice,
|
InitDevice,
|
||||||
CompleteInit,
|
CompleteInit,
|
||||||
Bind,
|
Bind,
|
||||||
|
@@ -45,6 +45,7 @@ class Socket final : public fair::mq::Socket
|
|||||||
|
|
||||||
auto GetId() const -> std::string override { return fId; }
|
auto GetId() const -> std::string override { return fId; }
|
||||||
|
|
||||||
|
auto Events(uint32_t *events) -> void override { *events = 0; }
|
||||||
auto Bind(const std::string& address) -> bool override;
|
auto Bind(const std::string& address) -> bool override;
|
||||||
auto Connect(const std::string& address) -> bool override;
|
auto Connect(const std::string& address) -> bool override;
|
||||||
|
|
||||||
|
@@ -65,7 +65,7 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
|
|||||||
cout << d.taskId << " : " << d.state << endl;
|
cout << d.taskId << " : " << d.state << endl;
|
||||||
}
|
}
|
||||||
} else if (command == "o") {
|
} else if (command == "o") {
|
||||||
cout << "> dumping config of the devices (" << path << ")" << endl;
|
cout << "> dumping config of " << (path == "" ? "all" : path) << endl;
|
||||||
// TODO: extend this regex to return all properties, once command size limitation is removed.
|
// TODO: extend this regex to return all properties, once command size limitation is removed.
|
||||||
auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout));
|
auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout));
|
||||||
for (const auto& d : result.second.devices) {
|
for (const auto& d : result.second.devices) {
|
||||||
@@ -79,39 +79,39 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const DeviceProperties props{{pKey, pVal}};
|
const DeviceProperties props{{pKey, pVal}};
|
||||||
cout << "> sending property (" << path << ")" << endl;
|
cout << "> setting properties --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.SetProperties(props, path);
|
topo.SetProperties(props, path);
|
||||||
// give dds time to complete request
|
// give dds time to complete request
|
||||||
this_thread::sleep_for(chrono::milliseconds(100));
|
this_thread::sleep_for(chrono::milliseconds(100));
|
||||||
} else if (command == "i") {
|
} else if (command == "i") {
|
||||||
cout << "> init devices (" << path << ")" << endl;
|
cout << "> initiating InitDevice transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::InitDevice, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::InitDevice, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "k") {
|
} else if (command == "k") {
|
||||||
cout << "> complete init (" << path << ")" << endl;
|
cout << "> initiating CompleteInit transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::CompleteInit, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::CompleteInit, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "b") {
|
} else if (command == "b") {
|
||||||
cout << "> bind devices (" << path << ")" << endl;
|
cout << "> initiating Bind transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::Bind, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::Bind, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "x") {
|
} else if (command == "x") {
|
||||||
cout << "> connect devices (" << path << ")" << endl;
|
cout << "> initiating Connect transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::Connect, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::Connect, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "j") {
|
} else if (command == "j") {
|
||||||
cout << "> init tasks (" << path << ")" << endl;
|
cout << "> initiating InitTask transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::InitTask, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::InitTask, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "r") {
|
} else if (command == "r") {
|
||||||
cout << "> run tasks (" << path << ")" << endl;
|
cout << "> initiating Run transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::Run, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::Run, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "s") {
|
} else if (command == "s") {
|
||||||
cout << "> stop devices (" << path << ")" << endl;
|
cout << "> initiating Stop transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::Stop, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::Stop, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "t") {
|
} else if (command == "t") {
|
||||||
cout << "> reset tasks (" << path << ")" << endl;
|
cout << "> initiating ResetTask transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::ResetTask, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::ResetTask, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "d") {
|
} else if (command == "d") {
|
||||||
cout << "> reset devices (" << path << ")" << endl;
|
cout << "> initiating ResetDevice transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::ResetDevice, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::ResetDevice, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "q") {
|
} else if (command == "q") {
|
||||||
cout << "> end (" << path << ")" << endl;
|
cout << "> initiating End transition --> " << (path == "" ? "all" : path) << endl;
|
||||||
topo.ChangeState(TopologyTransition::End, path, std::chrono::milliseconds(timeout));
|
topo.ChangeState(TopologyTransition::End, path, std::chrono::milliseconds(timeout));
|
||||||
} else if (command == "h") {
|
} else if (command == "h") {
|
||||||
cout << "> help" << endl;
|
cout << "> help" << endl;
|
||||||
@@ -206,6 +206,7 @@ try {
|
|||||||
sendCommand(command, path, timeout, topo, pKey, pVal);
|
sendCommand(command, path, timeout, topo, pKey, pVal);
|
||||||
}
|
}
|
||||||
size_t pos = targetState.find("->");
|
size_t pos = targetState.find("->");
|
||||||
|
cout << "> waiting for " << (path == "" ? "all" : path) << " to reach " << targetState << endl;
|
||||||
if (pos == string::npos) {
|
if (pos == string::npos) {
|
||||||
/* auto ec = */topo.WaitForState(GetState(targetState), path, std::chrono::milliseconds(timeout));
|
/* auto ec = */topo.WaitForState(GetState(targetState), path, std::chrono::milliseconds(timeout));
|
||||||
// cout << "WaitForState(" << targetState << ") result: " << ec.message() << endl;
|
// cout << "WaitForState(" << targetState << ") result: " << ec.message() << endl;
|
||||||
|
@@ -69,6 +69,8 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
|
|||||||
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
|
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
|
||||||
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
|
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
|
||||||
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
|
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
|
||||||
|
("shm-mlock-segment", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.")
|
||||||
|
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.")
|
||||||
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
|
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
|
||||||
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
|
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
|
||||||
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
|
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")
|
||||||
|
@@ -25,11 +25,6 @@ struct RuntimeError : ::std::runtime_error
|
|||||||
{}
|
{}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct MixedStateError : RuntimeError
|
|
||||||
{
|
|
||||||
using RuntimeError::RuntimeError;
|
|
||||||
};
|
|
||||||
|
|
||||||
} /* namespace sdk */
|
} /* namespace sdk */
|
||||||
|
|
||||||
enum class ErrorCode
|
enum class ErrorCode
|
||||||
|
@@ -70,6 +70,47 @@ const std::map<DeviceTransition, DeviceState> expectedState =
|
|||||||
{ DeviceTransition::End, DeviceState::Exiting }
|
{ DeviceTransition::End, DeviceState::Exiting }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// mirrors DeviceState, but adds a "Mixed" state that represents a topology where devices are currently not in the same state.
|
||||||
|
enum class AggregatedTopologyState : int
|
||||||
|
{
|
||||||
|
Undefined = static_cast<int>(fair::mq::State::Undefined),
|
||||||
|
Ok = static_cast<int>(fair::mq::State::Ok),
|
||||||
|
Error = static_cast<int>(fair::mq::State::Error),
|
||||||
|
Idle = static_cast<int>(fair::mq::State::Idle),
|
||||||
|
InitializingDevice = static_cast<int>(fair::mq::State::InitializingDevice),
|
||||||
|
Initialized = static_cast<int>(fair::mq::State::Initialized),
|
||||||
|
Binding = static_cast<int>(fair::mq::State::Binding),
|
||||||
|
Bound = static_cast<int>(fair::mq::State::Bound),
|
||||||
|
Connecting = static_cast<int>(fair::mq::State::Connecting),
|
||||||
|
DeviceReady = static_cast<int>(fair::mq::State::DeviceReady),
|
||||||
|
InitializingTask = static_cast<int>(fair::mq::State::InitializingTask),
|
||||||
|
Ready = static_cast<int>(fair::mq::State::Ready),
|
||||||
|
Running = static_cast<int>(fair::mq::State::Running),
|
||||||
|
ResettingTask = static_cast<int>(fair::mq::State::ResettingTask),
|
||||||
|
ResettingDevice = static_cast<int>(fair::mq::State::ResettingDevice),
|
||||||
|
Exiting = static_cast<int>(fair::mq::State::Exiting),
|
||||||
|
Mixed
|
||||||
|
};
|
||||||
|
|
||||||
|
inline auto operator==(DeviceState lhs, AggregatedTopologyState rhs) -> bool
|
||||||
|
{
|
||||||
|
return static_cast<int>(lhs) == static_cast<int>(rhs);
|
||||||
|
}
|
||||||
|
|
||||||
|
inline auto operator==(AggregatedTopologyState lhs, DeviceState rhs) -> bool
|
||||||
|
{
|
||||||
|
return static_cast<int>(lhs) == static_cast<int>(rhs);
|
||||||
|
}
|
||||||
|
|
||||||
|
inline std::ostream& operator<<(std::ostream& os, const AggregatedTopologyState& state)
|
||||||
|
{
|
||||||
|
if (state == AggregatedTopologyState::Mixed) {
|
||||||
|
return os << "Mixed";
|
||||||
|
} else {
|
||||||
|
return os << static_cast<DeviceState>(state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct DeviceStatus
|
struct DeviceStatus
|
||||||
{
|
{
|
||||||
bool subscribed_to_state_changes;
|
bool subscribed_to_state_changes;
|
||||||
@@ -100,22 +141,22 @@ using TopologyStateByTask = std::unordered_map<DDSTask::Id, DeviceStatus>;
|
|||||||
using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
|
using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
|
||||||
using TopologyTransition = fair::mq::Transition;
|
using TopologyTransition = fair::mq::Transition;
|
||||||
|
|
||||||
inline DeviceState AggregateState(const TopologyState& topologyState)
|
inline AggregatedTopologyState AggregateState(const TopologyState& topologyState)
|
||||||
{
|
{
|
||||||
DeviceState first = topologyState.begin()->state;
|
DeviceState first = topologyState.begin()->state;
|
||||||
|
|
||||||
if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
|
if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
|
||||||
return i.state == first;
|
return i.state == first;
|
||||||
})) {
|
})) {
|
||||||
return first;
|
return static_cast<AggregatedTopologyState>(first);
|
||||||
}
|
}
|
||||||
|
|
||||||
throw MixedStateError("State is not uniform");
|
return AggregatedTopologyState::Mixed;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
|
inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
|
||||||
{
|
{
|
||||||
return AggregateState(topologyState) == state;
|
return AggregateState(topologyState) == static_cast<AggregatedTopologyState>(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState)
|
inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState)
|
||||||
@@ -439,6 +480,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
if (fTasks.empty()) {
|
||||||
|
FAIR_LOG(warn) << "ChangeState initiated on an empty set of tasks, check the path argument.";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ChangeStateOp() = delete;
|
ChangeStateOp() = delete;
|
||||||
ChangeStateOp(const ChangeStateOp&) = delete;
|
ChangeStateOp(const ChangeStateOp&) = delete;
|
||||||
@@ -741,6 +785,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
if (fTasks.empty()) {
|
||||||
|
FAIR_LOG(warn) << "WaitForState initiated on an empty set of tasks, check the path argument.";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
WaitForStateOp() = delete;
|
WaitForStateOp() = delete;
|
||||||
WaitForStateOp(const WaitForStateOp&) = delete;
|
WaitForStateOp(const WaitForStateOp&) = delete;
|
||||||
@@ -754,9 +801,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
{
|
{
|
||||||
fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
|
fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
|
||||||
if (ContainsTask(stateData.at(s.second).taskId)) {
|
if (ContainsTask(stateData.at(s.second).taskId)) {
|
||||||
return stateData.at(s.second).state == fTargetCurrentState
|
return stateData.at(s.second).state == fTargetCurrentState &&
|
||||||
&&
|
(stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined);
|
||||||
(stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Ok);
|
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@@ -768,8 +814,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
{
|
{
|
||||||
if (!fOp.IsCompleted() && ContainsTask(taskId)) {
|
if (!fOp.IsCompleted() && ContainsTask(taskId)) {
|
||||||
if (currentState == fTargetCurrentState &&
|
if (currentState == fTargetCurrentState &&
|
||||||
(lastState == fTargetLastState ||
|
(lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined)) {
|
||||||
fTargetLastState == DeviceState::Ok)) {
|
|
||||||
++fCount;
|
++fCount;
|
||||||
}
|
}
|
||||||
TryCompletion();
|
TryCompletion();
|
||||||
@@ -873,7 +918,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
template<typename CompletionToken>
|
template<typename CompletionToken>
|
||||||
auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token)
|
auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token)
|
||||||
{
|
{
|
||||||
return AsyncWaitForState(DeviceState::Ok, targetCurrentState, "", Duration(0), std::move(token));
|
return AsyncWaitForState(DeviceState::Undefined, targetCurrentState, "", Duration(0), std::move(token));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @brief Wait for selected FairMQ devices to reach given last & current state in this topology
|
/// @brief Wait for selected FairMQ devices to reach given last & current state in this topology
|
||||||
@@ -903,7 +948,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
|
auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
|
||||||
-> std::error_code
|
-> std::error_code
|
||||||
{
|
{
|
||||||
return WaitForState(DeviceState::Ok, targetCurrentState, path, timeout);
|
return WaitForState(DeviceState::Undefined, targetCurrentState, path, timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult);
|
using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult);
|
||||||
@@ -938,6 +983,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
if (expectedCount == 0) {
|
||||||
|
FAIR_LOG(warn) << "GetProperties initiated on an empty set of tasks, check the path argument.";
|
||||||
|
}
|
||||||
// FAIR_LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
|
// FAIR_LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
|
||||||
}
|
}
|
||||||
GetPropertiesOp() = delete;
|
GetPropertiesOp() = delete;
|
||||||
@@ -1093,6 +1141,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
if (expectedCount == 0) {
|
||||||
|
FAIR_LOG(warn) << "SetProperties initiated on an empty set of tasks, check the path argument.";
|
||||||
|
}
|
||||||
// FAIR_LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
|
// FAIR_LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
|
||||||
}
|
}
|
||||||
SetPropertiesOp() = delete;
|
SetPropertiesOp() = delete;
|
||||||
@@ -1242,7 +1293,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
|||||||
int index = 0;
|
int index = 0;
|
||||||
|
|
||||||
for (const auto& task : fDDSTopo.GetTasks()) {
|
for (const auto& task : fDDSTopo.GetTasks()) {
|
||||||
fStateData.push_back(DeviceStatus{false, DeviceState::Ok, DeviceState::Ok, task.GetId(), task.GetCollectionId()});
|
fStateData.push_back(DeviceStatus{false, DeviceState::Undefined, DeviceState::Undefined, task.GetId(), task.GetCollectionId()});
|
||||||
fStateIndex.emplace(task.GetId(), index);
|
fStateIndex.emplace(task.GetId(), index);
|
||||||
index++;
|
index++;
|
||||||
}
|
}
|
||||||
|
@@ -70,9 +70,10 @@ array<string, 17> typeNames =
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
array<fair::mq::State, 15> fbStateToMQState =
|
array<fair::mq::State, 16> fbStateToMQState =
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
|
fair::mq::State::Undefined,
|
||||||
fair::mq::State::Ok,
|
fair::mq::State::Ok,
|
||||||
fair::mq::State::Error,
|
fair::mq::State::Error,
|
||||||
fair::mq::State::Idle,
|
fair::mq::State::Idle,
|
||||||
@@ -91,9 +92,10 @@ array<fair::mq::State, 15> fbStateToMQState =
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
array<sdk::cmd::FBState, 15> mqStateToFBState =
|
array<sdk::cmd::FBState, 16> mqStateToFBState =
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
|
sdk::cmd::FBState_Undefined,
|
||||||
sdk::cmd::FBState_Ok,
|
sdk::cmd::FBState_Ok,
|
||||||
sdk::cmd::FBState_Error,
|
sdk::cmd::FBState_Error,
|
||||||
sdk::cmd::FBState_Idle,
|
sdk::cmd::FBState_Idle,
|
||||||
|
@@ -6,6 +6,7 @@ enum FBResult:byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
enum FBState:byte {
|
enum FBState:byte {
|
||||||
|
Undefined,
|
||||||
Ok,
|
Ok,
|
||||||
Error,
|
Error,
|
||||||
Idle,
|
Idle,
|
||||||
|
@@ -21,6 +21,7 @@
|
|||||||
|
|
||||||
#include <FairMQLogger.h>
|
#include <FairMQLogger.h>
|
||||||
#include <FairMQMessage.h>
|
#include <FairMQMessage.h>
|
||||||
|
#include <fairmq/ProgOptions.h>
|
||||||
#include <fairmq/tools/CppSTL.h>
|
#include <fairmq/tools/CppSTL.h>
|
||||||
#include <fairmq/tools/Strings.h>
|
#include <fairmq/tools/Strings.h>
|
||||||
|
|
||||||
@@ -43,6 +44,8 @@
|
|||||||
#include <utility> // pair
|
#include <utility> // pair
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#include <sys/mman.h> // mlock
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
{
|
{
|
||||||
namespace mq
|
namespace mq
|
||||||
@@ -55,8 +58,8 @@ struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtim
|
|||||||
class Manager
|
class Manager
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Manager(std::string id, std::string deviceId, size_t size, bool throwOnBadAlloc)
|
Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
|
||||||
: fShmId(std::move(id))
|
: fShmId(std::move(shmId))
|
||||||
, fDeviceId(std::move(deviceId))
|
, fDeviceId(std::move(deviceId))
|
||||||
, fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
|
, fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
|
||||||
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360)
|
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360)
|
||||||
@@ -70,10 +73,37 @@ class Manager
|
|||||||
, fMsgCounter(0)
|
, fMsgCounter(0)
|
||||||
, fHeartbeatThread()
|
, fHeartbeatThread()
|
||||||
, fSendHeartbeats(true)
|
, fSendHeartbeats(true)
|
||||||
, fThrowOnBadAlloc(throwOnBadAlloc)
|
, fThrowOnBadAlloc(true)
|
||||||
{
|
{
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
|
|
||||||
|
bool mlockSegment = false;
|
||||||
|
bool zeroSegment = false;
|
||||||
|
bool autolaunchMonitor = false;
|
||||||
|
if (config) {
|
||||||
|
mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment);
|
||||||
|
zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment);
|
||||||
|
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
|
||||||
|
fThrowOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", fThrowOnBadAlloc);
|
||||||
|
} else {
|
||||||
|
LOG(debug) << "ProgOptions not available! Using defaults.";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (autolaunchMonitor) {
|
||||||
|
StartMonitor(fShmId);
|
||||||
|
}
|
||||||
|
|
||||||
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
|
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
|
||||||
|
if (mlockSegment) {
|
||||||
|
LOG(debug) << "Locking the managed segment memory pages...";
|
||||||
|
mlock(fSegment.get_address(), fSegment.get_size());
|
||||||
|
LOG(debug) << "Successfully locked the managed segment memory pages.";
|
||||||
|
}
|
||||||
|
if (zeroSegment) {
|
||||||
|
LOG(debug) << "Zeroing the managed segment free memory...";
|
||||||
|
fSegment.zero_free_memory();
|
||||||
|
LOG(debug) << "Successfully zeroed the managed segment free memory.";
|
||||||
|
}
|
||||||
|
|
||||||
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc);
|
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc);
|
||||||
// store info about the managed segment as region with id 0
|
// store info about the managed segment as region with id 0
|
||||||
|
@@ -375,6 +375,15 @@ class Socket final : public fair::mq::Socket
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Events(uint32_t* events) override
|
||||||
|
{
|
||||||
|
size_t eventsSize = sizeof(uint32_t);
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
|
||||||
|
throw SocketError(
|
||||||
|
tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int GetLinger() const override
|
int GetLinger() const override
|
||||||
{
|
{
|
||||||
int value = 0;
|
int value = 0;
|
||||||
@@ -485,6 +494,12 @@ class Socket final : public fair::mq::Socket
|
|||||||
if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE;
|
if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE;
|
||||||
|
|
||||||
if (constant == "fd") return ZMQ_FD;
|
if (constant == "fd") return ZMQ_FD;
|
||||||
|
if (constant == "events")
|
||||||
|
return ZMQ_EVENTS;
|
||||||
|
if (constant == "pollin")
|
||||||
|
return ZMQ_POLLIN;
|
||||||
|
if (constant == "pollout")
|
||||||
|
return ZMQ_POLLOUT;
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@@ -58,14 +58,10 @@ class TransportFactory final : public fair::mq::TransportFactory
|
|||||||
int numIoThreads = 1;
|
int numIoThreads = 1;
|
||||||
std::string sessionName = "default";
|
std::string sessionName = "default";
|
||||||
size_t segmentSize = 2ULL << 30;
|
size_t segmentSize = 2ULL << 30;
|
||||||
bool autolaunchMonitor = false;
|
|
||||||
bool throwOnBadAlloc = true;
|
|
||||||
if (config) {
|
if (config) {
|
||||||
numIoThreads = config->GetProperty<int>("io-threads", numIoThreads);
|
numIoThreads = config->GetProperty<int>("io-threads", numIoThreads);
|
||||||
sessionName = config->GetProperty<std::string>("session", sessionName);
|
sessionName = config->GetProperty<std::string>("session", sessionName);
|
||||||
segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize);
|
segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize);
|
||||||
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
|
|
||||||
throwOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", throwOnBadAlloc);
|
|
||||||
} else {
|
} else {
|
||||||
LOG(debug) << "ProgOptions not available! Using defaults.";
|
LOG(debug) << "ProgOptions not available! Using defaults.";
|
||||||
}
|
}
|
||||||
@@ -83,11 +79,7 @@ class TransportFactory final : public fair::mq::TransportFactory
|
|||||||
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (autolaunchMonitor) {
|
fManager = tools::make_unique<Manager>(fShmId, fDeviceId, segmentSize, config);
|
||||||
Manager::StartMonitor(fShmId);
|
|
||||||
}
|
|
||||||
|
|
||||||
fManager = tools::make_unique<Manager>(fShmId, fDeviceId, segmentSize, throwOnBadAlloc);
|
|
||||||
} catch (boost::interprocess::interprocess_exception& e) {
|
} catch (boost::interprocess::interprocess_exception& e) {
|
||||||
LOG(error) << "Could not initialize shared memory transport: " << e.what();
|
LOG(error) << "Could not initialize shared memory transport: " << e.what();
|
||||||
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
|
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
|
||||||
|
@@ -318,6 +318,15 @@ class Socket final : public fair::mq::Socket
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Events(uint32_t* events) override
|
||||||
|
{
|
||||||
|
size_t eventsSize = sizeof(uint32_t);
|
||||||
|
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
|
||||||
|
throw SocketError(
|
||||||
|
tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void SetLinger(const int value) override
|
void SetLinger(const int value) override
|
||||||
{
|
{
|
||||||
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
|
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
|
||||||
@@ -433,6 +442,12 @@ class Socket final : public fair::mq::Socket
|
|||||||
if (constant == "linger") return ZMQ_LINGER;
|
if (constant == "linger") return ZMQ_LINGER;
|
||||||
|
|
||||||
if (constant == "fd") return ZMQ_FD;
|
if (constant == "fd") return ZMQ_FD;
|
||||||
|
if (constant == "events")
|
||||||
|
return ZMQ_EVENTS;
|
||||||
|
if (constant == "pollin")
|
||||||
|
return ZMQ_POLLIN;
|
||||||
|
if (constant == "pollout")
|
||||||
|
return ZMQ_POLLOUT;
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@@ -139,6 +139,32 @@ TEST_F(Topology, ChangeState)
|
|||||||
EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true);
|
EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(Topology, MixedState)
|
||||||
|
{
|
||||||
|
using namespace fair::mq;
|
||||||
|
|
||||||
|
sdk::Topology topo(mDDSTopo, mDDSSession);
|
||||||
|
auto result1(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/Sampler.*"));
|
||||||
|
LOG(info) << result1.first;
|
||||||
|
|
||||||
|
EXPECT_EQ(result1.first, std::error_code());
|
||||||
|
EXPECT_EQ(sdk::AggregateState(result1.second), sdk::AggregatedTopologyState::Mixed);
|
||||||
|
EXPECT_EQ(sdk::StateEqualsTo(result1.second, sdk::DeviceState::InitializingDevice), false);
|
||||||
|
auto const currentState1 = topo.GetCurrentState();
|
||||||
|
EXPECT_EQ(sdk::AggregateState(currentState1), sdk::AggregatedTopologyState::Mixed);
|
||||||
|
EXPECT_EQ(sdk::StateEqualsTo(currentState1, sdk::DeviceState::InitializingDevice), false);
|
||||||
|
|
||||||
|
auto result2(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/SinkGroup/.*"));
|
||||||
|
LOG(info) << result2.first;
|
||||||
|
|
||||||
|
EXPECT_EQ(result2.first, std::error_code());
|
||||||
|
EXPECT_EQ(sdk::AggregateState(result2.second), sdk::AggregatedTopologyState::InitializingDevice);
|
||||||
|
EXPECT_EQ(sdk::StateEqualsTo(result2.second, sdk::DeviceState::InitializingDevice), true);
|
||||||
|
auto const currentState2 = topo.GetCurrentState();
|
||||||
|
EXPECT_EQ(sdk::AggregateState(currentState2), sdk::AggregatedTopologyState::InitializingDevice);
|
||||||
|
EXPECT_EQ(sdk::StateEqualsTo(currentState2, sdk::DeviceState::InitializingDevice), true);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(Topology, AsyncChangeStateConcurrent)
|
TEST_F(Topology, AsyncChangeStateConcurrent)
|
||||||
{
|
{
|
||||||
using namespace fair::mq;
|
using namespace fair::mq;
|
||||||
@@ -191,9 +217,9 @@ TEST_F(Topology, AsyncChangeStateCollectionView)
|
|||||||
ASSERT_EQ(cstate.size(), 5);
|
ASSERT_EQ(cstate.size(), 5);
|
||||||
for (const auto& c : cstate) {
|
for (const auto& c : cstate) {
|
||||||
LOG(debug) << "\t" << c.first;
|
LOG(debug) << "\t" << c.first;
|
||||||
State s;
|
sdk::AggregatedTopologyState s;
|
||||||
ASSERT_NO_THROW(s = sdk::AggregateState(c.second));
|
ASSERT_NO_THROW(s = sdk::AggregateState(c.second));
|
||||||
ASSERT_EQ(s, State::InitializingDevice);
|
ASSERT_EQ(s, static_cast<sdk::AggregatedTopologyState>(State::InitializingDevice));
|
||||||
LOG(debug) << "\tAggregated state: " << s;
|
LOG(debug) << "\tAggregated state: " << s;
|
||||||
for (const auto& ds : c.second) {
|
for (const auto& ds : c.second) {
|
||||||
LOG(debug) << "\t\t" << ds.state;
|
LOG(debug) << "\t\t" << ds.state;
|
||||||
@@ -426,4 +452,26 @@ TEST_F(Topology, SetAndGetProperties)
|
|||||||
ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code());
|
ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(Topology2, AggregatedTopologyStateComparison)
|
||||||
|
{
|
||||||
|
using namespace fair::mq::sdk;
|
||||||
|
ASSERT_TRUE(DeviceState::Undefined == AggregatedTopologyState::Undefined);
|
||||||
|
ASSERT_TRUE(AggregatedTopologyState::Undefined == DeviceState::Undefined);
|
||||||
|
ASSERT_TRUE(DeviceState::Ok == AggregatedTopologyState::Ok);
|
||||||
|
ASSERT_TRUE(DeviceState::Error == AggregatedTopologyState::Error);
|
||||||
|
ASSERT_TRUE(DeviceState::Idle == AggregatedTopologyState::Idle);
|
||||||
|
ASSERT_TRUE(DeviceState::InitializingDevice == AggregatedTopologyState::InitializingDevice);
|
||||||
|
ASSERT_TRUE(DeviceState::Initialized == AggregatedTopologyState::Initialized);
|
||||||
|
ASSERT_TRUE(DeviceState::Binding == AggregatedTopologyState::Binding);
|
||||||
|
ASSERT_TRUE(DeviceState::Bound == AggregatedTopologyState::Bound);
|
||||||
|
ASSERT_TRUE(DeviceState::Connecting == AggregatedTopologyState::Connecting);
|
||||||
|
ASSERT_TRUE(DeviceState::DeviceReady == AggregatedTopologyState::DeviceReady);
|
||||||
|
ASSERT_TRUE(DeviceState::InitializingTask == AggregatedTopologyState::InitializingTask);
|
||||||
|
ASSERT_TRUE(DeviceState::Ready == AggregatedTopologyState::Ready);
|
||||||
|
ASSERT_TRUE(DeviceState::Running == AggregatedTopologyState::Running);
|
||||||
|
ASSERT_TRUE(DeviceState::ResettingTask == AggregatedTopologyState::ResettingTask);
|
||||||
|
ASSERT_TRUE(DeviceState::ResettingDevice == AggregatedTopologyState::ResettingDevice);
|
||||||
|
ASSERT_TRUE(DeviceState::Exiting == AggregatedTopologyState::Exiting);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
@@ -66,6 +66,24 @@ void RunOptionsTest(const string& transport)
|
|||||||
ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), 8000);
|
ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), 8000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ZeroingAndMlock(const string& transport)
|
||||||
|
{
|
||||||
|
size_t session{fair::mq::tools::UuidHash()};
|
||||||
|
|
||||||
|
fair::mq::ProgOptions config;
|
||||||
|
config.SetProperty<string>("session", to_string(session));
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 16384);
|
||||||
|
config.SetProperty<bool>("shm-zero-segment", true);
|
||||||
|
config.SetProperty<bool>("shm-mlock-segment", true);
|
||||||
|
|
||||||
|
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
|
||||||
|
|
||||||
|
FairMQMessagePtr outMsg(factory->CreateMessage(10000));
|
||||||
|
char test[10000];
|
||||||
|
memset(test, 0, sizeof(test));
|
||||||
|
ASSERT_EQ(memcmp(test, outMsg->GetData(), outMsg->GetSize()), 0);
|
||||||
|
}
|
||||||
|
|
||||||
TEST(Options, zeromq)
|
TEST(Options, zeromq)
|
||||||
{
|
{
|
||||||
RunOptionsTest("zeromq");
|
RunOptionsTest("zeromq");
|
||||||
@@ -76,4 +94,9 @@ TEST(Options, shmem)
|
|||||||
RunOptionsTest("shmem");
|
RunOptionsTest("shmem");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(ZeroingAndMlock, shmem)
|
||||||
|
{
|
||||||
|
ZeroingAndMlock("shmem");
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
Reference in New Issue
Block a user