aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/completion_queue.c
diff options
context:
space:
mode:
authorGravatar yang-g <yangg@google.com>2017-07-13 11:39:44 -0700
committerGravatar yang-g <yangg@google.com>2017-07-28 12:49:40 -0700
commit533fbd362f96c573343403e85508390d0f81a2c5 (patch)
tree3a660ff72f7a7eec7df3f586865bb4b4fb275185 /src/core/lib/surface/completion_queue.c
parent0eaf7debd2915f947ae50367b1768cfad44205dc (diff)
Rebase with head and resolve conflicts
Diffstat (limited to 'src/core/lib/surface/completion_queue.c')
-rw-r--r--src/core/lib/surface/completion_queue.c76
1 files changed, 38 insertions, 38 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index ef1dfb1eb0..98234e036a 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -196,7 +196,7 @@ typedef struct cq_vtable {
void (*init)(void *data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
void (*destroy)(void *data);
- void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ int (*begin_op)(grpc_completion_queue *cq, void *tag);
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
@@ -288,8 +288,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq);
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
+static int cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static int cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq, void *tag,
@@ -522,28 +522,55 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
+ int found = 0;
+ if (lock_cq) {
+ gpr_mu_lock(cq->mu);
+ }
+
+ for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
+ if (cq->outstanding_tags[i] == tag) {
+ cq->outstanding_tag_count--;
+ GPR_SWAP(void *, cq->outstanding_tags[i],
+ cq->outstanding_tags[cq->outstanding_tag_count]);
+ found = 1;
+ break;
+ }
+ }
+
+ if (lock_cq) {
+ gpr_mu_unlock(cq->mu);
+ }
+
+ GPR_ASSERT(found);
+}
+#else
+static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
+#endif
+
+static int cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
cq_next_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
while (true) {
- gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events.count);
+ gpr_atm count = gpr_atm_no_barrier_load(&cqd->pending_events);
if (count == 0) {
+ cq_check_tag(cq, tag, true); /* Used in debug builds only */
return 1;
- } else if (gpr_atm_no_barrier_cas(&cqd->pending_events.count, count,
- count + 1)) {
+ } else if (gpr_atm_no_barrier_cas(&cqd->pending_events, count, count + 1)) {
break;
}
}
return 0;
}
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+static int cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
GPR_ASSERT(!cqd->shutdown_called);
gpr_ref(&cqd->pending_events);
+ return 0;
}
-void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+int grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
#ifndef NDEBUG
gpr_mu_lock(cq->mu);
if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
@@ -555,35 +582,8 @@ void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
gpr_mu_unlock(cq->mu);
#endif
- cq->vtable->begin_op(cq, tag);
-}
-
-#ifndef NDEBUG
-static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
- int found = 0;
- if (lock_cq) {
- gpr_mu_lock(cq->mu);
- }
-
- for (int i = 0; i < (int)cq->outstanding_tag_count; i++) {
- if (cq->outstanding_tags[i] == tag) {
- cq->outstanding_tag_count--;
- GPR_SWAP(void *, cq->outstanding_tags[i],
- cq->outstanding_tags[cq->outstanding_tag_count]);
- found = 1;
- break;
- }
- }
-
- if (lock_cq) {
- gpr_mu_unlock(cq->mu);
- }
-
- GPR_ASSERT(found);
+ return cq->vtable->begin_op(cq, tag);
}
-#else
-static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
-#endif
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion