Run state handlers on the main thread (breaking change for control).

This commit is contained in:
Alexey Rybalchenko
2018-06-14 13:38:21 +02:00
committed by Dennis Klein
parent c064da91df
commit a53ef79552
11 changed files with 284 additions and 270 deletions

View File

@@ -12,6 +12,7 @@
#include <gtest/gtest.h>
#include <string>
#include <thread>
#include <future> // std::async, std::future
namespace
@@ -19,6 +20,24 @@ namespace
using namespace std;
void control(FairMQDevice& device)
{
device.ChangeState("INIT_DEVICE");
device.WaitForEndOfState("INIT_DEVICE");
device.ChangeState("INIT_TASK");
device.WaitForEndOfState("INIT_TASK");
device.ChangeState("RUN");
device.WaitForEndOfState("RUN");
device.ChangeState("RESET_TASK");
device.WaitForEndOfState("RESET_TASK");
device.ChangeState("RESET_DEVICE");
device.WaitForEndOfState("RESET_DEVICE");
device.ChangeState("END");
}
class MultipleDevices : public ::testing::Test {
public:
MultipleDevices()
@@ -34,20 +53,14 @@ class MultipleDevices : public ::testing::Test {
channel.UpdateRateLogging(0);
sender.fChannels["data"].push_back(channel);
sender.ChangeState("INIT_DEVICE");
sender.WaitForEndOfState("INIT_DEVICE");
sender.ChangeState("INIT_TASK");
sender.WaitForEndOfState("INIT_TASK");
thread t(control, std::ref(sender));
sender.ChangeState("RUN");
sender.WaitForEndOfState("RUN");
sender.RunStateMachine();
sender.ChangeState("RESET_TASK");
sender.WaitForEndOfState("RESET_TASK");
sender.ChangeState("RESET_DEVICE");
sender.WaitForEndOfState("RESET_DEVICE");
sender.ChangeState("END");
if (t.joinable())
{
t.join();
}
return true;
}
@@ -62,20 +75,14 @@ class MultipleDevices : public ::testing::Test {
channel.UpdateRateLogging(0);
receiver.fChannels["data"].push_back(channel);
receiver.ChangeState("INIT_DEVICE");
receiver.WaitForEndOfState("INIT_DEVICE");
receiver.ChangeState("INIT_TASK");
receiver.WaitForEndOfState("INIT_TASK");
thread t(control, std::ref(receiver));
receiver.ChangeState("RUN");
receiver.WaitForEndOfState("RUN");
receiver.RunStateMachine();
receiver.ChangeState("RESET_TASK");
receiver.WaitForEndOfState("RESET_TASK");
receiver.ChangeState("RESET_DEVICE");
receiver.WaitForEndOfState("RESET_DEVICE");
receiver.ChangeState("END");
if (t.joinable())
{
t.join();
}
return true;
}

View File

@@ -14,6 +14,7 @@
#include <FairMQDevice.h>
#include <options/FairMQProgOptions.h>
#include <memory>
#include <thread>
namespace fair
{
@@ -36,21 +37,28 @@ inline auto control(std::shared_ptr<FairMQDevice> device) -> void
struct PluginServices : ::testing::Test {
PluginServices()
: mConfig()
, mDevice{std::make_shared<FairMQDevice>()}
, mServices{&mConfig, mDevice}
: mConfig()
, mDevice{std::make_shared<FairMQDevice>()}
, mServices{&mConfig, mDevice}
, fRunStateMachineThread()
{
fRunStateMachineThread = std::thread(&FairMQDevice::RunStateMachine, mDevice.get());
mDevice->SetTransport("zeromq");
}
~PluginServices()
{
if(mDevice->GetCurrentState() == FairMQDevice::IDLE) control(mDevice);
if (mDevice->GetCurrentState() == FairMQDevice::IDLE) control(mDevice);
if (fRunStateMachineThread.joinable()) {
fRunStateMachineThread.join();
}
}
FairMQProgOptions mConfig;
std::shared_ptr<FairMQDevice> mDevice;
fair::mq::PluginServices mServices;
std::thread fRunStateMachineThread;
};
} /* namespace test */

View File

@@ -16,6 +16,7 @@
#include <sstream>
#include <string>
#include <vector>
#include <thread>
namespace
{
@@ -38,7 +39,7 @@ auto control(shared_ptr<FairMQDevice> device) -> void
TEST(Plugin, Operators)
{
FairMQProgOptions config{};
FairMQProgOptions config;
auto device = make_shared<FairMQDevice>();
PluginServices services{&config, device};
Plugin p1{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", &services};
@@ -46,19 +47,27 @@ TEST(Plugin, Operators)
Plugin p3{"file", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/file.git", &services};
EXPECT_EQ(p1, p2);
EXPECT_NE(p1, p3);
control(device);
thread t(control, device);
device->RunStateMachine();
if (t.joinable()) {
t.join();
}
}
TEST(Plugin, OstreamOperators)
{
FairMQProgOptions config{};
FairMQProgOptions config;
auto device = make_shared<FairMQDevice>();
PluginServices services{&config, device};
Plugin p1{"dds", {1, 0, 0}, "Foo Bar <foo.bar@test.net>", "https://git.test.net/dds.git", &services};
stringstream ss;
ss << p1;
EXPECT_EQ(ss.str(), string{"'dds', version '1.0.0', maintainer 'Foo Bar <foo.bar@test.net>', homepage 'https://git.test.net/dds.git'"});
control(device);
thread t(control, device);
device->RunStateMachine();
if (t.joinable()) {
t.join();
}
}
TEST(PluginVersion, Operators)

View File

@@ -15,6 +15,7 @@
#include <fstream>
#include <memory>
#include <vector>
#include <thread>
namespace
{
@@ -39,8 +40,8 @@ auto control(shared_ptr<FairMQDevice> device) -> void
TEST(PluginManager, LoadPluginDynamic)
{
FairMQProgOptions config{};
auto mgr = PluginManager{};
FairMQProgOptions config;
PluginManager mgr;
auto device = make_shared<FairMQDevice>();
mgr.EmplacePluginServices(&config, device);
@@ -53,7 +54,7 @@ TEST(PluginManager, LoadPluginDynamic)
// check order
const auto expected = vector<string>{"test_dummy", "test_dummy2"};
auto actual = vector<string>{};
auto actual = vector<string>();
mgr.ForEachPlugin([&](Plugin& plugin){ actual.push_back(plugin.GetName()); });
ASSERT_TRUE(actual == expected);
@@ -62,18 +63,22 @@ TEST(PluginManager, LoadPluginDynamic)
mgr.ForEachPluginProgOptions([&count](const options_description& /*d*/){ ++count; });
ASSERT_EQ(count, 1);
control(device);
thread t(control, device);
device->RunStateMachine();
if (t.joinable()) {
t.join();
}
}
TEST(PluginManager, LoadPluginStatic)
{
auto mgr = PluginManager{};
PluginManager mgr;
auto device = make_shared<FairMQDevice>();
device->SetTransport("zeromq");
ASSERT_NO_THROW(mgr.LoadPlugin("s:control"));
FairMQProgOptions config{};
FairMQProgOptions config;
config.SetValue<string>("control", "static");
config.SetValue("catch-signals", 0);
mgr.EmplacePluginServices(&config, device);
@@ -82,7 +87,7 @@ TEST(PluginManager, LoadPluginStatic)
// check order
const auto expected = vector<string>{"control"};
auto actual = vector<string>{};
auto actual = vector<string>();
mgr.ForEachPlugin([&](Plugin& plugin){ actual.push_back(plugin.GetName()); });
ASSERT_TRUE(actual == expected);
@@ -91,6 +96,8 @@ TEST(PluginManager, LoadPluginStatic)
mgr.ForEachPluginProgOptions([&count](const options_description&){ ++count; });
ASSERT_EQ(count, 1);
device->RunStateMachine();
mgr.WaitForPluginsToReleaseDeviceControl();
}
@@ -112,7 +119,7 @@ TEST(PluginManager, SearchPathValidation)
const auto path1 = path{"/tmp/test1"};
const auto path2 = path{"/tmp/test2"};
const auto path3 = path{"/tmp/test3"};
auto mgr = PluginManager{};
PluginManager mgr;
mgr.SetSearchPaths({path1, path2});
auto expected = vector<path>{path1, path2};
@@ -140,7 +147,7 @@ TEST(PluginManager, SearchPaths)
fs.close();
const auto empty_path = path{""};
auto mgr = PluginManager{};
PluginManager mgr;
ASSERT_NO_THROW(mgr.AppendSearchPath(non_existing_dir));
ASSERT_NO_THROW(mgr.AppendSearchPath(existing_dir));
ASSERT_THROW(mgr.AppendSearchPath(existing_file), PluginManager::BadSearchPath);

View File

@@ -17,7 +17,7 @@ using namespace std;
TEST(PluginManager, LoadPluginPrelinkedDynamic)
{
auto mgr = PluginManager{};
PluginManager mgr;
ASSERT_NO_THROW(mgr.LoadPlugin("p:test_dummy"));
ASSERT_NO_THROW(mgr.LoadPlugin("p:test_dummy2"));