aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar ncteisen <ncteisen@gmail.com>2018-09-07 15:44:59 -0700
committerGravatar ncteisen <ncteisen@gmail.com>2018-09-07 15:44:59 -0700
commitbe8844bcdb704cff6a70507f5093e4bb26320ea3 (patch)
treebb58d1f011563a2f5838218837a039d18f6c6ae0 /src/core
parent4b5b019d5644affef122e06c6898811286850b8d (diff)
reviewer feedback
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc133
-rw-r--r--src/core/lib/channel/channelz.h7
-rw-r--r--src/core/lib/channel/connected_channel.cc6
-rw-r--r--src/core/lib/surface/call.cc8
4 files changed, 74 insertions, 80 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index d015ceb335..000cf82c6c 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -937,7 +937,7 @@ typedef struct client_channel_call_data {
// state needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_channelz;
grpc_closure* original_recv_trailing_metadata;
- grpc_transport_stream_op_batch* recv_trailing_metadata_batch;
+ grpc_metadata_batch* recv_trailing_metadata_batch;
grpc_polling_entity* pollent;
bool pollent_added_to_interested_parties;
@@ -1000,14 +1000,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
-template <typename Predicate>
-static pending_batch* pending_batch_find(grpc_call_element* elem,
- const char* log_message,
- Predicate predicate);
-static void get_call_status(grpc_call_element* elem,
- grpc_metadata_batch* md_batch, grpc_error* error,
- grpc_status_code* status,
- grpc_mdelem** server_pushback_md);
+static void maybe_intercept_metadata_for_channelz(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
//
// send op data caching
@@ -1282,66 +1276,6 @@ static void resume_pending_batch_in_call_combiner(void* arg,
grpc_subchannel_call_process_op(subchannel_call, batch);
}
-static void recv_trailing_metadata_ready_channelz(void* arg,
- grpc_error* error) {
- grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
- channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- call_data* calld = static_cast<call_data*>(elem->call_data);
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
- "error=%s",
- chand, calld, grpc_error_string(error));
- }
- GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr);
- grpc_status_code status = GRPC_STATUS_OK;
- grpc_metadata_batch* md_batch =
- calld->recv_trailing_metadata_batch->payload->recv_trailing_metadata
- .recv_trailing_metadata;
- get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
- grpc_core::channelz::SubchannelNode* channelz_subchannel =
- calld->pick.connected_subchannel->channelz_subchannel();
- GPR_ASSERT(channelz_subchannel != nullptr);
- if (status == GRPC_STATUS_OK) {
- channelz_subchannel->RecordCallSucceeded();
- } else {
- channelz_subchannel->RecordCallFailed();
- }
- calld->recv_trailing_metadata_batch = nullptr;
- GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
-}
-
-// If channelz is enabled, intercept recv_trailing so that we may check the
-// status and associate it to a subchannel.
-// Returns true if callback was intercepted, false otherwise.
-static void maybe_intercept_recv_trailing_for_channelz(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
- call_data* calld = static_cast<call_data*>(elem->call_data);
- // only intercept payloads with recv trailing.
- if (!batch->recv_trailing_metadata) {
- return;
- }
- // only add interceptor is channelz is enabled.
- if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
- return;
- }
- if (grpc_client_channel_trace.enabled()) {
- gpr_log(GPR_INFO,
- "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
- batch);
- }
- GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
- recv_trailing_metadata_ready_channelz, elem,
- grpc_schedule_on_exec_ctx);
- // save some state needed for the interception callback.
- GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr);
- calld->recv_trailing_metadata_batch = batch;
- calld->original_recv_trailing_metadata =
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
- batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
- &calld->recv_trailing_metadata_ready_channelz;
-}
-
// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_resume(grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
@@ -1366,7 +1300,7 @@ static void pending_batches_resume(grpc_call_element* elem) {
pending_batch* pending = &calld->pending_batches[i];
grpc_transport_stream_op_batch* batch = pending->batch;
if (batch != nullptr) {
- maybe_intercept_recv_trailing_for_channelz(elem, batch);
+ maybe_intercept_metadata_for_channelz(elem, batch);
batch->handler_private.extra_arg = calld->subchannel_call;
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
@@ -2736,6 +2670,65 @@ static void pick_done(void* arg, grpc_error* error) {
}
}
+static void recv_trailing_metadata_ready_channelz(void* arg,
+ grpc_error* error) {
+ grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
+ channel_data* chand = static_cast<channel_data*>(elem->channel_data);
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
+ "error=%s",
+ chand, calld, grpc_error_string(error));
+ }
+ GPR_ASSERT(calld->recv_trailing_metadata_batch != nullptr);
+ grpc_status_code status = GRPC_STATUS_OK;
+ grpc_metadata_batch* md_batch = calld->recv_trailing_metadata_batch;
+ get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
+ grpc_core::channelz::SubchannelNode* channelz_subchannel =
+ calld->pick.connected_subchannel->channelz_subchannel();
+ GPR_ASSERT(channelz_subchannel != nullptr);
+ if (status == GRPC_STATUS_OK) {
+ channelz_subchannel->RecordCallSucceeded();
+ } else {
+ channelz_subchannel->RecordCallFailed();
+ }
+ calld->recv_trailing_metadata_batch = nullptr;
+ GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
+}
+
+// If channelz is enabled, intercept recv_trailing so that we may check the
+// status and associate it to a subchannel.
+// Returns true if callback was intercepted, false otherwise.
+static void maybe_intercept_metadata_for_channelz(
+ grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+ call_data* calld = static_cast<call_data*>(elem->call_data);
+ // only intercept payloads with recv trailing.
+ if (!batch->recv_trailing_metadata) {
+ return;
+ }
+ // only add interceptor is channelz is enabled.
+ if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
+ return;
+ }
+ if (grpc_client_channel_trace.enabled()) {
+ gpr_log(GPR_INFO,
+ "calld=%p batch=%p: intercepting recv trailing for channelz", calld,
+ batch);
+ }
+ GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
+ recv_trailing_metadata_ready_channelz, elem,
+ grpc_schedule_on_exec_ctx);
+ // save some state needed for the interception callback.
+ GPR_ASSERT(calld->recv_trailing_metadata_batch == nullptr);
+ calld->recv_trailing_metadata_batch =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
+ calld->original_recv_trailing_metadata =
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+ batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+ &calld->recv_trailing_metadata_ready_channelz;
+}
+
static void maybe_add_call_to_channel_interested_parties_locked(
grpc_call_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h
index bd2735929c..db5d05140d 100644
--- a/src/core/lib/channel/channelz.h
+++ b/src/core/lib/channel/channelz.h
@@ -79,7 +79,7 @@ class BaseNode : public RefCounted<BaseNode> {
const intptr_t uuid_;
};
-// This class is a helper class for channelz entities that deal with Channels
+// This class is a helper class for channelz entities that deal with Channels,
// Subchannels, and Servers, since those have similar proto definitions.
// This class has the ability to:
// - track calls_{started,succeeded,failed}
@@ -133,6 +133,9 @@ class ChannelNode : public BaseNode {
// so it leaves these implementations blank.
//
// This is utilizing the template method design pattern.
+ //
+ // TODO(ncteisen): remove these template methods in favor of manual traversal
+ // and mutation of the grpc_json object.
virtual void PopulateConnectivityState(grpc_json* json) {}
virtual void PopulateChildRefs(grpc_json* json) {}
@@ -158,7 +161,7 @@ class ChannelNode : public BaseNode {
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
- // to allow the channel trace test to access trace();
+ // to allow the channel trace test to access trace_.
friend class testing::ChannelNodePeer;
grpc_channel* channel_ = nullptr;
UniquePtr<char> target_;
diff --git a/src/core/lib/channel/connected_channel.cc b/src/core/lib/channel/connected_channel.cc
index 90a0254663..4a4f0e49d0 100644
--- a/src/core/lib/channel/connected_channel.cc
+++ b/src/core/lib/channel/connected_channel.cc
@@ -104,18 +104,18 @@ static void con_start_transport_stream_op_batch(
if (batch->recv_initial_metadata) {
callback_state* state = &calld->recv_initial_metadata_ready;
intercept_callback(
- calld, state, false, "connected_recv_initial_metadata_ready",
+ calld, state, false, "recv_initial_metadata_ready",
&batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
}
if (batch->recv_message) {
callback_state* state = &calld->recv_message_ready;
- intercept_callback(calld, state, false, "connected_recv_message_ready",
+ intercept_callback(calld, state, false, "recv_message_ready",
&batch->payload->recv_message.recv_message_ready);
}
if (batch->recv_trailing_metadata) {
callback_state* state = &calld->recv_trailing_metadata_ready;
intercept_callback(
- calld, state, false, "connected_recv_trailing_metadata_ready",
+ calld, state, false, "recv_trailing_metadata_ready",
&batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready);
}
if (batch->cancel_stream) {
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 3d69db4f83..eb7e67233b 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -1425,7 +1425,7 @@ static void receiving_stream_ready_in_call_combiner(void* bctlp,
grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner, "call_recv_message_ready");
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
receiving_stream_ready(bctlp, error);
}
@@ -1510,8 +1510,7 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner,
- "call_recv_initial_metadata_ready");
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
@@ -1562,8 +1561,7 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
- GRPC_CALL_COMBINER_STOP(&call->call_combiner,
- "call_recv_trailing_metadata_ready");
+ GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
add_batch_error(bctl, GRPC_ERROR_REF(error), false);
grpc_metadata_batch* md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];