aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-10-13 16:17:26 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-10-13 16:17:26 -0700
commit0fe72429a03dd1ac6081ee8efe8fe7b85da7bef3 (patch)
treee18efd405fce4cb017105fedf9e1cef5585824e0 /src/core/client_config
parent7b1bd2c2713c9d505094ab728a08d4c94f15c6fa (diff)
parentbee8f104c0827bc829402d79c8302835adfc37f9 (diff)
Merge branch 'master' of github.com:grpc/grpc into microchannels
Diffstat (limited to 'src/core/client_config')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c48
-rw-r--r--src/core/client_config/subchannel.c17
-rw-r--r--src/core/client_config/subchannel.h22
-rw-r--r--src/core/client_config/uri_parser.c4
4 files changed, 76 insertions, 15 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 28155d0fbc..e5bf0680ff 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -101,6 +101,9 @@ void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
for (i = 0; i < p->num_subchannels; i++) {
GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
}
+ if (p->selected) {
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
+ }
grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
@@ -172,6 +175,35 @@ void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
}
}
+static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ pick_first_lb_policy *p = arg;
+ size_t i;
+ grpc_transport_op op;
+ size_t num_subchannels = p->num_subchannels;
+ grpc_subchannel **subchannels;
+ grpc_subchannel *exclude_subchannel;
+
+ gpr_mu_lock(&p->mu);
+ subchannels = p->subchannels;
+ p->num_subchannels = 0;
+ p->subchannels = NULL;
+ exclude_subchannel = p->selected;
+ gpr_mu_unlock(&p->mu);
+ GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
+
+ for (i = 0; i < num_subchannels; i++) {
+ if (subchannels[i] != exclude_subchannel) {
+ memset(&op, 0, sizeof(op));
+ op.disconnect = 1;
+ grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
+ }
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
+ }
+
+ gpr_free(subchannels);
+}
+
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
pick_first_lb_policy *p = arg;
@@ -200,6 +232,12 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_READY, "connecting_ready");
p->selected = p->subchannels[p->checking_subchannel];
+ GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
+ /* drop the pick list: we are connected now */
+ GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
+ grpc_exec_ctx_enqueue(exec_ctx,
+ grpc_closure_create(destroy_subchannels, p), 1);
+ /* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
@@ -279,10 +317,15 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
size_t i;
size_t n;
grpc_subchannel **subchannels;
+ grpc_subchannel *selected;
gpr_mu_lock(&p->mu);
n = p->num_subchannels;
subchannels = gpr_malloc(n * sizeof(*subchannels));
+ selected = p->selected;
+ if (selected) {
+ GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
+ }
for (i = 0; i < n; i++) {
subchannels[i] = p->subchannels[i];
GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
@@ -290,9 +333,14 @@ static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
+ if (selected == subchannels[i]) continue;
grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
}
+ if (p->selected) {
+ grpc_subchannel_process_transport_op(exec_ctx, selected, op);
+ GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
+ }
gpr_free(subchannels);
}
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index a378f06543..8494ebdc1d 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -358,18 +358,20 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
+ grpc_subchannel_call_create_status call_creation_status;
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
- grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset,
- w4c->target, w4c->notify);
+ call_creation_status = grpc_subchannel_create_call(
+ exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
+ GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY);
+ w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
- grpc_pollset *pollset,
- grpc_subchannel_call **target,
- grpc_closure *notify) {
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset,
+ grpc_subchannel_call **target, grpc_closure *notify) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@@ -378,7 +380,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
gpr_mu_unlock(&c->mu);
*target = create_call(exec_ctx, con);
- notify->cb(exec_ctx, notify->cb_arg, 1);
+ return GRPC_SUBCHANNEL_CALL_CREATE_READY;
} else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
@@ -403,6 +405,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
} else {
gpr_mu_unlock(&c->mu);
}
+ return GRPC_SUBCHANNEL_CALL_CREATE_PENDING;
}
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index f9bc0c2d2f..ec1cc7cc69 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -75,12 +75,22 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-/** construct a call (possibly asynchronously) */
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- grpc_pollset *pollset,
- grpc_subchannel_call **target,
- grpc_closure *notify);
+typedef enum {
+ GRPC_SUBCHANNEL_CALL_CREATE_READY,
+ GRPC_SUBCHANNEL_CALL_CREATE_PENDING
+} grpc_subchannel_call_create_status;
+
+/** construct a subchannel call (possibly asynchronously).
+ *
+ * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will
+ * return immediately and \a target will point to a connected \a subchannel_call
+ * instance. Note that \a notify will \em not be invoked in this case.
+ * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
+ * subchannel call will be created asynchronously, invoking the \a notify
+ * callback upon completion. */
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset,
+ grpc_subchannel_call **target, grpc_closure *notify);
/** cancel \a call in the waiting state. */
void grpc_subchannel_cancel_waiting_call(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c
index df9f32d403..cbdfffcf8e 100644
--- a/src/core/client_config/uri_parser.c
+++ b/src/core/client_config/uri_parser.c
@@ -37,6 +37,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
/** a size_t default value... maps to all 1's */
@@ -120,8 +121,7 @@ static int parse_fragment_or_query(const char *uri_text, size_t *i) {
} else {
return 1;
}
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
+ GPR_UNREACHABLE_CODE(return 0);
default:
(*i) += advance;
break;