diff options
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r-- | src/core/lib/surface/call.c | 155 |
1 files changed, 94 insertions, 61 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 04291b0ee0..772681109a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -154,8 +154,9 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; - /* Call stats: only valid after trailing metadata received */ - grpc_call_stats stats; + /* Call data useful used for reporting. Only valid after the call has + * completed */ + grpc_call_final_info final_info; /* Compression algorithm for *incoming* data */ grpc_compression_algorithm incoming_compression_algorithm; @@ -259,12 +260,23 @@ grpc_call *grpc_call_create( call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); } } - call->send_deadline = send_deadline; + call->send_deadline = + gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC); GRPC_CHANNEL_INTERNAL_REF(channel, "call"); /* initial refcount dropped by grpc_call_destroy */ - grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call, - call->context, server_transport_data, - CALL_STACK_FROM_CALL(call)); + grpc_error *error = grpc_call_stack_init( + &exec_ctx, channel_stack, 1, destroy_call, call, call->context, + server_transport_data, CALL_STACK_FROM_CALL(call)); + if (error != GRPC_ERROR_NONE) { + intptr_t status; + if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) + status = GRPC_STATUS_UNKNOWN; + const char *error_str = + grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION); + close_with_status(&exec_ctx, call, (grpc_status_code)status, + error_str == NULL ? "unknown error" : error_str); + GRPC_ERROR_UNREF(error); + } if (cq != NULL) { GPR_ASSERT( pollset_set_alternative == NULL && @@ -360,6 +372,25 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } +static void get_final_status(grpc_call *call, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + set_value(call->status[i].code, set_value_user_data); + return; + } + } + if (call->is_client) { + set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); + } else { + set_value(GRPC_STATUS_OK, set_value_user_data); + } +} + +static void set_status_value_directly(grpc_status_code status, void *dest); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { size_t i; @@ -391,7 +422,11 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } grpc_channel *channel = c->channel; - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->stats, c); + + get_final_status(call, set_status_value_directly, + &c->final_info.final_status); + + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); GPR_TIMER_END("destroy_call", 0); } @@ -402,8 +437,33 @@ static void set_status_code(grpc_call *call, status_source source, call->status[source].is_set = 1; call->status[source].code = (grpc_status_code)status; +} - /* TODO(ctiller): what to do about the flush that was previously here */ +static void set_status_details(grpc_call *call, status_source source, + grpc_mdstr *status) { + if (call->status[source].details != NULL) { + GRPC_MDSTR_UNREF(status); + } else { + call->status[source].details = status; + } +} + +static void set_status_from_error(grpc_call *call, status_source source, + grpc_error *error) { + intptr_t status; + if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { + set_status_code(call, source, (uint32_t)status); + } else { + set_status_code(call, source, GRPC_STATUS_INTERNAL); + } + const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); + bool free_msg = false; + if (msg == NULL) { + free_msg = true; + msg = grpc_error_string(error); + } + set_status_details(call, source, grpc_mdstr_from_string(msg)); + if (free_msg) grpc_error_free_string(msg); } static void set_incoming_compression_algorithm( @@ -492,32 +552,6 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { return encodings_accepted_by_peer; } -static void set_status_details(grpc_call *call, status_source source, - grpc_mdstr *status) { - if (call->status[source].details != NULL) { - GRPC_MDSTR_UNREF(call->status[source].details); - } - call->status[source].details = status; -} - -static void get_final_status(grpc_call *call, - void (*set_value)(grpc_status_code code, - void *user_data), - void *set_value_user_data) { - int i; - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - set_value(call->status[i].code, set_value_user_data); - return; - } - } - if (call->is_client) { - set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); - } else { - set_value(GRPC_STATUS_OK, set_value_user_data); - } -} - static void get_final_details(grpc_call *call, char **out_details, size_t *out_details_capacity) { int i; @@ -741,8 +775,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, typedef struct termination_closure { grpc_closure closure; grpc_call *call; - grpc_status_code status; - gpr_slice optional_message; + grpc_error *error; grpc_closure *op_closure; enum { TC_CANCEL, TC_CLOSE } type; } termination_closure; @@ -758,7 +791,7 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); break; } - gpr_slice_unref(tc->optional_message); + GRPC_ERROR_UNREF(tc->error); grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL); gpr_free(tc); } @@ -767,7 +800,7 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { grpc_transport_stream_op op; termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); - op.cancel_with_status = tc->status; + op.cancel_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); op.on_complete = &tc->closure; @@ -778,8 +811,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { grpc_transport_stream_op op; termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); - tc->optional_message = gpr_slice_ref(tc->optional_message); - grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message); + op.close_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); tc->op_closure = op.on_complete; @@ -789,14 +821,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, termination_closure *tc) { - grpc_mdstr *details = NULL; - if (GPR_SLICE_LENGTH(tc->optional_message) > 0) { - tc->optional_message = gpr_slice_ref(tc->optional_message); - details = grpc_mdstr_from_slice(tc->optional_message); - } - - set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status); - set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details); + set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error); if (tc->type == TC_CANCEL) { grpc_closure_init(&tc->closure, send_cancel, tc); @@ -812,13 +837,15 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { + GPR_ASSERT(status != GRPC_STATUS_OK); termination_closure *tc = gpr_malloc(sizeof(*tc)); memset(tc, 0, sizeof(termination_closure)); tc->type = TC_CANCEL; tc->call = c; - tc->optional_message = gpr_slice_from_copied_string(description); - GPR_ASSERT(status != GRPC_STATUS_OK); - tc->status = status; + tc->error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE(description), + GRPC_ERROR_STR_GRPC_MESSAGE, description), + GRPC_ERROR_INT_GRPC_STATUS, status); return terminate_with_status(exec_ctx, tc); } @@ -826,13 +853,15 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { + GPR_ASSERT(status != GRPC_STATUS_OK); termination_closure *tc = gpr_malloc(sizeof(*tc)); memset(tc, 0, sizeof(termination_closure)); tc->type = TC_CLOSE; tc->call = c; - tc->optional_message = gpr_slice_from_copied_string(description); - GPR_ASSERT(status != GRPC_STATUS_OK); - tc->status = status; + tc->error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE(description), + GRPC_ERROR_STR_GRPC_MESSAGE, description), + GRPC_ERROR_INT_GRPC_STATUS, status); return terminate_with_status(exec_ctx, tc); } @@ -1194,7 +1223,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ - char *algo_name; + char *algo_name = NULL; grpc_compression_algorithm_name(algo, &algo_name); gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); @@ -1213,7 +1242,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, call->incoming_compression_algorithm)) { extern int grpc_compression_trace; if (grpc_compression_trace) { - char *algo_name; + char *algo_name = NULL; grpc_compression_algorithm_name(call->incoming_compression_algorithm, &algo_name); gpr_log(GPR_ERROR, @@ -1348,6 +1377,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, int num_completion_callbacks_needed = 1; grpc_call_error error = GRPC_CALL_OK; + // sent_initial_metadata guards against variable reuse. + grpc_metadata compression_md; + GPR_TIMER_BEGIN("grpc_call_start_batch", 0); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); @@ -1393,8 +1425,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } /* process compression level */ - grpc_metadata compression_md; - memset(&compression_md, 0, sizeof(grpc_metadata)); + memset(&compression_md, 0, sizeof(compression_md)); size_t additional_metadata_count = 0; grpc_compression_level effective_compression_level; bool level_set = false; @@ -1414,7 +1445,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, const grpc_compression_algorithm calgo = compression_algorithm_for_level_locked( call, effective_compression_level); - char *calgo_name; + char *calgo_name = NULL; grpc_compression_algorithm_name(calgo, &calgo_name); // the following will be picked up by the compress filter and used as // the call's compression algorithm. @@ -1595,7 +1626,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op.collect_stats = &call->stats.transport_stream_stats; + stream_op.collect_stats = + &call->final_info.stats.transport_stream_stats; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: /* Flag validation: currently allow no flags */ @@ -1617,7 +1649,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->recv_final_op = 1; stream_op.recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - stream_op.collect_stats = &call->stats.transport_stream_stats; + stream_op.collect_stats = + &call->final_info.stats.transport_stream_stats; break; } } |