diff options
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r-- | test/cpp/qps/server_async.cc | 146 |
1 files changed, 27 insertions, 119 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c006262fc3..64aca957e4 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -51,104 +51,38 @@ #include "src/cpp/server/thread_pool.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/server.h" #include <grpc/grpc.h> #include <grpc/support/log.h> -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(port, 0, "Server port."); -DEFINE_int32(server_threads, 4, "Number of server threads."); +namespace grpc { +namespace testing { -using grpc::CompletionQueue; -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::ThreadPool; -using grpc::testing::Payload; -using grpc::testing::PayloadType; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; -using grpc::Status; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google {} -namespace gflags {} -using namespace google; -using namespace gflags; - -static bool got_sigint = false; - -static void sigint_handler(int x) { got_sigint = 1; } - -static double time_double(struct timeval *tv) { - return tv->tv_sec + 1e-6 * tv->tv_usec; -} - -static bool SetPayload(PayloadType type, int size, Payload *payload) { - PayloadType response_type = type; - // TODO(yangg): Support UNCOMPRESSABLE payload. - if (type != PayloadType::COMPRESSABLE) { - return false; - } - payload->set_type(response_type); - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - return true; -} - -namespace { - -class AsyncQpsServerTest { +class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { + AsyncQpsServerTest(const ServerConfig &config, int port) + : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { char *server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_port); + gpr_join_host_port(&server_address, "::", port); ServerBuilder builder; builder.AddPort(server_address); + gpr_free(server_address); builder.RegisterAsyncService(&async_service_); server_ = builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server listening on %s\n", server_address); - gpr_free(server_address); using namespace std::placeholders; request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, _1, _2, _3, &srv_cq_, _4); - request_stats_ = - std::bind(&TestService::AsyncService::RequestCollectServerStats, - &async_service_, _1, _2, _3, &srv_cq_, _4); for (int i = 0; i < 100; i++) { contexts_.push_front( new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( request_unary_, UnaryCall)); - contexts_.push_front( - new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>( - request_stats_, CollectServerStats)); - } - } - ~AsyncQpsServerTest() { - server_->Shutdown(); - void *ignored_tag; - bool ignored_ok; - srv_cq_.Shutdown(); - while (srv_cq_.Next(&ignored_tag, &ignored_ok)) { } - while (!contexts_.empty()) { - delete contexts_.front(); - contexts_.pop_front(); - } - for (auto& thr: threads_) { - thr.join(); - } - } - void ServeRpcs(int num_threads) { - for (int i = 0; i < num_threads; i++) { + for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; @@ -166,8 +100,16 @@ class AsyncQpsServerTest { return; })); } - while (!got_sigint) { - std::this_thread::sleep_for(std::chrono::seconds(5)); + } + ~AsyncQpsServerTest() { + server_->Shutdown(); + srv_cq_.Shutdown(); + for (auto &thr : threads_) { + thr.join(); + } + while (!contexts_.empty()) { + delete contexts_.front(); + contexts_.pop_front(); } } @@ -176,8 +118,8 @@ class AsyncQpsServerTest { public: ServerRpcContext() {} virtual ~ServerRpcContext(){}; - virtual bool RunNextState() = 0;// do next state, return false if all done - virtual void Reset() = 0; // start this back at a clean state + virtual bool RunNextState() = 0; // do next state, return false if all done + virtual void Reset() = 0; // start this back at a clean state }; static void *tag(ServerRpcContext *func) { return reinterpret_cast<void *>(func); @@ -240,17 +182,6 @@ class AsyncQpsServerTest { grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; }; - static Status CollectServerStats(const StatsRequest *, - ServerStats *response) { - struct rusage usage; - struct timeval tv; - gettimeofday(&tv, NULL); - getrusage(RUSAGE_SELF, &usage); - response->set_time_now(time_double(&tv)); - response->set_time_user(time_double(&usage.ru_utime)); - response->set_time_system(time_double(&usage.ru_stime)); - return Status::OK; - } static Status UnaryCall(const SimpleRequest *request, SimpleResponse *response) { if (request->has_response_size() && request->response_size() > 0) { @@ -264,40 +195,17 @@ class AsyncQpsServerTest { CompletionQueue srv_cq_; TestService::AsyncService async_service_; std::vector<std::thread> threads_; - std::unique_ptr<Server> server_; + std::unique_ptr<grpc::Server> server_; std::function<void(ServerContext *, SimpleRequest *, grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)> request_unary_; - std::function<void(ServerContext *, StatsRequest *, - grpc::ServerAsyncResponseWriter<ServerStats> *, void *)> - request_stats_; std::forward_list<ServerRpcContext *> contexts_; }; -} // namespace - -static void RunServer() { - AsyncQpsServerTest server; - - grpc_profiler_start("qps_server_async.prof"); - - server.ServeRpcs(FLAGS_server_threads); - - grpc_profiler_stop(); +std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, + int port) { + return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port)); } -int main(int argc, char **argv) { - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - GPR_ASSERT(FLAGS_port != 0); - GPR_ASSERT(!FLAGS_enable_ssl); - - signal(SIGINT, sigint_handler); - - RunServer(); - - grpc_shutdown(); - google::protobuf::ShutdownProtobufLibrary(); - - return 0; -} +} // namespace testing +} // namespace grpc |