aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-17 07:18:31 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-17 07:18:31 -0800
commitb5585d4f723003e4c900f02c2f4ce25e6fb26d5e (patch)
treeec65b2d8caa4b4f35c2d9b8e644542478e163125 /src/core
parent860f484f43841d14df61cfeda6ca5e637ed99d19 (diff)
Initial pass through to make subchannels single connect
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c22
-rw-r--r--src/core/channel/client_uchannel.c20
-rw-r--r--src/core/channel/client_uchannel.h4
-rw-r--r--src/core/channel/subchannel_call_holder.c54
-rw-r--r--src/core/channel/subchannel_call_holder.h7
-rw-r--r--src/core/client_config/lb_policies/pick_first.c37
-rw-r--r--src/core/client_config/lb_policies/round_robin.c18
-rw-r--r--src/core/client_config/lb_policy.c4
-rw-r--r--src/core/client_config/lb_policy.h8
-rw-r--r--src/core/client_config/subchannel.c481
-rw-r--r--src/core/client_config/subchannel.h51
-rw-r--r--src/core/transport/transport.h2
12 files changed, 232 insertions, 476 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 16d91d4277..5517507423 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -287,7 +287,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
typedef struct {
grpc_metadata_batch *initial_metadata;
- grpc_subchannel **subchannel;
+ grpc_connected_subchannel **connected_subchannel;
grpc_closure *on_ready;
grpc_call_element *elem;
grpc_closure closure;
@@ -295,17 +295,17 @@ typedef struct {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
continue_picking_args *cpa = arg;
if (!success) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
- } else if (cpa->subchannel == NULL) {
+ } else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
- cpa->subchannel, cpa->on_ready)) {
+ cpa->connected_subchannel, cpa->on_ready)) {
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1);
}
gpr_free(cpa);
@@ -313,7 +313,7 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_call_element *elem = elemp;
channel_data *chand = elem->channel_data;
@@ -321,18 +321,18 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
continue_picking_args *cpa;
grpc_closure *closure;
- GPR_ASSERT(subchannel);
+ GPR_ASSERT(connected_subchannel);
gpr_mu_lock(&chand->mu_config);
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
- grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel);
+ grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, connected_subchannel);
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = grpc_closure_next(closure)) {
cpa = closure->cb_arg;
- if (cpa->subchannel == subchannel) {
- cpa->subchannel = NULL;
+ if (cpa->connected_subchannel == connected_subchannel) {
+ cpa->connected_subchannel = NULL;
grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
}
}
@@ -341,7 +341,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
if (chand->lb_policy != NULL) {
int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
- initial_metadata, subchannel, on_ready);
+ initial_metadata, connected_subchannel, on_ready);
gpr_mu_unlock(&chand->mu_config);
return r;
}
@@ -354,7 +354,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
cpa = gpr_malloc(sizeof(*cpa));
cpa->initial_metadata = initial_metadata;
- cpa->subchannel = subchannel;
+ cpa->connected_subchannel = connected_subchannel;
cpa->on_ready = on_ready;
cpa->elem = elem;
grpc_closure_init(&cpa->closure, continue_picking, cpa);
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
index ec6b02381a..b27284e8a7 100644
--- a/src/core/channel/client_uchannel.c
+++ b/src/core/channel/client_uchannel.c
@@ -67,7 +67,7 @@ typedef struct client_uchannel_channel_data {
grpc_connectivity_state_tracker state_tracker;
/** the subchannel wrapped by the microchannel */
- grpc_subchannel *subchannel;
+ grpc_connected_subchannel *connected_subchannel;
/** the callback used to stay subscribed to subchannel connectivity
* notifications */
@@ -87,7 +87,7 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
chand->subchannel_connectivity,
"uchannel_monitor_subchannel");
- grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel,
&chand->subchannel_connectivity,
&chand->connectivity_cb);
}
@@ -131,11 +131,11 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
channel_data *chand = arg;
GPR_ASSERT(initial_metadata != NULL);
- *subchannel = chand->subchannel;
+ *connected_subchannel = chand->connected_subchannel;
return 1;
}
@@ -172,7 +172,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel,
+ grpc_connected_subchannel_state_change_unsubscribe(exec_ctx, chand->connected_subchannel,
&chand->connectivity_cb);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
gpr_mu_destroy(&chand->mu_state);
@@ -202,7 +202,7 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
GRPC_CHANNEL_CONNECTING,
"uchannel_connecting_changed");
chand->subchannel_connectivity = out;
- grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, chand->connected_subchannel,
&chand->subchannel_connectivity,
&chand->connectivity_cb);
}
@@ -226,7 +226,7 @@ grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
grpc_channel_element *parent_elem;
gpr_mu_lock(&chand->mu_state);
parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(
- grpc_subchannel_get_master(chand->subchannel)));
+ chand->master));
gpr_mu_unlock(&chand->mu_state);
return grpc_client_channel_get_connecting_pollset_set(parent_elem);
}
@@ -273,13 +273,13 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
return channel;
}
-void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
- grpc_subchannel *subchannel) {
+void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel,
+ grpc_connected_subchannel *connected_subchannel) {
grpc_channel_element *elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel));
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
gpr_mu_lock(&chand->mu_state);
- chand->subchannel = subchannel;
+ chand->connected_subchannel = connected_subchannel;
gpr_mu_unlock(&chand->mu_state);
}
diff --git a/src/core/channel/client_uchannel.h b/src/core/channel/client_uchannel.h
index dfe6695ae3..1acf9bfd69 100644
--- a/src/core/channel/client_uchannel.h
+++ b/src/core/channel/client_uchannel.h
@@ -64,7 +64,7 @@ void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
grpc_channel_args *args);
-void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
- grpc_subchannel *subchannel);
+void grpc_client_uchannel_set_connected_subchannel(grpc_channel *uchannel,
+ grpc_connected_subchannel *connected_subchannel);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_MICROCHANNEL_H */
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
index 7251714519..d1a7f86348 100644
--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -44,7 +44,6 @@
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
int success);
-static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success);
static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
int success);
@@ -63,7 +62,7 @@ void grpc_subchannel_call_holder_init(
holder->pick_subchannel = pick_subchannel;
holder->pick_subchannel_arg = pick_subchannel_arg;
gpr_mu_init(&holder->mu);
- holder->subchannel = NULL;
+ holder->connected_subchannel = NULL;
holder->waiting_ops = NULL;
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
@@ -125,13 +124,9 @@ retry:
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, holder);
break;
- case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL:
- grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel,
- &holder->subchannel_call);
- break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
- &holder->subchannel, NULL);
+ &holder->connected_subchannel, NULL);
break;
}
gpr_mu_unlock(&holder->mu);
@@ -142,28 +137,21 @@ retry:
}
/* if we don't have a subchannel, try to get one */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
- holder->subchannel == NULL && op->send_initial_metadata != NULL) {
+ holder->connected_subchannel == NULL && op->send_initial_metadata != NULL) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
grpc_closure_init(&holder->next_step, subchannel_ready, holder);
if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg,
- op->send_initial_metadata, &holder->subchannel,
+ op->send_initial_metadata, &holder->connected_subchannel,
&holder->next_step)) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
}
}
/* if we've got a subchannel, then let's ask it to create a call */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
- holder->subchannel != NULL) {
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL;
- grpc_closure_init(&holder->next_step, call_ready, holder);
- if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
- holder->pollset, &holder->subchannel_call,
- &holder->next_step)) {
- /* got one immediately - continue the op (and any waiting ops) */
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
- retry_waiting_locked(exec_ctx, holder);
- goto retry;
- }
+ holder->connected_subchannel != NULL) {
+ gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset));
+ retry_waiting_locked(exec_ctx, holder);
+ goto retry;
}
/* nothing to be done but wait */
add_waiting_locked(holder, op);
@@ -179,36 +167,14 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
call = GET_CALL(holder);
GPR_ASSERT(call == NULL || call == CANCELLED_CALL);
- if (holder->subchannel == NULL) {
+ if (holder->connected_subchannel == NULL) {
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
fail_locked(exec_ctx, holder);
} else {
- grpc_closure_init(&holder->next_step, call_ready, holder);
- if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
- holder->pollset, &holder->subchannel_call,
- &holder->next_step)) {
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
- /* got one immediately - continue the op (and any waiting ops) */
- retry_waiting_locked(exec_ctx, holder);
- }
- }
- gpr_mu_unlock(&holder->mu);
-}
-
-static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
- grpc_subchannel_call_holder *holder = arg;
- GPR_TIMER_BEGIN("call_ready", 0);
- gpr_mu_lock(&holder->mu);
- GPR_ASSERT(holder->creation_phase ==
- GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL);
- holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
- if (GET_CALL(holder) != NULL) {
+ gpr_atm_rel_store(&holder->subchannel_call, grpc_connected_subchannel_create_call(exec_ctx, holder->connected_subchannel, holder->pollset));
retry_waiting_locked(exec_ctx, holder);
- } else {
- fail_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);
- GPR_TIMER_END("call_ready", 0);
}
typedef struct {
diff --git a/src/core/channel/subchannel_call_holder.h b/src/core/channel/subchannel_call_holder.h
index bda051c566..6328f35344 100644
--- a/src/core/channel/subchannel_call_holder.h
+++ b/src/core/channel/subchannel_call_holder.h
@@ -42,12 +42,11 @@
called when the subchannel is available) */
typedef int (*grpc_subchannel_call_holder_pick_subchannel)(
grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel, grpc_closure *on_ready);
+ grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready);
typedef enum {
GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING,
- GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL,
- GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL
+ GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL
} grpc_subchannel_call_holder_creation_phase;
/** Wrapper for holding a pointer to grpc_subchannel_call, and the
@@ -71,7 +70,7 @@ typedef struct grpc_subchannel_call_holder {
gpr_mu mu;
grpc_subchannel_call_holder_creation_phase creation_phase;
- grpc_subchannel *subchannel;
+ grpc_connected_subchannel *connected_subchannel;
grpc_pollset *pollset;
grpc_transport_stream_op *waiting_ops;
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 93312abb00..6d9e6af4a6 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -42,7 +42,7 @@
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
- grpc_subchannel **target;
+ grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -60,7 +60,7 @@ typedef struct {
/** the selected channel
TODO(ctiller): this should be atomically set so we don't
need to take a mutex in the common case */
- grpc_subchannel *selected;
+ grpc_connected_subchannel *selected;
/** have we started picking? */
int started_picking;
/** are we shut down? */
@@ -102,7 +102,7 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
if (p->selected) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
}
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
@@ -131,7 +131,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_subchannel **target) {
+ grpc_connected_subchannel **target) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -174,7 +174,7 @@ void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
- grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
+ grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target,
grpc_closure *on_complete) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
@@ -207,7 +207,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_op op;
size_t num_subchannels = p->num_subchannels;
grpc_subchannel **subchannels;
- grpc_subchannel *exclude_subchannel;
+ grpc_connected_subchannel *exclude_subchannel;
gpr_mu_lock(&p->mu);
subchannels = p->subchannels;
@@ -218,7 +218,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
for (i = 0; i < num_subchannels; i++) {
- if (subchannels[i] != exclude_subchannel) {
+ if (grpc_subchannel_get_connected_subchannel(subchannels[i]) != exclude_subchannel) {
memset(&op, 0, sizeof(op));
op.disconnect = 1;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
@@ -232,6 +232,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
+ grpc_subchannel *selected_subchannel;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -244,7 +245,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
p->checking_connectivity, "selected_changed");
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
- grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected,
&p->checking_connectivity,
&p->connectivity_changed);
} else {
@@ -256,8 +257,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
- p->selected = p->subchannels[p->checking_subchannel];
- GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
+ selected_subchannel = p->subchannels[p->checking_subchannel];
+ p->selected = grpc_subchannel_get_connected_subchannel(selected_subchannel);
+ GPR_ASSERT(p->selected);
+ GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
grpc_exec_ctx_enqueue(exec_ctx,
@@ -266,12 +269,12 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
- grpc_subchannel_del_interested_party(exec_ctx, p->selected,
+ grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel,
pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
- grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, p->selected,
&p->checking_connectivity,
&p->connectivity_changed);
break;
@@ -342,14 +345,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
size_t i;
size_t n;
grpc_subchannel **subchannels;
- grpc_subchannel *selected;
+ grpc_connected_subchannel *selected;
gpr_mu_lock(&p->mu);
n = p->num_subchannels;
subchannels = gpr_malloc(n * sizeof(*subchannels));
selected = p->selected;
if (selected) {
- GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
+ GRPC_CONNECTED_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
}
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
@@ -358,13 +361,13 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
- if (selected == subchannels[i]) continue;
+ if (selected == grpc_subchannel_get_connected_subchannel(subchannels[i])) continue;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
}
if (p->selected) {
- grpc_subchannel_process_transport_op(exec_ctx, selected, op);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
+ grpc_connected_subchannel_process_transport_op(exec_ctx, selected, op);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
}
gpr_free(subchannels);
}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index 1ffe32fff2..08592b79e1 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -46,7 +46,7 @@ int grpc_lb_round_robin_trace = 0;
typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
- grpc_subchannel **target;
+ grpc_connected_subchannel **target;
grpc_closure *on_complete;
} pending_pick;
@@ -144,9 +144,9 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) {
/** Prepends (relative to the root at p->ready_list) the connected subchannel \a
* csc to the list of ready subchannels. */
static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
- grpc_subchannel *csc) {
+ grpc_subchannel *sc) {
ready_list *new_elem = gpr_malloc(sizeof(ready_list));
- new_elem->subchannel = csc;
+ new_elem->subchannel = sc;
if (p->ready_list.prev == NULL) {
/* first element */
new_elem->next = &p->ready_list;
@@ -160,7 +160,7 @@ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
p->ready_list.prev = new_elem;
}
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc);
+ gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, sc);
}
return new_elem;
}
@@ -265,7 +265,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_subchannel **target) {
+ grpc_connected_subchannel **target) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
size_t i;
@@ -314,7 +314,7 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
- grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
+ grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target,
grpc_closure *on_complete) {
size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
@@ -323,9 +323,9 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
gpr_mu_lock(&p->mu);
if ((selected = peek_next_connected_locked(p))) {
gpr_mu_unlock(&p->mu);
- *target = selected->subchannel;
+ *target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
- gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)",
+ gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- CONNECTED SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
/* only advance the last picked pointer if the selection was used */
@@ -390,7 +390,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
- *pp->target = selected->subchannel;
+ *pp->target = grpc_subchannel_get_connected_subchannel(selected->subchannel);
if (grpc_lb_round_robin_trace) {
gpr_log(GPR_DEBUG,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 36a2454309..6fa3c1b423 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -71,13 +71,13 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **target, grpc_closure *on_complete) {
+ grpc_connected_subchannel **target, grpc_closure *on_complete) {
return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
target, on_complete);
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_subchannel **target) {
+ grpc_connected_subchannel **target) {
policy->vtable->cancel_pick(exec_ctx, policy, target);
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index a696c3ce64..b1fb64c06c 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -58,9 +58,9 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
- grpc_subchannel **target, grpc_closure *on_complete);
+ grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_subchannel **target);
+ grpc_connected_subchannel **target);
/** try to enter a READY connectivity state */
void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
@@ -111,10 +111,10 @@ void grpc_lb_policy_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **target, grpc_closure *on_complete);
+ grpc_connected_subchannel **target, grpc_closure *on_complete);
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_subchannel **target);
+ grpc_connected_subchannel **target);
void grpc_lb_policy_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_transport_op *op);
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 49c2cf9a19..e2644be253 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -52,33 +52,29 @@
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
-typedef struct {
- /* all fields protected by subchannel->mu */
+#define GET_CONNECTED_SUBCHANNEL(subchannel, barrier) \
+ ((grpc_connected_subchannel *)(gpr_atm_##barrier##_load(&(subchannel)->connected_subchannel)))
+
+struct grpc_connected_subchannel {
/** refcount */
- int refs;
- /** parent subchannel */
- grpc_subchannel *subchannel;
-} connection;
+ gpr_refcount refs;
+};
typedef struct {
grpc_closure closure;
- size_t version;
- grpc_subchannel *subchannel;
+ union {
+ grpc_subchannel *subchannel;
+ grpc_connected_subchannel *connected_subchannel;
+ } whom;
grpc_connectivity_state connectivity_state;
} state_watcher;
-typedef struct waiting_for_connect {
- struct waiting_for_connect *next;
- grpc_closure *notify;
- grpc_pollset *pollset;
- gpr_atm *target;
- grpc_subchannel *subchannel;
- grpc_closure continuation;
-} waiting_for_connect;
-
struct grpc_subchannel {
grpc_connector *connector;
+ /** refcount */
+ gpr_refcount refs;
+
/** non-transport related channel filters */
const grpc_channel_filter **filters;
size_t num_filters;
@@ -94,8 +90,6 @@ struct grpc_subchannel {
We occasionally use this to bump the refcount on the master channel
to keep ourselves alive through an asynchronous operation. */
grpc_channel *master;
- /** have we seen a disconnection? */
- int disconnected;
/** set during connection */
grpc_connect_out_args connecting_result;
@@ -109,19 +103,16 @@ struct grpc_subchannel {
filter there-in) */
grpc_pollset_set *pollset_set;
+ /** active connection, or null; of type grpc_connected_subchannel */
+ gpr_atm connected_subchannel;
+
/** mutex protecting remaining elements */
gpr_mu mu;
- /** active connection */
- connection *active;
- /** version number for the active connection */
- size_t active_version;
- /** refcount */
- int refs;
+ /** have we seen a disconnection? */
+ int disconnected;
/** are we connecting */
int connecting;
- /** things waiting for a connection */
- waiting_for_connect *waiting;
/** connectivity state tracking */
grpc_connectivity_state_tracker state_tracker;
@@ -138,7 +129,7 @@ struct grpc_subchannel {
};
struct grpc_subchannel_call {
- connection *connection;
+ grpc_connected_subchannel *connection;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
@@ -146,27 +137,10 @@ struct grpc_subchannel_call {
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
(((grpc_subchannel_call *)(callstack)) - 1)
-static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
- connection *con,
- grpc_pollset *pollset);
-static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- const char *reason);
-static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
int iomgr_success);
-static void subchannel_ref_locked(grpc_subchannel *c
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-static int subchannel_unref_locked(
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
-static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-static grpc_subchannel *connection_unref_locked(
- grpc_exec_ctx *exec_ctx,
- connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
-static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
-
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \
subchannel_ref_locked((p), __FILE__, __LINE__, (r))
@@ -203,66 +177,35 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
* connection implementation
*/
-static void connection_destroy(grpc_exec_ctx *exec_ctx, connection *c) {
- GPR_ASSERT(c->refs == 0);
+static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
}
-static void connection_ref_locked(connection *c
+void grpc_connected_subchannel_ref(grpc_connected_subchannel *c
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
REF_LOG("CONNECTION", c);
- subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
- ++c->refs;
+ gpr_ref(&c->refs);
}
-static grpc_subchannel *connection_unref_locked(
- grpc_exec_ctx *exec_ctx, connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- grpc_subchannel *destroy = NULL;
+void grpc_connected_subchannel_unref(
+ grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
UNREF_LOG("CONNECTION", c);
- if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
- destroy = c->subchannel;
- }
- if (--c->refs == 0 && c->subchannel->active != c) {
- connection_destroy(exec_ctx, c);
+ if (gpr_unref(&c->refs)) {
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(connection_destroy, c), 1);
}
- return destroy;
}
/*
* grpc_subchannel implementation
*/
-static void subchannel_ref_locked(grpc_subchannel *c
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- REF_LOG("SUBCHANNEL", c);
- ++c->refs;
-}
-
-static int subchannel_unref_locked(grpc_subchannel *c
- GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- UNREF_LOG("SUBCHANNEL", c);
- return --c->refs == 0;
-}
-
-void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- gpr_mu_lock(&c->mu);
- subchannel_ref_locked(c REF_PASS_ARGS);
- gpr_mu_unlock(&c->mu);
-}
-
-void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
- int destroy;
- gpr_mu_lock(&c->mu);
- destroy = subchannel_unref_locked(c REF_PASS_ARGS);
- gpr_mu_unlock(&c->mu);
- if (destroy) subchannel_destroy(exec_ctx, c);
-}
-
-static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
- if (c->active != NULL) {
- connection_destroy(exec_ctx, c->active);
+static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ grpc_subchannel *c = arg;
+ grpc_connected_subchannel *con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
+ if (con != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
}
gpr_free((void *)c->filters);
grpc_channel_args_destroy(c->args);
@@ -273,6 +216,17 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
gpr_free(c);
}
+void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ gpr_ref(&c->refs);
+}
+
+void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ if (gpr_unref(&c->refs)) {
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c), 1);
+ }
+}
+
void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
grpc_pollset *pollset) {
@@ -295,7 +249,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_channel_element *parent_elem = grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(args->master));
memset(c, 0, sizeof(*c));
- c->refs = 1;
+ gpr_ref_init(&c->refs, 1);
c->connector = connector;
grpc_connector_ref(c->connector);
c->num_filters = args->filter_count;
@@ -318,60 +272,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
-static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- int iomgr_success) {
- waiting_for_connect *w4c;
- gpr_mu_lock(&subchannel->mu);
- w4c = subchannel->waiting;
- subchannel->waiting = NULL;
- gpr_mu_unlock(&subchannel->mu);
- while (w4c != NULL) {
- waiting_for_connect *next = w4c->next;
- grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
- w4c->pollset);
- if (w4c->notify) {
- w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
- }
-
- GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
- gpr_free(w4c);
-
- w4c = next;
- }
-}
-
-void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- gpr_atm *target) {
- waiting_for_connect *w4c;
- int unref_count = 0;
- gpr_mu_lock(&subchannel->mu);
- w4c = subchannel->waiting;
- subchannel->waiting = NULL;
- while (w4c != NULL) {
- waiting_for_connect *next = w4c->next;
- if (w4c->target == target) {
- grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
- w4c->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0);
-
- unref_count++;
- gpr_free(w4c);
- } else {
- w4c->next = subchannel->waiting;
- subchannel->waiting = w4c;
- }
-
- w4c = next;
- }
- gpr_mu_unlock(&subchannel->mu);
-
- while (unref_count-- > 0) {
- GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect");
- }
-}
-
static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
grpc_connect_in_args args;
@@ -381,6 +281,7 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
args.deadline = compute_connect_deadline(c);
args.channel_args = c->args;
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_CONNECTING, "state_change");
grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
&c->connected);
}
@@ -393,66 +294,6 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
continue_connect(exec_ctx, c);
}
-static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
- int call_creation_finished_ok;
- waiting_for_connect *w4c = arg;
- grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
- call_creation_finished_ok = grpc_subchannel_create_call(
- exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
- GPR_ASSERT(call_creation_finished_ok == 1);
- w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
- GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
- gpr_free(w4c);
-}
-
-int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
- grpc_pollset *pollset, gpr_atm *target,
- grpc_closure *notify) {
- connection *con;
- grpc_subchannel_call *call;
- GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0);
- gpr_mu_lock(&c->mu);
- if (c->active != NULL) {
- con = c->active;
- CONNECTION_REF_LOCKED(con, "call");
- gpr_mu_unlock(&c->mu);
-
- call = create_call(exec_ctx, con, pollset);
- if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) {
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set");
- }
- GPR_TIMER_END("grpc_subchannel_create_call", 0);
- return 1;
- } else {
- waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
- w4c->next = c->waiting;
- w4c->notify = notify;
- w4c->pollset = pollset;
- w4c->target = target;
- w4c->subchannel = c;
- /* released when clearing w4c */
- SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
- grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
- c->waiting = w4c;
- grpc_subchannel_add_interested_party(exec_ctx, c, pollset);
- if (!c->connecting) {
- c->connecting = 1;
- connectivity_state_changed_locked(exec_ctx, c, "create_call");
- /* released by connection */
- SUBCHANNEL_REF_LOCKED(c, "connecting");
- GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- gpr_mu_unlock(&c->mu);
-
- start_connect(exec_ctx, c);
- } else {
- gpr_mu_unlock(&c->mu);
- }
- GPR_TIMER_END("grpc_subchannel_create_call", 0);
- return 0;
- }
-}
-
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
@@ -472,9 +313,8 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
do_connect = 1;
c->connecting = 1;
/* released by connection */
- SUBCHANNEL_REF_LOCKED(c, "connecting");
+ GRPC_SUBCHANNEL_REF(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- connectivity_state_changed_locked(exec_ctx, c, "state_change");
}
gpr_mu_unlock(&c->mu);
@@ -483,31 +323,28 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
}
}
-int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
+void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
grpc_closure *subscribed_notify) {
- int success;
gpr_mu_lock(&c->mu);
- success = grpc_connectivity_state_change_unsubscribe(
+ grpc_connectivity_state_change_unsubscribe(
exec_ctx, &c->state_tracker, subscribed_notify);
gpr_mu_unlock(&c->mu);
- return success;
}
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
grpc_transport_op *op) {
- connection *con = NULL;
- grpc_subchannel *destroy;
+ grpc_connected_subchannel *con;
int cancel_alarm = 0;
gpr_mu_lock(&c->mu);
- if (c->active != NULL) {
- con = c->active;
- CONNECTION_REF_LOCKED(con, "transport-op");
+ con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
+ if (con != NULL) {
+ GRPC_CONNECTED_SUBCHANNEL_REF(con, "transport-op");
}
if (op->disconnect) {
c->disconnected = 1;
- connectivity_state_changed_locked(exec_ctx, c, "disconnect");
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
if (c->have_alarm) {
cancel_alarm = 1;
}
@@ -515,17 +352,8 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&c->mu);
if (con != NULL) {
- grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_channel_element *top_elem =
- grpc_channel_stack_element(channel_stack, 0);
- top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
-
- gpr_mu_lock(&c->mu);
- destroy = CONNECTION_UNREF_LOCKED(exec_ctx, con, "transport-op");
- gpr_mu_unlock(&c->mu);
- if (destroy) {
- subchannel_destroy(exec_ctx, destroy);
- }
+ grpc_connected_subchannel_process_transport_op(exec_ctx, con, op);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "transport-op");
}
if (cancel_alarm) {
@@ -537,77 +365,62 @@ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
}
}
-static void on_state_changed(grpc_exec_ctx *exec_ctx, void *p,
+void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_transport_op *op) {
+ grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_channel_element *top_elem =
+ grpc_channel_stack_element(channel_stack, 0);
+ top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
+}
+
+static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
int iomgr_success) {
state_watcher *sw = p;
- grpc_subchannel *c = sw->subchannel;
+ grpc_subchannel *c = sw->whom.subchannel;
gpr_mu *mu = &c->mu;
- int destroy;
- grpc_transport_op op;
- grpc_channel_element *elem;
- connection *destroy_connection = NULL;
gpr_mu_lock(mu);
- /* if we failed or there is a version number mismatch, just leave
- this closure */
- if (!iomgr_success || sw->subchannel->active_version != sw->version) {
- goto done;
- }
-
- switch (sw->connectivity_state) {
- case GRPC_CHANNEL_CONNECTING:
- case GRPC_CHANNEL_READY:
- case GRPC_CHANNEL_IDLE:
- /* all is still good: keep watching */
- memset(&op, 0, sizeof(op));
- op.connectivity_state = &sw->connectivity_state;
- op.on_connectivity_state_change = &sw->closure;
- elem = grpc_channel_stack_element(
- CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
- elem->filter->start_transport_op(exec_ctx, elem, &op);
- /* early out */
- gpr_mu_unlock(mu);
- return;
- case GRPC_CHANNEL_FATAL_FAILURE:
- case GRPC_CHANNEL_TRANSIENT_FAILURE:
- /* things have gone wrong, deactivate and enter idle */
- if (sw->subchannel->active->refs == 0) {
- destroy_connection = sw->subchannel->active;
- }
- sw->subchannel->active = NULL;
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
- c->disconnected
- ? GRPC_CHANNEL_FATAL_FAILURE
- : GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connection_failed");
- break;
+ /* if we failed just leave this closure */
+ if (iomgr_success) {
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, sw->connectivity_state, "reflect_child");
+ if (sw->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, GET_CONNECTED_SUBCHANNEL(c, no_barrier), &sw->connectivity_state, &sw->closure);
+ GRPC_SUBCHANNEL_REF(c, "state_watcher");
+ sw = NULL;
+ }
}
-done:
- connectivity_state_changed_locked(exec_ctx, c, "transport_state_changed");
- destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
- gpr_free(sw);
gpr_mu_unlock(mu);
- if (destroy) {
- subchannel_destroy(exec_ctx, c);
- }
- if (destroy_connection != NULL) {
- connection_destroy(exec_ctx, destroy_connection);
- }
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "state_watcher");
+ gpr_free(sw);
+}
+
+static void connected_subchannel_state_op(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) {
+ grpc_transport_op op;
+ grpc_channel_element *elem;
+ memset(&op, 0, sizeof(op));
+ op.connectivity_state = state;
+ op.on_connectivity_state_change = closure;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
+ elem->filter->start_transport_op(exec_ctx, elem, &op);
+}
+
+void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_connectivity_state *state, grpc_closure *closure) {
+ GPR_ASSERT(state != NULL);
+ connected_subchannel_state_op(exec_ctx, con, state, closure);
+}
+
+void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_closure *closure) {
+ connected_subchannel_state_op(exec_ctx, con, NULL, closure);
}
static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
size_t channel_stack_size;
- connection *con;
+ grpc_connected_subchannel *con;
grpc_channel_stack *stk;
size_t num_filters;
const grpc_channel_filter **filters;
- waiting_for_connect *w4c;
- grpc_transport_op op;
- state_watcher *sw;
- connection *destroy_connection = NULL;
- grpc_channel_element *elem;
+ state_watcher *sw_subchannel;
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
@@ -619,10 +432,9 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
/* construct channel stack */
channel_stack_size = grpc_channel_stack_size(filters, num_filters);
- con = gpr_malloc(sizeof(connection) + channel_stack_size);
+ con = gpr_malloc(sizeof(grpc_connected_subchannel) + channel_stack_size);
stk = (grpc_channel_stack *)(con + 1);
- con->refs = 0;
- con->subchannel = c;
+ gpr_ref_init(&c->refs, 1);
grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
c->mdctx, stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
@@ -630,16 +442,16 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
- sw = gpr_malloc(sizeof(*sw));
- grpc_closure_init(&sw->closure, on_state_changed, sw);
- sw->subchannel = c;
- sw->connectivity_state = GRPC_CHANNEL_READY;
+ sw_subchannel = gpr_malloc(sizeof(*sw_subchannel));
+ sw_subchannel->whom.subchannel = c;
+ sw_subchannel->connectivity_state = GRPC_CHANNEL_READY;
+ grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel);
gpr_mu_lock(&c->mu);
if (c->disconnected) {
gpr_mu_unlock(&c->mu);
- gpr_free(sw);
+ gpr_free(sw_subchannel);
gpr_free((void *)filters);
grpc_channel_stack_destroy(exec_ctx, stk);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
@@ -648,45 +460,35 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
}
/* publish */
- if (c->active != NULL && c->active->refs == 0) {
- destroy_connection = c->active;
- }
- c->active = con;
- c->active_version++;
- sw->version = c->active_version;
+ GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
- /* watch for changes; subchannel ref for connecting is donated
+ /* setup subchannel watching connected subchannel for changes; subchannel ref for connecting is donated
to the state watcher */
+ GRPC_SUBCHANNEL_REF(c, "state_watcher");
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
+ grpc_connected_subchannel_notify_on_state_change(exec_ctx, con, &sw_subchannel->connectivity_state, &sw_subchannel->closure);
+
+#if 0
+ grpc_transport_op op;
+ grpc_channel_element *elem;
+
+ /* setup connected subchannel watching transport for changes */
memset(&op, 0, sizeof(op));
- op.connectivity_state = &sw->connectivity_state;
- op.on_connectivity_state_change = &sw->closure;
+ op.connectivity_state = &sw_connected_subchannel->connectivity_state;
+ op.on_connectivity_state_change = &sw_connected_subchannel->closure;
op.bind_pollset_set = c->pollset_set;
- SUBCHANNEL_REF_LOCKED(c, "state_watcher");
- GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
- GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
elem =
- grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
+ grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
elem->filter->start_transport_op(exec_ctx, elem, &op);
+#endif
/* signal completion */
- connectivity_state_changed_locked(exec_ctx, c, "connected");
- w4c = c->waiting;
- c->waiting = NULL;
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected");
gpr_mu_unlock(&c->mu);
-
- while (w4c != NULL) {
- waiting_for_connect *next = w4c->next;
- grpc_exec_ctx_enqueue(exec_ctx, &w4c->continuation, 1);
- w4c = next;
- }
-
gpr_free((void *)filters);
-
- if (destroy_connection != NULL) {
- connection_destroy(exec_ctx, destroy_connection);
- }
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
}
/* Generate a random number between 0 and 1. */
@@ -725,13 +527,11 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
if (c->disconnected) {
iomgr_success = 0;
}
- connectivity_state_changed_locked(exec_ctx, c, "alarm");
gpr_mu_unlock(&c->mu);
if (iomgr_success) {
update_reconnect_parameters(c);
continue_connect(exec_ctx, c);
} else {
- cancel_waiting_calls(exec_ctx, c, iomgr_success);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
}
@@ -742,12 +542,14 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
grpc_subchannel *c = arg;
if (c->connecting_result.transport != NULL) {
publish_transport(exec_ctx, c);
+ } else if (c->disconnected) {
+ /* do nothing */
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- connectivity_state_changed_locked(exec_ctx, c, "connect_failed");
+ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed");
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
gpr_mu_unlock(&c->mu);
}
@@ -764,29 +566,6 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
: min_deadline;
}
-static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
- if (c->disconnected) {
- return GRPC_CHANNEL_FATAL_FAILURE;
- }
- if (c->connecting) {
- if (c->have_alarm) {
- return GRPC_CHANNEL_TRANSIENT_FAILURE;
- }
- return GRPC_CHANNEL_CONNECTING;
- }
- if (c->active) {
- return GRPC_CHANNEL_READY;
- }
- return GRPC_CHANNEL_IDLE;
-}
-
-static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- const char *reason) {
- grpc_connectivity_state current = compute_connectivity_locked(c);
- grpc_connectivity_state_set(exec_ctx, &c->state_tracker, current, reason);
-}
-
/*
* grpc_subchannel_call implementation
*/
@@ -794,17 +573,9 @@ static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
int success) {
grpc_subchannel_call *c = call;
- gpr_mu *mu = &c->connection->subchannel->mu;
- grpc_subchannel *destroy;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
- gpr_mu_lock(mu);
- destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
- gpr_mu_unlock(mu);
gpr_free(c);
- if (destroy != NULL) {
- subchannel_destroy(exec_ctx, destroy);
- }
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
}
@@ -842,8 +613,12 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
}
-static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
- connection *con,
+grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *c) {
+ return GET_CONNECTED_SUBCHANNEL(c, acq);
+}
+
+grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *con,
grpc_pollset *pollset) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 1fefa1888a..d5305f1a50 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -41,6 +41,7 @@
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
+typedef struct grpc_connected_subchannel grpc_connected_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
typedef struct grpc_subchannel_args grpc_subchannel_args;
@@ -49,6 +50,10 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) \
grpc_subchannel_unref((cl), (p), __FILE__, __LINE__, (r))
+#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) \
+ grpc_connected_subchannel_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) \
+ grpc_connected_subchannel_unref((cl), (p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \
@@ -58,6 +63,8 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#else
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
#define GRPC_SUBCHANNEL_UNREF(cl, p, r) grpc_subchannel_unref((cl), (p))
+#define GRPC_CONNECTED_SUBCHANNEL_REF(p, r) grpc_connected_subchannel_ref((p))
+#define GRPC_CONNECTED_SUBCHANNEL_UNREF(cl, p, r) grpc_connected_subchannel_unref((cl), (p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(cl, p, r) \
grpc_subchannel_call_unref((cl), (p))
@@ -69,33 +76,29 @@ void grpc_subchannel_ref(grpc_subchannel *channel
void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_connected_subchannel_ref(grpc_connected_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *channel
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-/** construct a subchannel call (possibly asynchronously).
- *
- * If the returned status is 1, the call will return immediately and \a target
- * will point to a connected \a subchannel_call instance. Note that \a notify
- * will \em not be invoked in this case.
- * Otherwise, if the returned status is 0, the subchannel call will be created
- * asynchronously, invoking the \a notify callback upon completion. */
-int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- grpc_pollset *pollset, gpr_atm *target,
- grpc_closure *notify);
-
-/** cancel \a call in the waiting state. */
-void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- gpr_atm *target);
+/** construct a subchannel call */
+grpc_subchannel_call *grpc_connected_subchannel_create_call(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *connected_subchannel,
+ grpc_pollset *pollset);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel *subchannel,
grpc_transport_op *op);
+void grpc_connected_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *subchannel,
+ grpc_transport_op *op);
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
@@ -107,13 +110,19 @@ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel,
grpc_connectivity_state *state,
grpc_closure *notify);
+void grpc_connected_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *channel,
+ grpc_connectivity_state *state,
+ grpc_closure *notify);
/** Remove \a subscribed_notify from the list of closures to be called on a
- * state change if present, returning 1. Otherwise, nothing is done and return
- * 0. */
-int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
+ * state change if present. */
+void grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel,
grpc_closure *subscribed_notify);
+void grpc_connected_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
+ grpc_connected_subchannel *channel,
+ grpc_closure *subscribed_notify);
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
@@ -124,6 +133,10 @@ void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel,
grpc_pollset *pollset);
+/** retrieve the grpc_connected_subchannel - or NULL if called before
+ the subchannel becomes connected */
+grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(grpc_subchannel *subchannel);
+
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call,
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f296ce8251..f1059801d4 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -96,7 +96,7 @@ typedef struct grpc_transport_stream_op {
typedef struct grpc_transport_op {
/** called when processing of this op is done */
grpc_closure *on_consumed;
- /** connectivity monitoring */
+ /** connectivity monitoring - set connectivity_state to NULL to unsubscribe */
grpc_closure *on_connectivity_state_change;
grpc_connectivity_state *connectivity_state;
/** should the transport be disconnected */