aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/surface/completion_queue.c27
-rw-r--r--src/core/lib/surface/completion_queue.h3
-rw-r--r--src/core/lib/surface/server.c18
3 files changed, 41 insertions, 7 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 59148fa6a5..d45e265d5c 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -115,7 +115,7 @@ int grpc_cq_event_timeout_trace;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
grpc_error *error);
-grpc_completion_queue *grpc_completion_queue_create_ex(
+grpc_completion_queue *grpc_completion_queue_create(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
void *reserved) {
grpc_completion_queue *cc;
@@ -132,6 +132,9 @@ grpc_completion_queue *grpc_completion_queue_create_ex(
cc->outstanding_tag_capacity = 0;
#endif
+ cc->completion_type = completion_type;
+ cc->polling_type = polling_type;
+
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
@@ -155,8 +158,12 @@ grpc_completion_queue *grpc_completion_queue_create_ex(
return cc;
}
-grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
- return grpc_completion_queue_create_ex(0, DEFAULT_POLLING, reserved);
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
+ return cc->completion_type;
+}
+
+grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) {
+ return cc->polling_type;
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
@@ -359,6 +366,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
+ if (cc->completion_type != GRPC_CQ_NEXT) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_next() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_NEXT");
+ abort();
+ }
+
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
GRPC_API_TRACE(
@@ -529,6 +543,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
+ if (cc->completion_type != GRPC_CQ_PLUCK) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_pluck() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_PLUCK");
+ abort();
+ }
+
if (grpc_cq_pluck_trace) {
GRPC_API_TRACE(
"grpc_completion_queue_pluck("
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 5d73dd7216..21f28e9424 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -99,4 +99,7 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
+grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc);
+
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index b360579553..13f7321fac 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1001,6 +1001,14 @@ void grpc_server_register_completion_queue(grpc_server *server,
GRPC_API_TRACE(
"grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
(server, cq, reserved));
+
+ if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) {
+ gpr_log(
+ GPR_ERROR,
+ "Server completion queues must have a completion type of GRPC_CQ_NEXT");
+ abort();
+ }
+
register_completion_queue(server, cq, false, reserved);
}
@@ -1423,8 +1431,9 @@ grpc_call_error grpc_server_request_call(
"grpc_server_request_call("
"server=%p, call=%p, details=%p, initial_metadata=%p, "
"cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
- 7, (server, call, details, initial_metadata, cq_bound_to_call,
- cq_for_notification, tag));
+ 7,
+ (server, call, details, initial_metadata, cq_bound_to_call,
+ cq_for_notification, tag));
size_t cq_idx;
for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
if (server->cqs[cq_idx] == cq_for_notification) {
@@ -1466,8 +1475,9 @@ grpc_call_error grpc_server_request_registered_call(
"server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
"optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
"tag=%p)",
- 9, (server, rmp, call, deadline, initial_metadata, optional_payload,
- cq_bound_to_call, cq_for_notification, tag));
+ 9,
+ (server, rmp, call, deadline, initial_metadata, optional_payload,
+ cq_bound_to_call, cq_for_notification, tag));
size_t cq_idx;
for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {