mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
9 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
7cacf471b9 | ||
|
7316b0e7f2 | ||
|
1fa82f5f22 | ||
|
1bb77bf47b | ||
|
07fe02a0a0 | ||
|
9cbccface7 | ||
|
7b773cde51 | ||
|
fd282fa950 | ||
|
008be36125 |
@@ -29,7 +29,7 @@ Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
|
||||
Set(configure_options "${configure_options};-DBUILD_NANOMSG_TRANSPORT=ON")
|
||||
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
|
||||
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
|
||||
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
|
||||
Set(configure_options "${configure_options};-DBUILD_SDK=OFF")
|
||||
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
|
||||
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")
|
||||
|
||||
|
@@ -37,6 +37,12 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-dds-env.sh ${CMAKE_CURRENT_
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh @ONLY)
|
||||
|
||||
# test
|
||||
add_test(NAME Example.DDS.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh localhost)
|
||||
set_tests_properties(Example.DDS.localhost PROPERTIES
|
||||
TIMEOUT 15
|
||||
RUN_SERIAL true
|
||||
PASS_REGULAR_EXPRESSION "Example successful"
|
||||
)
|
||||
|
||||
# install
|
||||
|
||||
|
@@ -8,8 +8,8 @@
|
||||
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
|
||||
|
||||
<decltask name="Sampler">
|
||||
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
|
||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
|
||||
<requirements>
|
||||
<name>SamplerWorker</name>
|
||||
</requirements>
|
||||
@@ -19,10 +19,10 @@
|
||||
</decltask>
|
||||
|
||||
<decltask name="Processor">
|
||||
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
|
||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
|
||||
<requirements>
|
||||
<id>ProcessorWorker</id>
|
||||
<name>ProcessorWorker</name>
|
||||
</requirements>
|
||||
<properties>
|
||||
<name access="read">data1</name>
|
||||
@@ -31,8 +31,8 @@
|
||||
</decltask>
|
||||
|
||||
<decltask name="Sink">
|
||||
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
|
||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
|
||||
<requirements>
|
||||
<name>SinkWorker</name>
|
||||
</requirements>
|
||||
|
@@ -8,8 +8,8 @@
|
||||
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
|
||||
|
||||
<decltask name="Sampler">
|
||||
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
|
||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
|
||||
<requirements>
|
||||
<name>SamplerWorker</name>
|
||||
</requirements>
|
||||
@@ -19,10 +19,10 @@
|
||||
</decltask>
|
||||
|
||||
<decltask name="Processor">
|
||||
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
|
||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
|
||||
<requirements>
|
||||
<id>ProcessorWorker</id>
|
||||
<name>ProcessorWorker</name>
|
||||
</requirements>
|
||||
<properties>
|
||||
<name access="read">data1</name>
|
||||
@@ -31,8 +31,8 @@
|
||||
</decltask>
|
||||
|
||||
<decltask name="Sink">
|
||||
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
|
||||
<env reachable="false">fairmq-ex-dds-env.sh</env>
|
||||
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
|
||||
<requirements>
|
||||
<name>SinkWorker</name>
|
||||
</requirements>
|
||||
|
@@ -46,7 +46,7 @@ echo "TOPOLOGY FILE: ${topologyFile}"
|
||||
|
||||
# TODO Uncomment once DDS 2.6 is released
|
||||
# dds-info --active-topology
|
||||
dds-topology --disable-validation --activate ${topologyFile}
|
||||
dds-topology --activate ${topologyFile}
|
||||
# dds-info --active-topology
|
||||
# dds-info --wait-for-executing-agents ${requiredNofAgents}
|
||||
sleep 1
|
||||
@@ -82,4 +82,7 @@ logDir="${wrkDir}/logs"
|
||||
for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done
|
||||
echo "AGENT LOG FILES IN: ${logDir}"
|
||||
|
||||
# This string is used by ctest to detect success
|
||||
echo "Example successful :)"
|
||||
|
||||
# Cleanup function is called by EXIT trap
|
||||
|
@@ -63,8 +63,6 @@ bool DeviceRunner::HandleGeneralOptions(const fair::mq::ProgOptions& config, boo
|
||||
<< " / __/ / /_/ / / / _ / / / / /_/ / " << FAIRMQ_REPO_URL << endl
|
||||
<< " /_/ \\__,_/_/_/ /_/ /_/ \\___\\_\\ " << FAIRMQ_LICENSE << " © " << FAIRMQ_COPYRIGHT << endl;
|
||||
}
|
||||
|
||||
config.PrintOptions();
|
||||
}
|
||||
|
||||
return true;
|
||||
@@ -169,6 +167,9 @@ auto DeviceRunner::Run() -> int
|
||||
// Instantiate and run plugins
|
||||
fPluginManager.InstantiatePlugins();
|
||||
|
||||
// Log IDLE configuration
|
||||
fConfig.PrintOptions();
|
||||
|
||||
// Run the device
|
||||
fDevice->RunStateMachine();
|
||||
|
||||
|
@@ -74,7 +74,8 @@ auto PluginServices::ReleaseDeviceControl(const string& controller) -> void
|
||||
if (fDeviceController == controller) {
|
||||
fDeviceController = boost::none;
|
||||
} else {
|
||||
throw DeviceControlError{tools::ToString("Plugin '", controller, "' cannot release control because it has not taken over control.")};
|
||||
LOG(debug) << "Plugin '" << controller << "' cannot release control "
|
||||
<< "because it has no control.";
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -51,21 +51,22 @@ DDS::DDS(const string& name,
|
||||
, fLastState(DeviceState::Idle)
|
||||
, fDeviceTerminationRequested(false)
|
||||
, fHeartbeatInterval(100)
|
||||
, fUpdatesAllowed(false)
|
||||
{
|
||||
try {
|
||||
TakeDeviceControl();
|
||||
fControllerThread = thread(&DDS::HandleControl, this);
|
||||
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
|
||||
} catch (PluginServices::DeviceControlError& e) {
|
||||
LOG(debug) << e.what();
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Error in plugin initialization: " << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
auto DDS::HandleControl() -> void
|
||||
{
|
||||
try {
|
||||
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
|
||||
|
||||
std::string deviceId(GetProperty<std::string>("id"));
|
||||
if (deviceId.empty()) {
|
||||
SetProperty<std::string>("id", dds::env_prop<dds::task_path>());
|
||||
}
|
||||
std::string sessionId(GetProperty<std::string>("session"));
|
||||
if (sessionId == "default") {
|
||||
SetProperty<std::string>("session", dds::env_prop<dds::dds_session_id>());
|
||||
}
|
||||
|
||||
auto control = GetProperty<string>("control");
|
||||
bool staticMode(false);
|
||||
if (control == "static") {
|
||||
@@ -85,22 +86,28 @@ auto DDS::HandleControl() -> void
|
||||
// subscribe to device state changes, pushing new state changes into the event queue
|
||||
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
||||
fStateQueue.Push(newState);
|
||||
switch(newState) {
|
||||
case DeviceState::Bound:
|
||||
// Receive addresses of connecting channels from DDS
|
||||
// and propagate addresses of bound channels to DDS.
|
||||
FillChannelContainers();
|
||||
switch (newState) {
|
||||
case DeviceState::Bound:
|
||||
// Receive addresses of connecting channels from DDS
|
||||
// and propagate addresses of bound channels to DDS.
|
||||
FillChannelContainers();
|
||||
|
||||
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
|
||||
PublishBoundChannels();
|
||||
break;
|
||||
case DeviceState::Exiting:
|
||||
fDeviceTerminationRequested = true;
|
||||
UnsubscribeFromDeviceStateChange();
|
||||
ReleaseDeviceControl();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
// publish bound addresses via DDS at keys corresponding to the channel
|
||||
// prefixes, e.g. 'data' in data[i]
|
||||
PublishBoundChannels();
|
||||
break;
|
||||
case DeviceState::ResettingDevice: {
|
||||
std::lock_guard<std::mutex> lk(fUpdateMutex);
|
||||
fUpdatesAllowed = false;
|
||||
break;
|
||||
}
|
||||
case DeviceState::Exiting:
|
||||
fDeviceTerminationRequested = true;
|
||||
UnsubscribeFromDeviceStateChange();
|
||||
ReleaseDeviceControl();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
@@ -114,15 +121,26 @@ auto DDS::HandleControl() -> void
|
||||
});
|
||||
|
||||
if (staticMode) {
|
||||
TransitionDeviceStateTo(DeviceState::Running);
|
||||
|
||||
// wait until stop signal
|
||||
unique_lock<mutex> lock(fStopMutex);
|
||||
while (!fDeviceTerminationRequested) {
|
||||
fStopCondition.wait_for(lock, chrono::seconds(1));
|
||||
}
|
||||
LOG(debug) << "Stopping DDS control plugin";
|
||||
fControllerThread = thread(&DDS::StaticControl, this);
|
||||
}
|
||||
} catch (PluginServices::DeviceControlError& e) {
|
||||
LOG(debug) << e.what();
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Error in plugin initialization: " << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
auto DDS::StaticControl() -> void
|
||||
{
|
||||
try {
|
||||
TransitionDeviceStateTo(DeviceState::Running);
|
||||
|
||||
// wait until stop signal
|
||||
unique_lock<mutex> lock(fStopMutex);
|
||||
while (!fDeviceTerminationRequested) {
|
||||
fStopCondition.wait_for(lock, chrono::seconds(1));
|
||||
}
|
||||
LOG(debug) << "Stopping DDS plugin static controller";
|
||||
} catch (DeviceErrorState&) {
|
||||
ReleaseDeviceControl();
|
||||
} catch (exception& e) {
|
||||
@@ -197,6 +215,11 @@ auto DDS::FillChannelContainers() -> void
|
||||
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
|
||||
fIofN.insert(make_pair(chanName, IofN(i, n)));
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(fUpdateMutex);
|
||||
fUpdatesAllowed = true;
|
||||
}
|
||||
fUpdateCondition.notify_one();
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << "Error filling channel containers: " << e.what();
|
||||
}
|
||||
@@ -209,6 +232,10 @@ auto DDS::SubscribeForConnectingChannels() -> void
|
||||
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
|
||||
try {
|
||||
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
|
||||
|
||||
std::unique_lock<std::mutex> lk(fUpdateMutex);
|
||||
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; });
|
||||
|
||||
string val = value;
|
||||
// check if it is to handle as one out of multiple values
|
||||
auto it = fIofN.find(propertyId);
|
||||
|
@@ -63,6 +63,9 @@ struct DDSSubscription
|
||||
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg;
|
||||
});
|
||||
|
||||
// fDDSCustomCmd.subscribe([](const std::string& cmd, const std::string& cond, uint64_t senderId) {
|
||||
// LOG(debug) << "cmd: " << cmd << ", cond: " << cond << ", senderId: " << senderId;
|
||||
// });
|
||||
assert(!dds_session_id.empty());
|
||||
}
|
||||
|
||||
@@ -125,7 +128,7 @@ class DDS : public Plugin
|
||||
~DDS();
|
||||
|
||||
private:
|
||||
auto HandleControl() -> void;
|
||||
auto StaticControl() -> void;
|
||||
|
||||
auto FillChannelContainers() -> void;
|
||||
auto SubscribeForConnectingChannels() -> void;
|
||||
@@ -160,6 +163,10 @@ class DDS : public Plugin
|
||||
|
||||
std::thread fHeartbeatThread;
|
||||
std::chrono::milliseconds fHeartbeatInterval;
|
||||
|
||||
bool fUpdatesAllowed;
|
||||
std::mutex fUpdateMutex;
|
||||
std::condition_variable fUpdateCondition;
|
||||
};
|
||||
|
||||
Plugin::ProgOptions DDSProgramOptions()
|
||||
|
@@ -273,7 +273,7 @@ int main(int argc, char* argv[])
|
||||
cerr << "state-changes-unsubscription failed with return code: " << parts[2];
|
||||
}
|
||||
} else {
|
||||
// cout << "Received: " << msg << endl;
|
||||
cout << "Received: " << msg << endl;
|
||||
}
|
||||
});
|
||||
|
||||
|
@@ -20,7 +20,9 @@
|
||||
|
||||
#include <cassert>
|
||||
#include <cstdlib>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
|
||||
namespace fair {
|
||||
@@ -134,6 +136,8 @@ struct DDSSession::Impl
|
||||
dds::intercom_api::CCustomCmd fDDSCustomCmd;
|
||||
Id fId;
|
||||
bool fStopOnDestruction;
|
||||
mutable std::mutex fMtx;
|
||||
std::unordered_map<DDSChannel::Id, DDSTask::Id> fTaskIdByChannelIdMap;
|
||||
};
|
||||
|
||||
DDSSession::DDSSession(DDSEnvironment env)
|
||||
@@ -316,6 +320,18 @@ void DDSSession::UnsubscribeFromCommands()
|
||||
|
||||
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
|
||||
|
||||
auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
||||
fImpl->fTaskIdByChannelIdMap[channelId] = taskId;
|
||||
}
|
||||
|
||||
auto DDSSession::GetTaskId(DDSChannel::Id channelId) const -> DDSTask::Id
|
||||
{
|
||||
std::lock_guard<std::mutex> lk(fImpl->fMtx);
|
||||
return fImpl->fTaskIdByChannelIdMap.at(channelId);
|
||||
}
|
||||
|
||||
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
|
||||
{
|
||||
return os << "$DDS_SESSION_ID: " << session.GetId();
|
||||
|
@@ -43,6 +43,18 @@ auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&;
|
||||
class DDSTopology;
|
||||
class DDSAgent;
|
||||
|
||||
class DDSTask
|
||||
{
|
||||
public:
|
||||
using Id = std::uint64_t;
|
||||
};
|
||||
|
||||
class DDSChannel
|
||||
{
|
||||
public:
|
||||
using Id = std::uint64_t;
|
||||
};
|
||||
|
||||
/**
|
||||
* @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h>
|
||||
* @brief Represents a DDS session
|
||||
@@ -95,6 +107,8 @@ class DDSSession
|
||||
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
|
||||
void UnsubscribeFromCommands();
|
||||
void SendCommand(const std::string&);
|
||||
auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void;
|
||||
auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id;
|
||||
|
||||
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;
|
||||
|
||||
|
@@ -75,7 +75,7 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||
// LOG(debug) << "Adding device " << d;
|
||||
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
|
||||
}
|
||||
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) {
|
||||
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
|
||||
// LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||
std::vector<std::string> parts;
|
||||
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
|
||||
@@ -85,7 +85,9 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||
}
|
||||
|
||||
if (parts[0] == "state-change") {
|
||||
AddNewStateEntry(std::stoull(parts[2]), parts[3]);
|
||||
DDSTask::Id taskId(std::stoull(parts[2]));
|
||||
fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId);
|
||||
AddNewStateEntry(taskId, parts[3]);
|
||||
} else if (parts[0] == "state-changes-subscription") {
|
||||
LOG(debug) << "Received from " << senderId << ": " << msg;
|
||||
if (parts[2] != "OK") {
|
||||
@@ -96,7 +98,16 @@ Topology::Topology(DDSTopology topo, DDSSession session)
|
||||
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
|
||||
}
|
||||
} else if (parts[1] == "could not queue") {
|
||||
LOG(warn) << "Could not queue " << parts[2] << " transition on " << senderId;
|
||||
std::unique_lock<std::mutex> lock(fMtx);
|
||||
|
||||
if (fStateChangeOngoing) {
|
||||
if (fState.at(fDDSSession.GetTaskId(senderId)).state != fTargetState) {
|
||||
fStateChangeError =
|
||||
tools::ToString("Could not queue ", parts[2], " transition on ", senderId);
|
||||
lock.unlock();
|
||||
fCV.notify_one();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
fDDSSession.StartDDSService();
|
||||
@@ -122,13 +133,13 @@ auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb
|
||||
std::unique_lock<std::mutex> lock(fMtx);
|
||||
if (fStateChangeOngoing) {
|
||||
throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported");
|
||||
lock.unlock();
|
||||
}
|
||||
LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition);
|
||||
fStateChangeOngoing = true;
|
||||
fChangeStateCallback = cb;
|
||||
fStateChangeTimeout = timeout;
|
||||
fTargetState = expectedState.at(transition);
|
||||
fStateChangeError.clear();
|
||||
|
||||
fDDSSession.SendCommand(GetTransitionName(transition));
|
||||
}
|
||||
@@ -155,12 +166,15 @@ void Topology::WaitForState()
|
||||
while (!fShutdown) {
|
||||
if (fStateChangeOngoing) {
|
||||
try {
|
||||
std::unique_lock<std::mutex> lock(fMtx);
|
||||
|
||||
auto condition = [&] {
|
||||
// LOG(info) << "checking condition";
|
||||
// LOG(info) << "fShutdown: " << fShutdown;
|
||||
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(),
|
||||
// [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
|
||||
return fShutdown
|
||||
|| !fStateChangeError.empty()
|
||||
|| std::all_of(
|
||||
fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
|
||||
// TODO Check, if we can make sure that EXITING state change event are not missed
|
||||
@@ -170,8 +184,6 @@ void Topology::WaitForState()
|
||||
});
|
||||
};
|
||||
|
||||
std::unique_lock<std::mutex> lock(fMtx);
|
||||
|
||||
if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
|
||||
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
|
||||
// LOG(debug) << "timeout";
|
||||
@@ -187,6 +199,15 @@ void Topology::WaitForState()
|
||||
}
|
||||
|
||||
fStateChangeOngoing = false;
|
||||
|
||||
if (!fStateChangeError.empty()) {
|
||||
TopologyState state = fState;
|
||||
lock.unlock();
|
||||
fChangeStateCallback(
|
||||
{{AsyncOpResultCode::Error, fStateChangeError}, std::move(state)});
|
||||
break;
|
||||
}
|
||||
|
||||
if (fShutdown) {
|
||||
LOG(debug) << "Aborting because a shutdown was requested";
|
||||
TopologyState state = fState;
|
||||
|
@@ -56,7 +56,7 @@ struct DeviceStatus
|
||||
DeviceState state;
|
||||
};
|
||||
|
||||
using TopologyState = std::unordered_map<uint64_t, DeviceStatus>;
|
||||
using TopologyState = std::unordered_map<DDSTask::Id, DeviceStatus>;
|
||||
using TopologyTransition = fair::mq::Transition;
|
||||
|
||||
struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };
|
||||
@@ -150,6 +150,7 @@ class Topology
|
||||
ChangeStateCallback fChangeStateCallback;
|
||||
std::chrono::milliseconds fStateChangeTimeout;
|
||||
bool fShutdown;
|
||||
std::string fStateChangeError;
|
||||
|
||||
void WaitForState();
|
||||
void AddNewStateEntry(uint64_t senderId, const std::string& state);
|
||||
|
@@ -26,10 +26,7 @@ TEST_F(PluginServices, OnlySingleController)
|
||||
mServices.ChangeDeviceState("bar", DeviceStateTransition::InitDevice),
|
||||
fair::mq::PluginServices::DeviceControlError
|
||||
);
|
||||
ASSERT_THROW( // no control for bar
|
||||
mServices.ReleaseDeviceControl("bar"),
|
||||
fair::mq::PluginServices::DeviceControlError
|
||||
);
|
||||
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("bar"));
|
||||
|
||||
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo"));
|
||||
ASSERT_FALSE(mServices.GetDeviceController());
|
||||
|
@@ -105,7 +105,7 @@ TEST_F(Topology, ChangeStateTimeout)
|
||||
blocker.Wait();
|
||||
}
|
||||
|
||||
TEST_F(Topology, ChangeStateFullDeviceLifetime)
|
||||
TEST_F(Topology, ChangeStateFullDeviceLifecycle)
|
||||
{
|
||||
using fair::mq::sdk::Topology;
|
||||
using fair::mq::sdk::TopologyTransition;
|
||||
@@ -125,4 +125,27 @@ TEST_F(Topology, ChangeStateFullDeviceLifetime)
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(Topology, ChangeStateFullDeviceLifecycle2)
|
||||
{
|
||||
using fair::mq::sdk::Topology;
|
||||
using fair::mq::sdk::TopologyTransition;
|
||||
|
||||
Topology topo(mDDSTopo, mDDSSession);
|
||||
for (int i(0); i < 10; ++i) {
|
||||
for (auto transition : {TopologyTransition::InitDevice,
|
||||
TopologyTransition::CompleteInit,
|
||||
TopologyTransition::Bind,
|
||||
TopologyTransition::Connect,
|
||||
TopologyTransition::InitTask,
|
||||
TopologyTransition::Run}) {
|
||||
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
for (auto transition : {TopologyTransition::Stop,
|
||||
TopologyTransition::ResetTask,
|
||||
TopologyTransition::ResetDevice}) {
|
||||
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
@@ -6,7 +6,7 @@
|
||||
<declrequirement name="SinkWorker" type="wnname" value="sink"/>
|
||||
|
||||
<decltask name="Sampler">
|
||||
<exe reachable="true">fairmq-bsampler --id sampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe>
|
||||
<exe reachable="true">fairmq-bsampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe>
|
||||
<requirements>
|
||||
<name>SamplerWorker</name>
|
||||
</requirements>
|
||||
@@ -16,7 +16,7 @@
|
||||
</decltask>
|
||||
|
||||
<decltask name="Sink">
|
||||
<exe reachable="true">fairmq-sink --id sink_%taskIndex% --color false --channel-config name=data,type=pull,method=connect -P dds</exe>
|
||||
<exe reachable="true">fairmq-sink --color false --channel-config name=data,type=pull,method=connect -P dds</exe>
|
||||
<requirements>
|
||||
<name>SinkWorker</name>
|
||||
</requirements>
|
||||
|
Reference in New Issue
Block a user