aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-24 11:33:04 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-24 11:33:04 -0700
commitf8b14ca5f24448767c8627bb11508ba91ce608a0 (patch)
tree9c674d6529e8da204ca26021ca50d005211bc27f /src/core/surface
parentb9d3596cb1cf9406ea6b1c67eda04a497857f6b4 (diff)
parent9e71674ab942c748f24e945327424163c15b5e66 (diff)
Merge github.com:grpc/grpc into come-out-of-the-shadow
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/channel_connectivity.c16
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/init.c6
-rw-r--r--src/core/surface/secure_channel_create.c51
4 files changed, 62 insertions, 15 deletions
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 88a7c16598..5c55ad3655 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -67,6 +67,7 @@ typedef struct {
gpr_mu mu;
callback_phase phase;
int success;
+ int removed;
grpc_iomgr_closure on_complete;
grpc_alarm alarm;
grpc_connectivity_state state;
@@ -77,10 +78,6 @@ typedef struct {
} state_watcher;
static void delete_state_watcher(state_watcher *w) {
- grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
- grpc_channel_get_channel_stack(w->channel));
- grpc_client_channel_del_interested_party(client_channel_elem,
- grpc_cq_pollset(w->cq));
GRPC_CHANNEL_INTERNAL_UNREF(w->channel, "watch_connectivity");
gpr_mu_destroy(&w->mu);
gpr_free(w);
@@ -112,7 +109,17 @@ static void finished_completion(void *pw, grpc_cq_completion *ignored) {
static void partly_done(state_watcher *w, int due_to_completion) {
int delete = 0;
+ grpc_channel_element *client_channel_elem = NULL;
+ gpr_mu_lock(&w->mu);
+ if (w->removed == 0) {
+ w->removed = 1;
+ client_channel_elem = grpc_channel_stack_last_element(
+ grpc_channel_get_channel_stack(w->channel));
+ grpc_client_channel_del_interested_party(client_channel_elem,
+ grpc_cq_pollset(w->cq));
+ }
+ gpr_mu_unlock(&w->mu);
if (due_to_completion) {
gpr_mu_lock(&w->mu);
w->success = 1;
@@ -163,6 +170,7 @@ void grpc_channel_watch_connectivity_state(
w->phase = WAITING;
w->state = last_observed_state;
w->success = 0;
+ w->removed = 0;
w->cq = cq;
w->tag = tag;
w->channel = channel;
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 9e2cf1cf66..d323d0d74f 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -88,6 +88,8 @@ static void connected(void *arg, grpc_endpoint *tcp) {
grpc_iomgr_add_callback(notify);
}
+static void connector_shutdown(grpc_connector *con) {}
+
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
@@ -103,7 +105,7 @@ static void connector_connect(grpc_connector *con,
}
static const grpc_connector_vtable connector_vtable = {
- connector_ref, connector_unref, connector_connect};
+ connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
grpc_subchannel_factory base;
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 0d48cd42d7..93c27c77bf 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -40,6 +40,9 @@
#include <grpc/support/alloc.h>
#include <grpc/support/time.h>
#include "src/core/channel/channel_stack.h"
+#include "src/core/client_config/lb_policy_registry.h"
+#include "src/core/client_config/lb_policies/pick_first.h"
+#include "src/core/client_config/lb_policies/round_robin.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/client_config/resolvers/dns_resolver.h"
#include "src/core/client_config/resolvers/sockaddr_resolver.h"
@@ -85,6 +88,9 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
gpr_time_init();
+ grpc_lb_policy_registry_init(grpc_pick_first_lb_factory_create());
+ grpc_register_lb_policy(grpc_pick_first_lb_factory_create());
+ grpc_register_lb_policy(grpc_round_robin_lb_factory_create());
grpc_resolver_registry_init("dns:///");
grpc_register_resolver_type(grpc_dns_resolver_factory_create());
grpc_register_resolver_type(grpc_ipv4_resolver_factory_create());
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 6eb31ad63f..5a4cf4ad0e 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -47,7 +47,6 @@
#include "src/core/iomgr/tcp_client.h"
#include "src/core/security/auth_filters.h"
#include "src/core/security/credentials.h"
-#include "src/core/security/secure_transport_setup.h"
#include "src/core/surface/channel.h"
#include "src/core/transport/chttp2_transport.h"
#include "src/core/tsi/transport_security_interface.h"
@@ -61,6 +60,9 @@ typedef struct {
grpc_iomgr_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
+
+ gpr_mu mu;
+ grpc_endpoint *connecting_endpoint;
} connector;
static void connector_ref(grpc_connector *con) {
@@ -75,16 +77,25 @@ static void connector_unref(grpc_connector *con) {
}
}
-static void on_secure_transport_setup_done(void *arg,
- grpc_security_status status,
- grpc_endpoint *wrapped_endpoint,
- grpc_endpoint *secure_endpoint) {
+static void on_secure_handshake_done(void *arg, grpc_security_status status,
+ grpc_endpoint *wrapped_endpoint,
+ grpc_endpoint *secure_endpoint) {
connector *c = arg;
grpc_iomgr_closure *notify;
- if (status != GRPC_SECURITY_OK) {
- gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
+ gpr_mu_lock(&c->mu);
+ if (c->connecting_endpoint == NULL) {
+ memset(c->result, 0, sizeof(*c->result));
+ gpr_mu_unlock(&c->mu);
+ } else if (status != GRPC_SECURITY_OK) {
+ GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint);
+ gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status);
memset(c->result, 0, sizeof(*c->result));
+ c->connecting_endpoint = NULL;
+ gpr_mu_unlock(&c->mu);
} else {
+ GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint);
+ c->connecting_endpoint = NULL;
+ gpr_mu_unlock(&c->mu);
c->result->transport = grpc_create_chttp2_transport(
c->args.channel_args, secure_endpoint, c->args.metadata_context, 1);
grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0);
@@ -102,8 +113,12 @@ static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
grpc_iomgr_closure *notify;
if (tcp != NULL) {
- grpc_setup_secure_transport(&c->security_connector->base, tcp,
- on_secure_transport_setup_done, c);
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(c->connecting_endpoint == NULL);
+ c->connecting_endpoint = tcp;
+ gpr_mu_unlock(&c->mu);
+ grpc_security_connector_do_handshake(&c->security_connector->base, tcp,
+ on_secure_handshake_done, c);
} else {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
@@ -112,6 +127,18 @@ static void connected(void *arg, grpc_endpoint *tcp) {
}
}
+static void connector_shutdown(grpc_connector *con) {
+ connector *c = (connector *)con;
+ grpc_endpoint *ep;
+ gpr_mu_lock(&c->mu);
+ ep = c->connecting_endpoint;
+ c->connecting_endpoint = NULL;
+ gpr_mu_unlock(&c->mu);
+ if (ep) {
+ grpc_endpoint_shutdown(ep);
+ }
+}
+
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
@@ -122,12 +149,15 @@ static void connector_connect(grpc_connector *con,
c->notify = notify;
c->args = *args;
c->result = result;
+ gpr_mu_lock(&c->mu);
+ GPR_ASSERT(c->connecting_endpoint == NULL);
+ gpr_mu_unlock(&c->mu);
grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr,
args->addr_len, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {
- connector_ref, connector_unref, connector_connect};
+ connector_ref, connector_unref, connector_shutdown, connector_connect};
typedef struct {
grpc_subchannel_factory base;
@@ -165,6 +195,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
+ gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
args->mdctx = f->mdctx;
args->args = final_args;