mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Zmq: remove global (static) state, refactor
This commit is contained in:
parent
ccbf0be572
commit
dbdabd23a4
|
@ -190,6 +190,7 @@ if(BUILD_FAIRMQ)
|
||||||
shmem/Common.h
|
shmem/Common.h
|
||||||
shmem/Manager.h
|
shmem/Manager.h
|
||||||
shmem/Region.h
|
shmem/Region.h
|
||||||
|
zeromq/Context.h
|
||||||
zeromq/FairMQMessageZMQ.h
|
zeromq/FairMQMessageZMQ.h
|
||||||
zeromq/FairMQPollerZMQ.h
|
zeromq/FairMQPollerZMQ.h
|
||||||
zeromq/FairMQUnmanagedRegionZMQ.h
|
zeromq/FairMQUnmanagedRegionZMQ.h
|
||||||
|
|
195
fairmq/zeromq/Context.h
Normal file
195
fairmq/zeromq/Context.h
Normal file
|
@ -0,0 +1,195 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#ifndef FAIR_MQ_ZMQ_CONTEXT_H_
|
||||||
|
#define FAIR_MQ_ZMQ_CONTEXT_H_
|
||||||
|
|
||||||
|
#include <fairmq/Tools.h>
|
||||||
|
#include <FairMQLogger.h>
|
||||||
|
#include <FairMQUnmanagedRegion.h>
|
||||||
|
|
||||||
|
#include <zmq.h>
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <functional>
|
||||||
|
#include <mutex>
|
||||||
|
#include <queue>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
namespace fair
|
||||||
|
{
|
||||||
|
namespace mq
|
||||||
|
{
|
||||||
|
namespace zmq
|
||||||
|
{
|
||||||
|
|
||||||
|
struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||||
|
|
||||||
|
class Context
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Context(int numIoThreads)
|
||||||
|
: fZmqCtx(zmq_ctx_new())
|
||||||
|
, fInterrupted(false)
|
||||||
|
, fRegionCounter(1)
|
||||||
|
{
|
||||||
|
if (!fZmqCtx) {
|
||||||
|
throw ContextError(tools::ToString("failed creating context, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) {
|
||||||
|
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||||
|
throw ContextError(tools::ToString("failed configuring context, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) {
|
||||||
|
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||||
|
throw ContextError(tools::ToString("failed configuring context, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
|
||||||
|
fRegionEvents.emplace(0, nullptr, 0, 0, fair::mq::RegionEvent::local_only);
|
||||||
|
}
|
||||||
|
|
||||||
|
Context(const Context&) = delete;
|
||||||
|
Context operator=(const Context&) = delete;
|
||||||
|
|
||||||
|
void SubscribeToRegionEvents(FairMQRegionEventCallback callback)
|
||||||
|
{
|
||||||
|
if (fRegionEventThread.joinable()) {
|
||||||
|
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
|
fRegionEventsSubscriptionActive = false;
|
||||||
|
}
|
||||||
|
fRegionEventsCV.notify_one();
|
||||||
|
fRegionEventThread.join();
|
||||||
|
}
|
||||||
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
|
fRegionEventCallback = callback;
|
||||||
|
fRegionEventsSubscriptionActive = true;
|
||||||
|
fRegionEventThread = std::thread(&Context::RegionEventsSubscription, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool SubscribedToRegionEvents() const { return fRegionEventThread.joinable(); }
|
||||||
|
|
||||||
|
void UnsubscribeFromRegionEvents()
|
||||||
|
{
|
||||||
|
if (fRegionEventThread.joinable()) {
|
||||||
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
|
fRegionEventsSubscriptionActive = false;
|
||||||
|
lock.unlock();
|
||||||
|
fRegionEventsCV.notify_one();
|
||||||
|
fRegionEventThread.join();
|
||||||
|
lock.lock();
|
||||||
|
fRegionEventCallback = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RegionEventsSubscription()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(fMtx);
|
||||||
|
while (fRegionEventsSubscriptionActive) {
|
||||||
|
|
||||||
|
while (!fRegionEvents.empty()) {
|
||||||
|
auto i = fRegionEvents.front();
|
||||||
|
fRegionEventCallback(i);
|
||||||
|
fRegionEvents.pop();
|
||||||
|
}
|
||||||
|
fRegionEventsCV.wait(lock, [&]() { return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<fair::mq::RegionInfo> GetRegionInfo() const
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
|
return fRegionInfos;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t RegionCount() const
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
|
return fRegionCounter;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AddRegion(uint64_t id, void* ptr, size_t size, int64_t userFlags, fair::mq::RegionEvent event)
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
|
++fRegionCounter;
|
||||||
|
fRegionInfos.emplace_back(id, ptr, size, userFlags, event);
|
||||||
|
fRegionEvents.emplace(id, ptr, size, userFlags, event);
|
||||||
|
}
|
||||||
|
fRegionEventsCV.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RemoveRegion(uint64_t id)
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
|
auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const fair::mq::RegionInfo& i) {
|
||||||
|
return i.id == id;
|
||||||
|
});
|
||||||
|
if (it != fRegionInfos.end()) {
|
||||||
|
fRegionEvents.push(*it);
|
||||||
|
fRegionEvents.back().event = fair::mq::RegionEvent::destroyed;
|
||||||
|
fRegionInfos.erase(it);
|
||||||
|
} else {
|
||||||
|
LOG(error) << "RemoveRegion: given id (" << id << ") not found.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fRegionEventsCV.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Interrupt() { fInterrupted.store(true); }
|
||||||
|
void Resume() { fInterrupted.store(false); }
|
||||||
|
void Reset() {}
|
||||||
|
bool Interrupted() { return fInterrupted.load(); }
|
||||||
|
|
||||||
|
void* GetZmqCtx() { return fZmqCtx; }
|
||||||
|
|
||||||
|
~Context()
|
||||||
|
{
|
||||||
|
UnsubscribeFromRegionEvents();
|
||||||
|
|
||||||
|
if (fZmqCtx) {
|
||||||
|
if (zmq_ctx_term(fZmqCtx) != 0) {
|
||||||
|
if (errno == EINTR) {
|
||||||
|
LOG(error) << " failed closing context, reason: " << zmq_strerror(errno);
|
||||||
|
} else {
|
||||||
|
fZmqCtx = nullptr;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG(error) << "context not available for shutdown";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void* fZmqCtx;
|
||||||
|
mutable std::mutex fMtx;
|
||||||
|
std::atomic<bool> fInterrupted;
|
||||||
|
|
||||||
|
uint64_t fRegionCounter;
|
||||||
|
std::condition_variable fRegionEventsCV;
|
||||||
|
std::vector<fair::mq::RegionInfo> fRegionInfos;
|
||||||
|
std::queue<fair::mq::RegionInfo> fRegionEvents;
|
||||||
|
std::thread fRegionEventThread;
|
||||||
|
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
||||||
|
bool fRegionEventsSubscriptionActive;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace zmq
|
||||||
|
} // namespace mq
|
||||||
|
} // namespace fair
|
||||||
|
|
||||||
|
#endif /* FAIR_MQ_ZMQ_CONTEXT_H_ */
|
|
@ -17,14 +17,13 @@
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
#include <fairmq/Tools.h>
|
#include <fairmq/Tools.h>
|
||||||
#include "FairMQUnmanagedRegionZMQ.h"
|
#include "FairMQUnmanagedRegionZMQ.h"
|
||||||
#include <FairMQTransportFactory.h>
|
|
||||||
|
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
FairMQMessageZMQ::FairMQMessageZMQ(FairMQTransportFactory* factory)
|
FairMQMessageZMQ::FairMQMessageZMQ(FairMQTransportFactory* factory)
|
||||||
: FairMQMessage{factory}
|
: FairMQMessage(factory)
|
||||||
, fUsedSizeModified(false)
|
, fUsedSizeModified(false)
|
||||||
, fUsedSize()
|
, fUsedSize()
|
||||||
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
|
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
|
||||||
|
@ -37,7 +36,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(FairMQTransportFactory* factory)
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQMessageZMQ::FairMQMessageZMQ(const size_t size, FairMQTransportFactory* factory)
|
FairMQMessageZMQ::FairMQMessageZMQ(const size_t size, FairMQTransportFactory* factory)
|
||||||
: FairMQMessage{factory}
|
: FairMQMessage(factory)
|
||||||
, fUsedSizeModified(false)
|
, fUsedSizeModified(false)
|
||||||
, fUsedSize(size)
|
, fUsedSize(size)
|
||||||
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
|
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
|
||||||
|
@ -50,7 +49,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(const size_t size, FairMQTransportFactory* fa
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory)
|
FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory)
|
||||||
: FairMQMessage{factory}
|
: FairMQMessage(factory)
|
||||||
, fUsedSizeModified(false)
|
, fUsedSizeModified(false)
|
||||||
, fUsedSize()
|
, fUsedSize()
|
||||||
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
|
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
|
||||||
|
@ -63,7 +62,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory)
|
FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory)
|
||||||
: FairMQMessage{factory}
|
: FairMQMessage(factory)
|
||||||
, fUsedSizeModified(false)
|
, fUsedSizeModified(false)
|
||||||
, fUsedSize()
|
, fUsedSize()
|
||||||
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
|
, fMsg(fair::mq::tools::make_unique<zmq_msg_t>())
|
||||||
|
|
|
@ -13,16 +13,13 @@
|
||||||
|
|
||||||
#include <zmq.h>
|
#include <zmq.h>
|
||||||
|
|
||||||
#include <cassert>
|
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace fair::mq;
|
using namespace fair::mq;
|
||||||
|
|
||||||
atomic<bool> FairMQSocketZMQ::fInterrupted(false);
|
FairMQSocketZMQ::FairMQSocketZMQ(fair::mq::zmq::Context& ctx, const string& type, const string& name, const string& id /*= ""*/, FairMQTransportFactory* factory)
|
||||||
|
: FairMQSocket(factory)
|
||||||
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context, FairMQTransportFactory* fac)
|
, fCtx(ctx)
|
||||||
: FairMQSocket{fac}
|
, fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type)))
|
||||||
, fSocket(nullptr)
|
|
||||||
, fId(id + "." + name + "." + type)
|
, fId(id + "." + name + "." + type)
|
||||||
, fBytesTx(0)
|
, fBytesTx(0)
|
||||||
, fBytesRx(0)
|
, fBytesRx(0)
|
||||||
|
@ -31,9 +28,6 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s
|
||||||
, fSndTimeout(100)
|
, fSndTimeout(100)
|
||||||
, fRcvTimeout(100)
|
, fRcvTimeout(100)
|
||||||
{
|
{
|
||||||
assert(context);
|
|
||||||
fSocket = zmq_socket(context, GetConstant(type));
|
|
||||||
|
|
||||||
if (fSocket == nullptr)
|
if (fSocket == nullptr)
|
||||||
{
|
{
|
||||||
LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
|
LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||||
|
@ -122,7 +116,7 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout)
|
||||||
|
|
||||||
return nbytes;
|
return nbytes;
|
||||||
} else if (zmq_errno() == EAGAIN) {
|
} else if (zmq_errno() == EAGAIN) {
|
||||||
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
|
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||||
if (timeout > 0) {
|
if (timeout > 0) {
|
||||||
elapsed += fSndTimeout;
|
elapsed += fSndTimeout;
|
||||||
if (elapsed >= timeout) {
|
if (elapsed >= timeout) {
|
||||||
|
@ -161,7 +155,7 @@ int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout)
|
||||||
++fMessagesRx;
|
++fMessagesRx;
|
||||||
return nbytes;
|
return nbytes;
|
||||||
} else if (zmq_errno() == EAGAIN) {
|
} else if (zmq_errno() == EAGAIN) {
|
||||||
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
|
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||||
if (timeout > 0) {
|
if (timeout > 0) {
|
||||||
elapsed += fRcvTimeout;
|
elapsed += fRcvTimeout;
|
||||||
if (elapsed >= timeout) {
|
if (elapsed >= timeout) {
|
||||||
|
@ -213,7 +207,7 @@ int64_t FairMQSocketZMQ::Send(vector<FairMQMessagePtr>& msgVec, const int timeou
|
||||||
} else {
|
} else {
|
||||||
// according to ZMQ docs, this can only occur for the first part
|
// according to ZMQ docs, this can only occur for the first part
|
||||||
if (zmq_errno() == EAGAIN) {
|
if (zmq_errno() == EAGAIN) {
|
||||||
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
|
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||||
if (timeout > 0) {
|
if (timeout > 0) {
|
||||||
elapsed += fSndTimeout;
|
elapsed += fSndTimeout;
|
||||||
if (elapsed >= timeout) {
|
if (elapsed >= timeout) {
|
||||||
|
@ -278,7 +272,7 @@ int64_t FairMQSocketZMQ::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
|
||||||
msgVec.push_back(move(part));
|
msgVec.push_back(move(part));
|
||||||
totalSize += nbytes;
|
totalSize += nbytes;
|
||||||
} else if (zmq_errno() == EAGAIN) {
|
} else if (zmq_errno() == EAGAIN) {
|
||||||
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
|
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||||
if (timeout > 0) {
|
if (timeout > 0) {
|
||||||
elapsed += fRcvTimeout;
|
elapsed += fRcvTimeout;
|
||||||
if (elapsed >= timeout) {
|
if (elapsed >= timeout) {
|
||||||
|
@ -329,16 +323,6 @@ void FairMQSocketZMQ::Close()
|
||||||
fSocket = nullptr;
|
fSocket = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQSocketZMQ::Interrupt()
|
|
||||||
{
|
|
||||||
fInterrupted = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQSocketZMQ::Resume()
|
|
||||||
{
|
|
||||||
fInterrupted = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* FairMQSocketZMQ::GetSocket() const
|
void* FairMQSocketZMQ::GetSocket() const
|
||||||
{
|
{
|
||||||
return fSocket;
|
return fSocket;
|
||||||
|
|
|
@ -9,18 +9,20 @@
|
||||||
#ifndef FAIRMQSOCKETZMQ_H_
|
#ifndef FAIRMQSOCKETZMQ_H_
|
||||||
#define FAIRMQSOCKETZMQ_H_
|
#define FAIRMQSOCKETZMQ_H_
|
||||||
|
|
||||||
#include <atomic>
|
#include <fairmq/zeromq/Context.h>
|
||||||
|
|
||||||
#include <memory> // unique_ptr
|
|
||||||
|
|
||||||
#include "FairMQSocket.h"
|
#include "FairMQSocket.h"
|
||||||
#include "FairMQMessage.h"
|
#include "FairMQMessage.h"
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <memory> // unique_ptr
|
||||||
|
|
||||||
class FairMQTransportFactory;
|
class FairMQTransportFactory;
|
||||||
|
|
||||||
class FairMQSocketZMQ final : public FairMQSocket
|
class FairMQSocketZMQ final : public FairMQSocket
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
FairMQSocketZMQ(const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* factory = nullptr);
|
FairMQSocketZMQ(fair::mq::zmq::Context& ctx, const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* factory = nullptr);
|
||||||
|
|
||||||
FairMQSocketZMQ(const FairMQSocketZMQ&) = delete;
|
FairMQSocketZMQ(const FairMQSocketZMQ&) = delete;
|
||||||
FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete;
|
FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete;
|
||||||
|
|
||||||
|
@ -38,9 +40,6 @@ class FairMQSocketZMQ final : public FairMQSocket
|
||||||
|
|
||||||
void Close() override;
|
void Close() override;
|
||||||
|
|
||||||
static void Interrupt();
|
|
||||||
static void Resume();
|
|
||||||
|
|
||||||
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
|
void SetOption(const std::string& option, const void* value, size_t valueSize) override;
|
||||||
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
|
void GetOption(const std::string& option, void* value, size_t* valueSize) override;
|
||||||
|
|
||||||
|
@ -65,6 +64,7 @@ class FairMQSocketZMQ final : public FairMQSocket
|
||||||
~FairMQSocketZMQ() override;
|
~FairMQSocketZMQ() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
fair::mq::zmq::Context& fCtx;
|
||||||
void* fSocket;
|
void* fSocket;
|
||||||
std::string fId;
|
std::string fId;
|
||||||
std::atomic<unsigned long> fBytesTx;
|
std::atomic<unsigned long> fBytesTx;
|
||||||
|
@ -72,8 +72,6 @@ class FairMQSocketZMQ final : public FairMQSocket
|
||||||
std::atomic<unsigned long> fMessagesTx;
|
std::atomic<unsigned long> fMessagesTx;
|
||||||
std::atomic<unsigned long> fMessagesRx;
|
std::atomic<unsigned long> fMessagesRx;
|
||||||
|
|
||||||
static std::atomic<bool> fInterrupted;
|
|
||||||
|
|
||||||
int fSndTimeout;
|
int fSndTimeout;
|
||||||
int fRcvTimeout;
|
int fRcvTimeout;
|
||||||
};
|
};
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
********************************************************************************/
|
********************************************************************************/
|
||||||
|
|
||||||
#include "FairMQTransportFactoryZMQ.h"
|
#include "FairMQTransportFactoryZMQ.h"
|
||||||
|
#include <fairmq/Tools.h>
|
||||||
#include <zmq.h>
|
#include <zmq.h>
|
||||||
|
|
||||||
#include <algorithm> // find_if
|
#include <algorithm> // find_if
|
||||||
|
@ -15,40 +16,18 @@ using namespace std;
|
||||||
|
|
||||||
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fair::mq::ProgOptions* config)
|
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fair::mq::ProgOptions* config)
|
||||||
: FairMQTransportFactory(id)
|
: FairMQTransportFactory(id)
|
||||||
, fContext(zmq_ctx_new())
|
, fCtx(nullptr)
|
||||||
, fRegionCounter(0)
|
|
||||||
{
|
{
|
||||||
int major, minor, patch;
|
int major, minor, patch;
|
||||||
zmq_version(&major, &minor, &patch);
|
zmq_version(&major, &minor, &patch);
|
||||||
LOG(debug) << "Transport: Using ZeroMQ library, version: " << major << "." << minor << "." << patch;
|
LOG(debug) << "Transport: Using ZeroMQ library, version: " << major << "." << minor << "." << patch;
|
||||||
|
|
||||||
if (!fContext)
|
if (config) {
|
||||||
{
|
fCtx = fair::mq::tools::make_unique<fair::mq::zmq::Context>(config->GetProperty<int>("io-threads", 1));
|
||||||
LOG(error) << "failed creating context, reason: " << zmq_strerror(errno);
|
} else {
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0)
|
|
||||||
{
|
|
||||||
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
|
||||||
}
|
|
||||||
|
|
||||||
int numIoThreads = 1;
|
|
||||||
if (config)
|
|
||||||
{
|
|
||||||
numIoThreads = config->GetProperty<int>("io-threads", numIoThreads);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG(debug) << "fair::mq::ProgOptions not available! Using defaults.";
|
LOG(debug) << "fair::mq::ProgOptions not available! Using defaults.";
|
||||||
|
fCtx = fair::mq::tools::make_unique<fair::mq::zmq::Context>(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)
|
|
||||||
{
|
|
||||||
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
|
||||||
}
|
|
||||||
|
|
||||||
fRegionEvents.emplace(0, nullptr, 0, 0, fair::mq::RegionEvent::local_only);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage()
|
FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage()
|
||||||
|
@ -73,8 +52,7 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionP
|
||||||
|
|
||||||
FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name)
|
FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name)
|
||||||
{
|
{
|
||||||
assert(fContext);
|
return unique_ptr<FairMQSocket>(new FairMQSocketZMQ(*fCtx, type, name, GetId(), this));
|
||||||
return unique_ptr<FairMQSocket>(new FairMQSocketZMQ(type, name, GetId(), fContext, this));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel>& channels) const
|
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel>& channels) const
|
||||||
|
@ -135,109 +113,13 @@ FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(
|
||||||
const string& path /* = "" */,
|
const string& path /* = "" */,
|
||||||
int flags /* = 0 */)
|
int flags /* = 0 */)
|
||||||
{
|
{
|
||||||
unique_ptr<FairMQUnmanagedRegion> ptr = nullptr;
|
unique_ptr<FairMQUnmanagedRegion> ptr = unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(*fCtx, size, userFlags, callback, bulkCallback, path, flags, this));
|
||||||
{
|
|
||||||
lock_guard<mutex> lock(fMtx);
|
|
||||||
|
|
||||||
++fRegionCounter;
|
|
||||||
ptr = unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(fRegionCounter, size, userFlags, callback, bulkCallback, path, flags, this));
|
|
||||||
auto zPtr = static_cast<FairMQUnmanagedRegionZMQ*>(ptr.get());
|
auto zPtr = static_cast<FairMQUnmanagedRegionZMQ*>(ptr.get());
|
||||||
fRegionInfos.emplace_back(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
|
fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
|
||||||
fRegionEvents.emplace(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
|
|
||||||
}
|
|
||||||
fRegionEventsCV.notify_one();
|
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQTransportFactoryZMQ::SubscribeToRegionEvents(FairMQRegionEventCallback callback)
|
|
||||||
{
|
|
||||||
if (fRegionEventThread.joinable()) {
|
|
||||||
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
|
|
||||||
{
|
|
||||||
lock_guard<mutex> lock(fMtx);
|
|
||||||
fRegionEventsSubscriptionActive = false;
|
|
||||||
}
|
|
||||||
fRegionEventsCV.notify_one();
|
|
||||||
fRegionEventThread.join();
|
|
||||||
}
|
|
||||||
lock_guard<mutex> lock(fMtx);
|
|
||||||
fRegionEventCallback = callback;
|
|
||||||
fRegionEventsSubscriptionActive = true;
|
|
||||||
fRegionEventThread = thread(&FairMQTransportFactoryZMQ::RegionEventsSubscription, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool FairMQTransportFactoryZMQ::SubscribedToRegionEvents()
|
|
||||||
{
|
|
||||||
return fRegionEventThread.joinable();
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQTransportFactoryZMQ::UnsubscribeFromRegionEvents()
|
|
||||||
{
|
|
||||||
if (fRegionEventThread.joinable()) {
|
|
||||||
unique_lock<mutex> lock(fMtx);
|
|
||||||
fRegionEventsSubscriptionActive = false;
|
|
||||||
lock.unlock();
|
|
||||||
fRegionEventsCV.notify_one();
|
|
||||||
fRegionEventThread.join();
|
|
||||||
lock.lock();
|
|
||||||
fRegionEventCallback = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQTransportFactoryZMQ::RegionEventsSubscription()
|
|
||||||
{
|
|
||||||
unique_lock<mutex> lock(fMtx);
|
|
||||||
while (fRegionEventsSubscriptionActive) {
|
|
||||||
|
|
||||||
while (!fRegionEvents.empty()) {
|
|
||||||
auto i = fRegionEvents.front();
|
|
||||||
fRegionEventCallback(i);
|
|
||||||
fRegionEvents.pop();
|
|
||||||
}
|
|
||||||
fRegionEventsCV.wait(lock, [&]() { return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
vector<fair::mq::RegionInfo> FairMQTransportFactoryZMQ::GetRegionInfo()
|
|
||||||
{
|
|
||||||
lock_guard<mutex> lock(fMtx);
|
|
||||||
return fRegionInfos;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQTransportFactoryZMQ::RemoveRegion(uint64_t id)
|
|
||||||
{
|
|
||||||
{
|
|
||||||
lock_guard<mutex> lock(fMtx);
|
|
||||||
auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const fair::mq::RegionInfo& i) {
|
|
||||||
return i.id == id;
|
|
||||||
});
|
|
||||||
if (it != fRegionInfos.end()) {
|
|
||||||
fRegionEvents.push(*it);
|
|
||||||
fRegionEvents.back().event = fair::mq::RegionEvent::destroyed;
|
|
||||||
fRegionInfos.erase(it);
|
|
||||||
} else {
|
|
||||||
LOG(error) << "RemoveRegion: given id (" << id << ") not found.";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fRegionEventsCV.notify_one();
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ()
|
FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ()
|
||||||
{
|
{
|
||||||
LOG(debug) << "Destroying ZeroMQ transport...";
|
LOG(debug) << "Destroying ZeroMQ transport...";
|
||||||
|
|
||||||
UnsubscribeFromRegionEvents();
|
|
||||||
|
|
||||||
if (fContext) {
|
|
||||||
if (zmq_ctx_term(fContext) != 0) {
|
|
||||||
if (errno == EINTR) {
|
|
||||||
LOG(error) << " failed closing context, reason: " << zmq_strerror(errno);
|
|
||||||
} else {
|
|
||||||
fContext = nullptr;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG(error) << "context not available for shutdown";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,19 +15,16 @@
|
||||||
#ifndef FAIRMQTRANSPORTFACTORYZMQ_H_
|
#ifndef FAIRMQTRANSPORTFACTORYZMQ_H_
|
||||||
#define FAIRMQTRANSPORTFACTORYZMQ_H_
|
#define FAIRMQTRANSPORTFACTORYZMQ_H_
|
||||||
|
|
||||||
|
#include <fairmq/zeromq/Context.h>
|
||||||
|
#include <fairmq/ProgOptions.h>
|
||||||
#include "FairMQTransportFactory.h"
|
#include "FairMQTransportFactory.h"
|
||||||
#include "FairMQMessageZMQ.h"
|
#include "FairMQMessageZMQ.h"
|
||||||
#include "FairMQSocketZMQ.h"
|
#include "FairMQSocketZMQ.h"
|
||||||
#include "FairMQPollerZMQ.h"
|
#include "FairMQPollerZMQ.h"
|
||||||
#include "FairMQUnmanagedRegionZMQ.h"
|
#include "FairMQUnmanagedRegionZMQ.h"
|
||||||
#include <fairmq/ProgOptions.h>
|
|
||||||
|
|
||||||
#include <condition_variable>
|
#include <memory> // unique_ptr
|
||||||
#include <functional>
|
|
||||||
#include <mutex>
|
|
||||||
#include <queue>
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <thread>
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
|
class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
|
||||||
|
@ -54,32 +51,21 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
|
||||||
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override;
|
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override;
|
||||||
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0);
|
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0);
|
||||||
|
|
||||||
void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override;
|
void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override { fCtx->SubscribeToRegionEvents(callback); }
|
||||||
bool SubscribedToRegionEvents() override;
|
bool SubscribedToRegionEvents() override { return fCtx->SubscribedToRegionEvents(); }
|
||||||
void UnsubscribeFromRegionEvents() override;
|
void UnsubscribeFromRegionEvents() override { fCtx->UnsubscribeFromRegionEvents(); }
|
||||||
void RegionEventsSubscription();
|
std::vector<fair::mq::RegionInfo> GetRegionInfo() override { return fCtx->GetRegionInfo(); }
|
||||||
std::vector<fair::mq::RegionInfo> GetRegionInfo() override;
|
|
||||||
void RemoveRegion(uint64_t id);
|
|
||||||
|
|
||||||
fair::mq::Transport GetType() const override { return fair::mq::Transport::ZMQ; }
|
fair::mq::Transport GetType() const override { return fair::mq::Transport::ZMQ; }
|
||||||
|
|
||||||
void Interrupt() override { FairMQSocketZMQ::Interrupt(); }
|
void Interrupt() override { fCtx->Interrupt(); }
|
||||||
void Resume() override { FairMQSocketZMQ::Resume(); }
|
void Resume() override { fCtx->Resume(); }
|
||||||
void Reset() override {}
|
void Reset() override { fCtx->Reset(); }
|
||||||
|
|
||||||
~FairMQTransportFactoryZMQ() override;
|
~FairMQTransportFactoryZMQ() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void* fContext;
|
std::unique_ptr<fair::mq::zmq::Context> fCtx;
|
||||||
|
|
||||||
std::mutex fMtx;
|
|
||||||
uint64_t fRegionCounter;
|
|
||||||
std::condition_variable fRegionEventsCV;
|
|
||||||
std::vector<fair::mq::RegionInfo> fRegionInfos;
|
|
||||||
std::queue<fair::mq::RegionInfo> fRegionEvents;
|
|
||||||
std::thread fRegionEventThread;
|
|
||||||
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
|
||||||
bool fRegionEventsSubscriptionActive;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */
|
#endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */
|
||||||
|
|
|
@ -7,19 +7,19 @@
|
||||||
********************************************************************************/
|
********************************************************************************/
|
||||||
|
|
||||||
#include "FairMQUnmanagedRegionZMQ.h"
|
#include "FairMQUnmanagedRegionZMQ.h"
|
||||||
#include "FairMQTransportFactoryZMQ.h"
|
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(uint64_t id,
|
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(fair::mq::zmq::Context& ctx,
|
||||||
const size_t size,
|
size_t size,
|
||||||
int64_t userFlags,
|
int64_t userFlags,
|
||||||
FairMQRegionCallback callback,
|
FairMQRegionCallback callback,
|
||||||
FairMQRegionBulkCallback bulkCallback,
|
FairMQRegionBulkCallback bulkCallback,
|
||||||
const std::string& /* path = "" */,
|
const std::string& /* path = "" */,
|
||||||
int /* flags = 0 */,
|
int /* flags = 0 */,
|
||||||
FairMQTransportFactory* factory /* = nullptr */)
|
FairMQTransportFactory* factory)
|
||||||
: FairMQUnmanagedRegion(factory)
|
: FairMQUnmanagedRegion(factory)
|
||||||
, fId(id)
|
, fCtx(ctx)
|
||||||
|
, fId(fCtx.RegionCount())
|
||||||
, fBuffer(malloc(size))
|
, fBuffer(malloc(size))
|
||||||
, fSize(size)
|
, fSize(size)
|
||||||
, fUserFlags(userFlags)
|
, fUserFlags(userFlags)
|
||||||
|
@ -39,7 +39,7 @@ size_t FairMQUnmanagedRegionZMQ::GetSize() const
|
||||||
|
|
||||||
FairMQUnmanagedRegionZMQ::~FairMQUnmanagedRegionZMQ()
|
FairMQUnmanagedRegionZMQ::~FairMQUnmanagedRegionZMQ()
|
||||||
{
|
{
|
||||||
LOG(debug) << "destroying region";
|
LOG(debug) << "destroying region " << fId;
|
||||||
static_cast<FairMQTransportFactoryZMQ*>(GetTransport())->RemoveRegion(fId);
|
fCtx.RemoveRegion(fId);
|
||||||
free(fBuffer);
|
free(fBuffer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
#ifndef FAIRMQUNMANAGEDREGIONZMQ_H_
|
#ifndef FAIRMQUNMANAGEDREGIONZMQ_H_
|
||||||
#define FAIRMQUNMANAGEDREGIONZMQ_H_
|
#define FAIRMQUNMANAGEDREGIONZMQ_H_
|
||||||
|
|
||||||
|
#include <fairmq/zeromq/Context.h>
|
||||||
#include "FairMQUnmanagedRegion.h"
|
#include "FairMQUnmanagedRegion.h"
|
||||||
|
|
||||||
#include <cstddef> // size_t
|
#include <cstddef> // size_t
|
||||||
|
@ -21,7 +22,14 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
|
||||||
friend class FairMQMessageZMQ;
|
friend class FairMQMessageZMQ;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory = nullptr);
|
FairMQUnmanagedRegionZMQ(fair::mq::zmq::Context& ctx,
|
||||||
|
size_t size,
|
||||||
|
int64_t userFlags,
|
||||||
|
FairMQRegionCallback callback,
|
||||||
|
FairMQRegionBulkCallback bulkCallback,
|
||||||
|
const std::string& path = "",
|
||||||
|
int flags = 0,
|
||||||
|
FairMQTransportFactory* factory = nullptr);
|
||||||
|
|
||||||
FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete;
|
FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete;
|
||||||
FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;
|
FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;
|
||||||
|
@ -34,6 +42,7 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
|
||||||
virtual ~FairMQUnmanagedRegionZMQ();
|
virtual ~FairMQUnmanagedRegionZMQ();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
fair::mq::zmq::Context& fCtx;
|
||||||
uint64_t fId;
|
uint64_t fId;
|
||||||
void* fBuffer;
|
void* fBuffer;
|
||||||
size_t fSize;
|
size_t fSize;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user