diff options
author | 2015-10-13 16:17:26 -0700 | |
---|---|---|
committer | 2015-10-13 16:17:26 -0700 | |
commit | 0fe72429a03dd1ac6081ee8efe8fe7b85da7bef3 (patch) | |
tree | e18efd405fce4cb017105fedf9e1cef5585824e0 /src/core/client_config | |
parent | 7b1bd2c2713c9d505094ab728a08d4c94f15c6fa (diff) | |
parent | bee8f104c0827bc829402d79c8302835adfc37f9 (diff) |
Merge branch 'master' of github.com:grpc/grpc into microchannels
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 48 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 17 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 22 | ||||
-rw-r--r-- | src/core/client_config/uri_parser.c | 4 |
4 files changed, 76 insertions, 15 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 28155d0fbc..e5bf0680ff 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -101,6 +101,9 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); } + if (p->selected) { + GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); + } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); @@ -172,6 +175,35 @@ void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } } +static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + pick_first_lb_policy *p = arg; + size_t i; + grpc_transport_op op; + size_t num_subchannels = p->num_subchannels; + grpc_subchannel **subchannels; + grpc_subchannel *exclude_subchannel; + + gpr_mu_lock(&p->mu); + subchannels = p->subchannels; + p->num_subchannels = 0; + p->subchannels = NULL; + exclude_subchannel = p->selected; + gpr_mu_unlock(&p->mu); + GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels"); + + for (i = 0; i < num_subchannels; i++) { + if (subchannels[i] != exclude_subchannel) { + memset(&op, 0, sizeof(op)); + op.disconnect = 1; + grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op); + } + GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first"); + } + + gpr_free(subchannels); +} + static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; @@ -200,6 +232,12 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY, "connecting_ready"); p->selected = p->subchannels[p->checking_subchannel]; + GRPC_SUBCHANNEL_REF(p->selected, "picked_first"); + /* drop the pick list: we are connected now */ + GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels"); + grpc_exec_ctx_enqueue(exec_ctx, + grpc_closure_create(destroy_subchannels, p), 1); + /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; @@ -279,10 +317,15 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, size_t i; size_t n; grpc_subchannel **subchannels; + grpc_subchannel *selected; gpr_mu_lock(&p->mu); n = p->num_subchannels; subchannels = gpr_malloc(n * sizeof(*subchannels)); + selected = p->selected; + if (selected) { + GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected"); + } for (i = 0; i < n; i++) { subchannels[i] = p->subchannels[i]; GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast"); @@ -290,9 +333,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { + if (selected == subchannels[i]) continue; grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op); GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast"); } + if (p->selected) { + grpc_subchannel_process_transport_op(exec_ctx, selected, op); + GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected"); + } gpr_free(subchannels); } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a378f06543..8494ebdc1d 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -358,18 +358,20 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { + grpc_subchannel_call_create_status call_creation_status; waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); - grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset, - w4c->target, w4c->notify); + call_creation_status = grpc_subchannel_create_call( + exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); + GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY); + w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } -void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, - grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify) { +grpc_subchannel_call_create_status grpc_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset, + grpc_subchannel_call **target, grpc_closure *notify) { connection *con; gpr_mu_lock(&c->mu); if (c->active != NULL) { @@ -378,7 +380,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, gpr_mu_unlock(&c->mu); *target = create_call(exec_ctx, con); - notify->cb(exec_ctx, notify->cb_arg, 1); + return GRPC_SUBCHANNEL_CALL_CREATE_READY; } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; @@ -403,6 +405,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, } else { gpr_mu_unlock(&c->mu); } + return GRPC_SUBCHANNEL_CALL_CREATE_PENDING; } } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index f9bc0c2d2f..ec1cc7cc69 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -75,12 +75,22 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -/** construct a call (possibly asynchronously) */ -void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify); +typedef enum { + GRPC_SUBCHANNEL_CALL_CREATE_READY, + GRPC_SUBCHANNEL_CALL_CREATE_PENDING +} grpc_subchannel_call_create_status; + +/** construct a subchannel call (possibly asynchronously). + * + * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will + * return immediately and \a target will point to a connected \a subchannel_call + * instance. Note that \a notify will \em not be invoked in this case. + * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the + * subchannel call will be created asynchronously, invoking the \a notify + * callback upon completion. */ +grpc_subchannel_call_create_status grpc_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, + grpc_subchannel_call **target, grpc_closure *notify); /** cancel \a call in the waiting state. */ void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx, diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c index df9f32d403..cbdfffcf8e 100644 --- a/src/core/client_config/uri_parser.c +++ b/src/core/client_config/uri_parser.c @@ -37,6 +37,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> /** a size_t default value... maps to all 1's */ @@ -120,8 +121,7 @@ static int parse_fragment_or_query(const char *uri_text, size_t *i) { } else { return 1; } - gpr_log(GPR_ERROR, "should never reach here"); - abort(); + GPR_UNREACHABLE_CODE(return 0); default: (*i) += advance; break; |