aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/subchannel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc317
1 files changed, 159 insertions, 158 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index b954e1b879..427df743d6 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -52,27 +52,27 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
- ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load( \
+#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
+ ((grpc_connected_subchannel*)(gpr_atm_##barrier##_load( \
&(subchannel)->connected_subchannel)))
typedef struct {
grpc_closure closure;
- grpc_subchannel *subchannel;
+ grpc_subchannel* subchannel;
grpc_connectivity_state connectivity_state;
} state_watcher;
typedef struct external_state_watcher {
- grpc_subchannel *subchannel;
- grpc_pollset_set *pollset_set;
- grpc_closure *notify;
+ grpc_subchannel* subchannel;
+ grpc_pollset_set* pollset_set;
+ grpc_closure* notify;
grpc_closure closure;
- struct external_state_watcher *next;
- struct external_state_watcher *prev;
+ struct external_state_watcher* next;
+ struct external_state_watcher* prev;
} external_state_watcher;
struct grpc_subchannel {
- grpc_connector *connector;
+ grpc_connector* connector;
/** refcount
- lower INTERNAL_REF_BITS bits are for internal references:
@@ -82,12 +82,12 @@ struct grpc_subchannel {
gpr_atm ref_pair;
/** non-transport related channel filters */
- const grpc_channel_filter **filters;
+ const grpc_channel_filter** filters;
size_t num_filters;
/** channel arguments */
- grpc_channel_args *args;
+ grpc_channel_args* args;
- grpc_subchannel_key *key;
+ grpc_subchannel_key* key;
/** set during connection */
grpc_connect_out_args connecting_result;
@@ -100,7 +100,7 @@ struct grpc_subchannel {
/** pollset_set tracking who's interested in a connection
being setup */
- grpc_pollset_set *pollset_set;
+ grpc_pollset_set* pollset_set;
/** active connection, or null; of type grpc_connected_subchannel */
gpr_atm connected_subchannel;
@@ -130,22 +130,22 @@ struct grpc_subchannel {
};
struct grpc_subchannel_call {
- grpc_connected_subchannel *connection;
- grpc_closure *schedule_closure_after_destroy;
+ grpc_connected_subchannel* connection;
+ grpc_closure* schedule_closure_after_destroy;
};
-#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
-#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)(con))
+#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack*)((call) + 1))
+#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack*)(con))
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
- (((grpc_subchannel_call *)(callstack)) - 1)
+ (((grpc_subchannel_call*)(callstack)) - 1)
-static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
- grpc_error *error);
+static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* subchannel,
+ grpc_error* error);
#ifndef NDEBUG
#define REF_REASON reason
#define REF_MUTATE_EXTRA_ARGS \
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char *purpose
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
#define REF_REASON ""
@@ -157,21 +157,21 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
* connection implementation
*/
-static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_connected_subchannel *c = (grpc_connected_subchannel *)arg;
+static void connection_destroy(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_connected_subchannel* c = (grpc_connected_subchannel*)arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
}
-grpc_connected_subchannel *grpc_connected_subchannel_ref(
- grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_connected_subchannel* grpc_connected_subchannel_ref(
+ grpc_connected_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON);
return c;
}
-void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
- grpc_connected_subchannel *c
+void grpc_connected_subchannel_unref(grpc_exec_ctx* exec_ctx,
+ grpc_connected_subchannel* c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CHANNEL_STACK_UNREF(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c),
REF_REASON);
@@ -181,10 +181,10 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
* grpc_subchannel implementation
*/
-static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_subchannel *c = (grpc_subchannel *)arg;
- gpr_free((void *)c->filters);
+static void subchannel_destroy(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_subchannel* c = (grpc_subchannel*)arg;
+ gpr_free((void*)c->filters);
grpc_channel_args_destroy(exec_ctx, c->args);
grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
grpc_connector_unref(exec_ctx, c->connector);
@@ -194,7 +194,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(c);
}
-static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
+static gpr_atm ref_mutate(grpc_subchannel* c, gpr_atm delta,
int barrier REF_MUTATE_EXTRA_ARGS) {
gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
: gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
@@ -208,8 +208,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
return old_val;
}
-grpc_subchannel *grpc_subchannel_ref(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_subchannel* grpc_subchannel_ref(
+ grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
0 REF_MUTATE_PURPOSE("STRONG_REF"));
@@ -217,16 +217,16 @@ grpc_subchannel *grpc_subchannel_ref(
return c;
}
-grpc_subchannel *grpc_subchannel_weak_ref(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_subchannel* grpc_subchannel_weak_ref(
+ grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
GPR_ASSERT(old_refs != 0);
return c;
}
-grpc_subchannel *grpc_subchannel_ref_from_weak_ref(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
+ grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
if (!c) return NULL;
for (;;) {
gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair);
@@ -241,8 +241,8 @@ grpc_subchannel *grpc_subchannel_ref_from_weak_ref(
}
}
-static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- grpc_connected_subchannel *con;
+static void disconnect(grpc_exec_ctx* exec_ctx, grpc_subchannel* c) {
+ grpc_connected_subchannel* con;
grpc_subchannel_index_unregister(exec_ctx, c->key, c);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->disconnected);
@@ -258,8 +258,8 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
gpr_mu_unlock(&c->mu);
}
-void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void grpc_subchannel_unref(grpc_exec_ctx* exec_ctx,
+ grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
// add a weak ref and subtract a strong ref (atomically)
old_refs = ref_mutate(c, (gpr_atm)1 - (gpr_atm)(1 << INTERNAL_REF_BITS),
@@ -270,56 +270,57 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "strong-unref");
}
-void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c
+void grpc_subchannel_weak_unref(grpc_exec_ctx* exec_ctx,
+ grpc_subchannel* c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_atm old_refs;
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
- GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(subchannel_destroy, c,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(
+ exec_ctx,
+ GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
}
}
-grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
- grpc_connector *connector,
- const grpc_subchannel_args *args) {
- grpc_subchannel_key *key = grpc_subchannel_key_create(args);
- grpc_subchannel *c = grpc_subchannel_index_find(exec_ctx, key);
+grpc_subchannel* grpc_subchannel_create(grpc_exec_ctx* exec_ctx,
+ grpc_connector* connector,
+ const grpc_subchannel_args* args) {
+ grpc_subchannel_key* key = grpc_subchannel_key_create(args);
+ grpc_subchannel* c = grpc_subchannel_index_find(exec_ctx, key);
if (c) {
grpc_subchannel_key_destroy(exec_ctx, key);
return c;
}
GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(exec_ctx);
- c = (grpc_subchannel *)gpr_zalloc(sizeof(*c));
+ c = (grpc_subchannel*)gpr_zalloc(sizeof(*c));
c->key = key;
gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
c->connector = connector;
grpc_connector_ref(c->connector);
c->num_filters = args->filter_count;
if (c->num_filters > 0) {
- c->filters = (const grpc_channel_filter **)gpr_malloc(
- sizeof(grpc_channel_filter *) * c->num_filters);
- memcpy((void *)c->filters, args->filters,
- sizeof(grpc_channel_filter *) * c->num_filters);
+ c->filters = (const grpc_channel_filter**)gpr_malloc(
+ sizeof(grpc_channel_filter*) * c->num_filters);
+ memcpy((void*)c->filters, args->filters,
+ sizeof(grpc_channel_filter*) * c->num_filters);
} else {
c->filters = NULL;
}
c->pollset_set = grpc_pollset_set_create();
- grpc_resolved_address *addr =
- (grpc_resolved_address *)gpr_malloc(sizeof(*addr));
+ grpc_resolved_address* addr =
+ (grpc_resolved_address*)gpr_malloc(sizeof(*addr));
grpc_get_subchannel_address_arg(exec_ctx, args->args, addr);
- grpc_resolved_address *new_address = NULL;
- grpc_channel_args *new_args = NULL;
+ grpc_resolved_address* new_address = NULL;
+ grpc_channel_args* new_args = NULL;
if (grpc_proxy_mappers_map_address(exec_ctx, addr, args->args, &new_address,
&new_args)) {
GPR_ASSERT(new_address != NULL);
gpr_free(addr);
addr = new_address;
}
- static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
+ static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
gpr_free(addr);
c->args = grpc_channel_args_copy_and_add_and_remove(
@@ -375,8 +376,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
return grpc_subchannel_index_register(exec_ctx, key, c);
}
-static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c) {
+static void continue_connect_locked(grpc_exec_ctx* exec_ctx,
+ grpc_subchannel* c) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
@@ -390,8 +391,8 @@ static void continue_connect_locked(grpc_exec_ctx *exec_ctx,
&c->connected);
}
-grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
- grpc_error **error) {
+grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
+ grpc_error** error) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
state = grpc_connectivity_state_get(&c->state_tracker, error);
@@ -399,10 +400,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c,
return state;
}
-static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- external_state_watcher *w = (external_state_watcher *)arg;
- grpc_closure *follow_up = w->notify;
+static void on_external_state_watcher_done(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ external_state_watcher* w = (external_state_watcher*)arg;
+ grpc_closure* follow_up = w->notify;
if (w->pollset_set != NULL) {
grpc_pollset_set_del_pollset_set(exec_ctx, w->subchannel->pollset_set,
w->pollset_set);
@@ -416,8 +417,8 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_CLOSURE_RUN(exec_ctx, follow_up, GRPC_ERROR_REF(error));
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_subchannel *c = (grpc_subchannel *)arg;
+static void on_alarm(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_subchannel* c = (grpc_subchannel*)arg;
gpr_mu_lock(&c->mu);
c->have_alarm = false;
if (c->disconnected) {
@@ -438,8 +439,8 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
GRPC_ERROR_UNREF(error);
}
-static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c) {
+static void maybe_start_connecting_locked(grpc_exec_ctx* exec_ctx,
+ grpc_subchannel* c) {
if (c->disconnected) {
/* Don't try to connect if we're already disconnected */
return;
@@ -484,10 +485,10 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx,
}
void grpc_subchannel_notify_on_state_change(
- grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
- grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
- grpc_closure *notify) {
- external_state_watcher *w;
+ grpc_exec_ctx* exec_ctx, grpc_subchannel* c,
+ grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
+ grpc_closure* notify) {
+ external_state_watcher* w;
if (state == NULL) {
gpr_mu_lock(&c->mu);
@@ -500,7 +501,7 @@ void grpc_subchannel_notify_on_state_change(
}
gpr_mu_unlock(&c->mu);
} else {
- w = (external_state_watcher *)gpr_malloc(sizeof(*w));
+ w = (external_state_watcher*)gpr_malloc(sizeof(*w));
w->subchannel = c;
w->pollset_set = interested_parties;
w->notify = notify;
@@ -523,18 +524,18 @@ void grpc_subchannel_notify_on_state_change(
}
void grpc_connected_subchannel_process_transport_op(
- grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_transport_op *op) {
- grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0);
+ grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con,
+ grpc_transport_op* op) {
+ grpc_channel_stack* channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_channel_element* top_elem = grpc_channel_stack_element(channel_stack, 0);
top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
}
-static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
- grpc_error *error) {
- state_watcher *sw = (state_watcher *)p;
- grpc_subchannel *c = sw->subchannel;
- gpr_mu *mu = &c->mu;
+static void subchannel_on_child_state_changed(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ state_watcher* sw = (state_watcher*)p;
+ grpc_subchannel* c = sw->subchannel;
+ gpr_mu* mu = &c->mu;
gpr_mu_lock(mu);
@@ -559,13 +560,13 @@ static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
gpr_free(sw);
}
-static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
- grpc_connected_subchannel *con,
- grpc_pollset_set *interested_parties,
- grpc_connectivity_state *state,
- grpc_closure *closure) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
- grpc_channel_element *elem;
+static void connected_subchannel_state_op(grpc_exec_ctx* exec_ctx,
+ grpc_connected_subchannel* con,
+ grpc_pollset_set* interested_parties,
+ grpc_connectivity_state* state,
+ grpc_closure* closure) {
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ grpc_channel_element* elem;
op->connectivity_state = state;
op->on_connectivity_state_change = closure;
op->bind_pollset_set = interested_parties;
@@ -574,31 +575,31 @@ static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx,
}
void grpc_connected_subchannel_notify_on_state_change(
- grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_pollset_set *interested_parties, grpc_connectivity_state *state,
- grpc_closure *closure) {
+ grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con,
+ grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
+ grpc_closure* closure) {
connected_subchannel_state_op(exec_ctx, con, interested_parties, state,
closure);
}
-void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx,
- grpc_connected_subchannel *con,
- grpc_closure *closure) {
- grpc_transport_op *op = grpc_make_transport_op(NULL);
- grpc_channel_element *elem;
+void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx,
+ grpc_connected_subchannel* con,
+ grpc_closure* closure) {
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ grpc_channel_element* elem;
op->send_ping = closure;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
elem->filter->start_transport_op(exec_ctx, elem, op);
}
-static bool publish_transport_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c) {
- grpc_connected_subchannel *con;
- grpc_channel_stack *stk;
- state_watcher *sw_subchannel;
+static bool publish_transport_locked(grpc_exec_ctx* exec_ctx,
+ grpc_subchannel* c) {
+ grpc_connected_subchannel* con;
+ grpc_channel_stack* stk;
+ state_watcher* sw_subchannel;
/* construct channel stack */
- grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create();
+ grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
grpc_channel_stack_builder_set_channel_arguments(
exec_ctx, builder, c->connecting_result.channel_args);
grpc_channel_stack_builder_set_transport(builder,
@@ -609,8 +610,8 @@ static bool publish_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder_destroy(exec_ctx, builder);
return false;
}
- grpc_error *error = grpc_channel_stack_builder_finish(
- exec_ctx, builder, 0, 1, connection_destroy, NULL, (void **)&con);
+ grpc_error* error = grpc_channel_stack_builder_finish(
+ exec_ctx, builder, 0, 1, connection_destroy, NULL, (void**)&con);
if (error != GRPC_ERROR_NONE) {
grpc_transport_destroy(exec_ctx, c->connecting_result.transport);
gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
@@ -622,7 +623,7 @@ static bool publish_transport_locked(grpc_exec_ctx *exec_ctx,
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
- sw_subchannel = (state_watcher *)gpr_malloc(sizeof(*sw_subchannel));
+ sw_subchannel = (state_watcher*)gpr_malloc(sizeof(*sw_subchannel));
sw_subchannel->subchannel = c;
sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
GRPC_CLOSURE_INIT(&sw_subchannel->closure, subchannel_on_child_state_changed,
@@ -657,10 +658,10 @@ static bool publish_transport_locked(grpc_exec_ctx *exec_ctx,
return true;
}
-static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- grpc_subchannel *c = (grpc_subchannel *)arg;
- grpc_channel_args *delete_channel_args = c->connecting_result.channel_args;
+static void subchannel_connected(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ grpc_subchannel* c = (grpc_subchannel*)arg;
+ grpc_channel_args* delete_channel_args = c->connecting_result.channel_args;
GRPC_SUBCHANNEL_WEAK_REF(c, "connected");
gpr_mu_lock(&c->mu);
@@ -678,7 +679,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
- const char *errmsg = grpc_error_string(error);
+ const char* errmsg = grpc_error_string(error);
gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
maybe_start_connecting_locked(exec_ctx, c);
@@ -693,65 +694,65 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
* grpc_subchannel_call implementation
*/
-static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
- grpc_error *error) {
- grpc_subchannel_call *c = (grpc_subchannel_call *)call;
+static void subchannel_call_destroy(grpc_exec_ctx* exec_ctx, void* call,
+ grpc_error* error) {
+ grpc_subchannel_call* c = (grpc_subchannel_call*)call;
GPR_ASSERT(c->schedule_closure_after_destroy != NULL);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
- grpc_connected_subchannel *connection = c->connection;
+ grpc_connected_subchannel* connection = c->connection;
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL,
c->schedule_closure_after_destroy);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
-void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call,
- grpc_closure *closure) {
+void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call,
+ grpc_closure* closure) {
GPR_ASSERT(call->schedule_closure_after_destroy == NULL);
GPR_ASSERT(closure != NULL);
call->schedule_closure_after_destroy = closure;
}
void grpc_subchannel_call_ref(
- grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
-void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call *c
+void grpc_subchannel_call_unref(grpc_exec_ctx* exec_ctx,
+ grpc_subchannel_call* c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CALL_STACK_UNREF(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
-void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call *call,
- grpc_transport_stream_op_batch *batch) {
+void grpc_subchannel_call_process_op(grpc_exec_ctx* exec_ctx,
+ grpc_subchannel_call* call,
+ grpc_transport_stream_op_batch* batch) {
GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0);
- grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
- grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
+ grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
+ grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
top_elem->filter->start_transport_stream_op_batch(exec_ctx, top_elem, batch);
GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
-grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
- grpc_subchannel *c) {
+grpc_connected_subchannel* grpc_subchannel_get_connected_subchannel(
+ grpc_subchannel* c) {
return GET_CONNECTED_SUBCHANNEL(c, acq);
}
-const grpc_subchannel_key *grpc_subchannel_get_key(
- const grpc_subchannel *subchannel) {
+const grpc_subchannel_key* grpc_subchannel_get_key(
+ const grpc_subchannel* subchannel) {
return subchannel->key;
}
-grpc_error *grpc_connected_subchannel_create_call(
- grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- const grpc_connected_subchannel_call_args *args,
- grpc_subchannel_call **call) {
- grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- *call = (grpc_subchannel_call *)gpr_arena_alloc(
+grpc_error* grpc_connected_subchannel_create_call(
+ grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con,
+ const grpc_connected_subchannel_call_args* args,
+ grpc_subchannel_call** call) {
+ grpc_channel_stack* chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
+ *call = (grpc_subchannel_call*)gpr_arena_alloc(
args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
- grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
+ grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
const grpc_call_element_args call_args = {
callstk, /* call_stack */
@@ -763,10 +764,10 @@ grpc_error *grpc_connected_subchannel_create_call(
args->arena, /* arena */
args->call_combiner /* call_combiner */
};
- grpc_error *error = grpc_call_stack_init(
+ grpc_error* error = grpc_call_stack_init(
exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args);
if (error != GRPC_ERROR_NONE) {
- const char *error_string = grpc_error_string(error);
+ const char* error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);
return error;
}
@@ -774,39 +775,39 @@ grpc_error *grpc_connected_subchannel_create_call(
return GRPC_ERROR_NONE;
}
-grpc_call_stack *grpc_subchannel_call_get_call_stack(
- grpc_subchannel_call *subchannel_call) {
+grpc_call_stack* grpc_subchannel_call_get_call_stack(
+ grpc_subchannel_call* subchannel_call) {
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}
-static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str,
- grpc_resolved_address *addr) {
- grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */);
+static void grpc_uri_to_sockaddr(grpc_exec_ctx* exec_ctx, const char* uri_str,
+ grpc_resolved_address* addr) {
+ grpc_uri* uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */);
GPR_ASSERT(uri != NULL);
if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr));
grpc_uri_destroy(uri);
}
-void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx,
- const grpc_channel_args *args,
- grpc_resolved_address *addr) {
- const char *addr_uri_str = grpc_get_subchannel_address_uri_arg(args);
+void grpc_get_subchannel_address_arg(grpc_exec_ctx* exec_ctx,
+ const grpc_channel_args* args,
+ grpc_resolved_address* addr) {
+ const char* addr_uri_str = grpc_get_subchannel_address_uri_arg(args);
memset(addr, 0, sizeof(*addr));
if (*addr_uri_str != '\0') {
grpc_uri_to_sockaddr(exec_ctx, addr_uri_str, addr);
}
}
-const char *grpc_get_subchannel_address_uri_arg(const grpc_channel_args *args) {
- const grpc_arg *addr_arg =
+const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) {
+ const grpc_arg* addr_arg =
grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy.
GPR_ASSERT(addr_arg->type == GRPC_ARG_STRING);
return addr_arg->value.string;
}
-grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr) {
+grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
return grpc_channel_arg_string_create(
- (char *)GRPC_ARG_SUBCHANNEL_ADDRESS,
+ (char*)GRPC_ARG_SUBCHANNEL_ADDRESS,
addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
}