From 729b35a59e46bba034fcf598c01957ce4ee8c27f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Jul 2015 12:36:47 -0700 Subject: Split request matching into a single datastructure --- src/core/surface/server.c | 255 ++++++++++++++++++++++------------------------ 1 file changed, 123 insertions(+), 132 deletions(-) (limited to 'src') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 4dc51bf031..6f63f999bf 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -51,8 +51,6 @@ #include #include -typedef enum { PENDING_START, CALL_LIST_COUNT } call_list; - typedef struct listener { void *arg; void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, @@ -94,14 +92,6 @@ typedef struct requested_call { } data; } requested_call; -struct registered_method { - char *method; - char *host; - call_data *pending; - requested_call *requests; - registered_method *next; -}; - typedef struct channel_registered_method { registered_method *server_registered_method; grpc_mdstr *method; @@ -130,44 +120,6 @@ typedef struct shutdown_tag { grpc_cq_completion completion; } shutdown_tag; -struct grpc_server { - size_t channel_filter_count; - const grpc_channel_filter **channel_filters; - grpc_channel_args *channel_args; - - grpc_completion_queue **cqs; - grpc_pollset **pollsets; - size_t cq_count; - - /* The two following mutexes control access to server-state - mu_global controls access to non-call-related state (e.g., channel state) - mu_call controls access to call-related state (e.g., the call lists) - - If they are ever required to be nested, you must lock mu_global - before mu_call. This is currently used in shutdown processing - (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */ - gpr_mu mu_global; /* mutex for server and channel state */ - gpr_mu mu_call; /* mutex for call-specific state */ - - registered_method *registered_methods; - requested_call *requests; - - gpr_uint8 shutdown; - gpr_uint8 shutdown_published; - size_t num_shutdown_tags; - shutdown_tag *shutdown_tags; - - call_data *lists[CALL_LIST_COUNT]; - channel_data root_channel_data; - - listener *listeners; - int listeners_destroyed; - gpr_refcount internal_refcount; - - /** when did we print the last shutdown progress message */ - gpr_timespec last_shutdown_message_time; -}; - typedef enum { /* waiting for metadata */ NOT_STARTED, @@ -179,6 +131,8 @@ typedef enum { ZOMBIED } call_state; +typedef struct request_matcher request_matcher; + struct call_data { grpc_call *call; @@ -201,8 +155,20 @@ struct call_data { grpc_iomgr_closure server_on_recv; grpc_iomgr_closure kill_zombie_closure; - call_data **root[CALL_LIST_COUNT]; - call_link links[CALL_LIST_COUNT]; + call_data *pending_next; +}; + +struct request_matcher { + call_data *pending_head; + call_data *pending_tail; + requested_call *requests; +}; + +struct registered_method { + char *method; + char *host; + request_matcher request_matcher; + registered_method *next; }; typedef struct { @@ -210,6 +176,43 @@ typedef struct { size_t num_channels; } channel_broadcaster; +struct grpc_server { + size_t channel_filter_count; + const grpc_channel_filter **channel_filters; + grpc_channel_args *channel_args; + + grpc_completion_queue **cqs; + grpc_pollset **pollsets; + size_t cq_count; + + /* The two following mutexes control access to server-state + mu_global controls access to non-call-related state (e.g., channel state) + mu_call controls access to call-related state (e.g., the call lists) + + If they are ever required to be nested, you must lock mu_global + before mu_call. This is currently used in shutdown processing + (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */ + gpr_mu mu_global; /* mutex for server and channel state */ + gpr_mu mu_call; /* mutex for call-specific state */ + + registered_method *registered_methods; + request_matcher unregistered_request_matcher; + + gpr_uint8 shutdown; + gpr_uint8 shutdown_published; + size_t num_shutdown_tags; + shutdown_tag *shutdown_tags; + + channel_data root_channel_data; + + listener *listeners; + int listeners_destroyed; + gpr_refcount internal_refcount; + + /** when did we print the last shutdown progress message */ + gpr_timespec last_shutdown_message_time; +}; + #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) @@ -220,7 +223,9 @@ static void fail_call(grpc_server *server, requested_call *rc); hold mu_call */ static void maybe_finish_shutdown(grpc_server *server); -/* channel broadcaster */ +/* + * channel broadcaster + */ /* assumes server locked */ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) { @@ -281,55 +286,35 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, gpr_free(cb->channels); } -/* call list */ +/* + * request_matcher + */ -static int call_list_join(call_data **root, call_data *call, call_list list) { - GPR_ASSERT(!call->root[list]); - call->root[list] = root; - if (!*root) { - *root = call; - call->links[list].next = call->links[list].prev = call; - } else { - call->links[list].next = *root; - call->links[list].prev = (*root)->links[list].prev; - call->links[list].next->links[list].prev = - call->links[list].prev->links[list].next = call; - } - return 1; +static void request_matcher_init(request_matcher *request_matcher) { + memset(request_matcher, 0, sizeof(*request_matcher)); } -static call_data *call_list_remove_head(call_data **root, call_list list) { - call_data *out = *root; - if (out) { - out->root[list] = NULL; - if (out->links[list].next == out) { - *root = NULL; - } else { - *root = out->links[list].next; - out->links[list].next->links[list].prev = out->links[list].prev; - out->links[list].prev->links[list].next = out->links[list].next; - } - } - return out; +static void kill_zombie(void *elem, int success) { + grpc_call_destroy(grpc_call_from_top_element(elem)); } -static int call_list_remove(call_data *call, call_list list) { - call_data **root = call->root[list]; - if (root == NULL) return 0; - call->root[list] = NULL; - if (*root == call) { - *root = call->links[list].next; - if (*root == call) { - *root = NULL; - return 1; - } +static void request_matcher_zombify_all_pending_calls( + request_matcher *request_matcher) { + while (request_matcher->pending_head) { + call_data *calld = request_matcher->pending_head; + request_matcher->pending_head = calld->pending_next; + calld->state = ZOMBIED; + grpc_iomgr_closure_init( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); } - GPR_ASSERT(*root != call); - call->links[list].next->links[list].prev = call->links[list].prev; - call->links[list].prev->links[list].next = call->links[list].next; - return 1; } +/* + * server proper + */ + static void server_ref(grpc_server *server) { gpr_ref(&server->internal_refcount); } @@ -391,20 +376,25 @@ static void destroy_channel(channel_data *chand) { } static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, - call_data **pending_root, - requested_call **requests) { + request_matcher *request_matcher) { requested_call *rc; call_data *calld = elem->call_data; gpr_mu_lock(&server->mu_call); - rc = *requests; + rc = request_matcher->requests; if (rc == NULL) { gpr_mu_lock(&calld->mu_state); calld->state = PENDING; gpr_mu_unlock(&calld->mu_state); - call_list_join(pending_root, calld, PENDING_START); + if (request_matcher->pending_head == NULL) { + request_matcher->pending_tail = request_matcher->pending_head = calld; + } else { + request_matcher->pending_tail->pending_next = calld; + request_matcher->pending_tail = calld; + } + calld->pending_next = NULL; gpr_mu_unlock(&server->mu_call); } else { - *requests = rc->next; + request_matcher->requests = rc->next; gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); @@ -431,8 +421,8 @@ static void start_new_rpc(grpc_call_element *elem) { if (!rm) break; if (rm->host != calld->host) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, - &rm->server_registered_method->requests); + finish_start_new_rpc(server, elem, + &rm->server_registered_method->request_matcher); return; } /* check for a wildcard method definition (no host set) */ @@ -443,17 +433,12 @@ static void start_new_rpc(grpc_call_element *elem) { if (!rm) break; if (rm->host != NULL) continue; if (rm->method != calld->path) continue; - finish_start_new_rpc(server, elem, &rm->server_registered_method->pending, - &rm->server_registered_method->requests); + finish_start_new_rpc(server, elem, + &rm->server_registered_method->request_matcher); return; } } - finish_start_new_rpc(server, elem, &server->lists[PENDING_START], - &server->requests); -} - -static void kill_zombie(void *elem, int success) { - grpc_call_destroy(grpc_call_from_top_element(elem)); + finish_start_new_rpc(server, elem, &server->unregistered_request_matcher); } static int num_listeners(grpc_server *server) { @@ -526,7 +511,6 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { static void server_on_recv(void *ptr, int success) { grpc_call_element *elem = ptr; call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; if (success && !calld->got_initial_metadata) { size_t i; @@ -571,11 +555,8 @@ static void server_on_recv(void *ptr, int success) { } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - gpr_mu_lock(&chand->server->mu_call); - call_list_remove(calld, PENDING_START); - gpr_mu_unlock(&chand->server->mu_call); - grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); + /* zombied call will be destroyed when it's removed from the pending + queue... later */ } else { gpr_mu_unlock(&calld->mu_state); } @@ -653,11 +634,7 @@ static void destroy_call_elem(grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (calld->state == PENDING) { - gpr_mu_lock(&chand->server->mu_call); - call_list_remove(elem->call_data, PENDING_START); - gpr_mu_unlock(&chand->server->mu_call); - } + GPR_ASSERT(calld->state != PENDING); if (calld->host) { GRPC_MDSTR_UNREF(calld->host); @@ -764,6 +741,8 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; + request_matcher_init(&server->unregistered_request_matcher); + /* Server filter stack is: server_surface_filter - for making surface API calls @@ -811,6 +790,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method, } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); + request_matcher_init(&m->request_matcher); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -954,15 +934,18 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - requests = server->requests; - server->requests = NULL; + requests = server->unregistered_request_matcher.requests; + server->unregistered_request_matcher.requests = NULL; + request_matcher_zombify_all_pending_calls( + &server->unregistered_request_matcher); for (rm = server->registered_methods; rm; rm = rm->next) { - while (rm->requests != NULL) { - requested_call *c = rm->requests; - rm->requests = c->next; + while (rm->request_matcher.requests != NULL) { + requested_call *c = rm->request_matcher.requests; + rm->request_matcher.requests = c->next; c->next = requests; requests = c; } + request_matcher_zombify_all_pending_calls(&rm->request_matcher); } gpr_mu_unlock(&server->mu_call); @@ -1037,7 +1020,7 @@ void grpc_server_add_listener(grpc_server *server, void *arg, static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { call_data *calld = NULL; - requested_call **requests = NULL; + request_matcher *request_matcher = NULL; gpr_mu_lock(&server->mu_call); if (server->shutdown) { gpr_mu_unlock(&server->mu_call); @@ -1046,27 +1029,35 @@ static grpc_call_error queue_call_request(grpc_server *server, } switch (rc->type) { case BATCH_CALL: - calld = - call_list_remove_head(&server->lists[PENDING_START], PENDING_START); - requests = &server->requests; + request_matcher = &server->unregistered_request_matcher; break; case REGISTERED_CALL: - calld = call_list_remove_head( - &rc->data.registered.registered_method->pending, PENDING_START); - requests = &rc->data.registered.registered_method->requests; + request_matcher = &rc->data.registered.registered_method->request_matcher; break; } + if (request_matcher->pending_head != NULL) { + calld = request_matcher->pending_head; + request_matcher->pending_head = calld->pending_next; + } if (calld != NULL) { gpr_mu_unlock(&server->mu_call); gpr_mu_lock(&calld->mu_state); + if (calld->state == ZOMBIED) { + gpr_mu_unlock(&calld->mu_state); + grpc_iomgr_closure_init( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); + return queue_call_request(server, rc); /* retry */ + } GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); begin_call(server, calld, rc); return GRPC_CALL_OK; } else { - rc->next = *requests; - *requests = rc; + rc->next = request_matcher->requests; + request_matcher->requests = rc; gpr_mu_unlock(&server->mu_call); return GRPC_CALL_OK; } -- cgit v1.2.3 From eef4c2570d7bc8dbe314403a68bff2500722c07d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Jul 2015 16:25:21 -0700 Subject: Add a detectable edge for the first insert (so far untested) --- src/core/support/stack_lockfree.c | 3 ++- src/core/support/stack_lockfree.h | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c index 83a68444f5..598aa9cdf1 100644 --- a/src/core/support/stack_lockfree.c +++ b/src/core/support/stack_lockfree.c @@ -95,7 +95,7 @@ void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) { gpr_free(stack); } -void gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { +int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { lockfree_node head; lockfree_node newhead; @@ -112,6 +112,7 @@ void gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); /* Use rel_cas above to make sure that entry index is set properly */ + return head.atm == INVALID_ENTRY_INDEX; } int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { diff --git a/src/core/support/stack_lockfree.h b/src/core/support/stack_lockfree.h index 7919ef38cc..4da3572deb 100644 --- a/src/core/support/stack_lockfree.h +++ b/src/core/support/stack_lockfree.h @@ -42,7 +42,8 @@ gpr_stack_lockfree *gpr_stack_lockfree_create(int entries); void gpr_stack_lockfree_destroy(gpr_stack_lockfree *); /* Pass in a valid entry number for the next stack entry */ -void gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry); +/* Returns 1 if this is the first element on the stack, 0 otherwise */ +int gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry); /* Returns -1 on empty or the actual entry number */ int gpr_stack_lockfree_pop(gpr_stack_lockfree *); -- cgit v1.2.3 From 6a006cee8a86a754174939b967eb7c946d303eb0 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Jul 2015 16:25:40 -0700 Subject: First pass of lockless delivery of new calls --- src/core/surface/server.c | 198 +++++++++++++++++++++++++++++++--------------- 1 file changed, 136 insertions(+), 62 deletions(-) (limited to 'src') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 6f63f999bf..9eb501decb 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -36,20 +36,22 @@ #include #include +#include +#include +#include +#include + #include "src/core/channel/census_filter.h" #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/support/stack_lockfree.h" #include "src/core/support/string.h" #include "src/core/surface/call.h" #include "src/core/surface/channel.h" #include "src/core/surface/completion_queue.h" #include "src/core/surface/init.h" #include "src/core/transport/metadata.h" -#include -#include -#include -#include typedef struct listener { void *arg; @@ -72,8 +74,8 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { requested_call_type type; - struct requested_call *next; void *tag; + grpc_server *server; grpc_completion_queue *cq_bound_to_call; grpc_completion_queue *cq_for_notification; grpc_call **call; @@ -161,7 +163,7 @@ struct call_data { struct request_matcher { call_data *pending_head; call_data *pending_tail; - requested_call *requests; + gpr_stack_lockfree *requests; }; struct registered_method { @@ -197,8 +199,13 @@ struct grpc_server { registered_method *registered_methods; request_matcher unregistered_request_matcher; + /** free list of available requested_calls indices */ + gpr_stack_lockfree *request_freelist; + /** requested call backing data */ + requested_call *requested_calls; + int max_requested_calls; - gpr_uint8 shutdown; + gpr_atm shutdown_flag; gpr_uint8 shutdown_published; size_t num_shutdown_tags; shutdown_tag *shutdown_tags; @@ -290,8 +297,15 @@ static void channel_broadcaster_shutdown(channel_broadcaster *cb, * request_matcher */ -static void request_matcher_init(request_matcher *request_matcher) { +static void request_matcher_init(request_matcher *request_matcher, + int entries) { memset(request_matcher, 0, sizeof(*request_matcher)); + request_matcher->requests = gpr_stack_lockfree_create(entries); +} + +static void request_matcher_destroy(request_matcher *request_matcher) { + GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1); + gpr_stack_lockfree_destroy(request_matcher->requests); } static void kill_zombie(void *elem, int success) { @@ -328,6 +342,7 @@ static void server_delete(grpc_server *server) { gpr_free(server->channel_filters); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; + request_matcher_destroy(&rm->request_matcher); gpr_free(rm->method); gpr_free(rm->host); gpr_free(rm); @@ -335,9 +350,12 @@ static void server_delete(grpc_server *server) { for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); } + request_matcher_destroy(&server->unregistered_request_matcher); + gpr_stack_lockfree_destroy(server->request_freelist); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); + gpr_free(server->requested_calls); gpr_free(server); } @@ -377,11 +395,12 @@ static void destroy_channel(channel_data *chand) { static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, request_matcher *request_matcher) { - requested_call *rc; call_data *calld = elem->call_data; - gpr_mu_lock(&server->mu_call); - rc = request_matcher->requests; - if (rc == NULL) { + int request_id; + + request_id = gpr_stack_lockfree_pop(request_matcher->requests); + if (request_id == -1) { + gpr_mu_lock(&server->mu_call); gpr_mu_lock(&calld->mu_state); calld->state = PENDING; gpr_mu_unlock(&calld->mu_state); @@ -394,12 +413,10 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem, calld->pending_next = NULL; gpr_mu_unlock(&server->mu_call); } else { - request_matcher->requests = rc->next; gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - gpr_mu_unlock(&server->mu_call); - begin_call(server, calld, rc); + begin_call(server, calld, &server->requested_calls[request_id]); } } @@ -466,7 +483,7 @@ static int num_channels(grpc_server *server) { static void maybe_finish_shutdown(grpc_server *server) { size_t i; - if (!server->shutdown || server->shutdown_published) { + if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) { return; } @@ -741,7 +758,17 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters, server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; - request_matcher_init(&server->unregistered_request_matcher); + /* TODO(ctiller): expose a channel_arg for this */ + server->max_requested_calls = 32768; + server->request_freelist = + gpr_stack_lockfree_create(server->max_requested_calls); + for (i = 0; i < (size_t)server->max_requested_calls; i++) { + gpr_stack_lockfree_push(server->request_freelist, i); + } + request_matcher_init(&server->unregistered_request_matcher, + server->max_requested_calls); + server->requested_calls = gpr_malloc(server->max_requested_calls * + sizeof(*server->requested_calls)); /* Server filter stack is: @@ -790,7 +817,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method, } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); - request_matcher_init(&m->request_matcher); + request_matcher_init(&m->request_matcher, server->max_requested_calls); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -906,13 +933,49 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_transport_perform_op(transport, &op); } +typedef struct { + requested_call **requests; + size_t count; + size_t capacity; +} request_killer; + +static void request_killer_init(request_killer *rk) { + memset(rk, 0, sizeof(*rk)); +} + +static void request_killer_add(request_killer *rk, requested_call *rc) { + if (rk->capacity == rk->count) { + rk->capacity = GPR_MAX(8, rk->capacity * 2); + rk->requests = + gpr_realloc(rk->requests, rk->capacity * sizeof(*rk->requests)); + } + rk->requests[rk->count++] = rc; +} + +static void request_killer_add_request_matcher(request_killer *rk, + grpc_server *server, + request_matcher *rm) { + int request_id; + while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { + request_killer_add(rk, &server->requested_calls[request_id]); + } +} + +static void request_killer_run(request_killer *rk, grpc_server *server) { + size_t i; + for (i = 0; i < rk->count; i++) { + fail_call(server, rk->requests[i]); + } + gpr_free(rk->requests); +} + void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; - requested_call *requests = NULL; registered_method *rm; shutdown_tag *sdt; channel_broadcaster broadcaster; + request_killer reqkill; /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); @@ -923,7 +986,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, sdt = &server->shutdown_tags[server->num_shutdown_tags++]; sdt->tag = tag; sdt->cq = cq; - if (server->shutdown) { + if (gpr_atm_acq_load(&server->shutdown_flag)) { gpr_mu_unlock(&server->mu_global); return; } @@ -931,34 +994,26 @@ void grpc_server_shutdown_and_notify(grpc_server *server, server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME); channel_broadcaster_init(server, &broadcaster); + request_killer_init(&reqkill); /* collect all unregistered then registered calls */ gpr_mu_lock(&server->mu_call); - requests = server->unregistered_request_matcher.requests; - server->unregistered_request_matcher.requests = NULL; + request_killer_add_request_matcher(&reqkill, server, + &server->unregistered_request_matcher); request_matcher_zombify_all_pending_calls( &server->unregistered_request_matcher); for (rm = server->registered_methods; rm; rm = rm->next) { - while (rm->request_matcher.requests != NULL) { - requested_call *c = rm->request_matcher.requests; - rm->request_matcher.requests = c->next; - c->next = requests; - requests = c; - } + request_killer_add_request_matcher(&reqkill, server, &rm->request_matcher); request_matcher_zombify_all_pending_calls(&rm->request_matcher); } gpr_mu_unlock(&server->mu_call); - server->shutdown = 1; + gpr_atm_rel_store(&server->shutdown_flag, 1); maybe_finish_shutdown(server); gpr_mu_unlock(&server->mu_global); /* terminate all the requested calls */ - while (requests != NULL) { - requested_call *next = requests->next; - fail_call(server, requests); - requests = next; - } + request_killer_run(&reqkill, server); /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { @@ -990,7 +1045,7 @@ void grpc_server_destroy(grpc_server *server) { listener *l; gpr_mu_lock(&server->mu_global); - GPR_ASSERT(server->shutdown || !server->listeners); + GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners); GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); while (server->listeners) { @@ -1021,9 +1076,14 @@ static grpc_call_error queue_call_request(grpc_server *server, requested_call *rc) { call_data *calld = NULL; request_matcher *request_matcher = NULL; - gpr_mu_lock(&server->mu_call); - if (server->shutdown) { - gpr_mu_unlock(&server->mu_call); + int request_id; + if (gpr_atm_acq_load(&server->shutdown_flag)) { + fail_call(server, rc); + return GRPC_CALL_OK; + } + request_id = gpr_stack_lockfree_pop(server->request_freelist); + if (request_id == -1) { + /* out of request ids: just fail this one */ fail_call(server, rc); return GRPC_CALL_OK; } @@ -1035,32 +1095,35 @@ static grpc_call_error queue_call_request(grpc_server *server, request_matcher = &rc->data.registered.registered_method->request_matcher; break; } - if (request_matcher->pending_head != NULL) { - calld = request_matcher->pending_head; - request_matcher->pending_head = calld->pending_next; - } - if (calld != NULL) { - gpr_mu_unlock(&server->mu_call); - gpr_mu_lock(&calld->mu_state); - if (calld->state == ZOMBIED) { - gpr_mu_unlock(&calld->mu_state); - grpc_iomgr_closure_init( - &calld->kill_zombie_closure, kill_zombie, - grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_iomgr_add_callback(&calld->kill_zombie_closure); - return queue_call_request(server, rc); /* retry */ + server->requested_calls[request_id] = *rc; + gpr_free(rc); + if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) { + /* this was the first queued request: we need to lock and start + matching calls */ + gpr_mu_lock(&server->mu_call); + while ((calld = request_matcher->pending_head) != NULL) { + request_id = gpr_stack_lockfree_pop(request_matcher->requests); + if (request_id == -1) break; + request_matcher->pending_head = calld->pending_next; + gpr_mu_unlock(&server->mu_call); + gpr_mu_lock(&calld->mu_state); + if (calld->state == ZOMBIED) { + gpr_mu_unlock(&calld->mu_state); + grpc_iomgr_closure_init( + &calld->kill_zombie_closure, kill_zombie, + grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); + grpc_iomgr_add_callback(&calld->kill_zombie_closure); + } else { + GPR_ASSERT(calld->state == PENDING); + calld->state = ACTIVATED; + gpr_mu_unlock(&calld->mu_state); + begin_call(server, calld, rc); + } + gpr_mu_lock(&server->mu_call); } - GPR_ASSERT(calld->state == PENDING); - calld->state = ACTIVATED; - gpr_mu_unlock(&calld->mu_state); - begin_call(server, calld, rc); - return GRPC_CALL_OK; - } else { - rc->next = request_matcher->requests; - request_matcher->requests = rc; gpr_mu_unlock(&server->mu_call); - return GRPC_CALL_OK; } + return GRPC_CALL_OK; } grpc_call_error grpc_server_request_call( @@ -1078,6 +1141,7 @@ grpc_call_error grpc_server_request_call( } grpc_cq_begin_op(cq_for_notification); rc->type = BATCH_CALL; + rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; rc->cq_for_notification = cq_for_notification; @@ -1100,6 +1164,7 @@ grpc_call_error grpc_server_request_registered_call( } grpc_cq_begin_op(cq_for_notification); rc->type = REGISTERED_CALL; + rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; rc->cq_for_notification = cq_for_notification; @@ -1179,7 +1244,16 @@ static void begin_call(grpc_server *server, call_data *calld, } static void done_request_event(void *req, grpc_cq_completion *c) { - gpr_free(req); + requested_call *rc = req; + grpc_server *server = rc->server; + + if (rc >= server->requested_calls && + rc < server->requested_calls + server->max_requested_calls) { + gpr_stack_lockfree_push(server->request_freelist, + rc - server->requested_calls); + } else { + gpr_free(req); + } } static void fail_call(grpc_server *server, requested_call *rc) { -- cgit v1.2.3 From ffe27b98b90ea9f196b89153959c587561ad4ce2 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 13 Jul 2015 16:37:15 -0700 Subject: Implement test for stack initial push edge --- src/core/support/stack_lockfree.c | 2 +- test/core/support/stack_lockfree_test.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/support/stack_lockfree.c b/src/core/support/stack_lockfree.c index 598aa9cdf1..0093b33a2e 100644 --- a/src/core/support/stack_lockfree.c +++ b/src/core/support/stack_lockfree.c @@ -112,7 +112,7 @@ int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); /* Use rel_cas above to make sure that entry index is set properly */ - return head.atm == INVALID_ENTRY_INDEX; + return head.contents.index == INVALID_ENTRY_INDEX; } int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { diff --git a/test/core/support/stack_lockfree_test.c b/test/core/support/stack_lockfree_test.c index ebee04d5b8..6141f9811d 100644 --- a/test/core/support/stack_lockfree_test.c +++ b/test/core/support/stack_lockfree_test.c @@ -62,7 +62,7 @@ static void test_serial_sized(int size) { for (i=1; i Date: Tue, 14 Jul 2015 07:12:19 -0700 Subject: Initial fixes for lock free request path --- src/core/surface/server.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 9eb501decb..84c25c3cfd 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -317,7 +317,9 @@ static void request_matcher_zombify_all_pending_calls( while (request_matcher->pending_head) { call_data *calld = request_matcher->pending_head; request_matcher->pending_head = calld->pending_next; + gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; + gpr_mu_unlock(&calld->mu_state); grpc_iomgr_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); @@ -1117,7 +1119,7 @@ static grpc_call_error queue_call_request(grpc_server *server, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - begin_call(server, calld, rc); + begin_call(server, calld, &server->requested_calls[request_id]); } gpr_mu_lock(&server->mu_call); } -- cgit v1.2.3 From 5c785d4c3982e6c2dd3bacab11e01405ddfbab8f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 14 Jul 2015 08:23:43 -0700 Subject: Hoist epoll_ctl outside of pollset lock --- src/core/iomgr/pollset_multipoller_with_epoll.c | 17 +++++++++++--- .../iomgr/pollset_multipoller_with_poll_posix.c | 12 ++++++++-- src/core/iomgr/pollset_posix.c | 27 ++++++++++++++-------- src/core/iomgr/pollset_posix.h | 6 +++-- 4 files changed, 46 insertions(+), 16 deletions(-) (limited to 'src') diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 3746c8edaf..d697b59e4c 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -50,12 +50,17 @@ typedef struct { } pollset_hdr; static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; grpc_fd_watcher watcher; + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } + /* We pretend to be polling whilst adding an fd to keep the fd from being closed during the add. This may result in a spurious wakeup being assigned to this pollset whilst adding, but that should be benign. */ @@ -76,9 +81,15 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, } static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { pollset_hdr *h = pollset->data.ptr; int err; + + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } + /* Note that this can race with concurrent poll, but that should be fine since * at worst it creates a spurious read event on a reused grpc_fd object. */ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL); @@ -183,7 +194,7 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, abort(); } for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd(pollset, fds[i]); + multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0); } grpc_wakeup_fd_create(&h->wakeup_fd); diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 7b717bd159..7377a04817 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -66,7 +66,8 @@ typedef struct { } pollset_hdr; static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { size_t i; pollset_hdr *h = pollset->data.ptr; /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ @@ -79,10 +80,14 @@ static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, } h->fds[h->fd_count++] = fd; GRPC_FD_REF(fd, "multipoller"); + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, - grpc_fd *fd) { + grpc_fd *fd, + int and_unlock_pollset) { /* will get removed next poll cycle */ pollset_hdr *h = pollset->data.ptr; if (h->del_count == h->del_capacity) { @@ -91,6 +96,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, } h->dels[h->del_count++] = fd; GRPC_FD_REF(fd, "multipoller_del"); + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } static void end_polling(grpc_pollset *pollset) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 85101764d2..e8189c28ad 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -105,14 +105,12 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(pollset, fd); - gpr_mu_unlock(&pollset->mu); + pollset->vtable->add_fd(pollset, fd, 1); } void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); - pollset->vtable->del_fd(pollset, fd); - gpr_mu_unlock(&pollset->mu); + pollset->vtable->del_fd(pollset, fd, 1); } static void finish_shutdown(grpc_pollset *pollset) { @@ -257,7 +255,7 @@ static void basic_do_promote(void *args, int success) { } else if (grpc_fd_is_orphaned(fd)) { /* Don't try to add it to anything, we'll drop our ref on it below */ } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(pollset, fd); + pollset->vtable->add_fd(pollset, fd, 0); } else if (fd != pollset->data.ptr) { grpc_fd *fds[2]; fds[0] = pollset->data.ptr; @@ -287,10 +285,11 @@ static void basic_do_promote(void *args, int success) { GRPC_FD_UNREF(fd, "basicpoll_add"); } -static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, + int and_unlock_pollset) { grpc_unary_promote_args *up_args; GPR_ASSERT(fd); - if (fd == pollset->data.ptr) return; + if (fd == pollset->data.ptr) goto exit; if (!pollset->counter) { /* Fast path -- no in flight cbs */ @@ -313,7 +312,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { pollset->data.ptr = fd; GRPC_FD_REF(fd, "basicpoll"); } - return; + goto exit; } /* Now we need to promote. This needs to happen when we're not polling. Since @@ -329,14 +328,24 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { grpc_iomgr_add_callback(&up_args->promotion_closure); grpc_pollset_kick(pollset); + +exit: + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } -static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, + int and_unlock_pollset) { GPR_ASSERT(fd); if (fd == pollset->data.ptr) { GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); pollset->data.ptr = NULL; } + + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + } } static void basic_pollset_maybe_work(grpc_pollset *pollset, diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 53585a2886..37de1276d1 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -66,8 +66,10 @@ typedef struct grpc_pollset { } grpc_pollset; struct grpc_pollset_vtable { - void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd); - void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd); + void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd, + int and_unlock_pollset); + void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd, + int and_unlock_pollset); void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, int allow_synchronous_callback); void (*kick)(grpc_pollset *pollset); -- cgit v1.2.3 From b0c13ad7698e577298f199007cc2d7a0a3049d1c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Jul 2015 08:42:31 -0700 Subject: Fix a leaked lock, and make a debug-only detection for this class of bug --- src/core/iomgr/pollset_multipoller_with_poll_posix.c | 3 ++- src/core/iomgr/pollset_posix.c | 10 ++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 7377a04817..0084e83953 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -72,7 +72,7 @@ static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, pollset_hdr *h = pollset->data.ptr; /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ for (i = 0; i < h->fd_count; i++) { - if (h->fds[i] == fd) return; + if (h->fds[i] == fd) goto exit; } if (h->fd_count == h->fd_capacity) { h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); @@ -80,6 +80,7 @@ static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, } h->fds[h->fd_count++] = fd; GRPC_FD_REF(fd, "multipoller"); +exit: if (and_unlock_pollset) { gpr_mu_unlock(&pollset->mu); } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index e8189c28ad..3c35fcb89f 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -106,11 +106,21 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->add_fd(pollset, fd, 1); +#ifndef NDEBUG + /* this will deadlock if the unlocking rules aren't correctly implemented */ + gpr_mu_lock(&pollset->mu); + gpr_mu_unlock(&pollset->mu); +#endif } void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->del_fd(pollset, fd, 1); +#ifndef NDEBUG + /* this will deadlock if the unlocking rules aren't correctly implemented */ + gpr_mu_lock(&pollset->mu); + gpr_mu_unlock(&pollset->mu); +#endif } static void finish_shutdown(grpc_pollset *pollset) { -- cgit v1.2.3 From abfaf2a5363aefdda8f5b311d1e46833aca06e53 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 16 Jul 2015 17:23:40 -0700 Subject: Expand comment --- src/core/iomgr/pollset_posix.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 3c35fcb89f..9b4a2006a1 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -106,8 +106,11 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->add_fd(pollset, fd, 1); + /* the following (enabled only in debug) will reacquire and then releast + our lock - meaning that if the unlocking flag passed to del_fd above is + not respected, the code will deadlock (in a way that we have a chance of + debugging) */ #ifndef NDEBUG - /* this will deadlock if the unlocking rules aren't correctly implemented */ gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu); #endif @@ -116,8 +119,11 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->del_fd(pollset, fd, 1); -#ifndef NDEBUG - /* this will deadlock if the unlocking rules aren't correctly implemented */ + /* the following (enabled only in debug) will reacquire and then releast + our lock - meaning that if the unlocking flag passed to del_fd above is + not respected, the code will deadlock (in a way that we have a chance of + debugging) */ +#ifndef NDEBUG gpr_mu_lock(&pollset->mu); gpr_mu_unlock(&pollset->mu); #endif -- cgit v1.2.3 From 232e04b6199090b5805b3e7148fcfce3b8ef9956 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 17 Jul 2015 14:36:30 -0700 Subject: Spelling correction --- src/core/iomgr/pollset_posix.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 9b4a2006a1..f55f8d7d09 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -106,7 +106,7 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->add_fd(pollset, fd, 1); - /* the following (enabled only in debug) will reacquire and then releast + /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of debugging) */ @@ -119,7 +119,7 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { gpr_mu_lock(&pollset->mu); pollset->vtable->del_fd(pollset, fd, 1); - /* the following (enabled only in debug) will reacquire and then releast + /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of debugging) */ -- cgit v1.2.3