aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc/rb_channel.c
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2017-03-15 15:54:49 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2017-03-15 15:54:49 -0700
commit563ec5324f60b8ea521b41a1a440735b8bf6d2a6 (patch)
tree3185c46e4955d52edbcb8ce1f0367f2457f9d176 /src/ruby/ext/grpc/rb_channel.c
parent70bc4921e15f813c118477e26ea4bc5267b5c7e0 (diff)
stop mixing gpr mutexes and the ruby gil to fix channel closing deadlock
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r--src/ruby/ext/grpc/rb_channel.c69
1 files changed, 40 insertions, 29 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 08d48f2a04..94a10faf3f 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -78,6 +78,7 @@ typedef struct grpc_rb_channel {
int safe_to_destroy;
grpc_connectivity_state current_connectivity_state;
+ int mu_init_done;
gpr_mu channel_mu;
gpr_cv channel_cv;
} grpc_rb_channel;
@@ -106,6 +107,11 @@ static void grpc_rb_channel_free(void *p) {
ch->wrapped = NULL;
}
+ if (ch->mu_init_done) {
+ gpr_mu_destroy(&ch->channel_mu);
+ gpr_cv_destroy(&ch->channel_cv);
+ }
+
xfree(p);
}
@@ -164,6 +170,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
+ wrapper->mu_init_done = 0;
target_chars = StringValueCStr(target);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
if (TYPE(credentials) == T_SYMBOL) {
@@ -185,6 +192,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
gpr_mu_init(&wrapper->channel_mu);
gpr_cv_init(&wrapper->channel_cv);
+ wrapper->mu_init_done = 1;
gpr_mu_lock(&wrapper->channel_mu);
wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
@@ -244,13 +252,38 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
typedef struct watch_state_stack {
grpc_rb_channel *wrapper;
gpr_timespec deadline;
+ int last_state;
} watch_state_stack;
static void *watch_channel_state_without_gvl(void *arg) {
- gpr_timespec deadline = ((watch_state_stack*)arg)->deadline;
- grpc_rb_channel *wrapper = ((watch_state_stack*)arg)->wrapper;
+ watch_state_stack *stack = (watch_state_stack*)arg;
+
+ gpr_timespec deadline = stack->deadline;
+ grpc_rb_channel *wrapper = stack->wrapper;
+ int last_state = stack->last_state;
+
+ gpr_mu_lock(&wrapper->channel_mu);
+ if (wrapper->current_connectivity_state != last_state) {
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)0;
+ }
+ if (wrapper->request_safe_destroy) {
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)0;
+ }
+ if (wrapper->safe_to_destroy) {
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)0;
+ }
+
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
- return NULL;
+
+ if (wrapper->current_connectivity_state != last_state) {
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)1;
+ }
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return (void*)0;
}
static void watch_channel_state_unblocking_func(void *arg) {
@@ -273,6 +306,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
watch_state_stack stack;
+ void* out;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@@ -286,33 +320,13 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil;
}
- gpr_mu_lock(&wrapper->channel_mu);
- if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
- gpr_mu_unlock(&wrapper->channel_mu);
- return Qtrue;
- }
- if (wrapper->request_safe_destroy) {
- gpr_mu_unlock(&wrapper->channel_mu);
- rb_raise(rb_eRuntimeError, "watch_connectivity_state called on closed channel");
- return Qfalse;
- }
- if (wrapper->safe_to_destroy) {
- gpr_mu_unlock(&wrapper->channel_mu);
- return Qfalse;
- }
stack.wrapper = wrapper;
stack.deadline = grpc_rb_time_timeval(deadline, 0);
- rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
- if (wrapper->request_safe_destroy) {
- gpr_mu_unlock(&wrapper->channel_mu);
- rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state");
- return Qfalse;
- }
- if (wrapper->current_connectivity_state != NUM2LONG(last_state)) {
- gpr_mu_unlock(&wrapper->channel_mu);
+ stack.last_state = NUM2LONG(last_state);
+ out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
+ if (out) {
return Qtrue;
}
- gpr_mu_unlock(&wrapper->channel_mu);
return Qfalse;
}
@@ -460,9 +474,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
GPR_ASSERT(wrapper->safe_to_destroy);
gpr_mu_unlock(&wrapper->channel_mu);
- gpr_mu_destroy(&wrapper->channel_mu);
- gpr_cv_destroy(&wrapper->channel_cv);
-
grpc_channel_destroy(wrapper->wrapped);
}