From 96df9e5ef136ef1fbceb6f08c7b1711379b6d152 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 17 Nov 2016 14:55:03 -0800 Subject: Use the same synchronization in the insecure server code as in the secure. --- .../chttp2/server/insecure/server_chttp2.c | 197 ++++++++++++++++----- .../chttp2/server/secure/server_secure_chttp2.c | 146 +++++++-------- 2 files changed, 227 insertions(+), 116 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 127cc19f0b..86d43e5721 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -33,6 +33,8 @@ #include +#include + #include #include #include @@ -47,80 +49,177 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct server_connection_state { +typedef struct pending_handshake_manager_node { + grpc_handshake_manager* handshake_mgr; + struct pending_handshake_manager_node *next; +} pending_handshake_manager_node; + +typedef struct { grpc_server *server; + grpc_tcp_server *tcp_server; + gpr_mu mu; + bool shutdown; + grpc_closure tcp_server_shutdown_complete; + grpc_closure *server_destroy_listener_done; + pending_handshake_manager_node *pending_handshake_mgrs; +} server_state; + +typedef struct { + server_state *server_state; grpc_pollset *accepting_pollset; grpc_tcp_server_acceptor *acceptor; grpc_handshake_manager *handshake_mgr; } server_connection_state; +static void pending_handshake_manager_add_locked( + server_state* state, grpc_handshake_manager* handshake_mgr) { + pending_handshake_manager_node* node = gpr_malloc(sizeof(*node)); + node->handshake_mgr = handshake_mgr; + node->next = state->pending_handshake_mgrs; + state->pending_handshake_mgrs = node; +} + +static void pending_handshake_manager_remove_locked( + server_state* state, grpc_handshake_manager* handshake_mgr) { + pending_handshake_manager_node** prev_node = &state->pending_handshake_mgrs; + for (pending_handshake_manager_node* node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + if (node->handshake_mgr == handshake_mgr) { + *prev_node = node->next; + gpr_free(node); + break; + } + prev_node = &node->next; + } +} + +static void pending_handshake_manager_shutdown_locked( + grpc_exec_ctx* exec_ctx, server_state* state) { + pending_handshake_manager_node* prev_node = NULL; + for (pending_handshake_manager_node* node = state->pending_handshake_mgrs; + node != NULL; node = node->next) { + grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr); + gpr_free(prev_node); + prev_node = node; + } + gpr_free(prev_node); + state->pending_handshake_mgrs = NULL; +} + static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_handshaker_args *args = arg; - server_connection_state *state = args->user_data; + server_connection_state *connection_state = args->user_data; if (error != GRPC_ERROR_NONE) { const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); grpc_endpoint_destroy(exec_ctx, args->endpoint); gpr_free(args->read_buffer); + gpr_mu_lock(&connection_state->server_state->mu); } else { - // Beware that the call to grpc_create_chttp2_transport() has to happen - // before grpc_tcp_server_destroy(). This is fine here, but similar code - // asynchronously doing a handshake instead of calling - // grpc_tcp_server_start() (as in server_secure_chttp2.c) needs to add - // synchronization to avoid this case. - grpc_transport *transport = - grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); - grpc_server_setup_transport(exec_ctx, state->server, transport, - state->accepting_pollset, - grpc_server_get_channel_args(state->server)); - grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); + gpr_mu_lock(&connection_state->server_state->mu); + if (!connection_state->server_state->shutdown) { + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); + grpc_server_setup_transport( + exec_ctx, connection_state->server_state->server, transport, + connection_state->accepting_pollset, + grpc_server_get_channel_args(connection_state->server_state->server)); + grpc_chttp2_transport_start_reading(exec_ctx, transport, + args->read_buffer); + } else { + // Need to destroy this here, because the server may have already + // gone away. + grpc_endpoint_destroy(exec_ctx, args->endpoint); + } } - // Clean up. + pending_handshake_manager_remove_locked(connection_state->server_state, + connection_state->handshake_mgr); + gpr_mu_unlock(&connection_state->server_state->mu); + grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); + grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); + gpr_free(connection_state); grpc_channel_args_destroy(args->args); - grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); - gpr_free(state); } -static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp, +static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { - server_connection_state *state = gpr_malloc(sizeof(server_connection_state)); - state->server = server; - state->accepting_pollset = accepting_pollset; - state->acceptor = acceptor; - state->handshake_mgr = grpc_handshake_manager_create(); + server_state* state = arg; + gpr_mu_lock(&state->mu); + if (state->shutdown) { + gpr_mu_unlock(&state->mu); + grpc_endpoint_destroy(exec_ctx, tcp); + return; + } + grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create(); + pending_handshake_manager_add_locked(state, handshake_mgr); + gpr_mu_unlock(&state->mu); + grpc_tcp_server_ref(state->tcp_server); + server_connection_state *connection_state = + gpr_malloc(sizeof(*connection_state)); + connection_state->server_state = state; + connection_state->accepting_pollset = accepting_pollset; + connection_state->acceptor = acceptor; + connection_state->handshake_mgr = handshake_mgr; // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. const gpr_timespec deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); grpc_handshake_manager_do_handshake( - exec_ctx, state->handshake_mgr, tcp, grpc_server_get_channel_args(server), - deadline, acceptor, on_handshake_done, state); + exec_ctx, connection_state->handshake_mgr, tcp, + grpc_server_get_channel_args(state->server), + deadline, acceptor, on_handshake_done, connection_state); } /* Server callback: start listening on our ports */ -static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_pollset **pollsets, size_t pollset_count) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept, - server); +static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, + void *arg, grpc_pollset **pollsets, + size_t pollset_count) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = false; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_start(exec_ctx, state->tcp_server, pollsets, pollset_count, + on_accept, state); +} + +static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + server_state *state = arg; + /* ensure all threads have unlocked */ + gpr_mu_lock(&state->mu); + grpc_closure *destroy_done = state->server_destroy_listener_done; + GPR_ASSERT(state->shutdown); + pending_handshake_manager_shutdown_locked(exec_ctx, state); + gpr_mu_unlock(&state->mu); + // Invoke callback. + if (destroy_done != NULL) { + grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_REF(error), NULL); + } + gpr_mu_destroy(&state->mu); + gpr_free(state); } /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */ -static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, - grpc_closure *destroy_done) { - grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); +static void server_destroy_listener(grpc_exec_ctx *exec_ctx, + grpc_server *server, void *arg, + grpc_closure *destroy_done) { + server_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = true; + state->server_destroy_listener_done = destroy_done; + grpc_tcp_server *tcp_server = state->tcp_server; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp_server); + grpc_tcp_server_unref(exec_ctx, tcp_server); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; + grpc_tcp_server *tcp_server = NULL; size_t i; size_t count = 0; int port_num = -1; @@ -136,17 +235,28 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { if (err != GRPC_ERROR_NONE) { goto error; } - - err = grpc_tcp_server_create(&exec_ctx, NULL, - grpc_server_get_channel_args(server), &tcp); + server_state* state = gpr_malloc(sizeof(*state)); + memset(state, 0, sizeof(*state)); + grpc_closure_init(&state->tcp_server_shutdown_complete, + tcp_server_shutdown_complete, state); + err = grpc_tcp_server_create(&exec_ctx, + &state->tcp_server_shutdown_complete, + grpc_server_get_channel_args(server), + &tcp_server); if (err != GRPC_ERROR_NONE) { goto error; } + state->server = server; + state->tcp_server = tcp_server; + state->shutdown = true; + gpr_mu_init(&state->mu); + const size_t naddrs = resolved->naddrs; errors = gpr_malloc(sizeof(*errors) * naddrs); for (i = 0; i < naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); + errors[i] = grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], + &port_temp); if (errors[i] == GRPC_ERROR_NONE) { if (port_num == -1) { port_num = port_temp; @@ -179,7 +289,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { grpc_resolved_addresses_destroy(resolved); /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, tcp, start, destroy); + grpc_server_add_listener(&exec_ctx, server, state, + server_start_listener, server_destroy_listener); goto done; /* Error path: cleanup and return */ @@ -188,8 +299,8 @@ error: if (resolved) { grpc_resolved_addresses_destroy(resolved); } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); + if (tcp_server) { + grpc_tcp_server_unref(&exec_ctx, tcp_server); } port_num = 0; diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 7c028f9f36..ab0cebe902 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -61,11 +61,11 @@ typedef struct pending_handshake_manager_node { typedef struct server_secure_state { grpc_server *server; - grpc_tcp_server *tcp; + grpc_tcp_server *tcp_server; grpc_server_security_connector *sc; grpc_server_credentials *creds; - bool is_shutdown; gpr_mu mu; + bool shutdown; grpc_closure tcp_server_shutdown_complete; grpc_closure *server_destroy_listener_done; pending_handshake_manager_node *pending_handshake_mgrs; @@ -121,18 +121,18 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); - gpr_free(args->read_buffer); grpc_endpoint_destroy(exec_ctx, args->endpoint); + gpr_free(args->read_buffer); gpr_mu_lock(&connection_state->server_state->mu); } else { gpr_mu_lock(&connection_state->server_state->mu); - if (!connection_state->server_state->is_shutdown) { + if (!connection_state->server_state->shutdown) { grpc_arg channel_arg = grpc_server_credentials_to_arg( connection_state->server_state->creds); grpc_channel_args *args_copy = grpc_channel_args_copy_and_add(args->args, &channel_arg, 1); - grpc_transport *transport = grpc_create_chttp2_transport( - exec_ctx, args_copy, args->endpoint, 0); + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, args_copy, args->endpoint, 0); grpc_server_setup_transport( exec_ctx, connection_state->server_state->server, transport, connection_state->accepting_pollset, args_copy); @@ -140,105 +140,103 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); } else { - /* We need to consume this here, because the server may already have - * gone away. */ + // Need to destroy this here, because the server may have already + // gone away. grpc_endpoint_destroy(exec_ctx, args->endpoint); } } - pending_handshake_manager_remove_locked( - connection_state->server_state, connection_state->handshake_mgr); + pending_handshake_manager_remove_locked(connection_state->server_state, + connection_state->handshake_mgr); gpr_mu_unlock(&connection_state->server_state->mu); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); + grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); gpr_free(connection_state); grpc_channel_args_destroy(args->args); } -static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, +static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { - server_secure_state *server_state = statep; - gpr_mu_lock(&server_state->mu); - if (server_state->is_shutdown) { - gpr_mu_unlock(&server_state->mu); + server_secure_state *state = arg; + gpr_mu_lock(&state->mu); + if (state->shutdown) { + gpr_mu_unlock(&state->mu); grpc_endpoint_destroy(exec_ctx, tcp); return; } grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create(); - pending_handshake_manager_add_locked(server_state, handshake_mgr); - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_ref(server_state->tcp); + pending_handshake_manager_add_locked(state, handshake_mgr); + gpr_mu_unlock(&state->mu); + grpc_tcp_server_ref(state->tcp_server); server_secure_connection_state *connection_state = gpr_malloc(sizeof(*connection_state)); - connection_state->server_state = server_state; + connection_state->server_state = state; connection_state->accepting_pollset = accepting_pollset; connection_state->acceptor = acceptor; connection_state->handshake_mgr = handshake_mgr; grpc_server_security_connector_create_handshakers( - exec_ctx, server_state->sc, connection_state->handshake_mgr); + exec_ctx, state->sc, connection_state->handshake_mgr); // TODO(roth): We should really get this timeout value from channel // args instead of hard-coding it. - gpr_timespec deadline = gpr_time_add( + const gpr_timespec deadline = gpr_time_add( gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN)); grpc_handshake_manager_do_handshake( exec_ctx, connection_state->handshake_mgr, tcp, - grpc_server_get_channel_args(connection_state->server_state->server), + grpc_server_get_channel_args(state->server), deadline, acceptor, on_handshake_done, connection_state); } /* Server callback: start listening on our ports */ static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, - void *statep, grpc_pollset **pollsets, + void *arg, grpc_pollset **pollsets, size_t pollset_count) { - server_secure_state *server_state = statep; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = false; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_start(exec_ctx, server_state->tcp, pollsets, pollset_count, - on_accept, server_state); + server_secure_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = false; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_start(exec_ctx, state->tcp_server, pollsets, pollset_count, + on_accept, state); } -static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *statep, +static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - server_secure_state *server_state = statep; + server_secure_state *state = arg; /* ensure all threads have unlocked */ - gpr_mu_lock(&server_state->mu); - grpc_closure *destroy_done = server_state->server_destroy_listener_done; - GPR_ASSERT(server_state->is_shutdown); - pending_handshake_manager_shutdown_locked(exec_ctx, server_state); - gpr_mu_unlock(&server_state->mu); - + gpr_mu_lock(&state->mu); + grpc_closure *destroy_done = state->server_destroy_listener_done; + GPR_ASSERT(state->shutdown); + pending_handshake_manager_shutdown_locked(exec_ctx, state); + gpr_mu_unlock(&state->mu); /* Flush queued work before a synchronous unref. */ grpc_exec_ctx_flush(exec_ctx); - GRPC_SECURITY_CONNECTOR_UNREF(&server_state->sc->base, "server"); - grpc_server_credentials_unref(server_state->creds); - + GRPC_SECURITY_CONNECTOR_UNREF(&state->sc->base, "server"); + grpc_server_credentials_unref(state->creds); if (destroy_done != NULL) { destroy_done->cb(exec_ctx, destroy_done->cb_arg, GRPC_ERROR_REF(error)); grpc_exec_ctx_flush(exec_ctx); } - gpr_free(server_state); + gpr_mu_destroy(&state->mu); + gpr_free(state); } static void server_destroy_listener(grpc_exec_ctx *exec_ctx, - grpc_server *server, void *statep, - grpc_closure *callback) { - server_secure_state *server_state = statep; - grpc_tcp_server *tcp; - gpr_mu_lock(&server_state->mu); - server_state->is_shutdown = true; - server_state->server_destroy_listener_done = callback; - tcp = server_state->tcp; - gpr_mu_unlock(&server_state->mu); - grpc_tcp_server_shutdown_listeners(exec_ctx, tcp); - grpc_tcp_server_unref(exec_ctx, server_state->tcp); + grpc_server *server, void *arg, + grpc_closure *destroy_done) { + server_secure_state *state = arg; + gpr_mu_lock(&state->mu); + state->shutdown = true; + state->server_destroy_listener_done = destroy_done; + grpc_tcp_server *tcp_server = state->tcp_server; + gpr_mu_unlock(&state->mu); + grpc_tcp_server_shutdown_listeners(exec_ctx, tcp_server); + grpc_tcp_server_unref(exec_ctx, tcp_server); } int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { grpc_resolved_addresses *resolved = NULL; - grpc_tcp_server *tcp = NULL; - server_secure_state *server_state = NULL; + grpc_tcp_server *tcp_server = NULL; + server_secure_state *state = NULL; size_t i; size_t count = 0; int port_num = -1; @@ -277,27 +275,29 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, if (err != GRPC_ERROR_NONE) { goto error; } - server_state = gpr_malloc(sizeof(*server_state)); - memset(server_state, 0, sizeof(*server_state)); - grpc_closure_init(&server_state->tcp_server_shutdown_complete, - tcp_server_shutdown_complete, server_state); + state = gpr_malloc(sizeof(*state)); + memset(state, 0, sizeof(*state)); + grpc_closure_init(&state->tcp_server_shutdown_complete, + tcp_server_shutdown_complete, state); err = grpc_tcp_server_create(&exec_ctx, - &server_state->tcp_server_shutdown_complete, - grpc_server_get_channel_args(server), &tcp); + &state->tcp_server_shutdown_complete, + grpc_server_get_channel_args(server), + &tcp_server); if (err != GRPC_ERROR_NONE) { goto error; } - server_state->server = server; - server_state->tcp = tcp; - server_state->sc = sc; - server_state->creds = grpc_server_credentials_ref(creds); - server_state->is_shutdown = true; - gpr_mu_init(&server_state->mu); + state->server = server; + state->tcp_server = tcp_server; + state->sc = sc; + state->creds = grpc_server_credentials_ref(creds); + state->shutdown = true; + gpr_mu_init(&state->mu); errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); for (i = 0; i < resolved->naddrs; i++) { - errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); + errors[i] = + grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], &port_temp); if (errors[i] == GRPC_ERROR_NONE) { if (port_num == -1) { port_num = port_temp; @@ -336,7 +336,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_resolved_addresses_destroy(resolved); /* Register with the server only upon success */ - grpc_server_add_listener(&exec_ctx, server, server_state, + grpc_server_add_listener(&exec_ctx, server, state, server_start_listener, server_destroy_listener); grpc_exec_ctx_finish(&exec_ctx); @@ -354,15 +354,15 @@ error: if (resolved) { grpc_resolved_addresses_destroy(resolved); } - if (tcp) { - grpc_tcp_server_unref(&exec_ctx, tcp); + if (tcp_server) { + grpc_tcp_server_unref(&exec_ctx, tcp_server); } else { if (sc) { grpc_exec_ctx_flush(&exec_ctx); GRPC_SECURITY_CONNECTOR_UNREF(&sc->base, "server"); } - if (server_state) { - gpr_free(server_state); + if (state) { + gpr_free(state); } } grpc_exec_ctx_finish(&exec_ctx); -- cgit v1.2.3