Several FairMQ fixes and improvements:

- FairMQ: add possibility to poll on multiple channels.
- FairMQ: include command channel when polling on blocking calls (for unblocking without termination).
- FairMQ: move signal handler inside of FairMQDevice class (call FairMQDevice::CatchSignals() in the main function).
- FairMQ: add 'bool CheckCurrentState(statename)' (instead of 'GetCurrentState() == statename' that cannot be thread safe).
- FairMQDevice: add 'InteractiveStateLoop()' method that can be used to change states from the command line.
- FairMQDevice: add automatic transition to IDLE state if Run() exits without an external event.
- FairMQDevice: implement device reset.
- FairMQDevice: use unordered_map for device channels.
- FairMQChannel: improve address validation for channels.
- FairMQChannel: add ExpectsAnotherPart() method to check if another msg part is expected (old approach still works).
- FairMQ: remove invalid transition from the run files.
- FairMQFileSink: disable ROOT termination signal handler.
- Tutorial3: spawn xterm windows from start scripts without overlapping for better visibility.
- FairMQ Examples: update protobuf test and move its files to a common directory.
- FairMQStateMachine: improve feedback on invalid transitions (more readable).
This commit is contained in:
Alexey Rybalchenko
2015-07-03 22:57:36 +02:00
committed by Mohammad Al-Turany
parent d1bba61939
commit 1302e77a16
65 changed files with 1250 additions and 2234 deletions

View File

@@ -14,6 +14,9 @@
#include <list>
#include <algorithm> // for std::sort()
#include <csignal> // for catching system signals
#include <termios.h> // for the InteractiveStateLoop
#include <boost/thread.hpp>
#include <boost/random/mersenne_twister.hpp> // for choosing random port in range
@@ -25,6 +28,13 @@
using namespace std;
// boost::function and a wrapper to catch the signals
boost::function<void(int)> sigHandler;
static void CallSignalHandler(int signal)
{
sigHandler(signal);
}
FairMQDevice::FairMQDevice()
: fChannels()
, fId()
@@ -33,26 +43,60 @@ FairMQDevice::FairMQDevice()
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fLogIntervalInMs(1000)
, fCommandSocket()
, fTransportFactory(NULL)
, fCmdSocket(nullptr)
, fTransportFactory(nullptr)
, fInitialValidationFinished(false)
, fInitialValidationCondition()
, fInitialValidationMutex()
, fCatchingSignals(false)
{
}
void FairMQDevice::CatchSignals()
{
if (!fCatchingSignals)
{
// setup signal catching
sigHandler = std::bind1st(std::mem_fun(&FairMQDevice::SignalHandler), this);
struct sigaction action;
action.sa_handler = CallSignalHandler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
fCatchingSignals = true;
}
}
void FairMQDevice::SignalHandler(int signal)
{
LOG(INFO) << "Caught signal " << signal;
ChangeState(STOP);
ChangeState(RESET_TASK);
WaitForEndOfState(RESET_TASK);
ChangeState(RESET_DEVICE);
WaitForEndOfState(RESET_DEVICE);
ChangeState(END);
}
void FairMQDevice::InitWrapper()
{
LOG(INFO) << "DEVICE: Initializing...";
fCommandSocket = fTransportFactory->CreateSocket("pair", "device-commands", 1);
fCommandSocket->Bind("inproc://commands");
if (!fCmdSocket)
{
fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", 1);
fCmdSocket->Bind("inproc://commands");
}
// List to store the uninitialized channels.
list<FairMQChannel*> uninitializedChannels;
for (map< string,vector<FairMQChannel> >::iterator mi = fChannels.begin(); mi != fChannels.end(); ++mi)
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
{
for (vector<FairMQChannel>::iterator vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
{
// set channel name: name + vector index
stringstream ss;
@@ -66,8 +110,9 @@ void FairMQDevice::InitWrapper()
// go over the list of channels until all are initialized (and removed from the uninitialized list)
int numAttempts = 0;
int maxAttempts = fMaxInitializationTime;
do {
list<FairMQChannel*>::iterator itr = uninitializedChannels.begin();
do
{
auto itr = uninitializedChannels.begin();
while (itr != uninitializedChannels.end())
{
@@ -75,6 +120,7 @@ void FairMQDevice::InitWrapper()
{
if (InitChannel(*(*itr)))
{
(*itr)->InitCommandInterface(fTransportFactory);
uninitializedChannels.erase(itr++);
}
else
@@ -89,7 +135,7 @@ void FairMQDevice::InitWrapper()
}
}
// notify parent thread about end of processing.
// notify parent thread about completion of first validation.
boost::lock_guard<boost::mutex> lock(fInitialValidationMutex);
fInitialValidationFinished = true;
fInitialValidationCondition.notify_one();
@@ -106,7 +152,6 @@ void FairMQDevice::InitWrapper()
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
}
} while (!uninitializedChannels.empty());
Init();
@@ -134,7 +179,7 @@ void FairMQDevice::Init()
bool FairMQDevice::InitChannel(FairMQChannel& ch)
{
LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " to <" << ch.fType << "> data";
LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")";
// initialize the socket
ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, 1);
// set high water marks
@@ -211,15 +256,15 @@ void FairMQDevice::SortChannel(const string& name, const bool reindex)
{
if (fChannels.find(name) != fChannels.end())
{
sort(fChannels[name].begin(), fChannels[name].end(), SortSocketsByAddress);
sort(fChannels.at(name).begin(), fChannels.at(name).end(), SortSocketsByAddress);
if (reindex)
{
for (vector<FairMQChannel>::iterator vi = fChannels[name].begin(); vi != fChannels[name].end(); ++vi)
for (auto vi = fChannels.at(name).begin(); vi != fChannels.at(name).end(); ++vi)
{
// set channel name: name + vector index
stringstream ss;
ss << name << "[" << vi - fChannels[name].begin() << "]";
ss << name << "[" << vi - fChannels.at(name).begin() << "]";
vi->fChannelName = ss.str();
}
}
@@ -234,7 +279,7 @@ void FairMQDevice::PrintChannel(const string& name)
{
if (fChannels.find(name) != fChannels.end())
{
for (vector<FairMQChannel>::iterator vi = fChannels[name].begin(); vi != fChannels[name].end(); ++vi)
for (auto vi = fChannels[name].begin(); vi != fChannels[name].end(); ++vi)
{
LOG(INFO) << vi->fChannelName << ": "
<< vi->fType << " | "
@@ -269,6 +314,11 @@ void FairMQDevice::RunWrapper()
LOG(ERROR) << e.what();
}
if (CheckCurrentState(RUNNING))
{
ChangeState(internal_READY);
}
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fStateMutex);
fStateFinished = true;
@@ -281,7 +331,7 @@ void FairMQDevice::Run()
void FairMQDevice::Pause()
{
while (GetCurrentState() == PAUSED)
while (CheckCurrentState(PAUSED))
{
try
{
@@ -296,38 +346,6 @@ void FairMQDevice::Pause()
}
}
void FairMQDevice::ResetTaskWrapper()
{
ResetTask();
ChangeState(internal_DEVICE_READY);
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fStateMutex);
fStateFinished = true;
fStateCondition.notify_one();
}
void FairMQDevice::ResetTask()
{
}
void FairMQDevice::ResetWrapper()
{
Reset();
ChangeState(internal_IDLE);
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fStateMutex);
fStateFinished = true;
fStateCondition.notify_one();
}
void FairMQDevice::Reset()
{
}
// Method for setting properties represented as a string.
void FairMQDevice::SetProperty(const int key, const string& value)
{
@@ -417,10 +435,10 @@ void FairMQDevice::LogSocketRates()
vector<string> filteredChannelNames;
// iterate over the channels map
for (map< string,vector<FairMQChannel> >::iterator mi = fChannels.begin(); mi != fChannels.end(); ++mi)
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
{
// iterate over the channels vector
for (vector<FairMQChannel>::iterator vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
{
if (vi->fRateLogging == 1)
{
@@ -449,7 +467,7 @@ void FairMQDevice::LogSocketRates()
vector<double> msgPerSecOut(numFilteredSockets);
int i = 0;
for (vector<FairMQSocket*>::iterator itr = filteredSockets.begin(); itr != filteredSockets.end(); ++itr)
for (auto itr = filteredSockets.begin(); itr != filteredSockets.end(); ++itr)
{
bytesIn.at(i) = (*itr)->GetBytesRx();
bytesOut.at(i) = (*itr)->GetBytesTx();
@@ -460,7 +478,7 @@ void FairMQDevice::LogSocketRates()
t0 = get_timestamp();
while (GetCurrentState() == RUNNING)
while (CheckCurrentState(RUNNING))
{
try
{
@@ -470,7 +488,7 @@ void FairMQDevice::LogSocketRates()
i = 0;
for (vector<FairMQSocket*>::iterator itr = filteredSockets.begin(); itr != filteredSockets.end(); itr++)
for (auto itr = filteredSockets.begin(); itr != filteredSockets.end(); itr++)
{
bytesInNew.at(i) = (*itr)->GetBytesRx();
mbPerSecIn.at(i) = ((double)(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.;
@@ -508,11 +526,148 @@ void FairMQDevice::LogSocketRates()
// LOG(DEBUG) << "FairMQDevice::LogSocketRates() stopping";
}
void FairMQDevice::InteractiveStateLoop()
{
char c; // hold the user console input
bool running = true;
struct termios t;
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag &= ~ICANON; // disable canonical input
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
PrintInteractiveStateLoopHelp();
while (running && cin >> c)
{
switch (c)
{
case 'i':
LOG(INFO) << "[i] init device";
ChangeState("INIT_DEVICE");
break;
case 'j':
LOG(INFO) << "[j] init task";
ChangeState("INIT_TASK");
break;
case 'p':
LOG(INFO) << "[p] pause";
ChangeState("PAUSE");
break;
case 'r':
LOG(INFO) << "[r] run";
ChangeState("RUN");
break;
case 's':
LOG(INFO) << "[s] stop";
ChangeState("STOP");
break;
case 't':
LOG(INFO) << "[t] reset task";
ChangeState("RESET_TASK");
break;
case 'd':
LOG(INFO) << "[d] reset device";
ChangeState("RESET_DEVICE");
break;
case 'h':
LOG(INFO) << "[h] help";
PrintInteractiveStateLoopHelp();
break;
case 'q':
LOG(INFO) << "[q] end";
ChangeState("END");
if (CheckCurrentState("EXITING"))
{
running = false;
}
break;
default:
LOG(INFO) << "Invalid input: [" << c << "]";
PrintInteractiveStateLoopHelp();
break;
}
}
tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure
t.c_lflag |= ICANON; // re-enable canonical input
tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings
}
inline void FairMQDevice::PrintInteractiveStateLoopHelp()
{
LOG(INFO) << "Use keys to control the state machine:";
LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device";
}
void FairMQDevice::SendCommand(const string& command)
{
FairMQMessage* cmd = fTransportFactory->CreateMessage(command.size());
memcpy(cmd->GetData(), command.c_str(), command.size());
fCommandSocket->Send(cmd, 0);
fCmdSocket->Send(cmd, 0);
}
void FairMQDevice::ResetTaskWrapper()
{
ResetTask();
ChangeState(internal_DEVICE_READY);
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fStateMutex);
fStateFinished = true;
fStateCondition.notify_one();
}
void FairMQDevice::ResetTask()
{
}
void FairMQDevice::ResetWrapper()
{
Reset();
ChangeState(internal_IDLE);
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fStateMutex);
fStateFinished = true;
fStateCondition.notify_one();
}
void FairMQDevice::Reset()
{
LOG(DEBUG) << "Resetting Device...";
// iterate over the channels map
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
{
// iterate over the channels vector
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
{
vi->fSocket->Close();
delete vi->fSocket;
vi->fSocket = nullptr;
delete vi->fPoller;
vi->fPoller = nullptr;
vi->fCmdSocket->Close();
delete vi->fCmdSocket;
vi->fCmdSocket = nullptr;
}
}
LOG(DEBUG) << "Device reset finished!";
}
void FairMQDevice::Terminate()
{
// Termination signal has to be sent only once to any socket.
if (fCmdSocket)
{
fCmdSocket->Terminate();
}
}
void FairMQDevice::Shutdown()
@@ -520,37 +675,54 @@ void FairMQDevice::Shutdown()
LOG(DEBUG) << "Closing sockets...";
// iterate over the channels map
for (map< string,vector<FairMQChannel> >::iterator mi = fChannels.begin(); mi != fChannels.end(); ++mi)
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
{
// iterate over the channels vector
for (vector<FairMQChannel>::iterator vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
{
vi->fSocket->Close();
if (vi->fSocket)
{
vi->fSocket->Close();
}
if (vi->fCmdSocket)
{
vi->fCmdSocket->Close();
}
}
}
fCommandSocket->Close();
if (fCmdSocket)
{
fCmdSocket->Close();
}
LOG(DEBUG) << "Closed all sockets!";
}
void FairMQDevice::Terminate()
{
// Termination signal has to be sent only once to any socket.
fCommandSocket->Terminate();
}
FairMQDevice::~FairMQDevice()
{
// iterate over the channels map
for (map< string,vector<FairMQChannel> >::iterator mi = fChannels.begin(); mi != fChannels.end(); ++mi)
for (auto mi = fChannels.begin(); mi != fChannels.end(); ++mi)
{
// iterate over the channels vector
for (vector<FairMQChannel>::iterator vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
{
delete vi->fSocket;
if (vi->fSocket)
{
delete vi->fSocket;
vi->fSocket = nullptr;
}
if (vi->fPoller)
{
delete vi->fPoller;
vi->fPoller = nullptr;
}
}
}
delete fCommandSocket;
if (fCmdSocket)
{
delete fCmdSocket;
fCmdSocket = nullptr;
}
}