mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Batch Region ack messages
Reduce CPU utilization by batching release ack messages on the IPC queue.
This commit is contained in:
parent
9b326c7a71
commit
2ed2177555
|
@ -327,50 +327,19 @@ void FairMQMessageSHM::CloseMessage()
|
|||
}
|
||||
else
|
||||
{
|
||||
// send notification back to the receiver
|
||||
// RegionBlock block(fHandle, fSize);
|
||||
// if (fManager.GetRegionQueue(fRegionId).try_send(static_cast<void*>(&block), sizeof(RegionBlock), 0))
|
||||
// {
|
||||
// // LOG(info) << "true";
|
||||
// }
|
||||
// // else
|
||||
// // {
|
||||
// // LOG(debug) << "could not send ack";
|
||||
// // }
|
||||
|
||||
// timed version
|
||||
RegionBlock block(fHandle, fSize, fHint);
|
||||
bool success = false;
|
||||
do
|
||||
if (!fRegionPtr)
|
||||
{
|
||||
auto sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200);
|
||||
if (!fRegionPtr)
|
||||
{
|
||||
fRegionPtr = fManager.GetRemoteRegion(fRegionId);
|
||||
}
|
||||
if (fRegionPtr)
|
||||
{
|
||||
// LOG(debug) << "sending ack";
|
||||
if (fRegionPtr->fQueue->timed_send(&block, sizeof(RegionBlock), 0, sndTill))
|
||||
{
|
||||
success = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (fInterrupted)
|
||||
{
|
||||
break;
|
||||
}
|
||||
LOG(debug) << "region ack queue is full, retrying...";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack";
|
||||
success = true;
|
||||
}
|
||||
fRegionPtr = fManager.GetRemoteRegion(fRegionId);
|
||||
}
|
||||
|
||||
if (fRegionPtr)
|
||||
{
|
||||
fRegionPtr->ReleaseBlock({fHandle, fSize, fHint});
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack";
|
||||
}
|
||||
while (!success);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ namespace mq
|
|||
namespace shmem
|
||||
{
|
||||
|
||||
std::unordered_map<uint64_t, Region> Manager::fRegions;
|
||||
std::unordered_map<uint64_t, std::unique_ptr<Region>> Manager::fRegions;
|
||||
|
||||
Manager::Manager(const string& name, size_t size)
|
||||
: fSessionName(name)
|
||||
|
@ -43,7 +43,7 @@ void Manager::Resume()
|
|||
// close remote regions before processing new transfers
|
||||
for (auto it = fRegions.begin(); it != fRegions.end(); /**/)
|
||||
{
|
||||
if (it->second.fRemote)
|
||||
if (it->second->fRemote)
|
||||
{
|
||||
it = fRegions.erase(it);
|
||||
}
|
||||
|
@ -64,11 +64,11 @@ bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id,
|
|||
}
|
||||
else
|
||||
{
|
||||
auto r = fRegions.emplace(id, Region{*this, id, size, false, callback});
|
||||
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, size, false, callback));
|
||||
|
||||
r.first->second.StartReceivingAcks();
|
||||
r.first->second->StartReceivingAcks();
|
||||
|
||||
return &(r.first->second.fRegion);
|
||||
return &(r.first->second->fRegion);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,14 +78,14 @@ Region* Manager::GetRemoteRegion(const uint64_t id)
|
|||
auto it = fRegions.find(id);
|
||||
if (it != fRegions.end())
|
||||
{
|
||||
return &(it->second);
|
||||
return it->second.get();
|
||||
}
|
||||
else
|
||||
{
|
||||
try
|
||||
{
|
||||
auto r = fRegions.emplace(id, Region{*this, id, 0, true, nullptr});
|
||||
return &(r.first->second);
|
||||
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, 0, true, nullptr));
|
||||
return r.first->second.get();
|
||||
}
|
||||
catch (bipc::interprocess_exception& e)
|
||||
{
|
||||
|
|
|
@ -66,7 +66,7 @@ class Manager
|
|||
std::string fManagementSegmentName;
|
||||
boost::interprocess::managed_shared_memory fSegment;
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
static std::unordered_map<uint64_t, Region> fRegions;
|
||||
static std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
|
||||
};
|
||||
|
||||
} // namespace shmem
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include <chrono>
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace bipc = ::boost::interprocess;
|
||||
|
@ -32,7 +34,8 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ
|
|||
, fQueueName("fmq_" + fManager.fSessionName +"_rgq_" + to_string(id))
|
||||
, fShmemObject()
|
||||
, fQueue(nullptr)
|
||||
, fWorker()
|
||||
, fReceiveAcksWorker()
|
||||
, fSendAcksWorker()
|
||||
, fCallback(callback)
|
||||
{
|
||||
if (fRemote)
|
||||
|
@ -49,52 +52,118 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ
|
|||
LOG(debug) << "shmem: created region: " << fName;
|
||||
fShmemObject.truncate(size);
|
||||
|
||||
fQueue = fair::mq::tools::make_unique<bipc::message_queue>(bipc::create_only, fQueueName.c_str(), 10000, sizeof(RegionBlock));
|
||||
fQueue = fair::mq::tools::make_unique<bipc::message_queue>(bipc::create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
|
||||
LOG(debug) << "shmem: created region queue: " << fQueueName;
|
||||
}
|
||||
fRegion = bipc::mapped_region(fShmemObject, bipc::read_write); // TODO: add HUGEPAGES flag here
|
||||
// fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, MAP_ANONYMOUS | MAP_HUGETLB);
|
||||
|
||||
fSendAcksWorker = std::thread(&Region::SendAcks, this);
|
||||
}
|
||||
|
||||
void Region::StartReceivingAcks()
|
||||
{
|
||||
fWorker = std::thread(&Region::ReceiveAcks, this);
|
||||
fReceiveAcksWorker = std::thread(&Region::ReceiveAcks, this);
|
||||
}
|
||||
|
||||
void Region::ReceiveAcks()
|
||||
{
|
||||
unsigned int priority;
|
||||
bipc::message_queue::size_type recvdSize;
|
||||
std::unique_ptr<RegionBlock[]> blocks = fair::mq::tools::make_unique<RegionBlock[]>(fAckBunchSize);
|
||||
|
||||
while (!fStop) // end thread condition (should exist until region is destroyed)
|
||||
{
|
||||
auto rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200);
|
||||
RegionBlock block;
|
||||
if (fQueue->timed_receive(&block, sizeof(RegionBlock), recvdSize, priority, rcvTill))
|
||||
auto rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(500);
|
||||
|
||||
while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill))
|
||||
{
|
||||
// LOG(debug) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId;
|
||||
if (fCallback)
|
||||
{
|
||||
fCallback(reinterpret_cast<char*>(fRegion.get_address()) + block.fHandle, block.fSize, reinterpret_cast<void*>(block.fHint));
|
||||
const auto numBlocks = recvdSize / sizeof(RegionBlock);
|
||||
for (size_t i = 0; i < numBlocks; i++)
|
||||
{
|
||||
fCallback(reinterpret_cast<char*>(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast<void*>(blocks[i].fHint));
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// LOG(debug) << "queue " << fQueueName << " timeout!";
|
||||
}
|
||||
} // while !fStop
|
||||
|
||||
LOG(debug) << "worker for " << fName << " leaving.";
|
||||
LOG(debug) << "receive ack worker for " << fName << " leaving.";
|
||||
}
|
||||
|
||||
void Region::ReleaseBlock(const RegionBlock &block)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(fBlockLock);
|
||||
|
||||
fBlocksToFree.emplace_back(block);
|
||||
|
||||
if (fBlocksToFree.size() >= fAckBunchSize)
|
||||
{
|
||||
lock.unlock(); // reduces contention on fBlockLock
|
||||
fBlockSendCV.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void Region::SendAcks()
|
||||
{
|
||||
std::unique_ptr<RegionBlock[]> blocks = fair::mq::tools::make_unique<RegionBlock[]>(fAckBunchSize);
|
||||
|
||||
while (true) // we'll try to send all acks before stopping
|
||||
{
|
||||
size_t blocksToSend = 0;
|
||||
|
||||
{ // mutex locking block
|
||||
std::unique_lock<std::mutex> lock(fBlockLock);
|
||||
|
||||
// try to get more blocks without waiting (we can miss a notify from CloseMessage())
|
||||
if (!fStop && (fBlocksToFree.size() < fAckBunchSize))
|
||||
{
|
||||
// cv.wait() timeout: send whatever blocks we have
|
||||
fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500));
|
||||
}
|
||||
|
||||
blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize);
|
||||
|
||||
std::copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get());
|
||||
fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend);
|
||||
} // unlock the block mutex here while sending over IPC
|
||||
|
||||
if (blocksToSend > 0)
|
||||
{
|
||||
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop)
|
||||
{
|
||||
// receiver slow? yield and try again...
|
||||
this_thread::yield();
|
||||
}
|
||||
}
|
||||
else // blocksToSend == 0
|
||||
{
|
||||
if (fStop)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG(debug) << "send ack worker for " << fName << " leaving.";
|
||||
}
|
||||
|
||||
Region::~Region()
|
||||
{
|
||||
fStop = true;
|
||||
|
||||
if (fSendAcksWorker.joinable())
|
||||
{
|
||||
fSendAcksWorker.join();
|
||||
}
|
||||
|
||||
if (!fRemote)
|
||||
{
|
||||
fStop = true;
|
||||
if (fWorker.joinable())
|
||||
if (fReceiveAcksWorker.joinable())
|
||||
{
|
||||
fWorker.join();
|
||||
fReceiveAcksWorker.join();
|
||||
}
|
||||
|
||||
if (bipc::shared_memory_object::remove(fName.c_str()))
|
||||
|
|
|
@ -19,11 +19,14 @@
|
|||
#include "FairMQUnmanagedRegion.h"
|
||||
|
||||
#include <fairmq/Tools.h>
|
||||
#include <fairmq/shmem/Common.h>
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace fair
|
||||
|
@ -47,6 +50,9 @@ struct Region
|
|||
void StartReceivingAcks();
|
||||
void ReceiveAcks();
|
||||
|
||||
void ReleaseBlock(const RegionBlock &);
|
||||
void SendAcks();
|
||||
|
||||
~Region();
|
||||
|
||||
Manager& fManager;
|
||||
|
@ -56,8 +62,15 @@ struct Region
|
|||
std::string fQueueName;
|
||||
boost::interprocess::shared_memory_object fShmemObject;
|
||||
boost::interprocess::mapped_region fRegion;
|
||||
|
||||
std::mutex fBlockLock;
|
||||
std::condition_variable fBlockSendCV;
|
||||
std::vector<RegionBlock> fBlocksToFree;
|
||||
const std::size_t fAckBunchSize = 256;
|
||||
std::unique_ptr<boost::interprocess::message_queue> fQueue;
|
||||
std::thread fWorker;
|
||||
|
||||
std::thread fReceiveAcksWorker;
|
||||
std::thread fSendAcksWorker;
|
||||
FairMQRegionCallback fCallback;
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user