aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r--src/core/surface/server.c153
1 files changed, 91 insertions, 62 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index e771929870..83caefcbc6 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -173,13 +173,19 @@ struct call_data {
grpc_call *call;
call_state state;
- gpr_timespec deadline;
grpc_mdstr *path;
grpc_mdstr *host;
+ gpr_timespec deadline;
+ int got_initial_metadata;
legacy_data *legacy;
grpc_completion_queue *cq_new;
+ grpc_stream_op_buffer *recv_ops;
+ grpc_stream_state *recv_state;
+ void (*on_done_recv)(void *user_data, int success);
+ void *recv_user_data;
+
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -262,6 +268,7 @@ static void server_ref(grpc_server *server) {
static void server_unref(grpc_server *server) {
registered_method *rm;
+ size_t i;
if (gpr_unref(&server->internal_refcount)) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
@@ -275,6 +282,9 @@ static void server_unref(grpc_server *server) {
requested_call_array_destroy(&rm->requested);
gpr_free(rm);
}
+ for (i = 0; i < server->cq_count; i++) {
+ grpc_cq_internal_unref(server->cqs[i]);
+ }
gpr_free(server->cqs);
gpr_free(server->pollsets);
gpr_free(server->shutdown_tags);
@@ -371,46 +381,6 @@ static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
-static void stream_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- break;
- case PENDING:
- call_list_remove(calld, PENDING_START);
- /* fallthrough intended */
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
- grpc_call_stream_closed(elem);
-}
-
-static void read_closed(grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- gpr_mu_lock(&chand->server->mu);
- switch (calld->state) {
- case ACTIVATED:
- case PENDING:
- grpc_call_read_closed(elem);
- break;
- case NOT_STARTED:
- calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
- break;
- case ZOMBIED:
- break;
- }
- gpr_mu_unlock(&chand->server->mu);
-}
-
static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *chand = elem->channel_data;
@@ -425,33 +395,75 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn,
- grpc_call_op *op) {
+static void server_on_recv(void *ptr, int success) {
+ grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- switch (op->type) {
- case GRPC_RECV_METADATA:
+ channel_data *chand = elem->channel_data;
+
+ if (success && !calld->got_initial_metadata) {
+ size_t i;
+ size_t nops = calld->recv_ops->nops;
+ grpc_stream_op *ops = calld->recv_ops->ops;
+ for (i = 0; i < nops; i++) {
+ grpc_stream_op *op = &ops[i];
+ if (op->type != GRPC_OP_METADATA) continue;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
- if (grpc_call_recv_metadata(elem, &op->data.metadata)) {
+ if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) {
calld->deadline = op->data.metadata.deadline;
- start_new_rpc(elem);
}
+ calld->got_initial_metadata = 1;
+ start_new_rpc(elem);
break;
- case GRPC_RECV_MESSAGE:
- grpc_call_recv_message(elem, op->data.message);
- op->done_cb(op->user_data, GRPC_OP_OK);
+ }
+ }
+
+ switch (*calld->recv_state) {
+ case GRPC_STREAM_OPEN:
break;
- case GRPC_RECV_HALF_CLOSE:
- read_closed(elem);
+ case GRPC_STREAM_SEND_CLOSED:
break;
- case GRPC_RECV_FINISH:
- stream_closed(elem);
+ case GRPC_STREAM_RECV_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
- default:
- GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
- grpc_call_next_op(elem, op);
+ case GRPC_STREAM_CLOSED:
+ gpr_mu_lock(&chand->server->mu);
+ if (calld->state == NOT_STARTED) {
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
+ } else if (calld->state == PENDING) {
+ call_list_remove(calld, PENDING_START);
+ }
+ gpr_mu_unlock(&chand->server->mu);
break;
}
+
+ calld->on_done_recv(calld->recv_user_data, success);
+}
+
+static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) {
+ call_data *calld = elem->call_data;
+
+ if (op->recv_ops) {
+ /* substitute our callback for the higher callback */
+ calld->recv_ops = op->recv_ops;
+ calld->recv_state = op->recv_state;
+ calld->on_done_recv = op->on_done_recv;
+ calld->recv_user_data = op->recv_user_data;
+ op->on_done_recv = server_on_recv;
+ op->recv_user_data = elem;
+ }
+}
+
+static void server_start_transport_op(grpc_call_element *elem,
+ grpc_transport_op *op) {
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
+ server_mutate_op(elem, op);
+ grpc_call_next_op(elem, op);
}
static void channel_op(grpc_channel_element *elem,
@@ -502,7 +514,8 @@ static void shutdown_channel(channel_data *chand) {
}
static void init_call_elem(grpc_call_element *elem,
- const void *server_transport_data) {
+ const void *server_transport_data,
+ grpc_transport_op *initial_op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -514,6 +527,8 @@ static void init_call_elem(grpc_call_element *elem,
gpr_mu_unlock(&chand->server->mu);
server_ref(chand->server);
+
+ if (initial_op) server_mutate_op(elem, initial_op);
}
static void destroy_call_elem(grpc_call_element *elem) {
@@ -592,8 +607,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
static const grpc_channel_filter server_surface_filter = {
- call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
+ server_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
+ destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, "server",
};
static void addcq(grpc_server *server, grpc_completion_queue *cq) {
@@ -601,6 +617,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) {
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
+ grpc_cq_internal_ref(cq);
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs,
server->cq_count * sizeof(grpc_completion_queue *));
@@ -913,6 +930,8 @@ void grpc_server_destroy(grpc_server *server) {
channel_data *c;
listener *l;
size_t i;
+ call_data *calld;
+
gpr_mu_lock(&server->mu);
if (!server->shutdown) {
gpr_mu_unlock(&server->mu);
@@ -937,6 +956,15 @@ void grpc_server_destroy(grpc_server *server) {
gpr_free(l);
}
+ while ((calld = call_list_remove_head(&server->lists[PENDING_START],
+ PENDING_START)) != NULL) {
+ gpr_log(GPR_DEBUG, "server destroys call %p", calld->call);
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(
+ kill_zombie,
+ grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+ }
+
for (c = server->root_channel_data.next; c != &server->root_channel_data;
c = c->next) {
shutdown_channel(c);
@@ -1083,6 +1111,7 @@ static void begin_call(grpc_server *server, call_data *calld,
&rc->data.batch.details->host_capacity, calld->host);
cpstr(&rc->data.batch.details->method,
&rc->data.batch.details->method_capacity, calld->path);
+ rc->data.batch.details->deadline = calld->deadline;
grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind);
*rc->data.batch.call = calld->call;
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
@@ -1108,7 +1137,7 @@ static void begin_call(grpc_server *server, call_data *calld,
break;
}
- grpc_call_internal_ref(calld->call);
+ GRPC_CALL_INTERNAL_REF(calld->call, "server");
grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish,
rc->tag);
}