diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index 0739ff81..a59818f3 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -211,11 +211,31 @@ class Message final : public fair::mq::Message return true; } else if (newSize <= fMeta.fSize) { try { - fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId); - fMeta.fSize = newSize; - return true; + try { + fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId); + fMeta.fSize = newSize; + return true; + } catch (boost::interprocess::bad_alloc& e) { + // if shrinking fails (can happen due to boost alignment requirements): + // unused size >= 1000000 bytes: reallocate fully + // unused size < 1000000 bytes: simply reset the size and keep the rest of the buffer until message destruction + if (fMeta.fSize - newSize >= 1000000) { + char* newPtr = fManager.Allocate(newSize, fAlignment); + if (newPtr) { + std::memcpy(newPtr, fLocalPtr, newSize); + fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId); + fLocalPtr = newPtr; + fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId); + } else { + LOG(debug) << "could not set used size: " << e.what(); + return false; + } + } + fMeta.fSize = newSize; + return true; + } } catch (boost::interprocess::interprocess_exception& e) { - LOG(info) << "could not set used size: " << e.what(); + LOG(debug) << "could not set used size: " << e.what(); return false; } } else { @@ -257,7 +277,7 @@ class Message final : public fair::mq::Message Manager& fManager; bool fQueued; MetaHeader fMeta; - size_t fAlignment; // TODO: put this to debug mode + size_t fAlignment; mutable Region* fRegionPtr; mutable char* fLocalPtr; diff --git a/test/message/_message.cxx b/test/message/_message.cxx index 2eb2c5a1..73c4e53e 100644 --- a/test/message/_message.cxx +++ b/test/message/_message.cxx @@ -40,34 +40,34 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address) pull.Connect(address); { - FairMQMessagePtr outMsg(push.NewMessage(1000)); - ASSERT_EQ(outMsg->GetSize(), 1000); - memcpy(outMsg->GetData(), "ABC", 3); - ASSERT_EQ(outMsg->SetUsedSize(500), true); - ASSERT_EQ(outMsg->SetUsedSize(500), true); - ASSERT_EQ(outMsg->SetUsedSize(700), false); - ASSERT_EQ(outMsg->GetSize(), 500); + FairMQMessagePtr outMsg(push.NewMessage(6)); + ASSERT_EQ(outMsg->GetSize(), 6); + memcpy(outMsg->GetData(), "ABCDEF", 6); + ASSERT_EQ(outMsg->SetUsedSize(5), true); + ASSERT_EQ(outMsg->SetUsedSize(5), true); + ASSERT_EQ(outMsg->SetUsedSize(7), false); + ASSERT_EQ(outMsg->GetSize(), 5); // check if the data is still intact ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); ASSERT_EQ(static_cast(outMsg->GetData())[2], 'C'); - ASSERT_EQ(outMsg->SetUsedSize(250), true); - ASSERT_EQ(outMsg->GetSize(), 250); + ASSERT_EQ(static_cast(outMsg->GetData())[3], 'D'); + ASSERT_EQ(static_cast(outMsg->GetData())[4], 'E'); + ASSERT_EQ(outMsg->SetUsedSize(2), true); + ASSERT_EQ(outMsg->GetSize(), 2); ASSERT_EQ(static_cast(outMsg->GetData())[0], 'A'); ASSERT_EQ(static_cast(outMsg->GetData())[1], 'B'); - ASSERT_EQ(static_cast(outMsg->GetData())[2], 'C'); FairMQMessagePtr msgCopy(push.NewMessage()); msgCopy->Copy(*outMsg); - ASSERT_EQ(msgCopy->GetSize(), 250); + ASSERT_EQ(msgCopy->GetSize(), 2); - ASSERT_EQ(push.Send(outMsg), 250); + ASSERT_EQ(push.Send(outMsg), 2); FairMQMessagePtr inMsg(pull.NewMessage()); - ASSERT_EQ(pull.Receive(inMsg), 250); - ASSERT_EQ(inMsg->GetSize(), 250); + ASSERT_EQ(pull.Receive(inMsg), 2); + ASSERT_EQ(inMsg->GetSize(), 2); ASSERT_EQ(static_cast(inMsg->GetData())[0], 'A'); ASSERT_EQ(static_cast(inMsg->GetData())[1], 'B'); - ASSERT_EQ(static_cast(inMsg->GetData())[2], 'C'); } {