diff options
author | Tim Emiola <temiola@google.com> | 2015-05-22 17:25:49 -0700 |
---|---|---|
committer | Tim Emiola <temiola@google.com> | 2015-05-22 17:25:49 -0700 |
commit | 4aba356630d86a06b558945d6a2897c64f627652 (patch) | |
tree | 4556f522793a68b43072311e49a2ae2b811243ea | |
parent | b79f33c819c33459182130cebce0b57c64b5d0e7 (diff) |
Various tweaks to improve server stability
-rw-r--r-- | src/ruby/.rubocop_todo.yml | 6 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 60 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_pool_spec.rb | 4 | ||||
-rw-r--r-- | src/ruby/spec/generic/rpc_server_spec.rb | 42 |
4 files changed, 73 insertions, 39 deletions
diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index ed4a4438b3..c35e970df6 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,5 +1,5 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0. +# on 2015-05-22 13:23:34 -0700 using RuboCop version 0.30.1. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new @@ -7,12 +7,12 @@ # Offense count: 30 Metrics/AbcSize: - Max: 40 + Max: 38 # Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: - Max: 184 + Max: 192 # Offense count: 35 # Configuration parameters: CountComments. diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 665c144432..dcb11bfbef 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -76,7 +76,7 @@ module GRPC @jobs = Queue.new @size = size @stopped = false - @stop_mutex = Mutex.new + @stop_mutex = Mutex.new # needs to be held when accessing @stopped @stop_cond = ConditionVariable.new @workers = [] @keep_alive = keep_alive @@ -92,10 +92,15 @@ module GRPC # @param args the args passed blk when it is called # @param blk the block to call def schedule(*args, &blk) - fail 'already stopped' if @stopped return if blk.nil? - GRPC.logger.info('schedule another job') - @jobs << [blk, args] + @stop_mutex.synchronize do + if @stopped + GRPC.logger.warn('did not schedule job, already stopped') + return + end + GRPC.logger.info('schedule another job') + @jobs << [blk, args] + end end # Starts running the jobs in the thread pool. @@ -116,8 +121,8 @@ module GRPC def stop GRPC.logger.info('stopping, will wait for all the workers to exit') @workers.size.times { schedule { throw :exit } } - @stopped = true @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stopped = true @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end forcibly_stop_workers @@ -249,15 +254,18 @@ module GRPC server_override:nil, connect_md_proc:nil, **kw) - @cq = RpcServer.setup_cq(completion_queue_override) - @server = RpcServer.setup_srv(server_override, @cq, **kw) @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) - @pool_size = pool_size + @cq = RpcServer.setup_cq(completion_queue_override) @max_waiting_requests = max_waiting_requests @poll_period = poll_period - @run_mutex = Mutex.new - @run_cond = ConditionVariable.new + @pool_size = pool_size @pool = Pool.new(@pool_size) + @run_cond = ConditionVariable.new + @run_mutex = Mutex.new + @running = false + @server = RpcServer.setup_srv(server_override, @cq, **kw) + @stopped = false + @stop_mutex = Mutex.new end # stops a running server @@ -266,20 +274,23 @@ module GRPC # server's current call loop is it's last. def stop return unless @running - @stopped = true + @stop_mutex.synchronize do + @stopped = true + end @pool.stop + @server.close + end - # TODO: uncomment this: - # - # This segfaults in the c layer, so its commented out for now. Shutdown - # still occurs, but the c layer has to do the cleanup. - # - # @server.close + # determines if the server has been stopped + def stopped? + @stop_mutex.synchronize do + return @stopped + end end # determines if the server is currently running def running? - @running ||= false + @running end # Is called from other threads to wait for #run to start up the server. @@ -311,11 +322,6 @@ module GRPC t.join end - # Determines if the server is currently stopped - def stopped? - @stopped ||= false - end - # handle registration of classes # # service is either a class that includes GRPC::GenericService and whose @@ -407,7 +413,13 @@ module GRPC request_call_tag = Object.new until stopped? deadline = from_relative_time(@poll_period) - an_rpc = @server.request_call(@cq, request_call_tag, deadline) + begin + an_rpc = @server.request_call(@cq, request_call_tag, deadline) + rescue Core::CallError, RuntimeError => e + # can happen during server shutdown + GRPC.logger.warn("server call failed: #{e}") + next + end c = new_active_server_call(an_rpc) unless c.nil? mth = an_rpc.method.to_sym diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb index aae3a7d7cb..b67008de48 100644 --- a/src/ruby/spec/generic/rpc_server_pool_spec.rb +++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb @@ -74,11 +74,11 @@ describe GRPC::Pool do end describe '#schedule' do - it 'throws if the pool is already stopped' do + it 'return if the pool is already stopped' do p = Pool.new(1) p.stop job = proc {} - expect { p.schedule(&job) }.to raise_error + expect { p.schedule(&job) }.to_not raise_error end it 'adds jobs that get run by the pool' do diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 640b0f656c..e60a8b27c3 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -212,10 +212,14 @@ describe GRPC::RpcServer do describe '#stopped?' do before(:each) do - opts = { a_channel_arg: 'an_arg', poll_period: 1 } + opts = { a_channel_arg: 'an_arg', poll_period: 1.5 } @srv = RpcServer.new(**opts) end + after(:each) do + @srv.stop + end + it 'starts out false' do expect(@srv.stopped?).to be(false) end @@ -225,7 +229,7 @@ describe GRPC::RpcServer do expect(@srv.stopped?).to be(false) end - it 'stays false after the server starts running' do + it 'stays false after the server starts running', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @srv.wait_till_running @@ -234,7 +238,7 @@ describe GRPC::RpcServer do t.join end - it 'is true after a running server is stopped' do + it 'is true after a running server is stopped', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @srv.wait_till_running @@ -251,21 +255,22 @@ describe GRPC::RpcServer do expect(r.running?).to be(false) end - it 'is false after run is called with no services registered' do + it 'is false if run is called with no services registered', server: true do opts = { a_channel_arg: 'an_arg', - poll_period: 1, + poll_period: 2, server_override: @server } r = RpcServer.new(**opts) r.run expect(r.running?).to be(false) + r.stop end it 'is true after run is called with a registered service' do opts = { a_channel_arg: 'an_arg', - poll_period: 1, + poll_period: 2.5, server_override: @server } r = RpcServer.new(**opts) @@ -284,6 +289,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**@opts) end + after(:each) do + @srv.stop + end + it 'raises if #run has already been called' do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -335,6 +344,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end + after(:each) do + @srv.stop + end + it 'should return NOT_FOUND status on unknown methods', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -376,7 +389,7 @@ describe GRPC::RpcServer do t.join end - it 'should receive metadata when a deadline is specified', server: true do + it 'should receive metadata if a deadline is specified', server: true do service = SlowService.new @srv.handle(service) t = Thread.new { @srv.run } @@ -445,11 +458,11 @@ describe GRPC::RpcServer do it 'should handle multiple parallel requests', server: true do @srv.handle(EchoService) - Thread.new { @srv.run } + t = Thread.new { @srv.run } @srv.wait_till_running req, q = EchoMsg.new, Queue.new n = 5 # arbitrary - threads = [] + threads = [t] n.times do threads << Thread.new do stub = EchoStub.new(@host, **client_opts) @@ -472,7 +485,7 @@ describe GRPC::RpcServer do } alt_srv = RpcServer.new(**opts) alt_srv.handle(SlowService) - Thread.new { alt_srv.run } + t = Thread.new { alt_srv.run } alt_srv.wait_till_running req = EchoMsg.new n = 5 # arbitrary, use as many to ensure the server pool is exceeded @@ -490,6 +503,7 @@ describe GRPC::RpcServer do end threads.each(&:join) alt_srv.stop + t.join expect(one_failed_as_unavailable).to be(true) end end @@ -513,6 +527,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end + after(:each) do + @srv.stop + end + it 'should send connect metadata to the client', server: true do service = EchoService.new @srv.handle(service) @@ -545,6 +563,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end + after(:each) do + @srv.stop + end + it 'should be added to BadStatus when requests fail', server: true do service = FailingService.new @srv.handle(service) |