From 58da895a8ceecd5997b32aba5ca43007a958e606 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 26 Jan 2016 10:57:07 -0800 Subject: localhost -> wildcard address --- test/cpp/qps/qps_driver.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 9816a09592..620e874c14 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -153,7 +153,7 @@ static void QpsDriver() { ServerConfig server_config; server_config.set_server_type(server_type); - server_config.set_host("localhost"); + server_config.set_host("::"); // Use the wildcard server address server_config.set_async_server_threads(FLAGS_async_server_threads); if (FLAGS_secure_test) { -- cgit v1.2.3 From 57ecf766ab4137ad0265ccbc95b8610929af9808 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 26 Jan 2016 12:42:48 -0800 Subject: clang-format --- test/cpp/qps/qps_driver.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 620e874c14..e3e7cb296b 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -153,7 +153,7 @@ static void QpsDriver() { ServerConfig server_config; server_config.set_server_type(server_type); - server_config.set_host("::"); // Use the wildcard server address + server_config.set_host("::"); // Use the wildcard server address server_config.set_async_server_threads(FLAGS_async_server_threads); if (FLAGS_secure_test) { -- cgit v1.2.3 From 0f14209061f79d37f4f72801d66c2ab68dc69b7f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 26 Jan 2016 14:20:07 -0800 Subject: Improve logging for test --- test/cpp/qps/driver.cc | 4 ++++ test/cpp/qps/qps_worker.cc | 22 ++++++++++++++++++++++ 2 files changed, 26 insertions(+) (limited to 'test/cpp') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index acb265b308..1c4f3d0e56 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -161,6 +161,8 @@ std::unique_ptr RunScenario( // where class contained in std::vector must have a copy constructor auto* servers = new ServerData[num_servers]; for (size_t i = 0; i < num_servers; i++) { + gpr_log(GPR_INFO, "Starting server on %s (worker #%d)", + workers[i].c_str(), i); servers[i].stub = WorkerService::NewStub( CreateChannel(workers[i], InsecureChannelCredentials())); ServerArgs args; @@ -188,6 +190,8 @@ std::unique_ptr RunScenario( // where class contained in std::vector must have a copy constructor auto* clients = new ClientData[num_clients]; for (size_t i = 0; i < num_clients; i++) { + gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", + workers[i].c_str(), i); clients[i].stub = WorkerService::NewStub( CreateChannel(workers[i + num_servers], InsecureChannelCredentials())); ClientArgs args; diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index c0276d05b3..d8c8573a2c 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -61,6 +61,11 @@ namespace grpc { namespace testing { static std::unique_ptr CreateClient(const ClientConfig& config) { + gpr_log(GPR_INFO, "Starting client of type %s %s %d", + ClientType_Name(config.client_type()).c_str(), + RpcType_Name(config.rpc_type()).c_str(), + config.payload_config().has_bytebuf_params()); + switch (config.client_type()) { case ClientType::SYNC_CLIENT: return (config.rpc_type() == RpcType::UNARY) @@ -81,6 +86,9 @@ static std::unique_ptr CreateClient(const ClientConfig& config) { static void LimitCores(int cores) {} static std::unique_ptr CreateServer(const ServerConfig& config) { + gpr_log(GPR_INFO, "Starting server of type %s", + ServerType_Name(config.server_type()).c_str()); + if (config.core_limit() > 0) { LimitCores(config.core_limit()); } @@ -169,22 +177,29 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { if (!args.has_setup()) { return Status(StatusCode::INVALID_ARGUMENT, ""); } + gpr_log(GPR_INFO, "RunClientBody: about to create client"); auto client = CreateClient(args.setup()); if (!client) { return Status(StatusCode::INVALID_ARGUMENT, ""); } + gpr_log(GPR_INFO, "RunClientBody: client created"); ClientStatus status; if (!stream->Write(status)) { return Status(StatusCode::UNKNOWN, ""); } + gpr_log(GPR_INFO, "RunClientBody: creation status reported\n"); while (stream->Read(&args)) { + gpr_log(GPR_INFO, "RunClientBody: Message read\n"); if (!args.has_mark()) { + gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!\n"); return Status(StatusCode::INVALID_ARGUMENT, ""); } *status.mutable_stats() = client->Mark(args.mark().reset()); stream->Write(status); + gpr_log(GPR_INFO, "RunClientBody: Mark response given\n"); } + gpr_log(GPR_INFO, "RunClientBody: Returning\n"); return Status::OK; } @@ -200,24 +215,31 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { if (server_port_ != 0) { args.mutable_setup()->set_port(server_port_); } + gpr_log(GPR_INFO, "RunServerBody: about to create server"); auto server = CreateServer(args.setup()); if (!server) { return Status(StatusCode::INVALID_ARGUMENT, ""); } + gpr_log(GPR_INFO, "RunServerBody: server created"); ServerStatus status; status.set_port(server->port()); status.set_cores(server->cores()); if (!stream->Write(status)) { return Status(StatusCode::UNKNOWN, ""); } + gpr_log(GPR_INFO, "RunServerBody: creation status reported\n"); while (stream->Read(&args)) { + gpr_log(GPR_INFO, "RunServerBody: Message read\n"); if (!args.has_mark()) { + gpr_log(GPR_INFO, "RunServerBody: Message not a mark!\n"); return Status(StatusCode::INVALID_ARGUMENT, ""); } *status.mutable_stats() = server->Mark(args.mark().reset()); stream->Write(status); + gpr_log(GPR_INFO, "RunServerBody: Mark response given\n"); } + gpr_log(GPR_INFO, "RunServerBody: Returning\n"); return Status::OK; } -- cgit v1.2.3 From bdf4acbbfb4aafbe12073808f3a4dfffe0aad283 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 26 Jan 2016 15:15:26 -0800 Subject: Properly state client name --- test/cpp/qps/driver.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 1c4f3d0e56..daeb33a0aa 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -191,7 +191,7 @@ std::unique_ptr RunScenario( auto* clients = new ClientData[num_clients]; for (size_t i = 0; i < num_clients; i++) { gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", - workers[i].c_str(), i); + workers[i + num_servers].c_str(), i + num_servers); clients[i].stub = WorkerService::NewStub( CreateChannel(workers[i + num_servers], InsecureChannelCredentials())); ClientArgs args; -- cgit v1.2.3 From a00f998089a641ed7cc059eb5d4ce52734ba2e3e Mon Sep 17 00:00:00 2001 From: vjpai Date: Tue, 26 Jan 2016 17:51:35 -0800 Subject: Actually create async generic server --- src/proto/grpc/testing/control.proto | 1 + test/cpp/qps/generic_async_streaming_ping_pong_test.cc | 2 +- test/cpp/qps/qps_driver.cc | 2 +- test/cpp/qps/qps_worker.cc | 2 ++ test/cpp/qps/server.h | 1 + 5 files changed, 6 insertions(+), 2 deletions(-) (limited to 'test/cpp') diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 0784ebf91c..496108ab5f 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -42,6 +42,7 @@ enum ClientType { enum ServerType { SYNC_SERVER = 0; ASYNC_SERVER = 1; + ASYNC_GENERIC_SERVER = 2; } enum RpcType { diff --git a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc index 2b2e1c820f..81c0f24103 100644 --- a/test/cpp/qps/generic_async_streaming_ping_pong_test.cc +++ b/test/cpp/qps/generic_async_streaming_ping_pong_test.cc @@ -60,7 +60,7 @@ static void RunGenericAsyncStreamingPingPong() { bbuf->set_req_size(0); ServerConfig server_config; - server_config.set_server_type(ASYNC_SERVER); + server_config.set_server_type(ASYNC_GENERIC_SERVER); server_config.set_host("localhost"); server_config.set_async_server_threads(1); diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index e3e7cb296b..680e4b19b9 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -170,7 +170,7 @@ static void QpsDriver() { GPR_ASSERT(!client_config.payload_config().has_bytebuf_params() || (client_config.client_type() == ASYNC_CLIENT && client_config.rpc_type() == STREAMING && - server_config.server_type() == ASYNC_SERVER)); + server_config.server_type() == ASYNC_GENERIC_SERVER)); const auto result = RunScenario( client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index c0276d05b3..0bacc11b23 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -89,6 +89,8 @@ static std::unique_ptr CreateServer(const ServerConfig& config) { return CreateSynchronousServer(config); case ServerType::ASYNC_SERVER: return CreateAsyncServer(config); + case ServerType::ASYNC_GENERIC_SERVER: + return CreateAsyncGenericServer(config); default: abort(); } diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 32a3e85026..196fdac8f3 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -108,6 +108,7 @@ class Server { std::unique_ptr CreateSynchronousServer(const ServerConfig& config); std::unique_ptr CreateAsyncServer(const ServerConfig& config); +std::unique_ptr CreateAsyncGenericServer(const ServerConfig& config); } // namespace testing } // namespace grpc -- cgit v1.2.3 From a091a23a7f10b6862c526f69bb5a8951d4365ecf Mon Sep 17 00:00:00 2001 From: vjpai Date: Tue, 26 Jan 2016 18:10:42 -0800 Subject: No need to put \n at end of a log message --- test/cpp/qps/qps_worker.cc | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index d8c8573a2c..e782b2a6c5 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -187,19 +187,19 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { if (!stream->Write(status)) { return Status(StatusCode::UNKNOWN, ""); } - gpr_log(GPR_INFO, "RunClientBody: creation status reported\n"); + gpr_log(GPR_INFO, "RunClientBody: creation status reported"); while (stream->Read(&args)) { - gpr_log(GPR_INFO, "RunClientBody: Message read\n"); + gpr_log(GPR_INFO, "RunClientBody: Message read"); if (!args.has_mark()) { - gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!\n"); + gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!"); return Status(StatusCode::INVALID_ARGUMENT, ""); } *status.mutable_stats() = client->Mark(args.mark().reset()); stream->Write(status); - gpr_log(GPR_INFO, "RunClientBody: Mark response given\n"); + gpr_log(GPR_INFO, "RunClientBody: Mark response given"); } - gpr_log(GPR_INFO, "RunClientBody: Returning\n"); + gpr_log(GPR_INFO, "RunClientBody: Returning"); return Status::OK; } @@ -227,19 +227,19 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { if (!stream->Write(status)) { return Status(StatusCode::UNKNOWN, ""); } - gpr_log(GPR_INFO, "RunServerBody: creation status reported\n"); + gpr_log(GPR_INFO, "RunServerBody: creation status reported"); while (stream->Read(&args)) { - gpr_log(GPR_INFO, "RunServerBody: Message read\n"); + gpr_log(GPR_INFO, "RunServerBody: Message read"); if (!args.has_mark()) { - gpr_log(GPR_INFO, "RunServerBody: Message not a mark!\n"); + gpr_log(GPR_INFO, "RunServerBody: Message not a mark!"); return Status(StatusCode::INVALID_ARGUMENT, ""); } *status.mutable_stats() = server->Mark(args.mark().reset()); stream->Write(status); - gpr_log(GPR_INFO, "RunServerBody: Mark response given\n"); + gpr_log(GPR_INFO, "RunServerBody: Mark response given"); } - gpr_log(GPR_INFO, "RunServerBody: Returning\n"); + gpr_log(GPR_INFO, "RunServerBody: Returning"); return Status::OK; } -- cgit v1.2.3 From 020c2f36ac7bec80a20c1acfe30d179902f77acd Mon Sep 17 00:00:00 2001 From: vjpai Date: Tue, 26 Jan 2016 18:11:22 -0800 Subject: Fix copyright --- test/cpp/qps/driver.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index daeb33a0aa..20d429751d 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without -- cgit v1.2.3 From 94aada9ee5f35b5c1df6b103f3d8e8f24cbaaf70 Mon Sep 17 00:00:00 2001 From: vjpai Date: Tue, 26 Jan 2016 18:12:30 -0800 Subject: clang-format --- test/cpp/qps/driver.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 20d429751d..39b8f5cf0d 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -161,8 +161,8 @@ std::unique_ptr RunScenario( // where class contained in std::vector must have a copy constructor auto* servers = new ServerData[num_servers]; for (size_t i = 0; i < num_servers; i++) { - gpr_log(GPR_INFO, "Starting server on %s (worker #%d)", - workers[i].c_str(), i); + gpr_log(GPR_INFO, "Starting server on %s (worker #%d)", workers[i].c_str(), + i); servers[i].stub = WorkerService::NewStub( CreateChannel(workers[i], InsecureChannelCredentials())); ServerArgs args; -- cgit v1.2.3 From 0c31b608058a36bd9d4163c72f666e94eb8442b0 Mon Sep 17 00:00:00 2001 From: vjpai Date: Wed, 27 Jan 2016 09:03:18 -0800 Subject: Sanity failures (copyright, clang-format) --- include/grpc++/server.h | 2 +- test/cpp/qps/driver.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'test/cpp') diff --git a/include/grpc++/server.h b/include/grpc++/server.h index c9371ba649..f24ed333bb 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 39b8f5cf0d..66269ae757 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -162,7 +162,7 @@ std::unique_ptr RunScenario( auto* servers = new ServerData[num_servers]; for (size_t i = 0; i < num_servers; i++) { gpr_log(GPR_INFO, "Starting server on %s (worker #%d)", workers[i].c_str(), - i); + i); servers[i].stub = WorkerService::NewStub( CreateChannel(workers[i], InsecureChannelCredentials())); ServerArgs args; -- cgit v1.2.3 From a24e9d774caf776e5d0182a779d39ba584c7387a Mon Sep 17 00:00:00 2001 From: vjpai Date: Wed, 27 Jan 2016 09:45:29 -0800 Subject: Sanity checks and asan fix --- include/grpc++/server.h | 2 +- src/proto/grpc/testing/control.proto | 2 +- test/cpp/qps/server_async.cc | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) (limited to 'test/cpp') diff --git a/include/grpc++/server.h b/include/grpc++/server.h index c9371ba649..f24ed333bb 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 496108ab5f..7ba6f9856f 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -1,4 +1,4 @@ -// Copyright 2015, Google Inc. +// Copyright 2015-2016, Google Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index d530dac86b..cd8d546c28 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -373,7 +373,7 @@ static Status ProcessGenericRPC(const PayloadConfig &payload_config, const ByteBuffer *request, ByteBuffer *response) { int resp_size = payload_config.bytebuf_params().resp_size(); - std::unique_ptr buf(new char[resp_size]); + std::unique_ptr buf(new char[resp_size]); gpr_slice s = gpr_slice_from_copied_buffer(buf.get(), resp_size); Slice slice(s, Slice::STEAL_REF); *response = ByteBuffer(&slice, 1); -- cgit v1.2.3 From 32baa5e62209b67ff247e4a03803c70cb364b457 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 27 Jan 2016 10:01:41 -0800 Subject: Allow dynamic sizing of async client and server thread pools --- test/cpp/qps/client_async.cc | 62 +++++++++++++++++++++++++++++--------------- test/cpp/qps/server_async.cc | 38 +++++++++++++++++---------- 2 files changed, 65 insertions(+), 35 deletions(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f270cd0987..13e71fec7d 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -46,13 +46,14 @@ #include #include #include +#include #include #include +#include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" -#include "src/proto/grpc/testing/services.grpc.pb.h" namespace grpc { namespace testing { @@ -93,7 +94,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function< std::unique_ptr>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req, + CompletionQueue*)> + start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -139,7 +141,8 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function callback_; std::function>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> start_req_; + CompletionQueue*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -158,20 +161,22 @@ class AsyncClient : public ClientImpl { using Client::closed_loop_; using ClientImpl::channels_; using ClientImpl::request_; - AsyncClient(const ClientConfig& config, - std::function setup_ctx, - std::function(std::shared_ptr)> - create_stub) + AsyncClient( + const ClientConfig& config, + std::function + setup_ctx, + std::function(std::shared_ptr)> + create_stub) : ClientImpl(config, create_stub), + num_async_threads_(NumThreads(config)), channel_lock_(new std::mutex[config.client_channels()]), contexts_(config.client_channels()), max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), channel_count_(config.client_channels()), - pref_channel_inc_(config.async_client_threads()) { - SetupLoadTest(config, config.async_client_threads()); + pref_channel_inc_(num_async_threads_) { + SetupLoadTest(config, num_async_threads_); - for (int i = 0; i < config.async_client_threads(); i++) { + for (int i = 0; i < num_async_threads_; i++) { cli_cqs_.emplace_back(new CompletionQueue); if (!closed_loop_) { rpc_deadlines_.emplace_back(); @@ -324,6 +329,9 @@ class AsyncClient : public ClientImpl { return true; } + protected: + int num_async_threads_; + private: class boolean { // exists only to avoid data-race on vector public: @@ -338,6 +346,15 @@ class AsyncClient : public ClientImpl { private: bool val_; }; + static int NumThreads(const ClientConfig& config) { + int num_threads = config.async_client_threads(); + if (num_threads <= 0) { // Use dynamic sizing + num_threads = gpr_cpu_num_cores(); + gpr_log(GPR_INFO, "Sizing client server to %d threads\n", num_threads); + } + return num_threads; + } + std::vector> cli_cqs_; std::vector rpc_deadlines_; // per thread deadlines @@ -363,7 +380,7 @@ class AsyncUnaryClient GRPC_FINAL public: explicit AsyncUnaryClient(const ClientConfig& config) : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { - StartThreads(config.async_client_threads()); + StartThreads(num_async_threads_); } ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } @@ -391,7 +408,8 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req, + void*)> + start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -443,10 +461,10 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { ResponseType response_; bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*); std::function callback_; - std::function< - std::unique_ptr>( - BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> start_req_; + std::function>( + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -461,7 +479,7 @@ class AsyncStreamingClient GRPC_FINAL // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); - StartThreads(config.async_client_threads()); + StartThreads(num_async_threads_); } ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } @@ -490,7 +508,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, std::function( grpc::GenericStub*, grpc::ClientContext*, - const grpc::string& method_name, CompletionQueue*, void*)> start_req, + const grpc::string& method_name, CompletionQueue*, void*)> + start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -547,7 +566,8 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { std::function callback_; std::function( grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, - CompletionQueue*, void*)> start_req_; + CompletionQueue*, void*)> + start_req_; grpc::Status status_; double start_; std::unique_ptr stream_; @@ -566,7 +586,7 @@ class GenericAsyncStreamingClient GRPC_FINAL // async streaming currently only supports closed loop GPR_ASSERT(closed_loop_); - StartThreads(config.async_client_threads()); + StartThreads(num_async_threads_); } ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index cd8d546c28..8e1503b460 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -50,8 +50,8 @@ #include #include -#include "test/cpp/qps/server.h" #include "src/proto/grpc/testing/services.grpc.pb.h" +#include "test/cpp/qps/server.h" namespace grpc { namespace testing { @@ -72,7 +72,8 @@ class AsyncQpsServerTest : public Server { CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, std::function process_rpc) + ResponseType *)> + process_rpc) : Server(config) { char *server_address = NULL; @@ -85,7 +86,13 @@ class AsyncQpsServerTest : public Server { register_service(&builder, &async_service_); - for (int i = 0; i < config.async_server_threads(); i++) { + int num_threads = config.async_server_threads(); + if (num_threads <= 0) { // dynamic sizing + num_threads = cores(); + gpr_log(GPR_INFO, "Sizing async server to %d threads\n", num_threads); + } + + for (int i = 0; i < num_threads; i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } @@ -96,8 +103,8 @@ class AsyncQpsServerTest : public Server { auto process_rpc_bound = std::bind(process_rpc, config.payload_config(), _1, _2); - for (int i = 0; i < 10000 / config.async_server_threads(); i++) { - for (int j = 0; j < config.async_server_threads(); j++) { + for (int i = 0; i < 10000 / num_threads; i++) { + for (int j = 0; j < num_threads; j++) { if (request_unary_function) { auto request_unary = std::bind(request_unary_function, &async_service_, _1, _2, _3, @@ -115,10 +122,10 @@ class AsyncQpsServerTest : public Server { } } - for (int i = 0; i < config.async_server_threads(); i++) { + for (int i = 0; i < num_threads; i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); } - for (int i = 0; i < config.async_server_threads(); i++) { + for (int i = 0; i < num_threads; i++) { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } @@ -184,7 +191,8 @@ class AsyncQpsServerTest : public Server { ServerRpcContextUnaryImpl( std::function *, - void *)> request_method, + void *)> + request_method, std::function invoke_method) : srv_ctx_(new ServerContextType), @@ -381,12 +389,14 @@ static Status ProcessGenericRPC(const PayloadConfig &payload_config, } std::unique_ptr CreateAsyncServer(const ServerConfig &config) { - return std::unique_ptr(new AsyncQpsServerTest< - SimpleRequest, SimpleResponse, BenchmarkService::AsyncService, - grpc::ServerContext>( - config, RegisterBenchmarkService, - &BenchmarkService::AsyncService::RequestUnaryCall, - &BenchmarkService::AsyncService::RequestStreamingCall, ProcessSimpleRPC)); + return std::unique_ptr( + new AsyncQpsServerTest( + config, RegisterBenchmarkService, + &BenchmarkService::AsyncService::RequestUnaryCall, + &BenchmarkService::AsyncService::RequestStreamingCall, + ProcessSimpleRPC)); } std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { return std::unique_ptr( -- cgit v1.2.3 From b0f275e4af3a18882110994713368eb288594b66 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 27 Jan 2016 10:45:50 -0800 Subject: Small testing fixes - end2end test deadlines may complete before checking IsCancelled => don't expect it to be false in these cases - add exponential backoff to port_posix - ensure run_tests rebuilds targets with a regex I commonly use --- src/proto/grpc/testing/echo_messages.proto | 1 + test/core/util/port_posix.c | 3 ++- test/cpp/end2end/end2end_test.cc | 3 ++- tools/run_tests/run_tests.py | 4 ++-- 4 files changed, 7 insertions(+), 4 deletions(-) (limited to 'test/cpp') diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto index f01d645af7..b9f1eec2e2 100644 --- a/src/proto/grpc/testing/echo_messages.proto +++ b/src/proto/grpc/testing/echo_messages.proto @@ -41,6 +41,7 @@ message RequestParams { int32 response_message_length = 6; bool echo_peer = 7; string expected_client_identity = 8; // will force check_auth_context. + bool skip_cancelled_check = 9; } message EchoRequest { diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index 732a51c5cb..1b574f4399 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -37,6 +37,7 @@ #include "test/core/util/port.h" +#include #include #include #include @@ -229,10 +230,10 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, grpc_httpcli_request req; memset(&req, 0, sizeof(req)); GPR_ASSERT(pr->retries < 10); + sleep(1 + (unsigned)(pow(1.3, pr->retries) * rand() / RAND_MAX)); pr->retries++; req.host = pr->server; req.path = "/get"; - sleep(1); grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, pr); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index f8027bcf0b..6303d2cc13 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -244,7 +244,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { gpr_time_from_micros(request->param().server_cancel_after_us(), GPR_TIMESPAN))); return Status::CANCELLED; - } else { + } else if (!request->has_param() || !request->param().skip_cancelled_check()) { EXPECT_FALSE(context->IsCancelled()); } @@ -823,6 +823,7 @@ TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { EchoRequest request; EchoResponse response; request.set_message("Hello"); + request.mutable_param()->set_skip_cancelled_check(true); ClientContext context; std::chrono::system_clock::time_point deadline = diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index f8b01021c8..3180e6bc3a 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -151,9 +151,9 @@ class CLanguage(object): def make_targets(self, test_regex): if platform_string() != 'windows' and test_regex != '.*': # use the regex to minimize the number of things to build - return [target['name'] + return [os.path.basename(target['name']) for target in get_c_tests(False, self.test_lang) - if re.search(test_regex, target['name'])] + if re.search(test_regex, '/' + target['name'])] if platform_string() == 'windows': # don't build tools on windows just yet return ['buildtests_%s' % self.make_target] -- cgit v1.2.3 From d02dd30a1fb0895bac9f6139bf8ef9418a26b69d Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 27 Jan 2016 11:23:34 -0800 Subject: Also dynamic sizing for num_clients on performance tests --- test/cpp/qps/driver.cc | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index acb265b308..4ef70a2570 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -31,24 +31,24 @@ * */ +#include #include #include -#include #include -#include -#include -#include #include #include +#include +#include +#include #include "src/core/support/env.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" #include "test/cpp/qps/driver.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/qps_worker.h" -#include "src/proto/grpc/testing/services.grpc.pb.h" using std::list; using std::thread; @@ -142,6 +142,12 @@ std::unique_ptr RunScenario( } } + // if num_clients is set to <=0, do dynamic sizing: all workers + // except for servers are clients + if (num_clients <= 0) { + num_clients = workers.size() - num_servers; + } + // TODO(ctiller): support running multiple configurations, and binpack // client/server pairs // to available workers -- cgit v1.2.3 From 7a984f0e3aa65367dac193d8f616bd7692e61998 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 27 Jan 2016 11:24:57 -0800 Subject: Copyright --- test/cpp/qps/driver.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 4ef70a2570..240c66d7a3 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without -- cgit v1.2.3 From 7d8335f8753afe33117358bee32f89808eae5d31 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 27 Jan 2016 12:00:41 -0800 Subject: Don't log newlines --- test/cpp/qps/client_async.cc | 2 +- test/cpp/qps/server_async.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 13e71fec7d..4ab74a9d4c 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -350,7 +350,7 @@ class AsyncClient : public ClientImpl { int num_threads = config.async_client_threads(); if (num_threads <= 0) { // Use dynamic sizing num_threads = gpr_cpu_num_cores(); - gpr_log(GPR_INFO, "Sizing client server to %d threads\n", num_threads); + gpr_log(GPR_INFO, "Sizing client server to %d threads", num_threads); } return num_threads; } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 8e1503b460..ddfeb8638d 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -89,7 +89,7 @@ class AsyncQpsServerTest : public Server { int num_threads = config.async_server_threads(); if (num_threads <= 0) { // dynamic sizing num_threads = cores(); - gpr_log(GPR_INFO, "Sizing async server to %d threads\n", num_threads); + gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads); } for (int i = 0; i < num_threads; i++) { -- cgit v1.2.3 From 16e45702ac13ce756a911e492317856e94d067aa Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 27 Jan 2016 14:51:12 -0800 Subject: clang-format --- test/cpp/qps/client_async.cc | 34 ++++++++++++++-------------------- test/cpp/qps/server_async.cc | 20 ++++++++------------ 2 files changed, 22 insertions(+), 32 deletions(-) (limited to 'test/cpp') diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 4ab74a9d4c..e423ee2598 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -94,8 +94,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function< std::unique_ptr>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> - start_req, + CompletionQueue*)> start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -141,8 +140,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { std::function callback_; std::function>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, - CompletionQueue*)> - start_req_; + CompletionQueue*)> start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -161,12 +159,11 @@ class AsyncClient : public ClientImpl { using Client::closed_loop_; using ClientImpl::channels_; using ClientImpl::request_; - AsyncClient( - const ClientConfig& config, - std::function - setup_ctx, - std::function(std::shared_ptr)> - create_stub) + AsyncClient(const ClientConfig& config, + std::function setup_ctx, + std::function(std::shared_ptr)> + create_stub) : ClientImpl(config, create_stub), num_async_threads_(NumThreads(config)), channel_lock_(new std::mutex[config.client_channels()]), @@ -408,8 +405,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { std::function>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, - void*)> - start_req, + void*)> start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -461,10 +457,10 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { ResponseType response_; bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*); std::function callback_; - std::function>( - BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> - start_req_; + std::function< + std::unique_ptr>( + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, + void*)> start_req_; grpc::Status status_; double start_; std::unique_ptr> @@ -508,8 +504,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, std::function( grpc::GenericStub*, grpc::ClientContext*, - const grpc::string& method_name, CompletionQueue*, void*)> - start_req, + const grpc::string& method_name, CompletionQueue*, void*)> start_req, std::function on_done) : ClientRpcContext(channel_id), context_(), @@ -566,8 +561,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { std::function callback_; std::function( grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, - CompletionQueue*, void*)> - start_req_; + CompletionQueue*, void*)> start_req_; grpc::Status status_; double start_; std::unique_ptr stream_; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index ddfeb8638d..ffa6226388 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -72,8 +72,7 @@ class AsyncQpsServerTest : public Server { CompletionQueue *, ServerCompletionQueue *, void *)> request_streaming_function, std::function - process_rpc) + ResponseType *)> process_rpc) : Server(config) { char *server_address = NULL; @@ -191,8 +190,7 @@ class AsyncQpsServerTest : public Server { ServerRpcContextUnaryImpl( std::function *, - void *)> - request_method, + void *)> request_method, std::function invoke_method) : srv_ctx_(new ServerContextType), @@ -389,14 +387,12 @@ static Status ProcessGenericRPC(const PayloadConfig &payload_config, } std::unique_ptr CreateAsyncServer(const ServerConfig &config) { - return std::unique_ptr( - new AsyncQpsServerTest( - config, RegisterBenchmarkService, - &BenchmarkService::AsyncService::RequestUnaryCall, - &BenchmarkService::AsyncService::RequestStreamingCall, - ProcessSimpleRPC)); + return std::unique_ptr(new AsyncQpsServerTest< + SimpleRequest, SimpleResponse, BenchmarkService::AsyncService, + grpc::ServerContext>( + config, RegisterBenchmarkService, + &BenchmarkService::AsyncService::RequestUnaryCall, + &BenchmarkService::AsyncService::RequestStreamingCall, ProcessSimpleRPC)); } std::unique_ptr CreateAsyncGenericServer(const ServerConfig &config) { return std::unique_ptr( -- cgit v1.2.3 From a0fef1478436881247828893b6179157acac6678 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 27 Jan 2016 15:10:39 -0800 Subject: clang-format code --- test/cpp/end2end/end2end_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'test/cpp') diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 6303d2cc13..5a414ebc86 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -244,7 +244,8 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { gpr_time_from_micros(request->param().server_cancel_after_us(), GPR_TIMESPAN))); return Status::CANCELLED; - } else if (!request->has_param() || !request->param().skip_cancelled_check()) { + } else if (!request->has_param() || + !request->param().skip_cancelled_check()) { EXPECT_FALSE(context->IsCancelled()); } -- cgit v1.2.3