Compare commits

...

26 Commits

Author SHA1 Message Date
Alexey Rybalchenko
8cfc04721e Add unit tests for regions 2020-05-11 12:02:19 +02:00
Alexey Rybalchenko
e9318dd234 Add FairMQTransportFactory::GetId() 2020-05-11 12:02:19 +02:00
Alexey Rybalchenko
c8fc5ad33f Add bool FairMQTransportFactory::SubscribedToRegionEvents() 2020-05-11 12:02:19 +02:00
Alexey Rybalchenko
59e32437a2 shmem region subscriptions: fix race condition 2020-05-11 12:02:19 +02:00
Alexey Rybalchenko
a3afadb824 Call region event callback with local_only event for zmq 2020-05-11 12:02:19 +02:00
Alexey Rybalchenko
9992811822 Implement region events for zmq 2020-05-11 12:02:19 +02:00
Alexey Rybalchenko
4218c185a4 Shmem: Send acks also for local regions 2020-05-04 10:01:29 +02:00
Alexey Rybalchenko
5a49c5b9b1 Truncate the file used for the region mapping 2020-05-04 10:01:29 +02:00
Alexey Rybalchenko
960b612d80 Update docs 2020-04-28 14:47:26 +02:00
Alexey Rybalchenko
e1a113aabe Add region events subscriptions 2020-04-28 14:09:04 +02:00
Alexey Rybalchenko
5721ea9510 SDK: send heartbeats when subscribed to state changes 2020-04-10 18:40:14 +02:00
Alexey Rybalchenko
330687772f Add SubscriptionHeartbeat command 2020-04-10 18:40:14 +02:00
Alexey Rybalchenko
7cbd154344 PMIx plugin: Fix Commands API usage 2020-04-07 14:44:51 +02:00
Alexey Rybalchenko
036561ab38 SDK: track state change (un-)subscriptions 2020-04-07 14:44:51 +02:00
Alexey Rybalchenko
274ba5ec00 Commands: Add task id to subscription status cmds 2020-04-07 14:44:51 +02:00
Alexey Rybalchenko
c5efd3e4a6 SDK: minor refactoring of the command handling 2020-04-07 14:44:51 +02:00
Alexey Rybalchenko
0a5820c07f Fix Typo 2020-04-06 18:42:34 +02:00
Dennis Klein
5788daa410 Plugin manager: extent lifetime of DLLs 2020-04-06 18:42:34 +02:00
Alexey Rybalchenko
46014118f0 QC ex: rename qc devices, granular state control 2020-03-30 13:14:12 +02:00
Alexey Rybalchenko
adc4688f9b DDSCommandUI: include path argument in ChangeState 2020-03-30 13:14:12 +02:00
Alexey Rybalchenko
c3127f22e5 SDK: refactor subscription to allow reuse 2020-03-30 13:14:12 +02:00
Alexey Rybalchenko
926ee743ed DDS plugin: refactor for better readability 2020-03-25 09:53:22 +01:00
Alexey Rybalchenko
c7b1304a2c DDS plugin: Update property instead of set to avoid errors 2020-03-25 09:53:22 +01:00
Alexey Rybalchenko
32764e1b12 DDS plugin: refactor for better readability 2020-03-25 09:53:22 +01:00
Alexey Rybalchenko
96348b8462 DDS plugin: improve error message on unexpected update 2020-03-25 09:53:22 +01:00
Alexey Rybalchenko
cd83efadea DDS plugin: refactor to load DDS task id only once 2020-03-25 09:53:22 +01:00
57 changed files with 1278 additions and 521 deletions

View File

@@ -143,7 +143,7 @@ macro(set_fairmq_defaults)
set(PROJECT_EXPORT_SET ${PROJECT_NAME}Targets)
# Configure build types
set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo" "Nightly" "Profile" "Experimental" "AdressSan" "ThreadSan")
set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo" "Nightly" "Profile" "Experimental" "AddressSan" "ThreadSan")
set(_warnings "-Wshadow -Wall -Wextra -Wpedantic")
set(CMAKE_CXX_FLAGS_DEBUG "-g ${_warnings}")
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")
@@ -151,7 +151,7 @@ macro(set_fairmq_defaults)
set(CMAKE_CXX_FLAGS_NIGHTLY "-O2 -g ${_warnings}")
set(CMAKE_CXX_FLAGS_PROFILE "-g3 ${_warnings} -fno-inline -ftest-coverage -fprofile-arcs")
set(CMAKE_CXX_FLAGS_EXPERIMENTAL "-O2 -g ${_warnings} -DNDEBUG")
set(CMAKE_CXX_FLAGS_ADRESSSAN "-O2 -g ${_warnings} -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_ADDRESSSAN "-O2 -g ${_warnings} -fsanitize=address -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_THREADSAN "-O2 -g ${_warnings} -fsanitize=thread")
unset(_warnings)

View File

@@ -40,7 +40,7 @@ A topology consisting of three layers of devices: synchronizer -> n * senders ->
## QC
A topology consisting of 4 devices - Sampler, QCProducer, QCConsumer and Sink. The data flows from Sampler through QCProducer to Sink. On demand - by setting the corresponding configuration property - the QCProducer device will duplicate the data to the QCConsumer device. The property is set by the topology controller, in this example this is the `fairmq-dds-command-ui` utility.
A topology consisting of 4 devices - Sampler, QCDispatcher, QCTask and Sink. The data flows from Sampler through QCDispatcher to Sink. On demand - by setting the corresponding configuration property - the QCDispatcher device will duplicate the data to the QCTask device. The property is set by the topology controller, in this example this is the `fairmq-dds-command-ui` utility.
## Readout
@@ -52,4 +52,4 @@ This example demonstrates the use of a more advanced feature - UnmanagedRegion,
## Request & Reply
This topology contains two devices that communicate with each other via the **REQ-REP** pettern. Bidirectional communication via a single socket.
This topology contains two devices that communicate with each other via the **REQ-REP** pettern. Bidirectional communication via a single socket.

View File

@@ -9,16 +9,16 @@
add_executable(fairmq-ex-qc-sampler runSampler.cxx)
target_link_libraries(fairmq-ex-qc-sampler PRIVATE FairMQ)
add_executable(fairmq-ex-qc-producer runQCProducer.cxx)
target_link_libraries(fairmq-ex-qc-producer PRIVATE FairMQ)
add_executable(fairmq-ex-qc-dispatcher runQCDispatcher.cxx)
target_link_libraries(fairmq-ex-qc-dispatcher PRIVATE FairMQ)
add_executable(fairmq-ex-qc-consumer runQCConsumer.cxx)
target_link_libraries(fairmq-ex-qc-consumer PRIVATE FairMQ)
add_executable(fairmq-ex-qc-task runQCTask.cxx)
target_link_libraries(fairmq-ex-qc-task PRIVATE FairMQ)
add_executable(fairmq-ex-qc-sink runSink.cxx)
target_link_libraries(fairmq-ex-qc-sink PRIVATE FairMQ)
add_custom_target(ExampleQC DEPENDS fairmq-ex-qc-sampler fairmq-ex-qc-producer fairmq-ex-qc-consumer fairmq-ex-qc-sink)
add_custom_target(ExampleQC DEPENDS fairmq-ex-qc-sampler fairmq-ex-qc-dispatcher fairmq-ex-qc-task fairmq-ex-qc-sink)
list(JOIN Boost_LIBRARY_DIRS ":" LIB_DIR)
set(BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_BINARY_DIR}/fairmq/plugins/DDS)
@@ -37,8 +37,8 @@ endif()
install(
TARGETS
fairmq-ex-qc-sampler
fairmq-ex-qc-producer
fairmq-ex-qc-consumer
fairmq-ex-qc-dispatcher
fairmq-ex-qc-task
fairmq-ex-qc-sink
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}

View File

@@ -1,4 +1,4 @@
QC
==
A topology consisting of 4 devices - Sampler, QCProducer, QCConsumer and Sink. The data flows from Sampler through QCProducer to Sink. On demand - by setting the corresponding configuration property - the QCProducer device will duplicate the data to the QCConsumer device. The property is set by the topology controller, in this example this is the `fairmq-dds-command-ui` utility.
A topology consisting of 4 devices - Sampler, QCDispatcher, QCTask and Sink. The data flows from Sampler through QCDispatcher to Sink. On demand - by setting the corresponding configuration property - the QCDispatcher device will duplicate the data to the QCTask device. The property is set by the topology controller, in this example this is the `fairmq-dds-command-ui` utility.

View File

@@ -12,8 +12,8 @@
</properties>
</decltask>
<decltask name="QCProducer">
<exe>fairmq-ex-qc-producer --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect name=qc,type=push,method=connect -P dds</exe>
<decltask name="QCDispatcher">
<exe>fairmq-ex-qc-dispatcher --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect name=qc,type=push,method=connect -P dds</exe>
<env reachable="false">fairmq-ex-qc-env.sh</env>
<properties>
<name access="read">fmqchan_data1</name>
@@ -22,8 +22,8 @@
</properties>
</decltask>
<decltask name="QCConsumer">
<exe>fairmq-ex-qc-consumer --color false --channel-config name=qc,type=pull,method=bind -P dds</exe>
<decltask name="QCTask">
<exe>fairmq-ex-qc-task --color false --channel-config name=qc,type=pull,method=bind -P dds</exe>
<env reachable="false">fairmq-ex-qc-env.sh</env>
<properties>
<name access="write">fmqchan_qc</name>
@@ -40,8 +40,8 @@
<main name="main">
<task>Sampler</task>
<task>QCProducer</task>
<task>QCConsumer</task>
<task>QCDispatcher</task>
<task>QCTask</task>
<task>Sink</task>
</main>

View File

@@ -51,12 +51,14 @@ fairmq-dds-command-ui -c k
fairmq-dds-command-ui -c b
fairmq-dds-command-ui -c x
fairmq-dds-command-ui -c j
fairmq-dds-command-ui -c r
qcconsumer="main/QCConsumer.*"
qcproducer="main/QCProducer.*"
fairmq-dds-command-ui -c p --property-key qc --property-value active -p $qcproducer
fairmq-dds-command-ui -w "RUNNING->READY" -p $qcconsumer
echo "...$qcconsumer received data and transitioned to READY, sending shutdown..."
allexceptqctasks="main/(Sampler|QCDispatcher|Sink)"
fairmq-dds-command-ui -c r -p $allexceptqctasks
qctask="main/QCTask.*"
qcdispatcher="main/QCDispatcher.*"
fairmq-dds-command-ui -c p --property-key qc --property-value active -p $qcdispatcher
fairmq-dds-command-ui -c r -p $qctask
fairmq-dds-command-ui -w "RUNNING->READY" -p $qctask
echo "...$qctask received data and transitioned to READY, sending shutdown..."
fairmq-dds-command-ui -c s
fairmq-dds-command-ui -c t
fairmq-dds-command-ui -c d

View File

@@ -9,13 +9,13 @@
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
class QCProducer : public FairMQDevice
class QCDispatcher : public FairMQDevice
{
public:
QCProducer()
QCDispatcher()
: fDoQC(false)
{
OnData("data1", &QCProducer::HandleData);
OnData("data1", &QCDispatcher::HandleData);
}
void InitTask() override
@@ -56,4 +56,4 @@ class QCProducer : public FairMQDevice
};
void addCustomOptions(boost::program_options::options_description& /*options*/) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCProducer(); }
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCDispatcher(); }

View File

@@ -9,10 +9,10 @@
#include "runFairMQDevice.h"
#include "FairMQDevice.h"
class QCConsumer : public FairMQDevice
class QCTask : public FairMQDevice
{
public:
QCConsumer()
QCTask()
{
OnData("qc", [](FairMQMessagePtr& /*msg*/, int) {
LOG(info) << "received data";
@@ -23,4 +23,4 @@ class QCConsumer : public FairMQDevice
namespace bpo = boost::program_options;
void addCustomOptions(bpo::options_description& /*options*/) {}
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCConsumer(); }
FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) { return new QCTask(); }

View File

@@ -35,6 +35,14 @@ void Sampler::InitTask()
fMsgSize = fConfig->GetProperty<int>("msg-size");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event;
LOG(warn) << "id: " << info.id;
LOG(warn) << "ptr: " << info.ptr;
LOG(warn) << "size: " << info.size;
LOG(warn) << "flags: " << info.flags;
});
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
0,
10000000,
@@ -82,6 +90,7 @@ void Sampler::ResetTask()
LOG(debug) << "done, still unacked: " << fNumUnackedMsgs;
}
fRegion.reset();
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
}
Sampler::~Sampler()

View File

@@ -29,6 +29,13 @@ void Sink::InitTask()
{
// Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event;
LOG(warn) << "id: " << info.id;
LOG(warn) << "ptr: " << info.ptr;
LOG(warn) << "size: " << info.size;
LOG(warn) << "flags: " << info.flags;
});
}
void Sink::Run()
@@ -50,6 +57,11 @@ void Sink::Run()
}
}
void Sink::ResetTask()
{
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
}
Sink::~Sink()
{
}

View File

@@ -31,6 +31,7 @@ class Sink : public FairMQDevice
protected:
virtual void Run();
virtual void InitTask();
virtual void ResetTask();
private:
uint64_t fMaxIterations;

View File

@@ -19,6 +19,7 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport"
SAMPLER+=" --severity debug"
SAMPLER+=" --session $SESSION"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1"
SAMPLER+=" --msg-size $msgSize"

View File

@@ -245,7 +245,6 @@ if(BUILD_FAIRMQ)
plugins/Control.cxx
shmem/Message.cxx
shmem/Poller.cxx
shmem/UnmanagedRegion.cxx
shmem/Socket.cxx
shmem/TransportFactory.cxx
shmem/Manager.cxx

View File

@@ -340,6 +340,11 @@ class FairMQChannel
return Transport()->CreateUnmanagedRegion(size, callback, path, flags);
}
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0)
{
return Transport()->CreateUnmanagedRegion(size, userFlags, callback, path, flags);
}
static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT;
static constexpr const char* DefaultTransportName = "default";
static constexpr const char* DefaultName = "";

View File

@@ -217,17 +217,47 @@ class FairMQDevice
}
// creates unamanaged region with the default device transport
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr)
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size,
FairMQRegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0)
{
return Transport()->CreateUnmanagedRegion(size, callback);
return Transport()->CreateUnmanagedRegion(size, callback, path, flags);
}
// creates unamanaged region with the default device transport
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size,
const int64_t userFlags,
FairMQRegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0)
{
return Transport()->CreateUnmanagedRegion(size, userFlags, callback, path, flags);
}
// creates unmanaged region with the transport of the specified channel
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0)
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel,
int index,
const size_t size,
FairMQRegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0)
{
return GetChannel(channel, index).NewUnmanagedRegion(size, callback, path, flags);
}
// creates unmanaged region with the transport of the specified channel
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel,
int index,
const size_t size,
const int64_t userFlags,
FairMQRegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0)
{
return GetChannel(channel, index).NewUnmanagedRegion(size, userFlags, callback, path, flags);
}
template<typename ...Ts>
FairMQPollerPtr NewPoller(const Ts&... inputs)
{

View File

@@ -7,8 +7,8 @@
********************************************************************************/
#include <FairMQTransportFactory.h>
#include <zeromq/FairMQTransportFactoryZMQ.h>
#include <fairmq/shmem/TransportFactory.h>
#include <zeromq/FairMQTransportFactoryZMQ.h>
#ifdef BUILD_NANOMSG_TRANSPORT
#include <nanomsg/FairMQTransportFactoryNN.h>
#endif /* BUILD_NANOMSG_TRANSPORT */
@@ -17,50 +17,46 @@
#endif
#include <FairMQLogger.h>
#include <fairmq/tools/Unique.h>
#include <memory>
#include <string>
FairMQTransportFactory::FairMQTransportFactory(const std::string& id)
using namespace std;
FairMQTransportFactory::FairMQTransportFactory(const string& id)
: fkId(id)
{
}
{}
auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, const std::string& id, const fair::mq::ProgOptions* config) -> std::shared_ptr<FairMQTransportFactory>
auto FairMQTransportFactory::CreateTransportFactory(const string& type,
const string& id,
const fair::mq::ProgOptions* config)
-> shared_ptr<FairMQTransportFactory>
{
using namespace std;
auto finalId = id;
// Generate uuid if empty
if (finalId == "")
{
if (finalId == "") {
finalId = fair::mq::tools::Uuid();
}
if (type == "zeromq")
{
if (type == "zeromq") {
return make_shared<FairMQTransportFactoryZMQ>(finalId, config);
}
else if (type == "shmem")
{
} else if (type == "shmem") {
return make_shared<fair::mq::shmem::TransportFactory>(finalId, config);
}
#ifdef BUILD_NANOMSG_TRANSPORT
else if (type == "nanomsg")
{
else if (type == "nanomsg") {
return make_shared<FairMQTransportFactoryNN>(finalId, config);
}
#endif /* BUILD_NANOMSG_TRANSPORT */
#ifdef BUILD_OFI_TRANSPORT
else if (type == "ofi")
{
else if (type == "ofi") {
return make_shared<fair::mq::ofi::TransportFactory>(finalId, config);
}
#endif /* BUILD_OFI_TRANSPORT */
else
{
LOG(error) << "Unavailable transport requested: " << "\"" << type << "\"" << ". Available are: "
else {
LOG(error) << "Unavailable transport requested: "
<< "\"" << type << "\""
<< ". Available are: "
<< "\"zeromq\""
<< "\"shmem\""
#ifdef BUILD_NANOMSG_TRANSPORT

View File

@@ -61,20 +61,49 @@ class FairMQTransportFactory
/// @param obj optional helper pointer that can be used in the callback
/// @return pointer to FairMQMessage
virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0;
/// @brief create a message with the buffer located within the corresponding unmanaged region
/// @param unmanagedRegion the unmanaged region that this message buffer belongs to
/// @param data message buffer (must be within the region - checked at runtime by the transport)
/// @param size size of the message
/// @param hint optional parameter, returned to the user in the FairMQRegionCallback
virtual FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& unmanagedRegion, void* data, const size_t size, void* hint = 0) = 0;
/// Create a socket
/// @brief Create a socket
virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) = 0;
/// Create a poller for a single channel (all subchannels)
/// @brief Create a poller for a single channel (all subchannels)
virtual FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const = 0;
/// Create a poller for specific channels
/// @brief Create a poller for specific channels
virtual FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const = 0;
/// Create a poller for specific channels (all subchannels)
/// @brief Create a poller for specific channels (all subchannels)
virtual FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0;
/// @brief Create new UnmanagedRegion
/// @param size size of the region
/// @param callback callback to be called when a message belonging to this region is no longer needed by the transport
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
/// @brief Create new UnmanagedRegion
/// @param size size of the region
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
/// @param callback callback to be called when a message belonging to this region is no longer needed by the transport
/// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
/// @brief Subscribe to region events (creation, destruction, ...)
/// @param callback the callback that is called when a region event occurs
virtual void SubscribeToRegionEvents(FairMQRegionEventCallback callback) = 0;
/// @brief Check if there is an active subscription to region events
/// @return true/false
virtual bool SubscribedToRegionEvents() = 0;
/// @brief Unsubscribe from region events
virtual void UnsubscribeFromRegionEvents() = 0;
virtual std::vector<FairMQRegionInfo> GetRegionInfo() = 0;
/// Get transport type
virtual fair::mq::Transport GetType() const = 0;

View File

@@ -12,26 +12,89 @@
#include <cstddef> // size_t
#include <memory> // std::unique_ptr
#include <functional> // std::function
#include <ostream> // std::ostream
class FairMQTransportFactory;
enum class FairMQRegionEvent : int
{
created,
destroyed,
local_only
};
struct FairMQRegionInfo
{
FairMQRegionInfo()
: id(0)
, ptr(nullptr)
, size(0)
, flags(0)
, event(FairMQRegionEvent::created)
{}
FairMQRegionInfo(uint64_t _id, void* _ptr, size_t _size, int64_t _flags, FairMQRegionEvent _event)
: id(_id)
, ptr(_ptr)
, size(_size)
, flags(_flags)
, event (_event)
{}
uint64_t id; // id of the region
void* ptr; // pointer to the start of the region
size_t size; // region size
int64_t flags; // custom flags set by the creator
FairMQRegionEvent event;
};
using FairMQRegionCallback = std::function<void(void*, size_t, void*)>;
using FairMQRegionEventCallback = std::function<void(FairMQRegionInfo)>;
class FairMQUnmanagedRegion
{
public:
FairMQUnmanagedRegion() {}
FairMQUnmanagedRegion(FairMQTransportFactory* factory): fTransport(factory) {}
virtual void* GetData() const = 0;
virtual size_t GetSize() const = 0;
virtual uint64_t GetId() const = 0;
FairMQTransportFactory* GetTransport() { return fTransport; }
void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }
virtual ~FairMQUnmanagedRegion() {};
private:
FairMQTransportFactory* fTransport{nullptr};
};
using FairMQUnmanagedRegionPtr = std::unique_ptr<FairMQUnmanagedRegion>;
inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event)
{
switch (event) {
case FairMQRegionEvent::created:
return os << "created";
case FairMQRegionEvent::destroyed:
return os << "destroyed";
case FairMQRegionEvent::local_only:
return os << "local_only";
default:
return os << "unrecognized event";
}
}
namespace fair
{
namespace mq
{
using RegionCallback = FairMQRegionCallback;
using RegionEventCallback = FairMQRegionEventCallback;
using RegionEvent = FairMQRegionEvent;
using RegionInfo = FairMQRegionInfo;
using UnmanagedRegion = FairMQUnmanagedRegion;
using UnmanagedRegionPtr = FairMQUnmanagedRegionPtr;

View File

@@ -28,6 +28,8 @@ using boost::optional;
const std::string fair::mq::PluginManager::fgkLibPrefix = "FairMQPlugin_";
std::vector<boost::dll::shared_library> fair::mq::PluginManager::fgDLLKeepAlive = std::vector<boost::dll::shared_library>();
fair::mq::PluginManager::PluginManager()
: fSearchPaths{}
, fPluginFactories()

View File

@@ -97,6 +97,7 @@ class PluginManager
using fair::mq::tools::ToString;
auto lib = shared_library{std::forward<Args>(args)...};
fgDLLKeepAlive.push_back(lib);
fPluginFactories[pluginName] = import_alias<PluginFactory>(
shared_library{lib},
@@ -117,6 +118,7 @@ class PluginManager
static const std::string fgkLibPrefix;
std::vector<boost::filesystem::path> fSearchPaths;
static std::vector<boost::dll::shared_library> fgDLLKeepAlive;
std::map<std::string, std::function<PluginFactory>> fPluginFactories;
std::unique_ptr<PluginServices> fPluginServices;
std::map<std::string, std::unique_ptr<Plugin>> fPlugins;

View File

@@ -19,6 +19,7 @@ fair::mq::Transport FairMQTransportFactoryNN::fTransportType = fair::mq::Transpo
FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const fair::mq::ProgOptions* /*config*/)
: FairMQTransportFactory(id)
, fRegionCounter(0)
{
LOG(debug) << "Transport: Using nanomsg library";
}
@@ -65,9 +66,14 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map<strin
return unique_ptr<FairMQPoller>(new FairMQPollerNN(channelsMap, channelList));
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, callback, path, flags));
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(++fRegionCounter, size, callback, path, flags, this));
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, userFlags, callback, path, flags, this));
}
fair::mq::Transport FairMQTransportFactoryNN::GetType() const

View File

@@ -36,7 +36,13 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override;
void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for nanomsg"; }
bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for nanomsg"; return false; }
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for nanomsg"; }
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for nanomsg, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
fair::mq::Transport GetType() const override;
@@ -46,6 +52,7 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory
private:
static fair::mq::Transport fTransportType;
uint64_t fRegionCounter;
mutable std::vector<FairMQSocket*> fSockets;
};

View File

@@ -11,8 +11,19 @@
using namespace std;
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */)
: fBuffer(malloc(size))
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(uint64_t id, const size_t size, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */, FairMQTransportFactory* factory /* = nullptr */)
: FairMQUnmanagedRegion(factory)
, fId(id)
, fBuffer(malloc(size))
, fSize(size)
, fCallback(callback)
{
}
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(uint64_t id, const size_t size, const int64_t /*userFlags*/, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */, FairMQTransportFactory* factory /* = nullptr */)
: FairMQUnmanagedRegion(factory)
, fId(id)
, fBuffer(malloc(size))
, fSize(size)
, fCallback(callback)
{
@@ -28,6 +39,12 @@ size_t FairMQUnmanagedRegionNN::GetSize() const
return fSize;
}
uint64_t FairMQUnmanagedRegionNN::GetId() const
{
return fId;
}
FairMQUnmanagedRegionNN::~FairMQUnmanagedRegionNN()
{
LOG(debug) << "destroying region";

View File

@@ -19,19 +19,23 @@ class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion
friend class FairMQSocketNN;
public:
FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionNN(uint64_t id, const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr);
FairMQUnmanagedRegionNN(uint64_t id, const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr);
FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete;
FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete;
virtual void* GetData() const override;
virtual size_t GetSize() const override;
void* GetData() const override;
size_t GetSize() const override;
uint64_t GetId() const override;
virtual ~FairMQUnmanagedRegionNN();
private:
uint64_t fId;
void* fBuffer;
size_t fSize;
FairMQRegionCallback fCallback;
};
#endif /* FAIRMQUNMANAGEDREGIONNN_H_ */
#endif /* FAIRMQUNMANAGEDREGIONNN_H_ */

View File

@@ -85,7 +85,12 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
// return PollerPtr{new Poller(channelsMap, channelList)};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) const -> UnmanagedRegionPtr
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}

View File

@@ -46,7 +46,13 @@ class TransportFactory final : public FairMQTransportFactory
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override;
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; }
bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for OFI"; return false; }
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for OFI"; }
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
auto GetType() const -> Transport override;

View File

@@ -8,8 +8,6 @@
#include "DDS.h"
#include <fairmq/sdk/commands/Commands.h>
#include <fairmq/tools/Strings.h>
#include <boost/algorithm/string/join.hpp>
@@ -37,6 +35,7 @@ DDS::DDS(const string& name,
const string& homepage,
PluginServices* pluginServices)
: Plugin(name, version, maintainer, homepage, pluginServices)
, fDDSTaskId(dds::env_prop<dds::task_id>())
, fCurrentState(DeviceState::Idle)
, fLastState(DeviceState::Idle)
, fDeviceTerminationRequested(false)
@@ -73,15 +72,22 @@ DDS::DDS(const string& name,
// subscribe to device state changes, pushing new state changes into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState) {
switch (newState) {
case DeviceState::Bound:
case DeviceState::Bound: {
// Receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
// allow updates from key value after channel containers are filled
{
lock_guard<mutex> lk(fUpdateMutex);
fUpdatesAllowed = true;
}
fUpdateCondition.notify_one();
// publish bound addresses via DDS at keys corresponding to the channel
// prefixes, e.g. 'data' in data[i]
PublishBoundChannels();
break;
} break;
case DeviceState::ResettingDevice: {
{
lock_guard<mutex> lk(fUpdateMutex);
@@ -89,9 +95,8 @@ DDS::DDS(const string& name,
}
EmptyChannelContainers();
break;
}
case DeviceState::Exiting:
} break;
case DeviceState::Exiting: {
if (!fControllerThread.joinable()) {
fControllerThread = thread(&DDS::WaitForExitingAck, this);
}
@@ -99,21 +104,32 @@ DDS::DDS(const string& name,
fDeviceTerminationRequested = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
break;
} break;
default:
break;
}
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
using namespace sdk::cmd;
auto now = chrono::steady_clock::now();
string id = GetProperty<string>("id");
fLastState = fCurrentState;
fCurrentState = newState;
using namespace sdk::cmd;
for (auto subscriberId : fStateChangeSubscribers) {
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
Cmds cmds(make<StateChange>(id, dds::env_prop<dds::task_id>(), fLastState, fCurrentState));
fDDS.Send(cmds.Serialize(), to_string(subscriberId));
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
for (auto it = fStateChangeSubscribers.cbegin(); it != fStateChangeSubscribers.end();) {
// if a subscriber did not send a heartbeat in more than 3 times the promised interval,
// remove it from the subscriber list
if (chrono::duration<double>(now - it->second.first).count() > 3 * it->second.second) {
LOG(warn) << "Controller '" << it->first
<< "' did not send heartbeats since over 3 intervals ("
<< 3 * it->second.second << " ms), removing it.";
fStateChangeSubscribers.erase(it++);
} else {
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << it->first;
Cmds cmds(make<StateChange>(id, fDDSTaskId, fLastState, fCurrentState));
fDDS.Send(cmds.Serialize(), to_string(it->first));
++it;
}
}
});
@@ -143,10 +159,10 @@ auto DDS::StartWorkerThread() -> void
auto DDS::WaitForExitingAck() -> void
{
unique_lock<mutex> lock(fStateChangeSubscriberMutex);
fExitingAcked.wait_for(
lock,
chrono::milliseconds(GetProperty<unsigned int>("wait-for-exiting-ack-timeout")),
[this]() { return fExitingAckedByLastExternalController; });
auto timeout = GetProperty<unsigned int>("wait-for-exiting-ack-timeout");
fExitingAcked.wait_for(lock, chrono::milliseconds(timeout), [this]() {
return fExitingAckedByLastExternalController || fStateChangeSubscribers.empty();
});
}
auto DDS::FillChannelContainers() -> void
@@ -212,11 +228,6 @@ auto DDS::FillChannelContainers() -> void
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
fIofN.insert(make_pair(chanName, IofN(i, n)));
}
{
lock_guard<mutex> lk(fUpdateMutex);
fUpdatesAllowed = true;
}
fUpdateCondition.notify_one();
} catch (const exception& e) {
LOG(error) << "Error filling channel containers: " << e.what();
}
@@ -242,6 +253,12 @@ auto DDS::SubscribeForConnectingChannels() -> void
unique_lock<mutex> lk(fUpdateMutex);
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; });
}
if (fConnectingChans.find(channelName) == fConnectingChans.end()) {
LOG(error) << "Received an update for a connecting channel, but either no channel with given channel name exists or it has already been configured: '" << channelName << "', ignoring...";
return;
}
string val = value;
// check if it is to handle as one out of multiple values
auto it = fIofN.find(channelName);
@@ -275,7 +292,10 @@ auto DDS::SubscribeForConnectingChannels() -> void
if (mi.second.fNumSubChannels == mi.second.fDDSValues.size()) {
int i = 0;
for (const auto& e : mi.second.fDDSValues) {
SetProperty<string>(string{"chans." + mi.first + "." + to_string(i) + ".address"}, e.second);
auto result = UpdateProperty<string>(string{"chans." + mi.first + "." + to_string(i) + ".address"}, e.second);
if (!result) {
LOG(error) << "UpdateProperty failed for: " << "chans." << mi.first << "." << to_string(i) << ".address" << " - property does not exist";
}
++i;
}
}
@@ -304,119 +324,125 @@ auto DDS::SubscribeForCustomCommands() -> void
fDDS.SubscribeCustomCmd([id, this](const string& cmdStr, const string& cond, uint64_t senderId) {
// LOG(info) << "Received command: '" << cmdStr << "' from " << senderId;
using namespace fair::mq::sdk;
cmd::Cmds inCmds;
sdk::cmd::Cmds inCmds;
inCmds.Deserialize(cmdStr);
for (const auto& cmd : inCmds) {
// LOG(info) << "Received command type: '" << cmd->GetType() << "' from " << senderId;
switch (cmd->GetType()) {
case cmd::Type::check_state: {
fDDS.Send(cmd::Cmds(cmd::make<cmd::CurrentState>(id, GetCurrentDeviceState()))
.Serialize(),
to_string(senderId));
} break;
case cmd::Type::change_state: {
Transition transition = static_cast<cmd::ChangeState&>(*cmd).GetTransition();
if (ChangeDeviceState(transition)) {
cmd::Cmds outCmds(
cmd::make<cmd::TransitionStatus>(id, dds::env_prop<dds::task_id>(), cmd::Result::Ok, transition));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} else {
sdk::cmd::Cmds outCmds(
cmd::make<cmd::TransitionStatus>(id, dds::env_prop<dds::task_id>(), cmd::Result::Failure, transition));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
}
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fLastExternalController = senderId;
}
} break;
case cmd::Type::dump_config: {
stringstream ss;
for (const auto pKey : GetPropertyKeys()) {
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n";
}
cmd::Cmds outCmds(cmd::make<cmd::Config>(id, ss.str()));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case cmd::Type::state_change_exiting_received: {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
if (fLastExternalController == senderId) {
fExitingAckedByLastExternalController = true;
}
}
fExitingAcked.notify_one();
} break;
case cmd::Type::subscribe_to_state_change: {
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.insert(senderId);
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState
<< " to " << senderId;
cmd::Cmds outCmds(
cmd::make<cmd::StateChangeSubscription>(id, cmd::Result::Ok),
cmd::make<cmd::StateChange>(
id, dds::env_prop<dds::task_id>(), fLastState, fCurrentState));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case cmd::Type::unsubscribe_from_state_change: {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.erase(senderId);
}
cmd::Cmds outCmds(
cmd::make<cmd::StateChangeUnsubscription>(id, cmd::Result::Ok));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case cmd::Type::get_properties: {
auto _cmd = static_cast<cmd::GetProperties&>(*cmd);
auto const request_id(_cmd.GetRequestId());
auto result(cmd::Result::Ok);
std::vector<std::pair<std::string, std::string>> props;
try {
for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) {
props.push_back({prop.first, prop.second});
}
} catch (std::exception const& e) {
LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what();
result = cmd::Result::Failure;
}
cmd::Cmds const outCmds(cmd::make<cmd::Properties>(id, request_id, result, props));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case cmd::Type::set_properties: {
auto _cmd(static_cast<cmd::SetProperties&>(*cmd));
auto const request_id(_cmd.GetRequestId());
auto result(cmd::Result::Ok);
try {
fair::mq::Properties props;
for (auto const& prop : _cmd.GetProps()) {
props.insert({prop.first, fair::mq::Property(prop.second)});
}
// TODO Handle builtin keys with different value type than string
SetProperties(props);
} catch (std::exception const& e) {
LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what();
result = cmd::Result::Failure;
}
cmd::Cmds const outCmds(cmd::make<cmd::PropertiesSet>(id, request_id, result));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
default:
LOG(warn) << "Unexpected/unknown command received: " << cmdStr;
LOG(warn) << "Origin: " << senderId;
LOG(warn) << "Destination: " << cond;
break;
}
HandleCmd(id, *cmd, cond, senderId);
}
});
}
auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, uint64_t senderId) -> void
{
using namespace fair::mq::sdk;
using namespace fair::mq::sdk::cmd;
// LOG(info) << "Received command type: '" << cmd.GetType() << "' from " << senderId;
switch (cmd.GetType()) {
case Type::check_state: {
fDDS.Send(Cmds(make<CurrentState>(id, GetCurrentDeviceState())).Serialize(), to_string(senderId));
} break;
case Type::change_state: {
Transition transition = static_cast<ChangeState&>(cmd).GetTransition();
if (ChangeDeviceState(transition)) {
Cmds outCmds(make<TransitionStatus>(id, fDDSTaskId, Result::Ok, transition));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} else {
Cmds outCmds(make<TransitionStatus>(id, fDDSTaskId, Result::Failure, transition));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
}
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fLastExternalController = senderId;
}
} break;
case Type::dump_config: {
stringstream ss;
for (const auto pKey : GetPropertyKeys()) {
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n";
}
Cmds outCmds(make<Config>(id, ss.str()));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case Type::state_change_exiting_received: {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
if (fLastExternalController == senderId) {
fExitingAckedByLastExternalController = true;
}
}
fExitingAcked.notify_one();
} break;
case Type::subscribe_to_state_change: {
auto _cmd = static_cast<cmd::SubscribeToStateChange&>(cmd);
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.emplace(senderId, make_pair(chrono::steady_clock::now(), _cmd.GetInterval()));
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId;
Cmds outCmds(make<StateChangeSubscription>(id, fDDSTaskId, Result::Ok),
make<StateChange>(id, fDDSTaskId, fLastState, fCurrentState));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case Type::subscription_heartbeat: {
try {
auto _cmd = static_cast<cmd::SubscriptionHeartbeat&>(cmd);
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.at(senderId) = make_pair(chrono::steady_clock::now(), _cmd.GetInterval());
} catch(out_of_range& oor) {
LOG(warn) << "Received subscription heartbeat from an unknown controller with id '" << senderId << "'";
}
} break;
case Type::unsubscribe_from_state_change: {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.erase(senderId);
}
Cmds outCmds(make<StateChangeUnsubscription>(id, fDDSTaskId, Result::Ok));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case Type::get_properties: {
auto _cmd = static_cast<cmd::GetProperties&>(cmd);
auto const request_id(_cmd.GetRequestId());
auto result(Result::Ok);
vector<pair<string, string>> props;
try {
for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) {
props.push_back({prop.first, prop.second});
}
} catch (exception const& e) {
LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what();
result = Result::Failure;
}
Cmds const outCmds(make<cmd::Properties>(id, request_id, result, props));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case Type::set_properties: {
auto _cmd(static_cast<cmd::SetProperties&>(cmd));
auto const request_id(_cmd.GetRequestId());
auto result(Result::Ok);
try {
fair::mq::Properties props;
for (auto const& prop : _cmd.GetProps()) {
props.insert({prop.first, fair::mq::Property(prop.second)});
}
// TODO Handle builtin keys with different value type than string
SetProperties(props);
} catch (exception const& e) {
LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what();
result = Result::Failure;
}
Cmds const outCmds(make<PropertiesSet>(id, request_id, result));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
default:
LOG(warn) << "Unexpected/unknown command received: " << cmd.GetType();
LOG(warn) << "Origin: " << senderId;
LOG(warn) << "Destination: " << cond;
break;
}
}
DDS::~DDS()
{
UnsubscribeFromDeviceStateChange();

View File

@@ -12,6 +12,7 @@
#include <fairmq/Plugin.h>
#include <fairmq/StateQueue.h>
#include <fairmq/Version.h>
#include <fairmq/sdk/commands/Commands.h>
#include <dds/dds.h>
@@ -23,12 +24,12 @@
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <set>
#include <string>
#include <atomic>
#include <thread>
#include <map>
#include <unordered_map>
#include <utility> // pair
#include <vector>
namespace fair
@@ -142,8 +143,10 @@ class DDS : public Plugin
auto SubscribeForConnectingChannels() -> void;
auto PublishBoundChannels() -> void;
auto SubscribeForCustomCommands() -> void;
auto HandleCmd(const std::string& id, sdk::cmd::Cmd& cmd, const std::string& cond, uint64_t senderId) -> void;
DDSSubscription fDDS;
size_t fDDSTaskId;
std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
std::unordered_map<std::string, DDSConfig> fConnectingChans;
@@ -156,7 +159,7 @@ class DDS : public Plugin
std::atomic<bool> fDeviceTerminationRequested;
std::set<uint64_t> fStateChangeSubscribers;
std::unordered_map<uint64_t, std::pair<std::chrono::steady_clock::time_point, int64_t>> fStateChangeSubscribers;
uint64_t fLastExternalController;
bool fExitingAckedByLastExternalController;
std::condition_variable fExitingAcked;

View File

@@ -65,7 +65,7 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
cout << d.taskId << " : " << d.state << endl;
}
} else if (command == "o") {
cout << "> dumping config of the devices" << endl;
cout << "> dumping config of the devices (" << path << ")" << endl;
// TODO: extend this regex to return all properties, once command size limitation is removed.
auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout));
for (const auto& d : result.second.devices) {
@@ -79,43 +79,43 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
return;
}
const DeviceProperties props{{pKey, pVal}};
cout << "> sending property" << endl;
cout << "> sending property (" << path << ")" << endl;
topo.SetProperties(props, path);
// give dds time to complete request
this_thread::sleep_for(chrono::milliseconds(100));
} else if (command == "i") {
cout << "> init devices" << endl;
topo.ChangeState(TopologyTransition::InitDevice, std::chrono::milliseconds(timeout));
cout << "> init devices (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::InitDevice, path, std::chrono::milliseconds(timeout));
} else if (command == "k") {
cout << "> complete init" << endl;
topo.ChangeState(TopologyTransition::CompleteInit, std::chrono::milliseconds(timeout));
cout << "> complete init (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::CompleteInit, path, std::chrono::milliseconds(timeout));
} else if (command == "b") {
cout << "> bind devices" << endl;
topo.ChangeState(TopologyTransition::Bind, std::chrono::milliseconds(timeout));
cout << "> bind devices (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::Bind, path, std::chrono::milliseconds(timeout));
} else if (command == "x") {
cout << "> connect devices" << endl;
topo.ChangeState(TopologyTransition::Connect, std::chrono::milliseconds(timeout));
cout << "> connect devices (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::Connect, path, std::chrono::milliseconds(timeout));
} else if (command == "j") {
cout << "> init tasks" << endl;
topo.ChangeState(TopologyTransition::InitTask, std::chrono::milliseconds(timeout));
cout << "> init tasks (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::InitTask, path, std::chrono::milliseconds(timeout));
} else if (command == "r") {
cout << "> run tasks" << endl;
topo.ChangeState(TopologyTransition::Run, std::chrono::milliseconds(timeout));
cout << "> run tasks (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::Run, path, std::chrono::milliseconds(timeout));
} else if (command == "s") {
cout << "> stop devices" << endl;
topo.ChangeState(TopologyTransition::Stop, std::chrono::milliseconds(timeout));
cout << "> stop devices (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::Stop, path, std::chrono::milliseconds(timeout));
} else if (command == "t") {
cout << "> reset tasks" << endl;
topo.ChangeState(TopologyTransition::ResetTask, std::chrono::milliseconds(timeout));
cout << "> reset tasks (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::ResetTask, path, std::chrono::milliseconds(timeout));
} else if (command == "d") {
cout << "> reset devices" << endl;
topo.ChangeState(TopologyTransition::ResetDevice, std::chrono::milliseconds(timeout));
cout << "> reset devices (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::ResetDevice, path, std::chrono::milliseconds(timeout));
} else if (command == "q") {
cout << "> end (" << path << ")" << endl;
topo.ChangeState(TopologyTransition::End, path, std::chrono::milliseconds(timeout));
} else if (command == "h") {
cout << "> help" << endl;
printControlsHelp();
} else if (command == "q") {
cout << "> end" << endl;
topo.ChangeState(TopologyTransition::End, std::chrono::milliseconds(timeout));
} else {
cout << "\033[01;32mInvalid input: [" << command << "]\033[0m" << endl;
printControlsHelp();

View File

@@ -171,7 +171,7 @@ auto PMIxPlugin::SubscribeForCommands() -> void
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState
<< " to " << sender;
Cmds outCmds(make<StateChangeSubscription>(fDeviceId, Result::Ok),
Cmds outCmds(make<StateChangeSubscription>(fDeviceId, fProcess.rank, Result::Ok),
make<StateChange>(fDeviceId, 0, fLastState, fCurrentState));
fCommands.Send(outCmds.Serialize(Format::JSON), {sender});
}
@@ -181,7 +181,7 @@ auto PMIxPlugin::SubscribeForCommands() -> void
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.erase(sender.rank);
}
fCommands.Send(Cmds(make<StateChangeUnsubscription>(fDeviceId, Result::Ok))
fCommands.Send(Cmds(make<StateChangeUnsubscription>(fDeviceId, fProcess.rank, Result::Ok))
.Serialize(Format::JSON),
{sender});
}

View File

@@ -53,7 +53,7 @@ struct StateSubscription
explicit StateSubscription(pmix::Commands& commands)
: fCommands(commands)
{
fCommands.Send(Cmds(make<SubscribeToStateChange>()).Serialize(Format::JSON));
fCommands.Send(Cmds(make<SubscribeToStateChange>(600000)).Serialize(Format::JSON));
}
~StateSubscription()

View File

@@ -34,6 +34,7 @@
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <map>
#include <memory>
@@ -71,7 +72,7 @@ const std::map<DeviceTransition, DeviceState> expectedState =
struct DeviceStatus
{
bool initialized;
bool subscribed_to_state_changes;
DeviceState lastState;
DeviceState state;
DDSTask::Id taskId;
@@ -174,6 +175,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
, fDDSTopo(std::move(topo))
, fStateData()
, fStateIndex()
, fHeartbeatsTimer(asio::system_executor())
, fHeartbeatInterval(600000)
{
makeTopologyState();
@@ -183,67 +186,10 @@ class BasicTopology : public AsioBase<Executor, Allocator>
throw RuntimeError("Given topology ", givenTopo, " is not activated (active: ", activeTopo, ")");
}
using namespace fair::mq::sdk::cmd;
fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
Cmds inCmds;
inCmds.Deserialize(msg);
// FAIR_LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: ";
for (const auto& cmd : inCmds) {
// FAIR_LOG(debug) << " > " << cmd->GetType();
switch (cmd->GetType()) {
case Type::state_change: {
auto _cmd = static_cast<StateChange&>(*cmd);
if (_cmd.GetCurrentState() == DeviceState::Exiting) {
fDDSSession.SendCommand(Cmds(make<StateChangeExitingReceived>()).Serialize(), senderId);
}
HandleCmd(_cmd);
} break;
case Type::state_change_subscription:
if (static_cast<StateChangeSubscription&>(*cmd).GetResult() != Result::Ok) {
FAIR_LOG(error) << "State change subscription failed for " << static_cast<StateChangeSubscription&>(*cmd).GetDeviceId();
}
break;
case Type::state_change_unsubscription:
if (static_cast<StateChangeUnsubscription&>(*cmd).GetResult() != Result::Ok) {
FAIR_LOG(error) << "State change unsubscription failed for " << static_cast<StateChangeUnsubscription&>(*cmd).GetDeviceId();
}
break;
case Type::transition_status: {
auto _cmd = static_cast<TransitionStatus&>(*cmd);
if (_cmd.GetResult() != Result::Ok) {
FAIR_LOG(error) << _cmd.GetTransition() << " transition failed for " << _cmd.GetDeviceId();
DDSTask::Id id(_cmd.GetTaskId());
std::lock_guard<std::mutex> lk(fMtx);
for (auto& op : fChangeStateOps) {
if (!op.second.IsCompleted() && op.second.ContainsTask(id) &&
fStateData.at(fStateIndex.at(id)).state != op.second.GetTargetState()) {
op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed));
}
}
}
}
break;
case Type::properties: {
HandleCmd(static_cast<cmd::Properties&>(*cmd));
}
break;
case Type::properties_set: {
HandleCmd(static_cast<cmd::PropertiesSet&>(*cmd));
}
break;
default:
FAIR_LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType();
FAIR_LOG(warn) << "Origin: " << senderId;
break;
}
}
});
SubscribeToCommands();
fDDSSession.StartDDSService();
// FAIR_LOG(debug) << "Subscribing to state change";
Cmds cmds(make<SubscribeToStateChange>());
fDDSSession.SendCommand(cmds.Serialize());
SubscribeToStateChanges();
}
/// not copyable
@@ -254,15 +200,6 @@ class BasicTopology : public AsioBase<Executor, Allocator>
BasicTopology(BasicTopology&&) = default;
BasicTopology& operator=(BasicTopology&&) = default;
void UnsubscribeFromStateChanges()
{
using namespace fair::mq::sdk::cmd;
fDDSSession.SendCommand(Cmds(make<UnsubscribeFromStateChange>()).Serialize());
// give dds a chance to complete request, TODO: track each individual task and its subscription status
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
~BasicTopology()
{
UnsubscribeFromStateChanges();
@@ -276,16 +213,139 @@ class BasicTopology : public AsioBase<Executor, Allocator>
} catch (...) {}
}
auto HandleCmd(cmd::StateChange const& cmd) -> void
void SubscribeToStateChanges()
{
// FAIR_LOG(debug) << "Subscribing to state change";
cmd::Cmds cmds(cmd::make<cmd::SubscribeToStateChange>(fHeartbeatInterval.count()));
fDDSSession.SendCommand(cmds.Serialize());
fHeartbeatsTimer.expires_after(fHeartbeatInterval);
fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
}
void SendSubscriptionHeartbeats(const std::error_code& ec)
{
if (!ec) {
// Timer expired.
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::SubscriptionHeartbeat>(fHeartbeatInterval.count())).Serialize());
// schedule again
fHeartbeatsTimer.expires_after(fHeartbeatInterval);
fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
} else if (ec == asio::error::operation_aborted) {
// FAIR_LOG(debug) << "Heartbeats timer canceled";
} else {
FAIR_LOG(error) << "Timer error: " << ec;
}
}
void UnsubscribeFromStateChanges()
{
// stop sending heartbeats
fHeartbeatsTimer.cancel();
// unsubscribe from state changes
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
// wait for all tasks to confirm unsubscription
std::unique_lock<std::mutex> lk(fMtx);
fStateChangeUnsubscriptionCV.wait(lk, [&](){
unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
return fStateData.at(s.second).subscribed_to_state_changes == false;
});
return count == fStateIndex.size();
});
}
void SubscribeToCommands()
{
fDDSSession.SubscribeToCommands([&](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
cmd::Cmds inCmds;
inCmds.Deserialize(msg);
// FAIR_LOG(debug) << "Received " << inCmds.Size() << " command(s) with total size of " << msg.length() << " bytes: ";
for (const auto& cmd : inCmds) {
// FAIR_LOG(debug) << " > " << cmd->GetType();
switch (cmd->GetType()) {
case cmd::Type::state_change_subscription:
HandleCmd(static_cast<cmd::StateChangeSubscription&>(*cmd));
break;
case cmd::Type::state_change_unsubscription:
HandleCmd(static_cast<cmd::StateChangeUnsubscription&>(*cmd));
break;
case cmd::Type::state_change:
HandleCmd(static_cast<cmd::StateChange&>(*cmd), senderId);
break;
case cmd::Type::transition_status:
HandleCmd(static_cast<cmd::TransitionStatus&>(*cmd));
break;
case cmd::Type::properties:
HandleCmd(static_cast<cmd::Properties&>(*cmd));
break;
case cmd::Type::properties_set:
HandleCmd(static_cast<cmd::PropertiesSet&>(*cmd));
break;
default:
FAIR_LOG(warn) << "Unexpected/unknown command received: " << cmd->GetType();
FAIR_LOG(warn) << "Origin: " << senderId;
break;
}
}
});
}
auto HandleCmd(cmd::StateChangeSubscription const& cmd) -> void
{
if (cmd.GetResult() == cmd::Result::Ok) {
DDSTask::Id taskId(cmd.GetTaskId());
try {
std::lock_guard<std::mutex> lk(fMtx);
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
task.subscribed_to_state_changes = true;
} catch (const std::exception& e) {
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
}
} else {
FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
}
}
auto HandleCmd(cmd::StateChangeUnsubscription const& cmd) -> void
{
if (cmd.GetResult() == cmd::Result::Ok) {
DDSTask::Id taskId(cmd.GetTaskId());
try {
std::unique_lock<std::mutex> lk(fMtx);
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
task.subscribed_to_state_changes = false;
lk.unlock();
fStateChangeUnsubscriptionCV.notify_one();
} catch (const std::exception& e) {
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
}
} else {
FAIR_LOG(error) << "State change unsubscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
}
}
auto HandleCmd(cmd::StateChange const& cmd, DDSChannel::Id const& senderId) -> void
{
if (cmd.GetCurrentState() == DeviceState::Exiting) {
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::StateChangeExitingReceived>()).Serialize(), senderId);
}
DDSTask::Id taskId(cmd.GetTaskId());
std::lock_guard<std::mutex> lk(fMtx);
try {
std::lock_guard<std::mutex> lk(fMtx);
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
task.initialized = true;
task.lastState = cmd.GetLastState();
task.state = cmd.GetCurrentState();
// if the task is exiting, it will not respond to unsubscription request anymore, set it to false now.
if (task.state == DeviceState::Exiting) {
task.subscribed_to_state_changes = false;
}
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
for (auto& op : fChangeStateOps) {
@@ -299,6 +359,21 @@ class BasicTopology : public AsioBase<Executor, Allocator>
}
}
auto HandleCmd(cmd::TransitionStatus const& cmd) -> void
{
if (cmd.GetResult() != cmd::Result::Ok) {
FAIR_LOG(error) << cmd.GetTransition() << " transition failed for " << cmd.GetDeviceId();
DDSTask::Id taskId(cmd.GetTaskId());
std::lock_guard<std::mutex> lk(fMtx);
for (auto& op : fChangeStateOps) {
if (!op.second.IsCompleted() && op.second.ContainsTask(taskId) &&
fStateData.at(fStateIndex.at(taskId)).state != op.second.GetTargetState()) {
op.second.Complete(MakeErrorCode(ErrorCode::DeviceChangeStateFailed));
}
}
}
}
auto HandleCmd(cmd::Properties const& cmd) -> void
{
std::unique_lock<std::mutex> lk(fMtx);
@@ -619,7 +694,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
}
/// @brief Returns the current state of the topology
/// @return map of id : DeviceStatus (initialized, state)
/// @return map of id : DeviceStatus
auto GetCurrentState() const -> TopologyState
{
std::lock_guard<std::mutex> lk(fMtx);
@@ -1138,6 +1213,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
return {ec, failed};
}
Duration GetHeartbeatInterval() const { return fHeartbeatInterval; }
void SetHeartbeatInterval(Duration duration) { fHeartbeatInterval = duration; }
private:
using TransitionedCount = unsigned int;
@@ -1145,8 +1223,13 @@ class BasicTopology : public AsioBase<Executor, Allocator>
DDSTopology fDDSTopo;
TopologyState fStateData;
TopologyStateIndex fStateIndex;
mutable std::mutex fMtx;
std::condition_variable fStateChangeUnsubscriptionCV;
asio::steady_timer fHeartbeatsTimer;
Duration fHeartbeatInterval;
std::unordered_map<typename ChangeStateOp::Id, ChangeStateOp> fChangeStateOps;
std::unordered_map<typename WaitForStateOp::Id, WaitForStateOp> fWaitForStateOps;
std::unordered_map<typename SetPropertiesOp::Id, SetPropertiesOp> fSetPropertiesOps;

View File

@@ -46,7 +46,7 @@ array<string, 2> resultNames =
}
};
array<string, 16> typeNames =
array<string, 17> typeNames =
{
{
"CheckState",
@@ -57,6 +57,7 @@ array<string, 16> typeNames =
"StateChangeExitingReceived",
"GetProperties",
"SetProperties",
"SubscriptionHeartbeat",
"CurrentState",
"TransitionStatus",
@@ -147,7 +148,7 @@ array<sdk::cmd::FBTransition, 12> mqTransitionToFBTransition =
}
};
array<FBCmd, 16> typeToFBCmd =
array<FBCmd, 17> typeToFBCmd =
{
{
FBCmd::FBCmd_check_state,
@@ -158,6 +159,7 @@ array<FBCmd, 16> typeToFBCmd =
FBCmd::FBCmd_state_change_exiting_received,
FBCmd::FBCmd_get_properties,
FBCmd::FBCmd_set_properties,
FBCmd::FBCmd_subscription_heartbeat,
FBCmd::FBCmd_current_state,
FBCmd::FBCmd_transition_status,
FBCmd::FBCmd_config,
@@ -169,7 +171,7 @@ array<FBCmd, 16> typeToFBCmd =
}
};
array<Type, 16> fbCmdToType =
array<Type, 17> fbCmdToType =
{
{
Type::check_state,
@@ -180,6 +182,7 @@ array<Type, 16> fbCmdToType =
Type::state_change_exiting_received,
Type::get_properties,
Type::set_properties,
Type::subscription_heartbeat,
Type::current_state,
Type::transition_status,
Type::config,
@@ -228,7 +231,9 @@ string Cmds::Serialize(const Format type) const
break;
break;
case Type::subscribe_to_state_change: {
auto _cmd = static_cast<SubscribeToStateChange&>(*cmd);
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
cmdBuilder->add_interval(_cmd.GetInterval());
}
break;
case Type::unsubscribe_from_state_change: {
@@ -261,6 +266,12 @@ string Cmds::Serialize(const Format type) const
cmdBuilder->add_properties(props);
}
break;
case Type::subscription_heartbeat: {
auto _cmd = static_cast<SubscriptionHeartbeat&>(*cmd);
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
cmdBuilder->add_interval(_cmd.GetInterval());
}
break;
case Type::current_state: {
auto _cmd = static_cast<CurrentState&>(*cmd);
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
@@ -293,6 +304,7 @@ string Cmds::Serialize(const Format type) const
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
cmdBuilder->add_device_id(deviceId);
cmdBuilder->add_task_id(_cmd.GetTaskId());
cmdBuilder->add_result(GetFBResult(_cmd.GetResult()));
}
break;
@@ -301,6 +313,7 @@ string Cmds::Serialize(const Format type) const
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
cmdBuilder->add_device_id(deviceId);
cmdBuilder->add_task_id(_cmd.GetTaskId());
cmdBuilder->add_result(GetFBResult(_cmd.GetResult()));
}
break;
@@ -404,7 +417,7 @@ void Cmds::Deserialize(const string& str, const Format type)
fCmds.emplace_back(make<DumpConfig>());
break;
case FBCmd_subscribe_to_state_change:
fCmds.emplace_back(make<SubscribeToStateChange>());
fCmds.emplace_back(make<SubscribeToStateChange>(cmdPtr.interval()));
break;
case FBCmd_unsubscribe_from_state_change:
fCmds.emplace_back(make<UnsubscribeFromStateChange>());
@@ -423,6 +436,9 @@ void Cmds::Deserialize(const string& str, const Format type)
}
fCmds.emplace_back(make<SetProperties>(cmdPtr.request_id(), properties));
} break;
case FBCmd_subscription_heartbeat:
fCmds.emplace_back(make<SubscriptionHeartbeat>(cmdPtr.interval()));
break;
case FBCmd_current_state:
fCmds.emplace_back(make<CurrentState>(cmdPtr.device_id()->str(), GetMQState(cmdPtr.current_state())));
break;
@@ -433,10 +449,10 @@ void Cmds::Deserialize(const string& str, const Format type)
fCmds.emplace_back(make<Config>(cmdPtr.device_id()->str(), cmdPtr.config_string()->str()));
break;
case FBCmd_state_change_subscription:
fCmds.emplace_back(make<StateChangeSubscription>(cmdPtr.device_id()->str(), GetResult(cmdPtr.result())));
fCmds.emplace_back(make<StateChangeSubscription>(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetResult(cmdPtr.result())));
break;
case FBCmd_state_change_unsubscription:
fCmds.emplace_back(make<StateChangeUnsubscription>(cmdPtr.device_id()->str(), GetResult(cmdPtr.result())));
fCmds.emplace_back(make<StateChangeUnsubscription>(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetResult(cmdPtr.result())));
break;
case FBCmd_state_change:
fCmds.emplace_back(make<StateChange>(cmdPtr.device_id()->str(), cmdPtr.task_id(), GetMQState(cmdPtr.last_state()), GetMQState(cmdPtr.current_state())));

View File

@@ -47,12 +47,13 @@ enum class Type : int
state_change_exiting_received, // args: { }
get_properties, // args: { request_id, property_query }
set_properties, // args: { request_id, properties }
subscription_heartbeat, // args: { interval }
current_state, // args: { device_id, current_state }
transition_status, // args: { device_id, task_id, Result, transition }
config, // args: { device_id, config_string }
state_change_subscription, // args: { device_id, Result }
state_change_unsubscription, // args: { device_id, Result }
state_change_subscription, // args: { device_id, task_id, Result }
state_change_unsubscription, // args: { device_id, task_id, Result }
state_change, // args: { device_id, task_id, last_state, current_state }
properties, // args: { device_id, request_id, Result, properties }
properties_set // args: { device_id, request_id, Result }
@@ -95,7 +96,16 @@ struct DumpConfig : Cmd
struct SubscribeToStateChange : Cmd
{
explicit SubscribeToStateChange() : Cmd(Type::subscribe_to_state_change) {}
explicit SubscribeToStateChange(int64_t interval)
: Cmd(Type::subscribe_to_state_change)
, fInterval(interval)
{}
int64_t GetInterval() const { return fInterval; }
void SetInterval(int64_t interval) { fInterval = interval; }
private:
int64_t fInterval;
};
struct UnsubscribeFromStateChange : Cmd
@@ -144,6 +154,20 @@ struct SetProperties : Cmd
std::vector<std::pair<std::string, std::string>> fProperties;
};
struct SubscriptionHeartbeat : Cmd
{
explicit SubscriptionHeartbeat(int64_t interval)
: Cmd(Type::subscription_heartbeat)
, fInterval(interval)
{}
int64_t GetInterval() const { return fInterval; }
void SetInterval(int64_t interval) { fInterval = interval; }
private:
int64_t fInterval;
};
struct CurrentState : Cmd
{
explicit CurrentState(const std::string& id, State currentState)
@@ -208,37 +232,45 @@ struct Config : Cmd
struct StateChangeSubscription : Cmd
{
explicit StateChangeSubscription(const std::string& id, const Result result)
explicit StateChangeSubscription(const std::string& id, const uint64_t taskId, const Result result)
: Cmd(Type::state_change_subscription)
, fDeviceId(id)
, fTaskId(taskId)
, fResult(result)
{}
std::string GetDeviceId() const { return fDeviceId; }
void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; }
uint64_t GetTaskId() const { return fTaskId; }
void SetTaskId(const uint64_t taskId) { fTaskId = taskId; }
Result GetResult() const { return fResult; }
void SetResult(const Result result) { fResult = result; }
private:
std::string fDeviceId;
uint64_t fTaskId;
Result fResult;
};
struct StateChangeUnsubscription : Cmd
{
explicit StateChangeUnsubscription(const std::string& id, const Result result)
explicit StateChangeUnsubscription(const std::string& id, const uint64_t taskId, const Result result)
: Cmd(Type::state_change_unsubscription)
, fDeviceId(id)
, fTaskId(taskId)
, fResult(result)
{}
std::string GetDeviceId() const { return fDeviceId; }
void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; }
uint64_t GetTaskId() const { return fTaskId; }
void SetTaskId(const uint64_t taskId) { fTaskId = taskId; }
Result GetResult() const { return fResult; }
void SetResult(const Result result) { fResult = result; }
private:
std::string fDeviceId;
uint64_t fTaskId;
Result fResult;
};

View File

@@ -47,17 +47,18 @@ enum FBCmd:byte {
check_state, // args: { }
change_state, // args: { transition }
dump_config, // args: { }
subscribe_to_state_change, // args: { }
subscribe_to_state_change, // args: { interval }
unsubscribe_from_state_change, // args: { }
state_change_exiting_received, // args: { }
get_properties, // args: { request_id, property_query }
set_properties, // args: { request_id, properties }
subscription_heartbeat, // args: { interval }
current_state, // args: { device_id, current_state }
transition_status, // args: { device_id, Result, transition }
transition_status, // args: { device_id, task_id, Result, transition }
config, // args: { device_id, config_string }
state_change_subscription, // args: { device_id, Result }
state_change_unsubscription, // args: { device_id, Result }
state_change_subscription, // args: { device_id, task_id, Result }
state_change_unsubscription, // args: { device_id, task_id, Result }
state_change, // args: { device_id, task_id, last_state, current_state }
properties, // args: { device_id, request_id, Result, properties }
properties_set // args: { device_id, request_id, Result }
@@ -68,6 +69,7 @@ table FBCommand {
device_id:string;
task_id:uint64;
request_id:uint64;
interval:int64;
state:FBState;
transition:FBTransition;
result:FBResult;

View File

@@ -41,15 +41,21 @@ struct RegionInfo
RegionInfo(const VoidAlloc& alloc)
: fPath("", alloc)
, fFlags(0)
, fUserFlags(0)
, fDestroyed(false)
{}
RegionInfo(const char* path, int flags, const VoidAlloc& alloc)
RegionInfo(const char* path, const int flags, const uint64_t userFlags, const VoidAlloc& alloc)
: fPath(path, alloc)
, fFlags(flags)
, fUserFlags(userFlags)
, fDestroyed(false)
{}
Str fPath;
int fFlags;
uint64_t fUserFlags;
bool fDestroyed;
};
using Uint64RegionInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint64_t, RegionInfo>, SegmentManager>;

View File

@@ -7,7 +7,6 @@
********************************************************************************/
#include "Manager.h"
#include "Common.h"
#include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Strings.h>
@@ -19,6 +18,7 @@ using namespace std;
using bie = ::boost::interprocess::interprocess_exception;
namespace bipc = ::boost::interprocess;
namespace bfs = ::boost::filesystem;
namespace bpt = ::boost::posix_time;
namespace fair
{
@@ -27,19 +27,26 @@ namespace mq
namespace shmem
{
std::unordered_map<uint64_t, std::unique_ptr<Region>> Manager::fRegions;
Manager::Manager(const std::string& id, size_t size)
Manager::Manager(const string& id, size_t size)
: fShmId(id)
, fSegmentName("fmq_" + fShmId + "_main")
, fManagementSegmentName("fmq_" + fShmId + "_mng")
, fSegment(bipc::open_or_create, fSegmentName.c_str(), size)
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 655360)
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsCV(bipc::open_or_create, string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsSubscriptionActive(false)
, fDeviceCounter(nullptr)
, fRegionInfos(nullptr)
, fInterrupted(false)
{
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << size << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(bipc::unique_instance)(fShmVoidAlloc);
// store info about the managed segment as region with id 0
fRegionInfos->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc));
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fDeviceCounter = fManagementSegment.find<DeviceCounter>(bipc::unique_instance).first;
@@ -55,7 +62,7 @@ Manager::Manager(const std::string& id, size_t size)
}
}
void Manager::StartMonitor(const std::string& id)
void Manager::StartMonitor(const string& id)
{
try {
bipc::named_mutex monitorStatus(bipc::open_only, string("fmq_" + id + "_ms").c_str());
@@ -94,47 +101,74 @@ void Manager::StartMonitor(const std::string& id)
}
}
void Manager::Interrupt()
pair<bipc::mapped_region*, uint64_t> Manager::CreateRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
{
}
try {
void Manager::Resume()
{
// close remote regions before processing new transfers
for (auto it = fRegions.begin(); it != fRegions.end(); /**/) {
if (it->second->fRemote) {
it = fRegions.erase(it);
} else {
++it;
}
}
}
pair<bipc::mapped_region*, uint64_t> result;
bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
{
auto it = fRegions.find(id);
if (it != fRegions.end()) {
LOG(error) << "Trying to create a region that already exists";
return nullptr;
} else {
// create region info
{
uint64_t id = 0;
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
VoidAlloc voidAlloc(fManagementSegment.get_segment_manager());
Uint64RegionInfoMap* infoMap = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(bipc::unique_instance)(voidAlloc);
infoMap->emplace(id, RegionInfo(path.c_str(), flags, voidAlloc));
RegionCounter* rc = fManagementSegment.find<RegionCounter>(bipc::unique_instance).first;
if (rc) {
LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing.";
(rc->fCount)++;
LOG(debug) << "incremented region counter, now: " << rc->fCount;
} else {
LOG(debug) << "no region counter found, creating one and initializing with 1";
rc = fManagementSegment.construct<RegionCounter>(bipc::unique_instance)(1);
LOG(debug) << "initialized region counter with: " << rc->fCount;
}
id = rc->fCount;
auto it = fRegions.find(id);
if (it != fRegions.end()) {
LOG(error) << "Trying to create a region that already exists";
return {nullptr, id};
}
// create region info
fRegionInfos->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, size, false, callback, path, flags));
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion);
result.second = id;
}
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
fRegionEventsCV.notify_all();
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, size, false, callback, path, flags));
return result;
r.first->second->StartReceivingAcks();
return &(r.first->second->fRegion);
} catch (bipc::interprocess_exception& e) {
LOG(error) << "cannot create region. Already created/not cleaned up?";
LOG(error) << e.what();
throw;
}
}
Region* Manager::GetRemoteRegion(const uint64_t id)
void Manager::RemoveRegion(const uint64_t id)
{
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegions.erase(id);
fRegionInfos->at(id).fDestroyed = true;
}
fRegionEventsCV.notify_all();
}
Region* Manager::GetRegion(const uint64_t id)
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
return GetRegionUnsafe(id);
}
Region* Manager::GetRegionUnsafe(const uint64_t id)
{
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
auto it = fRegions.find(id);
@@ -142,21 +176,10 @@ Region* Manager::GetRemoteRegion(const uint64_t id)
return it->second.get();
} else {
try {
string path;
int flags;
// get region info
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
Uint64RegionInfoMap* infoMap = fManagementSegment.find<Uint64RegionInfoMap>(bipc::unique_instance).first;
if (infoMap == nullptr) {
LOG(error) << "Unable to locate the region info";
throw SharedMemoryError("Unable to locate remote region info");
}
RegionInfo regionInfo = infoMap->at(id);
path = regionInfo.fPath.c_str();
flags = regionInfo.fFlags;
}
RegionInfo regionInfo = fRegionInfos->at(id);
string path = regionInfo.fPath.c_str();
int flags = regionInfo.fFlags;
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, 0, true, nullptr, path, flags));
@@ -165,13 +188,106 @@ Region* Manager::GetRemoteRegion(const uint64_t id)
LOG(warn) << "Could not get remote region for id: " << id;
return nullptr;
}
}
}
void Manager::RemoveRegion(const uint64_t id)
vector<fair::mq::RegionInfo> Manager::GetRegionInfo()
{
fRegions.erase(id);
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
return GetRegionInfoUnsafe();
}
vector<fair::mq::RegionInfo> Manager::GetRegionInfoUnsafe()
{
vector<fair::mq::RegionInfo> result;
for (const auto& e : *fRegionInfos) {
fair::mq::RegionInfo info;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (info.id != 0) {
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
} else {
if (!e.second.fDestroyed) {
info.ptr = fSegment.get_address();
info.size = fSegment.get_size();
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}
}
return result;
}
void Manager::SubscribeToRegionEvents(RegionEventCallback callback)
{
if (fRegionEventThread.joinable()) {
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_all();
fRegionEventThread.join();
}
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegionEventCallback = callback;
fRegionEventsSubscriptionActive = true;
fRegionEventThread = thread(&Manager::RegionEventsSubscription, this);
}
bool Manager::SubscribedToRegionEvents()
{
return fRegionEventThread.joinable();
}
void Manager::UnsubscribeFromRegionEvents()
{
if (fRegionEventThread.joinable()) {
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_all();
fRegionEventThread.join();
lock.lock();
fRegionEventCallback = nullptr;
}
}
void Manager::RegionEventsSubscription()
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
while (fRegionEventsSubscriptionActive) {
auto infos = GetRegionInfoUnsafe();
for (const auto& i : infos) {
auto el = fObservedRegionEvents.find(i.id);
if (el == fObservedRegionEvents.end()) {
fRegionEventCallback(i);
fObservedRegionEvents.emplace(i.id, i.event);
} else {
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
fRegionEventCallback(i);
el->second = i.event;
} else {
// LOG(debug) << "ignoring event for id" << i.id << ":";
// LOG(debug) << "incoming event: " << i.event;
// LOG(debug) << "stored event: " << el->second;
}
}
}
fRegionEventsCV.wait(lock);
}
}
void Manager::RemoveSegments()
@@ -193,6 +309,8 @@ Manager::~Manager()
{
bool lastRemoved = false;
UnsubscribeFromRegionEvents();
try {
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
@@ -212,6 +330,7 @@ Manager::~Manager()
if (lastRemoved) {
bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str());
bipc::named_condition::remove(string("fmq_" + fShmId + "_cv").c_str());
}
}

View File

@@ -15,18 +15,24 @@
#ifndef FAIR_MQ_SHMEM_MANAGER_H_
#define FAIR_MQ_SHMEM_MANAGER_H_
#include "Region.h"
#include "Common.h"
#include "Region.h"
#include <FairMQLogger.h>
#include <FairMQUnmanagedRegion.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <string>
#include <unordered_map>
#include <set>
#include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
namespace fair
{
@@ -56,17 +62,26 @@ class Manager
static void StartMonitor(const std::string&);
static void Interrupt();
static void Resume();
void Interrupt() { fInterrupted.store(true); }
void Resume() { fInterrupted.store(false); }
bool Interrupted() { return fInterrupted.load(); }
int GetDeviceCounter();
int IncrementDeviceCounter();
int DecrementDeviceCounter();
boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path = "", int flags = 0);
Region* GetRemoteRegion(const uint64_t id);
std::pair<boost::interprocess::mapped_region*, uint64_t> CreateRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0);
Region* GetRegion(const uint64_t id);
Region* GetRegionUnsafe(const uint64_t id);
void RemoveRegion(const uint64_t id);
std::vector<fair::mq::RegionInfo> GetRegionInfo();
std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe();
void SubscribeToRegionEvents(RegionEventCallback callback);
bool SubscribedToRegionEvents();
void UnsubscribeFromRegionEvents();
void RegionEventsSubscription();
void RemoveSegments();
private:
@@ -75,9 +90,20 @@ class Manager
std::string fManagementSegmentName;
boost::interprocess::managed_shared_memory fSegment;
boost::interprocess::managed_shared_memory fManagementSegment;
VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx;
boost::interprocess::named_condition fRegionEventsCV;
std::thread fRegionEventThread;
bool fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents;
DeviceCounter* fDeviceCounter;
static std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
Uint64RegionInfoMap* fRegionInfos;
std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
std::atomic<bool> fInterrupted;
};
} // namespace shmem

View File

@@ -29,7 +29,6 @@ namespace mq
namespace shmem
{
atomic<bool> Message::fInterrupted(false);
Transport Message::fTransportType = Transport::SHM;
Message::Message(Manager& manager, FairMQTransportFactory* factory)
@@ -113,7 +112,7 @@ bool Message::InitializeChunk(const size_t size)
} catch (bipc::bad_alloc& ba) {
// LOG(warn) << "Shared memory full...";
this_thread::sleep_for(chrono::milliseconds(50));
if (fInterrupted) {
if (fManager.Interrupted()) {
return false;
} else {
continue;
@@ -164,7 +163,7 @@ void* Message::GetData() const
fLocalPtr = nullptr;
}
} else {
fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId);
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
if (fRegionPtr) {
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->fRegion.get_address()) + fMeta.fHandle;
} else {
@@ -221,7 +220,7 @@ void Message::CloseMessage()
fMeta.fHandle = -1;
} else {
if (!fRegionPtr) {
fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId);
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
}
if (fRegionPtr) {

View File

@@ -65,7 +65,6 @@ class Message final : public fair::mq::Message
mutable Region* fRegionPtr;
mutable char* fLocalPtr;
static std::atomic<bool> fInterrupted;
static Transport fTransportType;
bool InitializeChunk(const size_t size);

View File

@@ -15,6 +15,7 @@
#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
@@ -427,6 +428,15 @@ void Monitor::RemoveMutex(const string& name)
}
}
void Monitor::RemoveCondition(const string& name)
{
if (bipc::named_condition::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl;
} else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
}
}
void Monitor::Cleanup(const string& shmId)
{
string managementSegmentName("fmq_" + shmId + "_mng");
@@ -469,6 +479,7 @@ void Monitor::Cleanup(const string& shmId)
RemoveObject("fmq_" + shmId + "_main");
RemoveMutex("fmq_" + shmId + "_mtx");
RemoveCondition("fmq_" + shmId + "_cv");
cout << endl;
}

View File

@@ -42,6 +42,7 @@ class Monitor
static void RemoveFileMapping(const std::string&);
static void RemoveQueue(const std::string&);
static void RemoveMutex(const std::string&);
static void RemoveCondition(const std::string&);
struct DaemonPresent : std::runtime_error { using std::runtime_error::runtime_error; };

View File

@@ -19,6 +19,7 @@
#include <cerrno>
#include <chrono>
#include <ios>
using namespace std;
@@ -49,7 +50,17 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, Region
if (path != "") {
fName = string(path + fName);
fFile = fopen(fName.c_str(), fRemote ? "r+" : "w+");
if (!fRemote) {
// create a file
filebuf fbuf;
if (fbuf.open(fName, ios_base::in | ios_base::out | ios_base::trunc | ios_base::binary)) {
// set the size
fbuf.pubseekoff(size - 1, ios_base::beg);
fbuf.sputc(0);
}
}
fFile = fopen(fName.c_str(), "r+");
if (!fFile) {
LOG(error) << "Failed to initialize file: " << fName;
@@ -70,8 +81,8 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, Region
}
InitializeQueues();
StartSendingAcks();
LOG(debug) << "shmem: initialized region: " << fName;
fSendAcksWorker = thread(&Region::SendAcks, this);
}
void Region::InitializeQueues()
@@ -84,6 +95,11 @@ void Region::InitializeQueues()
LOG(debug) << "shmem: initialized region queue: " << fQueueName;
}
void Region::StartSendingAcks()
{
fSendAcksWorker = thread(&Region::SendAcks, this);
}
void Region::StartReceivingAcks()
{
fReceiveAcksWorker = thread(&Region::ReceiveAcks, this);
@@ -114,12 +130,12 @@ void Region::ReceiveAcks()
void Region::ReleaseBlock(const RegionBlock &block)
{
unique_lock<mutex> lock(fBlockLock);
unique_lock<mutex> lock(fBlockMtx);
fBlocksToFree.emplace_back(block);
if (fBlocksToFree.size() >= fAckBunchSize) {
lock.unlock(); // reduces contention on fBlockLock
lock.unlock(); // reduces contention on fBlockMtx
fBlockSendCV.notify_one();
}
}
@@ -132,7 +148,7 @@ void Region::SendAcks()
size_t blocksToSend = 0;
{ // mutex locking block
unique_lock<mutex> lock(fBlockLock);
unique_lock<mutex> lock(fBlockMtx);
// try to get more blocks without waiting (we can miss a notify from CloseMessage())
if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) {
@@ -166,6 +182,7 @@ Region::~Region()
fStop = true;
if (fSendAcksWorker.joinable()) {
fBlockSendCV.notify_one();
fSendAcksWorker.join();
}

View File

@@ -49,11 +49,11 @@ struct Region
void InitializeQueues();
void StartSendingAcks();
void SendAcks();
void StartReceivingAcks();
void ReceiveAcks();
void ReleaseBlock(const RegionBlock &);
void SendAcks();
~Region();
@@ -67,7 +67,7 @@ struct Region
boost::interprocess::file_mapping fFileMapping;
boost::interprocess::mapped_region fRegion;
std::mutex fBlockLock;
std::mutex fBlockMtx;
std::condition_variable fBlockSendCV;
std::vector<RegionBlock> fBlocksToFree;
const std::size_t fAckBunchSize = 256;

View File

@@ -28,8 +28,6 @@ namespace mq
namespace shmem
{
atomic<bool> Socket::fInterrupted(false);
struct ZMsg
{
ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); }
@@ -133,7 +131,7 @@ int Socket::Send(MessagePtr& msg, const int timeout)
ZMsg zmqMsg(sizeof(MetaHeader));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
while (true && !fInterrupted) {
while (true && !fManager.Interrupted()) {
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
shmMsg->fQueued = true;
@@ -142,7 +140,7 @@ int Socket::Send(MessagePtr& msg, const int timeout)
fBytesTx += size;
return size;
} else if (zmq_errno() == EAGAIN) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) {
elapsed += fSndTimeout;
if (elapsed >= timeout) {
@@ -198,7 +196,7 @@ int Socket::Receive(MessagePtr& msg, const int timeout)
++fMessagesRx;
return size;
} else if (zmq_errno() == EAGAIN) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) {
elapsed += fRcvTimeout;
if (elapsed >= timeout) {
@@ -242,7 +240,7 @@ int64_t Socket::Send(vector<MessagePtr>& msgVec, const int timeout)
std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader));
}
while (!fInterrupted) {
while (!fManager.Interrupted()) {
int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
@@ -260,7 +258,7 @@ int64_t Socket::Send(vector<MessagePtr>& msgVec, const int timeout)
return totalSize;
} else if (zmq_errno() == EAGAIN) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) {
elapsed += fSndTimeout;
if (elapsed >= timeout) {
@@ -296,7 +294,7 @@ int64_t Socket::Receive(vector<MessagePtr>& msgVec, const int timeout)
ZMsg zmqMsg;
while (!fInterrupted) {
while (!fManager.Interrupted()) {
int64_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) {
@@ -327,7 +325,7 @@ int64_t Socket::Receive(vector<MessagePtr>& msgVec, const int timeout)
return totalSize;
} else if (zmq_errno() == EAGAIN) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) {
elapsed += fRcvTimeout;
if (elapsed >= timeout) {
@@ -365,20 +363,6 @@ void Socket::Close()
fSocket = nullptr;
}
void Socket::Interrupt()
{
Manager::Interrupt();
Message::fInterrupted = true;
fInterrupted = true;
}
void Socket::Resume()
{
Manager::Resume();
Message::fInterrupted = false;
fInterrupted = false;
}
void Socket::SetOption(const string& option, const void* value, size_t valueSize)
{
if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {

View File

@@ -46,9 +46,6 @@ class Socket final : public fair::mq::Socket
void Close() override;
static void Interrupt();
static void Resume();
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
@@ -81,8 +78,6 @@ class Socket final : public fair::mq::Socket
std::atomic<unsigned long> fMessagesTx;
std::atomic<unsigned long> fMessagesRx;
static std::atomic<bool> fInterrupted;
int fSndTimeout;
int fRcvTimeout;
};

View File

@@ -159,9 +159,34 @@ PollerPtr TransportFactory::CreatePoller(const unordered_map<string, vector<Fair
return tools::make_unique<Poller>(channelsMap, channelList);
}
UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
{
return tools::make_unique<UnmanagedRegion>(*fManager, size, callback, path, flags);
return tools::make_unique<UnmanagedRegion>(*fManager, size, callback, path, flags, this);
}
UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
{
return tools::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, path, flags, this);
}
void TransportFactory::SubscribeToRegionEvents(RegionEventCallback callback)
{
fManager->SubscribeToRegionEvents(callback);
}
bool TransportFactory::SubscribedToRegionEvents()
{
return fManager->SubscribedToRegionEvents();
}
void TransportFactory::UnsubscribeFromRegionEvents()
{
fManager->UnsubscribeFromRegionEvents();
}
vector<fair::mq::RegionInfo> TransportFactory::GetRegionInfo()
{
return fManager->GetRegionInfo();
}
Transport TransportFactory::GetType() const

View File

@@ -49,12 +49,18 @@ class TransportFactory final : public fair::mq::TransportFactory
PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override;
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override;
void SubscribeToRegionEvents(RegionEventCallback callback) override;
bool SubscribedToRegionEvents() override;
void UnsubscribeFromRegionEvents() override;
std::vector<fair::mq::RegionInfo> GetRegionInfo() override;
Transport GetType() const override;
void Interrupt() override { Socket::Interrupt(); }
void Resume() override { Socket::Resume(); }
void Interrupt() override { fManager->Interrupt(); }
void Resume() override { fManager->Resume(); }
void Reset() override;
void IncrementMsgCounter() { ++fMsgCounter; }

View File

@@ -1,52 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Common.h"
#include "UnmanagedRegion.h"
using namespace std;
namespace bipc = ::boost::interprocess;
namespace fair
{
namespace mq
{
namespace shmem
{
UnmanagedRegion::UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
: fManager(manager)
, fRegion(nullptr)
, fRegionId(0)
{
try {
RegionCounter* rc = fManager.ManagementSegment().find<RegionCounter>(bipc::unique_instance).first;
if (rc) {
LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing.";
(rc->fCount)++;
LOG(debug) << "incremented region counter, now: " << rc->fCount;
} else {
LOG(debug) << "no region counter found, creating one and initializing with 1";
rc = fManager.ManagementSegment().construct<RegionCounter>(bipc::unique_instance)(1);
LOG(debug) << "initialized region counter with: " << rc->fCount;
}
fRegionId = rc->fCount;
fRegion = fManager.CreateRegion(size, fRegionId, callback, path, flags);
} catch (bipc::interprocess_exception& e) {
LOG(error) << "cannot create region. Already created/not cleaned up?";
LOG(error) << e.what();
throw;
}
}
}
}
}

View File

@@ -36,10 +36,24 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
friend class Socket;
public:
UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0);
UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr)
: UnmanagedRegion(manager, size, 0, callback, path, flags, factory)
{}
UnmanagedRegion(Manager& manager, const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr)
: FairMQUnmanagedRegion(factory)
, fManager(manager)
, fRegion(nullptr)
, fRegionId(0)
{
auto result = fManager.CreateRegion(size, userFlags, callback, path, flags);
fRegion = result.first;
fRegionId = result.second;
}
void* GetData() const override { return fRegion->get_address(); }
size_t GetSize() const override { return fRegion->get_size(); }
uint64_t GetId() const override { return fRegionId; }
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
@@ -53,4 +67,4 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
}
}
#endif /* FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ */
#endif /* FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ */

View File

@@ -9,6 +9,8 @@
#include "FairMQTransportFactoryZMQ.h"
#include <zmq.h>
#include <algorithm> // find_if
using namespace std;
fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transport::ZMQ;
@@ -16,6 +18,7 @@ fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transp
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fair::mq::ProgOptions* config)
: FairMQTransportFactory(id)
, fContext(zmq_ctx_new())
, fRegionCounter(0)
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
@@ -47,6 +50,7 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fai
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
}
fRegionEvents.emplace(0, nullptr, 0, 0, fair::mq::RegionEvent::local_only);
}
FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage()
@@ -80,7 +84,7 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChann
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
}
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const std::vector<FairMQChannel*>& channels) const
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel*>& channels) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
}
@@ -90,9 +94,98 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map<stri
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList));
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback, path, flags));
return CreateUnmanagedRegion(size, 0, callback, path, flags);
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
{
unique_ptr<FairMQUnmanagedRegion> ptr = nullptr;
{
lock_guard<mutex> lock(fMtx);
++fRegionCounter;
ptr = unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(fRegionCounter, size, userFlags, callback, path, flags, this));
auto zPtr = static_cast<FairMQUnmanagedRegionZMQ*>(ptr.get());
fRegionInfos.emplace_back(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
fRegionEvents.emplace(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
}
fRegionEventsCV.notify_one();
return ptr;
}
void FairMQTransportFactoryZMQ::SubscribeToRegionEvents(FairMQRegionEventCallback callback)
{
if (fRegionEventThread.joinable()) {
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
{
lock_guard<mutex> lock(fMtx);
fRegionEventsSubscriptionActive = false;
}
fRegionEventsCV.notify_one();
fRegionEventThread.join();
}
lock_guard<mutex> lock(fMtx);
fRegionEventCallback = callback;
fRegionEventsSubscriptionActive = true;
fRegionEventThread = thread(&FairMQTransportFactoryZMQ::RegionEventsSubscription, this);
}
bool FairMQTransportFactoryZMQ::SubscribedToRegionEvents()
{
return fRegionEventThread.joinable();
}
void FairMQTransportFactoryZMQ::UnsubscribeFromRegionEvents()
{
if (fRegionEventThread.joinable()) {
unique_lock<mutex> lock(fMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_one();
fRegionEventThread.join();
lock.lock();
fRegionEventCallback = nullptr;
}
}
void FairMQTransportFactoryZMQ::RegionEventsSubscription()
{
unique_lock<mutex> lock(fMtx);
while (fRegionEventsSubscriptionActive) {
while (!fRegionEvents.empty()) {
auto i = fRegionEvents.front();
fRegionEventCallback(i);
fRegionEvents.pop();
}
fRegionEventsCV.wait(lock, [&]() { return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); });
}
}
vector<fair::mq::RegionInfo> FairMQTransportFactoryZMQ::GetRegionInfo()
{
lock_guard<mutex> lock(fMtx);
return fRegionInfos;
}
void FairMQTransportFactoryZMQ::RemoveRegion(uint64_t id)
{
{
lock_guard<mutex> lock(fMtx);
auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const fair::mq::RegionInfo& i) {
return i.id == id;
});
if (it != fRegionInfos.end()) {
fRegionEvents.push(*it);
fRegionEvents.back().event = fair::mq::RegionEvent::destroyed;
fRegionInfos.erase(it);
} else {
LOG(error) << "RemoveRegion: given id (" << id << ") not found.";
}
}
fRegionEventsCV.notify_one();
}
fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const
@@ -103,23 +196,19 @@ fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const
FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ()
{
LOG(debug) << "Destroying ZeroMQ transport...";
if (fContext)
{
if (zmq_ctx_term(fContext) != 0)
{
if (errno == EINTR)
{
UnsubscribeFromRegionEvents();
if (fContext) {
if (zmq_ctx_term(fContext) != 0) {
if (errno == EINTR) {
LOG(error) << " failed closing context, reason: " << zmq_strerror(errno);
}
else
{
} else {
fContext = nullptr;
return;
}
}
}
else
{
} else {
LOG(error) << "context not available for shutdown";
}
}

View File

@@ -15,9 +15,6 @@
#ifndef FAIRMQTRANSPORTFACTORYZMQ_H_
#define FAIRMQTRANSPORTFACTORYZMQ_H_
#include <vector>
#include <string>
#include "FairMQTransportFactory.h"
#include "FairMQMessageZMQ.h"
#include "FairMQSocketZMQ.h"
@@ -25,6 +22,14 @@
#include "FairMQUnmanagedRegionZMQ.h"
#include <fairmq/ProgOptions.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
{
public:
@@ -32,8 +37,6 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
FairMQTransportFactoryZMQ(const FairMQTransportFactoryZMQ&) = delete;
FairMQTransportFactoryZMQ operator=(const FairMQTransportFactoryZMQ&) = delete;
~FairMQTransportFactoryZMQ() override;
FairMQMessagePtr CreateMessage() override;
FairMQMessagePtr CreateMessage(const size_t size) override;
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
@@ -45,7 +48,15 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override;
bool SubscribedToRegionEvents() override;
void UnsubscribeFromRegionEvents() override;
void RegionEventsSubscription();
std::vector<fair::mq::RegionInfo> GetRegionInfo() override;
void RemoveRegion(uint64_t id);
fair::mq::Transport GetType() const override;
@@ -53,9 +64,20 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
void Resume() override { FairMQSocketZMQ::Resume(); }
void Reset() override {}
~FairMQTransportFactoryZMQ() override;
private:
static fair::mq::Transport fTransportType;
void* fContext;
std::mutex fMtx;
uint64_t fRegionCounter;
std::condition_variable fRegionEventsCV;
std::vector<fair::mq::RegionInfo> fRegionInfos;
std::queue<fair::mq::RegionInfo> fRegionEvents;
std::thread fRegionEventThread;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
bool fRegionEventsSubscriptionActive;
};
#endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */

View File

@@ -7,16 +7,17 @@
********************************************************************************/
#include "FairMQUnmanagedRegionZMQ.h"
#include "FairMQTransportFactoryZMQ.h"
#include "FairMQLogger.h"
using namespace std;
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */)
: fBuffer(malloc(size))
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory /* = nullptr */)
: FairMQUnmanagedRegion(factory)
, fId(id)
, fBuffer(malloc(size))
, fSize(size)
, fUserFlags(userFlags)
, fCallback(callback)
{
}
{}
void* FairMQUnmanagedRegionZMQ::GetData() const
{
@@ -31,5 +32,6 @@ size_t FairMQUnmanagedRegionZMQ::GetSize() const
FairMQUnmanagedRegionZMQ::~FairMQUnmanagedRegionZMQ()
{
LOG(debug) << "destroying region";
static_cast<FairMQTransportFactoryZMQ*>(GetTransport())->RemoveRegion(fId);
free(fBuffer);
}

View File

@@ -13,6 +13,7 @@
#include <cstddef> // size_t
#include <string>
class FairMQTransportFactory;
class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
{
@@ -20,19 +21,24 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
friend class FairMQMessageZMQ;
public:
FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory = nullptr);
FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete;
FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;
virtual void* GetData() const override;
virtual size_t GetSize() const override;
uint64_t GetId() const override { return fId; }
int64_t GetUserFlags() const { return fUserFlags; }
virtual ~FairMQUnmanagedRegionZMQ();
private:
uint64_t fId;
void* fBuffer;
size_t fSize;
int64_t fUserFlags;
FairMQRegionCallback fCallback;
};
#endif /* FAIRMQUNMANAGEDREGIONZMQ_H_ */
#endif /* FAIRMQUNMANAGEDREGIONZMQ_H_ */

View File

@@ -96,6 +96,19 @@ add_testsuite(Message
${definitions}
)
add_testsuite(Region
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
region/_region.cxx
LINKS FairMQ
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/region
${CMAKE_CURRENT_BINARY_DIR}
TIMEOUT 5
${definitions}
)
add_testsuite(Device
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx

View File

@@ -24,16 +24,17 @@ TEST(Format, Construction)
Cmds checkStateCmds(make<CheckState>());
Cmds changeStateCmds(make<ChangeState>(Transition::Stop));
Cmds dumpConfigCmds(make<DumpConfig>());
Cmds subscribeToStateChangeCmds(make<SubscribeToStateChange>());
Cmds subscribeToStateChangeCmds(make<SubscribeToStateChange>(60000));
Cmds unsubscribeFromStateChangeCmds(make<UnsubscribeFromStateChange>());
Cmds stateChangeExitingReceivedCmds(make<StateChangeExitingReceived>());
Cmds getPropertiesCmds(make<GetProperties>(66, "k[12]"));
Cmds setPropertiesCmds(make<SetProperties>(42, props));
Cmds subscriptionHeartbeatCmds(make<SubscriptionHeartbeat>(60000));
Cmds currentStateCmds(make<CurrentState>("somedeviceid", State::Running));
Cmds transitionStatusCmds(make<TransitionStatus>("somedeviceid", 123456, Result::Ok, Transition::Stop));
Cmds configCmds(make<Config>("somedeviceid", "someconfig"));
Cmds stateChangeSubscriptionCmds(make<StateChangeSubscription>("somedeviceid", Result::Ok));
Cmds stateChangeUnsubscriptionCmds(make<StateChangeUnsubscription>("somedeviceid", Result::Ok));
Cmds stateChangeSubscriptionCmds(make<StateChangeSubscription>("somedeviceid", 123456, Result::Ok));
Cmds stateChangeUnsubscriptionCmds(make<StateChangeUnsubscription>("somedeviceid", 123456, Result::Ok));
Cmds stateChangeCmds(make<StateChange>("somedeviceid", 123456, State::Running, State::Ready));
Cmds propertiesCmds(make<Properties>("somedeviceid", 66, Result::Ok, props));
Cmds propertiesSetCmds(make<PropertiesSet>("somedeviceid", 42, Result::Ok));
@@ -43,6 +44,7 @@ TEST(Format, Construction)
ASSERT_EQ(static_cast<ChangeState&>(changeStateCmds.At(0)).GetTransition(), Transition::Stop);
ASSERT_EQ(dumpConfigCmds.At(0).GetType(), Type::dump_config);
ASSERT_EQ(subscribeToStateChangeCmds.At(0).GetType(), Type::subscribe_to_state_change);
ASSERT_EQ(static_cast<SubscribeToStateChange&>(subscribeToStateChangeCmds.At(0)).GetInterval(), 60000);
ASSERT_EQ(unsubscribeFromStateChangeCmds.At(0).GetType(), Type::unsubscribe_from_state_change);
ASSERT_EQ(stateChangeExitingReceivedCmds.At(0).GetType(), Type::state_change_exiting_received);
ASSERT_EQ(getPropertiesCmds.At(0).GetType(), Type::get_properties);
@@ -51,6 +53,8 @@ TEST(Format, Construction)
ASSERT_EQ(setPropertiesCmds.At(0).GetType(), Type::set_properties);
ASSERT_EQ(static_cast<SetProperties&>(setPropertiesCmds.At(0)).GetRequestId(), 42);
ASSERT_EQ(static_cast<SetProperties&>(setPropertiesCmds.At(0)).GetProps(), props);
ASSERT_EQ(subscriptionHeartbeatCmds.At(0).GetType(), Type::subscription_heartbeat);
ASSERT_EQ(static_cast<SubscriptionHeartbeat&>(subscriptionHeartbeatCmds.At(0)).GetInterval(), 60000);
ASSERT_EQ(currentStateCmds.At(0).GetType(), Type::current_state);
ASSERT_EQ(static_cast<CurrentState&>(currentStateCmds.At(0)).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<CurrentState&>(currentStateCmds.At(0)).GetCurrentState(), State::Running);
@@ -64,9 +68,11 @@ TEST(Format, Construction)
ASSERT_EQ(static_cast<Config&>(configCmds.At(0)).GetConfig(), "someconfig");
ASSERT_EQ(stateChangeSubscriptionCmds.At(0).GetType(), Type::state_change_subscription);
ASSERT_EQ(static_cast<StateChangeSubscription&>(stateChangeSubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<StateChangeSubscription&>(stateChangeSubscriptionCmds.At(0)).GetTaskId(), 123456);
ASSERT_EQ(static_cast<StateChangeSubscription&>(stateChangeSubscriptionCmds.At(0)).GetResult(), Result::Ok);
ASSERT_EQ(stateChangeUnsubscriptionCmds.At(0).GetType(), Type::state_change_unsubscription);
ASSERT_EQ(static_cast<StateChangeUnsubscription&>(stateChangeUnsubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<StateChangeUnsubscription&>(stateChangeUnsubscriptionCmds.At(0)).GetTaskId(), 123456);
ASSERT_EQ(static_cast<StateChangeUnsubscription&>(stateChangeUnsubscriptionCmds.At(0)).GetResult(), Result::Ok);
ASSERT_EQ(stateChangeCmds.At(0).GetType(), Type::state_change);
ASSERT_EQ(static_cast<StateChange&>(stateChangeCmds.At(0)).GetDeviceId(), "somedeviceid");
@@ -91,16 +97,17 @@ void fillCommands(Cmds& cmds)
cmds.Add<CheckState>();
cmds.Add<ChangeState>(Transition::Stop);
cmds.Add<DumpConfig>();
cmds.Add<SubscribeToStateChange>();
cmds.Add<SubscribeToStateChange>(60000);
cmds.Add<UnsubscribeFromStateChange>();
cmds.Add<StateChangeExitingReceived>();
cmds.Add<GetProperties>(66, "k[12]");
cmds.Add<SetProperties>(42, props);
cmds.Add<SubscriptionHeartbeat>(60000);
cmds.Add<CurrentState>("somedeviceid", State::Running);
cmds.Add<TransitionStatus>("somedeviceid", 123456, Result::Ok, Transition::Stop);
cmds.Add<Config>("somedeviceid", "someconfig");
cmds.Add<StateChangeSubscription>("somedeviceid", Result::Ok);
cmds.Add<StateChangeUnsubscription>("somedeviceid", Result::Ok);
cmds.Add<StateChangeSubscription>("somedeviceid", 123456, Result::Ok);
cmds.Add<StateChangeUnsubscription>("somedeviceid", 123456, Result::Ok);
cmds.Add<StateChange>("somedeviceid", 123456, State::Running, State::Ready);
cmds.Add<Properties>("somedeviceid", 66, Result::Ok, props);
cmds.Add<PropertiesSet>("somedeviceid", 42, Result::Ok);
@@ -108,7 +115,7 @@ void fillCommands(Cmds& cmds)
void checkCommands(Cmds& cmds)
{
ASSERT_EQ(cmds.Size(), 16);
ASSERT_EQ(cmds.Size(), 17);
int count = 0;
auto const props(std::vector<std::pair<std::string, std::string>>({{"k1", "v1"}, {"k2", "v2"}}));
@@ -127,6 +134,7 @@ void checkCommands(Cmds& cmds)
break;
case Type::subscribe_to_state_change:
++count;
ASSERT_EQ(static_cast<SubscribeToStateChange&>(*cmd).GetInterval(), 60000);
break;
case Type::unsubscribe_from_state_change:
++count;
@@ -144,6 +152,10 @@ void checkCommands(Cmds& cmds)
ASSERT_EQ(static_cast<SetProperties&>(*cmd).GetRequestId(), 42);
ASSERT_EQ(static_cast<SetProperties&>(*cmd).GetProps(), props);
break;
case Type::subscription_heartbeat:
++count;
ASSERT_EQ(static_cast<SubscriptionHeartbeat&>(*cmd).GetInterval(), 60000);
break;
case Type::current_state:
++count;
ASSERT_EQ(static_cast<CurrentState&>(*cmd).GetDeviceId(), "somedeviceid");
@@ -164,11 +176,13 @@ void checkCommands(Cmds& cmds)
case Type::state_change_subscription:
++count;
ASSERT_EQ(static_cast<StateChangeSubscription&>(*cmd).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<StateChangeSubscription&>(*cmd).GetTaskId(), 123456);
ASSERT_EQ(static_cast<StateChangeSubscription&>(*cmd).GetResult(), Result::Ok);
break;
case Type::state_change_unsubscription:
++count;
ASSERT_EQ(static_cast<StateChangeUnsubscription&>(*cmd).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<StateChangeUnsubscription&>(*cmd).GetTaskId(), 123456);
ASSERT_EQ(static_cast<StateChangeUnsubscription&>(*cmd).GetResult(), Result::Ok);
break;
case Type::state_change:
@@ -197,7 +211,7 @@ void checkCommands(Cmds& cmds)
}
}
ASSERT_EQ(count, 16);
ASSERT_EQ(count, 17);
}
TEST(Format, SerializationBinary)

103
test/region/_region.cxx Normal file
View File

@@ -0,0 +1,103 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <FairMQLogger.h>
#include <FairMQTransportFactory.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/Tools.h>
#include <gtest/gtest.h>
#include <string>
namespace
{
using namespace std;
void RegionEventSubscriptions(const string& transport)
{
size_t session{fair::mq::tools::UuidHash()};
fair::mq::ProgOptions config;
config.SetProperty<string>("session", to_string(session));
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
constexpr int size1 = 1000000;
constexpr int size2 = 5000000;
constexpr int64_t userFlags = 12345;
fair::mq::tools::SharedSemaphore blocker;
{
auto region1 = factory->CreateUnmanagedRegion(size1, [](void*, size_t, void*) {});
void* ptr1 = region1->GetData();
uint64_t id1 = region1->GetId();
ASSERT_EQ(region1->GetSize(), size1);
auto region2 = factory->CreateUnmanagedRegion(size2, userFlags, [](void*, size_t, void*) {});
void* ptr2 = region2->GetData();
uint64_t id2 = region2->GetId();
ASSERT_EQ(region2->GetSize(), size2);
ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
factory->SubscribeToRegionEvents([&](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event;
LOG(warn) << "id: " << info.id;
LOG(warn) << "ptr: " << info.ptr;
LOG(warn) << "size: " << info.size;
LOG(warn) << "flags: " << info.flags;
if (info.event == FairMQRegionEvent::created) {
if (info.id == id1) {
ASSERT_EQ(info.size, size1);
ASSERT_EQ(info.ptr, ptr1);
blocker.Signal();
} else if (info.id == id2) {
ASSERT_EQ(info.size, size2);
ASSERT_EQ(info.ptr, ptr2);
ASSERT_EQ(info.flags, userFlags);
blocker.Signal();
}
} else if (info.event == FairMQRegionEvent::destroyed) {
if (info.id == id1) {
blocker.Signal();
} else if (info.id == id2) {
blocker.Signal();
}
}
});
ASSERT_EQ(factory->SubscribedToRegionEvents(), true);
LOG(info) << "waiting for blockers...";
blocker.Wait();
LOG(info) << "1 done.";
blocker.Wait();
LOG(info) << "2 done.";
}
blocker.Wait();
LOG(info) << "3 done.";
blocker.Wait();
LOG(info) << "4 done.";
LOG(info) << "All done.";
factory->UnsubscribeFromRegionEvents();
ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
}
TEST(EventSubscriptions, zeromq)
{
RegionEventSubscriptions("zeromq");
}
TEST(EventSubscriptions, shmem)
{
RegionEventSubscriptions("shmem");
}
} // namespace