aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Lukacs Berki <lberki@google.com>2016-05-30 14:05:33 +0000
committerGravatar Dmitry Lomov <dslomov@google.com>2016-05-31 08:37:24 +0000
commit6dd29091ad6b877f932f2dd26166dff4ac1160be (patch)
tree5652d905dab3758bbcd2b9bfc5f8cb5040728310 /src
parent4ab4f05a8cbbaa4b7d938d1d305961fe95a5416c (diff)
Use the age-old "write to a file descriptor in signal handler" idiom to handle SIGINT.
std::mutex and friends are not safe to use from signal handlers. I originally dismissed this approach because I thought it would be complicated, but it turned out to be much, much simpler than what we had before. The alternative would be signalfd(), which is Linux-only. -- MOS_MIGRATED_REVID=123578795
Diffstat (limited to 'src')
-rw-r--r--src/main/cpp/blaze.cc161
1 files changed, 101 insertions, 60 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc
index f5eaa74291..bc6ba9c0d8 100644
--- a/src/main/cpp/blaze.cc
+++ b/src/main/cpp/blaze.cc
@@ -262,7 +262,7 @@ class AfUnixBlazeServer : public BlazeServer {
class GrpcBlazeServer : public BlazeServer {
public:
GrpcBlazeServer();
- virtual ~GrpcBlazeServer() {}
+ virtual ~GrpcBlazeServer();  // closes both ends of the action pipe
bool Connect() override;
void Disconnect() override;
@@ -271,25 +271,24 @@ class GrpcBlazeServer : public BlazeServer {
void Cancel() override;
private:
- enum CancelThreadAction { NOTHING, JOIN, CANCEL };
+ enum CancelThreadAction { NOTHING, JOIN, CANCEL, COMMAND_ID_RECEIVED };  // each action is sent as a single byte
std::unique_ptr<command_server::CommandServer::Stub> client_;
std::string request_cookie_;
std::string response_cookie_;
std::string command_id_;
- // The mutex is locked only once in the cancel thread (the only place where
- // we call .wait()), so it's okay to use a condition variable with a
- // recursive mutex.
- std::condition_variable_any cancel_thread_signal_;
+ // Protects command_id_. Although we always set it before making the cancel
+ // thread do something with it, the mutex is still useful because it provides
+ // a memory fence.
+ std::mutex cancel_thread_mutex_;
- // protects command_id_ and cancel_thread_action_ Needs to be recursive so
- // that there is no deadlock if the main thread receives a SIGINT while the
- // mutex is locked.
- std::recursive_mutex cancel_thread_mutex_;
- CancelThreadAction cancel_thread_action_;
+ int recv_socket_; // Read end of the pipe (not a socket); the cancel thread reads actions from it
+ int send_socket_; // Write end of the pipe (not a socket); the main thread writes actions to it
void CancelThread();  // body of the cancel thread; loops until it reads JOIN
+ void SendAction(CancelThreadAction action);  // writes one action byte to send_socket_
+ void SendCancelMessage();  // issues the Cancel RPC for command_id_
};
@@ -1885,6 +1884,28 @@ static void null_grpc_log_function(gpr_log_func_args *args) {
GrpcBlazeServer::GrpcBlazeServer() {  // Sets up logging and the pipe used to send actions to the cancel thread.
gpr_set_log_function(null_grpc_log_function);
connected_ = false;
+ int fd[2];  // fd[0] becomes the read end, fd[1] the write end
+ if (pipe(fd) < 0) {  // unnamed pipe that carries CancelThreadAction bytes
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "pipe()");
+ }
+ recv_socket_ = fd[0];
+ send_socket_ = fd[1];
+
+ if (fcntl(recv_socket_, F_SETFD, FD_CLOEXEC) == -1) {  // close-on-exec for the read end
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "fcntl(F_SETFD, FD_CLOEXEC) failed");
+ }
+
+ if (fcntl(send_socket_, F_SETFD, FD_CLOEXEC) == -1) {  // close-on-exec for the write end
+ pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR,
+ "fcntl(F_SETFD, FD_CLOEXEC) failed");
+ }
+}
+
+GrpcBlazeServer::~GrpcBlazeServer() {  // Releases the pipe created by the constructor.
+ close(send_socket_);  // write end
+ close(recv_socket_);  // read end
}
bool GrpcBlazeServer::Connect() {
@@ -1950,17 +1971,26 @@ bool GrpcBlazeServer::Connect() {
//
// When the user presses Ctrl-C, a SIGINT is delivered to the client, which is
// translated into a BlazeServer::Cancel() call. Since it's not a good idea to
-// do significant work in signal handlers, all it does is to set a flag and wake
-// up the cancellation thread.
+// do significant work in signal handlers, all it does is write a byte to an
+// unnamed pipe.
+//
+// This unnamed pipe is used to communicate with the cancel thread. Whenever
+// something interesting happens, a byte is written into it, which is read by
+// the cancel thread. These commands are available:
+//
+// - NOTHING. No-op; the byte is read and ignored.
+// - JOIN. The cancel thread needs to be terminated.
+// - CANCEL. If the command ID is already available, a cancel request is sent;
+// otherwise the request is remembered until the command ID arrives.
+// - COMMAND_ID_RECEIVED. The client learned the command ID from the server.
+// If there is a pending cancellation request, it is acted upon.
//
-// That thread in turn issues the CancelRequest RPC to the server if the command
-// ID is known. If not, it goes to sleep again until the server tells the client
-// the command ID, at which point it wakes up again and delivers the
-// cancellation request.
+// The only data the cancellation thread shares with the main thread is the
+// file descriptor for receiving commands and command_id_, the latter of which
+// is protected by a mutex, which mainly serves as a memory fence.
//
-// In every case, the cancellation thread is joined at the end of the execution
-// of the command. The main thread wakes it up just so that it can finish
-// (using the JOIN action)
+// The cancellation thread is joined at the end of the execution of the command.
+// The main thread wakes it up just so that it can finish (using the JOIN
+// action)
//
// It's conceivable that the server is busy and thus it cannot service the
// cancellation request. In that case, we simply ignore the failure and the both
@@ -1969,38 +1999,59 @@ bool GrpcBlazeServer::Connect() {
// delivered to the server)
void GrpcBlazeServer::CancelThread() {  // Reads action bytes from the pipe and acts on them until JOIN arrives.
bool running = true;
- std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_);
+ bool cancel = false;  // true while a CANCEL is waiting for the command ID
+ bool command_id_received = false;  // true once COMMAND_ID_RECEIVED has been read
while (running) {
- cancel_thread_signal_.wait(lock);
- switch (cancel_thread_action_) {
- case JOIN:
+ char buf;
+ int bytes_read = read(recv_socket_, &buf, 1);  // one byte == one CancelThreadAction
+ if (bytes_read == -1 && errno == EINTR) {  // interrupted by a signal: just retry
+ continue;
+ } else if (bytes_read != 1) {  // end-of-file or any other error is fatal
+ pdie(blaze_exit_code::INTERNAL_ERROR,
+ "Cannot communicate with cancel thread");
+ }
+
+ switch (buf) {
+ case CancelThreadAction::NOTHING:
+ break;
+
+ case CancelThreadAction::JOIN:
running = false;
- cancel_thread_action_ = NOTHING;
break;
- case CANCEL:
- // If we don't know the command ID yet, we'll be woken up when it
- // becomes known.
- if (command_id_.size() > 0) {
- command_server::CancelRequest request;
- request.set_cookie(request_cookie_);
- request.set_command_id(command_id_);
- grpc::ClientContext context;
- context.set_deadline(std::chrono::system_clock::now() +
- std::chrono::milliseconds(100));
- command_server::CancelResponse response;
- // There isn't a lot we can do if this request fails
- client_->Cancel(&context, request, &response);
- cancel_thread_action_ = NOTHING;
+ case CancelThreadAction::COMMAND_ID_RECEIVED:
+ command_id_received = true;
+ if (cancel) {  // a CANCEL arrived earlier, before the ID was known
+ SendCancelMessage();
+ cancel = false;  // the pending cancellation has now been sent
}
break;
- case NOTHING:
- break;
+ case CancelThreadAction::CANCEL:
+ if (command_id_received) {
+ SendCancelMessage();
+ } else {
+ cancel = true;  // defer until COMMAND_ID_RECEIVED is read
+ }
+ break;
}
}
}
+// Sends a Cancel RPC for the current command to the server, with a 100 ms
+// deadline. The outcome of the RPC is not inspected.
+void GrpcBlazeServer::SendCancelMessage() {
+  // Held for the whole call: command_id_ is read below and the mutex doubles
+  // as a memory fence with the thread that stored it.
+  std::lock_guard<std::mutex> guard(cancel_thread_mutex_);
+
+  command_server::CancelRequest cancel_request;
+  cancel_request.set_cookie(request_cookie_);
+  cancel_request.set_command_id(command_id_);
+
+  command_server::CancelResponse cancel_response;
+  grpc::ClientContext rpc_context;
+  const auto deadline =
+      std::chrono::system_clock::now() + std::chrono::milliseconds(100);
+  rpc_context.set_deadline(deadline);
+
+  // Best effort: there isn't a lot we can do if this request fails.
+  client_->Cancel(&rpc_context, cancel_request, &cancel_response);
+}
+
// This will wait indefinitely until the server shuts down
void GrpcBlazeServer::KillRunningServer() {
assert(connected_);
@@ -2056,11 +2107,6 @@ unsigned int GrpcBlazeServer::Communicate() {
// (one during server startup and one emitted by the server)
blaze::ReleaseLock(&blaze_lock_);
- {
- std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_);
- cancel_thread_action_ = NOTHING;
- }
-
std::thread cancel_thread(&GrpcBlazeServer::CancelThread, this);
bool command_id_set = false;
while (reader->Read(&response)) {
@@ -2080,20 +2126,14 @@ unsigned int GrpcBlazeServer::Communicate() {
}
if (!command_id_set && response.command_id().size() > 0) {
- std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_);
+ std::unique_lock<std::mutex> lock(cancel_thread_mutex_);
command_id_ = response.command_id();
command_id_set = true;
- // Wake up the cancellation thread in case there is a pending cancellation
- cancel_thread_signal_.notify_one();
+ SendAction(CancelThreadAction::COMMAND_ID_RECEIVED);
}
}
- {
- // Wake up the cancellation thread so that it can finish
- std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_);
- cancel_thread_action_ = JOIN;
- cancel_thread_signal_.notify_one();
- }
+ SendAction(CancelThreadAction::JOIN);
cancel_thread.join();
if (!response.finished()) {
@@ -2113,13 +2153,14 @@ void GrpcBlazeServer::Disconnect() {
connected_ = false;
}
+// Writes a single action byte to the pipe that the cancel thread reads from.
+//
+// Cancel() calls this as a result of SIGINT, so the body restricts itself to
+// write() and plain assignments: no locks and no allocation.
+//
+// errno is saved on entry and restored on exit so that whatever code the
+// signal interrupted does not observe a value clobbered by this write().
+void GrpcBlazeServer::SendAction(CancelThreadAction action) {
+  const int saved_errno = errno;
+  const char msg = action;
+  ssize_t written;
+  do {
+    // Retry if a signal interrupted the call before the byte was queued;
+    // otherwise the action would be silently lost.
+    written = write(send_socket_, &msg, 1);
+  } while (written == -1 && errno == EINTR);
+  // Any other failure is still ignored on purpose: this function returns void
+  // and may be running inside a signal handler, so it cannot report the error.
+  errno = saved_errno;
+}
+
void GrpcBlazeServer::Cancel() {  // Requests cancellation of the running command.
assert(connected_);
-
- std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_);
- // Wake up the cancellation thread and tell it to issue its RPC
- cancel_thread_action_ = CANCEL;
- cancel_thread_signal_.notify_one();
+ SendAction(CancelThreadAction::CANCEL);  // only queues a byte; the cancel thread issues the RPC
}
} // namespace blaze