aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Michael Lumish <mlumish@google.com>2015-05-26 09:39:53 -0700
committerGravatar Michael Lumish <mlumish@google.com>2015-05-26 09:39:53 -0700
commit592dfaed87ef953b22b1e48b64ae51ad915b5af9 (patch)
tree0242f7368baf04ae9b7c4f8defae71ff0c50c82e /src
parent8639f85c026afa6a6c9df0ee546954662312c00a (diff)
parent4aba356630d86a06b558945d6a2897c64f627652 (diff)
Merge pull request #1735 from tbetbetbe/grpc-rb-rpc-server-improvements
Various tweaks to improve server stability
Diffstat (limited to 'src')
-rw-r--r--src/ruby/.rubocop_todo.yml6
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb60
-rw-r--r--src/ruby/spec/generic/rpc_server_pool_spec.rb4
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb42
4 files changed, 73 insertions, 39 deletions
diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml
index ed4a4438b3..c35e970df6 100644
--- a/src/ruby/.rubocop_todo.yml
+++ b/src/ruby/.rubocop_todo.yml
@@ -1,5 +1,5 @@
# This configuration was generated by `rubocop --auto-gen-config`
-# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0.
+# on 2015-05-22 13:23:34 -0700 using RuboCop version 0.30.1.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
@@ -7,12 +7,12 @@
# Offense count: 30
Metrics/AbcSize:
- Max: 40
+ Max: 38
# Offense count: 3
# Configuration parameters: CountComments.
Metrics/ClassLength:
- Max: 184
+ Max: 192
# Offense count: 35
# Configuration parameters: CountComments.
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 665c144432..dcb11bfbef 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -76,7 +76,7 @@ module GRPC
@jobs = Queue.new
@size = size
@stopped = false
- @stop_mutex = Mutex.new
+ @stop_mutex = Mutex.new # needs to be held when accessing @stopped
@stop_cond = ConditionVariable.new
@workers = []
@keep_alive = keep_alive
@@ -92,10 +92,15 @@ module GRPC
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
- fail 'already stopped' if @stopped
return if blk.nil?
- GRPC.logger.info('schedule another job')
- @jobs << [blk, args]
+ @stop_mutex.synchronize do
+ if @stopped
+ GRPC.logger.warn('did not schedule job, already stopped')
+ return
+ end
+ GRPC.logger.info('schedule another job')
+ @jobs << [blk, args]
+ end
end
# Starts running the jobs in the thread pool.
@@ -116,8 +121,8 @@ module GRPC
def stop
GRPC.logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
- @stopped = true
@stop_mutex.synchronize do # wait @keep_alive for works to stop
+ @stopped = true
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
@@ -249,15 +254,18 @@ module GRPC
server_override:nil,
connect_md_proc:nil,
**kw)
- @cq = RpcServer.setup_cq(completion_queue_override)
- @server = RpcServer.setup_srv(server_override, @cq, **kw)
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
- @pool_size = pool_size
+ @cq = RpcServer.setup_cq(completion_queue_override)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
- @run_mutex = Mutex.new
- @run_cond = ConditionVariable.new
+ @pool_size = pool_size
@pool = Pool.new(@pool_size)
+ @run_cond = ConditionVariable.new
+ @run_mutex = Mutex.new
+ @running = false
+ @server = RpcServer.setup_srv(server_override, @cq, **kw)
+ @stopped = false
+ @stop_mutex = Mutex.new
end
# stops a running server
@@ -266,20 +274,23 @@ module GRPC
# server's current call loop is it's last.
def stop
return unless @running
- @stopped = true
+ @stop_mutex.synchronize do
+ @stopped = true
+ end
@pool.stop
+ @server.close
+ end
- # TODO: uncomment this:
- #
- # This segfaults in the c layer, so its commented out for now. Shutdown
- # still occurs, but the c layer has to do the cleanup.
- #
- # @server.close
+ # determines if the server has been stopped
+ def stopped?
+ @stop_mutex.synchronize do
+ return @stopped
+ end
end
# determines if the server is currently running
def running?
- @running ||= false
+ @running
end
# Is called from other threads to wait for #run to start up the server.
@@ -311,11 +322,6 @@ module GRPC
t.join
end
- # Determines if the server is currently stopped
- def stopped?
- @stopped ||= false
- end
-
# handle registration of classes
#
# service is either a class that includes GRPC::GenericService and whose
@@ -407,7 +413,13 @@ module GRPC
request_call_tag = Object.new
until stopped?
deadline = from_relative_time(@poll_period)
- an_rpc = @server.request_call(@cq, request_call_tag, deadline)
+ begin
+ an_rpc = @server.request_call(@cq, request_call_tag, deadline)
+ rescue Core::CallError, RuntimeError => e
+ # can happen during server shutdown
+ GRPC.logger.warn("server call failed: #{e}")
+ next
+ end
c = new_active_server_call(an_rpc)
unless c.nil?
mth = an_rpc.method.to_sym
diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb
index aae3a7d7cb..b67008de48 100644
--- a/src/ruby/spec/generic/rpc_server_pool_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb
@@ -74,11 +74,11 @@ describe GRPC::Pool do
end
describe '#schedule' do
- it 'throws if the pool is already stopped' do
+ it 'return if the pool is already stopped' do
p = Pool.new(1)
p.stop
job = proc {}
- expect { p.schedule(&job) }.to raise_error
+ expect { p.schedule(&job) }.to_not raise_error
end
it 'adds jobs that get run by the pool' do
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 640b0f656c..e60a8b27c3 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -212,10 +212,14 @@ describe GRPC::RpcServer do
describe '#stopped?' do
before(:each) do
- opts = { a_channel_arg: 'an_arg', poll_period: 1 }
+ opts = { a_channel_arg: 'an_arg', poll_period: 1.5 }
@srv = RpcServer.new(**opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'starts out false' do
expect(@srv.stopped?).to be(false)
end
@@ -225,7 +229,7 @@ describe GRPC::RpcServer do
expect(@srv.stopped?).to be(false)
end
- it 'stays false after the server starts running' do
+ it 'stays false after the server starts running', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@srv.wait_till_running
@@ -234,7 +238,7 @@ describe GRPC::RpcServer do
t.join
end
- it 'is true after a running server is stopped' do
+ it 'is true after a running server is stopped', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@srv.wait_till_running
@@ -251,21 +255,22 @@ describe GRPC::RpcServer do
expect(r.running?).to be(false)
end
- it 'is false after run is called with no services registered' do
+ it 'is false if run is called with no services registered', server: true do
opts = {
a_channel_arg: 'an_arg',
- poll_period: 1,
+ poll_period: 2,
server_override: @server
}
r = RpcServer.new(**opts)
r.run
expect(r.running?).to be(false)
+ r.stop
end
it 'is true after run is called with a registered service' do
opts = {
a_channel_arg: 'an_arg',
- poll_period: 1,
+ poll_period: 2.5,
server_override: @server
}
r = RpcServer.new(**opts)
@@ -284,6 +289,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**@opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'raises if #run has already been called' do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@@ -335,6 +344,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'should return NOT_FOUND status on unknown methods', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@@ -376,7 +389,7 @@ describe GRPC::RpcServer do
t.join
end
- it 'should receive metadata when a deadline is specified', server: true do
+ it 'should receive metadata if a deadline is specified', server: true do
service = SlowService.new
@srv.handle(service)
t = Thread.new { @srv.run }
@@ -445,11 +458,11 @@ describe GRPC::RpcServer do
it 'should handle multiple parallel requests', server: true do
@srv.handle(EchoService)
- Thread.new { @srv.run }
+ t = Thread.new { @srv.run }
@srv.wait_till_running
req, q = EchoMsg.new, Queue.new
n = 5 # arbitrary
- threads = []
+ threads = [t]
n.times do
threads << Thread.new do
stub = EchoStub.new(@host, **client_opts)
@@ -472,7 +485,7 @@ describe GRPC::RpcServer do
}
alt_srv = RpcServer.new(**opts)
alt_srv.handle(SlowService)
- Thread.new { alt_srv.run }
+ t = Thread.new { alt_srv.run }
alt_srv.wait_till_running
req = EchoMsg.new
n = 5 # arbitrary, use as many to ensure the server pool is exceeded
@@ -490,6 +503,7 @@ describe GRPC::RpcServer do
end
threads.each(&:join)
alt_srv.stop
+ t.join
expect(one_failed_as_unavailable).to be(true)
end
end
@@ -513,6 +527,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'should send connect metadata to the client', server: true do
service = EchoService.new
@srv.handle(service)
@@ -545,6 +563,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'should be added to BadStatus when requests fail', server: true do
service = FailingService.new
@srv.handle(service)