diff options
author | Mark D. Roth <roth@google.com> | 2016-10-31 14:20:01 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-10-31 14:20:01 -0700 |
commit | 7e0c2f8301e6f76f5e8d38d1a3f891da20147c39 (patch) | |
tree | cf21380503e17debfbbcecb803af26a5d36897b3 /test/cpp | |
parent | cb356b26eaf8cfde85166a7f6a4b3b8b6b52f839 (diff) | |
parent | ccc6a9cbf264655ae6b60727cd86b987a62977c9 (diff) |
Merge remote-tracking branch 'upstream/master' into lb_policy_name_channel_arg
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 49 | ||||
-rw-r--r-- | test/cpp/interop/client.cc | 62 | ||||
-rw-r--r-- | test/cpp/interop/interop_client.cc | 32 | ||||
-rw-r--r-- | test/cpp/interop/interop_client.h | 2 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 57 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 11 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 7 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 31 | ||||
-rw-r--r-- | test/cpp/qps/report.cc | 6 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 6 | ||||
-rw-r--r-- | test/cpp/qps/server_sync.cc | 6 | ||||
-rw-r--r-- | test/cpp/thread_manager/thread_manager_test.cc | 136 |
12 files changed, 349 insertions, 56 deletions
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 7af0fb997e..43b7d44255 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -37,6 +37,7 @@ #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> +#include <grpc++/resource_quota.h> #include <grpc++/security/auth_metadata_processor.h> #include <grpc++/security/credentials.h> #include <grpc++/security/server_credentials.h> @@ -240,6 +241,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { server_address_ << "127.0.0.1:" << port; // Setup server ServerBuilder builder; + ConfigureServerBuilder(&builder); auto server_creds = GetServerCredentials(GetParam().credentials_type); if (GetParam().credentials_type != kInsecureCredentialsType) { server_creds->SetAuthMetadataProcessor(processor); @@ -247,13 +249,21 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { builder.AddListeningPort(server_address_.str(), server_creds); builder.RegisterService(&service_); builder.RegisterService("foo.test.youtube.com", &special_service_); - builder.SetMaxMessageSize( - kMaxMessageSize_); // For testing max message size. builder.RegisterService(&dup_pkg_service_); + + builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4); + builder.SetSyncServerOption( + ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10); + server_ = builder.BuildAndStart(); is_server_started_ = true; } + virtual void ConfigureServerBuilder(ServerBuilder* builder) { + builder->SetMaxMessageSize( + kMaxMessageSize_); // For testing max message size. + } + void ResetChannel() { if (!is_server_started_) { StartServer(std::shared_ptr<AuthMetadataProcessor>()); @@ -279,6 +289,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { ServerBuilder builder; builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials()); builder.RegisterService(proxy_service_.get()); + + builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4); + builder.SetSyncServerOption( + ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10); + proxy_server_ = builder.BuildAndStart(); channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials()); @@ -1476,6 +1491,32 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) { } } +class ResourceQuotaEnd2endTest : public End2endTest { + public: + ResourceQuotaEnd2endTest() + : server_resource_quota_("server_resource_quota") {} + + virtual void ConfigureServerBuilder(ServerBuilder* builder) GRPC_OVERRIDE { + builder->SetResourceQuota(server_resource_quota_); + } + + private: + ResourceQuota server_resource_quota_; +}; + +TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) { + ResetStub(); + + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + std::vector<TestScenario> CreateTestScenarios(bool use_proxy, bool test_insecure, bool test_secure) { @@ -1513,6 +1554,10 @@ INSTANTIATE_TEST_CASE_P(SecureEnd2end, SecureEnd2endTest, ::testing::ValuesIn(CreateTestScenarios(false, false, true))); +INSTANTIATE_TEST_CASE_P(ResourceQuotaEnd2end, ResourceQuotaEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, + true))); + } // namespace } // namespace testing } // namespace grpc diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 4197ba8bab..c58910abc3 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -54,33 +54,35 @@ DEFINE_int32(server_port, 0, "Server port."); DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); DEFINE_string(server_host_override, "foo.test.google.fr", "Override the server host which is sent in HTTP header"); -DEFINE_string(test_case, "large_unary", - "Configure different test cases. Valid options are:\n\n" - "all : all test cases;\n" - "cancel_after_begin : cancel stream after starting it;\n" - "cancel_after_first_response: cancel on first response;\n" - "client_compressed_streaming : compressed request streaming with " - "client_compressed_unary : single compressed request;\n" - "client_streaming : request streaming with single response;\n" - "compute_engine_creds: large_unary with compute engine auth;\n" - "custom_metadata: server will echo custom metadata;\n" - "empty_stream : bi-di stream with no request/response;\n" - "empty_unary : empty (zero bytes) request and response;\n" - "half_duplex : half-duplex streaming;\n" - "jwt_token_creds: large_unary with JWT token auth;\n" - "large_unary : single request and (large) response;\n" - "oauth2_auth_token: raw oauth2 access token auth;\n" - "per_rpc_creds: raw oauth2 access token on a single rpc;\n" - "ping_pong : full-duplex streaming;\n" - "response streaming;\n" - "server_compressed_streaming : single request with compressed " - "server_compressed_unary : single compressed response;\n" - "server_streaming : single request with response streaming;\n" - "slow_consumer : single request with response streaming with " - "slow client consumer;\n" - "status_code_and_message: verify status code & message;\n" - "timeout_on_sleeping_server: deadline exceeds on stream;\n" - "unimplemented_method: client calls an unimplemented method;\n"); +DEFINE_string( + test_case, "large_unary", + "Configure different test cases. Valid options are:\n\n" + "all : all test cases;\n" + "cancel_after_begin : cancel stream after starting it;\n" + "cancel_after_first_response: cancel on first response;\n" + "client_compressed_streaming : compressed request streaming with " + "client_compressed_unary : single compressed request;\n" + "client_streaming : request streaming with single response;\n" + "compute_engine_creds: large_unary with compute engine auth;\n" + "custom_metadata: server will echo custom metadata;\n" + "empty_stream : bi-di stream with no request/response;\n" + "empty_unary : empty (zero bytes) request and response;\n" + "half_duplex : half-duplex streaming;\n" + "jwt_token_creds: large_unary with JWT token auth;\n" + "large_unary : single request and (large) response;\n" + "oauth2_auth_token: raw oauth2 access token auth;\n" + "per_rpc_creds: raw oauth2 access token on a single rpc;\n" + "ping_pong : full-duplex streaming;\n" + "response streaming;\n" + "server_compressed_streaming : single request with compressed " + "server_compressed_unary : single compressed response;\n" + "server_streaming : single request with response streaming;\n" + "slow_consumer : single request with response streaming with " + "slow client consumer;\n" + "status_code_and_message: verify status code & message;\n" + "timeout_on_sleeping_server: deadline exceeds on stream;\n" + "unimplemented_method: client calls an unimplemented method;\n" + "unimplemented_service: client calls an unimplemented service;\n"); DEFINE_string(default_service_account, "", "Email of GCE default service account"); DEFINE_string(service_account_key_file, "", @@ -152,6 +154,8 @@ int main(int argc, char** argv) { client.DoCustomMetadata(); } else if (FLAGS_test_case == "unimplemented_method") { client.DoUnimplementedMethod(); + } else if (FLAGS_test_case == "unimplemented_service") { + client.DoUnimplementedService(); } else if (FLAGS_test_case == "cacheable_unary") { client.DoCacheableUnary(); } else if (FLAGS_test_case == "all") { @@ -172,6 +176,7 @@ int main(int argc, char** argv) { client.DoStatusWithMessage(); client.DoCustomMetadata(); client.DoUnimplementedMethod(); + client.DoUnimplementedService(); client.DoCacheableUnary(); // service_account_creds and jwt_token_creds can only run with ssl. if (FLAGS_use_tls) { @@ -207,7 +212,8 @@ int main(int argc, char** argv) { "server_streaming", "status_code_and_message", "timeout_on_sleeping_server", - "unimplemented_method"}; + "unimplemented_method", + "unimplemented_service"}; char* joined_testcases = gpr_strjoin_sep(testcases, GPR_ARRAY_SIZE(testcases), "\n", NULL); diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 1668589cc4..d1242627ef 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -107,6 +107,11 @@ TestService::Stub* InteropClient::ServiceStub::Get() { return stub_.get(); } +UnimplementedService::Stub* +InteropClient::ServiceStub::GetUnimplementedServiceStub() { + return UnimplementedService::NewStub(channel_).get(); +} + void InteropClient::ServiceStub::Reset(std::shared_ptr<Channel> channel) { channel_ = channel; @@ -162,8 +167,8 @@ bool InteropClient::AssertStatusCode(const Status& s, bool InteropClient::DoEmpty() { gpr_log(GPR_DEBUG, "Sending an empty rpc..."); - Empty request = Empty::default_instance(); - Empty response = Empty::default_instance(); + Empty request; + Empty response; ClientContext context; Status s = serviceStub_.Get()->EmptyCall(&context, request, &response); @@ -1002,11 +1007,30 @@ bool InteropClient::DoCustomMetadata() { return true; } +bool InteropClient::DoUnimplementedService() { + gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service..."); + + Empty request; + Empty response; + ClientContext context; + + UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub(); + + Status s = stub->UnimplementedCall(&context, request, &response); + + if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED)) { + return false; + } + + gpr_log(GPR_DEBUG, "unimplemented service done."); + return true; +} + bool InteropClient::DoUnimplementedMethod() { gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc..."); - Empty request = Empty::default_instance(); - Empty response = Empty::default_instance(); + Empty request; + Empty response; ClientContext context; Status s = diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index 0a96e7734d..7ec7ebee20 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -80,6 +80,7 @@ class InteropClient { bool DoStatusWithMessage(); bool DoCustomMetadata(); bool DoUnimplementedMethod(); + bool DoUnimplementedService(); bool DoCacheableUnary(); // Auth tests. // username is a string containing the user email @@ -100,6 +101,7 @@ class InteropClient { ServiceStub(std::shared_ptr<Channel> channel, bool new_stub_every_call); TestService::Stub* Get(); + UnimplementedService::Stub* GetUnimplementedServiceStub(); void Reset(std::shared_ptr<Channel> channel); diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 5fb87b2782..9983c8a7b0 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -36,6 +36,7 @@ #include <condition_variable> #include <mutex> +#include <unordered_map> #include <vector> #include <grpc++/channel.h> @@ -114,19 +115,37 @@ class ClientRequestCreator<ByteBuffer> { class HistogramEntry GRPC_FINAL { public: - HistogramEntry() : used_(false) {} - bool used() const { return used_; } + HistogramEntry() : value_used_(false), status_used_(false) {} + bool value_used() const { return value_used_; } double value() const { return value_; } void set_value(double v) { - used_ = true; + value_used_ = true; value_ = v; } + bool status_used() const { return status_used_; } + int status() const { return status_; } + void set_status(int status) { + status_used_ = true; + status_ = status; + } private: - bool used_; + bool value_used_; double value_; + bool status_used_; + int status_; }; +typedef std::unordered_map<int, int64_t> StatusHistogram; + +inline void MergeStatusHistogram(const StatusHistogram& from, + StatusHistogram* to) { + for (StatusHistogram::const_iterator it = from.begin(); it != from.end(); + ++it) { + (*to)[it->first] += it->second; + } +} + class Client { public: Client() @@ -139,6 +158,7 @@ class Client { ClientStats Mark(bool reset) { Histogram latencies; + StatusHistogram statuses; UsageTimer::Result timer_result; MaybeStartRequests(); @@ -146,27 +166,36 @@ class Client { // avoid std::vector for old compilers that expect a copy constructor if (reset) { Histogram* to_merge = new Histogram[threads_.size()]; + StatusHistogram* to_merge_status = new StatusHistogram[threads_.size()]; + for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->BeginSwap(&to_merge[i]); + threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]); } std::unique_ptr<UsageTimer> timer(new UsageTimer); timer_.swap(timer); for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->EndSwap(); latencies.Merge(to_merge[i]); + MergeStatusHistogram(to_merge_status[i], &statuses); } delete[] to_merge; + delete[] to_merge_status; timer_result = timer->Mark(); } else { // merge snapshots of each thread histogram for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->MergeStatsInto(&latencies); + threads_[i]->MergeStatsInto(&latencies, &statuses); } timer_result = timer_->Mark(); } ClientStats stats; latencies.FillProto(stats.mutable_latencies()); + for (StatusHistogram::const_iterator it = statuses.begin(); + it != statuses.end(); ++it) { + RequestResultCount* rrc = stats.add_request_results(); + rrc->set_status_code(it->first); + rrc->set_count(it->second); + } stats.set_time_elapsed(timer_result.wall); stats.set_time_system(timer_result.system); stats.set_time_user(timer_result.user); @@ -258,16 +287,16 @@ class Client { ~Thread() { impl_.join(); } - void BeginSwap(Histogram* n) { + void BeginSwap(Histogram* n, StatusHistogram* s) { std::lock_guard<std::mutex> g(mu_); n->Swap(&histogram_); + s->swap(statuses_); } - void EndSwap() {} - - void MergeStatsInto(Histogram* hist) { + void MergeStatsInto(Histogram* hist, StatusHistogram* s) { std::unique_lock<std::mutex> g(mu_); hist->Merge(histogram_); + MergeStatusHistogram(statuses_, s); } private: @@ -288,9 +317,12 @@ class Client { const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); // lock, update histogram if needed and see if we're done std::lock_guard<std::mutex> g(mu_); - if (entry.used()) { + if (entry.value_used()) { histogram_.Add(entry.value()); } + if (entry.status_used()) { + statuses_[entry.status()]++; + } if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); } @@ -304,6 +336,7 @@ class Client { std::mutex mu_; Histogram histogram_; + StatusHistogram statuses_; Client* client_; const size_t idx_; std::thread impl_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 081114859c..4d36a6ba42 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -83,7 +83,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> start_req, - std::function<void(grpc::Status, ResponseType*)> on_done) + std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done) : context_(), stub_(stub), cq_(nullptr), @@ -113,7 +113,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { return true; case State::RESP_DONE: entry->set_value((UsageTimer::Now() - start_) * 1e9); - callback_(status_, &response_); + callback_(status_, &response_, entry); next_state_ = State::INVALID; return false; default: @@ -135,7 +135,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { ResponseType response_; enum State { INVALID, READY, RESP_DONE }; State next_state_; - std::function<void(grpc::Status, ResponseType*)> callback_; + std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_; std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, @@ -290,7 +290,10 @@ class AsyncUnaryClient GRPC_FINAL ~AsyncUnaryClient() GRPC_OVERRIDE {} private: - static void CheckDone(grpc::Status s, SimpleResponse* response) {} + static void CheckDone(grpc::Status s, SimpleResponse* response, + HistogramEntry* entry) { + entry->set_status(s.error_code()); + } static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>> StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, const SimpleRequest& request, CompletionQueue* cq) { diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 0ccf4e270b..f61e80d76b 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -130,11 +130,8 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); entry->set_value((UsageTimer::Now() - start) * 1e9); - if (!s.ok()) { - gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), - s.error_message().c_str()); - } - return s.ok(); + entry->set_status(s.error_code()); + return true; } }; diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 6c675e3483..a440341ccf 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -132,7 +132,8 @@ static void postprocess_scenario_result(ScenarioResult* result) { Histogram histogram; histogram.MergeProto(result->latencies()); - auto qps = histogram.Count() / average(result->client_stats(), WallTime); + auto time_estimate = average(result->client_stats(), WallTime); + auto qps = histogram.Count() / time_estimate; auto qps_per_server_core = qps / sum(result->server_cores(), Cores); result->mutable_summary()->set_qps(qps); @@ -157,6 +158,23 @@ static void postprocess_scenario_result(ScenarioResult* result) { result->mutable_summary()->set_server_user_time(server_user_time); result->mutable_summary()->set_client_system_time(client_system_time); result->mutable_summary()->set_client_user_time(client_user_time); + + if (result->request_results_size() > 0) { + int64_t successes = 0; + int64_t failures = 0; + for (int i = 0; i < result->request_results_size(); i++) { + RequestResultCount rrc = result->request_results(i); + if (rrc.status_code() == 0) { + successes += rrc.count(); + } else { + failures += rrc.count(); + } + } + result->mutable_summary()->set_successful_requests_per_second( + successes / time_estimate); + result->mutable_summary()->set_failed_requests_per_second(failures / + time_estimate); + } } // Namespace for classes and functions used only in RunScenario @@ -444,6 +462,7 @@ std::unique_ptr<ScenarioResult> RunScenario( // Finish a run std::unique_ptr<ScenarioResult> result(new ScenarioResult); Histogram merged_latencies; + std::unordered_map<int, int64_t> merged_statuses; gpr_log(GPR_INFO, "Finishing clients"); for (size_t i = 0; i < num_clients; i++) { @@ -462,6 +481,10 @@ std::unique_ptr<ScenarioResult> RunScenario( gpr_log(GPR_INFO, "Received final status from client %zu", i); const auto& stats = client_status.stats(); merged_latencies.MergeProto(stats.latencies()); + for (int i = 0; i < stats.request_results_size(); i++) { + merged_statuses[stats.request_results(i).status_code()] += + stats.request_results(i).count(); + } result->add_client_stats()->CopyFrom(stats); // That final status should be the last message on the client stream GPR_ASSERT(!client->stream->Read(&client_status)); @@ -481,6 +504,12 @@ std::unique_ptr<ScenarioResult> RunScenario( delete[] clients; merged_latencies.FillProto(result->mutable_latencies()); + for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin(); + it != merged_statuses.end(); ++it) { + RequestResultCount* rrc = result->add_request_results(); + rrc->set_status_code(it->first); + rrc->set_count(it->second); + } gpr_log(GPR_INFO, "Finishing servers"); for (size_t i = 0; i < num_servers; i++) { diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index 2ec7d8676c..41617e968a 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -73,6 +73,12 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) { void GprLogReporter::ReportQPS(const ScenarioResult& result) { gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps()); + if (result.summary().failed_requests_per_second() > 0) { + gpr_log(GPR_INFO, "failed requests/second: %.1f", + result.summary().failed_requests_per_second()); + gpr_log(GPR_INFO, "successful requests/second: %.1f", + result.summary().successful_requests_per_second()); + } } void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) { diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 2fcc64819b..bc4c896d83 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -38,6 +38,7 @@ #include <thread> #include <grpc++/generic/async_generic_service.h> +#include <grpc++/resource_quota.h> #include <grpc++/security/server_credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> @@ -95,6 +96,11 @@ class AsyncQpsServerTest GRPC_FINAL : public grpc::testing::Server { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } + if (config.resource_quota_size() > 0) { + builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest") + .Resize(config.resource_quota_size())); + } + server_ = builder.BuildAndStart(); using namespace std::placeholders; diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index 0caed0ab49..07f48e2644 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -31,6 +31,7 @@ * */ +#include <grpc++/resource_quota.h> #include <grpc++/security/server_credentials.h> #include <grpc++/server.h> #include <grpc++/server_builder.h> @@ -91,6 +92,11 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server { Server::CreateServerCredentials(config)); gpr_free(server_address); + if (config.resource_quota_size() > 0) { + builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest") + .Resize(config.resource_quota_size())); + } + builder.RegisterService(&service_); impl_ = builder.BuildAndStart(); diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc new file mode 100644 index 0000000000..5c70103947 --- /dev/null +++ b/test/cpp/thread_manager/thread_manager_test.cc @@ -0,0 +1,136 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + *is % allowed in string + */ + +#include <memory> +#include <string> + +#include <gflags/gflags.h> +#include <grpc++/grpc++.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +#include "src/cpp/thread_manager/thread_manager.h" +#include "test/cpp/util/test_config.h" + +namespace grpc { +class ThreadManagerTest GRPC_FINAL : public grpc::ThreadManager { + public: + ThreadManagerTest() + : ThreadManager(kMinPollers, kMaxPollers), + num_do_work_(0), + num_poll_for_work_(0), + num_work_found_(0) {} + + grpc::ThreadManager::WorkStatus PollForWork(void **tag, + bool *ok) GRPC_OVERRIDE; + void DoWork(void *tag, bool ok) GRPC_OVERRIDE; + void PerformTest(); + + private: + void SleepForMs(int sleep_time_ms); + + static const int kMinPollers = 2; + static const int kMaxPollers = 10; + + static const int kPollingTimeoutMsec = 10; + static const int kDoWorkDurationMsec = 1; + + // PollForWork will return SHUTDOWN after these many number of invocations + static const int kMaxNumPollForWork = 50; + + gpr_atm num_do_work_; // Number of calls to DoWork + gpr_atm num_poll_for_work_; // Number of calls to PollForWork + gpr_atm num_work_found_; // Number of times WORK_FOUND was returned +}; + +void ThreadManagerTest::SleepForMs(int duration_ms) { + gpr_timespec sleep_time = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(duration_ms, GPR_TIMESPAN)); + gpr_sleep_until(sleep_time); +} + +grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void **tag, + bool *ok) { + int call_num = gpr_atm_no_barrier_fetch_add(&num_poll_for_work_, 1); + + if (call_num >= kMaxNumPollForWork) { + Shutdown(); + return SHUTDOWN; + } + + // Simulate "polling for work" by sleeping for sometime + SleepForMs(kPollingTimeoutMsec); + + *tag = nullptr; + *ok = true; + + // Return timeout roughly 1 out of every 3 calls + if (call_num % 3 == 0) { + return TIMEOUT; + } else { + gpr_atm_no_barrier_fetch_add(&num_work_found_, 1); + return WORK_FOUND; + } +} + +void ThreadManagerTest::DoWork(void *tag, bool ok) { + gpr_atm_no_barrier_fetch_add(&num_do_work_, 1); + SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping +} + +void ThreadManagerTest::PerformTest() { + // Initialize() starts the ThreadManager + Initialize(); + + // Wait for all the threads to gracefully terminate + Wait(); + + // The number of times DoWork() was called is equal to the number of times + // WORK_FOUND was returned + gpr_log(GPR_DEBUG, "DoWork() called %ld times", + gpr_atm_no_barrier_load(&num_do_work_)); + GPR_ASSERT(gpr_atm_no_barrier_load(&num_do_work_) == + gpr_atm_no_barrier_load(&num_work_found_)); +} +} // namespace grpc + +int main(int argc, char **argv) { + std::srand(std::time(NULL)); + + grpc::testing::InitTest(&argc, &argv, true); + grpc::ThreadManagerTest test_rpc_manager; + test_rpc_manager.PerformTest(); + + return 0; +} |