diff options
author | Alexander Polcyn <apolcyn@google.com> | 2017-03-14 16:33:44 -0700 |
---|---|---|
committer | Alexander Polcyn <apolcyn@google.com> | 2017-03-14 17:15:51 -0700 |
commit | be30114a3b95b6dc843f8338d2b8a5470bec2553 (patch) | |
tree | 85f10580ea98756fe6490942ce3b85c009397b8d | |
parent | fcad5799b4c785d428ed30340d79581a5d97026c (diff) |
fix up tests and remove two unlocks in a row bug
-rwxr-xr-x | examples/ruby/greeter_client.rb | 32 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 33 | ||||
-rw-r--r-- | src/ruby/spec/channel_connection_spec.rb | 19 |
3 files changed, 54 insertions, 30 deletions
diff --git a/examples/ruby/greeter_client.rb b/examples/ruby/greeter_client.rb index 1cdf79ebf4..379f41536e 100755 --- a/examples/ruby/greeter_client.rb +++ b/examples/ruby/greeter_client.rb @@ -40,11 +40,39 @@ $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) require 'grpc' require 'helloworld_services_pb' +$int_count = 0 + +def shut_down_term + puts "term sig" + $int_count += 1 + if $int_count > 4 + exit + end +end + +def shut_down_kill + puts "kill sig" + $int_count += 1 + if $int_count > 4 + exit + end +end + + def main stub = Helloworld::Greeter::Stub.new('localhost:50051', :this_channel_is_insecure) user = ARGV.size > 0 ? ARGV[0] : 'world' - message = stub.say_hello(Helloworld::HelloRequest.new(name: user)).message - p "Greeting: #{message}" + Signal.trap("TERM") do + shut_down_term + end + Signal.trap("INT") do + shut_down_kill + end + loop do + message = stub.say_hello(Helloworld::HelloRequest.new(name: user)).message + p "Greeting: #{message}" + sleep 4 + end end main diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 8cd489345d..d143c54d21 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -89,7 +89,7 @@ static void grpc_rb_channel_try_register_connection_polling( static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); static grpc_completion_queue *channel_polling_cq; -static gpr_mu channel_polling_mu; +static gpr_mu global_connection_polling_mu; static int abort_channel_polling = 0; /* Destroys Channel instances. */ @@ -188,13 +188,13 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { gpr_mu_lock(&wrapper->channel_mu); wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); - gpr_cv_signal(&wrapper->channel_cv); - gpr_mu_unlock(&wrapper->channel_mu); - - gpr_mu_lock(&wrapper->channel_mu); wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; + + gpr_cv_signal(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); + + grpc_rb_channel_try_register_connection_polling(wrapper); if (args.args != NULL) { @@ -277,7 +277,6 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, } if (wrapper->safe_to_destroy) { gpr_mu_unlock(&wrapper->channel_mu); - gpr_log(GPR_DEBUG, "GRPC_RUBY_RB_CHANNEL: attempt to watch_connectivity_state on a non-state-polled channel"); return Qfalse; } gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0)); @@ -394,7 +393,7 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { // destroy. // Not safe to call while a channel's connection state is polled. static void grpc_rb_channel_try_register_connection_polling( - grpc_rb_channel *wrapper) { + grpc_rb_channel *wrapper) { grpc_connectivity_state conn_state; gpr_timespec sleep_time = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); @@ -408,9 +407,8 @@ static void grpc_rb_channel_try_register_connection_polling( gpr_mu_unlock(&wrapper->channel_mu); return; } - gpr_mu_lock(&channel_polling_mu); + gpr_mu_lock(&global_connection_polling_mu); - gpr_mu_lock(&wrapper->channel_mu); conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); if (conn_state != wrapper->current_connectivity_state) { wrapper->current_connectivity_state = conn_state; @@ -424,8 +422,7 @@ static void grpc_rb_channel_try_register_connection_polling( wrapper->safe_to_destroy = 1; gpr_cv_signal(&wrapper->channel_cv); } - gpr_mu_unlock(&wrapper->channel_mu); - gpr_mu_unlock(&channel_polling_mu); + gpr_mu_unlock(&global_connection_polling_mu); gpr_mu_unlock(&wrapper->channel_mu); } @@ -454,7 +451,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { // early and falls back to current behavior. static void *run_poll_channels_loop_no_gil(void *arg) { grpc_event event; - grpc_rb_channel *wrapper; (void)arg; for (;;) { event = grpc_completion_queue_next( @@ -463,27 +459,28 @@ static void *run_poll_channels_loop_no_gil(void *arg) { break; } if (event.type == GRPC_OP_COMPLETE) { - wrapper = (grpc_rb_channel *)event.tag; - - grpc_rb_channel_try_register_connection_polling(wrapper); + grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag); } } grpc_completion_queue_destroy(channel_polling_cq); + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop"); return NULL; } // Notify the channel polling loop to cleanup and shutdown. static void grpc_rb_event_unblocking_func(void *arg) { (void)arg; - gpr_mu_lock(&channel_polling_mu); + gpr_mu_lock(&global_connection_polling_mu); + gpr_log(GPR_DEBUG, "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting connection polling"); abort_channel_polling = 1; grpc_completion_queue_shutdown(channel_polling_cq); - gpr_mu_unlock(&channel_polling_mu); + gpr_mu_unlock(&global_connection_polling_mu); } // Poll channel connectivity states in background thread without the GIL. static VALUE run_poll_channels_loop(VALUE arg) { (void)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, grpc_rb_event_unblocking_func, NULL); return Qnil; @@ -501,7 +498,7 @@ static VALUE run_poll_channels_loop(VALUE arg) { */ static void start_poll_channels_loop() { channel_polling_cq = grpc_completion_queue_create(NULL); - gpr_mu_init(&channel_polling_mu); + gpr_mu_init(&global_connection_polling_mu); abort_channel_polling = 0; rb_thread_create(run_poll_channels_loop, NULL); } diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb index d8e10f7b76..b344052a21 100644 --- a/src/ruby/spec/channel_connection_spec.rb +++ b/src/ruby/spec/channel_connection_spec.rb @@ -63,7 +63,7 @@ EchoStub = EchoService.rpc_stub_class def start_server(port = 0) @srv = GRPC::RpcServer.new - server_port = @srv.add_http2_port("0.0.0.0:#{port}", :this_port_is_insecure) + server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure) @srv.handle(EchoService) @server_thd = Thread.new { @srv.run } @srv.wait_till_running @@ -84,24 +84,25 @@ describe 'channel connection behavior' do req = EchoMsg.new expect(stub.an_rpc(req)).to be_a(EchoMsg) stop_server - expect { stub.an_rpc(req) }.to raise_error(GRPC::Unavailable) + sleep 1 # TODO(apolcyn) grabbing the same port might fail, is this stable enough? start_server(port) expect(stub.an_rpc(req)).to be_a(EchoMsg) stop_server end - it 'observably connects and reconnects to transient server when using the channel state API', trial: true do + it 'observably connects and reconnects to transient server' \ + 'when using the channel state API' do port = start_server - ch = GRPC::Core::Channel.new("localhost:#{port}", {}, :this_channel_is_insecure) + ch = GRPC::Core::Channel.new("localhost:#{port}", {}, + :this_channel_is_insecure) expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE) state = ch.connectivity_state(true) count = 0 - while count < 20 and state != GRPC::Core::ConnectivityStates::READY do - STDERR.puts "first round of waiting for state to become READY" + while count < 20 && state != GRPC::Core::ConnectivityStates::READY ch.watch_connectivity_state(state, Time.now + 60) state = ch.connectivity_state(true) count += 1 @@ -114,8 +115,7 @@ describe 'channel connection behavior' do state = ch.connectivity_state count = 0 - while count < 20 and state == GRPC::Core::ConnectivityStates::READY do - STDERR.puts "server shut down. waiting for state to not be READY" + while count < 20 && state == GRPC::Core::ConnectivityStates::READY ch.watch_connectivity_state(state, Time.now + 60) state = ch.connectivity_state count += 1 @@ -128,8 +128,7 @@ describe 'channel connection behavior' do state = ch.connectivity_state(true) count = 0 - while count < 20 and state != GRPC::Core::ConnectivityStates::READY do - STDERR.puts "second round of waiting for state to become READY" + while count < 20 && state != GRPC::Core::ConnectivityStates::READY ch.watch_connectivity_state(state, Time.now + 60) state = ch.connectivity_state(true) count += 1 |