aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc/rb_channel.c
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2017-03-14 16:33:44 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2017-03-14 17:15:51 -0700
commitbe30114a3b95b6dc843f8338d2b8a5470bec2553 (patch)
tree85f10580ea98756fe6490942ce3b85c009397b8d /src/ruby/ext/grpc/rb_channel.c
parentfcad5799b4c785d428ed30340d79581a5d97026c (diff)
fix up tests and remove two unlocks in a row bug
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r--src/ruby/ext/grpc/rb_channel.c33
1 files changed, 15 insertions, 18 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 8cd489345d..d143c54d21 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -89,7 +89,7 @@ static void grpc_rb_channel_try_register_connection_polling(
static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
static grpc_completion_queue *channel_polling_cq;
-static gpr_mu channel_polling_mu;
+static gpr_mu global_connection_polling_mu;
static int abort_channel_polling = 0;
/* Destroys Channel instances. */
@@ -188,13 +188,13 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
gpr_mu_lock(&wrapper->channel_mu);
wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
- gpr_cv_signal(&wrapper->channel_cv);
- gpr_mu_unlock(&wrapper->channel_mu);
-
- gpr_mu_lock(&wrapper->channel_mu);
wrapper->safe_to_destroy = 0;
wrapper->request_safe_destroy = 0;
+
+ gpr_cv_signal(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu);
+
+
grpc_rb_channel_try_register_connection_polling(wrapper);
if (args.args != NULL) {
@@ -277,7 +277,6 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
}
if (wrapper->safe_to_destroy) {
gpr_mu_unlock(&wrapper->channel_mu);
- gpr_log(GPR_DEBUG, "GRPC_RUBY_RB_CHANNEL: attempt to watch_connectivity_state on a non-state-polled channel");
return Qfalse;
}
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, grpc_rb_time_timeval(deadline, /* absolute time */ 0));
@@ -394,7 +393,7 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
// destroy.
// Not safe to call while a channel's connection state is polled.
static void grpc_rb_channel_try_register_connection_polling(
- grpc_rb_channel *wrapper) {
+ grpc_rb_channel *wrapper) {
grpc_connectivity_state conn_state;
gpr_timespec sleep_time = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
@@ -408,9 +407,8 @@ static void grpc_rb_channel_try_register_connection_polling(
gpr_mu_unlock(&wrapper->channel_mu);
return;
}
- gpr_mu_lock(&channel_polling_mu);
+ gpr_mu_lock(&global_connection_polling_mu);
- gpr_mu_lock(&wrapper->channel_mu);
conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
if (conn_state != wrapper->current_connectivity_state) {
wrapper->current_connectivity_state = conn_state;
@@ -424,8 +422,7 @@ static void grpc_rb_channel_try_register_connection_polling(
wrapper->safe_to_destroy = 1;
gpr_cv_signal(&wrapper->channel_cv);
}
- gpr_mu_unlock(&wrapper->channel_mu);
- gpr_mu_unlock(&channel_polling_mu);
+ gpr_mu_unlock(&global_connection_polling_mu);
gpr_mu_unlock(&wrapper->channel_mu);
}
@@ -454,7 +451,6 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
// early and falls back to current behavior.
static void *run_poll_channels_loop_no_gil(void *arg) {
grpc_event event;
- grpc_rb_channel *wrapper;
(void)arg;
for (;;) {
event = grpc_completion_queue_next(
@@ -463,27 +459,28 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
break;
}
if (event.type == GRPC_OP_COMPLETE) {
- wrapper = (grpc_rb_channel *)event.tag;
-
- grpc_rb_channel_try_register_connection_polling(wrapper);
+ grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag);
}
}
grpc_completion_queue_destroy(channel_polling_cq);
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop");
return NULL;
}
// Notify the channel polling loop to cleanup and shutdown.
static void grpc_rb_event_unblocking_func(void *arg) {
(void)arg;
- gpr_mu_lock(&channel_polling_mu);
+ gpr_mu_lock(&global_connection_polling_mu);
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting connection polling");
abort_channel_polling = 1;
grpc_completion_queue_shutdown(channel_polling_cq);
- gpr_mu_unlock(&channel_polling_mu);
+ gpr_mu_unlock(&global_connection_polling_mu);
}
// Poll channel connectivity states in background thread without the GIL.
static VALUE run_poll_channels_loop(VALUE arg) {
(void)arg;
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
grpc_rb_event_unblocking_func, NULL);
return Qnil;
@@ -501,7 +498,7 @@ static VALUE run_poll_channels_loop(VALUE arg) {
*/
static void start_poll_channels_loop() {
channel_polling_cq = grpc_completion_queue_create(NULL);
- gpr_mu_init(&channel_polling_mu);
+ gpr_mu_init(&global_connection_polling_mu);
abort_channel_polling = 0;
rb_thread_create(run_poll_channels_loop, NULL);
}