diff options
Diffstat (limited to 'test/cpp/grpclb')
-rw-r--r-- | test/cpp/grpclb/grpclb_api_test.cc | 20 | ||||
-rw-r--r-- | test/cpp/grpclb/grpclb_test.cc | 279 |
2 files changed, 183 insertions, 116 deletions
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc index e67189c69e..82ccf436f8 100644 --- a/test/cpp/grpclb/grpclb_api_test.cc +++ b/test/cpp/grpclb/grpclb_api_test.cc @@ -63,7 +63,7 @@ grpc::string PackedStringToIp(const grpc_grpclb_ip_address& pb_ip) { } else { abort(); } - GPR_ASSERT(inet_ntop(af, pb_ip.bytes, ip_str, 46) != NULL); + GPR_ASSERT(inet_ntop(af, (void*)pb_ip.bytes, ip_str, 46) != NULL); return ip_str; } @@ -71,12 +71,12 @@ TEST_F(GrpclbTest, CreateRequest) { const grpc::string service_name = "AServiceName"; LoadBalanceRequest request; grpc_grpclb_request* c_req = grpc_grpclb_request_create(service_name.c_str()); - gpr_slice slice = grpc_grpclb_request_encode(c_req); - const int num_bytes_written = GPR_SLICE_LENGTH(slice); + grpc_slice slice = grpc_grpclb_request_encode(c_req); + const int num_bytes_written = GRPC_SLICE_LENGTH(slice); EXPECT_GT(num_bytes_written, 0); - request.ParseFromArray(GPR_SLICE_START_PTR(slice), num_bytes_written); + request.ParseFromArray(GRPC_SLICE_START_PTR(slice), num_bytes_written); EXPECT_EQ(request.initial_request().name(), service_name); - gpr_slice_unref(slice); + grpc_slice_unref(slice); grpc_grpclb_request_destroy(c_req); } @@ -88,15 +88,15 @@ TEST_F(GrpclbTest, ParseInitialResponse) { client_stats_report_interval->set_seconds(123); client_stats_report_interval->set_nanos(456); const grpc::string encoded_response = response.SerializeAsString(); - gpr_slice encoded_slice = - gpr_slice_from_copied_string(encoded_response.c_str()); + grpc_slice encoded_slice = + grpc_slice_from_copied_string(encoded_response.c_str()); grpc_grpclb_initial_response* c_initial_response = grpc_grpclb_initial_response_parse(encoded_slice); EXPECT_FALSE(c_initial_response->has_load_balancer_delegate); EXPECT_EQ(c_initial_response->client_stats_report_interval.seconds, 123); EXPECT_EQ(c_initial_response->client_stats_report_interval.nanos, 456); - gpr_slice_unref(encoded_slice); + grpc_slice_unref(encoded_slice); grpc_grpclb_initial_response_destroy(c_initial_response); } @@ -116,7 +116,7 @@ TEST_F(GrpclbTest, ParseResponseServerList) { expiration_interval->set_nanos(999); const grpc::string encoded_response = response.SerializeAsString(); - const gpr_slice encoded_slice = gpr_slice_from_copied_buffer( + const grpc_slice encoded_slice = grpc_slice_from_copied_buffer( encoded_response.data(), encoded_response.size()); grpc_grpclb_serverlist* c_serverlist = grpc_grpclb_response_parse_serverlist(encoded_slice); @@ -137,7 +137,7 @@ TEST_F(GrpclbTest, ParseResponseServerList) { EXPECT_TRUE(c_serverlist->expiration_interval.has_nanos); EXPECT_EQ(c_serverlist->expiration_interval.nanos, 999); - gpr_slice_unref(encoded_slice); + grpc_slice_unref(encoded_slice); grpc_grpclb_destroy_serverlist(c_serverlist); } diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 80f2fa4f4d..89ed9249ad 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -51,9 +51,11 @@ #include <grpc++/impl/codegen/config.h> extern "C" { -#include "src/core/ext/client_config/client_channel.h" +#include "src/core/ext/client_channel/client_channel.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/security/credentials/fake/fake_credentials.h" #include "src/core/lib/support/string.h" #include "src/core/lib/support/tmpfile.h" #include "src/core/lib/surface/channel.h" @@ -76,10 +78,25 @@ extern "C" { // - Send a serverlist with faulty ip:port addresses (port > 2^16, etc). // - Test reception of invalid serverlist // - Test pinging -// - Test against a non-LB server. That server should return UNIMPLEMENTED and -// the call should fail. +// - Test against a non-LB server. // - Random LB server closing the stream unexpectedly. // - Test using DNS-resolvable names (localhost?) +// - Test handling of creation of faulty RR instance by having the LB return a +// serverlist with non-existent backends after having initially returned a +// valid one. +// +// Findings from end to end testing to be covered here: +// - Handling of LB servers restart, including reconnection after backing-off +// retries. +// - Destruction of load balanced channel (and therefore of grpclb instance) +// while: +// 1) the internal LB call is still active. This should work by virtue +// of the weak reference the LB call holds. The call should be terminated as +// part of the grpclb shutdown process. +// 2) the retry timer is active. Again, the weak reference it holds should +// prevent a premature call to \a glb_destroy. +// - Restart of backend servers with no changes to serverlist. This exercises +// the RR handover mechanism. namespace grpc { namespace { @@ -95,7 +112,9 @@ typedef struct server_fixture { grpc_call *server_call; grpc_completion_queue *cq; char *servers_hostport; + const char *balancer_name; int port; + const char *lb_token_prefix; gpr_thd_id tid; int num_calls_serviced; } server_fixture; @@ -109,9 +128,10 @@ typedef struct test_fixture { static void *tag(intptr_t t) { return (void *)t; } -static gpr_slice build_response_payload_slice( +static grpc_slice build_response_payload_slice( const char *host, int *ports, size_t nports, - int64_t expiration_interval_secs, int32_t expiration_interval_nanos) { + int64_t expiration_interval_secs, int32_t expiration_interval_nanos, + const char *token_prefix) { // server_list { // servers { // ip_address: <in_addr/6 bytes of an IP> @@ -138,23 +158,22 @@ static gpr_slice build_response_payload_slice( struct in_addr ip4; GPR_ASSERT(inet_pton(AF_INET, host, &ip4) == 1); server->set_ip_address( - grpc::string(reinterpret_cast<const char *>(&ip4), sizeof(ip4))); + string(reinterpret_cast<const char *>(&ip4), sizeof(ip4))); server->set_port(ports[i]); - // The following long long int cast is meant to work around the - // disfunctional implementation of std::to_string in gcc 4.4, which doesn't - // have a version for int but does have one for long long int. - string token_data = "token" + std::to_string((long long int)ports[i]); - token_data.resize(64, '-'); - server->set_load_balance_token(token_data); + // Missing tokens are acceptable. Test that path. + if (strlen(token_prefix) > 0) { + string token_data = token_prefix + std::to_string(ports[i]); + server->set_load_balance_token(token_data); + } } - const grpc::string &enc_resp = response.SerializeAsString(); - return gpr_slice_from_copied_buffer(enc_resp.data(), enc_resp.size()); + const string &enc_resp = response.SerializeAsString(); + return grpc_slice_from_copied_buffer(enc_resp.data(), enc_resp.size()); } static void drain_cq(grpc_completion_queue *cq) { grpc_event ev; do { - ev = grpc_completion_queue_next(cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + ev = grpc_completion_queue_next(cq, grpc_timeout_seconds_to_deadline(5), NULL); } while (ev.type != GRPC_QUEUE_SHUTDOWN); } @@ -185,10 +204,12 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, &request_metadata_recv, sf->cq, sf->cq, tag(200)); GPR_ASSERT(GRPC_CALL_OK == error); - gpr_log(GPR_INFO, "LB Server[%s] up", sf->servers_hostport); + gpr_log(GPR_INFO, "LB Server[%s](%s) up", sf->servers_hostport, + sf->balancer_name); CQ_EXPECT_COMPLETION(cqv, tag(200), 1); cq_verify(cqv); - gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport); + gpr_log(GPR_INFO, "LB Server[%s](%s) after tag 200", sf->servers_hostport, + sf->balancer_name); // make sure we've received the initial metadata from the grpclb request. GPR_ASSERT(request_metadata_recv.count > 0); @@ -197,7 +218,7 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, // receive request for backends op = ops; op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &request_payload_recv; + op->data.recv_message.recv_message = &request_payload_recv; op->flags = 0; op->reserved = NULL; op++; @@ -205,22 +226,23 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, GPR_ASSERT(GRPC_CALL_OK == error); CQ_EXPECT_COMPLETION(cqv, tag(202), 1); cq_verify(cqv); - gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport); + gpr_log(GPR_INFO, "LB Server[%s](%s) after RECV_MSG", sf->servers_hostport, + sf->balancer_name); // validate initial request. grpc_byte_buffer_reader bbr; grpc_byte_buffer_reader_init(&bbr, request_payload_recv); - gpr_slice request_payload_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_slice request_payload_slice = grpc_byte_buffer_reader_readall(&bbr); grpc::lb::v1::LoadBalanceRequest request; - request.ParseFromArray(GPR_SLICE_START_PTR(request_payload_slice), - GPR_SLICE_LENGTH(request_payload_slice)); + request.ParseFromArray(GRPC_SLICE_START_PTR(request_payload_slice), + GRPC_SLICE_LENGTH(request_payload_slice)); GPR_ASSERT(request.has_initial_request()); GPR_ASSERT(request.initial_request().name() == sf->servers_hostport); - gpr_slice_unref(request_payload_slice); + grpc_slice_unref(request_payload_slice); grpc_byte_buffer_reader_destroy(&bbr); grpc_byte_buffer_destroy(request_payload_recv); - gpr_slice response_payload_slice; + grpc_slice response_payload_slice; op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; @@ -234,25 +256,26 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, op++; error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL); GPR_ASSERT(GRPC_CALL_OK == error); - gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport); + gpr_log(GPR_INFO, "LB Server[%s](%s) after tag 201", sf->servers_hostport, + sf->balancer_name); for (int i = 0; i < 2; i++) { if (i == 0) { // First half of the ports. - response_payload_slice = - build_response_payload_slice("127.0.0.1", ports, nports / 2, -1, -1); + response_payload_slice = build_response_payload_slice( + "127.0.0.1", ports, nports / 2, -1, -1, sf->lb_token_prefix); } else { // Second half of the ports. sleep_ms(update_delay_ms); - response_payload_slice = - build_response_payload_slice("127.0.0.1", ports + (nports / 2), - (nports + 1) / 2 /* ceil */, -1, -1); + response_payload_slice = build_response_payload_slice( + "127.0.0.1", ports + (nports / 2), (nports + 1) / 2 /* ceil */, -1, + -1, "" /* this half doesn't get to receive an LB token */); } response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); op = ops; op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = response_payload; + op->data.send_message.send_message = response_payload; op->flags = 0; op->reserved = NULL; op++; @@ -260,19 +283,21 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, GPR_ASSERT(GRPC_CALL_OK == error); CQ_EXPECT_COMPLETION(cqv, tag(203), 1); cq_verify(cqv); - gpr_log(GPR_INFO, "LB Server[%s] after SEND_MESSAGE, iter %d", - sf->servers_hostport, i); + gpr_log(GPR_INFO, "LB Server[%s](%s) after SEND_MESSAGE, iter %d", + sf->servers_hostport, sf->balancer_name, i); grpc_byte_buffer_destroy(response_payload); - gpr_slice_unref(response_payload_slice); + grpc_slice_unref(response_payload_slice); } - gpr_log(GPR_INFO, "LB Server[%s] shutting down", sf->servers_hostport); + gpr_log(GPR_INFO, "LB Server[%s](%s) shutting down", sf->servers_hostport, + sf->balancer_name); op = ops; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; - op->data.send_status_from_server.status_details = "xyz"; + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + op->data.send_status_from_server.status_details = &status_details; op->flags = 0; op->reserved = NULL; op++; @@ -282,8 +307,8 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, CQ_EXPECT_COMPLETION(cqv, tag(201), 1); CQ_EXPECT_COMPLETION(cqv, tag(204), 1); cq_verify(cqv); - gpr_log(GPR_INFO, "LB Server[%s] after tag 204. All done. LB server out", - sf->servers_hostport); + gpr_log(GPR_INFO, "LB Server[%s](%s) after tag 204. All done. LB server out", + sf->servers_hostport, sf->balancer_name); grpc_call_destroy(s); @@ -319,7 +344,7 @@ static void start_backend_server(server_fixture *sf) { GPR_ASSERT(GRPC_CALL_OK == error); gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport); ev = grpc_completion_queue_next(sf->cq, - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(60), NULL); + grpc_timeout_seconds_to_deadline(60), NULL); if (!ev.success) { gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport); cq_verifier_destroy(cqv); @@ -328,12 +353,9 @@ static void start_backend_server(server_fixture *sf) { return; } GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); - - // The following long long int cast is meant to work around the - // disfunctional implementation of std::to_string in gcc 4.4, which doesn't - // have a version for int but does have one for long long int. - string expected_token = "token" + std::to_string((long long int)sf->port); - expected_token.resize(64, '-'); + const string expected_token = + strlen(sf->lb_token_prefix) == 0 ? "" : sf->lb_token_prefix + + std::to_string(sf->port); GPR_ASSERT(contains_metadata(&request_metadata_recv, "lb-token", expected_token.c_str())); @@ -355,18 +377,18 @@ static void start_backend_server(server_fixture *sf) { gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport); bool exit = false; - gpr_slice response_payload_slice = gpr_slice_from_copied_string(PAYLOAD); + grpc_slice response_payload_slice = grpc_slice_from_copied_string(PAYLOAD); while (!exit) { op = ops; op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &request_payload_recv; + op->data.recv_message.recv_message = &request_payload_recv; op->flags = 0; op->reserved = NULL; op++; error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); GPR_ASSERT(GRPC_CALL_OK == error); ev = grpc_completion_queue_next( - sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL); + sf->cq, grpc_timeout_seconds_to_deadline(3), NULL); if (ev.type == GRPC_OP_COMPLETE && ev.success) { GPR_ASSERT(ev.tag = tag(102)); if (request_payload_recv == NULL) { @@ -388,7 +410,7 @@ static void start_backend_server(server_fixture *sf) { grpc_raw_byte_buffer_create(&response_payload_slice, 1); op = ops; op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = response_payload; + op->data.send_message.send_message = response_payload; op->flags = 0; op->reserved = NULL; op++; @@ -396,7 +418,7 @@ static void start_backend_server(server_fixture *sf) { grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); GPR_ASSERT(GRPC_CALL_OK == error); ev = grpc_completion_queue_next( - sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL); + sf->cq, grpc_timeout_seconds_to_deadline(3), NULL); if (ev.type == GRPC_OP_COMPLETE && ev.success) { GPR_ASSERT(ev.tag = tag(103)); } else { @@ -414,13 +436,15 @@ static void start_backend_server(server_fixture *sf) { ++sf->num_calls_serviced; gpr_log(GPR_INFO, "Server[%s] OUT OF THE LOOP", sf->servers_hostport); - gpr_slice_unref(response_payload_slice); + grpc_slice_unref(response_payload_slice); op = ops; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; - op->data.send_status_from_server.status_details = "Backend server out a-ok"; + grpc_slice status_details = + grpc_slice_from_static_string("Backend server out a-ok"); + op->data.send_status_from_server.status_details = &status_details; op->flags = 0; op->reserved = NULL; op++; @@ -449,18 +473,20 @@ static void perform_request(client_fixture *cf) { grpc_metadata_array trailing_metadata_recv; grpc_status_code status; grpc_call_error error; - char *details = NULL; - size_t details_capacity = 0; + grpc_slice details; grpc_byte_buffer *request_payload; grpc_byte_buffer *response_payload_recv; int i; memset(ops, 0, sizeof(ops)); - gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); + grpc_slice request_payload_slice = + grpc_slice_from_copied_string("hello world"); + grpc_slice host = grpc_slice_from_static_string("foo.test.google.fr:1234"); c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS, - cf->cq, "/foo", "foo.test.google.fr:1234", - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL); + cf->cq, grpc_slice_from_static_string("/foo"), + &host, grpc_timeout_seconds_to_deadline(5), + NULL); gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c); GPR_ASSERT(c); char *peer; @@ -475,7 +501,7 @@ static void perform_request(client_fixture *cf) { op->reserved = NULL; op++; op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &initial_metadata_recv; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; @@ -483,7 +509,6 @@ static void perform_request(client_fixture *cf) { op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; op->data.recv_status_on_client.status = &status; op->data.recv_status_on_client.status_details = &details; - op->data.recv_status_on_client.status_details_capacity = &details_capacity; op->flags = 0; op->reserved = NULL; op++; @@ -495,12 +520,12 @@ static void perform_request(client_fixture *cf) { op = ops; op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = request_payload; + op->data.send_message.send_message = request_payload; op->flags = 0; op->reserved = NULL; op++; op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &response_payload_recv; + op->data.recv_message.recv_message = &response_payload_recv; op->flags = 0; op->reserved = NULL; op++; @@ -509,13 +534,14 @@ static void perform_request(client_fixture *cf) { CQ_EXPECT_COMPLETION(cqv, tag(2), 1); cq_verify(cqv); + gpr_log(GPR_INFO, "Client after sending msg %d / 4", i + 1); GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD)); grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(response_payload_recv); } - gpr_slice_unref(request_payload_slice); + grpc_slice_unref(request_payload_slice); op = ops; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; @@ -530,22 +556,51 @@ static void perform_request(client_fixture *cf) { cq_verify(cqv); peer = grpc_call_get_peer(c); gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer); - gpr_free(peer); grpc_call_destroy(c); - cq_verify_empty_timeout(cqv, 1); + cq_verify_empty_timeout(cqv, 1 /* seconds */); cq_verifier_destroy(cqv); grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&trailing_metadata_recv); - gpr_free(details); + grpc_slice_unref(details); + gpr_log(GPR_INFO, "Client call (peer %s) DESTROYED.", peer); + gpr_free(peer); } -static void setup_client(const char *server_hostport, client_fixture *cf) { +#define BALANCERS_NAME "lb.name" +static void setup_client(const server_fixture *lb_server, + const server_fixture *backends, client_fixture *cf) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + char *lb_uri; + // The grpclb LB policy will be automatically selected by virtue of + // the fact that the returned addresses are balancer addresses. + gpr_asprintf(&lb_uri, "test:///%s?lb_enabled=1&balancer_names=%s", + lb_server->servers_hostport, lb_server->balancer_name); + + grpc_arg expected_target_arg; + expected_target_arg.type = GRPC_ARG_STRING; + expected_target_arg.key = + const_cast<char *>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS); + + char *expected_target_names = NULL; + const char *backends_name = lb_server->servers_hostport; + gpr_asprintf(&expected_target_names, "%s;%s", backends_name, BALANCERS_NAME); + + expected_target_arg.value.string = const_cast<char *>(expected_target_names); + grpc_channel_args *args = + grpc_channel_args_copy_and_add(NULL, &expected_target_arg, 1); + gpr_free(expected_target_names); + cf->cq = grpc_completion_queue_create(NULL); - cf->server_uri = gpr_strdup(server_hostport); - cf->client = grpc_insecure_channel_create(cf->server_uri, NULL, NULL); + cf->server_uri = lb_uri; + grpc_channel_credentials *fake_creds = + grpc_fake_transport_security_credentials_create(); + cf->client = + grpc_secure_channel_create(fake_creds, cf->server_uri, args, NULL); + grpc_channel_credentials_unref(&exec_ctx, fake_creds); + grpc_channel_args_destroy(&exec_ctx, args); } static void teardown_client(client_fixture *cf) { @@ -572,10 +627,14 @@ static void setup_server(const char *host, server_fixture *sf) { gpr_join_host_port(&sf->servers_hostport, host, sf->port); } + grpc_server_credentials *server_creds = + grpc_fake_transport_security_server_credentials_create(); + sf->server = grpc_server_create(NULL, NULL); grpc_server_register_completion_queue(sf->server, sf->cq, NULL); - GPR_ASSERT((assigned_port = grpc_server_add_insecure_http2_port( - sf->server, sf->servers_hostport)) > 0); + GPR_ASSERT((assigned_port = grpc_server_add_secure_http2_port( + sf->server, sf->servers_hostport, server_creds)) > 0); + grpc_server_credentials_release(server_creds); GPR_ASSERT(sf->port == assigned_port); grpc_server_start(sf->server); } @@ -586,7 +645,7 @@ static void teardown_server(server_fixture *sf) { gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport); grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000)); GPR_ASSERT(grpc_completion_queue_pluck( - sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) + sf->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(sf->server); gpr_thd_join(sf->tid); @@ -615,6 +674,7 @@ static void fork_lb_server(void *arg) { tf->lb_server_update_delay_ms); } +#define LB_TOKEN_PREFIX "token" static test_fixture setup_test_fixture(int lb_server_update_delay_ms) { test_fixture tf; memset(&tf, 0, sizeof(tf)); @@ -624,20 +684,22 @@ static test_fixture setup_test_fixture(int lb_server_update_delay_ms) { gpr_thd_options_set_joinable(&options); for (int i = 0; i < NUM_BACKENDS; ++i) { + // Only the first half of the servers expect an LB token. + if (i < NUM_BACKENDS / 2) { + tf.lb_backends[i].lb_token_prefix = LB_TOKEN_PREFIX; + } else { + tf.lb_backends[i].lb_token_prefix = ""; + } setup_server("127.0.0.1", &tf.lb_backends[i]); gpr_thd_new(&tf.lb_backends[i].tid, fork_backend_server, &tf.lb_backends[i], &options); } + tf.lb_server.lb_token_prefix = LB_TOKEN_PREFIX; + tf.lb_server.balancer_name = BALANCERS_NAME; setup_server("127.0.0.1", &tf.lb_server); gpr_thd_new(&tf.lb_server.tid, fork_lb_server, &tf.lb_server, &options); - - char *server_uri; - gpr_asprintf(&server_uri, "test:%s?lb_policy=grpclb&lb_enabled=1", - tf.lb_server.servers_hostport); - setup_client(server_uri, &tf.client); - gpr_free(server_uri); - + setup_client(&tf.lb_server, tf.lb_backends, &tf.client); return tf; } @@ -673,39 +735,44 @@ static test_fixture test_update(int lb_server_update_delay_ms) { TEST(GrpclbTest, Updates) { grpc::test_fixture tf_result; - // Clients take a bit over one second to complete a call (the last part of the + // Clients take at least one second to complete a call (the last part of the // call sleeps for 1 second while verifying the client's completion queue is - // empty). Therefore: + // empty), more if the system is under load. Therefore: // // If the LB server waits 800ms before sending an update, it will arrive - // before the first client request is done, skipping the second server from - // batch 1 altogether: the 2nd client request will go to the 1st server of - // batch 2 (ie, the third one out of the four total servers). + // before the first client request finishes, skipping the second server from + // batch 1. All subsequent picks will come from the second half of the + // backends, those coming in the LB update. tf_result = grpc::test_update(800); - GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); - GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0); - GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2); - GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1); - - // If the LB server waits 1500ms, the update arrives after having picked the - // 2nd server from batch 1 but before the next pick for the first server of - // batch 2. All server are used. - tf_result = grpc::test_update(1500); - GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); - GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1); - GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1); - GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1); - - // If the LB server waits > 2000ms, the update arrives after the first two - // request are done and the third pick is performed, which returns, in RR - // fashion, the 1st server of the 1st update. Therefore, the second server of - // batch 1 is hit at least one, whereas the first server of batch 2 is never - // hit. + GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced + + tf_result.lb_backends[1].num_calls_serviced == + 1); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced + + tf_result.lb_backends[3].num_calls_serviced > + 0); + int num_serviced_calls = 0; + for (int i = 0; i < 4; i++) { + num_serviced_calls += tf_result.lb_backends[i].num_calls_serviced; + } + GPR_ASSERT(num_serviced_calls == 4); + + // If the LB server waits 2500ms, the update arrives after two calls and three + // picks. The third pick will be the 1st server of the 1st update (RR policy + // going around). The fourth and final pick will come from the second LB + // update. In any case, the total number of serviced calls must again be equal + // to four across all the backends. tf_result = grpc::test_update(2500); - GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced >= 1); - GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0); - GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0); - GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0); + GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced + + tf_result.lb_backends[1].num_calls_serviced >= + 2); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced + + tf_result.lb_backends[3].num_calls_serviced > + 0); + num_serviced_calls = 0; + for (int i = 0; i < 4; i++) { + num_serviced_calls += tf_result.lb_backends[i].num_calls_serviced; + } + GPR_ASSERT(num_serviced_calls == 4); } TEST(GrpclbTest, InvalidAddressInServerlist) {} |