diff options
author | 2017-03-24 10:55:39 -0700 | |
---|---|---|
committer | 2017-03-24 10:55:39 -0700 | |
commit | 944a56fa8467f7f515e7d536494fce649b88629b (patch) | |
tree | 8b57a4c18c26ee70121592708320637d6276fa2e /src/ruby/ext/grpc/rb_channel.c | |
parent | 5bec133ba03f8cdffc7e2ace03a06aa5b48aed5c (diff) | |
parent | af61d0b7b4b9cd48b424c3c6f7c0b3c3be2ae270 (diff) |
Merge branch 'master' into cq_create_api_changes
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 269 |
1 files changed, 232 insertions, 37 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 54b178ad87..05f7160183 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -32,10 +32,11 @@ */ #include <ruby/ruby.h> +#include <ruby/thread.h> +#include "rb_grpc_imports.generated.h" #include "rb_byte_buffer.h" #include "rb_channel.h" -#include "rb_grpc_imports.generated.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> @@ -73,9 +74,26 @@ typedef struct grpc_rb_channel { /* The actual channel */ grpc_channel *wrapped; - grpc_completion_queue *queue; + int request_safe_destroy; + int safe_to_destroy; + grpc_connectivity_state current_connectivity_state; + + int mu_init_done; + int abort_watch_connectivity_state; + gpr_mu channel_mu; + gpr_cv channel_cv; } grpc_rb_channel; +/* Forward declarations of functions involved in temporary fix to + * https://github.com/grpc/grpc/issues/9941 */ +static void grpc_rb_channel_try_register_connection_polling( + grpc_rb_channel *wrapper); +static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); + +static grpc_completion_queue *channel_polling_cq; +static gpr_mu global_connection_polling_mu; +static int abort_channel_polling = 0; + /* Destroys Channel instances. */ static void grpc_rb_channel_free(void *p) { grpc_rb_channel *ch = NULL; @@ -85,8 +103,13 @@ static void grpc_rb_channel_free(void *p) { ch = (grpc_rb_channel *)p; if (ch->wrapped != NULL) { - grpc_channel_destroy(ch->wrapped); - grpc_rb_completion_queue_destroy(ch->queue); + grpc_rb_channel_safe_destroy(ch); + ch->wrapped = NULL; + } + + if (ch->mu_init_done) { + gpr_mu_destroy(&ch->channel_mu); + gpr_cv_destroy(&ch->channel_cv); } xfree(p); @@ -147,6 +170,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); + wrapper->mu_init_done = 0; target_chars = StringValueCStr(target); grpc_rb_hash_convert_to_channel_args(channel_args, &args); if (TYPE(credentials) == T_SYMBOL) { @@ -161,6 +185,27 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { creds = grpc_rb_get_wrapped_channel_credentials(credentials); ch = grpc_secure_channel_create(creds, target_chars, &args, NULL); } + + GPR_ASSERT(ch); + + wrapper->wrapped = ch; + + gpr_mu_init(&wrapper->channel_mu); + gpr_cv_init(&wrapper->channel_cv); + wrapper->mu_init_done = 1; + + gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 0; + wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); + wrapper->safe_to_destroy = 0; + wrapper->request_safe_destroy = 0; + + gpr_cv_broadcast(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); + + + grpc_rb_channel_try_register_connection_polling(wrapper); + if (args.args != NULL) { xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */ } @@ -171,25 +216,28 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { } rb_ivar_set(self, id_target, target); wrapper->wrapped = ch; - wrapper->queue = grpc_completion_queue_create_for_pluck(NULL); return self; } /* call-seq: - insecure_channel = Channel:new("myhost:8080", {'arg1': 'value1'}) - creds = ... - secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds) + ch.connectivity_state -> state + ch.connectivity_state(true) -> state - Creates channel instances. */ + Indicates the current state of the channel, whose value is one of the + constants defined in GRPC::Core::ConnectivityStates. + + It also tries to connect if the chennel is idle in the second form. */ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, VALUE self) { - VALUE try_to_connect = Qfalse; + VALUE try_to_connect_param = Qfalse; + int grpc_try_to_connect = 0; grpc_rb_channel *wrapper = NULL; grpc_channel *ch = NULL; /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */ - rb_scan_args(argc, argv, "01", try_to_connect); + rb_scan_args(argc, argv, "01", &try_to_connect_param); + grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; @@ -197,47 +245,81 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - return NUM2LONG( - grpc_channel_check_connectivity_state(ch, (int)try_to_connect)); + return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect)); } -/* Watch for a change in connectivity state. +typedef struct watch_state_stack { + grpc_rb_channel *wrapper; + gpr_timespec deadline; + int last_state; +} watch_state_stack; + +static void *watch_channel_state_without_gvl(void *arg) { + watch_state_stack *stack = (watch_state_stack*)arg; + gpr_timespec deadline = stack->deadline; + grpc_rb_channel *wrapper = stack->wrapper; + int last_state = stack->last_state; + void *return_value = (void*)0; + + gpr_mu_lock(&wrapper->channel_mu); + while(wrapper->current_connectivity_state == last_state && + !wrapper->request_safe_destroy && + !wrapper->safe_to_destroy && + !wrapper->abort_watch_connectivity_state && + gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); + } + if (wrapper->current_connectivity_state != last_state) { + return_value = (void*)1; + } + gpr_mu_unlock(&wrapper->channel_mu); + + return return_value; +} - Once the channel connectivity state is different from the last observed - state, tag will be enqueued on cq with success=1 +static void watch_channel_state_unblocking_func(void *arg) { + grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); + gpr_mu_lock(&wrapper->channel_mu); + wrapper->abort_watch_connectivity_state = 1; + gpr_cv_broadcast(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); +} - If deadline expires BEFORE the state is changed, tag will be enqueued on - the completion queue with success=0 */ +/* Wait until the channel's connectivity state becomes different from + * "last_state", or "deadline" expires. + * Returns true if the the channel's connectivity state becomes + * different from "last_state" within "deadline". + * Returns false if "deadline" expires before the channel's connectivity + * state changes from "last_state". + * */ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, VALUE deadline) { grpc_rb_channel *wrapper = NULL; - grpc_channel *ch = NULL; - grpc_completion_queue *cq = NULL; - - void *tag = wrapper; - - grpc_event event; + watch_state_stack stack; + void* out; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); - ch = wrapper->wrapped; - cq = wrapper->queue; - if (ch == NULL) { + + if (wrapper->wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - grpc_channel_watch_connectivity_state( - ch, (grpc_connectivity_state)NUM2LONG(last_state), - grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag); - event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME), - NULL); + if (!FIXNUM_P(last_state)) { + rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant"); + return Qnil; + } - if (event.success) { + stack.wrapper = wrapper; + stack.deadline = grpc_rb_time_timeval(deadline, 0); + stack.last_state = NUM2LONG(last_state); + out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); + if (out) { return Qtrue; - } else { - return Qfalse; } + return Qfalse; } /* Create a call given a grpc_channel, in order to call method. The request @@ -313,7 +395,7 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; if (ch != NULL) { - grpc_channel_destroy(ch); + grpc_rb_channel_safe_destroy(wrapper); wrapper->wrapped = NULL; } @@ -334,6 +416,118 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { return res; } +// Either start polling channel connection state or signal that it's free to +// destroy. +// Not safe to call while a channel's connection state is polled. +static void grpc_rb_channel_try_register_connection_polling( + grpc_rb_channel *wrapper) { + grpc_connectivity_state conn_state; + gpr_timespec sleep_time = gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); + + GPR_ASSERT(wrapper); + GPR_ASSERT(wrapper->wrapped); + gpr_mu_lock(&wrapper->channel_mu); + if (wrapper->request_safe_destroy) { + wrapper->safe_to_destroy = 1; + gpr_cv_broadcast(&wrapper->channel_cv); + gpr_mu_unlock(&wrapper->channel_mu); + return; + } + gpr_mu_lock(&global_connection_polling_mu); + + conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); + if (conn_state != wrapper->current_connectivity_state) { + wrapper->current_connectivity_state = conn_state; + gpr_cv_broadcast(&wrapper->channel_cv); + } + // avoid posting work to the channel polling cq if it's been shutdown + if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) { + grpc_channel_watch_connectivity_state( + wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper); + } else { + wrapper->safe_to_destroy = 1; + gpr_cv_broadcast(&wrapper->channel_cv); + } + gpr_mu_unlock(&global_connection_polling_mu); + gpr_mu_unlock(&wrapper->channel_mu); +} + +// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized +static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { + gpr_mu_lock(&wrapper->channel_mu); + wrapper->request_safe_destroy = 1; + + while (!wrapper->safe_to_destroy) { + gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + GPR_ASSERT(wrapper->safe_to_destroy); + gpr_mu_unlock(&wrapper->channel_mu); + + grpc_channel_destroy(wrapper->wrapped); +} + +// Note this loop breaks out with a single call of +// "grpc_rb_event_unblocking_func". +// This assumes that a ruby call the unblocking func +// indicates process shutdown. +// In the worst case, this stops polling channel connectivity +// early and falls back to current behavior. +static void *run_poll_channels_loop_no_gil(void *arg) { + grpc_event event; + (void)arg; + for (;;) { + event = grpc_completion_queue_next( + channel_polling_cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + if (event.type == GRPC_QUEUE_SHUTDOWN) { + break; + } + if (event.type == GRPC_OP_COMPLETE) { + grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag); + } + } + grpc_completion_queue_destroy(channel_polling_cq); + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop"); + return NULL; +} + +// Notify the channel polling loop to cleanup and shutdown. +static void grpc_rb_event_unblocking_func(void *arg) { + (void)arg; + gpr_mu_lock(&global_connection_polling_mu); + gpr_log(GPR_DEBUG, "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting connection polling"); + abort_channel_polling = 1; + grpc_completion_queue_shutdown(channel_polling_cq); + gpr_mu_unlock(&global_connection_polling_mu); +} + +// Poll channel connectivity states in background thread without the GIL. +static VALUE run_poll_channels_loop(VALUE arg) { + (void)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); + rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, + grpc_rb_event_unblocking_func, NULL); + return Qnil; +} + +/* Temporary fix for + * https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/899. + * Transports in idle channels can get destroyed. Normally c-core re-connects, + * but in grpc-ruby core never gets a thread until an RPC is made, because ruby + * only calls c-core's "completion_queu_pluck" API. + * This uses a global background thread that calls + * "completion_queue_next" on registered "watch_channel_connectivity_state" + * calls - so that c-core can reconnect if needed, when there aren't any RPC's. + * TODO(apolcyn) remove this when core handles new RPCs on dead connections. + */ +static void start_poll_channels_loop() { + channel_polling_cq = grpc_completion_queue_create_for_next(NULL); + gpr_mu_init(&global_connection_polling_mu); + abort_channel_polling = 0; + rb_thread_create(run_poll_channels_loop, NULL); +} + static void Init_grpc_propagate_masks() { /* Constants representing call propagation masks in grpc.h */ VALUE grpc_rb_mPropagateMasks = @@ -383,7 +577,7 @@ void Init_grpc_channel() { rb_define_method(grpc_rb_cChannel, "connectivity_state", grpc_rb_channel_get_connectivity_state, -1); rb_define_method(grpc_rb_cChannel, "watch_connectivity_state", - grpc_rb_channel_watch_connectivity_state, 4); + grpc_rb_channel_watch_connectivity_state, 2); rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call, 5); rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); @@ -403,6 +597,7 @@ void Init_grpc_channel() { id_insecure_channel = rb_intern("this_channel_is_insecure"); Init_grpc_propagate_masks(); Init_grpc_connectivity_states(); + start_poll_channels_loop(); } /* Gets the wrapped channel from the ruby wrapper */ |