aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-10-10 16:56:07 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-10-10 17:00:44 -0700
commit17b6c04b8910dca993a52c24794dd78b26410068 (patch)
tree81aeb89566b8ed46d72f2ed490f67cf57665b17e /src/core/ext/filters/client_channel
parent73c99f890a42479bc82e2c0ab132a97cbcf8434a (diff)
Intercept trailing in subchannel, not client_channel
Diffstat (limited to 'src/core/ext/filters/client_channel')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc80
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc70
2 files changed, 70 insertions, 80 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 35c8bb27ee..bb3ea400d1 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -933,11 +933,6 @@ typedef struct client_channel_call_data {
grpc_closure pick_closure;
grpc_closure pick_cancel_closure;
- // state needed to support channelz interception of recv trailing metadata.
- grpc_closure recv_trailing_metadata_ready_channelz;
- grpc_closure* original_recv_trailing_metadata;
- grpc_metadata_batch* recv_trailing_metadata;
-
grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties;
@@ -999,8 +994,6 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
-static void maybe_intercept_recv_trailing_metadata_for_channelz(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
//
// send op data caching
@@ -1299,7 +1292,6 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
@@ -1977,15 +1969,6 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
&server_pushback_md);
- grpc_core::channelz::SubchannelNode* channelz_subchannel =
- calld->pick.connected_subchannel->channelz_subchannel();
- if (channelz_subchannel != nullptr) {
- if (status == GRPC_STATUS_OK) {
- channelz_subchannel->RecordCallSucceeded();
- } else {
- channelz_subchannel->RecordCallFailed();
- }
- }
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
calld, grpc_status_code_to_string(status));
@@ -2590,69 +2573,6 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
}
//
-// Channelz
-//
-
-static void recv_trailing_metadata_ready_channelz(void* arg,
- grpc_error* error) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
- "error=%s",
- chand, calld, grpc_error_string(error));
- }
- GPR_ASSERT(calld->recv_trailing_metadata != nullptr);
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_metadata_batch* md_batch = calld->recv_trailing_metadata;
- get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
- grpc_core::channelz::SubchannelNode* channelz_subchannel =
- calld->pick.connected_subchannel->channelz_subchannel();
- GPR_ASSERT(channelz_subchannel != nullptr);
- if (status == GRPC_STATUS_OK) {
- channelz_subchannel->RecordCallSucceeded();
- } else {
- channelz_subchannel->RecordCallFailed();
- }
- calld->recv_trailing_metadata = nullptr;
- GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
-}
-
-// If channelz is enabled, intercept recv_trailing so that we may check the
-// status and associate it to a subchannel.
-// Returns true if callback was intercepted, false otherwise.
-static void maybe_intercept_recv_trailing_metadata_for_channelz(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- // only intercept payloads with recv trailing.
- if (!batch->recv_trailing_metadata) {
- return;
- }
- // only add interceptor is channelz is enabled.
- if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
- return;
- }
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
- batch);
- }
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
- recv_trailing_metadata_ready_channelz, elem,
- grpc_schedule_on_exec_ctx);
- // save some state needed for the interception callback.
- GPR_ASSERT(calld->recv_trailing_metadata == nullptr);
- calld->recv_trailing_metadata =
- batch->payload->recv_trailing_metadata.recv_trailing_metadata;
- calld->original_recv_trailing_metadata =
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &calld->recv_trailing_metadata_ready_channelz;
-}
-
-//
// LB pick
//
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 4a668b0fa7..38d7a5a6e3 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -49,6 +49,8 @@
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/status_metadata.h"
#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
@@ -144,6 +146,11 @@ struct grpc_subchannel {
struct grpc_subchannel_call {
grpc_core::ConnectedSubchannel* connection;
grpc_closure* schedule_closure_after_destroy;
+ // state needed to support channelz interception of recv trailing metadata.
+ grpc_closure recv_trailing_metadata_ready_channelz;
+ grpc_closure* original_recv_trailing_metadata;
+ grpc_metadata_batch* recv_trailing_metadata;
+ grpc_millis deadline;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
@@ -745,12 +752,74 @@ void grpc_subchannel_call_unref(
GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
+// Sets *status and *server_pushback_md based on md_batch and error.
+// Only sets *server_pushback_md if server_pushback_md != nullptr.
+static void get_call_status(grpc_subchannel_call* call,
+ grpc_metadata_batch* md_batch, grpc_error* error,
+ grpc_status_code* status) {
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error_get_status(error, call->deadline, status, nullptr, nullptr,
+ nullptr);
+ } else {
+ GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+ *status =
+ grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void recv_trailing_metadata_ready_channelz(void* arg,
+ grpc_error* error) {
+ grpc_subchannel_call* call = static_cast<grpc_subchannel_call*>(arg);
+ GPR_ASSERT(call->recv_trailing_metadata != nullptr);
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_metadata_batch* md_batch = call->recv_trailing_metadata;
+ get_call_status(call, md_batch, GRPC_ERROR_REF(error), &status);
+ grpc_core::channelz::SubchannelNode* channelz_subchannel =
+ call->connection->channelz_subchannel();
+ GPR_ASSERT(channelz_subchannel != nullptr);
+ if (status == GRPC_STATUS_OK) {
+ channelz_subchannel->RecordCallSucceeded();
+ } else {
+ channelz_subchannel->RecordCallFailed();
+ }
+ call->recv_trailing_metadata = nullptr;
+ GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata,
+ GRPC_ERROR_REF(error));
+}
+
+// If channelz is enabled, intercept recv_trailing so that we may check the
+// status and associate it to a subchannel.
+static void maybe_intercept_recv_trailing_metadata_for_channelz(
+ grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) {
+ // only intercept payloads with recv trailing.
+ if (!batch->recv_trailing_metadata) {
+ return;
+ }
+ // only add interceptor is channelz is enabled.
+ if (call->connection->channelz_subchannel() == nullptr) {
+ return;
+ }
+ GRPC_CLOSURE_INIT(&call->recv_trailing_metadata_ready_channelz,
+ recv_trailing_metadata_ready_channelz, call,
+ grpc_schedule_on_exec_ctx);
+ // save some state needed for the interception callback.
+ GPR_ASSERT(call->recv_trailing_metadata == nullptr);
+ call->recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ call->original_recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->recv_trailing_metadata_ready_channelz;
+}
+
void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
grpc_transport_stream_op_batch* batch) {
GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0);
grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
+ maybe_intercept_recv_trailing_metadata_for_channelz(call, batch);
top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
}
@@ -872,6 +941,7 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
Ref(DEBUG_LOCATION, "subchannel_call");
connection.release(); // Ref is passed to the grpc_subchannel_call object.
(*call)->connection = this;
+ (*call)->deadline = args.deadline;
const grpc_call_element_args call_args = {
callstk, /* call_stack */
nullptr, /* server_transport_data */