From 3b5b2b501f7cf5c803ab8084ccee9206451cdd62 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 31 Oct 2018 14:32:31 +0100 Subject: [PATCH] Use exceptions for fatal errors in device/channel - These will be caught by StateMachine::ProcessWork and lead to error state. - Solve issue where device goes into ready state if it encounters misconfigured channel in the Run. - deprecate WaitForInitialValidation(). --- fairmq/FairMQChannel.cxx | 768 +++++++++++++++++---------------------- fairmq/FairMQChannel.h | 3 + fairmq/FairMQDevice.cxx | 85 ++--- fairmq/FairMQDevice.h | 71 ++-- 4 files changed, 397 insertions(+), 530 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 2cbcd420..e0e9e157 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -13,6 +13,7 @@ */ #include "FairMQChannel.h" +#include #include // join/split @@ -156,524 +157,403 @@ string FairMQChannel::GetChannelIndex() const } string FairMQChannel::GetType() const -{ - try - { - unique_lock lock(fChannelMutex); - return fType; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fType; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetType: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } string FairMQChannel::GetMethod() const -{ - try - { - unique_lock lock(fChannelMutex); - return fMethod; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fMethod; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetMethod: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } string FairMQChannel::GetAddress() const -{ - try - { - unique_lock lock(fChannelMutex); - return fAddress; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fAddress; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetAddress: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } string FairMQChannel::GetTransportName() const -{ - try - { - unique_lock lock(fChannelMutex); - return fair::mq::TransportNames.at(fTransportType); - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fair::mq::TransportNames.at(fTransportType); +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetTransportName: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetSndBufSize() const -{ - try - { - unique_lock lock(fChannelMutex); - return fSndBufSize; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fSndBufSize; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetRcvBufSize() const -{ - try - { - unique_lock lock(fChannelMutex); - return fRcvBufSize; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fRcvBufSize; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetSndKernelSize() const -{ - try - { - unique_lock lock(fChannelMutex); - return fSndKernelSize; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fSndKernelSize; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetRcvKernelSize() const -{ - try - { - unique_lock lock(fChannelMutex); - return fRcvKernelSize; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fRcvKernelSize; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetLinger() const -{ - try - { - unique_lock lock(fChannelMutex); - return fLinger; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fLinger; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } int FairMQChannel::GetRateLogging() const -{ - try - { - unique_lock lock(fChannelMutex); - return fRateLogging; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fRateLogging; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateType(const string& type) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fType = type; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateType: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fType = type; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateType: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateMethod(const string& method) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fMethod = method; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateMethod: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fMethod = method; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateMethod: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateAddress(const string& address) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fAddress = address; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateAddress: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fAddress = address; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateAddress: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateTransport(const string& transport) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fTransportType = fair::mq::TransportTypes.at(transport); - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateTransport: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fTransportType = fair::mq::TransportTypes.at(transport); + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateTransport: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateSndBufSize(const int sndBufSize) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fSndBufSize = sndBufSize; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateSndBufSize: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fSndBufSize = sndBufSize; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateSndBufSize: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fRcvBufSize = rcvBufSize; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateRcvBufSize: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fRcvBufSize = rcvBufSize; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateRcvBufSize: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fSndKernelSize = sndKernelSize; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fSndKernelSize = sndKernelSize; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fRcvKernelSize = rcvKernelSize; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fRcvKernelSize = rcvKernelSize; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateLinger(const int duration) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fLinger = duration; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateLinger: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fLinger = duration; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateLinger: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateRateLogging(const int rateLogging) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fRateLogging = rateLogging; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateRateLogging: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fRateLogging = rateLogging; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateRateLogging: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } auto FairMQChannel::SetModified(const bool modified) -> void -{ - try - { - unique_lock lock(fChannelMutex); - fModified = modified; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fModified = modified; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::SetModified: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } void FairMQChannel::UpdateChannelName(const string& name) -{ - try - { - unique_lock lock(fChannelMutex); - fIsValid = false; - fName = name; - fModified = true; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::UpdateChannelName: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + fIsValid = false; + fName = name; + fModified = true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::UpdateChannelName: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } bool FairMQChannel::IsValid() const -{ - try - { - unique_lock lock(fChannelMutex); - return fIsValid; - } - catch (exception& e) - { - LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what(); - exit(EXIT_FAILURE); - } +try { + lock_guard lock(fChannelMutex); + return fIsValid; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::IsValid: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString("failed to acquire lock: ", e.what())); } bool FairMQChannel::ValidateChannel() -{ - try +try { + lock_guard lock(fChannelMutex); + stringstream ss; + ss << "Validating channel '" << fName << "'... "; + + if (fIsValid) { - unique_lock lock(fChannelMutex); - - stringstream ss; - ss << "Validating channel \"" << fName << "\"... "; - - if (fIsValid) - { - ss << "ALREADY VALID"; - LOG(debug) << ss.str(); - return true; - } - - // validate socket type - const string socketTypeNames[] = { "sub", "pub", "pull", "push", "req", "rep", "xsub", "xpub", "dealer", "router", "pair" }; - const set socketTypes(socketTypeNames, socketTypeNames + sizeof(socketTypeNames) / sizeof(string)); - if (socketTypes.find(fType) == socketTypes.end()) - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "Invalid channel type: \"" << fType << "\""; - exit(EXIT_FAILURE); - } - - // validate socket address - if (fAddress == "unspecified" || fAddress == "") - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(debug) << "invalid channel address: \"" << fAddress << "\""; - return false; - } - else - { - vector endpoints; - boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(";")); - for (const auto endpoint : endpoints) - { - string address; - if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>') - { - address = endpoint.substr(1); - } - else - { - // we don't have a method modifier, check if the default method is set - const string socketMethodNames[] = { "bind", "connect" }; - const set socketMethods(socketMethodNames, socketMethodNames + sizeof(socketMethodNames) / sizeof(string)); - if (socketMethods.find(fMethod) == socketMethods.end()) - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "Invalid endpoint connection method: \"" << fMethod << "\" for " << endpoint; - exit(EXIT_FAILURE); - } - address = endpoint; - } - // check if address is a tcp or ipc address - if (address.compare(0, 6, "tcp://") == 0) - { - // check if TCP address contains port delimiter - string addressString = address.substr(6); - if (addressString.find(':') == string::npos) - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid channel address: \"" << address << "\" (missing port?)"; - return false; - } - } - else if (address.compare(0, 6, "ipc://") == 0) - { - // check if IPC address is not empty - string addressString = address.substr(6); - if (addressString == "") - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid channel address: \"" << address << "\" (empty IPC address?)"; - return false; - } - } - else if (address.compare(0, 9, "inproc://") == 0) - { - // check if IPC address is not empty - string addressString = address.substr(9); - if (addressString == "") - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid channel address: \"" << address << "\" (empty inproc address?)"; - return false; - } - } - else if (address.compare(0, 8, "verbs://") == 0) - { - // check if IPC address is not empty - string addressString = address.substr(9); - if (addressString == "") - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid channel address: \"" << address << "\" (empty verbs address?)"; - return false; - } - } - else - { - // if neither TCP or IPC is specified, return invalid - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid channel address: \"" << address << "\" (missing protocol specifier?)"; - return false; - } - } - } - - // validate socket buffer size for sending - if (fSndBufSize < 0) - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid channel send buffer size (cannot be negative): \"" << fSndBufSize << "\""; - exit(EXIT_FAILURE); - } - - // validate socket buffer size for receiving - if (fRcvBufSize < 0) - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid channel receive buffer size (cannot be negative): \"" << fRcvBufSize << "\""; - exit(EXIT_FAILURE); - } - - // validate socket kernel transmit size for sending - if (fSndKernelSize < 0) - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid channel send kernel transmit size (cannot be negative): \"" << fSndKernelSize << "\""; - exit(EXIT_FAILURE); - } - - // validate socket kernel transmit size for receiving - if (fRcvKernelSize < 0) - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid channel receive kernel transmit size (cannot be negative): \"" << fRcvKernelSize << "\""; - exit(EXIT_FAILURE); - } - - // validate socket rate logging interval - if (fRateLogging < 0) - { - ss << "INVALID"; - LOG(debug) << ss.str(); - LOG(error) << "invalid socket rate logging interval (cannot be negative): \"" << fRateLogging << "\""; - exit(EXIT_FAILURE); - } - - fIsValid = true; - ss << "VALID"; + ss << "ALREADY VALID"; LOG(debug) << ss.str(); return true; } - catch (exception& e) + + // validate socket type + const string socketTypeNames[] = { "sub", "pub", "pull", "push", "req", "rep", "xsub", "xpub", "dealer", "router", "pair" }; + const set socketTypes(socketTypeNames, socketTypeNames + sizeof(socketTypeNames) / sizeof(string)); + if (socketTypes.find(fType) == socketTypes.end()) { - LOG(error) << "Exception caught in FairMQChannel::ValidateChannel: " << e.what(); - exit(EXIT_FAILURE); + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "Invalid channel type: '" << fType << "'"; + throw ChannelConfigurationError(fair::mq::tools::ToString("Invalid channel type: '", fType, "'")); } + + // validate socket address + if (fAddress == "unspecified" || fAddress == "") + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(debug) << "invalid channel address: '" << fAddress << "'"; + return false; + } + else + { + vector endpoints; + boost::algorithm::split(endpoints, fAddress, boost::algorithm::is_any_of(";")); + for (const auto endpoint : endpoints) + { + string address; + if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>') + { + address = endpoint.substr(1); + } + else + { + // we don't have a method modifier, check if the default method is set + const string socketMethodNames[] = { "bind", "connect" }; + const set socketMethods(socketMethodNames, socketMethodNames + sizeof(socketMethodNames) / sizeof(string)); + if (socketMethods.find(fMethod) == socketMethods.end()) + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "Invalid endpoint connection method: '" << fMethod << "' for " << endpoint; + throw ChannelConfigurationError(fair::mq::tools::ToString("Invalid endpoint connection method: '", fMethod, "' for ", endpoint)); + } + address = endpoint; + } + // check if address is a tcp or ipc address + if (address.compare(0, 6, "tcp://") == 0) + { + // check if TCP address contains port delimiter + string addressString = address.substr(6); + if (addressString.find(':') == string::npos) + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel address: '" << address << "' (missing port?)"; + return false; + } + } + else if (address.compare(0, 6, "ipc://") == 0) + { + // check if IPC address is not empty + string addressString = address.substr(6); + if (addressString == "") + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel address: '" << address << "' (empty IPC address?)"; + return false; + } + } + else if (address.compare(0, 9, "inproc://") == 0) + { + // check if IPC address is not empty + string addressString = address.substr(9); + if (addressString == "") + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel address: '" << address << "' (empty inproc address?)"; + return false; + } + } + else if (address.compare(0, 8, "verbs://") == 0) + { + // check if IPC address is not empty + string addressString = address.substr(9); + if (addressString == "") + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel address: '" << address << "' (empty verbs address?)"; + return false; + } + } + else + { + // if neither TCP or IPC is specified, return invalid + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel address: '" << address << "' (missing protocol specifier?)"; + return false; + } + } + } + + // validate socket buffer size for sending + if (fSndBufSize < 0) + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel send buffer size (cannot be negative): '" << fSndBufSize << "'"; + throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel send buffer size (cannot be negative): '", fSndBufSize, "'")); + } + + // validate socket buffer size for receiving + if (fRcvBufSize < 0) + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel receive buffer size (cannot be negative): '" << fRcvBufSize << "'"; + throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel receive buffer size (cannot be negative): '", fRcvBufSize, "'")); + } + + // validate socket kernel transmit size for sending + if (fSndKernelSize < 0) + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel send kernel transmit size (cannot be negative): '" << fSndKernelSize << "'"; + throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel send kernel transmit size (cannot be negative): '", fSndKernelSize, "'")); + } + + // validate socket kernel transmit size for receiving + if (fRcvKernelSize < 0) + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid channel receive kernel transmit size (cannot be negative): '" << fRcvKernelSize << "'"; + throw ChannelConfigurationError(fair::mq::tools::ToString("invalid channel receive kernel transmit size (cannot be negative): '", fRcvKernelSize, "'")); + } + + // validate socket rate logging interval + if (fRateLogging < 0) + { + ss << "INVALID"; + LOG(debug) << ss.str(); + LOG(error) << "invalid socket rate logging interval (cannot be negative): '" << fRateLogging << "'"; + throw ChannelConfigurationError(fair::mq::tools::ToString("invalid socket rate logging interval (cannot be negative): '", fRateLogging, "'")); + } + + fIsValid = true; + ss << "VALID"; + LOG(debug) << ss.str(); + return true; +} catch (exception& e) { + LOG(error) << "Exception caught in FairMQChannel::ValidateChannel: " << e.what(); + throw ChannelConfigurationError(fair::mq::tools::ToString(e.what())); } void FairMQChannel::InitTransport(shared_ptr factory) @@ -684,7 +564,7 @@ void FairMQChannel::InitTransport(shared_ptr factory) void FairMQChannel::ResetChannel() { - unique_lock lock(fChannelMutex); + lock_guard lock(fChannelMutex); fIsValid = false; // TODO: implement channel resetting } diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 81d509a1..a6bf3a31 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -51,6 +52,8 @@ class FairMQChannel /// Default destructor virtual ~FairMQChannel(); + struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; }; + FairMQSocket& GetSocket() const; auto Bind(const std::string& address) -> bool diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 5219c812..3e08f29b 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -16,7 +16,6 @@ #include #include -#include #include #include #include @@ -56,9 +55,6 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver , fInternalConfig(config ? nullptr : fair::mq::tools::make_unique()) , fConfig(config ? config : fInternalConfig.get()) , fId() - , fInitialValidationFinished(false) - , fInitialValidationCondition() - , fInitialValidationMutex() , fPortRangeMin(22000) , fPortRangeMax(32000) , fDefaultTransportType(fair::mq::Transport::DEFAULT) @@ -76,6 +72,7 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver , fInterrupted(false) , fInterruptedCV() , fInterruptedMtx() + , fRateLogging(true) { } @@ -86,13 +83,12 @@ void FairMQDevice::InitWrapper() fPortRangeMin = fConfig->GetValue("port-range-min"); fPortRangeMax = fConfig->GetValue("port-range-max"); - try - { + try { fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetValue("transport")); - } - catch (const exception& e) - { + } catch (const exception& e) { + LOG(error) << "exception: " << e.what(); LOG(error) << "invalid transport type provided: " << fConfig->GetValue("transport"); + throw; } for (auto& c : fConfig->GetFairMQMap()) @@ -159,9 +155,8 @@ void FairMQDevice::InitWrapper() } else { - LOG(error) << "Cannot update configuration. Socket method (bind/connect) not specified."; - ChangeState(ERROR_FOUND); - // throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified."); + LOG(error) << "Cannot update configuration. Socket method (bind/connect) for channel '" << vi->fName << "' not specified."; + throw runtime_error(fair::mq::tools::ToString("Cannot update configuration. Socket method (bind/connect) for channel ", vi->fName, " not specified.")); } // } } @@ -174,19 +169,11 @@ void FairMQDevice::InitWrapper() if (!uninitializedBindingChannels.empty()) { LOG(error) << uninitializedBindingChannels.size() << " of the binding channels could not initialize. Initial configuration incomplete."; - ChangeState(ERROR_FOUND); - // throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); + throw runtime_error(fair::mq::tools::ToString(uninitializedBindingChannels.size(), " of the binding channels could not initialize. Initial configuration incomplete.")); } CallStateChangeCallbacks(INITIALIZING_DEVICE); - // notify parent thread about completion of first validation. - { - lock_guard lock(fInitialValidationMutex); - fInitialValidationFinished = true; - fInitialValidationCondition.notify_one(); - } - int initializationTimeoutInS = fConfig->GetValue("initialization-timeout"); // go over the list of channels until all are initialized (and removed from the uninitialized list) @@ -200,24 +187,20 @@ void FairMQDevice::InitWrapper() { this_thread::sleep_for(chrono::milliseconds(sleepTimeInMS)); - if (fConfig != nullptr) + for (auto& chan : uninitializedConnectingChannels) { - for (auto& chan : uninitializedConnectingChannels) + string key{"chans." + chan->GetChannelPrefix() + "." + chan->GetChannelIndex() + ".address"}; + string newAddress = fConfig->GetValue(key); + if (newAddress != chan->GetAddress()) { - string key{"chans." + chan->GetChannelPrefix() + "." + chan->GetChannelIndex() + ".address"}; - string newAddress = fConfig->GetValue(key); - if (newAddress != chan->GetAddress()) - { - chan->UpdateAddress(newAddress); - } + chan->UpdateAddress(newAddress); } } if (numAttempts++ > maxAttempts) { LOG(error) << "could not connect all channels after " << initializationTimeoutInS << " attempts"; - ChangeState(ERROR_FOUND); - // throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts")); + throw runtime_error(fair::mq::tools::ToString("could not connect all channels after ", initializationTimeoutInS, " attempts")); } AttachChannels(uninitializedConnectingChannels); @@ -225,13 +208,11 @@ void FairMQDevice::InitWrapper() Init(); - ChangeState(internal_DEVICE_READY); -} + if (fChannels.empty()) { + LOG(warn) << "No channels created after finishing initialization"; + } -void FairMQDevice::WaitForInitialValidation() -{ - unique_lock lock(fInitialValidationMutex); - fInitialValidationCondition.wait(lock, [&] () { return fInitialValidationFinished; }); + ChangeState(internal_DEVICE_READY); } void FairMQDevice::Init() @@ -382,11 +363,8 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch) if (newAddress != ch.fAddress) { ch.UpdateAddress(newAddress); - if (fConfig) - { - string key{"chans." + ch.GetChannelPrefix() + "." + ch.GetChannelIndex() + ".address"}; - fConfig->SetValue(key, newAddress); - } + string key{"chans." + ch.GetChannelPrefix() + "." + ch.GetChannelIndex() + ".address"}; + fConfig->SetValue(key, newAddress); } return true; @@ -495,6 +473,7 @@ void FairMQDevice::RunWrapper() LOG(info) << "DEVICE: Running..."; // start the rate logger thread + fRateLogging = true; future rateLogger = async(launch::async, &FairMQDevice::LogSocketRates, this); // notify transports to resume transfers @@ -507,8 +486,7 @@ void FairMQDevice::RunWrapper() t.second->Resume(); } - try - { + try { PreRun(); // process either data callbacks or ConditionalRun/Run @@ -538,11 +516,14 @@ void FairMQDevice::RunWrapper() Run(); } - } - catch (const out_of_range& oor) - { + } catch (const out_of_range& oor) { LOG(error) << "out of range: " << oor.what(); LOG(error) << "incorrect/incomplete channel configuration?"; + fRateLogging = false; + throw; + } catch (...) { + fRateLogging = false; + throw; } // if Run() exited and the state is still RUNNING, transition to READY. @@ -721,7 +702,7 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const catch (exception& e) { LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state."; - ChangeState(ERROR_FOUND); + throw runtime_error(fair::mq::tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state.")); } } @@ -877,7 +858,7 @@ void FairMQDevice::LogSocketRates() LOG(debug) << ": in: <#msgs> () out: <#msgs> ()"; - while (CheckCurrentState(RUNNING)) + while (fRateLogging) { t1 = chrono::high_resolution_clock::now(); @@ -931,6 +912,7 @@ void FairMQDevice::Unblock() { lock_guard guard(fInterruptedMtx); fInterrupted = true; + fRateLogging = false; } fInterruptedCV.notify_all(); } @@ -977,11 +959,6 @@ void FairMQDevice::Reset() { } -const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const int index) const -{ - return fChannels.at(channelName).at(index); -} - void FairMQDevice::Exit() { } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 76c05e29..5b237b1c 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -31,6 +31,7 @@ #include #include // static_assert #include // is_trivially_copyable +#include #include #include @@ -102,9 +103,9 @@ class FairMQDevice : public FairMQStateMachine /// @param i channel index /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. - int Send(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) + int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) { - return fChannels.at(chan).at(i).Send(msg, sndTimeoutInMs); + return GetChannel(channel, index).Send(msg, sndTimeoutInMs); } /// Shorthand method to receive `msg` on `chan` at index `i` @@ -113,18 +114,18 @@ class FairMQDevice : public FairMQStateMachine /// @param i channel index /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. - int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) + int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) { - return fChannels.at(chan).at(i).Receive(msg, rcvTimeoutInMs); + return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs); } - int SendAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, \"channelA\", subchannelIndex, timeout);"))) + int SendAsync(FairMQMessagePtr& msg, const std::string& channel, const int index = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(msg, \"channelA\", subchannelIndex, timeout);"))) { - return fChannels.at(chan).at(i).Send(msg, 0); + return GetChannel(channel, index).Send(msg, 0); } - int ReceiveAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, \"channelA\", subchannelIndex, timeout);"))) + int ReceiveAsync(FairMQMessagePtr& msg, const std::string& channel, const int index = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(msg, \"channelA\", subchannelIndex, timeout);"))) { - return fChannels.at(chan).at(i).Receive(msg, 0); + return GetChannel(channel, index).Receive(msg, 0); } /// Shorthand method to send FairMQParts on `chan` at index `i` @@ -133,9 +134,9 @@ class FairMQDevice : public FairMQStateMachine /// @param i channel index /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. -1 if there was an error. - int64_t Send(FairMQParts& parts, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) + int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) { - return fChannels.at(chan).at(i).Send(parts.fParts, sndTimeoutInMs); + return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs); } /// Shorthand method to receive FairMQParts on `chan` at index `i` @@ -144,18 +145,18 @@ class FairMQDevice : public FairMQStateMachine /// @param i channel index /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) /// @return Number of bytes that have been received. -2 if reading from the queue was not possible or timed out. -1 if there was an error. - int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) + int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) { - return fChannels.at(chan).at(i).Receive(parts.fParts, rcvTimeoutInMs); + return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs); } - int64_t SendAsync(FairMQParts& parts, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, \"channelA\", subchannelIndex, timeout);"))) + int64_t SendAsync(FairMQParts& parts, const std::string& channel, const int index = 0) __attribute__((deprecated("For non-blocking Send, use timeout version with timeout of 0: Send(parts, \"channelA\", subchannelIndex, timeout);"))) { - return fChannels.at(chan).at(i).Send(parts.fParts, 0); + return GetChannel(channel, index).Send(parts.fParts, 0); } - int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, \"channelA\", subchannelIndex, timeout);"))) + int64_t ReceiveAsync(FairMQParts& parts, const std::string& channel, const int index = 0) __attribute__((deprecated("For non-blocking Receive, use timeout version with timeout of 0: Receive(parts, \"channelA\", subchannelIndex, timeout);"))) { - return fChannels.at(chan).at(i).Receive(parts.fParts, 0); + return GetChannel(channel, index).Receive(parts.fParts, 0); } /// @brief Getter for default transport factory @@ -173,7 +174,7 @@ class FairMQDevice : public FairMQStateMachine template FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) { - return fChannels.at(channel).at(index).NewMessage(std::forward(args)...); + return GetChannel(channel, index).NewMessage(std::forward(args)...); } template @@ -185,7 +186,7 @@ class FairMQDevice : public FairMQStateMachine template FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) { - return fChannels.at(channel).at(index).NewStaticMessage(data); + return GetChannel(channel, index).NewStaticMessage(data); } template @@ -197,7 +198,7 @@ class FairMQDevice : public FairMQStateMachine template FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) { - return fChannels.at(channel).at(index).NewSimpleMessage(data); + return GetChannel(channel, index).NewSimpleMessage(data); } FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size) @@ -207,7 +208,7 @@ class FairMQDevice : public FairMQStateMachine FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr) { - return fChannels.at(channel).at(index).Transport()->CreateUnmanagedRegion(size, callback); + return GetChannel(channel, index).Transport()->CreateUnmanagedRegion(size, callback); } template @@ -218,19 +219,19 @@ class FairMQDevice : public FairMQStateMachine // if more than one channel provided, check compatibility if (chans.size() > 1) { - fair::mq::Transport type = fChannels.at(chans.at(0)).at(0).Transport()->GetType(); + fair::mq::Transport type = GetChannel(chans.at(0), 0).Transport()->GetType(); for (unsigned int i = 1; i < chans.size(); ++i) { - if (type != fChannels.at(chans.at(i)).at(0).Transport()->GetType()) + if (type != GetChannel(chans.at(i), 0).Transport()->GetType()) { LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state."; - ChangeState(ERROR_FOUND); + throw std::runtime_error("poller failed: different transports within same poller are not yet supported."); } } } - return fChannels.at(chans.at(0)).at(0).Transport()->CreatePoller(fChannels, chans); + return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans); } FairMQPollerPtr NewPoller(const std::vector& channels) @@ -245,7 +246,7 @@ class FairMQDevice : public FairMQStateMachine if (type != channels.at(i)->Transport()->GetType()) { LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state."; - ChangeState(ERROR_FOUND); + throw std::runtime_error("poller failed: different transports within same poller are not yet supported."); } } } @@ -254,7 +255,7 @@ class FairMQDevice : public FairMQStateMachine } /// Waits for the first initialization run to finish - void WaitForInitialValidation(); + void WaitForInitialValidation() __attribute__((deprecated("This method will have no effect in future versions and will be removed. Instead subscribe for state changes and inspect configuration values."))) {} /// Adds a transport to the device if it doesn't exist /// @param transport Transport string ("zeromq"/"nanomsg"/"shmem") @@ -273,6 +274,7 @@ class FairMQDevice : public FairMQStateMachine /// @param rhs Left hand side value for comparison static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); + // overload to easily bind member functions template void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index)) { @@ -299,6 +301,7 @@ class FairMQDevice : public FairMQStateMachine } } + // overload to easily bind member functions template void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index)) { @@ -325,7 +328,15 @@ class FairMQDevice : public FairMQStateMachine } } - const FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) const; + FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) + try { + return fChannels.at(channelName).at(index); + } catch (const std::out_of_range& oor) { + LOG(error) << "out of range: " << oor.what(); + LOG(error) << "requested channel has not been configured? check channel names/configuration."; + fRateLogging = false; + throw; + } virtual void RegisterChannelEndpoints() {} @@ -443,11 +454,6 @@ class FairMQDevice : public FairMQStateMachine virtual void Reset(); private: - // condition variable to notify parent thread about end of initial validation. - bool fInitialValidationFinished; - std::condition_variable fInitialValidationCondition; - std::mutex fInitialValidationMutex; - int fPortRangeMin; ///< Minimum value for the port range (if dynamic) int fPortRangeMax; ///< Maximum value for the port range (if dynamic) @@ -511,6 +517,7 @@ class FairMQDevice : public FairMQStateMachine std::atomic fInterrupted; std::condition_variable fInterruptedCV; std::mutex fInterruptedMtx; + mutable std::atomic fRateLogging; }; #endif /* FAIRMQDEVICE_H_ */