mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Fix after rebase
This commit is contained in:
parent
b394feca18
commit
60f1f1000f
|
@ -75,8 +75,10 @@ auto Message::Rebuild() -> void
|
||||||
if (fFreeFunction) {
|
if (fFreeFunction) {
|
||||||
fFreeFunction(fData, fHint);
|
fFreeFunction(fData, fHint);
|
||||||
} else {
|
} else {
|
||||||
|
if (fData) {
|
||||||
fPmr->deallocate(fData, fSize);
|
fPmr->deallocate(fData, fSize);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
fData = nullptr;
|
fData = nullptr;
|
||||||
fInitialSize = 0;
|
fInitialSize = 0;
|
||||||
fSize = 0;
|
fSize = 0;
|
||||||
|
@ -89,8 +91,10 @@ auto Message::Rebuild(const size_t size) -> void
|
||||||
if (fFreeFunction) {
|
if (fFreeFunction) {
|
||||||
fFreeFunction(fData, fHint);
|
fFreeFunction(fData, fHint);
|
||||||
} else {
|
} else {
|
||||||
|
if (fData) {
|
||||||
fPmr->deallocate(fData, fSize);
|
fPmr->deallocate(fData, fSize);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (size) {
|
if (size) {
|
||||||
fData = fPmr->allocate(size);
|
fData = fPmr->allocate(size);
|
||||||
assert(fData);
|
assert(fData);
|
||||||
|
@ -108,8 +112,10 @@ auto Message::Rebuild(void* /*data*/, const size_t size, fairmq_free_fn* ffn, vo
|
||||||
if (fFreeFunction) {
|
if (fFreeFunction) {
|
||||||
fFreeFunction(fData, fHint);
|
fFreeFunction(fData, fHint);
|
||||||
} else {
|
} else {
|
||||||
|
if (fData) {
|
||||||
fPmr->deallocate(fData, fSize);
|
fPmr->deallocate(fData, fSize);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (size) {
|
if (size) {
|
||||||
fData = fPmr->allocate(size);
|
fData = fPmr->allocate(size);
|
||||||
assert(fData);
|
assert(fData);
|
||||||
|
@ -154,9 +160,11 @@ Message::~Message()
|
||||||
if (fFreeFunction) {
|
if (fFreeFunction) {
|
||||||
fFreeFunction(fData, fHint);
|
fFreeFunction(fData, fHint);
|
||||||
} else {
|
} else {
|
||||||
|
if (fData) {
|
||||||
fPmr->deallocate(fData, fSize);
|
fPmr->deallocate(fData, fSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} /* namespace ofi */
|
} /* namespace ofi */
|
||||||
} /* namespace mq */
|
} /* namespace mq */
|
||||||
|
|
|
@ -17,14 +17,14 @@
|
||||||
#include <boost/asio/buffer.hpp>
|
#include <boost/asio/buffer.hpp>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <rdma/fabric.h>
|
|
||||||
#include <rdma/fi_endpoint.h>
|
|
||||||
#include <rdma/fi_cm.h>
|
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <zmq.h>
|
#include <zmq.h>
|
||||||
|
|
||||||
|
#include <mutex>
|
||||||
|
#include <condition_variable>
|
||||||
|
|
||||||
namespace fair
|
namespace fair
|
||||||
{
|
{
|
||||||
namespace mq
|
namespace mq
|
||||||
|
@ -98,8 +98,6 @@ try {
|
||||||
fLocalDataAddr = addr;
|
fLocalDataAddr = addr;
|
||||||
BindDataEndpoint();
|
BindDataEndpoint();
|
||||||
|
|
||||||
AnnounceDataAddress();
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
catch (const SilentSocketError& e)
|
catch (const SilentSocketError& e)
|
||||||
|
@ -143,17 +141,33 @@ auto Socket::BindDataEndpoint() -> void
|
||||||
assert(!fPassiveDataEndpoint);
|
assert(!fPassiveDataEndpoint);
|
||||||
assert(!fDataEndpoint);
|
assert(!fDataEndpoint);
|
||||||
|
|
||||||
|
std::mutex m;
|
||||||
|
std::condition_variable cv;
|
||||||
|
bool completed(false);
|
||||||
|
|
||||||
fPassiveDataEndpoint = fContext.MakeOfiPassiveEndpoint(fLocalDataAddr);
|
fPassiveDataEndpoint = fContext.MakeOfiPassiveEndpoint(fLocalDataAddr);
|
||||||
fPassiveDataEndpoint->listen([&](fid_t /*handle*/, asiofi::info&& info) {
|
fPassiveDataEndpoint->listen([&](fid_t /*handle*/, asiofi::info&& info) {
|
||||||
LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ...";
|
LOG(debug) << "OFI transport (" << fId << "): data band connection request received. Accepting ...";
|
||||||
fDataEndpoint = fContext.MakeOfiConnectedEndpoint(info);
|
fDataEndpoint = fContext.MakeOfiConnectedEndpoint(info);
|
||||||
fDataEndpoint->enable();
|
fDataEndpoint->enable();
|
||||||
fDataEndpoint->accept([&]() {
|
fDataEndpoint->accept([&]() {
|
||||||
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(m);
|
||||||
|
completed = true;
|
||||||
|
}
|
||||||
|
cv.notify_one();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalDataAddr;
|
LOG(debug) << "OFI transport (" << fId << "): data band bound to " << fLocalDataAddr;
|
||||||
|
|
||||||
|
AnnounceDataAddress();
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lk(m);
|
||||||
|
cv.wait(lk, [&](){ return completed; });
|
||||||
|
}
|
||||||
|
LOG(debug) << "OFI transport (" << fId << "): data band connection accepted.";
|
||||||
}
|
}
|
||||||
|
|
||||||
auto Socket::ConnectControlSocket(Context::Address address) -> void
|
auto Socket::ConnectControlSocket(Context::Address address) -> void
|
||||||
|
@ -295,69 +309,31 @@ auto Socket::ReceiveControlMessage() -> CtrlMsgPtr<ControlMessage>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// auto Socket::WaitForControlPeer() -> void
|
|
||||||
// {
|
|
||||||
// assert(fWaitingForControlPeer);
|
|
||||||
//
|
|
||||||
// First frame in message contains event number and value
|
|
||||||
// zmq_msg_t msg;
|
|
||||||
// zmq_msg_init(&msg);
|
|
||||||
// if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1)
|
|
||||||
// throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno)));
|
|
||||||
//
|
|
||||||
// uint8_t* data = (uint8_t*) zmq_msg_data(&msg);
|
|
||||||
// uint16_t event = *(uint16_t*)(data);
|
|
||||||
// int value = *(uint32_t *)(data + 2);
|
|
||||||
//
|
|
||||||
// Second frame in message contains event address
|
|
||||||
// zmq_msg_init(&msg);
|
|
||||||
// if (zmq_msg_recv(&msg, fMonitorSocket, 0) == -1)
|
|
||||||
// throw SocketError(tools::ToString("Failed to get monitor event, reason: ", zmq_strerror(errno)));
|
|
||||||
//
|
|
||||||
// if (event == ZMQ_EVENT_ACCEPTED) {
|
|
||||||
// string localAddress = string(static_cast<char*>(zmq_msg_data(&msg)), zmq_msg_size(&msg));
|
|
||||||
// sockaddr_in remoteAddr;
|
|
||||||
// socklen_t addrSize = sizeof(sockaddr_in);
|
|
||||||
// int ret = getpeername(value, (sockaddr*)&remoteAddr, &addrSize);
|
|
||||||
// if (ret != 0)
|
|
||||||
// throw SocketError(tools::ToString("Failed retrieving remote address, reason: ", strerror(errno)));
|
|
||||||
// string remoteIp(inet_ntoa(remoteAddr.sin_addr));
|
|
||||||
// int remotePort = ntohs(remoteAddr.sin_port);
|
|
||||||
// LOG(debug) << "Accepted control peer connection from " << remoteIp << ":" << remotePort;
|
|
||||||
// } else if (event == ZMQ_EVENT_CONNECTED) {
|
|
||||||
// LOG(debug) << "Connected successfully to control peer";
|
|
||||||
// } else {
|
|
||||||
// LOG(debug) << "Unknown monitor event received: " << event << ". Ignoring.";
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// fWaitingForControlPeer = false;
|
|
||||||
// }
|
|
||||||
|
|
||||||
auto Socket::Send(MessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); }
|
auto Socket::Send(MessagePtr& msg, const int timeout) -> int { return SendImpl(msg, 0, timeout); }
|
||||||
auto Socket::Receive(MessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); }
|
auto Socket::Receive(MessagePtr& msg, const int timeout) -> int { return ReceiveImpl(msg, 0, timeout); }
|
||||||
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); }
|
auto Socket::Send(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t { return SendImpl(msgVec, 0, timeout); }
|
||||||
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); }
|
auto Socket::Receive(std::vector<MessagePtr>& msgVec, const int timeout) -> int64_t { return ReceiveImpl(msgVec, 0, timeout); }
|
||||||
|
|
||||||
auto Socket::TrySend(MessagePtr& msg) -> int { return SendImpl(msg, ZMQ_DONTWAIT, 0); }
|
|
||||||
auto Socket::TryReceive(MessagePtr& msg) -> int { return ReceiveImpl(msg, ZMQ_DONTWAIT, 0); }
|
|
||||||
auto Socket::TrySend(std::vector<MessagePtr>& msgVec) -> int64_t { return SendImpl(msgVec, ZMQ_DONTWAIT, 0); }
|
|
||||||
auto Socket::TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t { return ReceiveImpl(msgVec, ZMQ_DONTWAIT, 0); }
|
|
||||||
|
|
||||||
#include <mutex>
|
|
||||||
#include <condition_variable>
|
|
||||||
|
|
||||||
auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int
|
auto Socket::SendImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int
|
||||||
try {
|
try {
|
||||||
auto size = msg->GetSize();
|
auto size = msg->GetSize();
|
||||||
LOG(debug) << "OFI transport (" << fId << "): ENTER SendImpl";
|
// LOG(debug) << "OFI transport (" << fId << "): ENTER SendImpl";
|
||||||
|
|
||||||
// Create and send control message
|
// Create and send control message
|
||||||
auto pb = MakeControlMessage<PostBuffer>(&fCtrlMemPool);
|
auto pb = MakeControlMessage<PostBuffer>(&fCtrlMemPool);
|
||||||
pb->size = size;
|
pb->size = size;
|
||||||
SendControlMessage(StaticUniquePtrUpcast<ControlMessage>(std::move(pb)));
|
SendControlMessage(StaticUniquePtrUpcast<ControlMessage>(std::move(pb)));
|
||||||
LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Control message sent, size=" << size;
|
// LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Control message sent, size=" << size;
|
||||||
|
// LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: msg->GetData()=" << msg->GetData() << ",msg->GetSize()=" << msg->GetSize();
|
||||||
|
|
||||||
if (size) {
|
if (size) {
|
||||||
|
// Receive ack
|
||||||
|
auto ack = StaticUniquePtrDowncast<PostBuffer>(ReceiveControlMessage());
|
||||||
|
assert(ack.get());
|
||||||
|
auto size_ack = ack->size;
|
||||||
|
assert(size == size_ack);
|
||||||
|
// LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Control ack received, size_ack=" << size_ack;
|
||||||
|
|
||||||
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
boost::asio::mutable_buffer buffer(msg->GetData(), size);
|
||||||
asiofi::memory_region mr(fContext.GetDomain(), buffer, asiofi::mr::access::send);
|
asiofi::memory_region mr(fContext.GetDomain(), buffer, asiofi::mr::access::send);
|
||||||
|
|
||||||
|
@ -374,7 +350,7 @@ try {
|
||||||
completed = true;
|
completed = true;
|
||||||
}
|
}
|
||||||
cv.notify_one();
|
cv.notify_one();
|
||||||
LOG(debug) << "OFI transport (" << fId << "): > SendImpl: Data buffer sent";
|
// LOG(debug) << "OFI transport (" << fId << "): > SendImpl: Data buffer sent";
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -382,14 +358,14 @@ try {
|
||||||
std::unique_lock<std::mutex> lk(m);
|
std::unique_lock<std::mutex> lk(m);
|
||||||
cv.wait(lk, [&](){ return completed; });
|
cv.wait(lk, [&](){ return completed; });
|
||||||
}
|
}
|
||||||
LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Data send buffer posted";
|
// LOG(debug) << "OFI transport (" << fId << "): >>>>> SendImpl: Data send buffer posted";
|
||||||
}
|
}
|
||||||
|
|
||||||
msg.reset(nullptr);
|
msg.reset(nullptr);
|
||||||
fBytesTx += size;
|
fBytesTx += size;
|
||||||
fMessagesTx++;
|
fMessagesTx++;
|
||||||
|
|
||||||
LOG(debug) << "OFI transport (" << fId << "): LEAVE SendImpl";
|
// LOG(debug) << "OFI transport (" << fId << "): LEAVE SendImpl";
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
catch (const SilentSocketError& e)
|
catch (const SilentSocketError& e)
|
||||||
|
@ -404,12 +380,12 @@ catch (const std::exception& e)
|
||||||
|
|
||||||
auto Socket::ReceiveImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int
|
auto Socket::ReceiveImpl(FairMQMessagePtr& msg, const int /*flags*/, const int /*timeout*/) -> int
|
||||||
try {
|
try {
|
||||||
LOG(debug) << "OFI transport (" << fId << "): ENTER ReceiveImpl";
|
// LOG(debug) << "OFI transport (" << fId << "): ENTER ReceiveImpl";
|
||||||
// Receive and process control message
|
// Receive and process control message
|
||||||
auto pb = StaticUniquePtrDowncast<PostBuffer>(ReceiveControlMessage());
|
auto pb = StaticUniquePtrDowncast<PostBuffer>(ReceiveControlMessage());
|
||||||
assert(pb.get());
|
assert(pb.get());
|
||||||
auto size = pb->size;
|
auto size = pb->size;
|
||||||
LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Control message received, size=" << size;
|
// LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Control message received, size=" << size;
|
||||||
|
|
||||||
// Receive data
|
// Receive data
|
||||||
if (size) {
|
if (size) {
|
||||||
|
@ -429,19 +405,24 @@ try {
|
||||||
cv.notify_one();
|
cv.notify_one();
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Data buffer posted";
|
// LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Data buffer posted";
|
||||||
|
|
||||||
|
auto ack = MakeControlMessage<PostBuffer>(&fCtrlMemPool);
|
||||||
|
ack->size = size;
|
||||||
|
SendControlMessage(StaticUniquePtrUpcast<ControlMessage>(std::move(ack)));
|
||||||
|
// LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Control Ack sent";
|
||||||
|
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lk(m);
|
std::unique_lock<std::mutex> lk(m);
|
||||||
cv.wait(lk, [&](){ return completed; });
|
cv.wait(lk, [&](){ return completed; });
|
||||||
}
|
}
|
||||||
LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Data received";
|
// LOG(debug) << "OFI transport (" << fId << "): <<<<< ReceiveImpl: Data received";
|
||||||
}
|
}
|
||||||
|
|
||||||
fBytesRx += size;
|
fBytesRx += size;
|
||||||
fMessagesRx++;
|
fMessagesRx++;
|
||||||
|
|
||||||
LOG(debug) << "OFI transport (" << fId << "): EXIT ReceiveImpl";
|
// LOG(debug) << "OFI transport (" << fId << "): EXIT ReceiveImpl";
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
catch (const SilentSocketError& e)
|
catch (const SilentSocketError& e)
|
||||||
|
@ -658,6 +639,14 @@ int Socket::GetLinger() const
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Socket::SetLinger(const int value)
|
||||||
|
{
|
||||||
|
if (zmq_setsockopt(fControlSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) {
|
||||||
|
throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void Socket::SetSndBufSize(const int value)
|
void Socket::SetSndBufSize(const int value)
|
||||||
{
|
{
|
||||||
if (zmq_setsockopt(fControlSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) {
|
if (zmq_setsockopt(fControlSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) {
|
||||||
|
|
|
@ -51,11 +51,6 @@ class Socket final : public fair::mq::Socket
|
||||||
auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
auto Send(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
||||||
auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
auto Receive(std::vector<MessagePtr>& msgVec, int timeout = 0) -> int64_t override;
|
||||||
|
|
||||||
auto TrySend(MessagePtr& msg) -> int override;
|
|
||||||
auto TryReceive(MessagePtr& msg) -> int override;
|
|
||||||
auto TrySend(std::vector<MessagePtr>& msgVec) -> int64_t override;
|
|
||||||
auto TryReceive(std::vector<MessagePtr>& msgVec) -> int64_t override;
|
|
||||||
|
|
||||||
auto GetSocket() const -> void* { return fControlSocket; }
|
auto GetSocket() const -> void* { return fControlSocket; }
|
||||||
|
|
||||||
void SetLinger(const int value) override;
|
void SetLinger(const int value) override;
|
||||||
|
|
|
@ -34,12 +34,12 @@ catch (ContextError& e)
|
||||||
throw TransportFactoryError{e.what()};
|
throw TransportFactoryError{e.what()};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreateMessage() const -> MessagePtr
|
auto TransportFactory::CreateMessage() -> MessagePtr
|
||||||
{
|
{
|
||||||
return MessagePtr{new Message(&fMemoryResource)};
|
return MessagePtr{new Message(&fMemoryResource)};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreateMessage(const size_t size) const -> MessagePtr
|
auto TransportFactory::CreateMessage(const size_t size) -> MessagePtr
|
||||||
{
|
{
|
||||||
return MessagePtr{new Message(&fMemoryResource, size)};
|
return MessagePtr{new Message(&fMemoryResource, size)};
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,7 @@ auto TransportFactory::CreateMessage(const size_t size) const -> MessagePtr
|
||||||
auto TransportFactory::CreateMessage(void* data,
|
auto TransportFactory::CreateMessage(void* data,
|
||||||
const size_t size,
|
const size_t size,
|
||||||
fairmq_free_fn* ffn,
|
fairmq_free_fn* ffn,
|
||||||
void* hint) const -> MessagePtr
|
void* hint) -> MessagePtr
|
||||||
{
|
{
|
||||||
return MessagePtr{new Message(&fMemoryResource, data, size, ffn, hint)};
|
return MessagePtr{new Message(&fMemoryResource, data, size, ffn, hint)};
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,7 @@ auto TransportFactory::CreateMessage(void* data,
|
||||||
auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region,
|
auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region,
|
||||||
void* data,
|
void* data,
|
||||||
const size_t size,
|
const size_t size,
|
||||||
void* hint) const -> MessagePtr
|
void* hint) -> MessagePtr
|
||||||
{
|
{
|
||||||
return MessagePtr{new Message(&fMemoryResource, region, data, size, hint)};
|
return MessagePtr{new Message(&fMemoryResource, region, data, size, hint)};
|
||||||
}
|
}
|
||||||
|
@ -71,7 +71,7 @@ auto TransportFactory::CreatePoller(const vector<FairMQChannel>& channels) const
|
||||||
// return PollerPtr{new Poller(channels)};
|
// return PollerPtr{new Poller(channels)};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreatePoller(const vector<const FairMQChannel*>& channels) const -> PollerPtr
|
auto TransportFactory::CreatePoller(const vector<FairMQChannel*>& channels) const -> PollerPtr
|
||||||
{
|
{
|
||||||
throw runtime_error{"Not yet implemented (Poller)."};
|
throw runtime_error{"Not yet implemented (Poller)."};
|
||||||
// return PollerPtr{new Poller(channels)};
|
// return PollerPtr{new Poller(channels)};
|
||||||
|
|
|
@ -35,15 +35,15 @@ class TransportFactory final : public FairMQTransportFactory
|
||||||
TransportFactory(const TransportFactory&) = delete;
|
TransportFactory(const TransportFactory&) = delete;
|
||||||
TransportFactory operator=(const TransportFactory&) = delete;
|
TransportFactory operator=(const TransportFactory&) = delete;
|
||||||
|
|
||||||
auto CreateMessage() const -> MessagePtr override;
|
auto CreateMessage() -> MessagePtr override;
|
||||||
auto CreateMessage(const std::size_t size) const -> MessagePtr override;
|
auto CreateMessage(const std::size_t size) -> MessagePtr override;
|
||||||
auto CreateMessage(void* data, const std::size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const -> MessagePtr override;
|
auto CreateMessage(void* data, const std::size_t size, fairmq_free_fn* ffn, void* hint = nullptr) -> MessagePtr override;
|
||||||
auto CreateMessage(UnmanagedRegionPtr& region, void* data, const std::size_t size, void* hint = nullptr) const -> MessagePtr override;
|
auto CreateMessage(UnmanagedRegionPtr& region, void* data, const std::size_t size, void* hint = nullptr) -> MessagePtr override;
|
||||||
|
|
||||||
auto CreateSocket(const std::string& type, const std::string& name) -> SocketPtr override;
|
auto CreateSocket(const std::string& type, const std::string& name) -> SocketPtr override;
|
||||||
|
|
||||||
auto CreatePoller(const std::vector<FairMQChannel>& channels) const -> PollerPtr override;
|
auto CreatePoller(const std::vector<FairMQChannel>& channels) const -> PollerPtr override;
|
||||||
auto CreatePoller(const std::vector<const FairMQChannel*>& channels) const -> PollerPtr override;
|
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override;
|
||||||
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
|
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
|
||||||
|
|
||||||
auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const -> UnmanagedRegionPtr override;
|
auto CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr) const -> UnmanagedRegionPtr override;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user