From cdc1ba084c7de0c24df16609ff6917bb32f388f2 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 15 Mar 2019 13:25:56 +0100 Subject: [PATCH] Fix broken pipe errors in tools::execute --- fairmq/tools/Process.cxx | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/fairmq/tools/Process.cxx b/fairmq/tools/Process.cxx index 1d6062d8..6aa03913 100644 --- a/fairmq/tools/Process.cxx +++ b/fairmq/tools/Process.cxx @@ -20,6 +20,7 @@ using namespace std; namespace bp = boost::process; namespace ba = boost::asio; +namespace bs = boost::system; namespace fair { @@ -63,27 +64,29 @@ execute_result execute(const string& cmd, const string& prefix, const string& in ba::deadline_timer timer(ios, boost::posix_time::milliseconds(100)); // child process - bp::child c(cmd, bp::std_out > outputPipe, bp::std_err > errorPipe, bp::std_in < inputPipe, ios); + bp::child c(cmd, bp::std_out > outputPipe, bp::std_err > errorPipe, bp::std_in < inputPipe); // handle std_in with a delay if (input != "") { - timer.async_wait([&](const boost::system::error_code& ec1) { + timer.async_wait([&](const bs::error_code& ec1) { if (!ec1) { - ba::async_write(inputPipe, inputBuffer, [&](const boost::system::error_code& ec2, size_t /* n */) { + ba::async_write(inputPipe, inputBuffer, [&](const bs::error_code& ec2, size_t /* n */) { if (!ec2) { - inputPipe.async_close(); + // inputPipe.async_close(); } else { - out << prefix << "error in boost::asio::async_write: " << ec2.value() << endl; + cout << prefix << "error in boost::asio::async_write: " << ec2.message() << endl; + out << prefix << "error in boost::asio::async_write: " << ec2.message() << endl; } }); } else { - out << prefix << "error in boost::asio::deadline_timer.async_wait: " << ec1.value() << endl; + cout << prefix << "error in boost::asio::deadline_timer.async_wait: " << ec1.message() << endl; + out << prefix << "error in boost::asio::deadline_timer.async_wait: " << ec1.message() << endl; } }); } // handle std_out line by line - function onStdOut = [&](const boost::system::error_code& ec, size_t /* n */) { + function onStdOut = [&](const bs::error_code& ec, size_t /* n */) { if (!ec) { istream is(&outputBuffer); string line; @@ -96,13 +99,18 @@ execute_result execute(const string& cmd, const string& prefix, const string& in ba::async_read_until(outputPipe, outputBuffer, delimiter, onStdOut); } else { - outputPipe.async_close(); + if (ec == ba::error::eof) { + // outputPipe.async_close(); + } else { + cout << prefix << ec.message() << endl; + out << prefix << ec.message() << endl; + } } }; ba::async_read_until(outputPipe, outputBuffer, delimiter, onStdOut); // handle std_err line by line - function onStdErr = [&](const boost::system::error_code& ec, size_t /* n */) { + function onStdErr = [&](const bs::error_code& ec, size_t /* n */) { if (!ec) { istream is(&errorBuffer); string line; @@ -115,7 +123,12 @@ execute_result execute(const string& cmd, const string& prefix, const string& in ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr); } else { - errorPipe.async_close(); + if (ec == ba::error::eof) { + // errorPipe.async_close(); + } else { + cout << prefix << ec.message() << endl; + out << prefix << ec.message() << endl; + } } }; ba::async_read_until(errorPipe, errorBuffer, delimiter, onStdErr); @@ -124,11 +137,11 @@ execute_result execute(const string& cmd, const string& prefix, const string& in c.wait(); result.exit_code = c.exit_code(); - stringstream exitCode; - exitCode << prefix << " Exit code: " << result.exit_code << "\n"; + stringstream exitCode; + exitCode << prefix << " fair::mq::tools::execute: exit code: " << result.exit_code << "\n"; cout << exitCode.str() << flush; - out << prefix << " Exit code: " << result.exit_code << endl; + out << prefix << " fair::mq::tools::execute: exit code: " << result.exit_code << endl; result.console_out = out.str(); return result;