aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c42
1 files changed, 37 insertions, 5 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index a28a542c8d..445111ca40 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -162,6 +162,8 @@ struct grpc_call {
gpr_uint8 error_status_set;
/** should the alarm be cancelled */
gpr_uint8 cancel_alarm;
+ /** bitmask of allocated completion events in completions */
+ gpr_uint8 allocated_completions;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
@@ -250,6 +252,9 @@ struct grpc_call {
grpc_iomgr_closure on_done_recv;
grpc_iomgr_closure on_done_send;
grpc_iomgr_closure on_done_bind;
+
+ /** completion events - for completion queue use */
+ grpc_cq_completion completions[6];
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -349,6 +354,27 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
return call->cq;
}
+grpc_cq_completion *allocate_completion(grpc_call *call) {
+ gpr_uint8 i;
+ for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
+ if (call->allocated_completions & (1u << i)) {
+ continue;
+ }
+ call->allocated_completions |= 1u << i;
+ return &call->completions[i];
+ }
+ gpr_log(GPR_ERROR, "should never reach here");
+ abort();
+}
+
+void done_completion(void *call, grpc_cq_completion *completion) {
+ grpc_call *c = call;
+ gpr_mu_lock(&c->mu);
+ c->allocated_completions &= ~(1u << (completion - c->completions));
+ gpr_mu_unlock(&c->mu);
+ GRPC_CALL_INTERNAL_UNREF(c, "completion", 1);
+}
+
#ifdef GRPC_CALL_REF_COUNT_DEBUG
void grpc_call_internal_ref(grpc_call *c, const char *reason) {
gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
@@ -1316,11 +1342,15 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, success);
+ grpc_cq_end_op(call->cq,
+ tag, success, done_completion, call,
+ allocate_completion(call));
}
static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_end_op(call->cq,
+ tag, 1, done_completion, call,
+ allocate_completion(call));
}
static int are_write_flags_valid(gpr_uint32 flags) {
@@ -1343,8 +1373,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
if (nops == 0) {
- grpc_cq_begin_op(call->cq, call);
- grpc_cq_end_op(call->cq, tag, call, 1);
+ grpc_cq_begin_op(call->cq);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_end_op(call->cq, tag, 1, done_completion, call, allocate_completion(call));
return GRPC_CALL_OK;
}
@@ -1466,7 +1497,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
}
}
- grpc_cq_begin_op(call->cq, call);
+ GRPC_CALL_INTERNAL_REF(call, "completion");
+ grpc_cq_begin_op(call->cq);
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}