diff options
author | 2016-11-18 11:19:02 +0000 | |
---|---|---|
committer | 2016-11-18 13:20:16 +0000 | |
commit | ef5ceef023ee630f7efb4c406be3937713b19b6b (patch) | |
tree | aeaec34329f276cdb21e2c9d047d2c0c68d326ed /src/main | |
parent | 84a3ed95143a14d05d9459b555fd09ad645aa707 (diff) |
Bazel client: create a wrapper around Unix pipes
This allows implementing pipe-handling in a
platform-specific way. Windows also supports pipes
but through its own API.
--
MOS_MIGRATED_REVID=139564316
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/cpp/blaze.cc | 36 | ||||
-rw-r--r-- | src/main/cpp/util/file.h | 12 | ||||
-rw-r--r-- | src/main/cpp/util/file_platform.h | 4 | ||||
-rw-r--r-- | src/main/cpp/util/file_posix.cc | 47 |
4 files changed, 76 insertions, 23 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc index 8f3151aa54..96335651ef 100644 --- a/src/main/cpp/blaze.cc +++ b/src/main/cpp/blaze.cc @@ -236,8 +236,10 @@ class GrpcBlazeServer : public BlazeServer { std::mutex cancel_thread_mutex_; int connect_timeout_secs_; - int recv_socket_; // Socket the cancel thread reads actions from - int send_socket_; // Socket the main thread writes actions to + + // Pipe that the main thread sends actions to and the cancel thread receieves + // actions from. + blaze_util::IPipe* _pipe; void CancelThread(); void SendAction(CancelThreadAction action); @@ -1404,28 +1406,15 @@ GrpcBlazeServer::GrpcBlazeServer(int connect_timeout_secs) { gpr_set_log_function(null_grpc_log_function); - int fd[2]; - if (pipe(fd) < 0) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "pipe()"); - } - recv_socket_ = fd[0]; - send_socket_ = fd[1]; - - if (fcntl(recv_socket_, F_SETFD, FD_CLOEXEC) == -1) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "fcntl(F_SETFD, FD_CLOEXEC) failed"); - } - - if (fcntl(send_socket_, F_SETFD, FD_CLOEXEC) == -1) { - pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, - "fcntl(F_SETFD, FD_CLOEXEC) failed"); + _pipe = blaze_util::CreatePipe(); + if (_pipe == NULL) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, "Couldn't create pipe"); } } GrpcBlazeServer::~GrpcBlazeServer() { - close(send_socket_); - close(recv_socket_); + delete _pipe; + _pipe = NULL; } bool GrpcBlazeServer::Connect() { @@ -1528,8 +1517,9 @@ void GrpcBlazeServer::CancelThread() { bool command_id_received = false; while (running) { char buf; - int bytes_read = read(recv_socket_, &buf, 1); - if (bytes_read == -1 && errno == EINTR) { + + int bytes_read = _pipe->Receive(&buf, 1); + if (bytes_read < 0 && errno == EINTR) { continue; } else if (bytes_read != 1) { pdie(blaze_exit_code::INTERNAL_ERROR, @@ -1699,7 +1689,7 @@ void GrpcBlazeServer::Disconnect() { void GrpcBlazeServer::SendAction(CancelThreadAction action) { char msg = action; - if (write(send_socket_, &msg, 1) <= 0) { + if (!_pipe->Send(&msg, 1)) { sigprintf("\nCould not interrupt server (cannot write to client pipe)\n\n"); } } diff --git a/src/main/cpp/util/file.h b/src/main/cpp/util/file.h index dfa08ad629..6118eefb0b 100644 --- a/src/main/cpp/util/file.h +++ b/src/main/cpp/util/file.h @@ -19,6 +19,18 @@ namespace blaze_util { +class IPipe { + public: + virtual ~IPipe() {} + + // Sends `size` bytes from `buffer` through the pipe. + virtual bool Send(void *buffer, size_t size) = 0; + + // Receives at most `size` bytes into `buffer` from the pipe. + // Returns the number of bytes received; sets `errno` upon error. + virtual int Receive(void *buffer, size_t size) = 0; +}; + // Returns the part of the path before the final "/". If there is a single // leading "/" in the path, the result will be the leading "/". If there is // no "/" in the path, the result is the empty prefix of the input (i.e., ""). diff --git a/src/main/cpp/util/file_platform.h b/src/main/cpp/util/file_platform.h index a807c0dcb0..7682e212ef 100644 --- a/src/main/cpp/util/file_platform.h +++ b/src/main/cpp/util/file_platform.h @@ -21,6 +21,10 @@ namespace blaze_util { +class IPipe; + +IPipe* CreatePipe(); + // Checks each element of the PATH variable for executable. If none is found, "" // is returned. Otherwise, the full path to executable is returned. Can die if // looking up PATH fails. diff --git a/src/main/cpp/util/file_posix.cc b/src/main/cpp/util/file_posix.cc index b6c37e19cc..5bb8772d70 100644 --- a/src/main/cpp/util/file_posix.cc +++ b/src/main/cpp/util/file_posix.cc @@ -33,6 +33,53 @@ namespace blaze_util { using std::pair; using std::string; +class PosixPipe : public IPipe { + public: + PosixPipe(int recv_socket, int send_socket) + : _recv_socket(recv_socket), _send_socket(send_socket) {} + + PosixPipe() = delete; + + virtual ~PosixPipe() { + close(_recv_socket); + close(_send_socket); + } + + // Sends `size` bytes from `buffer` through the pipe. + bool Send(void* buffer, size_t size) override { + return write(_send_socket, buffer, size) == size; + } + + // Receives at most `size` bytes into `buffer` from the pipe. + // Returns the number of bytes received; sets `errno` upon error. + int Receive(void* buffer, size_t size) override { + return read(_recv_socket, buffer, size); + } + + private: + int _recv_socket; + int _send_socket; +}; + +IPipe* CreatePipe() { + int fd[2]; + if (pipe(fd) < 0) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, "pipe()"); + } + + if (fcntl(fd[0], F_SETFD, FD_CLOEXEC) == -1) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "fcntl(F_SETFD, FD_CLOEXEC) failed"); + } + + if (fcntl(fd[1], F_SETFD, FD_CLOEXEC) == -1) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "fcntl(F_SETFD, FD_CLOEXEC) failed"); + } + + return new PosixPipe(fd[0], fd[1]); +} + string Which(const string &executable) { char *path_cstr = getenv("PATH"); if (path_cstr == NULL || path_cstr[0] == '\0') { |