aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/ruby/end2end/channel_closing_client.rb10
-rw-r--r--src/ruby/ext/grpc/rb_channel.c69
2 files changed, 45 insertions, 34 deletions
diff --git a/src/ruby/end2end/channel_closing_client.rb b/src/ruby/end2end/channel_closing_client.rb
index 88fa9ae5e6..a9df078421 100755
--- a/src/ruby/end2end/channel_closing_client.rb
+++ b/src/ruby/end2end/channel_closing_client.rb
@@ -36,9 +36,8 @@ class ChannelClosingClientController < ClientControl::ClientController::Service
@ch = ch
end
def shutdown(_, _)
- STDERR.puts "about to close channel"
@ch.close
- STDERR.puts "just closed channel"
+ ClientControl::Void.new
end
end
@@ -63,12 +62,13 @@ def main
srv.run
end
- # this should break out once the channel is closed
+ # this should break out with an exception once the channel is closed
loop do
- state = ch.connectivity_state(true)
begin
+ state = ch.connectivity_state(true)
ch.watch_connectivity_state(state, Time.now + 360)
- rescue RuntimeException => e
+ rescue RuntimeError => e
+ STDERR.puts "(expected) error occurred: #{e.inspect}"
break
end
end
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 08d48f2a04..94a10faf3f 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -78,6 +78,7 @@ typedef struct grpc_rb_channel {
int safe_to_destroy;
grpc_connectivity_state current_connectivity_state;
+ int mu_init_done;
gpr_mu channel_mu;
gpr_cv channel_cv;
} grpc_rb_channel;
@@ -106,6 +107,11 @@ static void grpc_rb_channel_free(void *p) {
ch->wrapped = NULL;
}
+ if (ch->mu_init_done) {
+ gpr_mu_destroy(&ch->channel_mu);
+ gpr_cv_destroy(&ch->channel_cv);
+ }
+
xfree(p);
}
@@ -164,6 +170,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
+ wrapper->mu_init_done = 0;
target_chars = StringValueCStr(target);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
if (TYPE(credentials) == T_SYMBOL) {
@@ -185,6 +192,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
gpr_mu_init(&wrapper->channel_mu);
gpr_cv_init(&wrapper->channel_cv);
+ wrapper->mu_init_done = 1;
gpr_mu_lock(&wrapper->channel_mu);
wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
@@ -244,13 +252,38 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
typedef struct watch_state_stack {
grpc_rb_channel *wrapper;
gpr_timespec deadline;
+ int last_state;
} watch_state_stack;
static void *watch_channel_state_without_gvl(void *arg) {
- gpr_timespec deadline = ((watch_state_stack*)arg)->deadline;
- grpc_rb_channel *wrapper = ((watch_state_stack*)arg)->wrapper;
+ watch_state_stack *stack = (watch_state_stack*)arg;
+
+ gpr_timespec deadline = stack->deadline;
+ grpc_rb_channel *wrapper = stack->wrapper;
+ int last_state = stack->last_state;
+
+ gpr_mu_lock(&wrapper->channel_mu);
+ if (wrapper->current_connectivity_state != last_state) {
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)0;
+ }
+ if (wrapper->request_safe_destroy) {
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)0;
+ }
+ if (wrapper->safe_to_destroy) {
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)0;
+ }
+
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
- return NULL;
+
+ if (wrapper->current_connectivity_state != last_state) {
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)1;
+ }
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)0;
}
static void watch_channel_state_unblocking_func(void *arg) {
@@ -273,6 +306,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
watch_state_stack stack;
+ void* out;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@@ -286,33 +320,13 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil;
}
- gpr_mu_lock(&wrapper->channel_mu);
- if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
- gpr_mu_unlock(&wrapper->channel_mu);
- return Qtrue;
- }
- if (wrapper->request_safe_destroy) {
- gpr_mu_unlock(&wrapper->channel_mu);
- rb_raise(rb_eRuntimeError, "watch_connectivity_state called on closed channel");
- return Qfalse;
- }
- if (wrapper->safe_to_destroy) {
- gpr_mu_unlock(&wrapper->channel_mu);
- return Qfalse;
- }
stack.wrapper = wrapper;
stack.deadline = grpc_rb_time_timeval(deadline, 0);
- rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
- if (wrapper->request_safe_destroy) {
- gpr_mu_unlock(&wrapper->channel_mu);
- rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state");
- return Qfalse;
- }
- if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
- gpr_mu_unlock(&wrapper->channel_mu);
+ stack.last_state = NUM2LONG(last_state);
+ out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
+ if (out) {
return Qtrue;
}
- gpr_mu_unlock(&wrapper->channel_mu);
return Qfalse;
}
@@ -460,9 +474,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
GPR_ASSERT(wrapper->safe_to_destroy);
gpr_mu_unlock(&wrapper->channel_mu);
- gpr_mu_destroy(&wrapper->channel_mu);
- gpr_cv_destroy(&wrapper->channel_cv);
-
grpc_channel_destroy(wrapper->wrapped);
}