Add region events subscriptions

This commit is contained in:
Alexey Rybalchenko 2020-04-26 22:41:22 +02:00
parent 5721ea9510
commit e1a113aabe
35 changed files with 445 additions and 188 deletions

View File

@ -35,6 +35,14 @@ void Sampler::InitTask()
fMsgSize = fConfig->GetProperty<int>("msg-size"); fMsgSize = fConfig->GetProperty<int>("msg-size");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event;
LOG(warn) << "id: " << info.id;
LOG(warn) << "ptr: " << info.ptr;
LOG(warn) << "size: " << info.size;
LOG(warn) << "flags: " << info.flags;
});
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
0, 0,
10000000, 10000000,
@ -82,6 +90,7 @@ void Sampler::ResetTask()
LOG(debug) << "done, still unacked: " << fNumUnackedMsgs; LOG(debug) << "done, still unacked: " << fNumUnackedMsgs;
} }
fRegion.reset(); fRegion.reset();
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
} }
Sampler::~Sampler() Sampler::~Sampler()

View File

@ -29,6 +29,13 @@ void Sink::InitTask()
{ {
// Get the fMaxIterations value from the command line options (via fConfig) // Get the fMaxIterations value from the command line options (via fConfig)
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
LOG(warn) << ">>>" << info.event;
LOG(warn) << "id: " << info.id;
LOG(warn) << "ptr: " << info.ptr;
LOG(warn) << "size: " << info.size;
LOG(warn) << "flags: " << info.flags;
});
} }
void Sink::Run() void Sink::Run()
@ -50,6 +57,11 @@ void Sink::Run()
} }
} }
void Sink::ResetTask()
{
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
}
Sink::~Sink() Sink::~Sink()
{ {
} }

View File

@ -31,6 +31,7 @@ class Sink : public FairMQDevice
protected: protected:
virtual void Run(); virtual void Run();
virtual void InitTask(); virtual void InitTask();
virtual void ResetTask();
private: private:
uint64_t fMaxIterations; uint64_t fMaxIterations;

View File

@ -19,6 +19,7 @@ SAMPLER+=" --id sampler1"
SAMPLER+=" --transport $transport" SAMPLER+=" --transport $transport"
SAMPLER+=" --severity debug" SAMPLER+=" --severity debug"
SAMPLER+=" --session $SESSION" SAMPLER+=" --session $SESSION"
SAMPLER+=" --verbosity veryhigh"
SAMPLER+=" --control static --color false" SAMPLER+=" --control static --color false"
SAMPLER+=" --max-iterations 1" SAMPLER+=" --max-iterations 1"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"

View File

@ -245,7 +245,6 @@ if(BUILD_FAIRMQ)
plugins/Control.cxx plugins/Control.cxx
shmem/Message.cxx shmem/Message.cxx
shmem/Poller.cxx shmem/Poller.cxx
shmem/UnmanagedRegion.cxx
shmem/Socket.cxx shmem/Socket.cxx
shmem/TransportFactory.cxx shmem/TransportFactory.cxx
shmem/Manager.cxx shmem/Manager.cxx

View File

@ -340,6 +340,11 @@ class FairMQChannel
return Transport()->CreateUnmanagedRegion(size, callback, path, flags); return Transport()->CreateUnmanagedRegion(size, callback, path, flags);
} }
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0)
{
return Transport()->CreateUnmanagedRegion(size, userFlags, callback, path, flags);
}
static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT; static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT;
static constexpr const char* DefaultTransportName = "default"; static constexpr const char* DefaultTransportName = "default";
static constexpr const char* DefaultName = ""; static constexpr const char* DefaultName = "";

View File

@ -217,17 +217,47 @@ class FairMQDevice
} }
// creates unamanaged region with the default device transport // creates unamanaged region with the default device transport
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size,
FairMQRegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0)
{ {
return Transport()->CreateUnmanagedRegion(size, callback); return Transport()->CreateUnmanagedRegion(size, callback, path, flags);
}
// creates unamanaged region with the default device transport
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size,
const int64_t userFlags,
FairMQRegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0)
{
return Transport()->CreateUnmanagedRegion(size, userFlags, callback, path, flags);
} }
// creates unmanaged region with the transport of the specified channel // creates unmanaged region with the transport of the specified channel
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel,
int index,
const size_t size,
FairMQRegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0)
{ {
return GetChannel(channel, index).NewUnmanagedRegion(size, callback, path, flags); return GetChannel(channel, index).NewUnmanagedRegion(size, callback, path, flags);
} }
// creates unmanaged region with the transport of the specified channel
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel,
int index,
const size_t size,
const int64_t userFlags,
FairMQRegionCallback callback = nullptr,
const std::string& path = "",
int flags = 0)
{
return GetChannel(channel, index).NewUnmanagedRegion(size, userFlags, callback, path, flags);
}
template<typename ...Ts> template<typename ...Ts>
FairMQPollerPtr NewPoller(const Ts&... inputs) FairMQPollerPtr NewPoller(const Ts&... inputs)
{ {

View File

@ -7,8 +7,8 @@
********************************************************************************/ ********************************************************************************/
#include <FairMQTransportFactory.h> #include <FairMQTransportFactory.h>
#include <zeromq/FairMQTransportFactoryZMQ.h>
#include <fairmq/shmem/TransportFactory.h> #include <fairmq/shmem/TransportFactory.h>
#include <zeromq/FairMQTransportFactoryZMQ.h>
#ifdef BUILD_NANOMSG_TRANSPORT #ifdef BUILD_NANOMSG_TRANSPORT
#include <nanomsg/FairMQTransportFactoryNN.h> #include <nanomsg/FairMQTransportFactoryNN.h>
#endif /* BUILD_NANOMSG_TRANSPORT */ #endif /* BUILD_NANOMSG_TRANSPORT */
@ -17,50 +17,46 @@
#endif #endif
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <fairmq/tools/Unique.h> #include <fairmq/tools/Unique.h>
#include <memory> #include <memory>
#include <string> #include <string>
FairMQTransportFactory::FairMQTransportFactory(const std::string& id) using namespace std;
FairMQTransportFactory::FairMQTransportFactory(const string& id)
: fkId(id) : fkId(id)
{ {}
}
auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, const std::string& id, const fair::mq::ProgOptions* config) -> std::shared_ptr<FairMQTransportFactory> auto FairMQTransportFactory::CreateTransportFactory(const string& type,
const string& id,
const fair::mq::ProgOptions* config)
-> shared_ptr<FairMQTransportFactory>
{ {
using namespace std;
auto finalId = id; auto finalId = id;
// Generate uuid if empty // Generate uuid if empty
if (finalId == "") if (finalId == "") {
{
finalId = fair::mq::tools::Uuid(); finalId = fair::mq::tools::Uuid();
} }
if (type == "zeromq") if (type == "zeromq") {
{
return make_shared<FairMQTransportFactoryZMQ>(finalId, config); return make_shared<FairMQTransportFactoryZMQ>(finalId, config);
} } else if (type == "shmem") {
else if (type == "shmem")
{
return make_shared<fair::mq::shmem::TransportFactory>(finalId, config); return make_shared<fair::mq::shmem::TransportFactory>(finalId, config);
} }
#ifdef BUILD_NANOMSG_TRANSPORT #ifdef BUILD_NANOMSG_TRANSPORT
else if (type == "nanomsg") else if (type == "nanomsg") {
{
return make_shared<FairMQTransportFactoryNN>(finalId, config); return make_shared<FairMQTransportFactoryNN>(finalId, config);
} }
#endif /* BUILD_NANOMSG_TRANSPORT */ #endif /* BUILD_NANOMSG_TRANSPORT */
#ifdef BUILD_OFI_TRANSPORT #ifdef BUILD_OFI_TRANSPORT
else if (type == "ofi") else if (type == "ofi") {
{
return make_shared<fair::mq::ofi::TransportFactory>(finalId, config); return make_shared<fair::mq::ofi::TransportFactory>(finalId, config);
} }
#endif /* BUILD_OFI_TRANSPORT */ #endif /* BUILD_OFI_TRANSPORT */
else else {
{ LOG(error) << "Unavailable transport requested: "
LOG(error) << "Unavailable transport requested: " << "\"" << type << "\"" << ". Available are: " << "\"" << type << "\""
<< ". Available are: "
<< "\"zeromq\"" << "\"zeromq\""
<< "\"shmem\"" << "\"shmem\""
#ifdef BUILD_NANOMSG_TRANSPORT #ifdef BUILD_NANOMSG_TRANSPORT

View File

@ -75,6 +75,12 @@ class FairMQTransportFactory
virtual FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const = 0; virtual FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0; virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0;
virtual void SubscribeToRegionEvents(FairMQRegionEventCallback callback) = 0;
virtual void UnsubscribeFromRegionEvents() = 0;
virtual std::vector<FairMQRegionInfo> GetRegionInfo() = 0;
/// Get transport type /// Get transport type
virtual fair::mq::Transport GetType() const = 0; virtual fair::mq::Transport GetType() const = 0;

View File

@ -12,8 +12,24 @@
#include <cstddef> // size_t #include <cstddef> // size_t
#include <memory> // std::unique_ptr #include <memory> // std::unique_ptr
#include <functional> // std::function #include <functional> // std::function
#include <ostream> // std::ostream
enum class FairMQRegionEvent : int
{
created,
destroyed
};
struct FairMQRegionInfo {
uint64_t id; // id of the region
void* ptr; // pointer to the start of the region
size_t size; // region size
int64_t flags; // custom flags set by the creator
FairMQRegionEvent event;
};
using FairMQRegionCallback = std::function<void(void*, size_t, void*)>; using FairMQRegionCallback = std::function<void(void*, size_t, void*)>;
using FairMQRegionEventCallback = std::function<void(FairMQRegionInfo)>;
class FairMQUnmanagedRegion class FairMQUnmanagedRegion
{ {
@ -26,12 +42,24 @@ class FairMQUnmanagedRegion
using FairMQUnmanagedRegionPtr = std::unique_ptr<FairMQUnmanagedRegion>; using FairMQUnmanagedRegionPtr = std::unique_ptr<FairMQUnmanagedRegion>;
inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event)
{
if (event == FairMQRegionEvent::created) {
return os << "created";
} else {
return os << "destroyed";
}
}
namespace fair namespace fair
{ {
namespace mq namespace mq
{ {
using RegionCallback = FairMQRegionCallback; using RegionCallback = FairMQRegionCallback;
using RegionEventCallback = FairMQRegionEventCallback;
using RegionEvent = FairMQRegionEvent;
using RegionInfo = FairMQRegionInfo;
using UnmanagedRegion = FairMQUnmanagedRegion; using UnmanagedRegion = FairMQUnmanagedRegion;
using UnmanagedRegionPtr = FairMQUnmanagedRegionPtr; using UnmanagedRegionPtr = FairMQUnmanagedRegionPtr;

View File

@ -70,6 +70,11 @@ FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const s
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, callback, path, flags)); return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, callback, path, flags));
} }
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, userFlags, callback, path, flags));
}
fair::mq::Transport FairMQTransportFactoryNN::GetType() const fair::mq::Transport FairMQTransportFactoryNN::GetType() const
{ {
return fTransportType; return fTransportType;

View File

@ -37,6 +37,11 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for nanomsg"; }
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for nanomsg"; }
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for nanomsg, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
fair::mq::Transport GetType() const override; fair::mq::Transport GetType() const override;

View File

@ -18,6 +18,13 @@ FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegion
{ {
} }
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, const int64_t /*userFlags*/, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */)
: fBuffer(malloc(size))
, fSize(size)
, fCallback(callback)
{
}
void* FairMQUnmanagedRegionNN::GetData() const void* FairMQUnmanagedRegionNN::GetData() const
{ {
return fBuffer; return fBuffer;

View File

@ -20,6 +20,8 @@ class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion
public: public:
FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionNN(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete; FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete;
FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete; FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete;
@ -34,4 +36,4 @@ class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion
FairMQRegionCallback fCallback; FairMQRegionCallback fCallback;
}; };
#endif /* FAIRMQUNMANAGEDREGIONNN_H_ */ #endif /* FAIRMQUNMANAGEDREGIONNN_H_ */

View File

@ -90,6 +90,11 @@ auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegion
throw runtime_error{"Not yet implemented UMR."}; throw runtime_error{"Not yet implemented UMR."};
} }
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) const -> UnmanagedRegionPtr
{
throw runtime_error{"Not yet implemented UMR."};
}
auto TransportFactory::GetType() const -> Transport auto TransportFactory::GetType() const -> Transport
{ {
return Transport::OFI; return Transport::OFI;

View File

@ -46,7 +46,12 @@ class TransportFactory final : public FairMQTransportFactory
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override; auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override;
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override; auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override; auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override;
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override;
void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; }
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for OFI"; }
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
auto GetType() const -> Transport override; auto GetType() const -> Transport override;

View File

@ -41,15 +41,21 @@ struct RegionInfo
RegionInfo(const VoidAlloc& alloc) RegionInfo(const VoidAlloc& alloc)
: fPath("", alloc) : fPath("", alloc)
, fFlags(0) , fFlags(0)
, fUserFlags(0)
, fDestroyed(false)
{} {}
RegionInfo(const char* path, int flags, const VoidAlloc& alloc) RegionInfo(const char* path, const int flags, const uint64_t userFlags, const VoidAlloc& alloc)
: fPath(path, alloc) : fPath(path, alloc)
, fFlags(flags) , fFlags(flags)
, fUserFlags(userFlags)
, fDestroyed(false)
{} {}
Str fPath; Str fPath;
int fFlags; int fFlags;
uint64_t fUserFlags;
bool fDestroyed;
}; };
using Uint64RegionInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint64_t, RegionInfo>, SegmentManager>; using Uint64RegionInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint64_t, RegionInfo>, SegmentManager>;

View File

@ -7,7 +7,6 @@
********************************************************************************/ ********************************************************************************/
#include "Manager.h" #include "Manager.h"
#include "Common.h"
#include <fairmq/tools/CppSTL.h> #include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
@ -19,6 +18,7 @@ using namespace std;
using bie = ::boost::interprocess::interprocess_exception; using bie = ::boost::interprocess::interprocess_exception;
namespace bipc = ::boost::interprocess; namespace bipc = ::boost::interprocess;
namespace bfs = ::boost::filesystem; namespace bfs = ::boost::filesystem;
namespace bpt = ::boost::posix_time;
namespace fair namespace fair
{ {
@ -27,19 +27,26 @@ namespace mq
namespace shmem namespace shmem
{ {
std::unordered_map<uint64_t, std::unique_ptr<Region>> Manager::fRegions; Manager::Manager(const string& id, size_t size)
Manager::Manager(const std::string& id, size_t size)
: fShmId(id) : fShmId(id)
, fSegmentName("fmq_" + fShmId + "_main") , fSegmentName("fmq_" + fShmId + "_main")
, fManagementSegmentName("fmq_" + fShmId + "_mng") , fManagementSegmentName("fmq_" + fShmId + "_mng")
, fSegment(bipc::open_or_create, fSegmentName.c_str(), size) , fSegment(bipc::open_or_create, fSegmentName.c_str(), size)
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536) , fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 655360)
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str()) , fShmMtx(bipc::open_or_create, string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsCV(bipc::open_or_create, string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsSubscriptionActive(false)
, fDeviceCounter(nullptr) , fDeviceCounter(nullptr)
, fRegionInfos(nullptr)
, fInterrupted(false)
{ {
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << size << " bytes. Available are " << fSegment.get_free_memory() << " bytes."; LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << size << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(bipc::unique_instance)(fShmVoidAlloc);
// store info about the managed segment as region with id 0
fRegionInfos->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc));
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx); bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fDeviceCounter = fManagementSegment.find<DeviceCounter>(bipc::unique_instance).first; fDeviceCounter = fManagementSegment.find<DeviceCounter>(bipc::unique_instance).first;
@ -55,7 +62,7 @@ Manager::Manager(const std::string& id, size_t size)
} }
} }
void Manager::StartMonitor(const std::string& id) void Manager::StartMonitor(const string& id)
{ {
try { try {
bipc::named_mutex monitorStatus(bipc::open_only, string("fmq_" + id + "_ms").c_str()); bipc::named_mutex monitorStatus(bipc::open_only, string("fmq_" + id + "_ms").c_str());
@ -94,47 +101,74 @@ void Manager::StartMonitor(const std::string& id)
} }
} }
void Manager::Interrupt() pair<bipc::mapped_region*, uint64_t> Manager::CreateRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
{ {
} try {
void Manager::Resume() pair<bipc::mapped_region*, uint64_t> result;
{
// close remote regions before processing new transfers
for (auto it = fRegions.begin(); it != fRegions.end(); /**/) {
if (it->second->fRemote) {
it = fRegions.erase(it);
} else {
++it;
}
}
}
bipc::mapped_region* Manager::CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
{
auto it = fRegions.find(id);
if (it != fRegions.end()) {
LOG(error) << "Trying to create a region that already exists";
return nullptr;
} else {
// create region info
{ {
uint64_t id = 0;
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx); bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
VoidAlloc voidAlloc(fManagementSegment.get_segment_manager());
Uint64RegionInfoMap* infoMap = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(bipc::unique_instance)(voidAlloc); RegionCounter* rc = fManagementSegment.find<RegionCounter>(bipc::unique_instance).first;
infoMap->emplace(id, RegionInfo(path.c_str(), flags, voidAlloc));
if (rc) {
LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing.";
(rc->fCount)++;
LOG(debug) << "incremented region counter, now: " << rc->fCount;
} else {
LOG(debug) << "no region counter found, creating one and initializing with 1";
rc = fManagementSegment.construct<RegionCounter>(bipc::unique_instance)(1);
LOG(debug) << "initialized region counter with: " << rc->fCount;
}
id = rc->fCount;
auto it = fRegions.find(id);
if (it != fRegions.end()) {
LOG(error) << "Trying to create a region that already exists";
return {nullptr, id};
}
// create region info
fRegionInfos->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, size, false, callback, path, flags));
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
r.first->second->StartReceivingAcks();
result.first = &(r.first->second->fRegion);
result.second = id;
} }
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; fRegionEventsCV.notify_all();
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, size, false, callback, path, flags)); return result;
r.first->second->StartReceivingAcks(); } catch (bipc::interprocess_exception& e) {
LOG(error) << "cannot create region. Already created/not cleaned up?";
return &(r.first->second->fRegion); LOG(error) << e.what();
throw;
} }
} }
Region* Manager::GetRemoteRegion(const uint64_t id) void Manager::RemoveRegion(const uint64_t id)
{
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegions.erase(id);
fRegionInfos->at(id).fDestroyed = true;
}
fRegionEventsCV.notify_all();
}
Region* Manager::GetRegion(const uint64_t id)
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
return GetRegionUnsafe(id);
}
Region* Manager::GetRegionUnsafe(const uint64_t id)
{ {
// remote region could actually be a local one if a message originates from this device (has been sent out and returned) // remote region could actually be a local one if a message originates from this device (has been sent out and returned)
auto it = fRegions.find(id); auto it = fRegions.find(id);
@ -142,36 +176,108 @@ Region* Manager::GetRemoteRegion(const uint64_t id)
return it->second.get(); return it->second.get();
} else { } else {
try { try {
string path;
int flags;
// get region info // get region info
{ RegionInfo regionInfo = fRegionInfos->at(id);
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx); string path = regionInfo.fPath.c_str();
Uint64RegionInfoMap* infoMap = fManagementSegment.find<Uint64RegionInfoMap>(bipc::unique_instance).first; int flags = regionInfo.fFlags;
if (infoMap == nullptr) {
LOG(error) << "Unable to locate the region info";
throw SharedMemoryError("Unable to locate remote region info");
}
RegionInfo regionInfo = infoMap->at(id);
path = regionInfo.fPath.c_str();
flags = regionInfo.fFlags;
}
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; // LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, 0, true, nullptr, path, flags)); auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, 0, true, nullptr, path, flags));
r.first->second->StartSendingAcks();
return r.first->second.get(); return r.first->second.get();
} catch (bie& e) { } catch (bie& e) {
LOG(warn) << "Could not get remote region for id: " << id; LOG(warn) << "Could not get remote region for id: " << id;
return nullptr; return nullptr;
} }
} }
} }
void Manager::RemoveRegion(const uint64_t id) vector<fair::mq::RegionInfo> Manager::GetRegionInfo()
{ {
fRegions.erase(id); bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
return GetRegionInfoUnsafe();
}
vector<fair::mq::RegionInfo> Manager::GetRegionInfoUnsafe()
{
vector<fair::mq::RegionInfo> result;
for (const auto& e : *fRegionInfos) {
fair::mq::RegionInfo info;
info.id = e.first;
info.flags = e.second.fUserFlags;
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
if (info.id != 0) {
if (!e.second.fDestroyed) {
auto region = GetRegionUnsafe(info.id);
info.ptr = region->fRegion.get_address();
info.size = region->fRegion.get_size();
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
} else {
if (!e.second.fDestroyed) {
info.ptr = fSegment.get_address();
info.size = fSegment.get_size();
} else {
info.ptr = nullptr;
info.size = 0;
}
result.push_back(info);
}
}
return result;
}
void Manager::SubscribeToRegionEvents(RegionEventCallback callback)
{
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
if (fRegionEventThread.joinable()) {
fRegionEventsSubscriptionActive.store(false);
fRegionEventThread.join();
}
fRegionEventCallback = callback;
fRegionEventsSubscriptionActive.store(true);
fRegionEventThread = thread(&Manager::RegionEventsSubscription, this);
}
void Manager::UnsubscribeFromRegionEvents()
{
if (fRegionEventThread.joinable()) {
fRegionEventsSubscriptionActive.store(false);
fRegionEventsCV.notify_all();
fRegionEventThread.join();
}
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
fRegionEventCallback = nullptr;
}
void Manager::RegionEventsSubscription()
{
while (fRegionEventsSubscriptionActive.load()) {
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
auto infos = GetRegionInfoUnsafe();
for (const auto& i : infos) {
auto el = fObservedRegionEvents.find(i.id);
if (el == fObservedRegionEvents.end()) {
fRegionEventCallback(i);
fObservedRegionEvents.emplace(i.id, i.event);
} else {
if (el->second == RegionEvent::created && i.event == RegionEvent::destroyed) {
fRegionEventCallback(i);
el->second = i.event;
} else {
// LOG(debug) << "ignoring event for id" << i.id << ":";
// LOG(debug) << "incoming event: " << i.event;
// LOG(debug) << "stored event: " << el->second;
}
}
}
fRegionEventsCV.wait(lock);
}
} }
void Manager::RemoveSegments() void Manager::RemoveSegments()
@ -193,6 +299,12 @@ Manager::~Manager()
{ {
bool lastRemoved = false; bool lastRemoved = false;
if (fRegionEventThread.joinable()) {
fRegionEventsSubscriptionActive.store(false);
fRegionEventsCV.notify_all();
fRegionEventThread.join();
}
try { try {
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx); bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
@ -212,6 +324,7 @@ Manager::~Manager()
if (lastRemoved) { if (lastRemoved) {
bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str()); bipc::named_mutex::remove(string("fmq_" + fShmId + "_mtx").c_str());
bipc::named_condition::remove(string("fmq_" + fShmId + "_cv").c_str());
} }
} }

View File

@ -15,18 +15,24 @@
#ifndef FAIR_MQ_SHMEM_MANAGER_H_ #ifndef FAIR_MQ_SHMEM_MANAGER_H_
#define FAIR_MQ_SHMEM_MANAGER_H_ #define FAIR_MQ_SHMEM_MANAGER_H_
#include "Region.h"
#include "Common.h" #include "Common.h"
#include "Region.h"
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <FairMQUnmanagedRegion.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp> #include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/sync/named_mutex.hpp> #include <boost/interprocess/sync/named_mutex.hpp>
#include <string> #include <set>
#include <unordered_map>
#include <stdexcept> #include <stdexcept>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
namespace fair namespace fair
{ {
@ -56,17 +62,25 @@ class Manager
static void StartMonitor(const std::string&); static void StartMonitor(const std::string&);
static void Interrupt(); void Interrupt() { fInterrupted.store(true); }
static void Resume(); void Resume() { fInterrupted.store(false); }
bool Interrupted() { return fInterrupted.load(); }
int GetDeviceCounter(); int GetDeviceCounter();
int IncrementDeviceCounter(); int IncrementDeviceCounter();
int DecrementDeviceCounter(); int DecrementDeviceCounter();
boost::interprocess::mapped_region* CreateRegion(const size_t size, const uint64_t id, RegionCallback callback, const std::string& path = "", int flags = 0); std::pair<boost::interprocess::mapped_region*, uint64_t> CreateRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0);
Region* GetRemoteRegion(const uint64_t id); Region* GetRegion(const uint64_t id);
Region* GetRegionUnsafe(const uint64_t id);
void RemoveRegion(const uint64_t id); void RemoveRegion(const uint64_t id);
std::vector<fair::mq::RegionInfo> GetRegionInfo();
std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe();
void SubscribeToRegionEvents(RegionEventCallback callback);
void UnsubscribeFromRegionEvents();
void RegionEventsSubscription();
void RemoveSegments(); void RemoveSegments();
private: private:
@ -75,9 +89,20 @@ class Manager
std::string fManagementSegmentName; std::string fManagementSegmentName;
boost::interprocess::managed_shared_memory fSegment; boost::interprocess::managed_shared_memory fSegment;
boost::interprocess::managed_shared_memory fManagementSegment; boost::interprocess::managed_shared_memory fManagementSegment;
VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx; boost::interprocess::named_mutex fShmMtx;
boost::interprocess::named_condition fRegionEventsCV;
std::thread fRegionEventThread;
std::atomic<bool> fRegionEventsSubscriptionActive;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents;
DeviceCounter* fDeviceCounter; DeviceCounter* fDeviceCounter;
static std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions; Uint64RegionInfoMap* fRegionInfos;
std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
std::atomic<bool> fInterrupted;
}; };
} // namespace shmem } // namespace shmem

View File

@ -29,7 +29,6 @@ namespace mq
namespace shmem namespace shmem
{ {
atomic<bool> Message::fInterrupted(false);
Transport Message::fTransportType = Transport::SHM; Transport Message::fTransportType = Transport::SHM;
Message::Message(Manager& manager, FairMQTransportFactory* factory) Message::Message(Manager& manager, FairMQTransportFactory* factory)
@ -113,7 +112,7 @@ bool Message::InitializeChunk(const size_t size)
} catch (bipc::bad_alloc& ba) { } catch (bipc::bad_alloc& ba) {
// LOG(warn) << "Shared memory full..."; // LOG(warn) << "Shared memory full...";
this_thread::sleep_for(chrono::milliseconds(50)); this_thread::sleep_for(chrono::milliseconds(50));
if (fInterrupted) { if (fManager.Interrupted()) {
return false; return false;
} else { } else {
continue; continue;
@ -164,7 +163,7 @@ void* Message::GetData() const
fLocalPtr = nullptr; fLocalPtr = nullptr;
} }
} else { } else {
fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId); fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
if (fRegionPtr) { if (fRegionPtr) {
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->fRegion.get_address()) + fMeta.fHandle; fLocalPtr = reinterpret_cast<char*>(fRegionPtr->fRegion.get_address()) + fMeta.fHandle;
} else { } else {
@ -221,7 +220,7 @@ void Message::CloseMessage()
fMeta.fHandle = -1; fMeta.fHandle = -1;
} else { } else {
if (!fRegionPtr) { if (!fRegionPtr) {
fRegionPtr = fManager.GetRemoteRegion(fMeta.fRegionId); fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
} }
if (fRegionPtr) { if (fRegionPtr) {

View File

@ -65,7 +65,6 @@ class Message final : public fair::mq::Message
mutable Region* fRegionPtr; mutable Region* fRegionPtr;
mutable char* fLocalPtr; mutable char* fLocalPtr;
static std::atomic<bool> fInterrupted;
static Transport fTransportType; static Transport fTransportType;
bool InitializeChunk(const size_t size); bool InitializeChunk(const size_t size);

View File

@ -15,6 +15,7 @@
#include <boost/interprocess/file_mapping.hpp> #include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/sync/named_mutex.hpp> #include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
#include <boost/interprocess/ipc/message_queue.hpp> #include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time.hpp>
@ -427,6 +428,15 @@ void Monitor::RemoveMutex(const string& name)
} }
} }
void Monitor::RemoveCondition(const string& name)
{
if (bipc::named_condition::remove(name.c_str())) {
cout << "Successfully removed \"" << name << "\"." << endl;
} else {
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
}
}
void Monitor::Cleanup(const string& shmId) void Monitor::Cleanup(const string& shmId)
{ {
string managementSegmentName("fmq_" + shmId + "_mng"); string managementSegmentName("fmq_" + shmId + "_mng");
@ -469,6 +479,7 @@ void Monitor::Cleanup(const string& shmId)
RemoveObject("fmq_" + shmId + "_main"); RemoveObject("fmq_" + shmId + "_main");
RemoveMutex("fmq_" + shmId + "_mtx"); RemoveMutex("fmq_" + shmId + "_mtx");
RemoveCondition("fmq_" + shmId + "_cv");
cout << endl; cout << endl;
} }

View File

@ -42,6 +42,7 @@ class Monitor
static void RemoveFileMapping(const std::string&); static void RemoveFileMapping(const std::string&);
static void RemoveQueue(const std::string&); static void RemoveQueue(const std::string&);
static void RemoveMutex(const std::string&); static void RemoveMutex(const std::string&);
static void RemoveCondition(const std::string&);
struct DaemonPresent : std::runtime_error { using std::runtime_error::runtime_error; }; struct DaemonPresent : std::runtime_error { using std::runtime_error::runtime_error; };

View File

@ -71,7 +71,6 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, Region
InitializeQueues(); InitializeQueues();
LOG(debug) << "shmem: initialized region: " << fName; LOG(debug) << "shmem: initialized region: " << fName;
fSendAcksWorker = thread(&Region::SendAcks, this);
} }
void Region::InitializeQueues() void Region::InitializeQueues()
@ -84,6 +83,11 @@ void Region::InitializeQueues()
LOG(debug) << "shmem: initialized region queue: " << fQueueName; LOG(debug) << "shmem: initialized region queue: " << fQueueName;
} }
void Region::StartSendingAcks()
{
fSendAcksWorker = thread(&Region::SendAcks, this);
}
void Region::StartReceivingAcks() void Region::StartReceivingAcks()
{ {
fReceiveAcksWorker = thread(&Region::ReceiveAcks, this); fReceiveAcksWorker = thread(&Region::ReceiveAcks, this);
@ -114,12 +118,12 @@ void Region::ReceiveAcks()
void Region::ReleaseBlock(const RegionBlock &block) void Region::ReleaseBlock(const RegionBlock &block)
{ {
unique_lock<mutex> lock(fBlockLock); unique_lock<mutex> lock(fBlockMtx);
fBlocksToFree.emplace_back(block); fBlocksToFree.emplace_back(block);
if (fBlocksToFree.size() >= fAckBunchSize) { if (fBlocksToFree.size() >= fAckBunchSize) {
lock.unlock(); // reduces contention on fBlockLock lock.unlock(); // reduces contention on fBlockMtx
fBlockSendCV.notify_one(); fBlockSendCV.notify_one();
} }
} }
@ -132,7 +136,7 @@ void Region::SendAcks()
size_t blocksToSend = 0; size_t blocksToSend = 0;
{ // mutex locking block { // mutex locking block
unique_lock<mutex> lock(fBlockLock); unique_lock<mutex> lock(fBlockMtx);
// try to get more blocks without waiting (we can miss a notify from CloseMessage()) // try to get more blocks without waiting (we can miss a notify from CloseMessage())
if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) { if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) {
@ -166,6 +170,7 @@ Region::~Region()
fStop = true; fStop = true;
if (fSendAcksWorker.joinable()) { if (fSendAcksWorker.joinable()) {
fBlockSendCV.notify_one();
fSendAcksWorker.join(); fSendAcksWorker.join();
} }

View File

@ -49,11 +49,11 @@ struct Region
void InitializeQueues(); void InitializeQueues();
void StartSendingAcks();
void SendAcks();
void StartReceivingAcks(); void StartReceivingAcks();
void ReceiveAcks(); void ReceiveAcks();
void ReleaseBlock(const RegionBlock &); void ReleaseBlock(const RegionBlock &);
void SendAcks();
~Region(); ~Region();
@ -67,7 +67,7 @@ struct Region
boost::interprocess::file_mapping fFileMapping; boost::interprocess::file_mapping fFileMapping;
boost::interprocess::mapped_region fRegion; boost::interprocess::mapped_region fRegion;
std::mutex fBlockLock; std::mutex fBlockMtx;
std::condition_variable fBlockSendCV; std::condition_variable fBlockSendCV;
std::vector<RegionBlock> fBlocksToFree; std::vector<RegionBlock> fBlocksToFree;
const std::size_t fAckBunchSize = 256; const std::size_t fAckBunchSize = 256;

View File

@ -28,8 +28,6 @@ namespace mq
namespace shmem namespace shmem
{ {
atomic<bool> Socket::fInterrupted(false);
struct ZMsg struct ZMsg
{ {
ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); } ZMsg() { int rc __attribute__((unused)) = zmq_msg_init(&fMsg); assert(rc == 0); }
@ -133,7 +131,7 @@ int Socket::Send(MessagePtr& msg, const int timeout)
ZMsg zmqMsg(sizeof(MetaHeader)); ZMsg zmqMsg(sizeof(MetaHeader));
std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader)); std::memcpy(zmqMsg.Data(), &(shmMsg->fMeta), sizeof(MetaHeader));
while (true && !fInterrupted) { while (true && !fManager.Interrupted()) {
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) { if (nbytes > 0) {
shmMsg->fQueued = true; shmMsg->fQueued = true;
@ -142,7 +140,7 @@ int Socket::Send(MessagePtr& msg, const int timeout)
fBytesTx += size; fBytesTx += size;
return size; return size;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) { if (timeout > 0) {
elapsed += fSndTimeout; elapsed += fSndTimeout;
if (elapsed >= timeout) { if (elapsed >= timeout) {
@ -198,7 +196,7 @@ int Socket::Receive(MessagePtr& msg, const int timeout)
++fMessagesRx; ++fMessagesRx;
return size; return size;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) { if (timeout > 0) {
elapsed += fRcvTimeout; elapsed += fRcvTimeout;
if (elapsed >= timeout) { if (elapsed >= timeout) {
@ -242,7 +240,7 @@ int64_t Socket::Send(vector<MessagePtr>& msgVec, const int timeout)
std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader)); std::memcpy(metas++, &(shmMsg->fMeta), sizeof(MetaHeader));
} }
while (!fInterrupted) { while (!fManager.Interrupted()) {
int64_t totalSize = 0; int64_t totalSize = 0;
int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) { if (nbytes > 0) {
@ -260,7 +258,7 @@ int64_t Socket::Send(vector<MessagePtr>& msgVec, const int timeout)
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) { if (timeout > 0) {
elapsed += fSndTimeout; elapsed += fSndTimeout;
if (elapsed >= timeout) { if (elapsed >= timeout) {
@ -296,7 +294,7 @@ int64_t Socket::Receive(vector<MessagePtr>& msgVec, const int timeout)
ZMsg zmqMsg; ZMsg zmqMsg;
while (!fInterrupted) { while (!fManager.Interrupted()) {
int64_t totalSize = 0; int64_t totalSize = 0;
int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags); int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags);
if (nbytes > 0) { if (nbytes > 0) {
@ -327,7 +325,7 @@ int64_t Socket::Receive(vector<MessagePtr>& msgVec, const int timeout)
return totalSize; return totalSize;
} else if (zmq_errno() == EAGAIN) { } else if (zmq_errno() == EAGAIN) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
if (timeout > 0) { if (timeout > 0) {
elapsed += fRcvTimeout; elapsed += fRcvTimeout;
if (elapsed >= timeout) { if (elapsed >= timeout) {
@ -365,20 +363,6 @@ void Socket::Close()
fSocket = nullptr; fSocket = nullptr;
} }
void Socket::Interrupt()
{
Manager::Interrupt();
Message::fInterrupted = true;
fInterrupted = true;
}
void Socket::Resume()
{
Manager::Resume();
Message::fInterrupted = false;
fInterrupted = false;
}
void Socket::SetOption(const string& option, const void* value, size_t valueSize) void Socket::SetOption(const string& option, const void* value, size_t valueSize)
{ {
if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {

View File

@ -46,9 +46,6 @@ class Socket final : public fair::mq::Socket
void Close() override; void Close() override;
static void Interrupt();
static void Resume();
void SetOption(const std::string& option, const void* value, size_t valueSize) override; void SetOption(const std::string& option, const void* value, size_t valueSize) override;
void GetOption(const std::string& option, void* value, size_t* valueSize) override; void GetOption(const std::string& option, void* value, size_t* valueSize) override;
@ -81,8 +78,6 @@ class Socket final : public fair::mq::Socket
std::atomic<unsigned long> fMessagesTx; std::atomic<unsigned long> fMessagesTx;
std::atomic<unsigned long> fMessagesRx; std::atomic<unsigned long> fMessagesRx;
static std::atomic<bool> fInterrupted;
int fSndTimeout; int fSndTimeout;
int fRcvTimeout; int fRcvTimeout;
}; };

View File

@ -164,6 +164,26 @@ UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, Re
return tools::make_unique<UnmanagedRegion>(*fManager, size, callback, path, flags); return tools::make_unique<UnmanagedRegion>(*fManager, size, callback, path, flags);
} }
UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
{
return tools::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, path, flags);
}
void TransportFactory::SubscribeToRegionEvents(RegionEventCallback callback)
{
fManager->SubscribeToRegionEvents(callback);
}
void TransportFactory::UnsubscribeFromRegionEvents()
{
fManager->UnsubscribeFromRegionEvents();
}
vector<fair::mq::RegionInfo> TransportFactory::GetRegionInfo()
{
return fManager->GetRegionInfo();
}
Transport TransportFactory::GetType() const Transport TransportFactory::GetType() const
{ {
return fTransportType; return fTransportType;

View File

@ -50,11 +50,16 @@ class TransportFactory final : public fair::mq::TransportFactory
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override; UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
void SubscribeToRegionEvents(RegionEventCallback callback) override;
void UnsubscribeFromRegionEvents() override;
std::vector<fair::mq::RegionInfo> GetRegionInfo() override;
Transport GetType() const override; Transport GetType() const override;
void Interrupt() override { Socket::Interrupt(); } void Interrupt() override { fManager->Interrupt(); }
void Resume() override { Socket::Resume(); } void Resume() override { fManager->Resume(); }
void Reset() override; void Reset() override;
void IncrementMsgCounter() { ++fMsgCounter; } void IncrementMsgCounter() { ++fMsgCounter; }

View File

@ -1,52 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include "Common.h"
#include "UnmanagedRegion.h"
using namespace std;
namespace bipc = ::boost::interprocess;
namespace fair
{
namespace mq
{
namespace shmem
{
UnmanagedRegion::UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
: fManager(manager)
, fRegion(nullptr)
, fRegionId(0)
{
try {
RegionCounter* rc = fManager.ManagementSegment().find<RegionCounter>(bipc::unique_instance).first;
if (rc) {
LOG(debug) << "region counter found, with value of " << rc->fCount << ". incrementing.";
(rc->fCount)++;
LOG(debug) << "incremented region counter, now: " << rc->fCount;
} else {
LOG(debug) << "no region counter found, creating one and initializing with 1";
rc = fManager.ManagementSegment().construct<RegionCounter>(bipc::unique_instance)(1);
LOG(debug) << "initialized region counter with: " << rc->fCount;
}
fRegionId = rc->fCount;
fRegion = fManager.CreateRegion(size, fRegionId, callback, path, flags);
} catch (bipc::interprocess_exception& e) {
LOG(error) << "cannot create region. Already created/not cleaned up?";
LOG(error) << e.what();
throw;
}
}
}
}
}

View File

@ -36,7 +36,19 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
friend class Socket; friend class Socket;
public: public:
UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0); UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0)
: UnmanagedRegion(manager, size, 0, callback, path, flags)
{}
UnmanagedRegion(Manager& manager, const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0)
: fManager(manager)
, fRegion(nullptr)
, fRegionId(0)
{
auto result = fManager.CreateRegion(size, userFlags, callback, path, flags);
fRegion = result.first;
fRegionId = result.second;
}
void* GetData() const override { return fRegion->get_address(); } void* GetData() const override { return fRegion->get_address(); }
size_t GetSize() const override { return fRegion->get_size(); } size_t GetSize() const override { return fRegion->get_size(); }
@ -53,4 +65,4 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
} }
} }
#endif /* FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ */ #endif /* FAIR_MQ_SHMEM_UNMANAGEDREGION_H_ */

View File

@ -95,6 +95,11 @@ FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback, path, flags)); return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback, path, flags));
} }
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, userFlags, callback, path, flags));
}
fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const
{ {
return fTransportType; return fTransportType;

View File

@ -46,6 +46,11 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for ZeroMQ"; }
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for ZeroMQ"; }
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for ZeroMQ, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
fair::mq::Transport GetType() const override; fair::mq::Transport GetType() const override;

View File

@ -18,6 +18,13 @@ FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegi
{ {
} }
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, int64_t /* userFlags */, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */)
: fBuffer(malloc(size))
, fSize(size)
, fCallback(callback)
{
}
void* FairMQUnmanagedRegionZMQ::GetData() const void* FairMQUnmanagedRegionZMQ::GetData() const
{ {
return fBuffer; return fBuffer;

View File

@ -21,6 +21,7 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
public: public:
FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0); FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionZMQ(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete; FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete;
FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete; FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;
@ -35,4 +36,4 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
FairMQRegionCallback fCallback; FairMQRegionCallback fCallback;
}; };
#endif /* FAIRMQUNMANAGEDREGIONZMQ_H_ */ #endif /* FAIRMQUNMANAGEDREGIONZMQ_H_ */