diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-03-04 12:50:11 -0800 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-03-04 12:50:11 -0800 |
commit | d6479d6cc42ca180bcfc219751cf2a6f73f87dd4 (patch) | |
tree | 2ffe4636168ac687015b8ca2b47181501549f744 | |
parent | ef6383904280e5606cbf6b5f71a534b1da17e956 (diff) |
Async server works
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | build.json | 1 | ||||
-rw-r--r-- | test/cpp/qps/server.cc | 31 | ||||
-rw-r--r-- | test/cpp/qps/server.h | 30 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 140 | ||||
-rw-r--r-- | test/cpp/qps/worker.cc | 2 |
6 files changed, 58 insertions, 148 deletions
@@ -8181,6 +8181,7 @@ QPS_WORKER_SRC = \ test/cpp/qps/client.cc \ test/cpp/qps/client_async.cc \ test/cpp/qps/server.cc \ + test/cpp/qps/server_async.cc \ test/cpp/qps/worker.cc \ QPS_WORKER_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(QPS_WORKER_SRC)))) @@ -8214,6 +8215,7 @@ endif $(OBJDIR)/$(CONFIG)/test/cpp/qps/client.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/server.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a +$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(OBJDIR)/$(CONFIG)/test/cpp/qps/worker.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a deps_qps_worker: $(QPS_WORKER_OBJS:.o=.dep) diff --git a/build.json b/build.json index 08b1e1842c..a8dc2e6bec 100644 --- a/build.json +++ b/build.json @@ -1849,6 +1849,7 @@ "test/cpp/qps/client.cc", "test/cpp/qps/client_async.cc", "test/cpp/qps/server.cc", + "test/cpp/qps/server_async.cc", "test/cpp/qps/worker.cc" ], "deps": [ diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc index 8424dbafe9..e598fb51ae 100644 --- a/test/cpp/qps/server.cc +++ b/test/cpp/qps/server.cc @@ -57,24 +57,12 @@ namespace grpc { namespace testing { -static bool SetPayload(PayloadType type, int size, Payload* payload) { - PayloadType response_type = type; - // TODO(yangg): Support UNCOMPRESSABLE payload. - if (type != PayloadType::COMPRESSABLE) { - return false; - } - payload->set_type(response_type); - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - return true; -} - class TestServiceImpl GRPC_FINAL : public TestService::Service { public: Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) override { if (request->has_response_size() && request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), + if (!Server::SetPayload(request->response_type(), request->response_size(), response->mutable_payload())) { return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); } @@ -87,21 +75,7 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server { public: SynchronousServer(const ServerConfig& config, int port) : thread_pool_(config.threads()), - impl_(MakeImpl(port)), - timer_(new Timer) {} - - ServerStats Mark() GRPC_OVERRIDE { - std::unique_ptr<Timer> timer(new Timer); - timer.swap(timer_); - - auto timer_result = timer->Mark(); - - ServerStats stats; - stats.set_time_elapsed(timer_result.wall); - stats.set_time_system(timer_result.system); - stats.set_time_user(timer_result.user); - return stats; - } + impl_(MakeImpl(port)) {} private: std::unique_ptr<grpc::Server> MakeImpl(int port) { @@ -120,7 +94,6 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server { TestServiceImpl service_; ThreadPool thread_pool_; std::unique_ptr<grpc::Server> impl_; - std::unique_ptr<Timer> timer_; }; std::unique_ptr<grpc::testing::Server> CreateSynchronousServer( diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 3542c17a6a..ca22d7ca1c 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -34,6 +34,7 @@ #ifndef TEST_QPS_SERVER_H #define TEST_QPS_SERVER_H +#include "test/cpp/qps/timer.h" #include "test/cpp/qps/qpstest.pb.h" namespace grpc { @@ -41,9 +42,36 @@ namespace testing { class Server { public: + Server():timer_(new Timer) {} virtual ~Server() {} - virtual ServerStats Mark() = 0; + ServerStats Mark() { + std::unique_ptr<Timer> timer(new Timer); + timer.swap(timer_); + + auto timer_result = timer->Mark(); + + ServerStats stats; + stats.set_time_elapsed(timer_result.wall); + stats.set_time_system(timer_result.system); + stats.set_time_user(timer_result.user); + return stats; + } + + static bool SetPayload(PayloadType type, int size, Payload* payload) { + PayloadType response_type = type; + // TODO(yangg): Support UNCOMPRESSABLE payload. + if (type != PayloadType::COMPRESSABLE) { + return false; + } + payload->set_type(response_type); + std::unique_ptr<char[]> body(new char[size]()); + payload->set_body(body.get(), size); + return true; + } + + private: + std::unique_ptr<Timer> timer_; }; std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config, diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c006262fc3..741a85802a 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -51,104 +51,37 @@ #include "src/cpp/server/thread_pool.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/server.h" #include <grpc/grpc.h> #include <grpc/support/log.h> -DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls."); -DEFINE_int32(port, 0, "Server port."); -DEFINE_int32(server_threads, 4, "Number of server threads."); +namespace grpc { + namespace testing { -using grpc::CompletionQueue; -using grpc::Server; -using grpc::ServerBuilder; -using grpc::ServerContext; -using grpc::ThreadPool; -using grpc::testing::Payload; -using grpc::testing::PayloadType; -using grpc::testing::ServerStats; -using grpc::testing::SimpleRequest; -using grpc::testing::SimpleResponse; -using grpc::testing::StatsRequest; -using grpc::testing::TestService; -using grpc::Status; - -// In some distros, gflags is in the namespace google, and in some others, -// in gflags. This hack is enabling us to find both. -namespace google {} -namespace gflags {} -using namespace google; -using namespace gflags; - -static bool got_sigint = false; - -static void sigint_handler(int x) { got_sigint = 1; } - -static double time_double(struct timeval *tv) { - return tv->tv_sec + 1e-6 * tv->tv_usec; -} - -static bool SetPayload(PayloadType type, int size, Payload *payload) { - PayloadType response_type = type; - // TODO(yangg): Support UNCOMPRESSABLE payload. - if (type != PayloadType::COMPRESSABLE) { - return false; - } - payload->set_type(response_type); - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - return true; -} - -namespace { - -class AsyncQpsServerTest { +class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { + AsyncQpsServerTest(const ServerConfig& config, int port) : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) { char *server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_port); + gpr_join_host_port(&server_address, "::", port); ServerBuilder builder; builder.AddPort(server_address); + gpr_free(server_address); builder.RegisterAsyncService(&async_service_); server_ = builder.BuildAndStart(); - gpr_log(GPR_INFO, "Server listening on %s\n", server_address); - gpr_free(server_address); using namespace std::placeholders; request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall, &async_service_, _1, _2, _3, &srv_cq_, _4); - request_stats_ = - std::bind(&TestService::AsyncService::RequestCollectServerStats, - &async_service_, _1, _2, _3, &srv_cq_, _4); for (int i = 0; i < 100; i++) { contexts_.push_front( new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( request_unary_, UnaryCall)); - contexts_.push_front( - new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>( - request_stats_, CollectServerStats)); - } - } - ~AsyncQpsServerTest() { - server_->Shutdown(); - void *ignored_tag; - bool ignored_ok; - srv_cq_.Shutdown(); - while (srv_cq_.Next(&ignored_tag, &ignored_ok)) { } - while (!contexts_.empty()) { - delete contexts_.front(); - contexts_.pop_front(); - } - for (auto& thr: threads_) { - thr.join(); - } - } - void ServeRpcs(int num_threads) { - for (int i = 0; i < num_threads; i++) { + for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; @@ -166,8 +99,16 @@ class AsyncQpsServerTest { return; })); } - while (!got_sigint) { - std::this_thread::sleep_for(std::chrono::seconds(5)); + } + ~AsyncQpsServerTest() { + server_->Shutdown(); + srv_cq_.Shutdown(); + for (auto& thr: threads_) { + thr.join(); + } + while (!contexts_.empty()) { + delete contexts_.front(); + contexts_.pop_front(); } } @@ -240,17 +181,6 @@ class AsyncQpsServerTest { grpc::ServerAsyncResponseWriter<ResponseType> response_writer_; }; - static Status CollectServerStats(const StatsRequest *, - ServerStats *response) { - struct rusage usage; - struct timeval tv; - gettimeofday(&tv, NULL); - getrusage(RUSAGE_SELF, &usage); - response->set_time_now(time_double(&tv)); - response->set_time_user(time_double(&usage.ru_utime)); - response->set_time_system(time_double(&usage.ru_stime)); - return Status::OK; - } static Status UnaryCall(const SimpleRequest *request, SimpleResponse *response) { if (request->has_response_size() && request->response_size() > 0) { @@ -264,40 +194,16 @@ class AsyncQpsServerTest { CompletionQueue srv_cq_; TestService::AsyncService async_service_; std::vector<std::thread> threads_; - std::unique_ptr<Server> server_; + std::unique_ptr<grpc::Server> server_; std::function<void(ServerContext *, SimpleRequest *, grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)> request_unary_; - std::function<void(ServerContext *, StatsRequest *, - grpc::ServerAsyncResponseWriter<ServerStats> *, void *)> - request_stats_; std::forward_list<ServerRpcContext *> contexts_; }; -} // namespace - -static void RunServer() { - AsyncQpsServerTest server; - - grpc_profiler_start("qps_server_async.prof"); - - server.ServeRpcs(FLAGS_server_threads); - - grpc_profiler_stop(); +std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port) { + return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port)); } -int main(int argc, char **argv) { - grpc_init(); - ParseCommandLineFlags(&argc, &argv, true); - GPR_ASSERT(FLAGS_port != 0); - GPR_ASSERT(!FLAGS_enable_ssl); - - signal(SIGINT, sigint_handler); - - RunServer(); - - grpc_shutdown(); - google::protobuf::ShutdownProtobufLibrary(); - - return 0; -} + }// namespace testing +}// namespace grpc diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index 4a2e798a47..a8d5752120 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -90,7 +90,7 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config) { case ServerType::SYNCHRONOUS_SERVER: return CreateSynchronousServer(config, FLAGS_server_port); case ServerType::ASYNC_SERVER: - abort(); // return CreateAsyncServer(config, FLAGS_server_port); + return CreateAsyncServer(config, FLAGS_server_port); } abort(); } |