aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby/ext/grpc/rb_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby/ext/grpc/rb_channel.c')
-rw-r--r--src/ruby/ext/grpc/rb_channel.c174
1 files changed, 87 insertions, 87 deletions
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index f0af54d9e5..1d11a53aa7 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -54,9 +54,9 @@ static VALUE grpc_rb_cChannel = Qnil;
static VALUE grpc_rb_cChannelArgs;
typedef struct bg_watched_channel {
- grpc_channel *channel;
+ grpc_channel* channel;
// these fields must only be accessed under global_connection_polling_mu
- struct bg_watched_channel *next;
+ struct bg_watched_channel* next;
int channel_destroyed;
int refcount;
} bg_watched_channel;
@@ -67,7 +67,7 @@ typedef struct grpc_rb_channel {
/* The actual channel (protected in a wrapper to tell when it's safe to
* destroy) */
- bg_watched_channel *bg_wrapped;
+ bg_watched_channel* bg_wrapped;
} grpc_rb_channel;
typedef enum { CONTINUOUS_WATCH, WATCH_STATE_API } watch_state_op_type;
@@ -82,40 +82,40 @@ typedef struct watch_state_op {
int called_back;
} api_callback_args;
struct {
- bg_watched_channel *bg;
+ bg_watched_channel* bg;
} continuous_watch_callback_args;
} op;
} watch_state_op;
-static bg_watched_channel *bg_watched_channel_list_head = NULL;
+static bg_watched_channel* bg_watched_channel_list_head = NULL;
static void grpc_rb_channel_try_register_connection_polling(
- bg_watched_channel *bg);
-static void *wait_until_channel_polling_thread_started_no_gil(void *);
-static void wait_until_channel_polling_thread_started_unblocking_func(void *);
-static void *channel_init_try_register_connection_polling_without_gil(
- void *arg);
+ bg_watched_channel* bg);
+static void* wait_until_channel_polling_thread_started_no_gil(void*);
+static void wait_until_channel_polling_thread_started_unblocking_func(void*);
+static void* channel_init_try_register_connection_polling_without_gil(
+ void* arg);
typedef struct channel_init_try_register_stack {
- grpc_channel *channel;
- grpc_rb_channel *wrapper;
+ grpc_channel* channel;
+ grpc_rb_channel* wrapper;
} channel_init_try_register_stack;
-static grpc_completion_queue *channel_polling_cq;
+static grpc_completion_queue* channel_polling_cq;
static gpr_mu global_connection_polling_mu;
static gpr_cv global_connection_polling_cv;
static int abort_channel_polling = 0;
static int channel_polling_thread_started = 0;
-static int bg_watched_channel_list_lookup(bg_watched_channel *bg);
-static bg_watched_channel *bg_watched_channel_list_create_and_add(
- grpc_channel *channel);
-static void bg_watched_channel_list_free_and_remove(bg_watched_channel *bg);
-static void run_poll_channels_loop_unblocking_func(void *arg);
+static int bg_watched_channel_list_lookup(bg_watched_channel* bg);
+static bg_watched_channel* bg_watched_channel_list_create_and_add(
+ grpc_channel* channel);
+static void bg_watched_channel_list_free_and_remove(bg_watched_channel* bg);
+static void run_poll_channels_loop_unblocking_func(void* arg);
// Needs to be called under global_connection_polling_mu
static void grpc_rb_channel_watch_connection_state_op_complete(
- watch_state_op *op, int success) {
+ watch_state_op* op, int success) {
GPR_ASSERT(!op->op.api_callback_args.called_back);
op->op.api_callback_args.called_back = 1;
op->op.api_callback_args.success = success;
@@ -124,7 +124,7 @@ static void grpc_rb_channel_watch_connection_state_op_complete(
}
/* Avoids destroying a channel twice. */
-static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) {
+static void grpc_rb_channel_safe_destroy(bg_watched_channel* bg) {
gpr_mu_lock(&global_connection_polling_mu);
GPR_ASSERT(bg_watched_channel_list_lookup(bg));
if (!bg->channel_destroyed) {
@@ -138,18 +138,18 @@ static void grpc_rb_channel_safe_destroy(bg_watched_channel *bg) {
gpr_mu_unlock(&global_connection_polling_mu);
}
-static void *channel_safe_destroy_without_gil(void *arg) {
- grpc_rb_channel_safe_destroy((bg_watched_channel *)arg);
+static void* channel_safe_destroy_without_gil(void* arg) {
+ grpc_rb_channel_safe_destroy((bg_watched_channel*)arg);
return NULL;
}
/* Destroys Channel instances. */
-static void grpc_rb_channel_free(void *p) {
- grpc_rb_channel *ch = NULL;
+static void grpc_rb_channel_free(void* p) {
+ grpc_rb_channel* ch = NULL;
if (p == NULL) {
return;
};
- ch = (grpc_rb_channel *)p;
+ ch = (grpc_rb_channel*)p;
if (ch->bg_wrapped != NULL) {
/* assumption made here: it's ok to directly gpr_mu_lock the global
@@ -164,12 +164,12 @@ static void grpc_rb_channel_free(void *p) {
}
/* Protects the mark object from GC */
-static void grpc_rb_channel_mark(void *p) {
- grpc_rb_channel *channel = NULL;
+static void grpc_rb_channel_mark(void* p) {
+ grpc_rb_channel* channel = NULL;
if (p == NULL) {
return;
}
- channel = (grpc_rb_channel *)p;
+ channel = (grpc_rb_channel*)p;
if (channel->credentials != Qnil) {
rb_gc_mark(channel->credentials);
}
@@ -189,7 +189,7 @@ static rb_data_type_t grpc_channel_data_type = {"grpc_channel",
/* Allocates grpc_rb_channel instances. */
static VALUE grpc_rb_channel_alloc(VALUE cls) {
- grpc_rb_channel *wrapper = ALLOC(grpc_rb_channel);
+ grpc_rb_channel* wrapper = ALLOC(grpc_rb_channel);
wrapper->bg_wrapped = NULL;
wrapper->credentials = Qnil;
return TypedData_Wrap_Struct(cls, &grpc_channel_data_type, wrapper);
@@ -203,14 +203,14 @@ static VALUE grpc_rb_channel_alloc(VALUE cls) {
secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds)
Creates channel instances. */
-static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
+static VALUE grpc_rb_channel_init(int argc, VALUE* argv, VALUE self) {
VALUE channel_args = Qnil;
VALUE credentials = Qnil;
VALUE target = Qnil;
- grpc_rb_channel *wrapper = NULL;
- grpc_channel *ch = NULL;
- grpc_channel_credentials *creds = NULL;
- char *target_chars = NULL;
+ grpc_rb_channel* wrapper = NULL;
+ grpc_channel* ch = NULL;
+ grpc_channel_credentials* creds = NULL;
+ char* target_chars = NULL;
grpc_channel_args args;
channel_init_try_register_stack stack;
int stop_waiting_for_thread_start = 0;
@@ -262,13 +262,13 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
}
typedef struct get_state_stack {
- bg_watched_channel *bg;
+ bg_watched_channel* bg;
int try_to_connect;
int out;
} get_state_stack;
-static void *get_state_without_gil(void *arg) {
- get_state_stack *stack = (get_state_stack *)arg;
+static void* get_state_without_gil(void* arg) {
+ get_state_stack* stack = (get_state_stack*)arg;
gpr_mu_lock(&global_connection_polling_mu);
GPR_ASSERT(abort_channel_polling || channel_polling_thread_started);
@@ -292,10 +292,10 @@ static void *get_state_without_gil(void *arg) {
constants defined in GRPC::Core::ConnectivityStates.
It also tries to connect if the chennel is idle in the second form. */
-static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
+static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE* argv,
VALUE self) {
VALUE try_to_connect_param = Qfalse;
- grpc_rb_channel *wrapper = NULL;
+ grpc_rb_channel* wrapper = NULL;
get_state_stack stack;
/* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
@@ -315,22 +315,22 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
}
typedef struct watch_state_stack {
- grpc_channel *channel;
+ grpc_channel* channel;
gpr_timespec deadline;
int last_state;
} watch_state_stack;
-static void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
- watch_state_stack *stack = (watch_state_stack *)arg;
- watch_state_op *op = NULL;
- void *success = (void *)0;
+static void* wait_for_watch_state_op_complete_without_gvl(void* arg) {
+ watch_state_stack* stack = (watch_state_stack*)arg;
+ watch_state_op* op = NULL;
+ void* success = (void*)0;
gpr_mu_lock(&global_connection_polling_mu);
// its unsafe to do a "watch" after "channel polling abort" because the cq has
// been shut down.
if (abort_channel_polling) {
gpr_mu_unlock(&global_connection_polling_mu);
- return (void *)0;
+ return (void*)0;
}
op = gpr_zalloc(sizeof(watch_state_op));
op->op_type = WATCH_STATE_API;
@@ -343,15 +343,15 @@ static void *wait_for_watch_state_op_complete_without_gvl(void *arg) {
gpr_inf_future(GPR_CLOCK_REALTIME));
}
if (op->op.api_callback_args.success) {
- success = (void *)1;
+ success = (void*)1;
}
gpr_free(op);
gpr_mu_unlock(&global_connection_polling_mu);
return success;
}
-static void wait_for_watch_state_op_complete_unblocking_func(void *arg) {
- bg_watched_channel *bg = (bg_watched_channel *)arg;
+static void wait_for_watch_state_op_complete_unblocking_func(void* arg) {
+ bg_watched_channel* bg = (bg_watched_channel*)arg;
gpr_mu_lock(&global_connection_polling_mu);
if (!bg->channel_destroyed) {
grpc_channel_destroy(bg->channel);
@@ -370,9 +370,9 @@ static void wait_for_watch_state_op_complete_unblocking_func(void *arg) {
static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE last_state,
VALUE deadline) {
- grpc_rb_channel *wrapper = NULL;
+ grpc_rb_channel* wrapper = NULL;
watch_state_stack stack;
- void *op_success = 0;
+ void* op_success = 0;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@@ -405,15 +405,15 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
VALUE method, VALUE host,
VALUE deadline) {
VALUE res = Qnil;
- grpc_rb_channel *wrapper = NULL;
- grpc_call *call = NULL;
- grpc_call *parent_call = NULL;
- grpc_completion_queue *cq = NULL;
+ grpc_rb_channel* wrapper = NULL;
+ grpc_call* call = NULL;
+ grpc_call* parent_call = NULL;
+ grpc_completion_queue* cq = NULL;
int flags = GRPC_PROPAGATE_DEFAULTS;
grpc_slice method_slice;
grpc_slice host_slice;
- grpc_slice *host_slice_ptr = NULL;
- char *tmp_str = NULL;
+ grpc_slice* host_slice_ptr = NULL;
+ char* tmp_str = NULL;
if (host != Qnil) {
host_slice =
@@ -466,7 +466,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent, VALUE mask,
/* Note this is an API-level call; a wrapped channel's finalizer doesn't call
* this */
static VALUE grpc_rb_channel_destroy(VALUE self) {
- grpc_rb_channel *wrapper = NULL;
+ grpc_rb_channel* wrapper = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
if (wrapper->bg_wrapped != NULL) {
@@ -480,9 +480,9 @@ static VALUE grpc_rb_channel_destroy(VALUE self) {
/* Called to obtain the target that this channel accesses. */
static VALUE grpc_rb_channel_get_target(VALUE self) {
- grpc_rb_channel *wrapper = NULL;
+ grpc_rb_channel* wrapper = NULL;
VALUE res = Qnil;
- char *target = NULL;
+ char* target = NULL;
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
target = grpc_channel_get_target(wrapper->bg_wrapped->channel);
@@ -493,8 +493,8 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
}
/* Needs to be called under global_connection_polling_mu */
-static int bg_watched_channel_list_lookup(bg_watched_channel *target) {
- bg_watched_channel *cur = bg_watched_channel_list_head;
+static int bg_watched_channel_list_lookup(bg_watched_channel* target) {
+ bg_watched_channel* cur = bg_watched_channel_list_head;
while (cur != NULL) {
if (cur == target) {
@@ -507,9 +507,9 @@ static int bg_watched_channel_list_lookup(bg_watched_channel *target) {
}
/* Needs to be called under global_connection_polling_mu */
-static bg_watched_channel *bg_watched_channel_list_create_and_add(
- grpc_channel *channel) {
- bg_watched_channel *watched = gpr_zalloc(sizeof(bg_watched_channel));
+static bg_watched_channel* bg_watched_channel_list_create_and_add(
+ grpc_channel* channel) {
+ bg_watched_channel* watched = gpr_zalloc(sizeof(bg_watched_channel));
watched->channel = channel;
watched->next = bg_watched_channel_list_head;
@@ -520,8 +520,8 @@ static bg_watched_channel *bg_watched_channel_list_create_and_add(
/* Needs to be called under global_connection_polling_mu */
static void bg_watched_channel_list_free_and_remove(
- bg_watched_channel *target) {
- bg_watched_channel *bg = NULL;
+ bg_watched_channel* target) {
+ bg_watched_channel* bg = NULL;
GPR_ASSERT(bg_watched_channel_list_lookup(target));
GPR_ASSERT(target->channel_destroyed && target->refcount == 0);
@@ -544,10 +544,10 @@ static void bg_watched_channel_list_free_and_remove(
/* Initialize a grpc_rb_channel's "protected grpc_channel" and try to push
* it onto the background thread for constant watches. */
-static void *channel_init_try_register_connection_polling_without_gil(
- void *arg) {
- channel_init_try_register_stack *stack =
- (channel_init_try_register_stack *)arg;
+static void* channel_init_try_register_connection_polling_without_gil(
+ void* arg) {
+ channel_init_try_register_stack* stack =
+ (channel_init_try_register_stack*)arg;
gpr_mu_lock(&global_connection_polling_mu);
stack->wrapper->bg_wrapped =
@@ -559,9 +559,9 @@ static void *channel_init_try_register_connection_polling_without_gil(
// Needs to be called under global_connection_poolling_mu
static void grpc_rb_channel_try_register_connection_polling(
- bg_watched_channel *bg) {
+ bg_watched_channel* bg) {
grpc_connectivity_state conn_state;
- watch_state_op *op = NULL;
+ watch_state_op* op = NULL;
GPR_ASSERT(channel_polling_thread_started || abort_channel_polling);
@@ -597,10 +597,10 @@ static void grpc_rb_channel_try_register_connection_polling(
// indicates process shutdown.
// In the worst case, this stops polling channel connectivity
// early and falls back to current behavior.
-static void *run_poll_channels_loop_no_gil(void *arg) {
+static void* run_poll_channels_loop_no_gil(void* arg) {
grpc_event event;
- watch_state_op *op = NULL;
- bg_watched_channel *bg = NULL;
+ watch_state_op* op = NULL;
+ bg_watched_channel* bg = NULL;
(void)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin");
@@ -618,15 +618,15 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
}
gpr_mu_lock(&global_connection_polling_mu);
if (event.type == GRPC_OP_COMPLETE) {
- op = (watch_state_op *)event.tag;
+ op = (watch_state_op*)event.tag;
if (op->op_type == CONTINUOUS_WATCH) {
- bg = (bg_watched_channel *)op->op.continuous_watch_callback_args.bg;
+ bg = (bg_watched_channel*)op->op.continuous_watch_callback_args.bg;
bg->refcount--;
grpc_rb_channel_try_register_connection_polling(bg);
gpr_free(op);
} else if (op->op_type == WATCH_STATE_API) {
grpc_rb_channel_watch_connection_state_op_complete(
- (watch_state_op *)event.tag, event.success);
+ (watch_state_op*)event.tag, event.success);
} else {
GPR_ASSERT(0);
}
@@ -641,8 +641,8 @@ static void *run_poll_channels_loop_no_gil(void *arg) {
}
// Notify the channel polling loop to cleanup and shutdown.
-static void run_poll_channels_loop_unblocking_func(void *arg) {
- bg_watched_channel *bg = NULL;
+static void run_poll_channels_loop_unblocking_func(void* arg) {
+ bg_watched_channel* bg = NULL;
(void)arg;
gpr_mu_lock(&global_connection_polling_mu);
@@ -686,8 +686,8 @@ static VALUE run_poll_channels_loop(VALUE arg) {
return Qnil;
}
-static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
- int *stop_waiting = (int *)arg;
+static void* wait_until_channel_polling_thread_started_no_gil(void* arg) {
+ int* stop_waiting = (int*)arg;
gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start");
gpr_mu_lock(&global_connection_polling_mu);
while (!channel_polling_thread_started && !abort_channel_polling &&
@@ -701,8 +701,8 @@ static void *wait_until_channel_polling_thread_started_no_gil(void *arg) {
}
static void wait_until_channel_polling_thread_started_unblocking_func(
- void *arg) {
- int *stop_waiting = (int *)arg;
+ void* arg) {
+ int* stop_waiting = (int*)arg;
gpr_mu_lock(&global_connection_polling_mu);
gpr_log(GPR_DEBUG,
"GRPC_RUBY: interrupt wait for channel polling thread to start");
@@ -711,7 +711,7 @@ static void wait_until_channel_polling_thread_started_unblocking_func(
gpr_mu_unlock(&global_connection_polling_mu);
}
-static void *set_abort_channel_polling_without_gil(void *arg) {
+static void* set_abort_channel_polling_without_gil(void* arg) {
(void)arg;
gpr_mu_lock(&global_connection_polling_mu);
abort_channel_polling = 1;
@@ -822,8 +822,8 @@ void Init_grpc_channel() {
}
/* Gets the wrapped channel from the ruby wrapper */
-grpc_channel *grpc_rb_get_wrapped_channel(VALUE v) {
- grpc_rb_channel *wrapper = NULL;
+grpc_channel* grpc_rb_get_wrapped_channel(VALUE v) {
+ grpc_rb_channel* wrapper = NULL;
TypedData_Get_Struct(v, grpc_rb_channel, &grpc_channel_data_type, wrapper);
return wrapper->bg_wrapped->channel;
}