diff options
Diffstat (limited to 'src/core/surface/server.c')
-rw-r--r-- | src/core/surface/server.c | 609 |
1 files changed, 609 insertions, 0 deletions
diff --git a/src/core/surface/server.c b/src/core/surface/server.c new file mode 100644 index 0000000000..99d66ffb2d --- /dev/null +++ b/src/core/surface/server.c @@ -0,0 +1,609 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/surface/server.h" + +#include <stdlib.h> +#include <string.h> + +#include "src/core/channel/census_filter.h" +#include "src/core/channel/channel_args.h" +#include "src/core/channel/connected_channel.h" +#include "src/core/surface/call.h" +#include "src/core/surface/channel.h" +#include "src/core/surface/completion_queue.h" +#include "src/core/surface/surface_em.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string.h> +#include <grpc/support/useful.h> + +typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; + +typedef struct listener { + void *arg; + void (*start)(grpc_server *server, void *arg); + void (*destroy)(grpc_server *server, void *arg); + struct listener *next; +} listener; + +typedef struct call_data call_data; +typedef struct channel_data channel_data; + +struct channel_data { + grpc_server *server; + grpc_channel *channel; + /* linked list of all channels on a server */ + channel_data *next; + channel_data *prev; +}; + +struct grpc_server { + size_t channel_filter_count; + const grpc_channel_filter **channel_filters; + grpc_channel_args *channel_args; + grpc_completion_queue *cq; + grpc_em *em; + + gpr_mu mu; + + void **tags; + size_t ntags; + size_t tag_cap; + + gpr_uint8 shutdown; + + call_data *lists[CALL_LIST_COUNT]; + channel_data root_channel_data; + + listener *listeners; + gpr_refcount internal_refcount; +}; + +typedef struct { + call_data *next; + call_data *prev; +} call_link; + +typedef enum { + /* waiting for metadata */ + NOT_STARTED, + /* inital metadata read, not flow controlled in yet */ + PENDING, + /* flow controlled in, on completion queue */ + ACTIVATED, + /* cancelled before being queued */ + ZOMBIED +} call_state; + +struct call_data { + grpc_call *call; + + call_state state; + gpr_timespec deadline; + + gpr_uint8 included[CALL_LIST_COUNT]; + call_link links[CALL_LIST_COUNT]; +}; + +#define SERVER_FROM_CALL_ELEM(elem) \ + (((channel_data *)(elem)->channel_data)->server) + +static void do_nothing(void *unused, grpc_op_error ignored) {} + +static int call_list_join(grpc_server *server, call_data *call, + call_list list) { + if (call->included[list]) return 0; + call->included[list] = 1; + if (!server->lists[list]) { + server->lists[list] = call; + call->links[list].next = call->links[list].prev = call; + } else { + call->links[list].next = server->lists[list]; + call->links[list].prev = server->lists[list]->links[list].prev; + call->links[list].next->links[list].prev = + call->links[list].prev->links[list].next = call; + } + return 1; +} + +static call_data *call_list_remove_head(grpc_server *server, call_list list) { + call_data *out = server->lists[list]; + if (out) { + out->included[list] = 0; + if (out->links[list].next == out) { + server->lists[list] = NULL; + } else { + server->lists[list] = out->links[list].next; + out->links[list].next->links[list].prev = out->links[list].prev; + out->links[list].prev->links[list].next = out->links[list].next; + } + } + return out; +} + +static int call_list_remove(grpc_server *server, call_data *call, + call_list list) { + if (!call->included[list]) return 0; + call->included[list] = 0; + if (server->lists[list] == call) { + server->lists[list] = call->links[list].next; + if (server->lists[list] == call) { + server->lists[list] = NULL; + return 1; + } + } + GPR_ASSERT(server->lists[list] != call); + call->links[list].next->links[list].prev = call->links[list].prev; + call->links[list].prev->links[list].next = call->links[list].next; + return 1; +} + +static void server_ref(grpc_server *server) { + gpr_ref(&server->internal_refcount); +} + +static void server_unref(grpc_server *server) { + if (gpr_unref(&server->internal_refcount)) { + grpc_channel_args_destroy(server->channel_args); + gpr_mu_destroy(&server->mu); + gpr_free(server->channel_filters); + gpr_free(server->tags); + gpr_free(server); + } +} + +static int is_channel_orphaned(channel_data *chand) { + return chand->next == chand; +} + +static void orphan_channel(channel_data *chand) { + chand->next->prev = chand->prev; + chand->prev->next = chand->next; + chand->next = chand->prev = chand; +} + +static void finish_destroy_channel(void *cd, grpc_em_cb_status status) { + channel_data *chand = cd; + grpc_server *server = chand->server; + /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/ + grpc_channel_destroy(chand->channel); + server_unref(server); +} + +static void destroy_channel(channel_data *chand) { + if (is_channel_orphaned(chand)) return; + GPR_ASSERT(chand->server != NULL); + orphan_channel(chand); + server_ref(chand->server); + grpc_em_add_callback(chand->server->em, finish_destroy_channel, chand); +} + +static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) { + grpc_call *call = calld->call; + grpc_metadata_buffer *mdbuf = grpc_call_get_metadata_buffer(call); + size_t count = grpc_metadata_buffer_count(mdbuf); + grpc_metadata *elements = grpc_metadata_buffer_extract_elements(mdbuf); + const char *host = NULL; + const char *method = NULL; + size_t i; + grpc_metadata status_md; + + for (i = 0; i < count; i++) { + if (0 == strcmp(elements[i].key, ":authority")) { + host = elements[i].value; + } else if (0 == strcmp(elements[i].key, ":path")) { + method = elements[i].value; + } + } + + status_md.key = ":status"; + status_md.value = "200"; + status_md.value_length = 3; + grpc_call_add_metadata(call, &status_md, GRPC_WRITE_BUFFER_HINT); + + grpc_call_internal_ref(call); + grpc_cq_end_new_rpc(server->cq, tag, call, + grpc_metadata_buffer_cleanup_elements, elements, method, + host, calld->deadline, count, elements); +} + +static void start_new_rpc(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + grpc_server *server = chand->server; + + gpr_mu_lock(&server->mu); + if (server->ntags) { + calld->state = ACTIVATED; + queue_new_rpc(server, calld, server->tags[--server->ntags]); + } else { + calld->state = PENDING; + call_list_join(server, calld, PENDING_START); + } + gpr_mu_unlock(&server->mu); +} + +static void kill_zombie(void *elem, grpc_em_cb_status status) { + grpc_call_destroy(grpc_call_from_top_element(elem)); +} + +static void finish_rpc(grpc_call_element *elem, int is_full_close) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->server->mu); + switch (calld->state) { + case ACTIVATED: + grpc_call_recv_finish(elem, is_full_close); + break; + case PENDING: + if (!is_full_close) { + grpc_call_recv_finish(elem, is_full_close); + break; + } + call_list_remove(chand->server, calld, PENDING_START); + /* fallthrough intended */ + case NOT_STARTED: + calld->state = ZOMBIED; + grpc_em_add_callback(chand->server->em, kill_zombie, elem); + break; + case ZOMBIED: + break; + } + gpr_mu_unlock(&chand->server->mu); +} + +static void call_op(grpc_call_element *elem, grpc_call_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + switch (op->type) { + case GRPC_RECV_METADATA: + grpc_call_recv_metadata(elem, op); + break; + case GRPC_RECV_END_OF_INITIAL_METADATA: + start_new_rpc(elem); + break; + case GRPC_RECV_MESSAGE: + grpc_call_recv_message(elem, op->data.message, op->done_cb, + op->user_data); + break; + case GRPC_RECV_HALF_CLOSE: + finish_rpc(elem, 0); + break; + case GRPC_RECV_FINISH: + finish_rpc(elem, 1); + break; + case GRPC_RECV_DEADLINE: + grpc_call_set_deadline(elem, op->data.deadline); + ((call_data *)elem->call_data)->deadline = op->data.deadline; + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_DOWN); + grpc_call_next_op(elem, op); + break; + } +} + +static void channel_op(grpc_channel_element *elem, grpc_channel_op *op) { + channel_data *chand = elem->channel_data; + + switch (op->type) { + case GRPC_ACCEPT_CALL: + /* create a call */ + grpc_call_create(chand->channel, + op->data.accept_call.transport_server_data); + break; + case GRPC_TRANSPORT_CLOSED: + /* if the transport is closed for a server channel, we destroy the + channel */ + gpr_mu_lock(&chand->server->mu); + server_ref(chand->server); + destroy_channel(chand); + gpr_mu_unlock(&chand->server->mu); + server_unref(chand->server); + break; + default: + GPR_ASSERT(op->dir == GRPC_CALL_DOWN); + grpc_channel_next_op(elem, op); + break; + } +} + +static void finish_shutdown_channel(void *cd, grpc_em_cb_status status) { + channel_data *chand = cd; + grpc_channel_op op; + op.type = GRPC_CHANNEL_SHUTDOWN; + op.dir = GRPC_CALL_DOWN; + channel_op(grpc_channel_stack_element( + grpc_channel_get_channel_stack(chand->channel), 0), + &op); + grpc_channel_internal_unref(chand->channel); +} + +static void shutdown_channel(channel_data *chand) { + grpc_channel_internal_ref(chand->channel); + grpc_em_add_callback(chand->server->em, finish_shutdown_channel, chand); +} + +static void init_call_elem(grpc_call_element *elem, + const void *server_transport_data) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + memset(calld, 0, sizeof(call_data)); + calld->deadline = gpr_inf_future; + calld->call = grpc_call_from_top_element(elem); + + gpr_mu_lock(&chand->server->mu); + call_list_join(chand->server, calld, ALL_CALLS); + gpr_mu_unlock(&chand->server->mu); + + server_ref(chand->server); +} + +static void destroy_call_elem(grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + int i; + + gpr_mu_lock(&chand->server->mu); + for (i = 0; i < CALL_LIST_COUNT; i++) { + call_list_remove(chand->server, elem->call_data, i); + } + gpr_mu_unlock(&chand->server->mu); + + server_unref(chand->server); +} + +static void init_channel_elem(grpc_channel_element *elem, + const grpc_channel_args *args, + grpc_mdctx *metadata_context, int is_first, + int is_last) { + channel_data *chand = elem->channel_data; + GPR_ASSERT(is_first); + GPR_ASSERT(!is_last); + chand->server = NULL; + chand->channel = NULL; + chand->next = chand->prev = chand; +} + +static void destroy_channel_elem(grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + if (chand->server) { + gpr_mu_lock(&chand->server->mu); + chand->next->prev = chand->prev; + chand->prev->next = chand->next; + chand->next = chand->prev = chand; + gpr_mu_unlock(&chand->server->mu); + server_unref(chand->server); + } +} + +static const grpc_channel_filter server_surface_filter = { + call_op, channel_op, + + sizeof(call_data), init_call_elem, destroy_call_elem, + + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + + "server", +}; + +static void early_terminate_requested_calls(grpc_completion_queue *cq, + void **tags, size_t ntags) { + size_t i; + + for (i = 0; i < ntags; i++) { + grpc_cq_end_new_rpc(cq, tags[i], NULL, do_nothing, NULL, NULL, NULL, + gpr_inf_past, 0, NULL); + } +} + +grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, + grpc_channel_filter **filters, + size_t filter_count, + const grpc_channel_args *args) { + size_t i; + int census_enabled = grpc_channel_args_is_census_enabled(args); + + grpc_server *server = gpr_malloc(sizeof(grpc_server)); + memset(server, 0, sizeof(grpc_server)); + + gpr_mu_init(&server->mu); + + server->cq = cq; + server->em = grpc_surface_em(); + /* decremented by grpc_server_destroy */ + gpr_ref_init(&server->internal_refcount, 1); + server->root_channel_data.next = server->root_channel_data.prev = + &server->root_channel_data; + + /* Server filter stack is: + + server_surface_filter - for making surface API calls + grpc_server_census_filter (optional) - for stats collection and tracing + {passed in filter stack} + grpc_connected_channel_filter - for interfacing with transports */ + server->channel_filter_count = filter_count + 1 + census_enabled; + server->channel_filters = + gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *)); + server->channel_filters[0] = &server_surface_filter; + if (census_enabled) { + server->channel_filters[1] = &grpc_server_census_filter; + } + for (i = 0; i < filter_count; i++) { + server->channel_filters[i + 1 + census_enabled] = filters[i]; + } + + server->channel_args = grpc_channel_args_copy(args); + + return server; +} + +void grpc_server_start(grpc_server *server) { + listener *l; + + for (l = server->listeners; l; l = l->next) { + l->start(server, l->arg); + } +} + +grpc_transport_setup_result grpc_server_setup_transport( + grpc_server *s, grpc_transport *transport, + grpc_channel_filter const **extra_filters, size_t num_extra_filters, + grpc_mdctx *mdctx) { + size_t num_filters = s->channel_filter_count + num_extra_filters + 1; + grpc_channel_filter const **filters = + gpr_malloc(sizeof(grpc_channel_filter *) * num_filters); + size_t i; + grpc_channel *channel; + channel_data *chand; + + for (i = 0; i < s->channel_filter_count; i++) { + filters[i] = s->channel_filters[i]; + } + for (; i < s->channel_filter_count + num_extra_filters; i++) { + filters[i] = extra_filters[i - s->channel_filter_count]; + } + filters[i] = &grpc_connected_channel_filter; + + channel = grpc_channel_create_from_filters(filters, num_filters, + s->channel_args, mdctx, 0); + chand = (channel_data *)grpc_channel_stack_element( + grpc_channel_get_channel_stack(channel), 0)->channel_data; + chand->server = s; + server_ref(s); + chand->channel = channel; + + gpr_mu_lock(&s->mu); + chand->next = &s->root_channel_data; + chand->prev = chand->next->prev; + chand->next->prev = chand->prev->next = chand; + gpr_mu_unlock(&s->mu); + + gpr_free(filters); + + return grpc_connected_channel_bind_transport( + grpc_channel_get_channel_stack(channel), transport); +} + +void grpc_server_shutdown(grpc_server *server) { + /* TODO(ctiller): send goaway, etc */ + listener *l; + void **tags; + size_t ntags; + + /* lock, and gather up some stuff to do */ + gpr_mu_lock(&server->mu); + if (server->shutdown) { + gpr_mu_unlock(&server->mu); + return; + } + + tags = server->tags; + ntags = server->ntags; + server->tags = NULL; + server->ntags = 0; + + server->shutdown = 1; + gpr_mu_unlock(&server->mu); + + /* terminate all the requested calls */ + early_terminate_requested_calls(server->cq, tags, ntags); + gpr_free(tags); + + /* Shutdown listeners */ + for (l = server->listeners; l; l = l->next) { + l->destroy(server, l->arg); + } + while (server->listeners) { + l = server->listeners; + server->listeners = l->next; + gpr_free(l); + } +} + +void grpc_server_destroy(grpc_server *server) { + channel_data *c; + gpr_mu_lock(&server->mu); + for (c = server->root_channel_data.next; c != &server->root_channel_data; + c = c->next) { + shutdown_channel(c); + } + gpr_mu_unlock(&server->mu); + + server_unref(server); +} + +void grpc_server_add_listener(grpc_server *server, void *arg, + void (*start)(grpc_server *server, void *arg), + void (*destroy)(grpc_server *server, void *arg)) { + listener *l = gpr_malloc(sizeof(listener)); + l->arg = arg; + l->start = start; + l->destroy = destroy; + l->next = server->listeners; + server->listeners = l; +} + +grpc_call_error grpc_server_request_call(grpc_server *server, void *tag_new) { + call_data *calld; + + grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW); + + gpr_mu_lock(&server->mu); + + if (server->shutdown) { + gpr_mu_unlock(&server->mu); + early_terminate_requested_calls(server->cq, &tag_new, 1); + return GRPC_CALL_OK; + } + + calld = call_list_remove_head(server, PENDING_START); + if (calld) { + GPR_ASSERT(calld->state == PENDING); + calld->state = ACTIVATED; + queue_new_rpc(server, calld, tag_new); + } else { + if (server->tag_cap == server->ntags) { + server->tag_cap = GPR_MAX(3 * server->tag_cap / 2, server->tag_cap + 1); + server->tags = + gpr_realloc(server->tags, sizeof(void *) * server->tag_cap); + } + server->tags[server->ntags++] = tag_new; + } + gpr_mu_unlock(&server->mu); + + return GRPC_CALL_OK; +} + +const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { + return server->channel_args; +} |