/******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ #ifndef FAIRMQSOCKETSHM_H_ #define FAIRMQSOCKETSHM_H_ #include "FairMQSocket.h" #include "FairMQMessage.h" #include #include #include // unique_ptr class FairMQSocketSHM : public FairMQSocket { public: FairMQSocketSHM(fair::mq::shmem::Manager& manager, const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr); FairMQSocketSHM(const FairMQSocketSHM&) = delete; FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete; std::string GetId() override { return fId; } bool Bind(const std::string& address) override; void Connect(const std::string& address) override; int Send(FairMQMessagePtr& msg, const int timeout = 0) override; int Receive(FairMQMessagePtr& msg, const int timeout = 0) override; int64_t Send(std::vector>& msgVec, const int timeout = 0) override; int64_t Receive(std::vector>& msgVec, const int timeout = 0) override; int TrySend(FairMQMessagePtr& msg) override; int TryReceive(FairMQMessagePtr& msg) override; int64_t TrySend(std::vector>& msgVec) override; int64_t TryReceive(std::vector>& msgVec) override; void* GetSocket() const override; int GetSocket(int nothing) const override; void Close() override; static void Interrupt(); static void Resume(); void SetOption(const std::string& option, const void* value, size_t valueSize) override; void GetOption(const std::string& option, void* value, size_t* valueSize) override; unsigned long GetBytesTx() const override; unsigned long GetBytesRx() const override; unsigned long GetMessagesTx() const override; unsigned long GetMessagesRx() const override; bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method) override; int GetSendTimeout() const override; bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method) override; int GetReceiveTimeout() const override; static int GetConstant(const std::string& constant); ~FairMQSocketSHM() override; private: void* fSocket; fair::mq::shmem::Manager& fManager; std::string fId; std::atomic fBytesTx; std::atomic fBytesRx; std::atomic fMessagesTx; std::atomic fMessagesRx; static std::atomic fInterrupted; int fSndTimeout; int fRcvTimeout; int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout); int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout); int64_t SendImpl(std::vector>& msgVec, const int flags, const int timeout); int64_t ReceiveImpl(std::vector>& msgVec, const int flags, const int timeout); }; #endif /* FAIRMQSOCKETSHM_H_ */