diff options
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r-- | src/core/surface/server.c | 195 |
1 files changed, 111 insertions, 84 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c index a60d5f7717..b1e3ec9876 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -124,6 +124,11 @@ struct channel_data { gpr_uint32 registered_method_max_probes; }; +typedef struct shutdown_tag { + void *tag; + grpc_completion_queue *cq; +} shutdown_tag; + struct grpc_server { size_t channel_filter_count; const grpc_channel_filter **channel_filters; @@ -134,14 +139,14 @@ struct grpc_server { size_t cq_count; gpr_mu mu; - gpr_cv cv; registered_method *registered_methods; requested_call_array requested_calls; gpr_uint8 shutdown; + gpr_uint8 shutdown_published; size_t num_shutdown_tags; - void **shutdown_tags; + shutdown_tag *shutdown_tags; call_data *lists[CALL_LIST_COUNT]; channel_data root_channel_data; @@ -256,29 +261,32 @@ static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } -static void server_unref(grpc_server *server) { +static void server_delete(grpc_server *server) { registered_method *rm; size_t i; + grpc_channel_args_destroy(server->channel_args); + gpr_mu_destroy(&server->mu); + gpr_free(server->channel_filters); + requested_call_array_destroy(&server->requested_calls); + while ((rm = server->registered_methods) != NULL) { + server->registered_methods = rm->next; + gpr_free(rm->method); + gpr_free(rm->host); + requested_call_array_destroy(&rm->requested); + gpr_free(rm); + } + for (i = 0; i < server->cq_count; i++) { + grpc_cq_internal_unref(server->cqs[i]); + } + gpr_free(server->cqs); + gpr_free(server->pollsets); + gpr_free(server->shutdown_tags); + gpr_free(server); +} + +static void server_unref(grpc_server *server) { if (gpr_unref(&server->internal_refcount)) { - grpc_channel_args_destroy(server->channel_args); - gpr_mu_destroy(&server->mu); - gpr_cv_destroy(&server->cv); - gpr_free(server->channel_filters); - requested_call_array_destroy(&server->requested_calls); - while ((rm = server->registered_methods) != NULL) { - server->registered_methods = rm->next; - gpr_free(rm->method); - gpr_free(rm->host); - requested_call_array_destroy(&rm->requested); - gpr_free(rm); - } - for (i = 0; i < server->cq_count; i++) { - grpc_cq_internal_unref(server->cqs[i]); - } - gpr_free(server->cqs); - gpr_free(server->pollsets); - gpr_free(server->shutdown_tags); - gpr_free(server); + server_delete(server); } } @@ -371,6 +379,26 @@ static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } +static int num_listeners(grpc_server *server) { + listener *l; + int n = 0; + for (l = server->listeners; l; l = l->next) { + n++; + } + return n; +} + +static void maybe_finish_shutdown(grpc_server *server) { + size_t i; + if (server->shutdown && !server->shutdown_published && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) { + server->shutdown_published = 1; + for (i = 0; i < server->num_shutdown_tags; i++) { + grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag, + NULL, 1); + } + } +} + static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *chand = elem->channel_data; @@ -430,6 +458,9 @@ static void server_on_recv(void *ptr, int success) { calld->state = ZOMBIED; grpc_iomgr_add_callback(kill_zombie, elem); } + if (call_list_remove(calld, ALL_CALLS)) { + maybe_finish_shutdown(chand->server); + } gpr_mu_unlock(&chand->server->mu); break; } @@ -526,19 +557,15 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - size_t i, j; + int removed[CALL_LIST_COUNT]; + size_t i; gpr_mu_lock(&chand->server->mu); for (i = 0; i < CALL_LIST_COUNT; i++) { - call_list_remove(elem->call_data, i); + removed[i] = call_list_remove(elem->call_data, i); } - if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) { - for (i = 0; i < chand->server->num_shutdown_tags; i++) { - for (j = 0; j < chand->server->cq_count; j++) { - grpc_cq_end_op(chand->server->cqs[j], chand->server->shutdown_tags[i], - NULL, 1); - } - } + if (removed[ALL_CALLS]) { + maybe_finish_shutdown(chand->server); } gpr_mu_unlock(&chand->server->mu); @@ -625,7 +652,6 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, memset(server, 0, sizeof(grpc_server)); gpr_mu_init(&server->mu); - gpr_cv_init(&server->cv); /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); @@ -784,38 +810,28 @@ grpc_transport_setup_result grpc_server_setup_transport( return result; } -static int num_listeners(grpc_server *server) { - listener *l; - int n = 0; - for (l = server->listeners; l; l = l->next) { - n++; - } - return n; -} - -static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, - void *shutdown_tag) { +void grpc_server_shutdown_and_notify(grpc_server *server, + grpc_completion_queue *cq, void *tag) { listener *l; requested_call_array requested_calls; channel_data **channels; channel_data *c; size_t nchannels; - size_t i, j; + size_t i; grpc_channel_op op; grpc_channel_element *elem; registered_method *rm; + shutdown_tag *sdt; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu); - if (have_shutdown_tag) { - for (i = 0; i < server->cq_count; i++) { - grpc_cq_begin_op(server->cqs[i], NULL); - } - server->shutdown_tags = - gpr_realloc(server->shutdown_tags, - sizeof(void *) * (server->num_shutdown_tags + 1)); - server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag; - } + grpc_cq_begin_op(cq, NULL); + server->shutdown_tags = + gpr_realloc(server->shutdown_tags, + sizeof(void *) * (server->num_shutdown_tags + 1)); + sdt = &server->shutdown_tags[server->num_shutdown_tags++]; + sdt->tag = tag; + sdt->cq = cq; if (server->shutdown) { gpr_mu_unlock(&server->mu); return; @@ -856,13 +872,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, } server->shutdown = 1; - if (server->lists[ALL_CALLS] == NULL) { - for (i = 0; i < server->num_shutdown_tags; i++) { - for (j = 0; j < server->cq_count; j++) { - grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1); - } - } - } + maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu); for (i = 0; i < nchannels; i++) { @@ -892,46 +902,64 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, } } -void grpc_server_shutdown(grpc_server *server) { - shutdown_internal(server, 0, NULL); -} - -void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) { - shutdown_internal(server, 1, tag); -} - void grpc_server_listener_destroy_done(void *s) { grpc_server *server = s; gpr_mu_lock(&server->mu); server->listeners_destroyed++; - gpr_cv_signal(&server->cv); + maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu); } -void grpc_server_destroy(grpc_server *server) { - channel_data *c; - listener *l; - size_t i; +void grpc_server_cancel_all_calls(grpc_server *server) { call_data *calld; + grpc_call **calls; + size_t call_count; + size_t call_capacity; + int is_first = 1; + size_t i; gpr_mu_lock(&server->mu); - if (!server->shutdown) { + + GPR_ASSERT(server->shutdown); + + if (!server->lists[ALL_CALLS]) { gpr_mu_unlock(&server->mu); - grpc_server_shutdown(server); - gpr_mu_lock(&server->mu); + return; } - while (server->listeners_destroyed != num_listeners(server)) { - for (i = 0; i < server->cq_count; i++) { - gpr_mu_unlock(&server->mu); - grpc_cq_hack_spin_pollset(server->cqs[i]); - gpr_mu_lock(&server->mu); + call_capacity = 8; + call_count = 0; + calls = gpr_malloc(sizeof(grpc_call *) * call_capacity); + + for (calld = server->lists[ALL_CALLS]; calld != server->lists[ALL_CALLS] || is_first; calld = calld->links[ALL_CALLS].next) { + if (call_count == call_capacity) { + call_capacity *= 2; + calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity); } + calls[call_count++] = calld->call; + GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all"); + is_first = 0; + } + + gpr_mu_unlock(&server->mu); - gpr_cv_wait(&server->cv, &server->mu, - gpr_time_add(gpr_now(), gpr_time_from_millis(100))); + for (i = 0; i < call_count; i++) { + grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, "Unavailable"); + GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1); } + gpr_free(calls); +} + +void grpc_server_destroy(grpc_server *server) { + channel_data *c; + listener *l; + call_data *calld; + + gpr_mu_lock(&server->mu); + GPR_ASSERT(server->shutdown); + GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); + while (server->listeners) { l = server->listeners; server->listeners = l->next; @@ -940,7 +968,6 @@ void grpc_server_destroy(grpc_server *server) { while ((calld = call_list_remove_head(&server->lists[PENDING_START], PENDING_START)) != NULL) { - gpr_log(GPR_DEBUG, "server destroys call %p", calld->call); calld->state = ZOMBIED; grpc_iomgr_add_callback( kill_zombie, |