aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-09-08 10:19:26 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-09-08 10:19:26 -0700
commit5946ae5eff6cb97d0f5aed2bceda34811c6290fc (patch)
tree3325a2d8b304364a5f06b133c91afb22e440a85f /src/core
parentfe1f7f58139a3ccafb7b8e8ca92d5d209880a3e5 (diff)
parent2f76fd452ebd796945b29f4ad1d10471dfac3346 (diff)
Merge branch 'channelz-subchannels' into channelz-server
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc134
-rw-r--r--src/core/ext/filters/http/client/http_client_filter.cc16
-rw-r--r--src/core/ext/filters/http/server/http_server_filter.cc25
-rw-r--r--src/core/ext/filters/max_age/max_age_filter.cc3
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.cc36
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc465
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h2
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc2
-rw-r--r--src/core/lib/channel/channelz.h7
-rw-r--r--src/core/lib/channel/connected_channel.cc6
-rw-r--r--src/core/lib/iomgr/error.cc21
-rw-r--r--src/core/lib/iomgr/error.h8
-rw-r--r--src/core/lib/iomgr/timer_generic.cc7
-rw-r--r--src/core/lib/security/security_connector/security_connector.cc14
-rw-r--r--src/core/lib/security/transport/server_auth_filter.cc25
-rw-r--r--src/core/lib/surface/call.cc380
-rw-r--r--src/core/lib/surface/channel.cc1
-rw-r--r--src/core/lib/surface/server.cc23
-rw-r--r--src/core/tsi/alts/handshaker/alts_handshaker_client.cc9
-rw-r--r--src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc4
-rw-r--r--src/core/tsi/alts/handshaker/alts_tsi_event.cc6
-rw-r--r--src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc11
-rw-r--r--src/core/tsi/alts/handshaker/alts_tsi_utils.cc4
-rw-r--r--src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc4
-rw-r--r--src/core/tsi/alts_transport_security.cc4
-rw-r--r--src/core/tsi/ssl/session_cache/ssl_session_cache.cc3
26 files changed, 621 insertions, 599 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index d015ceb335..4691aa1c97 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -457,7 +457,6 @@ get_service_config_from_resolver_result_locked(channel_data* chand) {
grpc_uri* uri = grpc_uri_parse(server_uri, true);
GPR_ASSERT(uri->path[0] != '\0');
service_config_parsing_state parsing_state;
- memset(&parsing_state, 0, sizeof(parsing_state));
parsing_state.server_name =
uri->path[0] == '/' ? uri->path + 1 : uri->path;
service_config->ParseGlobalParams(parse_retry_throttle_params,
@@ -937,7 +936,7 @@ typedef struct client_channel_call_data {
// state needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_channelz;
grpc_closure* original_recv_trailing_metadata;
- grpc_transport_stream_op_batch* recv_trailing_metadata_batch;
+ grpc_metadata_batch* recv_trailing_metadata_batch;
grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties;
@@ -1000,14 +999,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
-template <typename Predicate>
-static pending_batch* pending_batch_find(grpc_call_element* elem,
- const char* log_message,
- Predicate predicate);
-static void get_call_status(grpc_call_element* elem,
- grpc_metadata_batch* md_batch, grpc_error* error,
- grpc_status_code* status,
- grpc_mdelem** server_pushback_md);
+static void maybe_intercept_metadata_for_channelz(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
//
// send op data caching
@@ -1282,66 +1275,6 @@ static void resume_pending_batch_in_call_combiner(void* arg,
grpc_subchannel_call_process_op(subchannel_call, batch);
}
-static void recv_trailing_metadata_ready_channelz(void* arg,
- grpc_error* error) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
- "error=%s",
- chand, calld, grpc_error_string(error));
- }
- GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr);
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_metadata_batch* md_batch =
- calld->recv_trailing_metadata_batch->payload->recv_trailing_metadata
- .recv_trailing_metadata;
- get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
- grpc_core::channelz::SubchannelNode* channelz_subchannel =
- calld->pick.connected_subchannel->channelz_subchannel();
- GPR_ASSERT(channelz_subchannel != nullptr);
- if (status == GRPC_STATUS_OK) {
- channelz_subchannel->RecordCallSucceeded();
- } else {
- channelz_subchannel->RecordCallFailed();
- }
- calld->recv_trailing_metadata_batch = nullptr;
- GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
-}
-
-// If channelz is enabled, intercept recv_trailing so that we may check the
-// status and associate it to a subchannel.
-// Returns true if callback was intercepted, false otherwise.
-static void maybe_intercept_recv_trailing_for_channelz(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- // only intercept payloads with recv trailing.
- if (!batch->recv_trailing_metadata) {
- return;
- }
- // only add interceptor is channelz is enabled.
- if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
- return;
- }
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
- batch);
- }
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
- recv_trailing_metadata_ready_channelz, elem,
- grpc_schedule_on_exec_ctx);
- // save some state needed for the interception callback.
- GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr);
- calld->recv_trailing_metadata_batch = batch;
- calld->original_recv_trailing_metadata =
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &calld->recv_trailing_metadata_ready_channelz;
-}
-
// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@@ -1366,7 +1299,7 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- maybe_intercept_recv_trailing_for_channelz(elem, batch);
+ maybe_intercept_metadata_for_channelz(elem, batch);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
@@ -2736,6 +2669,65 @@ static void pick_done(void* arg, grpc_error* error) {
}
}
+static void recv_trailing_metadata_ready_channelz(void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
+ "error=%s",
+ chand, calld, grpc_error_string(error));
+ }
+ GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr);
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_metadata_batch* md_batch = calld->recv_trailing_metadata_batch;
+ get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
+ grpc_core::channelz::SubchannelNode* channelz_subchannel =
+ calld->pick.connected_subchannel->channelz_subchannel();
+ GPR_ASSERT(channelz_subchannel != nullptr);
+ if (status == GRPC_STATUS_OK) {
+ channelz_subchannel->RecordCallSucceeded();
+ } else {
+ channelz_subchannel->RecordCallFailed();
+ }
+ calld->recv_trailing_metadata_batch = nullptr;
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
+}
+
+// If channelz is enabled, intercept recv_trailing so that we may check the
+// status and associate it to a subchannel.
+// Returns true if callback was intercepted, false otherwise.
+static void maybe_intercept_metadata_for_channelz(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ // only intercept payloads with recv trailing.
+ if (!batch->recv_trailing_metadata) {
+ return;
+ }
+ // only add interceptor is channelz is enabled.
+ if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
+ return;
+ }
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
+ batch);
+ }
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
+ recv_trailing_metadata_ready_channelz, elem,
+ grpc_schedule_on_exec_ctx);
+ // save some state needed for the interception callback.
+ GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr);
+ calld->recv_trailing_metadata_batch =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ calld->original_recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready_channelz;
+}
+
static void maybe_add_call_to_channel_interested_parties_locked(
grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc
index 1678051beb..91fa163fec 100644
--- a/src/core/ext/filters/http/client/http_client_filter.cc
+++ b/src/core/ext/filters/http/client/http_client_filter.cc
@@ -51,6 +51,7 @@ struct call_data {
grpc_linked_mdelem user_agent;
// State for handling recv_initial_metadata ops.
grpc_metadata_batch* recv_initial_metadata;
+ grpc_error* recv_initial_metadata_error;
grpc_closure* original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
@@ -78,7 +79,12 @@ struct channel_data {
static grpc_error* client_filter_incoming_metadata(grpc_call_element* elem,
grpc_metadata_batch* b) {
if (b->idx.named.status != nullptr) {
- if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) {
+ /* If both gRPC status and HTTP status are provided in the response, we
+ * should prefer the gRPC status code, as mentioned in
+ * https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md.
+ */
+ if (b->idx.named.grpc_status != nullptr ||
+ grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) {
grpc_metadata_batch_remove(b, b->idx.named.status);
} else {
char* val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md),
@@ -147,6 +153,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error == GRPC_ERROR_NONE) {
error = client_filter_incoming_metadata(elem, calld->recv_initial_metadata);
+ calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
} else {
GRPC_ERROR_REF(error);
}
@@ -162,6 +169,8 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
} else {
GRPC_ERROR_REF(error);
}
+ error = grpc_error_add_child(
+ error, GRPC_ERROR_REF(calld->recv_initial_metadata_error));
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
}
@@ -434,7 +443,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
- grpc_closure* ignored) {}
+ grpc_closure* ignored) {
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
+}
static grpc_mdelem scheme_from_args(const grpc_channel_args* args) {
unsigned i;
diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc
index 3919447f26..926afeec84 100644
--- a/src/core/ext/filters/http/server/http_server_filter.cc
+++ b/src/core/ext/filters/http/server/http_server_filter.cc
@@ -50,6 +50,7 @@ struct call_data {
// State for intercepting recv_initial_metadata.
grpc_closure recv_initial_metadata_ready;
+ grpc_error* recv_initial_metadata_ready_error;
grpc_closure* original_recv_initial_metadata_ready;
grpc_metadata_batch* recv_initial_metadata;
uint32_t* recv_initial_metadata_flags;
@@ -60,6 +61,9 @@ struct call_data {
grpc_closure recv_message_ready;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
bool seen_recv_message_ready;
+
+ grpc_closure recv_trailing_metadata_ready;
+ grpc_closure* original_recv_trailing_metadata_ready;
};
} // namespace
@@ -267,6 +271,7 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) {
calld->seen_recv_initial_metadata_ready = true;
if (err == GRPC_ERROR_NONE) {
err = hs_filter_incoming_metadata(elem, calld->recv_initial_metadata);
+ calld->recv_initial_metadata_ready_error = GRPC_ERROR_REF(err);
if (calld->seen_recv_message_ready) {
// We've already seen the recv_message callback, but we previously
// deferred it, so we need to return it here.
@@ -313,6 +318,15 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) {
}
}
+static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ err = grpc_error_add_child(
+ GRPC_ERROR_REF(err),
+ GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error));
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
+}
+
static grpc_error* hs_mutate_op(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
/* grab pointers to our data from the call element */
@@ -357,6 +371,13 @@ static grpc_error* hs_mutate_op(grpc_call_element* elem,
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
+ if (op->recv_trailing_metadata) {
+ calld->original_recv_trailing_metadata_ready =
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready;
+ }
+
if (op->send_trailing_metadata) {
grpc_error* error = hs_filter_outgoing_metadata(
elem, op->payload->send_trailing_metadata.send_trailing_metadata);
@@ -389,6 +410,9 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->recv_message_ready, hs_recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ hs_recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE;
}
@@ -397,6 +421,7 @@ static void hs_destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
+ GRPC_ERROR_UNREF(calld->recv_initial_metadata_ready_error);
if (calld->have_read_stream) {
calld->read_stream->Orphan();
}
diff --git a/src/core/ext/filters/max_age/max_age_filter.cc b/src/core/ext/filters/max_age/max_age_filter.cc
index 1fe8288bd0..431472609e 100644
--- a/src/core/ext/filters/max_age/max_age_filter.cc
+++ b/src/core/ext/filters/max_age/max_age_filter.cc
@@ -429,8 +429,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
? GRPC_MILLIS_INF_FUTURE
: DEFAULT_MAX_CONNECTION_IDLE_MS;
chand->idle_state = MAX_IDLE_STATE_INIT;
- gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis,
- GRPC_MILLIS_INF_PAST);
+ gpr_atm_no_barrier_store(&chand->last_enter_idle_time_millis, GPR_ATM_MIN);
for (size_t i = 0; i < args->channel_args->num_args; ++i) {
if (0 == strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_CONNECTION_AGE_MS)) {
diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc
index c7fc3f2e62..c17df86f3d 100644
--- a/src/core/ext/filters/message_size/message_size_filter.cc
+++ b/src/core/ext/filters/message_size/message_size_filter.cc
@@ -99,10 +99,15 @@ struct call_data {
// recv_message_ready up-call on transport_stream_op, and remember to
// call our next_recv_message_ready member after handling it.
grpc_closure recv_message_ready;
+ grpc_closure recv_trailing_metadata_ready;
+ // The error caused by a message that is too large, or GRPC_ERROR_NONE
+ grpc_error* error;
// Used by recv_message_ready.
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
// Original recv_message_ready callback, invoked after our own.
grpc_closure* next_recv_message_ready;
+ // Original recv_trailing_metadata callback, invoked after our own.
+ grpc_closure* original_recv_trailing_metadata_ready;
};
struct channel_data {
@@ -130,12 +135,13 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
grpc_error* new_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
+ GRPC_ERROR_UNREF(calld->error);
if (error == GRPC_ERROR_NONE) {
error = new_error;
} else {
error = grpc_error_add_child(error, new_error);
- GRPC_ERROR_UNREF(new_error);
}
+ calld->error = GRPC_ERROR_REF(error);
gpr_free(message_string);
} else {
GRPC_ERROR_REF(error);
@@ -144,6 +150,17 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error);
}
+// Callback invoked on completion of recv_trailing_metadata
+// Notifies the recv_trailing_metadata batch of any message size failures
+static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ error =
+ grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error));
+ // Invoke the next callback.
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
+}
+
// Start transport stream op.
static void start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
@@ -172,6 +189,13 @@ static void start_transport_stream_op_batch(
calld->recv_message = op->payload->recv_message.recv_message;
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
+ // Inject callback for receiving trailing metadata.
+ if (op->recv_trailing_metadata) {
+ calld->original_recv_trailing_metadata_ready =
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready;
+ }
// Chain to the next filter.
grpc_call_next_op(elem, op);
}
@@ -183,8 +207,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
calld->call_combiner = args->call_combiner;
calld->next_recv_message_ready = nullptr;
+ calld->original_recv_trailing_metadata_ready = nullptr;
+ calld->error = GRPC_ERROR_NONE;
GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
@@ -213,7 +242,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
// Destructor for call_data.
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
- grpc_closure* ignored) {}
+ grpc_closure* ignored) {
+ call_data* calld = (call_data*)elem->call_data;
+ GRPC_ERROR_UNREF(calld->error);
+}
static int default_size(const grpc_channel_args* args,
int without_minimal_stack) {
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 027a57d606..26cad2cc9a 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -230,35 +230,165 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport* t) { gpr_ref(&t->refs); }
static const grpc_transport_vtable* get_vtable(void);
-static void init_transport(grpc_chttp2_transport* t,
- const grpc_channel_args* channel_args,
- grpc_endpoint* ep, bool is_client) {
+/* Returns whether bdp is enabled */
+static bool read_channel_args(grpc_chttp2_transport* t,
+ const grpc_channel_args* channel_args,
+ bool is_client) {
+ bool enable_bdp = true;
size_t i;
int j;
- GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
- GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
-
- t->base.vtable = get_vtable();
- t->ep = ep;
- /* one ref is for destroy */
- gpr_ref_init(&t->refs, 1);
- t->combiner = grpc_combiner_create();
- t->peer_string = grpc_endpoint_get_peer(ep);
- t->endpoint_reading = 1;
- t->next_stream_id = is_client ? 1 : 2;
- t->is_client = is_client;
- t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
- t->is_first_frame = true;
- grpc_connectivity_state_init(
- &t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
- is_client ? "client_transport" : "server_transport");
-
- grpc_slice_buffer_init(&t->qbuf);
-
- grpc_slice_buffer_init(&t->outbuf);
- grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
+ for (i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
+ if ((t->next_stream_id & 1) != (value & 1)) {
+ gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
+ GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
+ is_client ? "client" : "server");
+ } else {
+ t->next_stream_id = static_cast<uint32_t>(value);
+ }
+ }
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
+ const grpc_integer_options options = {-1, 0, INT_MAX};
+ const int value =
+ grpc_channel_arg_get_integer(&channel_args->args[i], options);
+ if (value >= 0) {
+ grpc_chttp2_hpack_compressor_set_max_usable_size(
+ &t->hpack_compressor, static_cast<uint32_t>(value));
+ }
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
+ t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ {g_default_max_pings_without_data, 0, INT_MAX});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
+ t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
+ &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
+ } else if (0 ==
+ strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
+ t->ping_policy.min_sent_ping_interval_without_data =
+ grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ grpc_integer_options{
+ g_default_min_sent_ping_interval_without_data_ms, 0,
+ INT_MAX});
+ } else if (0 ==
+ strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
+ t->ping_policy.min_recv_ping_interval_without_data =
+ grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ grpc_integer_options{
+ g_default_min_recv_ping_interval_without_data_ms, 0,
+ INT_MAX});
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
+ t->write_buffer_size = static_cast<uint32_t>(grpc_channel_arg_get_integer(
+ &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
+ enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
+ } else if (0 ==
+ strcmp(channel_args->args[i].key, GRPC_ARG_KEEPALIVE_TIME_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ grpc_integer_options{t->is_client
+ ? g_default_client_keepalive_time_ms
+ : g_default_server_keepalive_time_ms,
+ 1, INT_MAX});
+ t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i],
+ grpc_integer_options{t->is_client
+ ? g_default_client_keepalive_timeout_ms
+ : g_default_server_keepalive_timeout_ms,
+ 0, INT_MAX});
+ t->keepalive_timeout = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
+ t->keepalive_permit_without_calls = static_cast<uint32_t>(
+ grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_OPTIMIZATION_TARGET)) {
+ if (channel_args->args[i].type != GRPC_ARG_STRING) {
+ gpr_log(GPR_ERROR, "%s should be a string",
+ GRPC_ARG_OPTIMIZATION_TARGET);
+ } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
+ } else if (0 ==
+ strcmp(channel_args->args[i].value.string, "throughput")) {
+ t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
+ } else {
+ gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
+ GRPC_ARG_OPTIMIZATION_TARGET,
+ channel_args->args[i].value.string);
+ }
+ } else {
+ static const struct {
+ const char* channel_arg_name;
+ grpc_chttp2_setting_id setting_id;
+ grpc_integer_options integer_options;
+ bool availability[2] /* server, client */;
+ } settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
+ GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
+ {-1, 0, INT32_MAX},
+ {true, false}},
+ {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
+ GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_MAX_METADATA_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ {-1, 0, INT32_MAX},
+ {true, true}},
+ {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
+ GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
+ {-1, 16384, 16777215},
+ {true, true}},
+ {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
+ GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
+ {1, 0, 1},
+ {true, true}},
+ {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
+ GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
+ {-1, 5, INT32_MAX},
+ {true, true}}};
+ for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
+ if (0 == strcmp(channel_args->args[i].key,
+ settings_map[j].channel_arg_name)) {
+ if (!settings_map[j].availability[is_client]) {
+ gpr_log(GPR_DEBUG, "%s is not available on %s",
+ settings_map[j].channel_arg_name,
+ is_client ? "clients" : "servers");
+ } else {
+ int value = grpc_channel_arg_get_integer(
+ &channel_args->args[i], settings_map[j].integer_options);
+ if (value >= 0) {
+ queue_setting_update(t, settings_map[j].setting_id,
+ static_cast<uint32_t>(value));
+ }
+ }
+ break;
+ }
+ }
+ }
+ }
+ return enable_bdp;
+}
+static void init_transport_closures(grpc_chttp2_transport* t) {
GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t,
grpc_combiner_scheduler(t->combiner));
GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t,
@@ -286,6 +416,79 @@ static void init_transport(grpc_chttp2_transport* t,
GRPC_CLOSURE_INIT(&t->keepalive_watchdog_fired_locked,
keepalive_watchdog_fired_locked, t,
grpc_combiner_scheduler(t->combiner));
+}
+
+static void init_transport_keepalive_settings(grpc_chttp2_transport* t) {
+ if (t->is_client) {
+ t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
+ ? GRPC_MILLIS_INF_FUTURE
+ : g_default_client_keepalive_time_ms;
+ t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
+ ? GRPC_MILLIS_INF_FUTURE
+ : g_default_client_keepalive_timeout_ms;
+ t->keepalive_permit_without_calls =
+ g_default_client_keepalive_permit_without_calls;
+ } else {
+ t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
+ ? GRPC_MILLIS_INF_FUTURE
+ : g_default_server_keepalive_time_ms;
+ t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
+ ? GRPC_MILLIS_INF_FUTURE
+ : g_default_server_keepalive_timeout_ms;
+ t->keepalive_permit_without_calls =
+ g_default_server_keepalive_permit_without_calls;
+ }
+}
+
+static void configure_transport_ping_policy(grpc_chttp2_transport* t) {
+ t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
+ t->ping_policy.min_sent_ping_interval_without_data =
+ g_default_min_sent_ping_interval_without_data_ms;
+ t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
+ t->ping_policy.min_recv_ping_interval_without_data =
+ g_default_min_recv_ping_interval_without_data_ms;
+}
+
+static void init_keepalive_pings_if_enabled(grpc_chttp2_transport* t) {
+ if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
+ GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
+ grpc_timer_init(&t->keepalive_ping_timer,
+ grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
+ &t->init_keepalive_ping_locked);
+ } else {
+ /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
+ inflight keeaplive timers */
+ t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
+ }
+}
+
+static void init_transport(grpc_chttp2_transport* t,
+ const grpc_channel_args* channel_args,
+ grpc_endpoint* ep, bool is_client) {
+ GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
+ GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
+
+ t->base.vtable = get_vtable();
+ t->ep = ep;
+ /* one ref is for destroy */
+ gpr_ref_init(&t->refs, 1);
+ t->combiner = grpc_combiner_create();
+ t->peer_string = grpc_endpoint_get_peer(ep);
+ t->endpoint_reading = 1;
+ t->next_stream_id = is_client ? 1 : 2;
+ t->is_client = is_client;
+ t->deframe_state = is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
+ t->is_first_frame = true;
+ grpc_connectivity_state_init(
+ &t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
+ is_client ? "client_transport" : "server_transport");
+
+ grpc_slice_buffer_init(&t->qbuf);
+ grpc_slice_buffer_init(&t->outbuf);
+ grpc_chttp2_hpack_compressor_init(&t->hpack_compressor);
+
+ init_transport_closures(t);
t->goaway_error = GRPC_ERROR_NONE;
grpc_chttp2_goaway_parser_init(&t->goaway_parser);
@@ -301,6 +504,8 @@ static void init_transport(grpc_chttp2_transport* t,
grpc_chttp2_stream_map_init(&t->stream_map, 8);
/* copy in initial settings to all setting sets */
+ size_t i;
+ int j;
for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
t->settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
@@ -328,191 +533,14 @@ static void init_transport(grpc_chttp2_transport* t,
queue_setting_update(t, GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
1);
- t->ping_policy.max_pings_without_data = g_default_max_pings_without_data;
- t->ping_policy.min_sent_ping_interval_without_data =
- g_default_min_sent_ping_interval_without_data_ms;
- t->ping_policy.max_ping_strikes = g_default_max_ping_strikes;
- t->ping_policy.min_recv_ping_interval_without_data =
- g_default_min_recv_ping_interval_without_data_ms;
-
- /* Keepalive setting */
- if (t->is_client) {
- t->keepalive_time = g_default_client_keepalive_time_ms == INT_MAX
- ? GRPC_MILLIS_INF_FUTURE
- : g_default_client_keepalive_time_ms;
- t->keepalive_timeout = g_default_client_keepalive_timeout_ms == INT_MAX
- ? GRPC_MILLIS_INF_FUTURE
- : g_default_client_keepalive_timeout_ms;
- t->keepalive_permit_without_calls =
- g_default_client_keepalive_permit_without_calls;
- } else {
- t->keepalive_time = g_default_server_keepalive_time_ms == INT_MAX
- ? GRPC_MILLIS_INF_FUTURE
- : g_default_server_keepalive_time_ms;
- t->keepalive_timeout = g_default_server_keepalive_timeout_ms == INT_MAX
- ? GRPC_MILLIS_INF_FUTURE
- : g_default_server_keepalive_timeout_ms;
- t->keepalive_permit_without_calls =
- g_default_server_keepalive_permit_without_calls;
- }
+ configure_transport_ping_policy(t);
+ init_transport_keepalive_settings(t);
t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
bool enable_bdp = true;
-
if (channel_args) {
- for (i = 0; i < channel_args->num_args; i++) {
- if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
- const grpc_integer_options options = {-1, 0, INT_MAX};
- const int value =
- grpc_channel_arg_get_integer(&channel_args->args[i], options);
- if (value >= 0) {
- if ((t->next_stream_id & 1) != (value & 1)) {
- gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
- GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER,
- t->next_stream_id & 1, is_client ? "client" : "server");
- } else {
- t->next_stream_id = static_cast<uint32_t>(value);
- }
- }
- } else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
- const grpc_integer_options options = {-1, 0, INT_MAX};
- const int value =
- grpc_channel_arg_get_integer(&channel_args->args[i], options);
- if (value >= 0) {
- grpc_chttp2_hpack_compressor_set_max_usable_size(
- &t->hpack_compressor, static_cast<uint32_t>(value));
- }
- } else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA)) {
- t->ping_policy.max_pings_without_data = grpc_channel_arg_get_integer(
- &channel_args->args[i],
- {g_default_max_pings_without_data, 0, INT_MAX});
- } else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_MAX_PING_STRIKES)) {
- t->ping_policy.max_ping_strikes = grpc_channel_arg_get_integer(
- &channel_args->args[i], {g_default_max_ping_strikes, 0, INT_MAX});
- } else if (0 ==
- strcmp(
- channel_args->args[i].key,
- GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS)) {
- t->ping_policy.min_sent_ping_interval_without_data =
- grpc_channel_arg_get_integer(
- &channel_args->args[i],
- grpc_integer_options{
- g_default_min_sent_ping_interval_without_data_ms, 0,
- INT_MAX});
- } else if (0 ==
- strcmp(
- channel_args->args[i].key,
- GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS)) {
- t->ping_policy.min_recv_ping_interval_without_data =
- grpc_channel_arg_get_integer(
- &channel_args->args[i],
- grpc_integer_options{
- g_default_min_recv_ping_interval_without_data_ms, 0,
- INT_MAX});
- } else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)) {
- t->write_buffer_size =
- static_cast<uint32_t>(grpc_channel_arg_get_integer(
- &channel_args->args[i], {0, 0, MAX_WRITE_BUFFER_SIZE}));
- } else if (0 ==
- strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) {
- enable_bdp = grpc_channel_arg_get_bool(&channel_args->args[i], true);
- } else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_KEEPALIVE_TIME_MS)) {
- const int value = grpc_channel_arg_get_integer(
- &channel_args->args[i],
- grpc_integer_options{t->is_client
- ? g_default_client_keepalive_time_ms
- : g_default_server_keepalive_time_ms,
- 1, INT_MAX});
- t->keepalive_time = value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
- } else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_KEEPALIVE_TIMEOUT_MS)) {
- const int value = grpc_channel_arg_get_integer(
- &channel_args->args[i],
- grpc_integer_options{t->is_client
- ? g_default_client_keepalive_timeout_ms
- : g_default_server_keepalive_timeout_ms,
- 0, INT_MAX});
- t->keepalive_timeout =
- value == INT_MAX ? GRPC_MILLIS_INF_FUTURE : value;
- } else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)) {
- t->keepalive_permit_without_calls = static_cast<uint32_t>(
- grpc_channel_arg_get_integer(&channel_args->args[i], {0, 0, 1}));
- } else if (0 == strcmp(channel_args->args[i].key,
- GRPC_ARG_OPTIMIZATION_TARGET)) {
- if (channel_args->args[i].type != GRPC_ARG_STRING) {
- gpr_log(GPR_ERROR, "%s should be a string",
- GRPC_ARG_OPTIMIZATION_TARGET);
- } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) {
- t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
- } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) {
- t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY;
- } else if (0 ==
- strcmp(channel_args->args[i].value.string, "throughput")) {
- t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT;
- } else {
- gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'",
- GRPC_ARG_OPTIMIZATION_TARGET,
- channel_args->args[i].value.string);
- }
- } else {
- static const struct {
- const char* channel_arg_name;
- grpc_chttp2_setting_id setting_id;
- grpc_integer_options integer_options;
- bool availability[2] /* server, client */;
- } settings_map[] = {
- {GRPC_ARG_MAX_CONCURRENT_STREAMS,
- GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
- {-1, 0, INT32_MAX},
- {true, false}},
- {GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
- GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
- {-1, 0, INT32_MAX},
- {true, true}},
- {GRPC_ARG_MAX_METADATA_SIZE,
- GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
- {-1, 0, INT32_MAX},
- {true, true}},
- {GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
- GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
- {-1, 16384, 16777215},
- {true, true}},
- {GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
- GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
- {1, 0, 1},
- {true, true}},
- {GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
- GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
- {-1, 5, INT32_MAX},
- {true, true}}};
- for (j = 0; j < static_cast<int> GPR_ARRAY_SIZE(settings_map); j++) {
- if (0 == strcmp(channel_args->args[i].key,
- settings_map[j].channel_arg_name)) {
- if (!settings_map[j].availability[is_client]) {
- gpr_log(GPR_DEBUG, "%s is not available on %s",
- settings_map[j].channel_arg_name,
- is_client ? "clients" : "servers");
- } else {
- int value = grpc_channel_arg_get_integer(
- &channel_args->args[i], settings_map[j].integer_options);
- if (value >= 0) {
- queue_setting_update(t, settings_map[j].setting_id,
- static_cast<uint32_t>(value));
- }
- }
- break;
- }
- }
- }
- }
+ enable_bdp = read_channel_args(t, channel_args, is_client);
}
if (g_flow_control_enabled) {
@@ -531,23 +559,11 @@ static void init_transport(grpc_chttp2_transport* t,
t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST;
t->ping_recv_state.ping_strikes = 0;
- /* Start keepalive pings */
- if (t->keepalive_time != GRPC_MILLIS_INF_FUTURE) {
- t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
- GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping");
- grpc_timer_init(&t->keepalive_ping_timer,
- grpc_core::ExecCtx::Get()->Now() + t->keepalive_time,
- &t->init_keepalive_ping_locked);
- } else {
- /* Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
- inflight keeaplive timers */
- t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
- }
+ init_keepalive_pings_if_enabled(t);
if (enable_bdp) {
GRPC_CHTTP2_REF_TRANSPORT(t, "bdp_ping");
schedule_bdp_ping_locked(t);
-
grpc_chttp2_act_on_flowctl_action(t->flow_control->PeriodicUpdate(), t,
nullptr);
}
@@ -2887,17 +2903,20 @@ bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
}
}
+void Chttp2IncomingByteStream::MaybeCreateStreamDecompressionCtx() {
+ if (!stream_->stream_decompression_ctx) {
+ stream_->stream_decompression_ctx = grpc_stream_compression_context_create(
+ stream_->stream_decompression_method);
+ }
+}
+
grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
grpc_error* error;
if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
if (!stream_->unprocessed_incoming_frames_decompressed) {
bool end_of_context;
- if (!stream_->stream_decompression_ctx) {
- stream_->stream_decompression_ctx =
- grpc_stream_compression_context_create(
- stream_->stream_decompression_method);
- }
+ MaybeCreateStreamDecompressionCtx();
if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
&stream_->unprocessed_incoming_frames_buffer,
&stream_->decompressed_data_buffer, nullptr,
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index ca6e715978..6b5309bab4 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -246,6 +246,8 @@ class Chttp2IncomingByteStream : public ByteStream {
static void NextLocked(void* arg, grpc_error* error_ignored);
static void OrphanLocked(void* arg, grpc_error* error_ignored);
+ void MaybeCreateStreamDecompressionCtx();
+
grpc_chttp2_transport* transport_; // Immutable.
grpc_chttp2_stream* stream_; // Immutable.
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 4a252d972d..81e2634e3a 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -1287,7 +1287,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_error* error = GRPC_ERROR_NONE;
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
error = GRPC_ERROR_REF(stream_state->cancel_error);
- } else if (stream_state->state_op_done[OP_FAILED]) {
+ } else if (stream_state->state_callback_received[OP_FAILED]) {
error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
} else if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index 603ec0b473..9be256147b 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -79,7 +79,7 @@ class BaseNode : public RefCounted<BaseNode> {
const intptr_t uuid_;
};
-// This class is a helper class for channelz entities that deal with Channels
+// This class is a helper class for channelz entities that deal with Channels,
// Subchannels, and Servers, since those have similar proto definitions.
// This class has the ability to:
// - track calls_{started,succeeded,failed}
@@ -133,6 +133,9 @@ class ChannelNode : public BaseNode {
// so it leaves these implementations blank.
//
// This is utilizing the template method design pattern.
+ //
+ // TODO(ncteisen): remove these template methods in favor of manual traversal
+ // and mutation of the grpc_json object.
virtual void PopulateConnectivityState(grpc_json* json) {}
virtual void PopulateChildRefs(grpc_json* json) {}
@@ -158,7 +161,7 @@ class ChannelNode : public BaseNode {
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
- // to allow the channel trace test to access trace();
+ // to allow the channel trace test to access trace_.
friend class testing::ChannelNodePeer;
grpc_channel* channel_ = nullptr;
UniquePtr<char> target_;
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index 90a0254663..4a4f0e49d0 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -104,18 +104,18 @@ static void con_start_transport_stream_op_batch(
if (batch->recv_initial_metadata) {
callback_state* state = &calld->recv_initial_metadata_ready;
intercept_callback(
- calld, state, false, "connected_recv_initial_metadata_ready",
+ calld, state, false, "recv_initial_metadata_ready",
&batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
}
if (batch->recv_message) {
callback_state* state = &calld->recv_message_ready;
- intercept_callback(calld, state, false, "connected_recv_message_ready",
+ intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
if (batch->recv_trailing_metadata) {
callback_state* state = &calld->recv_trailing_metadata_ready;
intercept_callback(
- calld, state, false, "connected_recv_trailing_metadata_ready",
+ calld, state, false, "recv_trailing_metadata_ready",
&batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
}
if (batch->cancel_stream) {
diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc
index 90ed34da11..13bc69ffb6 100644
--- a/src/core/lib/iomgr/error.cc
+++ b/src/core/lib/iomgr/error.cc
@@ -513,9 +513,24 @@ bool grpc_error_get_str(grpc_error* err, grpc_error_strs which,
grpc_error* grpc_error_add_child(grpc_error* src, grpc_error* child) {
GPR_TIMER_SCOPE("grpc_error_add_child", 0);
- grpc_error* new_err = copy_error_and_unref(src);
- internal_add_error(&new_err, child);
- return new_err;
+ if (src != GRPC_ERROR_NONE) {
+ if (child == GRPC_ERROR_NONE) {
+ /* \a child is empty. Simply return the ref to \a src */
+ return src;
+ } else if (child != src) {
+ grpc_error* new_err = copy_error_and_unref(src);
+ internal_add_error(&new_err, child);
+ return new_err;
+ } else {
+ /* \a src and \a child are the same. Drop one of the references and return
+ * the other */
+ GRPC_ERROR_UNREF(child);
+ return src;
+ }
+ } else {
+ /* \a src is empty. Simply return the ref to \a child */
+ return child;
+ }
}
static const char* no_error_string = "\"No Error\"";
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 27c4d22fd1..49f4029bc2 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -185,8 +185,16 @@ bool grpc_error_get_str(grpc_error* error, grpc_error_strs which,
/// error occurring. Allows root causing high level errors from lower level
/// errors that contributed to them. The src error takes ownership of the
/// child error.
+///
+/// Edge Conditions -
+/// 1) If either of \a src or \a child is GRPC_ERROR_NONE, returns a reference
+/// to the other argument. 2) If both \a src and \a child are GRPC_ERROR_NONE,
+/// returns GRPC_ERROR_NONE. 3) If \a src and \a child point to the same error,
+/// returns a single reference. (Note that, 2 references should have been
+/// received to the error in this case.)
grpc_error* grpc_error_add_child(grpc_error* src,
grpc_error* child) GRPC_MUST_USE_RESULT;
+
grpc_error* grpc_os_error(const char* file, int line, int err,
const char* call_name) GRPC_MUST_USE_RESULT;
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index 4294162af7..008d37119a 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -291,7 +291,7 @@ static void timer_list_init() {
static void timer_list_shutdown() {
size_t i;
run_some_expired_timers(
- GPR_ATM_MAX, nullptr,
+ GRPC_MILLIS_INF_FUTURE, nullptr,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timer list shutdown"));
for (i = 0; i < g_num_shards; i++) {
timer_shard* shard = &g_shards[i];
@@ -714,9 +714,10 @@ static grpc_timer_check_result timer_check(grpc_millis* next) {
#if GPR_ARCH_64
gpr_log(GPR_INFO,
"TIMER CHECK BEGIN: now=%" PRId64 " next=%s tls_min=%" PRId64
- " glob_min=%" PRIdPTR,
+ " glob_min=%" PRId64,
now, next_str, min_timer,
- gpr_atm_no_barrier_load((gpr_atm*)(&g_shared_mutables.min_timer)));
+ static_cast<grpc_millis>(gpr_atm_no_barrier_load(
+ (gpr_atm*)(&g_shared_mutables.min_timer))));
#else
gpr_log(GPR_INFO, "TIMER CHECK BEGIN: now=%" PRId64 " next=%s min=%" PRId64,
now, next_str, min_timer);
diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc
index 04b4c87c71..6246613e7b 100644
--- a/src/core/lib/security/security_connector/security_connector.cc
+++ b/src/core/lib/security/security_connector/security_connector.cc
@@ -59,8 +59,8 @@ static const char* installed_roots_path =
/** Environment variable used as a flag to enable/disable loading system root
certificates from the OS trust store. */
-#ifndef GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR
-#define GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_USE_SYSTEM_SSL_ROOTS"
+#ifndef GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR
+#define GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR "GRPC_NOT_USE_SYSTEM_SSL_ROOTS"
#endif
#ifndef TSI_OPENSSL_ALPN_SUPPORT
@@ -1192,10 +1192,10 @@ const char* DefaultSslRootStore::GetPemRootCerts() {
grpc_slice DefaultSslRootStore::ComputePemRootCerts() {
grpc_slice result = grpc_empty_slice();
- char* use_system_roots_env_value =
- gpr_getenv(GRPC_USE_SYSTEM_SSL_ROOTS_ENV_VAR);
- const bool use_system_roots = gpr_is_true(use_system_roots_env_value);
- gpr_free(use_system_roots_env_value);
+ char* not_use_system_roots_env_value =
+ gpr_getenv(GRPC_NOT_USE_SYSTEM_SSL_ROOTS_ENV_VAR);
+ const bool not_use_system_roots = gpr_is_true(not_use_system_roots_env_value);
+ gpr_free(not_use_system_roots_env_value);
// First try to load the roots from the environment.
char* default_root_certs_path =
gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR);
@@ -1218,7 +1218,7 @@ grpc_slice DefaultSslRootStore::ComputePemRootCerts() {
gpr_free(pem_root_certs);
}
// Try loading roots from OS trust store if flag is enabled.
- if (GRPC_SLICE_IS_EMPTY(result) && use_system_roots) {
+ if (GRPC_SLICE_IS_EMPTY(result) && !not_use_system_roots) {
result = LoadSystemRootCerts();
}
// Fallback to roots manually shipped with gRPC.
diff --git a/src/core/lib/security/transport/server_auth_filter.cc b/src/core/lib/security/transport/server_auth_filter.cc
index 19cbb03b63..552e70130a 100644
--- a/src/core/lib/security/transport/server_auth_filter.cc
+++ b/src/core/lib/security/transport/server_auth_filter.cc
@@ -41,6 +41,9 @@ struct call_data {
grpc_transport_stream_op_batch* recv_initial_metadata_batch;
grpc_closure* original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
+ grpc_error* error;
+ grpc_closure recv_trailing_metadata_ready;
+ grpc_closure* original_recv_trailing_metadata_ready;
grpc_metadata_array md;
const grpc_metadata* consumed_md;
size_t num_consumed_md;
@@ -111,6 +114,7 @@ static void on_md_processing_done_inner(grpc_call_element* elem,
batch->payload->recv_initial_metadata.recv_initial_metadata,
remove_consumed_md, elem, "Response metadata filtering error");
}
+ calld->error = GRPC_ERROR_REF(error);
GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error);
}
@@ -184,6 +188,13 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
GRPC_ERROR_REF(error));
}
+static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
+}
+
static void auth_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
@@ -195,6 +206,12 @@ static void auth_start_transport_stream_op_batch(
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
+ if (batch->recv_trailing_metadata) {
+ calld->original_recv_trailing_metadata_ready =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready;
+ }
grpc_call_next_op(elem, batch);
}
@@ -208,6 +225,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
// Create server security context. Set its auth context from channel
// data and save it in the call context.
grpc_server_security_context* server_ctx =
@@ -227,7 +247,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* final_info,
- grpc_closure* ignored) {}
+ grpc_closure* ignored) {
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ GRPC_ERROR_UNREF(calld->error);
+}
/* Constructor for channel_data */
static grpc_error* init_channel_elem(grpc_channel_element* elem,
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index bff3ec379f..3e008e5606 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -72,46 +72,6 @@
// Used to create arena for the first call.
#define ESTIMATED_MDELEM_COUNT 16
-/* Status data for a request can come from several sources; this
- enumerates them all, and acts as a priority sorting for which
- status to return to the application - earlier entries override
- later ones */
-typedef enum {
- /* Status came from the application layer overriding whatever
- the wire says */
- STATUS_FROM_API_OVERRIDE = 0,
- /* Status came from 'the wire' - or somewhere below the surface
- layer */
- STATUS_FROM_WIRE,
- /* Status was created by some internal channel stack operation: must come via
- add_batch_error */
- STATUS_FROM_CORE,
- /* Status was created by some surface error */
- STATUS_FROM_SURFACE,
- /* Status came from the server sending status */
- STATUS_FROM_SERVER_STATUS,
- STATUS_SOURCE_COUNT
-} status_source;
-
-typedef struct {
- bool is_set;
- grpc_error* error;
-} received_status;
-
-static gpr_atm pack_received_status(received_status r) {
- return r.is_set ? (1 | (gpr_atm)r.error) : 0;
-}
-
-static received_status unpack_received_status(gpr_atm atm) {
- if ((atm & 1) == 0) {
- return {false, GRPC_ERROR_NONE};
- } else {
- return {true, (grpc_error*)(atm & ~static_cast<gpr_atm>(1))};
- }
-}
-
-#define MAX_ERRORS_PER_BATCH 4
-
typedef struct batch_control {
grpc_call* call;
/* Share memory for cq_completion and notify_tag as they are never needed
@@ -136,10 +96,7 @@ typedef struct batch_control {
grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
-
- grpc_error* errors[MAX_ERRORS_PER_BATCH];
- gpr_atm num_errors;
-
+ grpc_error* batch_error;
grpc_transport_stream_op_batch op;
} batch_control;
@@ -202,9 +159,6 @@ struct grpc_call {
// A char* indicating the peer name.
gpr_atm peer_string;
- /* Packed received call statuses from various sources */
- gpr_atm status[STATUS_SOURCE_COUNT];
-
/* Call data useful used for reporting. Only valid after the call has
* completed */
grpc_call_final_info final_info;
@@ -237,6 +191,7 @@ struct grpc_call {
grpc_closure receiving_initial_metadata_ready;
grpc_closure receiving_trailing_metadata_ready;
uint32_t test_only_last_message_flags;
+ gpr_atm cancelled;
grpc_closure release_call;
@@ -252,6 +207,7 @@ struct grpc_call {
grpc_server* server;
} server;
} final_op;
+ grpc_error* status_error;
/* recv_state can contain one of the following values:
RECV_NONE : : no initial metadata and messages received
@@ -289,23 +245,15 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression");
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
grpc_closure* start_batch_closure);
-static void cancel_with_status(grpc_call* c, status_source source,
- grpc_status_code status,
+
+static void cancel_with_status(grpc_call* c, grpc_status_code status,
const char* description);
-static void cancel_with_error(grpc_call* c, status_source source,
- grpc_error* error);
+static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
-static void get_final_status(
- grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string);
-static void set_status_value_directly(grpc_status_code status, void* dest);
-static void set_status_from_error(grpc_call* call, status_source source,
- grpc_error* error);
+static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
-static void add_batch_error(batch_control* bctl, grpc_error* error,
- bool has_cancelled);
static void add_init_error(grpc_error** composite, grpc_error* new_err) {
if (new_err == GRPC_ERROR_NONE) return;
@@ -356,6 +304,7 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
channel_stack->call_stack_size));
gpr_ref_init(&call->ext_ref, 1);
+ gpr_atm_no_barrier_store(&call->cancelled, 0);
call->arena = arena;
grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
@@ -465,10 +414,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
gpr_mu_unlock(&pc->child_list_mu);
}
if (error != GRPC_ERROR_NONE) {
- cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+ cancel_with_error(call, GRPC_ERROR_REF(error));
}
if (immediately_cancel) {
- cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+ cancel_with_error(call, GRPC_ERROR_CANCELLED);
}
if (args->cq != nullptr) {
GPR_ASSERT(args->pollset_set_alternative == nullptr &&
@@ -570,16 +519,13 @@ static void destroy_call(void* call, grpc_error* error) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
- get_final_status(c, set_status_value_directly, &c->final_info.final_status,
- nullptr, &(c->final_info.error_string));
+ grpc_error_get_status(c->status_error, c->send_deadline,
+ &c->final_info.final_status, nullptr, nullptr,
+ &(c->final_info.error_string));
+ GRPC_ERROR_UNREF(c->status_error);
c->final_info.stats.latency =
gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- GRPC_ERROR_UNREF(
- unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
- }
-
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
grpc_schedule_on_exec_ctx));
@@ -617,7 +563,7 @@ void grpc_call_unref(grpc_call* c) {
bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
gpr_atm_acq_load(&c->received_final_op_atm) == 0;
if (cancel) {
- cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+ cancel_with_error(c, GRPC_ERROR_CANCELLED);
} else {
// Unset the call combiner cancellation closure. This has the
// effect of scheduling the previously set cancellation closure, if
@@ -635,8 +581,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
grpc_core::ExecCtx exec_ctx;
- cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
-
+ cancel_with_error(call, GRPC_ERROR_CANCELLED);
return GRPC_CALL_OK;
}
@@ -690,8 +635,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
"c=%p, status=%d, description=%s, reserved=%p)",
4, (c, (int)status, description, reserved));
GPR_ASSERT(reserved == nullptr);
- cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description);
-
+ cancel_with_status(c, status, description);
return GRPC_CALL_OK;
}
@@ -711,15 +655,17 @@ static void done_termination(void* arg, grpc_error* error) {
gpr_free(state);
}
-static void cancel_with_error(grpc_call* c, status_source source,
- grpc_error* error) {
+static void cancel_with_error(grpc_call* c, grpc_error* error) {
+ if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) {
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
GRPC_CALL_INTERNAL_REF(c, "termination");
// Inform the call combiner of the cancellation, so that it can cancel
// any in-flight asynchronous actions that may be holding the call
// combiner. This ensures that the cancel_stream batch can be sent
// down the filter stack in a timely manner.
grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
- set_status_from_error(c, source, GRPC_ERROR_REF(error));
cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
state->call = c;
GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
@@ -742,90 +688,45 @@ static grpc_error* error_from_status(grpc_status_code status,
GRPC_ERROR_INT_GRPC_STATUS, status);
}
-static void cancel_with_status(grpc_call* c, status_source source,
- grpc_status_code status,
+static void cancel_with_status(grpc_call* c, grpc_status_code status,
const char* description) {
- cancel_with_error(c, source, error_from_status(status, description));
-}
-
-/*******************************************************************************
- * FINAL STATUS CODE MANIPULATION
- */
-
-static bool get_final_status_from(
- grpc_call* call, grpc_error* error, bool allow_ok_status,
- void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string) {
- grpc_status_code code;
- grpc_slice slice = grpc_empty_slice();
- grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr,
- error_string);
- if (code == GRPC_STATUS_OK && !allow_ok_status) {
- return false;
- }
-
- set_value(code, set_value_user_data);
- if (details != nullptr) {
- *details = grpc_slice_ref_internal(slice);
- }
- return true;
+ cancel_with_error(c, error_from_status(status, description));
}
-static void get_final_status(
- grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
- void* set_value_user_data, grpc_slice* details, const char** error_string) {
- int i;
- received_status status[STATUS_SOURCE_COUNT];
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
- }
+static void set_final_status(grpc_call* call, grpc_error* error) {
if (grpc_call_error_trace.enabled()) {
- gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR");
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (status[i].is_set) {
- gpr_log(GPR_INFO, " %d: %s", i, grpc_error_string(status[i].error));
- }
- }
+ gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
+ gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
}
- /* first search through ignoring "OK" statuses: if something went wrong,
- * ensure we report it */
- for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
- /* search for the best status we can present: ideally the error we use has a
- clearly defined grpc-status, and we'll prefer that. */
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (status[i].is_set &&
- grpc_error_has_clear_grpc_status(status[i].error)) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details,
- error_string)) {
- return;
- }
+ if (call->is_client) {
+ grpc_error_get_status(error, call->send_deadline,
+ call->final_op.client.status,
+ call->final_op.client.status_details, nullptr,
+ call->final_op.client.error_string);
+ // explicitly take a ref
+ grpc_slice_ref_internal(*call->final_op.client.status_details);
+ call->status_error = error;
+ grpc_core::channelz::ChannelNode* channelz_channel =
+ grpc_channel_get_channelz_node(call->channel);
+ if (channelz_channel != nullptr) {
+ if (*call->final_op.client.status != GRPC_STATUS_OK) {
+ channelz_channel->RecordCallFailed();
+ } else {
+ channelz_channel->RecordCallSucceeded();
}
}
- /* If no clearly defined status exists, search for 'anything' */
- for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
- if (status[i].is_set) {
- if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
- set_value, set_value_user_data, details,
- error_string)) {
- return;
- }
+ } else {
+ *call->final_op.server.cancelled =
+ error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE;
+ grpc_core::channelz::ServerNode* channelz_server =
+ grpc_server_get_channelz_node(call->final_op.server.server);
+ if (channelz_server != nullptr) {
+ if (*call->final_op.server.cancelled) {
+ channelz_server->RecordCallFailed();
+ } else {
+ channelz_server->RecordCallSucceeded();
}
}
- }
- /* If nothing exists, set some default */
- if (call->is_client) {
- set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
- } else {
- set_value(GRPC_STATUS_OK, set_value_user_data);
- }
-}
-
-static void set_status_from_error(grpc_call* call, status_source source,
- grpc_error* error) {
- if (!gpr_atm_rel_cas(&call->status[source],
- pack_received_status({false, GRPC_ERROR_NONE}),
- pack_received_status({true, error}))) {
GRPC_ERROR_UNREF(error);
}
}
@@ -1044,6 +945,7 @@ static grpc_stream_compression_algorithm decode_stream_compression(
static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
int is_trailing) {
if (b->list.count == 0) return;
+ if (!call->is_client && is_trailing) return;
if (is_trailing && call->buffered_metadata[1] == nullptr) return;
GPR_TIMER_SCOPE("publish_app_metadata", 0);
grpc_metadata_array* dest;
@@ -1097,9 +999,12 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
publish_app_metadata(call, b, false);
}
-static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
+static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
+ grpc_error* batch_error) {
grpc_call* call = static_cast<grpc_call*>(args);
- if (b->idx.named.grpc_status != nullptr) {
+ if (batch_error != GRPC_ERROR_NONE) {
+ set_final_status(call, batch_error);
+ } else if (b->idx.named.grpc_status != nullptr) {
grpc_status_code status_code =
grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
grpc_error* error = GRPC_ERROR_NONE;
@@ -1117,8 +1022,18 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_empty_slice());
}
- set_status_from_error(call, STATUS_FROM_WIRE, error);
+ set_final_status(call, GRPC_ERROR_REF(error));
grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
+ GRPC_ERROR_UNREF(error);
+ } else if (!call->is_client) {
+ set_final_status(call, GRPC_ERROR_NONE);
+ } else {
+ gpr_log(GPR_DEBUG,
+ "Received trailing metadata with no error and no status");
+ set_final_status(
+ call, grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
}
publish_app_metadata(call, b, true);
}
@@ -1133,14 +1048,6 @@ grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
* BATCH API IMPLEMENTATION
*/
-static void set_status_value_directly(grpc_status_code status, void* dest) {
- *static_cast<grpc_status_code*>(dest) = status;
-}
-
-static void set_cancelled_value(grpc_status_code status, void* dest) {
- *static_cast<int*>(dest) = (status != GRPC_STATUS_OK);
-}
-
static bool are_write_flags_valid(uint32_t flags) {
/* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
const uint32_t allowed_write_positions =
@@ -1208,31 +1115,15 @@ static void finish_batch_completion(void* user_data,
GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
-static grpc_error* consolidate_batch_errors(batch_control* bctl) {
- size_t n = static_cast<size_t>(gpr_atm_acq_load(&bctl->num_errors));
- if (n == 0) {
- return GRPC_ERROR_NONE;
- } else if (n == 1) {
- /* Skip creating a composite error in the case that only one error was
- logged */
- grpc_error* e = bctl->errors[0];
- bctl->errors[0] = nullptr;
- return e;
- } else {
- grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Call batch failed", bctl->errors, n);
- for (size_t i = 0; i < n; i++) {
- GRPC_ERROR_UNREF(bctl->errors[i]);
- bctl->errors[i] = nullptr;
- }
- return error;
- }
+static void reset_batch_errors(batch_control* bctl) {
+ GRPC_ERROR_UNREF(bctl->batch_error);
+ bctl->batch_error = GRPC_ERROR_NONE;
}
static void post_batch_completion(batch_control* bctl) {
grpc_call* next_child_call;
grpc_call* call = bctl->call;
- grpc_error* error = consolidate_batch_errors(bctl);
+ grpc_error* error = GRPC_ERROR_REF(bctl->batch_error);
if (bctl->op.send_initial_metadata) {
grpc_metadata_batch_destroy(
@@ -1258,8 +1149,7 @@ static void post_batch_completion(batch_control* bctl) {
next_child_call = child->child->sibling_next;
if (child->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
- cancel_with_error(child, STATUS_FROM_API_OVERRIDE,
- GRPC_ERROR_CANCELLED);
+ cancel_with_error(child, GRPC_ERROR_CANCELLED);
GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
}
child = next_child_call;
@@ -1267,33 +1157,6 @@ static void post_batch_completion(batch_control* bctl) {
}
gpr_mu_unlock(&pc->child_list_mu);
}
- if (call->is_client) {
- get_final_status(call, set_status_value_directly,
- call->final_op.client.status,
- call->final_op.client.status_details,
- call->final_op.client.error_string);
- grpc_core::channelz::ChannelNode* channelz_channel =
- grpc_channel_get_channelz_node(call->channel);
- if (channelz_channel != nullptr) {
- if (*call->final_op.client.status != GRPC_STATUS_OK) {
- channelz_channel->RecordCallFailed();
- } else {
- channelz_channel->RecordCallSucceeded();
- }
- }
- } else {
- get_final_status(call, set_cancelled_value,
- call->final_op.server.cancelled, nullptr, nullptr);
- grpc_core::channelz::ServerNode* channelz_server =
- grpc_server_get_channelz_node(call->final_op.server.server);
- if (channelz_server != nullptr) {
- if (*call->final_op.server.cancelled) {
- channelz_server->RecordCallFailed();
- } else {
- channelz_server->RecordCallSucceeded();
- }
- }
- }
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
@@ -1302,9 +1165,10 @@ static void post_batch_completion(batch_control* bctl) {
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
}
+ reset_batch_errors(bctl);
if (bctl->completion_data.notify_tag.is_closure) {
- /* unrefs bctl->error */
+ /* unrefs error */
bctl->call = nullptr;
/* This closure may be meant to be run within some combiner. Since we aren't
* running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
@@ -1314,7 +1178,7 @@ static void post_batch_completion(batch_control* bctl) {
error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
- /* unrefs bctl->error */
+ /* unrefs error */
grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
finish_batch_completion, bctl,
&bctl->completion_data.cq_completion);
@@ -1423,8 +1287,10 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
call->receiving_stream.reset();
- add_batch_error(bctl, GRPC_ERROR_REF(error), true);
- cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+ if (bctl->batch_error == GRPC_ERROR_NONE) {
+ bctl->batch_error = GRPC_ERROR_REF(error);
+ }
+ cancel_with_error(call, GRPC_ERROR_REF(error));
}
/* If recv_state is RECV_NONE, we will save the batch_control
* object with rel_cas, and will not use it after the cas. Its corresponding
@@ -1442,7 +1308,7 @@ static void receiving_stream_ready_in_call_combiner(void* bctlp,
grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_recv_message_ready");
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
receiving_stream_ready(bctlp, error);
}
@@ -1460,8 +1326,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
call->incoming_stream_compression_algorithm,
call->incoming_message_compression_algorithm);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
gpr_free(error_msg);
} else if (
grpc_compression_algorithm_from_message_stream_compression_algorithm(
@@ -1473,8 +1338,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
"compression (%d).",
call->incoming_stream_compression_algorithm,
call->incoming_message_compression_algorithm);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
gpr_free(error_msg);
} else {
char* error_msg = nullptr;
@@ -1484,8 +1348,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
compression_algorithm);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, compression_algorithm) == 0) {
/* check if algorithm is supported by current channel config */
@@ -1494,8 +1357,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
- cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
- error_msg);
+ cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
}
gpr_free(error_msg);
@@ -1513,24 +1375,12 @@ static void validate_filtered_metadata(batch_control* bctl) {
}
}
-static void add_batch_error(batch_control* bctl, grpc_error* error,
- bool has_cancelled) {
- if (error == GRPC_ERROR_NONE) return;
- int idx = static_cast<int>(gpr_atm_full_fetch_add(&bctl->num_errors, 1));
- if (idx == 0 && !has_cancelled) {
- cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error));
- }
- bctl->errors[idx] = error;
-}
-
static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner,
- "call_recv_initial_metadata_ready");
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1543,6 +1393,11 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
call->send_deadline = md->deadline;
}
+ } else {
+ if (bctl->batch_error == GRPC_ERROR_NONE) {
+ bctl->batch_error = GRPC_ERROR_REF(error);
+ }
+ cancel_with_error(call, GRPC_ERROR_REF(error));
}
grpc_closure* saved_rsr_closure = nullptr;
@@ -1579,20 +1434,23 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner,
- "call_recv_trailing_metadata_ready");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
- recv_trailing_filter(call, md);
+ recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
finish_batch_step(bctl);
}
static void finish_batch(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_on_complete");
- add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
+ if (bctl->batch_error == GRPC_ERROR_NONE) {
+ bctl->batch_error = GRPC_ERROR_REF(error);
+ }
+ if (error != GRPC_ERROR_NONE) {
+ cancel_with_error(call, GRPC_ERROR_REF(error));
+ }
finish_batch_step(bctl);
}
@@ -1794,28 +1652,32 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
call->send_extra_metadata_count = 1;
call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
call->channel, op->data.send_status_from_server.status);
- {
- grpc_error* override_error = GRPC_ERROR_NONE;
- if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
- override_error =
- error_from_status(op->data.send_status_from_server.status,
- "Returned non-ok status");
- }
- if (op->data.send_status_from_server.status_details != nullptr) {
- call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
- GRPC_MDSTR_GRPC_MESSAGE,
- grpc_slice_ref_internal(
- *op->data.send_status_from_server.status_details));
- call->send_extra_metadata_count++;
+ grpc_error* status_error =
+ op->data.send_status_from_server.status == GRPC_STATUS_OK
+ ? GRPC_ERROR_NONE
+ : grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Server returned error"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ static_cast<intptr_t>(
+ op->data.send_status_from_server.status));
+ if (op->data.send_status_from_server.status_details != nullptr) {
+ call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
+ GRPC_MDSTR_GRPC_MESSAGE,
+ grpc_slice_ref_internal(
+ *op->data.send_status_from_server.status_details));
+ call->send_extra_metadata_count++;
+ if (status_error != GRPC_ERROR_NONE) {
char* msg = grpc_slice_to_c_string(
GRPC_MDVALUE(call->send_extra_metadata[1].md));
- override_error =
- grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE,
+ status_error =
+ grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
}
- set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error);
}
+
+ call->status_error = status_error;
if (!prepare_application_metadata(
call,
static_cast<int>(
diff --git a/src/core/lib/surface/channel.cc b/src/core/lib/surface/channel.cc
index 52056b8671..054fe105c3 100644
--- a/src/core/lib/surface/channel.cc
+++ b/src/core/lib/surface/channel.cc
@@ -100,7 +100,6 @@ grpc_channel* grpc_channel_create_with_builder(
return channel;
}
- memset(channel, 0, sizeof(*channel));
channel->target = target;
channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
size_t channel_tracer_max_nodes = 0; // default to off
diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc
index 8e9d8ec98c..c0fae0f140 100644
--- a/src/core/lib/surface/server.cc
+++ b/src/core/lib/surface/server.cc
@@ -149,6 +149,9 @@ struct call_data {
grpc_closure server_on_recv_initial_metadata;
grpc_closure kill_zombie_closure;
grpc_closure* on_done_recv_initial_metadata;
+ grpc_closure recv_trailing_metadata_ready;
+ grpc_error* error;
+ grpc_closure* original_recv_trailing_metadata_ready;
grpc_closure publish;
@@ -733,6 +736,14 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
}
+static void server_recv_trailing_metadata_ready(void* user_data,
+ grpc_error* err) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
+}
+
static void server_mutate_op(grpc_call_element* elem,
grpc_transport_stream_op_batch* op) {
call_data* calld = static_cast<call_data*>(elem->call_data);
@@ -748,6 +759,12 @@ static void server_mutate_op(grpc_call_element* elem,
op->payload->recv_initial_metadata.recv_flags =
&calld->recv_initial_metadata_flags;
}
+ if (op->recv_trailing_metadata) {
+ calld->original_recv_trailing_metadata_ready =
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready;
+ }
}
static void server_start_transport_stream_op_batch(
@@ -832,7 +849,9 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
server_on_recv_initial_metadata, elem,
grpc_schedule_on_exec_ctx);
-
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+ server_recv_trailing_metadata_ready, elem,
+ grpc_schedule_on_exec_ctx);
server_ref(chand->server);
return GRPC_ERROR_NONE;
}
@@ -844,7 +863,7 @@ static void destroy_call_elem(grpc_call_element* elem,
call_data* calld = static_cast<call_data*>(elem->call_data);
GPR_ASSERT(calld->state != PENDING);
-
+ GRPC_ERROR_UNREF(calld->error);
if (calld->host_set) {
grpc_slice_unref_internal(calld->host);
}
diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
index b5268add0d..17e8026096 100644
--- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
+++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
@@ -24,6 +24,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/alts/handshaker/alts_handshaker_service_api.h"
const int kHandshakerClientOpNum = 4;
@@ -109,7 +110,7 @@ static grpc_byte_buffer* get_serialized_start_client(alts_tsi_event* event) {
if (ok) {
buffer = grpc_raw_byte_buffer_create(&slice, 1 /* number of slices */);
}
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
gpr_free(target_name);
grpc_gcp_handshaker_req_destroy(req);
return buffer;
@@ -157,7 +158,7 @@ static grpc_byte_buffer* get_serialized_start_server(
if (ok) {
buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */);
}
- grpc_slice_unref(req_slice);
+ grpc_slice_unref_internal(req_slice);
grpc_gcp_handshaker_req_destroy(req);
return buffer;
}
@@ -195,7 +196,7 @@ static grpc_byte_buffer* get_serialized_next(grpc_slice* bytes_received) {
if (ok) {
buffer = grpc_raw_byte_buffer_create(&req_slice, 1 /* number of slices */);
}
- grpc_slice_unref(req_slice);
+ grpc_slice_unref_internal(req_slice);
grpc_gcp_handshaker_req_destroy(req);
return buffer;
}
@@ -258,7 +259,7 @@ alts_handshaker_client* alts_grpc_handshaker_client_create(
grpc_slice_from_static_string(ALTS_SERVICE_METHOD), &slice,
gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
client->base.vtable = &vtable;
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
return &client->base;
}
diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc
index e0e4184686..d63d3538c5 100644
--- a/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc
+++ b/src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.cc
@@ -20,6 +20,8 @@
#include "src/core/tsi/alts/handshaker/alts_handshaker_service_api_util.h"
+#include "src/core/lib/slice/slice_internal.h"
+
void add_repeated_field(repeated_field** head, const void* data) {
repeated_field* field =
static_cast<repeated_field*>(gpr_zalloc(sizeof(*field)));
@@ -67,7 +69,7 @@ grpc_slice* create_slice(const char* data, size_t size) {
void destroy_slice(grpc_slice* slice) {
if (slice != nullptr) {
- grpc_slice_unref(*slice);
+ grpc_slice_unref_internal(*slice);
gpr_free(slice);
}
}
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_event.cc b/src/core/tsi/alts/handshaker/alts_tsi_event.cc
index ec0bf12b95..cb36d5ebd1 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_event.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_event.cc
@@ -24,6 +24,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include "src/core/lib/slice/slice_internal.h"
+
tsi_result alts_tsi_event_create(alts_tsi_handshaker* handshaker,
tsi_handshaker_on_next_done_cb cb,
void* user_data,
@@ -66,8 +68,8 @@ void alts_tsi_event_destroy(alts_tsi_event* event) {
grpc_byte_buffer_destroy(event->recv_buffer);
grpc_metadata_array_destroy(&event->initial_metadata);
grpc_metadata_array_destroy(&event->trailing_metadata);
- grpc_slice_unref(event->details);
- grpc_slice_unref(event->target_name);
+ grpc_slice_unref_internal(event->details);
+ grpc_slice_unref_internal(event->target_name);
grpc_alts_credentials_options_destroy(event->options);
gpr_free(event);
}
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
index 1df1021bb1..34608a3de1 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_handshaker.cc
@@ -31,6 +31,7 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gprpp/thd.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/alts/frame_protector/alts_frame_protector.h"
#include "src/core/tsi/alts/handshaker/alts_handshaker_client.h"
#include "src/core/tsi/alts/handshaker/alts_tsi_utils.h"
@@ -182,7 +183,7 @@ static void handshaker_result_destroy(tsi_handshaker_result* self) {
gpr_free(result->peer_identity);
gpr_free(result->key_data);
gpr_free(result->unused_bytes);
- grpc_slice_unref(result->rpc_versions);
+ grpc_slice_unref_internal(result->rpc_versions);
gpr_free(result);
}
@@ -269,12 +270,12 @@ static tsi_result handshaker_next(
handshaker->has_sent_start_message = true;
} else {
if (!GRPC_SLICE_IS_EMPTY(handshaker->recv_bytes)) {
- grpc_slice_unref(handshaker->recv_bytes);
+ grpc_slice_unref_internal(handshaker->recv_bytes);
}
handshaker->recv_bytes = grpc_slice_ref(slice);
ok = alts_handshaker_client_next(handshaker->client, event, &slice);
}
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
if (ok != TSI_OK) {
gpr_log(GPR_ERROR, "Failed to schedule ALTS handshaker requests");
return ok;
@@ -299,8 +300,8 @@ static void handshaker_destroy(tsi_handshaker* self) {
alts_tsi_handshaker* handshaker =
reinterpret_cast<alts_tsi_handshaker*>(self);
alts_handshaker_client_destroy(handshaker->client);
- grpc_slice_unref(handshaker->recv_bytes);
- grpc_slice_unref(handshaker->target_name);
+ grpc_slice_unref_internal(handshaker->recv_bytes);
+ grpc_slice_unref_internal(handshaker->target_name);
grpc_alts_credentials_options_destroy(handshaker->options);
gpr_free(handshaker->buffer);
gpr_free(handshaker);
diff --git a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc
index d9b5e6c945..1747f1ad04 100644
--- a/src/core/tsi/alts/handshaker/alts_tsi_utils.cc
+++ b/src/core/tsi/alts/handshaker/alts_tsi_utils.cc
@@ -22,6 +22,8 @@
#include <grpc/byte_buffer_reader.h>
+#include "src/core/lib/slice/slice_internal.h"
+
tsi_result alts_tsi_utils_convert_to_tsi_result(grpc_status_code code) {
switch (code) {
case GRPC_STATUS_OK:
@@ -47,7 +49,7 @@ grpc_gcp_handshaker_resp* alts_tsi_utils_deserialize_response(
grpc_slice slice = grpc_byte_buffer_reader_readall(&bbr);
grpc_gcp_handshaker_resp* resp = grpc_gcp_handshaker_resp_create();
bool ok = grpc_gcp_handshaker_resp_decode(slice, resp);
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
grpc_byte_buffer_reader_destroy(&bbr);
if (!ok) {
grpc_gcp_handshaker_resp_destroy(resp);
diff --git a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc
index d4fd88d1e2..e7890903d5 100644
--- a/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc
+++ b/src/core/tsi/alts/zero_copy_frame_protector/alts_grpc_privacy_integrity_record_protocol.cc
@@ -61,7 +61,7 @@ static tsi_result alts_grpc_privacy_integrity_protect(
if (status != GRPC_STATUS_OK) {
gpr_log(GPR_ERROR, "Failed to protect, %s", error_details);
gpr_free(error_details);
- grpc_slice_unref(protected_slice);
+ grpc_slice_unref_internal(protected_slice);
return TSI_INTERNAL_ERROR;
}
grpc_slice_buffer_add(protected_slices, protected_slice);
@@ -106,7 +106,7 @@ static tsi_result alts_grpc_privacy_integrity_unprotect(
if (status != GRPC_STATUS_OK) {
gpr_log(GPR_ERROR, "Failed to unprotect, %s", error_details);
gpr_free(error_details);
- grpc_slice_unref(unprotected_slice);
+ grpc_slice_unref_internal(unprotected_slice);
return TSI_INTERNAL_ERROR;
}
grpc_slice_buffer_reset_and_unref_internal(&rp->header_sb);
diff --git a/src/core/tsi/alts_transport_security.cc b/src/core/tsi/alts_transport_security.cc
index 2fd408103b..dac23bbf7a 100644
--- a/src/core/tsi/alts_transport_security.cc
+++ b/src/core/tsi/alts_transport_security.cc
@@ -45,7 +45,9 @@ void grpc_tsi_alts_signal_for_cq_destroy() {
}
void grpc_tsi_alts_init() {
- memset(&g_alts_resource, 0, sizeof(alts_shared_resource));
+ g_alts_resource.channel = nullptr;
+ g_alts_resource.cq = nullptr;
+ g_alts_resource.is_cq_drained = false;
gpr_mu_init(&g_alts_resource.mu);
gpr_cv_init(&g_alts_resource.cv);
}
diff --git a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
index ce74fde343..f9184bcc34 100644
--- a/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
+++ b/src/core/tsi/ssl/session_cache/ssl_session_cache.cc
@@ -19,6 +19,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/mutex_lock.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/tsi/ssl/session_cache/ssl_session.h"
#include "src/core/tsi/ssl/session_cache/ssl_session_cache.h"
@@ -53,7 +54,7 @@ class SslSessionLRUCache::Node {
SetSession(std::move(session));
}
- ~Node() { grpc_slice_unref(key_); }
+ ~Node() { grpc_slice_unref_internal(key_); }
// Not copyable nor movable.
Node(const Node&) = delete;