From 0e66efdaddf096f9833ca37e7eee958b2316242f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 13 Jan 2016 11:15:49 -0800 Subject: Make core limitation work for both client and server so that we can run tests on the same machine if desired. The core_list flags to qps_driver are comma-separated lists of core numbers. --- test/cpp/qps/client.h | 13 +++++++++ test/cpp/qps/coresched.cc | 72 ++++++++++++++++++++++++++++++++++++++++++++++ test/cpp/qps/coresched.h | 45 +++++++++++++++++++++++++++++ test/cpp/qps/qps_driver.cc | 37 ++++++++++++++++++++++-- test/cpp/qps/qps_worker.cc | 5 ---- test/cpp/qps/server.h | 15 +++++++++- 6 files changed, 178 insertions(+), 9 deletions(-) create mode 100644 test/cpp/qps/coresched.cc create mode 100644 test/cpp/qps/coresched.h (limited to 'test') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index ee0049578d..a81155c242 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -36,9 +36,13 @@ #include #include +#include + +#include #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" +#include "test/cpp/qps/coresched.h" #include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" #include "src/proto/grpc/testing/payloads.grpc.pb.h" @@ -72,6 +76,15 @@ class Client { : channels_(config.client_channels()), timer_(new Timer), interarrival_timer_() { + int clsize = config.core_list_size(); + if (clsize > 0) { + std::vector core_list; + for (int i = 0; i < clsize; i++) { + core_list.push_back(config.core_list(i)); + } + LimitCores(core_list); + } + for (int i = 0; i < config.client_channels(); i++) { channels_[i].init(config.server_targets(i % config.server_targets_size()), config); diff --git a/test/cpp/qps/coresched.cc b/test/cpp/qps/coresched.cc new file mode 100644 index 0000000000..8a7b4d4e67 --- /dev/null +++ b/test/cpp/qps/coresched.cc @@ -0,0 +1,72 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/qps/coresched.h" + +#include +#include +#include +#include + +namespace grpc { +namespace testing { + +#ifdef GPR_CPU_LINUX +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include +int LimitCores(std::vector cores) { + size_t num_cores = static_cast(gpr_cpu_num_cores()); + if (num_cores > cores.size()) { + cpu_set_t *cpup = CPU_ALLOC(num_cores); + GPR_ASSERT(cpup); + size_t size = CPU_ALLOC_SIZE(num_cores); + CPU_ZERO_S(size, cpup); + + for (size_t i = 0; i < cores.size(); i++) { + CPU_SET_S(cores[i], size, cpup); + } + GPR_ASSERT(sched_setaffinity(0, size, cpup) == 0); + CPU_FREE(cpup); + return cores.size(); + } else { + return num_cores; + } +} +#else +// LimitCores is not currently supported for non-Linux platforms +int LimitCores(std::vector core_vec) {return gpr_cpu_num_cores();} +#endif +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/coresched.h b/test/cpp/qps/coresched.h new file mode 100644 index 0000000000..38f6859636 --- /dev/null +++ b/test/cpp/qps/coresched.h @@ -0,0 +1,45 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_CORESCHED_H +#define TEST_QPS_CORESCHED_H + +#include + +namespace grpc { +namespace testing { +int LimitCores(std::vector core_vec); +} // namespace testing +} // namespace grpc + +#endif // TEST_QPS_CORESCHED_H diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index c7096391e6..6eafabcc2c 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -48,14 +48,13 @@ DEFINE_int32(warmup_seconds, 5, "Warmup time (in seconds)"); DEFINE_int32(benchmark_seconds, 30, "Benchmark time (in seconds)"); DEFINE_int32(local_workers, 0, "Number of local workers to start"); -// Common config -DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING"); - // Server config DEFINE_int32(async_server_threads, 1, "Number of threads for async servers"); DEFINE_string(server_type, "SYNC_SERVER", "Server type"); +DEFINE_string(server_core_list, "", "Comma-separated list of cores for server"); // Client config +DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING"); DEFINE_int32(outstanding_rpcs_per_channel, 1, "Number of outstanding rpcs per channel"); DEFINE_int32(client_channels, 1, "Number of client channels"); @@ -73,6 +72,8 @@ DEFINE_double(determ_load, -1.0, "Deterministic offered load (qps)"); DEFINE_double(pareto_base, -1.0, "Pareto base interarrival time (us)"); DEFINE_double(pareto_alpha, -1.0, "Pareto alpha value"); +DEFINE_string(client_core_list, "", "Comma-separated list of cores for client"); + DEFINE_bool(secure_test, false, "Run a secure test"); using grpc::testing::ClientConfig; @@ -86,6 +87,22 @@ using grpc::testing::SecurityParams; namespace grpc { namespace testing { +static std::vector IntParse(const std::string& s) { + size_t pos = 0; + std::vector res; + while (pos < s.size()) { + size_t comma = s.find(',', pos); + if (comma == std::string::npos) { + res.push_back(std::stoi(s.substr(pos))); + break; + } else { + res.push_back(std::stoi(s.substr(pos, comma-pos), nullptr)); + pos = comma + 1; + } + } + return res; +} + static void QpsDriver() { RpcType rpc_type; GPR_ASSERT(RpcType_Parse(FLAGS_rpc_type, &rpc_type)); @@ -142,11 +159,25 @@ static void QpsDriver() { client_config.mutable_histogram_params()->set_max_possible( Histogram::default_max_possible()); + if (FLAGS_client_core_list.size() > 0) { + auto v = IntParse(FLAGS_client_core_list); + for (size_t i=0; i 0) { + auto v = IntParse(FLAGS_server_core_list); + for (size_t i=0; i CreateClient(const ClientConfig& config) { abort(); } -static void LimitCores(int cores) {} - static std::unique_ptr CreateServer(const ServerConfig& config) { - if (config.core_limit() > 0) { - LimitCores(config.core_limit()); - } switch (config.server_type()) { case ServerType::SYNC_SERVER: return CreateSynchronousServer(config); diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 620bc32f4b..474473a7d0 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -34,11 +34,13 @@ #ifndef TEST_QPS_SERVER_H #define TEST_QPS_SERVER_H +#include #include #include #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" +#include "test/cpp/qps/coresched.h" #include "test/cpp/qps/timer.h" #include "src/proto/grpc/testing/messages.grpc.pb.h" #include "src/proto/grpc/testing/control.grpc.pb.h" @@ -49,6 +51,16 @@ namespace testing { class Server { public: explicit Server(const ServerConfig& config) : timer_(new Timer) { + int clsize = config.core_list_size(); + if (clsize > 0) { + std::vector core_list; + for (int i = 0; i < clsize; i++) { + core_list.push_back(config.core_list(i)); + } + cores_ = LimitCores(core_list); + } else { + cores_ = gpr_cpu_num_cores(); + } if (config.port()) { port_ = config.port(); } else { @@ -87,7 +99,7 @@ class Server { } int port() const { return port_; } - int cores() const { return gpr_cpu_num_cores(); } + int cores() const { return cores_;} static std::shared_ptr CreateServerCredentials( const ServerConfig& config) { if (config.has_security_params()) { @@ -104,6 +116,7 @@ class Server { private: int port_; + int cores_; std::unique_ptr timer_; }; -- cgit v1.2.3 From 3448c27275aced9d591b70358d130f3059499b73 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 13 Jan 2016 11:17:13 -0800 Subject: clang-format --- test/cpp/qps/client.h | 6 +++--- test/cpp/qps/coresched.cc | 4 ++-- test/cpp/qps/coresched.h | 2 +- test/cpp/qps/qps_driver.cc | 6 +++--- test/cpp/qps/qps_worker.cc | 14 +++++++------- test/cpp/qps/server.h | 10 +++++----- 6 files changed, 21 insertions(+), 21 deletions(-) (limited to 'test') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index a81155c242..c6d3bb54d5 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -40,13 +40,13 @@ #include +#include "src/proto/grpc/testing/payloads.grpc.pb.h" +#include "src/proto/grpc/testing/services.grpc.pb.h" +#include "test/cpp/qps/coresched.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" -#include "test/cpp/qps/coresched.h" #include "test/cpp/qps/timer.h" #include "test/cpp/util/create_test_channel.h" -#include "src/proto/grpc/testing/payloads.grpc.pb.h" -#include "src/proto/grpc/testing/services.grpc.pb.h" namespace grpc { diff --git a/test/cpp/qps/coresched.cc b/test/cpp/qps/coresched.cc index 8a7b4d4e67..4b59227bd8 100644 --- a/test/cpp/qps/coresched.cc +++ b/test/cpp/qps/coresched.cc @@ -33,10 +33,10 @@ #include "test/cpp/qps/coresched.h" -#include #include #include #include +#include namespace grpc { namespace testing { @@ -66,7 +66,7 @@ int LimitCores(std::vector cores) { } #else // LimitCores is not currently supported for non-Linux platforms -int LimitCores(std::vector core_vec) {return gpr_cpu_num_cores();} +int LimitCores(std::vector core_vec) { return gpr_cpu_num_cores(); } #endif } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/coresched.h b/test/cpp/qps/coresched.h index 38f6859636..ec32a1d7fc 100644 --- a/test/cpp/qps/coresched.h +++ b/test/cpp/qps/coresched.h @@ -42,4 +42,4 @@ int LimitCores(std::vector core_vec); } // namespace testing } // namespace grpc -#endif // TEST_QPS_CORESCHED_H +#endif // TEST_QPS_CORESCHED_H diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 6eafabcc2c..691ad0b816 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -96,7 +96,7 @@ static std::vector IntParse(const std::string& s) { res.push_back(std::stoi(s.substr(pos))); break; } else { - res.push_back(std::stoi(s.substr(pos, comma-pos), nullptr)); + res.push_back(std::stoi(s.substr(pos, comma - pos), nullptr)); pos = comma + 1; } } @@ -161,7 +161,7 @@ static void QpsDriver() { if (FLAGS_client_core_list.size() > 0) { auto v = IntParse(FLAGS_client_core_list); - for (size_t i=0; i 0) { auto v = IntParse(FLAGS_server_core_list); - for (size_t i=0; i #include #include +#include #include #include #include -#include +#include +#include +#include +#include #include #include #include -#include #include -#include -#include -#include -#include +#include +#include "src/proto/grpc/testing/services.pb.h" #include "test/core/util/grpc_profiler.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/server.h" #include "test/cpp/util/create_test_channel.h" -#include "src/proto/grpc/testing/services.pb.h" namespace grpc { namespace testing { diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 474473a7d0..3d2ffa8710 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -34,16 +34,16 @@ #ifndef TEST_QPS_SERVER_H #define TEST_QPS_SERVER_H -#include -#include #include +#include +#include +#include "src/proto/grpc/testing/control.grpc.pb.h" +#include "src/proto/grpc/testing/messages.grpc.pb.h" #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" #include "test/cpp/qps/coresched.h" #include "test/cpp/qps/timer.h" -#include "src/proto/grpc/testing/messages.grpc.pb.h" -#include "src/proto/grpc/testing/control.grpc.pb.h" namespace grpc { namespace testing { @@ -99,7 +99,7 @@ class Server { } int port() const { return port_; } - int cores() const { return cores_;} + int cores() const { return cores_; } static std::shared_ptr CreateServerCredentials( const ServerConfig& config) { if (config.has_security_params()) { -- cgit v1.2.3 From 861eb9f13a1437542a381e466f6e5796dafa0941 Mon Sep 17 00:00:00 2001 From: vjpai Date: Tue, 19 Jan 2016 13:37:27 -0800 Subject: Fix copyrights for sanity --- test/cpp/qps/coresched.cc | 2 +- test/cpp/qps/coresched.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'test') diff --git a/test/cpp/qps/coresched.cc b/test/cpp/qps/coresched.cc index 4b59227bd8..9dc9fd5841 100644 --- a/test/cpp/qps/coresched.cc +++ b/test/cpp/qps/coresched.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/test/cpp/qps/coresched.h b/test/cpp/qps/coresched.h index ec32a1d7fc..e1012781b1 100644 --- a/test/cpp/qps/coresched.h +++ b/test/cpp/qps/coresched.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without -- cgit v1.2.3 From 4f5146dac201c234e4aefdad118413ba3aa76bd6 Mon Sep 17 00:00:00 2001 From: vjpai Date: Tue, 19 Jan 2016 15:28:52 -0800 Subject: Change name from coresched to limit_cores --- test/cpp/qps/client.h | 2 +- test/cpp/qps/coresched.cc | 72 --------------------------------------------- test/cpp/qps/coresched.h | 45 ---------------------------- test/cpp/qps/limit_cores.cc | 72 +++++++++++++++++++++++++++++++++++++++++++++ test/cpp/qps/limit_cores.h | 45 ++++++++++++++++++++++++++++ test/cpp/qps/server.h | 2 +- 6 files changed, 119 insertions(+), 119 deletions(-) delete mode 100644 test/cpp/qps/coresched.cc delete mode 100644 test/cpp/qps/coresched.h create mode 100644 test/cpp/qps/limit_cores.cc create mode 100644 test/cpp/qps/limit_cores.h (limited to 'test') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 2cb4e094c0..576adeb256 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -45,7 +45,7 @@ #include "src/proto/grpc/testing/payloads.grpc.pb.h" #include "src/proto/grpc/testing/services.grpc.pb.h" -#include "test/cpp/qps/coresched.h" +#include "test/cpp/qps/limit_cores.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" diff --git a/test/cpp/qps/coresched.cc b/test/cpp/qps/coresched.cc deleted file mode 100644 index 9dc9fd5841..0000000000 --- a/test/cpp/qps/coresched.cc +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "test/cpp/qps/coresched.h" - -#include -#include -#include -#include - -namespace grpc { -namespace testing { - -#ifdef GPR_CPU_LINUX -#ifndef _GNU_SOURCE -#define _GNU_SOURCE -#endif -#include -int LimitCores(std::vector cores) { - size_t num_cores = static_cast(gpr_cpu_num_cores()); - if (num_cores > cores.size()) { - cpu_set_t *cpup = CPU_ALLOC(num_cores); - GPR_ASSERT(cpup); - size_t size = CPU_ALLOC_SIZE(num_cores); - CPU_ZERO_S(size, cpup); - - for (size_t i = 0; i < cores.size(); i++) { - CPU_SET_S(cores[i], size, cpup); - } - GPR_ASSERT(sched_setaffinity(0, size, cpup) == 0); - CPU_FREE(cpup); - return cores.size(); - } else { - return num_cores; - } -} -#else -// LimitCores is not currently supported for non-Linux platforms -int LimitCores(std::vector core_vec) { return gpr_cpu_num_cores(); } -#endif -} // namespace testing -} // namespace grpc diff --git a/test/cpp/qps/coresched.h b/test/cpp/qps/coresched.h deleted file mode 100644 index e1012781b1..0000000000 --- a/test/cpp/qps/coresched.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef TEST_QPS_CORESCHED_H -#define TEST_QPS_CORESCHED_H - -#include - -namespace grpc { -namespace testing { -int LimitCores(std::vector core_vec); -} // namespace testing -} // namespace grpc - -#endif // TEST_QPS_CORESCHED_H diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc new file mode 100644 index 0000000000..0ba46d3d0a --- /dev/null +++ b/test/cpp/qps/limit_cores.cc @@ -0,0 +1,72 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/cpp/qps/limit_cores.h" + +#include +#include +#include +#include + +namespace grpc { +namespace testing { + +#ifdef GPR_CPU_LINUX +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include +int LimitCores(std::vector cores) { + size_t num_cores = static_cast(gpr_cpu_num_cores()); + if (num_cores > cores.size()) { + cpu_set_t *cpup = CPU_ALLOC(num_cores); + GPR_ASSERT(cpup); + size_t size = CPU_ALLOC_SIZE(num_cores); + CPU_ZERO_S(size, cpup); + + for (size_t i = 0; i < cores.size(); i++) { + CPU_SET_S(cores[i], size, cpup); + } + GPR_ASSERT(sched_setaffinity(0, size, cpup) == 0); + CPU_FREE(cpup); + return cores.size(); + } else { + return num_cores; + } +} +#else +// LimitCores is not currently supported for non-Linux platforms +int LimitCores(std::vector core_vec) { return gpr_cpu_num_cores(); } +#endif +} // namespace testing +} // namespace grpc diff --git a/test/cpp/qps/limit_cores.h b/test/cpp/qps/limit_cores.h new file mode 100644 index 0000000000..e1012781b1 --- /dev/null +++ b/test/cpp/qps/limit_cores.h @@ -0,0 +1,45 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef TEST_QPS_CORESCHED_H +#define TEST_QPS_CORESCHED_H + +#include + +namespace grpc { +namespace testing { +int LimitCores(std::vector core_vec); +} // namespace testing +} // namespace grpc + +#endif // TEST_QPS_CORESCHED_H diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 0c6e264a50..6195e6ce63 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -42,7 +42,7 @@ #include "src/proto/grpc/testing/messages.grpc.pb.h" #include "test/core/end2end/data/ssl_test_data.h" #include "test/core/util/port.h" -#include "test/cpp/qps/coresched.h" +#include "test/cpp/qps/limit_cores.h" #include "test/cpp/qps/timer.h" namespace grpc { -- cgit v1.2.3 From 41859d025b2596230b1d5a355e7e03a6dfddd20d Mon Sep 17 00:00:00 2001 From: vjpai Date: Tue, 19 Jan 2016 15:30:22 -0800 Subject: Added todo as a reminder to drive core selection automatically --- test/cpp/qps/qps_driver.cc | 2 ++ 1 file changed, 2 insertions(+) (limited to 'test') diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 3d81e62957..e770f92d3f 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -51,6 +51,7 @@ DEFINE_int32(local_workers, 0, "Number of local workers to start"); // Server config DEFINE_int32(async_server_threads, 1, "Number of threads for async servers"); DEFINE_string(server_type, "SYNC_SERVER", "Server type"); +// TODO (vpai): Automatically generate the core list to avoid breakage DEFINE_string(server_core_list, "", "Comma-separated list of cores for server"); // Client config @@ -74,6 +75,7 @@ DEFINE_double(determ_load, -1.0, "Deterministic offered load (qps)"); DEFINE_double(pareto_base, -1.0, "Pareto base interarrival time (us)"); DEFINE_double(pareto_alpha, -1.0, "Pareto alpha value"); +// TODO (vpai): Automatically generate the core list to avoid breakage DEFINE_string(client_core_list, "", "Comma-separated list of cores for client"); DEFINE_bool(secure_test, false, "Run a secure test"); -- cgit v1.2.3 From f524844da919716bc5563bd60f33836d7ba78877 Mon Sep 17 00:00:00 2001 From: vjpai Date: Tue, 19 Jan 2016 15:34:14 -0800 Subject: Fix include guard --- test/cpp/qps/limit_cores.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'test') diff --git a/test/cpp/qps/limit_cores.h b/test/cpp/qps/limit_cores.h index e1012781b1..54c805216c 100644 --- a/test/cpp/qps/limit_cores.h +++ b/test/cpp/qps/limit_cores.h @@ -31,8 +31,8 @@ * */ -#ifndef TEST_QPS_CORESCHED_H -#define TEST_QPS_CORESCHED_H +#ifndef TEST_QPS_LIMIT_CORES_H +#define TEST_QPS_LIMIT_CORES_H #include @@ -42,4 +42,4 @@ int LimitCores(std::vector core_vec); } // namespace testing } // namespace grpc -#endif // TEST_QPS_CORESCHED_H +#endif // TEST_QPS_LIMIT_CORES_H -- cgit v1.2.3 From c64736d852991c734938a16d58cb5861b7014c07 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 1 Feb 2016 09:33:11 -0800 Subject: Clean-up core list usage and make it possible to reset the core list --- src/proto/grpc/testing/control.proto | 3 +-- test/cpp/qps/client.h | 2 ++ test/cpp/qps/limit_cores.cc | 34 +++++++++++++++++++++------------- test/cpp/qps/limit_cores.h | 5 ++++- test/cpp/qps/qps_worker.cc | 4 ---- test/cpp/qps/server.h | 12 ++---------- 6 files changed, 30 insertions(+), 30 deletions(-) (limited to 'test') diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index b2b83af3df..40e8a97471 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -134,8 +134,7 @@ message ServerConfig { int32 port = 4; // Only for async server. Number of threads used to serve the requests. int32 async_server_threads = 7; - // restrict core usage, currently unused - int32 core_limit = 8; + // payload config, used in generic server PayloadConfig payload_config = 9; // Specify the cores we should run the server on, if desired diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 576adeb256..7d5f6466f9 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -324,6 +324,8 @@ class ClientImpl : public Client { std::function(std::shared_ptr)> create_stub) : channels_(config.client_channels()), create_stub_(create_stub) { + LimitCores(config.core_list().data(), config.core_list_size()); + for (int i = 0; i < config.client_channels(); i++) { channels_[i].init(config.server_targets(i % config.server_targets_size()), config, create_stub_); diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc index 0ba46d3d0a..5fd8d555a5 100644 --- a/test/cpp/qps/limit_cores.cc +++ b/test/cpp/qps/limit_cores.cc @@ -46,23 +46,31 @@ namespace testing { #define _GNU_SOURCE #endif #include -int LimitCores(std::vector cores) { - size_t num_cores = static_cast(gpr_cpu_num_cores()); - if (num_cores > cores.size()) { - cpu_set_t *cpup = CPU_ALLOC(num_cores); - GPR_ASSERT(cpup); - size_t size = CPU_ALLOC_SIZE(num_cores); - CPU_ZERO_S(size, cpup); +int LimitCores(const int *cores, int cores_size) { + int num_cores = gpr_cpu_num_cores(); + int cores_set = 0; - for (size_t i = 0; i < cores.size(); i++) { - CPU_SET_S(cores[i], size, cpup); + cpu_set_t *cpup = CPU_ALLOC(num_cores); + GPR_ASSERT(cpup); + size_t size = CPU_ALLOC_SIZE(num_cores); + CPU_ZERO_S(size, cpup); + + if (cores_size > 0) { + for (int i = 0; i < cores_size; i++) { + if (cores[i] < num_cores) { + CPU_SET_S(cores[i], size, cpup); + cores_set++; + } } - GPR_ASSERT(sched_setaffinity(0, size, cpup) == 0); - CPU_FREE(cpup); - return cores.size(); } else { - return num_cores; + for (int i = 0; i < num_cores; i++) { + CPU_SET_S(i, size, cpup); + cores_set++; + } } + GPR_ASSERT(sched_setaffinity(0, size, cpup) == 0); + CPU_FREE(cpup); + return cores_set; } #else // LimitCores is not currently supported for non-Linux platforms diff --git a/test/cpp/qps/limit_cores.h b/test/cpp/qps/limit_cores.h index 54c805216c..5467f3b881 100644 --- a/test/cpp/qps/limit_cores.h +++ b/test/cpp/qps/limit_cores.h @@ -38,7 +38,10 @@ namespace grpc { namespace testing { -int LimitCores(std::vector core_vec); +// LimitCores takes array and size arguments (instead of vector) for more direct +// conversion from repeated field of protobuf. Use a cores_size of 0 to remove +// existing limits (from an empty repeated field) +int LimitCores(const int *cores, int cores_size); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 5cb5850fd4..6289c1a843 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -87,10 +87,6 @@ static std::unique_ptr CreateServer(const ServerConfig& config) { gpr_log(GPR_INFO, "Starting server of type %s", ServerType_Name(config.server_type()).c_str()); - if (config.core_limit() > 0) { - LimitCores(config.core_limit()); - } - switch (config.server_type()) { case ServerType::SYNC_SERVER: return CreateSynchronousServer(config); diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index bc6f9f99e3..94a6f8acfa 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -51,18 +51,10 @@ namespace testing { class Server { public: explicit Server(const ServerConfig& config) : timer_(new Timer) { - int clsize = config.core_list_size(); - if (clsize > 0) { - std::vector core_list; - for (int i = 0; i < clsize; i++) { - core_list.push_back(config.core_list(i)); - } - cores_ = LimitCores(core_list); - } else { - cores_ = gpr_cpu_num_cores(); - } + cores_ = LimitCores(config.core_list().data(), config.core_list_size()); if (config.port()) { port_ = config.port(); + } else { port_ = grpc_pick_unused_port_or_die(); } -- cgit v1.2.3 From c5eec2be89bb201f84b7278da5522a205dc3620b Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 1 Feb 2016 09:46:30 -0800 Subject: Make dynamic sizing of async threads work again --- test/cpp/qps/client.h | 3 ++- test/cpp/qps/client_async.cc | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) (limited to 'test') diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 7d5f6466f9..50b2bf2514 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -324,7 +324,7 @@ class ClientImpl : public Client { std::function(std::shared_ptr)> create_stub) : channels_(config.client_channels()), create_stub_(create_stub) { - LimitCores(config.core_list().data(), config.core_list_size()); + cores_ = LimitCores(config.core_list().data(), config.core_list_size()); for (int i = 0; i < config.client_channels(); i++) { channels_[i].init(config.server_targets(i % config.server_targets_size()), @@ -337,6 +337,7 @@ class ClientImpl : public Client { virtual ~ClientImpl() {} protected: + int cores_; RequestType request_; class ClientChannelInfo { diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 4229e1956e..f3f8f37051 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -159,6 +159,7 @@ class AsyncClient : public ClientImpl { using Client::SetupLoadTest; using Client::NextIssueTime; using Client::closed_loop_; + using ClientImpl::cores_; using ClientImpl::channels_; using ClientImpl::request_; AsyncClient(const ClientConfig& config, @@ -345,11 +346,11 @@ class AsyncClient : public ClientImpl { private: bool val_; }; - static int NumThreads(const ClientConfig& config) { + int NumThreads(const ClientConfig& config) { int num_threads = config.async_client_threads(); if (num_threads <= 0) { // Use dynamic sizing - num_threads = gpr_cpu_num_cores(); - gpr_log(GPR_INFO, "Sizing client server to %d threads", num_threads); + num_threads = cores_; + gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads); } return num_threads; } -- cgit v1.2.3 From 7d45cdb60b1f6f9cd8750f405d74362600b82c16 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 1 Feb 2016 13:00:19 -0800 Subject: Enable properly working core limits on clients and servers, and determine these dynamically if only one or the other is specified but both are running on the same host --- src/proto/grpc/testing/control.proto | 12 +++- src/proto/grpc/testing/services.proto | 3 + test/cpp/qps/driver.cc | 126 ++++++++++++++++++++++++++++++---- test/cpp/qps/qps_driver.cc | 36 ++-------- test/cpp/qps/qps_worker.cc | 7 ++ 5 files changed, 140 insertions(+), 44 deletions(-) (limited to 'test') diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 40e8a97471..c857e4d210 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -110,6 +110,7 @@ message ClientConfig { // Specify the cores we should run the client on, if desired repeated int32 core_list = 13; + int32 core_limit = 14; } message ClientStatus { ClientStats stats = 1; } @@ -139,6 +140,7 @@ message ServerConfig { // Specify the cores we should run the server on, if desired repeated int32 core_list = 10; + int32 core_limit = 11; } message ServerArgs { @@ -152,6 +154,14 @@ message ServerStatus { ServerStats stats = 1; // the port bound by the server int32 port = 2; - // Number of cores on the server. See gpr_cpu_num_cores. + // Number of cores available to the server int32 cores = 3; } + +message CoreRequest { +} + +message CoreResponse { + // Number of cores available on the server + int32 cores = 1; +} diff --git a/src/proto/grpc/testing/services.proto b/src/proto/grpc/testing/services.proto index af285ceab8..59269a23ca 100644 --- a/src/proto/grpc/testing/services.proto +++ b/src/proto/grpc/testing/services.proto @@ -62,4 +62,7 @@ service WorkerService { // and once the shutdown has finished, the OK status is sent to terminate // this RPC. rpc RunClient(stream ClientArgs) returns (stream ClientStatus); + + // Just return the core count - unary call + rpc CoreCount(CoreRequest) returns (CoreResponse); } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 490156aec2..f3b92c8082 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -59,7 +60,42 @@ using std::vector; namespace grpc { namespace testing { -static deque get_hosts(const string& name) { +static std::string get_host(const std::string &worker) { + char *host; + char *port; + + gpr_split_host_port(worker.c_str(), &host, &port); + string s(host); + + gpr_free(host); + gpr_free(port); + return s; +} + +static std::unordered_map> + get_hosts_and_cores(const deque& workers) { + std::unordered_map> hosts; + for (auto it = workers.begin(); it != workers.end(); it++) { + string host = get_host(*it); + if (hosts.find(host) == hosts.end()) { + auto stub = WorkerService::NewStub( + CreateChannel(*it, InsecureChannelCredentials())); + grpc::ClientContext ctx; + CoreRequest dummy; + CoreResponse cores; + grpc::Status s = stub->CoreCount(&ctx, dummy, &cores); + assert(s.ok()); + std::deque dq; + for (int i=0; i get_workers(const string& name) { char* env = gpr_getenv(name.c_str()); if (!env) return deque(); @@ -105,7 +141,7 @@ struct ClientData { std::unique_ptr RunScenario( const ClientConfig& initial_client_config, size_t num_clients, - const ServerConfig& server_config, size_t num_servers, int warmup_seconds, + const ServerConfig& initial_server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) { // ClientContext allocations (all are destroyed at scope exit) list contexts; @@ -113,10 +149,10 @@ std::unique_ptr RunScenario( // To be added to the result, containing the final configuration used for // client and config (including host, etc.) ClientConfig result_client_config; - ServerConfig result_server_config; + ServerConfig result_server_config = initial_server_config; // Get client, server lists - auto workers = get_hosts("QPS_WORKERS"); + auto workers = get_workers("QPS_WORKERS"); ClientConfig client_config = initial_client_config; // Spawn some local workers if desired @@ -143,6 +179,9 @@ std::unique_ptr RunScenario( } } + // Setup the hosts and core counts + auto hosts_cores = get_hosts_and_cores(workers); + // if num_clients is set to <=0, do dynamic sizing: all workers // except for servers are clients if (num_clients <= 0) { @@ -172,18 +211,49 @@ std::unique_ptr RunScenario( i); servers[i].stub = WorkerService::NewStub( CreateChannel(workers[i], InsecureChannelCredentials())); + + ServerConfig server_config = initial_server_config; + char* host; + char* driver_port; + char* cli_target; + gpr_split_host_port(workers[i].c_str(), &host, &driver_port); + string host_str(host); + int server_core_limit = initial_server_config.core_limit(); + int client_core_limit = initial_client_config.core_limit(); + + if (server_core_limit == 0 && client_core_limit > 0) { + // In this case, limit the server cores if it matches the + // same host as one or more clients + const auto& dq = hosts_cores[host_str]; + bool match = false; + int limit = dq.size(); + for (size_t cli = 0; cli < num_clients; cli++) { + if (host_str == get_host(workers[cli+num_servers])) { + limit -= client_core_limit; + match = true; + } + } + if (match) { + GPR_ASSERT(limit > 0); + server_core_limit = limit; + } + } + if (server_core_limit > 0) { + auto& dq = hosts_cores[host_str]; + GPR_ASSERT(dq.size() >= static_cast(server_core_limit)); + for (int core=0; core < server_core_limit; core++) { + server_config.add_core_list(dq.front()); + dq.pop_front(); + } + } + ServerArgs args; - result_server_config = server_config; *args.mutable_setup() = server_config; servers[i].stream = servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline)); GPR_ASSERT(servers[i].stream->Write(args)); ServerStatus init_status; GPR_ASSERT(servers[i].stream->Read(&init_status)); - char* host; - char* driver_port; - char* cli_target; - gpr_split_host_port(workers[i].c_str(), &host, &driver_port); gpr_join_host_port(&cli_target, host, init_status.port()); client_config.add_server_targets(cli_target); gpr_free(host); @@ -191,19 +261,49 @@ std::unique_ptr RunScenario( gpr_free(cli_target); } + // Targets are all set by now + result_client_config = client_config; // Start clients using runsc::ClientData; // clients is array rather than std::vector to avoid gcc-4.4 issues // where class contained in std::vector must have a copy constructor auto* clients = new ClientData[num_clients]; for (size_t i = 0; i < num_clients; i++) { + const auto& worker = workers[i + num_servers]; gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", - workers[i + num_servers].c_str(), i + num_servers); + worker.c_str(), i + num_servers); clients[i].stub = WorkerService::NewStub( - CreateChannel(workers[i + num_servers], InsecureChannelCredentials())); + CreateChannel(worker, InsecureChannelCredentials())); + ClientConfig per_client_config = client_config; + + int server_core_limit = initial_server_config.core_limit(); + int client_core_limit = initial_client_config.core_limit(); + if ((server_core_limit > 0) || (client_core_limit > 0)) { + auto& dq = hosts_cores[get_host(worker)]; + if (client_core_limit == 0) { + // limit client cores if it matches a server host + bool match = false; + int limit = dq.size(); + for (size_t srv = 0; srv < num_servers; srv++) { + if (get_host(worker) == get_host(workers[srv])) { + match = true; + } + } + if (match) { + client_core_limit = limit; + } + } + if (client_core_limit > 0) { + GPR_ASSERT(dq.size() >= static_cast(client_core_limit)); + for (int core=0; core < client_core_limit; core++) { + per_client_config.add_core_list(dq.front()); + dq.pop_front(); + } + } + } + ClientArgs args; - result_client_config = client_config; - *args.mutable_setup() = client_config; + *args.mutable_setup() = per_client_config; clients[i].stream = clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline)); GPR_ASSERT(clients[i].stream->Write(args)); diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index 6462050b6c..ffc8a83fc5 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -51,8 +51,7 @@ DEFINE_int32(local_workers, 0, "Number of local workers to start"); // Server config DEFINE_int32(async_server_threads, 1, "Number of threads for async servers"); DEFINE_string(server_type, "SYNC_SERVER", "Server type"); -// TODO (vpai): Automatically generate the core list to avoid breakage -DEFINE_string(server_core_list, "", "Comma-separated list of cores for server"); +DEFINE_int32(server_core_limit, -1, "Limit on server cores to use"); // Client config DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING"); @@ -75,8 +74,7 @@ DEFINE_double(determ_load, -1.0, "Deterministic offered load (qps)"); DEFINE_double(pareto_base, -1.0, "Pareto base interarrival time (us)"); DEFINE_double(pareto_alpha, -1.0, "Pareto alpha value"); -// TODO (vpai): Automatically generate the core list to avoid breakage -DEFINE_string(client_core_list, "", "Comma-separated list of cores for client"); +DEFINE_int32(client_core_limit, -1, "Limit on client cores to use"); DEFINE_bool(secure_test, false, "Run a secure test"); @@ -91,22 +89,6 @@ using grpc::testing::SecurityParams; namespace grpc { namespace testing { -static std::vector IntParse(const std::string& s) { - size_t pos = 0; - std::vector res; - while (pos < s.size()) { - size_t comma = s.find(',', pos); - if (comma == std::string::npos) { - res.push_back(std::stoi(s.substr(pos))); - break; - } else { - res.push_back(std::stoi(s.substr(pos, comma - pos), nullptr)); - pos = comma + 1; - } - } - return res; -} - static void QpsDriver() { RpcType rpc_type; GPR_ASSERT(RpcType_Parse(FLAGS_rpc_type, &rpc_type)); @@ -170,22 +152,16 @@ static void QpsDriver() { client_config.mutable_histogram_params()->set_max_possible( Histogram::default_max_possible()); - if (FLAGS_client_core_list.size() > 0) { - auto v = IntParse(FLAGS_client_core_list); - for (size_t i = 0; i < v.size(); i++) { - client_config.add_core_list(v[i]); - } + if (FLAGS_client_core_limit > 0) { + client_config.set_core_limit(FLAGS_client_core_limit); } ServerConfig server_config; server_config.set_server_type(server_type); server_config.set_async_server_threads(FLAGS_async_server_threads); - if (FLAGS_server_core_list.size() > 0) { - auto v = IntParse(FLAGS_server_core_list); - for (size_t i = 0; i < v.size(); i++) { - server_config.add_core_list(v[i]); - } + if (FLAGS_server_core_limit > 0) { + server_config.set_core_limit(FLAGS_server_core_limit); } if (FLAGS_secure_test) { diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 6289c1a843..d0adbb1a54 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -47,6 +47,7 @@ #include #include #include +#include #include #include #include @@ -133,6 +134,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { return ret; } + Status CoreCount(ServerContext *ctx, const CoreRequest*, + CoreResponse* resp) GRPC_OVERRIDE { + resp->set_cores(gpr_cpu_num_cores()); + return Status::OK; + } + private: // Protect against multiple clients using this worker at once. class InstanceGuard { -- cgit v1.2.3 From 6b05639ab1fbde6efe4bd702f6267230f470425b Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 1 Feb 2016 13:01:34 -0800 Subject: Make sure that client limit is valid --- test/cpp/qps/driver.cc | 1 + 1 file changed, 1 insertion(+) (limited to 'test') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index f3b92c8082..a00d8643f4 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -290,6 +290,7 @@ std::unique_ptr RunScenario( } } if (match) { + GPR_ASSERT(limit > 0); client_core_limit = limit; } } -- cgit v1.2.3 From 595674275bbf54033bebdca9029e0df6245ee67c Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 1 Feb 2016 13:28:55 -0800 Subject: Copyright and clang-format --- src/proto/grpc/testing/services.proto | 2 +- test/cpp/qps/driver.cc | 28 ++++++++++++++-------------- test/cpp/qps/qps_worker.cc | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) (limited to 'test') diff --git a/src/proto/grpc/testing/services.proto b/src/proto/grpc/testing/services.proto index 59269a23ca..4c8e32bb8f 100644 --- a/src/proto/grpc/testing/services.proto +++ b/src/proto/grpc/testing/services.proto @@ -1,4 +1,4 @@ -// Copyright 2015, Google Inc. +// Copyright 2015-2016, Google Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index a00d8643f4..57b85b107f 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -60,9 +60,9 @@ using std::vector; namespace grpc { namespace testing { -static std::string get_host(const std::string &worker) { - char *host; - char *port; +static std::string get_host(const std::string& worker) { + char* host; + char* port; gpr_split_host_port(worker.c_str(), &host, &port); string s(host); @@ -72,9 +72,9 @@ static std::string get_host(const std::string &worker) { return s; } -static std::unordered_map> - get_hosts_and_cores(const deque& workers) { - std::unordered_map> hosts; +static std::unordered_map> get_hosts_and_cores( + const deque& workers) { + std::unordered_map> hosts; for (auto it = workers.begin(); it != workers.end(); it++) { string host = get_host(*it); if (hosts.find(host) == hosts.end()) { @@ -86,7 +86,7 @@ static std::unordered_map> grpc::Status s = stub->CoreCount(&ctx, dummy, &cores); assert(s.ok()); std::deque dq; - for (int i=0; i RunScenario( const ClientConfig& initial_client_config, size_t num_clients, - const ServerConfig& initial_server_config, size_t num_servers, int warmup_seconds, - int benchmark_seconds, int spawn_local_worker_count) { + const ServerConfig& initial_server_config, size_t num_servers, + int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) { // ClientContext allocations (all are destroyed at scope exit) list contexts; @@ -228,7 +228,7 @@ std::unique_ptr RunScenario( bool match = false; int limit = dq.size(); for (size_t cli = 0; cli < num_clients; cli++) { - if (host_str == get_host(workers[cli+num_servers])) { + if (host_str == get_host(workers[cli + num_servers])) { limit -= client_core_limit; match = true; } @@ -241,7 +241,7 @@ std::unique_ptr RunScenario( if (server_core_limit > 0) { auto& dq = hosts_cores[host_str]; GPR_ASSERT(dq.size() >= static_cast(server_core_limit)); - for (int core=0; core < server_core_limit; core++) { + for (int core = 0; core < server_core_limit; core++) { server_config.add_core_list(dq.front()); dq.pop_front(); } @@ -270,8 +270,8 @@ std::unique_ptr RunScenario( auto* clients = new ClientData[num_clients]; for (size_t i = 0; i < num_clients; i++) { const auto& worker = workers[i + num_servers]; - gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", - worker.c_str(), i + num_servers); + gpr_log(GPR_INFO, "Starting client on %s (worker #%d)", worker.c_str(), + i + num_servers); clients[i].stub = WorkerService::NewStub( CreateChannel(worker, InsecureChannelCredentials())); ClientConfig per_client_config = client_config; @@ -296,7 +296,7 @@ std::unique_ptr RunScenario( } if (client_core_limit > 0) { GPR_ASSERT(dq.size() >= static_cast(client_core_limit)); - for (int core=0; core < client_core_limit; core++) { + for (int core = 0; core < client_core_limit; core++) { per_client_config.add_core_list(dq.front()); dq.pop_front(); } diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index d0adbb1a54..7e9e05f7ec 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -134,7 +134,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { return ret; } - Status CoreCount(ServerContext *ctx, const CoreRequest*, + Status CoreCount(ServerContext* ctx, const CoreRequest*, CoreResponse* resp) GRPC_OVERRIDE { resp->set_cores(gpr_cpu_num_cores()); return Status::OK; -- cgit v1.2.3 From 0d7a070e7e51f87bbcdaf72b5a25d0430f82093b Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 1 Feb 2016 14:10:47 -0800 Subject: Add core-limited scenario and protobuf-based QPS scenario --- test/cpp/qps/qps-sweep.sh | 14 ++++++++++++++ 1 file changed, 14 insertions(+) (limited to 'test') diff --git a/test/cpp/qps/qps-sweep.sh b/test/cpp/qps/qps-sweep.sh index 333f4bd7d0..539da1d893 100755 --- a/test/cpp/qps/qps-sweep.sh +++ b/test/cpp/qps/qps-sweep.sh @@ -57,6 +57,20 @@ for secure in true false; do --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=0 + # Scenario 2b: QPS with a single server core + "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ + --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=100 \ + --client_channels=64 --bbuf_req_size=0 --bbuf_resp_size=0 \ + --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ + --num_servers=1 --num_clients=0 --server_core_limit=1 + + # Scenario 2c: protobuf-based QPS + "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ + --server_type=ASYNC_SERVER --outstanding_rpcs_per_channel=100 \ + --client_channels=64 --simple_req_size=0 --simple_resp_size=0 \ + --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ + --num_servers=1 --num_clients=0 + # Scenario 3: Latency at near-peak load (TBD) # Scenario 4: Single-channel bidirectional throughput test (like TCP_STREAM). -- cgit v1.2.3 From 33e51184fcb4bb021a6b2878d90a2d19c53821ff Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 1 Feb 2016 16:40:06 -0800 Subject: Address reviewer comments regarding const and shortage of comments --- test/cpp/qps/driver.cc | 10 +++++----- test/cpp/qps/limit_cores.cc | 4 ++-- test/cpp/qps/limit_cores.h | 9 ++++++--- 3 files changed, 13 insertions(+), 10 deletions(-) (limited to 'test') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 57b85b107f..9eef4076d9 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -65,7 +65,7 @@ static std::string get_host(const std::string& worker) { char* port; gpr_split_host_port(worker.c_str(), &host, &port); - string s(host); + const string s(host); gpr_free(host); gpr_free(port); @@ -76,7 +76,7 @@ static std::unordered_map> get_hosts_and_cores( const deque& workers) { std::unordered_map> hosts; for (auto it = workers.begin(); it != workers.end(); it++) { - string host = get_host(*it); + const string host = get_host(*it); if (hosts.find(host) == hosts.end()) { auto stub = WorkerService::NewStub( CreateChannel(*it, InsecureChannelCredentials())); @@ -149,7 +149,7 @@ std::unique_ptr RunScenario( // To be added to the result, containing the final configuration used for // client and config (including host, etc.) ClientConfig result_client_config; - ServerConfig result_server_config = initial_server_config; + const ServerConfig result_server_config = initial_server_config; // Get client, server lists auto workers = get_workers("QPS_WORKERS"); @@ -224,7 +224,7 @@ std::unique_ptr RunScenario( if (server_core_limit == 0 && client_core_limit > 0) { // In this case, limit the server cores if it matches the // same host as one or more clients - const auto& dq = hosts_cores[host_str]; + const auto& dq = hosts_cores.at(host_str); bool match = false; int limit = dq.size(); for (size_t cli = 0; cli < num_clients; cli++) { @@ -239,7 +239,7 @@ std::unique_ptr RunScenario( } } if (server_core_limit > 0) { - auto& dq = hosts_cores[host_str]; + auto& dq = hosts_cores.at(host_str); GPR_ASSERT(dq.size() >= static_cast(server_core_limit)); for (int core = 0; core < server_core_limit; core++) { server_config.add_core_list(dq.front()); diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc index 5fd8d555a5..c2f3ad8fde 100644 --- a/test/cpp/qps/limit_cores.cc +++ b/test/cpp/qps/limit_cores.cc @@ -47,12 +47,12 @@ namespace testing { #endif #include int LimitCores(const int *cores, int cores_size) { - int num_cores = gpr_cpu_num_cores(); + const int num_cores = gpr_cpu_num_cores(); int cores_set = 0; cpu_set_t *cpup = CPU_ALLOC(num_cores); GPR_ASSERT(cpup); - size_t size = CPU_ALLOC_SIZE(num_cores); + const size_t size = CPU_ALLOC_SIZE(num_cores); CPU_ZERO_S(size, cpup); if (cores_size > 0) { diff --git a/test/cpp/qps/limit_cores.h b/test/cpp/qps/limit_cores.h index 5467f3b881..5c0d1e315d 100644 --- a/test/cpp/qps/limit_cores.h +++ b/test/cpp/qps/limit_cores.h @@ -38,9 +38,12 @@ namespace grpc { namespace testing { -// LimitCores takes array and size arguments (instead of vector) for more direct -// conversion from repeated field of protobuf. Use a cores_size of 0 to remove -// existing limits (from an empty repeated field) +/// LimitCores: allow this worker to only run on the cores specified in the +/// array \a cores, which is of length \a cores_size. +/// +/// LimitCores takes array and size arguments (instead of vector) for direct +/// conversion from repeated field of protobuf. Use a cores_size of 0 to remove +/// existing limits (from an empty repeated field) int LimitCores(const int *cores, int cores_size); } // namespace testing } // namespace grpc -- cgit v1.2.3 From daadcc8f745fff4149c37a4ba9467d91694a2c8e Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 1 Feb 2016 16:49:55 -0800 Subject: at --- test/cpp/qps/driver.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test') diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 9eef4076d9..c70b0303b8 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -279,7 +279,7 @@ std::unique_ptr RunScenario( int server_core_limit = initial_server_config.core_limit(); int client_core_limit = initial_client_config.core_limit(); if ((server_core_limit > 0) || (client_core_limit > 0)) { - auto& dq = hosts_cores[get_host(worker)]; + auto& dq = hosts_cores.at(get_host(worker)); if (client_core_limit == 0) { // limit client cores if it matches a server host bool match = false; -- cgit v1.2.3