diff options
Diffstat (limited to 'src/ruby/lib')
-rw-r--r-- | src/ruby/lib/grpc.rb | 3 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 6 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 63 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/service.rb | 16 | ||||
-rw-r--r-- | src/ruby/lib/grpc/signals.rb | 69 |
5 files changed, 89 insertions, 68 deletions
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index 79fa705b1c..7c9aae30e9 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -33,6 +33,7 @@ require_relative 'grpc/errors' require_relative 'grpc/grpc' require_relative 'grpc/logconfig' require_relative 'grpc/notifier' +require_relative 'grpc/signals' require_relative 'grpc/version' require_relative 'grpc/core/time_consts' require_relative 'grpc/generic/active_call' @@ -47,3 +48,5 @@ begin ensure file.close end + +GRPC::Signals.wait_for_signals diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index ecf3cc3293..fd20a86144 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -28,7 +28,9 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'forwardable' +require 'weakref' require_relative 'bidi_call' +require_relative '../signals' class Struct # BatchResult is the struct returned by calls to call#start_batch. @@ -121,6 +123,10 @@ module GRPC @unmarshal = unmarshal @metadata_tag = metadata_tag @op_notifier = nil + weak_self = WeakRef.new(self) + remove_handler = GRPC::Signals.register_handler(&weak_self + .method(:cancel)) + ObjectSpace.define_finalizer(self, remove_handler) end # output_metadata are provides access to hash that can be used to diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index a0f4071adc..238aaa9656 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -28,46 +28,13 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require_relative '../grpc' +require_relative '../signals' require_relative 'active_call' require_relative 'service' require 'thread' -# A global that contains signals the gRPC servers should respond to. -$grpc_signals = [] - # GRPC contains the General RPC module. module GRPC - # Handles the signals in $grpc_signals. - # - # @return false if the server should exit, true if not. - def handle_signals - loop do - sig = $grpc_signals.shift - case sig - when 'INT' - return false - when 'TERM' - return false - when nil - return true - end - end - true - end - module_function :handle_signals - - # Sets up a signal handler that adds signals to the signal handling global. - # - # Signal handlers should do as little as humanly possible. - # Here, they just add themselves to $grpc_signals - # - # RpcServer (and later other parts of gRPC) monitors the signals - # $grpc_signals in its own non-signal context. - def trap_signals - %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } - end - module_function :trap_signals - # Pool is a simple thread pool. class Pool # Default keep alive period is 1s @@ -328,23 +295,6 @@ module GRPC end end - # Runs the server in its own thread, then waits for signal INT or TERM on - # the current thread to terminate it. - def run_till_terminated - GRPC.trap_signals - t = Thread.new do - run - end - t.abort_on_exception = true - wait_till_running - until running_state == :stopped - sleep SIGNAL_CHECK_PERIOD - break unless GRPC.handle_signals - end - stop - t.join - end - # handle registration of classes # # service is either a class that includes GRPC::GenericService and whose @@ -403,9 +353,14 @@ module GRPC transition_running_state(:running) @run_cond.broadcast end + remove_signal_handler = GRPC::Signals.register_handler { stop } loop_handle_server_calls + # Remove signal handler when server stops + remove_signal_handler.call end + alias_method :run_till_terminated, :run + # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs def available?(an_rpc) jobs_count, max = @pool.jobs_waiting, @max_waiting_requests @@ -503,10 +458,8 @@ module GRPC unless cls.include?(GenericService) fail "#{cls} must 'include GenericService'" end - if cls.rpc_descs.size.zero? - fail "#{cls} should specify some rpc descriptions" - end - cls.assert_rpc_descs_have_methods + fail "#{cls} should specify some rpc descriptions" if + cls.rpc_descs.size.zero? end # This should be called while holding @run_mutex diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb index 8e940b5b13..0a166e823e 100644 --- a/src/ruby/lib/grpc/generic/service.rb +++ b/src/ruby/lib/grpc/generic/service.rb @@ -110,6 +110,9 @@ module GRPC rpc_descs[name] = RpcDesc.new(name, input, output, marshal_class_method, unmarshal_class_method) + define_method(name) do + fail GRPC::BadStatus, GRPC::Core::StatusCodes::UNIMPLEMENTED + end end def inherited(subclass) @@ -199,19 +202,6 @@ module GRPC end end end - - # Asserts that the appropriate methods are defined for each added rpc - # spec. Is intended to aid verifying that server classes are correctly - # implemented. - def assert_rpc_descs_have_methods - rpc_descs.each_pair do |m, spec| - mth_name = GenericService.underscore(m.to_s).to_sym - unless instance_methods.include?(mth_name) - fail "#{self} does not provide instance method '#{mth_name}'" - end - spec.assert_arity_matches(instance_method(mth_name)) - end - end end def self.included(o) diff --git a/src/ruby/lib/grpc/signals.rb b/src/ruby/lib/grpc/signals.rb new file mode 100644 index 0000000000..2ab85c8bb1 --- /dev/null +++ b/src/ruby/lib/grpc/signals.rb @@ -0,0 +1,69 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require 'thread' +require_relative 'grpc' + +# GRPC contains the General RPC module. +module GRPC + # Signals contains gRPC functions related to signal handling + module Signals + @interpreter_exiting = false + @signal_handlers = [] + @handlers_mutex = Mutex.new + + def register_handler(&handler) + @handlers_mutex.synchronize do + @signal_handlers.push(handler) + handler.call if @exit_signal_received + end + # Returns a function to remove the handler + lambda do + @handlers_mutex.synchronize { @signal_handlers.delete(handler) } + end + end + module_function :register_handler + + def wait_for_signals + t = Thread.new do + sleep 0.1 until GRPC::Core.signal_received? || @interpreter_exiting + unless @interpreter_exiting + @handlers_mutex.synchronize do + @signal_handlers.each(&:call) + end + end + end + at_exit do + @interpreter_exiting = true + t.join + end + end + module_function :wait_for_signals + end +end |