aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/client/credentials_test.cc4
-rw-r--r--test/cpp/codegen/BUILD2
-rw-r--r--test/cpp/codegen/compiler_test_golden150
-rw-r--r--test/cpp/codegen/compiler_test_mock_golden34
-rw-r--r--test/cpp/codegen/golden_file_test.cc31
-rw-r--r--test/cpp/end2end/BUILD23
-rw-r--r--test/cpp/end2end/async_end2end_test.cc28
-rw-r--r--test/cpp/end2end/end2end_test.cc33
-rw-r--r--test/cpp/end2end/filter_end2end_test.cc5
-rw-r--r--test/cpp/end2end/grpclb_end2end_test.cc639
-rw-r--r--test/cpp/end2end/mock_test.cc279
-rw-r--r--test/cpp/end2end/test_service_impl.cc5
-rw-r--r--test/cpp/grpclb/grpclb_api_test.cc14
-rw-r--r--test/cpp/grpclb/grpclb_test.cc68
-rw-r--r--test/cpp/interop/BUILD90
-rw-r--r--test/cpp/interop/client.cc7
-rw-r--r--test/cpp/interop/client_helper.cc3
-rw-r--r--test/cpp/interop/client_helper.h4
-rw-r--r--test/cpp/interop/http2_client.cc2
-rw-r--r--test/cpp/interop/interop_client.cc20
-rw-r--r--test/cpp/microbenchmarks/BUILD11
-rw-r--r--test/cpp/microbenchmarks/bm_call_create.cc222
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_hpack.cc154
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_transport.cc90
-rw-r--r--test/cpp/microbenchmarks/bm_cq.cc16
-rw-r--r--test/cpp/microbenchmarks/bm_cq_multiple_threads.cc162
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc15
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc19
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_trickle.cc270
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc9
-rw-r--r--test/cpp/microbenchmarks/bm_pollset.cc36
-rw-r--r--test/cpp/microbenchmarks/fullstack_fixtures.h103
-rw-r--r--test/cpp/microbenchmarks/helpers.cc4
-rw-r--r--test/cpp/microbenchmarks/helpers.h3
-rw-r--r--test/cpp/performance/writes_per_rpc_test.cc4
-rw-r--r--test/cpp/qps/benchmark_config.cc15
-rw-r--r--test/cpp/qps/client.h7
-rw-r--r--test/cpp/qps/client_async.cc462
-rw-r--r--test/cpp/qps/client_sync.cc238
-rw-r--r--test/cpp/qps/qps_worker.cc12
-rw-r--r--test/cpp/qps/report.cc35
-rw-r--r--test/cpp/qps/report.h18
-rw-r--r--test/cpp/qps/server_async.cc190
-rw-r--r--test/cpp/qps/server_sync.cc111
-rw-r--r--test/cpp/thread_manager/thread_manager_test.cc2
-rw-r--r--test/cpp/util/BUILD49
-rw-r--r--test/cpp/util/cli_call.cc2
-rw-r--r--test/cpp/util/error_details_test.cc120
-rw-r--r--test/cpp/util/grpc_tool.cc1
49 files changed, 3358 insertions, 463 deletions
diff --git a/test/cpp/client/credentials_test.cc b/test/cpp/client/credentials_test.cc
index 418a54439a..23b3b2ef3f 100644
--- a/test/cpp/client/credentials_test.cc
+++ b/test/cpp/client/credentials_test.cc
@@ -50,6 +50,10 @@ TEST_F(CredentialsTest, InvalidGoogleRefreshToken) {
EXPECT_EQ(static_cast<CallCredentials*>(nullptr), bad1.get());
}
+TEST_F(CredentialsTest, DefaultCredentials) {
+ auto creds = GoogleDefaultCredentials();
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/codegen/BUILD b/test/cpp/codegen/BUILD
index 90325414b1..f974e63eb4 100644
--- a/test/cpp/codegen/BUILD
+++ b/test/cpp/codegen/BUILD
@@ -70,7 +70,7 @@ grpc_cc_test(
grpc_cc_test(
name = "golden_file_test",
srcs = ["golden_file_test.cc"],
- args = ["--generated_file_path=$(GENDIR)/src/proto/grpc/testing/compiler_test.grpc.pb.h"],
+ args = ["--generated_file_path=$(GENDIR)/src/proto/grpc/testing/"],
data = [
":compiler_test_golden",
"//src/proto/grpc/testing:_compiler_test_proto_grpc_codegen",
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden
index fd26a17ac1..eb12d371ea 100644
--- a/test/cpp/codegen/compiler_test_golden
+++ b/test/cpp/codegen/compiler_test_golden
@@ -69,6 +69,9 @@ namespace testing {
// ServiceA leading comment 1
class ServiceA final {
public:
+ static constexpr char const* service_full_name() {
+ return "grpc.testing.ServiceA";
+ }
class StubInterface {
public:
virtual ~StubInterface() {}
@@ -89,10 +92,30 @@ class ServiceA final {
return std::unique_ptr< ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag));
}
// MethodA2 trailing comment 1
+ // Method A3 leading comment 1
+ std::unique_ptr< ::grpc::ClientReaderInterface< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) {
+ return std::unique_ptr< ::grpc::ClientReaderInterface< ::grpc::testing::Response>>(MethodA3Raw(context, request));
+ }
+ std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) {
+ return std::unique_ptr< ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag));
+ }
+ // Method A3 trailing comment 1
+ // Method A4 leading comment 1
+ std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) {
+ return std::unique_ptr< ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(MethodA4Raw(context));
+ }
+ std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
+ return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag));
+ }
+ // Method A4 trailing comment 1
private:
virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0;
virtual ::grpc::ClientWriterInterface< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) = 0;
virtual ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) = 0;
+ virtual ::grpc::ClientReaderInterface< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) = 0;
+ virtual ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) = 0;
+ virtual ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) = 0;
+ virtual ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) = 0;
};
class Stub final : public StubInterface {
public:
@@ -107,14 +130,32 @@ class ServiceA final {
std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>> AsyncMethodA2(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) {
return std::unique_ptr< ::grpc::ClientAsyncWriter< ::grpc::testing::Request>>(AsyncMethodA2Raw(context, response, cq, tag));
}
+ std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>> MethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request) {
+ return std::unique_ptr< ::grpc::ClientReader< ::grpc::testing::Response>>(MethodA3Raw(context, request));
+ }
+ std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>> AsyncMethodA3(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) {
+ return std::unique_ptr< ::grpc::ClientAsyncReader< ::grpc::testing::Response>>(AsyncMethodA3Raw(context, request, cq, tag));
+ }
+ std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> MethodA4(::grpc::ClientContext* context) {
+ return std::unique_ptr< ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(MethodA4Raw(context));
+ }
+ std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> AsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) {
+ return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(AsyncMethodA4Raw(context, cq, tag));
+ }
private:
std::shared_ptr< ::grpc::ChannelInterface> channel_;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientWriter< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) override;
::grpc::ClientAsyncWriter< ::grpc::testing::Request>* AsyncMethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag) override;
+ ::grpc::ClientReader< ::grpc::testing::Response>* MethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request) override;
+ ::grpc::ClientAsyncReader< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) override;
+ ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override;
+ ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override;
const ::grpc::RpcMethod rpcmethod_MethodA1_;
const ::grpc::RpcMethod rpcmethod_MethodA2_;
+ const ::grpc::RpcMethod rpcmethod_MethodA3_;
+ const ::grpc::RpcMethod rpcmethod_MethodA4_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
@@ -131,6 +172,12 @@ class ServiceA final {
// Method A2 leading comment 2
virtual ::grpc::Status MethodA2(::grpc::ServerContext* context, ::grpc::ServerReader< ::grpc::testing::Request>* reader, ::grpc::testing::Response* response);
// MethodA2 trailing comment 1
+ // Method A3 leading comment 1
+ virtual ::grpc::Status MethodA3(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::ServerWriter< ::grpc::testing::Response>* writer);
+ // Method A3 trailing comment 1
+ // Method A4 leading comment 1
+ virtual ::grpc::Status MethodA4(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::grpc::testing::Response, ::grpc::testing::Request>* stream);
+ // Method A4 trailing comment 1
};
template <class BaseClass>
class WithAsyncMethod_MethodA1 : public BaseClass {
@@ -172,7 +219,47 @@ class ServiceA final {
::grpc::Service::RequestAsyncClientStreaming(1, context, reader, new_call_cq, notification_cq, tag);
}
};
- typedef WithAsyncMethod_MethodA1<WithAsyncMethod_MethodA2<Service > > AsyncService;
+ template <class BaseClass>
+ class WithAsyncMethod_MethodA3 : public BaseClass {
+ private:
+ void BaseClassMustBeDerivedFromService(const Service *service) {}
+ public:
+ WithAsyncMethod_MethodA3() {
+ ::grpc::Service::MarkMethodAsync(2);
+ }
+ ~WithAsyncMethod_MethodA3() override {
+ BaseClassMustBeDerivedFromService(this);
+ }
+ // disable synchronous version of this method
+ ::grpc::Status MethodA3(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::ServerWriter< ::grpc::testing::Response>* writer) final override {
+ abort();
+ return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+ }
+ void RequestMethodA3(::grpc::ServerContext* context, ::grpc::testing::Request* request, ::grpc::ServerAsyncWriter< ::grpc::testing::Response>* writer, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
+ ::grpc::Service::RequestAsyncServerStreaming(2, context, request, writer, new_call_cq, notification_cq, tag);
+ }
+ };
+ template <class BaseClass>
+ class WithAsyncMethod_MethodA4 : public BaseClass {
+ private:
+ void BaseClassMustBeDerivedFromService(const Service *service) {}
+ public:
+ WithAsyncMethod_MethodA4() {
+ ::grpc::Service::MarkMethodAsync(3);
+ }
+ ~WithAsyncMethod_MethodA4() override {
+ BaseClassMustBeDerivedFromService(this);
+ }
+ // disable synchronous version of this method
+ ::grpc::Status MethodA4(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::grpc::testing::Response, ::grpc::testing::Request>* stream) final override {
+ abort();
+ return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+ }
+ void RequestMethodA4(::grpc::ServerContext* context, ::grpc::ServerAsyncReaderWriter< ::grpc::testing::Response, ::grpc::testing::Request>* stream, ::grpc::CompletionQueue* new_call_cq, ::grpc::ServerCompletionQueue* notification_cq, void *tag) {
+ ::grpc::Service::RequestAsyncBidiStreaming(3, context, stream, new_call_cq, notification_cq, tag);
+ }
+ };
+ typedef WithAsyncMethod_MethodA1<WithAsyncMethod_MethodA2<WithAsyncMethod_MethodA3<WithAsyncMethod_MethodA4<Service > > > > AsyncService;
template <class BaseClass>
class WithGenericMethod_MethodA1 : public BaseClass {
private:
@@ -208,6 +295,40 @@ class ServiceA final {
}
};
template <class BaseClass>
+ class WithGenericMethod_MethodA3 : public BaseClass {
+ private:
+ void BaseClassMustBeDerivedFromService(const Service *service) {}
+ public:
+ WithGenericMethod_MethodA3() {
+ ::grpc::Service::MarkMethodGeneric(2);
+ }
+ ~WithGenericMethod_MethodA3() override {
+ BaseClassMustBeDerivedFromService(this);
+ }
+ // disable synchronous version of this method
+ ::grpc::Status MethodA3(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::ServerWriter< ::grpc::testing::Response>* writer) final override {
+ abort();
+ return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+ }
+ };
+ template <class BaseClass>
+ class WithGenericMethod_MethodA4 : public BaseClass {
+ private:
+ void BaseClassMustBeDerivedFromService(const Service *service) {}
+ public:
+ WithGenericMethod_MethodA4() {
+ ::grpc::Service::MarkMethodGeneric(3);
+ }
+ ~WithGenericMethod_MethodA4() override {
+ BaseClassMustBeDerivedFromService(this);
+ }
+ // disable synchronous version of this method
+ ::grpc::Status MethodA4(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::grpc::testing::Response, ::grpc::testing::Request>* stream) final override {
+ abort();
+ return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+ }
+ };
+ template <class BaseClass>
class WithStreamedUnaryMethod_MethodA1 : public BaseClass {
private:
void BaseClassMustBeDerivedFromService(const Service *service) {}
@@ -228,13 +349,36 @@ class ServiceA final {
virtual ::grpc::Status StreamedMethodA1(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< ::grpc::testing::Request,::grpc::testing::Response>* server_unary_streamer) = 0;
};
typedef WithStreamedUnaryMethod_MethodA1<Service > StreamedUnaryService;
- typedef Service SplitStreamedService;
- typedef WithStreamedUnaryMethod_MethodA1<Service > StreamedService;
+ template <class BaseClass>
+ class WithSplitStreamingMethod_MethodA3 : public BaseClass {
+ private:
+ void BaseClassMustBeDerivedFromService(const Service *service) {}
+ public:
+ WithSplitStreamingMethod_MethodA3() {
+ ::grpc::Service::MarkMethodStreamed(2,
+ new ::grpc::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2)));
+ }
+ ~WithSplitStreamingMethod_MethodA3() override {
+ BaseClassMustBeDerivedFromService(this);
+ }
+ // disable regular version of this method
+ ::grpc::Status MethodA3(::grpc::ServerContext* context, const ::grpc::testing::Request* request, ::grpc::ServerWriter< ::grpc::testing::Response>* writer) final override {
+ abort();
+ return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
+ }
+ // replace default version of method with split streamed
+ virtual ::grpc::Status StreamedMethodA3(::grpc::ServerContext* context, ::grpc::ServerSplitStreamer< ::grpc::testing::Request,::grpc::testing::Response>* server_split_streamer) = 0;
+ };
+ typedef WithSplitStreamingMethod_MethodA3<Service > SplitStreamedService;
+ typedef WithStreamedUnaryMethod_MethodA1<WithSplitStreamingMethod_MethodA3<Service > > StreamedService;
};
// ServiceB leading comment 1
class ServiceB final {
public:
+ static constexpr char const* service_full_name() {
+ return "grpc.testing.ServiceB";
+ }
class StubInterface {
public:
virtual ~StubInterface() {}
diff --git a/test/cpp/codegen/compiler_test_mock_golden b/test/cpp/codegen/compiler_test_mock_golden
new file mode 100644
index 0000000000..8e4b4d5911
--- /dev/null
+++ b/test/cpp/codegen/compiler_test_mock_golden
@@ -0,0 +1,34 @@
+// Generated by the gRPC C++ plugin.
+// If you make any local change, they will be lost.
+// source: src/proto/grpc/testing/compiler_test.proto
+
+#include "src/proto/grpc/testing/compiler_test.pb.h"
+#include "src/proto/grpc/testing/compiler_test.grpc.pb.h"
+
+#include <grpc++/impl/codegen/async_stream.h>
+#include <grpc++/impl/codegen/sync_stream.h>
+#include <gmock/gmock.h>
+namespace grpc {
+namespace testing {
+
+class MockServiceAStub : public ServiceA::StubInterface {
+ public:
+ MOCK_METHOD3(MethodA1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response));
+ MOCK_METHOD3(AsyncMethodA1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq));
+ MOCK_METHOD2(MethodA2Raw, ::grpc::ClientWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response));
+ MOCK_METHOD4(AsyncMethodA2Raw, ::grpc::ClientAsyncWriterInterface< ::grpc::testing::Request>*(::grpc::ClientContext* context, ::grpc::testing::Response* response, ::grpc::CompletionQueue* cq, void* tag));
+ MOCK_METHOD2(MethodA3Raw, ::grpc::ClientReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request));
+ MOCK_METHOD4(AsyncMethodA3Raw, ::grpc::ClientAsyncReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag));
+ MOCK_METHOD1(MethodA4Raw, ::grpc::ClientReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context));
+ MOCK_METHOD3(AsyncMethodA4Raw, ::grpc::ClientAsyncReaderWriterInterface<::grpc::testing::Request, ::grpc::testing::Response>*(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag));
+};
+
+class MockServiceBStub : public ServiceB::StubInterface {
+ public:
+ MOCK_METHOD3(MethodB1, ::grpc::Status(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response));
+ MOCK_METHOD3(AsyncMethodB1Raw, ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>*(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq));
+};
+
+} // namespace grpc
+} // namespace testing
+
diff --git a/test/cpp/codegen/golden_file_test.cc b/test/cpp/codegen/golden_file_test.cc
index 158a4d933c..7789ac738b 100644
--- a/test/cpp/codegen/golden_file_test.cc
+++ b/test/cpp/codegen/golden_file_test.cc
@@ -37,16 +37,18 @@
#include <gflags/gflags.h>
#include <gtest/gtest.h>
-DEFINE_string(generated_file_path, "",
- "path to the generated compiler_test.grpc.pb.h file");
+DEFINE_string(
+ generated_file_path, "",
+ "path to the directory containing generated files compiler_test.grpc.pb.h"
+ "and compiler_test_mock.grpc.pb.h");
const char kGoldenFilePath[] = "test/cpp/codegen/compiler_test_golden";
+const char kMockGoldenFilePath[] = "test/cpp/codegen/compiler_test_mock_golden";
-TEST(GoldenFileTest, TestGeneratedFile) {
- ASSERT_FALSE(FLAGS_generated_file_path.empty());
-
- std::ifstream generated(FLAGS_generated_file_path);
- std::ifstream golden(kGoldenFilePath);
+void run_test(std::basic_string<char> generated_file,
+ std::basic_string<char> golden_file) {
+ std::ifstream generated(generated_file);
+ std::ifstream golden(golden_file);
ASSERT_TRUE(generated.good());
ASSERT_TRUE(golden.good());
@@ -61,8 +63,23 @@ TEST(GoldenFileTest, TestGeneratedFile) {
golden.close();
}
+TEST(GoldenFileTest, TestGeneratedFile) {
+ run_test(FLAGS_generated_file_path + "compiler_test.grpc.pb.h",
+ kGoldenFilePath);
+}
+
+TEST(GoldenMockFileTest, TestGeneratedMockFile) {
+ run_test(FLAGS_generated_file_path + "compiler_test_mock.grpc.pb.h",
+ kMockGoldenFilePath);
+}
+
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
::google::ParseCommandLineFlags(&argc, &argv, true);
+ if (FLAGS_generated_file_path.empty()) {
+ FLAGS_generated_file_path = "gens/src/proto/grpc/testing/";
+ }
+ if (FLAGS_generated_file_path.back() != '/')
+ FLAGS_generated_file_path.append("/");
return RUN_ALL_TESTS();
}
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index 5390fe15c6..cbfc720916 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -53,6 +53,7 @@ grpc_cc_test(
"//:gpr",
"//:grpc",
"//:grpc++",
+ "//src/proto/grpc/health/v1:health_proto",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
@@ -230,6 +231,28 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "grpclb_end2end_test",
+ srcs = ["grpclb_end2end_test.cc"],
+ deps = [
+ ":test_service_impl",
+ "//:gpr",
+ "//:grpc",
+ "//:grpc++",
+ "//src/proto/grpc/lb/v1:load_balancer_proto",
+ "//src/proto/grpc/testing:echo_messages_proto",
+ "//src/proto/grpc/testing:echo_proto",
+ "//src/proto/grpc/testing/duplicate:echo_duplicate_proto",
+ "//test/core/end2end:fake_resolver",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ "//test/cpp/util:test_util",
+ ],
+ external_deps = [
+ "gtest",
+ ],
+)
+
+grpc_cc_test(
name = "proto_server_reflection_test",
srcs = ["proto_server_reflection_test.cc"],
deps = [
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 317c8e4032..8d59cbb7e0 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -38,6 +38,7 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
+#include <grpc++/ext/health_check_service_server_builder_option.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
@@ -48,6 +49,7 @@
#include <grpc/support/tls.h>
#include "src/core/lib/iomgr/port.h"
+#include "src/proto/grpc/health/v1/health.grpc.pb.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@@ -225,13 +227,15 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
class TestScenario {
public:
- TestScenario(bool non_block, const grpc::string& creds_type,
+ TestScenario(bool non_block, const grpc::string& creds_type, bool hcs,
const grpc::string& content)
: disable_blocking(non_block),
+ health_check_service(hcs),
credentials_type(creds_type),
message_content(content) {}
void Log() const;
bool disable_blocking;
+ bool health_check_service;
// Although the below grpc::string's are logically const, we can't declare
// them const because of a limitation in the way old compilers (e.g., gcc-4.4)
// manage vector insertion using a copy constructor
@@ -244,6 +248,8 @@ static std::ostream& operator<<(std::ostream& out,
return out << "TestScenario{disable_blocking="
<< (scenario.disable_blocking ? "true" : "false")
<< ", credentials='" << scenario.credentials_type
+ << ", health_check_service="
+ << (scenario.health_check_service ? "true" : "false")
<< "', message_size=" << scenario.message_content.size() << "}";
}
@@ -253,6 +259,8 @@ void TestScenario::Log() const {
gpr_log(GPR_DEBUG, "%s", out.str().c_str());
}
+class HealthCheck : public health::v1::Health::Service {};
+
class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
AsyncEnd2endTest() { GetParam().Log(); }
@@ -269,6 +277,9 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
GetParam().credentials_type);
builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
+ if (GetParam().health_check_service) {
+ builder.RegisterService(&health_check_);
+ }
cq_ = builder.AddCompletionQueue();
// TODO(zyc): make a test option to choose wheather sync plugins should be
@@ -341,6 +352,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
grpc::testing::EchoTestService::AsyncService service_;
+ HealthCheck health_check_;
std::ostringstream server_address_;
int port_;
@@ -1755,12 +1767,14 @@ std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
messages.push_back(big_msg);
}
- for (auto cred = credentials_types.begin(); cred != credentials_types.end();
- ++cred) {
- for (auto msg = messages.begin(); msg != messages.end(); msg++) {
- scenarios.emplace_back(false, *cred, *msg);
- if (test_disable_blocking) {
- scenarios.emplace_back(true, *cred, *msg);
+ for (auto health_check_service : {false, true}) {
+ for (auto cred = credentials_types.begin(); cred != credentials_types.end();
+ ++cred) {
+ for (auto msg = messages.begin(); msg != messages.end(); msg++) {
+ scenarios.emplace_back(false, *cred, health_check_service, *msg);
+ if (test_disable_blocking) {
+ scenarios.emplace_back(true, *cred, health_check_service, *msg);
+ }
}
}
}
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 85a2d3b8ed..9a9e81853f 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -1131,6 +1131,39 @@ TEST_P(End2endTest, BinaryTrailerTest) {
EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
}
+TEST_P(End2endTest, ExpectErrorTest) {
+ ResetStub();
+
+ std::vector<ErrorStatus> expected_status;
+ expected_status.emplace_back();
+ expected_status.back().set_code(13); // INTERNAL
+ expected_status.back().set_error_message("text error message");
+ expected_status.back().set_binary_error_details("text error details");
+ expected_status.emplace_back();
+ expected_status.back().set_code(13); // INTERNAL
+ expected_status.back().set_error_message("text error message");
+ expected_status.back().set_binary_error_details(
+ "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB");
+
+ for (auto iter = expected_status.begin(); iter != expected_status.end();
+ ++iter) {
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext context;
+ request.set_message("Hello");
+ auto* error = request.mutable_param()->mutable_expected_error();
+ error->set_code(iter->code());
+ error->set_error_message(iter->error_message());
+ error->set_binary_error_details(iter->binary_error_details());
+
+ Status s = stub_->Echo(&context, request, &response);
+ EXPECT_FALSE(s.ok());
+ EXPECT_EQ(iter->code(), s.error_code());
+ EXPECT_EQ(iter->error_message(), s.error_message());
+ EXPECT_EQ(iter->binary_error_details(), s.error_details());
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
// Test with and without a proxy.
class ProxyEnd2endTest : public End2endTest {
diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc
index 5f1ebd5412..5589e2fddc 100644
--- a/test/cpp/end2end/filter_end2end_test.cc
+++ b/test/cpp/end2end/filter_end2end_test.cc
@@ -123,8 +123,9 @@ class ChannelDataImpl : public ChannelData {
class CallDataImpl : public CallData {
public:
- void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- TransportStreamOp* op) override {
+ void StartTransportStreamOpBatch(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ TransportStreamOpBatch* op) override {
// Incrementing the counter could be done from Init(), but we want
// to test that the individual methods are actually called correctly.
if (op->recv_initial_metadata() != nullptr) IncrementCallCounter();
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
new file mode 100644
index 0000000000..8417f1a99c
--- /dev/null
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -0,0 +1,639 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <thread>
+
+#include <grpc++/channel.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <gtest/gtest.h>
+
+extern "C" {
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "test/core/end2end/fake_resolver.h"
+}
+
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/test_service_impl.h"
+
+#include "src/proto/grpc/lb/v1/load_balancer.grpc.pb.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+
+// TODO(dgq): Other scenarios in need of testing:
+// - Send a serverlist with faulty ip:port addresses (port > 2^16, etc).
+// - Test reception of invalid serverlist
+// - Test pinging
+// - Test against a non-LB server.
+// - Random LB server closing the stream unexpectedly.
+// - Test using DNS-resolvable names (localhost?)
+// - Test handling of creation of faulty RR instance by having the LB return a
+// serverlist with non-existent backends after having initially returned a
+// valid one.
+//
+// Findings from end to end testing to be covered here:
+// - Handling of LB servers restart, including reconnection after backing-off
+// retries.
+// - Destruction of load balanced channel (and therefore of grpclb instance)
+// while:
+// 1) the internal LB call is still active. This should work by virtue
+// of the weak reference the LB call holds. The call should be terminated as
+// part of the grpclb shutdown process.
+// 2) the retry timer is active. Again, the weak reference it holds should
+// prevent a premature call to \a glb_destroy.
+// - Restart of backend servers with no changes to serverlist. This exercises
+// the RR handover mechanism.
+
+using std::chrono::system_clock;
+
+using grpc::lb::v1::LoadBalanceResponse;
+using grpc::lb::v1::LoadBalanceRequest;
+using grpc::lb::v1::LoadBalancer;
+
+namespace grpc {
+namespace testing {
+namespace {
+
+template <typename ServiceType>
+class CountedService : public ServiceType {
+ public:
+ int request_count() {
+ std::unique_lock<std::mutex> lock(mu_);
+ return request_count_;
+ }
+
+ int response_count() {
+ std::unique_lock<std::mutex> lock(mu_);
+ return response_count_;
+ }
+
+ void IncreaseResponseCount() {
+ std::unique_lock<std::mutex> lock(mu_);
+ ++response_count_;
+ }
+ void IncreaseRequestCount() {
+ std::unique_lock<std::mutex> lock(mu_);
+ ++request_count_;
+ }
+
+ protected:
+ std::mutex mu_;
+
+ private:
+ int request_count_ = 0;
+ int response_count_ = 0;
+};
+
+using BackendService = CountedService<TestServiceImpl>;
+using BalancerService = CountedService<LoadBalancer::Service>;
+
+class BackendServiceImpl : public BackendService {
+ public:
+ BackendServiceImpl() {}
+
+ Status Echo(ServerContext* context, const EchoRequest* request,
+ EchoResponse* response) override {
+ IncreaseRequestCount();
+ const auto status = TestServiceImpl::Echo(context, request, response);
+ IncreaseResponseCount();
+ return status;
+ }
+};
+
+grpc::string Ip4ToPackedString(const char* ip_str) {
+ struct in_addr ip4;
+ GPR_ASSERT(inet_pton(AF_INET, ip_str, &ip4) == 1);
+ return grpc::string(reinterpret_cast<const char*>(&ip4), sizeof(ip4));
+}
+
+struct ClientStats {
+ size_t num_calls_started = 0;
+ size_t num_calls_finished = 0;
+ size_t num_calls_finished_with_drop_for_rate_limiting = 0;
+ size_t num_calls_finished_with_drop_for_load_balancing = 0;
+ size_t num_calls_finished_with_client_failed_to_send = 0;
+ size_t num_calls_finished_known_received = 0;
+
+ ClientStats& operator+=(const ClientStats& other) {
+ num_calls_started += other.num_calls_started;
+ num_calls_finished += other.num_calls_finished;
+ num_calls_finished_with_drop_for_rate_limiting +=
+ other.num_calls_finished_with_drop_for_rate_limiting;
+ num_calls_finished_with_drop_for_load_balancing +=
+ other.num_calls_finished_with_drop_for_load_balancing;
+ num_calls_finished_with_client_failed_to_send +=
+ other.num_calls_finished_with_client_failed_to_send;
+ num_calls_finished_known_received +=
+ other.num_calls_finished_known_received;
+ return *this;
+ }
+};
+
+class BalancerServiceImpl : public BalancerService {
+ public:
+ using Stream = ServerReaderWriter<LoadBalanceResponse, LoadBalanceRequest>;
+ using ResponseDelayPair = std::pair<LoadBalanceResponse, int>;
+
+ explicit BalancerServiceImpl(int client_load_reporting_interval_seconds)
+ : client_load_reporting_interval_seconds_(
+ client_load_reporting_interval_seconds),
+ shutdown_(false) {}
+
+ Status BalanceLoad(ServerContext* context, Stream* stream) override {
+ LoadBalanceRequest request;
+ stream->Read(&request);
+ IncreaseRequestCount();
+ gpr_log(GPR_INFO, "LB: recv msg '%s'", request.DebugString().c_str());
+
+ if (client_load_reporting_interval_seconds_ > 0) {
+ LoadBalanceResponse initial_response;
+ initial_response.mutable_initial_response()
+ ->mutable_client_stats_report_interval()
+ ->set_seconds(client_load_reporting_interval_seconds_);
+ stream->Write(initial_response);
+ }
+
+ std::vector<ResponseDelayPair> responses_and_delays;
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ responses_and_delays = responses_and_delays_;
+ }
+ for (const auto& response_and_delay : responses_and_delays) {
+ if (shutdown_) break;
+ SendResponse(stream, response_and_delay.first, response_and_delay.second);
+ }
+
+ if (client_load_reporting_interval_seconds_ > 0) {
+ request.Clear();
+ stream->Read(&request);
+ gpr_log(GPR_INFO, "LB: recv client load report msg: '%s'",
+ request.DebugString().c_str());
+ GPR_ASSERT(request.has_client_stats());
+ client_stats_.num_calls_started +=
+ request.client_stats().num_calls_started();
+ client_stats_.num_calls_finished +=
+ request.client_stats().num_calls_finished();
+ client_stats_.num_calls_finished_with_drop_for_rate_limiting +=
+ request.client_stats()
+ .num_calls_finished_with_drop_for_rate_limiting();
+ client_stats_.num_calls_finished_with_drop_for_load_balancing +=
+ request.client_stats()
+ .num_calls_finished_with_drop_for_load_balancing();
+ client_stats_.num_calls_finished_with_client_failed_to_send +=
+ request.client_stats()
+ .num_calls_finished_with_client_failed_to_send();
+ client_stats_.num_calls_finished_known_received +=
+ request.client_stats().num_calls_finished_known_received();
+ std::lock_guard<std::mutex> lock(mu_);
+ cond_.notify_one();
+ }
+
+ return Status::OK;
+ }
+
+ void add_response(const LoadBalanceResponse& response, int send_after_ms) {
+ std::unique_lock<std::mutex> lock(mu_);
+ responses_and_delays_.push_back(std::make_pair(response, send_after_ms));
+ }
+
+ void Shutdown() {
+ std::unique_lock<std::mutex> lock(mu_);
+ shutdown_ = true;
+ }
+
+ static LoadBalanceResponse BuildResponseForBackends(
+ const std::vector<int>& backend_ports) {
+ LoadBalanceResponse response;
+ for (const int backend_port : backend_ports) {
+ auto* server = response.mutable_server_list()->add_servers();
+ server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
+ server->set_port(backend_port);
+ }
+ return response;
+ }
+
+ const ClientStats& WaitForLoadReport() {
+ std::unique_lock<std::mutex> lock(mu_);
+ cond_.wait(lock);
+ return client_stats_;
+ }
+
+ private:
+ void SendResponse(Stream* stream, const LoadBalanceResponse& response,
+ int delay_ms) {
+ gpr_log(GPR_INFO, "LB: sleeping for %d ms...", delay_ms);
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
+ gpr_log(GPR_INFO, "LB: Woke up! Sending response '%s'",
+ response.DebugString().c_str());
+ stream->Write(response);
+ IncreaseResponseCount();
+ }
+
+ const int client_load_reporting_interval_seconds_;
+ std::vector<ResponseDelayPair> responses_and_delays_;
+ std::mutex mu_;
+ std::condition_variable cond_;
+ ClientStats client_stats_;
+ bool shutdown_;
+};
+
+class GrpclbEnd2endTest : public ::testing::Test {
+ protected:
+ GrpclbEnd2endTest(int num_backends, int num_balancers,
+ int client_load_reporting_interval_seconds)
+ : server_host_("localhost"),
+ num_backends_(num_backends),
+ num_balancers_(num_balancers),
+ client_load_reporting_interval_seconds_(
+ client_load_reporting_interval_seconds) {}
+
+ void SetUp() override {
+ response_generator_ = grpc_fake_resolver_response_generator_create();
+ // Start the backends.
+ for (size_t i = 0; i < num_backends_; ++i) {
+ backends_.emplace_back(new BackendServiceImpl());
+ backend_servers_.emplace_back(ServerThread<BackendService>(
+ "backend", server_host_, backends_.back().get()));
+ }
+ // Start the load balancers.
+ for (size_t i = 0; i < num_balancers_; ++i) {
+ balancers_.emplace_back(
+ new BalancerServiceImpl(client_load_reporting_interval_seconds_));
+ balancer_servers_.emplace_back(ServerThread<BalancerService>(
+ "balancer", server_host_, balancers_.back().get()));
+ }
+ ResetStub();
+ std::vector<AddressData> addresses;
+ for (size_t i = 0; i < balancer_servers_.size(); ++i) {
+ addresses.emplace_back(AddressData{balancer_servers_[i].port_, true, ""});
+ }
+ SetNextResolution(addresses);
+ }
+
+ void TearDown() override {
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ backend_servers_[i].Shutdown();
+ }
+ for (size_t i = 0; i < balancers_.size(); ++i) {
+ balancers_[i]->Shutdown();
+ balancer_servers_[i].Shutdown();
+ }
+ grpc_fake_resolver_response_generator_unref(response_generator_);
+ }
+
+ void ResetStub() {
+ ChannelArguments args;
+ args.SetPointer(GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
+ response_generator_);
+ std::ostringstream uri;
+ uri << "test:///servername_not_used";
+ channel_ =
+ CreateCustomChannel(uri.str(), InsecureChannelCredentials(), args);
+ stub_ = grpc::testing::EchoTestService::NewStub(channel_);
+ }
+
+ ClientStats WaitForLoadReports() {
+ ClientStats client_stats;
+ for (const auto& balancer : balancers_) {
+ client_stats += balancer->WaitForLoadReport();
+ }
+ return client_stats;
+ }
+
+ struct AddressData {
+ int port;
+ bool is_balancer;
+ grpc::string balancer_name;
+ };
+
+ void SetNextResolution(const std::vector<AddressData>& address_data) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_lb_addresses* addresses =
+ grpc_lb_addresses_create(address_data.size(), nullptr);
+ for (size_t i = 0; i < address_data.size(); ++i) {
+ char* lb_uri_str;
+ gpr_asprintf(&lb_uri_str, "ipv4:127.0.0.1:%d", address_data[i].port);
+ grpc_uri* lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true);
+ GPR_ASSERT(lb_uri != nullptr);
+ grpc_lb_addresses_set_address_from_uri(
+ addresses, i, lb_uri, address_data[i].is_balancer,
+ address_data[i].balancer_name.c_str(), nullptr);
+ grpc_uri_destroy(lb_uri);
+ gpr_free(lb_uri_str);
+ }
+ grpc_arg fake_addresses = grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_channel_args fake_result = {1, &fake_addresses};
+ grpc_fake_resolver_response_generator_set_response(
+ &exec_ctx, response_generator_, &fake_result);
+ grpc_lb_addresses_destroy(&exec_ctx, addresses);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+
+ const std::vector<int> GetBackendPorts() const {
+ std::vector<int> backend_ports;
+ for (const auto& bs : backend_servers_) {
+ backend_ports.push_back(bs.port_);
+ }
+ return backend_ports;
+ }
+
+ void ScheduleResponseForBalancer(size_t i,
+ const LoadBalanceResponse& response,
+ int delay_ms) {
+ balancers_.at(i)->add_response(response, delay_ms);
+ }
+
+ std::vector<std::pair<Status, EchoResponse>> SendRpc(const string& message,
+ int num_rpcs,
+ int timeout_ms = 1000) {
+ std::vector<std::pair<Status, EchoResponse>> results;
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message(message);
+ for (int i = 0; i < num_rpcs; i++) {
+ ClientContext context;
+ context.set_deadline(grpc_timeout_milliseconds_to_deadline(timeout_ms));
+ Status status = stub_->Echo(&context, request, &response);
+ results.push_back(std::make_pair(status, response));
+ }
+ return results;
+ }
+
+ template <typename T>
+ struct ServerThread {
+ explicit ServerThread(const grpc::string& type,
+ const grpc::string& server_host, T* service)
+ : type_(type), service_(service) {
+ port_ = grpc_pick_unused_port_or_die();
+ gpr_log(GPR_INFO, "starting %s server on port %d", type_.c_str(), port_);
+ std::mutex mu;
+ std::condition_variable cond;
+ thread_.reset(new std::thread(
+ std::bind(&ServerThread::Start, this, server_host, &mu, &cond)));
+ std::unique_lock<std::mutex> lock(mu);
+ cond.wait(lock);
+ gpr_log(GPR_INFO, "%s server startup complete", type_.c_str());
+ }
+
+ void Start(const grpc::string& server_host, std::mutex* mu,
+ std::condition_variable* cond) {
+ std::ostringstream server_address;
+ server_address << server_host << ":" << port_;
+ ServerBuilder builder;
+ builder.AddListeningPort(server_address.str(),
+ InsecureServerCredentials());
+ builder.RegisterService(service_);
+ server_ = builder.BuildAndStart();
+ std::lock_guard<std::mutex> lock(*mu);
+ cond->notify_one();
+ }
+
+ void Shutdown() {
+ gpr_log(GPR_INFO, "%s about to shutdown", type_.c_str());
+ server_->Shutdown();
+ thread_->join();
+ gpr_log(GPR_INFO, "%s shutdown completed", type_.c_str());
+ }
+
+ int port_;
+ grpc::string type_;
+ std::unique_ptr<Server> server_;
+ T* service_;
+ std::unique_ptr<std::thread> thread_;
+ };
+
+ const grpc::string kMessage_ = "Live long and prosper.";
+ const grpc::string server_host_;
+ const size_t num_backends_;
+ const size_t num_balancers_;
+ const int client_load_reporting_interval_seconds_;
+ std::shared_ptr<Channel> channel_;
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+
+ std::vector<std::unique_ptr<BackendServiceImpl>> backends_;
+ std::vector<std::unique_ptr<BalancerServiceImpl>> balancers_;
+
+ std::vector<ServerThread<BackendService>> backend_servers_;
+ std::vector<ServerThread<BalancerService>> balancer_servers_;
+
+ grpc_fake_resolver_response_generator* response_generator_;
+};
+
+class SingleBalancerTest : public GrpclbEnd2endTest {
+ public:
+ SingleBalancerTest() : GrpclbEnd2endTest(4, 1, 0) {}
+};
+
+TEST_F(SingleBalancerTest, Vanilla) {
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0);
+ // Make sure that trying to connect works without a call.
+ channel_->GetState(true /* try_to_connect */);
+ // Start servers and send 100 RPCs per server.
+ const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_);
+
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+
+ // Each backend should have gotten 100 requests.
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ EXPECT_EQ(100, backend_servers_[i].service_->request_count());
+ }
+ // The balancer got a single request.
+ EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1, balancer_servers_[0].service_->response_count());
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
+ const int kServerlistDelayMs = 500 * grpc_test_slowdown_factor();
+ const int kCallDeadlineMs = 1000 * grpc_test_slowdown_factor();
+
+ // First response is an empty serverlist, sent right away.
+ ScheduleResponseForBalancer(0, LoadBalanceResponse(), 0);
+ // Send non-empty serverlist only after kServerlistDelayMs
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()),
+ kServerlistDelayMs);
+
+ const auto t0 = system_clock::now();
+ // Client will block: LB will initially send empty serverlist.
+ const auto& statuses_and_responses =
+ SendRpc(kMessage_, num_backends_, kCallDeadlineMs);
+ const auto ellapsed_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ system_clock::now() - t0);
+ // but eventually, the LB sends a serverlist update that allows the call to
+ // proceed. The call delay must be larger than the delay in sending the
+ // populated serverlist but under the call's deadline.
+ EXPECT_GT(ellapsed_ms.count(), kServerlistDelayMs);
+ EXPECT_LT(ellapsed_ms.count(), kCallDeadlineMs);
+
+ // Each backend should have gotten 1 request.
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ EXPECT_EQ(1, backend_servers_[i].service_->request_count());
+ }
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+
+ // The balancer got a single request.
+ EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
+ // and sent two responses.
+ EXPECT_EQ(2, balancer_servers_[0].service_->response_count());
+
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
+TEST_F(SingleBalancerTest, RepeatedServerlist) {
+ constexpr int kServerlistDelayMs = 100;
+
+ // Send a serverlist right away.
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0);
+ // ... and the same one a bit later.
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()),
+ kServerlistDelayMs);
+
+ // Send num_backends/2 requests.
+ auto statuses_and_responses = SendRpc(kMessage_, num_backends_ / 2);
+ // only the first half of the backends will receive them.
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ if (i < backends_.size() / 2)
+ EXPECT_EQ(1, backend_servers_[i].service_->request_count());
+ else
+ EXPECT_EQ(0, backend_servers_[i].service_->request_count());
+ }
+ EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+
+ // Wait for the (duplicated) serverlist update.
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(kServerlistDelayMs * 1.1, GPR_TIMESPAN)));
+
+ // Verify the LB has sent two responses.
+ EXPECT_EQ(2, balancer_servers_[0].service_->response_count());
+
+ // Some more calls to complete the total number of backends.
+ statuses_and_responses = SendRpc(
+ kMessage_,
+ num_backends_ / 2 + (num_backends_ & 0x1) /* extra one if num_bes odd */);
+ // Because a duplicated serverlist should have no effect, all backends must
+ // have been hit once now.
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ EXPECT_EQ(1, backend_servers_[i].service_->request_count());
+ }
+ EXPECT_EQ(statuses_and_responses.size(), num_backends_ / 2);
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+
+ // The balancer got a single request.
+ EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
+ // Check LB policy name for the channel.
+ EXPECT_EQ("grpclb", channel_->GetLoadBalancingPolicyName());
+}
+
+class SingleBalancerWithClientLoadReportingTest : public GrpclbEnd2endTest {
+ public:
+ SingleBalancerWithClientLoadReportingTest() : GrpclbEnd2endTest(4, 1, 2) {}
+};
+
+TEST_F(SingleBalancerWithClientLoadReportingTest, Vanilla) {
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(GetBackendPorts()), 0);
+ // Start servers and send 100 RPCs per server.
+ const auto& statuses_and_responses = SendRpc(kMessage_, 100 * num_backends_);
+
+ for (const auto& status_and_response : statuses_and_responses) {
+ EXPECT_TRUE(status_and_response.first.ok());
+ EXPECT_EQ(status_and_response.second.message(), kMessage_);
+ }
+
+ // Each backend should have gotten 100 requests.
+ for (size_t i = 0; i < backends_.size(); ++i) {
+ EXPECT_EQ(100, backend_servers_[i].service_->request_count());
+ }
+ // The balancer got a single request.
+ EXPECT_EQ(1, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1, balancer_servers_[0].service_->response_count());
+
+ const ClientStats client_stats = WaitForLoadReports();
+ EXPECT_EQ(100 * num_backends_, client_stats.num_calls_started);
+ EXPECT_EQ(100 * num_backends_, client_stats.num_calls_finished);
+ EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_rate_limiting);
+ EXPECT_EQ(0U, client_stats.num_calls_finished_with_drop_for_load_balancing);
+ EXPECT_EQ(0U, client_stats.num_calls_finished_with_client_failed_to_send);
+ EXPECT_EQ(100 * num_backends_,
+ client_stats.num_calls_finished_known_received);
+}
+
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_init();
+ grpc_test_init(argc, argv);
+ grpc_fake_resolver_init();
+ ::testing::InitGoogleTest(&argc, argv);
+ const auto result = RUN_ALL_TESTS();
+ grpc_shutdown();
+ return result;
+}
diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc
index 3f5a6eedf9..b28afad7d7 100644
--- a/test/cpp/end2end/mock_test.cc
+++ b/test/cpp/end2end/mock_test.cc
@@ -34,6 +34,7 @@
#include <climits>
#include <thread>
+#include <gmock/gmock.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
@@ -45,122 +46,38 @@
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
+#include <grpc++/test/mock_stream.h>
+
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "src/proto/grpc/testing/echo_mock.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include <gtest/gtest.h>
+#include <iostream>
+
+using namespace std;
+>>>>>>> 45b89fb11ca3cd524787aeba7a1270f744a1256c
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using grpc::testing::EchoTestService;
+using grpc::testing::MockClientReaderWriter;
using std::chrono::system_clock;
+using ::testing::AtLeast;
+using ::testing::SetArgPointee;
+using ::testing::SaveArg;
+using ::testing::_;
+using ::testing::Return;
+using ::testing::Invoke;
+using ::testing::WithArg;
+using ::testing::DoAll;
namespace grpc {
namespace testing {
namespace {
-template <class W, class R>
-class MockClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
- public:
- void WaitForInitialMetadata() override {}
- bool NextMessageSize(uint32_t* sz) override {
- *sz = UINT_MAX;
- return true;
- }
- bool Read(R* msg) override { return true; }
- bool Write(const W& msg) override { return true; }
- bool WritesDone() override { return true; }
- Status Finish() override { return Status::OK; }
-};
-template <>
-class MockClientReaderWriter<EchoRequest, EchoResponse> final
- : public ClientReaderWriterInterface<EchoRequest, EchoResponse> {
- public:
- MockClientReaderWriter() : writes_done_(false) {}
- void WaitForInitialMetadata() override {}
- bool NextMessageSize(uint32_t* sz) override {
- *sz = UINT_MAX;
- return true;
- }
- bool Read(EchoResponse* msg) override {
- if (writes_done_) return false;
- msg->set_message(last_message_);
- return true;
- }
-
- bool Write(const EchoRequest& msg, WriteOptions options) override {
- gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str());
- last_message_ = msg.message();
- return true;
- }
- bool WritesDone() override {
- writes_done_ = true;
- return true;
- }
- Status Finish() override { return Status::OK; }
-
- private:
- bool writes_done_;
- grpc::string last_message_;
-};
-
-// Mocked stub.
-class MockStub : public EchoTestService::StubInterface {
- public:
- MockStub() {}
- ~MockStub() {}
- Status Echo(ClientContext* context, const EchoRequest& request,
- EchoResponse* response) override {
- response->set_message(request.message());
- return Status::OK;
- }
- Status Unimplemented(ClientContext* context, const EchoRequest& request,
- EchoResponse* response) override {
- return Status::OK;
- }
-
- private:
- ClientAsyncResponseReaderInterface<EchoResponse>* AsyncEchoRaw(
- ClientContext* context, const EchoRequest& request,
- CompletionQueue* cq) override {
- return nullptr;
- }
- ClientWriterInterface<EchoRequest>* RequestStreamRaw(
- ClientContext* context, EchoResponse* response) override {
- return nullptr;
- }
- ClientAsyncWriterInterface<EchoRequest>* AsyncRequestStreamRaw(
- ClientContext* context, EchoResponse* response, CompletionQueue* cq,
- void* tag) override {
- return nullptr;
- }
- ClientReaderInterface<EchoResponse>* ResponseStreamRaw(
- ClientContext* context, const EchoRequest& request) override {
- return nullptr;
- }
- ClientAsyncReaderInterface<EchoResponse>* AsyncResponseStreamRaw(
- ClientContext* context, const EchoRequest& request, CompletionQueue* cq,
- void* tag) override {
- return nullptr;
- }
- ClientReaderWriterInterface<EchoRequest, EchoResponse>* BidiStreamRaw(
- ClientContext* context) override {
- return new MockClientReaderWriter<EchoRequest, EchoResponse>();
- }
- ClientAsyncReaderWriterInterface<EchoRequest, EchoResponse>*
- AsyncBidiStreamRaw(ClientContext* context, CompletionQueue* cq,
- void* tag) override {
- return nullptr;
- }
- ClientAsyncResponseReaderInterface<EchoResponse>* AsyncUnimplementedRaw(
- ClientContext* context, const EchoRequest& request,
- CompletionQueue* cq) override {
- return nullptr;
- }
-};
-
class FakeClient {
public:
explicit FakeClient(EchoTestService::StubInterface* stub) : stub_(stub) {}
@@ -175,6 +92,55 @@ class FakeClient {
EXPECT_TRUE(s.ok());
}
+ void DoRequestStream() {
+ EchoRequest request;
+ EchoResponse response;
+
+ ClientContext context;
+ grpc::string msg("hello");
+ grpc::string exp(msg);
+
+ std::unique_ptr<ClientWriterInterface<EchoRequest>> cstream =
+ stub_->RequestStream(&context, &response);
+
+ request.set_message(msg);
+ EXPECT_TRUE(cstream->Write(request));
+
+ msg = ", world";
+ request.set_message(msg);
+ exp.append(msg);
+ EXPECT_TRUE(cstream->Write(request));
+
+ cstream->WritesDone();
+ Status s = cstream->Finish();
+
+ EXPECT_EQ(exp, response.message());
+ EXPECT_TRUE(s.ok());
+ }
+
+ void DoResponseStream() {
+ EchoRequest request;
+ EchoResponse response;
+ request.set_message("hello world");
+
+ ClientContext context;
+ std::unique_ptr<ClientReaderInterface<EchoResponse>> cstream =
+ stub_->ResponseStream(&context, request);
+
+ grpc::string exp = "";
+ EXPECT_TRUE(cstream->Read(&response));
+ exp.append(response.message() + " ");
+
+ EXPECT_TRUE(cstream->Read(&response));
+ exp.append(response.message());
+
+ EXPECT_FALSE(cstream->Read(&response));
+ EXPECT_EQ(request.message(), exp);
+
+ Status s = cstream->Finish();
+ EXPECT_TRUE(s.ok());
+ }
+
void DoBidiStream() {
EchoRequest request;
EchoResponse response;
@@ -220,6 +186,30 @@ class TestServiceImpl : public EchoTestService::Service {
return Status::OK;
}
+ Status RequestStream(ServerContext* context,
+ ServerReader<EchoRequest>* reader,
+ EchoResponse* response) override {
+ EchoRequest request;
+ grpc::string resp("");
+ while (reader->Read(&request)) {
+ gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
+ resp.append(request.message());
+ }
+ response->set_message(resp);
+ return Status::OK;
+ }
+
+ Status ResponseStream(ServerContext* context, const EchoRequest* request,
+ ServerWriter<EchoResponse>* writer) override {
+ EchoResponse response;
+ vector<grpc::string> tokens = split(request->message());
+ for (grpc::string token : tokens) {
+ response.set_message(token);
+ writer->Write(response);
+ }
+ return Status::OK;
+ }
+
Status BidiStream(
ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
@@ -232,6 +222,25 @@ class TestServiceImpl : public EchoTestService::Service {
}
return Status::OK;
}
+
+ private:
+ const vector<grpc::string> split(const grpc::string& input) {
+ grpc::string buff("");
+ vector<grpc::string> result;
+
+ for (auto n : input) {
+ if (n != ' ') {
+ buff += n;
+ continue;
+ }
+ if (buff == "") continue;
+ result.push_back(buff);
+ buff = "";
+ }
+ if (buff != "") result.push_back(buff);
+
+ return result;
+ }
};
class MockTest : public ::testing::Test {
@@ -268,16 +277,82 @@ TEST_F(MockTest, SimpleRpc) {
ResetStub();
FakeClient client(stub_.get());
client.DoEcho();
- MockStub stub;
+ MockEchoTestServiceStub stub;
+ EchoResponse resp;
+ resp.set_message("hello world");
+ EXPECT_CALL(stub, Echo(_, _, _))
+ .Times(AtLeast(1))
+ .WillOnce(DoAll(SetArgPointee<2>(resp), Return(Status::OK)));
client.ResetStub(&stub);
client.DoEcho();
}
+TEST_F(MockTest, ClientStream) {
+ ResetStub();
+ FakeClient client(stub_.get());
+ client.DoRequestStream();
+
+ MockEchoTestServiceStub stub;
+ auto w = new MockClientWriter<EchoRequest>();
+ EchoResponse resp;
+ resp.set_message("hello, world");
+
+ EXPECT_CALL(*w, Write(_, _)).Times(2).WillRepeatedly(Return(true));
+ EXPECT_CALL(*w, WritesDone());
+ EXPECT_CALL(*w, Finish()).WillOnce(Return(Status::OK));
+
+ EXPECT_CALL(stub, RequestStreamRaw(_, _))
+ .WillOnce(DoAll(SetArgPointee<1>(resp), Return(w)));
+ client.ResetStub(&stub);
+ client.DoRequestStream();
+}
+
+TEST_F(MockTest, ServerStream) {
+ ResetStub();
+ FakeClient client(stub_.get());
+ client.DoResponseStream();
+
+ MockEchoTestServiceStub stub;
+ auto r = new MockClientReader<EchoResponse>();
+ EchoResponse resp1;
+ resp1.set_message("hello");
+ EchoResponse resp2;
+ resp2.set_message("world");
+
+ EXPECT_CALL(*r, Read(_))
+ .WillOnce(DoAll(SetArgPointee<0>(resp1), Return(true)))
+ .WillOnce(DoAll(SetArgPointee<0>(resp2), Return(true)))
+ .WillOnce(Return(false));
+ EXPECT_CALL(*r, Finish()).WillOnce(Return(Status::OK));
+
+ EXPECT_CALL(stub, ResponseStreamRaw(_, _)).WillOnce(Return(r));
+
+ client.ResetStub(&stub);
+ client.DoResponseStream();
+}
+
+ACTION_P(copy, msg) { arg0->set_message(msg->message()); }
+
TEST_F(MockTest, BidiStream) {
ResetStub();
FakeClient client(stub_.get());
client.DoBidiStream();
- MockStub stub;
+ MockEchoTestServiceStub stub;
+ auto rw = new MockClientReaderWriter<EchoRequest, EchoResponse>();
+ EchoRequest msg;
+
+ EXPECT_CALL(*rw, Write(_, _))
+ .Times(3)
+ .WillRepeatedly(DoAll(SaveArg<0>(&msg), Return(true)));
+ EXPECT_CALL(*rw, Read(_))
+ .WillOnce(DoAll(WithArg<0>(copy(&msg)), Return(true)))
+ .WillOnce(DoAll(WithArg<0>(copy(&msg)), Return(true)))
+ .WillOnce(DoAll(WithArg<0>(copy(&msg)), Return(true)))
+ .WillOnce(Return(false));
+ EXPECT_CALL(*rw, WritesDone());
+ EXPECT_CALL(*rw, Finish()).WillOnce(Return(Status::OK));
+
+ EXPECT_CALL(stub, BidiStreamRaw(_)).WillOnce(Return(rw));
client.ResetStub(&stub);
client.DoBidiStream();
}
diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc
index 705cfc429f..5411b35f16 100644
--- a/test/cpp/end2end/test_service_impl.cc
+++ b/test/cpp/end2end/test_service_impl.cc
@@ -92,6 +92,11 @@ Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
gpr_log(GPR_ERROR, "The request should not reach application handler.");
GPR_ASSERT(0);
}
+ if (request->has_param() && request->param().has_expected_error()) {
+ const auto& error = request->param().expected_error();
+ return Status(static_cast<StatusCode>(error.code()), error.error_message(),
+ error.binary_error_details());
+ }
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel > DO_NOT_CANCEL) {
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index 82ccf436f8..0b569f1415 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -34,7 +34,7 @@
#include <grpc++/impl/codegen/config.h>
#include <gtest/gtest.h>
-#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h"
+#include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/proto/grpc/lb/v1/load_balancer.pb.h" // C++ version
@@ -106,11 +106,13 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
auto* server = serverlist->add_servers();
server->set_ip_address(Ip4ToPackedString("127.0.0.1"));
server->set_port(12345);
- server->set_drop_request(true);
+ server->set_drop_for_rate_limiting(true);
+ server->set_drop_for_load_balancing(false);
server = response.mutable_server_list()->add_servers();
server->set_ip_address(Ip4ToPackedString("10.0.0.1"));
server->set_port(54321);
- server->set_drop_request(false);
+ server->set_drop_for_rate_limiting(false);
+ server->set_drop_for_load_balancing(true);
auto* expiration_interval = serverlist->mutable_expiration_interval();
expiration_interval->set_seconds(888);
expiration_interval->set_nanos(999);
@@ -125,12 +127,14 @@ TEST_F(GrpclbTest, ParseResponseServerList) {
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[0]->ip_address),
"127.0.0.1");
EXPECT_EQ(c_serverlist->servers[0]->port, 12345);
- EXPECT_TRUE(c_serverlist->servers[0]->drop_request);
+ EXPECT_TRUE(c_serverlist->servers[0]->drop_for_rate_limiting);
+ EXPECT_FALSE(c_serverlist->servers[0]->drop_for_load_balancing);
EXPECT_TRUE(c_serverlist->servers[1]->has_ip_address);
EXPECT_EQ(PackedStringToIp(c_serverlist->servers[1]->ip_address), "10.0.0.1");
EXPECT_EQ(c_serverlist->servers[1]->port, 54321);
- EXPECT_FALSE(c_serverlist->servers[1]->drop_request);
+ EXPECT_FALSE(c_serverlist->servers[1]->drop_for_rate_limiting);
+ EXPECT_TRUE(c_serverlist->servers[1]->drop_for_load_balancing);
EXPECT_TRUE(c_serverlist->expiration_interval.has_seconds);
EXPECT_EQ(c_serverlist->expiration_interval.seconds, 888);
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index 89ed9249ad..a002c7f77d 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -51,7 +51,7 @@
#include <grpc++/impl/codegen/config.h>
extern "C" {
-#include "src/core/ext/client_channel/client_channel.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/sockaddr.h"
@@ -310,7 +310,7 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
gpr_log(GPR_INFO, "LB Server[%s](%s) after tag 204. All done. LB server out",
sf->servers_hostport, sf->balancer_name);
- grpc_call_destroy(s);
+ grpc_call_unref(s);
cq_verifier_destroy(cqv);
@@ -457,7 +457,7 @@ static void start_backend_server(server_fixture *sf) {
gpr_log(GPR_INFO, "Server[%s] DONE. After servicing %d calls",
sf->servers_hostport, sf->num_calls_serviced);
- grpc_call_destroy(s);
+ grpc_call_unref(s);
cq_verifier_destroy(cqv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
@@ -557,7 +557,7 @@ static void perform_request(client_fixture *cf) {
peer = grpc_call_get_peer(c);
gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer);
- grpc_call_destroy(c);
+ grpc_call_unref(c);
cq_verify_empty_timeout(cqv, 1 /* seconds */);
cq_verifier_destroy(cqv);
@@ -573,34 +573,51 @@ static void perform_request(client_fixture *cf) {
static void setup_client(const server_fixture *lb_server,
const server_fixture *backends, client_fixture *cf) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- char *lb_uri;
- // The grpclb LB policy will be automatically selected by virtue of
- // the fact that the returned addresses are balancer addresses.
- gpr_asprintf(&lb_uri, "test:///%s?lb_enabled=1&balancer_names=%s",
- lb_server->servers_hostport, lb_server->balancer_name);
-
- grpc_arg expected_target_arg;
- expected_target_arg.type = GRPC_ARG_STRING;
- expected_target_arg.key =
- const_cast<char *>(GRPC_ARG_FAKE_SECURITY_EXPECTED_TARGETS);
char *expected_target_names = NULL;
const char *backends_name = lb_server->servers_hostport;
gpr_asprintf(&expected_target_names, "%s;%s", backends_name, BALANCERS_NAME);
- expected_target_arg.value.string = const_cast<char *>(expected_target_names);
+ grpc_fake_resolver_response_generator *response_generator =
+ grpc_fake_resolver_response_generator_create();
+
+ grpc_lb_addresses *addresses = grpc_lb_addresses_create(1, NULL);
+ char *lb_uri_str;
+ gpr_asprintf(&lb_uri_str, "ipv4:%s", lb_server->servers_hostport);
+ grpc_uri *lb_uri = grpc_uri_parse(&exec_ctx, lb_uri_str, true);
+ GPR_ASSERT(lb_uri != NULL);
+ grpc_lb_addresses_set_address_from_uri(addresses, 0, lb_uri, true,
+ lb_server->balancer_name, NULL);
+ grpc_uri_destroy(lb_uri);
+ gpr_free(lb_uri_str);
+
+ gpr_asprintf(&cf->server_uri, "test:///%s", lb_server->servers_hostport);
+ const grpc_arg fake_addresses =
+ grpc_lb_addresses_create_channel_arg(addresses);
+ grpc_channel_args *fake_result =
+ grpc_channel_args_copy_and_add(NULL, &fake_addresses, 1);
+ grpc_lb_addresses_destroy(&exec_ctx, addresses);
+
+ const grpc_arg new_args[] = {
+ grpc_fake_transport_expected_targets_arg(expected_target_names),
+ grpc_fake_resolver_response_generator_arg(response_generator)};
+
grpc_channel_args *args =
- grpc_channel_args_copy_and_add(NULL, &expected_target_arg, 1);
+ grpc_channel_args_copy_and_add(NULL, new_args, GPR_ARRAY_SIZE(new_args));
gpr_free(expected_target_names);
- cf->cq = grpc_completion_queue_create(NULL);
- cf->server_uri = lb_uri;
+ cf->cq = grpc_completion_queue_create_for_next(NULL);
grpc_channel_credentials *fake_creds =
grpc_fake_transport_security_credentials_create();
cf->client =
grpc_secure_channel_create(fake_creds, cf->server_uri, args, NULL);
+ grpc_fake_resolver_response_generator_set_response(
+ &exec_ctx, response_generator, fake_result);
+ grpc_channel_args_destroy(&exec_ctx, fake_result);
grpc_channel_credentials_unref(&exec_ctx, fake_creds);
grpc_channel_args_destroy(&exec_ctx, args);
+ grpc_fake_resolver_response_generator_unref(response_generator);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void teardown_client(client_fixture *cf) {
@@ -616,7 +633,7 @@ static void teardown_client(client_fixture *cf) {
static void setup_server(const char *host, server_fixture *sf) {
int assigned_port;
- sf->cq = grpc_completion_queue_create(NULL);
+ sf->cq = grpc_completion_queue_create_for_next(NULL);
const char *colon_idx = strchr(host, ':');
if (colon_idx) {
const char *port_str = colon_idx + 1;
@@ -643,10 +660,15 @@ static void teardown_server(server_fixture *sf) {
if (!sf->server) return;
gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport);
- grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000));
- GPR_ASSERT(grpc_completion_queue_pluck(
- sf->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL)
+
+ grpc_completion_queue *shutdown_cq =
+ grpc_completion_queue_create_for_pluck(NULL);
+ grpc_server_shutdown_and_notify(sf->server, shutdown_cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, tag(1000),
+ grpc_timeout_seconds_to_deadline(5),
+ NULL)
.type == GRPC_OP_COMPLETE);
+ grpc_completion_queue_destroy(shutdown_cq);
grpc_server_destroy(sf->server);
gpr_thd_join(sf->tid);
@@ -782,8 +804,8 @@ TEST(GrpclbTest, InvalidAddressInServerlist) {}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
- grpc_test_init(argc, argv);
grpc_fake_resolver_init();
+ grpc_test_init(argc, argv);
grpc_init();
const auto result = RUN_ALL_TESTS();
grpc_shutdown();
diff --git a/test/cpp/interop/BUILD b/test/cpp/interop/BUILD
new file mode 100644
index 0000000000..1a3e8d916f
--- /dev/null
+++ b/test/cpp/interop/BUILD
@@ -0,0 +1,90 @@
+# Copyright 2017, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+licenses(["notice"]) # 3-clause BSD
+
+cc_library(
+ name = "server_helper_lib",
+ srcs = [
+ "server_helper.cc",
+ ],
+ hdrs = [
+ "server_helper.h",
+ ],
+ deps = [
+ "//test/cpp/util:test_util",
+ "//external:gflags",
+ ],
+)
+
+cc_binary(
+ name = "interop_server",
+ srcs = [
+ "interop_server.cc",
+ "interop_server_bootstrap.cc",
+ ],
+ deps = [
+ ":server_helper_lib",
+ "//:grpc++",
+ "//src/proto/grpc/testing:empty_proto",
+ "//src/proto/grpc/testing:messages_proto",
+ "//src/proto/grpc/testing:test_proto",
+ "//test/cpp/util:test_config",
+ ],
+)
+
+cc_library(
+ name = "client_helper_lib",
+ srcs = [
+ "client_helper.cc",
+ "interop_client.cc",
+ ],
+ hdrs = [
+ "client_helper.h",
+ "interop_client.h",
+ ],
+ deps = [
+ "//test/cpp/util:test_util",
+ "//src/proto/grpc/testing:empty_proto",
+ "//src/proto/grpc/testing:messages_proto",
+ "//src/proto/grpc/testing:test_proto",
+ "//test/core/security:oauth2_utils",
+ "//test/cpp/util:test_config",
+ ],
+)
+
+cc_binary(
+ name = "interop_client",
+ srcs = [
+ "client.cc",
+ ],
+ deps = [
+ ":client_helper_lib",
+ ],
+)
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index 5688ab7971..6f1d910304 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -99,6 +99,7 @@ DEFINE_bool(do_not_abort_on_transient_failures, false,
using grpc::testing::CreateChannelForTestCase;
using grpc::testing::GetServiceAccountJsonKey;
+using grpc::testing::UpdateActions;
int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
@@ -162,8 +163,10 @@ int main(int argc, char** argv) {
std::bind(&grpc::testing::InteropClient::DoUnimplementedMethod, &client);
actions["unimplemented_service"] =
std::bind(&grpc::testing::InteropClient::DoUnimplementedService, &client);
- // actions["cacheable_unary"] =
- // std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client);
+ actions["cacheable_unary"] =
+ std::bind(&grpc::testing::InteropClient::DoCacheableUnary, &client);
+
+ UpdateActions(&actions);
if (FLAGS_test_case == "all") {
for (const auto& action : actions) {
diff --git a/test/cpp/interop/client_helper.cc b/test/cpp/interop/client_helper.cc
index d3192ad0c9..784cd2826d 100644
--- a/test/cpp/interop/client_helper.cc
+++ b/test/cpp/interop/client_helper.cc
@@ -89,6 +89,9 @@ grpc::string GetOauth2AccessToken() {
return access_token;
}
+void UpdateActions(
+ std::unordered_map<grpc::string, std::function<bool()>>* actions) {}
+
std::shared_ptr<Channel> CreateChannelForTestCase(
const grpc::string& test_case) {
GPR_ASSERT(FLAGS_server_port);
diff --git a/test/cpp/interop/client_helper.h b/test/cpp/interop/client_helper.h
index 622b96e4fb..387530a21c 100644
--- a/test/cpp/interop/client_helper.h
+++ b/test/cpp/interop/client_helper.h
@@ -35,6 +35,7 @@
#define GRPC_TEST_CPP_INTEROP_CLIENT_HELPER_H
#include <memory>
+#include <unordered_map>
#include <grpc++/channel.h>
@@ -47,6 +48,9 @@ grpc::string GetServiceAccountJsonKey();
grpc::string GetOauth2AccessToken();
+void UpdateActions(
+ std::unordered_map<grpc::string, std::function<bool()>>* actions);
+
std::shared_ptr<Channel> CreateChannelForTestCase(
const grpc::string& test_case);
diff --git a/test/cpp/interop/http2_client.cc b/test/cpp/interop/http2_client.cc
index 38a437f39f..2109e34616 100644
--- a/test/cpp/interop/http2_client.cc
+++ b/test/cpp/interop/http2_client.cc
@@ -108,7 +108,7 @@ bool Http2Client::DoRstAfterData() {
SimpleResponse response;
AssertStatusCode(SendUnaryCall(&response), grpc::StatusCode::INTERNAL);
- GPR_ASSERT(response.has_payload()); // data should be received
+ // There is no guarantee that data would be received.
gpr_log(GPR_DEBUG, "Done testing reset stream after data");
return true;
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index 55ba324cc7..0e79c5e4b4 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -918,6 +918,26 @@ bool InteropClient::DoCacheableUnary() {
// second response is a cached copy of the first response
GPR_ASSERT(response2.payload().body() == response1.payload().body());
+ // Request 3
+ // Modify the request body so it will not get a cache hit
+ ts = gpr_now(GPR_CLOCK_PRECISE);
+ timestamp = std::to_string((long long unsigned)ts.tv_nsec);
+ SimpleRequest request1;
+ request1.mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
+ ClientContext context3;
+ SimpleResponse response3;
+ context3.set_cacheable(true);
+ context3.AddMetadata("x-user-ip", "1.2.3.4");
+ Status s3 =
+ serviceStub_.Get()->CacheableUnaryCall(&context3, request1, &response3);
+ if (!AssertStatusOk(s3)) {
+ return false;
+ }
+ gpr_log(GPR_DEBUG, "response 3 payload: %s",
+ response3.payload().body().c_str());
+
+ // Check that the response is different from the previous response.
+ GPR_ASSERT(response3.payload().body() != response1.payload().body());
return true;
}
diff --git a/test/cpp/microbenchmarks/BUILD b/test/cpp/microbenchmarks/BUILD
index 2bf14ce0a5..c989b6141f 100644
--- a/test/cpp/microbenchmarks/BUILD
+++ b/test/cpp/microbenchmarks/BUILD
@@ -70,6 +70,12 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "bm_cq_multiple_threads",
+ srcs = ["bm_cq_multiple_threads.cc"],
+ deps = [":helpers"],
+)
+
+grpc_cc_test(
name = "bm_error",
srcs = ["bm_error.cc"],
deps = [":helpers"],
@@ -90,7 +96,10 @@ grpc_cc_test(
grpc_cc_test(
name = "bm_fullstack_trickle",
srcs = ["bm_fullstack_trickle.cc"],
- deps = [":helpers"],
+ deps = [":helpers"]
+ external_deps = [
+ "gflags",
+ ]
)
grpc_cc_test(
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index 4af2263e82..67e7c02535 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -45,15 +45,16 @@
#include <grpc/support/string_util.h>
extern "C" {
-#include "src/core/ext/client_channel/client_channel.h"
-#include "src/core/ext/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/client_channel/client_channel.h"
+#include "src/core/ext/filters/deadline/deadline_filter.h"
+#include "src/core/ext/filters/http/client/http_client_filter.h"
+#include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
+#include "src/core/ext/filters/http/server/http_server_filter.h"
+#include "src/core/ext/filters/load_reporting/load_reporting_filter.h"
+#include "src/core/ext/filters/message_size/message_size_filter.h"
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/channel/deadline_filter.h"
-#include "src/core/lib/channel/http_client_filter.h"
-#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/channel/message_size_filter.h"
+#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/transport_impl.h"
}
@@ -67,10 +68,12 @@ auto &force_library_initialization = Library::get();
void BM_Zalloc(benchmark::State &state) {
// speed of light for call creation is zalloc, so benchmark a few interesting
// sizes
+ TrackCounters track_counters;
size_t sz = state.range(0);
while (state.KeepRunning()) {
gpr_free(gpr_zalloc(sz));
}
+ track_counters.Finish(state);
}
BENCHMARK(BM_Zalloc)
->Arg(64)
@@ -118,12 +121,12 @@ template <class Fixture>
static void BM_CallCreateDestroy(benchmark::State &state) {
TrackCounters track_counters;
Fixture fixture;
- grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
+ grpc_completion_queue *cq = grpc_completion_queue_create_for_next(NULL);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
void *method_hdl =
grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL);
while (state.KeepRunning()) {
- grpc_call_destroy(grpc_channel_create_registered_call(
+ grpc_call_unref(grpc_channel_create_registered_call(
fixture.channel(), NULL, GRPC_PROPAGATE_DEFAULTS, cq, method_hdl,
deadline, NULL));
}
@@ -152,6 +155,7 @@ static void BM_LameChannelCallCreateCpp(benchmark::State &state) {
grpc::testing::EchoResponse recv_response;
grpc::Status recv_status;
while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
grpc::ClientContext cli_ctx;
auto reader = stub->AsyncEcho(&cli_ctx, send_request, &cq);
reader->Finish(&recv_response, &recv_status, tag(0));
@@ -164,6 +168,165 @@ static void BM_LameChannelCallCreateCpp(benchmark::State &state) {
}
BENCHMARK(BM_LameChannelCallCreateCpp);
+static void do_nothing(void *ignored) {}
+
+static void BM_LameChannelCallCreateCore(benchmark::State &state) {
+ TrackCounters track_counters;
+
+ grpc_channel *channel;
+ grpc_completion_queue *cq;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_byte_buffer *response_payload_recv = NULL;
+ grpc_status_code status;
+ grpc_slice details;
+ grpc::testing::EchoRequest send_request;
+ grpc_slice send_request_slice =
+ grpc_slice_new(&send_request, sizeof(send_request), do_nothing);
+
+ channel = grpc_lame_client_channel_create(
+ "localhost:1234", GRPC_STATUS_UNAUTHENTICATED, "blah");
+ cq = grpc_completion_queue_create_for_next(NULL);
+ void *rc = grpc_channel_register_call(
+ channel, "/grpc.testing.EchoTestService/Echo", NULL, NULL);
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ grpc_call *call = grpc_channel_create_registered_call(
+ channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq, rc,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_byte_buffer *request_payload_send =
+ grpc_raw_byte_buffer_create(&send_request_slice, 1);
+
+ // Fill in call ops
+ grpc_op ops[6];
+ memset(ops, 0, sizeof(ops));
+ grpc_op *op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message.send_message = request_payload_send;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata =
+ &initial_metadata_recv;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message = &response_payload_recv;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op++;
+
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops,
+ (size_t)(op - ops),
+ (void *)1, NULL));
+ grpc_event ev = grpc_completion_queue_next(
+ cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ GPR_ASSERT(ev.type != GRPC_QUEUE_SHUTDOWN);
+ GPR_ASSERT(ev.success != 0);
+ grpc_call_unref(call);
+ grpc_byte_buffer_destroy(request_payload_send);
+ grpc_byte_buffer_destroy(response_payload_recv);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ }
+ grpc_channel_destroy(channel);
+ grpc_completion_queue_destroy(cq);
+ grpc_slice_unref(send_request_slice);
+ track_counters.Finish(state);
+}
+BENCHMARK(BM_LameChannelCallCreateCore);
+
+static void BM_LameChannelCallCreateCoreSeparateBatch(benchmark::State &state) {
+ TrackCounters track_counters;
+
+ grpc_channel *channel;
+ grpc_completion_queue *cq;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_byte_buffer *response_payload_recv = NULL;
+ grpc_status_code status;
+ grpc_slice details;
+ grpc::testing::EchoRequest send_request;
+ grpc_slice send_request_slice =
+ grpc_slice_new(&send_request, sizeof(send_request), do_nothing);
+
+ channel = grpc_lame_client_channel_create(
+ "localhost:1234", GRPC_STATUS_UNAUTHENTICATED, "blah");
+ cq = grpc_completion_queue_create_for_next(NULL);
+ void *rc = grpc_channel_register_call(
+ channel, "/grpc.testing.EchoTestService/Echo", NULL, NULL);
+ while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ grpc_call *call = grpc_channel_create_registered_call(
+ channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq, rc,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_byte_buffer *request_payload_send =
+ grpc_raw_byte_buffer_create(&send_request_slice, 1);
+
+ // Fill in call ops
+ grpc_op ops[3];
+ memset(ops, 0, sizeof(ops));
+ grpc_op *op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message.send_message = request_payload_send;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op++;
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops,
+ (size_t)(op - ops),
+ (void *)0, NULL));
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata =
+ &initial_metadata_recv;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message.recv_message = &response_payload_recv;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op++;
+
+ GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops,
+ (size_t)(op - ops),
+ (void *)1, NULL));
+ grpc_event ev = grpc_completion_queue_next(
+ cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ GPR_ASSERT(ev.type != GRPC_QUEUE_SHUTDOWN);
+ GPR_ASSERT(ev.success == 0);
+ ev = grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
+ NULL);
+ GPR_ASSERT(ev.type != GRPC_QUEUE_SHUTDOWN);
+ GPR_ASSERT(ev.success != 0);
+ grpc_call_unref(call);
+ grpc_byte_buffer_destroy(request_payload_send);
+ grpc_byte_buffer_destroy(response_payload_recv);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ }
+ grpc_channel_destroy(channel);
+ grpc_completion_queue_destroy(cq);
+ grpc_slice_unref(send_request_slice);
+ track_counters.Finish(state);
+}
+BENCHMARK(BM_LameChannelCallCreateCoreSeparateBatch);
+
static void FilterDestroy(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
gpr_free(arg);
@@ -221,7 +384,7 @@ namespace dummy_filter {
static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {}
+ grpc_transport_stream_op_batch *op) {}
static void StartTransportOp(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -296,7 +459,7 @@ void SetPollsetSet(grpc_exec_ctx *exec_ctx, grpc_transport *self,
/* implementation of grpc_transport_perform_stream_op */
void PerformStreamOp(grpc_exec_ctx *exec_ctx, grpc_transport *self,
- grpc_stream *stream, grpc_transport_stream_op *op) {
+ grpc_stream *stream, grpc_transport_stream_op_batch *op) {
grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
}
@@ -346,13 +509,15 @@ class SendEmptyMetadata {
memset(&op_, 0, sizeof(op_));
op_.on_complete = grpc_closure_init(&closure_, DoNothing, nullptr,
grpc_schedule_on_exec_ctx);
+ op_.send_initial_metadata = true;
+ op_.payload = &op_payload_;
}
class Op {
public:
Op(grpc_exec_ctx *exec_ctx, SendEmptyMetadata *p, grpc_call_stack *s) {
grpc_metadata_batch_init(&batch_);
- p->op_.send_initial_metadata = &batch_;
+ p->op_payload_.send_initial_metadata.send_initial_metadata = &batch_;
}
void Finish(grpc_exec_ctx *exec_ctx) {
grpc_metadata_batch_destroy(exec_ctx, &batch_);
@@ -366,7 +531,8 @@ class SendEmptyMetadata {
const gpr_timespec deadline_ = gpr_inf_future(GPR_CLOCK_MONOTONIC);
const gpr_timespec start_time_ = gpr_now(GPR_CLOCK_MONOTONIC);
const grpc_slice method_ = grpc_slice_from_static_string("/foo/bar");
- grpc_transport_stream_op op_;
+ grpc_transport_stream_op_batch op_;
+ grpc_transport_stream_op_batch_payload op_payload_;
grpc_closure closure_;
};
@@ -397,7 +563,8 @@ static void BM_IsolatedFilter(benchmark::State &state) {
}
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- size_t channel_size = grpc_channel_stack_size(&filters[0], filters.size());
+ size_t channel_size = grpc_channel_stack_size(
+ filters.size() == 0 ? NULL : &filters[0], filters.size());
grpc_channel_stack *channel_stack =
static_cast<grpc_channel_stack *>(gpr_zalloc(channel_size));
GPR_ASSERT(GRPC_LOG_IF_ERROR(
@@ -426,6 +593,7 @@ static void BM_IsolatedFilter(benchmark::State &state) {
const int kArenaSize = 4096;
call_args.arena = gpr_arena_create(kArenaSize);
while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
GRPC_ERROR_UNREF(grpc_call_stack_init(&exec_ctx, channel_stack, 1,
DoNothing, NULL, &call_args));
typename TestOp::Op op(&exec_ctx, &test_op_data, call_stack);
@@ -455,7 +623,7 @@ BENCHMARK_TEMPLATE(BM_IsolatedFilter, DummyFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, DummyFilter, SendEmptyMetadata);
typedef Fixture<&grpc_client_channel_filter, 0> ClientChannelFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, ClientChannelFilter, NoOp);
-typedef Fixture<&grpc_compress_filter, CHECKS_NOT_LAST> CompressFilter;
+typedef Fixture<&grpc_message_compress_filter, CHECKS_NOT_LAST> CompressFilter;
BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, NoOp);
BENCHMARK_TEMPLATE(BM_IsolatedFilter, CompressFilter, SendEmptyMetadata);
typedef Fixture<&grpc_client_deadline_filter, CHECKS_NOT_LAST>
@@ -488,13 +656,16 @@ namespace isolated_call_filter {
static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op_batch *op) {
if (op->recv_initial_metadata) {
- grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(
+ exec_ctx,
+ op->payload->recv_initial_metadata.recv_initial_metadata_ready,
+ GRPC_ERROR_NONE);
}
if (op->recv_message) {
- grpc_closure_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_NONE);
}
grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_NONE);
}
@@ -551,7 +722,7 @@ static const grpc_channel_filter isolated_call_filter = {
GetPeer,
GetChannelInfo,
"isolated_call_filter"};
-}
+} // namespace isolated_call_filter
class IsolatedCallFixture : public TrackCounters {
public:
@@ -567,7 +738,7 @@ class IsolatedCallFixture : public TrackCounters {
GRPC_CLIENT_CHANNEL);
grpc_exec_ctx_finish(&exec_ctx);
}
- cq_ = grpc_completion_queue_create(NULL);
+ cq_ = grpc_completion_queue_create_for_next(NULL);
}
void Finish(benchmark::State &state) {
@@ -590,7 +761,8 @@ static void BM_IsolatedCall_NoOp(benchmark::State &state) {
void *method_hdl =
grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL);
while (state.KeepRunning()) {
- grpc_call_destroy(grpc_channel_create_registered_call(
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ grpc_call_unref(grpc_channel_create_registered_call(
fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(),
method_hdl, deadline, NULL));
}
@@ -628,13 +800,14 @@ static void BM_IsolatedCall_Unary(benchmark::State &state) {
ops[5].data.recv_status_on_client.status_details = &status_details;
ops[5].data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata;
while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
grpc_call *call = grpc_channel_create_registered_call(
fixture.channel(), nullptr, GRPC_PROPAGATE_DEFAULTS, fixture.cq(),
method_hdl, deadline, NULL);
grpc_call_start_batch(call, ops, 6, tag(1), NULL);
grpc_completion_queue_next(fixture.cq(),
gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
- grpc_call_destroy(call);
+ grpc_call_unref(call);
}
fixture.Finish(state);
grpc_metadata_array_destroy(&recv_initial_metadata);
@@ -670,11 +843,12 @@ static void BM_IsolatedCall_StreamingSend(benchmark::State &state) {
ops[0].op = GRPC_OP_SEND_MESSAGE;
ops[0].data.send_message.send_message = send_message;
while (state.KeepRunning()) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
grpc_call_start_batch(call, ops, 1, tag(2), NULL);
grpc_completion_queue_next(fixture.cq(),
gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
}
- grpc_call_destroy(call);
+ grpc_call_unref(call);
fixture.Finish(state);
grpc_metadata_array_destroy(&recv_initial_metadata);
grpc_metadata_array_destroy(&recv_trailing_metadata);
diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc
index 55d2d2f58d..581440682a 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc
@@ -90,10 +90,15 @@ static void BM_HpackEncoderEncodeHeader(benchmark::State &state) {
grpc_slice_buffer outbuf;
grpc_slice_buffer_init(&outbuf);
while (state.KeepRunning()) {
- uint32_t stream_id = static_cast<uint32_t>(state.iterations());
- grpc_chttp2_encode_header(&exec_ctx, &c, stream_id, &b, state.range(0),
- state.range(1), &stats, &outbuf);
- if (!logged_representative_output) {
+ grpc_encode_header_options hopt = {
+ static_cast<uint32_t>(state.iterations()),
+ state.range(0) != 0,
+ Fixture::kEnableTrueBinary,
+ (size_t)state.range(1),
+ &stats,
+ };
+ grpc_chttp2_encode_header(&exec_ctx, &c, &b, &hopt, &outbuf);
+ if (!logged_representative_output && state.iterations() > 3) {
logged_representative_output = true;
for (size_t i = 0; i < outbuf.count; i++) {
char *s = grpc_dump_slice(outbuf.slices[i], GPR_DUMP_HEX);
@@ -122,6 +127,7 @@ namespace hpack_encoder_fixtures {
class EmptyBatch {
public:
+ static constexpr bool kEnableTrueBinary = false;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
return {};
}
@@ -129,6 +135,7 @@ class EmptyBatch {
class SingleStaticElem {
public:
+ static constexpr bool kEnableTrueBinary = false;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
return {GRPC_MDELEM_GRPC_ACCEPT_ENCODING_IDENTITY_COMMA_DEFLATE};
}
@@ -136,6 +143,7 @@ class SingleStaticElem {
class SingleInternedElem {
public:
+ static constexpr bool kEnableTrueBinary = false;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
return {grpc_mdelem_from_slices(
exec_ctx, grpc_slice_intern(grpc_slice_from_static_string("abc")),
@@ -143,9 +151,10 @@ class SingleInternedElem {
}
};
-template <int kLength>
+template <int kLength, bool kTrueBinary>
class SingleInternedBinaryElem {
public:
+ static constexpr bool kEnableTrueBinary = kTrueBinary;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
grpc_slice bytes = MakeBytes();
std::vector<grpc_mdelem> out = {grpc_mdelem_from_slices(
@@ -167,6 +176,7 @@ class SingleInternedBinaryElem {
class SingleInternedKeyElem {
public:
+ static constexpr bool kEnableTrueBinary = false;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
return {grpc_mdelem_from_slices(
exec_ctx, grpc_slice_intern(grpc_slice_from_static_string("abc")),
@@ -176,6 +186,7 @@ class SingleInternedKeyElem {
class SingleNonInternedElem {
public:
+ static constexpr bool kEnableTrueBinary = false;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
return {grpc_mdelem_from_slices(exec_ctx,
grpc_slice_from_static_string("abc"),
@@ -183,9 +194,10 @@ class SingleNonInternedElem {
}
};
-template <int kLength>
+template <int kLength, bool kTrueBinary>
class SingleNonInternedBinaryElem {
public:
+ static constexpr bool kEnableTrueBinary = kTrueBinary;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
return {grpc_mdelem_from_slices(
exec_ctx, grpc_slice_from_static_string("abc-bin"), MakeBytes())};
@@ -203,6 +215,7 @@ class SingleNonInternedBinaryElem {
class RepresentativeClientInitialMetadata {
public:
+ static constexpr bool kEnableTrueBinary = true;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
return {
GRPC_MDELEM_SCHEME_HTTP, GRPC_MDELEM_METHOD_POST,
@@ -224,6 +237,7 @@ class RepresentativeClientInitialMetadata {
class RepresentativeServerInitialMetadata {
public:
+ static constexpr bool kEnableTrueBinary = true;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
return {GRPC_MDELEM_STATUS_200,
GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC,
@@ -233,6 +247,7 @@ class RepresentativeServerInitialMetadata {
class RepresentativeServerTrailingMetadata {
public:
+ static constexpr bool kEnableTrueBinary = true;
static std::vector<grpc_mdelem> GetElems(grpc_exec_ctx *exec_ctx) {
return {GRPC_MDELEM_GRPC_STATUS_0};
}
@@ -247,28 +262,67 @@ BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleInternedKeyElem)
->Args({0, 16384});
BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleInternedElem)
->Args({0, 16384});
-BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleInternedBinaryElem<1>)
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<1, false>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<3, false>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<10, false>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<31, false>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<100, false>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<1, true>)
->Args({0, 16384});
-BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleInternedBinaryElem<3>)
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<3, true>)
->Args({0, 16384});
-BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleInternedBinaryElem<10>)
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<10, true>)
->Args({0, 16384});
-BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleInternedBinaryElem<31>)
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<31, true>)
->Args({0, 16384});
-BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleInternedBinaryElem<100>)
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleInternedBinaryElem<100, true>)
->Args({0, 16384});
BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleNonInternedElem)
->Args({0, 16384});
-BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleNonInternedBinaryElem<1>)
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleNonInternedBinaryElem<1, false>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleNonInternedBinaryElem<3, false>)
->Args({0, 16384});
-BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleNonInternedBinaryElem<3>)
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleNonInternedBinaryElem<10, false>)
->Args({0, 16384});
-BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleNonInternedBinaryElem<10>)
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleNonInternedBinaryElem<31, false>)
->Args({0, 16384});
-BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleNonInternedBinaryElem<31>)
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleNonInternedBinaryElem<100, false>)
->Args({0, 16384});
BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
- SingleNonInternedBinaryElem<100>)
+ SingleNonInternedBinaryElem<1, true>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleNonInternedBinaryElem<3, true>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleNonInternedBinaryElem<10, true>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleNonInternedBinaryElem<31, true>)
+ ->Args({0, 16384});
+BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader,
+ SingleNonInternedBinaryElem<100, true>)
->Args({0, 16384});
// test with a tiny frame size, to highlight continuation costs
BENCHMARK_TEMPLATE(BM_HpackEncoderEncodeHeader, SingleNonInternedElem)
@@ -421,7 +475,27 @@ class NonIndexedElem {
}
};
-class NonIndexedBinaryElem1 {
+template <int kLength, bool kTrueBinary>
+class NonIndexedBinaryElem;
+
+template <int kLength>
+class NonIndexedBinaryElem<kLength, true> {
+ public:
+ static std::vector<grpc_slice> GetInitSlices() { return {}; }
+ static std::vector<grpc_slice> GetBenchmarkSlices() {
+ std::vector<uint8_t> v = {
+ 0x00, 0x07, 'a', 'b', 'c',
+ '-', 'b', 'i', 'n', static_cast<uint8_t>(kLength + 1),
+ 0};
+ for (int i = 0; i < kLength; i++) {
+ v.push_back(static_cast<uint8_t>(i));
+ }
+ return {MakeSlice(v)};
+ }
+};
+
+template <>
+class NonIndexedBinaryElem<1, false> {
public:
static std::vector<grpc_slice> GetInitSlices() { return {}; }
static std::vector<grpc_slice> GetBenchmarkSlices() {
@@ -430,7 +504,8 @@ class NonIndexedBinaryElem1 {
}
};
-class NonIndexedBinaryElem3 {
+template <>
+class NonIndexedBinaryElem<3, false> {
public:
static std::vector<grpc_slice> GetInitSlices() { return {}; }
static std::vector<grpc_slice> GetBenchmarkSlices() {
@@ -439,7 +514,8 @@ class NonIndexedBinaryElem3 {
}
};
-class NonIndexedBinaryElem10 {
+template <>
+class NonIndexedBinaryElem<10, false> {
public:
static std::vector<grpc_slice> GetInitSlices() { return {}; }
static std::vector<grpc_slice> GetBenchmarkSlices() {
@@ -449,7 +525,8 @@ class NonIndexedBinaryElem10 {
}
};
-class NonIndexedBinaryElem31 {
+template <>
+class NonIndexedBinaryElem<31, false> {
public:
static std::vector<grpc_slice> GetInitSlices() { return {}; }
static std::vector<grpc_slice> GetBenchmarkSlices() {
@@ -461,7 +538,8 @@ class NonIndexedBinaryElem31 {
}
};
-class NonIndexedBinaryElem100 {
+template <>
+class NonIndexedBinaryElem<100, false> {
public:
static std::vector<grpc_slice> GetInitSlices() { return {}; }
static std::vector<grpc_slice> GetBenchmarkSlices() {
@@ -570,11 +648,16 @@ BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem1);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem3);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem10);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem31);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem100);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
RepresentativeClientInitialMetadata);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
@@ -584,23 +667,4 @@ BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
} // namespace hpack_parser_fixtures
-static void BM_Base16SomeStuff(benchmark::State &state) {
- uint8_t *bytes = new uint8_t[state.range(0)];
- for (int i = 0; i < state.range(0); i++) {
- bytes[i] = static_cast<uint8_t>(rand());
- }
- uint8_t *encoded = new uint8_t[state.range(0) * 2];
- static const uint8_t hex[] = "0123456789abcdef";
- while (state.KeepRunning()) {
- for (int i = 0; i < state.range(0); i++) {
- encoded[2 * i + 0] = hex[encoded[i] >> 8];
- encoded[2 * i + 1] = hex[encoded[i] & 0xf];
- }
- }
- delete[] encoded;
- delete[] bytes;
- state.SetBytesProcessed(state.iterations() * state.range(0));
-}
-BENCHMARK(BM_Base16SomeStuff)->Range(1, 4096);
-
BENCHMARK_MAIN();
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 254d57de20..8c5413b5fd 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -207,7 +207,7 @@ class Stream {
static_cast<grpc_stream *>(stream_), closure);
}
- void Op(grpc_transport_stream_op *op) {
+ void Op(grpc_transport_stream_op_batch *op) {
grpc_transport_perform_stream_op(f_->exec_ctx(), f_->transport(),
static_cast<grpc_stream *>(stream_), op);
}
@@ -305,10 +305,16 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) {
TrackCounters track_counters;
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch op;
+ grpc_transport_stream_op_batch_payload op_payload;
std::unique_ptr<Closure> start;
std::unique_ptr<Closure> done;
+ auto reset_op = [&]() {
+ memset(&op, 0, sizeof(op));
+ op.payload = &op_payload;
+ };
+
grpc_metadata_batch b;
grpc_metadata_batch_init(&b);
b.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
@@ -324,14 +330,16 @@ static void BM_StreamCreateSendInitialMetadataDestroy(benchmark::State &state) {
start = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (!state.KeepRunning()) return;
s.Init(state);
- memset(&op, 0, sizeof(op));
+ reset_op();
op.on_complete = done.get();
- op.send_initial_metadata = &b;
+ op.send_initial_metadata = true;
+ op.payload->send_initial_metadata.send_initial_metadata = &b;
s.Op(&op);
});
done = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
- memset(&op, 0, sizeof(op));
- op.cancel_error = GRPC_ERROR_CANCELLED;
+ reset_op();
+ op.cancel_stream = true;
+ op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
s.Op(&op);
s.DestroyThen(start.get());
});
@@ -348,11 +356,16 @@ static void BM_TransportEmptyOp(benchmark::State &state) {
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
s.Init(state);
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch op;
+ grpc_transport_stream_op_batch_payload op_payload;
+ auto reset_op = [&]() {
+ memset(&op, 0, sizeof(op));
+ op.payload = &op_payload;
+ };
std::unique_ptr<Closure> c =
MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
if (!state.KeepRunning()) return;
- memset(&op, 0, sizeof(op));
+ reset_op();
op.on_complete = c.get();
s.Op(&op);
});
@@ -370,7 +383,12 @@ static void BM_TransportStreamSend(benchmark::State &state) {
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
s.Init(state);
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch op;
+ grpc_transport_stream_op_batch_payload op_payload;
+ auto reset_op = [&]() {
+ memset(&op, 0, sizeof(op));
+ op.payload = &op_payload;
+ };
grpc_slice_buffer_stream send_stream;
grpc_slice_buffer send_buffer;
grpc_slice_buffer_init(&send_buffer);
@@ -397,20 +415,23 @@ static void BM_TransportStreamSend(benchmark::State &state) {
s.chttp2_stream()->outgoing_window_delta = 1024 * 1024 * 1024;
f.chttp2_transport()->outgoing_window = 1024 * 1024 * 1024;
grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
- memset(&op, 0, sizeof(op));
+ reset_op();
op.on_complete = c.get();
- op.send_message = &send_stream.base;
+ op.send_message = true;
+ op.payload->send_message.send_message = &send_stream.base;
s.Op(&op);
});
- memset(&op, 0, sizeof(op));
- op.send_initial_metadata = &b;
+ reset_op();
+ op.send_initial_metadata = true;
+ op.payload->send_initial_metadata.send_initial_metadata = &b;
op.on_complete = c.get();
s.Op(&op);
f.FlushExecCtx();
- memset(&op, 0, sizeof(op));
- op.cancel_error = GRPC_ERROR_CANCELLED;
+ reset_op();
+ op.cancel_stream = true;
+ op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
s.Op(&op);
s.DestroyThen(
MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
@@ -483,10 +504,16 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
Fixture f(grpc::ChannelArguments(), true);
Stream s(&f);
s.Init(state);
- grpc_transport_stream_op op;
+ grpc_transport_stream_op_batch_payload op_payload;
+ grpc_transport_stream_op_batch op;
grpc_byte_stream *recv_stream;
grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384);
+ auto reset_op = [&]() {
+ memset(&op, 0, sizeof(op));
+ op.payload = &op_payload;
+ };
+
grpc_metadata_batch b;
grpc_metadata_batch_init(&b);
grpc_metadata_batch b_recv;
@@ -518,10 +545,11 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
s.chttp2_stream()->incoming_window_delta = 1024 * 1024 * 1024;
f.chttp2_transport()->incoming_window = 1024 * 1024 * 1024;
received = 0;
- memset(&op, 0, sizeof(op));
+ reset_op();
op.on_complete = do_nothing.get();
- op.recv_message = &recv_stream;
- op.recv_message_ready = drain_start.get();
+ op.recv_message = true;
+ op.payload->recv_message.recv_message = &recv_stream;
+ op.payload->recv_message.recv_message_ready = drain_start.get();
s.Op(&op);
f.PushInput(grpc_slice_ref(incoming_data));
});
@@ -541,20 +569,29 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE);
return;
}
- } while (grpc_byte_stream_next(exec_ctx, recv_stream, &recv_slice,
+ } while (grpc_byte_stream_next(exec_ctx, recv_stream,
recv_stream->length - received,
- drain_continue.get()));
+ drain_continue.get()) &&
+ GRPC_ERROR_NONE ==
+ grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice) &&
+ (received += GRPC_SLICE_LENGTH(recv_slice),
+ grpc_slice_unref_internal(exec_ctx, recv_slice), true));
});
drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
+ grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice);
received += GRPC_SLICE_LENGTH(recv_slice);
grpc_slice_unref_internal(exec_ctx, recv_slice);
grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE);
});
- memset(&op, 0, sizeof(op));
- op.send_initial_metadata = &b;
- op.recv_initial_metadata = &b_recv;
+ reset_op();
+ op.send_initial_metadata = true;
+ op.payload->send_initial_metadata.send_initial_metadata = &b;
+ op.recv_initial_metadata = true;
+ op.payload->recv_initial_metadata.recv_initial_metadata = &b_recv;
+ op.payload->recv_initial_metadata.recv_initial_metadata_ready =
+ do_nothing.get();
op.on_complete = c.get();
s.Op(&op);
f.PushInput(SLICE_FROM_BUFFER(
@@ -571,8 +608,9 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
"\x10\x14grpc-accept-encoding\x15identity,deflate,gzip"));
f.FlushExecCtx();
- memset(&op, 0, sizeof(op));
- op.cancel_error = GRPC_ERROR_CANCELLED;
+ reset_op();
+ op.cancel_stream = true;
+ op.payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
s.Op(&op);
s.DestroyThen(
MakeOnceClosure([](grpc_exec_ctx *exec_ctx, grpc_error *error) {}));
diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc
index 38ac9d2705..8b26bf977c 100644
--- a/test/cpp/microbenchmarks/bm_cq.cc
+++ b/test/cpp/microbenchmarks/bm_cq.cc
@@ -62,7 +62,8 @@ BENCHMARK(BM_CreateDestroyCpp);
static void BM_CreateDestroyCpp2(benchmark::State& state) {
TrackCounters track_counters;
while (state.KeepRunning()) {
- grpc_completion_queue* core_cq = grpc_completion_queue_create(NULL);
+ grpc_completion_queue* core_cq =
+ grpc_completion_queue_create_for_next(NULL);
CompletionQueue cq(core_cq);
}
track_counters.Finish(state);
@@ -72,7 +73,9 @@ BENCHMARK(BM_CreateDestroyCpp2);
static void BM_CreateDestroyCore(benchmark::State& state) {
TrackCounters track_counters;
while (state.KeepRunning()) {
- grpc_completion_queue_destroy(grpc_completion_queue_create(NULL));
+ // TODO: sreek Templatize this benchmark and pass completion type and
+ // polling type as parameters
+ grpc_completion_queue_destroy(grpc_completion_queue_create_for_next(NULL));
}
track_counters.Finish(state);
}
@@ -108,7 +111,8 @@ BENCHMARK(BM_Pass1Cpp);
static void BM_Pass1Core(benchmark::State& state) {
TrackCounters track_counters;
- grpc_completion_queue* cq = grpc_completion_queue_create(NULL);
+ // TODO: sreek Templatize this benchmark and pass polling_type as a param
+ grpc_completion_queue* cq = grpc_completion_queue_create_for_next(NULL);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
while (state.KeepRunning()) {
grpc_cq_completion completion;
@@ -126,7 +130,8 @@ BENCHMARK(BM_Pass1Core);
static void BM_Pluck1Core(benchmark::State& state) {
TrackCounters track_counters;
- grpc_completion_queue* cq = grpc_completion_queue_create(NULL);
+ // TODO: sreek Templatize this benchmark and pass polling_type as a param
+ grpc_completion_queue* cq = grpc_completion_queue_create_for_pluck(NULL);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
while (state.KeepRunning()) {
grpc_cq_completion completion;
@@ -144,7 +149,8 @@ BENCHMARK(BM_Pluck1Core);
static void BM_EmptyCore(benchmark::State& state) {
TrackCounters track_counters;
- grpc_completion_queue* cq = grpc_completion_queue_create(NULL);
+ // TODO: sreek Templatize this benchmark and pass polling_type as a param
+ grpc_completion_queue* cq = grpc_completion_queue_create_for_next(NULL);
gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
while (state.KeepRunning()) {
grpc_completion_queue_next(cq, deadline, NULL);
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
new file mode 100644
index 0000000000..0d267da723
--- /dev/null
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -0,0 +1,162 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <benchmark/benchmark.h>
+#include <string.h>
+#include <atomic>
+
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include "test/cpp/microbenchmarks/helpers.h"
+
+extern "C" {
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/port.h"
+#include "src/core/lib/surface/completion_queue.h"
+}
+
+struct grpc_pollset {
+ gpr_mu mu;
+};
+
+namespace grpc {
+namespace testing {
+
+static void* g_tag = (void*)(intptr_t)10; // Some random number
+static grpc_completion_queue* g_cq;
+static grpc_event_engine_vtable g_vtable;
+
+static void pollset_shutdown(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
+ grpc_closure* closure) {
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+}
+
+static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
+ gpr_mu_init(&ps->mu);
+ *mu = &ps->mu;
+}
+
+static void pollset_destroy(grpc_exec_ctx* exec_ctx, grpc_pollset* ps) {
+ gpr_mu_destroy(&ps->mu);
+}
+
+static grpc_error* pollset_kick(grpc_pollset* p, grpc_pollset_worker* worker) {
+ return GRPC_ERROR_NONE;
+}
+
+/* Callback when the tag is dequeued from the completion queue. Does nothing */
+static void cq_done_cb(grpc_exec_ctx* exec_ctx, void* done_arg,
+ grpc_cq_completion* cq_completion) {
+ gpr_free(cq_completion);
+}
+
+/* Queues a completion tag. ZERO polling overhead */
+static grpc_error* pollset_work(grpc_exec_ctx* exec_ctx, grpc_pollset* ps,
+ grpc_pollset_worker** worker, gpr_timespec now,
+ gpr_timespec deadline) {
+ gpr_mu_unlock(&ps->mu);
+ grpc_cq_begin_op(g_cq, g_tag);
+ grpc_cq_end_op(exec_ctx, g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, NULL,
+ (grpc_cq_completion*)gpr_malloc(sizeof(grpc_cq_completion)));
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&ps->mu);
+ return GRPC_ERROR_NONE;
+}
+
+static void init_engine_vtable() {
+ memset(&g_vtable, 0, sizeof(g_vtable));
+
+ g_vtable.pollset_size = sizeof(grpc_pollset);
+ g_vtable.pollset_init = pollset_init;
+ g_vtable.pollset_shutdown = pollset_shutdown;
+ g_vtable.pollset_destroy = pollset_destroy;
+ g_vtable.pollset_work = pollset_work;
+ g_vtable.pollset_kick = pollset_kick;
+}
+
+static void setup() {
+ grpc_init();
+ init_engine_vtable();
+ grpc_set_event_engine_test_only(&g_vtable);
+
+ g_cq = grpc_completion_queue_create_for_next(NULL);
+}
+
+static void teardown() {
+ grpc_completion_queue_shutdown(g_cq);
+ grpc_completion_queue_destroy(g_cq);
+}
+
+/* A few notes about Multi-threaded benchmarks:
+
+ Setup:
+ The benchmark framework ensures that none of the threads proceed beyond the
+ state.KeepRunning() call unless all the threads have called state.keepRunning
+ atleast once. So it is safe to do the initialization in one of the threads
+ before state.KeepRunning() is called.
+
+ Teardown:
+ The benchmark framework also ensures that no thread is running the benchmark
+ code (i.e the code between two successive calls of state.KeepRunning()) if
+ state.KeepRunning() returns false. So it is safe to do the teardown in one
+ of the threads after state.keepRunning() returns false.
+*/
+static void BM_Cq_Throughput(benchmark::State& state) {
+ TrackCounters track_counters;
+ gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+
+ if (state.thread_index == 0) {
+ setup();
+ }
+
+ while (state.KeepRunning()) {
+ GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, NULL).type ==
+ GRPC_OP_COMPLETE);
+ }
+
+ state.SetItemsProcessed(state.iterations());
+
+ if (state.thread_index == 0) {
+ teardown();
+ }
+
+ track_counters.Finish(state);
+}
+
+BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
+
+} // namespace testing
+} // namespace grpc
+
+BENCHMARK_MAIN();
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
index c536e15a2c..fd2210c474 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_ping_pong.cc
@@ -436,6 +436,18 @@ BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, InProcessCHTTP2, NoOpMutator,
BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, TCP, NoOpMutator, NoOpMutator)
->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_StreamingPingPong, MinInProcessCHTTP2, NoOpMutator,
+ NoOpMutator)
+ ->Apply(StreamingPingPongArgs);
+BENCHMARK_TEMPLATE(BM_StreamingPingPong, MinTCP, NoOpMutator, NoOpMutator)
+ ->Apply(StreamingPingPongArgs);
+
+BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, MinInProcessCHTTP2, NoOpMutator,
+ NoOpMutator)
+ ->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_StreamingPingPongMsgs, MinTCP, NoOpMutator, NoOpMutator)
+ ->Range(0, 128 * 1024 * 1024);
+
// Generate Args for StreamingPingPongWithCoalescingApi benchmarks. Currently
// generates args for only "small streams" (i.e streams with 0, 1 or 2 messages)
static void StreamingPingPongWithCoalescingApiArgs(
@@ -459,6 +471,9 @@ static void StreamingPingPongWithCoalescingApiArgs(
BENCHMARK_TEMPLATE(BM_StreamingPingPongWithCoalescingApi, InProcessCHTTP2,
NoOpMutator, NoOpMutator)
->Apply(StreamingPingPongWithCoalescingApiArgs);
+BENCHMARK_TEMPLATE(BM_StreamingPingPongWithCoalescingApi, MinInProcessCHTTP2,
+ NoOpMutator, NoOpMutator)
+ ->Apply(StreamingPingPongWithCoalescingApiArgs);
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
index 5c1eb1165b..01ff39121e 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_streaming_pump.cc
@@ -105,6 +105,17 @@ static void BM_PumpStreamClientToServer(benchmark::State& state) {
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
+ response_rw.Finish(Status::OK, tag(0));
+ Status final_status;
+ request_rw->Finish(&final_status, tag(1));
+ need_tags = (1 << 0) | (1 << 1);
+ while (need_tags) {
+ GPR_ASSERT(fixture->cq()->Next(&t, &ok));
+ int i = (int)(intptr_t)t;
+ GPR_ASSERT(need_tags & (1 << i));
+ need_tags &= ~(1 << i);
+ }
+ GPR_ASSERT(final_status.ok());
}
fixture->Finish(state);
fixture.reset();
@@ -189,6 +200,14 @@ BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, SockPair)
->Range(0, 128 * 1024 * 1024);
BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, InProcessCHTTP2)
->Range(0, 128 * 1024 * 1024);
+BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, MinTCP)->Arg(0);
+BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, MinUDS)->Arg(0);
+BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, MinSockPair)->Arg(0);
+BENCHMARK_TEMPLATE(BM_PumpStreamClientToServer, MinInProcessCHTTP2)->Arg(0);
+BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, MinTCP)->Arg(0);
+BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, MinUDS)->Arg(0);
+BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, MinSockPair)->Arg(0);
+BENCHMARK_TEMPLATE(BM_PumpStreamServerToClient, MinInProcessCHTTP2)->Arg(0);
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
index c563f28b55..6b9fa8b38d 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc
@@ -34,6 +34,8 @@
/* Benchmark gRPC end2end in various configurations */
#include <benchmark/benchmark.h>
+#include <gflags/gflags.h>
+#include <fstream>
#include "src/core/lib/profiling/timers.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@@ -45,15 +47,58 @@ extern "C" {
#include "test/core/util/trickle_endpoint.h"
}
+DEFINE_bool(log, false, "Log state to CSV files");
+DEFINE_int32(
+ warmup_megabytes, 1,
+ "Number of megabytes to pump before collecting flow control stats");
+DEFINE_int32(
+ warmup_iterations, 100,
+ "Number of iterations to run before collecting flow control stats");
+DEFINE_int32(warmup_max_time_seconds, 10,
+ "Maximum number of seconds to run warmup loop");
+
namespace grpc {
namespace testing {
static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
+template <class A0>
+static void write_csv(std::ostream* out, A0&& a0) {
+ if (!out) return;
+ (*out) << a0 << "\n";
+}
+
+template <class A0, class... Arg>
+static void write_csv(std::ostream* out, A0&& a0, Arg&&... arg) {
+ if (!out) return;
+ (*out) << a0 << ",";
+ write_csv(out, std::forward<Arg>(arg)...);
+}
+
class TrickledCHTTP2 : public EndpointPairFixture {
public:
- TrickledCHTTP2(Service* service, size_t megabits_per_second)
- : EndpointPairFixture(service, MakeEndpoints(megabits_per_second)) {}
+ TrickledCHTTP2(Service* service, bool streaming, size_t req_size,
+ size_t resp_size, size_t kilobits_per_second)
+ : EndpointPairFixture(service, MakeEndpoints(kilobits_per_second),
+ FixtureConfiguration()) {
+ if (FLAGS_log) {
+ std::ostringstream fn;
+ fn << "trickle." << (streaming ? "streaming" : "unary") << "." << req_size
+ << "." << resp_size << "." << kilobits_per_second << ".csv";
+ log_.reset(new std::ofstream(fn.str().c_str()));
+ write_csv(log_.get(), "t", "iteration", "client_backlog",
+ "server_backlog", "client_t_stall", "client_s_stall",
+ "server_t_stall", "server_s_stall", "client_t_outgoing",
+ "server_t_outgoing", "client_t_incoming", "server_t_incoming",
+ "client_s_outgoing_delta", "server_s_outgoing_delta",
+ "client_s_incoming_delta", "server_s_incoming_delta",
+ "client_s_announce_window", "server_s_announce_window",
+ "client_peer_iws", "client_local_iws", "client_sent_iws",
+ "client_acked_iws", "server_peer_iws", "server_local_iws",
+ "server_sent_iws", "server_acked_iws", "client_queued_bytes",
+ "server_queued_bytes");
+ }
+ }
void AddToLabel(std::ostream& out, benchmark::State& state) {
out << " writes/iter:"
@@ -74,7 +119,58 @@ class TrickledCHTTP2 : public EndpointPairFixture {
(double)state.iterations());
}
- void Step() {
+ void Log(int64_t iteration) {
+ auto now = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_);
+ grpc_chttp2_transport* client =
+ reinterpret_cast<grpc_chttp2_transport*>(client_transport_);
+ grpc_chttp2_transport* server =
+ reinterpret_cast<grpc_chttp2_transport*>(server_transport_);
+ grpc_chttp2_stream* client_stream =
+ client->stream_map.count == 1
+ ? static_cast<grpc_chttp2_stream*>(client->stream_map.values[0])
+ : nullptr;
+ grpc_chttp2_stream* server_stream =
+ server->stream_map.count == 1
+ ? static_cast<grpc_chttp2_stream*>(server->stream_map.values[0])
+ : nullptr;
+ write_csv(
+ log_.get(), static_cast<double>(now.tv_sec) +
+ 1e-9 * static_cast<double>(now.tv_nsec),
+ iteration, grpc_trickle_get_backlog(endpoint_pair_.client),
+ grpc_trickle_get_backlog(endpoint_pair_.server),
+ client->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
+ client->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
+ server->lists[GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT].head != nullptr,
+ server->lists[GRPC_CHTTP2_LIST_STALLED_BY_STREAM].head != nullptr,
+ client->outgoing_window, server->outgoing_window,
+ client->incoming_window, server->incoming_window,
+ client_stream ? client_stream->outgoing_window_delta : -1,
+ server_stream ? server_stream->outgoing_window_delta : -1,
+ client_stream ? client_stream->incoming_window_delta : -1,
+ server_stream ? server_stream->incoming_window_delta : -1,
+ client_stream ? client_stream->announce_window : -1,
+ server_stream ? server_stream->announce_window : -1,
+ client->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ client->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ client->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ client->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ server->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ server->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ server->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ server->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ client_stream ? client_stream->flow_controlled_buffer.length : 0,
+ server_stream ? server_stream->flow_controlled_buffer.length : 0);
+ }
+
+ void Step(bool update_stats) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
size_t client_backlog =
grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.client);
@@ -82,10 +178,12 @@ class TrickledCHTTP2 : public EndpointPairFixture {
grpc_trickle_endpoint_trickle(&exec_ctx, endpoint_pair_.server);
grpc_exec_ctx_finish(&exec_ctx);
- UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_,
- client_backlog);
- UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_,
- server_backlog);
+ if (update_stats) {
+ UpdateStats((grpc_chttp2_transport*)client_transport_, &client_stats_,
+ client_backlog);
+ UpdateStats((grpc_chttp2_transport*)server_transport_, &server_stats_,
+ server_backlog);
+ }
}
private:
@@ -96,6 +194,8 @@ class TrickledCHTTP2 : public EndpointPairFixture {
};
Stats client_stats_;
Stats server_stats_;
+ std::unique_ptr<std::ofstream> log_;
+ gpr_timespec start_ = gpr_now(GPR_CLOCK_MONOTONIC);
grpc_endpoint_pair MakeEndpoints(size_t kilobits) {
grpc_endpoint_pair p;
@@ -122,13 +222,15 @@ class TrickledCHTTP2 : public EndpointPairFixture {
// force library initialization
auto& force_library_initialization = Library::get();
-static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) {
+static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok,
+ int64_t iteration) {
while (true) {
+ fixture->Log(iteration);
switch (fixture->cq()->AsyncNext(
t, ok, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(100, GPR_TIMESPAN)))) {
case CompletionQueue::TIMEOUT:
- fixture->Step();
+ fixture->Step(iteration != -1);
break;
case CompletionQueue::SHUTDOWN:
GPR_ASSERT(false);
@@ -141,8 +243,9 @@ static void TrickleCQNext(TrickledCHTTP2* fixture, void** t, bool* ok) {
static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
EchoTestService::AsyncService service;
- std::unique_ptr<TrickledCHTTP2> fixture(
- new TrickledCHTTP2(&service, state.range(1)));
+ std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2(
+ &service, true, state.range(0) /* req_size */,
+ state.range(0) /* resp_size */, state.range(1) /* bw in kbit/s */));
{
EchoResponse send_response;
EchoResponse recv_response;
@@ -162,18 +265,19 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
void* t;
bool ok;
while (need_tags) {
- TrickleCQNext(fixture.get(), &t, &ok);
+ TrickleCQNext(fixture.get(), &t, &ok, -1);
GPR_ASSERT(ok);
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
}
request_rw->Read(&recv_response, tag(0));
- while (state.KeepRunning()) {
+ auto inner_loop = [&](bool in_warmup) {
GPR_TIMER_SCOPE("BenchmarkCycle", 0);
response_rw.Write(send_response, tag(1));
while (true) {
- TrickleCQNext(fixture.get(), &t, &ok);
+ TrickleCQNext(fixture.get(), &t, &ok,
+ in_warmup ? -1 : state.iterations());
if (t == tag(0)) {
request_rw->Read(&recv_response, tag(0));
} else if (t == tag(1)) {
@@ -182,11 +286,26 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
GPR_ASSERT(false);
}
}
+ };
+ gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC);
+ for (int i = 0;
+ i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 *
+ 1024 / (14 + state.range(0)));
+ i++) {
+ inner_loop(true);
+ if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start),
+ gpr_time_from_seconds(FLAGS_warmup_max_time_seconds,
+ GPR_TIMESPAN)) > 0) {
+ break;
+ }
+ }
+ while (state.KeepRunning()) {
+ inner_loop(false);
}
response_rw.Finish(Status::OK, tag(1));
need_tags = (1 << 0) | (1 << 1);
while (need_tags) {
- TrickleCQNext(fixture.get(), &t, &ok);
+ TrickleCQNext(fixture.get(), &t, &ok, -1);
int i = (int)(intptr_t)t;
GPR_ASSERT(need_tags & (1 << i));
need_tags &= ~(1 << i);
@@ -197,23 +316,126 @@ static void BM_PumpStreamServerToClient_Trickle(benchmark::State& state) {
state.SetBytesProcessed(state.range(0) * state.iterations());
}
-/*******************************************************************************
- * CONFIGURATIONS
- */
-
-static void TrickleArgs(benchmark::internal::Benchmark* b) {
+static void StreamingTrickleArgs(benchmark::internal::Benchmark* b) {
for (int i = 1; i <= 128 * 1024 * 1024; i *= 8) {
- for (int j = 1; j <= 128 * 1024 * 1024; j *= 8) {
+ for (int j = 64; j <= 128 * 1024 * 1024; j *= 8) {
double expected_time =
static_cast<double>(14 + i) / (125.0 * static_cast<double>(j));
- if (expected_time > 0.01) continue;
+ if (expected_time > 2.0) continue;
b->Args({i, j});
}
}
}
+BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(StreamingTrickleArgs);
+
+static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) {
+ EchoTestService::AsyncService service;
+ std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2(
+ &service, true, state.range(0) /* req_size */,
+ state.range(1) /* resp_size */, state.range(2) /* bw in kbit/s */));
+ EchoRequest send_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ if (state.range(0) > 0) {
+ send_request.set_message(std::string(state.range(0), 'a'));
+ }
+ if (state.range(1) > 0) {
+ send_response.set_message(std::string(state.range(1), 'a'));
+ }
+ Status recv_status;
+ struct ServerEnv {
+ ServerContext ctx;
+ EchoRequest recv_request;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer;
+ ServerEnv() : response_writer(&ctx) {}
+ };
+ uint8_t server_env_buffer[2 * sizeof(ServerEnv)];
+ ServerEnv* server_env[2] = {
+ reinterpret_cast<ServerEnv*>(server_env_buffer),
+ reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))};
+ new (server_env[0]) ServerEnv;
+ new (server_env[1]) ServerEnv;
+ service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request,
+ &server_env[0]->response_writer, fixture->cq(),
+ fixture->cq(), tag(0));
+ service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request,
+ &server_env[1]->response_writer, fixture->cq(),
+ fixture->cq(), tag(1));
+ std::unique_ptr<EchoTestService::Stub> stub(
+ EchoTestService::NewStub(fixture->channel()));
+ auto inner_loop = [&](bool in_warmup) {
+ GPR_TIMER_SCOPE("BenchmarkCycle", 0);
+ recv_response.Clear();
+ ClientContext cli_ctx;
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
+ void* t;
+ bool ok;
+ TrickleCQNext(fixture.get(), &t, &ok, state.iterations());
+ GPR_ASSERT(ok);
+ GPR_ASSERT(t == tag(0) || t == tag(1));
+ intptr_t slot = reinterpret_cast<intptr_t>(t);
+ ServerEnv* senv = server_env[slot];
+ senv->response_writer.Finish(send_response, Status::OK, tag(3));
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
+ for (int i = (1 << 3) | (1 << 4); i != 0;) {
+ TrickleCQNext(fixture.get(), &t, &ok, state.iterations());
+ GPR_ASSERT(ok);
+ int tagnum = (int)reinterpret_cast<intptr_t>(t);
+ GPR_ASSERT(i & (1 << tagnum));
+ i -= 1 << tagnum;
+ }
+ GPR_ASSERT(recv_status.ok());
+
+ senv->~ServerEnv();
+ senv = new (senv) ServerEnv();
+ service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
+ fixture->cq(), fixture->cq(), tag(slot));
+ };
+ gpr_timespec warmup_start = gpr_now(GPR_CLOCK_MONOTONIC);
+ for (int i = 0;
+ i < GPR_MAX(FLAGS_warmup_iterations, FLAGS_warmup_megabytes * 1024 *
+ 1024 / (14 + state.range(0)));
+ i++) {
+ inner_loop(true);
+ if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), warmup_start),
+ gpr_time_from_seconds(FLAGS_warmup_max_time_seconds,
+ GPR_TIMESPAN)) > 0) {
+ break;
+ }
+ }
+ while (state.KeepRunning()) {
+ inner_loop(false);
+ }
+ fixture->Finish(state);
+ fixture.reset();
+ server_env[0]->~ServerEnv();
+ server_env[1]->~ServerEnv();
+ state.SetBytesProcessed(state.range(0) * state.iterations() +
+ state.range(1) * state.iterations());
+}
-BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(TrickleArgs);
+static void UnaryTrickleArgs(benchmark::internal::Benchmark* b) {
+ const int cli_1024k = 1024 * 1024;
+ const int cli_32M = 32 * 1024 * 1024;
+ const int svr_256k = 256 * 1024;
+ const int svr_4M = 4 * 1024 * 1024;
+ const int svr_64M = 64 * 1024 * 1024;
+ for (int bw = 64; bw <= 128 * 1024 * 1024; bw *= 16) {
+ b->Args({bw, cli_1024k, svr_256k});
+ b->Args({bw, cli_1024k, svr_4M});
+ b->Args({bw, cli_1024k, svr_64M});
+ b->Args({bw, cli_32M, svr_256k});
+ b->Args({bw, cli_32M, svr_4M});
+ b->Args({bw, cli_32M, svr_64M});
+ }
+}
+BENCHMARK(BM_PumpUnbalancedUnary_Trickle)->Apply(UnaryTrickleArgs);
}
}
-BENCHMARK_MAIN();
+int main(int argc, char** argv) {
+ ::benchmark::Initialize(&argc, argv);
+ ::google::ParseCommandLineFlags(&argc, &argv, false);
+ ::benchmark::RunSpecifiedBenchmarks();
+}
diff --git a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
index 615b05b7c7..7524751fbc 100644
--- a/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack_unary_ping_pong.cc
@@ -141,12 +141,21 @@ static void SweepSizesArgs(benchmark::internal::Benchmark* b) {
BENCHMARK_TEMPLATE(BM_UnaryPingPong, TCP, NoOpMutator, NoOpMutator)
->Apply(SweepSizesArgs);
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, MinTCP, NoOpMutator, NoOpMutator)
+ ->Apply(SweepSizesArgs);
BENCHMARK_TEMPLATE(BM_UnaryPingPong, UDS, NoOpMutator, NoOpMutator)
->Args({0, 0});
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, MinUDS, NoOpMutator, NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, SockPair, NoOpMutator, NoOpMutator)
->Args({0, 0});
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, MinSockPair, NoOpMutator, NoOpMutator)
+ ->Args({0, 0});
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2, NoOpMutator, NoOpMutator)
->Apply(SweepSizesArgs);
+BENCHMARK_TEMPLATE(BM_UnaryPingPong, MinInProcessCHTTP2, NoOpMutator,
+ NoOpMutator)
+ ->Apply(SweepSizesArgs);
BENCHMARK_TEMPLATE(BM_UnaryPingPong, InProcessCHTTP2,
Client_AddMetadata<RandomBinaryMetadata<10>, 1>, NoOpMutator)
->Args({0, 0});
diff --git a/test/cpp/microbenchmarks/bm_pollset.cc b/test/cpp/microbenchmarks/bm_pollset.cc
index 0f3d3cef66..f5e8d13881 100644
--- a/test/cpp/microbenchmarks/bm_pollset.cc
+++ b/test/cpp/microbenchmarks/bm_pollset.cc
@@ -59,7 +59,7 @@ extern "C" {
auto& force_library_initialization = Library::get();
static void shutdown_ps(grpc_exec_ctx* exec_ctx, void* ps, grpc_error* error) {
- grpc_pollset_destroy(static_cast<grpc_pollset*>(ps));
+ grpc_pollset_destroy(exec_ctx, static_cast<grpc_pollset*>(ps));
}
static void BM_CreateDestroyPollset(benchmark::State& state) {
@@ -136,8 +136,7 @@ static void BM_PollEmptyPollset(benchmark::State& state) {
gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(mu);
while (state.KeepRunning()) {
- grpc_pollset_worker* worker;
- GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline));
+ GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
grpc_closure shutdown_ps_closure;
grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
@@ -150,6 +149,34 @@ static void BM_PollEmptyPollset(benchmark::State& state) {
}
BENCHMARK(BM_PollEmptyPollset);
+static void BM_PollAddFd(benchmark::State& state) {
+ TrackCounters track_counters;
+ size_t ps_sz = grpc_pollset_size();
+ grpc_pollset* ps = static_cast<grpc_pollset*>(gpr_zalloc(ps_sz));
+ gpr_mu* mu;
+ grpc_pollset_init(ps, &mu);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_wakeup_fd wakeup_fd;
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&wakeup_fd)));
+ grpc_fd* fd = grpc_fd_create(wakeup_fd.read_fd, "xxx");
+ while (state.KeepRunning()) {
+ grpc_pollset_add_fd(&exec_ctx, ps, fd);
+ grpc_exec_ctx_flush(&exec_ctx);
+ }
+ grpc_fd_orphan(&exec_ctx, fd, NULL, NULL, "xxx");
+ grpc_closure shutdown_ps_closure;
+ grpc_closure_init(&shutdown_ps_closure, shutdown_ps, ps,
+ grpc_schedule_on_exec_ctx);
+ gpr_mu_lock(mu);
+ grpc_pollset_shutdown(&exec_ctx, ps, &shutdown_ps_closure);
+ gpr_mu_unlock(mu);
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(ps);
+ track_counters.Finish(state);
+}
+BENCHMARK(BM_PollAddFd);
+
class Closure : public grpc_closure {
public:
virtual ~Closure() {}
@@ -233,8 +260,7 @@ static void BM_SingleThreadPollOneFd(benchmark::State& state) {
grpc_fd_notify_on_read(&exec_ctx, wakeup, continue_closure);
gpr_mu_lock(mu);
while (!done) {
- grpc_pollset_worker* worker;
- GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, &worker, now, deadline));
+ GRPC_ERROR_UNREF(grpc_pollset_work(&exec_ctx, ps, NULL, now, deadline));
}
grpc_fd_orphan(&exec_ctx, wakeup, NULL, NULL, "done");
wakeup_fd.read_fd = 0;
diff --git a/test/cpp/microbenchmarks/fullstack_fixtures.h b/test/cpp/microbenchmarks/fullstack_fixtures.h
index dc29701059..98aca1c346 100644
--- a/test/cpp/microbenchmarks/fullstack_fixtures.h
+++ b/test/cpp/microbenchmarks/fullstack_fixtures.h
@@ -61,29 +61,33 @@ extern "C" {
namespace grpc {
namespace testing {
-static void ApplyCommonServerBuilderConfig(ServerBuilder* b) {
- b->SetMaxReceiveMessageSize(INT_MAX);
- b->SetMaxSendMessageSize(INT_MAX);
-}
+class FixtureConfiguration {
+ public:
+ virtual void ApplyCommonChannelArguments(ChannelArguments* c) const {
+ c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
+ c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
+ }
-static void ApplyCommonChannelArguments(ChannelArguments* c) {
- c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
- c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
-}
+ virtual void ApplyCommonServerBuilderConfig(ServerBuilder* b) const {
+ b->SetMaxReceiveMessageSize(INT_MAX);
+ b->SetMaxSendMessageSize(INT_MAX);
+ }
+};
class BaseFixture : public TrackCounters {};
class FullstackFixture : public BaseFixture {
public:
- FullstackFixture(Service* service, const grpc::string& address) {
+ FullstackFixture(Service* service, const FixtureConfiguration& config,
+ const grpc::string& address) {
ServerBuilder b;
b.AddListeningPort(address, InsecureServerCredentials());
cq_ = b.AddCompletionQueue(true);
b.RegisterService(service);
- ApplyCommonServerBuilderConfig(&b);
+ config.ApplyCommonServerBuilderConfig(&b);
server_ = b.BuildAndStart();
ChannelArguments args;
- ApplyCommonChannelArguments(&args);
+ config.ApplyCommonChannelArguments(&args);
channel_ = CreateCustomChannel(address, InsecureChannelCredentials(), args);
}
@@ -107,39 +111,52 @@ class FullstackFixture : public BaseFixture {
class TCP : public FullstackFixture {
public:
- TCP(Service* service) : FullstackFixture(service, MakeAddress()) {}
+ TCP(Service* service, const FixtureConfiguration& fixture_configuration =
+ FixtureConfiguration())
+ : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {}
+
+ ~TCP() { grpc_recycle_unused_port(port_); }
private:
- static grpc::string MakeAddress() {
- int port = grpc_pick_unused_port_or_die();
+ int port_;
+
+ static grpc::string MakeAddress(int* port) {
+ *port = grpc_pick_unused_port_or_die();
std::stringstream addr;
- addr << "localhost:" << port;
+ addr << "localhost:" << *port;
return addr.str();
}
};
class UDS : public FullstackFixture {
public:
- UDS(Service* service) : FullstackFixture(service, MakeAddress()) {}
+ UDS(Service* service, const FixtureConfiguration& fixture_configuration =
+ FixtureConfiguration())
+ : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {}
+
+ ~UDS() { grpc_recycle_unused_port(port_); }
private:
- static grpc::string MakeAddress() {
- int port = grpc_pick_unused_port_or_die(); // just for a unique id - not a
- // real port
+ int port_;
+
+ static grpc::string MakeAddress(int* port) {
+ *port = grpc_pick_unused_port_or_die(); // just for a unique id - not a
+ // real port
std::stringstream addr;
- addr << "unix:/tmp/bm_fullstack." << port;
+ addr << "unix:/tmp/bm_fullstack." << *port;
return addr.str();
}
};
class EndpointPairFixture : public BaseFixture {
public:
- EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints)
+ EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints,
+ const FixtureConfiguration& fixture_configuration)
: endpoint_pair_(endpoints) {
ServerBuilder b;
cq_ = b.AddCompletionQueue(true);
b.RegisterService(service);
- ApplyCommonServerBuilderConfig(&b);
+ fixture_configuration.ApplyCommonServerBuilderConfig(&b);
server_ = b.BuildAndStart();
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -169,7 +186,7 @@ class EndpointPairFixture : public BaseFixture {
{
ChannelArguments args;
args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
- ApplyCommonChannelArguments(&args);
+ fixture_configuration.ApplyCommonChannelArguments(&args);
grpc_channel_args c_args = args.c_channel_args();
client_transport_ =
@@ -211,15 +228,19 @@ class EndpointPairFixture : public BaseFixture {
class SockPair : public EndpointPairFixture {
public:
- SockPair(Service* service)
- : EndpointPairFixture(service, grpc_iomgr_create_endpoint_pair(
- "test", Library::get().rq(), 8192)) {}
+ SockPair(Service* service, const FixtureConfiguration& fixture_configuration =
+ FixtureConfiguration())
+ : EndpointPairFixture(service,
+ grpc_iomgr_create_endpoint_pair("test", NULL),
+ fixture_configuration) {}
};
class InProcessCHTTP2 : public EndpointPairFixture {
public:
- InProcessCHTTP2(Service* service)
- : EndpointPairFixture(service, MakeEndpoints()) {}
+ InProcessCHTTP2(Service* service,
+ const FixtureConfiguration& fixture_configuration =
+ FixtureConfiguration())
+ : EndpointPairFixture(service, MakeEndpoints(), fixture_configuration) {}
void AddToLabel(std::ostream& out, benchmark::State& state) {
EndpointPairFixture::AddToLabel(out, state);
@@ -238,6 +259,32 @@ class InProcessCHTTP2 : public EndpointPairFixture {
}
};
+////////////////////////////////////////////////////////////////////////////////
+// Minimal stack fixtures
+
+class MinStackConfiguration : public FixtureConfiguration {
+ void ApplyCommonChannelArguments(ChannelArguments* a) const override {
+ a->SetInt(GRPC_ARG_MINIMAL_STACK, 1);
+ FixtureConfiguration::ApplyCommonChannelArguments(a);
+ }
+
+ void ApplyCommonServerBuilderConfig(ServerBuilder* b) const override {
+ b->AddChannelArgument(GRPC_ARG_MINIMAL_STACK, 1);
+ FixtureConfiguration::ApplyCommonServerBuilderConfig(b);
+ }
+};
+
+template <class Base>
+class MinStackize : public Base {
+ public:
+ MinStackize(Service* service) : Base(service, MinStackConfiguration()) {}
+};
+
+typedef MinStackize<TCP> MinTCP;
+typedef MinStackize<UDS> MinUDS;
+typedef MinStackize<SockPair> MinSockPair;
+typedef MinStackize<InProcessCHTTP2> MinInProcessCHTTP2;
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/microbenchmarks/helpers.cc b/test/cpp/microbenchmarks/helpers.cc
index d277c5984c..6550742453 100644
--- a/test/cpp/microbenchmarks/helpers.cc
+++ b/test/cpp/microbenchmarks/helpers.cc
@@ -57,6 +57,10 @@ void TrackCounters::AddToLabel(std::ostream &out, benchmark::State &state) {
<< ((double)(gpr_atm_no_barrier_load(&gpr_counter_atm_add) -
atm_add_at_start_) /
(double)state.iterations())
+ << " nows/iter:"
+ << ((double)(gpr_atm_no_barrier_load(&gpr_now_call_count) -
+ now_calls_at_start_) /
+ (double)state.iterations())
<< " allocs/iter:"
<< ((double)(counters_at_end.total_allocs_absolute -
counters_at_start_.total_allocs_absolute) /
diff --git a/test/cpp/microbenchmarks/helpers.h b/test/cpp/microbenchmarks/helpers.h
index 49ed517b1d..7360a1c9f2 100644
--- a/test/cpp/microbenchmarks/helpers.h
+++ b/test/cpp/microbenchmarks/helpers.h
@@ -72,6 +72,7 @@ class Library {
extern "C" gpr_atm gpr_mu_locks;
extern "C" gpr_atm gpr_counter_atm_cas;
extern "C" gpr_atm gpr_counter_atm_add;
+extern "C" gpr_atm gpr_now_call_count;
#endif
class TrackCounters {
@@ -86,6 +87,8 @@ class TrackCounters {
gpr_atm_no_barrier_load(&gpr_counter_atm_cas);
const size_t atm_add_at_start_ =
gpr_atm_no_barrier_load(&gpr_counter_atm_add);
+ const size_t now_calls_at_start_ =
+ gpr_atm_no_barrier_load(&gpr_now_call_count);
grpc_memory_counters counters_at_start_ = grpc_memory_counters_snapshot();
#endif
};
diff --git a/test/cpp/performance/writes_per_rpc_test.cc b/test/cpp/performance/writes_per_rpc_test.cc
index 7a914c1547..12d8268330 100644
--- a/test/cpp/performance/writes_per_rpc_test.cc
+++ b/test/cpp/performance/writes_per_rpc_test.cc
@@ -254,8 +254,8 @@ TEST(WritesPerRpcTest, UnaryPingPong) {
EXPECT_LT(UnaryPingPong(0, 0), 2.05);
EXPECT_LT(UnaryPingPong(1, 0), 2.05);
EXPECT_LT(UnaryPingPong(0, 1), 2.05);
- EXPECT_LT(UnaryPingPong(4096, 0), 2.2);
- EXPECT_LT(UnaryPingPong(0, 4096), 2.2);
+ EXPECT_LT(UnaryPingPong(4096, 0), 2.5);
+ EXPECT_LT(UnaryPingPong(0, 4096), 2.5);
}
} // namespace testing
diff --git a/test/cpp/qps/benchmark_config.cc b/test/cpp/qps/benchmark_config.cc
index 98b8d0ba37..d33f3e9ae1 100644
--- a/test/cpp/qps/benchmark_config.cc
+++ b/test/cpp/qps/benchmark_config.cc
@@ -33,6 +33,9 @@
#include "test/cpp/qps/benchmark_config.h"
#include <gflags/gflags.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/security/credentials.h>
+#include <grpc/support/log.h>
DEFINE_bool(enable_log_reporter, true,
"Enable reporting of benchmark results through GprLog");
@@ -51,6 +54,11 @@ DEFINE_string(server_address, "localhost:50052",
DEFINE_string(tag, "", "Optional tag for the test");
+DEFINE_string(rpc_reporter_server_address, "",
+ "Server address for rpc reporter to send results to");
+
+DEFINE_bool(enable_rpc_reporter, false, "Enable use of RPC reporter");
+
// In some distros, gflags is in the namespace google, and in some others,
// in gflags. This hack is enabling us to find both.
namespace google {}
@@ -75,6 +83,13 @@ static std::shared_ptr<Reporter> InitBenchmarkReporters() {
composite_reporter->add(std::unique_ptr<Reporter>(
new JsonReporter("JsonReporter", FLAGS_scenario_result_file)));
}
+ if (FLAGS_enable_rpc_reporter) {
+ GPR_ASSERT(!FLAGS_rpc_reporter_server_address.empty());
+ composite_reporter->add(std::unique_ptr<Reporter>(new RpcReporter(
+ "RpcReporter",
+ grpc::CreateChannel(FLAGS_rpc_reporter_server_address,
+ grpc::InsecureChannelCredentials()))));
+ }
return std::shared_ptr<Reporter>(composite_reporter);
}
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 25a19a5a74..c3197eb622 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -443,11 +443,8 @@ class ClientImpl : public Client {
create_stub_;
};
-std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args);
-std::unique_ptr<Client> CreateSynchronousStreamingClient(
- const ClientConfig& args);
-std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args);
-std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args);
+std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& args);
+std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& args);
std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
const ClientConfig& args);
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 396d308e2a..82c3356f02 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -63,13 +63,13 @@ class ClientRpcContext {
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
- virtual ClientRpcContext* StartNewClone() = 0;
+ virtual void StartNewClone(CompletionQueue* cq) = 0;
static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
static ClientRpcContext* detag(void* t) {
return reinterpret_cast<ClientRpcContext*>(t);
}
- virtual void Start(CompletionQueue* cq) = 0;
+ virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
};
template <class RequestType, class ResponseType>
@@ -94,22 +94,17 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
next_issue_(next_issue),
start_req_(start_req) {}
~ClientRpcContextUnaryImpl() override {}
- void Start(CompletionQueue* cq) override {
- cq_ = cq;
- if (!next_issue_) { // ready to issue
- RunNextState(true, nullptr);
- } else { // wait for the issue time
- alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
- }
+ void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ StartInternal(cq);
}
bool RunNextState(bool ok, HistogramEntry* entry) override {
switch (next_state_) {
case State::READY:
start_ = UsageTimer::Now();
response_reader_ = start_req_(stub_, &context_, req_, cq_);
+ next_state_ = State::RESP_DONE;
response_reader_->Finish(&response_, &status_,
ClientRpcContext::tag(this));
- next_state_ = State::RESP_DONE;
return true;
case State::RESP_DONE:
if (status_.ok()) {
@@ -123,9 +118,10 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
return false;
}
}
- ClientRpcContext* StartNewClone() override {
- return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_,
- callback_);
+ void StartNewClone(CompletionQueue* cq) override {
+ auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
+ start_req_, callback_);
+ clone->StartInternal(cq);
}
private:
@@ -147,6 +143,15 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
response_reader_;
+
+ void StartInternal(CompletionQueue* cq) {
+ cq_ = cq;
+ if (!next_issue_) { // ready to issue
+ RunNextState(true, nullptr);
+ } else { // wait for the issue time
+ alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ }
+ }
};
typedef std::forward_list<ClientRpcContext*> context_list;
@@ -185,7 +190,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
auto* cq = cli_cqs_[t].get();
auto ctx =
setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
- ctx->Start(cq);
+ ctx->Start(cq, config);
}
t = (t + 1) % cli_cqs_.size();
}
@@ -233,40 +238,27 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
void* got_tag;
bool ok;
- switch (cli_cqs_[thread_idx]->AsyncNext(
- &got_tag, &ok,
- std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
- case CompletionQueue::GOT_EVENT: {
- // Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- // Proceed while holding a lock to make sure that
- // this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
- if (shutdown_state_[thread_idx]->shutdown) {
- delete ctx;
- return true;
- } else if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- auto clone = ctx->StartNewClone();
- clone->Start(cli_cqs_[thread_idx].get());
- // delete the old version
- delete ctx;
- }
- return true;
- }
- case CompletionQueue::TIMEOUT: {
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
- if (shutdown_state_[thread_idx]->shutdown) {
- return true;
- }
+ if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ // Got a regular event, so process it
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ delete ctx;
return true;
+ } else if (!ctx->RunNextState(ok, entry)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[thread_idx].get());
+ // delete the old version
+ delete ctx;
}
- case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be
- // done
- return true;
+ return true;
+ } else {
+ // queue is shutting down, so we must be done
+ return true;
}
- GPR_UNREACHABLE_CODE(return true);
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
@@ -309,9 +301,9 @@ class AsyncUnaryClient final
};
template <class RequestType, class ResponseType>
-class ClientRpcContextStreamingImpl : public ClientRpcContext {
+class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
public:
- ClientRpcContextStreamingImpl(
+ ClientRpcContextStreamingPingPongImpl(
BenchmarkService::Stub* stub, const RequestType& req,
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<
@@ -329,11 +321,9 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
callback_(on_done),
next_issue_(next_issue),
start_req_(start_req) {}
- ~ClientRpcContextStreamingImpl() override {}
- void Start(CompletionQueue* cq) override {
- cq_ = cq;
- stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
- next_state_ = State::STREAM_IDLE;
+ ~ClientRpcContextStreamingPingPongImpl() override {}
+ void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ StartInternal(cq, config.messages_per_stream());
}
bool RunNextState(bool ok, HistogramEntry* entry) override {
while (true) {
@@ -346,9 +336,9 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
}
break; // loop around, don't return
case State::WAIT:
+ next_state_ = State::READY_TO_WRITE;
alarm_.reset(
new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
- next_state_ = State::READY_TO_WRITE;
return true;
case State::READY_TO_WRITE:
if (!ok) {
@@ -369,17 +359,32 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
case State::READ_DONE:
entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
+ if ((messages_per_stream_ != 0) &&
+ (++messages_issued_ >= messages_per_stream_)) {
+ next_state_ = State::WRITES_DONE_DONE;
+ stream_->WritesDone(ClientRpcContext::tag(this));
+ return true;
+ }
next_state_ = State::STREAM_IDLE;
break; // loop around
+ case State::WRITES_DONE_DONE:
+ next_state_ = State::FINISH_DONE;
+ stream_->Finish(&status_, ClientRpcContext::tag(this));
+ return true;
+ case State::FINISH_DONE:
+ next_state_ = State::INVALID;
+ return false;
+ break;
default:
GPR_ASSERT(false);
return false;
}
}
}
- ClientRpcContext* StartNewClone() override {
- return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_,
- start_req_, callback_);
+ void StartNewClone(CompletionQueue* cq) override {
+ auto* clone = new ClientRpcContextStreamingPingPongImpl(
+ stub_, req_, next_issue_, start_req_, callback_);
+ clone->StartInternal(cq, messages_per_stream_);
}
private:
@@ -395,7 +400,9 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
WAIT,
READY_TO_WRITE,
WRITE_DONE,
- READ_DONE
+ READ_DONE,
+ WRITES_DONE_DONE,
+ FINISH_DONE
};
State next_state_;
std::function<void(grpc::Status, ResponseType*)> callback_;
@@ -408,18 +415,30 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
stream_;
+
+ // Allow a limit on number of messages in a stream
+ int messages_per_stream_;
+ int messages_issued_;
+
+ void StartInternal(CompletionQueue* cq, int messages_per_stream) {
+ cq_ = cq;
+ messages_per_stream_ = messages_per_stream;
+ messages_issued_ = 0;
+ next_state_ = State::STREAM_IDLE;
+ stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
+ }
};
-class AsyncStreamingClient final
+class AsyncStreamingPingPongClient final
: public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
public:
- explicit AsyncStreamingClient(const ClientConfig& config)
+ explicit AsyncStreamingPingPongClient(const ClientConfig& config)
: AsyncClient<BenchmarkService::Stub, SimpleRequest>(
config, SetupCtx, BenchmarkStubCreator) {
StartThreads(num_async_threads_);
}
- ~AsyncStreamingClient() override {}
+ ~AsyncStreamingPingPongClient() override {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -433,9 +452,250 @@ class AsyncStreamingClient final
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
std::function<gpr_timespec()> next_issue,
const SimpleRequest& req) {
- return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- stub, req, next_issue, AsyncStreamingClient::StartReq,
- AsyncStreamingClient::CheckDone);
+ return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,
+ SimpleResponse>(
+ stub, req, next_issue, AsyncStreamingPingPongClient::StartReq,
+ AsyncStreamingPingPongClient::CheckDone);
+ }
+};
+
+template <class RequestType, class ResponseType>
+class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
+ public:
+ ClientRpcContextStreamingFromClientImpl(
+ BenchmarkService::Stub* stub, const RequestType& req,
+ std::function<gpr_timespec()> next_issue,
+ std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
+ CompletionQueue*, void*)>
+ start_req,
+ std::function<void(grpc::Status, ResponseType*)> on_done)
+ : context_(),
+ stub_(stub),
+ cq_(nullptr),
+ req_(req),
+ response_(),
+ next_state_(State::INVALID),
+ callback_(on_done),
+ next_issue_(next_issue),
+ start_req_(start_req) {}
+ ~ClientRpcContextStreamingFromClientImpl() override {}
+ void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ StartInternal(cq);
+ }
+ bool RunNextState(bool ok, HistogramEntry* entry) override {
+ while (true) {
+ switch (next_state_) {
+ case State::STREAM_IDLE:
+ if (!next_issue_) { // ready to issue
+ next_state_ = State::READY_TO_WRITE;
+ } else {
+ next_state_ = State::WAIT;
+ }
+ break; // loop around, don't return
+ case State::WAIT:
+ alarm_.reset(
+ new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ next_state_ = State::READY_TO_WRITE;
+ return true;
+ case State::READY_TO_WRITE:
+ if (!ok) {
+ return false;
+ }
+ start_ = UsageTimer::Now();
+ next_state_ = State::WRITE_DONE;
+ stream_->Write(req_, ClientRpcContext::tag(this));
+ return true;
+ case State::WRITE_DONE:
+ if (!ok) {
+ return false;
+ }
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
+ next_state_ = State::STREAM_IDLE;
+ break; // loop around
+ default:
+ GPR_ASSERT(false);
+ return false;
+ }
+ }
+ }
+ void StartNewClone(CompletionQueue* cq) override {
+ auto* clone = new ClientRpcContextStreamingFromClientImpl(
+ stub_, req_, next_issue_, start_req_, callback_);
+ clone->StartInternal(cq);
+ }
+
+ private:
+ grpc::ClientContext context_;
+ BenchmarkService::Stub* stub_;
+ CompletionQueue* cq_;
+ std::unique_ptr<Alarm> alarm_;
+ RequestType req_;
+ ResponseType response_;
+ enum State {
+ INVALID,
+ STREAM_IDLE,
+ WAIT,
+ READY_TO_WRITE,
+ WRITE_DONE,
+ };
+ State next_state_;
+ std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<gpr_timespec()> next_issue_;
+ std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
+ CompletionQueue*, void*)>
+ start_req_;
+ grpc::Status status_;
+ double start_;
+ std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;
+
+ void StartInternal(CompletionQueue* cq) {
+ cq_ = cq;
+ stream_ = start_req_(stub_, &context_, &response_, cq,
+ ClientRpcContext::tag(this));
+ next_state_ = State::STREAM_IDLE;
+ }
+};
+
+class AsyncStreamingFromClientClient final
+ : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
+ public:
+ explicit AsyncStreamingFromClientClient(const ClientConfig& config)
+ : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
+ config, SetupCtx, BenchmarkStubCreator) {
+ StartThreads(num_async_threads_);
+ }
+
+ ~AsyncStreamingFromClientClient() override {}
+
+ private:
+ static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+ static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> StartReq(
+ BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
+ SimpleResponse* resp, CompletionQueue* cq, void* tag) {
+ auto stream = stub->AsyncStreamingFromClient(ctx, resp, cq, tag);
+ return stream;
+ };
+ static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+ std::function<gpr_timespec()> next_issue,
+ const SimpleRequest& req) {
+ return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,
+ SimpleResponse>(
+ stub, req, next_issue, AsyncStreamingFromClientClient::StartReq,
+ AsyncStreamingFromClientClient::CheckDone);
+ }
+};
+
+template <class RequestType, class ResponseType>
+class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
+ public:
+ ClientRpcContextStreamingFromServerImpl(
+ BenchmarkService::Stub* stub, const RequestType& req,
+ std::function<gpr_timespec()> next_issue,
+ std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
+ CompletionQueue*, void*)>
+ start_req,
+ std::function<void(grpc::Status, ResponseType*)> on_done)
+ : context_(),
+ stub_(stub),
+ cq_(nullptr),
+ req_(req),
+ response_(),
+ next_state_(State::INVALID),
+ callback_(on_done),
+ next_issue_(next_issue),
+ start_req_(start_req) {}
+ ~ClientRpcContextStreamingFromServerImpl() override {}
+ void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ StartInternal(cq);
+ }
+ bool RunNextState(bool ok, HistogramEntry* entry) override {
+ while (true) {
+ switch (next_state_) {
+ case State::STREAM_IDLE:
+ if (!ok) {
+ return false;
+ }
+ start_ = UsageTimer::Now();
+ next_state_ = State::READ_DONE;
+ stream_->Read(&response_, ClientRpcContext::tag(this));
+ return true;
+ case State::READ_DONE:
+ if (!ok) {
+ return false;
+ }
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
+ callback_(status_, &response_);
+ next_state_ = State::STREAM_IDLE;
+ break; // loop around
+ default:
+ GPR_ASSERT(false);
+ return false;
+ }
+ }
+ }
+ void StartNewClone(CompletionQueue* cq) override {
+ auto* clone = new ClientRpcContextStreamingFromServerImpl(
+ stub_, req_, next_issue_, start_req_, callback_);
+ clone->StartInternal(cq);
+ }
+
+ private:
+ grpc::ClientContext context_;
+ BenchmarkService::Stub* stub_;
+ CompletionQueue* cq_;
+ std::unique_ptr<Alarm> alarm_;
+ RequestType req_;
+ ResponseType response_;
+ enum State { INVALID, STREAM_IDLE, READ_DONE };
+ State next_state_;
+ std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<gpr_timespec()> next_issue_;
+ std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
+ CompletionQueue*, void*)>
+ start_req_;
+ grpc::Status status_;
+ double start_;
+ std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;
+
+ void StartInternal(CompletionQueue* cq) {
+ // TODO(vjpai): Add support to rate-pace this
+ cq_ = cq;
+ next_state_ = State::STREAM_IDLE;
+ stream_ =
+ start_req_(stub_, &context_, req_, cq, ClientRpcContext::tag(this));
+ }
+};
+
+class AsyncStreamingFromServerClient final
+ : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
+ public:
+ explicit AsyncStreamingFromServerClient(const ClientConfig& config)
+ : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
+ config, SetupCtx, BenchmarkStubCreator) {
+ StartThreads(num_async_threads_);
+ }
+
+ ~AsyncStreamingFromServerClient() override {}
+
+ private:
+ static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+ static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> StartReq(
+ BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
+ const SimpleRequest& req, CompletionQueue* cq, void* tag) {
+ auto stream = stub->AsyncStreamingFromServer(ctx, req, cq, tag);
+ return stream;
+ };
+ static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+ std::function<gpr_timespec()> next_issue,
+ const SimpleRequest& req) {
+ return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,
+ SimpleResponse>(
+ stub, req, next_issue, AsyncStreamingFromServerClient::StartReq,
+ AsyncStreamingFromServerClient::CheckDone);
}
};
@@ -459,13 +719,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
next_issue_(next_issue),
start_req_(start_req) {}
~ClientRpcContextGenericStreamingImpl() override {}
- void Start(CompletionQueue* cq) override {
- cq_ = cq;
- const grpc::string kMethodName(
- "/grpc.testing.BenchmarkService/StreamingCall");
- stream_ = start_req_(stub_, &context_, kMethodName, cq,
- ClientRpcContext::tag(this));
- next_state_ = State::STREAM_IDLE;
+ void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ StartInternal(cq, config.messages_per_stream());
}
bool RunNextState(bool ok, HistogramEntry* entry) override {
while (true) {
@@ -478,9 +733,9 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
}
break; // loop around, don't return
case State::WAIT:
+ next_state_ = State::READY_TO_WRITE;
alarm_.reset(
new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
- next_state_ = State::READY_TO_WRITE;
return true;
case State::READY_TO_WRITE:
if (!ok) {
@@ -501,17 +756,32 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
case State::READ_DONE:
entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
+ if ((messages_per_stream_ != 0) &&
+ (++messages_issued_ >= messages_per_stream_)) {
+ next_state_ = State::WRITES_DONE_DONE;
+ stream_->WritesDone(ClientRpcContext::tag(this));
+ return true;
+ }
next_state_ = State::STREAM_IDLE;
break; // loop around
+ case State::WRITES_DONE_DONE:
+ next_state_ = State::FINISH_DONE;
+ stream_->Finish(&status_, ClientRpcContext::tag(this));
+ return true;
+ case State::FINISH_DONE:
+ next_state_ = State::INVALID;
+ return false;
+ break;
default:
GPR_ASSERT(false);
return false;
}
}
}
- ClientRpcContext* StartNewClone() override {
- return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_,
- start_req_, callback_);
+ void StartNewClone(CompletionQueue* cq) override {
+ auto* clone = new ClientRpcContextGenericStreamingImpl(
+ stub_, req_, next_issue_, start_req_, callback_);
+ clone->StartInternal(cq, messages_per_stream_);
}
private:
@@ -527,7 +797,9 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
WAIT,
READY_TO_WRITE,
WRITE_DONE,
- READ_DONE
+ READ_DONE,
+ WRITES_DONE_DONE,
+ FINISH_DONE
};
State next_state_;
std::function<void(grpc::Status, ByteBuffer*)> callback_;
@@ -539,6 +811,21 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
grpc::Status status_;
double start_;
std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
+
+ // Allow a limit on number of messages in a stream
+ int messages_per_stream_;
+ int messages_issued_;
+
+ void StartInternal(CompletionQueue* cq, int messages_per_stream) {
+ cq_ = cq;
+ const grpc::string kMethodName(
+ "/grpc.testing.BenchmarkService/StreamingCall");
+ messages_per_stream_ = messages_per_stream;
+ messages_issued_ = 0;
+ next_state_ = State::STREAM_IDLE;
+ stream_ = start_req_(stub_, &context_, kMethodName, cq,
+ ClientRpcContext::tag(this));
+ }
};
static std::unique_ptr<grpc::GenericStub> GenericStubCreator(
@@ -574,11 +861,26 @@ class GenericAsyncStreamingClient final
}
};
-std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {
- return std::unique_ptr<Client>(new AsyncUnaryClient(args));
-}
-std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) {
- return std::unique_ptr<Client>(new AsyncStreamingClient(args));
+std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config) {
+ switch (config.rpc_type()) {
+ case UNARY:
+ return std::unique_ptr<Client>(new AsyncUnaryClient(config));
+ case STREAMING:
+ return std::unique_ptr<Client>(new AsyncStreamingPingPongClient(config));
+ case STREAMING_FROM_CLIENT:
+ return std::unique_ptr<Client>(
+ new AsyncStreamingFromClientClient(config));
+ case STREAMING_FROM_SERVER:
+ return std::unique_ptr<Client>(
+ new AsyncStreamingFromServerClient(config));
+ case STREAMING_BOTH_WAYS:
+ // TODO(vjpai): Implement this
+ assert(false);
+ return nullptr;
+ default:
+ assert(false);
+ return nullptr;
+ }
}
std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
const ClientConfig& args) {
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index d87771b192..f35713280a 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -136,29 +136,78 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
};
-class SynchronousStreamingClient final : public SynchronousClient {
+template <class StreamType>
+class SynchronousStreamingClient : public SynchronousClient {
public:
SynchronousStreamingClient(const ClientConfig& config)
: SynchronousClient(config),
context_(num_threads_),
- stream_(num_threads_) {
+ stream_(num_threads_),
+ messages_per_stream_(config.messages_per_stream()),
+ messages_issued_(num_threads_) {
+ StartThreads(num_threads_);
+ }
+ virtual ~SynchronousStreamingClient() {
+ std::vector<std::thread> cleanup_threads;
+ for (size_t i = 0; i < num_threads_; i++) {
+ cleanup_threads.emplace_back([this, i]() {
+ auto stream = &stream_[i];
+ if (*stream) {
+ // forcibly cancel the streams, then finish
+ context_[i].TryCancel();
+ (*stream)->Finish();
+ // don't log any error message on !ok since this was canceled
+ }
+ });
+ }
+ for (auto& th : cleanup_threads) {
+ th.join();
+ }
+ }
+
+ protected:
+ std::vector<grpc::ClientContext> context_;
+ std::vector<std::unique_ptr<StreamType>> stream_;
+ const int messages_per_stream_;
+ std::vector<int> messages_issued_;
+
+ void FinishStream(HistogramEntry* entry, size_t thread_idx) {
+ Status s = stream_[thread_idx]->Finish();
+ // don't set the value since the stream is failed and shouldn't be timed
+ entry->set_status(s.error_code());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx,
+ s.error_message().c_str());
+ }
+ context_[thread_idx].~ClientContext();
+ new (&context_[thread_idx]) ClientContext();
+ }
+};
+
+class SynchronousStreamingPingPongClient final
+ : public SynchronousStreamingClient<
+ grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
+ public:
+ SynchronousStreamingPingPongClient(const ClientConfig& config)
+ : SynchronousStreamingClient(config) {
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ messages_issued_[thread_idx] = 0;
}
- StartThreads(num_threads_);
}
- ~SynchronousStreamingClient() {
+ ~SynchronousStreamingPingPongClient() {
+ std::vector<std::thread> cleanup_threads;
for (size_t i = 0; i < num_threads_; i++) {
- auto stream = &stream_[i];
- if (*stream) {
- (*stream)->WritesDone();
- Status s = (*stream)->Finish();
- if (!s.ok()) {
- gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i,
- s.error_message().c_str());
+ cleanup_threads.emplace_back([this, i]() {
+ auto stream = &stream_[i];
+ if (*stream) {
+ (*stream)->WritesDone();
}
- }
+ });
+ }
+ for (auto& th : cleanup_threads) {
+ th.join();
}
}
@@ -166,45 +215,164 @@ class SynchronousStreamingClient final : public SynchronousClient {
if (!WaitToIssue(thread_idx)) {
return true;
}
- GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0);
+ GPR_TIMER_SCOPE("SynchronousStreamingPingPongClient::ThreadFunc", 0);
double start = UsageTimer::Now();
if (stream_[thread_idx]->Write(request_) &&
stream_[thread_idx]->Read(&responses_[thread_idx])) {
entry->set_value((UsageTimer::Now() - start) * 1e9);
// don't set the status since there isn't one yet
+ if ((messages_per_stream_ != 0) &&
+ (++messages_issued_[thread_idx] < messages_per_stream_)) {
+ return true;
+ } else if (messages_per_stream_ == 0) {
+ return true;
+ } else {
+ // Fall through to the below resetting code after finish
+ }
+ }
+ stream_[thread_idx]->WritesDone();
+ FinishStream(entry, thread_idx);
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ messages_issued_[thread_idx] = 0;
+ return true;
+ }
+};
+
+class SynchronousStreamingFromClientClient final
+ : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
+ public:
+ SynchronousStreamingFromClientClient(const ClientConfig& config)
+ : SynchronousStreamingClient(config), last_issue_(num_threads_) {
+ for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ last_issue_[thread_idx] = UsageTimer::Now();
+ }
+ }
+ ~SynchronousStreamingFromClientClient() {
+ std::vector<std::thread> cleanup_threads;
+ for (size_t i = 0; i < num_threads_; i++) {
+ cleanup_threads.emplace_back([this, i]() {
+ auto stream = &stream_[i];
+ if (*stream) {
+ (*stream)->WritesDone();
+ }
+ });
+ }
+ for (auto& th : cleanup_threads) {
+ th.join();
+ }
+ }
+
+ bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ // Figure out how to make histogram sensible if this is rate-paced
+ if (!WaitToIssue(thread_idx)) {
+ return true;
+ }
+ GPR_TIMER_SCOPE("SynchronousStreamingFromClientClient::ThreadFunc", 0);
+ if (stream_[thread_idx]->Write(request_)) {
+ double now = UsageTimer::Now();
+ entry->set_value((now - last_issue_[thread_idx]) * 1e9);
+ last_issue_[thread_idx] = now;
return true;
}
stream_[thread_idx]->WritesDone();
- Status s = stream_[thread_idx]->Finish();
- // don't set the value since the stream is failed and shouldn't be timed
- entry->set_status(s.error_code());
- if (!s.ok()) {
- gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx,
- s.error_message().c_str());
+ FinishStream(entry, thread_idx);
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
+ &responses_[thread_idx]);
+ return true;
+ }
+
+ private:
+ std::vector<double> last_issue_;
+};
+
+class SynchronousStreamingFromServerClient final
+ : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
+ public:
+ SynchronousStreamingFromServerClient(const ClientConfig& config)
+ : SynchronousStreamingClient(config), last_recv_(num_threads_) {
+ for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
+ last_recv_[thread_idx] = UsageTimer::Now();
+ }
+ }
+ bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
+ if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
+ double now = UsageTimer::Now();
+ entry->set_value((now - last_recv_[thread_idx]) * 1e9);
+ last_recv_[thread_idx] = now;
+ return true;
}
+ FinishStream(entry, thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
- context_[thread_idx].~ClientContext();
- new (&context_[thread_idx]) ClientContext();
- stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
+ stream_[thread_idx] =
+ stub->StreamingFromServer(&context_[thread_idx], request_);
return true;
}
private:
- // These are both conceptually std::vector but cannot be for old compilers
- // that expect contained classes to support copy constructors
- std::vector<grpc::ClientContext> context_;
- std::vector<
- std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>>
- stream_;
+ std::vector<double> last_recv_;
};
-std::unique_ptr<Client> CreateSynchronousUnaryClient(
- const ClientConfig& config) {
- return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
-}
-std::unique_ptr<Client> CreateSynchronousStreamingClient(
- const ClientConfig& config) {
- return std::unique_ptr<Client>(new SynchronousStreamingClient(config));
+class SynchronousStreamingBothWaysClient final
+ : public SynchronousStreamingClient<
+ grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
+ public:
+ SynchronousStreamingBothWaysClient(const ClientConfig& config)
+ : SynchronousStreamingClient(config) {
+ for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
+ auto* stub = channels_[thread_idx % channels_.size()].get_stub();
+ stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
+ }
+ }
+ ~SynchronousStreamingBothWaysClient() {
+ std::vector<std::thread> cleanup_threads;
+ for (size_t i = 0; i < num_threads_; i++) {
+ cleanup_threads.emplace_back([this, i]() {
+ auto stream = &stream_[i];
+ if (*stream) {
+ (*stream)->WritesDone();
+ }
+ });
+ }
+ for (auto& th : cleanup_threads) {
+ th.join();
+ }
+ }
+
+ bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ // TODO (vjpai): Do this
+ return true;
+ }
+};
+
+std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
+ switch (config.rpc_type()) {
+ case UNARY:
+ return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
+ case STREAMING:
+ return std::unique_ptr<Client>(
+ new SynchronousStreamingPingPongClient(config));
+ case STREAMING_FROM_CLIENT:
+ return std::unique_ptr<Client>(
+ new SynchronousStreamingFromClientClient(config));
+ case STREAMING_FROM_SERVER:
+ return std::unique_ptr<Client>(
+ new SynchronousStreamingFromServerClient(config));
+ case STREAMING_BOTH_WAYS:
+ return std::unique_ptr<Client>(
+ new SynchronousStreamingBothWaysClient(config));
+ default:
+ assert(false);
+ return nullptr;
+ }
}
} // namespace testing
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index d437920e68..92408974bd 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -68,15 +68,11 @@ static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
case ClientType::SYNC_CLIENT:
- return (config.rpc_type() == RpcType::UNARY)
- ? CreateSynchronousUnaryClient(config)
- : CreateSynchronousStreamingClient(config);
+ return CreateSynchronousClient(config);
case ClientType::ASYNC_CLIENT:
- return (config.rpc_type() == RpcType::UNARY)
- ? CreateAsyncUnaryClient(config)
- : (config.payload_config().has_bytebuf_params()
- ? CreateGenericAsyncStreamingClient(config)
- : CreateAsyncStreamingClient(config));
+ return config.payload_config().has_bytebuf_params()
+ ? CreateGenericAsyncStreamingClient(config)
+ : CreateAsyncClient(config);
default:
abort();
}
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index 7f84816421..a9130bf5d4 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -40,6 +40,9 @@
#include "test/cpp/qps/parse_json.h"
#include "test/cpp/qps/stats.h"
+#include <grpc++/client_context.h>
+#include "src/proto/grpc/testing/services.grpc.pb.h"
+
namespace grpc {
namespace testing {
@@ -142,5 +145,37 @@ void JsonReporter::ReportCpuUsage(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
+void RpcReporter::ReportQPS(const ScenarioResult& result) {
+ grpc::ClientContext context;
+ grpc::Status status;
+ Void dummy;
+
+ gpr_log(GPR_INFO, "RPC reporter sending scenario result to server");
+ status = stub_->ReportScenario(&context, result, &dummy);
+
+ if (status.ok()) {
+ gpr_log(GPR_INFO, "RpcReporter report RPC success!");
+ } else {
+ gpr_log(GPR_ERROR, "RpcReporter report RPC: code: %d. message: %s",
+ status.error_code(), status.error_message().c_str());
+ }
+}
+
+void RpcReporter::ReportQPSPerCore(const ScenarioResult& result) {
+ // NOP - all reporting is handled by ReportQPS.
+}
+
+void RpcReporter::ReportLatency(const ScenarioResult& result) {
+ // NOP - all reporting is handled by ReportQPS.
+}
+
+void RpcReporter::ReportTimes(const ScenarioResult& result) {
+ // NOP - all reporting is handled by ReportQPS.
+}
+
+void RpcReporter::ReportCpuUsage(const ScenarioResult& result) {
+ // NOP - all reporting is handled by ReportQPS.
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h
index faf87ff060..1749be98c6 100644
--- a/test/cpp/qps/report.h
+++ b/test/cpp/qps/report.h
@@ -42,6 +42,9 @@
#include "test/cpp/qps/driver.h"
+#include <grpc++/channel.h>
+#include "src/proto/grpc/testing/services.grpc.pb.h"
+
namespace grpc {
namespace testing {
@@ -124,6 +127,21 @@ class JsonReporter : public Reporter {
const string report_file_;
};
+class RpcReporter : public Reporter {
+ public:
+ RpcReporter(const string& name, std::shared_ptr<grpc::Channel> channel)
+ : Reporter(name), stub_(ReportQpsScenarioService::NewStub(channel)) {}
+
+ private:
+ void ReportQPS(const ScenarioResult& result) override;
+ void ReportQPSPerCore(const ScenarioResult& result) override;
+ void ReportLatency(const ScenarioResult& result) override;
+ void ReportTimes(const ScenarioResult& result) override;
+ void ReportCpuUsage(const ScenarioResult& result) override;
+
+ std::unique_ptr<ReportQpsScenarioService::Stub> stub_;
+};
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index b499b82091..84f1579c2f 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -71,6 +71,18 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerAsyncReaderWriter<ResponseType, RequestType> *,
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_function,
+ std::function<void(ServiceType *, ServerContextType *,
+ ServerAsyncReader<ResponseType, RequestType> *,
+ CompletionQueue *, ServerCompletionQueue *, void *)>
+ request_streaming_from_client_function,
+ std::function<void(ServiceType *, ServerContextType *, RequestType *,
+ ServerAsyncWriter<ResponseType> *, CompletionQueue *,
+ ServerCompletionQueue *, void *)>
+ request_streaming_from_server_function,
+ std::function<void(ServiceType *, ServerContextType *,
+ ServerAsyncReaderWriter<ResponseType, RequestType> *,
+ CompletionQueue *, ServerCompletionQueue *, void *)>
+ request_streaming_both_ways_function,
std::function<grpc::Status(const PayloadConfig &, const RequestType *,
ResponseType *)>
process_rpc)
@@ -107,7 +119,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
std::placeholders::_2);
- for (int i = 0; i < 15000; i++) {
+ for (int i = 0; i < 5000; i++) {
for (int j = 0; j < num_threads; j++) {
if (request_unary_function) {
auto request_unary = std::bind(
@@ -125,6 +137,26 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
contexts_.emplace_back(new ServerRpcContextStreamingImpl(
request_streaming, process_rpc_bound));
}
+ if (request_streaming_from_client_function) {
+ auto request_streaming_from_client = std::bind(
+ request_streaming_from_client_function, &async_service_,
+ std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
+ srv_cqs_[j].get(), std::placeholders::_3);
+ contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
+ request_streaming_from_client, process_rpc_bound));
+ }
+ if (request_streaming_from_server_function) {
+ auto request_streaming_from_server =
+ std::bind(request_streaming_from_server_function, &async_service_,
+ std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3, srv_cqs_[j].get(),
+ srv_cqs_[j].get(), std::placeholders::_4);
+ contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
+ request_streaming_from_server, process_rpc_bound));
+ }
+ if (request_streaming_both_ways_function) {
+ // TODO(vjpai): Add this code
+ }
}
}
@@ -289,8 +321,8 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
if (!ok) {
return false;
}
- stream_.Read(&req_, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::read_done;
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
return true;
}
@@ -300,23 +332,23 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Call the RPC processing function
grpc::Status status = invoke_method_(&req_, &response_);
// initiate the write
- stream_.Write(response_, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::write_done;
+ stream_.Write(response_, AsyncQpsServerTest::tag(this));
} else { // client has sent writes done
// finish the stream
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
}
return true;
}
bool write_done(bool ok) {
// now go back and get another streaming read!
if (ok) {
- stream_.Read(&req_, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::read_done;
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
} else {
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
}
return true;
}
@@ -335,6 +367,146 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
};
+ class ServerRpcContextStreamingFromClientImpl final
+ : public ServerRpcContext {
+ public:
+ ServerRpcContextStreamingFromClientImpl(
+ std::function<void(ServerContextType *,
+ grpc::ServerAsyncReader<ResponseType, RequestType> *,
+ void *)>
+ request_method,
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
+ invoke_method)
+ : srv_ctx_(new ServerContextType),
+ next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
+ request_method_(request_method),
+ invoke_method_(invoke_method),
+ stream_(srv_ctx_.get()) {
+ request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
+ }
+ ~ServerRpcContextStreamingFromClientImpl() override {}
+ bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
+ void Reset() override {
+ srv_ctx_.reset(new ServerContextType);
+ req_ = RequestType();
+ stream_ =
+ grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
+
+ // Then request the method
+ next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
+ request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
+ }
+
+ private:
+ bool request_done(bool ok) {
+ if (!ok) {
+ return false;
+ }
+ next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
+ return true;
+ }
+
+ bool read_done(bool ok) {
+ if (ok) {
+ // In this case, just do another read
+ // next_state_ is unchanged
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
+ return true;
+ } else { // client has sent writes done
+ // invoke the method
+ // Call the RPC processing function
+ grpc::Status status = invoke_method_(&req_, &response_);
+ // finish the stream
+ next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
+ stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
+ }
+ return true;
+ }
+ bool finish_done(bool ok) { return false; /* reset the context */ }
+
+ std::unique_ptr<ServerContextType> srv_ctx_;
+ RequestType req_;
+ ResponseType response_;
+ bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
+ std::function<void(ServerContextType *,
+ grpc::ServerAsyncReader<ResponseType, RequestType> *,
+ void *)>
+ request_method_;
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
+ invoke_method_;
+ grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
+ };
+
+ class ServerRpcContextStreamingFromServerImpl final
+ : public ServerRpcContext {
+ public:
+ ServerRpcContextStreamingFromServerImpl(
+ std::function<void(ServerContextType *, RequestType *,
+ grpc::ServerAsyncWriter<ResponseType> *, void *)>
+ request_method,
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
+ invoke_method)
+ : srv_ctx_(new ServerContextType),
+ next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
+ request_method_(request_method),
+ invoke_method_(invoke_method),
+ stream_(srv_ctx_.get()) {
+ request_method_(srv_ctx_.get(), &req_, &stream_,
+ AsyncQpsServerTest::tag(this));
+ }
+ ~ServerRpcContextStreamingFromServerImpl() override {}
+ bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
+ void Reset() override {
+ srv_ctx_.reset(new ServerContextType);
+ req_ = RequestType();
+ stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
+
+ // Then request the method
+ next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
+ request_method_(srv_ctx_.get(), &req_, &stream_,
+ AsyncQpsServerTest::tag(this));
+ }
+
+ private:
+ bool request_done(bool ok) {
+ if (!ok) {
+ return false;
+ }
+ // invoke the method
+ // Call the RPC processing function
+ grpc::Status status = invoke_method_(&req_, &response_);
+
+ next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
+ stream_.Write(response_, AsyncQpsServerTest::tag(this));
+ return true;
+ }
+
+ bool write_done(bool ok) {
+ if (ok) {
+ // Do another write!
+ // next_state_ is unchanged
+ stream_.Write(response_, AsyncQpsServerTest::tag(this));
+ } else { // must be done so let's finish
+ next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
+ }
+ return true;
+ }
+ bool finish_done(bool ok) { return false; /* reset the context */ }
+
+ std::unique_ptr<ServerContextType> srv_ctx_;
+ RequestType req_;
+ ResponseType response_;
+ bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
+ std::function<void(ServerContextType *, RequestType *,
+ grpc::ServerAsyncWriter<ResponseType> *, void *)>
+ request_method_;
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
+ invoke_method_;
+ grpc::ServerAsyncWriter<ResponseType> stream_;
+ };
+
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
@@ -390,6 +562,9 @@ std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) {
config, RegisterBenchmarkService,
&BenchmarkService::AsyncService::RequestUnaryCall,
&BenchmarkService::AsyncService::RequestStreamingCall,
+ &BenchmarkService::AsyncService::RequestStreamingFromClient,
+ &BenchmarkService::AsyncService::RequestStreamingFromServer,
+ &BenchmarkService::AsyncService::RequestStreamingBothWays,
ProcessSimpleRPC));
}
std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) {
@@ -397,7 +572,8 @@ std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) {
new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
grpc::GenericServerContext>(
config, RegisterGenericService, nullptr,
- &grpc::AsyncGenericService::RequestCall, ProcessGenericRPC));
+ &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
+ ProcessGenericRPC));
}
} // namespace testing
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index f79284d225..f04465e261 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -31,6 +31,9 @@
*
*/
+#include <atomic>
+#include <thread>
+
#include <grpc++/resource_quota.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
@@ -52,12 +55,9 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) override {
- if (request->response_size() > 0) {
- if (!Server::SetPayload(request->response_type(),
- request->response_size(),
- response->mutable_payload())) {
- return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
- }
+ auto s = SetResponse(request, response);
+ if (!s.ok()) {
+ return s;
}
return Status::OK;
}
@@ -67,12 +67,9 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service {
SimpleRequest request;
while (stream->Read(&request)) {
SimpleResponse response;
- if (request.response_size() > 0) {
- if (!Server::SetPayload(request.response_type(),
- request.response_size(),
- response.mutable_payload())) {
- return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
- }
+ auto s = SetResponse(&request, &response);
+ if (!s.ok()) {
+ return s;
}
if (!stream->Write(response)) {
return Status(StatusCode::INTERNAL, "Server couldn't respond");
@@ -80,6 +77,96 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service {
}
return Status::OK;
}
+ Status StreamingFromClient(ServerContext* context,
+ ServerReader<SimpleRequest>* stream,
+ SimpleResponse* response) override {
+ auto s = ClientPull(context, stream, response);
+ if (!s.ok()) {
+ return s;
+ }
+ return Status::OK;
+ }
+ Status StreamingFromServer(ServerContext* context,
+ const SimpleRequest* request,
+ ServerWriter<SimpleResponse>* stream) override {
+ SimpleResponse response;
+ auto s = SetResponse(request, &response);
+ if (!s.ok()) {
+ return s;
+ }
+ return ServerPush(context, stream, response, nullptr);
+ }
+ Status StreamingBothWays(
+ ServerContext* context,
+ ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
+ // Read the first client message to setup server response
+ SimpleRequest request;
+ if (!stream->Read(&request)) {
+ return Status::OK;
+ }
+ SimpleResponse response;
+ auto s = SetResponse(&request, &response);
+ if (!s.ok()) {
+ return s;
+ }
+ std::atomic_bool done;
+ Status sp;
+ std::thread t([context, stream, &response, &done, &sp]() {
+ sp = ServerPush(context, stream, response, [&done]() {
+ return done.load(std::memory_order_relaxed);
+ });
+ });
+ SimpleResponse dummy;
+ auto cp = ClientPull(context, stream, &dummy);
+ done.store(true, std::memory_order_relaxed); // can be lazy
+ t.join();
+ if (!cp.ok()) {
+ return cp;
+ }
+ if (!sp.ok()) {
+ return sp;
+ }
+ return Status::OK;
+ }
+
+ private:
+ static Status ClientPull(ServerContext* context,
+ ReaderInterface<SimpleRequest>* stream,
+ SimpleResponse* response) {
+ SimpleRequest request;
+ while (stream->Read(&request)) {
+ }
+ if (request.response_size() > 0) {
+ if (!Server::SetPayload(request.response_type(), request.response_size(),
+ response->mutable_payload())) {
+ return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
+ }
+ }
+ return Status::OK;
+ }
+ static Status ServerPush(ServerContext* context,
+ WriterInterface<SimpleResponse>* stream,
+ const SimpleResponse& response,
+ std::function<bool()> done) {
+ while ((done == nullptr) || !done()) {
+ // TODO(vjpai): Add potential for rate-pacing on this
+ if (!stream->Write(response)) {
+ return Status(StatusCode::INTERNAL, "Server couldn't push");
+ }
+ }
+ return Status::OK;
+ }
+ static Status SetResponse(const SimpleRequest* request,
+ SimpleResponse* response) {
+ if (request->response_size() > 0) {
+ if (!Server::SetPayload(request->response_type(),
+ request->response_size(),
+ response->mutable_payload())) {
+ return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
+ }
+ }
+ return Status::OK;
+ }
};
class SynchronousServer final : public grpc::testing::Server {
diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc
index 35c8d5d088..e1a03666f0 100644
--- a/test/cpp/thread_manager/thread_manager_test.cc
+++ b/test/cpp/thread_manager/thread_manager_test.cc
@@ -118,7 +118,7 @@ void ThreadManagerTest::PerformTest() {
// The number of times DoWork() was called is equal to the number of times
// WORK_FOUND was returned
- gpr_log(GPR_DEBUG, "DoWork() called %ld times",
+ gpr_log(GPR_DEBUG, "DoWork() called %" PRIdPTR " times",
gpr_atm_no_barrier_load(&num_do_work_));
GPR_ASSERT(gpr_atm_no_barrier_load(&num_do_work_) ==
gpr_atm_no_barrier_load(&num_work_found_));
diff --git a/test/cpp/util/BUILD b/test/cpp/util/BUILD
index 118cf86aae..fc565afed9 100644
--- a/test/cpp/util/BUILD
+++ b/test/cpp/util/BUILD
@@ -33,6 +33,15 @@ load("//bazel:grpc_build_system.bzl", "grpc_cc_library")
package(default_visibility = ["//visibility:public"])
+# The following builds a shared-object to confirm that grpc++_unsecure
+# builds properly. Build-only is sufficient here
+grpc_cc_binary(
+ name = "testso.so",
+ srcs = [],
+ linkshared = 1,
+ deps = ["//:grpc++_unsecure"],
+)
+
grpc_cc_library(
name = "test_config",
srcs = [
@@ -66,7 +75,6 @@ grpc_cc_library(
grpc_cc_library(
name = "test_util",
srcs = [
- # "test/cpp/end2end/test_service_impl.cc",
"byte_buffer_proto_helper.cc",
"create_test_channel.cc",
"string_ref_helper.cc",
@@ -132,3 +140,42 @@ grpc_cc_library(
"//:grpc++",
],
)
+
+cc_test(
+ name = "error_details_test",
+ srcs = [
+ "error_details_test.cc",
+ ],
+ deps = [
+ "//:grpc++_error_details",
+ "//external:gtest",
+ "//src/proto/grpc/testing:echo_messages_proto",
+ ],
+)
+
+cc_binary(
+ name = "grpc_cli",
+ srcs = [
+ "cli_call.cc",
+ "cli_call.h",
+ "cli_credentials.cc",
+ "cli_credentials.h",
+ "config_grpc_cli.h",
+ "grpc_cli.cc",
+ "grpc_tool.cc",
+ "grpc_tool.h",
+ "proto_file_parser.cc",
+ "proto_file_parser.h",
+ "proto_reflection_descriptor_database.cc",
+ "proto_reflection_descriptor_database.h",
+ "service_describer.cc",
+ "service_describer.h",
+ "test_config.h",
+ "test_config_cc.cc",
+ ],
+ deps = [
+ "//:grpc++",
+ "//external:gflags",
+ "//src/proto/grpc/reflection/v1alpha:reflection_proto",
+ ],
+)
diff --git a/test/cpp/util/cli_call.cc b/test/cpp/util/cli_call.cc
index 041cc0e4c3..b26beb050d 100644
--- a/test/cpp/util/cli_call.cc
+++ b/test/cpp/util/cli_call.cc
@@ -91,7 +91,7 @@ void CliCall::Write(const grpc::string& request) {
void* got_tag;
bool ok;
- grpc_slice s = grpc_slice_from_copied_string(request.c_str());
+ gpr_slice s = gpr_slice_from_copied_buffer(request.data(), request.size());
grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
grpc::ByteBuffer send_buffer(&req_slice, 1);
call_->Write(send_buffer, tag(2));
diff --git a/test/cpp/util/error_details_test.cc b/test/cpp/util/error_details_test.cc
new file mode 100644
index 0000000000..d01fd3b087
--- /dev/null
+++ b/test/cpp/util/error_details_test.cc
@@ -0,0 +1,120 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc++/support/error_details.h>
+#include <gtest/gtest.h>
+
+#include "src/proto/grpc/status/status.pb.h"
+#include "src/proto/grpc/testing/echo_messages.pb.h"
+
+namespace grpc {
+namespace {
+
+TEST(ExtractTest, Success) {
+ google::rpc::Status expected;
+ expected.set_code(13); // INTERNAL
+ expected.set_message("I am an error message");
+ testing::EchoRequest expected_details;
+ expected_details.set_message(grpc::string(100, '\0'));
+ expected.add_details()->PackFrom(expected_details);
+
+ google::rpc::Status to;
+ grpc::string error_details = expected.SerializeAsString();
+ Status from(static_cast<StatusCode>(expected.code()), expected.message(),
+ error_details);
+ EXPECT_TRUE(ExtractErrorDetails(from, &to).ok());
+ EXPECT_EQ(expected.code(), to.code());
+ EXPECT_EQ(expected.message(), to.message());
+ EXPECT_EQ(1, to.details_size());
+ testing::EchoRequest details;
+ to.details(0).UnpackTo(&details);
+ EXPECT_EQ(expected_details.message(), details.message());
+}
+
+TEST(ExtractTest, NullInput) {
+ EXPECT_EQ(StatusCode::FAILED_PRECONDITION,
+ ExtractErrorDetails(Status(), nullptr).error_code());
+}
+
+TEST(ExtractTest, Unparsable) {
+ grpc::string error_details("I am not a status object");
+ Status from(StatusCode::INTERNAL, "", error_details);
+ google::rpc::Status to;
+ EXPECT_EQ(StatusCode::INVALID_ARGUMENT,
+ ExtractErrorDetails(from, &to).error_code());
+}
+
+TEST(SetTest, Success) {
+ google::rpc::Status expected;
+ expected.set_code(13); // INTERNAL
+ expected.set_message("I am an error message");
+ testing::EchoRequest expected_details;
+ expected_details.set_message(grpc::string(100, '\0'));
+ expected.add_details()->PackFrom(expected_details);
+
+ Status to;
+ Status s = SetErrorDetails(expected, &to);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(expected.code(), to.error_code());
+ EXPECT_EQ(expected.message(), to.error_message());
+ EXPECT_EQ(expected.SerializeAsString(), to.error_details());
+}
+
+TEST(SetTest, NullInput) {
+ EXPECT_EQ(StatusCode::FAILED_PRECONDITION,
+ SetErrorDetails(google::rpc::Status(), nullptr).error_code());
+}
+
+TEST(SetTest, OutOfScopeErrorCode) {
+ google::rpc::Status expected;
+ expected.set_code(20); // Out of scope (DATA_LOSS is 15).
+ expected.set_message("I am an error message");
+ testing::EchoRequest expected_details;
+ expected_details.set_message(grpc::string(100, '\0'));
+ expected.add_details()->PackFrom(expected_details);
+
+ Status to;
+ Status s = SetErrorDetails(expected, &to);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(StatusCode::UNKNOWN, to.error_code());
+ EXPECT_EQ(expected.message(), to.error_message());
+ EXPECT_EQ(expected.SerializeAsString(), to.error_details());
+}
+
+} // namespace
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/util/grpc_tool.cc b/test/cpp/util/grpc_tool.cc
index 856cd32c3c..c7acb7c631 100644
--- a/test/cpp/util/grpc_tool.cc
+++ b/test/cpp/util/grpc_tool.cc
@@ -321,6 +321,7 @@ bool GrpcTool::ListServices(int argc, const char** argv,
std::vector<grpc::string> service_list;
if (!desc_db.GetServices(&service_list)) {
+ fprintf(stderr, "Received an error when querying services endpoint.\n");
return false;
}