diff options
author | 2016-11-30 13:41:53 -0800 | |
---|---|---|
committer | 2016-11-30 13:41:53 -0800 | |
commit | 36b3135929cf1561d35039fc9e04e038f5351ed7 (patch) | |
tree | b20b4f6ab4043fae9a170adb711b488be0bd1a59 /test/cpp | |
parent | 0f97958b64a792e551aa3bde84bb8f53b04de3b4 (diff) | |
parent | 369ddc524a6ca55afb8e8bd6743ed5624e1a94ce (diff) |
Merge branch 'slice_interning' into metadata_filter
Diffstat (limited to 'test/cpp')
-rw-r--r-- | test/cpp/grpclb/grpclb_test.cc | 59 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_fullstack.cc | 15 | ||||
-rwxr-xr-x | test/cpp/qps/gen_build_yaml.py | 61 | ||||
-rw-r--r-- | test/cpp/qps/json_run_localhost.cc | 73 |
4 files changed, 151 insertions, 57 deletions
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 57a53ca11e..fcdcaba6a2 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -523,9 +523,8 @@ static void perform_request(client_fixture *cf) { CQ_EXPECT_COMPLETION(cqv, tag(2), 1); cq_verify(cqv); + gpr_log(GPR_INFO, "Client after sending msg %d / 4", i + 1); GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD)); - GPR_ASSERT(grpc_channel_check_connectivity_state( - cf->client, 0 /* try to connect */) == GRPC_CHANNEL_READY); grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(response_payload_recv); @@ -546,16 +545,17 @@ static void perform_request(client_fixture *cf) { cq_verify(cqv); peer = grpc_call_get_peer(c); gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer); - gpr_free(peer); grpc_call_destroy(c); - cq_verify_empty_timeout(cqv, 1); + cq_verify_empty_timeout(cqv, 1 /* seconds */); cq_verifier_destroy(cqv); grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&trailing_metadata_recv); gpr_free(details); + gpr_log(GPR_INFO, "Client call (peer %s) DESTROYED.", peer); + gpr_free(peer); } static void setup_client(const char *server_hostport, client_fixture *cf) { @@ -699,39 +699,42 @@ static test_fixture test_update(int lb_server_update_delay_ms) { TEST(GrpclbTest, Updates) { grpc::test_fixture tf_result; - // Clients take a bit over one second to complete a call (the last part of the + // Clients take at least one second to complete a call (the last part of the // call sleeps for 1 second while verifying the client's completion queue is - // empty). Therefore: + // empty), more if the system is under load. Therefore: // // If the LB server waits 800ms before sending an update, it will arrive - // before the first client request is done, skipping the second server from - // batch 1 altogether: the 2nd client request will go to the 1st server of - // batch 2 (ie, the third one out of the four total servers). + // before the first client request finishes, skipping the second server from + // batch 1. All subsequent picks will come from the second half of the + // backends, those coming in the LB update. tf_result = grpc::test_update(800); GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0); - GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2); - GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced + + tf_result.lb_backends[3].num_calls_serviced > + 0); + int num_serviced_calls = 0; + for (int i = 0; i < 4; i++) { + num_serviced_calls += tf_result.lb_backends[i].num_calls_serviced; + } + GPR_ASSERT(num_serviced_calls == 4); - // If the LB server waits 1500ms, the update arrives after having picked the - // 2nd server from batch 1 but before the next pick for the first server of - // batch 2. All server are used. - tf_result = grpc::test_update(1500); - GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); - GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1); - GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1); - GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1); - - // If the LB server waits > 2000ms, the update arrives after the first two - // request are done and the third pick is performed, which returns, in RR - // fashion, the 1st server of the 1st update. Therefore, the second server of - // batch 1 is hit at least one, whereas the first server of batch 2 is never - // hit. + // If the LB server waits 2500ms, the update arrives after two calls and three + // picks. The third pick will be the 1st server of the 1st update (RR policy + // going around). The fourth and final pick will come from the second LB + // update. In any case, the total number of serviced calls must again be equal + // to four across all the backends. tf_result = grpc::test_update(2500); GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced >= 1); - GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0); - GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0); - GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0); + GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced + + tf_result.lb_backends[3].num_calls_serviced > + 0); + num_serviced_calls = 0; + for (int i = 0; i < 4; i++) { + num_serviced_calls += tf_result.lb_backends[i].num_calls_serviced; + } + GPR_ASSERT(num_serviced_calls == 4); } TEST(GrpclbTest, InvalidAddressInServerlist) {} diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc index cd8087e65b..e7b05b803f 100644 --- a/test/cpp/microbenchmarks/bm_fullstack.cc +++ b/test/cpp/microbenchmarks/bm_fullstack.cc @@ -71,6 +71,8 @@ static class InitializeStuff { rq_ = grpc_resource_quota_create("bm"); } + ~InitializeStuff() { init_lib_.shutdown(); } + grpc_resource_quota* rq() { return rq_; } private: @@ -126,7 +128,16 @@ class TCP : public FullstackFixture { class UDS : public FullstackFixture { public: - UDS(Service* service) : FullstackFixture(service, "unix:bm_fullstack") {} + UDS(Service* service) : FullstackFixture(service, MakeAddress()) {} + + private: + static grpc::string MakeAddress() { + int port = grpc_pick_unused_port_or_die(); // just for a unique id - not a + // real port + std::stringstream addr; + addr << "unix:/tmp/bm_fullstack." << port; + return addr.str(); + } }; class EndpointPairFixture { @@ -221,7 +232,7 @@ class InProcessCHTTP2 : public EndpointPairFixture { * CONTEXT MUTATORS */ -static const int kPregenerateKeyCount = 1000000; +static const int kPregenerateKeyCount = 100000; template <class F> auto MakeVector(size_t length, F f) -> std::vector<decltype(f())> { diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py index e4d9e7ac58..4aa58d2737 100755 --- a/test/cpp/qps/gen_build_yaml.py +++ b/test/cpp/qps/gen_build_yaml.py @@ -43,28 +43,38 @@ sys.path.append(run_tests_root) import performance.scenario_config as scenario_config -def _scenario_json_string(scenario_json): +configs_from_yaml = yaml.load(open(os.path.join(os.path.dirname(sys.argv[0]), '../../../build.yaml')))['configs'].keys() + +def mutate_scenario(scenario_json, is_tsan): # tweak parameters to get fast test times + scenario_json = dict(scenario_json) scenario_json['warmup_seconds'] = 0 scenario_json['benchmark_seconds'] = 1 - scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(scenario_json)]} + outstanding_rpcs_divisor = 1 + if is_tsan and ( + scenario_json['client_config']['client_type'] == 'SYNC_CLIENT' or + scenario_json['server_config']['server_type'] == 'SYNC_SERVER'): + outstanding_rpcs_divisor = 10 + scenario_json['client_config']['outstanding_rpcs_per_channel'] = max(1, + int(scenario_json['client_config']['outstanding_rpcs_per_channel'] / outstanding_rpcs_divisor)) + return scenario_json + +def _scenario_json_string(scenario_json, is_tsan): + scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(mutate_scenario(scenario_json, is_tsan))]} return json.dumps(scenarios_json) -def threads_of_type(scenario_json, path): - d = scenario_json - for el in path.split('/'): - if el not in d: - return 0 - d = d[el] - return d +def threads_required(scenario_json, where, is_tsan): + scenario_json = mutate_scenario(scenario_json, is_tsan) + if scenario_json['%s_config' % where]['%s_type' % where] == 'ASYNC_%s' % where.upper(): + return scenario_json['%s_config' % where].get('async_%s_threads' % where, 0) + return scenario_json['client_config']['outstanding_rpcs_per_channel'] * scenario_json['client_config']['client_channels'] -def guess_cpu(scenario_json): - client = threads_of_type(scenario_json, 'client_config/async_client_threads') - server = threads_of_type(scenario_json, 'server_config/async_server_threads') +def guess_cpu(scenario_json, is_tsan): + client = threads_required(scenario_json, 'client', is_tsan) + server = threads_required(scenario_json, 'server', is_tsan) # make an arbitrary guess if set to auto-detect # about the size of the jenkins instances we have for unit tests - if client == 0: client = 8 - if server == 0: server = 8 + if client == 0 or server == 0: return 'capacity' return (scenario_json['num_clients'] * client + scenario_json['num_servers'] * server) @@ -73,15 +83,32 @@ print yaml.dump({ { 'name': 'json_run_localhost', 'shortname': 'json_run_localhost:%s' % scenario_json['name'], - 'args': ['--scenarios_json', _scenario_json_string(scenario_json)], + 'args': ['--scenarios_json', _scenario_json_string(scenario_json, False)], + 'ci_platforms': ['linux'], + 'platforms': ['linux'], + 'flaky': False, + 'language': 'c++', + 'boringssl': True, + 'defaults': 'boringssl', + 'cpu_cost': guess_cpu(scenario_json, False), + 'exclude_configs': ['tsan'], + 'timeout_seconds': 6*60 + } + for scenario_json in scenario_config.CXXLanguage().scenarios() + if 'scalable' in scenario_json.get('CATEGORIES', []) + ] + [ + { + 'name': 'json_run_localhost', + 'shortname': 'json_run_localhost:%s' % scenario_json['name'], + 'args': ['--scenarios_json', _scenario_json_string(scenario_json, True)], 'ci_platforms': ['linux'], 'platforms': ['linux'], 'flaky': False, 'language': 'c++', 'boringssl': True, 'defaults': 'boringssl', - 'cpu_cost': guess_cpu(scenario_json), - 'exclude_configs': [], + 'cpu_cost': guess_cpu(scenario_json, True), + 'exclude_configs': sorted(c for c in configs_from_yaml if c != 'tsan'), 'timeout_seconds': 6*60 } for scenario_json in scenario_config.CXXLanguage().scenarios() diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index 74e40fbf1a..b7b2553f12 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -31,7 +31,11 @@ * */ +#include <signal.h> +#include <string.h> + #include <memory> +#include <mutex> #include <sstream> #include <string> @@ -43,6 +47,11 @@ using grpc::SubProcess; +constexpr auto kNumWorkers = 2; + +static SubProcess* g_driver; +static SubProcess* g_workers[kNumWorkers]; + template <class T> std::string as_string(const T& val) { std::ostringstream out; @@ -50,9 +59,38 @@ std::string as_string(const T& val) { return out.str(); } +static void sighandler(int sig) { + const int errno_saved = errno; + if (g_driver != NULL) g_driver->Interrupt(); + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i]) g_workers[i]->Interrupt(); + } + errno = errno_saved; +} + +static void register_sighandler() { + struct sigaction act; + memset(&act, 0, sizeof(act)); + act.sa_handler = sighandler; + + sigaction(SIGINT, &act, NULL); + sigaction(SIGTERM, &act, NULL); +} + +static void LogStatus(int status, const char* label) { + if (WIFEXITED(status)) { + gpr_log(GPR_INFO, "%s: subprocess exited with status %d", label, + WEXITSTATUS(status)); + } else if (WIFSIGNALED(status)) { + gpr_log(GPR_INFO, "%s: subprocess terminated with signal %d", label, + WTERMSIG(status)); + } else { + gpr_log(GPR_INFO, "%s: unknown subprocess status: %d", label, status); + } +} + int main(int argc, char** argv) { - typedef std::unique_ptr<SubProcess> SubProcessPtr; - std::vector<SubProcessPtr> jobs; + register_sighandler(); std::string my_bin = argv[0]; std::string bin_dir = my_bin.substr(0, my_bin.rfind('/')); @@ -60,11 +98,11 @@ int main(int argc, char** argv) { std::ostringstream env; bool first = true; - for (int i = 0; i < 2; i++) { - auto port = grpc_pick_unused_port_or_die(); + for (int i = 0; i < kNumWorkers; i++) { + const auto port = grpc_pick_unused_port_or_die(); std::vector<std::string> args = {bin_dir + "/qps_worker", "-driver_port", as_string(port)}; - jobs.emplace_back(new SubProcess(args)); + g_workers[i] = new SubProcess(args); if (!first) env << ","; env << "localhost:" << port; first = false; @@ -75,12 +113,27 @@ int main(int argc, char** argv) { for (int i = 1; i < argc; i++) { args.push_back(argv[i]); } - GPR_ASSERT(SubProcess(args).Join() == 0); - for (auto it = jobs.begin(); it != jobs.end(); ++it) { - (*it)->Interrupt(); + g_driver = new SubProcess(args); + const int driver_join_status = g_driver->Join(); + if (driver_join_status != 0) { + LogStatus(driver_join_status, "driver"); } - for (auto it = jobs.begin(); it != jobs.end(); ++it) { - (*it)->Join(); + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i]) g_workers[i]->Interrupt(); } + + for (int i = 0; i < kNumWorkers; ++i) { + if (g_workers[i]) { + const int worker_status = g_workers[i]->Join(); + if (worker_status != 0) { + LogStatus(worker_status, "worker"); + } + } + } + + delete g_driver; + g_driver = NULL; + for (int i = 0; i < kNumWorkers; ++i) delete g_workers[i]; + GPR_ASSERT(driver_join_status == 0); } |