aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/worker.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/worker.cc')
-rw-r--r--test/cpp/qps/worker.cc109
1 files changed, 66 insertions, 43 deletions
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index fdcd9d5069..101eb9f969 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -55,7 +55,7 @@
#include <grpc++/stream.h>
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/util/create_test_channel.h"
-#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/qpstest.grpc.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/server.h"
@@ -71,15 +71,20 @@ using namespace gflags;
static bool got_sigint = false;
+static void sigint_handler(int x) {got_sigint = true;}
+
namespace grpc {
namespace testing {
std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
case ClientType::SYNCHRONOUS_CLIENT:
- return CreateSynchronousClient(config);
+ return (config.rpc_type() == RpcType::UNARY) ?
+ CreateSynchronousUnaryClient(config) :
+ CreateSynchronousStreamingClient(config);
case ClientType::ASYNC_CLIENT:
- return CreateAsyncClient(config);
+ return (config.rpc_type() == RpcType::UNARY) ?
+ CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config);
}
abort();
}
@@ -106,6 +111,60 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
return Status(RESOURCE_EXHAUSTED);
}
+ grpc_profiler_start("qps_client.prof");
+ Status ret = RunTestBody(ctx,stream);
+ grpc_profiler_stop();
+ return ret;
+ }
+
+ Status RunServer(ServerContext* ctx,
+ ServerReaderWriter<ServerStatus, ServerArgs>* stream)
+ GRPC_OVERRIDE {
+ InstanceGuard g(this);
+ if (!g.Acquired()) {
+ return Status(RESOURCE_EXHAUSTED);
+ }
+
+ grpc_profiler_start("qps_server.prof");
+ Status ret = RunServerBody(ctx,stream);
+ grpc_profiler_stop();
+ return ret;
+ }
+
+ private:
+ // Protect against multiple clients using this worker at once.
+ class InstanceGuard {
+ public:
+ InstanceGuard(WorkerImpl* impl)
+ : impl_(impl), acquired_(impl->TryAcquireInstance()) {}
+ ~InstanceGuard() {
+ if (acquired_) {
+ impl_->ReleaseInstance();
+ }
+ }
+
+ bool Acquired() const { return acquired_; }
+
+ private:
+ WorkerImpl* const impl_;
+ const bool acquired_;
+ };
+
+ bool TryAcquireInstance() {
+ std::lock_guard<std::mutex> g(mu_);
+ if (acquired_) return false;
+ acquired_ = true;
+ return true;
+ }
+
+ void ReleaseInstance() {
+ std::lock_guard<std::mutex> g(mu_);
+ GPR_ASSERT(acquired_);
+ acquired_ = false;
+ }
+
+ Status RunTestBody(ServerContext* ctx,
+ ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
return Status(INVALID_ARGUMENT);
@@ -132,14 +191,8 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
return Status::OK;
}
- Status RunServer(ServerContext* ctx,
- ServerReaderWriter<ServerStatus, ServerArgs>* stream)
- GRPC_OVERRIDE {
- InstanceGuard g(this);
- if (!g.Acquired()) {
- return Status(RESOURCE_EXHAUSTED);
- }
-
+ Status RunServerBody(ServerContext* ctx,
+ ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
ServerArgs args;
if (!stream->Read(&args)) {
return Status(INVALID_ARGUMENT);
@@ -167,38 +220,6 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
return Status::OK;
}
- private:
- // Protect against multiple clients using this worker at once.
- class InstanceGuard {
- public:
- InstanceGuard(WorkerImpl* impl)
- : impl_(impl), acquired_(impl->TryAcquireInstance()) {}
- ~InstanceGuard() {
- if (acquired_) {
- impl_->ReleaseInstance();
- }
- }
-
- bool Acquired() const { return acquired_; }
-
- private:
- WorkerImpl* const impl_;
- const bool acquired_;
- };
-
- bool TryAcquireInstance() {
- std::lock_guard<std::mutex> g(mu_);
- if (acquired_) return false;
- acquired_ = true;
- return true;
- }
-
- void ReleaseInstance() {
- std::lock_guard<std::mutex> g(mu_);
- GPR_ASSERT(acquired_);
- acquired_ = false;
- }
-
std::mutex mu_;
bool acquired_;
};
@@ -229,6 +250,8 @@ int main(int argc, char** argv) {
grpc_init();
ParseCommandLineFlags(&argc, &argv, true);
+ signal(SIGINT, sigint_handler);
+
grpc::testing::RunServer();
grpc_shutdown();