aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc/rb_channel.c
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2017-03-21 18:31:29 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2017-03-21 18:38:26 -0700
commit5b881460d24bc930339d1cfd37805a7eadeee5c0 (patch)
tree83eb3d1846519addcd28e9e424e4ce7d1a238245 /src/ruby/ext/grpc/rb_channel.c
parentea282e9c4cd422a1edcbdd30a81b8f58c8523063 (diff)
make fewer lock/unlock calls and loop on cv_wait in watch conn state
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r--src/ruby/ext/grpc/rb_channel.c40
1 files changed, 19 insertions, 21 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 1fe825efd6..c12ea921c9 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -78,6 +78,7 @@ typedef struct grpc_rb_channel {
grpc_connectivity_state current_connectivity_state;
int mu_init_done;
+ int abort_watch_connectivity_state;
gpr_mu channel_mu;
gpr_cv channel_cv;
} grpc_rb_channel;
@@ -193,6 +194,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
wrapper->mu_init_done = 1;
gpr_mu_lock(&wrapper->channel_mu);
+ wrapper->abort_watch_connectivity_state = 0;
wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
wrapper->safe_to_destroy = 0;
wrapper->request_safe_destroy = 0;
@@ -242,8 +244,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
- return LONG2NUM(
- grpc_channel_check_connectivity_state(ch, grpc_try_to_connect));
+ return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect));
}
typedef struct watch_state_stack {
@@ -254,39 +255,35 @@ typedef struct watch_state_stack {
static void *watch_channel_state_without_gvl(void *arg) {
watch_state_stack *stack = (watch_state_stack*)arg;
-
gpr_timespec deadline = stack->deadline;
grpc_rb_channel *wrapper = stack->wrapper;
int last_state = stack->last_state;
+ void *return_value = (void*)0;
+ gpr_timespec time_check_increment = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
+
gpr_mu_lock(&wrapper->channel_mu);
- if (wrapper->current_connectivity_state != last_state) {
- gpr_mu_unlock(&wrapper->channel_mu);
- return (void*)0;
- }
- if (wrapper->request_safe_destroy) {
- gpr_mu_unlock(&wrapper->channel_mu);
- return (void*)0;
+ while(wrapper->current_connectivity_state == last_state &&
+ !wrapper->request_safe_destroy &&
+ !wrapper->safe_to_destroy &&
+ !wrapper->abort_watch_connectivity_state &&
+ gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
+ gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, time_check_increment);
}
- if (wrapper->safe_to_destroy) {
- gpr_mu_unlock(&wrapper->channel_mu);
- return (void*)0;
- }
-
- gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
-
if (wrapper->current_connectivity_state != last_state) {
- gpr_mu_unlock(&wrapper->channel_mu);
- return (void*)1;
+ return_value = (void*)1;
}
gpr_mu_unlock(&wrapper->channel_mu);
- return (void*)0;
+
+ return return_value;
}
static void watch_channel_state_unblocking_func(void *arg) {
grpc_rb_channel *wrapper = (grpc_rb_channel*)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
gpr_mu_lock(&wrapper->channel_mu);
+ wrapper->abort_watch_connectivity_state = 1;
gpr_cv_broadcast(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu);
}
@@ -461,8 +458,9 @@ static void grpc_rb_channel_try_register_connection_polling(
// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
gpr_mu_lock(&wrapper->channel_mu);
+ wrapper->request_safe_destroy = 1;
+
while (!wrapper->safe_to_destroy) {
- wrapper->request_safe_destroy = 1;
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}