aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/channel/client_channel.c329
-rw-r--r--src/core/client_config/client_config.c2
-rw-r--r--src/core/client_config/lb_policies/pick_first.c169
-rw-r--r--src/core/client_config/lb_policy.h2
-rw-r--r--src/core/client_config/resolver_factory.c1
-rw-r--r--src/core/client_config/resolver_registry.c79
-rw-r--r--src/core/client_config/resolver_registry.h12
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c47
-rw-r--r--src/core/client_config/resolvers/dns_resolver.h3
-rw-r--r--src/core/client_config/subchannel.c77
-rw-r--r--src/core/client_config/subchannel.h5
-rw-r--r--src/core/client_config/subchannel_factory.c12
-rw-r--r--src/core/client_config/uri_parser.c25
-rw-r--r--src/core/client_config/uri_parser.h3
-rw-r--r--src/core/surface/init.c4
-rw-r--r--test/core/end2end/fixtures/chttp2_fullstack.c1
16 files changed, 511 insertions, 260 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 9630f6898d..965d4e53dc 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -34,6 +34,7 @@
#include "src/core/channel/client_channel.h"
#include <stdio.h>
+#include <string.h>
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
@@ -75,6 +76,7 @@ typedef enum {
CALL_CREATED,
CALL_WAITING_FOR_CONFIG,
CALL_WAITING_FOR_PICK,
+ CALL_WAITING_FOR_CALL,
CALL_ACTIVE,
CALL_CANCELLED
} call_state;
@@ -87,17 +89,13 @@ struct call_data {
call_state state;
gpr_timespec deadline;
- union {
- struct {
- /* our child call stack */
- grpc_subchannel_call *subchannel_call;
- } active;
- grpc_transport_stream_op waiting_op;
- struct {
- grpc_linked_mdelem status;
- grpc_linked_mdelem details;
- } cancelled;
- } s;
+ grpc_subchannel *picked_channel;
+ grpc_iomgr_closure async_setup_task;
+ grpc_transport_stream_op waiting_op;
+ /* our child call stack */
+ grpc_subchannel_call *subchannel_call;
+ grpc_linked_mdelem status;
+ grpc_linked_mdelem details;
};
#if 0
@@ -110,9 +108,9 @@ static int prepare_activate(grpc_call_element *elem,
/* no more access to calld->s.waiting allowed */
GPR_ASSERT(calld->state == CALL_WAITING);
- if (calld->s.waiting_op.bind_pollset) {
+ if (calld->waiting_op.bind_pollset) {
grpc_transport_setup_del_interested_party(chand->transport_setup,
- calld->s.waiting_op.bind_pollset);
+ calld->waiting_op.bind_pollset);
}
calld->state = CALL_ACTIVE;
@@ -143,7 +141,7 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
if (chand->waiting_children[i] == calld) {
grpc_transport_setup_del_interested_party(
- chand->transport_setup, calld->s.waiting_op.bind_pollset);
+ chand->transport_setup, calld->waiting_op.bind_pollset);
continue;
}
chand->waiting_children[new_count++] = chand->waiting_children[i];
@@ -166,15 +164,15 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
char status[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
gpr_ltoa(GRPC_STATUS_CANCELLED, status);
- calld->s.cancelled.status.md =
+ calld->status.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
- calld->s.cancelled.details.md =
+ calld->details.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
- calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
- calld->s.cancelled.status.next = &calld->s.cancelled.details;
- calld->s.cancelled.details.prev = &calld->s.cancelled.status;
- mdb.list.head = &calld->s.cancelled.status;
- mdb.list.tail = &calld->s.cancelled.details;
+ calld->status.prev = calld->details.next = NULL;
+ calld->status.next = &calld->details;
+ calld->details.prev = &calld->status;
+ mdb.list.head = &calld->status;
+ mdb.list.tail = &calld->details;
mdb.garbage.head = mdb.garbage.tail = NULL;
mdb.deadline = gpr_inf_future;
grpc_sopb_add_metadata(op->recv_ops, mdb);
@@ -186,16 +184,111 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
}
}
-static void add_to_lb_policy_wait_queue_locked_state_config(channel_data *chand, call_data *calld) {
- abort();
+typedef struct {
+ grpc_iomgr_closure closure;
+ grpc_call_element *elem;
+} waiting_call;
+
+static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation);
+
+static void continue_with_pick(void *arg, int iomgr_success) {
+ waiting_call *wc = arg;
+ call_data *calld = wc->elem->call_data;
+ perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
+ gpr_free(wc);
+}
+
+static void add_to_lb_policy_wait_queue_locked_state_config(grpc_call_element *elem) {
+ channel_data *chand = elem->channel_data;
+ waiting_call *wc = gpr_malloc(sizeof(*wc));
+ grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
+ wc->elem = elem;
+ wc->closure.next = chand->waiting_for_config_closures;
+ chand->waiting_for_config_closures = &wc->closure;
+}
+
+static int is_empty(void *p, int len) {
+ char *ptr = p;
+ int i;
+ for (i = 0; i < len; i++) {
+ if (ptr[i] != 0) return 0;
+ }
+ return 1;
+}
+
+static void started_call(void *arg, int iomgr_success) {
+ call_data *calld = arg;
+ grpc_transport_stream_op op;
+ int have_waiting;
+
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
+ memset(&op, 0, sizeof(op));
+ op.cancel_with_status = GRPC_STATUS_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_subchannel_call_process_op(calld->subchannel_call, &op);
+ } else if (calld->state == CALL_WAITING_FOR_CALL) {
+ have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
+ if (calld->subchannel_call != NULL) {
+ calld->state = CALL_ACTIVE;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ grpc_subchannel_call_process_op(calld->subchannel_call, &calld->waiting_op);
+ }
+ } else {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ if (have_waiting) {
+ handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ }
+ }
+ } else {
+ GPR_ASSERT(calld->state == CALL_CANCELLED);
+ }
+}
+
+static void picked_target(void *arg, int iomgr_success) {
+ call_data *calld = arg;
+ channel_data *chand = calld->elem->channel_data;
+ grpc_transport_stream_op op;
+
+ if (calld->picked_channel == NULL) {
+ /* treat this like a cancellation */
+ calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
+ perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
+ } else {
+ gpr_mu_lock(&calld->mu_state);
+ if (calld->state == CALL_CANCELLED) {
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ } else {
+ GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
+ calld->state = CALL_WAITING_FOR_CALL;
+ op = calld->waiting_op;
+ memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
+ gpr_mu_unlock(&calld->mu_state);
+ grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
+ grpc_subchannel_create_call(calld->picked_channel, chand->mdctx, &op, &calld->subchannel_call, &calld->async_setup_task);
+ }
+ }
}
static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
- abort();
+ grpc_metadata_batch *initial_metadata;
+ grpc_transport_stream_op *op = &calld->waiting_op;
+
+ GPR_ASSERT(op->bind_pollset);
+ GPR_ASSERT(op->send_ops);
+ GPR_ASSERT(op->send_ops->nops >= 1);
+ GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ initial_metadata = &op->send_ops->ops[0].data.metadata;
+
+ grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
+ grpc_lb_policy_pick(lb_policy, op->bind_pollset,
+ initial_metadata, &calld->picked_channel, &calld->async_setup_task);
}
-static void cc_start_transport_stream_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, int continuation) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
@@ -206,7 +299,8 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
- subchannel_call = calld->s.active.subchannel_call;
+ GPR_ASSERT(!continuation);
+ subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_process_op(subchannel_call, op);
break;
@@ -214,13 +308,44 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
break;
+ case CALL_WAITING_FOR_CONFIG:
+ case CALL_WAITING_FOR_PICK:
+ case CALL_WAITING_FOR_CALL:
+ if (!continuation) {
+ if (op->cancel_with_status != GRPC_STATUS_OK) {
+ calld->state = CALL_CANCELLED;
+ gpr_mu_unlock(&calld->mu_state);
+ handle_op_after_cancellation(elem, op);
+ } else {
+ GPR_ASSERT((calld->waiting_op.send_ops == NULL) !=
+ (op->send_ops == NULL));
+ GPR_ASSERT((calld->waiting_op.recv_ops == NULL) !=
+ (op->recv_ops == NULL));
+ if (op->send_ops != NULL) {
+ calld->waiting_op.send_ops = op->send_ops;
+ calld->waiting_op.is_last_send = op->is_last_send;
+ calld->waiting_op.on_done_send = op->on_done_send;
+ }
+ if (op->recv_ops != NULL) {
+ calld->waiting_op.recv_ops = op->recv_ops;
+ calld->waiting_op.recv_state = op->recv_state;
+ calld->waiting_op.on_done_recv = op->on_done_recv;
+ }
+ gpr_mu_unlock(&calld->mu_state);
+ if (op->on_consumed != NULL) {
+ op->on_consumed->cb(op->on_consumed->cb_arg, 0);
+ }
+ }
+ break;
+ }
+ /* fall through */
case CALL_CREATED:
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
handle_op_after_cancellation(elem, op);
} else {
- calld->s.waiting_op = *op;
+ calld->waiting_op = *op;
gpr_mu_lock(&chand->mu_config);
lb_policy = chand->lb_policy;
@@ -235,141 +360,22 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
grpc_lb_policy_unref(lb_policy);
} else {
calld->state = CALL_WAITING_FOR_CONFIG;
- add_to_lb_policy_wait_queue_locked_state_config(chand, calld);
+ add_to_lb_policy_wait_queue_locked_state_config(elem);
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
}
}
break;
- case CALL_WAITING_FOR_CONFIG:
- case CALL_WAITING_FOR_PICK:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op);
- } else {
- GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
- (op->send_ops == NULL));
- GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
- (op->recv_ops == NULL));
- if (op->send_ops != NULL) {
- calld->s.waiting_op.send_ops = op->send_ops;
- calld->s.waiting_op.is_last_send = op->is_last_send;
- calld->s.waiting_op.on_done_send = op->on_done_send;
- }
- if (op->recv_ops != NULL) {
- calld->s.waiting_op.recv_ops = op->recv_ops;
- calld->s.waiting_op.recv_state = op->recv_state;
- calld->s.waiting_op.on_done_recv = op->on_done_recv;
- }
- gpr_mu_unlock(&calld->mu_state);
- if (op->on_consumed != NULL) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
- }
- }
- break;
}
+}
-
-
-
-#if 0
- gpr_mu_lock(&chand->mu);
- switch (calld->state) {
- case CALL_ACTIVE:
- child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
- gpr_mu_unlock(&chand->mu);
- child_elem->filter->start_transport_op(child_elem, op);
- break;
- case CALL_CREATED:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, op);
- } else {
- calld->state = CALL_WAITING;
- calld->s.waiting_op.bind_pollset = NULL;
- if (chand->active_child) {
- /* channel is connected - use the connected stack */
- if (prepare_activate(elem, chand->active_child)) {
- gpr_mu_unlock(&chand->mu);
- /* activate the request (pass it down) outside the lock */
- complete_activate(elem, op);
- } else {
- gpr_mu_unlock(&chand->mu);
- }
- } else {
- /* check to see if we should initiate a connection (if we're not
- already),
- but don't do so until outside the lock to avoid re-entrancy
- problems if
- the callback is immediate */
- int initiate_transport_setup = 0;
- if (!chand->transport_setup_initiated) {
- chand->transport_setup_initiated = 1;
- initiate_transport_setup = 1;
- }
- /* add this call to the waiting set to be resumed once we have a child
- channel stack, growing the waiting set if needed */
- if (chand->waiting_child_count == chand->waiting_child_capacity) {
- chand->waiting_child_capacity =
- GPR_MAX(chand->waiting_child_capacity * 2, 8);
- chand->waiting_children = gpr_realloc(
- chand->waiting_children,
- chand->waiting_child_capacity * sizeof(call_data *));
- }
- calld->s.waiting_op = *op;
- chand->waiting_children[chand->waiting_child_count++] = calld;
- grpc_transport_setup_add_interested_party(chand->transport_setup,
- op->bind_pollset);
- gpr_mu_unlock(&chand->mu);
-
- /* finally initiate transport setup if needed */
- if (initiate_transport_setup) {
- grpc_transport_setup_initiate(chand->transport_setup);
- }
- }
- }
- break;
- case CALL_WAITING:
- if (op->cancel_with_status != GRPC_STATUS_OK) {
- waiting_op = calld->s.waiting_op;
- remove_waiting_child(chand, calld);
- calld->state = CALL_CANCELLED;
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, &waiting_op);
- handle_op_after_cancellation(elem, op);
- } else {
- GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
- (op->send_ops == NULL));
- GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
- (op->recv_ops == NULL));
- if (op->send_ops) {
- calld->s.waiting_op.send_ops = op->send_ops;
- calld->s.waiting_op.is_last_send = op->is_last_send;
- calld->s.waiting_op.on_done_send = op->on_done_send;
- }
- if (op->recv_ops) {
- calld->s.waiting_op.recv_ops = op->recv_ops;
- calld->s.waiting_op.recv_state = op->recv_state;
- calld->s.waiting_op.on_done_recv = op->on_done_recv;
- }
- gpr_mu_unlock(&chand->mu);
- if (op->on_consumed) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
- }
- }
- break;
- case CALL_CANCELLED:
- gpr_mu_unlock(&chand->mu);
- handle_op_after_cancellation(elem, op);
- break;
- }
-#endif
+static void cc_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
+ perform_transport_stream_op(elem, op, 0);
}
static void update_state_locked(channel_data *chand) {
-
+ gpr_log(GPR_ERROR, "update_state_locked not implemented");
}
static void cc_on_config_changed(void *arg, int iomgr_success) {
@@ -382,9 +388,10 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (chand->incoming_configuration) {
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
grpc_lb_policy_ref(lb_policy);
+
+ grpc_client_config_unref(chand->incoming_configuration);
}
- grpc_client_config_unref(chand->incoming_configuration);
chand->incoming_configuration = NULL;
gpr_mu_lock(&chand->mu_config);
@@ -402,7 +409,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
wakeup_closures = next;
}
- grpc_lb_policy_unref(old_lb_policy);
+ if (old_lb_policy) {
+ grpc_lb_policy_unref(old_lb_policy);
+ }
if (iomgr_success) {
grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
@@ -511,6 +520,7 @@ static void init_call_elem(grpc_call_element *elem,
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GPR_ASSERT(server_transport_data == NULL);
+ gpr_mu_init(&calld->mu_state);
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
@@ -527,7 +537,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
gpr_mu_lock(&calld->mu_state);
switch (calld->state) {
case CALL_ACTIVE:
- subchannel_call = calld->s.active.subchannel_call;
+ subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
grpc_subchannel_call_unref(subchannel_call);
break;
@@ -537,6 +547,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
break;
case CALL_WAITING_FOR_PICK:
case CALL_WAITING_FOR_CONFIG:
+ case CALL_WAITING_FOR_CALL:
gpr_log(GPR_ERROR, "should never reach here");
abort();
break;
@@ -550,12 +561,12 @@ static void init_channel_elem(grpc_channel_element *elem,
int is_last) {
channel_data *chand = elem->channel_data;
- GPR_ASSERT(!is_first);
+ memset(chand, 0, sizeof(*chand));
+
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
gpr_mu_init(&chand->mu_config);
- chand->resolver = NULL;
chand->mdctx = metadata_context;
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
}
@@ -633,7 +644,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
- call_ops[i] = waiting_children[i]->s.waiting_op;
+ call_ops[i] = waiting_children[i]->waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
grpc_transport_stream_op_finish_with_failure(&call_ops[i]);
diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c
index e7d7647b88..bc8dcec54e 100644
--- a/src/core/client_config/client_config.c
+++ b/src/core/client_config/client_config.c
@@ -61,7 +61,7 @@ void grpc_client_config_unref(grpc_client_config *c) {
void grpc_client_config_set_lb_policy(grpc_client_config *c,
grpc_lb_policy *lb_policy) {
if (lb_policy) {
- grpc_lb_policy_ref(c->lb_policy);
+ grpc_lb_policy_ref(lb_policy);
}
if (c->lb_policy) {
grpc_lb_policy_unref(c->lb_policy);
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 46732b8444..83a25a9a72 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -32,3 +32,172 @@
*/
#include "src/core/client_config/lb_policies/pick_first.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+
+typedef struct pending_pick {
+ struct pending_pick *next;
+ grpc_pollset *pollset;
+ grpc_subchannel **target;
+ grpc_iomgr_closure *on_complete;
+} pending_pick;
+
+typedef struct {
+ /** base policy: must be first */
+ grpc_lb_policy base;
+ /** ref count */
+ gpr_refcount refs;
+ /** all our subchannels */
+ grpc_subchannel **subchannels;
+ size_t num_subchannels;
+
+ grpc_iomgr_closure connectivity_changed;
+
+ /** mutex protecting remaining members */
+ gpr_mu mu;
+ /** the selected channel
+ TODO(ctiller): this should be atomically set so we don't
+ need to take a mutex in the common case */
+ grpc_subchannel *selected;
+ /** have we started picking? */
+ int started_picking;
+ /** which subchannel are we watching? */
+ size_t checking_subchannel;
+ /** what is the connectivity of that channel? */
+ grpc_connectivity_state checking_connectivity;
+ /** list of picks that are waiting on connectivity */
+ pending_pick *pending_picks;
+} pick_first_lb_policy;
+
+void pf_ref(grpc_lb_policy *pol) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ gpr_ref(&p->refs);
+}
+
+void pf_unref(grpc_lb_policy *pol) {
+ pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
+ if (gpr_unref(&p->refs)) {
+ gpr_free(p->subchannels);
+ gpr_mu_destroy(&p->mu);
+ gpr_free(p);
+ }
+}
+
+void pf_shutdown(grpc_lb_policy *pol) {
+ /* pick_first_lb_policy *p = (pick_first_lb_policy*)pol; */
+ abort();
+}
+
+void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
+ grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
+ grpc_iomgr_closure *on_complete) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ pending_pick *pp;
+ gpr_mu_lock(&p->mu);
+ if (p->selected) {
+ gpr_mu_unlock(&p->mu);
+ *target = p->selected;
+ on_complete->cb(on_complete->cb_arg, 1);
+ } else {
+ if (!p->started_picking) {
+ p->started_picking = 1;
+ p->checking_subchannel = 0;
+ p->checking_connectivity = GRPC_CHANNEL_IDLE;
+ pf_ref(pol);
+ grpc_subchannel_notify_on_state_change(p->subchannels[0], &p->checking_connectivity, &p->connectivity_changed);
+ }
+ grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pollset);
+ pp = gpr_malloc(sizeof(*pp));
+ pp->next = p->pending_picks;
+ pp->pollset = pollset;
+ pp->target = target;
+ pp->on_complete = on_complete;
+ p->pending_picks = pp;
+ gpr_mu_unlock(&p->mu);
+ }
+}
+
+static void del_interested_parties_locked(pick_first_lb_policy *p) {
+ pending_pick *pp;
+ for (pp = p->pending_picks; pp; pp = pp->next) {
+ grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], pp->pollset);
+ }
+}
+
+static void add_interested_parties_locked(pick_first_lb_policy *p) {
+ pending_pick *pp;
+ for (pp = p->pending_picks; pp; pp = pp->next) {
+ grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], pp->pollset);
+ }
+}
+
+static void pf_connectivity_changed(void *arg, int iomgr_success) {
+ pick_first_lb_policy *p = arg;
+ pending_pick *pp;
+ int unref = 0;
+
+ gpr_mu_lock(&p->mu);
+loop:
+ switch (p->checking_connectivity) {
+ case GRPC_CHANNEL_READY:
+ p->selected = p->subchannels[p->checking_connectivity];
+ while ((pp = p->pending_picks)) {
+ p->pending_picks = pp->next;
+ *pp->target = p->selected;
+ grpc_subchannel_del_interested_party(p->selected, pp->pollset);
+ grpc_iomgr_add_delayed_callback(pp->on_complete, 1);
+ gpr_free(pp);
+ }
+ unref = 1;
+ break;
+ case GRPC_CHANNEL_TRANSIENT_FAILURE:
+ del_interested_parties_locked(p);
+ p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels;
+ p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]);
+ add_interested_parties_locked(p);
+ goto loop;
+ case GRPC_CHANNEL_CONNECTING:
+ case GRPC_CHANNEL_IDLE:
+ grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel], &p->checking_connectivity, &p->connectivity_changed);
+ break;
+ case GRPC_CHANNEL_FATAL_FAILURE:
+ del_interested_parties_locked(p);
+ GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]);
+ p->checking_subchannel %= p->num_subchannels;
+ p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]);
+ p->num_subchannels--;
+ grpc_subchannel_unref(p->subchannels[p->num_subchannels]);
+ add_interested_parties_locked(p);
+ if (p->num_subchannels == 0) {
+ abort();
+ } else {
+ goto loop;
+ }
+ }
+ gpr_mu_unlock(&p->mu);
+
+ if (unref) {
+ pf_unref(&p->base);
+ }
+}
+
+static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
+ pf_ref, pf_unref, pf_shutdown, pf_pick};
+
+grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
+ size_t num_subchannels) {
+ pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
+ GPR_ASSERT(num_subchannels);
+ memset(p, 0, sizeof(*p));
+ p->base.vtable = &pick_first_lb_policy_vtable;
+ gpr_ref_init(&p->refs, 1);
+ p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels);
+ p->num_subchannels = num_subchannels;
+ memcpy(p->subchannels, subchannels,
+ sizeof(grpc_subchannel *) * num_subchannels);
+ grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
+ gpr_mu_init(&p->mu);
+ return &p->base;
+}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 4e185d9086..42929e933b 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -45,7 +45,7 @@ typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel,
grpc_status_code status, const char *errmsg);
struct grpc_lb_policy {
- grpc_lb_policy_vtable *vtable;
+ const grpc_lb_policy_vtable *vtable;
};
struct grpc_lb_policy_vtable {
diff --git a/src/core/client_config/resolver_factory.c b/src/core/client_config/resolver_factory.c
index 378529d5b2..6721977e21 100644
--- a/src/core/client_config/resolver_factory.c
+++ b/src/core/client_config/resolver_factory.c
@@ -45,5 +45,6 @@ void grpc_resolver_factory_unref(grpc_resolver_factory *factory) {
grpc_resolver *grpc_resolver_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory) {
+ if (!factory) return NULL;
return factory->vtable->create_resolver(factory, uri, subchannel_factory);
}
diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c
index 770d4aca8e..abdb5f9377 100644
--- a/src/core/client_config/resolver_registry.c
+++ b/src/core/client_config/resolver_registry.c
@@ -46,57 +46,76 @@ typedef struct {
grpc_resolver_factory *factory;
} registered_resolver;
-static registered_resolver all_of_the_resolvers[MAX_RESOLVERS];
-static int number_of_resolvers = 0;
+static registered_resolver g_all_of_the_resolvers[MAX_RESOLVERS];
+static int g_number_of_resolvers = 0;
-void grpc_resolver_registry_init(grpc_resolver_factory *r) {
- number_of_resolvers = 0;
- grpc_register_resolver_type("default-grpc-resolver", r);
+static char *g_default_resolver_scheme;
+
+void grpc_resolver_registry_init(const char *default_resolver_scheme) {
+ g_number_of_resolvers = 0;
+ g_default_resolver_scheme = gpr_strdup(default_resolver_scheme);
}
void grpc_resolver_registry_shutdown(void) {
int i;
- for (i = 0; i < number_of_resolvers; i++) {
- gpr_free(all_of_the_resolvers[i].scheme);
- grpc_resolver_factory_unref(all_of_the_resolvers[i].factory);
+ for (i = 0; i < g_number_of_resolvers; i++) {
+ gpr_free(g_all_of_the_resolvers[i].scheme);
+ grpc_resolver_factory_unref(g_all_of_the_resolvers[i].factory);
}
+ gpr_free(g_default_resolver_scheme);
}
void grpc_register_resolver_type(const char *scheme,
grpc_resolver_factory *factory) {
int i;
- for (i = 0; i < number_of_resolvers; i++) {
- GPR_ASSERT(0 != strcmp(scheme, all_of_the_resolvers[i].scheme));
+ for (i = 0; i < g_number_of_resolvers; i++) {
+ GPR_ASSERT(0 != strcmp(scheme, g_all_of_the_resolvers[i].scheme));
}
- GPR_ASSERT(number_of_resolvers != MAX_RESOLVERS);
- all_of_the_resolvers[number_of_resolvers].scheme = gpr_strdup(scheme);
+ GPR_ASSERT(g_number_of_resolvers != MAX_RESOLVERS);
+ g_all_of_the_resolvers[g_number_of_resolvers].scheme = gpr_strdup(scheme);
grpc_resolver_factory_ref(factory);
- all_of_the_resolvers[number_of_resolvers].factory = factory;
- number_of_resolvers++;
+ g_all_of_the_resolvers[g_number_of_resolvers].factory = factory;
+ g_number_of_resolvers++;
+}
+
+static grpc_resolver_factory *lookup_factory(grpc_uri *uri) {
+ int i;
+
+ /* handling NULL uri's here simplifies grpc_resolver_create */
+ if (!uri) return NULL;
+
+ for (i = 0; i < g_number_of_resolvers; i++) {
+ if (0 == strcmp(uri->scheme, g_all_of_the_resolvers[i].scheme)) {
+ return g_all_of_the_resolvers[i].factory;
+ }
+ }
+
+ return NULL;
}
grpc_resolver *grpc_resolver_create(
const char *name, grpc_subchannel_factory *subchannel_factory) {
grpc_uri *uri;
- int i;
char *tmp;
- grpc_resolver *resolver = NULL;
- if (grpc_has_scheme(name)) {
- uri = grpc_uri_parse(name);
- if (!uri) {
- return NULL;
- }
- for (i = 0; i < number_of_resolvers; i++) {
- if (0 == strcmp(all_of_the_resolvers[i].scheme, uri->scheme)) {
- grpc_resolver_factory_create_resolver(all_of_the_resolvers[i].factory,
- uri, subchannel_factory);
- }
+ grpc_resolver_factory *factory = NULL;
+ grpc_resolver *resolver;
+
+ uri = grpc_uri_parse(name);
+ factory = lookup_factory(uri);
+ if (factory == NULL && g_default_resolver_scheme != NULL) {
+ grpc_uri_destroy(uri);
+ gpr_asprintf(&tmp, "%s%s", g_default_resolver_scheme, name);
+ uri = grpc_uri_parse(tmp);
+ factory = lookup_factory(uri);
+ if (factory == NULL) {
+ gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", name, tmp);
}
- } else {
- gpr_asprintf(&tmp, "default-grpc-resolver:%s", name);
- GPR_ASSERT(grpc_has_scheme(tmp));
- resolver = grpc_resolver_create(tmp, subchannel_factory);
gpr_free(tmp);
+ } else if (factory == NULL) {
+ gpr_log(GPR_ERROR, "don't know how to resolve '%s'", name);
}
+ resolver =
+ grpc_resolver_factory_create_resolver(factory, uri, subchannel_factory);
+ grpc_uri_destroy(uri);
return resolver;
}
diff --git a/src/core/client_config/resolver_registry.h b/src/core/client_config/resolver_registry.h
index 53335172da..31aa47620a 100644
--- a/src/core/client_config/resolver_registry.h
+++ b/src/core/client_config/resolver_registry.h
@@ -36,7 +36,7 @@
#include "src/core/client_config/resolver_factory.h"
-void grpc_resolver_registry_init(grpc_resolver_factory *default_resolver);
+void grpc_resolver_registry_init(const char *default_prefix);
void grpc_resolver_registry_shutdown(void);
/** Register a resolver type.
@@ -47,7 +47,15 @@ void grpc_resolver_registry_shutdown(void);
void grpc_register_resolver_type(const char *scheme,
grpc_resolver_factory *factory);
-/** Create a resolver given a \a uri string (with an optional scheme prefix) */
+/** Create a resolver given \a name.
+ First tries to parse \a name as a URI. If this succeeds, tries
+ to locate a registered resolver factory based on the URI scheme.
+ If parsing or location fails, prefixes default_prefix from
+ grpc_resolver_registry_init to name, and tries again (if default_prefix
+ was not NULL).
+ If a resolver factory was found, use it to instantiate a resolver and
+ return it.
+ If a resolver factory was not found, return NULL. */
grpc_resolver *grpc_resolver_create(
const char *name, grpc_subchannel_factory *subchannel_factory);
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 95f38ecab7..ba82675275 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
+#include "src/core/client_config/lb_policies/pick_first.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/support/string.h"
@@ -177,6 +178,8 @@ static void dns_start_resolving_locked(dns_resolver *r) {
static void dns_maybe_finish_next_locked(dns_resolver *r) {
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
+ *r->target_config = r->resolved_config;
+ grpc_client_config_ref(r->resolved_config);
grpc_iomgr_add_callback(r->next_completion);
r->next_completion = NULL;
r->published_version = r->resolved_version;
@@ -191,8 +194,11 @@ static void dns_destroy(dns_resolver *r) {
gpr_free(r);
}
-static grpc_resolver *dns_create(grpc_uri *uri, const char *default_port,
- grpc_subchannel_factory *subchannel_factory) {
+static grpc_resolver *dns_create(
+ grpc_uri *uri, const char *default_port,
+ grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
+ size_t num_subchannels),
+ grpc_subchannel_factory *subchannel_factory) {
dns_resolver *r;
const char *path = uri->path;
@@ -211,6 +217,7 @@ static grpc_resolver *dns_create(grpc_uri *uri, const char *default_port,
r->name = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
r->subchannel_factory = subchannel_factory;
+ r->lb_policy_factory = lb_policy_factory;
grpc_subchannel_factory_ref(subchannel_factory);
return &r->base;
}
@@ -219,43 +226,21 @@ static grpc_resolver *dns_create(grpc_uri *uri, const char *default_port,
* FACTORY
*/
-typedef struct {
- /** base: must be first */
- grpc_resolver_factory base;
- /** ref count */
- gpr_refcount refs;
- /** default port */
- char *default_port;
-} dns_resolver_factory;
-
-static void dns_factory_ref(grpc_resolver_factory *factory) {
- dns_resolver_factory *f = (dns_resolver_factory *)factory;
- gpr_ref(&f->refs);
-}
+static void dns_factory_ref(grpc_resolver_factory *factory) {}
-static void dns_factory_unref(grpc_resolver_factory *factory) {
- dns_resolver_factory *f = (dns_resolver_factory *)factory;
- if (gpr_unref(&f->refs)) {
- gpr_free(f->default_port);
- gpr_free(f);
- }
-}
+static void dns_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *dns_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory) {
- dns_resolver_factory *f = (dns_resolver_factory *)factory;
- return dns_create(uri, f->default_port, subchannel_factory);
+ return dns_create(uri, "https", grpc_create_pick_first_lb_policy,
+ subchannel_factory);
}
static const grpc_resolver_factory_vtable dns_factory_vtable = {
dns_factory_ref, dns_factory_unref, dns_factory_create_resolver};
+static grpc_resolver_factory dns_resolver_factory = {&dns_factory_vtable};
-grpc_resolver_factory *grpc_dns_resolver_factory_create(
- const char *default_port) {
- dns_resolver_factory *f = gpr_malloc(sizeof(*f));
- memset(f, 0, sizeof(*f));
- f->base.vtable = &dns_factory_vtable;
- f->default_port = gpr_strdup(default_port);
- return &f->base;
+grpc_resolver_factory *grpc_dns_resolver_factory_create() {
+ return &dns_resolver_factory;
}
diff --git a/src/core/client_config/resolvers/dns_resolver.h b/src/core/client_config/resolvers/dns_resolver.h
index 9881448fd8..09322b16cf 100644
--- a/src/core/client_config/resolvers/dns_resolver.h
+++ b/src/core/client_config/resolvers/dns_resolver.h
@@ -37,7 +37,6 @@
#include "src/core/client_config/resolver_factory.h"
/** Create a dns resolver for \a name */
-grpc_resolver_factory *grpc_dns_resolver_factory_create(
- const char *default_port);
+grpc_resolver_factory *grpc_dns_resolver_factory_create(void);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_DNS_RESOLVER_H */
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 037f0c0ab0..9637cf39fe 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -37,6 +37,13 @@
#include <grpc/support/alloc.h>
+#include "src/core/channel/channel_args.h"
+
+typedef struct {
+ gpr_refcount refs;
+ grpc_subchannel *subchannel;
+} connection;
+
struct grpc_subchannel {
gpr_refcount refs;
grpc_connector *connector;
@@ -49,24 +56,45 @@ struct grpc_subchannel {
/** address to connect to */
struct sockaddr *addr;
size_t addr_len;
+
+ /** set during connection */
+ grpc_transport *connecting_transport;
+
+ /** callback for connection finishing */
+ grpc_iomgr_closure connected;
+
+ /** mutex protecting remaining elements */
+ gpr_mu mu;
+
+ /** active connection */
+ connection *active;
+ /** are we connecting */
+ int connecting;
+ /** closures waiting for a connection */
+ grpc_iomgr_closure *waiting;
};
struct grpc_subchannel_call {
- grpc_subchannel *subchannel;
+ connection *connection;
gpr_refcount refs;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1)
+static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
+
/*
* grpc_subchannel implementation
*/
-void grpc_subchannel_ref(grpc_subchannel *channel) { gpr_ref(&channel->refs); }
+void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); }
-void grpc_subchannel_unref(grpc_subchannel *channel) {
- if (gpr_unref(&channel->refs)) {
- gpr_free(channel);
+void grpc_subchannel_unref(grpc_subchannel *c) {
+ if (gpr_unref(&c->refs)) {
+ gpr_free(c->filters);
+ grpc_channel_args_destroy(c->args);
+ gpr_free(c->addr);
+ gpr_free(c);
}
}
@@ -84,9 +112,39 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
c->addr_len = args->addr_len;
+ c->args = grpc_channel_args_copy(args->args);
+ gpr_mu_init(&c->mu);
return c;
}
+void grpc_subchannel_create_call(grpc_subchannel *c,
+ grpc_mdctx *mdctx,
+ grpc_transport_stream_op *initial_op,
+ grpc_subchannel_call **target,
+ grpc_iomgr_closure *notify) {
+ connection *con;
+ gpr_mu_lock(&c->mu);
+ if (c->active != NULL) {
+ con = c->active;
+ gpr_ref(&con->refs);
+ gpr_mu_unlock(&c->mu);
+
+ *target = create_call(con, initial_op);
+ notify->cb(notify->cb_arg, 1);
+ } else {
+ notify->next = c->waiting;
+ c->waiting = notify;
+ if (!c->connecting) {
+ c->connecting = 1;
+ gpr_mu_unlock(&c->mu);
+
+ grpc_connector_connect(c->connector, c->args, mdctx, &c->connecting_transport, &c->connected);
+ } else {
+ gpr_mu_unlock(&c->mu);
+ }
+ }
+}
+
/*
* grpc_subchannel_call implementation
*/
@@ -98,7 +156,9 @@ void grpc_subchannel_call_ref(grpc_subchannel_call *call) {
void grpc_subchannel_call_unref(grpc_subchannel_call *call) {
if (gpr_unref(&call->refs)) {
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call));
- grpc_subchannel_unref(call->subchannel);
+ if (gpr_unref(&call->connection->refs)) {
+ gpr_free(call->connection);
+ }
gpr_free(call);
}
}
@@ -109,3 +169,8 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
top_elem->filter->start_transport_stream_op(top_elem, op);
}
+
+grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
+ abort();
+ return NULL;
+}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index b4d40eadfa..8b3d82eb0a 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -59,9 +59,12 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify);
+void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset);
+void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset);
+
/** construct a call (possibly asynchronously) */
void grpc_subchannel_create_call(grpc_subchannel *subchannel,
- grpc_call_element *parent,
+ grpc_mdctx *mdctx,
grpc_transport_stream_op *initial_op,
grpc_subchannel_call **target,
grpc_iomgr_closure *notify);
diff --git a/src/core/client_config/subchannel_factory.c b/src/core/client_config/subchannel_factory.c
index 06e8a4bdaf..f71386594c 100644
--- a/src/core/client_config/subchannel_factory.c
+++ b/src/core/client_config/subchannel_factory.c
@@ -32,3 +32,15 @@
*/
#include "src/core/client_config/subchannel_factory.h"
+
+void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory) {
+ factory->vtable->ref(factory);
+}
+void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory) {
+ factory->vtable->unref(factory);
+}
+
+grpc_subchannel *grpc_subchannel_factory_create_subchannel(
+ grpc_subchannel_factory *factory, grpc_subchannel_args *args) {
+ return factory->vtable->create_subchannel(factory, args);
+}
diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c
index 24dfaae497..43b5b47f55 100644
--- a/src/core/client_config/uri_parser.c
+++ b/src/core/client_config/uri_parser.c
@@ -64,30 +64,6 @@ static char *copy_fragment(const char *src, int begin, int end) {
return out;
}
-int grpc_has_scheme(const char *uri_text) {
- int scheme_begin = 0;
- int scheme_end = -1;
- int i;
-
- for (i = scheme_begin; uri_text[i] != 0; i++) {
- if (uri_text[i] == ':') {
- scheme_end = i;
- break;
- }
- if (uri_text[i] >= 'a' && uri_text[i] <= 'z') continue;
- if (uri_text[i] >= 'A' && uri_text[i] <= 'Z') continue;
- if (i != scheme_begin) {
- if (uri_text[i] >= '0' && uri_text[i] <= '9') continue;
- if (uri_text[i] == '+') continue;
- if (uri_text[i] == '-') continue;
- if (uri_text[i] == '.') continue;
- }
- break;
- }
-
- return scheme_end != -1;
-}
-
grpc_uri *grpc_uri_parse(const char *uri_text) {
grpc_uri *uri;
int scheme_begin = 0;
@@ -162,6 +138,7 @@ grpc_uri *grpc_uri_parse(const char *uri_text) {
}
void grpc_uri_destroy(grpc_uri *uri) {
+ if (!uri) return;
gpr_free(uri->scheme);
gpr_free(uri->authority);
gpr_free(uri->path);
diff --git a/src/core/client_config/uri_parser.h b/src/core/client_config/uri_parser.h
index 0e21d568a9..b6821f9621 100644
--- a/src/core/client_config/uri_parser.h
+++ b/src/core/client_config/uri_parser.h
@@ -43,9 +43,6 @@ typedef struct {
/** parse a uri, return NULL on failure */
grpc_uri *grpc_uri_parse(const char *uri_text);
-/** return 1 if uri_text has something that is likely a scheme, 0 otherwise */
-int grpc_has_scheme(const char *uri_text);
-
/** destroy a uri */
void grpc_uri_destroy(grpc_uri *uri);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index ca61a38a35..03add81466 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -34,6 +34,8 @@
#include <grpc/census.h>
#include <grpc/grpc.h>
#include "src/core/channel/channel_stack.h"
+#include "src/core/client_config/resolver_registry.h"
+#include "src/core/client_config/resolvers/dns_resolver.h"
#include "src/core/debug/trace.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/profiling/timers.h"
@@ -56,6 +58,8 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
+ grpc_resolver_registry_init("dns:///");
+ grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create());
grpc_register_tracer("channel", &grpc_trace_channel);
grpc_register_tracer("surface", &grpc_surface_trace);
grpc_register_tracer("http", &grpc_http_trace);
diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c
index e647434509..8a1530e63b 100644
--- a/test/core/end2end/fixtures/chttp2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_fullstack.c
@@ -73,6 +73,7 @@ void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f,
grpc_channel_args *client_args) {
fullstack_fixture_data *ffd = f->fixture_data;
f->client = grpc_channel_create(ffd->localaddr, client_args);
+ GPR_ASSERT(f->client);
}
void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,