FairMQChannel: defaults values, copy-ability

This commit is contained in:
Alexey Rybalchenko 2019-05-20 14:26:29 +02:00 committed by Dennis Klein
parent 4ce378b6b8
commit fbb003b50f
3 changed files with 116 additions and 74 deletions

View File

@ -16,47 +16,55 @@
using namespace std; using namespace std;
mutex FairMQChannel::fChannelMutex;
FairMQChannel::FairMQChannel() FairMQChannel::FairMQChannel()
: FairMQChannel("", "unspecified", "unspecified", "unspecified", nullptr) : FairMQChannel(DefaultName, DefaultType, DefaultMethod, DefaultAddress, nullptr)
{}
FairMQChannel::FairMQChannel(const string& name)
: FairMQChannel(name, DefaultType, DefaultMethod, DefaultAddress, nullptr)
{} {}
FairMQChannel::FairMQChannel(const string& type, const string& method, const string& address) FairMQChannel::FairMQChannel(const string& type, const string& method, const string& address)
: FairMQChannel("", type, method, address, nullptr) : FairMQChannel(DefaultName, type, method, address, nullptr)
{} {}
FairMQChannel::FairMQChannel(const string& name, const string& type, shared_ptr<FairMQTransportFactory> factory) FairMQChannel::FairMQChannel(const string& name, const string& type, shared_ptr<FairMQTransportFactory> factory)
: FairMQChannel(name, type, "unspecified", "unspecified", factory) : FairMQChannel(name, type, DefaultMethod, DefaultAddress, factory)
{} {}
FairMQChannel::FairMQChannel(const string& name, const string& type, const string& method, const string& address, shared_ptr<FairMQTransportFactory> factory) FairMQChannel::FairMQChannel(const string& name, const string& type, const string& method, const string& address, shared_ptr<FairMQTransportFactory> factory)
: fTransportFactory(factory) : fTransportFactory(factory)
, fTransportType(factory ? factory->GetType() : fair::mq::Transport::DEFAULT) , fTransportType(factory ? factory->GetType() : DefaultTransportType)
, fSocket(factory ? factory->CreateSocket(type, name) : nullptr) , fSocket(factory ? factory->CreateSocket(type, name) : nullptr)
, fName(name)
, fType(type) , fType(type)
, fMethod(method) , fMethod(method)
, fAddress(address) , fAddress(address)
, fSndBufSize(1000) , fSndBufSize(DefaultSndBufSize)
, fRcvBufSize(1000) , fRcvBufSize(DefaultRcvBufSize)
, fSndKernelSize(0) , fSndKernelSize(DefaultSndKernelSize)
, fRcvKernelSize(0) , fRcvKernelSize(DefaultRcvKernelSize)
, fLinger(500) , fLinger(DefaultLinger)
, fRateLogging(1) , fRateLogging(DefaultRateLogging)
, fPortRangeMin(22000) , fPortRangeMin(DefaultPortRangeMin)
, fPortRangeMax(23000) , fPortRangeMax(DefaultPortRangeMax)
, fAutoBind(true) , fAutoBind(DefaultAutoBind)
, fName(name)
, fIsValid(false) , fIsValid(false)
, fMultipart(false) , fMultipart(false)
, fModified(true) , fModified(true)
, fReset(false) , fReset(false)
, fMtx()
{} {}
FairMQChannel::FairMQChannel(const FairMQChannel& chan) FairMQChannel::FairMQChannel(const FairMQChannel& chan)
: FairMQChannel(chan, chan.fName)
{}
FairMQChannel::FairMQChannel(const FairMQChannel& chan, const string& newName)
: fTransportFactory(nullptr) : fTransportFactory(nullptr)
, fTransportType(chan.fTransportType) , fTransportType(chan.fTransportType)
, fSocket(nullptr) , fSocket(nullptr)
, fName(newName)
, fType(chan.fType) , fType(chan.fType)
, fMethod(chan.fMethod) , fMethod(chan.fMethod)
, fAddress(chan.fAddress) , fAddress(chan.fAddress)
@ -69,7 +77,6 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fPortRangeMin(chan.fPortRangeMin) , fPortRangeMin(chan.fPortRangeMin)
, fPortRangeMax(chan.fPortRangeMax) , fPortRangeMax(chan.fPortRangeMax)
, fAutoBind(chan.fAutoBind) , fAutoBind(chan.fAutoBind)
, fName(chan.fName)
, fIsValid(false) , fIsValid(false)
, fMultipart(chan.fMultipart) , fMultipart(chan.fMultipart)
, fModified(chan.fModified) , fModified(chan.fModified)
@ -81,6 +88,7 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
fTransportFactory = nullptr; fTransportFactory = nullptr;
fTransportType = chan.fTransportType; fTransportType = chan.fTransportType;
fSocket = nullptr; fSocket = nullptr;
fName = chan.fName;
fType = chan.fType; fType = chan.fType;
fMethod = chan.fMethod; fMethod = chan.fMethod;
fAddress = chan.fAddress; fAddress = chan.fAddress;
@ -93,7 +101,6 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
fPortRangeMin = chan.fPortRangeMin; fPortRangeMin = chan.fPortRangeMin;
fPortRangeMax = chan.fPortRangeMax; fPortRangeMax = chan.fPortRangeMax;
fAutoBind = chan.fAutoBind; fAutoBind = chan.fAutoBind;
fName = chan.fName;
fIsValid = false; fIsValid = false;
fMultipart = chan.fMultipart; fMultipart = chan.fMultipart;
fModified = chan.fModified; fModified = chan.fModified;
@ -110,13 +117,13 @@ FairMQSocket & FairMQChannel::GetSocket() const
string FairMQChannel::GetName() const string FairMQChannel::GetName() const
{ {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fName; return fName;
} }
string FairMQChannel::GetPrefix() const string FairMQChannel::GetPrefix() const
{ {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
string prefix = fName; string prefix = fName;
prefix = prefix.erase(fName.rfind('[')); prefix = prefix.erase(fName.rfind('['));
return prefix; return prefix;
@ -124,7 +131,7 @@ string FairMQChannel::GetPrefix() const
string FairMQChannel::GetIndex() const string FairMQChannel::GetIndex() const
{ {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
string indexStr = fName; string indexStr = fName;
indexStr.erase(indexStr.rfind(']')); indexStr.erase(indexStr.rfind(']'));
indexStr.erase(0, indexStr.rfind('[') + 1); indexStr.erase(0, indexStr.rfind('[') + 1);
@ -133,7 +140,7 @@ string FairMQChannel::GetIndex() const
string FairMQChannel::GetType() const string FairMQChannel::GetType() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fType; return fType;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what();
@ -142,7 +149,7 @@ try {
string FairMQChannel::GetMethod() const string FairMQChannel::GetMethod() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fMethod; return fMethod;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what();
@ -151,7 +158,7 @@ try {
string FairMQChannel::GetAddress() const string FairMQChannel::GetAddress() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fAddress; return fAddress;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what();
@ -160,16 +167,26 @@ try {
string FairMQChannel::GetTransportName() const string FairMQChannel::GetTransportName() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fair::mq::TransportNames.at(fTransportType); return fair::mq::TransportNames.at(fTransportType);
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
} }
fair::mq::Transport FairMQChannel::GetTransportType() const
try {
lock_guard<mutex> lock(fMtx);
return fTransportType;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetTransportType: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetSndBufSize() const int FairMQChannel::GetSndBufSize() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fSndBufSize; return fSndBufSize;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what();
@ -178,7 +195,7 @@ try {
int FairMQChannel::GetRcvBufSize() const int FairMQChannel::GetRcvBufSize() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fRcvBufSize; return fRcvBufSize;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what();
@ -187,7 +204,7 @@ try {
int FairMQChannel::GetSndKernelSize() const int FairMQChannel::GetSndKernelSize() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fSndKernelSize; return fSndKernelSize;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what();
@ -196,7 +213,7 @@ try {
int FairMQChannel::GetRcvKernelSize() const int FairMQChannel::GetRcvKernelSize() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fRcvKernelSize; return fRcvKernelSize;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what();
@ -205,7 +222,7 @@ try {
int FairMQChannel::GetLinger() const int FairMQChannel::GetLinger() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fLinger; return fLinger;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what();
@ -214,7 +231,7 @@ try {
int FairMQChannel::GetRateLogging() const int FairMQChannel::GetRateLogging() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fRateLogging; return fRateLogging;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what();
@ -223,7 +240,7 @@ try {
int FairMQChannel::GetPortRangeMin() const int FairMQChannel::GetPortRangeMin() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fPortRangeMin; return fPortRangeMin;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMin: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMin: " << e.what();
@ -232,7 +249,7 @@ try {
int FairMQChannel::GetPortRangeMax() const int FairMQChannel::GetPortRangeMax() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fPortRangeMax; return fPortRangeMax;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMax: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMax: " << e.what();
@ -241,7 +258,7 @@ try {
bool FairMQChannel::GetAutoBind() const bool FairMQChannel::GetAutoBind() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fAutoBind; return fAutoBind;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetAutoBind: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::GetAutoBind: " << e.what();
@ -250,7 +267,7 @@ try {
void FairMQChannel::UpdateType(const string& type) void FairMQChannel::UpdateType(const string& type)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fType = type; fType = type;
fModified = true; fModified = true;
@ -261,7 +278,7 @@ try {
void FairMQChannel::UpdateMethod(const string& method) void FairMQChannel::UpdateMethod(const string& method)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fMethod = method; fMethod = method;
fModified = true; fModified = true;
@ -272,7 +289,7 @@ try {
void FairMQChannel::UpdateAddress(const string& address) void FairMQChannel::UpdateAddress(const string& address)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fAddress = address; fAddress = address;
fModified = true; fModified = true;
@ -283,7 +300,7 @@ try {
void FairMQChannel::UpdateTransport(const string& transport) void FairMQChannel::UpdateTransport(const string& transport)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fTransportType = fair::mq::TransportTypes.at(transport); fTransportType = fair::mq::TransportTypes.at(transport);
fModified = true; fModified = true;
@ -294,7 +311,7 @@ try {
void FairMQChannel::UpdateSndBufSize(const int sndBufSize) void FairMQChannel::UpdateSndBufSize(const int sndBufSize)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fSndBufSize = sndBufSize; fSndBufSize = sndBufSize;
fModified = true; fModified = true;
@ -305,7 +322,7 @@ try {
void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fRcvBufSize = rcvBufSize; fRcvBufSize = rcvBufSize;
fModified = true; fModified = true;
@ -316,7 +333,7 @@ try {
void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize) void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fSndKernelSize = sndKernelSize; fSndKernelSize = sndKernelSize;
fModified = true; fModified = true;
@ -327,7 +344,7 @@ try {
void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fRcvKernelSize = rcvKernelSize; fRcvKernelSize = rcvKernelSize;
fModified = true; fModified = true;
@ -338,7 +355,7 @@ try {
void FairMQChannel::UpdateLinger(const int duration) void FairMQChannel::UpdateLinger(const int duration)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fLinger = duration; fLinger = duration;
fModified = true; fModified = true;
@ -349,7 +366,7 @@ try {
void FairMQChannel::UpdateRateLogging(const int rateLogging) void FairMQChannel::UpdateRateLogging(const int rateLogging)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fRateLogging = rateLogging; fRateLogging = rateLogging;
fModified = true; fModified = true;
@ -360,7 +377,7 @@ try {
void FairMQChannel::UpdatePortRangeMin(const int minPort) void FairMQChannel::UpdatePortRangeMin(const int minPort)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fPortRangeMin = minPort; fPortRangeMin = minPort;
fModified = true; fModified = true;
@ -371,7 +388,7 @@ try {
void FairMQChannel::UpdatePortRangeMax(const int maxPort) void FairMQChannel::UpdatePortRangeMax(const int maxPort)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fPortRangeMax = maxPort; fPortRangeMax = maxPort;
fModified = true; fModified = true;
@ -382,7 +399,7 @@ try {
void FairMQChannel::UpdateAutoBind(const bool autobind) void FairMQChannel::UpdateAutoBind(const bool autobind)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fAutoBind = autobind; fAutoBind = autobind;
fModified = true; fModified = true;
@ -393,7 +410,7 @@ try {
auto FairMQChannel::SetModified(const bool modified) -> void auto FairMQChannel::SetModified(const bool modified) -> void
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fModified = modified; fModified = modified;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what();
@ -402,7 +419,7 @@ try {
void FairMQChannel::UpdateName(const string& name) void FairMQChannel::UpdateName(const string& name)
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
fName = name; fName = name;
fModified = true; fModified = true;
@ -413,7 +430,7 @@ try {
bool FairMQChannel::IsValid() const bool FairMQChannel::IsValid() const
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fIsValid; return fIsValid;
} catch (exception& e) { } catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what(); LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what();
@ -422,7 +439,7 @@ try {
bool FairMQChannel::Validate() bool FairMQChannel::Validate()
try { try {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
stringstream ss; stringstream ss;
ss << "Validating channel '" << fName << "'... "; ss << "Validating channel '" << fName << "'... ";
@ -570,7 +587,7 @@ try {
void FairMQChannel::Init() void FairMQChannel::Init()
{ {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fSocket = fTransportFactory->CreateSocket(fType, fName); fSocket = fTransportFactory->CreateSocket(fType, fName);
@ -592,14 +609,14 @@ void FairMQChannel::Init()
bool FairMQChannel::ConnectEndpoint(const string& endpoint) bool FairMQChannel::ConnectEndpoint(const string& endpoint)
{ {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
return fSocket->Connect(endpoint); return fSocket->Connect(endpoint);
} }
bool FairMQChannel::BindEndpoint(string& endpoint) bool FairMQChannel::BindEndpoint(string& endpoint)
{ {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
// try to bind to the configured port. If it fails, try random one (if AutoBind is on). // try to bind to the configured port. If it fails, try random one (if AutoBind is on).
if (fSocket->Bind(endpoint)) { if (fSocket->Bind(endpoint)) {
@ -637,7 +654,7 @@ bool FairMQChannel::BindEndpoint(string& endpoint)
void FairMQChannel::ResetChannel() void FairMQChannel::ResetChannel()
{ {
lock_guard<mutex> lock(fChannelMutex); lock_guard<mutex> lock(fMtx);
fIsValid = false; fIsValid = false;
// TODO: implement channel resetting // TODO: implement channel resetting
} }

View File

@ -32,6 +32,10 @@ class FairMQChannel
/// Default constructor /// Default constructor
FairMQChannel(); FairMQChannel();
/// Constructor
/// @param name Channel name
FairMQChannel(const std::string& name);
/// Constructor /// Constructor
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
/// @param method Socket method (bind/connect) /// @param method Socket method (bind/connect)
@ -55,10 +59,19 @@ class FairMQChannel
/// Copy Constructor /// Copy Constructor
FairMQChannel(const FairMQChannel&); FairMQChannel(const FairMQChannel&);
/// Copy Constructor (with new name)
FairMQChannel(const FairMQChannel&, const std::string& name);
/// Move constructor
FairMQChannel(FairMQChannel&&) = default;
/// Assignment operator /// Assignment operator
FairMQChannel& operator=(const FairMQChannel&); FairMQChannel& operator=(const FairMQChannel&);
/// Default destructor /// Move assignment operator
FairMQChannel& operator=(FairMQChannel&&) = default;
/// Destructor
virtual ~FairMQChannel() {} virtual ~FairMQChannel() {}
struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; }; struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; };
@ -106,10 +119,14 @@ class FairMQChannel
/// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") /// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
std::string GetAddress() const; std::string GetAddress() const;
/// Get channel transport ("default", "zeromq", "nanomsg" or "shmem") /// Get channel transport name ("default", "zeromq", "nanomsg" or "shmem")
/// @return Returns channel transport (e.g. "default", "zeromq", "nanomsg" or "shmem") /// @return Returns channel transport name (e.g. "default", "zeromq", "nanomsg" or "shmem")
std::string GetTransportName() const; std::string GetTransportName() const;
/// Get channel transport type
/// @return Returns channel transport type
fair::mq::Transport GetTransportType() const;
/// Get socket send buffer size (in number of messages) /// Get socket send buffer size (in number of messages)
/// @return Returns socket send buffer size (in number of messages) /// @return Returns socket send buffer size (in number of messages)
int GetSndBufSize() const; int GetSndBufSize() const;
@ -320,6 +337,7 @@ class FairMQChannel
fair::mq::Transport fTransportType; fair::mq::Transport fTransportType;
std::unique_ptr<FairMQSocket> fSocket; std::unique_ptr<FairMQSocket> fSocket;
std::string fName;
std::string fType; std::string fType;
std::string fMethod; std::string fMethod;
std::string fAddress; std::string fAddress;
@ -333,23 +351,17 @@ class FairMQChannel
int fPortRangeMax; int fPortRangeMax;
bool fAutoBind; bool fAutoBind;
std::string fName; bool fIsValid;
std::atomic<bool> fIsValid;
// use static mutex to make the class easily copyable
// implication: same mutex is used for all instances of the class
// this does not hurt much, because mutex is used only during initialization with very low contention
// possible TODO: improve this
static std::mutex fChannelMutex;
bool fMultipart; bool fMultipart;
bool fModified; bool fModified;
bool fReset; bool fReset;
mutable std::mutex fMtx;
void CheckSendCompatibility(FairMQMessagePtr& msg) void CheckSendCompatibility(FairMQMessagePtr& msg)
{ {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage( FairMQMessagePtr msgWrapper(NewMessage(
msg->GetData(), msg->GetData(),
msg->GetSize(), msg->GetSize(),
@ -365,7 +377,7 @@ class FairMQChannel
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr msgWrapper(NewMessage( FairMQMessagePtr msgWrapper(NewMessage(
msg->GetData(), msg->GetData(),
msg->GetSize(), msg->GetSize(),
@ -381,7 +393,6 @@ class FairMQChannel
void CheckReceiveCompatibility(FairMQMessagePtr& msg) void CheckReceiveCompatibility(FairMQMessagePtr& msg)
{ {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage()); FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg); msg = move(newMsg);
} }
@ -391,7 +402,7 @@ class FairMQChannel
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
// LOG(debug) << "Channel type does not match message type. Creating wrapper";
FairMQMessagePtr newMsg(NewMessage()); FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg); msg = move(newMsg);
} }
@ -403,7 +414,24 @@ class FairMQChannel
fTransportFactory = factory; fTransportFactory = factory;
fTransportType = factory->GetType(); fTransportType = factory->GetType();
} }
auto SetModified(const bool modified) -> void; auto SetModified(const bool modified) -> void;
static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT;
static constexpr const char* DefaultTransportName = "default";
static constexpr const char* DefaultName = "";
static constexpr const char* DefaultType = "unspecified";
static constexpr const char* DefaultMethod = "unspecified";
static constexpr const char* DefaultAddress = "unspecified";
static constexpr int DefaultSndBufSize = 1000;
static constexpr int DefaultRcvBufSize = 1000;
static constexpr int DefaultSndKernelSize = 0;
static constexpr int DefaultRcvKernelSize = 0;
static constexpr int DefaultLinger = 500;
static constexpr int DefaultRateLogging = 1;
static constexpr int DefaultPortRangeMin = 22000;
static constexpr int DefaultPortRangeMax = 23000;
static constexpr bool DefaultAutoBind = true;
}; };
#endif /* FAIRMQCHANNEL_H_ */ #endif /* FAIRMQCHANNEL_H_ */

View File

@ -263,10 +263,9 @@ void FairMQProgOptions::ParseChannelsFromCmdLine()
void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered) void FairMQProgOptions::ParseCmdLine(const int argc, char const* const* argv, bool allowUnregistered)
{ {
// clear the container because it was filled with default values and subsequent calls to store() do not overwrite the existing values
fVarMap.clear(); fVarMap.clear();
// get options from cmd line and store in variable map
// here we use command_line_parser instead of parse_command_line, to allow unregistered and positional options
if (allowUnregistered) { if (allowUnregistered) {
po::command_line_parser parser{argc, argv}; po::command_line_parser parser{argc, argv};
parser.options(fAllOptions).allow_unregistered(); parser.options(fAllOptions).allow_unregistered();
@ -321,8 +320,6 @@ void FairMQProgOptions::UpdateChannelInfo()
} }
} }
// read FairMQChannelMap and insert/update corresponding values in variable map
// create key for variable map as follow : channelName.index.memberName
void FairMQProgOptions::UpdateMQValues() void FairMQProgOptions::UpdateMQValues()
{ {
for (const auto& p : fFairMQChannelMap) { for (const auto& p : fFairMQChannelMap) {