aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc5
-rw-r--r--src/core/ext/filters/client_channel/client_channel_channelz.cc2
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc33
-rw-r--r--src/core/ext/filters/client_channel/subchannel.h10
-rw-r--r--src/core/lib/channel/channel_stack.h1
-rw-r--r--src/core/lib/surface/call.cc22
-rw-r--r--src/core/lib/surface/call.h13
-rw-r--r--test/core/channel/channel_stack_test.cc3
-rw-r--r--test/core/end2end/tests/channelz.cc4
9 files changed, 77 insertions, 16 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 024c9d737e..13a0b95511 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -892,6 +892,7 @@ typedef struct client_channel_call_data {
grpc_millis deadline;
gpr_arena* arena;
grpc_call_stack* owning_call;
+ grpc_call* call;
grpc_call_combiner* call_combiner;
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
@@ -2561,7 +2562,8 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
calld->arena, // arena
calld->pick.subchannel_call_context, // context
calld->call_combiner, // call_combiner
- parent_data_size // parent_data_size
+ parent_data_size, // parent_data_size
+ calld->call // call
};
grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
call_args, &calld->subchannel_call);
@@ -3092,6 +3094,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem,
calld->arena = args->arena;
calld->owning_call = args->call_stack;
calld->call_combiner = args->call_combiner;
+ calld->call = args->call;
if (GPR_LIKELY(chand->deadline_checking_enabled)) {
grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
calld->deadline);
diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc
index 0db9799a65..a1ecbe75a2 100644
--- a/src/core/ext/filters/client_channel/client_channel_channelz.cc
+++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc
@@ -126,6 +126,8 @@ grpc_json* ClientChannelNode::RenderJson() {
// as CallCountingAndTracingNode to populate trace and call count data.
PopulateTrace(json);
PopulateCallData(json);
+ // reset to the top level
+ json = top_level_json;
PopulateChildRefs(json);
return top_level_json;
}
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 8dfbd33ffe..a15cfd43c3 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -46,6 +46,7 @@
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -640,8 +641,8 @@ static bool publish_transport_locked(grpc_subchannel* c) {
}
/* publish */
- c->connected_subchannel.reset(
- grpc_core::New<grpc_core::ConnectedSubchannel>(stk));
+ c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
+ stk, c->channelz_subchannel.get()));
gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
c->connected_subchannel.get(), c);
@@ -796,9 +797,12 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
namespace grpc_core {
-ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack)
+ConnectedSubchannel::ConnectedSubchannel(
+ grpc_channel_stack* channel_stack,
+ channelz::SubchannelNode* channelz_subchannel)
: RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount),
- channel_stack_(channel_stack) {}
+ channel_stack_(channel_stack),
+ channelz_subchannel_(channelz_subchannel) {}
ConnectedSubchannel::~ConnectedSubchannel() {
GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
@@ -845,14 +849,15 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
connection.release(); // Ref is passed to the grpc_subchannel_call object.
(*call)->connection = this;
const grpc_call_element_args call_args = {
- callstk, /* call_stack */
- nullptr, /* server_transport_data */
- args.context, /* context */
- args.path, /* path */
- args.start_time, /* start_time */
- args.deadline, /* deadline */
- args.arena, /* arena */
- args.call_combiner /* call_combiner */
+ callstk, /* call_stack */
+ nullptr, /* server_transport_data */
+ args.context, /* context */
+ args.path, /* path */
+ args.start_time, /* start_time */
+ args.deadline, /* deadline */
+ args.arena, /* arena */
+ args.call_combiner, /* call_combiner */
+ args.call /* call */
};
grpc_error* error = grpc_call_stack_init(
channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
@@ -861,6 +866,10 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
gpr_log(GPR_ERROR, "error: %s", error_string);
return error;
}
+ if (channelz_subchannel_ != nullptr) {
+ channelz_subchannel_->RecordCallStarted();
+ grpc_call_set_channelz_subchannel(args.call, channelz_subchannel_);
+ }
grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h
index dd3a2d9621..d62348488e 100644
--- a/src/core/ext/filters/client_channel/subchannel.h
+++ b/src/core/ext/filters/client_channel/subchannel.h
@@ -83,9 +83,11 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
grpc_call_context_element* context;
grpc_call_combiner* call_combiner;
size_t parent_data_size;
+ grpc_call* call;
};
- explicit ConnectedSubchannel(grpc_channel_stack* channel_stack);
+ explicit ConnectedSubchannel(grpc_channel_stack* channel_stack,
+ channelz::SubchannelNode* channelz_subchannel);
~ConnectedSubchannel();
grpc_channel_stack* channel_stack() { return channel_stack_; }
@@ -94,9 +96,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> {
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
+ channelz::SubchannelNode* channelz_subchannel() {
+ return channelz_subchannel_;
+ }
private:
grpc_channel_stack* channel_stack_;
+ // backpointer to the channelz node in this connected subchannel's
+ // owning subchannel.
+ channelz::SubchannelNode* channelz_subchannel_;
};
} // namespace grpc_core
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 7581f937b6..727f36a6f8 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -71,6 +71,7 @@ typedef struct {
grpc_millis deadline;
gpr_arena* arena;
grpc_call_combiner* call_combiner;
+ grpc_call* call;
} grpc_call_element_args;
typedef struct {
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index 88e015ce22..6545af8f72 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -170,6 +170,11 @@ struct grpc_call {
/* parent_call* */ gpr_atm parent_call_atm;
child_call* child;
+ // the call holds onto this so that once the call knows if the RPC was
+ // a success or failure, it can update the channelz bookkeeping for the
+ // subchannel that sent it.
+ grpc_core::channelz::CallCountingAndTracingNode* channelz_subchannel_;
+
/* client or server call */
bool is_client;
/** has grpc_call_unref been called */
@@ -269,6 +274,11 @@ struct grpc_call {
gpr_atm recv_state;
};
+void grpc_call_set_channelz_subchannel(
+ grpc_call* call, grpc_core::channelz::CallCountingAndTracingNode* node) {
+ call->channelz_subchannel_ = node;
+}
+
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
@@ -444,7 +454,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
call->start_time,
send_deadline,
call->arena,
- &call->call_combiner};
+ &call->call_combiner,
+ call};
add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
call, &call_args));
// Publish this call to parent only after the call stack has been initialized.
@@ -1263,6 +1274,7 @@ static void post_batch_completion(batch_control* bctl) {
get_final_status(call, set_cancelled_value,
call->final_op.server.cancelled, nullptr, nullptr);
}
+ // Record channelz data for the channel.
grpc_core::channelz::ChannelNode* channelz_channel =
grpc_channel_get_channelz_node(call->channel);
if (channelz_channel != nullptr) {
@@ -1272,6 +1284,14 @@ static void post_batch_completion(batch_control* bctl) {
channelz_channel->RecordCallSucceeded();
}
}
+ // Record channelz data for the subchannel.
+ if (call->channelz_subchannel_ != nullptr) {
+ if (*call->final_op.client.status != GRPC_STATUS_OK) {
+ call->channelz_subchannel_->RecordCallFailed();
+ } else {
+ call->channelz_subchannel_->RecordCallSucceeded();
+ }
+ }
GRPC_ERROR_UNREF(error);
error = GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index b3b06059d4..200c840c61 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -110,6 +110,19 @@ size_t grpc_call_get_initial_size_estimate();
grpc_compression_algorithm grpc_call_compression_for_level(
grpc_call* call, grpc_compression_level level);
+namespace grpc_core {
+namespace channelz {
+class CallCountingAndTracingNode;
+} // namespace channelz
+} // namespace grpc_core
+
+// We need this so that a subchannel selected for a call can add itself to
+// the call's data structure. This allows the call to trigger the correct
+// channelz bookkeeping on the subchannel once the call knows if the RPC was
+// successful or not.
+void grpc_call_set_channelz_subchannel(
+ grpc_call* call, grpc_core::channelz::CallCountingAndTracingNode* node);
+
extern grpc_core::TraceFlag grpc_call_error_trace;
extern grpc_core::TraceFlag grpc_compression_trace;
diff --git a/test/core/channel/channel_stack_test.cc b/test/core/channel/channel_stack_test.cc
index 2f5329a96d..4dc2ee3f55 100644
--- a/test/core/channel/channel_stack_test.cc
+++ b/test/core/channel/channel_stack_test.cc
@@ -124,7 +124,8 @@ static void test_create_channel_stack(void) {
gpr_now(GPR_CLOCK_MONOTONIC), /* start_time */
GRPC_MILLIS_INF_FUTURE, /* deadline */
nullptr, /* arena */
- nullptr /* call_combiner */
+ nullptr, /* call_combiner */
+ nullptr /* call */
};
grpc_error* error =
grpc_call_stack_init(channel_stack, 1, free_call, call_stack, &args);
diff --git a/test/core/end2end/tests/channelz.cc b/test/core/end2end/tests/channelz.cc
index 533703a2be..754c3d3741 100644
--- a/test/core/end2end/tests/channelz.cc
+++ b/test/core/end2end/tests/channelz.cc
@@ -241,6 +241,10 @@ static void test_channelz(grpc_end2end_test_config config) {
GPR_ASSERT(nullptr == strstr(json, "\"severity\":\"CT_INFO\""));
gpr_free(json);
+ json = grpc_channelz_get_subchannel(2);
+ gpr_log(GPR_INFO, "%s", json);
+ gpr_free(json);
+
end_test(&f);
config.tear_down_data(&f);
}