diff options
author | 2016-12-06 15:05:59 -0800 | |
---|---|---|
committer | 2016-12-06 15:05:59 -0800 | |
commit | 397bff36b94bcb6d37c47c061b3bcc1e7c2d6a6d (patch) | |
tree | 3c27de51b7ce79e41bb003350940b84e22d8334b /src/core/ext/transport/chttp2/server | |
parent | 80a675005d4545cb50c54bc292339b69f2304b04 (diff) | |
parent | 7c9076ab7a58fd7d120d3e0d6a58a3a253535185 (diff) |
Merge github.com:grpc/grpc into slice_with_exec_ctx
Diffstat (limited to 'src/core/ext/transport/chttp2/server')
4 files changed, 493 insertions, 448 deletions
diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c new file mode 100644 index 0000000000..4a182146bf --- /dev/null +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -0,0 +1,354 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" + +#include <grpc/grpc.h> + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/channel/http_server_filter.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/tcp_server.h" +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/server.h" + +void grpc_chttp2_server_handshaker_factory_create_handshakers( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + grpc_handshake_manager *handshake_mgr) { + if (handshaker_factory != NULL) { + handshaker_factory->vtable->create_handshakers(exec_ctx, handshaker_factory, + handshake_mgr); + } +} + +void grpc_chttp2_server_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory) { + if (handshaker_factory != NULL) { + handshaker_factory->vtable->destroy(exec_ctx, handshaker_factory); + } +} + +typedef struct pending_handshake_manager_node { + grpc_handshake_manager *handshake_mgr; + struct pending_handshake_manager_node *next; +} pending_handshake_manager_node; + +typedef struct { + grpc_server *server; + grpc_tcp_server *tcp_server; + grpc_channel_args *args; + grpc_chttp2_server_handshaker_factory *handshaker_factory; + gpr_mu mu; + bool shutdown; + grpc_closure tcp_server_shutdown_complete; + grpc_closure *server_destroy_listener_done; + pending_handshake_manager_node *pending_handshake_mgrs; +} server_state; + +typedef struct { + server_state *server_state; + grpc_pollset *accepting_pollset; + grpc_tcp_server_acceptor *acceptor; + grpc_handshake_manager *handshake_mgr; +} server_connection_state; + +static void pending_handshake_manager_add_locked( + server_state *state, grpc_handshake_manager *handshake_mgr) { + pending_handshake_manager_node *node = gpr_malloc(sizeof(*node)); + node->handshake_mgr = handshake_mgr; + node->next = state->pending_handshake_mgrs; + state->pending_handshake_mgrs = node; +} + +static void pending_handshake_manager_remove_locked( + server_state *state, grpc_handshake_manager *handshake_mgr) { + pending_handshake_manager_node **prev_node = &state->pending_handshake_mgrs; + for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + if (node->handshake_mgr == handshake_mgr) { + *prev_node = node->next; + gpr_free(node); + break; + } + prev_node = &node->next; + } +} + +static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx, + server_state *state) { + pending_handshake_manager_node *prev_node = NULL; + for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr); + gpr_free(prev_node); + prev_node = node; + } + gpr_free(prev_node); + state->pending_handshake_mgrs = NULL; +} + +static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_handshaker_args *args = arg; + server_connection_state *connection_state = args->user_data; + gpr_mu_lock(&connection_state->server_state->mu); + if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) { + const char *error_str = grpc_error_string(error); + gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); + grpc_error_free_string(error_str); + if (error == GRPC_ERROR_NONE) { + // We were shut down after handshaking completed successfully, so + // destroy the endpoint here. + // TODO(ctiller): It is currently necessary to shutdown endpoints + // before destroying them, even if we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(exec_ctx, args->endpoint); + grpc_endpoint_destroy(exec_ctx, args->endpoint); + grpc_channel_args_destroy(exec_ctx, args->args); + grpc_slice_buffer_destroy(args->read_buffer); + gpr_free(args->read_buffer); + } + } else { + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); + grpc_server_setup_transport( + exec_ctx, connection_state->server_state->server, transport, + connection_state->accepting_pollset, args->args); + grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); + grpc_channel_args_destroy(exec_ctx, args->args); + } + pending_handshake_manager_remove_locked(connection_state->server_state, + connection_state->handshake_mgr); + gpr_mu_unlock(&connection_state->server_state->mu); + grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); + grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + gpr_free(connection_state); +} + +static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, + grpc_pollset *accepting_pollset, + grpc_tcp_server_acceptor *acceptor) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + if (state->shutdown) { + gpr_mu_unlock(&state->mu); + grpc_endpoint_destroy(exec_ctx, tcp); + return; + } + grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create(); + pending_handshake_manager_add_locked(state, handshake_mgr); + gpr_mu_unlock(&state->mu); + grpc_tcp_server_ref(state->tcp_server); + server_connection_state *connection_state = + gpr_malloc(sizeof(*connection_state)); + connection_state->server_state = state; + connection_state->accepting_pollset = accepting_pollset; + connection_state->acceptor = acceptor; + connection_state->handshake_mgr = handshake_mgr; + grpc_chttp2_server_handshaker_factory_create_handshakers( + exec_ctx, state->handshaker_factory, connection_state->handshake_mgr); + // TODO(roth): We should really get this timeout value from channel + // args instead of hard-coding it. + const gpr_timespec deadline = gpr_time_add( + gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); + grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr, + tcp, state->args, deadline, acceptor, + on_handshake_done, connection_state); +} + +/* Server callback: start listening on our ports */ +static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, + void *arg, grpc_pollset **pollsets, + size_t pollset_count) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = false; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_start(exec_ctx, state->tcp_server, pollsets, pollset_count, + on_accept, state); +} + +static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + server_state *state = arg; + /* ensure all threads have unlocked */ + gpr_mu_lock(&state->mu); + grpc_closure *destroy_done = state->server_destroy_listener_done; + GPR_ASSERT(state->shutdown); + pending_handshake_manager_shutdown_locked(exec_ctx, state); + gpr_mu_unlock(&state->mu); + // Flush queued work before destroying handshaker factory, since that + // may do a synchronous unref. + grpc_exec_ctx_flush(exec_ctx); + grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, + state->handshaker_factory); + if (destroy_done != NULL) { + destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); + grpc_exec_ctx_flush(exec_ctx); + } + grpc_channel_args_destroy(exec_ctx, state->args); + gpr_mu_destroy(&state->mu); + gpr_free(state); +} + +/* Server callback: destroy the tcp listener (so we don't generate further + callbacks) */ +static void server_destroy_listener(grpc_exec_ctx *exec_ctx, + grpc_server *server, void *arg, + grpc_closure *destroy_done) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = true; + state->server_destroy_listener_done = destroy_done; + grpc_tcp_server *tcp_server = state->tcp_server; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp_server); + grpc_tcp_server_unref(exec_ctx, tcp_server); +} + +grpc_error *grpc_chttp2_server_add_port( + grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, + grpc_channel_args *args, + grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num) { + grpc_resolved_addresses *resolved = NULL; + grpc_tcp_server *tcp_server = NULL; + size_t i; + size_t count = 0; + int port_temp; + grpc_error *err = GRPC_ERROR_NONE; + server_state *state = NULL; + grpc_error **errors = NULL; + + *port_num = -1; + + /* resolve address */ + err = grpc_blocking_resolve_address(addr, "https", &resolved); + if (err != GRPC_ERROR_NONE) { + goto error; + } + state = gpr_malloc(sizeof(*state)); + memset(state, 0, sizeof(*state)); + grpc_closure_init(&state->tcp_server_shutdown_complete, + tcp_server_shutdown_complete, state); + err = grpc_tcp_server_create(exec_ctx, &state->tcp_server_shutdown_complete, + args, &tcp_server); + if (err != GRPC_ERROR_NONE) { + goto error; + } + + state->server = server; + state->tcp_server = tcp_server; + state->args = args; + state->handshaker_factory = handshaker_factory; + state->shutdown = true; + gpr_mu_init(&state->mu); + + const size_t naddrs = resolved->naddrs; + errors = gpr_malloc(sizeof(*errors) * naddrs); + for (i = 0; i < naddrs; i++) { + errors[i] = + grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], &port_temp); + if (errors[i] == GRPC_ERROR_NONE) { + if (*port_num == -1) { + *port_num = port_temp; + } else { + GPR_ASSERT(*port_num == port_temp); + } + count++; + } + } + if (count == 0) { + char *msg; + gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", + naddrs); + err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); + gpr_free(msg); + goto error; + } else if (count != naddrs) { + char *msg; + gpr_asprintf(&msg, "Only %" PRIuPTR + " addresses added out of total %" PRIuPTR " resolved", + count, naddrs); + err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); + gpr_free(msg); + + const char *warning_message = grpc_error_string(err); + gpr_log(GPR_INFO, "WARNING: %s", warning_message); + grpc_error_free_string(warning_message); + /* we managed to bind some addresses: continue */ + } + grpc_resolved_addresses_destroy(resolved); + + /* Register with the server only upon success */ + grpc_server_add_listener(exec_ctx, server, state, server_start_listener, + server_destroy_listener); + goto done; + +/* Error path: cleanup and return */ +error: + GPR_ASSERT(err != GRPC_ERROR_NONE); + if (resolved) { + grpc_resolved_addresses_destroy(resolved); + } + if (tcp_server) { + grpc_tcp_server_unref(exec_ctx, tcp_server); + } else { + grpc_channel_args_destroy(exec_ctx, args); + grpc_chttp2_server_handshaker_factory_destroy(exec_ctx, handshaker_factory); + gpr_free(state); + } + *port_num = 0; + +done: + if (errors != NULL) { + for (i = 0; i < naddrs; i++) { + GRPC_ERROR_UNREF(errors[i]); + } + gpr_free(errors); + } + return err; +} diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.h b/src/core/ext/transport/chttp2/server/chttp2_server.h new file mode 100644 index 0000000000..3073399267 --- /dev/null +++ b/src/core/ext/transport/chttp2/server/chttp2_server.h @@ -0,0 +1,78 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H + +#include <grpc/impl/codegen/grpc_types.h> + +#include "src/core/lib/channel/handshaker.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +/// A server handshaker factory is used to create handshakers for server +/// connections. +typedef struct grpc_chttp2_server_handshaker_factory + grpc_chttp2_server_handshaker_factory; + +typedef struct { + void (*create_handshakers)( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + grpc_handshake_manager *handshake_mgr); + void (*destroy)(grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory); +} grpc_chttp2_server_handshaker_factory_vtable; + +struct grpc_chttp2_server_handshaker_factory { + const grpc_chttp2_server_handshaker_factory_vtable *vtable; +}; + +void grpc_chttp2_server_handshaker_factory_create_handshakers( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory, + grpc_handshake_manager *handshake_mgr); + +void grpc_chttp2_server_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, + grpc_chttp2_server_handshaker_factory *handshaker_factory); + +/// Adds a port to \a server. Sets \a port_num to the port number. +/// If \a handshaker_factory is not NULL, it will be used to create +/// handshakers for the port. +/// Takes ownership of \a args and \a handshaker_factory. +grpc_error *grpc_chttp2_server_add_port( + grpc_exec_ctx *exec_ctx, grpc_server *server, const char *addr, + grpc_channel_args *args, + grpc_chttp2_server_handshaker_factory *handshaker_factory, int *port_num); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_SERVER_CHTTP2_SERVER_H */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index ce7cd7586b..7e286d4e46 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -33,180 +33,28 @@ #include <grpc/grpc.h> -#include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/useful.h> -#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct server_connect_state { - grpc_server *server; - grpc_pollset *accepting_pollset; - grpc_tcp_server_acceptor *acceptor; - grpc_handshake_manager *handshake_mgr; -} server_connect_state; - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - server_connect_state *state = user_data; - if (error != GRPC_ERROR_NONE) { - const char *error_str = grpc_error_string(error); - gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); - grpc_error_free_string(error_str); - GRPC_ERROR_UNREF(error); - grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr); - gpr_free(read_buffer); - } else { - // Beware that the call to grpc_create_chttp2_transport() has to happen - // before grpc_tcp_server_destroy(). This is fine here, but similar code - // asynchronously doing a handshake instead of calling - // grpc_tcp_server_start() (as in server_secure_chttp2.c) needs to add - // synchronization to avoid this case. - grpc_transport *transport = - grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0); - grpc_server_setup_transport(exec_ctx, state->server, transport, - state->accepting_pollset, - grpc_server_get_channel_args(state->server)); - grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer); - } - // Clean up. - grpc_channel_args_destroy(exec_ctx, args); - grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); - gpr_free(state); -} - -static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, - grpc_pollset *accepting_pollset, - grpc_tcp_server_acceptor *acceptor) { - server_connect_state *state = gpr_malloc(sizeof(server_connect_state)); - state->server = server; - state->accepting_pollset = accepting_pollset; - state->acceptor = acceptor; - state->handshake_mgr = grpc_handshake_manager_create(); - // TODO(roth): We should really get this timeout value from channel - // args instead of hard-coding it. - const gpr_timespec deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); - grpc_handshake_manager_do_handshake( - exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server), - deadline, acceptor, on_handshake_done, state); -} - -/* Server callback: start listening on our ports */ -static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_pollset **pollsets, size_t pollset_count) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept, - server); -} - -/* Server callback: destroy the tcp listener (so we don't generate further - callbacks) */ -static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_closure *destroy_done) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); -} - int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { - grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; - size_t i; - size_t count = 0; - int port_num = -1; - int port_temp; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_error *err = GRPC_ERROR_NONE; - + int port_num = 0; GRPC_API_TRACE("grpc_server_add_insecure_http2_port(server=%p, addr=%s)", 2, (server, addr)); - - grpc_error **errors = NULL; - err = grpc_blocking_resolve_address(addr, "https", &resolved); - if (err != GRPC_ERROR_NONE) { - goto error; - } - - err = grpc_tcp_server_create(&exec_ctx, NULL, - grpc_server_get_channel_args(server), &tcp); + grpc_error *err = grpc_chttp2_server_add_port( + &exec_ctx, server, addr, + grpc_channel_args_copy(grpc_server_get_channel_args(server)), + NULL /* handshaker_factory */, &port_num); if (err != GRPC_ERROR_NONE) { - goto error; - } - - const size_t naddrs = resolved->naddrs; - errors = gpr_malloc(sizeof(*errors) * naddrs); - for (i = 0; i < naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); - if (errors[i] == GRPC_ERROR_NONE) { - if (port_num == -1) { - port_num = port_temp; - } else { - GPR_ASSERT(port_num == port_temp); - } - count++; - } + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); } - if (count == 0) { - char *msg; - gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", - naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); - gpr_free(msg); - goto error; - } else if (count != naddrs) { - char *msg; - gpr_asprintf(&msg, "Only %" PRIuPTR - " addresses added out of total %" PRIuPTR " resolved", - count, naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, naddrs); - gpr_free(msg); - - const char *warning_message = grpc_error_string(err); - gpr_log(GPR_INFO, "WARNING: %s", warning_message); - grpc_error_free_string(warning_message); - /* we managed to bind some addresses: continue */ - } - grpc_resolved_addresses_destroy(resolved); - - /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, tcp, start, destroy); - goto done; - -/* Error path: cleanup and return */ -error: - GPR_ASSERT(err != GRPC_ERROR_NONE); - if (resolved) { - grpc_resolved_addresses_destroy(resolved); - } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); - } - port_num = 0; - - const char *msg = grpc_error_string(err); - gpr_log(GPR_ERROR, "%s", msg); - grpc_error_free_string(msg); - GRPC_ERROR_UNREF(err); - -done: grpc_exec_ctx_finish(&exec_ctx); - if (errors != NULL) { - for (i = 0; i < naddrs; i++) { - GRPC_ERROR_UNREF(errors[i]); - } - } - gpr_free(errors); return port_num; } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 6293e06b69..df80407c5d 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -38,218 +38,62 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#include <grpc/support/sync.h> -#include <grpc/support/useful.h> + +#include "src/core/ext/transport/chttp2/server/chttp2_server.h" + #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/handshaker.h" -#include "src/core/lib/channel/http_server_filter.h" -#include "src/core/lib/iomgr/endpoint.h" -#include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/credentials/credentials.h" -#include "src/core/lib/security/transport/auth_filters.h" -#include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct server_secure_state { - grpc_server *server; - grpc_tcp_server *tcp; - grpc_server_security_connector *sc; - grpc_server_credentials *creds; - bool is_shutdown; - gpr_mu mu; - grpc_closure tcp_server_shutdown_complete; - grpc_closure *server_destroy_listener_done; -} server_secure_state; - -typedef struct server_secure_connect { - server_secure_state *server_state; - grpc_pollset *accepting_pollset; - grpc_tcp_server_acceptor *acceptor; - grpc_handshake_manager *handshake_mgr; - // TODO(roth): Remove the following two fields when we eliminate - // grpc_server_security_connector_do_handshake(). - gpr_timespec deadline; - grpc_channel_args *args; -} server_secure_connect; - -static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, - grpc_security_status status, - grpc_endpoint *secure_endpoint, - grpc_auth_context *auth_context) { - server_secure_connect *connection_state = statep; - if (status == GRPC_SECURITY_OK) { - if (secure_endpoint) { - gpr_mu_lock(&connection_state->server_state->mu); - if (!connection_state->server_state->is_shutdown) { - grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args( - connection_state->server_state->server), - secure_endpoint, 0); - grpc_arg args_to_add[2]; - args_to_add[0] = grpc_server_credentials_to_arg( - connection_state->server_state->creds); - args_to_add[1] = grpc_auth_context_to_arg(auth_context); - grpc_channel_args *args_copy = grpc_channel_args_copy_and_add( - connection_state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add)); - grpc_server_setup_transport( - exec_ctx, connection_state->server_state->server, transport, - connection_state->accepting_pollset, args_copy); - grpc_channel_args_destroy(exec_ctx, args_copy); - grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL); - } else { - /* We need to consume this here, because the server may already have - * gone away. */ - grpc_endpoint_destroy(exec_ctx, secure_endpoint); - } - gpr_mu_unlock(&connection_state->server_state->mu); - } - } else { - gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); - } - grpc_channel_args_destroy(exec_ctx, connection_state->args); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); - gpr_free(connection_state); -} - -static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, - grpc_channel_args *args, - grpc_slice_buffer *read_buffer, void *user_data, - grpc_error *error) { - server_secure_connect *connection_state = user_data; - if (error != GRPC_ERROR_NONE) { - const char *error_str = grpc_error_string(error); - gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); - grpc_error_free_string(error_str); - GRPC_ERROR_UNREF(error); - grpc_channel_args_destroy(exec_ctx, args); - gpr_free(read_buffer); - grpc_handshake_manager_shutdown(exec_ctx, connection_state->handshake_mgr); - grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); - gpr_free(connection_state); - return; - } - grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - connection_state->handshake_mgr = NULL; - // TODO(roth, jboeuf): Convert security connector handshaking to use new - // handshake API, and then move the code from on_secure_handshake_done() - // into this function. - connection_state->args = args; - grpc_server_security_connector_do_handshake( - exec_ctx, connection_state->server_state->sc, connection_state->acceptor, - endpoint, read_buffer, connection_state->deadline, - on_secure_handshake_done, connection_state); -} - -static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, - grpc_pollset *accepting_pollset, - grpc_tcp_server_acceptor *acceptor) { - server_secure_state *server_state = statep; - server_secure_connect *connection_state = NULL; - gpr_mu_lock(&server_state->mu); - if (server_state->is_shutdown) { - gpr_mu_unlock(&server_state->mu); - grpc_endpoint_destroy(exec_ctx, tcp); - return; - } - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_ref(server_state->tcp); - connection_state = gpr_malloc(sizeof(*connection_state)); - connection_state->server_state = server_state; - connection_state->accepting_pollset = accepting_pollset; - connection_state->acceptor = acceptor; - connection_state->handshake_mgr = grpc_handshake_manager_create(); - // TODO(roth): We should really get this timeout value from channel - // args instead of hard-coding it. - connection_state->deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); - grpc_handshake_manager_do_handshake( - exec_ctx, connection_state->handshake_mgr, tcp, - grpc_server_get_channel_args(connection_state->server_state->server), - connection_state->deadline, acceptor, on_handshake_done, - connection_state); +typedef struct { + grpc_chttp2_server_handshaker_factory base; + grpc_server_security_connector *security_connector; +} server_security_handshaker_factory; + +static void server_security_handshaker_factory_create_handshakers( + grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf, + grpc_handshake_manager *handshake_mgr) { + server_security_handshaker_factory *handshaker_factory = + (server_security_handshaker_factory *)hf; + grpc_server_security_connector_create_handshakers( + exec_ctx, handshaker_factory->security_connector, handshake_mgr); } -/* Server callback: start listening on our ports */ -static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, - void *statep, grpc_pollset **pollsets, - size_t pollset_count) { - server_secure_state *server_state = statep; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = false; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_start(exec_ctx, server_state->tcp, pollsets, pollset_count, - on_accept, server_state); +static void server_security_handshaker_factory_destroy( + grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf) { + server_security_handshaker_factory *handshaker_factory = + (server_security_handshaker_factory *)hf; + GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &handshaker_factory->security_connector->base, + "server"); + gpr_free(hf); } -static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *statep, - grpc_error *error) { - server_secure_state *server_state = statep; - /* ensure all threads have unlocked */ - gpr_mu_lock(&server_state->mu); - grpc_closure *destroy_done = server_state->server_destroy_listener_done; - GPR_ASSERT(server_state->is_shutdown); - gpr_mu_unlock(&server_state->mu); - /* clean up */ - grpc_server_security_connector_shutdown(exec_ctx, server_state->sc); - - /* Flush queued work before a synchronous unref. */ - grpc_exec_ctx_flush(exec_ctx); - GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &server_state->sc->base, "server"); - grpc_server_credentials_unref(exec_ctx, server_state->creds); - - if (destroy_done != NULL) { - destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); - grpc_exec_ctx_flush(exec_ctx); - } - gpr_free(server_state); -} - -static void server_destroy_listener(grpc_exec_ctx *exec_ctx, - grpc_server *server, void *statep, - grpc_closure *callback) { - server_secure_state *server_state = statep; - grpc_tcp_server *tcp; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = true; - server_state->server_destroy_listener_done = callback; - tcp = server_state->tcp; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, server_state->tcp); -} +static const grpc_chttp2_server_handshaker_factory_vtable + server_security_handshaker_factory_vtable = { + server_security_handshaker_factory_create_handshakers, + server_security_handshaker_factory_destroy}; int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { - grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; - server_secure_state *server_state = NULL; - size_t i; - size_t count = 0; - int port_num = -1; - int port_temp; - grpc_security_status status = GRPC_SECURITY_ERROR; - grpc_server_security_connector *sc = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *err = GRPC_ERROR_NONE; - grpc_error **errors = NULL; - + grpc_server_security_connector *sc = NULL; + int port_num = 0; GRPC_API_TRACE( "grpc_server_add_secure_http2_port(" "server=%p, addr=%s, creds=%p)", 3, (server, addr, creds)); - - /* create security context */ + // Create security context. if (creds == NULL) { err = GRPC_ERROR_CREATE( "No credentials specified for secure server port (creds==NULL)"); - goto error; + goto done; } - status = + grpc_security_status status = grpc_server_credentials_create_security_connector(&exec_ctx, creds, &sc); if (status != GRPC_SECURITY_OK) { char *msg; @@ -259,107 +103,28 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, err = grpc_error_set_int(GRPC_ERROR_CREATE(msg), GRPC_ERROR_INT_SECURITY_STATUS, status); gpr_free(msg); - goto error; + goto done; } - sc->channel_args = grpc_server_get_channel_args(server); - - /* resolve address */ - err = grpc_blocking_resolve_address(addr, "https", &resolved); - if (err != GRPC_ERROR_NONE) { - goto error; - } - server_state = gpr_malloc(sizeof(*server_state)); - memset(server_state, 0, sizeof(*server_state)); - grpc_closure_init(&server_state->tcp_server_shutdown_complete, - tcp_server_shutdown_complete, server_state); - err = grpc_tcp_server_create(&exec_ctx, - &server_state->tcp_server_shutdown_complete, - grpc_server_get_channel_args(server), &tcp); + // Create handshaker factory. + server_security_handshaker_factory *handshaker_factory = + gpr_malloc(sizeof(*handshaker_factory)); + memset(handshaker_factory, 0, sizeof(*handshaker_factory)); + handshaker_factory->base.vtable = &server_security_handshaker_factory_vtable; + handshaker_factory->security_connector = sc; + // Create channel args. + grpc_arg channel_arg = grpc_server_credentials_to_arg(creds); + grpc_channel_args *args = grpc_channel_args_copy_and_add( + grpc_server_get_channel_args(server), &channel_arg, 1); + // Add server port. + err = grpc_chttp2_server_add_port(&exec_ctx, server, addr, args, + &handshaker_factory->base, &port_num); +done: + grpc_exec_ctx_finish(&exec_ctx); if (err != GRPC_ERROR_NONE) { - goto error; + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); } - - server_state->server = server; - server_state->tcp = tcp; - server_state->sc = sc; - server_state->creds = grpc_server_credentials_ref(creds); - server_state->is_shutdown = true; - gpr_mu_init(&server_state->mu); - - errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); - for (i = 0; i < resolved->naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); - if (errors[i] == GRPC_ERROR_NONE) { - if (port_num == -1) { - port_num = port_temp; - } else { - GPR_ASSERT(port_num == port_temp); - } - count++; - } - } - if (count == 0) { - char *msg; - gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved", - resolved->naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs); - gpr_free(msg); - goto error; - } else if (count != resolved->naddrs) { - char *msg; - gpr_asprintf(&msg, "Only %" PRIuPTR - " addresses added out of total %" PRIuPTR " resolved", - count, resolved->naddrs); - err = GRPC_ERROR_CREATE_REFERENCING(msg, errors, resolved->naddrs); - gpr_free(msg); - - const char *warning_message = grpc_error_string(err); - gpr_log(GPR_INFO, "WARNING: %s", warning_message); - grpc_error_free_string(warning_message); - /* we managed to bind some addresses: continue */ - } else { - for (i = 0; i < resolved->naddrs; i++) { - GRPC_ERROR_UNREF(errors[i]); - } - } - gpr_free(errors); - errors = NULL; - grpc_resolved_addresses_destroy(resolved); - - /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, server_state, - server_start_listener, server_destroy_listener); - - grpc_exec_ctx_finish(&exec_ctx); return port_num; - -/* Error path: cleanup and return */ -error: - GPR_ASSERT(err != GRPC_ERROR_NONE); - if (errors != NULL) { - for (i = 0; i < resolved->naddrs; i++) { - GRPC_ERROR_UNREF(errors[i]); - } - gpr_free(errors); - } - if (resolved) { - grpc_resolved_addresses_destroy(resolved); - } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); - } else { - if (sc) { - grpc_exec_ctx_flush(&exec_ctx); - GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &sc->base, "server"); - } - if (server_state) { - gpr_free(server_state); - } - } - grpc_exec_ctx_finish(&exec_ctx); - const char *msg = grpc_error_string(err); - GRPC_ERROR_UNREF(err); - gpr_log(GPR_ERROR, "%s", msg); - grpc_error_free_string(msg); - return 0; } |