From 0e35f1cb22c913b49f6bff5896958cf4a8d65d5c Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 3 Jul 2019 14:54:54 +0200 Subject: [PATCH] Shmem region: support huge pages via path to hugetlbfs mount --- examples/region/Sampler.cxx | 16 +- examples/region/Sink.cxx | 11 +- fairmq/FairMQChannel.h | 4 +- fairmq/FairMQDevice.h | 4 +- fairmq/FairMQTransportFactory.h | 2 +- fairmq/nanomsg/FairMQTransportFactoryNN.cxx | 4 +- fairmq/nanomsg/FairMQTransportFactoryNN.h | 2 +- fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx | 2 +- fairmq/nanomsg/FairMQUnmanagedRegionNN.h | 3 +- fairmq/ofi/TransportFactory.cxx | 2 +- fairmq/ofi/TransportFactory.h | 2 +- fairmq/shmem/Common.h | 35 +++- fairmq/shmem/FairMQMessageSHM.cxx | 40 +--- fairmq/shmem/FairMQSocketSHM.cxx | 14 +- fairmq/shmem/FairMQTransportFactorySHM.cxx | 207 ++++-------------- fairmq/shmem/FairMQTransportFactorySHM.h | 11 +- fairmq/shmem/FairMQUnmanagedRegionSHM.cxx | 20 +- fairmq/shmem/FairMQUnmanagedRegionSHM.h | 3 +- fairmq/shmem/Manager.cxx | 181 ++++++++++++---- fairmq/shmem/Manager.h | 22 +- fairmq/shmem/Monitor.cxx | 220 ++++++++------------ fairmq/shmem/Monitor.h | 5 +- fairmq/shmem/Region.cxx | 139 +++++++------ fairmq/shmem/Region.h | 7 +- fairmq/shmem/runMonitor.cxx | 34 +-- fairmq/zeromq/FairMQTransportFactoryZMQ.cxx | 6 +- fairmq/zeromq/FairMQTransportFactoryZMQ.h | 2 +- fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx | 2 +- fairmq/zeromq/FairMQUnmanagedRegionZMQ.h | 3 +- 29 files changed, 466 insertions(+), 537 deletions(-) diff --git a/examples/region/Sampler.cxx b/examples/region/Sampler.cxx index 678c321e..c5b9f51a 100644 --- a/examples/region/Sampler.cxx +++ b/examples/region/Sampler.cxx @@ -40,8 +40,7 @@ void Sampler::InitTask() 10000000, [this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport --fNumUnackedMsgs; - if (fMaxIterations > 0) - { + if (fMaxIterations > 0) { LOG(debug) << "Received ack"; } } @@ -58,12 +57,14 @@ bool Sampler::ConditionalRun() nullptr // hint )); - if (Send(msg, "data", 0) > 0) - { + // static_cast(fRegion->GetData())[3] = 97; + // LOG(info) << "check: " << static_cast(fRegion->GetData())[3]; + // std::this_thread::sleep_for(std::chrono::seconds(1)); + + if (Send(msg, "data", 0) > 0) { ++fNumUnackedMsgs; - if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) - { + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; return false; } @@ -75,8 +76,7 @@ bool Sampler::ConditionalRun() void Sampler::ResetTask() { // if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead. - if (fNumUnackedMsgs != 0) - { + if (fNumUnackedMsgs != 0) { LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")"; this_thread::sleep_for(chrono::milliseconds(500)); LOG(debug) << "done, still unacked: " << fNumUnackedMsgs; diff --git a/examples/region/Sink.cxx b/examples/region/Sink.cxx index 55c1c04b..71d744ec 100644 --- a/examples/region/Sink.cxx +++ b/examples/region/Sink.cxx @@ -35,14 +35,15 @@ void Sink::Run() { FairMQChannel& dataInChannel = fChannels.at("data").at(0); - while (!NewStatePending()) - { + while (!NewStatePending()) { FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage()); dataInChannel.Receive(msg); - // void* ptr = msg->GetData(); - if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) - { + // void* ptr = msg->GetData(); + // char* cptr = static_cast(ptr); + // LOG(info) << "check: " << cptr[3]; + + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; break; } diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 87ef5be1..ac132484 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -345,9 +345,9 @@ class FairMQChannel return Transport()->NewStaticMessage(data); } - FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) + FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) { - return Transport()->CreateUnmanagedRegion(size, callback); + return Transport()->CreateUnmanagedRegion(size, callback, path, flags); } private: diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 6d1678cf..1d04be32 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -240,9 +240,9 @@ class FairMQDevice } // creates unmanaged region with the transport of the specified channel - FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr) + FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) { - return GetChannel(channel, index).NewUnmanagedRegion(size, callback); + return GetChannel(channel, index).NewUnmanagedRegion(size, callback, path, flags); } template diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index fb3d3b84..8fb32893 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -72,7 +72,7 @@ class FairMQTransportFactory /// Create a poller for specific channels (all subchannels) virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const = 0; - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const = 0; + virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0; /// Get transport type virtual fair::mq::Transport GetType() const = 0; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index f8ef6504..f3c63c3d 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -65,9 +65,9 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map(new FairMQPollerNN(channelsMap, channelList)); } -FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const +FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const { - return unique_ptr(new FairMQUnmanagedRegionNN(size, callback)); + return unique_ptr(new FairMQUnmanagedRegionNN(size, callback, path, flags)); } fair::mq::Transport FairMQTransportFactoryNN::GetType() const diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index e04b9eb0..abe0d4d1 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -36,7 +36,7 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override; fair::mq::Transport GetType() const override; diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx b/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx index 7f8c06d5..4490b776 100644 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx +++ b/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx @@ -11,7 +11,7 @@ using namespace std; -FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback) +FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */) : fBuffer(malloc(size)) , fSize(size) , fCallback(callback) diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h b/fairmq/nanomsg/FairMQUnmanagedRegionNN.h index bcb7de4d..1f2da647 100644 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h +++ b/fairmq/nanomsg/FairMQUnmanagedRegionNN.h @@ -12,13 +12,14 @@ #include "FairMQUnmanagedRegion.h" #include // size_t +#include class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion { friend class FairMQSocketNN; public: - FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback); + FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete; FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete; diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index eac6127d..4e632470 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -85,7 +85,7 @@ auto TransportFactory::CreatePoller(const unordered_map UnmanagedRegionPtr +auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) const -> UnmanagedRegionPtr { throw runtime_error{"Not yet implemented UMR."}; } diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index ca9f9641..3236466b 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -46,7 +46,7 @@ class TransportFactory final : public FairMQTransportFactory auto CreatePoller(const std::vector& channels) const -> PollerPtr override; auto CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const -> PollerPtr override; - auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const -> UnmanagedRegionPtr override; + auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override; auto GetType() const -> Transport override; diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 5048a0bf..60ed8e4c 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -10,8 +10,13 @@ #include #include +#include #include +#include +#include +#include +#include #include #include @@ -24,6 +29,32 @@ namespace mq namespace shmem { +using SegmentManager = boost::interprocess::managed_shared_memory::segment_manager; +using VoidAlloc = boost::interprocess::allocator; +using CharAlloc = boost::interprocess::allocator; +using Str = boost::interprocess::basic_string, CharAlloc>; +using StrAlloc = boost::interprocess::allocator; +using StrVector = boost::interprocess::vector; + +struct RegionInfo +{ + RegionInfo(const VoidAlloc& alloc) + : fPath("", alloc) + , fFlags(0) + {} + + RegionInfo(const char* path, int flags, const VoidAlloc& alloc) + : fPath(path, alloc) + , fFlags(flags) + {} + + Str fPath; + int fFlags; +}; + +using Uint64RegionInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; +using Uint64RegionInfoMap = boost::interprocess::map, Uint64RegionInfoPairAlloc>; + struct DeviceCounter { DeviceCounter(unsigned int c) @@ -35,11 +66,11 @@ struct DeviceCounter struct RegionCounter { - RegionCounter(unsigned int c) + RegionCounter(uint64_t c) : fCount(c) {} - std::atomic fCount; + std::atomic fCount; }; struct MonitorStatus diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index e71b8d71..99a509b4 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -223,25 +223,16 @@ zmq_msg_t* FairMQMessageSHM::GetMessage() void* FairMQMessageSHM::GetData() const { - if (fLocalPtr) - { + if (fLocalPtr) { return fLocalPtr; - } - else - { - if (fRegionId == 0) - { + } else { + if (fRegionId == 0) { return fManager.Segment().get_address_from_handle(fHandle); - } - else - { + } else { fRegionPtr = fManager.GetRemoteRegion(fRegionId); - if (fRegionPtr) - { + if (fRegionPtr) { fLocalPtr = reinterpret_cast(fRegionPtr->fRegion.get_address()) + fHandle; - } - else - { + } else { // LOG(warn) << "could not get pointer from a region message"; fLocalPtr = nullptr; } @@ -257,15 +248,10 @@ size_t FairMQMessageSHM::GetSize() const bool FairMQMessageSHM::SetUsedSize(const size_t size) { - if (size == fSize) - { + if (size == fSize) { return true; - } - else if (size <= fSize) - { - try - { - + } else if (size <= fSize) { + try { bipc::managed_shared_memory::size_type shrunkSize = size; fLocalPtr = fManager.Segment().allocation_command(bipc::shrink_in_place, fSize + 128, shrunkSize, fLocalPtr); fSize = size; @@ -274,15 +260,11 @@ bool FairMQMessageSHM::SetUsedSize(const size_t size) MetaHeader* hdrPtr = static_cast(zmq_msg_data(&fMessage)); hdrPtr->fSize = fSize; return true; - } - catch (bipc::interprocess_exception& e) - { + } catch (bipc::interprocess_exception& e) { LOG(info) << "could not set used size: " << e.what(); return false; } - } - else - { + } else { LOG(error) << "cannot set used size higher than original."; return false; } diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index 311c8000..5318021f 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -375,20 +375,20 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int tim for (size_t m = 0; m < numMessages; m++) { - MetaHeader metaHeader; - memcpy(&metaHeader, &hdrVec[m], sizeof(MetaHeader)); + MetaHeader hdr; + memcpy(&hdr, &hdrVec[m], sizeof(MetaHeader)); msgVec.emplace_back(fair::mq::tools::make_unique(fManager, GetTransport())); FairMQMessageSHM* msg = static_cast(msgVec.back().get()); MetaHeader* msgHdr = static_cast(zmq_msg_data(msg->GetMessage())); - memcpy(msgHdr, &metaHeader, sizeof(MetaHeader)); + memcpy(msgHdr, &hdr, sizeof(MetaHeader)); - msg->fHandle = metaHeader.fHandle; - msg->fSize = metaHeader.fSize; - msg->fRegionId = metaHeader.fRegionId; - msg->fHint = metaHeader.fHint; + msg->fHandle = hdr.fHandle; + msg->fSize = hdr.fSize; + msg->fRegionId = hdr.fRegionId; + msg->fHint = hdr.fHint; totalSize += msg->GetSize(); } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index a17e5e16..5831582d 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -9,11 +9,11 @@ #include "FairMQLogger.h" #include "FairMQTransportFactorySHM.h" +#include + #include #include -#include -#include #include #include @@ -28,7 +28,6 @@ using namespace std; using namespace fair::mq::shmem; -namespace bfs = ::boost::filesystem; namespace bpt = ::boost::posix_time; namespace bipc = ::boost::interprocess; @@ -38,175 +37,73 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai : FairMQTransportFactory(id) , fDeviceId(id) , fShmId() - , fContext(nullptr) + , fZMQContext(nullptr) + , fManager(nullptr) , fHeartbeatThread() , fSendHeartbeats(true) - , fShMutex(nullptr) - , fDeviceCounter(nullptr) - , fManager(nullptr) { int major, minor, patch; zmq_version(&major, &minor, &patch); LOG(debug) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & " << "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")"; - fContext = zmq_ctx_new(); - if (!fContext) - { - LOG(error) << "failed creating context, reason: " << zmq_strerror(errno); - exit(EXIT_FAILURE); + fZMQContext = zmq_ctx_new(); + if (!fZMQContext) { + throw runtime_error(fair::mq::tools::ToString("failed creating context, reason: ", zmq_strerror(errno))); } int numIoThreads = 1; string sessionName = "default"; size_t segmentSize = 2000000000; bool autolaunchMonitor = false; - if (config) - { + if (config) { numIoThreads = config->GetValue("io-threads"); sessionName = config->GetValue("session"); segmentSize = config->GetValue("shm-segment-size"); autolaunchMonitor = config->GetValue("shm-monitor"); - } - else - { + } else { LOG(debug) << "FairMQProgOptions not available! Using defaults."; } fShmId = buildShmIdFromSessionIdAndUserId(sessionName); - try - { - fShMutex = fair::mq::tools::make_unique(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str()); - - if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) - { + try { + if (zmq_ctx_set(fZMQContext, ZMQ_IO_THREADS, numIoThreads) != 0) { LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); } // Set the maximum number of allowed sockets on the context. - if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) - { + if (zmq_ctx_set(fZMQContext, ZMQ_MAX_SOCKETS, 10000) != 0) { LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); } fManager = fair::mq::tools::make_unique(fShmId, segmentSize); - LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes."; - { - bipc::scoped_lock lock(*fShMutex); - - fDeviceCounter = fManager->Segment().find(bipc::unique_instance).first; - if (fDeviceCounter) - { - LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; - (fDeviceCounter->fCount)++; - LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount; - } - else - { - LOG(debug) << "no device counter found, creating one and initializing with 1"; - fDeviceCounter = fManager->Segment().construct(bipc::unique_instance)(1); - LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; - } - - // start shm monitor - if (autolaunchMonitor) - { - try - { - MonitorStatus* monitorStatus = fManager->ManagementSegment().find(bipc::unique_instance).first; - if (monitorStatus == nullptr) - { - LOG(debug) << "no fairmq-shmmonitor found, starting..."; - StartMonitor(); - } - else - { - LOG(debug) << "found fairmq-shmmonitor."; - } - } - catch (exception& e) - { - LOG(error) << "Exception during fairmq-shmmonitor initialization: " << e.what() << ", application will now exit"; - exit(EXIT_FAILURE); - } - } + if (autolaunchMonitor) { + fManager->StartMonitor(); } - } - catch(bipc::interprocess_exception& e) - { + } catch (bipc::interprocess_exception& e) { LOG(error) << "Could not initialize shared memory transport: " << e.what(); - throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified."); + throw runtime_error(fair::mq::tools::ToString("Could not initialize shared memory transport: ", e.what())); } fSendHeartbeats = true; fHeartbeatThread = thread(&FairMQTransportFactorySHM::SendHeartbeats, this); } -void FairMQTransportFactorySHM::StartMonitor() -{ - auto env = boost::this_process::environment(); - - vector ownPath = boost::this_process::path(); - - if (const char* fmqp = getenv("FAIRMQ_PATH")) - { - ownPath.insert(ownPath.begin(), bfs::path(fmqp)); - } - - bfs::path p = boost::process::search_path("fairmq-shmmonitor", ownPath); - - if (!p.empty()) - { - boost::process::spawn(p, "-x", "--shmid", fShmId, "-d", "-t", "2000", env); - int numTries = 0; - do - { - MonitorStatus* monitorStatus = fManager->ManagementSegment().find(bipc::unique_instance).first; - if (monitorStatus) - { - LOG(debug) << "fairmq-shmmonitor started"; - break; - } - else - { - this_thread::sleep_for(chrono::milliseconds(10)); - if (++numTries > 1000) - { - LOG(error) << "Did not get response from fairmq-shmmonitor after " << 10 * 1000 << " milliseconds. Exiting."; - exit(EXIT_FAILURE); - } - } - } - while (true); - } - else - { - LOG(warn) << "could not find fairmq-shmmonitor in the path"; - } -} - void FairMQTransportFactorySHM::SendHeartbeats() { string controlQueueName("fmq_" + fShmId + "_cq"); - while (fSendHeartbeats) - { - try - { + while (fSendHeartbeats) { + try { bipc::message_queue mq(bipc::open_only, controlQueueName.c_str()); bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); - if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) - { + if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) { this_thread::sleep_for(chrono::milliseconds(100)); - } - else - { + } else { LOG(debug) << "control queue timeout"; } - } - catch (bipc::interprocess_exception& ie) - { + } catch (bipc::interprocess_exception& ie) { this_thread::sleep_for(chrono::milliseconds(500)); // LOG(warn) << "no " << controlQueueName << " found"; } @@ -235,8 +132,8 @@ FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionP FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name) { - assert(fContext); - return unique_ptr(new FairMQSocketSHM(*fManager, type, name, GetId(), fContext, this)); + assert(fZMQContext); + return unique_ptr(new FairMQSocketSHM(*fManager, type, name, GetId(), fZMQContext, this)); } FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector& channels) const @@ -254,9 +151,14 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map(new FairMQPollerSHM(channelsMap, channelList)); } -FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const +FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const { - return unique_ptr(new FairMQUnmanagedRegionSHM(*fManager, size, callback)); + return unique_ptr(new FairMQUnmanagedRegionSHM(*fManager, size, callback, path, flags)); +} + +fair::mq::Transport FairMQTransportFactorySHM::GetType() const +{ + return fTransportType; } FairMQTransportFactorySHM::~FairMQTransportFactorySHM() @@ -264,53 +166,16 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM() fSendHeartbeats = false; fHeartbeatThread.join(); - if (fContext) - { - if (zmq_ctx_term(fContext) != 0) - { - if (errno == EINTR) - { + if (fZMQContext) { + if (zmq_ctx_term(fZMQContext) != 0) { + if (errno == EINTR) { LOG(error) << "failed closing context, reason: " << zmq_strerror(errno); - } - else - { - fContext = nullptr; + } else { + fZMQContext = nullptr; return; } } - } - else - { + } else { LOG(error) << "context not available for shutdown"; } - - bool lastRemoved = false; - - { // mutex scope - bipc::scoped_lock lock(*fShMutex); - - (fDeviceCounter->fCount)--; - - if (fDeviceCounter->fCount == 0) - { - LOG(debug) << "last segment user, removing segment."; - - fManager->RemoveSegment(); - lastRemoved = true; - } - else - { - LOG(debug) << "other segment users present (" << fDeviceCounter->fCount << "), not removing it."; - } - } - - if (lastRemoved) - { - bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str()); - } -} - -fair::mq::Transport FairMQTransportFactorySHM::GetType() const -{ - return fTransportType; } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 93edb39c..ceb6dac5 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -19,8 +19,6 @@ #include "FairMQUnmanagedRegionSHM.h" #include -#include - #include #include #include @@ -44,7 +42,7 @@ class FairMQTransportFactorySHM final : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; fair::mq::Transport GetType() const override; @@ -56,17 +54,14 @@ class FairMQTransportFactorySHM final : public FairMQTransportFactory private: void SendHeartbeats(); - void StartMonitor(); static fair::mq::Transport fTransportType; std::string fDeviceId; std::string fShmId; - void* fContext; + void* fZMQContext; + std::unique_ptr fManager; std::thread fHeartbeatThread; std::atomic fSendHeartbeats; - std::unique_ptr fShMutex; - fair::mq::shmem::DeviceCounter* fDeviceCounter; - std::unique_ptr fManager; }; #endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */ diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx b/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx index 7bc18e89..5aa9efe1 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx +++ b/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx @@ -15,22 +15,18 @@ using namespace fair::mq::shmem; namespace bipc = ::boost::interprocess; -FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_t size, FairMQRegionCallback callback) +FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) : fManager(manager) , fRegion(nullptr) , fRegionId(0) { - try - { + try { RegionCounter* rc = fManager.ManagementSegment().find(bipc::unique_instance).first; - if (rc) - { + if (rc) { LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing."; (rc->fCount)++; LOG(debug) << "incremented region counter, now: " << rc->fCount; - } - else - { + } else { LOG(debug) << "no region counter found, creating one and initializing with 1"; rc = fManager.ManagementSegment().construct(bipc::unique_instance)(1); LOG(debug) << "initialized region counter with: " << rc->fCount; @@ -38,13 +34,11 @@ FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_ fRegionId = rc->fCount; - fRegion = fManager.CreateRegion(size, fRegionId, callback); - } - catch (bipc::interprocess_exception& e) - { + fRegion = fManager.CreateRegion(size, fRegionId, callback, path, flags); + } catch (bipc::interprocess_exception& e) { LOG(error) << "cannot create region. Already created/not cleaned up?"; LOG(error) << e.what(); - exit(EXIT_FAILURE); + throw; } } diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.h b/fairmq/shmem/FairMQUnmanagedRegionSHM.h index d0629046..4334f891 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.h +++ b/fairmq/shmem/FairMQUnmanagedRegionSHM.h @@ -18,6 +18,7 @@ #include #include // size_t +#include class FairMQUnmanagedRegionSHM final : public FairMQUnmanagedRegion { @@ -25,7 +26,7 @@ class FairMQUnmanagedRegionSHM final : public FairMQUnmanagedRegion friend class FairMQMessageSHM; public: - FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr); + FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0); void* GetData() const override; size_t GetSize() const override; diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index 0f90c146..a1652a31 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -9,8 +9,12 @@ #include #include +#include +#include + using namespace std; namespace bipc = ::boost::interprocess; +namespace bfs = ::boost::filesystem; namespace fair { @@ -21,19 +25,86 @@ namespace shmem std::unordered_map> Manager::fRegions; -Manager::Manager(const string& name, size_t size) - : fSessionName(name) - , fSegmentName("fmq_" + fSessionName + "_main") - , fManagementSegmentName("fmq_" + fSessionName + "_mng") +Manager::Manager(const std::string& id, size_t size) + : fShmId(id) + , fSegmentName("fmq_" + fShmId + "_main") + , fManagementSegmentName("fmq_" + fShmId + "_mng") , fSegment(bipc::open_or_create, fSegmentName.c_str(), size) , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536) -{} + , fShmMtx(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str()) + , fDeviceCounter(nullptr) +{ + LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << size << " bytes. Available are " << fSegment.get_free_memory() << " bytes."; + + bipc::scoped_lock lock(fShmMtx); + + fDeviceCounter = fManagementSegment.find(bipc::unique_instance).first; + + if (fDeviceCounter) { + LOG(debug) << "device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; + (fDeviceCounter->fCount)++; + LOG(debug) << "incremented device counter, now: " << fDeviceCounter->fCount; + } else { + LOG(debug) << "no device counter found, creating one and initializing with 1"; + fDeviceCounter = fManagementSegment.construct(bipc::unique_instance)(1); + LOG(debug) << "initialized device counter with: " << fDeviceCounter->fCount; + } +} bipc::managed_shared_memory& Manager::Segment() { return fSegment; } +bipc::managed_shared_memory& Manager::ManagementSegment() +{ + return fManagementSegment; +} + +void Manager::StartMonitor() +{ + try { + MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; + if (monitorStatus == nullptr) { + LOG(debug) << "no fairmq-shmmonitor found, starting..."; + auto env = boost::this_process::environment(); + + vector ownPath = boost::this_process::path(); + + if (const char* fmqp = getenv("FAIRMQ_PATH")) { + ownPath.insert(ownPath.begin(), bfs::path(fmqp)); + } + + bfs::path p = boost::process::search_path("fairmq-shmmonitor", ownPath); + + if (!p.empty()) { + boost::process::spawn(p, "-x", "--shmid", fShmId, "-d", "-t", "2000", env); + int numTries = 0; + do { + monitorStatus = fManagementSegment.find(bipc::unique_instance).first; + if (monitorStatus) { + LOG(debug) << "fairmq-shmmonitor started"; + break; + } else { + this_thread::sleep_for(chrono::milliseconds(10)); + if (++numTries > 1000) { + LOG(error) << "Did not get response from fairmq-shmmonitor after " << 10 * 1000 << " milliseconds. Exiting."; + throw runtime_error(fair::mq::tools::ToString("Did not get response from fairmq-shmmonitor after ", 10 * 1000, " milliseconds. Exiting.")); + } + } + } while (true); + } else { + LOG(warn) << "could not find fairmq-shmmonitor in the path"; + } + } else { + LOG(debug) << "found fairmq-shmmonitor."; + } + } catch (std::exception& e) { + LOG(error) << "Exception during fairmq-shmmonitor initialization: " << e.what() << ", application will now exit"; + exit(EXIT_FAILURE); + } +} + void Manager::Interrupt() { } @@ -41,30 +112,31 @@ void Manager::Interrupt() void Manager::Resume() { // close remote regions before processing new transfers - for (auto it = fRegions.begin(); it != fRegions.end(); /**/) - { - if (it->second->fRemote) - { + for (auto it = fRegions.begin(); it != fRegions.end(); /**/) { + if (it->second->fRemote) { it = fRegions.erase(it); - } - else - { + } else { ++it; } } } -bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback) +bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) { auto it = fRegions.find(id); - if (it != fRegions.end()) - { + if (it != fRegions.end()) { LOG(error) << "Trying to create a region that already exists"; return nullptr; - } - else - { - auto r = fRegions.emplace(id, fair::mq::tools::make_unique(*this, id, size, false, callback)); + } else { + // create region info + { + bipc::scoped_lock lock(fShmMtx); + VoidAlloc voidAlloc(fManagementSegment.get_segment_manager()); + Uint64RegionInfoMap* m = fManagementSegment.find_or_construct(bipc::unique_instance)(voidAlloc); + m->emplace(id, RegionInfo(path.c_str(), flags, voidAlloc)); + } + + auto r = fRegions.emplace(id, fair::mq::tools::make_unique(*this, id, size, false, callback, path, flags)); r.first->second->StartReceivingAcks(); @@ -76,20 +148,28 @@ Region* Manager::GetRemoteRegion(const uint64_t id) { // remote region could actually be a local one if a message originates from this device (has been sent out and returned) auto it = fRegions.find(id); - if (it != fRegions.end()) - { + if (it != fRegions.end()) { return it->second.get(); - } - else - { - try - { - auto r = fRegions.emplace(id, fair::mq::tools::make_unique(*this, id, 0, true, nullptr)); + } else { + try { + string path; + int flags; + + // get region info + { + bipc::scoped_lock lock(fShmMtx); + VoidAlloc voidAlloc(fSegment.get_segment_manager()); + Uint64RegionInfoMap* m = fManagementSegment.find(bipc::unique_instance).first; + RegionInfo ri = m->at(id); + path = ri.fPath.c_str(); + flags = ri.fFlags; + // LOG(debug) << "path: " << path << ", flags: " << flags; + } + + auto r = fRegions.emplace(id, fair::mq::tools::make_unique(*this, id, 0, true, nullptr, path, flags)); return r.first->second.get(); - } - catch (bipc::interprocess_exception& e) - { - // LOG(warn) << "remote region (" << id << ") no longer exists"; + } catch (bipc::interprocess_exception& e) { + LOG(warn) << "Could not get remote region for id: " << id; return nullptr; } @@ -101,30 +181,43 @@ void Manager::RemoveRegion(const uint64_t id) fRegions.erase(id); } -void Manager::RemoveSegment() +void Manager::RemoveSegments() { - if (bipc::shared_memory_object::remove(fSegmentName.c_str())) - { + if (bipc::shared_memory_object::remove(fSegmentName.c_str())) { LOG(debug) << "successfully removed '" << fSegmentName << "' segment after the device has stopped."; - } - else - { + } else { LOG(debug) << "did not remove " << fSegmentName << " segment after the device stopped. Already removed?"; } - if (bipc::shared_memory_object::remove(fManagementSegmentName.c_str())) - { + if (bipc::shared_memory_object::remove(fManagementSegmentName.c_str())) { LOG(debug) << "successfully removed '" << fManagementSegmentName << "' segment after the device has stopped."; - } - else - { + } else { LOG(debug) << "did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?"; } } -bipc::managed_shared_memory& Manager::ManagementSegment() +Manager::~Manager() { - return fManagementSegment; + bool lastRemoved = false; + + { + bipc::scoped_lock lock(fShmMtx); + + (fDeviceCounter->fCount)--; + + if (fDeviceCounter->fCount == 0) { + LOG(debug) << "last segment user, removing segment."; + + RemoveSegments(); + lastRemoved = true; + } else { + LOG(debug) << "other segment users present (" << fDeviceCounter->fCount << "), not removing it."; + } + } + + if (lastRemoved) { + bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str()); + } } } // namespace shmem diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index c50b59b6..a7c66317 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -40,32 +41,41 @@ class Manager friend struct Region; public: - Manager(const std::string& name, size_t size); + Manager(const std::string& id, size_t size); Manager() = delete; Manager(const Manager&) = delete; Manager operator=(const Manager&) = delete; + ~Manager(); + boost::interprocess::managed_shared_memory& Segment(); + boost::interprocess::managed_shared_memory& ManagementSegment(); + + void StartMonitor(); static void Interrupt(); static void Resume(); - boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback); + int GetDeviceCounter(); + int IncrementDeviceCounter(); + int DecrementDeviceCounter(); + + boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); Region* GetRemoteRegion(const uint64_t id); void RemoveRegion(const uint64_t id); - void RemoveSegment(); - - boost::interprocess::managed_shared_memory& ManagementSegment(); + void RemoveSegments(); private: - std::string fSessionName; + std::string fShmId; std::string fSegmentName; std::string fManagementSegmentName; boost::interprocess::managed_shared_memory fSegment; boost::interprocess::managed_shared_memory fManagementSegment; + boost::interprocess::named_mutex fShmMtx; + fair::mq::shmem::DeviceCounter* fDeviceCounter; static std::unordered_map> fRegions; }; diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index eb7dae1f..17562dc5 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -8,11 +8,10 @@ #include #include +#include #include -#include -#include -#include +#include #include #include @@ -29,11 +28,6 @@ using namespace std; namespace bipc = ::boost::interprocess; namespace bpt = ::boost::posix_time; -using CharAllocator = bipc::allocator; -using String = bipc::basic_string, CharAllocator>; -using StringAllocator = bipc::allocator; -using StringVector = bipc::vector; - namespace { volatile sig_atomic_t gSignalStatus = 0; @@ -70,8 +64,7 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, unsig , fDeviceHeartbeats() { MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; - if (monitorStatus != nullptr) - { + if (monitorStatus != nullptr) { cout << "fairmq-shmmonitor already started or not properly exited. Try `fairmq-shmmonitor --cleanup`" << endl; exit(EXIT_FAILURE); } @@ -89,16 +82,12 @@ void Monitor::CatchSignals() void Monitor::SignalMonitor() { - while (true) - { - if (gSignalStatus != 0) - { + while (true) { + if (gSignalStatus != 0) { fTerminating = true; cout << "signal: " << gSignalStatus << endl; break; - } - else if (fTerminating) - { + } else if (fTerminating) { break; } @@ -110,14 +99,10 @@ void Monitor::Run() { thread heartbeatThread(&Monitor::MonitorHeartbeats, this); - if (fInteractive) - { + if (fInteractive) { Interactive(); - } - else - { - while (!fTerminating) - { + } else { + while (!fTerminating) { this_thread::sleep_for(chrono::milliseconds(100)); CheckSegment(); } @@ -128,32 +113,25 @@ void Monitor::Run() void Monitor::MonitorHeartbeats() { - try - { + try { bipc::message_queue mq(bipc::open_or_create, fControlQueueName.c_str(), 1000, 256); unsigned int priority; bipc::message_queue::size_type recvdSize; char msg[256] = {0}; - while (!fTerminating) - { + while (!fTerminating) { bpt::ptime rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); - if (mq.timed_receive(&msg, sizeof(msg), recvdSize, priority, rcvTill)) - { + if (mq.timed_receive(&msg, sizeof(msg), recvdSize, priority, rcvTill)) { fHeartbeatTriggered = true; fLastHeartbeat = chrono::high_resolution_clock::now(); string deviceId(msg, recvdSize); fDeviceHeartbeats[deviceId] = fLastHeartbeat; - } - else - { + } else { // cout << "control queue timeout" << endl; } } - } - catch (bipc::interprocess_exception& ie) - { + } catch (bipc::interprocess_exception& ie) { cout << ie.what() << endl; } @@ -178,19 +156,15 @@ void Monitor::Interactive() cout << endl; PrintHeader(); - while (!fTerminating) - { - if (poll(cinfd, 1, 100)) - { - if (fTerminating || gSignalStatus != 0) - { + while (!fTerminating) { + if (poll(cinfd, 1, 100)) { + if (fTerminating || gSignalStatus != 0) { break; } c = getchar(); - switch (c) - { + switch (c) { case 'q': cout << "\n[q] --> quitting." << endl; fTerminating = true; @@ -216,23 +190,20 @@ void Monitor::Interactive() break; } - if (fTerminating) - { + if (fTerminating) { break; } PrintHeader(); } - if (fTerminating) - { + if (fTerminating) { break; } CheckSegment(); - if (!fTerminating) - { + if (!fTerminating) { cout << "\r"; } } @@ -247,12 +218,10 @@ void Monitor::CheckSegment() { char c = '#'; - if (fInteractive) - { + if (fInteractive) { static uint64_t counter = 0; int mod = counter++ % 5; - switch (mod) - { + switch (mod) { case 0: c = '-'; break; @@ -273,37 +242,33 @@ void Monitor::CheckSegment() } } - try - { + try { bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); + bipc::managed_shared_memory managementSegment(bipc::open_only, fManagementSegmentName.c_str()); fSeenOnce = true; unsigned int numDevices = 0; - fair::mq::shmem::DeviceCounter* dc = segment.find(bipc::unique_instance).first; - if (dc) - { + fair::mq::shmem::DeviceCounter* dc = managementSegment.find(bipc::unique_instance).first; + if (dc) { numDevices = dc->fCount; } auto now = chrono::high_resolution_clock::now(); unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); - if (fHeartbeatTriggered && duration > fTimeoutInMS) - { + if (fHeartbeatTriggered && duration > fTimeoutInMS) { cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl; Cleanup(fShmId); fHeartbeatTriggered = false; - if (fSelfDestruct) - { + if (fSelfDestruct) { cout << "\nself destructing" << endl; fTerminating = true; } } - if (fInteractive) - { + if (fInteractive) { cout << "| " << setw(18) << fSegmentName << " | " << setw(10) << segment.get_size() << " | " @@ -317,12 +282,9 @@ void Monitor::CheckSegment() << c << flush; } - } - catch (bipc::interprocess_exception& ie) - { + } catch (bipc::interprocess_exception& ie) { fHeartbeatTriggered = false; - if (fInteractive) - { + if (fInteractive) { cout << "| " << setw(18) << "-" << " | " << setw(10) << "-" << " | " @@ -338,21 +300,17 @@ void Monitor::CheckSegment() auto now = chrono::high_resolution_clock::now(); unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); - if (fIsDaemon && duration > fTimeoutInMS * 2) - { + if (fIsDaemon && duration > fTimeoutInMS * 2) { Cleanup(fShmId); fHeartbeatTriggered = false; - if (fSelfDestruct) - { + if (fSelfDestruct) { cout << "\nself destructing" << endl; fTerminating = true; } } - if (fSelfDestruct) - { - if (fSeenOnce) - { + if (fSelfDestruct) { + if (fSeenOnce) { cout << "self destructing" << endl; fTerminating = true; } @@ -363,29 +321,38 @@ void Monitor::CheckSegment() void Monitor::Cleanup(const string& shmId) { string managementSegmentName("fmq_" + shmId + "_mng"); - try - { + try { bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); RegionCounter* rc = managementSegment.find(bipc::unique_instance).first; - if (rc) - { + if (rc) { cout << "Region counter found: " << rc->fCount << endl; - unsigned int regionCount = rc->fCount; - for (unsigned int i = 1; i <= regionCount; ++i) - { - RemoveObject("fmq_" + shmId + "_rg_" + to_string(i)); + uint64_t regionCount = rc->fCount; + + Uint64RegionInfoMap* m = managementSegment.find(bipc::unique_instance).first; + + for (uint64_t i = 1; i <= regionCount; ++i) { + if (m != nullptr) { + RegionInfo ri = m->at(i); + string path = ri.fPath.c_str(); + int flags = ri.fFlags; + cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << "'." << endl; + if (path != "") { + RemoveFileMapping(tools::ToString(path, "fmq_" + shmId + "_rg_" + to_string(i))); + } else { + RemoveObject("fmq_" + shmId + "_rg_" + to_string(i)); + } + } else { + RemoveObject("fmq_" + shmId + "_rg_" + to_string(i)); + } + RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i))); } - } - else - { - cout << "shmem: no region counter found. no regions to cleanup." << endl; + } else { + cout << "No region counter found. no regions to cleanup." << endl; } RemoveObject(managementSegmentName.c_str()); - } - catch (bipc::interprocess_exception& ie) - { + } catch (bipc::interprocess_exception& ie) { cout << "Did not find '" << managementSegmentName << "' shared memory segment. No regions to cleanup." << endl; } @@ -397,36 +364,36 @@ void Monitor::Cleanup(const string& shmId) void Monitor::RemoveObject(const string& name) { - if (bipc::shared_memory_object::remove(name.c_str())) - { + if (bipc::shared_memory_object::remove(name.c_str())) { cout << "Successfully removed \"" << name << "\"." << endl; + } else { + cout << "Did not remove \"" << name << "\". Already removed?" << endl; } - else - { +} + +void Monitor::RemoveFileMapping(const string& name) +{ + if (bipc::file_mapping::remove(name.c_str())) { + cout << "Successfully removed \"" << name << "\"." << endl; + } else { cout << "Did not remove \"" << name << "\". Already removed?" << endl; } } void Monitor::RemoveQueue(const string& name) { - if (bipc::message_queue::remove(name.c_str())) - { + if (bipc::message_queue::remove(name.c_str())) { cout << "Successfully removed \"" << name << "\"." << endl; - } - else - { + } else { cout << "Did not remove \"" << name << "\". Already removed?" << endl; } } void Monitor::RemoveMutex(const string& name) { - if (bipc::named_mutex::remove(name.c_str())) - { + if (bipc::named_mutex::remove(name.c_str())) { cout << "Successfully removed \"" << name << "\"." << endl; - } - else - { + } else { cout << "Did not remove \"" << name << "\". Already removed?" << endl; } } @@ -435,47 +402,34 @@ void Monitor::PrintQueues() { cout << '\n'; - try - { + try { bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); - StringVector* queues = segment.find(string("fmq_" + fShmId + "_qs").c_str()).first; - if (queues) - { + StrVector* queues = segment.find(string("fmq_" + fShmId + "_qs").c_str()).first; + if (queues) { cout << "found " << queues->size() << " queue(s):" << endl; - for (const auto& queue : *queues) - { + for (const auto& queue : *queues) { string name(queue.c_str()); cout << '\t' << name << " : "; atomic* queueSize = segment.find>(name.c_str()).first; - if (queueSize) - { + if (queueSize) { cout << *queueSize << " messages" << endl; - } - else - { + } else { cout << "\tqueue does not have a queue size entry." << endl; } } - } - else - { + } else { cout << "\tno queues found" << endl; } - } - catch (bipc::interprocess_exception& ie) - { + } catch (bipc::interprocess_exception& ie) { cout << "\tno queues found" << endl; - } - catch (out_of_range& ie) - { + } catch (out_of_range& ie) { cout << "\tno queues found" << endl; } cout << "\n --> last heartbeats: " << endl << endl; auto now = chrono::high_resolution_clock::now(); - for (const auto& h : fDeviceHeartbeats) - { + for (const auto& h : fDeviceHeartbeats) { cout << "\t" << h.first << " : " << chrono::duration(now - h.second).count() << "ms ago." << endl; } @@ -505,12 +459,10 @@ void Monitor::PrintHelp() Monitor::~Monitor() { fManagementSegment.destroy(bipc::unique_instance); - if (fSignalThread.joinable()) - { + if (fSignalThread.joinable()) { fSignalThread.join(); } - if (fCleanOnExit) - { + if (fCleanOnExit) { Cleanup(fShmId); } } diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 6a323f27..61d85f2a 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -26,7 +26,7 @@ namespace shmem class Monitor { public: - Monitor(const std::string& sessionName, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit); + Monitor(const std::string& shmId, bool selfDestruct, bool interactive, unsigned int timeoutInMS, bool runAsDaemon, bool cleanOnExit); Monitor(const Monitor&) = delete; Monitor operator=(const Monitor&) = delete; @@ -36,8 +36,9 @@ class Monitor virtual ~Monitor(); - static void Cleanup(const std::string& sessionName); + static void Cleanup(const std::string& shmId); static void RemoveObject(const std::string&); + static void RemoveFileMapping(const std::string&); static void RemoveQueue(const std::string&); static void RemoveMutex(const std::string&); diff --git a/fairmq/shmem/Region.cxx b/fairmq/shmem/Region.cxx index 87272a3c..ad5bbf26 100644 --- a/fairmq/shmem/Region.cxx +++ b/fairmq/shmem/Region.cxx @@ -10,7 +10,10 @@ #include #include +#include +#include #include +#include #include @@ -26,64 +29,77 @@ namespace mq namespace shmem { -Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback) +Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */) : fManager(manager) , fRemote(remote) , fStop(false) - , fName("fmq_" + fManager.fSessionName +"_rg_" + to_string(id)) - , fQueueName("fmq_" + fManager.fSessionName +"_rgq_" + to_string(id)) + , fName("fmq_" + fManager.fShmId + "_rg_" + to_string(id)) + , fQueueName("fmq_" + fManager.fShmId + "_rgq_" + to_string(id)) , fShmemObject() + , fFile(nullptr) + , fFileMapping() , fQueue(nullptr) , fReceiveAcksWorker() , fSendAcksWorker() , fCallback(callback) { - if (fRemote) - { - fShmemObject = bipc::shared_memory_object(bipc::open_only, fName.c_str(), bipc::read_write); - LOG(debug) << "shmem: located remote region: " << fName; + if (path != "") { + fName = string(path + fName); - fQueue = fair::mq::tools::make_unique(bipc::open_only, fQueueName.c_str()); - LOG(debug) << "shmem: located remote region queue: " << fQueueName; + fFile = fopen(fName.c_str(), fRemote ? "r+" : "w+"); + + if (!fFile) { + LOG(error) << "Failed to initialize file: " << fName; + LOG(error) << "errno: " << errno << ": " << strerror(errno); + throw runtime_error(tools::ToString("Failed to initialize file for shared memory region: ", strerror(errno))); + } + fFileMapping = bipc::file_mapping(fName.c_str(), bipc::read_write); + LOG(debug) << "shmem: initialized file: " << fName; + fRegion = bipc::mapped_region(fFileMapping, bipc::read_write, 0, size, 0, flags); + } else { + if (fRemote) { + fShmemObject = bipc::shared_memory_object(bipc::open_only, fName.c_str(), bipc::read_write); + } else { + fShmemObject = bipc::shared_memory_object(bipc::create_only, fName.c_str(), bipc::read_write); + fShmemObject.truncate(size); + } + fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, flags); } - else - { - fShmemObject = bipc::shared_memory_object(bipc::create_only, fName.c_str(), bipc::read_write); - LOG(debug) << "shmem: created region: " << fName; - fShmemObject.truncate(size); - fQueue = fair::mq::tools::make_unique(bipc::create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); - LOG(debug) << "shmem: created region queue: " << fQueueName; + InitializeQueues(); + LOG(debug) << "shmem: initialized region: " << fName; + fSendAcksWorker = thread(&Region::SendAcks, this); +} + +void Region::InitializeQueues() +{ + if (fRemote) { + fQueue = tools::make_unique(bipc::open_only, fQueueName.c_str()); + } else { + fQueue = tools::make_unique(bipc::create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock)); } - fRegion = bipc::mapped_region(fShmemObject, bipc::read_write); // TODO: add HUGEPAGES flag here - // fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, MAP_ANONYMOUS | MAP_HUGETLB); - - fSendAcksWorker = std::thread(&Region::SendAcks, this); + LOG(debug) << "shmem: initialized region queue: " << fQueueName; } void Region::StartReceivingAcks() { - fReceiveAcksWorker = std::thread(&Region::ReceiveAcks, this); + fReceiveAcksWorker = thread(&Region::ReceiveAcks, this); } void Region::ReceiveAcks() { unsigned int priority; bipc::message_queue::size_type recvdSize; - std::unique_ptr blocks = fair::mq::tools::make_unique(fAckBunchSize); + unique_ptr blocks = tools::make_unique(fAckBunchSize); - while (!fStop) // end thread condition (should exist until region is destroyed) - { + while (!fStop) { // end thread condition (should exist until region is destroyed) auto rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(500); - while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) - { + while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) { // LOG(debug) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId; - if (fCallback) - { + if (fCallback) { const auto numBlocks = recvdSize / sizeof(RegionBlock); - for (size_t i = 0; i < numBlocks; i++) - { + for (size_t i = 0; i < numBlocks; i++) { fCallback(reinterpret_cast(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast(blocks[i].fHint)); } } @@ -95,12 +111,11 @@ void Region::ReceiveAcks() void Region::ReleaseBlock(const RegionBlock &block) { - std::unique_lock lock(fBlockLock); + unique_lock lock(fBlockLock); fBlocksToFree.emplace_back(block); - if (fBlocksToFree.size() >= fAckBunchSize) - { + if (fBlocksToFree.size() >= fAckBunchSize) { lock.unlock(); // reduces contention on fBlockLock fBlockSendCV.notify_one(); } @@ -108,40 +123,33 @@ void Region::ReleaseBlock(const RegionBlock &block) void Region::SendAcks() { - std::unique_ptr blocks = fair::mq::tools::make_unique(fAckBunchSize); + unique_ptr blocks = tools::make_unique(fAckBunchSize); - while (true) // we'll try to send all acks before stopping - { + while (true) { // we'll try to send all acks before stopping size_t blocksToSend = 0; { // mutex locking block - std::unique_lock lock(fBlockLock); + unique_lock lock(fBlockLock); // try to get more blocks without waiting (we can miss a notify from CloseMessage()) - if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) - { + if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) { // cv.wait() timeout: send whatever blocks we have - fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500)); + fBlockSendCV.wait_for(lock, chrono::milliseconds(500)); } - blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize); + blocksToSend = min(fBlocksToFree.size(), fAckBunchSize); - std::copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get()); + copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get()); fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend); } // unlock the block mutex here while sending over IPC - if (blocksToSend > 0) - { - while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) - { + if (blocksToSend > 0) { + while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) { // receiver slow? yield and try again... this_thread::yield(); } - } - else // blocksToSend == 0 - { - if (fStop) - { + } else { // blocksToSend == 0 + if (fStop) { break; } } @@ -154,30 +162,31 @@ Region::~Region() { fStop = true; - if (fSendAcksWorker.joinable()) - { + if (fSendAcksWorker.joinable()) { fSendAcksWorker.join(); } - if (!fRemote) - { - if (fReceiveAcksWorker.joinable()) - { + if (!fRemote) { + if (fReceiveAcksWorker.joinable()) { fReceiveAcksWorker.join(); } - if (bipc::shared_memory_object::remove(fName.c_str())) - { + if (bipc::shared_memory_object::remove(fName.c_str())) { LOG(debug) << "shmem: destroyed region " << fName; } - if (bipc::message_queue::remove(fQueueName.c_str())) - { - LOG(debug) << "shmem: removed region queue " << fName; + if (bipc::file_mapping::remove(fName.c_str())) { + LOG(debug) << "shmem: destroyed file mapping " << fName; } - } - else - { + + if (fFile) { + fclose(fFile); + } + + if (bipc::message_queue::remove(fQueueName.c_str())) { + LOG(debug) << "shmem: removed region queue " << fQueueName; + } + } else { // LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary."; LOG(debug) << "shmem: region queue '" << fQueueName << "' is remote, no cleanup necessary"; } diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 11b5d980..c65c2bf7 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -40,13 +41,15 @@ class Manager; struct Region { - Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback = nullptr); + Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0); Region() = delete; Region(const Region&) = default; Region(Region&&) = default; + void InitializeQueues(); + void StartReceivingAcks(); void ReceiveAcks(); @@ -61,6 +64,8 @@ struct Region std::string fName; std::string fQueueName; boost::interprocess::shared_memory_object fShmemObject; + FILE* fFile; + boost::interprocess::file_mapping fFileMapping; boost::interprocess::mapped_region fRegion; std::mutex fBlockLock; diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index d06fc962..0bfe41ed 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -39,36 +39,30 @@ static void daemonize() umask(0); // Create a new SID for the child process - if (setsid() < 0) - { + if (setsid() < 0) { exit(1); } // Change the current working directory. This prevents the current directory from being locked; hence not being able to remove it. - if ((chdir("/")) < 0) - { + if ((chdir("/")) < 0) { exit(1); } // Redirect standard files to /dev/null - if (!freopen("/dev/null", "r", stdin)) - { + if (!freopen("/dev/null", "r", stdin)) { cout << "could not redirect stdin to /dev/null" << endl; } - if (!freopen("/dev/null", "w", stdout)) - { + if (!freopen("/dev/null", "w", stdout)) { cout << "could not redirect stdout to /dev/null" << endl; } - if (!freopen("/dev/null", "w", stderr)) - { + if (!freopen("/dev/null", "w", stderr)) { cout << "could not redirect stderr to /dev/null" << endl; } } int main(int argc, char** argv) { - try - { + try { string sessionName; string shmId; bool cleanup = false; @@ -93,26 +87,22 @@ int main(int argc, char** argv) variables_map vm; store(parse_command_line(argc, argv, desc), vm); - if (vm.count("help")) - { + if (vm.count("help")) { cout << "FairMQ Shared Memory Monitor" << endl << desc << endl; return 0; } notify(vm); - if (runAsDaemon) - { + if (runAsDaemon) { daemonize(); } - if (shmId == "") - { + if (shmId == "") { shmId = buildShmIdFromSessionIdAndUserId(sessionName); } - if (cleanup) - { + if (cleanup) { cout << "Cleaning up \"" << shmId << "\"..." << endl; Monitor::Cleanup(shmId); Monitor::RemoveQueue("fmq_" + shmId + "_cq"); @@ -125,9 +115,7 @@ int main(int argc, char** argv) monitor.CatchSignals(); monitor.Run(); - } - catch (exception& e) - { + } catch (exception& e) { cerr << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit" << endl; return 2; } diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 6d096511..3b17eefc 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -69,7 +69,7 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionP return unique_ptr(new FairMQMessageZMQ(region, data, size, hint, this)); } -FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) +FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) { assert(fContext); return unique_ptr(new FairMQSocketZMQ(type, name, GetId(), fContext, this)); @@ -90,9 +90,9 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map(new FairMQPollerZMQ(channelsMap, channelList)); } -FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const +FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const { - return unique_ptr(new FairMQUnmanagedRegionZMQ(size, callback)); + return unique_ptr(new FairMQUnmanagedRegionZMQ(size, callback, path, flags)); } fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 64d44395..ef06b69c 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -45,7 +45,7 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override; fair::mq::Transport GetType() const override; diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx index 7b7e4188..ed9ce453 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx @@ -11,7 +11,7 @@ using namespace std; -FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback) +FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */) : fBuffer(malloc(size)) , fSize(size) , fCallback(callback) diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h index a92cc2a3..bf7c4a1d 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h @@ -12,6 +12,7 @@ #include "FairMQUnmanagedRegion.h" #include // size_t +#include class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion { @@ -19,7 +20,7 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion friend class FairMQMessageZMQ; public: - FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback); + FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete; FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;