aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/client_channel.cc
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-08-02 20:55:00 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-08-03 11:13:52 -0400
commitfde951db9c6f87597faa7ae1b3d0f521ff1adb51 (patch)
tree5d5e72bfe5cab16b506ab40210b0b08ecbc49a2a /src/core/ext/filters/client_channel/client_channel.cc
parentb35328f0127fda3f13b41ae91c885dc4be2dfb02 (diff)
Intecept recv_trailing in client_channel for channelz
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc87
1 files changed, 78 insertions, 9 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index f4d5596270..c3669189f4 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -923,6 +923,10 @@ typedef struct client_channel_call_data {
grpc_closure pick_closure;
grpc_closure pick_cancel_closure;
+ grpc_closure recv_trailing_metadata_ready_channelz;
+ grpc_closure* original_recv_trailing_metadata;
+ // metadata_batch recv_trailing_metadata_channelz;
+
grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties;
@@ -984,6 +988,14 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
+template <typename Predicate>
+static pending_batch* pending_batch_find(grpc_call_element* elem,
+ const char* log_message,
+ Predicate predicate);
+static void get_call_status(grpc_call_element* elem,
+ grpc_metadata_batch* md_batch, grpc_error* error,
+ grpc_status_code* status,
+ grpc_mdelem** server_pushback_md);
//
// send op data caching
@@ -1258,6 +1270,59 @@ static void resume_pending_batch_in_call_combiner(void* arg,
grpc_subchannel_call_process_op(subchannel_call, batch);
}
+static void recv_trailing_metadata_ready_channelz(void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
+ "error=%s",
+ chand, calld, grpc_error_string(error));
+ }
+ // find the right pending batch.
+ pending_batch* pending = pending_batch_find(
+ elem, "invoking recv_trailing_metadata_channelz for",
+ [](grpc_transport_stream_op_batch* batch) {
+ return batch->recv_trailing_metadata &&
+ batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata_ready != nullptr;
+ });
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_metadata_batch* md_batch =
+ pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
+ grpc_core::channelz::SubchannelNode* channelz_subchannel =
+ calld->pick.connected_subchannel->channelz_subchannel();
+ GPR_ASSERT(channelz_subchannel != nullptr);
+ if (status == GRPC_STATUS_OK) {
+ channelz_subchannel->RecordCallSucceeded();
+ } else {
+ channelz_subchannel->RecordCallFailed();
+ }
+ pending->batch = nullptr;
+ GRPC_CLOSURE_SCHED(calld->original_recv_trailing_metadata, error);
+}
+
+static bool maybe_intercept_recv_trailing_for_channelz(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ // only add interceptor is channelz is enabled.
+ if (calld->pick.connected_subchannel->channelz_subchannel() != nullptr) {
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
+ recv_trailing_metadata_ready_channelz, elem,
+ grpc_schedule_on_exec_ctx);
+ calld->original_recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready_channelz;
+ return true;
+ } else {
+ return false;
+ }
+}
+
// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@@ -1282,13 +1347,17 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
+ bool intercepted =
+ maybe_intercept_recv_trailing_for_channelz(elem, batch);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
"pending_batches_resume");
- pending_batch_clear(calld, pending);
+ if (!intercepted) {
+ pending_batch_clear(calld, pending);
+ }
}
}
// Note: This will release the call combiner.
@@ -1768,22 +1837,20 @@ static void recv_message_ready(void* arg, grpc_error* error) {
//
// Sets *status and *server_pushback_md based on batch_data and error.
-static void get_call_status(subchannel_batch_data* batch_data,
- grpc_error* error, grpc_status_code* status,
+static void get_call_status(grpc_call_element* elem,
+ grpc_metadata_batch* md_batch, grpc_error* error,
+ grpc_status_code* status,
grpc_mdelem** server_pushback_md) {
- grpc_call_element* elem = batch_data->elem;
call_data* calld = static_cast<call_data*>(elem->call_data);
if (error != GRPC_ERROR_NONE) {
grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
nullptr);
} else {
- grpc_metadata_batch* md_batch =
- batch_data->batch.payload->recv_trailing_metadata
- .recv_trailing_metadata;
GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
*status =
grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
- if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
+ if (server_pushback_md != nullptr &&
+ md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
*server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
}
}
@@ -1956,7 +2023,9 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
// Get the call's status and check for server pushback metadata.
grpc_status_code status = GRPC_STATUS_OK;
grpc_mdelem* server_pushback_md = nullptr;
- get_call_status(batch_data, GRPC_ERROR_REF(error), &status,
+ grpc_metadata_batch* md_batch =
+ batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
+ get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
&server_pushback_md);
grpc_core::channelz::SubchannelNode* channelz_subchannel =
calld->pick.connected_subchannel->channelz_subchannel();