diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index bb16d458..d62c5785 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -85,7 +85,6 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, const strin , fMultipart(false) , fModified(true) , fReset(false) - , fMtx() {} FairMQChannel::FairMQChannel(const string& name, int index, const fair::mq::Properties& properties) @@ -141,33 +140,26 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) return *this; } - { - // TODO: replace this with std::scoped_lock (c++17) - lock(fMtx, chan.fMtx); - lock_guard lock1(fMtx, adopt_lock); - lock_guard lock2(chan.fMtx, adopt_lock); - - fTransportFactory = nullptr; - fTransportType = chan.fTransportType; - fSocket = nullptr; - fName = chan.fName; - fType = chan.fType; - fMethod = chan.fMethod; - fAddress = chan.fAddress; - fSndBufSize = chan.fSndBufSize; - fRcvBufSize = chan.fRcvBufSize; - fSndKernelSize = chan.fSndKernelSize; - fRcvKernelSize = chan.fRcvKernelSize; - fLinger = chan.fLinger; - fRateLogging = chan.fRateLogging; - fPortRangeMin = chan.fPortRangeMin; - fPortRangeMax = chan.fPortRangeMax; - fAutoBind = chan.fAutoBind; - fIsValid = false; - fMultipart = chan.fMultipart; - fModified = chan.fModified; - fReset = false; - } + fTransportFactory = nullptr; + fTransportType = chan.fTransportType; + fSocket = nullptr; + fName = chan.fName; + fType = chan.fType; + fMethod = chan.fMethod; + fAddress = chan.fAddress; + fSndBufSize = chan.fSndBufSize; + fRcvBufSize = chan.fRcvBufSize; + fSndKernelSize = chan.fSndKernelSize; + fRcvKernelSize = chan.fRcvKernelSize; + fLinger = chan.fLinger; + fRateLogging = chan.fRateLogging; + fPortRangeMin = chan.fPortRangeMin; + fPortRangeMax = chan.fPortRangeMax; + fAutoBind = chan.fAutoBind; + fIsValid = false; + fMultipart = chan.fMultipart; + fModified = chan.fModified; + fReset = false; return *this; } @@ -180,13 +172,11 @@ FairMQSocket & FairMQChannel::GetSocket() const string FairMQChannel::GetName() const { - lock_guard lock(fMtx); return fName; } string FairMQChannel::GetPrefix() const { - lock_guard lock(fMtx); string prefix = fName; prefix = prefix.erase(fName.rfind('[')); return prefix; @@ -194,7 +184,6 @@ string FairMQChannel::GetPrefix() const string FairMQChannel::GetIndex() const { - lock_guard lock(fMtx); string indexStr = fName; indexStr.erase(indexStr.rfind(']')); indexStr.erase(0, indexStr.rfind('[') + 1); @@ -202,307 +191,185 @@ string FairMQChannel::GetIndex() const } string FairMQChannel::GetType() const -try { - lock_guard lock(fMtx); +{ return fType; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } string FairMQChannel::GetMethod() const -try { - lock_guard lock(fMtx); +{ return fMethod; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } string FairMQChannel::GetAddress() const -try { - lock_guard lock(fMtx); +{ return fAddress; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } string FairMQChannel::GetTransportName() const -try { - lock_guard lock(fMtx); +{ return TransportNames.at(fTransportType); -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } Transport FairMQChannel::GetTransportType() const -try { - lock_guard lock(fMtx); +{ return fTransportType; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetTransportType: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } - int FairMQChannel::GetSndBufSize() const -try { - lock_guard lock(fMtx); +{ return fSndBufSize; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetRcvBufSize() const -try { - lock_guard lock(fMtx); +{ return fRcvBufSize; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetSndKernelSize() const -try { - lock_guard lock(fMtx); +{ return fSndKernelSize; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetRcvKernelSize() const -try { - lock_guard lock(fMtx); +{ return fRcvKernelSize; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetLinger() const -try { - lock_guard lock(fMtx); +{ return fLinger; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetRateLogging() const -try { - lock_guard lock(fMtx); +{ return fRateLogging; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetPortRangeMin() const -try { - lock_guard lock(fMtx); +{ return fPortRangeMin; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMin: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetPortRangeMax() const -try { - lock_guard lock(fMtx); +{ return fPortRangeMax; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMax: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } bool FairMQChannel::GetAutoBind() const -try { - lock_guard lock(fMtx); +{ return fAutoBind; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::GetAutoBind: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateType(const string& type) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fType = type; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateType: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateMethod(const string& method) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fMethod = method; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateMethod: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateAddress(const string& address) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fAddress = address; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateAddress: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateTransport(const string& transport) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fTransportType = TransportTypes.at(transport); fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateTransport: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateSndBufSize(const int sndBufSize) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fSndBufSize = sndBufSize; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateSndBufSize: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fRcvBufSize = rcvBufSize; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateRcvBufSize: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fSndKernelSize = sndKernelSize; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fRcvKernelSize = rcvKernelSize; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateLinger(const int duration) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fLinger = duration; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateLinger: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateRateLogging(const int rateLogging) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fRateLogging = rateLogging; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateRateLogging: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdatePortRangeMin(const int minPort) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fPortRangeMin = minPort; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMin: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdatePortRangeMax(const int maxPort) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fPortRangeMax = maxPort; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMax: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateAutoBind(const bool autobind) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fAutoBind = autobind; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateAutoBind: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } auto FairMQChannel::SetModified(const bool modified) -> void -try { - lock_guard lock(fMtx); +{ fModified = modified; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateName(const string& name) -try { - lock_guard lock(fMtx); +{ fIsValid = false; fName = name; fModified = true; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::UpdateName: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } bool FairMQChannel::IsValid() const -try { - lock_guard lock(fMtx); +{ return fIsValid; -} catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what(); - throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what())); } bool FairMQChannel::Validate() try { - lock_guard lock(fMtx); stringstream ss; ss << "Validating channel '" << fName << "'... "; @@ -652,8 +519,6 @@ try { void FairMQChannel::Init() { - lock_guard lock(fMtx); - fSocket = fTransportFactory->CreateSocket(fType, fName); // set linger duration (how long socket should wait for outstanding transfers before shutdown) @@ -674,14 +539,11 @@ void FairMQChannel::Init() bool FairMQChannel::ConnectEndpoint(const string& endpoint) { - lock_guard lock(fMtx); return fSocket->Connect(endpoint); } bool FairMQChannel::BindEndpoint(string& endpoint) { - lock_guard lock(fMtx); - // try to bind to the configured port. If it fails, try random one (if AutoBind is on). if (fSocket->Bind(endpoint)) { return true; @@ -725,7 +587,6 @@ bool FairMQChannel::BindEndpoint(string& endpoint) void FairMQChannel::ResetChannel() { - lock_guard lock(fMtx); fIsValid = false; // TODO: implement channel resetting } diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index bab10730..443b6dc9 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -25,6 +25,12 @@ #include // std::move #include // int64_t +/** + * @class FairMQChannel FairMQChannel.h + * @brief Wrapper class for FairMQSocket and related methods + * + * The class is not thread-safe. + */ class FairMQChannel { friend class FairMQDevice; @@ -100,7 +106,7 @@ class FairMQChannel /// Get channel name /// @return Returns full channel name (e.g. "data[0]") - std::string GetName() const ; + std::string GetName() const; /// Get channel prefix /// @return Returns channel prefix (e.g. "data" in "data[0]") @@ -302,10 +308,7 @@ class FairMQChannel unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); } unsigned long GetMessagesRx() const { return fSocket->GetMessagesRx(); } - auto Transport() -> FairMQTransportFactory* - { - return fTransportFactory.get(); - }; + auto Transport() -> FairMQTransportFactory* { return fTransportFactory.get(); }; template FairMQMessagePtr NewMessage(Args&&... args) @@ -372,8 +375,6 @@ class FairMQChannel bool fModified; bool fReset; - mutable std::mutex fMtx; - void CheckSendCompatibility(FairMQMessagePtr& msg) { if (fTransportType != msg->GetType()) {