Use exceptions for fatal errors in device/channel

- These will be caught by StateMachine::ProcessWork
   and lead to error state.
 - Solve issue where device goes into ready state if
   it encounters misconfigured channel in the Run.
 - deprecate WaitForInitialValidation().
This commit is contained in:
Alexey Rybalchenko 2018-10-31 14:32:31 +01:00 committed by Dennis Klein
parent 3561255cf9
commit 3b5b2b501f
4 changed files with 397 additions and 530 deletions

View File

@ -13,6 +13,7 @@
*/
#include "FairMQChannel.h"
#include <fairmq/Tools.h>
#include <boost/algorithm/string.hpp> // join/split
@ -156,357 +157,239 @@ string FairMQChannel::GetChannelIndex() const
}
string FairMQChannel::GetType() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fType;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
string FairMQChannel::GetMethod() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fMethod;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
string FairMQChannel::GetAddress() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fAddress;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
string FairMQChannel::GetTransportName() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fair::mq::TransportNames.at(fTransportType);
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetSndBufSize() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fSndBufSize;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetRcvBufSize() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fRcvBufSize;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetSndKernelSize() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fSndKernelSize;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetRcvKernelSize() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fRcvKernelSize;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetLinger() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fLinger;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
int FairMQChannel::GetRateLogging() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fRateLogging;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateType(const string& type)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fType = type;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateType: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateMethod(const string& method)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fMethod = method;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateMethod: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateAddress(const string& address)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fAddress = address;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateAddress: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateTransport(const string& transport)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fTransportType = fair::mq::TransportTypes.at(transport);
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateTransport: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateSndBufSize(const int sndBufSize)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fSndBufSize = sndBufSize;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateSndBufSize: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fRcvBufSize = rcvBufSize;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateRcvBufSize: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fSndKernelSize = sndKernelSize;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fRcvKernelSize = rcvKernelSize;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateLinger(const int duration)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fLinger = duration;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateLinger: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateRateLogging(const int rateLogging)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fRateLogging = rateLogging;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateRateLogging: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
auto FairMQChannel::SetModified(const bool modified) -> void
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fModified = modified;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
void FairMQChannel::UpdateChannelName(const string& name)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
fName = name;
fModified = true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::UpdateChannelName: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
bool FairMQChannel::IsValid() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
return fIsValid;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what()));
}
bool FairMQChannel::ValidateChannel()
{
try
{
unique_lock<mutex> lock(fChannelMutex);
try {
lock_guard<mutex> lock(fChannelMutex);
stringstream ss;
ss << "Validating channel \"" << fName << "\"... ";
ss << "Validating channel '" << fName << "'... ";
if (fIsValid)
{
@ -522,8 +405,8 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "Invalid channel type: \"" << fType << "\"";
exit(EXIT_FAILURE);
LOG(error) << "Invalid channel type: '" << fType << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("Invalid channel type: '", fType, "'"));
}
// validate socket address
@ -531,7 +414,7 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(debug) << "invalid channel address: \"" << fAddress << "\"";
LOG(debug) << "invalid channel address: '" << fAddress << "'";
return false;
}
else
@ -554,8 +437,8 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "Invalid endpoint connection method: \"" << fMethod << "\" for " << endpoint;
exit(EXIT_FAILURE);
LOG(error) << "Invalid endpoint connection method: '" << fMethod << "' for " << endpoint;
throw ChannelConfigurationError(fair::mq::tools::ToString("Invalid endpoint connection method: '", fMethod, "' for ", endpoint));
}
address = endpoint;
}
@ -568,7 +451,7 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: \"" << address << "\" (missing port?)";
LOG(error) << "invalid channel address: '" << address << "' (missing port?)";
return false;
}
}
@ -580,7 +463,7 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: \"" << address << "\" (empty IPC address?)";
LOG(error) << "invalid channel address: '" << address << "' (empty IPC address?)";
return false;
}
}
@ -592,7 +475,7 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: \"" << address << "\" (empty inproc address?)";
LOG(error) << "invalid channel address: '" << address << "' (empty inproc address?)";
return false;
}
}
@ -604,7 +487,7 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: \"" << address << "\" (empty verbs address?)";
LOG(error) << "invalid channel address: '" << address << "' (empty verbs address?)";
return false;
}
}
@ -613,7 +496,7 @@ bool FairMQChannel::ValidateChannel()
// if neither TCP or IPC is specified, return invalid
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel address: \"" << address << "\" (missing protocol specifier?)";
LOG(error) << "invalid channel address: '" << address << "' (missing protocol specifier?)";
return false;
}
}
@ -624,8 +507,8 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel send buffer size (cannot be negative): \"" << fSndBufSize << "\"";
exit(EXIT_FAILURE);
LOG(error) << "invalid channel send buffer size (cannot be negative): '" << fSndBufSize << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel send buffer size (cannot be negative): '", fSndBufSize, "'"));
}
// validate socket buffer size for receiving
@ -633,8 +516,8 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel receive buffer size (cannot be negative): \"" << fRcvBufSize << "\"";
exit(EXIT_FAILURE);
LOG(error) << "invalid channel receive buffer size (cannot be negative): '" << fRcvBufSize << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel receive buffer size (cannot be negative): '", fRcvBufSize, "'"));
}
// validate socket kernel transmit size for sending
@ -642,8 +525,8 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel send kernel transmit size (cannot be negative): \"" << fSndKernelSize << "\"";
exit(EXIT_FAILURE);
LOG(error) << "invalid channel send kernel transmit size (cannot be negative): '" << fSndKernelSize << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel send kernel transmit size (cannot be negative): '", fSndKernelSize, "'"));
}
// validate socket kernel transmit size for receiving
@ -651,8 +534,8 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid channel receive kernel transmit size (cannot be negative): \"" << fRcvKernelSize << "\"";
exit(EXIT_FAILURE);
LOG(error) << "invalid channel receive kernel transmit size (cannot be negative): '" << fRcvKernelSize << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel receive kernel transmit size (cannot be negative): '", fRcvKernelSize, "'"));
}
// validate socket rate logging interval
@ -660,20 +543,17 @@ bool FairMQChannel::ValidateChannel()
{
ss << "INVALID";
LOG(debug) << ss.str();
LOG(error) << "invalid socket rate logging interval (cannot be negative): \"" << fRateLogging << "\"";
exit(EXIT_FAILURE);
LOG(error) << "invalid socket rate logging interval (cannot be negative): '" << fRateLogging << "'";
throw ChannelConfigurationError(fair::mq::tools::ToString("invalid socket rate logging interval (cannot be negative): '", fRateLogging, "'"));
}
fIsValid = true;
ss << "VALID";
LOG(debug) << ss.str();
return true;
}
catch (exception& e)
{
} catch (exception& e) {
LOG(error) << "Exception caught in FairMQChannel::ValidateChannel: " << e.what();
exit(EXIT_FAILURE);
}
throw ChannelConfigurationError(fair::mq::tools::ToString(e.what()));
}
void FairMQChannel::InitTransport(shared_ptr<FairMQTransportFactory> factory)
@ -684,7 +564,7 @@ void FairMQChannel::InitTransport(shared_ptr<FairMQTransportFactory> factory)
void FairMQChannel::ResetChannel()
{
unique_lock<mutex> lock(fChannelMutex);
lock_guard<mutex> lock(fChannelMutex);
fIsValid = false;
// TODO: implement channel resetting
}

View File

@ -14,6 +14,7 @@
#include <vector>
#include <atomic>
#include <mutex>
#include <stdexcept>
#include <FairMQTransportFactory.h>
#include <FairMQSocket.h>
@ -51,6 +52,8 @@ class FairMQChannel
/// Default destructor
virtual ~FairMQChannel();
struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; };
FairMQSocket& GetSocket() const;
auto Bind(const std::string& address) -> bool

View File

@ -16,7 +16,6 @@
#include <list>
#include <cstdlib>
#include <stdexcept>
#include <random>
#include <chrono>
#include <mutex>
@ -56,9 +55,6 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
, fInternalConfig(config ? nullptr : fair::mq::tools::make_unique<FairMQProgOptions>())
, fConfig(config ? config : fInternalConfig.get())
, fId()
, fInitialValidationFinished(false)
, fInitialValidationCondition()
, fInitialValidationMutex()
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fDefaultTransportType(fair::mq::Transport::DEFAULT)
@ -76,6 +72,7 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
, fInterrupted(false)
, fInterruptedCV()
, fInterruptedMtx()
, fRateLogging(true)
{
}
@ -86,13 +83,12 @@ void FairMQDevice::InitWrapper()
fPortRangeMin = fConfig->GetValue<int>("port-range-min");
fPortRangeMax = fConfig->GetValue<int>("port-range-max");
try
{
try {
fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue<string>("transport"));
}
catch (const exception& e)
{
} catch (const exception& e) {
LOG(error) << "exception: " << e.what();
LOG(error) << "invalid transport type provided: " << fConfig->GetValue<string>("transport");
throw;
}
for (auto& c : fConfig->GetFairMQMap())
@ -159,9 +155,8 @@ void FairMQDevice::InitWrapper()
}
else
{
LOG(error) << "Cannot update configuration. Socket method (bind/connect) not specified.";
ChangeState(ERROR_FOUND);
// throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified.");
LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi->fName << "' not specified.";
throw runtime_error(fair::mq::tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi->fName, " not specified."));
}
// }
}
@ -174,19 +169,11 @@ void FairMQDevice::InitWrapper()
if (!uninitializedBindingChannels.empty())
{
LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete.";
ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete."));
}
CallStateChangeCallbacks(INITIALIZING_DEVICE);
// notify parent thread about completion of first validation.
{
lock_guard<mutex> lock(fInitialValidationMutex);
fInitialValidationFinished = true;
fInitialValidationCondition.notify_one();
}
int initializationTimeoutInS = fConfig->GetValue<int>("initialization-timeout");
// go over the list of channels until all are initialized (and removed from the uninitialized list)
@ -200,8 +187,6 @@ void FairMQDevice::InitWrapper()
{
this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS));
if (fConfig != nullptr)
{
for (auto& chan : uninitializedConnectingChannels)
{
string key{"chans." + chan->GetChannelPrefix() + "." + chan->GetChannelIndex() + ".address"};
@ -211,13 +196,11 @@ void FairMQDevice::InitWrapper()
chan->UpdateAddress(newAddress);
}
}
}
if (numAttempts++ > maxAttempts)
{
LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts";
ChangeState(ERROR_FOUND);
// throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts"));
}
AttachChannels(uninitializedConnectingChannels);
@ -225,13 +208,11 @@ void FairMQDevice::InitWrapper()
Init();
ChangeState(internal_DEVICE_READY);
}
if (fChannels.empty()) {
LOG(warn) << "No channels created after finishing initialization";
}
void FairMQDevice::WaitForInitialValidation()
{
unique_lock<mutex> lock(fInitialValidationMutex);
fInitialValidationCondition.wait(lock, [&] () { return fInitialValidationFinished; });
ChangeState(internal_DEVICE_READY);
}
void FairMQDevice::Init()
@ -382,12 +363,9 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
if (newAddress != ch.fAddress)
{
ch.UpdateAddress(newAddress);
if (fConfig)
{
string key{"chans." + ch.GetChannelPrefix() + "." + ch.GetChannelIndex() + ".address"};
fConfig->SetValue(key, newAddress);
}
}
return true;
}
@ -495,6 +473,7 @@ void FairMQDevice::RunWrapper()
LOG(info) << "DEVICE: Running...";
// start the rate logger thread
fRateLogging = true;
future<void> rateLogger = async(launch::async, &FairMQDevice::LogSocketRates, this);
// notify transports to resume transfers
@ -507,8 +486,7 @@ void FairMQDevice::RunWrapper()
t.second->Resume();
}
try
{
try {
PreRun();
// process either data callbacks or ConditionalRun/Run
@ -538,11 +516,14 @@ void FairMQDevice::RunWrapper()
Run();
}
}
catch (const out_of_range& oor)
{
} catch (const out_of_range& oor) {
LOG(error) << "out of range: " << oor.what();
LOG(error) << "incorrect/incomplete channel configuration?";
fRateLogging = false;
throw;
} catch (...) {
fRateLogging = false;
throw;
}
// if Run() exited and the state is still RUNNING, transition to READY.
@ -721,7 +702,7 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const
catch (exception& e)
{
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
ChangeState(ERROR_FOUND);
throw runtime_error(fair::mq::tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
}
}
@ -877,7 +858,7 @@ void FairMQDevice::LogSocketRates()
LOG(debug) << "<channel>: in: <#msgs> (<MB>) out: <#msgs> (<MB>)";
while (CheckCurrentState(RUNNING))
while (fRateLogging)
{
t1 = chrono::high_resolution_clock::now();
@ -931,6 +912,7 @@ void FairMQDevice::Unblock()
{
lock_guard<mutex> guard(fInterruptedMtx);
fInterrupted = true;
fRateLogging = false;
}
fInterruptedCV.notify_all();
}
@ -977,11 +959,6 @@ void FairMQDevice::Reset()
{
}
const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const int index) const
{
return fChannels.at(channelName).at(index);
}
void FairMQDevice::Exit()
{
}

View File

@ -31,6 +31,7 @@
#include <functional>
#include <assert.h> // static_assert
#include <type_traits> // is_trivially_copyable
#include <stdexcept>
#include <mutex>
#include <condition_variable>
@ -102,9 +103,9 @@ class FairMQDevice : public FairMQStateMachine
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
int Send(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1)
int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return fChannels.at(chan).at(i).Send(msg, sndTimeoutInMs);
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
}
/// Shorthand method to receive `msg` on `chan` at index `i`
@ -113,18 +114,18 @@ class FairMQDevice : public FairMQStateMachine
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1)
int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return fChannels.at(chan).at(i).Receive(msg, rcvTimeoutInMs);
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
}
int SendAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, \"channelA\", subchannelIndex, timeout);")))
int SendAsync(FairMQMessagePtr& msg, const std::string& channel, const int index = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, \"channelA\", subchannelIndex, timeout);")))
{
return fChannels.at(chan).at(i).Send(msg, 0);
return GetChannel(channel, index).Send(msg, 0);
}
int ReceiveAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, \"channelA\", subchannelIndex, timeout);")))
int ReceiveAsync(FairMQMessagePtr& msg, const std::string& channel, const int index = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, \"channelA\", subchannelIndex, timeout);")))
{
return fChannels.at(chan).at(i).Receive(msg, 0);
return GetChannel(channel, index).Receive(msg, 0);
}
/// Shorthand method to send FairMQParts on `chan` at index `i`
@ -133,9 +134,9 @@ class FairMQDevice : public FairMQStateMachine
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error.
int64_t Send(FairMQParts& parts, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1)
int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return fChannels.at(chan).at(i).Send(parts.fParts, sndTimeoutInMs);
return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
}
/// Shorthand method to receive FairMQParts on `chan` at index `i`
@ -144,18 +145,18 @@ class FairMQDevice : public FairMQStateMachine
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error.
int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1)
int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return fChannels.at(chan).at(i).Receive(parts.fParts, rcvTimeoutInMs);
return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);
}
int64_t SendAsync(FairMQParts& parts, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, \"channelA\", subchannelIndex, timeout);")))
int64_t SendAsync(FairMQParts& parts, const std::string& channel, const int index = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, \"channelA\", subchannelIndex, timeout);")))
{
return fChannels.at(chan).at(i).Send(parts.fParts, 0);
return GetChannel(channel, index).Send(parts.fParts, 0);
}
int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, \"channelA\", subchannelIndex, timeout);")))
int64_t ReceiveAsync(FairMQParts& parts, const std::string& channel, const int index = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, \"channelA\", subchannelIndex, timeout);")))
{
return fChannels.at(chan).at(i).Receive(parts.fParts, 0);
return GetChannel(channel, index).Receive(parts.fParts, 0);
}
/// @brief Getter for default transport factory
@ -173,7 +174,7 @@ class FairMQDevice : public FairMQStateMachine
template<typename... Args>
FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args)
{
return fChannels.at(channel).at(index).NewMessage(std::forward<Args>(args)...);
return GetChannel(channel, index).NewMessage(std::forward<Args>(args)...);
}
template<typename T>
@ -185,7 +186,7 @@ class FairMQDevice : public FairMQStateMachine
template<typename T>
FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data)
{
return fChannels.at(channel).at(index).NewStaticMessage(data);
return GetChannel(channel, index).NewStaticMessage(data);
}
template<typename T>
@ -197,7 +198,7 @@ class FairMQDevice : public FairMQStateMachine
template<typename T>
FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data)
{
return fChannels.at(channel).at(index).NewSimpleMessage(data);
return GetChannel(channel, index).NewSimpleMessage(data);
}
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size)
@ -207,7 +208,7 @@ class FairMQDevice : public FairMQStateMachine
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr)
{
return fChannels.at(channel).at(index).Transport()->CreateUnmanagedRegion(size, callback);
return GetChannel(channel, index).Transport()->CreateUnmanagedRegion(size, callback);
}
template<typename ...Ts>
@ -218,19 +219,19 @@ class FairMQDevice : public FairMQStateMachine
// if more than one channel provided, check compatibility
if (chans.size() > 1)
{
fair::mq::Transport type = fChannels.at(chans.at(0)).at(0).Transport()->GetType();
fair::mq::Transport type = GetChannel(chans.at(0), 0).Transport()->GetType();
for (unsigned int i = 1; i < chans.size(); ++i)
{
if (type != fChannels.at(chans.at(i)).at(0).Transport()->GetType())
if (type != GetChannel(chans.at(i), 0).Transport()->GetType())
{
LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
ChangeState(ERROR_FOUND);
throw std::runtime_error("poller failed: different transports within same poller are not yet supported.");
}
}
}
return fChannels.at(chans.at(0)).at(0).Transport()->CreatePoller(fChannels, chans);
return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans);
}
FairMQPollerPtr NewPoller(const std::vector<FairMQChannel*>& channels)
@ -245,7 +246,7 @@ class FairMQDevice : public FairMQStateMachine
if (type != channels.at(i)->Transport()->GetType())
{
LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
ChangeState(ERROR_FOUND);
throw std::runtime_error("poller failed: different transports within same poller are not yet supported.");
}
}
}
@ -254,7 +255,7 @@ class FairMQDevice : public FairMQStateMachine
}
/// Waits for the first initialization run to finish
void WaitForInitialValidation();
void WaitForInitialValidation() __attribute__((deprecated("This method will have no effect in future versions and will be removed. Instead subscribe for state changes and inspect configuration values."))) {}
/// Adds a transport to the device if it doesn't exist
/// @param transport Transport string ("zeromq"/"nanomsg"/"shmem")
@ -273,6 +274,7 @@ class FairMQDevice : public FairMQStateMachine
/// @param rhs Left hand side value for comparison
static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs);
// overload to easily bind member functions
template<typename T>
void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index))
{
@ -299,6 +301,7 @@ class FairMQDevice : public FairMQStateMachine
}
}
// overload to easily bind member functions
template<typename T>
void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index))
{
@ -325,7 +328,15 @@ class FairMQDevice : public FairMQStateMachine
}
}
const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const;
FairMQChannel& GetChannel(const std::string& channelName, const int index = 0)
try {
return fChannels.at(channelName).at(index);
} catch (const std::out_of_range& oor) {
LOG(error) << "out of range: " << oor.what();
LOG(error) << "requested channel has not been configured? check channel names/configuration.";
fRateLogging = false;
throw;
}
virtual void RegisterChannelEndpoints() {}
@ -443,11 +454,6 @@ class FairMQDevice : public FairMQStateMachine
virtual void Reset();
private:
// condition variable to notify parent thread about end of initial validation.
bool fInitialValidationFinished;
std::condition_variable fInitialValidationCondition;
std::mutex fInitialValidationMutex;
int fPortRangeMin; ///< Minimum value for the port range (if dynamic)
int fPortRangeMax; ///< Maximum value for the port range (if dynamic)
@ -511,6 +517,7 @@ class FairMQDevice : public FairMQStateMachine
std::atomic<bool> fInterrupted;
std::condition_variable fInterruptedCV;
std::mutex fInterruptedMtx;
mutable std::atomic<bool> fRateLogging;
};
#endif /* FAIRMQDEVICE_H_ */