diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 8d2f3111..664ce599 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -284,19 +284,19 @@ class FairMQChannel }; template - inline FairMQMessagePtr NewMessage(Args&&... args) const + FairMQMessagePtr NewMessage(Args&&... args) const { return Transport()->CreateMessage(std::forward(args)...); } template - inline FairMQMessagePtr NewSimpleMessage(const T& data) const + FairMQMessagePtr NewSimpleMessage(const T& data) const { return Transport()->NewSimpleMessage(data); } template - inline FairMQMessagePtr NewStaticMessage(const T& data) const + FairMQMessagePtr NewStaticMessage(const T& data) const { return Transport()->NewStaticMessage(data); } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index faa9b508..dd593fef 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -93,12 +93,12 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable Deserializer().Deserialize(msg, std::forward(data), std::forward(args)...); } - inline int Send(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const + int Send(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).Send(msg); } - inline int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const + int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).Receive(msg); } @@ -109,7 +109,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// In case of errors, returns -1. - inline int Send(FairMQMessagePtr& msg, const std::string& chan, const int i, int sndTimeoutInMs) const + int Send(FairMQMessagePtr& msg, const std::string& chan, const int i, int sndTimeoutInMs) const { return fChannels.at(chan).at(i).Send(msg, sndTimeoutInMs); } @@ -120,7 +120,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// In case of errors, returns -1. - inline int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i, int rcvTimeoutInMs) const + int Receive(FairMQMessagePtr& msg, const std::string& chan, const int i, int rcvTimeoutInMs) const { return fChannels.at(chan).at(i).Receive(msg, rcvTimeoutInMs); } @@ -131,7 +131,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// In case of errors, returns -1. - inline int SendAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const + int SendAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).SendAsync(msg); } @@ -142,17 +142,17 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// In case of errors, returns -1. - inline int ReceiveAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const + int ReceiveAsync(FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).ReceiveAsync(msg); } - inline int64_t Send(FairMQParts& parts, const std::string& chan, const int i = 0) const + int64_t Send(FairMQParts& parts, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).Send(parts.fParts); } - inline int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0) const + int64_t Receive(FairMQParts& parts, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).Receive(parts.fParts); } @@ -163,7 +163,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// In case of errors, returns -1. - inline int64_t Send(FairMQParts& parts, const std::string& chan, const int i, int sndTimeoutInMs) const + int64_t Send(FairMQParts& parts, const std::string& chan, const int i, int sndTimeoutInMs) const { return fChannels.at(chan).at(i).Send(parts.fParts, sndTimeoutInMs); } @@ -174,7 +174,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// In case of errors, returns -1. - inline int64_t Receive(FairMQParts& parts, const std::string& chan, const int i, int rcvTimeoutInMs) const + int64_t Receive(FairMQParts& parts, const std::string& chan, const int i, int rcvTimeoutInMs) const { return fChannels.at(chan).at(i).Receive(parts.fParts, rcvTimeoutInMs); } @@ -185,7 +185,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// In case of errors, returns -1. - inline int64_t SendAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const + int64_t SendAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).SendAsync(parts.fParts); } @@ -196,7 +196,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// In case of errors, returns -1. - inline int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const + int64_t ReceiveAsync(FairMQParts& parts, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).ReceiveAsync(parts.fParts); } @@ -208,37 +208,37 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable } template - inline FairMQMessagePtr NewMessage(Args&&... args) const + FairMQMessagePtr NewMessage(Args&&... args) const { return Transport()->CreateMessage(std::forward(args)...); } template - inline FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const + FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) const { return fChannels.at(channel).at(index).Transport()->CreateMessage(std::forward(args)...); } template - inline FairMQMessagePtr NewStaticMessage(const T& data) const + FairMQMessagePtr NewStaticMessage(const T& data) const { return Transport()->NewStaticMessage(data); } template - inline FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) const + FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) const { return fChannels.at(channel).at(index).NewStaticMessage(data); } template - inline FairMQMessagePtr NewSimpleMessage(const T& data) const + FairMQMessagePtr NewSimpleMessage(const T& data) const { return Transport()->NewSimpleMessage(data); } template - inline FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) const + FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) const { return fChannels.at(channel).at(index).NewSimpleMessage(data); } @@ -263,7 +263,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable { FairMQ::Transport type = fChannels.at(chans.at(0)).at(0).Transport()->GetType(); - for (int i = 1; i < chans.size(); ++i) + for (unsigned int i = 1; i < chans.size(); ++i) { if (type != fChannels.at(chans.at(i)).at(0).Transport()->GetType()) { @@ -303,7 +303,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// Works only when running in a terminal. Running in background would exit, because no interactive input (std::cin) is possible. void InteractiveStateLoop(); /// Prints the available commands of the InteractiveStateLoop() - inline void PrintInteractiveStateLoopHelp() + void PrintInteractiveStateLoopHelp() { LOG(INFO) << "Use keys to control the state machine:"; LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device"; diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 42a3d5a5..5afe7968 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -79,7 +79,8 @@ struct FairMQFSM : public msmf::state_machine_def { public: FairMQFSM() - : fWorkerThread() + : fState() + , fChangeStateMutex() , fWork() , fWorkAvailableCondition() , fWorkDoneCondition() @@ -87,10 +88,9 @@ struct FairMQFSM : public msmf::state_machine_def , fWorkerTerminated(false) , fWorkActive(false) , fWorkAvailable(false) - , fState() - , fChangeStateMutex() , fStateChangeSignal() , fStateChangeSignalsMap() + , fWorkerThread() {} virtual ~FairMQFSM() diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 6570b964..4cee94ab 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -85,7 +85,7 @@ class FairMQTransportFactory } template - inline FairMQMessagePtr NewSimpleMessage(const T& data) const + FairMQMessagePtr NewSimpleMessage(const T& data) const { // todo: is_trivially_copyable not available on gcc < 5, workaround? // static_assert(std::is_trivially_copyable::value, "The argument type for NewSimpleMessage has to be trivially copyable!"); @@ -94,13 +94,13 @@ class FairMQTransportFactory } template - inline FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) const + FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) const { std::string* msgStr = new std::string(data); return CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); } - inline FairMQMessagePtr NewSimpleMessage(const std::string& str) const + FairMQMessagePtr NewSimpleMessage(const std::string& str) const { std::string* msgStr = new std::string(str); @@ -108,12 +108,12 @@ class FairMQTransportFactory } template - inline FairMQMessagePtr NewStaticMessage(const T& data) const + FairMQMessagePtr NewStaticMessage(const T& data) const { return CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr); } - inline FairMQMessagePtr NewStaticMessage(const std::string& str) const + FairMQMessagePtr NewStaticMessage(const std::string& str) const { return CreateMessage(const_cast(str.c_str()), str.length(), FairMQNoCleanup, nullptr); } diff --git a/fairmq/Plugin.h b/fairmq/Plugin.h index 966772ba..943cfe86 100644 --- a/fairmq/Plugin.h +++ b/fairmq/Plugin.h @@ -41,6 +41,10 @@ class Plugin Plugin() = delete; Plugin(const std::string name, const Version version, const std::string maintainer, const std::string homepage, PluginServices* pluginServices); + + Plugin(const Plugin&) = delete; + Plugin operator=(const Plugin&) = delete; + virtual ~Plugin(); auto GetName() const -> const std::string& { return fkName; } @@ -90,7 +94,6 @@ class Plugin const std::string fkMaintainer; const std::string fkHomepage; PluginServices* fPluginServices; - }; /* class Plugin */ } /* namespace mq */ diff --git a/fairmq/PluginManager.cxx b/fairmq/PluginManager.cxx index 128c6efe..bbc3dcd5 100644 --- a/fairmq/PluginManager.cxx +++ b/fairmq/PluginManager.cxx @@ -28,7 +28,12 @@ using boost::optional; const std::string fair::mq::PluginManager::fgkLibPrefix = "FairMQPlugin_"; fair::mq::PluginManager::PluginManager() -: fSearchPaths{{"."}} + : fSearchPaths{{"."}} + , fPluginFactories() + , fPlugins() + , fPluginOrder() + , fPluginProgOptions() + , fPluginServices() { } @@ -61,16 +66,17 @@ auto fair::mq::PluginManager::ProgramOptions() -> po::options_description { auto plugin_options = po::options_description{"Plugin Manager"}; plugin_options.add_options() - ("plugin-search-path,S", po::value>()->multitoken(), - "List of plugin search paths.\n\n" - "* Override default search path, e.g.\n" - " -S /home/user/lib /lib\n" - "* Append(>) or prepend(<) to default search path, e.g.\n" - " -S >/lib /lib >(), - "List of plugin names to load in order, e.g. if the file is called 'libFairMQPlugin_example.so', just list 'example' or 'd:example' here. To load a prelinked plugin, list 'p:example' here."); + ("plugin-search-path,S", po::value>()->multitoken(), "List of plugin search paths.\n\n" + "* Override default search path, e.g.\n" + " -S /home/user/lib /lib\n" + "* Append(>) or prepend(<) to default search path, e.g.\n" + " -S >/lib /lib >(), "List of plugin names to load in order," + "e.g. if the file is called 'libFairMQPlugin_example.so', just list 'example' or 'd:example' here." + "To load a prelinked plugin, list 'p:example' here."); + return plugin_options; } diff --git a/fairmq/PluginServices.h b/fairmq/PluginServices.h index 1a23c622..556cbd2c 100644 --- a/fairmq/PluginServices.h +++ b/fairmq/PluginServices.h @@ -37,11 +37,14 @@ class PluginServices public: PluginServices() = delete; PluginServices(FairMQProgOptions* config, std::shared_ptr device) - : fDevice{device} - , fConfig{config} + : fConfig{config} + , fDevice{device} { } + PluginServices(const PluginServices&) = delete; + PluginServices operator=(const PluginServices&) = delete; + /// See https://github.com/FairRootGroup/FairRoot/blob/dev/fairmq/docs/Device.md#13-state-machine enum class DeviceState : int { diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index a8066bb0..7753b672 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -84,7 +84,7 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* } } -FairMQMessageNN::FairMQMessageNN(FairMQRegionPtr& region, void* data, const size_t size) +FairMQMessageNN::FairMQMessageNN(FairMQRegionPtr& /*region*/, void* data, const size_t size) : fMessage(data) , fSize(size) , fReceiving(false) @@ -187,7 +187,7 @@ void FairMQMessageNN::Copy(const unique_ptr& msg) } } -inline void FairMQMessageNN::Clear() +void FairMQMessageNN::Clear() { if (nn_freemsg(fMessage) < 0) { diff --git a/fairmq/options/FairMQSuboptParser.cxx b/fairmq/options/FairMQSuboptParser.cxx index 4f6f0521..5a101af8 100644 --- a/fairmq/options/FairMQSuboptParser.cxx +++ b/fairmq/options/FairMQSuboptParser.cxx @@ -24,47 +24,56 @@ constexpr const char* SUBOPT::channelOptionKeys[]; FairMQMap SUBOPT::UserParser(const po::variables_map& omap, const std::string& deviceId, const std::string& rootNode) { - std::string nodeKey = rootNode + ".device"; - ptree pt; + std::string nodeKey = rootNode + ".device"; + ptree pt; - pt.put(nodeKey + ".id", deviceId.c_str()); - nodeKey += ".channels"; + pt.put(nodeKey + ".id", deviceId.c_str()); + nodeKey += ".channels"; - // parsing of channel properties is the only implemented method right now - if (omap.count(OptionKeyChannelConfig) > 0) { - std::map channelProperties; - auto tokens = omap[OptionKeyChannelConfig].as>(); - for (auto token : tokens) { - std::map::iterator channelProperty = channelProperties.end(); - ptree socketProperty; - std::string channelName; - std::string argString(token); - char* subopts = &argString[0]; - char* value = nullptr; - while (subopts && *subopts != 0 && *subopts != ' ') { - char* saved = subopts; - int subopt=getsubopt(&subopts, (char**)channelOptionKeys, &value); - if (subopt == NAME) { - channelName = value; - channelProperties[channelName].put("name", channelName); - } else - if (subopt>=0 && value != nullptr) { - socketProperty.put(channelOptionKeys[subopt], value); + // parsing of channel properties is the only implemented method right now + if (omap.count(OptionKeyChannelConfig) > 0) + { + std::map channelProperties; + auto tokens = omap[OptionKeyChannelConfig].as>(); + for (auto token : tokens) + { + // std::map::iterator channelProperty = channelProperties.end(); + ptree socketProperty; + std::string channelName; + std::string argString(token); + char* subopts = &argString[0]; + char* value = nullptr; + while (subopts && *subopts != 0 && *subopts != ' ') + { + // char* saved = subopts; + int subopt=getsubopt(&subopts, (char**)channelOptionKeys, &value); + if (subopt == NAME) + { + channelName = value; + channelProperties[channelName].put("name", channelName); + } + else if (subopt>=0 && value != nullptr) + { + socketProperty.put(channelOptionKeys[subopt], value); + } + } + if (channelName != "") + { + channelProperties[channelName].add_child("sockets.socket", socketProperty); + } + else + { + // TODO: what is the error policy here, should we abort? + LOG(ERROR) << "missing channel name in argument of option --channel-config"; + } + } + for (auto channelProperty : channelProperties) + { + pt.add_child(nodeKey + ".channel", channelProperty.second); } - } - if (channelName != "") { - channelProperties[channelName].add_child("sockets.socket", socketProperty); - } else { - // TODO: what is the error policy here, should we abort? - LOG(ERROR) << "missing channel name in argument of option --channel-config"; - } } - for (auto channelProperty : channelProperties) { - pt.add_child(nodeKey + ".channel", channelProperty.second); - } - } - return ptreeToMQMap(pt, deviceId, rootNode); + return ptreeToMQMap(pt, deviceId, rootNode); } -} // namespace FairMQMap +} // namespace FairMQParser diff --git a/fairmq/shmem/FairMQShmMonitor.cxx b/fairmq/shmem/FairMQShmMonitor.cxx index 9ae12045..7976621a 100644 --- a/fairmq/shmem/FairMQShmMonitor.cxx +++ b/fairmq/shmem/FairMQShmMonitor.cxx @@ -345,7 +345,7 @@ void Monitor::Cleanup(const string& segmentName) { cout << "Region counter found: " << rc->fCount << endl; unsigned int regionCount = rc->fCount; - for (int i = 1; i <= regionCount; ++i) + for (unsigned int i = 1; i <= regionCount; ++i) { RemoveObject("fairmq_shmem_region_" + to_string(regionCount)); } @@ -403,7 +403,7 @@ void Monitor::PrintQueues() { cout << "found " << queues->size() << " queue(s):" << endl; - for (int i = 0; i < queues->size(); ++i) + for (unsigned int i = 0; i < queues->size(); ++i) { string name(queues->at(i).c_str()); cout << '\t' << name << " : "; diff --git a/fairmq/test/CMakeLists.txt b/fairmq/test/CMakeLists.txt index 5a112813..6e00cff0 100644 --- a/fairmq/test/CMakeLists.txt +++ b/fairmq/test/CMakeLists.txt @@ -149,5 +149,3 @@ add_custom_target(FairMQTests DEPENDS ${ALL_TEST_TARGETS} ) - - diff --git a/fairmq/test/helper/plugins/dummy.h.in b/fairmq/test/helper/plugins/dummy.h.in index 2fb89415..6cb1cd2e 100644 --- a/fairmq/test/helper/plugins/dummy.h.in +++ b/fairmq/test/helper/plugins/dummy.h.in @@ -41,13 +41,15 @@ class DummyPlugin : public fair::mq::Plugin case DeviceState::Exiting: UnsubscribeFromDeviceStateChange(); break; + default: + break; } } ); } }; /* class DummyPlugin */ -auto DummyPluginProgramOptions() -> Plugin::ProgOptions +auto DummyPluginProgramOptions() -> Plugin::ProgOptions { auto plugin_options = boost::program_options::options_description{"Dummy Plugin"}; plugin_options.add_options() diff --git a/fairmq/test/plugins/_plugin.cxx b/fairmq/test/plugins/_plugin.cxx index 4a9ab9ad..8c1caacc 100644 --- a/fairmq/test/plugins/_plugin.cxx +++ b/fairmq/test/plugins/_plugin.cxx @@ -41,9 +41,9 @@ TEST(Plugin, Operators) FairMQProgOptions config{}; auto device = make_shared(); PluginServices services{&config, device}; - auto p1 = Plugin{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; - auto p2 = Plugin{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; - auto p3 = Plugin{"file", {1, 0, 0}, "Foo Bar ", "https://git.test.net/file.git", &services}; + Plugin p1{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; + Plugin p2{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; + Plugin p3{"file", {1, 0, 0}, "Foo Bar ", "https://git.test.net/file.git", &services}; EXPECT_EQ(p1, p2); EXPECT_NE(p1, p3); control(device); @@ -54,7 +54,7 @@ TEST(Plugin, OstreamOperators) FairMQProgOptions config{}; auto device = make_shared(); PluginServices services{&config, device}; - auto p1 = Plugin{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; + Plugin p1{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; stringstream ss; ss << p1; EXPECT_EQ(ss.str(), string{"'dds', version '1.0.0', maintainer 'Foo Bar ', homepage 'https://git.test.net/dds.git'"}); diff --git a/fairmq/test/plugins/_plugin_manager.cxx b/fairmq/test/plugins/_plugin_manager.cxx index 1d8d9c75..da5d4114 100644 --- a/fairmq/test/plugins/_plugin_manager.cxx +++ b/fairmq/test/plugins/_plugin_manager.cxx @@ -59,7 +59,7 @@ TEST(PluginManager, LoadPluginDynamic) // program options auto count = 0; - mgr.ForEachPluginProgOptions([&count](const options_description& d){ ++count; }); + mgr.ForEachPluginProgOptions([&count](const options_description& /*d*/){ ++count; }); ASSERT_EQ(count, 1); control(device); diff --git a/fairmq/tools/runSimpleMQStateMachine.h b/fairmq/tools/runSimpleMQStateMachine.h index 296f6cd2..4b657e42 100644 --- a/fairmq/tools/runSimpleMQStateMachine.h +++ b/fairmq/tools/runSimpleMQStateMachine.h @@ -22,7 +22,7 @@ // template function that takes any device // and runs a simple MQ state machine configured from a JSON file and/or a plugin. template -inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg) +int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg) { std::string config = cfg.GetValue("config"); std::string control = cfg.GetValue("control"); diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 3ac453fb..d72c1bc8 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -50,7 +50,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn } } -FairMQMessageZMQ::FairMQMessageZMQ(FairMQRegionPtr& region, void* data, const size_t size) +FairMQMessageZMQ::FairMQMessageZMQ(FairMQRegionPtr& /*region*/, void* data, const size_t size) : fMessage() { // FIXME: make this zero-copy: