mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Remove container adoption code
This commit is contained in:
parent
1a07137dda
commit
34286ef75e
|
@ -18,15 +18,10 @@
|
||||||
namespace fair {
|
namespace fair {
|
||||||
namespace mq {
|
namespace mq {
|
||||||
|
|
||||||
using ByteSpectatorAllocator = SpectatorAllocator<fair::mq::byte>;
|
|
||||||
using BytePmrAllocator = boost::container::pmr::polymorphic_allocator<fair::mq::byte>;
|
using BytePmrAllocator = boost::container::pmr::polymorphic_allocator<fair::mq::byte>;
|
||||||
|
|
||||||
//_________________________________________________________________________________________________
|
//_________________________________________________________________________________________________
|
||||||
// return the message associated with the container or nullptr if it does not
|
// return the message associated with the container or throw if it is not possible
|
||||||
// make sense (e.g. when
|
|
||||||
// we are just watching an existing message or when the container is not using
|
|
||||||
// FairMQMemoryResource
|
|
||||||
// as backend).
|
|
||||||
template<typename ContainerT>
|
template<typename ContainerT>
|
||||||
// typename std::enable_if<
|
// typename std::enable_if<
|
||||||
// std::is_base_of<
|
// std::is_base_of<
|
||||||
|
@ -61,57 +56,5 @@ FairMQMessagePtr getMessage(ContainerT &&container_, FairMQMemoryResource *targe
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
//_________________________________________________________________________________________________
|
|
||||||
/// Return a vector of const ElemT, no wonership transfer.
|
|
||||||
/// Resource must be kept alive throughout the lifetime of the
|
|
||||||
/// container and associated message.
|
|
||||||
template<typename ElemT>
|
|
||||||
std::vector<const ElemT, boost::container::pmr::polymorphic_allocator<const ElemT>> getVector(
|
|
||||||
size_t nelem,
|
|
||||||
SpectatorMessageResource *resource)
|
|
||||||
{
|
|
||||||
return std::vector<const ElemT, SpectatorAllocator<const ElemT>>(
|
|
||||||
nelem, SpectatorAllocator<ElemT>(resource));
|
|
||||||
};
|
|
||||||
|
|
||||||
//_________________________________________________________________________________________________
|
|
||||||
/// Return a vector of const ElemT, takes ownership of the message
|
|
||||||
template<typename ElemT>
|
|
||||||
std::vector<const ElemT, OwningMessageSpectatorAllocator<const ElemT>>
|
|
||||||
getVector(size_t nelem, FairMQMessagePtr message)
|
|
||||||
{
|
|
||||||
return std::vector<const ElemT, OwningMessageSpectatorAllocator<const ElemT>>(
|
|
||||||
nelem,
|
|
||||||
OwningMessageSpectatorAllocator<const ElemT>(
|
|
||||||
MessageResource{std::move(message)}));
|
|
||||||
};
|
|
||||||
|
|
||||||
//_________________________________________________________________________________________________
|
|
||||||
// TODO: this is C++14, converting it down to C++11 is too much work atm
|
|
||||||
// This returns a unique_ptr of const vector, does not allow modifications at
|
|
||||||
// the cost of pointer
|
|
||||||
// semantics for access.
|
|
||||||
// use auto or decltype to catch the return type.
|
|
||||||
// template<typename ElemT>
|
|
||||||
// auto getVector(size_t nelem, FairMQMessage* message)
|
|
||||||
//{
|
|
||||||
// using DataType = std::vector<ElemT, ByteSpectatorAllocator>;
|
|
||||||
//
|
|
||||||
// struct doubleDeleter
|
|
||||||
// {
|
|
||||||
// // kids: don't do this at home! (but here it's OK)
|
|
||||||
// // this stateful deleter allows a single unique_ptr to manage 2
|
|
||||||
// resources at the same time.
|
|
||||||
// std::unique_ptr<SpectatorMessageResource> extra;
|
|
||||||
// void operator()(const DataType* ptr) { delete ptr; }
|
|
||||||
// };
|
|
||||||
//
|
|
||||||
// using OutputType = std::unique_ptr<const DataType, doubleDeleter>;
|
|
||||||
//
|
|
||||||
// auto resource = std::make_unique<SpectatorMessageResource>(message);
|
|
||||||
// auto output = new DataType(nelem, ByteSpectatorAllocator{resource.get()});
|
|
||||||
// return OutputType(output, doubleDeleter{std::move(resource)});
|
|
||||||
//}
|
|
||||||
|
|
||||||
} /* namespace mq */
|
} /* namespace mq */
|
||||||
} /* namespace fair */
|
} /* namespace fair */
|
||||||
|
|
|
@ -20,11 +20,3 @@ void *fair::mq::ChannelResource::do_allocate(std::size_t bytes, std::size_t /*al
|
||||||
return setMessage(factory->CreateMessage(bytes));
|
return setMessage(factory->CreateMessage(bytes));
|
||||||
};
|
};
|
||||||
|
|
||||||
fair::mq::MessageResource::MessageResource(FairMQMessagePtr message)
|
|
||||||
: mUpstream{message->GetTransport()->GetMemoryResource()}
|
|
||||||
, mMessageSize{message->GetSize()}
|
|
||||||
, mMessageData{mUpstream ? mUpstream->setMessage(std::move(message))
|
|
||||||
: throw std::runtime_error(
|
|
||||||
"MessageResource::MessageResource message has no upstream resource set")}
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
|
@ -109,230 +109,6 @@ class ChannelResource : public FairMQMemoryResource
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
/// This memory resource only watches, does not allocate/deallocate anything.
|
|
||||||
/// Must be kept alive together with the message as only a pointer to the message is taken.
|
|
||||||
/// In combination with SpectatorAllocator it allows an stl container to "adopt"
|
|
||||||
/// the contents of the message buffer as it's own.
|
|
||||||
class SpectatorMessageResource : public FairMQMemoryResource
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
SpectatorMessageResource() = default;
|
|
||||||
|
|
||||||
SpectatorMessageResource(const FairMQMessage *_message)
|
|
||||||
: message(_message)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQMessagePtr getMessage(void * /*p*/) override { return nullptr; }
|
|
||||||
FairMQTransportFactory *getTransportFactory() noexcept override { return nullptr; }
|
|
||||||
size_t getNumberOfMessages() const noexcept override { return 0; }
|
|
||||||
void *setMessage(FairMQMessagePtr) override { return nullptr; }
|
|
||||||
|
|
||||||
protected:
|
|
||||||
const FairMQMessage *message;
|
|
||||||
|
|
||||||
void *do_allocate(std::size_t bytes, std::size_t /*alignment*/) override
|
|
||||||
{
|
|
||||||
if (message) {
|
|
||||||
if (bytes > message->GetSize()) {
|
|
||||||
throw std::bad_alloc();
|
|
||||||
}
|
|
||||||
return message->GetData();
|
|
||||||
} else {
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void do_deallocate(void * /*p*/, std::size_t /*bytes*/, std::size_t /*alignment*/) override
|
|
||||||
{
|
|
||||||
message = nullptr;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool do_is_equal(const memory_resource &other) const noexcept override
|
|
||||||
{
|
|
||||||
const SpectatorMessageResource *that =
|
|
||||||
dynamic_cast<const SpectatorMessageResource *>(&other);
|
|
||||||
if (!that) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (that->message == message) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/// This memory resource only watches, does not allocate/deallocate anything.
|
|
||||||
/// Ownership of the message is taken. Meant to be used for transparent data
|
|
||||||
/// adoption in containers.
|
|
||||||
/// In combination with SpectatorAllocator it allows an stl container to "adopt"
|
|
||||||
/// the contents of the message buffer as it's own.
|
|
||||||
class MessageResource : public FairMQMemoryResource
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
MessageResource() noexcept = delete;
|
|
||||||
MessageResource(const MessageResource &) noexcept = default;
|
|
||||||
MessageResource(MessageResource &&) noexcept = default;
|
|
||||||
MessageResource &operator=(const MessageResource &) = default;
|
|
||||||
MessageResource &operator=(MessageResource &&) = default;
|
|
||||||
|
|
||||||
MessageResource(FairMQMessagePtr message);
|
|
||||||
|
|
||||||
FairMQMessagePtr getMessage(void *p) override { return mUpstream->getMessage(p); }
|
|
||||||
void *setMessage(FairMQMessagePtr message) override
|
|
||||||
{
|
|
||||||
return mUpstream->setMessage(std::move(message));
|
|
||||||
}
|
|
||||||
|
|
||||||
FairMQTransportFactory *getTransportFactory() noexcept override { return nullptr; }
|
|
||||||
size_t getNumberOfMessages() const noexcept override { return mMessageData ? 1 : 0; }
|
|
||||||
|
|
||||||
protected:
|
|
||||||
FairMQMemoryResource *mUpstream{nullptr};
|
|
||||||
size_t mMessageSize{0};
|
|
||||||
void *mMessageData{nullptr};
|
|
||||||
|
|
||||||
void *do_allocate(std::size_t bytes, std::size_t /*alignment*/) override
|
|
||||||
{
|
|
||||||
if (bytes > mMessageSize) {
|
|
||||||
throw std::bad_alloc();
|
|
||||||
}
|
|
||||||
return mMessageData;
|
|
||||||
}
|
|
||||||
|
|
||||||
void do_deallocate(void * /*p*/, std::size_t /*bytes*/, std::size_t /*alignment*/) override
|
|
||||||
{
|
|
||||||
getMessage(mMessageData); // let the message die.
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool do_is_equal(const memory_resource & /*other*/) const noexcept override
|
|
||||||
{
|
|
||||||
// since this uniquely owns the message it can never be equal to anybody
|
|
||||||
// else
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Special allocator that skips default construction/destruction,
|
|
||||||
/// allows an stl container to adopt the contents of a buffer as it's own.
|
|
||||||
/// No ownership of the message or data is taken.
|
|
||||||
///
|
|
||||||
/// This in general (as in STL) is a bad idea, but here it is safe to inherit
|
|
||||||
/// from an allocator since
|
|
||||||
/// we have no additional data and only override some methods so we don't get
|
|
||||||
/// into slicing and other
|
|
||||||
/// problems.
|
|
||||||
template<typename T>
|
|
||||||
class SpectatorAllocator : public boost::container::pmr::polymorphic_allocator<T>
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
using boost::container::pmr::polymorphic_allocator<T>::polymorphic_allocator;
|
|
||||||
|
|
||||||
// skip default construction of empty elements
|
|
||||||
// this is important for two reasons: one: it allows us to adopt an existing
|
|
||||||
// buffer (e.g.
|
|
||||||
// incoming message) and quickly construct large vectors while skipping the
|
|
||||||
// element
|
|
||||||
// initialization.
|
|
||||||
template<class U>
|
|
||||||
void construct(U *)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
// dont try to call destructors, makes no sense since resource is managed
|
|
||||||
// externally AND allowed
|
|
||||||
// types cannot have side effects
|
|
||||||
template<typename U>
|
|
||||||
void destroy(U *)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
T *allocate(size_t size)
|
|
||||||
{
|
|
||||||
return reinterpret_cast<T *>(this->resource()->allocate(size * sizeof(T), 0));
|
|
||||||
}
|
|
||||||
|
|
||||||
void deallocate(T *ptr, size_t size)
|
|
||||||
{
|
|
||||||
this->resource()->deallocate(const_cast<typename std::remove_cv<T>::type *>(ptr), size);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/// Special allocator that skips default construction/destruction,
|
|
||||||
/// allows an stl container to adopt the contents of a buffer as it's own.
|
|
||||||
/// Ownership of the message is taken.
|
|
||||||
/// This allocator has a pmr-like interface, but keeps the unique
|
|
||||||
/// MessageResource as internal state,
|
|
||||||
/// allowing full resource (associated message) management internally without
|
|
||||||
/// any global state.
|
|
||||||
template<typename T>
|
|
||||||
class OwningMessageSpectatorAllocator
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
using value_type = T;
|
|
||||||
|
|
||||||
MessageResource mResource;
|
|
||||||
|
|
||||||
OwningMessageSpectatorAllocator() noexcept = default;
|
|
||||||
OwningMessageSpectatorAllocator(const OwningMessageSpectatorAllocator &) noexcept = default;
|
|
||||||
OwningMessageSpectatorAllocator(OwningMessageSpectatorAllocator &&) noexcept = default;
|
|
||||||
|
|
||||||
OwningMessageSpectatorAllocator(MessageResource &&resource) noexcept
|
|
||||||
: mResource{resource}
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
template<class U>
|
|
||||||
OwningMessageSpectatorAllocator(const OwningMessageSpectatorAllocator<U> &other) noexcept
|
|
||||||
: mResource(other.mResource)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
OwningMessageSpectatorAllocator &operator=(const OwningMessageSpectatorAllocator &other)
|
|
||||||
{
|
|
||||||
mResource = other.mResource;
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
OwningMessageSpectatorAllocator select_on_container_copy_construction() const
|
|
||||||
{
|
|
||||||
return OwningMessageSpectatorAllocator();
|
|
||||||
}
|
|
||||||
|
|
||||||
boost::container::pmr::memory_resource *resource() { return &mResource; }
|
|
||||||
|
|
||||||
// skip default construction of empty elements
|
|
||||||
// this is important for two reasons: one: it allows us to adopt an existing
|
|
||||||
// buffer (e.g.
|
|
||||||
// incoming message) and quickly construct large vectors while skipping the
|
|
||||||
// element
|
|
||||||
// initialization.
|
|
||||||
template<class U>
|
|
||||||
void construct(U *)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
// dont try to call destructors, makes no sense since resource is managed
|
|
||||||
// externally AND allowed
|
|
||||||
// types cannot have side effects
|
|
||||||
template<typename U>
|
|
||||||
void destroy(U *)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
T *allocate(size_t size)
|
|
||||||
{
|
|
||||||
return reinterpret_cast<T *>(mResource.allocate(size * sizeof(T), 0));
|
|
||||||
}
|
|
||||||
|
|
||||||
void deallocate(T *ptr, size_t size)
|
|
||||||
{
|
|
||||||
mResource.deallocate(const_cast<typename std::remove_cv<T>::type *>(ptr), size);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
} /* namespace mq */
|
} /* namespace mq */
|
||||||
} /* namespace fair */
|
} /* namespace fair */
|
||||||
|
|
||||||
|
|
|
@ -92,22 +92,6 @@ TEST(MemoryResources, allocator_test)
|
||||||
}
|
}
|
||||||
EXPECT_TRUE(testData::nallocated == 0);
|
EXPECT_TRUE(testData::nallocated == 0);
|
||||||
EXPECT_TRUE(testData::nallocations == testData::ndeallocations);
|
EXPECT_TRUE(testData::nallocations == testData::ndeallocations);
|
||||||
|
|
||||||
testData::nallocations = 0;
|
|
||||||
testData::ndeallocations = 0;
|
|
||||||
{
|
|
||||||
std::vector<testData, SpectatorAllocator<testData>> v(
|
|
||||||
SpectatorAllocator<testData>{allocZMQ});
|
|
||||||
v.reserve(3);
|
|
||||||
EXPECT_TRUE(allocZMQ->getNumberOfMessages() == 1);
|
|
||||||
v.emplace_back(1);
|
|
||||||
v.emplace_back(2);
|
|
||||||
v.emplace_back(3);
|
|
||||||
EXPECT_TRUE(testData::nallocated == 3);
|
|
||||||
}
|
|
||||||
EXPECT_TRUE(testData::nallocated
|
|
||||||
== 3); // ByteSpectatorAllocator does not call dtors so nallocated remains at 3;
|
|
||||||
EXPECT_TRUE(allocZMQ->getNumberOfMessages() == 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(MemoryResources, getMessage_test)
|
TEST(MemoryResources, getMessage_test)
|
||||||
|
@ -150,30 +134,6 @@ TEST(MemoryResources, getMessage_test)
|
||||||
EXPECT_TRUE(message->GetSize() == 3 * sizeof(testData));
|
EXPECT_TRUE(message->GetSize() == 3 * sizeof(testData));
|
||||||
messageArray = static_cast<int*>(message->GetData());
|
messageArray = static_cast<int*>(message->GetData());
|
||||||
EXPECT_TRUE(messageArray[0] == 4 && messageArray[1] == 5 && messageArray[2] == 6);
|
EXPECT_TRUE(messageArray[0] == 4 && messageArray[1] == 5 && messageArray[2] == 6);
|
||||||
|
|
||||||
{
|
|
||||||
std::vector<testData, SpectatorAllocator<testData>> v(
|
|
||||||
SpectatorAllocator<testData>{allocSHM});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST(MemoryResources, adoptVector_test)
|
|
||||||
{
|
|
||||||
// Create a bogus message
|
|
||||||
auto message = factoryZMQ->CreateMessage(3 * sizeof(testData));
|
|
||||||
auto messageAddr = message.get();
|
|
||||||
testData tmpBuf[3] = {3, 2, 1};
|
|
||||||
std::memcpy(message->GetData(), tmpBuf, 3 * sizeof(testData));
|
|
||||||
|
|
||||||
auto adoptedOwner =
|
|
||||||
getVector<testData>(3, std::move(message));
|
|
||||||
EXPECT_TRUE(adoptedOwner[0].i == 3);
|
|
||||||
EXPECT_TRUE(adoptedOwner[1].i == 2);
|
|
||||||
EXPECT_TRUE(adoptedOwner[2].i == 1);
|
|
||||||
|
|
||||||
auto reclaimedMessage = getMessage(std::move(adoptedOwner));
|
|
||||||
EXPECT_TRUE(reclaimedMessage.get() == messageAddr);
|
|
||||||
EXPECT_TRUE(adoptedOwner.size() == 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
Loading…
Reference in New Issue
Block a user