diff options
author | David Garcia Quintas <dgq@google.com> | 2016-04-27 18:40:50 -0700 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2016-04-27 18:40:50 -0700 |
commit | d312a0b866b0223bf9d5a2c88530a5cd7005c990 (patch) | |
tree | f13b5748c9d83c6177b8ae6f22422af29ef9d803 /src/core | |
parent | 879b3b9efa634ab683fccc96261af6e7838e6b31 (diff) |
Smarter pollset/pollset_set propagation
For some definition of "smart"... client_channel simply passes along
pollset/pollset_set, removing the need to instantiate a pollset_set in
the subchannel_call_holder: it's now up to the LB policies to handle the
pollset/pollset_set.
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 18 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy.c | 6 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy.h | 5 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel_call_holder.c | 3 | ||||
-rw-r--r-- | src/core/ext/lb_policy/common.c | 62 | ||||
-rw-r--r-- | src/core/ext/lb_policy/common.h | 48 | ||||
-rw-r--r-- | src/core/ext/lb_policy/pick_first/pick_first.c | 34 | ||||
-rw-r--r-- | src/core/ext/lb_policy/round_robin/round_robin.c | 29 |
8 files changed, 164 insertions, 41 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 87abfea5ff..68a203820b 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -368,9 +368,11 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, int r; GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel"); gpr_mu_unlock(&chand->mu_config); - r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset_set, - initial_metadata, initial_metadata_flags, - connected_subchannel, on_ready); + GPR_ASSERT((calld->pollset != NULL) + (calld->pollset_set != NULL) == 1); + r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset, + calld->pollset_set, initial_metadata, + initial_metadata_flags, connected_subchannel, + on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel"); return r; } @@ -453,14 +455,8 @@ static void cc_set_pollset_or_pollset_set( GPR_ASSERT(pollset != NULL || pollset_set_alternative != NULL); call_data *calld = elem->call_data; - if (pollset != NULL) { - calld->pollset = pollset; - grpc_pollset_set_add_pollset(exec_ctx, calld->pollset_set, pollset); - } else if (pollset_set_alternative != NULL) { - calld->pollset = NULL; - grpc_pollset_set_add_pollset_set(exec_ctx, calld->pollset_set, - pollset_set_alternative); - } + calld->pollset = pollset; + calld->pollset_set = pollset_set_alternative; } const grpc_channel_filter grpc_client_channel_filter = { diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c index 1e4e0077e6..96342c7c7d 100644 --- a/src/core/ext/client_config/lb_policy.c +++ b/src/core/ext/client_config/lb_policy.c @@ -99,12 +99,14 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, } int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_pollset_set *pollset_set, + grpc_pollset *pollset, + grpc_pollset_set *pollset_set_alternative, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, grpc_closure *on_complete) { - return policy->vtable->pick(exec_ctx, policy, pollset_set, initial_metadata, + return policy->vtable->pick(exec_ctx, policy, pollset, + pollset_set_alternative, initial_metadata, initial_metadata_flags, target, on_complete); } diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index d5c578836b..e9bb1850e7 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -59,7 +59,7 @@ struct grpc_lb_policy_vtable { /** implement grpc_lb_policy_pick */ int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_pollset_set *pollset_set, + grpc_pollset *pollset, grpc_pollset_set *pollset_set_alternative, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, grpc_closure *on_complete); @@ -125,7 +125,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, \a target. Picking can be asynchronous. Any IO should be done under \a pollset. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_pollset_set *pollset_set, + grpc_pollset *pollset, + grpc_pollset_set *pollset_set_alternative, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index c6f4b8f373..e07dd1a621 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -68,7 +68,7 @@ void grpc_subchannel_call_holder_init( holder->waiting_ops_capacity = 0; holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; holder->owning_call = owning_call; - holder->pollset_set = grpc_pollset_set_create(); + holder->pollset_set = NULL; } void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, @@ -82,7 +82,6 @@ void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, gpr_mu_destroy(&holder->mu); GPR_ASSERT(holder->waiting_ops_count == 0); gpr_free(holder->waiting_ops); - grpc_pollset_set_destroy(holder->pollset_set); } void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/lb_policy/common.c b/src/core/ext/lb_policy/common.c new file mode 100644 index 0000000000..bfb6aace87 --- /dev/null +++ b/src/core/ext/lb_policy/common.c @@ -0,0 +1,62 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/log.h> + +#include "src/core/ext/lb_policy/common.h" + +void add_pollset_or_pollset_set_alternative( + grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, + grpc_pollset *pollset, grpc_pollset_set *pollset_set_alternative) { + if (pollset != NULL) { + GPR_ASSERT(pollset_set_alternative == NULL); + grpc_pollset_set_add_pollset(exec_ctx, interested_parties, pollset); + } else { + GPR_ASSERT(pollset_set_alternative != NULL); + grpc_pollset_set_add_pollset_set(exec_ctx, interested_parties, + pollset_set_alternative); + } +} + +void del_pollset_or_pollset_set_alternative( + grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, + grpc_pollset *pollset, grpc_pollset_set *pollset_set_alternative) { + if (pollset != NULL) { + GPR_ASSERT(pollset_set_alternative == NULL); + grpc_pollset_set_del_pollset(exec_ctx, interested_parties, pollset); + } else { + GPR_ASSERT(pollset_set_alternative != NULL); + grpc_pollset_set_del_pollset_set(exec_ctx, interested_parties, + pollset_set_alternative); + } +} diff --git a/src/core/ext/lb_policy/common.h b/src/core/ext/lb_policy/common.h new file mode 100644 index 0000000000..1ace967328 --- /dev/null +++ b/src/core/ext/lb_policy/common.h @@ -0,0 +1,48 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_LB_POLICY_COMMON_H +#define GRPC_CORE_EXT_LB_POLICY_COMMON_H + +#include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" + +void add_pollset_or_pollset_set_alternative( + grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, + grpc_pollset *pollset, grpc_pollset_set *pollset_set_alternative); + +void del_pollset_or_pollset_set_alternative( + grpc_exec_ctx *exec_ctx, grpc_pollset_set *interested_parties, + grpc_pollset *pollset, grpc_pollset_set *pollset_set_alternative); + +#endif /* GRPC_CORE_EXT_LB_POLICY_COMMON_H */ diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 0ffa003946..f6613298be 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -35,11 +35,13 @@ #include <grpc/support/alloc.h> #include "src/core/ext/client_config/lb_policy_registry.h" +#include "src/core/ext/lb_policy/common.h" #include "src/core/lib/transport/connectivity_state.h" typedef struct pending_pick { struct pending_pick *next; - grpc_pollset_set *pollset_set; + grpc_pollset *pollset; + grpc_pollset_set *pollset_set_alternative; uint32_t initial_metadata_flags; grpc_connected_subchannel **target; grpc_closure *on_complete; @@ -118,8 +120,9 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_pollset_set_del_pollset_set(exec_ctx, p->base.interested_parties, - pp->pollset_set); + del_pollset_or_pollset_set_alternative(exec_ctx, p->base.interested_parties, + pp->pollset, + pp->pollset_set_alternative); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); pp = next; @@ -136,8 +139,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset_set(exec_ctx, p->base.interested_parties, - pp->pollset_set); + del_pollset_or_pollset_set_alternative( + exec_ctx, p->base.interested_parties, pp->pollset, + pp->pollset_set_alternative); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); @@ -162,8 +166,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_pollset_set_del_pollset_set(exec_ctx, p->base.interested_parties, - pp->pollset_set); + del_pollset_or_pollset_set_alternative( + exec_ctx, p->base.interested_parties, pp->pollset, + pp->pollset_set_alternative); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); } else { @@ -196,7 +201,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_pollset_set *pollset_set, + grpc_pollset *pollset, + grpc_pollset_set *pollset_set_alternative, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, @@ -222,11 +228,12 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset_set(exec_ctx, p->base.interested_parties, - pollset_set); + add_pollset_or_pollset_set_alternative(exec_ctx, p->base.interested_parties, + pollset, pollset_set_alternative); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollset_set = pollset_set; + pp->pollset = pollset; + pp->pollset_set_alternative = pollset_set_alternative; pp->target = target; pp->initial_metadata_flags = initial_metadata_flags; pp->on_complete = on_complete; @@ -306,8 +313,9 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; - grpc_pollset_set_del_pollset_set(exec_ctx, p->base.interested_parties, - pp->pollset_set); + del_pollset_or_pollset_set_alternative( + exec_ctx, p->base.interested_parties, pp->pollset, + pp->pollset_set_alternative); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 78ac61923f..f4e3d60a83 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -36,6 +36,7 @@ #include <grpc/support/alloc.h> #include "src/core/ext/client_config/lb_policy_registry.h" +#include "src/core/ext/lb_policy/common.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/connectivity_state.h" @@ -48,7 +49,8 @@ int grpc_lb_round_robin_trace = 0; * Once a pick is available, \a target is updated and \a on_complete called. */ typedef struct pending_pick { struct pending_pick *next; - grpc_pollset_set *pollset_set; + grpc_pollset *pollset; + grpc_pollset_set *pollset_set_alternative; uint32_t initial_metadata_flags; grpc_connected_subchannel **target; grpc_closure *on_complete; @@ -262,8 +264,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, while (pp != NULL) { pending_pick *next = pp->next; if (pp->target == target) { - grpc_pollset_set_del_pollset_set(exec_ctx, p->base.interested_parties, - pp->pollset_set); + del_pollset_or_pollset_set_alternative( + exec_ctx, p->base.interested_parties, pp->pollset, + pp->pollset_set_alternative); *target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); @@ -288,8 +291,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *next = pp->next; if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { - grpc_pollset_set_del_pollset_set(exec_ctx, p->base.interested_parties, - pp->pollset_set); + del_pollset_or_pollset_set_alternative( + exec_ctx, p->base.interested_parties, pp->pollset, + pp->pollset_set_alternative); *pp->target = NULL; grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL); gpr_free(pp); @@ -329,7 +333,8 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_pollset_set *pollset_set, + grpc_pollset *pollset, + grpc_pollset_set *pollset_set_alternative, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, grpc_connected_subchannel **target, @@ -353,11 +358,12 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (!p->started_picking) { start_picking(exec_ctx, p); } - grpc_pollset_set_add_pollset_set(exec_ctx, p->base.interested_parties, - pollset_set); + add_pollset_or_pollset_set_alternative(exec_ctx, p->base.interested_parties, + pollset, pollset_set_alternative); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; - pp->pollset_set = pollset_set; + pp->pollset = pollset; + pp->pollset_set_alternative = pollset_set_alternative; pp->target = target; pp->on_complete = on_complete; pp->initial_metadata_flags = initial_metadata_flags; @@ -406,8 +412,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)", selected->subchannel, selected); } - grpc_pollset_set_del_pollset_set(exec_ctx, p->base.interested_parties, - pp->pollset_set); + del_pollset_or_pollset_set_alternative( + exec_ctx, p->base.interested_parties, pp->pollset, + pp->pollset_set_alternative); grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL); gpr_free(pp); } |