mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
FairMQ: Extend Multipart and messaging API
- Extend the multipart API to allow sending vectors of messages or helper thin wrapper FairMQParts. See example in examples/MQ/8-multipart. - NewMessage() can be used in devices instead of fTransportFactory->CreateMessage(). Possible arguments remain unchanged (no args, size or data+size). - Send()/Receive() methods can be used in devices instead of fChannels.at("chan").at(i).Send()/Receive(): Send(msg, "chan", i = 0), Receive(msg, "chan", i = 0). - Use the new methods in MQ examples and tests. - No breaking changes, but FAIRMQ_INTERFACE_VERSION is incremented to 3 to allow to check for new methods.
This commit is contained in:
committed by
Mohammad Al-Turany
parent
1bb72fea38
commit
bbadf09aad
@@ -34,11 +34,11 @@ void FairMQExample3Processor::Run()
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
// Create empty message to hold the input
|
||||
unique_ptr<FairMQMessage> input(fTransportFactory->CreateMessage());
|
||||
unique_ptr<FairMQMessage> input(NewMessage());
|
||||
|
||||
// Receive the message (blocks until received or interrupted (e.g. by state change)).
|
||||
// Returns size of the received message or -1 if interrupted.
|
||||
if (fChannels.at("data-in").at(0).Receive(input) >= 0)
|
||||
if (Receive(input, "data1") >= 0)
|
||||
{
|
||||
LOG(INFO) << "Received data, processing...";
|
||||
|
||||
@@ -47,10 +47,10 @@ void FairMQExample3Processor::Run()
|
||||
*text += " (modified by " + fId + ")";
|
||||
|
||||
// Create output message
|
||||
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
|
||||
unique_ptr<FairMQMessage> msg(NewMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
|
||||
|
||||
// Send out the output message
|
||||
fChannels.at("data-out").at(0).Send(msg);
|
||||
Send(msg, "data2");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -37,11 +37,11 @@ void FairMQExample3Sampler::Run()
|
||||
|
||||
string* text = new string("Data");
|
||||
|
||||
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
|
||||
unique_ptr<FairMQMessage> msg(NewMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
|
||||
|
||||
LOG(INFO) << "Sending \"Data\"";
|
||||
|
||||
fChannels.at("data-out").at(0).Send(msg);
|
||||
Send(msg, "data1");
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -28,9 +28,9 @@ void FairMQExample3Sink::Run()
|
||||
{
|
||||
while (CheckCurrentState(RUNNING))
|
||||
{
|
||||
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
|
||||
unique_ptr<FairMQMessage> msg(NewMessage());
|
||||
|
||||
if (fChannels.at("data-in").at(0).Receive(msg) >= 0)
|
||||
if (Receive(msg, "data2") >= 0)
|
||||
{
|
||||
LOG(INFO) << "Received message: \""
|
||||
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
|
||||
|
@@ -3,6 +3,6 @@ echo "DBG: SSH ENV Script"
|
||||
#source setup.sh
|
||||
@bash_end@
|
||||
|
||||
sampler, username@localhost, , /tmp/, 1
|
||||
processor, username@localhost, , /tmp/, 10
|
||||
sink, username@localhost, , /tmp/, 1
|
||||
sampler, orybalch@localhost, , /tmp/, 1
|
||||
processor, orybalch@localhost, , /tmp/, 10
|
||||
sink, orybalch@localhost, , /tmp/, 1
|
||||
|
@@ -55,12 +55,12 @@ int main(int argc, char** argv)
|
||||
// configure data output channel
|
||||
FairMQChannel dataInChannel("pull", "connect", "");
|
||||
dataInChannel.UpdateRateLogging(0);
|
||||
processor.fChannels["data-in"].push_back(dataInChannel);
|
||||
processor.fChannels["data1"].push_back(dataInChannel);
|
||||
|
||||
// configure data output channel
|
||||
FairMQChannel dataOutChannel("push", "connect", "");
|
||||
dataOutChannel.UpdateRateLogging(0);
|
||||
processor.fChannels["data-out"].push_back(dataOutChannel);
|
||||
processor.fChannels["data2"].push_back(dataOutChannel);
|
||||
|
||||
// Waiting for DDS properties
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
@@ -97,8 +97,8 @@ int main(int argc, char** argv)
|
||||
}
|
||||
}
|
||||
|
||||
processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
|
||||
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);
|
||||
processor.fChannels.at("data1").at(0).UpdateAddress(samplerValues.begin()->second);
|
||||
processor.fChannels.at("data2").at(0).UpdateAddress(sinkValues.begin()->second);
|
||||
|
||||
processor.ChangeState("INIT_DEVICE");
|
||||
processor.WaitForEndOfState("INIT_DEVICE");
|
||||
|
@@ -64,7 +64,7 @@ int main(int argc, char** argv)
|
||||
// configure data output channel
|
||||
FairMQChannel dataOutChannel("push", "bind", "");
|
||||
dataOutChannel.UpdateRateLogging(0);
|
||||
sampler.fChannels["data-out"].push_back(dataOutChannel);
|
||||
sampler.fChannels["data1"].push_back(dataOutChannel);
|
||||
|
||||
// Get the IP of the current host and store it for binding.
|
||||
map<string,string> IPs;
|
||||
@@ -85,7 +85,7 @@ int main(int argc, char** argv)
|
||||
|
||||
// Configure the found host IP for the channel.
|
||||
// TCP port will be chosen randomly during the initialization (binding).
|
||||
sampler.fChannels.at("data-out").at(0).UpdateAddress(initialOutputAddress);
|
||||
sampler.fChannels.at("data1").at(0).UpdateAddress(initialOutputAddress);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForInitialValidation();
|
||||
@@ -93,7 +93,7 @@ int main(int argc, char** argv)
|
||||
// Advertise the bound addresses via DDS property
|
||||
LOG(INFO) << "Giving sampler output address to DDS.";
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
|
||||
ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data1").at(0).GetAddress());
|
||||
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
|
@@ -64,7 +64,7 @@ int main(int argc, char** argv)
|
||||
// configure data output channel
|
||||
FairMQChannel dataInChannel("pull", "bind", "");
|
||||
dataInChannel.UpdateRateLogging(0);
|
||||
sink.fChannels["data-in"].push_back(dataInChannel);
|
||||
sink.fChannels["data2"].push_back(dataInChannel);
|
||||
|
||||
// Get the IP of the current host and store it for binding.
|
||||
map<string,string> IPs;
|
||||
@@ -85,7 +85,7 @@ int main(int argc, char** argv)
|
||||
|
||||
// Configure the found host IP for the channel.
|
||||
// TCP port will be chosen randomly during the initialization (binding).
|
||||
sink.fChannels.at("data-in").at(0).UpdateAddress(initialInputAddress);
|
||||
sink.fChannels.at("data2").at(0).UpdateAddress(initialInputAddress);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForInitialValidation();
|
||||
@@ -93,7 +93,7 @@ int main(int argc, char** argv)
|
||||
// Advertise the bound address via DDS property
|
||||
LOG(INFO) << "Giving sink input address to DDS.";
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data-in").at(0).GetAddress());
|
||||
ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data2").at(0).GetAddress());
|
||||
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
|
Reference in New Issue
Block a user