author    Vijay Pai <vpai@google.com>    2016-11-07 09:29:35 -0800
committer GitHub <noreply@github.com>    2016-11-07 09:29:35 -0800
commit    aa50f2e6a7d1592518994c04a173b238ff01a12d (patch)
tree      7ed230446e52b3b3f5afa6beeb6cc00ddba39054
parent    24f861b1f9b78c1760872828cf538155dcf547fb (diff)
parent    a713b76d575e124cf2bc155ab54107930dad0fa8 (diff)
Merge pull request #8632 from apolcyn/fix_ruby_deadlock
remove wait queue from ruby thread pool to avoid possibility of deadlock
-rw-r--r--  src/ruby/lib/grpc/generic/rpc_server.rb        | 52
-rw-r--r--  src/ruby/spec/generic/rpc_server_pool_spec.rb  | 62
2 files changed, 69 insertions(+), 45 deletions(-)
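
To make the diff easier to follow, here is a minimal, self-contained sketch of the scheme this merge introduces (a simplified illustration, not the actual GRPC::Pool code; MiniPool and the :exit sentinel are invented for this example). Each worker owns its own Queue, an idle worker parks that queue in @ready_workers, and schedule hands a job directly to an idle worker instead of buffering it on a shared wait queue:

    # Simplified per-worker-queue pool, loosely modeled on the change below.
    class MiniPool
      def initialize(size)
        @ready_workers = Queue.new # queues belonging to idle workers
        @workers = size.times.map do
          q = Queue.new
          @ready_workers << q # each worker starts out idle
          Thread.new(q) do |jobs|
            loop do
              blk, args = jobs.pop        # block until handed a job
              break if blk == :exit       # sentinel used only in this sketch
              blk.call(*args)
              @ready_workers << jobs      # mark this worker idle again
            end
          end
        end
      end

      def ready_for_work?
        !@ready_workers.empty?
      end

      def schedule(*args, &blk)
        raise 'No worker threads available' unless ready_for_work?
        @ready_workers.pop << [blk, args] # hand the job to an idle worker
      end

      def stop
        @workers.size.times { @ready_workers.pop << [:exit, []] }
        @workers.each(&:join)
      end
    end

Because a job is only accepted when some worker's queue is parked in @ready_workers, there is no shared backlog for a stopping pool to deadlock on, which is the failure mode the pull request title refers to.
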
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index efdbeb6f2d..ec75ca81d7 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -48,6 +48,10 @@ module GRPC
@stop_cond = ConditionVariable.new
@workers = []
@keep_alive = keep_alive
+
+ # Each worker thread has its own queue to push and pull jobs;
+ # these queues are put into @ready_workers when that worker is idle
+ @ready_workers = Queue.new
end
# Returns the number of jobs waiting
@@ -55,6 +59,13 @@ module GRPC
@jobs.size
end
+ def ready_for_work?
+ # Busy worker threads are either doing work, or have a single job
+ # waiting on them. Workers that are idle with no jobs waiting
+ # have their "queues" in @ready_workers
+ !@ready_workers.empty?
+ end
+
# Runs the given block on the queue with the provided args.
#
# @param args the args passed to blk when it is called
@@ -67,7 +78,11 @@ module GRPC
return
end
GRPC.logger.info('schedule another job')
- @jobs << [blk, args]
+ fail 'No worker threads available' if @ready_workers.empty?
+ worker_queue = @ready_workers.pop
+
+ fail 'worker already has a task waiting' unless worker_queue.empty?
+ worker_queue << [blk, args]
end
end
@@ -77,9 +92,11 @@ module GRPC
fail 'already stopped' if @stopped
end
until @workers.size == @size.to_i
- next_thread = Thread.new do
+ new_worker_queue = Queue.new
+ @ready_workers << new_worker_queue
+ next_thread = Thread.new(new_worker_queue) do |jobs|
catch(:exit) do # allows { throw :exit } to kill a thread
- loop_execute_jobs
+ loop_execute_jobs(jobs)
end
remove_current_thread
end
@@ -90,7 +107,7 @@ module GRPC
# Stops the jobs in the pool
def stop
GRPC.logger.info('stopping, will wait for all the workers to exit')
- @workers.size.times { schedule { throw :exit } }
+ schedule { throw :exit } while ready_for_work?
@stop_mutex.synchronize do # wait @keep_alive for workers to stop
@stopped = true
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
@@ -125,15 +142,18 @@ module GRPC
end
end
- def loop_execute_jobs
+ def loop_execute_jobs(worker_queue)
loop do
begin
- blk, args = @jobs.pop
+ blk, args = worker_queue.pop
blk.call(*args)
rescue StandardError => e
GRPC.logger.warn('Error in worker thread')
GRPC.logger.warn(e)
end
+ # there shouldn't be any work given to this thread while it's busy
+ fail('received a task while busy') unless worker_queue.empty?
+ @ready_workers << worker_queue
end
end
end
@@ -147,10 +167,10 @@ module GRPC
def_delegators :@server, :add_http2_port
- # Default thread pool size is 3
- DEFAULT_POOL_SIZE = 3
+ # Default thread pool size is 30
+ DEFAULT_POOL_SIZE = 30
- # Default max_waiting_requests size is 20
+ # Deprecated due to internal changes to the thread pool
DEFAULT_MAX_WAITING_REQUESTS = 20
# Default poll period is 1s
@@ -175,11 +195,11 @@ module GRPC
# instance.
#
# * pool_size: the size of the thread pool the server uses to run its
- # threads
+ # threads. The server cannot handle more concurrent requests than
+ # the size of the thread pool
#
- # * max_waiting_requests: the maximum number of requests that are not
- # being handled to allow. When this limit is exceeded, the server responds
- # with not available to new requests
+ # * max_waiting_requests: Deprecated due to internal changes to the thread
+ # pool. This is still an argument for compatibility but is ignored.
#
# * poll_period: when present, the server polls for new events with this
# period
@@ -330,10 +350,8 @@ module GRPC
# Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
def available?(an_rpc)
- jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
- GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
- return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
- GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
+ return an_rpc if @pool.ready_for_work?
+ GRPC.logger.warn('no free worker threads currently')
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
metadata_received: true)
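
In practical terms, pool_size is now the only knob that bounds concurrent request handling; max_waiting_requests is still accepted for compatibility but has no effect. A hypothetical configuration after this change might look like:

    # Hypothetical usage sketch: pool_size alone limits concurrency now.
    server = GRPC::RpcServer.new(pool_size: 30,            # real limit on in-flight RPCs
                                 max_waiting_requests: 20) # deprecated, ignored
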
diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb
index b67008de48..48ccaee510 100644
--- a/src/ruby/spec/generic/rpc_server_pool_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb
@@ -29,6 +29,8 @@
require 'grpc'
+Thread.abort_on_exception = true
+
describe GRPC::Pool do
Pool = GRPC::Pool
@@ -44,32 +46,34 @@ describe GRPC::Pool do
end
end
- describe '#jobs_waiting' do
- it 'at start, it is zero' do
+ describe '#ready_for_work?' do
+ it 'before start it is not ready' do
p = Pool.new(1)
- expect(p.jobs_waiting).to be(0)
+ expect(p.ready_for_work?).to be(false)
end
- it 'it increases, with each scheduled job if the pool is not running' do
- p = Pool.new(1)
- job = proc {}
- expect(p.jobs_waiting).to be(0)
- 5.times do |i|
+ it 'it stops being ready once all workers have jobs waiting or running' do
+ p = Pool.new(5)
+ p.start
+ job = proc { sleep(3) } # sleep so workers are still busy when scheduling finishes
+ 5.times do
+ expect(p.ready_for_work?).to be(true)
p.schedule(&job)
- expect(p.jobs_waiting).to be(i + 1)
end
+ expect(p.ready_for_work?).to be(false)
end
- it 'it decreases as jobs are run' do
- p = Pool.new(1)
+ it 'it becomes ready again after jobs complete' do
+ p = Pool.new(5)
+ p.start
job = proc {}
- expect(p.jobs_waiting).to be(0)
- 3.times do
+ 5.times do
+ expect(p.ready_for_work?).to be(true)
p.schedule(&job)
end
- p.start
- sleep 2
- expect(p.jobs_waiting).to be(0)
+ expect(p.ready_for_work?).to be(false)
+ sleep 5 # give the pool time to get at least one task done
+ expect(p.ready_for_work?).to be(true)
end
end
@@ -90,6 +94,18 @@ describe GRPC::Pool do
expect(q.pop).to be(o)
p.stop
end
+
+ it 'it throws an error if all of the workers have tasks to do' do
+ p = Pool.new(5)
+ p.start
+ job = proc {}
+ 5.times do
+ expect(p.ready_for_work?).to be(true)
+ p.schedule(&job)
+ end
+ expect { p.schedule(&job) }.to raise_error
+ expect { p.schedule(&job) }.to raise_error
+ end
end
describe '#stop' do
@@ -113,18 +129,8 @@ describe GRPC::Pool do
end
describe '#start' do
- it 'runs pre-scheduled jobs' do
- p = Pool.new(2)
- o, q = Object.new, Queue.new
- n = 5 # arbitrary
- n.times { p.schedule(o, &q.method(:push)) }
- p.start
- n.times { expect(q.pop).to be(o) }
- p.stop
- end
-
- it 'runs jobs as they are scheduled ' do
- p = Pool.new(2)
+ it 'runs jobs as they are scheduled' do
+ p = Pool.new(5)
o, q = Object.new, Queue.new
p.start
n = 5 # arbitrary