aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-05-16 16:27:51 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-05-16 16:27:51 -0700
commit418a82187ca4905dbbcdd05c3271022a74bda6e6 (patch)
treeaede13ed8aff4dcd23e5a7f2f9c8a6506c6996a8 /src
parentc3b88b079e32b5d8fb0d277116d9dd75a474389f (diff)
Begin sharding request queues per cq
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c11
-rw-r--r--src/core/lib/iomgr/tcp_server.h1
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c2
-rw-r--r--src/core/lib/surface/server.c49
-rw-r--r--src/core/lib/surface/server.h1
5 files changed, 36 insertions, 28 deletions
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index e21fa2a072..0428bb1e3d 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -43,14 +43,8 @@
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
-static void setup_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_transport *transport) {
- grpc_server_setup_transport(exec_ctx, server, transport,
- grpc_server_get_channel_args(server));
-}
-
static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
- grpc_endpoint *tcp,
+ grpc_endpoint *tcp, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
@@ -61,7 +55,8 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
*/
grpc_transport *transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(server), tcp, 0);
- setup_transport(exec_ctx, server, transport);
+ grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset,
+ grpc_server_get_channel_args(server));
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
}
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 99b9f29729..fee14ae661 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -52,6 +52,7 @@ typedef struct grpc_tcp_server_acceptor {
/* Called for newly connected TCP connections. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep,
+ grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor);
/* Create a server, initially not bound to any ports. The caller owns one ref.
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 97c945b834..c695621de8 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -362,7 +362,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
- &acceptor);
+ read_notifier_pollset, &acceptor);
gpr_free(name);
gpr_free(addr_str);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index c9b458faf2..f1a031b715 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -108,6 +108,7 @@ struct channel_data {
grpc_server *server;
grpc_connectivity_state connectivity_state;
grpc_channel *channel;
+ size_t cq_idx;
/* linked list of all channels on a server */
channel_data *next;
channel_data *prev;
@@ -180,7 +181,8 @@ struct registered_method {
char *host;
grpc_server_register_method_payload_handling payload_handling;
uint32_t flags;
- request_matcher request_matcher;
+ /* one request matcher per method per cq */
+ request_matcher *request_matchers;
registered_method *next;
};
@@ -207,7 +209,8 @@ struct grpc_server {
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
- request_matcher unregistered_request_matcher;
+ /** one request matcher for unregistered methods per cq */
+ request_matcher *unregistered_request_matchers;
/** free list of available requested_calls indices */
gpr_stack_lockfree *request_freelist;
/** requested call backing data */
@@ -364,15 +367,17 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
gpr_mu_destroy(&server->mu_call);
while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next;
- request_matcher_destroy(&rm->request_matcher);
+ for (i = 0; i < server->cq_count; i++) {
+ request_matcher_destroy(&rm->request_matchers[i]);
+ }
gpr_free(rm->method);
gpr_free(rm->host);
gpr_free(rm);
}
for (i = 0; i < server->cq_count; i++) {
GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
+ request_matcher_destroy(&server->unregistered_request_matchers[i]);
}
- request_matcher_destroy(&server->unregistered_request_matcher);
gpr_stack_lockfree_destroy(server->request_freelist);
gpr_free(server->cqs);
gpr_free(server->pollsets);
@@ -584,9 +589,10 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
!calld->recv_idempotent_request)
continue;
- finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
- rm->server_registered_method->payload_handling);
+ finish_start_new_rpc(
+ exec_ctx, server, elem,
+ &rm->server_registered_method->request_matchers[chand->cq_idx],
+ rm->server_registered_method->payload_handling);
return;
}
/* check for a wildcard method definition (no host set) */
@@ -600,14 +606,15 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
!calld->recv_idempotent_request)
continue;
- finish_start_new_rpc(exec_ctx, server, elem,
- &rm->server_registered_method->request_matcher,
- rm->server_registered_method->payload_handling);
+ finish_start_new_rpc(
+ exec_ctx, server, elem,
+ &rm->server_registered_method->request_matchers[chand->cq_idx],
+ rm->server_registered_method->payload_handling);
return;
}
}
finish_start_new_rpc(exec_ctx, server, elem,
- &server->unregistered_request_matcher,
+ &server->unregistered_request_matchers[chand->cq_idx],
GRPC_SRM_PAYLOAD_NONE);
}
@@ -637,14 +644,17 @@ static int num_channels(grpc_server *server) {
static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
grpc_server *server) {
- registered_method *rm;
- request_matcher_kill_requests(exec_ctx, server,
- &server->unregistered_request_matcher);
- request_matcher_zombify_all_pending_calls(
- exec_ctx, &server->unregistered_request_matcher);
- for (rm = server->registered_methods; rm; rm = rm->next) {
- request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher);
- request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
+ for (size_t i = 0; i < server->cq_count; i++) {
+ request_matcher_kill_requests(exec_ctx, server,
+ &server->unregistered_request_matchers[i]);
+ request_matcher_zombify_all_pending_calls(
+ exec_ctx, &server->unregistered_request_matchers[i]);
+ for (registered_method *rm = server->registered_methods; rm;
+ rm = rm->next) {
+ request_matcher_kill_requests(exec_ctx, server, &rm->request_matchers[i]);
+ request_matcher_zombify_all_pending_calls(exec_ctx,
+ &rm->request_matchers[i]);
+ }
}
}
@@ -1039,6 +1049,7 @@ void grpc_server_start(grpc_server *server) {
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
grpc_transport *transport,
+ grpc_pollset *accepting_pollset,
const grpc_channel_args *args) {
size_t num_registered_methods;
size_t alloc;
diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h
index 470ef23c69..fb6e4d60c5 100644
--- a/src/core/lib/surface/server.h
+++ b/src/core/lib/surface/server.h
@@ -53,6 +53,7 @@ void grpc_server_add_listener(
server */
void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_transport *transport,
+ grpc_pollset *accepting_pollset,
const grpc_channel_args *args);
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server);