aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/transport/chttp2
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2')
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c97
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c130
2 files changed, 160 insertions, 67 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index e0bce57fc2..f63c77c8d6 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -58,15 +58,19 @@
typedef struct {
grpc_connector base;
+
+ gpr_mu mu;
gpr_refcount refs;
+ bool shutdown;
+
grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
grpc_closure initial_string_sent;
grpc_slice_buffer initial_string_buffer;
- grpc_endpoint *tcp;
+ grpc_endpoint *tcp; // Non-NULL until handshaking starts.
grpc_closure connected;
@@ -83,20 +87,39 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
if (gpr_unref(&c->refs)) {
/* c->initial_string_buffer does not need to be destroyed */
grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
+ // If handshaking is not yet in progress, destroy the endpoint.
+ // Otherwise, the handshaker will do this for us.
+ if (c->tcp != NULL) grpc_endpoint_destroy(exec_ctx, c->tcp);
gpr_free(c);
}
}
-static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- connector_unref(exec_ctx, arg);
+static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
+ connector *c = (connector *)con;
+ gpr_mu_lock(&c->mu);
+ c->shutdown = true;
+ grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr);
+ // If handshaking is not yet in progress, shutdown the endpoint.
+ // Otherwise, the handshaker will do this for us.
+ if (c->tcp != NULL) grpc_endpoint_shutdown(exec_ctx, c->tcp);
+ gpr_mu_unlock(&c->mu);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_handshaker_args *args = arg;
connector *c = args->user_data;
- if (error != GRPC_ERROR_NONE) {
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ // We were shut down after handshaking completed successfully, so
+ // shutdown the endpoint here.
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(args->args);
gpr_free(args->read_buffer);
@@ -110,52 +133,89 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_closure *notify = c->notify;
c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ gpr_mu_unlock(&c->mu);
+ connector_unref(exec_ctx, (grpc_connector*)c);
+}
+
+static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ connector *c = arg;
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ gpr_mu_unlock(&c->mu);
+ connector_unref(exec_ctx, arg);
+ } else {
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, c->handshake_mgr, c->tcp, c->args.channel_args,
+ c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
+ c->tcp = NULL; // Endpoint handed off to handshake manager.
+ gpr_mu_unlock(&c->mu);
+ }
}
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
- grpc_endpoint *tcp = c->tcp;
- if (tcp != NULL) {
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ gpr_mu_unlock(&c->mu);
+ connector_unref(exec_ctx, arg);
+ } else {
+ GPR_ASSERT(c->tcp != NULL);
if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
c);
grpc_slice_buffer_init(&c->initial_string_buffer);
grpc_slice_buffer_add(&c->initial_string_buffer,
c->args.initial_connect_string);
- connector_ref(arg);
- grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
+ grpc_endpoint_write(exec_ctx, c->tcp, &c->initial_string_buffer,
&c->initial_string_sent);
} else {
grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, tcp, c->args.channel_args,
+ exec_ctx, c->handshake_mgr, c->tcp, c->args.channel_args,
c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
+ c->tcp = NULL; // Endpoint handed off to handshake manager.
}
- } else {
- memset(c->result, 0, sizeof(*c->result));
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
+ gpr_mu_unlock(&c->mu);
}
}
-static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {}
-
static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
grpc_closure *notify) {
connector *c = (connector *)con;
+ gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);
c->notify = notify;
c->args = *args;
c->result = result;
c->tcp = NULL;
+ connector_ref(con); // Ref taken for callback.
grpc_closure_init(&c->connected, connected, c);
grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp,
args->interested_parties, args->channel_args,
args->addr, args->deadline);
+ gpr_mu_unlock(&c->mu);
}
static const grpc_connector_vtable connector_vtable = {
@@ -177,6 +237,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
connector *c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
+ gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
c->handshake_mgr = grpc_handshake_manager_create();
char *proxy_name = grpc_get_http_proxy_server();
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index ff6da0a545..d8b5a199f7 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -60,8 +60,12 @@
typedef struct {
grpc_connector base;
+
+ gpr_mu mu;
gpr_refcount refs;
+ bool shutdown;
+
grpc_channel_security_connector *security_connector;
grpc_closure *notify;
@@ -70,9 +74,7 @@ typedef struct {
grpc_closure initial_string_sent;
grpc_slice_buffer initial_string_buffer;
- gpr_mu mu;
- grpc_endpoint *connecting_endpoint;
- grpc_endpoint *newly_connecting_endpoint;
+ grpc_endpoint *endpoint; // Non-NULL until handshaking starts.
grpc_closure connected_closure;
@@ -88,87 +90,116 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
/* c->initial_string_buffer does not need to be destroyed */
+ gpr_mu_destroy(&c->mu);
grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
+ // If handshaking is not yet in progress, destroy the endpoint.
+ // Otherwise, the handshaker will do this for us.
+ if (c->endpoint != NULL) grpc_endpoint_destroy(exec_ctx, c->endpoint);
gpr_free(c);
}
}
+static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
+ connector *c = (connector *)con;
+ gpr_mu_lock(&c->mu);
+ c->shutdown = true;
+ grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr);
+ // If handshaking is not yet in progress, shutdown the endpoint.
+ // Otherwise, the handshaker will do this for us.
+ if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint);
+ gpr_mu_unlock(&c->mu);
+}
+
static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_handshaker_args *args = arg;
connector *c = args->user_data;
gpr_mu_lock(&c->mu);
- if (error != GRPC_ERROR_NONE) {
- c->connecting_endpoint = NULL;
- gpr_mu_unlock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ // We were shut down after handshaking completed successfully, so
+ // shutdown the endpoint here.
+ grpc_endpoint_shutdown(exec_ctx, args->endpoint);
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(args->args);
gpr_free(args->read_buffer);
} else {
- if (c->connecting_endpoint == NULL) {
- memset(c->result, 0, sizeof(*c->result));
- gpr_mu_unlock(&c->mu);
- } else {
- c->connecting_endpoint = NULL;
- gpr_mu_unlock(&c->mu);
- c->result->transport = grpc_create_chttp2_transport(
- exec_ctx, args->args, args->endpoint, 1);
- grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
- args->read_buffer);
- }
+ c->result->transport =
+ grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1);
+ GPR_ASSERT(c->result->transport);
+ grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
+ args->read_buffer);
c->result->channel_args = args->args;
}
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
+ gpr_mu_unlock(&c->mu);
+ connector_unref(exec_ctx, (grpc_connector*)c);
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
connector *c = arg;
- grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args,
- c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
+ gpr_mu_unlock(&c->mu);
+ connector_unref(exec_ctx, arg);
+ } else {
+ grpc_handshake_manager_do_handshake(
+ exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
+ c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
+ c->endpoint = NULL; // Endpoint handed off to handshake manager.
+ gpr_mu_unlock(&c->mu);
+ }
}
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
- grpc_endpoint *tcp = c->newly_connecting_endpoint;
- if (tcp != NULL) {
- gpr_mu_lock(&c->mu);
- GPR_ASSERT(c->connecting_endpoint == NULL);
- c->connecting_endpoint = tcp;
+ gpr_mu_lock(&c->mu);
+ if (error != GRPC_ERROR_NONE || c->shutdown) {
+ if (error == GRPC_ERROR_NONE) {
+ error = GRPC_ERROR_CREATE("connector shutdown");
+ } else {
+ error = GRPC_ERROR_REF(error);
+ }
+ memset(c->result, 0, sizeof(*c->result));
+ grpc_closure *notify = c->notify;
+ c->notify = NULL;
+ grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
gpr_mu_unlock(&c->mu);
+ connector_unref(exec_ctx, arg);
+ } else {
+ GPR_ASSERT(c->endpoint != NULL);
if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
c);
grpc_slice_buffer_init(&c->initial_string_buffer);
grpc_slice_buffer_add(&c->initial_string_buffer,
c->args.initial_connect_string);
- grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
+ grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer,
&c->initial_string_sent);
} else {
grpc_handshake_manager_do_handshake(
- exec_ctx, c->handshake_mgr, tcp, c->args.channel_args,
+ exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
c->args.deadline, NULL /* acceptor */, on_handshake_done, c);
+ c->endpoint = NULL; // Endpoint handed off to handshake manager.
}
- } else {
- memset(c->result, 0, sizeof(*c->result));
- grpc_closure *notify = c->notify;
- c->notify = NULL;
- grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
- }
-}
-
-static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
- connector *c = (connector *)con;
- grpc_endpoint *ep;
- gpr_mu_lock(&c->mu);
- ep = c->connecting_endpoint;
- c->connecting_endpoint = NULL;
- gpr_mu_unlock(&c->mu);
- if (ep) {
- grpc_endpoint_shutdown(exec_ctx, ep);
+ gpr_mu_unlock(&c->mu);
}
}
@@ -177,17 +208,18 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
grpc_connect_out_args *result,
grpc_closure *notify) {
connector *c = (connector *)con;
+ gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == NULL);
c->notify = notify;
c->args = *args;
c->result = result;
- gpr_mu_lock(&c->mu);
- GPR_ASSERT(c->connecting_endpoint == NULL);
- gpr_mu_unlock(&c->mu);
+ GPR_ASSERT(c->endpoint == NULL);
+ connector_ref(con); // Ref taken for callback.
grpc_closure_init(&c->connected_closure, connected, c);
grpc_tcp_client_connect(
- exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint,
- args->interested_parties, args->channel_args, args->addr, args->deadline);
+ exec_ctx, &c->connected_closure, &c->endpoint, args->interested_parties,
+ args->channel_args, args->addr, args->deadline);
+ gpr_mu_unlock(&c->mu);
}
static const grpc_connector_vtable connector_vtable = {