Don't use to-be-deprecated names

This commit is contained in:
Alexey Rybalchenko
2022-01-14 00:48:34 +01:00
committed by Dennis Klein
parent f15f669853
commit bfd08bb33f
90 changed files with 411 additions and 391 deletions

View File

@@ -30,7 +30,7 @@ struct Sampler : fair::mq::Device
// create message object with a pointer to the data buffer, its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg(NewMessage(
fair::mq::MessagePtr msg(NewMessage(
const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },

View File

@@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";

View File

@@ -20,7 +20,7 @@ struct Processor : fair::mq::Device
OnData("data1", &Processor::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";
@@ -32,7 +32,7 @@ struct Processor : fair::mq::Device
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
fair::mq::MessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));

View File

@@ -28,7 +28,7 @@ struct Sampler : fair::mq::Device
{
// Initializing message with NewStaticMessage will avoid copy
// but won't delete the data after the sending is completed.
FairMQMessagePtr msg(NewStaticMessage(fText));
fair::mq::MessagePtr msg(NewStaticMessage(fText));
LOG(info) << "Sending \"" << fText << "\"";

View File

@@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";

View File

@@ -27,10 +27,10 @@ struct Sampler : fair::mq::Device
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage(fCounter++));
fair::mq::MessagePtr msg(NewSimpleMessage(fCounter++));
for (int i = 0; i < fNumDataChannels - 1; ++i) {
FairMQMessagePtr msgCopy(NewMessage());
fair::mq::MessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
Send(msgCopy, "data", i);
}

View File

@@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received message: \"" << *(static_cast<uint64_t*>(msg->GetData())) << "\"";

View File

@@ -20,7 +20,7 @@ struct Processor : fair::mq::Device
OnData("data1", &Processor::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received data, processing...";
@@ -32,7 +32,7 @@ struct Processor : fair::mq::Device
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
fair::mq::MessagePtr msg2(NewMessage(const_cast<char*>(text->c_str()),
text->length(),
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); },
text));

View File

@@ -24,7 +24,7 @@ struct Sampler : fair::mq::Device
{
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("Data"));
fair::mq::MessagePtr msg(NewSimpleMessage("Data"));
LOG(info) << "Sending \"Data\"";

View File

@@ -25,7 +25,7 @@ struct Sink : fair::mq::Device
fIterations = fConfig->GetValue<uint64_t>("iterations");
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
LOG(info) << "Received: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";

View File

@@ -5,7 +5,7 @@ A topology of two devices - Sampler and Sink, communicating with PUSH-PULL patte
The Sampler sends a multipart message to the Sink, consisting of two message parts - header and body.
Each message part is a regular FairMQMessage. To combine them into a multi-part message use `FairMQParts`. Add messages to `FairMQParts` with `AddPart` method.
Each message part is a regular fair::mq::Message. To combine them into a multi-part message use `fair::mq::Parts`. Add messages to `fair::mq::Parts` with `AddPart` method.
All parts are guaranteed to be delivered together. The Receive call in the sink will recive the entire parts structure.

View File

@@ -35,15 +35,15 @@ struct Sampler : fair::mq::Device
}
LOG(info) << "Sending header with stopFlag: " << header.stopFlag;
FairMQParts parts;
fair::mq::Parts parts;
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
parts.AddPart(NewSimpleMessage(header));
parts.AddPart(NewMessage(1000));
// create more data parts, testing the FairMQParts in-place constructor
FairMQParts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
// create more data parts, testing the fair::mq::Parts in-place constructor
fair::mq::Parts auxData{ NewMessage(500), NewMessage(600), NewMessage(700) };
assert(auxData.Size() == 3);
parts.AddPart(std::move(auxData));
assert(auxData.Size() == 0);

View File

@@ -20,7 +20,7 @@ struct Sink : fair::mq::Device
OnData("data", &Sink::HandleData);
}
bool HandleData(FairMQParts& parts, int)
bool HandleData(fair::mq::Parts& parts, int)
{
LOG(info) << "Received message with " << parts.Size() << " parts";

View File

@@ -5,4 +5,4 @@ This example demonstrates how to work with multiple channels and multiplex betwe
A topology of three devices - **Sampler**, **Sink** and **Broadcaster**. The Sampler sends data to the Sink via the PUSH-PULL pattern. The Broadcaster device sends a message to both Sampler and Sink containing a string "OK" every second. The Broadcaster sends the message via PUB pattern. Both Sampler and Sink, besides doing their PUSH-PULL job, listen via SUB to the Broadcaster.
The multiplexing between their data channels and the broadcast channels happens with `FairMQPoller`.
The multiplexing between their data channels and the broadcast channels happens with `fair::mq::Poller`.

View File

@@ -22,7 +22,7 @@ struct Broadcaster : fair::mq::Device
// NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place).
// Should only be used for small data because of the cost of an additional copy
FairMQMessagePtr msg(NewSimpleMessage("OK"));
fair::mq::MessagePtr msg(NewSimpleMessage("OK"));
LOG(info) << "Sending OK";

View File

@@ -7,7 +7,7 @@
********************************************************************************/
#include <fairmq/Device.h>
#include <FairMQPoller.h>
#include <fairmq/Poller.h>
#include <fairmq/runDevice.h>
#include <chrono>
@@ -26,13 +26,13 @@ struct Sampler : fair::mq::Device
void Run() override
{
FairMQPollerPtr poller(NewPoller("data", "broadcast"));
fair::mq::PollerPtr poller(NewPoller("data", "broadcast"));
while (!NewStatePending()) {
poller->Poll(100);
if (poller->CheckInput("broadcast", 0)) {
FairMQMessagePtr msg(NewMessage());
fair::mq::MessagePtr msg(NewMessage());
if (Receive(msg, "broadcast") > 0) {
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
@@ -42,7 +42,7 @@ struct Sampler : fair::mq::Device
if (poller->CheckOutput("data", 0)) {
std::this_thread::sleep_for(std::chrono::seconds(1));
FairMQMessagePtr msg(NewSimpleMessage(fText));
fair::mq::MessagePtr msg(NewSimpleMessage(fText));
if (Send(msg, "data") > 0) {
LOG(info) << "Sent \"" << fText << "\"";

View File

@@ -27,7 +27,7 @@ struct Sink : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleBroadcast(FairMQMessagePtr& msg, int /*index*/)
bool HandleBroadcast(fair::mq::MessagePtr& msg, int /*index*/)
{
LOG(info) << "Received broadcast: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedBroadcast = true;
@@ -35,7 +35,7 @@ struct Sink : fair::mq::Device
return CheckIterations();
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
bool HandleData(fair::mq::MessagePtr& msg, int /*index*/)
{
LOG(info) << "Received message: \"" << std::string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
fReceivedData = true;

View File

@@ -28,7 +28,7 @@ struct Sampler1 : fair::mq::Device
bool ConditionalRun() override
{
// Creates a message using the transport of channel data1
FairMQMessagePtr msg(NewMessageFor("data1", 0, 1000000));
fair::mq::MessagePtr msg(NewMessageFor("data1", 0, 1000000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).
@@ -54,7 +54,7 @@ struct Sampler1 : fair::mq::Device
uint64_t numAcks = 0;
while (!NewStatePending()) {
FairMQMessagePtr ack(NewMessageFor("ack", 0));
fair::mq::MessagePtr ack(NewMessageFor("ack", 0));
if (Receive(ack, "ack") < 0) {
break;
}

View File

@@ -22,7 +22,7 @@ struct Sampler2 : fair::mq::Device
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessage(1000));
fair::mq::MessagePtr msg(NewMessage(1000));
// in case of error or transfer interruption, return false to go to IDLE state
// successfull transfer will return number of bytes transfered (can be 0 if sending an empty message).

View File

@@ -26,11 +26,11 @@ struct Sink : fair::mq::Device
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData1(FairMQMessagePtr& /*msg*/, int /*index*/)
bool HandleData1(fair::mq::MessagePtr& /*msg*/, int /*index*/)
{
fNumIterations1++;
// Creates a message using the transport of channel ack
FairMQMessagePtr ack(NewMessageFor("ack", 0));
fair::mq::MessagePtr ack(NewMessageFor("ack", 0));
if (Send(ack, "ack") < 0) {
return false;
}
@@ -40,7 +40,7 @@ struct Sink : fair::mq::Device
}
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
bool HandleData2(FairMQMessagePtr& /*msg*/, int /*index*/)
bool HandleData2(fair::mq::MessagePtr& /*msg*/, int /*index*/)
{
fNumIterations2++;
// return true if want to be called again (otherwise go to IDLE state)

View File

@@ -24,7 +24,7 @@ namespace bpo = boost::program_options;
struct TFBuffer
{
FairMQParts parts;
fair::mq::Parts parts;
chrono::steady_clock::time_point start;
chrono::steady_clock::time_point end;
};
@@ -43,7 +43,7 @@ struct Receiver : fair::mq::Device
fMaxTimeframes = GetConfig()->GetValue<int>("max-timeframes");
}
bool HandleData(FairMQParts& parts, int /* index */)
bool HandleData(fair::mq::Parts& parts, int /* index */)
{
Header& h = *(static_cast<Header*>(parts.At(0)->GetData()));
// LOG(info) << "Received sub-time frame #" << h.id << " from Sender" << h.senderIndex;
@@ -107,7 +107,7 @@ void addCustomOptions(bpo::options_description& options)
("max-timeframes", bpo::value<int>()->default_value(0), "Maximum number of timeframes to receive (0 - unlimited)");
}
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /* config */)
{
return std::make_unique<Receiver>();
}

View File

@@ -28,11 +28,11 @@ struct Sender : fair::mq::Device
void Run() override
{
FairMQChannel& dataInChannel = GetChannel("sync", 0);
fair::mq::Channel& dataInChannel = GetChannel("sync", 0);
while (!NewStatePending()) {
Header h;
FairMQMessagePtr id(NewMessage());
fair::mq::MessagePtr id(NewMessage());
if (dataInChannel.Receive(id) > 0) {
h.id = *(static_cast<uint16_t*>(id->GetData()));
h.senderIndex = fIndex;
@@ -40,7 +40,7 @@ struct Sender : fair::mq::Device
continue;
}
FairMQParts parts;
fair::mq::Parts parts;
parts.AddPart(NewSimpleMessage(h));
parts.AddPart(NewMessage(fSubtimeframeSize));
@@ -66,7 +66,7 @@ void addCustomOptions(bpo::options_description& options)
("subtimeframe-size", bpo::value<int>()->default_value(1000), "Subtimeframe size in bytes")
("num-receivers", bpo::value<int>()->required(), "Number of EPNs");
}
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /* config */)
{
return std::make_unique<Sender>();
}

View File

@@ -19,7 +19,7 @@ struct Synchronizer : fair::mq::Device
{
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewSimpleMessage(fTimeframeId));
fair::mq::MessagePtr msg(NewSimpleMessage(fTimeframeId));
if (Send(msg, "sync") > 0) {
if (++fTimeframeId == UINT16_MAX - 1) {
@@ -37,7 +37,7 @@ struct Synchronizer : fair::mq::Device
};
void addCustomOptions(bpo::options_description& /* options */) {}
std::unique_ptr<fair::mq::Device> getDevice(FairMQProgOptions& /* config */)
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /* config */)
{
return std::make_unique<Synchronizer>();
}

View File

@@ -30,10 +30,10 @@ struct QCDispatcher : fair::mq::Device
});
}
bool HandleData(FairMQMessagePtr& msg, int)
bool HandleData(fair::mq::MessagePtr& msg, int)
{
if (fDoQC.load() == true) {
FairMQMessagePtr msgCopy(NewMessage());
fair::mq::MessagePtr msgCopy(NewMessage());
msgCopy->Copy(*msg);
if (Send(msg, "qc") < 0) {
return false;

View File

@@ -9,12 +9,12 @@
#include <fairmq/Device.h>
#include <fairmq/runDevice.h>
class QCTask : public FairMQDevice
class QCTask : public fair::mq::Device
{
public:
QCTask()
{
OnData("qc", [](FairMQMessagePtr& /*msg*/, int) {
OnData("qc", [](fair::mq::MessagePtr& /*msg*/, int) {
LOG(info) << "received data";
return false;
});

View File

@@ -16,7 +16,7 @@ struct Sampler : fair::mq::Device
{
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessage(1000));
fair::mq::MessagePtr msg(NewMessage(1000));
if (Send(msg, "data1") < 0) {
return false;

View File

@@ -14,7 +14,7 @@
struct Sink : fair::mq::Device
{
Sink() { OnData("data2", &Sink::HandleData); }
bool HandleData(FairMQMessagePtr& /*msg*/, int /*index*/) { return true; }
bool HandleData(fair::mq::MessagePtr& /*msg*/, int /*index*/) { return true; }
};
namespace bpo = boost::program_options;

View File

@@ -13,7 +13,7 @@
namespace bpo = boost::program_options;
class Builder : public FairMQDevice
class Builder : public fair::mq::Device
{
public:
Builder() = default;
@@ -24,7 +24,7 @@ class Builder : public FairMQDevice
OnData("rb", &Builder::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
bool HandleData(fair::mq::MessagePtr& msg, int /*index*/)
{
if (Send(msg, fOutputChannelName) < 0) {
return false;

View File

@@ -18,9 +18,9 @@ struct Processor : fair::mq::Device
OnData("bp", &Processor::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
bool HandleData(fair::mq::MessagePtr& msg, int /*index*/)
{
FairMQMessagePtr msg2(NewMessageFor("ps", 0, msg->GetSize()));
fair::mq::MessagePtr msg2(NewMessageFor("ps", 0, msg->GetSize()));
if (Send(msg2, "ps") < 0) {
return false;
}

View File

@@ -22,7 +22,7 @@ struct Readout : fair::mq::Device
fMsgSize = fConfig->GetProperty<int>("msg-size");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("rb",
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("rb",
0,
10000000,
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
@@ -36,7 +36,7 @@ struct Readout : fair::mq::Device
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessageFor("rb", // channel
fair::mq::MessagePtr msg(NewMessageFor("rb", // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
@@ -71,7 +71,7 @@ struct Readout : fair::mq::Device
int fMsgSize = 10000;
uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0;
FairMQUnmanagedRegionPtr fRegion = nullptr;
fair::mq::UnmanagedRegionPtr fRegion = nullptr;
std::atomic<uint64_t> fNumUnackedMsgs = 0;
};

View File

@@ -21,7 +21,7 @@ struct Sender : fair::mq::Device
OnData(fInputChannelName, &Sender::HandleData);
}
bool HandleData(FairMQMessagePtr& msg, int /*index*/)
bool HandleData(fair::mq::MessagePtr& msg, int /*index*/)
{
if (Send(msg, "sr") < 0) {
return false;

View File

@@ -23,7 +23,7 @@ struct Sampler : fair::mq::Device
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
GetChannel("data", 0).Transport()->SubscribeToRegionEvents([](fair::mq::RegionInfo info) {
LOG(info) << "Region event: " << info.event << ": "
<< (info.managed ? "managed" : "unmanaged")
<< ", id: " << info.id
@@ -36,7 +36,7 @@ struct Sampler : fair::mq::Device
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
regionCfg.lock = true; // mlock region after creation
regionCfg.zero = true; // zero region content after creation
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
0, // ... and this sub-channel
10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
@@ -50,7 +50,7 @@ struct Sampler : fair::mq::Device
bool ConditionalRun() override
{
FairMQMessagePtr msg(NewMessageFor("data", // channel
fair::mq::MessagePtr msg(NewMessageFor("data", // channel
0, // sub-channel
fRegion, // region
fRegion->GetData(), // ptr within region
@@ -93,7 +93,7 @@ struct Sampler : fair::mq::Device
uint32_t fLinger = 100;
uint64_t fMaxIterations = 0;
uint64_t fNumIterations = 0;
FairMQUnmanagedRegionPtr fRegion = nullptr;
fair::mq::UnmanagedRegionPtr fRegion = nullptr;
std::mutex fMtx;
uint64_t fNumUnackedMsgs = 0;
};

View File

@@ -32,11 +32,11 @@ struct Client : fair::mq::Device
// its size,
// custom deletion function (called when transfer is done),
// and pointer to the object managing the data buffer
FairMQMessagePtr req(NewMessage(const_cast<char*>(text->c_str()), // data
fair::mq::MessagePtr req(NewMessage(const_cast<char*>(text->c_str()), // data
text->length(), // size
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, // deletion callback
text)); // object that manages the data
FairMQMessagePtr rep(NewMessage());
fair::mq::MessagePtr rep(NewMessage());
LOG(info) << "Sending \"" << fText << "\" to server.";

View File

@@ -26,7 +26,7 @@ struct Server : fair::mq::Device
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
}
bool HandleData(FairMQMessagePtr& req, int)
bool HandleData(fair::mq::MessagePtr& req, int)
{
LOG(info) << "Received request from client: \"" << std::string(static_cast<char*>(req->GetData()), req->GetSize()) << "\"";
@@ -34,7 +34,7 @@ struct Server : fair::mq::Device
LOG(info) << "Sending reply to client.";
FairMQMessagePtr rep(NewMessage(const_cast<char*>(text->c_str()), // data
fair::mq::MessagePtr rep(NewMessage(const_cast<char*>(text->c_str()), // data
text->length(), // size
[](void* /*data*/, void* object) { delete static_cast<std::string*>(object); }, // deletion callback
text)); // object that manages the data