From 81e9581bf380ceea2f5edaf6661c040c7fa81859 Mon Sep 17 00:00:00 2001 From: Alex Polcyn Date: Sat, 23 Sep 2017 18:48:02 -0700 Subject: Remove some sleeps in ruby tests and fix test server shutdown --- src/ruby/end2end/channel_closing_driver.rb | 6 +-- src/ruby/end2end/channel_state_driver.rb | 7 +-- src/ruby/end2end/end2end_common.rb | 6 +-- src/ruby/end2end/forking_client_driver.rb | 6 --- src/ruby/end2end/grpc_class_init_client.rb | 2 +- src/ruby/end2end/killed_client_thread_driver.rb | 56 +++++++++------------- .../multiple_killed_watching_threads_driver.rb | 2 + src/ruby/end2end/sig_handling_client.rb | 30 ++++++++---- src/ruby/end2end/sig_handling_driver.rb | 35 ++++++++++++-- .../end2end/sig_int_during_channel_watch_driver.rb | 4 -- src/ruby/lib/grpc/generic/rpc_server.rb | 37 +++++++++----- src/ruby/qps/worker.rb | 12 +++-- 12 files changed, 117 insertions(+), 86 deletions(-) (limited to 'src/ruby') diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb index 0ceb3667eb..57544b0398 100755 --- a/src/ruby/end2end/channel_closing_driver.rb +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -23,13 +23,11 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - sleep 1 - STDERR.puts 'start client' control_stub, client_pid = start_client('channel_closing_client.rb', server_port) - + # sleep to allow time for the client to get into + # the middle of a "watch connectivity state" call sleep 3 begin diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb index 98339baebe..f4b1cd2bb8 100755 --- a/src/ruby/end2end/channel_state_driver.rb +++ b/src/ruby/end2end/channel_state_driver.rb @@ -22,14 +22,11 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - sleep 1 - STDERR.puts 'start client' _, client_pid = start_client('channel_state_client.rb', server_port) - + # sleep to allow time for the client to get into + # the middle of a "watch connectivity state" call sleep 3 - Process.kill('SIGTERM', client_pid) begin diff --git a/src/ruby/end2end/end2end_common.rb b/src/ruby/end2end/end2end_common.rb index a1b824fcbf..790fc23e92 100755 --- a/src/ruby/end2end/end2end_common.rb +++ b/src/ruby/end2end/end2end_common.rb @@ -40,12 +40,13 @@ end # ServerRunner starts an "echo server" that test clients can make calls to class ServerRunner - def initialize(service_impl) + def initialize(service_impl, rpc_server_args: {}) @service_impl = service_impl + @rpc_server_args = rpc_server_args end def run - @srv = GRPC::RpcServer.new + @srv = GRPC::RpcServer.new(@rpc_server_args) port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) @srv.handle(@service_impl) @@ -75,7 +76,6 @@ def start_client(client_main, server_port) client_path, "--client_control_port=#{client_control_port}", "--server_port=#{server_port}") - sleep 1 control_stub = ClientControl::ClientController::Stub.new( "localhost:#{client_control_port}", :this_channel_is_insecure) [control_stub, client_pid] diff --git a/src/ruby/end2end/forking_client_driver.rb b/src/ruby/end2end/forking_client_driver.rb index 63565395f7..5cf1d73112 100755 --- a/src/ruby/end2end/forking_client_driver.rb +++ b/src/ruby/end2end/forking_client_driver.rb @@ -20,12 +20,6 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - # TODO(apolcyn) Can we get rid of this sleep? - # Without it, an immediate call to the just started EchoServer - # fails with UNAVAILABLE - sleep 1 - STDERR.puts 'start client' _, client_pid = start_client('forking_client_client.rb', server_port) diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb index c35719a71f..ff40350cfa 100755 --- a/src/ruby/end2end/grpc_class_init_client.rb +++ b/src/ruby/end2end/grpc_class_init_client.rb @@ -54,7 +54,7 @@ def run_concurrency_stress_test(test_proc) test_proc.call - fail 'exception thrown while child thread initing class' + fail '(expected) exception thrown while child thread initing class' end # default (no gc_stress and no concurrency_stress) diff --git a/src/ruby/end2end/killed_client_thread_driver.rb b/src/ruby/end2end/killed_client_thread_driver.rb index fce5d13e82..370f7e686b 100755 --- a/src/ruby/end2end/killed_client_thread_driver.rb +++ b/src/ruby/end2end/killed_client_thread_driver.rb @@ -17,56 +17,46 @@ require_relative './end2end_common' # Service that sleeps for a long time upon receiving an 'echo request' -# Also, this notifies @call_started_cv once it has received a request. +# Also, this calls it's callback upon receiving an RPC as a method +# of synchronization/waiting for the child to start. class SleepingEchoServerImpl < Echo::EchoServer::Service - def initialize(call_started, call_started_mu, call_started_cv) - @call_started = call_started - @call_started_mu = call_started_mu - @call_started_cv = call_started_cv + def initialize(received_rpc_callback) + @received_rpc_callback = received_rpc_callback end def echo(echo_req, _) - @call_started_mu.synchronize do - @call_started.set_true - @call_started_cv.signal - end - sleep 1000 + @received_rpc_callback.call + # sleep forever to get the client stuck waiting + sleep Echo::EchoReply.new(response: echo_req.request) end end -# Mutable boolean -class BoolHolder - attr_reader :val - - def init - @val = false - end - - def set_true - @val = true - end -end - def main STDERR.puts 'start server' - call_started = BoolHolder.new - call_started_mu = Mutex.new - call_started_cv = ConditionVariable.new + client_started = false + client_started_mu = Mutex.new + client_started_cv = ConditionVariable.new + received_rpc_callback = proc do + client_started_mu.synchronize do + client_started = true + client_started_cv.signal + end + end - service_impl = SleepingEchoServerImpl.new(call_started, - call_started_mu, - call_started_cv) - server_runner = ServerRunner.new(service_impl) + service_impl = SleepingEchoServerImpl.new(received_rpc_callback) + # RPCs against the server will all be hanging, so kill thread + # pool workers immediately rather than after waiting for a second. + rpc_server_args = { poll_period: 0, pool_keep_alive: 0 } + server_runner = ServerRunner.new(service_impl, rpc_server_args: rpc_server_args) server_port = server_runner.run - STDERR.puts 'start client' _, client_pid = start_client('killed_client_thread_client.rb', server_port) - call_started_mu.synchronize do - call_started_cv.wait(call_started_mu) until call_started.val + client_started_mu.synchronize do + client_started_cv.wait(client_started_mu) until client_started end # SIGTERM the child process now that it's diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb index 94d5e9da2d..59f6f275e4 100755 --- a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb +++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb @@ -26,6 +26,8 @@ def watch_state(ch) fail "non-idle state: #{state}" unless state == IDLE ch.watch_connectivity_state(IDLE, Time.now + 360) end + # sleep to get the thread into the middle of a + # "watch connectivity state" call sleep 0.1 thd.kill end diff --git a/src/ruby/end2end/sig_handling_client.rb b/src/ruby/end2end/sig_handling_client.rb index 41b5f334be..129ad7cb7f 100755 --- a/src/ruby/end2end/sig_handling_client.rb +++ b/src/ruby/end2end/sig_handling_client.rb @@ -30,16 +30,18 @@ class SigHandlingClientController < ClientControl::ClientController::Service end def shutdown(_, _) - Thread.new do - # TODO(apolcyn) There is a race between stopping the - # server and the "shutdown" rpc completing, - # See if stop method on server can end active RPC cleanly, to - # avoid this sleep. - sleep 3 + # Spawn a new thread because RpcServer#stop is + # synchronous and blocks until either this RPC has finished, + # or the server's "poll_period" seconds have passed. + @shutdown_thread = Thread.new do @srv.stop end ClientControl::Void.new end + + def join_shutdown_thread + @shutdown_thread.join + end end def main @@ -62,13 +64,23 @@ def main STDERR.puts 'SIGINT received' end - srv = GRPC::RpcServer.new + # The "shutdown" RPC should end very quickly. + # Allow a few seconds to be safe. + srv = GRPC::RpcServer.new(poll_period: 3) srv.add_http2_port("0.0.0.0:#{client_control_port}", :this_port_is_insecure) stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", :this_channel_is_insecure) - srv.handle(SigHandlingClientController.new(srv, stub)) - srv.run + control_service = SigHandlingClientController.new(srv, stub) + srv.handle(control_service) + server_thread = Thread.new do + srv.run + end + srv.wait_till_running + # send a first RPC to notify the parent process that we've started + stub.echo(Echo::EchoRequest.new(request: 'client/child started')) + server_thread.join + control_service.join_shutdown_thread end main diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb index 291bf29424..0ad1cbd661 100755 --- a/src/ruby/end2end/sig_handling_driver.rb +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -19,17 +19,42 @@ require_relative './end2end_common' +# A service that calls back it's received_rpc_callback +# upon receiving an RPC. Used for synchronization/waiting +# for child process to start. +class ClientStartedService < Echo::EchoServer::Service + def initialize(received_rpc_callback) + @received_rpc_callback = received_rpc_callback + end + + def echo(echo_req, _) + @received_rpc_callback.call unless @received_rpc_callback.nil? + @received_rpc_callback = nil + Echo::EchoReply.new(response: echo_req.request) + end +end + def main STDERR.puts 'start server' - server_runner = ServerRunner.new(EchoServerImpl) - server_port = server_runner.run - - sleep 1 + client_started = false + client_started_mu = Mutex.new + client_started_cv = ConditionVariable.new + received_rpc_callback = proc do + client_started_mu.synchronize do + client_started = true + client_started_cv.signal + end + end + client_started_service = ClientStartedService.new(received_rpc_callback) + server_runner = ServerRunner.new(client_started_service) + server_port = server_runner.run STDERR.puts 'start client' control_stub, client_pid = start_client('sig_handling_client.rb', server_port) - sleep 1 + client_started_mu.synchronize do + client_started_cv.wait(client_started_mu) until client_started + end count = 0 while count < 5 diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb index b054f0f5f3..2df22f48a2 100755 --- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb +++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb @@ -23,13 +23,9 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - sleep 1 - STDERR.puts 'start client' _, client_pid = start_client('sig_int_during_channel_watch_client.rb', server_port) - # give time for the client to get into the middle # of a channel state watch call sleep 1 diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index d5fc11dc1c..c80c7fcd32 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -92,9 +92,13 @@ module GRPC # Stops the jobs in the pool def stop GRPC.logger.info('stopping, will wait for all the workers to exit') - schedule { throw :exit } while ready_for_work? - @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop @stopped = true + loop do + break unless ready_for_work? + worker_queue = @ready_workers.pop + worker_queue << [proc { throw :exit }, []] + end @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end forcibly_stop_workers @@ -138,7 +142,10 @@ module GRPC end # there shouldn't be any work given to this thread while its busy fail('received a task while busy') unless worker_queue.empty? - @ready_workers << worker_queue + @stop_mutex.synchronize do + return if @stopped + @ready_workers << worker_queue + end end end end @@ -186,8 +193,13 @@ module GRPC # * max_waiting_requests: Deprecated due to internal changes to the thread # pool. This is still an argument for compatibility but is ignored. # - # * poll_period: when present, the server polls for new events with this - # period + # * poll_period: The amount of time in seconds to wait for + # currently-serviced RPC's to finish before cancelling them when shutting + # down the server. + # + # * pool_keep_alive: The amount of time in seconds to wait + # for currently busy thread-pool threads to finish before + # forcing an abrupt exit to each thread. # # * connect_md_proc: # when non-nil is a proc for determining metadata to to send back the client @@ -202,17 +214,18 @@ module GRPC # intercepting server handlers to provide extra functionality. # Interceptors are an EXPERIMENTAL API. # - def initialize(pool_size:DEFAULT_POOL_SIZE, - max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, - poll_period:DEFAULT_POLL_PERIOD, - connect_md_proc:nil, - server_args:{}, - interceptors:[]) + def initialize(pool_size: DEFAULT_POOL_SIZE, + max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, + poll_period: DEFAULT_POLL_PERIOD, + pool_keep_alive: GRPC::RpcServer::DEFAULT_POOL_SIZE, + connect_md_proc: nil, + server_args: {}, + interceptors: []) @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @max_waiting_requests = max_waiting_requests @poll_period = poll_period @pool_size = pool_size - @pool = Pool.new(@pool_size) + @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive) @run_cond = ConditionVariable.new @run_mutex = Mutex.new # running_state can take 4 values: :not_started, :running, :stopping, and diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 21e8815890..8258487418 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -77,8 +77,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service Grpc::Testing::CoreResponse.new(cores: cpu_cores) end def quit_worker(_args, _call) - Thread.new { - sleep 3 + @shutdown_thread = Thread.new { @server.stop } Grpc::Testing::Void.new @@ -87,6 +86,9 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service @server = s @server_port = sp end + def join_shutdown_thread + @shutdown_thread.join + end end def main @@ -107,11 +109,13 @@ def main # Configure any errors with client or server child threads to surface Thread.abort_on_exception = true - s = GRPC::RpcServer.new + s = GRPC::RpcServer.new(poll_period: 3) s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, :this_port_is_insecure) - s.handle(WorkerServiceImpl.new(s, options['server_port'].to_i)) + worker_service = WorkerServiceImpl.new(s, options['server_port'].to_i) + s.handle(worker_service) s.run + worker_service.join_shutdown_thread end main -- cgit v1.2.3