diff options
45 files changed, 1141 insertions, 122 deletions
@@ -829,14 +829,18 @@ grpc_cc_library( grpc_cc_library( name = "grpc_lb_policy_grpclb", srcs = [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", ], hdrs = [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h", ], @@ -853,14 +857,18 @@ grpc_cc_library( grpc_cc_library( name = "grpc_lb_policy_grpclb_secure", srcs = [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", ], hdrs = [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h", ], diff --git a/CMakeLists.txt b/CMakeLists.txt index 325344a81c..f6c5f9e56d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1121,8 +1121,10 @@ add_library(grpc src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c src/core/ext/transport/chttp2/client/insecure/channel_create.c src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c + src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c third_party/nanopb/pb_common.c @@ -1991,8 +1993,10 @@ add_library(grpc_unsecure src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c src/core/ext/filters/load_reporting/load_reporting.c src/core/ext/filters/load_reporting/load_reporting_filter.c + src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c third_party/nanopb/pb_common.c @@ -3104,8 +3104,10 @@ LIBGRPC_SRC = \ src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c \ src/core/ext/transport/chttp2/client/insecure/channel_create.c \ src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c \ + src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c \ + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \ third_party/nanopb/pb_common.c \ @@ -3943,8 +3945,10 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c \ src/core/ext/filters/load_reporting/load_reporting.c \ src/core/ext/filters/load_reporting/load_reporting_filter.c \ + src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c \ + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \ third_party/nanopb/pb_common.c \ diff --git a/binding.gyp b/binding.gyp index 582c61282f..19724f544b 100644 --- a/binding.gyp +++ b/binding.gyp @@ -855,8 +855,10 @@ 'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'third_party/nanopb/pb_common.c', diff --git a/build.yaml b/build.yaml index 4cfa75cae1..865beab481 100644 --- a/build.yaml +++ b/build.yaml @@ -488,13 +488,17 @@ filegroups: - grpc_base - name: grpc_lb_policy_grpclb headers: + - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h + - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h src: + - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c + - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c plugin: grpc_lb_policy_grpclb @@ -504,13 +508,17 @@ filegroups: - nanopb - name: grpc_lb_policy_grpclb_secure headers: + - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h + - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h src: + - src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c + - src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c - src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c - src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c plugin: grpc_lb_policy_grpclb @@ -291,8 +291,10 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c \ src/core/ext/transport/chttp2/client/insecure/channel_create.c \ src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c \ + src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c \ + src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \ third_party/nanopb/pb_common.c \ diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 07755ac727..241eba01dd 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -434,8 +434,10 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/uri_parser.h', 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/transport/chttp2/client/chttp2_connector.h', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h', 'third_party/nanopb/pb.h', @@ -668,8 +670,10 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'third_party/nanopb/pb_common.c', @@ -895,8 +899,10 @@ Pod::Spec.new do |s| 'src/core/ext/filters/client_channel/uri_parser.h', 'src/core/ext/filters/deadline/deadline_filter.h', 'src/core/ext/transport/chttp2/client/chttp2_connector.h', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h', 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h', 'third_party/nanopb/pb.h', diff --git a/grpc.gemspec b/grpc.gemspec index 1cd6d66335..b96f3cbab5 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -350,8 +350,10 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/filters/client_channel/uri_parser.h ) s.files += %w( src/core/ext/filters/deadline/deadline_filter.h ) s.files += %w( src/core/ext/transport/chttp2/client/chttp2_connector.h ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h ) s.files += %w( third_party/nanopb/pb.h ) @@ -584,8 +586,10 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c ) s.files += %w( src/core/ext/transport/chttp2/client/insecure/channel_create.c ) s.files += %w( src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c ) + s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c ) s.files += %w( src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c ) s.files += %w( third_party/nanopb/pb_common.c ) diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index f334ba61d6..9fe2bbb75e 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -364,28 +364,6 @@ class CallOpRecvMessage { bool allow_not_getting_message_; }; -namespace CallOpGenericRecvMessageHelper { -class DeserializeFunc { - public: - virtual Status Deserialize(grpc_byte_buffer* buf) = 0; - virtual ~DeserializeFunc() {} -}; - -template <class R> -class DeserializeFuncType final : public DeserializeFunc { - public: - DeserializeFuncType(R* message) : message_(message) {} - Status Deserialize(grpc_byte_buffer* buf) override { - return SerializationTraits<R>::Deserialize(buf, message_); - } - - ~DeserializeFuncType() override {} - - private: - R* message_; // Not a managed pointer because management is external to this -}; -} // namespace CallOpGenericRecvMessageHelper - class CallOpGenericRecvMessage { public: CallOpGenericRecvMessage() @@ -393,11 +371,9 @@ class CallOpGenericRecvMessage { template <class R> void RecvMessage(R* message) { - // Use an explicit base class pointer to avoid resolution error in the - // following unique_ptr::reset for some old implementations. - CallOpGenericRecvMessageHelper::DeserializeFunc* func = - new CallOpGenericRecvMessageHelper::DeserializeFuncType<R>(message); - deserialize_.reset(func); + deserialize_ = [message](grpc_byte_buffer* buf) -> Status { + return SerializationTraits<R>::Deserialize(buf, message); + }; } // Do not change status if no message is received. @@ -420,7 +396,7 @@ class CallOpGenericRecvMessage { if (recv_buf_) { if (*status) { got_message = true; - *status = deserialize_->Deserialize(recv_buf_).ok(); + *status = deserialize_(recv_buf_).ok(); } else { got_message = false; g_core_codegen_interface->grpc_byte_buffer_destroy(recv_buf_); @@ -431,11 +407,12 @@ class CallOpGenericRecvMessage { *status = false; } } - deserialize_.reset(); + deserialize_ = DeserializeFunc(); } private: - std::unique_ptr<CallOpGenericRecvMessageHelper::DeserializeFunc> deserialize_; + typedef std::function<Status(grpc_byte_buffer*)> DeserializeFunc; + DeserializeFunc deserialize_; grpc_byte_buffer* recv_buf_; bool allow_not_getting_message_; }; diff --git a/package.xml b/package.xml index e7d67eca18..d47708efa4 100644 --- a/package.xml +++ b/package.xml @@ -359,8 +359,10 @@ <file baseinstalldir="/" name="src/core/ext/filters/client_channel/uri_parser.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/deadline/deadline_filter.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/client/chttp2_connector.h" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" role="src" /> <file baseinstalldir="/" name="third_party/nanopb/pb.h" role="src" /> @@ -593,8 +595,10 @@ <file baseinstalldir="/" name="src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/client/insecure/channel_create.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c" role="src" /> + <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c" role="src" /> <file baseinstalldir="/" name="src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c" role="src" /> <file baseinstalldir="/" name="third_party/nanopb/pb_common.c" role="src" /> diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 652d789ec4..3e14d3e789 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -789,6 +789,7 @@ typedef struct client_channel_call_data { bool pick_pending; grpc_connected_subchannel *connected_subchannel; + grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; grpc_transport_stream_op_batch **waiting_ops; @@ -937,7 +938,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, .path = calld->path, .start_time = calld->call_start_time, .deadline = calld->deadline, - .arena = calld->arena}; + .arena = calld->arena, + .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); gpr_atm_rel_store(&calld->subchannel_call, @@ -966,6 +968,7 @@ typedef struct { grpc_metadata_batch *initial_metadata; uint32_t initial_metadata_flags; grpc_connected_subchannel **connected_subchannel; + grpc_call_context_element *subchannel_call_context; grpc_closure *on_ready; grpc_call_element *elem; grpc_closure closure; @@ -977,7 +980,8 @@ typedef struct { static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); + grpc_connected_subchannel **connected_subchannel, + grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready); static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -987,9 +991,10 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, } else if (error != GRPC_ERROR_NONE) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { - if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready)) { + if (pick_subchannel_locked( + exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, cpa->connected_subchannel, + cpa->subchannel_call_context, cpa->on_ready)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } } @@ -1021,7 +1026,9 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready) { + grpc_connected_subchannel **connected_subchannel, + grpc_call_context_element *subchannel_call_context, + grpc_closure *on_ready) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -1065,8 +1072,8 @@ static bool pick_subchannel_locked( GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping"); w_on_pick_arg->lb_policy = lb_policy; const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, - &w_on_pick_arg->wrapper_closure); + exec_ctx, lb_policy, &inputs, connected_subchannel, + subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy, @@ -1089,6 +1096,7 @@ static bool pick_subchannel_locked( cpa->initial_metadata = initial_metadata; cpa->initial_metadata_flags = initial_metadata_flags; cpa->connected_subchannel = connected_subchannel; + cpa->subchannel_call_context = subchannel_call_context; cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, @@ -1169,7 +1177,8 @@ static void start_transport_stream_op_batch_locked_inner( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata, op->payload->send_initial_metadata.send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step)) { + &calld->connected_subchannel, calld->subchannel_call_context, + &calld->next_step)) { calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { @@ -1185,7 +1194,8 @@ static void start_transport_stream_op_batch_locked_inner( .path = calld->path, .start_time = calld->call_start_time, .deadline = calld->deadline, - .arena = calld->arena}; + .arena = calld->arena, + .context = calld->subchannel_call_context}; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); gpr_atm_rel_store(&calld->subchannel_call, @@ -1340,6 +1350,12 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, "picked"); } + for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { + if (calld->subchannel_call_context[i].value != NULL) { + calld->subchannel_call_context[i].destroy( + calld->subchannel_call_context[i].value); + } + } gpr_free(calld->waiting_ops); grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index 2d31499d13..112ba40658 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -119,9 +119,10 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target, - user_data, on_complete); + context, user_data, on_complete); } void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 25427666ae..fefcb4912c 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -43,9 +43,6 @@ typedef struct grpc_lb_policy grpc_lb_policy; typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable; -typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, - grpc_status_code status, const char *errmsg); - struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_atm ref_pair; @@ -76,7 +73,8 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_pick */ int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete); /** \see grpc_lb_policy_cancel_pick */ @@ -156,6 +154,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, \a target will be set to the selected subchannel, or NULL on failure. Upon success, \a user_data will be set to whatever opaque information may need to be propagated from the LB policy, or NULL if not needed. + \a context will be populated with context to pass to the subchannel + call, if needed. If the pick succeeds and a result is known immediately, a non-zero value will be returned. Otherwise, \a on_complete will be invoked @@ -167,6 +167,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete); /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c new file mode 100644 index 0000000000..67baa46de7 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -0,0 +1,153 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" + +#include <grpc/support/atm.h> +#include <grpc/support/log.h> + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/profiling/timers.h" + +static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + return GRPC_ERROR_NONE; +} + +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) {} + +typedef struct { + // Stats object to update. + grpc_grpclb_client_stats *client_stats; + // State for intercepting send_initial_metadata. + grpc_closure on_complete_for_send; + grpc_closure *original_on_complete_for_send; + bool send_initial_metadata_succeeded; + // State for intercepting recv_initial_metadata. + grpc_closure recv_initial_metadata_ready; + grpc_closure *original_recv_initial_metadata_ready; + bool recv_initial_metadata_succeeded; +} call_data; + +static void on_complete_for_send(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + call_data *calld = arg; + if (error == GRPC_ERROR_NONE) { + calld->send_initial_metadata_succeeded = true; + } + grpc_closure_run(exec_ctx, calld->original_on_complete_for_send, + GRPC_ERROR_REF(error)); +} + +static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + call_data *calld = arg; + if (error == GRPC_ERROR_NONE) { + calld->recv_initial_metadata_succeeded = true; + } + grpc_closure_run(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); +} + +static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_element_args *args) { + call_data *calld = elem->call_data; + // Get stats object from context and take a ref. + GPR_ASSERT(args->context != NULL); + GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL); + calld->client_stats = grpc_grpclb_client_stats_ref( + args->context[GRPC_GRPCLB_CLIENT_STATS].value); + // Record call started. + grpc_grpclb_client_stats_add_call_started(calld->client_stats); + return GRPC_ERROR_NONE; +} + +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const grpc_call_final_info *final_info, + grpc_closure *ignored) { + call_data *calld = elem->call_data; + // Record call finished, optionally setting client_failed_to_send and + // received. + grpc_grpclb_client_stats_add_call_finished( + false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */, + !calld->send_initial_metadata_succeeded /* client_failed_to_send */, + calld->recv_initial_metadata_succeeded /* known_received */, + calld->client_stats); + // All done, so unref the stats object. + grpc_grpclb_client_stats_unref(calld->client_stats); +} + +static void start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { + call_data *calld = elem->call_data; + GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0); + // Intercept send_initial_metadata. + if (batch->send_initial_metadata) { + calld->original_on_complete_for_send = batch->on_complete; + grpc_closure_init(&calld->on_complete_for_send, on_complete_for_send, calld, + grpc_schedule_on_exec_ctx); + batch->on_complete = &calld->on_complete_for_send; + } + // Intercept recv_initial_metadata. + if (batch->recv_initial_metadata) { + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + grpc_closure_init(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, calld, + grpc_schedule_on_exec_ctx); + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; + } + // Chain to next filter. + grpc_call_next_op(exec_ctx, elem, batch); + GPR_TIMER_END("clr_start_transport_stream_op_batch", 0); +} + +const grpc_channel_filter grpc_client_load_reporting_filter = { + start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + 0, // sizeof(channel_data) + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "client_load_reporting"}; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h new file mode 100644 index 0000000000..28b313d874 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_client_load_reporting_filter; + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 9e158a94ad..37468101f0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -108,13 +108,16 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -126,6 +129,7 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/static_metadata.h" #define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 @@ -147,6 +151,10 @@ static grpc_error *initial_metadata_add_lb_token( lb_token_mdelem_storage, lb_token); } +static void destroy_client_stats(void *arg) { + grpc_grpclb_client_stats_unref(arg); +} + typedef struct wrapped_rr_closure_arg { /* the closure instance using this struct as argument */ grpc_closure wrapper_closure; @@ -163,6 +171,13 @@ typedef struct wrapped_rr_closure_arg { * initial metadata */ grpc_connected_subchannel **target; + /* the context to be populated for the subchannel call */ + grpc_call_context_element *context; + + /* Stats for client-side load reporting. Note that this holds a + * reference, which must be either passed on via context or unreffed. */ + grpc_grpclb_client_stats *client_stats; + /* the LB token associated with the pick */ grpc_mdelem lb_token; @@ -202,6 +217,12 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, (void *)*wc_arg->target, (void *)wc_arg->rr_policy); abort(); } + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(wc_arg->client_stats != NULL); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + } else { + grpc_grpclb_client_stats_unref(wc_arg->client_stats); } if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy); @@ -237,6 +258,7 @@ typedef struct pending_pick { static void add_pending_pick(pending_pick **root, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, grpc_closure *on_complete) { pending_pick *pp = gpr_zalloc(sizeof(*pp)); pp->next = *root; @@ -244,6 +266,7 @@ static void add_pending_pick(pending_pick **root, pp->target = target; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; pp->wrapped_on_complete_arg.target = target; + pp->wrapped_on_complete_arg.context = context; pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; @@ -316,6 +339,10 @@ typedef struct glb_lb_policy { /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ + + /* Finished sending initial request. */ + grpc_closure lb_on_sent_initial_request; + /* Status from the LB server has been received. This signals the end of the LB * call. */ grpc_closure lb_on_server_status_received; @@ -348,6 +375,23 @@ typedef struct glb_lb_policy { /** LB call retry timer */ grpc_timer lb_call_retry_timer; + + bool initial_request_sent; + bool seen_initial_response; + + /* Stats for client-side load reporting. Should be unreffed and + * recreated whenever lb_call is replaced. */ + grpc_grpclb_client_stats *client_stats; + /* Interval and timer for next client load report. */ + gpr_timespec client_stats_report_interval; + grpc_timer client_load_report_timer; + bool client_load_report_timer_pending; + bool last_client_load_report_counters_were_zero; + /* Closure used for either the load report timer or the callback for + * completion of sending the load report. */ + grpc_closure client_load_report_closure; + /* Client load report message payload. */ + grpc_byte_buffer *client_load_report_payload; } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -552,8 +596,8 @@ static bool pick_from_internal_rr_locked( grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { GPR_ASSERT(rr_policy != NULL); const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token, - &wc_arg->wrapper_closure); + exec_ctx, rr_policy, pick_args, target, wc_arg->context, + (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { @@ -567,7 +611,12 @@ static bool pick_from_internal_rr_locked( pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); - gpr_free(wc_arg); + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(wc_arg->client_stats != NULL); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + + gpr_free(wc_arg->free_when_done); } /* else, the pending pick will be registered and taken care of by the * pending pick list inside the RR policy (glb_policy->rr_policy). @@ -690,6 +739,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_policy->pending_picks = pp->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; + pp->wrapped_on_complete_arg.client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); @@ -864,9 +915,18 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_uri_destroy(uri); glb_policy->cc_factory = args->client_channel_factory; - glb_policy->args = grpc_channel_args_copy(args->args); GPR_ASSERT(glb_policy->cc_factory != NULL); + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, + // since we use this to trigger the client_load_reporting filter. + grpc_arg new_arg; + new_arg.key = GRPC_ARG_LB_POLICY_NAME; + new_arg.type = GRPC_ARG_STRING; + new_arg.value.string = "grpclb"; + static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; + glb_policy->args = grpc_channel_args_copy_and_add_and_remove( + args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + grpc_slice_hash_table *targets_info = NULL; /* Create a client channel over them to communicate with a LB service */ char *lb_service_target_addresses = @@ -880,6 +940,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, lb_channel_args); gpr_free(lb_service_target_addresses); if (glb_policy->lb_channel == NULL) { + gpr_free((void *)glb_policy->server_name); + grpc_channel_args_destroy(exec_ctx, glb_policy->args); gpr_free(glb_policy); return NULL; } @@ -895,6 +957,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GPR_ASSERT(glb_policy->pending_pings == NULL); gpr_free((void *)glb_policy->server_name); grpc_channel_args_destroy(exec_ctx, glb_policy->args); + if (glb_policy->client_stats != NULL) { + grpc_grpclb_client_stats_unref(glb_policy->client_stats); + } grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); @@ -1011,7 +1076,8 @@ static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; @@ -1039,6 +1105,10 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_schedule_on_exec_ctx); wc_arg->rr_policy = glb_policy->rr_policy; wc_arg->target = target; + wc_arg->context = context; + GPR_ASSERT(glb_policy->client_stats != NULL); + wc_arg->client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); wc_arg->wrapped_closure = on_complete; wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; @@ -1052,7 +1122,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, "picks", (void *)(glb_policy)); } - add_pending_pick(&glb_policy->pending_picks, pick_args, target, + add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, on_complete); if (!glb_policy->started_picking) { @@ -1093,6 +1163,104 @@ static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, exec_ctx, &glb_policy->state_tracker, current, notify); } +static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + +static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + const gpr_timespec next_client_load_report_time = + gpr_time_add(now, glb_policy->client_stats_report_interval); + grpc_closure_init(&glb_policy->client_load_report_closure, + send_client_load_report_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, + next_client_load_report_time, + &glb_policy->client_load_report_closure, now); +} + +static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); + glb_policy->client_load_report_payload = NULL; + if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) { + glb_policy->client_load_report_timer_pending = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "client_load_report"); + return; + } + schedule_next_client_load_report(exec_ctx, glb_policy); +} + +static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = glb_policy->client_load_report_payload; + grpc_closure_init(&glb_policy->client_load_report_closure, + client_load_report_done_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_call_error call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, &op, 1, + &glb_policy->client_load_report_closure); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +static bool load_report_counters_are_zero(grpc_grpclb_request *request) { + return request->client_stats.num_calls_started == 0 && + request->client_stats.num_calls_finished == 0 && + request->client_stats.num_calls_finished_with_drop_for_rate_limiting == + 0 && + request->client_stats + .num_calls_finished_with_drop_for_load_balancing == 0 && + request->client_stats.num_calls_finished_with_client_failed_to_send == + 0 && + request->client_stats.num_calls_finished_known_received == 0; +} + +static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) { + glb_policy->client_load_report_timer_pending = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "client_load_report"); + return; + } + // Construct message payload. + GPR_ASSERT(glb_policy->client_load_report_payload == NULL); + grpc_grpclb_request *request = + grpc_grpclb_load_report_request_create(glb_policy->client_stats); + // Skip client load report if the counters were all zero in the last + // report and they are still zero in this one. + if (load_report_counters_are_zero(request)) { + if (glb_policy->last_client_load_report_counters_were_zero) { + grpc_grpclb_request_destroy(request); + schedule_next_client_load_report(exec_ctx, glb_policy); + return; + } + glb_policy->last_client_load_report_counters_were_zero = true; + } else { + glb_policy->last_client_load_report_counters_were_zero = false; + } + grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); + glb_policy->client_load_report_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(exec_ctx, request_payload_slice); + grpc_grpclb_request_destroy(request); + // If we've already sent the initial request, then we can go ahead and + // sent the load report. Otherwise, we need to wait until the initial + // request has been sent to send this + // (see lb_on_sent_initial_request_locked() below). + if (glb_policy->initial_request_sent) { + do_send_client_load_report_locked(exec_ctx, glb_policy); + } +} + +static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error); static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -1114,6 +1282,11 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, &host, glb_policy->deadline, NULL); grpc_slice_unref_internal(exec_ctx, host); + if (glb_policy->client_stats != NULL) { + grpc_grpclb_client_stats_unref(glb_policy->client_stats); + } + glb_policy->client_stats = grpc_grpclb_client_stats_create(); + grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv); grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv); @@ -1125,6 +1298,9 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_slice_unref_internal(exec_ctx, request_payload_slice); grpc_grpclb_request_destroy(request); + grpc_closure_init(&glb_policy->lb_on_sent_initial_request, + lb_on_sent_initial_request_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); grpc_closure_init(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); @@ -1138,6 +1314,10 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, GRPC_GRPCLB_RECONNECT_JITTER, GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + + glb_policy->initial_request_sent = false; + glb_policy->seen_initial_response = false; + glb_policy->last_client_load_report_counters_were_zero = false; } static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, @@ -1151,6 +1331,10 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, grpc_byte_buffer_destroy(glb_policy->lb_request_payload); grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details); + + if (!glb_policy->client_load_report_timer_pending) { + grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer); + } } /* @@ -1179,21 +1363,27 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; - op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; - GPR_ASSERT(glb_policy->lb_request_payload != NULL); op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message.send_message = glb_policy->lb_request_payload; op->flags = 0; op->reserved = NULL; op++; + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_sent_initial_request); + GPR_ASSERT(GRPC_CALL_OK == call_error); + op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &glb_policy->lb_trailing_metadata_recv; @@ -1225,6 +1415,19 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(GRPC_CALL_OK == call_error); } +static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { + glb_lb_policy *glb_policy = arg; + glb_policy->initial_request_sent = true; + // If we attempted to send a client load report before the initial + // request was sent, send the load report now. + if (glb_policy->client_load_report_payload != NULL) { + do_send_client_load_report_locked(exec_ctx, glb_policy); + } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_response_received_locked"); +} + static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; @@ -1240,58 +1443,91 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload); grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); grpc_byte_buffer_destroy(glb_policy->lb_response_payload); - grpc_grpclb_serverlist *serverlist = - grpc_grpclb_response_parse_serverlist(response_slice); - if (serverlist != NULL) { - GPR_ASSERT(glb_policy->lb_call != NULL); - grpc_slice_unref_internal(exec_ctx, response_slice); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Serverlist with %lu servers received", - (unsigned long)serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_resolved_address addr; - parse_server(serverlist->servers[i], &addr); - char *ipport; - grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); - gpr_free(ipport); + + grpc_grpclb_initial_response *response = NULL; + if (!glb_policy->seen_initial_response && + (response = grpc_grpclb_initial_response_parse(response_slice)) != + NULL) { + if (response->has_client_stats_report_interval) { + glb_policy->client_stats_report_interval = + gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN), + grpc_grpclb_duration_to_timespec( + &response->client_stats_report_interval)); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "received initial LB response message; " + "client load reporting interval = %" PRId64 ".%09d sec", + glb_policy->client_stats_report_interval.tv_sec, + glb_policy->client_stats_report_interval.tv_nsec); } + /* take a weak ref (won't prevent calling of \a glb_shutdown() if the + * strong ref count goes to zero) to be unref'd in + * send_client_load_report() */ + glb_policy->client_load_report_timer_pending = true; + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); + schedule_next_client_load_report(exec_ctx, glb_policy); + } else if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "received initial LB response message; " + "client load reporting NOT enabled"); } + grpc_grpclb_initial_response_destroy(response); + glb_policy->seen_initial_response = true; + } else { + grpc_grpclb_serverlist *serverlist = + grpc_grpclb_response_parse_serverlist(response_slice); + if (serverlist != NULL) { + GPR_ASSERT(glb_policy->lb_call != NULL); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Serverlist with %lu servers received", + (unsigned long)serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + parse_server(serverlist->servers[i], &addr); + char *ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); + gpr_free(ipport); + } + } - /* update serverlist */ - if (serverlist->num_servers > 0) { - if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { + /* update serverlist */ + if (serverlist->num_servers > 0) { + if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, + serverlist)) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Incoming server list identical to current, ignoring."); + } + grpc_grpclb_destroy_serverlist(serverlist); + } else { /* new serverlist */ + if (glb_policy->serverlist != NULL) { + /* dispose of the old serverlist */ + grpc_grpclb_destroy_serverlist(glb_policy->serverlist); + } + /* and update the copy in the glb_lb_policy instance. This + * serverlist instance will be destroyed either upon the next + * update or in glb_destroy() */ + glb_policy->serverlist = serverlist; + + rr_handover_locked(exec_ctx, glb_policy); + } + } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, - "Incoming server list identical to current, ignoring."); + "Received empty server list. Picks will stay pending until " + "a response with > 0 servers is received"); } grpc_grpclb_destroy_serverlist(serverlist); - } else { /* new serverlist */ - if (glb_policy->serverlist != NULL) { - /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(glb_policy->serverlist); - } - /* and update the copy in the glb_lb_policy instance. This serverlist - * instance will be destroyed either upon the next update or in - * glb_destroy() */ - glb_policy->serverlist = serverlist; - - rr_handover_locked(exec_ctx, glb_policy); } - } else { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "Received empty server list. Picks will stay pending until a " - "response with > 0 servers is received"); - } - grpc_grpclb_destroy_serverlist(serverlist); + } else { /* serverlist == NULL */ + gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); } - } else { /* serverlist == NULL */ - gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", - grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); - grpc_slice_unref_internal(exec_ctx, response_slice); } + grpc_slice_unref_internal(exec_ctx, response_slice); + if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; @@ -1403,9 +1639,29 @@ grpc_lb_policy_factory *grpc_glb_lb_factory_create() { } /* Plugin registration */ + +// Only add client_load_reporting filter if the grpclb LB policy is used. +static bool maybe_add_client_load_reporting_filter( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { + const grpc_channel_args *args = + grpc_channel_stack_builder_get_channel_arguments(builder); + const grpc_arg *channel_arg = + grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME); + if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING && + strcmp(channel_arg->value.string, "grpclb") == 0) { + return grpc_channel_stack_builder_append_filter( + builder, (const grpc_channel_filter *)arg, NULL, NULL); + } + return true; +} + void grpc_lb_policy_grpclb_init() { grpc_register_lb_policy(grpc_glb_lb_factory_create()); grpc_register_tracer("glb", &grpc_lb_glb_trace); + grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, + GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_client_load_reporting_filter, + (void *)&grpc_client_load_reporting_filter); } void grpc_lb_policy_grpclb_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c new file mode 100644 index 0000000000..444c03b9aa --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c @@ -0,0 +1,133 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/channel/channel_args.h" + +#define GRPC_ARG_GRPCLB_CLIENT_STATS "grpc.grpclb_client_stats" + +struct grpc_grpclb_client_stats { + gpr_refcount refs; + gpr_atm num_calls_started; + gpr_atm num_calls_finished; + gpr_atm num_calls_finished_with_drop_for_rate_limiting; + gpr_atm num_calls_finished_with_drop_for_load_balancing; + gpr_atm num_calls_finished_with_client_failed_to_send; + gpr_atm num_calls_finished_known_received; +}; + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_create() { + grpc_grpclb_client_stats* client_stats = gpr_zalloc(sizeof(*client_stats)); + gpr_ref_init(&client_stats->refs, 1); + return client_stats; +} + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( + grpc_grpclb_client_stats* client_stats) { + gpr_ref(&client_stats->refs); + return client_stats; +} + +void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) { + if (gpr_unref(&client_stats->refs)) { + gpr_free(client_stats); + } +} + +void grpc_grpclb_client_stats_add_call_started( + grpc_grpclb_client_stats* client_stats) { + gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1); +} + +void grpc_grpclb_client_stats_add_call_finished( + bool finished_with_drop_for_rate_limiting, + bool finished_with_drop_for_load_balancing, + bool finished_with_client_failed_to_send, bool finished_known_received, + grpc_grpclb_client_stats* client_stats) { + gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1); + if (finished_with_drop_for_rate_limiting) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_drop_for_rate_limiting, + (gpr_atm)1); + } + if (finished_with_drop_for_load_balancing) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_drop_for_load_balancing, + (gpr_atm)1); + } + if (finished_with_client_failed_to_send) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_client_failed_to_send, + (gpr_atm)1); + } + if (finished_known_received) { + gpr_atm_full_fetch_add(&client_stats->num_calls_finished_known_received, + (gpr_atm)1); + } +} + +static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) { + *value = (int64_t)gpr_atm_acq_load(counter); + gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value)); +} + +void grpc_grpclb_client_stats_get( + grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, + int64_t* num_calls_finished, + int64_t* num_calls_finished_with_drop_for_rate_limiting, + int64_t* num_calls_finished_with_drop_for_load_balancing, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received) { + atomic_get_and_reset_counter(num_calls_started, + &client_stats->num_calls_started); + atomic_get_and_reset_counter(num_calls_finished, + &client_stats->num_calls_finished); + atomic_get_and_reset_counter( + num_calls_finished_with_drop_for_rate_limiting, + &client_stats->num_calls_finished_with_drop_for_rate_limiting); + atomic_get_and_reset_counter( + num_calls_finished_with_drop_for_load_balancing, + &client_stats->num_calls_finished_with_drop_for_load_balancing); + atomic_get_and_reset_counter( + num_calls_finished_with_client_failed_to_send, + &client_stats->num_calls_finished_with_client_failed_to_send); + atomic_get_and_reset_counter( + num_calls_finished_known_received, + &client_stats->num_calls_finished_known_received); +} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h new file mode 100644 index 0000000000..0af4a919f8 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H + +#include <stdbool.h> + +#include <grpc/impl/codegen/grpc_types.h> + +typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats; + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_create(); +grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( + grpc_grpclb_client_stats* client_stats); +void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats); + +void grpc_grpclb_client_stats_add_call_started( + grpc_grpclb_client_stats* client_stats); +void grpc_grpclb_client_stats_add_call_finished( + bool finished_with_drop_for_rate_limiting, + bool finished_with_drop_for_load_balancing, + bool finished_with_client_failed_to_send, bool finished_known_received, + grpc_grpclb_client_stats* client_stats); + +void grpc_grpclb_client_stats_get( + grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, + int64_t* num_calls_finished, + int64_t* num_calls_finished_with_drop_for_rate_limiting, + int64_t* num_calls_finished_with_drop_for_load_balancing, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 87549b78f0..81b6932fae 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -80,15 +80,45 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name) { grpc_grpclb_request *req = gpr_malloc(sizeof(grpc_grpclb_request)); - - req->has_client_stats = 0; /* TODO(dgq): add support for stats once defined */ - req->has_initial_request = 1; - req->initial_request.has_name = 1; + req->has_client_stats = false; + req->has_initial_request = true; + req->initial_request.has_name = true; strncpy(req->initial_request.name, lb_service_name, GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH); return req; } +static void populate_timestamp(gpr_timespec timestamp, + struct _grpc_lb_v1_Timestamp *timestamp_pb) { + timestamp_pb->has_seconds = true; + timestamp_pb->seconds = timestamp.tv_sec; + timestamp_pb->has_nanos = true; + timestamp_pb->nanos = timestamp.tv_nsec; +} + +grpc_grpclb_request *grpc_grpclb_load_report_request_create( + grpc_grpclb_client_stats *client_stats) { + grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request)); + req->has_client_stats = true; + req->client_stats.has_timestamp = true; + populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); + req->client_stats.has_num_calls_started = true; + req->client_stats.has_num_calls_finished = true; + req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true; + req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_known_received = true; + grpc_grpclb_client_stats_get( + client_stats, &req->client_stats.num_calls_started, + &req->client_stats.num_calls_finished, + &req->client_stats.num_calls_finished_with_drop_for_rate_limiting, + &req->client_stats.num_calls_finished_with_drop_for_load_balancing, + &req->client_stats.num_calls_finished_with_client_failed_to_send, + &req->client_stats.num_calls_finished_known_received); + return req; +} + grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { size_t encoded_length; pb_ostream_t sizestream; @@ -122,6 +152,9 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } + + if (!res.has_initial_response) return NULL; + grpc_grpclb_initial_response *initial_res = gpr_malloc(sizeof(grpc_grpclb_initial_response)); memcpy(initial_res, &res.initial_response, @@ -243,6 +276,15 @@ int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, return 0; } +gpr_timespec grpc_grpclb_duration_to_timespec( + grpc_grpclb_duration *duration_pb) { + gpr_timespec duration; + duration.tv_sec = duration_pb->has_seconds ? duration_pb->seconds : 0; + duration.tv_nsec = duration_pb->has_nanos ? duration_pb->nanos : 0; + duration.clock_type = GPR_TIMESPAN; + return duration; +} + void grpc_grpclb_initial_response_destroy( grpc_grpclb_initial_response *response) { gpr_free(response); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index d014b8800c..06873821bd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -36,6 +36,7 @@ #include <grpc/slice_buffer.h> +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" @@ -58,6 +59,8 @@ typedef struct grpc_grpclb_serverlist { /** Create a request for a gRPC LB service under \a lb_service_name */ grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name); +grpc_grpclb_request *grpc_grpclb_load_report_request_create( + grpc_grpclb_client_stats *client_stats); /** Protocol Buffers v3-encode \a request */ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request); @@ -93,6 +96,9 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, const grpc_grpclb_duration *rhs); +gpr_timespec grpc_grpclb_duration_to_timespec( + grpc_grpclb_duration *duration_pb); + /** Destroy \a initial_response */ void grpc_grpclb_initial_response_destroy( grpc_grpclb_initial_response *response); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index 2b77cd39b8..b1c5dfc61c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -189,7 +189,8 @@ static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index ff41e61b3e..4c17f9c082 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -414,7 +414,8 @@ static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index b2de85c4a1..1af3393a62 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -781,7 +781,7 @@ grpc_error *grpc_connected_subchannel_create_call( (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); const grpc_call_element_args call_args = {.call_stack = callstk, .server_transport_data = NULL, - .context = NULL, + .context = args->context, .path = args->path, .start_time = args->start_time, .deadline = args->deadline, diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 6473de49b0..e433c33e40 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -119,6 +119,7 @@ typedef struct { gpr_timespec start_time; gpr_timespec deadline; gpr_arena *arena; + grpc_call_context_element *context; } grpc_connected_subchannel_call_args; grpc_error *grpc_connected_subchannel_create_call( diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index 6c931ad28a..f0a21113c5 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -50,6 +50,9 @@ typedef enum { /// Reserved for traffic_class_context. GRPC_CONTEXT_TRAFFIC, + /// Value is a \a grpc_grpclb_client_stats. + GRPC_GRPCLB_CLIENT_STATS, + GRPC_CONTEXT_COUNT } grpc_context_index; diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index b9c0fe6d0d..c9f7c42b71 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -179,6 +179,9 @@ namespace Grpc.IntegrationTesting statsResetCount.Increment(); } + GrpcEnvironment.Logger.Info("[ClientRunnerImpl.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, gen3 {3} (histogram reset count:{4}, seconds since reset: {5})", + GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), GC.CollectionCount(3), statsResetCount.Count, secondsElapsed); + // TODO: populate user time and system time return new ClientStats { diff --git a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs index b17b2c2183..486befe964 100644 --- a/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs +++ b/src/csharp/Grpc.IntegrationTesting/QpsWorker.cs @@ -95,10 +95,13 @@ namespace Grpc.IntegrationTesting Ports = { new ServerPort(host, options.DriverPort, ServerCredentials.Insecure )} }; int boundPort = server.Ports.Single().BoundPort; - Console.WriteLine("Running qps worker server on " + string.Format("{0}:{1}", host, boundPort)); + GrpcEnvironment.Logger.Info("Running qps worker server on {0}:{1}", host, boundPort); server.Start(); await tcs.Task; await server.ShutdownAsync(); + + GrpcEnvironment.Logger.Info("GC collection counts (after shutdown): gen0 {0}, gen1 {1}, gen2 {2}, gen3 {3}", + GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), GC.CollectionCount(3)); } } } diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs index 8689d188ae..7ab7734700 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs @@ -154,6 +154,9 @@ namespace Grpc.IntegrationTesting { var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds; + GrpcEnvironment.Logger.Info("[ServerRunner.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, gen3 {3} (seconds since last reset {4})", + GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), GC.CollectionCount(3), secondsElapsed); + // TODO: populate user time and system time return new ServerStats { diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index d2a570cc87..42c13b2cb6 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -280,8 +280,10 @@ CORE_SOURCE_FILES = [ 'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'third_party/nanopb/pb_common.c', diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 0bd41d153b..e867493fb9 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -198,6 +198,7 @@ cc_test( ], ) + cc_test( name = "grpclb_end2end_test", srcs = ["grpclb_end2end_test.cc"], diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 8ebeba3522..30e1a1e0c9 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -147,12 +147,38 @@ grpc::string Ip4ToPackedString(const char* ip_str) { return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4)); } +struct ClientStats { + size_t num_calls_started = 0; + size_t num_calls_finished = 0; + size_t num_calls_finished_with_drop_for_rate_limiting = 0; + size_t num_calls_finished_with_drop_for_load_balancing = 0; + size_t num_calls_finished_with_client_failed_to_send = 0; + size_t num_calls_finished_known_received = 0; + + ClientStats& operator+=(const ClientStats& other) { + num_calls_started += other.num_calls_started; + num_calls_finished += other.num_calls_finished; + num_calls_finished_with_drop_for_rate_limiting += + other.num_calls_finished_with_drop_for_rate_limiting; + num_calls_finished_with_drop_for_load_balancing += + other.num_calls_finished_with_drop_for_load_balancing; + num_calls_finished_with_client_failed_to_send += + other.num_calls_finished_with_client_failed_to_send; + num_calls_finished_known_received += + other.num_calls_finished_known_received; + return *this; + } +}; + class BalancerServiceImpl : public BalancerService { public: using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>; using ResponseDelayPair = std::pair<LoadBalanceResponse, int>; - BalancerServiceImpl() : shutdown_(false) {} + explicit BalancerServiceImpl(int client_load_reporting_interval_seconds) + : client_load_reporting_interval_seconds_( + client_load_reporting_interval_seconds), + shutdown_(false) {} Status BalanceLoad(ServerContext* context, Stream* stream) override { LoadBalanceRequest request; @@ -160,16 +186,49 @@ class BalancerServiceImpl : public BalancerService { IncreaseRequestCount(); gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str()); + if (client_load_reporting_interval_seconds_ > 0) { + LoadBalanceResponse initial_response; + initial_response.mutable_initial_response() + ->mutable_client_stats_report_interval() + ->set_seconds(client_load_reporting_interval_seconds_); + stream->Write(initial_response); + } + std::vector<ResponseDelayPair> responses_and_delays; { std::unique_lock<std::mutex> lock(mu_); responses_and_delays = responses_and_delays_; } - for (const auto& response_and_delay : responses_and_delays) { if (shutdown_) break; SendResponse(stream, response_and_delay.first, response_and_delay.second); } + + if (client_load_reporting_interval_seconds_ > 0) { + request.Clear(); + stream->Read(&request); + gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'", + request.DebugString().c_str()); + GPR_ASSERT(request.has_client_stats()); + client_stats_.num_calls_started += + request.client_stats().num_calls_started(); + client_stats_.num_calls_finished += + request.client_stats().num_calls_finished(); + client_stats_.num_calls_finished_with_drop_for_rate_limiting += + request.client_stats() + .num_calls_finished_with_drop_for_rate_limiting(); + client_stats_.num_calls_finished_with_drop_for_load_balancing += + request.client_stats() + .num_calls_finished_with_drop_for_load_balancing(); + client_stats_.num_calls_finished_with_client_failed_to_send += + request.client_stats() + .num_calls_finished_with_client_failed_to_send(); + client_stats_.num_calls_finished_known_received += + request.client_stats().num_calls_finished_known_received(); + std::lock_guard<std::mutex> lock(mu_); + cond_.notify_one(); + } + return Status::OK; } @@ -194,6 +253,12 @@ class BalancerServiceImpl : public BalancerService { return response; } + const ClientStats& WaitForLoadReport() { + std::unique_lock<std::mutex> lock(mu_); + cond_.wait(lock); + return client_stats_; + } + private: void SendResponse(Stream* stream, const LoadBalanceResponse& response, int delay_ms) { @@ -206,16 +271,23 @@ class BalancerServiceImpl : public BalancerService { IncreaseResponseCount(); } + const int client_load_reporting_interval_seconds_; std::vector<ResponseDelayPair> responses_and_delays_; + std::mutex mu_; + std::condition_variable cond_; + ClientStats client_stats_; bool shutdown_; }; class GrpclbEnd2endTest : public ::testing::Test { protected: - GrpclbEnd2endTest(int num_backends, int num_balancers) + GrpclbEnd2endTest(int num_backends, int num_balancers, + int client_load_reporting_interval_seconds) : server_host_("localhost"), num_backends_(num_backends), - num_balancers_(num_balancers) {} + num_balancers_(num_balancers), + client_load_reporting_interval_seconds_( + client_load_reporting_interval_seconds) {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -227,7 +299,8 @@ class GrpclbEnd2endTest : public ::testing::Test { } // Start the load balancers. for (size_t i = 0; i < num_balancers_; ++i) { - balancers_.emplace_back(new BalancerServiceImpl()); + balancers_.emplace_back( + new BalancerServiceImpl(client_load_reporting_interval_seconds_)); balancer_servers_.emplace_back(ServerThread<BalancerService>( "balancer", server_host_, balancers_.back().get())); } @@ -261,6 +334,14 @@ class GrpclbEnd2endTest : public ::testing::Test { stub_ = grpc::testing::EchoTestService::NewStub(channel_); } + ClientStats WaitForLoadReports() { + ClientStats client_stats; + for (const auto& balancer : balancers_) { + client_stats += balancer->WaitForLoadReport(); + } + return client_stats; + } + struct AddressData { int port; bool is_balancer; @@ -367,6 +448,7 @@ class GrpclbEnd2endTest : public ::testing::Test { const grpc::string server_host_; const size_t num_backends_; const size_t num_balancers_; + const int client_load_reporting_interval_seconds_; std::shared_ptr<Channel> channel_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; @@ -381,7 +463,7 @@ class GrpclbEnd2endTest : public ::testing::Test { class SingleBalancerTest : public GrpclbEnd2endTest { public: - SingleBalancerTest() : GrpclbEnd2endTest(4, 1) {} + SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {} }; TEST_F(SingleBalancerTest, Vanilla) { @@ -505,6 +587,41 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { + public: + SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {} +}; + +TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0); + // Start servers and send 100 RPCs per server. + const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_); + + for (const auto& status_and_response : statuses_and_responses) { + EXPECT_TRUE(status_and_response.first.ok()); + EXPECT_EQ(status_and_response.second.message(), kMessage_); + } + + // Each backend should have gotten 100 requests. + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(100, backend_servers_[i].service_->request_count()); + } + // The balancer got a single request. + EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1, balancer_servers_[0].service_->response_count()); + + const ClientStats client_stats = WaitForLoadReports(); + EXPECT_EQ(100 * num_backends_, client_stats.num_calls_started); + EXPECT_EQ(100 * num_backends_, client_stats.num_calls_finished); + EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting); + EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing); + EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); + EXPECT_EQ(100 * num_backends_, + client_stats.num_calls_finished_known_received); +} + } // namespace } // namespace testing } // namespace grpc diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index f49c2de76c..51d77c2b52 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -910,10 +910,14 @@ src/core/ext/filters/client_channel/http_proxy.c \ src/core/ext/filters/client_channel/http_proxy.h \ src/core/ext/filters/client_channel/lb_policy.c \ src/core/ext/filters/client_channel/lb_policy.h \ +src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c \ +src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c \ +src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c \ +src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c \ src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h \ src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \ diff --git a/tools/internal_ci/linux/grpc_master.sh b/tools/internal_ci/linux/grpc_master.sh index 9ecf123959..ec4bb2bda1 100755 --- a/tools/internal_ci/linux/grpc_master.sh +++ b/tools/internal_ci/linux/grpc_master.sh @@ -47,12 +47,4 @@ git submodule update --init # download docker images from dockerhub export DOCKERHUB_ORGANIZATION=grpctesting -tools/run_tests/run_tests.py -l c -t -x sponge_log.xml || FAILED="true" - -# kill port_server.py to prevent the build from hanging -ps aux | grep port_server\\.py | awk '{print $2}' | xargs kill -9 - -if [ "$FAILED" != "" ] -then - exit 1 -fi +tools/run_tests/run_tests_matrix.py -f basictests linux --internal_ci diff --git a/tools/internal_ci/macos/grpc_master.cfg b/tools/internal_ci/macos/grpc_master.cfg new file mode 100644 index 0000000000..039c99ec42 --- /dev/null +++ b/tools/internal_ci/macos/grpc_master.cfg @@ -0,0 +1,39 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Config file for the internal CI (in protobuf text format) + +# Location of the continuous shell script in repository. +build_file: "grpc/tools/internal_ci/macos/grpc_master.sh" +timeout_mins: 240 +action { + define_artifacts { + regex: "**/*sponge_log.xml" + } +} diff --git a/tools/internal_ci/macos/grpc_master.sh b/tools/internal_ci/macos/grpc_master.sh new file mode 100755 index 0000000000..4ce1af73a5 --- /dev/null +++ b/tools/internal_ci/macos/grpc_master.sh @@ -0,0 +1,46 @@ +#!/bin/bash +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +set -ex + +# change to grpc repo root +cd $(dirname $0)/../../.. + +git submodule update --init + +tools/run_tests/run_tests_matrix.py -f basictests macos --internal_ci || FAILED="true" + +# kill port_server.py to prevent the build from hanging +ps aux | grep port_server\\.py | awk '{print $2}' | xargs kill -9 + +if [ "$FAILED" != "" ] +then + exit 1 +fi diff --git a/tools/internal_ci/windows/grpc_master.bat b/tools/internal_ci/windows/grpc_master.bat index 8943390a8d..b6f3c8790f 100644 --- a/tools/internal_ci/windows/grpc_master.bat +++ b/tools/internal_ci/windows/grpc_master.bat @@ -36,7 +36,7 @@ cd /d %~dp0\..\..\.. git submodule update --init -python tools/run_tests/run_tests_matrix.py -f basictests windows -j 1 --inner_jobs 8 || goto :error +python tools/run_tests/run_tests_matrix.py -f basictests windows -j 1 --inner_jobs 8 --internal_ci || goto :error goto :EOF :error diff --git a/tools/internal_ci/windows/grpc_portability_master.bat b/tools/internal_ci/windows/grpc_portability_master.bat index b98c70146c..789808664b 100644 --- a/tools/internal_ci/windows/grpc_portability_master.bat +++ b/tools/internal_ci/windows/grpc_portability_master.bat @@ -36,7 +36,7 @@ cd /d %~dp0\..\..\.. git submodule update --init -python tools/run_tests/run_tests_matrix.py -f portability windows -j 1 --inner_jobs 8 || goto :error +python tools/run_tests/run_tests_matrix.py -f portability windows -j 1 --inner_jobs 8 --internal_ci || goto :error goto :EOF :error diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index adaf5481b2..c962823437 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -8234,8 +8234,10 @@ "nanopb" ], "headers": [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" ], @@ -8243,10 +8245,14 @@ "language": "c", "name": "grpc_lb_policy_grpclb", "src": [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", @@ -8264,8 +8270,10 @@ "nanopb" ], "headers": [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" ], @@ -8273,10 +8281,14 @@ "language": "c", "name": "grpc_lb_policy_grpclb_secure", "src": [ + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c", + "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c", "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h", "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", diff --git a/tools/run_tests/helper_scripts/pre_build_cmake.bat b/tools/run_tests/helper_scripts/pre_build_cmake.bat index c937b9e09f..c721e1624d 100644 --- a/tools/run_tests/helper_scripts/pre_build_cmake.bat +++ b/tools/run_tests/helper_scripts/pre_build_cmake.bat @@ -37,7 +37,10 @@ mkdir build cd build @rem TODO(jtattermusch): Stop hardcoding path to yasm once Jenkins workers can locate yasm correctly -cmake -G "Visual Studio 14 2015" -DgRPC_BUILD_TESTS=ON -DCMAKE_ASM_NASM_COMPILER="C:/Program Files (x86)/yasm/yasm.exe" ../.. || goto :error +@rem If yasm is not on the path, use hardcoded path instead. +yasm --version || set USE_HARDCODED_YASM_PATH_MAYBE=-DCMAKE_ASM_NASM_COMPILER="C:/Program Files (x86)/yasm/yasm.exe" + +cmake -G "Visual Studio 14 2015" -DgRPC_BUILD_TESTS=ON %USE_HARDCODED_YASM_PATH_MAYBE% ../.. || goto :error endlocal diff --git a/tools/run_tests/helper_scripts/pre_build_csharp.bat b/tools/run_tests/helper_scripts/pre_build_csharp.bat index e59dac4edc..47ebd8bee3 100644 --- a/tools/run_tests/helper_scripts/pre_build_csharp.bat +++ b/tools/run_tests/helper_scripts/pre_build_csharp.bat @@ -42,8 +42,12 @@ mkdir build cd build mkdir %ARCHITECTURE% cd %ARCHITECTURE% + @rem TODO(jtattermusch): Stop hardcoding path to yasm once Jenkins workers can locate yasm correctly -cmake -G "Visual Studio 14 2015" -A %ARCHITECTURE% -DgRPC_BUILD_TESTS=OFF -DgRPC_MSVC_STATIC_RUNTIME=ON -DCMAKE_ASM_NASM_COMPILER="C:/Program Files (x86)/yasm/yasm.exe" ../../.. || goto :error +@rem If yasm is not on the path, use hardcoded path instead. +yasm --version || set USE_HARDCODED_YASM_PATH_MAYBE=-DCMAKE_ASM_NASM_COMPILER="C:/Program Files (x86)/yasm/yasm.exe" + +cmake -G "Visual Studio 14 2015" -A %ARCHITECTURE% -DgRPC_BUILD_TESTS=OFF -DgRPC_MSVC_STATIC_RUNTIME=ON %USE_HARDCODED_YASM_PATH_MAYBE% ../../.. || goto :error cd ..\..\..\src\csharp diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index ce0808829f..08411bf9a4 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -410,6 +410,21 @@ class CSharpLanguage: categories=[SMOKETEST, SCALABLE]) yield _ping_pong_scenario( + 'csharp_generic_async_streaming_ping_pong_insecure_1MB', rpc_type='STREAMING', + client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', + req_size=1024*1024, resp_size=1024*1024, + use_generic_payload=True, + secure=False, + categories=[SMOKETEST, SCALABLE]) + + yield _ping_pong_scenario( + 'csharp_generic_async_streaming_qps_unconstrained_insecure', rpc_type='STREAMING', + client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', + unconstrained_client='async', use_generic_payload=True, + secure=False, + categories=[SMOKETEST, SCALABLE]) + + yield _ping_pong_scenario( 'csharp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER') @@ -469,7 +484,6 @@ class CSharpLanguage: req_size=1024*1024, resp_size=1024*1024, categories=[SMOKETEST, SCALABLE]) - def __str__(self): return 'csharp' diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 71520098a6..86b5856d68 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -475,8 +475,10 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\uri_parser.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\deadline\deadline_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\client\chttp2_connector.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\proto\grpc\lb\v1\load_balancer.pb.h" /> <ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb.h" /> @@ -915,10 +917,14 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\client\insecure\channel_create_posix.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel_secure.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\proto\grpc\lb\v1\load_balancer.pb.c"> diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index de2dfe67e6..943ad521f7 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -613,12 +613,18 @@ <ClCompile Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\client\insecure\channel_create_posix.c"> <Filter>src\core\ext\transport\chttp2\client\insecure</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.c"> + <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.c"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel_secure.c"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.c"> + <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.c"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClCompile> @@ -1334,12 +1340,18 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\transport\chttp2\client\chttp2_connector.h"> <Filter>src\core\ext\transport\chttp2\client</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h"> + <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.h"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.h"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.h"> + <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.h"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClInclude> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 0bfda72e81..2db0ffa692 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -444,8 +444,10 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\resolver\dns\c_ares\grpc_ares_wrapper.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.h" /> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.h" /> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\proto\grpc\lb\v1\load_balancer.pb.h" /> <ClInclude Include="$(SolutionDir)\..\third_party\nanopb\pb.h" /> @@ -836,10 +838,14 @@ </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.c"> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.c"> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.c"> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\proto\grpc\lb\v1\load_balancer.pb.c"> diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 63c8d7f254..c7d6670db1 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -547,12 +547,18 @@ <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.c"> <Filter>src\core\ext\filters\load_reporting</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.c"> + <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.c"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.c"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClCompile> + <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.c"> + <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> + </ClCompile> <ClCompile Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.c"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClCompile> @@ -1181,12 +1187,18 @@ <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\load_reporting\load_reporting_filter.h"> <Filter>src\core\ext\filters\load_reporting</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\client_load_reporting_filter.h"> + <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb.h"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_channel.h"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClInclude> + <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\grpclb_client_stats.h"> + <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> + </ClInclude> <ClInclude Include="$(SolutionDir)\..\src\core\ext\filters\client_channel\lb_policy\grpclb\load_balancer_api.h"> <Filter>src\core\ext\filters\client_channel\lb_policy\grpclb</Filter> </ClInclude> |