diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index d2dbd649..ab942b7e 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -60,6 +60,27 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, const size_t size, FairMQTr InitializeChunk(size); } +FairMQMessageSHM::FairMQMessageSHM(Manager& manager, MetaHeader* hdr, FairMQTransportFactory* factory) + : FairMQMessage{factory} + , fManager(manager) + , fMessage() + , fQueued(false) + , fMetaCreated(false) + , fRegionId(hdr->fRegionId) + , fRegionPtr(nullptr) + , fHandle(hdr->fHandle) + , fSize(hdr->fSize) + , fHint(hdr->fHint) + , fLocalPtr(nullptr) +{ + if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) { + LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); + } + // fill the zmq buffer with the delivered meta data + memcpy(zmq_msg_data(&fMessage), hdr, sizeof(MetaHeader)); + fMetaCreated = true; +} + FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory) : FairMQMessage{factory} , fManager(manager) diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h index 505fadc6..315d5240 100644 --- a/fairmq/shmem/FairMQMessageSHM.h +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -21,6 +21,16 @@ #include class FairMQSocketSHM; +namespace fair +{ +namespace mq +{ +namespace shmem +{ +class MetaHeader; +} +} +} class FairMQMessageSHM final : public FairMQMessage { @@ -32,6 +42,8 @@ class FairMQMessageSHM final : public FairMQMessage FairMQMessageSHM(fair::mq::shmem::Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr); FairMQMessageSHM(fair::mq::shmem::Manager& manager, FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr); + FairMQMessageSHM(fair::mq::shmem::Manager& manager, fair::mq::shmem::MetaHeader* hdr, FairMQTransportFactory* factory = nullptr); + FairMQMessageSHM(const FairMQMessageSHM&) = delete; FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete; diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index b6ce2476..6b9fec75 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -213,10 +213,6 @@ int64_t FairMQSocketSHM::Send(vector& msgVec, const int timeou const unsigned int vecSize = msgVec.size(); int elapsed = 0; - if (vecSize == 1) { - return Send(msgVec.back(), timeout); - } - // put it into zmq message zmq_msg_t zmqMsg; zmq_msg_init_size(&zmqMsg, vecSize * sizeof(MetaHeader)); @@ -322,16 +318,8 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int tim MetaHeader* hdr = &hdrVec[m]; // create new message (part) - msgVec.emplace_back(tools::make_unique(fManager, GetTransport())); + msgVec.emplace_back(tools::make_unique(fManager, hdr, GetTransport())); FairMQMessageSHM* shmMsg = static_cast(msgVec.back().get()); - // fill the zmq buffer with the delivered meta data - memcpy(zmq_msg_data(shmMsg->GetMessage()), hdr, sizeof(MetaHeader)); - // set the message members with the meta data - shmMsg->fHandle = hdr->fHandle; - shmMsg->fSize = hdr->fSize; - shmMsg->fRegionId = hdr->fRegionId; - shmMsg->fHint = hdr->fHint; - totalSize += shmMsg->GetSize(); }