From fbb003b50f5da860b07fa46bbbf21603a3dd4319 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Mon, 20 May 2019 14:26:29 +0200 Subject: [PATCH] FairMQChannel: defaults values, copy-ability --- fairmq/FairMQChannel.cxx | 127 +++++++++++++++------------ fairmq/FairMQChannel.h | 58 ++++++++---- fairmq/options/FairMQProgOptions.cxx | 5 +- 3 files changed, 116 insertions(+), 74 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index fd3fa301..4a293906 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -16,47 +16,55 @@ using namespace std; -mutex FairMQChannel::fChannelMutex; - FairMQChannel::FairMQChannel() - : FairMQChannel("", "unspecified", "unspecified", "unspecified", nullptr) + : FairMQChannel(DefaultName, DefaultType, DefaultMethod, DefaultAddress, nullptr) +{} + +FairMQChannel::FairMQChannel(const string& name) + : FairMQChannel(name, DefaultType, DefaultMethod, DefaultAddress, nullptr) {} FairMQChannel::FairMQChannel(const string& type, const string& method, const string& address) - : FairMQChannel("", type, method, address, nullptr) + : FairMQChannel(DefaultName, type, method, address, nullptr) {} FairMQChannel::FairMQChannel(const string& name, const string& type, shared_ptr factory) - : FairMQChannel(name, type, "unspecified", "unspecified", factory) + : FairMQChannel(name, type, DefaultMethod, DefaultAddress, factory) {} FairMQChannel::FairMQChannel(const string& name, const string& type, const string& method, const string& address, shared_ptr factory) : fTransportFactory(factory) - , fTransportType(factory ? factory->GetType() : fair::mq::Transport::DEFAULT) + , fTransportType(factory ? factory->GetType() : DefaultTransportType) , fSocket(factory ? factory->CreateSocket(type, name) : nullptr) + , fName(name) , fType(type) , fMethod(method) , fAddress(address) - , fSndBufSize(1000) - , fRcvBufSize(1000) - , fSndKernelSize(0) - , fRcvKernelSize(0) - , fLinger(500) - , fRateLogging(1) - , fPortRangeMin(22000) - , fPortRangeMax(23000) - , fAutoBind(true) - , fName(name) + , fSndBufSize(DefaultSndBufSize) + , fRcvBufSize(DefaultRcvBufSize) + , fSndKernelSize(DefaultSndKernelSize) + , fRcvKernelSize(DefaultRcvKernelSize) + , fLinger(DefaultLinger) + , fRateLogging(DefaultRateLogging) + , fPortRangeMin(DefaultPortRangeMin) + , fPortRangeMax(DefaultPortRangeMax) + , fAutoBind(DefaultAutoBind) , fIsValid(false) , fMultipart(false) , fModified(true) , fReset(false) + , fMtx() {} FairMQChannel::FairMQChannel(const FairMQChannel& chan) + : FairMQChannel(chan, chan.fName) +{} + +FairMQChannel::FairMQChannel(const FairMQChannel& chan, const string& newName) : fTransportFactory(nullptr) , fTransportType(chan.fTransportType) , fSocket(nullptr) + , fName(newName) , fType(chan.fType) , fMethod(chan.fMethod) , fAddress(chan.fAddress) @@ -69,7 +77,6 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan) , fPortRangeMin(chan.fPortRangeMin) , fPortRangeMax(chan.fPortRangeMax) , fAutoBind(chan.fAutoBind) - , fName(chan.fName) , fIsValid(false) , fMultipart(chan.fMultipart) , fModified(chan.fModified) @@ -81,6 +88,7 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) fTransportFactory = nullptr; fTransportType = chan.fTransportType; fSocket = nullptr; + fName = chan.fName; fType = chan.fType; fMethod = chan.fMethod; fAddress = chan.fAddress; @@ -93,7 +101,6 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) fPortRangeMin = chan.fPortRangeMin; fPortRangeMax = chan.fPortRangeMax; fAutoBind = chan.fAutoBind; - fName = chan.fName; fIsValid = false; fMultipart = chan.fMultipart; fModified = chan.fModified; @@ -110,13 +117,13 @@ FairMQSocket & FairMQChannel::GetSocket() const string FairMQChannel::GetName() const { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fName; } string FairMQChannel::GetPrefix() const { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); string prefix = fName; prefix = prefix.erase(fName.rfind('[')); return prefix; @@ -124,7 +131,7 @@ string FairMQChannel::GetPrefix() const string FairMQChannel::GetIndex() const { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); string indexStr = fName; indexStr.erase(indexStr.rfind(']')); indexStr.erase(0, indexStr.rfind('[') + 1); @@ -133,7 +140,7 @@ string FairMQChannel::GetIndex() const string FairMQChannel::GetType() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fType; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what(); @@ -142,7 +149,7 @@ try { string FairMQChannel::GetMethod() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fMethod; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what(); @@ -151,7 +158,7 @@ try { string FairMQChannel::GetAddress() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fAddress; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what(); @@ -160,16 +167,26 @@ try { string FairMQChannel::GetTransportName() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fair::mq::TransportNames.at(fTransportType); } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what(); throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } +fair::mq::Transport FairMQChannel::GetTransportType() const +try { + lock_guard lock(fMtx); + return fTransportType; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetTransportType: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); +} + + int FairMQChannel::GetSndBufSize() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fSndBufSize; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what(); @@ -178,7 +195,7 @@ try { int FairMQChannel::GetRcvBufSize() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fRcvBufSize; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what(); @@ -187,7 +204,7 @@ try { int FairMQChannel::GetSndKernelSize() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fSndKernelSize; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what(); @@ -196,7 +213,7 @@ try { int FairMQChannel::GetRcvKernelSize() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fRcvKernelSize; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what(); @@ -205,7 +222,7 @@ try { int FairMQChannel::GetLinger() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fLinger; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what(); @@ -214,7 +231,7 @@ try { int FairMQChannel::GetRateLogging() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fRateLogging; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what(); @@ -223,7 +240,7 @@ try { int FairMQChannel::GetPortRangeMin() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fPortRangeMin; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMin: " << e.what(); @@ -232,7 +249,7 @@ try { int FairMQChannel::GetPortRangeMax() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fPortRangeMax; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMax: " << e.what(); @@ -241,7 +258,7 @@ try { bool FairMQChannel::GetAutoBind() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fAutoBind; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::GetAutoBind: " << e.what(); @@ -250,7 +267,7 @@ try { void FairMQChannel::UpdateType(const string& type) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fType = type; fModified = true; @@ -261,7 +278,7 @@ try { void FairMQChannel::UpdateMethod(const string& method) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fMethod = method; fModified = true; @@ -272,7 +289,7 @@ try { void FairMQChannel::UpdateAddress(const string& address) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fAddress = address; fModified = true; @@ -283,7 +300,7 @@ try { void FairMQChannel::UpdateTransport(const string& transport) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fTransportType = fair::mq::TransportTypes.at(transport); fModified = true; @@ -294,7 +311,7 @@ try { void FairMQChannel::UpdateSndBufSize(const int sndBufSize) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fSndBufSize = sndBufSize; fModified = true; @@ -305,7 +322,7 @@ try { void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fRcvBufSize = rcvBufSize; fModified = true; @@ -316,7 +333,7 @@ try { void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fSndKernelSize = sndKernelSize; fModified = true; @@ -327,7 +344,7 @@ try { void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fRcvKernelSize = rcvKernelSize; fModified = true; @@ -338,7 +355,7 @@ try { void FairMQChannel::UpdateLinger(const int duration) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fLinger = duration; fModified = true; @@ -349,7 +366,7 @@ try { void FairMQChannel::UpdateRateLogging(const int rateLogging) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fRateLogging = rateLogging; fModified = true; @@ -360,7 +377,7 @@ try { void FairMQChannel::UpdatePortRangeMin(const int minPort) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fPortRangeMin = minPort; fModified = true; @@ -371,7 +388,7 @@ try { void FairMQChannel::UpdatePortRangeMax(const int maxPort) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fPortRangeMax = maxPort; fModified = true; @@ -382,7 +399,7 @@ try { void FairMQChannel::UpdateAutoBind(const bool autobind) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fAutoBind = autobind; fModified = true; @@ -393,7 +410,7 @@ try { auto FairMQChannel::SetModified(const bool modified) -> void try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fModified = modified; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what(); @@ -402,7 +419,7 @@ try { void FairMQChannel::UpdateName(const string& name) try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; fName = name; fModified = true; @@ -413,7 +430,7 @@ try { bool FairMQChannel::IsValid() const try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fIsValid; } catch (exception& e) { LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what(); @@ -422,7 +439,7 @@ try { bool FairMQChannel::Validate() try { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); stringstream ss; ss << "Validating channel '" << fName << "'... "; @@ -570,7 +587,7 @@ try { void FairMQChannel::Init() { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fSocket = fTransportFactory->CreateSocket(fType, fName); @@ -592,14 +609,14 @@ void FairMQChannel::Init() bool FairMQChannel::ConnectEndpoint(const string& endpoint) { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); return fSocket->Connect(endpoint); } bool FairMQChannel::BindEndpoint(string& endpoint) { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); // try to bind to the configured port. If it fails, try random one (if AutoBind is on). if (fSocket->Bind(endpoint)) { @@ -637,7 +654,7 @@ bool FairMQChannel::BindEndpoint(string& endpoint) void FairMQChannel::ResetChannel() { - lock_guard lock(fChannelMutex); + lock_guard lock(fMtx); fIsValid = false; // TODO: implement channel resetting } diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 241846cb..84debb3e 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -32,6 +32,10 @@ class FairMQChannel /// Default constructor FairMQChannel(); + /// Constructor + /// @param name Channel name + FairMQChannel(const std::string& name); + /// Constructor /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) /// @param method Socket method (bind/connect) @@ -55,10 +59,19 @@ class FairMQChannel /// Copy Constructor FairMQChannel(const FairMQChannel&); + /// Copy Constructor (with new name) + FairMQChannel(const FairMQChannel&, const std::string& name); + + /// Move constructor + FairMQChannel(FairMQChannel&&) = default; + /// Assignment operator FairMQChannel& operator=(const FairMQChannel&); - /// Default destructor + /// Move assignment operator + FairMQChannel& operator=(FairMQChannel&&) = default; + + /// Destructor virtual ~FairMQChannel() {} struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; }; @@ -106,10 +119,14 @@ class FairMQChannel /// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") std::string GetAddress() const; - /// Get channel transport ("default", "zeromq", "nanomsg" or "shmem") - /// @return Returns channel transport (e.g. "default", "zeromq", "nanomsg" or "shmem") + /// Get channel transport name ("default", "zeromq", "nanomsg" or "shmem") + /// @return Returns channel transport name (e.g. "default", "zeromq", "nanomsg" or "shmem") std::string GetTransportName() const; + /// Get channel transport type + /// @return Returns channel transport type + fair::mq::Transport GetTransportType() const; + /// Get socket send buffer size (in number of messages) /// @return Returns socket send buffer size (in number of messages) int GetSndBufSize() const; @@ -320,6 +337,7 @@ class FairMQChannel fair::mq::Transport fTransportType; std::unique_ptr fSocket; + std::string fName; std::string fType; std::string fMethod; std::string fAddress; @@ -333,23 +351,17 @@ class FairMQChannel int fPortRangeMax; bool fAutoBind; - std::string fName; - std::atomic fIsValid; - - // use static mutex to make the class easily copyable - // implication: same mutex is used for all instances of the class - // this does not hurt much, because mutex is used only during initialization with very low contention - // possible TODO: improve this - static std::mutex fChannelMutex; + bool fIsValid; bool fMultipart; bool fModified; bool fReset; + mutable std::mutex fMtx; + void CheckSendCompatibility(FairMQMessagePtr& msg) { if (fTransportType != msg->GetType()) { - // LOG(debug) << "Channel type does not match message type. Creating wrapper"; FairMQMessagePtr msgWrapper(NewMessage( msg->GetData(), msg->GetSize(), @@ -365,7 +377,7 @@ class FairMQChannel { for (auto& msg : msgVec) { if (fTransportType != msg->GetType()) { - // LOG(debug) << "Channel type does not match message type. Creating wrapper"; + FairMQMessagePtr msgWrapper(NewMessage( msg->GetData(), msg->GetSize(), @@ -381,7 +393,6 @@ class FairMQChannel void CheckReceiveCompatibility(FairMQMessagePtr& msg) { if (fTransportType != msg->GetType()) { - // LOG(debug) << "Channel type does not match message type. Creating wrapper"; FairMQMessagePtr newMsg(NewMessage()); msg = move(newMsg); } @@ -391,7 +402,7 @@ class FairMQChannel { for (auto& msg : msgVec) { if (fTransportType != msg->GetType()) { - // LOG(debug) << "Channel type does not match message type. Creating wrapper"; + FairMQMessagePtr newMsg(NewMessage()); msg = move(newMsg); } @@ -403,7 +414,24 @@ class FairMQChannel fTransportFactory = factory; fTransportType = factory->GetType(); } + auto SetModified(const bool modified) -> void; + + static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT; + static constexpr const char* DefaultTransportName = "default"; + static constexpr const char* DefaultName = ""; + static constexpr const char* DefaultType = "unspecified"; + static constexpr const char* DefaultMethod = "unspecified"; + static constexpr const char* DefaultAddress = "unspecified"; + static constexpr int DefaultSndBufSize = 1000; + static constexpr int DefaultRcvBufSize = 1000; + static constexpr int DefaultSndKernelSize = 0; + static constexpr int DefaultRcvKernelSize = 0; + static constexpr int DefaultLinger = 500; + static constexpr int DefaultRateLogging = 1; + static constexpr int DefaultPortRangeMin = 22000; + static constexpr int DefaultPortRangeMax = 23000; + static constexpr bool DefaultAutoBind = true; }; #endif /* FAIRMQCHANNEL_H_ */ diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index e50675d5..7e16046a 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -263,10 +263,9 @@ void FairMQProgOptions::ParseChannelsFromCmdLine() void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered) { + // clear the container because it was filled with default values and subsequent calls to store() do not overwrite the existing values fVarMap.clear(); - // get options from cmd line and store in variable map - // here we use command_line_parser instead of parse_command_line, to allow unregistered and positional options if (allowUnregistered) { po::command_line_parser parser{argc, argv}; parser.options(fAllOptions).allow_unregistered(); @@ -321,8 +320,6 @@ void FairMQProgOptions::UpdateChannelInfo() } } -// read FairMQChannelMap and insert/update corresponding values in variable map -// create key for variable map as follow : channelName.index.memberName void FairMQProgOptions::UpdateMQValues() { for (const auto& p : fFairMQChannelMap) {