From 3670dd8835a562727b908dfda42fb114b29086c9 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Fri, 16 Feb 2018 19:38:59 +0100 Subject: [PATCH] FairMQ: Remove unused Attach method and unused socket flags --- fairmq/FairMQChannel.cxx | 21 ++---- fairmq/FairMQChannel.h | 3 - fairmq/FairMQSocket.cxx | 75 +------------------- fairmq/FairMQSocket.h | 24 +++---- fairmq/nanomsg/FairMQSocketNN.cxx | 17 +++-- fairmq/nanomsg/FairMQSocketNN.h | 25 ++++--- fairmq/shmem/FairMQSocketSHM.cxx | 24 ++++--- fairmq/shmem/FairMQSocketSHM.h | 19 +++-- fairmq/test/helper/devices/TestPairLeft.cxx | 2 +- fairmq/test/helper/devices/TestPairRight.cxx | 2 +- fairmq/zeromq/FairMQSocketZMQ.cxx | 26 +++---- fairmq/zeromq/FairMQSocketZMQ.h | 62 ++++++++-------- 12 files changed, 117 insertions(+), 183 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index ddd07cd0..d84f550e 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -42,8 +42,6 @@ FairMQChannel::FairMQChannel() , fChannelCmdSocket(nullptr) , fTransportType(FairMQ::Transport::DEFAULT) , fTransportFactory(nullptr) - , fNoBlockFlag(0) - , fSndMoreFlag(0) , fMultipart(false) , fModified(true) , fReset(false) @@ -67,8 +65,6 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fChannelCmdSocket(nullptr) , fTransportType(FairMQ::Transport::DEFAULT) , fTransportFactory(nullptr) - , fNoBlockFlag(0) - , fSndMoreFlag(0) , fMultipart(false) , fModified(true) , fReset(false) @@ -92,8 +88,6 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared , fChannelCmdSocket(nullptr) , fTransportType(factory->GetType()) , fTransportFactory(factory) - , fNoBlockFlag(0) - , fSndMoreFlag(0) , fMultipart(false) , fModified(true) , fReset(false) @@ -117,8 +111,6 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan) , fChannelCmdSocket(nullptr) , fTransportType(FairMQ::Transport::DEFAULT) , fTransportFactory(nullptr) - , fNoBlockFlag(chan.fNoBlockFlag) - , fSndMoreFlag(chan.fSndMoreFlag) , fMultipart(chan.fMultipart) , fModified(chan.fModified) , fReset(false) @@ -142,8 +134,6 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) fChannelCmdSocket = nullptr; fTransportType = FairMQ::Transport::DEFAULT; fTransportFactory = nullptr; - fNoBlockFlag = chan.fNoBlockFlag; - fSndMoreFlag = chan.fSndMoreFlag; return *this; } @@ -677,9 +667,6 @@ bool FairMQChannel::InitCommandInterface() { fChannelCmdSocket->Connect("inproc://commands"); - fNoBlockFlag = fChannelCmdSocket->NOBLOCK; - fSndMoreFlag = fChannelCmdSocket->SNDMORE; - fPoller = fTransportFactory->CreatePoller(*fChannelCmdSocket, *fSocket); return true; @@ -754,13 +741,13 @@ int FairMQChannel::Receive(unique_ptr& msg, int rcvTimeoutInMs) c int FairMQChannel::SendAsync(unique_ptr& msg) const { CheckCompatibility(msg); - return fSocket->Send(msg, fNoBlockFlag); + return fSocket->SendAsync(msg); } int FairMQChannel::ReceiveAsync(unique_ptr& msg) const { CheckCompatibility(msg); - return fSocket->Receive(msg, fNoBlockFlag); + return fSocket->ReceiveAsync(msg); } int64_t FairMQChannel::Send(vector>& msgVec) const @@ -820,7 +807,7 @@ int64_t FairMQChannel::Receive(vector>& msgVec, int rc int64_t FairMQChannel::SendAsync(vector>& msgVec) const { CheckCompatibility(msgVec); - return fSocket->Send(msgVec, fNoBlockFlag); + return fSocket->SendAsync(msgVec); } /// Receives a vector of messages in non-blocking mode. @@ -831,7 +818,7 @@ int64_t FairMQChannel::SendAsync(vector>& msgVec) cons int64_t FairMQChannel::ReceiveAsync(vector>& msgVec) const { CheckCompatibility(msgVec); - return fSocket->Receive(msgVec, fNoBlockFlag); + return fSocket->ReceiveAsync(msgVec); } inline bool FairMQChannel::HandleUnblock() const diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 192bd267..5631d016 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -318,9 +318,6 @@ class FairMQChannel FairMQ::Transport fTransportType; std::shared_ptr fTransportFactory; - int fNoBlockFlag; - int fSndMoreFlag; - bool CheckCompatibility(std::unique_ptr& msg) const; bool CheckCompatibility(std::vector>& msgVec) const; diff --git a/fairmq/FairMQSocket.cxx b/fairmq/FairMQSocket.cxx index 0346e472..0855f465 100644 --- a/fairmq/FairMQSocket.cxx +++ b/fairmq/FairMQSocket.cxx @@ -1,80 +1,9 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQSocket.cxx - * - * @since 2012-12-05 - * @author D. Klein, A. Rybalchenko - */ -#include "FairMQSocket.h" -#include - -bool FairMQSocket::Attach(const std::string& config, bool serverish) -{ - if (config.empty()) - { - return false; - } - if (config.size() < 2) - { - return false; - } - - const char* endpoints = config.c_str(); - - // We hold each individual endpoint here - char endpoint [256]; - while (*endpoints) - { - const char *delimiter = strchr(endpoints, ','); - if (!delimiter) - { - delimiter = endpoints + strlen(endpoints); - } - if (delimiter - endpoints > 255) - { - return false; - } - memcpy(endpoint, endpoints, delimiter - endpoints); - endpoint[delimiter - endpoints] = 0; - - bool rc = false; - - if (endpoint [0] == '@') - { - rc = Bind(endpoint + 1); - } - else if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' ) - { - Connect(endpoint + 1); - } - else if (serverish) - { - rc = Bind(endpoint); - } - else - { - Connect(endpoint); - } - - if (!rc) - { - return false; - } - - if (*delimiter == 0) - { - break; - } - - endpoints = delimiter + 1; - } - - return true; -} +#include diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index e79c92f4..d565b9ae 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -18,30 +18,26 @@ class FairMQSocket { public: - const int SNDMORE; - const int RCVMORE; - const int NOBLOCK; - - FairMQSocket(int sndMore, int rcvMore, int noBlock) - : SNDMORE(sndMore) - , RCVMORE(rcvMore) - , NOBLOCK(noBlock) - {} + FairMQSocket() {} virtual std::string GetId() = 0; virtual bool Bind(const std::string& address) = 0; virtual void Connect(const std::string& address) = 0; - virtual bool Attach(const std::string& address, bool serverish = false); - virtual int Send(FairMQMessagePtr& msg, const int flags = 0) = 0; - virtual int Receive(FairMQMessagePtr& msg, const int flags = 0) = 0; + virtual int Send(FairMQMessagePtr& msg) = 0; + virtual int SendAsync(FairMQMessagePtr& msg) = 0; + virtual int Receive(FairMQMessagePtr& msg) = 0; + virtual int ReceiveAsync(FairMQMessagePtr& msg) = 0; - virtual int64_t Send(std::vector>& msgVec, const int flags = 0) = 0; - virtual int64_t Receive(std::vector>& msgVec, const int flags = 0) = 0; + virtual int64_t Send(std::vector>& msgVec) = 0; + virtual int64_t SendAsync(std::vector>& msgVec) = 0; + virtual int64_t Receive(std::vector>& msgVec) = 0; + virtual int64_t ReceiveAsync(std::vector>& msgVec) = 0; virtual void* GetSocket() const = 0; virtual int GetSocket(int nothing) const = 0; + virtual void Close() = 0; virtual void Interrupt() = 0; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 27d1f0af..829e3c53 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -33,16 +33,13 @@ using namespace std; atomic FairMQSocketNN::fInterrupted(false); FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const string& id /*= ""*/) - : FairMQSocket(0, 0, NN_DONTWAIT) - , fSocket(-1) - , fId() + : fSocket(-1) + , fId(id + "." + name + "." + type) , fBytesTx(0) , fBytesRx(0) , fMessagesTx(0) , fMessagesRx(0) { - fId = id + "." + name + "." + type; - if (type == "router" || type == "dealer") { // Additional info about using the sockets ROUTER and DEALER with nanomsg can be found in: @@ -121,6 +118,16 @@ void FairMQSocketNN::Connect(const string& address) } } +int FairMQSocketNN::Send(FairMQMessagePtr& msg) { return Send(msg, 0); } +int FairMQSocketNN::SendAsync(FairMQMessagePtr& msg) { return Send(msg, NN_DONTWAIT); } +int FairMQSocketNN::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); } +int FairMQSocketNN::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, NN_DONTWAIT); } + +int64_t FairMQSocketNN::Send(std::vector>& msgVec) { return Send(msgVec, 0); } +int64_t FairMQSocketNN::SendAsync(std::vector>& msgVec) { return Send(msgVec, NN_DONTWAIT); } +int64_t FairMQSocketNN::Receive(std::vector>& msgVec) { return Receive(msgVec, 0); } +int64_t FairMQSocketNN::ReceiveAsync(std::vector>& msgVec) { return Receive(msgVec, NN_DONTWAIT); } + int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags) { int nbytes = -1; diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 522414be..c256209e 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -1,16 +1,10 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQSocketNN.h - * - * @since 2013-12-05 - * @author A. Rybalchenko - */ #ifndef FAIRMQSOCKETNN_H_ #define FAIRMQSOCKETNN_H_ @@ -33,11 +27,15 @@ class FairMQSocketNN : public FairMQSocket virtual bool Bind(const std::string& address); virtual void Connect(const std::string& address); - virtual int Send(FairMQMessagePtr& msg, const int flags = 0); - virtual int Receive(FairMQMessagePtr& msg, const int flags = 0); + virtual int Send(FairMQMessagePtr& msg); + virtual int SendAsync(FairMQMessagePtr& msg); + virtual int Receive(FairMQMessagePtr& msg); + virtual int ReceiveAsync(FairMQMessagePtr& msg); - virtual int64_t Send(std::vector>& msgVec, const int flags = 0); - virtual int64_t Receive(std::vector>& msgVec, const int flags = 0); + virtual int64_t Send(std::vector>& msgVec); + virtual int64_t SendAsync(std::vector>& msgVec); + virtual int64_t Receive(std::vector>& msgVec); + virtual int64_t ReceiveAsync(std::vector>& msgVec); virtual void* GetSocket() const; virtual int GetSocket(int nothing) const; @@ -71,6 +69,11 @@ class FairMQSocketNN : public FairMQSocket std::atomic fMessagesTx; std::atomic fMessagesRx; static std::atomic fInterrupted; + + int Send(FairMQMessagePtr& msg, const int flags = 0); + int Receive(FairMQMessagePtr& msg, const int flags = 0); + int64_t Send(std::vector>& msgVec, const int flags = 0); + int64_t Receive(std::vector>& msgVec, const int flags = 0); }; #endif /* FAIRMQSOCKETNN_H_ */ diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index 0bfe7cb9..ade0d7f5 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -20,17 +20,14 @@ using namespace fair::mq::shmem; atomic FairMQSocketSHM::fInterrupted(false); FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const string& name, const string& id /*= ""*/, void* context) - : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) - , fSocket(nullptr) + : fSocket(nullptr) , fManager(manager) - , fId() + , fId(id + "." + name + "." + type) , fBytesTx(0) , fBytesRx(0) , fMessagesTx(0) , fMessagesRx(0) { - fId = id + "." + name + "." + type; - assert(context); fSocket = zmq_socket(context, GetConstant(type)); @@ -76,11 +73,6 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str // LOG(info) << "created socket " << fId; } -string FairMQSocketSHM::GetId() -{ - return fId; -} - bool FairMQSocketSHM::Bind(const string& address) { // LOG(info) << "bind socket " << fId << " on " << address; @@ -109,6 +101,16 @@ void FairMQSocketSHM::Connect(const string& address) } } +int FairMQSocketSHM::Send(FairMQMessagePtr& msg) { return Send(msg, 0); } +int FairMQSocketSHM::SendAsync(FairMQMessagePtr& msg) { return Send(msg, ZMQ_DONTWAIT); } +int FairMQSocketSHM::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); } +int FairMQSocketSHM::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, ZMQ_DONTWAIT); } + +int64_t FairMQSocketSHM::Send(std::vector>& msgVec) { return Send(msgVec, 0); } +int64_t FairMQSocketSHM::SendAsync(std::vector>& msgVec) { return Send(msgVec, ZMQ_DONTWAIT); } +int64_t FairMQSocketSHM::Receive(std::vector>& msgVec) { return Receive(msgVec, 0); } +int64_t FairMQSocketSHM::ReceiveAsync(std::vector>& msgVec) { return Receive(msgVec, ZMQ_DONTWAIT); } + int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int flags) { int nbytes = -1; diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/FairMQSocketSHM.h index 876ca171..e5541c3f 100644 --- a/fairmq/shmem/FairMQSocketSHM.h +++ b/fairmq/shmem/FairMQSocketSHM.h @@ -23,16 +23,20 @@ class FairMQSocketSHM : public FairMQSocket FairMQSocketSHM(const FairMQSocketSHM&) = delete; FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete; - std::string GetId() override; + std::string GetId() override { return fId; } bool Bind(const std::string& address) override; void Connect(const std::string& address) override; - int Send(FairMQMessagePtr& msg, const int flags = 0) override; - int Receive(FairMQMessagePtr& msg, const int flags = 0) override; + int Send(FairMQMessagePtr& msg) override; + int SendAsync(FairMQMessagePtr& msg) override; + int Receive(FairMQMessagePtr& msg) override; + int ReceiveAsync(FairMQMessagePtr& msg) override; - int64_t Send(std::vector>& msgVec, const int flags = 0) override; - int64_t Receive(std::vector>& msgVec, const int flags = 0) override; + int64_t Send(std::vector>& msgVec) override; + int64_t SendAsync(std::vector>& msgVec) override; + int64_t Receive(std::vector>& msgVec) override; + int64_t ReceiveAsync(std::vector>& msgVec) override; void* GetSocket() const override; int GetSocket(int nothing) const override; @@ -68,6 +72,11 @@ class FairMQSocketSHM : public FairMQSocket std::atomic fMessagesRx; static std::atomic fInterrupted; + + int Send(FairMQMessagePtr& msg, const int flags); + int Receive(FairMQMessagePtr& msg, const int flags); + int64_t Send(std::vector>& msgVec, const int flags); + int64_t Receive(std::vector>& msgVec, const int flags); }; #endif /* FAIRMQSOCKETSHM_H_ */ diff --git a/fairmq/test/helper/devices/TestPairLeft.cxx b/fairmq/test/helper/devices/TestPairLeft.cxx index 0d748584..d82b85e2 100644 --- a/fairmq/test/helper/devices/TestPairLeft.cxx +++ b/fairmq/test/helper/devices/TestPairLeft.cxx @@ -28,7 +28,7 @@ class PairLeft : public FairMQDevice std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - auto Run() -> void + auto Run() -> void override { auto msg = FairMQMessagePtr{NewMessage()}; Send(msg, "data"); diff --git a/fairmq/test/helper/devices/TestPairRight.cxx b/fairmq/test/helper/devices/TestPairRight.cxx index 8e2e55fa..d8328723 100644 --- a/fairmq/test/helper/devices/TestPairRight.cxx +++ b/fairmq/test/helper/devices/TestPairRight.cxx @@ -28,7 +28,7 @@ class PairRight : public FairMQDevice std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - auto Run() -> void + auto Run() -> void override { auto msg = FairMQMessagePtr{NewMessage()}; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 1bcb9e71..e5560152 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -1,16 +1,10 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQSocketZMQ.cxx - * - * @since 2012-12-05 - * @author D. Klein, A. Rybalchenko - */ #include "FairMQSocketZMQ.h" #include "FairMQMessageZMQ.h" @@ -23,16 +17,13 @@ using namespace std; atomic FairMQSocketZMQ::fInterrupted(false); FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context) - : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) - , fSocket(nullptr) - , fId() + : fSocket(nullptr) + , fId(id + "." + name + "." + type) , fBytesTx(0) , fBytesRx(0) , fMessagesTx(0) , fMessagesRx(0) { - fId = id + "." + name + "." + type; - assert(context); fSocket = zmq_socket(context, GetConstant(type)); @@ -111,6 +102,16 @@ void FairMQSocketZMQ::Connect(const string& address) } } +int FairMQSocketZMQ::Send(FairMQMessagePtr& msg) { return Send(msg, 0); } +int FairMQSocketZMQ::SendAsync(FairMQMessagePtr& msg) { return Send(msg, ZMQ_DONTWAIT); } +int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg) { return Receive(msg, 0); } +int FairMQSocketZMQ::ReceiveAsync(FairMQMessagePtr& msg) { return Receive(msg, ZMQ_DONTWAIT); } + +int64_t FairMQSocketZMQ::Send(std::vector>& msgVec) { return Send(msgVec, 0); } +int64_t FairMQSocketZMQ::SendAsync(std::vector>& msgVec) { return Send(msgVec, ZMQ_DONTWAIT); } +int64_t FairMQSocketZMQ::Receive(std::vector>& msgVec) { return Receive(msgVec, 0); } +int64_t FairMQSocketZMQ::ReceiveAsync(std::vector>& msgVec) { return Receive(msgVec, ZMQ_DONTWAIT); } + int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int flags) { int nbytes = -1; @@ -202,7 +203,6 @@ int64_t FairMQSocketZMQ::Send(vector& msgVec, const int flags) while (true) { totalSize = 0; - nbytes = -1; repeat = false; for (unsigned int i = 0; i < vecSize; ++i) diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 8db59d3b..fa96ab8b 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -1,16 +1,10 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQSocketZMQ.h - * - * @since 2012-12-05 - * @author D. Klein, A. Rybalchenko - */ #ifndef FAIRMQSOCKETZMQ_H_ #define FAIRMQSOCKETZMQ_H_ @@ -29,36 +23,41 @@ class FairMQSocketZMQ : public FairMQSocket FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; - virtual std::string GetId(); + std::string GetId() override; - virtual bool Bind(const std::string& address); - virtual void Connect(const std::string& address); + bool Bind(const std::string& address) override; + void Connect(const std::string& address) override; - virtual int Send(FairMQMessagePtr& msg, const int flags = 0); - virtual int Receive(FairMQMessagePtr& msg, const int flags = 0); + int Send(FairMQMessagePtr& msg) override; + int SendAsync(FairMQMessagePtr& msg) override; + int Receive(FairMQMessagePtr& msg) override; + int ReceiveAsync(FairMQMessagePtr& msg) override; - virtual int64_t Send(std::vector>& msgVec, const int flags = 0); - virtual int64_t Receive(std::vector>& msgVec, const int flags = 0); + int64_t Send(std::vector>& msgVec) override; + int64_t SendAsync(std::vector>& msgVec) override; + int64_t Receive(std::vector>& msgVec) override; + int64_t ReceiveAsync(std::vector>& msgVec) override; - virtual void* GetSocket() const; - virtual int GetSocket(int nothing) const; - virtual void Close(); + void* GetSocket() const override; + int GetSocket(int nothing) const override; - virtual void Interrupt(); - virtual void Resume(); + void Close() override; - virtual void SetOption(const std::string& option, const void* value, size_t valueSize); - virtual void GetOption(const std::string& option, void* value, size_t* valueSize); + void Interrupt() override; + void Resume() override; - virtual unsigned long GetBytesTx() const; - virtual unsigned long GetBytesRx() const; - virtual unsigned long GetMessagesTx() const; - virtual unsigned long GetMessagesRx() const; + void SetOption(const std::string& option, const void* value, size_t valueSize) override; + void GetOption(const std::string& option, void* value, size_t* valueSize) override; - virtual bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method); - virtual int GetSendTimeout() const; - virtual bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method); - virtual int GetReceiveTimeout() const; + unsigned long GetBytesTx() const override; + unsigned long GetBytesRx() const override; + unsigned long GetMessagesTx() const override; + unsigned long GetMessagesRx() const override; + + bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method) override; + int GetSendTimeout() const override; + bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method) override; + int GetReceiveTimeout() const override; static int GetConstant(const std::string& constant); @@ -73,6 +72,11 @@ class FairMQSocketZMQ : public FairMQSocket std::atomic fMessagesRx; static std::atomic fInterrupted; + + int Send(FairMQMessagePtr& msg, const int flags = 0); + int Receive(FairMQMessagePtr& msg, const int flags = 0); + int64_t Send(std::vector>& msgVec, const int flags = 0); + int64_t Receive(std::vector>& msgVec, const int flags = 0); }; #endif /* FAIRMQSOCKETZMQ_H_ */