author     Mark D. Roth <roth@google.com>    2017-05-02 08:13:26 -0700
committer  Mark D. Roth <roth@google.com>    2017-05-02 08:13:26 -0700
commit     09e458c6cdc6967bae58739131acd208f3738a27 (patch)
tree       9366b2255a6088f5869d4907101695057e8a4bd6 /src
parent     d8696e10238a7e35e25f363e16f921c6c642069a (diff)
Implement client-side load reporting for grpclb.
Diffstat (limited to 'src')
16 files changed, 794 insertions, 73 deletions
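
At the heart of the change is a ref-counted stats object that the grpclb policy shares with every call it hands out: each pick takes a ref, finished calls bump counters, and the load-report path periodically snapshots and resets those counters. The following is a minimal standalone sketch of that lifecycle; it uses C11 atomics and illustrative names (client_stats, stats_ref) rather than gRPC's internal types.

/* Illustrative model of the grpc_grpclb_client_stats lifecycle added by
 * this commit: one shared, ref-counted object; calls record into it; the
 * load reporter snapshots-and-resets it. Not gRPC's actual types. */
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
  atomic_int refs;
  atomic_long calls_started;
  atomic_long calls_finished;
} client_stats;

static client_stats *stats_create(void) {
  client_stats *s = calloc(1, sizeof(*s));
  atomic_store(&s->refs, 1);
  return s;
}

static client_stats *stats_ref(client_stats *s) {
  atomic_fetch_add(&s->refs, 1);
  return s;
}

static void stats_unref(client_stats *s) {
  if (atomic_fetch_sub(&s->refs, 1) == 1) free(s);
}

/* Snapshot-and-reset, the job grpc_grpclb_client_stats_get() does. */
static long snapshot(atomic_long *counter) {
  return atomic_exchange(counter, 0);
}

int main(void) {
  client_stats *shared = stats_create();
  client_stats *per_call = stats_ref(shared); /* each pick takes a ref */
  atomic_fetch_add(&per_call->calls_started, 1);
  atomic_fetch_add(&per_call->calls_finished, 1);
  stats_unref(per_call); /* call teardown drops its ref */
  printf("started=%ld finished=%ld\n", snapshot(&shared->calls_started),
         snapshot(&shared->calls_finished));
  stats_unref(shared);
  return 0;
}

The commit's grpc_grpclb_client_stats_get() takes the same snapshot with an acquire load followed by a negative fetch-add; the single atomic_exchange above is a simplification of that two-step reset.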
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 0463b25412..95578d989c 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -795,6 +795,7 @@ typedef struct client_channel_call_data { subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; + grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; grpc_transport_stream_op_batch **waiting_ops; @@ -944,7 +945,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, .path = calld->path, .start_time = calld->call_start_time, .deadline = calld->deadline, - .arena = calld->arena}; + .arena = calld->arena, + .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); gpr_atm_rel_store(&calld->subchannel_call, @@ -973,6 +975,7 @@ typedef struct { grpc_metadata_batch *initial_metadata; uint32_t initial_metadata_flags; grpc_connected_subchannel **connected_subchannel; + grpc_call_context_element *subchannel_call_context; grpc_closure *on_ready; grpc_call_element *elem; grpc_closure closure; @@ -984,7 +987,8 @@ typedef struct { static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_connected_subchannel **connected_subchannel, + grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready, grpc_error *error); static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -995,10 +999,10 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, } else if (error != GRPC_ERROR_NONE) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { - if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready, - GRPC_ERROR_NONE)) { + if (pick_subchannel_locked( + exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, cpa->connected_subchannel, + cpa->subchannel_call_context, cpa->on_ready, GRPC_ERROR_NONE)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } } @@ -1008,7 +1012,8 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_connected_subchannel **connected_subchannel, + grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready, grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); @@ -1076,8 +1081,8 @@ static bool pick_subchannel_locked( GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping"); w_on_pick_arg->lb_policy = lb_policy; const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, - &w_on_pick_arg->wrapper_closure); + exec_ctx, lb_policy, &inputs, connected_subchannel, + subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. 
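
The client_channel changes above thread a grpc_call_context_element array from the call data into the pick, so an LB policy can attach per-call state that the channel destroys with the call (the new loop in cc_destroy_call_elem). A standalone model of that contract, with illustrative names standing in for the real types:

/* A fixed-size array of {value, destroy} slots indexed by an enum, as in
 * src/core/lib/channel/context.h. Names here are illustrative. */
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

typedef enum { CTX_LB_STATS, CTX_COUNT } ctx_index;
typedef struct {
  void *value;
  void (*destroy)(void *value);
} ctx_element;

static void destroy_stats(void *value) { free(value); }

/* What the LB policy does at pick time: populate a slot, handing
 * ownership of the value to the context. */
static void pick(ctx_element *ctx) {
  ctx[CTX_LB_STATS].value = malloc(16);
  ctx[CTX_LB_STATS].destroy = destroy_stats;
}

/* What cc_destroy_call_elem() now does: destroy every populated slot. */
static void destroy_call(ctx_element *ctx) {
  for (size_t i = 0; i < CTX_COUNT; ++i) {
    if (ctx[i].value != NULL) ctx[i].destroy(ctx[i].value);
  }
}

int main(void) {
  ctx_element ctx[CTX_COUNT] = {{NULL, NULL}};
  pick(ctx);
  destroy_call(ctx);
  puts("per-call context cleaned up");
  return 0;
}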
*/ GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy, @@ -1100,6 +1105,7 @@ static bool pick_subchannel_locked( cpa->initial_metadata = initial_metadata; cpa->initial_metadata_flags = initial_metadata_flags; cpa->connected_subchannel = connected_subchannel; + cpa->subchannel_call_context = subchannel_call_context; cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, @@ -1158,7 +1164,7 @@ static void start_transport_stream_op_batch_locked_inner( break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: pick_subchannel_locked( - exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL, + exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL, NULL, GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); break; } @@ -1183,7 +1189,8 @@ static void start_transport_stream_op_batch_locked_inner( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata, op->payload->send_initial_metadata.send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step, GRPC_ERROR_NONE)) { + &calld->connected_subchannel, calld->subchannel_call_context, + &calld->next_step, GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { @@ -1200,7 +1207,8 @@ static void start_transport_stream_op_batch_locked_inner( .path = calld->path, .start_time = calld->call_start_time, .deadline = calld->deadline, - .arena = calld->arena}; + .arena = calld->arena, + .context = calld->subchannel_call_context}; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); gpr_atm_rel_store(&calld->subchannel_call, @@ -1355,6 +1363,12 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, "picked"); } + for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { + if (calld->subchannel_call_context[i].value != NULL) { + calld->subchannel_call_context[i].destroy( + calld->subchannel_call_context[i].value); + } + } gpr_free(calld->waiting_ops); grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index 2d31499d13..112ba40658 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -119,9 +119,10 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target, - user_data, on_complete); + context, user_data, on_complete); } void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 25427666ae..fefcb4912c 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -43,9 +43,6 @@ typedef struct grpc_lb_policy grpc_lb_policy; typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable; -typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, - grpc_status_code status, const char *errmsg); - struct grpc_lb_policy { 
const grpc_lb_policy_vtable *vtable; gpr_atm ref_pair; @@ -76,7 +73,8 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_pick */ int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete); /** \see grpc_lb_policy_cancel_pick */ @@ -156,6 +154,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, \a target will be set to the selected subchannel, or NULL on failure. Upon success, \a user_data will be set to whatever opaque information may need to be propagated from the LB policy, or NULL if not needed. + \a context will be populated with context to pass to the subchannel + call, if needed. If the pick succeeds and a result is known immediately, a non-zero value will be returned. Otherwise, \a on_complete will be invoked @@ -167,6 +167,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete); /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c new file mode 100644 index 0000000000..67baa46de7 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -0,0 +1,153 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" + +#include <grpc/support/atm.h> +#include <grpc/support/log.h> + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/profiling/timers.h" + +static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + return GRPC_ERROR_NONE; +} + +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) {} + +typedef struct { + // Stats object to update. + grpc_grpclb_client_stats *client_stats; + // State for intercepting send_initial_metadata. + grpc_closure on_complete_for_send; + grpc_closure *original_on_complete_for_send; + bool send_initial_metadata_succeeded; + // State for intercepting recv_initial_metadata. + grpc_closure recv_initial_metadata_ready; + grpc_closure *original_recv_initial_metadata_ready; + bool recv_initial_metadata_succeeded; +} call_data; + +static void on_complete_for_send(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + call_data *calld = arg; + if (error == GRPC_ERROR_NONE) { + calld->send_initial_metadata_succeeded = true; + } + grpc_closure_run(exec_ctx, calld->original_on_complete_for_send, + GRPC_ERROR_REF(error)); +} + +static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + call_data *calld = arg; + if (error == GRPC_ERROR_NONE) { + calld->recv_initial_metadata_succeeded = true; + } + grpc_closure_run(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); +} + +static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_element_args *args) { + call_data *calld = elem->call_data; + // Get stats object from context and take a ref. + GPR_ASSERT(args->context != NULL); + GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL); + calld->client_stats = grpc_grpclb_client_stats_ref( + args->context[GRPC_GRPCLB_CLIENT_STATS].value); + // Record call started. + grpc_grpclb_client_stats_add_call_started(calld->client_stats); + return GRPC_ERROR_NONE; +} + +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const grpc_call_final_info *final_info, + grpc_closure *ignored) { + call_data *calld = elem->call_data; + // Record call finished, optionally setting client_failed_to_send and + // received. + grpc_grpclb_client_stats_add_call_finished( + false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */, + !calld->send_initial_metadata_succeeded /* client_failed_to_send */, + calld->recv_initial_metadata_succeeded /* known_received */, + calld->client_stats); + // All done, so unref the stats object. + grpc_grpclb_client_stats_unref(calld->client_stats); +} + +static void start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { + call_data *calld = elem->call_data; + GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0); + // Intercept send_initial_metadata. + if (batch->send_initial_metadata) { + calld->original_on_complete_for_send = batch->on_complete; + grpc_closure_init(&calld->on_complete_for_send, on_complete_for_send, calld, + grpc_schedule_on_exec_ctx); + batch->on_complete = &calld->on_complete_for_send; + } + // Intercept recv_initial_metadata. 
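
The new filter adds no metadata of its own; it only needs to learn whether the client's initial metadata went out and whether the server's initial metadata came back. It does so with the interception pattern visible above: save the original callback, install a wrapper, record the outcome, then chain through. A self-contained sketch of that pattern, with a simplified closure type in place of grpc_closure:

#include <stdbool.h>
#include <stdio.h>

/* Simplified closure: a callback plus its argument. */
typedef struct closure {
  void (*cb)(void *arg, bool ok);
  void *arg;
} closure;

typedef struct {
  closure on_complete_for_send;
  closure *original_on_complete_for_send;
  bool send_initial_metadata_succeeded;
} call_data;

static void run(closure *c, bool ok) { c->cb(c->arg, ok); }

/* The filter's wrapper: record the outcome, then chain to the original. */
static void on_complete_for_send(void *arg, bool ok) {
  call_data *calld = arg;
  if (ok) calld->send_initial_metadata_succeeded = true;
  run(calld->original_on_complete_for_send, ok);
}

static void app_on_complete(void *arg, bool ok) {
  (void)arg;
  printf("application sees ok=%d\n", ok);
}

int main(void) {
  call_data calld = {0};
  closure app = {app_on_complete, NULL};
  /* Intercept: remember the original closure and install the wrapper. */
  calld.original_on_complete_for_send = &app;
  calld.on_complete_for_send = (closure){on_complete_for_send, &calld};
  /* The transport completes the batch through the wrapper. */
  run(&calld.on_complete_for_send, true);
  printf("send_initial_metadata_succeeded=%d\n",
         calld.send_initial_metadata_succeeded);
  return 0;
}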
+ if (batch->recv_initial_metadata) { + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + grpc_closure_init(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, calld, + grpc_schedule_on_exec_ctx); + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; + } + // Chain to next filter. + grpc_call_next_op(exec_ctx, elem, batch); + GPR_TIMER_END("clr_start_transport_stream_op_batch", 0); +} + +const grpc_channel_filter grpc_client_load_reporting_filter = { + start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + 0, // sizeof(channel_data) + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "client_load_reporting"}; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h new file mode 100644 index 0000000000..28b313d874 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_client_load_reporting_filter; + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 9e158a94ad..37468101f0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -108,13 +108,16 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -126,6 +129,7 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/static_metadata.h" #define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 @@ -147,6 +151,10 @@ static grpc_error *initial_metadata_add_lb_token( lb_token_mdelem_storage, lb_token); } +static void destroy_client_stats(void *arg) { + grpc_grpclb_client_stats_unref(arg); +} + typedef struct wrapped_rr_closure_arg { /* the closure instance using this struct as argument */ grpc_closure wrapper_closure; @@ -163,6 +171,13 @@ typedef struct wrapped_rr_closure_arg { * initial metadata */ grpc_connected_subchannel **target; + /* the context to be populated for the subchannel call */ + grpc_call_context_element *context; + + /* Stats for client-side load reporting. Note that this holds a + * reference, which must be either passed on via context or unreffed. */ + grpc_grpclb_client_stats *client_stats; + /* the LB token associated with the pick */ grpc_mdelem lb_token; @@ -202,6 +217,12 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, (void *)*wc_arg->target, (void *)wc_arg->rr_policy); abort(); } + // Pass on client stats via context. Passes ownership of the reference. 
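
The "passes ownership" comment above marks the commit's one subtle ownership rule: the wrapper argument holds exactly one ref to the stats object, and exactly one of two branches must consume it, handing it to the call context on a successful pick or unreffing it on failure. A standalone sketch of that rule, with a toy non-atomic refcount in place of gpr_refcount:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct { int refs; } stats;

static stats *stats_ref(stats *s) { ++s->refs; return s; }
static void stats_unref(stats *s) {
  if (--s->refs == 0) { free(s); puts("stats freed"); }
}
static void destroy_stats(void *p) { stats_unref(p); }

typedef struct { void *value; void (*destroy)(void *); } ctx_element;

/* One ref travels with the pick; exactly one branch consumes it, so the
 * count balances whether or not the pick succeeded. */
static void on_pick_complete(bool pick_succeeded, stats *held_ref,
                             ctx_element *ctx_slot) {
  if (pick_succeeded) {
    ctx_slot->value = held_ref;  /* ownership moves into the call context */
    ctx_slot->destroy = destroy_stats;
  } else {
    stats_unref(held_ref);       /* pick failed: drop the ref here */
  }
}

int main(void) {
  stats *s = calloc(1, sizeof(*s));
  s->refs = 1;                                  /* the LB policy's own ref */
  ctx_element slot = {NULL, NULL};
  on_pick_complete(true, stats_ref(s), &slot);  /* success path */
  slot.destroy(slot.value);                     /* call teardown */
  on_pick_complete(false, stats_ref(s), &slot); /* failure path */
  stats_unref(s);                               /* policy shutdown */
  return 0;
}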
+ GPR_ASSERT(wc_arg->client_stats != NULL); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + } else { + grpc_grpclb_client_stats_unref(wc_arg->client_stats); } if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy); @@ -237,6 +258,7 @@ typedef struct pending_pick { static void add_pending_pick(pending_pick **root, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, grpc_closure *on_complete) { pending_pick *pp = gpr_zalloc(sizeof(*pp)); pp->next = *root; @@ -244,6 +266,7 @@ static void add_pending_pick(pending_pick **root, pp->target = target; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; pp->wrapped_on_complete_arg.target = target; + pp->wrapped_on_complete_arg.context = context; pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; @@ -316,6 +339,10 @@ typedef struct glb_lb_policy { /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ + + /* Finished sending initial request. */ + grpc_closure lb_on_sent_initial_request; + /* Status from the LB server has been received. This signals the end of the LB * call. */ grpc_closure lb_on_server_status_received; @@ -348,6 +375,23 @@ typedef struct glb_lb_policy { /** LB call retry timer */ grpc_timer lb_call_retry_timer; + + bool initial_request_sent; + bool seen_initial_response; + + /* Stats for client-side load reporting. Should be unreffed and + * recreated whenever lb_call is replaced. */ + grpc_grpclb_client_stats *client_stats; + /* Interval and timer for next client load report. */ + gpr_timespec client_stats_report_interval; + grpc_timer client_load_report_timer; + bool client_load_report_timer_pending; + bool last_client_load_report_counters_were_zero; + /* Closure used for either the load report timer or the callback for + * completion of sending the load report. */ + grpc_closure client_load_report_closure; + /* Client load report message payload. */ + grpc_byte_buffer *client_load_report_payload; } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -552,8 +596,8 @@ static bool pick_from_internal_rr_locked( grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { GPR_ASSERT(rr_policy != NULL); const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token, - &wc_arg->wrapper_closure); + exec_ctx, rr_policy, pick_args, target, wc_arg->context, + (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { @@ -567,7 +611,12 @@ static bool pick_from_internal_rr_locked( pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); - gpr_free(wc_arg); + // Pass on client stats via context. Passes ownership of the reference. 
+ GPR_ASSERT(wc_arg->client_stats != NULL); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + + gpr_free(wc_arg->free_when_done); } /* else, the pending pick will be registered and taken care of by the * pending pick list inside the RR policy (glb_policy->rr_policy). @@ -690,6 +739,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_policy->pending_picks = pp->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; + pp->wrapped_on_complete_arg.client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); @@ -864,9 +915,18 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_uri_destroy(uri); glb_policy->cc_factory = args->client_channel_factory; - glb_policy->args = grpc_channel_args_copy(args->args); GPR_ASSERT(glb_policy->cc_factory != NULL); + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, + // since we use this to trigger the client_load_reporting filter. + grpc_arg new_arg; + new_arg.key = GRPC_ARG_LB_POLICY_NAME; + new_arg.type = GRPC_ARG_STRING; + new_arg.value.string = "grpclb"; + static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; + glb_policy->args = grpc_channel_args_copy_and_add_and_remove( + args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + grpc_slice_hash_table *targets_info = NULL; /* Create a client channel over them to communicate with a LB service */ char *lb_service_target_addresses = @@ -880,6 +940,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, lb_channel_args); gpr_free(lb_service_target_addresses); if (glb_policy->lb_channel == NULL) { + gpr_free((void *)glb_policy->server_name); + grpc_channel_args_destroy(exec_ctx, glb_policy->args); gpr_free(glb_policy); return NULL; } @@ -895,6 +957,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GPR_ASSERT(glb_policy->pending_pings == NULL); gpr_free((void *)glb_policy->server_name); grpc_channel_args_destroy(exec_ctx, glb_policy->args); + if (glb_policy->client_stats != NULL) { + grpc_grpclb_client_stats_unref(glb_policy->client_stats); + } grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); @@ -1011,7 +1076,8 @@ static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; @@ -1039,6 +1105,10 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_schedule_on_exec_ctx); wc_arg->rr_policy = glb_policy->rr_policy; wc_arg->target = target; + wc_arg->context = context; + GPR_ASSERT(glb_policy->client_stats != NULL); + wc_arg->client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); wc_arg->wrapped_closure = on_complete; wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = 
pick_args->initial_metadata; @@ -1052,7 +1122,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, "picks", (void *)(glb_policy)); } - add_pending_pick(&glb_policy->pending_picks, pick_args, target, + add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, on_complete); if (!glb_policy->started_picking) { @@ -1093,6 +1163,104 @@ static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, exec_ctx, &glb_policy->state_tracker, current, notify); } +static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + +static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + const gpr_timespec next_client_load_report_time = + gpr_time_add(now, glb_policy->client_stats_report_interval); + grpc_closure_init(&glb_policy->client_load_report_closure, + send_client_load_report_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, + next_client_load_report_time, + &glb_policy->client_load_report_closure, now); +} + +static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); + glb_policy->client_load_report_payload = NULL; + if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) { + glb_policy->client_load_report_timer_pending = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "client_load_report"); + return; + } + schedule_next_client_load_report(exec_ctx, glb_policy); +} + +static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = glb_policy->client_load_report_payload; + grpc_closure_init(&glb_policy->client_load_report_closure, + client_load_report_done_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_call_error call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, &op, 1, + &glb_policy->client_load_report_closure); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +static bool load_report_counters_are_zero(grpc_grpclb_request *request) { + return request->client_stats.num_calls_started == 0 && + request->client_stats.num_calls_finished == 0 && + request->client_stats.num_calls_finished_with_drop_for_rate_limiting == + 0 && + request->client_stats + .num_calls_finished_with_drop_for_load_balancing == 0 && + request->client_stats.num_calls_finished_with_client_failed_to_send == + 0 && + request->client_stats.num_calls_finished_known_received == 0; +} + +static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) { + glb_policy->client_load_report_timer_pending = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "client_load_report"); + return; + } + // Construct message payload. + GPR_ASSERT(glb_policy->client_load_report_payload == NULL); + grpc_grpclb_request *request = + grpc_grpclb_load_report_request_create(glb_policy->client_stats); + // Skip client load report if the counters were all zero in the last + // report and they are still zero in this one. 
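
The skip rule referenced above keeps an idle client from streaming empty reports: one all-zero report is still sent, so the balancer sees the counters drain to zero, but further all-zero reports are suppressed until traffic resumes, while the report timer keeps running either way. A minimal model of that hysteresis:

#include <stdbool.h>
#include <stdio.h>

static bool last_client_load_report_counters_were_zero = false;

/* Returns true if a report should be sent for this counter snapshot. */
static bool should_send_report(long started, long finished) {
  bool zero = (started == 0 && finished == 0);
  if (zero && last_client_load_report_counters_were_zero) {
    return false; /* skipped, but the real code still reschedules the timer */
  }
  last_client_load_report_counters_were_zero = zero;
  return true;
}

int main(void) {
  printf("%d\n", should_send_report(3, 2)); /* 1: real data */
  printf("%d\n", should_send_report(0, 0)); /* 1: first all-zero report */
  printf("%d\n", should_send_report(0, 0)); /* 0: suppressed */
  printf("%d\n", should_send_report(1, 0)); /* 1: traffic resumed */
  return 0;
}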
+ if (load_report_counters_are_zero(request)) { + if (glb_policy->last_client_load_report_counters_were_zero) { + grpc_grpclb_request_destroy(request); + schedule_next_client_load_report(exec_ctx, glb_policy); + return; + } + glb_policy->last_client_load_report_counters_were_zero = true; + } else { + glb_policy->last_client_load_report_counters_were_zero = false; + } + grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); + glb_policy->client_load_report_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(exec_ctx, request_payload_slice); + grpc_grpclb_request_destroy(request); + // If we've already sent the initial request, then we can go ahead and + // sent the load report. Otherwise, we need to wait until the initial + // request has been sent to send this + // (see lb_on_sent_initial_request_locked() below). + if (glb_policy->initial_request_sent) { + do_send_client_load_report_locked(exec_ctx, glb_policy); + } +} + +static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error); static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -1114,6 +1282,11 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, &host, glb_policy->deadline, NULL); grpc_slice_unref_internal(exec_ctx, host); + if (glb_policy->client_stats != NULL) { + grpc_grpclb_client_stats_unref(glb_policy->client_stats); + } + glb_policy->client_stats = grpc_grpclb_client_stats_create(); + grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv); grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv); @@ -1125,6 +1298,9 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_slice_unref_internal(exec_ctx, request_payload_slice); grpc_grpclb_request_destroy(request); + grpc_closure_init(&glb_policy->lb_on_sent_initial_request, + lb_on_sent_initial_request_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); grpc_closure_init(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); @@ -1138,6 +1314,10 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, GRPC_GRPCLB_RECONNECT_JITTER, GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + + glb_policy->initial_request_sent = false; + glb_policy->seen_initial_response = false; + glb_policy->last_client_load_report_counters_were_zero = false; } static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, @@ -1151,6 +1331,10 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, grpc_byte_buffer_destroy(glb_policy->lb_request_payload); grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details); + + if (!glb_policy->client_load_report_timer_pending) { + grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer); + } } /* @@ -1179,21 +1363,27 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; - op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; - GPR_ASSERT(glb_policy->lb_request_payload != NULL); op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message.send_message = glb_policy->lb_request_payload; op->flags = 0; op->reserved = NULL; op++; + /* 
take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_sent_initial_request); + GPR_ASSERT(GRPC_CALL_OK == call_error); + op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &glb_policy->lb_trailing_metadata_recv; @@ -1225,6 +1415,19 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(GRPC_CALL_OK == call_error); } +static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { + glb_lb_policy *glb_policy = arg; + glb_policy->initial_request_sent = true; + // If we attempted to send a client load report before the initial + // request was sent, send the load report now. + if (glb_policy->client_load_report_payload != NULL) { + do_send_client_load_report_locked(exec_ctx, glb_policy); + } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_response_received_locked"); +} + static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; @@ -1240,58 +1443,91 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload); grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); grpc_byte_buffer_destroy(glb_policy->lb_response_payload); - grpc_grpclb_serverlist *serverlist = - grpc_grpclb_response_parse_serverlist(response_slice); - if (serverlist != NULL) { - GPR_ASSERT(glb_policy->lb_call != NULL); - grpc_slice_unref_internal(exec_ctx, response_slice); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Serverlist with %lu servers received", - (unsigned long)serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_resolved_address addr; - parse_server(serverlist->servers[i], &addr); - char *ipport; - grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); - gpr_free(ipport); + + grpc_grpclb_initial_response *response = NULL; + if (!glb_policy->seen_initial_response && + (response = grpc_grpclb_initial_response_parse(response_slice)) != + NULL) { + if (response->has_client_stats_report_interval) { + glb_policy->client_stats_report_interval = + gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN), + grpc_grpclb_duration_to_timespec( + &response->client_stats_report_interval)); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "received initial LB response message; " + "client load reporting interval = %" PRId64 ".%09d sec", + glb_policy->client_stats_report_interval.tv_sec, + glb_policy->client_stats_report_interval.tv_nsec); } + /* take a weak ref (won't prevent calling of \a glb_shutdown() if the + * strong ref count goes to zero) to be unref'd in + * send_client_load_report() */ + glb_policy->client_load_report_timer_pending = true; + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); + schedule_next_client_load_report(exec_ctx, glb_policy); + } else if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "received initial LB response message; " + "client load reporting NOT enabled"); } + grpc_grpclb_initial_response_destroy(response); + glb_policy->seen_initial_response = true; + } else { + 
grpc_grpclb_serverlist *serverlist = + grpc_grpclb_response_parse_serverlist(response_slice); + if (serverlist != NULL) { + GPR_ASSERT(glb_policy->lb_call != NULL); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Serverlist with %lu servers received", + (unsigned long)serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + parse_server(serverlist->servers[i], &addr); + char *ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); + gpr_free(ipport); + } + } - /* update serverlist */ - if (serverlist->num_servers > 0) { - if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { + /* update serverlist */ + if (serverlist->num_servers > 0) { + if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, + serverlist)) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Incoming server list identical to current, ignoring."); + } + grpc_grpclb_destroy_serverlist(serverlist); + } else { /* new serverlist */ + if (glb_policy->serverlist != NULL) { + /* dispose of the old serverlist */ + grpc_grpclb_destroy_serverlist(glb_policy->serverlist); + } + /* and update the copy in the glb_lb_policy instance. This + * serverlist instance will be destroyed either upon the next + * update or in glb_destroy() */ + glb_policy->serverlist = serverlist; + + rr_handover_locked(exec_ctx, glb_policy); + } + } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, - "Incoming server list identical to current, ignoring."); + "Received empty server list. Picks will stay pending until " + "a response with > 0 servers is received"); } grpc_grpclb_destroy_serverlist(serverlist); - } else { /* new serverlist */ - if (glb_policy->serverlist != NULL) { - /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(glb_policy->serverlist); - } - /* and update the copy in the glb_lb_policy instance. This serverlist - * instance will be destroyed either upon the next update or in - * glb_destroy() */ - glb_policy->serverlist = serverlist; - - rr_handover_locked(exec_ctx, glb_policy); } - } else { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "Received empty server list. Picks will stay pending until a " - "response with > 0 servers is received"); - } - grpc_grpclb_destroy_serverlist(serverlist); + } else { /* serverlist == NULL */ + gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); } - } else { /* serverlist == NULL */ - gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", - grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); - grpc_slice_unref_internal(exec_ctx, response_slice); } + grpc_slice_unref_internal(exec_ctx, response_slice); + if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; @@ -1403,9 +1639,29 @@ grpc_lb_policy_factory *grpc_glb_lb_factory_create() { } /* Plugin registration */ + +// Only add client_load_reporting filter if the grpclb LB policy is used. 
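
The serverlist branch above follows a simple update rule: adopt an incoming list only if it is non-empty and differs from the current one; an identical list is ignored, and an empty list leaves pending picks waiting. A standalone sketch of that rule, with a toy serverlist type in place of grpc_grpclb_serverlist:

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct { size_t n; const char **addrs; } serverlist;

static bool lists_equal(const serverlist *a, const serverlist *b) {
  if (a == NULL || b == NULL) return false;
  if (a->n != b->n) return false;
  for (size_t i = 0; i < a->n; ++i)
    if (strcmp(a->addrs[i], b->addrs[i]) != 0) return false;
  return true;
}

/* Returns the list now in effect; omits freeing for brevity. */
static const serverlist *maybe_update(const serverlist *current,
                                      const serverlist *incoming) {
  if (incoming->n == 0) {
    puts("empty list: picks stay pending");
    return current;
  }
  if (lists_equal(current, incoming)) {
    puts("identical list: ignored");
    return current;
  }
  puts("new list adopted: handing over to round_robin");
  return incoming;
}

int main(void) {
  const char *a1[] = {"10.0.0.1:80"};
  serverlist s1 = {1, a1}, s2 = {1, a1}, empty = {0, NULL};
  const serverlist *cur = NULL;
  cur = maybe_update(cur, &s1);    /* adopted */
  cur = maybe_update(cur, &s2);    /* identical: ignored */
  cur = maybe_update(cur, &empty); /* empty: ignored */
  return 0;
}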
+static bool maybe_add_client_load_reporting_filter( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { + const grpc_channel_args *args = + grpc_channel_stack_builder_get_channel_arguments(builder); + const grpc_arg *channel_arg = + grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME); + if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING && + strcmp(channel_arg->value.string, "grpclb") == 0) { + return grpc_channel_stack_builder_append_filter( + builder, (const grpc_channel_filter *)arg, NULL, NULL); + } + return true; +} + void grpc_lb_policy_grpclb_init() { grpc_register_lb_policy(grpc_glb_lb_factory_create()); grpc_register_tracer("glb", &grpc_lb_glb_trace); + grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, + GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_client_load_reporting_filter, + (void *)&grpc_client_load_reporting_filter); } void grpc_lb_policy_grpclb_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c new file mode 100644 index 0000000000..444c03b9aa --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c @@ -0,0 +1,133 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/channel/channel_args.h" + +#define GRPC_ARG_GRPCLB_CLIENT_STATS "grpc.grpclb_client_stats" + +struct grpc_grpclb_client_stats { + gpr_refcount refs; + gpr_atm num_calls_started; + gpr_atm num_calls_finished; + gpr_atm num_calls_finished_with_drop_for_rate_limiting; + gpr_atm num_calls_finished_with_drop_for_load_balancing; + gpr_atm num_calls_finished_with_client_failed_to_send; + gpr_atm num_calls_finished_known_received; +}; + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_create() { + grpc_grpclb_client_stats* client_stats = gpr_zalloc(sizeof(*client_stats)); + gpr_ref_init(&client_stats->refs, 1); + return client_stats; +} + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( + grpc_grpclb_client_stats* client_stats) { + gpr_ref(&client_stats->refs); + return client_stats; +} + +void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) { + if (gpr_unref(&client_stats->refs)) { + gpr_free(client_stats); + } +} + +void grpc_grpclb_client_stats_add_call_started( + grpc_grpclb_client_stats* client_stats) { + gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1); +} + +void grpc_grpclb_client_stats_add_call_finished( + bool finished_with_drop_for_rate_limiting, + bool finished_with_drop_for_load_balancing, + bool finished_with_client_failed_to_send, bool finished_known_received, + grpc_grpclb_client_stats* client_stats) { + gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1); + if (finished_with_drop_for_rate_limiting) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_drop_for_rate_limiting, + (gpr_atm)1); + } + if (finished_with_drop_for_load_balancing) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_drop_for_load_balancing, + (gpr_atm)1); + } + if (finished_with_client_failed_to_send) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_client_failed_to_send, + (gpr_atm)1); + } + if (finished_known_received) { + gpr_atm_full_fetch_add(&client_stats->num_calls_finished_known_received, + (gpr_atm)1); + } +} + +static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) { + *value = (int64_t)gpr_atm_acq_load(counter); + gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value)); +} + +void grpc_grpclb_client_stats_get( + grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, + int64_t* num_calls_finished, + int64_t* num_calls_finished_with_drop_for_rate_limiting, + int64_t* num_calls_finished_with_drop_for_load_balancing, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received) { + atomic_get_and_reset_counter(num_calls_started, + &client_stats->num_calls_started); + atomic_get_and_reset_counter(num_calls_finished, + &client_stats->num_calls_finished); + atomic_get_and_reset_counter( + num_calls_finished_with_drop_for_rate_limiting, + &client_stats->num_calls_finished_with_drop_for_rate_limiting); + atomic_get_and_reset_counter( + num_calls_finished_with_drop_for_load_balancing, + &client_stats->num_calls_finished_with_drop_for_load_balancing); + atomic_get_and_reset_counter( + num_calls_finished_with_client_failed_to_send, + &client_stats->num_calls_finished_with_client_failed_to_send); + atomic_get_and_reset_counter( + num_calls_finished_known_received, + 
&client_stats->num_calls_finished_known_received); +} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h new file mode 100644 index 0000000000..0af4a919f8 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H + +#include <stdbool.h> + +#include <grpc/impl/codegen/grpc_types.h> + +typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats; + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_create(); +grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( + grpc_grpclb_client_stats* client_stats); +void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats); + +void grpc_grpclb_client_stats_add_call_started( + grpc_grpclb_client_stats* client_stats); +void grpc_grpclb_client_stats_add_call_finished( + bool finished_with_drop_for_rate_limiting, + bool finished_with_drop_for_load_balancing, + bool finished_with_client_failed_to_send, bool finished_known_received, + grpc_grpclb_client_stats* client_stats); + +void grpc_grpclb_client_stats_get( + grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, + int64_t* num_calls_finished, + int64_t* num_calls_finished_with_drop_for_rate_limiting, + int64_t* num_calls_finished_with_drop_for_load_balancing, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 87549b78f0..81b6932fae 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -80,15 +80,45 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name) { grpc_grpclb_request *req = gpr_malloc(sizeof(grpc_grpclb_request)); - - req->has_client_stats = 0; /* TODO(dgq): add support for stats once defined */ - req->has_initial_request = 1; - req->initial_request.has_name = 1; + req->has_client_stats = false; + req->has_initial_request = true; + req->initial_request.has_name = true; strncpy(req->initial_request.name, lb_service_name, GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH); return req; } +static void populate_timestamp(gpr_timespec timestamp, + struct _grpc_lb_v1_Timestamp *timestamp_pb) { + timestamp_pb->has_seconds = true; + timestamp_pb->seconds = timestamp.tv_sec; + timestamp_pb->has_nanos = true; + timestamp_pb->nanos = timestamp.tv_nsec; +} + +grpc_grpclb_request *grpc_grpclb_load_report_request_create( + grpc_grpclb_client_stats *client_stats) { + grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request)); + req->has_client_stats = true; + req->client_stats.has_timestamp = true; + populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); + req->client_stats.has_num_calls_started = true; + req->client_stats.has_num_calls_finished = true; + req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true; + req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_known_received = true; + grpc_grpclb_client_stats_get( + client_stats, &req->client_stats.num_calls_started, + &req->client_stats.num_calls_finished, + 
&req->client_stats.num_calls_finished_with_drop_for_rate_limiting, + &req->client_stats.num_calls_finished_with_drop_for_load_balancing, + &req->client_stats.num_calls_finished_with_client_failed_to_send, + &req->client_stats.num_calls_finished_known_received); + return req; +} + grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { size_t encoded_length; pb_ostream_t sizestream; @@ -122,6 +152,9 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } + + if (!res.has_initial_response) return NULL; + grpc_grpclb_initial_response *initial_res = gpr_malloc(sizeof(grpc_grpclb_initial_response)); memcpy(initial_res, &res.initial_response, @@ -243,6 +276,15 @@ int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, return 0; } +gpr_timespec grpc_grpclb_duration_to_timespec( + grpc_grpclb_duration *duration_pb) { + gpr_timespec duration; + duration.tv_sec = duration_pb->has_seconds ? duration_pb->seconds : 0; + duration.tv_nsec = duration_pb->has_nanos ? duration_pb->nanos : 0; + duration.clock_type = GPR_TIMESPAN; + return duration; +} + void grpc_grpclb_initial_response_destroy( grpc_grpclb_initial_response *response) { gpr_free(response); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index d014b8800c..06873821bd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -36,6 +36,7 @@ #include <grpc/slice_buffer.h> +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" @@ -58,6 +59,8 @@ typedef struct grpc_grpclb_serverlist { /** Create a request for a gRPC LB service under \a lb_service_name */ grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name); +grpc_grpclb_request *grpc_grpclb_load_report_request_create( + grpc_grpclb_client_stats *client_stats); /** Protocol Buffers v3-encode \a request */ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request); @@ -93,6 +96,9 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, const grpc_grpclb_duration *rhs); +gpr_timespec grpc_grpclb_duration_to_timespec( + grpc_grpclb_duration *duration_pb); + /** Destroy \a initial_response */ void grpc_grpclb_initial_response_destroy( grpc_grpclb_initial_response *response); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index 2b77cd39b8..b1c5dfc61c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -189,7 +189,8 @@ static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy 
*)pol; pending_pick *pp; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index ff41e61b3e..4c17f9c082 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -414,7 +414,8 @@ static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index b2de85c4a1..1af3393a62 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -781,7 +781,7 @@ grpc_error *grpc_connected_subchannel_create_call( (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); const grpc_call_element_args call_args = {.call_stack = callstk, .server_transport_data = NULL, - .context = NULL, + .context = args->context, .path = args->path, .start_time = args->start_time, .deadline = args->deadline, diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 6473de49b0..e433c33e40 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -119,6 +119,7 @@ typedef struct { gpr_timespec start_time; gpr_timespec deadline; gpr_arena *arena; + grpc_call_context_element *context; } grpc_connected_subchannel_call_args; grpc_error *grpc_connected_subchannel_create_call( diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index 6c931ad28a..f0a21113c5 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -50,6 +50,9 @@ typedef enum { /// Reserved for traffic_class_context. GRPC_CONTEXT_TRAFFIC, + /// Value is a \a grpc_grpclb_client_stats. + GRPC_GRPCLB_CLIENT_STATS, + GRPC_CONTEXT_COUNT } grpc_context_index; diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index d2a570cc87..42c13b2cb6 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -280,8 +280,10 @@ CORE_SOURCE_FILES = [ 'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel_secure.c', + 'src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c', 'src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'third_party/nanopb/pb_common.c', |
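
One last detail ties the nanopb side to the policy side: the report interval arrives in the initial LB response as a proto Duration, and lb_on_response_received_locked() clamps it with gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN), ...) so the client never reports more than once per second. A standalone model of grpc_grpclb_duration_to_timespec() plus that floor, with illustrative stand-ins for the nanopb and gpr types:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Illustrative stand-ins for grpc_grpclb_duration and gpr_timespec. */
typedef struct {
  bool has_seconds; int64_t seconds;
  bool has_nanos;   int32_t nanos;
} pb_duration;
typedef struct { int64_t tv_sec; int32_t tv_nsec; } timespan;

/* Mirrors grpc_grpclb_duration_to_timespec(): absent fields default to 0. */
static timespan duration_to_timespan(const pb_duration *d) {
  timespan t;
  t.tv_sec = d->has_seconds ? d->seconds : 0;
  t.tv_nsec = d->has_nanos ? d->nanos : 0;
  return t;
}

/* Simplified gpr_time_max() against a one-second floor. */
static timespan clamp_to_one_second_min(timespan t) {
  if (t.tv_sec < 1) { t.tv_sec = 1; t.tv_nsec = 0; }
  return t;
}

int main(void) {
  pb_duration d = {true, 0, true, 500000000}; /* balancer asked for 0.5s */
  timespan t = clamp_to_one_second_min(duration_to_timespan(&d));
  printf("client load reporting interval = %lld.%09d sec\n",
         (long long)t.tv_sec, (int)t.tv_nsec);
  return 0;
}

Running it prints "client load reporting interval = 1.000000000 sec": a half-second request from the balancer is raised to the one-second minimum.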