diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 21 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 7 | ||||
-rw-r--r-- | src/cpp/common/core_codegen.cc | 12 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 26 |
4 files changed, 49 insertions, 17 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index e17f094837..ea97a6f374 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -61,6 +61,7 @@ typedef struct { } plucker; typedef struct { + bool can_get_pollset; size_t (*size)(void); void (*init)(grpc_pollset *pollset, gpr_mu **mu); grpc_error *(*kick)(grpc_pollset *pollset, @@ -107,9 +108,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec deadline) { non_polling_poller *npp = (non_polling_poller *)pollset; + if (npp->shutdown) return GRPC_ERROR_NONE; non_polling_worker w; gpr_cv_init(&w.cv); - *worker = (grpc_pollset_worker *)&w; + if (worker != NULL) *worker = (grpc_pollset_worker *)&w; if (npp->root == NULL) { npp->root = w.next = w.prev = &w; } else { @@ -128,11 +130,11 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, } npp->root = NULL; } - w.next->prev = w.prev; - w.prev->next = w.next; } + w.next->prev = w.prev; + w.prev->next = w.next; gpr_cv_destroy(&w.cv); - *worker = NULL; + if (worker != NULL) *worker = NULL; return GRPC_ERROR_NONE; } @@ -169,21 +171,24 @@ static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx, static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { /* GRPC_CQ_DEFAULT_POLLING */ - {.size = grpc_pollset_size, + {.can_get_pollset = true, + .size = grpc_pollset_size, .init = grpc_pollset_init, .kick = grpc_pollset_kick, .work = grpc_pollset_work, .shutdown = grpc_pollset_shutdown, .destroy = grpc_pollset_destroy}, /* GRPC_CQ_NON_LISTENING */ - {.size = grpc_pollset_size, + {.can_get_pollset = true, + .size = grpc_pollset_size, .init = grpc_pollset_init, .kick = grpc_pollset_kick, .work = grpc_pollset_work, .shutdown = grpc_pollset_shutdown, .destroy = grpc_pollset_destroy}, /* GRPC_CQ_NON_POLLING */ - {.size = non_polling_poller_size, + {.can_get_pollset = false, + .size = non_polling_poller_size, .init = non_polling_poller_init, .kick = non_polling_poller_kick, .work = non_polling_poller_work, @@ -837,7 +842,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return POLLSET_FROM_CQ(cc); + return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; } grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 9496f90390..767c91a5ec 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1009,6 +1009,8 @@ void grpc_server_register_completion_queue(grpc_server *server, calls grpc_completion_queue_pluck() on server completion queues */ } + GPR_ASSERT(grpc_cq_pollset(cq)); + register_completion_queue(server, cq, false, reserved); } @@ -1102,8 +1104,9 @@ void grpc_server_start(grpc_server *server) { gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { - server->pollsets[server->pollset_count++] = - grpc_cq_pollset(server->cqs[i]); + grpc_pollset *pollset = grpc_cq_pollset(server->cqs[i]); + GPR_ASSERT(pollset); + server->pollsets[server->pollset_count++] = pollset; } server->request_freelist_per_cq[i] = gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 0dd758ec4e..8f1de222fb 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -54,6 +54,18 @@ struct grpc_byte_buffer; namespace grpc { +const grpc_completion_queue_factory* +CoreCodegen::grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes* attributes) { + return ::grpc_completion_queue_factory_lookup(attributes); +} + +grpc_completion_queue* CoreCodegen::grpc_completion_queue_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attributes, void* reserved) { + return ::grpc_completion_queue_create(factory, attributes, reserved); +} + grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next( void* reserved) { return ::grpc_completion_queue_create_for_next(reserved); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index c6784ea159..6687fe78b9 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -243,6 +243,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { sync_server_cqs(std::make_shared< std::vector<std::unique_ptr<ServerCompletionQueue>>>()); + int num_frequently_polled_cqs = 0; + for (auto it = cqs_.begin(); it != cqs_.end(); ++it) { + if ((*it)->IsFrequentlyPolled()) { + num_frequently_polled_cqs++; + } + } + + const bool is_hybrid_server = + has_sync_methods && num_frequently_polled_cqs > 0; + if (has_sync_methods) { // This is a Sync server gpr_log(GPR_INFO, @@ -253,7 +263,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { sync_server_settings_.cq_timeout_msec); grpc_cq_polling_type polling_type = - cqs_.empty() ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_POLLING; + is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING; // Create completion queues to listen to incoming rpc requests for (int i = 0; i < sync_server_settings_.num_cqs; i++) { @@ -273,12 +283,15 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { // server // 2. cqs_: Completion queues added via AddCompletionQueue() call - // All sync cqs (if any) are frequently polled by ThreadManager - int num_frequently_polled_cqs = sync_server_cqs->size(); - for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { - grpc_server_register_completion_queue(server->server_, (*it)->cq(), - nullptr); + if (is_hybrid_server) { + grpc_server_register_non_listening_completion_queue(server->server_, + (*it)->cq(), nullptr); + } else { + grpc_server_register_completion_queue(server->server_, (*it)->cq(), + nullptr); + } + num_frequently_polled_cqs++; } // cqs_ contains the completion queue added by calling the ServerBuilder's @@ -290,7 +303,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { if ((*it)->IsFrequentlyPolled()) { grpc_server_register_completion_queue(server->server_, (*it)->cq(), nullptr); - num_frequently_polled_cqs++; } else { grpc_server_register_non_listening_completion_queue(server->server_, (*it)->cq(), nullptr); |