diff options
-rw-r--r-- | src/core/ext/client_config/subchannel_call_holder.c | 7 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 228 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 3 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.c | 20 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.h | 16 | ||||
-rw-r--r-- | src/core/lib/security/transport/client_auth_filter.c | 3 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 106 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 2 | ||||
-rw-r--r-- | src/core/lib/transport/transport.c | 65 | ||||
-rw-r--r-- | src/core/lib/transport/transport.h | 9 | ||||
-rw-r--r-- | src/core/lib/transport/transport_op_string.c | 15 |
12 files changed, 285 insertions, 191 deletions
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index e31800edd9..b96a0ad093 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -120,16 +120,13 @@ retry: return; } /* if this is a cancellation, then we can raise our cancelled flag */ - if (op->cancel_with_status != GRPC_STATUS_OK) { + if (op->cancel_error != GRPC_ERROR_NONE) { if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) { goto retry; } else { switch (holder->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: - fail_locked(exec_ctx, holder, - grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"), - GRPC_ERROR_INT_GRPC_STATUS, - op->cancel_with_status)); + fail_locked(exec_ctx, holder, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 9c125566d4..c311f54409 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -106,14 +106,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status, - gpr_slice *optional_message); + grpc_error *error); static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status, - gpr_slice *optional_message); + grpc_error *error); /** Add endpoint from this transport to pollset */ static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx, @@ -163,8 +161,6 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, GPR_ASSERT(t->ep == NULL); - gpr_slice_unref(t->optional_drop_message); - gpr_slice_buffer_destroy(&t->global.qbuf); gpr_slice_buffer_destroy(&t->writing.outbuf); @@ -266,7 +262,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0; t->parsing.is_first_frame = true; t->writing.is_client = is_client; - t->optional_drop_message = gpr_empty_slice(); grpc_connectivity_state_init( &t->channel_callback.state_tracker, GRPC_CHANNEL_READY, is_client ? "client_transport" : "server_transport"); @@ -876,7 +871,9 @@ static void maybe_start_some_streams( grpc_chttp2_list_pop_waiting_for_concurrency(transport_global, &stream_global)) { cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE, NULL); + grpc_error_set_int( + GRPC_ERROR_CREATE("Stream IDs exhausted"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); } } @@ -958,14 +955,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT; } - if (op->cancel_with_status != GRPC_STATUS_OK) { + if (op->cancel_error != GRPC_ERROR_NONE) { cancel_from_api(exec_ctx, transport_global, stream_global, - op->cancel_with_status, op->optional_close_message); + GRPC_ERROR_REF(op->cancel_error)); } - if (op->close_with_status != GRPC_STATUS_OK) { + if (op->close_error != GRPC_ERROR_NONE) { close_from_api(exec_ctx, transport_global, stream_global, - op->close_with_status, op->optional_close_message); + GRPC_ERROR_REF(op->close_error)); } if (op->send_initial_metadata != NULL) { @@ -979,12 +976,16 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, transport_global->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; if (metadata_size > metadata_peer_limit) { - gpr_log(GPR_DEBUG, - "to-be-sent initial metadata size exceeds peer limit " - "(%" PRIuPTR " vs. %" PRIuPTR ")", - metadata_size, metadata_peer_limit); - cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); + cancel_from_api( + exec_ctx, transport_global, stream_global, + grpc_error_set_int( + grpc_error_set_int( + grpc_error_set_int( + GRPC_ERROR_CREATE("to-be-sent initial metadata size " + "exceeds peer limit"), + GRPC_ERROR_INT_SIZE, (intptr_t)metadata_size), + GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); } else { if (contains_non_ok_status(transport_global, op->send_initial_metadata)) { stream_global->seen_error = true; @@ -1038,12 +1039,16 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, transport_global->settings[GRPC_PEER_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE]; if (metadata_size > metadata_peer_limit) { - gpr_log(GPR_DEBUG, - "to-be-sent trailing metadata size exceeds peer limit " - "(%" PRIuPTR " vs. %" PRIuPTR ")", - metadata_size, metadata_peer_limit); - cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); + cancel_from_api( + exec_ctx, transport_global, stream_global, + grpc_error_set_int( + grpc_error_set_int( + grpc_error_set_int( + GRPC_ERROR_CREATE("to-be-sent trailing metadata size " + "exceeds peer limit"), + GRPC_ERROR_INT_SIZE, (intptr_t)metadata_size), + GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); } else { if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) { @@ -1235,8 +1240,12 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } if (stream_global->exceeded_metadata_size) { - cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); + cancel_from_api( + exec_ctx, transport_global, stream_global, + grpc_error_set_int( + GRPC_ERROR_CREATE( + "received initial metadata size exceeds limit"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); } } grpc_chttp2_incoming_metadata_buffer_publish( @@ -1275,8 +1284,12 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs); } if (stream_global->exceeded_metadata_size) { - cancel_from_api(exec_ctx, transport_global, stream_global, - GRPC_STATUS_RESOURCE_EXHAUSTED, NULL); + cancel_from_api( + exec_ctx, transport_global, stream_global, + grpc_error_set_int( + GRPC_ERROR_CREATE( + "received trailing metadata size exceeds limit"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED)); } } if (stream_global->all_incoming_byte_streams_finished) { @@ -1340,35 +1353,67 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GRPC_ERROR_UNREF(error); } +static void status_codes_from_error(grpc_error *error, + grpc_chttp2_error_code *http2_error, + grpc_status_code *grpc_status) { + intptr_t ip_http; + intptr_t ip_grpc; + bool have_http = + grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &ip_http); + bool have_grpc = + grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &ip_grpc); + if (have_http) { + *http2_error = (grpc_chttp2_error_code)ip_http; + } else if (have_grpc) { + *http2_error = + grpc_chttp2_grpc_status_to_http2_error((grpc_status_code)ip_grpc); + } else { + *http2_error = GRPC_CHTTP2_INTERNAL_ERROR; + } + if (have_grpc) { + *grpc_status = (grpc_status_code)ip_grpc; + } else if (have_http) { + *grpc_status = + grpc_chttp2_http2_error_to_grpc_status((grpc_chttp2_error_code)ip_http); + } else { + *grpc_status = GRPC_STATUS_INTERNAL; + } +} + static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status, - gpr_slice *optional_message) { + grpc_error *due_to_error) { if (!stream_global->read_closed || !stream_global->write_closed) { + grpc_status_code grpc_status; + grpc_chttp2_error_code http_error; + status_codes_from_error(due_to_error, &http_error, &grpc_status); + if (stream_global->id != 0) { gpr_slice_buffer_add( &transport_global->qbuf, - grpc_chttp2_rst_stream_create( - stream_global->id, - (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status), - &stream_global->stats.outgoing)); + grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error, + &stream_global->stats.outgoing)); } - if (optional_message) { - gpr_slice_ref(*optional_message); + const char *msg = + grpc_error_get_str(due_to_error, GRPC_ERROR_STR_GRPC_MESSAGE); + bool free_msg = false; + if (msg == NULL) { + free_msg = true; + msg = grpc_error_string(due_to_error); } - grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, - optional_message); + gpr_slice msg_slice = gpr_slice_from_copied_string(msg); + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, + grpc_status, &msg_slice); + if (free_msg) grpc_error_free_string(msg); } - if (status != GRPC_STATUS_OK && !stream_global->seen_error) { + if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) { stream_global->seen_error = true; grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } - grpc_chttp2_mark_stream_closed( - exec_ctx, transport_global, stream_global, 1, 1, - grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"), - GRPC_ERROR_INT_GRPC_STATUS, status)); + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, + 1, due_to_error); } void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, @@ -1469,15 +1514,17 @@ void grpc_chttp2_mark_stream_closed( static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, - grpc_status_code status, - gpr_slice *optional_message) { + grpc_error *error) { gpr_slice hdr; gpr_slice status_hdr; gpr_slice message_pfx; uint8_t *p; uint32_t len = 0; + grpc_status_code grpc_status; + grpc_chttp2_error_code http_error; + status_codes_from_error(error, &http_error, &grpc_status); - GPR_ASSERT(status >= 0 && (int)status < 100); + GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100); if (stream_global->id != 0 && !transport_global->is_client) { /* Hand roll a header block. @@ -1487,7 +1534,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, time we got around to sending this, so instead we ignore HPACK compression and just write the uncompressed bytes onto the wire. */ - status_hdr = gpr_slice_malloc(15 + (status >= 10)); + status_hdr = gpr_slice_malloc(15 + (grpc_status >= 10)); p = GPR_SLICE_START_PTR(status_hdr); *p++ = 0x40; /* literal header */ *p++ = 11; /* len(grpc-status) */ @@ -1502,19 +1549,23 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, *p++ = 't'; *p++ = 'u'; *p++ = 's'; - if (status < 10) { + if (grpc_status < 10) { *p++ = 1; - *p++ = (uint8_t)('0' + status); + *p++ = (uint8_t)('0' + grpc_status); } else { *p++ = 2; - *p++ = (uint8_t)('0' + (status / 10)); - *p++ = (uint8_t)('0' + (status % 10)); + *p++ = (uint8_t)('0' + (grpc_status / 10)); + *p++ = (uint8_t)('0' + (grpc_status % 10)); } GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr)); len += (uint32_t)GPR_SLICE_LENGTH(status_hdr); - if (optional_message) { - GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127); + const char *optional_message = + grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); + + if (optional_message != NULL) { + size_t msg_len = strlen(optional_message); + GPR_ASSERT(msg_len < 127); message_pfx = gpr_slice_malloc(15); p = GPR_SLICE_START_PTR(message_pfx); *p++ = 0x40; @@ -1531,10 +1582,10 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, *p++ = 'a'; *p++ = 'g'; *p++ = 'e'; - *p++ = (uint8_t)GPR_SLICE_LENGTH(*optional_message); + *p++ = (uint8_t)msg_len; GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx)); len += (uint32_t)GPR_SLICE_LENGTH(message_pfx); - len += (uint32_t)GPR_SLICE_LENGTH(*optional_message); + len += (uint32_t)msg_len; } hdr = gpr_slice_malloc(9); @@ -1555,7 +1606,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, if (optional_message) { gpr_slice_buffer_add(&transport_global->qbuf, message_pfx); gpr_slice_buffer_add(&transport_global->qbuf, - gpr_slice_ref(*optional_message)); + gpr_slice_from_copied_string(optional_message)); } gpr_slice_buffer_add( &transport_global->qbuf, @@ -1563,43 +1614,45 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, &stream_global->stats.outgoing)); } - if (optional_message) { - gpr_slice_ref(*optional_message); - } - grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, - optional_message); - grpc_error *err = GRPC_ERROR_CREATE("Stream closed"); - err = grpc_error_set_int(err, GRPC_ERROR_INT_GRPC_STATUS, status); - if (optional_message) { - char *str = - gpr_dump_slice(*optional_message, GPR_DUMP_HEX | GPR_DUMP_ASCII); - err = grpc_error_set_str(err, GRPC_ERROR_STR_GRPC_MESSAGE, str); - gpr_free(str); + const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); + bool free_msg = false; + if (msg == NULL) { + free_msg = true; + msg = grpc_error_string(error); } + gpr_slice msg_slice = gpr_slice_from_copied_string(msg); + grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, + grpc_status, &msg_slice); + if (free_msg) grpc_error_free_string(msg); + grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, - 1, err); + 1, error); } +typedef struct { + grpc_exec_ctx *exec_ctx; + grpc_error *error; +} cancel_stream_cb_args; + static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global, void *user_data, grpc_chttp2_stream_global *stream_global) { - grpc_chttp2_transport *transport = TRANSPORT_FROM_GLOBAL(transport_global); - cancel_from_api(user_data, transport_global, stream_global, - GRPC_STATUS_UNAVAILABLE, - GPR_SLICE_IS_EMPTY(transport->optional_drop_message) - ? NULL - : &transport->optional_drop_message); + cancel_stream_cb_args *args = user_data; + cancel_from_api(args->exec_ctx, transport_global, stream_global, + GRPC_ERROR_REF(args->error)); } -static void end_all_the_calls(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t) { - grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb); +static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_error *error) { + cancel_stream_cb_args args = {exec_ctx, error}; + grpc_chttp2_for_all_streams(&t->global, &args, cancel_stream_cb); + GRPC_ERROR_UNREF(error); } static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { - close_transport_locked(exec_ctx, t, error); - end_all_the_calls(exec_ctx, t); + close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error)); + end_all_the_calls(exec_ctx, t, error); } /** update window from a settings change */ @@ -1706,15 +1759,7 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg, t->read_buffer.slices[i]); }; if (i != t->read_buffer.count) { - gpr_slice_unref(t->optional_drop_message); errors[2] = try_http_parsing(exec_ctx, t); - if (errors[2] != GRPC_ERROR_NONE) { - t->optional_drop_message = gpr_slice_from_copied_string( - "Connection dropped: received http1.x response"); - } else { - t->optional_drop_message = gpr_slice_from_copied_string( - "Connection dropped: received unparseable response"); - } } grpc_error *err = errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE && @@ -1782,6 +1827,10 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, error = GRPC_ERROR_CREATE("Transport closed"); } if (error != GRPC_ERROR_NONE) { + if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; if (!t->executor.writing_active && t->ep) { @@ -1796,6 +1845,7 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, prevent_endpoint_shutdown(t); } gpr_slice_buffer_reset_and_unref(&t->read_buffer); + GRPC_ERROR_UNREF(error); if (keep_reading) { grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action); @@ -1804,8 +1854,6 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, } else { UNREF_TRANSPORT(exec_ctx, t, "reading_action"); } - - GRPC_LOG_IF_ERROR("close_transport", error); } /******************************************************************************* diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index d63170e350..7e281f1b1a 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -384,9 +384,6 @@ struct grpc_chttp2_transport { /** Transport op to be applied post-parsing */ grpc_transport_op *post_parsing_op; - - /** Message explaining the reason of dropping connection */ - gpr_slice optional_drop_message; }; typedef struct { diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index bbba85d80b..42075b127b 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -263,6 +263,6 @@ void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, grpc_call_element *cur_elem) { grpc_transport_stream_op op; memset(&op, 0, sizeof(op)); - op.cancel_with_status = GRPC_STATUS_CANCELLED; + op.cancel_error = GRPC_ERROR_CANCELLED; grpc_call_next_op(exec_ctx, cur_elem, &op); } diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 540fb4fa7e..aa94bb7fdf 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -37,6 +37,7 @@ #include <stdbool.h> #include <string.h> +#include <grpc/status.h> #include <grpc/support/alloc.h> #include <grpc/support/avl.h> #include <grpc/support/log.h> @@ -115,6 +116,8 @@ static const char *error_int_name(grpc_error_ints key) { return "wsa_error"; case GRPC_ERROR_INT_HTTP_STATUS: return "http_status"; + case GRPC_ERROR_INT_LIMIT: + return "limit"; } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -245,7 +248,10 @@ static grpc_error *copy_error_and_unref(grpc_error *in) { if (is_special(in)) { if (in == GRPC_ERROR_NONE) return GRPC_ERROR_CREATE("no error"); if (in == GRPC_ERROR_OOM) return GRPC_ERROR_CREATE("oom"); - if (in == GRPC_ERROR_CANCELLED) return GRPC_ERROR_CREATE("cancelled"); + if (in == GRPC_ERROR_CANCELLED) + return grpc_error_set_int(GRPC_ERROR_CREATE("cancelled"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_CANCELLED); return GRPC_ERROR_CREATE("unknown"); } grpc_error *out = gpr_malloc(sizeof(*out)); @@ -271,6 +277,13 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) { void *pp; + if (is_special(err)) { + if (err == GRPC_ERROR_CANCELLED && which == GRPC_ERROR_INT_GRPC_STATUS) { + *p = GRPC_STATUS_CANCELLED; + return true; + } + return false; + } if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) { if (p != NULL) *p = (intptr_t)pp; return true; @@ -286,6 +299,11 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, return new; } +const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) { + if (is_special(err)) return NULL; + return gpr_avl_get(err->strs, (void *)(uintptr_t)which); +} + grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { grpc_error *new = copy_error_and_unref(src); new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child); diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 69cdf3028e..13f898e31a 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -92,6 +92,8 @@ typedef enum { GRPC_ERROR_INT_FD, /// HTTP status (i.e. 404) GRPC_ERROR_INT_HTTP_STATUS, + /// context sensitive limit associated with the error + GRPC_ERROR_INT_LIMIT, } grpc_error_ints; typedef enum { @@ -163,23 +165,25 @@ void grpc_error_unref(grpc_error *err); #endif grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, - intptr_t value); + intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p); grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which, - gpr_timespec value); + gpr_timespec value) GRPC_MUST_USE_RESULT; grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, - const char *value); + const char *value) GRPC_MUST_USE_RESULT; +const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which); /// Add a child error: an error that is believed to have contributed to this /// error occurring. Allows root causing high level errors from lower level /// errors that contributed to them. -grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child); +grpc_error *grpc_error_add_child(grpc_error *src, + grpc_error *child) GRPC_MUST_USE_RESULT; grpc_error *grpc_os_error(const char *file, int line, int err, - const char *call_name); + const char *call_name) GRPC_MUST_USE_RESULT; /// create an error associated with errno!=0 (an 'operating system' error) #define GRPC_OS_ERROR(err, call_name) \ grpc_os_error(__FILE__, __LINE__, err, call_name) grpc_error *grpc_wsa_error(const char *file, int line, int err, - const char *call_name); + const char *call_name) GRPC_MUST_USE_RESULT; /// windows only: create an error associated with WSAGetLastError()!=0 #define GRPC_WSA_ERROR(err, call_name) \ grpc_wsa_error(__FILE__, __LINE__, err, call_name) diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 399b92c8e1..14ccf72dc9 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -224,8 +224,7 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_linked_mdelem *l; grpc_client_security_context *sec_ctx = NULL; - if (calld->security_context_set == 0 && - op->cancel_with_status == GRPC_STATUS_OK) { + if (calld->security_context_set == 0 && op->cancel_error == GRPC_ERROR_NONE) { calld->security_context_set = 1; GPR_ASSERT(op->context); if (op->context[GRPC_CONTEXT_SECURITY].value == NULL) { diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 04291b0ee0..708ea3502a 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -402,8 +402,51 @@ static void set_status_code(grpc_call *call, status_source source, call->status[source].is_set = 1; call->status[source].code = (grpc_status_code)status; +} - /* TODO(ctiller): what to do about the flush that was previously here */ +static void set_status_details(grpc_call *call, status_source source, + grpc_mdstr *status) { + if (call->status[source].details != NULL) { + GRPC_MDSTR_UNREF(status); + } else { + call->status[source].details = status; + } +} + +static void get_final_status(grpc_call *call, + void (*set_value)(grpc_status_code code, + void *user_data), + void *set_value_user_data) { + int i; + for (i = 0; i < STATUS_SOURCE_COUNT; i++) { + if (call->status[i].is_set) { + set_value(call->status[i].code, set_value_user_data); + return; + } + } + if (call->is_client) { + set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); + } else { + set_value(GRPC_STATUS_OK, set_value_user_data); + } +} + +static void set_status_from_error(grpc_call *call, status_source source, + grpc_error *error) { + intptr_t status; + if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { + set_status_code(call, source, (uint32_t)status); + } else { + set_status_code(call, source, GRPC_STATUS_INTERNAL); + } + const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE); + bool free_msg = false; + if (msg == NULL) { + free_msg = true; + msg = grpc_error_string(error); + } + set_status_details(call, source, grpc_mdstr_from_string(msg)); + if (free_msg) grpc_error_free_string(msg); } static void set_incoming_compression_algorithm( @@ -492,32 +535,6 @@ uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { return encodings_accepted_by_peer; } -static void set_status_details(grpc_call *call, status_source source, - grpc_mdstr *status) { - if (call->status[source].details != NULL) { - GRPC_MDSTR_UNREF(call->status[source].details); - } - call->status[source].details = status; -} - -static void get_final_status(grpc_call *call, - void (*set_value)(grpc_status_code code, - void *user_data), - void *set_value_user_data) { - int i; - for (i = 0; i < STATUS_SOURCE_COUNT; i++) { - if (call->status[i].is_set) { - set_value(call->status[i].code, set_value_user_data); - return; - } - } - if (call->is_client) { - set_value(GRPC_STATUS_UNKNOWN, set_value_user_data); - } else { - set_value(GRPC_STATUS_OK, set_value_user_data); - } -} - static void get_final_details(grpc_call *call, char **out_details, size_t *out_details_capacity) { int i; @@ -741,8 +758,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, typedef struct termination_closure { grpc_closure closure; grpc_call *call; - grpc_status_code status; - gpr_slice optional_message; + grpc_error *error; grpc_closure *op_closure; enum { TC_CANCEL, TC_CLOSE } type; } termination_closure; @@ -758,7 +774,7 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "close"); break; } - gpr_slice_unref(tc->optional_message); + GRPC_ERROR_UNREF(tc->error); grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL); gpr_free(tc); } @@ -767,7 +783,7 @@ static void send_cancel(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { grpc_transport_stream_op op; termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); - op.cancel_with_status = tc->status; + op.cancel_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); op.on_complete = &tc->closure; @@ -778,8 +794,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { grpc_transport_stream_op op; termination_closure *tc = tcp; memset(&op, 0, sizeof(op)); - tc->optional_message = gpr_slice_ref(tc->optional_message); - grpc_transport_stream_op_add_close(&op, tc->status, &tc->optional_message); + op.close_error = tc->error; /* reuse closure to catch completion */ grpc_closure_init(&tc->closure, done_termination, tc); tc->op_closure = op.on_complete; @@ -789,14 +804,7 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, termination_closure *tc) { - grpc_mdstr *details = NULL; - if (GPR_SLICE_LENGTH(tc->optional_message) > 0) { - tc->optional_message = gpr_slice_ref(tc->optional_message); - details = grpc_mdstr_from_slice(tc->optional_message); - } - - set_status_code(tc->call, STATUS_FROM_API_OVERRIDE, (uint32_t)tc->status); - set_status_details(tc->call, STATUS_FROM_API_OVERRIDE, details); + set_status_from_error(tc->call, STATUS_FROM_API_OVERRIDE, tc->error); if (tc->type == TC_CANCEL) { grpc_closure_init(&tc->closure, send_cancel, tc); @@ -812,13 +820,15 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { + GPR_ASSERT(status != GRPC_STATUS_OK); termination_closure *tc = gpr_malloc(sizeof(*tc)); memset(tc, 0, sizeof(termination_closure)); tc->type = TC_CANCEL; tc->call = c; - tc->optional_message = gpr_slice_from_copied_string(description); - GPR_ASSERT(status != GRPC_STATUS_OK); - tc->status = status; + tc->error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE(description), + GRPC_ERROR_STR_GRPC_MESSAGE, description), + GRPC_ERROR_INT_GRPC_STATUS, status); return terminate_with_status(exec_ctx, tc); } @@ -826,13 +836,15 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, grpc_status_code status, const char *description) { + GPR_ASSERT(status != GRPC_STATUS_OK); termination_closure *tc = gpr_malloc(sizeof(*tc)); memset(tc, 0, sizeof(termination_closure)); tc->type = TC_CLOSE; tc->call = c; - tc->optional_message = gpr_slice_from_copied_string(description); - GPR_ASSERT(status != GRPC_STATUS_OK); - tc->status = status; + tc->error = grpc_error_set_int( + grpc_error_set_str(GRPC_ERROR_CREATE(description), + GRPC_ERROR_STR_GRPC_MESSAGE, description), + GRPC_ERROR_INT_GRPC_STATUS, status); return terminate_with_status(exec_ctx, tc); } diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b5f2f65e5c..72b7db1ba9 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -240,7 +240,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, " "done_arg=%p, storage=%p)", 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage)); - if (grpc_trace_operation_failures) { + if (grpc_trace_operation_failures && error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } grpc_error_free_string(errmsg); diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 1105494a85..79a20e1262 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -36,6 +36,7 @@ #include <grpc/support/atm.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include "src/core/lib/support/string.h" #include "src/core/lib/transport/transport_impl.h" #ifdef GRPC_STREAM_REFCOUNT_DEBUG @@ -162,55 +163,63 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL); } -void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, - grpc_status_code status) { - GPR_ASSERT(status != GRPC_STATUS_OK); - if (op->cancel_with_status == GRPC_STATUS_OK) { - op->cancel_with_status = status; - } - if (op->close_with_status != GRPC_STATUS_OK) { - op->close_with_status = GRPC_STATUS_OK; - if (op->optional_close_message != NULL) { - gpr_slice_unref(*op->optional_close_message); - op->optional_close_message = NULL; - } - } -} - typedef struct { - gpr_slice message; + grpc_error *error; grpc_closure *then_call; grpc_closure closure; } close_message_data; static void free_message(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { close_message_data *cmd = p; - gpr_slice_unref(cmd->message); + GRPC_ERROR_UNREF(cmd->error); if (cmd->then_call != NULL) { cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, GRPC_ERROR_REF(error)); } gpr_free(cmd); } +static void add_error(grpc_transport_stream_op *op, grpc_error **which, + grpc_error *error) { + close_message_data *cmd; + cmd = gpr_malloc(sizeof(*cmd)); + cmd->error = error; + cmd->then_call = op->on_complete; + grpc_closure_init(&cmd->closure, free_message, cmd); + op->on_complete = &cmd->closure; + *which = error; +} + +void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, + grpc_status_code status) { + GPR_ASSERT(status != GRPC_STATUS_OK); + if (op->cancel_error == GRPC_ERROR_NONE) { + op->cancel_error = grpc_error_set_int(GRPC_ERROR_CANCELLED, + GRPC_ERROR_INT_GRPC_STATUS, status); + op->close_error = GRPC_ERROR_NONE; + } +} + void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op, grpc_status_code status, gpr_slice *optional_message) { - close_message_data *cmd; GPR_ASSERT(status != GRPC_STATUS_OK); - if (op->cancel_with_status != GRPC_STATUS_OK || - op->close_with_status != GRPC_STATUS_OK) { + if (op->cancel_error != GRPC_ERROR_NONE || + op->close_error != GRPC_ERROR_NONE) { if (optional_message) { gpr_slice_unref(*optional_message); } return; } - if (optional_message) { - cmd = gpr_malloc(sizeof(*cmd)); - cmd->message = *optional_message; - cmd->then_call = op->on_complete; - grpc_closure_init(&cmd->closure, free_message, cmd); - op->on_complete = &cmd->closure; - op->optional_close_message = &cmd->message; + grpc_error *error; + if (optional_message != NULL) { + char *msg = gpr_dump_slice(*optional_message, GPR_DUMP_ASCII); + error = grpc_error_set_str(GRPC_ERROR_CREATE(msg), + GRPC_ERROR_STR_GRPC_MESSAGE, msg); + gpr_free(msg); + gpr_slice_unref(*optional_message); + } else { + error = GRPC_ERROR_CREATE("Call force closed"); } - op->close_with_status = status; + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); + add_error(op, &op->close_error, error); } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index a46ccb643c..d2f6344ee3 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -135,13 +135,12 @@ typedef struct grpc_transport_stream_op { /** Collect any stats into provided buffer, zero internal stat counters */ grpc_transport_stream_stats *collect_stats; - /** If != GRPC_STATUS_OK, cancel this stream */ - grpc_status_code cancel_with_status; + /** If != GRPC_ERROR_NONE, cancel this stream */ + grpc_error *cancel_error; - /** If != GRPC_STATUS_OK, send grpc-status, grpc-message, and close this + /** If != GRPC_ERROR, send grpc-status, grpc-message, and close this stream for both reading and writing */ - grpc_status_code close_with_status; - gpr_slice *optional_close_message; + grpc_error *close_error; /* Indexes correspond to grpc_context_index enum values */ grpc_call_context_element *context; diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index aeaba5339f..138591db2a 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -119,10 +119,21 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) { gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA")); } - if (op->cancel_with_status != GRPC_STATUS_OK) { + if (op->cancel_error != GRPC_ERROR_NONE) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); first = 0; - gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status); + const char *msg = grpc_error_string(op->cancel_error); + gpr_asprintf(&tmp, "CANCEL:%s", msg); + grpc_error_free_string(msg); + gpr_strvec_add(&b, tmp); + } + + if (op->close_error != GRPC_ERROR_NONE) { + if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); + first = 0; + const char *msg = grpc_error_string(op->close_error); + gpr_asprintf(&tmp, "CLOSE:%s", msg); + grpc_error_free_string(msg); gpr_strvec_add(&b, tmp); } |