aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/client_async.cc
diff options
context:
space:
mode:
authorGravatar Yuxuan Li <yuxuanli@google.com>2017-05-08 11:35:00 -0700
committerGravatar Yuxuan Li <yuxuanli@google.com>2017-05-08 11:35:00 -0700
commita7f7fcf94ac4d32152ef7fdd8e558e18393e1706 (patch)
treedcc435ee4e94acaa82124609f766ceea52a514bc /test/cpp/qps/client_async.cc
parent87827e03aa01ba12829b88a775b946e32eadcf3a (diff)
parentd64a4bea823c85642e57b831fffdc34308bfcb93 (diff)
Merge branch 'master' into poll_stat
Diffstat (limited to 'test/cpp/qps/client_async.cc')
-rw-r--r--test/cpp/qps/client_async.cc296
1 files changed, 276 insertions, 20 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index d9cda9fb07..3aa8268311 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -321,9 +321,9 @@ class AsyncUnaryClient final
};
template <class RequestType, class ResponseType>
-class ClientRpcContextStreamingImpl : public ClientRpcContext {
+class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
public:
- ClientRpcContextStreamingImpl(
+ ClientRpcContextStreamingPingPongImpl(
BenchmarkService::Stub* stub, const RequestType& req,
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<
@@ -341,7 +341,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
callback_(on_done),
next_issue_(next_issue),
start_req_(start_req) {}
- ~ClientRpcContextStreamingImpl() override {}
+ ~ClientRpcContextStreamingPingPongImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq, config.messages_per_stream());
}
@@ -402,8 +402,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
}
}
void StartNewClone(CompletionQueue* cq) override {
- auto* clone = new ClientRpcContextStreamingImpl(stub_, req_, next_issue_,
- start_req_, callback_);
+ auto* clone = new ClientRpcContextStreamingPingPongImpl(
+ stub_, req_, next_issue_, start_req_, callback_);
clone->StartInternal(cq, messages_per_stream_);
}
@@ -442,23 +442,23 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
void StartInternal(CompletionQueue* cq, int messages_per_stream) {
cq_ = cq;
- next_state_ = State::STREAM_IDLE;
- stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
messages_per_stream_ = messages_per_stream;
messages_issued_ = 0;
+ next_state_ = State::STREAM_IDLE;
+ stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
}
};
-class AsyncStreamingClient final
+class AsyncStreamingPingPongClient final
: public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
public:
- explicit AsyncStreamingClient(const ClientConfig& config)
+ explicit AsyncStreamingPingPongClient(const ClientConfig& config)
: AsyncClient<BenchmarkService::Stub, SimpleRequest>(
config, SetupCtx, BenchmarkStubCreator) {
StartThreads(num_async_threads_);
}
- ~AsyncStreamingClient() override {}
+ ~AsyncStreamingPingPongClient() override {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -472,9 +472,250 @@ class AsyncStreamingClient final
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
std::function<gpr_timespec()> next_issue,
const SimpleRequest& req) {
- return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- stub, req, next_issue, AsyncStreamingClient::StartReq,
- AsyncStreamingClient::CheckDone);
+ return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,
+ SimpleResponse>(
+ stub, req, next_issue, AsyncStreamingPingPongClient::StartReq,
+ AsyncStreamingPingPongClient::CheckDone);
+ }
+};
+
+template <class RequestType, class ResponseType>
+class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
+ public:
+ ClientRpcContextStreamingFromClientImpl(
+ BenchmarkService::Stub* stub, const RequestType& req,
+ std::function<gpr_timespec()> next_issue,
+ std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
+ CompletionQueue*, void*)>
+ start_req,
+ std::function<void(grpc::Status, ResponseType*)> on_done)
+ : context_(),
+ stub_(stub),
+ cq_(nullptr),
+ req_(req),
+ response_(),
+ next_state_(State::INVALID),
+ callback_(on_done),
+ next_issue_(next_issue),
+ start_req_(start_req) {}
+ ~ClientRpcContextStreamingFromClientImpl() override {}
+ void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ StartInternal(cq);
+ }
+ bool RunNextState(bool ok, HistogramEntry* entry) override {
+ while (true) {
+ switch (next_state_) {
+ case State::STREAM_IDLE:
+ if (!next_issue_) { // ready to issue
+ next_state_ = State::READY_TO_WRITE;
+ } else {
+ next_state_ = State::WAIT;
+ }
+ break; // loop around, don't return
+ case State::WAIT:
+ alarm_.reset(
+ new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ next_state_ = State::READY_TO_WRITE;
+ return true;
+ case State::READY_TO_WRITE:
+ if (!ok) {
+ return false;
+ }
+ start_ = UsageTimer::Now();
+ next_state_ = State::WRITE_DONE;
+ stream_->Write(req_, ClientRpcContext::tag(this));
+ return true;
+ case State::WRITE_DONE:
+ if (!ok) {
+ return false;
+ }
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
+ next_state_ = State::STREAM_IDLE;
+ break; // loop around
+ default:
+ GPR_ASSERT(false);
+ return false;
+ }
+ }
+ }
+ void StartNewClone(CompletionQueue* cq) override {
+ auto* clone = new ClientRpcContextStreamingFromClientImpl(
+ stub_, req_, next_issue_, start_req_, callback_);
+ clone->StartInternal(cq);
+ }
+
+ private:
+ grpc::ClientContext context_;
+ BenchmarkService::Stub* stub_;
+ CompletionQueue* cq_;
+ std::unique_ptr<Alarm> alarm_;
+ RequestType req_;
+ ResponseType response_;
+ enum State {
+ INVALID,
+ STREAM_IDLE,
+ WAIT,
+ READY_TO_WRITE,
+ WRITE_DONE,
+ };
+ State next_state_;
+ std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<gpr_timespec()> next_issue_;
+ std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
+ CompletionQueue*, void*)>
+ start_req_;
+ grpc::Status status_;
+ double start_;
+ std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;
+
+ void StartInternal(CompletionQueue* cq) {
+ cq_ = cq;
+ stream_ = start_req_(stub_, &context_, &response_, cq,
+ ClientRpcContext::tag(this));
+ next_state_ = State::STREAM_IDLE;
+ }
+};
+
+class AsyncStreamingFromClientClient final
+ : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
+ public:
+ explicit AsyncStreamingFromClientClient(const ClientConfig& config)
+ : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
+ config, SetupCtx, BenchmarkStubCreator) {
+ StartThreads(num_async_threads_);
+ }
+
+ ~AsyncStreamingFromClientClient() override {}
+
+ private:
+ static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+ static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> StartReq(
+ BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
+ SimpleResponse* resp, CompletionQueue* cq, void* tag) {
+ auto stream = stub->AsyncStreamingFromClient(ctx, resp, cq, tag);
+ return stream;
+ };
+ static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+ std::function<gpr_timespec()> next_issue,
+ const SimpleRequest& req) {
+ return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,
+ SimpleResponse>(
+ stub, req, next_issue, AsyncStreamingFromClientClient::StartReq,
+ AsyncStreamingFromClientClient::CheckDone);
+ }
+};
+
+template <class RequestType, class ResponseType>
+class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
+ public:
+ ClientRpcContextStreamingFromServerImpl(
+ BenchmarkService::Stub* stub, const RequestType& req,
+ std::function<gpr_timespec()> next_issue,
+ std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
+ CompletionQueue*, void*)>
+ start_req,
+ std::function<void(grpc::Status, ResponseType*)> on_done)
+ : context_(),
+ stub_(stub),
+ cq_(nullptr),
+ req_(req),
+ response_(),
+ next_state_(State::INVALID),
+ callback_(on_done),
+ next_issue_(next_issue),
+ start_req_(start_req) {}
+ ~ClientRpcContextStreamingFromServerImpl() override {}
+ void Start(CompletionQueue* cq, const ClientConfig& config) override {
+ StartInternal(cq);
+ }
+ bool RunNextState(bool ok, HistogramEntry* entry) override {
+ while (true) {
+ switch (next_state_) {
+ case State::STREAM_IDLE:
+ if (!ok) {
+ return false;
+ }
+ start_ = UsageTimer::Now();
+ next_state_ = State::READ_DONE;
+ stream_->Read(&response_, ClientRpcContext::tag(this));
+ return true;
+ case State::READ_DONE:
+ if (!ok) {
+ return false;
+ }
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
+ callback_(status_, &response_);
+ next_state_ = State::STREAM_IDLE;
+ break; // loop around
+ default:
+ GPR_ASSERT(false);
+ return false;
+ }
+ }
+ }
+ void StartNewClone(CompletionQueue* cq) override {
+ auto* clone = new ClientRpcContextStreamingFromServerImpl(
+ stub_, req_, next_issue_, start_req_, callback_);
+ clone->StartInternal(cq);
+ }
+
+ private:
+ grpc::ClientContext context_;
+ BenchmarkService::Stub* stub_;
+ CompletionQueue* cq_;
+ std::unique_ptr<Alarm> alarm_;
+ RequestType req_;
+ ResponseType response_;
+ enum State { INVALID, STREAM_IDLE, READ_DONE };
+ State next_state_;
+ std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<gpr_timespec()> next_issue_;
+ std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
+ CompletionQueue*, void*)>
+ start_req_;
+ grpc::Status status_;
+ double start_;
+ std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;
+
+ void StartInternal(CompletionQueue* cq) {
+ // TODO(vjpai): Add support to rate-pace this
+ cq_ = cq;
+ next_state_ = State::STREAM_IDLE;
+ stream_ =
+ start_req_(stub_, &context_, req_, cq, ClientRpcContext::tag(this));
+ }
+};
+
+class AsyncStreamingFromServerClient final
+ : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
+ public:
+ explicit AsyncStreamingFromServerClient(const ClientConfig& config)
+ : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
+ config, SetupCtx, BenchmarkStubCreator) {
+ StartThreads(num_async_threads_);
+ }
+
+ ~AsyncStreamingFromServerClient() override {}
+
+ private:
+ static void CheckDone(grpc::Status s, SimpleResponse* response) {}
+ static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> StartReq(
+ BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
+ const SimpleRequest& req, CompletionQueue* cq, void* tag) {
+ auto stream = stub->AsyncStreamingFromServer(ctx, req, cq, tag);
+ return stream;
+ };
+ static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+ std::function<gpr_timespec()> next_issue,
+ const SimpleRequest& req) {
+ return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,
+ SimpleResponse>(
+ stub, req, next_issue, AsyncStreamingFromServerClient::StartReq,
+ AsyncStreamingFromServerClient::CheckDone);
}
};
@@ -599,11 +840,11 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
cq_ = cq;
const grpc::string kMethodName(
"/grpc.testing.BenchmarkService/StreamingCall");
+ messages_per_stream_ = messages_per_stream;
+ messages_issued_ = 0;
next_state_ = State::STREAM_IDLE;
stream_ = start_req_(stub_, &context_, kMethodName, cq,
ClientRpcContext::tag(this));
- messages_per_stream_ = messages_per_stream;
- messages_issued_ = 0;
}
};
@@ -640,11 +881,26 @@ class GenericAsyncStreamingClient final
}
};
-std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {
- return std::unique_ptr<Client>(new AsyncUnaryClient(args));
-}
-std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) {
- return std::unique_ptr<Client>(new AsyncStreamingClient(args));
+std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config) {
+ switch (config.rpc_type()) {
+ case UNARY:
+ return std::unique_ptr<Client>(new AsyncUnaryClient(config));
+ case STREAMING:
+ return std::unique_ptr<Client>(new AsyncStreamingPingPongClient(config));
+ case STREAMING_FROM_CLIENT:
+ return std::unique_ptr<Client>(
+ new AsyncStreamingFromClientClient(config));
+ case STREAMING_FROM_SERVER:
+ return std::unique_ptr<Client>(
+ new AsyncStreamingFromServerClient(config));
+ case STREAMING_BOTH_WAYS:
+ // TODO(vjpai): Implement this
+ assert(false);
+ return nullptr;
+ default:
+ assert(false);
+ return nullptr;
+ }
}
std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
const ClientConfig& args) {