author    | Muxi Yan <muxi@users.noreply.github.com> | 2017-08-02 16:57:23 -0700
committer | GitHub <noreply@github.com>              | 2017-08-02 16:57:23 -0700
commit    | 69b8fe02b6a45a2731c17e91d6f22d85026fcd2f (patch)
tree      | 39d464fd58172412c966e0363ab85c6669cb465b /test
parent    | 627a5824c3cbf37d38869238e5382f4ace6d6370 (diff)
parent    | 4f68ecf665433933b0ef1f0a3df12b1590646a90 (diff)
Merge branch 'master' into stream_compression_config
Diffstat (limited to 'test')
-rw-r--r-- | test/core/end2end/fuzzers/api_fuzzer.c | 5
-rw-r--r-- | test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-minimized-4688823906729984 | bin 0 -> 150 bytes
-rw-r--r-- | test/core/end2end/fuzzers/server_fuzzer.c | 5
-rw-r--r-- | test/core/end2end/tests/cancel_with_status.c | 8
-rw-r--r-- | test/core/fling/server.c | 6
-rw-r--r-- | test/core/surface/completion_queue_test.c | 6
-rw-r--r-- | test/core/surface/completion_queue_threading_test.c | 4
-rw-r--r-- | test/cpp/end2end/grpclb_end2end_test.cc | 177
-rw-r--r-- | test/cpp/grpclb/grpclb_api_test.cc | 16
-rw-r--r-- | test/cpp/microbenchmarks/bm_chttp2_transport.cc | 36
-rw-r--r-- | test/cpp/microbenchmarks/bm_cq.cc | 7
-rw-r--r-- | test/cpp/microbenchmarks/bm_cq_multiple_threads.cc | 2
-rw-r--r-- | test/cpp/microbenchmarks/bm_fullstack_trickle.cc | 27
-rw-r--r-- | test/cpp/qps/server.h | 7
-rw-r--r-- | test/cpp/util/BUILD | 1

15 files changed, 198 insertions(+), 109 deletions(-)
diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c
index 281a1af20c..01fa4f748f 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.c
+++ b/test/core/end2end/fuzzers/api_fuzzer.c
@@ -34,6 +34,7 @@
 #include "src/core/lib/iomgr/timer.h"
 #include "src/core/lib/iomgr/timer_manager.h"
 #include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/env.h"
 #include "src/core/lib/surface/server.h"
 #include "src/core/lib/transport/metadata.h"
 #include "test/core/end2end/data/ssl_test_data.h"
@@ -731,7 +732,9 @@ static validator *make_finished_batch_validator(call_state *cs,
 int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
   grpc_test_only_set_slice_hash_seed(0);
-  if (squelch) gpr_set_log_function(dont_log);
+  char *grpc_trace_fuzzer = gpr_getenv("GRPC_TRACE_FUZZER");
+  if (squelch && grpc_trace_fuzzer == NULL) gpr_set_log_function(dont_log);
+  gpr_free(grpc_trace_fuzzer);
   input_stream inp = {data, data + size};
   grpc_tcp_client_connect_impl = my_tcp_client_connect;
   gpr_now_impl = now_impl;
diff --git a/test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-minimized-4688823906729984 b/test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-minimized-4688823906729984
new file mode 100644
index 0000000000..a5b730382e
--- /dev/null
+++ b/test/core/end2end/fuzzers/api_fuzzer_corpus/clusterfuzz-testcase-minimized-4688823906729984
Binary files differ
diff --git a/test/core/end2end/fuzzers/server_fuzzer.c b/test/core/end2end/fuzzers/server_fuzzer.c
index 3ddc1ae907..ef4c0a4bfd 100644
--- a/test/core/end2end/fuzzers/server_fuzzer.c
+++ b/test/core/end2end/fuzzers/server_fuzzer.c
@@ -72,8 +72,9 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
   grpc_metadata_array_init(&request_metadata1);
   int requested_calls = 0;
-  grpc_server_request_call(server, &call1, &call_details1, &request_metadata1,
-                           cq, cq, tag(1));
+  GPR_ASSERT(GRPC_CALL_OK ==
+             grpc_server_request_call(server, &call1, &call_details1,
+                                      &request_metadata1, cq, cq, tag(1)));
   requested_calls++;

   grpc_event ev;
diff --git a/test/core/end2end/tests/cancel_with_status.c b/test/core/end2end/tests/cancel_with_status.c
index d659d1173a..fd26fd122e 100644
--- a/test/core/end2end/tests/cancel_with_status.c
+++ b/test/core/end2end/tests/cancel_with_status.c
@@ -25,6 +25,7 @@
 #include <grpc/grpc.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
 #include <grpc/support/time.h>
 #include <grpc/support/useful.h>
 #include "src/core/lib/support/string.h"
@@ -138,7 +139,12 @@ static void simple_request_body(grpc_end2end_test_config config,
   error = grpc_call_start_batch(c, ops, num_ops, tag(1), NULL);
   GPR_ASSERT(GRPC_CALL_OK == error);

-  grpc_call_cancel_with_status(c, GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL);
+  char *dynamic_string = gpr_strdup("xyz");
+  grpc_call_cancel_with_status(c, GRPC_STATUS_UNIMPLEMENTED,
+                               (const char *)dynamic_string, NULL);
+  // The API of \a description allows for it to be a dynamic/non-const
+  // string, test this guarantee.
+  gpr_free(dynamic_string);

   CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
   cq_verify(cqv);
diff --git a/test/core/fling/server.c b/test/core/fling/server.c
index 0f0f22ffcf..b3a7fa21ec 100644
--- a/test/core/fling/server.c
+++ b/test/core/fling/server.c
@@ -77,8 +77,10 @@ typedef struct {
 static void request_call(void) {
   grpc_metadata_array_init(&request_metadata_recv);
-  grpc_server_request_call(server, &call, &call_details, &request_metadata_recv,
-                           cq, cq, tag(FLING_SERVER_NEW_REQUEST));
+  GPR_ASSERT(GRPC_CALL_OK ==
+             grpc_server_request_call(server, &call, &call_details,
+                                      &request_metadata_recv, cq, cq,
+                                      tag(FLING_SERVER_NEW_REQUEST)));
 }

 static void handle_unary_method(void) {
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index f9d88d6327..e6372a379c 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -144,7 +144,7 @@ static void test_cq_end_op(void) {
   cc = grpc_completion_queue_create(
       grpc_completion_queue_factory_lookup(&attr), &attr, NULL);

-  grpc_cq_begin_op(cc, tag);
+  GPR_ASSERT(grpc_cq_begin_op(cc, tag));
   grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion,
                  NULL, &completion);

@@ -233,7 +233,7 @@ static void test_pluck(void) {
       grpc_completion_queue_factory_lookup(&attr), &attr, NULL);

   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
-    grpc_cq_begin_op(cc, tags[i]);
+    GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
     grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
                    do_nothing_end_completion, NULL, &completions[i]);
   }
@@ -245,7 +245,7 @@ static void test_pluck(void) {
   }

   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
-    grpc_cq_begin_op(cc, tags[i]);
+    GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
     grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
                    do_nothing_end_completion, NULL, &completions[i]);
   }
diff --git a/test/core/surface/completion_queue_threading_test.c b/test/core/surface/completion_queue_threading_test.c
index 99d0fa4980..9996b6b840 100644
--- a/test/core/surface/completion_queue_threading_test.c
+++ b/test/core/surface/completion_queue_threading_test.c
@@ -107,7 +107,7 @@ static void test_too_many_plucks(void) {
     GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);

   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
-    grpc_cq_begin_op(cc, tags[i]);
+    GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
     grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE,
                    do_nothing_end_completion, NULL, &completions[i]);
   }
@@ -153,7 +153,7 @@ static void producer_thread(void *arg) {
   gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
   for (i = 0; i < TEST_THREAD_EVENTS; i++) {
-    grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1);
+    GPR_ASSERT(grpc_cq_begin_op(opt->cc, (void *)(intptr_t)1));
   }

   gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 1f3255d18d..4fef535506 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -45,6 +45,7 @@ extern "C" {
 #include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
 #include "src/proto/grpc/testing/echo.grpc.pb.h"

+#include <gmock/gmock.h>
 #include <gtest/gtest.h>

 // TODO(dgq): Other scenarios in need of testing:
@@ -131,6 +132,19 @@ class BackendServiceImpl : public BackendService {
     IncreaseResponseCount();
     return status;
   }
+
+  // Returns true on its first invocation, false otherwise.
+  bool Shutdown() {
+    std::unique_lock<std::mutex> lock(mu_);
+    const bool prev = !shutdown_;
+    shutdown_ = true;
+    gpr_log(GPR_INFO, "Backend: shut down");
+    return prev;
+  }
+
+ private:
+  std::mutex mu_;
+  bool shutdown_ = false;
 };

 grpc::string Ip4ToPackedString(const char* ip_str) {
@@ -142,22 +156,20 @@ grpc::string Ip4ToPackedString(const char* ip_str) {
 struct ClientStats {
   size_t num_calls_started = 0;
   size_t num_calls_finished = 0;
-  size_t num_calls_finished_with_drop_for_rate_limiting = 0;
-  size_t num_calls_finished_with_drop_for_load_balancing = 0;
   size_t num_calls_finished_with_client_failed_to_send = 0;
   size_t num_calls_finished_known_received = 0;
+  std::map<grpc::string, size_t> drop_token_counts;

   ClientStats& operator+=(const ClientStats& other) {
     num_calls_started += other.num_calls_started;
     num_calls_finished += other.num_calls_finished;
-    num_calls_finished_with_drop_for_rate_limiting +=
-        other.num_calls_finished_with_drop_for_rate_limiting;
-    num_calls_finished_with_drop_for_load_balancing +=
-        other.num_calls_finished_with_drop_for_load_balancing;
     num_calls_finished_with_client_failed_to_send +=
         other.num_calls_finished_with_client_failed_to_send;
     num_calls_finished_known_received +=
         other.num_calls_finished_known_received;
+    for (const auto& p : other.drop_token_counts) {
+      drop_token_counts[p.first] += p.second;
+    }
     return *this;
   }
 };
@@ -173,11 +185,12 @@ class BalancerServiceImpl : public BalancerService {
         shutdown_(false) {}

   Status BalanceLoad(ServerContext* context, Stream* stream) override {
-    gpr_log(GPR_INFO, "LB: BalanceLoad");
+    gpr_log(GPR_INFO, "LB[%p]: BalanceLoad", this);
     LoadBalanceRequest request;
     stream->Read(&request);
     IncreaseRequestCount();
-    gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str());
+    gpr_log(GPR_INFO, "LB[%p]: recv msg '%s'", this,
+            request.DebugString().c_str());

     if (client_load_reporting_interval_seconds_ > 0) {
       LoadBalanceResponse initial_response;
@@ -208,7 +221,7 @@ class BalancerServiceImpl : public BalancerService {
     if (client_load_reporting_interval_seconds_ > 0) {
       request.Clear();
       stream->Read(&request);
-      gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'",
+      gpr_log(GPR_INFO, "LB[%p]: recv client load report msg: '%s'", this,
              request.DebugString().c_str());
       GPR_ASSERT(request.has_client_stats());
       // We need to acquire the lock here in order to prevent the notify_one
@@ -218,21 +231,21 @@ class BalancerServiceImpl : public BalancerService {
           request.client_stats().num_calls_started();
       client_stats_.num_calls_finished +=
           request.client_stats().num_calls_finished();
-      client_stats_.num_calls_finished_with_drop_for_rate_limiting +=
-          request.client_stats()
-              .num_calls_finished_with_drop_for_rate_limiting();
-      client_stats_.num_calls_finished_with_drop_for_load_balancing +=
-          request.client_stats()
-              .num_calls_finished_with_drop_for_load_balancing();
       client_stats_.num_calls_finished_with_client_failed_to_send +=
           request.client_stats()
               .num_calls_finished_with_client_failed_to_send();
       client_stats_.num_calls_finished_known_received +=
           request.client_stats().num_calls_finished_known_received();
+      for (const auto& drop_token_count :
+           request.client_stats().calls_finished_with_drop()) {
+        client_stats_
+            .drop_token_counts[drop_token_count.load_balance_token()] +=
+            drop_token_count.num_calls();
+      }
       load_report_cond_.notify_one();
     }
   done:
-    gpr_log(GPR_INFO, "LB: done");
+    gpr_log(GPR_INFO, "LB[%p]: done", this);
     return Status::OK;
   }
@@ -247,21 +260,20 @@ class BalancerServiceImpl : public BalancerService {
     std::unique_lock<std::mutex> lock(mu_);
     const bool prev = !shutdown_;
     shutdown_ = true;
-    gpr_log(GPR_INFO, "LB: shut down");
+    gpr_log(GPR_INFO, "LB[%p]: shut down", this);
     return prev;
   }

   static LoadBalanceResponse BuildResponseForBackends(
-      const std::vector<int>& backend_ports, int num_drops_for_rate_limiting,
-      int num_drops_for_load_balancing) {
+      const std::vector<int>& backend_ports,
+      const std::map<grpc::string, size_t>& drop_token_counts) {
     LoadBalanceResponse response;
-    for (int i = 0; i < num_drops_for_rate_limiting; ++i) {
-      auto* server = response.mutable_server_list()->add_servers();
-      server->set_drop_for_rate_limiting(true);
-    }
-    for (int i = 0; i < num_drops_for_load_balancing; ++i) {
-      auto* server = response.mutable_server_list()->add_servers();
-      server->set_drop_for_load_balancing(true);
+    for (const auto& drop_token_count : drop_token_counts) {
+      for (size_t i = 0; i < drop_token_count.second; ++i) {
+        auto* server = response.mutable_server_list()->add_servers();
+        server->set_drop(true);
+        server->set_load_balance_token(drop_token_count.first);
+      }
     }
     for (const int& backend_port : backend_ports) {
       auto* server = response.mutable_server_list()->add_servers();
@@ -285,13 +297,13 @@ class BalancerServiceImpl : public BalancerService {
  private:
   void SendResponse(Stream* stream, const LoadBalanceResponse& response,
                     int delay_ms) {
-    gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms);
+    gpr_log(GPR_INFO, "LB[%p]: sleeping for %d ms...", this, delay_ms);
     if (delay_ms > 0) {
       gpr_sleep_until(
           gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
                        gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
     }
-    gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'",
+    gpr_log(GPR_INFO, "LB[%p]: Woke up! Sending response '%s'", this,
            response.DebugString().c_str());
     IncreaseResponseCount();
     stream->Write(response);
@@ -341,7 +353,7 @@ class GrpclbEnd2endTest : public ::testing::Test {

   void TearDown() override {
     for (size_t i = 0; i < backends_.size(); ++i) {
-      backend_servers_[i].Shutdown();
+      if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
     }
     for (size_t i = 0; i < balancers_.size(); ++i) {
       if (balancers_[i]->Shutdown()) balancer_servers_[i].Shutdown();
@@ -499,7 +511,7 @@ class SingleBalancerTest : public GrpclbEnd2endTest {
 TEST_F(SingleBalancerTest, Vanilla) {
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       0);
   // Make sure that trying to connect works without a call.
   channel_->GetState(true /* try_to_connect */);
@@ -538,7 +550,7 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
   ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
   // Send non-empty serverlist only after kServerlistDelayMs
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       kServerlistDelayMs);

   const auto t0 = system_clock::now();
@@ -580,11 +592,11 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {

   // Send a serverlist right away.
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       0);
   // ... and the same one a bit later.
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       kServerlistDelayMs);

   // Send num_backends/2 requests.
@@ -639,6 +651,61 @@ TEST_F(SingleBalancerTest, RepeatedServerlist) {
   EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
 }

+TEST_F(SingleBalancerTest, BackendsRestart) {
+  const size_t kNumRpcsPerAddress = 100;
+  ScheduleResponseForBalancer(
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
+      0);
+  // Make sure that trying to connect works without a call.
+  channel_->GetState(true /* try_to_connect */);
+  // Send 100 RPCs per server.
+  auto statuses_and_responses =
+      SendRpc(kMessage_, kNumRpcsPerAddress * num_backends_);
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    const EchoResponse& response = status_and_response.second;
+    EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
+                             << " message=" << status.error_message();
+    EXPECT_EQ(response.message(), kMessage_);
+  }
+  // Each backend should have gotten 100 requests.
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    EXPECT_EQ(kNumRpcsPerAddress,
+              backend_servers_[i].service_->request_count());
+  }
+  balancers_[0]->NotifyDoneWithServerlists();
+  // The balancer got a single request.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+  // and sent a single response.
+  EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+  for (size_t i = 0; i < backends_.size(); ++i) {
+    if (backends_[i]->Shutdown()) backend_servers_[i].Shutdown();
+  }
+  statuses_and_responses = SendRpc(kMessage_, 1);
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    EXPECT_FALSE(status.ok());
+  }
+  for (size_t i = 0; i < num_backends_; ++i) {
+    backends_.emplace_back(new BackendServiceImpl());
+    backend_servers_.emplace_back(ServerThread<BackendService>(
+        "backend", server_host_, backends_.back().get()));
+  }
+  // The following RPC will fail due to the backend ports having changed. It
+  // will nonetheless exercise the grpclb-roundrobin handling of the RR policy
+  // having gone into shutdown.
+  // TODO(dgq): implement the "backend restart" component as well. We need extra
+  // machinery to either update the LB responses "on the fly" or instruct
+  // backends which ports to restart on.
+  statuses_and_responses = SendRpc(kMessage_, 1);
+  for (const auto& status_and_response : statuses_and_responses) {
+    const Status& status = status_and_response.first;
+    EXPECT_FALSE(status.ok());
+  }
+  // Check LB policy name for the channel.
+  EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
 class UpdatesTest : public GrpclbEnd2endTest {
  public:
   UpdatesTest() : GrpclbEnd2endTest(4, 3, 0) {}
@@ -648,10 +715,9 @@ TEST_F(UpdatesTest, UpdateBalancers) {
   const std::vector<int> first_backend{GetBackendPorts()[0]};
   const std::vector<int> second_backend{GetBackendPorts()[1]};
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
   ScheduleResponseForBalancer(
-      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
-      0);
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);

   // Start servers and send 10 RPCs per server.
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -726,10 +792,9 @@ TEST_F(UpdatesTest, UpdateBalancersRepeated) {
   const std::vector<int> second_backend{GetBackendPorts()[0]};

   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
   ScheduleResponseForBalancer(
-      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
-      0);
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);

   // Start servers and send 10 RPCs per server.
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -809,10 +874,9 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
   const std::vector<int> second_backend{GetBackendPorts()[1]};

   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, 0, 0), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(first_backend, {}), 0);
   ScheduleResponseForBalancer(
-      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, 0, 0),
-      0);
+      1, BalancerServiceImpl::BuildResponseForBackends(second_backend, {}), 0);

   // Start servers and send 10 RPCs per server.
   gpr_log(GPR_INFO, "========= BEFORE FIRST BATCH ==========");
@@ -901,7 +965,8 @@ TEST_F(UpdatesTest, UpdateBalancersDeadUpdate) {
 TEST_F(SingleBalancerTest, Drop) {
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 1, 2),
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             GetBackendPorts(), {{"rate_limiting", 1}, {"load_balancing", 2}}),
       0);
   // Send 100 RPCs for each server and drop address.
   const auto& statuses_and_responses =
@@ -936,7 +1001,9 @@ TEST_F(SingleBalancerTest, Drop) {
 TEST_F(SingleBalancerTest, DropAllFirst) {
   // All registered addresses are marked as "drop".
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 0);
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+      0);
   const auto& statuses_and_responses = SendRpc(kMessage_, 1);
   for (const auto& status_and_response : statuses_and_responses) {
     const Status& status = status_and_response.first;
@@ -947,10 +1014,12 @@ TEST_F(SingleBalancerTest, DropAllFirst) {

 TEST_F(SingleBalancerTest, DropAll) {
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       0);
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends({}, 1, 1), 1000);
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             {}, {{"rate_limiting", 1}, {"load_balancing", 1}}),
+      1000);

   // First call succeeds.
   auto statuses_and_responses = SendRpc(kMessage_, 1);
@@ -980,7 +1049,7 @@ class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
 TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
   const size_t kNumRpcsPerAddress = 100;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 0, 0),
+      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), {}),
       0);
   // Send 100 RPCs per server.
   const auto& statuses_and_responses =
@@ -1009,17 +1078,17 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
   EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
             client_stats.num_calls_started);
   EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
             client_stats.num_calls_finished);
-  EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
-  EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
   EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
   EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
             client_stats.num_calls_finished_known_received);
+  EXPECT_THAT(client_stats.drop_token_counts, ::testing::ElementsAre());
 }

 TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
   const size_t kNumRpcsPerAddress = 3;
   ScheduleResponseForBalancer(
-      0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts(), 2, 1),
+      0, BalancerServiceImpl::BuildResponseForBackends(
+             GetBackendPorts(), {{"rate_limiting", 2}, {"load_balancing", 1}}),
       0);
   // Send 100 RPCs for each server and drop address.
   const auto& statuses_and_responses =
@@ -1056,13 +1125,13 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
             client_stats.num_calls_started);
   EXPECT_EQ(kNumRpcsPerAddress * (num_backends_ + 3),
             client_stats.num_calls_finished);
-  EXPECT_EQ(kNumRpcsPerAddress * 2,
-            client_stats.num_calls_finished_with_drop_for_rate_limiting);
-  EXPECT_EQ(kNumRpcsPerAddress,
-            client_stats.num_calls_finished_with_drop_for_load_balancing);
   EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
   EXPECT_EQ(kNumRpcsPerAddress * num_backends_,
             client_stats.num_calls_finished_known_received);
+  EXPECT_THAT(client_stats.drop_token_counts,
+              ::testing::ElementsAre(
+                  ::testing::Pair("load_balancing", kNumRpcsPerAddress),
+                  ::testing::Pair("rate_limiting", kNumRpcsPerAddress * 2)));
 }

 }  // namespace
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index cec9666503..6b0350e1f9 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -91,13 +91,13 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
   auto* server = serverlist->add_servers();
   server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
   server->set_port(12345);
-  server->set_drop_for_rate_limiting(true);
-  server->set_drop_for_load_balancing(false);
+  server->set_load_balance_token("rate_limting");
+  server->set_drop(true);
   server = response.mutable_server_list()->add_servers();
   server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
   server->set_port(54321);
-  server->set_drop_for_rate_limiting(false);
-  server->set_drop_for_load_balancing(true);
+  server->set_load_balance_token("load_balancing");
+  server->set_drop(true);
   auto* expiration_interval = serverlist->mutable_expiration_interval();
   expiration_interval->set_seconds(888);
   expiration_interval->set_nanos(999);
@@ -112,14 +112,14 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
   EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
             "127.0.0.1");
   EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
-  EXPECT_TRUE(c_serverlist->servers[0]->drop_for_rate_limiting);
-  EXPECT_FALSE(c_serverlist->servers[0]->drop_for_load_balancing);
+  EXPECT_STREQ(c_serverlist->servers[0]->load_balance_token, "rate_limting");
+  EXPECT_TRUE(c_serverlist->servers[0]->drop);
   EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
   EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address),
             "10.0.0.1");
   EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
-  EXPECT_FALSE(c_serverlist->servers[1]->drop_for_rate_limiting);
-  EXPECT_TRUE(c_serverlist->servers[1]->drop_for_load_balancing);
+  EXPECT_STREQ(c_serverlist->servers[1]->load_balance_token, "load_balancing");
+  EXPECT_TRUE(c_serverlist->servers[1]->drop);

   EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds);
   EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888);
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 567ef1cf24..cb113c5254 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -391,8 +391,9 @@ static void BM_TransportStreamSend(benchmark::State &state) {
       MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
         if (!state.KeepRunning()) return;
         // force outgoing window to be yuge
-        s.chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024;
-        f.chttp2_transport()->outgoing_window = 1024 * 1024 * 1024;
+        s.chttp2_stream()->flow_control.remote_window_delta =
+            1024 * 1024 * 1024;
+        f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024;
         grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
         reset_op();
         op.on_complete = c.get();
@@ -517,21 +518,22 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
   std::unique_ptr<Closure> drain_continue;
   grpc_slice recv_slice;

-  std::unique_ptr<Closure> c =
-      MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
-        if (!state.KeepRunning()) return;
-        // force outgoing window to be yuge
-        s.chttp2_stream()->incoming_window_delta = 1024 * 1024 * 1024;
-        f.chttp2_transport()->incoming_window = 1024 * 1024 * 1024;
-        received = 0;
-        reset_op();
-        op.on_complete = do_nothing.get();
-        op.recv_message = true;
-        op.payload->recv_message.recv_message = &recv_stream;
-        op.payload->recv_message.recv_message_ready = drain_start.get();
-        s.Op(&op);
-        f.PushInput(grpc_slice_ref(incoming_data));
-      });
+  std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx,
+                                               grpc_error *error) {
+    if (!state.KeepRunning()) return;
+    // force outgoing window to be yuge
+    s.chttp2_stream()->flow_control.local_window_delta = 1024 * 1024 * 1024;
+    s.chttp2_stream()->flow_control.announced_window_delta = 1024 * 1024 * 1024;
+    f.chttp2_transport()->flow_control.announced_window = 1024 * 1024 * 1024;
+    received = 0;
+    reset_op();
+    op.on_complete = do_nothing.get();
+    op.recv_message = true;
+    op.payload->recv_message.recv_message = &recv_stream;
+    op.payload->recv_message.recv_message_ready = drain_start.get();
+    s.Op(&op);
+    f.PushInput(grpc_slice_ref(incoming_data));
+  });

   drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
     if (recv_stream == NULL) {
diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc
index 18308a2e16..a0c0414f2f 100644
--- a/test/cpp/microbenchmarks/bm_cq.cc
+++ b/test/cpp/microbenchmarks/bm_cq.cc
@@ -23,6 +23,7 @@
 #include <grpc++/completion_queue.h>
 #include <grpc++/impl/grpc_library.h>
 #include <grpc/grpc.h>
+#include <grpc/support/log.h>
 #include "test/cpp/microbenchmarks/helpers.h"

 extern "C" {
@@ -82,7 +83,7 @@ static void BM_Pass1Cpp(benchmark::State& state) {
     grpc_cq_completion completion;
     DummyTag dummy_tag;
     grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    grpc_cq_begin_op(c_cq, &dummy_tag);
+    GPR_ASSERT(grpc_cq_begin_op(c_cq, &dummy_tag));
     grpc_cq_end_op(&exec_ctx, c_cq, &dummy_tag, GRPC_ERROR_NONE,
                    DoneWithCompletionOnStack, NULL, &completion);
     grpc_exec_ctx_finish(&exec_ctx);
@@ -102,7 +103,7 @@ static void BM_Pass1Core(benchmark::State& state) {
   while (state.KeepRunning()) {
     grpc_cq_completion completion;
     grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    grpc_cq_begin_op(cq, NULL);
+    GPR_ASSERT(grpc_cq_begin_op(cq, NULL));
     grpc_cq_end_op(&exec_ctx, cq, NULL, GRPC_ERROR_NONE,
                    DoneWithCompletionOnStack, NULL, &completion);
     grpc_exec_ctx_finish(&exec_ctx);
@@ -121,7 +122,7 @@ static void BM_Pluck1Core(benchmark::State& state) {
   while (state.KeepRunning()) {
     grpc_cq_completion completion;
     grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
-    grpc_cq_begin_op(cq, NULL);
+    GPR_ASSERT(grpc_cq_begin_op(cq, NULL));
     grpc_cq_end_op(&exec_ctx, cq, NULL, GRPC_ERROR_NONE,
                    DoneWithCompletionOnStack, NULL, &completion);
     grpc_exec_ctx_finish(&exec_ctx);
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index f79db15a47..f109fe6251 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -78,7 +78,7 @@ static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
   }
   gpr_mu_unlock(&ps->mu);

-  grpc_cq_begin_op(g_cq, g_tag);
+  GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag));
   grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL,
                  (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
   grpc_exec_ctx_flush(exec_ctx);
diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
index aeec7d831b..5c44b9751f 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
@@ -73,11 +73,11 @@ class TrickledCHTTP2 : public EndpointPairFixture {
       log_.reset(new std::ofstream(fn.str().c_str()));
       write_csv(log_.get(), "t", "iteration", "client_backlog",
                 "server_backlog", "client_t_stall", "client_s_stall",
-                "server_t_stall", "server_s_stall", "client_t_outgoing",
-                "server_t_outgoing", "client_t_incoming", "server_t_incoming",
-                "client_s_outgoing_delta", "server_s_outgoing_delta",
-                "client_s_incoming_delta", "server_s_incoming_delta",
-                "client_s_announce_window", "server_s_announce_window",
+                "server_t_stall", "server_s_stall", "client_t_remote",
+                "server_t_remote", "client_t_announced", "server_t_announced",
+                "client_s_remote_delta", "server_s_remote_delta",
+                "client_s_local_delta", "server_s_local_delta",
+                "client_s_announced_delta", "server_s_announced_delta",
                 "client_peer_iws", "client_local_iws", "client_sent_iws",
                 "client_acked_iws", "server_peer_iws", "server_local_iws",
                 "server_sent_iws", "server_acked_iws", "client_queued_bytes",
@@ -127,14 +127,15 @@ class TrickledCHTTP2 : public EndpointPairFixture {
        client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
        server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
        server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
-        client->outgoing_window, server->outgoing_window,
-        client->incoming_window, server->incoming_window,
-        client_stream ? client_stream->outgoing_window_delta : -1,
-        server_stream ? server_stream->outgoing_window_delta : -1,
-        client_stream ? client_stream->incoming_window_delta : -1,
-        server_stream ? server_stream->incoming_window_delta : -1,
-        client_stream ? client_stream->announce_window : -1,
-        server_stream ? server_stream->announce_window : -1,
+        client->flow_control.remote_window, server->flow_control.remote_window,
+        client->flow_control.announced_window,
+        server->flow_control.announced_window,
+        client_stream ? client_stream->flow_control.remote_window_delta : -1,
+        server_stream ? server_stream->flow_control.remote_window_delta : -1,
+        client_stream ? client_stream->flow_control.local_window_delta : -1,
+        server_stream ? server_stream->flow_control.local_window_delta : -1,
+        client_stream ? client_stream->flow_control.announced_window_delta : -1,
+        server_stream ? server_stream->flow_control.announced_window_delta : -1,
        client->settings[GRPC_PEER_SETTINGS]
                        [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
        client->settings[GRPC_LOCAL_SETTINGS]
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index c0dac96d8b..df27a4368e 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -80,8 +80,11 @@ class Server {
       return false;
     }
     payload->set_type(type);
-    std::unique_ptr<char[]> body(new char[size]());
-    payload->set_body(body.get(), size);
+    // Don't waste time creating a new payload of identical size.
+    if (payload->body().length() != (size_t)size) {
+      std::unique_ptr<char[]> body(new char[size]());
+      payload->set_body(body.get(), size);
+    }
     return true;
   }
diff --git a/test/cpp/util/BUILD b/test/cpp/util/BUILD
index 33240f6f69..c9b0d6c1df 100644
--- a/test/cpp/util/BUILD
+++ b/test/cpp/util/BUILD
@@ -30,6 +30,7 @@ grpc_cc_binary(
     name = "testso.so",
     srcs = [],
     linkshared = 1,
+    linkopts = ['-Wl,--no-undefined'],
    deps = ["//:grpc++_unsecure"],
 )
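
Note on the api_fuzzer.c change above: squelched fuzzer logging can now be re-enabled at run time by setting the GRPC_TRACE_FUZZER environment variable, with no recompile. Below is a minimal standalone sketch of the same gate, assuming a printf-shaped logger and using std::getenv in place of gRPC's gpr_getenv; the names here are illustrative, not gRPC's.

    #include <cstdio>
    #include <cstdlib>

    // Stand-in for the fuzzer's dont_log(): same shape as printf, swallows
    // everything.
    static int dont_log(const char *, ...) { return 0; }

    // Active logger; defaults to stdout.
    static int (*g_log)(const char *, ...) = std::printf;

    int main() {
      const bool squelch = true;  // fuzz drivers normally run silenced
      // Squelch only when the operator has not asked for traces.
      if (squelch && std::getenv("GRPC_TRACE_FUZZER") == nullptr) {
        g_log = dont_log;
      }
      g_log("visible only when GRPC_TRACE_FUZZER is set\n");
      return 0;
    }

Run as GRPC_TRACE_FUZZER=1 ./a.out to keep the output; unset the variable to restore silence.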
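Note on the grpclb changes: the per-cause drop counters (drop_for_rate_limiting / drop_for_load_balancing) give way to a single drop flag plus a load_balance_token string, so ClientStats aggregates drops into a map keyed by token. A self-contained sketch of that aggregation, reduced to just the map field and operator+= from the test above; the main() data is made up.

    #include <cstddef>
    #include <cstdio>
    #include <map>
    #include <string>

    // Per-token drop counters: one map replaces the two fixed fields.
    struct ClientStats {
      std::map<std::string, std::size_t> drop_token_counts;

      ClientStats &operator+=(const ClientStats &other) {
        // Merge counts token by token, creating entries on first sight.
        for (const auto &p : other.drop_token_counts) {
          drop_token_counts[p.first] += p.second;
        }
        return *this;
      }
    };

    int main() {
      ClientStats total, report;
      report.drop_token_counts["rate_limiting"] = 2;
      report.drop_token_counts["load_balancing"] = 1;
      total += report;  // simulate two identical load reports arriving
      total += report;
      for (const auto &p : total.drop_token_counts) {
        // prints load_balancing: 2, then rate_limiting: 4 (map is ordered)
        std::printf("%s: %zu\n", p.first.c_str(), p.second);
      }
      return 0;
    }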
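Note on the Shutdown() helpers: BackendServiceImpl (added here) and BalancerServiceImpl both use a mutex-guarded one-shot latch, so TearDown() can write `if (svc->Shutdown()) server.Shutdown();` and never stop the same server twice. The same latch isolated below; the class name ShutdownLatch is mine, not the test's.

    #include <cassert>
    #include <mutex>

    // One-shot latch: only the first caller sees true, later calls are no-ops.
    class ShutdownLatch {
     public:
      bool Shutdown() {
        std::unique_lock<std::mutex> lock(mu_);
        const bool prev = !shutdown_;  // true only on the first invocation
        shutdown_ = true;
        return prev;
      }

     private:
      std::mutex mu_;
      bool shutdown_ = false;
    };

    int main() {
      ShutdownLatch latch;
      assert(latch.Shutdown());   // first call wins
      assert(!latch.Shutdown());  // subsequent calls report already-shut-down
      return 0;
    }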
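Note on the qps server.h change: SetPayload now skips reallocating the body when the requested size already matches, saving an allocation and copy per response in the steady state. A sketch of just that size check, against a hypothetical FakePayload stand-in (the real code operates on the Payload proto message).

    #include <cstddef>
    #include <memory>
    #include <string>

    // Minimal stand-in for the proto message: only body()/set_body() matter.
    struct FakePayload {
      std::string body_;
      const std::string &body() const { return body_; }
      void set_body(const char *data, std::size_t size) {
        body_.assign(data, size);
      }
    };

    static void SetPayloadBody(FakePayload *payload, int size) {
      // Don't waste time creating a new payload of identical size.
      if (payload->body().length() != (std::size_t)size) {
        std::unique_ptr<char[]> body(new char[size]());  // zero-filled
        payload->set_body(body.get(), size);
      }
    }

    int main() {
      FakePayload p;
      SetPayloadBody(&p, 1024);  // allocates
      SetPayloadBody(&p, 1024);  // no-op: size unchanged
      SetPayloadBody(&p, 2048);  // reallocates
      return 0;
    }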