diff options
author | 2017-03-15 11:34:08 -0700 | |
---|---|---|
committer | 2017-03-15 12:14:13 -0700 | |
commit | f3147b3a7c92212ca6e2289222d9a7d52e0e0f78 (patch) | |
tree | 7db56eaaababb7734e5cd9da096860fea15de95b | |
parent | 16d97edf56b036c28dbe60d1184ef89b71bf8a90 (diff) |
watch channel state without the GIL to fix deadlock on abrupt SIGTERM
-rwxr-xr-x | src/ruby/end2end/channel_state_driver.rb | 6 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 33 | ||||
-rwxr-xr-x | src/ruby/qps/worker.rb | 2 | ||||
-rwxr-xr-x | tools/run_tests/run_tests.py | 1 |
4 files changed, 31 insertions, 11 deletions
diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb index cab0147e1f..8ef32acfff 100755 --- a/src/ruby/end2end/channel_state_driver.rb +++ b/src/ruby/end2end/channel_state_driver.rb @@ -29,8 +29,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# smoke test for a grpc-using app that receives and -# handles process-ending signals +# make sure that the client doesn't hang when process ended abruptly require_relative './end2end_common' @@ -54,7 +53,8 @@ def main STDERR.puts "timeout wait for client pid #{client_pid}" Process.kill('SIGKILL', client_pid) Process.wait(client_pid) - raise 'Timed out waiting for client process. It likely hangs' + STDERR.puts "killed client child" + raise 'Timed out waiting for client process. It likely hangs when ended abruptly' end server_runner.stop diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index d143c54d21..08d48f2a04 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -191,7 +191,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); @@ -241,6 +241,26 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, grpc_channel_check_connectivity_state(ch, grpc_try_to_connect)); } +typedef struct watch_state_stack { + grpc_rb_channel *wrapper; + gpr_timespec deadline; +} watch_state_stack; + +static void *watch_channel_state_without_gvl(void *arg) { + gpr_timespec deadline = ((watch_state_stack*)arg)->deadline; + grpc_rb_channel *wrapper = ((watch_state_stack*)arg)->wrapper; + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); + return NULL; +} + +static void watch_channel_state_unblocking_func(void 
*arg) { + grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); + gpr_mu_lock(&wrapper->channel_mu); + gpr_cv_broadcast(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); +} + /* Wait until the channel's connectivity state becomes different from * "last_state", or "deadline" expires. * Returns true if the the channel's connectivity state becomes @@ -252,6 +272,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, VALUE deadline) { grpc_rb_channel *wrapper = NULL; + watch_state_stack stack; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -279,7 +300,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, gpr_mu_unlock(&wrapper->channel_mu); return Qfalse; } - gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0)); + stack.wrapper = wrapper; + stack.deadline = grpc_rb_time_timeval(deadline, 0); + rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); if (wrapper->request_safe_destroy) { gpr_mu_unlock(&wrapper->channel_mu); rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state"); @@ -403,7 +426,7 @@ static void grpc_rb_channel_try_register_connection_polling( gpr_mu_lock(&wrapper->channel_mu); if (wrapper->request_safe_destroy) { wrapper->safe_to_destroy = 1; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); return; } @@ -412,7 +435,7 @@ static void grpc_rb_channel_try_register_connection_polling( conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); if (conn_state != wrapper->current_connectivity_state) { wrapper->current_connectivity_state = conn_state; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); } // avoid posting work to the channel 
polling cq if it's been shutdown if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) { @@ -420,7 +443,7 @@ static void grpc_rb_channel_try_register_connection_polling( wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper); } else { wrapper->safe_to_destroy = 1; - gpr_cv_signal(&wrapper->channel_cv); + gpr_cv_broadcast(&wrapper->channel_cv); } gpr_mu_unlock(&global_connection_polling_mu); gpr_mu_unlock(&wrapper->channel_mu); diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 318c1f9e22..61a0b723a3 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -36,8 +36,6 @@ lib_dir = File.join(File.dirname(this_dir), 'lib') $LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir) $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) -puts $LOAD_PATH - require 'grpc' require 'optparse' require 'histogram' diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index dc17e738e6..573196fbb7 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -695,7 +695,6 @@ class RubyLanguage(object): tests = [self.config.job_spec(['tools/run_tests/helper_scripts/run_ruby.sh'], timeout_seconds=10*60, environ=_FORCE_ENVIRON_FOR_WRAPPERS)] - # note these aren't getting ran on windows since no workers tests.append(self.config.job_spec(['tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh'], timeout_seconds=10*60, environ=_FORCE_ENVIRON_FOR_WRAPPERS)) |