From fcad5799b4c785d428ed30340d79581a5d97026c Mon Sep 17 00:00:00 2001
From: Alexander Polcyn
Date: Tue, 14 Mar 2017 12:26:17 -0700
Subject: in the middle of fixing watch and get connectivity state to work with
 new changes

---
 src/ruby/ext/grpc/rb_channel.c           | 113 ++++++++++++++++++++-----------
 src/ruby/spec/channel_connection_spec.rb |  49 ++++++++++++++
 2 files changed, 121 insertions(+), 41 deletions(-)

diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 2489ec2fef..8cd489345d 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -76,8 +76,10 @@ typedef struct grpc_rb_channel {
   grpc_completion_queue *queue;
   int request_safe_destroy;
   int safe_to_destroy;
-  gpr_mu safe_destroy_mu;
-  gpr_cv safe_destroy_cv;
+  grpc_connectivity_state current_connectivity_state;
+
+  gpr_mu channel_mu;
+  gpr_cv channel_cv;
 } grpc_rb_channel;
 
 /* Forward declarations of functions involved in temporary fix to
@@ -180,12 +182,19 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
 
   GPR_ASSERT(ch);
   wrapper->wrapped = ch;
-  gpr_mu_init(&wrapper->safe_destroy_mu);
-  gpr_cv_init(&wrapper->safe_destroy_cv);
-  gpr_mu_lock(&wrapper->safe_destroy_mu);
+
+  gpr_mu_init(&wrapper->channel_mu);
+  gpr_cv_init(&wrapper->channel_cv);
+
+  gpr_mu_lock(&wrapper->channel_mu);
+  wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
+  gpr_cv_signal(&wrapper->channel_cv);
+  gpr_mu_unlock(&wrapper->channel_mu);
+
+  gpr_mu_lock(&wrapper->channel_mu);
   wrapper->safe_to_destroy = 0;
   wrapper->request_safe_destroy = 0;
-  gpr_mu_unlock(&wrapper->safe_destroy_mu);
+  gpr_mu_unlock(&wrapper->channel_mu);
   grpc_rb_channel_try_register_connection_polling(wrapper);
 
   if (args.args != NULL) {
@@ -232,43 +241,57 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
       grpc_channel_check_connectivity_state(ch, grpc_try_to_connect));
 }
 
-/* Watch for a change in connectivity state.
-
-   Once the channel connectivity state is different from the last observed
-   state, tag will be enqueued on cq with success=1
-
-   If deadline expires BEFORE the state is changed, tag will be enqueued on
-   the completion queue with success=0 */
+/* Wait until the channel's connectivity state becomes different from
+ * "last_state", or "deadline" expires.
+ * Returns true if the channel's connectivity state becomes
+ * different from "last_state" within "deadline".
+ * Returns false if "deadline" expires before the channel's connectivity
+ * state changes from "last_state".
+ * */
 static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
                                                       VALUE last_state,
                                                       VALUE deadline) {
   grpc_rb_channel *wrapper = NULL;
-  grpc_channel *ch = NULL;
-  grpc_completion_queue *cq = NULL;
-
-  void *tag = wrapper;
-
-  grpc_event event;
 
   TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type,
                        wrapper);
-  ch = wrapper->wrapped;
-  cq = wrapper->queue;
-  if (ch == NULL) {
+
+  if (wrapper->wrapped == NULL) {
     rb_raise(rb_eRuntimeError, "closed!");
     return Qnil;
   }
-  grpc_channel_watch_connectivity_state(
-      ch, (grpc_connectivity_state)NUM2LONG(last_state),
-      grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag);
-  event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
-                                    NULL);
+  if (!FIXNUM_P(last_state)) {
+    rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant");
+    return Qnil;
+  }
 
-  if (event.success) {
+  gpr_mu_lock(&wrapper->channel_mu);
+  if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
+    gpr_mu_unlock(&wrapper->channel_mu);
     return Qtrue;
-  } else {
+  }
+  if (wrapper->request_safe_destroy) {
+    gpr_mu_unlock(&wrapper->channel_mu);
+    rb_raise(rb_eRuntimeError, "watch_connectivity_state called on closed channel");
+    return Qfalse;
+  }
+  if (wrapper->safe_to_destroy) {
+    gpr_mu_unlock(&wrapper->channel_mu);
+    gpr_log(GPR_DEBUG, "GRPC_RUBY_RB_CHANNEL: attempt to watch_connectivity_state on a non-state-polled channel");
+    return Qfalse;
+  }
+  gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0));
+  if (wrapper->request_safe_destroy) {
+    gpr_mu_unlock(&wrapper->channel_mu);
+    rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state");
     return Qfalse;
   }
+  if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
+    gpr_mu_unlock(&wrapper->channel_mu);
+    return Qtrue;
+  }
+  gpr_mu_unlock(&wrapper->channel_mu);
+  return Qfalse;
 }
 
 /* Create a call given a grpc_channel, in order to call method. The request
@@ -378,40 +401,47 @@ static void grpc_rb_channel_try_register_connection_polling(
 
   GPR_ASSERT(wrapper);
   GPR_ASSERT(wrapper->wrapped);
-  gpr_mu_lock(&wrapper->safe_destroy_mu);
+  gpr_mu_lock(&wrapper->channel_mu);
   if (wrapper->request_safe_destroy) {
     wrapper->safe_to_destroy = 1;
-    gpr_cv_signal(&wrapper->safe_destroy_cv);
-    gpr_mu_unlock(&wrapper->safe_destroy_mu);
+    gpr_cv_signal(&wrapper->channel_cv);
+    gpr_mu_unlock(&wrapper->channel_mu);
     return;
   }
   gpr_mu_lock(&channel_polling_mu);
+
+  gpr_mu_lock(&wrapper->channel_mu);
   conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
+  if (conn_state != wrapper->current_connectivity_state) {
+    wrapper->current_connectivity_state = conn_state;
+    gpr_cv_signal(&wrapper->channel_cv);
+  }
   // avoid posting work to the channel polling cq if it's been shutdown
   if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
     grpc_channel_watch_connectivity_state(
         wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
   } else {
     wrapper->safe_to_destroy = 1;
-    gpr_cv_signal(&wrapper->safe_destroy_cv);
+    gpr_cv_signal(&wrapper->channel_cv);
   }
+  gpr_mu_unlock(&wrapper->channel_mu);
   gpr_mu_unlock(&channel_polling_mu);
-  gpr_mu_unlock(&wrapper->safe_destroy_mu);
+  gpr_mu_unlock(&wrapper->channel_mu);
 }
 
-// Note requires wrapper->wrapped, wrapper->safe_destroy_mu/cv initialized
+// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
 static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
-  gpr_mu_lock(&wrapper->safe_destroy_mu);
+  gpr_mu_lock(&wrapper->channel_mu);
   while (!wrapper->safe_to_destroy) {
     wrapper->request_safe_destroy = 1;
-    gpr_cv_wait(&wrapper->safe_destroy_cv, &wrapper->safe_destroy_mu,
+    gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
                 gpr_inf_future(GPR_CLOCK_REALTIME));
   }
   GPR_ASSERT(wrapper->safe_to_destroy);
-  gpr_mu_unlock(&wrapper->safe_destroy_mu);
+  gpr_mu_unlock(&wrapper->channel_mu);
 
-  gpr_mu_destroy(&wrapper->safe_destroy_mu);
-  gpr_cv_destroy(&wrapper->safe_destroy_cv);
+  gpr_mu_destroy(&wrapper->channel_mu);
+  gpr_cv_destroy(&wrapper->channel_cv);
   grpc_channel_destroy(wrapper->wrapped);
 }
 
@@ -434,6 +464,7 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
     }
     if (event.type == GRPC_OP_COMPLETE) {
       wrapper = (grpc_rb_channel *)event.tag;
+
      grpc_rb_channel_try_register_connection_polling(wrapper);
     }
   }
@@ -524,7 +555,7 @@ void Init_grpc_channel() {
   rb_define_method(grpc_rb_cChannel, "connectivity_state",
                    grpc_rb_channel_get_connectivity_state, -1);
   rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
-                   grpc_rb_channel_watch_connectivity_state, 4);
+                   grpc_rb_channel_watch_connectivity_state, 2);
   rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call,
                    5);
   rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
diff --git a/src/ruby/spec/channel_connection_spec.rb b/src/ruby/spec/channel_connection_spec.rb
index 58ab37d7bc..d8e10f7b76 100644
--- a/src/ruby/spec/channel_connection_spec.rb
+++ b/src/ruby/spec/channel_connection_spec.rb
@@ -90,4 +90,53 @@ describe 'channel connection behavior' do
     expect(stub.an_rpc(req)).to be_a(EchoMsg)
     stop_server
   end
+
+  it 'observably connects and reconnects to transient server when using the channel state API', trial: true do
+    port = start_server
+    ch = GRPC::Core::Channel.new("localhost:#{port}", {}, :this_channel_is_insecure)
+
+    expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE)
+
+    state = ch.connectivity_state(true)
+
+    count = 0
+    while count < 20 and state != GRPC::Core::ConnectivityStates::READY do
+      STDERR.puts "first round of waiting for state to become READY"
+      ch.watch_connectivity_state(state, Time.now + 60)
+      state = ch.connectivity_state(true)
+      count += 1
+    end
+
+    expect(state).to be(GRPC::Core::ConnectivityStates::READY)
+
+    stop_server
+
+    state = ch.connectivity_state
+
+    count = 0
+    while count < 20 and state == GRPC::Core::ConnectivityStates::READY do
+      STDERR.puts "server shut down. waiting for state to not be READY"
+      ch.watch_connectivity_state(state, Time.now + 60)
+      state = ch.connectivity_state
+      count += 1
+    end
+
+    expect(state).to_not be(GRPC::Core::ConnectivityStates::READY)
+
+    start_server(port)
+
+    state = ch.connectivity_state(true)
+
+    count = 0
+    while count < 20 and state != GRPC::Core::ConnectivityStates::READY do
+      STDERR.puts "second round of waiting for state to become READY"
+      ch.watch_connectivity_state(state, Time.now + 60)
+      state = ch.connectivity_state(true)
+      count += 1
+    end
+
+    expect(state).to be(GRPC::Core::ConnectivityStates::READY)
+
+    stop_server
+  end
 end
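Usage sketch (not part of this patch): the new comment on grpc_rb_channel_watch_connectivity_state says the call now returns true when the state moves away from last_state before the deadline and false on timeout, and the spec above drives it in a bounded retry loop. The snippet below restates that loop as a small Ruby helper; the name wait_until_ready and its keyword arguments are hypothetical, introduced only for illustration, and assume the GRPC::Core::Channel API as changed here.

    require 'grpc'

    # Hypothetical helper, not added by this commit; mirrors the spec's wait loop.
    def wait_until_ready(ch, rounds: 20, per_round_deadline: 60)
      state = ch.connectivity_state(true) # true => ask the channel to try to connect
      rounds.times do
        return true if state == GRPC::Core::ConnectivityStates::READY
        # Blocks until the state differs from `state` or the deadline passes.
        ch.watch_connectivity_state(state, Time.now + per_round_deadline)
        state = ch.connectivity_state(true)
      end
      state == GRPC::Core::ConnectivityStates::READY
    end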