mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
simplify logger for shorter calls and readability
This commit is contained in:
parent
4b8659a654
commit
ac8bbf154a
|
@ -32,7 +32,7 @@ void FairMQBenchmarkSampler::Init()
|
||||||
|
|
||||||
void FairMQBenchmarkSampler::Run()
|
void FairMQBenchmarkSampler::Run()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
LOG(INFO) << ">>>>>>> Run <<<<<<<";
|
||||||
//boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
|
//boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
|
||||||
|
|
||||||
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
||||||
|
@ -103,9 +103,7 @@ void FairMQBenchmarkSampler::Log(int intervalInMs)
|
||||||
megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.;
|
megabytesPerSecond = ((double) (bytesNew - bytes) / (1024. * 1024.)) / (double) timeSinceLastLog_ms * 1000.;
|
||||||
messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.;
|
messagesPerSecond = (double) (messagesNew - messages) / (double) timeSinceLastLog_ms * 1000.;
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(DEBUG) << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s";
|
||||||
logmsg << "send " << messagesPerSecond << " msg/s, " << megabytesPerSecond << " MB/s";
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
|
|
||||||
|
|
||||||
bytes = bytesNew;
|
bytes = bytesNew;
|
||||||
messages = messagesNew;
|
messages = messagesNew;
|
||||||
|
|
|
@ -20,7 +20,7 @@ FairMQBuffer::FairMQBuffer()
|
||||||
|
|
||||||
void FairMQBuffer::Run()
|
void FairMQBuffer::Run()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
LOG(INFO) << ">>>>>>> Run <<<<<<<";
|
||||||
|
|
||||||
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
||||||
|
|
||||||
|
|
|
@ -22,10 +22,8 @@ FairMQDevice::FairMQDevice() :
|
||||||
|
|
||||||
void FairMQDevice::Init()
|
void FairMQDevice::Init()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Init <<<<<<<");
|
LOG(INFO) << ">>>>>>> Init <<<<<<<";
|
||||||
stringstream logmsg;
|
LOG(INFO) << "numIoThreads: " << fNumIoThreads;
|
||||||
logmsg << "numIoThreads: " << fNumIoThreads;
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
// fPayloadContext = new FairMQContextZMQ(fNumIoThreads);
|
// fPayloadContext = new FairMQContextZMQ(fNumIoThreads);
|
||||||
|
|
||||||
|
@ -59,7 +57,7 @@ void FairMQDevice::Init()
|
||||||
|
|
||||||
void FairMQDevice::InitInput()
|
void FairMQDevice::InitInput()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitInput <<<<<<<");
|
LOG(INFO) << ">>>>>>> InitInput <<<<<<<";
|
||||||
|
|
||||||
for (int i = 0; i < fNumInputs; ++i) {
|
for (int i = 0; i < fNumInputs; ++i) {
|
||||||
FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i);
|
FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i);
|
||||||
|
@ -82,7 +80,7 @@ void FairMQDevice::InitInput()
|
||||||
|
|
||||||
void FairMQDevice::InitOutput()
|
void FairMQDevice::InitOutput()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> InitOutput <<<<<<<");
|
LOG(INFO) << ">>>>>>> InitOutput <<<<<<<";
|
||||||
|
|
||||||
for (int i = 0; i < fNumOutputs; ++i) {
|
for (int i = 0; i < fNumOutputs; ++i) {
|
||||||
FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i);
|
FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i);
|
||||||
|
@ -298,9 +296,7 @@ void FairMQDevice::LogSocketRates()
|
||||||
messagesPerSecondInput[i] = (double) (messagesInputNew[i] - messagesInput[i]) / (double) timeSinceLastLog_ms * 1000.;
|
messagesPerSecondInput[i] = (double) (messagesInputNew[i] - messagesInput[i]) / (double) timeSinceLastLog_ms * 1000.;
|
||||||
messagesInput[i] = messagesInputNew[i];
|
messagesInput[i] = messagesInputNew[i];
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s";
|
||||||
logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s";
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
|
|
||||||
|
|
||||||
// Temp stuff for process termination
|
// Temp stuff for process termination
|
||||||
if ( !receivedSomething && messagesPerSecondInput[i] > 0 ) {
|
if ( !receivedSomething && messagesPerSecondInput[i] > 0 ) {
|
||||||
|
@ -326,9 +322,7 @@ void FairMQDevice::LogSocketRates()
|
||||||
messagesPerSecondOutput[i] = (double) (messagesOutputNew[i] - messagesOutput[i]) / (double) timeSinceLastLog_ms * 1000.;
|
messagesPerSecondOutput[i] = (double) (messagesOutputNew[i] - messagesOutput[i]) / (double) timeSinceLastLog_ms * 1000.;
|
||||||
messagesOutput[i] = messagesOutputNew[i];
|
messagesOutput[i] = messagesOutputNew[i];
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s";
|
||||||
logmsg << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] << " MB/s";
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::DEBUG, logmsg.str());
|
|
||||||
|
|
||||||
// Temp stuff for process termination
|
// Temp stuff for process termination
|
||||||
if ( !sentSomething && messagesPerSecondOutput[i] > 0 ) {
|
if ( !sentSomething && messagesPerSecondOutput[i] > 0 ) {
|
||||||
|
@ -377,7 +371,7 @@ void FairMQDevice::LogSocketRates()
|
||||||
delete[] megabytesPerSecondOutput;
|
delete[] megabytesPerSecondOutput;
|
||||||
delete[] messagesPerSecondOutput;
|
delete[] messagesPerSecondOutput;
|
||||||
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> stopping rateLogger <<<<<<<");
|
LOG(INFO) << ">>>>>>> stopping rateLogger <<<<<<<";
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::ListenToCommands()
|
void FairMQDevice::ListenToCommands()
|
||||||
|
@ -386,18 +380,18 @@ void FairMQDevice::ListenToCommands()
|
||||||
|
|
||||||
void FairMQDevice::Shutdown()
|
void FairMQDevice::Shutdown()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing inputs <<<<<<<");
|
LOG(INFO) << ">>>>>>> closing inputs <<<<<<<";
|
||||||
for( vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
|
for( vector<FairMQSocket*>::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++ ) {
|
||||||
(*itr)->Close();
|
(*itr)->Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing outputs <<<<<<<");
|
LOG(INFO) << ">>>>>>> closing outputs <<<<<<<";
|
||||||
for( vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
|
for( vector<FairMQSocket*>::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++ ) {
|
||||||
(*itr)->Close();
|
(*itr)->Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
//FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> closing context <<<<<<<");
|
// LOG(INFO) << ">>>>>>> closing context <<<<<<<";
|
||||||
//fPayloadContext->Close();
|
// fPayloadContext->Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQDevice::~FairMQDevice()
|
FairMQDevice::~FairMQDevice()
|
||||||
|
|
|
@ -5,47 +5,25 @@
|
||||||
* @author D. Klein, A. Rybalchenko
|
* @author D. Klein, A. Rybalchenko
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <iostream>
|
#include <string>
|
||||||
#include <iomanip>
|
#include <stdio.h>
|
||||||
#include <ctime>
|
|
||||||
|
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
using std::cin;
|
using std::string;
|
||||||
using std::cout;
|
using std::cout;
|
||||||
using std::endl;
|
using std::endl;
|
||||||
|
|
||||||
FairMQLogger* FairMQLogger::instance = NULL;
|
FairMQLogger::FairMQLogger()
|
||||||
|
|
||||||
FairMQLogger* FairMQLogger::GetInstance()
|
|
||||||
{
|
|
||||||
if (instance == NULL) {
|
|
||||||
instance = new FairMQLogger();
|
|
||||||
}
|
|
||||||
return instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQLogger* FairMQLogger::InitInstance(const string& bindAddress)
|
|
||||||
{
|
|
||||||
instance = new FairMQLogger(bindAddress);
|
|
||||||
return instance;
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQLogger::FairMQLogger() :
|
|
||||||
fBindAddress("")
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQLogger::FairMQLogger(const string& bindAddress) :
|
|
||||||
fBindAddress(bindAddress)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQLogger::~FairMQLogger()
|
FairMQLogger::~FairMQLogger()
|
||||||
{
|
{
|
||||||
|
cout << os.str() << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQLogger::Log(int type, const string& logmsg)
|
std::ostringstream& FairMQLogger::Log(int type)
|
||||||
{
|
{
|
||||||
timestamp_t tm = get_timestamp();
|
timestamp_t tm = get_timestamp();
|
||||||
timestamp_t ms = tm / 1000.0L;
|
timestamp_t ms = tm / 1000.0L;
|
||||||
|
@ -72,7 +50,9 @@ void FairMQLogger::Log(int type, const string& logmsg)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
cout << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " " << logmsg << endl;
|
os << "[\033[01;36m" << mbstr << fractional_seconds << "\033[0m]" << "[" << type_str << "]" << " ";
|
||||||
|
|
||||||
|
return os;
|
||||||
}
|
}
|
||||||
|
|
||||||
timestamp_t get_timestamp ()
|
timestamp_t get_timestamp ()
|
||||||
|
|
|
@ -8,32 +8,34 @@
|
||||||
#ifndef FAIRMQLOGGER_H_
|
#ifndef FAIRMQLOGGER_H_
|
||||||
#define FAIRMQLOGGER_H_
|
#define FAIRMQLOGGER_H_
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
|
#include <iostream>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <ctime>
|
||||||
|
|
||||||
using std::string;
|
#include "FairMQLogger.h"
|
||||||
using std::stringstream;
|
|
||||||
|
using std::ostringstream;
|
||||||
|
|
||||||
class FairMQLogger
|
class FairMQLogger
|
||||||
{
|
{
|
||||||
private:
|
|
||||||
static FairMQLogger* instance;
|
|
||||||
string fBindAddress;
|
|
||||||
public:
|
public:
|
||||||
enum {
|
enum {
|
||||||
DEBUG, INFO, ERROR, STATE
|
DEBUG, INFO, ERROR, STATE
|
||||||
};
|
};
|
||||||
FairMQLogger();
|
FairMQLogger();
|
||||||
FairMQLogger(const string& bindAdress);
|
|
||||||
virtual ~FairMQLogger();
|
virtual ~FairMQLogger();
|
||||||
void Log(int type, const string& logmsg);
|
ostringstream& Log(int type);
|
||||||
static FairMQLogger* GetInstance();
|
private:
|
||||||
static FairMQLogger* InitInstance(const string& bindAddress);
|
ostringstream os;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef unsigned long long timestamp_t;
|
typedef unsigned long long timestamp_t;
|
||||||
|
|
||||||
timestamp_t get_timestamp ();
|
timestamp_t get_timestamp ();
|
||||||
|
|
||||||
|
#define LOG(type) \
|
||||||
|
FairMQLogger::FairMQLogger().Log(FairMQLogger::type)
|
||||||
|
|
||||||
#endif /* FAIRMQLOGGER_H_ */
|
#endif /* FAIRMQLOGGER_H_ */
|
||||||
|
|
|
@ -23,7 +23,7 @@ FairMQMerger::~FairMQMerger()
|
||||||
|
|
||||||
void FairMQMerger::Run()
|
void FairMQMerger::Run()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
LOG(INFO) << ">>>>>>> Run <<<<<<<";
|
||||||
|
|
||||||
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,7 @@ FairMQProxy::~FairMQProxy()
|
||||||
|
|
||||||
void FairMQProxy::Run()
|
void FairMQProxy::Run()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
LOG(INFO) << ">>>>>>> Run <<<<<<<";
|
||||||
|
|
||||||
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ FairMQSink::FairMQSink()
|
||||||
|
|
||||||
void FairMQSink::Run()
|
void FairMQSink::Run()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
LOG(INFO) << ">>>>>>> Run <<<<<<<";
|
||||||
|
|
||||||
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ FairMQSplitter::~FairMQSplitter()
|
||||||
|
|
||||||
void FairMQSplitter::Run()
|
void FairMQSplitter::Run()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, ">>>>>>> Run <<<<<<<");
|
LOG(INFO) << ">>>>>>> Run <<<<<<<";
|
||||||
|
|
||||||
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
||||||
|
|
||||||
|
|
|
@ -25,13 +25,13 @@ void FairMQStateMachine::ChangeState(int event)
|
||||||
switch(event) {
|
switch(event) {
|
||||||
|
|
||||||
case INIT:
|
case INIT:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "IDLE --init--> INITIALIZING");
|
LOG(STATE) << "IDLE --init--> INITIALIZING";
|
||||||
fState = INITIALIZING;
|
fState = INITIALIZING;
|
||||||
Init();
|
Init();
|
||||||
return;
|
return;
|
||||||
|
|
||||||
case END:
|
case END:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "IDLE --end--> (o)");
|
LOG(STATE) << "IDLE --end--> (o)";
|
||||||
return;
|
return;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
@ -44,7 +44,7 @@ void FairMQStateMachine::ChangeState(int event)
|
||||||
switch(event) {
|
switch(event) {
|
||||||
|
|
||||||
case SETOUTPUT:
|
case SETOUTPUT:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "INITIALIZING --bind--> SETTINGOUTPUT");
|
LOG(STATE) << "INITIALIZING --bind--> SETTINGOUTPUT";
|
||||||
fState = SETTINGOUTPUT;
|
fState = SETTINGOUTPUT;
|
||||||
InitOutput();
|
InitOutput();
|
||||||
return;
|
return;
|
||||||
|
@ -59,7 +59,7 @@ void FairMQStateMachine::ChangeState(int event)
|
||||||
switch(event) {
|
switch(event) {
|
||||||
|
|
||||||
case SETINPUT:
|
case SETINPUT:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "SETTINGOUTPUT --connect--> SETTINGINPUT");
|
LOG(STATE) << "SETTINGOUTPUT --connect--> SETTINGINPUT";
|
||||||
fState = SETTINGINPUT;
|
fState = SETTINGINPUT;
|
||||||
InitInput();
|
InitInput();
|
||||||
return;
|
return;
|
||||||
|
@ -74,13 +74,13 @@ void FairMQStateMachine::ChangeState(int event)
|
||||||
switch(event) {
|
switch(event) {
|
||||||
|
|
||||||
case PAUSE:
|
case PAUSE:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "SETTINGINPUT --pause--> WAITING");
|
LOG(STATE) << "SETTINGINPUT --pause--> WAITING";
|
||||||
fState = WAITING;
|
fState = WAITING;
|
||||||
Pause();
|
Pause();
|
||||||
return;
|
return;
|
||||||
|
|
||||||
case RUN:
|
case RUN:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "SETTINGINPUT --run--> RUNNING");
|
LOG(STATE) << "SETTINGINPUT --run--> RUNNING";
|
||||||
fState = RUNNING;
|
fState = RUNNING;
|
||||||
running_state = boost::thread(boost::bind(&FairMQStateMachine::Run, this));
|
running_state = boost::thread(boost::bind(&FairMQStateMachine::Run, this));
|
||||||
return;
|
return;
|
||||||
|
@ -95,13 +95,13 @@ void FairMQStateMachine::ChangeState(int event)
|
||||||
switch(event) {
|
switch(event) {
|
||||||
|
|
||||||
case RUN:
|
case RUN:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "WAITING --run--> RUNNING");
|
LOG(STATE) << "WAITING --run--> RUNNING";
|
||||||
fState = RUNNING;
|
fState = RUNNING;
|
||||||
running_state = boost::thread(boost::bind(&FairMQStateMachine::Run, this));
|
running_state = boost::thread(boost::bind(&FairMQStateMachine::Run, this));
|
||||||
return;
|
return;
|
||||||
|
|
||||||
case STOP:
|
case STOP:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "WAITING --stop--> IDLE");
|
LOG(STATE) << "WAITING --stop--> IDLE";
|
||||||
fState = IDLE;
|
fState = IDLE;
|
||||||
Shutdown();
|
Shutdown();
|
||||||
return;
|
return;
|
||||||
|
@ -116,13 +116,13 @@ void FairMQStateMachine::ChangeState(int event)
|
||||||
switch(event) {
|
switch(event) {
|
||||||
|
|
||||||
case PAUSE:
|
case PAUSE:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "RUNNING --pause--> WAITING");
|
LOG(STATE) << "RUNNING --pause--> WAITING";
|
||||||
fState = WAITING;
|
fState = WAITING;
|
||||||
running_state.join();
|
running_state.join();
|
||||||
return;
|
return;
|
||||||
|
|
||||||
case STOP:
|
case STOP:
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::STATE, "RUNNING --stop--> IDLE");
|
LOG(STATE) << "RUNNING --stop--> IDLE";
|
||||||
fState = IDLE;
|
fState = IDLE;
|
||||||
running_state.join();
|
running_state.join();
|
||||||
Shutdown();
|
Shutdown();
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
**D
|
/**
|
||||||
* FairMQMessageNN.cxx
|
* FairMQMessageNN.cxx
|
||||||
*
|
*
|
||||||
* @since 2013-12-05
|
* @since 2013-12-05
|
||||||
|
@ -22,9 +22,7 @@ FairMQMessageNN::FairMQMessageNN(size_t size)
|
||||||
{
|
{
|
||||||
fMessage = nn_allocmsg(size, 0);
|
fMessage = nn_allocmsg(size, 0);
|
||||||
if (!fMessage){
|
if (!fMessage){
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
fSize = size;
|
fSize = size;
|
||||||
}
|
}
|
||||||
|
@ -33,9 +31,7 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size)
|
||||||
{
|
{
|
||||||
fMessage = nn_allocmsg(size, 0);
|
fMessage = nn_allocmsg(size, 0);
|
||||||
if (!fMessage){
|
if (!fMessage){
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
memcpy (fMessage, data, size);
|
memcpy (fMessage, data, size);
|
||||||
fSize = size;
|
fSize = size;
|
||||||
|
@ -53,9 +49,7 @@ void FairMQMessageNN::Rebuild(size_t size)
|
||||||
Clear();
|
Clear();
|
||||||
fMessage = nn_allocmsg(size, 0);
|
fMessage = nn_allocmsg(size, 0);
|
||||||
if (!fMessage){
|
if (!fMessage){
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
fSize = size;
|
fSize = size;
|
||||||
}
|
}
|
||||||
|
@ -65,9 +59,7 @@ void FairMQMessageNN::Rebuild(void* data, size_t size)
|
||||||
Clear();
|
Clear();
|
||||||
fMessage = nn_allocmsg(size, 0);
|
fMessage = nn_allocmsg(size, 0);
|
||||||
if (!fMessage){
|
if (!fMessage){
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
memcpy (fMessage, data, size);
|
memcpy (fMessage, data, size);
|
||||||
fSize = size;
|
fSize = size;
|
||||||
|
@ -99,9 +91,7 @@ void FairMQMessageNN::Copy(FairMQMessage* msg)
|
||||||
if (fMessage){
|
if (fMessage){
|
||||||
int rc = nn_freemsg(fMessage);
|
int rc = nn_freemsg(fMessage);
|
||||||
if ( rc < 0 ){
|
if ( rc < 0 ){
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed freeing message, reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,9 +99,7 @@ void FairMQMessageNN::Copy(FairMQMessage* msg)
|
||||||
|
|
||||||
fMessage = nn_allocmsg(size, 0);
|
fMessage = nn_allocmsg(size, 0);
|
||||||
if (!fMessage){
|
if (!fMessage){
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed allocating message, reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
std::memcpy (fMessage, msg->GetMessage(), size);
|
std::memcpy (fMessage, msg->GetMessage(), size);
|
||||||
fSize = size;
|
fSize = size;
|
||||||
|
@ -121,9 +109,7 @@ inline void FairMQMessageNN::Clear()
|
||||||
{
|
{
|
||||||
int rc = nn_freemsg(fMessage);
|
int rc = nn_freemsg(fMessage);
|
||||||
if (rc < 0) {
|
if (rc < 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed freeing message, reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
} else {
|
} else {
|
||||||
fMessage = NULL;
|
fMessage = NULL;
|
||||||
fSize = 0;
|
fSize = 0;
|
||||||
|
|
|
@ -25,9 +25,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, int num) :
|
||||||
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);
|
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "created socket #" << fId;
|
||||||
logmsg << "created socket #" << fId;
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
string FairMQSocketNN::GetId()
|
string FairMQSocketNN::GetId()
|
||||||
|
@ -37,29 +35,21 @@ string FairMQSocketNN::GetId()
|
||||||
|
|
||||||
void FairMQSocketNN::Bind(const string& address)
|
void FairMQSocketNN::Bind(const string& address)
|
||||||
{
|
{
|
||||||
stringstream logmsg;
|
LOG(INFO) << "bind socket #" << fId << " on " << address;
|
||||||
logmsg << "bind socket #" << fId << " on " << address;
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
int eid = nn_bind(fSocket, address.c_str());
|
int eid = nn_bind(fSocket, address.c_str());
|
||||||
if (eid < 0) {
|
if (eid < 0) {
|
||||||
stringstream logmsg2;
|
LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||||
logmsg2 << "failed binding socket #" << fId << ", reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQSocketNN::Connect(const string& address)
|
void FairMQSocketNN::Connect(const string& address)
|
||||||
{
|
{
|
||||||
stringstream logmsg;
|
LOG(INFO) << "connect socket #" << fId << " to " << address;
|
||||||
logmsg << "connect socket #" << fId << " to " << address;
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
int eid = nn_connect(fSocket, address.c_str());
|
int eid = nn_connect(fSocket, address.c_str());
|
||||||
if (eid < 0) {
|
if (eid < 0) {
|
||||||
stringstream logmsg2;
|
LOG(ERROR) << "failed connecting socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||||
logmsg2 << "failed connecting socket #" << fId << ", reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,9 +58,7 @@ size_t FairMQSocketNN::Send(FairMQMessage* msg)
|
||||||
void* ptr = msg->GetMessage();
|
void* ptr = msg->GetMessage();
|
||||||
int rc = nn_send(fSocket, &ptr, NN_MSG, 0);
|
int rc = nn_send(fSocket, &ptr, NN_MSG, 0);
|
||||||
if (rc < 0) {
|
if (rc < 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed sending on socket #" << fId << ", reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
} else {
|
} else {
|
||||||
fBytesTx += rc;
|
fBytesTx += rc;
|
||||||
++fMessagesTx;
|
++fMessagesTx;
|
||||||
|
@ -84,9 +72,7 @@ size_t FairMQSocketNN::Receive(FairMQMessage* msg)
|
||||||
void* ptr = NULL;
|
void* ptr = NULL;
|
||||||
int rc = nn_recv(fSocket, &ptr, NN_MSG, 0);
|
int rc = nn_recv(fSocket, &ptr, NN_MSG, 0);
|
||||||
if (rc < 0) {
|
if (rc < 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed receiving on socket #" << fId << ", reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
} else {
|
} else {
|
||||||
fBytesRx += rc;
|
fBytesRx += rc;
|
||||||
++fMessagesRx;
|
++fMessagesRx;
|
||||||
|
@ -115,9 +101,7 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v
|
||||||
{
|
{
|
||||||
int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize);
|
int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize);
|
||||||
if (rc < 0) {
|
if (rc < 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed setting socket option, reason: " << nn_strerror(errno);
|
||||||
logmsg << "failed setting socket option, reason: " << nn_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
|
|
||||||
FairMQTransportFactoryNN::FairMQTransportFactoryNN()
|
FairMQTransportFactoryNN::FairMQTransportFactoryNN()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "Using nanomsg library");
|
LOG(INFO) << "Using nanonsg library";
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQMessage* FairMQTransportFactoryNN::CreateMessage()
|
FairMQMessage* FairMQTransportFactoryNN::CreateMessage()
|
||||||
|
|
|
@ -52,9 +52,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||||
sampler.SetTransport(transportFactory);
|
sampler.SetTransport(transportFactory);
|
||||||
|
|
|
@ -52,9 +52,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||||
buffer.SetTransport(transportFactory);
|
buffer.SetTransport(transportFactory);
|
||||||
|
|
|
@ -53,9 +53,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||||
merger.SetTransport(transportFactory);
|
merger.SetTransport(transportFactory);
|
||||||
|
|
|
@ -55,9 +55,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||||
merger.SetTransport(transportFactory);
|
merger.SetTransport(transportFactory);
|
||||||
|
|
|
@ -54,9 +54,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||||
splitter.SetTransport(transportFactory);
|
splitter.SetTransport(transportFactory);
|
||||||
|
|
|
@ -52,9 +52,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||||
proxy.SetTransport(transportFactory);
|
proxy.SetTransport(transportFactory);
|
||||||
|
|
|
@ -52,9 +52,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||||
sink.SetTransport(transportFactory);
|
sink.SetTransport(transportFactory);
|
||||||
|
|
|
@ -53,9 +53,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
|
||||||
splitter.SetTransport(transportFactory);
|
splitter.SetTransport(transportFactory);
|
||||||
|
|
|
@ -13,16 +13,12 @@ FairMQContextZMQ::FairMQContextZMQ(int numIoThreads)
|
||||||
{
|
{
|
||||||
fContext = zmq_ctx_new ();
|
fContext = zmq_ctx_new ();
|
||||||
if (fContext == NULL){
|
if (fContext == NULL){
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed creating context, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int rc = zmq_ctx_set (fContext, ZMQ_IO_THREADS, numIoThreads);
|
int rc = zmq_ctx_set (fContext, ZMQ_IO_THREADS, numIoThreads);
|
||||||
if (rc != 0){
|
if (rc != 0){
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed configuring context, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,9 +40,7 @@ void FairMQContextZMQ::Close()
|
||||||
|
|
||||||
int rc = zmq_ctx_destroy (fContext);
|
int rc = zmq_ctx_destroy (fContext);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed closing context, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed closing context, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fContext = NULL;
|
fContext = NULL;
|
||||||
|
|
|
@ -16,9 +16,7 @@ FairMQMessageZMQ::FairMQMessageZMQ() :
|
||||||
{
|
{
|
||||||
int rc = zmq_msg_init (fMessage);
|
int rc = zmq_msg_init (fMessage);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed initializing message, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,9 +25,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size) :
|
||||||
{
|
{
|
||||||
int rc = zmq_msg_init_size (fMessage, size);
|
int rc = zmq_msg_init_size (fMessage, size);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,9 +34,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) :
|
||||||
{
|
{
|
||||||
int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL);
|
int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,9 +43,7 @@ void FairMQMessageZMQ::Rebuild()
|
||||||
CloseMessage();
|
CloseMessage();
|
||||||
int rc = zmq_msg_init (fMessage);
|
int rc = zmq_msg_init (fMessage);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed initializing message, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,9 +52,7 @@ void FairMQMessageZMQ::Rebuild(size_t size)
|
||||||
CloseMessage();
|
CloseMessage();
|
||||||
int rc = zmq_msg_init_size (fMessage, size);
|
int rc = zmq_msg_init_size (fMessage, size);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed initializing message with size, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,9 +61,7 @@ void FairMQMessageZMQ::Rebuild(void* data, size_t size)
|
||||||
CloseMessage();
|
CloseMessage();
|
||||||
int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL);
|
int rc = zmq_msg_init_data (fMessage, data, size, &CleanUp, NULL);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed initializing message with data, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,9 +89,7 @@ inline void FairMQMessageZMQ::CloseMessage()
|
||||||
{
|
{
|
||||||
int rc = zmq_msg_close (fMessage);
|
int rc = zmq_msg_close (fMessage);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed closing message, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,9 +97,7 @@ void FairMQMessageZMQ::Copy(FairMQMessage* msg)
|
||||||
{
|
{
|
||||||
int rc = zmq_msg_copy (fMessage, (static_cast<FairMQMessageZMQ*>(msg)->fMessage));
|
int rc = zmq_msg_copy (fMessage, (static_cast<FairMQMessageZMQ*>(msg)->fMessage));
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed copying message, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,8 +110,6 @@ FairMQMessageZMQ::~FairMQMessageZMQ()
|
||||||
{
|
{
|
||||||
int rc = zmq_msg_close (fMessage);
|
int rc = zmq_msg_close (fMessage);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed closing message with data, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed closing message with data, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -18,30 +18,25 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num) :
|
||||||
fMessagesTx(0),
|
fMessagesTx(0),
|
||||||
fMessagesRx(0)
|
fMessagesRx(0)
|
||||||
{
|
{
|
||||||
stringstream id; // TODO
|
stringstream id;
|
||||||
id << type << "." << num;
|
id << type << "." << num;
|
||||||
fId = id.str();
|
fId = id.str();
|
||||||
|
|
||||||
fSocket = zmq_socket(fContext->GetContext(), GetConstant(type));
|
fSocket = zmq_socket(fContext->GetContext(), GetConstant(type));
|
||||||
|
|
||||||
int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
|
int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type == "sub") {
|
if (type == "sub") {
|
||||||
rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
|
rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg2;
|
LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno);
|
||||||
logmsg2 << "failed setting socket option, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stringstream logmsg3;
|
LOG(INFO) << "created socket #" << fId;
|
||||||
logmsg3 << "created socket #" << fId;
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg3.str());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
string FairMQSocketZMQ::GetId()
|
string FairMQSocketZMQ::GetId()
|
||||||
|
@ -51,29 +46,21 @@ string FairMQSocketZMQ::GetId()
|
||||||
|
|
||||||
void FairMQSocketZMQ::Bind(const string& address)
|
void FairMQSocketZMQ::Bind(const string& address)
|
||||||
{
|
{
|
||||||
stringstream logmsg;
|
LOG(INFO) << "bind socket #" << fId << " on " << address;
|
||||||
logmsg << "bind socket #" << fId << " on " << address;
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
int rc = zmq_bind (fSocket, address.c_str());
|
int rc = zmq_bind (fSocket, address.c_str());
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg2;
|
LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
|
||||||
logmsg2 << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQSocketZMQ::Connect(const string& address)
|
void FairMQSocketZMQ::Connect(const string& address)
|
||||||
{
|
{
|
||||||
stringstream logmsg;
|
LOG(INFO) << "connect socket #" << fId << " on " << address;
|
||||||
logmsg << "connect socket #" << fId << " on " << address;
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
int rc = zmq_connect (fSocket, address.c_str());
|
int rc = zmq_connect (fSocket, address.c_str());
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg2;
|
LOG(ERROR) << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno);
|
||||||
logmsg2 << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg2.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,9 +75,7 @@ size_t FairMQSocketZMQ::Send(FairMQMessage* msg)
|
||||||
if (zmq_errno() == EAGAIN){
|
if (zmq_errno() == EAGAIN){
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
return nbytes;
|
return nbytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,9 +90,7 @@ size_t FairMQSocketZMQ::Receive(FairMQMessage* msg)
|
||||||
if (zmq_errno() == EAGAIN){
|
if (zmq_errno() == EAGAIN){
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
return nbytes;
|
return nbytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,9 +102,7 @@ void FairMQSocketZMQ::Close()
|
||||||
|
|
||||||
int rc = zmq_close (fSocket);
|
int rc = zmq_close (fSocket);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed closing socket, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed closing socket, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fSocket = NULL;
|
fSocket = NULL;
|
||||||
|
@ -142,9 +123,7 @@ void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t
|
||||||
{
|
{
|
||||||
int rc = zmq_setsockopt(fSocket, GetConstant(option), value, valueSize);
|
int rc = zmq_setsockopt(fSocket, GetConstant(option), value, valueSize);
|
||||||
if (rc < 0) {
|
if (rc < 0) {
|
||||||
stringstream logmsg;
|
LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno);
|
||||||
logmsg << "failed setting socket option, reason: " << zmq_strerror(errno);
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::ERROR, logmsg.str());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
|
|
||||||
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ()
|
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ()
|
||||||
{
|
{
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, "Using ZeroMQ library");
|
LOG(INFO) << "Using ZeroMQ library";
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage()
|
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage()
|
||||||
|
|
|
@ -52,9 +52,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||||
sampler.SetTransport(transportFactory);
|
sampler.SetTransport(transportFactory);
|
||||||
|
|
|
@ -52,9 +52,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||||
buffer.SetTransport(transportFactory);
|
buffer.SetTransport(transportFactory);
|
||||||
|
|
|
@ -53,9 +53,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||||
merger.SetTransport(transportFactory);
|
merger.SetTransport(transportFactory);
|
||||||
|
|
|
@ -55,9 +55,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||||
merger.SetTransport(transportFactory);
|
merger.SetTransport(transportFactory);
|
||||||
|
|
|
@ -54,9 +54,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||||
splitter.SetTransport(transportFactory);
|
splitter.SetTransport(transportFactory);
|
||||||
|
|
|
@ -52,9 +52,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||||
proxy.SetTransport(transportFactory);
|
proxy.SetTransport(transportFactory);
|
||||||
|
|
|
@ -52,9 +52,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||||
sink.SetTransport(transportFactory);
|
sink.SetTransport(transportFactory);
|
||||||
|
|
|
@ -53,9 +53,7 @@ int main(int argc, char** argv)
|
||||||
|
|
||||||
s_catch_signals();
|
s_catch_signals();
|
||||||
|
|
||||||
stringstream logmsg;
|
LOG(INFO) << "PID: " << getpid();
|
||||||
logmsg << "PID: " << getpid();
|
|
||||||
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
|
||||||
|
|
||||||
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
|
||||||
splitter.SetTransport(transportFactory);
|
splitter.SetTransport(transportFactory);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user