diff options
author | Craig Tiller <ctiller@google.com> | 2015-07-16 07:39:59 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-07-16 07:39:59 -0700 |
commit | f101af1ab4c553ff2a8cd8f72e8ca156ef188b86 (patch) | |
tree | 5b8b6c316210ae1ae094fe17923bb0f8b44b2a8d /src/core | |
parent | 0327ce5ca851440f168ae3a8b359e4a13a88f2e8 (diff) | |
parent | 097468d43ad44c087a01abee5044bd683c144d18 (diff) |
Merge pull request #2468 from yang-g/cleanup_handshaking_channels_when_server_shutdown
Clean up handshaking server channels properly
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/httpcli/httpcli.c | 1 | ||||
-rw-r--r-- | src/core/security/secure_transport_setup.c | 29 | ||||
-rw-r--r-- | src/core/security/secure_transport_setup.h | 2 | ||||
-rw-r--r-- | src/core/security/server_secure_chttp2.c | 47 | ||||
-rw-r--r-- | src/core/surface/secure_channel_create.c | 1 |
5 files changed, 67 insertions, 13 deletions
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 3f5557e08e..65997d5f44 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -165,6 +165,7 @@ static void start_write(internal_request *req) { static void on_secure_transport_setup_done(void *rp, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { internal_request *req = rp; if (status != GRPC_SECURITY_OK) { diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index 731b382f09..0c3572b53c 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -47,7 +47,8 @@ typedef struct { tsi_handshaker *handshaker; unsigned char *handshake_buffer; size_t handshake_buffer_size; - grpc_endpoint *endpoint; + grpc_endpoint *wrapped_endpoint; + grpc_endpoint *secure_endpoint; gpr_slice_buffer left_overs; grpc_secure_transport_setup_done_cb cb; void *user_data; @@ -63,13 +64,16 @@ static void on_handshake_data_sent_to_peer(void *setup, static void secure_transport_setup_done(grpc_secure_transport_setup *s, int is_success) { if (is_success) { - s->cb(s->user_data, GRPC_SECURITY_OK, s->endpoint); + s->cb(s->user_data, GRPC_SECURITY_OK, s->wrapped_endpoint, + s->secure_endpoint); } else { - if (s->endpoint != NULL) { - grpc_endpoint_shutdown(s->endpoint); - grpc_endpoint_destroy(s->endpoint); + if (s->secure_endpoint != NULL) { + grpc_endpoint_shutdown(s->secure_endpoint); + grpc_endpoint_destroy(s->secure_endpoint); + } else { + grpc_endpoint_destroy(s->wrapped_endpoint); } - s->cb(s->user_data, GRPC_SECURITY_ERROR, NULL); + s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL); } if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); @@ -95,8 +99,9 @@ static void on_peer_checked(void *user_data, grpc_security_status status) { secure_transport_setup_done(s, 0); return; } - s->endpoint = grpc_secure_endpoint_create( - protector, s->endpoint, s->left_overs.slices, s->left_overs.count); + s->secure_endpoint = + grpc_secure_endpoint_create(protector, s->wrapped_endpoint, + s->left_overs.slices, s->left_overs.count); secure_transport_setup_done(s, 1); return; } @@ -152,7 +157,7 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - write_status = grpc_endpoint_write(s->endpoint, &to_send, 1, + write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1, on_handshake_data_sent_to_peer, s); if (write_status == GRPC_ENDPOINT_WRITE_ERROR) { gpr_log(GPR_ERROR, "Could not send handshake data to peer."); @@ -198,7 +203,7 @@ static void on_handshake_data_received_from_peer( if (result == TSI_INCOMPLETE_DATA) { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_notify_on_read(s->endpoint, + grpc_endpoint_notify_on_read(s->wrapped_endpoint, on_handshake_data_received_from_peer, setup); cleanup_slices(slices, nslices); return; @@ -256,7 +261,7 @@ static void on_handshake_data_sent_to_peer(void *setup, if (tsi_handshaker_is_in_progress(s->handshaker)) { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_notify_on_read(s->endpoint, + grpc_endpoint_notify_on_read(s->wrapped_endpoint, on_handshake_data_received_from_peer, setup); } else { check_peer(s); @@ -280,7 +285,7 @@ void grpc_setup_secure_transport(grpc_security_connector *connector, GRPC_SECURITY_CONNECTOR_REF(connector, "secure_transport_setup"); s->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; s->handshake_buffer = gpr_malloc(s->handshake_buffer_size); - s->endpoint = nonsecure_endpoint; + s->wrapped_endpoint = nonsecure_endpoint; s->user_data = user_data; s->cb = cb; gpr_slice_buffer_init(&s->left_overs); diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/secure_transport_setup.h index 58701c461d..29025f5236 100644 --- a/src/core/security/secure_transport_setup.h +++ b/src/core/security/secure_transport_setup.h @@ -42,7 +42,7 @@ /* Ownership of the secure_endpoint is transfered. */ typedef void (*grpc_secure_transport_setup_done_cb)( void *user_data, grpc_security_status status, - grpc_endpoint *secure_endpoint); + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint); /* Calls the callback upon completion. */ void grpc_setup_secure_transport(grpc_security_connector *connector, diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 8a7ada07af..3717b8989f 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -51,10 +51,16 @@ #include <grpc/support/sync.h> #include <grpc/support/useful.h> +typedef struct tcp_endpoint_list { + grpc_endpoint *tcp_endpoint; + struct tcp_endpoint_list *next; +} tcp_endpoint_list; + typedef struct grpc_server_secure_state { grpc_server *server; grpc_tcp_server *tcp; grpc_security_connector *sc; + tcp_endpoint_list *handshaking_tcp_endpoints; int is_shutdown; gpr_mu mu; gpr_refcount refcount; @@ -88,14 +94,37 @@ static void setup_transport(void *statep, grpc_transport *transport, grpc_channel_args_destroy(args_copy); } +static int remove_tcp_from_list_locked(grpc_server_secure_state *state, + grpc_endpoint *tcp) { + tcp_endpoint_list *node = state->handshaking_tcp_endpoints; + tcp_endpoint_list *tmp = NULL; + if (node && node->tcp_endpoint == tcp) { + state->handshaking_tcp_endpoints = state->handshaking_tcp_endpoints->next; + gpr_free(node); + return 0; + } + while (node) { + if (node->next->tcp_endpoint == tcp) { + tmp = node->next; + node->next = node->next->next; + gpr_free(tmp); + return 0; + } + node = node->next; + } + return -1; +} + static void on_secure_transport_setup_done(void *statep, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { grpc_server_secure_state *state = statep; grpc_transport *transport; grpc_mdctx *mdctx; if (status == GRPC_SECURITY_OK) { gpr_mu_lock(&state->mu); + remove_tcp_from_list_locked(state, wrapped_endpoint); if (!state->is_shutdown) { mdctx = grpc_mdctx_create(); transport = grpc_create_chttp2_transport( @@ -110,6 +139,9 @@ static void on_secure_transport_setup_done(void *statep, } gpr_mu_unlock(&state->mu); } else { + gpr_mu_lock(&state->mu); + remove_tcp_from_list_locked(state, wrapped_endpoint); + gpr_mu_unlock(&state->mu); gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } state_unref(state); @@ -117,7 +149,14 @@ static void on_secure_transport_setup_done(void *statep, static void on_accept(void *statep, grpc_endpoint *tcp) { grpc_server_secure_state *state = statep; + tcp_endpoint_list *node; state_ref(state); + node = gpr_malloc(sizeof(tcp_endpoint_list)); + node->tcp_endpoint = tcp; + gpr_mu_lock(&state->mu); + node->next = state->handshaking_tcp_endpoints; + state->handshaking_tcp_endpoints = node; + gpr_mu_unlock(&state->mu); grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done, state); } @@ -132,6 +171,13 @@ static void start(grpc_server *server, void *statep, grpc_pollset **pollsets, static void destroy_done(void *statep) { grpc_server_secure_state *state = statep; grpc_server_listener_destroy_done(state->server); + gpr_mu_lock(&state->mu); + while (state->handshaking_tcp_endpoints != NULL) { + grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint); + remove_tcp_from_list_locked(state, + state->handshaking_tcp_endpoints->tcp_endpoint); + } + gpr_mu_unlock(&state->mu); state_unref(state); } @@ -209,6 +255,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, state->server = server; state->tcp = tcp; state->sc = sc; + state->handshaking_tcp_endpoints = NULL; state->is_shutdown = 0; gpr_mu_init(&state->mu); gpr_ref_init(&state->refcount, 1); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 34ee3f8400..f3c7d8397b 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -75,6 +75,7 @@ static void connector_unref(grpc_connector *con) { static void on_secure_transport_setup_done(void *arg, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { connector *c = arg; grpc_iomgr_closure *notify; |