author     Craig Tiller <ctiller@google.com>   2017-05-16 13:41:04 -0700
committer  GitHub <noreply@github.com>         2017-05-16 13:41:04 -0700
commit     bcb65f0bd533ff0c7babf3b06a424d2bd5f1d37d (patch)
tree       a1981594da42832700e23bca6c0c74e743d3df64 /src
parent     82713635e15011beea6b592887f03c512969c7e8 (diff)
parent     3cf000fb2fa20ea6d58c4c02ea5f31dd86dacba9 (diff)
Merge pull request #10897 from ctiller/serve_fries
Switch default CQ count for thread manager to num_cpus
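The diff below replaces the server's fixed-size lock-free stack of request ids (previously capped at 32768 requested calls per completion queue) with an intrusive multi-producer single-consumer (MPSC) queue: each requested_call now begins with a gpr_mpscq_node link, so a call queues itself and no preallocated backing array or freelist is needed. A minimal sketch (illustrative names, not gRPC code) of the intrusive-link pattern and why the diff notes the link "must be first":

/* Because the link sits at offset 0 of the containing struct, a pointer to
   the link is also a pointer to the whole struct, so the consumer can
   recover the request with a plain cast after popping the node. */
#include <stddef.h>
#include <stdio.h>

typedef struct node {
  struct node *next;
} node;

typedef struct request {
  node link; /* must be first: offset 0 makes the cast below well-defined */
  int id;
} request;

int main(void) {
  request r = {{NULL}, 42};
  node *n = &r.link;            /* what a queue would store */
  request *back = (request *)n; /* what the consumer recovers */
  printf("%d\n", back->id);     /* prints 42 */
  return 0;
}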
Diffstat (limited to 'src')
 src/core/lib/support/mpscq.c  |  25
 src/core/lib/support/mpscq.h  |  27
 src/core/lib/surface/server.c | 112
3 files changed, 78 insertions(+), 86 deletions(-)
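The push path carries the key signal of this change: gpr_mpscq_push now tells the caller whether it enqueued onto an empty queue, which is how queue_call_request (below) knows it must start matching pending calls. Here is a sketch of that Vyukov-style push, using C11 stdatomic in place of gRPC's gpr_atm wrappers (an assumption for self-containment; names are illustrative):

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct mpscq_node {
  _Atomic(struct mpscq_node *) next;
} mpscq_node;

typedef struct {
  _Atomic(mpscq_node *) head; /* producers swap themselves in here */
  mpscq_node *tail;           /* owned by the single consumer */
  mpscq_node stub;            /* sentinel: head == &stub means empty */
} mpscq;

static void mpscq_init(mpscq *q) {
  atomic_store(&q->stub.next, NULL);
  atomic_store(&q->head, &q->stub);
  q->tail = &q->stub;
}

/* Mirrors the diff: returns true if this was possibly the first node (per
   the header comment, true may be reported sporadically, false never is). */
static bool mpscq_push(mpscq *q, mpscq_node *n) {
  atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
  mpscq_node *prev = atomic_exchange(&q->head, n); /* full barrier */
  atomic_store_explicit(&prev->next, n, memory_order_release);
  return prev == &q->stub; /* previous head was the sentinel: was empty */
}

int main(void) {
  mpscq q;
  mpscq_node a, b;
  mpscq_init(&q);
  bool first = mpscq_push(&q, &a); /* true: pushed onto an empty queue */
  bool later = mpscq_push(&q, &b); /* false: queue already had a node */
  return (first && !later) ? 0 : 1;
}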
diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c
index 1015cc6776..822abd075d 100644
--- a/src/core/lib/support/mpscq.c
+++ b/src/core/lib/support/mpscq.c
@@ -46,11 +46,12 @@ void gpr_mpscq_destroy(gpr_mpscq *q) {
   GPR_ASSERT(q->tail == &q->stub);
 }
 
-void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
+bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) {
   gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL);
   gpr_mpscq_node *prev =
       (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n);
   gpr_atm_rel_store(&prev->next, (gpr_atm)n);
+  return prev == &q->stub;
 }
 
 gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) {
@@ -92,3 +93,25 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) {
   *empty = false;
   return NULL;
 }
+
+void gpr_locked_mpscq_init(gpr_locked_mpscq *q) {
+  gpr_mpscq_init(&q->queue);
+  q->read_lock = GPR_SPINLOCK_INITIALIZER;
+}
+
+void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) {
+  gpr_mpscq_destroy(&q->queue);
+}
+
+bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) {
+  return gpr_mpscq_push(&q->queue, n);
+}
+
+gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) {
+  if (gpr_spinlock_trylock(&q->read_lock)) {
+    gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue);
+    gpr_spinlock_unlock(&q->read_lock);
+    return n;
+  }
+  return NULL;
+}
diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h
index 24c89f90c9..b3a171678a 100644
--- a/src/core/lib/support/mpscq.h
+++ b/src/core/lib/support/mpscq.h
@@ -37,6 +37,7 @@
 #include <grpc/support/atm.h>
 #include <stdbool.h>
 #include <stddef.h>
+#include "src/core/lib/support/spinlock.h"
 
 // Multiple-producer single-consumer lock free queue, based upon the
 // implementation from Dmitry Vyukov here:
@@ -58,12 +59,34 @@ typedef struct gpr_mpscq {
 void gpr_mpscq_init(gpr_mpscq *q);
 void gpr_mpscq_destroy(gpr_mpscq *q);
 // Push a node
-void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
+// Thread safe - can be called from multiple threads concurrently
+// Returns true if this was possibly the first node (may return true
+// sporadically, will not return false sporadically)
+bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n);
 // Pop a node (returns NULL if no node is ready - which doesn't indicate that
 // the queue is empty!!)
+// Thread compatible - can only be called from one thread at a time
 gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q);
-
 // Pop a node; sets *empty to true if the queue is empty, or false if it is not
 gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty);
 
+// An mpscq with a spinlock: it's safe to pop from multiple threads, but only
+// one thread will succeed concurrently
+typedef struct gpr_locked_mpscq {
+  gpr_mpscq queue;
+  gpr_spinlock read_lock;
+} gpr_locked_mpscq;
+
+void gpr_locked_mpscq_init(gpr_locked_mpscq *q);
+void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q);
+// Push a node
+// Thread safe - can be called from multiple threads concurrently
+// Returns true if this was possibly the first node (may return true
+// sporadically, will not return false sporadically)
+bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n);
+// Pop a node (returns NULL if no node is ready - which doesn't indicate that
+// the queue is empty!!)
+// Thread safe - can be called from multiple threads concurrently
+gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q);
+
 #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 560229e892..7e4ae421a0 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -47,7 +47,8 @@
 #include "src/core/lib/iomgr/executor.h"
 #include "src/core/lib/iomgr/iomgr.h"
 #include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/support/stack_lockfree.h"
+#include "src/core/lib/support/mpscq.h"
+#include "src/core/lib/support/spinlock.h"
 #include "src/core/lib/support/string.h"
 #include "src/core/lib/surface/api_trace.h"
 #include "src/core/lib/surface/call.h"
@@ -76,6 +77,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
 grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false);
 
 typedef struct requested_call {
+  gpr_mpscq_node request_link; /* must be first */
   requested_call_type type;
   size_t cq_idx;
   void *tag;
@@ -175,7 +177,7 @@ struct request_matcher {
   grpc_server *server;
   call_data *pending_head;
   call_data *pending_tail;
-  gpr_stack_lockfree **requests_per_cq;
+  gpr_locked_mpscq *requests_per_cq;
 };
 
 struct registered_method {
@@ -220,11 +222,6 @@ struct grpc_server {
   registered_method *registered_methods;
   /** one request matcher for unregistered methods */
   request_matcher unregistered_request_matcher;
-  /** free list of available requested_calls_per_cq indices */
-  gpr_stack_lockfree **request_freelist_per_cq;
-  /** requested call backing data */
-  requested_call **requested_calls_per_cq;
-  int max_requested_calls_per_cq;
 
   gpr_atm shutdown_flag;
   uint8_t shutdown_published;
@@ -324,21 +321,20 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
  * request_matcher
  */
 
-static void request_matcher_init(request_matcher *rm, size_t entries,
-                                 grpc_server *server) {
+static void request_matcher_init(request_matcher *rm, grpc_server *server) {
   memset(rm, 0, sizeof(*rm));
   rm->server = server;
   rm->requests_per_cq =
       gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count);
   for (size_t i = 0; i < server->cq_count; i++) {
-    rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries);
+    gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
   }
 }
 
 static void request_matcher_destroy(request_matcher *rm) {
   for (size_t i = 0; i < rm->server->cq_count; i++) {
-    GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
-    gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
+    GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL);
+    gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
   }
   gpr_free(rm->requests_per_cq);
 }
@@ -368,13 +364,17 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
                                           grpc_server *server,
                                           request_matcher *rm,
                                           grpc_error *error) {
-  int request_id;
+  requested_call *rc;
   for (size_t i = 0; i < server->cq_count; i++) {
-    while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) !=
-           -1) {
-      fail_call(exec_ctx, server, i,
-                &server->requested_calls_per_cq[i][request_id],
-                GRPC_ERROR_REF(error));
+    /* Here we know:
+       1. no requests are being added (since the server is shut down)
+       2. no other threads are pulling (since the shut down process is single
+          threaded)
+       So, we can ignore the queue lock and just pop, with the guarantee that
+       a NULL returned here truly means that the queue is empty */
+    while ((rc = (requested_call *)gpr_mpscq_pop(
+                &rm->requests_per_cq[i].queue)) != NULL) {
+      fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error));
     }
   }
   GRPC_ERROR_UNREF(error);
@@ -409,13 +409,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
   }
   for (i = 0; i < server->cq_count; i++) {
     GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server");
-    if (server->started) {
-      gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]);
-      gpr_free(server->requested_calls_per_cq[i]);
-    }
   }
-  gpr_free(server->request_freelist_per_cq);
-  gpr_free(server->requested_calls_per_cq);
   gpr_free(server->cqs);
   gpr_free(server->pollsets);
   gpr_free(server->shutdown_tags);
@@ -473,21 +467,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand,
 
 static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
                                grpc_cq_completion *c) {
-  requested_call *rc = req;
-  grpc_server *server = rc->server;
-
-  if (rc >= server->requested_calls_per_cq[rc->cq_idx] &&
-      rc < server->requested_calls_per_cq[rc->cq_idx] +
-               server->max_requested_calls_per_cq) {
-    GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX);
-    gpr_stack_lockfree_push(
-        server->request_freelist_per_cq[rc->cq_idx],
-        (int)(rc - server->requested_calls_per_cq[rc->cq_idx]));
-  } else {
-    gpr_free(req);
-  }
-
-  server_unref(exec_ctx, server);
+  gpr_free(req);
 }
 
 static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
@@ -516,10 +496,6 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
       GPR_UNREACHABLE_CODE(return );
   }
 
-  grpc_call_element *elem =
-      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
-  channel_data *chand = elem->channel_data;
-  server_ref(chand->server);
   grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE,
                  done_request_event, rc, &rc->completion);
 }
@@ -547,15 +523,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
 
   for (size_t i = 0; i < server->cq_count; i++) {
     size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
-    int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
-    if (request_id == -1) {
+    requested_call *rc =
+        (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
+    if (rc == NULL) {
       continue;
     } else {
       gpr_mu_lock(&calld->mu_state);
       calld->state = ACTIVATED;
       gpr_mu_unlock(&calld->mu_state);
-      publish_call(exec_ctx, server, calld, cq_idx,
-                   &server->requested_calls_per_cq[cq_idx][request_id]);
+      publish_call(exec_ctx, server, calld, cq_idx, rc);
       return; /* early out */
     }
   }
@@ -1029,8 +1005,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
   server->root_channel_data.next = server->root_channel_data.prev =
       &server->root_channel_data;
 
-  /* TODO(ctiller): expose a channel_arg for this */
-  server->max_requested_calls_per_cq = 32768;
   server->channel_args = grpc_channel_args_copy(args);
 
   return server;
@@ -1103,29 +1077,15 @@ void grpc_server_start(grpc_server *server) {
   server->started = true;
   server->pollset_count = 0;
   server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
-  server->request_freelist_per_cq =
-      gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count);
-  server->requested_calls_per_cq =
-      gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
   for (i = 0; i < server->cq_count; i++) {
     if (grpc_cq_can_listen(server->cqs[i])) {
       server->pollsets[server->pollset_count++] =
           grpc_cq_pollset(server->cqs[i]);
     }
-    server->request_freelist_per_cq[i] =
-        gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
-    for (int j = 0; j < server->max_requested_calls_per_cq; j++) {
-      gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j);
-    }
-    server->requested_calls_per_cq[i] =
-        gpr_malloc((size_t)server->max_requested_calls_per_cq *
-                   sizeof(*server->requested_calls_per_cq[i]));
   }
-  request_matcher_init(&server->unregistered_request_matcher,
-                       (size_t)server->max_requested_calls_per_cq, server);
+  request_matcher_init(&server->unregistered_request_matcher, server);
   for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
-    request_matcher_init(&rm->request_matcher,
-                         (size_t)server->max_requested_calls_per_cq, server);
+    request_matcher_init(&rm->request_matcher, server);
   }
 
   server_ref(server);
@@ -1379,21 +1339,11 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
                                           requested_call *rc) {
   call_data *calld = NULL;
   request_matcher *rm = NULL;
-  int request_id;
   if (gpr_atm_acq_load(&server->shutdown_flag)) {
     fail_call(exec_ctx, server, cq_idx, rc,
               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
     return GRPC_CALL_OK;
   }
-  request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]);
-  if (request_id == -1) {
-    /* out of request ids: just fail this one */
-    fail_call(exec_ctx, server, cq_idx, rc,
-              grpc_error_set_int(
-                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"),
-                  GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq));
-    return GRPC_CALL_OK;
-  }
   switch (rc->type) {
     case BATCH_CALL:
       rm = &server->unregistered_request_matcher;
@@ -1402,15 +1352,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
       rm = &rc->data.registered.registered_method->request_matcher;
       break;
   }
-  server->requested_calls_per_cq[cq_idx][request_id] = *rc;
-  gpr_free(rc);
-  if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) {
+  if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
     /* this was the first queued request: we need to lock and start
        matching calls */
     gpr_mu_lock(&server->mu_call);
     while ((calld = rm->pending_head) != NULL) {
-      request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]);
-      if (request_id == -1) break;
+      rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]);
+      if (rc == NULL) break;
       rm->pending_head = calld->pending_next;
       gpr_mu_unlock(&server->mu_call);
       gpr_mu_lock(&calld->mu_state);
@@ -1426,8 +1374,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
       GPR_ASSERT(calld->state == PENDING);
       calld->state = ACTIVATED;
       gpr_mu_unlock(&calld->mu_state);
-      publish_call(exec_ctx, server, calld, cq_idx,
-                   &server->requested_calls_per_cq[cq_idx][request_id]);
+      publish_call(exec_ctx, server, calld, cq_idx, rc);
       }
       gpr_mu_lock(&server->mu_call);
     }
@@ -1534,7 +1481,6 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
   rc->initial_metadata->count = 0;
   GPR_ASSERT(error != GRPC_ERROR_NONE);
 
-  server_ref(server);
   grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error,
                  done_request_event, rc, &rc->completion);
 }
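One design note on the new gpr_locked_mpscq_pop: because it takes the read lock with a trylock, a NULL return can mean either "queue empty" or "another thread is popping right now", which is exactly why request_matcher_kill_requests above bypasses the lock during single-threaded shutdown to get a definitive empty answer. A sketch of those semantics (illustrative names; a plain list stands in for the real mpscq, and atomic_flag for gpr_spinlock):

#include <stdatomic.h>
#include <stddef.h>

typedef struct node {
  struct node *next;
} node;

typedef struct {
  atomic_flag read_lock; /* stands in for gpr_spinlock */
  node *items;           /* stands in for the underlying mpscq */
} locked_queue;

/* Single-consumer pop; only the lock holder may call this - that is the
   invariant the spinlock enforces. */
static node *unlocked_pop(locked_queue *q) {
  node *n = q->items;
  if (n != NULL) q->items = n->next;
  return n;
}

static node *locked_pop(locked_queue *q) {
  if (!atomic_flag_test_and_set(&q->read_lock)) { /* trylock succeeded */
    node *n = unlocked_pop(q);
    atomic_flag_clear(&q->read_lock); /* unlock */
    return n;
  }
  return NULL; /* contended: NULL here does NOT mean the queue is empty */
}

int main(void) {
  node n1 = {NULL};
  locked_queue q = {ATOMIC_FLAG_INIT, &n1};
  node *got = locked_pop(&q);  /* uncontended: returns &n1 */
  node *none = locked_pop(&q); /* queue drained: returns NULL */
  return (got == &n1 && none == NULL) ? 0 : 1;
}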