diff options
author | ncteisen <ncteisen@gmail.com> | 2018-10-10 16:56:07 -0700 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2018-10-10 17:00:44 -0700 |
commit | 17b6c04b8910dca993a52c24794dd78b26410068 (patch) | |
tree | 81aeb89566b8ed46d72f2ed490f67cf57665b17e /src/core/ext/filters/client_channel/subchannel.cc | |
parent | 73c99f890a42479bc82e2c0ab132a97cbcf8434a (diff) |
Intercept trailing in subchannel, not client_channel
Diffstat (limited to 'src/core/ext/filters/client_channel/subchannel.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.cc | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 4a668b0fa7..38d7a5a6e3 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -49,6 +49,8 @@ #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/error_utils.h" +#include "src/core/lib/transport/status_metadata.h" #define INTERNAL_REF_BITS 16 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) @@ -144,6 +146,11 @@ struct grpc_subchannel { struct grpc_subchannel_call { grpc_core::ConnectedSubchannel* connection; grpc_closure* schedule_closure_after_destroy; + // state needed to support channelz interception of recv trailing metadata. + grpc_closure recv_trailing_metadata_ready_channelz; + grpc_closure* original_recv_trailing_metadata; + grpc_metadata_batch* recv_trailing_metadata; + grpc_millis deadline; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) \ @@ -745,12 +752,74 @@ void grpc_subchannel_call_unref( GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } +// Sets *status and *server_pushback_md based on md_batch and error. +// Only sets *server_pushback_md if server_pushback_md != nullptr. +static void get_call_status(grpc_subchannel_call* call, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status) { + if (error != GRPC_ERROR_NONE) { + grpc_error_get_status(error, call->deadline, status, nullptr, nullptr, + nullptr); + } else { + GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr); + *status = + grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md); + } + GRPC_ERROR_UNREF(error); +} + +static void recv_trailing_metadata_ready_channelz(void* arg, + grpc_error* error) { + grpc_subchannel_call* call = static_cast<grpc_subchannel_call*>(arg); + GPR_ASSERT(call->recv_trailing_metadata != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = call->recv_trailing_metadata; + get_call_status(call, md_batch, GRPC_ERROR_REF(error), &status); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + call->connection->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); + } + call->recv_trailing_metadata = nullptr; + GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata, + GRPC_ERROR_REF(error)); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +static void maybe_intercept_recv_trailing_metadata_for_channelz( + grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) { + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (call->connection->channelz_subchannel() == nullptr) { + return; + } + GRPC_CLOSURE_INIT(&call->recv_trailing_metadata_ready_channelz, + recv_trailing_metadata_ready_channelz, call, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(call->recv_trailing_metadata == nullptr); + call->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + call->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->recv_trailing_metadata_ready_channelz; +} + void grpc_subchannel_call_process_op(grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) { GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0); grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); + maybe_intercept_recv_trailing_metadata_for_channelz(call, batch); top_elem->filter->start_transport_stream_op_batch(top_elem, batch); } @@ -872,6 +941,7 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, Ref(DEBUG_LOCATION, "subchannel_call"); connection.release(); // Ref is passed to the grpc_subchannel_call object. (*call)->connection = this; + (*call)->deadline = args.deadline; const grpc_call_element_args call_args = { callstk, /* call_stack */ nullptr, /* server_transport_data */ |