aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc/rb_channel.c
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2017-03-24 10:55:39 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2017-03-24 10:55:39 -0700
commit944a56fa8467f7f515e7d536494fce649b88629b (patch)
tree8b57a4c18c26ee70121592708320637d6276fa2e /src/ruby/ext/grpc/rb_channel.c
parent5bec133ba03f8cdffc7e2ace03a06aa5b48aed5c (diff)
parentaf61d0b7b4b9cd48b424c3c6f7c0b3c3be2ae270 (diff)
Merge branch 'master' into cq_create_api_changes
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r--src/ruby/ext/grpc/rb_channel.c269
1 files changed, 232 insertions, 37 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 54b178ad87..05f7160183 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -32,10 +32,11 @@
*/
#include <ruby/ruby.h>
+#include <ruby/thread.h>
+#include "rb_grpc_imports.generated.h"
#include "rb_byte_buffer.h"
#include "rb_channel.h"
-#include "rb_grpc_imports.generated.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
@@ -73,9 +74,26 @@ typedef struct grpc_rb_channel {
/* The actual channel */
grpc_channel *wrapped;
- grpc_completion_queue *queue;
+ int request_safe_destroy;
+ int safe_to_destroy;
+ grpc_connectivity_state current_connectivity_state;
+
+ int mu_init_done;
+ int abort_watch_connectivity_state;
+ gpr_mu channel_mu;
+ gpr_cv channel_cv;
} grpc_rb_channel;
+/* Forward declarations of functions involved in temporary fix to
+ * https://github.com/grpc/grpc/issues/9941 */
+static void grpc_rb_channel_try_register_connection_polling(
+ grpc_rb_channel *wrapper);
+static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
+
+static grpc_completion_queue *channel_polling_cq;
+static gpr_mu global_connection_polling_mu;
+static int abort_channel_polling = 0;
+
/* Destroys Channel instances. */
static void grpc_rb_channel_free(void *p) {
grpc_rb_channel *ch = NULL;
@@ -85,8 +103,13 @@ static void grpc_rb_channel_free(void *p) {
ch = (grpc_rb_channel *)p;
if (ch->wrapped != NULL) {
- grpc_channel_destroy(ch->wrapped);
- grpc_rb_completion_queue_destroy(ch->queue);
+ grpc_rb_channel_safe_destroy(ch);
+ ch->wrapped = NULL;
+ }
+
+ if (ch->mu_init_done) {
+ gpr_mu_destroy(&ch->channel_mu);
+ gpr_cv_destroy(&ch->channel_cv);
}
xfree(p);
@@ -147,6 +170,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
+ wrapper->mu_init_done = 0;
target_chars = StringValueCStr(target);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
if (TYPE(credentials) == T_SYMBOL) {
@@ -161,6 +185,27 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
creds = grpc_rb_get_wrapped_channel_credentials(credentials);
ch = grpc_secure_channel_create(creds, target_chars, &args, NULL);
}
+
+ GPR_ASSERT(ch);
+
+ wrapper->wrapped = ch;
+
+ gpr_mu_init(&wrapper->channel_mu);
+ gpr_cv_init(&wrapper->channel_cv);
+ wrapper->mu_init_done = 1;
+
+ gpr_mu_lock(&wrapper->channel_mu);
+ wrapper->abort_watch_connectivity_state = 0;
+ wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
+ wrapper->safe_to_destroy = 0;
+ wrapper->request_safe_destroy = 0;
+
+ gpr_cv_broadcast(&wrapper->channel_cv);
+ gpr_mu_unlock(&wrapper->channel_mu);
+
+
+ grpc_rb_channel_try_register_connection_polling(wrapper);
+
if (args.args != NULL) {
xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
}
@@ -171,25 +216,28 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
}
rb_ivar_set(self, id_target, target);
wrapper->wrapped = ch;
- wrapper->queue = grpc_completion_queue_create_for_pluck(NULL);
return self;
}
/*
call-seq:
- insecure_channel = Channel:new("myhost:8080", {'arg1': 'value1'})
- creds = ...
- secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds)
+ ch.connectivity_state -> state
+ ch.connectivity_state(true) -> state
- Creates channel instances. */
+ Indicates the current state of the channel, whose value is one of the
+ constants defined in GRPC::Core::ConnectivityStates.
+
+ It also tries to connect if the channel is idle in the second form. */
static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
VALUE self) {
- VALUE try_to_connect = Qfalse;
+ VALUE try_to_connect_param = Qfalse;
+ int grpc_try_to_connect = 0;
grpc_rb_channel *wrapper = NULL;
grpc_channel *ch = NULL;
/* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
- rb_scan_args(argc, argv, "01", try_to_connect);
+ rb_scan_args(argc, argv, "01", &try_to_connect_param);
+ grpc_try_to_connect = RTEST(try_to_connect_param) ? 1 : 0;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
@@ -197,47 +245,81 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
- return NUM2LONG(
- grpc_channel_check_connectivity_state(ch, (int)try_to_connect));
+ return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect));
}
-/* Watch for a change in connectivity state.
+typedef struct watch_state_stack {
+ grpc_rb_channel *wrapper;
+ gpr_timespec deadline;
+ int last_state;
+} watch_state_stack;
+
+static void *watch_channel_state_without_gvl(void *arg) {
+ watch_state_stack *stack = (watch_state_stack*)arg;
+ gpr_timespec deadline = stack->deadline;
+ grpc_rb_channel *wrapper = stack->wrapper;
+ int last_state = stack->last_state;
+ void *return_value = (void*)0;
+
+ gpr_mu_lock(&wrapper->channel_mu);
+ while(wrapper->current_connectivity_state == last_state &&
+ !wrapper->request_safe_destroy &&
+ !wrapper->safe_to_destroy &&
+ !wrapper->abort_watch_connectivity_state &&
+ gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
+ gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
+ }
+ if (wrapper->current_connectivity_state != last_state) {
+ return_value = (void*)1;
+ }
+ gpr_mu_unlock(&wrapper->channel_mu);
+
+ return return_value;
+}
- Once the channel connectivity state is different from the last observed
- state, tag will be enqueued on cq with success=1
+static void watch_channel_state_unblocking_func(void *arg) {
+ grpc_rb_channel *wrapper = (grpc_rb_channel*)arg;
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
+ gpr_mu_lock(&wrapper->channel_mu);
+ wrapper->abort_watch_connectivity_state = 1;
+ gpr_cv_broadcast(&wrapper->channel_cv);
+ gpr_mu_unlock(&wrapper->channel_mu);
+}
- If deadline expires BEFORE the state is changed, tag will be enqueued on
- the completion queue with success=0 */
+/* Wait until the channel's connectivity state becomes different from
+ * "last_state", or "deadline" expires.
+ * Returns true if the channel's connectivity state becomes
+ * different from "last_state" within "deadline".
+ * Returns false if "deadline" expires before the channel's connectivity
+ * state changes from "last_state".
+ * */
static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE last_state,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
- grpc_channel *ch = NULL;
- grpc_completion_queue *cq = NULL;
-
- void *tag = wrapper;
-
- grpc_event event;
+ watch_state_stack stack;
+ void* out;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
- ch = wrapper->wrapped;
- cq = wrapper->queue;
- if (ch == NULL) {
+
+ if (wrapper->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
- grpc_channel_watch_connectivity_state(
- ch, (grpc_connectivity_state)NUM2LONG(last_state),
- grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, tag);
- event = rb_completion_queue_pluck(cq, tag, gpr_inf_future(GPR_CLOCK_REALTIME),
- NULL);
+ if (!FIXNUM_P(last_state)) {
+ rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant");
+ return Qnil;
+ }
- if (event.success) {
+ stack.wrapper = wrapper;
+ stack.deadline = grpc_rb_time_timeval(deadline, 0);
+ stack.last_state = NUM2LONG(last_state);
+ out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
+ if (out) {
return Qtrue;
- } else {
- return Qfalse;
}
+ return Qfalse;
}
/* Create a call given a grpc_channel, in order to call method. The request
@@ -313,7 +395,7 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch != NULL) {
- grpc_channel_destroy(ch);
+ grpc_rb_channel_safe_destroy(wrapper);
wrapper->wrapped = NULL;
}
@@ -334,6 +416,118 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
return res;
}
+// Either start polling channel connection state or signal that it's free to
+// destroy.
+// Not safe to call while a channel's connection state is polled.
+static void grpc_rb_channel_try_register_connection_polling(
+ grpc_rb_channel *wrapper) {
+ grpc_connectivity_state conn_state;
+ gpr_timespec sleep_time = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
+
+ GPR_ASSERT(wrapper);
+ GPR_ASSERT(wrapper->wrapped);
+ gpr_mu_lock(&wrapper->channel_mu);
+ if (wrapper->request_safe_destroy) {
+ wrapper->safe_to_destroy = 1;
+ gpr_cv_broadcast(&wrapper->channel_cv);
+ gpr_mu_unlock(&wrapper->channel_mu);
+ return;
+ }
+ gpr_mu_lock(&global_connection_polling_mu);
+
+ conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
+ if (conn_state != wrapper->current_connectivity_state) {
+ wrapper->current_connectivity_state = conn_state;
+ gpr_cv_broadcast(&wrapper->channel_cv);
+ }
+ // avoid posting work to the channel polling cq if it's been shutdown
+ if (!abort_channel_polling && conn_state != GRPC_CHANNEL_SHUTDOWN) {
+ grpc_channel_watch_connectivity_state(
+ wrapper->wrapped, conn_state, sleep_time, channel_polling_cq, wrapper);
+ } else {
+ wrapper->safe_to_destroy = 1;
+ gpr_cv_broadcast(&wrapper->channel_cv);
+ }
+ gpr_mu_unlock(&global_connection_polling_mu);
+ gpr_mu_unlock(&wrapper->channel_mu);
+}
+
+// Note requires wrapper->wrapped, wrapper->channel_mu/cv initialized
+static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) {
+ gpr_mu_lock(&wrapper->channel_mu);
+ wrapper->request_safe_destroy = 1;
+
+ while (!wrapper->safe_to_destroy) {
+ gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu,
+ gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ GPR_ASSERT(wrapper->safe_to_destroy);
+ gpr_mu_unlock(&wrapper->channel_mu);
+
+ grpc_channel_destroy(wrapper->wrapped);
+}
+
+// Note this loop breaks out with a single call of
+// "grpc_rb_event_unblocking_func".
+// This assumes that a Ruby call to the unblocking func
+// indicates process shutdown.
+// In the worst case, this stops polling channel connectivity
+// early and falls back to current behavior.
+static void *run_poll_channels_loop_no_gil(void *arg) {
+ grpc_event event;
+ (void)arg;
+ for (;;) {
+ event = grpc_completion_queue_next(
+ channel_polling_cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ if (event.type == GRPC_QUEUE_SHUTDOWN) {
+ break;
+ }
+ if (event.type == GRPC_OP_COMPLETE) {
+ grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag);
+ }
+ }
+ grpc_completion_queue_destroy(channel_polling_cq);
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop");
+ return NULL;
+}
+
+// Notify the channel polling loop to cleanup and shutdown.
+static void grpc_rb_event_unblocking_func(void *arg) {
+ (void)arg;
+ gpr_mu_lock(&global_connection_polling_mu);
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting connection polling");
+ abort_channel_polling = 1;
+ grpc_completion_queue_shutdown(channel_polling_cq);
+ gpr_mu_unlock(&global_connection_polling_mu);
+}
+
+// Poll channel connectivity states in background thread without the GIL.
+static VALUE run_poll_channels_loop(VALUE arg) {
+ (void)arg;
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
+ rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
+ grpc_rb_event_unblocking_func, NULL);
+ return Qnil;
+}
+
+/* Temporary fix for
+ * https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/899.
+ * Transports in idle channels can get destroyed. Normally c-core re-connects,
+ * but in grpc-ruby core never gets a thread until an RPC is made, because ruby
+ * only calls c-core's "completion_queue_pluck" API.
+ * This uses a global background thread that calls
+ * "completion_queue_next" on registered "watch_channel_connectivity_state"
+ * calls - so that c-core can reconnect if needed, when there aren't any RPC's.
+ * TODO(apolcyn) remove this when core handles new RPCs on dead connections.
+ */
+static void start_poll_channels_loop() {
+ channel_polling_cq = grpc_completion_queue_create_for_next(NULL);
+ gpr_mu_init(&global_connection_polling_mu);
+ abort_channel_polling = 0;
+ rb_thread_create(run_poll_channels_loop, NULL);
+}
+
static void Init_grpc_propagate_masks() {
/* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mPropagateMasks =
@@ -383,7 +577,7 @@ void Init_grpc_channel() {
rb_define_method(grpc_rb_cChannel, "connectivity_state",
grpc_rb_channel_get_connectivity_state, -1);
rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
- grpc_rb_channel_watch_connectivity_state, 4);
+ grpc_rb_channel_watch_connectivity_state, 2);
rb_define_method(grpc_rb_cChannel, "create_call", grpc_rb_channel_create_call,
5);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
@@ -403,6 +597,7 @@ void Init_grpc_channel() {
id_insecure_channel = rb_intern("this_channel_is_insecure");
Init_grpc_propagate_masks();
Init_grpc_connectivity_states();
+ start_poll_channels_loop();
}
/* Gets the wrapped channel from the ruby wrapper */