mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Used cached default transport in FairMQDevice::Transport()
This commit is contained in:
parent
9906475b6f
commit
155618af57
|
@ -28,7 +28,7 @@ FairMQChannel::FairMQChannel()
|
||||||
, fType("unspecified")
|
, fType("unspecified")
|
||||||
, fMethod("unspecified")
|
, fMethod("unspecified")
|
||||||
, fAddress("unspecified")
|
, fAddress("unspecified")
|
||||||
, fTransport("default")
|
, fTransportName("default")
|
||||||
, fSndBufSize(1000)
|
, fSndBufSize(1000)
|
||||||
, fRcvBufSize(1000)
|
, fRcvBufSize(1000)
|
||||||
, fSndKernelSize(0)
|
, fSndKernelSize(0)
|
||||||
|
@ -49,7 +49,7 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
|
||||||
, fType(type)
|
, fType(type)
|
||||||
, fMethod(method)
|
, fMethod(method)
|
||||||
, fAddress(address)
|
, fAddress(address)
|
||||||
, fTransport("default")
|
, fTransportName("default")
|
||||||
, fSndBufSize(1000)
|
, fSndBufSize(1000)
|
||||||
, fRcvBufSize(1000)
|
, fRcvBufSize(1000)
|
||||||
, fSndKernelSize(0)
|
, fSndKernelSize(0)
|
||||||
|
@ -70,7 +70,7 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared
|
||||||
, fType(type)
|
, fType(type)
|
||||||
, fMethod("unspecified")
|
, fMethod("unspecified")
|
||||||
, fAddress("unspecified")
|
, fAddress("unspecified")
|
||||||
, fTransport("default") // TODO refactor, either use string representation or enum type
|
, fTransportName("default") // TODO refactor, either use string representation or enum type
|
||||||
, fSndBufSize(1000)
|
, fSndBufSize(1000)
|
||||||
, fRcvBufSize(1000)
|
, fRcvBufSize(1000)
|
||||||
, fSndKernelSize(0)
|
, fSndKernelSize(0)
|
||||||
|
@ -91,7 +91,7 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
|
||||||
, fType(chan.fType)
|
, fType(chan.fType)
|
||||||
, fMethod(chan.fMethod)
|
, fMethod(chan.fMethod)
|
||||||
, fAddress(chan.fAddress)
|
, fAddress(chan.fAddress)
|
||||||
, fTransport(chan.fTransport)
|
, fTransportName(chan.fTransportName)
|
||||||
, fSndBufSize(chan.fSndBufSize)
|
, fSndBufSize(chan.fSndBufSize)
|
||||||
, fRcvBufSize(chan.fRcvBufSize)
|
, fRcvBufSize(chan.fRcvBufSize)
|
||||||
, fSndKernelSize(chan.fSndKernelSize)
|
, fSndKernelSize(chan.fSndKernelSize)
|
||||||
|
@ -111,7 +111,7 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
|
||||||
fType = chan.fType;
|
fType = chan.fType;
|
||||||
fMethod = chan.fMethod;
|
fMethod = chan.fMethod;
|
||||||
fAddress = chan.fAddress;
|
fAddress = chan.fAddress;
|
||||||
fTransport = chan.fTransport;
|
fTransportName = chan.fTransportName;
|
||||||
fSndBufSize = chan.fSndBufSize;
|
fSndBufSize = chan.fSndBufSize;
|
||||||
fRcvBufSize = chan.fRcvBufSize;
|
fRcvBufSize = chan.fRcvBufSize;
|
||||||
fSndKernelSize = chan.fSndKernelSize;
|
fSndKernelSize = chan.fSndKernelSize;
|
||||||
|
@ -194,16 +194,16 @@ string FairMQChannel::GetAddress() const
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
string FairMQChannel::GetTransport() const
|
string FairMQChannel::GetTransportName() const
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
unique_lock<mutex> lock(fChannelMutex);
|
unique_lock<mutex> lock(fChannelMutex);
|
||||||
return fTransport;
|
return fTransportName;
|
||||||
}
|
}
|
||||||
catch (exception& e)
|
catch (exception& e)
|
||||||
{
|
{
|
||||||
LOG(error) << "Exception caught in FairMQChannel::GetTransport: " << e.what();
|
LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what();
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -332,7 +332,7 @@ void FairMQChannel::UpdateTransport(const string& transport)
|
||||||
{
|
{
|
||||||
unique_lock<mutex> lock(fChannelMutex);
|
unique_lock<mutex> lock(fChannelMutex);
|
||||||
fIsValid = false;
|
fIsValid = false;
|
||||||
fTransport = transport;
|
fTransportName = transport;
|
||||||
fModified = true;
|
fModified = true;
|
||||||
}
|
}
|
||||||
catch (exception& e)
|
catch (exception& e)
|
||||||
|
@ -587,13 +587,11 @@ bool FairMQChannel::ValidateChannel()
|
||||||
}
|
}
|
||||||
|
|
||||||
// validate channel transport
|
// validate channel transport
|
||||||
// const string channelTransportNames[] = { "default", "zeromq", "nanomsg", "shmem" };
|
if (FairMQ::TransportTypes.find(fTransportName) == FairMQ::TransportTypes.end())
|
||||||
// const set<string> channelTransports(channelTransportNames, channelTransportNames + sizeof(channelTransportNames) / sizeof(string));
|
|
||||||
if (FairMQ::TransportTypes.find(fTransport) == FairMQ::TransportTypes.end())
|
|
||||||
{
|
{
|
||||||
ss << "INVALID";
|
ss << "INVALID";
|
||||||
LOG(debug) << ss.str();
|
LOG(debug) << ss.str();
|
||||||
LOG(error) << "Invalid channel transport: \"" << fTransport << "\"";
|
LOG(error) << "Invalid channel transport: \"" << fTransportName << "\"";
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -92,7 +92,7 @@ class FairMQChannel
|
||||||
|
|
||||||
/// Get channel transport ("default", "zeromq", "nanomsg" or "shmem")
|
/// Get channel transport ("default", "zeromq", "nanomsg" or "shmem")
|
||||||
/// @return Returns channel transport (e.g. "default", "zeromq", "nanomsg" or "shmem")
|
/// @return Returns channel transport (e.g. "default", "zeromq", "nanomsg" or "shmem")
|
||||||
std::string GetTransport() const;
|
std::string GetTransportName() const;
|
||||||
|
|
||||||
/// Get socket send buffer size (in number of messages)
|
/// Get socket send buffer size (in number of messages)
|
||||||
/// @return Returns socket send buffer size (in number of messages)
|
/// @return Returns socket send buffer size (in number of messages)
|
||||||
|
@ -301,7 +301,7 @@ class FairMQChannel
|
||||||
std::string fType;
|
std::string fType;
|
||||||
std::string fMethod;
|
std::string fMethod;
|
||||||
std::string fAddress;
|
std::string fAddress;
|
||||||
std::string fTransport;
|
std::string fTransportName;
|
||||||
int fSndBufSize;
|
int fSndBufSize;
|
||||||
int fRcvBufSize;
|
int fRcvBufSize;
|
||||||
int fSndKernelSize;
|
int fSndKernelSize;
|
||||||
|
|
|
@ -42,7 +42,7 @@ FairMQDevice::FairMQDevice()
|
||||||
, fPortRangeMin(22000)
|
, fPortRangeMin(22000)
|
||||||
, fPortRangeMax(32000)
|
, fPortRangeMax(32000)
|
||||||
, fNetworkInterface()
|
, fNetworkInterface()
|
||||||
, fDefaultTransport("default")
|
, fDefaultTransportName("default")
|
||||||
, fInitializationTimeoutInS(120)
|
, fInitializationTimeoutInS(120)
|
||||||
, fDataCallbacks(false)
|
, fDataCallbacks(false)
|
||||||
, fMsgInputs()
|
, fMsgInputs()
|
||||||
|
@ -72,7 +72,7 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version)
|
||||||
, fPortRangeMin(22000)
|
, fPortRangeMin(22000)
|
||||||
, fPortRangeMax(32000)
|
, fPortRangeMax(32000)
|
||||||
, fNetworkInterface()
|
, fNetworkInterface()
|
||||||
, fDefaultTransport("default")
|
, fDefaultTransportName("default")
|
||||||
, fInitializationTimeoutInS(120)
|
, fInitializationTimeoutInS(120)
|
||||||
, fDataCallbacks(false)
|
, fDataCallbacks(false)
|
||||||
, fMsgInputs()
|
, fMsgInputs()
|
||||||
|
@ -246,15 +246,15 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
|
||||||
{
|
{
|
||||||
if (!ch.fTransportFactory)
|
if (!ch.fTransportFactory)
|
||||||
{
|
{
|
||||||
if (ch.fTransport == "default" || ch.fTransport == fDefaultTransport)
|
if (ch.fTransportName == "default" || ch.fTransportName == fDefaultTransportName)
|
||||||
{
|
{
|
||||||
LOG(debug) << ch.fName << ": using default transport";
|
LOG(debug) << ch.fName << ": using default transport";
|
||||||
ch.InitTransport(fTransportFactory);
|
ch.InitTransport(fTransportFactory);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG(debug) << ch.fName << ": channel transport (" << fDefaultTransport << ") overriden to " << ch.fTransport;
|
LOG(debug) << ch.fName << ": channel transport (" << fDefaultTransportName << ") overriden to " << ch.fTransportName;
|
||||||
ch.InitTransport(AddTransport(ch.fTransport));
|
ch.InitTransport(AddTransport(ch.fTransportName));
|
||||||
}
|
}
|
||||||
ch.fTransportType = ch.fTransportFactory->GetType();
|
ch.fTransportType = ch.fTransportFactory->GetType();
|
||||||
}
|
}
|
||||||
|
@ -804,7 +804,7 @@ void FairMQDevice::CreateOwnConfig()
|
||||||
fNumIoThreads = fConfig->GetValue<int>("io-threads");
|
fNumIoThreads = fConfig->GetValue<int>("io-threads");
|
||||||
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
|
fInitializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
|
||||||
fRate = fConfig->GetValue<float>("rate");
|
fRate = fConfig->GetValue<float>("rate");
|
||||||
fDefaultTransport = fConfig->GetValue<string>("transport");
|
fDefaultTransportName = fConfig->GetValue<string>("transport");
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::SetTransport(const string& transport)
|
void FairMQDevice::SetTransport(const string& transport)
|
||||||
|
@ -844,8 +844,8 @@ void FairMQDevice::SetConfig(FairMQProgOptions& config)
|
||||||
fNumIoThreads = config.GetValue<int>("io-threads");
|
fNumIoThreads = config.GetValue<int>("io-threads");
|
||||||
fInitializationTimeoutInS = config.GetValue<int>("initialization-timeout");
|
fInitializationTimeoutInS = config.GetValue<int>("initialization-timeout");
|
||||||
fRate = fConfig->GetValue<float>("rate");
|
fRate = fConfig->GetValue<float>("rate");
|
||||||
fDefaultTransport = config.GetValue<string>("transport");
|
fDefaultTransportName = config.GetValue<string>("transport");
|
||||||
SetTransport(fDefaultTransport);
|
SetTransport(fDefaultTransportName);
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::LogSocketRates()
|
void FairMQDevice::LogSocketRates()
|
||||||
|
|
|
@ -196,7 +196,7 @@ class FairMQDevice : public FairMQStateMachine
|
||||||
/// @brief Getter for default transport factory
|
/// @brief Getter for default transport factory
|
||||||
auto Transport() const -> const FairMQTransportFactory*
|
auto Transport() const -> const FairMQTransportFactory*
|
||||||
{
|
{
|
||||||
return fTransports.at(fair::mq::TransportTypes[GetDefaultTransport()]).get();
|
return fTransportFactory.get();;
|
||||||
}
|
}
|
||||||
|
|
||||||
template<typename... Args>
|
template<typename... Args>
|
||||||
|
@ -407,8 +407,8 @@ class FairMQDevice : public FairMQStateMachine
|
||||||
void SetNetworkInterface(const std::string& networkInterface) { fNetworkInterface = networkInterface; }
|
void SetNetworkInterface(const std::string& networkInterface) { fNetworkInterface = networkInterface; }
|
||||||
std::string GetNetworkInterface() const { return fNetworkInterface; }
|
std::string GetNetworkInterface() const { return fNetworkInterface; }
|
||||||
|
|
||||||
void SetDefaultTransport(const std::string& defaultTransport) { fDefaultTransport = defaultTransport; }
|
void SetDefaultTransportName(const std::string& defaultTransportName) { fDefaultTransportName = defaultTransportName; }
|
||||||
std::string GetDefaultTransport() const { return fDefaultTransport; }
|
std::string GetDefaultTransportName() const { return fDefaultTransportName; }
|
||||||
|
|
||||||
void SetInitializationTimeoutInS(int initializationTimeoutInS) { fInitializationTimeoutInS = initializationTimeoutInS; }
|
void SetInitializationTimeoutInS(int initializationTimeoutInS) { fInitializationTimeoutInS = initializationTimeoutInS; }
|
||||||
int GetInitializationTimeoutInS() const { return fInitializationTimeoutInS; }
|
int GetInitializationTimeoutInS() const { return fInitializationTimeoutInS; }
|
||||||
|
@ -472,7 +472,7 @@ class FairMQDevice : public FairMQStateMachine
|
||||||
int fPortRangeMax; ///< Maximum value for the port range (if dynamic)
|
int fPortRangeMax; ///< Maximum value for the port range (if dynamic)
|
||||||
|
|
||||||
std::string fNetworkInterface; ///< Network interface to use for dynamic binding
|
std::string fNetworkInterface; ///< Network interface to use for dynamic binding
|
||||||
std::string fDefaultTransport; ///< Default transport for the device
|
std::string fDefaultTransportName; ///< Default transport for the device
|
||||||
|
|
||||||
int fInitializationTimeoutInS; ///< Timeout for the initialization (in seconds)
|
int fInitializationTimeoutInS; ///< Timeout for the initialization (in seconds)
|
||||||
|
|
||||||
|
|
|
@ -147,7 +147,7 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa
|
||||||
commonChannel.UpdateType(q.second.get<string>("type", commonChannel.GetType()));
|
commonChannel.UpdateType(q.second.get<string>("type", commonChannel.GetType()));
|
||||||
commonChannel.UpdateMethod(q.second.get<string>("method", commonChannel.GetMethod()));
|
commonChannel.UpdateMethod(q.second.get<string>("method", commonChannel.GetMethod()));
|
||||||
commonChannel.UpdateAddress(q.second.get<string>("address", commonChannel.GetAddress()));
|
commonChannel.UpdateAddress(q.second.get<string>("address", commonChannel.GetAddress()));
|
||||||
commonChannel.UpdateTransport(q.second.get<string>("transport", commonChannel.GetTransport()));
|
commonChannel.UpdateTransport(q.second.get<string>("transport", commonChannel.GetTransportName()));
|
||||||
commonChannel.UpdateSndBufSize(q.second.get<int>("sndBufSize", commonChannel.GetSndBufSize()));
|
commonChannel.UpdateSndBufSize(q.second.get<int>("sndBufSize", commonChannel.GetSndBufSize()));
|
||||||
commonChannel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", commonChannel.GetRcvBufSize()));
|
commonChannel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", commonChannel.GetRcvBufSize()));
|
||||||
commonChannel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", commonChannel.GetSndKernelSize()));
|
commonChannel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", commonChannel.GetSndKernelSize()));
|
||||||
|
@ -166,7 +166,7 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa
|
||||||
LOG(debug) << "\ttype = " << commonChannel.GetType();
|
LOG(debug) << "\ttype = " << commonChannel.GetType();
|
||||||
LOG(debug) << "\tmethod = " << commonChannel.GetMethod();
|
LOG(debug) << "\tmethod = " << commonChannel.GetMethod();
|
||||||
LOG(debug) << "\taddress = " << commonChannel.GetAddress();
|
LOG(debug) << "\taddress = " << commonChannel.GetAddress();
|
||||||
LOG(debug) << "\ttransport = " << commonChannel.GetTransport();
|
LOG(debug) << "\ttransport = " << commonChannel.GetTransportName();
|
||||||
LOG(debug) << "\tsndBufSize = " << commonChannel.GetSndBufSize();
|
LOG(debug) << "\tsndBufSize = " << commonChannel.GetSndBufSize();
|
||||||
LOG(debug) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
|
LOG(debug) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
|
||||||
LOG(debug) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize();
|
LOG(debug) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize();
|
||||||
|
@ -208,7 +208,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
|
||||||
channel.UpdateType(q.second.get<string>("type", channel.GetType()));
|
channel.UpdateType(q.second.get<string>("type", channel.GetType()));
|
||||||
channel.UpdateMethod(q.second.get<string>("method", channel.GetMethod()));
|
channel.UpdateMethod(q.second.get<string>("method", channel.GetMethod()));
|
||||||
channel.UpdateAddress(q.second.get<string>("address", channel.GetAddress()));
|
channel.UpdateAddress(q.second.get<string>("address", channel.GetAddress()));
|
||||||
channel.UpdateTransport(q.second.get<string>("transport", channel.GetTransport()));
|
channel.UpdateTransport(q.second.get<string>("transport", channel.GetTransportName()));
|
||||||
channel.UpdateSndBufSize(q.second.get<int>("sndBufSize", channel.GetSndBufSize()));
|
channel.UpdateSndBufSize(q.second.get<int>("sndBufSize", channel.GetSndBufSize()));
|
||||||
channel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", channel.GetRcvBufSize()));
|
channel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", channel.GetRcvBufSize()));
|
||||||
channel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", channel.GetSndKernelSize()));
|
channel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", channel.GetSndKernelSize()));
|
||||||
|
@ -219,7 +219,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
|
||||||
LOG(debug) << "\ttype = " << channel.GetType();
|
LOG(debug) << "\ttype = " << channel.GetType();
|
||||||
LOG(debug) << "\tmethod = " << channel.GetMethod();
|
LOG(debug) << "\tmethod = " << channel.GetMethod();
|
||||||
LOG(debug) << "\taddress = " << channel.GetAddress();
|
LOG(debug) << "\taddress = " << channel.GetAddress();
|
||||||
LOG(debug) << "\ttransport = " << channel.GetTransport();
|
LOG(debug) << "\ttransport = " << channel.GetTransportName();
|
||||||
LOG(debug) << "\tsndBufSize = " << channel.GetSndBufSize();
|
LOG(debug) << "\tsndBufSize = " << channel.GetSndBufSize();
|
||||||
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
|
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
|
||||||
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();
|
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();
|
||||||
|
@ -247,7 +247,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
|
||||||
LOG(debug) << "\ttype = " << channel.GetType();
|
LOG(debug) << "\ttype = " << channel.GetType();
|
||||||
LOG(debug) << "\tmethod = " << channel.GetMethod();
|
LOG(debug) << "\tmethod = " << channel.GetMethod();
|
||||||
LOG(debug) << "\taddress = " << channel.GetAddress();
|
LOG(debug) << "\taddress = " << channel.GetAddress();
|
||||||
LOG(debug) << "\ttransport = " << channel.GetTransport();
|
LOG(debug) << "\ttransport = " << channel.GetTransportName();
|
||||||
LOG(debug) << "\tsndBufSize = " << channel.GetSndBufSize();
|
LOG(debug) << "\tsndBufSize = " << channel.GetSndBufSize();
|
||||||
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
|
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
|
||||||
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();
|
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();
|
||||||
|
|
|
@ -196,7 +196,7 @@ void FairMQProgOptions::UpdateMQValues()
|
||||||
UpdateVarMap<string>(typeKey, channel.GetType());
|
UpdateVarMap<string>(typeKey, channel.GetType());
|
||||||
UpdateVarMap<string>(methodKey, channel.GetMethod());
|
UpdateVarMap<string>(methodKey, channel.GetMethod());
|
||||||
UpdateVarMap<string>(addressKey, channel.GetAddress());
|
UpdateVarMap<string>(addressKey, channel.GetAddress());
|
||||||
UpdateVarMap<string>(transportKey, channel.GetTransport());
|
UpdateVarMap<string>(transportKey, channel.GetTransportName());
|
||||||
UpdateVarMap<int>(sndBufSizeKey, channel.GetSndBufSize());
|
UpdateVarMap<int>(sndBufSizeKey, channel.GetSndBufSize());
|
||||||
UpdateVarMap<int>(rcvBufSizeKey, channel.GetRcvBufSize());
|
UpdateVarMap<int>(rcvBufSizeKey, channel.GetRcvBufSize());
|
||||||
UpdateVarMap<int>(sndKernelSizeKey, channel.GetSndKernelSize());
|
UpdateVarMap<int>(sndKernelSizeKey, channel.GetSndKernelSize());
|
||||||
|
|
Loading…
Reference in New Issue
Block a user