diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index c8ef71db..f6221ae9 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -5,15 +5,16 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include - -#include +#include +#include #include "FairMQMessageSHM.h" #include "FairMQUnmanagedRegionSHM.h" #include "FairMQLogger.h" -#include "Common.h" + +#include + +#include using namespace std; using namespace fair::mq::shmem; @@ -30,6 +31,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager) , fQueued(false) , fMetaCreated(false) , fRegionId(0) + , fRegionPtr(nullptr) , fHandle(-1) , fSize(0) , fLocalPtr(nullptr) @@ -47,6 +49,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size) , fQueued(false) , fMetaCreated(false) , fRegionId(0) + , fRegionPtr(nullptr) , fHandle(-1) , fSize(0) , fLocalPtr(nullptr) @@ -60,6 +63,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t si , fQueued(false) , fMetaCreated(false) , fRegionId(0) + , fRegionPtr(nullptr) , fHandle(-1) , fSize(0) , fLocalPtr(nullptr) @@ -84,6 +88,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r , fQueued(false) , fMetaCreated(false) , fRegionId(static_cast(region.get())->fRegionId) + , fRegionPtr(nullptr) , fHandle(-1) , fSize(size) , fLocalPtr(data) @@ -209,10 +214,10 @@ void* FairMQMessageSHM::GetData() } else { - boost::interprocess::mapped_region* region = fManager.GetRemoteRegion(fRegionId); - if (region) + fRegionPtr = fManager.GetRemoteRegion(fRegionId); + if (fRegionPtr) { - fLocalPtr = reinterpret_cast(region->get_address()) + fHandle; + fLocalPtr = reinterpret_cast(fRegionPtr->fRegion.get_address()) + fHandle; } else { @@ -290,10 +295,13 @@ void FairMQMessageSHM::CloseMessage() do { auto sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200); - bipc::message_queue* q = fManager.GetRegionQueue(fRegionId); - if (q) + if (!fRegionPtr) { - if (q->timed_send(&block, sizeof(RegionBlock), 0, sndTill)) + fRegionPtr = fManager.GetRemoteRegion(fRegionId); + } + if (fRegionPtr) + { + if (fRegionPtr->fQueue->timed_send(&block, sizeof(RegionBlock), 0, sndTill)) { success = true; } diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h index be11f7fc..b742b001 100644 --- a/fairmq/shmem/FairMQMessageSHM.h +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -8,17 +8,17 @@ #ifndef FAIRMQMESSAGESHM_H_ #define FAIRMQMESSAGESHM_H_ -#include // size_t -#include -#include +#include + +#include "FairMQMessage.h" +#include "FairMQUnmanagedRegion.h" #include #include -#include "FairMQMessage.h" -#include "FairMQUnmanagedRegion.h" -#include "Manager.h" +#include // size_t +#include class FairMQMessageSHM : public FairMQMessage { @@ -61,6 +61,7 @@ class FairMQMessageSHM : public FairMQMessage static std::atomic fInterrupted; static FairMQ::Transport fTransportType; uint64_t fRegionId; + fair::mq::shmem::Region* fRegionPtr; boost::interprocess::managed_shared_memory::handle_t fHandle; size_t fSize; void* fLocalPtr; diff --git a/fairmq/shmem/FairMQPollerSHM.cxx b/fairmq/shmem/FairMQPollerSHM.cxx index fd2672fe..a138c6b4 100644 --- a/fairmq/shmem/FairMQPollerSHM.cxx +++ b/fairmq/shmem/FairMQPollerSHM.cxx @@ -12,11 +12,11 @@ * @author A. Rybalchenko */ -#include - #include "FairMQPollerSHM.h" #include "FairMQLogger.h" +#include + using namespace std; FairMQPollerSHM::FairMQPollerSHM(const vector& channels) diff --git a/fairmq/shmem/FairMQPollerSHM.h b/fairmq/shmem/FairMQPollerSHM.h index 69e34a67..1d3f9526 100644 --- a/fairmq/shmem/FairMQPollerSHM.h +++ b/fairmq/shmem/FairMQPollerSHM.h @@ -10,7 +10,6 @@ #include #include -#include #include diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index 8b2230e4..dc5062e9 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -5,15 +5,14 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include - -#include +#include #include "FairMQSocketSHM.h" #include "FairMQMessageSHM.h" #include "FairMQUnmanagedRegionSHM.h" #include "FairMQLogger.h" -#include "Common.h" + +#include using namespace std; using namespace fair::mq::shmem; diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/FairMQSocketSHM.h index 10ed4737..876ca171 100644 --- a/fairmq/shmem/FairMQSocketSHM.h +++ b/fairmq/shmem/FairMQSocketSHM.h @@ -8,13 +8,13 @@ #ifndef FAIRMQSOCKETSHM_H_ #define FAIRMQSOCKETSHM_H_ -#include - -#include // unique_ptr - #include "FairMQSocket.h" #include "FairMQMessage.h" -#include "Manager.h" + +#include + +#include +#include // unique_ptr class FairMQSocketSHM : public FairMQSocket { diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 392a26a5..bbbf502a 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -9,12 +9,13 @@ #ifndef FAIRMQTRANSPORTFACTORYSHM_H_ #define FAIRMQTRANSPORTFACTORYSHM_H_ +#include +#include + #include "FairMQTransportFactory.h" -#include "Manager.h" #include "FairMQMessageSHM.h" #include "FairMQSocketSHM.h" #include "FairMQPollerSHM.h" -#include "Common.h" #include "FairMQUnmanagedRegionSHM.h" #include diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx b/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx index d6169c98..ab43ef94 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx +++ b/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx @@ -6,8 +6,9 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ +#include + #include "FairMQUnmanagedRegionSHM.h" -#include "Common.h" using namespace std; using namespace fair::mq::shmem; diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.h b/fairmq/shmem/FairMQUnmanagedRegionSHM.h index a87afd7e..89591faf 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.h +++ b/fairmq/shmem/FairMQUnmanagedRegionSHM.h @@ -9,18 +9,15 @@ #ifndef FAIRMQUNMANAGEDREGIONSHM_H_ #define FAIRMQUNMANAGEDREGIONSHM_H_ +#include + #include "FairMQUnmanagedRegion.h" #include "FairMQLogger.h" -#include "Manager.h" #include #include #include // size_t -#include -#include -#include -#include class FairMQUnmanagedRegionSHM : public FairMQUnmanagedRegion { diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx index afcc0dd1..3ba5a25d 100644 --- a/fairmq/shmem/Manager.cxx +++ b/fairmq/shmem/Manager.cxx @@ -6,8 +6,8 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ - #include "Manager.h" - #include "Common.h" +#include +#include namespace fair { @@ -71,20 +71,20 @@ bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, } } -bipc::mapped_region* Manager::GetRemoteRegion(const uint64_t id) +Region* Manager::GetRemoteRegion(const uint64_t id) { // remote region could actually be a local one if a message originates from this device (has been sent out and returned) auto it = fRegions.find(id); if (it != fRegions.end()) { - return &(it->second.fRegion); + return &(it->second); } else { try { auto r = fRegions.emplace(id, Region{*this, id, 0, true, nullptr}); - return &(r.first->second.fRegion); + return &(r.first->second); } catch (bipc::interprocess_exception& e) { @@ -100,18 +100,6 @@ void Manager::RemoveRegion(const uint64_t id) fRegions.erase(id); } -bipc::message_queue* Manager::GetRegionQueue(const uint64_t id) -{ - try - { - return fRegions.at(id).fQueue.get(); - } - catch (out_of_range& oor) - { - return nullptr; - } -} - void Manager::RemoveSegment() { if (bipc::shared_memory_object::remove(fSegmentName.c_str())) diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index c81a4c06..6349aef7 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -15,23 +15,18 @@ #ifndef FAIR_MQ_SHMEM_MANAGER_H_ #define FAIR_MQ_SHMEM_MANAGER_H_ +#include +#include +#include + #include "FairMQLogger.h" #include "FairMQMessage.h" -#include "fairmq/Tools.h" -#include "Region.h" -#include "Common.h" #include #include -#include -#include #include -#include -#include #include -#include -#include namespace fair { @@ -58,11 +53,9 @@ class Manager void Resume(); boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, FairMQRegionCallback callback); - boost::interprocess::mapped_region* GetRemoteRegion(const uint64_t id); + Region* GetRemoteRegion(const uint64_t id); void RemoveRegion(const uint64_t id); - boost::interprocess::message_queue* GetRegionQueue(const uint64_t id); - void RemoveSegment(); boost::interprocess::managed_shared_memory& ManagementSegment(); diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 9c054ecf..52e02b89 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -6,8 +6,8 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "Monitor.h" -#include "Common.h" +#include +#include #include #include diff --git a/fairmq/shmem/Region.cxx b/fairmq/shmem/Region.cxx index 4064f279..e0df1c83 100644 --- a/fairmq/shmem/Region.cxx +++ b/fairmq/shmem/Region.cxx @@ -6,14 +6,12 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "Region.h" -#include "Common.h" -#include "Manager.h" +#include +#include +#include #include -#include - namespace fair { namespace mq diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 59f20fc8..81bb7a56 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -17,13 +17,13 @@ #include "FairMQLogger.h" #include "FairMQUnmanagedRegion.h" -#include "fairmq/Tools.h" + +#include #include #include #include -#include #include namespace fair diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index 08897450..c3f92610 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -5,7 +5,7 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "Monitor.h" +#include #include