aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server_sync.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/server_sync.cc')
-rw-r--r--test/cpp/qps/server_sync.cc111
1 files changed, 99 insertions, 12 deletions
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index f79284d225..f04465e261 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -31,6 +31,9 @@
*
*/
+#include <atomic>
+#include <thread>
+
#include <grpc++/resource_quota.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
@@ -52,12 +55,9 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) override {
- if (request->response_size() > 0) {
- if (!Server::SetPayload(request->response_type(),
- request->response_size(),
- response->mutable_payload())) {
- return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
- }
+ auto s = SetResponse(request, response);
+ if (!s.ok()) {
+ return s;
}
return Status::OK;
}
@@ -67,12 +67,9 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service {
SimpleRequest request;
while (stream->Read(&request)) {
SimpleResponse response;
- if (request.response_size() > 0) {
- if (!Server::SetPayload(request.response_type(),
- request.response_size(),
- response.mutable_payload())) {
- return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
- }
+ auto s = SetResponse(&request, &response);
+ if (!s.ok()) {
+ return s;
}
if (!stream->Write(response)) {
return Status(StatusCode::INTERNAL, "Server couldn't respond");
@@ -80,6 +77,96 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service {
}
return Status::OK;
}
+ Status StreamingFromClient(ServerContext* context,
+ ServerReader<SimpleRequest>* stream,
+ SimpleResponse* response) override {
+ auto s = ClientPull(context, stream, response);
+ if (!s.ok()) {
+ return s;
+ }
+ return Status::OK;
+ }
+ Status StreamingFromServer(ServerContext* context,
+ const SimpleRequest* request,
+ ServerWriter<SimpleResponse>* stream) override {
+ SimpleResponse response;
+ auto s = SetResponse(request, &response);
+ if (!s.ok()) {
+ return s;
+ }
+ return ServerPush(context, stream, response, nullptr);
+ }
+ Status StreamingBothWays(
+ ServerContext* context,
+ ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
+ // Read the first client message to setup server response
+ SimpleRequest request;
+ if (!stream->Read(&request)) {
+ return Status::OK;
+ }
+ SimpleResponse response;
+ auto s = SetResponse(&request, &response);
+ if (!s.ok()) {
+ return s;
+ }
+ std::atomic_bool done;
+ Status sp;
+ std::thread t([context, stream, &response, &done, &sp]() {
+ sp = ServerPush(context, stream, response, [&done]() {
+ return done.load(std::memory_order_relaxed);
+ });
+ });
+ SimpleResponse dummy;
+ auto cp = ClientPull(context, stream, &dummy);
+ done.store(true, std::memory_order_relaxed); // can be lazy
+ t.join();
+ if (!cp.ok()) {
+ return cp;
+ }
+ if (!sp.ok()) {
+ return sp;
+ }
+ return Status::OK;
+ }
+
+ private:
+ static Status ClientPull(ServerContext* context,
+ ReaderInterface<SimpleRequest>* stream,
+ SimpleResponse* response) {
+ SimpleRequest request;
+ while (stream->Read(&request)) {
+ }
+ if (request.response_size() > 0) {
+ if (!Server::SetPayload(request.response_type(), request.response_size(),
+ response->mutable_payload())) {
+ return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
+ }
+ }
+ return Status::OK;
+ }
+ static Status ServerPush(ServerContext* context,
+ WriterInterface<SimpleResponse>* stream,
+ const SimpleResponse& response,
+ std::function<bool()> done) {
+ while ((done == nullptr) || !done()) {
+ // TODO(vjpai): Add potential for rate-pacing on this
+ if (!stream->Write(response)) {
+ return Status(StatusCode::INTERNAL, "Server couldn't push");
+ }
+ }
+ return Status::OK;
+ }
+ static Status SetResponse(const SimpleRequest* request,
+ SimpleResponse* response) {
+ if (request->response_size() > 0) {
+ if (!Server::SetPayload(request->response_type(),
+ request->response_size(),
+ response->mutable_payload())) {
+ return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
+ }
+ }
+ return Status::OK;
+ }
};
class SynchronousServer final : public grpc::testing::Server {