aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/subchannel.cc
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-10-10 16:56:07 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-10-10 17:00:44 -0700
commit17b6c04b8910dca993a52c24794dd78b26410068 (patch)
tree81aeb89566b8ed46d72f2ed490f67cf57665b17e /src/core/ext/filters/client_channel/subchannel.cc
parent73c99f890a42479bc82e2c0ab132a97cbcf8434a (diff)
Intercept trailing in subchannel, not client_channel
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc70
1 files changed, 70 insertions, 0 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 4a668b0fa7..38d7a5a6e3 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -49,6 +49,8 @@
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
+#include "src/core/lib/transport/error_utils.h"
+#include "src/core/lib/transport/status_metadata.h"
#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
@@ -144,6 +146,11 @@ struct grpc_subchannel {
struct grpc_subchannel_call {
grpc_core::ConnectedSubchannel* connection;
grpc_closure* schedule_closure_after_destroy;
+ // state needed to support channelz interception of recv trailing metadata.
+ grpc_closure recv_trailing_metadata_ready_channelz;
+ grpc_closure* original_recv_trailing_metadata;
+ grpc_metadata_batch* recv_trailing_metadata;
+ grpc_millis deadline;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
@@ -745,12 +752,74 @@ void grpc_subchannel_call_unref(
GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
}
+// Sets *status and *server_pushback_md based on md_batch and error.
+// Only sets *server_pushback_md if server_pushback_md != nullptr.
+static void get_call_status(grpc_subchannel_call* call,
+ grpc_metadata_batch* md_batch, grpc_error* error,
+ grpc_status_code* status) {
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error_get_status(error, call->deadline, status, nullptr, nullptr,
+ nullptr);
+ } else {
+ GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
+ *status =
+ grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
+ }
+ GRPC_ERROR_UNREF(error);
+}
+
+static void recv_trailing_metadata_ready_channelz(void* arg,
+ grpc_error* error) {
+ grpc_subchannel_call* call = static_cast<grpc_subchannel_call*>(arg);
+ GPR_ASSERT(call->recv_trailing_metadata != nullptr);
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_metadata_batch* md_batch = call->recv_trailing_metadata;
+ get_call_status(call, md_batch, GRPC_ERROR_REF(error), &status);
+ grpc_core::channelz::SubchannelNode* channelz_subchannel =
+ call->connection->channelz_subchannel();
+ GPR_ASSERT(channelz_subchannel != nullptr);
+ if (status == GRPC_STATUS_OK) {
+ channelz_subchannel->RecordCallSucceeded();
+ } else {
+ channelz_subchannel->RecordCallFailed();
+ }
+ call->recv_trailing_metadata = nullptr;
+ GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata,
+ GRPC_ERROR_REF(error));
+}
+
+// If channelz is enabled, intercept recv_trailing so that we may check the
+// status and associate it to a subchannel.
+static void maybe_intercept_recv_trailing_metadata_for_channelz(
+ grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) {
+ // only intercept payloads with recv trailing.
+ if (!batch->recv_trailing_metadata) {
+ return;
+ }
+ // only add interceptor is channelz is enabled.
+ if (call->connection->channelz_subchannel() == nullptr) {
+ return;
+ }
+ GRPC_CLOSURE_INIT(&call->recv_trailing_metadata_ready_channelz,
+ recv_trailing_metadata_ready_channelz, call,
+ grpc_schedule_on_exec_ctx);
+ // save some state needed for the interception callback.
+ GPR_ASSERT(call->recv_trailing_metadata == nullptr);
+ call->recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ call->original_recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &call->recv_trailing_metadata_ready_channelz;
+}
+
void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
grpc_transport_stream_op_batch* batch) {
GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0);
grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
+ maybe_intercept_recv_trailing_metadata_for_channelz(call, batch);
top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
}
@@ -872,6 +941,7 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
Ref(DEBUG_LOCATION, "subchannel_call");
connection.release(); // Ref is passed to the grpc_subchannel_call object.
(*call)->connection = this;
+ (*call)->deadline = args.deadline;
const grpc_call_element_args call_args = {
callstk, /* call_stack */
nullptr, /* server_transport_data */