diff options
author | murgatroid99 <mlumish@google.com> | 2017-04-19 15:54:27 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2017-04-19 15:54:27 -0700 |
commit | ce67bff2dff6146981928a9f06edf875e6566f4c (patch) | |
tree | 564f19af118dc29fd996c15c4cb1af9db96f92c5 /src/ruby/ext/grpc/rb_channel.c | |
parent | feaee850bf82251f3797749f9b2ef4580b1ba002 (diff) | |
parent | 5f4758e47850068bf4cda88aa9e5580a9ca7ed85 (diff) |
Merge remote-tracking branch 'upstream/v1.3.x' into upmerge_v1.3.x
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 75 |
1 files changed, 67 insertions, 8 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index ecfffd1839..a802183726 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -89,10 +89,14 @@ typedef struct grpc_rb_channel { static void grpc_rb_channel_try_register_connection_polling( grpc_rb_channel *wrapper); static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); +static void *wait_until_channel_polling_thread_started_no_gil(void *); +static void wait_until_channel_polling_thread_started_unblocking_func(void *); static grpc_completion_queue *channel_polling_cq; static gpr_mu global_connection_polling_mu; +static gpr_cv global_connection_polling_cv; static int abort_channel_polling = 0; +static int channel_polling_thread_started = 0; /* Destroys Channel instances. */ static void grpc_rb_channel_free(void *p) { @@ -166,6 +170,11 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { grpc_channel_args args; MEMZERO(&args, grpc_channel_args, 1); + grpc_ruby_once_init(); + rb_thread_call_without_gvl( + wait_until_channel_polling_thread_started_no_gil, NULL, + wait_until_channel_polling_thread_started_unblocking_func, NULL); + /* "3" == 3 mandatory args */ rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); @@ -440,6 +449,7 @@ static void grpc_rb_channel_try_register_connection_polling( } gpr_mu_lock(&global_connection_polling_mu); + GPR_ASSERT(channel_polling_thread_started || abort_channel_polling); conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); if (conn_state != wrapper->current_connectivity_state) { wrapper->current_connectivity_state = conn_state; @@ -473,7 +483,7 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { } // Note this loop breaks out with a single call of -// "grpc_rb_event_unblocking_func". +// "run_poll_channels_loop_no_gil". // This assumes that a ruby call the unblocking func // indicates process shutdown. // In the worst case, this stops polling channel connectivity @@ -481,6 +491,14 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { static void *run_poll_channels_loop_no_gil(void *arg) { grpc_event event; (void)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin"); + + gpr_mu_lock(&global_connection_polling_mu); + GPR_ASSERT(!channel_polling_thread_started); + channel_polling_thread_started = 1; + gpr_cv_broadcast(&global_connection_polling_cv); + gpr_mu_unlock(&global_connection_polling_mu); + for (;;) { event = grpc_completion_queue_next( channel_polling_cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); @@ -500,7 +518,7 @@ static void *run_poll_channels_loop_no_gil(void *arg) { } // Notify the channel polling loop to cleanup and shutdown. -static void grpc_rb_event_unblocking_func(void *arg) { +static void run_poll_channels_loop_unblocking_func(void *arg) { (void)arg; gpr_mu_lock(&global_connection_polling_mu); gpr_log(GPR_DEBUG, @@ -518,10 +536,37 @@ static VALUE run_poll_channels_loop(VALUE arg) { GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, - grpc_rb_event_unblocking_func, NULL); + run_poll_channels_loop_unblocking_func, NULL); + return Qnil; } +static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { + (void)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start"); + gpr_mu_lock(&global_connection_polling_mu); + while (!channel_polling_thread_started && !abort_channel_polling) { + gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&global_connection_polling_mu); + + return NULL; +} + +static void wait_until_channel_polling_thread_started_unblocking_func( + void *arg) { + (void)arg; + gpr_mu_lock(&global_connection_polling_mu); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: " + "wait_until_channel_polling_thread_started_unblocking_func - begin " + "aborting connection polling"); + abort_channel_polling = 1; + gpr_cv_broadcast(&global_connection_polling_cv); + gpr_mu_unlock(&global_connection_polling_mu); +} + /* Temporary fix for * https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/899. * Transports in idle channels can get destroyed. Normally c-core re-connects, @@ -532,11 +577,26 @@ static VALUE run_poll_channels_loop(VALUE arg) { * calls - so that c-core can reconnect if needed, when there aren't any RPC's. * TODO(apolcyn) remove this when core handles new RPCs on dead connections. */ -static void start_poll_channels_loop() { - channel_polling_cq = grpc_completion_queue_create_for_next(NULL); +void grpc_rb_channel_polling_thread_start() { + VALUE background_thread = Qnil; + + GPR_ASSERT(!abort_channel_polling); + GPR_ASSERT(!channel_polling_thread_started); + GPR_ASSERT(channel_polling_cq == NULL); + gpr_mu_init(&global_connection_polling_mu); - abort_channel_polling = 0; - rb_thread_create(run_poll_channels_loop, NULL); + gpr_cv_init(&global_connection_polling_cv); + + channel_polling_cq = grpc_completion_queue_create_for_next(NULL); + background_thread = rb_thread_create(run_poll_channels_loop, NULL); + + if (!RTEST(background_thread)) { + gpr_log(GPR_DEBUG, "GRPC_RUBY: failed to spawn channel polling thread"); + gpr_mu_lock(&global_connection_polling_mu); + abort_channel_polling = 1; + gpr_cv_broadcast(&global_connection_polling_cv); + gpr_mu_unlock(&global_connection_polling_mu); + } } static void Init_grpc_propagate_masks() { @@ -608,7 +668,6 @@ void Init_grpc_channel() { id_insecure_channel = rb_intern("this_channel_is_insecure"); Init_grpc_propagate_masks(); Init_grpc_connectivity_states(); - start_poll_channels_loop(); } /* Gets the wrapped channel from the ruby wrapper */ |