aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/server.c')
-rw-r--r--src/core/lib/surface/server.c86
1 files changed, 60 insertions, 26 deletions
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index f1a031b715..d1fb3fc383 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -81,7 +81,6 @@ typedef struct requested_call {
void *tag;
grpc_server *server;
grpc_completion_queue *cq_bound_to_call;
- grpc_completion_queue *cq_for_notification;
grpc_call **call;
grpc_cq_completion completion;
grpc_metadata_array *initial_metadata;
@@ -171,6 +170,7 @@ struct call_data {
struct request_matcher {
grpc_server *server;
+ size_t cq_idx;
call_data *pending_head;
call_data *pending_tail;
gpr_stack_lockfree *requests;
@@ -237,7 +237,7 @@ struct grpc_server {
static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success);
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- requested_call *rc);
+ size_t cq_idx, requested_call *rc);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server);
@@ -312,9 +312,10 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
*/
static void request_matcher_init(request_matcher *rm, size_t entries,
- grpc_server *server) {
+ size_t cq_idx, grpc_server *server) {
memset(rm, 0, sizeof(*rm));
rm->server = server;
+ rm->cq_idx = cq_idx;
rm->requests = gpr_stack_lockfree_create(entries);
}
@@ -347,7 +348,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
request_matcher *rm) {
int request_id;
while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
- fail_call(exec_ctx, server, &server->requested_calls[request_id]);
+ fail_call(exec_ctx, server, rm->cq_idx,
+ &server->requested_calls[request_id]);
}
}
@@ -458,11 +460,11 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
}
static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- call_data *calld, requested_call *rc) {
+ call_data *calld, size_t cq_idx, requested_call *rc) {
grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
grpc_call *call = calld->call;
*rc->call = call;
- calld->cq_new = rc->cq_for_notification;
+ calld->cq_new = server->cqs[cq_idx];
GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
switch (rc->type) {
case BATCH_CALL:
@@ -530,7 +532,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
gpr_mu_lock(&calld->mu_state);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
+ publish_call(exec_ctx, server, calld, rm->cq_idx,
+ &server->requested_calls[request_id]);
}
}
@@ -972,8 +975,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
for (i = 0; i < (size_t)server->max_requested_calls; i++) {
gpr_stack_lockfree_push(server->request_freelist, (int)i);
}
- request_matcher_init(&server->unregistered_request_matcher,
- server->max_requested_calls, server);
server->requested_calls = gpr_malloc(server->max_requested_calls *
sizeof(*server->requested_calls));
@@ -1017,8 +1018,6 @@ void *grpc_server_register_method(
}
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
- request_matcher_init(&m->request_matcher, server->max_requested_calls,
- server);
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
@@ -1036,8 +1035,21 @@ void grpc_server_start(grpc_server *server) {
GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
+ server->unregistered_request_matchers = gpr_malloc(
+ sizeof(*server->unregistered_request_matchers) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
+ request_matcher_init(&server->unregistered_request_matchers[i],
+ server->max_requested_calls, i, server);
+ for (registered_method *rm = server->registered_methods; rm;
+ rm = rm->next) {
+ if (i == 0) {
+ rm->request_matchers =
+ gpr_malloc(sizeof(*rm->request_matchers) * server->cq_count);
+ }
+ request_matcher_init(&rm->request_matchers[i],
+ server->max_requested_calls, i, server);
+ }
}
for (l = server->listeners; l; l = l->next) {
@@ -1074,6 +1086,17 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
server_ref(s);
chand->channel = channel;
+ size_t cq_idx;
+ grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset);
+ for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
+ if (s->cqs[cq_idx] == accepting_cq) break;
+ }
+ if (cq_idx == s->cq_count) {
+ /* completion queue not found: pick a random one to publish new calls to */
+ cq_idx = (size_t)rand() % s->cq_count;
+ }
+ chand->cq_idx = cq_idx;
+
num_registered_methods = 0;
for (rm = s->registered_methods; rm; rm = rm->next) {
num_registered_methods++;
@@ -1244,27 +1267,27 @@ void grpc_server_add_listener(
}
static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
- grpc_server *server,
+ grpc_server *server, size_t cq_idx,
requested_call *rc) {
call_data *calld = NULL;
request_matcher *rm = NULL;
int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) {
- fail_call(exec_ctx, server, rc);
+ fail_call(exec_ctx, server, cq_idx, rc);
return GRPC_CALL_OK;
}
request_id = gpr_stack_lockfree_pop(server->request_freelist);
if (request_id == -1) {
/* out of request ids: just fail this one */
- fail_call(exec_ctx, server, rc);
+ fail_call(exec_ctx, server, cq_idx, rc);
return GRPC_CALL_OK;
}
switch (rc->type) {
case BATCH_CALL:
- rm = &server->unregistered_request_matcher;
+ rm = &server->unregistered_request_matchers[cq_idx];
break;
case REGISTERED_CALL:
- rm = &rc->data.registered.registered_method->request_matcher;
+ rm = &rc->data.registered.registered_method->request_matchers[cq_idx];
break;
}
server->requested_calls[request_id] = *rc;
@@ -1290,7 +1313,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
gpr_mu_unlock(&calld->mu_state);
- publish_call(exec_ctx, server, calld,
+ publish_call(exec_ctx, server, calld, cq_idx,
&server->requested_calls[request_id]);
}
gpr_mu_lock(&server->mu_call);
@@ -1314,7 +1337,13 @@ grpc_call_error grpc_server_request_call(
"cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
7, (server, call, details, initial_metadata, cq_bound_to_call,
cq_for_notification, tag));
- if (!grpc_cq_is_server_cq(cq_for_notification)) {
+ size_t cq_idx;
+ for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
+ if (server->cqs[cq_idx] == cq_for_notification) {
+ break;
+ }
+ }
+ if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
@@ -1325,11 +1354,10 @@ grpc_call_error grpc_server_request_call(
rc->server = server;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
- rc->cq_for_notification = cq_for_notification;
rc->call = call;
rc->data.batch.details = details;
rc->initial_metadata = initial_metadata;
- error = queue_call_request(&exec_ctx, server, rc);
+ error = queue_call_request(&exec_ctx, server, cq_idx, rc);
done:
grpc_exec_ctx_finish(&exec_ctx);
return error;
@@ -1351,7 +1379,14 @@ grpc_call_error grpc_server_request_registered_call(
"tag=%p)",
9, (server, rmp, call, deadline, initial_metadata, optional_payload,
cq_bound_to_call, cq_for_notification, tag));
- if (!grpc_cq_is_server_cq(cq_for_notification)) {
+
+ size_t cq_idx;
+ for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
+ if (server->cqs[cq_idx] == cq_for_notification) {
+ break;
+ }
+ }
+ if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
@@ -1367,26 +1402,25 @@ grpc_call_error grpc_server_request_registered_call(
rc->server = server;
rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call;
- rc->cq_for_notification = cq_for_notification;
rc->call = call;
rc->data.registered.registered_method = rm;
rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
- error = queue_call_request(&exec_ctx, server, rc);
+ error = queue_call_request(&exec_ctx, server, cq_idx, rc);
done:
grpc_exec_ctx_finish(&exec_ctx);
return error;
}
static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
- requested_call *rc) {
+ size_t cq_idx, requested_call *rc) {
*rc->call = NULL;
rc->initial_metadata->count = 0;
server_ref(server);
- grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0,
- done_request_event, rc, &rc->completion);
+ grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, 0, done_request_event,
+ rc, &rc->completion);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {