aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/server.c
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2015-07-08 15:16:22 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2015-07-08 15:17:10 -0700
commit17bb64981a0035482193a6c02b030594b3a68f25 (patch)
treef3e6db1e9715bd6f7276f48cdcaa7f5ee5923fc8 /src/core/surface/server.c
parenta01f7a41cc1153bba33420a31281a5a2eb14c407 (diff)
parentfc1a49a7354696a56f0065880a2d4a209516f774 (diff)
Merge branch 'master' of github.com:grpc/grpc into decompression
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r--src/core/surface/server.c309
1 files changed, 133 insertions, 176 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 278ea95488..b26637a6ac 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -51,7 +51,7 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
-typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
+typedef enum { PENDING_START, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
@@ -114,7 +114,7 @@ typedef struct channel_registered_method {
struct channel_data {
grpc_server *server;
- size_t num_calls;
+ grpc_connectivity_state connectivity_state;
grpc_channel *channel;
grpc_mdstr *path_key;
grpc_mdstr *authority_key;
@@ -125,6 +125,7 @@ struct channel_data {
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
grpc_iomgr_closure finish_destroy_channel_closure;
+ grpc_iomgr_closure channel_connectivity_changed;
};
typedef struct shutdown_tag {
@@ -149,7 +150,7 @@ struct grpc_server {
before mu_call. This is currently used in shutdown processing
(grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
gpr_mu mu_global; /* mutex for server and channel state */
- gpr_mu mu_call; /* mutex for call-specific state */
+ gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
requested_call_array requested_calls;
@@ -200,18 +201,83 @@ struct call_data {
call_link links[CALL_LIST_COUNT];
};
+typedef struct {
+ grpc_channel **channels;
+ size_t num_channels;
+} channel_broadcaster;
+
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
-static void shutdown_channel(channel_data *chand, int send_goaway,
- int send_disconnect);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
static void maybe_finish_shutdown(grpc_server *server);
+/* channel broadcaster */
+
+/* assumes server locked */
+static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
+ channel_data *c;
+ size_t count = 0;
+ for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
+ count++;
+ }
+ cb->num_channels = count;
+ cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
+ count = 0;
+ for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
+ cb->channels[count++] = c->channel;
+ GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
+ }
+}
+
+struct shutdown_cleanup_args {
+ grpc_iomgr_closure closure;
+ gpr_slice slice;
+};
+
+static void shutdown_cleanup(void *arg, int iomgr_status_ignored) {
+ struct shutdown_cleanup_args *a = arg;
+ gpr_slice_unref(a->slice);
+ gpr_free(a);
+}
+
+static void send_shutdown(grpc_channel *channel, int send_goaway,
+ int send_disconnect) {
+ grpc_transport_op op;
+ struct shutdown_cleanup_args *sc;
+ grpc_channel_element *elem;
+
+ memset(&op, 0, sizeof(op));
+ op.send_goaway = send_goaway;
+ sc = gpr_malloc(sizeof(*sc));
+ sc->slice = gpr_slice_from_copied_string("Server shutdown");
+ op.goaway_message = &sc->slice;
+ op.goaway_status = GRPC_STATUS_OK;
+ op.disconnect = send_disconnect;
+ grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc);
+ op.on_consumed = &sc->closure;
+
+ elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
+ elem->filter->start_transport_op(elem, &op);
+}
+
+static void channel_broadcaster_shutdown(channel_broadcaster *cb,
+ int send_goaway, int force_disconnect) {
+ size_t i;
+
+ for (i = 0; i < cb->num_channels; i++) {
+ send_shutdown(cb->channels[i], send_goaway, force_disconnect);
+ GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
+ }
+ gpr_free(cb->channels);
+}
+
+/* call list */
+
static int call_list_join(call_data **root, call_data *call, call_list list) {
GPR_ASSERT(!call->root[list]);
call->root[list] = root;
@@ -416,15 +482,6 @@ static void maybe_finish_shutdown(grpc_server *server) {
return;
}
- gpr_mu_lock(&server->mu_call);
- if (server->lists[ALL_CALLS] != NULL) {
- gpr_log(GPR_DEBUG,
- "Waiting for all calls to finish before destroying server");
- gpr_mu_unlock(&server->mu_call);
- return;
- }
- gpr_mu_unlock(&server->mu_call);
-
if (server->root_channel_data.next != &server->root_channel_data) {
gpr_log(GPR_DEBUG,
"Waiting for all channels to close before destroying server");
@@ -456,19 +513,10 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void decrement_call_count(channel_data *chand) {
- chand->num_calls--;
- if (0 == chand->num_calls && chand->server->shutdown) {
- shutdown_channel(chand, 0, 1);
- }
- maybe_finish_shutdown(chand->server);
-}
-
static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- int remove_res;
if (success && !calld->got_initial_metadata) {
size_t i;
@@ -513,20 +561,15 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
- remove_res = call_list_remove(calld, ALL_CALLS);
gpr_mu_unlock(&chand->server->mu_call);
- gpr_mu_lock(&chand->server->mu_global);
- if (remove_res) {
- decrement_call_count(chand);
- }
- gpr_mu_unlock(&chand->server->mu_global);
break;
}
calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
-static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+static void server_mutate_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
if (op->recv_ops) {
@@ -538,92 +581,43 @@ static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
}
}
-static void server_start_transport_op(grpc_call_element *elem,
- grpc_transport_op *op) {
+static void server_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
-static void channel_op(grpc_channel_element *elem,
- grpc_channel_element *from_elem, grpc_channel_op *op) {
- channel_data *chand = elem->channel_data;
- grpc_server *server = chand->server;
-
- switch (op->type) {
- case GRPC_ACCEPT_CALL:
- /* create a call */
- grpc_call_create(chand->channel, NULL,
- op->data.accept_call.transport_server_data, NULL, 0,
- gpr_inf_future);
- break;
- case GRPC_TRANSPORT_CLOSED:
- /* if the transport is closed for a server channel, we destroy the
- channel */
- gpr_mu_lock(&server->mu_global);
- server_ref(server);
- destroy_channel(chand);
- gpr_mu_unlock(&server->mu_global);
- server_unref(server);
- break;
- case GRPC_TRANSPORT_GOAWAY:
- gpr_slice_unref(op->data.goaway.message);
- break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_channel_next_op(elem, op);
- break;
- }
+static void accept_stream(void *cd, grpc_transport *transport,
+ const void *transport_server_data) {
+ channel_data *chand = cd;
+ /* create a call */
+ grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
+ gpr_inf_future);
}
-typedef struct {
- channel_data *chand;
- int send_goaway;
- int send_disconnect;
- grpc_iomgr_closure finish_shutdown_channel_closure;
-} shutdown_channel_args;
-
-static void finish_shutdown_channel(void *p, int success) {
- shutdown_channel_args *sca = p;
- grpc_channel_op op;
-
- if (sca->send_goaway) {
- op.type = GRPC_CHANNEL_GOAWAY;
- op.dir = GRPC_CALL_DOWN;
- op.data.goaway.status = GRPC_STATUS_OK;
- op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
- channel_op(grpc_channel_stack_element(
- grpc_channel_get_channel_stack(sca->chand->channel), 0),
- NULL, &op);
- }
- if (sca->send_disconnect) {
- op.type = GRPC_CHANNEL_DISCONNECT;
- op.dir = GRPC_CALL_DOWN;
- channel_op(grpc_channel_stack_element(
- grpc_channel_get_channel_stack(sca->chand->channel), 0),
- NULL, &op);
+static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
+ channel_data *chand = cd;
+ grpc_server *server = chand->server;
+ if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op.connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(chand->channel), 0),
+ &op);
+ } else {
+ gpr_mu_lock(&server->mu_global);
+ destroy_channel(chand);
+ gpr_mu_unlock(&server->mu_global);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
}
- GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown");
-
- gpr_free(sca);
-}
-
-static void shutdown_channel(channel_data *chand, int send_goaway,
- int send_disconnect) {
- shutdown_channel_args *sca;
- GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown");
- sca = gpr_malloc(sizeof(shutdown_channel_args));
- sca->chand = chand;
- sca->send_goaway = send_goaway;
- sca->send_disconnect = send_disconnect;
- sca->finish_shutdown_channel_closure.cb = finish_shutdown_channel;
- sca->finish_shutdown_channel_closure.cb_arg = sca;
- grpc_iomgr_add_callback(&sca->finish_shutdown_channel_closure);
}
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_op *initial_op) {
+ grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -632,14 +626,6 @@ static void init_call_elem(grpc_call_element *elem,
grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
- gpr_mu_lock(&chand->server->mu_call);
- call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
- gpr_mu_unlock(&chand->server->mu_call);
-
- gpr_mu_lock(&chand->server->mu_global);
- chand->num_calls++;
- gpr_mu_unlock(&chand->server->mu_global);
-
server_ref(chand->server);
if (initial_op) server_mutate_op(elem, initial_op);
@@ -648,19 +634,13 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- int removed[CALL_LIST_COUNT];
size_t i;
gpr_mu_lock(&chand->server->mu_call);
for (i = 0; i < CALL_LIST_COUNT; i++) {
- removed[i] = call_list_remove(elem->call_data, i);
+ call_list_remove(elem->call_data, i);
}
gpr_mu_unlock(&chand->server->mu_call);
- if (removed[ALL_CALLS]) {
- gpr_mu_lock(&chand->server->mu_global);
- decrement_call_count(chand);
- gpr_mu_unlock(&chand->server->mu_global);
- }
if (calld->host) {
grpc_mdstr_unref(calld->host);
@@ -672,7 +652,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
server_unref(chand->server);
}
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
@@ -680,12 +660,14 @@ static void init_channel_elem(grpc_channel_element *elem,
GPR_ASSERT(is_first);
GPR_ASSERT(!is_last);
chand->server = NULL;
- chand->num_calls = 0;
chand->channel = NULL;
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
+ chand->connectivity_state = GRPC_CHANNEL_IDLE;
+ grpc_iomgr_closure_init(&chand->channel_connectivity_changed,
+ channel_connectivity_changed, chand);
}
static void destroy_channel_elem(grpc_channel_element *elem) {
@@ -716,8 +698,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- server_start_transport_op,
- channel_op,
+ server_start_transport_stream_op,
+ grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
destroy_call_elem,
@@ -831,10 +813,10 @@ void grpc_server_start(grpc_server *server) {
}
}
-grpc_transport_setup_result grpc_server_setup_transport(
- grpc_server *s, grpc_transport *transport,
- grpc_channel_filter const **extra_filters, size_t num_extra_filters,
- grpc_mdctx *mdctx, const grpc_channel_args *args) {
+void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
+ grpc_channel_filter const **extra_filters,
+ size_t num_extra_filters, grpc_mdctx *mdctx,
+ const grpc_channel_args *args) {
size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
grpc_channel_filter const **filters =
gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
@@ -851,7 +833,7 @@ grpc_transport_setup_result grpc_server_setup_transport(
gpr_uint32 slots;
gpr_uint32 probes;
gpr_uint32 max_probes = 0;
- grpc_transport_setup_result result;
+ grpc_transport_op op;
for (i = 0; i < s->channel_filter_count; i++) {
filters[i] = s->channel_filters[i];
@@ -862,7 +844,9 @@ grpc_transport_setup_result grpc_server_setup_transport(
filters[i] = &grpc_connected_channel_filter;
for (i = 0; i < s->cq_count; i++) {
- grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
+ memset(&op, 0, sizeof(op));
+ op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
+ grpc_transport_perform_op(transport, &op);
}
channel =
@@ -903,8 +887,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
chand->registered_method_max_probes = max_probes;
}
- result = grpc_connected_channel_bind_transport(
- grpc_channel_get_channel_stack(channel), transport);
+ grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
+ transport);
gpr_mu_lock(&s->mu_global);
chand->next = &s->root_channel_data;
@@ -914,17 +898,23 @@ grpc_transport_setup_result grpc_server_setup_transport(
gpr_free(filters);
- return result;
+ GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
+ memset(&op, 0, sizeof(op));
+ op.set_accept_stream = accept_stream;
+ op.set_accept_stream_user_data = chand;
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op.connectivity_state = &chand->connectivity_state;
+ grpc_transport_perform_op(transport, &op);
}
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
requested_call_array requested_calls;
- channel_data *c;
size_t i;
registered_method *rm;
shutdown_tag *sdt;
+ channel_broadcaster broadcaster;
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
@@ -940,10 +930,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
return;
}
- for (c = server->root_channel_data.next; c != &server->root_channel_data;
- c = c->next) {
- shutdown_channel(c, 1, c->num_calls == 0);
- }
+ channel_broadcaster_init(server, &broadcaster);
/* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
@@ -981,6 +968,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
for (l = server->listeners; l; l = l->next) {
l->destroy(server, l->arg);
}
+
+ channel_broadcaster_shutdown(&broadcaster, 1, 0);
}
void grpc_server_listener_destroy_done(void *s) {
@@ -992,47 +981,13 @@ void grpc_server_listener_destroy_done(void *s) {
}
void grpc_server_cancel_all_calls(grpc_server *server) {
- call_data *calld;
- grpc_call **calls;
- size_t call_count;
- size_t call_capacity;
- int is_first = 1;
- size_t i;
-
- gpr_mu_lock(&server->mu_call);
-
- GPR_ASSERT(server->shutdown);
-
- if (!server->lists[ALL_CALLS]) {
- gpr_mu_unlock(&server->mu_call);
- return;
- }
+ channel_broadcaster broadcaster;
- call_capacity = 8;
- call_count = 0;
- calls = gpr_malloc(sizeof(grpc_call *) * call_capacity);
-
- for (calld = server->lists[ALL_CALLS];
- calld != server->lists[ALL_CALLS] || is_first;
- calld = calld->links[ALL_CALLS].next) {
- if (call_count == call_capacity) {
- call_capacity *= 2;
- calls = gpr_realloc(calls, sizeof(grpc_call *) * call_capacity);
- }
- calls[call_count++] = calld->call;
- GRPC_CALL_INTERNAL_REF(calld->call, "cancel_all");
- is_first = 0;
- }
-
- gpr_mu_unlock(&server->mu_call);
-
- for (i = 0; i < call_count; i++) {
- grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
- "Unavailable");
- GRPC_CALL_INTERNAL_UNREF(calls[i], "cancel_all", 1);
- }
+ gpr_mu_lock(&server->mu_global);
+ channel_broadcaster_init(server, &broadcaster);
+ gpr_mu_unlock(&server->mu_global);
- gpr_free(calls);
+ channel_broadcaster_shutdown(&broadcaster, 0, 1);
}
void grpc_server_destroy(grpc_server *server) {
@@ -1181,6 +1136,8 @@ static void begin_call(grpc_server *server, call_data *calld,
calld->cq_new = rc->cq_for_notification;
switch (rc->type) {
case BATCH_CALL:
+ GPR_ASSERT(calld->host != NULL);
+ GPR_ASSERT(calld->path != NULL);
cpstr(&rc->data.batch.details->host,
&rc->data.batch.details->host_capacity, calld->host);
cpstr(&rc->data.batch.details->method,