aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c1
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c26
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c79
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h27
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c44
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c22
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h48
-rw-r--r--src/proto/grpc/lb/v1/load_balancer.proto42
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc92
-rw-r--r--test/cpp/grpclb/grpclb_api_test.cc16
11 files changed, 241 insertions, 158 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
index 52c6e38c87..568bb2ba8d 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
@@ -88,7 +88,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
// Record call finished, optionally setting client_failed_to_send and
// received.
grpc_grpclb_client_stats_add_call_finished(
- false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */,
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
calld->recv_initial_metadata_succeeded /* known_received */,
calld->client_stats);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index ebce801b37..f248a873ba 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -416,9 +416,7 @@ struct rr_connectivity_data {
static bool is_server_valid(const grpc_grpclb_server *server, size_t idx,
bool log) {
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
- return false;
- }
+ if (server->drop) return false;
const grpc_grpclb_ip_address *ip = &server->ip_address;
if (server->port >> 16 != 0) {
if (log) {
@@ -462,7 +460,7 @@ static const grpc_lb_user_data_vtable lb_token_vtable = {
static void parse_server(const grpc_grpclb_server *server,
grpc_resolved_address *addr) {
memset(addr, 0, sizeof(*addr));
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return;
+ if (server->drop) return;
const uint16_t netorder_port = htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
@@ -610,7 +608,7 @@ static bool pick_from_internal_rr_locked(
if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) {
glb_policy->serverlist_index = 0; // Wrap-around.
}
- if (server->drop_for_rate_limiting || server->drop_for_load_balancing) {
+ if (server->drop) {
// Not using the RR policy, so unref it.
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")",
@@ -622,11 +620,8 @@ static bool pick_from_internal_rr_locked(
// the client_load_reporting filter, because we do not create a
// subchannel call (and therefore no client_load_reporting filter)
// for dropped calls.
- grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats);
- grpc_grpclb_client_stats_add_call_finished(
- server->drop_for_rate_limiting, server->drop_for_load_balancing,
- false /* failed_to_send */, false /* known_received */,
- wc_arg->client_stats);
+ grpc_grpclb_client_stats_add_call_dropped_locked(server->load_balance_token,
+ wc_arg->client_stats);
grpc_grpclb_client_stats_unref(wc_arg->client_stats);
if (force_async) {
GPR_ASSERT(wc_arg->wrapped_closure != NULL);
@@ -1309,15 +1304,14 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
}
static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
+ grpc_grpclb_dropped_call_counts *drop_entries =
+ request->client_stats.calls_finished_with_drop.arg;
return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 &&
- request->client_stats.num_calls_finished_with_drop_for_rate_limiting ==
- 0 &&
- request->client_stats
- .num_calls_finished_with_drop_for_load_balancing == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send ==
0 &&
- request->client_stats.num_calls_finished_known_received == 0;
+ request->client_stats.num_calls_finished_known_received == 0 &&
+ (drop_entries == NULL || drop_entries->num_entries == 0);
}
static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -1332,7 +1326,7 @@ static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg,
// Construct message payload.
GPR_ASSERT(glb_policy->client_load_report_payload == NULL);
grpc_grpclb_request *request =
- grpc_grpclb_load_report_request_create(glb_policy->client_stats);
+ grpc_grpclb_load_report_request_create_locked(glb_policy->client_stats);
// Skip client load report if the counters were all zero in the last
// report and they are still zero in this one.
if (load_report_counters_are_zero(request)) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
index c762443b7c..5b62623145 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c
@@ -18,8 +18,11 @@
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
+#include <string.h>
+
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
@@ -29,10 +32,11 @@
struct grpc_grpclb_client_stats {
gpr_refcount refs;
+ // This field must only be accessed via *_locked() methods.
+ grpc_grpclb_dropped_call_counts* drop_token_counts;
+ // These fields may be accessed from multiple threads at a time.
gpr_atm num_calls_started;
gpr_atm num_calls_finished;
- gpr_atm num_calls_finished_with_drop_for_rate_limiting;
- gpr_atm num_calls_finished_with_drop_for_load_balancing;
gpr_atm num_calls_finished_with_client_failed_to_send;
gpr_atm num_calls_finished_known_received;
};
@@ -51,6 +55,7 @@ grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) {
if (gpr_unref(&client_stats->refs)) {
+ grpc_grpclb_dropped_call_counts_destroy(client_stats->drop_token_counts);
gpr_free(client_stats);
}
}
@@ -61,21 +66,9 @@ void grpc_grpclb_client_stats_add_call_started(
}
void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_drop_for_rate_limiting,
- bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats) {
gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
- if (finished_with_drop_for_rate_limiting) {
- gpr_atm_full_fetch_add(
- &client_stats->num_calls_finished_with_drop_for_rate_limiting,
- (gpr_atm)1);
- }
- if (finished_with_drop_for_load_balancing) {
- gpr_atm_full_fetch_add(
- &client_stats->num_calls_finished_with_drop_for_load_balancing,
- (gpr_atm)1);
- }
if (finished_with_client_failed_to_send) {
gpr_atm_full_fetch_add(
&client_stats->num_calls_finished_with_client_failed_to_send,
@@ -87,32 +80,70 @@ void grpc_grpclb_client_stats_add_call_finished(
}
}
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+ char* token, grpc_grpclb_client_stats* client_stats) {
+ // Increment num_calls_started and num_calls_finished.
+ gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1);
+ gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1);
+ // Record the drop.
+ if (client_stats->drop_token_counts == NULL) {
+ client_stats->drop_token_counts =
+ gpr_zalloc(sizeof(grpc_grpclb_dropped_call_counts));
+ }
+ grpc_grpclb_dropped_call_counts* drop_token_counts =
+ client_stats->drop_token_counts;
+ for (size_t i = 0; i < drop_token_counts->num_entries; ++i) {
+ if (strcmp(drop_token_counts->token_counts[i].token, token) == 0) {
+ ++drop_token_counts->token_counts[i].count;
+ return;
+ }
+ }
+ // Not found, so add a new entry. We double the size of the array each time.
+ size_t new_num_entries = 2;
+ while (new_num_entries < drop_token_counts->num_entries + 1) {
+ new_num_entries *= 2;
+ }
+ drop_token_counts->token_counts =
+ gpr_realloc(drop_token_counts->token_counts,
+ new_num_entries * sizeof(grpc_grpclb_drop_token_count));
+ grpc_grpclb_drop_token_count* new_entry =
+ &drop_token_counts->token_counts[drop_token_counts->num_entries++];
+ new_entry->token = gpr_strdup(token);
+ new_entry->count = 1;
+}
+
static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) {
*value = (int64_t)gpr_atm_acq_load(counter);
gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value));
}
-void grpc_grpclb_client_stats_get(
+void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
- int64_t* num_calls_finished_with_drop_for_rate_limiting,
- int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
- int64_t* num_calls_finished_known_received) {
+ int64_t* num_calls_finished_known_received,
+ grpc_grpclb_dropped_call_counts** drop_token_counts) {
atomic_get_and_reset_counter(num_calls_started,
&client_stats->num_calls_started);
atomic_get_and_reset_counter(num_calls_finished,
&client_stats->num_calls_finished);
atomic_get_and_reset_counter(
- num_calls_finished_with_drop_for_rate_limiting,
- &client_stats->num_calls_finished_with_drop_for_rate_limiting);
- atomic_get_and_reset_counter(
- num_calls_finished_with_drop_for_load_balancing,
- &client_stats->num_calls_finished_with_drop_for_load_balancing);
- atomic_get_and_reset_counter(
num_calls_finished_with_client_failed_to_send,
&client_stats->num_calls_finished_with_client_failed_to_send);
atomic_get_and_reset_counter(
num_calls_finished_known_received,
&client_stats->num_calls_finished_known_received);
+ *drop_token_counts = client_stats->drop_token_counts;
+ client_stats->drop_token_counts = NULL;
+}
+
+void grpc_grpclb_dropped_call_counts_destroy(
+ grpc_grpclb_dropped_call_counts* drop_entries) {
+ if (drop_entries != NULL) {
+ for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+ gpr_free(drop_entries->token_counts[i].token);
+ }
+ gpr_free(drop_entries->token_counts);
+ gpr_free(drop_entries);
+ }
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
index 4bb47d5c5c..c51e2a431a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h
@@ -25,6 +25,16 @@
typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats;
+typedef struct {
+ char* token;
+ int64_t count;
+} grpc_grpclb_drop_token_count;
+
+typedef struct {
+ grpc_grpclb_drop_token_count* token_counts;
+ size_t num_entries;
+} grpc_grpclb_dropped_call_counts;
+
grpc_grpclb_client_stats* grpc_grpclb_client_stats_create();
grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref(
grpc_grpclb_client_stats* client_stats);
@@ -33,18 +43,23 @@ void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_started(
grpc_grpclb_client_stats* client_stats);
void grpc_grpclb_client_stats_add_call_finished(
- bool finished_with_drop_for_rate_limiting,
- bool finished_with_drop_for_load_balancing,
bool finished_with_client_failed_to_send, bool finished_known_received,
grpc_grpclb_client_stats* client_stats);
-void grpc_grpclb_client_stats_get(
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_add_call_dropped_locked(
+ char* token, grpc_grpclb_client_stats* client_stats);
+
+// This method is not thread-safe; caller must synchronize.
+void grpc_grpclb_client_stats_get_locked(
grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started,
int64_t* num_calls_finished,
- int64_t* num_calls_finished_with_drop_for_rate_limiting,
- int64_t* num_calls_finished_with_drop_for_load_balancing,
int64_t* num_calls_finished_with_client_failed_to_send,
- int64_t* num_calls_finished_known_received);
+ int64_t* num_calls_finished_known_received,
+ grpc_grpclb_dropped_call_counts** drop_token_counts);
+
+void grpc_grpclb_dropped_call_counts_destroy(
+ grpc_grpclb_dropped_call_counts* drop_entries);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \
*/
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
index bec7c97a78..6fa29f326e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
@@ -76,7 +76,33 @@ static void populate_timestamp(gpr_timespec timestamp,
timestamp_pb->nanos = timestamp.tv_nsec;
}
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+static bool encode_string(pb_ostream_t *stream, const pb_field_t *field,
+ void *const *arg) {
+ char *str = *arg;
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ return pb_encode_string(stream, (uint8_t *)str, strlen(str));
+}
+
+static bool encode_drops(pb_ostream_t *stream, const pb_field_t *field,
+ void *const *arg) {
+ grpc_grpclb_dropped_call_counts *drop_entries = *arg;
+ if (drop_entries == NULL) return true;
+ for (size_t i = 0; i < drop_entries->num_entries; ++i) {
+ if (!pb_encode_tag_for_field(stream, field)) return false;
+ grpc_lb_v1_ClientStatsPerToken drop_message;
+ drop_message.load_balance_token.funcs.encode = encode_string;
+ drop_message.load_balance_token.arg = drop_entries->token_counts[i].token;
+ drop_message.has_num_calls = true;
+ drop_message.num_calls = drop_entries->token_counts[i].count;
+ if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields,
+ &drop_message)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats) {
grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request));
req->has_client_stats = true;
@@ -84,18 +110,17 @@ grpc_grpclb_request *grpc_grpclb_load_report_request_create(
populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp);
req->client_stats.has_num_calls_started = true;
req->client_stats.has_num_calls_finished = true;
- req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true;
- req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true;
req->client_stats.has_num_calls_finished_known_received = true;
- grpc_grpclb_client_stats_get(
+ req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops;
+ grpc_grpclb_client_stats_get_locked(
client_stats, &req->client_stats.num_calls_started,
&req->client_stats.num_calls_finished,
- &req->client_stats.num_calls_finished_with_drop_for_rate_limiting,
- &req->client_stats.num_calls_finished_with_drop_for_load_balancing,
&req->client_stats.num_calls_finished_with_client_failed_to_send,
- &req->client_stats.num_calls_finished_known_received);
+ &req->client_stats.num_calls_finished_known_received,
+ (grpc_grpclb_dropped_call_counts **)&req->client_stats
+ .calls_finished_with_drop.arg);
return req;
}
@@ -117,6 +142,11 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
}
void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
+ if (request->has_client_stats) {
+ grpc_grpclb_dropped_call_counts *drop_entries =
+ request->client_stats.calls_finished_with_drop.arg;
+ grpc_grpclb_dropped_call_counts_destroy(drop_entries);
+ }
gpr_free(request);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
index ef8d563edc..c4a98492c9 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h
@@ -44,7 +44,7 @@ typedef struct {
/** Create a request for a gRPC LB service under \a lb_service_name */
grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name);
-grpc_grpclb_request *grpc_grpclb_load_report_request_create(
+grpc_grpclb_request *grpc_grpclb_load_report_request_create_locked(
grpc_grpclb_client_stats *client_stats);
/** Protocol Buffers v3-encode \a request */
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index fb119c7fc8..6a5d54c82a 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -33,14 +33,19 @@ const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_ClientStats_fields[8] = {
+const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3] = {
+ PB_FIELD( 1, STRING , OPTIONAL, CALLBACK, FIRST, grpc_lb_v1_ClientStatsPerToken, load_balance_token, load_balance_token, 0),
+ PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStatsPerToken, num_calls, load_balance_token, 0),
+ PB_LAST_FIELD
+};
+
+const pb_field_t grpc_lb_v1_ClientStats_fields[7] = {
PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, timestamp, timestamp, &grpc_lb_v1_Timestamp_fields),
PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_started, timestamp, 0),
PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished, num_calls_started, 0),
- PB_FIELD( 4, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_rate_limiting, num_calls_finished, 0),
- PB_FIELD( 5, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_drop_for_load_balancing, num_calls_finished_with_drop_for_rate_limiting, 0),
- PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished_with_drop_for_load_balancing, 0),
+ PB_FIELD( 6, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_with_client_failed_to_send, num_calls_finished, 0),
PB_FIELD( 7, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, num_calls_finished_known_received, num_calls_finished_with_client_failed_to_send, 0),
+ PB_FIELD( 8, MESSAGE , REPEATED, CALLBACK, OTHER, grpc_lb_v1_ClientStats, calls_finished_with_drop, num_calls_finished_known_received, &grpc_lb_v1_ClientStatsPerToken_fields),
PB_LAST_FIELD
};
@@ -62,12 +67,11 @@ const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v1_Server_fields[6] = {
+const pb_field_t grpc_lb_v1_Server_fields[5] = {
PB_FIELD( 1, BYTES , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
- PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_rate_limiting, load_balance_token, 0),
- PB_FIELD( 5, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_for_load_balancing, drop_for_rate_limiting, 0),
+ PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop, load_balance_token, 0),
PB_LAST_FIELD
};
@@ -81,7 +85,7 @@ const pb_field_t grpc_lb_v1_Server_fields[6] = {
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 65536 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -92,7 +96,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request)
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_ClientStats, timestamp) < 256 && pb_membersize(grpc_lb_v1_ClientStats, calls_finished_with_drop) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_Timestamp_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStatsPerToken_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
index d3ae919ec2..93333d1aed 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -14,6 +14,13 @@ extern "C" {
#endif
/* Struct definitions */
+typedef struct _grpc_lb_v1_ClientStatsPerToken {
+ pb_callback_t load_balance_token;
+ bool has_num_calls;
+ int64_t num_calls;
+/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStatsPerToken) */
+} grpc_lb_v1_ClientStatsPerToken;
+
typedef struct _grpc_lb_v1_Duration {
bool has_seconds;
int64_t seconds;
@@ -36,10 +43,8 @@ typedef struct _grpc_lb_v1_Server {
int32_t port;
bool has_load_balance_token;
char load_balance_token[50];
- bool has_drop_for_rate_limiting;
- bool drop_for_rate_limiting;
- bool has_drop_for_load_balancing;
- bool drop_for_load_balancing;
+ bool has_drop;
+ bool drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */
} grpc_lb_v1_Server;
@@ -58,14 +63,11 @@ typedef struct _grpc_lb_v1_ClientStats {
int64_t num_calls_started;
bool has_num_calls_finished;
int64_t num_calls_finished;
- bool has_num_calls_finished_with_drop_for_rate_limiting;
- int64_t num_calls_finished_with_drop_for_rate_limiting;
- bool has_num_calls_finished_with_drop_for_load_balancing;
- int64_t num_calls_finished_with_drop_for_load_balancing;
bool has_num_calls_finished_with_client_failed_to_send;
int64_t num_calls_finished_with_client_failed_to_send;
bool has_num_calls_finished_known_received;
int64_t num_calls_finished_known_received;
+ pb_callback_t calls_finished_with_drop;
/* @@protoc_insertion_point(struct:grpc_lb_v1_ClientStats) */
} grpc_lb_v1_ClientStats;
@@ -107,39 +109,41 @@ typedef struct _grpc_lb_v1_LoadBalanceResponse {
#define grpc_lb_v1_Timestamp_init_default {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_default {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_default {false, grpc_lb_v1_Timestamp_init_default, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
-#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_default {false, {0, {0}}, false, 0, false, "", false, 0}
#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
#define grpc_lb_v1_Timestamp_init_zero {false, 0, false, 0}
#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, false, 0, false, 0}
+#define grpc_lb_v1_ClientStatsPerToken_init_zero {{{NULL}, NULL}, false, 0}
+#define grpc_lb_v1_ClientStats_init_zero {false, grpc_lb_v1_Timestamp_init_zero, false, 0, false, 0, false, 0, false, 0, {{NULL}, NULL}}
#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
-#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0, false, 0}
+#define grpc_lb_v1_Server_init_zero {false, {0, {0}}, false, 0, false, "", false, 0}
/* Field tags (for use in manual encoding/decoding) */
+#define grpc_lb_v1_ClientStatsPerToken_load_balance_token_tag 1
+#define grpc_lb_v1_ClientStatsPerToken_num_calls_tag 2
#define grpc_lb_v1_Duration_seconds_tag 1
#define grpc_lb_v1_Duration_nanos_tag 2
#define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
#define grpc_lb_v1_Server_ip_address_tag 1
#define grpc_lb_v1_Server_port_tag 2
#define grpc_lb_v1_Server_load_balance_token_tag 3
-#define grpc_lb_v1_Server_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_Server_drop_for_load_balancing_tag 5
+#define grpc_lb_v1_Server_drop_tag 4
#define grpc_lb_v1_Timestamp_seconds_tag 1
#define grpc_lb_v1_Timestamp_nanos_tag 2
#define grpc_lb_v1_ClientStats_timestamp_tag 1
#define grpc_lb_v1_ClientStats_num_calls_started_tag 2
#define grpc_lb_v1_ClientStats_num_calls_finished_tag 3
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_rate_limiting_tag 4
-#define grpc_lb_v1_ClientStats_num_calls_finished_with_drop_for_load_balancing_tag 5
#define grpc_lb_v1_ClientStats_num_calls_finished_with_client_failed_to_send_tag 6
#define grpc_lb_v1_ClientStats_num_calls_finished_known_received_tag 7
+#define grpc_lb_v1_ClientStats_calls_finished_with_drop_tag 8
#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 1
#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 2
#define grpc_lb_v1_ServerList_servers_tag 1
@@ -154,22 +158,24 @@ extern const pb_field_t grpc_lb_v1_Duration_fields[3];
extern const pb_field_t grpc_lb_v1_Timestamp_fields[3];
extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v1_ClientStats_fields[8];
+extern const pb_field_t grpc_lb_v1_ClientStatsPerToken_fields[3];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[7];
extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v1_Server_fields[6];
+extern const pb_field_t grpc_lb_v1_Server_fields[5];
/* Maximum encoded size of messages (where known) */
#define grpc_lb_v1_Duration_size 22
#define grpc_lb_v1_Timestamp_size 22
-#define grpc_lb_v1_LoadBalanceRequest_size 226
+#define grpc_lb_v1_LoadBalanceRequest_size (140 + grpc_lb_v1_ClientStats_size)
#define grpc_lb_v1_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v1_ClientStats_size 90
+/* grpc_lb_v1_ClientStatsPerToken_size depends on runtime parameters */
+/* grpc_lb_v1_ClientStats_size depends on runtime parameters */
#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
/* grpc_lb_v1_ServerList_size depends on runtime parameters */
-#define grpc_lb_v1_Server_size 85
+#define grpc_lb_v1_Server_size 83
/* Message IDs (where set with "msgid" option) */
#ifdef PB_MSGID
diff --git a/src/proto/grpc/lb/v1/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto
index b13b3438cf..0a33568bd6 100644
--- a/src/proto/grpc/lb/v1/load_balancer.proto
+++ b/src/proto/grpc/lb/v1/load_balancer.proto
@@ -67,6 +67,15 @@ message InitialLoadBalanceRequest {
string name = 1;
}
+// Contains the number of calls finished for a particular load balance token.
+message ClientStatsPerToken {
+ // See Server.load_balance_token.
+ string load_balance_token = 1;
+
+ // The total number of RPCs that finished associated with the token.
+ int64 num_calls = 2;
+}
+
// Contains client level statistics that are useful to load balancing. Each
// count except the timestamp should be reset to zero after reporting the stats.
message ClientStats {
@@ -79,20 +88,17 @@ message ClientStats {
// The total number of RPCs that finished.
int64 num_calls_finished = 3;
- // The total number of RPCs that were dropped by the client because of rate
- // limiting.
- int64 num_calls_finished_with_drop_for_rate_limiting = 4;
-
- // The total number of RPCs that were dropped by the client because of load
- // balancing.
- int64 num_calls_finished_with_drop_for_load_balancing = 5;
-
// The total number of RPCs that failed to reach a server except dropped RPCs.
int64 num_calls_finished_with_client_failed_to_send = 6;
// The total number of RPCs that finished and are known to have been received
// by a server.
int64 num_calls_finished_known_received = 7;
+
+ // The list of dropped calls.
+ repeated ClientStatsPerToken calls_finished_with_drop = 8;
+
+ reserved 4, 5;
}
message LoadBalanceResponse {
@@ -134,10 +140,8 @@ message ServerList {
Duration expiration_interval = 3;
}
-// Contains server information. When none of the [drop_for_*] fields are true,
-// use the other fields. When drop_for_rate_limiting is true, ignore all other
-// fields. Use drop_for_load_balancing only when it is true and
-// drop_for_rate_limiting is false.
+// Contains server information. When the drop field is not true, use the other
+// fields.
message Server {
// A resolved address for the server, serialized in network-byte-order. It may
// either be an IPv4 or IPv6 address.
@@ -149,16 +153,16 @@ message Server {
// An opaque but printable token given to the frontend for each pick. All
// frontend requests for that pick must include the token in its initial
// metadata. The token is used by the backend to verify the request and to
- // allow the backend to report load to the gRPC LB system.
+ // allow the backend to report load to the gRPC LB system. The token is also
+ // used in client stats for reporting dropped calls.
//
// Its length is variable but less than 50 bytes.
string load_balance_token = 3;
- // Indicates whether this particular request should be dropped by the client
- // for rate limiting.
- bool drop_for_rate_limiting = 4;
+ // Indicates whether this particular request should be dropped by the client.
+ // If the request is dropped, there will be a corresponding entry in
+ // ClientStats.calls_finished_with_drop.
+ bool drop = 4;
- // Indicates whether this particular request should be dropped by the client
- // for load balancing.
- bool drop_for_load_balancing = 5;
+ reserved 5;
}
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 1f3255d18d..e403ee66fc 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -45,6 +45,7 @@ extern "C" {
#include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include <gmock/gmock.h>
#include <gtest/gtest.h>
// TODO(dgq): Other scenarios in need of testing:
@@ -142,22 +143,20 @@ grpc::string Ip4ToPackedString(const char* ip_str) {
struct ClientStats {
size_t num_calls_started = 0;
size_t num_calls_finished = 0;
- size_t num_calls_finished_with_drop_for_rate_limiting = 0;
- size_t num_calls_finished_with_drop_for_load_balancing = 0;
size_t num_calls_finished_with_client_failed_to_send = 0;
size_t num_calls_finished_known_received = 0;
+ std::map<grpc::string, size_t> drop_token_counts;
ClientStats& operator+=(const ClientStats& other) {
num_calls_started += other.num_calls_started;
num_calls_finished += other.num_calls_finished;
- num_calls_finished_with_drop_for_rate_limiting +=
- other.num_calls_finished_with_drop_for_rate_limiting;
- num_calls_finished_with_drop_for_load_balancing +=
- other.num_calls_finished_with_drop_for_load_balancing;
num_calls_finished_with_client_failed_to_send +=
other.num_calls_finished_with_client_failed_to_send;
num_calls_finished_known_received +=
other.num_calls_finished_known_received;
+ for (const auto& p : other.drop_token_counts) {
+ drop_token_counts[p.first] += p.second;
+ }
return *this;
}
};
@@ -218,17 +217,17 @@ class BalancerServiceImpl : public BalancerService {
request.client_stats().num_calls_started();
client_stats_.num_calls_finished +=
request.client_stats().num_calls_finished();
- client_stats_.num_calls_finished_with_drop_for_rate_limiting +=
- request.client_stats()
- .num_calls_finished_with_drop_for_rate_limiting();
- client_stats_.num_calls_finished_with_drop_for_load_balancing +=
- request.client_stats()
- .num_calls_finished_with_drop_for_load_balancing();
client_stats_.num_calls_finished_with_client_failed_to_send +=
request.client_stats()
.num_calls_finished_with_client_failed_to_send();
client_stats_.num_calls_finished_known_received +=
request.client_stats().num_calls_finished_known_received();
+ for (const auto& drop_token_count :
+ request.client_stats().calls_finished_with_drop()) {
+ client_stats_
+ .drop_token_counts[drop_token_count.load_balance_token()] +=
+ drop_token_count.num_calls();
+ }
load_report_cond_.notify_one();
}
done:
@@ -252,16 +251,15 @@ class BalancerServiceImpl : public BalancerService {
}
static LoadBalanceResponse BuildResponseForBackends(
- const std::vector<int>& backend_ports, int num_drops_for_rate_limiting,
- int num_drops_for_load_balancing) {
+ const std::vector<int>& backend_ports,
+ const std::map<grpc::string, size_t>& drop_token_counts) {
LoadBalanceResponse response;
- for (int i = 0; i < num_drops_for_rate_limiting; ++i) {
- auto* server = response.mutable_server_list()->add_servers();
- server->set_drop_for_rate_limiting(true);
- }
- for (int i = 0; i < num_drops_for_load_balancing; ++i) {
- auto* server = response.mutable_server_list()->add_servers();
- server->set_drop_for_load_balancing(true);
+ for (const auto& drop_token_count : drop_token_counts) {
+ for (size_t i = 0; i < drop_token_count.second; ++i) {
+ auto* server = response.mutable_server_list()->add_servers();
+ server->set_drop(true);
+ server->set_load_balance_token(drop_token_count.first);
+ }
}
for (const int& backend_port : backend_ports) {
auto* server = response.mutable_server_list()->add_servers();
@@ -499,7 +497,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest {
TEST_F(SingleBalancerTest, Vanilla) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// Make sure that trying to connect works without a call.
channel_->GetState(true /* try_to_connect */);
@@ -538,7 +536,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
// Send non-empty serverlist only after kServerlistDelayMs
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
const auto t0 = system_clock::now();
@@ -580,11 +578,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
// Send a serverlist right away.
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// ... and the same one a bit later.
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
kServerlistDelayMs);
// Send num_backends/2 requests.
@@ -648,10 +646,9 @@ TEST_F(UpdatesTest, UpdateBalancers) {
const std::vector<int> first_backend{GetBackendPorts()[0]};
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
- 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
- 0);
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -726,10 +723,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
const std::vector<int> second_backend{GetBackendPorts()[0]};
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
- 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
- 0);
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -809,10 +805,9 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
const std::vector<int> second_backend{GetBackendPorts()[1]};
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
ScheduleResponseForBalancer(
- 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
- 0);
+ 1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);
// Start servers and send 10 RPCs per server.
gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -901,7 +896,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
TEST_F(SingleBalancerTest, Drop) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2),
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
0);
// Send 100 RPCs for each server and drop address.
const auto& statuses_and_responses =
@@ -936,7 +932,9 @@ TEST_F(SingleBalancerTest, Drop) {
TEST_F(SingleBalancerTest, DropAllFirst) {
// All registered addresses are marked as "drop".
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 0);
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+ 0);
const auto& statuses_and_responses = SendRpc(kMessage_, 1);
for (const auto& status_and_response : statuses_and_responses) {
const Status& status = status_and_response.first;
@@ -947,10 +945,12 @@ TEST_F(SingleBalancerTest, DropAllFirst) {
TEST_F(SingleBalancerTest, DropAll) {
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 1000);
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+ 1000);
// First call succeeds.
auto statuses_and_responses = SendRpc(kMessage_, 1);
@@ -980,7 +980,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
const size_t kNumRpcsPerAddress = 100;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
0);
// Send 100 RPCs per server.
const auto& statuses_and_responses =
@@ -1009,17 +1009,17 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished);
- EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
- EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished_known_received);
+ EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
}
TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
const size_t kNumRpcsPerAddress = 3;
ScheduleResponseForBalancer(
- 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1),
+ 0, BalancerServiceImpl::BuildResponseForBackends(
+ GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
0);
// Send 100 RPCs for each server and drop address.
const auto& statuses_and_responses =
@@ -1056,13 +1056,13 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
client_stats.num_calls_started);
EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3),
client_stats.num_calls_finished);
- EXPECT_EQ(kNumRpcsPerAddress * 2,
- client_stats.num_calls_finished_with_drop_for_rate_limiting);
- EXPECT_EQ(kNumRpcsPerAddress,
- client_stats.num_calls_finished_with_drop_for_load_balancing);
EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
client_stats.num_calls_finished_known_received);
+ EXPECT_THAT(client_stats.drop_token_counts,
+ ::testing::ElementsAre(
+ ::testing::Pair("load_balancing", kNumRpcsPerAddress),
+ ::testing::Pair("rate_limiting", kNumRpcsPerAddress * 2)));
}
} // namespace
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index cec9666503..6b0350e1f9 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -91,13 +91,13 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
auto* server = serverlist->add_servers();
server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
server->set_port(12345);
- server->set_drop_for_rate_limiting(true);
- server->set_drop_for_load_balancing(false);
+ server->set_load_balance_token("rate_limting");
+ server->set_drop(true);
server = response.mutable_server_list()->add_servers();
server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
server->set_port(54321);
- server->set_drop_for_rate_limiting(false);
- server->set_drop_for_load_balancing(true);
+ server->set_load_balance_token("load_balancing");
+ server->set_drop(true);
auto* expiration_interval = serverlist->mutable_expiration_interval();
expiration_interval->set_seconds(888);
expiration_interval->set_nanos(999);
@@ -112,14 +112,14 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
"127.0.0.1");
EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
- EXPECT_TRUE(c_serverlist->servers[0]->drop_for_rate_limiting);
- EXPECT_FALSE(c_serverlist->servers[0]->drop_for_load_balancing);
+ EXPECT_STREQ(c_serverlist->servers[0]->load_balance_token, "rate_limting");
+ EXPECT_TRUE(c_serverlist->servers[0]->drop);
EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
- EXPECT_FALSE(c_serverlist->servers[1]->drop_for_rate_limiting);
- EXPECT_TRUE(c_serverlist->servers[1]->drop_for_load_balancing);
+ EXPECT_STREQ(c_serverlist->servers[1]->load_balance_token, "load_balancing");
+ EXPECT_TRUE(c_serverlist->servers[1]->drop);
EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds);
EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888);