diff options
author | Craig Tiller <ctiller@google.com> | 2017-01-20 18:11:52 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-01-20 18:11:52 -0800 |
commit | 5e01e2ac977655aa074faf7fde0a74298f5e4c55 (patch) | |
tree | 9acab9c5952f292683a6d474861e2997e2a9d664 /src/core/lib/surface/call.c | |
parent | c84725fd02dc58a819c8c4e8acdc321e81f44764 (diff) |
Revert "Metadata handling rewrite"
Diffstat (limited to 'src/core/lib/surface/call.c')
-rw-r--r-- | src/core/lib/surface/call.c | 988 |
1 files changed, 530 insertions, 458 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 63b0683df5..899e8fab3f 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -56,15 +56,13 @@ #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" -#include "src/core/lib/surface/validate_metadata.h" -#include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/transport.h" /** The maximum number of concurrent batches possible. Based upon the maximum number of individually queueable ops in the batch - api: + api: - initial metadata send - message send - status/close send (depending on client/server) @@ -94,21 +92,18 @@ typedef enum { } status_source; typedef struct { - bool is_set; - grpc_error *error; + uint8_t is_set; + grpc_status_code code; + grpc_mdstr *details; } received_status; -#define MAX_ERRORS_PER_BATCH 3 - typedef struct batch_control { grpc_call *call; grpc_cq_completion cq_completion; grpc_closure finish_batch; void *notify_tag; gpr_refcount steps_to_complete; - - grpc_error *errors[MAX_ERRORS_PER_BATCH]; - gpr_atm num_errors; + grpc_error *error; uint8_t send_initial_metadata; uint8_t send_message; @@ -190,7 +185,6 @@ struct grpc_call { grpc_call *sibling_prev; grpc_slice_buffer_stream sending_stream; - grpc_byte_stream *receiving_stream; grpc_byte_buffer **receiving_buffer; grpc_slice receiving_slice; @@ -202,7 +196,8 @@ struct grpc_call { union { struct { grpc_status_code *status; - grpc_slice *status_details; + char **status_details; + size_t *status_details_capacity; } client; struct { int *cancelled; @@ -224,23 +219,13 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); -static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_error *error); +static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, grpc_error *error); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error); -static void get_final_status(grpc_call *call, - void (*set_value)(grpc_status_code code, - void *user_data), - void *set_value_user_data, grpc_slice *details); -static void set_status_value_directly(grpc_status_code status, void *dest); -static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, - status_source source, grpc_error *error); -static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl); -static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl); -static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, - grpc_error *error); grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, const grpc_call_create_args *args, @@ -261,16 +246,14 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; - grpc_slice path = grpc_empty_slice(); + grpc_mdstr *path = NULL; if (call->is_client) { GPR_ASSERT(args->add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); for (i = 0; i < args->add_initial_metadata_count; i++) { call->send_extra_metadata[i].md = args->add_initial_metadata[i]; - if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]), - GRPC_MDSTR_PATH)) { - path = grpc_slice_ref_internal( - GRPC_MDVALUE(args->add_initial_metadata[i])); + if (args->add_initial_metadata[i]->key == GRPC_MDSTR_PATH) { + path = GRPC_MDSTR_REF(args->add_initial_metadata[i]->value); } } call->send_extra_metadata_count = (int)args->add_initial_metadata_count; @@ -337,7 +320,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, args->server_transport_data, path, call->start_time, send_deadline, CALL_STACK_FROM_CALL(call)); if (error != GRPC_ERROR_NONE) { - cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error)); + grpc_status_code status; + const char *error_str; + grpc_error_get_status(error, &status, &error_str); + close_with_status(exec_ctx, call, status, error_str); } if (args->cq != NULL) { GPR_ASSERT( @@ -356,7 +342,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } - grpc_slice_unref_internal(exec_ctx, path); + if (path != NULL) GRPC_MDSTR_UNREF(exec_ctx, path); GPR_TIMER_END("grpc_call_create", 0); return error; @@ -391,6 +377,24 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } +static void get_final_status(grpc_call *call, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + set_value(call->status[i].code, set_value_user_data); + return; + } + } + if (call->is_client) { + set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); + } else { + set_value(GRPC_STATUS_OK, set_value_user_data); + } +} + static void set_status_value_directly(grpc_status_code status, void *dest); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { @@ -406,6 +410,11 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } gpr_mu_destroy(&c->mu); + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (c->status[i].details) { + GRPC_MDSTR_UNREF(exec_ctx, c->status[i].details); + } + } for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } @@ -419,245 +428,42 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, } grpc_channel *channel = c->channel; - get_final_status(call, set_status_value_directly, &c->final_info.final_status, - NULL); + get_final_status(call, set_status_value_directly, + &c->final_info.final_status); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - GRPC_ERROR_UNREF(c->status[i].error); - } - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); GPR_TIMER_END("destroy_call", 0); } -void grpc_call_destroy(grpc_call *c) { - int cancel; - grpc_call *parent = c->parent; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - GPR_TIMER_BEGIN("grpc_call_destroy", 0); - GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); - - if (parent) { - gpr_mu_lock(&parent->mu); - if (c == parent->first_child) { - parent->first_child = c->sibling_next; - if (c == parent->first_child) { - parent->first_child = NULL; - } - c->sibling_prev->sibling_next = c->sibling_next; - c->sibling_next->sibling_prev = c->sibling_prev; - } - gpr_mu_unlock(&parent->mu); - GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); - } - - gpr_mu_lock(&c->mu); - GPR_ASSERT(!c->destroy_called); - c->destroy_called = 1; - cancel = !c->received_final_op; - gpr_mu_unlock(&c->mu); - if (cancel) grpc_call_cancel(c, NULL); - GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); - GPR_TIMER_END("grpc_call_destroy", 0); -} - -grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { - GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); - GPR_ASSERT(!reserved); - return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled", - NULL); -} - -static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op *op) { - grpc_call_element *elem; - - GPR_TIMER_BEGIN("execute_op", 0); - elem = CALL_ELEM_FROM_CALL(call, 0); - op->context = call->context; - elem->filter->start_transport_stream_op(exec_ctx, elem, op); - GPR_TIMER_END("execute_op", 0); -} - -char *grpc_call_get_peer(grpc_call *call) { - grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - char *result; - GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); - result = elem->filter->get_peer(&exec_ctx, elem); - if (result == NULL) { - result = grpc_channel_get_target(call->channel); - } - if (result == NULL) { - result = gpr_strdup("unknown"); - } - grpc_exec_ctx_finish(&exec_ctx); - return result; -} - -grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { - return CALL_FROM_TOP_ELEM(elem); -} - -/******************************************************************************* - * CANCELLATION - */ - -grpc_call_error grpc_call_cancel_with_status(grpc_call *c, - grpc_status_code status, - const char *description, - void *reserved) { - grpc_call_error r; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_API_TRACE( - "grpc_call_cancel_with_status(" - "c=%p, status=%d, description=%s, reserved=%p)", - 4, (c, (int)status, description, reserved)); - GPR_ASSERT(reserved == NULL); - gpr_mu_lock(&c->mu); - r = cancel_with_status(&exec_ctx, c, status, description); - gpr_mu_unlock(&c->mu); - grpc_exec_ctx_finish(&exec_ctx); - return r; -} - -typedef struct termination_closure { - grpc_closure closure; - grpc_call *call; - grpc_error *error; - grpc_transport_stream_op op; -} termination_closure; - -static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, - grpc_error *error) { - termination_closure *tc = tcp; - GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "termination"); - gpr_free(tc); -} - -static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp, - grpc_error *error) { - termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.cancel_error = tc->error; - /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc, - grpc_schedule_on_exec_ctx); - tc->op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &tc->op); -} - -static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, - termination_closure *tc) { - set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_REF(tc->error)); - grpc_closure_init(&tc->closure, send_termination, tc, - grpc_schedule_on_exec_ctx); - GRPC_CALL_INTERNAL_REF(tc->call, "termination"); - grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE); - return GRPC_CALL_OK; -} - -static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx, - grpc_call *c, grpc_error *error) { - termination_closure *tc = gpr_malloc(sizeof(*tc)); - memset(tc, 0, sizeof(*tc)); - tc->call = c; - tc->error = error; - return terminate_with_status(exec_ctx, tc); -} +static void set_status_code(grpc_call *call, status_source source, + uint32_t status) { + if (call->status[source].is_set) return; -static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_error *error) { - terminate_with_error(exec_ctx, c, error); + call->status[source].is_set = 1; + call->status[source].code = (grpc_status_code)status; } -static grpc_error *error_from_status(grpc_status_code status, - const char *description) { - return grpc_error_set_int( - grpc_error_set_str(GRPC_ERROR_CREATE(description), - GRPC_ERROR_STR_GRPC_MESSAGE, description), - GRPC_ERROR_INT_GRPC_STATUS, status); -} - -static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_status_code status, - const char *description) { - return terminate_with_error(exec_ctx, c, - error_from_status(status, description)); -} - -/******************************************************************************* - * FINAL STATUS CODE MANIPULATION - */ - -static void get_final_status_from(grpc_call *call, status_source from_source, - void (*set_value)(grpc_status_code code, - void *user_data), - void *set_value_user_data, - grpc_slice *details) { - grpc_status_code code; - const char *msg = NULL; - grpc_error_get_status(call->status[from_source].error, call->send_deadline, - &code, &msg, NULL); - - set_value(code, set_value_user_data); - if (details != NULL) { - *details = - msg == NULL ? grpc_empty_slice() : grpc_slice_from_copied_string(msg); - } -} - -static void get_final_status(grpc_call *call, - void (*set_value)(grpc_status_code code, - void *user_data), - void *set_value_user_data, grpc_slice *details) { - int i; - /* search for the best status we can present: ideally the error we use has a - clearly defined grpc-status, and we'll prefer that. */ - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set && - grpc_error_has_clear_grpc_status(call->status[i].error)) { - get_final_status_from(call, (status_source)i, set_value, - set_value_user_data, details); - return; - } - } - /* If no clearly defined status exists, search for 'anything' */ - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - get_final_status_from(call, (status_source)i, set_value, - set_value_user_data, details); - return; - } - } - /* If nothing exists, set some default */ - if (call->is_client) { - set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); +static void set_status_details(grpc_exec_ctx *exec_ctx, grpc_call *call, + status_source source, grpc_mdstr *status) { + if (call->status[source].details != NULL) { + GRPC_MDSTR_UNREF(exec_ctx, status); } else { - set_value(GRPC_STATUS_OK, set_value_user_data); + call->status[source].details = status; } } static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, status_source source, grpc_error *error) { - if (call->status[source].is_set) { - GRPC_ERROR_UNREF(error); - return; - } - call->status[source].is_set = true; - call->status[source].error = error; + grpc_status_code status; + const char *msg; + grpc_error_get_status(error, &status, &msg); + set_status_code(call, source, (uint32_t)status); + set_status_details(exec_ctx, call, source, grpc_mdstr_from_string(msg)); } -/******************************************************************************* - * COMPRESSION - */ - static void set_incoming_compression_algorithm( grpc_call *call, grpc_compression_algorithm algo) { GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT); @@ -690,7 +496,7 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { static void destroy_encodings_accepted_by_peer(void *p) { return; } static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, - grpc_call *call, grpc_mdelem mdel) { + grpc_call *call, grpc_mdelem *mdel) { size_t i; grpc_compression_algorithm algorithm; grpc_slice_buffer accept_encoding_parts; @@ -705,7 +511,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, return; } - accept_encoding_slice = GRPC_MDVALUE(mdel); + accept_encoding_slice = mdel->value->slice; grpc_slice_buffer_init(&accept_encoding_parts); grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts); @@ -714,13 +520,15 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); for (i = 0; i < accept_encoding_parts.count; i++) { - grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i]; - if (grpc_compression_algorithm_parse(accept_encoding_entry_slice, - &algorithm)) { + const grpc_slice *accept_encoding_entry_slice = + &accept_encoding_parts.slices[i]; + if (grpc_compression_algorithm_parse( + (const char *)GRPC_SLICE_START_PTR(*accept_encoding_entry_slice), + GRPC_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) { GPR_BITSET(&call->encodings_accepted_by_peer, algorithm); } else { char *accept_encoding_entry_str = - grpc_slice_to_c_string(accept_encoding_entry_slice); + grpc_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII); gpr_log(GPR_ERROR, "Invalid entry in accept encoding metadata: '%s'. Ignoring.", accept_encoding_entry_str); @@ -743,6 +551,36 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { return encodings_accepted_by_peer; } +static void get_final_details(grpc_call *call, char **out_details, + size_t *out_details_capacity) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + if (call->status[i].details) { + grpc_slice details = call->status[i].details->slice; + size_t len = GRPC_SLICE_LENGTH(details); + if (len + 1 > *out_details_capacity) { + *out_details_capacity = + GPR_MAX(len + 1, *out_details_capacity * 3 / 2); + *out_details = gpr_realloc(*out_details, *out_details_capacity); + } + memcpy(*out_details, GRPC_SLICE_START_PTR(details), len); + (*out_details)[len] = 0; + } else { + goto no_details; + } + return; + } + } + +no_details: + if (0 == *out_details_capacity) { + *out_details_capacity = 8; + *out_details = gpr_malloc(*out_details_capacity); + } + **out_details = 0; +} + static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) { return (grpc_linked_mdelem *)&md->internal_data; } @@ -769,19 +607,24 @@ static int prepare_application_metadata( get_md_elem(metadata, additional_metadata, i, count); grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); - if (!GRPC_LOG_IF_ERROR("validate_metadata", - grpc_validate_header_key_is_legal(md->key))) { + l->md = grpc_mdelem_from_string_and_buffer( + exec_ctx, md->key, (const uint8_t *)md->value, md->value_length); + if (!grpc_header_key_is_legal(grpc_mdstr_as_c_string(l->md->key), + GRPC_MDSTR_LENGTH(l->md->key))) { + gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s", + grpc_mdstr_as_c_string(l->md->key)); break; - } else if (!grpc_is_binary_header(md->key) && - !GRPC_LOG_IF_ERROR( - "validate_metadata", - grpc_validate_header_nonbin_value_is_legal(md->value))) { + } else if (!grpc_is_binary_header(grpc_mdstr_as_c_string(l->md->key), + GRPC_MDSTR_LENGTH(l->md->key)) && + !grpc_header_nonbin_value_is_legal( + grpc_mdstr_as_c_string(l->md->value), + GRPC_MDSTR_LENGTH(l->md->value))) { + gpr_log(GPR_ERROR, "attempt to send invalid metadata value"); break; } - l->md = grpc_mdelem_from_grpc_metadata(exec_ctx, (grpc_metadata *)md); } if (i != total_count) { - for (int j = 0; j < i; j++) { + for (int j = 0; j <= i; j++) { const grpc_metadata *md = get_md_elem(metadata, additional_metadata, j, count); grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; @@ -793,41 +636,278 @@ static int prepare_application_metadata( if (call->send_extra_metadata_count == 0) { prepend_extra_metadata = 0; } else { - for (i = 0; i < call->send_extra_metadata_count; i++) { - GRPC_LOG_IF_ERROR("prepare_application_metadata", - grpc_metadata_batch_link_tail( - exec_ctx, batch, &call->send_extra_metadata[i])); + for (i = 1; i < call->send_extra_metadata_count; i++) { + call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; + } + for (i = 0; i < call->send_extra_metadata_count - 1; i++) { + call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1]; } } } - for (i = 0; i < total_count; i++) { + for (i = 1; i < total_count; i++) { + grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); + grpc_metadata *prev_md = + get_md_elem(metadata, additional_metadata, i - 1, count); + linked_from_md(md)->prev = linked_from_md(prev_md); + } + for (i = 0; i < total_count - 1; i++) { grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count); - GRPC_LOG_IF_ERROR( - "prepare_application_metadata", - grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md))); + grpc_metadata *next_md = + get_md_elem(metadata, additional_metadata, i + 1, count); + linked_from_md(md)->next = linked_from_md(next_md); + } + + switch (prepend_extra_metadata * 2 + (total_count != 0)) { + case 0: + /* no prepend, no metadata => nothing to do */ + batch->list.head = batch->list.tail = NULL; + break; + case 1: { + /* metadata, but no prepend */ + grpc_metadata *first_md = + get_md_elem(metadata, additional_metadata, 0, count); + grpc_metadata *last_md = + get_md_elem(metadata, additional_metadata, total_count - 1, count); + batch->list.head = linked_from_md(first_md); + batch->list.tail = linked_from_md(last_md); + batch->list.head->prev = NULL; + batch->list.tail->next = NULL; + break; + } + case 2: + /* prepend, but no md */ + batch->list.head = &call->send_extra_metadata[0]; + batch->list.tail = + &call->send_extra_metadata[call->send_extra_metadata_count - 1]; + batch->list.head->prev = NULL; + batch->list.tail->next = NULL; + call->send_extra_metadata_count = 0; + break; + case 3: { + /* prepend AND md */ + grpc_metadata *first_md = + get_md_elem(metadata, additional_metadata, 0, count); + grpc_metadata *last_md = + get_md_elem(metadata, additional_metadata, total_count - 1, count); + batch->list.head = &call->send_extra_metadata[0]; + call->send_extra_metadata[call->send_extra_metadata_count - 1].next = + linked_from_md(first_md); + linked_from_md(first_md)->prev = + &call->send_extra_metadata[call->send_extra_metadata_count - 1]; + batch->list.tail = linked_from_md(last_md); + batch->list.head->prev = NULL; + batch->list.tail->next = NULL; + call->send_extra_metadata_count = 0; + break; + } + default: + GPR_UNREACHABLE_CODE(return 0); } - call->send_extra_metadata_count = 0; return 1; } +void grpc_call_destroy(grpc_call *c) { + int cancel; + grpc_call *parent = c->parent; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + GPR_TIMER_BEGIN("grpc_call_destroy", 0); + GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); + + if (parent) { + gpr_mu_lock(&parent->mu); + if (c == parent->first_child) { + parent->first_child = c->sibling_next; + if (c == parent->first_child) { + parent->first_child = NULL; + } + c->sibling_prev->sibling_next = c->sibling_next; + c->sibling_next->sibling_prev = c->sibling_prev; + } + gpr_mu_unlock(&parent->mu); + GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); + } + + gpr_mu_lock(&c->mu); + GPR_ASSERT(!c->destroy_called); + c->destroy_called = 1; + cancel = !c->received_final_op; + gpr_mu_unlock(&c->mu); + if (cancel) grpc_call_cancel(c, NULL); + GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); + grpc_exec_ctx_finish(&exec_ctx); + GPR_TIMER_END("grpc_call_destroy", 0); +} + +grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { + GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); + GPR_ASSERT(!reserved); + return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled", + NULL); +} + +grpc_call_error grpc_call_cancel_with_status(grpc_call *c, + grpc_status_code status, + const char *description, + void *reserved) { + grpc_call_error r; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_API_TRACE( + "grpc_call_cancel_with_status(" + "c=%p, status=%d, description=%s, reserved=%p)", + 4, (c, (int)status, description, reserved)); + GPR_ASSERT(reserved == NULL); + gpr_mu_lock(&c->mu); + r = cancel_with_status(&exec_ctx, c, status, description); + gpr_mu_unlock(&c->mu); + grpc_exec_ctx_finish(&exec_ctx); + return r; +} + +typedef struct termination_closure { + grpc_closure closure; + grpc_call *call; + grpc_error *error; + enum { TC_CANCEL, TC_CLOSE } type; + grpc_transport_stream_op op; +} termination_closure; + +static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, + grpc_error *error) { + termination_closure *tc = tcp; + switch (tc->type) { + case TC_CANCEL: + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "cancel"); + break; + case TC_CLOSE: + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); + break; + } + GRPC_ERROR_UNREF(tc->error); + gpr_free(tc); +} + +static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { + termination_closure *tc = tcp; + memset(&tc->op, 0, sizeof(tc->op)); + tc->op.cancel_error = tc->error; + /* reuse closure to catch completion */ + grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); + tc->op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &tc->op); +} + +static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { + termination_closure *tc = tcp; + memset(&tc->op, 0, sizeof(tc->op)); + tc->op.close_error = tc->error; + /* reuse closure to catch completion */ + grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); + tc->op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &tc->op); +} + +static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, + termination_closure *tc) { + set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE, + tc->error); + + if (tc->type == TC_CANCEL) { + grpc_closure_init(&tc->closure, send_cancel, tc, grpc_schedule_on_exec_ctx); + GRPC_CALL_INTERNAL_REF(tc->call, "cancel"); + } else if (tc->type == TC_CLOSE) { + grpc_closure_init(&tc->closure, send_close, tc, grpc_schedule_on_exec_ctx); + GRPC_CALL_INTERNAL_REF(tc->call, "close"); + } + grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE); + return GRPC_CALL_OK; +} + +static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description) { + GPR_ASSERT(status != GRPC_STATUS_OK); + termination_closure *tc = gpr_malloc(sizeof(*tc)); + memset(tc, 0, sizeof(termination_closure)); + tc->type = TC_CANCEL; + tc->call = c; + tc->error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE(description), + GRPC_ERROR_STR_GRPC_MESSAGE, description), + GRPC_ERROR_INT_GRPC_STATUS, status); + + return terminate_with_status(exec_ctx, tc); +} + +static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description) { + GPR_ASSERT(status != GRPC_STATUS_OK); + termination_closure *tc = gpr_malloc(sizeof(*tc)); + memset(tc, 0, sizeof(termination_closure)); + tc->type = TC_CLOSE; + tc->call = c; + tc->error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE(description), + GRPC_ERROR_STR_GRPC_MESSAGE, description), + GRPC_ERROR_INT_GRPC_STATUS, status); + + return terminate_with_status(exec_ctx, tc); +} + +static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op *op) { + grpc_call_element *elem; + + GPR_TIMER_BEGIN("execute_op", 0); + elem = CALL_ELEM_FROM_CALL(call, 0); + op->context = call->context; + elem->filter->start_transport_stream_op(exec_ctx, elem, op); + GPR_TIMER_END("execute_op", 0); +} + +char *grpc_call_get_peer(grpc_call *call) { + grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + char *result; + GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); + result = elem->filter->get_peer(&exec_ctx, elem); + if (result == NULL) { + result = grpc_channel_get_target(call->channel); + } + if (result == NULL) { + result = gpr_strdup("unknown"); + } + grpc_exec_ctx_finish(&exec_ctx); + return result; +} + +grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { + return CALL_FROM_TOP_ELEM(elem); +} + /* we offset status by a small amount when storing it into transport metadata as metadata cannot store a 0 value (which is used as OK for grpc_status_codes */ #define STATUS_OFFSET 1 static void destroy_status(void *ignored) {} -static uint32_t decode_status(grpc_mdelem md) { +static uint32_t decode_status(grpc_mdelem *md) { uint32_t status; void *user_data; - if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_0)) return 0; - if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_1)) return 1; - if (grpc_mdelem_eq(md, GRPC_MDELEM_GRPC_STATUS_2)) return 2; + if (md == GRPC_MDELEM_GRPC_STATUS_0) return 0; + if (md == GRPC_MDELEM_GRPC_STATUS_1) return 1; + if (md == GRPC_MDELEM_GRPC_STATUS_2) return 2; user_data = grpc_mdelem_get_user_data(md, destroy_status); if (user_data != NULL) { status = ((uint32_t)(intptr_t)user_data) - STATUS_OFFSET; } else { - if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(md), &status)) { + if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), + GRPC_SLICE_LENGTH(md->value->slice), + &status)) { status = GRPC_STATUS_UNKNOWN; /* could not parse status code */ } grpc_mdelem_set_user_data(md, destroy_status, @@ -836,104 +916,93 @@ static uint32_t decode_status(grpc_mdelem md) { return status; } -static grpc_compression_algorithm decode_compression(grpc_mdelem md) { +static grpc_compression_algorithm decode_compression(grpc_mdelem *md) { grpc_compression_algorithm algorithm = - grpc_compression_algorithm_from_slice(GRPC_MDVALUE(md)); + grpc_compression_algorithm_from_mdstr(md->value); if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) { - char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + const char *md_c_str = grpc_mdstr_as_c_string(md->value); gpr_log(GPR_ERROR, "Invalid incoming compression algorithm: '%s'. Interpreting " "incoming data as uncompressed.", md_c_str); - gpr_free(md_c_str); return GRPC_COMPRESS_NONE; } return algorithm; } -static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_metadata_batch *b) { - if (b->idx.named.grpc_status != NULL) { - uint32_t status_code = decode_status(b->idx.named.grpc_status->md); - grpc_error *error = - status_code == GRPC_STATUS_OK - ? GRPC_ERROR_NONE - : grpc_error_set_int(GRPC_ERROR_CREATE("Error received from peer"), - GRPC_ERROR_INT_GRPC_STATUS, - (intptr_t)status_code); - - if (b->idx.named.grpc_message != NULL) { - char *msg = - grpc_slice_to_c_string(GRPC_MDVALUE(b->idx.named.grpc_message->md)); - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); - gpr_free(msg); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); - } else { - error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, ""); - } - - set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status); - } +static grpc_mdelem *recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_mdelem *elem) { + if (elem->key == GRPC_MDSTR_GRPC_STATUS) { + GPR_TIMER_BEGIN("status", 0); + set_status_code(call, STATUS_FROM_WIRE, decode_status(elem)); + GPR_TIMER_END("status", 0); + return NULL; + } else if (elem->key == GRPC_MDSTR_GRPC_MESSAGE) { + GPR_TIMER_BEGIN("status-details", 0); + set_status_details(exec_ctx, call, STATUS_FROM_WIRE, + GRPC_MDSTR_REF(elem->value)); + GPR_TIMER_END("status-details", 0); + return NULL; + } + return elem; } -static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, - int is_trailing) { - if (b->list.count == 0) return; - GPR_TIMER_BEGIN("publish_app_metadata", 0); +static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem, + int is_trailing) { grpc_metadata_array *dest; grpc_metadata *mdusr; + GPR_TIMER_BEGIN("publish_app_metadata", 0); dest = call->buffered_metadata[is_trailing]; - if (dest->count + b->list.count > dest->capacity) { - dest->capacity = - GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2); + if (dest->count == dest->capacity) { + dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2); dest->metadata = gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); } - for (grpc_linked_mdelem *l = b->list.head; l != NULL; l = l->next) { - mdusr = &dest->metadata[dest->count++]; - /* we pass back borrowed slices that are valid whilst the call is valid */ - mdusr->key = GRPC_MDKEY(l->md); - mdusr->value = GRPC_MDVALUE(l->md); - } + mdusr = &dest->metadata[dest->count++]; + mdusr->key = grpc_mdstr_as_c_string(elem->key); + mdusr->value = grpc_mdstr_as_c_string(elem->value); + mdusr->value_length = GRPC_SLICE_LENGTH(elem->value->slice); GPR_TIMER_END("publish_app_metadata", 0); + return elem; } -static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_metadata_batch *b) { - recv_common_filter(exec_ctx, call, b); - - if (b->idx.named.grpc_encoding != NULL) { +static grpc_mdelem *recv_initial_filter(grpc_exec_ctx *exec_ctx, void *args, + grpc_mdelem *elem) { + grpc_call *call = args; + elem = recv_common_filter(exec_ctx, call, elem); + if (elem == NULL) { + return NULL; + } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) { GPR_TIMER_BEGIN("incoming_compression_algorithm", 0); - set_incoming_compression_algorithm( - call, decode_compression(b->idx.named.grpc_encoding->md)); + set_incoming_compression_algorithm(call, decode_compression(elem)); GPR_TIMER_END("incoming_compression_algorithm", 0); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding); - } - - if (b->idx.named.grpc_accept_encoding != NULL) { + return NULL; + } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) { GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0); - set_encodings_accepted_by_peer(exec_ctx, call, - b->idx.named.grpc_accept_encoding->md); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding); + set_encodings_accepted_by_peer(exec_ctx, call, elem); GPR_TIMER_END("encodings_accepted_by_peer", 0); + return NULL; + } else { + return publish_app_metadata(call, elem, 0); } - - publish_app_metadata(call, b, false); } -static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args, - grpc_metadata_batch *b) { +static grpc_mdelem *recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args, + grpc_mdelem *elem) { grpc_call *call = args; - recv_common_filter(exec_ctx, call, b); - publish_app_metadata(call, b, true); + elem = recv_common_filter(exec_ctx, call, elem); + if (elem == NULL) { + return NULL; + } else { + return publish_app_metadata(call, elem, 1); + } } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) { return CALL_STACK_FROM_CALL(call); } -/******************************************************************************* +/* * BATCH API IMPLEMENTATION */ @@ -984,83 +1053,14 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } -static grpc_error *consolidate_batch_errors(batch_control *bctl) { - size_t n = (size_t)gpr_atm_no_barrier_load(&bctl->num_errors); - if (n == 0) { - return GRPC_ERROR_NONE; - } else if (n == 1) { - return bctl->errors[0]; - } else { - grpc_error *error = - GRPC_ERROR_CREATE_REFERENCING("Call batch failed", bctl->errors, n); - for (size_t i = 0; i < n; i++) { - GRPC_ERROR_UNREF(bctl->errors[i]); - } - return error; - } -} - static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { - grpc_call *child_call; - grpc_call *next_child_call; grpc_call *call = bctl->call; - grpc_error *error = consolidate_batch_errors(bctl); - - gpr_mu_lock(&call->mu); - - if (error != GRPC_ERROR_NONE) { - set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, - GRPC_ERROR_REF(error)); - } - - if (bctl->send_initial_metadata) { - grpc_metadata_batch_destroy( - exec_ctx, - &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); - } - if (bctl->send_message) { - call->sending_message = false; - } - if (bctl->send_final_op) { - grpc_metadata_batch_destroy( - exec_ctx, - &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); - } + grpc_error *error = bctl->error; if (bctl->recv_final_op) { - grpc_metadata_batch *md = - &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; - recv_trailing_filter(exec_ctx, call, md); - - call->received_final_op = true; - /* propagate cancellation to any interested children */ - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call, NULL); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); - } - - if (call->is_client) { - get_final_status(call, set_status_value_directly, - call->final_op.client.status, - call->final_op.client.status_details); - } else { - get_final_status(call, set_cancelled_value, - call->final_op.server.cancelled, NULL); - } - GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } - gpr_mu_unlock(&call->mu); - if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ grpc_closure_run(exec_ctx, bctl->notify_tag, error); @@ -1077,12 +1077,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, } } -static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { - if (gpr_unref(&bctl->steps_to_complete)) { - post_batch_completion(exec_ctx, bctl); - } -} - static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; @@ -1093,7 +1087,9 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, call->receiving_message = 0; grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); call->receiving_stream = NULL; - finish_batch_step(exec_ctx, bctl); + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } return; } if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, @@ -1124,7 +1120,9 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, call->receiving_stream = NULL; grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = NULL; - finish_batch_step(exec_ctx, bctl); + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } } } @@ -1134,7 +1132,9 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, if (call->receiving_stream == NULL) { *call->receiving_buffer = NULL; call->receiving_message = 0; - finish_batch_step(exec_ctx, bctl); + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } } else { call->test_only_last_message_flags = call->receiving_stream->flags; if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && @@ -1154,11 +1154,14 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; - gpr_mu_lock(&bctl->call->mu); if (error != GRPC_ERROR_NONE) { - cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error)); + grpc_status_code status; + const char *msg; + grpc_error_get_status(error, &status, &msg); + close_with_status(exec_ctx, call, status, msg); } - if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || + gpr_mu_lock(&bctl->call->mu); + if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE || call->receiving_stream == NULL) { gpr_mu_unlock(&bctl->call->mu); process_data_after_md(exec_ctx, bctlp); @@ -1183,7 +1186,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", algo); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ @@ -1192,7 +1195,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); } else { call->incoming_compression_algorithm = algo; } @@ -1218,12 +1221,12 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } } -static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, - grpc_error *error) { +static void add_batch_error(batch_control *bctl, grpc_error *error) { if (error == GRPC_ERROR_NONE) return; - cancel_with_error(exec_ctx, bctl->call, GRPC_ERROR_REF(error)); - int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1); - bctl->errors[idx] = error; + if (bctl->error == GRPC_ERROR_NONE) { + bctl->error = GRPC_ERROR_CREATE("Call batch operation failed"); + } + bctl->error = grpc_error_add_child(bctl->error, error); } static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, @@ -1233,13 +1236,12 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&call->mu); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error)); + add_batch_error(bctl, GRPC_ERROR_REF(error)); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; - recv_initial_filter(exec_ctx, call, md); + grpc_metadata_batch_filter(exec_ctx, md, recv_initial_filter, call); - /* TODO(ctiller): this could be moved into recv_initial_filter now */ GPR_TIMER_BEGIN("validate_filtered_metadata", 0); validate_filtered_metadata(exec_ctx, bctl); GPR_TIMER_END("validate_filtered_metadata", 0); @@ -1263,15 +1265,85 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&call->mu); - finish_batch_step(exec_ctx, bctl); + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } } static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + grpc_call *child_call; + grpc_call *next_child_call; + + GRPC_ERROR_REF(error); + + gpr_mu_lock(&call->mu); + + // If the error has an associated status code, set the call's status. + intptr_t status; + if (error != GRPC_ERROR_NONE && + grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { + set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error); + } + + if (bctl->send_initial_metadata) { + if (error != GRPC_ERROR_NONE) { + set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error); + } + grpc_metadata_batch_destroy( + exec_ctx, + &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]); + } + if (bctl->send_message) { + call->sending_message = 0; + } + if (bctl->send_final_op) { + grpc_metadata_batch_destroy( + exec_ctx, + &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]); + } + if (bctl->recv_final_op) { + grpc_metadata_batch *md = + &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; + grpc_metadata_batch_filter(exec_ctx, md, recv_trailing_filter, call); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error)); - finish_batch_step(exec_ctx, bctl); + call->received_final_op = true; + /* propagate cancellation to any interested children */ + child_call = call->first_child; + if (child_call != NULL) { + do { + next_child_call = child_call->sibling_next; + if (child_call->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); + grpc_call_cancel(child_call, NULL); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); + } + child_call = next_child_call; + } while (child_call != call->first_child); + } + + if (call->is_client) { + get_final_status(call, set_status_value_directly, + call->final_op.client.status); + get_final_details(call, call->final_op.client.status_details, + call->final_op.client.status_details_capacity); + } else { + get_final_status(call, set_cancelled_value, + call->final_op.server.cancelled); + } + + GRPC_ERROR_UNREF(error); + error = GRPC_ERROR_NONE; + } + add_batch_error(bctl, GRPC_ERROR_REF(error)); + gpr_mu_unlock(&call->mu); + if (gpr_unref(&bctl->steps_to_complete)) { + post_batch_completion(exec_ctx, bctl); + } + + GRPC_ERROR_UNREF(error); } static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, @@ -1305,6 +1377,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (nops == 0) { GRPC_CALL_INTERNAL_REF(call, "completion"); + bctl->error = GRPC_ERROR_NONE; if (!is_notify_tag_closure) { grpc_cq_begin_op(call->cq, notify_tag); } @@ -1353,10 +1426,13 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, const grpc_compression_algorithm calgo = compression_algorithm_for_level_locked( call, effective_compression_level); + char *calgo_name = NULL; + grpc_compression_algorithm_name(calgo, &calgo_name); // the following will be picked up by the compress filter and used as // the call's compression algorithm. - compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; - compression_md.value = grpc_compression_algorithm_slice(calgo); + compression_md.key = GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY; + compression_md.value = calgo_name; + compression_md.value_length = strlen(calgo_name); additional_metadata_count++; } @@ -1449,25 +1525,19 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( exec_ctx, call->channel, op->data.send_status_from_server.status); - { - grpc_error *override_error = GRPC_ERROR_NONE; - if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { - override_error = GRPC_ERROR_CREATE("Error from server send status"); - } - if (op->data.send_status_from_server.status_details != NULL) { - call->send_extra_metadata[1].md = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_ref_internal( - *op->data.send_status_from_server.status_details)); - call->send_extra_metadata_count++; - char *msg = grpc_slice_to_c_string( - GRPC_MDVALUE(call->send_extra_metadata[1].md)); - override_error = grpc_error_set_str( - override_error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); - gpr_free(msg); - } - set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, - override_error); + if (op->data.send_status_from_server.status_details != NULL) { + call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings( + exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_mdstr_from_string( + op->data.send_status_from_server.status_details)); + call->send_extra_metadata_count++; + set_status_details( + exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_MDSTR_REF(call->send_extra_metadata[1].md->value)); + } + if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { + set_status_code(call, STATUS_FROM_API_OVERRIDE, + (uint32_t)op->data.send_status_from_server.status); } if (!prepare_application_metadata( exec_ctx, call, @@ -1545,6 +1615,8 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->final_op.client.status = op->data.recv_status_on_client.status; call->final_op.client.status_details = op->data.recv_status_on_client.status_details; + call->final_op.client.status_details_capacity = + op->data.recv_status_on_client.status_details_capacity; bctl->recv_final_op = 1; stream_op->recv_trailing_metadata = &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; |