From e7751807e29f7cddcdf510530ef029c04a305790 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 27 Jul 2017 12:31:45 -0700 Subject: Use new protocol for reporting dropped calls to grpclb balancer. --- .../grpclb/client_load_reporting_filter.c | 1 - .../client_channel/lb_policy/grpclb/grpclb.c | 26 +++--- .../lb_policy/grpclb/grpclb_client_stats.c | 79 +++++++++++++------ .../lb_policy/grpclb/grpclb_client_stats.h | 27 +++++-- .../lb_policy/grpclb/load_balancer_api.c | 44 +++++++++-- .../lb_policy/grpclb/load_balancer_api.h | 2 +- .../grpclb/proto/grpc/lb/v1/load_balancer.pb.c | 22 +++--- .../grpclb/proto/grpc/lb/v1/load_balancer.pb.h | 48 ++++++----- src/proto/grpc/lb/v1/load_balancer.proto | 42 +++++----- test/cpp/end2end/grpclb_end2end_test.cc | 92 +++++++++++----------- test/cpp/grpclb/grpclb_api_test.cc | 16 ++-- 11 files changed, 241 insertions(+), 158 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c index 52c6e38c87..568bb2ba8d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -88,7 +88,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, // Record call finished, optionally setting client_failed_to_send and // received. grpc_grpclb_client_stats_add_call_finished( - false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */, !calld->send_initial_metadata_succeeded /* client_failed_to_send */, calld->recv_initial_metadata_succeeded /* known_received */, calld->client_stats); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index ebce801b37..f248a873ba 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -416,9 +416,7 @@ struct rr_connectivity_data { static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, bool log) { - if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { - return false; - } + if (server->drop) return false; const grpc_grpclb_ip_address *ip = &server->ip_address; if (server->port >> 16 != 0) { if (log) { @@ -462,7 +460,7 @@ static const grpc_lb_user_data_vtable lb_token_vtable = { static void parse_server(const grpc_grpclb_server *server, grpc_resolved_address *addr) { memset(addr, 0, sizeof(*addr)); - if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return; + if (server->drop) return; const uint16_t netorder_port = htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. */ @@ -610,7 +608,7 @@ static bool pick_from_internal_rr_locked( if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { glb_policy->serverlist_index = 0; // Wrap-around. } - if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + if (server->drop) { // Not using the RR policy, so unref it. if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", @@ -622,11 +620,8 @@ static bool pick_from_internal_rr_locked( // the client_load_reporting filter, because we do not create a // subchannel call (and therefore no client_load_reporting filter) // for dropped calls. - grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats); - grpc_grpclb_client_stats_add_call_finished( - server->drop_for_rate_limiting, server->drop_for_load_balancing, - false /* failed_to_send */, false /* known_received */, - wc_arg->client_stats); + grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token, + wc_arg->client_stats); grpc_grpclb_client_stats_unref(wc_arg->client_stats); if (force_async) { GPR_ASSERT(wc_arg->wrapped_closure != NULL); @@ -1309,15 +1304,14 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, } static bool load_report_counters_are_zero(grpc_grpclb_request *request) { + grpc_grpclb_dropped_call_counts *drop_entries = + request->client_stats.calls_finished_with_drop.arg; return request->client_stats.num_calls_started == 0 && request->client_stats.num_calls_finished == 0 && - request->client_stats.num_calls_finished_with_drop_for_rate_limiting == - 0 && - request->client_stats - .num_calls_finished_with_drop_for_load_balancing == 0 && request->client_stats.num_calls_finished_with_client_failed_to_send == 0 && - request->client_stats.num_calls_finished_known_received == 0; + request->client_stats.num_calls_finished_known_received == 0 && + (drop_entries == NULL || drop_entries->num_entries == 0); } static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -1332,7 +1326,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, // Construct message payload. GPR_ASSERT(glb_policy->client_load_report_payload == NULL); grpc_grpclb_request *request = - grpc_grpclb_load_report_request_create(glb_policy->client_stats); + grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats); // Skip client load report if the counters were all zero in the last // report and they are still zero in this one. if (load_report_counters_are_zero(request)) { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c index c762443b7c..5b62623145 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c @@ -18,8 +18,11 @@ #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" +#include + #include #include +#include #include #include @@ -29,10 +32,11 @@ struct grpc_grpclb_client_stats { gpr_refcount refs; + // This field must only be accessed via *_locked() methods. + grpc_grpclb_dropped_call_counts* drop_token_counts; + // These fields may be accessed from multiple threads at a time. gpr_atm num_calls_started; gpr_atm num_calls_finished; - gpr_atm num_calls_finished_with_drop_for_rate_limiting; - gpr_atm num_calls_finished_with_drop_for_load_balancing; gpr_atm num_calls_finished_with_client_failed_to_send; gpr_atm num_calls_finished_known_received; }; @@ -51,6 +55,7 @@ grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) { if (gpr_unref(&client_stats->refs)) { + grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts); gpr_free(client_stats); } } @@ -61,21 +66,9 @@ void grpc_grpclb_client_stats_add_call_started( } void grpc_grpclb_client_stats_add_call_finished( - bool finished_with_drop_for_rate_limiting, - bool finished_with_drop_for_load_balancing, bool finished_with_client_failed_to_send, bool finished_known_received, grpc_grpclb_client_stats* client_stats) { gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1); - if (finished_with_drop_for_rate_limiting) { - gpr_atm_full_fetch_add( - &client_stats->num_calls_finished_with_drop_for_rate_limiting, - (gpr_atm)1); - } - if (finished_with_drop_for_load_balancing) { - gpr_atm_full_fetch_add( - &client_stats->num_calls_finished_with_drop_for_load_balancing, - (gpr_atm)1); - } if (finished_with_client_failed_to_send) { gpr_atm_full_fetch_add( &client_stats->num_calls_finished_with_client_failed_to_send, @@ -87,32 +80,70 @@ void grpc_grpclb_client_stats_add_call_finished( } } +void grpc_grpclb_client_stats_add_call_dropped_locked( + char* token, grpc_grpclb_client_stats* client_stats) { + // Increment num_calls_started and num_calls_finished. + gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1); + gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1); + // Record the drop. + if (client_stats->drop_token_counts == NULL) { + client_stats->drop_token_counts = + gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts)); + } + grpc_grpclb_dropped_call_counts* drop_token_counts = + client_stats->drop_token_counts; + for (size_t i = 0; i < drop_token_counts->num_entries; ++i) { + if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) { + ++drop_token_counts->token_counts[i].count; + return; + } + } + // Not found, so add a new entry. We double the size of the array each time. + size_t new_num_entries = 2; + while (new_num_entries < drop_token_counts->num_entries + 1) { + new_num_entries *= 2; + } + drop_token_counts->token_counts = + gpr_realloc(drop_token_counts->token_counts, + new_num_entries * sizeof(grpc_grpclb_drop_token_count)); + grpc_grpclb_drop_token_count* new_entry = + &drop_token_counts->token_counts[drop_token_counts->num_entries++]; + new_entry->token = gpr_strdup(token); + new_entry->count = 1; +} + static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) { *value = (int64_t)gpr_atm_acq_load(counter); gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value)); } -void grpc_grpclb_client_stats_get( +void grpc_grpclb_client_stats_get_locked( grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, int64_t* num_calls_finished, - int64_t* num_calls_finished_with_drop_for_rate_limiting, - int64_t* num_calls_finished_with_drop_for_load_balancing, int64_t* num_calls_finished_with_client_failed_to_send, - int64_t* num_calls_finished_known_received) { + int64_t* num_calls_finished_known_received, + grpc_grpclb_dropped_call_counts** drop_token_counts) { atomic_get_and_reset_counter(num_calls_started, &client_stats->num_calls_started); atomic_get_and_reset_counter(num_calls_finished, &client_stats->num_calls_finished); - atomic_get_and_reset_counter( - num_calls_finished_with_drop_for_rate_limiting, - &client_stats->num_calls_finished_with_drop_for_rate_limiting); - atomic_get_and_reset_counter( - num_calls_finished_with_drop_for_load_balancing, - &client_stats->num_calls_finished_with_drop_for_load_balancing); atomic_get_and_reset_counter( num_calls_finished_with_client_failed_to_send, &client_stats->num_calls_finished_with_client_failed_to_send); atomic_get_and_reset_counter( num_calls_finished_known_received, &client_stats->num_calls_finished_known_received); + *drop_token_counts = client_stats->drop_token_counts; + client_stats->drop_token_counts = NULL; +} + +void grpc_grpclb_dropped_call_counts_destroy( + grpc_grpclb_dropped_call_counts* drop_entries) { + if (drop_entries != NULL) { + for (size_t i = 0; i < drop_entries->num_entries; ++i) { + gpr_free(drop_entries->token_counts[i].token); + } + gpr_free(drop_entries->token_counts); + gpr_free(drop_entries); + } } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h index 4bb47d5c5c..c51e2a431a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h @@ -25,6 +25,16 @@ typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats; +typedef struct { + char* token; + int64_t count; +} grpc_grpclb_drop_token_count; + +typedef struct { + grpc_grpclb_drop_token_count* token_counts; + size_t num_entries; +} grpc_grpclb_dropped_call_counts; + grpc_grpclb_client_stats* grpc_grpclb_client_stats_create(); grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( grpc_grpclb_client_stats* client_stats); @@ -33,18 +43,23 @@ void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats); void grpc_grpclb_client_stats_add_call_started( grpc_grpclb_client_stats* client_stats); void grpc_grpclb_client_stats_add_call_finished( - bool finished_with_drop_for_rate_limiting, - bool finished_with_drop_for_load_balancing, bool finished_with_client_failed_to_send, bool finished_known_received, grpc_grpclb_client_stats* client_stats); -void grpc_grpclb_client_stats_get( +// This method is not thread-safe; caller must synchronize. +void grpc_grpclb_client_stats_add_call_dropped_locked( + char* token, grpc_grpclb_client_stats* client_stats); + +// This method is not thread-safe; caller must synchronize. +void grpc_grpclb_client_stats_get_locked( grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, int64_t* num_calls_finished, - int64_t* num_calls_finished_with_drop_for_rate_limiting, - int64_t* num_calls_finished_with_drop_for_load_balancing, int64_t* num_calls_finished_with_client_failed_to_send, - int64_t* num_calls_finished_known_received); + int64_t* num_calls_finished_known_received, + grpc_grpclb_dropped_call_counts** drop_token_counts); + +void grpc_grpclb_dropped_call_counts_destroy( + grpc_grpclb_dropped_call_counts* drop_entries); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \ */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index bec7c97a78..6fa29f326e 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -76,7 +76,33 @@ static void populate_timestamp(gpr_timespec timestamp, timestamp_pb->nanos = timestamp.tv_nsec; } -grpc_grpclb_request *grpc_grpclb_load_report_request_create( +static bool encode_string(pb_ostream_t *stream, const pb_field_t *field, + void *const *arg) { + char *str = *arg; + if (!pb_encode_tag_for_field(stream, field)) return false; + return pb_encode_string(stream, (uint8_t *)str, strlen(str)); +} + +static bool encode_drops(pb_ostream_t *stream, const pb_field_t *field, + void *const *arg) { + grpc_grpclb_dropped_call_counts *drop_entries = *arg; + if (drop_entries == NULL) return true; + for (size_t i = 0; i < drop_entries->num_entries; ++i) { + if (!pb_encode_tag_for_field(stream, field)) return false; + grpc_lb_v1_ClientStatsPerToken drop_message; + drop_message.load_balance_token.funcs.encode = encode_string; + drop_message.load_balance_token.arg = drop_entries->token_counts[i].token; + drop_message.has_num_calls = true; + drop_message.num_calls = drop_entries->token_counts[i].count; + if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields, + &drop_message)) { + return false; + } + } + return true; +} + +grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked( grpc_grpclb_client_stats *client_stats) { grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request)); req->has_client_stats = true; @@ -84,18 +110,17 @@ grpc_grpclb_request *grpc_grpclb_load_report_request_create( populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); req->client_stats.has_num_calls_started = true; req->client_stats.has_num_calls_finished = true; - req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true; - req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true; req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; req->client_stats.has_num_calls_finished_known_received = true; - grpc_grpclb_client_stats_get( + req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops; + grpc_grpclb_client_stats_get_locked( client_stats, &req->client_stats.num_calls_started, &req->client_stats.num_calls_finished, - &req->client_stats.num_calls_finished_with_drop_for_rate_limiting, - &req->client_stats.num_calls_finished_with_drop_for_load_balancing, &req->client_stats.num_calls_finished_with_client_failed_to_send, - &req->client_stats.num_calls_finished_known_received); + &req->client_stats.num_calls_finished_known_received, + (grpc_grpclb_dropped_call_counts **)&req->client_stats + .calls_finished_with_drop.arg); return req; } @@ -117,6 +142,11 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { } void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { + if (request->has_client_stats) { + grpc_grpclb_dropped_call_counts *drop_entries = + request->client_stats.calls_finished_with_drop.arg; + grpc_grpclb_dropped_call_counts_destroy(drop_entries); + } gpr_free(request); } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index ef8d563edc..c4a98492c9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -44,7 +44,7 @@ typedef struct { /** Create a request for a gRPC LB service under \a lb_service_name */ grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name); -grpc_grpclb_request *grpc_grpclb_load_report_request_create( +grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked( grpc_grpclb_client_stats *client_stats); /** Protocol Buffers v3-encode \a request */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c index fb119c7fc8..6a5d54c82a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c @@ -33,14 +33,19 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = { PB_LAST_FIELD }; -const pb_field_t grpc_lb_v1_ClientStats_fields[8] = { +const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3] = { + PB_FIELD( 1, STRING , OPTIONAL, CALLBACK, FIRST, grpc_lb_v1_ClientStatsPerToken, load_balance_token, load_balance_token, 0), + PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStatsPerToken, num_calls, load_balance_token, 0), + PB_LAST_FIELD +}; + +const pb_field_t grpc_lb_v1_ClientStats_fields[7] = { PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields), PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0), PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0), - PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0), - PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0), - PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0), + PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished, 0), PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0), + PB_FIELD( 8, MESSAGE , REPEATED, CALLBACK, OTHER, grpc_lb_v1_ClientStats, calls_finished_with_drop, num_calls_finished_known_received, &grpc_lb_v1_ClientStatsPerToken_fields), PB_LAST_FIELD }; @@ -62,12 +67,11 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = { PB_LAST_FIELD }; -const pb_field_t grpc_lb_v1_Server_fields[6] = { +const pb_field_t grpc_lb_v1_Server_fields[5] = { PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0), PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0), PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0), - PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0), - PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0), + PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop, load_balance_token, 0), PB_LAST_FIELD }; @@ -81,7 +85,7 @@ const pb_field_t grpc_lb_v1_Server_fields[6] = { * numbers or field sizes that are larger than what can fit in 8 or 16 bit * field descriptors. */ -PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) +PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) #endif #if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT) @@ -92,7 +96,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) * numbers or field sizes that are larger than what can fit in the default * 8 bit descriptors. */ -PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) +PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server) #endif diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h index d3ae919ec2..93333d1aed 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h @@ -14,6 +14,13 @@ extern "C" { #endif /* Struct definitions */ +typedef struct _grpc_lb_v1_ClientStatsPerToken { + pb_callback_t load_balance_token; + bool has_num_calls; + int64_t num_calls; +/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStatsPerToken) */ +} grpc_lb_v1_ClientStatsPerToken; + typedef struct _grpc_lb_v1_Duration { bool has_seconds; int64_t seconds; @@ -36,10 +43,8 @@ typedef struct _grpc_lb_v1_Server { int32_t port; bool has_load_balance_token; char load_balance_token[50]; - bool has_drop_for_rate_limiting; - bool drop_for_rate_limiting; - bool has_drop_for_load_balancing; - bool drop_for_load_balancing; + bool has_drop; + bool drop; /* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */ } grpc_lb_v1_Server; @@ -58,14 +63,11 @@ typedef struct _grpc_lb_v1_ClientStats { int64_t num_calls_started; bool has_num_calls_finished; int64_t num_calls_finished; - bool has_num_calls_finished_with_drop_for_rate_limiting; - int64_t num_calls_finished_with_drop_for_rate_limiting; - bool has_num_calls_finished_with_drop_for_load_balancing; - int64_t num_calls_finished_with_drop_for_load_balancing; bool has_num_calls_finished_with_client_failed_to_send; int64_t num_calls_finished_with_client_failed_to_send; bool has_num_calls_finished_known_received; int64_t num_calls_finished_known_received; + pb_callback_t calls_finished_with_drop; /* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */ } grpc_lb_v1_ClientStats; @@ -107,39 +109,41 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse { #define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0} #define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default} #define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""} -#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0} +#define grpc_lb_v1_ClientStatsPerToken_init_default {{{NULL}, NULL}, false, 0} +#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}} #define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default} #define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default} #define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default} -#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0} +#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0} #define grpc_lb_v1_Duration_init_zero {false, 0, false, 0} #define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0} #define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero} #define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""} -#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0} +#define grpc_lb_v1_ClientStatsPerToken_init_zero {{{NULL}, NULL}, false, 0} +#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}} #define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero} #define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero} #define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero} -#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0} +#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0} /* Field tags (for use in manual encoding/decoding) */ +#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1 +#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2 #define grpc_lb_v1_Duration_seconds_tag 1 #define grpc_lb_v1_Duration_nanos_tag 2 #define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1 #define grpc_lb_v1_Server_ip_address_tag 1 #define grpc_lb_v1_Server_port_tag 2 #define grpc_lb_v1_Server_load_balance_token_tag 3 -#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4 -#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5 +#define grpc_lb_v1_Server_drop_tag 4 #define grpc_lb_v1_Timestamp_seconds_tag 1 #define grpc_lb_v1_Timestamp_nanos_tag 2 #define grpc_lb_v1_ClientStats_timestamp_tag 1 #define grpc_lb_v1_ClientStats_num_calls_started_tag 2 #define grpc_lb_v1_ClientStats_num_calls_finished_tag 3 -#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4 -#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5 #define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6 #define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7 +#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8 #define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1 #define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2 #define grpc_lb_v1_ServerList_servers_tag 1 @@ -154,22 +158,24 @@ extern const pb_field_t grpc_lb_v1_Duration_fields[3]; extern const pb_field_t grpc_lb_v1_Timestamp_fields[3]; extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3]; extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2]; -extern const pb_field_t grpc_lb_v1_ClientStats_fields[8]; +extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3]; +extern const pb_field_t grpc_lb_v1_ClientStats_fields[7]; extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3]; extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3]; extern const pb_field_t grpc_lb_v1_ServerList_fields[3]; -extern const pb_field_t grpc_lb_v1_Server_fields[6]; +extern const pb_field_t grpc_lb_v1_Server_fields[5]; /* Maximum encoded size of messages (where known) */ #define grpc_lb_v1_Duration_size 22 #define grpc_lb_v1_Timestamp_size 22 -#define grpc_lb_v1_LoadBalanceRequest_size 226 +#define grpc_lb_v1_LoadBalanceRequest_size (140 + grpc_lb_v1_ClientStats_size) #define grpc_lb_v1_InitialLoadBalanceRequest_size 131 -#define grpc_lb_v1_ClientStats_size 90 +/* grpc_lb_v1_ClientStatsPerToken_size depends on runtime parameters */ +/* grpc_lb_v1_ClientStats_size depends on runtime parameters */ #define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size) #define grpc_lb_v1_InitialLoadBalanceResponse_size 90 /* grpc_lb_v1_ServerList_size depends on runtime parameters */ -#define grpc_lb_v1_Server_size 85 +#define grpc_lb_v1_Server_size 83 /* Message IDs (where set with "msgid" option) */ #ifdef PB_MSGID diff --git a/src/proto/grpc/lb/v1/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto index b13b3438cf..0a33568bd6 100644 --- a/src/proto/grpc/lb/v1/load_balancer.proto +++ b/src/proto/grpc/lb/v1/load_balancer.proto @@ -67,6 +67,15 @@ message InitialLoadBalanceRequest { string name = 1; } +// Contains the number of calls finished for a particular load balance token. +message ClientStatsPerToken { + // See Server.load_balance_token. + string load_balance_token = 1; + + // The total number of RPCs that finished associated with the token. + int64 num_calls = 2; +} + // Contains client level statistics that are useful to load balancing. Each // count except the timestamp should be reset to zero after reporting the stats. message ClientStats { @@ -79,20 +88,17 @@ message ClientStats { // The total number of RPCs that finished. int64 num_calls_finished = 3; - // The total number of RPCs that were dropped by the client because of rate - // limiting. - int64 num_calls_finished_with_drop_for_rate_limiting = 4; - - // The total number of RPCs that were dropped by the client because of load - // balancing. - int64 num_calls_finished_with_drop_for_load_balancing = 5; - // The total number of RPCs that failed to reach a server except dropped RPCs. int64 num_calls_finished_with_client_failed_to_send = 6; // The total number of RPCs that finished and are known to have been received // by a server. int64 num_calls_finished_known_received = 7; + + // The list of dropped calls. + repeated ClientStatsPerToken calls_finished_with_drop = 8; + + reserved 4, 5; } message LoadBalanceResponse { @@ -134,10 +140,8 @@ message ServerList { Duration expiration_interval = 3; } -// Contains server information. When none of the [drop_for_*] fields are true, -// use the other fields. When drop_for_rate_limiting is true, ignore all other -// fields. Use drop_for_load_balancing only when it is true and -// drop_for_rate_limiting is false. +// Contains server information. When the drop field is not true, use the other +// fields. message Server { // A resolved address for the server, serialized in network-byte-order. It may // either be an IPv4 or IPv6 address. @@ -149,16 +153,16 @@ message Server { // An opaque but printable token given to the frontend for each pick. All // frontend requests for that pick must include the token in its initial // metadata. The token is used by the backend to verify the request and to - // allow the backend to report load to the gRPC LB system. + // allow the backend to report load to the gRPC LB system. The token is also + // used in client stats for reporting dropped calls. // // Its length is variable but less than 50 bytes. string load_balance_token = 3; - // Indicates whether this particular request should be dropped by the client - // for rate limiting. - bool drop_for_rate_limiting = 4; + // Indicates whether this particular request should be dropped by the client. + // If the request is dropped, there will be a corresponding entry in + // ClientStats.calls_finished_with_drop. + bool drop = 4; - // Indicates whether this particular request should be dropped by the client - // for load balancing. - bool drop_for_load_balancing = 5; + reserved 5; } diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 1f3255d18d..e403ee66fc 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -45,6 +45,7 @@ extern "C" { #include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" +#include #include // TODO(dgq): Other scenarios in need of testing: @@ -142,22 +143,20 @@ grpc::string Ip4ToPackedString(const char* ip_str) { struct ClientStats { size_t num_calls_started = 0; size_t num_calls_finished = 0; - size_t num_calls_finished_with_drop_for_rate_limiting = 0; - size_t num_calls_finished_with_drop_for_load_balancing = 0; size_t num_calls_finished_with_client_failed_to_send = 0; size_t num_calls_finished_known_received = 0; + std::map drop_token_counts; ClientStats& operator+=(const ClientStats& other) { num_calls_started += other.num_calls_started; num_calls_finished += other.num_calls_finished; - num_calls_finished_with_drop_for_rate_limiting += - other.num_calls_finished_with_drop_for_rate_limiting; - num_calls_finished_with_drop_for_load_balancing += - other.num_calls_finished_with_drop_for_load_balancing; num_calls_finished_with_client_failed_to_send += other.num_calls_finished_with_client_failed_to_send; num_calls_finished_known_received += other.num_calls_finished_known_received; + for (const auto& p : other.drop_token_counts) { + drop_token_counts[p.first] += p.second; + } return *this; } }; @@ -218,17 +217,17 @@ class BalancerServiceImpl : public BalancerService { request.client_stats().num_calls_started(); client_stats_.num_calls_finished += request.client_stats().num_calls_finished(); - client_stats_.num_calls_finished_with_drop_for_rate_limiting += - request.client_stats() - .num_calls_finished_with_drop_for_rate_limiting(); - client_stats_.num_calls_finished_with_drop_for_load_balancing += - request.client_stats() - .num_calls_finished_with_drop_for_load_balancing(); client_stats_.num_calls_finished_with_client_failed_to_send += request.client_stats() .num_calls_finished_with_client_failed_to_send(); client_stats_.num_calls_finished_known_received += request.client_stats().num_calls_finished_known_received(); + for (const auto& drop_token_count : + request.client_stats().calls_finished_with_drop()) { + client_stats_ + .drop_token_counts[drop_token_count.load_balance_token()] += + drop_token_count.num_calls(); + } load_report_cond_.notify_one(); } done: @@ -252,16 +251,15 @@ class BalancerServiceImpl : public BalancerService { } static LoadBalanceResponse BuildResponseForBackends( - const std::vector& backend_ports, int num_drops_for_rate_limiting, - int num_drops_for_load_balancing) { + const std::vector& backend_ports, + const std::map& drop_token_counts) { LoadBalanceResponse response; - for (int i = 0; i < num_drops_for_rate_limiting; ++i) { - auto* server = response.mutable_server_list()->add_servers(); - server->set_drop_for_rate_limiting(true); - } - for (int i = 0; i < num_drops_for_load_balancing; ++i) { - auto* server = response.mutable_server_list()->add_servers(); - server->set_drop_for_load_balancing(true); + for (const auto& drop_token_count : drop_token_counts) { + for (size_t i = 0; i < drop_token_count.second; ++i) { + auto* server = response.mutable_server_list()->add_servers(); + server->set_drop(true); + server->set_load_balance_token(drop_token_count.first); + } } for (const int& backend_port : backend_ports) { auto* server = response.mutable_server_list()->add_servers(); @@ -499,7 +497,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest { TEST_F(SingleBalancerTest, Vanilla) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); @@ -538,7 +536,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0); // Send non-empty serverlist only after kServerlistDelayMs ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), kServerlistDelayMs); const auto t0 = system_clock::now(); @@ -580,11 +578,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { // Send a serverlist right away. ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); // ... and the same one a bit later. ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), kServerlistDelayMs); // Send num_backends/2 requests. @@ -648,10 +646,9 @@ TEST_F(UpdatesTest, UpdateBalancers) { const std::vector first_backend{GetBackendPorts()[0]}; const std::vector second_backend{GetBackendPorts()[1]}; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0); ScheduleResponseForBalancer( - 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), - 0); + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0); // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); @@ -726,10 +723,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) { const std::vector second_backend{GetBackendPorts()[0]}; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0); ScheduleResponseForBalancer( - 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), - 0); + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0); // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); @@ -809,10 +805,9 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { const std::vector second_backend{GetBackendPorts()[1]}; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0); + 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0); ScheduleResponseForBalancer( - 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0), - 0); + 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0); // Start servers and send 10 RPCs per server. gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH =========="); @@ -901,7 +896,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) { TEST_F(SingleBalancerTest, Drop) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2), + 0, BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}), 0); // Send 100 RPCs for each server and drop address. const auto& statuses_and_responses = @@ -936,7 +932,9 @@ TEST_F(SingleBalancerTest, Drop) { TEST_F(SingleBalancerTest, DropAllFirst) { // All registered addresses are marked as "drop". ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 0); + 0, BalancerServiceImpl::BuildResponseForBackends( + {}, {{"rate_limiting", 1}, {"load_balancing", 1}}), + 0); const auto& statuses_and_responses = SendRpc(kMessage_, 1); for (const auto& status_and_response : statuses_and_responses) { const Status& status = status_and_response.first; @@ -947,10 +945,12 @@ TEST_F(SingleBalancerTest, DropAllFirst) { TEST_F(SingleBalancerTest, DropAll) { ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 1000); + 0, BalancerServiceImpl::BuildResponseForBackends( + {}, {{"rate_limiting", 1}, {"load_balancing", 1}}), + 1000); // First call succeeds. auto statuses_and_responses = SendRpc(kMessage_, 1); @@ -980,7 +980,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}), 0); // Send 100 RPCs per server. const auto& statuses_and_responses = @@ -1009,17 +1009,17 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started); EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_finished); - EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting); - EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing); EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_finished_known_received); + EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre()); } TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { const size_t kNumRpcsPerAddress = 3; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1), + 0, BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}), 0); // Send 100 RPCs for each server and drop address. const auto& statuses_and_responses = @@ -1056,13 +1056,13 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { client_stats.num_calls_started); EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3), client_stats.num_calls_finished); - EXPECT_EQ(kNumRpcsPerAddress * 2, - client_stats.num_calls_finished_with_drop_for_rate_limiting); - EXPECT_EQ(kNumRpcsPerAddress, - client_stats.num_calls_finished_with_drop_for_load_balancing); EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_finished_known_received); + EXPECT_THAT(client_stats.drop_token_counts, + ::testing::ElementsAre( + ::testing::Pair("load_balancing", kNumRpcsPerAddress), + ::testing::Pair("rate_limiting", kNumRpcsPerAddress * 2))); } } // namespace diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc index cec9666503..6b0350e1f9 100644 --- a/test/cpp/grpclb/grpclb_api_test.cc +++ b/test/cpp/grpclb/grpclb_api_test.cc @@ -91,13 +91,13 @@ TEST_F(GrpclbTest, ParseResponseServerList) { auto* server = serverlist->add_servers(); server->set_ip_address(Ip4ToPackedString("127.0.0.1")); server->set_port(12345); - server->set_drop_for_rate_limiting(true); - server->set_drop_for_load_balancing(false); + server->set_load_balance_token("rate_limting"); + server->set_drop(true); server = response.mutable_server_list()->add_servers(); server->set_ip_address(Ip4ToPackedString("10.0.0.1")); server->set_port(54321); - server->set_drop_for_rate_limiting(false); - server->set_drop_for_load_balancing(true); + server->set_load_balance_token("load_balancing"); + server->set_drop(true); auto* expiration_interval = serverlist->mutable_expiration_interval(); expiration_interval->set_seconds(888); expiration_interval->set_nanos(999); @@ -112,14 +112,14 @@ TEST_F(GrpclbTest, ParseResponseServerList) { EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address), "127.0.0.1"); EXPECT_EQ(c_serverlist->servers[0]->port, 12345); - EXPECT_TRUE(c_serverlist->servers[0]->drop_for_rate_limiting); - EXPECT_FALSE(c_serverlist->servers[0]->drop_for_load_balancing); + EXPECT_STREQ(c_serverlist->servers[0]->load_balance_token, "rate_limting"); + EXPECT_TRUE(c_serverlist->servers[0]->drop); EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address); EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1"); EXPECT_EQ(c_serverlist->servers[1]->port, 54321); - EXPECT_FALSE(c_serverlist->servers[1]->drop_for_rate_limiting); - EXPECT_TRUE(c_serverlist->servers[1]->drop_for_load_balancing); + EXPECT_STREQ(c_serverlist->servers[1]->load_balance_token, "load_balancing"); + EXPECT_TRUE(c_serverlist->servers[1]->drop); EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds); EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888); -- cgit v1.2.3