aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/codegen/compiler_test_golden17
-rw-r--r--test/cpp/end2end/async_end2end_test.cc135
-rw-r--r--test/cpp/end2end/client_lb_end2end_test.cc11
-rw-r--r--test/cpp/end2end/end2end_test.cc22
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc17
-rw-r--r--test/cpp/end2end/thread_stress_test.cc40
-rw-r--r--test/cpp/grpclb/grpclb_test.cc4
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_hpack.cc114
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_transport.cc54
-rw-r--r--test/cpp/microbenchmarks/bm_cq.cc2
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_trickle.cc22
-rw-r--r--test/cpp/microbenchmarks/fullstack_fixtures.h4
-rw-r--r--test/cpp/microbenchmarks/helpers.cc14
-rw-r--r--test/cpp/qps/BUILD12
-rw-r--r--test/cpp/qps/client.h77
-rw-r--r--test/cpp/qps/client_async.cc46
-rw-r--r--test/cpp/qps/client_sync.cc39
-rw-r--r--test/cpp/qps/driver.cc48
-rw-r--r--test/cpp/qps/driver.h2
-rwxr-xr-xtest/cpp/qps/gen_build_yaml.py18
-rw-r--r--test/cpp/qps/inproc_sync_unary_ping_pong_test.cc66
-rw-r--r--test/cpp/qps/json_run_localhost.cc12
-rw-r--r--test/cpp/qps/qps_json_driver.cc7
-rw-r--r--test/cpp/qps/qps_openloop_test.cc6
-rw-r--r--test/cpp/qps/qps_worker.cc21
-rw-r--r--test/cpp/qps/qps_worker.h14
-rw-r--r--test/cpp/qps/secure_sync_unary_ping_pong_test.cc6
-rw-r--r--test/cpp/qps/server.h8
-rw-r--r--test/cpp/qps/server_async.cc86
-rw-r--r--test/cpp/qps/server_sync.cc20
-rw-r--r--test/cpp/qps/worker.cc3
-rw-r--r--test/cpp/util/create_test_channel.cc4
-rw-r--r--test/cpp/util/error_details_test.cc20
33 files changed, 700 insertions, 271 deletions
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden
index 3d664e8825..026a94112a 100644
--- a/test/cpp/codegen/compiler_test_golden
+++ b/test/cpp/codegen/compiler_test_golden
@@ -39,7 +39,6 @@
namespace grpc {
class CompletionQueue;
class Channel;
-class RpcService;
class ServerCompletionQueue;
class ServerContext;
} // namespace grpc
@@ -169,10 +168,10 @@ class ServiceA final {
::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override;
::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override;
::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override;
- const ::grpc::RpcMethod rpcmethod_MethodA1_;
- const ::grpc::RpcMethod rpcmethod_MethodA2_;
- const ::grpc::RpcMethod rpcmethod_MethodA3_;
- const ::grpc::RpcMethod rpcmethod_MethodA4_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA1_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA2_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA3_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA4_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
@@ -352,7 +351,7 @@ class ServiceA final {
public:
WithStreamedUnaryMethod_MethodA1() {
::grpc::Service::MarkMethodStreamed(0,
- new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2)));
+ new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithStreamedUnaryMethod_MethodA1() override {
BaseClassMustBeDerivedFromService(this);
@@ -373,7 +372,7 @@ class ServiceA final {
public:
WithSplitStreamingMethod_MethodA3() {
::grpc::Service::MarkMethodStreamed(2,
- new ::grpc::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2)));
+ new ::grpc::internal::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithSplitStreamingMethod_MethodA3() override {
BaseClassMustBeDerivedFromService(this);
@@ -427,7 +426,7 @@ class ServiceB final {
std::shared_ptr< ::grpc::ChannelInterface> channel_;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
- const ::grpc::RpcMethod rpcmethod_MethodB1_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodB1_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
@@ -484,7 +483,7 @@ class ServiceB final {
public:
WithStreamedUnaryMethod_MethodB1() {
::grpc::Service::MarkMethodStreamed(0,
- new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2)));
+ new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithStreamedUnaryMethod_MethodB1() override {
BaseClassMustBeDerivedFromService(this);
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 2a33e8ae11..cf1cc7e486 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -28,12 +28,14 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/tls.h>
#include "src/core/lib/iomgr/port.h"
+#include "src/core/lib/support/env.h"
#include "src/proto/grpc/health/v1/health.grpc.pb.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@@ -99,7 +101,7 @@ class PollingOverrider {
class Verifier {
public:
- explicit Verifier(bool spin) : spin_(spin) {}
+ explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {}
// Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) {
return ExpectUnless(i, expect_ok, false);
@@ -142,6 +144,18 @@ class Verifier {
return detag(got_tag);
}
+ template <typename T>
+ CompletionQueue::NextStatus DoOnceThenAsyncNext(
+ CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
+ std::function<void(void)> lambda) {
+ if (lambda_run_) {
+ return cq->AsyncNext(got_tag, ok, deadline);
+ } else {
+ lambda_run_ = true;
+ return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
+ }
+ }
+
// Verify keeps calling Next until all currently set
// expected tags are complete
void Verify(CompletionQueue* cq) { Verify(cq, false); }
@@ -154,6 +168,7 @@ class Verifier {
Next(cq, ignore_ok);
}
}
+
// This version of Verify stops after a certain deadline
void Verify(CompletionQueue* cq,
std::chrono::system_clock::time_point deadline) {
@@ -193,6 +208,47 @@ class Verifier {
}
}
+ // This version of Verify stops after a certain deadline, and uses the
+ // DoThenAsyncNext API
+ // to call the lambda
+ void Verify(CompletionQueue* cq,
+ std::chrono::system_clock::time_point deadline,
+ std::function<void(void)> lambda) {
+ if (expectations_.empty()) {
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ while (std::chrono::system_clock::now() < deadline) {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::TIMEOUT);
+ }
+ } else {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::TIMEOUT);
+ }
+ } else {
+ while (!expectations_.empty()) {
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ for (;;) {
+ GPR_ASSERT(std::chrono::system_clock::now() < deadline);
+ auto r = DoOnceThenAsyncNext(
+ cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda);
+ if (r == CompletionQueue::TIMEOUT) continue;
+ if (r == CompletionQueue::GOT_EVENT) break;
+ gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+ abort();
+ }
+ } else {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::GOT_EVENT);
+ }
+ GotTag(got_tag, ok, false);
+ }
+ }
+ }
+
private:
void GotTag(void* got_tag, bool ok, bool ignore_ok) {
auto it = expectations_.find(got_tag);
@@ -226,6 +282,7 @@ class Verifier {
std::map<void*, bool> expectations_;
std::map<void*, MaybeExpect> maybe_expectations_;
bool spin_;
+ bool lambda_run_;
};
bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
@@ -404,6 +461,14 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) {
if (GetParam().inproc) {
return;
}
+ int poller_slowdown_factor = 1;
+ // It needs 2 pollset_works to reconnect the channel with polling engine
+ // "poll"
+ char* s = gpr_getenv("GRPC_POLL_STRATEGY");
+ if (s != NULL && 0 == strcmp(s, "poll")) {
+ poller_slowdown_factor = 2;
+ }
+ gpr_free(s);
ResetStub();
SendRpc(1);
server_->Shutdown();
@@ -413,10 +478,13 @@ TEST_P(AsyncEnd2endTest, ReconnectChannel) {
while (cq_->Next(&ignored_tag, &ignored_ok))
;
BuildAndStartServer();
- // It needs more than kConnectivityCheckIntervalMsec time to reconnect the
- // channel.
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(1600, GPR_TIMESPAN)));
+ // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
+ // reconnect the channel.
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(
+ 300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
+ GPR_TIMESPAN)));
SendRpc(1);
}
@@ -490,6 +558,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
EXPECT_TRUE(recv_status.ok());
}
+// Test a simple RPC using the async version of Next
+TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+
+ std::chrono::system_clock::time_point time_now(
+ std::chrono::system_clock::now());
+ std::chrono::system_clock::time_point time_limit(
+ std::chrono::system_clock::now() + std::chrono::seconds(10));
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+
+ auto resp_writer_ptr = &response_writer;
+ auto lambda_2 = [&, this, resp_writer_ptr]() {
+ gpr_log(GPR_ERROR, "CALLED");
+ service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
+ cq_.get(), tag(2));
+ };
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Verify(cq_.get(), time_limit, lambda_2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ auto recv_resp_ptr = &recv_response;
+ auto status_ptr = &recv_status;
+ send_response.set_message(recv_request.message());
+ auto lambda_3 = [&, this, resp_writer_ptr, send_response]() {
+ resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
+ };
+ response_reader->Finish(recv_resp_ptr, status_ptr, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get(), std::chrono::system_clock::time_point::max(),
+ lambda_3);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+}
+
// Two pings and a final pong.
TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();
@@ -1890,6 +2012,9 @@ INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
} // namespace grpc
int main(int argc, char** argv) {
+ // Change the backup poll interval from 5s to 200ms to speed up the
+ // ReconnectChannel test
+ gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200");
grpc_test_init(argc, argv);
gpr_tls_init(&g_is_async_end2end_test);
::testing::InitGoogleTest(&argc, argv);
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index c236f76e89..fc87badc31 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -36,6 +36,7 @@
extern "C" {
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/ext/filters/client_channel/subchannel_index.h"
+#include "src/core/lib/support/env.h"
}
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@@ -86,7 +87,11 @@ class MyTestServiceImpl : public TestServiceImpl {
class ClientLbEnd2endTest : public ::testing::Test {
protected:
ClientLbEnd2endTest()
- : server_host_("localhost"), kRequestMessage_("Live long and prosper.") {}
+ : server_host_("localhost"), kRequestMessage_("Live long and prosper.") {
+ // Make the backup poller poll very frequently in order to pick up
+ // updates from all the subchannels's FDs.
+ gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1");
+ }
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@@ -305,7 +310,7 @@ TEST_F(ClientLbEnd2endTest, PickFirstUpdates) {
ports.clear();
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** SET none *******");
- grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT;
+ grpc_connectivity_state channel_state;
do {
channel_state = channel_->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
@@ -481,7 +486,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinUpdates) {
// An empty update will result in the channel going into TRANSIENT_FAILURE.
ports.clear();
SetNextResolution(ports);
- grpc_connectivity_state channel_state = GRPC_CHANNEL_INIT;
+ grpc_connectivity_state channel_state;
do {
channel_state = channel_->GetState(true /* try to connect */);
} while (channel_state == GRPC_CHANNEL_READY);
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 1aa547d4e3..82ca39466e 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -30,11 +30,13 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include "src/core/lib/security/credentials/credentials.h"
+#include "src/core/lib/support/env.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@@ -704,13 +706,25 @@ TEST_P(End2endTest, ReconnectChannel) {
if (GetParam().inproc) {
return;
}
+ gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200");
+ int poller_slowdown_factor = 1;
+ // It needs 2 pollset_works to reconnect the channel with polling engine
+ // "poll"
+ char* s = gpr_getenv("GRPC_POLL_STRATEGY");
+ if (s != NULL && 0 == strcmp(s, "poll")) {
+ poller_slowdown_factor = 2;
+ }
+ gpr_free(s);
ResetStub();
SendRpc(stub_.get(), 1, false);
RestartServer(std::shared_ptr<AuthMetadataProcessor>());
- // It needs more than kConnectivityCheckIntervalMsec time to reconnect the
- // channel.
- gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_millis(1600, GPR_TIMESPAN)));
+ // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
+ // reconnect the channel.
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(
+ 300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
+ GPR_TIMESPAN)));
SendRpc(stub_.get(), 1, false);
}
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index f73a9c1791..b9ee77d7ff 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -36,6 +36,7 @@
extern "C" {
#include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/support/env.h"
}
#include "test/core/util/port.h"
@@ -74,9 +75,9 @@ extern "C" {
using std::chrono::system_clock;
+using grpc::lb::v1::LoadBalancer;
using grpc::lb::v1::LoadBalanceRequest;
using grpc::lb::v1::LoadBalanceResponse;
-using grpc::lb::v1::LoadBalancer;
namespace grpc {
namespace testing {
@@ -332,8 +333,11 @@ class GrpclbEnd2endTest : public ::testing::Test {
num_backends_(num_backends),
num_balancers_(num_balancers),
client_load_reporting_interval_seconds_(
- client_load_reporting_interval_seconds),
- kRequestMessage_("Live long and prosper.") {}
+ client_load_reporting_interval_seconds) {
+ // Make the backup poller poll very frequently in order to pick up
+ // updates from all the subchannels's FDs.
+ gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1");
+ }
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@@ -559,7 +563,6 @@ class GrpclbEnd2endTest : public ::testing::Test {
std::unique_ptr<std::thread> thread_;
};
- const grpc::string kMessage_ = "Live long and prosper.";
const grpc::string server_host_;
const size_t num_backends_;
const size_t num_balancers_;
@@ -571,7 +574,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
std::vector<ServerThread<BackendService>> backend_servers_;
std::vector<ServerThread<BalancerService>> balancer_servers_;
grpc_fake_resolver_response_generator* response_generator_;
- const grpc::string kRequestMessage_;
+ const grpc::string kRequestMessage_ = "Live long and prosper.";
};
class SingleBalancerTest : public GrpclbEnd2endTest {
@@ -1086,7 +1089,7 @@ TEST_F(SingleBalancerTest, Drop) {
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
+ EXPECT_EQ(response.message(), kRequestMessage_);
}
}
EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
@@ -1210,7 +1213,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
+ EXPECT_EQ(response.message(), kRequestMessage_);
}
}
EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index f990a7ed9d..90b2eddbbb 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -50,23 +50,6 @@ const int kNumRpcs = 1000; // Number of RPCs per thread
namespace grpc {
namespace testing {
-namespace {
-
-// When echo_deadline is requested, deadline seen in the ServerContext is set in
-// the response in seconds.
-void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
- EchoResponse* response) {
- if (request->has_param() && request->param().echo_deadline()) {
- gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- if (context->deadline() != system_clock::time_point::max()) {
- Timepoint2Timespec(context->deadline(), &deadline);
- }
- response->mutable_param()->set_request_deadline(deadline.tv_sec);
- }
-}
-
-} // namespace
-
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
TestServiceImpl() : signal_client_(false) {}
@@ -74,29 +57,6 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
response->set_message(request->message());
- MaybeEchoDeadline(context, request, response);
- if (request->has_param() && request->param().client_cancel_after_us()) {
- {
- std::unique_lock<std::mutex> lock(mu_);
- signal_client_ = true;
- }
- while (!context->IsCancelled()) {
- gpr_sleep_until(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(request->param().client_cancel_after_us(),
- GPR_TIMESPAN)));
- }
- return Status::CANCELLED;
- } else if (request->has_param() &&
- request->param().server_cancel_after_us()) {
- gpr_sleep_until(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(request->param().server_cancel_after_us(),
- GPR_TIMESPAN)));
- return Status::CANCELLED;
- } else {
- EXPECT_FALSE(context->IsCancelled());
- }
return Status::OK;
}
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index e740ea513a..522bc9e0d6 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -42,6 +42,7 @@ extern "C" {
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
+#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/support/tmpfile.h"
#include "src/core/lib/surface/channel.h"
@@ -790,6 +791,9 @@ TEST(GrpclbTest, InvalidAddressInServerlist) {}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_test_init(argc, argv);
+ // Make the backup poller poll very frequently in order to pick up
+ // updates from all the subchannels's FDs.
+ gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "1");
grpc_init();
const auto result = RUN_ALL_TESTS();
grpc_shutdown();
diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc
index 5428cc47e7..bc2157b9f1 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc
@@ -28,6 +28,7 @@ extern "C" {
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/static_metadata.h"
+#include "src/core/lib/transport/timeout_encoding.h"
}
#include "test/cpp/microbenchmarks/helpers.h"
#include "third_party/benchmark/include/benchmark/benchmark.h"
@@ -441,7 +442,7 @@ static void UnrefHeader(grpc_exec_ctx *exec_ctx, void *user_data,
GRPC_MDELEM_UNREF(exec_ctx, md);
}
-template <class Fixture>
+template <class Fixture, void (*OnHeader)(grpc_exec_ctx *, void *, grpc_mdelem)>
static void BM_HpackParserParseHeader(benchmark::State &state) {
TrackCounters track_counters;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -449,7 +450,7 @@ static void BM_HpackParserParseHeader(benchmark::State &state) {
std::vector<grpc_slice> benchmark_slices = Fixture::GetBenchmarkSlices();
grpc_chttp2_hpack_parser p;
grpc_chttp2_hpack_parser_init(&exec_ctx, &p);
- p.on_header = UnrefHeader;
+ p.on_header = OnHeader;
p.on_header_user_data = nullptr;
for (auto slice : init_slices) {
GPR_ASSERT(GRPC_ERROR_NONE ==
@@ -759,32 +760,97 @@ class RepresentativeServerTrailingMetadata {
}
};
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>);
+static void free_timeout(void *p) { gpr_free(p); }
+
+// New implementation.
+static void OnHeaderNew(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_mdelem md) {
+ if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) {
+ grpc_millis *cached_timeout =
+ static_cast<grpc_millis *>(grpc_mdelem_get_user_data(md, free_timeout));
+ grpc_millis timeout;
+ if (cached_timeout != NULL) {
+ timeout = *cached_timeout;
+ } else {
+ if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), &timeout)) {
+ char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val);
+ gpr_free(val);
+ timeout = GRPC_MILLIS_INF_FUTURE;
+ }
+ if (GRPC_MDELEM_IS_INTERNED(md)) {
+ /* not already parsed: parse it now, and store the
+ * result away */
+ cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis));
+ *cached_timeout = timeout;
+ grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
+ }
+ }
+ benchmark::DoNotOptimize(timeout);
+ GRPC_MDELEM_UNREF(exec_ctx, md);
+ } else {
+ GPR_ASSERT(0);
+ }
+}
+
+// Send the same deadline repeatedly
+class SameDeadline {
+ public:
+ static std::vector<grpc_slice> GetInitSlices() {
+ return {
+ grpc_slice_from_static_string("@\x0cgrpc-timeout\x03"
+ "30S")};
+ }
+ static std::vector<grpc_slice> GetBenchmarkSlices() {
+ // Use saved key and literal value.
+ return {MakeSlice({0x0f, 0x2f, 0x03, '3', '0', 'S'})};
+ }
+};
+
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch, UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem, UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>,
+ UnrefHeader);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
- RepresentativeClientInitialMetadata);
+ RepresentativeClientInitialMetadata, UnrefHeader);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
- MoreRepresentativeClientInitialMetadata);
+ MoreRepresentativeClientInitialMetadata, UnrefHeader);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
- RepresentativeServerInitialMetadata);
+ RepresentativeServerInitialMetadata, UnrefHeader);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
- RepresentativeServerTrailingMetadata);
+ RepresentativeServerTrailingMetadata, UnrefHeader);
+
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderNew);
} // namespace hpack_parser_fixtures
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 8ee3ae7268..e9f537faa4 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -44,10 +44,16 @@ auto &force_library_initialization = Library::get();
class DummyEndpoint : public grpc_endpoint {
public:
DummyEndpoint() {
- static const grpc_endpoint_vtable my_vtable = {
- read, write, add_to_pollset, add_to_pollset_set,
- shutdown, destroy, get_resource_user, get_peer,
- get_fd};
+ static const grpc_endpoint_vtable my_vtable = {read,
+ write,
+ add_to_pollset,
+ add_to_pollset_set,
+ delete_from_pollset_set,
+ shutdown,
+ destroy,
+ get_resource_user,
+ get_peer,
+ get_fd};
grpc_endpoint::vtable = &my_vtable;
ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint");
}
@@ -102,6 +108,10 @@ class DummyEndpoint : public grpc_endpoint {
static void add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
+ static void delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *ep,
+ grpc_pollset_set *pollset) {}
+
static void shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_resource_user_shutdown(exec_ctx,
@@ -428,9 +438,8 @@ static void BM_TransportStreamSend(benchmark::State &state) {
return;
}
// force outgoing window to be yuge
- s->chttp2_stream()->flow_control.remote_window_delta =
- 1024 * 1024 * 1024;
- f.chttp2_transport()->flow_control.remote_window = 1024 * 1024 * 1024;
+ s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
+ f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
reset_op();
op.on_complete = c.get();
@@ -560,22 +569,21 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
std::unique_ptr<Closure> drain_continue;
grpc_slice recv_slice;
- std::unique_ptr<Closure> c = MakeClosure([&](grpc_exec_ctx *exec_ctx,
- grpc_error *error) {
- if (!state.KeepRunning()) return;
- // force outgoing window to be yuge
- s.chttp2_stream()->flow_control.local_window_delta = 1024 * 1024 * 1024;
- s.chttp2_stream()->flow_control.announced_window_delta = 1024 * 1024 * 1024;
- f.chttp2_transport()->flow_control.announced_window = 1024 * 1024 * 1024;
- received = 0;
- reset_op();
- op.on_complete = do_nothing.get();
- op.recv_message = true;
- op.payload->recv_message.recv_message = &recv_stream;
- op.payload->recv_message.recv_message_ready = drain_start.get();
- s.Op(exec_ctx, &op);
- f.PushInput(grpc_slice_ref(incoming_data));
- });
+ std::unique_ptr<Closure> c =
+ MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
+ if (!state.KeepRunning()) return;
+ // force outgoing window to be yuge
+ s.chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
+ f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
+ received = 0;
+ reset_op();
+ op.on_complete = do_nothing.get();
+ op.recv_message = true;
+ op.payload->recv_message.recv_message = &recv_stream;
+ op.payload->recv_message.recv_message_ready = drain_start.get();
+ s.Op(exec_ctx, &op);
+ f.PushInput(grpc_slice_ref(incoming_data));
+ });
drain_start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (recv_stream == NULL) {
diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc
index a0c0414f2f..020ec0461c 100644
--- a/test/cpp/microbenchmarks/bm_cq.cc
+++ b/test/cpp/microbenchmarks/bm_cq.cc
@@ -70,7 +70,7 @@ BENCHMARK(BM_CreateDestroyCore);
static void DoneWithCompletionOnStack(grpc_exec_ctx* exec_ctx, void* arg,
grpc_cq_completion* completion) {}
-class DummyTag final : public CompletionQueueTag {
+class DummyTag final : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) override { return true; }
};
diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
index 06ae342985..25d243a104 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
@@ -21,6 +21,7 @@
#include <benchmark/benchmark.h>
#include <gflags/gflags.h>
#include <fstream>
+
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/iomgr/timer_manager.h"
@@ -142,15 +143,18 @@ class TrickledCHTTP2 : public EndpointPairFixture {
client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
- client->flow_control.remote_window, server->flow_control.remote_window,
- client->flow_control.announced_window,
- server->flow_control.announced_window,
- client_stream ? client_stream->flow_control.remote_window_delta : -1,
- server_stream ? server_stream->flow_control.remote_window_delta : -1,
- client_stream ? client_stream->flow_control.local_window_delta : -1,
- server_stream ? server_stream->flow_control.local_window_delta : -1,
- client_stream ? client_stream->flow_control.announced_window_delta : -1,
- server_stream ? server_stream->flow_control.announced_window_delta : -1,
+ client->flow_control->remote_window_,
+ server->flow_control->remote_window_,
+ client->flow_control->announced_window_,
+ server->flow_control->announced_window_,
+ client_stream ? client_stream->flow_control->remote_window_delta_ : -1,
+ server_stream ? server_stream->flow_control->remote_window_delta_ : -1,
+ client_stream ? client_stream->flow_control->local_window_delta_ : -1,
+ server_stream ? server_stream->flow_control->local_window_delta_ : -1,
+ client_stream ? client_stream->flow_control->announced_window_delta_
+ : -1,
+ server_stream ? server_stream->flow_control->announced_window_delta_
+ : -1,
client->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
client->settings[GRPC_LOCAL_SETTINGS]
diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h
index a7f8504505..9d345a909b 100644
--- a/test/cpp/microbenchmarks/fullstack_fixtures.h
+++ b/test/cpp/microbenchmarks/fullstack_fixtures.h
@@ -25,6 +25,7 @@
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
+#include <grpc/support/atm.h>
#include <grpc/support/log.h>
extern "C" {
@@ -259,7 +260,8 @@ class InProcessCHTTP2 : public EndpointPairFixture {
void AddToLabel(std::ostream& out, benchmark::State& state) {
EndpointPairFixture::AddToLabel(out, state);
out << " writes/iter:"
- << (double)stats_.num_writes / (double)state.iterations();
+ << static_cast<double>(gpr_atm_no_barrier_load(&stats_.num_writes)) /
+ static_cast<double>(state.iterations());
}
private:
diff --git a/test/cpp/microbenchmarks/helpers.cc b/test/cpp/microbenchmarks/helpers.cc
index 6802a0aa99..782f12e99a 100644
--- a/test/cpp/microbenchmarks/helpers.cc
+++ b/test/cpp/microbenchmarks/helpers.cc
@@ -16,6 +16,8 @@
*
*/
+#include <string.h>
+
#include "test/cpp/microbenchmarks/helpers.h"
void TrackCounters::Finish(benchmark::State &state) {
@@ -45,10 +47,14 @@ void TrackCounters::AddToLabel(std::ostream &out, benchmark::State &state) {
<< "/iter:" << ((double)stats.counters[i] / (double)state.iterations());
}
for (int i = 0; i < GRPC_STATS_HISTOGRAM_COUNT; i++) {
- out << " " << grpc_stats_histogram_name[i] << "-median:"
- << grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 50.0)
- << " " << grpc_stats_histogram_name[i] << "-99p:"
- << grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 99.0);
+ std::ostringstream median_ss;
+ median_ss << grpc_stats_histogram_name[i] << "-median";
+ state.counters[median_ss.str()] = benchmark::Counter(
+ grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 50.0));
+ std::ostringstream tail_ss;
+ tail_ss << grpc_stats_histogram_name[i] << "-99p";
+ state.counters[tail_ss.str()] = benchmark::Counter(
+ grpc_stats_histo_percentile(&stats, (grpc_stats_histograms)i, 99.0));
}
#ifdef GPR_LOW_LEVEL_COUNTERS
grpc_memory_counters counters_at_end = grpc_memory_counters_snapshot();
diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD
index 3352269517..0d91d52f22 100644
--- a/test/cpp/qps/BUILD
+++ b/test/cpp/qps/BUILD
@@ -109,6 +109,18 @@ grpc_cc_library(
deps = ["//:gpr"],
)
+grpc_cc_test(
+ name = "inproc_sync_unary_ping_pong_test",
+ srcs = ["inproc_sync_unary_ping_pong_test.cc"],
+ deps = [
+ ":benchmark_config",
+ ":driver_impl",
+ "//:grpc++",
+ "//test/cpp/util:test_config",
+ "//test/cpp/util:test_util",
+ ],
+)
+
grpc_cc_library(
name = "interarrival",
hdrs = ["interarrival.h"],
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index abf755b393..82c6361abd 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -37,10 +37,14 @@
#include "src/cpp/util/core_stats.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
+#include "test/cpp/qps/qps_worker.h"
+#include "test/cpp/qps/server.h"
#include "test/cpp/qps/usage_timer.h"
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/util/test_credentials_provider.h"
+#define INPROC_NAME_PREFIX "qpsinproc:"
+
namespace grpc {
namespace testing {
@@ -226,8 +230,6 @@ class Client {
}
virtual void DestroyMultithreading() = 0;
- virtual void InitThreadFunc(size_t thread_idx) = 0;
- virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
@@ -275,7 +277,6 @@ class Client {
: std::bind(&Client::NextIssueTime, this, thread_idx);
}
- private:
class Thread {
public:
Thread(Client* client, size_t idx)
@@ -295,6 +296,16 @@ class Client {
MergeStatusHistogram(statuses_, s);
}
+ void UpdateHistogram(HistogramEntry* entry) {
+ std::lock_guard<std::mutex> g(mu_);
+ if (entry->value_used()) {
+ histogram_.Add(entry->value());
+ }
+ if (entry->status_used()) {
+ statuses_[entry->status()]++;
+ }
+ }
+
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
@@ -310,29 +321,8 @@ class Client {
wait_loop++;
}
- client_->InitThreadFunc(idx_);
-
- for (;;) {
- // run the loop body
- HistogramEntry entry;
- const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
- // lock, update histogram if needed and see if we're done
- std::lock_guard<std::mutex> g(mu_);
- if (entry.value_used()) {
- histogram_.Add(entry.value());
- }
- if (entry.status_used()) {
- statuses_[entry.status()]++;
- }
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- }
- if (!thread_still_ok ||
- static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) {
- client_->CompleteThread();
- return;
- }
- }
+ client_->ThreadFunc(idx_, this);
+ client_->CompleteThread();
}
std::mutex mu_;
@@ -343,6 +333,12 @@ class Client {
std::thread impl_;
};
+ bool ThreadCompleted() {
+ return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
+ }
+
+ virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
+
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<UsageTimer> timer_;
@@ -422,11 +418,21 @@ class ClientImpl : public Client {
type = config.security_params().cred_type();
}
- channel_ = CreateTestChannel(
- target, type, config.security_params().server_host_override(),
- !config.security_params().use_test_ca(),
- std::shared_ptr<CallCredentials>(), args);
- gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
+ grpc::string inproc_pfx(INPROC_NAME_PREFIX);
+ if (target.find(inproc_pfx) != 0) {
+ channel_ = CreateTestChannel(
+ target, type, config.security_params().server_host_override(),
+ !config.security_params().use_test_ca(),
+ std::shared_ptr<CallCredentials>(), args);
+ gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
+ is_inproc_ = false;
+ } else {
+ grpc::string tgt = target;
+ tgt.erase(0, inproc_pfx.length());
+ int srv_num = std::stoi(tgt);
+ channel_ = (*g_inproc_servers)[srv_num]->InProcessChannel(args);
+ is_inproc_ = true;
+ }
stub_ = create_stub(channel_);
}
Channel* get_channel() { return channel_.get(); }
@@ -434,9 +440,11 @@ class ClientImpl : public Client {
std::unique_ptr<std::thread> WaitForReady() {
return std::unique_ptr<std::thread>(new std::thread([this]() {
- GPR_ASSERT(channel_->WaitForConnected(
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(10, GPR_TIMESPAN))));
+ if (!is_inproc_) {
+ GPR_ASSERT(channel_->WaitForConnected(
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(10, GPR_TIMESPAN))));
+ }
}));
}
@@ -455,6 +463,7 @@ class ClientImpl : public Client {
std::shared_ptr<Channel> channel_;
std::unique_ptr<StubType> stub_;
+ bool is_inproc_;
};
std::vector<ClientChannelInfo> channels_;
std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)>
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 9ed4e0b355..a541f94fa5 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -236,32 +236,46 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
this->EndThreads(); // this needed for resolution
}
- void InitThreadFunc(size_t thread_idx) override final {}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final {
+ void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
void* got_tag;
bool ok;
- if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ HistogramEntry entry;
+ HistogramEntry* entry_ptr = &entry;
+ if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ return;
+ }
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
+ shutdown_mu->lock();
+ while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, entry_ptr, shutdown_mu]() {
+ if (!ctx->RunNextState(ok, entry_ptr)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
+ t->UpdateHistogram(entry_ptr);
// Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ ctx = ClientRpcContext::detag(got_tag);
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ shutdown_mu->lock();
if (shutdown_state_[thread_idx]->shutdown) {
ctx->TryCancel();
delete ctx;
- return true;
- }
- if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
- delete ctx;
+ while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ ctx = ClientRpcContext::detag(got_tag);
+ ctx->TryCancel();
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ return;
}
- return true;
- } else {
- // queue is shutting down, so we must be done
- return true;
}
}
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 94554a46b2..9f20b148eb 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -62,6 +62,25 @@ class SynchronousClient
virtual ~SynchronousClient(){};
+ virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
+ virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
+
+ void ThreadFunc(size_t thread_idx, Thread* t) override {
+ InitThreadFuncImpl(thread_idx);
+ for (;;) {
+ // run the loop body
+ HistogramEntry entry;
+ const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
+ t->UpdateHistogram(&entry);
+ if (!thread_still_ok) {
+ gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+ }
+ if (!thread_still_ok || ThreadCompleted()) {
+ return;
+ }
+ }
+ }
+
protected:
// WaitToIssue returns false if we realize that we need to break out
bool WaitToIssue(int thread_idx) {
@@ -103,9 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
~SynchronousUnaryClient() {}
- void InitThreadFunc(size_t thread_idx) override {}
+ void InitThreadFuncImpl(size_t thread_idx) override {}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
}
@@ -192,13 +211,13 @@ class SynchronousStreamingPingPongClient final
}
}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
messages_issued_[thread_idx] = 0;
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
}
@@ -246,14 +265,14 @@ class SynchronousStreamingFromClientClient final
}
}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
&responses_[thread_idx]);
last_issue_[thread_idx] = UsageTimer::Now();
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// Figure out how to make histogram sensible if this is rate-paced
if (!WaitToIssue(thread_idx)) {
return true;
@@ -282,13 +301,13 @@ class SynchronousStreamingFromServerClient final
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_recv_(num_threads_) {}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] =
stub->StreamingFromServer(&context_[thread_idx], request_);
last_recv_[thread_idx] = UsageTimer::Now();
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
double now = UsageTimer::Now();
@@ -328,11 +347,11 @@ class SynchronousStreamingBothWaysClient final
}
}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;
}
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 4458e389e7..5504c0b96a 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -36,6 +36,7 @@
#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
+#include "test/cpp/qps/client.h"
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
@@ -63,11 +64,11 @@ static std::string get_host(const std::string& worker) {
}
static deque<string> get_workers(const string& env_name) {
+ deque<string> out;
char* env = gpr_getenv(env_name.c_str());
if (!env) {
env = gpr_strdup("");
}
- deque<string> out;
char* p = env;
if (strlen(env) != 0) {
for (;;) {
@@ -187,12 +188,17 @@ static void postprocess_scenario_result(ScenarioResult* result) {
client_queries_per_cpu_sec);
}
+std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;
+
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& initial_server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const grpc::string& qps_server_target_override,
- const grpc::string& credential_type) {
+ const grpc::string& credential_type, bool run_inproc) {
+ if (run_inproc) {
+ g_inproc_servers = new std::vector<grpc::testing::Server*>;
+ }
// Log everything from the driver
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
@@ -210,8 +216,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
ClientConfig result_client_config;
const ServerConfig result_server_config = initial_server_config;
- // Get client, server lists
- auto workers = get_workers("QPS_WORKERS");
+ // Get client, server lists; ignore if inproc test
+ auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>();
ClientConfig client_config = initial_client_config;
// Spawn some local workers if desired
@@ -227,9 +233,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
called_init = true;
}
- int driver_port = grpc_pick_unused_port_or_die();
- local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
char addr[256];
+ // we use port # of -1 to indicate inproc
+ int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1;
+ local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
sprintf(addr, "localhost:%d", driver_port);
if (spawn_local_worker_count < 0) {
workers.push_front(addr);
@@ -265,9 +272,14 @@ std::unique_ptr<ScenarioResult> RunScenario(
for (size_t i = 0; i < num_servers; i++) {
gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
workers[i].c_str(), i);
- servers[i].stub = WorkerService::NewStub(CreateChannel(
- workers[i], GetCredentialsProvider()->GetChannelCredentials(
- credential_type, &channel_args)));
+ if (!run_inproc) {
+ servers[i].stub = WorkerService::NewStub(CreateChannel(
+ workers[i], GetCredentialsProvider()->GetChannelCredentials(
+ credential_type, &channel_args)));
+ } else {
+ servers[i].stub = WorkerService::NewStub(
+ local_workers[i]->InProcessChannel(channel_args));
+ }
ServerConfig server_config = initial_server_config;
if (server_config.core_limit() != 0) {
@@ -289,6 +301,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
// overriding the qps server target only works if there is 1 server
GPR_ASSERT(num_servers == 1);
client_config.add_server_targets(qps_server_target_override);
+ } else if (run_inproc) {
+ std::string cli_target(INPROC_NAME_PREFIX);
+ cli_target += std::to_string(i);
+ client_config.add_server_targets(cli_target);
} else {
std::string host;
char* cli_target;
@@ -312,9 +328,14 @@ std::unique_ptr<ScenarioResult> RunScenario(
const auto& worker = workers[i + num_servers];
gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
worker.c_str(), i + num_servers);
- clients[i].stub = WorkerService::NewStub(
- CreateChannel(worker, GetCredentialsProvider()->GetChannelCredentials(
- credential_type, &channel_args)));
+ if (!run_inproc) {
+ clients[i].stub = WorkerService::NewStub(
+ CreateChannel(worker, GetCredentialsProvider()->GetChannelCredentials(
+ credential_type, &channel_args)));
+ } else {
+ clients[i].stub = WorkerService::NewStub(
+ local_workers[i + num_servers]->InProcessChannel(channel_args));
+ }
ClientConfig per_client_config = client_config;
if (initial_client_config.core_limit() != 0) {
@@ -495,6 +516,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
}
+ if (g_inproc_servers != nullptr) {
+ delete g_inproc_servers;
+ }
postprocess_scenario_result(result.get());
return result;
}
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 29f2776d79..fede4d8045 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -32,7 +32,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const grpc::string& qps_server_target_override,
- const grpc::string& credential_type);
+ const grpc::string& credential_type, bool run_inproc);
bool RunQuit(const grpc::string& credential_type);
} // namespace testing
diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py
index 65553f57f1..1ef8f65b0b 100755
--- a/test/cpp/qps/gen_build_yaml.py
+++ b/test/cpp/qps/gen_build_yaml.py
@@ -85,6 +85,24 @@ print yaml.dump({
if 'scalable' in scenario_json.get('CATEGORIES', [])
] + [
{
+ 'name': 'qps_json_driver',
+ 'shortname': 'qps_json_driver:inproc_%s' % scenario_json['name'],
+ 'args': ['--run_inproc', '--scenarios_json', _scenario_json_string(scenario_json, False)],
+ 'ci_platforms': ['linux'],
+ 'platforms': ['linux'],
+ 'flaky': False,
+ 'language': 'c++',
+ 'boringssl': True,
+ 'defaults': 'boringssl',
+ 'cpu_cost': guess_cpu(scenario_json, False),
+ 'exclude_configs': ['tsan', 'asan'],
+ 'timeout_seconds': 6*60,
+ 'excluded_poll_engines': scenario_json.get('EXCLUDED_POLL_ENGINES', [])
+ }
+ for scenario_json in scenario_config.CXXLanguage().scenarios()
+ if 'inproc' in scenario_json.get('CATEGORIES', [])
+ ] + [
+ {
'name': 'json_run_localhost',
'shortname': 'json_run_localhost:%s_low_thread_count' % scenario_json['name'],
'args': ['--scenarios_json', _scenario_json_string(scenario_json, True)],
diff --git a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc
new file mode 100644
index 0000000000..f2e977d48b
--- /dev/null
+++ b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <set>
+
+#include <grpc/support/log.h>
+
+#include "test/cpp/qps/benchmark_config.h"
+#include "test/cpp/qps/driver.h"
+#include "test/cpp/qps/report.h"
+#include "test/cpp/qps/server.h"
+#include "test/cpp/util/test_config.h"
+#include "test/cpp/util/test_credentials_provider.h"
+
+namespace grpc {
+namespace testing {
+
+static const int WARMUP = 5;
+static const int BENCHMARK = 5;
+
+static void RunSynchronousUnaryPingPong() {
+ gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
+
+ ClientConfig client_config;
+ client_config.set_client_type(SYNC_CLIENT);
+ client_config.set_outstanding_rpcs_per_channel(1);
+ client_config.set_client_channels(1);
+ client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
+
+ ServerConfig server_config;
+ server_config.set_server_type(SYNC_SERVER);
+
+ const auto result =
+ RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
+ kInsecureCredentialsType, true);
+
+ GetReporter()->ReportQPS(*result);
+ GetReporter()->ReportLatency(*result);
+}
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc::testing::InitTest(&argc, &argv, true);
+
+ grpc::testing::RunSynchronousUnaryPingPong();
+
+ return 0;
+}
diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc
index 1d394b216f..4b788eae73 100644
--- a/test/cpp/qps/json_run_localhost.cc
+++ b/test/cpp/qps/json_run_localhost.cc
@@ -117,8 +117,14 @@ int main(int argc, char** argv) {
}
}
- delete g_driver;
- g_driver = NULL;
- for (int i = 0; i < kNumWorkers; ++i) delete g_workers[i];
+ if (g_driver != nullptr) {
+ delete g_driver;
+ }
+ g_driver = nullptr;
+ for (int i = 0; i < kNumWorkers; ++i) {
+ if (g_workers[i] != nullptr) {
+ delete g_workers[i];
+ }
+ }
GPR_ASSERT(driver_join_status == 0);
}
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index cca59f64d8..1672527426 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -30,6 +30,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/parse_json.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/qps/server.h"
#include "test/cpp/util/test_config.h"
#include "test/cpp/util/test_credentials_provider.h"
@@ -64,6 +65,7 @@ DEFINE_string(json_file_out, "", "File to write the JSON output to.");
DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
"Credential type for communication with workers");
+DEFINE_bool(run_inproc, false, "Perform an in-process transport test");
namespace grpc {
namespace testing {
@@ -75,8 +77,9 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
RunScenario(scenario.client_config(), scenario.num_clients(),
scenario.server_config(), scenario.num_servers(),
scenario.warmup_seconds(), scenario.benchmark_seconds(),
- scenario.spawn_local_worker_count(),
- FLAGS_qps_server_target_override, FLAGS_credential_type);
+ !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2,
+ FLAGS_qps_server_target_override, FLAGS_credential_type,
+ FLAGS_run_inproc);
// Amend the result with scenario config. Eventually we should adjust
// RunScenario contract so we don't need to touch the result here.
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index 069b3fa076..df929b9811 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -24,6 +24,7 @@
#include "test/cpp/qps/benchmark_config.h"
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/qps/server.h"
#include "test/cpp/util/test_config.h"
#include "test/cpp/util/test_credentials_provider.h"
@@ -49,8 +50,9 @@ static void RunQPS() {
server_config.set_server_type(ASYNC_SERVER);
server_config.set_async_server_threads(8);
- const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP,
- BENCHMARK, -2, "", kInsecureCredentialsType);
+ const auto result =
+ RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
+ kInsecureCredentialsType, false);
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index d20bc1b074..c288b03ec5 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -225,11 +225,14 @@ class WorkerServiceImpl final : public WorkerService::Service {
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
}
- if (server_port_ != 0) {
+ if (server_port_ > 0) {
args.mutable_setup()->set_port(server_port_);
}
gpr_log(GPR_INFO, "RunServerBody: about to create server");
auto server = CreateServer(args.setup());
+ if (g_inproc_servers != nullptr) {
+ g_inproc_servers->push_back(server.get());
+ }
if (!server) {
return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
}
@@ -269,17 +272,17 @@ QpsWorker::QpsWorker(int driver_port, int server_port,
impl_.reset(new WorkerServiceImpl(server_port, this));
gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0));
- char* server_address = NULL;
- gpr_join_host_port(&server_address, "::", driver_port);
-
ServerBuilder builder;
- builder.AddListeningPort(
- server_address,
- GetCredentialsProvider()->GetServerCredentials(credential_type));
+ if (driver_port >= 0) {
+ char* server_address = nullptr;
+ gpr_join_host_port(&server_address, "::", driver_port);
+ builder.AddListeningPort(
+ server_address,
+ GetCredentialsProvider()->GetServerCredentials(credential_type));
+ gpr_free(server_address);
+ }
builder.RegisterService(impl_.get());
- gpr_free(server_address);
-
server_ = builder.BuildAndStart();
}
diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h
index 360125fb17..a5167426d0 100644
--- a/test/cpp/qps/qps_worker.h
+++ b/test/cpp/qps/qps_worker.h
@@ -21,17 +21,21 @@
#include <memory>
+#include <grpc++/server.h>
+#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/config.h>
#include <grpc/support/atm.h>
-namespace grpc {
+#include "test/cpp/qps/server.h"
-class Server;
+namespace grpc {
namespace testing {
class WorkerServiceImpl;
+extern std::vector<grpc::testing::Server*>* g_inproc_servers;
+
class QpsWorker {
public:
explicit QpsWorker(int driver_port, int server_port,
@@ -41,9 +45,13 @@ class QpsWorker {
bool Done() const;
void MarkDone();
+ std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args) {
+ return server_->InProcessChannel(args);
+ }
+
private:
std::unique_ptr<WorkerServiceImpl> impl_;
- std::unique_ptr<Server> server_;
+ std::unique_ptr<grpc::Server> server_;
gpr_atm done_;
};
diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
index 137b33ee25..bb415e9d63 100644
--- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
@@ -23,6 +23,7 @@
#include "test/cpp/qps/benchmark_config.h"
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
+#include "test/cpp/qps/server.h"
#include "test/cpp/util/test_config.h"
#include "test/cpp/util/test_credentials_provider.h"
@@ -52,8 +53,9 @@ static void RunSynchronousUnaryPingPong() {
client_config.mutable_security_params()->CopyFrom(security);
server_config.mutable_security_params()->CopyFrom(security);
- const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP,
- BENCHMARK, -2, "", kInsecureCredentialsType);
+ const auto result =
+ RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
+ kInsecureCredentialsType, false);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index 16d101d5e6..9da33566dd 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -42,10 +42,9 @@ class Server {
explicit Server(const ServerConfig& config)
: timer_(new UsageTimer), last_reset_poll_count_(0) {
cores_ = gpr_cpu_num_cores();
- if (config.port()) {
+ if (config.port()) { // positive for a fixed port, negative for inproc
port_ = config.port();
-
- } else {
+ } else { // zero for dynamic port
port_ = grpc_pick_unused_port_or_die();
}
}
@@ -115,6 +114,9 @@ class Server {
return 0;
}
+ virtual std::shared_ptr<Channel> InProcessChannel(
+ const ChannelArguments& args) = 0;
+
protected:
static void ApplyConfigToBuilder(const ServerConfig& config,
ServerBuilder* builder) {
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 4a82f98199..1c1a5636a9 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -70,18 +70,21 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerAsyncReaderWriter<ResponseType, RequestType> *,
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_both_ways_function,
- std::function<grpc::Status(const PayloadConfig &, const RequestType *,
+ std::function<grpc::Status(const PayloadConfig &, RequestType *,
ResponseType *)>
process_rpc)
: Server(config) {
- char *server_address = NULL;
-
- gpr_join_host_port(&server_address, "::", port());
-
ServerBuilder builder;
- builder.AddListeningPort(server_address,
- Server::CreateServerCredentials(config));
- gpr_free(server_address);
+
+ auto port_num = port();
+ // Negative port number means inproc server, so no listen port needed
+ if (port_num >= 0) {
+ char *server_address = NULL;
+ gpr_join_host_port(&server_address, "::", port_num);
+ builder.AddListeningPort(server_address,
+ Server::CreateServerCredentials(config));
+ gpr_free(server_address);
+ }
register_service(&builder, &async_service_);
@@ -183,6 +186,11 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
return count;
}
+ std::shared_ptr<Channel> InProcessChannel(
+ const ChannelArguments &args) override {
+ return server_->InProcessChannel(args);
+ }
+
private:
void ShutdownThreadFunc() {
// TODO (vpai): Remove this deadline and allow Shutdown to finish properly
@@ -194,23 +202,31 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
- ServerRpcContext *ctx = detag(got_tag);
+ if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ return;
+ }
+ ServerRpcContext *ctx;
+ std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex;
+ do {
+ ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ mu_ptr->lock();
if (shutdown_state_[thread_idx]->shutdown) {
+ mu_ptr->unlock();
return;
}
- std::lock_guard<ServerRpcContext> l2(*ctx);
- const bool still_going = ctx->RunNextState(ok);
- // if this RPC context is done, refresh it
- if (!still_going) {
- ctx->Reset();
- }
- }
- return;
+ } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, mu_ptr]() {
+ ctx->lock();
+ if (!ctx->RunNextState(ok)) {
+ ctx->Reset();
+ }
+ ctx->unlock();
+ mu_ptr->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
}
class ServerRpcContext {
@@ -238,7 +254,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncResponseWriter<ResponseType> *,
void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextUnaryImpl::invoker),
@@ -284,8 +300,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncResponseWriter<ResponseType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
@@ -296,7 +311,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerContextType *,
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextStreamingImpl::request_done),
@@ -364,8 +379,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerContextType *,
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
};
@@ -377,7 +391,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncReader<ResponseType, RequestType> *,
void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
@@ -435,8 +449,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncReader<ResponseType, RequestType> *,
void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
};
@@ -447,7 +460,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncWriter<ResponseType> *, void *)>
request_method,
- std::function<grpc::Status(const RequestType *, ResponseType *)>
+ std::function<grpc::Status(RequestType *, ResponseType *)>
invoke_method)
: srv_ctx_(new ServerContextType),
next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
@@ -504,8 +517,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::function<void(ServerContextType *, RequestType *,
grpc::ServerAsyncWriter<ResponseType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType *, ResponseType *)>
- invoke_method_;
+ std::function<grpc::Status(RequestType *, ResponseType *)> invoke_method_;
grpc::ServerAsyncWriter<ResponseType> stream_;
};
@@ -534,8 +546,7 @@ static void RegisterGenericService(ServerBuilder *builder,
builder->RegisterAsyncGenericService(service);
}
-static Status ProcessSimpleRPC(const PayloadConfig &,
- const SimpleRequest *request,
+static Status ProcessSimpleRPC(const PayloadConfig &, SimpleRequest *request,
SimpleResponse *response) {
if (request->response_size() > 0) {
if (!Server::SetPayload(request->response_type(), request->response_size(),
@@ -543,12 +554,17 @@ static Status ProcessSimpleRPC(const PayloadConfig &,
return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
}
}
+ // We are done using the request. Clear it to reduce working memory.
+ // This proves to reduce cache misses in large message size cases.
+ request->Clear();
return Status::OK;
}
static Status ProcessGenericRPC(const PayloadConfig &payload_config,
- const ByteBuffer *request,
- ByteBuffer *response) {
+ ByteBuffer *request, ByteBuffer *response) {
+ // We are done using the request. Clear it to reduce working memory.
+ // This proves to reduce cache misses in large message size cases.
+ request->Clear();
int resp_size = payload_config.bytebuf_params().resp_size();
std::unique_ptr<char[]> buf(new char[resp_size]);
Slice slice(buf.get(), resp_size);
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index 9954e2c0bf..4ef57bd08b 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -156,12 +156,15 @@ class SynchronousServer final : public grpc::testing::Server {
explicit SynchronousServer(const ServerConfig& config) : Server(config) {
ServerBuilder builder;
- char* server_address = NULL;
-
- gpr_join_host_port(&server_address, "::", port());
- builder.AddListeningPort(server_address,
- Server::CreateServerCredentials(config));
- gpr_free(server_address);
+ auto port_num = port();
+ // Negative port number means inproc server, so no listen port needed
+ if (port_num >= 0) {
+ char* server_address = NULL;
+ gpr_join_host_port(&server_address, "::", port_num);
+ builder.AddListeningPort(server_address,
+ Server::CreateServerCredentials(config));
+ gpr_free(server_address);
+ }
ApplyConfigToBuilder(config, &builder);
@@ -170,6 +173,11 @@ class SynchronousServer final : public grpc::testing::Server {
impl_ = builder.BuildAndStart();
}
+ std::shared_ptr<Channel> InProcessChannel(
+ const ChannelArguments& args) override {
+ return impl_->InProcessChannel(args);
+ }
+
private:
BenchmarkServiceImpl service_;
std::unique_ptr<grpc::Server> impl_;
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index 27010b7315..38287464d9 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -20,6 +20,7 @@
#include <chrono>
#include <thread>
+#include <vector>
#include <gflags/gflags.h>
#include <grpc/grpc.h>
@@ -41,6 +42,8 @@ static void sigint_handler(int x) { got_sigint = true; }
namespace grpc {
namespace testing {
+std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;
+
static void RunServer() {
QpsWorker worker(FLAGS_driver_port, FLAGS_server_port, FLAGS_credential_type);
diff --git a/test/cpp/util/create_test_channel.cc b/test/cpp/util/create_test_channel.cc
index 34b6d60d01..4d047473b9 100644
--- a/test/cpp/util/create_test_channel.cc
+++ b/test/cpp/util/create_test_channel.cc
@@ -74,7 +74,7 @@ std::shared_ptr<Channel> CreateTestChannel(
ChannelArguments channel_args(args);
std::shared_ptr<ChannelCredentials> channel_creds;
if (cred_type.empty()) {
- return CreateChannel(server, InsecureChannelCredentials());
+ return CreateCustomChannel(server, InsecureChannelCredentials(), args);
} else if (cred_type == testing::kTlsCredentialsType) { // cred_type == "ssl"
if (use_prod_roots) {
gpr_once_init(&g_once_init_add_prod_ssl_provider, &AddProdSslType);
@@ -101,7 +101,7 @@ std::shared_ptr<Channel> CreateTestChannel(
cred_type, &channel_args);
GPR_ASSERT(channel_creds != nullptr);
- return CreateChannel(server, channel_creds);
+ return CreateCustomChannel(server, channel_creds, args);
}
}
diff --git a/test/cpp/util/error_details_test.cc b/test/cpp/util/error_details_test.cc
index 69a6876a3f..16a00fb201 100644
--- a/test/cpp/util/error_details_test.cc
+++ b/test/cpp/util/error_details_test.cc
@@ -82,7 +82,7 @@ TEST(SetTest, NullInput) {
TEST(SetTest, OutOfScopeErrorCode) {
google::rpc::Status expected;
- expected.set_code(20); // Out of scope (DATA_LOSS is 15).
+ expected.set_code(17); // Out of scope (UNAUTHENTICATED is 16).
expected.set_message("I am an error message");
testing::EchoRequest expected_details;
expected_details.set_message(grpc::string(100, '\0'));
@@ -96,6 +96,24 @@ TEST(SetTest, OutOfScopeErrorCode) {
EXPECT_EQ(expected.SerializeAsString(), to.error_details());
}
+TEST(SetTest, ValidScopeErrorCode) {
+ for (int c = StatusCode::OK; c <= StatusCode::UNAUTHENTICATED; c++) {
+ google::rpc::Status expected;
+ expected.set_code(c);
+ expected.set_message("I am an error message");
+ testing::EchoRequest expected_details;
+ expected_details.set_message(grpc::string(100, '\0'));
+ expected.add_details()->PackFrom(expected_details);
+
+ Status to;
+ Status s = SetErrorDetails(expected, &to);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(c, to.error_code());
+ EXPECT_EQ(expected.message(), to.error_message());
+ EXPECT_EQ(expected.SerializeAsString(), to.error_details());
+ }
+}
+
} // namespace
} // namespace grpc