aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/client_config/connector.c9
-rw-r--r--src/core/client_config/connector.h12
-rw-r--r--src/core/client_config/lb_policies/pick_first.c52
-rw-r--r--src/core/client_config/lb_policy.c31
-rw-r--r--src/core/client_config/lb_policy.h19
-rw-r--r--src/core/client_config/resolvers/unix_resolver_posix.c21
-rw-r--r--src/core/client_config/subchannel.c214
-rw-r--r--src/core/client_config/subchannel.h9
8 files changed, 197 insertions, 170 deletions
diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c
index 9cc57ddf38..a8cd5fc149 100644
--- a/src/core/client_config/connector.c
+++ b/src/core/client_config/connector.c
@@ -41,10 +41,9 @@ void grpc_connector_unref(grpc_connector *connector) {
connector->vtable->unref(connector);
}
-void grpc_connector_connect(
- grpc_connector *connector,
- const grpc_connect_in_args *in_args,
- grpc_connect_out_args *out_args,
- grpc_iomgr_closure *notify) {
+void grpc_connector_connect(grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args,
+ grpc_iomgr_closure *notify) {
connector->vtable->connect(connector, in_args, out_args, notify);
}
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h
index 55c6e63129..edcb10a36e 100644
--- a/src/core/client_config/connector.h
+++ b/src/core/client_config/connector.h
@@ -72,16 +72,14 @@ struct grpc_connector_vtable {
void (*unref)(grpc_connector *connector);
void (*connect)(grpc_connector *connector,
const grpc_connect_in_args *in_args,
- grpc_connect_out_args *out_args,
- grpc_iomgr_closure *notify);
+ grpc_connect_out_args *out_args, grpc_iomgr_closure *notify);
};
void grpc_connector_ref(grpc_connector *connector);
void grpc_connector_unref(grpc_connector *connector);
-void grpc_connector_connect(
- grpc_connector *connector,
- const grpc_connect_in_args *in_args,
- grpc_connect_out_args *out_args,
- grpc_iomgr_closure *notify);
+void grpc_connector_connect(grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args,
+ grpc_iomgr_closure *notify);
#endif
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index cdc7e75140..c94408200b 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -74,7 +74,7 @@ typedef struct {
} pick_first_lb_policy;
void pf_destroy(grpc_lb_policy *pol) {
- pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
for (i = 0; i < p->num_subchannels; i++) {
grpc_subchannel_unref(p->subchannels[i]);
@@ -92,7 +92,7 @@ void pf_shutdown(grpc_lb_policy *pol) {
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete) {
- pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
if (p->selected) {
@@ -105,9 +105,12 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(pol, "pick_first_connectivity");
- grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed);
+ grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed);
}
- grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset);
+ grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
+ pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -121,14 +124,16 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
static void del_interested_parties_locked(pick_first_lb_policy *p) {
pending_pick *pp;
for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], pp->pollset);
+ grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
+ pp->pollset);
}
}
static void add_interested_parties_locked(pick_first_lb_policy *p) {
pending_pick *pp;
for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pp->pollset);
+ grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
+ pp->pollset);
}
}
@@ -142,7 +147,8 @@ loop:
switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY:
p->selected = p->subchannels[p->checking_subchannel];
- GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) == GRPC_CHANNEL_READY);
+ GPR_ASSERT(grpc_subchannel_check_connectivity(p->selected) ==
+ GRPC_CHANNEL_READY);
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
@@ -154,19 +160,25 @@ loop:
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
del_interested_parties_locked(p);
- p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels;
- p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]);
+ p->checking_subchannel =
+ (p->checking_subchannel + 1) % p->num_subchannels;
+ p->checking_connectivity = grpc_subchannel_check_connectivity(
+ p->subchannels[p->checking_subchannel]);
add_interested_parties_locked(p);
goto loop;
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
- grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed);
+ grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
del_interested_parties_locked(p);
- GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]);
+ GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
+ p->subchannels[p->num_subchannels - 1]);
p->checking_subchannel %= p->num_subchannels;
- p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]);
+ p->checking_connectivity = grpc_subchannel_check_connectivity(
+ p->subchannels[p->checking_subchannel]);
p->num_subchannels--;
grpc_subchannel_unref(p->subchannels[p->num_subchannels]);
add_interested_parties_locked(p);
@@ -184,7 +196,7 @@ loop:
}
static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
- pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
size_t n;
grpc_subchannel **subchannels;
@@ -206,7 +218,7 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
}
static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
- pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
st = grpc_connectivity_state_check(&p->state_tracker);
@@ -214,15 +226,19 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
return st;
}
-static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_iomgr_closure *notify) {
- pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+static void pf_notify_on_state_change(grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
gpr_mu_lock(&p->mu);
- grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, notify);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify);
gpr_mu_unlock(&p->mu);
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_destroy, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
+ pf_destroy, pf_shutdown, pf_pick,
+ pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels) {
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index dfe21cf443..6d1c788742 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -33,33 +33,34 @@
#include "src/core/client_config/lb_policy.h"
-void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) {
- policy->vtable = vtable;
- gpr_ref_init(&policy->refs, 1);
+void grpc_lb_policy_init(grpc_lb_policy *policy,
+ const grpc_lb_policy_vtable *vtable) {
+ policy->vtable = vtable;
+ gpr_ref_init(&policy->refs, 1);
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
-void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason) {
+void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
+ const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p ref %d -> %d %s",
- policy, (int)policy->refs.count, (int)policy->refs.count + 1,
- reason);
+ policy, (int)policy->refs.count, (int)policy->refs.count + 1, reason);
#else
-void grpc_lb_policy_ref(grpc_lb_policy *policy) {
+void grpc_lb_policy_ref(grpc_lb_policy *policy) {
#endif
- gpr_ref(&policy->refs);
+ gpr_ref(&policy->refs);
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
-void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason) {
+void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line,
+ const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s",
- policy, (int)policy->refs.count, (int)policy->refs.count - 1,
- reason);
+ policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason);
#else
void grpc_lb_policy_unref(grpc_lb_policy *policy) {
#endif
- if (gpr_unref(&policy->refs)) {
- policy->vtable->destroy(policy);
- }
+ if (gpr_unref(&policy->refs)) {
+ policy->vtable->destroy(policy);
+ }
}
void grpc_lb_policy_shutdown(grpc_lb_policy *policy) {
@@ -74,5 +75,5 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
}
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
- policy->vtable->broadcast(policy, op);
+ policy->vtable->broadcast(policy, op);
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 717f26af65..a468f761cc 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -67,14 +67,20 @@ struct grpc_lb_policy_vtable {
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy */
- void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_iomgr_closure *closure);
+ void (*notify_on_state_change)(grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure);
};
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
-#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
-void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason);
-void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, const char *reason);
+#define GRPC_LB_POLICY_REF(p, r) \
+ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_LB_POLICY_UNREF(p, r) \
+ grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
+void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
+ const char *reason);
+void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line,
+ const char *reason);
#else
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p))
@@ -83,7 +89,8 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy);
#endif
/** called by concrete implementations to initialize the base struct */
-void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable);
+void grpc_lb_policy_init(grpc_lb_policy *policy,
+ const grpc_lb_policy_vtable *vtable);
/** Start shutting down (fail any pending picks) */
void grpc_lb_policy_shutdown(grpc_lb_policy *policy);
diff --git a/src/core/client_config/resolvers/unix_resolver_posix.c b/src/core/client_config/resolvers/unix_resolver_posix.c
index 3dedf94357..f7498548b1 100644
--- a/src/core/client_config/resolvers/unix_resolver_posix.c
+++ b/src/core/client_config/resolvers/unix_resolver_posix.c
@@ -79,10 +79,10 @@ static void unix_ref(grpc_resolver *r);
static void unix_unref(grpc_resolver *r);
static void unix_shutdown(grpc_resolver *r);
static void unix_channel_saw_error(grpc_resolver *r,
- struct sockaddr *failing_address,
- int failing_address_len);
+ struct sockaddr *failing_address,
+ int failing_address_len);
static void unix_next(grpc_resolver *r, grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete);
+ grpc_iomgr_closure *on_complete);
static const grpc_resolver_vtable unix_resolver_vtable = {
unix_ref, unix_unref, unix_shutdown, unix_channel_saw_error, unix_next};
@@ -112,12 +112,11 @@ static void unix_shutdown(grpc_resolver *resolver) {
}
static void unix_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
- int len) {
-}
+ int len) {}
static void unix_next(grpc_resolver *resolver,
- grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete) {
+ grpc_client_config **target_config,
+ grpc_iomgr_closure *on_complete) {
unix_resolver *r = (unix_resolver *)resolver;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
@@ -136,9 +135,10 @@ static void unix_maybe_finish_next_locked(unix_resolver *r) {
if (r->next_completion != NULL && !r->published) {
cfg = grpc_client_config_create();
memset(&args, 0, sizeof(args));
- args.addr = (struct sockaddr *) &r->addr;
+ args.addr = (struct sockaddr *)&r->addr;
args.addr_len = r->addr_len;
- subchannel = grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args);
+ subchannel =
+ grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args);
lb_policy = r->lb_policy_factory(&subchannel, 1);
grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
@@ -194,8 +194,7 @@ static void unix_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *unix_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory) {
- return unix_create(uri, grpc_create_pick_first_lb_policy,
- subchannel_factory);
+ return unix_create(uri, grpc_create_pick_first_lb_policy, subchannel_factory);
}
static const grpc_resolver_factory_vtable unix_factory_vtable = {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 2f5843b2a4..c770cb3b20 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -42,10 +42,10 @@
#include "src/core/transport/connectivity_state.h"
typedef struct {
- /* all fields protected by subchannel->mu */
- /** refcount */
- int refs;
- /** parent subchannel */
+ /* all fields protected by subchannel->mu */
+ /** refcount */
+ int refs;
+ /** parent subchannel */
grpc_subchannel *subchannel;
} connection;
@@ -103,7 +103,8 @@ struct grpc_subchannel_call {
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
-static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
+static grpc_subchannel_call *create_call(connection *con,
+ grpc_transport_stream_op *initial_op);
static void connectivity_state_changed_locked(grpc_subchannel *c);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
@@ -112,7 +113,8 @@ static void subchannel_connected(void *subchannel, int iomgr_success);
static void subchannel_ref_locked(grpc_subchannel *c);
static int subchannel_unref_locked(grpc_subchannel *c) GRPC_MUST_USE_RESULT;
static void connection_ref_locked(connection *c);
-static grpc_subchannel *connection_unref_locked(connection *c) GRPC_MUST_USE_RESULT;
+static grpc_subchannel *connection_unref_locked(connection *c)
+ GRPC_MUST_USE_RESULT;
static void subchannel_destroy(grpc_subchannel *c);
/*
@@ -120,58 +122,55 @@ static void subchannel_destroy(grpc_subchannel *c);
*/
static void connection_destroy(connection *c) {
- GPR_ASSERT(c->refs == 0);
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
+ GPR_ASSERT(c->refs == 0);
+ grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
}
-static void connection_ref_locked(connection *c) {
- subchannel_ref_locked(c->subchannel);
- ++c->refs;
+static void connection_ref_locked(connection *c) {
+ subchannel_ref_locked(c->subchannel);
+ ++c->refs;
}
static grpc_subchannel *connection_unref_locked(connection *c) {
- grpc_subchannel *destroy = NULL;
- if (subchannel_unref_locked(c->subchannel)) {
- destroy = c->subchannel;
- }
+ grpc_subchannel *destroy = NULL;
+ if (subchannel_unref_locked(c->subchannel)) {
+ destroy = c->subchannel;
+ }
if (--c->refs == 0 && c->subchannel->active != c) {
- connection_destroy(c);
+ connection_destroy(c);
}
return destroy;
}
-
/*
* grpc_subchannel implementation
*/
-static void subchannel_ref_locked(grpc_subchannel *c) {
- ++c->refs;
-}
+static void subchannel_ref_locked(grpc_subchannel *c) { ++c->refs; }
static int subchannel_unref_locked(grpc_subchannel *c) {
- return --c->refs == 0;
+ return --c->refs == 0;
}
void grpc_subchannel_ref(grpc_subchannel *c) {
- gpr_mu_lock(&c->mu);
- subchannel_ref_locked(c);
- gpr_mu_unlock(&c->mu);
+ gpr_mu_lock(&c->mu);
+ subchannel_ref_locked(c);
+ gpr_mu_unlock(&c->mu);
}
void grpc_subchannel_unref(grpc_subchannel *c) {
- int destroy;
- gpr_mu_lock(&c->mu);
- destroy = subchannel_unref_locked(c);
- gpr_mu_unlock(&c->mu);
- if (destroy) subchannel_destroy(c);
+ int destroy;
+ gpr_mu_lock(&c->mu);
+ destroy = subchannel_unref_locked(c);
+ gpr_mu_unlock(&c->mu);
+ if (destroy) subchannel_destroy(c);
}
static void subchannel_destroy(grpc_subchannel *c) {
- if (c->active != NULL) {
- connection_destroy(c->active);
- }
+ if (c->active != NULL) {
+ connection_destroy(c->active);
+ }
gpr_free(c->filters);
grpc_channel_args_destroy(c->args);
gpr_free(c->addr);
@@ -216,16 +215,17 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
}
static void start_connect(grpc_subchannel *c) {
- grpc_connect_in_args args;
+ grpc_connect_in_args args;
- args.interested_parties = &c->pollset_set;
- args.addr = c->addr;
- args.addr_len = c->addr_len;
- args.deadline = compute_connect_deadline(c);
- args.channel_args = c->args;
- args.metadata_context = c->mdctx;
+ args.interested_parties = &c->pollset_set;
+ args.addr = c->addr;
+ args.addr_len = c->addr_len;
+ args.deadline = compute_connect_deadline(c);
+ args.channel_args = c->args;
+ args.metadata_context = c->mdctx;
- grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected);
+ grpc_connector_connect(c->connector, &args, &c->connecting_result,
+ &c->connected);
}
void grpc_subchannel_create_call(grpc_subchannel *c,
@@ -275,78 +275,82 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_iomgr_closure *notify) {
int do_connect = 0;
gpr_mu_lock(&c->mu);
- if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) {
- do_connect = 1;
+ if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
+ notify)) {
+ do_connect = 1;
c->connecting = 1;
subchannel_ref_locked(c);
- grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c));
+ grpc_connectivity_state_set(&c->state_tracker,
+ compute_connectivity_locked(c));
}
gpr_mu_unlock(&c->mu);
if (do_connect) {
- start_connect(c);
+ start_connect(c);
}
}
-void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) {
- abort();
+void grpc_subchannel_process_transport_op(grpc_subchannel *c,
+ grpc_transport_op *op) {
+ abort();
}
static void publish_transport(grpc_subchannel *c) {
- size_t channel_stack_size;
- connection *con;
- grpc_channel_stack *stk;
- size_t num_filters;
- const grpc_channel_filter **filters;
- waiting_for_connect *w4c;
- int destroy;
-
- num_filters = c->num_filters + c->connecting_result.num_filters + 1;
- filters = gpr_malloc(sizeof(*filters) * num_filters);
- memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
- memcpy(filters + c->num_filters, c->connecting_result.filters, sizeof(*filters) * c->connecting_result.num_filters);
- filters[num_filters - 1] = &grpc_connected_channel_filter;
-
- channel_stack_size = grpc_channel_stack_size(filters, num_filters);
- con = gpr_malloc(sizeof(connection) + channel_stack_size);
- stk = (grpc_channel_stack *)(con + 1);
-
- con->refs = 0;
- con->subchannel = c;
- grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
- grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
- memset(&c->connecting_result, 0, sizeof(c->connecting_result));
-
- gpr_mu_lock(&c->mu);
- GPR_ASSERT(c->active == NULL);
- c->active = con;
- c->connecting = 0;
- connectivity_state_changed_locked(c);
- while ((w4c = c->waiting)) {
- abort(); /* not implemented */
- }
+ size_t channel_stack_size;
+ connection *con;
+ grpc_channel_stack *stk;
+ size_t num_filters;
+ const grpc_channel_filter **filters;
+ waiting_for_connect *w4c;
+ int destroy;
+
+ num_filters = c->num_filters + c->connecting_result.num_filters + 1;
+ filters = gpr_malloc(sizeof(*filters) * num_filters);
+ memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
+ memcpy(filters + c->num_filters, c->connecting_result.filters,
+ sizeof(*filters) * c->connecting_result.num_filters);
+ filters[num_filters - 1] = &grpc_connected_channel_filter;
+
+ channel_stack_size = grpc_channel_stack_size(filters, num_filters);
+ con = gpr_malloc(sizeof(connection) + channel_stack_size);
+ stk = (grpc_channel_stack *)(con + 1);
+
+ con->refs = 0;
+ con->subchannel = c;
+ grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
+ grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
+ memset(&c->connecting_result, 0, sizeof(c->connecting_result));
+
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(c->active == NULL);
+ c->active = con;
+ c->connecting = 0;
+ connectivity_state_changed_locked(c);
+ while ((w4c = c->waiting)) {
+ abort(); /* not implemented */
+ }
destroy = subchannel_unref_locked(c);
- gpr_mu_unlock(&c->mu);
+ gpr_mu_unlock(&c->mu);
- gpr_free(filters);
+ gpr_free(filters);
- if (destroy) {
- subchannel_destroy(c);
- }
-}
+ if (destroy) {
+ subchannel_destroy(c);
+ }
+}
static void subchannel_connected(void *arg, int iomgr_success) {
- grpc_subchannel *c = arg;
- if (c->connecting_result.transport) {
- publish_transport(c);
- } else {
- int destroy;
- gpr_mu_lock(&c->mu);
- destroy = subchannel_unref_locked(c);
- gpr_mu_unlock(&c->mu);
- if (destroy) subchannel_destroy(c);
- /* TODO(ctiller): retry after sleeping */
- abort();
- }
+ grpc_subchannel *c = arg;
+ if (c->connecting_result.transport) {
+ publish_transport(c);
+ } else {
+ int destroy;
+ gpr_mu_lock(&c->mu);
+ destroy = subchannel_unref_locked(c);
+ gpr_mu_unlock(&c->mu);
+ if (destroy) subchannel_destroy(c);
+ /* TODO(ctiller): retry after sleeping */
+ abort();
+ }
}
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
@@ -372,21 +376,19 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) {
* grpc_subchannel_call implementation
*/
-void grpc_subchannel_call_ref(grpc_subchannel_call *c) {
- gpr_ref(&c->refs);
-}
+void grpc_subchannel_call_ref(grpc_subchannel_call *c) { gpr_ref(&c->refs); }
void grpc_subchannel_call_unref(grpc_subchannel_call *c) {
if (gpr_unref(&c->refs)) {
- gpr_mu *mu = &c->connection->subchannel->mu;
- grpc_subchannel *destroy;
+ gpr_mu *mu = &c->connection->subchannel->mu;
+ grpc_subchannel *destroy;
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
gpr_mu_lock(mu);
destroy = connection_unref_locked(c->connection);
gpr_mu_unlock(mu);
gpr_free(c);
if (destroy) {
- subchannel_destroy(destroy);
+ subchannel_destroy(destroy);
}
}
}
@@ -398,9 +400,11 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
top_elem->filter->start_transport_stream_op(top_elem, op);
}
-grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
- grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
+grpc_subchannel_call *create_call(connection *con,
+ grpc_transport_stream_op *initial_op) {
+ grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_subchannel_call *call =
+ gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con;
gpr_ref_init(&call->refs, 1);
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 8155aba14c..b777e51d20 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -55,7 +55,8 @@ void grpc_subchannel_create_call(grpc_subchannel *subchannel,
grpc_iomgr_closure *notify);
/** process a transport level op */
-void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op);
+void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel,
+ grpc_transport_op *op);
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
@@ -67,8 +68,10 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify);
-void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset);
-void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset);
+void grpc_subchannel_add_interested_party(grpc_subchannel *channel,
+ grpc_pollset *pollset);
+void grpc_subchannel_del_interested_party(grpc_subchannel *channel,
+ grpc_pollset *pollset);
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call,