From 9f9d4223fbb0cc93b95c5c1bd379c8b848936b7d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 16 May 2016 17:02:14 -0700 Subject: Further server cq affinity work --- .../chttp2/server/secure/server_secure_chttp2.c | 74 ++++++++++--------- src/core/lib/surface/completion_queue.h | 1 + src/core/lib/surface/server.c | 86 +++++++++++++++------- 3 files changed, 99 insertions(+), 62 deletions(-) diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 698b2bef61..26b0f00e9e 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -52,7 +52,7 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct grpc_server_secure_state { +typedef struct server_secure_state { grpc_server *server; grpc_tcp_server *tcp; grpc_server_security_connector *sc; @@ -62,13 +62,16 @@ typedef struct grpc_server_secure_state { gpr_refcount refcount; grpc_closure destroy_closure; grpc_closure *destroy_callback; -} grpc_server_secure_state; +} server_secure_state; -static void state_ref(grpc_server_secure_state *state) { - gpr_ref(&state->refcount); -} +typedef struct server_secure_connect { + server_secure_state *state; + grpc_pollset *accepting_pollset; +} server_secure_connect; + +static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); } -static void state_unref(grpc_server_secure_state *state) { +static void state_unref(server_secure_state *state) { if (gpr_unref(&state->refcount)) { /* ensure all threads have unlocked */ gpr_mu_lock(&state->mu); @@ -80,67 +83,66 @@ static void state_unref(grpc_server_secure_state *state) { } } -static void setup_transport(grpc_exec_ctx *exec_ctx, void *statep, - grpc_transport *transport, - grpc_auth_context *auth_context) { - grpc_server_secure_state *state = statep; - grpc_channel_args *args_copy; - grpc_arg args_to_add[2]; - args_to_add[0] = grpc_server_credentials_to_arg(state->creds); - args_to_add[1] = grpc_auth_context_to_arg(auth_context); - args_copy = grpc_channel_args_copy_and_add( - grpc_server_get_channel_args(state->server), args_to_add, - GPR_ARRAY_SIZE(args_to_add)); - grpc_server_setup_transport(exec_ctx, state->server, transport, args_copy); - grpc_channel_args_destroy(args_copy); -} - static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep, grpc_security_status status, grpc_endpoint *secure_endpoint, grpc_auth_context *auth_context) { - grpc_server_secure_state *state = statep; + server_secure_connect *state = statep; grpc_transport *transport; if (status == GRPC_SECURITY_OK) { if (secure_endpoint) { - gpr_mu_lock(&state->mu); - if (!state->is_shutdown) { + gpr_mu_lock(&state->state->mu); + if (!state->state->is_shutdown) { transport = grpc_create_chttp2_transport( - exec_ctx, grpc_server_get_channel_args(state->server), + exec_ctx, grpc_server_get_channel_args(state->state->server), secure_endpoint, 0); - setup_transport(exec_ctx, state, transport, auth_context); + grpc_channel_args *args_copy; + grpc_arg args_to_add[2]; + args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds); + args_to_add[1] = grpc_auth_context_to_arg(auth_context); + args_copy = grpc_channel_args_copy_and_add( + grpc_server_get_channel_args(state->state->server), args_to_add, + GPR_ARRAY_SIZE(args_to_add)); + grpc_server_setup_transport(exec_ctx, state->state->server, transport, + state->accepting_pollset, args_copy); + grpc_channel_args_destroy(args_copy); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); } else { /* We need to consume this here, because the server may already have * gone away. */ grpc_endpoint_destroy(exec_ctx, secure_endpoint); } - gpr_mu_unlock(&state->mu); + gpr_mu_unlock(&state->state->mu); } } else { gpr_log(GPR_ERROR, "Secure transport failed with error %d", status); } - state_unref(state); + state_unref(state->state); + gpr_free(state); } static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, + grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { - grpc_server_secure_state *state = statep; - state_ref(state); - grpc_server_security_connector_do_handshake( - exec_ctx, state->sc, acceptor, tcp, on_secure_handshake_done, state); + server_secure_connect *state = gpr_malloc(sizeof(*state)); + state->state = statep; + state_ref(state->state); + state->accepting_pollset = accepting_pollset; + grpc_server_security_connector_do_handshake(exec_ctx, state->state->sc, + acceptor, tcp, + on_secure_handshake_done, state); } /* Server callback: start listening on our ports */ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, grpc_pollset **pollsets, size_t pollset_count) { - grpc_server_secure_state *state = statep; + server_secure_state *state = statep; grpc_tcp_server_start(exec_ctx, state->tcp, pollsets, pollset_count, on_accept, state); } static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) { - grpc_server_secure_state *state = statep; + server_secure_state *state = statep; if (state->destroy_callback != NULL) { state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg, success); @@ -153,7 +155,7 @@ static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) { callbacks) */ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep, grpc_closure *callback) { - grpc_server_secure_state *state = statep; + server_secure_state *state = statep; grpc_tcp_server *tcp; gpr_mu_lock(&state->mu); state->is_shutdown = 1; @@ -167,7 +169,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, grpc_server_credentials *creds) { grpc_resolved_addresses *resolved = NULL; grpc_tcp_server *tcp = NULL; - grpc_server_secure_state *state = NULL; + server_secure_state *state = NULL; size_t i; unsigned count = 0; int port_num = -1; diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 1528ca4ad8..3d0dd13c53 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -81,6 +81,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); +grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc); bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index f1a031b715..d1fb3fc383 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -81,7 +81,6 @@ typedef struct requested_call { void *tag; grpc_server *server; grpc_completion_queue *cq_bound_to_call; - grpc_completion_queue *cq_for_notification; grpc_call **call; grpc_cq_completion completion; grpc_metadata_array *initial_metadata; @@ -171,6 +170,7 @@ struct call_data { struct request_matcher { grpc_server *server; + size_t cq_idx; call_data *pending_head; call_data *pending_tail; gpr_stack_lockfree *requests; @@ -237,7 +237,7 @@ struct grpc_server { static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success); static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - requested_call *rc); + size_t cq_idx, requested_call *rc); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server); @@ -312,9 +312,10 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, */ static void request_matcher_init(request_matcher *rm, size_t entries, - grpc_server *server) { + size_t cq_idx, grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; + rm->cq_idx = cq_idx; rm->requests = gpr_stack_lockfree_create(entries); } @@ -347,7 +348,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, request_matcher *rm) { int request_id; while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { - fail_call(exec_ctx, server, &server->requested_calls[request_id]); + fail_call(exec_ctx, server, rm->cq_idx, + &server->requested_calls[request_id]); } } @@ -458,11 +460,11 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - call_data *calld, requested_call *rc) { + call_data *calld, size_t cq_idx, requested_call *rc) { grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); grpc_call *call = calld->call; *rc->call = call; - calld->cq_new = rc->cq_for_notification; + calld->cq_new = server->cqs[cq_idx]; GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata); switch (rc->type) { case BATCH_CALL: @@ -530,7 +532,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]); + publish_call(exec_ctx, server, calld, rm->cq_idx, + &server->requested_calls[request_id]); } } @@ -972,8 +975,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { for (i = 0; i < (size_t)server->max_requested_calls; i++) { gpr_stack_lockfree_push(server->request_freelist, (int)i); } - request_matcher_init(&server->unregistered_request_matcher, - server->max_requested_calls, server); server->requested_calls = gpr_malloc(server->max_requested_calls * sizeof(*server->requested_calls)); @@ -1017,8 +1018,6 @@ void *grpc_server_register_method( } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); - request_matcher_init(&m->request_matcher, server->max_requested_calls, - server); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -1036,8 +1035,21 @@ void grpc_server_start(grpc_server *server) { GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server)); server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->unregistered_request_matchers = gpr_malloc( + sizeof(*server->unregistered_request_matchers) * server->cq_count); for (i = 0; i < server->cq_count; i++) { server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); + request_matcher_init(&server->unregistered_request_matchers[i], + server->max_requested_calls, i, server); + for (registered_method *rm = server->registered_methods; rm; + rm = rm->next) { + if (i == 0) { + rm->request_matchers = + gpr_malloc(sizeof(*rm->request_matchers) * server->cq_count); + } + request_matcher_init(&rm->request_matchers[i], + server->max_requested_calls, i, server); + } } for (l = server->listeners; l; l = l->next) { @@ -1074,6 +1086,17 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, server_ref(s); chand->channel = channel; + size_t cq_idx; + grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset); + for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { + if (s->cqs[cq_idx] == accepting_cq) break; + } + if (cq_idx == s->cq_count) { + /* completion queue not found: pick a random one to publish new calls to */ + cq_idx = (size_t)rand() % s->cq_count; + } + chand->cq_idx = cq_idx; + num_registered_methods = 0; for (rm = s->registered_methods; rm; rm = rm->next) { num_registered_methods++; @@ -1244,27 +1267,27 @@ void grpc_server_add_listener( } static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, - grpc_server *server, + grpc_server *server, size_t cq_idx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(exec_ctx, server, rc); + fail_call(exec_ctx, server, cq_idx, rc); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist); if (request_id == -1) { /* out of request ids: just fail this one */ - fail_call(exec_ctx, server, rc); + fail_call(exec_ctx, server, cq_idx, rc); return GRPC_CALL_OK; } switch (rc->type) { case BATCH_CALL: - rm = &server->unregistered_request_matcher; + rm = &server->unregistered_request_matchers[cq_idx]; break; case REGISTERED_CALL: - rm = &rc->data.registered.registered_method->request_matcher; + rm = &rc->data.registered.registered_method->request_matchers[cq_idx]; break; } server->requested_calls[request_id] = *rc; @@ -1290,7 +1313,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, + publish_call(exec_ctx, server, calld, cq_idx, &server->requested_calls[request_id]); } gpr_mu_lock(&server->mu_call); @@ -1314,7 +1337,13 @@ grpc_call_error grpc_server_request_call( "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)", 7, (server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag)); - if (!grpc_cq_is_server_cq(cq_for_notification)) { + size_t cq_idx; + for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { + if (server->cqs[cq_idx] == cq_for_notification) { + break; + } + } + if (cq_idx == server->cq_count) { gpr_free(rc); error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; @@ -1325,11 +1354,10 @@ grpc_call_error grpc_server_request_call( rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; - rc->cq_for_notification = cq_for_notification; rc->call = call; rc->data.batch.details = details; rc->initial_metadata = initial_metadata; - error = queue_call_request(&exec_ctx, server, rc); + error = queue_call_request(&exec_ctx, server, cq_idx, rc); done: grpc_exec_ctx_finish(&exec_ctx); return error; @@ -1351,7 +1379,14 @@ grpc_call_error grpc_server_request_registered_call( "tag=%p)", 9, (server, rmp, call, deadline, initial_metadata, optional_payload, cq_bound_to_call, cq_for_notification, tag)); - if (!grpc_cq_is_server_cq(cq_for_notification)) { + + size_t cq_idx; + for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { + if (server->cqs[cq_idx] == cq_for_notification) { + break; + } + } + if (cq_idx == server->cq_count) { gpr_free(rc); error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; @@ -1367,26 +1402,25 @@ grpc_call_error grpc_server_request_registered_call( rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; - rc->cq_for_notification = cq_for_notification; rc->call = call; rc->data.registered.registered_method = rm; rc->data.registered.deadline = deadline; rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; - error = queue_call_request(&exec_ctx, server, rc); + error = queue_call_request(&exec_ctx, server, cq_idx, rc); done: grpc_exec_ctx_finish(&exec_ctx); return error; } static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - requested_call *rc) { + size_t cq_idx, requested_call *rc) { *rc->call = NULL; rc->initial_metadata->count = 0; server_ref(server); - grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0, - done_request_event, rc, &rc->completion); + grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, 0, done_request_event, + rc, &rc->completion); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { -- cgit v1.2.3