mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
11 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
99c8d33191 | ||
|
660420e4f3 | ||
|
f8f997abe6 | ||
|
40f6db430a | ||
|
2ed2177555 | ||
|
9b326c7a71 | ||
|
9b4c5deb0b | ||
|
7b16c33ccd | ||
|
7e6eb382d5 | ||
|
35399ee039 | ||
|
2cc1117637 |
@@ -7,13 +7,12 @@
|
||||
################################################################################
|
||||
|
||||
cmake_minimum_required(VERSION 3.10 FATAL_ERROR)
|
||||
|
||||
cmake_policy(VERSION 3.10...3.14)
|
||||
|
||||
# Project ######################################################################
|
||||
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
|
||||
include(FairMQLib)
|
||||
|
||||
set_fairmq_cmake_policies()
|
||||
get_git_version()
|
||||
|
||||
project(FairMQ VERSION ${PROJECT_VERSION} LANGUAGES CXX)
|
||||
@@ -328,4 +327,26 @@ else()
|
||||
endif()
|
||||
message(STATUS " ${BWhite}docs${CR} ${docs_summary}")
|
||||
message(STATUS " ")
|
||||
if(RUN_STATIC_ANALYSIS)
|
||||
list(LENGTH PROJECT_STATIC_ANALYSERS size)
|
||||
unset(analyser_list)
|
||||
set(count 0)
|
||||
foreach(analyser IN LISTS PROJECT_STATIC_ANALYSERS)
|
||||
if(${analyser}_FOUND)
|
||||
set(${analyser}_status "${analyser} ${BGreen}YES${CR}")
|
||||
else()
|
||||
set(${analyser}_status "${analyser} ${BRed}NO${CR}")
|
||||
endif()
|
||||
math(EXPR count "${count} + 1")
|
||||
string(APPEND analyser_list "${${analyser}_status}")
|
||||
if(count LESS size)
|
||||
string(APPEND analyser_list "${BWhite},${CR} ")
|
||||
endif()
|
||||
endforeach()
|
||||
set(static_ana_summary "${BWhite}(${CR}${analyser_list}${BWhite})${CR} (disable with ${BMagenta}-DRUN_STATIC_ANALYSIS=OFF${CR})")
|
||||
else()
|
||||
set(static_ana_summary "${BRed}OFF${CR} (default, enable with ${BMagenta}-DRUN_STATIC_ANALYSIS=ON${CR})")
|
||||
endif()
|
||||
message(STATUS " ${Cyan}RUN STATIC ANALYSIS ${static_ana_summary}")
|
||||
message(STATUS " ")
|
||||
################################################################################
|
||||
|
@@ -29,26 +29,6 @@ if(NOT WIN32 AND NOT DISABLE_COLOR)
|
||||
set(BWhite "${Esc}[1;37m")
|
||||
endif()
|
||||
|
||||
# set_fairmq_cmake_policies()
|
||||
#
|
||||
# Sets CMake policies.
|
||||
macro(set_fairmq_cmake_policies)
|
||||
# Find more details to each policy with cmake --help-policy CMPXXXX
|
||||
foreach(policy
|
||||
CMP0025 # Compiler id for Apple Clang is now AppleClang.
|
||||
CMP0028 # Double colon in target name means ALIAS or IMPORTED target.
|
||||
CMP0042 # MACOSX_RPATH is enabled by default.
|
||||
CMP0048 # The ``project()`` command manages VERSION variables.
|
||||
CMP0054 # Only interpret ``if()`` arguments as variables or keywords when unquoted.
|
||||
CMP0074 # ``find_package()`` uses ``<PackageName>_ROOT`` variables.
|
||||
)
|
||||
if(POLICY ${policy})
|
||||
cmake_policy(SET ${policy} NEW)
|
||||
endif()
|
||||
endforeach()
|
||||
endmacro()
|
||||
|
||||
|
||||
find_package(Git)
|
||||
# get_git_version([DEFAULT_VERSION version] [DEFAULT_DATE date] [OUTVAR_PREFIX prefix])
|
||||
#
|
||||
@@ -186,6 +166,34 @@ macro(set_fairmq_defaults)
|
||||
else()
|
||||
set(PROJECT_VERSION_HOTFIX ${PROJECT_VERSION_TWEAK})
|
||||
endif()
|
||||
|
||||
if(NOT DEFINED RUN_STATIC_ANALYSIS)
|
||||
set(RUN_STATIC_ANALYSIS OFF)
|
||||
endif()
|
||||
|
||||
unset(PROJECT_STATIC_ANALYSERS)
|
||||
if(RUN_STATIC_ANALYSIS)
|
||||
set(analyser "clang-tidy")
|
||||
find_program(${analyser}_FOUND "${analyser}")
|
||||
if(${analyser}_FOUND)
|
||||
set(CMAKE_CXX_CLANG_TIDY "${${analyser}_FOUND}" "-color")
|
||||
endif()
|
||||
list(APPEND PROJECT_STATIC_ANALYSERS "${analyser}")
|
||||
|
||||
set(analyser "iwyu")
|
||||
find_program(${analyser}_FOUND "${analyser}")
|
||||
if(${analyser}_FOUND)
|
||||
set(CMAKE_CXX_IWYU "${${analyser}_FOUND}")
|
||||
endif()
|
||||
list(APPEND PROJECT_STATIC_ANALYSERS "${analyser}")
|
||||
|
||||
set(analyser "cpplint")
|
||||
find_program(${analyser}_FOUND "${analyser}")
|
||||
if(${analyser}_FOUND)
|
||||
set(CMAKE_CXX_CPPLINT "${${analyser}_FOUND}")
|
||||
endif()
|
||||
list(APPEND PROJECT_STATIC_ANALYSERS "${analyser}")
|
||||
endif()
|
||||
endmacro()
|
||||
|
||||
function(join VALUES GLUE OUTPUT)
|
||||
|
@@ -24,7 +24,7 @@ FairMQ devices communicate via the communication patterns offered by ZeroMQ (or
|
||||
|
||||
Each FairMQ device has an internal state machine:
|
||||
|
||||

|
||||

|
||||
|
||||
The state machine can be querried and controlled via `GetCurrentStateName()` and `ChangeState("<state name>")` methods. Only legal state transitions are allowed (see image above). Illegal transitions will fail with an error.
|
||||
|
||||
|
Binary file not shown.
Before Width: | Height: | Size: 175 KiB |
1
docs/images/device_states.svg
Normal file
1
docs/images/device_states.svg
Normal file
File diff suppressed because one or more lines are too long
After Width: | Height: | Size: 170 KiB |
@@ -5,36 +5,42 @@
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIRMQEXAMPLEREGIONBUILDER_H
|
||||
#define FAIRMQEXAMPLEREGIONBUILDER_H
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
namespace example_region
|
||||
#include <string>
|
||||
|
||||
namespace example_readout
|
||||
{
|
||||
|
||||
class Builder : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
Builder() {
|
||||
OnData("data1", &Builder::HandleData);
|
||||
}
|
||||
virtual ~Builder() {}
|
||||
Builder()
|
||||
: fOutputChannelName()
|
||||
{}
|
||||
|
||||
void Init() override
|
||||
{
|
||||
fOutputChannelName = fConfig->GetValue<std::string>("output-name");
|
||||
OnData("rb", &Builder::HandleData);
|
||||
}
|
||||
|
||||
protected:
|
||||
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
|
||||
{
|
||||
if (Send(msg, "data2") < 0) {
|
||||
if (Send(msg, fOutputChannelName) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
std::string fOutputChannelName;
|
||||
};
|
||||
|
||||
} // namespace example_region
|
||||
} // namespace example_readout
|
||||
|
||||
#endif /* FAIRMQEXAMPLEREGIONBUILDER_H */
|
||||
|
@@ -6,37 +6,35 @@
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
add_library(ExampleReadoutLib STATIC
|
||||
"Sampler.cxx"
|
||||
"Sampler.h"
|
||||
"Builder.h"
|
||||
"Sink.cxx"
|
||||
"Sink.h"
|
||||
)
|
||||
|
||||
target_link_libraries(ExampleReadoutLib PUBLIC FairMQ)
|
||||
|
||||
add_executable(fairmq-ex-readout-sampler runSampler.cxx)
|
||||
target_link_libraries(fairmq-ex-readout-sampler PRIVATE ExampleReadoutLib)
|
||||
add_executable(fairmq-ex-readout-readout runReadout.cxx)
|
||||
target_link_libraries(fairmq-ex-readout-readout PRIVATE FairMQ)
|
||||
|
||||
add_executable(fairmq-ex-readout-builder runBuilder.cxx)
|
||||
target_link_libraries(fairmq-ex-readout-builder PRIVATE ExampleReadoutLib)
|
||||
target_link_libraries(fairmq-ex-readout-builder PRIVATE FairMQ)
|
||||
|
||||
add_executable(fairmq-ex-readout-sink runSink.cxx)
|
||||
target_link_libraries(fairmq-ex-readout-sink PRIVATE ExampleReadoutLib)
|
||||
add_executable(fairmq-ex-readout-processor runProcessor.cxx)
|
||||
target_link_libraries(fairmq-ex-readout-processor PRIVATE FairMQ)
|
||||
|
||||
add_custom_target(Examplereadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-sink)
|
||||
add_executable(fairmq-ex-readout-sender runSender.cxx)
|
||||
target_link_libraries(fairmq-ex-readout-sender PRIVATE FairMQ)
|
||||
|
||||
add_executable(fairmq-ex-readout-receiver runReceiver.cxx)
|
||||
target_link_libraries(fairmq-ex-readout-receiver PRIVATE FairMQ)
|
||||
|
||||
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
|
||||
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh)
|
||||
|
||||
# install
|
||||
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-readout-sampler
|
||||
fairmq-ex-readout-sink
|
||||
fairmq-ex-readout-readout
|
||||
fairmq-ex-readout-builder
|
||||
fairmq-ex-readout-processor
|
||||
fairmq-ex-readout-sender
|
||||
fairmq-ex-readout-receiver
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
RUNTIME DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
@@ -46,9 +44,16 @@ install(
|
||||
set(EX_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR})
|
||||
set(FAIRMQ_BIN_DIR ${CMAKE_INSTALL_PREFIX}/${PROJECT_INSTALL_BINDIR}/fairmq)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout-processing.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-readout.sh
|
||||
)
|
||||
|
||||
install(
|
||||
PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-readout-processing.sh_install
|
||||
DESTINATION ${PROJECT_INSTALL_BINDIR}
|
||||
RENAME fairmq-start-ex-readout-processing.sh
|
||||
)
|
||||
|
@@ -5,38 +5,33 @@
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* Sink.h
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQEXAMPLEREGIONSINK_H
|
||||
#define FAIRMQEXAMPLEREGIONSINK_H
|
||||
|
||||
#include <string>
|
||||
#ifndef FAIRMQEXAMPLEREGIONPROCESSOR_H
|
||||
#define FAIRMQEXAMPLEREGIONPROCESSOR_H
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
namespace example_region
|
||||
namespace example_readout
|
||||
{
|
||||
|
||||
class Sink : public FairMQDevice
|
||||
class Processor : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
Sink();
|
||||
virtual ~Sink();
|
||||
Processor() {
|
||||
OnData("bp", &Processor::HandleData);
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void Run();
|
||||
virtual void InitTask();
|
||||
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
|
||||
{
|
||||
FairMQMessagePtr msg2(NewMessageFor("ps", 0, msg->GetSize()));
|
||||
if (Send(msg2, "ps") < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
private:
|
||||
uint64_t fMaxIterations;
|
||||
uint64_t fNumIterations;
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace example_region
|
||||
} // namespace example_readout
|
||||
|
||||
#endif /* FAIRMQEXAMPLEREGIONSINK_H */
|
||||
#endif /* FAIRMQEXAMPLEREGIONPROCESSOR_H */
|
@@ -1,5 +1,27 @@
|
||||
Region example
|
||||
==============
|
||||
# Readout example
|
||||
|
||||
This example demonstrates the use of a more advanced feature - UnmanagedRegion, that can be used to create a buffer through one of FairMQ transports. The contents of this buffer are managed by the user, who can also create messages out of sub-buffers of the created buffer. Such feature can be interesting in environments that have special requirements by the hardware that writes the data, to keep the transfer efficient (e.g. shared memory).
|
||||
This examples shows two possible topologies (out of many) for a node connected to a detector readout (followed by a processing node).
|
||||
|
||||
## Setup without new data generation
|
||||
|
||||
```
|
||||
|------------------------------- Readout Node ---------------------------| |- Processing Node -|
|
||||
| Readout --> Builder --> Sender | --> | Receiver |
|
||||
| [# shared memory segment (unused in this topology) ##################] | ofi | |
|
||||
| [# shmem unmanaged region (readout writes here, others read) ########] | | |
|
||||
|------------------------------------------------------------------------| |-------------------|
|
||||
```
|
||||
|
||||
The devices one the Readout Node communicate via shared memory transport. Readout device writes into shared memory unmanaged region. The data is then forwarded through Builder to Sender process, which sends it out via OFI transport.
|
||||
|
||||
## Setup with generating new data on the Readout node
|
||||
|
||||
```
|
||||
|------------------------------- Readout Node ---------------------------| |- Processing Node -|
|
||||
| Readout --> Builder --> Processor --> Sender | --> | Receiver |
|
||||
| [# shared memory segment (used between Proccessor and Sender) #######] | ofi | |
|
||||
| [# shmem unmanaged region (readout writes here, builder & proc read) ] | | |
|
||||
|------------------------------------------------------------------------| |-------------------|
|
||||
```
|
||||
|
||||
In this topology one more device is added - Processor. It examines the arriving data and creates new data in shared memory. This data is not part of the unmanaged region, but lives in the general shared memory segment (unused in the previous setup). This new data is then forwarded to Sender and the Readout device is notified that the corresponding data piece in the unmanaged region is no longer used.
|
||||
|
91
examples/readout/Readout.h
Normal file
91
examples/readout/Readout.h
Normal file
@@ -0,0 +1,91 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#ifndef FAIRMQEXAMPLEREADOUTREADOUT_H
|
||||
#define FAIRMQEXAMPLEREADOUTREADOUT_H
|
||||
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
namespace example_readout
|
||||
{
|
||||
|
||||
class Readout : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
Readout()
|
||||
: fMsgSize(10000)
|
||||
, fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
, fRegion(nullptr)
|
||||
, fNumUnackedMsgs(0)
|
||||
{}
|
||||
|
||||
protected:
|
||||
void InitTask() override
|
||||
{
|
||||
fMsgSize = fConfig->GetValue<int>("msg-size");
|
||||
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("rb",
|
||||
0,
|
||||
10000000,
|
||||
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
|
||||
--fNumUnackedMsgs;
|
||||
if (fMaxIterations > 0) {
|
||||
LOG(debug) << "Received ack";
|
||||
}
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
bool ConditionalRun() override
|
||||
{
|
||||
FairMQMessagePtr msg(NewMessageFor("rb", // channel
|
||||
0, // sub-channel
|
||||
fRegion, // region
|
||||
fRegion->GetData(), // ptr within region
|
||||
fMsgSize, // offset from ptr
|
||||
nullptr // hint
|
||||
));
|
||||
|
||||
if (Send(msg, "rb", 0) > 0) {
|
||||
++fNumUnackedMsgs;
|
||||
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
void ResetTask() override
|
||||
{
|
||||
// if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead.
|
||||
if (fNumUnackedMsgs != 0) {
|
||||
LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")";
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
LOG(debug) << "done, still unacked: " << fNumUnackedMsgs;
|
||||
}
|
||||
fRegion.reset();
|
||||
}
|
||||
|
||||
private:
|
||||
int fMsgSize;
|
||||
uint64_t fMaxIterations;
|
||||
uint64_t fNumIterations;
|
||||
FairMQUnmanagedRegionPtr fRegion;
|
||||
std::atomic<uint64_t> fNumUnackedMsgs;
|
||||
};
|
||||
|
||||
} // namespace example_readout
|
||||
|
||||
#endif /* FAIRMQEXAMPLEREADOUTREADOUT_H */
|
54
examples/readout/Receiver.h
Normal file
54
examples/readout/Receiver.h
Normal file
@@ -0,0 +1,54 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
#ifndef FAIRMQEXAMPLEREGIONRECEIVER_H
|
||||
#define FAIRMQEXAMPLEREGIONRECEIVER_H
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
namespace example_readout
|
||||
{
|
||||
|
||||
class Receiver : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
Receiver()
|
||||
: fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
{}
|
||||
|
||||
protected:
|
||||
void InitTask() override
|
||||
{
|
||||
// Get the fMaxIterations value from the command line options (via fConfig)
|
||||
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||
}
|
||||
|
||||
void Run() override
|
||||
{
|
||||
FairMQChannel& dataInChannel = fChannels.at("sr").at(0);
|
||||
|
||||
while (!NewStatePending()) {
|
||||
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
|
||||
dataInChannel.Receive(msg);
|
||||
// void* ptr = msg->GetData();
|
||||
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
uint64_t fMaxIterations;
|
||||
uint64_t fNumIterations;
|
||||
};
|
||||
|
||||
} // namespace example_readout
|
||||
|
||||
#endif /* FAIRMQEXAMPLEREGIONRECEIVER_H */
|
@@ -1,91 +0,0 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* Sampler.cpp
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include "Sampler.h"
|
||||
|
||||
#include <thread>
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace example_region
|
||||
{
|
||||
|
||||
Sampler::Sampler()
|
||||
: fMsgSize(10000)
|
||||
, fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
, fRegion(nullptr)
|
||||
, fNumUnackedMsgs(0)
|
||||
{
|
||||
}
|
||||
|
||||
void Sampler::InitTask()
|
||||
{
|
||||
fMsgSize = fConfig->GetValue<int>("msg-size");
|
||||
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1",
|
||||
0,
|
||||
10000000,
|
||||
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
|
||||
--fNumUnackedMsgs;
|
||||
if (fMaxIterations > 0)
|
||||
{
|
||||
LOG(debug) << "Received ack";
|
||||
}
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
bool Sampler::ConditionalRun()
|
||||
{
|
||||
FairMQMessagePtr msg(NewMessageFor("data1", // channel
|
||||
0, // sub-channel
|
||||
fRegion, // region
|
||||
fRegion->GetData(), // ptr within region
|
||||
fMsgSize, // offset from ptr
|
||||
nullptr // hint
|
||||
));
|
||||
|
||||
if (Send(msg, "data1", 0) > 0)
|
||||
{
|
||||
++fNumUnackedMsgs;
|
||||
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
|
||||
{
|
||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Sampler::ResetTask()
|
||||
{
|
||||
// if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead.
|
||||
if (fNumUnackedMsgs != 0)
|
||||
{
|
||||
LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")";
|
||||
this_thread::sleep_for(chrono::milliseconds(500));
|
||||
LOG(debug) << "done, still unacked: " << fNumUnackedMsgs;
|
||||
}
|
||||
fRegion.reset();
|
||||
}
|
||||
|
||||
Sampler::~Sampler()
|
||||
{
|
||||
}
|
||||
|
||||
} // namespace example_region
|
@@ -5,42 +5,42 @@
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* Sampler.h
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQEXAMPLEREGIONSAMPLER_H
|
||||
#define FAIRMQEXAMPLEREGIONSAMPLER_H
|
||||
|
||||
#include <atomic>
|
||||
#ifndef FAIRMQEXAMPLEREGIONSENDER_H
|
||||
#define FAIRMQEXAMPLEREGIONSENDER_H
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
namespace example_region
|
||||
#include <string>
|
||||
|
||||
namespace example_readout
|
||||
{
|
||||
|
||||
class Sampler : public FairMQDevice
|
||||
class Sender : public FairMQDevice
|
||||
{
|
||||
public:
|
||||
Sampler();
|
||||
virtual ~Sampler();
|
||||
Sender()
|
||||
: fInputChannelName()
|
||||
{}
|
||||
|
||||
protected:
|
||||
virtual void InitTask();
|
||||
virtual bool ConditionalRun();
|
||||
virtual void ResetTask();
|
||||
void Init() override
|
||||
{
|
||||
fInputChannelName = fConfig->GetValue<std::string>("input-name");
|
||||
OnData(fInputChannelName, &Sender::HandleData);
|
||||
}
|
||||
|
||||
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
|
||||
{
|
||||
if (Send(msg, "sr") < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
int fMsgSize;
|
||||
uint64_t fMaxIterations;
|
||||
uint64_t fNumIterations;
|
||||
FairMQUnmanagedRegionPtr fRegion;
|
||||
std::atomic<uint64_t> fNumUnackedMsgs;
|
||||
std::string fInputChannelName;
|
||||
};
|
||||
|
||||
} // namespace example_region
|
||||
} // namespace example_readout
|
||||
|
||||
#endif /* FAIRMQEXAMPLEREGIONSAMPLER_H */
|
||||
#endif /* FAIRMQEXAMPLEREGIONSENDER_H */
|
@@ -1,56 +0,0 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* Sink.cxx
|
||||
*
|
||||
* @since 2014-10-10
|
||||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include "Sink.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace example_region
|
||||
{
|
||||
|
||||
Sink::Sink()
|
||||
: fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
{
|
||||
}
|
||||
|
||||
void Sink::InitTask()
|
||||
{
|
||||
// Get the fMaxIterations value from the command line options (via fConfig)
|
||||
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||
}
|
||||
|
||||
void Sink::Run()
|
||||
{
|
||||
FairMQChannel& dataInChannel = fChannels.at("data").at(0);
|
||||
|
||||
while (!NewStatePending())
|
||||
{
|
||||
FairMQMessagePtr msg(dataInChannel.Transport()->CreateMessage());
|
||||
dataInChannel.Receive(msg);
|
||||
// void* ptr = msg->GetData();
|
||||
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
|
||||
{
|
||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Sink::~Sink()
|
||||
{
|
||||
}
|
||||
|
||||
} // namespace example_region
|
40
examples/readout/fairmq-start-ex-readout-processing.sh.in
Executable file
40
examples/readout/fairmq-start-ex-readout-processing.sh.in
Executable file
@@ -0,0 +1,40 @@
|
||||
#!/bin/bash
|
||||
|
||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||
|
||||
msgSize="1000000"
|
||||
|
||||
if [[ $1 =~ ^[0-9]+$ ]]; then
|
||||
msgSize=$1
|
||||
fi
|
||||
|
||||
READOUT="fairmq-ex-readout-readout"
|
||||
READOUT+=" --id readout1"
|
||||
READOUT+=" --msg-size $msgSize"
|
||||
READOUT+=" --channel-config name=rb,type=pair,method=bind,address=tcp://localhost:7777,transport=shmem"
|
||||
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$READOUT &
|
||||
|
||||
BUILDER="fairmq-ex-readout-builder"
|
||||
BUILDER+=" --id builder1"
|
||||
BUILDER+=" --output-name bp"
|
||||
BUILDER+=" --channel-config name=rb,type=pair,method=connect,address=tcp://localhost:7777,transport=shmem"
|
||||
BUILDER+=" name=bp,type=pair,method=connect,address=tcp://localhost:7778,transport=shmem"
|
||||
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER &
|
||||
|
||||
PROCESSOR="fairmq-ex-readout-processor"
|
||||
PROCESSOR+=" --id processor1"
|
||||
PROCESSOR+=" --channel-config name=bp,type=pair,method=bind,address=tcp://localhost:7778,transport=shmem"
|
||||
PROCESSOR+=" name=ps,type=pair,method=connect,address=tcp://localhost:7779,transport=shmem"
|
||||
xterm -geometry 80x23+750+500 -hold -e @EX_BIN_DIR@/$PROCESSOR &
|
||||
|
||||
SENDER="fairmq-ex-readout-sender"
|
||||
SENDER+=" --id sender1"
|
||||
SENDER+=" --input-name ps"
|
||||
SENDER+=" --channel-config name=ps,type=pair,method=bind,address=tcp://localhost:7779,transport=shmem"
|
||||
SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7780,transport=ofi"
|
||||
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER &
|
||||
|
||||
RECEIVER="fairmq-ex-readout-receiver"
|
||||
RECEIVER+=" --id receiver1"
|
||||
RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7780,transport=ofi"
|
||||
xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER &
|
@@ -8,23 +8,27 @@ if [[ $1 =~ ^[0-9]+$ ]]; then
|
||||
msgSize=$1
|
||||
fi
|
||||
|
||||
SAMPLER="fairmq-ex-readout-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --severity debug"
|
||||
SAMPLER+=" --msg-size $msgSize"
|
||||
# SAMPLER+=" --rate 10"
|
||||
SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem"
|
||||
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||
READOUT="fairmq-ex-readout-readout"
|
||||
READOUT+=" --id readout1"
|
||||
READOUT+=" --msg-size $msgSize"
|
||||
READOUT+=" --channel-config name=rb,type=pair,method=bind,address=tcp://localhost:7777,transport=shmem"
|
||||
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$READOUT &
|
||||
|
||||
BUILDER="fairmq-ex-readout-builder"
|
||||
BUILDER+=" --id builder1"
|
||||
BUILDER+=" --severity debug"
|
||||
BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem"
|
||||
BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi"
|
||||
BUILDER+=" --output-name bs"
|
||||
BUILDER+=" --channel-config name=rb,type=pair,method=connect,address=tcp://localhost:7777,transport=shmem"
|
||||
BUILDER+=" name=bs,type=pair,method=connect,address=tcp://localhost:7778,transport=shmem"
|
||||
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER &
|
||||
|
||||
SINK="fairmq-ex-readout-sink"
|
||||
SINK+=" --id sink1"
|
||||
SINK+=" --severity debug"
|
||||
SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi"
|
||||
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK &
|
||||
SENDER="fairmq-ex-readout-sender"
|
||||
SENDER+=" --id sender1"
|
||||
SENDER+=" --input-name bs"
|
||||
SENDER+=" --channel-config name=bs,type=pair,method=bind,address=tcp://localhost:7778,transport=shmem"
|
||||
SENDER+=" name=sr,type=pair,method=connect,address=tcp://localhost:7779,transport=ofi"
|
||||
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SENDER &
|
||||
|
||||
RECEIVER="fairmq-ex-readout-receiver"
|
||||
RECEIVER+=" --id receiver1"
|
||||
RECEIVER+=" --channel-config name=sr,type=pair,method=bind,address=tcp://localhost:7779,transport=ofi"
|
||||
xterm -geometry 80x23+1500+0 -hold -e @EX_BIN_DIR@/$RECEIVER &
|
||||
|
@@ -11,10 +11,13 @@
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
void addCustomOptions(bpo::options_description& /* options */)
|
||||
{}
|
||||
void addCustomOptions(bpo::options_description& options)
|
||||
{
|
||||
options.add_options()
|
||||
("output-name", bpo::value<std::string>()->default_value("bs"), "Output channel name");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||
{
|
||||
return new example_region::Builder();
|
||||
return new example_readout::Builder();
|
||||
}
|
||||
|
20
examples/readout/runProcessor.cxx
Normal file
20
examples/readout/runProcessor.cxx
Normal file
@@ -0,0 +1,20 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "runFairMQDevice.h"
|
||||
#include "Processor.h"
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
void addCustomOptions(bpo::options_description& /* options */)
|
||||
{}
|
||||
|
||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||
{
|
||||
return new example_readout::Processor();
|
||||
}
|
@@ -7,7 +7,7 @@
|
||||
********************************************************************************/
|
||||
|
||||
#include "runFairMQDevice.h"
|
||||
#include "Sampler.h"
|
||||
#include "Readout.h"
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -20,5 +20,5 @@ void addCustomOptions(bpo::options_description& options)
|
||||
|
||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||
{
|
||||
return new example_region::Sampler();
|
||||
return new example_readout::Readout();
|
||||
}
|
@@ -7,7 +7,7 @@
|
||||
********************************************************************************/
|
||||
|
||||
#include "runFairMQDevice.h"
|
||||
#include "Sink.h"
|
||||
#include "Receiver.h"
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -19,5 +19,5 @@ void addCustomOptions(bpo::options_description& options)
|
||||
|
||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||
{
|
||||
return new example_region::Sink();
|
||||
return new example_readout::Receiver();
|
||||
}
|
23
examples/readout/runSender.cxx
Normal file
23
examples/readout/runSender.cxx
Normal file
@@ -0,0 +1,23 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "runFairMQDevice.h"
|
||||
#include "Sender.h"
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
void addCustomOptions(bpo::options_description& options)
|
||||
{
|
||||
options.add_options()
|
||||
("input-name", bpo::value<std::string>()->default_value("bs"), "Input channel name");
|
||||
}
|
||||
|
||||
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
|
||||
{
|
||||
return new example_readout::Sender();
|
||||
}
|
@@ -26,33 +26,34 @@
|
||||
#include <algorithm> // std::max
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq;
|
||||
|
||||
static map<fair::mq::Transition, fair::mq::State> backwardsCompatibilityWaitForEndOfStateHelper =
|
||||
static map<Transition, State> backwardsCompatibilityWaitForEndOfStateHelper =
|
||||
{
|
||||
{ fair::mq::Transition::InitDevice, fair::mq::State::InitializingDevice },
|
||||
{ fair::mq::Transition::CompleteInit, fair::mq::State::Initialized },
|
||||
{ fair::mq::Transition::Bind, fair::mq::State::Bound },
|
||||
{ fair::mq::Transition::Connect, fair::mq::State::DeviceReady },
|
||||
{ fair::mq::Transition::InitTask, fair::mq::State::Ready },
|
||||
{ fair::mq::Transition::Run, fair::mq::State::Ready },
|
||||
{ fair::mq::Transition::Stop, fair::mq::State::Ready },
|
||||
{ fair::mq::Transition::ResetTask, fair::mq::State::DeviceReady },
|
||||
{ fair::mq::Transition::ResetDevice, fair::mq::State::Idle }
|
||||
{ Transition::InitDevice, State::InitializingDevice },
|
||||
{ Transition::CompleteInit, State::Initialized },
|
||||
{ Transition::Bind, State::Bound },
|
||||
{ Transition::Connect, State::DeviceReady },
|
||||
{ Transition::InitTask, State::Ready },
|
||||
{ Transition::Run, State::Ready },
|
||||
{ Transition::Stop, State::Ready },
|
||||
{ Transition::ResetTask, State::DeviceReady },
|
||||
{ Transition::ResetDevice, State::Idle }
|
||||
};
|
||||
|
||||
static map<int, fair::mq::Transition> backwardsCompatibilityChangeStateHelper =
|
||||
static map<int, Transition> backwardsCompatibilityChangeStateHelper =
|
||||
{
|
||||
{ FairMQDevice::Event::INIT_DEVICE, fair::mq::Transition::InitDevice },
|
||||
{ FairMQDevice::Event::internal_DEVICE_READY, fair::mq::Transition::Auto },
|
||||
{ FairMQDevice::Event::INIT_TASK, fair::mq::Transition::InitTask },
|
||||
{ FairMQDevice::Event::internal_READY, fair::mq::Transition::Auto },
|
||||
{ FairMQDevice::Event::RUN, fair::mq::Transition::Run },
|
||||
{ FairMQDevice::Event::STOP, fair::mq::Transition::Stop },
|
||||
{ FairMQDevice::Event::RESET_TASK, fair::mq::Transition::ResetTask },
|
||||
{ FairMQDevice::Event::RESET_DEVICE, fair::mq::Transition::ResetDevice },
|
||||
{ FairMQDevice::Event::internal_IDLE, fair::mq::Transition::Auto },
|
||||
{ FairMQDevice::Event::END, fair::mq::Transition::End },
|
||||
{ FairMQDevice::Event::ERROR_FOUND, fair::mq::Transition::ErrorFound }
|
||||
{ FairMQDevice::Event::INIT_DEVICE, Transition::InitDevice },
|
||||
{ FairMQDevice::Event::internal_DEVICE_READY, Transition::Auto },
|
||||
{ FairMQDevice::Event::INIT_TASK, Transition::InitTask },
|
||||
{ FairMQDevice::Event::internal_READY, Transition::Auto },
|
||||
{ FairMQDevice::Event::RUN, Transition::Run },
|
||||
{ FairMQDevice::Event::STOP, Transition::Stop },
|
||||
{ FairMQDevice::Event::RESET_TASK, Transition::ResetTask },
|
||||
{ FairMQDevice::Event::RESET_DEVICE, Transition::ResetDevice },
|
||||
{ FairMQDevice::Event::internal_IDLE, Transition::Auto },
|
||||
{ FairMQDevice::Event::END, Transition::End },
|
||||
{ FairMQDevice::Event::ERROR_FOUND, Transition::ErrorFound }
|
||||
};
|
||||
|
||||
FairMQDevice::FairMQDevice()
|
||||
@@ -65,21 +66,21 @@ FairMQDevice::FairMQDevice(FairMQProgOptions& config)
|
||||
{
|
||||
}
|
||||
|
||||
FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
|
||||
FairMQDevice::FairMQDevice(const tools::Version version)
|
||||
: FairMQDevice(nullptr, version)
|
||||
{
|
||||
}
|
||||
|
||||
FairMQDevice::FairMQDevice(FairMQProgOptions& config, const fair::mq::tools::Version version)
|
||||
FairMQDevice::FairMQDevice(FairMQProgOptions& config, const tools::Version version)
|
||||
: FairMQDevice(&config, version)
|
||||
{
|
||||
}
|
||||
|
||||
FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Version version)
|
||||
FairMQDevice::FairMQDevice(FairMQProgOptions* config, const tools::Version version)
|
||||
: fTransportFactory(nullptr)
|
||||
, fTransports()
|
||||
, fChannels()
|
||||
, fInternalConfig(config ? nullptr : fair::mq::tools::make_unique<FairMQProgOptions>())
|
||||
, fInternalConfig(config ? nullptr : tools::make_unique<FairMQProgOptions>())
|
||||
, fConfig(config ? config : fInternalConfig.get())
|
||||
, fId()
|
||||
, fDefaultTransportType(fair::mq::Transport::ZMQ)
|
||||
@@ -99,11 +100,11 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
|
||||
, fMaxRunRuntimeInS(0)
|
||||
, fRawCmdLineArgs()
|
||||
{
|
||||
SubscribeToNewTransition("device", [&](fair::mq::Transition transition) {
|
||||
SubscribeToNewTransition("device", [&](Transition transition) {
|
||||
LOG(trace) << "device notified on new transition: " << transition;
|
||||
|
||||
switch (transition) {
|
||||
case fair::mq::Transition::Stop:
|
||||
case Transition::Stop:
|
||||
UnblockTransports();
|
||||
break;
|
||||
default:
|
||||
@@ -118,7 +119,7 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
|
||||
lock_guard<mutex> lock(fStatesMtx);
|
||||
fStates.push(state);
|
||||
}
|
||||
fStatesCV.notify_one();
|
||||
fStatesCV.notify_all();
|
||||
|
||||
switch (state) {
|
||||
case fair::mq::State::InitializingDevice:
|
||||
@@ -182,7 +183,7 @@ bool FairMQDevice::ChangeState(const int transition)
|
||||
return ChangeState(backwardsCompatibilityChangeStateHelper.at(transition));
|
||||
}
|
||||
|
||||
void FairMQDevice::WaitForEndOfState(fair::mq::Transition transition)
|
||||
void FairMQDevice::WaitForEndOfState(Transition transition)
|
||||
{
|
||||
WaitForState(backwardsCompatibilityWaitForEndOfStateHelper.at(transition));
|
||||
}
|
||||
@@ -226,7 +227,7 @@ void FairMQDevice::InitWrapper()
|
||||
int subChannelIndex = 0;
|
||||
for (auto& vi : mi.second) {
|
||||
// set channel name: name + vector index
|
||||
vi.fName = fair::mq::tools::ToString(mi.first, "[", subChannelIndex, "]");
|
||||
vi.fName = tools::ToString(mi.first, "[", subChannelIndex, "]");
|
||||
|
||||
// set channel transport
|
||||
LOG(debug) << "Initializing transport for channel " << vi.fName << ": " << fair::mq::TransportNames.at(vi.fTransportType);
|
||||
@@ -237,9 +238,9 @@ void FairMQDevice::InitWrapper()
|
||||
if (vi.fAddress == "unspecified" || vi.fAddress == "") {
|
||||
// if the configured network interface is default, get its name from the default route
|
||||
if (networkInterface == "default") {
|
||||
networkInterface = fair::mq::tools::getDefaultRouteNetworkInterface();
|
||||
networkInterface = tools::getDefaultRouteNetworkInterface();
|
||||
}
|
||||
vi.fAddress = "tcp://" + fair::mq::tools::getInterfaceIP(networkInterface) + ":1";
|
||||
vi.fAddress = "tcp://" + tools::getInterfaceIP(networkInterface) + ":1";
|
||||
}
|
||||
// fill the uninitialized list
|
||||
fUninitializedBindingChannels.push_back(&vi);
|
||||
@@ -251,14 +252,14 @@ void FairMQDevice::InitWrapper()
|
||||
fUninitializedConnectingChannels.push_back(&vi);
|
||||
} else {
|
||||
LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi.fName << "' not specified.";
|
||||
throw runtime_error(fair::mq::tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified."));
|
||||
throw runtime_error(tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi.fName, " not specified."));
|
||||
}
|
||||
|
||||
subChannelIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
// ChangeState(fair::mq::Transition::Auto);
|
||||
// ChangeState(Transition::Auto);
|
||||
}
|
||||
|
||||
void FairMQDevice::BindWrapper()
|
||||
@@ -269,12 +270,12 @@ void FairMQDevice::BindWrapper()
|
||||
|
||||
if (!fUninitializedBindingChannels.empty()) {
|
||||
LOG(error) << fUninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
|
||||
throw runtime_error(fair::mq::tools::ToString(fUninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
|
||||
throw runtime_error(tools::ToString(fUninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
|
||||
}
|
||||
|
||||
Bind();
|
||||
|
||||
ChangeState(fair::mq::Transition::Auto);
|
||||
ChangeState(Transition::Auto);
|
||||
}
|
||||
|
||||
void FairMQDevice::ConnectWrapper()
|
||||
@@ -301,7 +302,7 @@ void FairMQDevice::ConnectWrapper()
|
||||
|
||||
if (numAttempts++ > maxAttempts) {
|
||||
LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts";
|
||||
throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
|
||||
throw runtime_error(tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
|
||||
}
|
||||
|
||||
AttachChannels(fUninitializedConnectingChannels);
|
||||
@@ -313,7 +314,7 @@ void FairMQDevice::ConnectWrapper()
|
||||
|
||||
Connect();
|
||||
|
||||
ChangeState(fair::mq::Transition::Auto);
|
||||
ChangeState(Transition::Auto);
|
||||
}
|
||||
|
||||
void FairMQDevice::AttachChannels(vector<FairMQChannel*>& chans)
|
||||
@@ -366,7 +367,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& chan)
|
||||
string hostPart = addressString.substr(0, pos);
|
||||
if (!(bind && hostPart == "*")) {
|
||||
string portPart = addressString.substr(pos + 1);
|
||||
string resolvedHost = fair::mq::tools::getIpFromHostname(hostPart);
|
||||
string resolvedHost = tools::getIpFromHostname(hostPart);
|
||||
if (resolvedHost == "") {
|
||||
return false;
|
||||
}
|
||||
@@ -414,7 +415,7 @@ void FairMQDevice::InitTaskWrapper()
|
||||
{
|
||||
InitTask();
|
||||
|
||||
ChangeState(fair::mq::Transition::Auto);
|
||||
ChangeState(Transition::Auto);
|
||||
}
|
||||
|
||||
bool FairMQDevice::SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs)
|
||||
@@ -433,7 +434,7 @@ void FairMQDevice::SortChannel(const string& name, const bool reindex)
|
||||
for (auto vi = fChannels.at(name).begin(); vi != fChannels.at(name).end(); ++vi)
|
||||
{
|
||||
// set channel name: name + vector index
|
||||
vi->fName = fair::mq::tools::ToString(name, "[", vi - fChannels.at(name).begin(), "]");
|
||||
vi->fName = tools::ToString(name, "[", vi - fChannels.at(name).begin(), "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -467,7 +468,7 @@ void FairMQDevice::RunWrapper()
|
||||
HandleMultipleChannelInput();
|
||||
}
|
||||
} else {
|
||||
fair::mq::tools::RateLimiter rateLimiter(fRate);
|
||||
tools::RateLimiter rateLimiter(fRate);
|
||||
|
||||
while (!NewStatePending() && ConditionalRun()) {
|
||||
if (fRate > 0.001) {
|
||||
@@ -481,7 +482,7 @@ void FairMQDevice::RunWrapper()
|
||||
// if Run() exited and the state is still RUNNING, transition to READY.
|
||||
if (!NewStatePending()) {
|
||||
UnblockTransports();
|
||||
ChangeState(fair::mq::Transition::Stop);
|
||||
ChangeState(Transition::Stop);
|
||||
}
|
||||
|
||||
PostRun();
|
||||
@@ -489,10 +490,10 @@ void FairMQDevice::RunWrapper()
|
||||
} catch (const out_of_range& oor) {
|
||||
LOG(error) << "out of range: " << oor.what();
|
||||
LOG(error) << "incorrect/incomplete channel configuration?";
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
ChangeState(Transition::ErrorFound);
|
||||
throw;
|
||||
} catch (...) {
|
||||
ChangeState(fair::mq::Transition::ErrorFound);
|
||||
ChangeState(Transition::ErrorFound);
|
||||
throw;
|
||||
}
|
||||
|
||||
@@ -664,7 +665,7 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const
|
||||
catch (exception& e)
|
||||
{
|
||||
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
|
||||
throw runtime_error(fair::mq::tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
|
||||
throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -738,7 +739,7 @@ void FairMQDevice::LogSocketRates()
|
||||
filteredSockets.push_back(vi->fSocket.get());
|
||||
logIntervals.push_back(vi->fRateLogging);
|
||||
intervalCounters.push_back(0);
|
||||
filteredChannelNames.push_back(fair::mq::tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]"));
|
||||
filteredChannelNames.push_back(tools::ToString(mi.first, "[", vi - (mi.second).begin(), "]"));
|
||||
chanNameLen = max(chanNameLen, filteredChannelNames.back().length());
|
||||
}
|
||||
}
|
||||
@@ -814,7 +815,7 @@ void FairMQDevice::LogSocketRates()
|
||||
|
||||
t0 = t1;
|
||||
if (fMaxRunRuntimeInS > 0 && ++secondsElapsed >= fMaxRunRuntimeInS) {
|
||||
ChangeState(fair::mq::Transition::Stop);
|
||||
ChangeState(Transition::Stop);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -830,7 +831,7 @@ void FairMQDevice::ResetTaskWrapper()
|
||||
{
|
||||
ResetTask();
|
||||
|
||||
ChangeState(fair::mq::Transition::Auto);
|
||||
ChangeState(Transition::Auto);
|
||||
}
|
||||
|
||||
void FairMQDevice::ResetWrapper()
|
||||
@@ -850,7 +851,7 @@ void FairMQDevice::ResetWrapper()
|
||||
|
||||
Reset();
|
||||
|
||||
ChangeState(fair::mq::Transition::Auto);
|
||||
ChangeState(Transition::Auto);
|
||||
}
|
||||
|
||||
FairMQDevice::~FairMQDevice()
|
||||
|
@@ -366,8 +366,9 @@ class FairMQDevice
|
||||
try {
|
||||
return fChannels.at(channelName).at(index);
|
||||
} catch (const std::out_of_range& oor) {
|
||||
LOG(error) << "out of range: " << oor.what();
|
||||
LOG(error) << "requested channel has not been configured? check channel names/configuration.";
|
||||
LOG(error) << "channel: " << channelName << ", index: " << index;
|
||||
LOG(error) << "out of range: " << oor.what();
|
||||
throw;
|
||||
}
|
||||
|
||||
|
@@ -196,6 +196,7 @@ class PluginServices
|
||||
|| (currentState == DeviceState::Binding)
|
||||
|| (currentState == DeviceState::Bound)
|
||||
|| (currentState == DeviceState::Connecting)
|
||||
|| (currentState == DeviceState::Ready)
|
||||
|| (currentState == DeviceState::Idle && key == "channel-config")) {
|
||||
fConfig.SetValue(key, val);
|
||||
} else {
|
||||
|
@@ -203,9 +203,10 @@ try {
|
||||
break;
|
||||
case 'i':
|
||||
cout << "\n --> [i] init device\n\n" << flush;
|
||||
ChangeDeviceState(DeviceStateTransition::InitDevice);
|
||||
while (WaitForNextState() != DeviceState::InitializingDevice) {}
|
||||
ChangeDeviceState(DeviceStateTransition::CompleteInit);
|
||||
if (ChangeDeviceState(DeviceStateTransition::InitDevice)) {
|
||||
while (WaitForNextState() != DeviceState::InitializingDevice) {}
|
||||
ChangeDeviceState(DeviceStateTransition::CompleteInit);
|
||||
}
|
||||
break;
|
||||
case 'b':
|
||||
cout << "\n --> [b] bind\n\n" << flush;
|
||||
@@ -324,13 +325,13 @@ void Control::PrintStateMachineColor()
|
||||
<< " ║ \033[01;33mINITIALIZING DEVICE\033[0m ║ ║ \033[01;33mRESETTING DEVICE\033[0m ║ \n"
|
||||
<< " ╚══════════╦══════════╝ ╚════════ ▲ ═════════╝ \n"
|
||||
<< " ┌───────── ▼ ─────────┐ │ ┌────────────────────────────┐ \n"
|
||||
<< " │ \033[01;36mINITIALIZED\033[0m │ │ │ Legend: │ \n"
|
||||
<< " │ \033[01;36mINITIALIZED\033[0m [\033[01;32md\033[0m]──────────┤ │ Legend: │ \n"
|
||||
<< " └─────────[\033[01;32mb\033[0m]─────────┘ │ │----------------------------│ \n"
|
||||
<< " ╔═════════ ▼ ═════════╗ │ │ [\033[01;32mk\033[0m] keyboard shortcut for │ \n"
|
||||
<< " ║ \033[01;33mBINDING\033[0m ║ │ │ interactive controller │ \n"
|
||||
<< " ╚══════════╦══════════╝ │ │ ┌────────────────────────┐ │ \n"
|
||||
<< " ┌───────── ▼ ─────────┐ │ │ │ \033[01;36mIDLING STATE\033[0m │ │ \n"
|
||||
<< " │ \033[01;36mBOUND\033[0m │ │ │ └────────────────────────┘ │ \n"
|
||||
<< " │ \033[01;36mBOUND\033[0m [\033[01;32md\033[0m]──────────┤ │ └────────────────────────┘ │ \n"
|
||||
<< " └─────────[\033[01;32mx\033[0m]─────────┘ │ │ ╔════════════════════════╗ │ \n"
|
||||
<< " ╔═════════ ▼ ═════════╗ │ │ ║ \033[01;33mWORKING STATE\033[0m ║ │ \n"
|
||||
<< " ║ \033[01;33mCONNECTING\033[0m ║ │ │ ╚════════════════════════╝ │ \n"
|
||||
@@ -362,13 +363,13 @@ void Control::PrintStateMachine()
|
||||
<< " ║ INITIALIZING DEVICE ║ ║ RESETTING DEVICE ║ \n"
|
||||
<< " ╚══════════╦══════════╝ ╚════════ ▲ ═════════╝ \n"
|
||||
<< " ┌───────── ▼ ─────────┐ │ ┌────────────────────────────┐ \n"
|
||||
<< " │ INITIALIZED │ │ │ Legend: │ \n"
|
||||
<< " │ INITIALIZED [d]──────────┤ │ Legend: │ \n"
|
||||
<< " └─────────[b]─────────┘ │ │----------------------------│ \n"
|
||||
<< " ╔═════════ ▼ ═════════╗ │ │ [k] keyboard shortcut for │ \n"
|
||||
<< " ║ BINDING ║ │ │ interactive controller │ \n"
|
||||
<< " ╚══════════╦══════════╝ │ │ ┌────────────────────────┐ │ \n"
|
||||
<< " ┌───────── ▼ ─────────┐ │ │ │ IDLING STATE │ │ \n"
|
||||
<< " │ BOUND │ │ │ └────────────────────────┘ │ \n"
|
||||
<< " │ BOUND [d]──────────┤ │ └────────────────────────┘ │ \n"
|
||||
<< " └─────────[x]─────────┘ │ │ ╔════════════════════════╗ │ \n"
|
||||
<< " ╔═════════ ▼ ═════════╗ │ │ ║ WORKING STATE ║ │ \n"
|
||||
<< " ║ CONNECTING ║ │ │ ╚════════════════════════╝ │ \n"
|
||||
|
@@ -8,13 +8,13 @@
|
||||
|
||||
set(plugin FairMQPlugin_dds)
|
||||
add_library(${plugin} SHARED ${CMAKE_CURRENT_SOURCE_DIR}/DDS.cxx ${CMAKE_CURRENT_SOURCE_DIR}/DDS.h)
|
||||
target_link_libraries(${plugin} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib DDS::dds-user-defaults)
|
||||
target_link_libraries(${plugin} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib)
|
||||
target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
|
||||
set_target_properties(${plugin} PROPERTIES CXX_VISIBILITY_PRESET hidden)
|
||||
|
||||
set(exe fairmq-dds-command-ui)
|
||||
add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runDDSCommandUI.cxx)
|
||||
target_link_libraries(${exe} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib DDS::dds-user-defaults)
|
||||
target_link_libraries(${exe} FairMQ DDS::dds_intercom_lib DDS::dds_protocol_lib)
|
||||
target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
|
||||
|
||||
install(TARGETS ${plugin} ${exe}
|
||||
|
@@ -327,50 +327,19 @@ void FairMQMessageSHM::CloseMessage()
|
||||
}
|
||||
else
|
||||
{
|
||||
// send notification back to the receiver
|
||||
// RegionBlock block(fHandle, fSize);
|
||||
// if (fManager.GetRegionQueue(fRegionId).try_send(static_cast<void*>(&block), sizeof(RegionBlock), 0))
|
||||
// {
|
||||
// // LOG(info) << "true";
|
||||
// }
|
||||
// // else
|
||||
// // {
|
||||
// // LOG(debug) << "could not send ack";
|
||||
// // }
|
||||
|
||||
// timed version
|
||||
RegionBlock block(fHandle, fSize, fHint);
|
||||
bool success = false;
|
||||
do
|
||||
if (!fRegionPtr)
|
||||
{
|
||||
auto sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200);
|
||||
if (!fRegionPtr)
|
||||
{
|
||||
fRegionPtr = fManager.GetRemoteRegion(fRegionId);
|
||||
}
|
||||
if (fRegionPtr)
|
||||
{
|
||||
// LOG(debug) << "sending ack";
|
||||
if (fRegionPtr->fQueue->timed_send(&block, sizeof(RegionBlock), 0, sndTill))
|
||||
{
|
||||
success = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (fInterrupted)
|
||||
{
|
||||
break;
|
||||
}
|
||||
LOG(debug) << "region ack queue is full, retrying...";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack";
|
||||
success = true;
|
||||
}
|
||||
fRegionPtr = fManager.GetRemoteRegion(fRegionId);
|
||||
}
|
||||
|
||||
if (fRegionPtr)
|
||||
{
|
||||
fRegionPtr->ReleaseBlock({fHandle, fSize, fHint});
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack";
|
||||
}
|
||||
while (!success);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -19,7 +19,7 @@ namespace mq
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
std::unordered_map<uint64_t, Region> Manager::fRegions;
|
||||
std::unordered_map<uint64_t, std::unique_ptr<Region>> Manager::fRegions;
|
||||
|
||||
Manager::Manager(const string& name, size_t size)
|
||||
: fSessionName(name)
|
||||
@@ -43,7 +43,7 @@ void Manager::Resume()
|
||||
// close remote regions before processing new transfers
|
||||
for (auto it = fRegions.begin(); it != fRegions.end(); /**/)
|
||||
{
|
||||
if (it->second.fRemote)
|
||||
if (it->second->fRemote)
|
||||
{
|
||||
it = fRegions.erase(it);
|
||||
}
|
||||
@@ -64,11 +64,11 @@ bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id,
|
||||
}
|
||||
else
|
||||
{
|
||||
auto r = fRegions.emplace(id, Region{*this, id, size, false, callback});
|
||||
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, size, false, callback));
|
||||
|
||||
r.first->second.StartReceivingAcks();
|
||||
r.first->second->StartReceivingAcks();
|
||||
|
||||
return &(r.first->second.fRegion);
|
||||
return &(r.first->second->fRegion);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,14 +78,14 @@ Region* Manager::GetRemoteRegion(const uint64_t id)
|
||||
auto it = fRegions.find(id);
|
||||
if (it != fRegions.end())
|
||||
{
|
||||
return &(it->second);
|
||||
return it->second.get();
|
||||
}
|
||||
else
|
||||
{
|
||||
try
|
||||
{
|
||||
auto r = fRegions.emplace(id, Region{*this, id, 0, true, nullptr});
|
||||
return &(r.first->second);
|
||||
auto r = fRegions.emplace(id, fair::mq::tools::make_unique<Region>(*this, id, 0, true, nullptr));
|
||||
return r.first->second.get();
|
||||
}
|
||||
catch (bipc::interprocess_exception& e)
|
||||
{
|
||||
|
@@ -66,7 +66,7 @@ class Manager
|
||||
std::string fManagementSegmentName;
|
||||
boost::interprocess::managed_shared_memory fSegment;
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
static std::unordered_map<uint64_t, Region> fRegions;
|
||||
static std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
|
||||
};
|
||||
|
||||
} // namespace shmem
|
||||
|
@@ -12,6 +12,8 @@
|
||||
|
||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||
|
||||
#include <chrono>
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace bipc = ::boost::interprocess;
|
||||
@@ -32,7 +34,8 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ
|
||||
, fQueueName("fmq_" + fManager.fSessionName +"_rgq_" + to_string(id))
|
||||
, fShmemObject()
|
||||
, fQueue(nullptr)
|
||||
, fWorker()
|
||||
, fReceiveAcksWorker()
|
||||
, fSendAcksWorker()
|
||||
, fCallback(callback)
|
||||
{
|
||||
if (fRemote)
|
||||
@@ -49,52 +52,118 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, FairMQ
|
||||
LOG(debug) << "shmem: created region: " << fName;
|
||||
fShmemObject.truncate(size);
|
||||
|
||||
fQueue = fair::mq::tools::make_unique<bipc::message_queue>(bipc::create_only, fQueueName.c_str(), 10000, sizeof(RegionBlock));
|
||||
fQueue = fair::mq::tools::make_unique<bipc::message_queue>(bipc::create_only, fQueueName.c_str(), 1024, fAckBunchSize * sizeof(RegionBlock));
|
||||
LOG(debug) << "shmem: created region queue: " << fQueueName;
|
||||
}
|
||||
fRegion = bipc::mapped_region(fShmemObject, bipc::read_write); // TODO: add HUGEPAGES flag here
|
||||
// fRegion = bipc::mapped_region(fShmemObject, bipc::read_write, 0, 0, 0, MAP_ANONYMOUS | MAP_HUGETLB);
|
||||
|
||||
fSendAcksWorker = std::thread(&Region::SendAcks, this);
|
||||
}
|
||||
|
||||
void Region::StartReceivingAcks()
|
||||
{
|
||||
fWorker = std::thread(&Region::ReceiveAcks, this);
|
||||
fReceiveAcksWorker = std::thread(&Region::ReceiveAcks, this);
|
||||
}
|
||||
|
||||
void Region::ReceiveAcks()
|
||||
{
|
||||
unsigned int priority;
|
||||
bipc::message_queue::size_type recvdSize;
|
||||
std::unique_ptr<RegionBlock[]> blocks = fair::mq::tools::make_unique<RegionBlock[]>(fAckBunchSize);
|
||||
|
||||
while (!fStop) // end thread condition (should exist until region is destroyed)
|
||||
{
|
||||
auto rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(200);
|
||||
RegionBlock block;
|
||||
if (fQueue->timed_receive(&block, sizeof(RegionBlock), recvdSize, priority, rcvTill))
|
||||
auto rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(500);
|
||||
|
||||
while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill))
|
||||
{
|
||||
// LOG(debug) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId;
|
||||
if (fCallback)
|
||||
{
|
||||
fCallback(reinterpret_cast<char*>(fRegion.get_address()) + block.fHandle, block.fSize, reinterpret_cast<void*>(block.fHint));
|
||||
const auto numBlocks = recvdSize / sizeof(RegionBlock);
|
||||
for (size_t i = 0; i < numBlocks; i++)
|
||||
{
|
||||
fCallback(reinterpret_cast<char*>(fRegion.get_address()) + blocks[i].fHandle, blocks[i].fSize, reinterpret_cast<void*>(blocks[i].fHint));
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// LOG(debug) << "queue " << fQueueName << " timeout!";
|
||||
}
|
||||
} // while !fStop
|
||||
|
||||
LOG(debug) << "worker for " << fName << " leaving.";
|
||||
LOG(debug) << "receive ack worker for " << fName << " leaving.";
|
||||
}
|
||||
|
||||
void Region::ReleaseBlock(const RegionBlock &block)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(fBlockLock);
|
||||
|
||||
fBlocksToFree.emplace_back(block);
|
||||
|
||||
if (fBlocksToFree.size() >= fAckBunchSize)
|
||||
{
|
||||
lock.unlock(); // reduces contention on fBlockLock
|
||||
fBlockSendCV.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void Region::SendAcks()
|
||||
{
|
||||
std::unique_ptr<RegionBlock[]> blocks = fair::mq::tools::make_unique<RegionBlock[]>(fAckBunchSize);
|
||||
|
||||
while (true) // we'll try to send all acks before stopping
|
||||
{
|
||||
size_t blocksToSend = 0;
|
||||
|
||||
{ // mutex locking block
|
||||
std::unique_lock<std::mutex> lock(fBlockLock);
|
||||
|
||||
// try to get more blocks without waiting (we can miss a notify from CloseMessage())
|
||||
if (!fStop && (fBlocksToFree.size() < fAckBunchSize))
|
||||
{
|
||||
// cv.wait() timeout: send whatever blocks we have
|
||||
fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500));
|
||||
}
|
||||
|
||||
blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize);
|
||||
|
||||
std::copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get());
|
||||
fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend);
|
||||
} // unlock the block mutex here while sending over IPC
|
||||
|
||||
if (blocksToSend > 0)
|
||||
{
|
||||
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop)
|
||||
{
|
||||
// receiver slow? yield and try again...
|
||||
this_thread::yield();
|
||||
}
|
||||
}
|
||||
else // blocksToSend == 0
|
||||
{
|
||||
if (fStop)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG(debug) << "send ack worker for " << fName << " leaving.";
|
||||
}
|
||||
|
||||
Region::~Region()
|
||||
{
|
||||
fStop = true;
|
||||
|
||||
if (fSendAcksWorker.joinable())
|
||||
{
|
||||
fSendAcksWorker.join();
|
||||
}
|
||||
|
||||
if (!fRemote)
|
||||
{
|
||||
fStop = true;
|
||||
if (fWorker.joinable())
|
||||
if (fReceiveAcksWorker.joinable())
|
||||
{
|
||||
fWorker.join();
|
||||
fReceiveAcksWorker.join();
|
||||
}
|
||||
|
||||
if (bipc::shared_memory_object::remove(fName.c_str()))
|
||||
|
@@ -19,11 +19,14 @@
|
||||
#include "FairMQUnmanagedRegion.h"
|
||||
|
||||
#include <fairmq/Tools.h>
|
||||
#include <fairmq/shmem/Common.h>
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace fair
|
||||
@@ -47,6 +50,9 @@ struct Region
|
||||
void StartReceivingAcks();
|
||||
void ReceiveAcks();
|
||||
|
||||
void ReleaseBlock(const RegionBlock &);
|
||||
void SendAcks();
|
||||
|
||||
~Region();
|
||||
|
||||
Manager& fManager;
|
||||
@@ -56,8 +62,15 @@ struct Region
|
||||
std::string fQueueName;
|
||||
boost::interprocess::shared_memory_object fShmemObject;
|
||||
boost::interprocess::mapped_region fRegion;
|
||||
|
||||
std::mutex fBlockLock;
|
||||
std::condition_variable fBlockSendCV;
|
||||
std::vector<RegionBlock> fBlocksToFree;
|
||||
const std::size_t fAckBunchSize = 256;
|
||||
std::unique_ptr<boost::interprocess::message_queue> fQueue;
|
||||
std::thread fWorker;
|
||||
|
||||
std::thread fReceiveAcksWorker;
|
||||
std::thread fSendAcksWorker;
|
||||
FairMQRegionCallback fCallback;
|
||||
};
|
||||
|
||||
|
@@ -27,6 +27,13 @@ std::unique_ptr<T> make_unique(Args&& ...args)
|
||||
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
// make_unique implementation (array variant), until C++14 is default
|
||||
template<typename T>
|
||||
std::unique_ptr<T> make_unique(std::size_t size)
|
||||
{
|
||||
return std::unique_ptr<T>(new typename std::remove_extent<T>::type[size]());
|
||||
}
|
||||
|
||||
// provide an enum hasher to compensate std::hash not supporting enums in C++11
|
||||
template<typename Enum>
|
||||
struct HashEnum
|
||||
|
Reference in New Issue
Block a user