Compare commits

..

4 Commits

Author SHA1 Message Date
Alexey Rybalchenko
addfd071bb Fix incorrect parameters in region example scripts 2023-11-24 14:19:21 +01:00
Alexey Rybalchenko
2d27abc533 Examples: add a script for externally created region 2023-11-24 14:19:21 +01:00
Alexey Rybalchenko
faf577086a shm: fix initialization of rc segment when region is created externally 2023-11-24 14:19:21 +01:00
Alexey Rybalchenko
ff1f9b94ef shm: include rcCountSegment free memory in the monitor output 2023-11-24 14:19:21 +01:00
7 changed files with 112 additions and 29 deletions

View File

@@ -61,7 +61,7 @@ function(add_example)
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}" @ONLY)
endforeach()
if(ARG_CONFIG)
@@ -119,7 +119,7 @@ function(add_example)
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
foreach(script IN LISTS scripts)
set(script_file "${script_prefix}-${script}.sh")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install")
configure_file("${CMAKE_CURRENT_SOURCE_DIR}/${script_file}.in" "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install" @ONLY)
install(
PROGRAMS "${CMAKE_CURRENT_BINARY_DIR}/${script_file}_install"
DESTINATION ${PROJECT_INSTALL_BINDIR}

View File

@@ -0,0 +1,95 @@
#!/bin/bash
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport=${1:-shmem}
msgSize=${2:-1000000}
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"
# SAMPLER+=" --sampling-rate 10"
SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --transport $transport"
SAMPLER+=" --shmid 1"
SAMPLER+=" --shm-monitor false"
SAMPLER+=" --rc-segment-size 200000000"
SAMPLER+=" --external-region true"
SAMPLER+=" --shm-no-cleanup true"
SAMPLER+=" --chan-name data1"
SAMPLER+=" --channel-config name=data1,type=push,method=bind,address=tcp://127.0.0.1:7777"
xterm -geometry 90x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
PROCESSOR1="fairmq-ex-region-processor"
PROCESSOR1+=" --id processor1"
PROCESSOR1+=" --severity debug"
PROCESSOR1+=" --transport $transport"
PROCESSOR1+=" --shmid 1"
PROCESSOR1+=" --shm-segment-id 1"
PROCESSOR1+=" --shm-monitor false"
PROCESSOR1+=" --shm-no-cleanup true"
PROCESSOR1+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR1+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7778"
PROCESSOR1+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7779"
xterm -geometry 90x40+550+40 -hold -e @EX_BIN_DIR@/$PROCESSOR1 &
PROCESSOR2="fairmq-ex-region-processor"
PROCESSOR2+=" --id processor2"
PROCESSOR2+=" --severity debug"
PROCESSOR2+=" --transport $transport"
PROCESSOR2+=" --shmid 1"
PROCESSOR2+=" --shm-segment-id 2"
PROCESSOR2+=" --shm-monitor false"
PROCESSOR2+=" --shm-no-cleanup true"
PROCESSOR2+=" --channel-config name=data1,type=pull,method=connect,address=tcp://127.0.0.1:7777"
PROCESSOR2+=" name=data2,type=push,method=bind,address=tcp://127.0.0.1:7788"
PROCESSOR2+=" name=data3,type=push,method=bind,address=tcp://127.0.0.1:7789"
xterm -geometry 90x40+550+600 -hold -e @EX_BIN_DIR@/$PROCESSOR2 &
SINK1_1="fairmq-ex-region-sink"
SINK1_1+=" --id sink1_1"
SINK1_1+=" --severity debug"
SINK1_1+=" --chan-name data2"
SINK1_1+=" --transport $transport"
SINK1_1+=" --shmid 1"
SINK1_1+=" --shm-segment-id 1"
SINK1_1+=" --shm-monitor false"
SINK1_1+=" --shm-no-cleanup true"
SINK1_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7778"
xterm -geometry 90x20+1100+0 -hold -e @EX_BIN_DIR@/$SINK1_1 &
SINK1_2="fairmq-ex-region-sink"
SINK1_2+=" --id sink1_2"
SINK1_2+=" --severity debug"
SINK1_2+=" --chan-name data3"
SINK1_2+=" --transport $transport"
SINK1_2+=" --shmid 1"
SINK1_2+=" --shm-segment-id 1"
SINK1_2+=" --shm-monitor false"
SINK1_2+=" --shm-no-cleanup true"
SINK1_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7779"
xterm -geometry 90x20+1100+300 -hold -e @EX_BIN_DIR@/$SINK1_2 &
SINK2_1="fairmq-ex-region-sink"
SINK2_1+=" --id sink2_1"
SINK2_1+=" --severity debug"
SINK2_1+=" --chan-name data2"
SINK2_1+=" --transport $transport"
SINK2_1+=" --shmid 1"
SINK2_1+=" --shm-segment-id 2"
SINK2_1+=" --shm-monitor false"
SINK2_1+=" --shm-no-cleanup true"
SINK2_1+=" --channel-config name=data2,type=pull,method=connect,address=tcp://127.0.0.1:7788"
xterm -geometry 90x20+1100+600 -hold -e @EX_BIN_DIR@/$SINK2_1 &
SINK2_2="fairmq-ex-region-sink"
SINK2_2+=" --id sink2_2"
SINK2_2+=" --severity debug"
SINK2_2+=" --chan-name data3"
SINK2_2+=" --transport $transport"
SINK2_2+=" --shmid 1"
SINK2_2+=" --shm-segment-id 2"
SINK2_2+=" --shm-monitor false"
SINK2_2+=" --shm-no-cleanup true"
SINK2_2+=" --channel-config name=data3,type=pull,method=connect,address=tcp://127.0.0.1:7789"
xterm -geometry 90x20+1100+900 -hold -e @EX_BIN_DIR@/$SINK2_2 &

View File

@@ -2,16 +2,8 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="shmem"
msgSize="1000000"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
transport=${1:-shmem}
msgSize=${2:-1000000}
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"

View File

@@ -2,16 +2,8 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
transport="shmem"
msgSize="1000000"
if [[ $1 =~ ^[a-z]+$ ]]; then
transport=$1
fi
if [[ $2 =~ ^[0-9]+$ ]]; then
msgSize=$1
fi
transport=${1:-shmem}
msgSize=${2:-1000000}
SAMPLER="fairmq-ex-region-sampler"
SAMPLER+=" --id sampler1"

View File

@@ -323,7 +323,6 @@ class Manager
}
const uint16_t id = cfg.id.value();
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
@@ -340,6 +339,12 @@ class Manager
LOG(debug) << "Unmanaged region (view) already present, promoting to controller";
region->BecomeController(cfg);
} else {
// we need to update local config, if the region information already exists
auto info = fShmRegions->find(id);
if (info != fShmRegions->end()) {
cfg.rcSegmentSize = info->second.fRCSegmentSize;
}
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, true, cfg));
region = res.first->second.get();
}
@@ -348,7 +353,6 @@ class Manager
// start ack receiver only if a callback has been provided.
if (callback || bulkCallback) {
region->SetCallbacks(callback, bulkCallback);
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues();
region->StartAckSender();
region->StartAckReceiver();
@@ -401,19 +405,18 @@ class Manager
try {
RegionConfig cfg;
const uint64_t rcSegmentSize = cfg.rcSegmentSize;
// get region info
{
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
RegionInfo regionInfo = fShmRegions->at(id);
cfg.id = id;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
cfg.path = regionInfo.fPath.c_str();
}
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, false, std::move(cfg)));
r.first->second->InitializeRefCountSegment(rcSegmentSize);
r.first->second->InitializeQueues();
r.first->second->StartAckSender();
return r.first->second.get();
@@ -482,6 +485,7 @@ class Manager
cfg.id = info.id;
cfg.creationFlags = regionInfo.fCreationFlags;
cfg.path = regionInfo.fPath.c_str();
cfg.rcSegmentSize = regionInfo.fRCSegmentSize;
regionCfgs.emplace(info.id, cfg);
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it
} else {
@@ -503,10 +507,8 @@ class Manager
if (it != fRegions.end()) {
region = it->second.get();
} else {
const uint64_t rcSegmentSize = cfgIt->second.rcSegmentSize;
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, false, cfgIt->second));
region = r.first->second.get();
region->InitializeRefCountSegment(rcSegmentSize);
region->InitializeQueues();
region->StartAckSender();
}

View File

@@ -274,7 +274,7 @@ bool Monitor::PrintShm(const ShmId& shmId)
try {
managed_shared_memory rcCountSegment(open_read_only, MakeShmName(shmId.shmId, "rrc", id).c_str());
ss << ", rcCountSegment size: " << rcCountSegment.get_size();
ss << ", rcCountSegment size: " << rcCountSegment.get_size() << ", free: " << rcCountSegment.get_free_memory();
} catch (bie&) {
ss << ", rcCountSegment: not found";
}

View File

@@ -146,6 +146,8 @@ struct UnmanagedRegion
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}
InitializeRefCountSegment(cfg.rcSegmentSize);
if (fControlling && created) {
Register(shmId, cfg);
}