diff options
author | 2017-03-21 18:31:29 -0700 | |
---|---|---|
committer | 2017-03-21 18:38:26 -0700 | |
commit | 5b881460d24bc930339d1cfd37805a7eadeee5c0 (patch) | |
tree | 83eb3d1846519addcd28e9e424e4ce7d1a238245 /src/ruby/ext/grpc/rb_channel.c | |
parent | ea282e9c4cd422a1edcbdd30a81b8f58c8523063 (diff) |
make fewer lock/unlock calls and loop on cv_wait in watch conn state
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 40 |
1 files changed, 19 insertions, 21 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 1fe825efd6..c12ea921c9 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -78,6 +78,7 @@ typedef struct grpc_rb_channel { grpc_connectivity_state current_connectivity_state; int mu_init_done; + int abort_watch_connectivity_state; gpr_mu channel_mu; gpr_cv channel_cv; } grpc_rb_channel; @@ -193,6 +194,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { wrapper->mu_init_done = 1; gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 0; wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; @@ -242,8 +244,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - return LONG2NUM( - grpc_channel_check_connectivity_state(ch, grpc_try_to_connect)); + return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect)); } typedef struct watch_state_stack { @@ -254,39 +255,35 @@ typedef struct watch_state_stack { static void *watch_channel_state_without_gvl(void *arg) { watch_state_stack *stack = (watch_state_stack*)arg; - gpr_timespec deadline = stack->deadline; grpc_rb_channel *wrapper = stack->wrapper; int last_state = stack->last_state; + void *return_value = (void*)0; + gpr_timespec time_check_increment = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); + gpr_mu_lock(&wrapper->channel_mu); - if (wrapper->current_connectivity_state != last_state) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; - } - if (wrapper->request_safe_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; + while(wrapper->current_connectivity_state == last_state && + !wrapper->request_safe_destroy && + !wrapper->safe_to_destroy && + !wrapper->abort_watch_connectivity_state && + gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, time_check_increment); } - if (wrapper->safe_to_destroy) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; - } - - gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); - if (wrapper->current_connectivity_state != last_state) { - gpr_mu_unlock(&wrapper->channel_mu); - return (void*)1; + return_value = (void*)1; } gpr_mu_unlock(&wrapper->channel_mu); - return (void*)0; + + return return_value; } static void watch_channel_state_unblocking_func(void *arg) { grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 1; gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); } @@ -461,8 +458,9 @@ static void grpc_rb_channel_try_register_connection_polling( // Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { gpr_mu_lock(&wrapper->channel_mu); + wrapper->request_safe_destroy = 1; + while (!wrapper->safe_to_destroy) { - wrapper->request_safe_destroy = 1; gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } |