diff options
Diffstat (limited to 'src/ruby')
-rwxr-xr-x | src/ruby/end2end/channel_closing_driver.rb | 6 | ||||
-rwxr-xr-x | src/ruby/end2end/channel_state_driver.rb | 7 | ||||
-rwxr-xr-x | src/ruby/end2end/end2end_common.rb | 6 | ||||
-rwxr-xr-x | src/ruby/end2end/forking_client_driver.rb | 6 | ||||
-rwxr-xr-x | src/ruby/end2end/grpc_class_init_client.rb | 2 | ||||
-rwxr-xr-x | src/ruby/end2end/killed_client_thread_driver.rb | 56 | ||||
-rwxr-xr-x | src/ruby/end2end/multiple_killed_watching_threads_driver.rb | 2 | ||||
-rwxr-xr-x | src/ruby/end2end/sig_handling_client.rb | 30 | ||||
-rwxr-xr-x | src/ruby/end2end/sig_handling_driver.rb | 35 | ||||
-rwxr-xr-x | src/ruby/end2end/sig_int_during_channel_watch_driver.rb | 4 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.h | 6 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 37 | ||||
-rw-r--r-- | src/ruby/lib/grpc/version.rb | 2 | ||||
-rw-r--r-- | src/ruby/pb/grpc/health/checker.rb | 14 | ||||
-rwxr-xr-x | src/ruby/qps/worker.rb | 12 | ||||
-rw-r--r-- | src/ruby/spec/pb/health/checker_spec.rb | 29 | ||||
-rw-r--r-- | src/ruby/tools/version.rb | 2 |
17 files changed, 165 insertions, 91 deletions
diff --git a/src/ruby/end2end/channel_closing_driver.rb b/src/ruby/end2end/channel_closing_driver.rb index 0ceb3667eb..57544b0398 100755 --- a/src/ruby/end2end/channel_closing_driver.rb +++ b/src/ruby/end2end/channel_closing_driver.rb @@ -23,13 +23,11 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - sleep 1 - STDERR.puts 'start client' control_stub, client_pid = start_client('channel_closing_client.rb', server_port) - + # sleep to allow time for the client to get into + # the middle of a "watch connectivity state" call sleep 3 begin diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb index 98339baebe..f4b1cd2bb8 100755 --- a/src/ruby/end2end/channel_state_driver.rb +++ b/src/ruby/end2end/channel_state_driver.rb @@ -22,14 +22,11 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - sleep 1 - STDERR.puts 'start client' _, client_pid = start_client('channel_state_client.rb', server_port) - + # sleep to allow time for the client to get into + # the middle of a "watch connectivity state" call sleep 3 - Process.kill('SIGTERM', client_pid) begin diff --git a/src/ruby/end2end/end2end_common.rb b/src/ruby/end2end/end2end_common.rb index a1b824fcbf..790fc23e92 100755 --- a/src/ruby/end2end/end2end_common.rb +++ b/src/ruby/end2end/end2end_common.rb @@ -40,12 +40,13 @@ end # ServerRunner starts an "echo server" that test clients can make calls to class ServerRunner - def initialize(service_impl) + def initialize(service_impl, rpc_server_args: {}) @service_impl = service_impl + @rpc_server_args = rpc_server_args end def run - @srv = GRPC::RpcServer.new + @srv = GRPC::RpcServer.new(@rpc_server_args) port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) @srv.handle(@service_impl) @@ -75,7 +76,6 @@ def start_client(client_main, server_port) client_path, "--client_control_port=#{client_control_port}", "--server_port=#{server_port}") - sleep 1 control_stub = ClientControl::ClientController::Stub.new( "localhost:#{client_control_port}", :this_channel_is_insecure) [control_stub, client_pid] diff --git a/src/ruby/end2end/forking_client_driver.rb b/src/ruby/end2end/forking_client_driver.rb index 63565395f7..5cf1d73112 100755 --- a/src/ruby/end2end/forking_client_driver.rb +++ b/src/ruby/end2end/forking_client_driver.rb @@ -20,12 +20,6 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - # TODO(apolcyn) Can we get rid of this sleep? - # Without it, an immediate call to the just started EchoServer - # fails with UNAVAILABLE - sleep 1 - STDERR.puts 'start client' _, client_pid = start_client('forking_client_client.rb', server_port) diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb index c35719a71f..ff40350cfa 100755 --- a/src/ruby/end2end/grpc_class_init_client.rb +++ b/src/ruby/end2end/grpc_class_init_client.rb @@ -54,7 +54,7 @@ def run_concurrency_stress_test(test_proc) test_proc.call - fail 'exception thrown while child thread initing class' + fail '(expected) exception thrown while child thread initing class' end # default (no gc_stress and no concurrency_stress) diff --git a/src/ruby/end2end/killed_client_thread_driver.rb b/src/ruby/end2end/killed_client_thread_driver.rb index fce5d13e82..370f7e686b 100755 --- a/src/ruby/end2end/killed_client_thread_driver.rb +++ b/src/ruby/end2end/killed_client_thread_driver.rb @@ -17,56 +17,46 @@ require_relative './end2end_common' # Service that sleeps for a long time upon receiving an 'echo request' -# Also, this notifies @call_started_cv once it has received a request. +# Also, this calls it's callback upon receiving an RPC as a method +# of synchronization/waiting for the child to start. class SleepingEchoServerImpl < Echo::EchoServer::Service - def initialize(call_started, call_started_mu, call_started_cv) - @call_started = call_started - @call_started_mu = call_started_mu - @call_started_cv = call_started_cv + def initialize(received_rpc_callback) + @received_rpc_callback = received_rpc_callback end def echo(echo_req, _) - @call_started_mu.synchronize do - @call_started.set_true - @call_started_cv.signal - end - sleep 1000 + @received_rpc_callback.call + # sleep forever to get the client stuck waiting + sleep Echo::EchoReply.new(response: echo_req.request) end end -# Mutable boolean -class BoolHolder - attr_reader :val - - def init - @val = false - end - - def set_true - @val = true - end -end - def main STDERR.puts 'start server' - call_started = BoolHolder.new - call_started_mu = Mutex.new - call_started_cv = ConditionVariable.new + client_started = false + client_started_mu = Mutex.new + client_started_cv = ConditionVariable.new + received_rpc_callback = proc do + client_started_mu.synchronize do + client_started = true + client_started_cv.signal + end + end - service_impl = SleepingEchoServerImpl.new(call_started, - call_started_mu, - call_started_cv) - server_runner = ServerRunner.new(service_impl) + service_impl = SleepingEchoServerImpl.new(received_rpc_callback) + # RPCs against the server will all be hanging, so kill thread + # pool workers immediately rather than after waiting for a second. + rpc_server_args = { poll_period: 0, pool_keep_alive: 0 } + server_runner = ServerRunner.new(service_impl, rpc_server_args: rpc_server_args) server_port = server_runner.run - STDERR.puts 'start client' _, client_pid = start_client('killed_client_thread_client.rb', server_port) - call_started_mu.synchronize do - call_started_cv.wait(call_started_mu) until call_started.val + client_started_mu.synchronize do + client_started_cv.wait(client_started_mu) until client_started end # SIGTERM the child process now that it's diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb index 94d5e9da2d..59f6f275e4 100755 --- a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb +++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb @@ -26,6 +26,8 @@ def watch_state(ch) fail "non-idle state: #{state}" unless state == IDLE ch.watch_connectivity_state(IDLE, Time.now + 360) end + # sleep to get the thread into the middle of a + # "watch connectivity state" call sleep 0.1 thd.kill end diff --git a/src/ruby/end2end/sig_handling_client.rb b/src/ruby/end2end/sig_handling_client.rb index 41b5f334be..129ad7cb7f 100755 --- a/src/ruby/end2end/sig_handling_client.rb +++ b/src/ruby/end2end/sig_handling_client.rb @@ -30,16 +30,18 @@ class SigHandlingClientController < ClientControl::ClientController::Service end def shutdown(_, _) - Thread.new do - # TODO(apolcyn) There is a race between stopping the - # server and the "shutdown" rpc completing, - # See if stop method on server can end active RPC cleanly, to - # avoid this sleep. - sleep 3 + # Spawn a new thread because RpcServer#stop is + # synchronous and blocks until either this RPC has finished, + # or the server's "poll_period" seconds have passed. + @shutdown_thread = Thread.new do @srv.stop end ClientControl::Void.new end + + def join_shutdown_thread + @shutdown_thread.join + end end def main @@ -62,13 +64,23 @@ def main STDERR.puts 'SIGINT received' end - srv = GRPC::RpcServer.new + # The "shutdown" RPC should end very quickly. + # Allow a few seconds to be safe. + srv = GRPC::RpcServer.new(poll_period: 3) srv.add_http2_port("0.0.0.0:#{client_control_port}", :this_port_is_insecure) stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", :this_channel_is_insecure) - srv.handle(SigHandlingClientController.new(srv, stub)) - srv.run + control_service = SigHandlingClientController.new(srv, stub) + srv.handle(control_service) + server_thread = Thread.new do + srv.run + end + srv.wait_till_running + # send a first RPC to notify the parent process that we've started + stub.echo(Echo::EchoRequest.new(request: 'client/child started')) + server_thread.join + control_service.join_shutdown_thread end main diff --git a/src/ruby/end2end/sig_handling_driver.rb b/src/ruby/end2end/sig_handling_driver.rb index 291bf29424..0ad1cbd661 100755 --- a/src/ruby/end2end/sig_handling_driver.rb +++ b/src/ruby/end2end/sig_handling_driver.rb @@ -19,17 +19,42 @@ require_relative './end2end_common' +# A service that calls back it's received_rpc_callback +# upon receiving an RPC. Used for synchronization/waiting +# for child process to start. +class ClientStartedService < Echo::EchoServer::Service + def initialize(received_rpc_callback) + @received_rpc_callback = received_rpc_callback + end + + def echo(echo_req, _) + @received_rpc_callback.call unless @received_rpc_callback.nil? + @received_rpc_callback = nil + Echo::EchoReply.new(response: echo_req.request) + end +end + def main STDERR.puts 'start server' - server_runner = ServerRunner.new(EchoServerImpl) - server_port = server_runner.run - - sleep 1 + client_started = false + client_started_mu = Mutex.new + client_started_cv = ConditionVariable.new + received_rpc_callback = proc do + client_started_mu.synchronize do + client_started = true + client_started_cv.signal + end + end + client_started_service = ClientStartedService.new(received_rpc_callback) + server_runner = ServerRunner.new(client_started_service) + server_port = server_runner.run STDERR.puts 'start client' control_stub, client_pid = start_client('sig_handling_client.rb', server_port) - sleep 1 + client_started_mu.synchronize do + client_started_cv.wait(client_started_mu) until client_started + end count = 0 while count < 5 diff --git a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb index b054f0f5f3..2df22f48a2 100755 --- a/src/ruby/end2end/sig_int_during_channel_watch_driver.rb +++ b/src/ruby/end2end/sig_int_during_channel_watch_driver.rb @@ -23,13 +23,9 @@ def main STDERR.puts 'start server' server_runner = ServerRunner.new(EchoServerImpl) server_port = server_runner.run - - sleep 1 - STDERR.puts 'start client' _, client_pid = start_client('sig_int_during_channel_watch_client.rb', server_port) - # give time for the client to get into the middle # of a channel state watch call sleep 1 diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index ae1e1a0b30..c2698d16ea 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -337,7 +337,7 @@ extern grpc_composite_call_credentials_create_type grpc_composite_call_credentia typedef grpc_call_credentials*(*grpc_google_compute_engine_credentials_create_type)(void* reserved); extern grpc_google_compute_engine_credentials_create_type grpc_google_compute_engine_credentials_create_import; #define grpc_google_compute_engine_credentials_create grpc_google_compute_engine_credentials_create_import -typedef gpr_timespec(*grpc_max_auth_token_lifetime_type)(); +typedef gpr_timespec(*grpc_max_auth_token_lifetime_type)(void); extern grpc_max_auth_token_lifetime_type grpc_max_auth_token_lifetime_import; #define grpc_max_auth_token_lifetime grpc_max_auth_token_lifetime_import typedef grpc_call_credentials*(*grpc_service_account_jwt_access_credentials_create_type)(const char* json_key, gpr_timespec token_lifetime, void* reserved); @@ -589,7 +589,7 @@ extern gpr_free_aligned_type gpr_free_aligned_import; typedef void(*gpr_set_allocation_functions_type)(gpr_allocation_functions functions); extern gpr_set_allocation_functions_type gpr_set_allocation_functions_import; #define gpr_set_allocation_functions gpr_set_allocation_functions_import -typedef gpr_allocation_functions(*gpr_get_allocation_functions_type)(); +typedef gpr_allocation_functions(*gpr_get_allocation_functions_type)(void); extern gpr_get_allocation_functions_type gpr_get_allocation_functions_import; #define gpr_get_allocation_functions gpr_get_allocation_functions_import typedef gpr_avl(*gpr_avl_create_type)(const gpr_avl_vtable* vtable); @@ -712,7 +712,7 @@ extern gpr_log_message_type gpr_log_message_import; typedef void(*gpr_set_log_verbosity_type)(gpr_log_severity min_severity_to_print); extern gpr_set_log_verbosity_type gpr_set_log_verbosity_import; #define gpr_set_log_verbosity gpr_set_log_verbosity_import -typedef void(*gpr_log_verbosity_init_type)(); +typedef void(*gpr_log_verbosity_init_type)(void); extern gpr_log_verbosity_init_type gpr_log_verbosity_init_import; #define gpr_log_verbosity_init gpr_log_verbosity_init_import typedef void(*gpr_set_log_function_type)(gpr_log_func func); diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index d5fc11dc1c..c80c7fcd32 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -92,9 +92,13 @@ module GRPC # Stops the jobs in the pool def stop GRPC.logger.info('stopping, will wait for all the workers to exit') - schedule { throw :exit } while ready_for_work? - @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop @stopped = true + loop do + break unless ready_for_work? + worker_queue = @ready_workers.pop + worker_queue << [proc { throw :exit }, []] + end @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end forcibly_stop_workers @@ -138,7 +142,10 @@ module GRPC end # there shouldn't be any work given to this thread while its busy fail('received a task while busy') unless worker_queue.empty? - @ready_workers << worker_queue + @stop_mutex.synchronize do + return if @stopped + @ready_workers << worker_queue + end end end end @@ -186,8 +193,13 @@ module GRPC # * max_waiting_requests: Deprecated due to internal changes to the thread # pool. This is still an argument for compatibility but is ignored. # - # * poll_period: when present, the server polls for new events with this - # period + # * poll_period: The amount of time in seconds to wait for + # currently-serviced RPC's to finish before cancelling them when shutting + # down the server. + # + # * pool_keep_alive: The amount of time in seconds to wait + # for currently busy thread-pool threads to finish before + # forcing an abrupt exit to each thread. # # * connect_md_proc: # when non-nil is a proc for determining metadata to to send back the client @@ -202,17 +214,18 @@ module GRPC # intercepting server handlers to provide extra functionality. # Interceptors are an EXPERIMENTAL API. # - def initialize(pool_size:DEFAULT_POOL_SIZE, - max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, - poll_period:DEFAULT_POLL_PERIOD, - connect_md_proc:nil, - server_args:{}, - interceptors:[]) + def initialize(pool_size: DEFAULT_POOL_SIZE, + max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, + poll_period: DEFAULT_POLL_PERIOD, + pool_keep_alive: GRPC::RpcServer::DEFAULT_POOL_SIZE, + connect_md_proc: nil, + server_args: {}, + interceptors: []) @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @max_waiting_requests = max_waiting_requests @poll_period = poll_period @pool_size = pool_size - @pool = Pool.new(@pool_size) + @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive) @run_cond = ConditionVariable.new @run_mutex = Mutex.new # running_state can take 4 values: :not_started, :running, :stopping, and diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index 3001579ce7..be1412511a 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -14,5 +14,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '1.8.0.dev' + VERSION = '1.9.0.dev' end diff --git a/src/ruby/pb/grpc/health/checker.rb b/src/ruby/pb/grpc/health/checker.rb index f23db39da5..c492455d8f 100644 --- a/src/ruby/pb/grpc/health/checker.rb +++ b/src/ruby/pb/grpc/health/checker.rb @@ -48,6 +48,20 @@ module Grpc @status_mutex.synchronize { @statuses["#{service}"] = status } end + # Adds given health status for all given services + def set_status_for_services(status, *services) + @status_mutex.synchronize do + services.each { |service| @statuses["#{service}"] = status } + end + end + + # Adds health status for each service given within hash + def add_statuses(service_statuses = {}) + @status_mutex.synchronize do + service_statuses.each_pair { |service, status| @statuses["#{service}"] = status } + end + end + # Clears the status for the given service. def clear_status(service) @status_mutex.synchronize { @statuses.delete("#{service}") } diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 21e8815890..8258487418 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -77,8 +77,7 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service Grpc::Testing::CoreResponse.new(cores: cpu_cores) end def quit_worker(_args, _call) - Thread.new { - sleep 3 + @shutdown_thread = Thread.new { @server.stop } Grpc::Testing::Void.new @@ -87,6 +86,9 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service @server = s @server_port = sp end + def join_shutdown_thread + @shutdown_thread.join + end end def main @@ -107,11 +109,13 @@ def main # Configure any errors with client or server child threads to surface Thread.abort_on_exception = true - s = GRPC::RpcServer.new + s = GRPC::RpcServer.new(poll_period: 3) s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, :this_port_is_insecure) - s.handle(WorkerServiceImpl.new(s, options['server_port'].to_i)) + worker_service = WorkerServiceImpl.new(s, options['server_port'].to_i) + s.handle(worker_service) s.run + worker_service.join_shutdown_thread end main diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb index 6c9e206c3f..c79ccfd2e0 100644 --- a/src/ruby/spec/pb/health/checker_spec.rb +++ b/src/ruby/spec/pb/health/checker_spec.rb @@ -99,6 +99,35 @@ describe Grpc::Health::Checker do end end + context 'method `add_statuses`' do + it 'should add status to each service' do + checker = Grpc::Health::Checker.new + checker.add_statuses( + 'service1' => ServingStatus::SERVING, + 'service2' => ServingStatus::NOT_SERVING + ) + service1_health = checker.check(HCReq.new(service: 'service1'), nil) + service2_health = checker.check(HCReq.new(service: 'service2'), nil) + expect(service1_health).to eq(HCResp.new(status: ServingStatus::SERVING)) + expect(service2_health).to eq(HCResp.new(status: ServingStatus::NOT_SERVING)) + end + end + + context 'method `set_status_for_services`' do + it 'should add given status to all given services' do + checker = Grpc::Health::Checker.new + checker.set_status_for_services( + ServingStatus::SERVING, + 'service1', + 'service2' + ) + service1_health = checker.check(HCReq.new(service: 'service1'), nil) + service2_health = checker.check(HCReq.new(service: 'service2'), nil) + expect(service1_health).to eq(HCResp.new(status: ServingStatus::SERVING)) + expect(service2_health).to eq(HCResp.new(status: ServingStatus::SERVING)) + end + end + context 'method `check`' do success_tests.each do |t| it "should fail with NOT_FOUND when #{t[:desc]}" do diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb index c584a7cf59..48aad39e08 100644 --- a/src/ruby/tools/version.rb +++ b/src/ruby/tools/version.rb @@ -14,6 +14,6 @@ module GRPC module Tools - VERSION = '1.8.0.dev' + VERSION = '1.9.0.dev' end end |