diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx index e03e5475..d73527d6 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx @@ -39,11 +39,11 @@ void FairMQExample1Sampler::Run() string* text = new string(fText); - unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); LOG(INFO) << "Sending \"" << fText << "\""; - fChannels.at("data-out").at(0).Send(msg); + Send(msg, "data"); } } diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx b/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx index 551af399..24783c46 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx @@ -25,9 +25,9 @@ void FairMQExample1Sink::Run() { while (CheckCurrentState(RUNNING)) { - unique_ptr msg(fTransportFactory->CreateMessage()); + unique_ptr msg(NewMessage()); - if (fChannels.at("data-in").at(0).Receive(msg) >= 0) + if (Receive(msg, "data") >= 0) { LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) diff --git a/examples/MQ/1-sampler-sink/ex1-sampler-sink.json b/examples/MQ/1-sampler-sink/ex1-sampler-sink.json index f41683a7..66e0903b 100644 --- a/examples/MQ/1-sampler-sink/ex1-sampler-sink.json +++ b/examples/MQ/1-sampler-sink/ex1-sampler-sink.json @@ -6,7 +6,7 @@ "id": "sampler1", "channel": { - "name": "data-out", + "name": "data", "socket": { "type": "push", @@ -24,7 +24,7 @@ "id": "sink1", "channel": { - "name": "data-in", + "name": "data", "socket": { "type": "pull", diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.cxx b/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.cxx index b539d514..0431bee9 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.cxx +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.cxx @@ -36,11 +36,11 @@ void FairMQExample2Processor::Run() while (CheckCurrentState(RUNNING)) { // Create empty message to hold the input - unique_ptr input(fTransportFactory->CreateMessage()); + unique_ptr input(NewMessage()); // Receive the message (blocks until received or interrupted (e.g. by state change)). // Returns size of the received message or -1 if interrupted. - if (fChannels.at("data-in").at(0).Receive(input) >= 0) + if (Receive(input, "data1") >= 0) { LOG(INFO) << "Received data, processing..."; @@ -49,10 +49,10 @@ void FairMQExample2Processor::Run() *text += " (modified by " + fId + ")"; // Create output message - unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); // Send out the output message - fChannels.at("data-out").at(0).Send(msg); + Send(msg, "data2"); } } } diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx index 40f90be6..85a9a156 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx @@ -39,11 +39,11 @@ void FairMQExample2Sampler::Run() string* text = new string(fText); - unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); LOG(INFO) << "Sending \"" << fText << "\""; - fChannels.at("data-out").at(0).Send(msg); + Send(msg, "data1"); } } diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx index cb4dbf52..bfc3a8d2 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx @@ -28,9 +28,9 @@ void FairMQExample2Sink::Run() { while (CheckCurrentState(RUNNING)) { - unique_ptr msg(fTransportFactory->CreateMessage()); + unique_ptr msg(NewMessage()); - if (fChannels.at("data-in").at(0).Receive(msg) >= 0) + if (Receive(msg, "data2") >= 0) { LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) diff --git a/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json b/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json index 216b9f09..1c096fcd 100644 --- a/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json +++ b/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json @@ -6,7 +6,7 @@ "id": "sampler1", "channel": { - "name": "data-out", + "name": "data1", "socket": { "type": "push", @@ -24,7 +24,7 @@ "id": "processor1", "channel": { - "name": "data-in", + "name": "data1", "socket": { "type": "pull", @@ -37,7 +37,7 @@ }, "channel": { - "name": "data-out", + "name": "data2", "socket": { "type": "push", @@ -55,7 +55,7 @@ "id": "processor2", "channel": { - "name": "data-in", + "name": "data1", "socket": { "type": "pull", @@ -68,7 +68,7 @@ }, "channel": { - "name": "data-out", + "name": "data2", "socket": { "type": "push", @@ -86,7 +86,7 @@ "id": "sink1", "channel": { - "name": "data-in", + "name": "data2", "socket": { "type": "pull", diff --git a/examples/MQ/3-dds/FairMQExample3Processor.cxx b/examples/MQ/3-dds/FairMQExample3Processor.cxx index bef57897..f08301fc 100644 --- a/examples/MQ/3-dds/FairMQExample3Processor.cxx +++ b/examples/MQ/3-dds/FairMQExample3Processor.cxx @@ -34,11 +34,11 @@ void FairMQExample3Processor::Run() while (CheckCurrentState(RUNNING)) { // Create empty message to hold the input - unique_ptr input(fTransportFactory->CreateMessage()); + unique_ptr input(NewMessage()); // Receive the message (blocks until received or interrupted (e.g. by state change)). // Returns size of the received message or -1 if interrupted. - if (fChannels.at("data-in").at(0).Receive(input) >= 0) + if (Receive(input, "data1") >= 0) { LOG(INFO) << "Received data, processing..."; @@ -47,10 +47,10 @@ void FairMQExample3Processor::Run() *text += " (modified by " + fId + ")"; // Create output message - unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); // Send out the output message - fChannels.at("data-out").at(0).Send(msg); + Send(msg, "data2"); } } } diff --git a/examples/MQ/3-dds/FairMQExample3Sampler.cxx b/examples/MQ/3-dds/FairMQExample3Sampler.cxx index b136b998..ea6aa112 100644 --- a/examples/MQ/3-dds/FairMQExample3Sampler.cxx +++ b/examples/MQ/3-dds/FairMQExample3Sampler.cxx @@ -37,11 +37,11 @@ void FairMQExample3Sampler::Run() string* text = new string("Data"); - unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); LOG(INFO) << "Sending \"Data\""; - fChannels.at("data-out").at(0).Send(msg); + Send(msg, "data1"); } } diff --git a/examples/MQ/3-dds/FairMQExample3Sink.cxx b/examples/MQ/3-dds/FairMQExample3Sink.cxx index c51def8a..322fe58a 100644 --- a/examples/MQ/3-dds/FairMQExample3Sink.cxx +++ b/examples/MQ/3-dds/FairMQExample3Sink.cxx @@ -28,9 +28,9 @@ void FairMQExample3Sink::Run() { while (CheckCurrentState(RUNNING)) { - unique_ptr msg(fTransportFactory->CreateMessage()); + unique_ptr msg(NewMessage()); - if (fChannels.at("data-in").at(0).Receive(msg) >= 0) + if (Receive(msg, "data2") >= 0) { LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) diff --git a/examples/MQ/3-dds/ex3-dds-hosts.cfg b/examples/MQ/3-dds/ex3-dds-hosts.cfg index 6e18da81..2779f81e 100644 --- a/examples/MQ/3-dds/ex3-dds-hosts.cfg +++ b/examples/MQ/3-dds/ex3-dds-hosts.cfg @@ -3,6 +3,6 @@ echo "DBG: SSH ENV Script" #source setup.sh @bash_end@ -sampler, username@localhost, , /tmp/, 1 -processor, username@localhost, , /tmp/, 10 -sink, username@localhost, , /tmp/, 1 +sampler, orybalch@localhost, , /tmp/, 1 +processor, orybalch@localhost, , /tmp/, 10 +sink, orybalch@localhost, , /tmp/, 1 diff --git a/examples/MQ/3-dds/runExample3Processor.cxx b/examples/MQ/3-dds/runExample3Processor.cxx index 16e771b0..e7b8e3bd 100644 --- a/examples/MQ/3-dds/runExample3Processor.cxx +++ b/examples/MQ/3-dds/runExample3Processor.cxx @@ -55,12 +55,12 @@ int main(int argc, char** argv) // configure data output channel FairMQChannel dataInChannel("pull", "connect", ""); dataInChannel.UpdateRateLogging(0); - processor.fChannels["data-in"].push_back(dataInChannel); + processor.fChannels["data1"].push_back(dataInChannel); // configure data output channel FairMQChannel dataOutChannel("push", "connect", ""); dataOutChannel.UpdateRateLogging(0); - processor.fChannels["data-out"].push_back(dataOutChannel); + processor.fChannels["data2"].push_back(dataOutChannel); // Waiting for DDS properties dds::key_value::CKeyValue ddsKeyValue; @@ -97,8 +97,8 @@ int main(int argc, char** argv) } } - processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second); - processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second); + processor.fChannels.at("data1").at(0).UpdateAddress(samplerValues.begin()->second); + processor.fChannels.at("data2").at(0).UpdateAddress(sinkValues.begin()->second); processor.ChangeState("INIT_DEVICE"); processor.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/3-dds/runExample3Sampler.cxx b/examples/MQ/3-dds/runExample3Sampler.cxx index cff16797..bd474b8b 100644 --- a/examples/MQ/3-dds/runExample3Sampler.cxx +++ b/examples/MQ/3-dds/runExample3Sampler.cxx @@ -64,7 +64,7 @@ int main(int argc, char** argv) // configure data output channel FairMQChannel dataOutChannel("push", "bind", ""); dataOutChannel.UpdateRateLogging(0); - sampler.fChannels["data-out"].push_back(dataOutChannel); + sampler.fChannels["data1"].push_back(dataOutChannel); // Get the IP of the current host and store it for binding. map IPs; @@ -85,7 +85,7 @@ int main(int argc, char** argv) // Configure the found host IP for the channel. // TCP port will be chosen randomly during the initialization (binding). - sampler.fChannels.at("data-out").at(0).UpdateAddress(initialOutputAddress); + sampler.fChannels.at("data1").at(0).UpdateAddress(initialOutputAddress); sampler.ChangeState("INIT_DEVICE"); sampler.WaitForInitialValidation(); @@ -93,7 +93,7 @@ int main(int argc, char** argv) // Advertise the bound addresses via DDS property LOG(INFO) << "Giving sampler output address to DDS."; dds::key_value::CKeyValue ddsKeyValue; - ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data-out").at(0).GetAddress()); + ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data1").at(0).GetAddress()); sampler.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/3-dds/runExample3Sink.cxx b/examples/MQ/3-dds/runExample3Sink.cxx index 995a9536..7f424761 100644 --- a/examples/MQ/3-dds/runExample3Sink.cxx +++ b/examples/MQ/3-dds/runExample3Sink.cxx @@ -64,7 +64,7 @@ int main(int argc, char** argv) // configure data output channel FairMQChannel dataInChannel("pull", "bind", ""); dataInChannel.UpdateRateLogging(0); - sink.fChannels["data-in"].push_back(dataInChannel); + sink.fChannels["data2"].push_back(dataInChannel); // Get the IP of the current host and store it for binding. map IPs; @@ -85,7 +85,7 @@ int main(int argc, char** argv) // Configure the found host IP for the channel. // TCP port will be chosen randomly during the initialization (binding). - sink.fChannels.at("data-in").at(0).UpdateAddress(initialInputAddress); + sink.fChannels.at("data2").at(0).UpdateAddress(initialInputAddress); sink.ChangeState("INIT_DEVICE"); sink.WaitForInitialValidation(); @@ -93,7 +93,7 @@ int main(int argc, char** argv) // Advertise the bound address via DDS property LOG(INFO) << "Giving sink input address to DDS."; dds::key_value::CKeyValue ddsKeyValue; - ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data-in").at(0).GetAddress()); + ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data2").at(0).GetAddress()); sink.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/4-copypush/FairMQExample4Sampler.cxx b/examples/MQ/4-copypush/FairMQExample4Sampler.cxx index 8bcad499..7632e5bf 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sampler.cxx +++ b/examples/MQ/4-copypush/FairMQExample4Sampler.cxx @@ -34,23 +34,23 @@ void FairMQExample4Sampler::Run() uint64_t* number = new uint64_t(counter); - std::unique_ptr msg(fTransportFactory->CreateMessage(number, sizeof(uint64_t))); + std::unique_ptr msg(NewMessage(number, sizeof(uint64_t))); LOG(INFO) << "Sending \"" << counter << "\""; - if (fChannels.at("data-out").size() > 1) + if (fChannels.at("data").size() > 1) { - for (int i = 1; i < fChannels.at("data-out").size(); ++i) + for (int i = 1; i < fChannels.at("data").size(); ++i) { - std::unique_ptr msgCopy(fTransportFactory->CreateMessage()); + std::unique_ptr msgCopy(NewMessage()); msgCopy->Copy(msg); - fChannels.at("data-out").at(i).Send(msgCopy); + Send(msgCopy, "data", i); } - fChannels.at("data-out").at(0).Send(msg); + Send(msg, "data"); } else { - fChannels.at("data-out").at(0).Send(msg); + Send(msg, "data"); } ++counter; diff --git a/examples/MQ/4-copypush/FairMQExample4Sink.cxx b/examples/MQ/4-copypush/FairMQExample4Sink.cxx index 2444cade..f0704951 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sink.cxx +++ b/examples/MQ/4-copypush/FairMQExample4Sink.cxx @@ -28,9 +28,9 @@ void FairMQExample4Sink::Run() { while (CheckCurrentState(RUNNING)) { - std::unique_ptr msg(fTransportFactory->CreateMessage()); + std::unique_ptr msg(NewMessage()); - if (fChannels.at("data-in").at(0).Receive(msg) >= 0) + if (Receive(msg, "data") >= 0) { LOG(INFO) << "Received message: \"" << *(static_cast(msg->GetData())) << "\""; } diff --git a/examples/MQ/4-copypush/ex4-copypush.json b/examples/MQ/4-copypush/ex4-copypush.json index b5907cdb..8ebc2474 100644 --- a/examples/MQ/4-copypush/ex4-copypush.json +++ b/examples/MQ/4-copypush/ex4-copypush.json @@ -6,7 +6,7 @@ "id": "sampler1", "channel": { - "name": "data-out", + "name": "data", "socket": { "type": "push", @@ -33,7 +33,7 @@ "id": "sink1", "channel": { - "name": "data-in", + "name": "data", "socket": { "type": "pull", @@ -51,7 +51,7 @@ "id": "sink2", "channel": { - "name": "data-in", + "name": "data", "socket": { "type": "pull", diff --git a/examples/MQ/5-req-rep/FairMQExample5Client.cxx b/examples/MQ/5-req-rep/FairMQExample5Client.cxx index 4968c5bc..ac3947d8 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Client.cxx +++ b/examples/MQ/5-req-rep/FairMQExample5Client.cxx @@ -42,14 +42,14 @@ void FairMQExample5Client::Run() string* text = new string(fText); - unique_ptr request(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - unique_ptr reply(fTransportFactory->CreateMessage()); + unique_ptr request(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + unique_ptr reply(NewMessage()); LOG(INFO) << "Sending \"" << fText << "\" to server."; - if (fChannels.at("data").at(0).Send(request) > 0) + if (Send(request, "data") > 0) { - if (fChannels.at("data").at(0).Receive(reply) >= 0) + if (Receive(reply, "data") >= 0) { LOG(INFO) << "Received reply from server: \"" << string(static_cast(reply->GetData()), reply->GetSize()) << "\""; } diff --git a/examples/MQ/5-req-rep/FairMQExample5Server.cxx b/examples/MQ/5-req-rep/FairMQExample5Server.cxx index 72c33c1f..9f5ea76d 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Server.cxx +++ b/examples/MQ/5-req-rep/FairMQExample5Server.cxx @@ -33,9 +33,9 @@ void FairMQExample5Server::Run() { while (CheckCurrentState(RUNNING)) { - unique_ptr request(fTransportFactory->CreateMessage()); + unique_ptr request(NewMessage()); - if (fChannels.at("data").at(0).Receive(request) >= 0) + if (Receive(request, "data") >= 0) { LOG(INFO) << "Received request from client: \"" << string(static_cast(request->GetData()), request->GetSize()) << "\""; @@ -43,9 +43,9 @@ void FairMQExample5Server::Run() LOG(INFO) << "Sending reply to client."; - unique_ptr reply(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + unique_ptr reply(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - fChannels.at("data").at(0).Send(reply); + Send(reply, "data"); } } } diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.cxx b/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.cxx index 5130d8f1..27f026c4 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.cxx +++ b/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.cxx @@ -38,9 +38,9 @@ void FairMQExample6Broadcaster::Run() boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); string* text = new string("OK"); - unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - LOG(INFO) << "Sending \"" << "OK" << "\""; - fChannels.at("broadcast-out").at(0).Send(msg); + unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + LOG(INFO) << "Sending OK"; + Send(msg, "broadcast"); } } diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx index d54b074d..5103bc0e 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx @@ -34,37 +34,37 @@ void FairMQExample6Sampler::CustomCleanup(void *data, void *object) void FairMQExample6Sampler::Run() { - std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels, { "data-out", "broadcast-in" })); + std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels, { "data", "broadcast" })); while (CheckCurrentState(RUNNING)) { poller->Poll(-1); - if (poller->CheckInput("broadcast-in", 0)) + if (poller->CheckInput("broadcast", 0)) { - unique_ptr msg(fTransportFactory->CreateMessage()); + unique_ptr msg(NewMessage()); - if (fChannels.at("broadcast-in").at(0).Receive(msg) > 0) + if (Receive(msg, "broadcast") > 0) { LOG(INFO) << "Received broadcast: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; } - } // if (poller->CheckInput("broadcast-in", 0)) + } - if (poller->CheckOutput("data-out", 0)) + if (poller->CheckOutput("data", 0)) { boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); string* text = new string(fText); - unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); LOG(INFO) << "Sending \"" << fText << "\""; - fChannels.at("data-out").at(0).Send(msg); - } // if (poller->CheckOutput("data-out", 0)) - } // while (CheckCurrentState(RUNNING)) + Send(msg, "data"); + } + } } FairMQExample6Sampler::~FairMQExample6Sampler() diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx b/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx index f28ee077..de757a06 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx @@ -24,33 +24,29 @@ FairMQExample6Sink::FairMQExample6Sink() void FairMQExample6Sink::Run() { - std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels, { "data-in", "broadcast-in" })); + std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels, { "data", "broadcast" })); while (CheckCurrentState(RUNNING)) { poller->Poll(-1); - if (poller->CheckInput("broadcast-in", 0)) + if (poller->CheckInput("broadcast", 0)) { - unique_ptr msg(fTransportFactory->CreateMessage()); + unique_ptr msg(NewMessage()); - if (fChannels.at("broadcast-in").at(0).Receive(msg) > 0) + if (Receive(msg, "broadcast") > 0) { - LOG(INFO) << "Received broadcast: \"" - << string(static_cast(msg->GetData()), msg->GetSize()) - << "\""; + LOG(INFO) << "Received broadcast: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; } } - if (poller->CheckInput("data-in", 0)) + if (poller->CheckInput("data", 0)) { - unique_ptr msg(fTransportFactory->CreateMessage()); + unique_ptr msg(NewMessage()); - if (fChannels.at("data-in").at(0).Receive(msg) > 0) + if (Receive(msg, "data") > 0) { - LOG(INFO) << "Received message: \"" - << string(static_cast(msg->GetData()), msg->GetSize()) - << "\""; + LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; } } } diff --git a/examples/MQ/6-multiple-channels/ex6-multiple-channels.json b/examples/MQ/6-multiple-channels/ex6-multiple-channels.json index 7479f2bd..d59e34da 100644 --- a/examples/MQ/6-multiple-channels/ex6-multiple-channels.json +++ b/examples/MQ/6-multiple-channels/ex6-multiple-channels.json @@ -6,7 +6,7 @@ "id": "sampler1", "channel": { - "name": "data-out", + "name": "data", "socket": { "type": "push", @@ -19,7 +19,7 @@ }, "channel": { - "name": "broadcast-in", + "name": "broadcast", "socket": { "type": "sub", @@ -37,7 +37,7 @@ "id": "sink1", "channel": { - "name": "data-in", + "name": "data", "socket": { "type": "pull", @@ -50,7 +50,7 @@ }, "channel": { - "name": "broadcast-in", + "name": "broadcast", "socket": { "type": "sub", @@ -68,7 +68,7 @@ "id": "broadcaster1", "channel": { - "name": "broadcast-out", + "name": "broadcast", "socket": { "type": "pub", diff --git a/examples/MQ/8-multipart/FairMQExample8Sampler.cxx b/examples/MQ/8-multipart/FairMQExample8Sampler.cxx index 5cc47996..6e4123f7 100644 --- a/examples/MQ/8-multipart/FairMQExample8Sampler.cxx +++ b/examples/MQ/8-multipart/FairMQExample8Sampler.cxx @@ -39,17 +39,14 @@ void FairMQExample8Sampler::Run() // Set stopFlag to 1 for the first 4 messages, and to 0 for the 5th. counter < 5 ? header->stopFlag = 0 : header->stopFlag = 1; - // Create message part with the header. - unique_ptr headerPart(fTransportFactory->CreateMessage(header, sizeof(Ex8Header))); - // Create message part with the body of 1000 bytes size. - unique_ptr dataPart(fTransportFactory->CreateMessage(1000)); + FairMQParts parts; + parts.AddPart(NewMessage(header, sizeof(Ex8Header))); + parts.AddPart(NewMessage(1000)); LOG(INFO) << "Sending header with stopFlag: " << header->stopFlag; + LOG(INFO) << "Sending body of size: " << parts.At(1).GetSize(); - // Schedule the header part for sending. - fChannels.at("data-out").at(0).SendPart(headerPart); - // Add body part (final part). `Send()` will send/queue all parts. - fChannels.at("data-out").at(0).Send(dataPart); + Send(parts, "data-out"); // Go out of the sending loop if the stopFlag was sent. if (counter == 5) diff --git a/examples/MQ/8-multipart/FairMQExample8Sink.cxx b/examples/MQ/8-multipart/FairMQExample8Sink.cxx index 0dc83784..0791eb4a 100644 --- a/examples/MQ/8-multipart/FairMQExample8Sink.cxx +++ b/examples/MQ/8-multipart/FairMQExample8Sink.cxx @@ -32,21 +32,18 @@ void FairMQExample8Sink::Run() { while (CheckCurrentState(RUNNING)) { - unique_ptr headerPart(fTransportFactory->CreateMessage()); - unique_ptr bodyPart(fTransportFactory->CreateMessage()); + FairMQParts parts; - if (fChannels.at("data-in").at(0).Receive(headerPart) >= 0) + if (Receive(parts, "data-in") >= 0) { - if (fChannels.at("data-in").at(0).Receive(bodyPart) >= 0) + Ex8Header header; + header.stopFlag = (static_cast(parts.At(0).GetData()))->stopFlag; + LOG(INFO) << "Received header with stopFlag: " << header.stopFlag; + LOG(INFO) << "Received body of size: " << parts.At(1).GetSize(); + if (header.stopFlag == 1) { - Ex8Header header; - header.stopFlag = (static_cast(headerPart->GetData()))->stopFlag; - LOG(INFO) << "Received header with stopFlag: " << header.stopFlag; - if (header.stopFlag == 1) - { - LOG(INFO) << "Flag is 0, exiting Run()"; - break; - } + LOG(INFO) << "Flag is 0, exiting Run()"; + break; } } }