aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-17 22:29:04 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-02-17 22:29:04 -0800
commit7bd5ab10552cb12302aa563c2226bfb2eee1da3f (patch)
treecfb952b2fec4701027a5edc1d5a0b41938056757
parent1ece67cba24c9872ae8fbd2218c57bac8ff9794d (diff)
parent52342b39501b0c9afbd68c25697d7af6118d7f9d (diff)
Merge github.com:grpc/grpc into c++api
Conflicts: src/core/surface/server.c
-rw-r--r--include/grpc/grpc.h10
-rw-r--r--src/core/surface/channel.c20
-rw-r--r--src/core/surface/server.c53
-rw-r--r--test/core/end2end/tests/early_server_shutdown_finishes_tags.c2
4 files changed, 55 insertions, 30 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 705fb9d115..621740e038 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -590,15 +590,19 @@ void grpc_server_start(grpc_server *server);
/* Begin shutting down a server.
After completion, no new calls or connections will be admitted.
- Existing calls will be allowed to complete. */
+ Existing calls will be allowed to complete.
+ Shutdown is idempotent. */
void grpc_server_shutdown(grpc_server *server);
/* As per grpc_server_shutdown, but send a GRPC_SERVER_SHUTDOWN event when
- there are no more calls being serviced. */
+ there are no more calls being serviced.
+ Shutdown is idempotent, and all tags will be notified at once if multiple
+ grpc_server_shutdown_and_notify calls are made. */
void grpc_server_shutdown_and_notify(grpc_server *server, void *tag);
/* Destroy a server.
- Forcefully cancels all existing calls. */
+ Forcefully cancels all existing calls.
+ Implies grpc_server_shutdown() if one was not previously performed. */
void grpc_server_destroy(grpc_server *server);
#ifdef __cplusplus
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 514073ce0b..fef1c7d394 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -36,6 +36,7 @@
#include <stdlib.h>
#include <string.h>
+#include "src/core/iomgr/iomgr.h"
#include "src/core/surface/call.h"
#include "src/core/surface/client.h"
#include <grpc/support/alloc.h>
@@ -138,15 +139,20 @@ void grpc_channel_internal_ref(grpc_channel *channel) {
gpr_ref(&channel->refs);
}
+static void destroy_channel(void *p, int ok) {
+ grpc_channel *channel = p;
+ grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
+ grpc_mdstr_unref(channel->grpc_status_string);
+ grpc_mdstr_unref(channel->grpc_message_string);
+ grpc_mdstr_unref(channel->path_string);
+ grpc_mdstr_unref(channel->authority_string);
+ grpc_mdctx_orphan(channel->metadata_context);
+ gpr_free(channel);
+}
+
void grpc_channel_internal_unref(grpc_channel *channel) {
if (gpr_unref(&channel->refs)) {
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
- grpc_mdstr_unref(channel->grpc_status_string);
- grpc_mdstr_unref(channel->grpc_message_string);
- grpc_mdstr_unref(channel->path_string);
- grpc_mdstr_unref(channel->authority_string);
- grpc_mdctx_orphan(channel->metadata_context);
- gpr_free(channel);
+ grpc_iomgr_add_callback(destroy_channel, channel);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 9b113610d5..7297a2a12d 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -141,8 +141,8 @@ struct grpc_server {
requested_call_array requested_calls;
gpr_uint8 shutdown;
- gpr_uint8 have_shutdown_tag;
- void *shutdown_tag;
+ size_t num_shutdown_tags;
+ void **shutdown_tags;
call_data *lists[CALL_LIST_COUNT];
channel_data root_channel_data;
@@ -273,6 +273,7 @@ static void server_unref(grpc_server *server) {
}
gpr_free(server->cqs);
gpr_free(server->pollsets);
+ gpr_free(server->shutdown_tags);
gpr_free(server);
}
}
@@ -514,17 +515,18 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- size_t i;
+ size_t i, j;
gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) {
call_list_remove(elem->call_data, i);
}
- if (chand->server->shutdown && chand->server->have_shutdown_tag &&
- chand->server->lists[ALL_CALLS] == NULL) {
- for (i = 0; i < chand->server->cq_count; i++) {
- grpc_cq_end_server_shutdown(chand->server->cqs[i],
- chand->server->shutdown_tag);
+ if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
+ for (i = 0; i < chand->server->num_shutdown_tags; i++) {
+ for (j = 0; j < chand->server->cq_count; j++) {
+ grpc_cq_end_server_shutdown(chand->server->cqs[j],
+ chand->server->shutdown_tags[i]);
+ }
}
}
gpr_mu_unlock(&chand->server->mu);
@@ -586,8 +588,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
+ call_op, channel_op, sizeof(call_data),
+ init_call_elem, destroy_call_elem, sizeof(channel_data),
+ init_channel_elem, destroy_channel_elem, "server",
};
static void addcq(grpc_server *server, grpc_completion_queue *cq) {
@@ -762,7 +765,7 @@ grpc_transport_setup_result grpc_server_setup_transport(
result = grpc_connected_channel_bind_transport(
grpc_channel_get_channel_stack(channel), transport);
-
+
gpr_mu_lock(&s->mu);
chand->next = &s->root_channel_data;
chand->prev = chand->next->prev;
@@ -781,13 +784,22 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
channel_data **channels;
channel_data *c;
size_t nchannels;
- size_t i;
+ size_t i, j;
grpc_channel_op op;
grpc_channel_element *elem;
registered_method *rm;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu);
+ if (have_shutdown_tag) {
+ for (i = 0; i < server->cq_count; i++) {
+ grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
+ }
+ server->shutdown_tags =
+ gpr_realloc(server->shutdown_tags,
+ sizeof(void *) * (server->num_shutdown_tags + 1));
+ server->shutdown_tags[server->num_shutdown_tags++] = shutdown_tag;
+ }
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
return;
@@ -828,13 +840,10 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
}
server->shutdown = 1;
- server->have_shutdown_tag = have_shutdown_tag;
- server->shutdown_tag = shutdown_tag;
- if (have_shutdown_tag) {
- for (i = 0; i < server->cq_count; i++) {
- grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
- if (server->lists[ALL_CALLS] == NULL) {
- grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag);
+ if (server->lists[ALL_CALLS] == NULL) {
+ for (i = 0; i < server->num_shutdown_tags; i++) {
+ for (j = 0; j < server->cq_count; j++) {
+ grpc_cq_end_server_shutdown(server->cqs[j], server->shutdown_tags[i]);
}
}
}
@@ -883,6 +892,12 @@ void grpc_server_shutdown_and_notify(grpc_server *server, void *tag) {
void grpc_server_destroy(grpc_server *server) {
channel_data *c;
gpr_mu_lock(&server->mu);
+ if (!server->shutdown) {
+ gpr_mu_unlock(&server->mu);
+ grpc_server_shutdown(server);
+ gpr_mu_lock(&server->mu);
+ }
+
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
shutdown_channel(c);
diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c
index 123c8bc415..51486cc169 100644
--- a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c
+++ b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c
@@ -79,7 +79,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
- grpc_server_shutdown(f->server);
+ /* don't shutdown, just destroy, to tickle this code edge */
grpc_server_destroy(f->server);
f->server = NULL;
}