aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-03-04 12:50:11 -0800
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-03-04 12:50:11 -0800
commitd6479d6cc42ca180bcfc219751cf2a6f73f87dd4 (patch)
tree2ffe4636168ac687015b8ca2b47181501549f744
parentef6383904280e5606cbf6b5f71a534b1da17e956 (diff)
Async server works
-rw-r--r--Makefile2
-rw-r--r--build.json1
-rw-r--r--test/cpp/qps/server.cc31
-rw-r--r--test/cpp/qps/server.h30
-rw-r--r--test/cpp/qps/server_async.cc140
-rw-r--r--test/cpp/qps/worker.cc2
6 files changed, 58 insertions, 148 deletions
diff --git a/Makefile b/Makefile
index 36eedb95ad..aef6ff3a63 100644
--- a/Makefile
+++ b/Makefile
@@ -8181,6 +8181,7 @@ QPS_WORKER_SRC = \
test/cpp/qps/client.cc \
test/cpp/qps/client_async.cc \
test/cpp/qps/server.cc \
+ test/cpp/qps/server_async.cc \
test/cpp/qps/worker.cc \
QPS_WORKER_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(QPS_WORKER_SRC))))
@@ -8214,6 +8215,7 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/client_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/server.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
+$(OBJDIR)/$(CONFIG)/test/cpp/qps/server_async.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/qps/worker.o: $(LIBDIR)/$(CONFIG)/libqps.a $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_qps_worker: $(QPS_WORKER_OBJS:.o=.dep)
diff --git a/build.json b/build.json
index 08b1e1842c..a8dc2e6bec 100644
--- a/build.json
+++ b/build.json
@@ -1849,6 +1849,7 @@
"test/cpp/qps/client.cc",
"test/cpp/qps/client_async.cc",
"test/cpp/qps/server.cc",
+ "test/cpp/qps/server_async.cc",
"test/cpp/qps/worker.cc"
],
"deps": [
diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc
index 8424dbafe9..e598fb51ae 100644
--- a/test/cpp/qps/server.cc
+++ b/test/cpp/qps/server.cc
@@ -57,24 +57,12 @@
namespace grpc {
namespace testing {
-static bool SetPayload(PayloadType type, int size, Payload* payload) {
- PayloadType response_type = type;
- // TODO(yangg): Support UNCOMPRESSABLE payload.
- if (type != PayloadType::COMPRESSABLE) {
- return false;
- }
- payload->set_type(response_type);
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- return true;
-}
-
class TestServiceImpl GRPC_FINAL : public TestService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) override {
if (request->has_response_size() && request->response_size() > 0) {
- if (!SetPayload(request->response_type(), request->response_size(),
+ if (!Server::SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {
return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
}
@@ -87,21 +75,7 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
SynchronousServer(const ServerConfig& config, int port)
: thread_pool_(config.threads()),
- impl_(MakeImpl(port)),
- timer_(new Timer) {}
-
- ServerStats Mark() GRPC_OVERRIDE {
- std::unique_ptr<Timer> timer(new Timer);
- timer.swap(timer_);
-
- auto timer_result = timer->Mark();
-
- ServerStats stats;
- stats.set_time_elapsed(timer_result.wall);
- stats.set_time_system(timer_result.system);
- stats.set_time_user(timer_result.user);
- return stats;
- }
+ impl_(MakeImpl(port)) {}
private:
std::unique_ptr<grpc::Server> MakeImpl(int port) {
@@ -120,7 +94,6 @@ class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
TestServiceImpl service_;
ThreadPool thread_pool_;
std::unique_ptr<grpc::Server> impl_;
- std::unique_ptr<Timer> timer_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index 3542c17a6a..ca22d7ca1c 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -34,6 +34,7 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
+#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/qpstest.pb.h"
namespace grpc {
@@ -41,9 +42,36 @@ namespace testing {
class Server {
public:
+ Server():timer_(new Timer) {}
virtual ~Server() {}
- virtual ServerStats Mark() = 0;
+ ServerStats Mark() {
+ std::unique_ptr<Timer> timer(new Timer);
+ timer.swap(timer_);
+
+ auto timer_result = timer->Mark();
+
+ ServerStats stats;
+ stats.set_time_elapsed(timer_result.wall);
+ stats.set_time_system(timer_result.system);
+ stats.set_time_user(timer_result.user);
+ return stats;
+ }
+
+ static bool SetPayload(PayloadType type, int size, Payload* payload) {
+ PayloadType response_type = type;
+ // TODO(yangg): Support UNCOMPRESSABLE payload.
+ if (type != PayloadType::COMPRESSABLE) {
+ return false;
+ }
+ payload->set_type(response_type);
+ std::unique_ptr<char[]> body(new char[size]());
+ payload->set_body(body.get(), size);
+ return true;
+ }
+
+ private:
+ std::unique_ptr<Timer> timer_;
};
std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index c006262fc3..741a85802a 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -51,104 +51,37 @@
#include "src/cpp/server/thread_pool.h"
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/server.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
-DEFINE_int32(port, 0, "Server port.");
-DEFINE_int32(server_threads, 4, "Number of server threads.");
+namespace grpc {
+ namespace testing {
-using grpc::CompletionQueue;
-using grpc::Server;
-using grpc::ServerBuilder;
-using grpc::ServerContext;
-using grpc::ThreadPool;
-using grpc::testing::Payload;
-using grpc::testing::PayloadType;
-using grpc::testing::ServerStats;
-using grpc::testing::SimpleRequest;
-using grpc::testing::SimpleResponse;
-using grpc::testing::StatsRequest;
-using grpc::testing::TestService;
-using grpc::Status;
-
-// In some distros, gflags is in the namespace google, and in some others,
-// in gflags. This hack is enabling us to find both.
-namespace google {}
-namespace gflags {}
-using namespace google;
-using namespace gflags;
-
-static bool got_sigint = false;
-
-static void sigint_handler(int x) { got_sigint = 1; }
-
-static double time_double(struct timeval *tv) {
- return tv->tv_sec + 1e-6 * tv->tv_usec;
-}
-
-static bool SetPayload(PayloadType type, int size, Payload *payload) {
- PayloadType response_type = type;
- // TODO(yangg): Support UNCOMPRESSABLE payload.
- if (type != PayloadType::COMPRESSABLE) {
- return false;
- }
- payload->set_type(response_type);
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- return true;
-}
-
-namespace {
-
-class AsyncQpsServerTest {
+class AsyncQpsServerTest : public Server {
public:
- AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
+ AsyncQpsServerTest(const ServerConfig& config, int port) : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
char *server_address = NULL;
- gpr_join_host_port(&server_address, "::", FLAGS_port);
+ gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder;
builder.AddPort(server_address);
+ gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
server_ = builder.BuildAndStart();
- gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
- gpr_free(server_address);
using namespace std::placeholders;
request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall,
&async_service_, _1, _2, _3, &srv_cq_, _4);
- request_stats_ =
- std::bind(&TestService::AsyncService::RequestCollectServerStats,
- &async_service_, _1, _2, _3, &srv_cq_, _4);
for (int i = 0; i < 100; i++) {
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary_, UnaryCall));
- contexts_.push_front(
- new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>(
- request_stats_, CollectServerStats));
- }
- }
- ~AsyncQpsServerTest() {
- server_->Shutdown();
- void *ignored_tag;
- bool ignored_ok;
- srv_cq_.Shutdown();
- while (srv_cq_.Next(&ignored_tag, &ignored_ok)) {
}
- while (!contexts_.empty()) {
- delete contexts_.front();
- contexts_.pop_front();
- }
- for (auto& thr: threads_) {
- thr.join();
- }
- }
- void ServeRpcs(int num_threads) {
- for (int i = 0; i < num_threads; i++) {
+ for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
@@ -166,8 +99,16 @@ class AsyncQpsServerTest {
return;
}));
}
- while (!got_sigint) {
- std::this_thread::sleep_for(std::chrono::seconds(5));
+ }
+ ~AsyncQpsServerTest() {
+ server_->Shutdown();
+ srv_cq_.Shutdown();
+ for (auto& thr: threads_) {
+ thr.join();
+ }
+ while (!contexts_.empty()) {
+ delete contexts_.front();
+ contexts_.pop_front();
}
}
@@ -240,17 +181,6 @@ class AsyncQpsServerTest {
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
- static Status CollectServerStats(const StatsRequest *,
- ServerStats *response) {
- struct rusage usage;
- struct timeval tv;
- gettimeofday(&tv, NULL);
- getrusage(RUSAGE_SELF, &usage);
- response->set_time_now(time_double(&tv));
- response->set_time_user(time_double(&usage.ru_utime));
- response->set_time_system(time_double(&usage.ru_stime));
- return Status::OK;
- }
static Status UnaryCall(const SimpleRequest *request,
SimpleResponse *response) {
if (request->has_response_size() && request->response_size() > 0) {
@@ -264,40 +194,16 @@ class AsyncQpsServerTest {
CompletionQueue srv_cq_;
TestService::AsyncService async_service_;
std::vector<std::thread> threads_;
- std::unique_ptr<Server> server_;
+ std::unique_ptr<grpc::Server> server_;
std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
- std::function<void(ServerContext *, StatsRequest *,
- grpc::ServerAsyncResponseWriter<ServerStats> *, void *)>
- request_stats_;
std::forward_list<ServerRpcContext *> contexts_;
};
-} // namespace
-
-static void RunServer() {
- AsyncQpsServerTest server;
-
- grpc_profiler_start("qps_server_async.prof");
-
- server.ServeRpcs(FLAGS_server_threads);
-
- grpc_profiler_stop();
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port) {
+ return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
}
-int main(int argc, char **argv) {
- grpc_init();
- ParseCommandLineFlags(&argc, &argv, true);
- GPR_ASSERT(FLAGS_port != 0);
- GPR_ASSERT(!FLAGS_enable_ssl);
-
- signal(SIGINT, sigint_handler);
-
- RunServer();
-
- grpc_shutdown();
- google::protobuf::ShutdownProtobufLibrary();
-
- return 0;
-}
+ }// namespace testing
+}// namespace grpc
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index 4a2e798a47..a8d5752120 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -90,7 +90,7 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
case ServerType::SYNCHRONOUS_SERVER:
return CreateSynchronousServer(config, FLAGS_server_port);
case ServerType::ASYNC_SERVER:
- abort(); // return CreateAsyncServer(config, FLAGS_server_port);
+ return CreateAsyncServer(config, FLAGS_server_port);
}
abort();
}