mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
11 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
f46d446d52 | ||
|
db0937f339 | ||
|
bb1ce794b6 | ||
|
9e2373b55d | ||
|
c51e88e114 | ||
|
f9219dab65 | ||
|
0806720f61 | ||
|
e39d17d09e | ||
|
a14502242f | ||
|
d3697ec97b | ||
|
73377c5100 |
@@ -82,7 +82,7 @@ endif()
|
||||
|
||||
if(BUILD_DDS_PLUGIN OR BUILD_SDK)
|
||||
find_package2(PRIVATE DDS REQUIRED
|
||||
VERSION 3.0
|
||||
VERSION 3.5.3
|
||||
)
|
||||
set(DDS_Boost_COMPONENTS system log log_setup regex filesystem thread)
|
||||
set(DDS_Boost_VERSION 1.67)
|
||||
@@ -136,7 +136,7 @@ endif()
|
||||
|
||||
if(BUILD_SDK)
|
||||
find_package2(BUNDLED asio
|
||||
VERSION 1.13.0
|
||||
VERSION 1.18.0
|
||||
)
|
||||
if(NOT asio_FOUND)
|
||||
build_bundled(asio extern/asio)
|
||||
|
@@ -30,9 +30,6 @@ Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
|
||||
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
|
||||
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
|
||||
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
|
||||
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
|
||||
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")
|
||||
Set(configure_options "${configure_options};-DBoost_NO_BOOST_CMAKE=ON")
|
||||
|
||||
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
|
||||
If(EXTRA_FLAGS)
|
||||
@@ -60,8 +57,13 @@ Ctest_Configure(BUILD "${CTEST_BINARY_DIRECTORY}"
|
||||
|
||||
Ctest_Build(BUILD "${CTEST_BINARY_DIRECTORY}")
|
||||
|
||||
unset(exclude_tests)
|
||||
if($ENV{EXCLUDE_UNSTABLE_DDS_TESTS})
|
||||
set(exclude_tests EXCLUDE ".*\\.localhost$")
|
||||
endif()
|
||||
Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
|
||||
# PARALLEL_LEVEL $ENV{number_of_processors}
|
||||
${exclude_tests}
|
||||
PARALLEL_LEVEL $ENV{number_of_processors}
|
||||
RETURN_VALUE _ctest_test_ret_val
|
||||
)
|
||||
|
10
Jenkinsfile
vendored
10
Jenkinsfile
vendored
@@ -46,6 +46,11 @@ def jobMatrix(String prefix, List specs, Closure callback) {
|
||||
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
|
||||
echo "echo \\\$PATH" >> Dart.cfg
|
||||
'''
|
||||
|
||||
if (os =~ /macOS10.14/) {
|
||||
sh "echo \"export EXCLUDE_UNSTABLE_DDS_TESTS=1\" >> Dart.cfg"
|
||||
}
|
||||
|
||||
sh 'cat Dart.cfg'
|
||||
|
||||
callback.call(spec, label)
|
||||
@@ -74,8 +79,9 @@ pipeline{
|
||||
steps{
|
||||
script {
|
||||
def build_jobs = jobMatrix('build', [
|
||||
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
|
||||
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleLLVM11.0.3', fairsoft: 'fairmq_dev'],
|
||||
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
|
||||
[os: 'macOS10.14', arch: 'x86_64', compiler: 'AppleClang11.0', fairsoft: 'fairmq_dev'],
|
||||
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleClang12.0', fairsoft: 'fairmq_dev'],
|
||||
]) { spec, label ->
|
||||
sh './Dart.sh alfa_ci Dart.cfg'
|
||||
}
|
||||
|
2
extern/asio
vendored
2
extern/asio
vendored
Submodule extern/asio updated: 90f32660cd...be7badc31a
@@ -257,7 +257,7 @@ class FairMQChannel
|
||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
|
||||
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
||||
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
|
||||
int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
|
||||
{
|
||||
CheckSendCompatibility(msg);
|
||||
return fSocket->Send(msg, sndTimeoutInMs);
|
||||
@@ -267,7 +267,7 @@ class FairMQChannel
|
||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
|
||||
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
||||
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
|
||||
int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
|
||||
{
|
||||
CheckReceiveCompatibility(msg);
|
||||
return fSocket->Receive(msg, rcvTimeoutInMs);
|
||||
|
@@ -129,7 +129,7 @@ class FairMQDevice
|
||||
/// @param i channel index
|
||||
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
|
||||
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
||||
int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
|
||||
int64_t Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
|
||||
{
|
||||
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
|
||||
}
|
||||
@@ -140,7 +140,7 @@ class FairMQDevice
|
||||
/// @param i channel index
|
||||
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
|
||||
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
|
||||
int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
|
||||
int64_t Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
|
||||
{
|
||||
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
|
||||
}
|
||||
|
@@ -45,8 +45,8 @@ class FairMQSocket
|
||||
virtual bool Bind(const std::string& address) = 0;
|
||||
virtual bool Connect(const std::string& address) = 0;
|
||||
|
||||
virtual int Send(FairMQMessagePtr& msg, int timeout = -1) = 0;
|
||||
virtual int Receive(FairMQMessagePtr& msg, int timeout = -1) = 0;
|
||||
virtual int64_t Send(FairMQMessagePtr& msg, int timeout = -1) = 0;
|
||||
virtual int64_t Receive(FairMQMessagePtr& msg, int timeout = -1) = 0;
|
||||
virtual int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = -1) = 0;
|
||||
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = -1) = 0;
|
||||
|
||||
|
@@ -254,7 +254,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
|
||||
}
|
||||
}
|
||||
|
||||
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
|
||||
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int64_t
|
||||
{
|
||||
// timeout argument not yet implemented
|
||||
|
||||
@@ -412,7 +412,7 @@ auto Socket::SendQueueReaderStatic() -> void
|
||||
});
|
||||
}
|
||||
|
||||
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
|
||||
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int64_t
|
||||
try {
|
||||
// timeout argument not yet implemented
|
||||
|
||||
|
@@ -49,8 +49,8 @@ class Socket final : public fair::mq::Socket
|
||||
auto Bind(const std::string& address) -> bool override;
|
||||
auto Connect(const std::string& address) -> bool override;
|
||||
|
||||
auto Send(MessagePtr& msg, int timeout = 0) -> int override;
|
||||
auto Receive(MessagePtr& msg, int timeout = 0) -> int override;
|
||||
auto Send(MessagePtr& msg, int timeout = 0) -> int64_t override;
|
||||
auto Receive(MessagePtr& msg, int timeout = 0) -> int64_t override;
|
||||
auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
||||
auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
||||
|
||||
|
@@ -357,7 +357,7 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui
|
||||
} break;
|
||||
case Type::dump_config: {
|
||||
stringstream ss;
|
||||
for (const auto pKey : GetPropertyKeys()) {
|
||||
for (const auto& pKey : GetPropertyKeys()) {
|
||||
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n";
|
||||
}
|
||||
Cmds outCmds(make<Config>(id, ss.str()));
|
||||
|
@@ -12,6 +12,7 @@
|
||||
#include <asio/associated_allocator.hpp>
|
||||
#include <asio/associated_executor.hpp>
|
||||
#include <asio/executor_work_guard.hpp>
|
||||
#include <asio/dispatch.hpp>
|
||||
#include <asio/system_executor.hpp>
|
||||
#include <chrono>
|
||||
#include <exception>
|
||||
@@ -69,17 +70,16 @@ struct AsioAsyncOpImpl : AsioAsyncOpImplBase<SignatureArgTypes...>
|
||||
throw RuntimeError("Async operation already completed");
|
||||
}
|
||||
|
||||
GetEx2().dispatch(
|
||||
[=, handler = std::move(fHandler)]() mutable {
|
||||
try {
|
||||
handler(ec, args...);
|
||||
} catch (const std::exception& e) {
|
||||
FAIR_LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
|
||||
} catch (...) {
|
||||
FAIR_LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
|
||||
}
|
||||
},
|
||||
GetAlloc2());
|
||||
asio::dispatch(GetEx2(),
|
||||
[=, handler = std::move(fHandler)]() mutable {
|
||||
try {
|
||||
handler(ec, args...);
|
||||
} catch (const std::exception& e) {
|
||||
FAIR_LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
|
||||
} catch (...) {
|
||||
FAIR_LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
|
||||
}
|
||||
});
|
||||
|
||||
fWork1.reset();
|
||||
fWork2.reset();
|
||||
|
@@ -9,7 +9,7 @@
|
||||
#ifndef FAIR_MQ_SDK_ASIOBASE_H
|
||||
#define FAIR_MQ_SDK_ASIOBASE_H
|
||||
|
||||
#include <asio/executor.hpp>
|
||||
#include <asio/any_io_executor.hpp>
|
||||
#include <fairmq/sdk/Traits.h>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
@@ -18,7 +18,7 @@ namespace fair {
|
||||
namespace mq {
|
||||
namespace sdk {
|
||||
|
||||
using DefaultExecutor = asio::executor;
|
||||
using DefaultExecutor = asio::any_io_executor;
|
||||
using DefaultAllocator = std::allocator<int>;
|
||||
|
||||
/**
|
||||
|
@@ -18,11 +18,13 @@ namespace sdk {
|
||||
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
|
||||
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
|
||||
/// @param env Optional DDSEnv (needed primarily for unit testing)
|
||||
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
|
||||
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
|
||||
std::shared_ptr<dds::tools_api::CSession> nativeSession,
|
||||
DDSEnv env) -> Topology
|
||||
DDSEnv env,
|
||||
bool blockUntilConnected) -> Topology
|
||||
{
|
||||
return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env)};
|
||||
return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env), blockUntilConnected};
|
||||
}
|
||||
|
||||
} // namespace sdk
|
||||
|
@@ -216,18 +216,21 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
|
||||
/// @param topo DDSTopology
|
||||
/// @param session DDSSession
|
||||
BasicTopology(DDSTopology topo, DDSSession session)
|
||||
: BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session))
|
||||
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
|
||||
BasicTopology(DDSTopology topo, DDSSession session, bool blockUntilConnected = false)
|
||||
: BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session), blockUntilConnected)
|
||||
{}
|
||||
|
||||
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
|
||||
/// @param ex I/O executor to be associated
|
||||
/// @param topo DDSTopology
|
||||
/// @param session DDSSession
|
||||
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
|
||||
/// @throws RuntimeError
|
||||
BasicTopology(const Executor& ex,
|
||||
DDSTopology topo,
|
||||
DDSSession session,
|
||||
bool blockUntilConnected = false,
|
||||
Allocator alloc = DefaultAllocator())
|
||||
: AsioBase<Executor, Allocator>(ex, std::move(alloc))
|
||||
, fDDSSession(std::move(session))
|
||||
@@ -235,7 +238,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
, fStateData()
|
||||
, fStateIndex()
|
||||
, fMtx(std::make_unique<std::mutex>())
|
||||
, fStateChangeUnsubscriptionCV(std::make_unique<std::condition_variable>())
|
||||
, fStateChangeSubscriptionsCV(std::make_unique<std::condition_variable>())
|
||||
, fNumStateChangePublishers(0)
|
||||
, fHeartbeatsTimer(asio::system_executor())
|
||||
, fHeartbeatInterval(600000)
|
||||
{
|
||||
@@ -251,6 +255,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
|
||||
fDDSSession.StartDDSService();
|
||||
SubscribeToStateChanges();
|
||||
if (blockUntilConnected) {
|
||||
WaitForPublisherCount(fStateIndex.size());
|
||||
}
|
||||
}
|
||||
|
||||
/// not copyable
|
||||
@@ -284,6 +291,14 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
|
||||
}
|
||||
|
||||
void WaitForPublisherCount(unsigned int number)
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(*fMtx);
|
||||
fStateChangeSubscriptionsCV->wait(lk, [&](){
|
||||
return fNumStateChangePublishers == number;
|
||||
});
|
||||
}
|
||||
|
||||
void SendSubscriptionHeartbeats(const std::error_code& ec)
|
||||
{
|
||||
if (!ec) {
|
||||
@@ -308,13 +323,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
|
||||
|
||||
// wait for all tasks to confirm unsubscription
|
||||
std::unique_lock<std::mutex> lk(*fMtx);
|
||||
fStateChangeUnsubscriptionCV->wait(lk, [&](){
|
||||
unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
|
||||
return fStateData.at(s.second).subscribed_to_state_changes == false;
|
||||
});
|
||||
return count == fStateIndex.size();
|
||||
});
|
||||
WaitForPublisherCount(0);
|
||||
}
|
||||
|
||||
void SubscribeToCommands()
|
||||
@@ -360,11 +369,19 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
DDSTask::Id taskId(cmd.GetTaskId());
|
||||
|
||||
try {
|
||||
std::lock_guard<std::mutex> lk(*fMtx);
|
||||
std::unique_lock<std::mutex> lk(*fMtx);
|
||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||
task.subscribed_to_state_changes = true;
|
||||
if (!task.subscribed_to_state_changes) {
|
||||
task.subscribed_to_state_changes = true;
|
||||
++fNumStateChangePublishers;
|
||||
} else {
|
||||
FAIR_LOG(warn) << "Task '" << task.taskId << "' sent subscription confirmation more than once";
|
||||
}
|
||||
lk.unlock();
|
||||
fStateChangeSubscriptionsCV->notify_one();
|
||||
} catch (const std::exception& e) {
|
||||
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
|
||||
FAIR_LOG(error) << "Possibly no task with id '" << taskId << "'?";
|
||||
}
|
||||
} else {
|
||||
FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
|
||||
@@ -379,9 +396,14 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
try {
|
||||
std::unique_lock<std::mutex> lk(*fMtx);
|
||||
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
|
||||
task.subscribed_to_state_changes = false;
|
||||
if (task.subscribed_to_state_changes) {
|
||||
task.subscribed_to_state_changes = false;
|
||||
--fNumStateChangePublishers;
|
||||
} else {
|
||||
FAIR_LOG(warn) << "Task '" << task.taskId << "' sent unsubscription confirmation more than once";
|
||||
}
|
||||
lk.unlock();
|
||||
fStateChangeUnsubscriptionCV->notify_one();
|
||||
fStateChangeSubscriptionsCV->notify_one();
|
||||
} catch (const std::exception& e) {
|
||||
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
|
||||
}
|
||||
@@ -406,6 +428,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
// if the task is exiting, it will not respond to unsubscription request anymore, set it to false now.
|
||||
if (task.state == DeviceState::Exiting) {
|
||||
task.subscribed_to_state_changes = false;
|
||||
--fNumStateChangePublishers;
|
||||
}
|
||||
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
|
||||
|
||||
@@ -1300,7 +1323,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
|
||||
|
||||
mutable std::unique_ptr<std::mutex> fMtx;
|
||||
|
||||
std::unique_ptr<std::condition_variable> fStateChangeUnsubscriptionCV;
|
||||
std::unique_ptr<std::condition_variable> fStateChangeSubscriptionsCV;
|
||||
unsigned int fNumStateChangePublishers;
|
||||
asio::steady_timer fHeartbeatsTimer;
|
||||
Duration fHeartbeatInterval;
|
||||
|
||||
@@ -1336,9 +1360,11 @@ using Topo = Topology;
|
||||
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
|
||||
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
|
||||
/// @param env Optional DDSEnv (needed primarily for unit testing)
|
||||
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
|
||||
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
|
||||
std::shared_ptr<dds::tools_api::CSession> nativeSession,
|
||||
DDSEnv env = {}) -> Topology;
|
||||
DDSEnv env = {},
|
||||
bool blockUntilConnected = false) -> Topology;
|
||||
|
||||
} // namespace sdk
|
||||
} // namespace mq
|
||||
|
@@ -9,6 +9,7 @@
|
||||
#include <fairmq/sdk/commands/Commands.h>
|
||||
#include <fairmq/States.h>
|
||||
#include <fairmq/SDK.h>
|
||||
#include <fairmq/Tools.h>
|
||||
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
@@ -63,14 +64,25 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
|
||||
if (command == "c") {
|
||||
cout << "> checking state of the devices" << endl;
|
||||
auto const result = topo.GetCurrentState();
|
||||
bool error = false;
|
||||
for (const auto& d : result) {
|
||||
cout << d.taskId << " : " << d.state << endl;
|
||||
if (d.state == sdk::DeviceState::Error) {
|
||||
error = true;
|
||||
}
|
||||
}
|
||||
if (error) {
|
||||
throw runtime_error("Some of the devices are in the Error state");
|
||||
}
|
||||
return;
|
||||
} else if (command == "o") {
|
||||
cout << "> dumping config of " << (path == "" ? "all" : path) << endl;
|
||||
// TODO: extend this regex to return all properties, once command size limitation is removed.
|
||||
auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout));
|
||||
if (result.first != std::error_code()) {
|
||||
cout << "ERROR: GetProperties failed for '" << path << "': " << result.first.message() << endl;
|
||||
throw runtime_error(tools::ToString("GetProperties failed for '", path, "': ", result.first.message()));
|
||||
}
|
||||
for (const auto& d : result.second.devices) {
|
||||
for (auto const& p : d.second.props) {
|
||||
cout << d.first << ": " << p.first << " : " << p.second << endl;
|
||||
@@ -80,11 +92,15 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
|
||||
} else if (command == "p") {
|
||||
if (pKey == "" || pVal == "") {
|
||||
cout << "cannot send property with empty key and/or value! given key: '" << pKey << "', value: '" << pVal << "'." << endl;
|
||||
return;
|
||||
throw runtime_error(tools::ToString("cannot send property with empty key and/or value! given key: '", pKey, "', value: '", pVal, "'."));
|
||||
}
|
||||
const DeviceProperties props{{pKey, pVal}};
|
||||
cout << "> setting properties --> " << (path == "" ? "all" : path) << endl;
|
||||
topo.SetProperties(props, path);
|
||||
auto const result = topo.SetProperties(props, path);
|
||||
if (result.first != std::error_code()) {
|
||||
cout << "ERROR: SetProperties failed for '" << path << "': " << result.first.message() << endl;
|
||||
throw runtime_error(tools::ToString("SetProperties failed for '", path, "': ", result.first.message()));
|
||||
}
|
||||
// give dds time to complete request
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
return;
|
||||
@@ -125,10 +141,11 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
|
||||
} else {
|
||||
cout << "\033[01;32mInvalid input: [" << command << "]\033[0m" << endl;
|
||||
printControlsHelp();
|
||||
return;
|
||||
throw runtime_error(tools::ToString("\033[01;32mInvalid input: [", command, "]\033[0m"));
|
||||
}
|
||||
if (changeStateResult.first != std::error_code()) {
|
||||
cout << "ERROR: ChangeState failed for '" << path << "': " << changeStateResult.first.message() << endl;
|
||||
throw runtime_error(tools::ToString("ERROR: ChangeState failed for '", path, "': ", changeStateResult.first.message()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,7 +165,11 @@ void sendCommand(const string& commandIn, const string& path, unsigned int timeo
|
||||
command = c;
|
||||
|
||||
while (true) {
|
||||
handleCommand(command, path, timeout, topo, pKey, pVal);
|
||||
try {
|
||||
handleCommand(command, path, timeout, topo, pKey, pVal);
|
||||
} catch(exception& e) {
|
||||
cout << "Error: " << e.what() << endl;
|
||||
}
|
||||
cin >> c;
|
||||
command = c;
|
||||
}
|
||||
@@ -209,7 +230,7 @@ try {
|
||||
DDSSession session(sessionID, env);
|
||||
DDSTopology ddsTopo(DDSTopology::Path(topoFile), env);
|
||||
|
||||
Topology topo(ddsTopo, session);
|
||||
Topology topo(ddsTopo, session, true);
|
||||
|
||||
if (targetState != "") {
|
||||
if (command != "") {
|
||||
|
@@ -143,23 +143,19 @@ class Manager
|
||||
} catch(interprocess_exception& bie) {
|
||||
LOG(error) << "something went wrong: " << bie.what();
|
||||
}
|
||||
}
|
||||
|
||||
if (mlockSegment) {
|
||||
LOG(debug) << "Locking the managed segment memory pages...";
|
||||
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
|
||||
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
|
||||
if (mlockSegment) {
|
||||
LOG(debug) << "Locking the managed segment memory pages...";
|
||||
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
|
||||
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
|
||||
}
|
||||
LOG(debug) << "Successfully locked the managed segment memory pages.";
|
||||
}
|
||||
if (zeroSegment) {
|
||||
LOG(debug) << "Zeroing the managed segment free memory...";
|
||||
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
|
||||
LOG(debug) << "Successfully zeroed the managed segment free memory.";
|
||||
}
|
||||
LOG(debug) << "Successfully locked the managed segment memory pages.";
|
||||
}
|
||||
if (zeroSegment) {
|
||||
LOG(debug) << "Zeroing the managed segment free memory...";
|
||||
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
|
||||
LOG(debug) << "Successfully zeroed the managed segment free memory.";
|
||||
}
|
||||
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
|
||||
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||
|
||||
@@ -367,13 +363,20 @@ class Manager
|
||||
}
|
||||
|
||||
for (const auto& e : *fShmSegments) {
|
||||
fair::mq::RegionInfo info;
|
||||
info.managed = true;
|
||||
info.id = e.first;
|
||||
info.event = RegionEvent::created;
|
||||
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
|
||||
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
|
||||
result.push_back(info);
|
||||
// make sure any segments in the session are found
|
||||
GetSegment(e.first);
|
||||
try {
|
||||
fair::mq::RegionInfo info;
|
||||
info.managed = true;
|
||||
info.id = e.first;
|
||||
info.event = RegionEvent::created;
|
||||
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
|
||||
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
|
||||
result.push_back(info);
|
||||
} catch (const std::out_of_range& oor) {
|
||||
LOG(error) << "could not find segment with id " << e.first;
|
||||
LOG(error) << oor.what();
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
@@ -474,7 +477,7 @@ class Manager
|
||||
try {
|
||||
// get region info
|
||||
SegmentInfo segmentInfo = fShmSegments->at(id);
|
||||
LOG(info) << "LOCATED SEGMENT WITH ID '" << id << "'";
|
||||
LOG(debug) << "Located segment with id '" << id << "'";
|
||||
|
||||
using namespace boost::interprocess;
|
||||
|
||||
|
@@ -155,7 +155,7 @@ class Socket final : public fair::mq::Socket
|
||||
}
|
||||
}
|
||||
|
||||
int Send(MessagePtr& msg, const int timeout = -1) override
|
||||
int64_t Send(MessagePtr& msg, const int timeout = -1) override
|
||||
{
|
||||
int flags = 0;
|
||||
if (timeout == 0) {
|
||||
@@ -191,7 +191,7 @@ class Socket final : public fair::mq::Socket
|
||||
return static_cast<int>(TransferResult::error);
|
||||
}
|
||||
|
||||
int Receive(MessagePtr& msg, const int timeout = -1) override
|
||||
int64_t Receive(MessagePtr& msg, const int timeout = -1) override
|
||||
{
|
||||
int flags = 0;
|
||||
if (timeout == 0) {
|
||||
|
@@ -132,7 +132,7 @@ class Socket final : public fair::mq::Socket
|
||||
}
|
||||
}
|
||||
|
||||
int Send(MessagePtr& msg, const int timeout = -1) override
|
||||
int64_t Send(MessagePtr& msg, const int timeout = -1) override
|
||||
{
|
||||
int flags = 0;
|
||||
if (timeout == 0) {
|
||||
@@ -162,7 +162,7 @@ class Socket final : public fair::mq::Socket
|
||||
}
|
||||
}
|
||||
|
||||
int Receive(MessagePtr& msg, const int timeout = -1) override
|
||||
int64_t Receive(MessagePtr& msg, const int timeout = -1) override
|
||||
{
|
||||
int flags = 0;
|
||||
if (timeout == 0) {
|
||||
|
Reference in New Issue
Block a user