diff options
author | Mark D. Roth <roth@google.com> | 2017-05-17 12:22:17 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2017-05-17 12:22:17 -0700 |
commit | d7389b418f9ea077e4ab3de2b53362ab02ab6870 (patch) | |
tree | 98c664ca97d9593e99e9d17df4bbd95c87febfcc | |
parent | 90224ba893324fad358934eb2a4487e33c6a3baf (diff) |
Implement grpclb drop support.
6 files changed, 287 insertions, 114 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index 24843d52e9..f2f27b9175 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -914,10 +914,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); if (calld->connected_subchannel == NULL) { - gpr_atm_no_barrier_store(&calld->subchannel_call, 1); + gpr_atm_no_barrier_store(&calld->subchannel_call, (gpr_atm)CANCELLED_CALL); fail_locked(exec_ctx, calld, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Failed to create subchannel", &error, 1)); + error == GRPC_ERROR_NONE + ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy") + : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Failed to create subchannel", &error, 1)); } else if (GET_CALL(calld) == CANCELLED_CALL) { /* already cancelled before subchannel became ready */ grpc_error *cancellation_error = @@ -1180,6 +1183,15 @@ static void start_transport_stream_op_batch_locked_inner( &calld->next_step)) { calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); + if (calld->connected_subchannel == NULL) { + gpr_atm_no_barrier_store(&calld->subchannel_call, + (gpr_atm)CANCELLED_CALL); + grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Call dropped by load balancing policy"); + fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + return; // Early out. + } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 184b2ef720..fb4aa084a6 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -149,7 +149,9 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, /** Finds an appropriate subchannel for a call, based on \a pick_args. - \a target will be set to the selected subchannel, or NULL on failure. + \a target will be set to the selected subchannel, or NULL on failure + or when the LB policy decides to drop the call. + Upon success, \a user_data will be set to whatever opaque information may need to be propagated from the LB policy, or NULL if not needed. \a context will be populated with context to pass to the subchannel diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index b7c0e929b7..d2a2856a18 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -327,6 +327,11 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; + /** Index into serverlist for next pick. + * If the server at this index is a drop, we return a drop. + * Otherwise, we delegate to the RR policy. */ + size_t serverlist_index; + /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; @@ -402,6 +407,9 @@ struct rr_connectivity_data { static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, bool log) { + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + return false; + } const grpc_grpclb_ip_address *ip = &server->ip_address; if (server->port >> 16 != 0) { if (log) { @@ -411,7 +419,6 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, } return false; } - if (ip->size != 4 && ip->size != 16) { if (log) { gpr_log(GPR_ERROR, @@ -445,11 +452,12 @@ static const grpc_lb_user_data_vtable lb_token_vtable = { static void parse_server(const grpc_grpclb_server *server, grpc_resolved_address *addr) { + memset(addr, 0, sizeof(*addr)); + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) return; const uint16_t netorder_port = htons((uint16_t)server->port); /* the addresses are given in binary format (a in(6)_addr struct) in * server->ip_address.bytes. */ const grpc_grpclb_ip_address *ip = &server->ip_address; - memset(addr, 0, sizeof(*addr)); if (ip->size == 4) { addr->len = sizeof(struct sockaddr_in); struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr; @@ -586,16 +594,51 @@ static bool update_lb_connectivity_status_locked( return true; } -/* perform a pick over \a rr_policy. Given that a pick can return immediately - * (ignoring its completion callback) we need to perform the cleanups this - * callback would be otherwise resposible for */ +/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return + * immediately (ignoring its completion callback), we need to perform the + * cleanups this callback would otherwise be resposible for. + * If \a force_async is true, then we will manually schedule the + * completion callback even if the pick is available immediately. */ static bool pick_from_internal_rr_locked( - grpc_exec_ctx *exec_ctx, grpc_lb_policy *rr_policy, - const grpc_lb_policy_pick_args *pick_args, + grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, + const grpc_lb_policy_pick_args *pick_args, bool force_async, grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { - GPR_ASSERT(rr_policy != NULL); + // Look at the index into the serverlist to see if we should drop this call. + grpc_grpclb_server *server = + glb_policy->serverlist->servers[glb_policy->serverlist_index++]; + if (glb_policy->serverlist_index == glb_policy->serverlist->num_servers) { + glb_policy->serverlist_index = 0; // Wrap-around. + } + if (server->drop_for_rate_limiting || server->drop_for_load_balancing) { + // Not using the RR policy, so unref it. + if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { + gpr_log(GPR_INFO, "Unreffing RR for drop (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); + // Update client load reporting stats to indicate the number of + // dropped calls. Note that we have to do this here instead of in + // the client_load_reporting filter, because we do not create a + // subchannel call (and therefore no client_load_reporting filter) + // for dropped calls. + grpc_grpclb_client_stats_add_call_started(wc_arg->client_stats); + grpc_grpclb_client_stats_add_call_finished( + server->drop_for_rate_limiting, server->drop_for_load_balancing, + false /* failed_to_send */, false /* known_received */, + wc_arg->client_stats); + grpc_grpclb_client_stats_unref(wc_arg->client_stats); + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } + gpr_free(wc_arg->free_when_done); + return true; + } + // Pick via the RR policy. const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, rr_policy, pick_args, target, wc_arg->context, + exec_ctx, wc_arg->rr_policy, pick_args, target, wc_arg->context, (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ @@ -604,17 +647,20 @@ static bool pick_from_internal_rr_locked( (intptr_t)wc_arg->rr_policy); } GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); - /* add the load reporting initial metadata */ initial_metadata_add_lb_token(exec_ctx, pick_args->initial_metadata, pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); - // Pass on client stats via context. Passes ownership of the reference. GPR_ASSERT(wc_arg->client_stats != NULL); wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; - + if (force_async) { + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_closure_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_NONE); + gpr_free(wc_arg->free_when_done); + return false; + } gpr_free(wc_arg->free_when_done); } /* else, the pending pick will be registered and taken care of by the @@ -744,8 +790,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, - &pp->pick_args, pp->target, + pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, + true /* force_async */, pp->target, &pp->wrapped_on_complete_arg); } @@ -1115,8 +1161,9 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; wc_arg->free_when_done = wc_arg; - pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, - pick_args, target, wc_arg); + pick_done = + pick_from_internal_rr_locked(exec_ctx, glb_policy, pick_args, + false /* force_async */, target, wc_arg); } else { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, @@ -1517,7 +1564,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, * serverlist instance will be destroyed either upon the next * update or in glb_destroy() */ glb_policy->serverlist = serverlist; - + glb_policy->serverlist_index = 0; rr_handover_locked(exec_ctx, glb_policy); } } else { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 81b6932fae..90e7c2efe5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -37,44 +37,39 @@ #include <grpc/support/alloc.h> +/* invoked once for every Server in ServerList */ +static bool count_serverlist(pb_istream_t *stream, const pb_field_t *field, + void **arg) { + grpc_grpclb_serverlist *sl = *arg; + grpc_grpclb_server server; + if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; + } + ++sl->num_servers; + return true; +} + typedef struct decode_serverlist_arg { - /* The first pass counts the number of servers in the server list. The second - * one allocates and decodes. */ - bool first_pass; /* The decoding callback is invoked once per server in serverlist. Remember * which index of the serverlist are we currently decoding */ size_t decoding_idx; - /* Populated after the first pass. Number of server in the input serverlist */ - size_t num_servers; /* The decoded serverlist */ - grpc_grpclb_server **servers; + grpc_grpclb_serverlist *serverlist; } decode_serverlist_arg; /* invoked once for every Server in ServerList */ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, void **arg) { decode_serverlist_arg *dec_arg = *arg; - if (dec_arg->first_pass) { /* count how many server do we have */ - grpc_grpclb_server server; - if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); - return false; - } - dec_arg->num_servers++; - } else { /* second pass. Actually decode. */ - grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); - GPR_ASSERT(dec_arg->num_servers > 0); - if (dec_arg->decoding_idx == 0) { /* first iteration of second pass */ - dec_arg->servers = - gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers); - } - if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); - return false; - } - dec_arg->servers[dec_arg->decoding_idx++] = server; + GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx); + grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); + if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) { + gpr_free(server); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); + return false; } - + dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server; return true; } @@ -165,36 +160,38 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( grpc_slice encoded_grpc_grpclb_response) { - bool status; - decode_serverlist_arg arg; pb_istream_t stream = pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_grpc_grpclb_response), GRPC_SLICE_LENGTH(encoded_grpc_grpclb_response)); pb_istream_t stream_at_start = stream; + grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); - memset(&arg, 0, sizeof(decode_serverlist_arg)); - - res.server_list.servers.funcs.decode = decode_serverlist; - res.server_list.servers.arg = &arg; - arg.first_pass = true; - status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); + // First pass: count number of servers. + res.server_list.servers.funcs.decode = count_serverlist; + res.server_list.servers.arg = sl; + bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); if (!status) { + gpr_free(sl); gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } - - arg.first_pass = false; - status = - pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, &res); - if (!status) { - gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); - return NULL; + // Second pass: populate servers. + if (sl->num_servers > 0) { + sl->servers = gpr_zalloc(sizeof(grpc_grpclb_server *) * sl->num_servers); + decode_serverlist_arg decode_arg; + memset(&decode_arg, 0, sizeof(decode_arg)); + decode_arg.serverlist = sl; + res.server_list.servers.funcs.decode = decode_serverlist; + res.server_list.servers.arg = &decode_arg; + status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, + &res); + if (!status) { + grpc_grpclb_destroy_serverlist(sl); + gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); + return NULL; + } } - - grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); - sl->num_servers = arg.num_servers; - sl->servers = arg.servers; if (res.server_list.has_expiration_interval) { sl->expiration_interval = res.server_list.expiration_interval; } @@ -228,7 +225,7 @@ grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs, const grpc_grpclb_serverlist *rhs) { - if ((lhs == NULL) || (rhs == NULL)) { + if (lhs == NULL || rhs == NULL) { return false; } if (lhs->num_servers != rhs->num_servers) { diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index 06873821bd..7f596ce1f1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -51,7 +51,7 @@ typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request; typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response; typedef grpc_lb_v1_Server grpc_grpclb_server; typedef grpc_lb_v1_Duration grpc_grpclb_duration; -typedef struct grpc_grpclb_serverlist { +typedef struct { grpc_grpclb_server **servers; size_t num_servers; grpc_grpclb_duration expiration_interval; diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index 8417f1a99c..4e1bcc7a60 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -98,12 +98,12 @@ namespace { template <typename ServiceType> class CountedService : public ServiceType { public: - int request_count() { + size_t request_count() { std::unique_lock<std::mutex> lock(mu_); return request_count_; } - int response_count() { + size_t response_count() { std::unique_lock<std::mutex> lock(mu_); return response_count_; } @@ -121,8 +121,8 @@ class CountedService : public ServiceType { std::mutex mu_; private: - int request_count_ = 0; - int response_count_ = 0; + size_t request_count_ = 0; + size_t response_count_ = 0; }; using BackendService = CountedService<TestServiceImpl>; @@ -243,9 +243,18 @@ class BalancerServiceImpl : public BalancerService { } static LoadBalanceResponse BuildResponseForBackends( - const std::vector<int>& backend_ports) { + const std::vector<int>& backend_ports, int num_drops_for_rate_limiting, + int num_drops_for_load_balancing) { LoadBalanceResponse response; - for (const int backend_port : backend_ports) { + for (int i = 0; i < num_drops_for_rate_limiting; ++i) { + auto* server = response.mutable_server_list()->add_servers(); + server->set_drop_for_rate_limiting(true); + } + for (int i = 0; i < num_drops_for_load_balancing; ++i) { + auto* server = response.mutable_server_list()->add_servers(); + server->set_drop_for_load_balancing(true); + } + for (const int& backend_port : backend_ports) { auto* server = response.mutable_server_list()->add_servers(); server->set_ip_address(Ip4ToPackedString("127.0.0.1")); server->set_port(backend_port); @@ -327,10 +336,8 @@ class GrpclbEnd2endTest : public ::testing::Test { ChannelArguments args; args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR, response_generator_); - std::ostringstream uri; - uri << "test:///servername_not_used"; - channel_ = - CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args); + channel_ = CreateCustomChannel("test:///not_used", + InsecureChannelCredentials(), args); stub_ = grpc::testing::EchoTestService::NewStub(channel_); } @@ -467,26 +474,33 @@ class SingleBalancerTest : public GrpclbEnd2endTest { }; TEST_F(SingleBalancerTest, Vanilla) { + const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0); + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0); // Make sure that trying to connect works without a call. channel_->GetState(true /* try_to_connect */); - // Start servers and send 100 RPCs per server. - const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_); + // Send 100 RPCs per server. + const auto& statuses_and_responses = + SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); } // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(100, backend_servers_[i].service_->request_count()); + EXPECT_EQ(kNumRpcsPerAddress, + backend_servers_[i].service_->request_count()); } // The balancer got a single request. - EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent a single response. - EXPECT_EQ(1, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); @@ -500,7 +514,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0); // Send non-empty serverlist only after kServerlistDelayMs ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), kServerlistDelayMs); const auto t0 = system_clock::now(); @@ -518,17 +532,20 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) { // Each backend should have gotten 1 request. for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(1, backend_servers_[i].service_->request_count()); + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); } for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); } // The balancer got a single request. - EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent two responses. - EXPECT_EQ(2, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); @@ -539,10 +556,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { // Send a serverlist right away. ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0); + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0); // ... and the same one a bit later. ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), kServerlistDelayMs); // Send num_backends/2 requests. @@ -550,14 +568,17 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { // only the first half of the backends will receive them. for (size_t i = 0; i < backends_.size(); ++i) { if (i < backends_.size() / 2) - EXPECT_EQ(1, backend_servers_[i].service_->request_count()); + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); else - EXPECT_EQ(0, backend_servers_[i].service_->request_count()); + EXPECT_EQ(0U, backend_servers_[i].service_->request_count()); } EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); } // Wait for the (duplicated) serverlist update. @@ -566,7 +587,7 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN))); // Verify the LB has sent two responses. - EXPECT_EQ(2, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(2U, balancer_servers_[0].service_->response_count()); // Some more calls to complete the total number of backends. statuses_and_responses = SendRpc( @@ -575,52 +596,146 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) { // Because a duplicated serverlist should have no effect, all backends must // have been hit once now. for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(1, backend_servers_[i].service_->request_count()); + EXPECT_EQ(1U, backend_servers_[i].service_->request_count()); } EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2); for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); } // The balancer got a single request. - EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // Check LB policy name for the channel. EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName()); } +TEST_F(SingleBalancerTest, Drop) { + const size_t kNumRpcsPerAddress = 100; + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2), + 0); + // Send 100 RPCs for each server and drop address. + const auto& statuses_and_responses = + SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); + + size_t num_drops = 0; + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + if (!status.ok() && + status.error_message() == "Call dropped by load balancing policy") { + ++num_drops; + } else { + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); + } + } + EXPECT_EQ(kNumRpcsPerAddress * 3, num_drops); + + // Each backend should have gotten 100 requests. + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(kNumRpcsPerAddress, + backend_servers_[i].service_->request_count()); + } + // The balancer got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); +} + class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest { public: SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {} }; TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) { + const size_t kNumRpcsPerAddress = 100; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0); - // Start servers and send 100 RPCs per server. - const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_); + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0), + 0); + // Send 100 RPCs per server. + const auto& statuses_and_responses = + SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_); for (const auto& status_and_response : statuses_and_responses) { - EXPECT_TRUE(status_and_response.first.ok()); - EXPECT_EQ(status_and_response.second.message(), kMessage_); + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); } // Each backend should have gotten 100 requests. for (size_t i = 0; i < backends_.size(); ++i) { - EXPECT_EQ(100, backend_servers_[i].service_->request_count()); + EXPECT_EQ(kNumRpcsPerAddress, + backend_servers_[i].service_->request_count()); } // The balancer got a single request. - EXPECT_EQ(1, balancer_servers_[0].service_->request_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); // and sent a single response. - EXPECT_EQ(1, balancer_servers_[0].service_->response_count()); + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); const ClientStats client_stats = WaitForLoadReports(); - EXPECT_EQ(100 * num_backends_, client_stats.num_calls_started); - EXPECT_EQ(100 * num_backends_, client_stats.num_calls_finished); + EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_started); + EXPECT_EQ(kNumRpcsPerAddress * num_backends_, + client_stats.num_calls_finished); EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting); EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing); EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); - EXPECT_EQ(100 * num_backends_, + EXPECT_EQ(kNumRpcsPerAddress * num_backends_, + client_stats.num_calls_finished_known_received); +} + +TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { + const size_t kNumRpcsPerAddress = 3; + ScheduleResponseForBalancer( + 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1), + 0); + // Send 100 RPCs for each server and drop address. + const auto& statuses_and_responses = + SendRpc(kMessage_, kNumRpcsPerAddress * (num_backends_ + 3)); + + size_t num_drops = 0; + for (const auto& status_and_response : statuses_and_responses) { + const Status& status = status_and_response.first; + const EchoResponse& response = status_and_response.second; + if (!status.ok() && + status.error_message() == "Call dropped by load balancing policy") { + ++num_drops; + } else { + EXPECT_TRUE(status.ok()) << "code=" << status.error_code() + << " message=" << status.error_message(); + EXPECT_EQ(response.message(), kMessage_); + } + } + EXPECT_EQ(kNumRpcsPerAddress * 3, num_drops); + + // Each backend should have gotten 100 requests. + for (size_t i = 0; i < backends_.size(); ++i) { + EXPECT_EQ(kNumRpcsPerAddress, + backend_servers_[i].service_->request_count()); + } + // The balancer got a single request. + EXPECT_EQ(1U, balancer_servers_[0].service_->request_count()); + // and sent a single response. + EXPECT_EQ(1U, balancer_servers_[0].service_->response_count()); + + const ClientStats client_stats = WaitForLoadReports(); + EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3), + client_stats.num_calls_started); + EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3), + client_stats.num_calls_finished); + EXPECT_EQ(kNumRpcsPerAddress * 2, + client_stats.num_calls_finished_with_drop_for_rate_limiting); + EXPECT_EQ(kNumRpcsPerAddress, + client_stats.num_calls_finished_with_drop_for_load_balancing); + EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send); + EXPECT_EQ(kNumRpcsPerAddress * num_backends_, client_stats.num_calls_finished_known_received); } |