diff options
Diffstat (limited to 'src/core/ext')
6 files changed, 598 insertions, 508 deletions
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c new file mode 100644 index 0000000000..43649376a1 --- /dev/null +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -0,0 +1,239 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/grpc.h> + +#include <string.h> + +#include <grpc/slice_buffer.h> +#include <grpc/support/alloc.h> + +#include "src/core/ext/client_channel/http_connect_handshaker.h" +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/client_channel/connector.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/security/transport/security_connector.h" + +typedef struct { + grpc_connector base; + + gpr_mu mu; + gpr_refcount refs; + + bool shutdown; + + grpc_closure *notify; + grpc_connect_in_args args; + grpc_connect_out_args *result; + grpc_closure initial_string_sent; + grpc_slice_buffer initial_string_buffer; + + grpc_endpoint *endpoint; // Non-NULL until handshaking starts. + + grpc_closure connected; + + grpc_handshake_manager *handshake_mgr; +} chttp2_connector; + +static void chttp2_connector_ref(grpc_connector *con) { + chttp2_connector *c = (chttp2_connector *)con; + gpr_ref(&c->refs); +} + +static void chttp2_connector_unref(grpc_exec_ctx *exec_ctx, + grpc_connector *con) { + chttp2_connector *c = (chttp2_connector *)con; + if (gpr_unref(&c->refs)) { + /* c->initial_string_buffer does not need to be destroyed */ + gpr_mu_destroy(&c->mu); + grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); + // If handshaking is not yet in progress, destroy the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->endpoint != NULL) grpc_endpoint_destroy(exec_ctx, c->endpoint); + gpr_free(c); + } +} + +static void chttp2_connector_shutdown(grpc_exec_ctx *exec_ctx, + grpc_connector *con) { + chttp2_connector *c = (chttp2_connector *)con; + gpr_mu_lock(&c->mu); + c->shutdown = true; + grpc_handshake_manager_shutdown(exec_ctx, c->handshake_mgr); + // If handshaking is not yet in progress, shutdown the endpoint. + // Otherwise, the handshaker will do this for us. + if (c->endpoint != NULL) grpc_endpoint_shutdown(exec_ctx, c->endpoint); + gpr_mu_unlock(&c->mu); +} + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_handshaker_args *args = arg; + chttp2_connector *c = args->user_data; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + // We were shut down after handshaking completed successfully, so + // shutdown the endpoint here. + grpc_endpoint_shutdown(exec_ctx, args->endpoint); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_endpoint_destroy(exec_ctx, args->endpoint); + grpc_channel_args_destroy(args->args); + gpr_free(args->read_buffer); + } else { + c->result->transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); + GPR_ASSERT(c->result->transport); + grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, + args->read_buffer); + c->result->channel_args = args->args; + } + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + chttp2_connector_unref(exec_ctx, (grpc_connector*)c); +} + +static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + chttp2_connector *c = arg; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + chttp2_connector_unref(exec_ctx, arg); + } else { + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); + c->endpoint = NULL; // Endpoint handed off to handshake manager. + gpr_mu_unlock(&c->mu); + } +} + +static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + chttp2_connector *c = arg; + gpr_mu_lock(&c->mu); + if (error != GRPC_ERROR_NONE || c->shutdown) { + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE("connector shutdown"); + } else { + error = GRPC_ERROR_REF(error); + } + memset(c->result, 0, sizeof(*c->result)); + grpc_closure *notify = c->notify; + c->notify = NULL; + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); + gpr_mu_unlock(&c->mu); + chttp2_connector_unref(exec_ctx, arg); + } else { + GPR_ASSERT(c->endpoint != NULL); + if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { + grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, + c); + grpc_slice_buffer_init(&c->initial_string_buffer); + grpc_slice_buffer_add(&c->initial_string_buffer, + c->args.initial_connect_string); + grpc_endpoint_write(exec_ctx, c->endpoint, &c->initial_string_buffer, + &c->initial_string_sent); + } else { + grpc_handshake_manager_do_handshake( + exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, + c->args.deadline, NULL /* acceptor */, on_handshake_done, c); + c->endpoint = NULL; // Endpoint handed off to handshake manager. + } + gpr_mu_unlock(&c->mu); + } +} + +static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, + grpc_connector *con, + const grpc_connect_in_args *args, + grpc_connect_out_args *result, + grpc_closure *notify) { + chttp2_connector *c = (chttp2_connector *)con; + gpr_mu_lock(&c->mu); + GPR_ASSERT(c->notify == NULL); + c->notify = notify; + c->args = *args; + c->result = result; + GPR_ASSERT(c->endpoint == NULL); + chttp2_connector_ref(con); // Ref taken for callback. + grpc_closure_init(&c->connected, connected, c); + grpc_tcp_client_connect(exec_ctx, &c->connected, &c->endpoint, + args->interested_parties, args->channel_args, + args->addr, args->deadline); + gpr_mu_unlock(&c->mu); +} + +static const grpc_connector_vtable chttp2_connector_vtable = { + chttp2_connector_ref, chttp2_connector_unref, chttp2_connector_shutdown, + chttp2_connector_connect}; + +grpc_connector *grpc_chttp2_connector_create( + grpc_exec_ctx *exec_ctx, const char* server_name, + grpc_channel_security_connector* security_connector) { + chttp2_connector *c = gpr_malloc(sizeof(*c)); + memset(c, 0, sizeof(*c)); + c->base.vtable = &chttp2_connector_vtable; + gpr_mu_init(&c->mu); + gpr_ref_init(&c->refs, 1); + c->handshake_mgr = grpc_handshake_manager_create(); + char *proxy_name = grpc_get_http_proxy_server(); + if (proxy_name != NULL) { + grpc_handshake_manager_add( + c->handshake_mgr, + grpc_http_connect_handshaker_create(proxy_name, server_name)); + gpr_free(proxy_name); + } + if (security_connector != NULL) { + grpc_channel_security_connector_create_handshakers( + exec_ctx, security_connector, c->handshake_mgr); + } + return &c->base; +} diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h new file mode 100644 index 0000000000..5a85d8fa7c --- /dev/null +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -0,0 +1,44 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H + +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/security/transport/security_connector.h" + +grpc_connector *grpc_chttp2_connector_create( + grpc_exec_ctx *exec_ctx, const char* server_name, + grpc_channel_security_connector* security_connector); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 00b272de27..5bb0f83f69 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -33,134 +33,16 @@ #include <grpc/grpc.h> -#include <stdlib.h> #include <string.h> -#include <grpc/slice.h> -#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/resolver_registry.h" -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" -#include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/compress_filter.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/channel/http_client_filter.h" -#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -// -// connector -// - -typedef struct { - grpc_connector base; - gpr_refcount refs; - - grpc_closure *notify; - grpc_connect_in_args args; - grpc_connect_out_args *result; - grpc_closure initial_string_sent; - grpc_slice_buffer initial_string_buffer; - - grpc_endpoint *tcp; - - grpc_closure connected; - - grpc_handshake_manager *handshake_mgr; -} connector; - -static void connector_ref(grpc_connector *con) { - connector *c = (connector *)con; - gpr_ref(&c->refs); -} - -static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - if (gpr_unref(&c->refs)) { - /* c->initial_string_buffer does not need to be destroyed */ - grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); - gpr_free(c); - } -} - -static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - connector_unref(exec_ctx, arg); -} - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_handshaker_args *args = arg; - connector *c = args->user_data; - if (error == GRPC_ERROR_NONE) { - c->result->transport = - grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); - GPR_ASSERT(c->result->transport); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, - args->read_buffer); - c->result->channel_args = args->args; - } - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); -} - -static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - connector *c = arg; - grpc_endpoint *tcp = c->tcp; - if (tcp != NULL) { - if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { - grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, - c); - grpc_slice_buffer_init(&c->initial_string_buffer); - grpc_slice_buffer_add(&c->initial_string_buffer, - c->args.initial_connect_string); - connector_ref(arg); - grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, - &c->initial_string_sent); - } else { - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); - } - } else { - memset(c->result, 0, sizeof(*c->result)); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); - } -} - -static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {} - -static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, - const grpc_connect_in_args *args, - grpc_connect_out_args *result, - grpc_closure *notify) { - connector *c = (connector *)con; - GPR_ASSERT(c->notify == NULL); - GPR_ASSERT(notify->cb); - c->notify = notify; - c->args = *args; - c->result = result; - c->tcp = NULL; - grpc_closure_init(&c->connected, connected, c); - grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp, - args->interested_parties, args->channel_args, - args->addr, args->deadline); -} - -static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_shutdown, connector_connect}; - -// -// client_channel_factory -// - static void client_channel_factory_ref( grpc_client_channel_factory *cc_factory) {} @@ -170,20 +52,10 @@ static void client_channel_factory_unref( static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { - connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - c->base.vtable = &connector_vtable; - gpr_ref_init(&c->refs, 1); - c->handshake_mgr = grpc_handshake_manager_create(); - char *proxy_name = grpc_get_http_proxy_server(); - if (proxy_name != NULL) { - grpc_handshake_manager_add( - c->handshake_mgr, - grpc_http_connect_handshaker_create(proxy_name, args->server_name)); - gpr_free(proxy_name); - } - grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args); - grpc_connector_unref(exec_ctx, &c->base); + grpc_connector *connector = grpc_chttp2_connector_create( + exec_ctx, args->server_name, NULL /* security_connector */); + grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); + grpc_connector_unref(exec_ctx, connector); return s; } @@ -194,16 +66,14 @@ static grpc_channel *client_channel_factory_create_channel( grpc_channel *channel = grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL); grpc_resolver *resolver = grpc_resolver_create(target, args); - if (!resolver) { + if (resolver == NULL) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "client_channel_factory_create_channel"); return NULL; } - grpc_client_channel_finish_initialization( exec_ctx, grpc_channel_get_channel_stack(channel), resolver, cc_factory); GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel"); - return channel; } @@ -226,16 +96,13 @@ grpc_channel *grpc_insecure_channel_create(const char *target, GRPC_API_TRACE( "grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3, (target, args, reserved)); - GPR_ASSERT(!reserved); - + GPR_ASSERT(reserved == NULL); grpc_client_channel_factory *factory = (grpc_client_channel_factory *)&client_channel_factory; grpc_channel *channel = client_channel_factory_create_channel( &exec_ctx, factory, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, args); - grpc_client_channel_factory_unref(&exec_ctx, factory); grpc_exec_ctx_finish(&exec_ctx); - return channel != NULL ? channel : grpc_lame_client_channel_create( target, GRPC_STATUS_INTERNAL, "Failed to create client channel"); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index b4a30f94fc..a5cc1633ae 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -33,194 +33,18 @@ #include <grpc/grpc.h> -#include <stdlib.h> #include <string.h> -#include <grpc/slice.h> -#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include "src/core/ext/client_channel/client_channel.h" -#include "src/core/ext/client_channel/http_connect_handshaker.h" #include "src/core/ext/client_channel/resolver_registry.h" -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/iomgr/tcp_client.h" -#include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/security/transport/auth_filters.h" +#include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" -#include "src/core/lib/tsi/transport_security_interface.h" - -// -// connector -// - -typedef struct { - grpc_connector base; - gpr_refcount refs; - - grpc_channel_security_connector *security_connector; - - grpc_closure *notify; - grpc_connect_in_args args; - grpc_connect_out_args *result; - grpc_closure initial_string_sent; - grpc_slice_buffer initial_string_buffer; - - gpr_mu mu; - grpc_endpoint *connecting_endpoint; - grpc_endpoint *newly_connecting_endpoint; - - grpc_closure connected_closure; - - grpc_handshake_manager *handshake_mgr; - - // TODO(roth): Remove once we eliminate on_secure_handshake_done(). - grpc_channel_args *tmp_args; -} connector; - -static void connector_ref(grpc_connector *con) { - connector *c = (connector *)con; - gpr_ref(&c->refs); -} - -static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - if (gpr_unref(&c->refs)) { - /* c->initial_string_buffer does not need to be destroyed */ - grpc_channel_args_destroy(c->tmp_args); - grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); - gpr_free(c); - } -} - -static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_security_status status, - grpc_endpoint *secure_endpoint, - grpc_auth_context *auth_context) { - connector *c = arg; - gpr_mu_lock(&c->mu); - grpc_error *error = GRPC_ERROR_NONE; - if (c->connecting_endpoint == NULL) { - memset(c->result, 0, sizeof(*c->result)); - gpr_mu_unlock(&c->mu); - } else if (status != GRPC_SECURITY_OK) { - error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"), - GRPC_ERROR_INT_SECURITY_STATUS, status); - memset(c->result, 0, sizeof(*c->result)); - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - } else { - grpc_arg auth_context_arg; - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - c->result->transport = grpc_create_chttp2_transport( - exec_ctx, c->args.channel_args, secure_endpoint, 1); - grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL); - auth_context_arg = grpc_auth_context_to_arg(auth_context); - c->result->channel_args = - grpc_channel_args_copy_and_add(c->tmp_args, &auth_context_arg, 1); - } - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); -} - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_handshaker_args *args = arg; - connector *c = args->user_data; - c->tmp_args = args->args; - if (error != GRPC_ERROR_NONE) { - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); - } else { - // TODO(roth, jboeuf): Convert security connector handshaking to use new - // handshake API, and then move the code from on_secure_handshake_done() - // into this function. - grpc_channel_security_connector_do_handshake( - exec_ctx, c->security_connector, args->endpoint, args->read_buffer, - c->args.deadline, on_secure_handshake_done, c); - } -} - -static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - connector *c = arg; - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, c->connecting_endpoint, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); -} - -static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - connector *c = arg; - grpc_endpoint *tcp = c->newly_connecting_endpoint; - if (tcp != NULL) { - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->connecting_endpoint == NULL); - c->connecting_endpoint = tcp; - gpr_mu_unlock(&c->mu); - if (!GRPC_SLICE_IS_EMPTY(c->args.initial_connect_string)) { - grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent, - c); - grpc_slice_buffer_init(&c->initial_string_buffer); - grpc_slice_buffer_add(&c->initial_string_buffer, - c->args.initial_connect_string); - grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer, - &c->initial_string_sent); - } else { - grpc_handshake_manager_do_handshake( - exec_ctx, c->handshake_mgr, tcp, c->args.channel_args, - c->args.deadline, NULL /* acceptor */, on_handshake_done, c); - } - } else { - memset(c->result, 0, sizeof(*c->result)); - grpc_closure *notify = c->notify; - c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); - } -} - -static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) { - connector *c = (connector *)con; - grpc_endpoint *ep; - gpr_mu_lock(&c->mu); - ep = c->connecting_endpoint; - c->connecting_endpoint = NULL; - gpr_mu_unlock(&c->mu); - if (ep) { - grpc_endpoint_shutdown(exec_ctx, ep); - } -} - -static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, - const grpc_connect_in_args *args, - grpc_connect_out_args *result, - grpc_closure *notify) { - connector *c = (connector *)con; - GPR_ASSERT(c->notify == NULL); - c->notify = notify; - c->args = *args; - c->result = result; - gpr_mu_lock(&c->mu); - GPR_ASSERT(c->connecting_endpoint == NULL); - gpr_mu_unlock(&c->mu); - grpc_closure_init(&c->connected_closure, connected, c); - grpc_tcp_client_connect( - exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint, - args->interested_parties, args->channel_args, args->addr, args->deadline); -} - -static const grpc_connector_vtable connector_vtable = { - connector_ref, connector_unref, connector_shutdown, connector_connect}; - -// -// client_channel_factory -// typedef struct { grpc_client_channel_factory base; @@ -248,22 +72,11 @@ static grpc_subchannel *client_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const grpc_subchannel_args *args) { client_channel_factory *f = (client_channel_factory *)cc_factory; - connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); - c->base.vtable = &connector_vtable; - c->security_connector = f->security_connector; - c->handshake_mgr = grpc_handshake_manager_create(); - char *proxy_name = grpc_get_http_proxy_server(); - if (proxy_name != NULL) { - grpc_handshake_manager_add( - c->handshake_mgr, - grpc_http_connect_handshaker_create(proxy_name, args->server_name)); - gpr_free(proxy_name); - } - gpr_mu_init(&c->mu); - gpr_ref_init(&c->refs, 1); - grpc_subchannel *s = grpc_subchannel_create(exec_ctx, &c->base, args); - grpc_connector_unref(exec_ctx, &c->base); + grpc_connector *connector = + grpc_chttp2_connector_create(exec_ctx, args->server_name, + f->security_connector); + grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); + grpc_connector_unref(exec_ctx, connector); return s; } @@ -275,15 +88,14 @@ static grpc_channel *client_channel_factory_create_channel( grpc_channel *channel = grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL); grpc_resolver *resolver = grpc_resolver_create(target, args); - if (resolver != NULL) { - grpc_client_channel_finish_initialization( - exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); - GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create"); - } else { + if (resolver == NULL) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "client_channel_factory_create_channel"); - channel = NULL; + return NULL; } + grpc_client_channel_finish_initialization( + exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); + GRPC_RESOLVER_UNREF(exec_ctx, resolver, "create_channel"); return channel; } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 1b38d4decd..9284d19357 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -33,6 +33,8 @@ #include <grpc/grpc.h> +#include <string.h> + #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -47,78 +49,177 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct server_connect_state { +typedef struct pending_handshake_manager_node { + grpc_handshake_manager* handshake_mgr; + struct pending_handshake_manager_node *next; +} pending_handshake_manager_node; + +typedef struct { grpc_server *server; + grpc_tcp_server *tcp_server; + gpr_mu mu; + bool shutdown; + grpc_closure tcp_server_shutdown_complete; + grpc_closure *server_destroy_listener_done; + pending_handshake_manager_node *pending_handshake_mgrs; +} server_state; + +typedef struct { + server_state *server_state; grpc_pollset *accepting_pollset; grpc_tcp_server_acceptor *acceptor; grpc_handshake_manager *handshake_mgr; -} server_connect_state; +} server_connection_state; + +static void pending_handshake_manager_add_locked( + server_state* state, grpc_handshake_manager* handshake_mgr) { + pending_handshake_manager_node* node = gpr_malloc(sizeof(*node)); + node->handshake_mgr = handshake_mgr; + node->next = state->pending_handshake_mgrs; + state->pending_handshake_mgrs = node; +} + +static void pending_handshake_manager_remove_locked( + server_state* state, grpc_handshake_manager* handshake_mgr) { + pending_handshake_manager_node** prev_node = &state->pending_handshake_mgrs; + for (pending_handshake_manager_node* node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + if (node->handshake_mgr == handshake_mgr) { + *prev_node = node->next; + gpr_free(node); + break; + } + prev_node = &node->next; + } +} + +static void pending_handshake_manager_shutdown_locked( + grpc_exec_ctx* exec_ctx, server_state* state) { + pending_handshake_manager_node* prev_node = NULL; + for (pending_handshake_manager_node* node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr); + gpr_free(prev_node); + prev_node = node; + } + gpr_free(prev_node); + state->pending_handshake_mgrs = NULL; +} static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_handshaker_args *args = arg; - server_connect_state *state = args->user_data; + server_connection_state *connection_state = args->user_data; if (error != GRPC_ERROR_NONE) { const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); + gpr_mu_lock(&connection_state->server_state->mu); } else { - // Beware that the call to grpc_create_chttp2_transport() has to happen - // before grpc_tcp_server_destroy(). This is fine here, but similar code - // asynchronously doing a handshake instead of calling - // grpc_tcp_server_start() (as in server_secure_chttp2.c) needs to add - // synchronization to avoid this case. - grpc_transport *transport = - grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); - grpc_server_setup_transport(exec_ctx, state->server, transport, - state->accepting_pollset, - grpc_server_get_channel_args(state->server)); - grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); + gpr_mu_lock(&connection_state->server_state->mu); + if (!connection_state->server_state->shutdown) { + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); + grpc_server_setup_transport( + exec_ctx, connection_state->server_state->server, transport, + connection_state->accepting_pollset, + grpc_server_get_channel_args(connection_state->server_state->server)); + grpc_chttp2_transport_start_reading(exec_ctx, transport, + args->read_buffer); + } else { + // Need to destroy this here, because the server may have already + // gone away. + grpc_endpoint_destroy(exec_ctx, args->endpoint); + grpc_slice_buffer_destroy(args->read_buffer); + gpr_free(args->read_buffer); + } grpc_channel_args_destroy(args->args); } - // Clean up. - grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); - gpr_free(state); + pending_handshake_manager_remove_locked(connection_state->server_state, + connection_state->handshake_mgr); + gpr_mu_unlock(&connection_state->server_state->mu); + grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); + grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + gpr_free(connection_state); } -static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, +static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { - server_connect_state *state = gpr_malloc(sizeof(server_connect_state)); - state->server = server; - state->accepting_pollset = accepting_pollset; - state->acceptor = acceptor; - state->handshake_mgr = grpc_handshake_manager_create(); + server_state* state = arg; + gpr_mu_lock(&state->mu); + if (state->shutdown) { + gpr_mu_unlock(&state->mu); + grpc_endpoint_destroy(exec_ctx, tcp); + return; + } + grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create(); + pending_handshake_manager_add_locked(state, handshake_mgr); + gpr_mu_unlock(&state->mu); + grpc_tcp_server_ref(state->tcp_server); + server_connection_state *connection_state = + gpr_malloc(sizeof(*connection_state)); + connection_state->server_state = state; + connection_state->accepting_pollset = accepting_pollset; + connection_state->acceptor = acceptor; + connection_state->handshake_mgr = handshake_mgr; // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. const gpr_timespec deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); grpc_handshake_manager_do_handshake( - exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server), - deadline, acceptor, on_handshake_done, state); + exec_ctx, connection_state->handshake_mgr, tcp, + grpc_server_get_channel_args(state->server), + deadline, acceptor, on_handshake_done, connection_state); } /* Server callback: start listening on our ports */ -static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_pollset **pollsets, size_t pollset_count) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept, - server); +static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, + void *arg, grpc_pollset **pollsets, + size_t pollset_count) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = false; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_start(exec_ctx, state->tcp_server, pollsets, pollset_count, + on_accept, state); +} + +static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + server_state *state = arg; + /* ensure all threads have unlocked */ + gpr_mu_lock(&state->mu); + grpc_closure *destroy_done = state->server_destroy_listener_done; + GPR_ASSERT(state->shutdown); + pending_handshake_manager_shutdown_locked(exec_ctx, state); + gpr_mu_unlock(&state->mu); + // Invoke callback. + if (destroy_done != NULL) { + grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_REF(error), NULL); + } + gpr_mu_destroy(&state->mu); + gpr_free(state); } /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */ -static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_closure *destroy_done) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); +static void server_destroy_listener(grpc_exec_ctx *exec_ctx, + grpc_server *server, void *arg, + grpc_closure *destroy_done) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = true; + state->server_destroy_listener_done = destroy_done; + grpc_tcp_server *tcp_server = state->tcp_server; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp_server); + grpc_tcp_server_unref(exec_ctx, tcp_server); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; + grpc_tcp_server *tcp_server = NULL; size_t i; size_t count = 0; int port_num = -1; @@ -134,17 +235,28 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { if (err != GRPC_ERROR_NONE) { goto error; } - - err = grpc_tcp_server_create(&exec_ctx, NULL, - grpc_server_get_channel_args(server), &tcp); + server_state* state = gpr_malloc(sizeof(*state)); + memset(state, 0, sizeof(*state)); + grpc_closure_init(&state->tcp_server_shutdown_complete, + tcp_server_shutdown_complete, state); + err = grpc_tcp_server_create(&exec_ctx, + &state->tcp_server_shutdown_complete, + grpc_server_get_channel_args(server), + &tcp_server); if (err != GRPC_ERROR_NONE) { goto error; } + state->server = server; + state->tcp_server = tcp_server; + state->shutdown = true; + gpr_mu_init(&state->mu); + const size_t naddrs = resolved->naddrs; errors = gpr_malloc(sizeof(*errors) * naddrs); for (i = 0; i < naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); + errors[i] = grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], + &port_temp); if (errors[i] == GRPC_ERROR_NONE) { if (port_num == -1) { port_num = port_temp; @@ -177,7 +289,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { grpc_resolved_addresses_destroy(resolved); /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, tcp, start, destroy); + grpc_server_add_listener(&exec_ctx, server, state, + server_start_listener, server_destroy_listener); goto done; /* Error path: cleanup and return */ @@ -186,8 +299,8 @@ error: if (resolved) { grpc_resolved_addresses_destroy(resolved); } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); + if (tcp_server) { + grpc_tcp_server_unref(&exec_ctx, tcp_server); } port_num = 0; diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 22af94199f..afdf93398f 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -54,175 +54,189 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" +typedef struct pending_handshake_manager_node { + grpc_handshake_manager* handshake_mgr; + struct pending_handshake_manager_node *next; +} pending_handshake_manager_node; + typedef struct server_secure_state { grpc_server *server; - grpc_tcp_server *tcp; + grpc_tcp_server *tcp_server; grpc_server_security_connector *sc; grpc_server_credentials *creds; - bool is_shutdown; gpr_mu mu; + bool shutdown; grpc_closure tcp_server_shutdown_complete; grpc_closure *server_destroy_listener_done; + pending_handshake_manager_node *pending_handshake_mgrs; } server_secure_state; -typedef struct server_secure_connect { +typedef struct server_secure_connection_state { server_secure_state *server_state; grpc_pollset *accepting_pollset; grpc_tcp_server_acceptor *acceptor; grpc_handshake_manager *handshake_mgr; - // TODO(roth): Remove the following two fields when we eliminate - // grpc_server_security_connector_do_handshake(). - gpr_timespec deadline; - grpc_channel_args *args; -} server_secure_connect; +} server_secure_connection_state; -static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, - grpc_security_status status, - grpc_endpoint *secure_endpoint, - grpc_auth_context *auth_context) { - server_secure_connect *connection_state = statep; - if (status == GRPC_SECURITY_OK) { - if (secure_endpoint) { - gpr_mu_lock(&connection_state->server_state->mu); - if (!connection_state->server_state->is_shutdown) { - grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args( - connection_state->server_state->server), - secure_endpoint, 0); - grpc_arg args_to_add[2]; - args_to_add[0] = grpc_server_credentials_to_arg( - connection_state->server_state->creds); - args_to_add[1] = grpc_auth_context_to_arg(auth_context); - grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( - connection_state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); - grpc_server_setup_transport( - exec_ctx, connection_state->server_state->server, transport, - connection_state->accepting_pollset, args_copy); - grpc_channel_args_destroy(args_copy); - grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL); - } else { - /* We need to consume this here, because the server may already have - * gone away. */ - grpc_endpoint_destroy(exec_ctx, secure_endpoint); - } - gpr_mu_unlock(&connection_state->server_state->mu); +static void pending_handshake_manager_add_locked( + server_secure_state* state, grpc_handshake_manager* handshake_mgr) { + pending_handshake_manager_node* node = gpr_malloc(sizeof(*node)); + node->handshake_mgr = handshake_mgr; + node->next = state->pending_handshake_mgrs; + state->pending_handshake_mgrs = node; +} + +static void pending_handshake_manager_remove_locked( + server_secure_state* state, grpc_handshake_manager* handshake_mgr) { + pending_handshake_manager_node** prev_node = &state->pending_handshake_mgrs; + for (pending_handshake_manager_node* node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + if (node->handshake_mgr == handshake_mgr) { + *prev_node = node->next; + gpr_free(node); + break; } - } else { - gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); + prev_node = &node->next; } - grpc_channel_args_destroy(connection_state->args); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); - gpr_free(connection_state); +} + +static void pending_handshake_manager_shutdown_locked( + grpc_exec_ctx* exec_ctx, server_secure_state* state) { + pending_handshake_manager_node* prev_node = NULL; + for (pending_handshake_manager_node* node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr); + gpr_free(prev_node); + prev_node = node; + } + gpr_free(prev_node); + state->pending_handshake_mgrs = NULL; } static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_handshaker_args *args = arg; - server_secure_connect *connection_state = args->user_data; + server_secure_connection_state *connection_state = args->user_data; if (error != GRPC_ERROR_NONE) { const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); - grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); - gpr_free(connection_state); - return; + gpr_mu_lock(&connection_state->server_state->mu); + } else { + gpr_mu_lock(&connection_state->server_state->mu); + if (!connection_state->server_state->shutdown) { + grpc_arg channel_arg = grpc_server_credentials_to_arg( + connection_state->server_state->creds); + grpc_channel_args *args_copy = + grpc_channel_args_copy_and_add(args->args, &channel_arg, 1); + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args_copy, args->endpoint, 0); + grpc_server_setup_transport( + exec_ctx, connection_state->server_state->server, transport, + connection_state->accepting_pollset, args_copy); + grpc_channel_args_destroy(args_copy); + grpc_chttp2_transport_start_reading(exec_ctx, transport, + args->read_buffer); + } else { + // Need to destroy this here, because the server may have already + // gone away. + grpc_endpoint_destroy(exec_ctx, args->endpoint); + grpc_slice_buffer_destroy(args->read_buffer); + gpr_free(args->read_buffer); + } + grpc_channel_args_destroy(args->args); } - // TODO(roth, jboeuf): Convert security connector handshaking to use new - // handshake API, and then move the code from on_secure_handshake_done() - // into this function. - connection_state->args = args->args; - grpc_server_security_connector_do_handshake( - exec_ctx, connection_state->server_state->sc, connection_state->acceptor, - args->endpoint, args->read_buffer, connection_state->deadline, - on_secure_handshake_done, connection_state); + pending_handshake_manager_remove_locked(connection_state->server_state, + connection_state->handshake_mgr); + gpr_mu_unlock(&connection_state->server_state->mu); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - connection_state->handshake_mgr = NULL; + grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + gpr_free(connection_state); } -static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, +static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { - server_secure_state *server_state = statep; - server_secure_connect *connection_state = NULL; - gpr_mu_lock(&server_state->mu); - if (server_state->is_shutdown) { - gpr_mu_unlock(&server_state->mu); + server_secure_state *state = arg; + gpr_mu_lock(&state->mu); + if (state->shutdown) { + gpr_mu_unlock(&state->mu); grpc_endpoint_destroy(exec_ctx, tcp); return; } - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_ref(server_state->tcp); - connection_state = gpr_malloc(sizeof(*connection_state)); - connection_state->server_state = server_state; + grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create(); + pending_handshake_manager_add_locked(state, handshake_mgr); + gpr_mu_unlock(&state->mu); + grpc_tcp_server_ref(state->tcp_server); + server_secure_connection_state *connection_state = + gpr_malloc(sizeof(*connection_state)); + connection_state->server_state = state; connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; - connection_state->handshake_mgr = grpc_handshake_manager_create(); + connection_state->handshake_mgr = handshake_mgr; + grpc_server_security_connector_create_handshakers( + exec_ctx, state->sc, connection_state->handshake_mgr); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. - connection_state->deadline = gpr_time_add( + const gpr_timespec deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); grpc_handshake_manager_do_handshake( exec_ctx, connection_state->handshake_mgr, tcp, - grpc_server_get_channel_args(connection_state->server_state->server), - connection_state->deadline, acceptor, on_handshake_done, - connection_state); + grpc_server_get_channel_args(state->server), + deadline, acceptor, on_handshake_done, connection_state); } /* Server callback: start listening on our ports */ static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, - void *statep, grpc_pollset **pollsets, + void *arg, grpc_pollset **pollsets, size_t pollset_count) { - server_secure_state *server_state = statep; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = false; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_start(exec_ctx, server_state->tcp, pollsets, pollset_count, - on_accept, server_state); + server_secure_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = false; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_start(exec_ctx, state->tcp_server, pollsets, pollset_count, + on_accept, state); } -static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *statep, +static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - server_secure_state *server_state = statep; + server_secure_state *state = arg; /* ensure all threads have unlocked */ - gpr_mu_lock(&server_state->mu); - grpc_closure *destroy_done = server_state->server_destroy_listener_done; - GPR_ASSERT(server_state->is_shutdown); - gpr_mu_unlock(&server_state->mu); - /* clean up */ - grpc_server_security_connector_shutdown(exec_ctx, server_state->sc); - + gpr_mu_lock(&state->mu); + grpc_closure *destroy_done = state->server_destroy_listener_done; + GPR_ASSERT(state->shutdown); + pending_handshake_manager_shutdown_locked(exec_ctx, state); + gpr_mu_unlock(&state->mu); /* Flush queued work before a synchronous unref. */ grpc_exec_ctx_flush(exec_ctx); - GRPC_SECURITY_CONNECTOR_UNREF(&server_state->sc->base, "server"); - grpc_server_credentials_unref(server_state->creds); - + GRPC_SECURITY_CONNECTOR_UNREF(&state->sc->base, "server"); + grpc_server_credentials_unref(state->creds); if (destroy_done != NULL) { destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); grpc_exec_ctx_flush(exec_ctx); } - gpr_free(server_state); + gpr_mu_destroy(&state->mu); + gpr_free(state); } static void server_destroy_listener(grpc_exec_ctx *exec_ctx, - grpc_server *server, void *statep, - grpc_closure *callback) { - server_secure_state *server_state = statep; - grpc_tcp_server *tcp; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = true; - server_state->server_destroy_listener_done = callback; - tcp = server_state->tcp; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, server_state->tcp); + grpc_server *server, void *arg, + grpc_closure *destroy_done) { + server_secure_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = true; + state->server_destroy_listener_done = destroy_done; + grpc_tcp_server *tcp_server = state->tcp_server; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp_server); + grpc_tcp_server_unref(exec_ctx, tcp_server); } int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; - server_secure_state *server_state = NULL; + grpc_tcp_server *tcp_server = NULL; + server_secure_state *state = NULL; size_t i; size_t count = 0; int port_num = -1; @@ -255,34 +269,35 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, gpr_free(msg); goto error; } - sc->channel_args = grpc_server_get_channel_args(server); /* resolve address */ err = grpc_blocking_resolve_address(addr, "https", &resolved); if (err != GRPC_ERROR_NONE) { goto error; } - server_state = gpr_malloc(sizeof(*server_state)); - memset(server_state, 0, sizeof(*server_state)); - grpc_closure_init(&server_state->tcp_server_shutdown_complete, - tcp_server_shutdown_complete, server_state); + state = gpr_malloc(sizeof(*state)); + memset(state, 0, sizeof(*state)); + grpc_closure_init(&state->tcp_server_shutdown_complete, + tcp_server_shutdown_complete, state); err = grpc_tcp_server_create(&exec_ctx, - &server_state->tcp_server_shutdown_complete, - grpc_server_get_channel_args(server), &tcp); + &state->tcp_server_shutdown_complete, + grpc_server_get_channel_args(server), + &tcp_server); if (err != GRPC_ERROR_NONE) { goto error; } - server_state->server = server; - server_state->tcp = tcp; - server_state->sc = sc; - server_state->creds = grpc_server_credentials_ref(creds); - server_state->is_shutdown = true; - gpr_mu_init(&server_state->mu); + state->server = server; + state->tcp_server = tcp_server; + state->sc = sc; + state->creds = grpc_server_credentials_ref(creds); + state->shutdown = true; + gpr_mu_init(&state->mu); errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); for (i = 0; i < resolved->naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); + errors[i] = + grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], &port_temp); if (errors[i] == GRPC_ERROR_NONE) { if (port_num == -1) { port_num = port_temp; @@ -321,7 +336,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_resolved_addresses_destroy(resolved); /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, server_state, + grpc_server_add_listener(&exec_ctx, server, state, server_start_listener, server_destroy_listener); grpc_exec_ctx_finish(&exec_ctx); @@ -339,15 +354,15 @@ error: if (resolved) { grpc_resolved_addresses_destroy(resolved); } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); + if (tcp_server) { + grpc_tcp_server_unref(&exec_ctx, tcp_server); } else { if (sc) { grpc_exec_ctx_flush(&exec_ctx); GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server"); } - if (server_state) { - gpr_free(server_state); + if (state) { + gpr_free(state); } } grpc_exec_ctx_finish(&exec_ctx); |