aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server_async.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r--test/cpp/qps/server_async.cc125
1 files changed, 68 insertions, 57 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 6cb3192908..b9998405f6 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -64,7 +64,7 @@ namespace testing {
class AsyncQpsServerTest : public Server {
public:
AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) {
- char* server_address = NULL;
+ char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder;
@@ -95,16 +95,19 @@ class AsyncQpsServerTest : public Server {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
- void* got_tag;
+ void *got_tag;
while (srv_cq_->Next(&got_tag, &ok)) {
- ServerRpcContext* ctx = detag(got_tag);
+ ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
- if (ctx->RunNextState(ok) == false) {
+ bool still_going = ctx->RunNextState(ok);
+ std::lock_guard<std::mutex> g(shutdown_mutex_);
+ if (!shutdown_) {
// this RPC context is done, so refresh it
- std::lock_guard<std::mutex> g(shutdown_mutex_);
- if (!shutdown_) {
+ if (!still_going) {
ctx->Reset();
}
+ } else {
+ return;
}
}
return;
@@ -116,11 +119,15 @@ class AsyncQpsServerTest : public Server {
{
std::lock_guard<std::mutex> g(shutdown_mutex_);
shutdown_ = true;
- srv_cq_->Shutdown();
}
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
+ srv_cq_->Shutdown();
+ bool ok;
+ void *got_tag;
+ while (srv_cq_->Next(&got_tag, &ok))
+ ;
while (!contexts_.empty()) {
delete contexts_.front();
contexts_.pop_front();
@@ -133,23 +140,23 @@ class AsyncQpsServerTest : public Server {
ServerRpcContext() {}
virtual ~ServerRpcContext(){};
virtual bool RunNextState(bool) = 0; // next state, return false if done
- virtual void Reset() = 0; // start this back at a clean state
+ virtual void Reset() = 0; // start this back at a clean state
};
- static void* tag(ServerRpcContext* func) {
- return reinterpret_cast<void*>(func);
+ static void *tag(ServerRpcContext *func) {
+ return reinterpret_cast<void *>(func);
}
- static ServerRpcContext* detag(void* tag) {
- return reinterpret_cast<ServerRpcContext*>(tag);
+ static ServerRpcContext *detag(void *tag) {
+ return reinterpret_cast<ServerRpcContext *>(tag);
}
template <class RequestType, class ResponseType>
class ServerRpcContextUnaryImpl GRPC_FINAL : public ServerRpcContext {
public:
ServerRpcContextUnaryImpl(
- std::function<void(ServerContext*, RequestType*,
- grpc::ServerAsyncResponseWriter<ResponseType>*,
- void*)> request_method,
- std::function<grpc::Status(const RequestType*, ResponseType*)>
+ std::function<void(ServerContext *, RequestType *,
+ grpc::ServerAsyncResponseWriter<ResponseType> *,
+ void *)> request_method,
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method)
: next_state_(&ServerRpcContextUnaryImpl::invoker),
request_method_(request_method),
@@ -159,7 +166,9 @@ class AsyncQpsServerTest : public Server {
AsyncQpsServerTest::tag(this));
}
~ServerRpcContextUnaryImpl() GRPC_OVERRIDE {}
- bool RunNextState(bool ok) GRPC_OVERRIDE {return (this->*next_state_)(ok);}
+ bool RunNextState(bool ok) GRPC_OVERRIDE {
+ return (this->*next_state_)(ok);
+ }
void Reset() GRPC_OVERRIDE {
srv_ctx_ = ServerContext();
req_ = RequestType();
@@ -192,10 +201,10 @@ class AsyncQpsServerTest : public Server {
ServerContext srv_ctx_;
RequestType req_;
bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
- std::function<void(ServerContext*, RequestType*,
- grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
+ std::function<void(ServerContext *, RequestType *,
+ grpc::ServerAsyncResponseWriter<ResponseType> *, void *)>
request_method_;
- std::function<grpc::Status(const RequestType*, ResponseType*)>
+ std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method_;
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
@@ -204,9 +213,9 @@ class AsyncQpsServerTest : public Server {
class ServerRpcContextStreamingImpl GRPC_FINAL : public ServerRpcContext {
public:
ServerRpcContextStreamingImpl(
- std::function<void(ServerContext *,
- grpc::ServerAsyncReaderWriter<ResponseType,
- RequestType> *, void *)> request_method,
+ std::function<void(ServerContext *, grpc::ServerAsyncReaderWriter<
+ ResponseType, RequestType> *,
+ void *)> request_method,
std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method)
: next_state_(&ServerRpcContextStreamingImpl::request_done),
@@ -215,14 +224,15 @@ class AsyncQpsServerTest : public Server {
stream_(&srv_ctx_) {
request_method_(&srv_ctx_, &stream_, AsyncQpsServerTest::tag(this));
}
- ~ServerRpcContextStreamingImpl() GRPC_OVERRIDE {
+ ~ServerRpcContextStreamingImpl() GRPC_OVERRIDE {}
+ bool RunNextState(bool ok) GRPC_OVERRIDE {
+ return (this->*next_state_)(ok);
}
- bool RunNextState(bool ok) GRPC_OVERRIDE {return (this->*next_state_)(ok);}
void Reset() GRPC_OVERRIDE {
srv_ctx_ = ServerContext();
req_ = RequestType();
- stream_ = grpc::ServerAsyncReaderWriter<ResponseType,
- RequestType>(&srv_ctx_);
+ stream_ =
+ grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(&srv_ctx_);
// Then request the method
next_state_ = &ServerRpcContextStreamingImpl::request_done;
@@ -241,47 +251,47 @@ class AsyncQpsServerTest : public Server {
bool read_done(bool ok) {
if (ok) {
- // invoke the method
- ResponseType response;
- // Call the RPC processing function
- grpc::Status status = invoke_method_(&req_, &response);
- // initiate the write
- stream_.Write(response, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::write_done;
- } else { // client has sent writes done
- // finish the stream
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ // invoke the method
+ ResponseType response;
+ // Call the RPC processing function
+ grpc::Status status = invoke_method_(&req_, &response);
+ // initiate the write
+ stream_.Write(response, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::write_done;
+ } else { // client has sent writes done
+ // finish the stream
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::finish_done;
}
return true;
}
bool write_done(bool ok) {
// now go back and get another streaming read!
if (ok) {
- stream_.Read(&req_, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::read_done;
- }
- else {
- stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
- next_state_ = &ServerRpcContextStreamingImpl::finish_done;
+ stream_.Read(&req_, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::read_done;
+ } else {
+ stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
+ next_state_ = &ServerRpcContextStreamingImpl::finish_done;
}
return true;
}
- bool finish_done(bool ok) {return false; /* reset the context */ }
+ bool finish_done(bool ok) { return false; /* reset the context */ }
ServerContext srv_ctx_;
RequestType req_;
bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
- std::function<void(ServerContext *,
- grpc::ServerAsyncReaderWriter<ResponseType,
- RequestType> *, void *)> request_method_;
+ std::function<void(
+ ServerContext *,
+ grpc::ServerAsyncReaderWriter<ResponseType, RequestType> *, void *)>
+ request_method_;
std::function<grpc::Status(const RequestType *, ResponseType *)>
invoke_method_;
- grpc::ServerAsyncReaderWriter<ResponseType,RequestType> stream_;
+ grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
};
- static Status ProcessRPC(const SimpleRequest* request,
- SimpleResponse* response) {
+ static Status ProcessRPC(const SimpleRequest *request,
+ SimpleResponse *response) {
if (request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
@@ -294,19 +304,20 @@ class AsyncQpsServerTest : public Server {
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> srv_cq_;
TestService::AsyncService async_service_;
- std::function<void(ServerContext*, SimpleRequest*,
- grpc::ServerAsyncResponseWriter<SimpleResponse>*, void*)>
+ std::function<void(ServerContext *, SimpleRequest *,
+ grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
- std::function<void(ServerContext*, grpc::ServerAsyncReaderWriter<
- SimpleResponse,SimpleRequest>*, void*)>
+ std::function<void(
+ ServerContext *,
+ grpc::ServerAsyncReaderWriter<SimpleResponse, SimpleRequest> *, void *)>
request_streaming_;
- std::forward_list<ServerRpcContext*> contexts_;
+ std::forward_list<ServerRpcContext *> contexts_;
std::mutex shutdown_mutex_;
bool shutdown_;
};
-std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config,
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
int port) {
return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
}