From d595fb65574007527eab0c03bc90de4e6e3b2ac8 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 16 May 2016 14:53:13 -0700 Subject: Handle signals properly when dropping GVL --- src/ruby/ext/grpc/rb_call.c | 8 +++- src/ruby/ext/grpc/rb_completion_queue.c | 6 ++- src/ruby/ext/grpc/rb_completion_queue.h | 6 ++- src/ruby/ext/grpc/rb_grpc.c | 2 - src/ruby/ext/grpc/rb_server.c | 14 ++++++- src/ruby/ext/grpc/rb_signal.c | 70 --------------------------------- src/ruby/ext/grpc/rb_signal.h | 39 ------------------ src/ruby/lib/grpc.rb | 1 - src/ruby/lib/grpc/signals.rb | 69 -------------------------------- 9 files changed, 27 insertions(+), 188 deletions(-) delete mode 100644 src/ruby/ext/grpc/rb_signal.c delete mode 100644 src/ruby/ext/grpc/rb_signal.h delete mode 100644 src/ruby/lib/grpc/signals.rb diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 1b06273af9..b43ad08eba 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -722,6 +722,10 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { return result; } +static void run_batch_unblock_func(void *call) { + grpc_call_cancel((grpc_call*)call, NULL); +} + /* call-seq: cq = CompletionQueue.new ops = { @@ -772,7 +776,9 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, grpc_call_error_detail_of(err), err); return Qnil; } - ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); + ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout, + run_batch_unblock_func, + (void*)call); if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 4bb615f8be..4f671807eb 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -142,7 +142,9 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) { /* Blocks until the next event for given tag is available, and returns the * event. */ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, - VALUE timeout) { + VALUE timeout, + rb_unblock_function_t *ubf, + void *unblock_arg) { next_call_stack next_call; MEMZERO(&next_call, next_call_stack, 1); TypedData_Get_Struct(self, grpc_completion_queue, @@ -159,7 +161,7 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, } next_call.event.type = GRPC_QUEUE_TIMEOUT; rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, - (void *)&next_call, NULL, NULL); + (void *)&next_call, ubf, unblock_arg); return next_call.event; } diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index 6cc4e96589..4bd5739869 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -46,8 +46,10 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v); * * This avoids having code that holds the GIL repeated at multiple sites. */ -grpc_event grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag, - VALUE timeout); +grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, + VALUE timeout, + rb_unblock_function_t *ubf, + void *unblock_arg); /* Initializes the CompletionQueue class. */ void Init_grpc_completion_queue(); diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 5277148fc9..06a07ac646 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -50,7 +50,6 @@ #include "rb_loader.h" #include "rb_server.h" #include "rb_server_credentials.h" -#include "rb_signal.h" static VALUE grpc_rb_cTimeVal = Qnil; @@ -333,7 +332,6 @@ void Init_grpc_c() { Init_grpc_channel_credentials(); Init_grpc_server(); Init_grpc_server_credentials(); - Init_grpc_signals(); Init_grpc_status_codes(); Init_grpc_time_consts(); } diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 2b3acaaf59..aa7fa0af8e 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -60,6 +60,7 @@ typedef struct grpc_rb_server { VALUE mark; /* The actual server */ grpc_server *wrapped; + grpc_completion_queue *queue; } grpc_rb_server; /* Destroys server instances. */ @@ -145,6 +146,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) { } grpc_server_register_completion_queue(srv, cq, NULL); wrapper->wrapped = srv; + wrapper->queue = cq; /* Add the cq as the server's mark object. This ensures the ruby cq can't be GCed before the server */ @@ -205,6 +207,11 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) { grpc_call_details_destroy(&st->details); } +static void request_call_unblock_func(void *ptr) { + grpc_rb_server *rb_srv = (grpc_rb_server*)ptr; + grpc_server_shutdown_and_notify(rb_srv->wrapped, rb_srv->queue, rb_srv); +} + /* call-seq: cq = CompletionQueue.new tag = Object.new @@ -242,7 +249,9 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, return Qnil; } - ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout); + ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout, + request_call_unblock_func, + (void*)s); if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_request_call_stack_cleanup(&st); return Qnil; @@ -305,7 +314,8 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) { if (s->wrapped != NULL) { grpc_server_shutdown_and_notify(s->wrapped, cq, NULL); - ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout); + ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout, + NULL, NULL); if (!ev.success) { rb_warn("server shutdown failed, cancelling the calls, objects may leak"); grpc_server_cancel_all_calls(s->wrapped); diff --git a/src/ruby/ext/grpc/rb_signal.c b/src/ruby/ext/grpc/rb_signal.c deleted file mode 100644 index a9e512374b..0000000000 --- a/src/ruby/ext/grpc/rb_signal.c +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include -#include -#include - -#include - -#include "rb_grpc.h" - -static void (*old_sigint_handler)(int); -static void (*old_sigterm_handler)(int); - -static volatile bool signal_received = false; - -/* This has to be handled at the C level instead of Ruby, because Ruby signal - * handlers are constrained to run in the main interpreter thread. If that main - * thread is blocked on grpc_completion_queue_pluck, the signal handlers will - * never run */ -static void handle_signal(int signum) { - signal_received = true; - if (signum == SIGINT) { - old_sigint_handler(signum); - } else if (signum == SIGTERM) { - old_sigterm_handler(signum); - } -} - -static VALUE grpc_rb_signal_received(VALUE self) { - (void)self; - return signal_received ? Qtrue : Qfalse; -} - -void Init_grpc_signals() { - old_sigint_handler = signal(SIGINT, handle_signal); - old_sigterm_handler = signal(SIGTERM, handle_signal); - rb_define_singleton_method(grpc_rb_mGrpcCore, "signal_received?", - grpc_rb_signal_received, 0); -} diff --git a/src/ruby/ext/grpc/rb_signal.h b/src/ruby/ext/grpc/rb_signal.h deleted file mode 100644 index 07e49c0a8b..0000000000 --- a/src/ruby/ext/grpc/rb_signal.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_RB_SIGNAL_H_ -#define GRPC_RB_SIGNAL_H_ - -void Init_grpc_signals(); - -#endif /* GRPC_RB_SIGNAL_H_ */ diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index 7c9aae30e9..19b514e4e5 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -33,7 +33,6 @@ require_relative 'grpc/errors' require_relative 'grpc/grpc' require_relative 'grpc/logconfig' require_relative 'grpc/notifier' -require_relative 'grpc/signals' require_relative 'grpc/version' require_relative 'grpc/core/time_consts' require_relative 'grpc/generic/active_call' diff --git a/src/ruby/lib/grpc/signals.rb b/src/ruby/lib/grpc/signals.rb deleted file mode 100644 index 2ab85c8bb1..0000000000 --- a/src/ruby/lib/grpc/signals.rb +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright 2016, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -require 'thread' -require_relative 'grpc' - -# GRPC contains the General RPC module. -module GRPC - # Signals contains gRPC functions related to signal handling - module Signals - @interpreter_exiting = false - @signal_handlers = [] - @handlers_mutex = Mutex.new - - def register_handler(&handler) - @handlers_mutex.synchronize do - @signal_handlers.push(handler) - handler.call if @exit_signal_received - end - # Returns a function to remove the handler - lambda do - @handlers_mutex.synchronize { @signal_handlers.delete(handler) } - end - end - module_function :register_handler - - def wait_for_signals - t = Thread.new do - sleep 0.1 until GRPC::Core.signal_received? || @interpreter_exiting - unless @interpreter_exiting - @handlers_mutex.synchronize do - @signal_handlers.each(&:call) - end - end - end - at_exit do - @interpreter_exiting = true - t.join - end - end - module_function :wait_for_signals - end -end -- cgit v1.2.3 From 76733cf196b0a457046656cad8f8f030a1c950b5 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 16 May 2016 15:00:29 -0700 Subject: Removed remaining references to old server handling code --- src/ruby/lib/grpc.rb | 2 -- src/ruby/lib/grpc/generic/active_call.rb | 5 ----- src/ruby/lib/grpc/generic/rpc_server.rb | 4 ---- 3 files changed, 11 deletions(-) diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index 19b514e4e5..79fa705b1c 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -47,5 +47,3 @@ begin ensure file.close end - -GRPC::Signals.wait_for_signals diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index fd20a86144..7fe588bd4c 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -30,7 +30,6 @@ require 'forwardable' require 'weakref' require_relative 'bidi_call' -require_relative '../signals' class Struct # BatchResult is the struct returned by calls to call#start_batch. @@ -123,10 +122,6 @@ module GRPC @unmarshal = unmarshal @metadata_tag = metadata_tag @op_notifier = nil - weak_self = WeakRef.new(self) - remove_handler = GRPC::Signals.register_handler(&weak_self - .method(:cancel)) - ObjectSpace.define_finalizer(self, remove_handler) end # output_metadata are provides access to hash that can be used to diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 238aaa9656..e1496d491a 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -28,7 +28,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require_relative '../grpc' -require_relative '../signals' require_relative 'active_call' require_relative 'service' require 'thread' @@ -353,10 +352,7 @@ module GRPC transition_running_state(:running) @run_cond.broadcast end - remove_signal_handler = GRPC::Signals.register_handler { stop } loop_handle_server_calls - # Remove signal handler when server stops - remove_signal_handler.call end alias_method :run_till_terminated, :run -- cgit v1.2.3 From c0ecedba8379beee9480722ca1202e1cc44c124c Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Mon, 16 May 2016 16:14:52 -0700 Subject: Made signal handling properly handle non-killing signals --- src/ruby/ext/grpc/rb_call.c | 8 +---- src/ruby/ext/grpc/rb_completion_queue.c | 62 ++++++++++++++++++++++++++++----- src/ruby/ext/grpc/rb_completion_queue.h | 4 +-- src/ruby/ext/grpc/rb_server.c | 12 ++----- 4 files changed, 57 insertions(+), 29 deletions(-) diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index b43ad08eba..1b06273af9 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -722,10 +722,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { return result; } -static void run_batch_unblock_func(void *call) { - grpc_call_cancel((grpc_call*)call, NULL); -} - /* call-seq: cq = CompletionQueue.new ops = { @@ -776,9 +772,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, grpc_call_error_detail_of(err), err); return Qnil; } - ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout, - run_batch_unblock_func, - (void*)call); + ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 4f671807eb..605c7408b4 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -52,21 +52,47 @@ typedef struct next_call_stack { grpc_event event; gpr_timespec timeout; void *tag; + volatile int interrupted; } next_call_stack; /* Calls grpc_completion_queue_next without holding the ruby GIL */ static void *grpc_rb_completion_queue_next_no_gil(void *param) { next_call_stack *const next_call = (next_call_stack*)param; - next_call->event = - grpc_completion_queue_next(next_call->cq, next_call->timeout, NULL); + gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN); + gpr_timespec deadline; + do { + deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment); + if (gpr_time_cmp(deadline, next_call->timeout) > 0) { + // Then we have run out of time + break; + } + next_call->event = grpc_completion_queue_next(next_call->cq, + deadline, NULL); + if (next_call->event.success) { + break; + } + } while (!next_call->interrupted); return NULL; } /* Calls grpc_completion_queue_pluck without holding the ruby GIL */ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { next_call_stack *const next_call = (next_call_stack*)param; - next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag, - next_call->timeout, NULL); + gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN); + gpr_timespec deadline; + do { + deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment); + if (gpr_time_cmp(deadline, next_call->timeout) > 0) { + // Then we have run out of time + break; + } + next_call->event = grpc_completion_queue_pluck(next_call->cq, + next_call->tag, + deadline, NULL); + if (next_call->event.type != GRPC_QUEUE_TIMEOUT) { + break; + } + } while (!next_call->interrupted); return NULL; } @@ -139,12 +165,15 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) { return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq); } +static void unblock_func(void *param) { + next_call_stack *const next_call = (next_call_stack*)param; + next_call->interrupted = 1; +} + /* Blocks until the next event for given tag is available, and returns the * event. */ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, - VALUE timeout, - rb_unblock_function_t *ubf, - void *unblock_arg) { + VALUE timeout) { next_call_stack next_call; MEMZERO(&next_call, next_call_stack, 1); TypedData_Get_Struct(self, grpc_completion_queue, @@ -160,8 +189,23 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, next_call.tag = ROBJECT(tag); } next_call.event.type = GRPC_QUEUE_TIMEOUT; - rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, - (void *)&next_call, ubf, unblock_arg); + /* Loop until we finish a pluck without an interruption. The internal + pluck function runs either until it is interrupted or it gets an + event, or time runs out. + + The basic reason we need this relatively complicated construction is that + we need to re-acquire the GVL when an interrupt comes in, so that the ruby + interpeter can do what it needs to do with the interrupt. But we also need + to get back to plucking when */ + do { + next_call.interrupted = 0; + rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, + (void *)&next_call, unblock_func, + (void *)&next_call); + /* If an interrupt prevented pluck from returning useful information, then + any plucks that did complete must have timed out */ + } while (next_call.interrupted && + next_call.event.type == GRPC_QUEUE_TIMEOUT); return next_call.event; } diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index 4bd5739869..42de43c3fb 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -47,9 +47,7 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v); * This avoids having code that holds the GIL repeated at multiple sites. */ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, - VALUE timeout, - rb_unblock_function_t *ubf, - void *unblock_arg); + VALUE timeout); /* Initializes the CompletionQueue class. */ void Init_grpc_completion_queue(); diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index aa7fa0af8e..0899feb685 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -207,11 +207,6 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) { grpc_call_details_destroy(&st->details); } -static void request_call_unblock_func(void *ptr) { - grpc_rb_server *rb_srv = (grpc_rb_server*)ptr; - grpc_server_shutdown_and_notify(rb_srv->wrapped, rb_srv->queue, rb_srv); -} - /* call-seq: cq = CompletionQueue.new tag = Object.new @@ -249,9 +244,7 @@ static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, return Qnil; } - ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout, - request_call_unblock_func, - (void*)s); + ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout); if (ev.type == GRPC_QUEUE_TIMEOUT) { grpc_request_call_stack_cleanup(&st); return Qnil; @@ -314,8 +307,7 @@ static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) { if (s->wrapped != NULL) { grpc_server_shutdown_and_notify(s->wrapped, cq, NULL); - ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout, - NULL, NULL); + ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout); if (!ev.success) { rb_warn("server shutdown failed, cancelling the calls, objects may leak"); grpc_server_cancel_all_calls(s->wrapped); -- cgit v1.2.3 From 19bf00b69b665523607b972a8ec86fda04bbd5f4 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 17 May 2016 18:22:01 -0700 Subject: Re-added signal handlers to cancel calls and shut down servers --- src/ruby/lib/grpc.rb | 3 ++ src/ruby/lib/grpc/generic/active_call.rb | 5 +++ src/ruby/lib/grpc/generic/rpc_server.rb | 4 ++ src/ruby/lib/grpc/signals.rb | 68 ++++++++++++++++++++++++++++++++ 4 files changed, 80 insertions(+) create mode 100644 src/ruby/lib/grpc/signals.rb diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index 79fa705b1c..ab21fc4772 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -33,6 +33,7 @@ require_relative 'grpc/errors' require_relative 'grpc/grpc' require_relative 'grpc/logconfig' require_relative 'grpc/notifier' +require_relative 'grpc/signals' require_relative 'grpc/version' require_relative 'grpc/core/time_consts' require_relative 'grpc/generic/active_call' @@ -47,3 +48,5 @@ begin ensure file.close end + +GRPC::Signals.init diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 7fe588bd4c..f91970a923 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -30,6 +30,7 @@ require 'forwardable' require 'weakref' require_relative 'bidi_call' +require_relative '../signals' class Struct # BatchResult is the struct returned by calls to call#start_batch. @@ -122,6 +123,10 @@ module GRPC @unmarshal = unmarshal @metadata_tag = metadata_tag @op_notifier = nil + weak_self = WeakRef.new(self) + signal_handler = proc { weak_self.cancel if weak_self.weakref_alive? } + remove_handler = GRPC::Signals.register_handler(&signal_handler) + ObjectSpace.define_finalizer(self, remove_handler) end # output_metadata are provides access to hash that can be used to diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index e1496d491a..238aaa9656 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -28,6 +28,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require_relative '../grpc' +require_relative '../signals' require_relative 'active_call' require_relative 'service' require 'thread' @@ -352,7 +353,10 @@ module GRPC transition_running_state(:running) @run_cond.broadcast end + remove_signal_handler = GRPC::Signals.register_handler { stop } loop_handle_server_calls + # Remove signal handler when server stops + remove_signal_handler.call end alias_method :run_till_terminated, :run diff --git a/src/ruby/lib/grpc/signals.rb b/src/ruby/lib/grpc/signals.rb new file mode 100644 index 0000000000..56bc9f32a0 --- /dev/null +++ b/src/ruby/lib/grpc/signals.rb @@ -0,0 +1,68 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'thread' +require_relative 'grpc' + +# GRPC contains the General RPC module. +module GRPC + # Signals contains gRPC functions related to signal handling + module Signals + @signal_handlers = [] + @handlers_mutex = Mutex.new + @previous_handlers = {} + # @signal_received = false + + def register_handler(&handler) + @handlers_mutex.synchronize do + @signal_handlers.push(handler) + # handler.call if @signal_received + end + # Returns a function to remove the handler + lambda do + @handlers_mutex.synchronize { @signal_handlers.delete(handler) } + end + end + module_function :register_handler + + def run_handlers(signal) + # @signal_received = true + @signal_handlers.each(&:call) + @previous_handlers[signal].call + end + module_function :run_handlers + + def init + %w(INT TERM).each do |sig| + @previous_handlers[sig] = Signal.trap(sig) { run_handlers(sig) } + end + end + module_function :init + end +end -- cgit v1.2.3 From e68a71582f4f5dab8cb4823f8ce7f272ed54c7f5 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 18 May 2016 09:46:26 -0700 Subject: Fixed a couple of issues, removed ruby-level signal handling again --- src/ruby/ext/grpc/rb_completion_queue.c | 18 +++------ src/ruby/lib/grpc.rb | 3 -- src/ruby/lib/grpc/generic/active_call.rb | 4 -- src/ruby/lib/grpc/generic/rpc_server.rb | 3 -- src/ruby/lib/grpc/signals.rb | 68 -------------------------------- 5 files changed, 6 insertions(+), 90 deletions(-) delete mode 100644 src/ruby/lib/grpc/signals.rb diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 605c7408b4..b6ddbe88dc 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -62,13 +62,10 @@ static void *grpc_rb_completion_queue_next_no_gil(void *param) { gpr_timespec deadline; do { deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment); - if (gpr_time_cmp(deadline, next_call->timeout) > 0) { - // Then we have run out of time - break; - } next_call->event = grpc_completion_queue_next(next_call->cq, deadline, NULL); - if (next_call->event.success) { + if (next_call->event.type != GRPC_QUEUE_TIMEOUT || + gpr_time_cmp(deadline, next_call->timeout) > 0) { break; } } while (!next_call->interrupted); @@ -82,14 +79,11 @@ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) { gpr_timespec deadline; do { deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment); - if (gpr_time_cmp(deadline, next_call->timeout) > 0) { - // Then we have run out of time - break; - } next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag, deadline, NULL); - if (next_call->event.type != GRPC_QUEUE_TIMEOUT) { + if (next_call->event.type != GRPC_QUEUE_TIMEOUT || + gpr_time_cmp(deadline, next_call->timeout) > 0) { break; } } while (!next_call->interrupted); @@ -195,8 +189,8 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag, The basic reason we need this relatively complicated construction is that we need to re-acquire the GVL when an interrupt comes in, so that the ruby - interpeter can do what it needs to do with the interrupt. But we also need - to get back to plucking when */ + interpreter can do what it needs to do with the interrupt. But we also need + to get back to plucking when the interrupt has been handled. */ do { next_call.interrupted = 0; rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil, diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index ab21fc4772..79fa705b1c 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -33,7 +33,6 @@ require_relative 'grpc/errors' require_relative 'grpc/grpc' require_relative 'grpc/logconfig' require_relative 'grpc/notifier' -require_relative 'grpc/signals' require_relative 'grpc/version' require_relative 'grpc/core/time_consts' require_relative 'grpc/generic/active_call' @@ -48,5 +47,3 @@ begin ensure file.close end - -GRPC::Signals.init diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index f91970a923..c982e0fcc5 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -123,10 +123,6 @@ module GRPC @unmarshal = unmarshal @metadata_tag = metadata_tag @op_notifier = nil - weak_self = WeakRef.new(self) - signal_handler = proc { weak_self.cancel if weak_self.weakref_alive? } - remove_handler = GRPC::Signals.register_handler(&signal_handler) - ObjectSpace.define_finalizer(self, remove_handler) end # output_metadata are provides access to hash that can be used to diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 238aaa9656..b548d3cf26 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -353,10 +353,7 @@ module GRPC transition_running_state(:running) @run_cond.broadcast end - remove_signal_handler = GRPC::Signals.register_handler { stop } loop_handle_server_calls - # Remove signal handler when server stops - remove_signal_handler.call end alias_method :run_till_terminated, :run diff --git a/src/ruby/lib/grpc/signals.rb b/src/ruby/lib/grpc/signals.rb deleted file mode 100644 index 56bc9f32a0..0000000000 --- a/src/ruby/lib/grpc/signals.rb +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright 2016, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -require 'thread' -require_relative 'grpc' - -# GRPC contains the General RPC module. -module GRPC - # Signals contains gRPC functions related to signal handling - module Signals - @signal_handlers = [] - @handlers_mutex = Mutex.new - @previous_handlers = {} - # @signal_received = false - - def register_handler(&handler) - @handlers_mutex.synchronize do - @signal_handlers.push(handler) - # handler.call if @signal_received - end - # Returns a function to remove the handler - lambda do - @handlers_mutex.synchronize { @signal_handlers.delete(handler) } - end - end - module_function :register_handler - - def run_handlers(signal) - # @signal_received = true - @signal_handlers.each(&:call) - @previous_handlers[signal].call - end - module_function :run_handlers - - def init - %w(INT TERM).each do |sig| - @previous_handlers[sig] = Signal.trap(sig) { run_handlers(sig) } - end - end - module_function :init - end -end -- cgit v1.2.3 From 746ea97afdcdffa074d4710d6706e4e988d4ad8e Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 18 May 2016 09:49:10 -0700 Subject: Finished removing ruby-level signal handlers again --- src/ruby/lib/grpc/generic/active_call.rb | 1 - src/ruby/lib/grpc/generic/rpc_server.rb | 1 - 2 files changed, 2 deletions(-) diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index c982e0fcc5..7fe588bd4c 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -30,7 +30,6 @@ require 'forwardable' require 'weakref' require_relative 'bidi_call' -require_relative '../signals' class Struct # BatchResult is the struct returned by calls to call#start_batch. diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index b548d3cf26..e1496d491a 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -28,7 +28,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require_relative '../grpc' -require_relative '../signals' require_relative 'active_call' require_relative 'service' require 'thread' -- cgit v1.2.3