diff options
Diffstat (limited to 'test/cpp/qps')
-rw-r--r-- | test/cpp/qps/driver.cc | 28 | ||||
-rw-r--r-- | test/cpp/qps/driver.h | 3 | ||||
-rw-r--r-- | test/cpp/qps/qps_driver.cc | 4 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.cc | 233 | ||||
-rw-r--r-- | test/cpp/qps/qps_worker.h | 60 | ||||
-rw-r--r-- | test/cpp/qps/smoke_test.cc | 8 | ||||
-rwxr-xr-x | test/cpp/qps/smoke_test.sh | 28 | ||||
-rw-r--r-- | test/cpp/qps/worker.cc | 189 |
8 files changed, 332 insertions, 221 deletions
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index f44883783d..9f7d3b56a4 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -42,21 +42,25 @@ #include <grpc++/stream.h> #include <list> #include <thread> +#include <deque> #include <vector> #include "test/cpp/qps/histogram.h" +#include "test/cpp/qps/qps_worker.h" +#include "test/core/util/port.h" using std::list; using std::thread; using std::unique_ptr; +using std::deque; using std::vector; namespace grpc { namespace testing { -static vector<string> get_hosts(const string& name) { +static deque<string> get_hosts(const string& name) { char* env = gpr_getenv(name.c_str()); - if (!env) return vector<string>(); + if (!env) return deque<string>(); - vector<string> out; + deque<string> out; char* p = env; for (;;) { char* comma = strchr(p, ','); @@ -76,7 +80,8 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, const ServerConfig& server_config, size_t num_servers, int warmup_seconds, - int benchmark_seconds) { + int benchmark_seconds, + int spawn_local_worker_count) { // ClientContext allocator (all are destroyed at scope exit) list<ClientContext> contexts; auto alloc_context = [&contexts]() { @@ -88,6 +93,21 @@ ScenarioResult RunScenario(const ClientConfig& initial_client_config, auto workers = get_hosts("QPS_WORKERS"); ClientConfig client_config = initial_client_config; + // Spawn some local workers if desired + vector<unique_ptr<QpsWorker>> local_workers; + for (int i = 0; i < abs(spawn_local_worker_count); i++) { + int driver_port = grpc_pick_unused_port_or_die(); + int benchmark_port = grpc_pick_unused_port_or_die(); + local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port)); + char addr[256]; + sprintf(addr, "localhost:%d", driver_port); + if (spawn_local_worker_count < 0) { + workers.push_front(addr); + } else { + workers.push_back(addr); + } + } + // TODO(ctiller): support running multiple configurations, and binpack // client/server pairs // to available workers diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index b3a8bf8cc4..eb7119a89d 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -56,7 +56,8 @@ ScenarioResult RunScenario(const grpc::testing::ClientConfig& client_config, const grpc::testing::ServerConfig& server_config, size_t num_servers, int warmup_seconds, - int benchmark_seconds); + int benchmark_seconds, + int spawn_local_worker_count); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 220f826118..8959f7b97e 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -42,6 +42,7 @@ DEFINE_int32(num_servers, 1, "Number of server binaries"); DEFINE_int32(warmup_seconds, 5, "Warmup time (in seconds)"); DEFINE_int32(benchmark_seconds, 30, "Benchmark time (in seconds)"); +DEFINE_int32(local_workers, 0, "Number of local workers to start"); // Common config DEFINE_bool(enable_ssl, false, "Use SSL"); @@ -102,7 +103,8 @@ int main(int argc, char** argv) { auto result = RunScenario(client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, - FLAGS_warmup_seconds, FLAGS_benchmark_seconds); + FLAGS_warmup_seconds, FLAGS_benchmark_seconds, + FLAGS_local_workers); ReportQPSPerCore(result, server_config); ReportLatency(result); diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc new file mode 100644 index 0000000000..46d70dce52 --- /dev/null +++ b/test/cpp/qps/qps_worker.cc @@ -0,0 +1,233 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "qps_worker.h" + +#include <cassert> +#include <memory> +#include <mutex> +#include <string> +#include <thread> +#include <vector> +#include <sstream> + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/histogram.h> +#include <grpc/support/log.h> +#include <grpc/support/host_port.h> +#include <grpc++/client_context.h> +#include <grpc++/status.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_credentials.h> +#include <grpc++/stream.h> +#include "test/core/util/grpc_profiler.h" +#include "test/cpp/util/create_test_channel.h" +#include "test/cpp/qps/qpstest.pb.h" +#include "test/cpp/qps/client.h" +#include "test/cpp/qps/server.h" + +namespace grpc { +namespace testing { + +std::unique_ptr<Client> CreateClient(const ClientConfig& config) { + switch (config.client_type()) { + case ClientType::SYNCHRONOUS_CLIENT: + return (config.rpc_type() == RpcType::UNARY) ? + CreateSynchronousUnaryClient(config) : + CreateSynchronousStreamingClient(config); + case ClientType::ASYNC_CLIENT: + return (config.rpc_type() == RpcType::UNARY) ? + CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config); + } + abort(); +} + +std::unique_ptr<Server> CreateServer(const ServerConfig& config, int server_port) { + switch (config.server_type()) { + case ServerType::SYNCHRONOUS_SERVER: + return CreateSynchronousServer(config, server_port); + case ServerType::ASYNC_SERVER: + return CreateAsyncServer(config, server_port); + } + abort(); +} + +class WorkerImpl GRPC_FINAL : public Worker::Service { + public: + explicit WorkerImpl(int server_port) : server_port_(server_port), acquired_(false) {} + + Status RunTest(ServerContext* ctx, + ServerReaderWriter<ClientStatus, ClientArgs>* stream) + GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(RESOURCE_EXHAUSTED); + } + + grpc_profiler_start("qps_client.prof"); + Status ret = RunTestBody(ctx,stream); + grpc_profiler_stop(); + return ret; + } + + Status RunServer(ServerContext* ctx, + ServerReaderWriter<ServerStatus, ServerArgs>* stream) + GRPC_OVERRIDE { + InstanceGuard g(this); + if (!g.Acquired()) { + return Status(RESOURCE_EXHAUSTED); + } + + grpc_profiler_start("qps_server.prof"); + Status ret = RunServerBody(ctx,stream); + grpc_profiler_stop(); + return ret; + } + + private: + // Protect against multiple clients using this worker at once. + class InstanceGuard { + public: + InstanceGuard(WorkerImpl* impl) + : impl_(impl), acquired_(impl->TryAcquireInstance()) {} + ~InstanceGuard() { + if (acquired_) { + impl_->ReleaseInstance(); + } + } + + bool Acquired() const { return acquired_; } + + private: + WorkerImpl* const impl_; + const bool acquired_; + }; + + bool TryAcquireInstance() { + std::lock_guard<std::mutex> g(mu_); + if (acquired_) return false; + acquired_ = true; + return true; + } + + void ReleaseInstance() { + std::lock_guard<std::mutex> g(mu_); + GPR_ASSERT(acquired_); + acquired_ = false; + } + + Status RunTestBody(ServerContext* ctx, + ServerReaderWriter<ClientStatus, ClientArgs>* stream) { + ClientArgs args; + if (!stream->Read(&args)) { + return Status(INVALID_ARGUMENT); + } + if (!args.has_setup()) { + return Status(INVALID_ARGUMENT); + } + auto client = CreateClient(args.setup()); + if (!client) { + return Status(INVALID_ARGUMENT); + } + ClientStatus status; + if (!stream->Write(status)) { + return Status(UNKNOWN); + } + while (stream->Read(&args)) { + if (!args.has_mark()) { + return Status(INVALID_ARGUMENT); + } + *status.mutable_stats() = client->Mark(); + stream->Write(status); + } + + return Status::OK; + } + + Status RunServerBody(ServerContext* ctx, + ServerReaderWriter<ServerStatus, ServerArgs>* stream) { + ServerArgs args; + if (!stream->Read(&args)) { + return Status(INVALID_ARGUMENT); + } + if (!args.has_setup()) { + return Status(INVALID_ARGUMENT); + } + auto server = CreateServer(args.setup(), server_port_); + if (!server) { + return Status(INVALID_ARGUMENT); + } + ServerStatus status; + status.set_port(server_port_); + if (!stream->Write(status)) { + return Status(UNKNOWN); + } + while (stream->Read(&args)) { + if (!args.has_mark()) { + return Status(INVALID_ARGUMENT); + } + *status.mutable_stats() = server->Mark(); + stream->Write(status); + } + + return Status::OK; + } + + const int server_port_; + + std::mutex mu_; + bool acquired_; +}; + +QpsWorker::QpsWorker(int driver_port, int server_port) { + impl_.reset(new WorkerImpl(server_port)); + + char* server_address = NULL; + gpr_join_host_port(&server_address, "::", driver_port); + + ServerBuilder builder; + builder.AddListeningPort(server_address, InsecureServerCredentials()); + builder.RegisterService(impl_.get()); + + gpr_free(server_address); + + server_ = std::move(builder.BuildAndStart()); +} + +QpsWorker::~QpsWorker() { +} + +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h new file mode 100644 index 0000000000..861588907e --- /dev/null +++ b/test/cpp/qps/qps_worker.h @@ -0,0 +1,60 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef QPS_WORKER_H +#define QPS_WORKER_H + +#include <memory> + +namespace grpc { + +class Server; + +namespace testing { + +class WorkerImpl; + +class QpsWorker { + public: + QpsWorker(int driver_port, int server_port); + ~QpsWorker(); + + private: + std::unique_ptr<WorkerImpl> impl_; + std::unique_ptr<Server> server_; +}; + +} // namespace testing +} // namespace grpc + +#endif diff --git a/test/cpp/qps/smoke_test.cc b/test/cpp/qps/smoke_test.cc index c9d321f133..9531913b00 100644 --- a/test/cpp/qps/smoke_test.cc +++ b/test/cpp/qps/smoke_test.cc @@ -58,7 +58,7 @@ static void RunSynchronousUnaryPingPong() { server_config.set_enable_ssl(false); server_config.set_threads(1); - auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK); + auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); ReportQPS(result); ReportLatency(result); @@ -80,7 +80,7 @@ static void RunSynchronousStreamingPingPong() { server_config.set_enable_ssl(false); server_config.set_threads(1); - auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK); + auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); ReportQPS(result); ReportLatency(result); @@ -103,7 +103,7 @@ static void RunAsyncUnaryPingPong() { server_config.set_enable_ssl(false); server_config.set_threads(1); - auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK); + auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); ReportQPS(result); ReportLatency(result); @@ -126,7 +126,7 @@ static void RunQPS() { server_config.set_enable_ssl(false); server_config.set_threads(4); - auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK); + auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); ReportQPSPerCore(result, server_config); ReportLatency(result); diff --git a/test/cpp/qps/smoke_test.sh b/test/cpp/qps/smoke_test.sh deleted file mode 100755 index ba7f0a4f27..0000000000 --- a/test/cpp/qps/smoke_test.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/sh - -# performs a single qps run with one client and one server - -set -ex - -cd $(dirname $0)/../../.. - -killall qps_worker || true - -config=opt - -NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()'` - -make CONFIG=$config qps_worker qps_smoke_test -j$NUMCPUS - -bins/$config/qps_worker -driver_port 10000 -server_port 10001 & -PID1=$! -bins/$config/qps_worker -driver_port 10010 -server_port 10011 & -PID2=$! - -export QPS_WORKERS="localhost:10000,localhost:10010" - -bins/$config/qps_smoke_test $* - -kill -2 $PID1 $PID2 -wait - diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index 101eb9f969..1ef5313b66 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -31,33 +31,15 @@ * */ -#include <cassert> -#include <memory> -#include <mutex> -#include <string> -#include <thread> -#include <vector> -#include <sstream> - #include <sys/signal.h> +#include <chrono> +#include <thread> + #include <grpc/grpc.h> -#include <grpc/support/alloc.h> -#include <grpc/support/histogram.h> -#include <grpc/support/log.h> -#include <grpc/support/host_port.h> #include <gflags/gflags.h> -#include <grpc++/client_context.h> -#include <grpc++/status.h> -#include <grpc++/server.h> -#include <grpc++/server_builder.h> -#include <grpc++/server_credentials.h> -#include <grpc++/stream.h> -#include "test/core/util/grpc_profiler.h" -#include "test/cpp/util/create_test_channel.h" -#include "test/cpp/qps/qpstest.grpc.pb.h" -#include "test/cpp/qps/client.h" -#include "test/cpp/qps/server.h" + +#include "qps_worker.h" DEFINE_int32(driver_port, 0, "Driver server port."); DEFINE_int32(server_port, 0, "Spawned server port."); @@ -76,167 +58,8 @@ static void sigint_handler(int x) {got_sigint = true;} namespace grpc { namespace testing { -std::unique_ptr<Client> CreateClient(const ClientConfig& config) { - switch (config.client_type()) { - case ClientType::SYNCHRONOUS_CLIENT: - return (config.rpc_type() == RpcType::UNARY) ? - CreateSynchronousUnaryClient(config) : - CreateSynchronousStreamingClient(config); - case ClientType::ASYNC_CLIENT: - return (config.rpc_type() == RpcType::UNARY) ? - CreateAsyncUnaryClient(config) : CreateAsyncStreamingClient(config); - } - abort(); -} - -std::unique_ptr<Server> CreateServer(const ServerConfig& config) { - switch (config.server_type()) { - case ServerType::SYNCHRONOUS_SERVER: - return CreateSynchronousServer(config, FLAGS_server_port); - case ServerType::ASYNC_SERVER: - return CreateAsyncServer(config, FLAGS_server_port); - } - abort(); -} - -class WorkerImpl GRPC_FINAL : public Worker::Service { - public: - WorkerImpl() : acquired_(false) {} - - Status RunTest(ServerContext* ctx, - ServerReaderWriter<ClientStatus, ClientArgs>* stream) - GRPC_OVERRIDE { - InstanceGuard g(this); - if (!g.Acquired()) { - return Status(RESOURCE_EXHAUSTED); - } - - grpc_profiler_start("qps_client.prof"); - Status ret = RunTestBody(ctx,stream); - grpc_profiler_stop(); - return ret; - } - - Status RunServer(ServerContext* ctx, - ServerReaderWriter<ServerStatus, ServerArgs>* stream) - GRPC_OVERRIDE { - InstanceGuard g(this); - if (!g.Acquired()) { - return Status(RESOURCE_EXHAUSTED); - } - - grpc_profiler_start("qps_server.prof"); - Status ret = RunServerBody(ctx,stream); - grpc_profiler_stop(); - return ret; - } - - private: - // Protect against multiple clients using this worker at once. - class InstanceGuard { - public: - InstanceGuard(WorkerImpl* impl) - : impl_(impl), acquired_(impl->TryAcquireInstance()) {} - ~InstanceGuard() { - if (acquired_) { - impl_->ReleaseInstance(); - } - } - - bool Acquired() const { return acquired_; } - - private: - WorkerImpl* const impl_; - const bool acquired_; - }; - - bool TryAcquireInstance() { - std::lock_guard<std::mutex> g(mu_); - if (acquired_) return false; - acquired_ = true; - return true; - } - - void ReleaseInstance() { - std::lock_guard<std::mutex> g(mu_); - GPR_ASSERT(acquired_); - acquired_ = false; - } - - Status RunTestBody(ServerContext* ctx, - ServerReaderWriter<ClientStatus, ClientArgs>* stream) { - ClientArgs args; - if (!stream->Read(&args)) { - return Status(INVALID_ARGUMENT); - } - if (!args.has_setup()) { - return Status(INVALID_ARGUMENT); - } - auto client = CreateClient(args.setup()); - if (!client) { - return Status(INVALID_ARGUMENT); - } - ClientStatus status; - if (!stream->Write(status)) { - return Status(UNKNOWN); - } - while (stream->Read(&args)) { - if (!args.has_mark()) { - return Status(INVALID_ARGUMENT); - } - *status.mutable_stats() = client->Mark(); - stream->Write(status); - } - - return Status::OK; - } - - Status RunServerBody(ServerContext* ctx, - ServerReaderWriter<ServerStatus, ServerArgs>* stream) { - ServerArgs args; - if (!stream->Read(&args)) { - return Status(INVALID_ARGUMENT); - } - if (!args.has_setup()) { - return Status(INVALID_ARGUMENT); - } - auto server = CreateServer(args.setup()); - if (!server) { - return Status(INVALID_ARGUMENT); - } - ServerStatus status; - status.set_port(FLAGS_server_port); - if (!stream->Write(status)) { - return Status(UNKNOWN); - } - while (stream->Read(&args)) { - if (!args.has_mark()) { - return Status(INVALID_ARGUMENT); - } - *status.mutable_stats() = server->Mark(); - stream->Write(status); - } - - return Status::OK; - } - - std::mutex mu_; - bool acquired_; -}; - static void RunServer() { - char* server_address = NULL; - gpr_join_host_port(&server_address, "::", FLAGS_driver_port); - - WorkerImpl service; - - ServerBuilder builder; - builder.AddListeningPort(server_address, InsecureServerCredentials()); - builder.RegisterService(&service); - - gpr_free(server_address); - - auto server = builder.BuildAndStart(); + QpsWorker worker(FLAGS_driver_port, FLAGS_server_port); while (!got_sigint) { std::this_thread::sleep_for(std::chrono::seconds(5)); |