diff options
author | Craig Tiller <ctiller@google.com> | 2015-05-26 16:15:34 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-05-26 16:15:34 -0700 |
commit | ee945e8325ff7d67be6990b6193e19f865ec7b30 (patch) | |
tree | fa0d40b2a59de878f4db98636441db680b896cc8 /src/core/surface | |
parent | 9db8f1ade2222a00578abf5300abd1734a0e5ed4 (diff) |
Work towards removing grpc_server_shutdown
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/completion_queue.c | 21 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 6 | ||||
-rw-r--r-- | src/core/surface/server.c | 131 |
3 files changed, 51 insertions, 107 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index bc7c15178c..6d49637f8e 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -150,6 +150,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, int success) { event *ev; int shutdown = 0; + gpr_log(GPR_DEBUG, "end_op:%p", tag); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call); ev->base.success = success; @@ -167,26 +168,6 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, } } -void grpc_cq_begin_silent_op(grpc_completion_queue *cc) { - gpr_ref(&cc->refs); -} - -void grpc_cq_end_silent_op(grpc_completion_queue *cc) { - int shutdown = 0; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - if (gpr_unref(&cc->refs)) { - GPR_ASSERT(!cc->shutdown); - GPR_ASSERT(cc->shutdown_called); - cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); - shutdown = 1; - } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - if (shutdown) { - grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc); - } -} - /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ static event *create_shutdown_event(void) { event *ev = gpr_malloc(sizeof(event)); diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index 896b7f708b..7b6fad98fd 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -50,12 +50,6 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call); void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, int success); -/* Begin a 'silent operation' - one that blocks completion queue shutdown - until it is ended */ -void grpc_cq_begin_silent_op(grpc_completion_queue *cc); -/* End such an operation */ -void grpc_cq_end_silent_op(grpc_completion_queue *cc); - /* disable polling for some tests */ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 0019b8d4a4..69f531fe1d 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -134,7 +134,6 @@ struct grpc_server { size_t cq_count; gpr_mu mu; - gpr_cv cv; registered_method *registered_methods; requested_call_array requested_calls; @@ -256,29 +255,32 @@ static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } -static void server_unref(grpc_server *server) { +static void server_delete(grpc_server *server) { registered_method *rm; size_t i; + grpc_channel_args_destroy(server->channel_args); + gpr_mu_destroy(&server->mu); + gpr_free(server->channel_filters); + requested_call_array_destroy(&server->requested_calls); + while ((rm = server->registered_methods) != NULL) { + server->registered_methods = rm->next; + gpr_free(rm->method); + gpr_free(rm->host); + requested_call_array_destroy(&rm->requested); + gpr_free(rm); + } + for (i = 0; i < server->cq_count; i++) { + grpc_cq_internal_unref(server->cqs[i]); + } + gpr_free(server->cqs); + gpr_free(server->pollsets); + gpr_free(server->shutdown_tags); + gpr_free(server); +} + +static void server_unref(grpc_server *server) { if (gpr_unref(&server->internal_refcount)) { - grpc_channel_args_destroy(server->channel_args); - gpr_mu_destroy(&server->mu); - gpr_cv_destroy(&server->cv); - gpr_free(server->channel_filters); - requested_call_array_destroy(&server->requested_calls); - while ((rm = server->registered_methods) != NULL) { - server->registered_methods = rm->next; - gpr_free(rm->method); - gpr_free(rm->host); - requested_call_array_destroy(&rm->requested); - gpr_free(rm); - } - for (i = 0; i < server->cq_count; i++) { - grpc_cq_internal_unref(server->cqs[i]); - } - gpr_free(server->cqs); - gpr_free(server->pollsets); - gpr_free(server->shutdown_tags); - gpr_free(server); + server_delete(server); } } @@ -371,12 +373,20 @@ static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } +static int num_listeners(grpc_server *server) { + listener *l; + int n = 0; + for (l = server->listeners; l; l = l->next) { + n++; + } + return n; +} + static void maybe_finish_shutdown(grpc_server *server) { size_t i, j; - if (server->shutdown && server->lists[ALL_CALLS] == NULL) { - for (j = 0; j < server->cq_count; j++) { - grpc_cq_end_silent_op(server->cqs[j]); - for (i = 0; i < server->num_shutdown_tags; i++) { + if (server->shutdown && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) { + for (i = 0; i < server->num_shutdown_tags; i++) { + for (j = 0; j < server->cq_count; j++) { grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1); } @@ -541,13 +551,16 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + int removed[CALL_LIST_COUNT]; size_t i; gpr_mu_lock(&chand->server->mu); for (i = 0; i < CALL_LIST_COUNT; i++) { - call_list_remove(elem->call_data, i); + removed[i] = call_list_remove(elem->call_data, i); + } + if (removed[ALL_CALLS]) { + maybe_finish_shutdown(chand->server); } - maybe_finish_shutdown(chand->server); gpr_mu_unlock(&chand->server->mu); if (calld->host) { @@ -633,7 +646,6 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, memset(server, 0, sizeof(grpc_server)); gpr_mu_init(&server->mu); - gpr_cv_init(&server->cv); /* decremented by grpc_server_destroy */ gpr_ref_init(&server->internal_refcount, 1); @@ -792,17 +804,7 @@ grpc_transport_setup_result grpc_server_setup_transport( return result; } -static int num_listeners(grpc_server *server) { - listener *l; - int n = 0; - for (l = server->listeners; l; l = l->next) { - n++; - } - return n; -} - -static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, - void *shutdown_tag) { +void grpc_server_shutdown_and_notify(grpc_server *server, void *shutdown_tag) { listener *l; requested_call_array requested_calls; channel_data **channels; @@ -815,24 +817,18 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu); - if (have_shutdown_tag) { - for (i = 0; i < server->cq_count; i++) { - grpc_cq_begin_op(server->cqs[i], NULL); - } - server->shutdown_tags = - gpr_realloc(server->shutdown_tags, - sizeof(void *) * (server->num_shutdown_tags + 1)); - server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag; + for (i = 0; i < server->cq_count; i++) { + grpc_cq_begin_op(server->cqs[i], NULL); } + server->shutdown_tags = + gpr_realloc(server->shutdown_tags, + sizeof(void *) * (server->num_shutdown_tags + 1)); + server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag; if (server->shutdown) { gpr_mu_unlock(&server->mu); return; } - for (i = 0; i < server->cq_count; i++) { - grpc_cq_begin_silent_op(server->cqs[i]); - } - nchannels = 0; for (c = server->root_channel_data.next; c != &server->root_channel_data; c = c->next) { @@ -898,49 +894,22 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, } } -void grpc_server_shutdown(grpc_server *server) { - shutdown_internal(server, 0, NULL); -} - -void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) { - shutdown_internal(server, 1, tag); -} - void grpc_server_listener_destroy_done(void *s) { grpc_server *server = s; gpr_mu_lock(&server->mu); server->listeners_destroyed++; - gpr_cv_signal(&server->cv); + maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu); } -static void continue_server_shutdown(void *server, int iomgr_success) { - grpc_server_destroy(server); -} - void grpc_server_destroy(grpc_server *server) { channel_data *c; listener *l; - size_t i; call_data *calld; gpr_mu_lock(&server->mu); - if (!server->shutdown) { - gpr_mu_unlock(&server->mu); - grpc_server_shutdown(server); - gpr_mu_lock(&server->mu); - } - - if (server->listeners_destroyed != num_listeners(server)) { - gpr_mu_unlock(&server->mu); - for (i = 0; i < server->cq_count; i++) { - grpc_cq_hack_spin_pollset(server->cqs[i]); - } - - /* delay execution some, and return early */ - grpc_iomgr_add_callback(continue_server_shutdown, server); - return; - } + GPR_ASSERT(server->shutdown); + GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); while (server->listeners) { l = server->listeners; |