diff options
author | 2018-07-20 16:17:48 -0700 | |
---|---|---|
committer | 2018-07-20 16:43:22 -0700 | |
commit | 835dab6a464b3246f6bdc3b1bb0fe706551a0260 (patch) | |
tree | 1b6477175b543848ed5a0877c2c28f5b4c97403b | |
parent | a9cde770ff3654aec3e2c59379310ea0ecaf0039 (diff) |
Full subchannel support
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel.cc | 5 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/client_channel_channelz.cc | 2 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.cc | 33 | ||||
-rw-r--r-- | src/core/ext/filters/client_channel/subchannel.h | 10 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 1 | ||||
-rw-r--r-- | src/core/lib/surface/call.cc | 22 | ||||
-rw-r--r-- | src/core/lib/surface/call.h | 13 | ||||
-rw-r--r-- | test/core/channel/channel_stack_test.cc | 3 | ||||
-rw-r--r-- | test/core/end2end/tests/channelz.cc | 4 |
9 files changed, 77 insertions, 16 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 024c9d737e..13a0b95511 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -892,6 +892,7 @@ typedef struct client_channel_call_data { grpc_millis deadline; gpr_arena* arena; grpc_call_stack* owning_call; + grpc_call* call; grpc_call_combiner* call_combiner; grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data; @@ -2561,7 +2562,8 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { calld->arena, // arena calld->pick.subchannel_call_context, // context calld->call_combiner, // call_combiner - parent_data_size // parent_data_size + parent_data_size, // parent_data_size + calld->call // call }; grpc_error* new_error = calld->pick.connected_subchannel->CreateCall( call_args, &calld->subchannel_call); @@ -3092,6 +3094,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem, calld->arena = args->arena; calld->owning_call = args->call_stack; calld->call_combiner = args->call_combiner; + calld->call = args->call; if (GPR_LIKELY(chand->deadline_checking_enabled)) { grpc_deadline_state_init(elem, args->call_stack, args->call_combiner, calld->deadline); diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index 0db9799a65..a1ecbe75a2 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -126,6 +126,8 @@ grpc_json* ClientChannelNode::RenderJson() { // as CallCountingAndTracingNode to populate trace and call count data. PopulateTrace(json); PopulateCallData(json); + // reset to the top level + json = top_level_json; PopulateChildRefs(json); return top_level_json; } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 8dfbd33ffe..a15cfd43c3 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -46,6 +46,7 @@ #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/connectivity_state.h" @@ -640,8 +641,8 @@ static bool publish_transport_locked(grpc_subchannel* c) { } /* publish */ - c->connected_subchannel.reset( - grpc_core::New<grpc_core::ConnectedSubchannel>(stk)); + c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>( + stk, c->channelz_subchannel.get())); gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", c->connected_subchannel.get(), c); @@ -796,9 +797,12 @@ grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { namespace grpc_core { -ConnectedSubchannel::ConnectedSubchannel(grpc_channel_stack* channel_stack) +ConnectedSubchannel::ConnectedSubchannel( + grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel) : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount), - channel_stack_(channel_stack) {} + channel_stack_(channel_stack), + channelz_subchannel_(channelz_subchannel) {} ConnectedSubchannel::~ConnectedSubchannel() { GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); @@ -845,14 +849,15 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, connection.release(); // Ref is passed to the grpc_subchannel_call object. (*call)->connection = this; const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args.context, /* context */ - args.path, /* path */ - args.start_time, /* start_time */ - args.deadline, /* deadline */ - args.arena, /* arena */ - args.call_combiner /* call_combiner */ + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args.context, /* context */ + args.path, /* path */ + args.start_time, /* start_time */ + args.deadline, /* deadline */ + args.arena, /* arena */ + args.call_combiner, /* call_combiner */ + args.call /* call */ }; grpc_error* error = grpc_call_stack_init( channel_stack_, 1, subchannel_call_destroy, *call, &call_args); @@ -861,6 +866,10 @@ grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, gpr_log(GPR_ERROR, "error: %s", error_string); return error; } + if (channelz_subchannel_ != nullptr) { + channelz_subchannel_->RecordCallStarted(); + grpc_call_set_channelz_subchannel(args.call, channelz_subchannel_); + } grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index dd3a2d9621..d62348488e 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -83,9 +83,11 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { grpc_call_context_element* context; grpc_call_combiner* call_combiner; size_t parent_data_size; + grpc_call* call; }; - explicit ConnectedSubchannel(grpc_channel_stack* channel_stack); + explicit ConnectedSubchannel(grpc_channel_stack* channel_stack, + channelz::SubchannelNode* channelz_subchannel); ~ConnectedSubchannel(); grpc_channel_stack* channel_stack() { return channel_stack_; } @@ -94,9 +96,15 @@ class ConnectedSubchannel : public RefCountedWithTracing<ConnectedSubchannel> { grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); + channelz::SubchannelNode* channelz_subchannel() { + return channelz_subchannel_; + } private: grpc_channel_stack* channel_stack_; + // backpointer to the channelz node in this connected subchannel's + // owning subchannel. + channelz::SubchannelNode* channelz_subchannel_; }; } // namespace grpc_core diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 7581f937b6..727f36a6f8 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -71,6 +71,7 @@ typedef struct { grpc_millis deadline; gpr_arena* arena; grpc_call_combiner* call_combiner; + grpc_call* call; } grpc_call_element_args; typedef struct { diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index 88e015ce22..6545af8f72 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -170,6 +170,11 @@ struct grpc_call { /* parent_call* */ gpr_atm parent_call_atm; child_call* child; + // the call holds onto this so that once the call knows if the RPC was + // a success or failure, it can update the channelz bookkeeping for the + // subchannel that sent it. + grpc_core::channelz::CallCountingAndTracingNode* channelz_subchannel_; + /* client or server call */ bool is_client; /** has grpc_call_unref been called */ @@ -269,6 +274,11 @@ struct grpc_call { gpr_atm recv_state; }; +void grpc_call_set_channelz_subchannel( + grpc_call* call, grpc_core::channelz::CallCountingAndTracingNode* node) { + call->channelz_subchannel_ = node; +} + grpc_core::TraceFlag grpc_call_error_trace(false, "call_error"); grpc_core::TraceFlag grpc_compression_trace(false, "compression"); @@ -444,7 +454,8 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args, call->start_time, send_deadline, call->arena, - &call->call_combiner}; + &call->call_combiner, + call}; add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call, call, &call_args)); // Publish this call to parent only after the call stack has been initialized. @@ -1263,6 +1274,7 @@ static void post_batch_completion(batch_control* bctl) { get_final_status(call, set_cancelled_value, call->final_op.server.cancelled, nullptr, nullptr); } + // Record channelz data for the channel. grpc_core::channelz::ChannelNode* channelz_channel = grpc_channel_get_channelz_node(call->channel); if (channelz_channel != nullptr) { @@ -1272,6 +1284,14 @@ static void post_batch_completion(batch_control* bctl) { channelz_channel->RecordCallSucceeded(); } } + // Record channelz data for the subchannel. + if (call->channelz_subchannel_ != nullptr) { + if (*call->final_op.client.status != GRPC_STATUS_OK) { + call->channelz_subchannel_->RecordCallFailed(); + } else { + call->channelz_subchannel_->RecordCallSucceeded(); + } + } GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h index b3b06059d4..200c840c61 100644 --- a/src/core/lib/surface/call.h +++ b/src/core/lib/surface/call.h @@ -110,6 +110,19 @@ size_t grpc_call_get_initial_size_estimate(); grpc_compression_algorithm grpc_call_compression_for_level( grpc_call* call, grpc_compression_level level); +namespace grpc_core { +namespace channelz { +class CallCountingAndTracingNode; +} // namespace channelz +} // namespace grpc_core + +// We need this so that a subchannel selected for a call can add itself to +// the call's data structure. This allows the call to trigger the correct +// channelz bookkeeping on the subchannel once the call knows if the RPC was +// successful or not. +void grpc_call_set_channelz_subchannel( + grpc_call* call, grpc_core::channelz::CallCountingAndTracingNode* node); + extern grpc_core::TraceFlag grpc_call_error_trace; extern grpc_core::TraceFlag grpc_compression_trace; diff --git a/test/core/channel/channel_stack_test.cc b/test/core/channel/channel_stack_test.cc index 2f5329a96d..4dc2ee3f55 100644 --- a/test/core/channel/channel_stack_test.cc +++ b/test/core/channel/channel_stack_test.cc @@ -124,7 +124,8 @@ static void test_create_channel_stack(void) { gpr_now(GPR_CLOCK_MONOTONIC), /* start_time */ GRPC_MILLIS_INF_FUTURE, /* deadline */ nullptr, /* arena */ - nullptr /* call_combiner */ + nullptr, /* call_combiner */ + nullptr /* call */ }; grpc_error* error = grpc_call_stack_init(channel_stack, 1, free_call, call_stack, &args); diff --git a/test/core/end2end/tests/channelz.cc b/test/core/end2end/tests/channelz.cc index 533703a2be..754c3d3741 100644 --- a/test/core/end2end/tests/channelz.cc +++ b/test/core/end2end/tests/channelz.cc @@ -241,6 +241,10 @@ static void test_channelz(grpc_end2end_test_config config) { GPR_ASSERT(nullptr == strstr(json, "\"severity\":\"CT_INFO\"")); gpr_free(json); + json = grpc_channelz_get_subchannel(2); + gpr_log(GPR_INFO, "%s", json); + gpr_free(json); + end_test(&f); config.tear_down_data(&f); } |