diff options
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r-- | src/core/surface/server.c | 153 |
1 files changed, 91 insertions, 62 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c index e771929870..83caefcbc6 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -173,13 +173,19 @@ struct call_data { grpc_call *call; call_state state; - gpr_timespec deadline; grpc_mdstr *path; grpc_mdstr *host; + gpr_timespec deadline; + int got_initial_metadata; legacy_data *legacy; grpc_completion_queue *cq_new; + grpc_stream_op_buffer *recv_ops; + grpc_stream_state *recv_state; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; + call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; }; @@ -262,6 +268,7 @@ static void server_ref(grpc_server *server) { static void server_unref(grpc_server *server) { registered_method *rm; + size_t i; if (gpr_unref(&server->internal_refcount)) { grpc_channel_args_destroy(server->channel_args); gpr_mu_destroy(&server->mu); @@ -275,6 +282,9 @@ static void server_unref(grpc_server *server) { requested_call_array_destroy(&rm->requested); gpr_free(rm); } + for (i = 0; i < server->cq_count; i++) { + grpc_cq_internal_unref(server->cqs[i]); + } gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -371,46 +381,6 @@ static void kill_zombie(void *elem, int success) { grpc_call_destroy(grpc_call_from_top_element(elem)); } -static void stream_closed(grpc_call_element *elem) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - break; - case PENDING: - call_list_remove(calld, PENDING_START); - /* fallthrough intended */ - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; - } - gpr_mu_unlock(&chand->server->mu); - grpc_call_stream_closed(elem); -} - -static void read_closed(grpc_call_element *elem) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->server->mu); - switch (calld->state) { - case ACTIVATED: - case PENDING: - grpc_call_read_closed(elem); - break; - case NOT_STARTED: - calld->state = ZOMBIED; - grpc_iomgr_add_callback(kill_zombie, elem); - break; - case ZOMBIED: - break; - } - gpr_mu_unlock(&chand->server->mu); -} - static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *chand = elem->channel_data; @@ -425,33 +395,75 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { return md; } -static void call_op(grpc_call_element *elem, grpc_call_element *from_elemn, - grpc_call_op *op) { +static void server_on_recv(void *ptr, int success) { + grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_RECV_METADATA: + channel_data *chand = elem->channel_data; + + if (success && !calld->got_initial_metadata) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (grpc_call_recv_metadata(elem, &op->data.metadata)) { + if (0 != gpr_time_cmp(op->data.metadata.deadline, gpr_inf_future)) { calld->deadline = op->data.metadata.deadline; - start_new_rpc(elem); } + calld->got_initial_metadata = 1; + start_new_rpc(elem); break; - case GRPC_RECV_MESSAGE: - grpc_call_recv_message(elem, op->data.message); - op->done_cb(op->user_data, GRPC_OP_OK); + } + } + + switch (*calld->recv_state) { + case GRPC_STREAM_OPEN: break; - case GRPC_RECV_HALF_CLOSE: - read_closed(elem); + case GRPC_STREAM_SEND_CLOSED: break; - case GRPC_RECV_FINISH: - stream_closed(elem); + case GRPC_STREAM_RECV_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } + gpr_mu_unlock(&chand->server->mu); break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_DOWN); - grpc_call_next_op(elem, op); + case GRPC_STREAM_CLOSED: + gpr_mu_lock(&chand->server->mu); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); + } else if (calld->state == PENDING) { + call_list_remove(calld, PENDING_START); + } + gpr_mu_unlock(&chand->server->mu); break; } + + calld->on_done_recv(calld->recv_user_data, success); +} + +static void server_mutate_op(grpc_call_element *elem, grpc_transport_op *op) { + call_data *calld = elem->call_data; + + if (op->recv_ops) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->recv_state = op->recv_state; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = server_on_recv; + op->recv_user_data = elem; + } +} + +static void server_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + server_mutate_op(elem, op); + grpc_call_next_op(elem, op); } static void channel_op(grpc_channel_element *elem, @@ -502,7 +514,8 @@ static void shutdown_channel(channel_data *chand) { } static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { + const void *server_transport_data, + grpc_transport_op *initial_op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -514,6 +527,8 @@ static void init_call_elem(grpc_call_element *elem, gpr_mu_unlock(&chand->server->mu); server_ref(chand->server); + + if (initial_op) server_mutate_op(elem, initial_op); } static void destroy_call_elem(grpc_call_element *elem) { @@ -592,8 +607,9 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } static const grpc_channel_filter server_surface_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server", + server_start_transport_op, channel_op, sizeof(call_data), init_call_elem, + destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, "server", }; static void addcq(grpc_server *server, grpc_completion_queue *cq) { @@ -601,6 +617,7 @@ static void addcq(grpc_server *server, grpc_completion_queue *cq) { for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } + grpc_cq_internal_ref(cq); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); @@ -913,6 +930,8 @@ void grpc_server_destroy(grpc_server *server) { channel_data *c; listener *l; size_t i; + call_data *calld; + gpr_mu_lock(&server->mu); if (!server->shutdown) { gpr_mu_unlock(&server->mu); @@ -937,6 +956,15 @@ void grpc_server_destroy(grpc_server *server) { gpr_free(l); } + while ((calld = call_list_remove_head(&server->lists[PENDING_START], + PENDING_START)) != NULL) { + gpr_log(GPR_DEBUG, "server destroys call %p", calld->call); + calld->state = ZOMBIED; + grpc_iomgr_add_callback( + kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + } + for (c = server->root_channel_data.next; c != &server->root_channel_data; c = c->next) { shutdown_channel(c); @@ -1083,6 +1111,7 @@ static void begin_call(grpc_server *server, call_data *calld, &rc->data.batch.details->host_capacity, calld->host); cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); + rc->data.batch.details->deadline = calld->deadline; grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); *rc->data.batch.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; @@ -1108,7 +1137,7 @@ static void begin_call(grpc_server *server, call_data *calld, break; } - grpc_call_internal_ref(calld->call); + GRPC_CALL_INTERNAL_REF(calld->call, "server"); grpc_call_start_ioreq_and_call_back(calld->call, req, r - req, publish, rc->tag); } |