aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/server.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-25 12:54:23 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-25 12:54:23 -0700
commite039f0338333e1a2f368ec20740662fb2eac2875 (patch)
treec4267e757c1196815e0559790d56a6e7679c373d /src/core/surface/server.c
parent3f475422ecb8cd5c648ce86f126122ba6dee1c9c (diff)
Plumbing transport_op changes through
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r--src/core/surface/server.c100
1 files changed, 75 insertions, 25 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 607344a7a6..568f7925dd 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -115,6 +115,7 @@ typedef struct channel_registered_method {
struct channel_data {
grpc_server *server;
size_t num_calls;
+ grpc_connectivity_state connectivity_state;
grpc_channel *channel;
grpc_mdstr *path_key;
grpc_mdstr *authority_key;
@@ -125,6 +126,7 @@ struct channel_data {
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
grpc_iomgr_closure finish_destroy_channel_closure;
+ grpc_iomgr_closure channel_connectivity_changed;
};
typedef struct shutdown_tag {
@@ -539,13 +541,41 @@ static void server_mutate_op(grpc_call_element *elem,
}
}
-static void server_start_transport_op(grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+static void server_start_transport_stream_op(grpc_call_element *elem,
+ grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(elem, op);
}
+static void accept_stream(void *cd, grpc_transport *transport,
+ const void *transport_server_data) {
+ channel_data *chand = cd;
+ /* create a call */
+ grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
+ gpr_inf_future);
+}
+
+static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {
+ channel_data *chand = cd;
+ grpc_server *server = chand->server;
+ if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op.connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(chand->channel), 0),
+ &op);
+ } else {
+ gpr_mu_lock(&server->mu_global);
+ destroy_channel(chand);
+ gpr_mu_unlock(&server->mu_global);
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
+ }
+}
+
+#if 0
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
channel_data *chand = elem->channel_data;
@@ -576,39 +606,45 @@ static void channel_op(grpc_channel_element *elem,
break;
}
}
+#endif
typedef struct {
channel_data *chand;
int send_goaway;
int send_disconnect;
grpc_iomgr_closure finish_shutdown_channel_closure;
+
+ /* for use during shutdown: the goaway message to send */
+ gpr_slice goaway_message;
} shutdown_channel_args;
-static void finish_shutdown_channel(void *p, int success) {
+static void destroy_shutdown_channel_args(void *p, int success) {
shutdown_channel_args *sca = p;
- grpc_channel_op op;
-
- if (sca->send_goaway) {
- op.type = GRPC_CHANNEL_GOAWAY;
- op.dir = GRPC_CALL_DOWN;
- op.data.goaway.status = GRPC_STATUS_OK;
- op.data.goaway.message = gpr_slice_from_copied_string("Server shutdown");
- channel_op(grpc_channel_stack_element(
- grpc_channel_get_channel_stack(sca->chand->channel), 0),
- NULL, &op);
- }
- if (sca->send_disconnect) {
- op.type = GRPC_CHANNEL_DISCONNECT;
- op.dir = GRPC_CALL_DOWN;
- channel_op(grpc_channel_stack_element(
- grpc_channel_get_channel_stack(sca->chand->channel), 0),
- NULL, &op);
- }
GRPC_CHANNEL_INTERNAL_UNREF(sca->chand->channel, "shutdown");
-
+ gpr_slice_unref(sca->goaway_message);
gpr_free(sca);
}
+static void finish_shutdown_channel(void *p, int success) {
+ shutdown_channel_args *sca = p;
+ grpc_transport_op op;
+ memset(&op, 0, sizeof(op));
+
+ op.send_goaway = sca->send_goaway;
+ sca->goaway_message = gpr_slice_from_copied_string("Server shutdown");
+ op.goaway_message = &sca->goaway_message;
+ op.goaway_status = GRPC_STATUS_OK;
+ op.disconnect = sca->send_disconnect;
+ grpc_iomgr_closure_init(&sca->finish_shutdown_channel_closure,
+ destroy_shutdown_channel_args, sca);
+ op.on_consumed = &sca->finish_shutdown_channel_closure;
+
+ grpc_channel_next_op(
+ grpc_channel_stack_element(
+ grpc_channel_get_channel_stack(sca->chand->channel), 0),
+ &op);
+}
+
static void shutdown_channel(channel_data *chand, int send_goaway,
int send_disconnect) {
shutdown_channel_args *sca;
@@ -687,6 +723,9 @@ static void init_channel_elem(grpc_channel_element *elem,
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
+ chand->connectivity_state = GRPC_CHANNEL_IDLE;
+ grpc_iomgr_closure_init(&chand->channel_connectivity_changed,
+ channel_connectivity_changed, chand);
}
static void destroy_channel_elem(grpc_channel_element *elem) {
@@ -717,8 +756,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- server_start_transport_op,
- channel_op,
+ server_start_transport_stream_op,
+ grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
destroy_call_elem,
@@ -852,6 +891,7 @@ grpc_transport_setup_result grpc_server_setup_transport(
gpr_uint32 slots;
gpr_uint32 probes;
gpr_uint32 max_probes = 0;
+ grpc_transport_op op;
grpc_transport_setup_result result;
for (i = 0; i < s->channel_filter_count; i++) {
@@ -863,7 +903,9 @@ grpc_transport_setup_result grpc_server_setup_transport(
filters[i] = &grpc_connected_channel_filter;
for (i = 0; i < s->cq_count; i++) {
- grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
+ memset(&op, 0, sizeof(op));
+ op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
+ grpc_transport_perform_op(transport, &op);
}
channel =
@@ -875,6 +917,14 @@ grpc_transport_setup_result grpc_server_setup_transport(
server_ref(s);
chand->channel = channel;
+ GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
+ memset(&op, 0, sizeof(op));
+ op.set_accept_stream = accept_stream;
+ op.set_accept_stream_user_data = chand;
+ op.on_connectivity_state_change = &chand->channel_connectivity_changed;
+ op.connectivity_state = &chand->connectivity_state;
+ grpc_transport_perform_op(transport, &op);
+
num_registered_methods = 0;
for (rm = s->registered_methods; rm; rm = rm->next) {
num_registered_methods++;