aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/cpp
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-11-30 13:41:53 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-11-30 13:41:53 -0800
commit36b3135929cf1561d35039fc9e04e038f5351ed7 (patch)
treeb20b4f6ab4043fae9a170adb711b488be0bd1a59 /test/cpp
parent0f97958b64a792e551aa3bde84bb8f53b04de3b4 (diff)
parent369ddc524a6ca55afb8e8bd6743ed5624e1a94ce (diff)
Merge branch 'slice_interning' into metadata_filter
Diffstat (limited to 'test/cpp')
-rw-r--r--test/cpp/grpclb/grpclb_test.cc59
-rw-r--r--test/cpp/microbenchmarks/bm_fullstack.cc15
-rwxr-xr-xtest/cpp/qps/gen_build_yaml.py61
-rw-r--r--test/cpp/qps/json_run_localhost.cc73
4 files changed, 151 insertions, 57 deletions
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index 57a53ca11e..fcdcaba6a2 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -523,9 +523,8 @@ static void perform_request(client_fixture *cf) {
CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
cq_verify(cqv);
+ gpr_log(GPR_INFO, "Client after sending msg %d / 4", i + 1);
GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, PAYLOAD));
- GPR_ASSERT(grpc_channel_check_connectivity_state(
- cf->client, 0 /* try to connect */) == GRPC_CHANNEL_READY);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(response_payload_recv);
@@ -546,16 +545,17 @@ static void perform_request(client_fixture *cf) {
cq_verify(cqv);
peer = grpc_call_get_peer(c);
gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer);
- gpr_free(peer);
grpc_call_destroy(c);
- cq_verify_empty_timeout(cqv, 1);
+ cq_verify_empty_timeout(cqv, 1 /* seconds */);
cq_verifier_destroy(cqv);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
gpr_free(details);
+ gpr_log(GPR_INFO, "Client call (peer %s) DESTROYED.", peer);
+ gpr_free(peer);
}
static void setup_client(const char *server_hostport, client_fixture *cf) {
@@ -699,39 +699,42 @@ static test_fixture test_update(int lb_server_update_delay_ms) {
TEST(GrpclbTest, Updates) {
grpc::test_fixture tf_result;
- // Clients take a bit over one second to complete a call (the last part of the
+ // Clients take at least one second to complete a call (the last part of the
// call sleeps for 1 second while verifying the client's completion queue is
- // empty). Therefore:
+ // empty), more if the system is under load. Therefore:
//
// If the LB server waits 800ms before sending an update, it will arrive
- // before the first client request is done, skipping the second server from
- // batch 1 altogether: the 2nd client request will go to the 1st server of
- // batch 2 (ie, the third one out of the four total servers).
+ // before the first client request finishes, skipping the second server from
+ // batch 1. All subsequent picks will come from the second half of the
+ // backends, those coming in the LB update.
tf_result = grpc::test_update(800);
GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0);
- GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2);
- GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced +
+ tf_result.lb_backends[3].num_calls_serviced >
+ 0);
+ int num_serviced_calls = 0;
+ for (int i = 0; i < 4; i++) {
+ num_serviced_calls += tf_result.lb_backends[i].num_calls_serviced;
+ }
+ GPR_ASSERT(num_serviced_calls == 4);
- // If the LB server waits 1500ms, the update arrives after having picked the
- // 2nd server from batch 1 but before the next pick for the first server of
- // batch 2. All server are used.
- tf_result = grpc::test_update(1500);
- GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
- GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
- GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1);
- GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
-
- // If the LB server waits > 2000ms, the update arrives after the first two
- // request are done and the third pick is performed, which returns, in RR
- // fashion, the 1st server of the 1st update. Therefore, the second server of
- // batch 1 is hit at least one, whereas the first server of batch 2 is never
- // hit.
+ // If the LB server waits 2500ms, the update arrives after two calls and three
+ // picks. The third pick will be the 1st server of the 1st update (RR policy
+ // going around). The fourth and final pick will come from the second LB
+ // update. In any case, the total number of serviced calls must again be equal
+ // to four across all the backends.
tf_result = grpc::test_update(2500);
GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced >= 1);
- GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0);
- GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0);
- GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0);
+ GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced +
+ tf_result.lb_backends[3].num_calls_serviced >
+ 0);
+ num_serviced_calls = 0;
+ for (int i = 0; i < 4; i++) {
+ num_serviced_calls += tf_result.lb_backends[i].num_calls_serviced;
+ }
+ GPR_ASSERT(num_serviced_calls == 4);
}
TEST(GrpclbTest, InvalidAddressInServerlist) {}
diff --git a/test/cpp/microbenchmarks/bm_fullstack.cc b/test/cpp/microbenchmarks/bm_fullstack.cc
index cd8087e65b..e7b05b803f 100644
--- a/test/cpp/microbenchmarks/bm_fullstack.cc
+++ b/test/cpp/microbenchmarks/bm_fullstack.cc
@@ -71,6 +71,8 @@ static class InitializeStuff {
rq_ = grpc_resource_quota_create("bm");
}
+ ~InitializeStuff() { init_lib_.shutdown(); }
+
grpc_resource_quota* rq() { return rq_; }
private:
@@ -126,7 +128,16 @@ class TCP : public FullstackFixture {
class UDS : public FullstackFixture {
public:
- UDS(Service* service) : FullstackFixture(service, "unix:bm_fullstack") {}
+ UDS(Service* service) : FullstackFixture(service, MakeAddress()) {}
+
+ private:
+ static grpc::string MakeAddress() {
+ int port = grpc_pick_unused_port_or_die(); // just for a unique id - not a
+ // real port
+ std::stringstream addr;
+ addr << "unix:/tmp/bm_fullstack." << port;
+ return addr.str();
+ }
};
class EndpointPairFixture {
@@ -221,7 +232,7 @@ class InProcessCHTTP2 : public EndpointPairFixture {
* CONTEXT MUTATORS
*/
-static const int kPregenerateKeyCount = 1000000;
+static const int kPregenerateKeyCount = 100000;
template <class F>
auto MakeVector(size_t length, F f) -> std::vector<decltype(f())> {
diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py
index e4d9e7ac58..4aa58d2737 100755
--- a/test/cpp/qps/gen_build_yaml.py
+++ b/test/cpp/qps/gen_build_yaml.py
@@ -43,28 +43,38 @@ sys.path.append(run_tests_root)
import performance.scenario_config as scenario_config
-def _scenario_json_string(scenario_json):
+configs_from_yaml = yaml.load(open(os.path.join(os.path.dirname(sys.argv[0]), '../../../build.yaml')))['configs'].keys()
+
+def mutate_scenario(scenario_json, is_tsan):
# tweak parameters to get fast test times
+ scenario_json = dict(scenario_json)
scenario_json['warmup_seconds'] = 0
scenario_json['benchmark_seconds'] = 1
- scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(scenario_json)]}
+ outstanding_rpcs_divisor = 1
+ if is_tsan and (
+ scenario_json['client_config']['client_type'] == 'SYNC_CLIENT' or
+ scenario_json['server_config']['server_type'] == 'SYNC_SERVER'):
+ outstanding_rpcs_divisor = 10
+ scenario_json['client_config']['outstanding_rpcs_per_channel'] = max(1,
+ int(scenario_json['client_config']['outstanding_rpcs_per_channel'] / outstanding_rpcs_divisor))
+ return scenario_json
+
+def _scenario_json_string(scenario_json, is_tsan):
+ scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(mutate_scenario(scenario_json, is_tsan))]}
return json.dumps(scenarios_json)
-def threads_of_type(scenario_json, path):
- d = scenario_json
- for el in path.split('/'):
- if el not in d:
- return 0
- d = d[el]
- return d
+def threads_required(scenario_json, where, is_tsan):
+ scenario_json = mutate_scenario(scenario_json, is_tsan)
+ if scenario_json['%s_config' % where]['%s_type' % where] == 'ASYNC_%s' % where.upper():
+ return scenario_json['%s_config' % where].get('async_%s_threads' % where, 0)
+ return scenario_json['client_config']['outstanding_rpcs_per_channel'] * scenario_json['client_config']['client_channels']
-def guess_cpu(scenario_json):
- client = threads_of_type(scenario_json, 'client_config/async_client_threads')
- server = threads_of_type(scenario_json, 'server_config/async_server_threads')
+def guess_cpu(scenario_json, is_tsan):
+ client = threads_required(scenario_json, 'client', is_tsan)
+ server = threads_required(scenario_json, 'server', is_tsan)
# make an arbitrary guess if set to auto-detect
# about the size of the jenkins instances we have for unit tests
- if client == 0: client = 8
- if server == 0: server = 8
+ if client == 0 or server == 0: return 'capacity'
return (scenario_json['num_clients'] * client +
scenario_json['num_servers'] * server)
@@ -73,15 +83,32 @@ print yaml.dump({
{
'name': 'json_run_localhost',
'shortname': 'json_run_localhost:%s' % scenario_json['name'],
- 'args': ['--scenarios_json', _scenario_json_string(scenario_json)],
+ 'args': ['--scenarios_json', _scenario_json_string(scenario_json, False)],
+ 'ci_platforms': ['linux'],
+ 'platforms': ['linux'],
+ 'flaky': False,
+ 'language': 'c++',
+ 'boringssl': True,
+ 'defaults': 'boringssl',
+ 'cpu_cost': guess_cpu(scenario_json, False),
+ 'exclude_configs': ['tsan'],
+ 'timeout_seconds': 6*60
+ }
+ for scenario_json in scenario_config.CXXLanguage().scenarios()
+ if 'scalable' in scenario_json.get('CATEGORIES', [])
+ ] + [
+ {
+ 'name': 'json_run_localhost',
+ 'shortname': 'json_run_localhost:%s' % scenario_json['name'],
+ 'args': ['--scenarios_json', _scenario_json_string(scenario_json, True)],
'ci_platforms': ['linux'],
'platforms': ['linux'],
'flaky': False,
'language': 'c++',
'boringssl': True,
'defaults': 'boringssl',
- 'cpu_cost': guess_cpu(scenario_json),
- 'exclude_configs': [],
+ 'cpu_cost': guess_cpu(scenario_json, True),
+ 'exclude_configs': sorted(c for c in configs_from_yaml if c != 'tsan'),
'timeout_seconds': 6*60
}
for scenario_json in scenario_config.CXXLanguage().scenarios()
diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc
index 74e40fbf1a..b7b2553f12 100644
--- a/test/cpp/qps/json_run_localhost.cc
+++ b/test/cpp/qps/json_run_localhost.cc
@@ -31,7 +31,11 @@
*
*/
+#include <signal.h>
+#include <string.h>
+
#include <memory>
+#include <mutex>
#include <sstream>
#include <string>
@@ -43,6 +47,11 @@
using grpc::SubProcess;
+constexpr auto kNumWorkers = 2;
+
+static SubProcess* g_driver;
+static SubProcess* g_workers[kNumWorkers];
+
template <class T>
std::string as_string(const T& val) {
std::ostringstream out;
@@ -50,9 +59,38 @@ std::string as_string(const T& val) {
return out.str();
}
+static void sighandler(int sig) {
+ const int errno_saved = errno;
+ if (g_driver != NULL) g_driver->Interrupt();
+ for (int i = 0; i < kNumWorkers; ++i) {
+ if (g_workers[i]) g_workers[i]->Interrupt();
+ }
+ errno = errno_saved;
+}
+
+static void register_sighandler() {
+ struct sigaction act;
+ memset(&act, 0, sizeof(act));
+ act.sa_handler = sighandler;
+
+ sigaction(SIGINT, &act, NULL);
+ sigaction(SIGTERM, &act, NULL);
+}
+
+static void LogStatus(int status, const char* label) {
+ if (WIFEXITED(status)) {
+ gpr_log(GPR_INFO, "%s: subprocess exited with status %d", label,
+ WEXITSTATUS(status));
+ } else if (WIFSIGNALED(status)) {
+ gpr_log(GPR_INFO, "%s: subprocess terminated with signal %d", label,
+ WTERMSIG(status));
+ } else {
+ gpr_log(GPR_INFO, "%s: unknown subprocess status: %d", label, status);
+ }
+}
+
int main(int argc, char** argv) {
- typedef std::unique_ptr<SubProcess> SubProcessPtr;
- std::vector<SubProcessPtr> jobs;
+ register_sighandler();
std::string my_bin = argv[0];
std::string bin_dir = my_bin.substr(0, my_bin.rfind('/'));
@@ -60,11 +98,11 @@ int main(int argc, char** argv) {
std::ostringstream env;
bool first = true;
- for (int i = 0; i < 2; i++) {
- auto port = grpc_pick_unused_port_or_die();
+ for (int i = 0; i < kNumWorkers; i++) {
+ const auto port = grpc_pick_unused_port_or_die();
std::vector<std::string> args = {bin_dir + "/qps_worker", "-driver_port",
as_string(port)};
- jobs.emplace_back(new SubProcess(args));
+ g_workers[i] = new SubProcess(args);
if (!first) env << ",";
env << "localhost:" << port;
first = false;
@@ -75,12 +113,27 @@ int main(int argc, char** argv) {
for (int i = 1; i < argc; i++) {
args.push_back(argv[i]);
}
- GPR_ASSERT(SubProcess(args).Join() == 0);
- for (auto it = jobs.begin(); it != jobs.end(); ++it) {
- (*it)->Interrupt();
+ g_driver = new SubProcess(args);
+ const int driver_join_status = g_driver->Join();
+ if (driver_join_status != 0) {
+ LogStatus(driver_join_status, "driver");
}
- for (auto it = jobs.begin(); it != jobs.end(); ++it) {
- (*it)->Join();
+ for (int i = 0; i < kNumWorkers; ++i) {
+ if (g_workers[i]) g_workers[i]->Interrupt();
}
+
+ for (int i = 0; i < kNumWorkers; ++i) {
+ if (g_workers[i]) {
+ const int worker_status = g_workers[i]->Join();
+ if (worker_status != 0) {
+ LogStatus(worker_status, "worker");
+ }
+ }
+ }
+
+ delete g_driver;
+ g_driver = NULL;
+ for (int i = 0; i < kNumWorkers; ++i) delete g_workers[i];
+ GPR_ASSERT(driver_join_status == 0);
}