aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp/qps/server.cc
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-02-23 16:31:57 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-02-23 16:31:57 -0800
commit5c004c68deac80995cb3befcca441fc47cf9f302 (patch)
treefb29e199b2208248d05bf3eb193daa05fccad0aa /test/cpp/qps/server.cc
parentb0a32fc72d6c4ad667a57a98c38fed507d326462 (diff)
Driver changes
WIP - things compile again after a broad set of changes preparing for the driver code.
Diffstat (limited to 'test/cpp/qps/server.cc')
-rw-r--r--test/cpp/qps/server.cc82
1 files changed, 64 insertions, 18 deletions
diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc
index 8e136349a1..a5edb05493 100644
--- a/test/cpp/qps/server.cc
+++ b/test/cpp/qps/server.cc
@@ -44,6 +44,7 @@
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/status.h>
+#include <grpc++/stream.h>
#include "src/cpp/server/thread_pool.h"
#include "test/core/util/grpc_profiler.h"
#include "test/cpp/qps/qpstest.pb.h"
@@ -51,13 +52,13 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
-DEFINE_bool(enable_ssl, false, "Whether to use ssl/tls.");
DEFINE_int32(port, 0, "Server port.");
-DEFINE_int32(server_threads, 4, "Number of server threads.");
+DEFINE_int32(driver_port, 0, "Server driver port.");
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
+using grpc::ServerReaderWriter;
using grpc::ThreadPool;
using grpc::testing::Payload;
using grpc::testing::PayloadType;
@@ -66,6 +67,10 @@ using grpc::testing::SimpleRequest;
using grpc::testing::SimpleResponse;
using grpc::testing::StatsRequest;
using grpc::testing::TestService;
+using grpc::testing::QpsServer;
+using grpc::testing::ServerArgs;
+using grpc::testing::ServerStats;
+using grpc::testing::ServerStatus;
using grpc::Status;
// In some distros, gflags is in the namespace google, and in some others,
@@ -124,34 +129,76 @@ class TestServiceImpl final : public TestService::Service {
} // namespace
+class ServerImpl : public QpsServer::Service {
+ public:
+ Status RunServer(ServerContext* ctx, ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
+ ServerArgs args;
+ std::unique_ptr<ServerStats> last_stats;
+ if (!stream->Read(&args)) return Status::OK;
+
+ bool done = false;
+ while (!done) {
+ std::lock_guard<std::mutex> lock(server_mu_);
+
+ char* server_address = NULL;
+ gpr_join_host_port(&server_address, "::", FLAGS_port);
+
+ TestServiceImpl service;
+
+ ServerBuilder builder;
+ builder.AddPort(server_address);
+ builder.RegisterService(&service);
+
+ gpr_free(server_address);
+
+ std::unique_ptr<ThreadPool> pool(new ThreadPool(args.threads()));
+ builder.SetThreadPool(pool.get());
+
+ auto server = builder.BuildAndStart();
+ gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
+
+ ServerStatus last_status;
+ if (last_stats.get()) {
+ *last_status.mutable_stats() = *last_stats;
+ }
+ if (!stream->Write(last_status)) return Status(grpc::UNKNOWN);
+
+ grpc_profiler_start("qps_server.prof");
+
+ done = stream->Read(&args);
+
+ grpc_profiler_stop();
+ }
+
+ ServerStatus last_status;
+ if (last_stats.get()) {
+ *last_status.mutable_stats() = *last_stats;
+ }
+ stream->Write(last_status);
+ return Status::OK;
+ }
+
+ private:
+ std::mutex server_mu_;
+};
+
static void RunServer() {
char* server_address = NULL;
- gpr_join_host_port(&server_address, "::", FLAGS_port);
-
- TestServiceImpl service;
+ gpr_join_host_port(&server_address, "::", FLAGS_driver_port);
- SimpleRequest request;
- SimpleResponse response;
+ ServerImpl service;
ServerBuilder builder;
builder.AddPort(server_address);
builder.RegisterService(&service);
- std::unique_ptr<ThreadPool> pool(new ThreadPool(FLAGS_server_threads));
- builder.SetThreadPool(pool.get());
-
- std::unique_ptr<Server> server(builder.BuildAndStart());
- gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
+ gpr_free(server_address);
- grpc_profiler_start("qps_server.prof");
+ auto server = builder.BuildAndStart();
while (!got_sigint) {
std::this_thread::sleep_for(std::chrono::seconds(5));
}
-
- grpc_profiler_stop();
-
- gpr_free(server_address);
}
int main(int argc, char** argv) {
@@ -161,7 +208,6 @@ int main(int argc, char** argv) {
signal(SIGINT, sigint_handler);
GPR_ASSERT(FLAGS_port != 0);
- GPR_ASSERT(!FLAGS_enable_ssl);
RunServer();
grpc_shutdown();