From 4f2b0fdadfb1e1eb6c796c40ec5aabfbb348aa89 Mon Sep 17 00:00:00 2001
From: "Mark D. Roth"
Date: Fri, 19 Jan 2018 12:12:23 -0800
Subject: Rename 'gpr++' directories to 'gprpp'.

---
 src/core/ext/filters/client_channel/lb_policy.h                      | 2 +-
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc       | 2 +-
 .../ext/filters/client_channel/lb_policy/round_robin/round_robin.cc  | 2 +-
 src/core/ext/filters/client_channel/lb_policy/subchannel_list.h      | 2 +-
 .../filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc  | 2 +-
 .../ext/filters/client_channel/resolver/dns/native/dns_resolver.cc   | 2 +-
 src/core/ext/filters/client_channel/subchannel.cc                    | 4 ++--
 src/core/ext/filters/client_channel/subchannel.h                     | 4 ++--
 8 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index e19726efb3..30660cb83d 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -20,7 +20,7 @@
 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H
 
 #include "src/core/ext/filters/client_channel/subchannel.h"
-#include "src/core/lib/gpr++/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/iomgr/polling_entity.h"
 #include "src/core/lib/transport/connectivity_state.h"
 
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index eb5ced4c20..b89a28c077 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -106,8 +106,8 @@
 #include "src/core/lib/backoff/backoff.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/gpr++/manual_constructor.h"
 #include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gprpp/manual_constructor.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index e217a0b0c0..24c381a46d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -34,7 +34,7 @@
 #include "src/core/ext/filters/client_channel/subchannel_index.h"
 #include "src/core/lib/channel/channel_args.h"
 #include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gpr++/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
 #include "src/core/lib/iomgr/combiner.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 #include "src/core/lib/transport/connectivity_state.h"
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index f4e345def6..3377605263 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -22,7 +22,7 @@
 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
 #include "src/core/ext/filters/client_channel/subchannel.h"
 #include "src/core/lib/debug/trace.h"
-#include "src/core/lib/gpr++/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
"src/core/lib/transport/connectivity_state.h" // TODO(roth): This code is intended to be shared between pick_first and diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index 7ea3cdd6e1..9024ffb092 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -34,9 +34,9 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gpr++/manual_constructor.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/gethostname.h" #include "src/core/lib/iomgr/resolve_address.h" diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index 93a1fe87a2..5aa7e6cc7e 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -29,9 +29,9 @@ #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/gpr++/manual_constructor.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index fe4fcbbb7d..3edefaae64 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -37,8 +37,8 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/debug/stats.h" -#include "src/core/lib/gpr++/debug_location.h" -#include "src/core/lib/gpr++/manual_constructor.h" +#include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index f2a5c1e273..b7593ec911 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -21,9 +21,9 @@ #include "src/core/ext/filters/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/gpr++/ref_counted.h" -#include "src/core/lib/gpr++/ref_counted_ptr.h" #include "src/core/lib/gpr/arena.h" +#include "src/core/lib/gprpp/ref_counted.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" -- cgit v1.2.3 From 8e4c9d308c34a8e35fd9acb1d6cf8bf216a11f18 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Tue, 23 Jan 2018 16:26:39 -0800 Subject: Extract lb_calld from glb_policy --- .../grpclb/client_load_reporting_filter.cc | 64 +- 
From 8e4c9d308c34a8e35fd9acb1d6cf8bf216a11f18 Mon Sep 17 00:00:00 2001
From: Juanli Shen
Date: Tue, 23 Jan 2018 16:26:39 -0800
Subject: Extract lb_calld from glb_policy

---
 .../grpclb/client_load_reporting_filter.cc         |  64 +-
 .../client_channel/lb_policy/grpclb/grpclb.cc      | 865 +++++++++++----------
 test/cpp/end2end/grpclb_end2end_test.cc            |  47 +-
 3 files changed, 512 insertions(+), 464 deletions(-)

diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
index 1708d81e61..4596f90745 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
@@ -69,11 +69,13 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
   call_data* calld = (call_data*)elem->call_data;
   // Get stats object from context and take a ref.
   GPR_ASSERT(args->context != nullptr);
-  GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr);
-  calld->client_stats = grpc_grpclb_client_stats_ref(
-      (grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS].value);
-  // Record call started.
-  grpc_grpclb_client_stats_add_call_started(calld->client_stats);
+  if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) {
+    calld->client_stats = grpc_grpclb_client_stats_ref(
+        (grpc_grpclb_client_stats*)args->context[GRPC_GRPCLB_CLIENT_STATS]
+            .value);
+    // Record call started.
+    grpc_grpclb_client_stats_add_call_started(calld->client_stats);
+  }
   return GRPC_ERROR_NONE;
 }
 
@@ -81,36 +83,40 @@ static void destroy_call_elem(grpc_call_element* elem,
                               const grpc_call_final_info* final_info,
                               grpc_closure* ignored) {
   call_data* calld = (call_data*)elem->call_data;
-  // Record call finished, optionally setting client_failed_to_send and
-  // received.
-  grpc_grpclb_client_stats_add_call_finished(
-      !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
-      calld->recv_initial_metadata_succeeded /* known_received */,
-      calld->client_stats);
-  // All done, so unref the stats object.
-  grpc_grpclb_client_stats_unref(calld->client_stats);
+  if (calld->client_stats != nullptr) {
+    // Record call finished, optionally setting client_failed_to_send and
+    // received.
+    grpc_grpclb_client_stats_add_call_finished(
+        !calld->send_initial_metadata_succeeded /* client_failed_to_send */,
+        calld->recv_initial_metadata_succeeded /* known_received */,
+        calld->client_stats);
+    // All done, so unref the stats object.
+    grpc_grpclb_client_stats_unref(calld->client_stats);
+  }
 }
 
 static void start_transport_stream_op_batch(
     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
   call_data* calld = (call_data*)elem->call_data;
   GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0);
-  // Intercept send_initial_metadata.
-  if (batch->send_initial_metadata) {
-    calld->original_on_complete_for_send = batch->on_complete;
-    GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, calld,
-                      grpc_schedule_on_exec_ctx);
-    batch->on_complete = &calld->on_complete_for_send;
-  }
-  // Intercept recv_initial_metadata.
-  if (batch->recv_initial_metadata) {
-    calld->original_recv_initial_metadata_ready =
-        batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
-    GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
-                      recv_initial_metadata_ready, calld,
-                      grpc_schedule_on_exec_ctx);
-    batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
-        &calld->recv_initial_metadata_ready;
+  if (calld->client_stats != nullptr) {
+    // Intercept send_initial_metadata.
+    if (batch->send_initial_metadata) {
+      calld->original_on_complete_for_send = batch->on_complete;
+      GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
+                        calld, grpc_schedule_on_exec_ctx);
+      batch->on_complete = &calld->on_complete_for_send;
+    }
+    // Intercept recv_initial_metadata.
+    if (batch->recv_initial_metadata) {
+      calld->original_recv_initial_metadata_ready =
+          batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+      GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
+                        recv_initial_metadata_ready, calld,
+                        grpc_schedule_on_exec_ctx);
+      batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+          &calld->recv_initial_metadata_ready;
+    }
   }
   // Chain to next filter.
   grpc_call_next_op(elem, batch);
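The net effect of this file's change is that the GRPC_GRPCLB_CLIENT_STATS context entry becomes optional: the filter only intercepts metadata when a stats object was attached at pick time. A hedged sketch of the resulting contract between the picker and the filter (names come from this patch; the wiring shown is condensed, not the exact call sites):

    // In the grpclb pick path (see the grpclb.cc hunks below), the stats ref
    // is attached only while an LB call with stats exists:
    if (pp->client_stats != nullptr) {
      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
          pp->client_stats;  // init_call_elem() above takes its own ref
      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
          destroy_client_stats;
    }
    // When the entry is absent, calld->client_stats stays null and every
    // stats hook in this filter degenerates to a no-op.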
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 06ae79041e..6393884127 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -169,25 +169,78 @@ struct pending_ping {
 
 }  // namespace
 
-struct glb_lb_policy {
-  /** base policy: must be first */
+typedef struct glb_lb_call_data {
+  struct glb_lb_policy* glb_policy;
+  // todo refactor
+  gpr_refcount refs;
+
+  /** The streaming call to the LB server. Always non-NULL. */
+  grpc_call* lb_call;
+
+  /** The initial metadata received from the LB server. */
+  grpc_metadata_array lb_initial_metadata_recv;
+
+  /** The message sent to the LB server. It's used to query for backends (the
+   * value may vary if the LB server indicates a redirect) or send client load
+   * report. */
+  grpc_byte_buffer* send_message_payload;
+  /** The callback after the initial request is sent. */
+  grpc_closure lb_on_sent_initial_request;
+
+  /** The response received from the LB server, if any. */
+  grpc_byte_buffer* recv_message_payload;
+  /** The callback to process the response received from the LB server. */
+  grpc_closure lb_on_response_received;
+  bool seen_initial_response;
+
+  /** The callback to process the status received from the LB server, which
+   * signals the end of the LB call. */
+  grpc_closure lb_on_server_status_received;
+  /** The trailing metadata from the LB server. */
+  grpc_metadata_array lb_trailing_metadata_recv;
+  /** The call status code and details. */
+  grpc_status_code lb_call_status;
+  grpc_slice lb_call_status_details;
+
+  /** The stats for client-side load reporting associated with this LB call.
+   * Created after the first serverlist is received. */
+  grpc_grpclb_client_stats* client_stats;
+  /** The interval and timer for next client load report. */
+  grpc_millis client_stats_report_interval;
+  grpc_timer client_load_report_timer;
+  bool client_load_report_timer_callback_pending;
+  bool last_client_load_report_counters_were_zero;
+  bool client_load_report_is_due;
+  /** The closure used for either the load report timer or the callback for
+   * completion of sending the load report. */
+  grpc_closure client_load_report_closure;
+} glb_lb_call_data;
+
+typedef struct glb_lb_policy {
+  /** Base policy: must be first. */
   grpc_lb_policy base;
 
-  /** who the client is trying to communicate with */
+  /** Who the client is trying to communicate with. */
   const char* server_name;
+
+  /** Channel-related data that will be propagated to the internal RR policy. */
   grpc_client_channel_factory* cc_factory;
   grpc_channel_args* args;
 
-  /** timeout in milliseconds for the LB call. 0 means no deadline. */
-  int lb_call_timeout_ms;
-
-  /** timeout in milliseconds before using fallback backend addresses.
+  /** Timeout in milliseconds before using fallback backend addresses.
    * 0 means not using fallback. */
   int lb_fallback_timeout_ms;
 
-  /** for communicating with the LB server */
+  /** The channel for communicating with the LB server. */
   grpc_channel* lb_channel;
 
+  /** The data associated with the current LB call. It holds a ref to this LB
+   * policy. It's initialized every time we query for backends. It's reset to
+   * NULL whenever the current LB call is no longer needed (e.g., the LB policy
+   * is shutting down, or the LB call has ended). A non-NULL lb_calld always
+   * contains a non-NULL lb_call. */
+  glb_lb_call_data* lb_calld;
+
   /** response generator to inject address updates into \a lb_channel */
   grpc_fake_resolver_response_generator* response_generator;
 
@@ -225,9 +278,6 @@ struct glb_lb_policy {
 
   bool shutting_down;
 
-  /** are we currently updating lb_call? */
-  bool updating_lb_call;
-
   /** are we already watching the LB channel's connectivity? */
   bool watching_lb_channel;
@@ -243,65 +293,70 @@ struct glb_lb_policy {
   /************************************************************/
   /*  client data associated with the LB server communication */
   /************************************************************/
-  /* Finished sending initial request. */
-  grpc_closure lb_on_sent_initial_request;
-
-  /* Status from the LB server has been received. This signals the end of the LB
-   * call. */
-  grpc_closure lb_on_server_status_received;
-
-  /* A response from the LB server has been received. Process it */
-  grpc_closure lb_on_response_received;
-
-  /* LB call retry timer callback. */
-  grpc_closure lb_on_call_retry;
-
-  /* LB fallback timer callback. */
-  grpc_closure lb_on_fallback;
-
-  grpc_call* lb_call; /* streaming call to the LB server, */
-
-  grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */
-  grpc_metadata_array
-      lb_trailing_metadata_recv; /* trailing MD from LB server */
-
-  /* what's being sent to the LB server. Note that its value may vary if the LB
-   * server indicates a redirect. */
-  grpc_byte_buffer* lb_request_payload;
-
-  /* response the LB server, if any. Processed in lb_on_response_received() */
-  grpc_byte_buffer* lb_response_payload;
-
-  /* call status code and details, set in lb_on_server_status_received() */
-  grpc_status_code lb_call_status;
-  grpc_slice lb_call_status_details;
 
   /** LB call retry backoff state */
   grpc_core::ManualConstructor<grpc_core::BackOff> lb_call_backoff;
 
+  /** timeout in milliseconds for the LB call. 0 means no deadline. */
+  int lb_call_timeout_ms;
+
   /** LB call retry timer */
   grpc_timer lb_call_retry_timer;
+  /** LB call retry timer callback */
+  grpc_closure lb_on_call_retry;
 
   /** LB fallback timer */
   grpc_timer lb_fallback_timer;
+  /** LB fallback timer callback */
+  grpc_closure lb_on_fallback;
+} glb_lb_policy;
 
-  bool initial_request_sent;
-  bool seen_initial_response;
+static void glb_lb_call_data_ref(glb_lb_call_data* lb_calld,
+                                 const char* reason) {
+  gpr_ref_non_zero(&lb_calld->refs);
+  if (grpc_lb_glb_trace.enabled()) {
+    const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
+    gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p REF %lu->%lu (%s)",
+            grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
+            (unsigned long)(count - 1), (unsigned long)count, reason);
+  }
+}
 
-  /* Stats for client-side load reporting. Should be unreffed and
-   * recreated whenever lb_call is replaced. */
-  grpc_grpclb_client_stats* client_stats;
-  /* Interval and timer for next client load report. */
-  grpc_millis client_stats_report_interval;
-  grpc_timer client_load_report_timer;
-  bool client_load_report_timer_callback_pending;
-  bool last_client_load_report_counters_were_zero;
-  /* Closure used for either the load report timer or the callback for
-   * completion of sending the load report. */
-  grpc_closure client_load_report_closure;
-  /* Client load report message payload. */
-  grpc_byte_buffer* client_load_report_payload;
-};
+static void glb_lb_call_data_unref(glb_lb_call_data* lb_calld,
+                                   const char* reason) {
+  const bool done = gpr_unref(&lb_calld->refs);
+  if (grpc_lb_glb_trace.enabled()) {
+    const gpr_atm count = gpr_atm_acq_load(&lb_calld->refs.count);
+    gpr_log(GPR_DEBUG, "[%s %p] lb_calld %p UNREF %lu->%lu (%s)",
+            grpc_lb_glb_trace.name(), lb_calld->glb_policy, lb_calld,
+            (unsigned long)(count + 1), (unsigned long)count, reason);
+  }
+  if (done) {
+    GPR_ASSERT(lb_calld->lb_call != nullptr);
+    grpc_call_unref(lb_calld->lb_call);
+    grpc_metadata_array_destroy(&lb_calld->lb_initial_metadata_recv);
+    grpc_metadata_array_destroy(&lb_calld->lb_trailing_metadata_recv);
+    grpc_byte_buffer_destroy(lb_calld->send_message_payload);
+    grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
+    grpc_slice_unref_internal(lb_calld->lb_call_status_details);
+    if (lb_calld->client_stats != nullptr) {
+      grpc_grpclb_client_stats_unref(lb_calld->client_stats);
+    }
+    GRPC_LB_POLICY_UNREF(&lb_calld->glb_policy->base, "lb_calld");
+    gpr_free(lb_calld);
+  }
+}
+
+static void lb_call_data_shutdown(glb_lb_policy* glb_policy) {
+  GPR_ASSERT(glb_policy->lb_calld != nullptr);
+  GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
+  // lb_on_server_status_received will complete the cancellation and clean up.
+  grpc_call_cancel(glb_policy->lb_calld->lb_call, nullptr);
+  if (glb_policy->lb_calld->client_load_report_timer_callback_pending) {
+    grpc_timer_cancel(&glb_policy->lb_calld->client_load_report_timer);
+  }
+  glb_policy->lb_calld = nullptr;
+}
 
 /* add lb_token of selected subchannel (address) to the call's initial
  * metadata */
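The new ref/unref helpers above implement a conventional manual refcount: the object starts at one ref (owned by the final status callback), each outstanding batch or timer takes an extra ref, and the last unref frees the call and its buffers. A minimal sketch of the idiom, reduced to its skeleton (my_call_data is a hypothetical name; assumes grpc/support/alloc.h and grpc/support/sync.h):

    typedef struct my_call_data {
      gpr_refcount refs;  // starts at 1; +1 per in-flight callback
    } my_call_data;

    static my_call_data* my_call_data_create() {
      my_call_data* d = (my_call_data*)gpr_zalloc(sizeof(*d));
      gpr_ref_init(&d->refs, 1);  // initial ref owned by the final callback
      return d;
    }
    static void my_call_data_ref(my_call_data* d) {
      gpr_ref_non_zero(&d->refs);  // object must already be alive
    }
    static void my_call_data_unref(my_call_data* d) {
      if (gpr_unref(&d->refs)) gpr_free(d);  // gpr_unref: true on 1 -> 0
    }

Note also that lb_call_data_shutdown() only cancels the call; the actual teardown is driven by the status callback dropping the initial ref.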
@@ -334,11 +389,12 @@ static void pending_pick_set_metadata_and_context(pending_pick* pp) {
       abort();
     }
     // Pass on client stats via context. Passes ownership of the reference.
-    GPR_ASSERT(pp->client_stats != nullptr);
-    pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
-        pp->client_stats;
-    pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
-        destroy_client_stats;
+    if (pp->client_stats != nullptr) {
+      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].value =
+          pp->client_stats;
+      pp->pick->subchannel_call_context[GRPC_GRPCLB_CLIENT_STATS].destroy =
+          destroy_client_stats;
+    }
   } else {
     if (pp->client_stats != nullptr) {
       grpc_grpclb_client_stats_unref(pp->client_stats);
@@ -605,9 +661,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
       // the client_load_reporting filter, because we do not create a
       // subchannel call (and therefore no client_load_reporting filter)
      // for dropped calls.
-      GPR_ASSERT(glb_policy->client_stats != nullptr);
-      grpc_grpclb_client_stats_add_call_dropped_locked(
-          server->load_balance_token, glb_policy->client_stats);
+      if (glb_policy->lb_calld != nullptr &&
+          glb_policy->lb_calld->client_stats != nullptr) {
+        grpc_grpclb_client_stats_add_call_dropped_locked(
+            server->load_balance_token, glb_policy->lb_calld->client_stats);
+      }
       if (force_async) {
         GRPC_CLOSURE_SCHED(pp->original_on_complete, GRPC_ERROR_NONE);
         gpr_free(pp);
@@ -618,7 +676,11 @@ static bool pick_from_internal_rr_locked(glb_lb_policy* glb_policy,
     }
   }
   // Set client_stats and user_data.
-  pp->client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats);
+  if (glb_policy->lb_calld != nullptr &&
+      glb_policy->lb_calld->client_stats != nullptr) {
+    pp->client_stats =
+        grpc_grpclb_client_stats_ref(glb_policy->lb_calld->client_stats);
+  }
   GPR_ASSERT(pp->pick->user_data == nullptr);
   pp->pick->user_data = (void**)&pp->lb_token;
   // Pick via the RR policy.
@@ -872,9 +934,6 @@ static void glb_destroy(grpc_lb_policy* pol) {
   GPR_ASSERT(glb_policy->pending_pings == nullptr);
   gpr_free((void*)glb_policy->server_name);
   grpc_channel_args_destroy(glb_policy->args);
-  if (glb_policy->client_stats != nullptr) {
-    grpc_grpclb_client_stats_unref(glb_policy->client_stats);
-  }
   grpc_connectivity_state_destroy(&glb_policy->state_tracker);
   if (glb_policy->serverlist != nullptr) {
     grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
@@ -892,13 +951,8 @@ static void glb_shutdown_locked(grpc_lb_policy* pol,
   glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
   grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
   glb_policy->shutting_down = true;
-  /* glb_policy->lb_call and this local lb_call must be consistent at this point
-   * because glb_policy->lb_call is only assigned in lb_call_init_locked as part
-   * of query_for_backends_locked, which can only be invoked while
-   * glb_policy->shutting_down is false. */
-  if (glb_policy->lb_call != nullptr) {
-    grpc_call_cancel(glb_policy->lb_call, nullptr);
-    /* lb_on_server_status_received will pick up the cancel and clean up */
+  if (glb_policy->lb_calld != nullptr) {
+    lb_call_data_shutdown(glb_policy);
   }
   if (glb_policy->retry_timer_callback_pending) {
     grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
@@ -1048,7 +1102,6 @@ static void start_picking_locked(glb_lb_policy* glb_policy) {
     grpc_timer_init(&glb_policy->lb_fallback_timer, deadline,
                     &glb_policy->lb_on_fallback);
   }
-
   glb_policy->started_picking = true;
   glb_policy->lb_call_backoff->Reset();
   query_for_backends_locked(glb_policy);
@@ -1089,7 +1142,6 @@ static int glb_pick_locked(grpc_lb_policy* pol,
       gpr_log(GPR_INFO, "[grpclb %p] about to PICK from RR %p", glb_policy,
               glb_policy->rr_policy);
     }
-    GPR_ASSERT(glb_policy->client_stats != nullptr);
     pick_done =
         pick_from_internal_rr_locked(glb_policy, false /* force_async */, pp);
   }
@@ -1139,8 +1191,8 @@ static void glb_notify_on_state_change_locked(grpc_lb_policy* pol,
 static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
   glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
   glb_policy->retry_timer_callback_pending = false;
-  if (!glb_policy->shutting_down && glb_policy->lb_call == nullptr &&
-      error == GRPC_ERROR_NONE) {
+  if (!glb_policy->shutting_down && error == GRPC_ERROR_NONE &&
+      glb_policy->lb_calld == nullptr) {
     if (grpc_lb_glb_trace.enabled()) {
       gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", glb_policy);
     }
@@ -1149,84 +1201,55 @@ static void lb_call_on_retry_timer_locked(void* arg, grpc_error* error) {
   GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_retry_timer");
 }
 
-static void maybe_restart_lb_call(glb_lb_policy* glb_policy) {
-  if (glb_policy->started_picking && glb_policy->updating_lb_call) {
-    if (glb_policy->retry_timer_callback_pending) {
-      grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
-    }
-    if (!glb_policy->shutting_down) start_picking_locked(glb_policy);
-    glb_policy->updating_lb_call = false;
-  } else if (!glb_policy->shutting_down) {
-    /* if we aren't shutting down, restart the LB client call after some time */
-    grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
-    if (grpc_lb_glb_trace.enabled()) {
-      gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
+static void start_lb_call_retry_timer_locked(glb_lb_policy* glb_policy) {
+  grpc_millis next_try = glb_policy->lb_call_backoff->NextAttemptTime();
+  if (grpc_lb_glb_trace.enabled()) {
+    gpr_log(GPR_DEBUG, "[grpclb %p] Connection to LB server lost...",
+            glb_policy);
+    grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
+    if (timeout > 0) {
+      gpr_log(GPR_DEBUG,
+              "[grpclb %p] ... retry_timer_active in %" PRIuPTR "ms.",
+              glb_policy, timeout);
+    } else {
+      gpr_log(GPR_DEBUG, "[grpclb %p] ... retry_timer_active immediately.",
              glb_policy);
-      grpc_millis timeout = next_try - grpc_core::ExecCtx::Get()->Now();
-      if (timeout > 0) {
-        gpr_log(GPR_DEBUG,
-                "[grpclb %p] ... retry LB call after %" PRIuPTR "ms.",
-                glb_policy, timeout);
-      } else {
retry LB call immediately.", - glb_policy); - } } - GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer"); - GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, - lb_call_on_retry_timer_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - glb_policy->retry_timer_callback_pending = true; - grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, - &glb_policy->lb_on_call_retry); } - GRPC_LB_POLICY_UNREF(&glb_policy->base, - "lb_on_server_status_received_locked"); + GRPC_LB_POLICY_REF(&glb_policy->base, "grpclb_retry_timer"); + GRPC_CLOSURE_INIT(&glb_policy->lb_on_call_retry, + lb_call_on_retry_timer_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner)); + glb_policy->retry_timer_callback_pending = true; + grpc_timer_init(&glb_policy->lb_call_retry_timer, next_try, + &glb_policy->lb_on_call_retry); } -static void send_client_load_report_locked(void* arg, grpc_error* error); +static void maybe_send_client_load_report_locked(void* arg, grpc_error* error); -static void schedule_next_client_load_report(glb_lb_policy* glb_policy) { +static void schedule_next_client_load_report(glb_lb_call_data* lb_calld) { const grpc_millis next_client_load_report_time = - grpc_core::ExecCtx::Get()->Now() + - glb_policy->client_stats_report_interval; - GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, - send_client_load_report_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_timer_init(&glb_policy->client_load_report_timer, + grpc_core::ExecCtx::Get()->Now() + lb_calld->client_stats_report_interval; + GRPC_CLOSURE_INIT( + &lb_calld->client_load_report_closure, + maybe_send_client_load_report_locked, lb_calld, + grpc_combiner_scheduler(lb_calld->glb_policy->base.combiner)); + grpc_timer_init(&lb_calld->client_load_report_timer, next_client_load_report_time, - &glb_policy->client_load_report_closure); + &lb_calld->client_load_report_closure); + lb_calld->client_load_report_timer_callback_pending = true; } static void client_load_report_done_locked(void* arg, grpc_error* error) { - glb_lb_policy* glb_policy = (glb_lb_policy*)arg; - grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); - glb_policy->client_load_report_payload = nullptr; - if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) { - glb_policy->client_load_report_timer_callback_pending = false; - GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report"); - if (glb_policy->lb_call == nullptr) { - maybe_restart_lb_call(glb_policy); - } + glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg; + glb_lb_policy* glb_policy = lb_calld->glb_policy; + grpc_byte_buffer_destroy(lb_calld->send_message_payload); + lb_calld->send_message_payload = nullptr; + if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) { + glb_lb_call_data_unref(lb_calld, "client_load_report"); return; } - schedule_next_client_load_report(glb_policy); -} - -static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) { - grpc_op op; - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_SEND_MESSAGE; - op.data.send_message.send_message = glb_policy->client_load_report_payload; - GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure, - client_load_report_done_locked, glb_policy, - grpc_combiner_scheduler(glb_policy->base.combiner)); - grpc_call_error call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure); - if (call_error != GRPC_CALL_OK) { - gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", 
 
-static void send_client_load_report_locked(void* arg, grpc_error* error);
+static void maybe_send_client_load_report_locked(void* arg, grpc_error* error);
 
-static void schedule_next_client_load_report(glb_lb_policy* glb_policy) {
+static void schedule_next_client_load_report(glb_lb_call_data* lb_calld) {
   const grpc_millis next_client_load_report_time =
-      grpc_core::ExecCtx::Get()->Now() +
-      glb_policy->client_stats_report_interval;
-  GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
-                    send_client_load_report_locked, glb_policy,
-                    grpc_combiner_scheduler(glb_policy->base.combiner));
-  grpc_timer_init(&glb_policy->client_load_report_timer,
+      grpc_core::ExecCtx::Get()->Now() + lb_calld->client_stats_report_interval;
+  GRPC_CLOSURE_INIT(
+      &lb_calld->client_load_report_closure,
+      maybe_send_client_load_report_locked, lb_calld,
+      grpc_combiner_scheduler(lb_calld->glb_policy->base.combiner));
+  grpc_timer_init(&lb_calld->client_load_report_timer,
                   next_client_load_report_time,
-                  &glb_policy->client_load_report_closure);
+                  &lb_calld->client_load_report_closure);
+  lb_calld->client_load_report_timer_callback_pending = true;
 }
 
 static void client_load_report_done_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
-  grpc_byte_buffer_destroy(glb_policy->client_load_report_payload);
-  glb_policy->client_load_report_payload = nullptr;
-  if (error != GRPC_ERROR_NONE || glb_policy->lb_call == nullptr) {
-    glb_policy->client_load_report_timer_callback_pending = false;
-    GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
-    if (glb_policy->lb_call == nullptr) {
-      maybe_restart_lb_call(glb_policy);
-    }
+  glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+  glb_lb_policy* glb_policy = lb_calld->glb_policy;
+  grpc_byte_buffer_destroy(lb_calld->send_message_payload);
+  lb_calld->send_message_payload = nullptr;
+  if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
+    glb_lb_call_data_unref(lb_calld, "client_load_report");
     return;
   }
-  schedule_next_client_load_report(glb_policy);
-}
-
-static void do_send_client_load_report_locked(glb_lb_policy* glb_policy) {
-  grpc_op op;
-  memset(&op, 0, sizeof(op));
-  op.op = GRPC_OP_SEND_MESSAGE;
-  op.data.send_message.send_message = glb_policy->client_load_report_payload;
-  GRPC_CLOSURE_INIT(&glb_policy->client_load_report_closure,
-                    client_load_report_done_locked, glb_policy,
-                    grpc_combiner_scheduler(glb_policy->base.combiner));
-  grpc_call_error call_error = grpc_call_start_batch_and_execute(
-      glb_policy->lb_call, &op, 1, &glb_policy->client_load_report_closure);
-  if (call_error != GRPC_CALL_OK) {
-    gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
-    GPR_ASSERT(GRPC_CALL_OK == call_error);
-  }
+  schedule_next_client_load_report(lb_calld);
 }
 
 static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
@@ -1241,341 +1264,377 @@ static bool load_report_counters_are_zero(grpc_grpclb_request* request) {
          (drop_entries == nullptr || drop_entries->num_entries == 0);
 }
 
-static void send_client_load_report_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
-  if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == nullptr) {
-    glb_policy->client_load_report_timer_callback_pending = false;
-    GRPC_LB_POLICY_UNREF(&glb_policy->base, "client_load_report");
-    if (glb_policy->lb_call == nullptr) {
-      maybe_restart_lb_call(glb_policy);
-    }
-    return;
-  }
+static void send_client_load_report_locked(glb_lb_call_data* lb_calld) {
+  glb_lb_policy* glb_policy = lb_calld->glb_policy;
   // Construct message payload.
-  GPR_ASSERT(glb_policy->client_load_report_payload == nullptr);
+  GPR_ASSERT(lb_calld->send_message_payload == nullptr);
   grpc_grpclb_request* request =
-      grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
+      grpc_grpclb_load_report_request_create_locked(lb_calld->client_stats);
   // Skip client load report if the counters were all zero in the last
   // report and they are still zero in this one.
   if (load_report_counters_are_zero(request)) {
-    if (glb_policy->last_client_load_report_counters_were_zero) {
+    if (lb_calld->last_client_load_report_counters_were_zero) {
       grpc_grpclb_request_destroy(request);
-      schedule_next_client_load_report(glb_policy);
+      schedule_next_client_load_report(lb_calld);
       return;
     }
-    glb_policy->last_client_load_report_counters_were_zero = true;
+    lb_calld->last_client_load_report_counters_were_zero = true;
   } else {
-    glb_policy->last_client_load_report_counters_were_zero = false;
+    lb_calld->last_client_load_report_counters_were_zero = false;
   }
   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
-  glb_policy->client_load_report_payload =
+  lb_calld->send_message_payload =
       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
   grpc_slice_unref_internal(request_payload_slice);
   grpc_grpclb_request_destroy(request);
+  // Send the report.
+  grpc_op op;
+  memset(&op, 0, sizeof(op));
+  op.op = GRPC_OP_SEND_MESSAGE;
+  op.data.send_message.send_message = lb_calld->send_message_payload;
+  GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure,
+                    client_load_report_done_locked, lb_calld,
+                    grpc_combiner_scheduler(glb_policy->base.combiner));
+  grpc_call_error call_error = grpc_call_start_batch_and_execute(
+      lb_calld->lb_call, &op, 1, &lb_calld->client_load_report_closure);
+  if (call_error != GRPC_CALL_OK) {
+    gpr_log(GPR_ERROR, "[grpclb %p] call_error=%d", glb_policy, call_error);
+    GPR_ASSERT(GRPC_CALL_OK == call_error);
+  }
+}
 
+static void maybe_send_client_load_report_locked(void* arg, grpc_error* error) {
+  glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+  glb_lb_policy* glb_policy = lb_calld->glb_policy;
+  lb_calld->client_load_report_timer_callback_pending = false;
+  if (error != GRPC_ERROR_NONE || lb_calld != glb_policy->lb_calld) {
+    glb_lb_call_data_unref(lb_calld, "client_load_report");
+    return;
+  }
   // If we've already sent the initial request, then we can go ahead and send
   // the load report. Otherwise, we need to wait until the initial request has
-  // been sent to send this (see lb_on_sent_initial_request_locked() below).
-  if (glb_policy->initial_request_sent) {
-    do_send_client_load_report_locked(glb_policy);
+  // been sent to send this (see lb_on_sent_initial_request_locked()).
+  if (lb_calld->send_message_payload == nullptr) {
+    send_client_load_report_locked(lb_calld);
+  } else {
+    lb_calld->client_load_report_is_due = true;
   }
 }
 
 static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error);
 static void lb_on_server_status_received_locked(void* arg, grpc_error* error);
 static void lb_on_response_received_locked(void* arg, grpc_error* error);
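maybe_send_client_load_report_locked() reuses send_message_payload as a busy flag: the initial request and the load reports share a single payload slot, so a non-null slot means a send is still in flight. A condensed sketch of the decision, lifted from the function above with commentary added:

    // Load-report timer fired: either send now or mark the report as due.
    if (lb_calld->send_message_payload == nullptr) {
      // Nothing in flight: serialize the stats snapshot and start the
      // SEND_MESSAGE batch immediately.
      send_client_load_report_locked(lb_calld);
    } else {
      // The initial request (or a previous report) is still being sent;
      // lb_on_sent_initial_request_locked() will pick this flag up.
      lb_calld->client_load_report_is_due = true;
    }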
 
-static void lb_call_init_locked(glb_lb_policy* glb_policy) {
+static glb_lb_call_data* lb_call_data_create_locked(glb_lb_policy* glb_policy) {
+  GPR_ASSERT(!glb_policy->shutting_down);
+  // Init the LB call. Note that the LB call will progress every time there's
+  // activity in glb_policy->base.interested_parties, which is comprised of the
+  // polling entities from client_channel.
   GPR_ASSERT(glb_policy->server_name != nullptr);
   GPR_ASSERT(glb_policy->server_name[0] != '\0');
-  GPR_ASSERT(glb_policy->lb_call == nullptr);
-  GPR_ASSERT(!glb_policy->shutting_down);
-
-  /* Note the following LB call progresses every time there's activity in \a
-   * glb_policy->base.interested_parties, which is comprised of the polling
-   * entities from \a client_channel. */
   grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name);
   grpc_millis deadline =
       glb_policy->lb_call_timeout_ms == 0
           ? GRPC_MILLIS_INF_FUTURE
           : grpc_core::ExecCtx::Get()->Now() + glb_policy->lb_call_timeout_ms;
-  glb_policy->lb_call = grpc_channel_create_pollset_set_call(
+  glb_lb_call_data* lb_calld = (glb_lb_call_data*)gpr_zalloc(sizeof(*lb_calld));
+  lb_calld->lb_call = grpc_channel_create_pollset_set_call(
       glb_policy->lb_channel, nullptr, GRPC_PROPAGATE_DEFAULTS,
       glb_policy->base.interested_parties,
       GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
       &host, deadline, nullptr);
   grpc_slice_unref_internal(host);
-
-  if (glb_policy->client_stats != nullptr) {
-    grpc_grpclb_client_stats_unref(glb_policy->client_stats);
-  }
-  glb_policy->client_stats = grpc_grpclb_client_stats_create();
-
-  grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv);
-  grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv);
-
+  // Init the LB call request payload.
   grpc_grpclb_request* request =
       grpc_grpclb_request_create(glb_policy->server_name);
   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
-  glb_policy->lb_request_payload =
+  lb_calld->send_message_payload =
       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
   grpc_slice_unref_internal(request_payload_slice);
   grpc_grpclb_request_destroy(request);
-
-  GRPC_CLOSURE_INIT(&glb_policy->lb_on_sent_initial_request,
-                    lb_on_sent_initial_request_locked, glb_policy,
+  // Init other data associated with the LB call.
+  lb_calld->glb_policy = glb_policy;
+  gpr_ref_init(&lb_calld->refs, 1);
+  grpc_metadata_array_init(&lb_calld->lb_initial_metadata_recv);
+  grpc_metadata_array_init(&lb_calld->lb_trailing_metadata_recv);
+  GRPC_CLOSURE_INIT(&lb_calld->lb_on_sent_initial_request,
+                    lb_on_sent_initial_request_locked, lb_calld,
                     grpc_combiner_scheduler(glb_policy->base.combiner));
-  GRPC_CLOSURE_INIT(&glb_policy->lb_on_server_status_received,
-                    lb_on_server_status_received_locked, glb_policy,
+  GRPC_CLOSURE_INIT(&lb_calld->lb_on_response_received,
+                    lb_on_response_received_locked, lb_calld,
                     grpc_combiner_scheduler(glb_policy->base.combiner));
-  GRPC_CLOSURE_INIT(&glb_policy->lb_on_response_received,
-                    lb_on_response_received_locked, glb_policy,
+  GRPC_CLOSURE_INIT(&lb_calld->lb_on_server_status_received,
+                    lb_on_server_status_received_locked, lb_calld,
                     grpc_combiner_scheduler(glb_policy->base.combiner));
-
-  grpc_core::BackOff::Options backoff_options;
-  backoff_options
-      .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
-      .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
-      .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
-      .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
-
-  glb_policy->lb_call_backoff.Init(backoff_options);
-
-  glb_policy->initial_request_sent = false;
-  glb_policy->seen_initial_response = false;
-  glb_policy->last_client_load_report_counters_were_zero = false;
-}
-
-static void lb_call_destroy_locked(glb_lb_policy* glb_policy) {
-  GPR_ASSERT(glb_policy->lb_call != nullptr);
-  grpc_call_unref(glb_policy->lb_call);
-  glb_policy->lb_call = nullptr;
-
-  grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv);
-  grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv);
-
-  grpc_byte_buffer_destroy(glb_policy->lb_request_payload);
-  grpc_slice_unref_internal(glb_policy->lb_call_status_details);
-
-  if (glb_policy->client_load_report_timer_callback_pending) {
-    grpc_timer_cancel(&glb_policy->client_load_report_timer);
-  }
+  // Hold a ref to the glb_policy.
+  GRPC_LB_POLICY_REF(&glb_policy->base, "lb_calld");
+  return lb_calld;
 }
 
 /*
  * Auxiliary functions and LB client callbacks.
  */
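The deadline computation in lb_call_data_create_locked() maps the configured millisecond timeout onto an absolute grpc_millis deadline, with 0 meaning "no deadline". A hedged one-liner of that conversion (timeout_ms is an illustrative local standing in for glb_policy->lb_call_timeout_ms):

    grpc_millis deadline =
        timeout_ms == 0 ? GRPC_MILLIS_INF_FUTURE
                        : grpc_core::ExecCtx::Get()->Now() + timeout_ms;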
+
 static void query_for_backends_locked(glb_lb_policy* glb_policy) {
   GPR_ASSERT(glb_policy->lb_channel != nullptr);
   if (glb_policy->shutting_down) return;
-
-  lb_call_init_locked(glb_policy);
-
+  // Init the LB call data.
+  GPR_ASSERT(glb_policy->lb_calld == nullptr);
+  glb_policy->lb_calld = lb_call_data_create_locked(glb_policy);
   if (grpc_lb_glb_trace.enabled()) {
     gpr_log(GPR_INFO,
-            "[grpclb %p] Query for backends (lb_channel: %p, lb_call: %p)",
-            glb_policy, glb_policy->lb_channel, glb_policy->lb_call);
+            "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p, "
+            "lb_call: %p)",
+            glb_policy, glb_policy->lb_channel, glb_policy->lb_calld,
+            glb_policy->lb_calld->lb_call);
   }
-  GPR_ASSERT(glb_policy->lb_call != nullptr);
-
+  GPR_ASSERT(glb_policy->lb_calld->lb_call != nullptr);
+  // Create the ops.
   grpc_call_error call_error;
   grpc_op ops[3];
   memset(ops, 0, sizeof(ops));
-
+  // Op: send initial metadata.
   grpc_op* op = ops;
   op->op = GRPC_OP_SEND_INITIAL_METADATA;
   op->data.send_initial_metadata.count = 0;
   op->flags = 0;
   op->reserved = nullptr;
   op++;
+  // Op: send request message.
+  GPR_ASSERT(glb_policy->lb_calld->send_message_payload != nullptr);
+  op->op = GRPC_OP_SEND_MESSAGE;
+  op->data.send_message.send_message =
+      glb_policy->lb_calld->send_message_payload;
+  op->flags = 0;
+  op->reserved = nullptr;
+  op++;
+  glb_lb_call_data_ref(glb_policy->lb_calld,
+                       "lb_on_sent_initial_request_locked");
+  call_error = grpc_call_start_batch_and_execute(
+      glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
+      &glb_policy->lb_calld->lb_on_sent_initial_request);
+  GPR_ASSERT(GRPC_CALL_OK == call_error);
+  // Op: recv initial metadata.
+  op = ops;
   op->op = GRPC_OP_RECV_INITIAL_METADATA;
   op->data.recv_initial_metadata.recv_initial_metadata =
-      &glb_policy->lb_initial_metadata_recv;
+      &glb_policy->lb_calld->lb_initial_metadata_recv;
   op->flags = 0;
   op->reserved = nullptr;
   op++;
-  GPR_ASSERT(glb_policy->lb_request_payload != nullptr);
-  op->op = GRPC_OP_SEND_MESSAGE;
-  op->data.send_message.send_message = glb_policy->lb_request_payload;
+  // Op: recv response.
+  op->op = GRPC_OP_RECV_MESSAGE;
+  op->data.recv_message.recv_message =
+      &glb_policy->lb_calld->recv_message_payload;
   op->flags = 0;
   op->reserved = nullptr;
   op++;
-  /* take a ref to be released in lb_on_sent_initial_request_locked() */
-  GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+  glb_lb_call_data_ref(glb_policy->lb_calld, "lb_on_response_received_locked");
   call_error = grpc_call_start_batch_and_execute(
-      glb_policy->lb_call, ops, (size_t)(op - ops),
-      &glb_policy->lb_on_sent_initial_request);
+      glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
+      &glb_policy->lb_calld->lb_on_response_received);
   GPR_ASSERT(GRPC_CALL_OK == call_error);
-
+  // Op: recv server status.
   op = ops;
   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
   op->data.recv_status_on_client.trailing_metadata =
-      &glb_policy->lb_trailing_metadata_recv;
-  op->data.recv_status_on_client.status = &glb_policy->lb_call_status;
+      &glb_policy->lb_calld->lb_trailing_metadata_recv;
+  op->data.recv_status_on_client.status = &glb_policy->lb_calld->lb_call_status;
   op->data.recv_status_on_client.status_details =
-      &glb_policy->lb_call_status_details;
+      &glb_policy->lb_calld->lb_call_status_details;
   op->flags = 0;
   op->reserved = nullptr;
   op++;
-  /* take a ref to be released in lb_on_server_status_received_locked() */
-  GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_server_status_received_locked");
+  // This callback signals the end of the LB call, so it relies on the initial
+  // ref instead of a new ref. When it's invoked, it's the initial ref that is
+  // unreffed.
   call_error = grpc_call_start_batch_and_execute(
-      glb_policy->lb_call, ops, (size_t)(op - ops),
-      &glb_policy->lb_on_server_status_received);
-  GPR_ASSERT(GRPC_CALL_OK == call_error);
-
-  op = ops;
-  op->op = GRPC_OP_RECV_MESSAGE;
-  op->data.recv_message.recv_message = &glb_policy->lb_response_payload;
-  op->flags = 0;
-  op->reserved = nullptr;
-  op++;
-  /* take a ref to be unref'd/reused in lb_on_response_received_locked() */
-  GRPC_LB_POLICY_REF(&glb_policy->base, "lb_on_response_received_locked");
-  call_error = grpc_call_start_batch_and_execute(
-      glb_policy->lb_call, ops, (size_t)(op - ops),
-      &glb_policy->lb_on_response_received);
+      glb_policy->lb_calld->lb_call, ops, (size_t)(op - ops),
+      &glb_policy->lb_calld->lb_on_server_status_received);
   GPR_ASSERT(GRPC_CALL_OK == call_error);
 }
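The three batches above encode the refcounting contract of the new glb_lb_call_data: every batch whose closure may outlive this function takes a fresh ref, while the RECV_STATUS_ON_CLIENT batch deliberately consumes the initial ref created in lb_call_data_create_locked(). A compressed sketch of the per-batch rule (one op; the tag string is illustrative):

    // Rule of thumb in this file: one lb_calld ref per in-flight batch whose
    // completion closure dereferences lb_calld.
    glb_lb_call_data_ref(lb_calld, "on_response");  // balanced in the callback
    grpc_call_error err = grpc_call_start_batch_and_execute(
        lb_calld->lb_call, ops, (size_t)(op - ops),
        &lb_calld->lb_on_response_received);
    GPR_ASSERT(err == GRPC_CALL_OK);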
 
 static void lb_on_sent_initial_request_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
-  glb_policy->initial_request_sent = true;
+  glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+  grpc_byte_buffer_destroy(lb_calld->send_message_payload);
+  lb_calld->send_message_payload = nullptr;
   // If we attempted to send a client load report before the initial request was
-  // sent, send the load report now.
-  if (glb_policy->client_load_report_payload != nullptr) {
-    do_send_client_load_report_locked(glb_policy);
+  // sent (and this lb_calld is still in use), send the load report now.
+  if (lb_calld->client_load_report_is_due &&
+      lb_calld == lb_calld->glb_policy->lb_calld) {
+    send_client_load_report_locked(lb_calld);
+    lb_calld->client_load_report_is_due = false;
   }
-  GRPC_LB_POLICY_UNREF(&glb_policy->base, "lb_on_sent_initial_request_locked");
+  glb_lb_call_data_unref(lb_calld, "lb_on_sent_initial_request_locked");
 }
 
 static void lb_on_response_received_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
+  glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+  glb_lb_policy* glb_policy = lb_calld->glb_policy;
+  // Empty payload means the LB call was cancelled.
+  if (lb_calld != glb_policy->lb_calld ||
+      lb_calld->recv_message_payload == nullptr) {
+    glb_lb_call_data_unref(lb_calld, "lb_on_response_received_locked");
+    return;
+  }
   grpc_op ops[2];
   memset(ops, 0, sizeof(ops));
   grpc_op* op = ops;
-  if (glb_policy->lb_response_payload != nullptr) {
-    glb_policy->lb_call_backoff->Reset();
-    /* Received data from the LB server. Look inside
-     * glb_policy->lb_response_payload, for a serverlist. */
-    grpc_byte_buffer_reader bbr;
-    grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload);
-    grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
-    grpc_byte_buffer_reader_destroy(&bbr);
-    grpc_byte_buffer_destroy(glb_policy->lb_response_payload);
-
-    grpc_grpclb_initial_response* response = nullptr;
-    if (!glb_policy->seen_initial_response &&
-        (response = grpc_grpclb_initial_response_parse(response_slice)) !=
-            nullptr) {
-      if (response->has_client_stats_report_interval) {
-        glb_policy->client_stats_report_interval = GPR_MAX(
-            GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
-                                &response->client_stats_report_interval));
-        if (grpc_lb_glb_trace.enabled()) {
-          gpr_log(GPR_INFO,
-                  "[grpclb %p] Received initial LB response message; "
-                  "client load reporting interval = %" PRIdPTR " milliseconds",
-                  glb_policy, glb_policy->client_stats_report_interval);
-        }
-        /* take a ref to be unref'd in send_client_load_report_locked() */
-        glb_policy->client_load_report_timer_callback_pending = true;
-        GRPC_LB_POLICY_REF(&glb_policy->base, "client_load_report");
-        schedule_next_client_load_report(glb_policy);
-      } else if (grpc_lb_glb_trace.enabled()) {
+  glb_policy->lb_call_backoff->Reset();
+  grpc_byte_buffer_reader bbr;
+  grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload);
+  grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
+  grpc_byte_buffer_reader_destroy(&bbr);
+  grpc_byte_buffer_destroy(lb_calld->recv_message_payload);
+  lb_calld->recv_message_payload = nullptr;
+  grpc_grpclb_initial_response* initial_response;
+  grpc_grpclb_serverlist* serverlist;
+  if (!lb_calld->seen_initial_response &&
+      (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
+          nullptr) {
+    // Have NOT seen initial response, look for initial response.
+    if (initial_response->has_client_stats_report_interval) {
+      lb_calld->client_stats_report_interval = GPR_MAX(
+          GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
+                              &initial_response->client_stats_report_interval));
+      if (grpc_lb_glb_trace.enabled()) {
         gpr_log(GPR_INFO,
-                "[grpclb %p] Received initial LB response message; client load "
-                "reporting NOT enabled",
-                glb_policy);
+                "[grpclb %p] Received initial LB response message; "
+                "client load reporting interval = %" PRIdPTR " milliseconds",
+                glb_policy, lb_calld->client_stats_report_interval);
       }
-      grpc_grpclb_initial_response_destroy(response);
-      glb_policy->seen_initial_response = true;
-    } else {
-      grpc_grpclb_serverlist* serverlist =
-          grpc_grpclb_response_parse_serverlist(response_slice);
-      if (serverlist != nullptr) {
-        GPR_ASSERT(glb_policy->lb_call != nullptr);
+    } else if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO,
+              "[grpclb %p] Received initial LB response message; client load "
+              "reporting NOT enabled",
+              glb_policy);
+    }
+    grpc_grpclb_initial_response_destroy(initial_response);
+    lb_calld->seen_initial_response = true;
+  } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
+                  response_slice)) != nullptr) {
+    // Have seen initial response, look for serverlist.
+    GPR_ASSERT(lb_calld->lb_call != nullptr);
+    if (grpc_lb_glb_trace.enabled()) {
+      gpr_log(GPR_INFO,
+              "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
+              glb_policy, serverlist->num_servers);
+      for (size_t i = 0; i < serverlist->num_servers; ++i) {
+        grpc_resolved_address addr;
+        parse_server(serverlist->servers[i], &addr);
+        char* ipport;
+        grpc_sockaddr_to_string(&ipport, &addr, false);
+        gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
+                glb_policy, i, ipport);
+        gpr_free(ipport);
+      }
+    }
+    /* update serverlist */
+    if (serverlist->num_servers > 0) {
+      // Start sending client load report only after we start using the
+      // serverlist returned from the current LB call.
+      if (lb_calld->client_stats_report_interval > 0 &&
+          lb_calld->client_stats == nullptr) {
+        lb_calld->client_stats = grpc_grpclb_client_stats_create();
+        glb_lb_call_data_ref(lb_calld, "client_load_report");
+        schedule_next_client_load_report(lb_calld);
+      }
+      if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) {
         if (grpc_lb_glb_trace.enabled()) {
           gpr_log(GPR_INFO,
-                  "[grpclb %p] Serverlist with %" PRIuPTR " servers received",
-                  glb_policy, serverlist->num_servers);
-          for (size_t i = 0; i < serverlist->num_servers; ++i) {
-            grpc_resolved_address addr;
-            parse_server(serverlist->servers[i], &addr);
-            char* ipport;
-            grpc_sockaddr_to_string(&ipport, &addr, false);
-            gpr_log(GPR_INFO, "[grpclb %p] Serverlist[%" PRIuPTR "]: %s",
-                    glb_policy, i, ipport);
-            gpr_free(ipport);
-          }
+                  "[grpclb %p] Incoming server list identical to current, "
+                  "ignoring.",
+                  glb_policy);
         }
-        /* update serverlist */
-        if (serverlist->num_servers > 0) {
-          if (grpc_grpclb_serverlist_equals(glb_policy->serverlist,
-                                            serverlist)) {
-            if (grpc_lb_glb_trace.enabled()) {
-              gpr_log(GPR_INFO,
-                      "[grpclb %p] Incoming server list identical to current, "
-                      "ignoring.",
-                      glb_policy);
-            }
-            grpc_grpclb_destroy_serverlist(serverlist);
-          } else { /* new serverlist */
-            if (glb_policy->serverlist != nullptr) {
-              /* dispose of the old serverlist */
-              grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
-            } else {
-              /* or dispose of the fallback */
-              grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
-              glb_policy->fallback_backend_addresses = nullptr;
-              if (glb_policy->fallback_timer_callback_pending) {
-                grpc_timer_cancel(&glb_policy->lb_fallback_timer);
-              }
-            }
-            /* and update the copy in the glb_lb_policy instance. This
-             * serverlist instance will be destroyed either upon the next
-             * update or in glb_destroy() */
-            glb_policy->serverlist = serverlist;
-            glb_policy->serverlist_index = 0;
-            rr_handover_locked(glb_policy);
-          }
+        grpc_grpclb_destroy_serverlist(serverlist);
+      } else { /* new serverlist */
+        if (glb_policy->serverlist != nullptr) {
+          /* dispose of the old serverlist */
+          grpc_grpclb_destroy_serverlist(glb_policy->serverlist);
         } else {
-          if (grpc_lb_glb_trace.enabled()) {
-            gpr_log(GPR_INFO,
-                    "[grpclb %p] Received empty server list, ignoring.",
-                    glb_policy);
+          /* or dispose of the fallback */
+          grpc_lb_addresses_destroy(glb_policy->fallback_backend_addresses);
+          glb_policy->fallback_backend_addresses = nullptr;
+          if (glb_policy->fallback_timer_callback_pending) {
+            grpc_timer_cancel(&glb_policy->lb_fallback_timer);
+            glb_policy->fallback_timer_callback_pending = false;
           }
-          grpc_grpclb_destroy_serverlist(serverlist);
         }
-      } else { /* serverlist == nullptr */
-        gpr_log(GPR_ERROR,
-                "[grpclb %p] Invalid LB response received: '%s'.
Ignoring.", - glb_policy, - grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + /* and update the copy in the glb_lb_policy instance. This + * serverlist instance will be destroyed either upon the next + * update or in glb_destroy() */ + glb_policy->serverlist = serverlist; + glb_policy->serverlist_index = 0; + rr_handover_locked(glb_policy); } + } else { + if (grpc_lb_glb_trace.enabled()) { + gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.", + glb_policy); + } + grpc_grpclb_destroy_serverlist(serverlist); + } + } else { + // No valid initial response or serverlist found. + gpr_log(GPR_ERROR, + "[grpclb %p] Invalid LB response received: '%s'. Ignoring.", + glb_policy, + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + } + grpc_slice_unref_internal(response_slice); + if (!glb_policy->shutting_down) { + // Keep listening for serverlist updates. + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &lb_calld->recv_message_payload; + op->flags = 0; + op->reserved = nullptr; + op++; + // Reuse the "lb_on_response_received_locked" ref taken in + // query_for_backends_locked(). + const grpc_call_error call_error = grpc_call_start_batch_and_execute( + lb_calld->lb_call, ops, (size_t)(op - ops), + &lb_calld->lb_on_response_received); + GPR_ASSERT(GRPC_CALL_OK == call_error); + } else { + glb_lb_call_data_unref(lb_calld, + "lb_on_response_received_locked+glb_shutdown"); + } +} + +static void lb_on_server_status_received_locked(void* arg, grpc_error* error) { + glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg; + glb_lb_policy* glb_policy = lb_calld->glb_policy; + GPR_ASSERT(lb_calld->lb_call != nullptr); + if (grpc_lb_glb_trace.enabled()) { + char* status_details = + grpc_slice_to_c_string(lb_calld->lb_call_status_details); + gpr_log(GPR_INFO, + "[grpclb %p] Status from LB server received. Status = %d, details " + "= '%s', (lb_calld: %p, lb_call: %p), error '%s'", + lb_calld->glb_policy, lb_calld->lb_call_status, status_details, + lb_calld, lb_calld->lb_call, grpc_error_string(error)); + gpr_free(status_details); + } + // If this lb_calld is still in use, this call ended because of a failure so + // we want to retry connecting. Otherwise, we have deliberately ended this + // call and no further action is required. + if (lb_calld == glb_policy->lb_calld) { + glb_policy->lb_calld = nullptr; + if (lb_calld->client_load_report_timer_callback_pending) { + grpc_timer_cancel(&lb_calld->client_load_report_timer); } - grpc_slice_unref_internal(response_slice); - if (!glb_policy->shutting_down) { - /* keep listening for serverlist updates */ - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &glb_policy->lb_response_payload; - op->flags = 0; - op->reserved = nullptr; - op++; - /* reuse the "lb_on_response_received_locked" ref taken in - * query_for_backends_locked() */ - const grpc_call_error call_error = grpc_call_start_batch_and_execute( - glb_policy->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_on_response_received); /* loop */ - GPR_ASSERT(GRPC_CALL_OK == call_error); + GPR_ASSERT(!glb_policy->shutting_down); + if (lb_calld->seen_initial_response) { + // If we lose connection to the LB server, reset the backoff and restart + // the LB call immediately. 
 
+static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
+  glb_lb_call_data* lb_calld = (glb_lb_call_data*)arg;
+  glb_lb_policy* glb_policy = lb_calld->glb_policy;
+  GPR_ASSERT(lb_calld->lb_call != nullptr);
+  if (grpc_lb_glb_trace.enabled()) {
+    char* status_details =
+        grpc_slice_to_c_string(lb_calld->lb_call_status_details);
+    gpr_log(GPR_INFO,
+            "[grpclb %p] Status from LB server received. Status = %d, details "
+            "= '%s', (lb_calld: %p, lb_call: %p), error '%s'",
+            lb_calld->glb_policy, lb_calld->lb_call_status, status_details,
+            lb_calld, lb_calld->lb_call, grpc_error_string(error));
+    gpr_free(status_details);
+  }
+  // If this lb_calld is still in use, this call ended because of a failure so
+  // we want to retry connecting. Otherwise, we have deliberately ended this
+  // call and no further action is required.
+  if (lb_calld == glb_policy->lb_calld) {
+    glb_policy->lb_calld = nullptr;
+    if (lb_calld->client_load_report_timer_callback_pending) {
+      grpc_timer_cancel(&lb_calld->client_load_report_timer);
+    }
+    GPR_ASSERT(!glb_policy->shutting_down);
+    if (lb_calld->seen_initial_response) {
+      // If we lose connection to the LB server, reset the backoff and restart
+      // the LB call immediately.
+      glb_policy->lb_call_backoff->Reset();
+      query_for_backends_locked(glb_policy);
+    } else {
+      // If this LB call fails establishing any connection to the LB server,
+      // retry later.
+      start_lb_call_retry_timer_locked(glb_policy);
+    }
+  }
+  glb_lb_call_data_unref(lb_calld, "lb_call_ended");
+}
 
@@ -1597,29 +1656,6 @@ static void lb_on_fallback_timer_locked(void* arg, grpc_error* error) {
   GRPC_LB_POLICY_UNREF(&glb_policy->base, "grpclb_fallback_timer");
 }
 
-static void lb_on_server_status_received_locked(void* arg, grpc_error* error) {
-  glb_lb_policy* glb_policy = (glb_lb_policy*)arg;
-  GPR_ASSERT(glb_policy->lb_call != nullptr);
-  if (grpc_lb_glb_trace.enabled()) {
-    char* status_details =
-        grpc_slice_to_c_string(glb_policy->lb_call_status_details);
-    gpr_log(GPR_INFO,
-            "[grpclb %p] Status from LB server received. Status = %d, Details "
-            "= '%s', (call: %p), error '%s'",
-            glb_policy, glb_policy->lb_call_status, status_details,
-            glb_policy->lb_call, grpc_error_string(error));
-    gpr_free(status_details);
-  }
-  /* We need to perform cleanups no matter what. */
-  lb_call_destroy_locked(glb_policy);
-  // If the load report timer is still pending, we wait for it to be
-  // called before restarting the call. Otherwise, we restart the call
-  // here.
-  if (!glb_policy->client_load_report_timer_callback_pending) {
-    maybe_restart_lb_call(glb_policy);
-  }
-}
-
 static void fallback_update_locked(glb_lb_policy* glb_policy,
                                    const grpc_lb_addresses* addresses) {
   GPR_ASSERT(glb_policy->fallback_backend_addresses != nullptr);
@@ -1701,7 +1737,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
   switch (glb_policy->lb_channel_connectivity) {
     case GRPC_CHANNEL_CONNECTING:
     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
-      /* resub. */
+      // Keep watching the LB channel.
       grpc_channel_element* client_channel_elem =
           grpc_channel_stack_last_element(
              grpc_channel_get_channel_stack(glb_policy->lb_channel));
@@ -1714,29 +1750,26 @@ static void glb_lb_channel_on_connectivity_changed_cb(void* arg,
           &glb_policy->lb_channel_on_connectivity_changed, nullptr);
       break;
     }
+    // The LB channel may be IDLE because it's shut down before the update.
+    // Restart the LB call to kick the LB channel into gear.
     case GRPC_CHANNEL_IDLE:
-      // lb channel inactive (probably shutdown prior to update). Restart lb
-      // call to kick the lb channel into gear.
-      /* fallthrough */
     case GRPC_CHANNEL_READY:
-      if (glb_policy->lb_call != nullptr) {
-        glb_policy->updating_lb_call = true;
-        grpc_call_cancel(glb_policy->lb_call, nullptr);
-        // lb_on_server_status_received() will pick up the cancel and reinit
-        // lb_call.
-      } else if (glb_policy->started_picking) {
+      if (glb_policy->lb_calld != nullptr) {
+        lb_call_data_shutdown(glb_policy);
+      }
+      if (glb_policy->started_picking) {
         if (glb_policy->retry_timer_callback_pending) {
           grpc_timer_cancel(&glb_policy->lb_call_retry_timer);
         }
-        start_picking_locked(glb_policy);
+        glb_policy->lb_call_backoff->Reset();
+        query_for_backends_locked(glb_policy);
       }
-      /* fallthrough */
+      // Fall through.
     case GRPC_CHANNEL_SHUTDOWN:
     done:
       glb_policy->watching_lb_channel = false;
       GRPC_LB_POLICY_UNREF(&glb_policy->base,
                            "watch_lb_channel_connectivity_cb_shutdown");
-      break;
   }
 }
 
@@ -1851,6 +1884,14 @@ static grpc_lb_policy* glb_create(grpc_lb_policy_factory* factory,
   grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
   grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
                                "grpclb");
+  // Init LB call backoff option.
+  grpc_core::BackOff::Options backoff_options;
+  backoff_options
+      .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
+      .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
+      .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
+      .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000);
+  glb_policy->lb_call_backoff.Init(backoff_options);
   return &glb_policy->base;
 }
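With this change, glb_create() owns the backoff configuration for the policy's lifetime. The lb_call_backoff field is a grpc_core::ManualConstructor (one of the headers renamed to gprpp in the first commit of this series), which embeds a C++ object in this C-style struct without running its constructor automatically. A hedged sketch of that helper's lifecycle; when exactly Destroy() is invoked by the owner is an assumption here:

    grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
    backoff.Init(backoff_options);               // explicit placement construction
    grpc_millis t = backoff->NextAttemptTime();  // operator-> forwards to BackOff
    backoff.Destroy();                           // explicit destruction by the owner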
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 815dfd0c4f..78527587cf 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -220,30 +220,31 @@ class BalancerServiceImpl : public BalancerService {
 
     if (client_load_reporting_interval_seconds_ > 0) {
       request.Clear();
-      stream->Read(&request);
-      gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
-              request.DebugString().c_str());
-      GPR_ASSERT(request.has_client_stats());
-      // We need to acquire the lock here in order to prevent the notify_one
-      // below from firing before its corresponding wait is executed.
-      std::lock_guard<std::mutex> lock(mu_);
-      client_stats_.num_calls_started +=
-          request.client_stats().num_calls_started();
-      client_stats_.num_calls_finished +=
-          request.client_stats().num_calls_finished();
-      client_stats_.num_calls_finished_with_client_failed_to_send +=
-          request.client_stats()
-              .num_calls_finished_with_client_failed_to_send();
-      client_stats_.num_calls_finished_known_received +=
-          request.client_stats().num_calls_finished_known_received();
-      for (const auto& drop_token_count :
-           request.client_stats().calls_finished_with_drop()) {
-        client_stats_
-            .drop_token_counts[drop_token_count.load_balance_token()] +=
-            drop_token_count.num_calls();
+      if (stream->Read(&request)) {
+        gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
+                request.DebugString().c_str());
+        GPR_ASSERT(request.has_client_stats());
+        // We need to acquire the lock here in order to prevent the notify_one
+        // below from firing before its corresponding wait is executed.
+        std::lock_guard<std::mutex> lock(mu_);
+        client_stats_.num_calls_started +=
+            request.client_stats().num_calls_started();
+        client_stats_.num_calls_finished +=
+            request.client_stats().num_calls_finished();
+        client_stats_.num_calls_finished_with_client_failed_to_send +=
+            request.client_stats()
+                .num_calls_finished_with_client_failed_to_send();
+        client_stats_.num_calls_finished_known_received +=
+            request.client_stats().num_calls_finished_known_received();
+        for (const auto& drop_token_count :
+             request.client_stats().calls_finished_with_drop()) {
+          client_stats_
+              .drop_token_counts[drop_token_count.load_balance_token()] +=
+              drop_token_count.num_calls();
+        }
+        load_report_ready_ = true;
+        load_report_cond_.notify_one();
       }
-      load_report_ready_ = true;
-      load_report_cond_.notify_one();
     }
   done:
     gpr_log(GPR_INFO, "LB[%p]: done", this);
-- 
cgit v1.2.3

From c872ad459043a8c4376d87fe7970c27189f55743 Mon Sep 17 00:00:00 2001
From: Juanli Shen
Date: Wed, 24 Jan 2018 10:29:38 -0800
Subject: Fix comment

---
 src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 6393884127..1709e5622e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -171,7 +171,7 @@ struct pending_ping {
 
 typedef struct glb_lb_call_data {
   struct glb_lb_policy* glb_policy;
-  // todo refactor
+  // TODO(juanlishen): c++ize this struct.
   gpr_refcount refs;
 
   /** The streaming call to the LB server. Always non-NULL. */
-- 
cgit v1.2.3