From eea8cf0fe3a836b78e9ba122a01f6f1552ad8402 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 23 Mar 2017 16:19:00 -0700 Subject: Add QPS tests for one-sided streaming --- test/cpp/qps/server_async.cc | 190 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 183 insertions(+), 7 deletions(-) (limited to 'test/cpp/qps/server_async.cc') diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index b499b82091..84f1579c2f 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -71,6 +71,18 @@ class AsyncQpsServerTest final : public grpc::testing::Server { ServerAsyncReaderWriter *, CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, + std::function *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_streaming_from_client_function, + std::function *, CompletionQueue *, + ServerCompletionQueue *, void *)> + request_streaming_from_server_function, + std::function *, + CompletionQueue *, ServerCompletionQueue *, void *)> + request_streaming_both_ways_function, std::function process_rpc) @@ -107,7 +119,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { std::bind(process_rpc, config.payload_config(), std::placeholders::_1, std::placeholders::_2); - for (int i = 0; i < 15000; i++) { + for (int i = 0; i < 5000; i++) { for (int j = 0; j < num_threads; j++) { if (request_unary_function) { auto request_unary = std::bind( @@ -125,6 +137,26 @@ class AsyncQpsServerTest final : public grpc::testing::Server { contexts_.emplace_back(new ServerRpcContextStreamingImpl( request_streaming, process_rpc_bound)); } + if (request_streaming_from_client_function) { + auto request_streaming_from_client = std::bind( + request_streaming_from_client_function, &async_service_, + std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(), + srv_cqs_[j].get(), std::placeholders::_3); + contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl( + request_streaming_from_client, process_rpc_bound)); + } + if (request_streaming_from_server_function) { + auto request_streaming_from_server = + std::bind(request_streaming_from_server_function, &async_service_, + std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3, srv_cqs_[j].get(), + srv_cqs_[j].get(), std::placeholders::_4); + contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl( + request_streaming_from_server, process_rpc_bound)); + } + if (request_streaming_both_ways_function) { + // TODO(vjpai): Add this code + } } } @@ -289,8 +321,8 @@ class AsyncQpsServerTest final : public grpc::testing::Server { if (!ok) { return false; } - stream_.Read(&req_, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::read_done; + stream_.Read(&req_, AsyncQpsServerTest::tag(this)); return true; } @@ -300,23 +332,23 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Call the RPC processing function grpc::Status status = invoke_method_(&req_, &response_); // initiate the write - stream_.Write(response_, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::write_done; + stream_.Write(response_, AsyncQpsServerTest::tag(this)); } else { // client has sent writes done // finish the stream - stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::finish_done; + stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); } return true; } bool write_done(bool ok) { // now go back and get another streaming read! if (ok) { - stream_.Read(&req_, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::read_done; + stream_.Read(&req_, AsyncQpsServerTest::tag(this)); } else { - stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); next_state_ = &ServerRpcContextStreamingImpl::finish_done; + stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); } return true; } @@ -335,6 +367,146 @@ class AsyncQpsServerTest final : public grpc::testing::Server { grpc::ServerAsyncReaderWriter stream_; }; + class ServerRpcContextStreamingFromClientImpl final + : public ServerRpcContext { + public: + ServerRpcContextStreamingFromClientImpl( + std::function *, + void *)> + request_method, + std::function + invoke_method) + : srv_ctx_(new ServerContextType), + next_state_(&ServerRpcContextStreamingFromClientImpl::request_done), + request_method_(request_method), + invoke_method_(invoke_method), + stream_(srv_ctx_.get()) { + request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this)); + } + ~ServerRpcContextStreamingFromClientImpl() override {} + bool RunNextState(bool ok) override { return (this->*next_state_)(ok); } + void Reset() override { + srv_ctx_.reset(new ServerContextType); + req_ = RequestType(); + stream_ = + grpc::ServerAsyncReader(srv_ctx_.get()); + + // Then request the method + next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done; + request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this)); + } + + private: + bool request_done(bool ok) { + if (!ok) { + return false; + } + next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done; + stream_.Read(&req_, AsyncQpsServerTest::tag(this)); + return true; + } + + bool read_done(bool ok) { + if (ok) { + // In this case, just do another read + // next_state_ is unchanged + stream_.Read(&req_, AsyncQpsServerTest::tag(this)); + return true; + } else { // client has sent writes done + // invoke the method + // Call the RPC processing function + grpc::Status status = invoke_method_(&req_, &response_); + // finish the stream + next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done; + stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this)); + } + return true; + } + bool finish_done(bool ok) { return false; /* reset the context */ } + + std::unique_ptr srv_ctx_; + RequestType req_; + ResponseType response_; + bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool); + std::function *, + void *)> + request_method_; + std::function + invoke_method_; + grpc::ServerAsyncReader stream_; + }; + + class ServerRpcContextStreamingFromServerImpl final + : public ServerRpcContext { + public: + ServerRpcContextStreamingFromServerImpl( + std::function *, void *)> + request_method, + std::function + invoke_method) + : srv_ctx_(new ServerContextType), + next_state_(&ServerRpcContextStreamingFromServerImpl::request_done), + request_method_(request_method), + invoke_method_(invoke_method), + stream_(srv_ctx_.get()) { + request_method_(srv_ctx_.get(), &req_, &stream_, + AsyncQpsServerTest::tag(this)); + } + ~ServerRpcContextStreamingFromServerImpl() override {} + bool RunNextState(bool ok) override { return (this->*next_state_)(ok); } + void Reset() override { + srv_ctx_.reset(new ServerContextType); + req_ = RequestType(); + stream_ = grpc::ServerAsyncWriter(srv_ctx_.get()); + + // Then request the method + next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done; + request_method_(srv_ctx_.get(), &req_, &stream_, + AsyncQpsServerTest::tag(this)); + } + + private: + bool request_done(bool ok) { + if (!ok) { + return false; + } + // invoke the method + // Call the RPC processing function + grpc::Status status = invoke_method_(&req_, &response_); + + next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done; + stream_.Write(response_, AsyncQpsServerTest::tag(this)); + return true; + } + + bool write_done(bool ok) { + if (ok) { + // Do another write! + // next_state_ is unchanged + stream_.Write(response_, AsyncQpsServerTest::tag(this)); + } else { // must be done so let's finish + next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done; + stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this)); + } + return true; + } + bool finish_done(bool ok) { return false; /* reset the context */ } + + std::unique_ptr srv_ctx_; + RequestType req_; + ResponseType response_; + bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool); + std::function *, void *)> + request_method_; + std::function + invoke_method_; + grpc::ServerAsyncWriter stream_; + }; + std::vector threads_; std::unique_ptr server_; std::vector> srv_cqs_; @@ -390,6 +562,9 @@ std::unique_ptr CreateAsyncServer(const ServerConfig &config) { config, RegisterBenchmarkService, &BenchmarkService::AsyncService::RequestUnaryCall, &BenchmarkService::AsyncService::RequestStreamingCall, + &BenchmarkService::AsyncService::RequestStreamingFromClient, + &BenchmarkService::AsyncService::RequestStreamingFromServer, + &BenchmarkService::AsyncService::RequestStreamingBothWays, ProcessSimpleRPC)); } std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { @@ -397,7 +572,8 @@ std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { new AsyncQpsServerTest( config, RegisterGenericService, nullptr, - &grpc::AsyncGenericService::RequestCall, ProcessGenericRPC)); + &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr, + ProcessGenericRPC)); } } // namespace testing -- cgit v1.2.3