aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main
diff options
context:
space:
mode:
authorGravatar Laszlo Csomor <laszlocsomor@google.com>2016-11-18 11:19:02 +0000
committerGravatar Yun Peng <pcloudy@google.com>2016-11-18 13:20:16 +0000
commitef5ceef023ee630f7efb4c406be3937713b19b6b (patch)
treeaeaec34329f276cdb21e2c9d047d2c0c68d326ed /src/main
parent84a3ed95143a14d05d9459b555fd09ad645aa707 (diff)
Bazel client: create a wrapper around Unix pipes
This allows implementing pipe-handling in a platform-specific way. Windows also supports pipes but through its own API. -- MOS_MIGRATED_REVID=139564316
Diffstat (limited to 'src/main')
-rw-r--r--src/main/cpp/blaze.cc36
-rw-r--r--src/main/cpp/util/file.h12
-rw-r--r--src/main/cpp/util/file_platform.h4
-rw-r--r--src/main/cpp/util/file_posix.cc47
4 files changed, 76 insertions, 23 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc
index 8f3151aa54..96335651ef 100644
--- a/src/main/cpp/blaze.cc
+++ b/src/main/cpp/blaze.cc
@@ -236,8 +236,10 @@ class GrpcBlazeServer : public BlazeServer {
std::mutex cancel_thread_mutex_;
int connect_timeout_secs_;
- int recv_socket_; // Socket the cancel thread reads actions from
- int send_socket_; // Socket the main thread writes actions to
+
+ // Pipe that the main thread sends actions to and the cancel thread receieves
+ // actions from.
+ blaze_util::IPipe* _pipe;
void CancelThread();
void SendAction(CancelThreadAction action);
@@ -1404,28 +1406,15 @@ GrpcBlazeServer::GrpcBlazeServer(int connect_timeout_secs) {
gpr_set_log_function(null_grpc_log_function);
- int fd[2];
- if (pipe(fd) < 0) {
- pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
- "pipe()");
- }
- recv_socket_ = fd[0];
- send_socket_ = fd[1];
-
- if (fcntl(recv_socket_, F_SETFD, FD_CLOEXEC) == -1) {
- pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
- "fcntl(F_SETFD, FD_CLOEXEC) failed");
- }
-
- if (fcntl(send_socket_, F_SETFD, FD_CLOEXEC) == -1) {
- pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
- "fcntl(F_SETFD, FD_CLOEXEC) failed");
+ _pipe = blaze_util::CreatePipe();
+ if (_pipe == NULL) {
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, "Couldn't create pipe");
}
}
GrpcBlazeServer::~GrpcBlazeServer() {
- close(send_socket_);
- close(recv_socket_);
+ delete _pipe;
+ _pipe = NULL;
}
bool GrpcBlazeServer::Connect() {
@@ -1528,8 +1517,9 @@ void GrpcBlazeServer::CancelThread() {
bool command_id_received = false;
while (running) {
char buf;
- int bytes_read = read(recv_socket_, &buf, 1);
- if (bytes_read == -1 && errno == EINTR) {
+
+ int bytes_read = _pipe->Receive(&buf, 1);
+ if (bytes_read < 0 && errno == EINTR) {
continue;
} else if (bytes_read != 1) {
pdie(blaze_exit_code::INTERNAL_ERROR,
@@ -1699,7 +1689,7 @@ void GrpcBlazeServer::Disconnect() {
void GrpcBlazeServer::SendAction(CancelThreadAction action) {
char msg = action;
- if (write(send_socket_, &msg, 1) <= 0) {
+ if (!_pipe->Send(&msg, 1)) {
sigprintf("\nCould not interrupt server (cannot write to client pipe)\n\n");
}
}
diff --git a/src/main/cpp/util/file.h b/src/main/cpp/util/file.h
index dfa08ad629..6118eefb0b 100644
--- a/src/main/cpp/util/file.h
+++ b/src/main/cpp/util/file.h
@@ -19,6 +19,18 @@
namespace blaze_util {
+class IPipe {
+ public:
+ virtual ~IPipe() {}
+
+ // Sends `size` bytes from `buffer` through the pipe.
+ virtual bool Send(void *buffer, size_t size) = 0;
+
+ // Receives at most `size` bytes into `buffer` from the pipe.
+ // Returns the number of bytes received; sets `errno` upon error.
+ virtual int Receive(void *buffer, size_t size) = 0;
+};
+
// Returns the part of the path before the final "/". If there is a single
// leading "/" in the path, the result will be the leading "/". If there is
// no "/" in the path, the result is the empty prefix of the input (i.e., "").
diff --git a/src/main/cpp/util/file_platform.h b/src/main/cpp/util/file_platform.h
index a807c0dcb0..7682e212ef 100644
--- a/src/main/cpp/util/file_platform.h
+++ b/src/main/cpp/util/file_platform.h
@@ -21,6 +21,10 @@
namespace blaze_util {
+class IPipe;
+
+IPipe* CreatePipe();
+
// Checks each element of the PATH variable for executable. If none is found, ""
// is returned. Otherwise, the full path to executable is returned. Can die if
// looking up PATH fails.
diff --git a/src/main/cpp/util/file_posix.cc b/src/main/cpp/util/file_posix.cc
index b6c37e19cc..5bb8772d70 100644
--- a/src/main/cpp/util/file_posix.cc
+++ b/src/main/cpp/util/file_posix.cc
@@ -33,6 +33,53 @@ namespace blaze_util {
using std::pair;
using std::string;
+class PosixPipe : public IPipe {
+ public:
+ PosixPipe(int recv_socket, int send_socket)
+ : _recv_socket(recv_socket), _send_socket(send_socket) {}
+
+ PosixPipe() = delete;
+
+ virtual ~PosixPipe() {
+ close(_recv_socket);
+ close(_send_socket);
+ }
+
+ // Sends `size` bytes from `buffer` through the pipe.
+ bool Send(void* buffer, size_t size) override {
+ return write(_send_socket, buffer, size) == size;
+ }
+
+ // Receives at most `size` bytes into `buffer` from the pipe.
+ // Returns the number of bytes received; sets `errno` upon error.
+ int Receive(void* buffer, size_t size) override {
+ return read(_recv_socket, buffer, size);
+ }
+
+ private:
+ int _recv_socket;
+ int _send_socket;
+};
+
+IPipe* CreatePipe() {
+ int fd[2];
+ if (pipe(fd) < 0) {
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, "pipe()");
+ }
+
+ if (fcntl(fd[0], F_SETFD, FD_CLOEXEC) == -1) {
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "fcntl(F_SETFD, FD_CLOEXEC) failed");
+ }
+
+ if (fcntl(fd[1], F_SETFD, FD_CLOEXEC) == -1) {
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "fcntl(F_SETFD, FD_CLOEXEC) failed");
+ }
+
+ return new PosixPipe(fd[0], fd[1]);
+}
+
string Which(const string &executable) {
char *path_cstr = getenv("PATH");
if (path_cstr == NULL || path_cstr[0] == '\0') {