aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2016-03-10 14:24:33 -0800
committerGravatar Nicolas Noble <nicolasnoble@users.noreply.github.com>2016-03-10 14:24:33 -0800
commitfbdf51b07e56d0794776e40bc3a4a054e1883f0e (patch)
tree9485c325b67a36963385e229bc9fbdbd3787d86a
parent52c05e9af44a21fa780745ec5d5aeccbe91fcd87 (diff)
parent1d68520c22fc708cdd6043962a0f59964a8aaf60 (diff)
Merge pull request #5678 from murgatroid99/ruby_synchronization_fixes
Ruby: fix some synchronization code in server implementation
-rw-r--r--src/ruby/.rubocop.yml3
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb97
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb25
3 files changed, 64 insertions, 61 deletions
diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml
index ff5cf8db83..d13ce42655 100644
--- a/src/ruby/.rubocop.yml
+++ b/src/ruby/.rubocop.yml
@@ -15,3 +15,6 @@ Metrics/CyclomaticComplexity:
Metrics/PerceivedComplexity:
Max: 8
+
+Metrics/ClassLength:
+ Max: 250
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ef2997c991..b30d19dd2b 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -107,7 +107,9 @@ module GRPC
# Starts running the jobs in the thread pool.
def start
- fail 'already stopped' if @stopped
+ @stop_mutex.synchronize do
+ fail 'already stopped' if @stopped
+ end
until @workers.size == @size.to_i
next_thread = Thread.new do
catch(:exit) do # allows { throw :exit } to kill a thread
@@ -264,10 +266,10 @@ module GRPC
@pool = Pool.new(@pool_size)
@run_cond = ConditionVariable.new
@run_mutex = Mutex.new
- @running = false
+ # running_state can take 4 values: :not_started, :running, :stopping, and
+ # :stopped. State transitions can only proceed in that order.
+ @running_state = :not_started
@server = RpcServer.setup_srv(server_override, @cq, **kw)
- @stopped = false
- @stop_mutex = Mutex.new
end
# stops a running server
@@ -275,27 +277,42 @@ module GRPC
# the call has no impact if the server is already stopped, otherwise
# server's current call loop is it's last.
def stop
- return unless @running
- @stop_mutex.synchronize do
- @stopped = true
+ @run_mutex.synchronize do
+ fail 'Cannot stop before starting' if @running_state == :not_started
+ return if @running_state != :running
+ transition_running_state(:stopping)
end
deadline = from_relative_time(@poll_period)
- return if @server.close(@cq, deadline)
- deadline = from_relative_time(@poll_period)
@server.close(@cq, deadline)
@pool.stop
end
- # determines if the server has been stopped
- def stopped?
- @stop_mutex.synchronize do
- return @stopped
+ def running_state
+ @run_mutex.synchronize do
+ return @running_state
+ end
+ end
+
+ # Can only be called while holding @run_mutex
+ def transition_running_state(target_state)
+ state_transitions = {
+ not_started: :running,
+ running: :stopping,
+ stopping: :stopped
+ }
+ if state_transitions[@running_state] == target_state
+ @running_state = target_state
+ else
+ fail "Bad server state transition: #{@running_state}->#{target_state}"
end
end
- # determines if the server is currently running
def running?
- @running
+ running_state == :running
+ end
+
+ def stopped?
+ running_state == :stopped
end
# Is called from other threads to wait for #run to start up the server.
@@ -304,13 +321,11 @@ module GRPC
#
# @param timeout [Numeric] number of seconds to wait
# @result [true, false] true if the server is running, false otherwise
- def wait_till_running(timeout = 0.1)
- end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
- while Time.now < end_time
- @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
- sleep(sleep_period)
+ def wait_till_running(timeout = nil)
+ @run_mutex.synchronize do
+ @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
+ return @running_state == :running
end
- running?
end
# Runs the server in its own thread, then waits for signal INT or TERM on
@@ -360,11 +375,14 @@ module GRPC
# @param service [Object|Class] a service class or object as described
# above
def handle(service)
- fail 'cannot add services if the server is running' if running?
- fail 'cannot add services if the server is stopped' if stopped?
- cls = service.is_a?(Class) ? service : service.class
- assert_valid_service_class(cls)
- add_rpc_descs_for(service)
+ @run_mutex.synchronize do
+ unless @running_state == :not_started
+ fail 'cannot add services if the server has been started'
+ end
+ cls = service.is_a?(Class) ? service : service.class
+ assert_valid_service_class(cls)
+ add_rpc_descs_for(service)
+ end
end
# runs the server
@@ -375,16 +393,13 @@ module GRPC
# - #running? returns true after this is called, until #stop cause the
# the server to stop.
def run
- if rpc_descs.size.zero?
- GRPC.logger.warn('did not run as no services were present')
- return
- end
@run_mutex.synchronize do
- @running = true
- @run_cond.signal
+ fail 'cannot run without registering services' if rpc_descs.size.zero?
+ @pool.start
+ @server.start
+ transition_running_state(:running)
+ @run_cond.broadcast
end
- @pool.start
- @server.start
loop_handle_server_calls
end
@@ -413,9 +428,9 @@ module GRPC
# handles calls to the server
def loop_handle_server_calls
- fail 'not running' unless @running
+ fail 'not started' if running_state == :not_started
loop_tag = Object.new
- until stopped?
+ while running_state == :running
begin
an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
break if (!an_rpc.nil?) && an_rpc.call.nil?
@@ -430,11 +445,14 @@ module GRPC
rescue Core::CallError, RuntimeError => e
# these might happen for various reasonse. The correct behaviour of
# the server is to log them and continue, if it's not shutting down.
- GRPC.logger.warn("server call failed: #{e}") unless stopped?
+ if running_state == :running
+ GRPC.logger.warn("server call failed: #{e}")
+ end
next
end
end
- @running = false
+ # @running_state should be :stopping here
+ @run_mutex.synchronize { transition_running_state(:stopped) }
GRPC.logger.info("stopped: #{self}")
end
@@ -484,9 +502,10 @@ module GRPC
cls.assert_rpc_descs_have_methods
end
+ # This should be called while holding @run_mutex
def add_rpc_descs_for(service)
cls = service.is_a?(Class) ? service : service.class
- specs, handlers = rpc_descs, rpc_handlers
+ specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym
fail "already registered: rpc #{route} from #{spec}" if specs.key? route
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index be6331d68b..dfaec6d6ed 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -220,19 +220,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**opts)
end
- after(:each) do
- @srv.stop
- end
-
it 'starts out false' do
expect(@srv.stopped?).to be(false)
end
- it 'stays false after a #stop is called before #run' do
- @srv.stop
- expect(@srv.stopped?).to be(false)
- end
-
it 'stays false after the server starts running', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@@ -247,8 +238,8 @@ describe GRPC::RpcServer do
t = Thread.new { @srv.run }
@srv.wait_till_running
@srv.stop
- expect(@srv.stopped?).to be(true)
t.join
+ expect(@srv.stopped?).to be(true)
end
end
@@ -266,9 +257,7 @@ describe GRPC::RpcServer do
server_override: @server
}
r = RpcServer.new(**opts)
- r.run
- expect(r.running?).to be(false)
- r.stop
+ expect { r.run }.to raise_error(RuntimeError)
end
it 'is true after run is called with a registered service' do
@@ -293,10 +282,6 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**@opts)
end
- after(:each) do
- @srv.stop
- end
-
it 'raises if #run has already been called' do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@@ -528,10 +513,6 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
- after(:each) do
- @srv.stop
- end
-
it 'should be added to BadStatus when requests fail', server: true do
service = FailingService.new
@srv.handle(service)