diff options
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r-- | src/core/lib/surface/call.c | 74 |
1 files changed, 45 insertions, 29 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 02a5de0857..8ca3cab9d5 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -39,9 +39,9 @@ #include <grpc/compression.h> #include <grpc/grpc.h> +#include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice.h> #include <grpc/support/string_util.h> #include <grpc/support/useful.h> @@ -49,6 +49,7 @@ #include "src/core/lib/compression/algorithm_metadata.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -122,6 +123,7 @@ struct grpc_call { grpc_channel *channel; grpc_call *parent; grpc_call *first_child; + gpr_timespec start_time; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; @@ -184,7 +186,7 @@ struct grpc_call { grpc_slice_buffer_stream sending_stream; grpc_byte_stream *receiving_stream; grpc_byte_buffer **receiving_buffer; - gpr_slice receiving_slice; + grpc_slice receiving_slice; grpc_closure receiving_slice_ready; grpc_closure receiving_stream_ready; grpc_closure receiving_initial_metadata_ready; @@ -239,6 +241,7 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, call->channel = args->channel; call->cq = args->cq; call->parent = args->parent_call; + call->start_time = gpr_now(GPR_CLOCK_MONOTONIC); /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; @@ -311,10 +314,10 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_destroy */ - grpc_error *error = - grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call, - call->context, args->server_transport_data, path, - send_deadline, CALL_STACK_FROM_CALL(call)); + grpc_error *error = grpc_call_stack_init( + &exec_ctx, channel_stack, 1, destroy_call, call, call->context, + args->server_transport_data, path, call->start_time, send_deadline, + CALL_STACK_FROM_CALL(call)); if (error != GRPC_ERROR_NONE) { grpc_status_code status; const char *error_str; @@ -427,6 +430,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, get_final_status(call, set_status_value_directly, &c->final_info.final_status); + c->final_info.stats.latency = + gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); @@ -493,8 +498,8 @@ static void destroy_encodings_accepted_by_peer(void *p) { return; } static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) { size_t i; grpc_compression_algorithm algorithm; - gpr_slice_buffer accept_encoding_parts; - gpr_slice accept_encoding_slice; + grpc_slice_buffer accept_encoding_parts; + grpc_slice accept_encoding_slice; void *accepted_user_data; accepted_user_data = @@ -506,23 +511,23 @@ static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) { } accept_encoding_slice = mdel->value->slice; - gpr_slice_buffer_init(&accept_encoding_parts); - gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); + grpc_slice_buffer_init(&accept_encoding_parts); + grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already * zeroes the whole grpc_call */ /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); for (i = 0; i < accept_encoding_parts.count; i++) { - const gpr_slice *accept_encoding_entry_slice = + const grpc_slice *accept_encoding_entry_slice = &accept_encoding_parts.slices[i]; if (grpc_compression_algorithm_parse( - (const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice), - GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) { + (const char *)GRPC_SLICE_START_PTR(*accept_encoding_entry_slice), + GRPC_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) { GPR_BITSET(&call->encodings_accepted_by_peer, algorithm); } else { char *accept_encoding_entry_str = - gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII); + grpc_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII); gpr_log(GPR_ERROR, "Invalid entry in accept encoding metadata: '%s'. Ignoring.", accept_encoding_entry_str); @@ -530,7 +535,7 @@ static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) { } } - gpr_slice_buffer_destroy(&accept_encoding_parts); + grpc_slice_buffer_destroy(&accept_encoding_parts); grpc_mdelem_set_user_data( mdel, destroy_encodings_accepted_by_peer, @@ -551,14 +556,14 @@ static void get_final_details(grpc_call *call, char **out_details, for (i = 0; i < STATUS_SOURCE_COUNT; i++) { if (call->status[i].is_set) { if (call->status[i].details) { - gpr_slice details = call->status[i].details->slice; - size_t len = GPR_SLICE_LENGTH(details); + grpc_slice details = call->status[i].details->slice; + size_t len = GRPC_SLICE_LENGTH(details); if (len + 1 > *out_details_capacity) { *out_details_capacity = GPR_MAX(len + 1, *out_details_capacity * 3 / 2); *out_details = gpr_realloc(*out_details, *out_details_capacity); } - memcpy(*out_details, GPR_SLICE_START_PTR(details), len); + memcpy(*out_details, GRPC_SLICE_START_PTR(details), len); (*out_details)[len] = 0; } else { goto no_details; @@ -632,9 +637,6 @@ static int prepare_application_metadata(grpc_call *call, int count, if (call->send_extra_metadata_count == 0) { prepend_extra_metadata = 0; } else { - for (i = 0; i < call->send_extra_metadata_count; i++) { - GRPC_MDELEM_REF(call->send_extra_metadata[i].md); - } for (i = 1; i < call->send_extra_metadata_count; i++) { call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; } @@ -680,6 +682,7 @@ static int prepare_application_metadata(grpc_call *call, int count, &call->send_extra_metadata[call->send_extra_metadata_count - 1]; batch->list.head->prev = NULL; batch->list.tail->next = NULL; + call->send_extra_metadata_count = 0; break; case 3: { /* prepend AND md */ @@ -695,6 +698,7 @@ static int prepare_application_metadata(grpc_call *call, int count, batch->list.tail = linked_from_md(last_md); batch->list.head->prev = NULL; batch->list.tail->next = NULL; + call->send_extra_metadata_count = 0; break; } default: @@ -900,7 +904,7 @@ static uint32_t decode_status(grpc_mdelem *md) { status = ((uint32_t)(intptr_t)user_data) - STATUS_OFFSET; } else { if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), + GRPC_SLICE_LENGTH(md->value->slice), &status)) { status = GRPC_STATUS_UNKNOWN; /* could not parse status code */ } @@ -953,7 +957,7 @@ static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem, mdusr = &dest->metadata[dest->count++]; mdusr->key = grpc_mdstr_as_c_string(elem->key); mdusr->value = grpc_mdstr_as_c_string(elem->value); - mdusr->value_length = GPR_SLICE_LENGTH(elem->value->slice); + mdusr->value_length = GRPC_SLICE_LENGTH(elem->value->slice); GPR_TIMER_END("publish_app_metadata", 0); return elem; } @@ -1085,8 +1089,8 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, &call->receiving_slice, remaining, &call->receiving_slice_ready)) { - gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); } else { return; } @@ -1099,8 +1103,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_call *call = bctl->call; if (error == GRPC_ERROR_NONE) { - gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); continue_receiving_slices(exec_ctx, bctl); } else { if (grpc_trace_operation_failures) { @@ -1461,6 +1465,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message->data.raw.slice_buffer, op->flags); + /* If the outgoing buffer is already compressed, mark it as so in the + flags. These will be picked up by the compression filter and further + (wasteful) attempts at compression skipped. */ + if (op->data.send_message->data.raw.compression > GRPC_COMPRESS_NONE) { + call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } stream_op->send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: @@ -1516,8 +1526,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call, STATUS_FROM_API_OVERRIDE, GRPC_MDSTR_REF(call->send_extra_metadata[1].md->value)); } - set_status_code(call, STATUS_FROM_API_OVERRIDE, - (uint32_t)op->data.send_status_from_server.status); + if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { + set_status_code(call, STATUS_FROM_API_OVERRIDE, + (uint32_t)op->data.send_status_from_server.status); + } if (!prepare_application_metadata( call, (int)op->data.send_status_from_server.trailing_metadata_count, @@ -1539,6 +1551,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } + /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come + from server.c. In that case, it's coming from accept_stream, and in + that case we're not necessarily covered by a poller. */ + stream_op->covered_by_poller = call->is_client; call->received_initial_metadata = 1; call->buffered_metadata[0] = op->data.recv_initial_metadata; grpc_closure_init(&call->receiving_initial_metadata_ready, |