diff options
author | Craig Tiller <ctiller@google.com> | 2016-12-12 16:58:57 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-01-06 14:40:08 -0800 |
commit | 841a99d7522af5cb1c9acdb2bd8bb431a8bfa758 (patch) | |
tree | b3bbafd2b855e1f1817f7cea8d72680582fbebe6 | |
parent | 9490389f868f6addf0e27f8c299f1e63c76dde1e (diff) |
Cleaning up error handling
-rw-r--r-- | src/core/lib/channel/channel_stack.c | 39 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 15 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 9 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.c | 4 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 550 | ||||
-rw-r--r-- | src/core/lib/transport/transport.c | 87 | ||||
-rw-r--r-- | src/core/lib/transport/transport.h | 12 | ||||
-rw-r--r-- | test/core/end2end/tests/filter_causes_close.c | 9 |
9 files changed, 279 insertions, 453 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index fe2e50e71a..22bdb2952a 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -287,42 +287,3 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( sizeof(grpc_call_stack))); } - -static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) { - gpr_free(op); -} - -void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); - memset(op, 0, sizeof(*op)); - op->cancel_error = GRPC_ERROR_CANCELLED; - op->on_complete = - grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); - elem->filter->start_transport_stream_op(exec_ctx, elem, op); -} - -void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_status_code status, - grpc_slice *optional_message) { - grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); - memset(op, 0, sizeof(*op)); - op->on_complete = - grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); - grpc_transport_stream_op_add_cancellation_with_message(exec_ctx, op, status, - optional_message); - elem->filter->start_transport_stream_op(exec_ctx, elem, op); -} - -void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_status_code status, - grpc_slice *optional_message) { - grpc_transport_stream_op *op = gpr_malloc(sizeof(*op)); - memset(op, 0, sizeof(*op)); - op->on_complete = - grpc_closure_create(destroy_op, op, grpc_schedule_on_exec_ctx); - grpc_transport_stream_op_add_close(exec_ctx, op, status, optional_message); - elem->filter->start_transport_stream_op(exec_ctx, elem, op); -} diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index f420583a8d..1cf07d43c2 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -299,18 +299,9 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); void grpc_call_log_op(char *file, int line, gpr_log_severity severity, grpc_call_element *elem, grpc_transport_stream_op *op); -void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, - grpc_call_element *cur_elem); - -void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *cur_elem, - grpc_status_code status, - grpc_slice *optional_message); - -void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *cur_elem, - grpc_status_code status, - grpc_slice *optional_message); +void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, + grpc_call_element *cur_elem, + grpc_error *error); extern int grpc_trace_channel; diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index a45a4d4b82..4e140c10f1 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -56,10 +56,11 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, deadline_state->timer_pending = false; gpr_mu_unlock(&deadline_state->timer_mu); if (error != GRPC_ERROR_CANCELLED) { - grpc_slice msg = grpc_slice_from_static_string("Deadline Exceeded"); - grpc_call_element_send_cancel_with_message( - exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg); - grpc_slice_unref_internal(exec_ctx, msg); + grpc_call_element_signal_error( + exec_ctx, elem, + grpc_error_set_int(GRPC_ERROR_CREATE("Deadline Exceeded"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_DEADLINE_EXCEEDED)); } GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer"); } diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index f8cdf62540..3578f20827 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -142,10 +142,11 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, char* message_string; gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", op->send_message->length, calld->max_send_size); - grpc_slice message = grpc_slice_from_copied_string(message_string); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, grpc_error_set_int(GRPC_ERROR_CREATE(message_string), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_INVALID_ARGUMENT)); gpr_free(message_string); - grpc_call_element_send_close_with_message( - exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); } // Inject callback for receiving a message. if (op->recv_message_ready != NULL) { diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index f6eb8e02c9..e4bfa32214 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -385,9 +385,9 @@ void grpc_error_get_status(grpc_error *error, grpc_status_code *code, // If the error has a status message, use it. Otherwise, fall back to // the error description. *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE); - if (*msg == NULL) { + if (*msg == NULL && status != GRPC_STATUS_OK) { *msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION); - if (*msg == NULL) *msg = "uknown error"; // Just in case. + if (*msg == NULL) *msg = "unknown error"; // Just in case. } } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 04979ff460..1c32515c93 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -92,10 +92,8 @@ typedef enum { } status_source; typedef struct { - bool is_code_set; - bool is_details_set; - grpc_status_code code; - grpc_slice details; + bool is_set; + grpc_error *error; } received_status; #define MAX_ERRORS_PER_BATCH 3 @@ -224,8 +222,6 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); -static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_error *error); static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description); @@ -233,6 +229,17 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, grpc_error *error); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error); +static void get_final_status(grpc_call *call, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data, grpc_slice *details); +static void set_status_value_directly(grpc_status_code status, void *dest); +static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, + status_source source, grpc_error *error); +static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl); +static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl); +static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, + grpc_error *error); grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, const grpc_call_create_args *args, @@ -386,24 +393,6 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } -static void get_final_status(grpc_call *call, - void (*set_value)(grpc_status_code code, - void *user_data), - void *set_value_user_data) { - int i; - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_code_set) { - set_value(call->status[i].code, set_value_user_data); - return; - } - } - if (call->is_client) { - set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); - } else { - set_value(GRPC_STATUS_OK, set_value_user_data); - } -} - static void set_status_value_directly(grpc_status_code status, void *dest); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { @@ -419,11 +408,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } gpr_mu_destroy(&c->mu); - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (c->status[i].is_details_set) { - grpc_slice_unref_internal(exec_ctx, c->status[i].details); - } - } for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } @@ -437,44 +421,241 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, } grpc_channel *channel = c->channel; - get_final_status(call, set_status_value_directly, - &c->final_info.final_status); + get_final_status(call, set_status_value_directly, &c->final_info.final_status, + NULL); c->final_info.stats.latency = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time); + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + GRPC_ERROR_UNREF(c->status[i].error); + } + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); GPR_TIMER_END("destroy_call", 0); } -static void set_status_code(grpc_call *call, status_source source, - uint32_t status) { - if (call->status[source].is_code_set) return; +void grpc_call_destroy(grpc_call *c) { + int cancel; + grpc_call *parent = c->parent; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + GPR_TIMER_BEGIN("grpc_call_destroy", 0); + GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); + + if (parent) { + gpr_mu_lock(&parent->mu); + if (c == parent->first_child) { + parent->first_child = c->sibling_next; + if (c == parent->first_child) { + parent->first_child = NULL; + } + c->sibling_prev->sibling_next = c->sibling_next; + c->sibling_next->sibling_prev = c->sibling_prev; + } + gpr_mu_unlock(&parent->mu); + GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); + } + + gpr_mu_lock(&c->mu); + GPR_ASSERT(!c->destroy_called); + c->destroy_called = 1; + cancel = !c->received_final_op; + gpr_mu_unlock(&c->mu); + if (cancel) grpc_call_cancel(c, NULL); + GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); + grpc_exec_ctx_finish(&exec_ctx); + GPR_TIMER_END("grpc_call_destroy", 0); +} + +grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { + GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); + GPR_ASSERT(!reserved); + return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled", + NULL); +} + +static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op *op) { + grpc_call_element *elem; + + GPR_TIMER_BEGIN("execute_op", 0); + elem = CALL_ELEM_FROM_CALL(call, 0); + op->context = call->context; + elem->filter->start_transport_stream_op(exec_ctx, elem, op); + GPR_TIMER_END("execute_op", 0); +} + +char *grpc_call_get_peer(grpc_call *call) { + grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + char *result; + GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); + result = elem->filter->get_peer(&exec_ctx, elem); + if (result == NULL) { + result = grpc_channel_get_target(call->channel); + } + if (result == NULL) { + result = gpr_strdup("unknown"); + } + grpc_exec_ctx_finish(&exec_ctx); + return result; +} + +grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { + return CALL_FROM_TOP_ELEM(elem); +} + +/******************************************************************************* + * CANCELLATION + */ + +grpc_call_error grpc_call_cancel_with_status(grpc_call *c, + grpc_status_code status, + const char *description, + void *reserved) { + grpc_call_error r; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_API_TRACE( + "grpc_call_cancel_with_status(" + "c=%p, status=%d, description=%s, reserved=%p)", + 4, (c, (int)status, description, reserved)); + GPR_ASSERT(reserved == NULL); + gpr_mu_lock(&c->mu); + r = cancel_with_status(&exec_ctx, c, status, description); + gpr_mu_unlock(&c->mu); + grpc_exec_ctx_finish(&exec_ctx); + return r; +} + +typedef enum { TC_CANCEL, TC_CLOSE } termination_closure_type; + +typedef struct termination_closure { + grpc_closure closure; + grpc_call *call; + grpc_error *error; + termination_closure_type type; + grpc_transport_stream_op op; +} termination_closure; + +static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, + grpc_error *error) { + termination_closure *tc = tcp; + GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "termination"); + GRPC_ERROR_UNREF(tc->error); + gpr_free(tc); +} + +static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp, + grpc_error *error) { + termination_closure *tc = tcp; + memset(&tc->op, 0, sizeof(tc->op)); + switch (tc->type) { + case TC_CANCEL: + tc->op.cancel_error = tc->error; + break; + case TC_CLOSE: + tc->op.close_error = tc->error; + break; + } + /* reuse closure to catch completion */ + grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); + tc->op.on_complete = &tc->closure; + execute_op(exec_ctx, tc->call, &tc->op); +} + +static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, + termination_closure *tc) { + set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_REF(tc->error)); + grpc_closure_init(&tc->closure, send_termination, tc, + grpc_schedule_on_exec_ctx); + GRPC_CALL_INTERNAL_REF(tc->call, "termination"); + grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE); + return GRPC_CALL_OK; +} + +static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx, + grpc_call *c, + termination_closure_type tc_type, + grpc_error *error) { + termination_closure *tc = gpr_malloc(sizeof(*tc)); + memset(tc, 0, sizeof(*tc)); + tc->type = tc_type; + tc->call = c; + tc->error = error; + return terminate_with_status(exec_ctx, tc); +} + +static grpc_error *error_from_status(grpc_status_code status, + const char *description) { + return grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE(description), + GRPC_ERROR_STR_GRPC_MESSAGE, description), + GRPC_ERROR_INT_GRPC_STATUS, status); +} + +static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description) { + return terminate_with_error(exec_ctx, c, TC_CANCEL, + error_from_status(status, description)); +} - call->status[source].is_code_set = true; - call->status[source].code = (grpc_status_code)status; +static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_status_code status, + const char *description) { + return terminate_with_error(exec_ctx, c, TC_CLOSE, + error_from_status(status, description)); } -static void set_status_details(grpc_exec_ctx *exec_ctx, grpc_call *call, - status_source source, grpc_slice status) { - if (call->status[source].is_details_set) { - grpc_slice_unref_internal(exec_ctx, status); +/******************************************************************************* + * FINAL STATUS CODE MANIPULATION + */ + +static void get_final_status(grpc_call *call, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data, grpc_slice *details) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + const char *text = grpc_error_string(call->status[i].error); + gpr_log(GPR_DEBUG, "%s", text); + grpc_error_free_string(text); + + grpc_status_code code; + const char *msg = NULL; + grpc_error_get_status(call->status[i].error, &code, &msg); + set_value(code, set_value_user_data); + if (details != NULL) { + *details = grpc_slice_from_copied_string(msg); + } + return; + } + } + if (call->is_client) { + set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); } else { - call->status[source].details = status; - call->status[source].is_details_set = true; + set_value(GRPC_STATUS_OK, set_value_user_data); } } static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call, status_source source, grpc_error *error) { - grpc_status_code status; - const char *msg; - grpc_error_get_status(error, &status, &msg); - set_status_code(call, source, (uint32_t)status); - set_status_details(exec_ctx, call, source, - grpc_slice_from_copied_string(msg)); + if (call->status[source].is_set) { + GRPC_ERROR_UNREF(error); + return; + } + call->status[source].is_set = true; + call->status[source].error = error; } +/******************************************************************************* + * COMPRESSION + */ + static void set_incoming_compression_algorithm( grpc_call *call, grpc_compression_algorithm algo) { GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT); @@ -560,23 +741,6 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { return encodings_accepted_by_peer; } -static void get_final_details(grpc_call *call, grpc_slice *out_details) { - int i; - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_code_set) { - if (call->status[i].is_details_set) { - *out_details = grpc_slice_ref(call->status[i].details); - } else { - goto no_details; - } - return; - } - } - -no_details: - *out_details = grpc_empty_slice(); -} - static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) { return (grpc_linked_mdelem *)&md->internal_data; } @@ -647,197 +811,6 @@ static int prepare_application_metadata( return 1; } -void grpc_call_destroy(grpc_call *c) { - int cancel; - grpc_call *parent = c->parent; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - GPR_TIMER_BEGIN("grpc_call_destroy", 0); - GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); - - if (parent) { - gpr_mu_lock(&parent->mu); - if (c == parent->first_child) { - parent->first_child = c->sibling_next; - if (c == parent->first_child) { - parent->first_child = NULL; - } - c->sibling_prev->sibling_next = c->sibling_next; - c->sibling_next->sibling_prev = c->sibling_prev; - } - gpr_mu_unlock(&parent->mu); - GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); - } - - gpr_mu_lock(&c->mu); - GPR_ASSERT(!c->destroy_called); - c->destroy_called = 1; - cancel = !c->received_final_op; - gpr_mu_unlock(&c->mu); - if (cancel) grpc_call_cancel(c, NULL); - GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); - GPR_TIMER_END("grpc_call_destroy", 0); -} - -grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { - GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); - GPR_ASSERT(!reserved); - return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled", - NULL); -} - -grpc_call_error grpc_call_cancel_with_status(grpc_call *c, - grpc_status_code status, - const char *description, - void *reserved) { - grpc_call_error r; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_API_TRACE( - "grpc_call_cancel_with_status(" - "c=%p, status=%d, description=%s, reserved=%p)", - 4, (c, (int)status, description, reserved)); - GPR_ASSERT(reserved == NULL); - gpr_mu_lock(&c->mu); - r = cancel_with_status(&exec_ctx, c, status, description); - gpr_mu_unlock(&c->mu); - grpc_exec_ctx_finish(&exec_ctx); - return r; -} - -typedef enum { TC_CANCEL, TC_CLOSE } termination_closure_type; - -typedef struct termination_closure { - grpc_closure closure; - grpc_call *call; - grpc_error *error; - termination_closure_type type; - grpc_transport_stream_op op; -} termination_closure; - -static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, - grpc_error *error) { - termination_closure *tc = tcp; - switch (tc->type) { - case TC_CANCEL: - GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "cancel"); - break; - case TC_CLOSE: - GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); - break; - } - GRPC_ERROR_UNREF(tc->error); - gpr_free(tc); -} - -static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { - termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.cancel_error = tc->error; - /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc, - grpc_schedule_on_exec_ctx); - tc->op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &tc->op); -} - -static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { - termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.close_error = tc->error; - /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc, - grpc_schedule_on_exec_ctx); - tc->op.on_complete = &tc->closure; - execute_op(exec_ctx, tc->call, &tc->op); -} - -static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, - termination_closure *tc) { - set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE, - tc->error); - - if (tc->type == TC_CANCEL) { - grpc_closure_init(&tc->closure, send_cancel, tc, grpc_schedule_on_exec_ctx); - GRPC_CALL_INTERNAL_REF(tc->call, "cancel"); - } else if (tc->type == TC_CLOSE) { - grpc_closure_init(&tc->closure, send_close, tc, grpc_schedule_on_exec_ctx); - GRPC_CALL_INTERNAL_REF(tc->call, "close"); - } - grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE); - return GRPC_CALL_OK; -} - -static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx, - grpc_call *c, - termination_closure_type tc_type, - grpc_error *error) { - termination_closure *tc = gpr_malloc(sizeof(*tc)); - memset(tc, 0, sizeof(*tc)); - tc->type = tc_type; - tc->call = c; - tc->error = error; - return terminate_with_status(exec_ctx, tc); -} - -static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_error *error) { - terminate_with_error(exec_ctx, c, TC_CANCEL, error); -} - -static grpc_error *error_from_status(grpc_status_code status, - const char *description) { - return grpc_error_set_int( - grpc_error_set_str(GRPC_ERROR_CREATE(description), - GRPC_ERROR_STR_GRPC_MESSAGE, description), - GRPC_ERROR_INT_GRPC_STATUS, status); -} - -static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_status_code status, - const char *description) { - return terminate_with_error(exec_ctx, c, TC_CANCEL, - error_from_status(status, description)); -} - -static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_status_code status, - const char *description) { - return terminate_with_error(exec_ctx, c, TC_CLOSE, - error_from_status(status, description)); -} - -static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op *op) { - grpc_call_element *elem; - - GPR_TIMER_BEGIN("execute_op", 0); - elem = CALL_ELEM_FROM_CALL(call, 0); - op->context = call->context; - elem->filter->start_transport_stream_op(exec_ctx, elem, op); - GPR_TIMER_END("execute_op", 0); -} - -char *grpc_call_get_peer(grpc_call *call) { - grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - char *result; - GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); - result = elem->filter->get_peer(&exec_ctx, elem); - if (result == NULL) { - result = grpc_channel_get_target(call->channel); - } - if (result == NULL) { - result = gpr_strdup("unknown"); - } - grpc_exec_ctx_finish(&exec_ctx); - return result; -} - -grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { - return CALL_FROM_TOP_ELEM(elem); -} - /* we offset status by a small amount when storing it into transport metadata as metadata cannot store a 0 value (which is used as OK for grpc_status_codes */ @@ -880,22 +853,22 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) { static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_metadata_batch *b) { - if (b->idx.named.grpc_status != NULL) { - GPR_TIMER_BEGIN("status", 0); - set_status_code(call, STATUS_FROM_WIRE, - decode_status(b->idx.named.grpc_status->md)); - grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status); - GPR_TIMER_END("status", 0); - } + uint32_t status_code = decode_status(b->idx.named.grpc_status->md); + grpc_error *error = + status_code == GRPC_STATUS_OK + ? GRPC_ERROR_NONE + : grpc_error_set_int(GRPC_ERROR_CREATE("Error received from peer"), + GRPC_ERROR_INT_GRPC_STATUS, status_code); if (b->idx.named.grpc_message != NULL) { - GPR_TIMER_BEGIN("status-details", 0); - set_status_details( - exec_ctx, call, STATUS_FROM_WIRE, - grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md))); + char *msg = + grpc_slice_to_c_string(GRPC_MDVALUE(b->idx.named.grpc_message->md)); + error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); + gpr_free(msg); grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message); - GPR_TIMER_END("status-details", 0); } + + set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error); } static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b, @@ -1026,7 +999,8 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&call->mu); if (error != GRPC_ERROR_NONE) { - set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, error); + set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, + GRPC_ERROR_REF(error)); } if (bctl->send_initial_metadata) { @@ -1061,11 +1035,11 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, if (call->is_client) { get_final_status(call, set_status_value_directly, - call->final_op.client.status); - get_final_details(call, call->final_op.client.status_details); + call->final_op.client.status, + call->final_op.client.status_details); } else { get_final_status(call, set_cancelled_value, - call->final_op.server.cancelled); + call->final_op.server.cancelled, NULL); } GRPC_ERROR_UNREF(error); @@ -1468,19 +1442,25 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->send_extra_metadata_count = 1; call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem( exec_ctx, call->channel, op->data.send_status_from_server.status); - if (op->data.send_status_from_server.status_details != NULL) { - call->send_extra_metadata[1].md = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_ref_internal( - *op->data.send_status_from_server.status_details)); - call->send_extra_metadata_count++; - set_status_details(exec_ctx, call, STATUS_FROM_API_OVERRIDE, - grpc_slice_ref_internal(GRPC_MDVALUE( - call->send_extra_metadata[1].md))); - } - if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { - set_status_code(call, STATUS_FROM_API_OVERRIDE, - (uint32_t)op->data.send_status_from_server.status); + { + grpc_error *override_error = GRPC_ERROR_NONE; + if (op->data.send_status_from_server.status != GRPC_STATUS_OK) { + override_error = GRPC_ERROR_CREATE("Error from server send status"); + } + if (op->data.send_status_from_server.status_details != NULL) { + call->send_extra_metadata[1].md = grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_ref_internal( + *op->data.send_status_from_server.status_details)); + call->send_extra_metadata_count++; + char *msg = grpc_slice_to_c_string( + GRPC_MDVALUE(call->send_extra_metadata[1].md)); + override_error = grpc_error_set_str( + override_error, GRPC_ERROR_STR_GRPC_MESSAGE, msg); + gpr_free(msg); + } + set_status_from_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + override_error); } if (!prepare_application_metadata( exec_ctx, call, diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 8f9e6ca4ed..4841dc3143 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -176,93 +176,6 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, } typedef struct { - grpc_error *error; - grpc_closure *then_call; - grpc_closure closure; -} close_message_data; - -static void free_message(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - close_message_data *cmd = p; - GRPC_ERROR_UNREF(cmd->error); - if (cmd->then_call != NULL) { - cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, error); - } - gpr_free(cmd); -} - -static void add_error(grpc_transport_stream_op *op, grpc_error **which, - grpc_error *error) { - close_message_data *cmd; - cmd = gpr_malloc(sizeof(*cmd)); - cmd->error = error; - cmd->then_call = op->on_complete; - grpc_closure_init(&cmd->closure, free_message, cmd, - grpc_schedule_on_exec_ctx); - op->on_complete = &cmd->closure; - *which = error; -} - -void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, - grpc_status_code status) { - GPR_ASSERT(status != GRPC_STATUS_OK); - if (op->cancel_error == GRPC_ERROR_NONE) { - op->cancel_error = grpc_error_set_int(GRPC_ERROR_CANCELLED, - GRPC_ERROR_INT_GRPC_STATUS, status); - op->close_error = GRPC_ERROR_NONE; - } -} - -void grpc_transport_stream_op_add_cancellation_with_message( - grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, - grpc_status_code status, grpc_slice *optional_message) { - GPR_ASSERT(status != GRPC_STATUS_OK); - if (op->cancel_error != GRPC_ERROR_NONE) { - if (optional_message) { - grpc_slice_unref_internal(exec_ctx, *optional_message); - } - return; - } - grpc_error *error; - if (optional_message != NULL) { - char *msg = grpc_slice_to_c_string(*optional_message); - error = grpc_error_set_str(GRPC_ERROR_CREATE(msg), - GRPC_ERROR_STR_GRPC_MESSAGE, msg); - gpr_free(msg); - grpc_slice_unref_internal(exec_ctx, *optional_message); - } else { - error = GRPC_ERROR_CREATE("Call cancelled"); - } - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); - add_error(op, &op->cancel_error, error); -} - -void grpc_transport_stream_op_add_close(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op, - grpc_status_code status, - grpc_slice *optional_message) { - GPR_ASSERT(status != GRPC_STATUS_OK); - if (op->cancel_error != GRPC_ERROR_NONE || - op->close_error != GRPC_ERROR_NONE) { - if (optional_message) { - grpc_slice_unref_internal(exec_ctx, *optional_message); - } - return; - } - grpc_error *error; - if (optional_message != NULL) { - char *msg = grpc_slice_to_c_string(*optional_message); - error = grpc_error_set_str(GRPC_ERROR_CREATE(msg), - GRPC_ERROR_STR_GRPC_MESSAGE, msg); - gpr_free(msg); - grpc_slice_unref_internal(exec_ctx, *optional_message); - } else { - error = GRPC_ERROR_CREATE("Call force closed"); - } - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); - add_error(op, &op->close_error, error); -} - -typedef struct { grpc_closure outer_on_complete; grpc_closure *inner_on_complete; grpc_transport_op op; diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index d1281830aa..5a92226942 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -245,18 +245,6 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, grpc_error *error); -void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, - grpc_status_code status); - -void grpc_transport_stream_op_add_cancellation_with_message( - grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, - grpc_status_code status, grpc_slice *optional_message); - -void grpc_transport_stream_op_add_close(grpc_exec_ctx *exec_ctx, - grpc_transport_stream_op *op, - grpc_status_code status, - grpc_slice *optional_message); - char *grpc_transport_stream_op_string(grpc_transport_stream_op *op); char *grpc_transport_op_string(grpc_transport_op *op); diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index d5f96bd4f2..8afd681505 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -208,15 +208,6 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; call_data *calld = elem->call_data; - if (error == GRPC_ERROR_NONE) { - // close the stream with an error. - grpc_slice message = - grpc_slice_from_copied_string("Failure that's not preventable."); - grpc_transport_stream_op *op = grpc_make_transport_stream_op(NULL); - grpc_transport_stream_op_add_close(exec_ctx, op, - GRPC_STATUS_PERMISSION_DENIED, &message); - grpc_call_next_op(exec_ctx, elem, op); - } grpc_closure_sched( exec_ctx, calld->recv_im_ready, GRPC_ERROR_CREATE_REFERENCING("Forced call to close", &error, 1)); |