Implement bulk callbacks for unmanaged regions

This commit is contained in:
Alexey Rybalchenko
2020-05-15 00:20:49 +02:00
parent a15d59c725
commit d22023bcb5
24 changed files with 243 additions and 83 deletions

View File

@@ -81,7 +81,12 @@ FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data,
memcpy(zmq_msg_data(fMsg.get()), data, size);
// call region callback
static_cast<FairMQUnmanagedRegionZMQ*>(region.get())->fCallback(data, size, hint);
auto ptr = static_cast<FairMQUnmanagedRegionZMQ*>(region.get());
if (ptr->fBulkCallback) {
ptr->fBulkCallback({{data, size, hint}});
} else if (ptr->fCallback) {
ptr->fCallback(data, size, hint);
}
// if (zmq_msg_init_data(fMsg.get(), data, size, [](void*, void*){}, nullptr) != 0)
// {

View File

@@ -84,7 +84,7 @@ bool FairMQSocketZMQ::Bind(const string& address)
// do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range.
return false;
}
LOG(error) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno);
LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
return false;
}
@@ -97,7 +97,7 @@ bool FairMQSocketZMQ::Connect(const string& address)
if (zmq_connect(fSocket, address.c_str()) != 0)
{
LOG(error) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
return false;
}

View File

@@ -94,19 +94,55 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map<stri
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList));
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(
const size_t size,
FairMQRegionCallback callback,
const string& path /* = "" */,
int flags /* = 0 */)
{
return CreateUnmanagedRegion(size, 0, callback, path, flags);
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(
const size_t size,
FairMQRegionBulkCallback bulkCallback,
const string& path /* = "" */,
int flags /* = 0 */)
{
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(
const size_t size,
const int64_t userFlags,
FairMQRegionCallback callback,
const string& path /* = "" */,
int flags /* = 0 */)
{
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(
const size_t size,
const int64_t userFlags,
FairMQRegionBulkCallback bulkCallback,
const string& path /* = "" */,
int flags /* = 0 */)
{
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(
const size_t size,
const int64_t userFlags,
FairMQRegionCallback callback,
FairMQRegionBulkCallback bulkCallback,
const string& path /* = "" */,
int flags /* = 0 */)
{
unique_ptr<FairMQUnmanagedRegion> ptr = nullptr;
{
lock_guard<mutex> lock(fMtx);
++fRegionCounter;
ptr = unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(fRegionCounter, size, userFlags, callback, path, flags, this));
ptr = unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(fRegionCounter, size, userFlags, callback, bulkCallback, path, flags, this));
auto zPtr = static_cast<FairMQUnmanagedRegionZMQ*>(ptr.get());
fRegionInfos.emplace_back(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
fRegionEvents.emplace(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);

View File

@@ -49,7 +49,10 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0);
void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override;
bool SubscribedToRegionEvents() override;

View File

@@ -10,13 +10,21 @@
#include "FairMQTransportFactoryZMQ.h"
#include "FairMQLogger.h"
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory /* = nullptr */)
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(uint64_t id,
const size_t size,
int64_t userFlags,
FairMQRegionCallback callback,
FairMQRegionBulkCallback bulkCallback,
const std::string& /* path = "" */,
int /* flags = 0 */,
FairMQTransportFactory* factory /* = nullptr */)
: FairMQUnmanagedRegion(factory)
, fId(id)
, fBuffer(malloc(size))
, fSize(size)
, fUserFlags(userFlags)
, fCallback(callback)
, fBulkCallback(bulkCallback)
{}
void* FairMQUnmanagedRegionZMQ::GetData() const

View File

@@ -21,7 +21,7 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
friend class FairMQMessageZMQ;
public:
FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory = nullptr);
FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory = nullptr);
FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete;
FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;
@@ -39,6 +39,7 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
size_t fSize;
int64_t fUserFlags;
FairMQRegionCallback fCallback;
FairMQRegionBulkCallback fBulkCallback;
};
#endif /* FAIRMQUNMANAGEDREGIONZMQ_H_ */