diff options
author | Craig Tiller <ctiller@google.com> | 2016-12-27 08:38:35 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-12-27 08:38:35 -0800 |
commit | 0704727c2d5bcdbf7a0c60de5b810ed79186f59e (patch) | |
tree | f09e088a030e6fb29fc82527d6e8c6b480561c98 /test/cpp | |
parent | 6997c3473b0e152b00cb5696cf5717e5adb37cb6 (diff) | |
parent | 22b28264f9242b6d049f9e6d5b792bd5be048a97 (diff) |
Merge github.com:grpc/grpc into slice_with_exec_ctx
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/common/channel_filter_test.cc | 18 | ||||
-rw-r--r-- | test/cpp/end2end/filter_end2end_test.cc | 9 | ||||
-rw-r--r-- | test/cpp/grpclb/grpclb_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/interop/stress_test.cc | 4 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 128 | ||||
-rw-r--r-- | test/cpp/qps/driver.h | 4 | ||||
-rw-r--r-- | test/cpp/qps/qps_json_driver.cc | 18 |
7 files changed, 115 insertions, 68 deletions
diff --git a/test/cpp/common/channel_filter_test.cc b/test/cpp/common/channel_filter_test.cc index 600a953d82..32246a4b76 100644 --- a/test/cpp/common/channel_filter_test.cc +++ b/test/cpp/common/channel_filter_test.cc @@ -41,14 +41,24 @@ namespace testing { class MyChannelData : public ChannelData { public: - MyChannelData(const grpc_channel_args& args, const char* peer) - : ChannelData(args, peer) {} + MyChannelData() {} + + grpc_error* Init(grpc_exec_ctx* exec_ctx, + grpc_channel_element_args* args) override { + (void)args->channel_args; // Make sure field is available. + return GRPC_ERROR_NONE; + } }; class MyCallData : public CallData { public: - explicit MyCallData(const ChannelData& channel_data) - : CallData(channel_data) {} + MyCallData() {} + + grpc_error* Init(grpc_exec_ctx* exec_ctx, ChannelData* channel_data, + grpc_call_element_args* args) override { + (void)args->path; // Make sure field is available. + return GRPC_ERROR_NONE; + } }; // This test ensures that when we make changes to the filter API in diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index ab6ed46de5..bd384f68b4 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -114,20 +114,17 @@ int GetCallCounterValue() { class ChannelDataImpl : public ChannelData { public: - ChannelDataImpl(const grpc_channel_args& args, const char* peer) - : ChannelData(args, peer) { + grpc_error* Init(grpc_exec_ctx* exec_ctx, grpc_channel_element_args* args) { IncrementConnectionCounter(); + return GRPC_ERROR_NONE; } }; class CallDataImpl : public CallData { public: - explicit CallDataImpl(const ChannelDataImpl& channel_data) - : CallData(channel_data) {} - void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, TransportStreamOp* op) override { - // Incrementing the counter could be done from the ctor, but we want + // Incrementing the counter could be done from Init(), but we want // to test that the individual methods are actually called correctly. if (op->recv_initial_metadata() != nullptr) IncrementCallCounter(); grpc_call_next_op(exec_ctx, elem, op->op()); diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index fcdcaba6a2..de304b9f89 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -659,7 +659,7 @@ static test_fixture setup_test_fixture(int lb_server_update_delay_ms) { char *server_uri; // The grpclb LB policy will be automatically selected by virtue of // the fact that the returned addresses are balancer addresses. - gpr_asprintf(&server_uri, "test:%s?lb_enabled=1", + gpr_asprintf(&server_uri, "test:///%s?lb_enabled=1", tf.lb_server.servers_hostport); setup_client(server_uri, &tf.client); gpr_free(server_uri); diff --git a/test/cpp/interop/stress_test.cc b/test/cpp/interop/stress_test.cc index fc35db5233..97e658869f 100644 --- a/test/cpp/interop/stress_test.cc +++ b/test/cpp/interop/stress_test.cc @@ -371,9 +371,9 @@ int main(int argc, char** argv) { } // Start metrics server before waiting for the stress test threads + std::unique_ptr<grpc::Server> metrics_server; if (FLAGS_metrics_port > 0) { - std::unique_ptr<grpc::Server> metrics_server = - metrics_service.StartServer(FLAGS_metrics_port); + metrics_server = metrics_service.StartServer(FLAGS_metrics_port); } // Wait for the stress test threads to complete diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 22b2cd080d..93ef32db77 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -44,6 +44,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/env.h" @@ -99,23 +100,36 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores( return hosts; } -static deque<string> get_workers(const string& name) { - char* env = gpr_getenv(name.c_str()); - if (!env || strlen(env) == 0) return deque<string>(); - +static deque<string> get_workers(const string& env_name) { + char* env = gpr_getenv(env_name.c_str()); + if (!env) { + env = gpr_strdup(""); + } deque<string> out; char* p = env; - for (;;) { - char* comma = strchr(p, ','); - if (comma) { - out.emplace_back(p, comma); - p = comma + 1; - } else { - out.emplace_back(p); - gpr_free(env); - return out; + if (strlen(env) != 0) { + for (;;) { + char* comma = strchr(p, ','); + if (comma) { + out.emplace_back(p, comma); + p = comma + 1; + } else { + out.emplace_back(p); + break; + } } } + if (out.size() == 0) { + gpr_log(GPR_ERROR, + "Environment variable \"%s\" does not contain a list of QPS " + "workers to use. Set it to a comma-separated list of " + "hostname:port pairs, starting with hosts that should act as " + "servers. E.g. export " + "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"", + env_name.c_str(), env_name.c_str()); + } + gpr_free(env); + return out; } // helpers for postprocess_scenario_result @@ -195,7 +209,8 @@ static void postprocess_scenario_result(ScenarioResult* result) { std::unique_ptr<ScenarioResult> RunScenario( const ClientConfig& initial_client_config, size_t num_clients, const ServerConfig& initial_server_config, size_t num_servers, - int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count) { + int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, + const char* qps_server_target_override, bool configure_core_lists) { // Log everything from the driver gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); @@ -240,9 +255,7 @@ std::unique_ptr<ScenarioResult> RunScenario( workers.push_back(addr); } } - - // Setup the hosts and core counts - auto hosts_cores = get_hosts_and_cores(workers); + GPR_ASSERT(workers.size() != 0); // if num_clients is set to <=0, do dynamic sizing: all workers // except for servers are clients @@ -264,6 +277,11 @@ std::unique_ptr<ScenarioResult> RunScenario( unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; }; std::vector<ServerData> servers(num_servers); + std::unordered_map<string, std::deque<int>> hosts_cores; + + if (configure_core_lists) { + hosts_cores = get_hosts_and_cores(workers); + } for (size_t i = 0; i < num_servers; i++) { gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")", workers[i].c_str(), i); @@ -271,37 +289,36 @@ std::unique_ptr<ScenarioResult> RunScenario( CreateChannel(workers[i], InsecureChannelCredentials())); ServerConfig server_config = initial_server_config; - char* host; - char* driver_port; - char* cli_target; - gpr_split_host_port(workers[i].c_str(), &host, &driver_port); - string host_str(host); int server_core_limit = initial_server_config.core_limit(); int client_core_limit = initial_client_config.core_limit(); - if (server_core_limit == 0 && client_core_limit > 0) { - // In this case, limit the server cores if it matches the - // same host as one or more clients - const auto& dq = hosts_cores.at(host_str); - bool match = false; - int limit = dq.size(); - for (size_t cli = 0; cli < num_clients; cli++) { - if (host_str == get_host(workers[cli + num_servers])) { - limit -= client_core_limit; - match = true; + if (configure_core_lists) { + string host_str(get_host(workers[i])); + if (server_core_limit == 0 && client_core_limit > 0) { + // In this case, limit the server cores if it matches the + // same host as one or more clients + const auto& dq = hosts_cores.at(host_str); + bool match = false; + int limit = dq.size(); + for (size_t cli = 0; cli < num_clients; cli++) { + if (host_str == get_host(workers[cli + num_servers])) { + limit -= client_core_limit; + match = true; + } + } + if (match) { + GPR_ASSERT(limit > 0); + server_core_limit = limit; } } - if (match) { - GPR_ASSERT(limit > 0); - server_core_limit = limit; - } - } - if (server_core_limit > 0) { - auto& dq = hosts_cores.at(host_str); - GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit)); - for (int core = 0; core < server_core_limit; core++) { - server_config.add_core_list(dq.front()); - dq.pop_front(); + if (server_core_limit > 0) { + auto& dq = hosts_cores.at(host_str); + GPR_ASSERT(dq.size() >= static_cast<size_t>(server_core_limit)); + gpr_log(GPR_INFO, "Setting server core_list"); + for (int core = 0; core < server_core_limit; core++) { + server_config.add_core_list(dq.front()); + dq.pop_front(); + } } } @@ -315,11 +332,19 @@ std::unique_ptr<ScenarioResult> RunScenario( if (!servers[i].stream->Read(&init_status)) { gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i); } - gpr_join_host_port(&cli_target, host, init_status.port()); - client_config.add_server_targets(cli_target); - gpr_free(host); - gpr_free(driver_port); - gpr_free(cli_target); + if (qps_server_target_override != NULL && + strlen(qps_server_target_override) > 0) { + // overriding the qps server target only works if there is 1 server + GPR_ASSERT(num_servers == 1); + client_config.add_server_targets(qps_server_target_override); + } else { + std::string host; + char* cli_target; + host = get_host(workers[i]); + gpr_join_host_port(&cli_target, host.c_str(), init_status.port()); + client_config.add_server_targets(cli_target); + gpr_free(cli_target); + } } // Targets are all set by now @@ -341,7 +366,8 @@ std::unique_ptr<ScenarioResult> RunScenario( int server_core_limit = initial_server_config.core_limit(); int client_core_limit = initial_client_config.core_limit(); - if ((server_core_limit > 0) || (client_core_limit > 0)) { + if (configure_core_lists && + ((server_core_limit > 0) || (client_core_limit > 0))) { auto& dq = hosts_cores.at(get_host(worker)); if (client_core_limit == 0) { // limit client cores if it matches a server host @@ -359,6 +385,7 @@ std::unique_ptr<ScenarioResult> RunScenario( } if (client_core_limit > 0) { GPR_ASSERT(dq.size() >= static_cast<size_t>(client_core_limit)); + gpr_log(GPR_INFO, "Setting client core_list"); for (int core = 0; core < client_core_limit; core++) { per_client_config.add_core_list(dq.front()); dq.pop_front(); @@ -548,6 +575,9 @@ bool RunQuit() { // Get client, server lists bool result = true; auto workers = get_workers("QPS_WORKERS"); + if (workers.size() == 0) { + return false; + } for (size_t i = 0; i < workers.size(); i++) { auto stub = WorkerService::NewStub( CreateChannel(workers[i], InsecureChannelCredentials())); diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 93f4370caf..b5c8152e1b 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -45,7 +45,9 @@ namespace testing { std::unique_ptr<ScenarioResult> RunScenario( const grpc::testing::ClientConfig& client_config, size_t num_clients, const grpc::testing::ServerConfig& server_config, size_t num_servers, - int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count); + int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count, + const char* qps_server_target_override = "", + bool configure_core_lists = true); bool RunQuit(); } // namespace testing diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc index 31b5917fb7..da835b995a 100644 --- a/test/cpp/qps/qps_json_driver.cc +++ b/test/cpp/qps/qps_json_driver.cc @@ -67,17 +67,25 @@ DEFINE_double(error_tolerance, 0.01, "range is narrower than the error_tolerance computed range, we " "stop the search."); +DEFINE_string(qps_server_target_override, "", + "Override QPS server target to configure in client configs." + "Only applicable if there is a single benchmark server."); +DEFINE_bool(configure_core_lists, true, + "Provide 'core_list' parameters to workers. Value determined " + "by cores available and 'core_limit' parameters of the scenarios."); + namespace grpc { namespace testing { static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario, bool* success) { std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n"; - auto result = - RunScenario(scenario.client_config(), scenario.num_clients(), - scenario.server_config(), scenario.num_servers(), - scenario.warmup_seconds(), scenario.benchmark_seconds(), - scenario.spawn_local_worker_count()); + auto result = RunScenario( + scenario.client_config(), scenario.num_clients(), + scenario.server_config(), scenario.num_servers(), + scenario.warmup_seconds(), scenario.benchmark_seconds(), + scenario.spawn_local_worker_count(), + FLAGS_qps_server_target_override.c_str(), FLAGS_configure_core_lists); // Amend the result with scenario config. Eventually we should adjust // RunScenario contract so we don't need to touch the result here. |