diff options
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 33 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 18 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 5 | ||||
-rw-r--r-- | src/core/lib/surface/init_secure.c | 8 | ||||
-rw-r--r-- | src/core/lib/surface/lame_client.c | 5 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 231 | ||||
-rw-r--r-- | src/core/lib/surface/server.h | 1 |
7 files changed, 182 insertions, 119 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 69eb43c952..6fc91cb7ea 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -68,12 +68,6 @@ - status/close recv (depending on client/server) */ #define MAX_CONCURRENT_BATCHES 6 -typedef struct { - grpc_ioreq_completion_func on_complete; - void *user_data; - int success; -} completed_request; - #define MAX_SEND_EXTRA_METADATA_COUNT 3 /* Status data for a request can come from several sources; this @@ -100,25 +94,6 @@ typedef struct { grpc_mdstr *details; } received_status; -/* How far through the GRPC stream have we read? */ -typedef enum { - /* We are still waiting for initial metadata to complete */ - READ_STATE_INITIAL = 0, - /* We have gotten initial metadata, and are reading either - messages or trailing metadata */ - READ_STATE_GOT_INITIAL_METADATA, - /* The stream is closed for reading */ - READ_STATE_READ_CLOSED, - /* The stream is closed for reading & writing */ - READ_STATE_STREAM_CLOSED -} read_state; - -typedef enum { - WRITE_STATE_INITIAL = 0, - WRITE_STATE_STARTED, - WRITE_STATE_WRITE_CLOSED -} write_state; - typedef struct batch_control { grpc_call *call; grpc_cq_completion cq_completion; @@ -179,7 +154,7 @@ struct grpc_call { received_status status[STATUS_SOURCE_COUNT]; /* Call stats: only valid after trailing metadata received */ - grpc_transport_stream_stats stats; + grpc_call_stats stats; /* Compression algorithm for *incoming* data */ grpc_compression_algorithm incoming_compression_algorithm; @@ -399,7 +374,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } grpc_channel *channel = c->channel; - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c); + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->stats, c); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); GPR_TIMER_END("destroy_call", 0); } @@ -1591,7 +1566,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op.collect_stats = &call->stats; + stream_op.collect_stats = &call->stats.transport_stream_stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: /* Flag validation: currently allow no flags */ @@ -1613,7 +1588,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op.collect_stats = &call->stats; + stream_op.collect_stats = &call->stats.transport_stream_stats; break; } } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 1f82c3bad2..5eb7cf1bf4 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -70,6 +70,8 @@ struct grpc_completion_queue { int shutdown; int shutdown_called; int is_server_cq; + /** Can the server cq accept incoming channels */ + int is_non_listening_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; grpc_closure pollset_shutdown_done; @@ -84,6 +86,7 @@ struct grpc_completion_queue { }; #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) +#define CQ_FROM_POLLSET(ps) (((grpc_completion_queue *)ps) - 1) static gpr_mu g_freelist_mu; static grpc_completion_queue *g_freelist; @@ -149,6 +152,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { cc->shutdown = 0; cc->shutdown_called = 0; cc->is_server_cq = 0; + cc->is_non_listening_server_cq = 0; cc->num_pluckers = 0; #ifndef NDEBUG cc->outstanding_tag_count = 0; @@ -511,6 +515,18 @@ grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { return POLLSET_FROM_CQ(cc); } +grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { + return CQ_FROM_POLLSET(ps); +} + +void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { + cc->is_non_listening_server_cq = 1; +} + +bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { + return (cc->is_non_listening_server_cq == 1); +} + void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index eef82cf014..3d0dd13c53 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -81,7 +81,10 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, void *done_arg, grpc_cq_completion *storage); grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); +grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); +void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc); +bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c index 3fda2c9e1e..7ee7b51568 100644 --- a/src/core/lib/surface/init_secure.c +++ b/src/core/lib/surface/init_secure.c @@ -37,10 +37,10 @@ #include <string.h> #include "src/core/lib/debug/trace.h" -#include "src/core/lib/security/auth_filters.h" -#include "src/core/lib/security/credentials.h" -#include "src/core/lib/security/secure_endpoint.h" -#include "src/core/lib/security/security_connector.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/security/transport/auth_filters.h" +#include "src/core/lib/security/transport/secure_endpoint.h" +#include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/tsi/transport_security_interface.h" diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index f50ec54cea..eef862787f 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -91,8 +91,8 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { if (op->on_connectivity_state_change) { - GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); - *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; + GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN); + *op->connectivity_state = GRPC_CHANNEL_SHUTDOWN; op->on_connectivity_state_change->cb( exec_ctx, op->on_connectivity_state_change->cb_arg, 1); } @@ -108,6 +108,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) {} static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const grpc_call_stats *stats, void *and_free_memory) { gpr_free(and_free_memory); } diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 2db95b9cdf..9532e090a4 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -69,11 +69,6 @@ typedef struct call_data call_data; typedef struct channel_data channel_data; typedef struct registered_method registered_method; -typedef struct { - call_data *next; - call_data *prev; -} call_link; - typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { @@ -81,7 +76,6 @@ typedef struct requested_call { void *tag; grpc_server *server; grpc_completion_queue *cq_bound_to_call; - grpc_completion_queue *cq_for_notification; grpc_call **call; grpc_cq_completion completion; grpc_metadata_array *initial_metadata; @@ -108,6 +102,7 @@ struct channel_data { grpc_server *server; grpc_connectivity_state connectivity_state; grpc_channel *channel; + size_t cq_idx; /* linked list of all channels on a server */ channel_data *next; channel_data *prev; @@ -172,7 +167,7 @@ struct request_matcher { grpc_server *server; call_data *pending_head; call_data *pending_tail; - gpr_stack_lockfree *requests; + gpr_stack_lockfree **requests_per_cq; }; struct registered_method { @@ -180,6 +175,7 @@ struct registered_method { char *host; grpc_server_register_method_payload_handling payload_handling; uint32_t flags; + /* one request matcher per method */ request_matcher request_matcher; registered_method *next; }; @@ -195,6 +191,7 @@ struct grpc_server { grpc_completion_queue **cqs; grpc_pollset **pollsets; size_t cq_count; + bool started; /* The two following mutexes control access to server-state mu_global controls access to non-call-related state (e.g., channel state) @@ -207,6 +204,7 @@ struct grpc_server { gpr_mu mu_call; /* mutex for call-specific state */ registered_method *registered_methods; + /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; /** free list of available requested_calls indices */ gpr_stack_lockfree *request_freelist; @@ -234,7 +232,7 @@ struct grpc_server { static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *calld, bool success); static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - requested_call *rc); + size_t cq_idx, requested_call *rc); /* Before calling maybe_finish_shutdown, we must hold mu_global and not hold mu_call */ static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server); @@ -312,12 +310,19 @@ static void request_matcher_init(request_matcher *rm, size_t entries, grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; - rm->requests = gpr_stack_lockfree_create(entries); + rm->requests_per_cq = + gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); + for (size_t i = 0; i < server->cq_count; i++) { + rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); + } } static void request_matcher_destroy(request_matcher *rm) { - GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1); - gpr_stack_lockfree_destroy(rm->requests); + for (size_t i = 0; i < rm->server->cq_count; i++) { + GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); + gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); + } + gpr_free(rm->requests_per_cq); } static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) { @@ -343,8 +348,11 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, request_matcher *rm) { int request_id; - while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) { - fail_call(exec_ctx, server, &server->requested_calls[request_id]); + for (size_t i = 0; i < server->cq_count; i++) { + while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != + -1) { + fail_call(exec_ctx, server, i, &server->requested_calls[request_id]); + } } } @@ -364,15 +372,19 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { gpr_mu_destroy(&server->mu_call); while ((rm = server->registered_methods) != NULL) { server->registered_methods = rm->next; - request_matcher_destroy(&rm->request_matcher); + if (server->started) { + request_matcher_destroy(&rm->request_matcher); + } gpr_free(rm->method); gpr_free(rm->host); gpr_free(rm); } + if (server->started) { + request_matcher_destroy(&server->unregistered_request_matcher); + } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); } - request_matcher_destroy(&server->unregistered_request_matcher); gpr_stack_lockfree_destroy(server->request_freelist); gpr_free(server->cqs); gpr_free(server->pollsets); @@ -453,11 +465,11 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - call_data *calld, requested_call *rc) { + call_data *calld, size_t cq_idx, requested_call *rc) { grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call); grpc_call *call = calld->call; *rc->call = call; - calld->cq_new = rc->cq_for_notification; + calld->cq_new = server->cqs[cq_idx]; GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata); switch (rc->type) { case BATCH_CALL: @@ -492,7 +504,9 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, } static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { - call_data *calld = arg; + grpc_call_element *call_elem = arg; + call_data *calld = call_elem->call_data; + channel_data *chand = call_elem->channel_data; request_matcher *rm = calld->request_matcher; grpc_server *server = rm->server; @@ -507,26 +521,34 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, bool success) { return; } - int request_id = gpr_stack_lockfree_pop(rm->requests); - if (request_id == -1) { - gpr_mu_lock(&server->mu_call); - gpr_mu_lock(&calld->mu_state); - calld->state = PENDING; - gpr_mu_unlock(&calld->mu_state); - if (rm->pending_head == NULL) { - rm->pending_tail = rm->pending_head = calld; + for (size_t i = 0; i < server->cq_count; i++) { + size_t cq_idx = (chand->cq_idx + i) % server->cq_count; + int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) { + continue; } else { - rm->pending_tail->pending_next = calld; - rm->pending_tail = calld; + gpr_mu_lock(&calld->mu_state); + calld->state = ACTIVATED; + gpr_mu_unlock(&calld->mu_state); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls[request_id]); + return; /* early out */ } - calld->pending_next = NULL; - gpr_mu_unlock(&server->mu_call); + } + + /* no cq to take the request found: queue it on the slow list */ + gpr_mu_lock(&server->mu_call); + gpr_mu_lock(&calld->mu_state); + calld->state = PENDING; + gpr_mu_unlock(&calld->mu_state); + if (rm->pending_head == NULL) { + rm->pending_tail = rm->pending_head = calld; } else { - gpr_mu_lock(&calld->mu_state); - calld->state = ACTIVATED; - gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, &server->requested_calls[request_id]); + rm->pending_tail->pending_next = calld; + rm->pending_tail = calld; } + calld->pending_next = NULL; + gpr_mu_unlock(&server->mu_call); } static void finish_start_new_rpc( @@ -548,14 +570,14 @@ static void finish_start_new_rpc( switch (payload_handling) { case GRPC_SRM_PAYLOAD_NONE: - publish_new_rpc(exec_ctx, calld, true); + publish_new_rpc(exec_ctx, elem, true); break; case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: { grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; op.data.recv_message = &calld->payload; - grpc_closure_init(&calld->publish, publish_new_rpc, calld); + grpc_closure_init(&calld->publish, publish_new_rpc, elem); grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, &calld->publish); break; @@ -637,14 +659,16 @@ static int num_channels(grpc_server *server) { static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx, grpc_server *server) { - registered_method *rm; - request_matcher_kill_requests(exec_ctx, server, - &server->unregistered_request_matcher); - request_matcher_zombify_all_pending_calls( - exec_ctx, &server->unregistered_request_matcher); - for (rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher); - request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); + if (server->started) { + request_matcher_kill_requests(exec_ctx, server, + &server->unregistered_request_matcher); + request_matcher_zombify_all_pending_calls( + exec_ctx, &server->unregistered_request_matcher); + for (registered_method *rm = server->registered_methods; rm; + rm = rm->next) { + request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher); + request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); + } } } @@ -788,7 +812,7 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd, bool iomgr_status_ignored) { channel_data *chand = cd; grpc_server *server = chand->server; - if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) { + if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_transport_op op; memset(&op, 0, sizeof(op)); op.on_connectivity_state_change = &chand->channel_connectivity_changed, @@ -821,7 +845,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, } static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - void *ignored) { + const grpc_call_stats *stats, void *ignored) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; @@ -895,25 +919,46 @@ const grpc_channel_filter grpc_server_top_filter = { "server", }; -void grpc_server_register_completion_queue(grpc_server *server, - grpc_completion_queue *cq, - void *reserved) { +static void register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + bool is_non_listening, void *reserved) { size_t i, n; - GRPC_API_TRACE( - "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, - (server, cq, reserved)); GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { if (server->cqs[i] == cq) return; } - GRPC_CQ_INTERNAL_REF(cq, "server"); + grpc_cq_mark_server_cq(cq); + + if (is_non_listening) { + grpc_cq_mark_non_listening_server_cq(cq); + } + + GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue *)); server->cqs[n] = cq; } +void grpc_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq, + void *reserved) { + GRPC_API_TRACE( + "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, + (server, cq, reserved)); + register_completion_queue(server, cq, false, reserved); +} + +void grpc_server_register_non_listening_completion_queue( + grpc_server *server, grpc_completion_queue *cq, void *reserved) { + GRPC_API_TRACE( + "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " + "reserved=%p)", + 3, (server, cq, reserved)); + register_completion_queue(server, cq, true, reserved); +} + grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { size_t i; @@ -940,8 +985,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { for (i = 0; i < (size_t)server->max_requested_calls; i++) { gpr_stack_lockfree_push(server->request_freelist, (int)i); } - request_matcher_init(&server->unregistered_request_matcher, - server->max_requested_calls, server); server->requested_calls = gpr_malloc(server->max_requested_calls * sizeof(*server->requested_calls)); @@ -985,8 +1028,6 @@ void *grpc_server_register_method( } m = gpr_malloc(sizeof(registered_method)); memset(m, 0, sizeof(*m)); - request_matcher_init(&m->request_matcher, server->max_requested_calls, - server); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -1003,13 +1044,23 @@ void grpc_server_start(grpc_server *server) { GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server)); + server->started = true; + size_t pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); for (i = 0; i < server->cq_count; i++) { - server->pollsets[i] = grpc_cq_pollset(server->cqs[i]); + if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { + server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]); + } + } + request_matcher_init(&server->unregistered_request_matcher, + server->max_requested_calls, server); + for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { + request_matcher_init(&rm->request_matcher, server->max_requested_calls, + server); } for (l = server->listeners; l; l = l->next) { - l->start(&exec_ctx, server, l->arg, server->pollsets, server->cq_count); + l->start(&exec_ctx, server, l->arg, server->pollsets, pollset_count); } grpc_exec_ctx_finish(&exec_ctx); @@ -1017,8 +1068,8 @@ void grpc_server_start(grpc_server *server) { void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, grpc_transport *transport, + grpc_pollset *accepting_pollset, const grpc_channel_args *args) { - size_t i; size_t num_registered_methods; size_t alloc; registered_method *rm; @@ -1033,12 +1084,6 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, uint32_t max_probes = 0; grpc_transport_op op; - for (i = 0; i < s->cq_count; i++) { - memset(&op, 0, sizeof(op)); - op.bind_pollset = grpc_cq_pollset(s->cqs[i]); - grpc_transport_perform_op(exec_ctx, transport, &op); - } - channel = grpc_channel_create(exec_ctx, NULL, args, GRPC_SERVER_CHANNEL, transport); chand = (channel_data *)grpc_channel_stack_element( @@ -1048,6 +1093,17 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, server_ref(s); chand->channel = channel; + size_t cq_idx; + grpc_completion_queue *accepting_cq = grpc_cq_from_pollset(accepting_pollset); + for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) { + if (s->cqs[cq_idx] == accepting_cq) break; + } + if (cq_idx == s->cq_count) { + /* completion queue not found: pick a random one to publish new calls to */ + cq_idx = (size_t)rand() % s->cq_count; + } + chand->cq_idx = cq_idx; + num_registered_methods = 0; for (rm = s->registered_methods; rm; rm = rm->next) { num_registered_methods++; @@ -1218,19 +1274,19 @@ void grpc_server_add_listener( } static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, - grpc_server *server, + grpc_server *server, size_t cq_idx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { - fail_call(exec_ctx, server, rc); + fail_call(exec_ctx, server, cq_idx, rc); return GRPC_CALL_OK; } request_id = gpr_stack_lockfree_pop(server->request_freelist); if (request_id == -1) { /* out of request ids: just fail this one */ - fail_call(exec_ctx, server, rc); + fail_call(exec_ctx, server, cq_idx, rc); return GRPC_CALL_OK; } switch (rc->type) { @@ -1243,12 +1299,12 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, } server->requested_calls[request_id] = *rc; gpr_free(rc); - if (gpr_stack_lockfree_push(rm->requests, request_id)) { + if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - request_id = gpr_stack_lockfree_pop(rm->requests); + request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); if (request_id == -1) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); @@ -1264,7 +1320,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, + publish_call(exec_ctx, server, calld, cq_idx, &server->requested_calls[request_id]); } gpr_mu_lock(&server->mu_call); @@ -1288,7 +1344,13 @@ grpc_call_error grpc_server_request_call( "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)", 7, (server, call, details, initial_metadata, cq_bound_to_call, cq_for_notification, tag)); - if (!grpc_cq_is_server_cq(cq_for_notification)) { + size_t cq_idx; + for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { + if (server->cqs[cq_idx] == cq_for_notification) { + break; + } + } + if (cq_idx == server->cq_count) { gpr_free(rc); error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; @@ -1299,11 +1361,10 @@ grpc_call_error grpc_server_request_call( rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; - rc->cq_for_notification = cq_for_notification; rc->call = call; rc->data.batch.details = details; rc->initial_metadata = initial_metadata; - error = queue_call_request(&exec_ctx, server, rc); + error = queue_call_request(&exec_ctx, server, cq_idx, rc); done: grpc_exec_ctx_finish(&exec_ctx); return error; @@ -1325,7 +1386,14 @@ grpc_call_error grpc_server_request_registered_call( "tag=%p)", 9, (server, rmp, call, deadline, initial_metadata, optional_payload, cq_bound_to_call, cq_for_notification, tag)); - if (!grpc_cq_is_server_cq(cq_for_notification)) { + + size_t cq_idx; + for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { + if (server->cqs[cq_idx] == cq_for_notification) { + break; + } + } + if (cq_idx == server->cq_count) { gpr_free(rc); error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; goto done; @@ -1341,26 +1409,25 @@ grpc_call_error grpc_server_request_registered_call( rc->server = server; rc->tag = tag; rc->cq_bound_to_call = cq_bound_to_call; - rc->cq_for_notification = cq_for_notification; rc->call = call; rc->data.registered.registered_method = rm; rc->data.registered.deadline = deadline; rc->initial_metadata = initial_metadata; rc->data.registered.optional_payload = optional_payload; - error = queue_call_request(&exec_ctx, server, rc); + error = queue_call_request(&exec_ctx, server, cq_idx, rc); done: grpc_exec_ctx_finish(&exec_ctx); return error; } static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, - requested_call *rc) { + size_t cq_idx, requested_call *rc) { *rc->call = NULL; rc->initial_metadata->count = 0; server_ref(server); - grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0, - done_request_event, rc, &rc->completion); + grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, 0, done_request_event, + rc, &rc->completion); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { diff --git a/src/core/lib/surface/server.h b/src/core/lib/surface/server.h index 470ef23c69..fb6e4d60c5 100644 --- a/src/core/lib/surface/server.h +++ b/src/core/lib/surface/server.h @@ -53,6 +53,7 @@ void grpc_server_add_listener( server */ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_transport *transport, + grpc_pollset *accepting_pollset, const grpc_channel_args *args); const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server); |