aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server_async.cc
diff options
context:
space:
mode:
authorGravatar Yuxuan Li <yuxuanli@google.com>2017-05-08 11:35:00 -0700
committerGravatar Yuxuan Li <yuxuanli@google.com>2017-05-08 11:35:00 -0700
commita7f7fcf94ac4d32152ef7fdd8e558e18393e1706 (patch)
treedcc435ee4e94acaa82124609f766ceea52a514bc /test/cpp/qps/server_async.cc
parent87827e03aa01ba12829b88a775b946e32eadcf3a (diff)
parentd64a4bea823c85642e57b831fffdc34308bfcb93 (diff)
Merge branch 'master' into poll_stat
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r--test/cpp/qps/server_async.cc190
1 files changed, 183 insertions, 7 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 4f0b1f54d7..952b334119 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -71,6 +71,18 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
ServerAsyncReaderWriter<ResponseType, RequestType> *,
CompletionQueue *, ServerCompletionQueue *, void *)>
request_streaming_function,
+ std::function<void(ServiceType *, ServerContextType *,
+ ServerAsyncReader<ResponseType, RequestType> *,
+ CompletionQueue *, ServerCompletionQueue *, void *)>
+ request_streaming_from_client_function,
+ std::function<void(ServiceType *, ServerContextType *, RequestType *,
+ ServerAsyncWriter<ResponseType> *, CompletionQueue *,
+ ServerCompletionQueue *, void *)>
+ request_streaming_from_server_function,
+ std::function<void(ServiceType *, ServerContextType *,
+ ServerAsyncReaderWriter<ResponseType, RequestType> *,
+ CompletionQueue *, ServerCompletionQueue *, void *)>
+ request_streaming_both_ways_function,
std::function<grpc::Status(const PayloadConfig &, const RequestType *,
ResponseType *)>
process_rpc)
@@ -107,7 +119,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
std::placeholders::_2);
- for (int i = 0; i < 15000; i++) {
+ for (int i = 0; i < 5000; i++) {
for (int j = 0; j < num_threads; j++) {
if (request_unary_function) {
auto request_unary = std::bind(
@@ -125,6 +137,26 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
contexts_.emplace_back(new ServerRpcContextStreamingImpl(
request_streaming, process_rpc_bound));
}
+ if (request_streaming_from_client_function) {
+ auto request_streaming_from_client = std::bind(
+ request_streaming_from_client_function, &async_service_,
+ std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
+ srv_cqs_[j].get(), std::placeholders::_3);
+ contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
+ request_streaming_from_client, process_rpc_bound));
+ }
+ if (request_streaming_from_server_function) {
+ auto request_streaming_from_server =
+ std::bind(request_streaming_from_server_function, &async_service_,
+ std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3, srv_cqs_[j].get(),
+ srv_cqs_[j].get(), std::placeholders::_4);
+ contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
+ request_streaming_from_server, process_rpc_bound));
+ }
+ if (request_streaming_both_ways_function) {
+ // TODO(vjpai): Add this code
+ }
}
}
@@ -297,8 +329,8 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
if (!ok) {
return false;
}
- stream_.Read(&req_, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::read_done;
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
return true;
}
@@ -308,23 +340,23 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Call the RPC processing function
grpc::Status status = invoke_method_(&req_, &response_);
// initiate the write
- stream_.Write(response_, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::write_done;
+ stream_.Write(response_, AsyncQpsServerTest::tag(this));
} else { // client has sent writes done
// finish the stream
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
}
return true;
}
bool write_done(bool ok) {
// now go back and get another streaming read!
if (ok) {
- stream_.Read(&req_, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::read_done;
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
} else {
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
}
return true;
}
@@ -343,6 +375,146 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
};
+ class ServerRpcContextStreamingFromClientImpl final
+ : public ServerRpcContext {
+ public:
+ ServerRpcContextStreamingFromClientImpl(
+ std::function<void(ServerContextType *,
+ grpc::ServerAsyncReader<ResponseType, RequestType> *,
+ void *)>
+ request_method,
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
+ invoke_method)
+ : srv_ctx_(new ServerContextType),
+ next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
+ request_method_(request_method),
+ invoke_method_(invoke_method),
+ stream_(srv_ctx_.get()) {
+ request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
+ }
+ ~ServerRpcContextStreamingFromClientImpl() override {}
+ bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
+ void Reset() override {
+ srv_ctx_.reset(new ServerContextType);
+ req_ = RequestType();
+ stream_ =
+ grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
+
+ // Then request the method
+ next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
+ request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
+ }
+
+ private:
+ bool request_done(bool ok) {
+ if (!ok) {
+ return false;
+ }
+ next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
+ return true;
+ }
+
+ bool read_done(bool ok) {
+ if (ok) {
+ // In this case, just do another read
+ // next_state_ is unchanged
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
+ return true;
+ } else { // client has sent writes done
+ // invoke the method
+ // Call the RPC processing function
+ grpc::Status status = invoke_method_(&req_, &response_);
+ // finish the stream
+ next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
+ stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
+ }
+ return true;
+ }
+ bool finish_done(bool ok) { return false; /* reset the context */ }
+
+ std::unique_ptr<ServerContextType> srv_ctx_;
+ RequestType req_;
+ ResponseType response_;
+ bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
+ std::function<void(ServerContextType *,
+ grpc::ServerAsyncReader<ResponseType, RequestType> *,
+ void *)>
+ request_method_;
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
+ invoke_method_;
+ grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
+ };
+
+ class ServerRpcContextStreamingFromServerImpl final
+ : public ServerRpcContext {
+ public:
+ ServerRpcContextStreamingFromServerImpl(
+ std::function<void(ServerContextType *, RequestType *,
+ grpc::ServerAsyncWriter<ResponseType> *, void *)>
+ request_method,
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
+ invoke_method)
+ : srv_ctx_(new ServerContextType),
+ next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
+ request_method_(request_method),
+ invoke_method_(invoke_method),
+ stream_(srv_ctx_.get()) {
+ request_method_(srv_ctx_.get(), &req_, &stream_,
+ AsyncQpsServerTest::tag(this));
+ }
+ ~ServerRpcContextStreamingFromServerImpl() override {}
+ bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
+ void Reset() override {
+ srv_ctx_.reset(new ServerContextType);
+ req_ = RequestType();
+ stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
+
+ // Then request the method
+ next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
+ request_method_(srv_ctx_.get(), &req_, &stream_,
+ AsyncQpsServerTest::tag(this));
+ }
+
+ private:
+ bool request_done(bool ok) {
+ if (!ok) {
+ return false;
+ }
+ // invoke the method
+ // Call the RPC processing function
+ grpc::Status status = invoke_method_(&req_, &response_);
+
+ next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
+ stream_.Write(response_, AsyncQpsServerTest::tag(this));
+ return true;
+ }
+
+ bool write_done(bool ok) {
+ if (ok) {
+ // Do another write!
+ // next_state_ is unchanged
+ stream_.Write(response_, AsyncQpsServerTest::tag(this));
+ } else { // must be done so let's finish
+ next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
+ }
+ return true;
+ }
+ bool finish_done(bool ok) { return false; /* reset the context */ }
+
+ std::unique_ptr<ServerContextType> srv_ctx_;
+ RequestType req_;
+ ResponseType response_;
+ bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
+ std::function<void(ServerContextType *, RequestType *,
+ grpc::ServerAsyncWriter<ResponseType> *, void *)>
+ request_method_;
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
+ invoke_method_;
+ grpc::ServerAsyncWriter<ResponseType> stream_;
+ };
+
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
@@ -398,6 +570,9 @@ std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) {
config, RegisterBenchmarkService,
&BenchmarkService::AsyncService::RequestUnaryCall,
&BenchmarkService::AsyncService::RequestStreamingCall,
+ &BenchmarkService::AsyncService::RequestStreamingFromClient,
+ &BenchmarkService::AsyncService::RequestStreamingFromServer,
+ &BenchmarkService::AsyncService::RequestStreamingBothWays,
ProcessSimpleRPC));
}
std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) {
@@ -405,7 +580,8 @@ std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig &config) {
new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
grpc::GenericServerContext>(
config, RegisterGenericService, nullptr,
- &grpc::AsyncGenericService::RequestCall, ProcessGenericRPC));
+ &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
+ ProcessGenericRPC));
}
} // namespace testing