diff options
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r-- | src/core/lib/surface/call.c | 261 |
1 files changed, 113 insertions, 148 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index fc9df76dc1..5690bcab1e 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -109,6 +109,10 @@ typedef struct batch_control { uint8_t recv_message; uint8_t recv_final_op; uint8_t is_notify_tag_closure; + + /* TODO(ctiller): now that this is inlined, figure out how much of the above + state can be eliminated */ + grpc_transport_stream_op op; } batch_control; struct grpc_call { @@ -122,8 +126,6 @@ struct grpc_call { /* client or server call */ bool is_client; - /* is the alarm set */ - bool have_alarm; /** has grpc_call_destroy been called */ bool destroy_called; /** flag indicating that cancellation is inherited */ @@ -154,8 +156,9 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; - /* Call stats: only valid after trailing metadata received */ - grpc_call_stats stats; + /* Call data useful used for reporting. Only valid after the call has + * completed */ + grpc_call_final_info final_info; /* Compression algorithm for *incoming* data */ grpc_compression_algorithm incoming_compression_algorithm; @@ -165,9 +168,6 @@ struct grpc_call { /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; - /* Deadline alarm - if have_alarm is non-zero */ - grpc_timer alarm; - /* for the client, extra metadata is initial metadata; for the server, it's trailing metadata */ grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT]; @@ -210,8 +210,6 @@ struct grpc_call { #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) -static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, - gpr_timespec deadline); static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_transport_stream_op *op); static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, @@ -259,29 +257,8 @@ grpc_call *grpc_call_create( call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); } } - call->send_deadline = - gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC); - GRPC_CHANNEL_INTERNAL_REF(channel, "call"); - /* initial refcount dropped by grpc_call_destroy */ - grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call, - call->context, server_transport_data, - CALL_STACK_FROM_CALL(call)); - if (cq != NULL) { - GPR_ASSERT( - pollset_set_alternative == NULL && - "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL."); - GRPC_CQ_INTERNAL_REF(cq, "bind"); - call->pollent = - grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); - } - if (pollset_set_alternative != NULL) { - call->pollent = - grpc_polling_entity_create_from_pollset_set(pollset_set_alternative); - } - if (!grpc_polling_entity_is_empty(&call->pollent)) { - grpc_call_stack_set_pollset_or_pollset_set( - &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); - } + send_deadline = gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC); + if (parent_call != NULL) { GRPC_CALL_INTERNAL_REF(parent_call, "child"); GPR_ASSERT(call->is_client); @@ -323,10 +300,38 @@ grpc_call *grpc_call_create( gpr_mu_unlock(&parent_call->mu); } - if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != - 0) { - set_deadline_alarm(&exec_ctx, call, send_deadline); + + call->send_deadline = send_deadline; + + GRPC_CHANNEL_INTERNAL_REF(channel, "call"); + /* initial refcount dropped by grpc_call_destroy */ + grpc_error *error = grpc_call_stack_init( + &exec_ctx, channel_stack, 1, destroy_call, call, call->context, + server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call)); + if (error != GRPC_ERROR_NONE) { + grpc_status_code status; + const char *error_str; + grpc_error_get_status(error, &status, &error_str); + close_with_status(&exec_ctx, call, status, error_str); + GRPC_ERROR_UNREF(error); + } + if (cq != NULL) { + GPR_ASSERT( + pollset_set_alternative == NULL && + "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL."); + GRPC_CQ_INTERNAL_REF(cq, "bind"); + call->pollent = + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); + } + if (pollset_set_alternative != NULL) { + call->pollent = + grpc_polling_entity_create_from_pollset_set(pollset_set_alternative); + } + if (!grpc_polling_entity_is_empty(&call->pollent)) { + grpc_call_stack_set_pollset_or_pollset_set( + &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } + grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_create", 0); return call; @@ -361,6 +366,25 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } +static void get_final_status(grpc_call *call, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + set_value(call->status[i].code, set_value_user_data); + return; + } + } + if (call->is_client) { + set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); + } else { + set_value(GRPC_STATUS_OK, set_value_user_data); + } +} + +static void set_status_value_directly(grpc_status_code status, void *dest); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { size_t i; @@ -392,7 +416,11 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } grpc_channel *channel = c->channel; - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->stats, c); + + get_final_status(call, set_status_value_directly, + &c->final_info.final_status); + + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); GPR_TIMER_END("destroy_call", 0); } @@ -414,40 +442,13 @@ static void set_status_details(grpc_call *call, status_source source, } } -static void get_final_status(grpc_call *call, - void (*set_value)(grpc_status_code code, - void *user_data), - void *set_value_user_data) { - int i; - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - set_value(call->status[i].code, set_value_user_data); - return; - } - } - if (call->is_client) { - set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); - } else { - set_value(GRPC_STATUS_OK, set_value_user_data); - } -} - static void set_status_from_error(grpc_call *call, status_source source, grpc_error *error) { - intptr_t status; - if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { - set_status_code(call, source, (uint32_t)status); - } else { - set_status_code(call, source, GRPC_STATUS_INTERNAL); - } - const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); - bool free_msg = false; - if (msg == NULL) { - free_msg = true; - msg = grpc_error_string(error); - } + grpc_status_code status; + const char *msg; + grpc_error_get_status(error, &status, &msg); + set_status_code(call, source, (uint32_t)status); set_status_details(call, source, grpc_mdstr_from_string(msg)); - if (free_msg) grpc_error_free_string(msg); } static void set_incoming_compression_algorithm( @@ -720,9 +721,6 @@ void grpc_call_destroy(grpc_call *c) { gpr_mu_lock(&c->mu); GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - if (c->have_alarm) { - grpc_timer_cancel(&exec_ctx, &c->alarm); - } cancel = !c->received_final_op; gpr_mu_unlock(&c->mu); if (cancel) grpc_call_cancel(c, NULL); @@ -760,8 +758,8 @@ typedef struct termination_closure { grpc_closure closure; grpc_call *call; grpc_error *error; - grpc_closure *op_closure; enum { TC_CANCEL, TC_CLOSE } type; + grpc_transport_stream_op op; } termination_closure; static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, @@ -776,31 +774,27 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, break; } GRPC_ERROR_UNREF(tc->error); - grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL); gpr_free(tc); } static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { - grpc_transport_stream_op op; termination_closure *tc = tcp; - memset(&op, 0, sizeof(op)); - op.cancel_error = tc->error; + memset(&tc->op, 0, sizeof(tc->op)); + tc->op.cancel_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); - op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &op); + tc->op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &tc->op); } static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { - grpc_transport_stream_op op; termination_closure *tc = tcp; - memset(&op, 0, sizeof(op)); - op.close_error = tc->error; + memset(&tc->op, 0, sizeof(tc->op)); + tc->op.close_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); - tc->op_closure = op.on_complete; - op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &op); + tc->op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &tc->op); } static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, @@ -881,32 +875,6 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { return CALL_FROM_TOP_ELEM(elem); } -static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call *call = arg; - gpr_mu_lock(&call->mu); - call->have_alarm = 0; - if (error != GRPC_ERROR_CANCELLED) { - cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED, - "Deadline Exceeded"); - } - gpr_mu_unlock(&call->mu); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm"); -} - -static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call, - gpr_timespec deadline) { - if (call->have_alarm) { - gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice"); - assert(0); - return; - } - GRPC_CALL_INTERNAL_REF(call, "alarm"); - call->have_alarm = 1; - call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call, - gpr_now(GPR_CLOCK_MONOTONIC)); -} - /* we offset status by a small amount when storing it into transport metadata as metadata cannot store a 0 value (which is used as OK for grpc_status_codes */ @@ -1144,17 +1112,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, if (gpr_unref(&bctl->steps_to_complete)) { post_batch_completion(exec_ctx, bctl); } - } else if (call->receiving_stream->length > - grpc_channel_get_max_message_length(call->channel)) { - cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL, - "Max message size exceeded"); - grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); - call->receiving_stream = NULL; - *call->receiving_buffer = NULL; - call->receiving_message = 0; - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } } else { call->test_only_last_message_flags = call->receiving_stream->flags; if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && @@ -1259,9 +1216,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { - GPR_TIMER_BEGIN("set_deadline_alarm", 0); - set_deadline_alarm(exec_ctx, call, md->deadline); - GPR_TIMER_END("set_deadline_alarm", 0); + call->send_deadline = + gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC); } } @@ -1290,9 +1246,17 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, GRPC_ERROR_REF(error); gpr_mu_lock(&call->mu); + + // If the error has an associated status code, set the call's status. + intptr_t status; + if (error != GRPC_ERROR_NONE && + grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { + set_status_from_error(call, STATUS_FROM_CORE, error); + } + if (bctl->send_initial_metadata) { if (error != GRPC_ERROR_NONE) { - set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE); + set_status_from_error(call, STATUS_FROM_CORE, error); } grpc_metadata_batch_destroy( &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); @@ -1310,9 +1274,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_metadata_batch_filter(md, recv_trailing_filter, call); call->received_final_op = true; - if (call->have_alarm) { - grpc_timer_cancel(exec_ctx, &call->alarm); - } /* propagate cancellation to any interested children */ child_call = call->first_child; if (child_call != NULL) { @@ -1354,19 +1315,19 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, int is_notify_tag_closure) { - grpc_transport_stream_op stream_op; size_t i; const grpc_op *op; batch_control *bctl; int num_completion_callbacks_needed = 1; grpc_call_error error = GRPC_CALL_OK; + // sent_initial_metadata guards against variable reuse. + grpc_metadata compression_md; + GPR_TIMER_BEGIN("grpc_call_start_batch", 0); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); - memset(&stream_op, 0, sizeof(stream_op)); - /* TODO(ctiller): this feels like it could be made lock-free */ gpr_mu_lock(&call->mu); bctl = allocate_batch_control(call); @@ -1375,6 +1336,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->notify_tag = notify_tag; bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); + grpc_transport_stream_op *stream_op = &bctl->op; + memset(stream_op, 0, sizeof(*stream_op)); + if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); bctl->error = GRPC_ERROR_NONE; @@ -1406,8 +1370,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } /* process compression level */ - grpc_metadata compression_md; - memset(&compression_md, 0, sizeof(grpc_metadata)); + memset(&compression_md, 0, sizeof(compression_md)); size_t additional_metadata_count = 0; grpc_compression_level effective_compression_level; bool level_set = false; @@ -1453,9 +1416,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } /* TODO(ctiller): just make these the same variable? */ call->metadata_batch[0][0].deadline = call->send_deadline; - stream_op.send_initial_metadata = + stream_op->send_initial_metadata = &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; - stream_op.send_initial_metadata_flags = op->flags; + stream_op->send_initial_metadata_flags = op->flags; break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1475,7 +1438,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message->data.raw.slice_buffer, op->flags); - stream_op.send_message = &call->sending_stream.base; + stream_op->send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ @@ -1493,7 +1456,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } bctl->send_final_op = 1; call->sent_final_op = 1; - stream_op.send_trailing_metadata = + stream_op->send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_SEND_STATUS_FROM_SERVER: @@ -1540,7 +1503,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } - stream_op.send_trailing_metadata = + stream_op->send_trailing_metadata = &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]; break; case GRPC_OP_RECV_INITIAL_METADATA: @@ -1558,9 +1521,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_closure_init(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl); bctl->recv_initial_metadata = 1; - stream_op.recv_initial_metadata = + stream_op->recv_initial_metadata = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - stream_op.recv_initial_metadata_ready = + stream_op->recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; num_completion_callbacks_needed++; break; @@ -1577,10 +1540,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->receiving_message = 1; bctl->recv_message = 1; call->receiving_buffer = op->data.recv_message; - stream_op.recv_message = &call->receiving_stream; + stream_op->recv_message = &call->receiving_stream; grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, bctl); - stream_op.recv_message_ready = &call->receiving_stream_ready; + stream_op->recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: @@ -1606,9 +1569,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->final_op.client.status_details_capacity = op->data.recv_status_on_client.status_details_capacity; bctl->recv_final_op = 1; - stream_op.recv_trailing_metadata = + stream_op->recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op.collect_stats = &call->stats.transport_stream_stats; + stream_op->collect_stats = + &call->final_info.stats.transport_stream_stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: /* Flag validation: currently allow no flags */ @@ -1628,9 +1592,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; bctl->recv_final_op = 1; - stream_op.recv_trailing_metadata = + stream_op->recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op.collect_stats = &call->stats.transport_stream_stats; + stream_op->collect_stats = + &call->final_info.stats.transport_stream_stats; break; } } @@ -1641,12 +1606,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); - stream_op.context = call->context; + stream_op->context = call->context; grpc_closure_init(&bctl->finish_batch, finish_batch, bctl); - stream_op.on_complete = &bctl->finish_batch; + stream_op->on_complete = &bctl->finish_batch; gpr_mu_unlock(&call->mu); - execute_op(exec_ctx, call, &stream_op); + execute_op(exec_ctx, call, stream_op); done: GPR_TIMER_END("grpc_call_start_batch", 0); |