aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-18 08:57:19 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-09-18 08:57:19 -0700
commitadbcec4f57f7e25859326a6daf8ad7e08f2805ca (patch)
treec029f503127946f0564134d343c8af9f88ac8da5 /test/cpp/qps
parent7f10c299c7eb5d6688c20ae43c63b0548ca8acd1 (diff)
parenta5f46e29bcee3068f5ec6691b7e06cf944430881 (diff)
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'test/cpp/qps')
-rw-r--r--test/cpp/qps/client_async.cc153
1 files changed, 64 insertions, 89 deletions
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 912c871482..1a4438047d 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -56,11 +56,6 @@ class ClientRpcContext {
}
virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
- void lock() { mu_.lock(); }
- void unlock() { mu_.unlock(); }
-
- private:
- std::mutex mu_;
};
template <class RequestType, class ResponseType>
@@ -73,7 +68,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)>
- start_req,
+ prepare_req,
std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)
: context_(),
stub_(stub),
@@ -83,7 +78,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
next_state_(State::READY),
callback_(on_done),
next_issue_(next_issue),
- start_req_(start_req) {}
+ prepare_req_(prepare_req) {}
~ClientRpcContextUnaryImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq);
@@ -92,7 +87,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
switch (next_state_) {
case State::READY:
start_ = UsageTimer::Now();
- response_reader_ = start_req_(stub_, &context_, req_, cq_);
+ response_reader_ = prepare_req_(stub_, &context_, req_, cq_);
+ response_reader_->StartCall();
next_state_ = State::RESP_DONE;
response_reader_->Finish(&response_, &status_,
ClientRpcContext::tag(this));
@@ -111,8 +107,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
}
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
- start_req_, callback_);
- std::lock_guard<ClientRpcContext> lclone(*clone);
+ prepare_req_, callback_);
clone->StartInternal(cq);
}
@@ -130,7 +125,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)>
- start_req_;
+ prepare_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
@@ -252,29 +247,13 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
// this thread isn't supposed to shut down
std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
if (shutdown_state_[thread_idx]->shutdown) {
- // We want to delete the context. However, it is possible that
- // another thread that just initiated an action on this
- // context still has its lock even though the action on the
- // context has completed. To delay for that, just grab the
- // lock for serialization. Take a new scope.
- { std::lock_guard<ClientRpcContext> lctx(*ctx); }
delete ctx;
return true;
}
- bool del = false;
-
- // Create a new scope for a lock_guard'ed region
- {
- std::lock_guard<ClientRpcContext> lctx(*ctx);
- if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
- // set the old version to delete
- del = true;
- }
- }
- if (del) {
+ if (!ctx->RunNextState(ok, entry)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
delete ctx;
}
return true;
@@ -311,15 +290,15 @@ class AsyncUnaryClient final
entry->set_status(s.error_code());
}
static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
- StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
- const SimpleRequest& request, CompletionQueue* cq) {
- return stub->AsyncUnaryCall(ctx, request, cq);
+ PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
+ const SimpleRequest& request, CompletionQueue* cq) {
+ return stub->PrepareAsyncUnaryCall(ctx, request, cq);
};
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
std::function<gpr_timespec()> next_issue,
const SimpleRequest& req) {
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
- stub, req, next_issue, AsyncUnaryClient::StartReq,
+ stub, req, next_issue, AsyncUnaryClient::PrepareReq,
AsyncUnaryClient::CheckDone);
}
};
@@ -332,9 +311,8 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
- BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
- void*)>
- start_req,
+ BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
+ prepare_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(),
stub_(stub),
@@ -344,7 +322,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
next_state_(State::INVALID),
callback_(on_done),
next_issue_(next_issue),
- start_req_(start_req) {}
+ prepare_req_(prepare_req) {}
~ClientRpcContextStreamingPingPongImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq, config.messages_per_stream());
@@ -407,8 +385,7 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
}
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingPingPongImpl(
- stub_, req_, next_issue_, start_req_, callback_);
- std::lock_guard<ClientRpcContext> lclone(*clone);
+ stub_, req_, next_issue_, prepare_req_, callback_);
clone->StartInternal(cq, messages_per_stream_);
}
@@ -432,10 +409,10 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
State next_state_;
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<gpr_timespec()> next_issue_;
- std::function<std::unique_ptr<
- grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
- BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
- start_req_;
+ std::function<
+ std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
+ prepare_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
@@ -449,8 +426,9 @@ class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
cq_ = cq;
messages_per_stream_ = messages_per_stream;
messages_issued_ = 0;
+ stream_ = prepare_req_(stub_, &context_, cq);
next_state_ = State::STREAM_IDLE;
- stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
+ stream_->StartCall(ClientRpcContext::tag(this));
}
};
@@ -469,9 +447,9 @@ class AsyncStreamingPingPongClient final
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<
grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
- StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
- CompletionQueue* cq, void* tag) {
- auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
+ PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
+ CompletionQueue* cq) {
+ auto stream = stub->PrepareAsyncStreamingCall(ctx, cq);
return stream;
};
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
@@ -479,7 +457,7 @@ class AsyncStreamingPingPongClient final
const SimpleRequest& req) {
return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,
SimpleResponse>(
- stub, req, next_issue, AsyncStreamingPingPongClient::StartReq,
+ stub, req, next_issue, AsyncStreamingPingPongClient::PrepareReq,
AsyncStreamingPingPongClient::CheckDone);
}
};
@@ -492,8 +470,8 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
- CompletionQueue*, void*)>
- start_req,
+ CompletionQueue*)>
+ prepare_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(),
stub_(stub),
@@ -503,7 +481,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
next_state_(State::INVALID),
callback_(on_done),
next_issue_(next_issue),
- start_req_(start_req) {}
+ prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromClientImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq);
@@ -546,8 +524,7 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
}
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromClientImpl(
- stub_, req_, next_issue_, start_req_, callback_);
- std::lock_guard<ClientRpcContext> lclone(*clone);
+ stub_, req_, next_issue_, prepare_req_, callback_);
clone->StartInternal(cq);
}
@@ -570,17 +547,17 @@ class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
- CompletionQueue*, void*)>
- start_req_;
+ CompletionQueue*)>
+ prepare_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;
void StartInternal(CompletionQueue* cq) {
cq_ = cq;
- stream_ = start_req_(stub_, &context_, &response_, cq,
- ClientRpcContext::tag(this));
+ stream_ = prepare_req_(stub_, &context_, &response_, cq);
next_state_ = State::STREAM_IDLE;
+ stream_->StartCall(ClientRpcContext::tag(this));
}
};
@@ -597,10 +574,10 @@ class AsyncStreamingFromClientClient final
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
- static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> StartReq(
+ static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq(
BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
- SimpleResponse* resp, CompletionQueue* cq, void* tag) {
- auto stream = stub->AsyncStreamingFromClient(ctx, resp, cq, tag);
+ SimpleResponse* resp, CompletionQueue* cq) {
+ auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq);
return stream;
};
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
@@ -608,7 +585,7 @@ class AsyncStreamingFromClientClient final
const SimpleRequest& req) {
return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,
SimpleResponse>(
- stub, req, next_issue, AsyncStreamingFromClientClient::StartReq,
+ stub, req, next_issue, AsyncStreamingFromClientClient::PrepareReq,
AsyncStreamingFromClientClient::CheckDone);
}
};
@@ -621,8 +598,8 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
- CompletionQueue*, void*)>
- start_req,
+ CompletionQueue*)>
+ prepare_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: context_(),
stub_(stub),
@@ -632,7 +609,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
next_state_(State::INVALID),
callback_(on_done),
next_issue_(next_issue),
- start_req_(start_req) {}
+ prepare_req_(prepare_req) {}
~ClientRpcContextStreamingFromServerImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq);
@@ -664,8 +641,7 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
}
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextStreamingFromServerImpl(
- stub_, req_, next_issue_, start_req_, callback_);
- std::lock_guard<ClientRpcContext> lclone(*clone);
+ stub_, req_, next_issue_, prepare_req_, callback_);
clone->StartInternal(cq);
}
@@ -682,8 +658,8 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
- CompletionQueue*, void*)>
- start_req_;
+ CompletionQueue*)>
+ prepare_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;
@@ -691,9 +667,9 @@ class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
void StartInternal(CompletionQueue* cq) {
// TODO(vjpai): Add support to rate-pace this
cq_ = cq;
+ stream_ = prepare_req_(stub_, &context_, req_, cq);
next_state_ = State::STREAM_IDLE;
- stream_ =
- start_req_(stub_, &context_, req_, cq, ClientRpcContext::tag(this));
+ stream_->StartCall(ClientRpcContext::tag(this));
}
};
@@ -710,10 +686,10 @@ class AsyncStreamingFromServerClient final
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
- static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> StartReq(
+ static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq(
BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
- const SimpleRequest& req, CompletionQueue* cq, void* tag) {
- auto stream = stub->AsyncStreamingFromServer(ctx, req, cq, tag);
+ const SimpleRequest& req, CompletionQueue* cq) {
+ auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq);
return stream;
};
static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
@@ -721,7 +697,7 @@ class AsyncStreamingFromServerClient final
const SimpleRequest& req) {
return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,
SimpleResponse>(
- stub, req, next_issue, AsyncStreamingFromServerClient::StartReq,
+ stub, req, next_issue, AsyncStreamingFromServerClient::PrepareReq,
AsyncStreamingFromServerClient::CheckDone);
}
};
@@ -733,8 +709,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*,
- const grpc::string& method_name, CompletionQueue*, void*)>
- start_req,
+ const grpc::string& method_name, CompletionQueue*)>
+ prepare_req,
std::function<void(grpc::Status, ByteBuffer*)> on_done)
: context_(),
stub_(stub),
@@ -744,7 +720,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
next_state_(State::INVALID),
callback_(on_done),
next_issue_(next_issue),
- start_req_(start_req) {}
+ prepare_req_(prepare_req) {}
~ClientRpcContextGenericStreamingImpl() override {}
void Start(CompletionQueue* cq, const ClientConfig& config) override {
StartInternal(cq, config.messages_per_stream());
@@ -807,8 +783,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
}
void StartNewClone(CompletionQueue* cq) override {
auto* clone = new ClientRpcContextGenericStreamingImpl(
- stub_, req_, next_issue_, start_req_, callback_);
- std::lock_guard<ClientRpcContext> lclone(*clone);
+ stub_, req_, next_issue_, prepare_req_, callback_);
clone->StartInternal(cq, messages_per_stream_);
}
@@ -834,8 +809,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
- CompletionQueue*, void*)>
- start_req_;
+ CompletionQueue*)>
+ prepare_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
@@ -850,9 +825,9 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
"/grpc.testing.BenchmarkService/StreamingCall");
messages_per_stream_ = messages_per_stream;
messages_issued_ = 0;
+ stream_ = prepare_req_(stub_, &context_, kMethodName, cq);
next_state_ = State::STREAM_IDLE;
- stream_ = start_req_(stub_, &context_, kMethodName, cq,
- ClientRpcContext::tag(this));
+ stream_->StartCall(ClientRpcContext::tag(this));
}
};
@@ -874,17 +849,17 @@ class GenericAsyncStreamingClient final
private:
static void CheckDone(grpc::Status s, ByteBuffer* response) {}
- static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> StartReq(
+ static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq(
grpc::GenericStub* stub, grpc::ClientContext* ctx,
- const grpc::string& method_name, CompletionQueue* cq, void* tag) {
- auto stream = stub->Call(ctx, method_name, cq, tag);
+ const grpc::string& method_name, CompletionQueue* cq) {
+ auto stream = stub->PrepareCall(ctx, method_name, cq);
return stream;
};
static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
std::function<gpr_timespec()> next_issue,
const ByteBuffer& req) {
return new ClientRpcContextGenericStreamingImpl(
- stub, req, next_issue, GenericAsyncStreamingClient::StartReq,
+ stub, req, next_issue, GenericAsyncStreamingClient::PrepareReq,
GenericAsyncStreamingClient::CheckDone);
}
};