mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Remove shmem prototype code - unused
This commit is contained in:
parent
cf9a2944c2
commit
985150437a
|
@ -28,7 +28,6 @@ endif()
|
||||||
##################
|
##################
|
||||||
# subdirectories #
|
# subdirectories #
|
||||||
##################
|
##################
|
||||||
# add_subdirectory(shmem/prototype)
|
|
||||||
if(BUILD_OFI_TRANSPORT)
|
if(BUILD_OFI_TRANSPORT)
|
||||||
add_subdirectory(ofi)
|
add_subdirectory(ofi)
|
||||||
endif()
|
endif()
|
||||||
|
|
|
@ -1,76 +0,0 @@
|
||||||
################################################################################
|
|
||||||
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
|
||||||
# #
|
|
||||||
# This software is distributed under the terms of the #
|
|
||||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
|
||||||
# copied verbatim in the file "LICENSE" #
|
|
||||||
################################################################################
|
|
||||||
|
|
||||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype/shm-prototype.json
|
|
||||||
${CMAKE_BINARY_DIR}/bin/config/shm-prototype.json)
|
|
||||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype/startShmPrototype.sh.in
|
|
||||||
${CMAKE_BINARY_DIR}/bin/prototype/shmem/startShmPrototype.sh)
|
|
||||||
|
|
||||||
Set(INCLUDE_DIRECTORIES
|
|
||||||
${CMAKE_SOURCE_DIR}/fairmq
|
|
||||||
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
|
||||||
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
|
|
||||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
|
||||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
|
||||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
|
||||||
${CMAKE_SOURCE_DIR}/fairmq/shmem/prototype
|
|
||||||
${CMAKE_CURRENT_BINARY_DIR}
|
|
||||||
)
|
|
||||||
|
|
||||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
|
||||||
${Boost_INCLUDE_DIR}
|
|
||||||
${ZeroMQ_INCLUDE_DIR}
|
|
||||||
)
|
|
||||||
|
|
||||||
Include_Directories(${INCLUDE_DIRECTORIES})
|
|
||||||
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
|
||||||
|
|
||||||
Set(LINK_DIRECTORIES
|
|
||||||
${Boost_LIBRARY_DIRS}
|
|
||||||
)
|
|
||||||
|
|
||||||
Link_Directories(${LINK_DIRECTORIES})
|
|
||||||
|
|
||||||
Set(SRCS
|
|
||||||
"FairMQShmPrototypeSampler.cxx"
|
|
||||||
"FairMQShmPrototypeSink.cxx"
|
|
||||||
)
|
|
||||||
|
|
||||||
Set(DEPENDENCIES
|
|
||||||
${DEPENDENCIES}
|
|
||||||
${Boost_INTERPROCESS_LIBRARY}
|
|
||||||
FairMQ
|
|
||||||
)
|
|
||||||
|
|
||||||
Set(LIBRARY_NAME FairMQShmPrototype)
|
|
||||||
|
|
||||||
GENERATE_LIBRARY()
|
|
||||||
|
|
||||||
Set(Exe_Names
|
|
||||||
shm-prototype-sampler
|
|
||||||
shm-prototype-sink
|
|
||||||
)
|
|
||||||
|
|
||||||
Set(Exe_Source
|
|
||||||
runShmPrototypeSampler.cxx
|
|
||||||
runShmPrototypeSink.cxx
|
|
||||||
)
|
|
||||||
|
|
||||||
list(LENGTH Exe_Names _length)
|
|
||||||
math(EXPR _length ${_length}-1)
|
|
||||||
|
|
||||||
set(EXECUTABLE_OUTPUT_PATH "${EXECUTABLE_OUTPUT_PATH}/prototype/shmem")
|
|
||||||
|
|
||||||
ForEach(_file RANGE 0 ${_length})
|
|
||||||
list(GET Exe_Names ${_file} _name)
|
|
||||||
list(GET Exe_Source ${_file} _src)
|
|
||||||
set(EXE_NAME ${_name})
|
|
||||||
set(SRCS ${_src})
|
|
||||||
set(DEPENDENCIES FairMQShmPrototype)
|
|
||||||
GENERATE_EXECUTABLE()
|
|
||||||
EndForEach(_file RANGE 0 ${_length})
|
|
|
@ -1,227 +0,0 @@
|
||||||
/********************************************************************************
|
|
||||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
|
||||||
* *
|
|
||||||
* This software is distributed under the terms of the *
|
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
|
||||||
* copied verbatim in the file "LICENSE" *
|
|
||||||
********************************************************************************/
|
|
||||||
/**
|
|
||||||
* FairMQShmPrototypeSampler.cpp
|
|
||||||
*
|
|
||||||
* @since 2016-04-08
|
|
||||||
* @author A. Rybalchenko
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <thread>
|
|
||||||
#include <chrono>
|
|
||||||
#include <iomanip>
|
|
||||||
|
|
||||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
|
||||||
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
|
|
||||||
|
|
||||||
#include "FairMQShmPrototypeSampler.h"
|
|
||||||
#include "FairMQProgOptions.h"
|
|
||||||
#include "FairMQLogger.h"
|
|
||||||
|
|
||||||
#include "ShmChunk.h"
|
|
||||||
|
|
||||||
using namespace std;
|
|
||||||
using namespace boost::interprocess;
|
|
||||||
|
|
||||||
FairMQShmPrototypeSampler::FairMQShmPrototypeSampler()
|
|
||||||
: fMsgSize(10000)
|
|
||||||
, fMsgCounter(0)
|
|
||||||
, fMsgRate(1)
|
|
||||||
, fBytesOut(0)
|
|
||||||
, fMsgOut(0)
|
|
||||||
, fBytesOutNew(0)
|
|
||||||
, fMsgOutNew(0)
|
|
||||||
{
|
|
||||||
if (shared_memory_object::remove("FairMQSharedMemoryPrototype"))
|
|
||||||
{
|
|
||||||
LOG(info) << "Successfully removed shared memory upon device start.";
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG(info) << "Did not remove shared memory upon device start.";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQShmPrototypeSampler::~FairMQShmPrototypeSampler()
|
|
||||||
{
|
|
||||||
if (shared_memory_object::remove("FairMQSharedMemoryPrototype"))
|
|
||||||
{
|
|
||||||
LOG(info) << "Successfully removed shared memory after the device has stopped.";
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG(info) << "Did not remove shared memory after the device stopped. Still in use?";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQShmPrototypeSampler::Init()
|
|
||||||
{
|
|
||||||
fMsgSize = fConfig->GetValue<int>("msg-size");
|
|
||||||
fMsgRate = fConfig->GetValue<int>("msg-rate");
|
|
||||||
|
|
||||||
SegmentManager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemoryPrototype", 2000000000);
|
|
||||||
LOG(info) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are "
|
|
||||||
<< SegmentManager::Instance().Segment()->get_free_memory() << " bytes.";
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQShmPrototypeSampler::Run()
|
|
||||||
{
|
|
||||||
// count sent messages (also used in creating ShmChunk container ID)
|
|
||||||
static uint64_t numSentMsgs = 0;
|
|
||||||
|
|
||||||
LOG(info) << "Starting the benchmark with message size of " << fMsgSize;
|
|
||||||
|
|
||||||
// start rate logger and acknowledgement listener in separate threads
|
|
||||||
thread rateLogger(&FairMQShmPrototypeSampler::Log, this, 1000);
|
|
||||||
// thread resetMsgCounter(&FairMQShmPrototypeSampler::ResetMsgCounter, this);
|
|
||||||
|
|
||||||
// int charnum = 97;
|
|
||||||
|
|
||||||
while (CheckCurrentState(RUNNING))
|
|
||||||
{
|
|
||||||
void* ptr = nullptr;
|
|
||||||
bipc::managed_shared_memory::handle_t handle;
|
|
||||||
|
|
||||||
while (!ptr)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
ptr = SegmentManager::Instance().Segment()->allocate(fMsgSize);
|
|
||||||
}
|
|
||||||
catch (bipc::bad_alloc& ba)
|
|
||||||
{
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(50));
|
|
||||||
if (CheckCurrentState(RUNNING))
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// // ShmChunk container ID
|
|
||||||
// string chunkID = "c" + to_string(numSentMsgs);
|
|
||||||
// // shared pointer ID
|
|
||||||
// string ownerID = "o" + to_string(numSentMsgs);
|
|
||||||
|
|
||||||
// ShPtrOwner* owner = nullptr;
|
|
||||||
|
|
||||||
// try
|
|
||||||
// {
|
|
||||||
// owner = SegmentManager::Instance().Segment()->construct<ShPtrOwner>(ownerID.c_str())(
|
|
||||||
// make_managed_shared_ptr(SegmentManager::Instance().Segment()->construct<ShmChunk>(chunkID.c_str())(fMsgSize),
|
|
||||||
// *(SegmentManager::Instance().Segment())));
|
|
||||||
// }
|
|
||||||
// catch (bipc::bad_alloc& ba)
|
|
||||||
// {
|
|
||||||
// LOG(warn) << "Shared memory full...";
|
|
||||||
// this_thread::sleep_for(chrono::milliseconds(100));
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// void* ptr = owner->fPtr->GetData();
|
|
||||||
|
|
||||||
// write something to memory, otherwise only (incomplete) allocation will be measured
|
|
||||||
// memset(ptr, 0, fMsgSize);
|
|
||||||
|
|
||||||
// static_cast<char*>(ptr)[3] = charnum++;
|
|
||||||
// if (charnum == 123)
|
|
||||||
// {
|
|
||||||
// charnum = 97;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// LOG(debug) << "chunk handle: " << owner->fPtr->GetHandle();
|
|
||||||
// LOG(debug) << "chunk size: " << owner->fPtr->GetSize();
|
|
||||||
// LOG(debug) << "owner (" << ownerID << ") use count: " << owner->fPtr.use_count();
|
|
||||||
|
|
||||||
// char* cptr = static_cast<char*>(ptr);
|
|
||||||
// LOG(debug) << "check: " << cptr[3];
|
|
||||||
|
|
||||||
// FairMQMessagePtr msg(NewSimpleMessage(ownerID));
|
|
||||||
|
|
||||||
if (ptr)
|
|
||||||
{
|
|
||||||
handle = SegmentManager::Instance().Segment()->get_handle_from_address(ptr);
|
|
||||||
FairMQMessagePtr msg(NewMessage(sizeof(ExMetaHeader)));
|
|
||||||
ExMetaHeader* metaPtr = new(msg->GetData()) ExMetaHeader();
|
|
||||||
metaPtr->fSize = fMsgSize;
|
|
||||||
metaPtr->fHandle = handle;
|
|
||||||
|
|
||||||
// LOG(info) << metaPtr->fSize;
|
|
||||||
// LOG(info) << metaPtr->fHandle;
|
|
||||||
// LOG(warn) << ptr;
|
|
||||||
|
|
||||||
if (Send(msg, "meta", 0) > 0)
|
|
||||||
{
|
|
||||||
fBytesOutNew += fMsgSize;
|
|
||||||
++fMsgOutNew;
|
|
||||||
++numSentMsgs;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
SegmentManager::Instance().Segment()->deallocate(ptr);
|
|
||||||
// SegmentManager::Instance().Segment()->destroy_ptr(owner);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// --fMsgCounter;
|
|
||||||
// while (fMsgCounter == 0)
|
|
||||||
// {
|
|
||||||
// this_thread::sleep_for(chrono::milliseconds(1));
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG(info) << "Sent " << numSentMsgs << " messages, leaving RUNNING state.";
|
|
||||||
|
|
||||||
rateLogger.join();
|
|
||||||
// resetMsgCounter.join();
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQShmPrototypeSampler::Log(const int intervalInMs)
|
|
||||||
{
|
|
||||||
chrono::time_point<chrono::high_resolution_clock> t0 = chrono::high_resolution_clock::now();
|
|
||||||
chrono::time_point<chrono::high_resolution_clock> t1;
|
|
||||||
unsigned long long msSinceLastLog;
|
|
||||||
|
|
||||||
double mbPerSecOut = 0;
|
|
||||||
double msgPerSecOut = 0;
|
|
||||||
|
|
||||||
while (CheckCurrentState(RUNNING))
|
|
||||||
{
|
|
||||||
t1 = chrono::high_resolution_clock::now();
|
|
||||||
|
|
||||||
msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count();
|
|
||||||
|
|
||||||
mbPerSecOut = (static_cast<double>(fBytesOutNew - fBytesOut) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
|
|
||||||
fBytesOut = fBytesOutNew;
|
|
||||||
|
|
||||||
msgPerSecOut = static_cast<double>(fMsgOutNew - fMsgOut) / static_cast<double>(msSinceLastLog) * 1000.;
|
|
||||||
fMsgOut = fMsgOutNew;
|
|
||||||
|
|
||||||
LOG(debug) << fixed
|
|
||||||
<< setprecision(0) << "out: " << msgPerSecOut << " msg ("
|
|
||||||
<< setprecision(2) << mbPerSecOut << " MB)\t("
|
|
||||||
<< SegmentManager::Instance().Segment()->get_free_memory() / (1024. * 1024.) << " MB free)";
|
|
||||||
|
|
||||||
t0 = t1;
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(intervalInMs));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQShmPrototypeSampler::ResetMsgCounter()
|
|
||||||
{
|
|
||||||
while (CheckCurrentState(RUNNING))
|
|
||||||
{
|
|
||||||
fMsgCounter = fMsgRate / 100;
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(10));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,45 +0,0 @@
|
||||||
/********************************************************************************
|
|
||||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
|
||||||
* *
|
|
||||||
* This software is distributed under the terms of the *
|
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
|
||||||
* copied verbatim in the file "LICENSE" *
|
|
||||||
********************************************************************************/
|
|
||||||
/**
|
|
||||||
* FairMQShmPrototypeSampler.h
|
|
||||||
*
|
|
||||||
* @since 2016-04-08
|
|
||||||
* @author A. Rybalchenko
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef FAIRMQSHMPROTOTYPESAMPLER_H_
|
|
||||||
#define FAIRMQSHMPROTOTYPESAMPLER_H_
|
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
|
|
||||||
#include "FairMQDevice.h"
|
|
||||||
|
|
||||||
class FairMQShmPrototypeSampler : public FairMQDevice
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
FairMQShmPrototypeSampler();
|
|
||||||
virtual ~FairMQShmPrototypeSampler();
|
|
||||||
|
|
||||||
void Log(const int intervalInMs);
|
|
||||||
void ResetMsgCounter();
|
|
||||||
|
|
||||||
protected:
|
|
||||||
unsigned int fMsgSize;
|
|
||||||
unsigned int fMsgCounter;
|
|
||||||
unsigned int fMsgRate;
|
|
||||||
|
|
||||||
unsigned long long fBytesOut;
|
|
||||||
unsigned long long fMsgOut;
|
|
||||||
std::atomic<unsigned long long> fBytesOutNew;
|
|
||||||
std::atomic<unsigned long long> fMsgOutNew;
|
|
||||||
|
|
||||||
virtual void Init();
|
|
||||||
virtual void Run();
|
|
||||||
};
|
|
||||||
|
|
||||||
#endif /* FAIRMQSHMPROTOTYPESAMPLER_H_ */
|
|
|
@ -1,145 +0,0 @@
|
||||||
/********************************************************************************
|
|
||||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
|
||||||
* *
|
|
||||||
* This software is distributed under the terms of the *
|
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
|
||||||
* copied verbatim in the file "LICENSE" *
|
|
||||||
********************************************************************************/
|
|
||||||
/**
|
|
||||||
* FairMQShmPrototypeSink.cxx
|
|
||||||
*
|
|
||||||
* @since 2016-04-08
|
|
||||||
* @author A. Rybalchenko
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <thread>
|
|
||||||
#include <chrono>
|
|
||||||
#include <iomanip>
|
|
||||||
|
|
||||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
|
||||||
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
|
|
||||||
|
|
||||||
#include "FairMQShmPrototypeSink.h"
|
|
||||||
#include "FairMQProgOptions.h"
|
|
||||||
#include "FairMQLogger.h"
|
|
||||||
|
|
||||||
#include "ShmChunk.h"
|
|
||||||
|
|
||||||
using namespace std;
|
|
||||||
using namespace boost::interprocess;
|
|
||||||
|
|
||||||
FairMQShmPrototypeSink::FairMQShmPrototypeSink()
|
|
||||||
: fBytesIn(0)
|
|
||||||
, fMsgIn(0)
|
|
||||||
, fBytesInNew(0)
|
|
||||||
, fMsgInNew(0)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQShmPrototypeSink::~FairMQShmPrototypeSink()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQShmPrototypeSink::Init()
|
|
||||||
{
|
|
||||||
SegmentManager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemoryPrototype", 2000000000);
|
|
||||||
LOG(info) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are "
|
|
||||||
<< SegmentManager::Instance().Segment()->get_free_memory() << " bytes.";
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQShmPrototypeSink::Run()
|
|
||||||
{
|
|
||||||
static uint64_t numReceivedMsgs = 0;
|
|
||||||
|
|
||||||
thread rateLogger(&FairMQShmPrototypeSink::Log, this, 1000);
|
|
||||||
|
|
||||||
while (CheckCurrentState(RUNNING))
|
|
||||||
{
|
|
||||||
FairMQMessagePtr msg(NewMessage());
|
|
||||||
|
|
||||||
if (Receive(msg, "meta") > 0)
|
|
||||||
{
|
|
||||||
ExMetaHeader* hdr = static_cast<ExMetaHeader*>(msg->GetData());
|
|
||||||
size_t size = hdr->fSize;
|
|
||||||
bipc::managed_shared_memory::handle_t handle = hdr->fHandle;
|
|
||||||
void* ptr = SegmentManager::Instance().Segment()->get_address_from_handle(handle);
|
|
||||||
|
|
||||||
// LOG(info) << size;
|
|
||||||
// LOG(info) << handle;
|
|
||||||
// LOG(warn) << ptr;
|
|
||||||
|
|
||||||
fBytesInNew += size;
|
|
||||||
++fMsgInNew;
|
|
||||||
SegmentManager::Instance().Segment()->deallocate(ptr);
|
|
||||||
|
|
||||||
// get the shared pointer ID from the received message
|
|
||||||
// string ownerID(static_cast<char*>(msg->GetData()), msg->GetSize());
|
|
||||||
|
|
||||||
// find the shared pointer in shared memory with its ID
|
|
||||||
// ShPtrOwner* owner = SegmentManager::Instance().Segment()->find<ShPtrOwner>(ownerID.c_str()).first;
|
|
||||||
// LOG(debug) << "owner (" << ownerID << ") use count: " << owner->fPtr.use_count();
|
|
||||||
|
|
||||||
|
|
||||||
// if (owner)
|
|
||||||
// {
|
|
||||||
// // void* ptr = owner->fPtr->GetData();
|
|
||||||
|
|
||||||
// // LOG(debug) << "chunk handle: " << owner->fPtr->GetHandle();
|
|
||||||
// // LOG(debug) << "chunk size: " << owner->fPtr->GetSize();
|
|
||||||
|
|
||||||
// fBytesInNew += owner->fPtr->GetSize();
|
|
||||||
// ++fMsgInNew;
|
|
||||||
|
|
||||||
// // char* cptr = static_cast<char*>(ptr);
|
|
||||||
// // LOG(debug) << "check: " << cptr[3];
|
|
||||||
|
|
||||||
// SegmentManager::Instance().Segment()->deallocate(ptr);
|
|
||||||
|
|
||||||
// // SegmentManager::Instance().Segment()->destroy_ptr(owner);
|
|
||||||
// }
|
|
||||||
// else
|
|
||||||
// {
|
|
||||||
// LOG(warn) << "Shared pointer is zero.";
|
|
||||||
// }
|
|
||||||
|
|
||||||
|
|
||||||
++numReceivedMsgs;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG(info) << "Received " << numReceivedMsgs << " messages, leaving RUNNING state.";
|
|
||||||
|
|
||||||
rateLogger.join();
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQShmPrototypeSink::Log(const int intervalInMs)
|
|
||||||
{
|
|
||||||
chrono::time_point<chrono::high_resolution_clock> t0 = chrono::high_resolution_clock::now();
|
|
||||||
chrono::time_point<chrono::high_resolution_clock> t1;
|
|
||||||
unsigned long long msSinceLastLog;
|
|
||||||
|
|
||||||
double mbPerSecIn = 0;
|
|
||||||
double msgPerSecIn = 0;
|
|
||||||
|
|
||||||
while (CheckCurrentState(RUNNING))
|
|
||||||
{
|
|
||||||
t1 = chrono::high_resolution_clock::now();
|
|
||||||
|
|
||||||
msSinceLastLog = chrono::duration_cast<chrono::milliseconds>(t1 - t0).count();
|
|
||||||
|
|
||||||
mbPerSecIn = (static_cast<double>(fBytesInNew - fBytesIn) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
|
|
||||||
fBytesIn = fBytesInNew;
|
|
||||||
|
|
||||||
msgPerSecIn = static_cast<double>(fMsgInNew - fMsgIn) / static_cast<double>(msSinceLastLog) * 1000.;
|
|
||||||
fMsgIn = fMsgInNew;
|
|
||||||
|
|
||||||
LOG(debug) << fixed
|
|
||||||
<< setprecision(0) << "in: " << msgPerSecIn << " msg ("
|
|
||||||
<< setprecision(2) << mbPerSecIn << " MB)\t("
|
|
||||||
<< SegmentManager::Instance().Segment()->get_free_memory() / (1024. * 1024.) << " MB free)";
|
|
||||||
|
|
||||||
t0 = t1;
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(intervalInMs));
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,40 +0,0 @@
|
||||||
/********************************************************************************
|
|
||||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
|
||||||
* *
|
|
||||||
* This software is distributed under the terms of the *
|
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
|
||||||
* copied verbatim in the file "LICENSE" *
|
|
||||||
********************************************************************************/
|
|
||||||
/**
|
|
||||||
* FairMQShmPrototypeSink.h
|
|
||||||
*
|
|
||||||
* @since 2016-04-08
|
|
||||||
* @author A. Rybalchenko
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef FAIRMQSHMPROTOTYPESINK_H_
|
|
||||||
#define FAIRMQSHMPROTOTYPESINK_H_
|
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
|
|
||||||
#include "FairMQDevice.h"
|
|
||||||
|
|
||||||
class FairMQShmPrototypeSink : public FairMQDevice
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
FairMQShmPrototypeSink();
|
|
||||||
virtual ~FairMQShmPrototypeSink();
|
|
||||||
|
|
||||||
void Log(const int intervalInMs);
|
|
||||||
|
|
||||||
protected:
|
|
||||||
unsigned long long fBytesIn;
|
|
||||||
unsigned long long fMsgIn;
|
|
||||||
std::atomic<unsigned long long> fBytesInNew;
|
|
||||||
std::atomic<unsigned long long> fMsgInNew;
|
|
||||||
|
|
||||||
virtual void Init();
|
|
||||||
virtual void Run();
|
|
||||||
};
|
|
||||||
|
|
||||||
#endif /* FAIRMQSHMPROTOTYPESINK_H_ */
|
|
|
@ -1,182 +0,0 @@
|
||||||
/********************************************************************************
|
|
||||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
|
||||||
* *
|
|
||||||
* This software is distributed under the terms of the *
|
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
|
||||||
* copied verbatim in the file "LICENSE" *
|
|
||||||
********************************************************************************/
|
|
||||||
/**
|
|
||||||
* ShmChunk.h
|
|
||||||
*
|
|
||||||
* @since 2016-04-08
|
|
||||||
* @author A. Rybalchenko
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef SHMCHUNK_H_
|
|
||||||
#define SHMCHUNK_H_
|
|
||||||
|
|
||||||
#include <thread>
|
|
||||||
#include <chrono>
|
|
||||||
|
|
||||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
|
||||||
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
|
|
||||||
|
|
||||||
#include "FairMQLogger.h"
|
|
||||||
|
|
||||||
namespace bipc = boost::interprocess;
|
|
||||||
|
|
||||||
class SegmentManager
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
static SegmentManager& Instance()
|
|
||||||
{
|
|
||||||
static SegmentManager man;
|
|
||||||
return man;
|
|
||||||
}
|
|
||||||
|
|
||||||
void InitializeSegment(const std::string& op, const std::string& name, const size_t size = 0)
|
|
||||||
{
|
|
||||||
if (!fSegment)
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
if (op == "open_or_create")
|
|
||||||
{
|
|
||||||
fSegment = new bipc::managed_shared_memory(bipc::open_or_create, name.c_str(), size);
|
|
||||||
}
|
|
||||||
else if (op == "create_only")
|
|
||||||
{
|
|
||||||
fSegment = new bipc::managed_shared_memory(bipc::create_only, name.c_str(), size);
|
|
||||||
}
|
|
||||||
else if (op == "open_only")
|
|
||||||
{
|
|
||||||
int numTries = 0;
|
|
||||||
bool success = false;
|
|
||||||
|
|
||||||
do
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
fSegment = new bipc::managed_shared_memory(bipc::open_only, name.c_str());
|
|
||||||
success = true;
|
|
||||||
}
|
|
||||||
catch (bipc::interprocess_exception& ie)
|
|
||||||
{
|
|
||||||
if (++numTries == 5)
|
|
||||||
{
|
|
||||||
LOG(error) << "Could not open shared memory after " << numTries << " attempts, exiting!";
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG(debug) << "Could not open shared memory segment on try " << numTries << ". Retrying in 1 second...";
|
|
||||||
LOG(debug) << ie.what();
|
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
while (!success);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG(error) << "Unknown operation when initializing shared memory segment: " << op;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (std::exception& e)
|
|
||||||
{
|
|
||||||
LOG(error) << "Exception during shared memory segment initialization: " << e.what() << ", application will now exit";
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG(info) << "Segment already initialized";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bipc::managed_shared_memory* Segment() const
|
|
||||||
{
|
|
||||||
if (fSegment)
|
|
||||||
{
|
|
||||||
return fSegment;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG(error) << "Segment not initialized";
|
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
SegmentManager()
|
|
||||||
: fSegment(nullptr)
|
|
||||||
{}
|
|
||||||
|
|
||||||
bipc::managed_shared_memory* fSegment;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct alignas(16) ExMetaHeader
|
|
||||||
{
|
|
||||||
uint64_t fSize;
|
|
||||||
bipc::managed_shared_memory::handle_t fHandle;
|
|
||||||
};
|
|
||||||
|
|
||||||
// class ShmChunk
|
|
||||||
// {
|
|
||||||
// public:
|
|
||||||
// ShmChunk()
|
|
||||||
// : fHandle()
|
|
||||||
// , fSize(0)
|
|
||||||
// {
|
|
||||||
// }
|
|
||||||
|
|
||||||
// ShmChunk(const size_t size)
|
|
||||||
// : fHandle()
|
|
||||||
// , fSize(size)
|
|
||||||
// {
|
|
||||||
// void* ptr = SegmentManager::Instance().Segment()->allocate(size);
|
|
||||||
// fHandle = SegmentManager::Instance().Segment()->get_handle_from_address(ptr);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// ~ShmChunk()
|
|
||||||
// {
|
|
||||||
// SegmentManager::Instance().Segment()->deallocate(SegmentManager::Instance().Segment()->get_address_from_handle(fHandle));
|
|
||||||
// }
|
|
||||||
|
|
||||||
// bipc::managed_shared_memory::handle_t GetHandle() const
|
|
||||||
// {
|
|
||||||
// return fHandle;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// void* GetData() const
|
|
||||||
// {
|
|
||||||
// return SegmentManager::Instance().Segment()->get_address_from_handle(fHandle);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// size_t GetSize() const
|
|
||||||
// {
|
|
||||||
// return fSize;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// private:
|
|
||||||
// bipc::managed_shared_memory::handle_t fHandle;
|
|
||||||
// size_t fSize;
|
|
||||||
// };
|
|
||||||
|
|
||||||
// typedef bipc::managed_shared_ptr<ShmChunk, bipc::managed_shared_memory>::type ShPtrType;
|
|
||||||
|
|
||||||
// struct ShPtrOwner
|
|
||||||
// {
|
|
||||||
// ShPtrOwner(const ShPtrType& other)
|
|
||||||
// : fPtr(other)
|
|
||||||
// {}
|
|
||||||
|
|
||||||
// ShPtrOwner(const ShPtrOwner& other)
|
|
||||||
// : fPtr(other.fPtr)
|
|
||||||
// {}
|
|
||||||
|
|
||||||
// ShPtrType fPtr;
|
|
||||||
// };
|
|
||||||
|
|
||||||
#endif /* SHMCHUNK_H_ */
|
|
|
@ -1,24 +0,0 @@
|
||||||
/********************************************************************************
|
|
||||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
|
||||||
* *
|
|
||||||
* This software is distributed under the terms of the *
|
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
|
||||||
* copied verbatim in the file "LICENSE" *
|
|
||||||
********************************************************************************/
|
|
||||||
|
|
||||||
#include "runFairMQDevice.h"
|
|
||||||
#include "FairMQShmPrototypeSampler.h"
|
|
||||||
|
|
||||||
namespace bpo = boost::program_options;
|
|
||||||
|
|
||||||
void addCustomOptions(bpo::options_description& options)
|
|
||||||
{
|
|
||||||
options.add_options()
|
|
||||||
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
|
|
||||||
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
|
||||||
{
|
|
||||||
return new FairMQShmPrototypeSampler();
|
|
||||||
}
|
|
|
@ -1,21 +0,0 @@
|
||||||
/********************************************************************************
|
|
||||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
|
||||||
* *
|
|
||||||
* This software is distributed under the terms of the *
|
|
||||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
|
||||||
* copied verbatim in the file "LICENSE" *
|
|
||||||
********************************************************************************/
|
|
||||||
|
|
||||||
#include "runFairMQDevice.h"
|
|
||||||
#include "FairMQShmPrototypeSink.h"
|
|
||||||
|
|
||||||
namespace bpo = boost::program_options;
|
|
||||||
|
|
||||||
void addCustomOptions(bpo::options_description& /*options*/)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
|
||||||
{
|
|
||||||
return new FairMQShmPrototypeSink();
|
|
||||||
}
|
|
|
@ -1,34 +0,0 @@
|
||||||
{
|
|
||||||
"fairMQOptions": {
|
|
||||||
"devices": [
|
|
||||||
{
|
|
||||||
"id": "sampler1",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"name": "meta",
|
|
||||||
"type": "push",
|
|
||||||
"method": "bind",
|
|
||||||
"address": "tcp://127.0.0.1:5555",
|
|
||||||
"sndBufSize": 10,
|
|
||||||
"rcvBufSize": 10,
|
|
||||||
"rateLogging": 0
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"id": "sink1",
|
|
||||||
"channels": [
|
|
||||||
{
|
|
||||||
"name": "meta",
|
|
||||||
"type": "pull",
|
|
||||||
"method": "connect",
|
|
||||||
"address": "tcp://127.0.0.1:5555",
|
|
||||||
"sndBufSize": 10,
|
|
||||||
"rcvBufSize": 10,
|
|
||||||
"rateLogging": 0
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,49 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
|
|
||||||
msgSize="1000000"
|
|
||||||
transport="zeromq"
|
|
||||||
|
|
||||||
if [[ $1 =~ ^[0-9]+$ ]]; then
|
|
||||||
msgSize=$1
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "Starting shared memory example with message size of $msgSize bytes."
|
|
||||||
echo ""
|
|
||||||
echo "Usage: startShmPrototype [message size=1000000]"
|
|
||||||
|
|
||||||
SAMPLER="shm-prototype-sampler"
|
|
||||||
SAMPLER+=" --id sampler1"
|
|
||||||
SAMPLER+=" --transport $transport"
|
|
||||||
# SAMPLER+=" --severity TRACE"
|
|
||||||
SAMPLER+=" --msg-size $msgSize"
|
|
||||||
# SAMPLER+=" --msg-rate 1000"
|
|
||||||
SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
|
||||||
xterm -geometry 80x32+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SAMPLER &
|
|
||||||
|
|
||||||
SINK1="shm-prototype-sink"
|
|
||||||
SINK1+=" --id sink1"
|
|
||||||
SINK1+=" --transport $transport"
|
|
||||||
# SINK1+=" --severity TRACE"
|
|
||||||
SINK1+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
|
||||||
xterm -geometry 80x32+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK1 &
|
|
||||||
|
|
||||||
# SINK2="shm-prototype-sink"
|
|
||||||
# SINK2+=" --id sink2"
|
|
||||||
# SINK2+=" --transport $transport"
|
|
||||||
# # SINK2+=" --severity TRACE"
|
|
||||||
# SINK2+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
|
||||||
# xterm -geometry 80x32+500+500 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK2 &
|
|
||||||
|
|
||||||
# SINK3="shm-prototype-sink"
|
|
||||||
# SINK3+=" --id sink3"
|
|
||||||
# SINK3+=" --transport $transport"
|
|
||||||
# # SINK3+=" --severity TRACE"
|
|
||||||
# SINK3+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
|
||||||
# xterm -geometry 80x32+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK3 &
|
|
||||||
|
|
||||||
# SINK4="shm-prototype-sink"
|
|
||||||
# SINK4+=" --id sink4"
|
|
||||||
# SINK4+=" --transport $transport"
|
|
||||||
# # SINK4+=" --severity TRACE"
|
|
||||||
# SINK4+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/shm-prototype.json"
|
|
||||||
# xterm -geometry 80x32+1000+500 -hold -e @CMAKE_BINARY_DIR@/bin/prototype/shmem/$SINK4 &
|
|
Loading…
Reference in New Issue
Block a user