aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/core/client_config/lb_policies_test.c120
-rwxr-xr-xtest/core/end2end/gen_build_yaml.py40
-rw-r--r--test/core/end2end/tests/cancel_with_status.c183
-rw-r--r--test/core/end2end/tests/negative_deadline.c180
-rw-r--r--test/core/network_benchmarks/low_level_ping_pong.c70
-rw-r--r--test/core/surface/byte_buffer_reader_test.c35
-rw-r--r--test/cpp/end2end/mock_test.cc4
-rw-r--r--test/cpp/qps/async_streaming_ping_pong_test.cc9
-rw-r--r--test/cpp/qps/async_unary_ping_pong_test.cc9
-rw-r--r--test/cpp/qps/client.h154
-rw-r--r--test/cpp/qps/client_async.cc41
-rw-r--r--test/cpp/qps/client_sync.cc2
-rw-r--r--test/cpp/qps/driver.cc27
-rw-r--r--test/cpp/qps/driver.h8
-rw-r--r--test/cpp/qps/histogram.h4
-rw-r--r--test/cpp/qps/perf_db.proto2
-rwxr-xr-xtest/cpp/qps/qps-sweep.sh18
-rw-r--r--test/cpp/qps/qps_driver.cc119
-rw-r--r--test/cpp/qps/qps_interarrival_test.cc2
-rw-r--r--test/cpp/qps/qps_openloop_test.cc9
-rw-r--r--test/cpp/qps/qps_test.cc9
-rw-r--r--test/cpp/qps/qps_test_with_poll.cc9
-rw-r--r--test/cpp/qps/qps_worker.cc54
-rw-r--r--test/cpp/qps/qps_worker.h6
-rw-r--r--test/cpp/qps/report.cc7
-rw-r--r--test/cpp/qps/report.h1
-rw-r--r--test/cpp/qps/secure_sync_unary_ping_pong_test.cc84
-rw-r--r--test/cpp/qps/server.h52
-rw-r--r--test/cpp/qps/server_async.cc35
-rw-r--r--test/cpp/qps/server_sync.cc25
-rwxr-xr-xtest/cpp/qps/single_run_localhost.sh4
-rw-r--r--test/cpp/qps/sync_streaming_ping_pong_test.cc12
-rw-r--r--test/cpp/qps/sync_unary_ping_pong_test.cc12
-rw-r--r--test/cpp/qps/timer.cc2
-rw-r--r--test/cpp/qps/timer.h2
-rw-r--r--test/cpp/qps/worker.cc5
-rw-r--r--test/proto/benchmarks/control.proto (renamed from test/proto/qpstest.proto)148
-rw-r--r--test/proto/benchmarks/payloads.proto55
-rw-r--r--test/proto/benchmarks/services.proto55
-rw-r--r--test/proto/benchmarks/stats.proto59
40 files changed, 1143 insertions, 529 deletions
diff --git a/test/core/client_config/lb_policies_test.c b/test/core/client_config/lb_policies_test.c
index 68e991716b..e44930ed53 100644
--- a/test/core/client_config/lb_policies_test.c
+++ b/test/core/client_config/lb_policies_test.c
@@ -119,14 +119,15 @@ static void test_spec_destroy(test_spec *spec) {
static void *tag(gpr_intptr t) { return (void *)t; }
-static gpr_timespec n_seconds_time(int n) {
- return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
+static gpr_timespec n_millis_time(int n) {
+ return gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(n, GPR_TIMESPAN));
}
static void drain_cq(grpc_completion_queue *cq) {
grpc_event ev;
do {
- ev = grpc_completion_queue_next(cq, n_seconds_time(5), NULL);
+ ev = grpc_completion_queue_next(cq, n_millis_time(5000), NULL);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
@@ -134,29 +135,47 @@ static void kill_server(const servers_fixture *f, size_t i) {
gpr_log(GPR_INFO, "KILLING SERVER %d", i);
GPR_ASSERT(f->servers[i] != NULL);
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
- GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
- NULL).type == GRPC_OP_COMPLETE);
+ GPR_ASSERT(
+ grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL)
+ .type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
f->servers[i] = NULL;
}
-static void revive_server(const servers_fixture *f, size_t i) {
+typedef struct request_data {
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ char *details;
+ size_t details_capacity;
+ grpc_status_code status;
+ grpc_call_details *call_details;
+} request_data;
+
+static void revive_server(const servers_fixture *f, request_data *rdata,
+ size_t i) {
int got_port;
gpr_log(GPR_INFO, "RAISE AGAIN SERVER %d", i);
GPR_ASSERT(f->servers[i] == NULL);
+
+ gpr_log(GPR_DEBUG, "revive: %s", f->servers_hostports[i]);
+
f->servers[i] = grpc_server_create(NULL, NULL);
grpc_server_register_completion_queue(f->servers[i], f->cq, NULL);
GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port(
f->servers[i], f->servers_hostports[i])) > 0);
grpc_server_start(f->servers[i]);
+
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f->servers[i], &f->server_calls[i],
+ &rdata->call_details[i],
+ &f->request_metadata_recv[i], f->cq,
+ f->cq, tag(1000 + (int)i)));
}
static servers_fixture *setup_servers(const char *server_host,
+ request_data *rdata,
const size_t num_servers) {
servers_fixture *f = gpr_malloc(sizeof(servers_fixture));
- int *ports;
- int got_port;
size_t i;
f->num_servers = num_servers;
@@ -164,23 +183,16 @@ static servers_fixture *setup_servers(const char *server_host,
f->request_metadata_recv =
gpr_malloc(sizeof(grpc_metadata_array) * num_servers);
/* Create servers. */
- ports = gpr_malloc(sizeof(int *) * num_servers);
f->servers = gpr_malloc(sizeof(grpc_server *) * num_servers);
f->servers_hostports = gpr_malloc(sizeof(char *) * num_servers);
f->cq = grpc_completion_queue_create(NULL);
for (i = 0; i < num_servers; i++) {
- ports[i] = grpc_pick_unused_port_or_die();
-
- gpr_join_host_port(&f->servers_hostports[i], server_host, ports[i]);
-
- f->servers[i] = grpc_server_create(NULL, NULL);
- grpc_server_register_completion_queue(f->servers[i], f->cq, NULL);
- GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port(
- f->servers[i], f->servers_hostports[i])) > 0);
- GPR_ASSERT(ports[i] == got_port);
- grpc_server_start(f->servers[i]);
+ grpc_metadata_array_init(&f->request_metadata_recv[i]);
+ gpr_join_host_port(&f->servers_hostports[i], server_host,
+ grpc_pick_unused_port_or_die());
+ f->servers[i] = 0;
+ revive_server(f, rdata, i);
}
- gpr_free(ports);
return f;
}
@@ -191,8 +203,8 @@ static void teardown_servers(servers_fixture *f) {
if (f->servers[i] == NULL) continue;
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
- NULL).type == GRPC_OP_COMPLETE);
+ n_millis_time(5000), NULL)
+ .type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
}
grpc_completion_queue_shutdown(f->cq);
@@ -203,6 +215,7 @@ static void teardown_servers(servers_fixture *f) {
for (i = 0; i < f->num_servers; i++) {
gpr_free(f->servers_hostports[i]);
+ grpc_metadata_array_destroy(&f->request_metadata_recv[i]);
}
gpr_free(f->servers_hostports);
@@ -211,22 +224,12 @@ static void teardown_servers(servers_fixture *f) {
gpr_free(f);
}
-typedef struct request_data {
- grpc_metadata_array initial_metadata_recv;
- grpc_metadata_array trailing_metadata_recv;
- char *details;
- size_t details_capacity;
- grpc_status_code status;
- grpc_call_details *call_details;
-} request_data;
-
/** Returns connection sequence (server indices), which must be freed */
int *perform_request(servers_fixture *f, grpc_channel *client,
request_data *rdata, const test_spec *spec) {
grpc_call *c;
int s_idx;
int *s_valid;
- gpr_timespec deadline;
grpc_op ops[6];
grpc_op *op;
int was_cancelled;
@@ -237,7 +240,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
int completed_client;
s_valid = gpr_malloc(sizeof(int) * f->num_servers);
- rdata->call_details = gpr_malloc(sizeof(grpc_call_details) * f->num_servers);
connection_sequence = gpr_malloc(sizeof(int) * spec->num_iters);
/* Send a trivial request. */
@@ -253,7 +255,7 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
kill_server(f, i);
} else if (spec->revive_at[iter_num][i] != 0) {
/* killing takes precedence */
- revive_server(f, i);
+ revive_server(f, rdata, i);
}
}
@@ -266,9 +268,9 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
}
memset(s_valid, 0, f->num_servers * sizeof(int));
- deadline = n_seconds_time(1);
c = grpc_channel_create_call(client, NULL, GRPC_PROPAGATE_DEFAULTS, f->cq,
- "/foo", "foo.test.google.fr", deadline, NULL);
+ "/foo", "foo.test.google.fr", gpr_inf_future(GPR_CLOCK_REALTIME),
+ NULL);
GPR_ASSERT(c);
completed_client = 0;
@@ -300,22 +302,9 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL));
- /* "listen" on all servers */
- for (i = 0; i < f->num_servers; i++) {
- grpc_metadata_array_init(&f->request_metadata_recv[i]);
- if (f->servers[i] != NULL) {
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_server_request_call(f->servers[i], &f->server_calls[i],
- &rdata->call_details[i],
- &f->request_metadata_recv[i], f->cq,
- f->cq, tag(1000 + (int)i)));
- }
- }
-
s_idx = -1;
- while ((ev = grpc_completion_queue_next(
- f->cq, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(300), NULL)).type !=
- GRPC_QUEUE_TIMEOUT) {
+ while ((ev = grpc_completion_queue_next(f->cq, n_millis_time(s_idx == -1 ? 3000 : 200), NULL))
+ .type != GRPC_QUEUE_TIMEOUT) {
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
read_tag = ((int)(gpr_intptr)ev.tag);
gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d",
@@ -327,11 +316,14 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
s_valid[s_idx] = 1;
connection_sequence[iter_num] = s_idx;
} else if (read_tag == 1) {
+ gpr_log(GPR_DEBUG, "client timed out");
GPR_ASSERT(ev.success);
completed_client = 1;
}
}
+ gpr_log(GPR_DEBUG, "s_idx=%d", s_idx);
+
if (s_idx >= 0) {
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -361,25 +353,30 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
}
cq_verify(cqv);
+ gpr_log(GPR_DEBUG, "status=%d; %s", rdata->status, rdata->details);
GPR_ASSERT(rdata->status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == strcmp(rdata->details, "xyz"));
GPR_ASSERT(0 == strcmp(rdata->call_details[s_idx].method, "/foo"));
GPR_ASSERT(0 ==
strcmp(rdata->call_details[s_idx].host, "foo.test.google.fr"));
GPR_ASSERT(was_cancelled == 1);
+
+ grpc_call_destroy(f->server_calls[s_idx]);
+
+ /* ask for the next request on this server */
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
+ f->servers[s_idx], &f->server_calls[s_idx],
+ &rdata->call_details[s_idx],
+ &f->request_metadata_recv[s_idx], f->cq,
+ f->cq, tag(1000 + (int)s_idx)));
} else {
+ grpc_call_cancel(c, NULL);
if (!completed_client) {
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
}
}
- for (i = 0; i < f->num_servers; i++) {
- if (s_valid[i] != 0) {
- grpc_call_destroy(f->server_calls[i]);
- }
- grpc_metadata_array_destroy(&f->request_metadata_recv[i]);
- }
grpc_metadata_array_destroy(&rdata->initial_metadata_recv);
grpc_metadata_array_destroy(&rdata->trailing_metadata_recv);
@@ -393,7 +390,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
gpr_free(rdata->details);
}
- gpr_free(rdata->call_details);
gpr_free(s_valid);
return connection_sequence;
@@ -456,7 +452,10 @@ void run_spec(const test_spec *spec) {
char *servers_hostports_str;
int *actual_connection_sequence;
request_data rdata;
- servers_fixture *f = setup_servers("127.0.0.1", spec->num_servers);
+ servers_fixture *f;
+ rdata.call_details =
+ gpr_malloc(sizeof(grpc_call_details) * spec->num_servers);
+ f = setup_servers("127.0.0.1", &rdata, spec->num_servers);
/* Create client. */
servers_hostports_str = gpr_strjoin_sep((const char **)f->servers_hostports,
@@ -475,6 +474,7 @@ void run_spec(const test_spec *spec) {
gpr_free(client_hostport);
gpr_free(servers_hostports_str);
gpr_free(actual_connection_sequence);
+ gpr_free(rdata.call_details);
grpc_channel_destroy(client);
teardown_servers(f);
diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py
index af8f48576c..38d3b2218a 100755
--- a/test/core/end2end/gen_build_yaml.py
+++ b/test/core/end2end/gen_build_yaml.py
@@ -46,21 +46,21 @@ uds_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=Fal
# maps fixture name to whether it requires the security library
END2END_FIXTURES = {
+ 'h2_compress': default_unsecure_fixture_options,
'h2_fakesec': default_secure_fixture_options._replace(ci_mac=False),
'h2_full': default_unsecure_fixture_options,
- 'h2_uchannel': default_unsecure_fixture_options,
- 'h2_compress': default_unsecure_fixture_options,
- 'h2_uds': uds_fixture_options,
- 'h2_uds+poll': uds_fixture_options._replace(platforms=['linux']),
'h2_full+poll': default_unsecure_fixture_options._replace(platforms=['linux']),
+ 'h2_oauth2': default_secure_fixture_options._replace(ci_mac=False),
'h2_proxy': default_unsecure_fixture_options._replace(includes_proxy=True, ci_mac=False),
+ 'h2_sockpair_1byte': socketpair_unsecure_fixture_options._replace(ci_mac=False),
+ 'h2_sockpair': socketpair_unsecure_fixture_options._replace(ci_mac=False),
+ 'h2_sockpair+trace': socketpair_unsecure_fixture_options,
'h2_ssl': default_secure_fixture_options,
'h2_ssl+poll': default_secure_fixture_options._replace(platforms=['linux']),
'h2_ssl_proxy': default_secure_fixture_options._replace(includes_proxy=True, ci_mac=False),
- 'h2_oauth2': default_secure_fixture_options._replace(ci_mac=False),
- 'h2_sockpair': socketpair_unsecure_fixture_options._replace(ci_mac=False),
- 'h2_sockpair_1byte': socketpair_unsecure_fixture_options._replace(ci_mac=False),
- 'h2_sockpair+trace': socketpair_unsecure_fixture_options,
+ 'h2_uchannel': default_unsecure_fixture_options,
+ 'h2_uds+poll': uds_fixture_options._replace(platforms=['linux']),
+ 'h2_uds': uds_fixture_options,
}
TestOptions = collections.namedtuple('TestOptions', 'needs_fullstack needs_dns proxyable flaky secure')
@@ -70,38 +70,40 @@ connectivity_test_options = default_test_options._replace(needs_fullstack=True)
# maps test names to options
END2END_TESTS = {
'bad_hostname': default_test_options,
- 'cancel_after_client_done': default_test_options,
+ 'binary_metadata': default_test_options,
+ 'call_creds': default_test_options._replace(secure=True),
'cancel_after_accept': default_test_options,
+ 'cancel_after_client_done': default_test_options,
'cancel_after_invoke': default_test_options,
'cancel_before_invoke': default_test_options,
'cancel_in_a_vacuum': default_test_options,
+ 'cancel_with_status': default_test_options,
'census_simple_request': default_test_options,
'channel_connectivity': connectivity_test_options._replace(proxyable=False),
+ 'compressed_payload': default_test_options._replace(proxyable=False),
'default_host': default_test_options._replace(needs_fullstack=True, needs_dns=True),
'disappearing_server': connectivity_test_options,
- 'shutdown_finishes_calls': default_test_options,
- 'shutdown_finishes_tags': default_test_options,
'empty_batch': default_test_options,
'graceful_server_shutdown': default_test_options,
+ 'high_initial_seqno': default_test_options,
'invoke_large_request': default_test_options,
+ 'large_metadata': default_test_options,
'max_concurrent_streams': default_test_options._replace(proxyable=False),
'max_message_length': default_test_options,
+ 'metadata': default_test_options,
+ 'negative_deadline': default_test_options,
'no_op': default_test_options,
+ 'payload': default_test_options,
'ping_pong_streaming': default_test_options,
'registered_call': default_test_options,
- 'binary_metadata': default_test_options,
- 'metadata': default_test_options,
- 'call_creds': default_test_options._replace(secure=True),
- 'payload': default_test_options,
- 'trailing_metadata': default_test_options,
- 'compressed_payload': default_test_options._replace(proxyable=False),
'request_with_flags': default_test_options._replace(proxyable=False),
- 'large_metadata': default_test_options,
'request_with_payload': default_test_options,
'server_finishes_request': default_test_options,
+ 'shutdown_finishes_calls': default_test_options,
+ 'shutdown_finishes_tags': default_test_options,
'simple_delayed_request': connectivity_test_options,
'simple_request': default_test_options,
- 'high_initial_seqno': default_test_options,
+ 'trailing_metadata': default_test_options,
}
diff --git a/test/core/end2end/tests/cancel_with_status.c b/test/core/end2end/tests/cancel_with_status.c
new file mode 100644
index 0000000000..eecfa83fa4
--- /dev/null
+++ b/test/core/end2end/tests/cancel_with_status.c
@@ -0,0 +1,183 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/core/end2end/end2end_tests.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/support/string.h"
+#include <grpc/byte_buffer.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "test/core/end2end/cq_verifier.h"
+
+enum { TIMEOUT = 200000 };
+
+static void *tag(gpr_intptr t) { return (void *)t; }
+
+static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
+ const char *test_name,
+ grpc_channel_args *client_args,
+ grpc_channel_args *server_args) {
+ grpc_end2end_test_fixture f;
+ gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
+ f = config.create_fixture(client_args, server_args);
+ config.init_client(&f, client_args);
+ config.init_server(&f, server_args);
+ return f;
+}
+
+static gpr_timespec n_seconds_time(int n) {
+ return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
+}
+
+static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event ev;
+ do {
+ ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL);
+ } while (ev.type != GRPC_QUEUE_SHUTDOWN);
+}
+
+static void shutdown_server(grpc_end2end_test_fixture *f) {
+ if (!f->server) return;
+ grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000),
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
+ NULL).type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(f->server);
+ f->server = NULL;
+}
+
+static void shutdown_client(grpc_end2end_test_fixture *f) {
+ if (!f->client) return;
+ grpc_channel_destroy(f->client);
+ f->client = NULL;
+}
+
+static void end_test(grpc_end2end_test_fixture *f) {
+ shutdown_server(f);
+ shutdown_client(f);
+
+ grpc_completion_queue_shutdown(f->cq);
+ drain_cq(f->cq);
+ grpc_completion_queue_destroy(f->cq);
+}
+
+static void simple_request_body(grpc_end2end_test_fixture f, size_t num_ops) {
+ grpc_call *c;
+ gpr_timespec deadline = five_seconds_time();
+ cq_verifier *cqv = cq_verifier_create(f.cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_status_code status;
+ grpc_call_error error;
+ char *details = NULL;
+ size_t details_capacity = 0;
+
+ gpr_log(GPR_DEBUG, "test with %d ops", num_ops);
+
+ c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ "/foo", "foo.test.google.fr:1234", deadline,
+ NULL);
+ GPR_ASSERT(c);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+
+ op = ops;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ GPR_ASSERT(num_ops <= (size_t)(op - ops));
+ error = grpc_call_start_batch(c, ops, num_ops, tag(1), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ grpc_call_cancel_with_status(c, GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL);
+
+ cq_expect_completion(cqv, tag(1), 1);
+ cq_verify(cqv);
+
+ GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
+ GPR_ASSERT(0 == strcmp(details, "xyz"));
+
+ gpr_free(details);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+
+ grpc_call_destroy(c);
+
+ cq_verifier_destroy(cqv);
+}
+
+static void test_invoke_simple_request(grpc_end2end_test_config config, size_t num_ops) {
+ grpc_end2end_test_fixture f;
+
+ f = begin_test(config, "test_invoke_simple_request", NULL, NULL);
+ simple_request_body(f, num_ops);
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+void grpc_end2end_tests(grpc_end2end_test_config config) {
+ size_t i;
+ for (i = 1; i <= 4; i++) {
+ test_invoke_simple_request(config, i);
+ }
+}
diff --git a/test/core/end2end/tests/negative_deadline.c b/test/core/end2end/tests/negative_deadline.c
new file mode 100644
index 0000000000..abcc1ba358
--- /dev/null
+++ b/test/core/end2end/tests/negative_deadline.c
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/core/end2end/end2end_tests.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include "src/core/support/string.h"
+#include <grpc/byte_buffer.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "test/core/end2end/cq_verifier.h"
+
+enum { TIMEOUT = 200000 };
+
+static void *tag(gpr_intptr t) { return (void *)t; }
+
+static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
+ const char *test_name,
+ grpc_channel_args *client_args,
+ grpc_channel_args *server_args) {
+ grpc_end2end_test_fixture f;
+ gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
+ f = config.create_fixture(client_args, server_args);
+ config.init_client(&f, client_args);
+ config.init_server(&f, server_args);
+ return f;
+}
+
+static gpr_timespec n_seconds_time(int n) {
+ return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
+}
+
+static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event ev;
+ do {
+ ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL);
+ } while (ev.type != GRPC_QUEUE_SHUTDOWN);
+}
+
+static void shutdown_server(grpc_end2end_test_fixture *f) {
+ if (!f->server) return;
+ grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000),
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
+ NULL).type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(f->server);
+ f->server = NULL;
+}
+
+static void shutdown_client(grpc_end2end_test_fixture *f) {
+ if (!f->client) return;
+ grpc_channel_destroy(f->client);
+ f->client = NULL;
+}
+
+static void end_test(grpc_end2end_test_fixture *f) {
+ shutdown_server(f);
+ shutdown_client(f);
+
+ grpc_completion_queue_shutdown(f->cq);
+ drain_cq(f->cq);
+ grpc_completion_queue_destroy(f->cq);
+}
+
+static void simple_request_body(grpc_end2end_test_fixture f, size_t num_ops) {
+ grpc_call *c;
+ gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_REALTIME);
+ cq_verifier *cqv = cq_verifier_create(f.cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_status_code status;
+ grpc_call_error error;
+ char *details = NULL;
+ size_t details_capacity = 0;
+
+ gpr_log(GPR_DEBUG, "test with %d ops", num_ops);
+
+ c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ "/foo", "foo.test.google.fr:1234", deadline,
+ NULL);
+ GPR_ASSERT(c);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+
+ op = ops;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ GPR_ASSERT(num_ops <= (size_t)(op - ops));
+ error = grpc_call_start_batch(c, ops, num_ops, tag(1), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(1), 1);
+ cq_verify(cqv);
+
+ GPR_ASSERT(status == GRPC_STATUS_DEADLINE_EXCEEDED);
+
+ gpr_free(details);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+
+ grpc_call_destroy(c);
+
+ cq_verifier_destroy(cqv);
+}
+
+static void test_invoke_simple_request(grpc_end2end_test_config config, size_t num_ops) {
+ grpc_end2end_test_fixture f;
+
+ f = begin_test(config, "test_invoke_simple_request", NULL, NULL);
+ simple_request_body(f, num_ops);
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+void grpc_end2end_tests(grpc_end2end_test_config config) {
+ size_t i;
+ for (i = 1; i <= 4; i++) {
+ test_invoke_simple_request(config, i);
+ }
+}
diff --git a/test/core/network_benchmarks/low_level_ping_pong.c b/test/core/network_benchmarks/low_level_ping_pong.c
index be82a36306..7a2d894481 100644
--- a/test/core/network_benchmarks/low_level_ping_pong.c
+++ b/test/core/network_benchmarks/low_level_ping_pong.c
@@ -82,9 +82,9 @@ typedef struct thread_args {
/* Basic call to read() */
static int read_bytes(int fd, char *buf, size_t read_size, int spin) {
size_t bytes_read = 0;
- int err;
+ ssize_t err;
do {
- err = (int)read(fd, buf + bytes_read, read_size - bytes_read);
+ err = read(fd, buf + bytes_read, read_size - bytes_read);
if (err < 0) {
if (errno == EINTR) {
continue;
@@ -115,6 +115,7 @@ static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) {
struct pollfd pfd;
size_t bytes_read = 0;
int err;
+ ssize_t err2;
pfd.fd = fd;
pfd.events = POLLIN;
@@ -132,13 +133,13 @@ static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) {
GPR_ASSERT(err == 1);
GPR_ASSERT(pfd.revents == POLLIN);
do {
- err = (int)read(fd, buf + bytes_read, read_size - bytes_read);
- } while (err < 0 && errno == EINTR);
- if (err < 0 && errno != EAGAIN) {
+ err2 = read(fd, buf + bytes_read, read_size - bytes_read);
+ } while (err2 < 0 && errno == EINTR);
+ if (err2 < 0 && errno != EAGAIN) {
gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
return -1;
}
- bytes_read += (size_t)err;
+ bytes_read += (size_t) err2;
} while (bytes_read < read_size);
return 0;
}
@@ -157,6 +158,7 @@ static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) {
struct epoll_event ev;
size_t bytes_read = 0;
int err;
+ ssize_t err2;
size_t read_size = args->msg_size;
do {
@@ -172,11 +174,11 @@ static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) {
GPR_ASSERT(ev.data.fd == args->fds.read_fd);
do {
do {
- err = (int)read(args->fds.read_fd, buf + bytes_read,
- read_size - bytes_read);
- } while (err < 0 && errno == EINTR);
+ err2 = read(args->fds.read_fd, buf + bytes_read,
+ read_size - bytes_read);
+ } while (err2 < 0 && errno == EINTR);
if (errno == EAGAIN) break;
- bytes_read += (size_t)err;
+ bytes_read += (size_t) err2;
/* TODO(klempner): This should really be doing an extra call after we are
done to ensure we see an EAGAIN */
} while (bytes_read < read_size);
@@ -200,11 +202,11 @@ static int epoll_read_bytes_spin(struct thread_args *args, char *buf) {
*/
static int blocking_write_bytes(struct thread_args *args, char *buf) {
size_t bytes_written = 0;
- int err;
+ ssize_t err;
size_t write_size = args->msg_size;
do {
- err = (int)write(args->fds.write_fd, buf + bytes_written,
- write_size - bytes_written);
+ err = write(args->fds.write_fd, buf + bytes_written,
+ write_size - bytes_written);
if (err < 0) {
if (errno == EINTR) {
continue;
@@ -298,7 +300,7 @@ static void print_histogram(gpr_histogram *histogram) {
static double now(void) {
gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME);
- return 1e9 * (double)tv.tv_sec + tv.tv_nsec;
+ return 1e9 * (double)tv.tv_sec + (double)tv.tv_nsec;
}
static void client_thread(thread_args *args) {
@@ -374,7 +376,7 @@ error:
return -1;
}
-static int connect_client(struct sockaddr *addr, int len) {
+static int connect_client(struct sockaddr *addr, socklen_t len) {
int fd = socket(addr->sa_family, SOCK_STREAM, 0);
int err;
if (fd < 0) {
@@ -388,7 +390,7 @@ static int connect_client(struct sockaddr *addr, int len) {
}
do {
- err = (int)connect(fd, addr, (socklen_t)len);
+ err = connect(fd, addr, len);
} while (err < 0 && errno == EINTR);
if (err < 0) {
@@ -587,27 +589,27 @@ static int run_benchmark(char *socket_type, thread_args *client_args,
return 0;
}
-static int run_all_benchmarks(int msg_size) {
+static int run_all_benchmarks(size_t msg_size) {
int error = 0;
size_t i;
for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
- test_strategy *ts = &test_strategies[i];
+ test_strategy *strategy = &test_strategies[i];
size_t j;
for (j = 0; j < GPR_ARRAY_SIZE(socket_types); ++j) {
thread_args *client_args = malloc(sizeof(thread_args));
thread_args *server_args = malloc(sizeof(thread_args));
char *socket_type = socket_types[j];
- client_args->read_bytes = ts->read_strategy;
+ client_args->read_bytes = strategy->read_strategy;
client_args->write_bytes = blocking_write_bytes;
- client_args->setup = ts->setup;
- client_args->msg_size = (size_t)msg_size;
- client_args->strategy_name = ts->name;
- server_args->read_bytes = ts->read_strategy;
+ client_args->setup = strategy->setup;
+ client_args->msg_size = msg_size;
+ client_args->strategy_name = strategy->name;
+ server_args->read_bytes = strategy->read_strategy;
server_args->write_bytes = blocking_write_bytes;
- server_args->setup = ts->setup;
- server_args->msg_size = (size_t)msg_size;
- server_args->strategy_name = ts->name;
+ server_args->setup = strategy->setup;
+ server_args->msg_size = msg_size;
+ server_args->strategy_name = strategy->name;
error = run_benchmark(socket_type, client_args, server_args);
if (error < 0) {
return error;
@@ -624,7 +626,7 @@ int main(int argc, char **argv) {
char *read_strategy = NULL;
char *socket_type = NULL;
size_t i;
- const test_strategy *ts = NULL;
+ const test_strategy *strategy = NULL;
int error = 0;
gpr_cmdline *cmdline =
@@ -644,7 +646,7 @@ int main(int argc, char **argv) {
if (read_strategy == NULL) {
gpr_log(GPR_INFO, "No strategy specified, running all benchmarks");
- return run_all_benchmarks(msg_size);
+ return run_all_benchmarks((size_t)msg_size);
}
if (socket_type == NULL) {
@@ -658,22 +660,22 @@ int main(int argc, char **argv) {
for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
if (strcmp(test_strategies[i].name, read_strategy) == 0) {
- ts = &test_strategies[i];
+ strategy = &test_strategies[i];
}
}
- if (ts == NULL) {
+ if (strategy == NULL) {
fprintf(stderr, "Invalid read strategy %s\n", read_strategy);
return -1;
}
- client_args->read_bytes = ts->read_strategy;
+ client_args->read_bytes = strategy->read_strategy;
client_args->write_bytes = blocking_write_bytes;
- client_args->setup = ts->setup;
+ client_args->setup = strategy->setup;
client_args->msg_size = (size_t)msg_size;
client_args->strategy_name = read_strategy;
- server_args->read_bytes = ts->read_strategy;
+ server_args->read_bytes = strategy->read_strategy;
server_args->write_bytes = blocking_write_bytes;
- server_args->setup = ts->setup;
+ server_args->setup = strategy->setup;
server_args->msg_size = (size_t)msg_size;
server_args->strategy_name = read_strategy;
diff --git a/test/core/surface/byte_buffer_reader_test.c b/test/core/surface/byte_buffer_reader_test.c
index 560e0ac7b2..7f9cd6b62b 100644
--- a/test/core/surface/byte_buffer_reader_test.c
+++ b/test/core/surface/byte_buffer_reader_test.c
@@ -184,6 +184,39 @@ static void test_byte_buffer_from_reader(void) {
grpc_byte_buffer_destroy(buffer_from_reader);
}
+static void test_readall(void) {
+ char* lotsa_as[512];
+ char* lotsa_bs[1024];
+ gpr_slice slices[2];
+ grpc_byte_buffer *buffer;
+ grpc_byte_buffer_reader reader;
+ gpr_slice slice_out;
+
+ LOG_TEST("test_readall");
+
+ memset(lotsa_as, 'a', 512);
+ memset(lotsa_bs, 'b', 1024);
+ /* use slices large enough to overflow inlining */
+ slices[0] = gpr_slice_malloc(512);
+ memcpy(GPR_SLICE_START_PTR(slices[0]), lotsa_as, 512);
+ slices[1] = gpr_slice_malloc(1024);
+ memcpy(GPR_SLICE_START_PTR(slices[1]), lotsa_bs, 1024);
+
+ buffer = grpc_raw_byte_buffer_create(slices, 2);
+ gpr_slice_unref(slices[0]);
+ gpr_slice_unref(slices[1]);
+
+ grpc_byte_buffer_reader_init(&reader, buffer);
+ slice_out = grpc_byte_buffer_reader_readall(&reader);
+
+ GPR_ASSERT(GPR_SLICE_LENGTH(slice_out) == 512 + 1024);
+ GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(slice_out), lotsa_as, 512) == 0);
+ GPR_ASSERT(memcmp(&(GPR_SLICE_START_PTR(slice_out)[512]), lotsa_bs, 1024) ==
+ 0);
+ gpr_slice_unref(slice_out);
+ grpc_byte_buffer_destroy(buffer);
+}
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_read_one_slice();
@@ -192,6 +225,6 @@ int main(int argc, char **argv) {
test_read_gzip_compressed_slice();
test_read_deflate_compressed_slice();
test_byte_buffer_from_reader();
-
+ test_readall();
return 0;
}
diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc
index 9c35fede8f..80057d893e 100644
--- a/test/cpp/end2end/mock_test.cc
+++ b/test/cpp/end2end/mock_test.cc
@@ -62,7 +62,7 @@ template <class W, class R>
class MockClientReaderWriter GRPC_FINAL
: public ClientReaderWriterInterface<W, R> {
public:
- void WaitForInitialMetadata() {}
+ void WaitForInitialMetadata() GRPC_OVERRIDE {}
bool Read(R* msg) GRPC_OVERRIDE { return true; }
bool Write(const W& msg) GRPC_OVERRIDE { return true; }
bool WritesDone() GRPC_OVERRIDE { return true; }
@@ -73,7 +73,7 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> GRPC_FINAL
: public ClientReaderWriterInterface<EchoRequest, EchoResponse> {
public:
MockClientReaderWriter() : writes_done_(false) {}
- void WaitForInitialMetadata() {}
+ void WaitForInitialMetadata() GRPC_OVERRIDE {}
bool Read(EchoResponse* msg) GRPC_OVERRIDE {
if (writes_done_) return false;
msg->set_message(last_message_);
diff --git a/test/cpp/qps/async_streaming_ping_pong_test.cc b/test/cpp/qps/async_streaming_ping_pong_test.cc
index 411df4d32a..9fef93a70f 100644
--- a/test/cpp/qps/async_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/async_streaming_ping_pong_test.cc
@@ -35,8 +35,6 @@
#include <grpc/support/log.h>
-#include <signal.h>
-
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
@@ -52,17 +50,15 @@ static void RunAsyncStreamingPingPong() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(STREAMING);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(1);
+ server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -77,7 +73,6 @@ static void RunAsyncStreamingPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncStreamingPingPong();
return 0;
}
diff --git a/test/cpp/qps/async_unary_ping_pong_test.cc b/test/cpp/qps/async_unary_ping_pong_test.cc
index eda31b5744..b4ab0e5d59 100644
--- a/test/cpp/qps/async_unary_ping_pong_test.cc
+++ b/test/cpp/qps/async_unary_ping_pong_test.cc
@@ -35,8 +35,6 @@
#include <grpc/support/log.h>
-#include <signal.h>
-
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/benchmark_config.h"
@@ -52,17 +50,15 @@ static void RunAsyncUnaryPingPong() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(1);
client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(1);
+ server_config.set_async_server_threads(1);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -75,7 +71,6 @@ static void RunAsyncUnaryPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunAsyncUnaryPingPong();
return 0;
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index cd8b34f65b..f4400692fe 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -40,8 +40,9 @@
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/util/create_test_channel.h"
+#include "test/proto/benchmarks/payloads.grpc.pb.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
@@ -75,27 +76,54 @@ class Client {
channels_[i].init(config.server_targets(i % config.server_targets_size()),
config);
}
- request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
- request_.set_response_size(config.payload_size());
+ if (config.payload_config().has_bytebuf_params()) {
+ GPR_ASSERT(false); // not yet implemented
+ } else if (config.payload_config().has_simple_params()) {
+ request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
+ request_.set_response_size(
+ config.payload_config().simple_params().resp_size());
+ request_.mutable_payload()->set_type(
+ grpc::testing::PayloadType::COMPRESSABLE);
+ int size = config.payload_config().simple_params().req_size();
+ std::unique_ptr<char[]> body(new char[size]);
+ request_.mutable_payload()->set_body(body.get(), size);
+ } else if (config.payload_config().has_complex_params()) {
+ GPR_ASSERT(false); // not yet implemented
+ } else {
+ // default should be simple proto without payloads
+ request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
+ request_.set_response_size(0);
+ request_.mutable_payload()->set_type(
+ grpc::testing::PayloadType::COMPRESSABLE);
+ }
}
virtual ~Client() {}
- ClientStats Mark() {
+ ClientStats Mark(bool reset) {
Histogram latencies;
+ Timer::Result timer_result;
+
// avoid std::vector for old compilers that expect a copy constructor
- Histogram* to_merge = new Histogram[threads_.size()];
- for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->BeginSwap(&to_merge[i]);
- }
- std::unique_ptr<Timer> timer(new Timer);
- timer_.swap(timer);
- for (size_t i = 0; i < threads_.size(); i++) {
- threads_[i]->EndSwap();
- latencies.Merge(&to_merge[i]);
+ if (reset) {
+ Histogram* to_merge = new Histogram[threads_.size()];
+ for (size_t i = 0; i < threads_.size(); i++) {
+ threads_[i]->BeginSwap(&to_merge[i]);
+ }
+ std::unique_ptr<Timer> timer(new Timer);
+ timer_.swap(timer);
+ for (size_t i = 0; i < threads_.size(); i++) {
+ threads_[i]->EndSwap();
+ latencies.Merge(to_merge[i]);
+ }
+ delete[] to_merge;
+ timer_result = timer->Mark();
+ } else {
+ // merge snapshots of each thread histogram
+ for (size_t i = 0; i < threads_.size(); i++) {
+ threads_[i]->MergeStatsInto(&latencies);
+ }
+ timer_result = timer_->Mark();
}
- delete[] to_merge;
-
- auto timer_result = timer->Mark();
ClientStats stats;
latencies.FillProto(stats.mutable_latencies());
@@ -122,15 +150,18 @@ class Client {
// We have to use a 2-phase init like this with a default
// constructor followed by an initializer function to make
// old compilers happy with using this in std::vector
- channel_ = CreateTestChannel(target, config.enable_ssl());
- stub_ = TestService::NewStub(channel_);
+ channel_ = CreateTestChannel(
+ target, config.security_params().server_host_override(),
+ config.has_security_params(),
+ !config.security_params().use_test_ca());
+ stub_ = BenchmarkService::NewStub(channel_);
}
Channel* get_channel() { return channel_.get(); }
- TestService::Stub* get_stub() { return stub_.get(); }
+ BenchmarkService::Stub* get_stub() { return stub_.get(); }
private:
std::shared_ptr<Channel> channel_;
- std::unique_ptr<TestService::Stub> stub_;
+ std::unique_ptr<BenchmarkService::Stub> stub_;
};
std::vector<ClientChannelInfo> channels_;
@@ -146,37 +177,41 @@ class Client {
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
- if (config.load_type() == CLOSED_LOOP) {
+ const auto& load = config.load_params();
+
+ std::unique_ptr<RandomDist> random_dist;
+ switch (load.load_case()) {
+ case LoadParams::kClosedLoop:
+ // Closed-loop doesn't use random dist at all
+ break;
+ case LoadParams::kPoisson:
+ random_dist.reset(
+ new ExpDist(load.poisson().offered_load() / num_threads));
+ break;
+ case LoadParams::kUniform:
+ random_dist.reset(
+ new UniformDist(load.uniform().interarrival_lo() * num_threads,
+ load.uniform().interarrival_hi() * num_threads));
+ break;
+ case LoadParams::kDeterm:
+ random_dist.reset(
+ new DetDist(num_threads / load.determ().offered_load()));
+ break;
+ case LoadParams::kPareto:
+ random_dist.reset(
+ new ParetoDist(load.pareto().interarrival_base() * num_threads,
+ load.pareto().alpha()));
+ break;
+ default:
+ GPR_ASSERT(false);
+ }
+
+ // Set closed_loop_ based on whether or not random_dist is set
+ if (!random_dist) {
closed_loop_ = true;
} else {
closed_loop_ = false;
-
- std::unique_ptr<RandomDist> random_dist;
- const auto& load = config.load_params();
- switch (config.load_type()) {
- case POISSON:
- random_dist.reset(
- new ExpDist(load.poisson().offered_load() / num_threads));
- break;
- case UNIFORM:
- random_dist.reset(
- new UniformDist(load.uniform().interarrival_lo() * num_threads,
- load.uniform().interarrival_hi() * num_threads));
- break;
- case DETERMINISTIC:
- random_dist.reset(
- new DetDist(num_threads / load.determ().offered_load()));
- break;
- case PARETO:
- random_dist.reset(
- new ParetoDist(load.pareto().interarrival_base() * num_threads,
- load.pareto().alpha()));
- break;
- default:
- GPR_ASSERT(false);
- break;
- }
-
+ // set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads);
for (size_t i = 0; i < num_threads; i++) {
next_time_.push_back(
@@ -204,7 +239,7 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
- new_(nullptr),
+ new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@@ -219,16 +254,21 @@ class Client {
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
- new_ = n;
+ new_stats_ = n;
}
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
- while (new_ != nullptr) {
+ while (new_stats_ != nullptr) {
cv_.wait(g);
};
}
+ void MergeStatsInto(Histogram* hist) {
+ std::unique_lock<std::mutex> g(mu_);
+ hist->Merge(histogram_);
+ }
+
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
@@ -246,21 +286,21 @@ class Client {
if (done_) {
return;
}
- // check if we're marking, swap out the histogram if so
- if (new_) {
- new_->Swap(&histogram_);
- new_ = nullptr;
+ // check if we're resetting stats, swap out the histogram if so
+ if (new_stats_) {
+ new_stats_->Swap(&histogram_);
+ new_stats_ = nullptr;
cv_.notify_one();
}
}
}
- TestService::Stub* stub_;
+ BenchmarkService::Stub* stub_;
ClientConfig config_;
std::mutex mu_;
std::condition_variable cv_;
bool done_;
- Histogram* new_;
+ Histogram* new_stats_;
Histogram histogram_;
Client* client_;
size_t idx_;
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 9ed42b7db6..9594179822 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -48,10 +48,10 @@
#include <gflags/gflags.h>
#include <grpc++/client_context.h>
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/timer.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/util/create_test_channel.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
@@ -88,10 +88,10 @@ template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
ClientRpcContextUnaryImpl(
- int channel_id, TestService::Stub* stub, const RequestType& req,
+ int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
- TestService::Stub*, grpc::ClientContext*, const RequestType&,
+ BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
@@ -131,13 +131,13 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
return true; // we're done, this'll be ignored
}
grpc::ClientContext context_;
- TestService::Stub* stub_;
+ BenchmarkService::Stub* stub_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
- TestService::Stub*, grpc::ClientContext*, const RequestType&,
+ BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req_;
grpc::Status status_;
double start_;
@@ -151,7 +151,7 @@ class AsyncClient : public Client {
public:
explicit AsyncClient(
const ClientConfig& config,
- std::function<ClientRpcContext*(int, TestService::Stub*,
+ std::function<ClientRpcContext*(int, BenchmarkService::Stub*,
const SimpleRequest&)> setup_ctx)
: Client(config),
channel_lock_(new std::mutex[config.client_channels()]),
@@ -354,11 +354,12 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
- StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+ StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, CompletionQueue* cq) {
return stub->AsyncUnaryCall(ctx, request, cq);
};
- static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
+ static ClientRpcContext* SetupCtx(int channel_id,
+ BenchmarkService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncUnaryClient::StartReq,
@@ -370,10 +371,11 @@ template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
ClientRpcContextStreamingImpl(
- int channel_id, TestService::Stub* stub, const RequestType& req,
- std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<
- RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*,
- CompletionQueue*, void*)> start_req,
+ int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
+ std::function<std::unique_ptr<
+ grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
+ BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
+ void*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
: ClientRpcContext(channel_id),
context_(),
@@ -420,15 +422,15 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return StartWrite(ok);
}
grpc::ClientContext context_;
- TestService::Stub* stub_;
+ BenchmarkService::Stub* stub_;
RequestType req_;
ResponseType response_;
bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
std::function<void(grpc::Status, ResponseType*)> callback_;
std::function<
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
- TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
- start_req_;
+ BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
+ void*)> start_req_;
grpc::Status status_;
double start_;
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
@@ -439,8 +441,8 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
public:
explicit AsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx) {
- // async streaming currently only supported closed loop
- GPR_ASSERT(config.load_type() == CLOSED_LOOP);
+ // async streaming currently only supports closed loop
+ GPR_ASSERT(closed_loop_);
StartThreads(config.async_client_threads());
}
@@ -451,12 +453,13 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<
grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
- StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
+ StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
CompletionQueue* cq, void* tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
- static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
+ static ClientRpcContext* SetupCtx(int channel_id,
+ BenchmarkService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, AsyncStreamingClient::StartReq,
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index ed4134c743..10d680860a 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -54,10 +54,10 @@
#include "test/cpp/util/create_test_channel.h"
#include "test/cpp/qps/client.h"
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/timer.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
#include "src/core/profiling/timers.h"
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index dd5c4f4f73..2c6247deea 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -48,6 +48,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
using std::list;
using std::thread;
@@ -91,12 +92,12 @@ static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) {
}
struct ServerData {
- unique_ptr<Worker::Stub> stub;
+ unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
struct ClientData {
- unique_ptr<Worker::Stub> stub;
+ unique_ptr<WorkerService::Stub> stub;
unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
} // namespace runsc
@@ -131,8 +132,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
int driver_port = grpc_pick_unused_port_or_die();
- int benchmark_port = grpc_pick_unused_port_or_die();
- local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
+ local_workers.emplace_back(new QpsWorker(driver_port));
char addr[256];
sprintf(addr, "localhost:%d", driver_port);
if (spawn_local_worker_count < 0) {
@@ -161,11 +161,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* servers = new ServerData[num_servers];
for (size_t i = 0; i < num_servers; i++) {
- servers[i].stub =
- Worker::NewStub(CreateChannel(workers[i], InsecureCredentials()));
+ servers[i].stub = WorkerService::NewStub(
+ CreateChannel(workers[i], InsecureCredentials()));
ServerArgs args;
result_server_config = server_config;
- result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
@@ -189,14 +188,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* clients = new ClientData[num_clients];
for (size_t i = 0; i < num_clients; i++) {
- clients[i].stub = Worker::NewStub(
+ clients[i].stub = WorkerService::NewStub(
CreateChannel(workers[i + num_servers], InsecureCredentials()));
ClientArgs args;
result_client_config = client_config;
- result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
clients[i].stream =
- clients[i].stub->RunTest(runsc::AllocContext(&contexts, deadline));
+ clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline));
GPR_ASSERT(clients[i].stream->Write(args));
ClientStatus init_status;
GPR_ASSERT(clients[i].stream->Read(&init_status));
@@ -211,9 +209,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
// Start a run
gpr_log(GPR_INFO, "Starting");
ServerArgs server_mark;
- server_mark.mutable_mark();
+ server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
- client_mark.mutable_mark();
+ client_mark.mutable_mark()->set_reset(true);
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
@@ -251,14 +249,15 @@ std::unique_ptr<ScenarioResult> RunScenario(
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
result->server_resources.emplace_back(
- stats.time_elapsed(), stats.time_user(), stats.time_system());
+ stats.time_elapsed(), stats.time_user(), stats.time_system(),
+ server_status.cores());
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
result->latencies.MergeProto(stats.latencies());
result->client_resources.emplace_back(
- stats.time_elapsed(), stats.time_user(), stats.time_system());
+ stats.time_elapsed(), stats.time_user(), stats.time_system(), -1);
}
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 6116aa656a..50bf17ceab 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -37,22 +37,24 @@
#include <memory>
#include "test/cpp/qps/histogram.h"
-#include "test/proto/qpstest.grpc.pb.h"
+#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
class ResourceUsage {
public:
- ResourceUsage(double w, double u, double s)
- : wall_time_(w), user_time_(u), system_time_(s) {}
+ ResourceUsage(double w, double u, double s, int c)
+ : wall_time_(w), user_time_(u), system_time_(s), cores_(c) {}
double wall_time() const { return wall_time_; }
double user_time() const { return user_time_; }
double system_time() const { return system_time_; }
+ int cores() const { return cores_; }
private:
double wall_time_;
double user_time_;
double system_time_;
+ int cores_;
};
struct ScenarioResult {
diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h
index 1151cca87c..35527d2a2c 100644
--- a/test/cpp/qps/histogram.h
+++ b/test/cpp/qps/histogram.h
@@ -35,7 +35,7 @@
#define TEST_QPS_HISTOGRAM_H
#include <grpc/support/histogram.h>
-#include "test/proto/qpstest.grpc.pb.h"
+#include "test/proto/benchmarks/stats.grpc.pb.h"
namespace grpc {
namespace testing {
@@ -48,7 +48,7 @@ class Histogram {
}
Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
- void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); }
+ void Merge(const Histogram& h) { gpr_histogram_merge(impl_, h.impl_); }
void Add(double value) { gpr_histogram_add(impl_, value); }
double Percentile(double pctile) const {
return gpr_histogram_percentile(impl_, pctile);
diff --git a/test/cpp/qps/perf_db.proto b/test/cpp/qps/perf_db.proto
index 7ae5cfe86e..8a691ddded 100644
--- a/test/cpp/qps/perf_db.proto
+++ b/test/cpp/qps/perf_db.proto
@@ -29,7 +29,7 @@
syntax = "proto3";
-import "test/proto/qpstest.proto";
+import "test/proto/benchmarks/control.proto";
package grpc.testing;
diff --git a/test/cpp/qps/qps-sweep.sh b/test/cpp/qps/qps-sweep.sh
index cb93201933..36ea974812 100755
--- a/test/cpp/qps/qps-sweep.sh
+++ b/test/cpp/qps/qps-sweep.sh
@@ -37,17 +37,21 @@ fi
bins=`find . .. ../.. ../../.. -name bins | head -1`
-for channels in 1 2 4 8
+for secure in true false
do
- for client in SYNCHRONOUS_CLIENT ASYNC_CLIENT
+ for channels in 1 2 4 8
do
- for server in SYNCHRONOUS_SERVER ASYNC_SERVER
+ for client in SYNC_CLIENT ASYNC_CLIENT
do
- for rpc in UNARY STREAMING
+ for server in SYNC_SERVER ASYNC_SERVER
do
- echo "Test $rpc $client $server , $channels channels"
- "$bins"/opt/qps_driver --rpc_type=$rpc \
- --client_type=$client --server_type=$server
+ for rpc in UNARY STREAMING
+ do
+ echo "Test $rpc $client $server, $channels channels, secure=$secure"
+ "$bins"/opt/qps_driver --rpc_type=$rpc \
+ --client_type=$client --server_type=$server \
+ --secure_test=$secure
+ done
done
done
done
diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc
index b1463be8f6..4c93a042cf 100644
--- a/test/cpp/qps/qps_driver.cc
+++ b/test/cpp/qps/qps_driver.cc
@@ -33,7 +33,6 @@
#include <memory>
#include <set>
-#include <signal.h>
#include <gflags/gflags.h>
#include <grpc/support/log.h>
@@ -50,31 +49,39 @@ DEFINE_int32(benchmark_seconds, 30, "Benchmark time (in seconds)");
DEFINE_int32(local_workers, 0, "Number of local workers to start");
// Common config
-DEFINE_bool(enable_ssl, false, "Use SSL");
DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING");
// Server config
-DEFINE_int32(server_threads, 1, "Number of server threads");
-DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type");
+DEFINE_int32(async_server_threads, 1, "Number of threads for async servers");
+DEFINE_string(server_type, "SYNC_SERVER", "Server type");
// Client config
DEFINE_int32(outstanding_rpcs_per_channel, 1,
"Number of outstanding rpcs per channel");
DEFINE_int32(client_channels, 1, "Number of client channels");
-DEFINE_int32(payload_size, 1, "Payload size");
-DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type");
+
+DEFINE_int32(simple_req_size, -1, "Simple proto request payload size");
+DEFINE_int32(simple_resp_size, -1, "Simple proto response payload size");
+
+DEFINE_string(client_type, "SYNC_CLIENT", "Client type");
DEFINE_int32(async_client_threads, 1, "Async client threads");
-DEFINE_string(load_type, "CLOSED_LOOP", "Load type");
-DEFINE_double(load_param_1, 0.0, "Load parameter 1");
-DEFINE_double(load_param_2, 0.0, "Load parameter 2");
+
+DEFINE_double(poisson_load, -1.0, "Poisson offered load (qps)");
+DEFINE_double(uniform_lo, -1.0, "Uniform low interarrival time (us)");
+DEFINE_double(uniform_hi, -1.0, "Uniform high interarrival time (us)");
+DEFINE_double(determ_load, -1.0, "Deterministic offered load (qps)");
+DEFINE_double(pareto_base, -1.0, "Pareto base interarrival time (us)");
+DEFINE_double(pareto_alpha, -1.0, "Pareto alpha value");
+
+DEFINE_bool(secure_test, false, "Run a secure test");
using grpc::testing::ClientConfig;
using grpc::testing::ServerConfig;
using grpc::testing::ClientType;
using grpc::testing::ServerType;
-using grpc::testing::LoadType;
using grpc::testing::RpcType;
using grpc::testing::ResourceUsage;
+using grpc::testing::SecurityParams;
namespace grpc {
namespace testing {
@@ -85,72 +92,63 @@ static void QpsDriver() {
ClientType client_type;
ServerType server_type;
- LoadType load_type;
GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type));
GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type));
- GPR_ASSERT(LoadType_Parse(FLAGS_load_type, &load_type));
ClientConfig client_config;
client_config.set_client_type(client_type);
- client_config.set_load_type(load_type);
- client_config.set_enable_ssl(FLAGS_enable_ssl);
client_config.set_outstanding_rpcs_per_channel(
FLAGS_outstanding_rpcs_per_channel);
client_config.set_client_channels(FLAGS_client_channels);
- client_config.set_payload_size(FLAGS_payload_size);
+
+ // Decide which type to use based on the response type
+ if (FLAGS_simple_resp_size >= 0) {
+ auto params =
+ client_config.mutable_payload_config()->mutable_simple_params();
+ params->set_resp_size(FLAGS_simple_resp_size);
+ if (FLAGS_simple_req_size >= 0) {
+ params->set_req_size(FLAGS_simple_req_size);
+ }
+ } else {
+ // set a reasonable default: proto but no payload
+ client_config.mutable_payload_config()->mutable_simple_params();
+ }
+
client_config.set_async_client_threads(FLAGS_async_client_threads);
client_config.set_rpc_type(rpc_type);
// set up the load parameters
- switch (load_type) {
- case grpc::testing::CLOSED_LOOP:
- break;
- case grpc::testing::POISSON: {
- auto poisson = client_config.mutable_load_params()->mutable_poisson();
- GPR_ASSERT(FLAGS_load_param_1 != 0.0);
- poisson->set_offered_load(FLAGS_load_param_1);
- break;
- }
- case grpc::testing::UNIFORM: {
- auto uniform = client_config.mutable_load_params()->mutable_uniform();
- GPR_ASSERT(FLAGS_load_param_1 != 0.0);
- GPR_ASSERT(FLAGS_load_param_2 != 0.0);
- uniform->set_interarrival_lo(FLAGS_load_param_1 / 1e6);
- uniform->set_interarrival_hi(FLAGS_load_param_2 / 1e6);
- break;
- }
- case grpc::testing::DETERMINISTIC: {
- auto determ = client_config.mutable_load_params()->mutable_determ();
- GPR_ASSERT(FLAGS_load_param_1 != 0.0);
- determ->set_offered_load(FLAGS_load_param_1);
- break;
- }
- case grpc::testing::PARETO: {
- auto pareto = client_config.mutable_load_params()->mutable_pareto();
- GPR_ASSERT(FLAGS_load_param_1 != 0.0);
- GPR_ASSERT(FLAGS_load_param_2 != 0.0);
- pareto->set_interarrival_base(FLAGS_load_param_1 / 1e6);
- pareto->set_alpha(FLAGS_load_param_2);
- break;
- }
- default:
- GPR_ASSERT(false);
- break;
+ if (FLAGS_poisson_load > 0.0) {
+ auto poisson = client_config.mutable_load_params()->mutable_poisson();
+ poisson->set_offered_load(FLAGS_poisson_load);
+ } else if (FLAGS_uniform_lo > 0.0) {
+ auto uniform = client_config.mutable_load_params()->mutable_uniform();
+ uniform->set_interarrival_lo(FLAGS_uniform_lo / 1e6);
+ uniform->set_interarrival_hi(FLAGS_uniform_hi / 1e6);
+ } else if (FLAGS_determ_load > 0.0) {
+ auto determ = client_config.mutable_load_params()->mutable_determ();
+ determ->set_offered_load(FLAGS_determ_load);
+ } else if (FLAGS_pareto_base > 0.0) {
+ auto pareto = client_config.mutable_load_params()->mutable_pareto();
+ pareto->set_interarrival_base(FLAGS_pareto_base / 1e6);
+ pareto->set_alpha(FLAGS_pareto_alpha);
+ } else {
+ client_config.mutable_load_params()->mutable_closed_loop();
+ // No further load parameters to set up for closed loop
}
ServerConfig server_config;
server_config.set_server_type(server_type);
- server_config.set_threads(FLAGS_server_threads);
- server_config.set_enable_ssl(FLAGS_enable_ssl);
-
- // If we're running a sync-server streaming test, make sure
- // that we have at least as many threads as the active streams
- // or else threads will be blocked from forward progress and the
- // client will deadlock on a timer.
- GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER &&
- rpc_type == grpc::testing::STREAMING &&
- FLAGS_server_threads <
- FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel));
+ server_config.set_async_server_threads(FLAGS_async_server_threads);
+
+ if (FLAGS_secure_test) {
+ // Set up security params
+ SecurityParams security;
+ security.set_use_test_ca(true);
+ security.set_server_host_override("foo.test.google.fr");
+ client_config.mutable_security_params()->CopyFrom(security);
+ server_config.mutable_security_params()->CopyFrom(security);
+ }
const auto result = RunScenario(
client_config, FLAGS_num_clients, server_config, FLAGS_num_servers,
@@ -168,7 +166,6 @@ static void QpsDriver() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::QpsDriver();
return 0;
diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc
index a7979e6187..ccda28f09a 100644
--- a/test/cpp/qps/qps_interarrival_test.cc
+++ b/test/cpp/qps/qps_interarrival_test.cc
@@ -42,7 +42,7 @@
using grpc::testing::RandomDist;
using grpc::testing::InterarrivalTimer;
-void RunTest(RandomDist &&r, int threads, std::string title) {
+static void RunTest(RandomDist &&r, int threads, std::string title) {
InterarrivalTimer timer;
timer.init(r, threads);
gpr_histogram *h(gpr_histogram_create(0.01, 60e9));
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index 5a6a9249a9..dc88c893bb 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -52,20 +50,16 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
- client_config.set_load_type(POISSON);
client_config.mutable_load_params()->mutable_poisson()->set_offered_load(
1000.0);
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(4);
+ server_config.set_async_server_threads(4);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -80,7 +74,6 @@ static void RunQPS() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
index d0c4a79cd9..89b35cfb05 100644
--- a/test/cpp/qps/qps_test.cc
+++ b/test/cpp/qps/qps_test.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -52,17 +50,15 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(8);
+ server_config.set_async_server_threads(8);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -77,7 +73,6 @@ static void RunQPS() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;
diff --git a/test/cpp/qps/qps_test_with_poll.cc b/test/cpp/qps/qps_test_with_poll.cc
index 31d2c1bf7b..97da4096ed 100644
--- a/test/cpp/qps/qps_test_with_poll.cc
+++ b/test/cpp/qps/qps_test_with_poll.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -56,17 +54,15 @@ static void RunQPS() {
ClientConfig client_config;
client_config.set_client_type(ASYNC_CLIENT);
- client_config.set_enable_ssl(false);
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
- client_config.set_payload_size(1);
client_config.set_async_client_threads(8);
client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(4);
+ server_config.set_async_server_threads(4);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -83,7 +79,6 @@ int main(int argc, char** argv) {
grpc_platform_become_multipoller = grpc_poll_become_multipoller;
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunQPS();
return 0;
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 4ce77f366d..dc59eab7ef 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -52,17 +52,17 @@
#include <grpc++/security/server_credentials.h>
#include "test/core/util/grpc_profiler.h"
-#include "test/proto/qpstest.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/util/create_test_channel.h"
+#include "test/proto/benchmarks/services.pb.h"
namespace grpc {
namespace testing {
-std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
+static std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
switch (config.client_type()) {
- case ClientType::SYNCHRONOUS_CLIENT:
+ case ClientType::SYNC_CLIENT:
return (config.rpc_type() == RpcType::UNARY)
? CreateSynchronousUnaryClient(config)
: CreateSynchronousStreamingClient(config);
@@ -76,26 +76,29 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
abort();
}
-std::unique_ptr<Server> CreateServer(const ServerConfig& config,
- int server_port) {
+static void LimitCores(int cores) {}
+
+static std::unique_ptr<Server> CreateServer(const ServerConfig& config) {
+ if (config.core_limit() > 0) {
+ LimitCores(config.core_limit());
+ }
switch (config.server_type()) {
- case ServerType::SYNCHRONOUS_SERVER:
- return CreateSynchronousServer(config, server_port);
+ case ServerType::SYNC_SERVER:
+ return CreateSynchronousServer(config);
case ServerType::ASYNC_SERVER:
- return CreateAsyncServer(config, server_port);
+ return CreateAsyncServer(config);
default:
abort();
}
abort();
}
-class WorkerImpl GRPC_FINAL : public Worker::Service {
+class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
public:
- explicit WorkerImpl(int server_port)
- : server_port_(server_port), acquired_(false) {}
+ explicit WorkerServiceImpl() : acquired_(false) {}
- Status RunTest(ServerContext* ctx,
- ServerReaderWriter<ClientStatus, ClientArgs>* stream)
+ Status RunClient(ServerContext* ctx,
+ ServerReaderWriter<ClientStatus, ClientArgs>* stream)
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
@@ -103,7 +106,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
}
grpc_profiler_start("qps_client.prof");
- Status ret = RunTestBody(ctx, stream);
+ Status ret = RunClientBody(ctx, stream);
grpc_profiler_stop();
return ret;
}
@@ -126,7 +129,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
// Protect against multiple clients using this worker at once.
class InstanceGuard {
public:
- InstanceGuard(WorkerImpl* impl)
+ InstanceGuard(WorkerServiceImpl* impl)
: impl_(impl), acquired_(impl->TryAcquireInstance()) {}
~InstanceGuard() {
if (acquired_) {
@@ -137,7 +140,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
bool Acquired() const { return acquired_; }
private:
- WorkerImpl* const impl_;
+ WorkerServiceImpl* const impl_;
const bool acquired_;
};
@@ -154,8 +157,8 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
acquired_ = false;
}
- Status RunTestBody(ServerContext* ctx,
- ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
+ Status RunClientBody(ServerContext* ctx,
+ ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
return Status(StatusCode::INVALID_ARGUMENT, "");
@@ -175,7 +178,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_mark()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
- *status.mutable_stats() = client->Mark();
+ *status.mutable_stats() = client->Mark(args.mark().reset());
stream->Write(status);
}
@@ -191,12 +194,13 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_setup()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
- auto server = CreateServer(args.setup(), server_port_);
+ auto server = CreateServer(args.setup());
if (!server) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
ServerStatus status;
- status.set_port(server_port_);
+ status.set_port(server->port());
+ status.set_cores(server->cores());
if (!stream->Write(status)) {
return Status(StatusCode::UNKNOWN, "");
}
@@ -204,21 +208,19 @@ class WorkerImpl GRPC_FINAL : public Worker::Service {
if (!args.has_mark()) {
return Status(StatusCode::INVALID_ARGUMENT, "");
}
- *status.mutable_stats() = server->Mark();
+ *status.mutable_stats() = server->Mark(args.mark().reset());
stream->Write(status);
}
return Status::OK;
}
- const int server_port_;
-
std::mutex mu_;
bool acquired_;
};
-QpsWorker::QpsWorker(int driver_port, int server_port) {
- impl_.reset(new WorkerImpl(server_port));
+QpsWorker::QpsWorker(int driver_port) {
+ impl_.reset(new WorkerServiceImpl());
char* server_address = NULL;
gpr_join_host_port(&server_address, "::", driver_port);
diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h
index 861588907e..0db88ad3d1 100644
--- a/test/cpp/qps/qps_worker.h
+++ b/test/cpp/qps/qps_worker.h
@@ -42,15 +42,15 @@ class Server;
namespace testing {
-class WorkerImpl;
+class WorkerServiceImpl;
class QpsWorker {
public:
- QpsWorker(int driver_port, int server_port);
+ explicit QpsWorker(int driver_port);
~QpsWorker();
private:
- std::unique_ptr<WorkerImpl> impl_;
+ std::unique_ptr<WorkerServiceImpl> impl_;
std::unique_ptr<Server> server_;
};
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index e03e8e1fb0..b230eb441e 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -43,6 +43,7 @@ namespace testing {
static double WallTime(ResourceUsage u) { return u.wall_time(); }
static double UserTime(ResourceUsage u) { return u.user_time(); }
static double SystemTime(ResourceUsage u) { return u.system_time(); }
+static int Cores(ResourceUsage u) { return u.cores(); }
void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
reporters_.emplace_back(std::move(reporter));
@@ -83,7 +84,7 @@ void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
result.latencies.Count() / average(result.client_resources, WallTime);
gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
- qps / result.server_config.threads());
+ qps / sum(result.server_resources, Cores));
}
void GprLogReporter::ReportLatency(const ScenarioResult& result) {
@@ -123,10 +124,10 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
auto qps =
result.latencies.Count() / average(result.client_resources, WallTime);
- auto qpsPerCore = qps / result.server_config.threads();
+ auto qps_per_core = qps / sum(result.server_resources, Cores);
perf_db_client_.setQps(qps);
- perf_db_client_.setQpsPerCore(qpsPerCore);
+ perf_db_client_.setQpsPerCore(qps_per_core);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}
diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h
index 00d12369d5..78779231d3 100644
--- a/test/cpp/qps/report.h
+++ b/test/cpp/qps/report.h
@@ -41,7 +41,6 @@
#include <grpc++/support/config.h>
#include "test/cpp/qps/driver.h"
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/perf_db_client.h"
namespace grpc {
diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
new file mode 100644
index 0000000000..df06f7e471
--- /dev/null
+++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <set>
+
+#include <grpc/support/log.h>
+
+#include "test/cpp/qps/driver.h"
+#include "test/cpp/qps/report.h"
+#include "test/cpp/util/benchmark_config.h"
+
+namespace grpc {
+namespace testing {
+
+static const int WARMUP = 5;
+static const int BENCHMARK = 10;
+
+static void RunSynchronousUnaryPingPong() {
+ gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
+
+ ClientConfig client_config;
+ client_config.set_client_type(SYNC_CLIENT);
+ client_config.set_outstanding_rpcs_per_channel(1);
+ client_config.set_client_channels(1);
+ client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
+
+ ServerConfig server_config;
+ server_config.set_server_type(SYNC_SERVER);
+
+ // Set up security params
+ SecurityParams security;
+ security.set_use_test_ca(true);
+ security.set_server_host_override("foo.test.google.fr");
+ client_config.mutable_security_params()->CopyFrom(security);
+ server_config.mutable_security_params()->CopyFrom(security);
+
+ const auto result =
+ RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
+
+ GetReporter()->ReportQPS(*result);
+ GetReporter()->ReportLatency(*result);
+}
+
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc::testing::InitBenchmark(&argc, &argv, true);
+
+ grpc::testing::RunSynchronousUnaryPingPong();
+
+ return 0;
+}
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index e48e873dc3..6e81edc8ff 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -34,22 +34,38 @@
#ifndef TEST_QPS_SERVER_H
#define TEST_QPS_SERVER_H
+#include <grpc/support/cpu.h>
+#include <grpc++/security/server_credentials.h>
+
+#include "test/core/end2end/data/ssl_test_data.h"
+#include "test/core/util/port.h"
#include "test/cpp/qps/timer.h"
-#include "test/proto/qpstest.grpc.pb.h"
+#include "test/proto/messages.grpc.pb.h"
+#include "test/proto/benchmarks/control.grpc.pb.h"
namespace grpc {
namespace testing {
class Server {
public:
- Server() : timer_(new Timer) {}
+ explicit Server(const ServerConfig& config) : timer_(new Timer) {
+ if (config.port()) {
+ port_ = config.port();
+ } else {
+ port_ = grpc_pick_unused_port_or_die();
+ }
+ }
virtual ~Server() {}
- ServerStats Mark() {
- std::unique_ptr<Timer> timer(new Timer);
- timer.swap(timer_);
-
- auto timer_result = timer->Mark();
+ ServerStats Mark(bool reset) {
+ Timer::Result timer_result;
+ if (reset) {
+ std::unique_ptr<Timer> timer(new Timer);
+ timer.swap(timer_);
+ timer_result = timer->Mark();
+ } else {
+ timer_result = timer_->Mark();
+ }
ServerStats stats;
stats.set_time_elapsed(timer_result.wall);
@@ -70,13 +86,29 @@ class Server {
return true;
}
+ int port() const { return port_; }
+ int cores() const { return gpr_cpu_num_cores(); }
+ static std::shared_ptr<ServerCredentials> CreateServerCredentials(
+ const ServerConfig& config) {
+ if (config.has_security_params()) {
+ SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key,
+ test_server1_cert};
+ SslServerCredentialsOptions ssl_opts;
+ ssl_opts.pem_root_certs = "";
+ ssl_opts.pem_key_cert_pairs.push_back(pkcp);
+ return SslServerCredentials(ssl_opts);
+ } else {
+ return InsecureServerCredentials();
+ }
+ }
+
private:
+ int port_;
std::unique_ptr<Timer> timer_;
};
-std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config,
- int port);
-std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port);
+std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config);
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config);
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 98fa9c53e2..2d922fa615 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -49,38 +49,40 @@
#include <grpc++/security/server_credentials.h>
#include <gtest/gtest.h>
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/server.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
class AsyncQpsServerTest : public Server {
public:
- AsyncQpsServerTest(const ServerConfig &config, int port) {
+ explicit AsyncQpsServerTest(const ServerConfig &config) : Server(config) {
char *server_address = NULL;
- gpr_join_host_port(&server_address, "::", port);
+
+ gpr_join_host_port(&server_address, "::", port());
ServerBuilder builder;
- builder.AddListeningPort(server_address, InsecureServerCredentials());
+ builder.AddListeningPort(server_address,
+ Server::CreateServerCredentials(config));
gpr_free(server_address);
builder.RegisterAsyncService(&async_service_);
- for (int i = 0; i < config.threads(); i++) {
+ for (int i = 0; i < config.async_server_threads(); i++) {
srv_cqs_.emplace_back(builder.AddCompletionQueue());
}
server_ = builder.BuildAndStart();
using namespace std::placeholders;
- for (int i = 0; i < 10000 / config.threads(); i++) {
- for (int j = 0; j < config.threads(); j++) {
+ for (int i = 0; i < 10000 / config.async_server_threads(); i++) {
+ for (int j = 0; j < config.async_server_threads(); j++) {
auto request_unary = std::bind(
- &TestService::AsyncService::RequestUnaryCall, &async_service_, _1,
- _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
+ &BenchmarkService::AsyncService::RequestUnaryCall, &async_service_,
+ _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4);
auto request_streaming = std::bind(
- &TestService::AsyncService::RequestStreamingCall, &async_service_,
- _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
+ &BenchmarkService::AsyncService::RequestStreamingCall,
+ &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3);
contexts_.push_front(
new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
request_unary, ProcessRPC));
@@ -89,10 +91,10 @@ class AsyncQpsServerTest : public Server {
request_streaming, ProcessRPC));
}
}
- for (int i = 0; i < config.threads(); i++) {
+ for (int i = 0; i < config.async_server_threads(); i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
- for (int i = 0; i < config.threads(); i++) {
+ for (int i = 0; i < config.async_server_threads(); i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
@@ -309,7 +311,7 @@ class AsyncQpsServerTest : public Server {
std::vector<std::thread> threads_;
std::unique_ptr<grpc::Server> server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
- TestService::AsyncService async_service_;
+ BenchmarkService::AsyncService async_service_;
std::forward_list<ServerRpcContext *> contexts_;
class PerThreadShutdownState {
@@ -333,9 +335,8 @@ class AsyncQpsServerTest : public Server {
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
-std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,
- int port) {
- return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port));
+std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) {
+ return std::unique_ptr<Server>(new AsyncQpsServerTest(config));
}
} // namespace testing
diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc
index b760ef63ec..a09b174b7e 100644
--- a/test/cpp/qps/server_sync.cc
+++ b/test/cpp/qps/server_sync.cc
@@ -43,14 +43,14 @@
#include <grpc++/server_context.h>
#include <grpc++/security/server_credentials.h>
-#include "test/proto/qpstest.grpc.pb.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/qps/timer.h"
+#include "test/proto/benchmarks/services.grpc.pb.h"
namespace grpc {
namespace testing {
-class TestServiceImpl GRPC_FINAL : public TestService::Service {
+class BenchmarkServiceImpl GRPC_FINAL : public BenchmarkService::Service {
public:
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) GRPC_OVERRIDE {
@@ -84,30 +84,29 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service {
class SynchronousServer GRPC_FINAL : public grpc::testing::Server {
public:
- SynchronousServer(const ServerConfig& config, int port)
- : impl_(MakeImpl(port)) {}
-
- private:
- std::unique_ptr<grpc::Server> MakeImpl(int port) {
+ explicit SynchronousServer(const ServerConfig& config) : Server(config) {
ServerBuilder builder;
char* server_address = NULL;
- gpr_join_host_port(&server_address, "::", port);
- builder.AddListeningPort(server_address, InsecureServerCredentials());
+
+ gpr_join_host_port(&server_address, "::", port());
+ builder.AddListeningPort(server_address,
+ Server::CreateServerCredentials(config));
gpr_free(server_address);
builder.RegisterService(&service_);
- return builder.BuildAndStart();
+ impl_ = builder.BuildAndStart();
}
- TestServiceImpl service_;
+ private:
+ BenchmarkServiceImpl service_;
std::unique_ptr<grpc::Server> impl_;
};
std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
- const ServerConfig& config, int port) {
- return std::unique_ptr<Server>(new SynchronousServer(config, port));
+ const ServerConfig& config) {
+ return std::unique_ptr<Server>(new SynchronousServer(config));
}
} // namespace testing
diff --git a/test/cpp/qps/single_run_localhost.sh b/test/cpp/qps/single_run_localhost.sh
index 9d76f08f80..f5356f1834 100755
--- a/test/cpp/qps/single_run_localhost.sh
+++ b/test/cpp/qps/single_run_localhost.sh
@@ -42,9 +42,9 @@ NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count()
make CONFIG=$config qps_worker qps_driver -j$NUMCPUS
-bins/$config/qps_worker -driver_port 10000 -server_port 10001 &
+bins/$config/qps_worker -driver_port 10000 &
PID1=$!
-bins/$config/qps_worker -driver_port 10010 -server_port 10011 &
+bins/$config/qps_worker -driver_port 10010 &
PID2=$!
export QPS_WORKERS="localhost:10000,localhost:10010"
diff --git a/test/cpp/qps/sync_streaming_ping_pong_test.cc b/test/cpp/qps/sync_streaming_ping_pong_test.cc
index 52e43939a8..186afc03f7 100644
--- a/test/cpp/qps/sync_streaming_ping_pong_test.cc
+++ b/test/cpp/qps/sync_streaming_ping_pong_test.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -51,17 +49,14 @@ static void RunSynchronousStreamingPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Streaming Ping Pong");
ClientConfig client_config;
- client_config.set_client_type(SYNCHRONOUS_CLIENT);
- client_config.set_enable_ssl(false);
+ client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
- client_config.set_payload_size(1);
client_config.set_rpc_type(STREAMING);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
- server_config.set_server_type(SYNCHRONOUS_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(1);
+ server_config.set_server_type(SYNC_SERVER);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -75,7 +70,6 @@ static void RunSynchronousStreamingPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousStreamingPingPong();
return 0;
diff --git a/test/cpp/qps/sync_unary_ping_pong_test.cc b/test/cpp/qps/sync_unary_ping_pong_test.cc
index fbd21357aa..25851833a6 100644
--- a/test/cpp/qps/sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/sync_unary_ping_pong_test.cc
@@ -31,8 +31,6 @@
*
*/
-#include <signal.h>
-
#include <set>
#include <grpc/support/log.h>
@@ -51,17 +49,14 @@ static void RunSynchronousUnaryPingPong() {
gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong");
ClientConfig client_config;
- client_config.set_client_type(SYNCHRONOUS_CLIENT);
- client_config.set_enable_ssl(false);
+ client_config.set_client_type(SYNC_CLIENT);
client_config.set_outstanding_rpcs_per_channel(1);
client_config.set_client_channels(1);
- client_config.set_payload_size(1);
client_config.set_rpc_type(UNARY);
+ client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
- server_config.set_server_type(SYNCHRONOUS_SERVER);
- server_config.set_enable_ssl(false);
- server_config.set_threads(1);
+ server_config.set_server_type(SYNC_SERVER);
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
@@ -76,7 +71,6 @@ static void RunSynchronousUnaryPingPong() {
int main(int argc, char** argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- signal(SIGPIPE, SIG_IGN);
grpc::testing::RunSynchronousUnaryPingPong();
return 0;
diff --git a/test/cpp/qps/timer.cc b/test/cpp/qps/timer.cc
index 8edb838da3..3ec7f49f83 100644
--- a/test/cpp/qps/timer.cc
+++ b/test/cpp/qps/timer.cc
@@ -61,7 +61,7 @@ Timer::Result Timer::Sample() {
return r;
}
-Timer::Result Timer::Mark() {
+Timer::Result Timer::Mark() const {
Result s = Sample();
Result r;
r.wall = s.wall - start_.wall;
diff --git a/test/cpp/qps/timer.h b/test/cpp/qps/timer.h
index 30dbd7e7d5..d1aee1a9d1 100644
--- a/test/cpp/qps/timer.h
+++ b/test/cpp/qps/timer.h
@@ -44,7 +44,7 @@ class Timer {
double system;
};
- Result Mark();
+ Result Mark() const;
static double Now();
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index 935e4853a6..430ffb7cdc 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -43,8 +43,7 @@
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/util/test_config.h"
-DEFINE_int32(driver_port, 0, "Driver server port.");
-DEFINE_int32(server_port, 0, "Spawned server port.");
+DEFINE_int32(driver_port, 0, "Port for communication with driver");
static bool got_sigint = false;
@@ -54,7 +53,7 @@ namespace grpc {
namespace testing {
static void RunServer() {
- QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
+ QpsWorker worker(FLAGS_driver_port);
while (!got_sigint) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
diff --git a/test/proto/qpstest.proto b/test/proto/benchmarks/control.proto
index ef1f9451e9..966ab78baa 100644
--- a/test/proto/qpstest.proto
+++ b/test/proto/benchmarks/control.proto
@@ -1,4 +1,3 @@
-
// Copyright 2015, Google Inc.
// All rights reserved.
//
@@ -28,62 +27,20 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-// An integration test service that covers all the method signature permutations
-// of unary/streaming requests/responses.
syntax = "proto3";
-package grpc.testing;
-
-enum PayloadType {
- // Compressable text format.
- COMPRESSABLE = 0;
-
- // Uncompressable binary format.
- UNCOMPRESSABLE = 1;
-
- // Randomly chosen from all other formats defined in this enum.
- RANDOM = 2;
-}
-
-message StatsRequest {
- // run number
- int32 test_num = 1;
-}
-
-message ServerStats {
- // wall clock time
- double time_elapsed = 1;
-
- // user time used by the server process and threads
- double time_user = 2;
-
- // server time used by the server process and all threads
- double time_system = 3;
-}
-
-message Payload {
- // The type of data in body.
- PayloadType type = 1;
- // Primary contents of payload.
- bytes body = 2;
-}
+import "test/proto/benchmarks/payloads.proto";
+import "test/proto/benchmarks/stats.proto";
-message HistogramData {
- repeated uint32 bucket = 1;
- double min_seen = 2;
- double max_seen = 3;
- double sum = 4;
- double sum_of_squares = 5;
- double count = 6;
-}
+package grpc.testing;
enum ClientType {
- SYNCHRONOUS_CLIENT = 0;
+ SYNC_CLIENT = 0;
ASYNC_CLIENT = 1;
}
enum ServerType {
- SYNCHRONOUS_SERVER = 0;
+ SYNC_SERVER = 0;
ASYNC_SERVER = 1;
}
@@ -92,14 +49,6 @@ enum RpcType {
STREAMING = 1;
}
-enum LoadType {
- CLOSED_LOOP = 0;
- POISSON = 1;
- UNIFORM = 2;
- DETERMINISTIC = 3;
- PARETO = 4;
-}
-
message PoissonParams {
double offered_load = 1;
}
@@ -118,32 +67,45 @@ message ParetoParams {
double alpha = 2;
}
+message ClosedLoopParams {
+}
+
message LoadParams {
oneof load {
- PoissonParams poisson = 1;
- UniformParams uniform = 2;
- DeterministicParams determ = 3;
- ParetoParams pareto = 4;
+ ClosedLoopParams closed_loop = 1;
+ PoissonParams poisson = 2;
+ UniformParams uniform = 3;
+ DeterministicParams determ = 4;
+ ParetoParams pareto = 5;
};
}
+// presence of SecurityParams implies use of TLS
+message SecurityParams {
+ bool use_test_ca = 1;
+ string server_host_override = 2;
+}
+
message ClientConfig {
repeated string server_targets = 1;
ClientType client_type = 2;
- bool enable_ssl = 3;
+ SecurityParams security_params = 3;
int32 outstanding_rpcs_per_channel = 4;
int32 client_channels = 5;
- int32 payload_size = 6;
// only for async client:
int32 async_client_threads = 7;
RpcType rpc_type = 8;
- string host = 9;
- LoadType load_type = 10;
- LoadParams load_params = 11;
+ LoadParams load_params = 10;
+ PayloadConfig payload_config = 11;
+}
+
+message ClientStatus {
+ ClientStats stats = 1;
}
// Request current stats
message Mark {
+ bool reset = 1;
}
message ClientArgs {
@@ -153,22 +115,15 @@ message ClientArgs {
}
}
-message ClientStats {
- HistogramData latencies = 1;
- double time_elapsed = 2;
- double time_user = 3;
- double time_system = 4;
-}
-
-message ClientStatus {
- ClientStats stats = 1;
-}
-
message ServerConfig {
ServerType server_type = 1;
- int32 threads = 2;
- bool enable_ssl = 3;
- string host = 4;
+ SecurityParams security_params = 2;
+ int32 port = 4;
+ // only for async server
+ int32 async_server_threads = 7;
+ // restrict core usage
+ int32 core_limit = 8;
+ PayloadConfig payload_config = 9;
}
message ServerArgs {
@@ -181,38 +136,5 @@ message ServerArgs {
message ServerStatus {
ServerStats stats = 1;
int32 port = 2;
-}
-
-message SimpleRequest {
- // Desired payload type in the response from the server.
- // If response_type is RANDOM, server randomly chooses one from other formats.
- PayloadType response_type = 1;
-
- // Desired payload size in the response from the server.
- // If response_type is COMPRESSABLE, this denotes the size before compression.
- int32 response_size = 2;
-
- // Optional input payload sent along with the request.
- Payload payload = 3;
-}
-
-message SimpleResponse {
- Payload payload = 1;
-}
-
-service TestService {
- // One request followed by one response.
- // The server returns the client payload as-is.
- rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
-
- // One request followed by one response.
- // The server returns the client payload as-is.
- rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse);
-}
-
-service Worker {
- // Start test with specified workload
- rpc RunTest(stream ClientArgs) returns (stream ClientStatus);
- // Start test with specified workload
- rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
+ int32 cores = 3;
}
diff --git a/test/proto/benchmarks/payloads.proto b/test/proto/benchmarks/payloads.proto
new file mode 100644
index 0000000000..7e5b2c61ff
--- /dev/null
+++ b/test/proto/benchmarks/payloads.proto
@@ -0,0 +1,55 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+package grpc.testing;
+
+message ByteBufferParams {
+ int32 req_size = 1;
+ int32 resp_size = 2;
+}
+
+message SimpleProtoParams {
+ int32 req_size = 1;
+ int32 resp_size = 2;
+}
+
+message ComplexProtoParams {
+ // TODO (vpai): Fill this in once the details of complex, representative
+ // protos are decided
+}
+
+message PayloadConfig {
+ oneof payload {
+ ByteBufferParams bytebuf_params = 1;
+ SimpleProtoParams simple_params = 2;
+ ComplexProtoParams complex_params = 3;
+ }
+}
diff --git a/test/proto/benchmarks/services.proto b/test/proto/benchmarks/services.proto
new file mode 100644
index 0000000000..4c2cbabdf8
--- /dev/null
+++ b/test/proto/benchmarks/services.proto
@@ -0,0 +1,55 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// An integration test service that covers all the method signature permutations
+// of unary/streaming requests/responses.
+syntax = "proto3";
+
+import "test/proto/messages.proto";
+import "test/proto/benchmarks/control.proto";
+
+package grpc.testing;
+
+service BenchmarkService {
+ // One request followed by one response.
+ // The server returns the client payload as-is.
+ rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
+
+ // One request followed by one response.
+ // The server returns the client payload as-is.
+ rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse);
+}
+
+service WorkerService {
+ // Start server with specified workload
+ rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
+
+ // Start client with specified workload
+ rpc RunClient(stream ClientArgs) returns (stream ClientStatus);
+}
diff --git a/test/proto/benchmarks/stats.proto b/test/proto/benchmarks/stats.proto
new file mode 100644
index 0000000000..d52144f321
--- /dev/null
+++ b/test/proto/benchmarks/stats.proto
@@ -0,0 +1,59 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+package grpc.testing;
+
+message ServerStats {
+ // wall clock time
+ double time_elapsed = 1;
+
+ // user time used by the server process and threads
+ double time_user = 2;
+
+ // server time used by the server process and all threads
+ double time_system = 3;
+}
+
+message HistogramData {
+ repeated uint32 bucket = 1;
+ double min_seen = 2;
+ double max_seen = 3;
+ double sum = 4;
+ double sum_of_squares = 5;
+ double count = 6;
+}
+
+message ClientStats {
+ HistogramData latencies = 1;
+ double time_elapsed = 2;
+ double time_user = 3;
+ double time_system = 4;
+}