diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 28c8ff8a..bb2216de 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -7,6 +7,7 @@ ################################################################################ set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/devices ${Boost_INCLUDE_DIR} ) @@ -48,12 +49,12 @@ set(SRCS "FairMQMessage.cxx" "FairMQSocket.cxx" "FairMQDevice.cxx" - "FairMQBenchmarkSampler.cxx" - "FairMQSink.cxx" - "FairMQBuffer.cxx" - "FairMQProxy.cxx" - "FairMQSplitter.cxx" - "FairMQMerger.cxx" + "devices/FairMQBenchmarkSampler.cxx" + "devices/FairMQSink.cxx" + "devices/FairMQBuffer.cxx" + "devices/FairMQProxy.cxx" + "devices/FairMQSplitter.cxx" + "devices/FairMQMerger.cxx" "FairMQPoller.cxx" ) @@ -101,7 +102,6 @@ endif(NANOMSG_FOUND) set(DEPENDENCIES ${DEPENDENCIES} - ${CMAKE_THREAD_LIBS_INIT} boost_thread boost_timer boost_system ) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index cc00da42..6fff84b8 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -20,6 +20,8 @@ FairMQDevice::FairMQDevice() : fNumIoThreads(1) + , fNumInputs(0) + , fNumOutputs(0) , fPayloadInputs(new vector()) , fPayloadOutputs(new vector()) , fLogIntervalInMs(1000) @@ -32,32 +34,22 @@ void FairMQDevice::Init() LOG(INFO) << ">>>>>>> Init <<<<<<<"; LOG(INFO) << "numIoThreads: " << fNumIoThreads; - fInputAddress = new vector(fNumInputs); - fInputMethod = new vector(); - fInputSocketType = new vector(); - fInputSndBufSize = new vector(); - fInputRcvBufSize = new vector(); - for (int i = 0; i < fNumInputs; ++i) { - fInputMethod->push_back("connect"); // default value, can be overwritten in configuration - fInputSocketType->push_back("sub"); // default value, can be overwritten in configuration - fInputSndBufSize->push_back(10000); // default value, can be overwritten in configuration - fInputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration + fInputAddress.push_back("ipc://default"); // default value, can be overwritten in configuration + fInputMethod.push_back("connect"); // default value, can be overwritten in configuration + fInputSocketType.push_back("sub"); // default value, can be overwritten in configuration + fInputSndBufSize.push_back(10000); // default value, can be overwritten in configuration + fInputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration } - fOutputAddress = new vector(fNumOutputs); - fOutputMethod = new vector(); - fOutputSocketType = new vector(); - fOutputSndBufSize = new vector(); - fOutputRcvBufSize = new vector(); - for (int i = 0; i < fNumOutputs; ++i) { - fOutputMethod->push_back("bind"); // default value, can be overwritten in configuration - fOutputSocketType->push_back("pub"); // default value, can be overwritten in configuration - fOutputSndBufSize->push_back(10000); // default value, can be overwritten in configuration - fOutputRcvBufSize->push_back(10000); // default value, can be overwritten in configuration + fOutputAddress.push_back("ipc://default"); // default value, can be overwritten in configuration + fOutputMethod.push_back("bind"); // default value, can be overwritten in configuration + fOutputSocketType.push_back("pub"); // default value, can be overwritten in configuration + fOutputSndBufSize.push_back(10000); // default value, can be overwritten in configuration + fOutputRcvBufSize.push_back(10000); // default value, can be overwritten in configuration } } @@ -67,26 +59,27 @@ void FairMQDevice::InitInput() for (int i = 0; i < fNumInputs; ++i) { - FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i, fNumIoThreads); + FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType.at(i), i, fNumIoThreads); - socket->SetOption("snd-hwm", &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i))); - socket->SetOption("rcv-hwm", &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i))); + socket->SetOption("snd-hwm", &fInputSndBufSize.at(i), sizeof(fInputSndBufSize.at(i))); + socket->SetOption("rcv-hwm", &fInputRcvBufSize.at(i), sizeof(fInputRcvBufSize.at(i))); fPayloadInputs->push_back(socket); try { - if (fInputMethod->at(i) == "bind") + if (fInputMethod.at(i) == "bind") { - fPayloadInputs->at(i)->Bind(fInputAddress->at(i)); + fPayloadInputs->at(i)->Bind(fInputAddress.at(i)); } else { - fPayloadInputs->at(i)->Connect(fInputAddress->at(i)); + fPayloadInputs->at(i)->Connect(fInputAddress.at(i)); } } catch (std::out_of_range& e) { + LOG(ERROR) << e.what(); } } } @@ -97,26 +90,27 @@ void FairMQDevice::InitOutput() for (int i = 0; i < fNumOutputs; ++i) { - FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i, fNumIoThreads); + FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType.at(i), i, fNumIoThreads); - socket->SetOption("snd-hwm", &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i))); - socket->SetOption("rcv-hwm", &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i))); + socket->SetOption("snd-hwm", &fOutputSndBufSize.at(i), sizeof(fOutputSndBufSize.at(i))); + socket->SetOption("rcv-hwm", &fOutputRcvBufSize.at(i), sizeof(fOutputRcvBufSize.at(i))); fPayloadOutputs->push_back(socket); try { - if (fOutputMethod->at(i) == "bind") + if (fOutputMethod.at(i) == "bind") { - fPayloadOutputs->at(i)->Bind(fOutputAddress->at(i)); + fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i)); } else { - fPayloadOutputs->at(i)->Connect(fOutputAddress->at(i)); + fPayloadOutputs->at(i)->Connect(fOutputAddress.at(i)); } } catch (std::out_of_range& e) { + LOG(ERROR) << e.what(); } } } @@ -138,28 +132,28 @@ void FairMQDevice::SetProperty(const int key, const string& value, const int slo fId = value; break; case InputAddress: - fInputAddress->erase(fInputAddress->begin() + slot); - fInputAddress->insert(fInputAddress->begin() + slot, value); + fInputAddress.erase(fInputAddress.begin() + slot); + fInputAddress.insert(fInputAddress.begin() + slot, value); break; case OutputAddress: - fOutputAddress->erase(fOutputAddress->begin() + slot); - fOutputAddress->insert(fOutputAddress->begin() + slot, value); + fOutputAddress.erase(fOutputAddress.begin() + slot); + fOutputAddress.insert(fOutputAddress.begin() + slot, value); break; case InputMethod: - fInputMethod->erase(fInputMethod->begin() + slot); - fInputMethod->insert(fInputMethod->begin() + slot, value); + fInputMethod.erase(fInputMethod.begin() + slot); + fInputMethod.insert(fInputMethod.begin() + slot, value); break; case OutputMethod: - fOutputMethod->erase(fOutputMethod->begin() + slot); - fOutputMethod->insert(fOutputMethod->begin() + slot, value); + fOutputMethod.erase(fOutputMethod.begin() + slot); + fOutputMethod.insert(fOutputMethod.begin() + slot, value); break; case InputSocketType: - fInputSocketType->erase(fInputSocketType->begin() + slot); - fInputSocketType->insert(fInputSocketType->begin() + slot, value); + fInputSocketType.erase(fInputSocketType.begin() + slot); + fInputSocketType.insert(fInputSocketType.begin() + slot, value); break; case OutputSocketType: - fOutputSocketType->erase(fOutputSocketType->begin() + slot); - fOutputSocketType->insert(fOutputSocketType->begin() + slot, value); + fOutputSocketType.erase(fOutputSocketType.begin() + slot); + fOutputSocketType.insert(fOutputSocketType.begin() + slot, value); break; default: FairMQConfigurable::SetProperty(key, value, slot); @@ -185,20 +179,20 @@ void FairMQDevice::SetProperty(const int key, const int value, const int slot /* fLogIntervalInMs = value; break; case InputSndBufSize: - fInputSndBufSize->erase(fInputSndBufSize->begin() + slot); - fInputSndBufSize->insert(fInputSndBufSize->begin() + slot, value); + fInputSndBufSize.erase(fInputSndBufSize.begin() + slot); + fInputSndBufSize.insert(fInputSndBufSize.begin() + slot, value); break; case InputRcvBufSize: - fInputRcvBufSize->erase(fInputRcvBufSize->begin() + slot); - fInputRcvBufSize->insert(fInputRcvBufSize->begin() + slot, value); + fInputRcvBufSize.erase(fInputRcvBufSize.begin() + slot); + fInputRcvBufSize.insert(fInputRcvBufSize.begin() + slot, value); break; case OutputSndBufSize: - fOutputSndBufSize->erase(fOutputSndBufSize->begin() + slot); - fOutputSndBufSize->insert(fOutputSndBufSize->begin() + slot, value); + fOutputSndBufSize.erase(fOutputSndBufSize.begin() + slot); + fOutputSndBufSize.insert(fOutputSndBufSize.begin() + slot, value); break; case OutputRcvBufSize: - fOutputRcvBufSize->erase(fOutputRcvBufSize->begin() + slot); - fOutputRcvBufSize->insert(fOutputRcvBufSize->begin() + slot, value); + fOutputRcvBufSize.erase(fOutputRcvBufSize.begin() + slot); + fOutputRcvBufSize.insert(fOutputRcvBufSize.begin() + slot, value); break; default: FairMQConfigurable::SetProperty(key, value, slot); @@ -214,17 +208,17 @@ string FairMQDevice::GetProperty(const int key, const string& default_ /*= ""*/, case Id: return fId; case InputAddress: - return fInputAddress->at(slot); + return fInputAddress.at(slot); case OutputAddress: - return fOutputAddress->at(slot); + return fOutputAddress.at(slot); case InputMethod: - return fInputMethod->at(slot); + return fInputMethod.at(slot); case OutputMethod: - return fOutputMethod->at(slot); + return fOutputMethod.at(slot); case InputSocketType: - return fInputSocketType->at(slot); + return fInputSocketType.at(slot); case OutputSocketType: - return fOutputSocketType->at(slot); + return fOutputSocketType.at(slot); default: return FairMQConfigurable::GetProperty(key, default_, slot); } @@ -240,13 +234,13 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/, const i case LogIntervalInMs: return fLogIntervalInMs; case InputSndBufSize: - return fInputSndBufSize->at(slot); + return fInputSndBufSize.at(slot); case InputRcvBufSize: - return fInputRcvBufSize->at(slot); + return fInputRcvBufSize.at(slot); case OutputSndBufSize: - return fOutputSndBufSize->at(slot); + return fOutputSndBufSize.at(slot); case OutputRcvBufSize: - return fOutputRcvBufSize->at(slot); + return fOutputRcvBufSize.at(slot); default: return FairMQConfigurable::GetProperty(key, default_, slot); } @@ -264,20 +258,20 @@ void FairMQDevice::LogSocketRates() timestamp_t timeSinceLastLog_ms; - unsigned long* bytesInput = new unsigned long[fNumInputs]; - unsigned long* messagesInput = new unsigned long[fNumInputs]; - unsigned long* bytesOutput = new unsigned long[fNumOutputs]; - unsigned long* messagesOutput = new unsigned long[fNumOutputs]; + vector bytesInput(fNumInputs); + vector messagesInput(fNumInputs); + vector bytesOutput(fNumOutputs); + vector messagesOutput(fNumOutputs); - unsigned long* bytesInputNew = new unsigned long[fNumInputs]; - unsigned long* messagesInputNew = new unsigned long[fNumInputs]; - unsigned long* bytesOutputNew = new unsigned long[fNumOutputs]; - unsigned long* messagesOutputNew = new unsigned long[fNumOutputs]; + vector bytesInputNew(fNumInputs); + vector messagesInputNew(fNumInputs); + vector bytesOutputNew(fNumOutputs); + vector messagesOutputNew(fNumOutputs); - double* megabytesPerSecondInput = new double[fNumInputs]; - double* messagesPerSecondInput = new double[fNumInputs]; - double* megabytesPerSecondOutput = new double[fNumOutputs]; - double* messagesPerSecondOutput = new double[fNumOutputs]; + vector megabytesPerSecondInput(fNumInputs); + vector messagesPerSecondInput(fNumInputs); + vector megabytesPerSecondOutput(fNumOutputs); + vector messagesPerSecondOutput(fNumOutputs); // Temp stuff for process termination // bool receivedSomething = false; @@ -289,16 +283,16 @@ void FairMQDevice::LogSocketRates() int i = 0; for (vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) { - bytesInput[i] = (*itr)->GetBytesRx(); - messagesInput[i] = (*itr)->GetMessagesRx(); + bytesInput.at(i) = (*itr)->GetBytesRx(); + messagesInput.at(i) = (*itr)->GetMessagesRx(); ++i; } i = 0; for (vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) { - bytesOutput[i] = (*itr)->GetBytesTx(); - messagesOutput[i] = (*itr)->GetMessagesTx(); + bytesOutput.at(i) = (*itr)->GetBytesTx(); + messagesOutput.at(i) = (*itr)->GetMessagesTx(); ++i; } @@ -316,20 +310,20 @@ void FairMQDevice::LogSocketRates() for (vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) { - bytesInputNew[i] = (*itr)->GetBytesRx(); - megabytesPerSecondInput[i] = ((double)(bytesInputNew[i] - bytesInput[i]) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.; - bytesInput[i] = bytesInputNew[i]; - messagesInputNew[i] = (*itr)->GetMessagesRx(); - messagesPerSecondInput[i] = (double)(messagesInputNew[i] - messagesInput[i]) / (double)timeSinceLastLog_ms * 1000.; - messagesInput[i] = messagesInputNew[i]; + bytesInputNew.at(i) = (*itr)->GetBytesRx(); + megabytesPerSecondInput.at(i) = ((double)(bytesInputNew.at(i) - bytesInput.at(i)) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.; + bytesInput.at(i) = bytesInputNew.at(i); + messagesInputNew.at(i) = (*itr)->GetMessagesRx(); + messagesPerSecondInput.at(i) = (double)(messagesInputNew.at(i) - messagesInput.at(i)) / (double)timeSinceLastLog_ms * 1000.; + messagesInput.at(i) = messagesInputNew.at(i); - LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput[i] << " msg/s, " << megabytesPerSecondInput[i] << " MB/s"; + LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput.at(i) << " msg/s, " << megabytesPerSecondInput.at(i) << " MB/s"; // Temp stuff for process termination - // if ( !receivedSomething && messagesPerSecondInput[i] > 0 ) { + // if ( !receivedSomething && messagesPerSecondInput.at(i) > 0 ) { // receivedSomething = true; // } - // if ( receivedSomething && messagesPerSecondInput[i] == 0 ) { + // if ( receivedSomething && messagesPerSecondInput.at(i) == 0 ) { // cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << endl; // } else { // didNotReceiveFor = 0; @@ -343,21 +337,21 @@ void FairMQDevice::LogSocketRates() for (vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) { - bytesOutputNew[i] = (*itr)->GetBytesTx(); - megabytesPerSecondOutput[i] = ((double)(bytesOutputNew[i] - bytesOutput[i]) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.; - bytesOutput[i] = bytesOutputNew[i]; - messagesOutputNew[i] = (*itr)->GetMessagesTx(); - messagesPerSecondOutput[i] = (double)(messagesOutputNew[i] - messagesOutput[i]) / (double)timeSinceLastLog_ms * 1000.; - messagesOutput[i] = messagesOutputNew[i]; + bytesOutputNew.at(i) = (*itr)->GetBytesTx(); + megabytesPerSecondOutput.at(i) = ((double)(bytesOutputNew.at(i) - bytesOutput.at(i)) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.; + bytesOutput.at(i) = bytesOutputNew.at(i); + messagesOutputNew.at(i) = (*itr)->GetMessagesTx(); + messagesPerSecondOutput.at(i) = (double)(messagesOutputNew.at(i) - messagesOutput.at(i)) / (double)timeSinceLastLog_ms * 1000.; + messagesOutput.at(i) = messagesOutputNew.at(i); - LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput[i] << " msg/s, " << megabytesPerSecondOutput[i] + LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput.at(i) << " msg/s, " << megabytesPerSecondOutput.at(i) << " MB/s"; // Temp stuff for process termination - // if ( !sentSomething && messagesPerSecondOutput[i] > 0 ) { + // if ( !sentSomething && messagesPerSecondOutput.at(i) > 0 ) { // sentSomething = true; // } - // if ( sentSomething && messagesPerSecondOutput[i] == 0 ) { + // if ( sentSomething && messagesPerSecondOutput.at(i) == 0 ) { // cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << endl; // } else { // didNotSendFor = 0; @@ -388,21 +382,6 @@ void FairMQDevice::LogSocketRates() } } - delete[] bytesInput; - delete[] messagesInput; - delete[] bytesOutput; - delete[] messagesOutput; - - delete[] bytesInputNew; - delete[] messagesInputNew; - delete[] bytesOutputNew; - delete[] messagesOutputNew; - - delete[] megabytesPerSecondInput; - delete[] messagesPerSecondInput; - delete[] megabytesPerSecondOutput; - delete[] messagesPerSecondOutput; - LOG(INFO) << ">>>>>>> stopping rateLogger <<<<<<<"; } @@ -423,9 +402,6 @@ void FairMQDevice::Shutdown() { (*itr)->Close(); } - - // LOG(INFO) << ">>>>>>> closing context <<<<<<<"; - // fPayloadContext->Close(); } FairMQDevice::~FairMQDevice() @@ -440,8 +416,6 @@ FairMQDevice::~FairMQDevice() delete (*itr); } - delete fInputAddress; - delete fOutputAddress; delete fPayloadInputs; delete fPayloadOutputs; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 14221bca..46019cdb 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -69,28 +69,29 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable protected: string fId; int fNumIoThreads; - FairMQTransportFactory* fTransportFactory; int fNumInputs; int fNumOutputs; - vector* fInputAddress; - vector* fInputMethod; - vector* fInputSocketType; - vector* fInputSndBufSize; - vector* fInputRcvBufSize; + vector fInputAddress; + vector fInputMethod; + vector fInputSocketType; + vector fInputSndBufSize; + vector fInputRcvBufSize; - vector* fOutputAddress; - vector* fOutputMethod; - vector* fOutputSocketType; - vector* fOutputSndBufSize; - vector* fOutputRcvBufSize; + vector fOutputAddress; + vector fOutputMethod; + vector fOutputSocketType; + vector fOutputSndBufSize; + vector fOutputRcvBufSize; vector* fPayloadInputs; vector* fPayloadOutputs; int fLogIntervalInMs; + FairMQTransportFactory* fTransportFactory; + virtual void Init(); virtual void Run(); virtual void Pause(); diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx index 1d238501..e6247f5c 100644 --- a/fairmq/FairMQLogger.cxx +++ b/fairmq/FairMQLogger.cxx @@ -36,7 +36,7 @@ std::ostringstream& FairMQLogger::Log(int type) timestamp_t ms = tm / 1000.0L; timestamp_t s = ms / 1000.0L; std::time_t t = s; - std::size_t fractional_seconds = ms % 1000; + // std::size_t fractional_seconds = ms % 1000; char mbstr[100]; std::strftime(mbstr, 100, "%H:%M:%S", std::localtime(&t)); diff --git a/fairmq/README.md b/fairmq/README.md index bfc0d70c..53b28c21 100644 --- a/fairmq/README.md +++ b/fairmq/README.md @@ -1,9 +1,35 @@ -fairmq -======== +# FairMQ -The standard FairRoot is running all the different analysis tasks within one process. The FairMQ ([Message Queue](http://en.wikipedia.org/wiki/Message_queue)) allows starting tasks on different processes and provides the communication layer between these processes. +The standard FairRoot is running all the different analysis tasks within one process. The FairMQ ([Message Queue](http://en.wikipedia.org/wiki/Message_queue)) allows starting tasks on different processes and provides the communication layer between these processes. -The underlying communication layer in the FairMQ is now provided by: +## Devices + +The components encapsulating the tasks are called **devices** and derive from the common base class `FairMQDevice`. FairMQ provides ready to use devices to organize the dataflow between the components (without touching the contents of a message), providing functionality like merging and splitting of the data stream (see subdirectory `devices`). + +A number of devices to handle the data from the Tutorial3 detector of FairRoot are provided as an example and can be found in `FairRoot/base/MQ` directory. The implementation of the tasks run by these devices can be found `FairRoot/example/Tutorial3`. The implementation includes sending raw binary data as well as serializing the data with either [Boost Serialization](www.boost.org/doc/libs/release/libs/serialization/), [Google Protocol Buffers](https://developers.google.com/protocol-buffers/) or [Root TMessage](http://root.cern.ch/root/html/TMessage.html). Following the examples you can implement your own devices to transport arbitrary data. + +## Topology + +Devices are arranged into **topologies** where each device has a defined number of data inputs and outputs. + +Example of a simple FairMQ topology: + +![example of FairMQ topology](../docs/images/fairmq-example-topology.png?raw=true "Example of possible FairMQ topology") + +Topology configuration is currently happening via setup scripts. This is very rudimentary and a much more flexible system is now in development. For now, example setup scripts can be found in directory `FairRoot/example/Tutorial3/` along with some additional documentation. + +## Messages + +Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content and transport either raw data or serialized data as described above. Message can be initialized in three different ways: + - **with no parameters**: This is usefull for receiving a message, since neither size nor contents are yet known. + - **given message size**: Initialize message body with a size and fill the contents later, either with `memcpy` or by writing directly into message memory. + - **given message size and buffer**: initialize the message given an existing buffer. This is a zero-copy operation. + +After sending the message, the queueing system takes over control over the message body and will free it with `free()` after it is no longer used. A callback can be given to the message object, to be called instead of the destruction with `free()`. + +## Transport Interface + +The communication layer is available through an interface. Two interface implementations are currently available. Main implementation uses the [ZeroMQ](http://zeromq.org) library. Alternative implementation relies on the [nanomsg](http://nanomsg.org) library. Here is an overview to give an idea how interface is implemented: + +![FairMQ transport interface](../docs/images/fairmq-transport-interface.png?raw=true "FairMQ transport interface") -- [ZeroMQ](http://zeromq.org); -- [NanoMSG](http://nanomsg.org). \ No newline at end of file diff --git a/fairmq/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx similarity index 95% rename from fairmq/FairMQBenchmarkSampler.cxx rename to fairmq/devices/FairMQBenchmarkSampler.cxx index ff6f9aaa..c018c72a 100644 --- a/fairmq/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -66,11 +66,14 @@ void FairMQBenchmarkSampler::Run() delete base_msg; - rateLogger.interrupt(); - resetEventCounter.interrupt(); - - rateLogger.join(); - resetEventCounter.join(); + try { + rateLogger.interrupt(); + resetEventCounter.interrupt(); + rateLogger.join(); + resetEventCounter.join(); + } catch(boost::thread_resource_error& e) { + LOG(ERROR) << e.what(); + } } void FairMQBenchmarkSampler::ResetEventCounter() diff --git a/fairmq/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h similarity index 100% rename from fairmq/FairMQBenchmarkSampler.h rename to fairmq/devices/FairMQBenchmarkSampler.h diff --git a/fairmq/FairMQBuffer.cxx b/fairmq/devices/FairMQBuffer.cxx similarity index 89% rename from fairmq/FairMQBuffer.cxx rename to fairmq/devices/FairMQBuffer.cxx index bbe2c33e..62fea1dd 100644 --- a/fairmq/FairMQBuffer.cxx +++ b/fairmq/devices/FairMQBuffer.cxx @@ -46,8 +46,12 @@ void FairMQBuffer::Run() delete msg; } - rateLogger.interrupt(); - rateLogger.join(); + try { + rateLogger.interrupt(); + rateLogger.join(); + } catch(boost::thread_resource_error& e) { + LOG(ERROR) << e.what(); + } } FairMQBuffer::~FairMQBuffer() diff --git a/fairmq/FairMQBuffer.h b/fairmq/devices/FairMQBuffer.h similarity index 100% rename from fairmq/FairMQBuffer.h rename to fairmq/devices/FairMQBuffer.h diff --git a/fairmq/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx similarity index 91% rename from fairmq/FairMQMerger.cxx rename to fairmq/devices/FairMQMerger.cxx index f8030a66..3f7fb04d 100644 --- a/fairmq/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -61,6 +61,10 @@ void FairMQMerger::Run() delete poller; - rateLogger.interrupt(); - rateLogger.join(); + try { + rateLogger.interrupt(); + rateLogger.join(); + } catch(boost::thread_resource_error& e) { + LOG(ERROR) << e.what(); + } } diff --git a/fairmq/FairMQMerger.h b/fairmq/devices/FairMQMerger.h similarity index 100% rename from fairmq/FairMQMerger.h rename to fairmq/devices/FairMQMerger.h diff --git a/fairmq/FairMQProxy.cxx b/fairmq/devices/FairMQProxy.cxx similarity index 89% rename from fairmq/FairMQProxy.cxx rename to fairmq/devices/FairMQProxy.cxx index 4e748223..44770a3a 100644 --- a/fairmq/FairMQProxy.cxx +++ b/fairmq/devices/FairMQProxy.cxx @@ -48,6 +48,10 @@ void FairMQProxy::Run() delete msg; - rateLogger.interrupt(); - rateLogger.join(); + try { + rateLogger.interrupt(); + rateLogger.join(); + } catch(boost::thread_resource_error& e) { + LOG(ERROR) << e.what(); + } } diff --git a/fairmq/FairMQProxy.h b/fairmq/devices/FairMQProxy.h similarity index 100% rename from fairmq/FairMQProxy.h rename to fairmq/devices/FairMQProxy.h diff --git a/fairmq/FairMQSink.cxx b/fairmq/devices/FairMQSink.cxx similarity index 88% rename from fairmq/FairMQSink.cxx rename to fairmq/devices/FairMQSink.cxx index 4cd1a082..b3f923aa 100644 --- a/fairmq/FairMQSink.cxx +++ b/fairmq/devices/FairMQSink.cxx @@ -39,8 +39,12 @@ void FairMQSink::Run() delete msg; } - rateLogger.interrupt(); - rateLogger.join(); + try { + rateLogger.interrupt(); + rateLogger.join(); + } catch(boost::thread_resource_error& e) { + LOG(ERROR) << e.what(); + } } FairMQSink::~FairMQSink() diff --git a/fairmq/FairMQSink.h b/fairmq/devices/FairMQSink.h similarity index 100% rename from fairmq/FairMQSink.h rename to fairmq/devices/FairMQSink.h diff --git a/fairmq/FairMQSplitter.cxx b/fairmq/devices/FairMQSplitter.cxx similarity index 90% rename from fairmq/FairMQSplitter.cxx rename to fairmq/devices/FairMQSplitter.cxx index b919f8f6..f995e23a 100644 --- a/fairmq/FairMQSplitter.cxx +++ b/fairmq/devices/FairMQSplitter.cxx @@ -55,6 +55,10 @@ void FairMQSplitter::Run() delete msg; } - rateLogger.interrupt(); - rateLogger.join(); + try { + rateLogger.interrupt(); + rateLogger.join(); + } catch(boost::thread_resource_error& e) { + LOG(ERROR) << e.what(); + } } diff --git a/fairmq/FairMQSplitter.h b/fairmq/devices/FairMQSplitter.h similarity index 100% rename from fairmq/FairMQSplitter.h rename to fairmq/devices/FairMQSplitter.h diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 3e539f9e..5e514633 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -16,7 +16,7 @@ FairMQTransportFactoryNN::FairMQTransportFactoryNN() { - LOG(INFO) << "Using nanonsg library"; + LOG(INFO) << "Using nanomsg library"; } FairMQMessage* FairMQTransportFactoryNN::CreateMessage() diff --git a/fairmq/prototest/FairMQBinSink.cxx b/fairmq/prototest/FairMQBinSink.cxx index 2c5a153a..9163b956 100644 --- a/fairmq/prototest/FairMQBinSink.cxx +++ b/fairmq/prototest/FairMQBinSink.cxx @@ -35,8 +35,8 @@ void FairMQBinSink::Run() fPayloadInputs->at(0)->Receive(msg); int inputSize = msg->GetSize(); - int numInput = inputSize / sizeof(Content); - Content* input = reinterpret_cast(msg->GetData()); + // int numInput = inputSize / sizeof(Content); + // Content* input = reinterpret_cast(msg->GetData()); // for (int i = 0; i < numInput; ++i) { // LOG(INFO) << (&input[i])->x << " " << (&input[i])->y << " " << (&input[i])->z << " " << (&input[i])->a << " " << (&input[i])->b;