From 418a82187ca4905dbbcdd05c3271022a74bda6e6 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 16 May 2016 16:27:51 -0700 Subject: Begin sharding request queues per cq --- .../chttp2/server/insecure/server_chttp2.c | 11 ++--- src/core/lib/iomgr/tcp_server.h | 1 + src/core/lib/iomgr/tcp_server_posix.c | 2 +- src/core/lib/surface/server.c | 49 +++++++++++++--------- src/core/lib/surface/server.h | 1 + third_party/protobuf | 2 +- 6 files changed, 37 insertions(+), 29 deletions(-) diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index e21fa2a072..0428bb1e3d 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -43,14 +43,8 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -static void setup_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_transport *transport) { - grpc_server_setup_transport(exec_ctx, server, transport, - grpc_server_get_channel_args(server)); -} - static void new_transport(grpc_exec_ctx *exec_ctx, void *server, - grpc_endpoint *tcp, + grpc_endpoint *tcp, grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor) { /* * Beware that the call to grpc_create_chttp2_transport() has to happen before @@ -61,7 +55,8 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server, */ grpc_transport *transport = grpc_create_chttp2_transport( exec_ctx, grpc_server_get_channel_args(server), tcp, 0); - setup_transport(exec_ctx, server, transport); + grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset, + grpc_server_get_channel_args(server)); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); } diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 99b9f29729..fee14ae661 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -52,6 +52,7 @@ typedef struct grpc_tcp_server_acceptor { /* Called for newly connected TCP connections. */ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *ep, + grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor); /* Create a server, initially not bound to any ports. The caller owns one ref. diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 97c945b834..c695621de8 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -362,7 +362,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), - &acceptor); + read_notifier_pollset, &acceptor); gpr_free(name); gpr_free(addr_str); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index c9b458faf2..f1a031b715 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -108,6 +108,7 @@ struct channel_data { grpc_server *server; grpc_connectivity_state connectivity_state; grpc_channel *channel; + size_t cq_idx; /* linked list of all channels on a server */ channel_data *next; channel_data *prev; @@ -180,7 +181,8 @@ struct registered_method { char *host; grpc_server_register_method_payload_handling payload_handling; uint32_t flags; - request_matcher request_matcher; + /* one request matcher per method per cq */ + request_matcher *request_matchers; registered_method *next; }; @@ -207,7 +209,8 @@ struct grpc_server { gpr_mu mu_call; /* mutex for call-specific state */ registered_method *registered_methods; - request_matcher unregistered_request_matcher; + /** one request matcher for unregistered methods per cq */ + request_matcher *unregistered_request_matchers; /** free list of available requested_calls indices */ gpr_stack_lockfree *request_freelist; /** requested call backing data */ @@ -364,15 +367,17 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { gpr_mu_destroy(&server->mu_call); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; - request_matcher_destroy(&rm->request_matcher); + for (i = 0; i < server->cq_count; i++) { + request_matcher_destroy(&rm->request_matchers[i]); + } gpr_free(rm->method); gpr_free(rm->host); gpr_free(rm); } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); + request_matcher_destroy(&server->unregistered_request_matchers[i]); } - request_matcher_destroy(&server->unregistered_request_matcher); gpr_stack_lockfree_destroy(server->request_freelist); gpr_free(server->cqs); gpr_free(server->pollsets); @@ -584,9 +589,10 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && !calld->recv_idempotent_request) continue; - finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, - rm->server_registered_method->payload_handling); + finish_start_new_rpc( + exec_ctx, server, elem, + &rm->server_registered_method->request_matchers[chand->cq_idx], + rm->server_registered_method->payload_handling); return; } /* check for a wildcard method definition (no host set) */ @@ -600,14 +606,15 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && !calld->recv_idempotent_request) continue; - finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, - rm->server_registered_method->payload_handling); + finish_start_new_rpc( + exec_ctx, server, elem, + &rm->server_registered_method->request_matchers[chand->cq_idx], + rm->server_registered_method->payload_handling); return; } } finish_start_new_rpc(exec_ctx, server, elem, - &server->unregistered_request_matcher, + &server->unregistered_request_matchers[chand->cq_idx], GRPC_SRM_PAYLOAD_NONE); } @@ -637,14 +644,17 @@ static int num_channels(grpc_server *server) { static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, grpc_server *server) { - registered_method *rm; - request_matcher_kill_requests(exec_ctx, server, - &server->unregistered_request_matcher); - request_matcher_zombify_all_pending_calls( - exec_ctx, &server->unregistered_request_matcher); - for (rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher); - request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); + for (size_t i = 0; i < server->cq_count; i++) { + request_matcher_kill_requests(exec_ctx, server, + &server->unregistered_request_matchers[i]); + request_matcher_zombify_all_pending_calls( + exec_ctx, &server->unregistered_request_matchers[i]); + for (registered_method *rm = server->registered_methods; rm; + rm = rm->next) { + request_matcher_kill_requests(exec_ctx, server, &rm->request_matchers[i]); + request_matcher_zombify_all_pending_calls(exec_ctx, + &rm->request_matchers[i]); + } } } @@ -1039,6 +1049,7 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, grpc_transport *transport, + grpc_pollset *accepting_pollset, const grpc_channel_args *args) { size_t num_registered_methods; size_t alloc; diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 470ef23c69..fb6e4d60c5 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -53,6 +53,7 @@ void grpc_server_add_listener( server */ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_transport *transport, + grpc_pollset *accepting_pollset, const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); diff --git a/third_party/protobuf b/third_party/protobuf index a1938b2aa9..d5fb408ddc 160000 --- a/third_party/protobuf +++ b/third_party/protobuf @@ -1 +1 @@ -Subproject commit a1938b2aa9ca86ce7ce50c27ff9737c1008d2a03 +Subproject commit d5fb408ddc281ffcadeb08699e65bb694656d0bd -- cgit v1.2.3