Compare commits

..

11 Commits

Author SHA1 Message Date
Alexey Rybalchenko
f46d446d52 shm: do mlock/zeroing under shmem lock 2020-12-04 13:27:45 +01:00
Alexey Rybalchenko
db0937f339 shm: ensure local segments are complete for region subscriptions 2020-11-25 13:41:23 +01:00
Alexey Rybalchenko
bb1ce794b6 Let single message methods also return int64_t 2020-11-17 14:04:20 +01:00
Dennis Klein
9e2373b55d extern/asio: Bump to 1.18.0 2020-11-13 15:55:52 +01:00
Dennis Klein
c51e88e114 CI: Do not run unstable tests on macOS 10.14 hosts 2020-11-13 02:58:09 +01:00
Dennis Klein
f9219dab65 CI: Remove old options 2020-11-13 02:58:09 +01:00
Alexey Rybalchenko
0806720f61 Bump DDS version to 3.5.3 (multiple sessions support) 2020-11-13 02:58:09 +01:00
Alexey Rybalchenko
e39d17d09e Apply suggestions from -Wrange-loop-analysis 2020-11-13 02:58:09 +01:00
Alexey Rybalchenko
a14502242f DDS command UI: return EXIT_FAILURE on errors 2020-11-13 02:58:09 +01:00
Alexey Rybalchenko
d3697ec97b SDK: add WaitForPublisherCount() and related ctor arg 2020-11-13 02:58:09 +01:00
Dennis Klein
73377c5100 CI: Update macOS builds 2020-11-05 15:12:02 +01:00
18 changed files with 142 additions and 82 deletions

View File

@@ -82,7 +82,7 @@ endif()
if(BUILD_DDS_PLUGIN OR BUILD_SDK)
find_package2(PRIVATE DDS REQUIRED
VERSION 3.0
VERSION 3.5.3
)
set(DDS_Boost_COMPONENTS system log log_setup regex filesystem thread)
set(DDS_Boost_VERSION 1.67)
@@ -136,7 +136,7 @@ endif()
if(BUILD_SDK)
find_package2(BUNDLED asio
VERSION 1.13.0
VERSION 1.18.0
)
if(NOT asio_FOUND)
build_bundled(asio extern/asio)

View File

@@ -30,9 +30,6 @@ Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
Set(configure_options "${configure_options};-DBUILD_SDK=ON")
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")
Set(configure_options "${configure_options};-DBoost_NO_BOOST_CMAKE=ON")
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
If(EXTRA_FLAGS)
@@ -60,8 +57,13 @@ Ctest_Configure(BUILD "${CTEST_BINARY_DIRECTORY}"
Ctest_Build(BUILD "${CTEST_BINARY_DIRECTORY}")
unset(exclude_tests)
if($ENV{EXCLUDE_UNSTABLE_DDS_TESTS})
set(exclude_tests EXCLUDE ".*\\.localhost$")
endif()
Ctest_Test(BUILD "${CTEST_BINARY_DIRECTORY}"
# PARALLEL_LEVEL $ENV{number_of_processors}
${exclude_tests}
PARALLEL_LEVEL $ENV{number_of_processors}
RETURN_VALUE _ctest_test_ret_val
)

10
Jenkinsfile vendored
View File

@@ -46,6 +46,11 @@ def jobMatrix(String prefix, List specs, Closure callback) {
echo "export GIT_BRANCH=$JOB_BASE_NAME" >> Dart.cfg
echo "echo \\\$PATH" >> Dart.cfg
'''
if (os =~ /macOS10.14/) {
sh "echo \"export EXCLUDE_UNSTABLE_DDS_TESTS=1\" >> Dart.cfg"
}
sh 'cat Dart.cfg'
callback.call(spec, label)
@@ -74,8 +79,9 @@ pipeline{
steps{
script {
def build_jobs = jobMatrix('build', [
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleLLVM11.0.3', fairsoft: 'fairmq_dev'],
[os: 'Debian8', arch: 'x86_64', compiler: 'gcc9.1.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.14', arch: 'x86_64', compiler: 'AppleClang11.0', fairsoft: 'fairmq_dev'],
[os: 'macOS10.15', arch: 'x86_64', compiler: 'AppleClang12.0', fairsoft: 'fairmq_dev'],
]) { spec, label ->
sh './Dart.sh alfa_ci Dart.cfg'
}

2
extern/asio vendored

View File

@@ -257,7 +257,7 @@ class FairMQChannel
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
return fSocket->Send(msg, sndTimeoutInMs);
@@ -267,7 +267,7 @@ class FairMQChannel
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
return fSocket->Receive(msg, rcvTimeoutInMs);

View File

@@ -129,7 +129,7 @@ class FairMQDevice
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
int64_t Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
}
@@ -140,7 +140,7 @@ class FairMQDevice
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
int Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
int64_t Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
}

View File

@@ -45,8 +45,8 @@ class FairMQSocket
virtual bool Bind(const std::string& address) = 0;
virtual bool Connect(const std::string& address) = 0;
virtual int Send(FairMQMessagePtr& msg, int timeout = -1) = 0;
virtual int Receive(FairMQMessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Send(FairMQMessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Receive(FairMQMessagePtr& msg, int timeout = -1) = 0;
virtual int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = -1) = 0;
virtual int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, int timeout = -1) = 0;

View File

@@ -254,7 +254,7 @@ auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoi
}
}
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int64_t
{
// timeout argument not yet implemented
@@ -412,7 +412,7 @@ auto Socket::SendQueueReaderStatic() -> void
});
}
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int64_t
try {
// timeout argument not yet implemented

View File

@@ -49,8 +49,8 @@ class Socket final : public fair::mq::Socket
auto Bind(const std::string& address) -> bool override;
auto Connect(const std::string& address) -> bool override;
auto Send(MessagePtr& msg, int timeout = 0) -> int override;
auto Receive(MessagePtr& msg, int timeout = 0) -> int override;
auto Send(MessagePtr& msg, int timeout = 0) -> int64_t override;
auto Receive(MessagePtr& msg, int timeout = 0) -> int64_t override;
auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;

View File

@@ -357,7 +357,7 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui
} break;
case Type::dump_config: {
stringstream ss;
for (const auto pKey : GetPropertyKeys()) {
for (const auto& pKey : GetPropertyKeys()) {
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n";
}
Cmds outCmds(make<Config>(id, ss.str()));

View File

@@ -12,6 +12,7 @@
#include <asio/associated_allocator.hpp>
#include <asio/associated_executor.hpp>
#include <asio/executor_work_guard.hpp>
#include <asio/dispatch.hpp>
#include <asio/system_executor.hpp>
#include <chrono>
#include <exception>
@@ -69,17 +70,16 @@ struct AsioAsyncOpImpl : AsioAsyncOpImplBase<SignatureArgTypes...>
throw RuntimeError("Async operation already completed");
}
GetEx2().dispatch(
[=, handler = std::move(fHandler)]() mutable {
try {
handler(ec, args...);
} catch (const std::exception& e) {
FAIR_LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
} catch (...) {
FAIR_LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
}
},
GetAlloc2());
asio::dispatch(GetEx2(),
[=, handler = std::move(fHandler)]() mutable {
try {
handler(ec, args...);
} catch (const std::exception& e) {
FAIR_LOG(error) << "Uncaught exception in AsioAsyncOp completion handler: " << e.what();
} catch (...) {
FAIR_LOG(error) << "Unknown uncaught exception in AsioAsyncOp completion handler.";
}
});
fWork1.reset();
fWork2.reset();

View File

@@ -9,7 +9,7 @@
#ifndef FAIR_MQ_SDK_ASIOBASE_H
#define FAIR_MQ_SDK_ASIOBASE_H
#include <asio/executor.hpp>
#include <asio/any_io_executor.hpp>
#include <fairmq/sdk/Traits.h>
#include <memory>
#include <utility>
@@ -18,7 +18,7 @@ namespace fair {
namespace mq {
namespace sdk {
using DefaultExecutor = asio::executor;
using DefaultExecutor = asio::any_io_executor;
using DefaultAllocator = std::allocator<int>;
/**

View File

@@ -18,11 +18,13 @@ namespace sdk {
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
/// @param env Optional DDSEnv (needed primarily for unit testing)
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
std::shared_ptr<dds::tools_api::CSession> nativeSession,
DDSEnv env) -> Topology
DDSEnv env,
bool blockUntilConnected) -> Topology
{
return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env)};
return {DDSTopo(std::move(nativeTopo), env), DDSSession(std::move(nativeSession), env), blockUntilConnected};
}
} // namespace sdk

View File

@@ -216,18 +216,21 @@ class BasicTopology : public AsioBase<Executor, Allocator>
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
/// @param topo DDSTopology
/// @param session DDSSession
BasicTopology(DDSTopology topo, DDSSession session)
: BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session))
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
BasicTopology(DDSTopology topo, DDSSession session, bool blockUntilConnected = false)
: BasicTopology<Executor, Allocator>(asio::system_executor(), std::move(topo), std::move(session), blockUntilConnected)
{}
/// @brief (Re)Construct a FairMQ topology from an existing DDS topology
/// @param ex I/O executor to be associated
/// @param topo DDSTopology
/// @param session DDSSession
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
/// @throws RuntimeError
BasicTopology(const Executor& ex,
DDSTopology topo,
DDSSession session,
bool blockUntilConnected = false,
Allocator alloc = DefaultAllocator())
: AsioBase<Executor, Allocator>(ex, std::move(alloc))
, fDDSSession(std::move(session))
@@ -235,7 +238,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
, fStateData()
, fStateIndex()
, fMtx(std::make_unique<std::mutex>())
, fStateChangeUnsubscriptionCV(std::make_unique<std::condition_variable>())
, fStateChangeSubscriptionsCV(std::make_unique<std::condition_variable>())
, fNumStateChangePublishers(0)
, fHeartbeatsTimer(asio::system_executor())
, fHeartbeatInterval(600000)
{
@@ -251,6 +255,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
fDDSSession.StartDDSService();
SubscribeToStateChanges();
if (blockUntilConnected) {
WaitForPublisherCount(fStateIndex.size());
}
}
/// not copyable
@@ -284,6 +291,14 @@ class BasicTopology : public AsioBase<Executor, Allocator>
fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
}
void WaitForPublisherCount(unsigned int number)
{
std::unique_lock<std::mutex> lk(*fMtx);
fStateChangeSubscriptionsCV->wait(lk, [&](){
return fNumStateChangePublishers == number;
});
}
void SendSubscriptionHeartbeats(const std::error_code& ec)
{
if (!ec) {
@@ -308,13 +323,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
// wait for all tasks to confirm unsubscription
std::unique_lock<std::mutex> lk(*fMtx);
fStateChangeUnsubscriptionCV->wait(lk, [&](){
unsigned int count = std::count_if(fStateIndex.cbegin(), fStateIndex.cend(), [=](const auto& s) {
return fStateData.at(s.second).subscribed_to_state_changes == false;
});
return count == fStateIndex.size();
});
WaitForPublisherCount(0);
}
void SubscribeToCommands()
@@ -360,11 +369,19 @@ class BasicTopology : public AsioBase<Executor, Allocator>
DDSTask::Id taskId(cmd.GetTaskId());
try {
std::lock_guard<std::mutex> lk(*fMtx);
std::unique_lock<std::mutex> lk(*fMtx);
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
task.subscribed_to_state_changes = true;
if (!task.subscribed_to_state_changes) {
task.subscribed_to_state_changes = true;
++fNumStateChangePublishers;
} else {
FAIR_LOG(warn) << "Task '" << task.taskId << "' sent subscription confirmation more than once";
}
lk.unlock();
fStateChangeSubscriptionsCV->notify_one();
} catch (const std::exception& e) {
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeSubscription const&): " << e.what();
FAIR_LOG(error) << "Possibly no task with id '" << taskId << "'?";
}
} else {
FAIR_LOG(error) << "State change subscription failed for device: " << cmd.GetDeviceId() << ", task id: " << cmd.GetTaskId();
@@ -379,9 +396,14 @@ class BasicTopology : public AsioBase<Executor, Allocator>
try {
std::unique_lock<std::mutex> lk(*fMtx);
DeviceStatus& task = fStateData.at(fStateIndex.at(taskId));
task.subscribed_to_state_changes = false;
if (task.subscribed_to_state_changes) {
task.subscribed_to_state_changes = false;
--fNumStateChangePublishers;
} else {
FAIR_LOG(warn) << "Task '" << task.taskId << "' sent unsubscription confirmation more than once";
}
lk.unlock();
fStateChangeUnsubscriptionCV->notify_one();
fStateChangeSubscriptionsCV->notify_one();
} catch (const std::exception& e) {
FAIR_LOG(error) << "Exception in HandleCmd(cmd::StateChangeUnsubscription const&): " << e.what();
}
@@ -406,6 +428,7 @@ class BasicTopology : public AsioBase<Executor, Allocator>
// if the task is exiting, it will not respond to unsubscription request anymore, set it to false now.
if (task.state == DeviceState::Exiting) {
task.subscribed_to_state_changes = false;
--fNumStateChangePublishers;
}
// FAIR_LOG(debug) << "Updated state entry: taskId=" << taskId << ", state=" << state;
@@ -1300,7 +1323,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
mutable std::unique_ptr<std::mutex> fMtx;
std::unique_ptr<std::condition_variable> fStateChangeUnsubscriptionCV;
std::unique_ptr<std::condition_variable> fStateChangeSubscriptionsCV;
unsigned int fNumStateChangePublishers;
asio::steady_timer fHeartbeatsTimer;
Duration fHeartbeatInterval;
@@ -1336,9 +1360,11 @@ using Topo = Topology;
/// @param nativeSession Existing and initialized CSession (either via create() or attach())
/// @param nativeTopo Existing CTopology that is activated on the given nativeSession
/// @param env Optional DDSEnv (needed primarily for unit testing)
/// @param blockUntilConnected if true, ctor will wait for all tasks to confirm subscriptions
auto MakeTopology(dds::topology_api::CTopology nativeTopo,
std::shared_ptr<dds::tools_api::CSession> nativeSession,
DDSEnv env = {}) -> Topology;
DDSEnv env = {},
bool blockUntilConnected = false) -> Topology;
} // namespace sdk
} // namespace mq

View File

@@ -9,6 +9,7 @@
#include <fairmq/sdk/commands/Commands.h>
#include <fairmq/States.h>
#include <fairmq/SDK.h>
#include <fairmq/Tools.h>
#include <boost/program_options.hpp>
@@ -63,14 +64,25 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
if (command == "c") {
cout << "> checking state of the devices" << endl;
auto const result = topo.GetCurrentState();
bool error = false;
for (const auto& d : result) {
cout << d.taskId << " : " << d.state << endl;
if (d.state == sdk::DeviceState::Error) {
error = true;
}
}
if (error) {
throw runtime_error("Some of the devices are in the Error state");
}
return;
} else if (command == "o") {
cout << "> dumping config of " << (path == "" ? "all" : path) << endl;
// TODO: extend this regex to return all properties, once command size limitation is removed.
auto const result = topo.GetProperties("^(session|id)$", path, std::chrono::milliseconds(timeout));
if (result.first != std::error_code()) {
cout << "ERROR: GetProperties failed for '" << path << "': " << result.first.message() << endl;
throw runtime_error(tools::ToString("GetProperties failed for '", path, "': ", result.first.message()));
}
for (const auto& d : result.second.devices) {
for (auto const& p : d.second.props) {
cout << d.first << ": " << p.first << " : " << p.second << endl;
@@ -80,11 +92,15 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
} else if (command == "p") {
if (pKey == "" || pVal == "") {
cout << "cannot send property with empty key and/or value! given key: '" << pKey << "', value: '" << pVal << "'." << endl;
return;
throw runtime_error(tools::ToString("cannot send property with empty key and/or value! given key: '", pKey, "', value: '", pVal, "'."));
}
const DeviceProperties props{{pKey, pVal}};
cout << "> setting properties --> " << (path == "" ? "all" : path) << endl;
topo.SetProperties(props, path);
auto const result = topo.SetProperties(props, path);
if (result.first != std::error_code()) {
cout << "ERROR: SetProperties failed for '" << path << "': " << result.first.message() << endl;
throw runtime_error(tools::ToString("SetProperties failed for '", path, "': ", result.first.message()));
}
// give dds time to complete request
this_thread::sleep_for(chrono::milliseconds(100));
return;
@@ -125,10 +141,11 @@ void handleCommand(const string& command, const string& path, unsigned int timeo
} else {
cout << "\033[01;32mInvalid input: [" << command << "]\033[0m" << endl;
printControlsHelp();
return;
throw runtime_error(tools::ToString("\033[01;32mInvalid input: [", command, "]\033[0m"));
}
if (changeStateResult.first != std::error_code()) {
cout << "ERROR: ChangeState failed for '" << path << "': " << changeStateResult.first.message() << endl;
throw runtime_error(tools::ToString("ERROR: ChangeState failed for '", path, "': ", changeStateResult.first.message()));
}
}
@@ -148,7 +165,11 @@ void sendCommand(const string& commandIn, const string& path, unsigned int timeo
command = c;
while (true) {
handleCommand(command, path, timeout, topo, pKey, pVal);
try {
handleCommand(command, path, timeout, topo, pKey, pVal);
} catch(exception& e) {
cout << "Error: " << e.what() << endl;
}
cin >> c;
command = c;
}
@@ -209,7 +230,7 @@ try {
DDSSession session(sessionID, env);
DDSTopology ddsTopo(DDSTopology::Path(topoFile), env);
Topology topo(ddsTopo, session);
Topology topo(ddsTopo, session, true);
if (targetState != "") {
if (command != "") {

View File

@@ -143,23 +143,19 @@ class Manager
} catch(interprocess_exception& bie) {
LOG(error) << "something went wrong: " << bie.what();
}
}
if (mlockSegment) {
LOG(debug) << "Locking the managed segment memory pages...";
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
if (mlockSegment) {
LOG(debug) << "Locking the managed segment memory pages...";
if (mlock(boost::apply_visitor(SegmentAddress{}, fSegments.at(fSegmentId)), boost::apply_visitor(SegmentSize{}, fSegments.at(fSegmentId))) == -1) {
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked the managed segment memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the managed segment free memory...";
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
LOG(debug) << "Successfully zeroed the managed segment free memory.";
}
LOG(debug) << "Successfully locked the managed segment memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the managed segment free memory...";
boost::apply_visitor(SegmentMemoryZeroer{}, fSegments.at(fSegmentId));
LOG(debug) << "Successfully zeroed the managed segment free memory.";
}
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
@@ -367,13 +363,20 @@ class Manager
}
for (const auto& e : *fShmSegments) {
fair::mq::RegionInfo info;
info.managed = true;
info.id = e.first;
info.event = RegionEvent::created;
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
result.push_back(info);
// make sure any segments in the session are found
GetSegment(e.first);
try {
fair::mq::RegionInfo info;
info.managed = true;
info.id = e.first;
info.event = RegionEvent::created;
info.ptr = boost::apply_visitor(SegmentAddress{}, fSegments.at(e.first));
info.size = boost::apply_visitor(SegmentSize{}, fSegments.at(e.first));
result.push_back(info);
} catch (const std::out_of_range& oor) {
LOG(error) << "could not find segment with id " << e.first;
LOG(error) << oor.what();
}
}
return result;
@@ -474,7 +477,7 @@ class Manager
try {
// get region info
SegmentInfo segmentInfo = fShmSegments->at(id);
LOG(info) << "LOCATED SEGMENT WITH ID '" << id << "'";
LOG(debug) << "Located segment with id '" << id << "'";
using namespace boost::interprocess;

View File

@@ -155,7 +155,7 @@ class Socket final : public fair::mq::Socket
}
}
int Send(MessagePtr& msg, const int timeout = -1) override
int64_t Send(MessagePtr& msg, const int timeout = -1) override
{
int flags = 0;
if (timeout == 0) {
@@ -191,7 +191,7 @@ class Socket final : public fair::mq::Socket
return static_cast<int>(TransferResult::error);
}
int Receive(MessagePtr& msg, const int timeout = -1) override
int64_t Receive(MessagePtr& msg, const int timeout = -1) override
{
int flags = 0;
if (timeout == 0) {

View File

@@ -132,7 +132,7 @@ class Socket final : public fair::mq::Socket
}
}
int Send(MessagePtr& msg, const int timeout = -1) override
int64_t Send(MessagePtr& msg, const int timeout = -1) override
{
int flags = 0;
if (timeout == 0) {
@@ -162,7 +162,7 @@ class Socket final : public fair::mq::Socket
}
}
int Receive(MessagePtr& msg, const int timeout = -1) override
int64_t Receive(MessagePtr& msg, const int timeout = -1) override
{
int flags = 0;
if (timeout == 0) {