From 58a312b73071bdc68af89262a692b30d4c7dfe48 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 14 Nov 2017 17:00:37 +0100 Subject: [PATCH] FairMQ: Introduce callbacks for the FairMQUnmanagedRegion. Callbacks are called when the data buffer of the message assiciated with the corresponding region is no longer needed by the transport. Example in examples/advanced/Region/ --- fairmq/CMakeLists.txt | 12 +- fairmq/FairMQDevice.cxx | 36 ++- fairmq/FairMQDevice.h | 4 +- fairmq/FairMQMessage.h | 7 +- fairmq/FairMQTransportFactory.h | 2 +- fairmq/FairMQUnmanagedRegion.h | 5 +- fairmq/nanomsg/FairMQMessageNN.cxx | 16 +- fairmq/nanomsg/FairMQMessageNN.h | 6 +- fairmq/nanomsg/FairMQSocketNN.cxx | 8 +- fairmq/nanomsg/FairMQTransportFactoryNN.cxx | 4 +- fairmq/nanomsg/FairMQTransportFactoryNN.h | 2 +- fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx | 3 +- fairmq/nanomsg/FairMQUnmanagedRegionNN.h | 3 +- fairmq/options/FairMQProgOptions.cxx | 30 +-- fairmq/shmem/{FairMQShmCommon.h => Common.h} | 20 +- fairmq/shmem/FairMQMessageSHM.cxx | 115 +++++++--- fairmq/shmem/FairMQMessageSHM.h | 36 ++- fairmq/shmem/FairMQPollerSHM.h | 16 +- fairmq/shmem/FairMQShmManager.h | 217 ------------------ fairmq/shmem/FairMQSocketSHM.cxx | 21 +- fairmq/shmem/FairMQSocketSHM.h | 55 ++--- fairmq/shmem/FairMQTransportFactorySHM.cxx | 58 +++-- fairmq/shmem/FairMQTransportFactorySHM.h | 13 +- fairmq/shmem/FairMQUnmanagedRegionSHM.cxx | 52 +---- fairmq/shmem/FairMQUnmanagedRegionSHM.h | 47 +--- fairmq/shmem/Manager.cxx | 141 ++++++++++++ fairmq/shmem/Manager.h | 81 +++++++ .../{FairMQShmMonitor.cxx => Monitor.cxx} | 41 ++-- .../shmem/{FairMQShmMonitor.h => Monitor.h} | 4 +- fairmq/shmem/README.md | 16 +- fairmq/shmem/Region.cxx | 121 ++++++++++ fairmq/shmem/Region.h | 68 ++++++ ...runFairMQShmMonitor.cxx => runMonitor.cxx} | 6 +- fairmq/zeromq/FairMQMessageZMQ.cxx | 11 +- fairmq/zeromq/FairMQMessageZMQ.h | 3 - fairmq/zeromq/FairMQTransportFactoryZMQ.cxx | 4 +- fairmq/zeromq/FairMQTransportFactoryZMQ.h | 2 +- fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx | 3 +- fairmq/zeromq/FairMQUnmanagedRegionZMQ.h | 6 +- 39 files changed, 747 insertions(+), 548 deletions(-) rename fairmq/shmem/{FairMQShmCommon.h => Common.h} (78%) delete mode 100644 fairmq/shmem/FairMQShmManager.h create mode 100644 fairmq/shmem/Manager.cxx create mode 100644 fairmq/shmem/Manager.h rename fairmq/shmem/{FairMQShmMonitor.cxx => Monitor.cxx} (90%) rename fairmq/shmem/{FairMQShmMonitor.h => Monitor.h} (97%) create mode 100644 fairmq/shmem/Region.cxx create mode 100644 fairmq/shmem/Region.h rename fairmq/shmem/{runFairMQShmMonitor.cxx => runMonitor.cxx} (93%) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 8a5ade1a..240385a5 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -102,8 +102,10 @@ set(FAIRMQ_HEADER_FILES shmem/FairMQUnmanagedRegionSHM.h shmem/FairMQSocketSHM.h shmem/FairMQTransportFactorySHM.h - shmem/FairMQShmMonitor.h - shmem/FairMQShmCommon.h + shmem/Common.h + shmem/Manager.h + shmem/Monitor.h + shmem/Region.h tools/CppSTL.h tools/Network.h tools/Strings.h @@ -163,7 +165,9 @@ set(FAIRMQ_SOURCE_FILES shmem/FairMQUnmanagedRegionSHM.cxx shmem/FairMQSocketSHM.cxx shmem/FairMQTransportFactorySHM.cxx - shmem/FairMQShmMonitor.cxx + shmem/Manager.cxx + shmem/Monitor.cxx + shmem/Region.cxx zeromq/FairMQMessageZMQ.cxx zeromq/FairMQPollerZMQ.cxx zeromq/FairMQUnmanagedRegionZMQ.cxx @@ -262,7 +266,7 @@ target_link_libraries(splitter FairMQ) add_executable(runConfigExample options/runConfigEx.cxx) target_link_libraries(runConfigExample FairMQ) -add_executable(shmmonitor shmem/runFairMQShmMonitor.cxx) +add_executable(shmmonitor shmem/runMonitor.cxx) target_link_libraries(shmmonitor FairMQ) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 2d18ee10..a319f28a 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -119,7 +119,6 @@ void FairMQDevice::InitWrapper() } FairMQMessagePtr msg(fTransportFactory->CreateMessage()); - msg->SetDeviceId(fId); } // Containers to store the uninitialized channels. @@ -131,18 +130,14 @@ void FairMQDevice::InitWrapper() { for (auto vi = mi.second.begin(); vi != mi.second.end(); ++vi) { - if (vi->fModified) - { - if (vi->fReset) - { - vi->fSocket->Close(); - vi->fSocket = nullptr; - - vi->fPoller = nullptr; - - vi->fChannelCmdSocket->Close(); - vi->fChannelCmdSocket = nullptr; - } + // if (vi->fModified) + // { + // if (vi->fReset) + // { + // vi->fSocket.reset(); + // vi->fPoller.reset(); + // vi->fChannelCmdSocket.reset(); + // } // set channel name: name + vector index vi->fName = fair::mq::tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]"); @@ -176,7 +171,7 @@ void FairMQDevice::InitWrapper() LOG(ERROR) << "Cannot update configuration. Socket method (bind/connect) not specified."; throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified."); } - } + // } } } @@ -805,7 +800,6 @@ shared_ptr FairMQDevice::AddTransport(const string& tran } FairMQMessagePtr msg(tr->CreateMessage()); - msg->SetDeviceId(fId); return move(tr); } @@ -1064,14 +1058,10 @@ void FairMQDevice::Reset() // iterate over the channels vector for (auto& vi : mi.second) { - vi.fReset = true; - // vi.fSocket->Close(); - // vi.fSocket = nullptr; - - // vi.fPoller = nullptr; - - // vi.fChannelCmdSocket->Close(); - // vi.fChannelCmdSocket = nullptr; + // vi.fReset = true; + vi.fSocket.reset(); + vi.fPoller.reset(); + vi.fChannelCmdSocket.reset(); } } } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index b448f39d..89dfb7fc 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -240,9 +240,9 @@ class FairMQDevice : public FairMQStateMachine return Transport()->CreateUnmanagedRegion(size); } - FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size) + FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr) { - return fChannels.at(channel).at(index).Transport()->CreateUnmanagedRegion(size); + return fChannels.at(channel).at(index).Transport()->CreateUnmanagedRegion(size, callback); } template diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index d34bc16e..fa07fd66 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -32,9 +32,8 @@ class FairMQMessage virtual void* GetMessage() = 0; virtual void* GetData() = 0; virtual size_t GetSize() = 0; - virtual void SetMessage(void* data, size_t size) = 0; - virtual void SetDeviceId(const std::string& deviceId) = 0; + virtual void SetMessage(void* data, size_t size) = 0; virtual FairMQ::Transport GetType() const = 0; diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 82f7a63e..0763397f 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -66,7 +66,7 @@ class FairMQTransportFactory /// Create a poller for two sockets virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const = 0; - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size) const = 0; + virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const = 0; /// Get transport type virtual FairMQ::Transport GetType() const = 0; diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index efe2e55e..dea9e8d6 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -10,7 +10,10 @@ #define FAIRMQUNMANAGEDREGION_H_ #include // size_t -#include // unique_ptr +#include // std::unique_ptr +#include // std::function + +using FairMQRegionCallback = std::function; class FairMQUnmanagedRegion { diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index 04d7791d..2c474315 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -22,14 +22,13 @@ using namespace std; -string FairMQMessageNN::fDeviceID = string(); FairMQ::Transport FairMQMessageNN::fTransportType = FairMQ::Transport::NN; FairMQMessageNN::FairMQMessageNN() : fMessage(nullptr) , fSize(0) , fReceiving(false) - , fRegion(false) + , fRegionPtr(nullptr) { fMessage = nn_allocmsg(0, 0); if (!fMessage) @@ -42,7 +41,7 @@ FairMQMessageNN::FairMQMessageNN(const size_t size) : fMessage(nullptr) , fSize(0) , fReceiving(false) - , fRegion(false) + , fRegionPtr(nullptr) { fMessage = nn_allocmsg(size, 0); if (!fMessage) @@ -62,7 +61,7 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* : fMessage(nullptr) , fSize(0) , fReceiving(false) - , fRegion(false) + , fRegionPtr(nullptr) { fMessage = nn_allocmsg(size, 0); if (!fMessage) @@ -84,11 +83,11 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* } } -FairMQMessageNN::FairMQMessageNN(FairMQUnmanagedRegionPtr& /*region*/, void* data, const size_t size) +FairMQMessageNN::FairMQMessageNN(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) : fMessage(data) , fSize(size) , fReceiving(false) - , fRegion(true) + , fRegionPtr(region.get()) { // currently nanomsg will copy the buffer (data) inside nn_sendmsg() } @@ -153,11 +152,6 @@ void FairMQMessageNN::SetMessage(void* data, const size_t size) fSize = size; } -void FairMQMessageNN::SetDeviceId(const string& deviceId) -{ - fDeviceID = deviceId; -} - FairMQ::Transport FairMQMessageNN::GetType() const { return fTransportType; diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index c71e5ef0..752fb5c4 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -17,6 +17,7 @@ #include #include +#include #include "FairMQMessage.h" #include "FairMQUnmanagedRegion.h" @@ -42,8 +43,6 @@ class FairMQMessageNN : public FairMQMessage virtual void SetMessage(void* data, const size_t size); - virtual void SetDeviceId(const std::string& deviceId); - virtual FairMQ::Transport GetType() const; virtual void Copy(const std::unique_ptr& msg); @@ -56,8 +55,7 @@ class FairMQMessageNN : public FairMQMessage void* fMessage; size_t fSize; bool fReceiving; - bool fRegion; - static std::string fDeviceID; + FairMQUnmanagedRegion* fRegionPtr; static FairMQ::Transport fTransportType; void Clear(); diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 24054382..04f51cd0 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -15,6 +15,7 @@ #include "FairMQSocketNN.h" #include "FairMQMessageNN.h" #include "FairMQLogger.h" +#include "FairMQUnmanagedRegionNN.h" #include #include @@ -127,14 +128,17 @@ int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags) while (true) { void* ptr = msg->GetMessage(); - if (static_cast(msg.get())->fRegion == false) + if (static_cast(msg.get())->fRegionPtr == nullptr) { nbytes = nn_send(fSocket, &ptr, NN_MSG, flags); } else { nbytes = nn_send(fSocket, ptr, msg->GetSize(), flags); + // nn_send copies the data, safe to call region callback here + static_cast(static_cast(msg.get())->fRegionPtr)->fCallback(msg->GetMessage(), msg->GetSize()); } + if (nbytes >= 0) { fBytesTx += nbytes; @@ -239,6 +243,8 @@ int64_t FairMQSocketNN::Send(vector>& msgVec, const in static_cast(msgVec[i].get())->fReceiving = false; packer.pack_bin(msgVec[i]->GetSize()); packer.pack_bin_body(static_cast(msgVec[i]->GetData()), msgVec[i]->GetSize()); + // call region callback + static_cast(static_cast(msgVec[i].get())->fRegionPtr)->fCallback(msgVec[i]->GetMessage(), msgVec[i]->GetSize()); } int64_t nbytes = -1; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 977a461f..d0752aac 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -65,9 +65,9 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSo return unique_ptr(new FairMQPollerNN(cmdSocket, dataSocket)); } -FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size) const +FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const { - return unique_ptr(new FairMQUnmanagedRegionNN(size)); + return unique_ptr(new FairMQUnmanagedRegionNN(size, callback)); } FairMQ::Transport FairMQTransportFactoryNN::GetType() const diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 527d913d..7f2c9689 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -37,7 +37,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override; FairMQ::Transport GetType() const override; diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx b/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx index 56a7b960..f5e4249f 100644 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx +++ b/fairmq/nanomsg/FairMQUnmanagedRegionNN.cxx @@ -11,9 +11,10 @@ using namespace std; -FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size) +FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback) : fBuffer(malloc(size)) , fSize(size) + , fCallback(callback) { } diff --git a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h b/fairmq/nanomsg/FairMQUnmanagedRegionNN.h index 5b17ea2f..ff99c2be 100644 --- a/fairmq/nanomsg/FairMQUnmanagedRegionNN.h +++ b/fairmq/nanomsg/FairMQUnmanagedRegionNN.h @@ -18,7 +18,7 @@ class FairMQUnmanagedRegionNN : public FairMQUnmanagedRegion friend class FairMQSocketNN; public: - FairMQUnmanagedRegionNN(const size_t size); + FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback); FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete; FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete; @@ -30,6 +30,7 @@ class FairMQUnmanagedRegionNN : public FairMQUnmanagedRegion private: void* fBuffer; size_t fSize; + FairMQRegionCallback fCallback; }; #endif /* FAIRMQUNMANAGEDREGIONNN_H_ */ \ No newline at end of file diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 631e4f38..b93b121c 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -322,21 +322,21 @@ int FairMQProgOptions::NotifySwitchOption() void FairMQProgOptions::FillOptionDescription(boost::program_options::options_description& options) { options.add_options() - ("id", po::value(), "Device ID (required argument).") - ("io-threads", po::value()->default_value(1), "Number of I/O threads.") - ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") - ("config", po::value()->default_value("static"), "Config source ('static'/).") - ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") - ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file.") - ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0).") - ("initialization-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") - ("port-range-min", po::value()->default_value(22000), "Start of the port range for dynamic initialization.") - ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") - ("log-to-file", po::value()->default_value(""), "Log output to a file.") - ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") - ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") - ("shm-segment-name", po::value()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.") - ("rate", po::value()->default_value(0.), "rate for conditional run loop (Hz)") + ("id", po::value(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads.") + ("transport", po::value()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") + ("config", po::value()->default_value("static"), "Config source ('static'/).") + ("network-interface", po::value()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") + ("config-key", po::value(), "Use provided value instead of device id for fetching the configuration from the config file.") + ("catch-signals", po::value()->default_value(1), "Enable signal handling (1/0).") + ("initialization-timeout", po::value()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") + ("port-range-min", po::value()->default_value(22000), "Start of the port range for dynamic initialization.") + ("port-range-max", po::value()->default_value(32000), "End of the port range for dynamic initialization.") + ("log-to-file", po::value()->default_value(""), "Log output to a file.") + ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") + ("shm-segment-size", po::value()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") + ("shm-segment-name", po::value()->default_value("fmq_shm_main"), "shmem transport: name of the shared memory segment.") + ("rate", po::value()->default_value(0.), "rate for conditional run loop (Hz)") ; } diff --git a/fairmq/shmem/FairMQShmCommon.h b/fairmq/shmem/Common.h similarity index 78% rename from fairmq/shmem/FairMQShmCommon.h rename to fairmq/shmem/Common.h index c43d6a19..583f3340 100644 --- a/fairmq/shmem/FairMQShmCommon.h +++ b/fairmq/shmem/Common.h @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ #ifndef FAIR_MQ_SHMEM_COMMON_H_ @@ -53,6 +53,22 @@ struct alignas(32) MetaHeader boost::interprocess::managed_shared_memory::handle_t fHandle; }; +struct RegionBlock +{ + RegionBlock() + : fHandle() + , fSize(0) + {} + + RegionBlock(boost::interprocess::managed_shared_memory::handle_t handle, size_t size) + : fHandle(handle) + , fSize(size) + {} + + boost::interprocess::managed_shared_memory::handle_t fHandle; + size_t fSize; +}; + } // namespace shmem } // namespace mq } // namespace fair diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index e4eb50c5..f2c0e868 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -8,28 +8,31 @@ #include #include +#include + #include "FairMQMessageSHM.h" #include "FairMQUnmanagedRegionSHM.h" #include "FairMQLogger.h" -#include "FairMQShmCommon.h" +#include "Common.h" using namespace std; using namespace fair::mq::shmem; namespace bipc = boost::interprocess; +namespace bpt = boost::posix_time; atomic FairMQMessageSHM::fInterrupted(false); FairMQ::Transport FairMQMessageSHM::fTransportType = FairMQ::Transport::SHM; -FairMQMessageSHM::FairMQMessageSHM() - : fMessage() +FairMQMessageSHM::FairMQMessageSHM(Manager& manager) + : fManager(manager) + , fMessage() , fQueued(false) , fMetaCreated(false) , fRegionId(0) - , fHandle() + , fHandle(-1) , fSize(0) , fLocalPtr(nullptr) - , fRemoteRegion(nullptr) { if (zmq_msg_init(&fMessage) != 0) { @@ -38,28 +41,28 @@ FairMQMessageSHM::FairMQMessageSHM() fMetaCreated = true; } -FairMQMessageSHM::FairMQMessageSHM(const size_t size) - : fMessage() +FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size) + : fManager(manager) + , fMessage() , fQueued(false) , fMetaCreated(false) , fRegionId(0) - , fHandle() + , fHandle(-1) , fSize(0) , fLocalPtr(nullptr) - , fRemoteRegion(nullptr) { InitializeChunk(size); } -FairMQMessageSHM::FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) - : fMessage() +FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint) + : fManager(manager) + , fMessage() , fQueued(false) , fMetaCreated(false) , fRegionId(0) - , fHandle() + , fHandle(-1) , fSize(0) , fLocalPtr(nullptr) - , fRemoteRegion(nullptr) { if (InitializeChunk(size)) { @@ -75,15 +78,15 @@ FairMQMessageSHM::FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn } } -FairMQMessageSHM::FairMQMessageSHM(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) - : fMessage() +FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size) + : fManager(manager) + , fMessage() , fQueued(false) , fMetaCreated(false) , fRegionId(static_cast(region.get())->fRegionId) - , fHandle() + , fHandle(-1) , fSize(size) , fLocalPtr(data) - , fRemoteRegion(nullptr) { fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast(data) - reinterpret_cast(region->GetData())); @@ -105,11 +108,11 @@ FairMQMessageSHM::FairMQMessageSHM(FairMQUnmanagedRegionPtr& region, void* data, bool FairMQMessageSHM::InitializeChunk(const size_t size) { - while (!fHandle) + while (fHandle < 0) { try { - fLocalPtr = Manager::Instance().Segment()->allocate(size); + fLocalPtr = fManager.Segment().allocate(size); } catch (bipc::bad_alloc& ba) { @@ -124,7 +127,7 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size) continue; } } - fHandle = Manager::Instance().Segment()->get_handle_from_address(fLocalPtr); + fHandle = fManager.Segment().get_handle_from_address(fLocalPtr); } fSize = size; @@ -202,15 +205,20 @@ void* FairMQMessageSHM::GetData() { if (fRegionId == 0) { - return Manager::Instance().Segment()->get_address_from_handle(fHandle); + return fManager.Segment().get_address_from_handle(fHandle); } else { - if (!fRemoteRegion) + boost::interprocess::mapped_region* region = fManager.GetRemoteRegion(fRegionId); + if (region) { - fRemoteRegion = FairMQUnmanagedRegionSHM::GetRemoteRegion(fRegionId); + fLocalPtr = reinterpret_cast(region->get_address()) + fHandle; + } + else + { + // LOG(WARN) << "could not get pointer from a region message"; + fLocalPtr = nullptr; } - fLocalPtr = reinterpret_cast(fRemoteRegion->get_address()) + fHandle; return fLocalPtr; } } @@ -226,11 +234,6 @@ void FairMQMessageSHM::SetMessage(void*, const size_t) // dummy method to comply with the interface. functionality not allowed in zeromq. } -void FairMQMessageSHM::SetDeviceId(const string& /*deviceId*/) -{ - // fDeviceID = deviceId; -} - FairMQ::Transport FairMQMessageSHM::GetType() const { return fTransportType; @@ -238,7 +241,7 @@ FairMQ::Transport FairMQMessageSHM::GetType() const void FairMQMessageSHM::Copy(const unique_ptr& msg) { - if (!fHandle) + if (fHandle < 0) { bipc::managed_shared_memory::handle_t otherHandle = static_cast(msg.get())->fHandle; if (otherHandle) @@ -261,10 +264,56 @@ void FairMQMessageSHM::Copy(const unique_ptr& msg) void FairMQMessageSHM::CloseMessage() { - if (fHandle && !fQueued && fRegionId == 0) + if (fHandle >= 0 && !fQueued) { - Manager::Instance().Segment()->deallocate(Manager::Instance().Segment()->get_address_from_handle(fHandle)); - fHandle = 0; + if (fRegionId == 0) + { + fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fHandle)); + fHandle = 0; + } + else + { + // send notification back to the receiver + // RegionBlock block(fHandle, fSize); + // if (fManager.GetRegionQueue(fRegionId).try_send(static_cast(&block), sizeof(RegionBlock), 0)) + // { + // // LOG(INFO) << "true"; + // } + // // else + // // { + // // LOG(DEBUG) << "could not send ack"; + // // } + + // timed version + RegionBlock block(fHandle, fSize); + bool success = false; + do + { + auto sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200); + bipc::message_queue* q = fManager.GetRegionQueue(fRegionId); + if (q) + { + if (q->timed_send(&block, sizeof(RegionBlock), 0, sndTill)) + { + success = true; + } + else + { + if (fInterrupted) + { + break; + } + LOG(DEBUG) << "region ack queue is full, retrying..."; + } + } + else + { + // LOG(WARN) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack"; + success = true; + } + } + while (!success); + } } if (fMetaCreated) diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h index 14f82757..f5f3a0be 100644 --- a/fairmq/shmem/FairMQMessageSHM.h +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -18,54 +18,52 @@ #include "FairMQMessage.h" #include "FairMQUnmanagedRegion.h" -#include "FairMQShmManager.h" +#include "Manager.h" class FairMQMessageSHM : public FairMQMessage { friend class FairMQSocketSHM; public: - FairMQMessageSHM(); - FairMQMessageSHM(const size_t size); - FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); - FairMQMessageSHM(FairMQUnmanagedRegionPtr& region, void* data, const size_t size); + FairMQMessageSHM(fair::mq::shmem::Manager& manager); + FairMQMessageSHM(fair::mq::shmem::Manager& manager, const size_t size); + FairMQMessageSHM(fair::mq::shmem::Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size); FairMQMessageSHM(const FairMQMessageSHM&) = delete; FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete; bool InitializeChunk(const size_t size); - virtual void Rebuild(); - virtual void Rebuild(const size_t size); - virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + void Rebuild() override; + void Rebuild(const size_t size) override; + void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - virtual void* GetMessage(); - virtual void* GetData(); - virtual size_t GetSize(); + void* GetMessage() override; + void* GetData() override; + size_t GetSize() override; - virtual void SetMessage(void* data, const size_t size); + void SetMessage(void* data, const size_t size) override; - virtual void SetDeviceId(const std::string& deviceId); + FairMQ::Transport GetType() const override; - virtual FairMQ::Transport GetType() const; - - virtual void Copy(const std::unique_ptr& msg); + void Copy(const std::unique_ptr& msg) override; void CloseMessage(); - virtual ~FairMQMessageSHM(); + ~FairMQMessageSHM() override; private: + fair::mq::shmem::Manager& fManager; zmq_msg_t fMessage; bool fQueued; bool fMetaCreated; static std::atomic fInterrupted; static FairMQ::Transport fTransportType; uint64_t fRegionId; - bipc::managed_shared_memory::handle_t fHandle; + boost::interprocess::managed_shared_memory::handle_t fHandle; size_t fSize; void* fLocalPtr; - boost::interprocess::mapped_region* fRemoteRegion; // cache region ptr }; #endif /* FAIRMQMESSAGESHM_H_ */ diff --git a/fairmq/shmem/FairMQPollerSHM.h b/fairmq/shmem/FairMQPollerSHM.h index eafd9ba1..bb4909ea 100644 --- a/fairmq/shmem/FairMQPollerSHM.h +++ b/fairmq/shmem/FairMQPollerSHM.h @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ #ifndef FAIRMQPOLLERSHM_H_ @@ -35,13 +35,13 @@ class FairMQPollerSHM : public FairMQPoller void SetItemEvents(zmq_pollitem_t& item, const int type); - virtual void Poll(const int timeout); - virtual bool CheckInput(const int index); - virtual bool CheckOutput(const int index); - virtual bool CheckInput(const std::string channelKey, const int index); - virtual bool CheckOutput(const std::string channelKey, const int index); + void Poll(const int timeout) override; + bool CheckInput(const int index) override; + bool CheckOutput(const int index) override; + bool CheckInput(const std::string channelKey, const int index) override; + bool CheckOutput(const std::string channelKey, const int index) override; - virtual ~FairMQPollerSHM(); + ~FairMQPollerSHM() override; private: FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); diff --git a/fairmq/shmem/FairMQShmManager.h b/fairmq/shmem/FairMQShmManager.h deleted file mode 100644 index 4ca13ebc..00000000 --- a/fairmq/shmem/FairMQShmManager.h +++ /dev/null @@ -1,217 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQShmManager.h - * - * @since 2016-04-08 - * @author A. Rybalchenko - */ - -#ifndef FAIRMQSHMMANAGER_H_ -#define FAIRMQSHMMANAGER_H_ - -#include -#include - -#include -#include - -#include "FairMQLogger.h" - -namespace bipc = boost::interprocess; - -namespace fair -{ -namespace mq -{ -namespace shmem -{ - -class Manager -{ - public: - static Manager& Instance() - { - static Manager man; - return man; - } - - void InitializeSegment(const std::string& op, const std::string& name, const size_t size = 0) - { - if (!fSegment) - { - try - { - if (op == "open_or_create") - { - fSegment = new bipc::managed_shared_memory(bipc::open_or_create, name.c_str(), size); - } - else if (op == "create_only") - { - fSegment = new bipc::managed_shared_memory(bipc::create_only, name.c_str(), size); - } - else if (op == "open_only") - { - int numTries = 0; - bool success = false; - - do - { - try - { - fSegment = new bipc::managed_shared_memory(bipc::open_only, name.c_str()); - success = true; - } - catch (bipc::interprocess_exception& ie) - { - if (++numTries == 5) - { - LOG(ERROR) << "Could not open shared memory after " << numTries << " attempts, exiting!"; - exit(EXIT_FAILURE); - } - else - { - LOG(DEBUG) << "Could not open shared memory segment on try " << numTries << ". Retrying in 1 second..."; - LOG(DEBUG) << ie.what(); - - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } - } - } - while (!success); - } - else - { - LOG(ERROR) << "Unknown operation when initializing shared memory segment: " << op; - } - } - catch (std::exception& e) - { - LOG(ERROR) << "Exception during shared memory segment initialization: " << e.what() << ", application will now exit"; - exit(EXIT_FAILURE); - } - } - else - { - LOG(INFO) << "Segment already initialized"; - } - } - - bipc::managed_shared_memory* Segment() const - { - if (fSegment) - { - return fSegment; - } - else - { - LOG(ERROR) << "Segment not initialized"; - exit(EXIT_FAILURE); - } - } - - void Remove() - { - if (bipc::shared_memory_object::remove("fairmq_shmem_main")) - { - LOG(DEBUG) << "shmem: successfully removed \"fairmq_shmem_main\" segment after the device has stopped."; - } - else - { - LOG(DEBUG) << "shmem: did not remove \"fairmq_shmem_main\" segment after the device stopped. Already removed?"; - } - - if (bipc::shared_memory_object::remove("fairmq_shmem_management")) - { - LOG(DEBUG) << "shmem: successfully removed \"fairmq_shmem_management\" segment after the device has stopped."; - } - else - { - LOG(DEBUG) << "shmem: did not remove \"fairmq_shmem_management\" segment after the device stopped. Already removed?"; - } - } - - bipc::managed_shared_memory& ManagementSegment() - { - return fManagementSegment; - } - - private: - Manager() - : fSegment(nullptr) - , fManagementSegment(bipc::open_or_create, "fairmq_shmem_management", 65536) - {} - Manager(const Manager&) = delete; - Manager operator=(const Manager&) = delete; - - bipc::managed_shared_memory* fSegment; - bipc::managed_shared_memory fManagementSegment; -}; - -// class Chunk -// { -// public: -// Chunk() -// : fHandle() -// , fSize(0) -// { -// } - -// Chunk(const size_t size) -// : fHandle() -// , fSize(size) -// { -// void* ptr = Manager::Instance().Segment()->allocate(size); -// fHandle = Manager::Instance().Segment()->get_handle_from_address(ptr); -// } - -// ~Chunk() -// { -// Manager::Instance().Segment()->deallocate(Manager::Instance().Segment()->get_address_from_handle(fHandle)); -// } - -// bipc::managed_shared_memory::handle_t GetHandle() const -// { -// return fHandle; -// } - -// void* GetData() const -// { -// return Manager::Instance().Segment()->get_address_from_handle(fHandle); -// } - -// size_t GetSize() const -// { -// return fSize; -// } - -// private: -// bipc::managed_shared_memory::handle_t fHandle; -// size_t fSize; -// }; - -// typedef bipc::managed_shared_ptr::type ShPtrType; - -// struct ShPtrOwner -// { -// ShPtrOwner(const ShPtrType& other) -// : fPtr(other) -// {} - -// ShPtrOwner(const ShPtrOwner& other) -// : fPtr(other.fPtr) -// {} - -// ShPtrType fPtr; -// }; - -} // namespace shmem -} // namespace mq -} // namespace fair - -#endif /* FAIRMQSHMMANAGER_H_ */ diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index 5695095d..dd0aafe2 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -13,16 +13,17 @@ #include "FairMQMessageSHM.h" #include "FairMQUnmanagedRegionSHM.h" #include "FairMQLogger.h" -#include "FairMQShmCommon.h" +#include "Common.h" using namespace std; using namespace fair::mq::shmem; atomic FairMQSocketSHM::fInterrupted(false); -FairMQSocketSHM::FairMQSocketSHM(const string& type, const string& name, const string& id /*= ""*/, void* context) +FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context) : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) - , fSocket(NULL) + , fManager(manager) + , fSocket(nullptr) , fId() , fBytesTx(0) , fBytesRx(0) @@ -34,7 +35,7 @@ FairMQSocketSHM::FairMQSocketSHM(const string& type, const string& name, const s assert(context); fSocket = zmq_socket(context, GetConstant(type)); - if (fSocket == NULL) + if (fSocket == nullptr) { LOG(ERROR) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); exit(EXIT_FAILURE); @@ -67,7 +68,7 @@ FairMQSocketSHM::FairMQSocketSHM(const string& type, const string& name, const s if (type == "sub") { - if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0) != 0) + if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) { LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); } @@ -301,7 +302,7 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int fla do { - FairMQMessagePtr part(new FairMQMessageSHM()); + FairMQMessagePtr part(new FairMQMessageSHM(fManager)); zmq_msg_t* msgPtr = static_cast(part->GetMessage()); int nbytes = zmq_msg_recv(msgPtr, fSocket, flags); @@ -360,7 +361,7 @@ void FairMQSocketSHM::Close() { // LOG(DEBUG) << "Closing socket " << fId; - if (fSocket == NULL) + if (fSocket == nullptr) { return; } @@ -370,20 +371,20 @@ void FairMQSocketSHM::Close() LOG(ERROR) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); } - fSocket = NULL; + fSocket = nullptr; } void FairMQSocketSHM::Interrupt() { + fManager.Interrupt(); FairMQMessageSHM::fInterrupted = true; - FairMQUnmanagedRegionSHM::fInterrupted = true; fInterrupted = true; } void FairMQSocketSHM::Resume() { + fManager.Resume(); FairMQMessageSHM::fInterrupted = false; - FairMQUnmanagedRegionSHM::fInterrupted = true; fInterrupted = false; } diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/FairMQSocketSHM.h index c7b7f905..08253b24 100644 --- a/fairmq/shmem/FairMQSocketSHM.h +++ b/fairmq/shmem/FairMQSocketSHM.h @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ #ifndef FAIRMQSOCKETSHM_H_ @@ -14,52 +14,53 @@ #include "FairMQSocket.h" #include "FairMQMessage.h" -#include "FairMQShmManager.h" +#include "Manager.h" class FairMQSocketSHM : public FairMQSocket { public: - FairMQSocketSHM(const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr); + FairMQSocketSHM(fair::mq::shmem::Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr); FairMQSocketSHM(const FairMQSocketSHM&) = delete; FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete; - virtual std::string GetId(); + std::string GetId() override; - virtual bool Bind(const std::string& address); - virtual void Connect(const std::string& address); + bool Bind(const std::string& address) override; + void Connect(const std::string& address) override; - virtual int Send(FairMQMessagePtr& msg, const int flags = 0); - virtual int Receive(FairMQMessagePtr& msg, const int flags = 0); + int Send(FairMQMessagePtr& msg, const int flags = 0) override; + int Receive(FairMQMessagePtr& msg, const int flags = 0) override; - virtual int64_t Send(std::vector>& msgVec, const int flags = 0); - virtual int64_t Receive(std::vector>& msgVec, const int flags = 0); + int64_t Send(std::vector>& msgVec, const int flags = 0) override; + int64_t Receive(std::vector>& msgVec, const int flags = 0) override; - virtual void* GetSocket() const; - virtual int GetSocket(int nothing) const; - virtual void Close(); + void* GetSocket() const override; + int GetSocket(int nothing) const override; + void Close() override; - virtual void Interrupt(); - virtual void Resume(); + void Interrupt() override; + void Resume() override; - virtual void SetOption(const std::string& option, const void* value, size_t valueSize); - virtual void GetOption(const std::string& option, void* value, size_t* valueSize); + void SetOption(const std::string& option, const void* value, size_t valueSize) override; + void GetOption(const std::string& option, void* value, size_t* valueSize) override; - virtual unsigned long GetBytesTx() const; - virtual unsigned long GetBytesRx() const; - virtual unsigned long GetMessagesTx() const; - virtual unsigned long GetMessagesRx() const; + unsigned long GetBytesTx() const override; + unsigned long GetBytesRx() const override; + unsigned long GetMessagesTx() const override; + unsigned long GetMessagesRx() const override; - virtual bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method); - virtual int GetSendTimeout() const; - virtual bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method); - virtual int GetReceiveTimeout() const; + bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method) override; + int GetSendTimeout() const override; + bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method) override; + int GetReceiveTimeout() const override; static int GetConstant(const std::string& constant); - virtual ~FairMQSocketSHM(); + ~FairMQSocketSHM() override; private: void* fSocket; + fair::mq::shmem::Manager& fManager; std::string fId; std::atomic fBytesTx; std::atomic fBytesRx; diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index 270c5c84..2bc1d784 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -7,7 +7,6 @@ ********************************************************************************/ #include "FairMQLogger.h" -#include "FairMQShmManager.h" #include "FairMQTransportFactorySHM.h" #include @@ -27,9 +26,10 @@ using namespace std; using namespace fair::mq::shmem; -namespace bipc = boost::interprocess; + namespace bfs = boost::filesystem; namespace bpt = boost::posix_time; +namespace bipc = boost::interprocess; FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM; @@ -39,8 +39,10 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai , fHeartbeatSocket(nullptr) , fHeartbeatThread() , fSendHeartbeats(true) - , fShMutex(bipc::open_or_create, "fairmq_shmem_mutex") + , fShMutex(bipc::open_or_create, "fmq_shm_mutex") , fDeviceCounter(nullptr) + , fSegmentName() + , fManager(nullptr) { int major, minor, patch; zmq_version(&major, &minor, &patch); @@ -53,15 +55,15 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); exit(EXIT_FAILURE); } - + int numIoThreads = 1; + fSegmentName = "fmq_shm_main"; size_t segmentSize = 2000000000; - string segmentName = "fairmq_shmem_main"; if (config) { numIoThreads = config->GetValue("io-threads"); + fSegmentName = config->GetValue("shm-segment-name"); segmentSize = config->GetValue("shm-segment-size"); - segmentName = config->GetValue("shm-segment-name"); } else { @@ -79,13 +81,13 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno); } - Manager::Instance().InitializeSegment("open_or_create", segmentName, segmentSize); - LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << Manager::Instance().Segment()->get_free_memory() << " bytes."; + fManager = fair::mq::tools::make_unique(fSegmentName, segmentSize); + LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes."; { bipc::scoped_lock lock(fShMutex); - fDeviceCounter = Manager::Instance().Segment()->find(bipc::unique_instance).first; + fDeviceCounter = fManager->Segment().find(bipc::unique_instance).first; if (fDeviceCounter) { LOG(DEBUG) << "shmem: device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; @@ -95,7 +97,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai else { LOG(DEBUG) << "shmem: no device counter found, creating one and initializing with 1"; - fDeviceCounter = Manager::Instance().Segment()->construct(bipc::unique_instance)(1); + fDeviceCounter = fManager->Segment().construct(bipc::unique_instance)(1); LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->fCount; } @@ -110,7 +112,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai // } // else // { - // LOG(DEBUG) << "shmem: found shmmonitor in fairmq_shmem_management."; + // LOG(DEBUG) << "shmem: found shmmonitor in fmq_shm_management."; // } // } // catch (std::exception& e) @@ -140,7 +142,7 @@ void FairMQTransportFactorySHM::StartMonitor() do { - MonitorStatus* monitorStatus = Manager::Instance().ManagementSegment().find(bipc::unique_instance).first; + MonitorStatus* monitorStatus = fManager->ManagementSegment().find(bipc::unique_instance).first; if (monitorStatus) { LOG(DEBUG) << "shmem: shmmonitor started"; @@ -165,7 +167,7 @@ void FairMQTransportFactorySHM::SendHeartbeats() { try { - bipc::message_queue mq(bipc::open_only, "fairmq_shmem_control_queue"); + bipc::message_queue mq(bipc::open_only, "fmq_shm_control_queue"); bool heartbeat = true; bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); if (mq.timed_send(&heartbeat, sizeof(heartbeat), 0, sndTill)) @@ -180,35 +182,35 @@ void FairMQTransportFactorySHM::SendHeartbeats() catch (bipc::interprocess_exception& ie) { this_thread::sleep_for(chrono::milliseconds(500)); - // LOG(WARN) << "no fairmq_shmem_control_queue found"; + // LOG(WARN) << "no fmq_shm_control_queue found"; } } } FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage() const { - return unique_ptr(new FairMQMessageSHM()); + return unique_ptr(new FairMQMessageSHM(*fManager)); } FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(const size_t size) const { - return unique_ptr(new FairMQMessageSHM(size)); + return unique_ptr(new FairMQMessageSHM(*fManager, size)); } FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const { - return unique_ptr(new FairMQMessageSHM(data, size, ffn, hint)); + return unique_ptr(new FairMQMessageSHM(*fManager, data, size, ffn, hint)); } FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) const { - return unique_ptr(new FairMQMessageSHM(region, data, size)); + return unique_ptr(new FairMQMessageSHM(*fManager, region, data, size)); } FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name) const { assert(fContext); - return unique_ptr(new FairMQSocketSHM(type, name, GetId(), fContext)); + return unique_ptr(new FairMQSocketSHM(*fManager, type, name, GetId(), fContext)); } FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector& channels) const @@ -231,9 +233,9 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const FairMQSocket& cmdS return unique_ptr(new FairMQPollerSHM(cmdSocket, dataSocket)); } -FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size) const +FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const { - return unique_ptr(new FairMQUnmanagedRegionSHM(size)); + return unique_ptr(new FairMQUnmanagedRegionSHM(*fManager, size, callback)); } FairMQTransportFactorySHM::~FairMQTransportFactorySHM() @@ -261,6 +263,8 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM() LOG(ERROR) << "shmem: Terminate(): context not available for shutdown"; } + bool lastRemoved = false; + { // mutex scope bipc::scoped_lock lock(fShMutex); @@ -268,15 +272,21 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM() if (fDeviceCounter->fCount == 0) { - LOG(DEBUG) << "shmem: last 'fairmq_shmem_main' user, removing segment."; + LOG(DEBUG) << "shmem: last " << fSegmentName << " user, removing segment."; - Manager::Instance().Remove(); + fManager->RemoveSegment(); + lastRemoved = true; } else { - LOG(DEBUG) << "shmem: other 'fairmq_shmem_main' users present (" << fDeviceCounter->fCount << "), not removing it."; + LOG(DEBUG) << "shmem: other " << fSegmentName << " users present (" << fDeviceCounter->fCount << "), not removing it."; } } + + if (lastRemoved) + { + boost::interprocess::named_mutex::remove("fmq_shm_mutex"); + } } FairMQ::Transport FairMQTransportFactorySHM::GetType() const diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index c656841b..14c6c0af 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -10,20 +10,21 @@ #define FAIRMQTRANSPORTFACTORYSHM_H_ #include "FairMQTransportFactory.h" +#include "Manager.h" #include "FairMQMessageSHM.h" #include "FairMQSocketSHM.h" #include "FairMQPollerSHM.h" -#include "FairMQShmCommon.h" -#include +#include "Common.h" #include "FairMQUnmanagedRegionSHM.h" +#include + +#include #include #include #include #include -#include - class FairMQTransportFactorySHM : public FairMQTransportFactory { public: @@ -43,7 +44,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const override; FairMQ::Transport GetType() const override; @@ -60,6 +61,8 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory std::atomic fSendHeartbeats; boost::interprocess::named_mutex fShMutex; fair::mq::shmem::DeviceCounter* fDeviceCounter; + std::string fSegmentName; + std::unique_ptr fManager; }; #endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */ diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx b/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx index 89d332b8..b2dc5e3f 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx +++ b/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx @@ -7,26 +7,21 @@ ********************************************************************************/ #include "FairMQUnmanagedRegionSHM.h" -#include "FairMQShmManager.h" -#include "FairMQShmCommon.h" +#include "Common.h" using namespace std; using namespace fair::mq::shmem; namespace bipc = boost::interprocess; -atomic FairMQUnmanagedRegionSHM::fInterrupted(false); -unordered_map FairMQUnmanagedRegionSHM::fRemoteRegionMap; - -FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(const size_t size) - : fRegion(nullptr) +FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_t size, FairMQRegionCallback callback) + : fManager(manager) + , fRegion(nullptr) , fRegionId(0) - , fRegionIdStr() - , fRemote(false) { try { - RegionCounter* rc = Manager::Instance().ManagementSegment().find(bipc::unique_instance).first; + RegionCounter* rc = fManager.ManagementSegment().find(bipc::unique_instance).first; if (rc) { LOG(DEBUG) << "shmem: region counter found, with value of " << rc->fCount << ". incrementing."; @@ -36,29 +31,13 @@ FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(const size_t size) else { LOG(DEBUG) << "shmem: no region counter found, creating one and initializing with 1"; - rc = Manager::Instance().ManagementSegment().construct(bipc::unique_instance)(1); + rc = fManager.ManagementSegment().construct(bipc::unique_instance)(1); LOG(DEBUG) << "shmem: initialized region counter with: " << rc->fCount; } fRegionId = rc->fCount; - fRegionIdStr = "fairmq_shmem_region_" + to_string(fRegionId); - auto it = fRemoteRegionMap.find(fRegionId); - if (it != fRemoteRegionMap.end()) - { - LOG(ERROR) << "Trying to create a region that already exists"; - } - else - { - string regionIdStr = "fairmq_shmem_region_" + to_string(fRegionId); - - LOG(DEBUG) << "creating region with id " << fRegionId; - - auto r = fRemoteRegionMap.emplace(fRegionId, RemoteRegion{regionIdStr, size}); - fRegion = &(r.first->second.fRegion); - - LOG(DEBUG) << "created region with id " << fRegionId; - } + fRegion = fManager.CreateRegion(size, fRegionId, callback); } catch (bipc::interprocess_exception& e) { @@ -78,22 +57,7 @@ size_t FairMQUnmanagedRegionSHM::GetSize() const return fRegion->get_size(); } -bipc::mapped_region* FairMQUnmanagedRegionSHM::GetRemoteRegion(uint64_t regionId) -{ - auto it = fRemoteRegionMap.find(regionId); - if (it != fRemoteRegionMap.end()) - { - return &(it->second.fRegion); - } - else - { - string regionIdStr = "fairmq_shmem_region_" + to_string(regionId); - - auto r = fRemoteRegionMap.emplace(regionId, RemoteRegion{regionIdStr, 0}); - return &(r.first->second.fRegion); - } -} - FairMQUnmanagedRegionSHM::~FairMQUnmanagedRegionSHM() { + fManager.RemoveRegion(fRegionId); } diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.h b/fairmq/shmem/FairMQUnmanagedRegionSHM.h index c177a983..7dbc2d82 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.h +++ b/fairmq/shmem/FairMQUnmanagedRegionSHM.h @@ -11,6 +11,7 @@ #include "FairMQUnmanagedRegion.h" #include "FairMQLogger.h" +#include "Manager.h" #include #include @@ -21,59 +22,23 @@ #include #include -struct RemoteRegion // todo: better name? -{ - RemoteRegion(std::string regionIdStr, uint64_t size) - : fRegionName(regionIdStr) - , fShmemObject(boost::interprocess::open_or_create, regionIdStr.c_str(), boost::interprocess::read_write) - { - if (size > 0) - { - fShmemObject.truncate(size); - } - fRegion = boost::interprocess::mapped_region(fShmemObject, boost::interprocess::read_write); // TODO: add HUGEPAGES flag here - } - - RemoteRegion() = delete; - - RemoteRegion(const RemoteRegion& rr) = default; - RemoteRegion(RemoteRegion&& rr) = default; - - ~RemoteRegion() - { - if (boost::interprocess::shared_memory_object::remove(fRegionName.c_str())) - { - LOG(DEBUG) << "destroyed region " << fRegionName; - } - } - - std::string fRegionName; - boost::interprocess::shared_memory_object fShmemObject; - boost::interprocess::mapped_region fRegion; -}; - class FairMQUnmanagedRegionSHM : public FairMQUnmanagedRegion { friend class FairMQSocketSHM; friend class FairMQMessageSHM; public: - FairMQUnmanagedRegionSHM(const size_t size); + FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr); - virtual void* GetData() const override; - virtual size_t GetSize() const override; + void* GetData() const override; + size_t GetSize() const override; - static boost::interprocess::mapped_region* GetRemoteRegion(uint64_t regionId); - - virtual ~FairMQUnmanagedRegionSHM(); + ~FairMQUnmanagedRegionSHM() override; private: - static std::atomic fInterrupted; + fair::mq::shmem::Manager& fManager; boost::interprocess::mapped_region* fRegion; uint64_t fRegionId; - std::string fRegionIdStr; - bool fRemote; - static std::unordered_map fRemoteRegionMap; }; #endif /* FAIRMQUNMANAGEDREGIONSHM_H_ */ \ No newline at end of file diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx new file mode 100644 index 00000000..6b6c6230 --- /dev/null +++ b/fairmq/shmem/Manager.cxx @@ -0,0 +1,141 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + + #include "Manager.h" + #include "Common.h" + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +using namespace std; +namespace bipc = boost::interprocess; + +Manager::Manager(const string& name, size_t size) + : fName(name) + , fSegment(bipc::open_or_create, fName.c_str(), size) + , fManagementSegment(bipc::open_or_create, "fmq_shm_management", 65536) + , fRegions() +{} + +bipc::managed_shared_memory& Manager::Segment() +{ + return fSegment; +} + +void Manager::Interrupt() +{ +} + +void Manager::Resume() +{ + // close remote regions before processing new transfers + for (auto it = fRegions.begin(); it != fRegions.end(); /**/) + { + if (it->second.fRemote) + { + it = fRegions.erase(it); + } + else + { + ++it; + } + } +} + +bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback) +{ + auto it = fRegions.find(id); + if (it != fRegions.end()) + { + LOG(ERROR) << "shmem: Trying to create a region that already exists"; + return nullptr; + } + else + { + auto r = fRegions.emplace(id, Region{*this, id, size, false, callback}); + + r.first->second.StartReceivingAcks(); + + return &(r.first->second.fRegion); + } +} + +bipc::mapped_region* Manager::GetRemoteRegion(const uint64_t id) +{ + // remote region could actually be a local one if a message originates from this device (has been sent out and returned) + auto it = fRegions.find(id); + if (it != fRegions.end()) + { + return &(it->second.fRegion); + } + else + { + try + { + auto r = fRegions.emplace(id, Region{*this, id, 0, true, nullptr}); + return &(r.first->second.fRegion); + } + catch (bipc::interprocess_exception& e) + { + // LOG(WARN) << "remote region (" << id << ") no longer exists"; + return nullptr; + } + + } +} + +void Manager::RemoveRegion(const uint64_t id) +{ + fRegions.erase(id); +} + +bipc::message_queue* Manager::GetRegionQueue(const uint64_t id) +{ + try + { + return fRegions.at(id).fQueue.get(); + } + catch (out_of_range& oor) + { + return nullptr; + } +} + +void Manager::RemoveSegment() +{ + if (bipc::shared_memory_object::remove(fName.c_str())) + { + LOG(DEBUG) << "shmem: successfully removed " << fName << " segment after the device has stopped."; + } + else + { + LOG(DEBUG) << "shmem: did not remove " << fName << " segment after the device stopped. Already removed?"; + } + + if (bipc::shared_memory_object::remove("fmq_shm_management")) + { + LOG(DEBUG) << "shmem: successfully removed \"fmq_shm_management\" segment after the device has stopped."; + } + else + { + LOG(DEBUG) << "shmem: did not remove \"fmq_shm_management\" segment after the device stopped. Already removed?"; + } +} + +bipc::managed_shared_memory& Manager::ManagementSegment() +{ + return fManagementSegment; +} + +} // namespace shmem +} // namespace mq +} // namespace fair diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h new file mode 100644 index 00000000..82eb5f84 --- /dev/null +++ b/fairmq/shmem/Manager.h @@ -0,0 +1,81 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQShmManager.h + * + * @since 2016-04-08 + * @author A. Rybalchenko + */ + +#ifndef FAIR_MQ_SHMEM_MANAGER_H_ +#define FAIR_MQ_SHMEM_MANAGER_H_ + +#include "FairMQLogger.h" +#include "FairMQMessage.h" +#include "fairmq/Tools.h" +#include "Region.h" +#include "Common.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +class Manager +{ + friend class Region; + + public: + Manager(const std::string& name, size_t size); + + Manager() = delete; + + Manager(const Manager&) = delete; + Manager operator=(const Manager&) = delete; + + boost::interprocess::managed_shared_memory& Segment(); + + void Interrupt(); + void Resume(); + + boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback); + boost::interprocess::mapped_region* GetRemoteRegion(const uint64_t id); + void RemoveRegion(const uint64_t id); + + boost::interprocess::message_queue* GetRegionQueue(const uint64_t id); + + void RemoveSegment(); + + boost::interprocess::managed_shared_memory& ManagementSegment(); + + private: + std::string fName; + boost::interprocess::managed_shared_memory fSegment; + boost::interprocess::managed_shared_memory fManagementSegment; + std::unordered_map fRegions; +}; + +} // namespace shmem +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_SHMEM_MANAGER_H_ */ diff --git a/fairmq/shmem/FairMQShmMonitor.cxx b/fairmq/shmem/Monitor.cxx similarity index 90% rename from fairmq/shmem/FairMQShmMonitor.cxx rename to fairmq/shmem/Monitor.cxx index d1a11bed..72bb0b62 100644 --- a/fairmq/shmem/FairMQShmMonitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -6,8 +6,8 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "FairMQShmMonitor.h" -#include "FairMQShmCommon.h" +#include "Monitor.h" +#include "Common.h" #include #include @@ -36,7 +36,7 @@ using StringVector = bipc::vector; namespace { - volatile std::sig_atomic_t gSignalStatus; + volatile std::sig_atomic_t gSignalStatus = 0; } namespace fair @@ -61,7 +61,7 @@ Monitor::Monitor(const string& segmentName, bool selfDestruct, bool interactive, , fHeartbeatTriggered(false) , fLastHeartbeat() , fSignalThread() - , fManagementSegment(bipc::open_or_create, "fairmq_shmem_management", 65536) + , fManagementSegment(bipc::open_or_create, "fmq_shm_management", 65536) { MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; if (monitorStatus != nullptr) @@ -71,7 +71,7 @@ Monitor::Monitor(const string& segmentName, bool selfDestruct, bool interactive, } fManagementSegment.construct(bipc::unique_instance)(); - CleanupControlQueues(); + RemoveQueue("fmq_shm_control_queue"); } void Monitor::CatchSignals() @@ -124,7 +124,7 @@ void Monitor::MonitorHeartbeats() { try { - bipc::message_queue mq(bipc::open_or_create, "fairmq_shmem_control_queue", 1000, sizeof(bool)); + bipc::message_queue mq(bipc::open_or_create, "fmq_shm_control_queue", 1000, sizeof(bool)); unsigned int priority; bipc::message_queue::size_type recvdSize; @@ -149,7 +149,7 @@ void Monitor::MonitorHeartbeats() cout << ie.what() << endl; } - CleanupControlQueues(); + RemoveQueue("fmq_shm_control_queue"); } void Monitor::Interactive() @@ -339,7 +339,7 @@ void Monitor::Cleanup(const string& segmentName) { try { - bipc::managed_shared_memory managementSegment(bipc::open_only, "fairmq_shmem_management"); + bipc::managed_shared_memory managementSegment(bipc::open_only, "fmq_shm_management"); RegionCounter* rc = managementSegment.find(bipc::unique_instance).first; if (rc) { @@ -347,7 +347,8 @@ void Monitor::Cleanup(const string& segmentName) unsigned int regionCount = rc->fCount; for (unsigned int i = 1; i <= regionCount; ++i) { - RemoveObject("fairmq_shmem_region_" + to_string(i)); + RemoveObject("fmq_shm_region_" + to_string(i)); + RemoveQueue(std::string("fmq_shm_region_queue_" + std::to_string(i))); } } else @@ -355,39 +356,41 @@ void Monitor::Cleanup(const string& segmentName) cout << "shmem: no region counter found. no regions to cleanup." << endl; } - RemoveObject("fairmq_shmem_management"); + RemoveObject("fmq_shm_management"); } catch (bipc::interprocess_exception& ie) { - cout << "Did not find \"fairmq_shmem_management\" shared memory segment. No regions to cleanup." << endl; + cout << "Did not find \"fmq_shm_management\" shared memory segment. No regions to cleanup." << endl; } RemoveObject(segmentName); - boost::interprocess::named_mutex::remove("fairmq_shmem_mutex"); + boost::interprocess::named_mutex::remove("fmq_shm_mutex"); + + cout << endl; } void Monitor::RemoveObject(const std::string& name) { if (bipc::shared_memory_object::remove(name.c_str())) { - cout << "Successfully removed \"" << name << "\" shared memory segment." << endl; + cout << "Successfully removed \"" << name << "\"." << endl; } else { - cout << "Did not remove \"" << name << "\" shared memory segment. Already removed?" << endl; + cout << "Did not remove \"" << name << "\". Already removed?" << endl; } } -void Monitor::CleanupControlQueues() +void Monitor::RemoveQueue(const std::string& name) { - if (bipc::message_queue::remove("fairmq_shmem_control_queue")) + if (bipc::message_queue::remove(name.c_str())) { - // cout << "successfully removed control queue" << endl; + cout << "Successfully removed \"" << name << "\"." << endl; } else { - // cout << "could not remove control queue" << endl; + cout << "Did not remove \"" << name << "\". Already removed?" << endl; } } @@ -398,7 +401,7 @@ void Monitor::PrintQueues() try { bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); - StringVector* queues = segment.find("fairmq_shmem_queues").first; + StringVector* queues = segment.find("fmq_shm_queues").first; if (queues) { cout << "found " << queues->size() << " queue(s):" << endl; diff --git a/fairmq/shmem/FairMQShmMonitor.h b/fairmq/shmem/Monitor.h similarity index 97% rename from fairmq/shmem/FairMQShmMonitor.h rename to fairmq/shmem/Monitor.h index dd20f385..59746391 100644 --- a/fairmq/shmem/FairMQShmMonitor.h +++ b/fairmq/shmem/Monitor.h @@ -36,7 +36,8 @@ class Monitor virtual ~Monitor(); static void Cleanup(const std::string& segmentName); - static void CleanupControlQueues(); + static void RemoveObject(const std::string&); + static void RemoveQueue(const std::string&); private: void PrintHeader(); @@ -46,7 +47,6 @@ class Monitor void CheckSegment(); void Interactive(); void SignalMonitor(); - static void RemoveObject(const std::string&); bool fSelfDestruct; // will self-destruct after the memory has been closed bool fInteractive; // running in interactive mode diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md index 82268e83..689082b3 100644 --- a/fairmq/shmem/README.md +++ b/fairmq/shmem/README.md @@ -10,16 +10,16 @@ Devices track and cleanup shared memory on shutdown. For more information on the The shared memory monitor tool, supplied with the shared memory transport can be used to monitor shared memory use and automatically cleanup shared memory in case of device crashes. -With default arguments the monitor will run indefinitely with no output, and clean up shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with following parameters: +With default arguments the monitor will run indefinitely with no output, and clean up shared memory segment if it is open and no heartbeats from devices arrive within a timeout period. It can be further customized with following parameters: - `--segment-name `: customize the name of the shared memory segment (default is "fairmq_shmem_main"). + `--segment-name `: customize the name of the shared memory segment (default is "fmq_shm_main"). `--cleanup`: start monitor, perform cleanup of the memory and quit. `--self-destruct`: run until the memory segment is closed (either naturally via cleanup performed by devices or in case of a crash (no heartbeats within timeout)). `--interactive`: run interactively, with detailed segment details and user input for various shmem operations. `--timeout `: specifiy the timeout for the heartbeats from shmem transports in milliseconds (default 5000). The options can be combined, with the exception of `--cleanup` option, which will invoke the described behaviour independent of other options. -Without the `--self-destruct` option, the monitor will run continously, moitoring (and cleaning up if needed) consecutive topologies. +Without the `--self-destruct` option, the monitor will run continuously, moitoring (and cleaning up if needed) consecutive topologies. Possible further implementation would be to run the monitor with `--self-destruct` with each topology. @@ -27,7 +27,9 @@ The FairMQShmMonitor class can also be used independently from the supplied exec FairMQ Shared Memory currently uses following names to register shared memory on the system: -`fairmq_shmem_main` - main segment name, used for user data (this name can be overridden via `--shm-segment-name`). -`fairmq_shmem_management` - management segment name, used for storing management data. -`fairmq_shmem_control_queue` - message queue for communicating between shm transport and shm monitor (exists independent of above segments). -`fairmq_shmem_mutex` - boost::interprocess::named_mutex for management purposes (exists independent of above segments). +`fmq_shm_main` - main segment name, used for user data (this name can be overridden via `--shm-segment-name`). +`fmq_shm_management` - management segment name, used for storing management data. +`fmq_shm_control_queue` - message queue for communicating between shm transport and shm monitor (exists independent of above segments). +`fmq_shm_mutex` - boost::interprocess::named_mutex for management purposes (exists independent of above segments). +`fmq_shm_region_` - names of unmanaged regions. +`fmq_shm_region_queue_` - names of queues for the unmanaged regions. diff --git a/fairmq/shmem/Region.cxx b/fairmq/shmem/Region.cxx new file mode 100644 index 00000000..58350f04 --- /dev/null +++ b/fairmq/shmem/Region.cxx @@ -0,0 +1,121 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "Region.h" +#include "Common.h" +#include "Manager.h" + +#include + +#include + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +using namespace std; + +namespace bipc = boost::interprocess; +namespace bpt = boost::posix_time; + +Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback) + : fManager(manager) + , fRemote(remote) + , fStop(false) + , fName("fmq_shm_region_" + to_string(id)) + , fQueueName("fmq_shm_region_queue_" + to_string(id)) + , fShmemObject() + , fQueue(nullptr) + , fWorker() + , fCallback(callback) +{ + if (fRemote) + { + fShmemObject = bipc::shared_memory_object(bipc::open_only, fName.c_str(), bipc::read_write); + LOG(DEBUG) << "shmem: located remote region: " << fName; + + fQueue = fair::mq::tools::make_unique(bipc::open_only, fQueueName.c_str()); + LOG(DEBUG) << "shmem: located remote region queue: " << fQueueName; + } + else + { + fShmemObject = bipc::shared_memory_object(bipc::create_only, fName.c_str(), bipc::read_write); + LOG(DEBUG) << "shmem: created region: " << fName; + fShmemObject.truncate(size); + + fQueue = fair::mq::tools::make_unique(bipc::create_only, fQueueName.c_str(), 10000, sizeof(RegionBlock)); + LOG(DEBUG) << "shmem: created region queue: " << fQueueName; + } + fRegion = bipc::mapped_region(fShmemObject, bipc::read_write); // TODO: add HUGEPAGES flag here + // fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, MAP_HUGETLB | MAP_HUGE_1GB); +} + +void Region::StartReceivingAcks() +{ + fWorker = std::thread(&Region::ReceiveAcks, this); +} + +void Region::ReceiveAcks() +{ + unsigned int priority; + bipc::message_queue::size_type recvdSize; + + while (!fStop) // end thread condition (should exist until region is destroyed) + { + auto rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200); + RegionBlock block; + if (fQueue->timed_receive(&block, sizeof(RegionBlock), recvdSize, priority, rcvTill)) + { + // LOG(DEBUG) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId; + if (fCallback) + { + fCallback(reinterpret_cast(fRegion.get_address()) + block.fHandle, block.fSize); + } + } + else + { + // LOG(DEBUG) << "queue " << fQueueName << " timeout!"; + } + } // while !fStop + + LOG(DEBUG) << "worker for " << fName << " leaving."; +} + +Region::~Region() +{ + if (!fRemote) + { + fStop = true; + if (fWorker.joinable()) + { + fWorker.join(); + } + + if (bipc::shared_memory_object::remove(fName.c_str())) + { + LOG(DEBUG) << "shmem: destroyed region " << fName; + } + + if (bipc::message_queue::remove(fQueueName.c_str())) + { + LOG(DEBUG) << "shmem: removed region queue " << fName; + } + } + else + { + // LOG(DEBUG) << "shmem: region '" << fName << "' is remote, no cleanup necessary."; + LOG(DEBUG) << "shmem: region queue '" << fQueueName << "' is remote, no cleanup necessary"; + } +} + +} // namespace shmem +} // namespace mq +} // namespace fair diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h new file mode 100644 index 00000000..207e9c0d --- /dev/null +++ b/fairmq/shmem/Region.h @@ -0,0 +1,68 @@ +/******************************************************************************** +* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * +* * +* This software is distributed under the terms of the * +* GNU Lesser General Public Licence version 3 (LGPL) version 3, * +* copied verbatim in the file "LICENSE" * +********************************************************************************/ +/** +* FairMQShmManager.h +* +* @since 2016-04-08 +* @author A. Rybalchenko +*/ + +#ifndef FAIR_MQ_SHMEM_REGION_H_ +#define FAIR_MQ_SHMEM_REGION_H_ + +#include "FairMQLogger.h" +#include "FairMQUnmanagedRegion.h" +#include "fairmq/Tools.h" + +#include +#include + +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +class Manager; + +struct Region +{ + Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback = nullptr); + + Region() = delete; + + Region(const Region&) = default; + Region(Region&&) = default; + + void StartReceivingAcks(); + void ReceiveAcks(); + + ~Region(); + + Manager& fManager; + bool fRemote; + bool fStop; + std::string fName; + std::string fQueueName; + boost::interprocess::shared_memory_object fShmemObject; + boost::interprocess::mapped_region fRegion; + std::unique_ptr fQueue; + std::thread fWorker; + FairMQRegionCallback fCallback; +}; + +} // namespace shmem +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_SHMEM_REGION_H_ */ diff --git a/fairmq/shmem/runFairMQShmMonitor.cxx b/fairmq/shmem/runMonitor.cxx similarity index 93% rename from fairmq/shmem/runFairMQShmMonitor.cxx rename to fairmq/shmem/runMonitor.cxx index 825bb746..c619ab76 100644 --- a/fairmq/shmem/runFairMQShmMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -5,7 +5,7 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "FairMQShmMonitor.h" +#include "Monitor.h" #include @@ -27,7 +27,7 @@ int main(int argc, char** argv) options_description desc("Options"); desc.add_options() - ("segment-name", value(&segmentName)->default_value("fairmq_shmem_main"), "Name of the shared memory segment") + ("segment-name", value(&segmentName)->default_value("fmq_shm_main"), "Name of the shared memory segment") ("cleanup", value(&cleanup)->implicit_value(true), "Perform cleanup and quit") ("self-destruct", value(&selfDestruct)->implicit_value(true), "Quit after first closing of the memory") ("interactive", value(&interactive)->implicit_value(true), "Interactive run") @@ -49,7 +49,7 @@ int main(int argc, char** argv) { cout << "Cleaning up \"" << segmentName << "\"..." << endl; fair::mq::shmem::Monitor::Cleanup(segmentName); - fair::mq::shmem::Monitor::CleanupControlQueues(); + fair::mq::shmem::Monitor::RemoveQueue("fmq_shm_control_queue"); return 0; } diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index a0fc2d69..2c70931f 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -17,10 +17,10 @@ #include "FairMQMessageZMQ.h" #include "FairMQLogger.h" +#include "FairMQUnmanagedRegionZMQ.h" using namespace std; -string FairMQMessageZMQ::fDeviceID = string(); FairMQ::Transport FairMQMessageZMQ::fTransportType = FairMQ::Transport::ZMQ; FairMQMessageZMQ::FairMQMessageZMQ() @@ -50,7 +50,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn } } -FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& /*region*/, void* data, const size_t size) +FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size) : fMessage() { // FIXME: make this zero-copy: @@ -62,6 +62,8 @@ FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& /*region*/, void* d } memcpy(zmq_msg_data(&fMessage), data, size); + // call region callback + static_cast(region.get())->fCallback(data, size); // if (zmq_msg_init_data(&fMessage, data, size, [](void*, void*){}, nullptr) != 0) // { @@ -116,11 +118,6 @@ void FairMQMessageZMQ::SetMessage(void*, const size_t) // dummy method to comply with the interface. functionality not allowed in zeromq. } -void FairMQMessageZMQ::SetDeviceId(const string& deviceId) -{ - fDeviceID = deviceId; -} - FairMQ::Transport FairMQMessageZMQ::GetType() const { return fTransportType; diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index 3e27c286..ce7f0f03 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -41,8 +41,6 @@ class FairMQMessageZMQ : public FairMQMessage virtual void SetMessage(void* data, const size_t size); - virtual void SetDeviceId(const std::string& deviceId); - virtual FairMQ::Transport GetType() const; virtual void Copy(const std::unique_ptr& msg); @@ -53,7 +51,6 @@ class FairMQMessageZMQ : public FairMQMessage private: zmq_msg_t fMessage; - static std::string fDeviceID; static FairMQ::Transport fTransportType; }; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 1bf8565a..ae8b8f37 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -95,9 +95,9 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const FairMQSocket& cmdS return unique_ptr(new FairMQPollerZMQ(cmdSocket, dataSocket)); } -FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size) const +FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const { - return unique_ptr(new FairMQUnmanagedRegionZMQ(size)); + return unique_ptr(new FairMQUnmanagedRegionZMQ(size, callback)); } FairMQ::Transport FairMQTransportFactoryZMQ::GetType() const diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 3f0a4b01..1bea8bd9 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -46,7 +46,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size) const override; + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback) const override; FairMQ::Transport GetType() const override; private: diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx index aab7b6f1..190e22dc 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx @@ -11,9 +11,10 @@ using namespace std; -FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size) +FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback) : fBuffer(malloc(size)) , fSize(size) + , fCallback(callback) { } diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h index 41a057a9..5a09813c 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h @@ -15,10 +15,11 @@ class FairMQUnmanagedRegionZMQ : public FairMQUnmanagedRegion { - friend class FairMQSocketSHM; + friend class FairMQSocketZMQ; + friend class FairMQMessageZMQ; public: - FairMQUnmanagedRegionZMQ(const size_t size); + FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback); FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete; FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete; @@ -30,6 +31,7 @@ class FairMQUnmanagedRegionZMQ : public FairMQUnmanagedRegion private: void* fBuffer; size_t fSize; + FairMQRegionCallback fCallback; }; #endif /* FAIRMQUNMANAGEDREGIONZMQ_H_ */ \ No newline at end of file