diff options
author | Craig Tiller <ctiller@google.com> | 2015-02-23 16:31:57 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-02-23 16:31:57 -0800 |
commit | 5c004c68deac80995cb3befcca441fc47cf9f302 (patch) | |
tree | fb29e199b2208248d05bf3eb193daa05fccad0aa /test/cpp/qps/server.cc | |
parent | b0a32fc72d6c4ad667a57a98c38fed507d326462 (diff) |
Driver changes
WIP - things compile again after a broad set of changes preparing for
the driver code.
Diffstat (limited to 'test/cpp/qps/server.cc')
-rw-r--r-- | test/cpp/qps/server.cc | 82 |
1 files changed, 64 insertions, 18 deletions
diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc index 8e136349a1..a5edb05493 100644 --- a/test/cpp/qps/server.cc +++ b/test/cpp/qps/server.cc @@ -44,6 +44,7 @@ #include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc++/status.h> +#include <grpc++/stream.h> #include "src/cpp/server/thread_pool.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" @@ -51,13 +52,13 @@ #include <grpc/grpc.h> #include <grpc/support/log.h> -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); DEFINE_int32(port, 0, "Server port."); -DEFINE_int32(server_threads, 4, "Number of server threads."); +DEFINE_int32(driver_port, 0, "Server driver port."); using grpc::Server; using grpc::ServerBuilder; using grpc::ServerContext; +using grpc::ServerReaderWriter; using grpc::ThreadPool; using grpc::testing::Payload; using grpc::testing::PayloadType; @@ -66,6 +67,10 @@ using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; using grpc::testing::StatsRequest; using grpc::testing::TestService; +using grpc::testing::QpsServer; +using grpc::testing::ServerArgs; +using grpc::testing::ServerStats; +using grpc::testing::ServerStatus; using grpc::Status; // In some distros, gflags is in the namespace google, and in some others, @@ -124,34 +129,76 @@ class TestServiceImpl final : public TestService::Service { } // namespace +class ServerImpl : public QpsServer::Service { + public: + Status RunServer(ServerContext* ctx, ServerReaderWriter<ServerStatus, ServerArgs>* stream) { + ServerArgs args; + std::unique_ptr<ServerStats> last_stats; + if (!stream->Read(&args)) return Status::OK; + + bool done = false; + while (!done) { + std::lock_guard<std::mutex> lock(server_mu_); + + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", FLAGS_port); + + TestServiceImpl service; + + ServerBuilder builder; + builder.AddPort(server_address); + builder.RegisterService(&service); + + gpr_free(server_address); + + std::unique_ptr<ThreadPool> pool(new ThreadPool(args.threads())); + builder.SetThreadPool(pool.get()); + + auto server = builder.BuildAndStart(); + gpr_log(GPR_INFO, "Server listening on %s\n", server_address); + + ServerStatus last_status; + if (last_stats.get()) { + *last_status.mutable_stats() = *last_stats; + } + if (!stream->Write(last_status)) return Status(grpc::UNKNOWN); + + grpc_profiler_start("qps_server.prof"); + + done = stream->Read(&args); + + grpc_profiler_stop(); + } + + ServerStatus last_status; + if (last_stats.get()) { + *last_status.mutable_stats() = *last_stats; + } + stream->Write(last_status); + return Status::OK; + } + + private: + std::mutex server_mu_; +}; + static void RunServer() { char* server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_port); - - TestServiceImpl service; + gpr_join_host_port(&server_address, "::", FLAGS_driver_port); - SimpleRequest request; - SimpleResponse response; + ServerImpl service; ServerBuilder builder; builder.AddPort(server_address); builder.RegisterService(&service); - std::unique_ptr<ThreadPool> pool(new ThreadPool(FLAGS_server_threads)); - builder.SetThreadPool(pool.get()); - - std::unique_ptr<Server> server(builder.BuildAndStart()); - gpr_log(GPR_INFO, "Server listening on %s\n", server_address); + gpr_free(server_address); - grpc_profiler_start("qps_server.prof"); + auto server = builder.BuildAndStart(); while (!got_sigint) { std::this_thread::sleep_for(std::chrono::seconds(5)); } - - grpc_profiler_stop(); - - gpr_free(server_address); } int main(int argc, char** argv) { @@ -161,7 +208,6 @@ int main(int argc, char** argv) { signal(SIGINT, sigint_handler); GPR_ASSERT(FLAGS_port != 0); - GPR_ASSERT(!FLAGS_enable_ssl); RunServer(); grpc_shutdown(); |