diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index 132e9a9e..9811eeb6 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -17,12 +17,14 @@ #include // for size_t +typedef void (fairmq_free_fn) (void *data, void *hint); + class FairMQMessage { public: virtual void Rebuild() = 0; virtual void Rebuild(size_t size) = 0; - virtual void Rebuild(void* data, size_t size) = 0; + virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0; virtual void* GetMessage() = 0; virtual void* GetData() = 0; diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 3bbcd9de..daeed2d5 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -29,7 +29,7 @@ class FairMQTransportFactory public: virtual FairMQMessage* CreateMessage() = 0; virtual FairMQMessage* CreateMessage(size_t size) = 0; - virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0; + virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL) = 0; virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads) = 0; virtual FairMQPoller* CreatePoller(const vector& inputs) = 0; diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index e244bf79..93983111 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -37,7 +37,7 @@ FairMQMessageNN::FairMQMessageNN(size_t size) fReceiving = false; } -FairMQMessageNN::FairMQMessageNN(void* data, size_t size) +FairMQMessageNN::FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { fMessage = nn_allocmsg(size, 0); if (!fMessage) @@ -47,6 +47,15 @@ FairMQMessageNN::FairMQMessageNN(void* data, size_t size) memcpy(fMessage, data, size); fSize = size; fReceiving = false; + + if(ffn) + { + ffn(data, hint); + } + else + { + if(data) free(data); + } } void FairMQMessageNN::Rebuild() @@ -69,7 +78,7 @@ void FairMQMessageNN::Rebuild(size_t size) fReceiving = false; } -void FairMQMessageNN::Rebuild(void* data, size_t size) +void FairMQMessageNN::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { Clear(); fMessage = nn_allocmsg(size, 0); @@ -80,6 +89,15 @@ void FairMQMessageNN::Rebuild(void* data, size_t size) memcpy(fMessage, data, size); fSize = size; fReceiving = false; + + if(ffn) + { + ffn(data, hint); + } + else + { + if(data) free(data); + } } void* FairMQMessageNN::GetMessage() diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index b99f191b..b59b01b1 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -24,11 +24,11 @@ class FairMQMessageNN : public FairMQMessage public: FairMQMessageNN(); FairMQMessageNN(size_t size); - FairMQMessageNN(void* data, size_t size); + FairMQMessageNN(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); virtual void Rebuild(); virtual void Rebuild(size_t size); - virtual void Rebuild(void* data, size_t site); + virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); virtual void* GetMessage(); virtual void* GetData(); diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 53cee8bf..3e539f9e 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -29,9 +29,9 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage(size_t size) return new FairMQMessageNN(size); } -FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size) +FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { - return new FairMQMessageNN(data, size); + return new FairMQMessageNN(data, size, ffn, hint); } FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num, int numIoThreads) diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index e1721a35..eed2eb5b 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -29,7 +29,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage(size_t size); - virtual FairMQMessage* CreateMessage(void* data, size_t size); + virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads); virtual FairMQPoller* CreatePoller(const vector& inputs); diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 8f00a50b..147926ba 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -36,9 +36,9 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size) } } -FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size) +FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { - int rc = zmq_msg_init_data(&fMessage, data, size, &CleanUp, NULL); // TODO: expose the cleanup function part in the interface? + int rc = zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint); if (rc != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); @@ -65,10 +65,10 @@ void FairMQMessageZMQ::Rebuild(size_t size) } } -void FairMQMessageZMQ::Rebuild(void* data, size_t size) +void FairMQMessageZMQ::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { CloseMessage(); - int rc = zmq_msg_init_data(&fMessage, data, size, &CleanUp, NULL); // TODO: expose the cleanup function part in the interface? + int rc = zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint); if (rc != 0) { LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index d9848f84..5749349d 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -26,11 +26,11 @@ class FairMQMessageZMQ : public FairMQMessage public: FairMQMessageZMQ(); FairMQMessageZMQ(size_t size); - FairMQMessageZMQ(void* data, size_t size); + FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); virtual void Rebuild(); virtual void Rebuild(size_t size); - virtual void Rebuild(void* data, size_t size); + virtual void Rebuild(void* data, size_t size, fairmq_free_fn *ffn = &CleanUp, void* hint = NULL); virtual void* GetMessage(); virtual void* GetData(); diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 82da4eae..0458394f 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -33,9 +33,9 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(size_t size) return new FairMQMessageZMQ(size); } -FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size) +FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size, fairmq_free_fn *ffn, void* hint) { - return new FairMQMessageZMQ(data, size); + return new FairMQMessageZMQ(data, size, ffn, hint); } FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num, int numIoThreads) diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index a7744805..052bcb56 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -30,7 +30,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(); virtual FairMQMessage* CreateMessage(size_t size); - virtual FairMQMessage* CreateMessage(void* data, size_t size); + virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL); virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads); virtual FairMQPoller* CreatePoller(const vector& inputs);