aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/lib/grpc/generic/rpc_server.rb
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/lib/grpc/generic/rpc_server.rb')
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb171
1 files changed, 141 insertions, 30 deletions
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 7dbcb7d479..00e0db71be 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -31,10 +31,133 @@ require_relative '../grpc'
require_relative 'active_call'
require_relative 'service'
require 'thread'
-require 'concurrent'
# GRPC contains the General RPC module.
module GRPC
+ # Pool is a simple thread pool.
+ class Pool
+ # Default keep alive period is 1s
+ DEFAULT_KEEP_ALIVE = 1
+
+ def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
+ fail 'pool size must be positive' unless size > 0
+ @jobs = Queue.new
+ @size = size
+ @stopped = false
+ @stop_mutex = Mutex.new # needs to be held when accessing @stopped
+ @stop_cond = ConditionVariable.new
+ @workers = []
+ @keep_alive = keep_alive
+
+ # Each worker thread has its own queue to push and pull jobs
+ # these queues are put into @ready_queues when that worker is idle
+ @ready_workers = Queue.new
+ end
+
+ # Returns the number of jobs waiting
+ def jobs_waiting
+ @jobs.size
+ end
+
+ def ready_for_work?
+ # Busy worker threads are either doing work, or have a single job
+ # waiting on them. Workers that are idle with no jobs waiting
+ # have their "queues" in @ready_workers
+ !@ready_workers.empty?
+ end
+
+ # Runs the given block on the queue with the provided args.
+ #
+ # @param args the args passed blk when it is called
+ # @param blk the block to call
+ def schedule(*args, &blk)
+ return if blk.nil?
+ @stop_mutex.synchronize do
+ if @stopped
+ GRPC.logger.warn('did not schedule job, already stopped')
+ return
+ end
+ GRPC.logger.info('schedule another job')
+ fail 'No worker threads available' if @ready_workers.empty?
+ worker_queue = @ready_workers.pop
+
+ fail 'worker already has a task waiting' unless worker_queue.empty?
+ worker_queue << [blk, args]
+ end
+ end
+
+ # Starts running the jobs in the thread pool.
+ def start
+ @stop_mutex.synchronize do
+ fail 'already stopped' if @stopped
+ end
+ until @workers.size == @size.to_i
+ new_worker_queue = Queue.new
+ @ready_workers << new_worker_queue
+ next_thread = Thread.new(new_worker_queue) do |jobs|
+ catch(:exit) do # allows { throw :exit } to kill a thread
+ loop_execute_jobs(jobs)
+ end
+ remove_current_thread
+ end
+ @workers << next_thread
+ end
+ end
+
+ # Stops the jobs in the pool
+ def stop
+ GRPC.logger.info('stopping, will wait for all the workers to exit')
+ schedule { throw :exit } while ready_for_work?
+ @stop_mutex.synchronize do # wait @keep_alive for works to stop
+ @stopped = true
+ @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
+ end
+ forcibly_stop_workers
+ GRPC.logger.info('stopped, all workers are shutdown')
+ end
+
+ protected
+
+ # Forcibly shutdown any threads that are still alive.
+ def forcibly_stop_workers
+ return unless @workers.size > 0
+ GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
+ @workers.each do |t|
+ next unless t.alive?
+ begin
+ t.exit
+ rescue StandardError => e
+ GRPC.logger.warn('error while terminating a worker')
+ GRPC.logger.warn(e)
+ end
+ end
+ end
+
+ # removes the threads from workers, and signal when all the
+ # threads are complete.
+ def remove_current_thread
+ @stop_mutex.synchronize do
+ @workers.delete(Thread.current)
+ @stop_cond.signal if @workers.size.zero?
+ end
+ end
+
+ def loop_execute_jobs(worker_queue)
+ loop do
+ begin
+ blk, args = worker_queue.pop
+ blk.call(*args)
+ rescue StandardError => e
+ GRPC.logger.warn('Error in worker thread')
+ GRPC.logger.warn(e)
+ end
+ # there shouldn't be any work given to this thread while its busy
+ fail('received a task while busy') unless worker_queue.empty?
+ @ready_workers << worker_queue
+ end
+ end
+ end
+
# RpcServer hosts a number of services and makes them available on the
# network.
class RpcServer
@@ -44,14 +167,11 @@ module GRPC
def_delegators :@server, :add_http2_port
- # Default max size of the thread pool size is 100
- DEFAULT_MAX_POOL_SIZE = 100
-
- # Default minimum size of the thread pool is 5
- DEFAULT_MIN_POOL_SIZE = 5
+ # Default thread pool size is 30
+ DEFAULT_POOL_SIZE = 30
- # Default max_waiting_requests size is 60
- DEFAULT_MAX_WAITING_REQUESTS = 60
+ # Deprecated due to internal changes to the thread pool
+ DEFAULT_MAX_WAITING_REQUESTS = 20
# Default poll period is 1s
DEFAULT_POLL_PERIOD = 1
@@ -74,12 +194,12 @@ module GRPC
# There are some specific keyword args used to configure the RpcServer
# instance.
#
- # * pool_size: the maximum size of the thread pool that the server's
- # thread pool can reach.
+ # * pool_size: the size of the thread pool the server uses to run its
+ # threads. No more concurrent requests can be made than the size
+ # of the thread pool
#
- # * max_waiting_requests: the maximum number of requests that are not
- # being handled to allow. When this limit is exceeded, the server responds
- # with not available to new requests
+ # * max_waiting_requests: Deprecated due to internal changes to the thread
+ # pool. This is still an argument for compatibility but is ignored.
#
# * poll_period: when present, the server polls for new events with this
# period
@@ -91,8 +211,7 @@ module GRPC
#
# * server_args:
# A server arguments hash to be passed down to the underlying core server
- def initialize(pool_size:DEFAULT_MAX_POOL_SIZE,
- min_pool_size:DEFAULT_MIN_POOL_SIZE,
+ def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD,
connect_md_proc:nil,
@@ -100,12 +219,8 @@ module GRPC
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
-
- @pool = Concurrent::ThreadPoolExecutor.new(
- min_threads: [min_pool_size, pool_size].min,
- max_threads: pool_size,
- max_queue: max_waiting_requests,
- fallback_policy: :discard)
+ @pool_size = pool_size
+ @pool = Pool.new(@pool_size)
@run_cond = ConditionVariable.new
@run_mutex = Mutex.new
# running_state can take 4 values: :not_started, :running, :stopping, and
@@ -126,8 +241,7 @@ module GRPC
end
deadline = from_relative_time(@poll_period)
@server.close(deadline)
- @pool.shutdown
- @pool.wait_for_termination
+ @pool.stop
end
def running_state
@@ -224,6 +338,7 @@ module GRPC
def run
@run_mutex.synchronize do
fail 'cannot run without registering services' if rpc_descs.size.zero?
+ @pool.start
@server.start
transition_running_state(:running)
@run_cond.broadcast
@@ -235,12 +350,8 @@ module GRPC
# Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
def available?(an_rpc)
- jobs_count, max = @pool.queue_length, @pool.max_queue
- GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
-
- # remaining capacity for ThreadPoolExecutors is -1 if unbounded
- return an_rpc if @pool.remaining_capacity != 0
- GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
+ return an_rpc if @pool.ready_for_work?
+ GRPC.logger.warn('no free worker threads currently')
noop = proc { |x| x }
# Create a new active call that knows that metadata hasn't been
@@ -275,7 +386,7 @@ module GRPC
break if (!an_rpc.nil?) && an_rpc.call.nil?
active_call = new_active_server_call(an_rpc)
unless active_call.nil?
- @pool.post(active_call) do |ac|
+ @pool.schedule(active_call) do |ac|
c, mth = ac
begin
rpc_descs[mth].run_server_method(c, rpc_handlers[mth])