diff options
author | Craig Tiller <ctiller@google.com> | 2016-05-24 06:55:28 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-05-24 06:55:28 -0700 |
commit | 48abdde19722a9d24d4f590b9d197db29f498ed1 (patch) | |
tree | 97f138ce898ff271d2ee4606996c446fb3648daa /src/core/lib/surface | |
parent | 77758f5cb9f913a891be231f24eaa8306d5047b3 (diff) | |
parent | 6db0232c3abe800a302d5cc889e4cadf060e221c (diff) |
Merge github.com:grpc/grpc into error
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 18 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 5 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 231 | ||||
-rw-r--r-- | src/core/lib/surface/server.h | 1 |
4 files changed, 172 insertions, 83 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index c1a80a7ba7..ecc127876b 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -70,6 +70,8 @@ struct grpc_completion_queue { int shutdown; int shutdown_called; int is_server_cq; + /** Can the server cq accept incoming channels */ + int is_non_listening_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; grpc_closure pollset_shutdown_done; @@ -84,6 +86,7 @@ struct grpc_completion_queue { }; #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) +#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1) static gpr_mu g_freelist_mu; static grpc_completion_queue *g_freelist; @@ -149,6 +152,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { cc->shutdown = 0; cc->shutdown_called = 0; cc->is_server_cq = 0; + cc->is_non_listening_server_cq = 0; cc->num_pluckers = 0; #ifndef NDEBUG cc->outstanding_tag_count = 0; @@ -545,6 +549,18 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { return POLLSET_FROM_CQ(cc); } +grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { + return CQ_FROM_POLLSET(ps); +} + +void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { + cc->is_non_listening_server_cq = 1; +} + +bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { + return (cc->is_non_listening_server_cq == 1); +} + void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 9da660ca8b..a866858cb5 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -81,7 +81,10 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); +grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); +void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc); +bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 9710f9ebd5..f261321750 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -69,11 +69,6 @@ typedef struct call_data call_data; typedef struct channel_data channel_data; typedef struct registered_method registered_method; -typedef struct { - call_data *next; - call_data *prev; -} call_link; - typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { @@ -81,7 +76,6 @@ typedef struct requested_call { void *tag; grpc_server *server; grpc_completion_queue *cq_bound_to_call; - grpc_completion_queue *cq_for_notification; grpc_call **call; grpc_cq_completion completion; grpc_metadata_array *initial_metadata; @@ -108,6 +102,7 @@ struct channel_data { grpc_server *server; grpc_connectivity_state connectivity_state; grpc_channel *channel; + size_t cq_idx; /* linked list of all channels on a server */ channel_data *next; channel_data *prev; @@ -172,7 +167,7 @@ struct request_matcher { grpc_server *server; call_data *pending_head; call_data *pending_tail; - gpr_stack_lockfree *requests; + gpr_stack_lockfree **requests_per_cq; }; struct registered_method { @@ -180,6 +175,7 @@ struct registered_method { char *host; grpc_server_register_method_payload_handling payload_handling; uint32_t flags; + /* one request matcher per method */ request_matcher request_matcher; registered_method *next; }; @@ -195,6 +191,7 @@ struct grpc_server { grpc_completion_queue **cqs; grpc_pollset **pollsets; size_t cq_count; + bool started; /* The two following mutexes control access to server-state mu_global controls access to non-call-related state (e.g., channel state) @@ -207,6 +204,7 @@ struct grpc_server { gpr_mu mu_call; /* mutex for call-specific state */ registered_method *registered_methods; + /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; /** free list of available requested_calls indices */ gpr_stack_lockfree *request_freelist; @@ -235,7 +233,7 @@ struct grpc_server { static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, grpc_error *error); static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - requested_call *rc, grpc_error *error); + size_t cq_idx, requested_call *rc, grpc_error *error); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server); @@ -315,12 +313,19 @@ static void request_matcher_init(request_matcher *rm, size_t entries, grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; - rm->requests = gpr_stack_lockfree_create(entries); + rm->requests_per_cq = + gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); + for (size_t i = 0; i < server->cq_count; i++) { + rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); + } } static void request_matcher_destroy(request_matcher *rm) { - GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1); - gpr_stack_lockfree_destroy(rm->requests); + for (size_t i = 0; i < rm->server->cq_count; i++) { + GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); + gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); + } + gpr_free(rm->requests_per_cq); } static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, @@ -349,9 +354,12 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, request_matcher *rm, grpc_error *error) { int request_id; - while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { - fail_call(exec_ctx, server, &server->requested_calls[request_id], - GRPC_ERROR_REF(error)); + for (size_t i = 0; i < server->cq_count; i++) { + while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != + -1) { + fail_call(exec_ctx, server, i, &server->requested_calls[request_id], + GRPC_ERROR_REF(error)); + } } GRPC_ERROR_UNREF(error); } @@ -372,15 +380,19 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { gpr_mu_destroy(&server->mu_call); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; - request_matcher_destroy(&rm->request_matcher); + if (server->started) { + request_matcher_destroy(&rm->request_matcher); + } gpr_free(rm->method); gpr_free(rm->host); gpr_free(rm); } + if (server->started) { + request_matcher_destroy(&server->unregistered_request_matcher); + } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); } - request_matcher_destroy(&server->unregistered_request_matcher); gpr_stack_lockfree_destroy(server->request_freelist); gpr_free(server->cqs); gpr_free(server->pollsets); @@ -461,11 +473,11 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - call_data *calld, requested_call *rc) { + call_data *calld, size_t cq_idx, requested_call *rc) { grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); grpc_call *call = calld->call; *rc->call = call; - calld->cq_new = rc->cq_for_notification; + calld->cq_new = server->cqs[cq_idx]; GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata); switch (rc->type) { case BATCH_CALL: @@ -501,7 +513,9 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - call_data *calld = arg; + grpc_call_element *call_elem = arg; + call_data *calld = call_elem->call_data; + channel_data *chand = call_elem->channel_data; request_matcher *rm = calld->request_matcher; grpc_server *server = rm->server; @@ -516,26 +530,34 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, return; } - int request_id = gpr_stack_lockfree_pop(rm->requests); - if (request_id == -1) { - gpr_mu_lock(&server->mu_call); - gpr_mu_lock(&calld->mu_state); - calld->state = PENDING; - gpr_mu_unlock(&calld->mu_state); - if (rm->pending_head == NULL) { - rm->pending_tail = rm->pending_head = calld; + for (size_t i = 0; i < server->cq_count; i++) { + size_t cq_idx = (chand->cq_idx + i) % server->cq_count; + int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) { + continue; } else { - rm->pending_tail->pending_next = calld; - rm->pending_tail = calld; + gpr_mu_lock(&calld->mu_state); + calld->state = ACTIVATED; + gpr_mu_unlock(&calld->mu_state); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls[request_id]); + return; /* early out */ } - calld->pending_next = NULL; - gpr_mu_unlock(&server->mu_call); + } + + /* no cq to take the request found: queue it on the slow list */ + gpr_mu_lock(&server->mu_call); + gpr_mu_lock(&calld->mu_state); + calld->state = PENDING; + gpr_mu_unlock(&calld->mu_state); + if (rm->pending_head == NULL) { + rm->pending_tail = rm->pending_head = calld; } else { - gpr_mu_lock(&calld->mu_state); - calld->state = ACTIVATED; - gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]); + rm->pending_tail->pending_next = calld; + rm->pending_tail = calld; } + calld->pending_next = NULL; + gpr_mu_unlock(&server->mu_call); } static void finish_start_new_rpc( @@ -558,14 +580,14 @@ static void finish_start_new_rpc( switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: - publish_new_rpc(exec_ctx, calld, GRPC_ERROR_NONE); + publish_new_rpc(exec_ctx, elem, GRPC_ERROR_NONE); break; case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; op.data.recv_message = &calld->payload; - grpc_closure_init(&calld->publish, publish_new_rpc, calld); + grpc_closure_init(&calld->publish, publish_new_rpc, elem); grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, &calld->publish); break; @@ -647,16 +669,18 @@ static int num_channels(grpc_server *server) { static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_error *error) { - registered_method *rm; - request_matcher_kill_requests(exec_ctx, server, - &server->unregistered_request_matcher, - GRPC_ERROR_REF(error)); - request_matcher_zombify_all_pending_calls( - exec_ctx, &server->unregistered_request_matcher); - for (rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher, + if (server->started) { + request_matcher_kill_requests(exec_ctx, server, + &server->unregistered_request_matcher, GRPC_ERROR_REF(error)); - request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); + request_matcher_zombify_all_pending_calls( + exec_ctx, &server->unregistered_request_matcher); + for (registered_method *rm = server->registered_methods; rm; + rm = rm->next) { + request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher, + GRPC_ERROR_REF(error)); + request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); + } } GRPC_ERROR_UNREF(error); } @@ -914,25 +938,46 @@ const grpc_channel_filter grpc_server_top_filter = { "server", }; -void grpc_server_register_completion_queue(grpc_server *server, - grpc_completion_queue *cq, - void *reserved) { +static void register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + bool is_non_listening, void *reserved) { size_t i, n; - GRPC_API_TRACE( - "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, - (server, cq, reserved)); GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } - GRPC_CQ_INTERNAL_REF(cq, "server"); + grpc_cq_mark_server_cq(cq); + + if (is_non_listening) { + grpc_cq_mark_non_listening_server_cq(cq); + } + + GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); server->cqs[n] = cq; } +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + void *reserved) { + GRPC_API_TRACE( + "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, + (server, cq, reserved)); + register_completion_queue(server, cq, false, reserved); +} + +void grpc_server_register_non_listening_completion_queue( + grpc_server *server, grpc_completion_queue *cq, void *reserved) { + GRPC_API_TRACE( + "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " + "reserved=%p)", + 3, (server, cq, reserved)); + register_completion_queue(server, cq, true, reserved); +} + grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { size_t i; @@ -959,8 +1004,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { for (i = 0; i < (size_t)server->max_requested_calls; i++) { gpr_stack_lockfree_push(server->request_freelist, (int)i); } - request_matcher_init(&server->unregistered_request_matcher, - server->max_requested_calls, server); server->requested_calls = gpr_malloc(server->max_requested_calls * sizeof(*server->requested_calls)); @@ -1004,8 +1047,6 @@ void *grpc_server_register_method( } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); - request_matcher_init(&m->request_matcher, server->max_requested_calls, - server); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -1022,13 +1063,23 @@ void grpc_server_start(grpc_server *server) { GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server)); + server->started = true; + size_t pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); for (i = 0; i < server->cq_count; i++) { - server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); + if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { + server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]); + } + } + request_matcher_init(&server->unregistered_request_matcher, + server->max_requested_calls, server); + for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { + request_matcher_init(&rm->request_matcher, server->max_requested_calls, + server); } for (l = server->listeners; l; l = l->next) { - l->start(&exec_ctx, server, l->arg, server->pollsets, server->cq_count); + l->start(&exec_ctx, server, l->arg, server->pollsets, pollset_count); } grpc_exec_ctx_finish(&exec_ctx); @@ -1036,8 +1087,8 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, grpc_transport *transport, + grpc_pollset *accepting_pollset, const grpc_channel_args *args) { - size_t i; size_t num_registered_methods; size_t alloc; registered_method *rm; @@ -1052,12 +1103,6 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, uint32_t max_probes = 0; grpc_transport_op op; - for (i = 0; i < s->cq_count; i++) { - memset(&op, 0, sizeof(op)); - op.bind_pollset = grpc_cq_pollset(s->cqs[i]); - grpc_transport_perform_op(exec_ctx, transport, &op); - } - channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); chand = (channel_data *)grpc_channel_stack_element( @@ -1067,6 +1112,17 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, server_ref(s); chand->channel = channel; + size_t cq_idx; + grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset); + for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { + if (s->cqs[cq_idx] == accepting_cq) break; + } + if (cq_idx == s->cq_count) { + /* completion queue not found: pick a random one to publish new calls to */ + cq_idx = (size_t)rand() % s->cq_count; + } + chand->cq_idx = cq_idx; + num_registered_methods = 0; for (rm = s->registered_methods; rm; rm = rm->next) { num_registered_methods++; @@ -1241,19 +1297,21 @@ void grpc_server_add_listener( } static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, - grpc_server *server, + grpc_server *server, size_t cq_idx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(exec_ctx, server, rc, GRPC_ERROR_CREATE("Server Shutdown")); + fail_call(exec_ctx, server, cq_idx, rc, + GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist); if (request_id == -1) { /* out of request ids: just fail this one */ - fail_call(exec_ctx, server, rc, GRPC_ERROR_CREATE("Server Shutdown")); + fail_call(exec_ctx, server, cq_idx, rc, + GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } switch (rc->type) { @@ -1266,12 +1324,12 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, } server->requested_calls[request_id] = *rc; gpr_free(rc); - if (gpr_stack_lockfree_push(rm->requests, request_id)) { + if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - request_id = gpr_stack_lockfree_pop(rm->requests); + request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); if (request_id == -1) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); @@ -1287,7 +1345,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, + publish_call(exec_ctx, server, calld, cq_idx, &server->requested_calls[request_id]); } gpr_mu_lock(&server->mu_call); @@ -1311,7 +1369,13 @@ grpc_call_error grpc_server_request_call( "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)", 7, (server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag)); - if (!grpc_cq_is_server_cq(cq_for_notification)) { + size_t cq_idx; + for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { + if (server->cqs[cq_idx] == cq_for_notification) { + break; + } + } + if (cq_idx == server->cq_count) { gpr_free(rc); error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; @@ -1322,11 +1386,10 @@ grpc_call_error grpc_server_request_call( rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; - rc->cq_for_notification = cq_for_notification; rc->call = call; rc->data.batch.details = details; rc->initial_metadata = initial_metadata; - error = queue_call_request(&exec_ctx, server, rc); + error = queue_call_request(&exec_ctx, server, cq_idx, rc); done: grpc_exec_ctx_finish(&exec_ctx); return error; @@ -1348,7 +1411,14 @@ grpc_call_error grpc_server_request_registered_call( "tag=%p)", 9, (server, rmp, call, deadline, initial_metadata, optional_payload, cq_bound_to_call, cq_for_notification, tag)); - if (!grpc_cq_is_server_cq(cq_for_notification)) { + + size_t cq_idx; + for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { + if (server->cqs[cq_idx] == cq_for_notification) { + break; + } + } + if (cq_idx == server->cq_count) { gpr_free(rc); error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; @@ -1364,26 +1434,25 @@ grpc_call_error grpc_server_request_registered_call( rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; - rc->cq_for_notification = cq_for_notification; rc->call = call; rc->data.registered.registered_method = rm; rc->data.registered.deadline = deadline; rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; - error = queue_call_request(&exec_ctx, server, rc); + error = queue_call_request(&exec_ctx, server, cq_idx, rc); done: grpc_exec_ctx_finish(&exec_ctx); return error; } static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - requested_call *rc, grpc_error *error) { + size_t cq_idx, requested_call *rc, grpc_error *error) { *rc->call = NULL; rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); server_ref(server); - grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, error, + grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, done_request_event, rc, &rc->completion); } diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 470ef23c69..fb6e4d60c5 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -53,6 +53,7 @@ void grpc_server_add_listener( server */ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_transport *transport, + grpc_pollset *accepting_pollset, const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); |