diff options
author | 2017-04-26 14:18:39 -0700 | |
---|---|---|
committer | 2017-04-26 14:18:39 -0700 | |
commit | f5521c33f9094f2f1e416764c4d2020237d65bc6 (patch) | |
tree | e8ef2105ec499e1ebf6e840bb0140e5f1c29550d /src/ruby/ext/grpc/rb_channel.c | |
parent | 79759fea1abd09102d38af3e13a84b69e898124b (diff) |
Revert "Merge branch 'master' into v1.3.x"
This reverts commit 79759fea1abd09102d38af3e13a84b69e898124b, reversing
changes made to dc36f4df6aba60275a31227e1d29c4d20b6cadca.
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 74 |
1 files changed, 29 insertions, 45 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index a802183726..fb610f548e 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -34,9 +34,9 @@ #include <ruby/ruby.h> #include <ruby/thread.h> +#include "rb_grpc_imports.generated.h" #include "rb_byte_buffer.h" #include "rb_channel.h" -#include "rb_grpc_imports.generated.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> @@ -89,8 +89,8 @@ typedef struct grpc_rb_channel { static void grpc_rb_channel_try_register_connection_polling( grpc_rb_channel *wrapper); static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); -static void *wait_until_channel_polling_thread_started_no_gil(void *); -static void wait_until_channel_polling_thread_started_unblocking_func(void *); +static void *wait_until_channel_polling_thread_started_no_gil(void*); +static void wait_until_channel_polling_thread_started_unblocking_func(void*); static grpc_completion_queue *channel_polling_cq; static gpr_mu global_connection_polling_mu; @@ -171,9 +171,8 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { MEMZERO(&args, grpc_channel_args, 1); grpc_ruby_once_init(); - rb_thread_call_without_gvl( - wait_until_channel_polling_thread_started_no_gil, NULL, - wait_until_channel_polling_thread_started_unblocking_func, NULL); + rb_thread_call_without_gvl(wait_until_channel_polling_thread_started_no_gil, NULL, + wait_until_channel_polling_thread_started_unblocking_func, NULL); /* "3" == 3 mandatory args */ rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); @@ -205,14 +204,14 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { gpr_mu_lock(&wrapper->channel_mu); wrapper->abort_watch_connectivity_state = 0; - wrapper->current_connectivity_state = - grpc_channel_check_connectivity_state(wrapper->wrapped, 0); + wrapper->current_connectivity_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); wrapper->safe_to_destroy = 0; wrapper->request_safe_destroy = 0; gpr_cv_broadcast(&wrapper->channel_cv); gpr_mu_unlock(&wrapper->channel_mu); + grpc_rb_channel_try_register_connection_polling(wrapper); if (args.args != NULL) { @@ -254,8 +253,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, rb_raise(rb_eRuntimeError, "closed!"); return Qnil; } - return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, - grpc_try_to_connect)); + return LONG2NUM(grpc_channel_check_connectivity_state(wrapper->wrapped, grpc_try_to_connect)); } typedef struct watch_state_stack { @@ -265,21 +263,22 @@ typedef struct watch_state_stack { } watch_state_stack; static void *watch_channel_state_without_gvl(void *arg) { - watch_state_stack *stack = (watch_state_stack *)arg; + watch_state_stack *stack = (watch_state_stack*)arg; gpr_timespec deadline = stack->deadline; grpc_rb_channel *wrapper = stack->wrapper; int last_state = stack->last_state; - void *return_value = (void *)0; + void *return_value = (void*)0; gpr_mu_lock(&wrapper->channel_mu); - while (wrapper->current_connectivity_state == last_state && - !wrapper->request_safe_destroy && !wrapper->safe_to_destroy && - !wrapper->abort_watch_connectivity_state && - gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { + while(wrapper->current_connectivity_state == last_state && + !wrapper->request_safe_destroy && + !wrapper->safe_to_destroy && + !wrapper->abort_watch_connectivity_state && + gpr_time_cmp(deadline, gpr_now(GPR_CLOCK_REALTIME)) > 0) { gpr_cv_wait(&wrapper->channel_cv, &wrapper->channel_mu, deadline); } if (wrapper->current_connectivity_state != last_state) { - return_value = (void *)1; + return_value = (void*)1; } gpr_mu_unlock(&wrapper->channel_mu); @@ -287,7 +286,7 @@ static void *watch_channel_state_without_gvl(void *arg) { } static void watch_channel_state_unblocking_func(void *arg) { - grpc_rb_channel *wrapper = (grpc_rb_channel *)arg; + grpc_rb_channel *wrapper = (grpc_rb_channel*)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: watch channel state unblocking func called"); gpr_mu_lock(&wrapper->channel_mu); wrapper->abort_watch_connectivity_state = 1; @@ -307,7 +306,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE deadline) { grpc_rb_channel *wrapper = NULL; watch_state_stack stack; - void *out; + void* out; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -317,18 +316,14 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, } if (!FIXNUM_P(last_state)) { - rb_raise( - rb_eTypeError, - "bad type for last_state. want a GRPC::Core::ChannelState constant"); + rb_raise(rb_eTypeError, "bad type for last_state. want a GRPC::Core::ChannelState constant"); return Qnil; } stack.wrapper = wrapper; stack.deadline = grpc_rb_time_timeval(deadline, 0); stack.last_state = NUM2LONG(last_state); - out = - rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, - watch_channel_state_unblocking_func, wrapper); + out = rb_thread_call_without_gvl(watch_channel_state_without_gvl, &stack, watch_channel_state_unblocking_func, wrapper); if (out) { return Qtrue; } @@ -364,7 +359,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, parent_call = grpc_rb_get_wrapped_call(parent); } - cq = grpc_completion_queue_create_for_pluck(NULL); + cq = grpc_completion_queue_create(NULL); TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; if (ch == NULL) { @@ -433,7 +428,7 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { // destroy. // Not safe to call while a channel's connection state is polled. static void grpc_rb_channel_try_register_connection_polling( - grpc_rb_channel *wrapper) { + grpc_rb_channel *wrapper) { grpc_connectivity_state conn_state; gpr_timespec sleep_time = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(20, GPR_TIMESPAN)); @@ -506,14 +501,11 @@ static void *run_poll_channels_loop_no_gil(void *arg) { break; } if (event.type == GRPC_OP_COMPLETE) { - grpc_rb_channel_try_register_connection_polling( - (grpc_rb_channel *)event.tag); + grpc_rb_channel_try_register_connection_polling((grpc_rb_channel *)event.tag); } } grpc_completion_queue_destroy(channel_polling_cq); - gpr_log(GPR_DEBUG, - "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling " - "loop"); + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - exit connection polling loop"); return NULL; } @@ -521,9 +513,7 @@ static void *run_poll_channels_loop_no_gil(void *arg) { static void run_poll_channels_loop_unblocking_func(void *arg) { (void)arg; gpr_mu_lock(&global_connection_polling_mu); - gpr_log(GPR_DEBUG, - "GRPC_RUBY: grpc_rb_event_unblocking_func - begin aborting " - "connection polling"); + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_unblocking_func - begin aborting connection polling"); abort_channel_polling = 1; grpc_completion_queue_shutdown(channel_polling_cq); gpr_mu_unlock(&global_connection_polling_mu); @@ -532,9 +522,7 @@ static void run_poll_channels_loop_unblocking_func(void *arg) { // Poll channel connectivity states in background thread without the GIL. static VALUE run_poll_channels_loop(VALUE arg) { (void)arg; - gpr_log( - GPR_DEBUG, - "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, run_poll_channels_loop_unblocking_func, NULL); @@ -554,14 +542,10 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { return NULL; } -static void wait_until_channel_polling_thread_started_unblocking_func( - void *arg) { +static void wait_until_channel_polling_thread_started_unblocking_func(void* arg) { (void)arg; gpr_mu_lock(&global_connection_polling_mu); - gpr_log(GPR_DEBUG, - "GRPC_RUBY: " - "wait_until_channel_polling_thread_started_unblocking_func - begin " - "aborting connection polling"); + gpr_log(GPR_DEBUG, "GRPC_RUBY: wait_until_channel_polling_thread_started_unblocking_func - begin aborting connection polling"); abort_channel_polling = 1; gpr_cv_broadcast(&global_connection_polling_cv); gpr_mu_unlock(&global_connection_polling_mu); @@ -587,7 +571,7 @@ void grpc_rb_channel_polling_thread_start() { gpr_mu_init(&global_connection_polling_mu); gpr_cv_init(&global_connection_polling_cv); - channel_polling_cq = grpc_completion_queue_create_for_next(NULL); + channel_polling_cq = grpc_completion_queue_create(NULL); background_thread = rb_thread_create(run_poll_channels_loop, NULL); if (!RTEST(background_thread)) { |