aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc/rb_channel.c
diff options
context:
space:
mode:
authorGravatar Alexander Polcyn <apolcyn@google.com>2017-04-26 14:18:39 -0700
committerGravatar Alexander Polcyn <apolcyn@google.com>2017-04-26 14:18:39 -0700
commitf5521c33f9094f2f1e416764c4d2020237d65bc6 (patch)
treee8ef2105ec499e1ebf6e840bb0140e5f1c29550d /src/ruby/ext/grpc/rb_channel.c
parent79759fea1abd09102d38af3e13a84b69e898124b (diff)
Revert "Merge branch 'master' into v1.3.x"
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r--src/ruby/ext/grpc/rb_channel.c74
1 files changed, 29 insertions, 45 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index a802183726..fb610f548e 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -34,9 +34,9 @@
#include <ruby/ruby.h>
#include <ruby/thread.h>
+#include "rb_grpc_imports.generated.h"
#include "rb_byte_buffer.h"
#include "rb_channel.h"
-#include "rb_grpc_imports.generated.h"
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
@@ -89,8 +89,8 @@ typedef struct grpc_rb_channel {
static void grpc_rb_channel_try_register_connection_polling(
grpc_rb_channel *wrapper);
static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper);
-static void *wait_until_channel_polling_thread_started_no_gil(void *);
-static void wait_until_channel_polling_thread_started_unblocking_func(void *);
+static void *wait_until_channel_polling_thread_started_no_gil(void*);
+static void wait_until_channel_polling_thread_started_unblocking_func(void*);
static grpc_completion_queue *channel_polling_cq;
static gpr_mu global_connection_polling_mu;
@@ -171,9 +171,8 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
MEMZERO(&args, grpc_channel_args, 1);
grpc_ruby_once_init();
- rb_thread_call_without_gvl(
- wait_until_channel_polling_thread_started_no_gil, NULL,
- wait_until_channel_polling_thread_started_unblocking_func, NULL);
+ rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil, NULL,
+ wait_until_channel_polling_thread_started_unblocking_func, NULL);
/* "3" == 3 mandatory args */
rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials);
@@ -205,14 +204,14 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
gpr_mu_lock(&wrapper->channel_mu);
wrapper->abort_watch_connectivity_state = 0;
- wrapper->current_connectivity_state =
- grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
+ wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0);
wrapper->safe_to_destroy = 0;
wrapper->request_safe_destroy = 0;
gpr_cv_broadcast(&wrapper->channel_cv);
gpr_mu_unlock(&wrapper->channel_mu);
+
grpc_rb_channel_try_register_connection_polling(wrapper);
if (args.args != NULL) {
@@ -254,8 +253,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
- return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped,
- grpc_try_to_connect));
+ return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect));
}
typedef struct watch_state_stack {
@@ -265,21 +263,22 @@ typedef struct watch_state_stack {
} watch_state_stack;
static void *watch_channel_state_without_gvl(void *arg) {
- watch_state_stack *stack = (watch_state_stack *)arg;
+ watch_state_stack *stack = (watch_state_stack*)arg;
gpr_timespec deadline = stack->deadline;
grpc_rb_channel *wrapper = stack->wrapper;
int last_state = stack->last_state;
- void *return_value = (void *)0;
+ void *return_value = (void*)0;
gpr_mu_lock(&wrapper->channel_mu);
- while (wrapper->current_connectivity_state == last_state &&
- !wrapper->request_safe_destroy && !wrapper->safe_to_destroy &&
- !wrapper->abort_watch_connectivity_state &&
- gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
+ while(wrapper->current_connectivity_state == last_state &&
+ !wrapper->request_safe_destroy &&
+ !wrapper->safe_to_destroy &&
+ !wrapper->abort_watch_connectivity_state &&
+ gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) {
gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline);
}
if (wrapper->current_connectivity_state != last_state) {
- return_value = (void *)1;
+ return_value = (void*)1;
}
gpr_mu_unlock(&wrapper->channel_mu);
@@ -287,7 +286,7 @@ static void *watch_channel_state_without_gvl(void *arg) {
}
static void watch_channel_state_unblocking_func(void *arg) {
- grpc_rb_channel *wrapper = (grpc_rb_channel *)arg;
+ grpc_rb_channel *wrapper = (grpc_rb_channel*)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called");
gpr_mu_lock(&wrapper->channel_mu);
wrapper->abort_watch_connectivity_state = 1;
@@ -307,7 +306,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE deadline) {
grpc_rb_channel *wrapper = NULL;
watch_state_stack stack;
- void *out;
+ void* out;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@@ -317,18 +316,14 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
}
if (!FIXNUM_P(last_state)) {
- rb_raise(
- rb_eTypeError,
- "bad type for last_state. want a GRPC::Core::ChannelState constant");
+ rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant");
return Qnil;
}
stack.wrapper = wrapper;
stack.deadline = grpc_rb_time_timeval(deadline, 0);
stack.last_state = NUM2LONG(last_state);
- out =
- rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack,
- watch_channel_state_unblocking_func, wrapper);
+ out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper);
if (out) {
return Qtrue;
}
@@ -364,7 +359,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
parent_call = grpc_rb_get_wrapped_call(parent);
}
- cq = grpc_completion_queue_create_for_pluck(NULL);
+ cq = grpc_completion_queue_create(NULL);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
@@ -433,7 +428,7 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
// destroy.
// Not safe to call while a channel's connection state is polled.
static void grpc_rb_channel_try_register_connection_polling(
- grpc_rb_channel *wrapper) {
+ grpc_rb_channel *wrapper) {
grpc_connectivity_state conn_state;
gpr_timespec sleep_time = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN));
@@ -506,14 +501,11 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
break;
}
if (event.type == GRPC_OP_COMPLETE) {
- grpc_rb_channel_try_register_connection_polling(
- (grpc_rb_channel *)event.tag);
+ grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag);
}
}
grpc_completion_queue_destroy(channel_polling_cq);
- gpr_log(GPR_DEBUG,
- "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling "
- "loop");
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop");
return NULL;
}
@@ -521,9 +513,7 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
static void run_poll_channels_loop_unblocking_func(void *arg) {
(void)arg;
gpr_mu_lock(&global_connection_polling_mu);
- gpr_log(GPR_DEBUG,
- "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting "
- "connection polling");
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting connection polling");
abort_channel_polling = 1;
grpc_completion_queue_shutdown(channel_polling_cq);
gpr_mu_unlock(&global_connection_polling_mu);
@@ -532,9 +522,7 @@ static void run_poll_channels_loop_unblocking_func(void *arg) {
// Poll channel connectivity states in background thread without the GIL.
static VALUE run_poll_channels_loop(VALUE arg) {
(void)arg;
- gpr_log(
- GPR_DEBUG,
- "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread");
rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL,
run_poll_channels_loop_unblocking_func, NULL);
@@ -554,14 +542,10 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
return NULL;
}
-static void wait_until_channel_polling_thread_started_unblocking_func(
- void *arg) {
+static void wait_until_channel_polling_thread_started_unblocking_func(void* arg) {
(void)arg;
gpr_mu_lock(&global_connection_polling_mu);
- gpr_log(GPR_DEBUG,
- "GRPC_RUBY: "
- "wait_until_channel_polling_thread_started_unblocking_func - begin "
- "aborting connection polling");
+ gpr_log(GPR_DEBUG, "GRPC_RUBY: wait_until_channel_polling_thread_started_unblocking_func - begin aborting connection polling");
abort_channel_polling = 1;
gpr_cv_broadcast(&global_connection_polling_cv);
gpr_mu_unlock(&global_connection_polling_mu);
@@ -587,7 +571,7 @@ void grpc_rb_channel_polling_thread_start() {
gpr_mu_init(&global_connection_polling_mu);
gpr_cv_init(&global_connection_polling_cv);
- channel_polling_cq = grpc_completion_queue_create_for_next(NULL);
+ channel_polling_cq = grpc_completion_queue_create(NULL);
background_thread = rb_thread_create(run_poll_channels_loop, NULL);
if (!RTEST(background_thread)) {