diff options
author | 2017-03-15 11:34:08 -0700 | |
---|---|---|
committer | 2017-03-15 12:14:13 -0700 | |
commit | f3147b3a7c92212ca6e2289222d9a7d52e0e0f78 (patch) | |
tree | 7db56eaaababb7734e5cd9da096860fea15de95b /src/ruby/ext/grpc/rb_channel.c | |
parent | 16d97edf56b036c28dbe60d1184ef89b71bf8a90 (diff) |
watch channel state without the gil to fix deadlock on abrupt SIGTERM
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 33 |
1 files changed, 28 insertions, 5 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index d143c54d21..08d48f2a04 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -191,7 +191,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); @@ -241,6 +241,26 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, grpc_channel_check_connectivity_state(ch, grpc_try_to_connect)); } +typedef struct watch_state_stack { + grpc_rb_channel *wrapper; + gpr_timespec deadline; +} watch_state_stack; + +static void *watch_channel_state_without_gvl(void *arg) { + gpr_timespec deadline = ((watch_state_stack*)arg)->deadline; + grpc_rb_channel *wrapper = ((watch_state_stack*)arg)->wrapper; + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); + return NULL; +} + +static void watch_channel_state_unblocking_func(void *arg) { + grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); + gpr_mu_lock(&wrapper->channel_mu); + gpr_cv_broadcast(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); +} + /* Wait until the channel's connectivity state becomes different from * "last_state", or "deadline" expires. * Returns true if the the channel's connectivity state becomes @@ -252,6 +272,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, VALUE deadline) { grpc_rb_channel *wrapper = NULL; + watch_state_stack stack; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -279,7 +300,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, gpr_mu_unlock(&wrapper->channel_mu); return Qfalse; } - gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0)); + stack.wrapper = wrapper; + stack.deadline = grpc_rb_time_timeval(deadline, 0); + rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); if (wrapper->request_safe_destroy) { gpr_mu_unlock(&wrapper->channel_mu); rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state"); @@ -403,7 +426,7 @@ static void grpc_rb_channel_try_register_connection_polling( gpr_mu_lock(&wrapper->channel_mu); if (wrapper->request_safe_destroy) { wrapper->safe_to_destroy = 1; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); return; } @@ -412,7 +435,7 @@ static void grpc_rb_channel_try_register_connection_polling( conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); if (conn_state != wrapper->current_connectivity_state) { wrapper->current_connectivity_state = conn_state; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); } // avoid posting work to the channel polling cq if it's been shutdown if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) { @@ -420,7 +443,7 @@ static void grpc_rb_channel_try_register_connection_polling( wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper); } else { wrapper->safe_to_destroy = 1; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); } gpr_mu_unlock(&global_connection_polling_mu); gpr_mu_unlock(&wrapper->channel_mu); |