diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 42 |
1 files changed, 37 insertions, 5 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index a28a542c8d..445111ca40 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -162,6 +162,8 @@ struct grpc_call { gpr_uint8 error_status_set; /** should the alarm be cancelled */ gpr_uint8 cancel_alarm; + /** bitmask of allocated completion events in completions */ + gpr_uint8 allocated_completions; /* flags with bits corresponding to write states allowing us to determine what was sent */ @@ -250,6 +252,9 @@ struct grpc_call { grpc_iomgr_closure on_done_recv; grpc_iomgr_closure on_done_send; grpc_iomgr_closure on_done_bind; + + /** completion events - for completion queue use */ + grpc_cq_completion completions[6]; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -349,6 +354,27 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { return call->cq; } +grpc_cq_completion *allocate_completion(grpc_call *call) { + gpr_uint8 i; + for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) { + if (call->allocated_completions & (1u << i)) { + continue; + } + call->allocated_completions |= 1u << i; + return &call->completions[i]; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} + +void done_completion(void *call, grpc_cq_completion *completion) { + grpc_call *c = call; + gpr_mu_lock(&c->mu); + c->allocated_completions &= ~(1u << (completion - c->completions)); + gpr_mu_unlock(&c->mu); + GRPC_CALL_INTERNAL_UNREF(c, "completion", 1); +} + #ifdef GRPC_CALL_REF_COUNT_DEBUG void grpc_call_internal_ref(grpc_call *c, const char *reason) { gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c, @@ -1316,11 +1342,15 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, success); + grpc_cq_end_op(call->cq, + tag, success, done_completion, call, + allocate_completion(call)); } static void finish_batch_with_close(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_end_op(call->cq, + tag, 1, done_completion, call, + allocate_completion(call)); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1343,8 +1373,9 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); if (nops == 0) { - grpc_cq_begin_op(call->cq, call); - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_begin_op(call->cq); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, allocate_completion(call)); return GRPC_CALL_OK; } @@ -1466,7 +1497,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, } } - grpc_cq_begin_op(call->cq, call); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_begin_op(call->cq); return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } |