diff options
Diffstat (limited to 'test')
40 files changed, 1143 insertions, 529 deletions
diff --git a/test/core/client_config/lb_policies_test.c b/test/core/client_config/lb_policies_test.c index 68e991716b..e44930ed53 100644 --- a/test/core/client_config/lb_policies_test.c +++ b/test/core/client_config/lb_policies_test.c @@ -119,14 +119,15 @@ static void test_spec_destroy(test_spec *spec) { static void *tag(gpr_intptr t) { return (void *)t; } -static gpr_timespec n_seconds_time(int n) { - return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); +static gpr_timespec n_millis_time(int n) { + return gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(n, GPR_TIMESPAN)); } static void drain_cq(grpc_completion_queue *cq) { grpc_event ev; do { - ev = grpc_completion_queue_next(cq, n_seconds_time(5), NULL); + ev = grpc_completion_queue_next(cq, n_millis_time(5000), NULL); } while (ev.type != GRPC_QUEUE_SHUTDOWN); } @@ -134,29 +135,47 @@ static void kill_server(const servers_fixture *f, size_t i) { gpr_log(GPR_INFO, "KILLING SERVER %d", i); GPR_ASSERT(f->servers[i] != NULL); grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); - GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), - NULL).type == GRPC_OP_COMPLETE); + GPR_ASSERT( + grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL) + .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); f->servers[i] = NULL; } -static void revive_server(const servers_fixture *f, size_t i) { +typedef struct request_data { + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + char *details; + size_t details_capacity; + grpc_status_code status; + grpc_call_details *call_details; +} request_data; + +static void revive_server(const servers_fixture *f, request_data *rdata, + size_t i) { int got_port; gpr_log(GPR_INFO, "RAISE AGAIN SERVER %d", i); GPR_ASSERT(f->servers[i] == NULL); + + gpr_log(GPR_DEBUG, "revive: %s", f->servers_hostports[i]); + f->servers[i] = grpc_server_create(NULL, NULL); grpc_server_register_completion_queue(f->servers[i], f->cq, NULL); GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port( f->servers[i], f->servers_hostports[i])) > 0); grpc_server_start(f->servers[i]); + + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f->servers[i], &f->server_calls[i], + &rdata->call_details[i], + &f->request_metadata_recv[i], f->cq, + f->cq, tag(1000 + (int)i))); } static servers_fixture *setup_servers(const char *server_host, + request_data *rdata, const size_t num_servers) { servers_fixture *f = gpr_malloc(sizeof(servers_fixture)); - int *ports; - int got_port; size_t i; f->num_servers = num_servers; @@ -164,23 +183,16 @@ static servers_fixture *setup_servers(const char *server_host, f->request_metadata_recv = gpr_malloc(sizeof(grpc_metadata_array) * num_servers); /* Create servers. */ - ports = gpr_malloc(sizeof(int *) * num_servers); f->servers = gpr_malloc(sizeof(grpc_server *) * num_servers); f->servers_hostports = gpr_malloc(sizeof(char *) * num_servers); f->cq = grpc_completion_queue_create(NULL); for (i = 0; i < num_servers; i++) { - ports[i] = grpc_pick_unused_port_or_die(); - - gpr_join_host_port(&f->servers_hostports[i], server_host, ports[i]); - - f->servers[i] = grpc_server_create(NULL, NULL); - grpc_server_register_completion_queue(f->servers[i], f->cq, NULL); - GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port( - f->servers[i], f->servers_hostports[i])) > 0); - GPR_ASSERT(ports[i] == got_port); - grpc_server_start(f->servers[i]); + grpc_metadata_array_init(&f->request_metadata_recv[i]); + gpr_join_host_port(&f->servers_hostports[i], server_host, + grpc_pick_unused_port_or_die()); + f->servers[i] = 0; + revive_server(f, rdata, i); } - gpr_free(ports); return f; } @@ -191,8 +203,8 @@ static void teardown_servers(servers_fixture *f) { if (f->servers[i] == NULL) continue; grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), - NULL).type == GRPC_OP_COMPLETE); + n_millis_time(5000), NULL) + .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); } grpc_completion_queue_shutdown(f->cq); @@ -203,6 +215,7 @@ static void teardown_servers(servers_fixture *f) { for (i = 0; i < f->num_servers; i++) { gpr_free(f->servers_hostports[i]); + grpc_metadata_array_destroy(&f->request_metadata_recv[i]); } gpr_free(f->servers_hostports); @@ -211,22 +224,12 @@ static void teardown_servers(servers_fixture *f) { gpr_free(f); } -typedef struct request_data { - grpc_metadata_array initial_metadata_recv; - grpc_metadata_array trailing_metadata_recv; - char *details; - size_t details_capacity; - grpc_status_code status; - grpc_call_details *call_details; -} request_data; - /** Returns connection sequence (server indices), which must be freed */ int *perform_request(servers_fixture *f, grpc_channel *client, request_data *rdata, const test_spec *spec) { grpc_call *c; int s_idx; int *s_valid; - gpr_timespec deadline; grpc_op ops[6]; grpc_op *op; int was_cancelled; @@ -237,7 +240,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client, int completed_client; s_valid = gpr_malloc(sizeof(int) * f->num_servers); - rdata->call_details = gpr_malloc(sizeof(grpc_call_details) * f->num_servers); connection_sequence = gpr_malloc(sizeof(int) * spec->num_iters); /* Send a trivial request. */ @@ -253,7 +255,7 @@ int *perform_request(servers_fixture *f, grpc_channel *client, kill_server(f, i); } else if (spec->revive_at[iter_num][i] != 0) { /* killing takes precedence */ - revive_server(f, i); + revive_server(f, rdata, i); } } @@ -266,9 +268,9 @@ int *perform_request(servers_fixture *f, grpc_channel *client, } memset(s_valid, 0, f->num_servers * sizeof(int)); - deadline = n_seconds_time(1); c = grpc_channel_create_call(client, NULL, GRPC_PROPAGATE_DEFAULTS, f->cq, - "/foo", "foo.test.google.fr", deadline, NULL); + "/foo", "foo.test.google.fr", gpr_inf_future(GPR_CLOCK_REALTIME), + NULL); GPR_ASSERT(c); completed_client = 0; @@ -300,22 +302,9 @@ int *perform_request(servers_fixture *f, grpc_channel *client, GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL)); - /* "listen" on all servers */ - for (i = 0; i < f->num_servers; i++) { - grpc_metadata_array_init(&f->request_metadata_recv[i]); - if (f->servers[i] != NULL) { - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_call(f->servers[i], &f->server_calls[i], - &rdata->call_details[i], - &f->request_metadata_recv[i], f->cq, - f->cq, tag(1000 + (int)i))); - } - } - s_idx = -1; - while ((ev = grpc_completion_queue_next( - f->cq, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(300), NULL)).type != - GRPC_QUEUE_TIMEOUT) { + while ((ev = grpc_completion_queue_next(f->cq, n_millis_time(s_idx == -1 ? 3000 : 200), NULL)) + .type != GRPC_QUEUE_TIMEOUT) { GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); read_tag = ((int)(gpr_intptr)ev.tag); gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d", @@ -327,11 +316,14 @@ int *perform_request(servers_fixture *f, grpc_channel *client, s_valid[s_idx] = 1; connection_sequence[iter_num] = s_idx; } else if (read_tag == 1) { + gpr_log(GPR_DEBUG, "client timed out"); GPR_ASSERT(ev.success); completed_client = 1; } } + gpr_log(GPR_DEBUG, "s_idx=%d", s_idx); + if (s_idx >= 0) { op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -361,25 +353,30 @@ int *perform_request(servers_fixture *f, grpc_channel *client, } cq_verify(cqv); + gpr_log(GPR_DEBUG, "status=%d; %s", rdata->status, rdata->details); GPR_ASSERT(rdata->status == GRPC_STATUS_UNIMPLEMENTED); GPR_ASSERT(0 == strcmp(rdata->details, "xyz")); GPR_ASSERT(0 == strcmp(rdata->call_details[s_idx].method, "/foo")); GPR_ASSERT(0 == strcmp(rdata->call_details[s_idx].host, "foo.test.google.fr")); GPR_ASSERT(was_cancelled == 1); + + grpc_call_destroy(f->server_calls[s_idx]); + + /* ask for the next request on this server */ + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + f->servers[s_idx], &f->server_calls[s_idx], + &rdata->call_details[s_idx], + &f->request_metadata_recv[s_idx], f->cq, + f->cq, tag(1000 + (int)s_idx))); } else { + grpc_call_cancel(c, NULL); if (!completed_client) { cq_expect_completion(cqv, tag(1), 1); cq_verify(cqv); } } - for (i = 0; i < f->num_servers; i++) { - if (s_valid[i] != 0) { - grpc_call_destroy(f->server_calls[i]); - } - grpc_metadata_array_destroy(&f->request_metadata_recv[i]); - } grpc_metadata_array_destroy(&rdata->initial_metadata_recv); grpc_metadata_array_destroy(&rdata->trailing_metadata_recv); @@ -393,7 +390,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client, gpr_free(rdata->details); } - gpr_free(rdata->call_details); gpr_free(s_valid); return connection_sequence; @@ -456,7 +452,10 @@ void run_spec(const test_spec *spec) { char *servers_hostports_str; int *actual_connection_sequence; request_data rdata; - servers_fixture *f = setup_servers("127.0.0.1", spec->num_servers); + servers_fixture *f; + rdata.call_details = + gpr_malloc(sizeof(grpc_call_details) * spec->num_servers); + f = setup_servers("127.0.0.1", &rdata, spec->num_servers); /* Create client. */ servers_hostports_str = gpr_strjoin_sep((const char **)f->servers_hostports, @@ -475,6 +474,7 @@ void run_spec(const test_spec *spec) { gpr_free(client_hostport); gpr_free(servers_hostports_str); gpr_free(actual_connection_sequence); + gpr_free(rdata.call_details); grpc_channel_destroy(client); teardown_servers(f); diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py index af8f48576c..38d3b2218a 100755 --- a/test/core/end2end/gen_build_yaml.py +++ b/test/core/end2end/gen_build_yaml.py @@ -46,21 +46,21 @@ uds_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=Fal # maps fixture name to whether it requires the security library END2END_FIXTURES = { + 'h2_compress': default_unsecure_fixture_options, 'h2_fakesec': default_secure_fixture_options._replace(ci_mac=False), 'h2_full': default_unsecure_fixture_options, - 'h2_uchannel': default_unsecure_fixture_options, - 'h2_compress': default_unsecure_fixture_options, - 'h2_uds': uds_fixture_options, - 'h2_uds+poll': uds_fixture_options._replace(platforms=['linux']), 'h2_full+poll': default_unsecure_fixture_options._replace(platforms=['linux']), + 'h2_oauth2': default_secure_fixture_options._replace(ci_mac=False), 'h2_proxy': default_unsecure_fixture_options._replace(includes_proxy=True, ci_mac=False), + 'h2_sockpair_1byte': socketpair_unsecure_fixture_options._replace(ci_mac=False), + 'h2_sockpair': socketpair_unsecure_fixture_options._replace(ci_mac=False), + 'h2_sockpair+trace': socketpair_unsecure_fixture_options, 'h2_ssl': default_secure_fixture_options, 'h2_ssl+poll': default_secure_fixture_options._replace(platforms=['linux']), 'h2_ssl_proxy': default_secure_fixture_options._replace(includes_proxy=True, ci_mac=False), - 'h2_oauth2': default_secure_fixture_options._replace(ci_mac=False), - 'h2_sockpair': socketpair_unsecure_fixture_options._replace(ci_mac=False), - 'h2_sockpair_1byte': socketpair_unsecure_fixture_options._replace(ci_mac=False), - 'h2_sockpair+trace': socketpair_unsecure_fixture_options, + 'h2_uchannel': default_unsecure_fixture_options, + 'h2_uds+poll': uds_fixture_options._replace(platforms=['linux']), + 'h2_uds': uds_fixture_options, } TestOptions = collections.namedtuple('TestOptions', 'needs_fullstack needs_dns proxyable flaky secure') @@ -70,38 +70,40 @@ connectivity_test_options = default_test_options._replace(needs_fullstack=True) # maps test names to options END2END_TESTS = { 'bad_hostname': default_test_options, - 'cancel_after_client_done': default_test_options, + 'binary_metadata': default_test_options, + 'call_creds': default_test_options._replace(secure=True), 'cancel_after_accept': default_test_options, + 'cancel_after_client_done': default_test_options, 'cancel_after_invoke': default_test_options, 'cancel_before_invoke': default_test_options, 'cancel_in_a_vacuum': default_test_options, + 'cancel_with_status': default_test_options, 'census_simple_request': default_test_options, 'channel_connectivity': connectivity_test_options._replace(proxyable=False), + 'compressed_payload': default_test_options._replace(proxyable=False), 'default_host': default_test_options._replace(needs_fullstack=True, needs_dns=True), 'disappearing_server': connectivity_test_options, - 'shutdown_finishes_calls': default_test_options, - 'shutdown_finishes_tags': default_test_options, 'empty_batch': default_test_options, 'graceful_server_shutdown': default_test_options, + 'high_initial_seqno': default_test_options, 'invoke_large_request': default_test_options, + 'large_metadata': default_test_options, 'max_concurrent_streams': default_test_options._replace(proxyable=False), 'max_message_length': default_test_options, + 'metadata': default_test_options, + 'negative_deadline': default_test_options, 'no_op': default_test_options, + 'payload': default_test_options, 'ping_pong_streaming': default_test_options, 'registered_call': default_test_options, - 'binary_metadata': default_test_options, - 'metadata': default_test_options, - 'call_creds': default_test_options._replace(secure=True), - 'payload': default_test_options, - 'trailing_metadata': default_test_options, - 'compressed_payload': default_test_options._replace(proxyable=False), 'request_with_flags': default_test_options._replace(proxyable=False), - 'large_metadata': default_test_options, 'request_with_payload': default_test_options, 'server_finishes_request': default_test_options, + 'shutdown_finishes_calls': default_test_options, + 'shutdown_finishes_tags': default_test_options, 'simple_delayed_request': connectivity_test_options, 'simple_request': default_test_options, - 'high_initial_seqno': default_test_options, + 'trailing_metadata': default_test_options, } diff --git a/test/core/end2end/tests/cancel_with_status.c b/test/core/end2end/tests/cancel_with_status.c new file mode 100644 index 0000000000..eecfa83fa4 --- /dev/null +++ b/test/core/end2end/tests/cancel_with_status.c @@ -0,0 +1,183 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include <stdio.h> +#include <string.h> + +#include "src/core/support/string.h" +#include <grpc/byte_buffer.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "test/core/end2end/cq_verifier.h" + +enum { TIMEOUT = 200000 }; + +static void *tag(gpr_intptr t) { return (void *)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char *test_name, + grpc_channel_args *client_args, + grpc_channel_args *server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "%s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_client(&f, client_args); + config.init_server(&f, server_args); + return f; +} + +static gpr_timespec n_seconds_time(int n) { + return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); +} + +static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); } + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + NULL).type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); +} + +static void simple_request_body(grpc_end2end_test_fixture f, size_t num_ops) { + grpc_call *c; + gpr_timespec deadline = five_seconds_time(); + cq_verifier *cqv = cq_verifier_create(f.cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_status_code status; + grpc_call_error error; + char *details = NULL; + size_t details_capacity = 0; + + gpr_log(GPR_DEBUG, "test with %d ops", num_ops); + + c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + "/foo", "foo.test.google.fr:1234", deadline, + NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + GPR_ASSERT(num_ops <= (size_t)(op - ops)); + error = grpc_call_start_batch(c, ops, num_ops, tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + grpc_call_cancel_with_status(c, GRPC_STATUS_UNIMPLEMENTED, "xyz", NULL); + + cq_expect_completion(cqv, tag(1), 1); + cq_verify(cqv); + + GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED); + GPR_ASSERT(0 == strcmp(details, "xyz")); + + gpr_free(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + + grpc_call_destroy(c); + + cq_verifier_destroy(cqv); +} + +static void test_invoke_simple_request(grpc_end2end_test_config config, size_t num_ops) { + grpc_end2end_test_fixture f; + + f = begin_test(config, "test_invoke_simple_request", NULL, NULL); + simple_request_body(f, num_ops); + end_test(&f); + config.tear_down_data(&f); +} + +void grpc_end2end_tests(grpc_end2end_test_config config) { + size_t i; + for (i = 1; i <= 4; i++) { + test_invoke_simple_request(config, i); + } +} diff --git a/test/core/end2end/tests/negative_deadline.c b/test/core/end2end/tests/negative_deadline.c new file mode 100644 index 0000000000..abcc1ba358 --- /dev/null +++ b/test/core/end2end/tests/negative_deadline.c @@ -0,0 +1,180 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include <stdio.h> +#include <string.h> + +#include "src/core/support/string.h" +#include <grpc/byte_buffer.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "test/core/end2end/cq_verifier.h" + +enum { TIMEOUT = 200000 }; + +static void *tag(gpr_intptr t) { return (void *)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char *test_name, + grpc_channel_args *client_args, + grpc_channel_args *server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "%s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_client(&f, client_args); + config.init_server(&f, server_args); + return f; +} + +static gpr_timespec n_seconds_time(int n) { + return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); +} + +static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); } + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + NULL).type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); +} + +static void simple_request_body(grpc_end2end_test_fixture f, size_t num_ops) { + grpc_call *c; + gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_REALTIME); + cq_verifier *cqv = cq_verifier_create(f.cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_status_code status; + grpc_call_error error; + char *details = NULL; + size_t details_capacity = 0; + + gpr_log(GPR_DEBUG, "test with %d ops", num_ops); + + c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + "/foo", "foo.test.google.fr:1234", deadline, + NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + GPR_ASSERT(num_ops <= (size_t)(op - ops)); + error = grpc_call_start_batch(c, ops, num_ops, tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(1), 1); + cq_verify(cqv); + + GPR_ASSERT(status == GRPC_STATUS_DEADLINE_EXCEEDED); + + gpr_free(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + + grpc_call_destroy(c); + + cq_verifier_destroy(cqv); +} + +static void test_invoke_simple_request(grpc_end2end_test_config config, size_t num_ops) { + grpc_end2end_test_fixture f; + + f = begin_test(config, "test_invoke_simple_request", NULL, NULL); + simple_request_body(f, num_ops); + end_test(&f); + config.tear_down_data(&f); +} + +void grpc_end2end_tests(grpc_end2end_test_config config) { + size_t i; + for (i = 1; i <= 4; i++) { + test_invoke_simple_request(config, i); + } +} diff --git a/test/core/network_benchmarks/low_level_ping_pong.c b/test/core/network_benchmarks/low_level_ping_pong.c index be82a36306..7a2d894481 100644 --- a/test/core/network_benchmarks/low_level_ping_pong.c +++ b/test/core/network_benchmarks/low_level_ping_pong.c @@ -82,9 +82,9 @@ typedef struct thread_args { /* Basic call to read() */ static int read_bytes(int fd, char *buf, size_t read_size, int spin) { size_t bytes_read = 0; - int err; + ssize_t err; do { - err = (int)read(fd, buf + bytes_read, read_size - bytes_read); + err = read(fd, buf + bytes_read, read_size - bytes_read); if (err < 0) { if (errno == EINTR) { continue; @@ -115,6 +115,7 @@ static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) { struct pollfd pfd; size_t bytes_read = 0; int err; + ssize_t err2; pfd.fd = fd; pfd.events = POLLIN; @@ -132,13 +133,13 @@ static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) { GPR_ASSERT(err == 1); GPR_ASSERT(pfd.revents == POLLIN); do { - err = (int)read(fd, buf + bytes_read, read_size - bytes_read); - } while (err < 0 && errno == EINTR); - if (err < 0 && errno != EAGAIN) { + err2 = read(fd, buf + bytes_read, read_size - bytes_read); + } while (err2 < 0 && errno == EINTR); + if (err2 < 0 && errno != EAGAIN) { gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno)); return -1; } - bytes_read += (size_t)err; + bytes_read += (size_t) err2; } while (bytes_read < read_size); return 0; } @@ -157,6 +158,7 @@ static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) { struct epoll_event ev; size_t bytes_read = 0; int err; + ssize_t err2; size_t read_size = args->msg_size; do { @@ -172,11 +174,11 @@ static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) { GPR_ASSERT(ev.data.fd == args->fds.read_fd); do { do { - err = (int)read(args->fds.read_fd, buf + bytes_read, - read_size - bytes_read); - } while (err < 0 && errno == EINTR); + err2 = read(args->fds.read_fd, buf + bytes_read, + read_size - bytes_read); + } while (err2 < 0 && errno == EINTR); if (errno == EAGAIN) break; - bytes_read += (size_t)err; + bytes_read += (size_t) err2; /* TODO(klempner): This should really be doing an extra call after we are done to ensure we see an EAGAIN */ } while (bytes_read < read_size); @@ -200,11 +202,11 @@ static int epoll_read_bytes_spin(struct thread_args *args, char *buf) { */ static int blocking_write_bytes(struct thread_args *args, char *buf) { size_t bytes_written = 0; - int err; + ssize_t err; size_t write_size = args->msg_size; do { - err = (int)write(args->fds.write_fd, buf + bytes_written, - write_size - bytes_written); + err = write(args->fds.write_fd, buf + bytes_written, + write_size - bytes_written); if (err < 0) { if (errno == EINTR) { continue; @@ -298,7 +300,7 @@ static void print_histogram(gpr_histogram *histogram) { static double now(void) { gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME); - return 1e9 * (double)tv.tv_sec + tv.tv_nsec; + return 1e9 * (double)tv.tv_sec + (double)tv.tv_nsec; } static void client_thread(thread_args *args) { @@ -374,7 +376,7 @@ error: return -1; } -static int connect_client(struct sockaddr *addr, int len) { +static int connect_client(struct sockaddr *addr, socklen_t len) { int fd = socket(addr->sa_family, SOCK_STREAM, 0); int err; if (fd < 0) { @@ -388,7 +390,7 @@ static int connect_client(struct sockaddr *addr, int len) { } do { - err = (int)connect(fd, addr, (socklen_t)len); + err = connect(fd, addr, len); } while (err < 0 && errno == EINTR); if (err < 0) { @@ -587,27 +589,27 @@ static int run_benchmark(char *socket_type, thread_args *client_args, return 0; } -static int run_all_benchmarks(int msg_size) { +static int run_all_benchmarks(size_t msg_size) { int error = 0; size_t i; for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) { - test_strategy *ts = &test_strategies[i]; + test_strategy *strategy = &test_strategies[i]; size_t j; for (j = 0; j < GPR_ARRAY_SIZE(socket_types); ++j) { thread_args *client_args = malloc(sizeof(thread_args)); thread_args *server_args = malloc(sizeof(thread_args)); char *socket_type = socket_types[j]; - client_args->read_bytes = ts->read_strategy; + client_args->read_bytes = strategy->read_strategy; client_args->write_bytes = blocking_write_bytes; - client_args->setup = ts->setup; - client_args->msg_size = (size_t)msg_size; - client_args->strategy_name = ts->name; - server_args->read_bytes = ts->read_strategy; + client_args->setup = strategy->setup; + client_args->msg_size = msg_size; + client_args->strategy_name = strategy->name; + server_args->read_bytes = strategy->read_strategy; server_args->write_bytes = blocking_write_bytes; - server_args->setup = ts->setup; - server_args->msg_size = (size_t)msg_size; - server_args->strategy_name = ts->name; + server_args->setup = strategy->setup; + server_args->msg_size = msg_size; + server_args->strategy_name = strategy->name; error = run_benchmark(socket_type, client_args, server_args); if (error < 0) { return error; @@ -624,7 +626,7 @@ int main(int argc, char **argv) { char *read_strategy = NULL; char *socket_type = NULL; size_t i; - const test_strategy *ts = NULL; + const test_strategy *strategy = NULL; int error = 0; gpr_cmdline *cmdline = @@ -644,7 +646,7 @@ int main(int argc, char **argv) { if (read_strategy == NULL) { gpr_log(GPR_INFO, "No strategy specified, running all benchmarks"); - return run_all_benchmarks(msg_size); + return run_all_benchmarks((size_t)msg_size); } if (socket_type == NULL) { @@ -658,22 +660,22 @@ int main(int argc, char **argv) { for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) { if (strcmp(test_strategies[i].name, read_strategy) == 0) { - ts = &test_strategies[i]; + strategy = &test_strategies[i]; } } - if (ts == NULL) { + if (strategy == NULL) { fprintf(stderr, "Invalid read strategy %s\n", read_strategy); return -1; } - client_args->read_bytes = ts->read_strategy; + client_args->read_bytes = strategy->read_strategy; client_args->write_bytes = blocking_write_bytes; - client_args->setup = ts->setup; + client_args->setup = strategy->setup; client_args->msg_size = (size_t)msg_size; client_args->strategy_name = read_strategy; - server_args->read_bytes = ts->read_strategy; + server_args->read_bytes = strategy->read_strategy; server_args->write_bytes = blocking_write_bytes; - server_args->setup = ts->setup; + server_args->setup = strategy->setup; server_args->msg_size = (size_t)msg_size; server_args->strategy_name = read_strategy; diff --git a/test/core/surface/byte_buffer_reader_test.c b/test/core/surface/byte_buffer_reader_test.c index 560e0ac7b2..7f9cd6b62b 100644 --- a/test/core/surface/byte_buffer_reader_test.c +++ b/test/core/surface/byte_buffer_reader_test.c @@ -184,6 +184,39 @@ static void test_byte_buffer_from_reader(void) { grpc_byte_buffer_destroy(buffer_from_reader); } +static void test_readall(void) { + char* lotsa_as[512]; + char* lotsa_bs[1024]; + gpr_slice slices[2]; + grpc_byte_buffer *buffer; + grpc_byte_buffer_reader reader; + gpr_slice slice_out; + + LOG_TEST("test_readall"); + + memset(lotsa_as, 'a', 512); + memset(lotsa_bs, 'b', 1024); + /* use slices large enough to overflow inlining */ + slices[0] = gpr_slice_malloc(512); + memcpy(GPR_SLICE_START_PTR(slices[0]), lotsa_as, 512); + slices[1] = gpr_slice_malloc(1024); + memcpy(GPR_SLICE_START_PTR(slices[1]), lotsa_bs, 1024); + + buffer = grpc_raw_byte_buffer_create(slices, 2); + gpr_slice_unref(slices[0]); + gpr_slice_unref(slices[1]); + + grpc_byte_buffer_reader_init(&reader, buffer); + slice_out = grpc_byte_buffer_reader_readall(&reader); + + GPR_ASSERT(GPR_SLICE_LENGTH(slice_out) == 512 + 1024); + GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(slice_out), lotsa_as, 512) == 0); + GPR_ASSERT(memcmp(&(GPR_SLICE_START_PTR(slice_out)[512]), lotsa_bs, 1024) == + 0); + gpr_slice_unref(slice_out); + grpc_byte_buffer_destroy(buffer); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); test_read_one_slice(); @@ -192,6 +225,6 @@ int main(int argc, char **argv) { test_read_gzip_compressed_slice(); test_read_deflate_compressed_slice(); test_byte_buffer_from_reader(); - + test_readall(); return 0; } diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index 9c35fede8f..80057d893e 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -62,7 +62,7 @@ template <class W, class R> class MockClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { public: - void WaitForInitialMetadata() {} + void WaitForInitialMetadata() GRPC_OVERRIDE {} bool Read(R* msg) GRPC_OVERRIDE { return true; } bool Write(const W& msg) GRPC_OVERRIDE { return true; } bool WritesDone() GRPC_OVERRIDE { return true; } @@ -73,7 +73,7 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> GRPC_FINAL : public ClientReaderWriterInterface<EchoRequest, EchoResponse> { public: MockClientReaderWriter() : writes_done_(false) {} - void WaitForInitialMetadata() {} + void WaitForInitialMetadata() GRPC_OVERRIDE {} bool Read(EchoResponse* msg) GRPC_OVERRIDE { if (writes_done_) return false; msg->set_message(last_message_); diff --git a/test/cpp/qps/async_streaming_ping_pong_test.cc b/test/cpp/qps/async_streaming_ping_pong_test.cc index 411df4d32a..9fef93a70f 100644 --- a/test/cpp/qps/async_streaming_ping_pong_test.cc +++ b/test/cpp/qps/async_streaming_ping_pong_test.cc @@ -35,8 +35,6 @@ #include <grpc/support/log.h> -#include <signal.h> - #include "test/cpp/qps/driver.h" #include "test/cpp/qps/report.h" #include "test/cpp/util/benchmark_config.h" @@ -52,17 +50,15 @@ static void RunAsyncStreamingPingPong() { ClientConfig client_config; client_config.set_client_type(ASYNC_CLIENT); - client_config.set_enable_ssl(false); client_config.set_outstanding_rpcs_per_channel(1); client_config.set_client_channels(1); - client_config.set_payload_size(1); client_config.set_async_client_threads(1); client_config.set_rpc_type(STREAMING); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); - server_config.set_enable_ssl(false); - server_config.set_threads(1); + server_config.set_async_server_threads(1); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); @@ -77,7 +73,6 @@ static void RunAsyncStreamingPingPong() { int main(int argc, char** argv) { grpc::testing::InitBenchmark(&argc, &argv, true); - signal(SIGPIPE, SIG_IGN); grpc::testing::RunAsyncStreamingPingPong(); return 0; } diff --git a/test/cpp/qps/async_unary_ping_pong_test.cc b/test/cpp/qps/async_unary_ping_pong_test.cc index eda31b5744..b4ab0e5d59 100644 --- a/test/cpp/qps/async_unary_ping_pong_test.cc +++ b/test/cpp/qps/async_unary_ping_pong_test.cc @@ -35,8 +35,6 @@ #include <grpc/support/log.h> -#include <signal.h> - #include "test/cpp/qps/driver.h" #include "test/cpp/qps/report.h" #include "test/cpp/util/benchmark_config.h" @@ -52,17 +50,15 @@ static void RunAsyncUnaryPingPong() { ClientConfig client_config; client_config.set_client_type(ASYNC_CLIENT); - client_config.set_enable_ssl(false); client_config.set_outstanding_rpcs_per_channel(1); client_config.set_client_channels(1); - client_config.set_payload_size(1); client_config.set_async_client_threads(1); client_config.set_rpc_type(UNARY); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); - server_config.set_enable_ssl(false); - server_config.set_threads(1); + server_config.set_async_server_threads(1); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); @@ -75,7 +71,6 @@ static void RunAsyncUnaryPingPong() { int main(int argc, char** argv) { grpc::testing::InitBenchmark(&argc, &argv, true); - signal(SIGPIPE, SIG_IGN); grpc::testing::RunAsyncUnaryPingPong(); return 0; diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index cd8b34f65b..f4400692fe 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -40,8 +40,9 @@ #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" -#include "test/proto/qpstest.grpc.pb.h" #include "test/cpp/util/create_test_channel.h" +#include "test/proto/benchmarks/payloads.grpc.pb.h" +#include "test/proto/benchmarks/services.grpc.pb.h" namespace grpc { @@ -75,27 +76,54 @@ class Client { channels_[i].init(config.server_targets(i % config.server_targets_size()), config); } - request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); - request_.set_response_size(config.payload_size()); + if (config.payload_config().has_bytebuf_params()) { + GPR_ASSERT(false); // not yet implemented + } else if (config.payload_config().has_simple_params()) { + request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + request_.set_response_size( + config.payload_config().simple_params().resp_size()); + request_.mutable_payload()->set_type( + grpc::testing::PayloadType::COMPRESSABLE); + int size = config.payload_config().simple_params().req_size(); + std::unique_ptr<char[]> body(new char[size]); + request_.mutable_payload()->set_body(body.get(), size); + } else if (config.payload_config().has_complex_params()) { + GPR_ASSERT(false); // not yet implemented + } else { + // default should be simple proto without payloads + request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE); + request_.set_response_size(0); + request_.mutable_payload()->set_type( + grpc::testing::PayloadType::COMPRESSABLE); + } } virtual ~Client() {} - ClientStats Mark() { + ClientStats Mark(bool reset) { Histogram latencies; + Timer::Result timer_result; + // avoid std::vector for old compilers that expect a copy constructor - Histogram* to_merge = new Histogram[threads_.size()]; - for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->BeginSwap(&to_merge[i]); - } - std::unique_ptr<Timer> timer(new Timer); - timer_.swap(timer); - for (size_t i = 0; i < threads_.size(); i++) { - threads_[i]->EndSwap(); - latencies.Merge(&to_merge[i]); + if (reset) { + Histogram* to_merge = new Histogram[threads_.size()]; + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->BeginSwap(&to_merge[i]); + } + std::unique_ptr<Timer> timer(new Timer); + timer_.swap(timer); + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->EndSwap(); + latencies.Merge(to_merge[i]); + } + delete[] to_merge; + timer_result = timer->Mark(); + } else { + // merge snapshots of each thread histogram + for (size_t i = 0; i < threads_.size(); i++) { + threads_[i]->MergeStatsInto(&latencies); + } + timer_result = timer_->Mark(); } - delete[] to_merge; - - auto timer_result = timer->Mark(); ClientStats stats; latencies.FillProto(stats.mutable_latencies()); @@ -122,15 +150,18 @@ class Client { // We have to use a 2-phase init like this with a default // constructor followed by an initializer function to make // old compilers happy with using this in std::vector - channel_ = CreateTestChannel(target, config.enable_ssl()); - stub_ = TestService::NewStub(channel_); + channel_ = CreateTestChannel( + target, config.security_params().server_host_override(), + config.has_security_params(), + !config.security_params().use_test_ca()); + stub_ = BenchmarkService::NewStub(channel_); } Channel* get_channel() { return channel_.get(); } - TestService::Stub* get_stub() { return stub_.get(); } + BenchmarkService::Stub* get_stub() { return stub_.get(); } private: std::shared_ptr<Channel> channel_; - std::unique_ptr<TestService::Stub> stub_; + std::unique_ptr<BenchmarkService::Stub> stub_; }; std::vector<ClientChannelInfo> channels_; @@ -146,37 +177,41 @@ class Client { void SetupLoadTest(const ClientConfig& config, size_t num_threads) { // Set up the load distribution based on the number of threads - if (config.load_type() == CLOSED_LOOP) { + const auto& load = config.load_params(); + + std::unique_ptr<RandomDist> random_dist; + switch (load.load_case()) { + case LoadParams::kClosedLoop: + // Closed-loop doesn't use random dist at all + break; + case LoadParams::kPoisson: + random_dist.reset( + new ExpDist(load.poisson().offered_load() / num_threads)); + break; + case LoadParams::kUniform: + random_dist.reset( + new UniformDist(load.uniform().interarrival_lo() * num_threads, + load.uniform().interarrival_hi() * num_threads)); + break; + case LoadParams::kDeterm: + random_dist.reset( + new DetDist(num_threads / load.determ().offered_load())); + break; + case LoadParams::kPareto: + random_dist.reset( + new ParetoDist(load.pareto().interarrival_base() * num_threads, + load.pareto().alpha())); + break; + default: + GPR_ASSERT(false); + } + + // Set closed_loop_ based on whether or not random_dist is set + if (!random_dist) { closed_loop_ = true; } else { closed_loop_ = false; - - std::unique_ptr<RandomDist> random_dist; - const auto& load = config.load_params(); - switch (config.load_type()) { - case POISSON: - random_dist.reset( - new ExpDist(load.poisson().offered_load() / num_threads)); - break; - case UNIFORM: - random_dist.reset( - new UniformDist(load.uniform().interarrival_lo() * num_threads, - load.uniform().interarrival_hi() * num_threads)); - break; - case DETERMINISTIC: - random_dist.reset( - new DetDist(num_threads / load.determ().offered_load())); - break; - case PARETO: - random_dist.reset( - new ParetoDist(load.pareto().interarrival_base() * num_threads, - load.pareto().alpha())); - break; - default: - GPR_ASSERT(false); - break; - } - + // set up interarrival timer according to random dist interarrival_timer_.init(*random_dist, num_threads); for (size_t i = 0; i < num_threads; i++) { next_time_.push_back( @@ -204,7 +239,7 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), - new_(nullptr), + new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -219,16 +254,21 @@ class Client { void BeginSwap(Histogram* n) { std::lock_guard<std::mutex> g(mu_); - new_ = n; + new_stats_ = n; } void EndSwap() { std::unique_lock<std::mutex> g(mu_); - while (new_ != nullptr) { + while (new_stats_ != nullptr) { cv_.wait(g); }; } + void MergeStatsInto(Histogram* hist) { + std::unique_lock<std::mutex> g(mu_); + hist->Merge(histogram_); + } + private: Thread(const Thread&); Thread& operator=(const Thread&); @@ -246,21 +286,21 @@ class Client { if (done_) { return; } - // check if we're marking, swap out the histogram if so - if (new_) { - new_->Swap(&histogram_); - new_ = nullptr; + // check if we're resetting stats, swap out the histogram if so + if (new_stats_) { + new_stats_->Swap(&histogram_); + new_stats_ = nullptr; cv_.notify_one(); } } } - TestService::Stub* stub_; + BenchmarkService::Stub* stub_; ClientConfig config_; std::mutex mu_; std::condition_variable cv_; bool done_; - Histogram* new_; + Histogram* new_stats_; Histogram histogram_; Client* client_; size_t idx_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9ed42b7db6..9594179822 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -48,10 +48,10 @@ #include <gflags/gflags.h> #include <grpc++/client_context.h> -#include "test/proto/qpstest.grpc.pb.h" #include "test/cpp/qps/timer.h" #include "test/cpp/qps/client.h" #include "test/cpp/util/create_test_channel.h" +#include "test/proto/benchmarks/services.grpc.pb.h" namespace grpc { namespace testing { @@ -88,10 +88,10 @@ template <class RequestType, class ResponseType> class ClientRpcContextUnaryImpl : public ClientRpcContext { public: ClientRpcContextUnaryImpl( - int channel_id, TestService::Stub* stub, const RequestType& req, + int channel_id, BenchmarkService::Stub* stub, const RequestType& req, std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&, + BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) : ClientRpcContext(channel_id), @@ -131,13 +131,13 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { return true; // we're done, this'll be ignored } grpc::ClientContext context_; - TestService::Stub* stub_; + BenchmarkService::Stub* stub_; RequestType req_; ResponseType response_; bool (ClientRpcContextUnaryImpl::*next_state_)(bool); std::function<void(grpc::Status, ResponseType*)> callback_; std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( - TestService::Stub*, grpc::ClientContext*, const RequestType&, + BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> start_req_; grpc::Status status_; double start_; @@ -151,7 +151,7 @@ class AsyncClient : public Client { public: explicit AsyncClient( const ClientConfig& config, - std::function<ClientRpcContext*(int, TestService::Stub*, + std::function<ClientRpcContext*(int, BenchmarkService::Stub*, const SimpleRequest&)> setup_ctx) : Client(config), channel_lock_(new std::mutex[config.client_channels()]), @@ -354,11 +354,12 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient { private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>> - StartReq(TestService::Stub* stub, grpc::ClientContext* ctx, + StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, const SimpleRequest& request, CompletionQueue* cq) { return stub->AsyncUnaryCall(ctx, request, cq); }; - static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, + static ClientRpcContext* SetupCtx(int channel_id, + BenchmarkService::Stub* stub, const SimpleRequest& req) { return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( channel_id, stub, req, AsyncUnaryClient::StartReq, @@ -370,10 +371,11 @@ template <class RequestType, class ResponseType> class ClientRpcContextStreamingImpl : public ClientRpcContext { public: ClientRpcContextStreamingImpl( - int channel_id, TestService::Stub* stub, const RequestType& req, - std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter< - RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*, - CompletionQueue*, void*)> start_req, + int channel_id, BenchmarkService::Stub* stub, const RequestType& req, + std::function<std::unique_ptr< + grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, + void*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) : ClientRpcContext(channel_id), context_(), @@ -420,15 +422,15 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { return StartWrite(ok); } grpc::ClientContext context_; - TestService::Stub* stub_; + BenchmarkService::Stub* stub_; RequestType req_; ResponseType response_; bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*); std::function<void(grpc::Status, ResponseType*)> callback_; std::function< std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( - TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> - start_req_; + BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, + void*)> start_req_; grpc::Status status_; double start_; std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>> @@ -439,8 +441,8 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { public: explicit AsyncStreamingClient(const ClientConfig& config) : AsyncClient(config, SetupCtx) { - // async streaming currently only supported closed loop - GPR_ASSERT(config.load_type() == CLOSED_LOOP); + // async streaming currently only supports closed loop + GPR_ASSERT(closed_loop_); StartThreads(config.async_client_threads()); } @@ -451,12 +453,13 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient { static void CheckDone(grpc::Status s, SimpleResponse* response) {} static std::unique_ptr< grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>> - StartReq(TestService::Stub* stub, grpc::ClientContext* ctx, + StartReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx, CompletionQueue* cq, void* tag) { auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub, + static ClientRpcContext* SetupCtx(int channel_id, + BenchmarkService::Stub* stub, const SimpleRequest& req) { return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( channel_id, stub, req, AsyncStreamingClient::StartReq, diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index ed4134c743..10d680860a 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -54,10 +54,10 @@ #include "test/cpp/util/create_test_channel.h" #include "test/cpp/qps/client.h" -#include "test/proto/qpstest.grpc.pb.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/timer.h" +#include "test/proto/benchmarks/services.grpc.pb.h" #include "src/core/profiling/timers.h" diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index dd5c4f4f73..2c6247deea 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -48,6 +48,7 @@ #include "test/cpp/qps/driver.h" #include "test/cpp/qps/histogram.h" #include "test/cpp/qps/qps_worker.h" +#include "test/proto/benchmarks/services.grpc.pb.h" using std::list; using std::thread; @@ -91,12 +92,12 @@ static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) { } struct ServerData { - unique_ptr<Worker::Stub> stub; + unique_ptr<WorkerService::Stub> stub; unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream; }; struct ClientData { - unique_ptr<Worker::Stub> stub; + unique_ptr<WorkerService::Stub> stub; unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream; }; } // namespace runsc @@ -131,8 +132,7 @@ std::unique_ptr<ScenarioResult> RunScenario( } int driver_port = grpc_pick_unused_port_or_die(); - int benchmark_port = grpc_pick_unused_port_or_die(); - local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port)); + local_workers.emplace_back(new QpsWorker(driver_port)); char addr[256]; sprintf(addr, "localhost:%d", driver_port); if (spawn_local_worker_count < 0) { @@ -161,11 +161,10 @@ std::unique_ptr<ScenarioResult> RunScenario( // where class contained in std::vector must have a copy constructor auto* servers = new ServerData[num_servers]; for (size_t i = 0; i < num_servers; i++) { - servers[i].stub = - Worker::NewStub(CreateChannel(workers[i], InsecureCredentials())); + servers[i].stub = WorkerService::NewStub( + CreateChannel(workers[i], InsecureCredentials())); ServerArgs args; result_server_config = server_config; - result_server_config.set_host(workers[i]); *args.mutable_setup() = server_config; servers[i].stream = servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline)); @@ -189,14 +188,13 @@ std::unique_ptr<ScenarioResult> RunScenario( // where class contained in std::vector must have a copy constructor auto* clients = new ClientData[num_clients]; for (size_t i = 0; i < num_clients; i++) { - clients[i].stub = Worker::NewStub( + clients[i].stub = WorkerService::NewStub( CreateChannel(workers[i + num_servers], InsecureCredentials())); ClientArgs args; result_client_config = client_config; - result_client_config.set_host(workers[i + num_servers]); *args.mutable_setup() = client_config; clients[i].stream = - clients[i].stub->RunTest(runsc::AllocContext(&contexts, deadline)); + clients[i].stub->RunClient(runsc::AllocContext(&contexts, deadline)); GPR_ASSERT(clients[i].stream->Write(args)); ClientStatus init_status; GPR_ASSERT(clients[i].stream->Read(&init_status)); @@ -211,9 +209,9 @@ std::unique_ptr<ScenarioResult> RunScenario( // Start a run gpr_log(GPR_INFO, "Starting"); ServerArgs server_mark; - server_mark.mutable_mark(); + server_mark.mutable_mark()->set_reset(true); ClientArgs client_mark; - client_mark.mutable_mark(); + client_mark.mutable_mark()->set_reset(true); for (auto server = &servers[0]; server != &servers[num_servers]; server++) { GPR_ASSERT(server->stream->Write(server_mark)); } @@ -251,14 +249,15 @@ std::unique_ptr<ScenarioResult> RunScenario( GPR_ASSERT(server->stream->Read(&server_status)); const auto& stats = server_status.stats(); result->server_resources.emplace_back( - stats.time_elapsed(), stats.time_user(), stats.time_system()); + stats.time_elapsed(), stats.time_user(), stats.time_system(), + server_status.cores()); } for (auto client = &clients[0]; client != &clients[num_clients]; client++) { GPR_ASSERT(client->stream->Read(&client_status)); const auto& stats = client_status.stats(); result->latencies.MergeProto(stats.latencies()); result->client_resources.emplace_back( - stats.time_elapsed(), stats.time_user(), stats.time_system()); + stats.time_elapsed(), stats.time_user(), stats.time_system(), -1); } for (auto client = &clients[0]; client != &clients[num_clients]; client++) { diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 6116aa656a..50bf17ceab 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -37,22 +37,24 @@ #include <memory> #include "test/cpp/qps/histogram.h" -#include "test/proto/qpstest.grpc.pb.h" +#include "test/proto/benchmarks/control.grpc.pb.h" namespace grpc { namespace testing { class ResourceUsage { public: - ResourceUsage(double w, double u, double s) - : wall_time_(w), user_time_(u), system_time_(s) {} + ResourceUsage(double w, double u, double s, int c) + : wall_time_(w), user_time_(u), system_time_(s), cores_(c) {} double wall_time() const { return wall_time_; } double user_time() const { return user_time_; } double system_time() const { return system_time_; } + int cores() const { return cores_; } private: double wall_time_; double user_time_; double system_time_; + int cores_; }; struct ScenarioResult { diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h index 1151cca87c..35527d2a2c 100644 --- a/test/cpp/qps/histogram.h +++ b/test/cpp/qps/histogram.h @@ -35,7 +35,7 @@ #define TEST_QPS_HISTOGRAM_H #include <grpc/support/histogram.h> -#include "test/proto/qpstest.grpc.pb.h" +#include "test/proto/benchmarks/stats.grpc.pb.h" namespace grpc { namespace testing { @@ -48,7 +48,7 @@ class Histogram { } Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; } - void Merge(Histogram* h) { gpr_histogram_merge(impl_, h->impl_); } + void Merge(const Histogram& h) { gpr_histogram_merge(impl_, h.impl_); } void Add(double value) { gpr_histogram_add(impl_, value); } double Percentile(double pctile) const { return gpr_histogram_percentile(impl_, pctile); diff --git a/test/cpp/qps/perf_db.proto b/test/cpp/qps/perf_db.proto index 7ae5cfe86e..8a691ddded 100644 --- a/test/cpp/qps/perf_db.proto +++ b/test/cpp/qps/perf_db.proto @@ -29,7 +29,7 @@ syntax = "proto3"; -import "test/proto/qpstest.proto"; +import "test/proto/benchmarks/control.proto"; package grpc.testing; diff --git a/test/cpp/qps/qps-sweep.sh b/test/cpp/qps/qps-sweep.sh index cb93201933..36ea974812 100755 --- a/test/cpp/qps/qps-sweep.sh +++ b/test/cpp/qps/qps-sweep.sh @@ -37,17 +37,21 @@ fi bins=`find . .. ../.. ../../.. -name bins | head -1` -for channels in 1 2 4 8 +for secure in true false do - for client in SYNCHRONOUS_CLIENT ASYNC_CLIENT + for channels in 1 2 4 8 do - for server in SYNCHRONOUS_SERVER ASYNC_SERVER + for client in SYNC_CLIENT ASYNC_CLIENT do - for rpc in UNARY STREAMING + for server in SYNC_SERVER ASYNC_SERVER do - echo "Test $rpc $client $server , $channels channels" - "$bins"/opt/qps_driver --rpc_type=$rpc \ - --client_type=$client --server_type=$server + for rpc in UNARY STREAMING + do + echo "Test $rpc $client $server, $channels channels, secure=$secure" + "$bins"/opt/qps_driver --rpc_type=$rpc \ + --client_type=$client --server_type=$server \ + --secure_test=$secure + done done done done diff --git a/test/cpp/qps/qps_driver.cc b/test/cpp/qps/qps_driver.cc index b1463be8f6..4c93a042cf 100644 --- a/test/cpp/qps/qps_driver.cc +++ b/test/cpp/qps/qps_driver.cc @@ -33,7 +33,6 @@ #include <memory> #include <set> -#include <signal.h> #include <gflags/gflags.h> #include <grpc/support/log.h> @@ -50,31 +49,39 @@ DEFINE_int32(benchmark_seconds, 30, "Benchmark time (in seconds)"); DEFINE_int32(local_workers, 0, "Number of local workers to start"); // Common config -DEFINE_bool(enable_ssl, false, "Use SSL"); DEFINE_string(rpc_type, "UNARY", "Type of RPC: UNARY or STREAMING"); // Server config -DEFINE_int32(server_threads, 1, "Number of server threads"); -DEFINE_string(server_type, "SYNCHRONOUS_SERVER", "Server type"); +DEFINE_int32(async_server_threads, 1, "Number of threads for async servers"); +DEFINE_string(server_type, "SYNC_SERVER", "Server type"); // Client config DEFINE_int32(outstanding_rpcs_per_channel, 1, "Number of outstanding rpcs per channel"); DEFINE_int32(client_channels, 1, "Number of client channels"); -DEFINE_int32(payload_size, 1, "Payload size"); -DEFINE_string(client_type, "SYNCHRONOUS_CLIENT", "Client type"); + +DEFINE_int32(simple_req_size, -1, "Simple proto request payload size"); +DEFINE_int32(simple_resp_size, -1, "Simple proto response payload size"); + +DEFINE_string(client_type, "SYNC_CLIENT", "Client type"); DEFINE_int32(async_client_threads, 1, "Async client threads"); -DEFINE_string(load_type, "CLOSED_LOOP", "Load type"); -DEFINE_double(load_param_1, 0.0, "Load parameter 1"); -DEFINE_double(load_param_2, 0.0, "Load parameter 2"); + +DEFINE_double(poisson_load, -1.0, "Poisson offered load (qps)"); +DEFINE_double(uniform_lo, -1.0, "Uniform low interarrival time (us)"); +DEFINE_double(uniform_hi, -1.0, "Uniform high interarrival time (us)"); +DEFINE_double(determ_load, -1.0, "Deterministic offered load (qps)"); +DEFINE_double(pareto_base, -1.0, "Pareto base interarrival time (us)"); +DEFINE_double(pareto_alpha, -1.0, "Pareto alpha value"); + +DEFINE_bool(secure_test, false, "Run a secure test"); using grpc::testing::ClientConfig; using grpc::testing::ServerConfig; using grpc::testing::ClientType; using grpc::testing::ServerType; -using grpc::testing::LoadType; using grpc::testing::RpcType; using grpc::testing::ResourceUsage; +using grpc::testing::SecurityParams; namespace grpc { namespace testing { @@ -85,72 +92,63 @@ static void QpsDriver() { ClientType client_type; ServerType server_type; - LoadType load_type; GPR_ASSERT(ClientType_Parse(FLAGS_client_type, &client_type)); GPR_ASSERT(ServerType_Parse(FLAGS_server_type, &server_type)); - GPR_ASSERT(LoadType_Parse(FLAGS_load_type, &load_type)); ClientConfig client_config; client_config.set_client_type(client_type); - client_config.set_load_type(load_type); - client_config.set_enable_ssl(FLAGS_enable_ssl); client_config.set_outstanding_rpcs_per_channel( FLAGS_outstanding_rpcs_per_channel); client_config.set_client_channels(FLAGS_client_channels); - client_config.set_payload_size(FLAGS_payload_size); + + // Decide which type to use based on the response type + if (FLAGS_simple_resp_size >= 0) { + auto params = + client_config.mutable_payload_config()->mutable_simple_params(); + params->set_resp_size(FLAGS_simple_resp_size); + if (FLAGS_simple_req_size >= 0) { + params->set_req_size(FLAGS_simple_req_size); + } + } else { + // set a reasonable default: proto but no payload + client_config.mutable_payload_config()->mutable_simple_params(); + } + client_config.set_async_client_threads(FLAGS_async_client_threads); client_config.set_rpc_type(rpc_type); // set up the load parameters - switch (load_type) { - case grpc::testing::CLOSED_LOOP: - break; - case grpc::testing::POISSON: { - auto poisson = client_config.mutable_load_params()->mutable_poisson(); - GPR_ASSERT(FLAGS_load_param_1 != 0.0); - poisson->set_offered_load(FLAGS_load_param_1); - break; - } - case grpc::testing::UNIFORM: { - auto uniform = client_config.mutable_load_params()->mutable_uniform(); - GPR_ASSERT(FLAGS_load_param_1 != 0.0); - GPR_ASSERT(FLAGS_load_param_2 != 0.0); - uniform->set_interarrival_lo(FLAGS_load_param_1 / 1e6); - uniform->set_interarrival_hi(FLAGS_load_param_2 / 1e6); - break; - } - case grpc::testing::DETERMINISTIC: { - auto determ = client_config.mutable_load_params()->mutable_determ(); - GPR_ASSERT(FLAGS_load_param_1 != 0.0); - determ->set_offered_load(FLAGS_load_param_1); - break; - } - case grpc::testing::PARETO: { - auto pareto = client_config.mutable_load_params()->mutable_pareto(); - GPR_ASSERT(FLAGS_load_param_1 != 0.0); - GPR_ASSERT(FLAGS_load_param_2 != 0.0); - pareto->set_interarrival_base(FLAGS_load_param_1 / 1e6); - pareto->set_alpha(FLAGS_load_param_2); - break; - } - default: - GPR_ASSERT(false); - break; + if (FLAGS_poisson_load > 0.0) { + auto poisson = client_config.mutable_load_params()->mutable_poisson(); + poisson->set_offered_load(FLAGS_poisson_load); + } else if (FLAGS_uniform_lo > 0.0) { + auto uniform = client_config.mutable_load_params()->mutable_uniform(); + uniform->set_interarrival_lo(FLAGS_uniform_lo / 1e6); + uniform->set_interarrival_hi(FLAGS_uniform_hi / 1e6); + } else if (FLAGS_determ_load > 0.0) { + auto determ = client_config.mutable_load_params()->mutable_determ(); + determ->set_offered_load(FLAGS_determ_load); + } else if (FLAGS_pareto_base > 0.0) { + auto pareto = client_config.mutable_load_params()->mutable_pareto(); + pareto->set_interarrival_base(FLAGS_pareto_base / 1e6); + pareto->set_alpha(FLAGS_pareto_alpha); + } else { + client_config.mutable_load_params()->mutable_closed_loop(); + // No further load parameters to set up for closed loop } ServerConfig server_config; server_config.set_server_type(server_type); - server_config.set_threads(FLAGS_server_threads); - server_config.set_enable_ssl(FLAGS_enable_ssl); - - // If we're running a sync-server streaming test, make sure - // that we have at least as many threads as the active streams - // or else threads will be blocked from forward progress and the - // client will deadlock on a timer. - GPR_ASSERT(!(server_type == grpc::testing::SYNCHRONOUS_SERVER && - rpc_type == grpc::testing::STREAMING && - FLAGS_server_threads < - FLAGS_client_channels * FLAGS_outstanding_rpcs_per_channel)); + server_config.set_async_server_threads(FLAGS_async_server_threads); + + if (FLAGS_secure_test) { + // Set up security params + SecurityParams security; + security.set_use_test_ca(true); + security.set_server_host_override("foo.test.google.fr"); + client_config.mutable_security_params()->CopyFrom(security); + server_config.mutable_security_params()->CopyFrom(security); + } const auto result = RunScenario( client_config, FLAGS_num_clients, server_config, FLAGS_num_servers, @@ -168,7 +166,6 @@ static void QpsDriver() { int main(int argc, char** argv) { grpc::testing::InitBenchmark(&argc, &argv, true); - signal(SIGPIPE, SIG_IGN); grpc::testing::QpsDriver(); return 0; diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc index a7979e6187..ccda28f09a 100644 --- a/test/cpp/qps/qps_interarrival_test.cc +++ b/test/cpp/qps/qps_interarrival_test.cc @@ -42,7 +42,7 @@ using grpc::testing::RandomDist; using grpc::testing::InterarrivalTimer; -void RunTest(RandomDist &&r, int threads, std::string title) { +static void RunTest(RandomDist &&r, int threads, std::string title) { InterarrivalTimer timer; timer.init(r, threads); gpr_histogram *h(gpr_histogram_create(0.01, 60e9)); diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc index 5a6a9249a9..dc88c893bb 100644 --- a/test/cpp/qps/qps_openloop_test.cc +++ b/test/cpp/qps/qps_openloop_test.cc @@ -31,8 +31,6 @@ * */ -#include <signal.h> - #include <set> #include <grpc/support/log.h> @@ -52,20 +50,16 @@ static void RunQPS() { ClientConfig client_config; client_config.set_client_type(ASYNC_CLIENT); - client_config.set_enable_ssl(false); client_config.set_outstanding_rpcs_per_channel(1000); client_config.set_client_channels(8); - client_config.set_payload_size(1); client_config.set_async_client_threads(8); client_config.set_rpc_type(UNARY); - client_config.set_load_type(POISSON); client_config.mutable_load_params()->mutable_poisson()->set_offered_load( 1000.0); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); - server_config.set_enable_ssl(false); - server_config.set_threads(4); + server_config.set_async_server_threads(4); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); @@ -80,7 +74,6 @@ static void RunQPS() { int main(int argc, char** argv) { grpc::testing::InitBenchmark(&argc, &argv, true); - signal(SIGPIPE, SIG_IGN); grpc::testing::RunQPS(); return 0; diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc index d0c4a79cd9..89b35cfb05 100644 --- a/test/cpp/qps/qps_test.cc +++ b/test/cpp/qps/qps_test.cc @@ -31,8 +31,6 @@ * */ -#include <signal.h> - #include <set> #include <grpc/support/log.h> @@ -52,17 +50,15 @@ static void RunQPS() { ClientConfig client_config; client_config.set_client_type(ASYNC_CLIENT); - client_config.set_enable_ssl(false); client_config.set_outstanding_rpcs_per_channel(1000); client_config.set_client_channels(8); - client_config.set_payload_size(1); client_config.set_async_client_threads(8); client_config.set_rpc_type(UNARY); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); - server_config.set_enable_ssl(false); - server_config.set_threads(8); + server_config.set_async_server_threads(8); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); @@ -77,7 +73,6 @@ static void RunQPS() { int main(int argc, char** argv) { grpc::testing::InitBenchmark(&argc, &argv, true); - signal(SIGPIPE, SIG_IGN); grpc::testing::RunQPS(); return 0; diff --git a/test/cpp/qps/qps_test_with_poll.cc b/test/cpp/qps/qps_test_with_poll.cc index 31d2c1bf7b..97da4096ed 100644 --- a/test/cpp/qps/qps_test_with_poll.cc +++ b/test/cpp/qps/qps_test_with_poll.cc @@ -31,8 +31,6 @@ * */ -#include <signal.h> - #include <set> #include <grpc/support/log.h> @@ -56,17 +54,15 @@ static void RunQPS() { ClientConfig client_config; client_config.set_client_type(ASYNC_CLIENT); - client_config.set_enable_ssl(false); client_config.set_outstanding_rpcs_per_channel(1000); client_config.set_client_channels(8); - client_config.set_payload_size(1); client_config.set_async_client_threads(8); client_config.set_rpc_type(UNARY); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; server_config.set_server_type(ASYNC_SERVER); - server_config.set_enable_ssl(false); - server_config.set_threads(4); + server_config.set_async_server_threads(4); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); @@ -83,7 +79,6 @@ int main(int argc, char** argv) { grpc_platform_become_multipoller = grpc_poll_become_multipoller; - signal(SIGPIPE, SIG_IGN); grpc::testing::RunQPS(); return 0; diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index 4ce77f366d..dc59eab7ef 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -52,17 +52,17 @@ #include <grpc++/security/server_credentials.h> #include "test/core/util/grpc_profiler.h" -#include "test/proto/qpstest.pb.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/server.h" #include "test/cpp/util/create_test_channel.h" +#include "test/proto/benchmarks/services.pb.h" namespace grpc { namespace testing { -std::unique_ptr<Client> CreateClient(const ClientConfig& config) { +static std::unique_ptr<Client> CreateClient(const ClientConfig& config) { switch (config.client_type()) { - case ClientType::SYNCHRONOUS_CLIENT: + case ClientType::SYNC_CLIENT: return (config.rpc_type() == RpcType::UNARY) ? CreateSynchronousUnaryClient(config) : CreateSynchronousStreamingClient(config); @@ -76,26 +76,29 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) { abort(); } -std::unique_ptr<Server> CreateServer(const ServerConfig& config, - int server_port) { +static void LimitCores(int cores) {} + +static std::unique_ptr<Server> CreateServer(const ServerConfig& config) { + if (config.core_limit() > 0) { + LimitCores(config.core_limit()); + } switch (config.server_type()) { - case ServerType::SYNCHRONOUS_SERVER: - return CreateSynchronousServer(config, server_port); + case ServerType::SYNC_SERVER: + return CreateSynchronousServer(config); case ServerType::ASYNC_SERVER: - return CreateAsyncServer(config, server_port); + return CreateAsyncServer(config); default: abort(); } abort(); } -class WorkerImpl GRPC_FINAL : public Worker::Service { +class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { public: - explicit WorkerImpl(int server_port) - : server_port_(server_port), acquired_(false) {} + explicit WorkerServiceImpl() : acquired_(false) {} - Status RunTest(ServerContext* ctx, - ServerReaderWriter<ClientStatus, ClientArgs>* stream) + Status RunClient(ServerContext* ctx, + ServerReaderWriter<ClientStatus, ClientArgs>* stream) GRPC_OVERRIDE { InstanceGuard g(this); if (!g.Acquired()) { @@ -103,7 +106,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { } grpc_profiler_start("qps_client.prof"); - Status ret = RunTestBody(ctx, stream); + Status ret = RunClientBody(ctx, stream); grpc_profiler_stop(); return ret; } @@ -126,7 +129,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { // Protect against multiple clients using this worker at once. class InstanceGuard { public: - InstanceGuard(WorkerImpl* impl) + InstanceGuard(WorkerServiceImpl* impl) : impl_(impl), acquired_(impl->TryAcquireInstance()) {} ~InstanceGuard() { if (acquired_) { @@ -137,7 +140,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { bool Acquired() const { return acquired_; } private: - WorkerImpl* const impl_; + WorkerServiceImpl* const impl_; const bool acquired_; }; @@ -154,8 +157,8 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { acquired_ = false; } - Status RunTestBody(ServerContext* ctx, - ServerReaderWriter<ClientStatus, ClientArgs>* stream) { + Status RunClientBody(ServerContext* ctx, + ServerReaderWriter<ClientStatus, ClientArgs>* stream) { ClientArgs args; if (!stream->Read(&args)) { return Status(StatusCode::INVALID_ARGUMENT, ""); @@ -175,7 +178,7 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { if (!args.has_mark()) { return Status(StatusCode::INVALID_ARGUMENT, ""); } - *status.mutable_stats() = client->Mark(); + *status.mutable_stats() = client->Mark(args.mark().reset()); stream->Write(status); } @@ -191,12 +194,13 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { if (!args.has_setup()) { return Status(StatusCode::INVALID_ARGUMENT, ""); } - auto server = CreateServer(args.setup(), server_port_); + auto server = CreateServer(args.setup()); if (!server) { return Status(StatusCode::INVALID_ARGUMENT, ""); } ServerStatus status; - status.set_port(server_port_); + status.set_port(server->port()); + status.set_cores(server->cores()); if (!stream->Write(status)) { return Status(StatusCode::UNKNOWN, ""); } @@ -204,21 +208,19 @@ class WorkerImpl GRPC_FINAL : public Worker::Service { if (!args.has_mark()) { return Status(StatusCode::INVALID_ARGUMENT, ""); } - *status.mutable_stats() = server->Mark(); + *status.mutable_stats() = server->Mark(args.mark().reset()); stream->Write(status); } return Status::OK; } - const int server_port_; - std::mutex mu_; bool acquired_; }; -QpsWorker::QpsWorker(int driver_port, int server_port) { - impl_.reset(new WorkerImpl(server_port)); +QpsWorker::QpsWorker(int driver_port) { + impl_.reset(new WorkerServiceImpl()); char* server_address = NULL; gpr_join_host_port(&server_address, "::", driver_port); diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h index 861588907e..0db88ad3d1 100644 --- a/test/cpp/qps/qps_worker.h +++ b/test/cpp/qps/qps_worker.h @@ -42,15 +42,15 @@ class Server; namespace testing { -class WorkerImpl; +class WorkerServiceImpl; class QpsWorker { public: - QpsWorker(int driver_port, int server_port); + explicit QpsWorker(int driver_port); ~QpsWorker(); private: - std::unique_ptr<WorkerImpl> impl_; + std::unique_ptr<WorkerServiceImpl> impl_; std::unique_ptr<Server> server_; }; diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc index e03e8e1fb0..b230eb441e 100644 --- a/test/cpp/qps/report.cc +++ b/test/cpp/qps/report.cc @@ -43,6 +43,7 @@ namespace testing { static double WallTime(ResourceUsage u) { return u.wall_time(); } static double UserTime(ResourceUsage u) { return u.user_time(); } static double SystemTime(ResourceUsage u) { return u.system_time(); } +static int Cores(ResourceUsage u) { return u.cores(); } void CompositeReporter::add(std::unique_ptr<Reporter> reporter) { reporters_.emplace_back(std::move(reporter)); @@ -83,7 +84,7 @@ void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) { result.latencies.Count() / average(result.client_resources, WallTime); gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps, - qps / result.server_config.threads()); + qps / sum(result.server_resources, Cores)); } void GprLogReporter::ReportLatency(const ScenarioResult& result) { @@ -123,10 +124,10 @@ void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) { auto qps = result.latencies.Count() / average(result.client_resources, WallTime); - auto qpsPerCore = qps / result.server_config.threads(); + auto qps_per_core = qps / sum(result.server_resources, Cores); perf_db_client_.setQps(qps); - perf_db_client_.setQpsPerCore(qpsPerCore); + perf_db_client_.setQpsPerCore(qps_per_core); perf_db_client_.setConfigs(result.client_config, result.server_config); } diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h index 00d12369d5..78779231d3 100644 --- a/test/cpp/qps/report.h +++ b/test/cpp/qps/report.h @@ -41,7 +41,6 @@ #include <grpc++/support/config.h> #include "test/cpp/qps/driver.h" -#include "test/proto/qpstest.grpc.pb.h" #include "test/cpp/qps/perf_db_client.h" namespace grpc { diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc new file mode 100644 index 0000000000..df06f7e471 --- /dev/null +++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc @@ -0,0 +1,84 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <set> + +#include <grpc/support/log.h> + +#include "test/cpp/qps/driver.h" +#include "test/cpp/qps/report.h" +#include "test/cpp/util/benchmark_config.h" + +namespace grpc { +namespace testing { + +static const int WARMUP = 5; +static const int BENCHMARK = 10; + +static void RunSynchronousUnaryPingPong() { + gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong"); + + ClientConfig client_config; + client_config.set_client_type(SYNC_CLIENT); + client_config.set_outstanding_rpcs_per_channel(1); + client_config.set_client_channels(1); + client_config.set_rpc_type(UNARY); + client_config.mutable_load_params()->mutable_closed_loop(); + + ServerConfig server_config; + server_config.set_server_type(SYNC_SERVER); + + // Set up security params + SecurityParams security; + security.set_use_test_ca(true); + security.set_server_host_override("foo.test.google.fr"); + client_config.mutable_security_params()->CopyFrom(security); + server_config.mutable_security_params()->CopyFrom(security); + + const auto result = + RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); + + GetReporter()->ReportQPS(*result); + GetReporter()->ReportLatency(*result); +} + +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::InitBenchmark(&argc, &argv, true); + + grpc::testing::RunSynchronousUnaryPingPong(); + + return 0; +} diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index e48e873dc3..6e81edc8ff 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -34,22 +34,38 @@ #ifndef TEST_QPS_SERVER_H #define TEST_QPS_SERVER_H +#include <grpc/support/cpu.h> +#include <grpc++/security/server_credentials.h> + +#include "test/core/end2end/data/ssl_test_data.h" +#include "test/core/util/port.h" #include "test/cpp/qps/timer.h" -#include "test/proto/qpstest.grpc.pb.h" +#include "test/proto/messages.grpc.pb.h" +#include "test/proto/benchmarks/control.grpc.pb.h" namespace grpc { namespace testing { class Server { public: - Server() : timer_(new Timer) {} + explicit Server(const ServerConfig& config) : timer_(new Timer) { + if (config.port()) { + port_ = config.port(); + } else { + port_ = grpc_pick_unused_port_or_die(); + } + } virtual ~Server() {} - ServerStats Mark() { - std::unique_ptr<Timer> timer(new Timer); - timer.swap(timer_); - - auto timer_result = timer->Mark(); + ServerStats Mark(bool reset) { + Timer::Result timer_result; + if (reset) { + std::unique_ptr<Timer> timer(new Timer); + timer.swap(timer_); + timer_result = timer->Mark(); + } else { + timer_result = timer_->Mark(); + } ServerStats stats; stats.set_time_elapsed(timer_result.wall); @@ -70,13 +86,29 @@ class Server { return true; } + int port() const { return port_; } + int cores() const { return gpr_cpu_num_cores(); } + static std::shared_ptr<ServerCredentials> CreateServerCredentials( + const ServerConfig& config) { + if (config.has_security_params()) { + SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key, + test_server1_cert}; + SslServerCredentialsOptions ssl_opts; + ssl_opts.pem_root_certs = ""; + ssl_opts.pem_key_cert_pairs.push_back(pkcp); + return SslServerCredentials(ssl_opts); + } else { + return InsecureServerCredentials(); + } + } + private: + int port_; std::unique_ptr<Timer> timer_; }; -std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config, - int port); -std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config, int port); +std::unique_ptr<Server> CreateSynchronousServer(const ServerConfig& config); +std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 98fa9c53e2..2d922fa615 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -49,38 +49,40 @@ #include <grpc++/security/server_credentials.h> #include <gtest/gtest.h> -#include "test/proto/qpstest.grpc.pb.h" #include "test/cpp/qps/server.h" +#include "test/proto/benchmarks/services.grpc.pb.h" namespace grpc { namespace testing { class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig &config, int port) { + explicit AsyncQpsServerTest(const ServerConfig &config) : Server(config) { char *server_address = NULL; - gpr_join_host_port(&server_address, "::", port); + + gpr_join_host_port(&server_address, "::", port()); ServerBuilder builder; - builder.AddListeningPort(server_address, InsecureServerCredentials()); + builder.AddListeningPort(server_address, + Server::CreateServerCredentials(config)); gpr_free(server_address); builder.RegisterAsyncService(&async_service_); - for (int i = 0; i < config.threads(); i++) { + for (int i = 0; i < config.async_server_threads(); i++) { srv_cqs_.emplace_back(builder.AddCompletionQueue()); } server_ = builder.BuildAndStart(); using namespace std::placeholders; - for (int i = 0; i < 10000 / config.threads(); i++) { - for (int j = 0; j < config.threads(); j++) { + for (int i = 0; i < 10000 / config.async_server_threads(); i++) { + for (int j = 0; j < config.async_server_threads(); j++) { auto request_unary = std::bind( - &TestService::AsyncService::RequestUnaryCall, &async_service_, _1, - _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); + &BenchmarkService::AsyncService::RequestUnaryCall, &async_service_, + _1, _2, _3, srv_cqs_[j].get(), srv_cqs_[j].get(), _4); auto request_streaming = std::bind( - &TestService::AsyncService::RequestStreamingCall, &async_service_, - _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); + &BenchmarkService::AsyncService::RequestStreamingCall, + &async_service_, _1, _2, srv_cqs_[j].get(), srv_cqs_[j].get(), _3); contexts_.push_front( new ServerRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( request_unary, ProcessRPC)); @@ -89,10 +91,10 @@ class AsyncQpsServerTest : public Server { request_streaming, ProcessRPC)); } } - for (int i = 0; i < config.threads(); i++) { + for (int i = 0; i < config.async_server_threads(); i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); } - for (int i = 0; i < config.threads(); i++) { + for (int i = 0; i < config.async_server_threads(); i++) { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } @@ -309,7 +311,7 @@ class AsyncQpsServerTest : public Server { std::vector<std::thread> threads_; std::unique_ptr<grpc::Server> server_; std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_; - TestService::AsyncService async_service_; + BenchmarkService::AsyncService async_service_; std::forward_list<ServerRpcContext *> contexts_; class PerThreadShutdownState { @@ -333,9 +335,8 @@ class AsyncQpsServerTest : public Server { std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; -std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, - int port) { - return std::unique_ptr<Server>(new AsyncQpsServerTest(config, port)); +std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config) { + return std::unique_ptr<Server>(new AsyncQpsServerTest(config)); } } // namespace testing diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index b760ef63ec..a09b174b7e 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -43,14 +43,14 @@ #include <grpc++/server_context.h> #include <grpc++/security/server_credentials.h> -#include "test/proto/qpstest.grpc.pb.h" #include "test/cpp/qps/server.h" #include "test/cpp/qps/timer.h" +#include "test/proto/benchmarks/services.grpc.pb.h" namespace grpc { namespace testing { -class TestServiceImpl GRPC_FINAL : public TestService::Service { +class BenchmarkServiceImpl GRPC_FINAL : public BenchmarkService::Service { public: Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) GRPC_OVERRIDE { @@ -84,30 +84,29 @@ class TestServiceImpl GRPC_FINAL : public TestService::Service { class SynchronousServer GRPC_FINAL : public grpc::testing::Server { public: - SynchronousServer(const ServerConfig& config, int port) - : impl_(MakeImpl(port)) {} - - private: - std::unique_ptr<grpc::Server> MakeImpl(int port) { + explicit SynchronousServer(const ServerConfig& config) : Server(config) { ServerBuilder builder; char* server_address = NULL; - gpr_join_host_port(&server_address, "::", port); - builder.AddListeningPort(server_address, InsecureServerCredentials()); + + gpr_join_host_port(&server_address, "::", port()); + builder.AddListeningPort(server_address, + Server::CreateServerCredentials(config)); gpr_free(server_address); builder.RegisterService(&service_); - return builder.BuildAndStart(); + impl_ = builder.BuildAndStart(); } - TestServiceImpl service_; + private: + BenchmarkServiceImpl service_; std::unique_ptr<grpc::Server> impl_; }; std::unique_ptr<grpc::testing::Server> CreateSynchronousServer( - const ServerConfig& config, int port) { - return std::unique_ptr<Server>(new SynchronousServer(config, port)); + const ServerConfig& config) { + return std::unique_ptr<Server>(new SynchronousServer(config)); } } // namespace testing diff --git a/test/cpp/qps/single_run_localhost.sh b/test/cpp/qps/single_run_localhost.sh index 9d76f08f80..f5356f1834 100755 --- a/test/cpp/qps/single_run_localhost.sh +++ b/test/cpp/qps/single_run_localhost.sh @@ -42,9 +42,9 @@ NUMCPUS=`python2.7 -c 'import multiprocessing; print multiprocessing.cpu_count() make CONFIG=$config qps_worker qps_driver -j$NUMCPUS -bins/$config/qps_worker -driver_port 10000 -server_port 10001 & +bins/$config/qps_worker -driver_port 10000 & PID1=$! -bins/$config/qps_worker -driver_port 10010 -server_port 10011 & +bins/$config/qps_worker -driver_port 10010 & PID2=$! export QPS_WORKERS="localhost:10000,localhost:10010" diff --git a/test/cpp/qps/sync_streaming_ping_pong_test.cc b/test/cpp/qps/sync_streaming_ping_pong_test.cc index 52e43939a8..186afc03f7 100644 --- a/test/cpp/qps/sync_streaming_ping_pong_test.cc +++ b/test/cpp/qps/sync_streaming_ping_pong_test.cc @@ -31,8 +31,6 @@ * */ -#include <signal.h> - #include <set> #include <grpc/support/log.h> @@ -51,17 +49,14 @@ static void RunSynchronousStreamingPingPong() { gpr_log(GPR_INFO, "Running Synchronous Streaming Ping Pong"); ClientConfig client_config; - client_config.set_client_type(SYNCHRONOUS_CLIENT); - client_config.set_enable_ssl(false); + client_config.set_client_type(SYNC_CLIENT); client_config.set_outstanding_rpcs_per_channel(1); client_config.set_client_channels(1); - client_config.set_payload_size(1); client_config.set_rpc_type(STREAMING); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; - server_config.set_server_type(SYNCHRONOUS_SERVER); - server_config.set_enable_ssl(false); - server_config.set_threads(1); + server_config.set_server_type(SYNC_SERVER); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); @@ -75,7 +70,6 @@ static void RunSynchronousStreamingPingPong() { int main(int argc, char** argv) { grpc::testing::InitBenchmark(&argc, &argv, true); - signal(SIGPIPE, SIG_IGN); grpc::testing::RunSynchronousStreamingPingPong(); return 0; diff --git a/test/cpp/qps/sync_unary_ping_pong_test.cc b/test/cpp/qps/sync_unary_ping_pong_test.cc index fbd21357aa..25851833a6 100644 --- a/test/cpp/qps/sync_unary_ping_pong_test.cc +++ b/test/cpp/qps/sync_unary_ping_pong_test.cc @@ -31,8 +31,6 @@ * */ -#include <signal.h> - #include <set> #include <grpc/support/log.h> @@ -51,17 +49,14 @@ static void RunSynchronousUnaryPingPong() { gpr_log(GPR_INFO, "Running Synchronous Unary Ping Pong"); ClientConfig client_config; - client_config.set_client_type(SYNCHRONOUS_CLIENT); - client_config.set_enable_ssl(false); + client_config.set_client_type(SYNC_CLIENT); client_config.set_outstanding_rpcs_per_channel(1); client_config.set_client_channels(1); - client_config.set_payload_size(1); client_config.set_rpc_type(UNARY); + client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; - server_config.set_server_type(SYNCHRONOUS_SERVER); - server_config.set_enable_ssl(false); - server_config.set_threads(1); + server_config.set_server_type(SYNC_SERVER); const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2); @@ -76,7 +71,6 @@ static void RunSynchronousUnaryPingPong() { int main(int argc, char** argv) { grpc::testing::InitBenchmark(&argc, &argv, true); - signal(SIGPIPE, SIG_IGN); grpc::testing::RunSynchronousUnaryPingPong(); return 0; diff --git a/test/cpp/qps/timer.cc b/test/cpp/qps/timer.cc index 8edb838da3..3ec7f49f83 100644 --- a/test/cpp/qps/timer.cc +++ b/test/cpp/qps/timer.cc @@ -61,7 +61,7 @@ Timer::Result Timer::Sample() { return r; } -Timer::Result Timer::Mark() { +Timer::Result Timer::Mark() const { Result s = Sample(); Result r; r.wall = s.wall - start_.wall; diff --git a/test/cpp/qps/timer.h b/test/cpp/qps/timer.h index 30dbd7e7d5..d1aee1a9d1 100644 --- a/test/cpp/qps/timer.h +++ b/test/cpp/qps/timer.h @@ -44,7 +44,7 @@ class Timer { double system; }; - Result Mark(); + Result Mark() const; static double Now(); diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc index 935e4853a6..430ffb7cdc 100644 --- a/test/cpp/qps/worker.cc +++ b/test/cpp/qps/worker.cc @@ -43,8 +43,7 @@ #include "test/cpp/qps/qps_worker.h" #include "test/cpp/util/test_config.h" -DEFINE_int32(driver_port, 0, "Driver server port."); -DEFINE_int32(server_port, 0, "Spawned server port."); +DEFINE_int32(driver_port, 0, "Port for communication with driver"); static bool got_sigint = false; @@ -54,7 +53,7 @@ namespace grpc { namespace testing { static void RunServer() { - QpsWorker worker(FLAGS_driver_port, FLAGS_server_port); + QpsWorker worker(FLAGS_driver_port); while (!got_sigint) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), diff --git a/test/proto/qpstest.proto b/test/proto/benchmarks/control.proto index ef1f9451e9..966ab78baa 100644 --- a/test/proto/qpstest.proto +++ b/test/proto/benchmarks/control.proto @@ -1,4 +1,3 @@ - // Copyright 2015, Google Inc. // All rights reserved. // @@ -28,62 +27,20 @@ // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// An integration test service that covers all the method signature permutations -// of unary/streaming requests/responses. syntax = "proto3"; -package grpc.testing; - -enum PayloadType { - // Compressable text format. - COMPRESSABLE = 0; - - // Uncompressable binary format. - UNCOMPRESSABLE = 1; - - // Randomly chosen from all other formats defined in this enum. - RANDOM = 2; -} - -message StatsRequest { - // run number - int32 test_num = 1; -} - -message ServerStats { - // wall clock time - double time_elapsed = 1; - - // user time used by the server process and threads - double time_user = 2; - - // server time used by the server process and all threads - double time_system = 3; -} - -message Payload { - // The type of data in body. - PayloadType type = 1; - // Primary contents of payload. - bytes body = 2; -} +import "test/proto/benchmarks/payloads.proto"; +import "test/proto/benchmarks/stats.proto"; -message HistogramData { - repeated uint32 bucket = 1; - double min_seen = 2; - double max_seen = 3; - double sum = 4; - double sum_of_squares = 5; - double count = 6; -} +package grpc.testing; enum ClientType { - SYNCHRONOUS_CLIENT = 0; + SYNC_CLIENT = 0; ASYNC_CLIENT = 1; } enum ServerType { - SYNCHRONOUS_SERVER = 0; + SYNC_SERVER = 0; ASYNC_SERVER = 1; } @@ -92,14 +49,6 @@ enum RpcType { STREAMING = 1; } -enum LoadType { - CLOSED_LOOP = 0; - POISSON = 1; - UNIFORM = 2; - DETERMINISTIC = 3; - PARETO = 4; -} - message PoissonParams { double offered_load = 1; } @@ -118,32 +67,45 @@ message ParetoParams { double alpha = 2; } +message ClosedLoopParams { +} + message LoadParams { oneof load { - PoissonParams poisson = 1; - UniformParams uniform = 2; - DeterministicParams determ = 3; - ParetoParams pareto = 4; + ClosedLoopParams closed_loop = 1; + PoissonParams poisson = 2; + UniformParams uniform = 3; + DeterministicParams determ = 4; + ParetoParams pareto = 5; }; } +// presence of SecurityParams implies use of TLS +message SecurityParams { + bool use_test_ca = 1; + string server_host_override = 2; +} + message ClientConfig { repeated string server_targets = 1; ClientType client_type = 2; - bool enable_ssl = 3; + SecurityParams security_params = 3; int32 outstanding_rpcs_per_channel = 4; int32 client_channels = 5; - int32 payload_size = 6; // only for async client: int32 async_client_threads = 7; RpcType rpc_type = 8; - string host = 9; - LoadType load_type = 10; - LoadParams load_params = 11; + LoadParams load_params = 10; + PayloadConfig payload_config = 11; +} + +message ClientStatus { + ClientStats stats = 1; } // Request current stats message Mark { + bool reset = 1; } message ClientArgs { @@ -153,22 +115,15 @@ message ClientArgs { } } -message ClientStats { - HistogramData latencies = 1; - double time_elapsed = 2; - double time_user = 3; - double time_system = 4; -} - -message ClientStatus { - ClientStats stats = 1; -} - message ServerConfig { ServerType server_type = 1; - int32 threads = 2; - bool enable_ssl = 3; - string host = 4; + SecurityParams security_params = 2; + int32 port = 4; + // only for async server + int32 async_server_threads = 7; + // restrict core usage + int32 core_limit = 8; + PayloadConfig payload_config = 9; } message ServerArgs { @@ -181,38 +136,5 @@ message ServerArgs { message ServerStatus { ServerStats stats = 1; int32 port = 2; -} - -message SimpleRequest { - // Desired payload type in the response from the server. - // If response_type is RANDOM, server randomly chooses one from other formats. - PayloadType response_type = 1; - - // Desired payload size in the response from the server. - // If response_type is COMPRESSABLE, this denotes the size before compression. - int32 response_size = 2; - - // Optional input payload sent along with the request. - Payload payload = 3; -} - -message SimpleResponse { - Payload payload = 1; -} - -service TestService { - // One request followed by one response. - // The server returns the client payload as-is. - rpc UnaryCall(SimpleRequest) returns (SimpleResponse); - - // One request followed by one response. - // The server returns the client payload as-is. - rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse); -} - -service Worker { - // Start test with specified workload - rpc RunTest(stream ClientArgs) returns (stream ClientStatus); - // Start test with specified workload - rpc RunServer(stream ServerArgs) returns (stream ServerStatus); + int32 cores = 3; } diff --git a/test/proto/benchmarks/payloads.proto b/test/proto/benchmarks/payloads.proto new file mode 100644 index 0000000000..7e5b2c61ff --- /dev/null +++ b/test/proto/benchmarks/payloads.proto @@ -0,0 +1,55 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package grpc.testing; + +message ByteBufferParams { + int32 req_size = 1; + int32 resp_size = 2; +} + +message SimpleProtoParams { + int32 req_size = 1; + int32 resp_size = 2; +} + +message ComplexProtoParams { + // TODO (vpai): Fill this in once the details of complex, representative + // protos are decided +} + +message PayloadConfig { + oneof payload { + ByteBufferParams bytebuf_params = 1; + SimpleProtoParams simple_params = 2; + ComplexProtoParams complex_params = 3; + } +} diff --git a/test/proto/benchmarks/services.proto b/test/proto/benchmarks/services.proto new file mode 100644 index 0000000000..4c2cbabdf8 --- /dev/null +++ b/test/proto/benchmarks/services.proto @@ -0,0 +1,55 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +syntax = "proto3"; + +import "test/proto/messages.proto"; +import "test/proto/benchmarks/control.proto"; + +package grpc.testing; + +service BenchmarkService { + // One request followed by one response. + // The server returns the client payload as-is. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by one response. + // The server returns the client payload as-is. + rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse); +} + +service WorkerService { + // Start server with specified workload + rpc RunServer(stream ServerArgs) returns (stream ServerStatus); + + // Start client with specified workload + rpc RunClient(stream ClientArgs) returns (stream ClientStatus); +} diff --git a/test/proto/benchmarks/stats.proto b/test/proto/benchmarks/stats.proto new file mode 100644 index 0000000000..d52144f321 --- /dev/null +++ b/test/proto/benchmarks/stats.proto @@ -0,0 +1,59 @@ +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package grpc.testing; + +message ServerStats { + // wall clock time + double time_elapsed = 1; + + // user time used by the server process and threads + double time_user = 2; + + // server time used by the server process and all threads + double time_system = 3; +} + +message HistogramData { + repeated uint32 bucket = 1; + double min_seen = 2; + double max_seen = 3; + double sum = 4; + double sum_of_squares = 5; + double count = 6; +} + +message ClientStats { + HistogramData latencies = 1; + double time_elapsed = 2; + double time_user = 3; + double time_system = 4; +} |