diff options
author | Craig Tiller <ctiller@google.com> | 2016-06-30 09:05:33 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-30 09:05:33 -0700 |
commit | cdac58bb03e060668b32836bb60a8e629cf39049 (patch) | |
tree | 80ba8b275259802a1db4fabf20a35e9aca78aa70 /src/core/lib | |
parent | b0c246e27cf4f3a5d2c1e35105b57a73e01ce02d (diff) | |
parent | a439859c73f34bec8f38e28fe818e2d3305d7873 (diff) |
Merge pull request #7025 from ctiller/%s
Make transport-level errors be reflected in status messages on calls
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/channel/channel_stack.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.c | 33 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.h | 16 | ||||
-rw-r--r-- | src/core/lib/security/transport/client_auth_filter.c | 3 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 106 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 2 | ||||
-rw-r--r-- | src/core/lib/transport/transport.c | 65 | ||||
-rw-r--r-- | src/core/lib/transport/transport.h | 9 | ||||
-rw-r--r-- | src/core/lib/transport/transport_op_string.c | 15 |
9 files changed, 154 insertions, 97 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index bbba85d80b..42075b127b 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -263,6 +263,6 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem) { grpc_transport_stream_op op; memset(&op, 0, sizeof(op)); - op.cancel_with_status = GRPC_STATUS_CANCELLED; + op.cancel_error = GRPC_ERROR_CANCELLED; grpc_call_next_op(exec_ctx, cur_elem, &op); } diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 6f24200b77..e20a0169c5 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -37,6 +37,7 @@ #include <stdbool.h> #include <string.h> +#include <grpc/status.h> #include <grpc/support/alloc.h> #include <grpc/support/avl.h> #include <grpc/support/log.h> @@ -117,6 +118,8 @@ static const char *error_int_name(grpc_error_ints key) { return "wsa_error"; case GRPC_ERROR_INT_HTTP_STATUS: return "http_status"; + case GRPC_ERROR_INT_LIMIT: + return "limit"; } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -249,10 +252,16 @@ static grpc_error *copy_error_and_unref(grpc_error *in) { GPR_TIMER_BEGIN("copy_error_and_unref", 0); grpc_error *out; if (is_special(in)) { - if (in == GRPC_ERROR_NONE) return GRPC_ERROR_CREATE("no error"); - if (in == GRPC_ERROR_OOM) return GRPC_ERROR_CREATE("oom"); - if (in == GRPC_ERROR_CANCELLED) return GRPC_ERROR_CREATE("cancelled"); - out = GRPC_ERROR_CREATE("unknown"); + if (in == GRPC_ERROR_NONE) + out = GRPC_ERROR_CREATE("no error"); + else if (in == GRPC_ERROR_OOM) + out = GRPC_ERROR_CREATE("oom"); + else if (in == GRPC_ERROR_CANCELLED) + out = + grpc_error_set_int(GRPC_ERROR_CREATE("cancelled"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED); + else + out = GRPC_ERROR_CREATE("unknown"); } else { out = gpr_malloc(sizeof(*out)); #ifdef GRPC_ERROR_REFCOUNT_DEBUG @@ -280,8 +289,17 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, } bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) { - void *pp; GPR_TIMER_BEGIN("grpc_error_get_int", 0); + void *pp; + if (is_special(err)) { + if (err == GRPC_ERROR_CANCELLED && which == GRPC_ERROR_INT_GRPC_STATUS) { + *p = GRPC_STATUS_CANCELLED; + GPR_TIMER_END("grpc_error_get_int", 0); + return true; + } + GPR_TIMER_END("grpc_error_get_int", 0); + return false; + } if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) { if (p != NULL) *p = (intptr_t)pp; GPR_TIMER_END("grpc_error_get_int", 0); @@ -301,6 +319,11 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, return new; } +const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) { + if (is_special(err)) return NULL; + return gpr_avl_get(err->strs, (void *)(uintptr_t)which); +} + grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { GPR_TIMER_BEGIN("grpc_error_add_child", 0); grpc_error *new = copy_error_and_unref(src); diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 69cdf3028e..13f898e31a 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -92,6 +92,8 @@ typedef enum { GRPC_ERROR_INT_FD, /// HTTP status (i.e. 404) GRPC_ERROR_INT_HTTP_STATUS, + /// context sensitive limit associated with the error + GRPC_ERROR_INT_LIMIT, } grpc_error_ints; typedef enum { @@ -163,23 +165,25 @@ void grpc_error_unref(grpc_error *err); #endif grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, - intptr_t value); + intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p); grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which, - gpr_timespec value); + gpr_timespec value) GRPC_MUST_USE_RESULT; grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, - const char *value); + const char *value) GRPC_MUST_USE_RESULT; +const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which); /// Add a child error: an error that is believed to have contributed to this /// error occurring. Allows root causing high level errors from lower level /// errors that contributed to them. -grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child); +grpc_error *grpc_error_add_child(grpc_error *src, + grpc_error *child) GRPC_MUST_USE_RESULT; grpc_error *grpc_os_error(const char *file, int line, int err, - const char *call_name); + const char *call_name) GRPC_MUST_USE_RESULT; /// create an error associated with errno!=0 (an 'operating system' error) #define GRPC_OS_ERROR(err, call_name) \ grpc_os_error(__FILE__, __LINE__, err, call_name) grpc_error *grpc_wsa_error(const char *file, int line, int err, - const char *call_name); + const char *call_name) GRPC_MUST_USE_RESULT; /// windows only: create an error associated with WSAGetLastError()!=0 #define GRPC_WSA_ERROR(err, call_name) \ grpc_wsa_error(__FILE__, __LINE__, err, call_name) diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 399b92c8e1..14ccf72dc9 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -224,8 +224,7 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_linked_mdelem *l; grpc_client_security_context *sec_ctx = NULL; - if (calld->security_context_set == 0 && - op->cancel_with_status == GRPC_STATUS_OK) { + if (calld->security_context_set == 0 && op->cancel_error == GRPC_ERROR_NONE) { calld->security_context_set = 1; GPR_ASSERT(op->context); if (op->context[GRPC_CONTEXT_SECURITY].value == NULL) { diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 04291b0ee0..708ea3502a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -402,8 +402,51 @@ static void set_status_code(grpc_call *call, status_source source, call->status[source].is_set = 1; call->status[source].code = (grpc_status_code)status; +} - /* TODO(ctiller): what to do about the flush that was previously here */ +static void set_status_details(grpc_call *call, status_source source, + grpc_mdstr *status) { + if (call->status[source].details != NULL) { + GRPC_MDSTR_UNREF(status); + } else { + call->status[source].details = status; + } +} + +static void get_final_status(grpc_call *call, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + set_value(call->status[i].code, set_value_user_data); + return; + } + } + if (call->is_client) { + set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); + } else { + set_value(GRPC_STATUS_OK, set_value_user_data); + } +} + +static void set_status_from_error(grpc_call *call, status_source source, + grpc_error *error) { + intptr_t status; + if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { + set_status_code(call, source, (uint32_t)status); + } else { + set_status_code(call, source, GRPC_STATUS_INTERNAL); + } + const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); + bool free_msg = false; + if (msg == NULL) { + free_msg = true; + msg = grpc_error_string(error); + } + set_status_details(call, source, grpc_mdstr_from_string(msg)); + if (free_msg) grpc_error_free_string(msg); } static void set_incoming_compression_algorithm( @@ -492,32 +535,6 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { return encodings_accepted_by_peer; } -static void set_status_details(grpc_call *call, status_source source, - grpc_mdstr *status) { - if (call->status[source].details != NULL) { - GRPC_MDSTR_UNREF(call->status[source].details); - } - call->status[source].details = status; -} - -static void get_final_status(grpc_call *call, - void (*set_value)(grpc_status_code code, - void *user_data), - void *set_value_user_data) { - int i; - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - set_value(call->status[i].code, set_value_user_data); - return; - } - } - if (call->is_client) { - set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); - } else { - set_value(GRPC_STATUS_OK, set_value_user_data); - } -} - static void get_final_details(grpc_call *call, char **out_details, size_t *out_details_capacity) { int i; @@ -741,8 +758,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, typedef struct termination_closure { grpc_closure closure; grpc_call *call; - grpc_status_code status; - gpr_slice optional_message; + grpc_error *error; grpc_closure *op_closure; enum { TC_CANCEL, TC_CLOSE } type; } termination_closure; @@ -758,7 +774,7 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); break; } - gpr_slice_unref(tc->optional_message); + GRPC_ERROR_UNREF(tc->error); grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL); gpr_free(tc); } @@ -767,7 +783,7 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { grpc_transport_stream_op op; termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); - op.cancel_with_status = tc->status; + op.cancel_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); op.on_complete = &tc->closure; @@ -778,8 +794,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { grpc_transport_stream_op op; termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); - tc->optional_message = gpr_slice_ref(tc->optional_message); - grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message); + op.close_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); tc->op_closure = op.on_complete; @@ -789,14 +804,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, termination_closure *tc) { - grpc_mdstr *details = NULL; - if (GPR_SLICE_LENGTH(tc->optional_message) > 0) { - tc->optional_message = gpr_slice_ref(tc->optional_message); - details = grpc_mdstr_from_slice(tc->optional_message); - } - - set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status); - set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details); + set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error); if (tc->type == TC_CANCEL) { grpc_closure_init(&tc->closure, send_cancel, tc); @@ -812,13 +820,15 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { + GPR_ASSERT(status != GRPC_STATUS_OK); termination_closure *tc = gpr_malloc(sizeof(*tc)); memset(tc, 0, sizeof(termination_closure)); tc->type = TC_CANCEL; tc->call = c; - tc->optional_message = gpr_slice_from_copied_string(description); - GPR_ASSERT(status != GRPC_STATUS_OK); - tc->status = status; + tc->error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE(description), + GRPC_ERROR_STR_GRPC_MESSAGE, description), + GRPC_ERROR_INT_GRPC_STATUS, status); return terminate_with_status(exec_ctx, tc); } @@ -826,13 +836,15 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { + GPR_ASSERT(status != GRPC_STATUS_OK); termination_closure *tc = gpr_malloc(sizeof(*tc)); memset(tc, 0, sizeof(termination_closure)); tc->type = TC_CLOSE; tc->call = c; - tc->optional_message = gpr_slice_from_copied_string(description); - GPR_ASSERT(status != GRPC_STATUS_OK); - tc->status = status; + tc->error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE(description), + GRPC_ERROR_STR_GRPC_MESSAGE, description), + GRPC_ERROR_INT_GRPC_STATUS, status); return terminate_with_status(exec_ctx, tc); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 1eeacf81a0..5978884db8 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -250,7 +250,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, " "done_arg=%p, storage=%p)", 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); - if (grpc_trace_operation_failures) { + if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } grpc_error_free_string(errmsg); diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 1105494a85..79a20e1262 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -36,6 +36,7 @@ #include <grpc/support/atm.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport_impl.h" #ifdef GRPC_STREAM_REFCOUNT_DEBUG @@ -162,55 +163,63 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL); } -void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, - grpc_status_code status) { - GPR_ASSERT(status != GRPC_STATUS_OK); - if (op->cancel_with_status == GRPC_STATUS_OK) { - op->cancel_with_status = status; - } - if (op->close_with_status != GRPC_STATUS_OK) { - op->close_with_status = GRPC_STATUS_OK; - if (op->optional_close_message != NULL) { - gpr_slice_unref(*op->optional_close_message); - op->optional_close_message = NULL; - } - } -} - typedef struct { - gpr_slice message; + grpc_error *error; grpc_closure *then_call; grpc_closure closure; } close_message_data; static void free_message(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { close_message_data *cmd = p; - gpr_slice_unref(cmd->message); + GRPC_ERROR_UNREF(cmd->error); if (cmd->then_call != NULL) { cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, GRPC_ERROR_REF(error)); } gpr_free(cmd); } +static void add_error(grpc_transport_stream_op *op, grpc_error **which, + grpc_error *error) { + close_message_data *cmd; + cmd = gpr_malloc(sizeof(*cmd)); + cmd->error = error; + cmd->then_call = op->on_complete; + grpc_closure_init(&cmd->closure, free_message, cmd); + op->on_complete = &cmd->closure; + *which = error; +} + +void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, + grpc_status_code status) { + GPR_ASSERT(status != GRPC_STATUS_OK); + if (op->cancel_error == GRPC_ERROR_NONE) { + op->cancel_error = grpc_error_set_int(GRPC_ERROR_CANCELLED, + GRPC_ERROR_INT_GRPC_STATUS, status); + op->close_error = GRPC_ERROR_NONE; + } +} + void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, grpc_status_code status, gpr_slice *optional_message) { - close_message_data *cmd; GPR_ASSERT(status != GRPC_STATUS_OK); - if (op->cancel_with_status != GRPC_STATUS_OK || - op->close_with_status != GRPC_STATUS_OK) { + if (op->cancel_error != GRPC_ERROR_NONE || + op->close_error != GRPC_ERROR_NONE) { if (optional_message) { gpr_slice_unref(*optional_message); } return; } - if (optional_message) { - cmd = gpr_malloc(sizeof(*cmd)); - cmd->message = *optional_message; - cmd->then_call = op->on_complete; - grpc_closure_init(&cmd->closure, free_message, cmd); - op->on_complete = &cmd->closure; - op->optional_close_message = &cmd->message; + grpc_error *error; + if (optional_message != NULL) { + char *msg = gpr_dump_slice(*optional_message, GPR_DUMP_ASCII); + error = grpc_error_set_str(GRPC_ERROR_CREATE(msg), + GRPC_ERROR_STR_GRPC_MESSAGE, msg); + gpr_free(msg); + gpr_slice_unref(*optional_message); + } else { + error = GRPC_ERROR_CREATE("Call force closed"); } - op->close_with_status = status; + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); + add_error(op, &op->close_error, error); } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index a46ccb643c..d2f6344ee3 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -135,13 +135,12 @@ typedef struct grpc_transport_stream_op { /** Collect any stats into provided buffer, zero internal stat counters */ grpc_transport_stream_stats *collect_stats; - /** If != GRPC_STATUS_OK, cancel this stream */ - grpc_status_code cancel_with_status; + /** If != GRPC_ERROR_NONE, cancel this stream */ + grpc_error *cancel_error; - /** If != GRPC_STATUS_OK, send grpc-status, grpc-message, and close this + /** If != GRPC_ERROR, send grpc-status, grpc-message, and close this stream for both reading and writing */ - grpc_status_code close_with_status; - gpr_slice *optional_close_message; + grpc_error *close_error; /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index aeaba5339f..138591db2a 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -119,10 +119,21 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA")); } - if (op->cancel_with_status != GRPC_STATUS_OK) { + if (op->cancel_error != GRPC_ERROR_NONE) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); + const char *msg = grpc_error_string(op->cancel_error); + gpr_asprintf(&tmp, "CANCEL:%s", msg); + grpc_error_free_string(msg); + gpr_strvec_add(&b, tmp); + } + + if (op->close_error != GRPC_ERROR_NONE) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + const char *msg = grpc_error_string(op->close_error); + gpr_asprintf(&tmp, "CLOSE:%s", msg); + grpc_error_free_string(msg); gpr_strvec_add(&b, tmp); } |