diff options
Diffstat (limited to 'src/ruby/lib')
-rw-r--r-- | src/ruby/lib/grpc/generic/client_stub.rb | 1 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 75 |
2 files changed, 66 insertions, 10 deletions
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb index dc7672d359..7b2c04aa22 100644 --- a/src/ruby/lib/grpc/generic/client_stub.rb +++ b/src/ruby/lib/grpc/generic/client_stub.rb @@ -28,7 +28,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. require 'grpc/generic/active_call' -require 'xray/thread_dump_signal_handler' # GRPC contains the General RPC module. module GRPC diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 30a4bf1532..c7c8267fa3 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -31,7 +31,9 @@ require 'grpc/grpc' require 'grpc/generic/active_call' require 'grpc/generic/service' require 'thread' -require 'xray/thread_dump_signal_handler' + +# A global that contains signals the gRPC servers should respond to. +$grpc_signals = [] # GRPC contains the General RPC module. module GRPC @@ -50,6 +52,23 @@ module GRPC # Default max_waiting_requests size is 20 DEFAULT_MAX_WAITING_REQUESTS = 20 + # Default poll period is 1s + DEFAULT_POLL_PERIOD = 1 + + # Signal check period is 0.25s + SIGNAL_CHECK_PERIOD = 0.25 + + # Sets up a signal handler that adds signals to the signal handling global. + # + # Signal handlers should do as little as humanly possible. + # Here, they just add themselves to $grpc_signals + # + # RpcServer (and later other parts of gRPC) monitors the signals + # $grpc_signals in its own non-signal context. + def self.trap_signals + %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } + end + # Creates a new RpcServer. # # The RPC server is configured using keyword arguments. @@ -79,7 +98,7 @@ module GRPC # with not available to new requests def initialize(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, - poll_period:INFINITE_FUTURE, + poll_period:DEFAULT_POLL_PERIOD, completion_queue_override:nil, server_override:nil, **kw) @@ -117,6 +136,13 @@ module GRPC return unless @running @stopped = true @pool.stop + + # TODO: uncomment this: + # + # This segfaults in the c layer, so its commented out for now. Shutdown + # still occurs, but the c layer has to do the cleanup. + # + # @server.close end # determines if the server is currently running @@ -139,7 +165,37 @@ module GRPC running? end - # determines if the server is currently stopped + # Runs the server in its own thread, then waits for signal INT or TERM on + # the current thread to terminate it. + def run_till_terminated + self.class.trap_signals + t = Thread.new { run } + wait_till_running + loop do + sleep SIGNAL_CHECK_PERIOD + break unless handle_signals + end + stop + t.join + end + + # Handles the signals in $grpc_signals. + # + # @return false if the server should exit, true if not. + def handle_signals + loop do + sig = $grpc_signals.shift + case sig + when 'INT' + return false + when 'TERM' + return false + end + end + true + end + + # Determines if the server is currently stopped def stopped? @stopped ||= false end @@ -265,7 +321,10 @@ module GRPC # Pool is a simple thread pool for running server requests. class Pool - def initialize(size) + # Default keep alive period is 1s + DEFAULT_KEEP_ALIVE = 1 + + def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) fail 'pool size must be positive' unless size > 0 @jobs = Queue.new @size = size @@ -273,6 +332,7 @@ module GRPC @stop_mutex = Mutex.new @stop_cond = ConditionVariable.new @workers = [] + @keep_alive = keep_alive end # Returns the number of jobs waiting @@ -325,15 +385,13 @@ module GRPC @workers.size.times { schedule { throw :exit } } @stopped = true - # TODO: allow configuration of the keepalive period - keep_alive = 5 @stop_mutex.synchronize do - @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0 + @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end # Forcibly shutdown any threads that are still alive. if @workers.size > 0 - logger.warn("forcibly terminating #{@workers.size} worker(s)") + logger.info("forcibly terminating #{@workers.size} worker(s)") @workers.each do |t| next unless t.alive? begin @@ -344,7 +402,6 @@ module GRPC end end end - logger.info('stopped, all workers are shutdown') end end |