diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/tcp_server.h | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.c | 2 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 49 | ||||
-rw-r--r-- | src/core/lib/surface/server.h | 1 |
4 files changed, 33 insertions, 20 deletions
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 99b9f29729..fee14ae661 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -52,6 +52,7 @@ typedef struct grpc_tcp_server_acceptor { /* Called for newly connected TCP connections. */ typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *ep, + grpc_pollset *accepting_pollset, grpc_tcp_server_acceptor *acceptor); /* Create a server, initially not bound to any ports. The caller owns one ref. diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 97c945b834..c695621de8 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -362,7 +362,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), - &acceptor); + read_notifier_pollset, &acceptor); gpr_free(name); gpr_free(addr_str); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index c9b458faf2..f1a031b715 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -108,6 +108,7 @@ struct channel_data { grpc_server *server; grpc_connectivity_state connectivity_state; grpc_channel *channel; + size_t cq_idx; /* linked list of all channels on a server */ channel_data *next; channel_data *prev; @@ -180,7 +181,8 @@ struct registered_method { char *host; grpc_server_register_method_payload_handling payload_handling; uint32_t flags; - request_matcher request_matcher; + /* one request matcher per method per cq */ + request_matcher *request_matchers; registered_method *next; }; @@ -207,7 +209,8 @@ struct grpc_server { gpr_mu mu_call; /* mutex for call-specific state */ registered_method *registered_methods; - request_matcher unregistered_request_matcher; + /** one request matcher for unregistered methods per cq */ + request_matcher *unregistered_request_matchers; /** free list of available requested_calls indices */ gpr_stack_lockfree *request_freelist; /** requested call backing data */ @@ -364,15 +367,17 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { gpr_mu_destroy(&server->mu_call); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; - request_matcher_destroy(&rm->request_matcher); + for (i = 0; i < server->cq_count; i++) { + request_matcher_destroy(&rm->request_matchers[i]); + } gpr_free(rm->method); gpr_free(rm->host); gpr_free(rm); } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); + request_matcher_destroy(&server->unregistered_request_matchers[i]); } - request_matcher_destroy(&server->unregistered_request_matcher); gpr_stack_lockfree_destroy(server->request_freelist); gpr_free(server->cqs); gpr_free(server->pollsets); @@ -584,9 +589,10 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && !calld->recv_idempotent_request) continue; - finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, - rm->server_registered_method->payload_handling); + finish_start_new_rpc( + exec_ctx, server, elem, + &rm->server_registered_method->request_matchers[chand->cq_idx], + rm->server_registered_method->payload_handling); return; } /* check for a wildcard method definition (no host set) */ @@ -600,14 +606,15 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) && !calld->recv_idempotent_request) continue; - finish_start_new_rpc(exec_ctx, server, elem, - &rm->server_registered_method->request_matcher, - rm->server_registered_method->payload_handling); + finish_start_new_rpc( + exec_ctx, server, elem, + &rm->server_registered_method->request_matchers[chand->cq_idx], + rm->server_registered_method->payload_handling); return; } } finish_start_new_rpc(exec_ctx, server, elem, - &server->unregistered_request_matcher, + &server->unregistered_request_matchers[chand->cq_idx], GRPC_SRM_PAYLOAD_NONE); } @@ -637,14 +644,17 @@ static int num_channels(grpc_server *server) { static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, grpc_server *server) { - registered_method *rm; - request_matcher_kill_requests(exec_ctx, server, - &server->unregistered_request_matcher); - request_matcher_zombify_all_pending_calls( - exec_ctx, &server->unregistered_request_matcher); - for (rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher); - request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); + for (size_t i = 0; i < server->cq_count; i++) { + request_matcher_kill_requests(exec_ctx, server, + &server->unregistered_request_matchers[i]); + request_matcher_zombify_all_pending_calls( + exec_ctx, &server->unregistered_request_matchers[i]); + for (registered_method *rm = server->registered_methods; rm; + rm = rm->next) { + request_matcher_kill_requests(exec_ctx, server, &rm->request_matchers[i]); + request_matcher_zombify_all_pending_calls(exec_ctx, + &rm->request_matchers[i]); + } } } @@ -1039,6 +1049,7 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, grpc_transport *transport, + grpc_pollset *accepting_pollset, const grpc_channel_args *args) { size_t num_registered_methods; size_t alloc; diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 470ef23c69..fb6e4d60c5 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -53,6 +53,7 @@ void grpc_server_add_listener( server */ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_transport *transport, + grpc_pollset *accepting_pollset, const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); |