aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r--src/core/surface/server.c172
1 files changed, 98 insertions, 74 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 825ef66804..c8ac559a0d 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -114,6 +114,7 @@ typedef struct channel_registered_method {
struct channel_data {
grpc_server *server;
+ size_t num_calls;
grpc_channel *channel;
grpc_mdstr *path_key;
grpc_mdstr *authority_key;
@@ -123,7 +124,6 @@ struct channel_data {
channel_registered_method *registered_methods;
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
- grpc_iomgr_closure finish_shutdown_channel_closure;
grpc_iomgr_closure finish_destroy_channel_closure;
};
@@ -198,6 +198,9 @@ struct call_data {
static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
+static void shutdown_channel(channel_data *chand, int send_goaway,
+ int send_disconnect);
+static void maybe_finish_shutdown(grpc_server *server);
static int call_list_join(call_data **root, call_data *call, call_list list) {
GPR_ASSERT(!call->root[list]);
@@ -281,7 +284,7 @@ static void server_delete(grpc_server *server) {
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
- grpc_cq_internal_unref(server->cqs[i]);
+ GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
}
gpr_free(server->cqs);
gpr_free(server->pollsets);
@@ -308,7 +311,7 @@ static void orphan_channel(channel_data *chand) {
static void finish_destroy_channel(void *cd, int success) {
channel_data *chand = cd;
grpc_server *server = chand->server;
- grpc_channel_internal_unref(chand->channel);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
server_unref(server);
}
@@ -317,6 +320,7 @@ static void destroy_channel(channel_data *chand) {
GPR_ASSERT(chand->server != NULL);
orphan_channel(chand);
server_ref(chand->server);
+ maybe_finish_shutdown(chand->server);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
@@ -397,12 +401,28 @@ static int num_listeners(grpc_server *server) {
static void maybe_finish_shutdown(grpc_server *server) {
size_t i;
- if (server->shutdown && !server->shutdown_published && server->lists[ALL_CALLS] == NULL && server->listeners_destroyed == num_listeners(server)) {
- server->shutdown_published = 1;
- for (i = 0; i < server->num_shutdown_tags; i++) {
- grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
- NULL, 1);
- }
+ if (!server->shutdown || server->shutdown_published) {
+ return;
+ }
+ if (server->lists[ALL_CALLS] != NULL) {
+ gpr_log(GPR_DEBUG,
+ "Waiting for all calls to finish before destroying server");
+ return;
+ }
+ if (server->root_channel_data.next != &server->root_channel_data) {
+ gpr_log(GPR_DEBUG,
+ "Waiting for all channels to close before destroying server");
+ return;
+ }
+ if (server->listeners_destroyed < num_listeners(server)) {
+ gpr_log(GPR_DEBUG, "Waiting for all listeners to be destroyed (@ %d/%d)",
+ server->listeners_destroyed, num_listeners(server));
+ return;
+ }
+ server->shutdown_published = 1;
+ for (i = 0; i < server->num_shutdown_tags; i++) {
+ grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
+ NULL, 1);
}
}
@@ -420,6 +440,14 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
+static void decrement_call_count(channel_data *chand) {
+ chand->num_calls--;
+ if (0 == chand->num_calls && chand->server->shutdown) {
+ shutdown_channel(chand, 0, 1);
+ }
+ maybe_finish_shutdown(chand->server);
+}
+
static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
@@ -467,10 +495,9 @@ static void server_on_recv(void *ptr, int success) {
calld->state = ZOMBIED;
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
-
}
if (call_list_remove(calld, ALL_CALLS)) {
- maybe_finish_shutdown(chand->server);
+ decrement_call_count(chand);
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -531,22 +558,49 @@ static void channel_op(grpc_channel_element *elem,
}
}
-static void finish_shutdown_channel(void *cd, int success) {
- channel_data *chand = cd;
+typedef struct {
+ channel_data *chand;
+ int send_goaway;
+ int send_disconnect;
+ grpc_iomgr_closure finish_shutdown_channel_closure;
+} shutdown_channel_args;
+
+static void finish_shutdown_channel(void *p, int success) {
+ shutdown_channel_args *sca = p;
grpc_channel_op op;
- op.type = GRPC_CHANNEL_DISCONNECT;
- op.dir = GRPC_CALL_DOWN;
- channel_op(grpc_channel_stack_element(
- grpc_channel_get_channel_stack(chand->channel), 0),
- NULL, &op);
- grpc_channel_internal_unref(chand->channel);
+
+ if (sca->send_goaway) {
+ op.type = GRPC_CHANNEL_GOAWAY;
+ op.dir = GRPC_CALL_DOWN;
+ op.data.goaway.status = GRPC_STATUS_OK;
+ op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
+ channel_op(grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(sca->chand->channel), 0),
+ NULL, &op);
+ }
+ if (sca->send_disconnect) {
+ op.type = GRPC_CHANNEL_DISCONNECT;
+ op.dir = GRPC_CALL_DOWN;
+ channel_op(grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(sca->chand->channel), 0),
+ NULL, &op);
+ }
+ GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown");
+
+ gpr_free(sca);
}
-static void shutdown_channel(channel_data *chand) {
- grpc_channel_internal_ref(chand->channel);
- chand->finish_shutdown_channel_closure.cb = finish_shutdown_channel;
- chand->finish_shutdown_channel_closure.cb_arg = chand;
- grpc_iomgr_add_callback(&chand->finish_shutdown_channel_closure);
+static void shutdown_channel(channel_data *chand, int send_goaway,
+ int send_disconnect) {
+ shutdown_channel_args *sca;
+ GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown");
+ sca = gpr_malloc(sizeof(shutdown_channel_args));
+ sca->chand = chand;
+ sca->send_goaway = send_goaway;
+ sca->send_disconnect = send_disconnect;
+ sca->finish_shutdown_channel_closure.cb = finish_shutdown_channel;
+ sca->finish_shutdown_channel_closure.cb_arg = sca;
+ grpc_iomgr_add_callback(&sca->finish_shutdown_channel_closure);
}
static void init_call_elem(grpc_call_element *elem,
@@ -560,6 +614,7 @@ static void init_call_elem(grpc_call_element *elem,
gpr_mu_lock(&chand->server->mu);
call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
+ chand->num_calls++;
gpr_mu_unlock(&chand->server->mu);
server_ref(chand->server);
@@ -578,7 +633,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
removed[i] = call_list_remove(elem->call_data, i);
}
if (removed[ALL_CALLS]) {
- maybe_finish_shutdown(chand->server);
+ decrement_call_count(chand);
}
gpr_mu_unlock(&chand->server->mu);
@@ -600,6 +655,7 @@ static void init_channel_elem(grpc_channel_element *elem,
GPR_ASSERT(is_first);
GPR_ASSERT(!is_last);
chand->server = NULL;
+ chand->num_calls = 0;
chand->channel = NULL;
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
@@ -626,6 +682,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
+ maybe_finish_shutdown(chand->server);
gpr_mu_unlock(&chand->server->mu);
grpc_mdstr_unref(chand->path_key);
grpc_mdstr_unref(chand->authority_key);
@@ -651,7 +708,8 @@ void grpc_server_register_completion_queue(grpc_server *server,
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
- grpc_cq_internal_ref(cq);
+ GRPC_CQ_INTERNAL_REF(cq, "server");
+ grpc_cq_mark_server_cq(cq);
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
server->cq_count * sizeof(grpc_completion_queue *));
@@ -713,7 +771,8 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host) {
registered_method *m;
if (!method) {
- gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL");
+ gpr_log(GPR_ERROR,
+ "grpc_server_register_method method string cannot be NULL");
return NULL;
}
for (m = server->registered_methods; m; m = m->next) {
@@ -836,12 +895,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call_array requested_calls;
- channel_data **channels;
channel_data *c;
- size_t nchannels;
size_t i;
- grpc_channel_op op;
- grpc_channel_element *elem;
registered_method *rm;
shutdown_tag *sdt;
@@ -859,18 +914,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
return;
}
- nchannels = 0;
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
- nchannels++;
- }
- channels = gpr_malloc(sizeof(channel_data *) * nchannels);
- i = 0;
- for (c = server->root_channel_data.next; c != &server->root_channel_data;
- c = c->next) {
- grpc_channel_internal_ref(c->channel);
- channels[i] = c;
- i++;
+ shutdown_channel(c, 1, c->num_calls == 0);
}
/* collect all unregistered then registered calls */
@@ -897,21 +943,6 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu);
- for (i = 0; i < nchannels; i++) {
- c = channels[i];
- elem = grpc_channel_stack_element(
- grpc_channel_get_channel_stack(c->channel), 0);
-
- op.type = GRPC_CHANNEL_GOAWAY;
- op.dir = GRPC_CALL_DOWN;
- op.data.goaway.status = GRPC_STATUS_OK;
- op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
- elem->filter->channel_op(elem, NULL, &op);
-
- grpc_channel_internal_unref(c->channel);
- }
- gpr_free(channels);
-
/* terminate all the requested calls */
for (i = 0; i < requested_calls.count; i++) {
fail_call(server, &requested_calls.calls[i]);
@@ -953,7 +984,9 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
call_count = 0;
calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
- for (calld = server->lists[ALL_CALLS]; calld != server->lists[ALL_CALLS] || is_first; calld = calld->links[ALL_CALLS].next) {
+ for (calld = server->lists[ALL_CALLS];
+ calld != server->lists[ALL_CALLS] || is_first;
+ calld = calld->links[ALL_CALLS].next) {
if (call_count == call_capacity) {
call_capacity *= 2;
calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
@@ -966,7 +999,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
gpr_mu_unlock(&server->mu);
for (i = 0; i < call_count; i++) {
- grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, "Unavailable");
+ grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
+ "Unavailable");
GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
}
@@ -974,9 +1008,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
}
void grpc_server_destroy(grpc_server *server) {
- channel_data *c;
listener *l;
- call_data *calld;
gpr_mu_lock(&server->mu);
GPR_ASSERT(server->shutdown || !server->listeners);
@@ -988,19 +1020,6 @@ void grpc_server_destroy(grpc_server *server) {
gpr_free(l);
}
- while ((calld = call_list_remove_head(&server->lists[PENDING_START],
- PENDING_START)) != NULL) {
- calld->state = ZOMBIED;
- grpc_iomgr_closure_init(
- &calld->kill_zombie_closure, kill_zombie,
- grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_iomgr_add_callback(&calld->kill_zombie_closure);
- }
-
- for (c = server->root_channel_data.next; c != &server->root_channel_data;
- c = c->next) {
- shutdown_channel(c);
- }
gpr_mu_unlock(&server->mu);
server_unref(server);
@@ -1063,6 +1082,9 @@ grpc_call_error grpc_server_request_call(
GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details,
initial_metadata, cq_bound_to_call,
cq_for_notification, tag);
+ if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ }
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = BATCH_CALL;
rc.tag = tag;
@@ -1081,6 +1103,9 @@ grpc_call_error grpc_server_request_registered_call(
grpc_completion_queue *cq_for_notification, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
+ if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
+ }
grpc_cq_begin_op(cq_for_notification, NULL);
rc.type = REGISTERED_CALL;
rc.tag = tag;
@@ -1192,4 +1217,3 @@ int grpc_server_has_open_connections(grpc_server *server) {
gpr_mu_unlock(&server->mu);
return r;
}
-