FairMQ: Remove unused Attach method and unused socket flags

This commit is contained in:
Dennis Klein 2018-02-16 19:38:59 +01:00 committed by Mohammad Al-Turany
parent 3785c0e369
commit 3670dd8835
12 changed files with 117 additions and 183 deletions

View File

@ -42,8 +42,6 @@ FairMQChannel::FairMQChannel()
, fChannelCmdSocket(nullptr) , fChannelCmdSocket(nullptr)
, fTransportType(FairMQ::Transport::DEFAULT) , fTransportType(FairMQ::Transport::DEFAULT)
, fTransportFactory(nullptr) , fTransportFactory(nullptr)
, fNoBlockFlag(0)
, fSndMoreFlag(0)
, fMultipart(false) , fMultipart(false)
, fModified(true) , fModified(true)
, fReset(false) , fReset(false)
@ -67,8 +65,6 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fChannelCmdSocket(nullptr) , fChannelCmdSocket(nullptr)
, fTransportType(FairMQ::Transport::DEFAULT) , fTransportType(FairMQ::Transport::DEFAULT)
, fTransportFactory(nullptr) , fTransportFactory(nullptr)
, fNoBlockFlag(0)
, fSndMoreFlag(0)
, fMultipart(false) , fMultipart(false)
, fModified(true) , fModified(true)
, fReset(false) , fReset(false)
@ -92,8 +88,6 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared
, fChannelCmdSocket(nullptr) , fChannelCmdSocket(nullptr)
, fTransportType(factory->GetType()) , fTransportType(factory->GetType())
, fTransportFactory(factory) , fTransportFactory(factory)
, fNoBlockFlag(0)
, fSndMoreFlag(0)
, fMultipart(false) , fMultipart(false)
, fModified(true) , fModified(true)
, fReset(false) , fReset(false)
@ -117,8 +111,6 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fChannelCmdSocket(nullptr) , fChannelCmdSocket(nullptr)
, fTransportType(FairMQ::Transport::DEFAULT) , fTransportType(FairMQ::Transport::DEFAULT)
, fTransportFactory(nullptr) , fTransportFactory(nullptr)
, fNoBlockFlag(chan.fNoBlockFlag)
, fSndMoreFlag(chan.fSndMoreFlag)
, fMultipart(chan.fMultipart) , fMultipart(chan.fMultipart)
, fModified(chan.fModified) , fModified(chan.fModified)
, fReset(false) , fReset(false)
@ -142,8 +134,6 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
fChannelCmdSocket = nullptr; fChannelCmdSocket = nullptr;
fTransportType = FairMQ::Transport::DEFAULT; fTransportType = FairMQ::Transport::DEFAULT;
fTransportFactory = nullptr; fTransportFactory = nullptr;
fNoBlockFlag = chan.fNoBlockFlag;
fSndMoreFlag = chan.fSndMoreFlag;
return *this; return *this;
} }
@ -677,9 +667,6 @@ bool FairMQChannel::InitCommandInterface()
{ {
fChannelCmdSocket->Connect("inproc://commands"); fChannelCmdSocket->Connect("inproc://commands");
fNoBlockFlag = fChannelCmdSocket->NOBLOCK;
fSndMoreFlag = fChannelCmdSocket->SNDMORE;
fPoller = fTransportFactory->CreatePoller(*fChannelCmdSocket, *fSocket); fPoller = fTransportFactory->CreatePoller(*fChannelCmdSocket, *fSocket);
return true; return true;
@ -754,13 +741,13 @@ int FairMQChannel::Receive(unique_ptr<FairMQMessage>& msg, int rcvTimeoutInMs) c
int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg) const int FairMQChannel::SendAsync(unique_ptr<FairMQMessage>& msg) const
{ {
CheckCompatibility(msg); CheckCompatibility(msg);
return fSocket->Send(msg, fNoBlockFlag); return fSocket->SendAsync(msg);
} }
int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg) const int FairMQChannel::ReceiveAsync(unique_ptr<FairMQMessage>& msg) const
{ {
CheckCompatibility(msg); CheckCompatibility(msg);
return fSocket->Receive(msg, fNoBlockFlag); return fSocket->ReceiveAsync(msg);
} }
int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec) const int64_t FairMQChannel::Send(vector<unique_ptr<FairMQMessage>>& msgVec) const
@ -820,7 +807,7 @@ int64_t FairMQChannel::Receive(vector<unique_ptr<FairMQMessage>>& msgVec, int rc
int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
{ {
CheckCompatibility(msgVec); CheckCompatibility(msgVec);
return fSocket->Send(msgVec, fNoBlockFlag); return fSocket->SendAsync(msgVec);
} }
/// Receives a vector of messages in non-blocking mode. /// Receives a vector of messages in non-blocking mode.
@ -831,7 +818,7 @@ int64_t FairMQChannel::SendAsync(vector<unique_ptr<FairMQMessage>>& msgVec) cons
int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const int64_t FairMQChannel::ReceiveAsync(vector<unique_ptr<FairMQMessage>>& msgVec) const
{ {
CheckCompatibility(msgVec); CheckCompatibility(msgVec);
return fSocket->Receive(msgVec, fNoBlockFlag); return fSocket->ReceiveAsync(msgVec);
} }
inline bool FairMQChannel::HandleUnblock() const inline bool FairMQChannel::HandleUnblock() const

View File

@ -318,9 +318,6 @@ class FairMQChannel
FairMQ::Transport fTransportType; FairMQ::Transport fTransportType;
std::shared_ptr<FairMQTransportFactory> fTransportFactory; std::shared_ptr<FairMQTransportFactory> fTransportFactory;
int fNoBlockFlag;
int fSndMoreFlag;
bool CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const; bool CheckCompatibility(std::unique_ptr<FairMQMessage>& msg) const;
bool CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const; bool CheckCompatibility(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) const;

View File

@ -1,80 +1,9 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/**
* FairMQSocket.cxx
*
* @since 2012-12-05
* @author D. Klein, A. Rybalchenko
*/
#include "FairMQSocket.h" #include <FairMQSocket.h>
#include <cstring>
bool FairMQSocket::Attach(const std::string& config, bool serverish)
{
if (config.empty())
{
return false;
}
if (config.size() < 2)
{
return false;
}
const char* endpoints = config.c_str();
// We hold each individual endpoint here
char endpoint [256];
while (*endpoints)
{
const char *delimiter = strchr(endpoints, ',');
if (!delimiter)
{
delimiter = endpoints + strlen(endpoints);
}
if (delimiter - endpoints > 255)
{
return false;
}
memcpy(endpoint, endpoints, delimiter - endpoints);
endpoint[delimiter - endpoints] = 0;
bool rc = false;
if (endpoint [0] == '@')
{
rc = Bind(endpoint + 1);
}
else if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' )
{
Connect(endpoint + 1);
}
else if (serverish)
{
rc = Bind(endpoint);
}
else
{
Connect(endpoint);
}
if (!rc)
{
return false;
}
if (*delimiter == 0)
{
break;
}
endpoints = delimiter + 1;
}
return true;
}

View File

@ -18,30 +18,26 @@
class FairMQSocket class FairMQSocket
{ {
public: public:
const int SNDMORE; FairMQSocket() {}
const int RCVMORE;
const int NOBLOCK;
FairMQSocket(int sndMore, int rcvMore, int noBlock)
: SNDMORE(sndMore)
, RCVMORE(rcvMore)
, NOBLOCK(noBlock)
{}
virtual std::string GetId() = 0; virtual std::string GetId() = 0;
virtual bool Bind(const std::string& address) = 0; virtual bool Bind(const std::string& address) = 0;
virtual void Connect(const std::string& address) = 0; virtual void Connect(const std::string& address) = 0;
virtual bool Attach(const std::string& address, bool serverish = false);
virtual int Send(FairMQMessagePtr& msg, const int flags = 0) = 0; virtual int Send(FairMQMessagePtr& msg) = 0;
virtual int Receive(FairMQMessagePtr& msg, const int flags = 0) = 0; virtual int SendAsync(FairMQMessagePtr& msg) = 0;
virtual int Receive(FairMQMessagePtr& msg) = 0;
virtual int ReceiveAsync(FairMQMessagePtr& msg) = 0;
virtual int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0) = 0; virtual int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0) = 0; virtual int64_t SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual int64_t ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) = 0;
virtual void* GetSocket() const = 0; virtual void* GetSocket() const = 0;
virtual int GetSocket(int nothing) const = 0; virtual int GetSocket(int nothing) const = 0;
virtual void Close() = 0; virtual void Close() = 0;
virtual void Interrupt() = 0; virtual void Interrupt() = 0;

View File

@ -33,16 +33,13 @@ using namespace std;
atomic<bool> FairMQSocketNN::fInterrupted(false); atomic<bool> FairMQSocketNN::fInterrupted(false);
FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const string& id /*= ""*/) FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const string& id /*= ""*/)
: FairMQSocket(0, 0, NN_DONTWAIT) : fSocket(-1)
, fSocket(-1) , fId(id + "." + name + "." + type)
, fId()
, fBytesTx(0) , fBytesTx(0)
, fBytesRx(0) , fBytesRx(0)
, fMessagesTx(0) , fMessagesTx(0)
, fMessagesRx(0) , fMessagesRx(0)
{ {
fId = id + "." + name + "." + type;
if (type == "router" || type == "dealer") if (type == "router" || type == "dealer")
{ {
// Additional info about using the sockets ROUTER and DEALER with nanomsg can be found in: // Additional info about using the sockets ROUTER and DEALER with nanomsg can be found in:
@ -121,6 +118,16 @@ void FairMQSocketNN::Connect(const string& address)
} }
} }
int FairMQSocketNN::Send(FairMQMessagePtr& msg) { return Send(msg, 0); }
int FairMQSocketNN::SendAsync(FairMQMessagePtr& msg) { return Send(msg, NN_DONTWAIT); }
int FairMQSocketNN::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); }
int FairMQSocketNN::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, NN_DONTWAIT); }
int64_t FairMQSocketNN::Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Send(msgVec, 0); }
int64_t FairMQSocketNN::SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Send(msgVec, NN_DONTWAIT); }
int64_t FairMQSocketNN::Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Receive(msgVec, 0); }
int64_t FairMQSocketNN::ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Receive(msgVec, NN_DONTWAIT); }
int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags) int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags)
{ {
int nbytes = -1; int nbytes = -1;

View File

@ -1,16 +1,10 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/**
* FairMQSocketNN.h
*
* @since 2013-12-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQSOCKETNN_H_ #ifndef FAIRMQSOCKETNN_H_
#define FAIRMQSOCKETNN_H_ #define FAIRMQSOCKETNN_H_
@ -33,11 +27,15 @@ class FairMQSocketNN : public FairMQSocket
virtual bool Bind(const std::string& address); virtual bool Bind(const std::string& address);
virtual void Connect(const std::string& address); virtual void Connect(const std::string& address);
virtual int Send(FairMQMessagePtr& msg, const int flags = 0); virtual int Send(FairMQMessagePtr& msg);
virtual int Receive(FairMQMessagePtr& msg, const int flags = 0); virtual int SendAsync(FairMQMessagePtr& msg);
virtual int Receive(FairMQMessagePtr& msg);
virtual int ReceiveAsync(FairMQMessagePtr& msg);
virtual int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0); virtual int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0); virtual int64_t SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual int64_t ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec);
virtual void* GetSocket() const; virtual void* GetSocket() const;
virtual int GetSocket(int nothing) const; virtual int GetSocket(int nothing) const;
@ -71,6 +69,11 @@ class FairMQSocketNN : public FairMQSocket
std::atomic<unsigned long> fMessagesTx; std::atomic<unsigned long> fMessagesTx;
std::atomic<unsigned long> fMessagesRx; std::atomic<unsigned long> fMessagesRx;
static std::atomic<bool> fInterrupted; static std::atomic<bool> fInterrupted;
int Send(FairMQMessagePtr& msg, const int flags = 0);
int Receive(FairMQMessagePtr& msg, const int flags = 0);
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
}; };
#endif /* FAIRMQSOCKETNN_H_ */ #endif /* FAIRMQSOCKETNN_H_ */

View File

@ -1,5 +1,5 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
@ -20,17 +20,14 @@ using namespace fair::mq::shmem;
atomic<bool> FairMQSocketSHM::fInterrupted(false); atomic<bool> FairMQSocketSHM::fInterrupted(false);
FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context) FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context)
: FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) : fSocket(nullptr)
, fSocket(nullptr)
, fManager(manager) , fManager(manager)
, fId() , fId(id + "." + name + "." + type)
, fBytesTx(0) , fBytesTx(0)
, fBytesRx(0) , fBytesRx(0)
, fMessagesTx(0) , fMessagesTx(0)
, fMessagesRx(0) , fMessagesRx(0)
{ {
fId = id + "." + name + "." + type;
assert(context); assert(context);
fSocket = zmq_socket(context, GetConstant(type)); fSocket = zmq_socket(context, GetConstant(type));
@ -76,11 +73,6 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
// LOG(info) << "created socket " << fId; // LOG(info) << "created socket " << fId;
} }
string FairMQSocketSHM::GetId()
{
return fId;
}
bool FairMQSocketSHM::Bind(const string& address) bool FairMQSocketSHM::Bind(const string& address)
{ {
// LOG(info) << "bind socket " << fId << " on " << address; // LOG(info) << "bind socket " << fId << " on " << address;
@ -109,6 +101,16 @@ void FairMQSocketSHM::Connect(const string& address)
} }
} }
int FairMQSocketSHM::Send(FairMQMessagePtr& msg) { return Send(msg, 0); }
int FairMQSocketSHM::SendAsync(FairMQMessagePtr& msg) { return Send(msg, ZMQ_DONTWAIT); }
int FairMQSocketSHM::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); }
int FairMQSocketSHM::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, ZMQ_DONTWAIT); }
int64_t FairMQSocketSHM::Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Send(msgVec, 0); }
int64_t FairMQSocketSHM::SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Send(msgVec, ZMQ_DONTWAIT); }
int64_t FairMQSocketSHM::Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Receive(msgVec, 0); }
int64_t FairMQSocketSHM::ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Receive(msgVec, ZMQ_DONTWAIT); }
int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int flags) int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int flags)
{ {
int nbytes = -1; int nbytes = -1;

View File

@ -23,16 +23,20 @@ class FairMQSocketSHM : public FairMQSocket
FairMQSocketSHM(const FairMQSocketSHM&) = delete; FairMQSocketSHM(const FairMQSocketSHM&) = delete;
FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete; FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete;
std::string GetId() override; std::string GetId() override { return fId; }
bool Bind(const std::string& address) override; bool Bind(const std::string& address) override;
void Connect(const std::string& address) override; void Connect(const std::string& address) override;
int Send(FairMQMessagePtr& msg, const int flags = 0) override; int Send(FairMQMessagePtr& msg) override;
int Receive(FairMQMessagePtr& msg, const int flags = 0) override; int SendAsync(FairMQMessagePtr& msg) override;
int Receive(FairMQMessagePtr& msg) override;
int ReceiveAsync(FairMQMessagePtr& msg) override;
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0) override; int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0) override; int64_t SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
void* GetSocket() const override; void* GetSocket() const override;
int GetSocket(int nothing) const override; int GetSocket(int nothing) const override;
@ -68,6 +72,11 @@ class FairMQSocketSHM : public FairMQSocket
std::atomic<unsigned long> fMessagesRx; std::atomic<unsigned long> fMessagesRx;
static std::atomic<bool> fInterrupted; static std::atomic<bool> fInterrupted;
int Send(FairMQMessagePtr& msg, const int flags);
int Receive(FairMQMessagePtr& msg, const int flags);
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags);
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags);
}; };
#endif /* FAIRMQSOCKETSHM_H_ */ #endif /* FAIRMQSOCKETSHM_H_ */

View File

@ -28,7 +28,7 @@ class PairLeft : public FairMQDevice
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
auto Run() -> void auto Run() -> void override
{ {
auto msg = FairMQMessagePtr{NewMessage()}; auto msg = FairMQMessagePtr{NewMessage()};
Send(msg, "data"); Send(msg, "data");

View File

@ -28,7 +28,7 @@ class PairRight : public FairMQDevice
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
auto Run() -> void auto Run() -> void override
{ {
auto msg = FairMQMessagePtr{NewMessage()}; auto msg = FairMQMessagePtr{NewMessage()};

View File

@ -1,16 +1,10 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/**
* FairMQSocketZMQ.cxx
*
* @since 2012-12-05
* @author D. Klein, A. Rybalchenko
*/
#include "FairMQSocketZMQ.h" #include "FairMQSocketZMQ.h"
#include "FairMQMessageZMQ.h" #include "FairMQMessageZMQ.h"
@ -23,16 +17,13 @@ using namespace std;
atomic<bool> FairMQSocketZMQ::fInterrupted(false); atomic<bool> FairMQSocketZMQ::fInterrupted(false);
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context) FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context)
: FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) : fSocket(nullptr)
, fSocket(nullptr) , fId(id + "." + name + "." + type)
, fId()
, fBytesTx(0) , fBytesTx(0)
, fBytesRx(0) , fBytesRx(0)
, fMessagesTx(0) , fMessagesTx(0)
, fMessagesRx(0) , fMessagesRx(0)
{ {
fId = id + "." + name + "." + type;
assert(context); assert(context);
fSocket = zmq_socket(context, GetConstant(type)); fSocket = zmq_socket(context, GetConstant(type));
@ -111,6 +102,16 @@ void FairMQSocketZMQ::Connect(const string& address)
} }
} }
int FairMQSocketZMQ::Send(FairMQMessagePtr& msg) { return Send(msg, 0); }
int FairMQSocketZMQ::SendAsync(FairMQMessagePtr& msg) { return Send(msg, ZMQ_DONTWAIT); }
int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); }
int FairMQSocketZMQ::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, ZMQ_DONTWAIT); }
int64_t FairMQSocketZMQ::Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Send(msgVec, 0); }
int64_t FairMQSocketZMQ::SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Send(msgVec, ZMQ_DONTWAIT); }
int64_t FairMQSocketZMQ::Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Receive(msgVec, 0); }
int64_t FairMQSocketZMQ::ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) { return Receive(msgVec, ZMQ_DONTWAIT); }
int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags) int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags)
{ {
int nbytes = -1; int nbytes = -1;
@ -202,7 +203,6 @@ int64_t FairMQSocketZMQ::Send(vector<FairMQMessagePtr>& msgVec, const int flags)
while (true) while (true)
{ {
totalSize = 0; totalSize = 0;
nbytes = -1;
repeat = false; repeat = false;
for (unsigned int i = 0; i < vecSize; ++i) for (unsigned int i = 0; i < vecSize; ++i)

View File

@ -1,16 +1,10 @@
/******************************************************************************** /********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* * * *
* This software is distributed under the terms of the * * This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
/**
* FairMQSocketZMQ.h
*
* @since 2012-12-05
* @author D. Klein, A. Rybalchenko
*/
#ifndef FAIRMQSOCKETZMQ_H_ #ifndef FAIRMQSOCKETZMQ_H_
#define FAIRMQSOCKETZMQ_H_ #define FAIRMQSOCKETZMQ_H_
@ -29,36 +23,41 @@ class FairMQSocketZMQ : public FairMQSocket
FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ(const FairMQSocketZMQ&) = delete;
FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete;
virtual std::string GetId(); std::string GetId() override;
virtual bool Bind(const std::string& address); bool Bind(const std::string& address) override;
virtual void Connect(const std::string& address); void Connect(const std::string& address) override;
virtual int Send(FairMQMessagePtr& msg, const int flags = 0); int Send(FairMQMessagePtr& msg) override;
virtual int Receive(FairMQMessagePtr& msg, const int flags = 0); int SendAsync(FairMQMessagePtr& msg) override;
int Receive(FairMQMessagePtr& msg) override;
int ReceiveAsync(FairMQMessagePtr& msg) override;
virtual int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0); int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0); int64_t SendAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
int64_t ReceiveAsync(std::vector<std::unique_ptr<FairMQMessage>>& msgVec) override;
virtual void* GetSocket() const; void* GetSocket() const override;
virtual int GetSocket(int nothing) const; int GetSocket(int nothing) const override;
virtual void Close();
virtual void Interrupt(); void Close() override;
virtual void Resume();
virtual void SetOption(const std::string& option, const void* value, size_t valueSize); void Interrupt() override;
virtual void GetOption(const std::string& option, void* value, size_t* valueSize); void Resume() override;
virtual unsigned long GetBytesTx() const; void SetOption(const std::string& option, const void* value, size_t valueSize) override;
virtual unsigned long GetBytesRx() const; void GetOption(const std::string& option, void* value, size_t* valueSize) override;
virtual unsigned long GetMessagesTx() const;
virtual unsigned long GetMessagesRx() const;
virtual bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method); unsigned long GetBytesTx() const override;
virtual int GetSendTimeout() const; unsigned long GetBytesRx() const override;
virtual bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method); unsigned long GetMessagesTx() const override;
virtual int GetReceiveTimeout() const; unsigned long GetMessagesRx() const override;
bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method) override;
int GetSendTimeout() const override;
bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method) override;
int GetReceiveTimeout() const override;
static int GetConstant(const std::string& constant); static int GetConstant(const std::string& constant);
@ -73,6 +72,11 @@ class FairMQSocketZMQ : public FairMQSocket
std::atomic<unsigned long> fMessagesRx; std::atomic<unsigned long> fMessagesRx;
static std::atomic<bool> fInterrupted; static std::atomic<bool> fInterrupted;
int Send(FairMQMessagePtr& msg, const int flags = 0);
int Receive(FairMQMessagePtr& msg, const int flags = 0);
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int flags = 0);
}; };
#endif /* FAIRMQSOCKETZMQ_H_ */ #endif /* FAIRMQSOCKETZMQ_H_ */