diff --git a/examples/readout/fairmq-start-ex-readout.sh.in b/examples/readout/fairmq-start-ex-readout.sh.in index c3518b84..63da0101 100755 --- a/examples/readout/fairmq-start-ex-readout.sh.in +++ b/examples/readout/fairmq-start-ex-readout.sh.in @@ -2,7 +2,7 @@ export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ -msgSize="1000000" +msgSize="2000000" if [[ $1 =~ ^[0-9]+$ ]]; then msgSize=$1 @@ -19,6 +19,7 @@ xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & BUILDER="fairmq-ex-readout-builder" BUILDER+=" --id builder1" BUILDER+=" --severity debug" +BUILDER+=" --ofi-size-hint $msgSize" BUILDER+=" --channel-config name=data1,type=pair,method=connect,address=tcp://127.0.0.1:7777,transport=shmem" BUILDER+=" name=data2,type=pair,method=connect,address=tcp://127.0.0.1:7778,transport=ofi" xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER & @@ -26,5 +27,6 @@ xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$BUILDER & SINK="fairmq-ex-readout-sink" SINK+=" --id sink1" SINK+=" --severity debug" +SINK+=" --ofi-size-hint $msgSize" SINK+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:7778,transport=ofi" xterm -geometry 80x23+1000+0 -hold -e @EX_BIN_DIR@/$SINK & diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index d8db08f6..c842daeb 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -37,6 +37,7 @@ Context::Context(FairMQTransportFactory& sendFactory, : fIoWork(fIoContext) , fReceiveFactory(receiveFactory) , fSendFactory(sendFactory) + , fSizeHint(2000000) // temporary hack to provide expected message size for receive { InitThreadPool(numberIoThreads); } diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h index 3f3f4ed0..c49df097 100644 --- a/fairmq/ofi/Context.h +++ b/fairmq/ofi/Context.h @@ -72,6 +72,8 @@ class Context auto Reset() -> void; auto MakeReceiveMessage(size_t size) -> MessagePtr; auto MakeSendMessage(size_t size) -> MessagePtr; + size_t GetSizeHint() { return fSizeHint; } // temporary hack to provide expected message size for receive + void SetSizeHint(size_t size) { fSizeHint = size; } // temporary hack to provide expected message size for receive private: boost::asio::io_context fIoContext; @@ -80,6 +82,8 @@ class Context FairMQTransportFactory& fReceiveFactory; FairMQTransportFactory& fSendFactory; + size_t fSizeHint; // temporary hack to provide expected message size for receive + auto InitThreadPool(int numberIoThreads) -> void; }; /* class Context */ diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 45ee1ce7..3d7e9041 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -213,7 +213,7 @@ auto Socket::OnSend(MessagePtr& msg) -> void { // LOG(debug) << "OFI transport (" << fId << "): ENTER OnSend"; - auto size = 2000000; + auto size = msg->GetSize(); // LOG(debug) << "OFI transport (" << fId << "): OnSend: data=" << msg->GetData() << ",size=" << msg->GetSize(); @@ -283,7 +283,7 @@ auto Socket::RecvQueueReader() -> void { fRecvSem.async_wait([&](const boost::system::error_code& ec) { if (!ec) { - auto size = 2000000; + static size_t size = fContext.GetSizeHint(); // temporary hack to provide expected message size for receive auto msg = fContext.MakeReceiveMessage(size); boost::asio::mutable_buffer buffer(msg->GetData(), size); diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 0c2124f7..d3b5c4bc 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -23,12 +23,15 @@ namespace ofi using namespace std; -TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* /*config*/) +TransportFactory::TransportFactory(const string& id, const FairMQProgOptions* config) try : FairMQTransportFactory(id) , fContext(*this, *this, 1) { LOG(debug) << "OFI transport: Using AZMQ & " << "asiofi (" << fContext.GetAsiofiVersion() << ")"; + if (config) { + fContext.SetSizeHint(config->GetValue("ofi-size-hint")); // temporary hack to provide expected message size for receive + } } catch (ContextError& e) { throw TransportFactoryError{e.what()}; } diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index f5540f5b..ff17c5fe 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -66,6 +66,7 @@ FairMQProgOptions::FairMQProgOptions() ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") ("shm-segment-size", po::value()->default_value(2000000000), "Shared memory: size of the shared memory segment (in bytes).") ("shm-monitor", po::value()->default_value(true), "Shared memory: run monitor daemon.") + ("ofi-size-hint", po::value()->default_value(2000000), "EXPERIMENTAL: OFI size hint for the allocator.") ("rate", po::value()->default_value(0.), "Rate for conditional run loop (Hz).") ("session", po::value()->default_value("default"), "Session name."); diff --git a/fairmq/run/startMQBenchmark.sh.in b/fairmq/run/startMQBenchmark.sh.in index 86f44e33..ae67a27f 100755 --- a/fairmq/run/startMQBenchmark.sh.in +++ b/fairmq/run/startMQBenchmark.sh.in @@ -72,6 +72,7 @@ SINK+=" --id sink1" #SINK+=" --io-threads 2" #SINK+=" --control static" SINK+=" --transport $transport" +SINK+=" --ofi-size-hint $msgSize" SINK+=" --severity debug" SINK+=" --multipart false" SINK+=" --max-iterations $maxIterations"