mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Move shmem prototype out of MQ examples
This commit is contained in:
parent
b7d97f6306
commit
aa8d16ff9a
|
@ -54,6 +54,7 @@ endif()
|
||||||
if(BUILD_TESTING)
|
if(BUILD_TESTING)
|
||||||
add_subdirectory(test)
|
add_subdirectory(test)
|
||||||
endif()
|
endif()
|
||||||
|
add_subdirectory(shmem/prototype)
|
||||||
|
|
||||||
##########################
|
##########################
|
||||||
# libFairMQ header files #
|
# libFairMQ header files #
|
||||||
|
|
76
fairmq/shmem/prototype/CMakeLists.txt
Normal file
76
fairmq/shmem/prototype/CMakeLists.txt
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
################################################################################
|
||||||
|
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||||
|
# #
|
||||||
|
# This software is distributed under the terms of the #
|
||||||
|
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
|
||||||
|
# copied verbatim in the file "LICENSE" #
|
||||||
|
################################################################################
|
||||||
|
|
||||||
|
configure_file(${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype/shm-prototype.json
|
||||||
|
${CMAKE_BINARY_DIR}/bin/config/shm-prototype.json)
|
||||||
|
configure_file(${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype/startShmPrototype.sh.in
|
||||||
|
${CMAKE_BINARY_DIR}/bin/prototype/shmem/startShmPrototype.sh)
|
||||||
|
|
||||||
|
Set(INCLUDE_DIRECTORIES
|
||||||
|
${CMAKE_SOURCE_DIR}/fairmq
|
||||||
|
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
||||||
|
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
|
||||||
|
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||||
|
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||||
|
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||||
|
${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype
|
||||||
|
${CMAKE_CURRENT_BINARY_DIR}
|
||||||
|
)
|
||||||
|
|
||||||
|
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||||
|
${Boost_INCLUDE_DIR}
|
||||||
|
${ZeroMQ_INCLUDE_DIR}
|
||||||
|
)
|
||||||
|
|
||||||
|
Include_Directories(${INCLUDE_DIRECTORIES})
|
||||||
|
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
||||||
|
|
||||||
|
Set(LINK_DIRECTORIES
|
||||||
|
${Boost_LIBRARY_DIRS}
|
||||||
|
)
|
||||||
|
|
||||||
|
Link_Directories(${LINK_DIRECTORIES})
|
||||||
|
|
||||||
|
Set(SRCS
|
||||||
|
"FairMQShmPrototypeSampler.cxx"
|
||||||
|
"FairMQShmPrototypeSink.cxx"
|
||||||
|
)
|
||||||
|
|
||||||
|
Set(DEPENDENCIES
|
||||||
|
${DEPENDENCIES}
|
||||||
|
${Boost_INTERPROCESS_LIBRARY}
|
||||||
|
FairMQ
|
||||||
|
)
|
||||||
|
|
||||||
|
Set(LIBRARY_NAME FairMQShmPrototype)
|
||||||
|
|
||||||
|
GENERATE_LIBRARY()
|
||||||
|
|
||||||
|
Set(Exe_Names
|
||||||
|
shm-prototype-sampler
|
||||||
|
shm-prototype-sink
|
||||||
|
)
|
||||||
|
|
||||||
|
Set(Exe_Source
|
||||||
|
runShmPrototypeSampler.cxx
|
||||||
|
runShmPrototypeSink.cxx
|
||||||
|
)
|
||||||
|
|
||||||
|
list(LENGTH Exe_Names _length)
|
||||||
|
math(EXPR _length ${_length}-1)
|
||||||
|
|
||||||
|
set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/prototype/shmem")
|
||||||
|
|
||||||
|
ForEach(_file RANGE 0 ${_length})
|
||||||
|
list(GET Exe_Names ${_file} _name)
|
||||||
|
list(GET Exe_Source ${_file} _src)
|
||||||
|
set(EXE_NAME ${_name})
|
||||||
|
set(SRCS ${_src})
|
||||||
|
set(DEPENDENCIES FairMQShmPrototype)
|
||||||
|
GENERATE_EXECUTABLE()
|
||||||
|
EndForEach(_file RANGE 0 ${_length})
|
227
fairmq/shmem/prototype/FairMQShmPrototypeSampler.cxx
Normal file
227
fairmq/shmem/prototype/FairMQShmPrototypeSampler.cxx
Normal file
|
@ -0,0 +1,227 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
/**
|
||||||
|
* FairMQShmPrototypeSampler.cpp
|
||||||
|
*
|
||||||
|
* @since 2016-04-08
|
||||||
|
* @author A. Rybalchenko
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <chrono>
|
||||||
|
#include <iomanip>
|
||||||
|
|
||||||
|
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||||
|
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
|
||||||
|
|
||||||
|
#include "FairMQShmPrototypeSampler.h"
|
||||||
|
#include "FairMQProgOptions.h"
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
#include "ShmChunk.h"
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
using namespace boost::interprocess;
|
||||||
|
|
||||||
|
FairMQShmPrototypeSampler::FairMQShmPrototypeSampler()
|
||||||
|
: fMsgSize(10000)
|
||||||
|
, fMsgCounter(0)
|
||||||
|
, fMsgRate(1)
|
||||||
|
, fBytesOut(0)
|
||||||
|
, fMsgOut(0)
|
||||||
|
, fBytesOutNew(0)
|
||||||
|
, fMsgOutNew(0)
|
||||||
|
{
|
||||||
|
if (shared_memory_object::remove("FairMQSharedMemoryPrototype"))
|
||||||
|
{
|
||||||
|
LOG(INFO) << "Successfully removed shared memory upon device start.";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(INFO) << "Did not remove shared memory upon device start.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQShmPrototypeSampler::~FairMQShmPrototypeSampler()
|
||||||
|
{
|
||||||
|
if (shared_memory_object::remove("FairMQSharedMemoryPrototype"))
|
||||||
|
{
|
||||||
|
LOG(INFO) << "Successfully removed shared memory after the device has stopped.";
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(INFO) << "Did not remove shared memory after the device stopped. Still in use?";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQShmPrototypeSampler::Init()
|
||||||
|
{
|
||||||
|
fMsgSize = fConfig->GetValue<int>("msg-size");
|
||||||
|
fMsgRate = fConfig->GetValue<int>("msg-rate");
|
||||||
|
|
||||||
|
SegmentManager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemoryPrototype", 2000000000);
|
||||||
|
LOG(INFO) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are "
|
||||||
|
<< SegmentManager::Instance().Segment()->get_free_memory() << " bytes.";
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQShmPrototypeSampler::Run()
|
||||||
|
{
|
||||||
|
// count sent messages (also used in creating ShmChunk container ID)
|
||||||
|
static uint64_t numSentMsgs = 0;
|
||||||
|
|
||||||
|
LOG(INFO) << "Starting the benchmark with message size of " << fMsgSize;
|
||||||
|
|
||||||
|
// start rate logger and acknowledgement listener in separate threads
|
||||||
|
thread rateLogger(&FairMQShmPrototypeSampler::Log, this, 1000);
|
||||||
|
// thread resetMsgCounter(&FairMQShmPrototypeSampler::ResetMsgCounter, this);
|
||||||
|
|
||||||
|
// int charnum = 97;
|
||||||
|
|
||||||
|
while (CheckCurrentState(RUNNING))
|
||||||
|
{
|
||||||
|
void* ptr = nullptr;
|
||||||
|
bipc::managed_shared_memory::handle_t handle;
|
||||||
|
|
||||||
|
while (!ptr)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ptr = SegmentManager::Instance().Segment()->allocate(fMsgSize);
|
||||||
|
}
|
||||||
|
catch (bipc::bad_alloc& ba)
|
||||||
|
{
|
||||||
|
this_thread::sleep_for(chrono::milliseconds(50));
|
||||||
|
if (CheckCurrentState(RUNNING))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// // ShmChunk container ID
|
||||||
|
// string chunkID = "c" + to_string(numSentMsgs);
|
||||||
|
// // shared pointer ID
|
||||||
|
// string ownerID = "o" + to_string(numSentMsgs);
|
||||||
|
|
||||||
|
// ShPtrOwner* owner = nullptr;
|
||||||
|
|
||||||
|
// try
|
||||||
|
// {
|
||||||
|
// owner = SegmentManager::Instance().Segment()->construct<ShPtrOwner>(ownerID.c_str())(
|
||||||
|
// make_managed_shared_ptr(SegmentManager::Instance().Segment()->construct<ShmChunk>(chunkID.c_str())(fMsgSize),
|
||||||
|
// *(SegmentManager::Instance().Segment())));
|
||||||
|
// }
|
||||||
|
// catch (bipc::bad_alloc& ba)
|
||||||
|
// {
|
||||||
|
// LOG(WARN) << "Shared memory full...";
|
||||||
|
// this_thread::sleep_for(chrono::milliseconds(100));
|
||||||
|
// continue;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// void* ptr = owner->fPtr->GetData();
|
||||||
|
|
||||||
|
// write something to memory, otherwise only (incomplete) allocation will be measured
|
||||||
|
// memset(ptr, 0, fMsgSize);
|
||||||
|
|
||||||
|
// static_cast<char*>(ptr)[3] = charnum++;
|
||||||
|
// if (charnum == 123)
|
||||||
|
// {
|
||||||
|
// charnum = 97;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// LOG(DEBUG) << "chunk handle: " << owner->fPtr->GetHandle();
|
||||||
|
// LOG(DEBUG) << "chunk size: " << owner->fPtr->GetSize();
|
||||||
|
// LOG(DEBUG) << "owner (" << ownerID << ") use count: " << owner->fPtr.use_count();
|
||||||
|
|
||||||
|
// char* cptr = static_cast<char*>(ptr);
|
||||||
|
// LOG(DEBUG) << "check: " << cptr[3];
|
||||||
|
|
||||||
|
// FairMQMessagePtr msg(NewSimpleMessage(ownerID));
|
||||||
|
|
||||||
|
if (ptr)
|
||||||
|
{
|
||||||
|
handle = SegmentManager::Instance().Segment()->get_handle_from_address(ptr);
|
||||||
|
FairMQMessagePtr msg(NewMessage(sizeof(ExMetaHeader)));
|
||||||
|
ExMetaHeader* metaPtr = new(msg->GetData()) ExMetaHeader();
|
||||||
|
metaPtr->fSize = fMsgSize;
|
||||||
|
metaPtr->fHandle = handle;
|
||||||
|
|
||||||
|
// LOG(INFO) << metaPtr->fSize;
|
||||||
|
// LOG(INFO) << metaPtr->fHandle;
|
||||||
|
// LOG(WARN) << ptr;
|
||||||
|
|
||||||
|
if (Send(msg, "meta", 0) > 0)
|
||||||
|
{
|
||||||
|
fBytesOutNew += fMsgSize;
|
||||||
|
++fMsgOutNew;
|
||||||
|
++numSentMsgs;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
SegmentManager::Instance().Segment()->deallocate(ptr);
|
||||||
|
// SegmentManager::Instance().Segment()->destroy_ptr(owner);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --fMsgCounter;
|
||||||
|
// while (fMsgCounter == 0)
|
||||||
|
// {
|
||||||
|
// this_thread::sleep_for(chrono::milliseconds(1));
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state.";
|
||||||
|
|
||||||
|
rateLogger.join();
|
||||||
|
// resetMsgCounter.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQShmPrototypeSampler::Log(const int intervalInMs)
|
||||||
|
{
|
||||||
|
timestamp_t t0 = get_timestamp();
|
||||||
|
timestamp_t t1;
|
||||||
|
timestamp_t msSinceLastLog;
|
||||||
|
|
||||||
|
double mbPerSecOut = 0;
|
||||||
|
double msgPerSecOut = 0;
|
||||||
|
|
||||||
|
while (CheckCurrentState(RUNNING))
|
||||||
|
{
|
||||||
|
t1 = get_timestamp();
|
||||||
|
|
||||||
|
msSinceLastLog = (t1 - t0) / 1000.0L;
|
||||||
|
|
||||||
|
mbPerSecOut = (static_cast<double>(fBytesOutNew - fBytesOut) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
|
||||||
|
fBytesOut = fBytesOutNew;
|
||||||
|
|
||||||
|
msgPerSecOut = static_cast<double>(fMsgOutNew - fMsgOut) / static_cast<double>(msSinceLastLog) * 1000.;
|
||||||
|
fMsgOut = fMsgOutNew;
|
||||||
|
|
||||||
|
LOG(DEBUG) << fixed
|
||||||
|
<< setprecision(0) << "out: " << msgPerSecOut << " msg ("
|
||||||
|
<< setprecision(2) << mbPerSecOut << " MB)\t("
|
||||||
|
<< SegmentManager::Instance().Segment()->get_free_memory() / (1024. * 1024.) << " MB free)";
|
||||||
|
|
||||||
|
t0 = t1;
|
||||||
|
this_thread::sleep_for(chrono::milliseconds(intervalInMs));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQShmPrototypeSampler::ResetMsgCounter()
|
||||||
|
{
|
||||||
|
while (CheckCurrentState(RUNNING))
|
||||||
|
{
|
||||||
|
fMsgCounter = fMsgRate / 100;
|
||||||
|
this_thread::sleep_for(chrono::milliseconds(10));
|
||||||
|
}
|
||||||
|
}
|
45
fairmq/shmem/prototype/FairMQShmPrototypeSampler.h
Normal file
45
fairmq/shmem/prototype/FairMQShmPrototypeSampler.h
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
/**
|
||||||
|
* FairMQShmPrototypeSampler.h
|
||||||
|
*
|
||||||
|
* @since 2016-04-08
|
||||||
|
* @author A. Rybalchenko
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQSHMPROTOTYPESAMPLER_H_
|
||||||
|
#define FAIRMQSHMPROTOTYPESAMPLER_H_
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
|
||||||
|
class FairMQShmPrototypeSampler : public FairMQDevice
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
FairMQShmPrototypeSampler();
|
||||||
|
virtual ~FairMQShmPrototypeSampler();
|
||||||
|
|
||||||
|
void Log(const int intervalInMs);
|
||||||
|
void ResetMsgCounter();
|
||||||
|
|
||||||
|
protected:
|
||||||
|
unsigned int fMsgSize;
|
||||||
|
unsigned int fMsgCounter;
|
||||||
|
unsigned int fMsgRate;
|
||||||
|
|
||||||
|
unsigned long long fBytesOut;
|
||||||
|
unsigned long long fMsgOut;
|
||||||
|
std::atomic<unsigned long long> fBytesOutNew;
|
||||||
|
std::atomic<unsigned long long> fMsgOutNew;
|
||||||
|
|
||||||
|
virtual void Init();
|
||||||
|
virtual void Run();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQSHMPROTOTYPESAMPLER_H_ */
|
145
fairmq/shmem/prototype/FairMQShmPrototypeSink.cxx
Normal file
145
fairmq/shmem/prototype/FairMQShmPrototypeSink.cxx
Normal file
|
@ -0,0 +1,145 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
/**
|
||||||
|
* FairMQShmPrototypeSink.cxx
|
||||||
|
*
|
||||||
|
* @since 2016-04-08
|
||||||
|
* @author A. Rybalchenko
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <chrono>
|
||||||
|
#include <iomanip>
|
||||||
|
|
||||||
|
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||||
|
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
|
||||||
|
|
||||||
|
#include "FairMQShmPrototypeSink.h"
|
||||||
|
#include "FairMQProgOptions.h"
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
#include "ShmChunk.h"
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
using namespace boost::interprocess;
|
||||||
|
|
||||||
|
FairMQShmPrototypeSink::FairMQShmPrototypeSink()
|
||||||
|
: fBytesIn(0)
|
||||||
|
, fMsgIn(0)
|
||||||
|
, fBytesInNew(0)
|
||||||
|
, fMsgInNew(0)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQShmPrototypeSink::~FairMQShmPrototypeSink()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQShmPrototypeSink::Init()
|
||||||
|
{
|
||||||
|
SegmentManager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemoryPrototype", 2000000000);
|
||||||
|
LOG(INFO) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are "
|
||||||
|
<< SegmentManager::Instance().Segment()->get_free_memory() << " bytes.";
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQShmPrototypeSink::Run()
|
||||||
|
{
|
||||||
|
static uint64_t numReceivedMsgs = 0;
|
||||||
|
|
||||||
|
thread rateLogger(&FairMQShmPrototypeSink::Log, this, 1000);
|
||||||
|
|
||||||
|
while (CheckCurrentState(RUNNING))
|
||||||
|
{
|
||||||
|
FairMQMessagePtr msg(NewMessage());
|
||||||
|
|
||||||
|
if (Receive(msg, "meta") > 0)
|
||||||
|
{
|
||||||
|
ExMetaHeader* hdr = static_cast<ExMetaHeader*>(msg->GetData());
|
||||||
|
size_t size = hdr->fSize;
|
||||||
|
bipc::managed_shared_memory::handle_t handle = hdr->fHandle;
|
||||||
|
void* ptr = SegmentManager::Instance().Segment()->get_address_from_handle(handle);
|
||||||
|
|
||||||
|
// LOG(INFO) << size;
|
||||||
|
// LOG(INFO) << handle;
|
||||||
|
// LOG(WARN) << ptr;
|
||||||
|
|
||||||
|
fBytesInNew += size;
|
||||||
|
++fMsgInNew;
|
||||||
|
SegmentManager::Instance().Segment()->deallocate(ptr);
|
||||||
|
|
||||||
|
// get the shared pointer ID from the received message
|
||||||
|
// string ownerID(static_cast<char*>(msg->GetData()), msg->GetSize());
|
||||||
|
|
||||||
|
// find the shared pointer in shared memory with its ID
|
||||||
|
// ShPtrOwner* owner = SegmentManager::Instance().Segment()->find<ShPtrOwner>(ownerID.c_str()).first;
|
||||||
|
// LOG(DEBUG) << "owner (" << ownerID << ") use count: " << owner->fPtr.use_count();
|
||||||
|
|
||||||
|
|
||||||
|
// if (owner)
|
||||||
|
// {
|
||||||
|
// // void* ptr = owner->fPtr->GetData();
|
||||||
|
|
||||||
|
// // LOG(DEBUG) << "chunk handle: " << owner->fPtr->GetHandle();
|
||||||
|
// // LOG(DEBUG) << "chunk size: " << owner->fPtr->GetSize();
|
||||||
|
|
||||||
|
// fBytesInNew += owner->fPtr->GetSize();
|
||||||
|
// ++fMsgInNew;
|
||||||
|
|
||||||
|
// // char* cptr = static_cast<char*>(ptr);
|
||||||
|
// // LOG(DEBUG) << "check: " << cptr[3];
|
||||||
|
|
||||||
|
// SegmentManager::Instance().Segment()->deallocate(ptr);
|
||||||
|
|
||||||
|
// // SegmentManager::Instance().Segment()->destroy_ptr(owner);
|
||||||
|
// }
|
||||||
|
// else
|
||||||
|
// {
|
||||||
|
// LOG(WARN) << "Shared pointer is zero.";
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
++numReceivedMsgs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG(INFO) << "Received " << numReceivedMsgs << " messages, leaving RUNNING state.";
|
||||||
|
|
||||||
|
rateLogger.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQShmPrototypeSink::Log(const int intervalInMs)
|
||||||
|
{
|
||||||
|
timestamp_t t0 = get_timestamp();
|
||||||
|
timestamp_t t1;
|
||||||
|
timestamp_t msSinceLastLog;
|
||||||
|
|
||||||
|
double mbPerSecIn = 0;
|
||||||
|
double msgPerSecIn = 0;
|
||||||
|
|
||||||
|
while (CheckCurrentState(RUNNING))
|
||||||
|
{
|
||||||
|
t1 = get_timestamp();
|
||||||
|
|
||||||
|
msSinceLastLog = (t1 - t0) / 1000.0L;
|
||||||
|
|
||||||
|
mbPerSecIn = (static_cast<double>(fBytesInNew - fBytesIn) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
|
||||||
|
fBytesIn = fBytesInNew;
|
||||||
|
|
||||||
|
msgPerSecIn = static_cast<double>(fMsgInNew - fMsgIn) / static_cast<double>(msSinceLastLog) * 1000.;
|
||||||
|
fMsgIn = fMsgInNew;
|
||||||
|
|
||||||
|
LOG(DEBUG) << fixed
|
||||||
|
<< setprecision(0) << "in: " << msgPerSecIn << " msg ("
|
||||||
|
<< setprecision(2) << mbPerSecIn << " MB)\t("
|
||||||
|
<< SegmentManager::Instance().Segment()->get_free_memory() / (1024. * 1024.) << " MB free)";
|
||||||
|
|
||||||
|
t0 = t1;
|
||||||
|
this_thread::sleep_for(chrono::milliseconds(intervalInMs));
|
||||||
|
}
|
||||||
|
}
|
40
fairmq/shmem/prototype/FairMQShmPrototypeSink.h
Normal file
40
fairmq/shmem/prototype/FairMQShmPrototypeSink.h
Normal file
|
@ -0,0 +1,40 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
/**
|
||||||
|
* FairMQShmPrototypeSink.h
|
||||||
|
*
|
||||||
|
* @since 2016-04-08
|
||||||
|
* @author A. Rybalchenko
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef FAIRMQSHMPROTOTYPESINK_H_
|
||||||
|
#define FAIRMQSHMPROTOTYPESINK_H_
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
|
#include "FairMQDevice.h"
|
||||||
|
|
||||||
|
class FairMQShmPrototypeSink : public FairMQDevice
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
FairMQShmPrototypeSink();
|
||||||
|
virtual ~FairMQShmPrototypeSink();
|
||||||
|
|
||||||
|
void Log(const int intervalInMs);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
unsigned long long fBytesIn;
|
||||||
|
unsigned long long fMsgIn;
|
||||||
|
std::atomic<unsigned long long> fBytesInNew;
|
||||||
|
std::atomic<unsigned long long> fMsgInNew;
|
||||||
|
|
||||||
|
virtual void Init();
|
||||||
|
virtual void Run();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif /* FAIRMQSHMPROTOTYPESINK_H_ */
|
182
fairmq/shmem/prototype/ShmChunk.h
Normal file
182
fairmq/shmem/prototype/ShmChunk.h
Normal file
|
@ -0,0 +1,182 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
/**
|
||||||
|
* ShmChunk.h
|
||||||
|
*
|
||||||
|
* @since 2016-04-08
|
||||||
|
* @author A. Rybalchenko
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef SHMCHUNK_H_
|
||||||
|
#define SHMCHUNK_H_
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
#include <chrono>
|
||||||
|
|
||||||
|
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||||
|
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
|
||||||
|
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
|
namespace bipc = boost::interprocess;
|
||||||
|
|
||||||
|
class SegmentManager
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
static SegmentManager& Instance()
|
||||||
|
{
|
||||||
|
static SegmentManager man;
|
||||||
|
return man;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InitializeSegment(const std::string& op, const std::string& name, const size_t size = 0)
|
||||||
|
{
|
||||||
|
if (!fSegment)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (op == "open_or_create")
|
||||||
|
{
|
||||||
|
fSegment = new bipc::managed_shared_memory(bipc::open_or_create, name.c_str(), size);
|
||||||
|
}
|
||||||
|
else if (op == "create_only")
|
||||||
|
{
|
||||||
|
fSegment = new bipc::managed_shared_memory(bipc::create_only, name.c_str(), size);
|
||||||
|
}
|
||||||
|
else if (op == "open_only")
|
||||||
|
{
|
||||||
|
int numTries = 0;
|
||||||
|
bool success = false;
|
||||||
|
|
||||||
|
do
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
fSegment = new bipc::managed_shared_memory(bipc::open_only, name.c_str());
|
||||||
|
success = true;
|
||||||
|
}
|
||||||
|
catch (bipc::interprocess_exception& ie)
|
||||||
|
{
|
||||||
|
if (++numTries == 5)
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "Could not open shared memory after " << numTries << " attempts, exiting!";
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(DEBUG) << "Could not open shared memory segment on try " << numTries << ". Retrying in 1 second...";
|
||||||
|
LOG(DEBUG) << ie.what();
|
||||||
|
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while (!success);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "Unknown operation when initializing shared memory segment: " << op;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (std::exception& e)
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "Exception during shared memory segment initialization: " << e.what() << ", application will now exit";
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(INFO) << "Segment already initialized";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bipc::managed_shared_memory* Segment() const
|
||||||
|
{
|
||||||
|
if (fSegment)
|
||||||
|
{
|
||||||
|
return fSegment;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOG(ERROR) << "Segment not initialized";
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
SegmentManager()
|
||||||
|
: fSegment(nullptr)
|
||||||
|
{}
|
||||||
|
|
||||||
|
bipc::managed_shared_memory* fSegment;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct alignas(16) ExMetaHeader
|
||||||
|
{
|
||||||
|
uint64_t fSize;
|
||||||
|
bipc::managed_shared_memory::handle_t fHandle;
|
||||||
|
};
|
||||||
|
|
||||||
|
// class ShmChunk
|
||||||
|
// {
|
||||||
|
// public:
|
||||||
|
// ShmChunk()
|
||||||
|
// : fHandle()
|
||||||
|
// , fSize(0)
|
||||||
|
// {
|
||||||
|
// }
|
||||||
|
|
||||||
|
// ShmChunk(const size_t size)
|
||||||
|
// : fHandle()
|
||||||
|
// , fSize(size)
|
||||||
|
// {
|
||||||
|
// void* ptr = SegmentManager::Instance().Segment()->allocate(size);
|
||||||
|
// fHandle = SegmentManager::Instance().Segment()->get_handle_from_address(ptr);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// ~ShmChunk()
|
||||||
|
// {
|
||||||
|
// SegmentManager::Instance().Segment()->deallocate(SegmentManager::Instance().Segment()->get_address_from_handle(fHandle));
|
||||||
|
// }
|
||||||
|
|
||||||
|
// bipc::managed_shared_memory::handle_t GetHandle() const
|
||||||
|
// {
|
||||||
|
// return fHandle;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// void* GetData() const
|
||||||
|
// {
|
||||||
|
// return SegmentManager::Instance().Segment()->get_address_from_handle(fHandle);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// size_t GetSize() const
|
||||||
|
// {
|
||||||
|
// return fSize;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// private:
|
||||||
|
// bipc::managed_shared_memory::handle_t fHandle;
|
||||||
|
// size_t fSize;
|
||||||
|
// };
|
||||||
|
|
||||||
|
// typedef bipc::managed_shared_ptr<ShmChunk, bipc::managed_shared_memory>::type ShPtrType;
|
||||||
|
|
||||||
|
// struct ShPtrOwner
|
||||||
|
// {
|
||||||
|
// ShPtrOwner(const ShPtrType& other)
|
||||||
|
// : fPtr(other)
|
||||||
|
// {}
|
||||||
|
|
||||||
|
// ShPtrOwner(const ShPtrOwner& other)
|
||||||
|
// : fPtr(other.fPtr)
|
||||||
|
// {}
|
||||||
|
|
||||||
|
// ShPtrType fPtr;
|
||||||
|
// };
|
||||||
|
|
||||||
|
#endif /* SHMCHUNK_H_ */
|
24
fairmq/shmem/prototype/runShmPrototypeSampler.cxx
Normal file
24
fairmq/shmem/prototype/runShmPrototypeSampler.cxx
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#include "runFairMQDevice.h"
|
||||||
|
#include "FairMQShmPrototypeSampler.h"
|
||||||
|
|
||||||
|
namespace bpo = boost::program_options;
|
||||||
|
|
||||||
|
void addCustomOptions(bpo::options_description& options)
|
||||||
|
{
|
||||||
|
options.add_options()
|
||||||
|
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
|
||||||
|
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
|
{
|
||||||
|
return new FairMQShmPrototypeSampler();
|
||||||
|
}
|
21
fairmq/shmem/prototype/runShmPrototypeSink.cxx
Normal file
21
fairmq/shmem/prototype/runShmPrototypeSink.cxx
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#include "runFairMQDevice.h"
|
||||||
|
#include "FairMQShmPrototypeSink.h"
|
||||||
|
|
||||||
|
namespace bpo = boost::program_options;
|
||||||
|
|
||||||
|
void addCustomOptions(bpo::options_description& /*options*/)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||||
|
{
|
||||||
|
return new FairMQShmPrototypeSink();
|
||||||
|
}
|
34
fairmq/shmem/prototype/shm-prototype.json
Normal file
34
fairmq/shmem/prototype/shm-prototype.json
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
{
|
||||||
|
"fairMQOptions": {
|
||||||
|
"devices": [
|
||||||
|
{
|
||||||
|
"id": "sampler1",
|
||||||
|
"channels": [
|
||||||
|
{
|
||||||
|
"name": "meta",
|
||||||
|
"type": "push",
|
||||||
|
"method": "bind",
|
||||||
|
"address": "tcp://127.0.0.1:5555",
|
||||||
|
"sndBufSize": 10,
|
||||||
|
"rcvBufSize": 10,
|
||||||
|
"rateLogging": 0
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "sink1",
|
||||||
|
"channels": [
|
||||||
|
{
|
||||||
|
"name": "meta",
|
||||||
|
"type": "pull",
|
||||||
|
"method": "connect",
|
||||||
|
"address": "tcp://127.0.0.1:5555",
|
||||||
|
"sndBufSize": 10,
|
||||||
|
"rcvBufSize": 10,
|
||||||
|
"rateLogging": 0
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
49
fairmq/shmem/prototype/startShmPrototype.sh.in
Executable file
49
fairmq/shmem/prototype/startShmPrototype.sh.in
Executable file
|
@ -0,0 +1,49 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
msgSize="1000000"
|
||||||
|
transport="zeromq"
|
||||||
|
|
||||||
|
if [[ $1 =~ ^[0-9]+$ ]]; then
|
||||||
|
msgSize=$1
|
||||||
|
fi
|
||||||
|
|
||||||
|
echo "Starting shared memory example with message size of $msgSize bytes."
|
||||||
|
echo ""
|
||||||
|
echo "Usage: startShmPrototype [message size=1000000]"
|
||||||
|
|
||||||
|
SAMPLER="shm-prototype-sampler"
|
||||||
|
SAMPLER+=" --id sampler1"
|
||||||
|
SAMPLER+=" --transport $transport"
|
||||||
|
# SAMPLER+=" --verbosity TRACE"
|
||||||
|
SAMPLER+=" --msg-size $msgSize"
|
||||||
|
# SAMPLER+=" --msg-rate 1000"
|
||||||
|
SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
||||||
|
xterm -geometry 80x32+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SAMPLER &
|
||||||
|
|
||||||
|
SINK1="shm-prototype-sink"
|
||||||
|
SINK1+=" --id sink1"
|
||||||
|
SINK1+=" --transport $transport"
|
||||||
|
# SINK1+=" --verbose TRACE"
|
||||||
|
SINK1+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
||||||
|
xterm -geometry 80x32+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK1 &
|
||||||
|
|
||||||
|
# SINK2="shm-prototype-sink"
|
||||||
|
# SINK2+=" --id sink2"
|
||||||
|
# SINK2+=" --transport $transport"
|
||||||
|
# # SINK2+=" --verbose TRACE"
|
||||||
|
# SINK2+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
||||||
|
# xterm -geometry 80x32+500+500 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK2 &
|
||||||
|
|
||||||
|
# SINK3="shm-prototype-sink"
|
||||||
|
# SINK3+=" --id sink3"
|
||||||
|
# SINK3+=" --transport $transport"
|
||||||
|
# # SINK3+=" --verbose TRACE"
|
||||||
|
# SINK3+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
||||||
|
# xterm -geometry 80x32+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK3 &
|
||||||
|
|
||||||
|
# SINK4="shm-prototype-sink"
|
||||||
|
# SINK4+=" --id sink4"
|
||||||
|
# SINK4+=" --transport $transport"
|
||||||
|
# # SINK4+=" --verbose TRACE"
|
||||||
|
# SINK4+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
||||||
|
# xterm -geometry 80x32+1000+500 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK4 &
|
Loading…
Reference in New Issue
Block a user