From e73fcbd595023d68d3c11e7ee5011aeced244b43 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Mon, 19 Feb 2018 18:44:08 +0100 Subject: [PATCH] FairMQ: Parameterize the command interface initializing with sub socket ! THIS PATCH BREAKS NANOMSG TRANSPORT ! The subscriber command socket was created using the transport factory of the channel which might not implement sub sockets. This patch creates the subscriber command sockets in the device initialization and passes them down (move) to the command interface initialization. This patch puts more focus on the GetSocket interface of FairMQSocket, because all command sockets are now implemented with the default transport - the channels use an internal poller which polls over sockets of potentially different transports now (e.g. zeromq command socket and nanomsg data socket). Basically, all transports need to return file descriptors compatible to be used in a single poll set. THIS IS NOT THE CASE! ! THIS PATCH BREAKS NANOMSG TRANSPORT ! --- fairmq/FairMQChannel.cxx | 4 ++-- fairmq/FairMQChannel.h | 2 +- fairmq/FairMQDevice.cxx | 51 ++++++++++------------------------------ fairmq/FairMQDevice.h | 4 ++-- 4 files changed, 17 insertions(+), 44 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index d84f550e..b604bf7b 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -660,9 +660,9 @@ void FairMQChannel::InitTransport(shared_ptr factory) fTransportType = factory->GetType(); } -bool FairMQChannel::InitCommandInterface() +bool FairMQChannel::InitCommandInterface(FairMQSocketPtr channelCmdSocket) { - fChannelCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands"); + fChannelCmdSocket = std::move(channelCmdSocket); if (fChannelCmdSocket) { fChannelCmdSocket->Connect("inproc://commands"); diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 5631d016..a522b0fb 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -322,7 +322,7 @@ class FairMQChannel bool CheckCompatibility(std::vector>& msgVec) const; void InitTransport(std::shared_ptr factory); - bool InitCommandInterface(); + bool InitCommandInterface(FairMQSocketPtr channelCmdSocket); bool HandleUnblock() const; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 38a2f2ae..f006f172 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -48,7 +48,7 @@ FairMQDevice::FairMQDevice() , fDefaultTransport() , fInitializationTimeoutInS(120) , fDataCallbacks(false) - , fDeviceCmdSockets() + , fDeviceCmdSocket(nullptr) , fMsgInputs() , fMultipartInputs() , fMultitransportInputs() @@ -79,7 +79,7 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) , fDefaultTransport() , fInitializationTimeoutInS(120) , fDataCallbacks(false) - , fDeviceCmdSockets() + , fDeviceCmdSocket(nullptr) , fMsgInputs() , fMultipartInputs() , fMultitransportInputs() @@ -102,21 +102,6 @@ void FairMQDevice::InitWrapper() exit(EXIT_FAILURE); } - if (fDeviceCmdSockets.empty()) - { - auto p = fDeviceCmdSockets.emplace(fTransportFactory->GetType(), fTransportFactory->CreateSocket("pub", "device-commands")); - if (p.second) - { - p.first->second->Bind("inproc://commands"); - } - else - { - exit(EXIT_FAILURE); - } - - FairMQMessagePtr msg(fTransportFactory->CreateMessage()); - } - // Containers to store the uninitialized channels. vector uninitializedBindingChannels; vector uninitializedConnectingChannels; @@ -248,7 +233,7 @@ void FairMQDevice::AttachChannels(vector& chans) { if (AttachChannel(**itr)) { - (*itr)->InitCommandInterface(); + (*itr)->InitCommandInterface(Transport()->CreateSocket("sub", "device-commands")); (*itr)->SetModified(false); itr = chans.erase(itr); } @@ -476,10 +461,7 @@ void FairMQDevice::RunWrapper() // notify channels to resume transfers FairMQChannel::fInterrupted = false; - for (auto& kv : fDeviceCmdSockets) - { - kv.second->Resume(); - } + fDeviceCmdSocket->Resume(); try { @@ -780,18 +762,12 @@ shared_ptr FairMQDevice::AddTransport(const string& tran pair> trPair(FairMQ::TransportTypes.at(transport), tr); fTransports.insert(trPair); - auto p = fDeviceCmdSockets.emplace(tr->GetType(), tr->CreateSocket("pub", "device-commands")); - if (p.second) - { - p.first->second->Bind("inproc://commands"); + if (!fDeviceCmdSocket) { + fDeviceCmdSocket = Transport()->CreateSocket("pub", "device-commands"); + if(!fDeviceCmdSocket->Bind("inproc://commands")) { + exit(EXIT_FAILURE); + } } - else - { - exit(EXIT_FAILURE); - } - - FairMQMessagePtr msg(tr->CreateMessage()); - return tr; } else @@ -974,12 +950,9 @@ void FairMQDevice::LogSocketRates() void FairMQDevice::Unblock() { FairMQChannel::fInterrupted = true; - for (auto& kv : fDeviceCmdSockets) - { - kv.second->Interrupt(); - FairMQMessagePtr cmd(fTransports.at(kv.first)->CreateMessage()); - kv.second->Send(cmd); - } + fDeviceCmdSocket->Interrupt(); + FairMQMessagePtr cmd(Transport()->CreateMessage()); + fDeviceCmdSocket->Send(cmd); } void FairMQDevice::ResetTaskWrapper() diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index dcefb31d..856efe75 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -194,7 +194,7 @@ class FairMQDevice : public FairMQStateMachine /// @brief Getter for default transport factory auto Transport() const -> const FairMQTransportFactory* { - return fTransports.cbegin()->second.get(); + return fTransports.at(fair::mq::TransportTypes[GetDefaultTransport()]).get(); } template @@ -517,7 +517,7 @@ class FairMQDevice : public FairMQStateMachine void CreateOwnConfig(); bool fDataCallbacks; - std::unordered_map fDeviceCmdSockets; ///< Sockets used for the internal unblocking mechanism + FairMQSocketPtr fDeviceCmdSocket; ///< Socket used for the internal unblocking mechanism std::unordered_map fMsgInputs; std::unordered_map fMultipartInputs; std::unordered_map> fMultitransportInputs;