diff options
author | yang-g <yangg@google.com> | 2016-01-14 09:12:08 -0800 |
---|---|---|
committer | yang-g <yangg@google.com> | 2016-01-14 09:12:08 -0800 |
commit | df2778d45f57f0647448e566b5bb7e3643b51ebb (patch) | |
tree | 773958a73eb6e147d3d995e2a0ea09381481b59b | |
parent | 11493fbcc82a24f5aa9eeff2a08aa815d17e1145 (diff) | |
parent | dafa125c7b01df1003b96ea2619eb39956f5ddcc (diff) |
Merge remote-tracking branch 'upstream/master' into epollset
-rw-r--r-- | Makefile | 47 | ||||
-rw-r--r-- | build.yaml | 17 | ||||
-rw-r--r-- | include/grpc++/generic/async_generic_service.h | 5 | ||||
-rw-r--r-- | include/grpc++/support/byte_buffer.h | 13 | ||||
-rw-r--r-- | include/grpc++/support/slice.h | 6 | ||||
-rw-r--r-- | src/cpp/util/byte_buffer.cc | 13 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 166 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 156 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 30 | ||||
-rw-r--r-- | test/cpp/qps/generic_async_streaming_ping_pong_test.cc | 82 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 13 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 20 | ||||
-rw-r--r-- | test/cpp/qps/server.h | 7 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 151 | ||||
-rw-r--r-- | tools/run_tests/sources_and_headers.json | 17 | ||||
-rw-r--r-- | tools/run_tests/tests.json | 17 |
16 files changed, 602 insertions, 158 deletions
@@ -903,6 +903,7 @@ cxx_slice_test: $(BINDIR)/$(CONFIG)/cxx_slice_test cxx_string_ref_test: $(BINDIR)/$(CONFIG)/cxx_string_ref_test cxx_time_test: $(BINDIR)/$(CONFIG)/cxx_time_test end2end_test: $(BINDIR)/$(CONFIG)/end2end_test +generic_async_streaming_ping_pong_test: $(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test generic_end2end_test: $(BINDIR)/$(CONFIG)/generic_end2end_test grpc_cli: $(BINDIR)/$(CONFIG)/grpc_cli grpc_cpp_plugin: $(BINDIR)/$(CONFIG)/grpc_cpp_plugin @@ -1256,6 +1257,7 @@ buildtests_cxx: buildtests_zookeeper privatelibs_cxx \ $(BINDIR)/$(CONFIG)/cxx_string_ref_test \ $(BINDIR)/$(CONFIG)/cxx_time_test \ $(BINDIR)/$(CONFIG)/end2end_test \ + $(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test \ $(BINDIR)/$(CONFIG)/generic_end2end_test \ $(BINDIR)/$(CONFIG)/grpc_cli \ $(BINDIR)/$(CONFIG)/interop_client \ @@ -1555,6 +1557,8 @@ test_cxx: test_zookeeper buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/cxx_time_test || ( echo test cxx_time_test failed ; exit 1 ) $(E) "[RUN] Testing end2end_test" $(Q) $(BINDIR)/$(CONFIG)/end2end_test || ( echo test end2end_test failed ; exit 1 ) + $(E) "[RUN] Testing generic_async_streaming_ping_pong_test" + $(Q) $(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test || ( echo test generic_async_streaming_ping_pong_test failed ; exit 1 ) $(E) "[RUN] Testing generic_end2end_test" $(Q) $(BINDIR)/$(CONFIG)/generic_end2end_test || ( echo test generic_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing interop_test" @@ -9325,6 +9329,49 @@ endif endif +GENERIC_ASYNC_STREAMING_PING_PONG_TEST_SRC = \ + test/cpp/qps/generic_async_streaming_ping_pong_test.cc \ + +GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test: $(PROTOBUF_DEP) $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/generic_async_streaming_ping_pong_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/qps/generic_async_streaming_ping_pong_test.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_generic_async_streaming_ping_pong_test: $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(GENERIC_ASYNC_STREAMING_PING_PONG_TEST_OBJS:.o=.dep) +endif +endif + + GENERIC_END2END_TEST_SRC = \ test/cpp/end2end/generic_end2end_test.cc \ diff --git a/build.yaml b/build.yaml index aa30bae9f6..a23197c12c 100644 --- a/build.yaml +++ b/build.yaml @@ -1950,6 +1950,23 @@ targets: - grpc - gpr_test_util - gpr +- name: generic_async_streaming_ping_pong_test + build: test + language: c++ + src: + - test/cpp/qps/generic_async_streaming_ping_pong_test.cc + deps: + - qps + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr + platforms: + - mac + - linux + - posix - name: generic_end2end_test build: test language: c++ diff --git a/include/grpc++/generic/async_generic_service.h b/include/grpc++/generic/async_generic_service.h index 8578d850ff..33045b8d85 100644 --- a/include/grpc++/generic/async_generic_service.h +++ b/include/grpc++/generic/async_generic_service.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -61,6 +61,7 @@ class AsyncGenericService GRPC_FINAL { // TODO(yangg) Once we can add multiple completion queues to the server // in c core, add a CompletionQueue* argument to the ctor here. // TODO(yangg) support methods list. + AsyncGenericService() : server_(nullptr) {} AsyncGenericService(const grpc::string& methods) : server_(nullptr) {} void RequestCall(GenericServerContext* ctx, @@ -75,4 +76,4 @@ class AsyncGenericService GRPC_FINAL { } // namespace grpc -#endif // GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H +#endif // GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H
\ No newline at end of file diff --git a/include/grpc++/support/byte_buffer.h b/include/grpc++/support/byte_buffer.h index 9d19b07708..84042cbef8 100644 --- a/include/grpc++/support/byte_buffer.h +++ b/include/grpc++/support/byte_buffer.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -55,8 +55,14 @@ class ByteBuffer GRPC_FINAL { /// Construct buffer from \a slices, of which there are \a nslices. ByteBuffer(const Slice* slices, size_t nslices); + /// Constuct a byte buffer by referencing elements of existing buffer + /// \a buf. Wrapper of core function grpc_byte_buffer_copy + ByteBuffer(const ByteBuffer& buf); + ~ByteBuffer(); + ByteBuffer& operator=(const ByteBuffer&); + /// Dump (read) the buffer contents into \a slices. void Dump(std::vector<Slice>* slices) const; @@ -69,9 +75,6 @@ class ByteBuffer GRPC_FINAL { private: friend class SerializationTraits<ByteBuffer, void>; - ByteBuffer(const ByteBuffer&); - ByteBuffer& operator=(const ByteBuffer&); - // takes ownership void set_buffer(grpc_byte_buffer* buf) { if (buffer_) { @@ -104,4 +107,4 @@ class SerializationTraits<ByteBuffer, void> { } // namespace grpc -#endif // GRPCXX_SUPPORT_BYTE_BUFFER_H +#endif // GRPCXX_SUPPORT_BYTE_BUFFER_H
\ No newline at end of file diff --git a/include/grpc++/support/slice.h b/include/grpc++/support/slice.h index be316e7c94..30325ef90b 100644 --- a/include/grpc++/support/slice.h +++ b/include/grpc++/support/slice.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -39,7 +39,7 @@ namespace grpc { -/// A wrapper around \a grpc_slice. +/// A wrapper around \a gpr_slice. /// /// A slice represents a contiguous reference counted array of bytes. /// It is cheap to take references to a slice, and it is cheap to create a @@ -85,4 +85,4 @@ class Slice GRPC_FINAL { } // namespace grpc -#endif // GRPCXX_SUPPORT_SLICE_H +#endif // GRPCXX_SUPPORT_SLICE_H
\ No newline at end of file diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc index 755234d7e8..df873eb1ce 100644 --- a/src/cpp/util/byte_buffer.cc +++ b/src/cpp/util/byte_buffer.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -79,4 +79,13 @@ size_t ByteBuffer::Length() const { } } -} // namespace grpc +ByteBuffer::ByteBuffer(const ByteBuffer& buf) + : buffer_(grpc_byte_buffer_copy(buf.buffer_)) {} + +ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) { + Clear(); // first remove existing data + buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy + return *this; +} + +} // namespace grpc
\ No newline at end of file diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index ee0049578d..0159f4f1e6 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,6 +37,9 @@ #include <condition_variable> #include <mutex> +#include <grpc++/support/byte_buffer.h> +#include <grpc++/support/slice.h> + #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" @@ -66,37 +69,64 @@ namespace testing { typedef std::chrono::high_resolution_clock grpc_time_source; typedef std::chrono::time_point<grpc_time_source> grpc_time; -class Client { +template <class RequestType> +class ClientRequestCreator { public: - explicit Client(const ClientConfig& config) - : channels_(config.client_channels()), - timer_(new Timer), - interarrival_timer_() { - for (int i = 0; i < config.client_channels(); i++) { - channels_[i].init(config.server_targets(i % config.server_targets_size()), - config); - } - if (config.payload_config().has_bytebuf_params()) { - GPR_ASSERT(false); // not yet implemented - } else if (config.payload_config().has_simple_params()) { - request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request_.set_response_size( - config.payload_config().simple_params().resp_size()); - request_.mutable_payload()->set_type( + ClientRequestCreator(RequestType* req, const PayloadConfig&) { + // this template must be specialized + // fail with an assertion rather than a compile-time + // check since these only happen at the beginning anyway + GPR_ASSERT(false); + } +}; + +template <> +class ClientRequestCreator<SimpleRequest> { + public: + ClientRequestCreator(SimpleRequest* req, + const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + GPR_ASSERT(false); // not appropriate for this specialization + } else if (payload_config.has_simple_params()) { + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(payload_config.simple_params().resp_size()); + req->mutable_payload()->set_type( grpc::testing::PayloadType::COMPRESSABLE); - int size = config.payload_config().simple_params().req_size(); + int size = payload_config.simple_params().req_size(); std::unique_ptr<char[]> body(new char[size]); - request_.mutable_payload()->set_body(body.get(), size); - } else if (config.payload_config().has_complex_params()) { - GPR_ASSERT(false); // not yet implemented + req->mutable_payload()->set_body(body.get(), size); + } else if (payload_config.has_complex_params()) { + GPR_ASSERT(false); // not appropriate for this specialization } else { // default should be simple proto without payloads - request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request_.set_response_size(0); - request_.mutable_payload()->set_type( + req->set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + req->set_response_size(0); + req->mutable_payload()->set_type( grpc::testing::PayloadType::COMPRESSABLE); } } +}; + +template <> +class ClientRequestCreator<ByteBuffer> { + public: + ClientRequestCreator(ByteBuffer* req, const PayloadConfig& payload_config) { + if (payload_config.has_bytebuf_params()) { + std::unique_ptr<char> buf( + new char[payload_config.bytebuf_params().req_size()]); + gpr_slice s = gpr_slice_from_copied_buffer( + buf.get(), payload_config.bytebuf_params().req_size()); + Slice slice(s, Slice::STEAL_REF); + *req = ByteBuffer(&slice, 1); + } else { + GPR_ASSERT(false); // not appropriate for this specialization + } + } +}; + +class Client { + public: + Client() : timer_(new Timer), interarrival_timer_() {} virtual ~Client() {} ClientStats Mark(bool reset) { @@ -134,37 +164,8 @@ class Client { } protected: - SimpleRequest request_; bool closed_loop_; - class ClientChannelInfo { - public: - ClientChannelInfo() {} - ClientChannelInfo(const ClientChannelInfo& i) { - // The copy constructor is to satisfy old compilers - // that need it for using std::vector . It is only ever - // used for empty entries - GPR_ASSERT(!i.channel_ && !i.stub_); - } - void init(const grpc::string& target, const ClientConfig& config) { - // We have to use a 2-phase init like this with a default - // constructor followed by an initializer function to make - // old compilers happy with using this in std::vector - channel_ = CreateTestChannel( - target, config.security_params().server_host_override(), - config.has_security_params(), - !config.security_params().use_test_ca()); - stub_ = BenchmarkService::NewStub(channel_); - } - Channel* get_channel() { return channel_.get(); } - BenchmarkService::Stub* get_stub() { return stub_.get(); } - - private: - std::shared_ptr<Channel> channel_; - std::unique_ptr<BenchmarkService::Stub> stub_; - }; - std::vector<ClientChannelInfo> channels_; - void StartThreads(size_t num_threads) { for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); @@ -295,8 +296,6 @@ class Client { } } - BenchmarkService::Stub* stub_; - ClientConfig config_; std::mutex mu_; std::condition_variable cv_; bool done_; @@ -314,13 +313,68 @@ class Client { std::vector<grpc_time> next_time_; }; +template <class StubType, class RequestType> +class ClientImpl : public Client { + public: + ClientImpl(const ClientConfig& config, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) + : channels_(config.client_channels()), create_stub_(create_stub) { + for (int i = 0; i < config.client_channels(); i++) { + channels_[i].init(config.server_targets(i % config.server_targets_size()), + config, create_stub_); + } + + ClientRequestCreator<RequestType> create_req(&request_, + config.payload_config()); + } + virtual ~ClientImpl() {} + + protected: + RequestType request_; + + class ClientChannelInfo { + public: + ClientChannelInfo() {} + ClientChannelInfo(const ClientChannelInfo& i) { + // The copy constructor is to satisfy old compilers + // that need it for using std::vector . It is only ever + // used for empty entries + GPR_ASSERT(!i.channel_ && !i.stub_); + } + void init(const grpc::string& target, const ClientConfig& config, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) { + // We have to use a 2-phase init like this with a default + // constructor followed by an initializer function to make + // old compilers happy with using this in std::vector + channel_ = CreateTestChannel( + target, config.security_params().server_host_override(), + config.has_security_params(), + !config.security_params().use_test_ca()); + stub_ = create_stub(channel_); + } + Channel* get_channel() { return channel_.get(); } + StubType* get_stub() { return stub_.get(); } + + private: + std::shared_ptr<Channel> channel_; + std::unique_ptr<StubType> stub_; + }; + std::vector<ClientChannelInfo> channels_; + std::function<std::unique_ptr<StubType>(const std::shared_ptr<Channel>&)> + create_stub_; +}; + std::unique_ptr<Client> CreateSynchronousUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateSynchronousStreamingClient( const ClientConfig& args); std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args); std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args); +std::unique_ptr<Client> CreateGenericAsyncStreamingClient( + const ClientConfig& args); } // namespace testing } // namespace grpc -#endif +#endif
\ No newline at end of file diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 3aaf06564c..3e2317c6d4 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,19 +37,20 @@ #include <list> #include <memory> #include <mutex> +#include <sstream> #include <string> #include <thread> #include <vector> -#include <sstream> +#include <gflags/gflags.h> +#include <grpc++/client_context.h> +#include <grpc++/generic/generic_stub.h> #include <grpc/grpc.h> #include <grpc/support/histogram.h> #include <grpc/support/log.h> -#include <gflags/gflags.h> -#include <grpc++/client_context.h> -#include "test/cpp/qps/timer.h" #include "test/cpp/qps/client.h" +#include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" #include "src/proto/grpc/testing/services.grpc.pb.h" @@ -147,13 +148,22 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { typedef std::forward_list<ClientRpcContext*> context_list; -class AsyncClient : public Client { +template <class StubType, class RequestType> +class AsyncClient : public ClientImpl<StubType, RequestType> { + // Specify which protected members we are using since there is no + // member name resolution until the template types are fully resolved public: - explicit AsyncClient( - const ClientConfig& config, - std::function<ClientRpcContext*(int, BenchmarkService::Stub*, - const SimpleRequest&)> setup_ctx) - : Client(config), + using Client::SetupLoadTest; + using Client::NextIssueTime; + using Client::closed_loop_; + using ClientImpl<StubType, RequestType>::channels_; + using ClientImpl<StubType, RequestType>::request_; + AsyncClient(const ClientConfig& config, + std::function<ClientRpcContext*(int, StubType*, + const RequestType&)> setup_ctx, + std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> + create_stub) + : ClientImpl<StubType, RequestType>(config, create_stub), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), @@ -343,10 +353,16 @@ class AsyncClient : public Client { int pref_channel_inc_; }; -class AsyncUnaryClient GRPC_FINAL : public AsyncClient { +static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( + std::shared_ptr<Channel> ch) { + return BenchmarkService::NewStub(ch); +} + +class AsyncUnaryClient GRPC_FINAL + : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: explicit AsyncUnaryClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx) { + : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { StartThreads(config.async_client_threads()); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } @@ -437,10 +453,11 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { stream_; }; -class AsyncStreamingClient GRPC_FINAL : public AsyncClient { +class AsyncStreamingClient GRPC_FINAL + : public AsyncClient<BenchmarkService::Stub, SimpleRequest> { public: explicit AsyncStreamingClient(const ClientConfig& config) - : AsyncClient(config, SetupCtx) { + : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); @@ -467,12 +484,119 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { } }; +class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { + public: + ClientRpcContextGenericStreamingImpl( + int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, + std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( + grpc::GenericStub*, grpc::ClientContext*, + const grpc::string& method_name, CompletionQueue*, void*)> start_req, + std::function<void(grpc::Status, ByteBuffer*)> on_done) + : ClientRpcContext(channel_id), + context_(), + stub_(stub), + req_(req), + response_(), + next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent), + callback_(on_done), + start_req_(start_req), + start_(Timer::Now()) {} + ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {} + bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + return (this->*next_state_)(ok, hist); + } + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_, + start_req_, callback_); + } + void Start(CompletionQueue* cq) GRPC_OVERRIDE { + const grpc::string kMethodName( + "/grpc.testing.BenchmarkService/StreamingCall"); + stream_ = start_req_(stub_, &context_, kMethodName, cq, + ClientRpcContext::tag(this)); + } + + private: + bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); } + bool StartWrite(bool ok) { + if (!ok) { + return (false); + } + start_ = Timer::Now(); + next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone; + stream_->Write(req_, ClientRpcContext::tag(this)); + return true; + } + bool WriteDone(bool ok, Histogram*) { + if (!ok) { + return (false); + } + next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone; + stream_->Read(&response_, ClientRpcContext::tag(this)); + return true; + } + bool ReadDone(bool ok, Histogram* hist) { + hist->Add((Timer::Now() - start_) * 1e9); + return StartWrite(ok); + } + grpc::ClientContext context_; + grpc::GenericStub* stub_; + ByteBuffer req_; + ByteBuffer response_; + bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*); + std::function<void(grpc::Status, ByteBuffer*)> callback_; + std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( + grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, + CompletionQueue*, void*)> start_req_; + grpc::Status status_; + double start_; + std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_; +}; + +static std::unique_ptr<grpc::GenericStub> GenericStubCreator( + std::shared_ptr<Channel> ch) { + return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch)); +} + +class GenericAsyncStreamingClient GRPC_FINAL + : public AsyncClient<grpc::GenericStub, ByteBuffer> { + public: + explicit GenericAsyncStreamingClient(const ClientConfig& config) + : AsyncClient(config, SetupCtx, GenericStubCreator) { + // async streaming currently only supports closed loop + GPR_ASSERT(closed_loop_); + + StartThreads(config.async_client_threads()); + } + + ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } + + private: + static void CheckDone(grpc::Status s, ByteBuffer* response) {} + static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq( + grpc::GenericStub* stub, grpc::ClientContext* ctx, + const grpc::string& method_name, CompletionQueue* cq, void* tag) { + auto stream = stub->Call(ctx, method_name, cq, tag); + return stream; + }; + static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub, + const ByteBuffer& req) { + return new ClientRpcContextGenericStreamingImpl( + channel_id, stub, req, GenericAsyncStreamingClient::StartReq, + GenericAsyncStreamingClient::CheckDone); + } +}; + std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) { return std::unique_ptr<Client>(new AsyncUnaryClient(args)); } std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) { return std::unique_ptr<Client>(new AsyncStreamingClient(args)); } +std::unique_ptr<Client> CreateGenericAsyncStreamingClient( + const ClientConfig& args) { + return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args)); +} } // namespace testing -} // namespace grpc +} // namespace grpc
\ No newline at end of file diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 9e94f43e35..1045915b83 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,38 +35,44 @@ #include <chrono> #include <memory> #include <mutex> +#include <sstream> #include <string> #include <thread> #include <vector> -#include <sstream> #include <gflags/gflags.h> +#include <grpc++/client_context.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/histogram.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/time.h> -#include <grpc++/client_context.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> #include <gtest/gtest.h> -#include "test/cpp/util/create_test_channel.h" +#include "src/core/profiling/timers.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" -#include "src/proto/grpc/testing/services.grpc.pb.h" - -#include "src/core/profiling/timers.h" namespace grpc { namespace testing { -class SynchronousClient : public Client { +static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( + std::shared_ptr<Channel> ch) { + return BenchmarkService::NewStub(ch); +} + +class SynchronousClient + : public ClientImpl<BenchmarkService::Stub, SimpleRequest> { public: - SynchronousClient(const ClientConfig& config) : Client(config) { + SynchronousClient(const ClientConfig& config) + : ClientImpl<BenchmarkService::Stub, SimpleRequest>( + config, BenchmarkStubCreator) { num_threads_ = config.outstanding_rpcs_per_channel() * config.client_channels(); responses_.resize(num_threads_); @@ -166,4 +172,4 @@ std::unique_ptr<Client> CreateSynchronousStreamingClient( } } // namespace testing -} // namespace grpc +} // namespace grpc
\ No newline at end of file diff --git a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc new file mode 100644 index 0000000000..7a1275054a --- /dev/null +++ b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc @@ -0,0 +1,82 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <set> + +#include <grpc/support/log.h> + +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/report.h" +#include "test/cpp/util/benchmark_config.h" + +namespace grpc { +namespace testing { + +static const int WARMUP = 5; +static const int BENCHMARK = 10; + +static void RunGenericAsyncStreamingPingPong() { + gpr_log(GPR_INFO, "Running Generic Async Streaming Ping Pong"); + + ClientConfig client_config; + client_config.set_client_type(ASYNC_CLIENT); + client_config.set_outstanding_rpcs_per_channel(1); + client_config.set_client_channels(1); + client_config.set_async_client_threads(1); + client_config.set_rpc_type(STREAMING); + client_config.mutable_load_params()->mutable_closed_loop(); + auto bbuf = client_config.mutable_payload_config()->mutable_bytebuf_params(); + bbuf->set_resp_size(0); + bbuf->set_req_size(0); + + ServerConfig server_config; + server_config.set_server_type(ASYNC_SERVER); + server_config.set_host("localhost"); + server_config.set_async_server_threads(1); + + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); + + GetReporter()->ReportQPS(*result); + GetReporter()->ReportLatency(*result); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::InitBenchmark(&argc, &argv, true); + + grpc::testing::RunGenericAsyncStreamingPingPong(); + return 0; +}
\ No newline at end of file diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index c7096391e6..c70db188d9 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -62,6 +62,8 @@ DEFINE_int32(client_channels, 1, "Number of client channels"); DEFINE_int32(simple_req_size, -1, "Simple proto request payload size"); DEFINE_int32(simple_resp_size, -1, "Simple proto response payload size"); +DEFINE_int32(bbuf_req_size, -1, "Byte-buffer request payload size"); +DEFINE_int32(bbuf_resp_size, -1, "Byte-buffer response payload size"); DEFINE_string(client_type, "SYNC_CLIENT", "Client type"); DEFINE_int32(async_client_threads, 1, "Async client threads"); @@ -109,6 +111,13 @@ static void QpsDriver() { if (FLAGS_simple_req_size >= 0) { params->set_req_size(FLAGS_simple_req_size); } + } else if (FLAGS_bbuf_resp_size >= 0) { + auto params = + client_config.mutable_payload_config()->mutable_bytebuf_params(); + params->set_resp_size(FLAGS_bbuf_resp_size); + if (FLAGS_bbuf_req_size >= 0) { + params->set_req_size(FLAGS_bbuf_req_size); + } } else { // set a reasonable default: proto but no payload client_config.mutable_payload_config()->mutable_simple_params(); @@ -175,4 +184,4 @@ int main(int argc, char** argv) { grpc::testing::QpsDriver(); return 0; -} +}
\ No newline at end of file diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index b3f383253f..e7714c0bb3 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -36,20 +36,20 @@ #include <cassert> #include <memory> #include <mutex> +#include <sstream> #include <string> #include <thread> #include <vector> -#include <sstream> +#include <grpc++/client_context.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/histogram.h> -#include <grpc/support/log.h> #include <grpc/support/host_port.h> -#include <grpc++/client_context.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> -#include <grpc++/security/server_credentials.h> +#include <grpc/support/log.h> #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/client.h" @@ -69,7 +69,9 @@ static std::unique_ptr<Client> CreateClient(const ClientConfig& config) { case ClientType::ASYNC_CLIENT: return (config.rpc_type() == RpcType::UNARY) ? CreateAsyncUnaryClient(config) - : CreateAsyncStreamingClient(config); + : (config.payload_config().has_bytebuf_params() + ? CreateGenericAsyncStreamingClient(config) + : CreateAsyncStreamingClient(config)); default: abort(); } @@ -237,4 +239,4 @@ QpsWorker::QpsWorker(int driver_port) { QpsWorker::~QpsWorker() {} } // namespace testing -} // namespace grpc +} // namespace grpc
\ No newline at end of file diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 620bc32f4b..daee7c3663 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -75,12 +75,11 @@ class Server { } static bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type = type; // TODO(yangg): Support UNCOMPRESSABLE payload. if (type != PayloadType::COMPRESSABLE) { return false; } - payload->set_type(response_type); + payload->set_type(type); std::unique_ptr<char[]> body(new char[size]()); payload->set_body(body.get(), size); return true; @@ -113,4 +112,4 @@ std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config); } // namespace testing } // namespace grpc -#endif +#endif
\ No newline at end of file diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 03fde06e77..1ae88d7323 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,15 +38,16 @@ #include <thread> #include <gflags/gflags.h> +#include <grpc++/generic/async_generic_service.h> +#include <grpc++/security/server_credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc++/support/config.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> -#include <grpc++/support/config.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> -#include <grpc++/server_context.h> -#include <grpc++/security/server_credentials.h> #include <gtest/gtest.h> #include "test/cpp/qps/server.h" @@ -55,9 +56,24 @@ namespace grpc { namespace testing { +template <class RequestType, class ResponseType, class ServiceType, + class ServerContextType> class AsyncQpsServerTest : public Server { public: - explicit AsyncQpsServerTest(const ServerConfig &config) : Server(config) { + AsyncQpsServerTest( + const ServerConfig &config, + std::function<void(ServerBuilder *, ServiceType *)> register_service, + std::function<void(ServiceType *, ServerContextType *, RequestType *, + ServerAsyncResponseWriter<ResponseType> *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_unary_function, + std::function<void(ServiceType *, ServerContextType *, + ServerAsyncReaderWriter<ResponseType, RequestType> *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_streaming_function, + std::function<grpc::Status(const PayloadConfig &, const RequestType *, + ResponseType *)> process_rpc) + : Server(config) { char *server_address = NULL; gpr_join_host_port(&server_address, config.host().c_str(), port()); @@ -67,7 +83,8 @@ class AsyncQpsServerTest : public Server { Server::CreateServerCredentials(config)); gpr_free(server_address); - builder.RegisterAsyncService(&async_service_); + register_service(&builder, &async_service_); + for (int i = 0; i < config.async_server_threads(); i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } @@ -75,22 +92,29 @@ class AsyncQpsServerTest : public Server { server_ = builder.BuildAndStart(); using namespace std::placeholders; + + auto process_rpc_bound = + std::bind(process_rpc, config.payload_config(), _1, _2); + for (int i = 0; i < 10000 / config.async_server_threads(); i++) { for (int j = 0; j < config.async_server_threads(); j++) { - auto request_unary = std::bind( - &BenchmarkService::AsyncService::RequestUnaryCall, &async_service_, - _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); - auto request_streaming = std::bind( - &BenchmarkService::AsyncService::RequestStreamingCall, - &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); - contexts_.push_front( - new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - request_unary, ProcessRPC)); - contexts_.push_front( - new ServerRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - request_streaming, ProcessRPC)); + if (request_unary_function) { + auto request_unary = + std::bind(request_unary_function, &async_service_, _1, _2, _3, + srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + contexts_.push_front( + new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound)); + } + if (request_streaming_function) { + auto request_streaming = + std::bind(request_streaming_function, &async_service_, _1, _2, + srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + contexts_.push_front(new ServerRpcContextStreamingImpl( + request_streaming, process_rpc_bound)); + } } } + for (int i = 0; i < config.async_server_threads(); i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); } @@ -155,16 +179,15 @@ class AsyncQpsServerTest : public Server { return reinterpret_cast<ServerRpcContext *>(tag); } - template <class RequestType, class ResponseType> class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextUnaryImpl( - std::function<void(ServerContext *, RequestType *, + std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method, std::function<grpc::Status(const RequestType *, ResponseType *)> invoke_method) - : srv_ctx_(new ServerContext), + : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextUnaryImpl::invoker), request_method_(request_method), invoke_method_(invoke_method), @@ -177,7 +200,7 @@ class AsyncQpsServerTest : public Server { return (this->*next_state_)(ok); } void Reset() GRPC_OVERRIDE { - srv_ctx_.reset(new ServerContext); + srv_ctx_.reset(new ServerContextType); req_ = RequestType(); response_writer_ = grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get()); @@ -205,10 +228,10 @@ class AsyncQpsServerTest : public Server { response_writer_.Finish(response, status, AsyncQpsServerTest::tag(this)); return true; } - std::unique_ptr<ServerContext> srv_ctx_; + std::unique_ptr<ServerContextType> srv_ctx_; RequestType req_; bool (ServerRpcContextUnaryImpl::*next_state_)(bool); - std::function<void(ServerContext *, RequestType *, + std::function<void(ServerContextType *, RequestType *, grpc::ServerAsyncResponseWriter<ResponseType> *, void *)> request_method_; std::function<grpc::Status(const RequestType *, ResponseType *)> @@ -216,16 +239,16 @@ class AsyncQpsServerTest : public Server { grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; }; - template <class RequestType, class ResponseType> class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext { public: ServerRpcContextStreamingImpl( - std::function<void(ServerContext *, grpc::ServerAsyncReaderWriter< - ResponseType, RequestType> *, - void *)> request_method, + std::function<void( + ServerContextType *, + grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> + request_method, std::function<grpc::Status(const RequestType *, ResponseType *)> invoke_method) - : srv_ctx_(new ServerContext), + : srv_ctx_(new ServerContextType), next_state_(&ServerRpcContextStreamingImpl::request_done), request_method_(request_method), invoke_method_(invoke_method), @@ -237,7 +260,7 @@ class AsyncQpsServerTest : public Server { return (this->*next_state_)(ok); } void Reset() GRPC_OVERRIDE { - srv_ctx_.reset(new ServerContext); + srv_ctx_.reset(new ServerContextType); req_ = RequestType(); stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>( srv_ctx_.get()); @@ -286,11 +309,11 @@ class AsyncQpsServerTest : public Server { } bool finish_done(bool ok) { return false; /* reset the context */ } - std::unique_ptr<ServerContext> srv_ctx_; + std::unique_ptr<ServerContextType> srv_ctx_; RequestType req_; bool (ServerRpcContextStreamingImpl::*next_state_)(bool); std::function<void( - ServerContext *, + ServerContextType *, grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)> request_method_; std::function<grpc::Status(const RequestType *, ResponseType *)> @@ -298,20 +321,10 @@ class AsyncQpsServerTest : public Server { grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_; }; - static Status ProcessRPC(const SimpleRequest *request, - SimpleResponse *response) { - if (request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); - } - } - return Status::OK; - } std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; - BenchmarkService::AsyncService async_service_; + ServiceType async_service_; std::forward_list<ServerRpcContext *> contexts_; class PerThreadShutdownState { @@ -335,9 +348,53 @@ class AsyncQpsServerTest : public Server { std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; +static void RegisterBenchmarkService(ServerBuilder *builder, + BenchmarkService::AsyncService *service) { + builder->RegisterAsyncService(service); +} +static void RegisterGenericService(ServerBuilder *builder, + grpc::AsyncGenericService *service) { + builder->RegisterAsyncGenericService(service); +} + +static Status ProcessSimpleRPC(const PayloadConfig &, + const SimpleRequest *request, + SimpleResponse *response) { + if (request->response_size() > 0) { + if (!Server::SetPayload(request->response_type(), request->response_size(), + response->mutable_payload())) { + return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + } + } + return Status::OK; +} + +static Status ProcessGenericRPC(const PayloadConfig &payload_config, + const ByteBuffer *request, + ByteBuffer *response) { + int resp_size = payload_config.bytebuf_params().resp_size(); + std::unique_ptr<char> buf(new char[resp_size]); + gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size); + Slice slice(s, Slice::STEAL_REF); + *response = ByteBuffer(&slice, 1); + return Status::OK; +} + std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) { - return std::unique_ptr<Server>(new AsyncQpsServerTest(config)); + return std::unique_ptr<Server>(new AsyncQpsServerTest< + SimpleRequest, SimpleResponse, BenchmarkService::AsyncService, + grpc::ServerContext>( + config, RegisterBenchmarkService, + &BenchmarkService::AsyncService::RequestUnaryCall, + &BenchmarkService::AsyncService::RequestStreamingCall, ProcessSimpleRPC)); +} +std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) { + return std::unique_ptr<Server>( + new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService, + grpc::GenericServerContext>( + config, RegisterGenericService, nullptr, + &grpc::AsyncGenericService::RequestCall, ProcessGenericRPC)); } } // namespace testing -} // namespace grpc +} // namespace grpc
\ No newline at end of file diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 8d86fa3bf3..612b830e80 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -1504,6 +1504,23 @@ "grpc", "grpc++", "grpc++_test_util", + "grpc_test_util", + "qps" + ], + "headers": [], + "language": "c++", + "name": "generic_async_streaming_ping_pong_test", + "src": [ + "test/cpp/qps/generic_async_streaming_ping_pong_test.cc" + ] + }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc++", + "grpc++_test_util", "grpc_test_util" ], "headers": [], diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index d4839e235f..2757ce445d 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -1828,6 +1828,23 @@ "ci_platforms": [ "linux", "mac", + "posix" + ], + "exclude_configs": [], + "flaky": false, + "language": "c++", + "name": "generic_async_streaming_ping_pong_test", + "platforms": [ + "linux", + "mac", + "posix" + ] + }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", "posix", "windows" ], |