author     Vijay Pai <vpai@google.com>  2015-07-17 16:05:28 -0700
committer  Vijay Pai <vpai@google.com>  2015-07-17 16:05:28 -0700
commit     f87a0984ab727e95b068237f3bb0689d9685c8ea (patch)
tree       3086bee99a5e7ec12ddcd9de1e69f8a63182ef3b /src
parent     e9881bbaf3d53aa80099c42c80fb3331ff38270a (diff)
parent     af26143b6fc76559f0907aadfc9f23c4e2e60844 (diff)
Merge pull request #2470 from ctiller/i-want-to-wait-free
Lock free requested call fulfillment path
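
The heart of this change is visible in the stack_lockfree.h/.c and server.c hunks below: gpr_stack_lockfree_push now reports whether it pushed onto an empty stack, and queue_call_request takes the call mutex only in that "first push" case, so the steady-state path for queuing a requested call is lock-free. The following standalone C11 program is only a minimal sketch of that idea (hypothetical names, a plain pointer-based Treiber stack, not the gRPC implementation, and it ignores the ABA/reclamation issues the real index-plus-generation code handles):

```c
#include <stdatomic.h>
#include <stdio.h>

typedef struct node {
  int value;
  struct node *next;
} node;

typedef struct {
  _Atomic(node *) head;
} lf_stack;

/* Push a node and report whether the stack was empty beforehand.
   A return of 1 tells the caller it is the "first" producer and is
   responsible for starting any matching work - mirroring the return
   value this patch adds to gpr_stack_lockfree_push. */
static int lf_stack_push(lf_stack *s, node *n) {
  node *old = atomic_load_explicit(&s->head, memory_order_relaxed);
  do {
    n->next = old;
  } while (!atomic_compare_exchange_weak_explicit(
      &s->head, &old, n, memory_order_release, memory_order_relaxed));
  return old == NULL; /* 1 == this push made the stack non-empty */
}

/* Pop one node, or NULL if empty. (Not safe against ABA or concurrent
   frees; gpr_stack_lockfree packs an entry index and a generation
   counter into one atomic word to cope with that.) */
static node *lf_stack_pop(lf_stack *s) {
  node *old = atomic_load_explicit(&s->head, memory_order_acquire);
  while (old != NULL &&
         !atomic_compare_exchange_weak_explicit(
             &s->head, &old, old->next, memory_order_acquire,
             memory_order_relaxed)) {
  }
  return old;
}

int main(void) {
  lf_stack s;
  atomic_init(&s.head, NULL);
  node a = {1, NULL}, b = {2, NULL};
  printf("first push:  %d\n", lf_stack_push(&s, &a)); /* 1 */
  printf("second push: %d\n", lf_stack_push(&s, &b)); /* 0 */
  printf("pop: %d\n", lf_stack_pop(&s)->value);       /* 2 */
  return 0;
}
```

As the server.c hunks below show, the analogous first-push case is the only place queue_call_request acquires mu_call: it then drains the pending-call list against the queued request ids, so concurrent request submission normally never contends on the lock.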
Diffstat (limited to 'src')
-rw-r--r--  src/core/iomgr/pollset_multipoller_with_epoll.c       |  17
-rw-r--r--  src/core/iomgr/pollset_multipoller_with_poll_posix.c  |  15
-rw-r--r--  src/core/iomgr/pollset_posix.c                        |  39
-rw-r--r--  src/core/iomgr/pollset_posix.h                        |   6
-rw-r--r--  src/core/support/stack_lockfree.c                     |   8
-rw-r--r--  src/core/support/stack_lockfree.h                     |   3
-rw-r--r--  src/core/surface/server.c                             | 405
7 files changed, 305 insertions(+), 188 deletions(-)
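
The iomgr portion of the change threads an and_unlock_pollset flag through the pollset vtable so that add_fd/del_fd can release the pollset mutex themselves before slower work, and pollset_posix.c adds a debug-only re-lock to catch implementations that forget to honor the flag. Below is a minimal standalone sketch of that callee-unlocks convention (hypothetical names, pthreads instead of gpr_mu; not the gRPC code):

```c
#include <pthread.h>
#include <stdio.h>

typedef struct {
  pthread_mutex_t mu;
  int fd_count;
} pollset;

/* Callee-unlocks convention: when and_unlock is non-zero, this function
   is responsible for releasing ps->mu (possibly early), so the caller
   must not touch ps afterwards. */
static void pollset_add_fd_locked(pollset *ps, int fd, int and_unlock) {
  ps->fd_count++; /* mutate protected state while still holding the lock */
  if (and_unlock) {
    pthread_mutex_unlock(&ps->mu);
  }
  /* ... slow work (e.g. an epoll_ctl call) can now run outside the lock ... */
  (void)fd;
}

static void pollset_add_fd(pollset *ps, int fd) {
  pthread_mutex_lock(&ps->mu);
  pollset_add_fd_locked(ps, fd, 1);
/* Debug-only check, in the spirit of pollset_posix.c: if the callee failed
   to unlock, re-locking a non-recursive mutex here hangs immediately,
   which is much easier to diagnose than a lock silently held forever. */
#ifndef NDEBUG
  pthread_mutex_lock(&ps->mu);
  pthread_mutex_unlock(&ps->mu);
#endif
}

int main(void) {
  pollset ps = {PTHREAD_MUTEX_INITIALIZER, 0};
  pollset_add_fd(&ps, 42);
  printf("fd_count=%d\n", ps.fd_count);
  return 0;
}
```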
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 3746c8edaf..d697b59e4c 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -50,12 +50,17 @@ typedef struct {
 } pollset_hdr;
 
 static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
-                                                grpc_fd *fd) {
+                                                grpc_fd *fd,
+                                                int and_unlock_pollset) {
   pollset_hdr *h = pollset->data.ptr;
   struct epoll_event ev;
   int err;
   grpc_fd_watcher watcher;
 
+  if (and_unlock_pollset) {
+    gpr_mu_unlock(&pollset->mu);
+  }
+
   /* We pretend to be polling whilst adding an fd to keep the fd from being
      closed during the add. This may result in a spurious wakeup being assigned
      to this pollset whilst adding, but that should be benign. */
@@ -76,9 +81,15 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
 }
 
 static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
-                                                grpc_fd *fd) {
+                                                grpc_fd *fd,
+                                                int and_unlock_pollset) {
   pollset_hdr *h = pollset->data.ptr;
   int err;
+
+  if (and_unlock_pollset) {
+    gpr_mu_unlock(&pollset->mu);
+  }
+
   /* Note that this can race with concurrent poll, but that should be fine since
    * at worst it creates a spurious read event on a reused grpc_fd object. */
   err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
@@ -183,7 +194,7 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
     abort();
   }
   for (i = 0; i < nfds; i++) {
-    multipoll_with_epoll_pollset_add_fd(pollset, fds[i]);
+    multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0);
   }
 
   grpc_wakeup_fd_create(&h->wakeup_fd);
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
index 7b717bd159..0084e83953 100644
--- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -66,12 +66,13 @@ typedef struct {
 } pollset_hdr;
 
 static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
-                                               grpc_fd *fd) {
+                                               grpc_fd *fd,
+                                               int and_unlock_pollset) {
   size_t i;
   pollset_hdr *h = pollset->data.ptr;
   /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
   for (i = 0; i < h->fd_count; i++) {
-    if (h->fds[i] == fd) return;
+    if (h->fds[i] == fd) goto exit;
   }
   if (h->fd_count == h->fd_capacity) {
     h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
@@ -79,10 +80,15 @@ static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
   }
   h->fds[h->fd_count++] = fd;
   GRPC_FD_REF(fd, "multipoller");
+exit:
+  if (and_unlock_pollset) {
+    gpr_mu_unlock(&pollset->mu);
+  }
 }
 
 static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
-                                               grpc_fd *fd) {
+                                               grpc_fd *fd,
+                                               int and_unlock_pollset) {
   /* will get removed next poll cycle */
   pollset_hdr *h = pollset->data.ptr;
   if (h->del_count == h->del_capacity) {
@@ -91,6 +97,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
   }
   h->dels[h->del_count++] = fd;
   GRPC_FD_REF(fd, "multipoller_del");
+  if (and_unlock_pollset) {
+    gpr_mu_unlock(&pollset->mu);
+  }
 }
 
 static void end_polling(grpc_pollset *pollset) {
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 7afff58eee..efb301d81c 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -105,14 +105,28 @@ void grpc_pollset_init(grpc_pollset *pollset) {
 
 void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
   gpr_mu_lock(&pollset->mu);
-  pollset->vtable->add_fd(pollset, fd);
+  pollset->vtable->add_fd(pollset, fd, 1);
+  /* the following (enabled only in debug) will reacquire and then release
+     our lock - meaning that if the unlocking flag passed to del_fd above is
+     not respected, the code will deadlock (in a way that we have a chance of
+     debugging) */
+#ifndef NDEBUG
+  gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu);
+#endif
 }
 
 void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
   gpr_mu_lock(&pollset->mu);
-  pollset->vtable->del_fd(pollset, fd);
+  pollset->vtable->del_fd(pollset, fd, 1);
+  /* the following (enabled only in debug) will reacquire and then release
+     our lock - meaning that if the unlocking flag passed to del_fd above is
+     not respected, the code will deadlock (in a way that we have a chance of
+     debugging) */
+#ifndef NDEBUG
+  gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu);
+#endif
 }
 
 static void finish_shutdown(grpc_pollset *pollset) {
@@ -257,7 +271,7 @@ static void basic_do_promote(void *args, int success) {
   } else if (grpc_fd_is_orphaned(fd)) {
     /* Don't try to add it to anything, we'll drop our ref on it below */
   } else if (pollset->vtable != original_vtable) {
-    pollset->vtable->add_fd(pollset, fd);
+    pollset->vtable->add_fd(pollset, fd, 0);
   } else if (fd != pollset->data.ptr) {
     grpc_fd *fds[2];
     fds[0] = pollset->data.ptr;
@@ -287,10 +301,11 @@ static void basic_do_promote(void *args, int success) {
   GRPC_FD_UNREF(fd, "basicpoll_add");
 }
 
-static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
+static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
+                                 int and_unlock_pollset) {
   grpc_unary_promote_args *up_args;
   GPR_ASSERT(fd);
-  if (fd == pollset->data.ptr) return;
+  if (fd == pollset->data.ptr) goto exit;
 
   if (!pollset->counter) {
     /* Fast path -- no in flight cbs */
@@ -313,7 +328,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
       pollset->data.ptr = fd;
       GRPC_FD_REF(fd, "basicpoll");
     }
-    return;
+    goto exit;
   }
 
   /* Now we need to promote. This needs to happen when we're not polling. Since
@@ -329,14 +344,24 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
 
   grpc_iomgr_add_callback(&up_args->promotion_closure);
 
   grpc_pollset_kick(pollset);
+
+exit:
+  if (and_unlock_pollset) {
+    gpr_mu_unlock(&pollset->mu);
+  }
 }
 
-static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
+static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd,
+                                 int and_unlock_pollset) {
   GPR_ASSERT(fd);
   if (fd == pollset->data.ptr) {
     GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
     pollset->data.ptr = NULL;
   }
+
+  if (and_unlock_pollset) {
+    gpr_mu_unlock(&pollset->mu);
+  }
 }
 
 static void basic_pollset_maybe_work(grpc_pollset *pollset,
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 53585a2886..37de1276d1 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -66,8 +66,10 @@ typedef struct grpc_pollset {
 } grpc_pollset;
 
 struct grpc_pollset_vtable {
-  void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
-  void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
+  void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
+                 int and_unlock_pollset);
+  void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd,
+                 int and_unlock_pollset);
   void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
                      gpr_timespec now, int allow_synchronous_callback);
   void (*kick)(grpc_pollset *pollset);
diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c
index 9497efbfb5..2844330379 100644
--- a/src/core/support/stack_lockfree.c
+++ b/src/core/support/stack_lockfree.c
@@ -65,8 +65,9 @@ typedef union lockfree_node {
 } lockfree_node;
 
 #define ENTRY_ALIGNMENT_BITS 3 /* make sure that entries aligned to 8-bytes */
-#define INVALID_ENTRY_INDEX ((1 << 16) - 1) /* reserve this entry as invalid \
-                                             */
+#define INVALID_ENTRY_INDEX                        \
+  ((1 << 16) - 1) /* reserve this entry as invalid \
+                   */
 
 struct gpr_stack_lockfree {
   lockfree_node *entries;
@@ -96,7 +97,7 @@ void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) {
   gpr_free(stack);
 }
 
-void gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
+int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
   lockfree_node head;
   lockfree_node newhead;
 
@@ -112,6 +113,7 @@ void gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) {
     stack->entries[entry].contents.index = head.contents.index;
   } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm));
   /* Use rel_cas above to make sure that entry index is set properly */
+  return head.contents.index == INVALID_ENTRY_INDEX;
 }
 
 int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) {
diff --git a/src/core/support/stack_lockfree.h b/src/core/support/stack_lockfree.h
index 0bcf73635d..eec960fbb0 100644
--- a/src/core/support/stack_lockfree.h
+++ b/src/core/support/stack_lockfree.h
@@ -42,7 +42,8 @@ gpr_stack_lockfree* gpr_stack_lockfree_create(int entries);
 void gpr_stack_lockfree_destroy(gpr_stack_lockfree* stack);
 
 /* Pass in a valid entry number for the next stack entry */
-void gpr_stack_lockfree_push(gpr_stack_lockfree* stack, int entry);
+/* Returns 1 if this is the first element on the stack, 0 otherwise */
+int gpr_stack_lockfree_push(gpr_stack_lockfree*, int entry);
 
 /* Returns -1 on empty or the actual entry number */
 int gpr_stack_lockfree_pop(gpr_stack_lockfree* stack);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 37f0c3c005..fa120088e1 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -36,22 +36,22 @@
 #include <stdlib.h>
 #include <string.h>
 
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
 #include "src/core/channel/census_filter.h"
 #include "src/core/channel/channel_args.h"
 #include "src/core/channel/connected_channel.h"
 #include "src/core/iomgr/iomgr.h"
+#include "src/core/support/stack_lockfree.h"
 #include "src/core/support/string.h"
 #include "src/core/surface/call.h"
 #include "src/core/surface/channel.h"
 #include "src/core/surface/completion_queue.h"
 #include "src/core/surface/init.h"
 #include "src/core/transport/metadata.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/useful.h>
-
-typedef enum { PENDING_START, CALL_LIST_COUNT } call_list;
 
 typedef struct listener {
   void *arg;
@@ -74,8 +74,8 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
 
 typedef struct requested_call {
   requested_call_type type;
-  struct requested_call *next;
   void *tag;
+  grpc_server *server;
   grpc_completion_queue *cq_bound_to_call;
   grpc_completion_queue *cq_for_notification;
   grpc_call **call;
@@ -94,14 +94,6 @@ typedef struct requested_call {
   } data;
 } requested_call;
 
-struct registered_method {
-  char *method;
-  char *host;
-  call_data *pending;
-  requested_call *requests;
-  registered_method *next;
-};
-
 typedef struct channel_registered_method {
   registered_method *server_registered_method;
   grpc_mdstr *method;
@@ -130,44 +122,6 @@ typedef struct shutdown_tag {
   grpc_cq_completion completion;
 } shutdown_tag;
 
-struct grpc_server {
-  size_t channel_filter_count;
-  const grpc_channel_filter **channel_filters;
-  grpc_channel_args *channel_args;
-
-  grpc_completion_queue **cqs;
-  grpc_pollset **pollsets;
-  size_t cq_count;
-
-  /* The two following mutexes control access to server-state
-     mu_global controls access to non-call-related state (e.g., channel state)
-     mu_call controls access to call-related state (e.g., the call lists)
-
-     If they are ever required to be nested, you must lock mu_global
-     before mu_call. This is currently used in shutdown processing
-     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
-  gpr_mu mu_global; /* mutex for server and channel state */
-  gpr_mu mu_call;   /* mutex for call-specific state */
-
-  registered_method *registered_methods;
-  requested_call *requests;
-
-  gpr_uint8 shutdown;
-  gpr_uint8 shutdown_published;
-  size_t num_shutdown_tags;
-  shutdown_tag *shutdown_tags;
-
-  call_data *lists[CALL_LIST_COUNT];
-  channel_data root_channel_data;
-
-  listener *listeners;
-  int listeners_destroyed;
-  gpr_refcount internal_refcount;
-
-  /** when did we print the last shutdown progress message */
-  gpr_timespec last_shutdown_message_time;
-};
-
 typedef enum {
   /* waiting for metadata */
   NOT_STARTED,
@@ -179,6 +133,8 @@ typedef enum {
   ZOMBIED
 } call_state;
 
+typedef struct request_matcher request_matcher;
+
 struct call_data {
   grpc_call *call;
 
@@ -201,8 +157,20 @@ struct call_data {
   grpc_iomgr_closure server_on_recv;
   grpc_iomgr_closure kill_zombie_closure;
 
-  call_data **root[CALL_LIST_COUNT];
-  call_link links[CALL_LIST_COUNT];
+  call_data *pending_next;
+};
+
+struct request_matcher {
+  call_data *pending_head;
+  call_data *pending_tail;
+  gpr_stack_lockfree *requests;
+};
+
+struct registered_method {
+  char *method;
+  char *host;
+  request_matcher request_matcher;
+  registered_method *next;
 };
 
 typedef struct {
@@ -210,6 +178,48 @@ typedef struct {
   size_t num_channels;
 } channel_broadcaster;
 
+struct grpc_server {
+  size_t channel_filter_count;
+  const grpc_channel_filter **channel_filters;
+  grpc_channel_args *channel_args;
+
+  grpc_completion_queue **cqs;
+  grpc_pollset **pollsets;
+  size_t cq_count;
+
+  /* The two following mutexes control access to server-state
+     mu_global controls access to non-call-related state (e.g., channel state)
+     mu_call controls access to call-related state (e.g., the call lists)
+
+     If they are ever required to be nested, you must lock mu_global
+     before mu_call. This is currently used in shutdown processing
+     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
+  gpr_mu mu_global; /* mutex for server and channel state */
+  gpr_mu mu_call;   /* mutex for call-specific state */
+
+  registered_method *registered_methods;
+  request_matcher unregistered_request_matcher;
+  /** free list of available requested_calls indices */
+  gpr_stack_lockfree *request_freelist;
+  /** requested call backing data */
+  requested_call *requested_calls;
+  int max_requested_calls;
+
+  gpr_atm shutdown_flag;
+  gpr_uint8 shutdown_published;
+  size_t num_shutdown_tags;
+  shutdown_tag *shutdown_tags;
+
+  channel_data root_channel_data;
+
+  listener *listeners;
+  int listeners_destroyed;
+  gpr_refcount internal_refcount;
+
+  /** when did we print the last shutdown progress message */
+  gpr_timespec last_shutdown_message_time;
+};
+
 #define SERVER_FROM_CALL_ELEM(elem) \
   (((channel_data *)(elem)->channel_data)->server)
 
@@ -220,7 +230,9 @@ static void fail_call(grpc_server *server, requested_call *rc);
    hold mu_call */
 static void maybe_finish_shutdown(grpc_server *server);
 
-/* channel broadcaster */
+/*
+ * channel broadcaster
+ */
 
 /* assumes server locked */
 static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
@@ -281,55 +293,44 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb,
   gpr_free(cb->channels);
 }
 
-/* call list */
+/*
+ * request_matcher
+ */
 
-static int call_list_join(call_data **root, call_data *call, call_list list) {
-  GPR_ASSERT(!call->root[list]);
-  call->root[list] = root;
-  if (!*root) {
-    *root = call;
-    call->links[list].next = call->links[list].prev = call;
-  } else {
-    call->links[list].next = *root;
-    call->links[list].prev = (*root)->links[list].prev;
-    call->links[list].next->links[list].prev =
-        call->links[list].prev->links[list].next = call;
-  }
-  return 1;
+static void request_matcher_init(request_matcher *request_matcher,
+                                 int entries) {
+  memset(request_matcher, 0, sizeof(*request_matcher));
+  request_matcher->requests = gpr_stack_lockfree_create(entries);
 }
 
-static call_data *call_list_remove_head(call_data **root, call_list list) {
-  call_data *out = *root;
-  if (out) {
-    out->root[list] = NULL;
-    if (out->links[list].next == out) {
-      *root = NULL;
-    } else {
-      *root = out->links[list].next;
-      out->links[list].next->links[list].prev = out->links[list].prev;
-      out->links[list].prev->links[list].next = out->links[list].next;
-    }
-  }
-  return out;
+static void request_matcher_destroy(request_matcher *request_matcher) {
+  GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1);
+  gpr_stack_lockfree_destroy(request_matcher->requests);
 }
 
-static int call_list_remove(call_data *call, call_list list) {
-  call_data **root = call->root[list];
-  if (root == NULL) return 0;
-  call->root[list] = NULL;
-  if (*root == call) {
-    *root = call->links[list].next;
-    if (*root == call) {
-      *root = NULL;
-      return 1;
-    }
+static void kill_zombie(void *elem, int success) {
+  grpc_call_destroy(grpc_call_from_top_element(elem));
+}
+
+static void request_matcher_zombify_all_pending_calls(
+    request_matcher *request_matcher) {
+  while (request_matcher->pending_head) {
+    call_data *calld = request_matcher->pending_head;
+    request_matcher->pending_head = calld->pending_next;
+    gpr_mu_lock(&calld->mu_state);
+    calld->state = ZOMBIED;
+    gpr_mu_unlock(&calld->mu_state);
+    grpc_iomgr_closure_init(
+        &calld->kill_zombie_closure, kill_zombie,
+        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+    grpc_iomgr_add_callback(&calld->kill_zombie_closure);
   }
-  GPR_ASSERT(*root != call);
-  call->links[list].next->links[list].prev = call->links[list].prev;
-  call->links[list].prev->links[list].next = call->links[list].next;
-  return 1;
 }
 
+/*
+ * server proper
+ */
+
 static void server_ref(grpc_server *server) {
   gpr_ref(&server->internal_refcount);
 }
@@ -343,6 +344,7 @@ static void server_delete(grpc_server *server) {
   gpr_free(server->channel_filters);
   while ((rm = server->registered_methods) != NULL) {
     server->registered_methods = rm->next;
+    request_matcher_destroy(&rm->request_matcher);
    gpr_free(rm->method);
     gpr_free(rm->host);
     gpr_free(rm);
@@ -350,9 +352,12 @@ static void server_delete(grpc_server *server) {
   for (i = 0; i < server->cq_count; i++) {
     GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
   }
+  request_matcher_destroy(&server->unregistered_request_matcher);
+  gpr_stack_lockfree_destroy(server->request_freelist);
   gpr_free(server->cqs);
   gpr_free(server->pollsets);
   gpr_free(server->shutdown_tags);
+  gpr_free(server->requested_calls);
   gpr_free(server);
 }
 
@@ -391,25 +396,29 @@ static void destroy_channel(channel_data *chand) {
 }
 
 static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
-                                 call_data **pending_root,
-                                 requested_call **requests) {
-  requested_call *rc;
+                                 request_matcher *request_matcher) {
   call_data *calld = elem->call_data;
-  gpr_mu_lock(&server->mu_call);
-  rc = *requests;
-  if (rc == NULL) {
+  int request_id;
+
+  request_id = gpr_stack_lockfree_pop(request_matcher->requests);
+  if (request_id == -1) {
+    gpr_mu_lock(&server->mu_call);
     gpr_mu_lock(&calld->mu_state);
     calld->state = PENDING;
     gpr_mu_unlock(&calld->mu_state);
-    call_list_join(pending_root, calld, PENDING_START);
+    if (request_matcher->pending_head == NULL) {
+      request_matcher->pending_tail = request_matcher->pending_head = calld;
+    } else {
+      request_matcher->pending_tail->pending_next = calld;
+      request_matcher->pending_tail = calld;
+    }
+    calld->pending_next = NULL;
     gpr_mu_unlock(&server->mu_call);
   } else {
-    *requests = rc->next;
     gpr_mu_lock(&calld->mu_state);
     calld->state = ACTIVATED;
     gpr_mu_unlock(&calld->mu_state);
-    gpr_mu_unlock(&server->mu_call);
-    begin_call(server, calld, rc);
+    begin_call(server, calld, &server->requested_calls[request_id]);
   }
 }
 
@@ -431,8 +440,8 @@ static void start_new_rpc(grpc_call_element *elem) {
     if (!rm) break;
     if (rm->host != calld->host) continue;
     if (rm->method != calld->path) continue;
-    finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
-                         &rm->server_registered_method->requests);
+    finish_start_new_rpc(server, elem,
+                         &rm->server_registered_method->request_matcher);
     return;
   }
   /* check for a wildcard method definition (no host set) */
@@ -443,17 +452,12 @@ static void start_new_rpc(grpc_call_element *elem) {
     if (!rm) break;
    if (rm->host != NULL) continue;
     if (rm->method != calld->path) continue;
-    finish_start_new_rpc(server, elem, &rm->server_registered_method->pending,
-                         &rm->server_registered_method->requests);
+    finish_start_new_rpc(server, elem,
+                         &rm->server_registered_method->request_matcher);
     return;
   }
   }
-  finish_start_new_rpc(server, elem, &server->lists[PENDING_START],
-                       &server->requests);
-}
-
-static void kill_zombie(void *elem, int success) {
-  grpc_call_destroy(grpc_call_from_top_element(elem));
+  finish_start_new_rpc(server, elem, &server->unregistered_request_matcher);
 }
 
 static int num_listeners(grpc_server *server) {
@@ -481,7 +485,7 @@ static int num_channels(grpc_server *server) {
 
 static void maybe_finish_shutdown(grpc_server *server) {
   size_t i;
-  if (!server->shutdown || server->shutdown_published) {
+  if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
     return;
   }
@@ -526,7 +530,6 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
 static void server_on_recv(void *ptr, int success) {
   grpc_call_element *elem = ptr;
   call_data *calld = elem->call_data;
-  channel_data *chand = elem->channel_data;
 
   if (success && !calld->got_initial_metadata) {
     size_t i;
@@ -572,11 +575,8 @@ static void server_on_recv(void *ptr, int success) {
       } else if (calld->state == PENDING) {
         calld->state = ZOMBIED;
         gpr_mu_unlock(&calld->mu_state);
-        gpr_mu_lock(&chand->server->mu_call);
-        call_list_remove(calld, PENDING_START);
-        gpr_mu_unlock(&chand->server->mu_call);
-        grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
-        grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+        /* zombied call will be destroyed when it's removed from the pending
+           queue... later */
       } else {
         gpr_mu_unlock(&calld->mu_state);
       }
@@ -654,11 +654,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
   channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;
 
-  if (calld->state == PENDING) {
-    gpr_mu_lock(&chand->server->mu_call);
-    call_list_remove(elem->call_data, PENDING_START);
-    gpr_mu_unlock(&chand->server->mu_call);
-  }
+  GPR_ASSERT(calld->state != PENDING);
 
   if (calld->host) {
     GRPC_MDSTR_UNREF(calld->host);
@@ -765,6 +761,18 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
   server->root_channel_data.next = server->root_channel_data.prev =
       &server->root_channel_data;
 
+  /* TODO(ctiller): expose a channel_arg for this */
+  server->max_requested_calls = 32768;
+  server->request_freelist =
+      gpr_stack_lockfree_create(server->max_requested_calls);
+  for (i = 0; i < (size_t)server->max_requested_calls; i++) {
+    gpr_stack_lockfree_push(server->request_freelist, i);
+  }
+  request_matcher_init(&server->unregistered_request_matcher,
+                       server->max_requested_calls);
+  server->requested_calls = gpr_malloc(server->max_requested_calls *
+                                       sizeof(*server->requested_calls));
+
   /* Server filter stack is:
 
      server_surface_filter - for making surface API calls
@@ -812,6 +820,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
   }
   m = gpr_malloc(sizeof(registered_method));
   memset(m, 0, sizeof(*m));
+  request_matcher_init(&m->request_matcher, server->max_requested_calls);
   m->method = gpr_strdup(method);
   m->host = gpr_strdup(host);
   m->next = server->registered_methods;
@@ -927,13 +936,49 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
   grpc_transport_perform_op(transport, &op);
 }
 
+typedef struct {
+  requested_call **requests;
+  size_t count;
+  size_t capacity;
+} request_killer;
+
+static void request_killer_init(request_killer *rk) {
+  memset(rk, 0, sizeof(*rk));
+}
+
+static void request_killer_add(request_killer *rk, requested_call *rc) {
+  if (rk->capacity == rk->count) {
+    rk->capacity = GPR_MAX(8, rk->capacity * 2);
+    rk->requests =
+        gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests));
+  }
+  rk->requests[rk->count++] = rc;
+}
+
+static void request_killer_add_request_matcher(request_killer *rk,
+                                               grpc_server *server,
+                                               request_matcher *rm) {
+  int request_id;
+  while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
+    request_killer_add(rk, &server->requested_calls[request_id]);
+  }
+}
+
+static void request_killer_run(request_killer *rk, grpc_server *server) {
+  size_t i;
+  for (i = 0; i < rk->count; i++) {
+    fail_call(server, rk->requests[i]);
+  }
+  gpr_free(rk->requests);
+}
+
 void grpc_server_shutdown_and_notify(grpc_server *server,
                                      grpc_completion_queue *cq, void *tag) {
   listener *l;
-  requested_call *requests = NULL;
   registered_method *rm;
   shutdown_tag *sdt;
   channel_broadcaster broadcaster;
+  request_killer reqkill;
 
   /* lock, and gather up some stuff to do */
   gpr_mu_lock(&server->mu_global);
@@ -944,7 +989,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
   sdt = &server->shutdown_tags[server->num_shutdown_tags++];
   sdt->tag = tag;
   sdt->cq = cq;
-  if (server->shutdown) {
+  if (gpr_atm_acq_load(&server->shutdown_flag)) {
     gpr_mu_unlock(&server->mu_global);
     return;
   }
@@ -952,31 +997,26 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
   server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
 
   channel_broadcaster_init(server, &broadcaster);
+  request_killer_init(&reqkill);
 
   /* collect all unregistered then registered calls */
   gpr_mu_lock(&server->mu_call);
-  requests = server->requests;
-  server->requests = NULL;
+  request_killer_add_request_matcher(&reqkill, server,
+                                     &server->unregistered_request_matcher);
+  request_matcher_zombify_all_pending_calls(
+      &server->unregistered_request_matcher);
   for (rm = server->registered_methods; rm; rm = rm->next) {
-    while (rm->requests != NULL) {
-      requested_call *c = rm->requests;
-      rm->requests = c->next;
-      c->next = requests;
-      requests = c;
-    }
+    request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher);
+    request_matcher_zombify_all_pending_calls(&rm->request_matcher);
   }
   gpr_mu_unlock(&server->mu_call);
 
-  server->shutdown = 1;
+  gpr_atm_rel_store(&server->shutdown_flag, 1);
   maybe_finish_shutdown(server);
   gpr_mu_unlock(&server->mu_global);
 
   /* terminate all the requested calls */
-  while (requests != NULL) {
-    requested_call *next = requests->next;
-    fail_call(server, requests);
-    requests = next;
-  }
+  request_killer_run(&reqkill, server);
 
   /* Shutdown listeners */
   for (l = server->listeners; l; l = l->next) {
@@ -1008,7 +1048,7 @@ void grpc_server_destroy(grpc_server *server) {
   listener *l;
   gpr_mu_lock(&server->mu_global);
 
-  GPR_ASSERT(server->shutdown || !server->listeners);
+  GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
   GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
 
   while (server->listeners) {
@@ -1038,39 +1078,55 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
 
 static grpc_call_error queue_call_request(grpc_server *server,
                                           requested_call *rc) {
   call_data *calld = NULL;
-  requested_call **requests = NULL;
-  gpr_mu_lock(&server->mu_call);
-  if (server->shutdown) {
-    gpr_mu_unlock(&server->mu_call);
+  request_matcher *request_matcher = NULL;
+  int request_id;
+  if (gpr_atm_acq_load(&server->shutdown_flag)) {
+    fail_call(server, rc);
+    return GRPC_CALL_OK;
+  }
+  request_id = gpr_stack_lockfree_pop(server->request_freelist);
+  if (request_id == -1) {
+    /* out of request ids: just fail this one */
    fail_call(server, rc);
     return GRPC_CALL_OK;
   }
   switch (rc->type) {
     case BATCH_CALL:
-      calld =
-          call_list_remove_head(&server->lists[PENDING_START], PENDING_START);
-      requests = &server->requests;
+      request_matcher = &server->unregistered_request_matcher;
       break;
     case REGISTERED_CALL:
-      calld = call_list_remove_head(
-          &rc->data.registered.registered_method->pending, PENDING_START);
-      requests = &rc->data.registered.registered_method->requests;
+      request_matcher =
+          &rc->data.registered.registered_method->request_matcher;
       break;
   }
 
-  if (calld != NULL) {
-    gpr_mu_unlock(&server->mu_call);
-    gpr_mu_lock(&calld->mu_state);
-    GPR_ASSERT(calld->state == PENDING);
-    calld->state = ACTIVATED;
-    gpr_mu_unlock(&calld->mu_state);
-    begin_call(server, calld, rc);
-    return GRPC_CALL_OK;
-  } else {
-    rc->next = *requests;
-    *requests = rc;
+  server->requested_calls[request_id] = *rc;
+  gpr_free(rc);
+  if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) {
+    /* this was the first queued request: we need to lock and start
+       matching calls */
+    gpr_mu_lock(&server->mu_call);
+    while ((calld = request_matcher->pending_head) != NULL) {
+      request_id = gpr_stack_lockfree_pop(request_matcher->requests);
+      if (request_id == -1) break;
+      request_matcher->pending_head = calld->pending_next;
+      gpr_mu_unlock(&server->mu_call);
+      gpr_mu_lock(&calld->mu_state);
+      if (calld->state == ZOMBIED) {
+        gpr_mu_unlock(&calld->mu_state);
+        grpc_iomgr_closure_init(
+            &calld->kill_zombie_closure, kill_zombie,
+            grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
+        grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+      } else {
+        GPR_ASSERT(calld->state == PENDING);
+        calld->state = ACTIVATED;
+        gpr_mu_unlock(&calld->mu_state);
+        begin_call(server, calld, &server->requested_calls[request_id]);
+      }
+      gpr_mu_lock(&server->mu_call);
+    }
     gpr_mu_unlock(&server->mu_call);
-    return GRPC_CALL_OK;
   }
+  return GRPC_CALL_OK;
 }
 
 grpc_call_error grpc_server_request_call(
@@ -1088,6 +1144,7 @@ grpc_call_error grpc_server_request_call(
   }
   grpc_cq_begin_op(cq_for_notification);
   rc->type = BATCH_CALL;
+  rc->server = server;
   rc->tag = tag;
   rc->cq_bound_to_call = cq_bound_to_call;
   rc->cq_for_notification = cq_for_notification;
@@ -1110,6 +1167,7 @@ grpc_call_error grpc_server_request_registered_call(
   }
   grpc_cq_begin_op(cq_for_notification);
   rc->type = REGISTERED_CALL;
+  rc->server = server;
   rc->tag = tag;
   rc->cq_bound_to_call = cq_bound_to_call;
   rc->cq_for_notification = cq_for_notification;
@@ -1189,7 +1247,16 @@ static void begin_call(grpc_server *server, call_data *calld,
 }
 
 static void done_request_event(void *req, grpc_cq_completion *c) {
-  gpr_free(req);
+  requested_call *rc = req;
+  grpc_server *server = rc->server;
+
+  if (rc >= server->requested_calls &&
+      rc < server->requested_calls + server->max_requested_calls) {
+    gpr_stack_lockfree_push(server->request_freelist,
+                            rc - server->requested_calls);
+  } else {
+    gpr_free(req);
+  }
 }
 
 static void fail_call(grpc_server *server, requested_call *rc) {