diff --git a/examples/1-1/sampler.cxx b/examples/1-1/sampler.cxx index 797d7850..37c3d46d 100644 --- a/examples/1-1/sampler.cxx +++ b/examples/1-1/sampler.cxx @@ -30,7 +30,7 @@ struct Sampler : fair::mq::Device // create message object with a pointer to the data buffer, its size, // custom deletion function (called when transfer is done), // and pointer to the object managing the data buffer - FairMQMessagePtr msg(NewMessage( + fair::mq::MessagePtr msg(NewMessage( const_cast(text->c_str()), text->length(), [](void* /*data*/, void* object) { delete static_cast(object); }, diff --git a/examples/1-1/sink.cxx b/examples/1-1/sink.cxx index c044a8a4..e27db81f 100644 --- a/examples/1-1/sink.cxx +++ b/examples/1-1/sink.cxx @@ -27,7 +27,7 @@ struct Sink : fair::mq::Device fMaxIterations = fConfig->GetProperty("max-iterations"); } - bool HandleData(FairMQMessagePtr& msg, int) + bool HandleData(fair::mq::MessagePtr& msg, int) { LOG(info) << "Received: \"" << std::string(static_cast(msg->GetData()), msg->GetSize()) << "\""; diff --git a/examples/1-n-1/processor.cxx b/examples/1-n-1/processor.cxx index 0703fd7d..eb5efa48 100644 --- a/examples/1-n-1/processor.cxx +++ b/examples/1-n-1/processor.cxx @@ -20,7 +20,7 @@ struct Processor : fair::mq::Device OnData("data1", &Processor::HandleData); } - bool HandleData(FairMQMessagePtr& msg, int) + bool HandleData(fair::mq::MessagePtr& msg, int) { LOG(info) << "Received data, processing..."; @@ -32,7 +32,7 @@ struct Processor : fair::mq::Device // its size, // custom deletion function (called when transfer is done), // and pointer to the object managing the data buffer - FairMQMessagePtr msg2(NewMessage(const_cast(text->c_str()), + fair::mq::MessagePtr msg2(NewMessage(const_cast(text->c_str()), text->length(), [](void* /*data*/, void* object) { delete static_cast(object); }, text)); diff --git a/examples/1-n-1/sampler.cxx b/examples/1-n-1/sampler.cxx index 4235047d..a90ec558 100644 --- a/examples/1-n-1/sampler.cxx +++ b/examples/1-n-1/sampler.cxx @@ -28,7 +28,7 @@ struct Sampler : fair::mq::Device { // Initializing message with NewStaticMessage will avoid copy // but won't delete the data after the sending is completed. - FairMQMessagePtr msg(NewStaticMessage(fText)); + fair::mq::MessagePtr msg(NewStaticMessage(fText)); LOG(info) << "Sending \"" << fText << "\""; diff --git a/examples/1-n-1/sink.cxx b/examples/1-n-1/sink.cxx index 2d2cb4cd..dcd37d9d 100644 --- a/examples/1-n-1/sink.cxx +++ b/examples/1-n-1/sink.cxx @@ -27,7 +27,7 @@ struct Sink : fair::mq::Device fMaxIterations = fConfig->GetProperty("max-iterations"); } - bool HandleData(FairMQMessagePtr& msg, int) + bool HandleData(fair::mq::MessagePtr& msg, int) { LOG(info) << "Received: \"" << std::string(static_cast(msg->GetData()), msg->GetSize()) << "\""; diff --git a/examples/copypush/sampler.cxx b/examples/copypush/sampler.cxx index 2e38a692..abb194e3 100644 --- a/examples/copypush/sampler.cxx +++ b/examples/copypush/sampler.cxx @@ -27,10 +27,10 @@ struct Sampler : fair::mq::Device { // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). // Should only be used for small data because of the cost of an additional copy - FairMQMessagePtr msg(NewSimpleMessage(fCounter++)); + fair::mq::MessagePtr msg(NewSimpleMessage(fCounter++)); for (int i = 0; i < fNumDataChannels - 1; ++i) { - FairMQMessagePtr msgCopy(NewMessage()); + fair::mq::MessagePtr msgCopy(NewMessage()); msgCopy->Copy(*msg); Send(msgCopy, "data", i); } diff --git a/examples/copypush/sink.cxx b/examples/copypush/sink.cxx index 548c6375..a7100c2e 100644 --- a/examples/copypush/sink.cxx +++ b/examples/copypush/sink.cxx @@ -27,7 +27,7 @@ struct Sink : fair::mq::Device fMaxIterations = fConfig->GetProperty("max-iterations"); } - bool HandleData(FairMQMessagePtr& msg, int) + bool HandleData(fair::mq::MessagePtr& msg, int) { LOG(info) << "Received message: \"" << *(static_cast(msg->GetData())) << "\""; diff --git a/examples/dds/processor.cxx b/examples/dds/processor.cxx index 0703fd7d..eb5efa48 100644 --- a/examples/dds/processor.cxx +++ b/examples/dds/processor.cxx @@ -20,7 +20,7 @@ struct Processor : fair::mq::Device OnData("data1", &Processor::HandleData); } - bool HandleData(FairMQMessagePtr& msg, int) + bool HandleData(fair::mq::MessagePtr& msg, int) { LOG(info) << "Received data, processing..."; @@ -32,7 +32,7 @@ struct Processor : fair::mq::Device // its size, // custom deletion function (called when transfer is done), // and pointer to the object managing the data buffer - FairMQMessagePtr msg2(NewMessage(const_cast(text->c_str()), + fair::mq::MessagePtr msg2(NewMessage(const_cast(text->c_str()), text->length(), [](void* /*data*/, void* object) { delete static_cast(object); }, text)); diff --git a/examples/dds/sampler.cxx b/examples/dds/sampler.cxx index f468ff6a..37e9b54a 100644 --- a/examples/dds/sampler.cxx +++ b/examples/dds/sampler.cxx @@ -24,7 +24,7 @@ struct Sampler : fair::mq::Device { // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). // Should only be used for small data because of the cost of an additional copy - FairMQMessagePtr msg(NewSimpleMessage("Data")); + fair::mq::MessagePtr msg(NewSimpleMessage("Data")); LOG(info) << "Sending \"Data\""; diff --git a/examples/dds/sink.cxx b/examples/dds/sink.cxx index 29f938fa..60b9d493 100644 --- a/examples/dds/sink.cxx +++ b/examples/dds/sink.cxx @@ -25,7 +25,7 @@ struct Sink : fair::mq::Device fIterations = fConfig->GetValue("iterations"); } - bool HandleData(FairMQMessagePtr& msg, int) + bool HandleData(fair::mq::MessagePtr& msg, int) { LOG(info) << "Received: \"" << std::string(static_cast(msg->GetData()), msg->GetSize()) << "\""; diff --git a/examples/multipart/README.md b/examples/multipart/README.md index fb458836..42a9099a 100644 --- a/examples/multipart/README.md +++ b/examples/multipart/README.md @@ -5,7 +5,7 @@ A topology of two devices - Sampler and Sink, communicating with PUSH-PULL patte The Sampler sends a multipart message to the Sink, consisting of two message parts - header and body. -Each message part is a regular FairMQMessage. To combine them into a multi-part message use `FairMQParts`. Add messages to `FairMQParts` with `AddPart` method. +Each message part is a regular fair::mq::Message. To combine them into a multi-part message use `fair::mq::Parts`. Add messages to `fair::mq::Parts` with `AddPart` method. All parts are guaranteed to be delivered together. The Receive call in the sink will recive the entire parts structure. diff --git a/examples/multipart/sampler.cxx b/examples/multipart/sampler.cxx index 307b48ea..14588b8d 100644 --- a/examples/multipart/sampler.cxx +++ b/examples/multipart/sampler.cxx @@ -35,15 +35,15 @@ struct Sampler : fair::mq::Device } LOG(info) << "Sending header with stopFlag: " << header.stopFlag; - FairMQParts parts; + fair::mq::Parts parts; // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). // Should only be used for small data because of the cost of an additional copy parts.AddPart(NewSimpleMessage(header)); parts.AddPart(NewMessage(1000)); - // create more data parts, testing the FairMQParts in-place constructor - FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) }; + // create more data parts, testing the fair::mq::Parts in-place constructor + fair::mq::Parts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) }; assert(auxData.Size() == 3); parts.AddPart(std::move(auxData)); assert(auxData.Size() == 0); diff --git a/examples/multipart/sink.cxx b/examples/multipart/sink.cxx index 0fd2a1bd..6c32c503 100644 --- a/examples/multipart/sink.cxx +++ b/examples/multipart/sink.cxx @@ -20,7 +20,7 @@ struct Sink : fair::mq::Device OnData("data", &Sink::HandleData); } - bool HandleData(FairMQParts& parts, int) + bool HandleData(fair::mq::Parts& parts, int) { LOG(info) << "Received message with " << parts.Size() << " parts"; diff --git a/examples/multiple-channels/README.md b/examples/multiple-channels/README.md index f9c5d162..222104f7 100644 --- a/examples/multiple-channels/README.md +++ b/examples/multiple-channels/README.md @@ -5,4 +5,4 @@ This example demonstrates how to work with multiple channels and multiplex betwe A topology of three devices - **Sampler**, **Sink** and **Broadcaster**. The Sampler sends data to the Sink via the PUSH-PULL pattern. The Broadcaster device sends a message to both Sampler and Sink containing a string "OK" every second. The Broadcaster sends the message via PUB pattern. Both Sampler and Sink, besides doing their PUSH-PULL job, listen via SUB to the Broadcaster. -The multiplexing between their data channels and the broadcast channels happens with `FairMQPoller`. +The multiplexing between their data channels and the broadcast channels happens with `fair::mq::Poller`. diff --git a/examples/multiple-channels/broadcaster.cxx b/examples/multiple-channels/broadcaster.cxx index 70ebae33..c59c5af7 100644 --- a/examples/multiple-channels/broadcaster.cxx +++ b/examples/multiple-channels/broadcaster.cxx @@ -22,7 +22,7 @@ struct Broadcaster : fair::mq::Device // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). // Should only be used for small data because of the cost of an additional copy - FairMQMessagePtr msg(NewSimpleMessage("OK")); + fair::mq::MessagePtr msg(NewSimpleMessage("OK")); LOG(info) << "Sending OK"; diff --git a/examples/multiple-channels/sampler.cxx b/examples/multiple-channels/sampler.cxx index 07120e03..cfdefabb 100644 --- a/examples/multiple-channels/sampler.cxx +++ b/examples/multiple-channels/sampler.cxx @@ -7,7 +7,7 @@ ********************************************************************************/ #include -#include +#include #include #include @@ -26,13 +26,13 @@ struct Sampler : fair::mq::Device void Run() override { - FairMQPollerPtr poller(NewPoller("data", "broadcast")); + fair::mq::PollerPtr poller(NewPoller("data", "broadcast")); while (!NewStatePending()) { poller->Poll(100); if (poller->CheckInput("broadcast", 0)) { - FairMQMessagePtr msg(NewMessage()); + fair::mq::MessagePtr msg(NewMessage()); if (Receive(msg, "broadcast") > 0) { LOG(info) << "Received broadcast: \"" << std::string(static_cast(msg->GetData()), msg->GetSize()) << "\""; @@ -42,7 +42,7 @@ struct Sampler : fair::mq::Device if (poller->CheckOutput("data", 0)) { std::this_thread::sleep_for(std::chrono::seconds(1)); - FairMQMessagePtr msg(NewSimpleMessage(fText)); + fair::mq::MessagePtr msg(NewSimpleMessage(fText)); if (Send(msg, "data") > 0) { LOG(info) << "Sent \"" << fText << "\""; diff --git a/examples/multiple-channels/sink.cxx b/examples/multiple-channels/sink.cxx index 60724e49..d5962e50 100644 --- a/examples/multiple-channels/sink.cxx +++ b/examples/multiple-channels/sink.cxx @@ -27,7 +27,7 @@ struct Sink : fair::mq::Device fMaxIterations = fConfig->GetProperty("max-iterations"); } - bool HandleBroadcast(FairMQMessagePtr& msg, int /*index*/) + bool HandleBroadcast(fair::mq::MessagePtr& msg, int /*index*/) { LOG(info) << "Received broadcast: \"" << std::string(static_cast(msg->GetData()), msg->GetSize()) << "\""; fReceivedBroadcast = true; @@ -35,7 +35,7 @@ struct Sink : fair::mq::Device return CheckIterations(); } - bool HandleData(FairMQMessagePtr& msg, int /*index*/) + bool HandleData(fair::mq::MessagePtr& msg, int /*index*/) { LOG(info) << "Received message: \"" << std::string(static_cast(msg->GetData()), msg->GetSize()) << "\""; fReceivedData = true; diff --git a/examples/multiple-transports/sampler1.cxx b/examples/multiple-transports/sampler1.cxx index dc61eddd..e26e283f 100644 --- a/examples/multiple-transports/sampler1.cxx +++ b/examples/multiple-transports/sampler1.cxx @@ -28,7 +28,7 @@ struct Sampler1 : fair::mq::Device bool ConditionalRun() override { // Creates a message using the transport of channel data1 - FairMQMessagePtr msg(NewMessageFor("data1", 0, 1000000)); + fair::mq::MessagePtr msg(NewMessageFor("data1", 0, 1000000)); // in case of error or transfer interruption, return false to go to IDLE state // successfull transfer will return number of bytes transfered (can be 0 if sending an empty message). @@ -54,7 +54,7 @@ struct Sampler1 : fair::mq::Device uint64_t numAcks = 0; while (!NewStatePending()) { - FairMQMessagePtr ack(NewMessageFor("ack", 0)); + fair::mq::MessagePtr ack(NewMessageFor("ack", 0)); if (Receive(ack, "ack") < 0) { break; } diff --git a/examples/multiple-transports/sampler2.cxx b/examples/multiple-transports/sampler2.cxx index d8c3c638..2b81f572 100644 --- a/examples/multiple-transports/sampler2.cxx +++ b/examples/multiple-transports/sampler2.cxx @@ -22,7 +22,7 @@ struct Sampler2 : fair::mq::Device bool ConditionalRun() override { - FairMQMessagePtr msg(NewMessage(1000)); + fair::mq::MessagePtr msg(NewMessage(1000)); // in case of error or transfer interruption, return false to go to IDLE state // successfull transfer will return number of bytes transfered (can be 0 if sending an empty message). diff --git a/examples/multiple-transports/sink.cxx b/examples/multiple-transports/sink.cxx index de6f7519..4ff49da9 100644 --- a/examples/multiple-transports/sink.cxx +++ b/examples/multiple-transports/sink.cxx @@ -26,11 +26,11 @@ struct Sink : fair::mq::Device } // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) - bool HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/) + bool HandleData1(fair::mq::MessagePtr& /*msg*/, int /*index*/) { fNumIterations1++; // Creates a message using the transport of channel ack - FairMQMessagePtr ack(NewMessageFor("ack", 0)); + fair::mq::MessagePtr ack(NewMessageFor("ack", 0)); if (Send(ack, "ack") < 0) { return false; } @@ -40,7 +40,7 @@ struct Sink : fair::mq::Device } // handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) - bool HandleData2(FairMQMessagePtr& /*msg*/, int /*index*/) + bool HandleData2(fair::mq::MessagePtr& /*msg*/, int /*index*/) { fNumIterations2++; // return true if want to be called again (otherwise go to IDLE state) diff --git a/examples/n-m/receiver.cxx b/examples/n-m/receiver.cxx index a52956bd..91913450 100644 --- a/examples/n-m/receiver.cxx +++ b/examples/n-m/receiver.cxx @@ -24,7 +24,7 @@ namespace bpo = boost::program_options; struct TFBuffer { - FairMQParts parts; + fair::mq::Parts parts; chrono::steady_clock::time_point start; chrono::steady_clock::time_point end; }; @@ -43,7 +43,7 @@ struct Receiver : fair::mq::Device fMaxTimeframes = GetConfig()->GetValue("max-timeframes"); } - bool HandleData(FairMQParts& parts, int /* index */) + bool HandleData(fair::mq::Parts& parts, int /* index */) { Header& h = *(static_cast(parts.At(0)->GetData())); // LOG(info) << "Received sub-time frame #" << h.id << " from Sender" << h.senderIndex; @@ -107,7 +107,7 @@ void addCustomOptions(bpo::options_description& options) ("max-timeframes", bpo::value()->default_value(0), "Maximum number of timeframes to receive (0 - unlimited)"); } -std::unique_ptr getDevice(FairMQProgOptions& /* config */) +std::unique_ptr getDevice(fair::mq::ProgOptions& /* config */) { return std::make_unique(); } diff --git a/examples/n-m/sender.cxx b/examples/n-m/sender.cxx index 9c1dd0e5..a92f7945 100644 --- a/examples/n-m/sender.cxx +++ b/examples/n-m/sender.cxx @@ -28,11 +28,11 @@ struct Sender : fair::mq::Device void Run() override { - FairMQChannel& dataInChannel = GetChannel("sync", 0); + fair::mq::Channel& dataInChannel = GetChannel("sync", 0); while (!NewStatePending()) { Header h; - FairMQMessagePtr id(NewMessage()); + fair::mq::MessagePtr id(NewMessage()); if (dataInChannel.Receive(id) > 0) { h.id = *(static_cast(id->GetData())); h.senderIndex = fIndex; @@ -40,7 +40,7 @@ struct Sender : fair::mq::Device continue; } - FairMQParts parts; + fair::mq::Parts parts; parts.AddPart(NewSimpleMessage(h)); parts.AddPart(NewMessage(fSubtimeframeSize)); @@ -66,7 +66,7 @@ void addCustomOptions(bpo::options_description& options) ("subtimeframe-size", bpo::value()->default_value(1000), "Subtimeframe size in bytes") ("num-receivers", bpo::value()->required(), "Number of EPNs"); } -std::unique_ptr getDevice(FairMQProgOptions& /* config */) +std::unique_ptr getDevice(fair::mq::ProgOptions& /* config */) { return std::make_unique(); } diff --git a/examples/n-m/synchronizer.cxx b/examples/n-m/synchronizer.cxx index c9ea52ba..8f0ccc51 100644 --- a/examples/n-m/synchronizer.cxx +++ b/examples/n-m/synchronizer.cxx @@ -19,7 +19,7 @@ struct Synchronizer : fair::mq::Device { bool ConditionalRun() override { - FairMQMessagePtr msg(NewSimpleMessage(fTimeframeId)); + fair::mq::MessagePtr msg(NewSimpleMessage(fTimeframeId)); if (Send(msg, "sync") > 0) { if (++fTimeframeId == UINT16_MAX - 1) { @@ -37,7 +37,7 @@ struct Synchronizer : fair::mq::Device }; void addCustomOptions(bpo::options_description& /* options */) {} -std::unique_ptr getDevice(FairMQProgOptions& /* config */) +std::unique_ptr getDevice(fair::mq::ProgOptions& /* config */) { return std::make_unique(); } diff --git a/examples/qc/qCDispatcher.cxx b/examples/qc/qCDispatcher.cxx index d2de4bed..8d229693 100644 --- a/examples/qc/qCDispatcher.cxx +++ b/examples/qc/qCDispatcher.cxx @@ -30,10 +30,10 @@ struct QCDispatcher : fair::mq::Device }); } - bool HandleData(FairMQMessagePtr& msg, int) + bool HandleData(fair::mq::MessagePtr& msg, int) { if (fDoQC.load() == true) { - FairMQMessagePtr msgCopy(NewMessage()); + fair::mq::MessagePtr msgCopy(NewMessage()); msgCopy->Copy(*msg); if (Send(msg, "qc") < 0) { return false; diff --git a/examples/qc/qCTask.cxx b/examples/qc/qCTask.cxx index 54bfed57..63e34beb 100644 --- a/examples/qc/qCTask.cxx +++ b/examples/qc/qCTask.cxx @@ -9,12 +9,12 @@ #include #include -class QCTask : public FairMQDevice +class QCTask : public fair::mq::Device { public: QCTask() { - OnData("qc", [](FairMQMessagePtr& /*msg*/, int) { + OnData("qc", [](fair::mq::MessagePtr& /*msg*/, int) { LOG(info) << "received data"; return false; }); diff --git a/examples/qc/sampler.cxx b/examples/qc/sampler.cxx index bbf72588..24956b22 100644 --- a/examples/qc/sampler.cxx +++ b/examples/qc/sampler.cxx @@ -16,7 +16,7 @@ struct Sampler : fair::mq::Device { bool ConditionalRun() override { - FairMQMessagePtr msg(NewMessage(1000)); + fair::mq::MessagePtr msg(NewMessage(1000)); if (Send(msg, "data1") < 0) { return false; diff --git a/examples/qc/sink.cxx b/examples/qc/sink.cxx index 4b342dc5..789ae756 100644 --- a/examples/qc/sink.cxx +++ b/examples/qc/sink.cxx @@ -14,7 +14,7 @@ struct Sink : fair::mq::Device { Sink() { OnData("data2", &Sink::HandleData); } - bool HandleData(FairMQMessagePtr& /*msg*/, int /*index*/) { return true; } + bool HandleData(fair::mq::MessagePtr& /*msg*/, int /*index*/) { return true; } }; namespace bpo = boost::program_options; diff --git a/examples/readout/builder.cxx b/examples/readout/builder.cxx index fe83dc01..2c679f78 100644 --- a/examples/readout/builder.cxx +++ b/examples/readout/builder.cxx @@ -13,7 +13,7 @@ namespace bpo = boost::program_options; -class Builder : public FairMQDevice +class Builder : public fair::mq::Device { public: Builder() = default; @@ -24,7 +24,7 @@ class Builder : public FairMQDevice OnData("rb", &Builder::HandleData); } - bool HandleData(FairMQMessagePtr& msg, int /*index*/) + bool HandleData(fair::mq::MessagePtr& msg, int /*index*/) { if (Send(msg, fOutputChannelName) < 0) { return false; diff --git a/examples/readout/processor.cxx b/examples/readout/processor.cxx index 345418f3..10db3675 100644 --- a/examples/readout/processor.cxx +++ b/examples/readout/processor.cxx @@ -18,9 +18,9 @@ struct Processor : fair::mq::Device OnData("bp", &Processor::HandleData); } - bool HandleData(FairMQMessagePtr& msg, int /*index*/) + bool HandleData(fair::mq::MessagePtr& msg, int /*index*/) { - FairMQMessagePtr msg2(NewMessageFor("ps", 0, msg->GetSize())); + fair::mq::MessagePtr msg2(NewMessageFor("ps", 0, msg->GetSize())); if (Send(msg2, "ps") < 0) { return false; } diff --git a/examples/readout/readout.cxx b/examples/readout/readout.cxx index fb2bfc86..7ec98a6a 100644 --- a/examples/readout/readout.cxx +++ b/examples/readout/readout.cxx @@ -22,7 +22,7 @@ struct Readout : fair::mq::Device fMsgSize = fConfig->GetProperty("msg-size"); fMaxIterations = fConfig->GetProperty("max-iterations"); - fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("rb", + fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("rb", 0, 10000000, [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport @@ -36,7 +36,7 @@ struct Readout : fair::mq::Device bool ConditionalRun() override { - FairMQMessagePtr msg(NewMessageFor("rb", // channel + fair::mq::MessagePtr msg(NewMessageFor("rb", // channel 0, // sub-channel fRegion, // region fRegion->GetData(), // ptr within region @@ -71,7 +71,7 @@ struct Readout : fair::mq::Device int fMsgSize = 10000; uint64_t fMaxIterations = 0; uint64_t fNumIterations = 0; - FairMQUnmanagedRegionPtr fRegion = nullptr; + fair::mq::UnmanagedRegionPtr fRegion = nullptr; std::atomic fNumUnackedMsgs = 0; }; diff --git a/examples/readout/sender.cxx b/examples/readout/sender.cxx index 3faa7823..34148cb3 100644 --- a/examples/readout/sender.cxx +++ b/examples/readout/sender.cxx @@ -21,7 +21,7 @@ struct Sender : fair::mq::Device OnData(fInputChannelName, &Sender::HandleData); } - bool HandleData(FairMQMessagePtr& msg, int /*index*/) + bool HandleData(fair::mq::MessagePtr& msg, int /*index*/) { if (Send(msg, "sr") < 0) { return false; diff --git a/examples/region/sampler.cxx b/examples/region/sampler.cxx index 088c1092..ca745648 100644 --- a/examples/region/sampler.cxx +++ b/examples/region/sampler.cxx @@ -23,7 +23,7 @@ struct Sampler : fair::mq::Device fLinger = fConfig->GetProperty("region-linger"); fMaxIterations = fConfig->GetProperty("max-iterations"); - GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) { + GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) { LOG(info) << "Region event: " << info.event << ": " << (info.managed ? "managed" : "unmanaged") << ", id: " << info.id @@ -36,7 +36,7 @@ struct Sampler : fair::mq::Device regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events regionCfg.lock = true; // mlock region after creation regionCfg.zero = true; // zero region content after creation - fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel... + fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel... 0, // ... and this sub-channel 10000000, // region size [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport @@ -50,7 +50,7 @@ struct Sampler : fair::mq::Device bool ConditionalRun() override { - FairMQMessagePtr msg(NewMessageFor("data", // channel + fair::mq::MessagePtr msg(NewMessageFor("data", // channel 0, // sub-channel fRegion, // region fRegion->GetData(), // ptr within region @@ -93,7 +93,7 @@ struct Sampler : fair::mq::Device uint32_t fLinger = 100; uint64_t fMaxIterations = 0; uint64_t fNumIterations = 0; - FairMQUnmanagedRegionPtr fRegion = nullptr; + fair::mq::UnmanagedRegionPtr fRegion = nullptr; std::mutex fMtx; uint64_t fNumUnackedMsgs = 0; }; diff --git a/examples/req-rep/client.cxx b/examples/req-rep/client.cxx index 4eee300f..2074f27b 100644 --- a/examples/req-rep/client.cxx +++ b/examples/req-rep/client.cxx @@ -32,11 +32,11 @@ struct Client : fair::mq::Device // its size, // custom deletion function (called when transfer is done), // and pointer to the object managing the data buffer - FairMQMessagePtr req(NewMessage(const_cast(text->c_str()), // data + fair::mq::MessagePtr req(NewMessage(const_cast(text->c_str()), // data text->length(), // size [](void* /*data*/, void* object) { delete static_cast(object); }, // deletion callback text)); // object that manages the data - FairMQMessagePtr rep(NewMessage()); + fair::mq::MessagePtr rep(NewMessage()); LOG(info) << "Sending \"" << fText << "\" to server."; diff --git a/examples/req-rep/server.cxx b/examples/req-rep/server.cxx index f05d98b6..a2d73918 100644 --- a/examples/req-rep/server.cxx +++ b/examples/req-rep/server.cxx @@ -26,7 +26,7 @@ struct Server : fair::mq::Device fMaxIterations = fConfig->GetProperty("max-iterations"); } - bool HandleData(FairMQMessagePtr& req, int) + bool HandleData(fair::mq::MessagePtr& req, int) { LOG(info) << "Received request from client: \"" << std::string(static_cast(req->GetData()), req->GetSize()) << "\""; @@ -34,7 +34,7 @@ struct Server : fair::mq::Device LOG(info) << "Sending reply to client."; - FairMQMessagePtr rep(NewMessage(const_cast(text->c_str()), // data + fair::mq::MessagePtr rep(NewMessage(const_cast(text->c_str()), // data text->length(), // size [](void* /*data*/, void* object) { delete static_cast(object); }, // deletion callback text)); // object that manages the data diff --git a/fairmq/JSONParser.cxx b/fairmq/JSONParser.cxx index 2341ad37..9a61e47e 100644 --- a/fairmq/JSONParser.cxx +++ b/fairmq/JSONParser.cxx @@ -30,13 +30,13 @@ using namespace std; using namespace fair::mq; -using namespace fair::mq::tools; +using namespace tools; using namespace boost::property_tree; namespace fair::mq { -fair::mq::Properties PtreeParser(const ptree& pt, const string& id) +Properties PtreeParser(const ptree& pt, const string& id) { if (id.empty()) { throw ParserError("no device ID provided. Provide with `--id` cmd option"); @@ -47,7 +47,7 @@ fair::mq::Properties PtreeParser(const ptree& pt, const string& id) return helper::DeviceParser(pt.get_child("fairMQOptions"), id); } -fair::mq::Properties JSONParser(const string& filename, const string& deviceId) +Properties JSONParser(const string& filename, const string& deviceId) { ptree pt; LOG(debug) << "Parsing JSON from " << filename << " ..."; @@ -58,9 +58,9 @@ fair::mq::Properties JSONParser(const string& filename, const string& deviceId) namespace helper { -fair::mq::Properties DeviceParser(const ptree& fairMQOptions, const string& deviceId) +Properties DeviceParser(const ptree& fairMQOptions, const string& deviceId) { - fair::mq::Properties properties; + Properties properties; for (const auto& node : fairMQOptions) { if (node.first == "devices") { @@ -82,27 +82,27 @@ fair::mq::Properties DeviceParser(const ptree& fairMQOptions, const string& devi return properties; } -void ChannelParser(const ptree& tree, fair::mq::Properties& properties) +void ChannelParser(const ptree& tree, Properties& properties) { for (const auto& node : tree) { if (node.first == "channels") { for (const auto& cn : node.second) { - fair::mq::Properties commonProperties; - commonProperties.emplace("type", cn.second.get("type", FairMQChannel::DefaultType)); - commonProperties.emplace("method", cn.second.get("method", FairMQChannel::DefaultMethod)); - commonProperties.emplace("address", cn.second.get("address", FairMQChannel::DefaultAddress)); - commonProperties.emplace("transport", cn.second.get("transport", FairMQChannel::DefaultTransportName)); - commonProperties.emplace("sndBufSize", cn.second.get("sndBufSize", FairMQChannel::DefaultSndBufSize)); - commonProperties.emplace("rcvBufSize", cn.second.get("rcvBufSize", FairMQChannel::DefaultRcvBufSize)); - commonProperties.emplace("sndKernelSize", cn.second.get("sndKernelSize", FairMQChannel::DefaultSndKernelSize)); - commonProperties.emplace("rcvKernelSize", cn.second.get("rcvKernelSize", FairMQChannel::DefaultRcvKernelSize)); - commonProperties.emplace("sndTimeoutMs", cn.second.get("sndTimeoutMs", FairMQChannel::DefaultSndTimeoutMs)); - commonProperties.emplace("rcvTimeoutMs", cn.second.get("rcvTimeoutMs", FairMQChannel::DefaultRcvTimeoutMs)); - commonProperties.emplace("linger", cn.second.get("linger", FairMQChannel::DefaultLinger)); - commonProperties.emplace("rateLogging", cn.second.get("rateLogging", FairMQChannel::DefaultRateLogging)); - commonProperties.emplace("portRangeMin", cn.second.get("portRangeMin", FairMQChannel::DefaultPortRangeMin)); - commonProperties.emplace("portRangeMax", cn.second.get("portRangeMax", FairMQChannel::DefaultPortRangeMax)); - commonProperties.emplace("autoBind", cn.second.get("autoBind", FairMQChannel::DefaultAutoBind)); + Properties commonProperties; + commonProperties.emplace("type", cn.second.get("type", Channel::DefaultType)); + commonProperties.emplace("method", cn.second.get("method", Channel::DefaultMethod)); + commonProperties.emplace("address", cn.second.get("address", Channel::DefaultAddress)); + commonProperties.emplace("transport", cn.second.get("transport", Channel::DefaultTransportName)); + commonProperties.emplace("sndBufSize", cn.second.get("sndBufSize", Channel::DefaultSndBufSize)); + commonProperties.emplace("rcvBufSize", cn.second.get("rcvBufSize", Channel::DefaultRcvBufSize)); + commonProperties.emplace("sndKernelSize", cn.second.get("sndKernelSize", Channel::DefaultSndKernelSize)); + commonProperties.emplace("rcvKernelSize", cn.second.get("rcvKernelSize", Channel::DefaultRcvKernelSize)); + commonProperties.emplace("sndTimeoutMs", cn.second.get("sndTimeoutMs", Channel::DefaultSndTimeoutMs)); + commonProperties.emplace("rcvTimeoutMs", cn.second.get("rcvTimeoutMs", Channel::DefaultRcvTimeoutMs)); + commonProperties.emplace("linger", cn.second.get("linger", Channel::DefaultLinger)); + commonProperties.emplace("rateLogging", cn.second.get("rateLogging", Channel::DefaultRateLogging)); + commonProperties.emplace("portRangeMin", cn.second.get("portRangeMin", Channel::DefaultPortRangeMin)); + commonProperties.emplace("portRangeMax", cn.second.get("portRangeMax", Channel::DefaultPortRangeMax)); + commonProperties.emplace("autoBind", cn.second.get("autoBind", Channel::DefaultAutoBind)); string name = cn.second.get("name"); int numSockets = cn.second.get("numSockets", 0); @@ -128,7 +128,7 @@ void ChannelParser(const ptree& tree, fair::mq::Properties& properties) } } -void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties, const string& channelName, const fair::mq::Properties& commonProperties) +void SubChannelParser(const ptree& channelTree, Properties& properties, const string& channelName, const Properties& commonProperties) { // for each socket in channel int i = 0; @@ -137,7 +137,7 @@ void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties if (node.first == "sockets") { for (const auto& sn : node.second) { // a sub-channel inherits relevant properties from the common channel ... - fair::mq::Properties newProperties(commonProperties); + Properties newProperties(commonProperties); // ... and adds/overwrites its own properties newProperties["type"] = sn.second.get("type", boost::any_cast(commonProperties.at("type"))); @@ -177,7 +177,7 @@ void SubChannelParser(const ptree& channelTree, fair::mq::Properties& properties LOG(trace) << "\tNo sockets specified,"; LOG(trace) << "\tapplying common settings to the channel:"; - fair::mq::Properties newProperties(commonProperties); + Properties newProperties(commonProperties); for (auto& p : newProperties) { LOG(trace) << "\t" << setw(13) << left << p.first << " : " << p.second; diff --git a/fairmq/MemoryResourceTools.h b/fairmq/MemoryResourceTools.h index 45391680..d91c43bd 100644 --- a/fairmq/MemoryResourceTools.h +++ b/fairmq/MemoryResourceTools.h @@ -12,7 +12,7 @@ /// /// @author Mikolaj Krzewicki, mkrzewic@cern.ch -#include +#include #include namespace fair::mq @@ -28,13 +28,13 @@ template // pmr::polymorphic_allocator, // typename ContainerT::allocator_type>::value == true, -// FairMQMessagePtr>::type -FairMQMessagePtr getMessage(ContainerT &&container_, FairMQMemoryResource *targetResource = nullptr) +// MessagePtr>::type +MessagePtr getMessage(ContainerT &&container_, MemoryResource *targetResource = nullptr) { auto container = std::move(container_); auto alloc = container.get_allocator(); - auto resource = dynamic_cast(alloc.resource()); + auto resource = dynamic_cast(alloc.resource()); if (!resource && !targetResource) { throw std::runtime_error("Neither the container or target resource specified"); } diff --git a/fairmq/Plugin.cxx b/fairmq/Plugin.cxx index 9ed1256d..157f179f 100644 --- a/fairmq/Plugin.cxx +++ b/fairmq/Plugin.cxx @@ -7,7 +7,7 @@ ********************************************************************************/ #include -#include +#include #include using namespace std; diff --git a/fairmq/ProgOptions.cxx b/fairmq/ProgOptions.cxx index 2fe63f7b..b0e22b3f 100644 --- a/fairmq/ProgOptions.cxx +++ b/fairmq/ProgOptions.cxx @@ -351,7 +351,7 @@ void ProgOptions::DeleteProperty(const string& key) vm.erase(key); } -void ProgOptions::AddChannel(const string& name, const FairMQChannel& channel) +void ProgOptions::AddChannel(const string& name, const Channel& channel) { lock_guard lock(fMtx); unordered_map existingChannels = GetChannelInfoImpl(); diff --git a/fairmq/devices/BenchmarkSampler.h b/fairmq/devices/BenchmarkSampler.h index 013aef91..da00ebb9 100644 --- a/fairmq/devices/BenchmarkSampler.h +++ b/fairmq/devices/BenchmarkSampler.h @@ -44,7 +44,7 @@ class BenchmarkSampler : public Device void Run() override { // store the channel reference to avoid traversing the map on every loop iteration - FairMQChannel& dataOutChannel = GetChannel(fOutChannelName, 0); + Channel& dataOutChannel = GetChannel(fOutChannelName, 0); LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations."; auto tStart = std::chrono::high_resolution_clock::now(); @@ -53,7 +53,7 @@ class BenchmarkSampler : public Device while (!NewStatePending()) { if (fMultipart) { - FairMQParts parts; + Parts parts; for (size_t i = 0; i < fNumParts; ++i) { parts.AddPart(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment})); @@ -71,7 +71,7 @@ class BenchmarkSampler : public Device ++fNumIterations; } } else { - FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment})); + MessagePtr msg(dataOutChannel.NewMessage(fMsgSize, fair::mq::Alignment{fMsgAlignment})); if (fMemSet) { std::memset(msg->GetData(), 0, msg->GetSize()); } diff --git a/fairmq/devices/Merger.h b/fairmq/devices/Merger.h index e08eba17..173e5d90 100644 --- a/fairmq/devices/Merger.h +++ b/fairmq/devices/Merger.h @@ -9,7 +9,7 @@ #ifndef FAIR_MQ_MERGER_H #define FAIR_MQ_MERGER_H -#include +#include #include #include @@ -45,13 +45,13 @@ class Merger : public Device { int numInputs = GetNumSubChannels(fInChannelName); - std::vector chans; + std::vector chans; for (auto& chan : fChannels.at(fInChannelName)) { chans.push_back(&chan); } - FairMQPollerPtr poller(NewPoller(chans)); + PollerPtr poller(NewPoller(chans)); if (fMultipart) { while (!NewStatePending()) { @@ -61,7 +61,7 @@ class Merger : public Device for (int i = 0; i < numInputs; ++i) { // Check if the channel has data ready to be received. if (poller->CheckInput(i)) { - FairMQParts payload; + Parts payload; if (Receive(payload, fInChannelName, i) >= 0) { if (Send(payload, fOutChannelName) < 0) { @@ -83,7 +83,7 @@ class Merger : public Device for (int i = 0; i < numInputs; ++i) { // Check if the channel has data ready to be received. if (poller->CheckInput(i)) { - FairMQMessagePtr payload(fTransportFactory->CreateMessage()); + MessagePtr payload(fTransportFactory->CreateMessage()); if (Receive(payload, fInChannelName, i) >= 0) { if (Send(payload, fOutChannelName) < 0) { diff --git a/fairmq/devices/Multiplier.h b/fairmq/devices/Multiplier.h index 5b48bf7a..fda65ac1 100644 --- a/fairmq/devices/Multiplier.h +++ b/fairmq/devices/Multiplier.h @@ -40,11 +40,11 @@ class Multiplier : public Device } - bool HandleSingleData(std::unique_ptr& payload, int) + bool HandleSingleData(std::unique_ptr& payload, int) { for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel for (unsigned int j = 0; j < GetNumSubChannels(fOutChannelNames.at(i)); ++j) { // all subChannels in a channel - FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); + MessagePtr msgCopy(fTransportFactory->CreateMessage()); msgCopy->Copy(*payload); Send(msgCopy, fOutChannelNames.at(i), j); @@ -54,7 +54,7 @@ class Multiplier : public Device unsigned int lastChannelSize = GetNumSubChannels(fOutChannelNames.back()); for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel - FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); + MessagePtr msgCopy(fTransportFactory->CreateMessage()); msgCopy->Copy(*payload); Send(msgCopy, fOutChannelNames.back(), i); @@ -65,14 +65,14 @@ class Multiplier : public Device return true; } - bool HandleMultipartData(FairMQParts& payload, int) + bool HandleMultipartData(Parts& payload, int) { for (unsigned int i = 0; i < fOutChannelNames.size() - 1; ++i) { // all except last channel for (unsigned int j = 0; j < GetNumSubChannels(fOutChannelNames.at(i)); ++j) { // all subChannels in a channel - FairMQParts parts; + Parts parts; for (int k = 0; k < payload.Size(); ++k) { - FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); + MessagePtr msgCopy(fTransportFactory->CreateMessage()); msgCopy->Copy(payload.AtRef(k)); parts.AddPart(std::move(msgCopy)); } @@ -84,10 +84,10 @@ class Multiplier : public Device unsigned int lastChannelSize = GetNumSubChannels(fOutChannelNames.back()); for (unsigned int i = 0; i < lastChannelSize - 1; ++i) { // iterate over all except last subChannels of the last channel - FairMQParts parts; + Parts parts; for (int k = 0; k < payload.Size(); ++k) { - FairMQMessagePtr msgCopy(fTransportFactory->CreateMessage()); + MessagePtr msgCopy(fTransportFactory->CreateMessage()); msgCopy->Copy(payload.AtRef(k)); parts.AddPart(std::move(msgCopy)); } diff --git a/fairmq/devices/Proxy.h b/fairmq/devices/Proxy.h index 0c89189b..00089232 100644 --- a/fairmq/devices/Proxy.h +++ b/fairmq/devices/Proxy.h @@ -34,7 +34,7 @@ class Proxy : public Device { if (fMultipart) { while (!NewStatePending()) { - FairMQParts payload; + Parts payload; if (Receive(payload, fInChannelName) >= 0) { if (Send(payload, fOutChannelName) < 0) { LOG(debug) << "Transfer interrupted"; @@ -47,7 +47,7 @@ class Proxy : public Device } } else { while (!NewStatePending()) { - FairMQMessagePtr payload(fTransportFactory->CreateMessage()); + MessagePtr payload(fTransportFactory->CreateMessage()); if (Receive(payload, fInChannelName) >= 0) { if (Send(payload, fOutChannelName) < 0) { LOG(debug) << "Transfer interrupted"; diff --git a/fairmq/devices/Sink.h b/fairmq/devices/Sink.h index 32d61854..c10ce55f 100644 --- a/fairmq/devices/Sink.h +++ b/fairmq/devices/Sink.h @@ -9,7 +9,6 @@ #ifndef FAIR_MQ_SINK_H #define FAIR_MQ_SINK_H -#include #include #include @@ -48,7 +47,7 @@ class Sink : public Device void Run() override { // store the channel reference to avoid traversing the map on every loop iteration - FairMQChannel& dataInChannel = GetChannel(fInChannelName, 0); + Channel& dataInChannel = GetChannel(fInChannelName, 0); LOG(info) << "Starting sink and expecting to receive " << fMaxIterations << " messages."; auto tStart = std::chrono::high_resolution_clock::now(); @@ -70,7 +69,7 @@ class Sink : public Device while (!NewStatePending()) { if (fMultipart) { - FairMQParts parts; + Parts parts; if (dataInChannel.Receive(parts) < 0) { continue; } @@ -80,7 +79,7 @@ class Sink : public Device } } } else { - FairMQMessagePtr msg(dataInChannel.NewMessage()); + MessagePtr msg(dataInChannel.NewMessage()); if (dataInChannel.Receive(msg) < 0) { continue; } diff --git a/fairmq/devices/Splitter.h b/fairmq/devices/Splitter.h index addc31e1..2632f162 100644 --- a/fairmq/devices/Splitter.h +++ b/fairmq/devices/Splitter.h @@ -34,9 +34,9 @@ class Splitter : public Device fDirection = 0; if (fMultipart) { - OnData(fInChannelName, &Splitter::HandleData); + OnData(fInChannelName, &Splitter::HandleData); } else { - OnData(fInChannelName, &Splitter::HandleData); + OnData(fInChannelName, &Splitter::HandleData); } } diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index ef67fae9..c79e4bb8 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include diff --git a/fairmq/plugins/PMIx/PMIxCommands.h b/fairmq/plugins/PMIx/PMIxCommands.h index 6cc45e42..e3f2b9a6 100644 --- a/fairmq/plugins/PMIx/PMIxCommands.h +++ b/fairmq/plugins/PMIx/PMIxCommands.h @@ -11,7 +11,7 @@ #include "PMIx.hpp" -#include +#include #include #include // make_unique #include diff --git a/fairmq/plugins/PMIx/PMIxPlugin.h b/fairmq/plugins/PMIx/PMIxPlugin.h index ca3e305b..f6e8ee4e 100644 --- a/fairmq/plugins/PMIx/PMIxPlugin.h +++ b/fairmq/plugins/PMIx/PMIxPlugin.h @@ -14,7 +14,7 @@ #include #include -#include +#include #include #include diff --git a/fairmq/runFairMQDevice.h b/fairmq/runFairMQDevice.h index b0e9a515..b886aedb 100644 --- a/fairmq/runFairMQDevice.h +++ b/fairmq/runFairMQDevice.h @@ -11,7 +11,7 @@ #include #include -using FairMQDevicePtr = FairMQDevice*; +using FairMQDevicePtr = fair::mq::Device*; // to be implemented by the user to return a child class of FairMQDevice FairMQDevicePtr getDevice(const fair::mq::ProgOptions& config); @@ -45,7 +45,7 @@ int main(int argc, char* argv[]) // }); runner.AddHook([](DeviceRunner& r){ - r.fDevice = std::unique_ptr{getDevice(r.fConfig)}; + r.fDevice = std::unique_ptr{getDevice(r.fConfig)}; }); return runner.Run(); diff --git a/fairmq/shmem/Message.h b/fairmq/shmem/Message.h index b81a3340..37c8779f 100644 --- a/fairmq/shmem/Message.h +++ b/fairmq/shmem/Message.h @@ -35,7 +35,7 @@ class Message final : public fair::mq::Message friend class Socket; public: - Message(Manager& manager, FairMQTransportFactory* factory = nullptr) + Message(Manager& manager, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fManager(manager) , fQueued(false) @@ -46,7 +46,7 @@ class Message final : public fair::mq::Message fManager.IncrementMsgCounter(); } - Message(Manager& manager, Alignment alignment, FairMQTransportFactory* factory = nullptr) + Message(Manager& manager, Alignment alignment, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fManager(manager) , fQueued(false) @@ -58,7 +58,7 @@ class Message final : public fair::mq::Message fManager.IncrementMsgCounter(); } - Message(Manager& manager, const size_t size, FairMQTransportFactory* factory = nullptr) + Message(Manager& manager, const size_t size, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fManager(manager) , fQueued(false) @@ -70,7 +70,7 @@ class Message final : public fair::mq::Message fManager.IncrementMsgCounter(); } - Message(Manager& manager, const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr) + Message(Manager& manager, const size_t size, Alignment alignment, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fManager(manager) , fQueued(false) @@ -83,7 +83,7 @@ class Message final : public fair::mq::Message fManager.IncrementMsgCounter(); } - Message(Manager& manager, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr) + Message(Manager& manager, void* data, const size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fManager(manager) , fQueued(false) @@ -102,7 +102,7 @@ class Message final : public fair::mq::Message fManager.IncrementMsgCounter(); } - Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr) + Message(Manager& manager, UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fManager(manager) , fQueued(false) @@ -125,7 +125,7 @@ class Message final : public fair::mq::Message fManager.IncrementMsgCounter(); } - Message(Manager& manager, MetaHeader& hdr, FairMQTransportFactory* factory = nullptr) + Message(Manager& manager, MetaHeader& hdr, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fManager(manager) , fQueued(false) @@ -169,7 +169,7 @@ class Message final : public fair::mq::Message InitializeChunk(size, fAlignment); } - void Rebuild(void* data, size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override + void Rebuild(void* data, size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr) override { CloseMessage(); fQueued = false; diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index 65e07516..3d373880 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -51,7 +51,7 @@ struct ZMsg class Socket final : public fair::mq::Socket { public: - Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id, void* context, FairMQTransportFactory* fac = nullptr) + Socket(Manager& manager, const std::string& type, const std::string& name, const std::string& id, void* context, fair::mq::TransportFactory* fac = nullptr) : fair::mq::Socket(fac) , fManager(manager) , fId(id + "." + name + "." + type) diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index ed02c7af..c8720e84 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -113,7 +113,7 @@ class TransportFactory final : public fair::mq::TransportFactory return std::make_unique(*fManager, size, alignment, this); } - MessagePtr CreateMessage(void* data, size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override + MessagePtr CreateMessage(void* data, size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr) override { return std::make_unique(*fManager, data, size, ffn, hint, this); } @@ -128,17 +128,17 @@ class TransportFactory final : public fair::mq::TransportFactory return std::make_unique(*fManager, type, name, GetId(), fZmqCtx, this); } - PollerPtr CreatePoller(const std::vector& channels) const override + PollerPtr CreatePoller(const std::vector& channels) const override { return std::make_unique(channels); } - PollerPtr CreatePoller(const std::vector& channels) const override + PollerPtr CreatePoller(const std::vector& channels) const override { return std::make_unique(channels); } - PollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override + PollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override { return std::make_unique(channelsMap, channelList); } diff --git a/fairmq/shmem/UnmanagedRegionImpl.h b/fairmq/shmem/UnmanagedRegionImpl.h index 82caa7d5..18ef5ef9 100644 --- a/fairmq/shmem/UnmanagedRegionImpl.h +++ b/fairmq/shmem/UnmanagedRegionImpl.h @@ -34,7 +34,7 @@ class UnmanagedRegionImpl final : public fair::mq::UnmanagedRegion RegionCallback callback, RegionBulkCallback bulkCallback, fair::mq::RegionConfig cfg, - FairMQTransportFactory* factory) + fair::mq::TransportFactory* factory) : fair::mq::UnmanagedRegion(factory) , fManager(manager) , fRegion(nullptr) diff --git a/fairmq/zeromq/Context.h b/fairmq/zeromq/Context.h index b6fe3964..a02b6794 100644 --- a/fairmq/zeromq/Context.h +++ b/fairmq/zeromq/Context.h @@ -10,8 +10,9 @@ #define FAIR_MQ_ZMQ_CONTEXT_H_ #include -#include -#include +#include + +#include #include diff --git a/fairmq/zeromq/Message.h b/fairmq/zeromq/Message.h index b012666c..9624229c 100644 --- a/fairmq/zeromq/Message.h +++ b/fairmq/zeromq/Message.h @@ -10,9 +10,10 @@ #define FAIR_MQ_ZMQ_MESSAGE_H #include -#include -#include -#include +#include +#include + +#include #include @@ -38,7 +39,7 @@ class Message final : public fair::mq::Message Message& operator=(const Message&) = delete; Message& operator=(Message&&) = delete; - Message(FairMQTransportFactory* factory = nullptr) + Message(fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fMsg(std::make_unique()) { @@ -47,7 +48,7 @@ class Message final : public fair::mq::Message } } - Message(Alignment alignment, FairMQTransportFactory* factory = nullptr) + Message(Alignment alignment, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fAlignment(alignment.alignment) , fMsg(std::make_unique()) @@ -57,7 +58,7 @@ class Message final : public fair::mq::Message } } - Message(const size_t size, FairMQTransportFactory* factory = nullptr) + Message(const size_t size, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fMsg(std::make_unique()) { @@ -80,7 +81,7 @@ class Message final : public fair::mq::Message return {static_cast(fullBufferPtr), static_cast(alignedPartPtr)}; } - Message(const size_t size, Alignment alignment, FairMQTransportFactory* factory = nullptr) + Message(const size_t size, Alignment alignment, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fAlignment(alignment.alignment) , fMsg(std::make_unique()) @@ -97,7 +98,7 @@ class Message final : public fair::mq::Message } } - Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr) + Message(void* data, const size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fMsg(std::make_unique()) { @@ -106,7 +107,7 @@ class Message final : public fair::mq::Message } } - Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr) + Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Message(factory) , fMsg(std::make_unique()) { @@ -184,7 +185,7 @@ class Message final : public fair::mq::Message } } - void Rebuild(void* data, size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override + void Rebuild(void* data, size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr) override { CloseMessage(); fMsg = std::make_unique(); diff --git a/fairmq/zeromq/Socket.h b/fairmq/zeromq/Socket.h index 7063662b..2167600b 100644 --- a/fairmq/zeromq/Socket.h +++ b/fairmq/zeromq/Socket.h @@ -9,14 +9,15 @@ #ifndef FAIR_MQ_ZMQ_SOCKET_H #define FAIR_MQ_ZMQ_SOCKET_H -#include -#include -#include +#include +#include #include #include #include #include +#include + #include #include @@ -30,7 +31,7 @@ namespace fair::mq::zmq class Socket final : public fair::mq::Socket { public: - Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id, FairMQTransportFactory* factory = nullptr) + Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id, fair::mq::TransportFactory* factory = nullptr) : fair::mq::Socket(factory) , fCtx(ctx) , fId(id + "." + name + "." + type) @@ -219,7 +220,7 @@ class Socket final : public fair::mq::Socket bool repeat = false; do { - FairMQMessagePtr part = std::make_unique(GetTransport()); + fair::mq::MessagePtr part = std::make_unique(GetTransport()); int nbytes = zmq_msg_recv(static_cast(part.get())->GetMessage(), fSocket, flags); if (nbytes >= 0) { diff --git a/fairmq/zeromq/TransportFactory.h b/fairmq/zeromq/TransportFactory.h index ed8249c9..f94db48b 100644 --- a/fairmq/zeromq/TransportFactory.h +++ b/fairmq/zeromq/TransportFactory.h @@ -14,7 +14,7 @@ #include #include #include -#include +#include #include #include // unique_ptr, make_unique @@ -24,11 +24,11 @@ namespace fair::mq::zmq { -class TransportFactory final : public FairMQTransportFactory +class TransportFactory final : public fair::mq::TransportFactory { public: TransportFactory(const std::string& id = "", const ProgOptions* config = nullptr) - : FairMQTransportFactory(id) + : fair::mq::TransportFactory(id) , fCtx(nullptr) { int major = 0, minor = 0, patch = 0; @@ -68,7 +68,7 @@ class TransportFactory final : public FairMQTransportFactory return std::make_unique(size, alignment, this); } - MessagePtr CreateMessage(void* data, size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override + MessagePtr CreateMessage(void* data, size_t size, fair::mq::FreeFn* ffn, void* hint = nullptr) override { return std::make_unique(data, size, ffn, hint, this); } @@ -83,17 +83,17 @@ class TransportFactory final : public FairMQTransportFactory return std::make_unique(*fCtx, type, name, GetId(), this); } - PollerPtr CreatePoller(const std::vector& channels) const override + PollerPtr CreatePoller(const std::vector& channels) const override { return std::make_unique(channels); } - PollerPtr CreatePoller(const std::vector& channels) const override + PollerPtr CreatePoller(const std::vector& channels) const override { return std::make_unique(channels); } - PollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override + PollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override { return std::make_unique(channelsMap, channelList); } diff --git a/fairmq/zeromq/UnmanagedRegion.h b/fairmq/zeromq/UnmanagedRegion.h index 295de0cc..41e518dc 100644 --- a/fairmq/zeromq/UnmanagedRegion.h +++ b/fairmq/zeromq/UnmanagedRegion.h @@ -10,8 +10,9 @@ #define FAIR_MQ_ZMQ_UNMANAGEDREGION_H #include -#include -#include +#include + +#include #include // size_t #include @@ -33,7 +34,7 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, - FairMQTransportFactory* factory, + fair::mq::TransportFactory* factory, fair::mq::RegionConfig cfg) : fair::mq::UnmanagedRegion(factory) , fCtx(ctx) diff --git a/test/device/TestReceiver.h b/test/device/TestReceiver.h index 00a2f920..26918aba 100644 --- a/test/device/TestReceiver.h +++ b/test/device/TestReceiver.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_TESTRECEIVER_H #define FAIR_MQ_TEST_TESTRECEIVER_H -#include -#include +#include +#include #include #include diff --git a/test/device/TestSender.h b/test/device/TestSender.h index 1666e6d9..2f52842d 100644 --- a/test/device/TestSender.h +++ b/test/device/TestSender.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_TESTSENDER_H #define FAIR_MQ_TEST_TESTSENDER_H -#include -#include +#include +#include #include #include diff --git a/test/device/_config.cxx b/test/device/_config.cxx index ca9ed87c..a56c1546 100644 --- a/test/device/_config.cxx +++ b/test/device/_config.cxx @@ -6,7 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include +#include #include #include @@ -19,52 +19,53 @@ namespace _config { using namespace std; +using namespace fair::mq; -void control(FairMQDevice& device) +void control(Device& device) { - device.ChangeState(fair::mq::Transition::InitDevice); - device.WaitForState(fair::mq::State::InitializingDevice); - device.ChangeState(fair::mq::Transition::CompleteInit); - device.WaitForState(fair::mq::State::Initialized); - device.ChangeState(fair::mq::Transition::Bind); - device.WaitForState(fair::mq::State::Bound); - device.ChangeState(fair::mq::Transition::Connect); - device.WaitForState(fair::mq::State::DeviceReady); - device.ChangeState(fair::mq::Transition::InitTask); - device.WaitForState(fair::mq::State::Ready); + device.ChangeState(Transition::InitDevice); + device.WaitForState(State::InitializingDevice); + device.ChangeState(Transition::CompleteInit); + device.WaitForState(State::Initialized); + device.ChangeState(Transition::Bind); + device.WaitForState(State::Bound); + device.ChangeState(Transition::Connect); + device.WaitForState(State::DeviceReady); + device.ChangeState(Transition::InitTask); + device.WaitForState(State::Ready); - device.ChangeState(fair::mq::Transition::Run); - device.WaitForState(fair::mq::State::Ready); + device.ChangeState(Transition::Run); + device.WaitForState(State::Ready); - device.ChangeState(fair::mq::Transition::ResetTask); - device.WaitForState(fair::mq::State::DeviceReady); - device.ChangeState(fair::mq::Transition::ResetDevice); - device.WaitForState(fair::mq::State::Idle); + device.ChangeState(Transition::ResetTask); + device.WaitForState(State::DeviceReady); + device.ChangeState(Transition::ResetDevice); + device.WaitForState(State::Idle); - device.ChangeState(fair::mq::Transition::End); + device.ChangeState(Transition::End); } -class TestDevice : public FairMQDevice +class TestDevice : public Device { public: TestDevice(const string& transport) { - fDeviceThread = thread(&FairMQDevice::RunStateMachine, this); + fDeviceThread = thread(&Device::RunStateMachine, this); SetTransport(transport); - ChangeState(fair::mq::Transition::InitDevice); - WaitForState(fair::mq::State::InitializingDevice); - ChangeState(fair::mq::Transition::CompleteInit); - WaitForState(fair::mq::State::Initialized); - ChangeState(fair::mq::Transition::Bind); - WaitForState(fair::mq::State::Bound); - ChangeState(fair::mq::Transition::Connect); - WaitForState(fair::mq::State::DeviceReady); - ChangeState(fair::mq::Transition::InitTask); - WaitForState(fair::mq::State::Ready); + ChangeState(Transition::InitDevice); + WaitForState(State::InitializingDevice); + ChangeState(Transition::CompleteInit); + WaitForState(State::Initialized); + ChangeState(Transition::Bind); + WaitForState(State::Bound); + ChangeState(Transition::Connect); + WaitForState(State::DeviceReady); + ChangeState(Transition::InitTask); + WaitForState(State::Ready); - ChangeState(fair::mq::Transition::Run); + ChangeState(Transition::Run); } TestDevice(const TestDevice&) = delete; @@ -74,15 +75,15 @@ class TestDevice : public FairMQDevice ~TestDevice() override { - WaitForState(fair::mq::State::Running); - ChangeState(fair::mq::Transition::Stop); - WaitForState(fair::mq::State::Ready); - ChangeState(fair::mq::Transition::ResetTask); - WaitForState(fair::mq::State::DeviceReady); - ChangeState(fair::mq::Transition::ResetDevice); - WaitForState(fair::mq::State::Idle); + WaitForState(State::Running); + ChangeState(Transition::Stop); + WaitForState(State::Ready); + ChangeState(Transition::ResetTask); + WaitForState(State::DeviceReady); + ChangeState(Transition::ResetDevice); + WaitForState(State::Idle); - ChangeState(fair::mq::Transition::End); + ChangeState(Transition::End); if (fDeviceThread.joinable()) { fDeviceThread.join(); @@ -100,7 +101,7 @@ class Config : public ::testing::Test string TestDeviceSetConfig(const string& transport) { - fair::mq::ProgOptions config; + ProgOptions config; vector emptyArgs = {"dummy", "--id", "test", "--color", "false"}; @@ -108,10 +109,10 @@ class Config : public ::testing::Test config.SetProperty("transport", transport); - FairMQDevice device; + Device device; device.SetConfig(config); - FairMQChannel channel; + Channel channel; channel.UpdateType("pub"); channel.UpdateMethod("connect"); channel.UpdateAddress("tcp://localhost:5558"); @@ -130,14 +131,14 @@ class Config : public ::testing::Test string TestDeviceSetConfigWithPlugins(const string& transport) { - fair::mq::ProgOptions config; + ProgOptions config; vector emptyArgs = {"dummy", "--id", "test", "--color", "false"}; config.SetProperty("transport", transport); - FairMQDevice device; - fair::mq::PluginManager mgr; + Device device; + PluginManager mgr; mgr.LoadPlugin("s:config"); mgr.ForEachPluginProgOptions([&](boost::program_options::options_description options) { config.AddToCmdLineOptions(options); @@ -146,10 +147,10 @@ class Config : public ::testing::Test mgr.InstantiatePlugins(); config.ParseAll(emptyArgs, true); - fair::mq::DeviceRunner::HandleGeneralOptions(config); + DeviceRunner::HandleGeneralOptions(config); device.SetConfig(config); - FairMQChannel channel; + Channel channel; channel.UpdateType("pub"); channel.UpdateMethod("connect"); channel.UpdateAddress("tcp://localhost:5558"); @@ -177,16 +178,16 @@ class Config : public ::testing::Test string TestDeviceSetTransport(const string& transport) { - FairMQDevice device; + Device device; device.SetTransport(transport); - FairMQChannel channel; + Channel channel; channel.UpdateType("pub"); channel.UpdateMethod("connect"); channel.UpdateAddress("tcp://localhost:5558"); device.AddChannel("data", std::move(channel)); - thread t(&FairMQDevice::RunStateMachine, &device); + thread t(&Device::RunStateMachine, &device); control(device); diff --git a/test/device/_error_state.cxx b/test/device/_error_state.cxx index e9758576..a63c3c07 100644 --- a/test/device/_error_state.cxx +++ b/test/device/_error_state.cxx @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include @@ -22,24 +22,25 @@ namespace { using namespace std; +using namespace fair::mq; using namespace fair::mq::test; using namespace fair::mq::tools; -class BadDevice : public FairMQDevice +class BadDevice : public Device { public: BadDevice() { fDeviceThread = thread([&](){ - EXPECT_THROW(RunStateMachine(), fair::mq::MessageError); + EXPECT_THROW(RunStateMachine(), MessageError); }); SetTransport("shmem"); - ChangeState(fair::mq::Transition::InitDevice); - WaitForState(fair::mq::State::InitializingDevice); - ChangeState(fair::mq::Transition::CompleteInit); - WaitForState(fair::mq::State::Initialized); + ChangeState(Transition::InitDevice); + WaitForState(State::InitializingDevice); + ChangeState(Transition::CompleteInit); + WaitForState(State::Initialized); parts.AddPart(NewMessage()); } @@ -51,7 +52,7 @@ class BadDevice : public FairMQDevice ~BadDevice() override { - ChangeState(fair::mq::Transition::ResetDevice); + ChangeState(Transition::ResetDevice); if (fDeviceThread.joinable()) { fDeviceThread.join(); @@ -60,12 +61,12 @@ class BadDevice : public FairMQDevice private: thread fDeviceThread; - FairMQParts parts; + Parts parts; }; void RunErrorStateIn(const string& state, const string& control, const string& input = "") { - size_t session{fair::mq::tools::UuidHash()}; + size_t session{tools::UuidHash()}; execute_result result{"", 100}; thread device_thread([&]() { diff --git a/test/device/_multiple_devices.cxx b/test/device/_multiple_devices.cxx index b4ae0eac..c771f1a4 100644 --- a/test/device/_multiple_devices.cxx +++ b/test/device/_multiple_devices.cxx @@ -21,7 +21,7 @@ namespace using namespace std; using namespace fair::mq; -void control(FairMQDevice& device) +void control(Device& device) { thread t([&] { device.ChangeState(Transition::InitDevice); @@ -62,7 +62,7 @@ class MultipleDevices : public ::testing::Test { test::Sender sender("data"); sender.SetTransport("zeromq"); - FairMQChannel channel("push", "connect", "ipc://multiple-devices-test"); + Channel channel("push", "connect", "ipc://multiple-devices-test"); channel.UpdateRateLogging(0); sender.AddChannel("data", std::move(channel)); @@ -75,7 +75,7 @@ class MultipleDevices : public ::testing::Test { test::Receiver receiver("data"); receiver.SetTransport("zeromq"); - FairMQChannel channel("pull", "bind", "ipc://multiple-devices-test"); + Channel channel("pull", "bind", "ipc://multiple-devices-test"); channel.UpdateRateLogging(0); receiver.AddChannel("data", std::move(channel)); diff --git a/test/device/_transitions.cxx b/test/device/_transitions.cxx index 7e84e86f..00ad6fb7 100644 --- a/test/device/_transitions.cxx +++ b/test/device/_transitions.cxx @@ -6,8 +6,8 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include +#include +#include #include @@ -20,7 +20,7 @@ namespace using namespace std; using namespace fair::mq; -class SlowDevice : public FairMQDevice +class SlowDevice : public Device { public: SlowDevice() = default; @@ -34,7 +34,7 @@ class SlowDevice : public FairMQDevice void transitionTo(const vector& states, int numExpectedStates) { - FairMQDevice device; + Device device; thread t([&] { for (const auto& s : states) { diff --git a/test/device/_version.cxx b/test/device/_version.cxx index 2c0d3837..6e54c6e3 100644 --- a/test/device/_version.cxx +++ b/test/device/_version.cxx @@ -6,7 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include +#include #include diff --git a/test/helper/devices/TestErrorState.h b/test/helper/devices/TestErrorState.h index 2e89addd..d848cb8b 100644 --- a/test/helper/devices/TestErrorState.h +++ b/test/helper/devices/TestErrorState.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_ERROR_STATE_H #define FAIR_MQ_TEST_ERROR_STATE_H -#include -#include +#include +#include #include diff --git a/test/helper/devices/TestExceptions.h b/test/helper/devices/TestExceptions.h index 48de0870..46a77f9c 100644 --- a/test/helper/devices/TestExceptions.h +++ b/test/helper/devices/TestExceptions.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_EXCEPTIONS_H #define FAIR_MQ_TEST_EXCEPTIONS_H -#include -#include +#include +#include #include #include diff --git a/test/helper/devices/TestPairLeft.h b/test/helper/devices/TestPairLeft.h index 573f743f..05afa975 100644 --- a/test/helper/devices/TestPairLeft.h +++ b/test/helper/devices/TestPairLeft.h @@ -9,7 +9,7 @@ #ifndef FAIR_MQ_TEST_PAIRLEFT_H #define FAIR_MQ_TEST_PAIRLEFT_H -#include +#include #include #include diff --git a/test/helper/devices/TestPairRight.h b/test/helper/devices/TestPairRight.h index 281e2815..2d3c5e8d 100644 --- a/test/helper/devices/TestPairRight.h +++ b/test/helper/devices/TestPairRight.h @@ -9,7 +9,7 @@ #ifndef FAIR_MQ_TEST_PAIRRIGHT_H #define FAIR_MQ_TEST_PAIRRIGHT_H -#include +#include #include #include #include diff --git a/test/helper/devices/TestPollIn.h b/test/helper/devices/TestPollIn.h index f86c7328..6b8c6652 100644 --- a/test/helper/devices/TestPollIn.h +++ b/test/helper/devices/TestPollIn.h @@ -9,17 +9,20 @@ #ifndef FAIR_MQ_TEST_POLLIN_H #define FAIR_MQ_TEST_POLLIN_H -#include -#include +#include #include + +#include + #include namespace fair::mq::test { using namespace std; +using namespace fair::mq; -class PollIn : public FairMQDevice +class PollIn : public Device { public: PollIn() @@ -35,12 +38,12 @@ class PollIn : public FairMQDevice auto Run() -> void override { - vector chans; + vector chans; chans.push_back(&GetChannel("data1", 0)); chans.push_back(&GetChannel("data2", 0)); - FairMQPollerPtr poller = nullptr; + PollerPtr poller = nullptr; if (fPollType == 0) { @@ -59,8 +62,8 @@ class PollIn : public FairMQDevice bool arrived2 = false; bool bothArrived = false; - FairMQMessagePtr msg1(NewMessage()); - FairMQMessagePtr msg2(NewMessage()); + MessagePtr msg1(NewMessage()); + MessagePtr msg2(NewMessage()); while (!bothArrived) { diff --git a/test/helper/devices/TestPollOut.h b/test/helper/devices/TestPollOut.h index 654a8979..94a4af71 100644 --- a/test/helper/devices/TestPollOut.h +++ b/test/helper/devices/TestPollOut.h @@ -9,7 +9,7 @@ #ifndef FAIR_MQ_TEST_POLLOUT_H #define FAIR_MQ_TEST_POLLOUT_H -#include +#include #include namespace fair::mq::test diff --git a/test/helper/devices/TestPub.h b/test/helper/devices/TestPub.h index 30054372..ee9c307f 100644 --- a/test/helper/devices/TestPub.h +++ b/test/helper/devices/TestPub.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_PUB_H #define FAIR_MQ_TEST_PUB_H -#include -#include +#include +#include #include #include diff --git a/test/helper/devices/TestPull.h b/test/helper/devices/TestPull.h index 261957cf..9417c7a5 100644 --- a/test/helper/devices/TestPull.h +++ b/test/helper/devices/TestPull.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_PULL_H #define FAIR_MQ_TEST_PULL_H -#include -#include +#include +#include #include namespace fair::mq::test diff --git a/test/helper/devices/TestPush.h b/test/helper/devices/TestPush.h index 69cb2bfd..aac30a1b 100644 --- a/test/helper/devices/TestPush.h +++ b/test/helper/devices/TestPush.h @@ -9,7 +9,7 @@ #ifndef FAIR_MQ_TEST_PUSH_H #define FAIR_MQ_TEST_PUSH_H -#include +#include #include namespace fair::mq::test diff --git a/test/helper/devices/TestRep.h b/test/helper/devices/TestRep.h index 0cb95d46..aebd6566 100644 --- a/test/helper/devices/TestRep.h +++ b/test/helper/devices/TestRep.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_REP_H #define FAIR_MQ_TEST_REP_H -#include -#include +#include +#include #include namespace fair::mq::test diff --git a/test/helper/devices/TestReq.h b/test/helper/devices/TestReq.h index 210cac33..9846a600 100644 --- a/test/helper/devices/TestReq.h +++ b/test/helper/devices/TestReq.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_REQ_H #define FAIR_MQ_TEST_REQ_H -#include -#include +#include +#include #include namespace fair::mq::test diff --git a/test/helper/devices/TestSignals.h b/test/helper/devices/TestSignals.h index 5cd95c59..4f3e7729 100644 --- a/test/helper/devices/TestSignals.h +++ b/test/helper/devices/TestSignals.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_SIGNALS_H #define FAIR_MQ_TEST_SIGNALS_H -#include -#include +#include +#include #include #include diff --git a/test/helper/devices/TestSub.h b/test/helper/devices/TestSub.h index dad14662..47c395ef 100644 --- a/test/helper/devices/TestSub.h +++ b/test/helper/devices/TestSub.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_SUB_H #define FAIR_MQ_TEST_SUB_H -#include -#include +#include +#include #include #include diff --git a/test/helper/devices/TestTransferTimeout.h b/test/helper/devices/TestTransferTimeout.h index 8e4355a4..fbadb44f 100644 --- a/test/helper/devices/TestTransferTimeout.h +++ b/test/helper/devices/TestTransferTimeout.h @@ -9,13 +9,16 @@ #ifndef FAIR_MQ_TEST_TRANSFERTIMEOUT_H #define FAIR_MQ_TEST_TRANSFERTIMEOUT_H -#include -#include +#include + +#include namespace fair::mq::test { -class TransferTimeout : public FairMQDevice +using namespace fair::mq; + +class TransferTimeout : public Device { protected: auto Run() -> void override @@ -38,8 +41,8 @@ class TransferTimeout : public FairMQDevice bool send2PartsCancelingAfter0ms = false; bool receive2PartsCancelingAfter0ms = false; - FairMQMessagePtr msg1(NewMessage()); - FairMQMessagePtr msg2(NewMessage()); + MessagePtr msg1(NewMessage()); + MessagePtr msg2(NewMessage()); if (Send(msg1, "data-out", 0, 200) == static_cast(TransferCode::timeout)) { LOG(info) << "send msg canceled (200ms)"; @@ -69,9 +72,9 @@ class TransferTimeout : public FairMQDevice LOG(error) << "receive msg did not cancel (0ms)"; } - FairMQParts parts1; + Parts parts1; parts1.AddPart(NewMessage(10)); - FairMQParts parts2; + Parts parts2; if (Send(parts1, "data-out", 0, 200) == static_cast(TransferCode::timeout)) { LOG(info) << "send 1 part canceled (200ms)"; @@ -101,10 +104,10 @@ class TransferTimeout : public FairMQDevice LOG(error) << "receive 1 part did not cancel (0ms)"; } - FairMQParts parts3; + Parts parts3; parts3.AddPart(NewMessage(10)); parts3.AddPart(NewMessage(10)); - FairMQParts parts4; + Parts parts4; if (Send(parts3, "data-out", 0, 200) == static_cast(TransferCode::timeout)) { LOG(info) << "send 2 parts canceled (200ms)"; diff --git a/test/helper/devices/TestWaitFor.h b/test/helper/devices/TestWaitFor.h index def2a53f..9f272c39 100644 --- a/test/helper/devices/TestWaitFor.h +++ b/test/helper/devices/TestWaitFor.h @@ -9,8 +9,8 @@ #ifndef FAIR_MQ_TEST_WAITFOR_H #define FAIR_MQ_TEST_WAITFOR_H -#include -#include +#include +#include #include diff --git a/test/memory_resources/_memory_resources.cxx b/test/memory_resources/_memory_resources.cxx index 380c763f..33b95713 100644 --- a/test/memory_resources/_memory_resources.cxx +++ b/test/memory_resources/_memory_resources.cxx @@ -6,7 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include +#include #include #include #include @@ -24,7 +24,7 @@ namespace using namespace std; using namespace fair::mq; -using FactoryType = shared_ptr; +using FactoryType = shared_ptr; struct TestData { @@ -81,8 +81,8 @@ TEST(MemoryResources, transportAllocatorMap) // config.SetProperty("session", to_string(session)); config.SetProperty("session", "default"); - FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config); - FactoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config); + FactoryType factoryZMQ = TransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config); + FactoryType factorySHM = TransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config); auto allocZMQ = factoryZMQ->GetMemoryResource(); auto allocSHM = factorySHM->GetMemoryResource(); @@ -103,7 +103,7 @@ TEST(MemoryResources, allocator) ProgOptions config; config.SetProperty("session", to_string(session)); - FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config); + FactoryType factoryZMQ = TransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config); auto allocZMQ = factoryZMQ->GetMemoryResource(); @@ -131,12 +131,12 @@ TEST(MemoryResources, getMessage) ProgOptions config; config.SetProperty("session", to_string(session)); - FactoryType factoryZMQ = FairMQTransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config); - FactoryType factorySHM = FairMQTransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config); + FactoryType factoryZMQ = TransportFactory::CreateTransportFactory("zeromq", fair::mq::tools::Uuid(), &config); + FactoryType factorySHM = TransportFactory::CreateTransportFactory("shmem", fair::mq::tools::Uuid(), &config); auto allocZMQ = factoryZMQ->GetMemoryResource(); - FairMQMessagePtr message{nullptr}; + MessagePtr message{nullptr}; int* messageArray{nullptr}; diff --git a/test/message/_message.cxx b/test/message/_message.cxx index 08684a37..e82df544 100644 --- a/test/message/_message.cxx +++ b/test/message/_message.cxx @@ -6,7 +6,6 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include #include #include #include @@ -15,6 +14,8 @@ #include #include +#include + #include #include @@ -282,8 +283,8 @@ auto ZeroCopyFromUnmanaged(string const& address) -> void }); { - FairMQChannel push("Push", "push", factory1); - FairMQChannel pull("Pull", "pull", factory2); + Channel push("Push", "push", factory1); + Channel pull("Pull", "pull", factory2); push.Bind(address); pull.Connect(address); diff --git a/test/parts/_iterator_interface.cxx b/test/parts/_iterator_interface.cxx index 3982ba0f..45335c63 100644 --- a/test/parts/_iterator_interface.cxx +++ b/test/parts/_iterator_interface.cxx @@ -7,8 +7,8 @@ ********************************************************************************/ #include -#include -#include +#include +#include #include #include #include @@ -21,11 +21,11 @@ using namespace std; class RandomAccessIterator : public ::testing::Test { protected: - FairMQParts mParts; - shared_ptr mFactory; + fair::mq::Parts mParts; + shared_ptr mFactory; RandomAccessIterator() - : mFactory(FairMQTransportFactory::CreateTransportFactory("zeromq")) + : mFactory(fair::mq::TransportFactory::CreateTransportFactory("zeromq")) { mParts.AddPart(mFactory->NewSimpleMessage("1")); mParts.AddPart(mFactory->NewSimpleMessage("2")); @@ -62,7 +62,7 @@ TEST_F(RandomAccessIterator, RangeForLoopMutation) TEST_F(RandomAccessIterator, ForEachConst) { stringstream out; - for_each(mParts.cbegin(), mParts.cend(), [&out](const FairMQMessagePtr& part) { + for_each(mParts.cbegin(), mParts.cend(), [&out](const fair::mq::MessagePtr& part) { out << string{static_cast(part->GetData()), part->GetSize()}; }); diff --git a/test/plugin_services/Fixture.h b/test/plugin_services/Fixture.h index ca17f000..f994cadd 100644 --- a/test/plugin_services/Fixture.h +++ b/test/plugin_services/Fixture.h @@ -9,17 +9,19 @@ #ifndef FAIR_MQ_TEST_FIXTURE #define FAIR_MQ_TEST_FIXTURE -#include #include -#include +#include #include + +#include + #include #include namespace fair::mq::test { -inline auto control(FairMQDevice& device) -> void +inline auto control(fair::mq::Device& device) -> void { device.ChangeState(fair::mq::Transition::InitDevice); device.WaitForState(fair::mq::State::InitializingDevice); @@ -42,7 +44,7 @@ struct PluginServices : ::testing::Test { , mServices(mConfig, mDevice) , fRunStateMachineThread() { - fRunStateMachineThread = std::thread(&FairMQDevice::RunStateMachine, &mDevice); + fRunStateMachineThread = std::thread(&fair::mq::Device::RunStateMachine, &mDevice); mDevice.SetTransport("zeromq"); } @@ -55,7 +57,7 @@ struct PluginServices : ::testing::Test { } fair::mq::ProgOptions mConfig; - FairMQDevice mDevice; + fair::mq::Device mDevice; fair::mq::PluginServices mServices; std::thread fRunStateMachineThread; }; diff --git a/test/plugins/_plugin.cxx b/test/plugins/_plugin.cxx index 9c6664b9..2218305e 100644 --- a/test/plugins/_plugin.cxx +++ b/test/plugins/_plugin.cxx @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include #include #include @@ -24,28 +24,28 @@ namespace _plugin using namespace std; using namespace fair::mq; -auto control(FairMQDevice& device) -> void +auto control(Device& device) -> void { device.SetTransport("zeromq"); - device.ChangeState(fair::mq::Transition::InitDevice); - device.WaitForState(fair::mq::State::InitializingDevice); - device.ChangeState(fair::mq::Transition::CompleteInit); - device.WaitForState(fair::mq::State::Initialized); - device.ChangeState(fair::mq::Transition::Bind); - device.WaitForState(fair::mq::State::Bound); - device.ChangeState(fair::mq::Transition::Connect); - device.WaitForState(fair::mq::State::DeviceReady); - device.ChangeState(fair::mq::Transition::ResetDevice); - device.WaitForState(fair::mq::State::Idle); + device.ChangeState(Transition::InitDevice); + device.WaitForState(State::InitializingDevice); + device.ChangeState(Transition::CompleteInit); + device.WaitForState(State::Initialized); + device.ChangeState(Transition::Bind); + device.WaitForState(State::Bound); + device.ChangeState(Transition::Connect); + device.WaitForState(State::DeviceReady); + device.ChangeState(Transition::ResetDevice); + device.WaitForState(State::Idle); - device.ChangeState(fair::mq::Transition::End); + device.ChangeState(Transition::End); } TEST(Plugin, Operators) { - fair::mq::ProgOptions config; - FairMQDevice device; + ProgOptions config; + Device device; PluginServices services{config, device}; Plugin p1{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; Plugin p2{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; @@ -61,8 +61,8 @@ TEST(Plugin, Operators) TEST(Plugin, OstreamOperators) { - fair::mq::ProgOptions config; - FairMQDevice device; + ProgOptions config; + Device device; PluginServices services{config, device}; Plugin p1{"dds", {1, 0, 0}, "Foo Bar ", "https://git.test.net/dds.git", &services}; stringstream ss; @@ -77,9 +77,9 @@ TEST(Plugin, OstreamOperators) TEST(PluginVersion, Operators) { - struct fair::mq::tools::Version v1{1, 0, 0}; - struct fair::mq::tools::Version v2{1, 0, 0}; - struct fair::mq::tools::Version v3{1, 2, 0}; + struct tools::Version v1{1, 0, 0}; + struct tools::Version v2{1, 0, 0}; + struct tools::Version v3{1, 2, 0}; EXPECT_EQ(v1, v2); EXPECT_NE(v1, v3); EXPECT_GT(v3, v2); diff --git a/test/plugins/_plugin_manager.cxx b/test/plugins/_plugin_manager.cxx index 89e7bc1b..0916e6ca 100644 --- a/test/plugins/_plugin_manager.cxx +++ b/test/plugins/_plugin_manager.cxx @@ -9,9 +9,9 @@ #include #include #include -#include +#include #include -#include +#include #include #include #include @@ -25,28 +25,28 @@ using namespace boost::filesystem; using namespace boost::program_options; using namespace std; -auto control(FairMQDevice& device) -> void +auto control(Device& device) -> void { device.SetTransport("zeromq"); - device.ChangeState(fair::mq::Transition::InitDevice); - device.WaitForState(fair::mq::State::InitializingDevice); - device.ChangeState(fair::mq::Transition::CompleteInit); - device.WaitForState(fair::mq::State::Initialized); - device.ChangeState(fair::mq::Transition::Bind); - device.WaitForState(fair::mq::State::Bound); - device.ChangeState(fair::mq::Transition::Connect); - device.WaitForState(fair::mq::State::DeviceReady); - device.ChangeState(fair::mq::Transition::ResetDevice); - device.WaitForState(fair::mq::State::Idle); + device.ChangeState(Transition::InitDevice); + device.WaitForState(State::InitializingDevice); + device.ChangeState(Transition::CompleteInit); + device.WaitForState(State::Initialized); + device.ChangeState(Transition::Bind); + device.WaitForState(State::Bound); + device.ChangeState(Transition::Connect); + device.WaitForState(State::DeviceReady); + device.ChangeState(Transition::ResetDevice); + device.WaitForState(State::Idle); - device.ChangeState(fair::mq::Transition::End); + device.ChangeState(Transition::End); } TEST(PluginManager, LoadPluginDynamic) { - fair::mq::ProgOptions config; - FairMQDevice device; + ProgOptions config; + Device device; PluginManager mgr; mgr.EmplacePluginServices(config, device); @@ -77,13 +77,13 @@ TEST(PluginManager, LoadPluginDynamic) TEST(PluginManager, LoadPluginStatic) { - FairMQDevice device; + Device device; PluginManager mgr; device.SetTransport("zeromq"); ASSERT_NO_THROW(mgr.LoadPlugin("s:control")); - fair::mq::ProgOptions config; + ProgOptions config; config.SetProperty("control", "static"); config.SetProperty("catch-signals", 0); mgr.EmplacePluginServices(config, device); diff --git a/test/region/_region.cxx b/test/region/_region.cxx index b2a63ee3..21a41c6c 100644 --- a/test/region/_region.cxx +++ b/test/region/_region.cxx @@ -6,13 +6,14 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include +#include #include #include #include #include +#include + #include #include // make_unique @@ -22,31 +23,32 @@ namespace { using namespace std; +using namespace fair::mq; void RegionsCache(const string& transport, const string& address) { - size_t session1 = fair::mq::tools::UuidHash(); - size_t session2 = fair::mq::tools::UuidHash(); + size_t session1 = tools::UuidHash(); + size_t session2 = tools::UuidHash(); - fair::mq::ProgOptions config1; - fair::mq::ProgOptions config2; + ProgOptions config1; + ProgOptions config2; config1.SetProperty("session", to_string(session1)); config2.SetProperty("session", to_string(session2)); - auto factory1 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config1); - auto factory2 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config2); + auto factory1 = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config1); + auto factory2 = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config2); auto region1 = factory1->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {}); auto region2 = factory2->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {}); void* r1ptr = region1->GetData(); void* r2ptr = region2->GetData(); - FairMQChannel push1("Push1", "push", factory1); - FairMQChannel pull1("Pull1", "pull", factory1); + Channel push1("Push1", "push", factory1); + Channel pull1("Pull1", "pull", factory1); push1.Bind(address + to_string(1)); pull1.Connect(address + to_string(1)); - FairMQChannel push2("Push2", "push", factory2); - FairMQChannel pull2("Pull2", "pull", factory2); + Channel push2("Push2", "push", factory2); + Channel pull2("Pull2", "pull", factory2); push2.Bind(address + to_string(2)); pull2.Connect(address + to_string(2)); @@ -56,27 +58,27 @@ void RegionsCache(const string& transport, const string& address) static_cast(r2ptr)[0] = 99; // c static_cast(static_cast(r2ptr) + 100)[0] = 100; // d - FairMQMessagePtr m1(push1.NewMessage(region1, r1ptr, 100, nullptr)); - FairMQMessagePtr m2(push1.NewMessage(region1, static_cast(r1ptr) + 100, 100, nullptr)); + MessagePtr m1(push1.NewMessage(region1, r1ptr, 100, nullptr)); + MessagePtr m2(push1.NewMessage(region1, static_cast(r1ptr) + 100, 100, nullptr)); push1.Send(m1); push1.Send(m2); - FairMQMessagePtr m3(push2.NewMessage(region2, r2ptr, 100, nullptr)); - FairMQMessagePtr m4(push2.NewMessage(region2, static_cast(r2ptr) + 100, 100, nullptr)); + MessagePtr m3(push2.NewMessage(region2, r2ptr, 100, nullptr)); + MessagePtr m4(push2.NewMessage(region2, static_cast(r2ptr) + 100, 100, nullptr)); push2.Send(m3); push2.Send(m4); } { - FairMQMessagePtr m1(pull1.NewMessage()); - FairMQMessagePtr m2(pull1.NewMessage()); + MessagePtr m1(pull1.NewMessage()); + MessagePtr m2(pull1.NewMessage()); ASSERT_EQ(pull1.Receive(m1), 100); ASSERT_EQ(pull1.Receive(m2), 100); ASSERT_EQ(static_cast(m1->GetData())[0], 'a'); ASSERT_EQ(static_cast(m2->GetData())[0], 'b'); - FairMQMessagePtr m3(pull2.NewMessage()); - FairMQMessagePtr m4(pull2.NewMessage()); + MessagePtr m3(pull2.NewMessage()); + MessagePtr m4(pull2.NewMessage()); ASSERT_EQ(pull2.Receive(m3), 100); ASSERT_EQ(pull2.Receive(m4), 100); ASSERT_EQ(static_cast(m3->GetData())[0], 'c'); @@ -86,17 +88,17 @@ void RegionsCache(const string& transport, const string& address) void RegionEventSubscriptions(const string& transport) { - size_t session{fair::mq::tools::UuidHash()}; + size_t session{tools::UuidHash()}; - fair::mq::ProgOptions config; + ProgOptions config; config.SetProperty("session", to_string(session)); - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); + auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); constexpr int size1 = 1000000; constexpr int size2 = 5000000; constexpr int64_t userFlags = 12345; - fair::mq::tools::Semaphore blocker; + tools::Semaphore blocker; { auto region1 = factory->CreateUnmanagedRegion(size1, [](void*, size_t, void*) {}); @@ -110,14 +112,14 @@ void RegionEventSubscriptions(const string& transport) ASSERT_EQ(region2->GetSize(), size2); ASSERT_EQ(factory->SubscribedToRegionEvents(), false); - factory->SubscribeToRegionEvents([&, id1, id2, ptr1, ptr2](FairMQRegionInfo info) { + factory->SubscribeToRegionEvents([&, id1, id2, ptr1, ptr2](RegionInfo info) { LOG(info) << ">>> " << info.event << ": " << (info.managed ? "managed" : "unmanaged") << ", id: " << info.id << ", ptr: " << info.ptr << ", size: " << info.size << ", flags: " << info.flags; - if (info.event == FairMQRegionEvent::created) { + if (info.event == RegionEvent::created) { if (info.id == id1) { ASSERT_EQ(info.size, size1); ASSERT_EQ(info.ptr, ptr1); @@ -128,7 +130,7 @@ void RegionEventSubscriptions(const string& transport) ASSERT_EQ(info.flags, userFlags); blocker.Signal(); } - } else if (info.event == FairMQRegionEvent::destroyed) { + } else if (info.event == RegionEvent::destroyed) { if (info.id == id1) { blocker.Signal(); } else if (info.id == id2) { @@ -157,22 +159,22 @@ void RegionEventSubscriptions(const string& transport) void RegionCallbacks(const string& transport, const string& _address) { - size_t session(fair::mq::tools::UuidHash()); - std::string address(fair::mq::tools::ToString(_address, "_", transport)); + size_t session(tools::UuidHash()); + std::string address(tools::ToString(_address, "_", transport)); - fair::mq::ProgOptions config; + ProgOptions config; config.SetProperty("session", to_string(session)); - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); + auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); unique_ptr intPtr1 = make_unique(42); unique_ptr intPtr2 = make_unique(43); - fair::mq::tools::Semaphore blocker; + tools::Semaphore blocker; - FairMQChannel push("Push", "push", factory); + Channel push("Push", "push", factory); push.Bind(address); - FairMQChannel pull("Pull", "pull", factory); + Channel pull("Pull", "pull", factory); pull.Connect(address); void* ptr1 = nullptr; @@ -189,7 +191,7 @@ void RegionCallbacks(const string& transport, const string& _address) }); ptr1 = region1->GetData(); - auto region2 = factory->CreateUnmanagedRegion(3000000, [&](const std::vector& blocks) { + auto region2 = factory->CreateUnmanagedRegion(3000000, [&](const std::vector& blocks) { ASSERT_EQ(blocks.size(), 1); ASSERT_EQ(blocks.at(0).ptr, ptr2); ASSERT_EQ(blocks.at(0).size, size2); @@ -200,15 +202,15 @@ void RegionCallbacks(const string& transport, const string& _address) ptr2 = region2->GetData(); { - FairMQMessagePtr msg1out(push.NewMessage(region1, ptr1, size1, intPtr1.get())); - FairMQMessagePtr msg2out(push.NewMessage(region2, ptr2, size2, intPtr2.get())); + MessagePtr msg1out(push.NewMessage(region1, ptr1, size1, intPtr1.get())); + MessagePtr msg2out(push.NewMessage(region2, ptr2, size2, intPtr2.get())); ASSERT_EQ(push.Send(msg1out), size1); ASSERT_EQ(push.Send(msg2out), size2); } { - FairMQMessagePtr msg1in(pull.NewMessage()); - FairMQMessagePtr msg2in(pull.NewMessage()); + MessagePtr msg1in(pull.NewMessage()); + MessagePtr msg2in(pull.NewMessage()); ASSERT_EQ(pull.Receive(msg1in), size1); ASSERT_EQ(pull.Receive(msg2in), size2); } diff --git a/test/state_machine/_state_machine.cxx b/test/state_machine/_state_machine.cxx index f5b011bb..eac0d149 100644 --- a/test/state_machine/_state_machine.cxx +++ b/test/state_machine/_state_machine.cxx @@ -8,7 +8,7 @@ #include #include -#include +#include #include #include diff --git a/test/transport/_options.cxx b/test/transport/_options.cxx index 01aef8d0..d4ab5e50 100644 --- a/test/transport/_options.cxx +++ b/test/transport/_options.cxx @@ -39,7 +39,7 @@ void CheckOldOptionInterface(Channel& channel, const string& option) void RunOptionsTest(const string& transport) { - fair::mq::ProgOptions config; + ProgOptions config; config.SetProperty("session", tools::Uuid()); auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); Channel channel("Push", "push", factory); @@ -85,15 +85,15 @@ void ZeroingAndMlock(const string& transport) void ZeroingAndMlockOnCreation(const string& transport) { - size_t session{fair::mq::tools::UuidHash()}; + size_t session{tools::UuidHash()}; - fair::mq::ProgOptions config; + ProgOptions config; config.SetProperty("session", to_string(session)); config.SetProperty("shm-segment-size", 16384); // NOLINT config.SetProperty("shm-mlock-segment-on-creation", true); config.SetProperty("shm-zero-segment-on-creation", true); - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); + auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); constexpr size_t size{10000}; auto outMsg(factory->CreateMessage(size)); diff --git a/test/transport/_shmem.cxx b/test/transport/_shmem.cxx index fdfa4cfc..79b14033 100644 --- a/test/transport/_shmem.cxx +++ b/test/transport/_shmem.cxx @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include @@ -29,7 +29,7 @@ void GetFreeMemory() ASSERT_THROW(shmem::Monitor::GetFreeMemory(shmem::SessionId{sessionId}, 0), shmem::Monitor::MonitorError); - auto factory = FairMQTransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config); + auto factory = TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config); ASSERT_NO_THROW(shmem::Monitor::GetFreeMemory(shmem::SessionId{sessionId}, 0)); ASSERT_THROW(shmem::Monitor::GetFreeMemory(shmem::SessionId{sessionId}, 1), shmem::Monitor::MonitorError); diff --git a/test/transport/_transfer_timeout.cxx b/test/transport/_transfer_timeout.cxx index 5a596860..d997dffe 100644 --- a/test/transport/_transfer_timeout.cxx +++ b/test/transport/_transfer_timeout.cxx @@ -7,13 +7,16 @@ ********************************************************************************/ #include "runner.h" -#include -#include -#include + +#include +#include #include #include #include #include + +#include + #include #include @@ -24,10 +27,11 @@ namespace { using namespace std; +using namespace fair::mq; using namespace fair::mq::test; using namespace fair::mq::tools; -void delayedInterruptor(FairMQTransportFactory& transport) +void delayedInterruptor(TransportFactory& transport) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); transport.Interrupt(); @@ -35,7 +39,7 @@ void delayedInterruptor(FairMQTransportFactory& transport) auto RunTransferTimeout(string transport) -> void { - size_t session{fair::mq::tools::UuidHash()}; + size_t session{UuidHash()}; stringstream cmd; cmd << runTestDevice << " --id transfer_timeout_" << transport << " --control static --transport " << transport << " --session " << session << " --color false --mq-config \"" << mqConfig << "\""; @@ -48,18 +52,18 @@ auto RunTransferTimeout(string transport) -> void void InterruptTransfer(const string& transport, const string& _address) { - size_t session{fair::mq::tools::UuidHash()}; - std::string address(fair::mq::tools::ToString(_address, "_", transport)); + size_t session{UuidHash()}; + std::string address(ToString(_address, "_", transport)); fair::mq::ProgOptions config; config.SetProperty("session", to_string(session)); - auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); + auto factory = TransportFactory::CreateTransportFactory(transport, Uuid(), &config); - FairMQChannel pull{"Pull", "pull", factory}; + Channel pull{"Pull", "pull", factory}; pull.Bind(address); - FairMQMessagePtr msg(pull.NewMessage()); + MessagePtr msg(pull.NewMessage()); auto t = thread(delayedInterruptor, ref(*factory));