diff options
Diffstat (limited to 'test/cpp/qps/worker.cc')
-rw-r--r-- | test/cpp/qps/worker.cc | 109 |
1 files changed, 66 insertions, 43 deletions
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index fdcd9d5069..101eb9f969 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -55,7 +55,7 @@ #include <grpc++/stream.h> #include "test/core/util/grpc_profiler.h" #include "test/cpp/util/create_test_channel.h" -#include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/qpstest.grpc.pb.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/server.h" @@ -71,15 +71,20 @@ using namespace gflags; static bool got_sigint = false; +static void sigint_handler(int x) {got_sigint = true;} + namespace grpc { namespace testing { std::unique_ptr<Client> CreateClient(const ClientConfig& config) { switch (config.client_type()) { case ClientType::SYNCHRONOUS_CLIENT: - return CreateSynchronousClient(config); + return (config.rpc_type() == RpcType::UNARY) ? + CreateSynchronousUnaryClient(config) : + CreateSynchronousStreamingClient(config); case ClientType::ASYNC_CLIENT: - return CreateAsyncClient(config); + return (config.rpc_type() == RpcType::UNARY) ? + CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config); } abort(); } @@ -106,6 +111,60 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { return Status(RESOURCE_EXHAUSTED); } + grpc_profiler_start("qps_client.prof"); + Status ret = RunTestBody(ctx,stream); + grpc_profiler_stop(); + return ret; + } + + Status RunServer(ServerContext* ctx, + ServerReaderWriter<ServerStatus, ServerArgs>* stream) + GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(RESOURCE_EXHAUSTED); + } + + grpc_profiler_start("qps_server.prof"); + Status ret = RunServerBody(ctx,stream); + grpc_profiler_stop(); + return ret; + } + + private: + // Protect against multiple clients using this worker at once. + class InstanceGuard { + public: + InstanceGuard(WorkerImpl* impl) + : impl_(impl), acquired_(impl->TryAcquireInstance()) {} + ~InstanceGuard() { + if (acquired_) { + impl_->ReleaseInstance(); + } + } + + bool Acquired() const { return acquired_; } + + private: + WorkerImpl* const impl_; + const bool acquired_; + }; + + bool TryAcquireInstance() { + std::lock_guard<std::mutex> g(mu_); + if (acquired_) return false; + acquired_ = true; + return true; + } + + void ReleaseInstance() { + std::lock_guard<std::mutex> g(mu_); + GPR_ASSERT(acquired_); + acquired_ = false; + } + + Status RunTestBody(ServerContext* ctx, + ServerReaderWriter<ClientStatus, ClientArgs>* stream) { ClientArgs args; if (!stream->Read(&args)) { return Status(INVALID_ARGUMENT); @@ -132,14 +191,8 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { return Status::OK; } - Status RunServer(ServerContext* ctx, - ServerReaderWriter<ServerStatus, ServerArgs>* stream) - GRPC_OVERRIDE { - InstanceGuard g(this); - if (!g.Acquired()) { - return Status(RESOURCE_EXHAUSTED); - } - + Status RunServerBody(ServerContext* ctx, + ServerReaderWriter<ServerStatus, ServerArgs>* stream) { ServerArgs args; if (!stream->Read(&args)) { return Status(INVALID_ARGUMENT); @@ -167,38 +220,6 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { return Status::OK; } - private: - // Protect against multiple clients using this worker at once. - class InstanceGuard { - public: - InstanceGuard(WorkerImpl* impl) - : impl_(impl), acquired_(impl->TryAcquireInstance()) {} - ~InstanceGuard() { - if (acquired_) { - impl_->ReleaseInstance(); - } - } - - bool Acquired() const { return acquired_; } - - private: - WorkerImpl* const impl_; - const bool acquired_; - }; - - bool TryAcquireInstance() { - std::lock_guard<std::mutex> g(mu_); - if (acquired_) return false; - acquired_ = true; - return true; - } - - void ReleaseInstance() { - std::lock_guard<std::mutex> g(mu_); - GPR_ASSERT(acquired_); - acquired_ = false; - } - std::mutex mu_; bool acquired_; }; @@ -229,6 +250,8 @@ int main(int argc, char** argv) { grpc_init(); ParseCommandLineFlags(&argc, &argv, true); + signal(SIGINT, sigint_handler); + grpc::testing::RunServer(); grpc_shutdown(); |