diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/ruby/end2end/channel_closing_client.rb | 10 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 69 |
2 files changed, 45 insertions, 34 deletions
diff --git a/src/ruby/end2end/channel_closing_client.rb b/src/ruby/end2end/channel_closing_client.rb index 88fa9ae5e6..a9df078421 100755 --- a/src/ruby/end2end/channel_closing_client.rb +++ b/src/ruby/end2end/channel_closing_client.rb @@ -36,9 +36,8 @@ class ChannelClosingClientController < ClientControl::ClientController::Service @ch = ch end def shutdown(_, _) - STDERR.puts "about to close channel" @ch.close - STDERR.puts "just closed channel" + ClientControl::Void.new end end @@ -63,12 +62,13 @@ def main srv.run end - # this should break out once the channel is closed + # this should break out with an exception once the channel is closed loop do - state = ch.connectivity_state(true) begin + state = ch.connectivity_state(true) ch.watch_connectivity_state(state, Time.now + 360) - rescue RuntimeException => e + rescue RuntimeError => e + STDERR.puts "(expected) error occurred: #{e.inspect}" break end end diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 08d48f2a04..94a10faf3f 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -78,6 +78,7 @@ typedef struct grpc_rb_channel { int safe_to_destroy; grpc_connectivity_state current_connectivity_state; + int mu_init_done; gpr_mu channel_mu; gpr_cv channel_cv; } grpc_rb_channel; @@ -106,6 +107,11 @@ static void grpc_rb_channel_free(void *p) { ch->wrapped = NULL; } + if (ch->mu_init_done) { + gpr_mu_destroy(&ch->channel_mu); + gpr_cv_destroy(&ch->channel_cv); + } + xfree(p); } @@ -164,6 +170,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); + wrapper->mu_init_done = 0; target_chars = StringValueCStr(target); grpc_rb_hash_convert_to_channel_args(channel_args, &args); if (TYPE(credentials) == T_SYMBOL) { @@ -185,6 +192,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { gpr_mu_init(&wrapper->channel_mu); gpr_cv_init(&wrapper->channel_cv); + wrapper->mu_init_done = 1; gpr_mu_lock(&wrapper->channel_mu); wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); @@ -244,13 +252,38 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, typedef struct watch_state_stack { grpc_rb_channel *wrapper; gpr_timespec deadline; + int last_state; } watch_state_stack; static void *watch_channel_state_without_gvl(void *arg) { - gpr_timespec deadline = ((watch_state_stack*)arg)->deadline; - grpc_rb_channel *wrapper = ((watch_state_stack*)arg)->wrapper; + watch_state_stack *stack = (watch_state_stack*)arg; + + gpr_timespec deadline = stack->deadline; + grpc_rb_channel *wrapper = stack->wrapper; + int last_state = stack->last_state; + + gpr_mu_lock(&wrapper->channel_mu); + if (wrapper->current_connectivity_state != last_state) { + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)0; + } + if (wrapper->request_safe_destroy) { + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)0; + } + if (wrapper->safe_to_destroy) { + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)0; + } + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); - return NULL; + + if (wrapper->current_connectivity_state != last_state) { + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)1; + } + gpr_mu_unlock(&wrapper->channel_mu); + return (void*)0; } static void watch_channel_state_unblocking_func(void *arg) { @@ -273,6 +306,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE deadline) { grpc_rb_channel *wrapper = NULL; watch_state_stack stack; + void* out; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -286,33 +320,13 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, return Qnil; } - gpr_mu_lock(&wrapper->channel_mu); - if (wrapper->current_connectivity_state != NUM2LONG(last_state)) { - gpr_mu_unlock(&wrapper->channel_mu); - return Qtrue; - } - if (wrapper->request_safe_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - rb_raise(rb_eRuntimeError, "watch_connectivity_state called on closed channel"); - return Qfalse; - } - if (wrapper->safe_to_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - return Qfalse; - } stack.wrapper = wrapper; stack.deadline = grpc_rb_time_timeval(deadline, 0); - rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); - if (wrapper->request_safe_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state"); - return Qfalse; - } - if (wrapper->current_connectivity_state != NUM2LONG(last_state)) { - gpr_mu_unlock(&wrapper->channel_mu); + stack.last_state = NUM2LONG(last_state); + out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); + if (out) { return Qtrue; } - gpr_mu_unlock(&wrapper->channel_mu); return Qfalse; } @@ -460,9 +474,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { GPR_ASSERT(wrapper->safe_to_destroy); gpr_mu_unlock(&wrapper->channel_mu); - gpr_mu_destroy(&wrapper->channel_mu); - gpr_cv_destroy(&wrapper->channel_cv); - grpc_channel_destroy(wrapper->wrapped); } |