aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server_async.cc
diff options
context:
space:
mode:
Diffstat (limited to 'test/cpp/qps/server_async.cc')
-rw-r--r--test/cpp/qps/server_async.cc146
1 files changed, 27 insertions, 119 deletions
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index c006262fc3..64aca957e4 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -51,104 +51,38 @@
#include "src/cpp/server/thread_pool.h"
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
+#include "test/cpp/qps/server.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
-DEFINE_int32(port, 0, "Server port.");
-DEFINE_int32(server_threads, 4, "Number of server threads.");
+namespace grpc {
+namespace testing {
-using grpc::CompletionQueue;
-using grpc::Server;
-using grpc::ServerBuilder;
-using grpc::ServerContext;
-using grpc::ThreadPool;
-using grpc::testing::Payload;
-using grpc::testing::PayloadType;
-using grpc::testing::ServerStats;
-using grpc::testing::SimpleRequest;
-using grpc::testing::SimpleResponse;
-using grpc::testing::StatsRequest;
-using grpc::testing::TestService;
-using grpc::Status;
-
-// In some distros, gflags is in the namespace google, and in some others,
-// in gflags. This hack is enabling us to find both.
-namespace google {}
-namespace gflags {}
-using namespace google;
-using namespace gflags;
-
-static bool got_sigint = false;
-
-static void sigint_handler(int x) { got_sigint = 1; }
-
-static double time_double(struct timeval *tv) {
- return tv->tv_sec + 1e-6 * tv->tv_usec;
-}
-
-static bool SetPayload(PayloadType type, int size, Payload *payload) {
- PayloadType response_type = type;
- // TODO(yangg): Support UNCOMPRESSABLE payload.
- if (type != PayloadType::COMPRESSABLE) {
- return false;
- }
- payload->set_type(response_type);
- std::unique_ptr<char[]> body(new char[size]());
- payload->set_body(body.get(), size);
- return true;
-}
-
-namespace {
-
-class AsyncQpsServerTest {
+class AsyncQpsServerTest : public Server {
public:
- AsyncQpsServerTest() : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
+ AsyncQpsServerTest(const ServerConfig &config, int port)
+ : srv_cq_(), async_service_(&srv_cq_), server_(nullptr) {
char *server_address = NULL;
- gpr_join_host_port(&server_address, "::", FLAGS_port);
+ gpr_join_host_port(&server_address, "::", port);
ServerBuilder builder;
builder.AddPort(server_address);
+ gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
server_ = builder.BuildAndStart();
- gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
- gpr_free(server_address);
using namespace std::placeholders;
request_unary_ = std::bind(&TestService::AsyncService::RequestUnaryCall,
&async_service_, _1, _2, _3, &srv_cq_, _4);
- request_stats_ =
- std::bind(&TestService::AsyncService::RequestCollectServerStats,
- &async_service_, _1, _2, _3, &srv_cq_, _4);
for (int i = 0; i < 100; i++) {
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary_, UnaryCall));
- contexts_.push_front(
- new ServerRpcContextUnaryImpl<StatsRequest, ServerStats>(
- request_stats_, CollectServerStats));
- }
- }
- ~AsyncQpsServerTest() {
- server_->Shutdown();
- void *ignored_tag;
- bool ignored_ok;
- srv_cq_.Shutdown();
- while (srv_cq_.Next(&ignored_tag, &ignored_ok)) {
}
- while (!contexts_.empty()) {
- delete contexts_.front();
- contexts_.pop_front();
- }
- for (auto& thr: threads_) {
- thr.join();
- }
- }
- void ServeRpcs(int num_threads) {
- for (int i = 0; i < num_threads; i++) {
+ for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
@@ -166,8 +100,16 @@ class AsyncQpsServerTest {
return;
}));
}
- while (!got_sigint) {
- std::this_thread::sleep_for(std::chrono::seconds(5));
+ }
+ ~AsyncQpsServerTest() {
+ server_->Shutdown();
+ srv_cq_.Shutdown();
+ for (auto &thr : threads_) {
+ thr.join();
+ }
+ while (!contexts_.empty()) {
+ delete contexts_.front();
+ contexts_.pop_front();
}
}
@@ -176,8 +118,8 @@ class AsyncQpsServerTest {
public:
ServerRpcContext() {}
virtual ~ServerRpcContext(){};
- virtual bool RunNextState() = 0;// do next state, return false if all done
- virtual void Reset() = 0; // start this back at a clean state
+ virtual bool RunNextState() = 0; // do next state, return false if all done
+ virtual void Reset() = 0; // start this back at a clean state
};
static void *tag(ServerRpcContext *func) {
return reinterpret_cast<void *>(func);
@@ -240,17 +182,6 @@ class AsyncQpsServerTest {
grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
};
- static Status CollectServerStats(const StatsRequest *,
- ServerStats *response) {
- struct rusage usage;
- struct timeval tv;
- gettimeofday(&tv, NULL);
- getrusage(RUSAGE_SELF, &usage);
- response->set_time_now(time_double(&tv));
- response->set_time_user(time_double(&usage.ru_utime));
- response->set_time_system(time_double(&usage.ru_stime));
- return Status::OK;
- }
static Status UnaryCall(const SimpleRequest *request,
SimpleResponse *response) {
if (request->has_response_size() && request->response_size() > 0) {
@@ -264,40 +195,17 @@ class AsyncQpsServerTest {
CompletionQueue srv_cq_;
TestService::AsyncService async_service_;
std::vector<std::thread> threads_;
- std::unique_ptr<Server> server_;
+ std::unique_ptr<grpc::Server> server_;
std::function<void(ServerContext *, SimpleRequest *,
grpc::ServerAsyncResponseWriter<SimpleResponse> *, void *)>
request_unary_;
- std::function<void(ServerContext *, StatsRequest *,
- grpc::ServerAsyncResponseWriter<ServerStats> *, void *)>
- request_stats_;
std::forward_list<ServerRpcContext *> contexts_;
};
-} // namespace
-
-static void RunServer() {
- AsyncQpsServerTest server;
-
- grpc_profiler_start("qps_server_async.prof");
-
- server.ServeRpcs(FLAGS_server_threads);
-
- grpc_profiler_stop();
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
+ int port) {
+ return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
}
-int main(int argc, char **argv) {
- grpc_init();
- ParseCommandLineFlags(&argc, &argv, true);
- GPR_ASSERT(FLAGS_port != 0);
- GPR_ASSERT(!FLAGS_enable_ssl);
-
- signal(SIGINT, sigint_handler);
-
- RunServer();
-
- grpc_shutdown();
- google::protobuf::ShutdownProtobufLibrary();
-
- return 0;
-}
+} // namespace testing
+} // namespace grpc