diff options
author | 2016-05-30 14:05:33 +0000 | |
---|---|---|
committer | 2016-05-31 08:37:24 +0000 | |
commit | 6dd29091ad6b877f932f2dd26166dff4ac1160be (patch) | |
tree | 5652d905dab3758bbcd2b9bfc5f8cb5040728310 /src | |
parent | 4ab4f05a8cbbaa4b7d938d1d305961fe95a5416c (diff) |
Use the age-old "write to a file descriptor in signal handler" idiom to handle SIGINT.
std::mutex and friends are not safe from signal handlers . I originally dismissed this approach because I thought it would be complicated, but it turned out to be much, much simpler than what we had before.
The alternative would be signalfd(), which is Linux-only.
--
MOS_MIGRATED_REVID=123578795
Diffstat (limited to 'src')
-rw-r--r-- | src/main/cpp/blaze.cc | 161 |
1 files changed, 101 insertions, 60 deletions
diff --git a/src/main/cpp/blaze.cc b/src/main/cpp/blaze.cc index f5eaa74291..bc6ba9c0d8 100644 --- a/src/main/cpp/blaze.cc +++ b/src/main/cpp/blaze.cc @@ -262,7 +262,7 @@ class AfUnixBlazeServer : public BlazeServer { class GrpcBlazeServer : public BlazeServer { public: GrpcBlazeServer(); - virtual ~GrpcBlazeServer() {} + virtual ~GrpcBlazeServer(); bool Connect() override; void Disconnect() override; @@ -271,25 +271,24 @@ class GrpcBlazeServer : public BlazeServer { void Cancel() override; private: - enum CancelThreadAction { NOTHING, JOIN, CANCEL }; + enum CancelThreadAction { NOTHING, JOIN, CANCEL, COMMAND_ID_RECEIVED }; std::unique_ptr<command_server::CommandServer::Stub> client_; std::string request_cookie_; std::string response_cookie_; std::string command_id_; - // The mutex is locked only once in the cancel thread (the only place where - // we call .wait()), so it's okay to use a condition variable with a - // recursive mutex. - std::condition_variable_any cancel_thread_signal_; + // protects command_id_ . Although we always set it before making the cancel + // thread do something with it, the mutex is still useful because it provides + // a memory fence. + std::mutex cancel_thread_mutex_; - // protects command_id_ and cancel_thread_action_ Needs to be recursive so - // that there is no deadlock if the main thread receives a SIGINT while the - // mutex is locked. - std::recursive_mutex cancel_thread_mutex_; - CancelThreadAction cancel_thread_action_; + int recv_socket_; // Socket the cancel thread reads actions from + int send_socket_; // Socket the main thread writes actions to void CancelThread(); + void SendAction(CancelThreadAction action); + void SendCancelMessage(); }; @@ -1885,6 +1884,28 @@ static void null_grpc_log_function(gpr_log_func_args *args) { GrpcBlazeServer::GrpcBlazeServer() { gpr_set_log_function(null_grpc_log_function); connected_ = false; + int fd[2]; + if (pipe(fd) < 0) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "pipe()"); + } + recv_socket_ = fd[0]; + send_socket_ = fd[1]; + + if (fcntl(recv_socket_, F_SETFD, FD_CLOEXEC) == -1) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "fcntl(F_SETFD, FD_CLOEXEC) failed"); + } + + if (fcntl(send_socket_, F_SETFD, FD_CLOEXEC) == -1) { + pdie(blaze_exit_code::LOCAL_ENVIRONMENTAL_ERROR, + "fcntl(F_SETFD, FD_CLOEXEC) failed"); + } +} + +GrpcBlazeServer::~GrpcBlazeServer() { + close(send_socket_); + close(recv_socket_); } bool GrpcBlazeServer::Connect() { @@ -1950,17 +1971,26 @@ bool GrpcBlazeServer::Connect() { // // When the user presses Ctrl-C, a SIGINT is delivered to the client, which is // translated into a BlazeServer::Cancel() call. Since it's not a good idea to -// do significant work in signal handlers, all it does is to set a flag and wake -// up the cancellation thread. +// do significant work in signal handlers, all it does is write a byte to an +// unnamed pipe. +// +// This unnamed pipe is used to communicate with the cancel thread. Whenever +// something interesting happens, a byte is written into it, which is read by +// the cancel thread. These commands are available: +// +// - NOP +// - JOIN. The cancel thread needs to be terminated. +// - CANCEL. If the command ID is already available, a cancel request is sent. +// - COMMAND_ID_RECEIVED. The client learned the command ID from the server. +// If there is a pending cancellation request, it is acted upon. // -// That thread in turn issues the CancelRequest RPC to the server if the command -// ID is known. If not, it goes to sleep again until the server tells the client -// the command ID, at which point it wakes up again and delivers the -// cancellation request. +// The only data the cancellation thread shares with the main thread is the +// file descriptor for receiving commands and command_id_, the latter of which +// is protected by a mutex, which mainly serves as a memory fence. // -// In every case, the cancellation thread is joined at the end of the execution -// of the command. The main thread wakes it up just so that it can finish -// (using the JOIN action) +// The cancellation thread is joined at the end of the execution of the command. +// The main thread wakes it up just so that it can finish (using the JOIN +// action) // // It's conceivable that the server is busy and thus it cannot service the // cancellation request. In that case, we simply ignore the failure and the both @@ -1969,38 +1999,59 @@ bool GrpcBlazeServer::Connect() { // delivered to the server) void GrpcBlazeServer::CancelThread() { bool running = true; - std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_); + bool cancel = false; + bool command_id_received = false; while (running) { - cancel_thread_signal_.wait(lock); - switch (cancel_thread_action_) { - case JOIN: + char buf; + int bytes_read = read(recv_socket_, &buf, 1); + if (bytes_read == -1 && errno == EINTR) { + continue; + } else if (bytes_read != 1) { + pdie(blaze_exit_code::INTERNAL_ERROR, + "Cannot communicate with cancel thread"); + } + + switch (buf) { + case CancelThreadAction::NOTHING: + break; + + case CancelThreadAction::JOIN: running = false; - cancel_thread_action_ = NOTHING; break; - case CANCEL: - // If we don't know the command ID yet, we'll be woken up when it - // becomes known. - if (command_id_.size() > 0) { - command_server::CancelRequest request; - request.set_cookie(request_cookie_); - request.set_command_id(command_id_); - grpc::ClientContext context; - context.set_deadline(std::chrono::system_clock::now() + - std::chrono::milliseconds(100)); - command_server::CancelResponse response; - // There isn't a lot we can do if this request fails - client_->Cancel(&context, request, &response); - cancel_thread_action_ = NOTHING; + case CancelThreadAction::COMMAND_ID_RECEIVED: + command_id_received = true; + if (cancel) { + SendCancelMessage(); + cancel = false; } break; - case NOTHING: - break; + case CancelThreadAction::CANCEL: + if (command_id_received) { + SendCancelMessage(); + } else { + cancel = true; + } + break; } } } +void GrpcBlazeServer::SendCancelMessage() { + std::unique_lock<std::mutex> lock(cancel_thread_mutex_); + + command_server::CancelRequest request; + request.set_cookie(request_cookie_); + request.set_command_id(command_id_); + grpc::ClientContext context; + context.set_deadline(std::chrono::system_clock::now() + + std::chrono::milliseconds(100)); + command_server::CancelResponse response; + // There isn't a lot we can do if this request fails + client_->Cancel(&context, request, &response); +} + // This will wait indefinitely until the server shuts down void GrpcBlazeServer::KillRunningServer() { assert(connected_); @@ -2056,11 +2107,6 @@ unsigned int GrpcBlazeServer::Communicate() { // (one during server startup and one emitted by the server) blaze::ReleaseLock(&blaze_lock_); - { - std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_); - cancel_thread_action_ = NOTHING; - } - std::thread cancel_thread(&GrpcBlazeServer::CancelThread, this); bool command_id_set = false; while (reader->Read(&response)) { @@ -2080,20 +2126,14 @@ unsigned int GrpcBlazeServer::Communicate() { } if (!command_id_set && response.command_id().size() > 0) { - std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_); + std::unique_lock<std::mutex> lock(cancel_thread_mutex_); command_id_ = response.command_id(); command_id_set = true; - // Wake up the cancellation thread in case there is a pending cancellation - cancel_thread_signal_.notify_one(); + SendAction(CancelThreadAction::COMMAND_ID_RECEIVED); } } - { - // Wake up the cancellation thread so that it can finish - std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_); - cancel_thread_action_ = JOIN; - cancel_thread_signal_.notify_one(); - } + SendAction(CancelThreadAction::JOIN); cancel_thread.join(); if (!response.finished()) { @@ -2113,13 +2153,14 @@ void GrpcBlazeServer::Disconnect() { connected_ = false; } +void GrpcBlazeServer::SendAction(CancelThreadAction action) { + char msg = action; + write(send_socket_, &msg, 1); // We assume this always works +} + void GrpcBlazeServer::Cancel() { assert(connected_); - - std::unique_lock<std::recursive_mutex> lock(cancel_thread_mutex_); - // Wake up the cancellation thread and tell it to issue its RPC - cancel_thread_action_ = CANCEL; - cancel_thread_signal_.notify_one(); + SendAction(CancelThreadAction::CANCEL); } } // namespace blaze |