merge the trunk with the development of ZeroMQ branch

git-svn-id: https://subversion.gsi.de/fairroot/fairbase/trunk@22451 0381ead4-6506-0410-b988-94b70fbc4730
This commit is contained in:
Mohammad Al-Turany
2013-10-25 12:42:48 +00:00
parent d65d7e490f
commit 5121fe3ae5
36 changed files with 1291 additions and 727 deletions

View File

@@ -2,10 +2,15 @@
* FairMQSampler.cpp
*
* Created on: Sep 27, 2012
* Author: dklein
* Author: A. Rybalchenko, D. Klein
*/
#include <vector>
#include <unistd.h>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/timer/timer.hpp>
#include "TList.h"
#include "TObjString.h"
@@ -30,8 +35,9 @@ FairMQSampler::FairMQSampler() :
FairMQSampler::~FairMQSampler()
{
delete fSamplerTask;
delete fFairRunAna;
if(fFairRunAna) {
fFairRunAna->TerminateRun();
}
}
void FairMQSampler::Init()
@@ -41,7 +47,9 @@ void FairMQSampler::Init()
fSamplerTask->SetBranch(fBranch);
fFairRunAna->SetInputFile(TString(fInputFile));
fFairRunAna->SetOutputFile("dummy.out");
TString output=fInputFile;
output.Append(".out.root");
fFairRunAna->SetOutputFile(output.Data());
fFairRunAna->AddTask(fSamplerTask);
@@ -51,89 +59,114 @@ void FairMQSampler::Init()
rtdb->setFirstInput(parInput1);
rtdb->print();
// read complete file and extract digis.
fFairRunAna->Init();
fFairRunAna->Run(0, 0);
//fFairRunAna->Run(0, 0);
FairRootManager* ioman = FairRootManager::Instance();
fNumEvents = Int_t((ioman->GetInChain())->GetEntries());
}
void FairMQSampler::Run()
{
void* status; //necessary for pthread_join
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
usleep(1000000);
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
pthread_t logger;
pthread_create(&logger, NULL, &FairMQDevice::callLogSocketRates, this);
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
boost::thread resetEventCounter(boost::bind(&FairMQSampler::ResetEventCounter, this));
//boost::thread commandListener(boost::bind(&FairMQSampler::ListenToCommands, this));
pthread_t resetEventCounter;
pthread_create(&resetEventCounter, NULL, &FairMQSampler::callResetEventCounter, this);
int sentMsgs = 0;
while (true) {
for( std::vector<FairMQMessage*>::iterator itr = fSamplerTask->GetOutput()->begin(); itr != fSamplerTask->GetOutput()->end(); itr++ ) {
FairMQMessage event;
event.Copy(*itr);
boost::timer::auto_cpu_timer timer;
fPayloadOutputs->at(0)->Send(&event);
std::cout << "Number of events to process: " << fNumEvents << std::endl;
--fEventCounter;
Long64_t eventNr = 0;
while (fEventCounter == 0) {
usleep(1000);
}
// while ( fState == RUNNING ) {
for ( /* eventNr */ ; eventNr < fNumEvents; eventNr++ ) {
fFairRunAna->RunMQ(eventNr);
fPayloadOutputs->at(0)->Send(fSamplerTask->GetOutput());
sentMsgs++;
--fEventCounter;
while (fEventCounter == 0) {
boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}
if( fState != RUNNING ) { break; }
}
pthread_join(logger, &status);
pthread_join(resetEventCounter, &status);
boost::this_thread::interruption_point();
// }
boost::timer::cpu_times const elapsed_time(timer.elapsed());
std::cout << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2) << std::endl;
std::cout << "Sent " << sentMsgs << " messages!" << std::endl;
//boost::this_thread::sleep(boost::posix_time::milliseconds(5000));
rateLogger.interrupt();
rateLogger.join();
resetEventCounter.interrupt();
resetEventCounter.join();
//commandListener.interrupt();
//commandListener.join();
}
void* FairMQSampler::ResetEventCounter()
void FairMQSampler::ResetEventCounter()
{
while (true) {
fEventCounter = fEventRate / 100;
usleep(10000);
while ( true ) {
try {
fEventCounter = fEventRate / 100;
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
} catch (boost::thread_interrupted&) {
std::cout << "resetEventCounter interrupted" << std::endl;
break;
}
}
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping resetEventCounter <<<<<<<");
}
void FairMQSampler::Log(Int_t intervalInMs)
void FairMQSampler::ListenToCommands()
{
timestamp_t t0;
timestamp_t t1;
ULong_t bytes = fPayloadOutputs->at(0)->GetBytesTx();
ULong_t messages = fPayloadOutputs->at(0)->GetMessagesTx();
ULong_t bytesNew;
ULong_t messagesNew;
Double_t megabytesPerSecond = (bytesNew - bytes) / (1024 * 1024);
Double_t messagesPerSecond = (messagesNew - messages);
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> ListenToCommands <<<<<<<");
t0 = get_timestamp();
// Initialize poll set
zmq_pollitem_t items[] = {
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 }
};
while (true) {
usleep(intervalInMs * 1000);
Bool_t received = false;
while ( true ) {
try {
FairMQMessage msg;
t1 = get_timestamp();
zmq_poll(items, 1, 100);
bytesNew = fPayloadOutputs->at(0)->GetBytesTx();
messagesNew = fPayloadOutputs->at(0)->GetMessagesTx();
if (items[0].revents & ZMQ_POLLIN) {
received = fPayloadInputs->at(0)->Receive(&msg);
}
timestamp_t timeSinceLastLog_ms = (t1 - t0) / 1000.0L;
if (received) {
//command handling goes here.
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "> received command <");
received = false;
}
megabytesPerSecond = ((Double_t) (bytesNew - bytes) / (1024. * 1024.)) / (Double_t) timeSinceLastLog_ms * 1000.;
messagesPerSecond = (Double_t) (messagesNew - messages) / (Double_t) timeSinceLastLog_ms * 1000.;
std::stringstream logmsg;
logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s";
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
bytes = bytesNew;
messages = messagesNew;
t0 = t1;
boost::this_thread::interruption_point();
} catch (boost::thread_interrupted&) {
std::cout << "commandListener interrupted" << std::endl;
break;
}
}
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, ">>>>>>> stopping commandListener <<<<<<<");
}
void FairMQSampler::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/)
void FairMQSampler::SetProperty(const Int_t& key, const TString& value, const Int_t& slot/*= 0*/)
{
switch (key) {
case InputFile:
@@ -151,7 +184,7 @@ void FairMQSampler::SetProperty(Int_t key, TString value, Int_t slot/*= 0*/)
}
}
TString FairMQSampler::GetProperty(Int_t key, TString default_/*= ""*/, Int_t slot/*= 0*/)
TString FairMQSampler::GetProperty(const Int_t& key, const TString& default_/*= ""*/, const Int_t& slot/*= 0*/)
{
switch (key) {
case InputFile:
@@ -165,7 +198,7 @@ TString FairMQSampler::GetProperty(Int_t key, TString default_/*= ""*/, Int_t sl
}
}
void FairMQSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/)
void FairMQSampler::SetProperty(const Int_t& key, const Int_t& value, const Int_t& slot/*= 0*/)
{
switch (key) {
case EventRate:
@@ -177,7 +210,7 @@ void FairMQSampler::SetProperty(Int_t key, Int_t value, Int_t slot/*= 0*/)
}
}
Int_t FairMQSampler::GetProperty(Int_t key, Int_t default_/*= 0*/, Int_t slot/*= 0*/)
Int_t FairMQSampler::GetProperty(const Int_t& key, const Int_t& default_/*= 0*/, const Int_t& slot/*= 0*/)
{
switch (key) {
case EventRate: