diff options
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r-- | src/ruby/ext/grpc/rb_channel.c | 174 |
1 files changed, 87 insertions, 87 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index f0af54d9e5..1d11a53aa7 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -54,9 +54,9 @@ static VALUE grpc_rb_cChannel = Qnil; static VALUE grpc_rb_cChannelArgs; typedef struct bg_watched_channel { - grpc_channel *channel; + grpc_channel* channel; // these fields must only be accessed under global_connection_polling_mu - struct bg_watched_channel *next; + struct bg_watched_channel* next; int channel_destroyed; int refcount; } bg_watched_channel; @@ -67,7 +67,7 @@ typedef struct grpc_rb_channel { /* The actual channel (protected in a wrapper to tell when it's safe to * destroy) */ - bg_watched_channel *bg_wrapped; + bg_watched_channel* bg_wrapped; } grpc_rb_channel; typedef enum { CONTINUOUS_WATCH, WATCH_STATE_API } watch_state_op_type; @@ -82,40 +82,40 @@ typedef struct watch_state_op { int called_back; } api_callback_args; struct { - bg_watched_channel *bg; + bg_watched_channel* bg; } continuous_watch_callback_args; } op; } watch_state_op; -static bg_watched_channel *bg_watched_channel_list_head = NULL; +static bg_watched_channel* bg_watched_channel_list_head = NULL; static void grpc_rb_channel_try_register_connection_polling( - bg_watched_channel *bg); -static void *wait_until_channel_polling_thread_started_no_gil(void *); -static void wait_until_channel_polling_thread_started_unblocking_func(void *); -static void *channel_init_try_register_connection_polling_without_gil( - void *arg); + bg_watched_channel* bg); +static void* wait_until_channel_polling_thread_started_no_gil(void*); +static void wait_until_channel_polling_thread_started_unblocking_func(void*); +static void* channel_init_try_register_connection_polling_without_gil( + void* arg); typedef struct channel_init_try_register_stack { - grpc_channel *channel; - grpc_rb_channel *wrapper; + grpc_channel* channel; + grpc_rb_channel* wrapper; } channel_init_try_register_stack; -static grpc_completion_queue *channel_polling_cq; +static grpc_completion_queue* channel_polling_cq; static gpr_mu global_connection_polling_mu; static gpr_cv global_connection_polling_cv; static int abort_channel_polling = 0; static int channel_polling_thread_started = 0; -static int bg_watched_channel_list_lookup(bg_watched_channel *bg); -static bg_watched_channel *bg_watched_channel_list_create_and_add( - grpc_channel *channel); -static void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg); -static void run_poll_channels_loop_unblocking_func(void *arg); +static int bg_watched_channel_list_lookup(bg_watched_channel* bg); +static bg_watched_channel* bg_watched_channel_list_create_and_add( + grpc_channel* channel); +static void bg_watched_channel_list_free_and_remove(bg_watched_channel* bg); +static void run_poll_channels_loop_unblocking_func(void* arg); // Needs to be called under global_connection_polling_mu static void grpc_rb_channel_watch_connection_state_op_complete( - watch_state_op *op, int success) { + watch_state_op* op, int success) { GPR_ASSERT(!op->op.api_callback_args.called_back); op->op.api_callback_args.called_back = 1; op->op.api_callback_args.success = success; @@ -124,7 +124,7 @@ static void grpc_rb_channel_watch_connection_state_op_complete( } /* Avoids destroying a channel twice. */ -static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) { +static void grpc_rb_channel_safe_destroy(bg_watched_channel* bg) { gpr_mu_lock(&global_connection_polling_mu); GPR_ASSERT(bg_watched_channel_list_lookup(bg)); if (!bg->channel_destroyed) { @@ -138,18 +138,18 @@ static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) { gpr_mu_unlock(&global_connection_polling_mu); } -static void *channel_safe_destroy_without_gil(void *arg) { - grpc_rb_channel_safe_destroy((bg_watched_channel *)arg); +static void* channel_safe_destroy_without_gil(void* arg) { + grpc_rb_channel_safe_destroy((bg_watched_channel*)arg); return NULL; } /* Destroys Channel instances. */ -static void grpc_rb_channel_free(void *p) { - grpc_rb_channel *ch = NULL; +static void grpc_rb_channel_free(void* p) { + grpc_rb_channel* ch = NULL; if (p == NULL) { return; }; - ch = (grpc_rb_channel *)p; + ch = (grpc_rb_channel*)p; if (ch->bg_wrapped != NULL) { /* assumption made here: it's ok to directly gpr_mu_lock the global @@ -164,12 +164,12 @@ static void grpc_rb_channel_free(void *p) { } /* Protects the mark object from GC */ -static void grpc_rb_channel_mark(void *p) { - grpc_rb_channel *channel = NULL; +static void grpc_rb_channel_mark(void* p) { + grpc_rb_channel* channel = NULL; if (p == NULL) { return; } - channel = (grpc_rb_channel *)p; + channel = (grpc_rb_channel*)p; if (channel->credentials != Qnil) { rb_gc_mark(channel->credentials); } @@ -189,7 +189,7 @@ static rb_data_type_t grpc_channel_data_type = {"grpc_channel", /* Allocates grpc_rb_channel instances. */ static VALUE grpc_rb_channel_alloc(VALUE cls) { - grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel); + grpc_rb_channel* wrapper = ALLOC(grpc_rb_channel); wrapper->bg_wrapped = NULL; wrapper->credentials = Qnil; return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper); @@ -203,14 +203,14 @@ static VALUE grpc_rb_channel_alloc(VALUE cls) { secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds) Creates channel instances. */ -static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { +static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) { VALUE channel_args = Qnil; VALUE credentials = Qnil; VALUE target = Qnil; - grpc_rb_channel *wrapper = NULL; - grpc_channel *ch = NULL; - grpc_channel_credentials *creds = NULL; - char *target_chars = NULL; + grpc_rb_channel* wrapper = NULL; + grpc_channel* ch = NULL; + grpc_channel_credentials* creds = NULL; + char* target_chars = NULL; grpc_channel_args args; channel_init_try_register_stack stack; int stop_waiting_for_thread_start = 0; @@ -262,13 +262,13 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { } typedef struct get_state_stack { - bg_watched_channel *bg; + bg_watched_channel* bg; int try_to_connect; int out; } get_state_stack; -static void *get_state_without_gil(void *arg) { - get_state_stack *stack = (get_state_stack *)arg; +static void* get_state_without_gil(void* arg) { + get_state_stack* stack = (get_state_stack*)arg; gpr_mu_lock(&global_connection_polling_mu); GPR_ASSERT(abort_channel_polling || channel_polling_thread_started); @@ -292,10 +292,10 @@ static void *get_state_without_gil(void *arg) { constants defined in GRPC::Core::ConnectivityStates. It also tries to connect if the chennel is idle in the second form. */ -static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, +static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE* argv, VALUE self) { VALUE try_to_connect_param = Qfalse; - grpc_rb_channel *wrapper = NULL; + grpc_rb_channel* wrapper = NULL; get_state_stack stack; /* "01" == 0 mandatory args, 1 (try_to_connect) is optional */ @@ -315,22 +315,22 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, } typedef struct watch_state_stack { - grpc_channel *channel; + grpc_channel* channel; gpr_timespec deadline; int last_state; } watch_state_stack; -static void *wait_for_watch_state_op_complete_without_gvl(void *arg) { - watch_state_stack *stack = (watch_state_stack *)arg; - watch_state_op *op = NULL; - void *success = (void *)0; +static void* wait_for_watch_state_op_complete_without_gvl(void* arg) { + watch_state_stack* stack = (watch_state_stack*)arg; + watch_state_op* op = NULL; + void* success = (void*)0; gpr_mu_lock(&global_connection_polling_mu); // its unsafe to do a "watch" after "channel polling abort" because the cq has // been shut down. if (abort_channel_polling) { gpr_mu_unlock(&global_connection_polling_mu); - return (void *)0; + return (void*)0; } op = gpr_zalloc(sizeof(watch_state_op)); op->op_type = WATCH_STATE_API; @@ -343,15 +343,15 @@ static void *wait_for_watch_state_op_complete_without_gvl(void *arg) { gpr_inf_future(GPR_CLOCK_REALTIME)); } if (op->op.api_callback_args.success) { - success = (void *)1; + success = (void*)1; } gpr_free(op); gpr_mu_unlock(&global_connection_polling_mu); return success; } -static void wait_for_watch_state_op_complete_unblocking_func(void *arg) { - bg_watched_channel *bg = (bg_watched_channel *)arg; +static void wait_for_watch_state_op_complete_unblocking_func(void* arg) { + bg_watched_channel* bg = (bg_watched_channel*)arg; gpr_mu_lock(&global_connection_polling_mu); if (!bg->channel_destroyed) { grpc_channel_destroy(bg->channel); @@ -370,9 +370,9 @@ static void wait_for_watch_state_op_complete_unblocking_func(void *arg) { static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, VALUE deadline) { - grpc_rb_channel *wrapper = NULL; + grpc_rb_channel* wrapper = NULL; watch_state_stack stack; - void *op_success = 0; + void* op_success = 0; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); @@ -405,15 +405,15 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, VALUE method, VALUE host, VALUE deadline) { VALUE res = Qnil; - grpc_rb_channel *wrapper = NULL; - grpc_call *call = NULL; - grpc_call *parent_call = NULL; - grpc_completion_queue *cq = NULL; + grpc_rb_channel* wrapper = NULL; + grpc_call* call = NULL; + grpc_call* parent_call = NULL; + grpc_completion_queue* cq = NULL; int flags = GRPC_PROPAGATE_DEFAULTS; grpc_slice method_slice; grpc_slice host_slice; - grpc_slice *host_slice_ptr = NULL; - char *tmp_str = NULL; + grpc_slice* host_slice_ptr = NULL; + char* tmp_str = NULL; if (host != Qnil) { host_slice = @@ -466,7 +466,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask, /* Note this is an API-level call; a wrapped channel's finalizer doesn't call * this */ static VALUE grpc_rb_channel_destroy(VALUE self) { - grpc_rb_channel *wrapper = NULL; + grpc_rb_channel* wrapper = NULL; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); if (wrapper->bg_wrapped != NULL) { @@ -480,9 +480,9 @@ static VALUE grpc_rb_channel_destroy(VALUE self) { /* Called to obtain the target that this channel accesses. */ static VALUE grpc_rb_channel_get_target(VALUE self) { - grpc_rb_channel *wrapper = NULL; + grpc_rb_channel* wrapper = NULL; VALUE res = Qnil; - char *target = NULL; + char* target = NULL; TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); target = grpc_channel_get_target(wrapper->bg_wrapped->channel); @@ -493,8 +493,8 @@ static VALUE grpc_rb_channel_get_target(VALUE self) { } /* Needs to be called under global_connection_polling_mu */ -static int bg_watched_channel_list_lookup(bg_watched_channel *target) { - bg_watched_channel *cur = bg_watched_channel_list_head; +static int bg_watched_channel_list_lookup(bg_watched_channel* target) { + bg_watched_channel* cur = bg_watched_channel_list_head; while (cur != NULL) { if (cur == target) { @@ -507,9 +507,9 @@ static int bg_watched_channel_list_lookup(bg_watched_channel *target) { } /* Needs to be called under global_connection_polling_mu */ -static bg_watched_channel *bg_watched_channel_list_create_and_add( - grpc_channel *channel) { - bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel)); +static bg_watched_channel* bg_watched_channel_list_create_and_add( + grpc_channel* channel) { + bg_watched_channel* watched = gpr_zalloc(sizeof(bg_watched_channel)); watched->channel = channel; watched->next = bg_watched_channel_list_head; @@ -520,8 +520,8 @@ static bg_watched_channel *bg_watched_channel_list_create_and_add( /* Needs to be called under global_connection_polling_mu */ static void bg_watched_channel_list_free_and_remove( - bg_watched_channel *target) { - bg_watched_channel *bg = NULL; + bg_watched_channel* target) { + bg_watched_channel* bg = NULL; GPR_ASSERT(bg_watched_channel_list_lookup(target)); GPR_ASSERT(target->channel_destroyed && target->refcount == 0); @@ -544,10 +544,10 @@ static void bg_watched_channel_list_free_and_remove( /* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push * it onto the background thread for constant watches. */ -static void *channel_init_try_register_connection_polling_without_gil( - void *arg) { - channel_init_try_register_stack *stack = - (channel_init_try_register_stack *)arg; +static void* channel_init_try_register_connection_polling_without_gil( + void* arg) { + channel_init_try_register_stack* stack = + (channel_init_try_register_stack*)arg; gpr_mu_lock(&global_connection_polling_mu); stack->wrapper->bg_wrapped = @@ -559,9 +559,9 @@ static void *channel_init_try_register_connection_polling_without_gil( // Needs to be called under global_connection_poolling_mu static void grpc_rb_channel_try_register_connection_polling( - bg_watched_channel *bg) { + bg_watched_channel* bg) { grpc_connectivity_state conn_state; - watch_state_op *op = NULL; + watch_state_op* op = NULL; GPR_ASSERT(channel_polling_thread_started || abort_channel_polling); @@ -597,10 +597,10 @@ static void grpc_rb_channel_try_register_connection_polling( // indicates process shutdown. // In the worst case, this stops polling channel connectivity // early and falls back to current behavior. -static void *run_poll_channels_loop_no_gil(void *arg) { +static void* run_poll_channels_loop_no_gil(void* arg) { grpc_event event; - watch_state_op *op = NULL; - bg_watched_channel *bg = NULL; + watch_state_op* op = NULL; + bg_watched_channel* bg = NULL; (void)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin"); @@ -618,15 +618,15 @@ static void *run_poll_channels_loop_no_gil(void *arg) { } gpr_mu_lock(&global_connection_polling_mu); if (event.type == GRPC_OP_COMPLETE) { - op = (watch_state_op *)event.tag; + op = (watch_state_op*)event.tag; if (op->op_type == CONTINUOUS_WATCH) { - bg = (bg_watched_channel *)op->op.continuous_watch_callback_args.bg; + bg = (bg_watched_channel*)op->op.continuous_watch_callback_args.bg; bg->refcount--; grpc_rb_channel_try_register_connection_polling(bg); gpr_free(op); } else if (op->op_type == WATCH_STATE_API) { grpc_rb_channel_watch_connection_state_op_complete( - (watch_state_op *)event.tag, event.success); + (watch_state_op*)event.tag, event.success); } else { GPR_ASSERT(0); } @@ -641,8 +641,8 @@ static void *run_poll_channels_loop_no_gil(void *arg) { } // Notify the channel polling loop to cleanup and shutdown. -static void run_poll_channels_loop_unblocking_func(void *arg) { - bg_watched_channel *bg = NULL; +static void run_poll_channels_loop_unblocking_func(void* arg) { + bg_watched_channel* bg = NULL; (void)arg; gpr_mu_lock(&global_connection_polling_mu); @@ -686,8 +686,8 @@ static VALUE run_poll_channels_loop(VALUE arg) { return Qnil; } -static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { - int *stop_waiting = (int *)arg; +static void* wait_until_channel_polling_thread_started_no_gil(void* arg) { + int* stop_waiting = (int*)arg; gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start"); gpr_mu_lock(&global_connection_polling_mu); while (!channel_polling_thread_started && !abort_channel_polling && @@ -701,8 +701,8 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { } static void wait_until_channel_polling_thread_started_unblocking_func( - void *arg) { - int *stop_waiting = (int *)arg; + void* arg) { + int* stop_waiting = (int*)arg; gpr_mu_lock(&global_connection_polling_mu); gpr_log(GPR_DEBUG, "GRPC_RUBY: interrupt wait for channel polling thread to start"); @@ -711,7 +711,7 @@ static void wait_until_channel_polling_thread_started_unblocking_func( gpr_mu_unlock(&global_connection_polling_mu); } -static void *set_abort_channel_polling_without_gil(void *arg) { +static void* set_abort_channel_polling_without_gil(void* arg) { (void)arg; gpr_mu_lock(&global_connection_polling_mu); abort_channel_polling = 1; @@ -822,8 +822,8 @@ void Init_grpc_channel() { } /* Gets the wrapped channel from the ruby wrapper */ -grpc_channel *grpc_rb_get_wrapped_channel(VALUE v) { - grpc_rb_channel *wrapper = NULL; +grpc_channel* grpc_rb_get_wrapped_channel(VALUE v) { + grpc_rb_channel* wrapper = NULL; TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper); return wrapper->bg_wrapped->channel; } |