diff options
author | 2015-11-25 15:22:26 -0800 | |
---|---|---|
committer | 2015-11-25 15:22:26 -0800 | |
commit | 86c99580a0891697f3c5227ae2fd2911734098fc (patch) | |
tree | bdb4a2bf6b12e45d504ac622968b979ab4f3dec4 /src/core | |
parent | ee7531cd7c0ef681d104ab0ec83a8fb151ebb62b (diff) |
Load balancing interest management fixes
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 36 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 40 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/round_robin.c | 49 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 1 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 1 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 38 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 10 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set.h | 22 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.c | 44 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_windows.c | 8 | ||||
-rw-r--r-- | src/core/transport/transport.h | 2 |
12 files changed, 157 insertions, 98 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index ae1f3cf4c2..5fec87c67c 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -78,6 +78,8 @@ typedef struct client_channel_channel_data { int exit_idle_when_lb_policy_arrives; /** owning stack */ grpc_channel_stack *owning_stack; + /** interested parties */ + grpc_pollset_set interested_parties; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -177,6 +179,10 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->incoming_configuration = NULL; + if (lb_policy != NULL) { + grpc_pollset_set_add_pollset_set(exec_ctx, &lb_policy->interested_parties, &chand->interested_parties); + } + gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; @@ -220,6 +226,7 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (old_lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &old_lb_policy->interested_parties, &chand->interested_parties); grpc_lb_policy_shutdown(exec_ctx, old_lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); } @@ -263,6 +270,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); chand->lb_policy = NULL; @@ -391,6 +399,7 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); + grpc_pollset_set_init(&chand->interested_parties); } /* Destructor for channel_data */ @@ -403,9 +412,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); } if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, &chand->lb_policy->interested_parties, &chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); + grpc_pollset_set_destroy(&chand->interested_parties); gpr_mu_destroy(&chand->mu_config); } @@ -465,12 +476,35 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( return out; } +typedef struct { + channel_data *chand; + grpc_pollset *pollset; + grpc_closure *on_complete; + grpc_closure my_closure; +} external_connectivity_watcher; + +static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { + external_connectivity_watcher *w = arg; + grpc_closure *follow_up = w->on_complete; + grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties, w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); + gpr_free(w); + follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success); +} + void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; + external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + w->chand = chand; + w->pollset = pollset; + w->on_complete = on_complete; + grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, pollset); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w); + GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); gpr_mu_lock(&chand->mu_config); grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, on_complete); + exec_ctx, &chand->state_tracker, state, &w->my_closure); gpr_mu_unlock(&chand->mu_config); } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 4ecfc11cdd..c0f1d3fd94 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -76,24 +76,6 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } -} - -static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx, - pick_first_lb_policy *p) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_add_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); - } -} - void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; @@ -114,7 +96,6 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - del_interested_parties_locked(exec_ctx, p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; @@ -124,6 +105,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); pp = next; @@ -140,8 +122,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -161,6 +142,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } @@ -187,8 +169,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_subchannel_add_interested_party( - exec_ctx, p->subchannels[p->checking_subchannel], pollset); + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -275,8 +256,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_subchannel_del_interested_party(exec_ctx, selected_subchannel, - pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } @@ -288,15 +268,14 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connecting_transient_failure"); - del_interested_parties_locked(exec_ctx, p); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(exec_ctx, p); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { goto loop; @@ -309,13 +288,13 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_changed"); grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], + &p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(exec_ctx, p); - GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], - p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; + GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], + p->subchannels[p->num_subchannels]); GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels], "pick_first"); if (p->num_subchannels == 0) { @@ -336,7 +315,6 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(exec_ctx, p); goto loop; } } diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index ca0d6abd07..10688b3fa5 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -200,24 +200,11 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, gpr_free(node); } -static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx, - round_robin_lb_policy *p, - const size_t subchannel_idx) { - pending_pick *pp; - for (pp = p->pending_picks; pp; pp = pp->next) { - grpc_subchannel_del_interested_party( - exec_ctx, p->subchannels[subchannel_idx], pp->pollset); - } -} - void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; size_t i; ready_list *elem; for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); - } - for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin"); } gpr_free(p->connectivity_changed_cbs); @@ -243,15 +230,10 @@ void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - for (i = 0; i < p->num_subchannels; i++) { - del_interested_parties_locked(exec_ctx, p, i); - } - p->shutdown = 1; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -268,17 +250,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - size_t i; gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pp->pollset); - } + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0); gpr_free(pp); @@ -298,6 +276,7 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { for (i = 0; i < p->num_subchannels; i++) { p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE; grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i], + &p->base.interested_parties, &p->subchannel_connectivity[i], &p->connectivity_changed_cbs[i]); GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity"); @@ -316,7 +295,6 @@ void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_connected_subchannel **target, grpc_closure *on_complete) { - size_t i; round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; @@ -336,10 +314,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset, if (!p->started_picking) { start_picking(exec_ctx, p); } - for (i = 0; i < p->num_subchannels; i++) { - grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i], - pollset); - } + grpc_pollset_set_add_pollset(exec_ctx, &p->base.interested_parties, pollset); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -398,13 +373,15 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel, - pp->pollset); + grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties, pp->pollset); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, + p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_CONNECTING: @@ -412,14 +389,17 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, *this_connectivity, "connecting_changed"); grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); /* renew state notification */ grpc_subchannel_notify_on_state_change( - exec_ctx, p->subchannels[this_idx], this_connectivity, + exec_ctx, p->subchannels[this_idx], + &p->base.interested_parties, + this_connectivity, &p->connectivity_changed_cbs[this_idx]); /* remove from ready list if still present */ @@ -433,7 +413,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "connecting_transient_failure"); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(exec_ctx, p, this_idx); if (p->subchannel_index_to_readylist_node[this_idx] != NULL) { remove_disconnected_sc_locked( p, p->subchannel_index_to_readylist_node[this_idx]); diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 5605f788a5..8d9287c36a 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -37,6 +37,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable) { policy->vtable = vtable; gpr_ref_init(&policy->refs, 1); + grpc_pollset_set_init(&policy->interested_parties); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 2889b8e55d..985c96630f 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -48,6 +48,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_refcount refs; + grpc_pollset_set interested_parties; }; struct grpc_lb_policy_vtable { diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 16f9346a35..3872cacfa0 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -224,18 +224,6 @@ void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx, } } -void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_add_pollset(exec_ctx, &c->pollset_set, pollset); -} - -void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_del_pollset(exec_ctx, &c->pollset_set, pollset); -} - static gpr_uint32 random_seed() { return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); } @@ -298,14 +286,38 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } +typedef struct { + grpc_subchannel *subchannel; + grpc_pollset_set *pollset_set; + grpc_closure *notify; + grpc_closure closure; +} external_state_watcher; + +static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, int success) { + external_state_watcher *w = arg; + grpc_closure *follow_up = w->notify; + grpc_pollset_set_del_pollset_set(exec_ctx, &w->subchannel->pollset_set, w->pollset_set); + GRPC_SUBCHANNEL_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); + gpr_free(w); + follow_up->cb(exec_ctx, follow_up->cb_arg, success); +} + void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify) { int do_connect = 0; + external_state_watcher *w = gpr_malloc(sizeof(*w)); + w->subchannel = c; + w->pollset_set = interested_parties; + w->notify = notify; + grpc_closure_init(&w->closure, on_external_state_watcher_done, w); + grpc_pollset_set_add_pollset_set(exec_ctx, &c->pollset_set, interested_parties); + GRPC_SUBCHANNEL_REF(c, "external_state_watcher"); gpr_mu_lock(&c->mu); if (grpc_connectivity_state_notify_on_state_change( - exec_ctx, &c->state_tracker, state, notify)) { + exec_ctx, &c->state_tracker, state, &w->closure)) { do_connect = 1; c->connecting = 1; /* released by connection */ diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b50d1e8ecc..20d74e9f10 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -109,6 +109,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( Updates *state with the new state of the channel */ void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel, + grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify); void grpc_connected_subchannel_notify_on_state_change( @@ -124,15 +125,6 @@ void grpc_connected_subchannel_state_change_unsubscribe( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_closure *subscribed_notify); -/** express interest in \a channel's activities through \a pollset. */ -void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset *pollset); -/** stop following \a channel's activity through \a pollset. */ -void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_subchannel *channel, - grpc_pollset *pollset); - /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 0fdcba01a4..e93a3dbb56 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -49,13 +49,19 @@ #include "src/core/iomgr/pollset_set_windows.h" #endif -void grpc_pollset_set_init(grpc_pollset_set* pollset_set); -void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set); -void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, - grpc_pollset* pollset); -void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, - grpc_pollset_set* pollset_set, - grpc_pollset* pollset); +void grpc_pollset_set_init(grpc_pollset_set *pollset_set); +void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); +void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset); +void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *pollset_set, + grpc_pollset *pollset); +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item); +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index c86ed3d5da..f29ef7cdcf 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -55,6 +55,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); } gpr_free(pollset_set->pollsets); + gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); } @@ -99,6 +100,43 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&pollset_set->mu); } +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i, j; + gpr_mu_lock(&bag->mu); + if (bag->pollset_set_count == bag->pollset_set_capacity) { + bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); + bag->pollset_sets = gpr_realloc(bag->pollset_sets, bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + } + bag->pollset_sets[bag->pollset_set_count++] = item; + for (i = 0, j = 0; i < bag->fd_count; i++) { + if (grpc_fd_is_orphaned(bag->fds[i])) { + GRPC_FD_UNREF(bag->fds[i], "pollset"); + } else { + grpc_pollset_set_add_fd(exec_ctx, item, bag->fds[i]); + bag->fds[j++] = bag->fds[i]; + } + } + bag->fd_count = j; + gpr_mu_unlock(&bag->mu); +} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) { + size_t i; + gpr_mu_lock(&bag->mu); + for (i = 0; i < bag->pollset_set_count; i++) { + if (bag->pollset_sets[i] == item) { + bag->pollset_set_count--; + GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], bag->pollset_sets[bag->pollset_set_count]); + break; + } + } + gpr_mu_unlock(&bag->mu); +} + void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd) { size_t i; @@ -113,6 +151,9 @@ void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, for (i = 0; i < pollset_set->pollset_count; i++) { grpc_pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } gpr_mu_unlock(&pollset_set->mu); } @@ -129,6 +170,9 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, break; } } + for (i = 0; i < pollset_set->pollset_set_count; i++) { + grpc_pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); + } gpr_mu_unlock(&pollset_set->mu); } diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 05234fb642..4820a61e4b 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -44,6 +44,10 @@ typedef struct grpc_pollset_set { size_t pollset_capacity; grpc_pollset **pollsets; + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + size_t fd_count; size_t fd_capacity; grpc_fd **fds; diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 53d5d3dcd4..04d88839cb 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -49,4 +49,12 @@ void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, grpc_pollset_set* pollset_set, grpc_pollset* pollset) {} +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_pollset_set *bag, + grpc_pollset_set *item) {} + #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index f94f0ae76e..08f34ff0aa 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -50,7 +50,7 @@ typedef struct grpc_transport grpc_transport; for a stream. */ typedef struct grpc_stream grpc_stream; -/*#define GRPC_STREAM_REFCOUNT_DEBUG*/ +#define GRPC_STREAM_REFCOUNT_DEBUG typedef struct grpc_stream_refcount { gpr_refcount refs; |