diff options
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r-- | src/core/surface/server.c | 251 |
1 files changed, 106 insertions, 145 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 819226278d..1e1cde3648 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -54,6 +54,7 @@ #include "src/core/surface/completion_queue.h" #include "src/core/surface/init.h" #include "src/core/transport/metadata.h" +#include "src/core/transport/static_metadata.h" typedef struct listener { void *arg; @@ -84,18 +85,18 @@ typedef struct requested_call { grpc_completion_queue *cq_for_notification; grpc_call **call; grpc_cq_completion completion; + grpc_metadata_array *initial_metadata; union { struct { grpc_call_details *details; - grpc_metadata_array *initial_metadata; } batch; struct { registered_method *registered_method; gpr_timespec *deadline; - grpc_metadata_array *initial_metadata; grpc_byte_buffer **optional_payload; } registered; } data; + grpc_closure publish; } requested_call; typedef struct channel_registered_method { @@ -108,8 +109,6 @@ struct channel_data { grpc_server *server; grpc_connectivity_state connectivity_state; grpc_channel *channel; - grpc_mdstr *path_key; - grpc_mdstr *authority_key; /* linked list of all channels on a server */ channel_data *next; channel_data *prev; @@ -150,16 +149,16 @@ struct call_data { grpc_mdstr *path; grpc_mdstr *host; gpr_timespec deadline; - int got_initial_metadata; grpc_completion_queue *cq_new; - grpc_stream_op_buffer *recv_ops; - grpc_stream_state *recv_state; - grpc_closure *on_done_recv; + grpc_metadata_batch *recv_initial_metadata; + grpc_metadata_array initial_metadata; - grpc_closure server_on_recv; + grpc_closure got_initial_metadata; + grpc_closure server_on_recv_initial_metadata; grpc_closure kill_zombie_closure; + grpc_closure *on_done_recv_initial_metadata; call_data *pending_next; }; @@ -396,7 +395,6 @@ static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd, int success) { channel_data *chand = cd; grpc_server *server = chand->server; - gpr_log(GPR_DEBUG, "finish_destroy_channel: %p", chand->channel); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server"); server_unref(exec_ctx, server); } @@ -559,91 +557,46 @@ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (md->key == chand->path_key) { + if (md->key == GRPC_MDSTR_PATH) { calld->path = GRPC_MDSTR_REF(md->value); return NULL; - } else if (md->key == chand->authority_key) { + } else if (md->key == GRPC_MDSTR_AUTHORITY) { calld->host = GRPC_MDSTR_REF(md->value); return NULL; } return md; } -static void server_on_recv(grpc_exec_ctx *exec_ctx, void *ptr, int success) { +static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, + int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; gpr_timespec op_deadline; - if (success && !calld->got_initial_metadata) { - size_t i; - size_t nops = calld->recv_ops->nops; - grpc_stream_op *ops = calld->recv_ops->ops; - for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - op_deadline = op->data.metadata.deadline; - if (0 != - gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { - calld->deadline = op->data.metadata.deadline; - } - if (calld->host && calld->path) { - calld->got_initial_metadata = 1; - start_new_rpc(exec_ctx, elem); - } - break; - } + grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, elem); + op_deadline = calld->recv_initial_metadata->deadline; + if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) { + calld->deadline = op_deadline; } - - switch (*calld->recv_state) { - case GRPC_STREAM_OPEN: - break; - case GRPC_STREAM_SEND_CLOSED: - break; - case GRPC_STREAM_RECV_CLOSED: - gpr_mu_lock(&calld->mu_state); - if (calld->state == NOT_STARTED) { - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); - } else { - gpr_mu_unlock(&calld->mu_state); - } - break; - case GRPC_STREAM_CLOSED: - gpr_mu_lock(&calld->mu_state); - if (calld->state == NOT_STARTED) { - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); - } else if (calld->state == PENDING) { - calld->state = ZOMBIED; - gpr_mu_unlock(&calld->mu_state); - /* zombied call will be destroyed when it's removed from the pending - queue... later */ - } else { - gpr_mu_unlock(&calld->mu_state); - } - break; + if (calld->host && calld->path) { + /* do nothing */ + } else { + success = 0; } - calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); + calld->on_done_recv_initial_metadata->cb( + exec_ctx, calld->on_done_recv_initial_metadata->cb_arg, success); } static void server_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; - if (op->recv_ops) { - /* substitute our callback for the higher callback */ - calld->recv_ops = op->recv_ops; - calld->recv_state = op->recv_state; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = &calld->server_on_recv; + if (op->recv_initial_metadata != NULL) { + calld->recv_initial_metadata = op->recv_initial_metadata; + calld->on_done_recv_initial_metadata = op->on_complete; + op->on_complete = &calld->server_on_recv_initial_metadata; } } @@ -655,12 +608,48 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_next_op(exec_ctx, elem, op); } -static void accept_stream(void *cd, grpc_transport *transport, +static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, + int success) { + grpc_call_element *elem = ptr; + call_data *calld = elem->call_data; + if (success) { + start_new_rpc(exec_ctx, elem); + } else { + gpr_mu_lock(&calld->mu_state); + if (calld->state == NOT_STARTED) { + calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); + grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1); + } else if (calld->state == PENDING) { + calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); + /* zombied call will be destroyed when it's removed from the pending + queue... later */ + } else { + gpr_mu_unlock(&calld->mu_state); + } + } +} + +static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, + grpc_transport *transport, const void *transport_server_data) { channel_data *chand = cd; /* create a call */ - grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL, - 0, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_call *call = + grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, + NULL, 0, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_INITIAL_METADATA; + op.data.recv_initial_metadata = &calld->initial_metadata; + grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem); + grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1, + &calld->got_initial_metadata); } static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, @@ -685,8 +674,7 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; memset(calld, 0, sizeof(call_data)); @@ -694,11 +682,10 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, calld->call = grpc_call_from_top_element(elem); gpr_mu_init(&calld->mu_state); - grpc_closure_init(&calld->server_on_recv, server_on_recv, elem); + grpc_closure_init(&calld->server_on_recv_initial_metadata, + server_on_recv_initial_metadata, elem); server_ref(chand->server); - - if (initial_op) server_mutate_op(elem, initial_op); } static void destroy_call_elem(grpc_exec_ctx *exec_ctx, @@ -714,6 +701,7 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, if (calld->path) { GRPC_MDSTR_UNREF(calld->path); } + grpc_metadata_array_destroy(&calld->initial_metadata); gpr_mu_destroy(&calld->mu_state); @@ -721,17 +709,13 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, } static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, - grpc_mdctx *metadata_context, int is_first, - int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; - GPR_ASSERT(is_first); - GPR_ASSERT(!is_last); + GPR_ASSERT(args->is_first); + GPR_ASSERT(!args->is_last); chand->server = NULL; chand->channel = NULL; - chand->path_key = grpc_mdstr_from_string(metadata_context, ":path"); - chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority"); chand->next = chand->prev = chand; chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; @@ -761,16 +745,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->next = chand->prev = chand; maybe_finish_shutdown(exec_ctx, chand->server); gpr_mu_unlock(&chand->server->mu_global); - GRPC_MDSTR_UNREF(chand->path_key); - GRPC_MDSTR_UNREF(chand->authority_key); server_unref(exec_ctx, chand->server); } } static const grpc_channel_filter server_surface_filter = { server_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, - destroy_channel_elem, grpc_call_next_get_peer, "server", + init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, "server", }; void grpc_server_register_completion_queue(grpc_server *server, @@ -904,7 +887,7 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, grpc_transport *transport, grpc_channel_filter const **extra_filters, - size_t num_extra_filters, grpc_mdctx *mdctx, + size_t num_extra_filters, const grpc_channel_args *args) { size_t num_filters = s->channel_filter_count + num_extra_filters + 1; grpc_channel_filter const **filters = @@ -939,7 +922,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, } channel = grpc_channel_create_from_filters(exec_ctx, NULL, filters, - num_filters, args, mdctx, 0); + num_filters, args, 0); chand = (channel_data *)grpc_channel_stack_element( grpc_channel_get_channel_stack(channel), 0)->channel_data; chand->server = s; @@ -958,8 +941,8 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, chand->registered_methods = gpr_malloc(alloc); memset(chand->registered_methods, 0, alloc); for (rm = s->registered_methods; rm; rm = rm->next) { - host = rm->host ? grpc_mdstr_from_string(mdctx, rm->host) : NULL; - method = grpc_mdstr_from_string(mdctx, rm->method); + host = rm->host ? grpc_mdstr_from_string(rm->host) : NULL; + method = grpc_mdstr_from_string(rm->method); hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash); for (probes = 0; chand->registered_methods[(hash + probes) % slots] .server_registered_method != NULL; @@ -1022,11 +1005,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server, GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3, (server, cq, tag)); - GRPC_SERVER_LOG_SHUTDOWN(GPR_INFO, server, cq, tag); - /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); - grpc_cq_begin_op(cq); + grpc_cq_begin_op(cq, tag); if (server->shutdown_published) { grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL, gpr_malloc(sizeof(grpc_cq_completion))); @@ -1187,18 +1168,15 @@ grpc_call_error grpc_server_request_call( GRPC_API_TRACE( "grpc_server_request_call(" "server=%p, call=%p, details=%p, initial_metadata=%p, " - "cq_bound_to_call=%p, cq_for_notification=%p, tag%p)", + "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)", 7, (server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag)); - GRPC_SERVER_LOG_REQUEST_CALL(GPR_INFO, server, call, details, - initial_metadata, cq_bound_to_call, - cq_for_notification, tag); if (!grpc_cq_is_server_cq(cq_for_notification)) { gpr_free(rc); error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - grpc_cq_begin_op(cq_for_notification); + grpc_cq_begin_op(cq_for_notification, tag); details->reserved = NULL; rc->type = BATCH_CALL; rc->server = server; @@ -1207,7 +1185,7 @@ grpc_call_error grpc_server_request_call( rc->cq_for_notification = cq_for_notification; rc->call = call; rc->data.batch.details = details; - rc->data.batch.initial_metadata = initial_metadata; + rc->initial_metadata = initial_metadata; error = queue_call_request(&exec_ctx, server, rc); done: grpc_exec_ctx_finish(&exec_ctx); @@ -1235,7 +1213,7 @@ grpc_call_error grpc_server_request_registered_call( error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; } - grpc_cq_begin_op(cq_for_notification); + grpc_cq_begin_op(cq_for_notification, tag); rc->type = REGISTERED_CALL; rc->server = server; rc->tag = tag; @@ -1244,7 +1222,7 @@ grpc_call_error grpc_server_request_registered_call( rc->call = call; rc->data.registered.registered_method = rm; rc->data.registered.deadline = deadline; - rc->data.registered.initial_metadata = initial_metadata; + rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; error = queue_call_request(&exec_ctx, server, rc); done: @@ -1253,12 +1231,7 @@ done: } static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, - grpc_call *call, int success, - void *tag); -static void publish_was_not_set(grpc_exec_ctx *exec_ctx, grpc_call *call, - int success, void *tag) { - abort(); -} + void *user_data, int success); static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { gpr_slice slice = value->slice; @@ -1273,9 +1246,10 @@ static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) { static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, call_data *calld, requested_call *rc) { - grpc_ioreq_completion_func publish = publish_was_not_set; - grpc_ioreq req[2]; - grpc_ioreq *r = req; + grpc_op ops[1]; + grpc_op *op = ops; + + memset(ops, 0, sizeof(ops)); /* called once initial metadata has been read by the call, but BEFORE the ioreq to fetch it out of the call has been executed. @@ -1284,8 +1258,10 @@ static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, an ioreq op, that should complete immediately. */ grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); + grpc_closure_init(&rc->publish, publish_registered_or_batch, rc); *rc->call = calld->call; calld->cq_new = rc->cq_for_notification; + GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata); switch (rc->type) { case BATCH_CALL: GPR_ASSERT(calld->host != NULL); @@ -1295,31 +1271,22 @@ static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server, cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); rc->data.batch.details->deadline = calld->deadline; - r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; - r->data.recv_metadata = rc->data.batch.initial_metadata; - r->flags = 0; - r++; - publish = publish_registered_or_batch; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; - r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; - r->data.recv_metadata = rc->data.registered.initial_metadata; - r->flags = 0; - r++; if (rc->data.registered.optional_payload) { - r->op = GRPC_IOREQ_RECV_MESSAGE; - r->data.recv_message = rc->data.registered.optional_payload; - r->flags = 0; - r++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = rc->data.registered.optional_payload; + op++; } - publish = publish_registered_or_batch; break; + default: + GPR_UNREACHABLE_CODE(return ); } GRPC_CALL_INTERNAL_REF(calld->call, "server"); - grpc_call_start_ioreq_and_call_back(exec_ctx, calld->call, req, - (size_t)(r - req), publish, rc); + grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops, + (size_t)(op - ops), &rc->publish); } static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, @@ -1342,25 +1309,19 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, requested_call *rc) { *rc->call = NULL; - switch (rc->type) { - case BATCH_CALL: - rc->data.batch.initial_metadata->count = 0; - break; - case REGISTERED_CALL: - rc->data.registered.initial_metadata->count = 0; - break; - } + rc->initial_metadata->count = 0; + server_ref(server); grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0, done_request_event, rc, &rc->completion); } -static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, - grpc_call *call, int success, - void *prc) { +static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc, + int success) { + requested_call *rc = prc; + grpc_call *call = *rc->call; grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); - requested_call *rc = prc; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; server_ref(chand->server); |