diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 35d3e38a..fc92679f 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -192,11 +192,11 @@ if(BUILD_FAIRMQ) plugins/Builtin.h plugins/config/Config.h plugins/Control.h - shmem/FairMQMessageSHM.h - shmem/FairMQPollerSHM.h - shmem/FairMQUnmanagedRegionSHM.h - shmem/FairMQSocketSHM.h - shmem/FairMQTransportFactorySHM.h + shmem/Message.h + shmem/Poller.h + shmem/UnmanagedRegion.h + shmem/Socket.h + shmem/TransportFactory.h shmem/Common.h shmem/Manager.h shmem/Region.h @@ -253,11 +253,11 @@ if(BUILD_FAIRMQ) SuboptParser.cxx plugins/config/Config.cxx plugins/Control.cxx - shmem/FairMQMessageSHM.cxx - shmem/FairMQPollerSHM.cxx - shmem/FairMQUnmanagedRegionSHM.cxx - shmem/FairMQSocketSHM.cxx - shmem/FairMQTransportFactorySHM.cxx + shmem/Message.cxx + shmem/Poller.cxx + shmem/UnmanagedRegion.cxx + shmem/Socket.cxx + shmem/TransportFactory.cxx shmem/Manager.cxx shmem/Region.cxx zeromq/FairMQMessageZMQ.cxx diff --git a/fairmq/FairMQPoller.h b/fairmq/FairMQPoller.h index d8a58983..1b41e19d 100644 --- a/fairmq/FairMQPoller.h +++ b/fairmq/FairMQPoller.h @@ -31,8 +31,8 @@ namespace fair namespace mq { -using PollerPtr = std::unique_ptr; - +using Poller = FairMQPoller; +using PollerPtr = FairMQPollerPtr; struct PollerError : std::runtime_error { using std::runtime_error::runtime_error; }; } /* namespace mq */ diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index dee98c16..f6e22ec4 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -22,7 +22,7 @@ class FairMQSocket FairMQSocket() {} FairMQSocket(FairMQTransportFactory* fac): fTransport(fac) {} - virtual std::string GetId() = 0; + virtual std::string GetId() const = 0; virtual bool Bind(const std::string& address) = 0; virtual bool Connect(const std::string& address) = 0; diff --git a/fairmq/FairMQTransportFactory.cxx b/fairmq/FairMQTransportFactory.cxx index 8c7ef5eb..0e6c5452 100644 --- a/fairmq/FairMQTransportFactory.cxx +++ b/fairmq/FairMQTransportFactory.cxx @@ -8,7 +8,7 @@ #include #include -#include +#include #ifdef BUILD_NANOMSG_TRANSPORT #include #endif /* BUILD_NANOMSG_TRANSPORT */ @@ -44,7 +44,7 @@ auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, con } else if (type == "shmem") { - return make_shared(finalId, config); + return make_shared(finalId, config); } #ifdef BUILD_NANOMSG_TRANSPORT else if (type == "nanomsg") diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 3c982c17..78e9b53b 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -137,6 +137,7 @@ namespace fair namespace mq { +using TransportFactory = FairMQTransportFactory; struct TransportFactoryError : std::runtime_error { using std::runtime_error::runtime_error; }; } /* namespace mq */ diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index c02caaf3..59e14f85 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -31,7 +31,9 @@ namespace fair namespace mq { -using UnmanagedRegionPtr = std::unique_ptr; +using RegionCallback = FairMQRegionCallback; +using UnmanagedRegion = FairMQUnmanagedRegion; +using UnmanagedRegionPtr = FairMQUnmanagedRegionPtr; } /* namespace mq */ } /* namespace fair */ diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 42c7e0fd..3ba8dca6 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -91,11 +91,6 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const str LOG(debug) << "Created socket " << GetId(); } -string FairMQSocketNN::GetId() -{ - return fId; -} - bool FairMQSocketNN::Bind(const string& address) { // LOG(info) << "bind socket " << fId << " on " << address; diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 241162d4..6905383f 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -23,7 +23,7 @@ class FairMQSocketNN final : public FairMQSocket FairMQSocketNN(const FairMQSocketNN&) = delete; FairMQSocketNN operator=(const FairMQSocketNN&) = delete; - std::string GetId() override; + std::string GetId() const override { return fId; } bool Bind(const std::string& address) override; bool Connect(const std::string& address) override; diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 598fdfdc..2a738f4a 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -43,7 +43,7 @@ class Socket final : public fair::mq::Socket Socket(const Socket&) = delete; Socket operator=(const Socket&) = delete; - auto GetId() -> std::string { return fId; } + auto GetId() const -> std::string override { return fId; } auto Bind(const std::string& address) -> bool override; auto Connect(const std::string& address) -> bool override; diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 7b0f618b..ba98b3c8 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -77,8 +77,8 @@ struct MetaHeader { size_t fSize; size_t fRegionId; - boost::interprocess::managed_shared_memory::handle_t fHandle; size_t fHint; + boost::interprocess::managed_shared_memory::handle_t fHandle; }; struct RegionBlock diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx deleted file mode 100644 index ab942b7e..00000000 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ /dev/null @@ -1,302 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -#include "Common.h" -#include "Region.h" - -#include "FairMQMessageSHM.h" -#include "FairMQUnmanagedRegionSHM.h" -#include "FairMQLogger.h" - -#include - -#include - -using namespace std; -using namespace fair::mq::shmem; - -namespace bipc = ::boost::interprocess; -namespace bpt = ::boost::posix_time; - -atomic FairMQMessageSHM::fInterrupted(false); -fair::mq::Transport FairMQMessageSHM::fTransportType = fair::mq::Transport::SHM; - -FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQTransportFactory* factory) - : FairMQMessage{factory} - , fManager(manager) - , fMessage() - , fQueued(false) - , fMetaCreated(false) - , fRegionId(0) - , fRegionPtr(nullptr) - , fHandle(-1) - , fSize(0) - , fHint(0) - , fLocalPtr(nullptr) -{ - if (zmq_msg_init(&fMessage) != 0) { - LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); - } - fMetaCreated = true; -} - -FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size, FairMQTransportFactory* factory) - : FairMQMessage{factory} - , fManager(manager) - , fMessage() - , fQueued(false) - , fMetaCreated(false) - , fRegionId(0) - , fRegionPtr(nullptr) - , fHandle(-1) - , fSize(0) - , fHint(0) - , fLocalPtr(nullptr) -{ - InitializeChunk(size); -} - -FairMQMessageSHM::FairMQMessageSHM(Manager& manager, MetaHeader* hdr, FairMQTransportFactory* factory) - : FairMQMessage{factory} - , fManager(manager) - , fMessage() - , fQueued(false) - , fMetaCreated(false) - , fRegionId(hdr->fRegionId) - , fRegionPtr(nullptr) - , fHandle(hdr->fHandle) - , fSize(hdr->fSize) - , fHint(hdr->fHint) - , fLocalPtr(nullptr) -{ - if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) { - LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); - } - // fill the zmq buffer with the delivered meta data - memcpy(zmq_msg_data(&fMessage), hdr, sizeof(MetaHeader)); - fMetaCreated = true; -} - -FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory) - : FairMQMessage{factory} - , fManager(manager) - , fMessage() - , fQueued(false) - , fMetaCreated(false) - , fRegionId(0) - , fRegionPtr(nullptr) - , fHandle(-1) - , fSize(0) - , fHint(0) - , fLocalPtr(nullptr) -{ - if (InitializeChunk(size)) { - memcpy(fLocalPtr, data, size); - if (ffn) { - ffn(data, hint); - } else { - free(data); - } - } -} - -FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory) - : FairMQMessage{factory} - , fManager(manager) - , fMessage() - , fQueued(false) - , fMetaCreated(false) - , fRegionId(static_cast(region.get())->fRegionId) - , fRegionPtr(nullptr) - , fHandle(-1) - , fSize(size) - , fHint(reinterpret_cast(hint)) - , fLocalPtr(static_cast(data)) -{ - if (reinterpret_cast(data) >= reinterpret_cast(region->GetData()) || - reinterpret_cast(data) <= reinterpret_cast(region->GetData()) + region->GetSize()) { - fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast(data) - reinterpret_cast(region->GetData())); - - if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) { - LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno); - } else { - MetaHeader header; - header.fSize = size; - header.fHandle = fHandle; - header.fRegionId = fRegionId; - header.fHint = fHint; - memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader)); - - fMetaCreated = true; - } - } else { - LOG(error) << "trying to create region message with data from outside the region"; - throw runtime_error("trying to create region message with data from outside the region"); - } -} - -bool FairMQMessageSHM::InitializeChunk(const size_t size) -{ - while (fHandle < 0) { - try { - bipc::managed_shared_memory::size_type actualSize = size; - char* hint = 0; // unused for bipc::allocate_new - fLocalPtr = fManager.Segment().allocation_command(bipc::allocate_new, size, actualSize, hint); - } catch (bipc::bad_alloc& ba) { - // LOG(warn) << "Shared memory full..."; - this_thread::sleep_for(chrono::milliseconds(50)); - if (fInterrupted) { - return false; - } else { - continue; - } - } - fHandle = fManager.Segment().get_handle_from_address(fLocalPtr); - } - - fSize = size; - - if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) { - LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno); - return false; - } - MetaHeader header; - header.fSize = size; - header.fHandle = fHandle; - header.fRegionId = fRegionId; - header.fHint = fHint; - memcpy(zmq_msg_data(&fMessage), &header, sizeof(MetaHeader)); - - fMetaCreated = true; - - return true; -} - -void FairMQMessageSHM::Rebuild() -{ - CloseMessage(); - - fQueued = false; - - if (zmq_msg_init(&fMessage) != 0) { - LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); - } - fMetaCreated = true; -} - -void FairMQMessageSHM::Rebuild(const size_t size) -{ - CloseMessage(); - fQueued = false; - InitializeChunk(size); -} - -void FairMQMessageSHM::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) -{ - CloseMessage(); - - fQueued = false; - - if (InitializeChunk(size)) { - memcpy(fLocalPtr, data, size); - if (ffn) { - ffn(data, hint); - } else { - free(data); - } - } -} - -void* FairMQMessageSHM::GetData() const -{ - if (!fLocalPtr) { - if (fRegionId == 0) { - if (fSize > 0) { - fLocalPtr = reinterpret_cast(fManager.Segment().get_address_from_handle(fHandle)); - } else { - fLocalPtr = nullptr; - } - } else { - fRegionPtr = fManager.GetRemoteRegion(fRegionId); - if (fRegionPtr) { - fLocalPtr = reinterpret_cast(fRegionPtr->fRegion.get_address()) + fHandle; - } else { - // LOG(warn) << "could not get pointer from a region message"; - fLocalPtr = nullptr; - } - } - } - - return fLocalPtr; -} - -bool FairMQMessageSHM::SetUsedSize(const size_t size) -{ - if (size == fSize) { - return true; - } else if (size <= fSize) { - try { - bipc::managed_shared_memory::size_type shrunkSize = size; - fLocalPtr = fManager.Segment().allocation_command(bipc::shrink_in_place, fSize + 128, shrunkSize, fLocalPtr); - fSize = size; - - // update meta header - MetaHeader* hdrPtr = static_cast(zmq_msg_data(&fMessage)); - hdrPtr->fSize = fSize; - return true; - } catch (bipc::interprocess_exception& e) { - LOG(info) << "could not set used size: " << e.what(); - return false; - } - } else { - LOG(error) << "cannot set used size higher than original."; - return false; - } -} - -void FairMQMessageSHM::Copy(const FairMQMessage& msg) -{ - if (fHandle < 0) { - bipc::managed_shared_memory::handle_t otherHandle = static_cast(msg).fHandle; - if (otherHandle) { - if (InitializeChunk(msg.GetSize())) { - memcpy(GetData(), msg.GetData(), msg.GetSize()); - } - } else { - LOG(error) << "copy fail: source message not initialized!"; - } - } else { - LOG(error) << "copy fail: target message already initialized!"; - } -} - -void FairMQMessageSHM::CloseMessage() -{ - if (fHandle >= 0 && !fQueued) { - if (fRegionId == 0) { - fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fHandle)); - fHandle = -1; - } else { - if (!fRegionPtr) { - fRegionPtr = fManager.GetRemoteRegion(fRegionId); - } - - if (fRegionPtr) { - fRegionPtr->ReleaseBlock({fHandle, fSize, fHint}); - } else { - LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack"; - } - } - } - - if (fMetaCreated) { - if (zmq_msg_close(&fMessage) != 0) { - LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); - } - fMetaCreated = false; - } -} diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h deleted file mode 100644 index 315d5240..00000000 --- a/fairmq/shmem/FairMQMessageSHM.h +++ /dev/null @@ -1,84 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -#ifndef FAIRMQMESSAGESHM_H_ -#define FAIRMQMESSAGESHM_H_ - -#include - -#include "FairMQMessage.h" -#include "FairMQUnmanagedRegion.h" - -#include - -#include - -#include // size_t -#include - -class FairMQSocketSHM; -namespace fair -{ -namespace mq -{ -namespace shmem -{ -class MetaHeader; -} -} -} - -class FairMQMessageSHM final : public FairMQMessage -{ - friend class FairMQSocketSHM; - - public: - FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQTransportFactory* factory = nullptr); - FairMQMessageSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr); - FairMQMessageSHM(fair::mq::shmem::Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr); - FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr); - - FairMQMessageSHM(fair::mq::shmem::Manager& manager, fair::mq::shmem::MetaHeader* hdr, FairMQTransportFactory* factory = nullptr); - - FairMQMessageSHM(const FairMQMessageSHM&) = delete; - FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete; - - void Rebuild() override; - void Rebuild(const size_t size) override; - void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - - void* GetData() const override; - size_t GetSize() const override { return fSize; } - - bool SetUsedSize(const size_t size) override; - - fair::mq::Transport GetType() const override { return fTransportType; } - - void Copy(const FairMQMessage& msg) override; - - ~FairMQMessageSHM() override { CloseMessage(); } - - private: - fair::mq::shmem::Manager& fManager; - zmq_msg_t fMessage; - bool fQueued; - bool fMetaCreated; - static std::atomic fInterrupted; - static fair::mq::Transport fTransportType; - size_t fRegionId; - mutable fair::mq::shmem::Region* fRegionPtr; - boost::interprocess::managed_shared_memory::handle_t fHandle; - size_t fSize; - size_t fHint; - mutable char* fLocalPtr; - - bool InitializeChunk(const size_t size); - zmq_msg_t* GetMessage() { return &fMessage; } - void CloseMessage(); -}; - -#endif /* FAIRMQMESSAGESHM_H_ */ diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h deleted file mode 100644 index 6dd8959d..00000000 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ /dev/null @@ -1,67 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2016-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIRMQTRANSPORTFACTORYSHM_H_ -#define FAIRMQTRANSPORTFACTORYSHM_H_ - -#include -#include - -#include "FairMQTransportFactory.h" -#include "FairMQMessageSHM.h" -#include "FairMQSocketSHM.h" -#include "FairMQPollerSHM.h" -#include "FairMQUnmanagedRegionSHM.h" -#include - -#include -#include -#include -#include - -class FairMQTransportFactorySHM final : public FairMQTransportFactory -{ - public: - FairMQTransportFactorySHM(const std::string& id = "", const fair::mq::ProgOptions* config = nullptr); - FairMQTransportFactorySHM(const FairMQTransportFactorySHM&) = delete; - FairMQTransportFactorySHM operator=(const FairMQTransportFactorySHM&) = delete; - - FairMQMessagePtr CreateMessage() override; - FairMQMessagePtr CreateMessage(const size_t size) override; - FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override; - - FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) override; - - FairMQPollerPtr CreatePoller(const std::vector& channels) const override; - FairMQPollerPtr CreatePoller(const std::vector& channels) const override; - FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; - - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; - - fair::mq::Transport GetType() const override; - - void Interrupt() override { FairMQSocketSHM::Interrupt(); } - void Resume() override { FairMQSocketSHM::Resume(); } - void Reset() override {} - - ~FairMQTransportFactorySHM() override; - - private: - void SendHeartbeats(); - - static fair::mq::Transport fTransportType; - std::string fDeviceId; - std::string fShmId; - void* fZMQContext; - std::unique_ptr fManager; - std::thread fHeartbeatThread; - std::atomic fSendHeartbeats; -}; - -#endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */ diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index c3ca3b44..f76c111c 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -6,8 +6,9 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include +#include "Manager.h" +#include "Common.h" + #include #include @@ -83,7 +84,7 @@ void Manager::StartMonitor(const std::string& id) this_thread::sleep_for(chrono::milliseconds(10)); if (++numTries > 1000) { LOG(error) << "Did not get response from fairmq-shmmonitor after " << 10 * 1000 << " milliseconds. Exiting."; - throw runtime_error(fair::mq::tools::ToString("Did not get response from fairmq-shmmonitor after ", 10 * 1000, " milliseconds. Exiting.")); + throw runtime_error(tools::ToString("Did not get response from fairmq-shmmonitor after ", 10 * 1000, " milliseconds. Exiting.")); } } } while (true); @@ -109,7 +110,7 @@ void Manager::Resume() } } -bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) +bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) { auto it = fRegions.find(id); if (it != fRegions.end()) { @@ -125,7 +126,7 @@ bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, } // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; - auto r = fRegions.emplace(id, fair::mq::tools::make_unique(*this, id, size, false, callback, path, flags)); + auto r = fRegions.emplace(id, tools::make_unique(*this, id, size, false, callback, path, flags)); r.first->second->StartReceivingAcks(); @@ -158,7 +159,7 @@ Region* Manager::GetRemoteRegion(const uint64_t id) } // LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; - auto r = fRegions.emplace(id, fair::mq::tools::make_unique(*this, id, 0, true, nullptr, path, flags)); + auto r = fRegions.emplace(id, tools::make_unique(*this, id, 0, true, nullptr, path, flags)); return r.first->second.get(); } catch (bie& e) { LOG(warn) << "Could not get remote region for id: " << id; diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 2a1e2569..c9bb5e92 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -6,7 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** - * FairMQShmManager.h + * Manager.h * * @since 2016-04-08 * @author A. Rybalchenko @@ -15,11 +15,10 @@ #ifndef FAIR_MQ_SHMEM_MANAGER_H_ #define FAIR_MQ_SHMEM_MANAGER_H_ -#include -#include +#include "Region.h" +#include "Common.h" -#include "FairMQLogger.h" -#include "FairMQMessage.h" +#include #include #include @@ -64,7 +63,7 @@ class Manager int IncrementDeviceCounter(); int DecrementDeviceCounter(); - boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); + boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path = "", int flags = 0); Region* GetRemoteRegion(const uint64_t id); void RemoveRegion(const uint64_t id); @@ -77,7 +76,7 @@ class Manager boost::interprocess::managed_shared_memory fSegment; boost::interprocess::managed_shared_memory fManagementSegment; boost::interprocess::named_mutex fShmMtx; - fair::mq::shmem::DeviceCounter* fDeviceCounter; + DeviceCounter* fDeviceCounter; static std::unordered_map> fRegions; }; diff --git a/fairmq/shmem/Message.cxx b/fairmq/shmem/Message.cxx new file mode 100644 index 00000000..56c985f0 --- /dev/null +++ b/fairmq/shmem/Message.cxx @@ -0,0 +1,232 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "Region.h" +#include "Message.h" +#include "UnmanagedRegion.h" + +#include + +#include + +#include + +using namespace std; + +namespace bipc = ::boost::interprocess; +namespace bpt = ::boost::posix_time; + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +atomic Message::fInterrupted(false); +Transport Message::fTransportType = Transport::SHM; + +Message::Message(Manager& manager, FairMQTransportFactory* factory) + : fair::mq::Message{factory} + , fManager(manager) + , fQueued(false) + , fMeta{0, 0, 0, -1} + , fRegionPtr(nullptr) + , fLocalPtr(nullptr) +{ +} + +Message::Message(Manager& manager, const size_t size, FairMQTransportFactory* factory) + : fair::mq::Message{factory} + , fManager(manager) + , fQueued(false) + , fMeta{0, 0, 0, -1} + , fRegionPtr(nullptr) + , fLocalPtr(nullptr) +{ + InitializeChunk(size); +} + +Message::Message(Manager& manager, MetaHeader& hdr, FairMQTransportFactory* factory) + : fair::mq::Message{factory} + , fManager(manager) + , fQueued(false) + , fMeta{hdr} + , fRegionPtr(nullptr) + , fLocalPtr(nullptr) +{ +} + +Message::Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory) + : fair::mq::Message{factory} + , fManager(manager) + , fQueued(false) + , fMeta{0, 0, 0, -1} + , fRegionPtr(nullptr) + , fLocalPtr(nullptr) +{ + if (InitializeChunk(size)) { + std::memcpy(fLocalPtr, data, size); + if (ffn) { + ffn(data, hint); + } else { + free(data); + } + } +} + +Message::Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory) + : fair::mq::Message{factory} + , fManager(manager) + , fQueued(false) + , fMeta{size, static_cast(region.get())->fRegionId, reinterpret_cast(hint), -1} + , fRegionPtr(nullptr) + , fLocalPtr(static_cast(data)) +{ + if (reinterpret_cast(data) >= reinterpret_cast(region->GetData()) || + reinterpret_cast(data) <= reinterpret_cast(region->GetData()) + region->GetSize()) { + fMeta.fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast(data) - reinterpret_cast(region->GetData())); + } else { + LOG(error) << "trying to create region message with data from outside the region"; + throw runtime_error("trying to create region message with data from outside the region"); + } +} + +bool Message::InitializeChunk(const size_t size) +{ + while (fMeta.fHandle < 0) { + try { + bipc::managed_shared_memory::size_type actualSize = size; + char* hint = 0; // unused for bipc::allocate_new + fLocalPtr = fManager.Segment().allocation_command(bipc::allocate_new, size, actualSize, hint); + } catch (bipc::bad_alloc& ba) { + // LOG(warn) << "Shared memory full..."; + this_thread::sleep_for(chrono::milliseconds(50)); + if (fInterrupted) { + return false; + } else { + continue; + } + } + fMeta.fHandle = fManager.Segment().get_handle_from_address(fLocalPtr); + } + + fMeta.fSize = size; + return true; +} + +void Message::Rebuild() +{ + CloseMessage(); + fQueued = false; +} + +void Message::Rebuild(const size_t size) +{ + CloseMessage(); + fQueued = false; + InitializeChunk(size); +} + +void Message::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) +{ + CloseMessage(); + fQueued = false; + + if (InitializeChunk(size)) { + std::memcpy(fLocalPtr, data, size); + if (ffn) { + ffn(data, hint); + } else { + free(data); + } + } +} + +void* Message::GetData() const +{ + if (!fLocalPtr) { + if (fMeta.fRegionId == 0) { + if (fMeta.fSize > 0) { + fLocalPtr = reinterpret_cast(fManager.Segment().get_address_from_handle(fMeta.fHandle)); + } else { + fLocalPtr = nullptr; + } + } else { + fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId); + if (fRegionPtr) { + fLocalPtr = reinterpret_cast(fRegionPtr->fRegion.get_address()) + fMeta.fHandle; + } else { + // LOG(warn) << "could not get pointer from a region message"; + fLocalPtr = nullptr; + } + } + } + + return fLocalPtr; +} + +bool Message::SetUsedSize(const size_t size) +{ + if (size == fMeta.fSize) { + return true; + } else if (size <= fMeta.fSize) { + try { + bipc::managed_shared_memory::size_type shrunkSize = size; + fLocalPtr = fManager.Segment().allocation_command(bipc::shrink_in_place, fMeta.fSize + 128, shrunkSize, fLocalPtr); + fMeta.fSize = size; + return true; + } catch (bipc::interprocess_exception& e) { + LOG(info) << "could not set used size: " << e.what(); + return false; + } + } else { + LOG(error) << "cannot set used size higher than original."; + return false; + } +} + +void Message::Copy(const fair::mq::Message& msg) +{ + if (fMeta.fHandle < 0) { + bipc::managed_shared_memory::handle_t otherHandle = static_cast(msg).fMeta.fHandle; + if (otherHandle) { + if (InitializeChunk(msg.GetSize())) { + std::memcpy(GetData(), msg.GetData(), msg.GetSize()); + } + } else { + LOG(error) << "copy fail: source message not initialized!"; + } + } else { + LOG(error) << "copy fail: target message already initialized!"; + } +} + +void Message::CloseMessage() +{ + if (fMeta.fHandle >= 0 && !fQueued) { + if (fMeta.fRegionId == 0) { + fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fMeta.fHandle)); + fMeta.fHandle = -1; + } else { + if (!fRegionPtr) { + fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId); + } + + if (fRegionPtr) { + fRegionPtr->ReleaseBlock({fMeta.fHandle, fMeta.fSize, fMeta.fHint}); + } else { + LOG(warn) << "region ack queue for id " << fMeta.fRegionId << " no longer exist. Not sending ack"; + } + } + } +} + +} +} +} diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h new file mode 100644 index 00000000..d7b79498 --- /dev/null +++ b/fairmq/shmem/Message.h @@ -0,0 +1,79 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIR_MQ_SHMEM_MESSAGE_H_ +#define FAIR_MQ_SHMEM_MESSAGE_H_ + +#include "Common.h" +#include "Manager.h" + +#include +#include + +#include + +#include // size_t +#include + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +class Socket; + +class Message final : public fair::mq::Message +{ + friend class Socket; + + public: + Message(Manager& manager, FairMQTransportFactory* factory = nullptr); + Message(Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr); + Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr); + Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr); + + Message(Manager& manager, MetaHeader& hdr, FairMQTransportFactory* factory = nullptr); + + Message(const Message&) = delete; + Message operator=(const Message&) = delete; + + void Rebuild() override; + void Rebuild(const size_t size) override; + void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; + + void* GetData() const override; + size_t GetSize() const override { return fMeta.fSize; } + + bool SetUsedSize(const size_t size) override; + + Transport GetType() const override { return fTransportType; } + + void Copy(const fair::mq::Message& msg) override; + + ~Message() override { CloseMessage(); } + + private: + Manager& fManager; + bool fQueued; + MetaHeader fMeta; + mutable Region* fRegionPtr; + mutable char* fLocalPtr; + + static std::atomic fInterrupted; + static Transport fTransportType; + + bool InitializeChunk(const size_t size); + void CloseMessage(); +}; + +} +} +} + +#endif /* FAIR_MQ_SHMEM_MESSAGE_H_ */ diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 1283ab72..8b63c7c5 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -6,8 +6,9 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include +#include "Monitor.h" +#include "Common.h" + #include #include @@ -275,7 +276,7 @@ void Monitor::CheckSegment() unsigned int numDevices = 0; if (fInteractive) { - fair::mq::shmem::DeviceCounter* dc = managementSegment.find(bipc::unique_instance).first; + DeviceCounter* dc = managementSegment.find(bipc::unique_instance).first; if (dc) { numDevices = dc->fCount; } diff --git a/fairmq/shmem/FairMQPollerSHM.cxx b/fairmq/shmem/Poller.cxx similarity index 70% rename from fairmq/shmem/FairMQPollerSHM.cxx rename to fairmq/shmem/Poller.cxx index c314dc4d..a70bf2eb 100644 --- a/fairmq/shmem/FairMQPollerSHM.cxx +++ b/fairmq/shmem/Poller.cxx @@ -6,21 +6,29 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** - * FairMQPollerSHM.cxx + * Poller.cxx * * @since 2014-01-23 * @author A. Rybalchenko */ -#include "FairMQPollerSHM.h" -#include "FairMQSocketSHM.h" -#include "FairMQLogger.h" +#include "Poller.h" +#include "Socket.h" + +#include #include using namespace std; -FairMQPollerSHM::FairMQPollerSHM(const vector& channels) +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +Poller::Poller(const vector& channels) : fItems() , fNumItems(0) , fOffsetMap() @@ -30,19 +38,19 @@ FairMQPollerSHM::FairMQPollerSHM(const vector& channels) for (int i = 0; i < fNumItems; ++i) { - fItems[i].socket = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); + fItems[i].socket = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); fItems[i].fd = 0; fItems[i].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); SetItemEvents(fItems[i], type); } } -FairMQPollerSHM::FairMQPollerSHM(const vector& channels) +Poller::Poller(const vector& channels) : fItems() , fNumItems(0) , fOffsetMap() @@ -52,19 +60,19 @@ FairMQPollerSHM::FairMQPollerSHM(const vector& channels) for (int i = 0; i < fNumItems; ++i) { - fItems[i].socket = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); + fItems[i].socket = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); fItems[i].fd = 0; fItems[i].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); SetItemEvents(fItems[i], type); } } -FairMQPollerSHM::FairMQPollerSHM(const unordered_map>& channelsMap, const vector& channelList) +Poller::Poller(const unordered_map>& channelsMap, const vector& channelList) : fItems() , fNumItems(0) , fOffsetMap() @@ -89,19 +97,19 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); + fItems[index].socket = static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); fItems[index].fd = 0; fItems[index].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); SetItemEvents(fItems[index], type); } } } - catch (const std::out_of_range& oor) + catch (const out_of_range& oor) { LOG(error) << "at least one of the provided channel keys for poller initialization is invalid"; LOG(error) << "out of range error: " << oor.what() << '\n'; @@ -109,7 +117,7 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map + +#include +#include #include #include -#include - -#include "FairMQPoller.h" -#include "FairMQChannel.h" -#include "FairMQTransportFactorySHM.h" - class FairMQChannel; -class FairMQPollerSHM final : public FairMQPoller +namespace fair +{ +namespace mq +{ +namespace shmem { - friend class FairMQChannel; - friend class FairMQTransportFactorySHM; +class Poller final : public fair::mq::Poller +{ public: - FairMQPollerSHM(const std::vector& channels); - FairMQPollerSHM(const std::vector& channels); - FairMQPollerSHM(const std::unordered_map>& channelsMap, const std::vector& channelList); + Poller(const std::vector& channels); + Poller(const std::vector& channels); + Poller(const std::unordered_map>& channelsMap, const std::vector& channelList); - FairMQPollerSHM(const FairMQPollerSHM&) = delete; - FairMQPollerSHM operator=(const FairMQPollerSHM&) = delete; + Poller(const Poller&) = delete; + Poller operator=(const Poller&) = delete; void SetItemEvents(zmq_pollitem_t& item, const int type); @@ -40,7 +43,7 @@ class FairMQPollerSHM final : public FairMQPoller bool CheckInput(const std::string& channelKey, const int index) override; bool CheckOutput(const std::string& channelKey, const int index) override; - ~FairMQPollerSHM() override; + ~Poller() override; private: zmq_pollitem_t* fItems; @@ -49,4 +52,8 @@ class FairMQPollerSHM final : public FairMQPoller std::unordered_map fOffsetMap; }; -#endif /* FAIRMQPOLLERSHM_H_ */ +} +} +} + +#endif /* FAIR_MQ_SHMEM_POLLER_H_ */ diff --git a/fairmq/shmem/Region.cxx b/fairmq/shmem/Region.cxx index 395dcf5e..cd046dc1 100644 --- a/fairmq/shmem/Region.cxx +++ b/fairmq/shmem/Region.cxx @@ -6,17 +6,18 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include -#include +#include "Region.h" +#include "Common.h" +#include "Manager.h" + #include #include #include #include #include -#include +#include #include using namespace std; @@ -31,7 +32,7 @@ namespace mq namespace shmem { -Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */) +Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, RegionCallback callback, const string& path /* = "" */, int flags /* = 0 */) : fManager(manager) , fRemote(remote) , fStop(false) diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 924b40fe..ea2f83dd 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -6,7 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** -* FairMQShmManager.h +* Region.h * * @since 2016-04-08 * @author A. Rybalchenko @@ -15,10 +15,10 @@ #ifndef FAIR_MQ_SHMEM_REGION_H_ #define FAIR_MQ_SHMEM_REGION_H_ -#include "FairMQLogger.h" -#include "FairMQUnmanagedRegion.h" +#include "Common.h" -#include +#include +#include #include #include @@ -40,7 +40,7 @@ class Manager; struct Region { - Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0); + Region(Manager& manager, uint64_t id, uint64_t size, bool remote, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0); Region() = delete; @@ -75,7 +75,7 @@ struct Region std::thread fReceiveAcksWorker; std::thread fSendAcksWorker; - FairMQRegionCallback fCallback; + RegionCallback fCallback; }; } // namespace shmem diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/Socket.cxx similarity index 69% rename from fairmq/shmem/FairMQSocketSHM.cxx rename to fairmq/shmem/Socket.cxx index 6b9fec75..18eb8760 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/Socket.cxx @@ -5,12 +5,13 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "Common.h" -#include "FairMQSocketSHM.h" -#include "FairMQMessageSHM.h" -#include "FairMQUnmanagedRegionSHM.h" -#include "FairMQLogger.h" +#include "Common.h" +#include "Socket.h" +#include "Message.h" +#include "UnmanagedRegion.h" + +#include #include #include @@ -18,13 +19,31 @@ #include using namespace std; -using namespace fair::mq::shmem; -using namespace fair::mq; -atomic FairMQSocketSHM::fInterrupted(false); +namespace fair +{ +namespace mq +{ +namespace shmem +{ -FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context, FairMQTransportFactory* fac /*=nullptr*/) - : FairMQSocket{fac} +atomic Socket::fInterrupted(false); + +struct ZMsg +{ + ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); } + explicit ZMsg(size_t size) { int rc __attribute__((unused)) = zmq_msg_init_size(&fMsg, size); assert(rc == 0); } + ~ZMsg() { int rc __attribute__((unused)) = zmq_msg_close(&fMsg); assert(rc == 0); } + + void* Data() { return zmq_msg_data(&fMsg); } + size_t Size() { return zmq_msg_size(&fMsg); } + zmq_msg_t* Msg() { return &fMsg; } + + zmq_msg_t fMsg; +}; + +Socket::Socket(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context, FairMQTransportFactory* fac /*=nullptr*/) + : fair::mq::Socket{fac} , fSocket(nullptr) , fManager(manager) , fId(id + "." + name + "." + type) @@ -40,7 +59,7 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str if (fSocket == nullptr) { LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); - exit(EXIT_FAILURE); + throw SocketError(tools::ToString("Failed creating socket ", fId, ", reason: ", zmq_strerror(errno))); } if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) { @@ -72,16 +91,14 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str if (type == "sub" || type == "pub") { LOG(error) << "PUB/SUB socket type is not supported for shared memory transport"; - throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport"); + throw SocketError("PUB/SUB socket type is not supported for shared memory transport"); } - LOG(debug) << "Created socket " << GetId(); } -bool FairMQSocketSHM::Bind(const string& address) +bool Socket::Bind(const string& address) { // LOG(info) << "binding socket " << fId << " on " << address; - if (zmq_bind(fSocket, address.c_str()) != 0) { if (errno == EADDRINUSE) { // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. @@ -93,19 +110,17 @@ bool FairMQSocketSHM::Bind(const string& address) return true; } -bool FairMQSocketSHM::Connect(const string& address) +bool Socket::Connect(const string& address) { // LOG(info) << "connecting socket " << fId << " on " << address; - if (zmq_connect(fSocket, address.c_str()) != 0) { LOG(error) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); return false; } - return true; } -int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) +int Socket::Send(MessagePtr& msg, const int timeout) { int flags = 0; if (timeout == 0) { @@ -113,16 +128,17 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) } int elapsed = 0; + Message* shmMsg = static_cast(msg.get()); + ZMsg zmqMsg(sizeof(MetaHeader)); + std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader)); + while (true && !fInterrupted) { - int nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); - if (nbytes == 0) { + int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); + if (nbytes > 0) { + shmMsg->fQueued = true; ++fMessagesTx; - return nbytes; - } else if (nbytes > 0) { - static_cast(msg.get())->fQueued = true; size_t size = msg->GetSize(); fBytesTx += size; - ++fMessagesTx; return size; } else if (zmq_errno() == EAGAIN) { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { @@ -140,7 +156,7 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) LOG(info) << "terminating socket " << fId; return -1; } else { - LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; return nbytes; } } @@ -148,7 +164,7 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) return -1; } -int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) +int Socket::Receive(MessagePtr& msg, const int timeout) { int flags = 0; if (timeout == 0) { @@ -156,28 +172,18 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) } int elapsed = 0; + ZMsg zmqMsg; + while (true) { - FairMQMessageSHM* shmMsg = static_cast(msg.get()); - zmq_msg_t* zmqMsg = shmMsg->GetMessage(); - int nbytes = zmq_msg_recv(zmqMsg, fSocket, flags); - if (nbytes == 0) { - ++fMessagesRx; - return nbytes; - } else if (nbytes > 0) { + Message* shmMsg = static_cast(msg.get()); + int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags); + if (nbytes > 0) { // check for number of received messages. must be 1 - const auto numMsgs = nbytes / sizeof(MetaHeader); - if (numMsgs > 1) { - LOG(error) << "Receiving SHM multipart with a single message receive call"; - } + assert((nbytes / sizeof(MetaHeader)) == 1); - assert(numMsgs == 1); - - MetaHeader* hdr = static_cast(zmq_msg_data(zmqMsg)); + MetaHeader* hdr = static_cast(zmqMsg.Data()); size_t size = hdr->fSize; - shmMsg->fHandle = hdr->fHandle; - shmMsg->fSize = size; - shmMsg->fRegionId = hdr->fRegionId; - shmMsg->fHint = hdr->fHint; + shmMsg->fMeta = *hdr; fBytesRx += size; ++fMessagesRx; @@ -198,95 +204,74 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) LOG(info) << "terminating socket " << fId; return -1; } else { - LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; return nbytes; } } } -int64_t FairMQSocketSHM::Send(vector& msgVec, const int timeout) +int64_t Socket::Send(vector& msgVec, const int timeout) { int flags = 0; if (timeout == 0) { flags = ZMQ_DONTWAIT; } - const unsigned int vecSize = msgVec.size(); int elapsed = 0; // put it into zmq message - zmq_msg_t zmqMsg; - zmq_msg_init_size(&zmqMsg, vecSize * sizeof(MetaHeader)); + const unsigned int vecSize = msgVec.size(); + ZMsg zmqMsg(vecSize * sizeof(MetaHeader)); // prepare the message with shm metas - MetaHeader* metas = static_cast(zmq_msg_data(&zmqMsg)); + MetaHeader* metas = static_cast(zmqMsg.Data()); for (auto& msg : msgVec) { - zmq_msg_t* metaMsg = static_cast(msg.get())->GetMessage(); - if (zmq_msg_size(metaMsg) > 0) { - memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader)); - } else { - // if the message is empty, create meta data to reflect this - // (always creating meta data for empty messages would add an unnecessary allocation for the receive case, so we do it lazily here) - MetaHeader hdr; - hdr.fSize = 0; - hdr.fHandle = -1; - hdr.fRegionId = 0; - hdr.fHint = 0; - memcpy(metas++, &hdr, sizeof(MetaHeader)); - } + Message* shmMsg = static_cast(msg.get()); + std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader)); } while (!fInterrupted) { int64_t totalSize = 0; - int nbytes = zmq_msg_send(&zmqMsg, fSocket, flags); - if (nbytes == 0) { - zmq_msg_close(&zmqMsg); - return nbytes; - } else if (nbytes > 0) { + int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); + if (nbytes > 0) { assert(static_cast(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing for (auto& msg : msgVec) { - FairMQMessageSHM* shmMsg = static_cast(msg.get()); + Message* shmMsg = static_cast(msg.get()); shmMsg->fQueued = true; - totalSize += shmMsg->fSize; + totalSize += shmMsg->fMeta.fSize; } // store statistics on how many messages have been sent fMessagesTx++; fBytesTx += totalSize; - zmq_msg_close(&zmqMsg); return totalSize; } else if (zmq_errno() == EAGAIN) { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) { - zmq_msg_close(&zmqMsg); return -2; } } continue; } else { - zmq_msg_close(&zmqMsg); return -2; } } else if (zmq_errno() == ETERM) { - zmq_msg_close(&zmqMsg); LOG(info) << "terminating socket " << fId; return -1; } else { - zmq_msg_close(&zmqMsg); - LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; return nbytes; } } - zmq_msg_close(&zmqMsg); return -1; } -int64_t FairMQSocketSHM::Receive(vector& msgVec, const int timeout) +int64_t Socket::Receive(vector& msgVec, const int timeout) { int flags = 0; if (timeout == 0) { @@ -294,18 +279,14 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int tim } int elapsed = 0; - zmq_msg_t zmqMsg; - zmq_msg_init(&zmqMsg); + ZMsg zmqMsg; while (!fInterrupted) { int64_t totalSize = 0; - int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags); - if (nbytes == 0) { - zmq_msg_close(&zmqMsg); - return 0; - } else if (nbytes > 0) { - MetaHeader* hdrVec = static_cast(zmq_msg_data(&zmqMsg)); - const auto hdrVecSize = zmq_msg_size(&zmqMsg); + int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags); + if (nbytes > 0) { + MetaHeader* hdrVec = static_cast(zmqMsg.Data()); + const auto hdrVecSize = zmqMsg.Size(); assert(hdrVecSize > 0); assert(hdrVecSize % sizeof(MetaHeader) == 0); @@ -314,12 +295,9 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int tim msgVec.reserve(numMessages); for (size_t m = 0; m < numMessages; m++) { - // get the meta data pointer - MetaHeader* hdr = &hdrVec[m]; - // create new message (part) - msgVec.emplace_back(tools::make_unique(fManager, hdr, GetTransport())); - FairMQMessageSHM* shmMsg = static_cast(msgVec.back().get()); + msgVec.emplace_back(tools::make_unique(fManager, hdrVec[m], GetTransport())); + Message* shmMsg = static_cast(msgVec.back().get()); totalSize += shmMsg->GetSize(); } @@ -327,34 +305,29 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int tim fMessagesRx++; fBytesRx += totalSize; - zmq_msg_close(&zmqMsg); return totalSize; } else if (zmq_errno() == EAGAIN) { if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) { - zmq_msg_close(&zmqMsg); return -2; } } continue; } else { - zmq_msg_close(&zmqMsg); return -2; } } else { - zmq_msg_close(&zmqMsg); - LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); + LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes; return nbytes; } } - zmq_msg_close(&zmqMsg); return -1; } -void FairMQSocketSHM::Close() +void Socket::Close() { // LOG(debug) << "Closing socket " << fId; @@ -369,42 +342,42 @@ void FairMQSocketSHM::Close() fSocket = nullptr; } -void FairMQSocketSHM::Interrupt() +void Socket::Interrupt() { Manager::Interrupt(); - FairMQMessageSHM::fInterrupted = true; + Message::fInterrupted = true; fInterrupted = true; } -void FairMQSocketSHM::Resume() +void Socket::Resume() { Manager::Resume(); - FairMQMessageSHM::fInterrupted = false; + Message::fInterrupted = false; fInterrupted = false; } -void FairMQSocketSHM::SetOption(const string& option, const void* value, size_t valueSize) +void Socket::SetOption(const string& option, const void* value, size_t valueSize) { if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno); } } -void FairMQSocketSHM::GetOption(const string& option, void* value, size_t* valueSize) +void Socket::GetOption(const string& option, void* value, size_t* valueSize) { if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno); } } -void FairMQSocketSHM::SetLinger(const int value) +void Socket::SetLinger(const int value) { if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) { throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno))); } } -int FairMQSocketSHM::GetLinger() const +int Socket::GetLinger() const { int value = 0; size_t valueSize = sizeof(value); @@ -414,14 +387,14 @@ int FairMQSocketSHM::GetLinger() const return value; } -void FairMQSocketSHM::SetSndBufSize(const int value) +void Socket::SetSndBufSize(const int value) { if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) { throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); } } -int FairMQSocketSHM::GetSndBufSize() const +int Socket::GetSndBufSize() const { int value = 0; size_t valueSize = sizeof(value); @@ -431,14 +404,14 @@ int FairMQSocketSHM::GetSndBufSize() const return value; } -void FairMQSocketSHM::SetRcvBufSize(const int value) +void Socket::SetRcvBufSize(const int value) { if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) { throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); } } -int FairMQSocketSHM::GetRcvBufSize() const +int Socket::GetRcvBufSize() const { int value = 0; size_t valueSize = sizeof(value); @@ -448,14 +421,14 @@ int FairMQSocketSHM::GetRcvBufSize() const return value; } -void FairMQSocketSHM::SetSndKernelSize(const int value) +void Socket::SetSndKernelSize(const int value) { if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) { throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); } } -int FairMQSocketSHM::GetSndKernelSize() const +int Socket::GetSndKernelSize() const { int value = 0; size_t valueSize = sizeof(value); @@ -465,14 +438,14 @@ int FairMQSocketSHM::GetSndKernelSize() const return value; } -void FairMQSocketSHM::SetRcvKernelSize(const int value) +void Socket::SetRcvKernelSize(const int value) { if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) { throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); } } -int FairMQSocketSHM::GetRcvKernelSize() const +int Socket::GetRcvKernelSize() const { int value = 0; size_t valueSize = sizeof(value); @@ -482,7 +455,7 @@ int FairMQSocketSHM::GetRcvKernelSize() const return value; } -int FairMQSocketSHM::GetConstant(const string& constant) +int Socket::GetConstant(const string& constant) { if (constant == "") return 0; if (constant == "sub") return ZMQ_SUB; @@ -510,3 +483,7 @@ int FairMQSocketSHM::GetConstant(const string& constant) return -1; } + +} +} +} diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/Socket.h similarity index 68% rename from fairmq/shmem/FairMQSocketSHM.h rename to fairmq/shmem/Socket.h index df9d7c55..76940f6d 100644 --- a/fairmq/shmem/FairMQSocketSHM.h +++ b/fairmq/shmem/Socket.h @@ -5,34 +5,42 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIRMQSOCKETSHM_H_ -#define FAIRMQSOCKETSHM_H_ +#ifndef FAIR_MQ_SHMEM_SOCKET_H_ +#define FAIR_MQ_SHMEM_SOCKET_H_ -#include "FairMQSocket.h" -#include "FairMQMessage.h" +#include "Manager.h" -#include +#include +#include #include #include // unique_ptr + class FairMQTransportFactory; -class FairMQSocketSHM final : public FairMQSocket +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +class Socket final : public fair::mq::Socket { public: - FairMQSocketSHM(fair::mq::shmem::Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* fac = nullptr); - FairMQSocketSHM(const FairMQSocketSHM&) = delete; - FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete; + Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* fac = nullptr); + Socket(const Socket&) = delete; + Socket operator=(const Socket&) = delete; - std::string GetId() override { return fId; } + std::string GetId() const override { return fId; } bool Bind(const std::string& address) override; bool Connect(const std::string& address) override; - int Send(FairMQMessagePtr& msg, const int timeout = -1) override; - int Receive(FairMQMessagePtr& msg, const int timeout = -1) override; - int64_t Send(std::vector>& msgVec, const int timeout = -1) override; - int64_t Receive(std::vector>& msgVec, const int timeout = -1) override; + int Send(MessagePtr& msg, const int timeout = -1) override; + int Receive(MessagePtr& msg, const int timeout = -1) override; + int64_t Send(std::vector& msgVec, const int timeout = -1) override; + int64_t Receive(std::vector& msgVec, const int timeout = -1) override; void* GetSocket() const { return fSocket; } @@ -62,11 +70,11 @@ class FairMQSocketSHM final : public FairMQSocket static int GetConstant(const std::string& constant); - ~FairMQSocketSHM() override { Close(); } + ~Socket() override { Close(); } private: void* fSocket; - fair::mq::shmem::Manager& fManager; + Manager& fManager; std::string fId; std::atomic fBytesTx; std::atomic fBytesRx; @@ -79,4 +87,8 @@ class FairMQSocketSHM final : public FairMQSocket int fRcvTimeout; }; -#endif /* FAIRMQSOCKETSHM_H_ */ +} +} +} + +#endif /* FAIR_MQ_SHMEM_SOCKET_H_ */ diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/TransportFactory.cxx similarity index 59% rename from fairmq/shmem/FairMQTransportFactorySHM.cxx rename to fairmq/shmem/TransportFactory.cxx index 6146dae9..13129e98 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/TransportFactory.cxx @@ -6,9 +6,9 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "FairMQLogger.h" -#include "FairMQTransportFactorySHM.h" +#include "TransportFactory.h" +#include #include #include @@ -26,15 +26,21 @@ #include // getenv using namespace std; -using namespace fair::mq::shmem; namespace bpt = ::boost::posix_time; namespace bipc = ::boost::interprocess; -fair::mq::Transport FairMQTransportFactorySHM::fTransportType = fair::mq::Transport::SHM; +namespace fair +{ +namespace mq +{ +namespace shmem +{ -FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const fair::mq::ProgOptions* config) - : FairMQTransportFactory(id) +Transport TransportFactory::fTransportType = Transport::SHM; + +TransportFactory::TransportFactory(const string& id, const ProgOptions* config) + : fair::mq::TransportFactory(id) , fDeviceId(id) , fShmId() , fZMQContext(nullptr) @@ -49,7 +55,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const fai fZMQContext = zmq_ctx_new(); if (!fZMQContext) { - throw runtime_error(fair::mq::tools::ToString("failed creating context, reason: ", zmq_strerror(errno))); + throw runtime_error(tools::ToString("failed creating context, reason: ", zmq_strerror(errno))); } int numIoThreads = 1; @@ -62,7 +68,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const fai segmentSize = config->GetProperty("shm-segment-size", segmentSize); autolaunchMonitor = config->GetProperty("shm-monitor", autolaunchMonitor); } else { - LOG(debug) << "fair::mq::ProgOptions not available! Using defaults."; + LOG(debug) << "ProgOptions not available! Using defaults."; } fShmId = buildShmIdFromSessionIdAndUserId(sessionName); @@ -81,18 +87,18 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const fai Manager::StartMonitor(fShmId); } - fManager = fair::mq::tools::make_unique(fShmId, segmentSize); + fManager = tools::make_unique(fShmId, segmentSize); } catch (bipc::interprocess_exception& e) { LOG(error) << "Could not initialize shared memory transport: " << e.what(); - throw runtime_error(fair::mq::tools::ToString("Could not initialize shared memory transport: ", e.what())); + throw runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what())); } fSendHeartbeats = true; - fHeartbeatThread = thread(&FairMQTransportFactorySHM::SendHeartbeats, this); + fHeartbeatThread = thread(&TransportFactory::SendHeartbeats, this); } -void FairMQTransportFactorySHM::SendHeartbeats() +void TransportFactory::SendHeartbeats() { string controlQueueName("fmq_" + fShmId + "_cq"); while (fSendHeartbeats) { @@ -111,58 +117,58 @@ void FairMQTransportFactorySHM::SendHeartbeats() } } -FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage() +MessagePtr TransportFactory::CreateMessage() { - return unique_ptr(new FairMQMessageSHM(*fManager, this)); + return tools::make_unique(*fManager, this); } -FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(const size_t size) +MessagePtr TransportFactory::CreateMessage(const size_t size) { - return unique_ptr(new FairMQMessageSHM(*fManager, size, this)); + return tools::make_unique(*fManager, size, this); } -FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) +MessagePtr TransportFactory::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) { - return unique_ptr(new FairMQMessageSHM(*fManager, data, size, ffn, hint, this)); + return tools::make_unique(*fManager, data, size, ffn, hint, this); } -FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) +MessagePtr TransportFactory::CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint) { - return unique_ptr(new FairMQMessageSHM(*fManager, region, data, size, hint, this)); + return tools::make_unique(*fManager, region, data, size, hint, this); } -FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name) +SocketPtr TransportFactory::CreateSocket(const string& type, const string& name) { assert(fZMQContext); - return unique_ptr(new FairMQSocketSHM(*fManager, type, name, GetId(), fZMQContext, this)); + return tools::make_unique(*fManager, type, name, GetId(), fZMQContext, this); } -FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector& channels) const +PollerPtr TransportFactory::CreatePoller(const vector& channels) const { - return unique_ptr(new FairMQPollerSHM(channels)); + return tools::make_unique(channels); } -FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector& channels) const +PollerPtr TransportFactory::CreatePoller(const vector& channels) const { - return unique_ptr(new FairMQPollerSHM(channels)); + return tools::make_unique(channels); } -FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const +PollerPtr TransportFactory::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const { - return unique_ptr(new FairMQPollerSHM(channelsMap, channelList)); + return tools::make_unique(channelsMap, channelList); } -FairMQUnmanagedRegionPtr FairMQTransportFactorySHM::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const +UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const { - return unique_ptr(new FairMQUnmanagedRegionSHM(*fManager, size, callback, path, flags)); + return tools::make_unique(*fManager, size, callback, path, flags); } -fair::mq::Transport FairMQTransportFactorySHM::GetType() const +Transport TransportFactory::GetType() const { return fTransportType; } -FairMQTransportFactorySHM::~FairMQTransportFactorySHM() +TransportFactory::~TransportFactory() { LOG(debug) << "Destroying Shared Memory transport..."; fSendHeartbeats = false; @@ -181,3 +187,7 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM() LOG(error) << "context not available for shutdown"; } } + +} // namespace shmem +} // namespace mq +} // namespace fair diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h new file mode 100644 index 00000000..6e20e905 --- /dev/null +++ b/fairmq/shmem/TransportFactory.h @@ -0,0 +1,78 @@ +/******************************************************************************** + * Copyright (C) 2016-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_ +#define FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_ + +#include "Manager.h" +#include "Common.h" +#include "Message.h" +#include "Socket.h" +#include "Poller.h" +#include "UnmanagedRegion.h" + +#include +#include + +#include +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +class TransportFactory final : public fair::mq::TransportFactory +{ + public: + TransportFactory(const std::string& id = "", const ProgOptions* config = nullptr); + TransportFactory(const TransportFactory&) = delete; + TransportFactory operator=(const TransportFactory&) = delete; + + MessagePtr CreateMessage() override; + MessagePtr CreateMessage(const size_t size) override; + MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; + MessagePtr CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override; + + SocketPtr CreateSocket(const std::string& type, const std::string& name) override; + + PollerPtr CreatePoller(const std::vector& channels) const override; + PollerPtr CreatePoller(const std::vector& channels) const override; + PollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; + + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; + + Transport GetType() const override; + + void Interrupt() override { Socket::Interrupt(); } + void Resume() override { Socket::Resume(); } + void Reset() override {} + + ~TransportFactory() override; + + private: + void SendHeartbeats(); + + static Transport fTransportType; + std::string fDeviceId; + std::string fShmId; + void* fZMQContext; + std::unique_ptr fManager; + std::thread fHeartbeatThread; + std::atomic fSendHeartbeats; +}; + +} // namespace shmem +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_ */ diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx b/fairmq/shmem/UnmanagedRegion.cxx similarity index 87% rename from fairmq/shmem/FairMQUnmanagedRegionSHM.cxx rename to fairmq/shmem/UnmanagedRegion.cxx index 1a45147b..88a7df8d 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx +++ b/fairmq/shmem/UnmanagedRegion.cxx @@ -7,15 +7,20 @@ ********************************************************************************/ #include "Common.h" - -#include "FairMQUnmanagedRegionSHM.h" +#include "UnmanagedRegion.h" using namespace std; -using namespace fair::mq::shmem; namespace bipc = ::boost::interprocess; -FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +UnmanagedRegion::UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) : fManager(manager) , fRegion(nullptr) , fRegionId(0) @@ -41,3 +46,7 @@ FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_ throw; } } + +} +} +} diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.h b/fairmq/shmem/UnmanagedRegion.h similarity index 60% rename from fairmq/shmem/FairMQUnmanagedRegionSHM.h rename to fairmq/shmem/UnmanagedRegion.h index 8dea7523..cb583373 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -6,13 +6,13 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIRMQUNMANAGEDREGIONSHM_H_ -#define FAIRMQUNMANAGEDREGIONSHM_H_ +#ifndef FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ +#define FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ -#include +#include "Manager.h" -#include "FairMQUnmanagedRegion.h" -#include "FairMQLogger.h" +#include +#include #include #include @@ -20,23 +20,37 @@ #include // size_t #include -class FairMQUnmanagedRegionSHM final : public FairMQUnmanagedRegion +namespace fair { - friend class FairMQSocketSHM; - friend class FairMQMessageSHM; +namespace mq +{ +namespace shmem +{ + +class Message; +class Socket; + +class UnmanagedRegion final : public fair::mq::UnmanagedRegion +{ + friend class Message; + friend class Socket; public: - FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0); + UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0); void* GetData() const override { return fRegion->get_address(); } size_t GetSize() const override { return fRegion->get_size(); } - ~FairMQUnmanagedRegionSHM() override { fManager.RemoveRegion(fRegionId); } + ~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); } private: - fair::mq::shmem::Manager& fManager; + Manager& fManager; boost::interprocess::mapped_region* fRegion; uint64_t fRegionId; }; -#endif /* FAIRMQUNMANAGEDREGIONSHM_H_ */ \ No newline at end of file +} +} +} + +#endif /* FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ */ \ No newline at end of file diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index e1b3b94e..f7302f81 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -5,8 +5,8 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include +#include "Monitor.h" +#include "Common.h" #include diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index e775176e..4ca54f6a 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -74,11 +74,6 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s LOG(debug) << "Created socket " << GetId(); } -string FairMQSocketZMQ::GetId() -{ - return fId; -} - bool FairMQSocketZMQ::Bind(const string& address) { // LOG(info) << "bind socket " << fId << " on " << address; diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index ba055878..ba84d2aa 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -24,7 +24,7 @@ class FairMQSocketZMQ final : public FairMQSocket FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; - std::string GetId() override; + std::string GetId() const override { return fId; } bool Bind(const std::string& address) override; bool Connect(const std::string& address) override;