aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/filters/client_channel/client_channel.cc
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-08-13 10:30:51 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-08-13 10:30:51 -0700
commite0ae6c73ec97c5e62c82c99a603ac13730b23cff (patch)
tree4cc45f3ea98c04a438ed6048b6efc98c49ddf837 /src/core/ext/filters/client_channel/client_channel.cc
parentccf04c45112ac43b880b7a28cb7c789fa47284fa (diff)
Fix bug with proxy end2end test
Diffstat (limited to 'src/core/ext/filters/client_channel/client_channel.cc')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc58
1 files changed, 30 insertions, 28 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 8b4f1b604c..d3a4c49821 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -934,8 +934,10 @@ typedef struct client_channel_call_data {
grpc_closure pick_closure;
grpc_closure pick_cancel_closure;
+ // state needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_channelz;
grpc_closure* original_recv_trailing_metadata;
+ grpc_transport_stream_op_batch* recv_trailing_metadata_batch;
grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties;
@@ -1291,17 +1293,11 @@ static void recv_trailing_metadata_ready_channelz(void* arg,
"error=%s",
chand, calld, grpc_error_string(error));
}
- // find the right pending batch.
- pending_batch* pending = pending_batch_find(
- elem, "invoking recv_trailing_metadata_channelz for",
- [](grpc_transport_stream_op_batch* batch) {
- return batch->recv_trailing_metadata &&
- batch->payload->recv_trailing_metadata
- .recv_trailing_metadata_ready != nullptr;
- });
+ GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr);
grpc_status_code status = GRPC_STATUS_OK;
grpc_metadata_batch* md_batch =
- pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ calld->recv_trailing_metadata_batch->payload->recv_trailing_metadata
+ .recv_trailing_metadata;
get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
grpc_core::channelz::SubchannelNode* channelz_subchannel =
calld->pick.connected_subchannel->channelz_subchannel();
@@ -1311,29 +1307,39 @@ static void recv_trailing_metadata_ready_channelz(void* arg,
} else {
channelz_subchannel->RecordCallFailed();
}
- pending->batch = nullptr;
+ calld->recv_trailing_metadata_batch = nullptr;
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
}
// If channelz is enabled, intercept recv_trailing so that we may check the
// status and associate it to a subchannel.
// Returns true if callback was intercepted, false otherwise.
-static bool maybe_intercept_recv_trailing_for_channelz(
+static void maybe_intercept_recv_trailing_for_channelz(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
+ // only intercept payloads with recv trailing.
+ if (!batch->recv_trailing_metadata) {
+ return;
+ }
// only add interceptor is channelz is enabled.
- if (calld->pick.connected_subchannel->channelz_subchannel() != nullptr) {
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
- recv_trailing_metadata_ready_channelz, elem,
- grpc_schedule_on_exec_ctx);
- calld->original_recv_trailing_metadata =
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &calld->recv_trailing_metadata_ready_channelz;
- return true;
- } else {
- return false;
+ if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
+ return;
+ }
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
+ batch);
}
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
+ recv_trailing_metadata_ready_channelz, elem,
+ grpc_schedule_on_exec_ctx);
+ // save some state needed for the interception callback.
+ GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr);
+ calld->recv_trailing_metadata_batch = batch;
+ calld->original_recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready_channelz;
}
// This is called via the call combiner, so access to calld is synchronized.
@@ -1360,18 +1366,14 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- bool intercepted =
- maybe_intercept_recv_trailing_for_channelz(elem, batch);
+ maybe_intercept_recv_trailing_for_channelz(elem, batch);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
"pending_batches_resume");
- // Only clear if we haven't intercepted anything.
- if (!intercepted) {
- pending_batch_clear(calld, pending);
- }
+ pending_batch_clear(calld, pending);
}
}
// Note: This will release the call combiner.