aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/common/channel_filter_test.cc3
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc17
-rw-r--r--test/cpp/end2end/filter_end2end_test.cc3
-rw-r--r--test/cpp/end2end/thread_stress_test.cc117
-rw-r--r--test/cpp/interop/client_helper.cc20
-rw-r--r--test/cpp/microbenchmarks/bm_cq_multiple_threads.cc83
-rw-r--r--test/cpp/thread_manager/thread_manager_test.cc153
-rw-r--r--test/cpp/util/BUILD18
8 files changed, 263 insertions, 151 deletions
diff --git a/test/cpp/common/channel_filter_test.cc b/test/cpp/common/channel_filter_test.cc
index 9b603ca5b4..7bdd53f9e7 100644
--- a/test/cpp/common/channel_filter_test.cc
+++ b/test/cpp/common/channel_filter_test.cc
@@ -50,8 +50,7 @@ class MyCallData : public CallData {
// C-core, we don't accidentally break the C++ filter API.
TEST(ChannelFilterTest, RegisterChannelFilter) {
grpc::RegisterChannelFilter<MyChannelData, MyCallData>(
- "myfilter", GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_LOW, true,
- nullptr);
+ "myfilter", GRPC_CLIENT_CHANNEL, INT_MAX, nullptr);
}
// TODO(roth): When we have time, add tests for all methods of the
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index d0b7e79654..2dbf9fc6b6 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -380,6 +380,23 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
+TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
+ StartServers(1); // Single server
+ auto channel = BuildChannel(""); // test that pick first is the default.
+ auto stub = BuildStub(channel);
+ SetNextResolution({servers_[0]->port_});
+ WaitForServer(stub, 0, DEBUG_LOCATION);
+ // Create a new channel and its corresponding PF LB policy, which will pick
+ // the subchannels in READY state from the previous RPC against the same
+ // target (even if it happened over a different channel, because subchannels
+ // are globally reused). Progress should happen without any transition from
+ // this READY state.
+ auto second_channel = BuildChannel("");
+ auto second_stub = BuildStub(second_channel);
+ SetNextResolution({servers_[0]->port_});
+ CheckRpcSendOk(second_stub, DEBUG_LOCATION);
+}
+
TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
ChannelArguments args;
constexpr int kInitialBackOffMs = 100;
diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc
index a8022823b1..88f8f380c3 100644
--- a/test/cpp/end2end/filter_end2end_test.cc
+++ b/test/cpp/end2end/filter_end2end_test.cc
@@ -323,8 +323,7 @@ TEST_F(FilterEnd2endTest, SimpleBidiStreaming) {
void RegisterFilter() {
grpc::RegisterChannelFilter<ChannelDataImpl, CallDataImpl>(
- "test-filter", GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_LOW, true,
- nullptr);
+ "test-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr);
}
} // namespace
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index ccf8400a87..1a5ed28a2c 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -16,6 +16,7 @@
*
*/
+#include <cinttypes>
#include <mutex>
#include <thread>
@@ -24,6 +25,7 @@
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
+#include <grpcpp/resource_quota.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
@@ -51,63 +53,13 @@ namespace testing {
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
- TestServiceImpl() : signal_client_(false) {}
+ TestServiceImpl() {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
response->set_message(request->message());
return Status::OK;
}
-
- // Unimplemented is left unimplemented to test the returned error.
-
- Status RequestStream(ServerContext* context,
- ServerReader<EchoRequest>* reader,
- EchoResponse* response) override {
- EchoRequest request;
- response->set_message("");
- while (reader->Read(&request)) {
- response->mutable_message()->append(request.message());
- }
- return Status::OK;
- }
-
- // Return 3 messages.
- // TODO(yangg) make it generic by adding a parameter into EchoRequest
- Status ResponseStream(ServerContext* context, const EchoRequest* request,
- ServerWriter<EchoResponse>* writer) override {
- EchoResponse response;
- response.set_message(request->message() + "0");
- writer->Write(response);
- response.set_message(request->message() + "1");
- writer->Write(response);
- response.set_message(request->message() + "2");
- writer->Write(response);
-
- return Status::OK;
- }
-
- Status BidiStream(
- ServerContext* context,
- ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
- EchoRequest request;
- EchoResponse response;
- while (stream->Read(&request)) {
- gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
- response.set_message(request.message());
- stream->Write(response);
- }
- return Status::OK;
- }
-
- bool signal_client() {
- std::unique_lock<std::mutex> lock(mu_);
- return signal_client_;
- }
-
- private:
- bool signal_client_;
- std::mutex mu_;
};
template <class Service>
@@ -118,6 +70,7 @@ class CommonStressTest {
virtual void SetUp() = 0;
virtual void TearDown() = 0;
virtual void ResetStub() = 0;
+ virtual bool AllowExhaustion() = 0;
grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
protected:
@@ -146,6 +99,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
CreateChannel(server_address_.str(), InsecureChannelCredentials());
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return false; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -161,7 +115,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
std::ostringstream server_address_;
};
-template <class Service>
+template <class Service, bool allow_resource_exhaustion>
class CommonStressTestInproc : public CommonStressTest<Service> {
public:
void ResetStub() override {
@@ -169,6 +123,7 @@ class CommonStressTestInproc : public CommonStressTest<Service> {
std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return allow_resource_exhaustion; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -194,6 +149,26 @@ class CommonStressTestSyncServer : public BaseClass {
};
template <class BaseClass>
+class CommonStressTestSyncServerLowThreadCount : public BaseClass {
+ public:
+ void SetUp() override {
+ ServerBuilder builder;
+ ResourceQuota quota;
+ this->SetUpStart(&builder, &service_);
+ quota.SetMaxThreads(4);
+ builder.SetResourceQuota(quota);
+ this->SetUpEnd(&builder);
+ }
+ void TearDown() override {
+ this->TearDownStart();
+ this->TearDownEnd();
+ }
+
+ private:
+ TestServiceImpl service_;
+};
+
+template <class BaseClass>
class CommonStressTestAsyncServer : public BaseClass {
public:
CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {}
@@ -293,7 +268,8 @@ class End2endTest : public ::testing::Test {
Common common_;
};
-static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
+static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
+ bool allow_exhaustion, gpr_atm* errors) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@@ -301,34 +277,53 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
Status s = stub->Echo(&context, request, &response);
- EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok() || (allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
if (!s.ok()) {
- gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
- s.error_message().c_str());
+ if (!(allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
+ gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
+ s.error_message().c_str());
+ }
+ gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
+ } else {
+ EXPECT_EQ(response.message(), request.message());
}
- ASSERT_TRUE(s.ok());
}
}
typedef ::testing::Types<
CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
- CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>,
+ CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
+ CommonStressTestSyncServerLowThreadCount<
+ CommonStressTestInproc<TestServiceImpl, true>>,
CommonStressTestAsyncServer<
CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
- CommonStressTestAsyncServer<
- CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>>
+ CommonStressTestAsyncServer<CommonStressTestInproc<
+ grpc::testing::EchoTestService::AsyncService, false>>>
CommonTypes;
TYPED_TEST_CASE(End2endTest, CommonTypes);
TYPED_TEST(End2endTest, ThreadStress) {
this->common_.ResetStub();
std::vector<std::thread> threads;
+ gpr_atm errors;
+ gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
threads.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; ++i) {
- threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs);
+ threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
+ this->common_.AllowExhaustion(), &errors);
}
for (int i = 0; i < kNumThreads; ++i) {
threads[i].join();
}
+ uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
+ if (error_cnt != 0) {
+ gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
+ }
+ // If this test allows resource exhaustion, expect that it actually sees some
+ if (this->common_.AllowExhaustion()) {
+ EXPECT_GT(error_cnt, static_cast<uint64_t>(0));
+ }
}
template <class Common>
diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc
index 29b5a1ed6c..fb7b7bb7d0 100644
--- a/test/cpp/interop/client_helper.cc
+++ b/test/cpp/interop/client_helper.cc
@@ -88,20 +88,20 @@ std::shared_ptr<Channel> CreateChannelForTestCase(
std::shared_ptr<CallCredentials> creds;
if (test_case == "compute_engine_creds") {
- GPR_ASSERT(FLAGS_use_tls);
- creds = GoogleComputeEngineCredentials();
- GPR_ASSERT(creds);
+ creds = FLAGS_custom_credentials_type == "google_default_credentials"
+ ? nullptr
+ : GoogleComputeEngineCredentials();
} else if (test_case == "jwt_token_creds") {
- GPR_ASSERT(FLAGS_use_tls);
grpc::string json_key = GetServiceAccountJsonKey();
std::chrono::seconds token_lifetime = std::chrono::hours(1);
- creds =
- ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
- GPR_ASSERT(creds);
+ creds = FLAGS_custom_credentials_type == "google_default_credentials"
+ ? nullptr
+ : ServiceAccountJWTAccessCredentials(json_key,
+ token_lifetime.count());
} else if (test_case == "oauth2_auth_token") {
- grpc::string raw_token = GetOauth2AccessToken();
- creds = AccessTokenCredentials(raw_token);
- GPR_ASSERT(creds);
+ creds = FLAGS_custom_credentials_type == "google_default_credentials"
+ ? nullptr
+ : AccessTokenCredentials(GetOauth2AccessToken());
}
if (FLAGS_custom_credentials_type.empty()) {
transport_security security_type =
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index da095c3e68..85767c8758 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -34,15 +34,15 @@ struct grpc_pollset {
gpr_mu mu;
};
+static gpr_mu g_mu;
+static gpr_cv g_cv;
+static int g_threads_active;
+static bool g_active;
+
namespace grpc {
namespace testing {
-
-auto& force_library_initialization = Library::get();
-
-static void* g_tag = (void*)static_cast<intptr_t>(10); // Some random number
static grpc_completion_queue* g_cq;
static grpc_event_engine_vtable g_vtable;
-static const grpc_event_engine_vtable* g_old_vtable;
static void pollset_shutdown(grpc_pollset* ps, grpc_closure* closure) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
@@ -74,16 +74,18 @@ static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker,
}
gpr_mu_unlock(&ps->mu);
- GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag));
+
+ void* tag = (void*)static_cast<intptr_t>(10); // Some random number
+ GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
grpc_cq_end_op(
- g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
+ g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(&ps->mu);
return GRPC_ERROR_NONE;
}
-static void init_engine_vtable() {
+static const grpc_event_engine_vtable* init_engine_vtable(bool) {
memset(&g_vtable, 0, sizeof(g_vtable));
g_vtable.pollset_size = sizeof(grpc_pollset);
@@ -92,17 +94,23 @@ static void init_engine_vtable() {
g_vtable.pollset_destroy = pollset_destroy;
g_vtable.pollset_work = pollset_work;
g_vtable.pollset_kick = pollset_kick;
+ g_vtable.shutdown_engine = [] {};
+
+ return &g_vtable;
}
static void setup() {
- grpc_init();
+ // This test should only ever be run with a non or any polling engine
+ // Override the polling engine for the non-polling engine
+ // and add a custom polling engine
+ grpc_register_event_engine_factory("none", init_engine_vtable, false);
+ grpc_register_event_engine_factory("bm_cq_multiple_threads",
+ init_engine_vtable, true);
- /* Override the event engine with our test event engine (g_vtable); but before
- * that, save the current event engine in g_old_vtable. We will have to set
- * g_old_vtable back before calling grpc_shutdown() */
- init_engine_vtable();
- g_old_vtable = grpc_get_event_engine_test_only();
- grpc_set_event_engine_test_only(&g_vtable);
+ grpc_init();
+ GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
+ strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
+ 0);
g_cq = grpc_completion_queue_create_for_next(nullptr);
}
@@ -118,9 +126,6 @@ static void teardown() {
}
grpc_completion_queue_destroy(g_cq);
-
- /* Restore the old event engine before calling grpc_shutdown */
- grpc_set_event_engine_test_only(g_old_vtable);
grpc_shutdown();
}
@@ -137,14 +142,33 @@ static void teardown() {
code (i.e the code between two successive calls of state.KeepRunning()) if
state.KeepRunning() returns false. So it is safe to do the teardown in one
of the threads after state.keepRunning() returns false.
+
+ However, our use requires synchronization because we do additional work at
+ each thread that requires specific ordering (TrackCounters must be constructed
+ after grpc_init because it needs the number of cores, initialized by grpc,
+ and its Finish call must take place before grpc_shutdown so that it can use
+ grpc_stats).
*/
static void BM_Cq_Throughput(benchmark::State& state) {
- TrackCounters track_counters;
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ auto thd_idx = state.thread_index;
- if (state.thread_index == 0) {
+ gpr_mu_lock(&g_mu);
+ g_threads_active++;
+ if (thd_idx == 0) {
setup();
+ g_active = true;
+ gpr_cv_broadcast(&g_cv);
+ } else {
+ while (!g_active) {
+ gpr_cv_wait(&g_cv, &g_mu, deadline);
+ }
}
+ gpr_mu_unlock(&g_mu);
+
+ // Use a TrackCounters object to monitor the gRPC performance statistics
+ // (optionally including low-level counters) before and after the test
+ TrackCounters track_counters;
while (state.KeepRunning()) {
GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
@@ -152,12 +176,23 @@ static void BM_Cq_Throughput(benchmark::State& state) {
}
state.SetItemsProcessed(state.iterations());
+ track_counters.Finish(state);
- if (state.thread_index == 0) {
- teardown();
+ gpr_mu_lock(&g_mu);
+ g_threads_active--;
+ if (g_threads_active == 0) {
+ gpr_cv_broadcast(&g_cv);
+ } else {
+ while (g_threads_active > 0) {
+ gpr_cv_wait(&g_cv, &g_mu, deadline);
+ }
}
+ gpr_mu_unlock(&g_mu);
- track_counters.Finish(state);
+ if (thd_idx == 0) {
+ teardown();
+ g_active = false;
+ }
}
BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
@@ -172,6 +207,8 @@ void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
int main(int argc, char** argv) {
+ gpr_mu_init(&g_mu);
+ gpr_cv_init(&g_cv);
::benchmark::Initialize(&argc, argv);
::grpc::testing::InitTest(&argc, &argv, false);
benchmark::RunTheBenchmarksNamespaced();
diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc
index 7a95a9f17d..99de5a3e01 100644
--- a/test/cpp/thread_manager/thread_manager_test.cc
+++ b/test/cpp/thread_manager/thread_manager_test.cc
@@ -30,30 +30,44 @@
#include "test/cpp/util/test_config.h"
namespace grpc {
+
+struct ThreadManagerTestSettings {
+ // The min number of pollers that SHOULD be active in ThreadManager
+ int min_pollers;
+ // The max number of pollers that could be active in ThreadManager
+ int max_pollers;
+ // The sleep duration in PollForWork() function to simulate "polling"
+ int poll_duration_ms;
+ // The sleep duration in DoWork() function to simulate "work"
+ int work_duration_ms;
+ // Max number of times PollForWork() is called before shutting down
+ int max_poll_calls;
+};
+
class ThreadManagerTest final : public grpc::ThreadManager {
public:
- ThreadManagerTest()
- : ThreadManager(kMinPollers, kMaxPollers),
+ ThreadManagerTest(const char* name, grpc_resource_quota* rq,
+ const ThreadManagerTestSettings& settings)
+ : ThreadManager(name, rq, settings.min_pollers, settings.max_pollers),
+ settings_(settings),
num_do_work_(0),
num_poll_for_work_(0),
num_work_found_(0) {}
grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
- void DoWork(void* tag, bool ok) override;
- void PerformTest();
+ void DoWork(void* tag, bool ok, bool resources) override;
+
+ // Get number of times PollForWork() returned WORK_FOUND
+ int GetNumWorkFound();
+ // Get number of times DoWork() was called
+ int GetNumDoWork();
private:
void SleepForMs(int sleep_time_ms);
- static const int kMinPollers = 2;
- static const int kMaxPollers = 10;
-
- static const int kPollingTimeoutMsec = 10;
- static const int kDoWorkDurationMsec = 1;
-
- // PollForWork will return SHUTDOWN after these many number of invocations
- static const int kMaxNumPollForWork = 50;
+ ThreadManagerTestSettings settings_;
+ // Counters
gpr_atm num_do_work_; // Number of calls to DoWork
gpr_atm num_poll_for_work_; // Number of calls to PollForWork
gpr_atm num_work_found_; // Number of times WORK_FOUND was returned
@@ -69,54 +83,117 @@ void ThreadManagerTest::SleepForMs(int duration_ms) {
grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag,
bool* ok) {
int call_num = gpr_atm_no_barrier_fetch_add(&num_poll_for_work_, 1);
-
- if (call_num >= kMaxNumPollForWork) {
+ if (call_num >= settings_.max_poll_calls) {
Shutdown();
return SHUTDOWN;
}
- // Simulate "polling for work" by sleeping for sometime
- SleepForMs(kPollingTimeoutMsec);
-
+ SleepForMs(settings_.poll_duration_ms); // Simulate "polling" duration
*tag = nullptr;
*ok = true;
- // Return timeout roughly 1 out of every 3 calls
+ // Return timeout roughly 1 out of every 3 calls just to make the test a bit
+ // more interesting
if (call_num % 3 == 0) {
return TIMEOUT;
- } else {
- gpr_atm_no_barrier_fetch_add(&num_work_found_, 1);
- return WORK_FOUND;
}
+
+ gpr_atm_no_barrier_fetch_add(&num_work_found_, 1);
+ return WORK_FOUND;
}
-void ThreadManagerTest::DoWork(void* tag, bool ok) {
+void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) {
gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
- SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping
+ SleepForMs(settings_.work_duration_ms); // Simulate work by sleeping
}
-void ThreadManagerTest::PerformTest() {
- // Initialize() starts the ThreadManager
- Initialize();
-
- // Wait for all the threads to gracefully terminate
- Wait();
+int ThreadManagerTest::GetNumWorkFound() {
+ return static_cast<int>(gpr_atm_no_barrier_load(&num_work_found_));
+}
- // The number of times DoWork() was called is equal to the number of times
- // WORK_FOUND was returned
- gpr_log(GPR_DEBUG, "DoWork() called %" PRIdPTR " times",
- gpr_atm_no_barrier_load(&num_do_work_));
- GPR_ASSERT(gpr_atm_no_barrier_load(&num_do_work_) ==
- gpr_atm_no_barrier_load(&num_work_found_));
+int ThreadManagerTest::GetNumDoWork() {
+ return static_cast<int>(gpr_atm_no_barrier_load(&num_do_work_));
}
} // namespace grpc
+// Test that the number of times DoWork() is called is equal to the number of
+// times PollForWork() returned WORK_FOUND
+static void TestPollAndWork() {
+ grpc_resource_quota* rq = grpc_resource_quota_create("Test-poll-and-work");
+ grpc::ThreadManagerTestSettings settings = {
+ 2 /* min_pollers */, 10 /* max_pollers */, 10 /* poll_duration_ms */,
+ 1 /* work_duration_ms */, 50 /* max_poll_calls */};
+
+ grpc::ThreadManagerTest test_thread_mgr("TestThreadManager", rq, settings);
+ grpc_resource_quota_unref(rq);
+
+ test_thread_mgr.Initialize(); // Start the thread manager
+ test_thread_mgr.Wait(); // Wait for all threads to finish
+
+ // Verify that The number of times DoWork() was called is equal to the number
+ // of times WORK_FOUND was returned
+ gpr_log(GPR_DEBUG, "DoWork() called %d times",
+ test_thread_mgr.GetNumDoWork());
+ GPR_ASSERT(test_thread_mgr.GetNumDoWork() ==
+ test_thread_mgr.GetNumWorkFound());
+}
+
+static void TestThreadQuota() {
+ const int kMaxNumThreads = 3;
+ grpc_resource_quota* rq = grpc_resource_quota_create("Test-thread-quota");
+ grpc_resource_quota_set_max_threads(rq, kMaxNumThreads);
+
+ // Set work_duration_ms to be much greater than poll_duration_ms. This way,
+ // the thread manager will be forced to create more 'polling' threads to
+ // honor the min_pollers guarantee
+ grpc::ThreadManagerTestSettings settings = {
+ 1 /* min_pollers */, 1 /* max_pollers */, 1 /* poll_duration_ms */,
+ 10 /* work_duration_ms */, 50 /* max_poll_calls */};
+
+ // Create two thread managers (but with same resource quota). This means
+ // that the max number of active threads across BOTH the thread managers
+ // cannot be greater than kMaxNumthreads
+ grpc::ThreadManagerTest test_thread_mgr_1("TestThreadManager-1", rq,
+ settings);
+ grpc::ThreadManagerTest test_thread_mgr_2("TestThreadManager-2", rq,
+ settings);
+ // It is ok to unref resource quota before starting thread managers.
+ grpc_resource_quota_unref(rq);
+
+ // Start both thread managers
+ test_thread_mgr_1.Initialize();
+ test_thread_mgr_2.Initialize();
+
+ // Wait for both to finish
+ test_thread_mgr_1.Wait();
+ test_thread_mgr_2.Wait();
+
+ // Now verify that the total number of active threads in either thread manager
+ // never exceeds kMaxNumThreads
+ //
+ // NOTE: Actually the total active threads across *both* thread managers at
+ // any point of time never exceeds kMaxNumThreads but unfortunately there is
+ // no easy way to verify it (i.e we can't just do (max1 + max2 <= k))
+ // Its okay to not test this case here. The resource quota c-core tests
+ // provide enough coverage to resource quota object with multiple resource
+ // users
+ int max1 = test_thread_mgr_1.GetMaxActiveThreadsSoFar();
+ int max2 = test_thread_mgr_2.GetMaxActiveThreadsSoFar();
+ gpr_log(
+ GPR_DEBUG,
+ "MaxActiveThreads in TestThreadManager_1: %d, TestThreadManager_2: %d",
+ max1, max2);
+ GPR_ASSERT(max1 <= kMaxNumThreads && max2 <= kMaxNumThreads);
+}
+
int main(int argc, char** argv) {
std::srand(std::time(nullptr));
-
grpc::testing::InitTest(&argc, &argv, true);
- grpc::ThreadManagerTest test_rpc_manager;
- test_rpc_manager.PerformTest();
+ grpc_init();
+
+ TestPollAndWork();
+ TestThreadQuota();
+ grpc_shutdown();
return 0;
}
diff --git a/test/cpp/util/BUILD b/test/cpp/util/BUILD
index c3bfeb7615..477862a0ee 100644
--- a/test/cpp/util/BUILD
+++ b/test/cpp/util/BUILD
@@ -269,27 +269,15 @@ grpc_cc_test(
grpc_cc_binary(
name = "grpc_cli",
srcs = [
- "cli_call.cc",
- "cli_call.h",
- "cli_credentials.cc",
- "cli_credentials.h",
- "config_grpc_cli.h",
"grpc_cli.cc",
- "grpc_tool.cc",
- "grpc_tool.h",
- "proto_file_parser.cc",
- "proto_file_parser.h",
- "proto_reflection_descriptor_database.cc",
- "proto_reflection_descriptor_database.h",
- "service_describer.cc",
- "service_describer.h",
- "test_config.h",
- "test_config_cc.cc",
],
external_deps = [
"gflags",
],
deps = [
+ ":grpc_cli_libs",
+ ":grpc++_proto_reflection_desc_db",
+ ":test_config",
"//:grpc++",
"//src/proto/grpc/reflection/v1alpha:reflection_proto",
],