diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 103 |
1 files changed, 81 insertions, 22 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index ae1b215767..71f4235571 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -49,6 +49,17 @@ #include <stdlib.h> #include <string.h> +/** The maximum number of completions possible. + Based upon the maximum number of individually queueable ops in the batch + api: + - initial metadata send + - message send + - status/close send (depending on client/server) + - initial metadata recv + - message recv + - status/close recv (depending on client/server) */ +#define MAX_CONCURRENT_COMPLETIONS 6 + typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; typedef enum { @@ -76,14 +87,14 @@ typedef struct { typedef struct { /* Overall status of the operation: starts OK, may degrade to non-OK */ - int success; - /* Completion function to call at the end of the operation */ - grpc_ioreq_completion_func on_complete; - void *user_data; + gpr_uint8 success; /* a bit mask of which request ops are needed (1u << opid) */ gpr_uint16 need_mask; /* a bit mask of which request ops are now completed */ gpr_uint16 complete_mask; + /* Completion function to call at the end of the operation */ + grpc_ioreq_completion_func on_complete; + void *user_data; } reqinfo_master; /* Status data for a request can come from several sources; this @@ -135,6 +146,7 @@ struct grpc_call { grpc_mdctx *metadata_context; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; + gpr_mu completion_mu; /* how far through the stream have we read? */ read_state read_state; @@ -162,6 +174,8 @@ struct grpc_call { gpr_uint8 error_status_set; /** should the alarm be cancelled */ gpr_uint8 cancel_alarm; + /** bitmask of allocated completion events in completions */ + gpr_uint8 allocated_completions; /* flags with bits corresponding to write states allowing us to determine what was sent */ @@ -250,6 +264,9 @@ struct grpc_call { grpc_iomgr_closure on_done_recv; grpc_iomgr_closure on_done_send; grpc_iomgr_closure on_done_bind; + + /** completion events - for completion queue use */ + grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS]; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -286,6 +303,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); memset(call, 0, sizeof(grpc_call)); gpr_mu_init(&call->mu); + gpr_mu_init(&call->completion_mu); call->channel = channel; call->cq = cq; if (cq) { @@ -298,8 +316,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, if (call->is_client) { call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; - call->context[GRPC_CONTEXT_TRACING].value = grpc_census_context_create(); - call->context[GRPC_CONTEXT_TRACING].destroy = grpc_census_context_destroy; } GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); for (i = 0; i < add_initial_metadata_count; i++) { @@ -351,6 +367,29 @@ grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) { return call->cq; } +static grpc_cq_completion *allocate_completion(grpc_call *call) { + gpr_uint8 i; + gpr_mu_lock(&call->completion_mu); + for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) { + if (call->allocated_completions & (1u << i)) { + continue; + } + call->allocated_completions |= 1u << i; + gpr_mu_unlock(&call->completion_mu); + return &call->completions[i]; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); +} + +static void done_completion(void *call, grpc_cq_completion *completion) { + grpc_call *c = call; + gpr_mu_lock(&c->completion_mu); + c->allocated_completions &= ~(1u << (completion - c->completions)); + gpr_mu_unlock(&c->completion_mu); + GRPC_CALL_INTERNAL_UNREF(c, "completion", 1); +} + #ifdef GRPC_CALL_REF_COUNT_DEBUG void grpc_call_internal_ref(grpc_call *c, const char *reason) { gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c, @@ -367,20 +406,21 @@ static void destroy_call(void *call, int ignored_success) { grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); gpr_mu_destroy(&c->mu); + gpr_mu_destroy(&c->completion_mu); for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (c->status[i].details) { - grpc_mdstr_unref(c->status[i].details); + GRPC_MDSTR_UNREF(c->status[i].details); } } for (i = 0; i < c->owned_metadata_count; i++) { - grpc_mdelem_unref(c->owned_metadata[i]); + GRPC_MDELEM_UNREF(c->owned_metadata[i]); } gpr_free(c->owned_metadata); for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) { gpr_free(c->buffered_metadata[i].metadata); } for (i = 0; i < c->send_initial_metadata_count; i++) { - grpc_mdelem_unref(c->send_initial_metadata[i].md); + GRPC_MDELEM_UNREF(c->send_initial_metadata[i].md); } for (i = 0; i < GRPC_CONTEXT_COUNT; i++) { if (c->context[i].destroy) { @@ -437,7 +477,7 @@ static void set_decode_compression_level(grpc_call *call, static void set_status_details(grpc_call *call, status_source source, grpc_mdstr *status) { if (call->status[source].details != NULL) { - grpc_mdstr_unref(call->status[source].details); + GRPC_MDSTR_UNREF(call->status[source].details); } call->status[source].details = status; } @@ -473,6 +513,8 @@ static void unlock(grpc_call *call) { int completing_requests = 0; int start_op = 0; int i; + const gpr_uint32 MAX_RECV_PEEK_AHEAD = 65536; + size_t buffered_bytes; int cancel_alarm = 0; memset(&op, 0, sizeof(op)); @@ -488,6 +530,17 @@ static void unlock(grpc_call *call) { op.recv_ops = &call->recv_ops; op.recv_state = &call->recv_state; op.on_done_recv = &call->on_done_recv; + if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) { + op.max_recv_bytes = call->incoming_message_length - + call->incoming_message.length + MAX_RECV_PEEK_AHEAD; + } else { + buffered_bytes = grpc_bbq_bytes(&call->incoming_queue); + if (buffered_bytes > MAX_RECV_PEEK_AHEAD) { + op.max_recv_bytes = 0; + } else { + op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes; + } + } call->receiving = 1; GRPC_CALL_INTERNAL_REF(call, "receiving"); start_op = 1; @@ -616,7 +669,7 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, case GRPC_IOREQ_SEND_STATUS: if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details != NULL) { - grpc_mdstr_unref( + GRPC_MDSTR_UNREF( call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details); call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = NULL; @@ -945,7 +998,7 @@ static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) { &mdb, &call->details_link, grpc_mdelem_from_metadata_strings( call->metadata_context, - grpc_mdstr_ref( + GRPC_MDSTR_REF( grpc_channel_get_message_string(call->channel)), data.send_status.details)); call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = @@ -1053,7 +1106,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, reqs[i].data.send_status.code); if (reqs[i].data.send_status.details) { set_status_details(call, STATUS_FROM_SERVER_STATUS, - grpc_mdstr_ref(reqs[i].data.send_status.details)); + GRPC_MDSTR_REF(reqs[i].data.send_status.details)); } } have_ops |= 1u << op; @@ -1190,7 +1243,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { } GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; - grpc_alarm_init(&call->alarm, deadline, call_alarm, call, gpr_now()); + grpc_alarm_init(&call->alarm, deadline, call_alarm, call, + gpr_now(GPR_CLOCK_REALTIME)); } /* we offset status by a small amount when storing it into transport metadata @@ -1257,7 +1311,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { if (key == grpc_channel_get_status_string(call->channel)) { set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); } else if (key == grpc_channel_get_message_string(call->channel)) { - set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); + set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value)); } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) { set_decode_compression_level(call, decode_compression(md)); @@ -1293,10 +1347,10 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { grpc_mdctx_lock(mdctx); for (l = md->list.head; l; l = l->next) { - if (l->md) grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); } for (l = md->garbage.head; l; l = l->next) { - grpc_mdctx_locked_mdelem_unref(mdctx, l->md); + GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md); } grpc_mdctx_unlock(mdctx); } @@ -1318,11 +1372,13 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, success); + grpc_cq_end_op(call->cq, tag, success, done_completion, call, + allocate_completion(call)); } static void finish_batch_with_close(grpc_call *call, int success, void *tag) { - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, + allocate_completion(call)); } static int are_write_flags_valid(gpr_uint32 flags) { @@ -1345,8 +1401,10 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); if (nops == 0) { - grpc_cq_begin_op(call->cq, call); - grpc_cq_end_op(call->cq, tag, call, 1); + grpc_cq_begin_op(call->cq); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_end_op(call->cq, tag, 1, done_completion, call, + allocate_completion(call)); return GRPC_CALL_OK; } @@ -1468,7 +1526,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, } } - grpc_cq_begin_op(call->cq, call); + GRPC_CALL_INTERNAL_REF(call, "completion"); + grpc_cq_begin_op(call->cq); return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } |