mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
shm: handle shrink failure gracefully
This commit is contained in:
parent
72f319e276
commit
0520ee8acd
|
@ -211,11 +211,31 @@ class Message final : public fair::mq::Message
|
||||||
return true;
|
return true;
|
||||||
} else if (newSize <= fMeta.fSize) {
|
} else if (newSize <= fMeta.fSize) {
|
||||||
try {
|
try {
|
||||||
fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId);
|
try {
|
||||||
fMeta.fSize = newSize;
|
fLocalPtr = fManager.ShrinkInPlace(newSize, fLocalPtr, fMeta.fSegmentId);
|
||||||
return true;
|
fMeta.fSize = newSize;
|
||||||
|
return true;
|
||||||
|
} catch (boost::interprocess::bad_alloc& e) {
|
||||||
|
// if shrinking fails (can happen due to boost alignment requirements):
|
||||||
|
// unused size >= 1000000 bytes: reallocate fully
|
||||||
|
// unused size < 1000000 bytes: simply reset the size and keep the rest of the buffer until message destruction
|
||||||
|
if (fMeta.fSize - newSize >= 1000000) {
|
||||||
|
char* newPtr = fManager.Allocate(newSize, fAlignment);
|
||||||
|
if (newPtr) {
|
||||||
|
std::memcpy(newPtr, fLocalPtr, newSize);
|
||||||
|
fManager.Deallocate(fMeta.fHandle, fMeta.fSegmentId);
|
||||||
|
fLocalPtr = newPtr;
|
||||||
|
fMeta.fHandle = fManager.GetHandleFromAddress(fLocalPtr, fMeta.fSegmentId);
|
||||||
|
} else {
|
||||||
|
LOG(debug) << "could not set used size: " << e.what();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fMeta.fSize = newSize;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
} catch (boost::interprocess::interprocess_exception& e) {
|
} catch (boost::interprocess::interprocess_exception& e) {
|
||||||
LOG(info) << "could not set used size: " << e.what();
|
LOG(debug) << "could not set used size: " << e.what();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -257,7 +277,7 @@ class Message final : public fair::mq::Message
|
||||||
Manager& fManager;
|
Manager& fManager;
|
||||||
bool fQueued;
|
bool fQueued;
|
||||||
MetaHeader fMeta;
|
MetaHeader fMeta;
|
||||||
size_t fAlignment; // TODO: put this to debug mode
|
size_t fAlignment;
|
||||||
mutable Region* fRegionPtr;
|
mutable Region* fRegionPtr;
|
||||||
mutable char* fLocalPtr;
|
mutable char* fLocalPtr;
|
||||||
|
|
||||||
|
|
|
@ -40,34 +40,34 @@ void RunPushPullWithMsgResize(const string& transport, const string& _address)
|
||||||
pull.Connect(address);
|
pull.Connect(address);
|
||||||
|
|
||||||
{
|
{
|
||||||
FairMQMessagePtr outMsg(push.NewMessage(1000));
|
FairMQMessagePtr outMsg(push.NewMessage(6));
|
||||||
ASSERT_EQ(outMsg->GetSize(), 1000);
|
ASSERT_EQ(outMsg->GetSize(), 6);
|
||||||
memcpy(outMsg->GetData(), "ABC", 3);
|
memcpy(outMsg->GetData(), "ABCDEF", 6);
|
||||||
ASSERT_EQ(outMsg->SetUsedSize(500), true);
|
ASSERT_EQ(outMsg->SetUsedSize(5), true);
|
||||||
ASSERT_EQ(outMsg->SetUsedSize(500), true);
|
ASSERT_EQ(outMsg->SetUsedSize(5), true);
|
||||||
ASSERT_EQ(outMsg->SetUsedSize(700), false);
|
ASSERT_EQ(outMsg->SetUsedSize(7), false);
|
||||||
ASSERT_EQ(outMsg->GetSize(), 500);
|
ASSERT_EQ(outMsg->GetSize(), 5);
|
||||||
// check if the data is still intact
|
// check if the data is still intact
|
||||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
|
||||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
|
||||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
|
||||||
ASSERT_EQ(outMsg->SetUsedSize(250), true);
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[3], 'D');
|
||||||
ASSERT_EQ(outMsg->GetSize(), 250);
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[4], 'E');
|
||||||
|
ASSERT_EQ(outMsg->SetUsedSize(2), true);
|
||||||
|
ASSERT_EQ(outMsg->GetSize(), 2);
|
||||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[0], 'A');
|
||||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
|
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[1], 'B');
|
||||||
ASSERT_EQ(static_cast<char*>(outMsg->GetData())[2], 'C');
|
|
||||||
FairMQMessagePtr msgCopy(push.NewMessage());
|
FairMQMessagePtr msgCopy(push.NewMessage());
|
||||||
msgCopy->Copy(*outMsg);
|
msgCopy->Copy(*outMsg);
|
||||||
ASSERT_EQ(msgCopy->GetSize(), 250);
|
ASSERT_EQ(msgCopy->GetSize(), 2);
|
||||||
|
|
||||||
ASSERT_EQ(push.Send(outMsg), 250);
|
ASSERT_EQ(push.Send(outMsg), 2);
|
||||||
|
|
||||||
FairMQMessagePtr inMsg(pull.NewMessage());
|
FairMQMessagePtr inMsg(pull.NewMessage());
|
||||||
ASSERT_EQ(pull.Receive(inMsg), 250);
|
ASSERT_EQ(pull.Receive(inMsg), 2);
|
||||||
ASSERT_EQ(inMsg->GetSize(), 250);
|
ASSERT_EQ(inMsg->GetSize(), 2);
|
||||||
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[0], 'A');
|
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[0], 'A');
|
||||||
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[1], 'B');
|
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[1], 'B');
|
||||||
ASSERT_EQ(static_cast<char*>(inMsg->GetData())[2], 'C');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue
Block a user