aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/lib')
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb1
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb75
2 files changed, 66 insertions, 10 deletions
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index dc7672d359..7b2c04aa22 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -28,7 +28,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'grpc/generic/active_call'
-require 'xray/thread_dump_signal_handler'
# GRPC contains the General RPC module.
module GRPC
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 30a4bf1532..c7c8267fa3 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -31,7 +31,9 @@ require 'grpc/grpc'
require 'grpc/generic/active_call'
require 'grpc/generic/service'
require 'thread'
-require 'xray/thread_dump_signal_handler'
+
+# A global that contains signals the gRPC servers should respond to.
+$grpc_signals = []
# GRPC contains the General RPC module.
module GRPC
@@ -50,6 +52,23 @@ module GRPC
# Default max_waiting_requests size is 20
DEFAULT_MAX_WAITING_REQUESTS = 20
+ # Default poll period is 1s
+ DEFAULT_POLL_PERIOD = 1
+
+ # Signal check period is 0.25s
+ SIGNAL_CHECK_PERIOD = 0.25
+
+ # Sets up a signal handler that adds signals to the signal handling global.
+ #
+ # Signal handlers should do as little as humanly possible.
+ # Here, they just add themselves to $grpc_signals
+ #
+ # RpcServer (and later other parts of gRPC) monitors the signals
+ # $grpc_signals in its own non-signal context.
+ def self.trap_signals
+ %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
+ end
+
# Creates a new RpcServer.
#
# The RPC server is configured using keyword arguments.
@@ -79,7 +98,7 @@ module GRPC
# with not available to new requests
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
- poll_period:INFINITE_FUTURE,
+ poll_period:DEFAULT_POLL_PERIOD,
completion_queue_override:nil,
server_override:nil,
**kw)
@@ -117,6 +136,13 @@ module GRPC
return unless @running
@stopped = true
@pool.stop
+
+ # TODO: uncomment this:
+ #
+ # This segfaults in the c layer, so its commented out for now. Shutdown
+ # still occurs, but the c layer has to do the cleanup.
+ #
+ # @server.close
end
# determines if the server is currently running
@@ -139,7 +165,37 @@ module GRPC
running?
end
- # determines if the server is currently stopped
+ # Runs the server in its own thread, then waits for signal INT or TERM on
+ # the current thread to terminate it.
+ def run_till_terminated
+ self.class.trap_signals
+ t = Thread.new { run }
+ wait_till_running
+ loop do
+ sleep SIGNAL_CHECK_PERIOD
+ break unless handle_signals
+ end
+ stop
+ t.join
+ end
+
+ # Handles the signals in $grpc_signals.
+ #
+ # @return false if the server should exit, true if not.
+ def handle_signals
+ loop do
+ sig = $grpc_signals.shift
+ case sig
+ when 'INT'
+ return false
+ when 'TERM'
+ return false
+ end
+ end
+ true
+ end
+
+ # Determines if the server is currently stopped
def stopped?
@stopped ||= false
end
@@ -265,7 +321,10 @@ module GRPC
# Pool is a simple thread pool for running server requests.
class Pool
- def initialize(size)
+ # Default keep alive period is 1s
+ DEFAULT_KEEP_ALIVE = 1
+
+ def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
fail 'pool size must be positive' unless size > 0
@jobs = Queue.new
@size = size
@@ -273,6 +332,7 @@ module GRPC
@stop_mutex = Mutex.new
@stop_cond = ConditionVariable.new
@workers = []
+ @keep_alive = keep_alive
end
# Returns the number of jobs waiting
@@ -325,15 +385,13 @@ module GRPC
@workers.size.times { schedule { throw :exit } }
@stopped = true
- # TODO: allow configuration of the keepalive period
- keep_alive = 5
@stop_mutex.synchronize do
- @stop_cond.wait(@stop_mutex, keep_alive) if @workers.size > 0
+ @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
# Forcibly shutdown any threads that are still alive.
if @workers.size > 0
- logger.warn("forcibly terminating #{@workers.size} worker(s)")
+ logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
@@ -344,7 +402,6 @@ module GRPC
end
end
end
-
logger.info('stopped, all workers are shutdown')
end
end