diff options
Diffstat (limited to 'test/cpp/end2end')
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 136 | ||||
-rw-r--r-- | test/cpp/end2end/client_crash_test.cc | 3 | ||||
-rw-r--r-- | test/cpp/end2end/client_crash_test_server.cc | 4 | ||||
-rw-r--r-- | test/cpp/end2end/client_lb_end2end_test.cc | 22 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 24 | ||||
-rw-r--r-- | test/cpp/end2end/generic_end2end_test.cc | 12 | ||||
-rw-r--r-- | test/cpp/end2end/grpclb_end2end_test.cc | 63 | ||||
-rw-r--r-- | test/cpp/end2end/mock_test.cc | 16 | ||||
-rw-r--r-- | test/cpp/end2end/thread_stress_test.cc | 40 |
9 files changed, 217 insertions, 103 deletions
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 2a33e8ae11..1ea087e706 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -28,12 +28,14 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/tls.h> #include "src/core/lib/iomgr/port.h" +#include "src/core/lib/support/env.h" #include "src/proto/grpc/health/v1/health.grpc.pb.h" #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -99,7 +101,7 @@ class PollingOverrider { class Verifier { public: - explicit Verifier(bool spin) : spin_(spin) {} + explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {} // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { return ExpectUnless(i, expect_ok, false); @@ -142,6 +144,18 @@ class Verifier { return detag(got_tag); } + template <typename T> + CompletionQueue::NextStatus DoOnceThenAsyncNext( + CompletionQueue* cq, void** got_tag, bool* ok, T deadline, + std::function<void(void)> lambda) { + if (lambda_run_) { + return cq->AsyncNext(got_tag, ok, deadline); + } else { + lambda_run_ = true; + return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline); + } + } + // Verify keeps calling Next until all currently set // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } @@ -154,6 +168,7 @@ class Verifier { Next(cq, ignore_ok); } } + // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { @@ -193,6 +208,47 @@ class Verifier { } } + // This version of Verify stops after a certain deadline, and uses the + // DoThenAsyncNext API + // to call the lambda + void Verify(CompletionQueue* cq, + std::chrono::system_clock::time_point deadline, + std::function<void(void)> lambda) { + if (expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + while (std::chrono::system_clock::now() < deadline) { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + while (!expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + GPR_ASSERT(std::chrono::system_clock::now() < deadline); + auto r = DoOnceThenAsyncNext( + cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::GOT_EVENT); + } + GotTag(got_tag, ok, false); + } + } + } + private: void GotTag(void* got_tag, bool ok, bool ignore_ok) { auto it = expectations_.find(got_tag); @@ -226,6 +282,7 @@ class Verifier { std::map<void*, bool> expectations_; std::map<void*, MaybeExpect> maybe_expectations_; bool spin_; + bool lambda_run_; }; bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) { @@ -401,9 +458,18 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) { } TEST_P(AsyncEnd2endTest, ReconnectChannel) { + // GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS is set to 100ms in main() if (GetParam().inproc) { return; } + int poller_slowdown_factor = 1; + // It needs 2 pollset_works to reconnect the channel with polling engine + // "poll" + char* s = gpr_getenv("GRPC_POLL_STRATEGY"); + if (s != nullptr && 0 == strcmp(s, "poll")) { + poller_slowdown_factor = 2; + } + gpr_free(s); ResetStub(); SendRpc(1); server_->Shutdown(); @@ -413,10 +479,13 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) { while (cq_->Next(&ignored_tag, &ignored_ok)) ; BuildAndStartServer(); - // It needs more than kConnectivityCheckIntervalMsec time to reconnect the - // channel. - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(1600, GPR_TIMESPAN))); + // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to + // reconnect the channel. + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis( + 300 * poller_slowdown_factor * grpc_test_slowdown_factor(), + GPR_TIMESPAN))); SendRpc(1); } @@ -490,6 +559,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { EXPECT_TRUE(recv_status.ok()); } +// Test a simple RPC using the async version of Next +TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + + std::chrono::system_clock::time_point time_now( + std::chrono::system_clock::now()); + std::chrono::system_clock::time_point time_limit( + std::chrono::system_clock::now() + std::chrono::seconds(10)); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + + auto resp_writer_ptr = &response_writer; + auto lambda_2 = [&, this, resp_writer_ptr]() { + gpr_log(GPR_ERROR, "CALLED"); + service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(), + cq_.get(), tag(2)); + }; + + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Verify(cq_.get(), time_limit, lambda_2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + auto recv_resp_ptr = &recv_response; + auto status_ptr = &recv_status; + send_response.set_message(recv_request.message()); + auto lambda_3 = [&, this, resp_writer_ptr, send_response]() { + resp_writer_ptr->Finish(send_response, Status::OK, tag(3)); + }; + response_reader->Finish(recv_resp_ptr, status_ptr, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get(), std::chrono::system_clock::time_point::max(), + lambda_3); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // Two pings and a final pong. TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ResetStub(); @@ -1890,6 +2013,9 @@ INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel, } // namespace grpc int main(int argc, char** argv) { + // Change the backup poll interval from 5s to 100ms to speed up the + // ReconnectChannel test + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "100"); grpc_test_init(argc, argv); gpr_tls_init(&g_is_async_end2end_test); ::testing::InitGoogleTest(&argc, argv); diff --git a/test/cpp/end2end/client_crash_test.cc b/test/cpp/end2end/client_crash_test.cc index 4d2304feca..f34b27511b 100644 --- a/test/cpp/end2end/client_crash_test.cc +++ b/test/cpp/end2end/client_crash_test.cc @@ -56,7 +56,8 @@ class CrashTest : public ::testing::Test { addr_stream << "localhost:" << port; auto addr = addr_stream.str(); server_.reset(new SubProcess({ - g_root + "/client_crash_test_server", "--address=" + addr, + g_root + "/client_crash_test_server", + "--address=" + addr, })); GPR_ASSERT(server_); return grpc::testing::EchoTestService::NewStub( diff --git a/test/cpp/end2end/client_crash_test_server.cc b/test/cpp/end2end/client_crash_test_server.cc index 01dcd40f9a..887504d308 100644 --- a/test/cpp/end2end/client_crash_test_server.cc +++ b/test/cpp/end2end/client_crash_test_server.cc @@ -68,8 +68,8 @@ void RunServer() { std::cout << "Server listening on " << FLAGS_address << std::endl; server->Wait(); } -} -} +} // namespace testing +} // namespace grpc int main(int argc, char** argv) { ParseCommandLineFlags(&argc, &argv, true); diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index c236f76e89..f8bb12fde1 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -33,10 +33,9 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> -extern "C" { #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" -} +#include "src/core/lib/support/env.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -86,7 +85,11 @@ class MyTestServiceImpl : public TestServiceImpl { class ClientLbEnd2endTest : public ::testing::Test { protected: ClientLbEnd2endTest() - : server_host_("localhost"), kRequestMessage_("Live long and prosper.") {} + : server_host_("localhost"), kRequestMessage_("Live long and prosper.") { + // Make the backup poller poll very frequently in order to pick up + // updates from all the subchannels's FDs. + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1"); + } void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -110,22 +113,23 @@ class ClientLbEnd2endTest : public ::testing::Test { void SetNextResolution(const std::vector<int>& ports) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_lb_addresses* addresses = grpc_lb_addresses_create(ports.size(), NULL); + grpc_lb_addresses* addresses = + grpc_lb_addresses_create(ports.size(), nullptr); for (size_t i = 0; i < ports.size(); ++i) { char* lb_uri_str; gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", ports[i]); grpc_uri* lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true); - GPR_ASSERT(lb_uri != NULL); + GPR_ASSERT(lb_uri != nullptr); grpc_lb_addresses_set_address_from_uri(addresses, i, lb_uri, false /* is balancer */, - "" /* balancer name */, NULL); + "" /* balancer name */, nullptr); grpc_uri_destroy(lb_uri); gpr_free(lb_uri_str); } const grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses); grpc_channel_args* fake_result = - grpc_channel_args_copy_and_add(NULL, &fake_addresses, 1); + grpc_channel_args_copy_and_add(nullptr, &fake_addresses, 1); grpc_fake_resolver_response_generator_set_response( &exec_ctx, response_generator_, fake_result); grpc_channel_args_destroy(&exec_ctx, fake_result); @@ -305,7 +309,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) { ports.clear(); SetNextResolution(ports); gpr_log(GPR_INFO, "****** SET none *******"); - grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + grpc_connectivity_state channel_state; do { channel_state = channel_->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); @@ -481,7 +485,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) { // An empty update will result in the channel going into TRANSIENT_FAILURE. ports.clear(); SetNextResolution(ports); - grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT; + grpc_connectivity_state channel_state; do { channel_state = channel_->GetState(true /* try to connect */); } while (channel_state == GRPC_CHANNEL_READY); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 1aa547d4e3..c71034bbe8 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -30,11 +30,13 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include "src/core/lib/security/credentials/credentials.h" +#include "src/core/lib/support/env.h" #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" @@ -704,13 +706,25 @@ TEST_P(End2endTest, ReconnectChannel) { if (GetParam().inproc) { return; } + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200"); + int poller_slowdown_factor = 1; + // It needs 2 pollset_works to reconnect the channel with polling engine + // "poll" + char* s = gpr_getenv("GRPC_POLL_STRATEGY"); + if (s != nullptr && 0 == strcmp(s, "poll")) { + poller_slowdown_factor = 2; + } + gpr_free(s); ResetStub(); SendRpc(stub_.get(), 1, false); RestartServer(std::shared_ptr<AuthMetadataProcessor>()); - // It needs more than kConnectivityCheckIntervalMsec time to reconnect the - // channel. - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(1600, GPR_TIMESPAN))); + // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to + // reconnect the channel. + gpr_sleep_until(gpr_time_add( + gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis( + 300 * poller_slowdown_factor * grpc_test_slowdown_factor(), + GPR_TIMESPAN))); SendRpc(stub_.get(), 1, false); } @@ -1134,7 +1148,7 @@ TEST_P(End2endTest, ChannelState) { CompletionQueue cq; std::chrono::system_clock::time_point deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(10); - channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, NULL); + channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr); void* tag; bool ok = true; cq.Next(&tag, &ok); diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 9450182302..40949e8f3a 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -216,9 +216,10 @@ TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) { std::unique_ptr<ByteBuffer> cli_send_buffer = SerializeToByteBuffer(&send_request); + // Use the same cq as server so that events can be polled in time. std::unique_ptr<GenericClientAsyncResponseReader> call = generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName, - *cli_send_buffer.get(), &cli_cq_); + *cli_send_buffer.get(), srv_cq_.get()); call->StartCall(); ByteBuffer cli_recv_buffer; call->Finish(&cli_recv_buffer, &recv_status, tag(1)); @@ -226,7 +227,7 @@ TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) { generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), srv_cq_.get(), tag(4)); - verify_ok(srv_cq_.get(), 4, true); + server_ok(4); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(kMethodName, srv_ctx.method()); @@ -245,7 +246,7 @@ TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) { stream.Finish(Status::OK, tag(7)); server_ok(7); - client_ok(1); + verify_ok(srv_cq_.get(), 1, true); EXPECT_TRUE(ParseFromByteBuffer(&cli_recv_buffer, &recv_response)); EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); @@ -321,8 +322,9 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { TEST_F(GenericEnd2endTest, Deadline) { ResetStub(); - SendRpc(1, true, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(10, GPR_TIMESPAN))); + SendRpc(1, true, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(10, GPR_TIMESPAN))); } } // namespace diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index f73a9c1791..c15ab88da1 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -33,10 +33,9 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> -extern "C" { #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h" #include "src/core/lib/iomgr/sockaddr.h" -} +#include "src/core/lib/support/env.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -332,8 +331,11 @@ class GrpclbEnd2endTest : public ::testing::Test { num_backends_(num_backends), num_balancers_(num_balancers), client_load_reporting_interval_seconds_( - client_load_reporting_interval_seconds), - kRequestMessage_("Live long and prosper.") {} + client_load_reporting_interval_seconds) { + // Make the backup poller poll very frequently in order to pick up + // updates from all the subchannels's FDs. + gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1"); + } void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -559,7 +561,6 @@ class GrpclbEnd2endTest : public ::testing::Test { std::unique_ptr<std::thread> thread_; }; - const grpc::string kMessage_ = "Live long and prosper."; const grpc::string server_host_; const size_t num_backends_; const size_t num_balancers_; @@ -571,7 +572,7 @@ class GrpclbEnd2endTest : public ::testing::Test { std::vector<ServerThread<BackendService>> backend_servers_; std::vector<ServerThread<BalancerService>> balancer_servers_; grpc_fake_resolver_response_generator* response_generator_; - const grpc::string kRequestMessage_; + const grpc::string kRequestMessage_ = "Live long and prosper."; }; class SingleBalancerTest : public GrpclbEnd2endTest { @@ -658,8 +659,9 @@ TEST_F(SingleBalancerTest, Fallback) { // Send non-empty serverlist only after kServerlistDelayMs. ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends( - GetBackendPorts(kNumBackendInResolution /* start_index */), {}), + 0, + BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(kNumBackendInResolution /* start_index */), {}), kServerlistDelayMs); // Wait until all the fallback backends are reachable. @@ -724,10 +726,11 @@ TEST_F(SingleBalancerTest, FallbackUpdate) { // Send non-empty serverlist only after kServerlistDelayMs. ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends( - GetBackendPorts(kNumBackendInResolution + - kNumBackendInResolutionUpdate /* start_index */), - {}), + 0, + BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(kNumBackendInResolution + + kNumBackendInResolutionUpdate /* start_index */), + {}), kServerlistDelayMs); // Wait until all the fallback backends are reachable. @@ -1068,10 +1071,11 @@ TEST_F(SingleBalancerTest, Drop) { num_of_drop_by_load_balancing_addresses; const int num_total_addresses = num_backends_ + num_of_drop_addresses; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends( - GetBackendPorts(), - {{"rate_limiting", num_of_drop_by_rate_limiting_addresses}, - {"load_balancing", num_of_drop_by_load_balancing_addresses}}), + 0, + BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(), + {{"rate_limiting", num_of_drop_by_rate_limiting_addresses}, + {"load_balancing", num_of_drop_by_load_balancing_addresses}}), 0); // Wait until all backends are ready. WaitForAllBackends(); @@ -1086,7 +1090,7 @@ TEST_F(SingleBalancerTest, Drop) { } else { EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); + EXPECT_EQ(response.message(), kRequestMessage_); } } EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops); @@ -1107,9 +1111,10 @@ TEST_F(SingleBalancerTest, DropAllFirst) { const int num_of_drop_by_rate_limiting_addresses = 1; const int num_of_drop_by_load_balancing_addresses = 1; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends( - {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses}, - {"load_balancing", num_of_drop_by_load_balancing_addresses}}), + 0, + BalancerServiceImpl::BuildResponseForBackends( + {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses}, + {"load_balancing", num_of_drop_by_load_balancing_addresses}}), 0); const Status status = SendRpc(); EXPECT_FALSE(status.ok()); @@ -1123,9 +1128,10 @@ TEST_F(SingleBalancerTest, DropAll) { const int num_of_drop_by_rate_limiting_addresses = 1; const int num_of_drop_by_load_balancing_addresses = 1; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends( - {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses}, - {"load_balancing", num_of_drop_by_load_balancing_addresses}}), + 0, + BalancerServiceImpl::BuildResponseForBackends( + {}, {{"rate_limiting", num_of_drop_by_rate_limiting_addresses}, + {"load_balancing", num_of_drop_by_load_balancing_addresses}}), 1000); // First call succeeds. @@ -1187,10 +1193,11 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { num_of_drop_by_load_balancing_addresses; const int num_total_addresses = num_backends_ + num_of_drop_addresses; ScheduleResponseForBalancer( - 0, BalancerServiceImpl::BuildResponseForBackends( - GetBackendPorts(), - {{"rate_limiting", num_of_drop_by_rate_limiting_addresses}, - {"load_balancing", num_of_drop_by_load_balancing_addresses}}), + 0, + BalancerServiceImpl::BuildResponseForBackends( + GetBackendPorts(), + {{"rate_limiting", num_of_drop_by_rate_limiting_addresses}, + {"load_balancing", num_of_drop_by_load_balancing_addresses}}), 0); // Wait until all backends are ready. int num_warmup_ok = 0; @@ -1210,7 +1217,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { } else { EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); + EXPECT_EQ(response.message(), kRequestMessage_); } } EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops); diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index 0b63c25055..61f4111e3b 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -44,19 +44,19 @@ #include <iostream> using namespace std; +using ::testing::AtLeast; +using ::testing::DoAll; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::SaveArg; +using ::testing::SetArgPointee; +using ::testing::WithArg; +using ::testing::_; using grpc::testing::EchoRequest; using grpc::testing::EchoResponse; using grpc::testing::EchoTestService; using grpc::testing::MockClientReaderWriter; using std::chrono::system_clock; -using ::testing::AtLeast; -using ::testing::SetArgPointee; -using ::testing::SaveArg; -using ::testing::_; -using ::testing::Return; -using ::testing::Invoke; -using ::testing::WithArg; -using ::testing::DoAll; namespace grpc { namespace testing { diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc index f990a7ed9d..90b2eddbbb 100644 --- a/test/cpp/end2end/thread_stress_test.cc +++ b/test/cpp/end2end/thread_stress_test.cc @@ -50,23 +50,6 @@ const int kNumRpcs = 1000; // Number of RPCs per thread namespace grpc { namespace testing { -namespace { - -// When echo_deadline is requested, deadline seen in the ServerContext is set in -// the response in seconds. -void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, - EchoResponse* response) { - if (request->has_param() && request->param().echo_deadline()) { - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME); - if (context->deadline() != system_clock::time_point::max()) { - Timepoint2Timespec(context->deadline(), &deadline); - } - response->mutable_param()->set_request_deadline(deadline.tv_sec); - } -} - -} // namespace - class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { public: TestServiceImpl() : signal_client_(false) {} @@ -74,29 +57,6 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { Status Echo(ServerContext* context, const EchoRequest* request, EchoResponse* response) override { response->set_message(request->message()); - MaybeEchoDeadline(context, request, response); - if (request->has_param() && request->param().client_cancel_after_us()) { - { - std::unique_lock<std::mutex> lock(mu_); - signal_client_ = true; - } - while (!context->IsCancelled()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().client_cancel_after_us(), - GPR_TIMESPAN))); - } - return Status::CANCELLED; - } else if (request->has_param() && - request->param().server_cancel_after_us()) { - gpr_sleep_until(gpr_time_add( - gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(request->param().server_cancel_after_us(), - GPR_TIMESPAN))); - return Status::CANCELLED; - } else { - EXPECT_FALSE(context->IsCancelled()); - } return Status::OK; } |