mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 01:51:45 +00:00
Compare commits
12 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
690e8a0370 | ||
|
1f0c94f898 | ||
|
62ed4e5f80 | ||
|
f1d6b18668 | ||
|
c0153a6b55 | ||
|
86a1dd38a2 | ||
|
be8ab06cc1 | ||
|
b0f73017e2 | ||
|
b83655d5da | ||
|
9c27051cdc | ||
|
65f9519917 | ||
|
b5545c1575 |
@@ -94,7 +94,7 @@ endif()
|
||||
|
||||
if(BUILD_FAIRMQ OR BUILD_SDK)
|
||||
find_package2(PUBLIC FairLogger REQUIRED
|
||||
VERSION 1.2.0
|
||||
VERSION 1.6.0
|
||||
)
|
||||
|
||||
foreach(dep IN LISTS FairLogger_PACKAGE_DEPENDENCIES)
|
||||
|
@@ -10,7 +10,7 @@
|
||||
<exe reachable="true">fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind</exe>
|
||||
<env reachable="false">fairmq-ex-n-m-env.sh</env>
|
||||
<properties>
|
||||
<name access="write">fmqchan_sync</id>
|
||||
<name access="write">fmqchan_sync</name>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
@@ -18,8 +18,8 @@
|
||||
<exe reachable="true">fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=pair,method=connect,numSockets=${numReceivers} --dds-i data:%taskIndex%</exe>
|
||||
<env reachable="false">fairmq-ex-n-m-env.sh</env>
|
||||
<properties>
|
||||
<name access="read">fmqchan_sync</id>
|
||||
<name access="read">fmqchan_data</id>
|
||||
<name access="read">fmqchan_sync</name>
|
||||
<name access="read">fmqchan_data</name>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
@@ -27,7 +27,7 @@
|
||||
<exe reachable="true">fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pair,method=bind,numSockets=${numSenders}</exe>
|
||||
<env reachable="false">fairmq-ex-n-m-env.sh</env>
|
||||
<properties>
|
||||
<name access="write">fmqchan_data</id>
|
||||
<name access="write">fmqchan_data</name>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
|
@@ -10,7 +10,7 @@
|
||||
<exe reachable="true">fairmq-ex-n-m-synchronizer --id sync --rate 100 --color false -P dds --channel-config name=sync,type=pub,method=bind</exe>
|
||||
<env reachable="false">fairmq-ex-n-m-env.sh</env>
|
||||
<properties>
|
||||
<name access="write">fmqchan_sync</id>
|
||||
<name access="write">fmqchan_sync</name>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
@@ -18,8 +18,8 @@
|
||||
<exe reachable="true">fairmq-ex-n-m-sender --id sender%taskIndex% --timeframe-size 100000 --num-receivers ${numReceivers} --color false -P dds --channel-config name=sync,type=sub,method=connect name=data,type=push,method=connect,numSockets=${numReceivers}</exe>
|
||||
<env reachable="false">fairmq-ex-n-m-env.sh</env>
|
||||
<properties>
|
||||
<name access="read">fmqchan_sync</id>
|
||||
<name access="read">fmqchan_data</id>
|
||||
<name access="read">fmqchan_sync</name>
|
||||
<name access="read">fmqchan_data</name>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
@@ -27,7 +27,7 @@
|
||||
<exe reachable="true">fairmq-ex-n-m-receiver --id receiver%taskIndex% --num-senders ${numSenders} --color false -P dds --max-timeframes 10 --channel-config name=data,type=pull,method=bind</exe>
|
||||
<env reachable="false">fairmq-ex-n-m-env.sh</env>
|
||||
<properties>
|
||||
<name access="write">fmqchan_data</id>
|
||||
<name access="write">fmqchan_data</name>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
|
@@ -380,6 +380,7 @@ if(BUILD_FAIRMQ)
|
||||
Boost::boost
|
||||
Boost::date_time
|
||||
Boost::program_options
|
||||
FairLogger::FairLogger
|
||||
PicoSHA2
|
||||
)
|
||||
target_include_directories(fairmq-shmmonitor PUBLIC
|
||||
|
@@ -32,6 +32,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
, fMemSet(false)
|
||||
, fNumParts(1)
|
||||
, fMsgSize(10000)
|
||||
, fMsgAlignment(0)
|
||||
, fMsgRate(0)
|
||||
, fNumIterations(0)
|
||||
, fMaxIterations(0)
|
||||
@@ -44,6 +45,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
fMemSet = fConfig->GetProperty<bool>("memset");
|
||||
fNumParts = fConfig->GetProperty<size_t>("num-parts");
|
||||
fMsgSize = fConfig->GetProperty<size_t>("msg-size");
|
||||
fMsgAlignment = fConfig->GetProperty<size_t>("msg-alignment");
|
||||
fMsgRate = fConfig->GetProperty<float>("msg-rate");
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
fOutChannelName = fConfig->GetProperty<std::string>("out-channel");
|
||||
@@ -64,7 +66,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
FairMQParts parts;
|
||||
|
||||
for (size_t i = 0; i < fNumParts; ++i) {
|
||||
parts.AddPart(dataOutChannel.NewMessage(fMsgSize));
|
||||
parts.AddPart(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment}));
|
||||
if (fMemSet) {
|
||||
std::memset(parts.At(i)->GetData(), 0, parts.At(i)->GetSize());
|
||||
}
|
||||
@@ -79,7 +81,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
++fNumIterations;
|
||||
}
|
||||
} else {
|
||||
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize));
|
||||
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment}));
|
||||
if (fMemSet) {
|
||||
std::memset(msg->GetData(), 0, msg->GetSize());
|
||||
}
|
||||
@@ -111,6 +113,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
bool fMemSet;
|
||||
size_t fNumParts;
|
||||
size_t fMsgSize;
|
||||
size_t fMsgAlignment;
|
||||
std::atomic<int> fMsgCounter;
|
||||
float fMsgRate;
|
||||
uint64_t fNumIterations;
|
||||
|
@@ -19,6 +19,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||
("memset", bpo::value<bool>()->default_value(false), "Memset allocated buffers to 0")
|
||||
("num-parts", bpo::value<size_t>()->default_value(1), "Number of parts to send. 1 will send single messages, not parts")
|
||||
("msg-size", bpo::value<size_t>()->default_value(1000000), "Message size in bytes")
|
||||
("msg-alignment", bpo::value<size_t>()->default_value(0), "Message alignment")
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
|
||||
("msg-rate", bpo::value<float>()->default_value(0), "Msg rate limit in maximum number of messages per second");
|
||||
}
|
||||
|
@@ -105,12 +105,30 @@ inline auto operator==(AggregatedTopologyState lhs, DeviceState rhs) -> bool
|
||||
inline std::ostream& operator<<(std::ostream& os, const AggregatedTopologyState& state)
|
||||
{
|
||||
if (state == AggregatedTopologyState::Mixed) {
|
||||
return os << "Mixed";
|
||||
return os << "MIXED";
|
||||
} else {
|
||||
return os << static_cast<DeviceState>(state);
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the textual name of an aggregated topology state.
// AggregatedTopologyState::Mixed yields "MIXED"; every other value is cast to
// State and passed to GetStateName(), whose result is returned unchanged.
inline std::string GetAggregatedTopologyStateName(AggregatedTopologyState s)
|
||||
{
|
||||
if (s == AggregatedTopologyState::Mixed) {
|
||||
return "MIXED";
|
||||
} else {
|
||||
// Not Mixed: delegate to the device-state name lookup.
return GetStateName(static_cast<State>(s));
|
||||
}
|
||||
}
|
||||
|
||||
// Converts a state name into an AggregatedTopologyState.
// The exact string "MIXED" yields AggregatedTopologyState::Mixed; any other
// string is passed to GetState() and the result is cast to
// AggregatedTopologyState.
// NOTE(review): how GetState() treats unknown names is not visible here —
// confirm before relying on this for unvalidated input.
inline AggregatedTopologyState GetAggregatedTopologyState(const std::string& state)
|
||||
{
|
||||
if (state == "MIXED") {
|
||||
return AggregatedTopologyState::Mixed;
|
||||
} else {
|
||||
return static_cast<AggregatedTopologyState>(GetState(state));
|
||||
}
|
||||
}
|
||||
|
||||
struct DeviceStatus
|
||||
{
|
||||
bool subscribed_to_state_changes;
|
||||
|
@@ -25,13 +25,15 @@
|
||||
#include <fairmq/tools/CppSTL.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <boost/process.hpp>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
#include <boost/filesystem.hpp>
|
||||
#include <boost/interprocess/indexes/null_index.hpp>
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/sync/named_condition.hpp>
|
||||
#include <boost/interprocess/sync/named_mutex.hpp>
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
#include <boost/interprocess/mem_algo/simple_seq_fit.hpp>
|
||||
#include <boost/process.hpp>
|
||||
|
||||
#include <cstdlib> // getenv
|
||||
#include <condition_variable>
|
||||
@@ -55,6 +57,13 @@ namespace shmem
|
||||
|
||||
struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
|
||||
// Managed shared memory segment type with char names, the simple_seq_fit
// memory algorithm (using mutex_family) and iset_index as the index type.
using SimpleSeqFitSegment = boost::interprocess::basic_managed_shared_memory<char,
|
||||
boost::interprocess::simple_seq_fit<boost::interprocess::mutex_family>,
|
||||
boost::interprocess::iset_index>;
|
||||
// Same as above but with the rbtree_best_fit memory algorithm; only the
// memory algorithm template argument differs between the two aliases.
using RBTreeBestFitSegment = boost::interprocess::basic_managed_shared_memory<char,
|
||||
boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family>,
|
||||
boost::interprocess::iset_index>;
|
||||
|
||||
class Manager
|
||||
{
|
||||
public:
|
||||
@@ -96,7 +105,9 @@ class Manager
|
||||
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
|
||||
if (mlockSegment) {
|
||||
LOG(debug) << "Locking the managed segment memory pages...";
|
||||
mlock(fSegment.get_address(), fSegment.get_size());
|
||||
if (mlock(fSegment.get_address(), fSegment.get_size()) == -1) {
|
||||
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
|
||||
}
|
||||
LOG(debug) << "Successfully locked the managed segment memory pages.";
|
||||
}
|
||||
if (zeroSegment) {
|
||||
@@ -131,7 +142,7 @@ class Manager
|
||||
Manager(const Manager&) = delete;
|
||||
Manager operator=(const Manager&) = delete;
|
||||
|
||||
boost::interprocess::managed_shared_memory& Segment() { return fSegment; }
|
||||
RBTreeBestFitSegment& Segment() { return fSegment; }
|
||||
boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; }
|
||||
|
||||
static void StartMonitor(const std::string& id)
|
||||
@@ -380,8 +391,8 @@ class Manager
|
||||
}
|
||||
}
|
||||
|
||||
void IncrementMsgCounter() { ++fMsgCounter; }
|
||||
void DecrementMsgCounter() { --fMsgCounter; }
|
||||
void IncrementMsgCounter() { fMsgCounter.fetch_add(1, std::memory_order_relaxed); }
|
||||
void DecrementMsgCounter() { fMsgCounter.fetch_sub(1, std::memory_order_relaxed); }
|
||||
|
||||
void SendHeartbeats()
|
||||
{
|
||||
@@ -444,7 +455,8 @@ class Manager
|
||||
private:
|
||||
std::string fShmId;
|
||||
std::string fDeviceId;
|
||||
boost::interprocess::managed_shared_memory fSegment;
|
||||
// boost::interprocess::managed_shared_memory fSegment;
|
||||
RBTreeBestFitSegment fSegment;
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
VoidAlloc fShmVoidAlloc;
|
||||
boost::interprocess::named_mutex fShmMtx;
|
||||
|
@@ -244,7 +244,7 @@ class Message final : public fair::mq::Message
|
||||
|
||||
bool InitializeChunk(const size_t size, size_t alignment = 0)
|
||||
{
|
||||
tools::RateLimiter rateLimiter(20);
|
||||
// tools::RateLimiter rateLimiter(20);
|
||||
|
||||
while (fMeta.fHandle < 0) {
|
||||
try {
|
||||
@@ -263,9 +263,10 @@ class Message final : public fair::mq::Message
|
||||
} catch (boost::interprocess::bad_alloc& ba) {
|
||||
// LOG(warn) << "Shared memory full...";
|
||||
if (fManager.ThrowingOnBadAlloc()) {
|
||||
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default"));
|
||||
throw MessageBadAlloc(tools::ToString("shmem: could not create a message of size ", size, ", alignment: ", (alignment != 0) ? std::to_string(alignment) : "default", ", free memory: ", fManager.Segment().get_free_memory()));
|
||||
}
|
||||
rateLimiter.maybe_sleep();
|
||||
// rateLimiter.maybe_sleep();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
if (fManager.Interrupted()) {
|
||||
return false;
|
||||
} else {
|
||||
|
@@ -10,6 +10,7 @@
|
||||
#include "Common.h"
|
||||
|
||||
#include <fairmq/Tools.h>
|
||||
#include <fairlogger/Logger.h>
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/file_mapping.hpp>
|
||||
@@ -22,6 +23,10 @@
|
||||
#include <csignal>
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
#include <chrono>
|
||||
#include <ctime>
|
||||
#include <time.h>
|
||||
#include <iomanip>
|
||||
|
||||
#include <termios.h>
|
||||
#include <poll.h>
|
||||
@@ -48,7 +53,7 @@ void signalHandler(int signal)
|
||||
gSignalStatus = signal;
|
||||
}
|
||||
|
||||
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit)
|
||||
Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool runAsDaemon, bool cleanOnExit)
|
||||
: fSelfDestruct(selfDestruct)
|
||||
, fInteractive(interactive)
|
||||
, fViewOnly(viewOnly)
|
||||
@@ -56,6 +61,7 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
|
||||
, fSeenOnce(false)
|
||||
, fCleanOnExit(cleanOnExit)
|
||||
, fTimeoutInMS(timeoutInMS)
|
||||
, fIntervalInMS(intervalInMS)
|
||||
, fShmId(shmId)
|
||||
, fSegmentName("fmq_" + fShmId + "_main")
|
||||
, fManagementSegmentName("fmq_" + fShmId + "_mng")
|
||||
@@ -74,6 +80,10 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
|
||||
throw DaemonPresent(tools::ToString("fairmq-shmmonitor for shared memory id ", fShmId, " already started or not properly exited."));
|
||||
}
|
||||
}
|
||||
|
||||
Logger::SetConsoleColor(false);
|
||||
Logger::DefineVerbosity(Verbosity::user1, VerbositySpec::Make(VerbositySpec::Info::timestamp_us));
|
||||
Logger::SetVerbosity(Verbosity::verylow);
|
||||
}
|
||||
|
||||
void Monitor::CatchSignals()
|
||||
@@ -110,7 +120,7 @@ void Monitor::Run()
|
||||
Interactive();
|
||||
} else {
|
||||
while (!fTerminating) {
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
this_thread::sleep_for(chrono::milliseconds(fIntervalInMS));
|
||||
CheckSegment();
|
||||
}
|
||||
}
|
||||
@@ -183,7 +193,7 @@ void Monitor::Interactive()
|
||||
PrintHeader();
|
||||
|
||||
while (!fTerminating) {
|
||||
if (poll(cinfd, 1, 100)) {
|
||||
if (poll(cinfd, 1, fIntervalInMS)) {
|
||||
if (fTerminating || gSignalStatus != 0) {
|
||||
break;
|
||||
}
|
||||
@@ -275,7 +285,7 @@ void Monitor::CheckSegment()
|
||||
|
||||
unsigned int numDevices = 0;
|
||||
|
||||
if (fInteractive) {
|
||||
if (fInteractive || fViewOnly) {
|
||||
DeviceCounter* dc = managementSegment.find<DeviceCounter>(bipc::unique_instance).first;
|
||||
if (dc) {
|
||||
numDevices = dc->fCount;
|
||||
@@ -303,6 +313,15 @@ void Monitor::CheckSegment()
|
||||
<< setw(8) << numDevices << " | "
|
||||
<< setw(10) << (fViewOnly ? "view only" : to_string(duration)) << " |"
|
||||
<< c << flush;
|
||||
} else if (fViewOnly) {
|
||||
size_t free = segment.get_free_memory();
|
||||
size_t total = segment.get_size();
|
||||
size_t used = total - free;
|
||||
LOGV(info, user1) << "[" << fSegmentName
|
||||
<< "] devices: " << numDevices
|
||||
<< ", total: " << total
|
||||
<< ", free: " << free
|
||||
<< ", used: " << used;
|
||||
}
|
||||
} catch (bie&) {
|
||||
fHeartbeatTriggered = false;
|
||||
@@ -473,7 +492,7 @@ void Monitor::Cleanup(const ShmId& shmId)
|
||||
RemoveObject(managementSegmentName.c_str());
|
||||
} catch (bie&) {
|
||||
cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl;
|
||||
} catch(std::out_of_range& oor) {
|
||||
} catch(out_of_range& oor) {
|
||||
cout << "Could not locate element in the region map, out of range: " << oor.what() << endl;
|
||||
}
|
||||
|
||||
|
@@ -37,7 +37,7 @@ struct ShmId
|
||||
class Monitor
|
||||
{
|
||||
public:
|
||||
Monitor(const std::string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit);
|
||||
Monitor(const std::string& shmId, bool selfDestruct, bool interactive, bool viewOnly, unsigned int timeoutInMS, unsigned int intervalInMS, bool runAsDaemon, bool cleanOnExit);
|
||||
|
||||
Monitor(const Monitor&) = delete;
|
||||
Monitor operator=(const Monitor&) = delete;
|
||||
@@ -84,6 +84,7 @@ class Monitor
|
||||
bool fSeenOnce; // true is segment has been opened successfully at least once
|
||||
bool fCleanOnExit;
|
||||
unsigned int fTimeoutInMS;
|
||||
unsigned int fIntervalInMS;
|
||||
std::string fShmId;
|
||||
std::string fSegmentName;
|
||||
std::string fManagementSegmentName;
|
||||
|
@@ -127,13 +127,20 @@ class Poller final : public fair::mq::Poller
|
||||
|
||||
void Poll(const int timeout) override
|
||||
{
|
||||
if (zmq_poll(fItems, fNumItems, timeout) < 0) {
|
||||
if (errno == ETERM) {
|
||||
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
|
||||
} else {
|
||||
LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
|
||||
throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
|
||||
while (true) {
|
||||
if (zmq_poll(fItems, fNumItems, timeout) < 0) {
|
||||
if (errno == ETERM) {
|
||||
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
|
||||
return;
|
||||
} else if (errno == EINTR) {
|
||||
LOG(debug) << "polling interrupted by system call";
|
||||
continue;
|
||||
} else {
|
||||
LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
|
||||
throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -148,9 +148,6 @@ class Socket final : public fair::mq::Socket
|
||||
if (zmq_errno() == ETERM) {
|
||||
LOG(debug) << "Terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Transfer interrupted by system call";
|
||||
return -1;
|
||||
} else {
|
||||
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
return -1;
|
||||
@@ -177,7 +174,7 @@ class Socket final : public fair::mq::Socket
|
||||
size_t size = msg->GetSize();
|
||||
fBytesTx += size;
|
||||
return size;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
@@ -220,7 +217,7 @@ class Socket final : public fair::mq::Socket
|
||||
fBytesRx += size;
|
||||
++fMessagesRx;
|
||||
return size;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
@@ -269,7 +266,7 @@ class Socket final : public fair::mq::Socket
|
||||
fBytesTx += totalSize;
|
||||
|
||||
return totalSize;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
@@ -323,7 +320,7 @@ class Socket final : public fair::mq::Socket
|
||||
fBytesRx += totalSize;
|
||||
|
||||
return totalSize;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
|
@@ -43,7 +43,7 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
: fair::mq::TransportFactory(id)
|
||||
, fDeviceId(id)
|
||||
, fShmId()
|
||||
, fZMQContext(zmq_ctx_new())
|
||||
, fZmqCtx(zmq_ctx_new())
|
||||
, fManager(nullptr)
|
||||
{
|
||||
int major, minor, patch;
|
||||
@@ -51,7 +51,7 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & "
|
||||
<< "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")";
|
||||
|
||||
if (!fZMQContext) {
|
||||
if (!fZmqCtx) {
|
||||
throw std::runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
|
||||
@@ -70,12 +70,12 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
|
||||
|
||||
try {
|
||||
if (zmq_ctx_set(fZMQContext, ZMQ_IO_THREADS, numIoThreads) != 0) {
|
||||
if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
|
||||
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
// Set the maximum number of allowed sockets on the context.
|
||||
if (zmq_ctx_set(fZMQContext, ZMQ_MAX_SOCKETS, 10000) != 0) {
|
||||
if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) {
|
||||
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
|
||||
SocketPtr CreateSocket(const std::string& type, const std::string& name) override
|
||||
{
|
||||
return tools::make_unique<Socket>(*fManager, type, name, GetId(), fZMQContext, this);
|
||||
return tools::make_unique<Socket>(*fManager, type, name, GetId(), fZmqCtx, this);
|
||||
}
|
||||
|
||||
PollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override
|
||||
@@ -179,14 +179,17 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
{
|
||||
LOG(debug) << "Destroying Shared Memory transport...";
|
||||
|
||||
if (fZMQContext) {
|
||||
if (zmq_ctx_term(fZMQContext) != 0) {
|
||||
if (errno == EINTR) {
|
||||
LOG(error) << "failed closing context, reason: " << zmq_strerror(errno);
|
||||
} else {
|
||||
fZMQContext = nullptr;
|
||||
return;
|
||||
if (fZmqCtx) {
|
||||
while (true) {
|
||||
if (zmq_ctx_term(fZmqCtx) != 0) {
|
||||
if (errno == EINTR) {
|
||||
LOG(debug) << "zmq_ctx_term interrupted by system call, retrying";
|
||||
continue;
|
||||
} else {
|
||||
fZmqCtx = nullptr;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "context not available for shutdown";
|
||||
@@ -196,7 +199,7 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
private:
|
||||
std::string fDeviceId;
|
||||
std::string fShmId;
|
||||
void* fZMQContext;
|
||||
void* fZmqCtx;
|
||||
std::unique_ptr<Manager> fManager;
|
||||
};
|
||||
|
||||
|
@@ -76,6 +76,7 @@ int main(int argc, char** argv)
|
||||
bool interactive = false;
|
||||
bool viewOnly = false;
|
||||
unsigned int timeoutInMS = 5000;
|
||||
unsigned int intervalInMS = 100;
|
||||
bool runAsDaemon = false;
|
||||
bool cleanOnExit = false;
|
||||
|
||||
@@ -90,6 +91,7 @@ int main(int argc, char** argv)
|
||||
("timeout,t" , value<unsigned int>(&timeoutInMS)->default_value(5000), "Heartbeat timeout in milliseconds")
|
||||
("daemonize,d" , value<bool>(&runAsDaemon)->implicit_value(true), "Daemonize the monitor")
|
||||
("clean-on-exit,e", value<bool>(&cleanOnExit)->implicit_value(true), "Perform cleanup on exit")
|
||||
("interval" , value<unsigned int>(&intervalInMS)->default_value(100), "Output interval for interactive/view-only mode")
|
||||
("help,h", "Print help");
|
||||
|
||||
variables_map vm;
|
||||
@@ -116,8 +118,11 @@ int main(int argc, char** argv)
|
||||
}
|
||||
|
||||
cout << "Starting shared memory monitor for session: \"" << sessionName << "\" (shmId: " << shmId << ")..." << endl;
|
||||
if (viewOnly && !interactive) {
|
||||
cout << "running in non-interactive view-only mode, outputting with interval of " << intervalInMS << "ms. (change with --interval), press ctrl+C to exit." << endl;
|
||||
}
|
||||
|
||||
Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, runAsDaemon, cleanOnExit);
|
||||
Monitor monitor(shmId, selfDestruct, interactive, viewOnly, timeoutInMS, intervalInMS, runAsDaemon, cleanOnExit);
|
||||
|
||||
monitor.CatchSignals();
|
||||
monitor.Run();
|
||||
|
@@ -161,13 +161,16 @@ class Context
|
||||
UnsubscribeFromRegionEvents();
|
||||
|
||||
if (fZmqCtx) {
|
||||
if (zmq_ctx_term(fZmqCtx) != 0) {
|
||||
if (errno == EINTR) {
|
||||
LOG(error) << " failed closing context, reason: " << zmq_strerror(errno);
|
||||
} else {
|
||||
fZmqCtx = nullptr;
|
||||
return;
|
||||
while (true) {
|
||||
if (zmq_ctx_term(fZmqCtx) != 0) {
|
||||
if (errno == EINTR) {
|
||||
LOG(debug) << "zmq_ctx_term interrupted by system call, retrying";
|
||||
continue;
|
||||
} else {
|
||||
fZmqCtx = nullptr;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "context not available for shutdown";
|
||||
|
@@ -130,13 +130,20 @@ class Poller final : public fair::mq::Poller
|
||||
|
||||
void Poll(const int timeout) override
|
||||
{
|
||||
if (zmq_poll(fItems, fNumItems, timeout) < 0) {
|
||||
if (errno == ETERM) {
|
||||
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
|
||||
} else {
|
||||
LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
|
||||
throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
|
||||
while (true) {
|
||||
if (zmq_poll(fItems, fNumItems, timeout) < 0) {
|
||||
if (errno == ETERM) {
|
||||
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
|
||||
return;
|
||||
} else if (errno == EINTR) {
|
||||
LOG(debug) << "polling interrupted by system call";
|
||||
continue;
|
||||
} else {
|
||||
LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
|
||||
throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -126,9 +126,6 @@ class Socket final : public fair::mq::Socket
|
||||
if (zmq_errno() == ETERM) {
|
||||
LOG(debug) << "Terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Transfer interrupted by system call";
|
||||
return -1;
|
||||
} else {
|
||||
LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
|
||||
return -1;
|
||||
@@ -151,7 +148,7 @@ class Socket final : public fair::mq::Socket
|
||||
fBytesTx += nbytes;
|
||||
++fMessagesTx;
|
||||
return nbytes;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
@@ -177,7 +174,7 @@ class Socket final : public fair::mq::Socket
|
||||
fBytesRx += nbytes;
|
||||
++fMessagesRx;
|
||||
return nbytes;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
@@ -212,7 +209,7 @@ class Socket final : public fair::mq::Socket
|
||||
int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
|
||||
if (nbytes >= 0) {
|
||||
totalSize += nbytes;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
repeat = true;
|
||||
break;
|
||||
@@ -261,7 +258,7 @@ class Socket final : public fair::mq::Socket
|
||||
if (nbytes >= 0) {
|
||||
msgVec.push_back(move(part));
|
||||
totalSize += nbytes;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
} else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
repeat = true;
|
||||
break;
|
||||
|
@@ -472,6 +472,42 @@ TEST(Topology2, AggregatedTopologyStateComparison)
|
||||
ASSERT_TRUE(DeviceState::ResettingTask == AggregatedTopologyState::ResettingTask);
|
||||
ASSERT_TRUE(DeviceState::ResettingDevice == AggregatedTopologyState::ResettingDevice);
|
||||
ASSERT_TRUE(DeviceState::Exiting == AggregatedTopologyState::Exiting);
|
||||
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("UNDEFINED") == AggregatedTopologyState::Undefined);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("OK") == AggregatedTopologyState::Ok);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("ERROR") == AggregatedTopologyState::Error);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("IDLE") == AggregatedTopologyState::Idle);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("INITIALIZING DEVICE") == AggregatedTopologyState::InitializingDevice);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("INITIALIZED") == AggregatedTopologyState::Initialized);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("BINDING") == AggregatedTopologyState::Binding);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("BOUND") == AggregatedTopologyState::Bound);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("CONNECTING") == AggregatedTopologyState::Connecting);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("DEVICE READY") == AggregatedTopologyState::DeviceReady);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("INITIALIZING TASK") == AggregatedTopologyState::InitializingTask);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("READY") == AggregatedTopologyState::Ready);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("RUNNING") == AggregatedTopologyState::Running);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("RESETTING TASK") == AggregatedTopologyState::ResettingTask);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("RESETTING DEVICE") == AggregatedTopologyState::ResettingDevice);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("EXITING") == AggregatedTopologyState::Exiting);
|
||||
ASSERT_TRUE(GetAggregatedTopologyState("MIXED") == AggregatedTopologyState::Mixed);
|
||||
|
||||
ASSERT_TRUE("UNDEFINED" == GetAggregatedTopologyStateName(AggregatedTopologyState::Undefined));
|
||||
ASSERT_TRUE("OK" == GetAggregatedTopologyStateName(AggregatedTopologyState::Ok));
|
||||
ASSERT_TRUE("ERROR" == GetAggregatedTopologyStateName(AggregatedTopologyState::Error));
|
||||
ASSERT_TRUE("IDLE" == GetAggregatedTopologyStateName(AggregatedTopologyState::Idle));
|
||||
ASSERT_TRUE("INITIALIZING DEVICE" == GetAggregatedTopologyStateName(AggregatedTopologyState::InitializingDevice));
|
||||
ASSERT_TRUE("INITIALIZED" == GetAggregatedTopologyStateName(AggregatedTopologyState::Initialized));
|
||||
ASSERT_TRUE("BINDING" == GetAggregatedTopologyStateName(AggregatedTopologyState::Binding));
|
||||
ASSERT_TRUE("BOUND" == GetAggregatedTopologyStateName(AggregatedTopologyState::Bound));
|
||||
ASSERT_TRUE("CONNECTING" == GetAggregatedTopologyStateName(AggregatedTopologyState::Connecting));
|
||||
ASSERT_TRUE("DEVICE READY" == GetAggregatedTopologyStateName(AggregatedTopologyState::DeviceReady));
|
||||
ASSERT_TRUE("INITIALIZING TASK" == GetAggregatedTopologyStateName(AggregatedTopologyState::InitializingTask));
|
||||
ASSERT_TRUE("READY" == GetAggregatedTopologyStateName(AggregatedTopologyState::Ready));
|
||||
ASSERT_TRUE("RUNNING" == GetAggregatedTopologyStateName(AggregatedTopologyState::Running));
|
||||
ASSERT_TRUE("RESETTING TASK" == GetAggregatedTopologyStateName(AggregatedTopologyState::ResettingTask));
|
||||
ASSERT_TRUE("RESETTING DEVICE" == GetAggregatedTopologyStateName(AggregatedTopologyState::ResettingDevice));
|
||||
ASSERT_TRUE("EXITING" == GetAggregatedTopologyStateName(AggregatedTopologyState::Exiting));
|
||||
ASSERT_TRUE("MIXED" == GetAggregatedTopologyStateName(AggregatedTopologyState::Mixed));
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
Reference in New Issue
Block a user