aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-05-26 16:15:34 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-05-26 16:15:34 -0700
commitee945e8325ff7d67be6990b6193e19f865ec7b30 (patch)
treefa0d40b2a59de878f4db98636441db680b896cc8 /src/core/surface
parent9db8f1ade2222a00578abf5300abd1734a0e5ed4 (diff)
Work towards removing grpc_server_shutdown
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/completion_queue.c21
-rw-r--r--src/core/surface/completion_queue.h6
-rw-r--r--src/core/surface/server.c131
3 files changed, 51 insertions, 107 deletions
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index bc7c15178c..6d49637f8e 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -150,6 +150,7 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success) {
event *ev;
int shutdown = 0;
+ gpr_log(GPR_DEBUG, "end_op:%p", tag);
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
ev->base.success = success;
@@ -167,26 +168,6 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
}
}
-void grpc_cq_begin_silent_op(grpc_completion_queue *cc) {
- gpr_ref(&cc->refs);
-}
-
-void grpc_cq_end_silent_op(grpc_completion_queue *cc) {
- int shutdown = 0;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- if (gpr_unref(&cc->refs)) {
- GPR_ASSERT(!cc->shutdown);
- GPR_ASSERT(cc->shutdown_called);
- cc->shutdown = 1;
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
- shutdown = 1;
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- if (shutdown) {
- grpc_pollset_shutdown(&cc->pollset, on_pollset_destroy_done, cc);
- }
-}
-
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
static event *create_shutdown_event(void) {
event *ev = gpr_malloc(sizeof(event));
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index 896b7f708b..7b6fad98fd 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -50,12 +50,6 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
int success);
-/* Begin a 'silent operation' - one that blocks completion queue shutdown
- until it is ended */
-void grpc_cq_begin_silent_op(grpc_completion_queue *cc);
-/* End such an operation */
-void grpc_cq_end_silent_op(grpc_completion_queue *cc);
-
/* disable polling for some tests */
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 0019b8d4a4..69f531fe1d 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -134,7 +134,6 @@ struct grpc_server {
size_t cq_count;
gpr_mu mu;
- gpr_cv cv;
registered_method *registered_methods;
requested_call_array requested_calls;
@@ -256,29 +255,32 @@ static void server_ref(grpc_server *server) {
gpr_ref(&server->internal_refcount);
}
-static void server_unref(grpc_server *server) {
+static void server_delete(grpc_server *server) {
registered_method *rm;
size_t i;
+ grpc_channel_args_destroy(server->channel_args);
+ gpr_mu_destroy(&server->mu);
+ gpr_free(server->channel_filters);
+ requested_call_array_destroy(&server->requested_calls);
+ while ((rm = server->registered_methods) != NULL) {
+ server->registered_methods = rm->next;
+ gpr_free(rm->method);
+ gpr_free(rm->host);
+ requested_call_array_destroy(&rm->requested);
+ gpr_free(rm);
+ }
+ for (i = 0; i < server->cq_count; i++) {
+ grpc_cq_internal_unref(server->cqs[i]);
+ }
+ gpr_free(server->cqs);
+ gpr_free(server->pollsets);
+ gpr_free(server->shutdown_tags);
+ gpr_free(server);
+}
+
+static void server_unref(grpc_server *server) {
if (gpr_unref(&server->internal_refcount)) {
- grpc_channel_args_destroy(server->channel_args);
- gpr_mu_destroy(&server->mu);
- gpr_cv_destroy(&server->cv);
- gpr_free(server->channel_filters);
- requested_call_array_destroy(&server->requested_calls);
- while ((rm = server->registered_methods) != NULL) {
- server->registered_methods = rm->next;
- gpr_free(rm->method);
- gpr_free(rm->host);
- requested_call_array_destroy(&rm->requested);
- gpr_free(rm);
- }
- for (i = 0; i < server->cq_count; i++) {
- grpc_cq_internal_unref(server->cqs[i]);
- }
- gpr_free(server->cqs);
- gpr_free(server->pollsets);
- gpr_free(server->shutdown_tags);
- gpr_free(server);
+ server_delete(server);
}
}
@@ -371,12 +373,20 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
+static int num_listeners(grpc_server *server) {
+ listener *l;
+ int n = 0;
+ for (l = server->listeners; l; l = l->next) {
+ n++;
+ }
+ return n;
+}
+
static void maybe_finish_shutdown(grpc_server *server) {
size_t i, j;
- if (server->shutdown && server->lists[ALL_CALLS] == NULL) {
- for (j = 0; j < server->cq_count; j++) {
- grpc_cq_end_silent_op(server->cqs[j]);
- for (i = 0; i < server->num_shutdown_tags; i++) {
+ if (server->shutdown && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) {
+ for (i = 0; i < server->num_shutdown_tags; i++) {
+ for (j = 0; j < server->cq_count; j++) {
grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i],
NULL, 1);
}
@@ -541,13 +551,16 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
+ int removed[CALL_LIST_COUNT];
size_t i;
gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) {
- call_list_remove(elem->call_data, i);
+ removed[i] = call_list_remove(elem->call_data, i);
+ }
+ if (removed[ALL_CALLS]) {
+ maybe_finish_shutdown(chand->server);
}
- maybe_finish_shutdown(chand->server);
gpr_mu_unlock(&chand->server->mu);
if (calld->host) {
@@ -633,7 +646,6 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
memset(server, 0, sizeof(grpc_server));
gpr_mu_init(&server->mu);
- gpr_cv_init(&server->cv);
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
@@ -792,17 +804,7 @@ grpc_transport_setup_result grpc_server_setup_transport(
return result;
}
-static int num_listeners(grpc_server *server) {
- listener *l;
- int n = 0;
- for (l = server->listeners; l; l = l->next) {
- n++;
- }
- return n;
-}
-
-static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
- void *shutdown_tag) {
+void grpc_server_shutdown_and_notify(grpc_server *server, void *shutdown_tag) {
listener *l;
requested_call_array requested_calls;
channel_data **channels;
@@ -815,24 +817,18 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu);
- if (have_shutdown_tag) {
- for (i = 0; i < server->cq_count; i++) {
- grpc_cq_begin_op(server->cqs[i], NULL);
- }
- server->shutdown_tags =
- gpr_realloc(server->shutdown_tags,
- sizeof(void *) * (server->num_shutdown_tags + 1));
- server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
+ for (i = 0; i < server->cq_count; i++) {
+ grpc_cq_begin_op(server->cqs[i], NULL);
}
+ server->shutdown_tags =
+ gpr_realloc(server->shutdown_tags,
+ sizeof(void *) * (server->num_shutdown_tags + 1));
+ server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
return;
}
- for (i = 0; i < server->cq_count; i++) {
- grpc_cq_begin_silent_op(server->cqs[i]);
- }
-
nchannels = 0;
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
@@ -898,49 +894,22 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
}
}
-void grpc_server_shutdown(grpc_server *server) {
- shutdown_internal(server, 0, NULL);
-}
-
-void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
- shutdown_internal(server, 1, tag);
-}
-
void grpc_server_listener_destroy_done(void *s) {
grpc_server *server = s;
gpr_mu_lock(&server->mu);
server->listeners_destroyed++;
- gpr_cv_signal(&server->cv);
+ maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu);
}
-static void continue_server_shutdown(void *server, int iomgr_success) {
- grpc_server_destroy(server);
-}
-
void grpc_server_destroy(grpc_server *server) {
channel_data *c;
listener *l;
- size_t i;
call_data *calld;
gpr_mu_lock(&server->mu);
- if (!server->shutdown) {
- gpr_mu_unlock(&server->mu);
- grpc_server_shutdown(server);
- gpr_mu_lock(&server->mu);
- }
-
- if (server->listeners_destroyed != num_listeners(server)) {
- gpr_mu_unlock(&server->mu);
- for (i = 0; i < server->cq_count; i++) {
- grpc_cq_hack_spin_pollset(server->cqs[i]);
- }
-
- /* delay execution some, and return early */
- grpc_iomgr_add_callback(continue_server_shutdown, server);
- return;
- }
+ GPR_ASSERT(server->shutdown);
+ GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
while (server->listeners) {
l = server->listeners;