Compare commits

...

21 Commits

Author SHA1 Message Date
Alexey Rybalchenko
690e8a0370 Retry on EINTR in blocking zmq calls 2020-08-28 18:22:03 +02:00
Alexey Rybalchenko
1f0c94f898 Fix tag mismatch in topology files 2020-08-17 12:39:10 +02:00
Alexey Rybalchenko
62ed4e5f80 Avoid unconditional call to now() when allocating message 2020-08-13 14:16:12 +02:00
neskovic@gmail.com
f1d6b18668 Message counter: use relaxed/acquire memory ordering 2020-08-13 11:49:55 +02:00
Alexey Rybalchenko
c0153a6b55 shmMonitor: Adjust output slightly 2020-08-07 14:27:14 +02:00
Alexey Rybalchenko
86a1dd38a2 ShmMonitor: Use FairLogger for timestamp calculation 2020-08-07 14:27:14 +02:00
Alexey Rybalchenko
be8ab06cc1 Bump FairLogger requirement to 1.6 2020-08-07 14:27:14 +02:00
Alexey Rybalchenko
b0f73017e2 shmmonitor: add output with -v (non-interactive) 2020-08-06 12:24:01 +02:00
Alexey Rybalchenko
b83655d5da MessageBadAlloc: report amount of available memory 2020-08-06 12:24:01 +02:00
Alexey Rybalchenko
9c27051cdc BenchmarkSampler: add alignment parameter 2020-08-06 12:24:01 +02:00
Alexey Rybalchenko
65f9519917 Add error handling for mlock 2020-08-06 12:24:01 +02:00
Alexey Rybalchenko
b5545c1575 Add helpers for AggregatedTopologyState 2020-07-17 12:41:53 +02:00
Alexey Rybalchenko
3eca8e9def Add test for shm transport options 2020-07-15 13:59:53 +02:00
Alexey Rybalchenko
beb7766fca Shm: add options to zero and/or mlock the segment 2020-07-15 13:59:53 +02:00
Giulio Eulisse
bf909f94dc ofi: adapt to the new API for FairMQSocket::Events 2020-07-15 13:58:47 +02:00
Dennis Klein
1140c4c6ab SDK: Provide comparison operator for device and topo states 2020-07-15 13:01:23 +02:00
Alexey Rybalchenko
a6da208e79 Add Undefined and Mixed state for use in SDK 2020-07-15 13:01:23 +02:00
Giulio Eulisse
ba3a82b1df Update FairMQSocket.h 2020-07-15 12:09:54 +02:00
Giulio Eulisse
e8cc104344 Add API to extract ZMQ_EVENTS from socket backend 2020-07-15 12:09:54 +02:00
Alexey Rybalchenko
d5d5c27958 QC examples: fix incorrect topology path 2020-07-09 23:34:28 +02:00
Alexey Rybalchenko
5a7dcd9fc1 SDK: warn if given path translates to no selected tasks 2020-07-09 23:34:28 +02:00
30 changed files with 415 additions and 121 deletions

View File

@@ -94,7 +94,7 @@ endif()
if(BUILD_FAIRMQ OR BUILD_SDK) if(BUILD_FAIRMQ OR BUILD_SDK)
find_package2(PUBLIC FairLogger REQUIRED find_package2(PUBLIC FairLogger REQUIRED
VERSION 1.2.0 VERSION 1.6.0
) )
foreach(dep IN LISTS FairLogger_PACKAGE_DEPENDENCIES) foreach(dep IN LISTS FairLogger_PACKAGE_DEPENDENCIES)

View File

@@ -10,7 +10,7 @@
<exe reachable="true">fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind</exe> <exe reachable="true">fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env> <env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties> <properties>
<name access="write">fmqchan_sync</id> <name access="write">fmqchan_sync</name>
</properties> </properties>
</decltask> </decltask>
@@ -18,8 +18,8 @@
<exe reachable="true">fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=pair,method=connect,numSockets=${numReceivers} --dds-i data:%taskIndex%</exe> <exe reachable="true">fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=pair,method=connect,numSockets=${numReceivers} --dds-i data:%taskIndex%</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env> <env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties> <properties>
<name access="read">fmqchan_sync</id> <name access="read">fmqchan_sync</name>
<name access="read">fmqchan_data</id> <name access="read">fmqchan_data</name>
</properties> </properties>
</decltask> </decltask>
@@ -27,7 +27,7 @@
<exe reachable="true">fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pair,method=bind,numSockets=${numSenders}</exe> <exe reachable="true">fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pair,method=bind,numSockets=${numSenders}</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env> <env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties> <properties>
<name access="write">fmqchan_data</id> <name access="write">fmqchan_data</name>
</properties> </properties>
</decltask> </decltask>

View File

@@ -10,7 +10,7 @@
<exe reachable="true">fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind</exe> <exe reachable="true">fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env> <env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties> <properties>
<name access="write">fmqchan_sync</id> <name access="write">fmqchan_sync</name>
</properties> </properties>
</decltask> </decltask>
@@ -18,8 +18,8 @@
<exe reachable="true">fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=push,method=connect,numSockets=${numReceivers}</exe> <exe reachable="true">fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=push,method=connect,numSockets=${numReceivers}</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env> <env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties> <properties>
<name access="read">fmqchan_sync</id> <name access="read">fmqchan_sync</name>
<name access="read">fmqchan_data</id> <name access="read">fmqchan_data</name>
</properties> </properties>
</decltask> </decltask>
@@ -27,7 +27,7 @@
<exe reachable="true">fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pull,method=bind</exe> <exe reachable="true">fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pull,method=bind</exe>
<env reachable="false">fairmq-ex-n-m-env.sh</env> <env reachable="false">fairmq-ex-n-m-env.sh</env>
<properties> <properties>
<name access="write">fmqchan_data</id> <name access="write">fmqchan_data</name>
</properties> </properties>
</decltask> </decltask>

View File

@@ -51,7 +51,7 @@ fairmq-dds-command-ui -c k
fairmq-dds-command-ui -c b fairmq-dds-command-ui -c b
fairmq-dds-command-ui -c x fairmq-dds-command-ui -c x
fairmq-dds-command-ui -c j fairmq-dds-command-ui -c j
allexceptqctasks="main/(Sampler|QCDispatcher|Sink)" allexceptqctasks="main/(Sampler|QCDispatcher|Sink).*"
fairmq-dds-command-ui -c r -p $allexceptqctasks fairmq-dds-command-ui -c r -p $allexceptqctasks
qctask="main/QCTask.*" qctask="main/QCTask.*"
qcdispatcher="main/QCDispatcher.*" qcdispatcher="main/QCDispatcher.*"

View File

@@ -380,6 +380,7 @@ if(BUILD_FAIRMQ)
Boost::boost Boost::boost
Boost::date_time Boost::date_time
Boost::program_options Boost::program_options
FairLogger::FairLogger
PicoSHA2 PicoSHA2
) )
target_include_directories(fairmq-shmmonitor PUBLIC target_include_directories(fairmq-shmmonitor PUBLIC

View File

@@ -38,6 +38,10 @@ class FairMQSocket
virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0; virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0;
virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0; virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0;
/// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value
/// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in
/// the future.
virtual void Events(uint32_t* events) = 0;
virtual void SetLinger(const int value) = 0; virtual void SetLinger(const int value) = 0;
virtual int GetLinger() const = 0; virtual int GetLinger() const = 0;
virtual void SetSndBufSize(const int value) = 0; virtual void SetSndBufSize(const int value) = 0;

View File

@@ -18,9 +18,10 @@ namespace fair
namespace mq namespace mq
{ {
array<string, 15> stateNames = array<string, 16> stateNames =
{ {
{ {
"UNDEFINED",
"OK", "OK",
"ERROR", "ERROR",
"IDLE", "IDLE",
@@ -41,6 +42,7 @@ array<string, 15> stateNames =
unordered_map<string, State> states = unordered_map<string, State> states =
{ {
{ "UNDEFINED", State::Undefined },
{ "OK", State::Ok }, { "OK", State::Ok },
{ "ERROR", State::Error }, { "ERROR", State::Error },
{ "IDLE", State::Idle }, { "IDLE", State::Idle },

View File

@@ -20,6 +20,7 @@ namespace mq
enum class State : int enum class State : int
{ {
Undefined = 0,
Ok, Ok,
Error, Error,
Idle, Idle,
@@ -39,7 +40,7 @@ enum class State : int
enum class Transition : int enum class Transition : int
{ {
Auto, Auto = 0,
InitDevice, InitDevice,
CompleteInit, CompleteInit,
Bind, Bind,

View File

@@ -32,6 +32,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
, fMemSet(false) , fMemSet(false)
, fNumParts(1) , fNumParts(1)
, fMsgSize(10000) , fMsgSize(10000)
, fMsgAlignment(0)
, fMsgRate(0) , fMsgRate(0)
, fNumIterations(0) , fNumIterations(0)
, fMaxIterations(0) , fMaxIterations(0)
@@ -44,6 +45,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
fMemSet = fConfig->GetProperty<bool>("memset"); fMemSet = fConfig->GetProperty<bool>("memset");
fNumParts = fConfig->GetProperty<size_t>("num-parts"); fNumParts = fConfig->GetProperty<size_t>("num-parts");
fMsgSize = fConfig->GetProperty<size_t>("msg-size"); fMsgSize = fConfig->GetProperty<size_t>("msg-size");
fMsgAlignment = fConfig->GetProperty<size_t>("msg-alignment");
fMsgRate = fConfig->GetProperty<float>("msg-rate"); fMsgRate = fConfig->GetProperty<float>("msg-rate");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fOutChannelName = fConfig->GetProperty<std::string>("out-channel"); fOutChannelName = fConfig->GetProperty<std::string>("out-channel");
@@ -64,7 +66,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
FairMQParts parts; FairMQParts parts;
for (size_t i = 0; i < fNumParts; ++i) { for (size_t i = 0; i < fNumParts; ++i) {
parts.AddPart(dataOutChannel.NewMessage(fMsgSize)); parts.AddPart(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment}));
if (fMemSet) { if (fMemSet) {
std::memset(parts.At(i)->GetData(), 0, parts.At(i)->GetSize()); std::memset(parts.At(i)->GetData(), 0, parts.At(i)->GetSize());
} }
@@ -79,7 +81,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
++fNumIterations; ++fNumIterations;
} }
} else { } else {
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize)); FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment}));
if (fMemSet) { if (fMemSet) {
std::memset(msg->GetData(), 0, msg->GetSize()); std::memset(msg->GetData(), 0, msg->GetSize());
} }
@@ -111,6 +113,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
bool fMemSet; bool fMemSet;
size_t fNumParts; size_t fNumParts;
size_t fMsgSize; size_t fMsgSize;
size_t fMsgAlignment;
std::atomic<int> fMsgCounter; std::atomic<int> fMsgCounter;
float fMsgRate; float fMsgRate;
uint64_t fNumIterations; uint64_t fNumIterations;

View File

@@ -45,6 +45,7 @@ class Socket final : public fair::mq::Socket
auto GetId() const -> std::string override { return fId; } auto GetId() const -> std::string override { return fId; }
auto Events(uint32_t *events) -> void override { *events = 0; }
auto Bind(const std::string& address) -> bool override; auto Bind(const std::string& address) -> bool override;
auto Connect(const std::string& address) -> bool override; auto Connect(const std::string& address) -> bool override;

View File

@@ -65,7 +65,7 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
cout << d.taskId << " : " << d.state << endl; cout << d.taskId << " : " << d.state << endl;
} }
} else if (command == "o") { } else if (command == "o") {
cout << "> dumping config of the devices (" << path << ")" << endl; cout << "> dumping config of " << (path == "" ? "all" : path) << endl;
// TODO: extend this regex to return all properties, once command size limitation is removed. // TODO: extend this regex to return all properties, once command size limitation is removed.
auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout)); auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout));
for (const auto& d : result.second.devices) { for (const auto& d : result.second.devices) {
@@ -79,39 +79,39 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
return; return;
} }
const DeviceProperties props{{pKey, pVal}}; const DeviceProperties props{{pKey, pVal}};
cout << "> sending property (" << path << ")" << endl; cout << "> setting properties --> " << (path == "" ? "all" : path) << endl;
topo.SetProperties(props, path); topo.SetProperties(props, path);
// give dds time to complete request // give dds time to complete request
this_thread::sleep_for(chrono::milliseconds(100)); this_thread::sleep_for(chrono::milliseconds(100));
} else if (command == "i") { } else if (command == "i") {
cout << "> init devices (" << path << ")" << endl; cout << "> initiating InitDevice transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::InitDevice, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::InitDevice, path, std::chrono::milliseconds(timeout));
} else if (command == "k") { } else if (command == "k") {
cout << "> complete init (" << path << ")" << endl; cout << "> initiating CompleteInit transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::CompleteInit, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::CompleteInit, path, std::chrono::milliseconds(timeout));
} else if (command == "b") { } else if (command == "b") {
cout << "> bind devices (" << path << ")" << endl; cout << "> initiating Bind transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::Bind, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::Bind, path, std::chrono::milliseconds(timeout));
} else if (command == "x") { } else if (command == "x") {
cout << "> connect devices (" << path << ")" << endl; cout << "> initiating Connect transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::Connect, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::Connect, path, std::chrono::milliseconds(timeout));
} else if (command == "j") { } else if (command == "j") {
cout << "> init tasks (" << path << ")" << endl; cout << "> initiating InitTask transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::InitTask, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::InitTask, path, std::chrono::milliseconds(timeout));
} else if (command == "r") { } else if (command == "r") {
cout << "> run tasks (" << path << ")" << endl; cout << "> initiating Run transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::Run, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::Run, path, std::chrono::milliseconds(timeout));
} else if (command == "s") { } else if (command == "s") {
cout << "> stop devices (" << path << ")" << endl; cout << "> initiating Stop transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::Stop, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::Stop, path, std::chrono::milliseconds(timeout));
} else if (command == "t") { } else if (command == "t") {
cout << "> reset tasks (" << path << ")" << endl; cout << "> initiating ResetTask transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::ResetTask, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::ResetTask, path, std::chrono::milliseconds(timeout));
} else if (command == "d") { } else if (command == "d") {
cout << "> reset devices (" << path << ")" << endl; cout << "> initiating ResetDevice transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::ResetDevice, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::ResetDevice, path, std::chrono::milliseconds(timeout));
} else if (command == "q") { } else if (command == "q") {
cout << "> end (" << path << ")" << endl; cout << "> initiating End transition --> " << (path == "" ? "all" : path) << endl;
topo.ChangeState(TopologyTransition::End, path, std::chrono::milliseconds(timeout)); topo.ChangeState(TopologyTransition::End, path, std::chrono::milliseconds(timeout));
} else if (command == "h") { } else if (command == "h") {
cout << "> help" << endl; cout << "> help" << endl;
@@ -206,6 +206,7 @@ try {
sendCommand(command, path, timeout, topo, pKey, pVal); sendCommand(command, path, timeout, topo, pKey, pVal);
} }
size_t pos = targetState.find("->"); size_t pos = targetState.find("->");
cout << "> waiting for " << (path == "" ? "all" : path) << " to reach " << targetState << endl;
if (pos == string::npos) { if (pos == string::npos) {
/* auto ec = */topo.WaitForState(GetState(targetState), path, std::chrono::milliseconds(timeout)); /* auto ec = */topo.WaitForState(GetState(targetState), path, std::chrono::milliseconds(timeout));
// cout << "WaitForState(" << targetState << ") result: " << ec.message() << endl; // cout << "WaitForState(" << targetState << ") result: " << ec.message() << endl;

View File

@@ -69,6 +69,8 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).") ("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)") ("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
("shm-mlock-segment", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.")
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") ("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.") ("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") ("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")

View File

@@ -19,6 +19,7 @@ void addCustomOptions(bpo::options_description& options)
("memset", bpo::value<bool>()->default_value(false), "Memset allocated buffers to 0") ("memset", bpo::value<bool>()->default_value(false), "Memset allocated buffers to 0")
("num-parts", bpo::value<size_t>()->default_value(1), "Number of parts to send. 1 will send single messages, not parts") ("num-parts", bpo::value<size_t>()->default_value(1), "Number of parts to send. 1 will send single messages, not parts")
("msg-size", bpo::value<size_t>()->default_value(1000000), "Message size in bytes") ("msg-size", bpo::value<size_t>()->default_value(1000000), "Message size in bytes")
("msg-alignment", bpo::value<size_t>()->default_value(0), "Message alignment")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)") ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
("msg-rate", bpo::value<float>()->default_value(0), "Msg rate limit in maximum number of messages per second"); ("msg-rate", bpo::value<float>()->default_value(0), "Msg rate limit in maximum number of messages per second");
} }

View File

@@ -25,11 +25,6 @@ struct RuntimeError : ::std::runtime_error
{} {}
}; };
struct MixedStateError : RuntimeError
{
using RuntimeError::RuntimeError;
};
} /* namespace sdk */ } /* namespace sdk */
enum class ErrorCode enum class ErrorCode

View File

@@ -70,6 +70,65 @@ const std::map<DeviceTransition, DeviceState> expectedState =
{ DeviceTransition::End, DeviceState::Exiting } { DeviceTransition::End, DeviceState::Exiting }
}; };
// mirrors DeviceState, but adds a "Mixed" state that represents a topology where devices are currently not in the same state.
enum class AggregatedTopologyState : int
{
Undefined = static_cast<int>(fair::mq::State::Undefined),
Ok = static_cast<int>(fair::mq::State::Ok),
Error = static_cast<int>(fair::mq::State::Error),
Idle = static_cast<int>(fair::mq::State::Idle),
InitializingDevice = static_cast<int>(fair::mq::State::InitializingDevice),
Initialized = static_cast<int>(fair::mq::State::Initialized),
Binding = static_cast<int>(fair::mq::State::Binding),
Bound = static_cast<int>(fair::mq::State::Bound),
Connecting = static_cast<int>(fair::mq::State::Connecting),
DeviceReady = static_cast<int>(fair::mq::State::DeviceReady),
InitializingTask = static_cast<int>(fair::mq::State::InitializingTask),
Ready = static_cast<int>(fair::mq::State::Ready),
Running = static_cast<int>(fair::mq::State::Running),
ResettingTask = static_cast<int>(fair::mq::State::ResettingTask),
ResettingDevice = static_cast<int>(fair::mq::State::ResettingDevice),
Exiting = static_cast<int>(fair::mq::State::Exiting),
Mixed
};
inline auto operator==(DeviceState lhs, AggregatedTopologyState rhs) -> bool
{
return static_cast<int>(lhs) == static_cast<int>(rhs);
}
inline auto operator==(AggregatedTopologyState lhs, DeviceState rhs) -> bool
{
return static_cast<int>(lhs) == static_cast<int>(rhs);
}
inline std::ostream& operator<<(std::ostream& os, const AggregatedTopologyState& state)
{
if (state == AggregatedTopologyState::Mixed) {
return os << "MIXED";
} else {
return os << static_cast<DeviceState>(state);
}
}
inline std::string GetAggregatedTopologyStateName(AggregatedTopologyState s)
{
if (s == AggregatedTopologyState::Mixed) {
return "MIXED";
} else {
return GetStateName(static_cast<State>(s));
}
}
inline AggregatedTopologyState GetAggregatedTopologyState(const std::string& state)
{
if (state == "MIXED") {
return AggregatedTopologyState::Mixed;
} else {
return static_cast<AggregatedTopologyState>(GetState(state));
}
}
struct DeviceStatus struct DeviceStatus
{ {
bool subscribed_to_state_changes; bool subscribed_to_state_changes;
@@ -100,22 +159,22 @@ using TopologyStateByTask = std::unordered_map<DDSTask::Id, DeviceStatus>;
using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>; using TopologyStateByCollection = std::unordered_map<DDSCollection::Id, std::vector<DeviceStatus>>;
using TopologyTransition = fair::mq::Transition; using TopologyTransition = fair::mq::Transition;
inline DeviceState AggregateState(const TopologyState& topologyState) inline AggregatedTopologyState AggregateState(const TopologyState& topologyState)
{ {
DeviceState first = topologyState.begin()->state; DeviceState first = topologyState.begin()->state;
if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) { if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
return i.state == first; return i.state == first;
})) { })) {
return first; return static_cast<AggregatedTopologyState>(first);
} }
throw MixedStateError("State is not uniform"); return AggregatedTopologyState::Mixed;
} }
inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state) inline bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
{ {
return AggregateState(topologyState) == state; return AggregateState(topologyState) == static_cast<AggregatedTopologyState>(state);
} }
inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState) inline TopologyStateByCollection GroupByCollectionId(const TopologyState& topologyState)
@@ -439,6 +498,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} }
}); });
} }
if (fTasks.empty()) {
FAIR_LOG(warn) << "ChangeState initiated on an empty set of tasks, check the path argument.";
}
} }
ChangeStateOp() = delete; ChangeStateOp() = delete;
ChangeStateOp(const ChangeStateOp&) = delete; ChangeStateOp(const ChangeStateOp&) = delete;
@@ -741,6 +803,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} }
}); });
} }
if (fTasks.empty()) {
FAIR_LOG(warn) << "WaitForState initiated on an empty set of tasks, check the path argument.";
}
} }
WaitForStateOp() = delete; WaitForStateOp() = delete;
WaitForStateOp(const WaitForStateOp&) = delete; WaitForStateOp(const WaitForStateOp&) = delete;
@@ -754,9 +819,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
{ {
fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) { fCount = std::count_if(stateIndex.cbegin(), stateIndex.cend(), [=](const auto& s) {
if (ContainsTask(stateData.at(s.second).taskId)) { if (ContainsTask(stateData.at(s.second).taskId)) {
return stateData.at(s.second).state == fTargetCurrentState return stateData.at(s.second).state == fTargetCurrentState &&
&& (stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined);
(stateData.at(s.second).lastState == fTargetLastState || fTargetLastState == DeviceState::Ok);
} else { } else {
return false; return false;
} }
@@ -768,8 +832,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
{ {
if (!fOp.IsCompleted() && ContainsTask(taskId)) { if (!fOp.IsCompleted() && ContainsTask(taskId)) {
if (currentState == fTargetCurrentState && if (currentState == fTargetCurrentState &&
(lastState == fTargetLastState || (lastState == fTargetLastState || fTargetLastState == DeviceState::Undefined)) {
fTargetLastState == DeviceState::Ok)) {
++fCount; ++fCount;
} }
TryCompletion(); TryCompletion();
@@ -873,7 +936,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
template<typename CompletionToken> template<typename CompletionToken>
auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token) auto AsyncWaitForState(const DeviceState targetCurrentState, CompletionToken&& token)
{ {
return AsyncWaitForState(DeviceState::Ok, targetCurrentState, "", Duration(0), std::move(token)); return AsyncWaitForState(DeviceState::Undefined, targetCurrentState, "", Duration(0), std::move(token));
} }
/// @brief Wait for selected FairMQ devices to reach given last & current state in this topology /// @brief Wait for selected FairMQ devices to reach given last & current state in this topology
@@ -903,7 +966,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0)) auto WaitForState(const DeviceState targetCurrentState, const std::string& path = "", Duration timeout = Duration(0))
-> std::error_code -> std::error_code
{ {
return WaitForState(DeviceState::Ok, targetCurrentState, path, timeout); return WaitForState(DeviceState::Undefined, targetCurrentState, path, timeout);
} }
using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult); using GetPropertiesCompletionSignature = void(std::error_code, GetPropertiesResult);
@@ -938,6 +1001,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} }
}); });
} }
if (expectedCount == 0) {
FAIR_LOG(warn) << "GetProperties initiated on an empty set of tasks, check the path argument.";
}
// FAIR_LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started."; // FAIR_LOG(debug) << "GetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
} }
GetPropertiesOp() = delete; GetPropertiesOp() = delete;
@@ -1093,6 +1159,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} }
}); });
} }
if (expectedCount == 0) {
FAIR_LOG(warn) << "SetProperties initiated on an empty set of tasks, check the path argument.";
}
// FAIR_LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started."; // FAIR_LOG(debug) << "SetProperties " << fId << " with expected count of " << fExpectedCount << " started.";
} }
SetPropertiesOp() = delete; SetPropertiesOp() = delete;
@@ -1242,7 +1311,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
int index = 0; int index = 0;
for (const auto& task : fDDSTopo.GetTasks()) { for (const auto& task : fDDSTopo.GetTasks()) {
fStateData.push_back(DeviceStatus{false, DeviceState::Ok, DeviceState::Ok, task.GetId(), task.GetCollectionId()}); fStateData.push_back(DeviceStatus{false, DeviceState::Undefined, DeviceState::Undefined, task.GetId(), task.GetCollectionId()});
fStateIndex.emplace(task.GetId(), index); fStateIndex.emplace(task.GetId(), index);
index++; index++;
} }

View File

@@ -70,9 +70,10 @@ array<string, 17> typeNames =
} }
}; };
array<fair::mq::State, 15> fbStateToMQState = array<fair::mq::State, 16> fbStateToMQState =
{ {
{ {
fair::mq::State::Undefined,
fair::mq::State::Ok, fair::mq::State::Ok,
fair::mq::State::Error, fair::mq::State::Error,
fair::mq::State::Idle, fair::mq::State::Idle,
@@ -91,9 +92,10 @@ array<fair::mq::State, 15> fbStateToMQState =
} }
}; };
array<sdk::cmd::FBState, 15> mqStateToFBState = array<sdk::cmd::FBState, 16> mqStateToFBState =
{ {
{ {
sdk::cmd::FBState_Undefined,
sdk::cmd::FBState_Ok, sdk::cmd::FBState_Ok,
sdk::cmd::FBState_Error, sdk::cmd::FBState_Error,
sdk::cmd::FBState_Idle, sdk::cmd::FBState_Idle,

View File

@@ -6,6 +6,7 @@ enum FBResult:byte {
} }
enum FBState:byte { enum FBState:byte {
Undefined,
Ok, Ok,
Error, Error,
Idle, Idle,

View File

@@ -21,16 +21,19 @@
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <FairMQMessage.h> #include <FairMQMessage.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/tools/CppSTL.h> #include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
#include <boost/process.hpp>
#include <boost/filesystem.hpp>
#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/filesystem.hpp>
#include <boost/interprocess/indexes/null_index.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/named_condition.hpp> #include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/named_mutex.hpp> #include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/ipc/message_queue.hpp> #include <boost/interprocess/mem_algo/simple_seq_fit.hpp>
#include <boost/process.hpp>
#include <cstdlib> // getenv #include <cstdlib> // getenv
#include <condition_variable> #include <condition_variable>
@@ -43,6 +46,8 @@
#include <utility> // pair #include <utility> // pair
#include <vector> #include <vector>
#include <sys/mman.h> // mlock
namespace fair namespace fair
{ {
namespace mq namespace mq
@@ -52,11 +57,18 @@ namespace shmem
struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; }; struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; };
using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory<char,
boost::interprocess::simple_seq_fit<boost::interprocess::mutex_family>,
boost::interprocess::iset_index>;
using RBTreeBestFitSegment = boost::interprocess::basic_managed_shared_memory<char,
boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family>,
boost::interprocess::iset_index>;
class Manager class Manager
{ {
public: public:
Manager(std::string id, std::string deviceId, size_t size, bool throwOnBadAlloc) Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(std::move(id)) : fShmId(std::move(shmId))
, fDeviceId(std::move(deviceId)) , fDeviceId(std::move(deviceId))
, fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size) , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360) , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360)
@@ -70,10 +82,39 @@ class Manager
, fMsgCounter(0) , fMsgCounter(0)
, fHeartbeatThread() , fHeartbeatThread()
, fSendHeartbeats(true) , fSendHeartbeats(true)
, fThrowOnBadAlloc(throwOnBadAlloc) , fThrowOnBadAlloc(true)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
bool mlockSegment = false;
bool zeroSegment = false;
bool autolaunchMonitor = false;
if (config) {
mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment);
zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
fThrowOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", fThrowOnBadAlloc);
} else {
LOG(debug) << "ProgOptions not available! Using defaults.";
}
if (autolaunchMonitor) {
StartMonitor(fShmId);
}
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes."; LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
if (mlockSegment) {
LOG(debug) << "Locking the managed segment memory pages...";
if (mlock(fSegment.get_address(), fSegment.get_size()) == -1) {
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked the managed segment memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the managed segment free memory...";
fSegment.zero_free_memory();
LOG(debug) << "Successfully zeroed the managed segment free memory.";
}
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc); fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc);
// store info about the managed segment as region with id 0 // store info about the managed segment as region with id 0
@@ -101,7 +142,7 @@ class Manager
Manager(const Manager&) = delete; Manager(const Manager&) = delete;
Manager operator=(const Manager&) = delete; Manager operator=(const Manager&) = delete;
boost::interprocess::managed_shared_memory& Segment() { return fSegment; } RBTreeBestFitSegment& Segment() { return fSegment; }
boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; } boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; }
static void StartMonitor(const std::string& id) static void StartMonitor(const std::string& id)
@@ -350,8 +391,8 @@ class Manager
} }
} }
void IncrementMsgCounter() { ++fMsgCounter; } void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
void DecrementMsgCounter() { --fMsgCounter; } void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
void SendHeartbeats() void SendHeartbeats()
{ {
@@ -414,7 +455,8 @@ class Manager
private: private:
std::string fShmId; std::string fShmId;
std::string fDeviceId; std::string fDeviceId;
boost::interprocess::managed_shared_memory fSegment; // boost::interprocess::managed_shared_memory fSegment;
RBTreeBestFitSegment fSegment;
boost::interprocess::managed_shared_memory fManagementSegment; boost::interprocess::managed_shared_memory fManagementSegment;
VoidAlloc fShmVoidAlloc; VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx; boost::interprocess::named_mutex fShmMtx;

View File

@@ -244,7 +244,7 @@ class Message final : public fair::mq::Message
bool InitializeChunk(const size_t size, size_t alignment = 0) bool InitializeChunk(const size_t size, size_t alignment = 0)
{ {
tools::RateLimiter rateLimiter(20); // tools::RateLimiter rateLimiter(20);
while (fMeta.fHandle < 0) { while (fMeta.fHandle < 0) {
try { try {
@@ -263,9 +263,10 @@ class Message final : public fair::mq::Message
} catch (boost::interprocess::bad_alloc& ba) { } catch (boost::interprocess::bad_alloc& ba) {
// LOG(warn) << "Shared memory full..."; // LOG(warn) << "Shared memory full...";
if (fManager.ThrowingOnBadAlloc()) { if (fManager.ThrowingOnBadAlloc()) {
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default")); throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", fManager.Segment().get_free_memory()));
} }
rateLimiter.maybe_sleep(); // rateLimiter.maybe_sleep();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (fManager.Interrupted()) { if (fManager.Interrupted()) {
return false; return false;
} else { } else {

View File

@@ -10,6 +10,7 @@
#include "Common.h" #include "Common.h"
#include <fairmq/Tools.h> #include <fairmq/Tools.h>
#include <fairlogger/Logger.h>
#include <boost/interprocess/managed_shared_memory.hpp> #include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/file_mapping.hpp> #include <boost/interprocess/file_mapping.hpp>
@@ -22,6 +23,10 @@
#include <csignal> #include <csignal>
#include <iostream> #include <iostream>
#include <iomanip> #include <iomanip>
#include <chrono>
#include <ctime>
#include <time.h>
#include <iomanip>
#include <termios.h> #include <termios.h>
#include <poll.h> #include <poll.h>
@@ -48,7 +53,7 @@ void signalHandler(int signal)
gSignalStatus = signal; gSignalStatus = signal;
} }
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit) Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool runAsDaemon, bool cleanOnExit)
: fSelfDestruct(selfDestruct) : fSelfDestruct(selfDestruct)
, fInteractive(interactive) , fInteractive(interactive)
, fViewOnly(viewOnly) , fViewOnly(viewOnly)
@@ -56,6 +61,7 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
, fSeenOnce(false) , fSeenOnce(false)
, fCleanOnExit(cleanOnExit) , fCleanOnExit(cleanOnExit)
, fTimeoutInMS(timeoutInMS) , fTimeoutInMS(timeoutInMS)
, fIntervalInMS(intervalInMS)
, fShmId(shmId) , fShmId(shmId)
, fSegmentName("fmq_" + fShmId + "_main") , fSegmentName("fmq_" + fShmId + "_main")
, fManagementSegmentName("fmq_" + fShmId + "_mng") , fManagementSegmentName("fmq_" + fShmId + "_mng")
@@ -74,6 +80,10 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
throw DaemonPresent(tools::ToString("fairmq-shmmonitor for shared memory id ", fShmId, " already started or not properly exited.")); throw DaemonPresent(tools::ToString("fairmq-shmmonitor for shared memory id ", fShmId, " already started or not properly exited."));
} }
} }
Logger::SetConsoleColor(false);
Logger::DefineVerbosity(Verbosity::user1, VerbositySpec::Make(VerbositySpec::Info::timestamp_us));
Logger::SetVerbosity(Verbosity::verylow);
} }
void Monitor::CatchSignals() void Monitor::CatchSignals()
@@ -110,7 +120,7 @@ void Monitor::Run()
Interactive(); Interactive();
} else { } else {
while (!fTerminating) { while (!fTerminating) {
this_thread::sleep_for(chrono::milliseconds(100)); this_thread::sleep_for(chrono::milliseconds(fIntervalInMS));
CheckSegment(); CheckSegment();
} }
} }
@@ -183,7 +193,7 @@ void Monitor::Interactive()
PrintHeader(); PrintHeader();
while (!fTerminating) { while (!fTerminating) {
if (poll(cinfd, 1, 100)) { if (poll(cinfd, 1, fIntervalInMS)) {
if (fTerminating || gSignalStatus != 0) { if (fTerminating || gSignalStatus != 0) {
break; break;
} }
@@ -275,7 +285,7 @@ void Monitor::CheckSegment()
unsigned int numDevices = 0; unsigned int numDevices = 0;
if (fInteractive) { if (fInteractive || fViewOnly) {
DeviceCounter* dc = managementSegment.find<DeviceCounter>(bipc::unique_instance).first; DeviceCounter* dc = managementSegment.find<DeviceCounter>(bipc::unique_instance).first;
if (dc) { if (dc) {
numDevices = dc->fCount; numDevices = dc->fCount;
@@ -303,6 +313,15 @@ void Monitor::CheckSegment()
<< setw(8) << numDevices << " | " << setw(8) << numDevices << " | "
<< setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |" << setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |"
<< c << flush; << c << flush;
} else if (fViewOnly) {
size_t free = segment.get_free_memory();
size_t total = segment.get_size();
size_t used = total - free;
LOGV(info, user1) << "[" << fSegmentName
<< "] devices: " << numDevices
<< ", total: " << total
<< ", free: " << free
<< ", used: " << used;
} }
} catch (bie&) { } catch (bie&) {
fHeartbeatTriggered = false; fHeartbeatTriggered = false;
@@ -473,7 +492,7 @@ void Monitor::Cleanup(const ShmId& shmId)
RemoveObject(managementSegmentName.c_str()); RemoveObject(managementSegmentName.c_str());
} catch (bie&) { } catch (bie&) {
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl; cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
} catch(std::out_of_range& oor) { } catch(out_of_range& oor) {
cout << "Could not locate element in the region map, out of range: " << oor.what() << endl; cout << "Could not locate element in the region map, out of range: " << oor.what() << endl;
} }

View File

@@ -37,7 +37,7 @@ struct ShmId
class Monitor class Monitor
{ {
public: public:
Monitor(const std::string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit); Monitor(const std::string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool runAsDaemon, bool cleanOnExit);
Monitor(const Monitor&) = delete; Monitor(const Monitor&) = delete;
Monitor operator=(const Monitor&) = delete; Monitor operator=(const Monitor&) = delete;
@@ -84,6 +84,7 @@ class Monitor
bool fSeenOnce; // true is segment has been opened successfully at least once bool fSeenOnce; // true is segment has been opened successfully at least once
bool fCleanOnExit; bool fCleanOnExit;
unsigned int fTimeoutInMS; unsigned int fTimeoutInMS;
unsigned int fIntervalInMS;
std::string fShmId; std::string fShmId;
std::string fSegmentName; std::string fSegmentName;
std::string fManagementSegmentName; std::string fManagementSegmentName;

View File

@@ -127,14 +127,21 @@ class Poller final : public fair::mq::Poller
void Poll(const int timeout) override void Poll(const int timeout) override
{ {
while (true) {
if (zmq_poll(fItems, fNumItems, timeout) < 0) { if (zmq_poll(fItems, fNumItems, timeout) < 0) {
if (errno == ETERM) { if (errno == ETERM) {
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
return;
} else if (errno == EINTR) {
LOG(debug) << "polling interrupted by system call";
continue;
} else { } else {
LOG(error) << "polling failed, reason: " << zmq_strerror(errno); LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno))); throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
} }
} }
break;
}
} }
bool CheckInput(const int index) override bool CheckInput(const int index) override

View File

@@ -148,9 +148,6 @@ class Socket final : public fair::mq::Socket
if (zmq_errno() == ETERM) { if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId; LOG(debug) << "Terminating socket " << fId;
return -1; return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Transfer interrupted by system call";
return -1;
} else { } else {
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
return -1; return -1;
@@ -177,7 +174,7 @@ class Socket final : public fair::mq::Socket
size_t size = msg->GetSize(); size_t size = msg->GetSize();
fBytesTx += size; fBytesTx += size;
return size; return size;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@@ -220,7 +217,7 @@ class Socket final : public fair::mq::Socket
fBytesRx += size; fBytesRx += size;
++fMessagesRx; ++fMessagesRx;
return size; return size;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@@ -269,7 +266,7 @@ class Socket final : public fair::mq::Socket
fBytesTx += totalSize; fBytesTx += totalSize;
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@@ -323,7 +320,7 @@ class Socket final : public fair::mq::Socket
fBytesRx += totalSize; fBytesRx += totalSize;
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@@ -375,6 +372,15 @@ class Socket final : public fair::mq::Socket
} }
} }
void Events(uint32_t* events) override
{
size_t eventsSize = sizeof(uint32_t);
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
throw SocketError(
tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
}
}
int GetLinger() const override int GetLinger() const override
{ {
int value = 0; int value = 0;
@@ -485,6 +491,12 @@ class Socket final : public fair::mq::Socket
if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE; if (constant == "snd-more no-block") return ZMQ_DONTWAIT|ZMQ_SNDMORE;
if (constant == "fd") return ZMQ_FD; if (constant == "fd") return ZMQ_FD;
if (constant == "events")
return ZMQ_EVENTS;
if (constant == "pollin")
return ZMQ_POLLIN;
if (constant == "pollout")
return ZMQ_POLLOUT;
return -1; return -1;
} }

View File

@@ -43,7 +43,7 @@ class TransportFactory final : public fair::mq::TransportFactory
: fair::mq::TransportFactory(id) : fair::mq::TransportFactory(id)
, fDeviceId(id) , fDeviceId(id)
, fShmId() , fShmId()
, fZMQContext(zmq_ctx_new()) , fZmqCtx(zmq_ctx_new())
, fManager(nullptr) , fManager(nullptr)
{ {
int major, minor, patch; int major, minor, patch;
@@ -51,21 +51,17 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & " LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & "
<< "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")"; << "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")";
if (!fZMQContext) { if (!fZmqCtx) {
throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno))); throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
} }
int numIoThreads = 1; int numIoThreads = 1;
std::string sessionName = "default"; std::string sessionName = "default";
size_t segmentSize = 2ULL << 30; size_t segmentSize = 2ULL << 30;
bool autolaunchMonitor = false;
bool throwOnBadAlloc = true;
if (config) { if (config) {
numIoThreads = config->GetProperty<int>("io-threads", numIoThreads); numIoThreads = config->GetProperty<int>("io-threads", numIoThreads);
sessionName = config->GetProperty<std::string>("session", sessionName); sessionName = config->GetProperty<std::string>("session", sessionName);
segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize); segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
throwOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", throwOnBadAlloc);
} else { } else {
LOG(debug) << "ProgOptions not available! Using defaults."; LOG(debug) << "ProgOptions not available! Using defaults.";
} }
@@ -74,20 +70,16 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'."; LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
try { try {
if (zmq_ctx_set(fZMQContext, ZMQ_IO_THREADS, numIoThreads) != 0) { if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
} }
// Set the maximum number of allowed sockets on the context. // Set the maximum number of allowed sockets on the context.
if (zmq_ctx_set(fZMQContext, ZMQ_MAX_SOCKETS, 10000) != 0) { if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) {
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
} }
if (autolaunchMonitor) { fManager = tools::make_unique<Manager>(fShmId, fDeviceId, segmentSize, config);
Manager::StartMonitor(fShmId);
}
fManager = tools::make_unique<Manager>(fShmId, fDeviceId, segmentSize, throwOnBadAlloc);
} catch (boost::interprocess::interprocess_exception& e) { } catch (boost::interprocess::interprocess_exception& e) {
LOG(error) << "Could not initialize shared memory transport: " << e.what(); LOG(error) << "Could not initialize shared memory transport: " << e.what();
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what())); throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));
@@ -129,7 +121,7 @@ class TransportFactory final : public fair::mq::TransportFactory
SocketPtr CreateSocket(const std::string& type, const std::string& name) override SocketPtr CreateSocket(const std::string& type, const std::string& name) override
{ {
return tools::make_unique<Socket>(*fManager, type, name, GetId(), fZMQContext, this); return tools::make_unique<Socket>(*fManager, type, name, GetId(), fZmqCtx, this);
} }
PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override
@@ -187,15 +179,18 @@ class TransportFactory final : public fair::mq::TransportFactory
{ {
LOG(debug) << "Destroying Shared Memory transport..."; LOG(debug) << "Destroying Shared Memory transport...";
if (fZMQContext) { if (fZmqCtx) {
if (zmq_ctx_term(fZMQContext) != 0) { while (true) {
if (zmq_ctx_term(fZmqCtx) != 0) {
if (errno == EINTR) { if (errno == EINTR) {
LOG(error) << "failed closing context, reason: " << zmq_strerror(errno); LOG(debug) << "zmq_ctx_term interrupted by system call, retrying";
continue;
} else { } else {
fZMQContext = nullptr; fZmqCtx = nullptr;
return;
} }
} }
break;
}
} else { } else {
LOG(error) << "context not available for shutdown"; LOG(error) << "context not available for shutdown";
} }
@@ -204,7 +199,7 @@ class TransportFactory final : public fair::mq::TransportFactory
private: private:
std::string fDeviceId; std::string fDeviceId;
std::string fShmId; std::string fShmId;
void* fZMQContext; void* fZmqCtx;
std::unique_ptr<Manager> fManager; std::unique_ptr<Manager> fManager;
}; };

View File

@@ -76,6 +76,7 @@ int main(int argc, char** argv)
bool interactive = false; bool interactive = false;
bool viewOnly = false; bool viewOnly = false;
unsigned int timeoutInMS = 5000; unsigned int timeoutInMS = 5000;
unsigned int intervalInMS = 100;
bool runAsDaemon = false; bool runAsDaemon = false;
bool cleanOnExit = false; bool cleanOnExit = false;
@@ -90,6 +91,7 @@ int main(int argc, char** argv)
("timeout,t" , value<unsigned int>(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds") ("timeout,t" , value<unsigned int>(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds")
("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor") ("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor")
("clean-on-exit,e", value<bool>(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit") ("clean-on-exit,e", value<bool>(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit")
("interval" , value<unsigned int>(&intervalInMS)->default_value(100), "Output interval for interactive/view-only mode")
("help,h", "Print help"); ("help,h", "Print help");
variables_map vm; variables_map vm;
@@ -116,8 +118,11 @@ int main(int argc, char** argv)
} }
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl; cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;
if (viewOnly && !interactive) {
cout << "running in non-interactive view-only mode, outputting with interval of " << intervalInMS << "ms. (change with --interval), press ctrl+C to exit." << endl;
}
Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, runAsDaemon, cleanOnExit); Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, runAsDaemon, cleanOnExit);
monitor.CatchSignals(); monitor.CatchSignals();
monitor.Run(); monitor.Run();

View File

@@ -161,14 +161,17 @@ class Context
UnsubscribeFromRegionEvents(); UnsubscribeFromRegionEvents();
if (fZmqCtx) { if (fZmqCtx) {
while (true) {
if (zmq_ctx_term(fZmqCtx) != 0) { if (zmq_ctx_term(fZmqCtx) != 0) {
if (errno == EINTR) { if (errno == EINTR) {
LOG(error) << " failed closing context, reason: " << zmq_strerror(errno); LOG(debug) << "zmq_ctx_term interrupted by system call, retrying";
continue;
} else { } else {
fZmqCtx = nullptr; fZmqCtx = nullptr;
return;
} }
} }
break;
}
} else { } else {
LOG(error) << "context not available for shutdown"; LOG(error) << "context not available for shutdown";
} }

View File

@@ -130,14 +130,21 @@ class Poller final : public fair::mq::Poller
void Poll(const int timeout) override void Poll(const int timeout) override
{ {
while (true) {
if (zmq_poll(fItems, fNumItems, timeout) < 0) { if (zmq_poll(fItems, fNumItems, timeout) < 0) {
if (errno == ETERM) { if (errno == ETERM) {
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
return;
} else if (errno == EINTR) {
LOG(debug) << "polling interrupted by system call";
continue;
} else { } else {
LOG(error) << "polling failed, reason: " << zmq_strerror(errno); LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno))); throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
} }
} }
break;
}
} }
bool CheckInput(const int index) override bool CheckInput(const int index) override

View File

@@ -126,9 +126,6 @@ class Socket final : public fair::mq::Socket
if (zmq_errno() == ETERM) { if (zmq_errno() == ETERM) {
LOG(debug) << "Terminating socket " << fId; LOG(debug) << "Terminating socket " << fId;
return -1; return -1;
} else if (zmq_errno() == EINTR) {
LOG(debug) << "Transfer interrupted by system call";
return -1;
} else { } else {
LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
return -1; return -1;
@@ -151,7 +148,7 @@ class Socket final : public fair::mq::Socket
fBytesTx += nbytes; fBytesTx += nbytes;
++fMessagesTx; ++fMessagesTx;
return nbytes; return nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@@ -177,7 +174,7 @@ class Socket final : public fair::mq::Socket
fBytesRx += nbytes; fBytesRx += nbytes;
++fMessagesRx; ++fMessagesRx;
return nbytes; return nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
continue; continue;
} else { } else {
@@ -212,7 +209,7 @@ class Socket final : public fair::mq::Socket
int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags); int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
if (nbytes >= 0) { if (nbytes >= 0) {
totalSize += nbytes; totalSize += nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true; repeat = true;
break; break;
@@ -261,7 +258,7 @@ class Socket final : public fair::mq::Socket
if (nbytes >= 0) { if (nbytes >= 0) {
msgVec.push_back(move(part)); msgVec.push_back(move(part));
totalSize += nbytes; totalSize += nbytes;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
if (ShouldRetry(flags, timeout, elapsed)) { if (ShouldRetry(flags, timeout, elapsed)) {
repeat = true; repeat = true;
break; break;
@@ -318,6 +315,15 @@ class Socket final : public fair::mq::Socket
} }
} }
void Events(uint32_t* events) override
{
size_t eventsSize = sizeof(uint32_t);
if (zmq_getsockopt(fSocket, ZMQ_EVENTS, events, &eventsSize) < 0) {
throw SocketError(
tools::ToString("failed setting ZMQ_EVENTS, reason: ", zmq_strerror(errno)));
}
}
void SetLinger(const int value) override void SetLinger(const int value) override
{ {
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) { if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
@@ -433,6 +439,12 @@ class Socket final : public fair::mq::Socket
if (constant == "linger") return ZMQ_LINGER; if (constant == "linger") return ZMQ_LINGER;
if (constant == "fd") return ZMQ_FD; if (constant == "fd") return ZMQ_FD;
if (constant == "events")
return ZMQ_EVENTS;
if (constant == "pollin")
return ZMQ_POLLIN;
if (constant == "pollout")
return ZMQ_POLLOUT;
return -1; return -1;
} }

View File

@@ -139,6 +139,32 @@ TEST_F(Topology, ChangeState)
EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true); EXPECT_EQ(sdk::StateEqualsTo(currentState, sdk::DeviceState::InitializingDevice), true);
} }
TEST_F(Topology, MixedState)
{
using namespace fair::mq;
sdk::Topology topo(mDDSTopo, mDDSSession);
auto result1(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/Sampler.*"));
LOG(info) << result1.first;
EXPECT_EQ(result1.first, std::error_code());
EXPECT_EQ(sdk::AggregateState(result1.second), sdk::AggregatedTopologyState::Mixed);
EXPECT_EQ(sdk::StateEqualsTo(result1.second, sdk::DeviceState::InitializingDevice), false);
auto const currentState1 = topo.GetCurrentState();
EXPECT_EQ(sdk::AggregateState(currentState1), sdk::AggregatedTopologyState::Mixed);
EXPECT_EQ(sdk::StateEqualsTo(currentState1, sdk::DeviceState::InitializingDevice), false);
auto result2(topo.ChangeState(sdk::TopologyTransition::InitDevice, "main/SinkGroup/.*"));
LOG(info) << result2.first;
EXPECT_EQ(result2.first, std::error_code());
EXPECT_EQ(sdk::AggregateState(result2.second), sdk::AggregatedTopologyState::InitializingDevice);
EXPECT_EQ(sdk::StateEqualsTo(result2.second, sdk::DeviceState::InitializingDevice), true);
auto const currentState2 = topo.GetCurrentState();
EXPECT_EQ(sdk::AggregateState(currentState2), sdk::AggregatedTopologyState::InitializingDevice);
EXPECT_EQ(sdk::StateEqualsTo(currentState2, sdk::DeviceState::InitializingDevice), true);
}
TEST_F(Topology, AsyncChangeStateConcurrent) TEST_F(Topology, AsyncChangeStateConcurrent)
{ {
using namespace fair::mq; using namespace fair::mq;
@@ -191,9 +217,9 @@ TEST_F(Topology, AsyncChangeStateCollectionView)
ASSERT_EQ(cstate.size(), 5); ASSERT_EQ(cstate.size(), 5);
for (const auto& c : cstate) { for (const auto& c : cstate) {
LOG(debug) << "\t" << c.first; LOG(debug) << "\t" << c.first;
State s; sdk::AggregatedTopologyState s;
ASSERT_NO_THROW(s = sdk::AggregateState(c.second)); ASSERT_NO_THROW(s = sdk::AggregateState(c.second));
ASSERT_EQ(s, State::InitializingDevice); ASSERT_EQ(s, static_cast<sdk::AggregatedTopologyState>(State::InitializingDevice));
LOG(debug) << "\tAggregated state: " << s; LOG(debug) << "\tAggregated state: " << s;
for (const auto& ds : c.second) { for (const auto& ds : c.second) {
LOG(debug) << "\t\t" << ds.state; LOG(debug) << "\t\t" << ds.state;
@@ -426,4 +452,62 @@ TEST_F(Topology, SetAndGetProperties)
ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code()); ASSERT_EQ(topo.ChangeState(TopologyTransition::ResetDevice).first, std::error_code());
} }
TEST(Topology2, AggregatedTopologyStateComparison)
{
using namespace fair::mq::sdk;
ASSERT_TRUE(DeviceState::Undefined == AggregatedTopologyState::Undefined);
ASSERT_TRUE(AggregatedTopologyState::Undefined == DeviceState::Undefined);
ASSERT_TRUE(DeviceState::Ok == AggregatedTopologyState::Ok);
ASSERT_TRUE(DeviceState::Error == AggregatedTopologyState::Error);
ASSERT_TRUE(DeviceState::Idle == AggregatedTopologyState::Idle);
ASSERT_TRUE(DeviceState::InitializingDevice == AggregatedTopologyState::InitializingDevice);
ASSERT_TRUE(DeviceState::Initialized == AggregatedTopologyState::Initialized);
ASSERT_TRUE(DeviceState::Binding == AggregatedTopologyState::Binding);
ASSERT_TRUE(DeviceState::Bound == AggregatedTopologyState::Bound);
ASSERT_TRUE(DeviceState::Connecting == AggregatedTopologyState::Connecting);
ASSERT_TRUE(DeviceState::DeviceReady == AggregatedTopologyState::DeviceReady);
ASSERT_TRUE(DeviceState::InitializingTask == AggregatedTopologyState::InitializingTask);
ASSERT_TRUE(DeviceState::Ready == AggregatedTopologyState::Ready);
ASSERT_TRUE(DeviceState::Running == AggregatedTopologyState::Running);
ASSERT_TRUE(DeviceState::ResettingTask == AggregatedTopologyState::ResettingTask);
ASSERT_TRUE(DeviceState::ResettingDevice == AggregatedTopologyState::ResettingDevice);
ASSERT_TRUE(DeviceState::Exiting == AggregatedTopologyState::Exiting);
ASSERT_TRUE(GetAggregatedTopologyState("UNDEFINED") == AggregatedTopologyState::Undefined);
ASSERT_TRUE(GetAggregatedTopologyState("OK") == AggregatedTopologyState::Ok);
ASSERT_TRUE(GetAggregatedTopologyState("ERROR") == AggregatedTopologyState::Error);
ASSERT_TRUE(GetAggregatedTopologyState("IDLE") == AggregatedTopologyState::Idle);
ASSERT_TRUE(GetAggregatedTopologyState("INITIALIZING DEVICE") == AggregatedTopologyState::InitializingDevice);
ASSERT_TRUE(GetAggregatedTopologyState("INITIALIZED") == AggregatedTopologyState::Initialized);
ASSERT_TRUE(GetAggregatedTopologyState("BINDING") == AggregatedTopologyState::Binding);
ASSERT_TRUE(GetAggregatedTopologyState("BOUND") == AggregatedTopologyState::Bound);
ASSERT_TRUE(GetAggregatedTopologyState("CONNECTING") == AggregatedTopologyState::Connecting);
ASSERT_TRUE(GetAggregatedTopologyState("DEVICE READY") == AggregatedTopologyState::DeviceReady);
ASSERT_TRUE(GetAggregatedTopologyState("INITIALIZING TASK") == AggregatedTopologyState::InitializingTask);
ASSERT_TRUE(GetAggregatedTopologyState("READY") == AggregatedTopologyState::Ready);
ASSERT_TRUE(GetAggregatedTopologyState("RUNNING") == AggregatedTopologyState::Running);
ASSERT_TRUE(GetAggregatedTopologyState("RESETTING TASK") == AggregatedTopologyState::ResettingTask);
ASSERT_TRUE(GetAggregatedTopologyState("RESETTING DEVICE") == AggregatedTopologyState::ResettingDevice);
ASSERT_TRUE(GetAggregatedTopologyState("EXITING") == AggregatedTopologyState::Exiting);
ASSERT_TRUE(GetAggregatedTopologyState("MIXED") == AggregatedTopologyState::Mixed);
ASSERT_TRUE("UNDEFINED" == GetAggregatedTopologyStateName(AggregatedTopologyState::Undefined));
ASSERT_TRUE("OK" == GetAggregatedTopologyStateName(AggregatedTopologyState::Ok));
ASSERT_TRUE("ERROR" == GetAggregatedTopologyStateName(AggregatedTopologyState::Error));
ASSERT_TRUE("IDLE" == GetAggregatedTopologyStateName(AggregatedTopologyState::Idle));
ASSERT_TRUE("INITIALIZING DEVICE" == GetAggregatedTopologyStateName(AggregatedTopologyState::InitializingDevice));
ASSERT_TRUE("INITIALIZED" == GetAggregatedTopologyStateName(AggregatedTopologyState::Initialized));
ASSERT_TRUE("BINDING" == GetAggregatedTopologyStateName(AggregatedTopologyState::Binding));
ASSERT_TRUE("BOUND" == GetAggregatedTopologyStateName(AggregatedTopologyState::Bound));
ASSERT_TRUE("CONNECTING" == GetAggregatedTopologyStateName(AggregatedTopologyState::Connecting));
ASSERT_TRUE("DEVICE READY" == GetAggregatedTopologyStateName(AggregatedTopologyState::DeviceReady));
ASSERT_TRUE("INITIALIZING TASK" == GetAggregatedTopologyStateName(AggregatedTopologyState::InitializingTask));
ASSERT_TRUE("READY" == GetAggregatedTopologyStateName(AggregatedTopologyState::Ready));
ASSERT_TRUE("RUNNING" == GetAggregatedTopologyStateName(AggregatedTopologyState::Running));
ASSERT_TRUE("RESETTING TASK" == GetAggregatedTopologyStateName(AggregatedTopologyState::ResettingTask));
ASSERT_TRUE("RESETTING DEVICE" == GetAggregatedTopologyStateName(AggregatedTopologyState::ResettingDevice));
ASSERT_TRUE("EXITING" == GetAggregatedTopologyStateName(AggregatedTopologyState::Exiting));
ASSERT_TRUE("MIXED" == GetAggregatedTopologyStateName(AggregatedTopologyState::Mixed));
}
} // namespace } // namespace

View File

@@ -66,6 +66,24 @@ void RunOptionsTest(const string& transport)
ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), 8000); ASSERT_EQ(channel.GetSocket().GetRcvKernelSize(), 8000);
} }
void ZeroingAndMlock(const string& transport)
{
size_t session{fair::mq::tools::UuidHash()};
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 16384);
config.SetProperty<bool>("shm-zero-segment", true);
config.SetProperty<bool>("shm-mlock-segment", true);
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
FairMQMessagePtr outMsg(factory->CreateMessage(10000));
char test[10000];
memset(test, 0, sizeof(test));
ASSERT_EQ(memcmp(test, outMsg->GetData(), outMsg->GetSize()), 0);
}
TEST(Options, zeromq) TEST(Options, zeromq)
{ {
RunOptionsTest("zeromq"); RunOptionsTest("zeromq");
@@ -76,4 +94,9 @@ TEST(Options, shmem)
RunOptionsTest("shmem"); RunOptionsTest("shmem");
} }
TEST(ZeroingAndMlock, shmem)
{
ZeroingAndMlock("shmem");
}
} // namespace } // namespace