Add a hack to set the expected msg size via cmd option

This commit is contained in:
Alexey Rybalchenko 2019-03-06 10:41:41 +01:00
parent e1b1e5e21b
commit a9dfe39bf7
7 changed files with 16 additions and 4 deletions

View File

@ -2,7 +2,7 @@
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
msgSize="1000000" msgSize="2000000"
if [[ $1 =~ ^[0-9]+$ ]]; then if [[ $1 =~ ^[0-9]+$ ]]; then
msgSize=$1 msgSize=$1
@ -19,6 +19,7 @@ xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
BUILDER="fairmq-ex-readout-builder" BUILDER="fairmq-ex-readout-builder"
BUILDER+=" --id builder1" BUILDER+=" --id builder1"
BUILDER+=" --severity debug" BUILDER+=" --severity debug"
BUILDER+=" --ofi-size-hint $msgSize"
BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem" BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem"
BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi" BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER & xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER &
@ -26,5 +27,6 @@ xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER &
SINK="fairmq-ex-readout-sink" SINK="fairmq-ex-readout-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --ofi-size-hint $msgSize"
SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi" SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi"
xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK & xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@ -37,6 +37,7 @@ Context::Context(FairMQTransportFactory& sendFactory,
: fIoWork(fIoContext) : fIoWork(fIoContext)
, fReceiveFactory(receiveFactory) , fReceiveFactory(receiveFactory)
, fSendFactory(sendFactory) , fSendFactory(sendFactory)
, fSizeHint(2000000) // temporary hack to provide expected message size for receive
{ {
InitThreadPool(numberIoThreads); InitThreadPool(numberIoThreads);
} }

View File

@ -72,6 +72,8 @@ class Context
auto Reset() -> void; auto Reset() -> void;
auto MakeReceiveMessage(size_t size) -> MessagePtr; auto MakeReceiveMessage(size_t size) -> MessagePtr;
auto MakeSendMessage(size_t size) -> MessagePtr; auto MakeSendMessage(size_t size) -> MessagePtr;
size_t GetSizeHint() { return fSizeHint; } // temporary hack to provide expected message size for receive
void SetSizeHint(size_t size) { fSizeHint = size; } // temporary hack to provide expected message size for receive
private: private:
boost::asio::io_context fIoContext; boost::asio::io_context fIoContext;
@ -80,6 +82,8 @@ class Context
FairMQTransportFactory& fReceiveFactory; FairMQTransportFactory& fReceiveFactory;
FairMQTransportFactory& fSendFactory; FairMQTransportFactory& fSendFactory;
size_t fSizeHint; // temporary hack to provide expected message size for receive
auto InitThreadPool(int numberIoThreads) -> void; auto InitThreadPool(int numberIoThreads) -> void;
}; /* class Context */ }; /* class Context */

View File

@ -213,7 +213,7 @@ auto Socket::OnSend(MessagePtr& msg) -> void
{ {
// LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend"; // LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend";
auto size = 2000000; auto size = msg->GetSize();
// LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize(); // LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize();
@ -283,7 +283,7 @@ auto Socket::RecvQueueReader() -> void
{ {
fRecvSem.async_wait([&](const boost::system::error_code& ec) { fRecvSem.async_wait([&](const boost::system::error_code& ec) {
if (!ec) { if (!ec) {
auto size = 2000000; static size_t size = fContext.GetSizeHint(); // temporary hack to provide expected message size for receive
auto msg = fContext.MakeReceiveMessage(size); auto msg = fContext.MakeReceiveMessage(size);
boost::asio::mutable_buffer buffer(msg->GetData(), size); boost::asio::mutable_buffer buffer(msg->GetData(), size);

View File

@ -23,12 +23,15 @@ namespace ofi
using namespace std; using namespace std;
TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* /*config*/) TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* config)
try : FairMQTransportFactory(id) try : FairMQTransportFactory(id)
, fContext(*this, *this, 1) , fContext(*this, *this, 1)
{ {
LOG(debug) << "OFI transport: Using AZMQ & " LOG(debug) << "OFI transport: Using AZMQ & "
<< "asiofi (" << fContext.GetAsiofiVersion() << ")"; << "asiofi (" << fContext.GetAsiofiVersion() << ")";
if (config) {
fContext.SetSizeHint(config->GetValue<size_t>("ofi-size-hint")); // temporary hack to provide expected message size for receive
}
} catch (ContextError& e) { } catch (ContextError& e) {
throw TransportFactoryError{e.what()}; throw TransportFactoryError{e.what()};
} }

View File

@ -66,6 +66,7 @@ FairMQProgOptions::FairMQProgOptions()
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)") ("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t >()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.") ("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("ofi-size-hint", po::value<size_t >()->default_value(2000000), "EXPERIMENTAL: OFI size hint for the allocator.")
("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).") ("rate", po::value<float >()->default_value(0.), "Rate for conditional run loop (Hz).")
("session", po::value<string >()->default_value("default"), "Session name."); ("session", po::value<string >()->default_value("default"), "Session name.");

View File

@ -72,6 +72,7 @@ SINK+=" --id sink1"
#SINK+=" --io-threads 2" #SINK+=" --io-threads 2"
#SINK+=" --control static" #SINK+=" --control static"
SINK+=" --transport $transport" SINK+=" --transport $transport"
SINK+=" --ofi-size-hint $msgSize"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --multipart false" SINK+=" --multipart false"
SINK+=" --max-iterations $maxIterations" SINK+=" --max-iterations $maxIterations"