Enable new callback API

- OnData() channel data handler.
 - ConditionalRun() for devices without incoming data.
 - Header file with common main(), to be extended with getDevice/addCustomOptions.
 - Update examples (MQ/Tutorial3) to use the new API and config.
 - NewSimpleMessage() for simpler creation of small messages (additional copy).
 - Replace SetProperty/GetProperty with fConfig access.
 - Runtime configurable channel names for common devices.
 - Configurable logging interval per channel.
 - FairMQMultiplier for distributing same data to multiple outputs.
 - Cleanup state machine messages.
 - Cmd option to toggle signal handling.
 - Simpler API for send/receive timeouts.
 - Enable --log-to-file.
 - Fix coverity issues, warnings.
 - Various code cleanup and minor tweaks.
This commit is contained in:
Alexey Rybalchenko
2016-08-10 09:47:53 +02:00
parent e0a03242ac
commit 16fd63cd5b
54 changed files with 1730 additions and 1665 deletions

View File

@@ -21,6 +21,7 @@
#include <termios.h> // for the InteractiveStateLoop
#include <poll.h>
#include <boost/timer/timer.hpp>
#include <boost/thread.hpp>
#include <boost/random/mersenne_twister.hpp> // for choosing random port in range
#include <boost/random/uniform_int_distribution.hpp> // for choosing random port in range
@@ -47,6 +48,7 @@ static void CallSignalHandler(int signal)
FairMQDevice::FairMQDevice()
: fChannels()
, fConfig(nullptr)
, fId()
, fNetworkInterface()
, fMaxInitializationAttempts(120)
@@ -56,13 +58,15 @@ FairMQDevice::FairMQDevice()
, fLogIntervalInMs(1000)
, fCmdSocket(nullptr)
, fTransportFactory(nullptr)
, fConfig(nullptr)
, fInitialValidationFinished(false)
, fInitialValidationCondition()
, fInitialValidationMutex()
, fCatchingSignals(false)
, fTerminated(false)
, fRunning(false)
, fTerminationRequested(false)
, fInteractiveRunning(false)
, fDataCallbacks(false)
, fMsgInputs()
, fMultipartInputs()
{
}
@@ -81,8 +85,10 @@ void FairMQDevice::SignalHandler(int signal)
{
LOG(INFO) << "Caught signal " << signal;
if (!fTerminated)
if (!fTerminationRequested)
{
fTerminationRequested = true;
ChangeState(STOP);
ChangeState(RESET_TASK);
@@ -94,15 +100,14 @@ void FairMQDevice::SignalHandler(int signal)
ChangeState(END);
// exit(EXIT_FAILURE);
fRunning = false;
fTerminated = true;
fInteractiveRunning = false;
LOG(INFO) << "Exiting.";
}
else
{
LOG(WARN) << "Repeated termination or bad initialization? Aborting.";
// std::abort();
exit(EXIT_FAILURE);
std::abort();
// exit(EXIT_FAILURE);
}
}
@@ -224,13 +229,12 @@ void FairMQDevice::InitWrapper()
// go over the list of channels until all are initialized (and removed from the uninitialized list)
int numAttempts = 0;
int maxAttempts = fMaxInitializationAttempts;
while (!uninitializedConnectingChannels.empty())
{
ConnectChannels(uninitializedConnectingChannels);
if (++numAttempts > maxAttempts)
if (++numAttempts > fMaxInitializationAttempts)
{
LOG(ERROR) << "could not connect all channels after " << maxAttempts << " attempts";
LOG(ERROR) << "could not connect all channels after " << fMaxInitializationAttempts << " attempts";
// TODO: goto ERROR state;
exit(EXIT_FAILURE);
}
@@ -317,58 +321,6 @@ bool FairMQDevice::ConnectChannel(FairMQChannel& ch)
return true;
}
// bool FairMQDevice::InitChannel(FairMQChannel& ch)
// {
// LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")";
// // initialize the socket
// ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId);
// // set high water marks
// ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
// ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
// if (ch.fMethod == "bind")
// {
// // number of attempts when choosing a random port
// int maxAttempts = 1000;
// int numAttempts = 0;
// // initialize random generator
// boost::random::mt19937 gen(getpid());
// boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax);
// LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
// // try to bind to the saved port. In case of failure, try random one.
// if (!ch.fSocket->Bind(ch.fAddress))
// {
// LOG(DEBUG) << "Could not bind to configured port, trying random port in range " << fPortRangeMin << "-" << fPortRangeMax;
// do {
// ++numAttempts;
// if (numAttempts > maxAttempts)
// {
// LOG(ERROR) << "could not bind to any port in the given range after " << maxAttempts << " attempts";
// return false;
// }
// size_t pos = ch.fAddress.rfind(":");
// stringstream newPort;
// newPort << static_cast<int>(randomPort(gen));
// ch.fAddress = ch.fAddress.substr(0, pos + 1) + newPort.str();
// LOG(DEBUG) << "Binding channel " << ch.fChannelName << " on " << ch.fAddress;
// } while (!ch.fSocket->Bind(ch.fAddress));
// }
// }
// else
// {
// LOG(DEBUG) << "Connecting channel " << ch.fChannelName << " to " << ch.fAddress;
// ch.fSocket->Connect(ch.fAddress);
// }
// return true;
// }
void FairMQDevice::InitTaskWrapper()
{
InitTask();
@@ -434,15 +386,119 @@ void FairMQDevice::PrintChannel(const string& name)
}
}
void FairMQDevice::OnData(const string& channelName, InputMsgCallback callback)
{
fDataCallbacks = true;
fMsgInputs.insert(make_pair(channelName, callback));
}
void FairMQDevice::OnData(const string& channelName, InputMultipartCallback callback)
{
fDataCallbacks = true;
fMultipartInputs.insert(make_pair(channelName, callback));
}
void FairMQDevice::RunWrapper()
{
LOG(INFO) << "DEVICE: Running...";
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
FairMQChannel::fInterrupted = false;
try
{
Run();
PreRun();
if (fDataCallbacks)
{
bool exitingRunningCallback = false;
vector<string> inputChannelKeys;
for (const auto& i: fMsgInputs)
{
inputChannelKeys.push_back(i.first);
}
for (const auto& i: fMultipartInputs)
{
inputChannelKeys.push_back(i.first);
}
unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, inputChannelKeys));
while (CheckCurrentState(RUNNING) && !exitingRunningCallback)
{
poller->Poll(200);
for (const auto& mi : fMsgInputs)
{
for (unsigned int i = 0; i < fChannels.at(mi.first).size(); ++i)
{
if (poller->CheckInput(mi.first, i))
{
unique_ptr<FairMQMessage> msg(NewMessage());
if (Receive(msg, mi.first, i) >= 0)
{
if (mi.second(msg, i) == false)
{
exitingRunningCallback = true;
break;
}
}
else
{
exitingRunningCallback = true;
break;
}
}
}
if (exitingRunningCallback)
{
break;
}
}
for (const auto& mi : fMultipartInputs)
{
for (unsigned int i = 0; i < fChannels.at(mi.first).size(); ++i)
{
if (poller->CheckInput(mi.first, i))
{
FairMQParts parts;
if (Receive(parts, mi.first, i) >= 0)
{
if (mi.second(parts, i) == false)
{
exitingRunningCallback = true;
break;
}
}
else
{
exitingRunningCallback = true;
break;
}
}
}
if (exitingRunningCallback)
{
break;
}
}
}
}
else
{
while (CheckCurrentState(RUNNING) && ConditionalRun())
{
}
Run();
}
PostRun();
}
catch (const out_of_range& oor)
{
@@ -477,6 +533,19 @@ void FairMQDevice::Run()
{
}
void FairMQDevice::PreRun()
{
}
bool FairMQDevice::ConditionalRun()
{
return false;
}
void FairMQDevice::PostRun()
{
}
void FairMQDevice::Pause()
{
while (true)
@@ -656,18 +725,22 @@ void FairMQDevice::LogSocketRates()
int numFilteredSockets = 0;
vector<FairMQSocket*> filteredSockets;
vector<string> filteredChannelNames;
vector<int> logIntervals;
vector<int> intervalCounters;
// iterate over the channels map
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
for (const auto& mi : fChannels)
{
// iterate over the channels vector
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
for (auto vi = (mi.second).begin(); vi != (mi.second).end(); ++vi)
{
if (vi->fRateLogging == 1)
if (vi->fRateLogging > 0)
{
filteredSockets.push_back(vi->fSocket);
logIntervals.push_back(vi->fRateLogging);
intervalCounters.push_back(0);
stringstream ss;
ss << mi->first << "[" << vi - (mi->second).begin() << "]";
ss << mi.first << "[" << vi - (mi.second).begin() << "]";
filteredChannelNames.push_back(ss.str());
++numFilteredSockets;
}
@@ -690,12 +763,12 @@ void FairMQDevice::LogSocketRates()
vector<double> msgPerSecOut(numFilteredSockets);
int i = 0;
for (auto itr = filteredSockets.begin(); itr != filteredSockets.end(); ++itr)
for (const auto& vi : filteredSockets)
{
bytesIn.at(i) = (*itr)->GetBytesRx();
bytesOut.at(i) = (*itr)->GetBytesTx();
msgIn.at(i) = (*itr)->GetMessagesRx();
msgOut.at(i) = (*itr)->GetMessagesTx();
bytesIn.at(i) = vi->GetBytesRx();
bytesOut.at(i) = vi->GetBytesTx();
msgIn.at(i) = vi->GetMessagesRx();
msgOut.at(i) = vi->GetMessagesTx();
++i;
}
@@ -711,33 +784,40 @@ void FairMQDevice::LogSocketRates()
i = 0;
for (auto itr = filteredSockets.begin(); itr != filteredSockets.end(); itr++)
for (const auto& vi : filteredSockets)
{
bytesInNew.at(i) = (*itr)->GetBytesRx();
mbPerSecIn.at(i) = (static_cast<double>(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
bytesIn.at(i) = bytesInNew.at(i);
intervalCounters.at(i)++;
msgInNew.at(i) = (*itr)->GetMessagesRx();
msgPerSecIn.at(i) = static_cast<double>(msgInNew.at(i) - msgIn.at(i)) / static_cast<double>(msSinceLastLog) * 1000.;
msgIn.at(i) = msgInNew.at(i);
if (intervalCounters.at(i) == logIntervals.at(i))
{
intervalCounters.at(i) = 0;
bytesOutNew.at(i) = (*itr)->GetBytesTx();
mbPerSecOut.at(i) = (static_cast<double>(bytesOutNew.at(i) - bytesOut.at(i)) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
bytesOut.at(i) = bytesOutNew.at(i);
bytesInNew.at(i) = vi->GetBytesRx();
mbPerSecIn.at(i) = (static_cast<double>(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
bytesIn.at(i) = bytesInNew.at(i);
msgOutNew.at(i) = (*itr)->GetMessagesTx();
msgPerSecOut.at(i) = static_cast<double>(msgOutNew.at(i) - msgOut.at(i)) / static_cast<double>(msSinceLastLog) * 1000.;
msgOut.at(i) = msgOutNew.at(i);
msgInNew.at(i) = vi->GetMessagesRx();
msgPerSecIn.at(i) = static_cast<double>(msgInNew.at(i) - msgIn.at(i)) / static_cast<double>(msSinceLastLog) * 1000.;
msgIn.at(i) = msgInNew.at(i);
LOG(DEBUG) << filteredChannelNames.at(i) << ": "
<< "in: " << msgPerSecIn.at(i) << " msg (" << mbPerSecIn.at(i) << " MB), "
<< "out: " << msgPerSecOut.at(i) << " msg (" << mbPerSecOut.at(i) << " MB)";
bytesOutNew.at(i) = vi->GetBytesTx();
mbPerSecOut.at(i) = (static_cast<double>(bytesOutNew.at(i) - bytesOut.at(i)) / (1024. * 1024.)) / static_cast<double>(msSinceLastLog) * 1000.;
bytesOut.at(i) = bytesOutNew.at(i);
msgOutNew.at(i) = vi->GetMessagesTx();
msgPerSecOut.at(i) = static_cast<double>(msgOutNew.at(i) - msgOut.at(i)) / static_cast<double>(msSinceLastLog) * 1000.;
msgOut.at(i) = msgOutNew.at(i);
LOG(DEBUG) << filteredChannelNames.at(i) << ": "
<< "in: " << msgPerSecIn.at(i) << " msg (" << mbPerSecIn.at(i) << " MB), "
<< "out: " << msgPerSecOut.at(i) << " msg (" << mbPerSecOut.at(i) << " MB)";
}
++i;
}
t0 = t1;
boost::this_thread::sleep(boost::posix_time::milliseconds(fLogIntervalInMs));
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
}
catch (boost::thread_interrupted&)
{
@@ -751,7 +831,7 @@ void FairMQDevice::LogSocketRates()
void FairMQDevice::InteractiveStateLoop()
{
fRunning = true;
fInteractiveRunning = true;
char c; // hold the user console input
pollfd cinfd[1];
cinfd[0].fd = fileno(stdin);
@@ -764,11 +844,11 @@ void FairMQDevice::InteractiveStateLoop()
PrintInteractiveStateLoopHelp();
while (fRunning)
while (fInteractiveRunning)
{
if (poll(cinfd, 1, 500))
{
if (!fRunning)
if (!fInteractiveRunning)
{
break;
}
@@ -815,11 +895,23 @@ void FairMQDevice::InteractiveStateLoop()
// break;
case 'q':
LOG(INFO) << "[q] end";
ChangeState(STOP);
ChangeState(RESET_TASK);
WaitForEndOfState(RESET_TASK);
ChangeState(RESET_DEVICE);
WaitForEndOfState(RESET_DEVICE);
ChangeState(END);
if (CheckCurrentState("EXITING"))
{
fRunning = false;
fInteractiveRunning = false;
}
LOG(INFO) << "Exiting.";
break;
default:
LOG(INFO) << "Invalid input: [" << c << "]";
@@ -836,6 +928,7 @@ void FairMQDevice::InteractiveStateLoop()
void FairMQDevice::Unblock()
{
FairMQChannel::fInterrupted = true;
FairMQMessage* cmd = fTransportFactory->CreateMessage();
fCmdSocket->Send(cmd, 0);
delete cmd;
@@ -872,21 +965,21 @@ void FairMQDevice::ResetWrapper()
void FairMQDevice::Reset()
{
// iterate over the channels map
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
for (auto& mi : fChannels)
{
// iterate over the channels vector
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
for (auto& vi : mi.second)
{
vi->fSocket->Close();
delete vi->fSocket;
vi->fSocket = nullptr;
vi.fSocket->Close();
delete vi.fSocket;
vi.fSocket = nullptr;
delete vi->fPoller;
vi->fPoller = nullptr;
delete vi.fPoller;
vi.fPoller = nullptr;
vi->fCmdSocket->Close();
delete vi->fCmdSocket;
vi->fCmdSocket = nullptr;
vi.fCmdSocket->Close();
delete vi.fCmdSocket;
vi.fCmdSocket = nullptr;
}
}
}
@@ -905,18 +998,18 @@ void FairMQDevice::Shutdown()
LOG(DEBUG) << "Closing sockets...";
// iterate over the channels map
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
for (const auto& mi : fChannels)
{
// iterate over the channels vector
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
for (const auto& vi : mi.second)
{
if (vi->fSocket)
if (vi.fSocket)
{
vi->fSocket->Close();
vi.fSocket->Close();
}
if (vi->fCmdSocket)
if (vi.fCmdSocket)
{
vi->fCmdSocket->Close();
vi.fCmdSocket->Close();
}
}
}
@@ -932,20 +1025,20 @@ void FairMQDevice::Shutdown()
FairMQDevice::~FairMQDevice()
{
// iterate over the channels map
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
for (auto& mi : fChannels)
{
// iterate over the channels vector
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
for (auto& vi : mi.second)
{
if (vi->fSocket)
if (vi.fSocket)
{
delete vi->fSocket;
vi->fSocket = nullptr;
delete vi.fSocket;
vi.fSocket = nullptr;
}
if (vi->fPoller)
if (vi.fPoller)
{
delete vi->fPoller;
vi->fPoller = nullptr;
delete vi.fPoller;
vi.fPoller = nullptr;
}
}
}