Add empty msg check for transport compatibility checker

This commit is contained in:
Alexey Rybalchenko 2021-06-24 10:50:37 +02:00 committed by Dennis Klein
parent a8bdb91165
commit 4dbb5535c3
4 changed files with 46 additions and 32 deletions

View File

@ -385,6 +385,7 @@ class FairMQChannel
void CheckSendCompatibility(FairMQMessagePtr& msg) void CheckSendCompatibility(FairMQMessagePtr& msg)
{ {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
if (msg->GetSize() > 0) {
FairMQMessagePtr msgWrapper(NewMessage( FairMQMessagePtr msgWrapper(NewMessage(
msg->GetData(), msg->GetData(),
msg->GetSize(), msg->GetSize(),
@ -393,6 +394,10 @@ class FairMQChannel
)); ));
msg.release(); msg.release();
msg = move(msgWrapper); msg = move(msgWrapper);
} else {
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
} }
} }
@ -400,7 +405,7 @@ class FairMQChannel
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
if (msg->GetSize() > 0) {
FairMQMessagePtr msgWrapper(NewMessage( FairMQMessagePtr msgWrapper(NewMessage(
msg->GetData(), msg->GetData(),
msg->GetSize(), msg->GetSize(),
@ -409,6 +414,10 @@ class FairMQChannel
)); ));
msg.release(); msg.release();
msg = move(msgWrapper); msg = move(msgWrapper);
} else {
FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
} }
} }
} }
@ -425,7 +434,6 @@ class FairMQChannel
{ {
for (auto& msg : msgVec) { for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) { if (fTransportType != msg->GetType()) {
FairMQMessagePtr newMsg(NewMessage()); FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg); msg = move(newMsg);
} }

View File

@ -27,15 +27,13 @@ class Rep : public FairMQDevice
auto Run() -> void override auto Run() -> void override
{ {
auto request1 = FairMQMessagePtr{NewMessage()}; auto request1 = FairMQMessagePtr{NewMessage()};
if (Receive(request1, "data") >= 0) if (Receive(request1, "data") >= 0) {
{
LOG(info) << "Received request 1"; LOG(info) << "Received request 1";
auto reply = FairMQMessagePtr{NewMessage()}; auto reply = FairMQMessagePtr{NewMessage()};
Send(reply, "data"); Send(reply, "data");
} }
auto request2 = FairMQMessagePtr{NewMessage()}; auto request2 = FairMQMessagePtr{NewMessage()};
if (Receive(request2, "data") >= 0) if (Receive(request2, "data") >= 0) {
{
LOG(info) << "Received request 2"; LOG(info) << "Received request 2";
auto reply = FairMQMessagePtr{NewMessage()}; auto reply = FairMQMessagePtr{NewMessage()};
Send(reply, "data"); Send(reply, "data");

View File

@ -30,8 +30,7 @@ class Req : public FairMQDevice
Send(request, "data"); Send(request, "data");
auto reply = FairMQMessagePtr{NewMessage()}; auto reply = FairMQMessagePtr{NewMessage()};
if (Receive(reply, "data") >= 0) if (Receive(reply, "data") >= 0) {
{
LOG(info) << "received reply"; LOG(info) << "received reply";
} }
}; };

View File

@ -27,24 +27,33 @@ auto RunReqRep(string transport) -> void
auto rep = execute_result{"", 0}; auto rep = execute_result{"", 0};
thread rep_thread([&]() { thread rep_thread([&]() {
stringstream cmd; stringstream cmd;
cmd << runTestDevice << " --id rep_" << transport << " --control static " cmd << runTestDevice << " --id rep_" << transport
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\""; << " --control static"
<< " --session " << session
<< " --color false"
<< " --mq-config \"" << mqConfig << "\"";
rep = execute(cmd.str(), "[REP]"); rep = execute(cmd.str(), "[REP]");
}); });
auto req1 = execute_result{"", 0}; auto req1 = execute_result{"", 0};
thread req1_thread([&]() { thread req1_thread([&]() {
stringstream cmd; stringstream cmd;
cmd << runTestDevice << " --id req_1" << transport << " --control static " cmd << runTestDevice << " --id req_1" << transport
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\""; << " --control static"
<< " --session " << session
<< " --color false"
<< " --mq-config \"" << mqConfig << "\"";
req1 = execute(cmd.str(), "[REQ1]"); req1 = execute(cmd.str(), "[REQ1]");
}); });
auto req2 = execute_result{"", 0}; auto req2 = execute_result{"", 0};
thread req2_thread([&]() { thread req2_thread([&]() {
stringstream cmd; stringstream cmd;
cmd << runTestDevice << " --id req_2" << transport << " --control static " cmd << runTestDevice << " --id req_2" << transport
<< "--session " << session << " --color false --mq-config \"" << mqConfig << "\""; << " --control static"
<< " --session " << session
<< " --color false"
<< " --mq-config \"" << mqConfig << "\"";
req2 = execute(cmd.str(), "[REQ2]"); req2 = execute(cmd.str(), "[REQ2]");
}); });