From 5b5fecc9940dfc6a722abde8713131d635cabb5d Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 19 Dec 2019 12:52:28 +0100 Subject: [PATCH] Extend multipart tests to include single part, transfer across >1 channel --- test/protocols/_push_pull_multipart.cxx | 141 ++++++++++++++++-------- 1 file changed, 92 insertions(+), 49 deletions(-) diff --git a/test/protocols/_push_pull_multipart.cxx b/test/protocols/_push_pull_multipart.cxx index 974715c8..84246981 100644 --- a/test/protocols/_push_pull_multipart.cxx +++ b/test/protocols/_push_pull_multipart.cxx @@ -25,45 +25,86 @@ namespace using namespace std; -auto RunSingleThreadedMultipart(string transport, string address) -> void { +auto RunSingleThreadedMultipart(string transport, string address1, string address2) -> void { size_t session{fair::mq::tools::UuidHash()}; fair::mq::ProgOptions config; config.SetProperty("session", std::to_string(session)); + auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); - FairMQTransportFactory* factoryptr = factory.get(); - FairMQChannel push("Push", "push", factory); - ASSERT_TRUE(push.Bind(address)); - FairMQChannel pull("Pull", "pull", factory); - pull.Connect(address); + + FairMQChannel push1("Push1", "push", factory); + ASSERT_TRUE(push1.Bind(address1)); + FairMQChannel pull1("Pull1", "pull", factory); + ASSERT_TRUE(pull1.Connect(address1)); + FairMQChannel push2("Push2", "push", factory); + ASSERT_TRUE(push2.Bind(address2)); + FairMQChannel pull2("Pull2", "pull", factory); + ASSERT_TRUE(pull2.Connect(address2)); // TODO validate that fTransportFactory is not nullptr // TODO validate that fSocket is not nullptr - ASSERT_TRUE(push.Validate()); - ASSERT_TRUE(pull.Validate()); + ASSERT_TRUE(push1.Validate()); + ASSERT_TRUE(pull1.Validate()); + ASSERT_TRUE(push2.Validate()); + ASSERT_TRUE(pull2.Validate()); { - auto sentMsg = FairMQParts{}; - sentMsg.AddPart(push.NewSimpleMessage("1")); - sentMsg.AddPart(push.NewSimpleMessage("2")); - sentMsg.AddPart(push.NewSimpleMessage("3")); + FairMQParts multiplePartsOut; + multiplePartsOut.AddPart(push1.NewSimpleMessage("1")); + multiplePartsOut.AddPart(push1.NewSimpleMessage("2")); + multiplePartsOut.AddPart(push1.NewSimpleMessage("3")); + ASSERT_GE(push1.Send(multiplePartsOut), 0); - ASSERT_GE(push.Send(sentMsg), 0); + FairMQParts singlePartOut; + singlePartOut.AddPart(push1.NewSimpleMessage("4")); + ASSERT_GE(push1.Send(singlePartOut), 0); } - auto receivedMsg = FairMQParts{}; - ASSERT_GE(pull.Receive(receivedMsg), 0); + FairMQParts multipleParts; + ASSERT_GE(pull1.Receive(multipleParts), 0); - stringstream out; - for_each(receivedMsg.cbegin(), receivedMsg.cend(), [&out,&factoryptr](const FairMQMessagePtr& part) { - out << string{static_cast(part->GetData()), part->GetSize()}; - ASSERT_EQ(part->GetTransport(),factoryptr); + stringstream multiple; + for_each(multipleParts.cbegin(), multipleParts.cend(), [&multiple, &factory](const FairMQMessagePtr& part) { + multiple << string{static_cast(part->GetData()), part->GetSize()}; + ASSERT_EQ(part->GetTransport(), factory.get()); }); - ASSERT_EQ(out.str(), "123"); + ASSERT_EQ(multiple.str(), "123"); + + FairMQParts singlePart; + ASSERT_GE(pull1.Receive(singlePart), 0); + + stringstream single; + for_each(singlePart.cbegin(), singlePart.cend(), [&single](const FairMQMessagePtr& part) { + single << string{static_cast(part->GetData()), part->GetSize()}; + }); + ASSERT_EQ(single.str(), "4"); + + ASSERT_GE(push2.Send(singlePart), 0); + ASSERT_GE(push2.Send(multipleParts), 0); + + { + FairMQParts singlePartIn; + ASSERT_GE(pull2.Receive(singlePartIn), 0); + stringstream singleIn; + for_each(singlePartIn.cbegin(), singlePartIn.cend(), [&singleIn](const FairMQMessagePtr& part) { + singleIn << string{static_cast(part->GetData()), part->GetSize()}; + }); + ASSERT_EQ(singleIn.str(), "4"); + + FairMQParts multiplePartsIn; + ASSERT_GE(pull2.Receive(multiplePartsIn), 0); + stringstream multipleIn; + for_each(multiplePartsIn.cbegin(), multiplePartsIn.cend(), [&multipleIn, &factory](const FairMQMessagePtr& part) { + multipleIn << string{static_cast(part->GetData()), part->GetSize()}; + ASSERT_EQ(part->GetTransport(), factory.get()); + }); + ASSERT_EQ(multipleIn.str(), "123"); + } } -auto RunMultiThreadedMultipart(string transport, string address) -> void +auto RunMultiThreadedMultipart(string transport, string address1) -> void { size_t session{fair::mq::tools::UuidHash()}; @@ -71,31 +112,33 @@ auto RunMultiThreadedMultipart(string transport, string address) -> void config.SetProperty("session", std::to_string(session)); config.SetProperty("io-threads", 1); config.SetProperty("shm-segment-size", 20000000); + auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); - FairMQChannel push("Push", "push", factory); - ASSERT_TRUE(push.Bind(address)); - FairMQChannel pull("Pull", "pull", factory); - pull.Connect(address); - auto pusher = thread{[&push](){ - ASSERT_TRUE(push.Validate()); + FairMQChannel push1("Push1", "push", factory); + ASSERT_TRUE(push1.Bind(address1)); + FairMQChannel pull1("Pull1", "pull", factory); + ASSERT_TRUE(pull1.Connect(address1)); - auto sentMsg = FairMQParts{}; - sentMsg.AddPart(push.NewSimpleMessage("1")); - sentMsg.AddPart(push.NewSimpleMessage("2")); - sentMsg.AddPart(push.NewSimpleMessage("3")); + auto pusher = thread{[&push1](){ + ASSERT_TRUE(push1.Validate()); - ASSERT_GE(push.Send(sentMsg), 0); + FairMQParts sent; + sent.AddPart(push1.NewSimpleMessage("1")); + sent.AddPart(push1.NewSimpleMessage("2")); + sent.AddPart(push1.NewSimpleMessage("3")); + + ASSERT_GE(push1.Send(sent), 0); }}; - auto puller = thread{[&pull](){ - ASSERT_TRUE(pull.Validate()); + auto puller = thread{[&pull1](){ + ASSERT_TRUE(pull1.Validate()); - auto receivedMsg = FairMQParts{}; - ASSERT_GE(pull.Receive(receivedMsg), 0); + FairMQParts received; + ASSERT_GE(pull1.Receive(received), 0); stringstream out; - for_each(receivedMsg.cbegin(), receivedMsg.cend(), [&out](const FairMQMessagePtr& part) { + for_each(received.cbegin(), received.cend(), [&out](const FairMQMessagePtr& part) { out << string{static_cast(part->GetData()), part->GetSize()}; }); ASSERT_EQ(out.str(), "123"); @@ -107,69 +150,69 @@ auto RunMultiThreadedMultipart(string transport, string address) -> void TEST(PushPull, Multipart_ST_inproc_zeromq) { - RunSingleThreadedMultipart("zeromq", "inproc://test"); + RunSingleThreadedMultipart("zeromq", "inproc://test1", "inproc://test2"); } TEST(PushPull, Multipart_ST_inproc_shmem) { - RunSingleThreadedMultipart("shmem", "inproc://test"); + RunSingleThreadedMultipart("shmem", "inproc://test1", "inproc://test2"); } #ifdef BUILD_NANOMSG_TRANSPORT TEST(PushPull, Multipart_ST_inproc_nanomsg) { - RunSingleThreadedMultipart("nanomsg", "inproc://test"); + RunSingleThreadedMultipart("nanomsg", "inproc://test1", "inproc://test2"); } #endif /* BUILD_NANOMSG_TRANSPORT */ TEST(PushPull, Multipart_ST_ipc_zeromq) { - RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq"); + RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2"); } TEST(PushPull, Multipart_ST_ipc_shmen) { - RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmen"); + RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmen_1", "ipc://test_Multipart_ST_ipc_shmen_2"); } #ifdef BUILD_NANOMSG_TRANSPORT TEST(PushPull, Multipart_ST_ipc_nanomsg) { - RunSingleThreadedMultipart("nanomsg", "ipc://test_Multipart_ST_ipc_nanomsg"); + RunSingleThreadedMultipart("nanomsg", "ipc://test_Multipart_ST_ipc_nanomsg_1", "ipc://test_Multipart_ST_ipc_nanomsg_2"); } #endif /* BUILD_NANOMSG_TRANSPORT */ TEST(PushPull, Multipart_MT_inproc_zeromq) { - RunMultiThreadedMultipart("zeromq", "inproc://test"); + RunMultiThreadedMultipart("zeromq", "inproc://test_1"); } TEST(PushPull, Multipart_MT_inproc_shmem) { - RunMultiThreadedMultipart("shmem", "inproc://test"); + RunMultiThreadedMultipart("shmem", "inproc://test_1"); } #ifdef BUILD_NANOMSG_TRANSPORT TEST(PushPull, Multipart_MT_inproc_nanomsg) { - RunMultiThreadedMultipart("nanomsg", "inproc://test"); + RunMultiThreadedMultipart("nanomsg", "inproc://test_1"); } #endif /* BUILD_NANOMSG_TRANSPORT */ TEST(PushPull, Multipart_MT_ipc_zeromq) { - RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MT_ipc_zeromq"); + RunMultiThreadedMultipart("zeromq", "ipc://test_Multipart_MT_ipc_zeromq_1"); } TEST(PushPull, Multipart_MT_ipc_shmem) { - RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MT_ipc_shmem"); + RunMultiThreadedMultipart("shmem", "ipc://test_Multipart_MT_ipc_shmem_1"); } #ifdef BUILD_NANOMSG_TRANSPORT TEST(PushPull, Multipart_MT_ipc_nanomsg) { - RunMultiThreadedMultipart("nanomsg", "ipc://test_Multipart_MT_ipc_nanomsg"); + RunMultiThreadedMultipart("nanomsg", "ipc://test_Multipart_MT_ipc_nanomsg_1"); } #endif /* BUILD_NANOMSG_TRANSPORT */