aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r--src/core/lib/surface/call.c74
1 files changed, 45 insertions, 29 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 02a5de0857..8ca3cab9d5 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -39,9 +39,9 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
@@ -49,6 +49,7 @@
#include "src/core/lib/compression/algorithm_metadata.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
@@ -122,6 +123,7 @@ struct grpc_call {
grpc_channel *channel;
grpc_call *parent;
grpc_call *first_child;
+ gpr_timespec start_time;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
@@ -184,7 +186,7 @@ struct grpc_call {
grpc_slice_buffer_stream sending_stream;
grpc_byte_stream *receiving_stream;
grpc_byte_buffer **receiving_buffer;
- gpr_slice receiving_slice;
+ grpc_slice receiving_slice;
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
grpc_closure receiving_initial_metadata_ready;
@@ -239,6 +241,7 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
call->channel = args->channel;
call->cq = args->cq;
call->parent = args->parent_call;
+ call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = args->server_transport_data == NULL;
@@ -311,10 +314,10 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */
- grpc_error *error =
- grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
- call->context, args->server_transport_data, path,
- send_deadline, CALL_STACK_FROM_CALL(call));
+ grpc_error *error = grpc_call_stack_init(
+ &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
+ args->server_transport_data, path, call->start_time, send_deadline,
+ CALL_STACK_FROM_CALL(call));
if (error != GRPC_ERROR_NONE) {
grpc_status_code status;
const char *error_str;
@@ -427,6 +430,8 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call,
get_final_status(call, set_status_value_directly,
&c->final_info.final_status);
+ c->final_info.stats.latency =
+ gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
@@ -493,8 +498,8 @@ static void destroy_encodings_accepted_by_peer(void *p) { return; }
static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) {
size_t i;
grpc_compression_algorithm algorithm;
- gpr_slice_buffer accept_encoding_parts;
- gpr_slice accept_encoding_slice;
+ grpc_slice_buffer accept_encoding_parts;
+ grpc_slice accept_encoding_slice;
void *accepted_user_data;
accepted_user_data =
@@ -506,23 +511,23 @@ static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) {
}
accept_encoding_slice = mdel->value->slice;
- gpr_slice_buffer_init(&accept_encoding_parts);
- gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
+ grpc_slice_buffer_init(&accept_encoding_parts);
+ grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
/* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
* zeroes the whole grpc_call */
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
for (i = 0; i < accept_encoding_parts.count; i++) {
- const gpr_slice *accept_encoding_entry_slice =
+ const grpc_slice *accept_encoding_entry_slice =
&accept_encoding_parts.slices[i];
if (grpc_compression_algorithm_parse(
- (const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice),
- GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) {
+ (const char *)GRPC_SLICE_START_PTR(*accept_encoding_entry_slice),
+ GRPC_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) {
GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
} else {
char *accept_encoding_entry_str =
- gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII);
+ grpc_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII);
gpr_log(GPR_ERROR,
"Invalid entry in accept encoding metadata: '%s'. Ignoring.",
accept_encoding_entry_str);
@@ -530,7 +535,7 @@ static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) {
}
}
- gpr_slice_buffer_destroy(&accept_encoding_parts);
+ grpc_slice_buffer_destroy(&accept_encoding_parts);
grpc_mdelem_set_user_data(
mdel, destroy_encodings_accepted_by_peer,
@@ -551,14 +556,14 @@ static void get_final_details(grpc_call *call, char **out_details,
for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
if (call->status[i].is_set) {
if (call->status[i].details) {
- gpr_slice details = call->status[i].details->slice;
- size_t len = GPR_SLICE_LENGTH(details);
+ grpc_slice details = call->status[i].details->slice;
+ size_t len = GRPC_SLICE_LENGTH(details);
if (len + 1 > *out_details_capacity) {
*out_details_capacity =
GPR_MAX(len + 1, *out_details_capacity * 3 / 2);
*out_details = gpr_realloc(*out_details, *out_details_capacity);
}
- memcpy(*out_details, GPR_SLICE_START_PTR(details), len);
+ memcpy(*out_details, GRPC_SLICE_START_PTR(details), len);
(*out_details)[len] = 0;
} else {
goto no_details;
@@ -632,9 +637,6 @@ static int prepare_application_metadata(grpc_call *call, int count,
if (call->send_extra_metadata_count == 0) {
prepend_extra_metadata = 0;
} else {
- for (i = 0; i < call->send_extra_metadata_count; i++) {
- GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
- }
for (i = 1; i < call->send_extra_metadata_count; i++) {
call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
}
@@ -680,6 +682,7 @@ static int prepare_application_metadata(grpc_call *call, int count,
&call->send_extra_metadata[call->send_extra_metadata_count - 1];
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
+ call->send_extra_metadata_count = 0;
break;
case 3: {
/* prepend AND md */
@@ -695,6 +698,7 @@ static int prepare_application_metadata(grpc_call *call, int count,
batch->list.tail = linked_from_md(last_md);
batch->list.head->prev = NULL;
batch->list.tail->next = NULL;
+ call->send_extra_metadata_count = 0;
break;
}
default:
@@ -900,7 +904,7 @@ static uint32_t decode_status(grpc_mdelem *md) {
status = ((uint32_t)(intptr_t)user_data) - STATUS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
- GPR_SLICE_LENGTH(md->value->slice),
+ GRPC_SLICE_LENGTH(md->value->slice),
&status)) {
status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
}
@@ -953,7 +957,7 @@ static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem,
mdusr = &dest->metadata[dest->count++];
mdusr->key = grpc_mdstr_as_c_string(elem->key);
mdusr->value = grpc_mdstr_as_c_string(elem->value);
- mdusr->value_length = GPR_SLICE_LENGTH(elem->value->slice);
+ mdusr->value_length = GRPC_SLICE_LENGTH(elem->value->slice);
GPR_TIMER_END("publish_app_metadata", 0);
return elem;
}
@@ -1085,8 +1089,8 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
&call->receiving_slice, remaining,
&call->receiving_slice_ready)) {
- gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
- call->receiving_slice);
+ grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+ call->receiving_slice);
} else {
return;
}
@@ -1099,8 +1103,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_call *call = bctl->call;
if (error == GRPC_ERROR_NONE) {
- gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
- call->receiving_slice);
+ grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
+ call->receiving_slice);
continue_receiving_slices(exec_ctx, bctl);
} else {
if (grpc_trace_operation_failures) {
@@ -1461,6 +1465,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(
&call->sending_stream,
&op->data.send_message->data.raw.slice_buffer, op->flags);
+ /* If the outgoing buffer is already compressed, mark it as so in the
+ flags. These will be picked up by the compression filter and further
+ (wasteful) attempts at compression skipped. */
+ if (op->data.send_message->data.raw.compression > GRPC_COMPRESS_NONE) {
+ call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
+ }
stream_op->send_message = &call->sending_stream.base;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
@@ -1516,8 +1526,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call, STATUS_FROM_API_OVERRIDE,
GRPC_MDSTR_REF(call->send_extra_metadata[1].md->value));
}
- set_status_code(call, STATUS_FROM_API_OVERRIDE,
- (uint32_t)op->data.send_status_from_server.status);
+ if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
+ set_status_code(call, STATUS_FROM_API_OVERRIDE,
+ (uint32_t)op->data.send_status_from_server.status);
+ }
if (!prepare_application_metadata(
call,
(int)op->data.send_status_from_server.trailing_metadata_count,
@@ -1539,6 +1551,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
+ /* IF this is a server, then GRPC_OP_RECV_INITIAL_METADATA *must* come
+ from server.c. In that case, it's coming from accept_stream, and in
+ that case we're not necessarily covered by a poller. */
+ stream_op->covered_by_poller = call->is_client;
call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata;
grpc_closure_init(&call->receiving_initial_metadata_ready,