aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rwxr-xr-xsrc/ruby/end2end/multiple_killed_watching_threads_driver.rb63
-rw-r--r--src/ruby/ext/grpc/rb_channel.c56
-rwxr-xr-xtools/run_tests/helper_scripts/run_ruby_end2end_tests.sh1
3 files changed, 101 insertions, 19 deletions
diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb
new file mode 100755
index 0000000000..206ec8e801
--- /dev/null
+++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb
@@ -0,0 +1,63 @@
+#!/usr/bin/env ruby
+
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require_relative './end2end_common'
+
+Thread.abort_on_exception = true
+
+include GRPC::Core::ConnectivityStates
+
+def watch_state(ch)
+ thd = Thread.new do
+ state = ch.connectivity_state(false)
+ fail "non-idle state: #{state}" unless state == IDLE
+ ch.watch_connectivity_state(IDLE, Time.now + 360)
+ end
+ sleep 0.1
+ thd.kill
+end
+
+def main
+ channels = []
+ 10.times do
+ ch = GRPC::Core::Channel.new('dummy_host',
+ nil, :this_channel_is_insecure)
+ watch_state(ch)
+ channels << ch
+ end
+
+ # checking state should still be safe to call
+ channels.each do |c|
+ fail unless c.connectivity_state(false) == FATAL_FAILURE
+ end
+end
+
+main
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index e02dd0805d..7aaa71eeaf 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -107,6 +107,7 @@ static bg_watched_channel *bg_watched_channel_list_head = NULL;
static void grpc_rb_channel_try_register_connection_polling(
bg_watched_channel *bg);
static void *wait_until_channel_polling_thread_started_no_gil(void *);
+static void wait_until_channel_polling_thread_started_unblocking_func(void *);
static void *channel_init_try_register_connection_polling_without_gil(
void *arg);
@@ -227,12 +228,15 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
char *target_chars = NULL;
grpc_channel_args args;
channel_init_try_register_stack stack;
+ int stop_waiting_for_thread_start = 0;
MEMZERO(&args, grpc_channel_args, 1);
grpc_ruby_once_init();
- rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil,
- NULL, run_poll_channels_loop_unblocking_func,
- NULL);
+ rb_thread_call_without_gvl(
+ wait_until_channel_polling_thread_started_no_gil,
+ &stop_waiting_for_thread_start,
+ wait_until_channel_polling_thread_started_unblocking_func,
+ &stop_waiting_for_thread_start);
/* "3" == 3 mandatory args */
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
@@ -273,7 +277,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
}
typedef struct get_state_stack {
- grpc_channel *channel;
+ bg_watched_channel *bg;
int try_to_connect;
int out;
} get_state_stack;
@@ -283,14 +287,10 @@ static void *get_state_without_gil(void *arg) {
gpr_mu_lock(&global_connection_polling_mu);
GPR_ASSERT(abort_channel_polling || channel_polling_thread_started);
- if (abort_channel_polling) {
- // Assume that this channel has been destroyed by the
- // background thread.
- // The case in which the channel polling thread
- // failed to start just always shows shutdown state.
+ if (stack->bg->channel_destroyed) {
stack->out = GRPC_CHANNEL_SHUTDOWN;
} else {
- stack->out = grpc_channel_check_connectivity_state(stack->channel,
+ stack->out = grpc_channel_check_connectivity_state(stack->bg->channel,
stack->try_to_connect);
}
gpr_mu_unlock(&global_connection_polling_mu);
@@ -322,7 +322,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
return Qnil;
}
- stack.channel = wrapper->bg_wrapped->channel;
+ stack.bg = wrapper->bg_wrapped;
stack.try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
rb_thread_call_without_gvl(get_state_without_gil, &stack, NULL, NULL);
@@ -366,6 +366,16 @@ static void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
return success;
}
+static void wait_for_watch_state_op_complete_unblocking_func(void *arg) {
+ bg_watched_channel *bg = (bg_watched_channel *)arg;
+ gpr_mu_lock(&global_connection_polling_mu);
+ if (!bg->channel_destroyed) {
+ grpc_channel_destroy(bg->channel);
+ bg->channel_destroyed = 1;
+ }
+ gpr_mu_unlock(&global_connection_polling_mu);
+}
+
/* Wait until the channel's connectivity state becomes different from
* "last_state", or "deadline" expires.
* Returns true if the the channel's connectivity state becomes
@@ -400,7 +410,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
op_success = rb_thread_call_without_gvl(
wait_for_watch_state_op_complete_without_gvl, &stack,
- run_poll_channels_loop_unblocking_func, NULL);
+ wait_for_watch_state_op_complete_unblocking_func, wrapper->bg_wrapped);
return op_success ? Qtrue : Qfalse;
}
@@ -577,11 +587,7 @@ static void grpc_rb_channel_try_register_connection_polling(
return;
}
GPR_ASSERT(bg->refcount == 1);
- if (bg->channel_destroyed) {
- GPR_ASSERT(abort_channel_polling);
- return;
- }
- if (abort_channel_polling) {
+ if (bg->channel_destroyed || abort_channel_polling) {
return;
}
@@ -697,10 +703,11 @@ static VALUE run_poll_channels_loop(VALUE arg) {
}
static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
- (void)arg;
+ int *stop_waiting = (int *)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start");
gpr_mu_lock(&global_connection_polling_mu);
- while (!channel_polling_thread_started && !abort_channel_polling) {
+ while (!channel_polling_thread_started && !abort_channel_polling &&
+ !*stop_waiting) {
gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
@@ -709,6 +716,17 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
return NULL;
}
+static void wait_until_channel_polling_thread_started_unblocking_func(
+ void *arg) {
+ int *stop_waiting = (int *)arg;
+ gpr_mu_lock(&global_connection_polling_mu);
+ gpr_log(GPR_DEBUG,
+ "GRPC_RUBY: interrupt wait for channel polling thread to start");
+ *stop_waiting = 1;
+ gpr_cv_broadcast(&global_connection_polling_cv);
+ gpr_mu_unlock(&global_connection_polling_mu);
+}
+
static void *set_abort_channel_polling_without_gil(void *arg) {
(void)arg;
gpr_mu_lock(&global_connection_polling_mu);
diff --git a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh
index 6688025260..ab882d62bc 100755
--- a/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh
+++ b/tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh
@@ -41,4 +41,5 @@ ruby src/ruby/end2end/sig_int_during_channel_watch_driver.rb || EXIT_CODE=1
ruby src/ruby/end2end/killed_client_thread_driver.rb || EXIT_CODE=1
ruby src/ruby/end2end/forking_client_driver.rb || EXIT_CODE=1
ruby src/ruby/end2end/grpc_class_init_driver.rb || EXIT_CODE=1
+ruby src/ruby/end2end/multiple_killed_watching_threads_driver.rb || EXIT_CODE=1
exit $EXIT_CODE