aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/completion_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/completion_queue.c')
-rw-r--r--src/core/lib/surface/completion_queue.c48
1 files changed, 43 insertions, 5 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index b4594817e4..aecc393edf 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -64,6 +64,10 @@ typedef struct {
struct grpc_completion_queue {
/** owned by pollset */
gpr_mu *mu;
+
+ grpc_cq_completion_type completion_type;
+ grpc_cq_polling_type polling_type;
+
/** completed events */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
@@ -79,6 +83,7 @@ struct grpc_completion_queue {
int shutdown_called;
int is_server_cq;
/** Can the server cq accept incoming channels */
+ /* TODO: sreek - This will no longer be needed. Use polling_type set */
int is_non_listening_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
@@ -110,13 +115,17 @@ int grpc_cq_event_timeout_trace;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
grpc_error *error);
-grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
+grpc_completion_queue *grpc_completion_queue_create_internal(
+ grpc_cq_completion_type completion_type,
+ grpc_cq_polling_type polling_type) {
grpc_completion_queue *cc;
- GPR_ASSERT(!reserved);
- GPR_TIMER_BEGIN("grpc_completion_queue_create", 0);
+ GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
- GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
+ GRPC_API_TRACE(
+ "grpc_completion_queue_create_internal(completion_type=%d, "
+ "polling_type=%d)",
+ 2, (completion_type, polling_type));
cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
@@ -125,6 +134,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->outstanding_tag_capacity = 0;
#endif
+ cc->completion_type = completion_type;
+ cc->polling_type = polling_type;
+
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
@@ -143,11 +155,19 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
grpc_schedule_on_exec_ctx);
- GPR_TIMER_END("grpc_completion_queue_create", 0);
+ GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
return cc;
}
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
+ return cc->completion_type;
+}
+
+grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) {
+ return cc->polling_type;
+}
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
@@ -348,6 +368,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
+ if (cc->completion_type != GRPC_CQ_NEXT) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_next() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_NEXT");
+ abort();
+ }
+
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
GRPC_API_TRACE(
@@ -517,6 +544,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
+ if (cc->completion_type != GRPC_CQ_PLUCK) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_pluck() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_PLUCK");
+ abort();
+ }
+
if (grpc_cq_pluck_trace) {
GRPC_API_TRACE(
"grpc_completion_queue_pluck("
@@ -681,10 +715,14 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
}
void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
+ /* TODO: sreek - use cc->polling_type field here and add a validation check
+ (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose
+ polling_type is set to GRPC_CQ_NON_LISTENING */
cc->is_non_listening_server_cq = 1;
}
bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
+ /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */
return (cc->is_non_listening_server_cq == 1);
}