diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index d84f550e..b604bf7b 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -660,9 +660,9 @@ void FairMQChannel::InitTransport(shared_ptr factory) fTransportType = factory->GetType(); } -bool FairMQChannel::InitCommandInterface() +bool FairMQChannel::InitCommandInterface(FairMQSocketPtr channelCmdSocket) { - fChannelCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands"); + fChannelCmdSocket = std::move(channelCmdSocket); if (fChannelCmdSocket) { fChannelCmdSocket->Connect("inproc://commands"); diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 5631d016..a522b0fb 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -322,7 +322,7 @@ class FairMQChannel bool CheckCompatibility(std::vector>& msgVec) const; void InitTransport(std::shared_ptr factory); - bool InitCommandInterface(); + bool InitCommandInterface(FairMQSocketPtr channelCmdSocket); bool HandleUnblock() const; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 38a2f2ae..f006f172 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -48,7 +48,7 @@ FairMQDevice::FairMQDevice() , fDefaultTransport() , fInitializationTimeoutInS(120) , fDataCallbacks(false) - , fDeviceCmdSockets() + , fDeviceCmdSocket(nullptr) , fMsgInputs() , fMultipartInputs() , fMultitransportInputs() @@ -79,7 +79,7 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) , fDefaultTransport() , fInitializationTimeoutInS(120) , fDataCallbacks(false) - , fDeviceCmdSockets() + , fDeviceCmdSocket(nullptr) , fMsgInputs() , fMultipartInputs() , fMultitransportInputs() @@ -102,21 +102,6 @@ void FairMQDevice::InitWrapper() exit(EXIT_FAILURE); } - if (fDeviceCmdSockets.empty()) - { - auto p = fDeviceCmdSockets.emplace(fTransportFactory->GetType(), fTransportFactory->CreateSocket("pub", "device-commands")); - if (p.second) - { - p.first->second->Bind("inproc://commands"); - } - else - { - exit(EXIT_FAILURE); - } - - FairMQMessagePtr msg(fTransportFactory->CreateMessage()); - } - // Containers to store the uninitialized channels. vector uninitializedBindingChannels; vector uninitializedConnectingChannels; @@ -248,7 +233,7 @@ void FairMQDevice::AttachChannels(vector& chans) { if (AttachChannel(**itr)) { - (*itr)->InitCommandInterface(); + (*itr)->InitCommandInterface(Transport()->CreateSocket("sub", "device-commands")); (*itr)->SetModified(false); itr = chans.erase(itr); } @@ -476,10 +461,7 @@ void FairMQDevice::RunWrapper() // notify channels to resume transfers FairMQChannel::fInterrupted = false; - for (auto& kv : fDeviceCmdSockets) - { - kv.second->Resume(); - } + fDeviceCmdSocket->Resume(); try { @@ -780,18 +762,12 @@ shared_ptr FairMQDevice::AddTransport(const string& tran pair> trPair(FairMQ::TransportTypes.at(transport), tr); fTransports.insert(trPair); - auto p = fDeviceCmdSockets.emplace(tr->GetType(), tr->CreateSocket("pub", "device-commands")); - if (p.second) - { - p.first->second->Bind("inproc://commands"); + if (!fDeviceCmdSocket) { + fDeviceCmdSocket = Transport()->CreateSocket("pub", "device-commands"); + if(!fDeviceCmdSocket->Bind("inproc://commands")) { + exit(EXIT_FAILURE); + } } - else - { - exit(EXIT_FAILURE); - } - - FairMQMessagePtr msg(tr->CreateMessage()); - return tr; } else @@ -974,12 +950,9 @@ void FairMQDevice::LogSocketRates() void FairMQDevice::Unblock() { FairMQChannel::fInterrupted = true; - for (auto& kv : fDeviceCmdSockets) - { - kv.second->Interrupt(); - FairMQMessagePtr cmd(fTransports.at(kv.first)->CreateMessage()); - kv.second->Send(cmd); - } + fDeviceCmdSocket->Interrupt(); + FairMQMessagePtr cmd(Transport()->CreateMessage()); + fDeviceCmdSocket->Send(cmd); } void FairMQDevice::ResetTaskWrapper() diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index dcefb31d..856efe75 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -194,7 +194,7 @@ class FairMQDevice : public FairMQStateMachine /// @brief Getter for default transport factory auto Transport() const -> const FairMQTransportFactory* { - return fTransports.cbegin()->second.get(); + return fTransports.at(fair::mq::TransportTypes[GetDefaultTransport()]).get(); } template @@ -517,7 +517,7 @@ class FairMQDevice : public FairMQStateMachine void CreateOwnConfig(); bool fDataCallbacks; - std::unordered_map fDeviceCmdSockets; ///< Sockets used for the internal unblocking mechanism + FairMQSocketPtr fDeviceCmdSocket; ///< Socket used for the internal unblocking mechanism std::unordered_map fMsgInputs; std::unordered_map fMultipartInputs; std::unordered_map> fMultitransportInputs;