diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index deb8e7ed..cc00da42 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -20,11 +20,10 @@ FairMQDevice::FairMQDevice() : fNumIoThreads(1) - , - // fPayloadContext(NULL), - fPayloadInputs(new vector()) + , fPayloadInputs(new vector()) , fPayloadOutputs(new vector()) , fLogIntervalInMs(1000) + , fTransportFactory(NULL) { } diff --git a/fairmq/FairMQLogger.h b/fairmq/FairMQLogger.h index 24204eb5..6464b539 100644 --- a/fairmq/FairMQLogger.h +++ b/fairmq/FairMQLogger.h @@ -21,8 +21,6 @@ #include #include -#include "FairMQLogger.h" - using std::ostringstream; class FairMQLogger diff --git a/fairmq/FairMQProxy.cxx b/fairmq/FairMQProxy.cxx index a897fc2a..4e748223 100644 --- a/fairmq/FairMQProxy.cxx +++ b/fairmq/FairMQProxy.cxx @@ -34,10 +34,16 @@ void FairMQProxy::Run() FairMQMessage* msg = fTransportFactory->CreateMessage(); + size_t bytes_received = 0; + while (fState == RUNNING) { - fPayloadInputs->at(0)->Receive(msg); - fPayloadOutputs->at(0)->Send(msg); + bytes_received = fPayloadInputs->at(0)->Receive(msg); + if (bytes_received) + { + fPayloadOutputs->at(0)->Send(msg); + bytes_received = 0; + } } delete msg; diff --git a/fairmq/FairMQSink.cxx b/fairmq/FairMQSink.cxx index 22b4ad3d..4cd1a082 100644 --- a/fairmq/FairMQSink.cxx +++ b/fairmq/FairMQSink.cxx @@ -28,11 +28,13 @@ void FairMQSink::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + size_t bytes_received = 0; + while (fState == RUNNING) { FairMQMessage* msg = fTransportFactory->CreateMessage(); - fPayloadInputs->at(0)->Receive(msg); + bytes_received = fPayloadInputs->at(0)->Receive(msg); delete msg; } diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index 35e3e967..c18b788d 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -27,28 +27,37 @@ FairMQStateMachine::~FairMQStateMachine() void FairMQStateMachine::ChangeState(int event) { - switch (event) + try { - case INIT: - process_event(FairMQFSM::INIT()); - return; - case SETOUTPUT: - process_event(FairMQFSM::SETOUTPUT()); - return; - case SETINPUT: - process_event(FairMQFSM::SETINPUT()); - return; - case RUN: - process_event(FairMQFSM::RUN()); - return; - case PAUSE: - process_event(FairMQFSM::PAUSE()); - return; - case STOP: - process_event(FairMQFSM::STOP()); - return; - case END: - process_event(FairMQFSM::END()); - return; + switch (event) + { + case INIT: + process_event(FairMQFSM::INIT()); + return; + case SETOUTPUT: + process_event(FairMQFSM::SETOUTPUT()); + return; + case SETINPUT: + process_event(FairMQFSM::SETINPUT()); + return; + case RUN: + process_event(FairMQFSM::RUN()); + return; + case PAUSE: + process_event(FairMQFSM::PAUSE()); + return; + case STOP: + process_event(FairMQFSM::STOP()); + return; + case END: + process_event(FairMQFSM::END()); + return; + } + } + catch (boost::bad_function_call& e) + { + LOG(ERROR) << e.what(); + } + + } -}