aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/surface/completion_queue.c21
-rw-r--r--src/core/lib/surface/server.c7
-rw-r--r--src/cpp/common/core_codegen.cc12
-rw-r--r--src/cpp/server/server_builder.cc26
4 files changed, 49 insertions, 17 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index e17f094837..ea97a6f374 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -61,6 +61,7 @@ typedef struct {
} plucker;
typedef struct {
+ bool can_get_pollset;
size_t (*size)(void);
void (*init)(grpc_pollset *pollset, gpr_mu **mu);
grpc_error *(*kick)(grpc_pollset *pollset,
@@ -107,9 +108,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
gpr_timespec now,
gpr_timespec deadline) {
non_polling_poller *npp = (non_polling_poller *)pollset;
+ if (npp->shutdown) return GRPC_ERROR_NONE;
non_polling_worker w;
gpr_cv_init(&w.cv);
- *worker = (grpc_pollset_worker *)&w;
+ if (worker != NULL) *worker = (grpc_pollset_worker *)&w;
if (npp->root == NULL) {
npp->root = w.next = w.prev = &w;
} else {
@@ -128,11 +130,11 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
}
npp->root = NULL;
}
- w.next->prev = w.prev;
- w.prev->next = w.next;
}
+ w.next->prev = w.prev;
+ w.prev->next = w.next;
gpr_cv_destroy(&w.cv);
- *worker = NULL;
+ if (worker != NULL) *worker = NULL;
return GRPC_ERROR_NONE;
}
@@ -169,21 +171,24 @@ static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
/* GRPC_CQ_DEFAULT_POLLING */
- {.size = grpc_pollset_size,
+ {.can_get_pollset = true,
+ .size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
.work = grpc_pollset_work,
.shutdown = grpc_pollset_shutdown,
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_LISTENING */
- {.size = grpc_pollset_size,
+ {.can_get_pollset = true,
+ .size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
.work = grpc_pollset_work,
.shutdown = grpc_pollset_shutdown,
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_POLLING */
- {.size = non_polling_poller_size,
+ {.can_get_pollset = false,
+ .size = non_polling_poller_size,
.init = non_polling_poller_init,
.kick = non_polling_poller_kick,
.work = non_polling_poller_work,
@@ -837,7 +842,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return POLLSET_FROM_CQ(cc);
+ return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
}
grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 9496f90390..767c91a5ec 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1009,6 +1009,8 @@ void grpc_server_register_completion_queue(grpc_server *server,
calls grpc_completion_queue_pluck() on server completion queues */
}
+ GPR_ASSERT(grpc_cq_pollset(cq));
+
register_completion_queue(server, cq, false, reserved);
}
@@ -1102,8 +1104,9 @@ void grpc_server_start(grpc_server *server) {
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
- server->pollsets[server->pollset_count++] =
- grpc_cq_pollset(server->cqs[i]);
+ grpc_pollset *pollset = grpc_cq_pollset(server->cqs[i]);
+ GPR_ASSERT(pollset);
+ server->pollsets[server->pollset_count++] = pollset;
}
server->request_freelist_per_cq[i] =
gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index 0dd758ec4e..8f1de222fb 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -54,6 +54,18 @@ struct grpc_byte_buffer;
namespace grpc {
+const grpc_completion_queue_factory*
+CoreCodegen::grpc_completion_queue_factory_lookup(
+ const grpc_completion_queue_attributes* attributes) {
+ return ::grpc_completion_queue_factory_lookup(attributes);
+}
+
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
+ const grpc_completion_queue_factory* factory,
+ const grpc_completion_queue_attributes* attributes, void* reserved) {
+ return ::grpc_completion_queue_create(factory, attributes, reserved);
+}
+
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next(
void* reserved) {
return ::grpc_completion_queue_create_for_next(reserved);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index c6784ea159..6687fe78b9 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -243,6 +243,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_cqs(std::make_shared<
std::vector<std::unique_ptr<ServerCompletionQueue>>>());
+ int num_frequently_polled_cqs = 0;
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ if ((*it)->IsFrequentlyPolled()) {
+ num_frequently_polled_cqs++;
+ }
+ }
+
+ const bool is_hybrid_server =
+ has_sync_methods && num_frequently_polled_cqs > 0;
+
if (has_sync_methods) {
// This is a Sync server
gpr_log(GPR_INFO,
@@ -253,7 +263,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_settings_.cq_timeout_msec);
grpc_cq_polling_type polling_type =
- cqs_.empty() ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_POLLING;
+ is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
// Create completion queues to listen to incoming rpc requests
for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
@@ -273,12 +283,15 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// server
// 2. cqs_: Completion queues added via AddCompletionQueue() call
- // All sync cqs (if any) are frequently polled by ThreadManager
- int num_frequently_polled_cqs = sync_server_cqs->size();
-
for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
- grpc_server_register_completion_queue(server->server_, (*it)->cq(),
- nullptr);
+ if (is_hybrid_server) {
+ grpc_server_register_non_listening_completion_queue(server->server_,
+ (*it)->cq(), nullptr);
+ } else {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+ nullptr);
+ }
+ num_frequently_polled_cqs++;
}
// cqs_ contains the completion queue added by calling the ServerBuilder's
@@ -290,7 +303,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
if ((*it)->IsFrequentlyPolled()) {
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
- num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
(*it)->cq(), nullptr);