shm: check result of region acquisition

This commit is contained in:
Alexey Rybalchenko 2021-04-09 11:40:36 +02:00
parent 2ca62d06db
commit 8a2641d842
5 changed files with 29 additions and 16 deletions

View File

@ -37,8 +37,8 @@ void Sampler::InitTask()
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(info) << "Region event: " << info.event LOG(info) << "Region event: " << info.event << ": "
<< ", managed: " << info.managed << (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id << ", id: " << info.id
<< ", ptr: " << info.ptr << ", ptr: " << info.ptr
<< ", size: " << info.size << ", size: " << info.size

View File

@ -30,8 +30,8 @@ void Sink::InitTask()
// Get the fMaxIterations value from the command line options (via fConfig) // Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(info) << "Region event: " << info.event LOG(info) << "Region event: " << info.event << ": "
<< ", managed: " << info.managed << (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id << ", id: " << info.id
<< ", ptr: " << info.ptr << ", ptr: " << info.ptr
<< ", size: " << info.size << ", size: " << info.size

View File

@ -15,11 +15,11 @@ SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10" # SAMPLER+=" --rate 10"
SAMPLER+=" --transport shmem" SAMPLER+=" --transport shmem"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
SINK="fairmq-ex-region-sink" SINK="fairmq-ex-region-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --transport shmem" SINK+=" --transport shmem"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK & xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@ -322,7 +322,6 @@ class Manager
fRegionEventsCV.notify_all(); fRegionEventsCV.notify_all();
return result; return result;
} catch (interprocess_exception& e) { } catch (interprocess_exception& e) {
LOG(error) << "cannot create region. Already created/not cleaned up?"; LOG(error) << "cannot create region. Already created/not cleaned up?";
LOG(error) << e.what(); LOG(error) << e.what();
@ -377,7 +376,7 @@ class Manager
LOG(error) << oor.what(); LOG(error) << oor.what();
return nullptr; return nullptr;
} catch (boost::interprocess::interprocess_exception& e) { } catch (boost::interprocess::interprocess_exception& e) {
LOG(warn) << "Could not get remote region for id '" << id << "'"; LOG(error) << "Could not get remote region for id '" << id << "': " << e.what();
return nullptr; return nullptr;
} }
} }
@ -413,8 +412,12 @@ class Manager
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created; info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (!e.second.fDestroyed) { if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id); auto region = GetRegionUnsafe(info.id);
info.ptr = region->fRegion.get_address(); if (region) {
info.size = region->fRegion.get_size(); info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
} else {
throw std::runtime_error(tools::ToString("GetRegionInfoUnsafe() could not get region with id '", info.id, "'"));
}
} else { } else {
info.ptr = nullptr; info.ptr = nullptr;
info.size = 0; info.size = 0;

View File

@ -85,13 +85,23 @@ struct Region
LOG(debug) << "shmem: initialized file: " << fName; LOG(debug) << "shmem: initialized file: " << fName;
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags); fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
} else { } else {
if (fRemote) { try {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write); if (fRemote) {
} else { fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write); } else {
fShmemObject.truncate(size); fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size);
}
} catch(interprocess_exception& e) {
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
throw;
}
try {
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
} catch(interprocess_exception& e) {
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
throw;
} }
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
} }
InitializeQueues(); InitializeQueues();