From e503cd53984eab4b11da1193bc4f35df8dff008e Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Tue, 14 Jul 2015 02:27:23 +0200 Subject: Better socket kick for Windows. Now calling tcp_shutdown will in fact close the socket, which cascades into properly cleaning out all the pending requests. The tcp_server_windows's shutdown logic had to be rewritted (simplified) in order to take this into account. --- src/core/iomgr/socket_windows.c | 30 ++++++++------- src/core/iomgr/tcp_server_windows.c | 76 ++++++++++++++++--------------------- 2 files changed, 49 insertions(+), 57 deletions(-) (limited to 'src/core') diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c index 897408ded2..f6ddfff0ad 100644 --- a/src/core/iomgr/socket_windows.c +++ b/src/core/iomgr/socket_windows.c @@ -37,6 +37,7 @@ #include #include +#include #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iomgr_internal.h" @@ -61,22 +62,27 @@ grpc_winsocket *grpc_winsocket_create(SOCKET socket, const char *name) { operations to abort them. We need to do that this way because of the various callsites of that function, which happens to be in various mutex hold states, and that'd be unsafe to call them directly. */ -int grpc_winsocket_shutdown(grpc_winsocket *socket) { +int grpc_winsocket_shutdown(grpc_winsocket *winsocket) { int callbacks_set = 0; - gpr_mu_lock(&socket->state_mu); - if (socket->read_info.cb) { + SOCKET socket; + gpr_mu_lock(&winsocket->state_mu); + socket = winsocket->socket; + if (winsocket->read_info.cb) { callbacks_set++; - grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb, - socket->read_info.opaque); - grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); + grpc_iomgr_closure_init(&winsocket->shutdown_closure, + winsocket->read_info.cb, + winsocket->read_info.opaque); + grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0); } - if (socket->write_info.cb) { + if (winsocket->write_info.cb) { callbacks_set++; - grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb, - socket->write_info.opaque); - grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0); + grpc_iomgr_closure_init(&winsocket->shutdown_closure, + winsocket->write_info.cb, + winsocket->write_info.opaque); + grpc_iomgr_add_delayed_callback(&winsocket->shutdown_closure, 0); } - gpr_mu_unlock(&socket->state_mu); + gpr_mu_unlock(&winsocket->state_mu); + closesocket(socket); return callbacks_set; } @@ -87,14 +93,12 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) { an "idle" socket which is neither trying to read or write, we'd start leaking both memory and sockets. */ void grpc_winsocket_orphan(grpc_winsocket *winsocket) { - SOCKET socket = winsocket->socket; grpc_iomgr_unregister_object(&winsocket->iomgr_object); if (winsocket->read_info.outstanding || winsocket->write_info.outstanding) { grpc_iocp_socket_orphan(winsocket); } else { grpc_winsocket_destroy(winsocket); } - closesocket(socket); } void grpc_winsocket_destroy(grpc_winsocket *winsocket) { diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index d70968de88..dfd56f9c40 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -108,9 +108,10 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, size_t i; gpr_mu_lock(&s->mu); /* First, shutdown all fd's. This will queue abortion calls for all - of the pending accepts. */ + of the pending accepts due to the normal operation mechanism. */ for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; + sp->shutting_down = 1; grpc_winsocket_shutdown(sp->socket); } /* This happens asynchronously. Wait while that happens. */ @@ -243,62 +244,49 @@ static void on_accept(void *arg, int from_iocp) { grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; - /* The shutdown sequence is done in two parts. This is the second - part here, acknowledging the IOCP notification, and doing nothing - else, especially not queuing a new accept. */ - if (sp->shutting_down) { - GPR_ASSERT(from_iocp); - sp->shutting_down = 0; - sp->socket->read_info.outstanding = 0; - gpr_mu_lock(&sp->server->mu); - if (0 == --sp->server->active_ports) { - gpr_cv_broadcast(&sp->server->cv); - } - gpr_mu_unlock(&sp->server->mu); - return; - } - - if (from_iocp) { - /* The IOCP notified us of a completed operation. Let's grab the results, - and act accordingly. */ - DWORD transfered_bytes = 0; - DWORD flags; - BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, - &transfered_bytes, FALSE, &flags); - if (!wsa_success) { + /* The general mechanism for shutting down is to queue abortion calls. While + this is necessary in the read/write case, it's useless for the accept + case. Let's do nothing. */ + if (!from_iocp) return; + + /* The IOCP notified us of a completed operation. Let's grab the results, + and act accordingly. */ + DWORD transfered_bytes = 0; + DWORD flags; + BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, + &transfered_bytes, FALSE, &flags); + if (!wsa_success) { + if (sp->shutting_down) { + /* During the shutdown case, we ARE expecting an error. So that's swell, + and we can wake up the shutdown thread. */ + sp->shutting_down = 0; + sp->socket->read_info.outstanding = 0; + gpr_mu_lock(&sp->server->mu); + if (0 == --sp->server->active_ports) { + gpr_cv_broadcast(&sp->server->cv); + } + gpr_mu_unlock(&sp->server->mu); + return; + } else { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message); gpr_free(utf8_message); closesocket(sock); - } else { - /* TODO(ctiller): add sockaddr address to label */ - ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); } } else { - /* If we're not notified from the IOCP, it means we are asked to shutdown. - This will initiate that shutdown. Calling closesocket will trigger an - IOCP notification, that will call this function a second time, from - the IOCP thread. Of course, this only works if the socket was, in fact, - listening. If that's not the case, we'd wait indefinitely. That's a bit - of a degenerate case, but it can happen if you create a server, but - don't start it. So let's support that by recursing once. */ - sp->shutting_down = 1; - sp->new_socket = INVALID_SOCKET; - if (sock != INVALID_SOCKET) { - closesocket(sock); - } else { - on_accept(sp, 1); + if (!sp->shutting_down) { + /* TODO(ctiller): add sockaddr address to label */ + ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); } - return; } /* The only time we should call our callback, is where we successfully managed to accept a connection, and created an endpoint. */ if (ep) sp->server->cb(sp->server->cb_arg, ep); /* As we were notified from the IOCP of one and exactly one accept, - the former socked we created has now either been destroy or assigned - to the new connection. We need to create a new one for the next - connection. */ + the former socked we created has now either been destroy or assigned + to the new connection. We need to create a new one for the next + connection. */ start_accept(sp); } -- cgit v1.2.3 From e55e1833bc3f8d7207d8fc71e1e7c0a822b3b45d Mon Sep 17 00:00:00 2001 From: chai2010 Date: Tue, 14 Jul 2015 13:24:48 +0800 Subject: fix build on windows --- src/core/iomgr/tcp_server_windows.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index dfd56f9c40..e6e1d1499e 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -243,6 +243,9 @@ static void on_accept(void *arg, int from_iocp) { SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; + DWORD transfered_bytes; + DWORD flags; + BOOL wsa_success; /* The general mechanism for shutting down is to queue abortion calls. While this is necessary in the read/write case, it's useless for the accept @@ -251,9 +254,8 @@ static void on_accept(void *arg, int from_iocp) { /* The IOCP notified us of a completed operation. Let's grab the results, and act accordingly. */ - DWORD transfered_bytes = 0; - DWORD flags; - BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, + transfered_bytes = 0; + wsa_success = WSAGetOverlappedResult(sock, &info->overlapped, &transfered_bytes, FALSE, &flags); if (!wsa_success) { if (sp->shutting_down) { -- cgit v1.2.3 From d6cc1814c069f9023b886af7119d0b4ea9f49688 Mon Sep 17 00:00:00 2001 From: vjpai Date: Tue, 14 Jul 2015 14:49:01 -0700 Subject: Name arguments --- src/core/support/stack_lockfree.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/core') diff --git a/src/core/support/stack_lockfree.h b/src/core/support/stack_lockfree.h index 7919ef38cc..f1d8c77279 100644 --- a/src/core/support/stack_lockfree.h +++ b/src/core/support/stack_lockfree.h @@ -39,12 +39,12 @@ typedef struct gpr_stack_lockfree gpr_stack_lockfree; /* This stack must specify the maximum number of entries to track. The current implementation only allows up to 65534 entries */ gpr_stack_lockfree *gpr_stack_lockfree_create(int entries); -void gpr_stack_lockfree_destroy(gpr_stack_lockfree *); +void gpr_stack_lockfree_destroy(gpr_stack_lockfree* stack); /* Pass in a valid entry number for the next stack entry */ -void gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry); +void gpr_stack_lockfree_push(gpr_stack_lockfree* stack, int entry); /* Returns -1 on empty or the actual entry number */ -int gpr_stack_lockfree_pop(gpr_stack_lockfree *); +int gpr_stack_lockfree_pop(gpr_stack_lockfree* stack); #endif /* GRPC_INTERNAL_CORE_SUPPORT_STACK_LOCKFREE_H */ -- cgit v1.2.3 From d54c3e6b39643c34e7466ff826a4a9f2b909a769 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 14 Jul 2015 14:52:34 -0700 Subject: clang-format changed files --- src/core/support/stack_lockfree.c | 10 ++++------ src/core/support/stack_lockfree.h | 4 ++-- test/core/support/stack_lockfree_test.c | 28 ++++++++++++++-------------- 3 files changed, 20 insertions(+), 22 deletions(-) (limited to 'src/core') diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c index 83a68444f5..9497efbfb5 100644 --- a/src/core/support/stack_lockfree.c +++ b/src/core/support/stack_lockfree.c @@ -65,7 +65,8 @@ typedef union lockfree_node { } lockfree_node; #define ENTRY_ALIGNMENT_BITS 3 /* make sure that entries aligned to 8-bytes */ -#define INVALID_ENTRY_INDEX ((1<<16)-1) /* reserve this entry as invalid */ +#define INVALID_ENTRY_INDEX ((1 << 16) - 1) /* reserve this entry as invalid \ + */ struct gpr_stack_lockfree { lockfree_node *entries; @@ -109,8 +110,7 @@ void gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { head.atm = gpr_atm_no_barrier_load(&(stack->head.atm)); /* Point to it */ stack->entries[entry].contents.index = head.contents.index; - } while (!gpr_atm_rel_cas(&(stack->head.atm), - head.atm, newhead.atm)); + } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); /* Use rel_cas above to make sure that entry index is set properly */ } @@ -125,8 +125,6 @@ int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { newhead.atm = gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm)); - } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), - head.atm, - newhead.atm)); + } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm)); return head.contents.index; } diff --git a/src/core/support/stack_lockfree.h b/src/core/support/stack_lockfree.h index f1d8c77279..0bcf73635d 100644 --- a/src/core/support/stack_lockfree.h +++ b/src/core/support/stack_lockfree.h @@ -38,7 +38,7 @@ typedef struct gpr_stack_lockfree gpr_stack_lockfree; /* This stack must specify the maximum number of entries to track. The current implementation only allows up to 65534 entries */ -gpr_stack_lockfree *gpr_stack_lockfree_create(int entries); +gpr_stack_lockfree* gpr_stack_lockfree_create(int entries); void gpr_stack_lockfree_destroy(gpr_stack_lockfree* stack); /* Pass in a valid entry number for the next stack entry */ @@ -47,4 +47,4 @@ void gpr_stack_lockfree_push(gpr_stack_lockfree* stack, int entry); /* Returns -1 on empty or the actual entry number */ int gpr_stack_lockfree_pop(gpr_stack_lockfree* stack); -#endif /* GRPC_INTERNAL_CORE_SUPPORT_STACK_LOCKFREE_H */ +#endif /* GRPC_INTERNAL_CORE_SUPPORT_STACK_LOCKFREE_H */ diff --git a/test/core/support/stack_lockfree_test.c b/test/core/support/stack_lockfree_test.c index ebee04d5b8..42082de389 100644 --- a/test/core/support/stack_lockfree_test.c +++ b/test/core/support/stack_lockfree_test.c @@ -59,13 +59,13 @@ static void test_serial_sized(int size) { GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1); /* Now add repeatedly more items and check them */ - for (i=1; irank*arg->stack_size/arg->nthreads; - hi = (arg->rank+1)*arg->stack_size/arg->nthreads; - for (i=lo; irank * arg->stack_size / arg->nthreads; + hi = (arg->rank + 1) * arg->stack_size / arg->nthreads; + for (i = lo; i < hi; i++) { gpr_stack_lockfree_push(arg->stack, i); if ((res = gpr_stack_lockfree_pop(arg->stack)) != -1) { arg->sum += res; @@ -116,7 +116,7 @@ static void test_mt_sized(int size, int nth) { gpr_thd_options options = gpr_thd_options_default(); stack = gpr_stack_lockfree_create(size); - for (i=0; i Date: Tue, 14 Jul 2015 22:21:05 -0700 Subject: Fixed wrong frame parsing --- src/core/transport/chttp2/frame_data.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'src/core') diff --git a/src/core/transport/chttp2/frame_data.c b/src/core/transport/chttp2/frame_data.c index 0ad62a9999..7e3980159e 100644 --- a/src/core/transport/chttp2/frame_data.c +++ b/src/core/transport/chttp2/frame_data.c @@ -89,12 +89,9 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( fh_0: case GRPC_CHTTP2_DATA_FH_0: p->frame_type = *cur; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_1; - return GRPC_CHTTP2_PARSE_OK; - } switch (p->frame_type) { case 0: + /* noop */ break; case 1: gpr_log(GPR_ERROR, "Compressed GRPC frames not yet supported"); @@ -103,6 +100,10 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type); return GRPC_CHTTP2_STREAM_ERROR; } + if (++cur == end) { + p->state = GRPC_CHTTP2_DATA_FH_1; + return GRPC_CHTTP2_PARSE_OK; + } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_1: p->frame_size = ((gpr_uint32)*cur) << 24; -- cgit v1.2.3 From 097468d43ad44c087a01abee5044bd683c144d18 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 15 Jul 2015 22:51:39 -0700 Subject: Clean up handshaking server channels properly --- src/core/httpcli/httpcli.c | 1 + src/core/security/secure_transport_setup.c | 29 ++++++++++-------- src/core/security/secure_transport_setup.h | 2 +- src/core/security/server_secure_chttp2.c | 47 ++++++++++++++++++++++++++++++ src/core/surface/secure_channel_create.c | 1 + 5 files changed, 67 insertions(+), 13 deletions(-) (limited to 'src/core') diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index 3f5557e08e..65997d5f44 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -165,6 +165,7 @@ static void start_write(internal_request *req) { static void on_secure_transport_setup_done(void *rp, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { internal_request *req = rp; if (status != GRPC_SECURITY_OK) { diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c index 731b382f09..0c3572b53c 100644 --- a/src/core/security/secure_transport_setup.c +++ b/src/core/security/secure_transport_setup.c @@ -47,7 +47,8 @@ typedef struct { tsi_handshaker *handshaker; unsigned char *handshake_buffer; size_t handshake_buffer_size; - grpc_endpoint *endpoint; + grpc_endpoint *wrapped_endpoint; + grpc_endpoint *secure_endpoint; gpr_slice_buffer left_overs; grpc_secure_transport_setup_done_cb cb; void *user_data; @@ -63,13 +64,16 @@ static void on_handshake_data_sent_to_peer(void *setup, static void secure_transport_setup_done(grpc_secure_transport_setup *s, int is_success) { if (is_success) { - s->cb(s->user_data, GRPC_SECURITY_OK, s->endpoint); + s->cb(s->user_data, GRPC_SECURITY_OK, s->wrapped_endpoint, + s->secure_endpoint); } else { - if (s->endpoint != NULL) { - grpc_endpoint_shutdown(s->endpoint); - grpc_endpoint_destroy(s->endpoint); + if (s->secure_endpoint != NULL) { + grpc_endpoint_shutdown(s->secure_endpoint); + grpc_endpoint_destroy(s->secure_endpoint); + } else { + grpc_endpoint_destroy(s->wrapped_endpoint); } - s->cb(s->user_data, GRPC_SECURITY_ERROR, NULL); + s->cb(s->user_data, GRPC_SECURITY_ERROR, s->wrapped_endpoint, NULL); } if (s->handshaker != NULL) tsi_handshaker_destroy(s->handshaker); if (s->handshake_buffer != NULL) gpr_free(s->handshake_buffer); @@ -95,8 +99,9 @@ static void on_peer_checked(void *user_data, grpc_security_status status) { secure_transport_setup_done(s, 0); return; } - s->endpoint = grpc_secure_endpoint_create( - protector, s->endpoint, s->left_overs.slices, s->left_overs.count); + s->secure_endpoint = + grpc_secure_endpoint_create(protector, s->wrapped_endpoint, + s->left_overs.slices, s->left_overs.count); secure_transport_setup_done(s, 1); return; } @@ -152,7 +157,7 @@ static void send_handshake_bytes_to_peer(grpc_secure_transport_setup *s) { gpr_slice_from_copied_buffer((const char *)s->handshake_buffer, offset); /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - write_status = grpc_endpoint_write(s->endpoint, &to_send, 1, + write_status = grpc_endpoint_write(s->wrapped_endpoint, &to_send, 1, on_handshake_data_sent_to_peer, s); if (write_status == GRPC_ENDPOINT_WRITE_ERROR) { gpr_log(GPR_ERROR, "Could not send handshake data to peer."); @@ -198,7 +203,7 @@ static void on_handshake_data_received_from_peer( if (result == TSI_INCOMPLETE_DATA) { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_notify_on_read(s->endpoint, + grpc_endpoint_notify_on_read(s->wrapped_endpoint, on_handshake_data_received_from_peer, setup); cleanup_slices(slices, nslices); return; @@ -256,7 +261,7 @@ static void on_handshake_data_sent_to_peer(void *setup, if (tsi_handshaker_is_in_progress(s->handshaker)) { /* TODO(klempner,jboeuf): This should probably use the client setup deadline */ - grpc_endpoint_notify_on_read(s->endpoint, + grpc_endpoint_notify_on_read(s->wrapped_endpoint, on_handshake_data_received_from_peer, setup); } else { check_peer(s); @@ -280,7 +285,7 @@ void grpc_setup_secure_transport(grpc_security_connector *connector, GRPC_SECURITY_CONNECTOR_REF(connector, "secure_transport_setup"); s->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; s->handshake_buffer = gpr_malloc(s->handshake_buffer_size); - s->endpoint = nonsecure_endpoint; + s->wrapped_endpoint = nonsecure_endpoint; s->user_data = user_data; s->cb = cb; gpr_slice_buffer_init(&s->left_overs); diff --git a/src/core/security/secure_transport_setup.h b/src/core/security/secure_transport_setup.h index 58701c461d..29025f5236 100644 --- a/src/core/security/secure_transport_setup.h +++ b/src/core/security/secure_transport_setup.h @@ -42,7 +42,7 @@ /* Ownership of the secure_endpoint is transfered. */ typedef void (*grpc_secure_transport_setup_done_cb)( void *user_data, grpc_security_status status, - grpc_endpoint *secure_endpoint); + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint); /* Calls the callback upon completion. */ void grpc_setup_secure_transport(grpc_security_connector *connector, diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index 8a7ada07af..3717b8989f 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -51,10 +51,16 @@ #include #include +typedef struct tcp_endpoint_list { + grpc_endpoint *tcp_endpoint; + struct tcp_endpoint_list *next; +} tcp_endpoint_list; + typedef struct grpc_server_secure_state { grpc_server *server; grpc_tcp_server *tcp; grpc_security_connector *sc; + tcp_endpoint_list *handshaking_tcp_endpoints; int is_shutdown; gpr_mu mu; gpr_refcount refcount; @@ -88,14 +94,37 @@ static void setup_transport(void *statep, grpc_transport *transport, grpc_channel_args_destroy(args_copy); } +static int remove_tcp_from_list_locked(grpc_server_secure_state *state, + grpc_endpoint *tcp) { + tcp_endpoint_list *node = state->handshaking_tcp_endpoints; + tcp_endpoint_list *tmp = NULL; + if (node && node->tcp_endpoint == tcp) { + state->handshaking_tcp_endpoints = state->handshaking_tcp_endpoints->next; + gpr_free(node); + return 0; + } + while (node) { + if (node->next->tcp_endpoint == tcp) { + tmp = node->next; + node->next = node->next->next; + gpr_free(tmp); + return 0; + } + node = node->next; + } + return -1; +} + static void on_secure_transport_setup_done(void *statep, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { grpc_server_secure_state *state = statep; grpc_transport *transport; grpc_mdctx *mdctx; if (status == GRPC_SECURITY_OK) { gpr_mu_lock(&state->mu); + remove_tcp_from_list_locked(state, wrapped_endpoint); if (!state->is_shutdown) { mdctx = grpc_mdctx_create(); transport = grpc_create_chttp2_transport( @@ -110,6 +139,9 @@ static void on_secure_transport_setup_done(void *statep, } gpr_mu_unlock(&state->mu); } else { + gpr_mu_lock(&state->mu); + remove_tcp_from_list_locked(state, wrapped_endpoint); + gpr_mu_unlock(&state->mu); gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } state_unref(state); @@ -117,7 +149,14 @@ static void on_secure_transport_setup_done(void *statep, static void on_accept(void *statep, grpc_endpoint *tcp) { grpc_server_secure_state *state = statep; + tcp_endpoint_list *node; state_ref(state); + node = gpr_malloc(sizeof(tcp_endpoint_list)); + node->tcp_endpoint = tcp; + gpr_mu_lock(&state->mu); + node->next = state->handshaking_tcp_endpoints; + state->handshaking_tcp_endpoints = node; + gpr_mu_unlock(&state->mu); grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done, state); } @@ -132,6 +171,13 @@ static void start(grpc_server *server, void *statep, grpc_pollset **pollsets, static void destroy_done(void *statep) { grpc_server_secure_state *state = statep; grpc_server_listener_destroy_done(state->server); + gpr_mu_lock(&state->mu); + while (state->handshaking_tcp_endpoints != NULL) { + grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint); + remove_tcp_from_list_locked(state, + state->handshaking_tcp_endpoints->tcp_endpoint); + } + gpr_mu_unlock(&state->mu); state_unref(state); } @@ -209,6 +255,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, state->server = server; state->tcp = tcp; state->sc = sc; + state->handshaking_tcp_endpoints = NULL; state->is_shutdown = 0; gpr_mu_init(&state->mu); gpr_ref_init(&state->refcount, 1); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 34ee3f8400..f3c7d8397b 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -75,6 +75,7 @@ static void connector_unref(grpc_connector *con) { static void on_secure_transport_setup_done(void *arg, grpc_security_status status, + grpc_endpoint *wrapped_endpoint, grpc_endpoint *secure_endpoint) { connector *c = arg; grpc_iomgr_closure *notify; -- cgit v1.2.3