diff options
author | 2015-07-13 10:10:31 -0700 | |
---|---|---|
committer | 2015-07-13 10:10:31 -0700 | |
commit | cc44ee50e467a956994a9d283084f40ccbd04a02 (patch) | |
tree | f45bad885a001ad2b39197284fc2e10aae4dc42d /src/core/surface/call.c | |
parent | 53132db4a46aaf93a6aef4dd73cd27250b836d6b (diff) | |
parent | 4d0b7427faecfdbb1635c2ab158c922d7511b865 (diff) |
Merge github.com:grpc/grpc into flow-like-lava-to-a-barnyard
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 60 |
1 files changed, 54 insertions, 6 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index c0f059c548..71f4235571 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -49,6 +49,17 @@ #include <stdlib.h> #include <string.h> +/** The maximum number of completions possible. + Based upon the maximum number of individually queueable ops in the batch + api: + - initial metadata send + - message send + - status/close send (depending on client/server) + - initial metadata recv + - message recv + - status/close recv (depending on client/server) */ +#define MAX_CONCURRENT_COMPLETIONS 6 + typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; typedef enum { @@ -135,6 +146,7 @@ struct grpc_call { grpc_mdctx *metadata_context; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; + gpr_mu completion_mu; /* how far through the stream have we read? */ read_state read_state; @@ -162,6 +174,8 @@ struct grpc_call { gpr_uint8 error_status_set; /** should the alarm be cancelled */ gpr_uint8 cancel_alarm; + /** bitmask of allocated completion events in completions */ + gpr_uint8 allocated_completions; /* flags with bits corresponding to write states allowing us to determine what was sent */ @@ -250,6 +264,9 @@ struct grpc_call { grpc_iomgr_closure on_done_recv; grpc_iomgr_closure on_done_send; grpc_iomgr_closure on_done_bind; + + /** completion events - for completion queue use */ + grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -286,6 +303,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); memset(call, 0, sizeof(grpc_call)); gpr_mu_init(&call->mu); + gpr_mu_init(&call->completion_mu); call->channel = channel; call->cq = cq; if (cq) { @@ -349,6 +367,29 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { return call->cq; } +static grpc_cq_completion *allocate_completion(grpc_call *call) { + gpr_uint8 i; + gpr_mu_lock(&call->completion_mu); + for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) { + if (call->allocated_completions & (1u << i)) { + continue; + } + call->allocated_completions |= 1u << i; + gpr_mu_unlock(&call->completion_mu); + return &call->completions[i]; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} + +static void done_completion(void *call, grpc_cq_completion *completion) { + grpc_call *c = call; + gpr_mu_lock(&c->completion_mu); + c->allocated_completions &= ~(1u << (completion - c->completions)); + gpr_mu_unlock(&c->completion_mu); + GRPC_CALL_INTERNAL_UNREF(c, "completion", 1); +} + #ifdef GRPC_CALL_REF_COUNT_DEBUG void grpc_call_internal_ref(grpc_call *c, const char *reason) { gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c, @@ -365,6 +406,7 @@ static void destroy_call(void *call, int ignored_success) { grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); gpr_mu_destroy(&c->mu); + gpr_mu_destroy(&c->completion_mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (c->status[i].details) { GRPC_MDSTR_UNREF(c->status[i].details); @@ -1201,7 +1243,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { } GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; - grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); + grpc_alarm_init(&call->alarm, deadline, call_alarm, call, + gpr_now(GPR_CLOCK_REALTIME)); } /* we offset status by a small amount when storing it into transport metadata @@ -1329,11 +1372,13 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, success); + grpc_cq_end_op(call->cq, tag, success, done_completion, call, + allocate_completion(call)); } static void finish_batch_with_close(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, + allocate_completion(call)); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1356,8 +1401,10 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); if (nops == 0) { - grpc_cq_begin_op(call->cq, call); - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_begin_op(call->cq); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, + allocate_completion(call)); return GRPC_CALL_OK; } @@ -1479,7 +1526,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, } } - grpc_cq_begin_op(call->cq, call); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_begin_op(call->cq); return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } |