diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.cc | 317 |
1 files changed, 159 insertions, 158 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index b954e1b879..427df743d6 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -52,27 +52,27 @@ #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ - ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \ +#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \ + ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \ &(subchannel)->connected_subchannel))) typedef struct { grpc_closure closure; - grpc_subchannel *subchannel; + grpc_subchannel* subchannel; grpc_connectivity_state connectivity_state; } state_watcher; typedef struct external_state_watcher { - grpc_subchannel *subchannel; - grpc_pollset_set *pollset_set; - grpc_closure *notify; + grpc_subchannel* subchannel; + grpc_pollset_set* pollset_set; + grpc_closure* notify; grpc_closure closure; - struct external_state_watcher *next; - struct external_state_watcher *prev; + struct external_state_watcher* next; + struct external_state_watcher* prev; } external_state_watcher; struct grpc_subchannel { - grpc_connector *connector; + grpc_connector* connector; /** refcount - lower INTERNAL_REF_BITS bits are for internal references: @@ -82,12 +82,12 @@ struct grpc_subchannel { gpr_atm ref_pair; /** non-transport related channel filters */ - const grpc_channel_filter **filters; + const grpc_channel_filter** filters; size_t num_filters; /** channel arguments */ - grpc_channel_args *args; + grpc_channel_args* args; - grpc_subchannel_key *key; + grpc_subchannel_key* key; /** set during connection */ grpc_connect_out_args connecting_result; @@ -100,7 +100,7 @@ struct grpc_subchannel { /** pollset_set tracking who's interested in a connection being setup */ - grpc_pollset_set *pollset_set; + grpc_pollset_set* pollset_set; /** active connection, or null; of type grpc_connected_subchannel */ gpr_atm connected_subchannel; @@ -130,22 +130,22 @@ struct grpc_subchannel { }; struct grpc_subchannel_call { - grpc_connected_subchannel *connection; - grpc_closure *schedule_closure_after_destroy; + grpc_connected_subchannel* connection; + grpc_closure* schedule_closure_after_destroy; }; -#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) -#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con)) +#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1)) +#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con)) #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ - (((grpc_subchannel_call *)(callstack)) - 1) + (((grpc_subchannel_call*)(callstack)) - 1) -static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, - grpc_error *error); +static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* subchannel, + grpc_error* error); #ifndef NDEBUG #define REF_REASON reason #define REF_MUTATE_EXTRA_ARGS \ - GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose + GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose #define REF_MUTATE_PURPOSE(x) , file, line, reason, x #else #define REF_REASON "" @@ -157,21 +157,21 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel, * connection implementation */ -static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_connected_subchannel *c = (grpc_connected_subchannel *)arg; +static void connection_destroy(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg; grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c)); gpr_free(c); } -grpc_connected_subchannel *grpc_connected_subchannel_ref( - grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_connected_subchannel* grpc_connected_subchannel_ref( + grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); return c; } -void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *c +void grpc_connected_subchannel_unref(grpc_exec_ctx* exec_ctx, + grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); @@ -181,10 +181,10 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, * grpc_subchannel implementation */ -static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_subchannel *c = (grpc_subchannel *)arg; - gpr_free((void *)c->filters); +static void subchannel_destroy(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_subchannel* c = (grpc_subchannel*)arg; + gpr_free((void*)c->filters); grpc_channel_args_destroy(exec_ctx, c->args); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); @@ -194,7 +194,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, +static gpr_atm ref_mutate(grpc_subchannel* c, gpr_atm delta, int barrier REF_MUTATE_EXTRA_ARGS) { gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); @@ -208,8 +208,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, return old_val; } -grpc_subchannel *grpc_subchannel_ref( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel* grpc_subchannel_ref( + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); @@ -217,16 +217,16 @@ grpc_subchannel *grpc_subchannel_ref( return c; } -grpc_subchannel *grpc_subchannel_weak_ref( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel* grpc_subchannel_weak_ref( + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); return c; } -grpc_subchannel *grpc_subchannel_ref_from_weak_ref( - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel* grpc_subchannel_ref_from_weak_ref( + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { if (!c) return NULL; for (;;) { gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair); @@ -241,8 +241,8 @@ grpc_subchannel *grpc_subchannel_ref_from_weak_ref( } } -static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { - grpc_connected_subchannel *con; +static void disconnect(grpc_exec_ctx* exec_ctx, grpc_subchannel* c) { + grpc_connected_subchannel* con; grpc_subchannel_index_unregister(exec_ctx, c->key, c); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->disconnected); @@ -258,8 +258,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { gpr_mu_unlock(&c->mu); } -void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_unref(grpc_exec_ctx* exec_ctx, + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; // add a weak ref and subtract a strong ref (atomically) old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS), @@ -270,56 +270,57 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref"); } -void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c +void grpc_subchannel_weak_unref(grpc_exec_ctx* exec_ctx, + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { - GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(subchannel_destroy, c, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, + GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } } -grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, - grpc_connector *connector, - const grpc_subchannel_args *args) { - grpc_subchannel_key *key = grpc_subchannel_key_create(args); - grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key); +grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx, + grpc_connector* connector, + const grpc_subchannel_args* args) { + grpc_subchannel_key* key = grpc_subchannel_key_create(args); + grpc_subchannel* c = grpc_subchannel_index_find(exec_ctx, key); if (c) { grpc_subchannel_key_destroy(exec_ctx, key); return c; } GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx); - c = (grpc_subchannel *)gpr_zalloc(sizeof(*c)); + c = (grpc_subchannel*)gpr_zalloc(sizeof(*c)); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; grpc_connector_ref(c->connector); c->num_filters = args->filter_count; if (c->num_filters > 0) { - c->filters = (const grpc_channel_filter **)gpr_malloc( - sizeof(grpc_channel_filter *) * c->num_filters); - memcpy((void *)c->filters, args->filters, - sizeof(grpc_channel_filter *) * c->num_filters); + c->filters = (const grpc_channel_filter**)gpr_malloc( + sizeof(grpc_channel_filter*) * c->num_filters); + memcpy((void*)c->filters, args->filters, + sizeof(grpc_channel_filter*) * c->num_filters); } else { c->filters = NULL; } c->pollset_set = grpc_pollset_set_create(); - grpc_resolved_address *addr = - (grpc_resolved_address *)gpr_malloc(sizeof(*addr)); + grpc_resolved_address* addr = + (grpc_resolved_address*)gpr_malloc(sizeof(*addr)); grpc_get_subchannel_address_arg(exec_ctx, args->args, addr); - grpc_resolved_address *new_address = NULL; - grpc_channel_args *new_args = NULL; + grpc_resolved_address* new_address = NULL; + grpc_channel_args* new_args = NULL; if (grpc_proxy_mappers_map_address(exec_ctx, addr, args->args, &new_address, &new_args)) { GPR_ASSERT(new_address != NULL); gpr_free(addr); addr = new_address; } - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); gpr_free(addr); c->args = grpc_channel_args_copy_and_add_and_remove( @@ -375,8 +376,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, return grpc_subchannel_index_register(exec_ctx, key, c); } -static void continue_connect_locked(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c) { +static void continue_connect_locked(grpc_exec_ctx* exec_ctx, + grpc_subchannel* c) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; @@ -390,8 +391,8 @@ static void continue_connect_locked(grpc_exec_ctx *exec_ctx, &c->connected); } -grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, - grpc_error **error) { +grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c, + grpc_error** error) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); state = grpc_connectivity_state_get(&c->state_tracker, error); @@ -399,10 +400,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, return state; } -static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - external_state_watcher *w = (external_state_watcher *)arg; - grpc_closure *follow_up = w->notify; +static void on_external_state_watcher_done(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + external_state_watcher* w = (external_state_watcher*)arg; + grpc_closure* follow_up = w->notify; if (w->pollset_set != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set, w->pollset_set); @@ -416,8 +417,8 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } -static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_subchannel *c = (grpc_subchannel *)arg; +static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_subchannel* c = (grpc_subchannel*)arg; gpr_mu_lock(&c->mu); c->have_alarm = false; if (c->disconnected) { @@ -438,8 +439,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GRPC_ERROR_UNREF(error); } -static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c) { +static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx, + grpc_subchannel* c) { if (c->disconnected) { /* Don't try to connect if we're already disconnected */ return; @@ -484,10 +485,10 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, } void grpc_subchannel_notify_on_state_change( - grpc_exec_ctx *exec_ctx, grpc_subchannel *c, - grpc_pollset_set *interested_parties, grpc_connectivity_state *state, - grpc_closure *notify) { - external_state_watcher *w; + grpc_exec_ctx* exec_ctx, grpc_subchannel* c, + grpc_pollset_set* interested_parties, grpc_connectivity_state* state, + grpc_closure* notify) { + external_state_watcher* w; if (state == NULL) { gpr_mu_lock(&c->mu); @@ -500,7 +501,7 @@ void grpc_subchannel_notify_on_state_change( } gpr_mu_unlock(&c->mu); } else { - w = (external_state_watcher *)gpr_malloc(sizeof(*w)); + w = (external_state_watcher*)gpr_malloc(sizeof(*w)); w->subchannel = c; w->pollset_set = interested_parties; w->notify = notify; @@ -523,18 +524,18 @@ void grpc_subchannel_notify_on_state_change( } void grpc_connected_subchannel_process_transport_op( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_transport_op *op) { - grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); - grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0); + grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con, + grpc_transport_op* op) { + grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0); top_elem->filter->start_transport_op(exec_ctx, top_elem, op); } -static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, - grpc_error *error) { - state_watcher *sw = (state_watcher *)p; - grpc_subchannel *c = sw->subchannel; - gpr_mu *mu = &c->mu; +static void subchannel_on_child_state_changed(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + state_watcher* sw = (state_watcher*)p; + grpc_subchannel* c = sw->subchannel; + gpr_mu* mu = &c->mu; gpr_mu_lock(mu); @@ -559,13 +560,13 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p, gpr_free(sw); } -static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *con, - grpc_pollset_set *interested_parties, - grpc_connectivity_state *state, - grpc_closure *closure) { - grpc_transport_op *op = grpc_make_transport_op(NULL); - grpc_channel_element *elem; +static void connected_subchannel_state_op(grpc_exec_ctx* exec_ctx, + grpc_connected_subchannel* con, + grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, + grpc_closure* closure) { + grpc_transport_op* op = grpc_make_transport_op(NULL); + grpc_channel_element* elem; op->connectivity_state = state; op->on_connectivity_state_change = closure; op->bind_pollset_set = interested_parties; @@ -574,31 +575,31 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, } void grpc_connected_subchannel_notify_on_state_change( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_pollset_set *interested_parties, grpc_connectivity_state *state, - grpc_closure *closure) { + grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con, + grpc_pollset_set* interested_parties, grpc_connectivity_state* state, + grpc_closure* closure) { connected_subchannel_state_op(exec_ctx, con, interested_parties, state, closure); } -void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, - grpc_connected_subchannel *con, - grpc_closure *closure) { - grpc_transport_op *op = grpc_make_transport_op(NULL); - grpc_channel_element *elem; +void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx, + grpc_connected_subchannel* con, + grpc_closure* closure) { + grpc_transport_op* op = grpc_make_transport_op(NULL); + grpc_channel_element* elem; op->send_ping = closure; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); elem->filter->start_transport_op(exec_ctx, elem, op); } -static bool publish_transport_locked(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c) { - grpc_connected_subchannel *con; - grpc_channel_stack *stk; - state_watcher *sw_subchannel; +static bool publish_transport_locked(grpc_exec_ctx* exec_ctx, + grpc_subchannel* c) { + grpc_connected_subchannel* con; + grpc_channel_stack* stk; + state_watcher* sw_subchannel; /* construct channel stack */ - grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); + grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( exec_ctx, builder, c->connecting_result.channel_args); grpc_channel_stack_builder_set_transport(builder, @@ -609,8 +610,8 @@ static bool publish_transport_locked(grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder_destroy(exec_ctx, builder); return false; } - grpc_error *error = grpc_channel_stack_builder_finish( - exec_ctx, builder, 0, 1, connection_destroy, NULL, (void **)&con); + grpc_error* error = grpc_channel_stack_builder_finish( + exec_ctx, builder, 0, 1, connection_destroy, NULL, (void**)&con); if (error != GRPC_ERROR_NONE) { grpc_transport_destroy(exec_ctx, c->connecting_result.transport); gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", @@ -622,7 +623,7 @@ static bool publish_transport_locked(grpc_exec_ctx *exec_ctx, memset(&c->connecting_result, 0, sizeof(c->connecting_result)); /* initialize state watcher */ - sw_subchannel = (state_watcher *)gpr_malloc(sizeof(*sw_subchannel)); + sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel)); sw_subchannel->subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed, @@ -657,10 +658,10 @@ static bool publish_transport_locked(grpc_exec_ctx *exec_ctx, return true; } -static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_subchannel *c = (grpc_subchannel *)arg; - grpc_channel_args *delete_channel_args = c->connecting_result.channel_args; +static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + grpc_subchannel* c = (grpc_subchannel*)arg; + grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); gpr_mu_lock(&c->mu); @@ -678,7 +679,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), "connect_failed"); - const char *errmsg = grpc_error_string(error); + const char* errmsg = grpc_error_string(error); gpr_log(GPR_INFO, "Connect failed: %s", errmsg); maybe_start_connecting_locked(exec_ctx, c); @@ -693,65 +694,65 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, * grpc_subchannel_call implementation */ -static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, - grpc_error *error) { - grpc_subchannel_call *c = (grpc_subchannel_call *)call; +static void subchannel_call_destroy(grpc_exec_ctx* exec_ctx, void* call, + grpc_error* error) { + grpc_subchannel_call* c = (grpc_subchannel_call*)call; GPR_ASSERT(c->schedule_closure_after_destroy != NULL); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); - grpc_connected_subchannel *connection = c->connection; + grpc_connected_subchannel* connection = c->connection; grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c->schedule_closure_after_destroy); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } -void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call, - grpc_closure *closure) { +void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call, + grpc_closure* closure) { GPR_ASSERT(call->schedule_closure_after_destroy == NULL); GPR_ASSERT(closure != NULL); call->schedule_closure_after_destroy = closure; } void grpc_subchannel_call_ref( - grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } -void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call *c +void grpc_subchannel_call_unref(grpc_exec_ctx* exec_ctx, + grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } -void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, - grpc_subchannel_call *call, - grpc_transport_stream_op_batch *batch) { +void grpc_subchannel_call_process_op(grpc_exec_ctx* exec_ctx, + grpc_subchannel_call* call, + grpc_transport_stream_op_batch* batch) { GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0); - grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); - grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); + grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); + grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); top_elem->filter->start_transport_stream_op_batch(exec_ctx, top_elem, batch); GPR_TIMER_END("grpc_subchannel_call_process_op", 0); } -grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( - grpc_subchannel *c) { +grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel( + grpc_subchannel* c) { return GET_CONNECTED_SUBCHANNEL(c, acq); } -const grpc_subchannel_key *grpc_subchannel_get_key( - const grpc_subchannel *subchannel) { +const grpc_subchannel_key* grpc_subchannel_get_key( + const grpc_subchannel* subchannel) { return subchannel->key; } -grpc_error *grpc_connected_subchannel_create_call( - grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - const grpc_connected_subchannel_call_args *args, - grpc_subchannel_call **call) { - grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = (grpc_subchannel_call *)gpr_arena_alloc( +grpc_error* grpc_connected_subchannel_create_call( + grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con, + const grpc_connected_subchannel_call_args* args, + grpc_subchannel_call** call) { + grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con); + *call = (grpc_subchannel_call*)gpr_arena_alloc( args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); - grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); const grpc_call_element_args call_args = { callstk, /* call_stack */ @@ -763,10 +764,10 @@ grpc_error *grpc_connected_subchannel_create_call( args->arena, /* arena */ args->call_combiner /* call_combiner */ }; - grpc_error *error = grpc_call_stack_init( + grpc_error* error = grpc_call_stack_init( exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { - const char *error_string = grpc_error_string(error); + const char* error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); return error; } @@ -774,39 +775,39 @@ grpc_error *grpc_connected_subchannel_create_call( return GRPC_ERROR_NONE; } -grpc_call_stack *grpc_subchannel_call_get_call_stack( - grpc_subchannel_call *subchannel_call) { +grpc_call_stack* grpc_subchannel_call_get_call_stack( + grpc_subchannel_call* subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); } -static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str, - grpc_resolved_address *addr) { - grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */); +static void grpc_uri_to_sockaddr(grpc_exec_ctx* exec_ctx, const char* uri_str, + grpc_resolved_address* addr) { + grpc_uri* uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */); GPR_ASSERT(uri != NULL); if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr)); grpc_uri_destroy(uri); } -void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx, - const grpc_channel_args *args, - grpc_resolved_address *addr) { - const char *addr_uri_str = grpc_get_subchannel_address_uri_arg(args); +void grpc_get_subchannel_address_arg(grpc_exec_ctx* exec_ctx, + const grpc_channel_args* args, + grpc_resolved_address* addr) { + const char* addr_uri_str = grpc_get_subchannel_address_uri_arg(args); memset(addr, 0, sizeof(*addr)); if (*addr_uri_str != '\0') { grpc_uri_to_sockaddr(exec_ctx, addr_uri_str, addr); } } -const char *grpc_get_subchannel_address_uri_arg(const grpc_channel_args *args) { - const grpc_arg *addr_arg = +const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { + const grpc_arg* addr_arg = grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy. GPR_ASSERT(addr_arg->type == GRPC_ARG_STRING); return addr_arg->value.string; } -grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr) { +grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { return grpc_channel_arg_string_create( - (char *)GRPC_ARG_SUBCHANNEL_ADDRESS, + (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } |