aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/census/grpc_filter.c3
-rw-r--r--src/core/ext/client_config/channel_connectivity.c7
-rw-r--r--src/core/ext/client_config/client_channel.c6
-rw-r--r--src/core/ext/client_config/subchannel.c2
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c7
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.c3
-rw-r--r--src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c3
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c256
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h6
9 files changed, 182 insertions, 111 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 72e4e5427e..f51d850e01 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -45,6 +45,7 @@
#include "src/core/ext/census/census_interface.h"
#include "src/core/ext/census/census_rpc_stats.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/transport/static_metadata.h"
typedef struct call_data {
@@ -92,6 +93,7 @@ static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
grpc_error *error) {
+ GPR_TIMER_BEGIN("census-server:server_on_done_recv", 0);
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
@@ -99,6 +101,7 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
extract_and_annotate_method_tag(calld->recv_initial_metadata, calld, chand);
}
calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, error);
+ GPR_TIMER_END("census-server:server_on_done_recv", 0);
}
static void server_mutate_op(grpc_call_element *elem,
diff --git a/src/core/ext/client_config/channel_connectivity.c b/src/core/ext/client_config/channel_connectivity.c
index 20c01a9a7c..c1220e3a8c 100644
--- a/src/core/ext/client_config/channel_connectivity.c
+++ b/src/core/ext/client_config/channel_connectivity.c
@@ -189,10 +189,11 @@ void grpc_channel_watch_connectivity_state(
GRPC_API_TRACE(
"grpc_channel_watch_connectivity_state("
"channel=%p, last_observed_state=%d, "
- "deadline=gpr_timespec { tv_sec: %lld, tv_nsec: %d, clock_type: %d }, "
+ "deadline=gpr_timespec { tv_sec: %" PRId64
+ ", tv_nsec: %d, clock_type: %d }, "
"cq=%p, tag=%p)",
- 7, (channel, (int)last_observed_state, (long long)deadline.tv_sec,
- (int)deadline.tv_nsec, (int)deadline.clock_type, cq, tag));
+ 7, (channel, (int)last_observed_state, deadline.tv_sec, deadline.tv_nsec,
+ (int)deadline.clock_type, cq, tag));
grpc_cq_begin_op(cq, tag);
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 1d5a7d5224..a096435c98 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -367,6 +367,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
+ GPR_TIMER_BEGIN("cc_pick_subchannel", 0);
+
grpc_call_element *elem = elemp;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
@@ -391,6 +393,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
}
gpr_mu_unlock(&chand->mu_config);
+ GPR_TIMER_END("cc_pick_subchannel", 0);
return 1;
}
if (chand->lb_policy != NULL) {
@@ -402,6 +405,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
initial_metadata, initial_metadata_flags,
connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
+ GPR_TIMER_END("cc_pick_subchannel", 0);
return r;
}
if (chand->resolver != NULL && !chand->started_resolving) {
@@ -426,6 +430,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
NULL);
}
gpr_mu_unlock(&chand->mu_config);
+
+ GPR_TIMER_END("cc_pick_subchannel", 0);
return 0;
}
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 468067ea57..d089cd4399 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -690,9 +690,11 @@ char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call,
grpc_transport_stream_op *op) {
+ GPR_TIMER_BEGIN("grpc_subchannel_call_process_op", 0);
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
+ GPR_TIMER_END("grpc_subchannel_call_process_op", 0);
}
grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index e31800edd9..b96a0ad093 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -120,16 +120,13 @@ retry:
return;
}
/* if this is a cancellation, then we can raise our cancelled flag */
- if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (op->cancel_error != GRPC_ERROR_NONE) {
if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
goto retry;
} else {
switch (holder->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
- fail_locked(exec_ctx, holder,
- grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"),
- GRPC_ERROR_INT_GRPC_STATUS,
- op->cancel_with_status));
+ fail_locked(exec_ctx, holder, GRPC_ERROR_REF(op->cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
index 9bae3a94f9..e5c987925c 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
@@ -97,7 +97,8 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
goto error;
}
- err = grpc_tcp_server_create(NULL, &tcp);
+ err =
+ grpc_tcp_server_create(NULL, grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
index ead8a4d566..c42810e913 100644
--- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
+++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
@@ -216,7 +216,8 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
state = gpr_malloc(sizeof(*state));
memset(state, 0, sizeof(*state));
grpc_closure_init(&state->destroy_closure, destroy_done, state);
- err = grpc_tcp_server_create(&state->destroy_closure, &tcp);
+ err = grpc_tcp_server_create(&state->destroy_closure,
+ grpc_server_get_channel_args(server), &tcp);
if (err != GRPC_ERROR_NONE) {
goto error;
}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 8f47ea79b5..38e782b9b4 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -106,14 +106,12 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
- grpc_status_code status,
- gpr_slice *optional_message);
+ grpc_error *error);
static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
- grpc_status_code status,
- gpr_slice *optional_message);
+ grpc_error *error);
/** Add endpoint from this transport to pollset */
static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
@@ -163,8 +161,6 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(t->ep == NULL);
- gpr_slice_unref(t->optional_drop_message);
-
gpr_slice_buffer_destroy(&t->global.qbuf);
gpr_slice_buffer_destroy(&t->writing.outbuf);
@@ -266,7 +262,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->parsing.is_first_frame = true;
t->writing.is_client = is_client;
- t->optional_drop_message = gpr_empty_slice();
grpc_connectivity_state_init(
&t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
@@ -594,6 +589,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_metadata_buffer_destroy(
&s->global.received_trailing_metadata);
gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer);
+ GRPC_ERROR_UNREF(s->global.removal_error);
UNREF_TRANSPORT(exec_ctx, t, "stream");
@@ -642,6 +638,8 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
grpc_chttp2_executor_action_header *hdr;
grpc_chttp2_executor_action_header *next;
+ GPR_TIMER_BEGIN("finish_global_actions", 0);
+
for (;;) {
if (!t->executor.writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
@@ -659,7 +657,9 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
NULL;
gpr_mu_unlock(&t->executor.mu);
while (hdr != NULL) {
+ GPR_TIMER_BEGIN("chttp2:locked_action", 0);
hdr->action(exec_ctx, t, hdr->stream, hdr->arg);
+ GPR_TIMER_END("chttp2:locked_action", 0);
next = hdr->next;
gpr_free(hdr);
UNREF_TRANSPORT(exec_ctx, t, "pending_action");
@@ -672,6 +672,8 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&t->executor.mu);
break;
}
+
+ GPR_TIMER_END("finish_global_actions", 0);
}
void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
@@ -681,6 +683,8 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
void *arg, size_t sizeof_arg) {
grpc_chttp2_executor_action_header *hdr;
+ GPR_TIMER_BEGIN("grpc_chttp2_run_with_global_lock", 0);
+
REF_TRANSPORT(t, "run_global");
gpr_mu_lock(&t->executor.mu);
@@ -689,7 +693,9 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
t->executor.global_active = 1;
gpr_mu_unlock(&t->executor.mu);
+ GPR_TIMER_BEGIN("chttp2:locked_action", 0);
action(exec_ctx, t, optional_stream, arg);
+ GPR_TIMER_END("chttp2:locked_action", 0);
finish_global_actions(exec_ctx, t);
} else {
@@ -726,6 +732,8 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
}
UNREF_TRANSPORT(exec_ctx, t, "run_global");
+
+ GPR_TIMER_END("grpc_chttp2_run_with_global_lock", 0);
}
/*******************************************************************************
@@ -876,7 +884,9 @@ static void maybe_start_some_streams(
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_UNAVAILABLE, NULL);
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE("Stream IDs exhausted"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
}
@@ -958,14 +968,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
on_complete->next_data.scratch |= CLOSURE_BARRIER_STATS_BIT;
}
- if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (op->cancel_error != GRPC_ERROR_NONE) {
cancel_from_api(exec_ctx, transport_global, stream_global,
- op->cancel_with_status, op->optional_cancel_message);
+ GRPC_ERROR_REF(op->cancel_error));
}
- if (op->close_with_status != GRPC_STATUS_OK) {
+ if (op->close_error != GRPC_ERROR_NONE) {
close_from_api(exec_ctx, transport_global, stream_global,
- op->close_with_status, op->optional_close_message);
+ GRPC_ERROR_REF(op->close_error));
}
if (op->send_initial_metadata != NULL) {
@@ -979,12 +989,16 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
transport_global->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
if (metadata_size > metadata_peer_limit) {
- gpr_log(GPR_DEBUG,
- "to-be-sent initial metadata size exceeds peer limit "
- "(%" PRIuPTR " vs. %" PRIuPTR ")",
- metadata_size, metadata_peer_limit);
- cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
+ cancel_from_api(
+ exec_ctx, transport_global, stream_global,
+ grpc_error_set_int(
+ grpc_error_set_int(
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE("to-be-sent initial metadata size "
+ "exceeds peer limit"),
+ GRPC_ERROR_INT_SIZE, (intptr_t)metadata_size),
+ GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
@@ -1038,12 +1052,16 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
transport_global->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
if (metadata_size > metadata_peer_limit) {
- gpr_log(GPR_DEBUG,
- "to-be-sent trailing metadata size exceeds peer limit "
- "(%" PRIuPTR " vs. %" PRIuPTR ")",
- metadata_size, metadata_peer_limit);
- cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
+ cancel_from_api(
+ exec_ctx, transport_global, stream_global,
+ grpc_error_set_int(
+ grpc_error_set_int(
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE("to-be-sent trailing metadata size "
+ "exceeds peer limit"),
+ GRPC_ERROR_INT_SIZE, (intptr_t)metadata_size),
+ GRPC_ERROR_INT_LIMIT, (intptr_t)metadata_peer_limit),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
@@ -1235,8 +1253,12 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->exceeded_metadata_size) {
- cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
+ cancel_from_api(
+ exec_ctx, transport_global, stream_global,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE(
+ "received initial metadata size exceeds limit"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1275,8 +1297,12 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
}
if (stream_global->exceeded_metadata_size) {
- cancel_from_api(exec_ctx, transport_global, stream_global,
- GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
+ cancel_from_api(
+ exec_ctx, transport_global, stream_global,
+ grpc_error_set_int(
+ GRPC_ERROR_CREATE(
+ "received trailing metadata size exceeds limit"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
}
}
if (stream_global->all_incoming_byte_streams_finished) {
@@ -1316,15 +1342,15 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
}
if (s->parsing.data_parser.parsing_frame != NULL) {
grpc_chttp2_incoming_byte_stream_finished(
- exec_ctx, s->parsing.data_parser.parsing_frame,
- GRPC_ERROR_CREATE_REFERENCING("Stream removed", &error, 1), 0);
+ exec_ctx, s->parsing.data_parser.parsing_frame, GRPC_ERROR_REF(error),
+ 0);
s->parsing.data_parser.parsing_frame = NULL;
}
if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
close_transport_locked(
- exec_ctx, t,
- GRPC_ERROR_CREATE("Last stream closed after sending GOAWAY"));
+ exec_ctx, t, GRPC_ERROR_CREATE_REFERENCING(
+ "Last stream closed after sending GOAWAY", &error, 1));
}
if (grpc_chttp2_list_remove_writable_stream(&t->global, &s->global)) {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
@@ -1340,35 +1366,67 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GRPC_ERROR_UNREF(error);
}
+static void status_codes_from_error(grpc_error *error,
+ grpc_chttp2_error_code *http2_error,
+ grpc_status_code *grpc_status) {
+ intptr_t ip_http;
+ intptr_t ip_grpc;
+ bool have_http =
+ grpc_error_get_int(error, GRPC_ERROR_INT_HTTP2_ERROR, &ip_http);
+ bool have_grpc =
+ grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &ip_grpc);
+ if (have_http) {
+ *http2_error = (grpc_chttp2_error_code)ip_http;
+ } else if (have_grpc) {
+ *http2_error =
+ grpc_chttp2_grpc_status_to_http2_error((grpc_status_code)ip_grpc);
+ } else {
+ *http2_error = GRPC_CHTTP2_INTERNAL_ERROR;
+ }
+ if (have_grpc) {
+ *grpc_status = (grpc_status_code)ip_grpc;
+ } else if (have_http) {
+ *grpc_status =
+ grpc_chttp2_http2_error_to_grpc_status((grpc_chttp2_error_code)ip_http);
+ } else {
+ *grpc_status = GRPC_STATUS_INTERNAL;
+ }
+}
+
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
- grpc_status_code status,
- gpr_slice *optional_message) {
+ grpc_error *due_to_error) {
if (!stream_global->read_closed || !stream_global->write_closed) {
+ grpc_status_code grpc_status;
+ grpc_chttp2_error_code http_error;
+ status_codes_from_error(due_to_error, &http_error, &grpc_status);
+
if (stream_global->id != 0) {
gpr_slice_buffer_add(
&transport_global->qbuf,
- grpc_chttp2_rst_stream_create(
- stream_global->id,
- (uint32_t)grpc_chttp2_grpc_status_to_http2_error(status),
- &stream_global->stats.outgoing));
+ grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error,
+ &stream_global->stats.outgoing));
}
- if (optional_message) {
- gpr_slice_ref(*optional_message);
+ const char *msg =
+ grpc_error_get_str(due_to_error, GRPC_ERROR_STR_GRPC_MESSAGE);
+ bool free_msg = false;
+ if (msg == NULL) {
+ free_msg = true;
+ msg = grpc_error_string(due_to_error);
}
- grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
- optional_message);
+ gpr_slice msg_slice = gpr_slice_from_copied_string(msg);
+ grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global,
+ grpc_status, &msg_slice);
+ if (free_msg) grpc_error_free_string(msg);
}
- if (status != GRPC_STATUS_OK && !stream_global->seen_error) {
+ if (due_to_error != GRPC_ERROR_NONE && !stream_global->seen_error) {
stream_global->seen_error = true;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
- grpc_chttp2_mark_stream_closed(
- exec_ctx, transport_global, stream_global, 1, 1,
- grpc_error_set_int(GRPC_ERROR_CREATE("Cancelled"),
- GRPC_ERROR_INT_GRPC_STATUS, status));
+ grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
+ 1, due_to_error);
}
void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
@@ -1451,6 +1509,7 @@ void grpc_chttp2_mark_stream_closed(
}
}
if (stream_global->read_closed && stream_global->write_closed) {
+ stream_global->removal_error = GRPC_ERROR_REF(error);
if (stream_global->id != 0 &&
TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) {
grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
@@ -1469,15 +1528,17 @@ void grpc_chttp2_mark_stream_closed(
static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
- grpc_status_code status,
- gpr_slice *optional_message) {
+ grpc_error *error) {
gpr_slice hdr;
gpr_slice status_hdr;
gpr_slice message_pfx;
uint8_t *p;
uint32_t len = 0;
+ grpc_status_code grpc_status;
+ grpc_chttp2_error_code http_error;
+ status_codes_from_error(error, &http_error, &grpc_status);
- GPR_ASSERT(status >= 0 && (int)status < 100);
+ GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
if (stream_global->id != 0 && !transport_global->is_client) {
/* Hand roll a header block.
@@ -1487,7 +1548,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
time we got around to sending this, so instead we ignore HPACK
compression
and just write the uncompressed bytes onto the wire. */
- status_hdr = gpr_slice_malloc(15 + (status >= 10));
+ status_hdr = gpr_slice_malloc(15 + (grpc_status >= 10));
p = GPR_SLICE_START_PTR(status_hdr);
*p++ = 0x40; /* literal header */
*p++ = 11; /* len(grpc-status) */
@@ -1502,19 +1563,23 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
*p++ = 't';
*p++ = 'u';
*p++ = 's';
- if (status < 10) {
+ if (grpc_status < 10) {
*p++ = 1;
- *p++ = (uint8_t)('0' + status);
+ *p++ = (uint8_t)('0' + grpc_status);
} else {
*p++ = 2;
- *p++ = (uint8_t)('0' + (status / 10));
- *p++ = (uint8_t)('0' + (status % 10));
+ *p++ = (uint8_t)('0' + (grpc_status / 10));
+ *p++ = (uint8_t)('0' + (grpc_status % 10));
}
GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
len += (uint32_t)GPR_SLICE_LENGTH(status_hdr);
- if (optional_message) {
- GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
+ const char *optional_message =
+ grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
+
+ if (optional_message != NULL) {
+ size_t msg_len = strlen(optional_message);
+ GPR_ASSERT(msg_len < 127);
message_pfx = gpr_slice_malloc(15);
p = GPR_SLICE_START_PTR(message_pfx);
*p++ = 0x40;
@@ -1531,10 +1596,10 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
*p++ = 'a';
*p++ = 'g';
*p++ = 'e';
- *p++ = (uint8_t)GPR_SLICE_LENGTH(*optional_message);
+ *p++ = (uint8_t)msg_len;
GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
len += (uint32_t)GPR_SLICE_LENGTH(message_pfx);
- len += (uint32_t)GPR_SLICE_LENGTH(*optional_message);
+ len += (uint32_t)msg_len;
}
hdr = gpr_slice_malloc(9);
@@ -1555,53 +1620,53 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
if (optional_message) {
gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
gpr_slice_buffer_add(&transport_global->qbuf,
- gpr_slice_ref(*optional_message));
+ gpr_slice_from_copied_string(optional_message));
}
-
gpr_slice_buffer_add(
&transport_global->qbuf,
grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR,
&stream_global->stats.outgoing));
-
- if (optional_message) {
- gpr_slice_ref(*optional_message);
- }
}
- grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
- optional_message);
- grpc_error *err = GRPC_ERROR_CREATE("Stream closed");
- err = grpc_error_set_int(err, GRPC_ERROR_INT_GRPC_STATUS, status);
- if (optional_message) {
- char *str =
- gpr_dump_slice(*optional_message, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- err = grpc_error_set_str(err, GRPC_ERROR_STR_GRPC_MESSAGE, str);
- gpr_free(str);
+ const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
+ bool free_msg = false;
+ if (msg == NULL) {
+ free_msg = true;
+ msg = grpc_error_string(error);
}
+ gpr_slice msg_slice = gpr_slice_from_copied_string(msg);
+ grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global,
+ grpc_status, &msg_slice);
+ if (free_msg) grpc_error_free_string(msg);
+
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
- 1, err);
+ 1, error);
}
+typedef struct {
+ grpc_exec_ctx *exec_ctx;
+ grpc_error *error;
+} cancel_stream_cb_args;
+
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
- grpc_chttp2_transport *transport = TRANSPORT_FROM_GLOBAL(transport_global);
- cancel_from_api(user_data, transport_global, stream_global,
- GRPC_STATUS_UNAVAILABLE,
- GPR_SLICE_IS_EMPTY(transport->optional_drop_message)
- ? NULL
- : &transport->optional_drop_message);
+ cancel_stream_cb_args *args = user_data;
+ cancel_from_api(args->exec_ctx, transport_global, stream_global,
+ GRPC_ERROR_REF(args->error));
}
-static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
- grpc_chttp2_transport *t) {
- grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb);
+static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
+ grpc_error *error) {
+ cancel_stream_cb_args args = {exec_ctx, error};
+ grpc_chttp2_for_all_streams(&t->global, &args, cancel_stream_cb);
+ GRPC_ERROR_UNREF(error);
}
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_error *error) {
- close_transport_locked(exec_ctx, t, error);
- end_all_the_calls(exec_ctx, t);
+ close_transport_locked(exec_ctx, t, GRPC_ERROR_REF(error));
+ end_all_the_calls(exec_ctx, t, error);
}
/** update window from a settings change */
@@ -1707,16 +1772,8 @@ static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing,
t->read_buffer.slices[i]);
};
- if (i != t->read_buffer.count || errors[1] != GRPC_ERROR_NONE) {
- gpr_slice_unref(t->optional_drop_message);
+ if (i != t->read_buffer.count) {
errors[2] = try_http_parsing(exec_ctx, t);
- if (errors[2] != GRPC_ERROR_NONE) {
- t->optional_drop_message = gpr_slice_from_copied_string(
- "Connection dropped: received http1.x response");
- } else {
- t->optional_drop_message = gpr_slice_from_copied_string(
- "Connection dropped: received unparseable response");
- }
}
grpc_error *err =
errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE &&
@@ -1767,7 +1824,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
GPR_ASSERT(stream_global->write_closed);
GPR_ASSERT(stream_global->read_closed);
remove_stream(exec_ctx, t, stream_global->id,
- GRPC_ERROR_CREATE("Stream removed"));
+ GRPC_ERROR_REF(stream_global->removal_error));
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
}
@@ -1784,6 +1841,10 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
error = GRPC_ERROR_CREATE("Transport closed");
}
if (error != GRPC_ERROR_NONE) {
+ if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ }
drop_connection(exec_ctx, t, GRPC_ERROR_REF(error));
t->endpoint_reading = 0;
if (!t->executor.writing_active && t->ep) {
@@ -1798,6 +1859,7 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
prevent_endpoint_shutdown(t);
}
gpr_slice_buffer_reset_and_unref(&t->read_buffer);
+ GRPC_ERROR_UNREF(error);
if (keep_reading) {
grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->reading_action);
@@ -1806,8 +1868,6 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
} else {
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}
-
- GRPC_LOG_IF_ERROR("close_transport", error);
}
/*******************************************************************************
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index d63170e350..b5180c6fc8 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -384,9 +384,6 @@ struct grpc_chttp2_transport {
/** Transport op to be applied post-parsing */
grpc_transport_op *post_parsing_op;
-
- /** Message explaining the reason of dropping connection */
- gpr_slice optional_drop_message;
};
typedef struct {
@@ -439,6 +436,9 @@ typedef struct {
bool seen_error;
bool exceeded_metadata_size;
+ /** the error that resulted in this stream being removed */
+ grpc_error *removal_error;
+
bool published_initial_metadata;
bool published_trailing_metadata;
bool final_metadata_requested;