aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-10-31 14:20:01 -0700
committerGravatar Mark D. Roth <roth@google.com>2016-10-31 14:20:01 -0700
commit7e0c2f8301e6f76f5e8d38d1a3f891da20147c39 (patch)
treecf21380503e17debfbbcecb803af26a5d36897b3 /test/cpp
parentcb356b26eaf8cfde85166a7f6a4b3b8b6b52f839 (diff)
parentccc6a9cbf264655ae6b60727cd86b987a62977c9 (diff)
Merge remote-tracking branch 'upstream/master' into lb_policy_name_channel_arg
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/end2end/end2end_test.cc49
-rw-r--r--test/cpp/interop/client.cc62
-rw-r--r--test/cpp/interop/interop_client.cc32
-rw-r--r--test/cpp/interop/interop_client.h2
-rw-r--r--test/cpp/qps/client.h57
-rw-r--r--test/cpp/qps/client_async.cc11
-rw-r--r--test/cpp/qps/client_sync.cc7
-rw-r--r--test/cpp/qps/driver.cc31
-rw-r--r--test/cpp/qps/report.cc6
-rw-r--r--test/cpp/qps/server_async.cc6
-rw-r--r--test/cpp/qps/server_sync.cc6
-rw-r--r--test/cpp/thread_manager/thread_manager_test.cc136
12 files changed, 349 insertions, 56 deletions
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 7af0fb997e..43b7d44255 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -37,6 +37,7 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
+#include <grpc++/resource_quota.h>
#include <grpc++/security/auth_metadata_processor.h>
#include <grpc++/security/credentials.h>
#include <grpc++/security/server_credentials.h>
@@ -240,6 +241,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
server_address_ << "127.0.0.1:" << port;
// Setup server
ServerBuilder builder;
+ ConfigureServerBuilder(&builder);
auto server_creds = GetServerCredentials(GetParam().credentials_type);
if (GetParam().credentials_type != kInsecureCredentialsType) {
server_creds->SetAuthMetadataProcessor(processor);
@@ -247,13 +249,21 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
builder.RegisterService("foo.test.youtube.com", &special_service_);
- builder.SetMaxMessageSize(
- kMaxMessageSize_); // For testing max message size.
builder.RegisterService(&dup_pkg_service_);
+
+ builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
+ builder.SetSyncServerOption(
+ ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
+
server_ = builder.BuildAndStart();
is_server_started_ = true;
}
+ virtual void ConfigureServerBuilder(ServerBuilder* builder) {
+ builder->SetMaxMessageSize(
+ kMaxMessageSize_); // For testing max message size.
+ }
+
void ResetChannel() {
if (!is_server_started_) {
StartServer(std::shared_ptr<AuthMetadataProcessor>());
@@ -279,6 +289,11 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
ServerBuilder builder;
builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
builder.RegisterService(proxy_service_.get());
+
+ builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
+ builder.SetSyncServerOption(
+ ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
+
proxy_server_ = builder.BuildAndStart();
channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
@@ -1476,6 +1491,32 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) {
}
}
+class ResourceQuotaEnd2endTest : public End2endTest {
+ public:
+ ResourceQuotaEnd2endTest()
+ : server_resource_quota_("server_resource_quota") {}
+
+ virtual void ConfigureServerBuilder(ServerBuilder* builder) GRPC_OVERRIDE {
+ builder->SetResourceQuota(server_resource_quota_);
+ }
+
+ private:
+ ResourceQuota server_resource_quota_;
+};
+
+TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
+ ResetStub();
+
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("Hello");
+
+ ClientContext context;
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok());
+}
+
std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
bool test_insecure,
bool test_secure) {
@@ -1513,6 +1554,10 @@ INSTANTIATE_TEST_CASE_P(SecureEnd2end, SecureEnd2endTest,
::testing::ValuesIn(CreateTestScenarios(false, false,
true)));
+INSTANTIATE_TEST_CASE_P(ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
+ ::testing::ValuesIn(CreateTestScenarios(false, true,
+ true)));
+
} // namespace
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index 4197ba8bab..c58910abc3 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -54,33 +54,35 @@ DEFINE_int32(server_port, 0, "Server port.");
DEFINE_string(server_host, "127.0.0.1", "Server host to connect to");
DEFINE_string(server_host_override, "foo.test.google.fr",
"Override the server host which is sent in HTTP header");
-DEFINE_string(test_case, "large_unary",
- "Configure different test cases. Valid options are:\n\n"
- "all : all test cases;\n"
- "cancel_after_begin : cancel stream after starting it;\n"
- "cancel_after_first_response: cancel on first response;\n"
- "client_compressed_streaming : compressed request streaming with "
- "client_compressed_unary : single compressed request;\n"
- "client_streaming : request streaming with single response;\n"
- "compute_engine_creds: large_unary with compute engine auth;\n"
- "custom_metadata: server will echo custom metadata;\n"
- "empty_stream : bi-di stream with no request/response;\n"
- "empty_unary : empty (zero bytes) request and response;\n"
- "half_duplex : half-duplex streaming;\n"
- "jwt_token_creds: large_unary with JWT token auth;\n"
- "large_unary : single request and (large) response;\n"
- "oauth2_auth_token: raw oauth2 access token auth;\n"
- "per_rpc_creds: raw oauth2 access token on a single rpc;\n"
- "ping_pong : full-duplex streaming;\n"
- "response streaming;\n"
- "server_compressed_streaming : single request with compressed "
- "server_compressed_unary : single compressed response;\n"
- "server_streaming : single request with response streaming;\n"
- "slow_consumer : single request with response streaming with "
- "slow client consumer;\n"
- "status_code_and_message: verify status code & message;\n"
- "timeout_on_sleeping_server: deadline exceeds on stream;\n"
- "unimplemented_method: client calls an unimplemented method;\n");
+DEFINE_string(
+ test_case, "large_unary",
+ "Configure different test cases. Valid options are:\n\n"
+ "all : all test cases;\n"
+ "cancel_after_begin : cancel stream after starting it;\n"
+ "cancel_after_first_response: cancel on first response;\n"
+ "client_compressed_streaming : compressed request streaming with "
+ "client_compressed_unary : single compressed request;\n"
+ "client_streaming : request streaming with single response;\n"
+ "compute_engine_creds: large_unary with compute engine auth;\n"
+ "custom_metadata: server will echo custom metadata;\n"
+ "empty_stream : bi-di stream with no request/response;\n"
+ "empty_unary : empty (zero bytes) request and response;\n"
+ "half_duplex : half-duplex streaming;\n"
+ "jwt_token_creds: large_unary with JWT token auth;\n"
+ "large_unary : single request and (large) response;\n"
+ "oauth2_auth_token: raw oauth2 access token auth;\n"
+ "per_rpc_creds: raw oauth2 access token on a single rpc;\n"
+ "ping_pong : full-duplex streaming;\n"
+ "response streaming;\n"
+ "server_compressed_streaming : single request with compressed "
+ "server_compressed_unary : single compressed response;\n"
+ "server_streaming : single request with response streaming;\n"
+ "slow_consumer : single request with response streaming with "
+ "slow client consumer;\n"
+ "status_code_and_message: verify status code & message;\n"
+ "timeout_on_sleeping_server: deadline exceeds on stream;\n"
+ "unimplemented_method: client calls an unimplemented method;\n"
+ "unimplemented_service: client calls an unimplemented service;\n");
DEFINE_string(default_service_account, "",
"Email of GCE default service account");
DEFINE_string(service_account_key_file, "",
@@ -152,6 +154,8 @@ int main(int argc, char** argv) {
client.DoCustomMetadata();
} else if (FLAGS_test_case == "unimplemented_method") {
client.DoUnimplementedMethod();
+ } else if (FLAGS_test_case == "unimplemented_service") {
+ client.DoUnimplementedService();
} else if (FLAGS_test_case == "cacheable_unary") {
client.DoCacheableUnary();
} else if (FLAGS_test_case == "all") {
@@ -172,6 +176,7 @@ int main(int argc, char** argv) {
client.DoStatusWithMessage();
client.DoCustomMetadata();
client.DoUnimplementedMethod();
+ client.DoUnimplementedService();
client.DoCacheableUnary();
// service_account_creds and jwt_token_creds can only run with ssl.
if (FLAGS_use_tls) {
@@ -207,7 +212,8 @@ int main(int argc, char** argv) {
"server_streaming",
"status_code_and_message",
"timeout_on_sleeping_server",
- "unimplemented_method"};
+ "unimplemented_method",
+ "unimplemented_service"};
char* joined_testcases =
gpr_strjoin_sep(testcases, GPR_ARRAY_SIZE(testcases), "\n", NULL);
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index 1668589cc4..d1242627ef 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -107,6 +107,11 @@ TestService::Stub* InteropClient::ServiceStub::Get() {
return stub_.get();
}
+UnimplementedService::Stub*
+InteropClient::ServiceStub::GetUnimplementedServiceStub() {
+ return UnimplementedService::NewStub(channel_).get();
+}
+
void InteropClient::ServiceStub::Reset(std::shared_ptr<Channel> channel) {
channel_ = channel;
@@ -162,8 +167,8 @@ bool InteropClient::AssertStatusCode(const Status& s,
bool InteropClient::DoEmpty() {
gpr_log(GPR_DEBUG, "Sending an empty rpc...");
- Empty request = Empty::default_instance();
- Empty response = Empty::default_instance();
+ Empty request;
+ Empty response;
ClientContext context;
Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
@@ -1002,11 +1007,30 @@ bool InteropClient::DoCustomMetadata() {
return true;
}
+bool InteropClient::DoUnimplementedService() {
+ gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
+
+ Empty request;
+ Empty response;
+ ClientContext context;
+
+ UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub();
+
+ Status s = stub->UnimplementedCall(&context, request, &response);
+
+ if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED)) {
+ return false;
+ }
+
+ gpr_log(GPR_DEBUG, "unimplemented service done.");
+ return true;
+}
+
bool InteropClient::DoUnimplementedMethod() {
gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc...");
- Empty request = Empty::default_instance();
- Empty response = Empty::default_instance();
+ Empty request;
+ Empty response;
ClientContext context;
Status s =
diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h
index 0a96e7734d..7ec7ebee20 100644
--- a/test/cpp/interop/interop_client.h
+++ b/test/cpp/interop/interop_client.h
@@ -80,6 +80,7 @@ class InteropClient {
bool DoStatusWithMessage();
bool DoCustomMetadata();
bool DoUnimplementedMethod();
+ bool DoUnimplementedService();
bool DoCacheableUnary();
// Auth tests.
// username is a string containing the user email
@@ -100,6 +101,7 @@ class InteropClient {
ServiceStub(std::shared_ptr<Channel> channel, bool new_stub_every_call);
TestService::Stub* Get();
+ UnimplementedService::Stub* GetUnimplementedServiceStub();
void Reset(std::shared_ptr<Channel> channel);
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 5fb87b2782..9983c8a7b0 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -36,6 +36,7 @@
#include <condition_variable>
#include <mutex>
+#include <unordered_map>
#include <vector>
#include <grpc++/channel.h>
@@ -114,19 +115,37 @@ class ClientRequestCreator<ByteBuffer> {
class HistogramEntry GRPC_FINAL {
public:
- HistogramEntry() : used_(false) {}
- bool used() const { return used_; }
+ HistogramEntry() : value_used_(false), status_used_(false) {}
+ bool value_used() const { return value_used_; }
double value() const { return value_; }
void set_value(double v) {
- used_ = true;
+ value_used_ = true;
value_ = v;
}
+ bool status_used() const { return status_used_; }
+ int status() const { return status_; }
+ void set_status(int status) {
+ status_used_ = true;
+ status_ = status;
+ }
private:
- bool used_;
+ bool value_used_;
double value_;
+ bool status_used_;
+ int status_;
};
+typedef std::unordered_map<int, int64_t> StatusHistogram;
+
+inline void MergeStatusHistogram(const StatusHistogram& from,
+ StatusHistogram* to) {
+ for (StatusHistogram::const_iterator it = from.begin(); it != from.end();
+ ++it) {
+ (*to)[it->first] += it->second;
+ }
+}
+
class Client {
public:
Client()
@@ -139,6 +158,7 @@ class Client {
ClientStats Mark(bool reset) {
Histogram latencies;
+ StatusHistogram statuses;
UsageTimer::Result timer_result;
MaybeStartRequests();
@@ -146,27 +166,36 @@ class Client {
// avoid std::vector for old compilers that expect a copy constructor
if (reset) {
Histogram* to_merge = new Histogram[threads_.size()];
+ StatusHistogram* to_merge_status = new StatusHistogram[threads_.size()];
+
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->BeginSwap(&to_merge[i]);
+ threads_[i]->BeginSwap(&to_merge[i], &to_merge_status[i]);
}
std::unique_ptr<UsageTimer> timer(new UsageTimer);
timer_.swap(timer);
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->EndSwap();
latencies.Merge(to_merge[i]);
+ MergeStatusHistogram(to_merge_status[i], &statuses);
}
delete[] to_merge;
+ delete[] to_merge_status;
timer_result = timer->Mark();
} else {
// merge snapshots of each thread histogram
for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->MergeStatsInto(&latencies);
+ threads_[i]->MergeStatsInto(&latencies, &statuses);
}
timer_result = timer_->Mark();
}
ClientStats stats;
latencies.FillProto(stats.mutable_latencies());
+ for (StatusHistogram::const_iterator it = statuses.begin();
+ it != statuses.end(); ++it) {
+ RequestResultCount* rrc = stats.add_request_results();
+ rrc->set_status_code(it->first);
+ rrc->set_count(it->second);
+ }
stats.set_time_elapsed(timer_result.wall);
stats.set_time_system(timer_result.system);
stats.set_time_user(timer_result.user);
@@ -258,16 +287,16 @@ class Client {
~Thread() { impl_.join(); }
- void BeginSwap(Histogram* n) {
+ void BeginSwap(Histogram* n, StatusHistogram* s) {
std::lock_guard<std::mutex> g(mu_);
n->Swap(&histogram_);
+ s->swap(statuses_);
}
- void EndSwap() {}
-
- void MergeStatsInto(Histogram* hist) {
+ void MergeStatsInto(Histogram* hist, StatusHistogram* s) {
std::unique_lock<std::mutex> g(mu_);
hist->Merge(histogram_);
+ MergeStatusHistogram(statuses_, s);
}
private:
@@ -288,9 +317,12 @@ class Client {
const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
// lock, update histogram if needed and see if we're done
std::lock_guard<std::mutex> g(mu_);
- if (entry.used()) {
+ if (entry.value_used()) {
histogram_.Add(entry.value());
}
+ if (entry.status_used()) {
+ statuses_[entry.status()]++;
+ }
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
}
@@ -304,6 +336,7 @@ class Client {
std::mutex mu_;
Histogram histogram_;
+ StatusHistogram statuses_;
Client* client_;
const size_t idx_;
std::thread impl_;
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 081114859c..4d36a6ba42 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -83,7 +83,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)>
start_req,
- std::function<void(grpc::Status, ResponseType*)> on_done)
+ std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)
: context_(),
stub_(stub),
cq_(nullptr),
@@ -113,7 +113,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
return true;
case State::RESP_DONE:
entry->set_value((UsageTimer::Now() - start_) * 1e9);
- callback_(status_, &response_);
+ callback_(status_, &response_, entry);
next_state_ = State::INVALID;
return false;
default:
@@ -135,7 +135,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
ResponseType response_;
enum State { INVALID, READY, RESP_DONE };
State next_state_;
- std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_;
std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
@@ -290,7 +290,10 @@ class AsyncUnaryClient GRPC_FINAL
~AsyncUnaryClient() GRPC_OVERRIDE {}
private:
- static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+ static void CheckDone(grpc::Status s, SimpleResponse* response,
+ HistogramEntry* entry) {
+ entry->set_status(s.error_code());
+ }
static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, CompletionQueue* cq) {
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 0ccf4e270b..f61e80d76b 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -130,11 +130,8 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
grpc::Status s =
stub->UnaryCall(&context, request_, &responses_[thread_idx]);
entry->set_value((UsageTimer::Now() - start) * 1e9);
- if (!s.ok()) {
- gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
- s.error_message().c_str());
- }
- return s.ok();
+ entry->set_status(s.error_code());
+ return true;
}
};
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 6c675e3483..a440341ccf 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -132,7 +132,8 @@ static void postprocess_scenario_result(ScenarioResult* result) {
Histogram histogram;
histogram.MergeProto(result->latencies());
- auto qps = histogram.Count() / average(result->client_stats(), WallTime);
+ auto time_estimate = average(result->client_stats(), WallTime);
+ auto qps = histogram.Count() / time_estimate;
auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
result->mutable_summary()->set_qps(qps);
@@ -157,6 +158,23 @@ static void postprocess_scenario_result(ScenarioResult* result) {
result->mutable_summary()->set_server_user_time(server_user_time);
result->mutable_summary()->set_client_system_time(client_system_time);
result->mutable_summary()->set_client_user_time(client_user_time);
+
+ if (result->request_results_size() > 0) {
+ int64_t successes = 0;
+ int64_t failures = 0;
+ for (int i = 0; i < result->request_results_size(); i++) {
+ RequestResultCount rrc = result->request_results(i);
+ if (rrc.status_code() == 0) {
+ successes += rrc.count();
+ } else {
+ failures += rrc.count();
+ }
+ }
+ result->mutable_summary()->set_successful_requests_per_second(
+ successes / time_estimate);
+ result->mutable_summary()->set_failed_requests_per_second(failures /
+ time_estimate);
+ }
}
// Namespace for classes and functions used only in RunScenario
@@ -444,6 +462,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
// Finish a run
std::unique_ptr<ScenarioResult> result(new ScenarioResult);
Histogram merged_latencies;
+ std::unordered_map<int, int64_t> merged_statuses;
gpr_log(GPR_INFO, "Finishing clients");
for (size_t i = 0; i < num_clients; i++) {
@@ -462,6 +481,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
gpr_log(GPR_INFO, "Received final status from client %zu", i);
const auto& stats = client_status.stats();
merged_latencies.MergeProto(stats.latencies());
+ for (int i = 0; i < stats.request_results_size(); i++) {
+ merged_statuses[stats.request_results(i).status_code()] +=
+ stats.request_results(i).count();
+ }
result->add_client_stats()->CopyFrom(stats);
// That final status should be the last message on the client stream
GPR_ASSERT(!client->stream->Read(&client_status));
@@ -481,6 +504,12 @@ std::unique_ptr<ScenarioResult> RunScenario(
delete[] clients;
merged_latencies.FillProto(result->mutable_latencies());
+ for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin();
+ it != merged_statuses.end(); ++it) {
+ RequestResultCount* rrc = result->add_request_results();
+ rrc->set_status_code(it->first);
+ rrc->set_count(it->second);
+ }
gpr_log(GPR_INFO, "Finishing servers");
for (size_t i = 0; i < num_servers; i++) {
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index 2ec7d8676c..41617e968a 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -73,6 +73,12 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) {
void GprLogReporter::ReportQPS(const ScenarioResult& result) {
gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps());
+ if (result.summary().failed_requests_per_second() > 0) {
+ gpr_log(GPR_INFO, "failed requests/second: %.1f",
+ result.summary().failed_requests_per_second());
+ gpr_log(GPR_INFO, "successful requests/second: %.1f",
+ result.summary().successful_requests_per_second());
+ }
}
void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 2fcc64819b..bc4c896d83 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -38,6 +38,7 @@
#include <thread>
#include <grpc++/generic/async_generic_service.h>
+#include <grpc++/resource_quota.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
@@ -95,6 +96,11 @@ class AsyncQpsServerTest GRPC_FINAL : public grpc::testing::Server {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
+ if (config.resource_quota_size() > 0) {
+ builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest")
+ .Resize(config.resource_quota_size()));
+ }
+
server_ = builder.BuildAndStart();
using namespace std::placeholders;
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index 0caed0ab49..07f48e2644 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -31,6 +31,7 @@
*
*/
+#include <grpc++/resource_quota.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
@@ -91,6 +92,11 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
Server::CreateServerCredentials(config));
gpr_free(server_address);
+ if (config.resource_quota_size() > 0) {
+ builder.SetResourceQuota(ResourceQuota("AsyncQpsServerTest")
+ .Resize(config.resource_quota_size()));
+ }
+
builder.RegisterService(&service_);
impl_ = builder.BuildAndStart();
diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc
new file mode 100644
index 0000000000..5c70103947
--- /dev/null
+++ b/test/cpp/thread_manager/thread_manager_test.cc
@@ -0,0 +1,136 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *is % allowed in string
+ */
+
+#include <memory>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <grpc++/grpc++.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+
+#include "src/cpp/thread_manager/thread_manager.h"
+#include "test/cpp/util/test_config.h"
+
+namespace grpc {
+class ThreadManagerTest GRPC_FINAL : public grpc::ThreadManager {
+ public:
+ ThreadManagerTest()
+ : ThreadManager(kMinPollers, kMaxPollers),
+ num_do_work_(0),
+ num_poll_for_work_(0),
+ num_work_found_(0) {}
+
+ grpc::ThreadManager::WorkStatus PollForWork(void **tag,
+ bool *ok) GRPC_OVERRIDE;
+ void DoWork(void *tag, bool ok) GRPC_OVERRIDE;
+ void PerformTest();
+
+ private:
+ void SleepForMs(int sleep_time_ms);
+
+ static const int kMinPollers = 2;
+ static const int kMaxPollers = 10;
+
+ static const int kPollingTimeoutMsec = 10;
+ static const int kDoWorkDurationMsec = 1;
+
+ // PollForWork will return SHUTDOWN after these many number of invocations
+ static const int kMaxNumPollForWork = 50;
+
+ gpr_atm num_do_work_; // Number of calls to DoWork
+ gpr_atm num_poll_for_work_; // Number of calls to PollForWork
+ gpr_atm num_work_found_; // Number of times WORK_FOUND was returned
+};
+
+void ThreadManagerTest::SleepForMs(int duration_ms) {
+ gpr_timespec sleep_time =
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(duration_ms, GPR_TIMESPAN));
+ gpr_sleep_until(sleep_time);
+}
+
+grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void **tag,
+ bool *ok) {
+ int call_num = gpr_atm_no_barrier_fetch_add(&num_poll_for_work_, 1);
+
+ if (call_num >= kMaxNumPollForWork) {
+ Shutdown();
+ return SHUTDOWN;
+ }
+
+ // Simulate "polling for work" by sleeping for sometime
+ SleepForMs(kPollingTimeoutMsec);
+
+ *tag = nullptr;
+ *ok = true;
+
+ // Return timeout roughly 1 out of every 3 calls
+ if (call_num % 3 == 0) {
+ return TIMEOUT;
+ } else {
+ gpr_atm_no_barrier_fetch_add(&num_work_found_, 1);
+ return WORK_FOUND;
+ }
+}
+
+void ThreadManagerTest::DoWork(void *tag, bool ok) {
+ gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
+ SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping
+}
+
+void ThreadManagerTest::PerformTest() {
+ // Initialize() starts the ThreadManager
+ Initialize();
+
+ // Wait for all the threads to gracefully terminate
+ Wait();
+
+ // The number of times DoWork() was called is equal to the number of times
+ // WORK_FOUND was returned
+ gpr_log(GPR_DEBUG, "DoWork() called %ld times",
+ gpr_atm_no_barrier_load(&num_do_work_));
+ GPR_ASSERT(gpr_atm_no_barrier_load(&num_do_work_) ==
+ gpr_atm_no_barrier_load(&num_work_found_));
+}
+} // namespace grpc
+
+int main(int argc, char **argv) {
+ std::srand(std::time(NULL));
+
+ grpc::testing::InitTest(&argc, &argv, true);
+ grpc::ThreadManagerTest test_rpc_manager;
+ test_rpc_manager.PerformTest();
+
+ return 0;
+}