diff options
author | Mark D. Roth <roth@google.com> | 2016-11-17 11:10:00 -0800 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-11-17 11:10:00 -0800 |
commit | 183a59fa23bb18c7efe458949a1dbde25994433d (patch) | |
tree | 160b2a81eadea3d10d0605bdf9a45a621fd7ff7b /src/core/ext/transport | |
parent | 3975768bdbbbf2d8911d1d098f235bfb8448f30e (diff) |
Clean up connector code.
Diffstat (limited to 'src/core/ext/transport')
-rw-r--r-- | src/core/ext/transport/chttp2/client/insecure/channel_create.c | 97 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/client/secure/secure_channel_create.c | 130 |
2 files changed, 160 insertions, 67 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index e0bce57fc2..f63c77c8d6 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -58,15 +58,19 @@ typedef struct { grpc_connector base; + + gpr_mu mu; gpr_refcount refs; + bool shutdown; + grpc_closure *notify; grpc_connect_in_args args; grpc_connect_out_args *result; grpc_closure initial_string_sent; grpc_slice_buffer initial_string_buffer; - grpc_endpoint *tcp; + grpc_endpoint *tcp; // Non-NULL until handshaking starts. grpc_closure connected; @@ -83,20 +87,39 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); + // If handshaking is not yet in progress, destroy the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->tcp != NULL) grpc_endpoint_destroy(exec_ctx, c->tcp); gpr_free(c); } } -static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - connector_unref(exec_ctx, arg); +static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) { + connector *c = (connector *)con; + gpr_mu_lock(&c->mu); + c->shutdown = true; + grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr); + // If handshaking is not yet in progress, shutdown the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->tcp != NULL) grpc_endpoint_shutdown(exec_ctx, c->tcp); + gpr_mu_unlock(&c->mu); } static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_handshaker_args *args = arg; connector *c = args->user_data; - if (error != GRPC_ERROR_NONE) { + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + // We were shut down after handshaking completed successfully, so + // shutdown the endpoint here. + grpc_endpoint_shutdown(exec_ctx, args->endpoint); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); grpc_endpoint_destroy(exec_ctx, args->endpoint); grpc_channel_args_destroy(args->args); gpr_free(args->read_buffer); @@ -110,52 +133,89 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } grpc_closure *notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + connector_unref(exec_ctx, (grpc_connector*)c); +} + +static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + connector *c = arg; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + connector_unref(exec_ctx, arg); + } else { + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, c->tcp, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); + c->tcp = NULL; // Endpoint handed off to handshake manager. + gpr_mu_unlock(&c->mu); + } } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_endpoint *tcp = c->tcp; - if (tcp != NULL) { + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + connector_unref(exec_ctx, arg); + } else { + GPR_ASSERT(c->tcp != NULL); if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, c); grpc_slice_buffer_init(&c->initial_string_buffer); grpc_slice_buffer_add(&c->initial_string_buffer, c->args.initial_connect_string); - connector_ref(arg); - grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, + grpc_endpoint_write(exec_ctx, c->tcp, &c->initial_string_buffer, &c->initial_string_sent); } else { grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, + exec_ctx, c->handshake_mgr, c->tcp, c->args.channel_args, c->args.deadline, NULL /* acceptor */, on_handshake_done, c); + c->tcp = NULL; // Endpoint handed off to handshake manager. } - } else { - memset(c->result, 0, sizeof(*c->result)); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); + gpr_mu_unlock(&c->mu); } } -static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {} - static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, const grpc_connect_in_args *args, grpc_connect_out_args *result, grpc_closure *notify) { connector *c = (connector *)con; + gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == NULL); GPR_ASSERT(notify->cb); c->notify = notify; c->args = *args; c->result = result; c->tcp = NULL; + connector_ref(con); // Ref taken for callback. grpc_closure_init(&c->connected, connected, c); grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp, args->interested_parties, args->channel_args, args->addr, args->deadline); + gpr_mu_unlock(&c->mu); } static const grpc_connector_vtable connector_vtable = { @@ -177,6 +237,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel( connector *c = gpr_malloc(sizeof(*c)); memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; + gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); c->handshake_mgr = grpc_handshake_manager_create(); char *proxy_name = grpc_get_http_proxy_server(); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index ff6da0a545..d8b5a199f7 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -60,8 +60,12 @@ typedef struct { grpc_connector base; + + gpr_mu mu; gpr_refcount refs; + bool shutdown; + grpc_channel_security_connector *security_connector; grpc_closure *notify; @@ -70,9 +74,7 @@ typedef struct { grpc_closure initial_string_sent; grpc_slice_buffer initial_string_buffer; - gpr_mu mu; - grpc_endpoint *connecting_endpoint; - grpc_endpoint *newly_connecting_endpoint; + grpc_endpoint *endpoint; // Non-NULL until handshaking starts. grpc_closure connected_closure; @@ -88,87 +90,116 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ + gpr_mu_destroy(&c->mu); grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); + // If handshaking is not yet in progress, destroy the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->endpoint != NULL) grpc_endpoint_destroy(exec_ctx, c->endpoint); gpr_free(c); } } +static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) { + connector *c = (connector *)con; + gpr_mu_lock(&c->mu); + c->shutdown = true; + grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr); + // If handshaking is not yet in progress, shutdown the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); + gpr_mu_unlock(&c->mu); +} + static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_handshaker_args *args = arg; connector *c = args->user_data; gpr_mu_lock(&c->mu); - if (error != GRPC_ERROR_NONE) { - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + // We were shut down after handshaking completed successfully, so + // shutdown the endpoint here. + grpc_endpoint_shutdown(exec_ctx, args->endpoint); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); grpc_endpoint_destroy(exec_ctx, args->endpoint); grpc_channel_args_destroy(args->args); gpr_free(args->read_buffer); } else { - if (c->connecting_endpoint == NULL) { - memset(c->result, 0, sizeof(*c->result)); - gpr_mu_unlock(&c->mu); - } else { - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - c->result->transport = grpc_create_chttp2_transport( - exec_ctx, args->args, args->endpoint, 1); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, - args->read_buffer); - } + c->result->transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); + GPR_ASSERT(c->result->transport); + grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, + args->read_buffer); c->result->channel_args = args->args; } grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); + gpr_mu_unlock(&c->mu); + connector_unref(exec_ctx, (grpc_connector*)c); } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + connector_unref(exec_ctx, arg); + } else { + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); + c->endpoint = NULL; // Endpoint handed off to handshake manager. + gpr_mu_unlock(&c->mu); + } } static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { connector *c = arg; - grpc_endpoint *tcp = c->newly_connecting_endpoint; - if (tcp != NULL) { - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->connecting_endpoint == NULL); - c->connecting_endpoint = tcp; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); gpr_mu_unlock(&c->mu); + connector_unref(exec_ctx, arg); + } else { + GPR_ASSERT(c->endpoint != NULL); if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, c); grpc_slice_buffer_init(&c->initial_string_buffer); grpc_slice_buffer_add(&c->initial_string_buffer, c->args.initial_connect_string); - grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, + grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer, &c->initial_string_sent); } else { grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, + exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline, NULL /* acceptor */, on_handshake_done, c); + c->endpoint = NULL; // Endpoint handed off to handshake manager. } - } else { - memset(c->result, 0, sizeof(*c->result)); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); - } -} - -static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - grpc_endpoint *ep; - gpr_mu_lock(&c->mu); - ep = c->connecting_endpoint; - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - if (ep) { - grpc_endpoint_shutdown(exec_ctx, ep); + gpr_mu_unlock(&c->mu); } } @@ -177,17 +208,18 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, grpc_connect_out_args *result, grpc_closure *notify) { connector *c = (connector *)con; + gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == NULL); c->notify = notify; c->args = *args; c->result = result; - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->connecting_endpoint == NULL); - gpr_mu_unlock(&c->mu); + GPR_ASSERT(c->endpoint == NULL); + connector_ref(con); // Ref taken for callback. grpc_closure_init(&c->connected_closure, connected, c); grpc_tcp_client_connect( - exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint, - args->interested_parties, args->channel_args, args->addr, args->deadline); + exec_ctx, &c->connected_closure, &c->endpoint, args->interested_parties, + args->channel_args, args->addr, args->deadline); + gpr_mu_unlock(&c->mu); } static const grpc_connector_vtable connector_vtable = { |