author Alexander Polcyn <apolcyn@google.com> 2017-03-15 11:34:08 -0700
committer Alexander Polcyn <apolcyn@google.com> 2017-03-15 12:14:13 -0700
commit f3147b3a7c92212ca6e2289222d9a7d52e0e0f78 (patch)
tree 7db56eaaababb7734e5cd9da096860fea15de95b
parent 16d97edf56b036c28dbe60d1184ef89b71bf8a90 (diff)
watch channel state without the gil to fix deadlock on abrupt SIGTERM
-rwxr-xr-x  src/ruby/end2end/channel_state_driver.rb  6
-rw-r--r--  src/ruby/ext/grpc/rb_channel.c  33
-rwxr-xr-x  src/ruby/qps/worker.rb  2
-rwxr-xr-x  tools/run_tests/run_tests.py  1
4 files changed, 31 insertions, 11 deletions
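The core of this patch is in rb_channel.c: the blocking gpr_cv_wait on the channel's connectivity condvar is moved out of the Ruby GVL via rb_thread_call_without_gvl, paired with an unblocking function that wakes the waiter when Ruby needs to interrupt the thread (for example on SIGTERM). The sketch below shows that pattern in isolation; it is illustrative only, the wait_args struct and function names are hypothetical, and the locking is simplified relative to the real code, which already holds channel_mu when the wait begins.

/* Illustrative sketch (not part of the patch): running a blocking
 * gpr_cv_wait with the GVL released so Ruby can still deliver signals
 * and interrupt the waiting thread. The names wait_args,
 * blocking_wait_without_gvl and unblock_waiters are hypothetical. */
#include <ruby.h>
#include <ruby/thread.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

typedef struct wait_args {
  gpr_mu mu;            /* caller initializes with gpr_mu_init */
  gpr_cv cv;            /* caller initializes with gpr_cv_init */
  gpr_timespec deadline; /* absolute deadline for the wait */
} wait_args;

/* Runs with the GVL released; other Ruby threads and signal handlers
 * keep making progress while this thread blocks on the condvar. */
static void *blocking_wait_without_gvl(void *arg) {
  wait_args *args = (wait_args *)arg;
  gpr_mu_lock(&args->mu);
  gpr_cv_wait(&args->cv, &args->mu, args->deadline);
  gpr_mu_unlock(&args->mu);
  return NULL;
}

/* Ruby invokes this when the thread must be interrupted (e.g. SIGTERM);
 * broadcast wakes every waiter so the blocking call returns promptly. */
static void unblock_waiters(void *arg) {
  wait_args *args = (wait_args *)arg;
  gpr_mu_lock(&args->mu);
  gpr_cv_broadcast(&args->cv);
  gpr_mu_unlock(&args->mu);
}

static void interruptible_wait(wait_args *args) {
  rb_thread_call_without_gvl(blocking_wait_without_gvl, args,
                             unblock_waiters, args);
}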
diff --git a/src/ruby/end2end/channel_state_driver.rb b/src/ruby/end2end/channel_state_driver.rb
index cab0147e1f..8ef32acfff 100755
--- a/src/ruby/end2end/channel_state_driver.rb
+++ b/src/ruby/end2end/channel_state_driver.rb
@@ -29,8 +29,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-# smoke test for a grpc-using app that receives and
-# handles process-ending signals
+# make sure that the client doesn't hang when the process is ended abruptly
require_relative './end2end_common'
@@ -54,7 +53,8 @@ def main
STDERR.puts "timeout wait for client pid #{client_pid}"
Process.kill('SIGKILL', client_pid)
Process.wait(client_pid)
- raise 'Timed out waiting for client process. It likely hangs'
+ STDERR.puts "killed client child"
+ raise 'Timed out waiting for client process. It likely hangs when ended abruptly'
end
server_runner.stop
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index d143c54d21..08d48f2a04 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -191,7 +191,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
wrapper->safe_to_destroy = 0;
wrapper->request_safe_destroy = 0;
- gpr_cv_signal(&wrapper->channel_cv);
+ gpr_cv_broadcast(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu);
@@ -241,6 +241,26 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
grpc_channel_check_connectivity_state(ch, grpc_try_to_connect));
}
+typedef struct watch_state_stack {
+ grpc_rb_channel *wrapper;
+ gpr_timespec deadline;
+} watch_state_stack;
+
+static void *watch_channel_state_without_gvl(void *arg) {
+ gpr_timespec deadline = ((watch_state_stack*)arg)->deadline;
+ grpc_rb_channel *wrapper = ((watch_state_stack*)arg)->wrapper;
+ gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
+ return NULL;
+}
+
+static void watch_channel_state_unblocking_func(void *arg) {
+ grpc_rb_channel *wrapper = (grpc_rb_channel*)arg;
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
+ gpr_mu_lock(&wrapper->channel_mu);
+ gpr_cv_broadcast(&wrapper->channel_cv);
+ gpr_mu_unlock(&wrapper->channel_mu);
+}
+
/* Wait until the channel's connectivity state becomes different from
* "last_state", or "deadline" expires.
* Returns true if the channel's connectivity state becomes
@@ -252,6 +272,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE last_state,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
+ watch_state_stack stack;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@@ -279,7 +300,9 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
gpr_mu_unlock(&wrapper->channel_mu);
return Qfalse;
}
- gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0));
+ stack.wrapper = wrapper;
+ stack.deadline = grpc_rb_time_timeval(deadline, 0);
+ rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
if (wrapper->request_safe_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
rb_raise(rb_eRuntimeError, "channel closed during call to watch_connectivity_state");
@@ -403,7 +426,7 @@ static void grpc_rb_channel_try_register_connection_polling(
gpr_mu_lock(&wrapper->channel_mu);
if (wrapper->request_safe_destroy) {
wrapper->safe_to_destroy = 1;
- gpr_cv_signal(&wrapper->channel_cv);
+ gpr_cv_broadcast(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu);
return;
}
@@ -412,7 +435,7 @@ static void grpc_rb_channel_try_register_connection_polling(
conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
if (conn_state != wrapper->current_connectivity_state) {
wrapper->current_connectivity_state = conn_state;
- gpr_cv_signal(&wrapper->channel_cv);
+ gpr_cv_broadcast(&wrapper->channel_cv);
}
// avoid posting work to the channel polling cq if it's been shutdown
if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
@@ -420,7 +443,7 @@ static void grpc_rb_channel_try_register_connection_polling(
wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
} else {
wrapper->safe_to_destroy = 1;
- gpr_cv_signal(&wrapper->channel_cv);
+ gpr_cv_broadcast(&wrapper->channel_cv);
}
gpr_mu_unlock(&global_connection_polling_mu);
gpr_mu_unlock(&wrapper->channel_mu);
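Two points about the rb_channel.c change above are worth spelling out. First, every gpr_cv_signal on channel_cv becomes gpr_cv_broadcast: the condvar now has more than one class of waiter (the thread blocked in watch_connectivity_state and the destroy path waiting for safe_to_destroy), so waking a single thread could leave another blocked indefinitely. Second, wrapping the wait in rb_thread_call_without_gvl releases the GVL while the thread sleeps on the condvar; previously the wait held the GVL, so a SIGTERM arriving at that moment apparently could never be handled by Ruby, which is the deadlock the commit title refers to.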
diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb
index 318c1f9e22..61a0b723a3 100755
--- a/src/ruby/qps/worker.rb
+++ b/src/ruby/qps/worker.rb
@@ -36,8 +36,6 @@ lib_dir = File.join(File.dirname(this_dir), 'lib')
$LOAD_PATH.unshift(lib_dir) unless $LOAD_PATH.include?(lib_dir)
$LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir)
-puts $LOAD_PATH
-
require 'grpc'
require 'optparse'
require 'histogram'
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index dc17e738e6..573196fbb7 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -695,7 +695,6 @@ class RubyLanguage(object):
tests = [self.config.job_spec(['tools/run_tests/helper_scripts/run_ruby.sh'],
timeout_seconds=10*60,
environ=_FORCE_ENVIRON_FOR_WRAPPERS)]
- # note these aren't getting ran on windows since no workers
tests.append(self.config.job_spec(['tools/run_tests/helper_scripts/run_ruby_end2end_tests.sh'],
timeout_seconds=10*60,
environ=_FORCE_ENVIRON_FOR_WRAPPERS))