Remove FairMQChannel from parser, implement bulk operations

This commit is contained in:
Alexey Rybalchenko
2019-05-24 15:54:34 +02:00
committed by Dennis Klein
parent fbb003b50f
commit bf8ec968e7
17 changed files with 568 additions and 559 deletions

View File

@@ -15,6 +15,16 @@
#include <random>
using namespace std;
using namespace fair::mq;
template<typename T>
T GetPropertyOrDefault(const fair::mq::Properties& m, const string& k, const T& ifNotFound) noexcept
{
if (m.count(k)) {
return boost::any_cast<T>(m.at(k));
}
return ifNotFound;
}
FairMQChannel::FairMQChannel()
: FairMQChannel(DefaultName, DefaultType, DefaultMethod, DefaultAddress, nullptr)
@@ -56,6 +66,26 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, const strin
, fMtx()
{}
FairMQChannel::FairMQChannel(const string& name, int index, const fair::mq::Properties& properties)
: FairMQChannel(tools::ToString(name, "[", index, "]"), "unspecified", "unspecified", "unspecified", nullptr)
{
string prefix(tools::ToString("chans.", name, ".", index, "."));
fType = GetPropertyOrDefault(properties, string(prefix + "type"), fType);
fMethod = GetPropertyOrDefault(properties, string(prefix + "method"), fMethod);
fAddress = GetPropertyOrDefault(properties, string(prefix + "address"), fAddress);
fTransportType = TransportTypes.at(GetPropertyOrDefault(properties, string(prefix + "transport"), TransportNames.at(fTransportType)));
fSndBufSize = GetPropertyOrDefault(properties, string(prefix + "sndBufSize"), fSndBufSize);
fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), fRcvBufSize);
fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), fSndKernelSize);
fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), fRcvKernelSize);
fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), fLinger);
fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), fRateLogging);
fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), fPortRangeMin);
fPortRangeMax = GetPropertyOrDefault(properties, string(prefix + "portRangeMax"), fPortRangeMax);
fAutoBind = GetPropertyOrDefault(properties, string(prefix + "autoBind"), fAutoBind);
}
FairMQChannel::FairMQChannel(const FairMQChannel& chan)
: FairMQChannel(chan, chan.fName)
{}
@@ -144,7 +174,7 @@ try {
return fType;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
string FairMQChannel::GetMethod() const
@@ -153,7 +183,7 @@ try {
return fMethod;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
string FairMQChannel::GetAddress() const
@@ -162,25 +192,25 @@ try {
return fAddress;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
string FairMQChannel::GetTransportName() const
try {
lock_guard<mutex> lock(fMtx);
return fair::mq::TransportNames.at(fTransportType);
return TransportNames.at(fTransportType);
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
fair::mq::Transport FairMQChannel::GetTransportType() const
Transport FairMQChannel::GetTransportType() const
try {
lock_guard<mutex> lock(fMtx);
return fTransportType;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetTransportType: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
@@ -190,7 +220,7 @@ try {
return fSndBufSize;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetRcvBufSize() const
@@ -199,7 +229,7 @@ try {
return fRcvBufSize;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetSndKernelSize() const
@@ -208,7 +238,7 @@ try {
return fSndKernelSize;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetRcvKernelSize() const
@@ -217,7 +247,7 @@ try {
return fRcvKernelSize;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetLinger() const
@@ -226,7 +256,7 @@ try {
return fLinger;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetRateLogging() const
@@ -235,7 +265,7 @@ try {
return fRateLogging;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetPortRangeMin() const
@@ -244,7 +274,7 @@ try {
return fPortRangeMin;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMin: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetPortRangeMax() const
@@ -253,7 +283,7 @@ try {
return fPortRangeMax;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetPortRangeMax: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
bool FairMQChannel::GetAutoBind() const
@@ -262,7 +292,7 @@ try {
return fAutoBind;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetAutoBind: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateType(const string& type)
@@ -273,7 +303,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateType: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateMethod(const string& method)
@@ -284,7 +314,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateMethod: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateAddress(const string& address)
@@ -295,18 +325,18 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateAddress: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateTransport(const string& transport)
try {
lock_guard<mutex> lock(fMtx);
fIsValid = false;
fTransportType = fair::mq::TransportTypes.at(transport);
fTransportType = TransportTypes.at(transport);
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateTransport: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateSndBufSize(const int sndBufSize)
@@ -317,7 +347,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateSndBufSize: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize)
@@ -328,7 +358,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateRcvBufSize: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize)
@@ -339,7 +369,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize)
@@ -350,7 +380,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateLinger(const int duration)
@@ -361,7 +391,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateLinger: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateRateLogging(const int rateLogging)
@@ -372,7 +402,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateRateLogging: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdatePortRangeMin(const int minPort)
@@ -383,7 +413,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMin: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdatePortRangeMax(const int maxPort)
@@ -394,7 +424,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdatePortRangeMax: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateAutoBind(const bool autobind)
@@ -405,7 +435,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateAutoBind: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
auto FairMQChannel::SetModified(const bool modified) -> void
@@ -414,7 +444,7 @@ try {
fModified = modified;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateName(const string& name)
@@ -425,7 +455,7 @@ try {
fModified = true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateName: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
bool FairMQChannel::IsValid() const
@@ -434,7 +464,7 @@ try {
return fIsValid;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
throw ChannelConfigurationError(tools::ToString("failed to acquire lock: ", e.what()));
}
bool FairMQChannel::Validate()
@@ -462,7 +492,7 @@ try {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "Invalid channel type: '" << fType << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("Invalid channel type: '", fType, "'"));
throw ChannelConfigurationError(tools::ToString("Invalid channel type: '", fType, "'"));
}
// validate socket address
@@ -485,7 +515,7 @@ try {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "Invalid endpoint connection method: '" << fMethod << "' for " << endpoint;
throw ChannelConfigurationError(fair::mq::tools::ToString("Invalid endpoint connection method: '", fMethod, "' for ", endpoint));
throw ChannelConfigurationError(tools::ToString("Invalid endpoint connection method: '", fMethod, "' for ", endpoint));
}
address = endpoint;
}
@@ -541,7 +571,7 @@ try {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel send buffer size (cannot be negative): '" << fSndBufSize << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel send buffer size (cannot be negative): '", fSndBufSize, "'"));
throw ChannelConfigurationError(tools::ToString("invalid channel send buffer size (cannot be negative): '", fSndBufSize, "'"));
}
// validate socket buffer size for receiving
@@ -549,7 +579,7 @@ try {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel receive buffer size (cannot be negative): '" << fRcvBufSize << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel receive buffer size (cannot be negative): '", fRcvBufSize, "'"));
throw ChannelConfigurationError(tools::ToString("invalid channel receive buffer size (cannot be negative): '", fRcvBufSize, "'"));
}
// validate socket kernel transmit size for sending
@@ -557,7 +587,7 @@ try {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel send kernel transmit size (cannot be negative): '" << fSndKernelSize << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel send kernel transmit size (cannot be negative): '", fSndKernelSize, "'"));
throw ChannelConfigurationError(tools::ToString("invalid channel send kernel transmit size (cannot be negative): '", fSndKernelSize, "'"));
}
// validate socket kernel transmit size for receiving
@@ -565,7 +595,7 @@ try {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel receive kernel transmit size (cannot be negative): '" << fRcvKernelSize << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel receive kernel transmit size (cannot be negative): '", fRcvKernelSize, "'"));
throw ChannelConfigurationError(tools::ToString("invalid channel receive kernel transmit size (cannot be negative): '", fRcvKernelSize, "'"));
}
// validate socket rate logging interval
@@ -573,7 +603,7 @@ try {
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid socket rate logging interval (cannot be negative): '" << fRateLogging << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid socket rate logging interval (cannot be negative): '", fRateLogging, "'"));
throw ChannelConfigurationError(tools::ToString("invalid socket rate logging interval (cannot be negative): '", fRateLogging, "'"));
}
fIsValid = true;
@@ -582,7 +612,7 @@ try {
return true;
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::ValidateChannel: " << e.what();
throw ChannelConfigurationError(fair::mq::tools::ToString(e.what()));
throw ChannelConfigurationError(tools::ToString(e.what()));
}
void FairMQChannel::Init()
@@ -641,7 +671,7 @@ bool FairMQChannel::BindEndpoint(string& endpoint)
}
size_t pos = endpoint.rfind(':');
endpoint = endpoint.substr(0, pos + 1) + fair::mq::tools::ToString(static_cast<int>(randomPort(generator)));
endpoint = endpoint.substr(0, pos + 1) + tools::ToString(static_cast<int>(randomPort(generator)));
} while (!fSocket->Bind(endpoint));
return true;