author     Vijay Pai <vpai@google.com>      2016-11-07 09:29:35 -0800
committer  GitHub <noreply@github.com>      2016-11-07 09:29:35 -0800
commit     aa50f2e6a7d1592518994c04a173b238ff01a12d
tree       7ed230446e52b3b3f5afa6beeb6cc00ddba39054
parent     24f861b1f9b78c1760872828cf538155dcf547fb
parent     a713b76d575e124cf2bc155ab54107930dad0fa8
Merge pull request #8632 from apolcyn/fix_ruby_deadlock
remove wait queue from ruby thread pool to avoid possibility of deadlock
-rw-r--r--  src/ruby/lib/grpc/generic/rpc_server.rb       | 52
-rw-r--r--  src/ruby/spec/generic/rpc_server_pool_spec.rb | 62
2 files changed, 69 insertions(+), 45 deletions(-)
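The change below replaces the pool's shared job queue with per-worker queues handed out through a ready list, so a scheduled job always goes straight to an idle worker and scheduling fails fast when every worker is busy (the previous design could stall when stop jobs were pushed onto an already saturated shared queue). The sketch that follows is a minimal, self-contained illustration of that idea under stated assumptions, not the actual GRPC::Pool code: the ToyPool class name, its :exit sentinel, and its method shapes are assumptions made for the example.

# Minimal sketch only (assumed names: ToyPool, :exit sentinel) of the
# per-worker-queue scheme: each idle worker parks its own queue in
# @ready_workers, schedule hands a job straight to one idle worker, and it
# raises instead of queueing when every worker is busy.
class ToyPool
  def initialize(size)
    @ready_workers = Queue.new        # queues belonging to idle workers
    @workers = Array.new(size) do
      q = Queue.new
      @ready_workers << q             # worker starts out idle
      Thread.new(q) do |jobs|
        loop do
          blk = jobs.pop              # block until this worker is handed a job
          break if blk == :exit
          blk.call
          @ready_workers << jobs      # job done; advertise as idle again
        end
      end
    end
  end

  def ready_for_work?
    !@ready_workers.empty?
  end

  def schedule(&blk)
    # Fail fast rather than queueing behind busy workers.
    raise 'No worker threads available' if @ready_workers.empty?
    @ready_workers.pop << blk
  end

  def stop
    # Each worker is told to exit exactly once: a worker that receives :exit
    # never re-registers its queue, so the same queue cannot be popped twice.
    @workers.size.times { @ready_workers.pop << :exit }
    @workers.each(&:join)
  end
end

The real implementation in the diff keeps the same shape but guards scheduling with the pool's stop mutex and unwinds workers with catch(:exit) / throw :exit rather than a sentinel value.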
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index efdbeb6f2d..ec75ca81d7 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -48,6 +48,10 @@ module GRPC
       @stop_cond = ConditionVariable.new
       @workers = []
       @keep_alive = keep_alive
+
+      # Each worker thread has its own queue to push and pull jobs
+      # these queues are put into @ready_queues when that worker is idle
+      @ready_workers = Queue.new
     end
 
     # Returns the number of jobs waiting
@@ -55,6 +59,13 @@ module GRPC
       @jobs.size
     end
 
+    def ready_for_work?
+      # Busy worker threads are either doing work, or have a single job
+      # waiting on them. Workers that are idle with no jobs waiting
+      # have their "queues" in @ready_workers
+      !@ready_workers.empty?
+    end
+
     # Runs the given block on the queue with the provided args.
     #
     # @param args the args passed blk when it is called
@@ -67,7 +78,11 @@ module GRPC
           return
         end
         GRPC.logger.info('schedule another job')
-        @jobs << [blk, args]
+        fail 'No worker threads available' if @ready_workers.empty?
+        worker_queue = @ready_workers.pop
+
+        fail 'worker already has a task waiting' unless worker_queue.empty?
+        worker_queue << [blk, args]
       end
     end
 
@@ -77,9 +92,11 @@ module GRPC
         fail 'already stopped' if @stopped
       end
       until @workers.size == @size.to_i
-        next_thread = Thread.new do
+        new_worker_queue = Queue.new
+        @ready_workers << new_worker_queue
+        next_thread = Thread.new(new_worker_queue) do |jobs|
           catch(:exit) do  # allows { throw :exit } to kill a thread
-            loop_execute_jobs
+            loop_execute_jobs(jobs)
           end
           remove_current_thread
         end
@@ -90,7 +107,7 @@ module GRPC
     # Stops the jobs in the pool
     def stop
       GRPC.logger.info('stopping, will wait for all the workers to exit')
-      @workers.size.times { schedule { throw :exit } }
+      schedule { throw :exit } while ready_for_work?
       @stop_mutex.synchronize do  # wait @keep_alive for works to stop
         @stopped = true
         @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
@@ -125,15 +142,18 @@ module GRPC
       end
     end
 
-    def loop_execute_jobs
+    def loop_execute_jobs(worker_queue)
       loop do
         begin
-          blk, args = @jobs.pop
+          blk, args = worker_queue.pop
           blk.call(*args)
         rescue StandardError => e
           GRPC.logger.warn('Error in worker thread')
           GRPC.logger.warn(e)
         end
+        # there shouldn't be any work given to this thread while its busy
+        fail('received a task while busy') unless worker_queue.empty?
+        @ready_workers << worker_queue
       end
     end
   end
@@ -147,10 +167,10 @@ module GRPC
 
     def_delegators :@server, :add_http2_port
 
-    # Default thread pool size is 3
-    DEFAULT_POOL_SIZE = 3
+    # Default thread pool size is 30
+    DEFAULT_POOL_SIZE = 30
 
-    # Default max_waiting_requests size is 20
+    # Deprecated due to internal changes to the thread pool
    DEFAULT_MAX_WAITING_REQUESTS = 20
 
     # Default poll period is 1s
@@ -175,11 +195,11 @@ module GRPC
     # instance.
     #
     # * pool_size: the size of the thread pool the server uses to run its
-    #   threads
+    #   threads. No more concurrent requests can be made than the size
+    #   of the thread pool
     #
-    # * max_waiting_requests: the maximum number of requests that are not
-    #   being handled to allow. When this limit is exceeded, the server responds
-    #   with not available to new requests
+    # * max_waiting_requests: Deprecated due to internal changes to the thread
+    #   pool. This is still an argument for compatibility but is ignored.
     #
     # * poll_period: when present, the server polls for new events with this
     #   period
@@ -330,10 +350,8 @@ module GRPC
 
     # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
     def available?(an_rpc)
-      jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
-      GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
-      return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
-      GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
+      return an_rpc if @pool.ready_for_work?
+      GRPC.logger.warn('no free worker threads currently')
       noop = proc { |x| x }
       c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
                          metadata_received: true)
diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb
index b67008de48..48ccaee510 100644
--- a/src/ruby/spec/generic/rpc_server_pool_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb
@@ -29,6 +29,8 @@
 
 require 'grpc'
 
+Thread.abort_on_exception = true
+
 describe GRPC::Pool do
   Pool = GRPC::Pool
 
@@ -44,32 +46,34 @@ describe GRPC::Pool do
     end
   end
 
-  describe '#jobs_waiting' do
-    it 'at start, it is zero' do
+  describe '#ready_for_work?' do
+    it 'before start it is not ready' do
       p = Pool.new(1)
-      expect(p.jobs_waiting).to be(0)
+      expect(p.ready_for_work?).to be(false)
     end
 
-    it 'it increases, with each scheduled job if the pool is not running' do
-      p = Pool.new(1)
-      job = proc {}
-      expect(p.jobs_waiting).to be(0)
-      5.times do |i|
+    it 'it stops being ready after all workers jobs waiting or running' do
+      p = Pool.new(5)
+      p.start
+      job = proc { sleep(3) } # sleep so workers busy when done scheduling
+      5.times do
+        expect(p.ready_for_work?).to be(true)
         p.schedule(&job)
-        expect(p.jobs_waiting).to be(i + 1)
       end
+      expect(p.ready_for_work?).to be(false)
     end
 
-    it 'it decreases as jobs are run' do
-      p = Pool.new(1)
+    it 'it becomes ready again after jobs complete' do
+      p = Pool.new(5)
+      p.start
       job = proc {}
-      expect(p.jobs_waiting).to be(0)
-      3.times do
+      5.times do
+        expect(p.ready_for_work?).to be(true)
         p.schedule(&job)
       end
-      p.start
-      sleep 2
-      expect(p.jobs_waiting).to be(0)
+      expect(p.ready_for_work?).to be(false)
+      sleep 5 # give the pool time do get at least one task done
+      expect(p.ready_for_work?).to be(true)
     end
   end
 
@@ -90,6 +94,18 @@ describe GRPC::Pool do
       expect(q.pop).to be(o)
       p.stop
     end
+
+    it 'it throws an error if all of the workers have tasks to do' do
+      p = Pool.new(5)
+      p.start
+      job = proc {}
+      5.times do
+        expect(p.ready_for_work?).to be(true)
+        p.schedule(&job)
+      end
+      expect { p.schedule(&job) }.to raise_error
+      expect { p.schedule(&job) }.to raise_error
+    end
   end
 
   describe '#stop' do
@@ -113,18 +129,8 @@ describe GRPC::Pool do
   end
 
   describe '#start' do
-    it 'runs pre-scheduled jobs' do
-      p = Pool.new(2)
-      o, q = Object.new, Queue.new
-      n = 5 # arbitrary
-      n.times { p.schedule(o, &q.method(:push)) }
-      p.start
-      n.times { expect(q.pop).to be(o) }
-      p.stop
-    end
-
-    it 'runs jobs as they are scheduled ' do
-      p = Pool.new(2)
+    it 'runs jobs as they are scheduled' do
+      p = Pool.new(5)
       o, q = Object.new, Queue.new
       p.start
       n = 5 # arbitrary
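For users of the Ruby server, the practical effect is that pool_size alone now bounds how many RPCs are handled concurrently (default raised to 30), while max_waiting_requests is still accepted but ignored. A rough usage sketch under those assumptions follows; the listen address is arbitrary and the commented-out EchoService handler class is a hypothetical placeholder.

require 'grpc'

# pool_size caps concurrent request handling directly after this change;
# max_waiting_requests is kept only for argument compatibility and ignored.
server = GRPC::RpcServer.new(pool_size: 30, poll_period: 1)
server.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
# server.handle(EchoService)   # register a service implementation (placeholder)
# server.run_till_terminated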