mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
6 Commits
v1.4.26
...
ofi_dev_st
Author | SHA1 | Date | |
---|---|---|---|
|
92632a022c | ||
|
bd5105d609 | ||
|
080dd0a9df | ||
|
a9dfe39bf7 | ||
|
e1b1e5e21b | ||
|
763c21ffdd |
@@ -25,7 +25,7 @@ target_link_libraries(fairmq-ex-readout-builder PRIVATE ExampleReadoutLib)
|
||||
add_executable(fairmq-ex-readout-sink runSink.cxx)
|
||||
target_link_libraries(fairmq-ex-readout-sink PRIVATE ExampleReadoutLib)
|
||||
|
||||
add_custom_target(Examplereadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-sink)
|
||||
add_custom_target(ExampleReadout DEPENDS fairmq-ex-readout-sampler fairmq-ex-readout-builder fairmq-ex-readout-sink)
|
||||
|
||||
set(EX_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR})
|
||||
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
|
||||
@@ -36,6 +36,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-readout.sh.in ${CMAKE
|
||||
install(
|
||||
TARGETS
|
||||
fairmq-ex-readout-sampler
|
||||
fairmq-ex-readout-builder
|
||||
fairmq-ex-readout-sink
|
||||
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
|
@@ -1,5 +0,0 @@
|
||||
Region example
|
||||
==============
|
||||
|
||||
This example demonstrates the use of a more advanced feature - UnmanagedRegion, that can be used to create a buffer through one of FairMQ transports. The contents of this buffer are managed by the user, who can also create messages out of sub-buffers of the created buffer. Such feature can be interesting in environments that have special requirements by the hardware that writes the data, to keep the transfer efficient (e.g. shared memory).
|
||||
|
@@ -35,28 +35,23 @@ void Sampler::InitTask()
|
||||
fMsgSize = fConfig->GetValue<int>("msg-size");
|
||||
fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
|
||||
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data1",
|
||||
0,
|
||||
10000000,
|
||||
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
|
||||
--fNumUnackedMsgs;
|
||||
if (fMaxIterations > 0)
|
||||
{
|
||||
LOG(debug) << "Received ack";
|
||||
}
|
||||
}
|
||||
));
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegion(10000000,
|
||||
[this](void* /*data*/, size_t /*size*/, void* /*hint*/) { // callback to be called when message buffers no longer needed by transport
|
||||
--fNumUnackedMsgs;
|
||||
if (fMaxIterations > 0)
|
||||
{
|
||||
LOG(debug) << "Received ack";
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
bool Sampler::ConditionalRun()
|
||||
{
|
||||
FairMQMessagePtr msg(NewMessageFor("data1", // channel
|
||||
0, // sub-channel
|
||||
fRegion, // region
|
||||
fRegion->GetData(), // ptr within region
|
||||
fMsgSize, // offset from ptr
|
||||
nullptr // hint
|
||||
));
|
||||
FairMQMessagePtr msg(NewMessage(fRegion, // region
|
||||
fRegion->GetData(), // ptr within region
|
||||
fMsgSize, // offset from ptr
|
||||
nullptr // hint
|
||||
));
|
||||
|
||||
if (Send(msg, "data1", 0) > 0)
|
||||
{
|
||||
|
@@ -2,7 +2,7 @@
|
||||
|
||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||
|
||||
msgSize="1000000"
|
||||
msgSize="2000000"
|
||||
|
||||
if [[ $1 =~ ^[0-9]+$ ]]; then
|
||||
msgSize=$1
|
||||
@@ -11,6 +11,7 @@ fi
|
||||
SAMPLER="fairmq-ex-readout-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --severity debug"
|
||||
SAMPLER+=" --transport shmem"
|
||||
SAMPLER+=" --msg-size $msgSize"
|
||||
# SAMPLER+=" --rate 10"
|
||||
SAMPLER+=" --channel-config name=data1,type=pair,method=bind,address=tcp://127.0.0.1:7777,transport=shmem"
|
||||
@@ -26,5 +27,6 @@ xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER &
|
||||
SINK="fairmq-ex-readout-sink"
|
||||
SINK+=" --id sink1"
|
||||
SINK+=" --severity debug"
|
||||
SINK+=" --ofi-size-hint $msgSize"
|
||||
SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi"
|
||||
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK &
|
||||
|
@@ -227,9 +227,9 @@ class FairMQDevice
|
||||
return GetChannel(channel, index).NewSimpleMessage(data);
|
||||
}
|
||||
|
||||
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size)
|
||||
FairMQUnmanagedRegionPtr NewUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr)
|
||||
{
|
||||
return Transport()->CreateUnmanagedRegion(size);
|
||||
return Transport()->CreateUnmanagedRegion(size, callback);
|
||||
}
|
||||
|
||||
FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, const size_t size, FairMQRegionCallback callback = nullptr)
|
||||
|
@@ -37,6 +37,7 @@ Context::Context(FairMQTransportFactory& sendFactory,
|
||||
: fIoWork(fIoContext)
|
||||
, fReceiveFactory(receiveFactory)
|
||||
, fSendFactory(sendFactory)
|
||||
, fSizeHint(2000000) // temporary hack to provide expected message size for receive
|
||||
{
|
||||
InitThreadPool(numberIoThreads);
|
||||
}
|
||||
|
@@ -72,6 +72,8 @@ class Context
|
||||
auto Reset() -> void;
|
||||
auto MakeReceiveMessage(size_t size) -> MessagePtr;
|
||||
auto MakeSendMessage(size_t size) -> MessagePtr;
|
||||
size_t GetSizeHint() { return fSizeHint; } // temporary hack to provide expected message size for receive
|
||||
void SetSizeHint(size_t size) { fSizeHint = size; } // temporary hack to provide expected message size for receive
|
||||
|
||||
private:
|
||||
boost::asio::io_context fIoContext;
|
||||
@@ -80,6 +82,8 @@ class Context
|
||||
FairMQTransportFactory& fReceiveFactory;
|
||||
FairMQTransportFactory& fSendFactory;
|
||||
|
||||
size_t fSizeHint; // temporary hack to provide expected message size for receive
|
||||
|
||||
auto InitThreadPool(int numberIoThreads) -> void;
|
||||
}; /* class Context */
|
||||
|
||||
|
@@ -43,6 +43,8 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size)
|
||||
, fPmr(pmr)
|
||||
{
|
||||
if (size) {
|
||||
// static void* buffer = fPmr->allocate(size);
|
||||
// fData = buffer;
|
||||
fData = fPmr->allocate(size);
|
||||
assert(fData);
|
||||
}
|
||||
|
@@ -44,7 +44,6 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
|
||||
, fOfiDomain(nullptr)
|
||||
, fPassiveEndpoint(nullptr)
|
||||
, fDataEndpoint(nullptr)
|
||||
, fControlEndpoint(nullptr)
|
||||
, fId(id + "." + name + "." + type)
|
||||
, fBytesTx(0)
|
||||
, fBytesRx(0)
|
||||
@@ -52,13 +51,13 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
|
||||
, fMessagesRx(0)
|
||||
, fSndTimeout(100)
|
||||
, fRcvTimeout(100)
|
||||
, fSendQueueWrite(fContext.GetIoContext(), ZMQ_PUSH)
|
||||
, fSendQueueRead(fContext.GetIoContext(), ZMQ_PULL)
|
||||
, fRecvQueueWrite(fContext.GetIoContext(), ZMQ_PUSH)
|
||||
, fRecvQueueRead(fContext.GetIoContext(), ZMQ_PULL)
|
||||
, fSendSem(fContext.GetIoContext(), 300)
|
||||
, fRecvSem(fContext.GetIoContext(), 300)
|
||||
, fNeedOfiMemoryRegistration(false)
|
||||
, fBound(false)
|
||||
, fConnected(false)
|
||||
{
|
||||
if (type != "pair") {
|
||||
throw SocketError{tools::ToString("Socket type '", type, "' not implemented for ofi transport.")};
|
||||
@@ -66,22 +65,14 @@ Socket::Socket(Context& context, const string& type, const string& name, const s
|
||||
// TODO wire this up with config
|
||||
azmq::socket::snd_hwm send_max(300);
|
||||
azmq::socket::rcv_hwm recv_max(300);
|
||||
fSendQueueRead.set_option(send_max);
|
||||
fSendQueueRead.set_option(recv_max);
|
||||
fSendQueueWrite.set_option(send_max);
|
||||
fSendQueueWrite.set_option(recv_max);
|
||||
fRecvQueueRead.set_option(send_max);
|
||||
fRecvQueueRead.set_option(recv_max);
|
||||
fRecvQueueWrite.set_option(send_max);
|
||||
fRecvQueueWrite.set_option(recv_max);
|
||||
|
||||
// Setup internal queue
|
||||
auto hashed_id = std::hash<std::string>()(fId);
|
||||
auto hashed_id = hash<string>()(fId);
|
||||
auto queue_id = tools::ToString("inproc://TXQUEUE", hashed_id);
|
||||
LOG(debug) << "OFI transport (" << fId << "): " << "Binding SQR: " << queue_id;
|
||||
fSendQueueRead.bind(queue_id);
|
||||
LOG(debug) << "OFI transport (" << fId << "): " << "Connecting SQW: " << queue_id;
|
||||
fSendQueueWrite.connect(queue_id);
|
||||
queue_id = tools::ToString("inproc://RXQUEUE", hashed_id);
|
||||
LOG(debug) << "OFI transport (" << fId << "): " << "Binding RQR: " << queue_id;
|
||||
fRecvQueueRead.bind(queue_id);
|
||||
@@ -103,9 +94,9 @@ auto Socket::InitOfi(Address addr) -> void
|
||||
hints.set_provider("verbs");
|
||||
}
|
||||
if (fRemoteAddr == addr) {
|
||||
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), 0, hints);
|
||||
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), to_string(addr.Port).c_str(), 0, hints);
|
||||
} else {
|
||||
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), std::to_string(addr.Port).c_str(), FI_SOURCE, hints);
|
||||
fOfiInfo = tools::make_unique<asiofi::info>(addr.Ip.c_str(), to_string(addr.Port).c_str(), FI_SOURCE, hints);
|
||||
}
|
||||
|
||||
LOG(debug) << "OFI transport: " << *fOfiInfo;
|
||||
@@ -118,6 +109,7 @@ auto Socket::InitOfi(Address addr) -> void
|
||||
|
||||
auto Socket::Bind(const string& addr) -> bool
|
||||
try {
|
||||
fBound = false;
|
||||
fLocalAddr = Context::VerifyAddress(addr);
|
||||
if (fLocalAddr.Protocol == "verbs") {
|
||||
fNeedOfiMemoryRegistration = true;
|
||||
@@ -128,66 +120,39 @@ try {
|
||||
fPassiveEndpoint = tools::make_unique<asiofi::passive_endpoint>(fContext.GetIoContext(), *fOfiFabric);
|
||||
//fPassiveEndpoint->set_local_address(Context::ConvertAddress(fLocalAddr));
|
||||
|
||||
BindControlEndpoint();
|
||||
|
||||
return true;
|
||||
}
|
||||
// TODO catch the correct ofi error
|
||||
catch (const SilentSocketError& e)
|
||||
{
|
||||
// do not print error in this case, this is handled by FairMQDevice
|
||||
// in case no connection could be established after trying a number of random ports from a range.
|
||||
return false;
|
||||
}
|
||||
catch (const SocketError& e)
|
||||
{
|
||||
LOG(error) << "OFI transport: " << e.what();
|
||||
return false;
|
||||
}
|
||||
|
||||
auto Socket::BindControlEndpoint() -> void
|
||||
{
|
||||
assert(!fControlEndpoint);
|
||||
|
||||
fPassiveEndpoint->listen([&](asiofi::info&& info) {
|
||||
LOG(debug) << "OFI transport (" << fId
|
||||
<< "): control band connection request received. Accepting ...";
|
||||
fControlEndpoint = tools::make_unique<asiofi::connected_endpoint>(
|
||||
fContext.GetIoContext(), *fOfiDomain, info);
|
||||
fControlEndpoint->enable();
|
||||
fControlEndpoint->accept([&]() {
|
||||
LOG(debug) << "OFI transport (" << fId << "): control band connection accepted.";
|
||||
|
||||
BindDataEndpoint();
|
||||
});
|
||||
});
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): control band bound to " << fLocalAddr;
|
||||
}
|
||||
|
||||
auto Socket::BindDataEndpoint() -> void
|
||||
{
|
||||
assert(!fDataEndpoint);
|
||||
|
||||
fPassiveEndpoint->listen([&](asiofi::info&& info) {
|
||||
LOG(debug) << "OFI transport (" << fId
|
||||
<< "): data band connection request received. Accepting ...";
|
||||
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(
|
||||
fContext.GetIoContext(), *fOfiDomain, info);
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ...";
|
||||
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain, info);
|
||||
fDataEndpoint->enable();
|
||||
fDataEndpoint->accept([&]() {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
|
||||
|
||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
|
||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
||||
boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this));
|
||||
fBound = true;
|
||||
});
|
||||
});
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalAddr;
|
||||
|
||||
while (!fBound) {
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (const SilentSocketError& e) {// TODO catch the correct ofi error
|
||||
// do not print error in this case, this is handled by FairMQDevice
|
||||
// in case no connection could be established after trying a number of random ports from a range.
|
||||
return false;
|
||||
} catch (const SocketError& e) {
|
||||
LOG(error) << "OFI transport: " << e.what();
|
||||
return false;
|
||||
}
|
||||
|
||||
auto Socket::Connect(const string& address) -> bool
|
||||
try {
|
||||
fConnected = false;
|
||||
fRemoteAddr = Context::VerifyAddress(address);
|
||||
if (fRemoteAddr.Protocol == "verbs") {
|
||||
fNeedOfiMemoryRegistration = true;
|
||||
@@ -195,125 +160,47 @@ try {
|
||||
|
||||
InitOfi(fRemoteAddr);
|
||||
|
||||
ConnectEndpoint(fControlEndpoint, Band::Control);
|
||||
assert(!fDataEndpoint);
|
||||
|
||||
fDataEndpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
|
||||
fDataEndpoint->enable();
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): Sending data band connection request to " << fRemoteAddr;
|
||||
|
||||
fDataEndpoint->connect(Context::ConvertAddress(fRemoteAddr), [&](asiofi::eq::event event) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band conn event happened";
|
||||
if (event == asiofi::eq::event::connected) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): data band connected.";
|
||||
boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this));
|
||||
fConnected = true;
|
||||
} else {
|
||||
LOG(error) << "Could not connect on the first try";
|
||||
}
|
||||
});
|
||||
|
||||
while (!fConnected) {
|
||||
this_thread::sleep_for(chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
catch (const SilentSocketError& e)
|
||||
{
|
||||
} catch (const SilentSocketError& e) {
|
||||
// do not print error in this case, this is handled by FairMQDevice
|
||||
return false;
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << "OFI transport: " << e.what();
|
||||
return false;
|
||||
}
|
||||
|
||||
auto Socket::ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void
|
||||
{
|
||||
assert(!endpoint);
|
||||
|
||||
std::string band(type == Band::Control ? "control" : "data");
|
||||
|
||||
endpoint = tools::make_unique<asiofi::connected_endpoint>(fContext.GetIoContext(), *fOfiDomain);
|
||||
endpoint->enable();
|
||||
|
||||
LOG(debug) << "OFI transport (" << fId << "): Sending " << band << " band connection request to " << fRemoteAddr;
|
||||
|
||||
endpoint->connect(Context::ConvertAddress(fRemoteAddr), [&, band, type](asiofi::eq::event event) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): " << band << " band conn event happened";
|
||||
if (event == asiofi::eq::event::connected) {
|
||||
LOG(debug) << "OFI transport (" << fId << "): " << band << " band connected.";
|
||||
if (type == Band::Control) {
|
||||
ConnectEndpoint(fDataEndpoint, Band::Data);
|
||||
} else {
|
||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
|
||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
||||
}
|
||||
} else {
|
||||
LOG(error) << "asdf";
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// auto Socket::ReceiveDataAddressAnnouncement() -> void
|
||||
// {
|
||||
// azmq::message ctrl;
|
||||
// auto recv = fControlEndpoint.receive(ctrl);
|
||||
// assert(recv == sizeof(DataAddressAnnouncement)); (void)recv;
|
||||
// auto daa(static_cast<const DataAddressAnnouncement*>(ctrl.data()));
|
||||
// assert(daa->type == ControlMessageType::DataAddressAnnouncement);
|
||||
//
|
||||
// sockaddr_in remoteAddr;
|
||||
// remoteAddr.sin_family = AF_INET;
|
||||
// remoteAddr.sin_port = daa->port;
|
||||
// remoteAddr.sin_addr.s_addr = daa->ipv4;
|
||||
//
|
||||
// auto addr = Context::ConvertAddress(remoteAddr);
|
||||
// addr.Protocol = fRemoteDataAddr.Protocol;
|
||||
// LOG(debug) << "OFI transport (" << fId << "): Data address announcement of remote endpoint received: " << addr;
|
||||
// fRemoteDataAddr = addr;
|
||||
// }
|
||||
//
|
||||
// auto Socket::AnnounceDataAddress() -> void
|
||||
// {
|
||||
// fLocalDataAddr = fDataEndpoint->get_local_address();
|
||||
// LOG(debug) << "Address of local ofi endpoint in socket " << fId << ": " << Context::ConvertAddress(fLocalDataAddr);
|
||||
//
|
||||
// Create new data address announcement message
|
||||
// auto daa = MakeControlMessage<DataAddressAnnouncement>();
|
||||
// auto addr = Context::ConvertAddress(fLocalDataAddr);
|
||||
// daa.ipv4 = addr.sin_addr.s_addr;
|
||||
// daa.port = addr.sin_port;
|
||||
//
|
||||
// auto sent = fControlEndpoint.send(boost::asio::buffer(daa));
|
||||
// assert(sent == sizeof(addr)); (void)sent;
|
||||
//
|
||||
// LOG(debug) << "OFI transport (" << fId << "): data band address " << fLocalDataAddr << " announced.";
|
||||
// }
|
||||
|
||||
auto Socket::Send(MessagePtr& msg, const int /*timeout*/) -> int
|
||||
{
|
||||
// LOG(debug) << "OFI transport (" << fId << "): ENTER Send: data=" << msg->GetData() << ",size=" << msg->GetSize();
|
||||
|
||||
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg)));
|
||||
try {
|
||||
auto res = fSendQueueWrite.send(boost::asio::const_buffer(msgptr, sizeof(MessagePtr)), 0);
|
||||
|
||||
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Send";
|
||||
return res;
|
||||
} catch (const std::exception& e) {
|
||||
msg = std::move(*msgptr);
|
||||
LOG(error) << e.what();
|
||||
return -1;
|
||||
} catch (const boost::system::error_code& e) {
|
||||
msg = std::move(*msgptr);
|
||||
LOG(error) << e;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
|
||||
{
|
||||
// LOG(debug) << "OFI transport (" << fId << "): ENTER Receive";
|
||||
|
||||
try {
|
||||
azmq::message zmsg;
|
||||
auto recv = fRecvQueueRead.receive(zmsg);
|
||||
|
||||
size_t size(0);
|
||||
if (recv > 0) {
|
||||
msg = std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data())));
|
||||
size = msg->GetSize();
|
||||
}
|
||||
|
||||
fBytesRx += size;
|
||||
fMessagesRx++;
|
||||
|
||||
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive";
|
||||
fSendSem.wait();
|
||||
size_t size = msg->GetSize();
|
||||
OnSend(msg);
|
||||
return size;
|
||||
} catch (const std::exception& e) {
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << e.what();
|
||||
return -1;
|
||||
} catch (const boost::system::error_code& e) {
|
||||
@@ -322,385 +209,136 @@ auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
|
||||
}
|
||||
}
|
||||
|
||||
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
|
||||
auto Socket::OnSend(MessagePtr& msg) -> void
|
||||
{
|
||||
return SendImpl(msgVec, 0, timeout);
|
||||
}
|
||||
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend";
|
||||
|
||||
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t
|
||||
{
|
||||
return ReceiveImpl(msgVec, 0, timeout);
|
||||
}
|
||||
|
||||
auto Socket::SendQueueReader() -> void
|
||||
{
|
||||
fSendSem.async_wait([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): < Wait fSendSem=" <<
|
||||
// fSendSem.get_value();
|
||||
fSendQueueRead.async_receive([&](const boost::system::error_code& ec2,
|
||||
azmq::message& zmsg,
|
||||
size_t bytes_transferred) {
|
||||
if (!ec2) {
|
||||
OnSend(zmsg, bytes_transferred);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
auto Socket::OnSend(azmq::message& zmsg, size_t /*bytes_transferred*/) -> void
|
||||
{
|
||||
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend: bytes_transferred=" << bytes_transferred;
|
||||
|
||||
MessagePtr msg(std::move(*(static_cast<MessagePtr*>(zmsg.buffer().data()))));
|
||||
auto size = msg->GetSize();
|
||||
|
||||
// LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize();
|
||||
|
||||
// Create and send control message
|
||||
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
|
||||
ctrl->size = size;
|
||||
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
|
||||
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
||||
|
||||
if (fNeedOfiMemoryRegistration) {
|
||||
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::send);
|
||||
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
|
||||
auto desc = mr.desc();
|
||||
fControlEndpoint->send(
|
||||
ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control message sent";
|
||||
|
||||
fDataEndpoint->send(buffer, desc, [&, size, msg2 = move(msg), mr2 = move(mr)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
|
||||
fBytesTx += size;
|
||||
fMessagesTx++;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
} else {
|
||||
fControlEndpoint->send(ctrl_msg,
|
||||
[&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Control
|
||||
// message sent";
|
||||
});
|
||||
}
|
||||
|
||||
if (size) {
|
||||
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
||||
|
||||
if (fNeedOfiMemoryRegistration) {
|
||||
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send);
|
||||
auto desc = mr.desc();
|
||||
|
||||
fDataEndpoint->send(buffer,
|
||||
desc,
|
||||
[&, size, msg2 = std::move(msg), mr2 = std::move(mr)](
|
||||
boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data
|
||||
// buffer sent";
|
||||
fBytesTx += size;
|
||||
fMessagesTx++;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >
|
||||
// Signal fSendSem=" << fSendSem.get_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
} else {
|
||||
fDataEndpoint->send(
|
||||
buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
|
||||
fBytesTx += size;
|
||||
fMessagesTx++;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem="
|
||||
// << fSendSem.get_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
++fMessagesTx;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
||||
}
|
||||
fDataEndpoint->send(buffer, [&, size, msg2 = move(msg)](boost::asio::mutable_buffer) mutable {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): >>>>> Data buffer sent";
|
||||
fBytesTx += size;
|
||||
fMessagesTx++;
|
||||
fSendSem.async_signal([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): > Signal fSendSem=" << fSendSem.get_value();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this));
|
||||
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnSend";
|
||||
}
|
||||
|
||||
auto Socket::RecvControlQueueReader() -> void
|
||||
auto Socket::Receive(MessagePtr& msg, const int /*timeout*/) -> int
|
||||
try {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): ENTER Receive";
|
||||
azmq::message zmsg;
|
||||
auto recv = fRecvQueueRead.receive(zmsg);
|
||||
|
||||
size_t size = 0;
|
||||
if (recv > 0) {
|
||||
msg = move(*(static_cast<MessagePtr*>(zmsg.buffer().data())));
|
||||
size = msg->GetSize();
|
||||
}
|
||||
|
||||
fBytesRx += size;
|
||||
fMessagesRx++;
|
||||
|
||||
// LOG(debug) << "OFI transport (" << fId << "): LEAVE Receive";
|
||||
return size;
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << e.what();
|
||||
return -1;
|
||||
} catch (const boost::system::error_code& e) {
|
||||
LOG(error) << e;
|
||||
return -1;
|
||||
}
|
||||
|
||||
auto Socket::Receive(vector<MessagePtr>& msgVec, const int timeout) -> int64_t
|
||||
{
|
||||
return ReceiveImpl(msgVec, 0, timeout);
|
||||
}
|
||||
|
||||
auto Socket::RecvQueueReader() -> void
|
||||
{
|
||||
fRecvSem.async_wait([&](const boost::system::error_code& ec) {
|
||||
if (!ec) {
|
||||
auto ctrl = MakeControlMessageWithPmr<PostBuffer>(&fControlMemPool);
|
||||
auto ctrl_msg = boost::asio::mutable_buffer(ctrl.get(), sizeof(PostBuffer));
|
||||
static size_t size = fContext.GetSizeHint(); // temporary hack to provide expected message size for receive
|
||||
|
||||
auto msg = fContext.MakeReceiveMessage(size);
|
||||
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
||||
|
||||
if (fNeedOfiMemoryRegistration) {
|
||||
asiofi::memory_region mr(*fOfiDomain, ctrl_msg, asiofi::mr::access::recv);
|
||||
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
|
||||
auto desc = mr.desc();
|
||||
|
||||
fControlEndpoint->recv(
|
||||
ctrl_msg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)](boost::asio::mutable_buffer) mutable {
|
||||
OnRecvControl(std::move(ctrl2));
|
||||
fDataEndpoint->recv(buffer, desc, [&, msg2 = move(msg), mr2 = move(mr)](boost::asio::mutable_buffer) mutable {
|
||||
MessagePtr* msgptr(new std::unique_ptr<Message>(move(msg2)));
|
||||
fRecvQueueWrite.async_send(azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec2, size_t /*bytes_transferred2*/) {
|
||||
if (!ec2) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec3) {
|
||||
if (!ec3) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
} else {
|
||||
fControlEndpoint->recv(
|
||||
ctrl_msg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {
|
||||
OnRecvControl(std::move(ctrl2));
|
||||
});
|
||||
} else {
|
||||
fDataEndpoint->recv(buffer, [&, msg2 = move(msg)](boost::asio::mutable_buffer) mutable {
|
||||
MessagePtr* msgptr(new std::unique_ptr<Message>(move(msg2)));
|
||||
fRecvQueueWrite.async_send(azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))), [&](const boost::system::error_code& ec2, size_t /*bytes_transferred2*/) {
|
||||
if (!ec2) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): <<<<< Data buffer received, bytes_transferred2=" << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec3) {
|
||||
if (!ec3) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
boost::asio::post(fContext.GetIoContext(), bind(&Socket::RecvQueueReader, this));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
auto Socket::OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void
|
||||
auto Socket::Send(vector<MessagePtr>& msgVec, const int timeout) -> int64_t
|
||||
{
|
||||
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnRecvControl";
|
||||
|
||||
auto size = ctrl->size;
|
||||
// LOG(debug) << "OFI transport (" << fId << "): OnRecvControl: PostBuffer.size=" << size;
|
||||
|
||||
// Receive data
|
||||
if (size) {
|
||||
auto msg = fContext.MakeReceiveMessage(size);
|
||||
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
||||
|
||||
if (fNeedOfiMemoryRegistration) {
|
||||
asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv);
|
||||
auto desc = mr.desc();
|
||||
|
||||
fDataEndpoint->recv(
|
||||
buffer,
|
||||
desc,
|
||||
[&, msg2 = std::move(msg), mr2 = std::move(mr)](
|
||||
boost::asio::mutable_buffer) mutable {
|
||||
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg2)));
|
||||
fRecvQueueWrite.async_send(
|
||||
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
|
||||
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId
|
||||
// << "): <<<<< Data buffer received, bytes_transferred2="
|
||||
// << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
|
||||
if (!ec2) {
|
||||
//LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
} else {
|
||||
fDataEndpoint->recv(
|
||||
buffer, [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable {
|
||||
MessagePtr* msgptr(new std::unique_ptr<Message>(std::move(msg2)));
|
||||
fRecvQueueWrite.async_send(
|
||||
azmq::message(boost::asio::const_buffer(msgptr, sizeof(MessagePtr))),
|
||||
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId
|
||||
// << "): <<<<< Data buffer received, bytes_transferred2="
|
||||
// << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
|
||||
if (!ec2) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
fRecvQueueWrite.async_send(
|
||||
azmq::message(boost::asio::const_buffer(nullptr, 0)),
|
||||
[&](const boost::system::error_code& ec, size_t /*bytes_transferred2*/) {
|
||||
if (!ec) {
|
||||
// LOG(debug) << "OFI transport (" << fId
|
||||
// << "): <<<<< Data buffer received, bytes_transferred2="
|
||||
// << bytes_transferred2;
|
||||
fRecvSem.async_signal([&](const boost::system::error_code& ec2) {
|
||||
if (!ec2) {
|
||||
// LOG(debug) << "OFI transport (" << fId << "): < Signal fRecvSem";
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this));
|
||||
|
||||
// LOG(debug) << "OFI transport (" << fId << "): LEAVE OnRecvControl";
|
||||
return SendImpl(msgVec, 0, timeout);
|
||||
}
|
||||
|
||||
auto Socket::SendImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
|
||||
{
|
||||
throw SocketError{"Not yet implemented."};
|
||||
// const unsigned int vecSize = msgVec.size();
|
||||
// int elapsed = 0;
|
||||
//
|
||||
// // Sending vector typicaly handles more then one part
|
||||
// if (vecSize > 1)
|
||||
// {
|
||||
// int64_t totalSize = 0;
|
||||
// int nbytes = -1;
|
||||
// bool repeat = false;
|
||||
//
|
||||
// while (true && !fInterrupted)
|
||||
// {
|
||||
// for (unsigned int i = 0; i < vecSize; ++i)
|
||||
// {
|
||||
// nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msgVec[i].get())->GetMessage(),
|
||||
// fSocket,
|
||||
// (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags);
|
||||
// if (nbytes >= 0)
|
||||
// {
|
||||
// static_cast<FairMQMessageSHM*>(msgVec[i].get())->fQueued = true;
|
||||
// size_t size = msgVec[i]->GetSize();
|
||||
//
|
||||
// totalSize += size;
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// // according to ZMQ docs, this can only occur for the first part
|
||||
// if (zmq_errno() == EAGAIN)
|
||||
// {
|
||||
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
|
||||
// {
|
||||
// if (timeout)
|
||||
// {
|
||||
// elapsed += fSndTimeout;
|
||||
// if (elapsed >= timeout)
|
||||
// {
|
||||
// return -2;
|
||||
// }
|
||||
// }
|
||||
// repeat = true;
|
||||
// break;
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// return -2;
|
||||
// }
|
||||
// }
|
||||
// if (zmq_errno() == ETERM)
|
||||
// {
|
||||
// LOG(info) << "terminating socket " << fId;
|
||||
// return -1;
|
||||
// }
|
||||
// LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
// return nbytes;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if (repeat)
|
||||
// {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// // store statistics on how many messages have been sent (handle all parts as a single message)
|
||||
// ++fMessagesTx;
|
||||
// fBytesTx += totalSize;
|
||||
// return totalSize;
|
||||
// }
|
||||
//
|
||||
// return -1;
|
||||
// } // If there's only one part, send it as a regular message
|
||||
// else if (vecSize == 1)
|
||||
// {
|
||||
// return Send(msgVec.back(), flags);
|
||||
// }
|
||||
// else // if the vector is empty, something might be wrong
|
||||
// {
|
||||
// LOG(warn) << "Will not send empty vector";
|
||||
// return -1;
|
||||
// }
|
||||
}
|
||||
|
||||
auto Socket::ReceiveImpl(vector<FairMQMessagePtr>& /*msgVec*/, const int /*flags*/, const int /*timeout*/) -> int64_t
|
||||
{
|
||||
throw SocketError{"Not yet implemented."};
|
||||
// int64_t totalSize = 0;
|
||||
// int64_t more = 0;
|
||||
// bool repeat = false;
|
||||
// int elapsed = 0;
|
||||
//
|
||||
// while (true)
|
||||
// {
|
||||
// // Warn if the vector is filled before Receive() and empty it.
|
||||
// // if (msgVec.size() > 0)
|
||||
// // {
|
||||
// // LOG(warn) << "Message vector contains elements before Receive(), they will be deleted!";
|
||||
// // msgVec.clear();
|
||||
// // }
|
||||
//
|
||||
// totalSize = 0;
|
||||
// more = 0;
|
||||
// repeat = false;
|
||||
//
|
||||
// do
|
||||
// {
|
||||
// FairMQMessagePtr part(new FairMQMessageSHM(fManager, GetTransport()));
|
||||
// zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(part.get())->GetMessage();
|
||||
//
|
||||
// int nbytes = zmq_msg_recv(msgPtr, fSocket, flags);
|
||||
// if (nbytes == 0)
|
||||
// {
|
||||
// msgVec.push_back(move(part));
|
||||
// }
|
||||
// else if (nbytes > 0)
|
||||
// {
|
||||
// MetaHeader* hdr = static_cast<MetaHeader*>(zmq_msg_data(msgPtr));
|
||||
// size_t size = 0;
|
||||
// static_cast<FairMQMessageSHM*>(part.get())->fHandle = hdr->fHandle;
|
||||
// static_cast<FairMQMessageSHM*>(part.get())->fSize = hdr->fSize;
|
||||
// static_cast<FairMQMessageSHM*>(part.get())->fRegionId = hdr->fRegionId;
|
||||
// static_cast<FairMQMessageSHM*>(part.get())->fHint = hdr->fHint;
|
||||
// size = part->GetSize();
|
||||
//
|
||||
// msgVec.push_back(move(part));
|
||||
//
|
||||
// totalSize += size;
|
||||
// }
|
||||
// else if (zmq_errno() == EAGAIN)
|
||||
// {
|
||||
// if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
|
||||
// {
|
||||
// if (timeout)
|
||||
// {
|
||||
// elapsed += fSndTimeout;
|
||||
// if (elapsed >= timeout)
|
||||
// {
|
||||
// return -2;
|
||||
// }
|
||||
// }
|
||||
// repeat = true;
|
||||
// break;
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// return -2;
|
||||
// }
|
||||
// }
|
||||
// else
|
||||
// {
|
||||
// return nbytes;
|
||||
// }
|
||||
//
|
||||
// size_t more_size = sizeof(more);
|
||||
// zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size);
|
||||
// }
|
||||
// while (more);
|
||||
//
|
||||
// if (repeat)
|
||||
// {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// // store statistics on how many messages have been received (handle all parts as a single message)
|
||||
// ++fMessagesRx;
|
||||
// fBytesRx += totalSize;
|
||||
// return totalSize;
|
||||
// }
|
||||
}
|
||||
|
||||
auto Socket::Close() -> void {}
|
||||
|
@@ -82,12 +82,11 @@ class Socket final : public fair::mq::Socket
|
||||
|
||||
private:
|
||||
Context& fContext;
|
||||
asiofi::allocated_pool_resource fControlMemPool;
|
||||
std::unique_ptr<asiofi::info> fOfiInfo;
|
||||
std::unique_ptr<asiofi::fabric> fOfiFabric;
|
||||
std::unique_ptr<asiofi::domain> fOfiDomain;
|
||||
std::unique_ptr<asiofi::passive_endpoint> fPassiveEndpoint;
|
||||
std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint, fControlEndpoint;
|
||||
std::unique_ptr<asiofi::connected_endpoint> fDataEndpoint;
|
||||
std::string fId;
|
||||
std::atomic<unsigned long> fBytesTx;
|
||||
std::atomic<unsigned long> fBytesRx;
|
||||
@@ -97,16 +96,14 @@ class Socket final : public fair::mq::Socket
|
||||
Address fLocalAddr;
|
||||
int fSndTimeout;
|
||||
int fRcvTimeout;
|
||||
azmq::socket fSendQueueWrite, fSendQueueRead;
|
||||
azmq::socket fRecvQueueWrite, fRecvQueueRead;
|
||||
asiofi::semaphore fSendSem, fRecvSem;
|
||||
std::atomic<bool> fNeedOfiMemoryRegistration;
|
||||
std::atomic<bool> fBound;
|
||||
std::atomic<bool> fConnected;
|
||||
|
||||
auto SendQueueReader() -> void;
|
||||
auto OnSend(azmq::message& msg, size_t bytes_transferred) -> void;
|
||||
auto RecvControlQueueReader() -> void;
|
||||
auto OnRecvControl(ofi::unique_ptr<PostBuffer> ctrl) -> void;
|
||||
auto OnReceive() -> void;
|
||||
auto OnSend(MessagePtr& msg) -> void;
|
||||
auto RecvQueueReader() -> void;
|
||||
auto ReceiveImpl(MessagePtr& msg, const int flags, const int timeout) -> int;
|
||||
auto SendImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
|
||||
auto ReceiveImpl(std::vector<MessagePtr>& msgVec, const int flags, const int timeout) -> int64_t;
|
||||
@@ -114,10 +111,6 @@ class Socket final : public fair::mq::Socket
|
||||
// auto WaitForControlPeer() -> void;
|
||||
// auto AnnounceDataAddress() -> void;
|
||||
auto InitOfi(Address addr) -> void;
|
||||
auto BindControlEndpoint() -> void;
|
||||
auto BindDataEndpoint() -> void;
|
||||
enum class Band { Control, Data };
|
||||
auto ConnectEndpoint(std::unique_ptr<asiofi::connected_endpoint>& endpoint, Band type) -> void;
|
||||
// auto ReceiveDataAddressAnnouncement() -> void;
|
||||
}; /* class Socket */
|
||||
|
||||
|
@@ -23,12 +23,15 @@ namespace ofi
|
||||
|
||||
using namespace std;
|
||||
|
||||
TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* /*config*/)
|
||||
TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* config)
|
||||
try : FairMQTransportFactory(id)
|
||||
, fContext(*this, *this, 1)
|
||||
{
|
||||
LOG(debug) << "OFI transport: Using AZMQ & "
|
||||
<< "asiofi (" << fContext.GetAsiofiVersion() << ")";
|
||||
if (config) {
|
||||
fContext.SetSizeHint(config->GetValue<size_t>("ofi-size-hint")); // temporary hack to provide expected message size for receive
|
||||
}
|
||||
} catch (ContextError& e) {
|
||||
throw TransportFactoryError{e.what()};
|
||||
}
|
||||
|
@@ -66,6 +66,7 @@ FairMQProgOptions::FairMQProgOptions()
|
||||
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
|
||||
("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
|
||||
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
|
||||
("ofi-size-hint", po::value<size_t >()->default_value(2000000), "EXPERIMENTAL: OFI size hint for the allocator.")
|
||||
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
|
||||
("session", po::value<string >()->default_value("default"), "Session name.");
|
||||
|
||||
|
@@ -72,6 +72,7 @@ SINK+=" --id sink1"
|
||||
#SINK+=" --io-threads 2"
|
||||
#SINK+=" --control static"
|
||||
SINK+=" --transport $transport"
|
||||
SINK+=" --ofi-size-hint $msgSize"
|
||||
SINK+=" --severity debug"
|
||||
SINK+=" --multipart false"
|
||||
SINK+=" --max-iterations $maxIterations"
|
||||
|
Reference in New Issue
Block a user