aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib/grpc/generic/rpc_server.rb
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/lib/grpc/generic/rpc_server.rb')
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb37
1 files changed, 25 insertions, 12 deletions
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index d5fc11dc1c..c80c7fcd32 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -92,9 +92,13 @@ module GRPC
# Stops the jobs in the pool
def stop
GRPC.logger.info('stopping, will wait for all the workers to exit')
- schedule { throw :exit } while ready_for_work?
- @stop_mutex.synchronize do # wait @keep_alive for works to stop
+ @stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop
@stopped = true
+ loop do
+ break unless ready_for_work?
+ worker_queue = @ready_workers.pop
+ worker_queue << [proc { throw :exit }, []]
+ end
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
@@ -138,7 +142,10 @@ module GRPC
end
# there shouldn't be any work given to this thread while its busy
fail('received a task while busy') unless worker_queue.empty?
- @ready_workers << worker_queue
+ @stop_mutex.synchronize do
+ return if @stopped
+ @ready_workers << worker_queue
+ end
end
end
end
@@ -186,8 +193,13 @@ module GRPC
# * max_waiting_requests: Deprecated due to internal changes to the thread
# pool. This is still an argument for compatibility but is ignored.
#
- # * poll_period: when present, the server polls for new events with this
- # period
+ # * poll_period: The amount of time in seconds to wait for
+ # currently-serviced RPC's to finish before cancelling them when shutting
+ # down the server.
+ #
+ # * pool_keep_alive: The amount of time in seconds to wait
+ # for currently busy thread-pool threads to finish before
+ # forcing an abrupt exit to each thread.
#
# * connect_md_proc:
# when non-nil is a proc for determining metadata to to send back the client
@@ -202,17 +214,18 @@ module GRPC
# intercepting server handlers to provide extra functionality.
# Interceptors are an EXPERIMENTAL API.
#
- def initialize(pool_size:DEFAULT_POOL_SIZE,
- max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
- poll_period:DEFAULT_POLL_PERIOD,
- connect_md_proc:nil,
- server_args:{},
- interceptors:[])
+ def initialize(pool_size: DEFAULT_POOL_SIZE,
+ max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
+ poll_period: DEFAULT_POLL_PERIOD,
+ pool_keep_alive: GRPC::RpcServer::DEFAULT_POOL_SIZE,
+ connect_md_proc: nil,
+ server_args: {},
+ interceptors: [])
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@pool_size = pool_size
- @pool = Pool.new(@pool_size)
+ @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
@run_cond = ConditionVariable.new
@run_mutex = Mutex.new
# running_state can take 4 values: :not_started, :running, :stopping, and