diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index cdb81310..7e8022c5 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -45,6 +45,8 @@ FairMQChannel::FairMQChannel() , fNoBlockFlag(0) , fSndMoreFlag(0) , fMultipart(false) + , fModified(true) + , fReset(false) { } @@ -68,6 +70,8 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fNoBlockFlag(0) , fSndMoreFlag(0) , fMultipart(false) + , fModified(true) + , fReset(false) { } @@ -91,6 +95,8 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan) , fNoBlockFlag(chan.fNoBlockFlag) , fSndMoreFlag(chan.fSndMoreFlag) , fMultipart(chan.fMultipart) + , fModified(chan.fModified) + , fReset(false) {} FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) @@ -267,6 +273,7 @@ void FairMQChannel::UpdateType(const string& type) unique_lock lock(fChannelMutex); fIsValid = false; fType = type; + fModified = true; } catch (exception& e) { @@ -282,6 +289,7 @@ void FairMQChannel::UpdateMethod(const string& method) unique_lock lock(fChannelMutex); fIsValid = false; fMethod = method; + fModified = true; } catch (exception& e) { @@ -297,6 +305,7 @@ void FairMQChannel::UpdateAddress(const string& address) unique_lock lock(fChannelMutex); fIsValid = false; fAddress = address; + fModified = true; } catch (exception& e) { @@ -312,6 +321,7 @@ void FairMQChannel::UpdateTransport(const string& transport) unique_lock lock(fChannelMutex); fIsValid = false; fTransport = transport; + fModified = true; } catch (exception& e) { @@ -327,6 +337,7 @@ void FairMQChannel::UpdateSndBufSize(const int sndBufSize) unique_lock lock(fChannelMutex); fIsValid = false; fSndBufSize = sndBufSize; + fModified = true; } catch (exception& e) { @@ -342,6 +353,7 @@ void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) unique_lock lock(fChannelMutex); fIsValid = false; fRcvBufSize = rcvBufSize; + fModified = true; } catch (exception& e) { @@ -357,6 +369,7 @@ void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize) unique_lock lock(fChannelMutex); fIsValid = false; fSndKernelSize = sndKernelSize; + fModified = true; } catch (exception& e) { @@ -372,6 +385,7 @@ void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) unique_lock lock(fChannelMutex); fIsValid = false; fRcvKernelSize = rcvKernelSize; + fModified = true; } catch (exception& e) { @@ -387,6 +401,7 @@ void FairMQChannel::UpdateRateLogging(const int rateLogging) unique_lock lock(fChannelMutex); fIsValid = false; fRateLogging = rateLogging; + fModified = true; } catch (exception& e) { diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 38578e1d..57723d43 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -244,7 +244,6 @@ class FairMQChannel std::string fName; std::atomic fIsValid; - FairMQPollerPtr fPoller; FairMQSocketPtr fChannelCmdSocket; @@ -270,6 +269,8 @@ class FairMQChannel static std::atomic fInterrupted; bool fMultipart; + bool fModified; + bool fReset; }; #endif /* FAIRMQCHANNEL_H_ */ diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 69e946e9..d507afcd 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -129,6 +129,7 @@ void FairMQDevice::AttachChannels(list& chans) if (AttachChannel(**itr)) { (*itr)->InitCommandInterface(); + (*itr)->fModified = false; chans.erase(itr++); } else @@ -177,40 +178,53 @@ void FairMQDevice::InitWrapper() { for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi) { - // set channel name: name + vector index - stringstream ss; - ss << mi->first << "[" << vi - (mi->second).begin() << "]"; - vi->fName = ss.str(); - - if (vi->fMethod == "bind") + if (vi->fModified) { - // if binding address is not specified, try getting it from the configured network interface - if (vi->fAddress == "unspecified" || vi->fAddress == "") + if (vi->fReset) { - // if the configured network interface is default, get its name from the default route - if (fNetworkInterface == "default") - { - fNetworkInterface = FairMQ::tools::getDefaultRouteNetworkInterface(); - } - vi->fAddress = "tcp://" + FairMQ::tools::getInterfaceIP(fNetworkInterface) + ":1"; + vi->fSocket->Close(); + vi->fSocket = nullptr; + + vi->fPoller = nullptr; + + vi->fChannelCmdSocket->Close(); + vi->fChannelCmdSocket = nullptr; + } + // set channel name: name + vector index + stringstream ss; + ss << mi->first << "[" << vi - (mi->second).begin() << "]"; + vi->fName = ss.str(); + + if (vi->fMethod == "bind") + { + // if binding address is not specified, try getting it from the configured network interface + if (vi->fAddress == "unspecified" || vi->fAddress == "") + { + // if the configured network interface is default, get its name from the default route + if (fNetworkInterface == "default") + { + fNetworkInterface = FairMQ::tools::getDefaultRouteNetworkInterface(); + } + vi->fAddress = "tcp://" + FairMQ::tools::getInterfaceIP(fNetworkInterface) + ":1"; + } + // fill the uninitialized list + uninitializedBindingChannels.push_back(&(*vi)); + } + else if (vi->fMethod == "connect") + { + // fill the uninitialized list + uninitializedConnectingChannels.push_back(&(*vi)); + } + else if (vi->fAddress.find_first_of("@+>") != string::npos) + { + // fill the uninitialized list + uninitializedConnectingChannels.push_back(&(*vi)); + } + else + { + LOG(ERROR) << "Cannot update configuration. Socket method (bind/connect) not specified."; + exit(EXIT_FAILURE); } - // fill the uninitialized list - uninitializedBindingChannels.push_back(&(*vi)); - } - else if (vi->fMethod == "connect") - { - // fill the uninitialized list - uninitializedConnectingChannels.push_back(&(*vi)); - } - else if (vi->fAddress.find_first_of("@+>") != string::npos) - { - // fill the uninitialized list - uninitializedConnectingChannels.push_back(&(*vi)); - } - else - { - LOG(ERROR) << "Cannot update configuration. Socket method (bind/connect) not specified."; - exit(EXIT_FAILURE); } } } @@ -1180,13 +1194,14 @@ void FairMQDevice::Reset() // iterate over the channels vector for (auto& vi : mi.second) { - vi.fSocket->Close(); - vi.fSocket = nullptr; + vi.fReset = true; + // vi.fSocket->Close(); + // vi.fSocket = nullptr; - vi.fPoller = nullptr; + // vi.fPoller = nullptr; - vi.fChannelCmdSocket->Close(); - vi.fChannelCmdSocket = nullptr; + // vi.fChannelCmdSocket->Close(); + // vi.fChannelCmdSocket = nullptr; } } } @@ -1207,18 +1222,24 @@ void FairMQDevice::Exit() LOG(DEBUG) << "Closing sockets..."; // iterate over the channels - for (const auto& c : fChannels) + for (auto& c : fChannels) { // iterate over the sub-channels - for (const auto& sc : c.second) + for (auto& sc : c.second) { if (sc.fSocket) { sc.fSocket->Close(); + sc.fSocket = nullptr; } if (sc.fChannelCmdSocket) { sc.fChannelCmdSocket->Close(); + sc.fChannelCmdSocket = nullptr; + } + if (sc.fPoller) + { + sc.fPoller = nullptr; } } }