fix CID 10112

fix CIDs 10922, 10921

fix CIDs 10369, 10371, 10372, 10373, 10374

fix CID 10406

fix CID 10414

fix CIDs 10850-52, 10857, 10859-60, 10863, 10869-76

fix CIDs 10014 10080
This commit is contained in:
Alexey Rybalchenko 2014-07-04 10:36:11 +02:00
parent 699671a0f1
commit d14d96ea70
5 changed files with 45 additions and 31 deletions

View File

@ -20,11 +20,10 @@
FairMQDevice::FairMQDevice() FairMQDevice::FairMQDevice()
: fNumIoThreads(1) : fNumIoThreads(1)
, , fPayloadInputs(new vector<FairMQSocket*>())
// fPayloadContext(NULL),
fPayloadInputs(new vector<FairMQSocket*>())
, fPayloadOutputs(new vector<FairMQSocket*>()) , fPayloadOutputs(new vector<FairMQSocket*>())
, fLogIntervalInMs(1000) , fLogIntervalInMs(1000)
, fTransportFactory(NULL)
{ {
} }

View File

@ -21,8 +21,6 @@
#include <iomanip> #include <iomanip>
#include <ctime> #include <ctime>
#include "FairMQLogger.h"
using std::ostringstream; using std::ostringstream;
class FairMQLogger class FairMQLogger

View File

@ -34,10 +34,16 @@ void FairMQProxy::Run()
FairMQMessage* msg = fTransportFactory->CreateMessage(); FairMQMessage* msg = fTransportFactory->CreateMessage();
size_t bytes_received = 0;
while (fState == RUNNING) while (fState == RUNNING)
{ {
fPayloadInputs->at(0)->Receive(msg); bytes_received = fPayloadInputs->at(0)->Receive(msg);
fPayloadOutputs->at(0)->Send(msg); if (bytes_received)
{
fPayloadOutputs->at(0)->Send(msg);
bytes_received = 0;
}
} }
delete msg; delete msg;

View File

@ -28,11 +28,13 @@ void FairMQSink::Run()
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
size_t bytes_received = 0;
while (fState == RUNNING) while (fState == RUNNING)
{ {
FairMQMessage* msg = fTransportFactory->CreateMessage(); FairMQMessage* msg = fTransportFactory->CreateMessage();
fPayloadInputs->at(0)->Receive(msg); bytes_received = fPayloadInputs->at(0)->Receive(msg);
delete msg; delete msg;
} }

View File

@ -27,28 +27,37 @@ FairMQStateMachine::~FairMQStateMachine()
void FairMQStateMachine::ChangeState(int event) void FairMQStateMachine::ChangeState(int event)
{ {
switch (event) try
{ {
case INIT: switch (event)
process_event(FairMQFSM::INIT()); {
return; case INIT:
case SETOUTPUT: process_event(FairMQFSM::INIT());
process_event(FairMQFSM::SETOUTPUT()); return;
return; case SETOUTPUT:
case SETINPUT: process_event(FairMQFSM::SETOUTPUT());
process_event(FairMQFSM::SETINPUT()); return;
return; case SETINPUT:
case RUN: process_event(FairMQFSM::SETINPUT());
process_event(FairMQFSM::RUN()); return;
return; case RUN:
case PAUSE: process_event(FairMQFSM::RUN());
process_event(FairMQFSM::PAUSE()); return;
return; case PAUSE:
case STOP: process_event(FairMQFSM::PAUSE());
process_event(FairMQFSM::STOP()); return;
return; case STOP:
case END: process_event(FairMQFSM::STOP());
process_event(FairMQFSM::END()); return;
return; case END:
process_event(FairMQFSM::END());
return;
}
}
catch (boost::bad_function_call& e)
{
LOG(ERROR) << e.what();
}
} }
}