aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-25 15:22:26 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-11-25 15:22:26 -0800
commit86c99580a0891697f3c5227ae2fd2911734098fc (patch)
treebdb4a2bf6b12e45d504ac622968b979ab4f3dec4 /src/core
parentee7531cd7c0ef681d104ab0ec83a8fb151ebb62b (diff)
Load balancing interest management fixes
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c36
-rw-r--r--src/core/client_config/lb_policies/pick_first.c40
-rw-r--r--src/core/client_config/lb_policies/round_robin.c49
-rw-r--r--src/core/client_config/lb_policy.c1
-rw-r--r--src/core/client_config/lb_policy.h1
-rw-r--r--src/core/client_config/subchannel.c38
-rw-r--r--src/core/client_config/subchannel.h10
-rw-r--r--src/core/iomgr/pollset_set.h22
-rw-r--r--src/core/iomgr/pollset_set_posix.c44
-rw-r--r--src/core/iomgr/pollset_set_posix.h4
-rw-r--r--src/core/iomgr/pollset_set_windows.c8
-rw-r--r--src/core/transport/transport.h2
12 files changed, 157 insertions, 98 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index ae1f3cf4c2..5fec87c67c 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -78,6 +78,8 @@ typedef struct client_channel_channel_data {
int exit_idle_when_lb_policy_arrives;
/** owning stack */
grpc_channel_stack *owning_stack;
+ /** interested parties */
+ grpc_pollset_set interested_parties;
} channel_data;
/** We create one watcher for each new lb_policy that is returned from a
@@ -177,6 +179,10 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
chand->incoming_configuration = NULL;
+ if (lb_policy != NULL) {
+ grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, &chand->interested_parties);
+ }
+
gpr_mu_lock(&chand->mu_config);
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
@@ -220,6 +226,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
if (old_lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties);
grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
}
@@ -263,6 +270,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties);
grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
chand->lb_policy = NULL;
@@ -391,6 +399,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
+ grpc_pollset_set_init(&chand->interested_parties);
}
/* Destructor for channel_data */
@@ -403,9 +412,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
}
if (chand->lb_policy != NULL) {
+ grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties);
GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
+ grpc_pollset_set_destroy(&chand->interested_parties);
gpr_mu_destroy(&chand->mu_config);
}
@@ -465,12 +476,35 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
return out;
}
+typedef struct {
+ channel_data *chand;
+ grpc_pollset *pollset;
+ grpc_closure *on_complete;
+ grpc_closure my_closure;
+} external_connectivity_watcher;
+
+static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
+ external_connectivity_watcher *w = arg;
+ grpc_closure *follow_up = w->on_complete;
+ grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, w->pollset);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher");
+ gpr_free(w);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success);
+}
+
void grpc_client_channel_watch_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete) {
channel_data *chand = elem->channel_data;
+ external_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ w->chand = chand;
+ w->pollset = pollset;
+ w->on_complete = on_complete;
+ grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset);
+ grpc_closure_init(&w->my_closure, on_external_watch_complete, w);
+ GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher");
gpr_mu_lock(&chand->mu_config);
grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &chand->state_tracker, state, on_complete);
+ exec_ctx, &chand->state_tracker, state, &w->my_closure);
gpr_mu_unlock(&chand->mu_config);
}
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 4ecfc11cdd..c0f1d3fd94 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -76,24 +76,6 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
- }
-}
-
-static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- pick_first_lb_policy *p) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_add_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
- }
-}
-
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
@@ -114,7 +96,6 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
- del_interested_parties_locked(exec_ctx, p);
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
@@ -124,6 +105,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
pp = next;
@@ -140,8 +122,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
@@ -161,6 +142,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
+ &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
}
@@ -187,8 +169,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_subchannel_add_interested_party(
- exec_ctx, p->subchannels[p->checking_subchannel], pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -275,8 +256,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
- grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel,
- pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
@@ -288,15 +268,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
"connecting_transient_failure");
- del_interested_parties_locked(exec_ctx, p);
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(exec_ctx, p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
+ &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
} else {
goto loop;
@@ -309,13 +288,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_changed");
grpc_subchannel_notify_on_state_change(
exec_ctx, p->subchannels[p->checking_subchannel],
+ &p->base.interested_parties,
&p->checking_connectivity, &p->connectivity_changed);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
- del_interested_parties_locked(exec_ctx, p);
- GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
- p->subchannels[p->num_subchannels - 1]);
p->num_subchannels--;
+ GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
+ p->subchannels[p->num_subchannels]);
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
"pick_first");
if (p->num_subchannels == 0) {
@@ -336,7 +315,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(exec_ctx, p);
goto loop;
}
}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index ca0d6abd07..10688b3fa5 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -200,24 +200,11 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
gpr_free(node);
}
-static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
- round_robin_lb_policy *p,
- const size_t subchannel_idx) {
- pending_pick *pp;
- for (pp = p->pending_picks; pp; pp = pp->next) {
- grpc_subchannel_del_interested_party(
- exec_ctx, p->subchannels[subchannel_idx], pp->pollset);
- }
-}
-
void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
size_t i;
ready_list *elem;
for (i = 0; i < p->num_subchannels; i++) {
- del_interested_parties_locked(exec_ctx, p, i);
- }
- for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin");
}
gpr_free(p->connectivity_changed_cbs);
@@ -243,15 +230,10 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
- size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
- for (i = 0; i < p->num_subchannels; i++) {
- del_interested_parties_locked(exec_ctx, p, i);
- }
-
p->shutdown = 1;
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
@@ -268,17 +250,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
- size_t i;
gpr_mu_lock(&p->mu);
pp = p->pending_picks;
p->pending_picks = NULL;
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- for (i = 0; i < p->num_subchannels; i++) {
- grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i],
- pp->pollset);
- }
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
gpr_free(pp);
@@ -298,6 +276,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
for (i = 0; i < p->num_subchannels; i++) {
p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE;
grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i],
+ &p->base.interested_parties,
&p->subchannel_connectivity[i],
&p->connectivity_changed_cbs[i]);
GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity");
@@ -316,7 +295,6 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_connected_subchannel **target, grpc_closure *on_complete) {
- size_t i;
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
ready_list *selected;
@@ -336,10 +314,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- for (i = 0; i < p->num_subchannels; i++) {
- grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i],
- pollset);
- }
+ grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -398,13 +373,15 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel,
- pp->pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[this_idx], this_connectivity,
+ exec_ctx,
+ p->subchannels[this_idx],
+ &p->base.interested_parties,
+ this_connectivity,
&p->connectivity_changed_cbs[this_idx]);
break;
case GRPC_CHANNEL_CONNECTING:
@@ -412,14 +389,17 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
*this_connectivity, "connecting_changed");
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[this_idx], this_connectivity,
+ exec_ctx, p->subchannels[this_idx],
+ &p->base.interested_parties,
+ this_connectivity,
&p->connectivity_changed_cbs[this_idx]);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
- del_interested_parties_locked(exec_ctx, p, this_idx);
/* renew state notification */
grpc_subchannel_notify_on_state_change(
- exec_ctx, p->subchannels[this_idx], this_connectivity,
+ exec_ctx, p->subchannels[this_idx],
+ &p->base.interested_parties,
+ this_connectivity,
&p->connectivity_changed_cbs[this_idx]);
/* remove from ready list if still present */
@@ -433,7 +413,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"connecting_transient_failure");
break;
case GRPC_CHANNEL_FATAL_FAILURE:
- del_interested_parties_locked(exec_ctx, p, this_idx);
if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
remove_disconnected_sc_locked(
p, p->subchannel_index_to_readylist_node[this_idx]);
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 5605f788a5..8d9287c36a 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -37,6 +37,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable) {
policy->vtable = vtable;
gpr_ref_init(&policy->refs, 1);
+ grpc_pollset_set_init(&policy->interested_parties);
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 2889b8e55d..985c96630f 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -48,6 +48,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
struct grpc_lb_policy {
const grpc_lb_policy_vtable *vtable;
gpr_refcount refs;
+ grpc_pollset_set interested_parties;
};
struct grpc_lb_policy_vtable {
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 16f9346a35..3872cacfa0 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -224,18 +224,6 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_pollset *pollset) {
- grpc_pollset_set_add_pollset(exec_ctx, &c->pollset_set, pollset);
-}
-
-void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *c,
- grpc_pollset *pollset) {
- grpc_pollset_set_del_pollset(exec_ctx, &c->pollset_set, pollset);
-}
-
static gpr_uint32 random_seed() {
return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
}
@@ -298,14 +286,38 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state;
}
+typedef struct {
+ grpc_subchannel *subchannel;
+ grpc_pollset_set *pollset_set;
+ grpc_closure *notify;
+ grpc_closure closure;
+} external_state_watcher;
+
+static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ external_state_watcher *w = arg;
+ grpc_closure *follow_up = w->notify;
+ grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set);
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, w->subchannel, "external_state_watcher");
+ gpr_free(w);
+ follow_up->cb(exec_ctx, follow_up->cb_arg, success);
+}
+
void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_subchannel *c,
+ grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *notify) {
int do_connect = 0;
+ external_state_watcher *w = gpr_malloc(sizeof(*w));
+ w->subchannel = c;
+ w->pollset_set = interested_parties;
+ w->notify = notify;
+ grpc_closure_init(&w->closure, on_external_state_watcher_done, w);
+ grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties);
+ GRPC_SUBCHANNEL_REF(c, "external_state_watcher");
gpr_mu_lock(&c->mu);
if (grpc_connectivity_state_notify_on_state_change(
- exec_ctx, &c->state_tracker, state, notify)) {
+ exec_ctx, &c->state_tracker, state, &w->closure)) {
do_connect = 1;
c->connecting = 1;
/* released by connection */
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index b50d1e8ecc..20d74e9f10 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -109,6 +109,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
Updates *state with the new state of the channel */
void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
grpc_subchannel *channel,
+ grpc_pollset_set *interested_parties,
grpc_connectivity_state *state,
grpc_closure *notify);
void grpc_connected_subchannel_notify_on_state_change(
@@ -124,15 +125,6 @@ void grpc_connected_subchannel_state_change_unsubscribe(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel,
grpc_closure *subscribed_notify);
-/** express interest in \a channel's activities through \a pollset. */
-void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *channel,
- grpc_pollset *pollset);
-/** stop following \a channel's activity through \a pollset. */
-void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *channel,
- grpc_pollset *pollset);
-
/** retrieve the grpc_connected_subchannel - or NULL if called before
the subchannel becomes connected */
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h
index 0fdcba01a4..e93a3dbb56 100644
--- a/src/core/iomgr/pollset_set.h
+++ b/src/core/iomgr/pollset_set.h
@@ -49,13 +49,19 @@
#include "src/core/iomgr/pollset_set_windows.h"
#endif
-void grpc_pollset_set_init(grpc_pollset_set* pollset_set);
-void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set);
-void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx,
- grpc_pollset_set* pollset_set,
- grpc_pollset* pollset);
-void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
- grpc_pollset_set* pollset_set,
- grpc_pollset* pollset);
+void grpc_pollset_set_init(grpc_pollset_set *pollset_set);
+void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set);
+void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset);
+void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset);
+void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item);
+void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */
diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c
index c86ed3d5da..f29ef7cdcf 100644
--- a/src/core/iomgr/pollset_set_posix.c
+++ b/src/core/iomgr/pollset_set_posix.c
@@ -55,6 +55,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) {
GRPC_FD_UNREF(pollset_set->fds[i], "pollset");
}
gpr_free(pollset_set->pollsets);
+ gpr_free(pollset_set->pollset_sets);
gpr_free(pollset_set->fds);
}
@@ -99,6 +100,43 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&pollset_set->mu);
}
+void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i, j;
+ gpr_mu_lock(&bag->mu);
+ if (bag->pollset_set_count == bag->pollset_set_capacity) {
+ bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
+ bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
+ }
+ bag->pollset_sets[bag->pollset_set_count++] = item;
+ for (i = 0, j = 0; i < bag->fd_count; i++) {
+ if (grpc_fd_is_orphaned(bag->fds[i])) {
+ GRPC_FD_UNREF(bag->fds[i], "pollset");
+ } else {
+ grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
+ bag->fds[j++] = bag->fds[i];
+ }
+ }
+ bag->fd_count = j;
+ gpr_mu_unlock(&bag->mu);
+}
+
+void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i;
+ gpr_mu_lock(&bag->mu);
+ for (i = 0; i < bag->pollset_set_count; i++) {
+ if (bag->pollset_sets[i] == item) {
+ bag->pollset_set_count--;
+ GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&bag->mu);
+}
+
void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
grpc_pollset_set *pollset_set, grpc_fd *fd) {
size_t i;
@@ -113,6 +151,9 @@ void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
for (i = 0; i < pollset_set->pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
}
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ grpc_pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
gpr_mu_unlock(&pollset_set->mu);
}
@@ -129,6 +170,9 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
break;
}
}
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ grpc_pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
gpr_mu_unlock(&pollset_set->mu);
}
diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h
index 05234fb642..4820a61e4b 100644
--- a/src/core/iomgr/pollset_set_posix.h
+++ b/src/core/iomgr/pollset_set_posix.h
@@ -44,6 +44,10 @@ typedef struct grpc_pollset_set {
size_t pollset_capacity;
grpc_pollset **pollsets;
+ size_t pollset_set_count;
+ size_t pollset_set_capacity;
+ struct grpc_pollset_set **pollset_sets;
+
size_t fd_count;
size_t fd_capacity;
grpc_fd **fds;
diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c
index 53d5d3dcd4..04d88839cb 100644
--- a/src/core/iomgr/pollset_set_windows.c
+++ b/src/core/iomgr/pollset_set_windows.c
@@ -49,4 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx,
grpc_pollset_set* pollset_set,
grpc_pollset* pollset) {}
+void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {}
+
+void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {}
+
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f94f0ae76e..08f34ff0aa 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -50,7 +50,7 @@ typedef struct grpc_transport grpc_transport;
for a stream. */
typedef struct grpc_stream grpc_stream;
-/*#define GRPC_STREAM_REFCOUNT_DEBUG*/
+#define GRPC_STREAM_REFCOUNT_DEBUG
typedef struct grpc_stream_refcount {
gpr_refcount refs;