From d1bec03fa148344b8eac2b59517252d86e4ca858 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 18 Sep 2015 17:29:00 -0700 Subject: Call list progress --- src/core/security/server_secure_chttp2.c | 36 ++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 13 deletions(-) (limited to 'src/core/security/server_secure_chttp2.c') diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c index a6f50712f5..e6e2eee658 100644 --- a/src/core/security/server_secure_chttp2.c +++ b/src/core/security/server_secure_chttp2.c @@ -66,6 +66,7 @@ typedef struct grpc_server_secure_state { int is_shutdown; gpr_mu mu; gpr_refcount refcount; + grpc_closure destroy_closure; } grpc_server_secure_state; static void state_ref(grpc_server_secure_state *state) { @@ -127,7 +128,8 @@ static int remove_tcp_from_list_locked(grpc_server_secure_state *state, static void on_secure_transport_setup_done(void *statep, grpc_security_status status, grpc_endpoint *wrapped_endpoint, - grpc_endpoint *secure_endpoint) { + grpc_endpoint *secure_endpoint, + grpc_call_list *call_list) { grpc_server_secure_state *state = statep; grpc_transport *transport; grpc_mdctx *mdctx; @@ -137,16 +139,16 @@ static void on_secure_transport_setup_done(void *statep, remove_tcp_from_list_locked(state, wrapped_endpoint); if (!state->is_shutdown) { mdctx = grpc_mdctx_create(); - workqueue = grpc_workqueue_create(); + workqueue = grpc_workqueue_create(call_list); transport = grpc_create_chttp2_transport( grpc_server_get_channel_args(state->server), secure_endpoint, mdctx, - workqueue, 0); + 0); setup_transport(state, transport, mdctx, workqueue); grpc_chttp2_transport_start_reading(transport, NULL, 0); } else { /* We need to consume this here, because the server may already have gone * away. */ - grpc_endpoint_destroy(secure_endpoint); + grpc_endpoint_destroy(secure_endpoint, call_list); } gpr_mu_unlock(&state->mu); } else { @@ -158,7 +160,8 @@ static void on_secure_transport_setup_done(void *statep, state_unref(state); } -static void on_accept(void *statep, grpc_endpoint *tcp) { +static void on_accept(void *statep, grpc_endpoint *tcp, + grpc_call_list *call_list) { grpc_server_secure_state *state = statep; tcp_endpoint_list *node; state_ref(state); @@ -169,22 +172,24 @@ static void on_accept(void *statep, grpc_endpoint *tcp) { state->handshaking_tcp_endpoints = node; gpr_mu_unlock(&state->mu); grpc_setup_secure_transport(state->sc, tcp, on_secure_transport_setup_done, - state); + state, call_list); } /* Server callback: start listening on our ports */ static void start(grpc_server *server, void *statep, grpc_pollset **pollsets, - size_t pollset_count) { + size_t pollset_count, grpc_call_list *call_list) { grpc_server_secure_state *state = statep; - grpc_tcp_server_start(state->tcp, pollsets, pollset_count, on_accept, state); + grpc_tcp_server_start(state->tcp, pollsets, pollset_count, on_accept, state, + call_list); } -static void destroy_done(void *statep) { +static void destroy_done(void *statep, int success, grpc_call_list *call_list) { grpc_server_secure_state *state = statep; grpc_server_listener_destroy_done(state->server); gpr_mu_lock(&state->mu); while (state->handshaking_tcp_endpoints != NULL) { - grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint); + grpc_endpoint_shutdown(state->handshaking_tcp_endpoints->tcp_endpoint, + call_list); remove_tcp_from_list_locked(state, state->handshaking_tcp_endpoints->tcp_endpoint); } @@ -194,14 +199,16 @@ static void destroy_done(void *statep) { /* Server callback: destroy the tcp listener (so we don't generate further callbacks) */ -static void destroy(grpc_server *server, void *statep) { +static void destroy(grpc_server *server, void *statep, + grpc_call_list *call_list) { grpc_server_secure_state *state = statep; grpc_tcp_server *tcp; gpr_mu_lock(&state->mu); state->is_shutdown = 1; tcp = state->tcp; gpr_mu_unlock(&state->mu); - grpc_tcp_server_destroy(tcp, destroy_done, state); + grpc_closure_init(&state->destroy_closure, destroy_done, state); + grpc_tcp_server_destroy(tcp, &state->destroy_closure, call_list); } int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, @@ -215,6 +222,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, int port_temp; grpc_security_status status = GRPC_SECURITY_ERROR; grpc_security_connector *sc = NULL; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; /* create security context */ if (creds == NULL) goto error; @@ -277,6 +285,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, /* Register with the server only upon success */ grpc_server_add_listener(server, state, start, destroy); + grpc_call_list_run(&call_list); return port_num; /* Error path: cleanup and return */ @@ -288,10 +297,11 @@ error: grpc_resolved_addresses_destroy(resolved); } if (tcp) { - grpc_tcp_server_destroy(tcp, NULL, NULL); + grpc_tcp_server_destroy(tcp, NULL, &call_list); } if (state) { gpr_free(state); } + grpc_call_list_run(&call_list); return 0; } -- cgit v1.2.3