diff options
author | 2017-12-04 12:14:18 -0800 | |
---|---|---|
committer | 2017-12-04 12:14:18 -0800 | |
commit | 9a434371e2f582675880239c1d0d5d85081761d8 (patch) | |
tree | 935e23c511274413add36f5c175a6c2bee8afe1d /test | |
parent | 4ca35636fe3c5d1e936d4cc03d18efb4be2824b8 (diff) | |
parent | c3b1e55a3c6af7c39ed1a6d7dea3463ba6194449 (diff) |
Merge branch 'master' into rq-fix
Diffstat (limited to 'test')
22 files changed, 828 insertions, 329 deletions
diff --git a/test/core/fling/client.cc b/test/core/fling/client.cc index 544b66d480..69fb6dc7c7 100644 --- a/test/core/fling/client.cc +++ b/test/core/fling/client.cc @@ -22,15 +22,15 @@ #include <string.h> #include <grpc/support/cmdline.h> -#include <grpc/support/histogram.h> #include <grpc/support/log.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> #include "src/core/lib/profiling/timers.h" #include "test/core/util/grpc_profiler.h" +#include "test/core/util/histogram.h" #include "test/core/util/test_config.h" -static gpr_histogram* histogram; +static grpc_histogram* histogram; static grpc_byte_buffer* the_buffer; static grpc_channel* channel; static grpc_completion_queue* cq; @@ -195,7 +195,7 @@ int main(int argc, char** argv) { channel = grpc_insecure_channel_create(target, nullptr, nullptr); cq = grpc_completion_queue_create_for_next(nullptr); the_buffer = grpc_raw_byte_buffer_create(&slice, (size_t)payload_size); - histogram = gpr_histogram_create(0.01, 60e9); + histogram = grpc_histogram_create(0.01, 60e9); sc.init(); @@ -213,7 +213,7 @@ int main(int argc, char** argv) { start = now(); sc.do_one_step(); stop = now(); - gpr_histogram_add(histogram, stop - start); + grpc_histogram_add(histogram, stop - start); } grpc_profiler_stop(); @@ -232,11 +232,11 @@ int main(int argc, char** argv) { grpc_slice_unref(slice); gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f", - gpr_histogram_percentile(histogram, 50), - gpr_histogram_percentile(histogram, 95), - gpr_histogram_percentile(histogram, 99), - gpr_histogram_percentile(histogram, 99.9)); - gpr_histogram_destroy(histogram); + grpc_histogram_percentile(histogram, 50), + grpc_histogram_percentile(histogram, 95), + grpc_histogram_percentile(histogram, 99), + grpc_histogram_percentile(histogram, 99.9)); + grpc_histogram_destroy(histogram); grpc_shutdown(); diff --git a/test/core/network_benchmarks/low_level_ping_pong.cc b/test/core/network_benchmarks/low_level_ping_pong.cc index 687395d916..2ae9a45d7c 100644 --- a/test/core/network_benchmarks/low_level_ping_pong.cc +++ b/test/core/network_benchmarks/low_level_ping_pong.cc @@ -36,13 +36,13 @@ #include <grpc/support/alloc.h> #include <grpc/support/cmdline.h> -#include <grpc/support/histogram.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/socket_utils_posix.h" +#include "test/core/util/histogram.h" typedef struct fd_pair { int read_fd; @@ -275,14 +275,14 @@ static void server_thread_wrap(void* arg) { server_thread(args); } -static void print_histogram(gpr_histogram* histogram) { +static void print_histogram(grpc_histogram* histogram) { /* TODO(klempner): Print more detailed information, such as detailed histogram buckets */ gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f", - gpr_histogram_percentile(histogram, 50), - gpr_histogram_percentile(histogram, 95), - gpr_histogram_percentile(histogram, 99), - gpr_histogram_percentile(histogram, 99.9)); + grpc_histogram_percentile(histogram, 50), + grpc_histogram_percentile(histogram, 95), + grpc_histogram_percentile(histogram, 99), + grpc_histogram_percentile(histogram, 99.9)); } static double now(void) { @@ -293,7 +293,7 @@ static double now(void) { static void client_thread(thread_args* args) { char* buf = static_cast<char*>(gpr_malloc(args->msg_size * sizeof(char))); memset(buf, 0, args->msg_size * sizeof(char)); - gpr_histogram* histogram = gpr_histogram_create(0.01, 60e9); + grpc_histogram* histogram = grpc_histogram_create(0.01, 60e9); double start_time; double end_time; double interval; @@ -316,13 +316,13 @@ static void client_thread(thread_args* args) { end_time = now(); if (i > kNumIters / 2) { interval = end_time - start_time; - gpr_histogram_add(histogram, interval); + grpc_histogram_add(histogram, interval); } } print_histogram(histogram); error: gpr_free(buf); - gpr_histogram_destroy(histogram); + grpc_histogram_destroy(histogram); } /* This roughly matches tcp_server's create_listening_socket */ diff --git a/test/core/support/BUILD b/test/core/support/BUILD index 32b64d4b8e..4372b49b54 100644 --- a/test/core/support/BUILD +++ b/test/core/support/BUILD @@ -69,16 +69,6 @@ grpc_cc_test( ) grpc_cc_test( - name = "histogram_test", - srcs = ["histogram_test.cc"], - language = "C++", - deps = [ - "//:gpr", - "//test/core/util:gpr_test_util", - ], -) - -grpc_cc_test( name = "host_port_test", srcs = ["host_port_test.cc"], language = "C++", diff --git a/test/core/support/histogram_test.cc b/test/core/support/histogram_test.cc deleted file mode 100644 index 86b7d599e6..0000000000 --- a/test/core/support/histogram_test.cc +++ /dev/null @@ -1,163 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/histogram.h> -#include <grpc/support/log.h> - -#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x); - -static void test_no_op(void) { - gpr_histogram_destroy(gpr_histogram_create(0.01, 60e9)); -} - -static void expect_percentile(gpr_histogram* h, double percentile, - double min_expect, double max_expect) { - double got = gpr_histogram_percentile(h, percentile); - gpr_log(GPR_INFO, "@%f%%, expect %f <= %f <= %f", percentile, min_expect, got, - max_expect); - GPR_ASSERT(min_expect <= got); - GPR_ASSERT(got <= max_expect); -} - -static void test_simple(void) { - gpr_histogram* h; - - LOG_TEST("test_simple"); - - h = gpr_histogram_create(0.01, 60e9); - gpr_histogram_add(h, 10000); - gpr_histogram_add(h, 10000); - gpr_histogram_add(h, 11000); - gpr_histogram_add(h, 11000); - - expect_percentile(h, 50, 10001, 10999); - GPR_ASSERT(gpr_histogram_mean(h) == 10500); - - gpr_histogram_destroy(h); -} - -static void test_percentile(void) { - gpr_histogram* h; - double last; - double i; - double cur; - - LOG_TEST("test_percentile"); - - h = gpr_histogram_create(0.05, 1e9); - gpr_histogram_add(h, 2.5); - gpr_histogram_add(h, 2.5); - gpr_histogram_add(h, 8); - gpr_histogram_add(h, 4); - - GPR_ASSERT(gpr_histogram_count(h) == 4); - GPR_ASSERT(gpr_histogram_minimum(h) == 2.5); - GPR_ASSERT(gpr_histogram_maximum(h) == 8); - GPR_ASSERT(gpr_histogram_sum(h) == 17); - GPR_ASSERT(gpr_histogram_sum_of_squares(h) == 92.5); - GPR_ASSERT(gpr_histogram_mean(h) == 4.25); - GPR_ASSERT(gpr_histogram_variance(h) == 5.0625); - GPR_ASSERT(gpr_histogram_stddev(h) == 2.25); - - expect_percentile(h, -10, 2.5, 2.5); - expect_percentile(h, 0, 2.5, 2.5); - expect_percentile(h, 12.5, 2.5, 2.5); - expect_percentile(h, 25, 2.5, 2.5); - expect_percentile(h, 37.5, 2.5, 2.8); - expect_percentile(h, 50, 3.0, 3.5); - expect_percentile(h, 62.5, 3.5, 4.5); - expect_percentile(h, 75, 5, 7.9); - expect_percentile(h, 100, 8, 8); - expect_percentile(h, 110, 8, 8); - - /* test monotonicity */ - last = 0.0; - for (i = 0; i < 100.0; i += 0.01) { - cur = gpr_histogram_percentile(h, i); - GPR_ASSERT(cur >= last); - last = cur; - } - - gpr_histogram_destroy(h); -} - -static void test_merge(void) { - gpr_histogram *h1, *h2; - double last; - double i; - double cur; - - LOG_TEST("test_merge"); - - h1 = gpr_histogram_create(0.05, 1e9); - gpr_histogram_add(h1, 2.5); - gpr_histogram_add(h1, 2.5); - gpr_histogram_add(h1, 8); - gpr_histogram_add(h1, 4); - - h2 = gpr_histogram_create(0.01, 1e9); - GPR_ASSERT(gpr_histogram_merge(h1, h2) == 0); - gpr_histogram_destroy(h2); - - h2 = gpr_histogram_create(0.05, 1e10); - GPR_ASSERT(gpr_histogram_merge(h1, h2) == 0); - gpr_histogram_destroy(h2); - - h2 = gpr_histogram_create(0.05, 1e9); - GPR_ASSERT(gpr_histogram_merge(h1, h2) == 1); - GPR_ASSERT(gpr_histogram_count(h1) == 4); - GPR_ASSERT(gpr_histogram_minimum(h1) == 2.5); - GPR_ASSERT(gpr_histogram_maximum(h1) == 8); - GPR_ASSERT(gpr_histogram_sum(h1) == 17); - GPR_ASSERT(gpr_histogram_sum_of_squares(h1) == 92.5); - GPR_ASSERT(gpr_histogram_mean(h1) == 4.25); - GPR_ASSERT(gpr_histogram_variance(h1) == 5.0625); - GPR_ASSERT(gpr_histogram_stddev(h1) == 2.25); - gpr_histogram_destroy(h2); - - h2 = gpr_histogram_create(0.05, 1e9); - gpr_histogram_add(h2, 7.0); - gpr_histogram_add(h2, 17.0); - gpr_histogram_add(h2, 1.0); - GPR_ASSERT(gpr_histogram_merge(h1, h2) == 1); - GPR_ASSERT(gpr_histogram_count(h1) == 7); - GPR_ASSERT(gpr_histogram_minimum(h1) == 1.0); - GPR_ASSERT(gpr_histogram_maximum(h1) == 17.0); - GPR_ASSERT(gpr_histogram_sum(h1) == 42.0); - GPR_ASSERT(gpr_histogram_sum_of_squares(h1) == 431.5); - GPR_ASSERT(gpr_histogram_mean(h1) == 6.0); - - /* test monotonicity */ - last = 0.0; - for (i = 0; i < 100.0; i += 0.01) { - cur = gpr_histogram_percentile(h1, i); - GPR_ASSERT(cur >= last); - last = cur; - } - - gpr_histogram_destroy(h1); - gpr_histogram_destroy(h2); -} - -int main(void) { - test_no_op(); - test_simple(); - test_percentile(); - test_merge(); - return 0; -} diff --git a/test/core/surface/concurrent_connectivity_test.cc b/test/core/surface/concurrent_connectivity_test.cc index 8fa15ab331..c7611b0dd1 100644 --- a/test/core/surface/concurrent_connectivity_test.cc +++ b/test/core/surface/concurrent_connectivity_test.cc @@ -49,10 +49,11 @@ #define NUM_OUTER_LOOPS_SHORT_TIMEOUTS 10 #define NUM_INNER_LOOPS_SHORT_TIMEOUTS 100 #define DELAY_MILLIS_SHORT_TIMEOUTS 1 -// in a successful test run, POLL_MILLIS should never be reached beause all runs -// should -// end after the shorter delay_millis +// in a successful test run, POLL_MILLIS should never be reached because all +// runs should end after the shorter delay_millis #define POLL_MILLIS_SHORT_TIMEOUTS 30000 +// it should never take longer that this to shutdown the server +#define SERVER_SHUTDOWN_TIMEOUT 30000 static void* tag(int n) { return (void*)(uintptr_t)n; } static int detag(void* p) { return (int)(uintptr_t)p; } @@ -95,7 +96,8 @@ struct server_thread_args { void server_thread(void* vargs) { struct server_thread_args* args = (struct server_thread_args*)vargs; grpc_event ev; - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = + grpc_timeout_milliseconds_to_deadline(SERVER_SHUTDOWN_TIMEOUT); ev = grpc_completion_queue_next(args->cq, deadline, nullptr); GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); GPR_ASSERT(detag(ev.tag) == 0xd1e); diff --git a/test/core/surface/public_headers_must_be_c89.c b/test/core/surface/public_headers_must_be_c89.c index 33dc70a685..8d2384ba61 100644 --- a/test/core/surface/public_headers_must_be_c89.c +++ b/test/core/surface/public_headers_must_be_c89.c @@ -50,7 +50,6 @@ #include <grpc/support/avl.h> #include <grpc/support/cmdline.h> #include <grpc/support/cpu.h> -#include <grpc/support/histogram.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> @@ -266,21 +265,6 @@ int main(int argc, char **argv) { printf("%lx", (unsigned long) gpr_cmdline_usage_string); printf("%lx", (unsigned long) gpr_cpu_num_cores); printf("%lx", (unsigned long) gpr_cpu_current_cpu); - printf("%lx", (unsigned long) gpr_histogram_create); - printf("%lx", (unsigned long) gpr_histogram_destroy); - printf("%lx", (unsigned long) gpr_histogram_add); - printf("%lx", (unsigned long) gpr_histogram_merge); - printf("%lx", (unsigned long) gpr_histogram_percentile); - printf("%lx", (unsigned long) gpr_histogram_mean); - printf("%lx", (unsigned long) gpr_histogram_stddev); - printf("%lx", (unsigned long) gpr_histogram_variance); - printf("%lx", (unsigned long) gpr_histogram_maximum); - printf("%lx", (unsigned long) gpr_histogram_minimum); - printf("%lx", (unsigned long) gpr_histogram_count); - printf("%lx", (unsigned long) gpr_histogram_sum); - printf("%lx", (unsigned long) gpr_histogram_sum_of_squares); - printf("%lx", (unsigned long) gpr_histogram_get_contents); - printf("%lx", (unsigned long) gpr_histogram_merge_contents); printf("%lx", (unsigned long) gpr_join_host_port); printf("%lx", (unsigned long) gpr_split_host_port); printf("%lx", (unsigned long) gpr_log_severity_string); diff --git a/test/core/util/BUILD b/test/core/util/BUILD index 6443553466..2237cfc173 100644 --- a/test/core/util/BUILD +++ b/test/core/util/BUILD @@ -16,7 +16,10 @@ load("//bazel:grpc_build_system.bzl", "grpc_cc_library", "grpc_cc_test", "grpc_c licenses(["notice"]) # Apache v2 -grpc_package(name = "test/core/util", visibility = "public") +grpc_package( + name = "test/core/util", + visibility = "public", +) grpc_cc_library( name = "gpr_test_util", @@ -49,10 +52,12 @@ grpc_cc_library( name = "grpc_test_util_base", srcs = [ "grpc_profiler.cc", + "histogram.cc", "mock_endpoint.cc", "parse_hexstring.cc", "passthru_endpoint.cc", "port.cc", + "port_isolated_runtime_environment.cc", "port_server_client.cc", "reconnect_server.cc", "slice_splitter.cc", @@ -62,6 +67,7 @@ grpc_cc_library( ], hdrs = [ "grpc_profiler.h", + "histogram.h", "mock_endpoint.h", "parse_hexstring.h", "passthru_endpoint.h", @@ -76,8 +82,8 @@ grpc_cc_library( language = "C++", deps = [ ":gpr_test_util", + ":grpc_debugger_macros", "//:grpc_common", - ":grpc_debugger_macros" ], ) @@ -107,13 +113,23 @@ grpc_cc_library( name = "fuzzer_corpus_test", testonly = 1, srcs = ["fuzzer_corpus_test.cc"], + external_deps = [ + "gtest", + "gflags", + ], deps = [ ":gpr_test_util", "//:grpc", ], - external_deps = [ - "gtest", - "gflags", +) + +grpc_cc_test( + name = "histogram_test", + srcs = ["histogram_test.cc"], + language = "C++", + deps = [ + ":grpc_test_util", + "//:gpr", ], ) @@ -121,3 +137,8 @@ sh_library( name = "fuzzer_one_entry_runner", srcs = ["fuzzer_one_entry_runner.sh"], ) + +sh_library( + name = "run_with_poller_sh", + srcs = ["run_with_poller.sh"], +) diff --git a/test/core/util/histogram.cc b/test/core/util/histogram.cc new file mode 100644 index 0000000000..2f916f831d --- /dev/null +++ b/test/core/util/histogram.cc @@ -0,0 +1,227 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <math.h> +#include <stddef.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/useful.h> + +#include "test/core/util/histogram.h" + +/* Histograms are stored with exponentially increasing bucket sizes. + The first bucket is [0, m) where m = 1 + resolution + Bucket n (n>=1) contains [m**n, m**(n+1)) + There are sufficient buckets to reach max_bucket_start */ + +struct grpc_histogram { + /* Sum of all values seen so far */ + double sum; + /* Sum of squares of all values seen so far */ + double sum_of_squares; + /* number of values seen so far */ + double count; + /* m in the description */ + double multiplier; + double one_on_log_multiplier; + /* minimum value seen */ + double min_seen; + /* maximum value seen */ + double max_seen; + /* maximum representable value */ + double max_possible; + /* number of buckets */ + size_t num_buckets; + /* the buckets themselves */ + uint32_t* buckets; +}; + +/* determine a bucket index given a value - does no bounds checking */ +static size_t bucket_for_unchecked(grpc_histogram* h, double x) { + return (size_t)(log(x) * h->one_on_log_multiplier); +} + +/* bounds checked version of the above */ +static size_t bucket_for(grpc_histogram* h, double x) { + size_t bucket = bucket_for_unchecked(h, GPR_CLAMP(x, 1.0, h->max_possible)); + GPR_ASSERT(bucket < h->num_buckets); + return bucket; +} + +/* at what value does a bucket start? */ +static double bucket_start(grpc_histogram* h, double x) { + return pow(h->multiplier, x); +} + +grpc_histogram* grpc_histogram_create(double resolution, + double max_bucket_start) { + grpc_histogram* h = (grpc_histogram*)gpr_malloc(sizeof(grpc_histogram)); + GPR_ASSERT(resolution > 0.0); + GPR_ASSERT(max_bucket_start > resolution); + h->sum = 0.0; + h->sum_of_squares = 0.0; + h->multiplier = 1.0 + resolution; + h->one_on_log_multiplier = 1.0 / log(1.0 + resolution); + h->max_possible = max_bucket_start; + h->count = 0.0; + h->min_seen = max_bucket_start; + h->max_seen = 0.0; + h->num_buckets = bucket_for_unchecked(h, max_bucket_start) + 1; + GPR_ASSERT(h->num_buckets > 1); + GPR_ASSERT(h->num_buckets < 100000000); + h->buckets = (uint32_t*)gpr_zalloc(sizeof(uint32_t) * h->num_buckets); + return h; +} + +void grpc_histogram_destroy(grpc_histogram* h) { + gpr_free(h->buckets); + gpr_free(h); +} + +void grpc_histogram_add(grpc_histogram* h, double x) { + h->sum += x; + h->sum_of_squares += x * x; + h->count++; + if (x < h->min_seen) { + h->min_seen = x; + } + if (x > h->max_seen) { + h->max_seen = x; + } + h->buckets[bucket_for(h, x)]++; +} + +int grpc_histogram_merge(grpc_histogram* dst, const grpc_histogram* src) { + if ((dst->num_buckets != src->num_buckets) || + (dst->multiplier != src->multiplier)) { + /* Fail because these histograms don't match */ + return 0; + } + grpc_histogram_merge_contents(dst, src->buckets, src->num_buckets, + src->min_seen, src->max_seen, src->sum, + src->sum_of_squares, src->count); + return 1; +} + +void grpc_histogram_merge_contents(grpc_histogram* dst, const uint32_t* data, + size_t data_count, double min_seen, + double max_seen, double sum, + double sum_of_squares, double count) { + size_t i; + GPR_ASSERT(dst->num_buckets == data_count); + dst->sum += sum; + dst->sum_of_squares += sum_of_squares; + dst->count += count; + if (min_seen < dst->min_seen) { + dst->min_seen = min_seen; + } + if (max_seen > dst->max_seen) { + dst->max_seen = max_seen; + } + for (i = 0; i < dst->num_buckets; i++) { + dst->buckets[i] += data[i]; + } +} + +static double threshold_for_count_below(grpc_histogram* h, double count_below) { + double count_so_far; + double lower_bound; + double upper_bound; + size_t lower_idx; + size_t upper_idx; + + if (h->count == 0) { + return 0.0; + } + + if (count_below <= 0) { + return h->min_seen; + } + if (count_below >= h->count) { + return h->max_seen; + } + + /* find the lowest bucket that gets us above count_below */ + count_so_far = 0.0; + for (lower_idx = 0; lower_idx < h->num_buckets; lower_idx++) { + count_so_far += h->buckets[lower_idx]; + if (count_so_far >= count_below) { + break; + } + } + if (count_so_far == count_below) { + /* this bucket hits the threshold exactly... we should be midway through + any run of zero values following the bucket */ + for (upper_idx = lower_idx + 1; upper_idx < h->num_buckets; upper_idx++) { + if (h->buckets[upper_idx]) { + break; + } + } + return (bucket_start(h, (double)lower_idx) + + bucket_start(h, (double)upper_idx)) / + 2.0; + } else { + /* treat values as uniform throughout the bucket, and find where this value + should lie */ + lower_bound = bucket_start(h, (double)lower_idx); + upper_bound = bucket_start(h, (double)(lower_idx + 1)); + return GPR_CLAMP(upper_bound - (upper_bound - lower_bound) * + (count_so_far - count_below) / + h->buckets[lower_idx], + h->min_seen, h->max_seen); + } +} + +double grpc_histogram_percentile(grpc_histogram* h, double percentile) { + return threshold_for_count_below(h, h->count * percentile / 100.0); +} + +double grpc_histogram_mean(grpc_histogram* h) { + GPR_ASSERT(h->count != 0); + return h->sum / h->count; +} + +double grpc_histogram_stddev(grpc_histogram* h) { + return sqrt(grpc_histogram_variance(h)); +} + +double grpc_histogram_variance(grpc_histogram* h) { + if (h->count == 0) return 0.0; + return (h->sum_of_squares * h->count - h->sum * h->sum) / + (h->count * h->count); +} + +double grpc_histogram_maximum(grpc_histogram* h) { return h->max_seen; } + +double grpc_histogram_minimum(grpc_histogram* h) { return h->min_seen; } + +double grpc_histogram_count(grpc_histogram* h) { return h->count; } + +double grpc_histogram_sum(grpc_histogram* h) { return h->sum; } + +double grpc_histogram_sum_of_squares(grpc_histogram* h) { + return h->sum_of_squares; +} + +const uint32_t* grpc_histogram_get_contents(grpc_histogram* h, size_t* size) { + *size = h->num_buckets; + return h->buckets; +} diff --git a/test/core/util/histogram.h b/test/core/util/histogram.h new file mode 100644 index 0000000000..9d4985e64f --- /dev/null +++ b/test/core/util/histogram.h @@ -0,0 +1,62 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_SUPPORT_HISTOGRAM_H +#define GRPC_SUPPORT_HISTOGRAM_H + +#include <grpc/support/port_platform.h> +#include <stddef.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct grpc_histogram grpc_histogram; + +grpc_histogram* grpc_histogram_create(double resolution, + double max_bucket_start); +void grpc_histogram_destroy(grpc_histogram* h); +void grpc_histogram_add(grpc_histogram* h, double x); + +/** The following merges the second histogram into the first. It only works + if they have the same buckets and resolution. Returns 0 on failure, 1 + on success */ +int grpc_histogram_merge(grpc_histogram* dst, const grpc_histogram* src); + +double grpc_histogram_percentile(grpc_histogram* histogram, double percentile); +double grpc_histogram_mean(grpc_histogram* histogram); +double grpc_histogram_stddev(grpc_histogram* histogram); +double grpc_histogram_variance(grpc_histogram* histogram); +double grpc_histogram_maximum(grpc_histogram* histogram); +double grpc_histogram_minimum(grpc_histogram* histogram); +double grpc_histogram_count(grpc_histogram* histogram); +double grpc_histogram_sum(grpc_histogram* histogram); +double grpc_histogram_sum_of_squares(grpc_histogram* histogram); + +const uint32_t* grpc_histogram_get_contents(grpc_histogram* histogram, + size_t* count); +void grpc_histogram_merge_contents(grpc_histogram* histogram, + const uint32_t* data, size_t data_count, + double min_seen, double max_seen, double sum, + double sum_of_squares, double count); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_SUPPORT_HISTOGRAM_H */ diff --git a/test/core/util/histogram_test.cc b/test/core/util/histogram_test.cc new file mode 100644 index 0000000000..b96ac7d841 --- /dev/null +++ b/test/core/util/histogram_test.cc @@ -0,0 +1,163 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test/core/util/histogram.h" +#include <grpc/support/log.h> + +#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x); + +static void test_no_op(void) { + grpc_histogram_destroy(grpc_histogram_create(0.01, 60e9)); +} + +static void expect_percentile(grpc_histogram* h, double percentile, + double min_expect, double max_expect) { + double got = grpc_histogram_percentile(h, percentile); + gpr_log(GPR_INFO, "@%f%%, expect %f <= %f <= %f", percentile, min_expect, got, + max_expect); + GPR_ASSERT(min_expect <= got); + GPR_ASSERT(got <= max_expect); +} + +static void test_simple(void) { + grpc_histogram* h; + + LOG_TEST("test_simple"); + + h = grpc_histogram_create(0.01, 60e9); + grpc_histogram_add(h, 10000); + grpc_histogram_add(h, 10000); + grpc_histogram_add(h, 11000); + grpc_histogram_add(h, 11000); + + expect_percentile(h, 50, 10001, 10999); + GPR_ASSERT(grpc_histogram_mean(h) == 10500); + + grpc_histogram_destroy(h); +} + +static void test_percentile(void) { + grpc_histogram* h; + double last; + double i; + double cur; + + LOG_TEST("test_percentile"); + + h = grpc_histogram_create(0.05, 1e9); + grpc_histogram_add(h, 2.5); + grpc_histogram_add(h, 2.5); + grpc_histogram_add(h, 8); + grpc_histogram_add(h, 4); + + GPR_ASSERT(grpc_histogram_count(h) == 4); + GPR_ASSERT(grpc_histogram_minimum(h) == 2.5); + GPR_ASSERT(grpc_histogram_maximum(h) == 8); + GPR_ASSERT(grpc_histogram_sum(h) == 17); + GPR_ASSERT(grpc_histogram_sum_of_squares(h) == 92.5); + GPR_ASSERT(grpc_histogram_mean(h) == 4.25); + GPR_ASSERT(grpc_histogram_variance(h) == 5.0625); + GPR_ASSERT(grpc_histogram_stddev(h) == 2.25); + + expect_percentile(h, -10, 2.5, 2.5); + expect_percentile(h, 0, 2.5, 2.5); + expect_percentile(h, 12.5, 2.5, 2.5); + expect_percentile(h, 25, 2.5, 2.5); + expect_percentile(h, 37.5, 2.5, 2.8); + expect_percentile(h, 50, 3.0, 3.5); + expect_percentile(h, 62.5, 3.5, 4.5); + expect_percentile(h, 75, 5, 7.9); + expect_percentile(h, 100, 8, 8); + expect_percentile(h, 110, 8, 8); + + /* test monotonicity */ + last = 0.0; + for (i = 0; i < 100.0; i += 0.01) { + cur = grpc_histogram_percentile(h, i); + GPR_ASSERT(cur >= last); + last = cur; + } + + grpc_histogram_destroy(h); +} + +static void test_merge(void) { + grpc_histogram *h1, *h2; + double last; + double i; + double cur; + + LOG_TEST("test_merge"); + + h1 = grpc_histogram_create(0.05, 1e9); + grpc_histogram_add(h1, 2.5); + grpc_histogram_add(h1, 2.5); + grpc_histogram_add(h1, 8); + grpc_histogram_add(h1, 4); + + h2 = grpc_histogram_create(0.01, 1e9); + GPR_ASSERT(grpc_histogram_merge(h1, h2) == 0); + grpc_histogram_destroy(h2); + + h2 = grpc_histogram_create(0.05, 1e10); + GPR_ASSERT(grpc_histogram_merge(h1, h2) == 0); + grpc_histogram_destroy(h2); + + h2 = grpc_histogram_create(0.05, 1e9); + GPR_ASSERT(grpc_histogram_merge(h1, h2) == 1); + GPR_ASSERT(grpc_histogram_count(h1) == 4); + GPR_ASSERT(grpc_histogram_minimum(h1) == 2.5); + GPR_ASSERT(grpc_histogram_maximum(h1) == 8); + GPR_ASSERT(grpc_histogram_sum(h1) == 17); + GPR_ASSERT(grpc_histogram_sum_of_squares(h1) == 92.5); + GPR_ASSERT(grpc_histogram_mean(h1) == 4.25); + GPR_ASSERT(grpc_histogram_variance(h1) == 5.0625); + GPR_ASSERT(grpc_histogram_stddev(h1) == 2.25); + grpc_histogram_destroy(h2); + + h2 = grpc_histogram_create(0.05, 1e9); + grpc_histogram_add(h2, 7.0); + grpc_histogram_add(h2, 17.0); + grpc_histogram_add(h2, 1.0); + GPR_ASSERT(grpc_histogram_merge(h1, h2) == 1); + GPR_ASSERT(grpc_histogram_count(h1) == 7); + GPR_ASSERT(grpc_histogram_minimum(h1) == 1.0); + GPR_ASSERT(grpc_histogram_maximum(h1) == 17.0); + GPR_ASSERT(grpc_histogram_sum(h1) == 42.0); + GPR_ASSERT(grpc_histogram_sum_of_squares(h1) == 431.5); + GPR_ASSERT(grpc_histogram_mean(h1) == 6.0); + + /* test monotonicity */ + last = 0.0; + for (i = 0; i < 100.0; i += 0.01) { + cur = grpc_histogram_percentile(h1, i); + GPR_ASSERT(cur >= last); + last = cur; + } + + grpc_histogram_destroy(h1); + grpc_histogram_destroy(h2); +} + +int main(void) { + test_no_op(); + test_simple(); + test_percentile(); + test_merge(); + return 0; +} diff --git a/test/core/util/port_isolated_runtime_environment.cc b/test/core/util/port_isolated_runtime_environment.cc new file mode 100644 index 0000000000..5f0585e9fb --- /dev/null +++ b/test/core/util/port_isolated_runtime_environment.cc @@ -0,0 +1,42 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* When running tests on remote machines, the framework takes a round-robin pick + * of a port within certain range. There is no need to recycle ports. + */ +#include "src/core/lib/iomgr/port.h" +#include "test/core/util/test_config.h" +#if defined(GRPC_PORT_ISOLATED_RUNTIME) + +#include "test/core/util/port.h" + +#define LOWER_PORT 49152 +static int s_allocated_port = LOWER_PORT; + +int grpc_pick_unused_port_or_die(void) { + int allocated_port = s_allocated_port++; + if (s_allocated_port == 65536) { + s_allocated_port = LOWER_PORT; + } + + return allocated_port; +} + +void grpc_recycle_unused_port(int port) { (void)port; } + +#endif /* GRPC_PORT_ISOLATED_RUNTIME */ diff --git a/test/core/util/run_with_poller.sh b/test/core/util/run_with_poller.sh new file mode 100755 index 0000000000..05791457a2 --- /dev/null +++ b/test/core/util/run_with_poller.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -ex +export GRPC_POLL_STRATEGY=$1 +shift +$@ diff --git a/test/core/util/test_config.h b/test/core/util/test_config.h index 775ffac949..5b3d34799e 100644 --- a/test/core/util/test_config.h +++ b/test/core/util/test_config.h @@ -33,7 +33,7 @@ gpr_timespec grpc_timeout_seconds_to_deadline(int64_t time_s); /* Converts a given timeout (in milliseconds) to a deadline. */ gpr_timespec grpc_timeout_milliseconds_to_deadline(int64_t time_ms); -#ifndef GRPC_TEST_CUSTOM_PICK_PORT +#if !defined(GRPC_TEST_CUSTOM_PICK_PORT) && !defined(GRPC_PORT_ISOLATED_RUNTIME) #define GRPC_TEST_PICK_PORT #endif diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc index a24cdc7d2d..30bd8bfef8 100644 --- a/test/cpp/interop/interop_server.cc +++ b/test/cpp/interop/interop_server.cc @@ -317,9 +317,15 @@ class TestServiceImpl : public TestService::Service { void grpc::testing::interop::RunServer( std::shared_ptr<ServerCredentials> creds) { - GPR_ASSERT(FLAGS_port != 0); + RunServer(creds, FLAGS_port, nullptr); +} + +void grpc::testing::interop::RunServer( + std::shared_ptr<ServerCredentials> creds, const int port, + ServerStartedCondition* server_started_condition) { + GPR_ASSERT(port != 0); std::ostringstream server_address; - server_address << "0.0.0.0:" << FLAGS_port; + server_address << "0.0.0.0:" << port; TestServiceImpl service; SimpleRequest request; @@ -333,6 +339,14 @@ void grpc::testing::interop::RunServer( } std::unique_ptr<Server> server(builder.BuildAndStart()); gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str()); + + // Signal that the server has started. + if (server_started_condition) { + std::unique_lock<std::mutex> lock(server_started_condition->mutex); + server_started_condition->server_started = true; + server_started_condition->condition.notify_all(); + } + while (!gpr_atm_no_barrier_load(&g_got_sigint)) { gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(5, GPR_TIMESPAN))); diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h index 6af003fe93..1bf7db1e14 100644 --- a/test/cpp/interop/server_helper.h +++ b/test/cpp/interop/server_helper.h @@ -19,6 +19,7 @@ #ifndef GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H #define GRPC_TEST_CPP_INTEROP_SERVER_HELPER_H +#include <condition_variable> #include <memory> #include <grpc/compression.h> @@ -50,8 +51,27 @@ class InteropServerContextInspector { namespace interop { extern gpr_atm g_got_sigint; + +struct ServerStartedCondition { + std::mutex mutex; + std::condition_variable condition; + bool server_started = false; +}; + +/// Run gRPC interop server using port FLAGS_port. +/// +/// \param creds The credentials associated with the server. void RunServer(std::shared_ptr<ServerCredentials> creds); +/// Run gRPC interop server. +/// +/// \param creds The credentials associated with the server. +/// \param port Port to use for the server. +/// \param server_started_condition (optional) Struct holding mutex, condition +/// variable, and condition used to notify when the server has started. +void RunServer(std::shared_ptr<ServerCredentials> creds, int port, + ServerStartedCondition* server_started_condition); + } // namespace interop } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD index 0d91d52f22..f1abb19e64 100644 --- a/test/cpp/qps/BUILD +++ b/test/cpp/qps/BUILD @@ -106,7 +106,7 @@ grpc_cc_library( "histogram.h", "stats.h", ], - deps = ["//:gpr"], + deps = ["//test/core/util:grpc_test_util"], ) grpc_cc_test( diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 9f20b148eb..82a3f0042d 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -60,21 +60,20 @@ class SynchronousClient SetupLoadTest(config, num_threads_); } - virtual ~SynchronousClient(){}; + virtual ~SynchronousClient() {} - virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool InitThreadFuncImpl(size_t thread_idx) = 0; virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; void ThreadFunc(size_t thread_idx, Thread* t) override { - InitThreadFuncImpl(thread_idx); + if (!InitThreadFuncImpl(thread_idx)) { + return; + } for (;;) { // run the loop body HistogramEntry entry; const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); t->UpdateHistogram(&entry); - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } if (!thread_still_ok || ThreadCompleted()) { return; } @@ -109,9 +108,6 @@ class SynchronousClient size_t num_threads_; std::vector<SimpleResponse> responses_; - - private: - void DestroyMultithreading() override final { EndThreads(); } }; class SynchronousUnaryClient final : public SynchronousClient { @@ -122,7 +118,7 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFuncImpl(size_t thread_idx) override {} + bool InitThreadFuncImpl(size_t thread_idx) override { return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { @@ -140,6 +136,9 @@ class SynchronousUnaryClient final : public SynchronousClient { entry->set_status(s.error_code()); return true; } + + private: + void DestroyMultithreading() override final { EndThreads(); } }; template <class StreamType> @@ -149,31 +148,30 @@ class SynchronousStreamingClient : public SynchronousClient { : SynchronousClient(config), context_(num_threads_), stream_(num_threads_), + stream_mu_(num_threads_), + shutdown_(num_threads_), messages_per_stream_(config.messages_per_stream()), messages_issued_(num_threads_) { StartThreads(num_threads_); } virtual ~SynchronousStreamingClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - // forcibly cancel the streams, then finish - context_[i].TryCancel(); - (*stream)->Finish().IgnoreError(); - // don't log any error message on !ok since this was canceled - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams([this](size_t thread_idx) { + // Don't log any kind of error since we may have canceled this + stream_[thread_idx]->Finish().IgnoreError(); + }); } protected: std::vector<grpc::ClientContext> context_; std::vector<std::unique_ptr<StreamType>> stream_; + // stream_mu_ is only needed when changing an element of stream_ or context_ + std::vector<std::mutex> stream_mu_; + // use struct Bool rather than bool because vector<bool> is not concurrent + struct Bool { + bool val; + Bool() : val(false) {} + }; + std::vector<Bool> shutdown_; const int messages_per_stream_; std::vector<int> messages_issued_; @@ -182,27 +180,26 @@ class SynchronousStreamingClient : public SynchronousClient { // don't set the value since the stream is failed and shouldn't be timed entry->set_status(s.error_code()); if (!s.ok()) { - gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", thread_idx, - s.error_message().c_str()); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", + thread_idx, s.error_message().c_str()); + } } + // Lock the stream_mu_ now because the client context could change + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); context_[thread_idx].~ClientContext(); new (&context_[thread_idx]) ClientContext(); } -}; -class SynchronousStreamingPingPongClient final - : public SynchronousStreamingClient< - grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { - public: - SynchronousStreamingPingPongClient(const ClientConfig& config) - : SynchronousStreamingClient(config) {} - ~SynchronousStreamingPingPongClient() { + void CleanupAllStreams(std::function<void(size_t)> cleaner) { std::vector<std::thread> cleanup_threads; for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); + cleanup_threads.emplace_back([this, i, cleaner] { + std::lock_guard<std::mutex> l(stream_mu_[i]); + shutdown_[i].val = true; + if (stream_[i]) { + cleaner(i); } }); } @@ -211,10 +208,36 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + void DestroyMultithreading() override final { + CleanupAllStreams( + [this](size_t thread_idx) { context_[thread_idx].TryCancel(); }); + EndThreads(); + } +}; + +class SynchronousStreamingPingPongClient final + : public SynchronousStreamingClient< + grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> { + public: + SynchronousStreamingPingPongClient(const ClientConfig& config) + : SynchronousStreamingClient(config) {} + ~SynchronousStreamingPingPongClient() { + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); + } + + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + return false; + } messages_issued_[thread_idx] = 0; + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -239,7 +262,13 @@ class SynchronousStreamingPingPongClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } messages_issued_[thread_idx] = 0; return true; } @@ -251,25 +280,24 @@ class SynchronousStreamingFromClientClient final SynchronousStreamingFromClientClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_issue_(num_threads_) {} ~SynchronousStreamingFromClientClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + std::vector<double> last_issue_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + return false; + } last_issue_[thread_idx] = UsageTimer::Now(); + return true; } bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { @@ -287,13 +315,16 @@ class SynchronousStreamingFromClientClient final stream_[thread_idx]->WritesDone(); FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], - &responses_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], + &responses_[thread_idx]); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_issue_; }; class SynchronousStreamingFromServerClient final @@ -301,12 +332,24 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFuncImpl(size_t thread_idx) override { + ~SynchronousStreamingFromServerClient() {} + + private: + std::vector<double> last_recv_; + + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + return false; + } last_recv_[thread_idx] = UsageTimer::Now(); + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { @@ -317,13 +360,16 @@ class SynchronousStreamingFromServerClient final } FinishStream(entry, thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = - stub->StreamingFromServer(&context_[thread_idx], request_); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = + stub->StreamingFromServer(&context_[thread_idx], request_); + } else { + stream_[thread_idx].reset(); + return false; + } return true; } - - private: - std::vector<double> last_recv_; }; class SynchronousStreamingBothWaysClient final @@ -333,24 +379,22 @@ class SynchronousStreamingBothWaysClient final SynchronousStreamingBothWaysClient(const ClientConfig& config) : SynchronousStreamingClient(config) {} ~SynchronousStreamingBothWaysClient() { - std::vector<std::thread> cleanup_threads; - for (size_t i = 0; i < num_threads_; i++) { - cleanup_threads.emplace_back([this, i]() { - auto stream = &stream_[i]; - if (*stream) { - (*stream)->WritesDone(); - } - }); - } - for (auto& th : cleanup_threads) { - th.join(); - } + CleanupAllStreams( + [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); }); } - void InitThreadFuncImpl(size_t thread_idx) override { + private: + bool InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); - stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + std::lock_guard<std::mutex> l(stream_mu_[thread_idx]); + if (!shutdown_[thread_idx].val) { + stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); + } else { + return false; + } + return true; } + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h index e31d5d78a8..ba72b5b332 100644 --- a/test/cpp/qps/histogram.h +++ b/test/cpp/qps/histogram.h @@ -19,8 +19,8 @@ #ifndef TEST_QPS_HISTOGRAM_H #define TEST_QPS_HISTOGRAM_H -#include <grpc/support/histogram.h> #include "src/proto/grpc/testing/stats.pb.h" +#include "test/core/util/histogram.h" namespace grpc { namespace testing { @@ -29,36 +29,36 @@ class Histogram { public: // TODO: look into making histogram params not hardcoded for C++ Histogram() - : impl_(gpr_histogram_create(default_resolution(), - default_max_possible())) {} + : impl_(grpc_histogram_create(default_resolution(), + default_max_possible())) {} ~Histogram() { - if (impl_) gpr_histogram_destroy(impl_); + if (impl_) grpc_histogram_destroy(impl_); } Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; } - void Merge(const Histogram& h) { gpr_histogram_merge(impl_, h.impl_); } - void Add(double value) { gpr_histogram_add(impl_, value); } + void Merge(const Histogram& h) { grpc_histogram_merge(impl_, h.impl_); } + void Add(double value) { grpc_histogram_add(impl_, value); } double Percentile(double pctile) const { - return gpr_histogram_percentile(impl_, pctile); + return grpc_histogram_percentile(impl_, pctile); } - double Count() const { return gpr_histogram_count(impl_); } + double Count() const { return grpc_histogram_count(impl_); } void Swap(Histogram* other) { std::swap(impl_, other->impl_); } void FillProto(HistogramData* p) { size_t n; - const auto* data = gpr_histogram_get_contents(impl_, &n); + const auto* data = grpc_histogram_get_contents(impl_, &n); for (size_t i = 0; i < n; i++) { p->add_bucket(data[i]); } - p->set_min_seen(gpr_histogram_minimum(impl_)); - p->set_max_seen(gpr_histogram_maximum(impl_)); - p->set_sum(gpr_histogram_sum(impl_)); - p->set_sum_of_squares(gpr_histogram_sum_of_squares(impl_)); - p->set_count(gpr_histogram_count(impl_)); + p->set_min_seen(grpc_histogram_minimum(impl_)); + p->set_max_seen(grpc_histogram_maximum(impl_)); + p->set_sum(grpc_histogram_sum(impl_)); + p->set_sum_of_squares(grpc_histogram_sum_of_squares(impl_)); + p->set_count(grpc_histogram_count(impl_)); } void MergeProto(const HistogramData& p) { - gpr_histogram_merge_contents(impl_, &*p.bucket().begin(), p.bucket_size(), - p.min_seen(), p.max_seen(), p.sum(), - p.sum_of_squares(), p.count()); + grpc_histogram_merge_contents(impl_, &*p.bucket().begin(), p.bucket_size(), + p.min_seen(), p.max_seen(), p.sum(), + p.sum_of_squares(), p.count()); } static double default_resolution() { return 0.01; } @@ -68,7 +68,7 @@ class Histogram { Histogram(const Histogram&); Histogram& operator=(const Histogram&); - gpr_histogram* impl_; + grpc_histogram* impl_; }; } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc index 461bf624ce..625b7db426 100644 --- a/test/cpp/qps/qps_interarrival_test.cc +++ b/test/cpp/qps/qps_interarrival_test.cc @@ -20,7 +20,7 @@ #include <iostream> // Use the C histogram rather than C++ to avoid depending on proto -#include <grpc/support/histogram.h> +#include "test/core/util/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/util/test_config.h" @@ -31,21 +31,21 @@ using grpc::testing::RandomDistInterface; static void RunTest(RandomDistInterface&& r, int threads, std::string title) { InterarrivalTimer timer; timer.init(r, threads); - gpr_histogram* h(gpr_histogram_create(0.01, 60e9)); + grpc_histogram* h(grpc_histogram_create(0.01, 60e9)); for (int i = 0; i < 10000000; i++) { for (int j = 0; j < threads; j++) { - gpr_histogram_add(h, timer.next(j)); + grpc_histogram_add(h, timer.next(j)); } } std::cout << title << " Distribution" << std::endl; std::cout << "Value, Percentile" << std::endl; for (double pct = 0.0; pct < 100.0; pct += 1.0) { - std::cout << gpr_histogram_percentile(h, pct) << "," << pct << std::endl; + std::cout << grpc_histogram_percentile(h, pct) << "," << pct << std::endl; } - gpr_histogram_destroy(h); + grpc_histogram_destroy(h); } using grpc::testing::ExpDist; diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index c288b03ec5..4c9ab0ea43 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -32,12 +32,12 @@ #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/cpu.h> -#include <grpc/support/histogram.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include "src/proto/grpc/testing/services.pb.h" #include "test/core/util/grpc_profiler.h" +#include "test/core/util/histogram.h" #include "test/cpp/qps/client.h" #include "test/cpp/qps/server.h" #include "test/cpp/util/create_test_channel.h" diff --git a/test/distrib/cpp/run_distrib_test_cmake.bat b/test/distrib/cpp/run_distrib_test_cmake.bat new file mode 100644 index 0000000000..047846b0f1 --- /dev/null +++ b/test/distrib/cpp/run_distrib_test_cmake.bat @@ -0,0 +1,75 @@ +@rem Copyright 2016 gRPC authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem http://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. + +@rem enter this directory +cd /d %~dp0\..\..\.. + +@rem TODO(jtattermusch): Kokoro has pre-installed protoc.exe in C:\Program Files\ProtoC and that directory +@rem is on PATH. To avoid picking up the older version protoc.exe, we change the path to something non-existent. +set PATH=%PATH:ProtoC=DontPickupProtoC% + +@rem Install into ./testinstall, but use absolute path and foward slashes +set INSTALL_DIR=%cd:\=/%/testinstall + +@rem Download OpenSSL-Win32 originally installed from https://slproweb.com/products/Win32OpenSSL.html +powershell -Command "(New-Object Net.WebClient).DownloadFile('https://storage.googleapis.com/grpc-testing.appspot.com/OpenSSL-Win32-1_1_0g.zip', 'OpenSSL-Win32.zip')" +powershell -Command "Add-Type -Assembly 'System.IO.Compression.FileSystem'; [System.IO.Compression.ZipFile]::ExtractToDirectory('OpenSSL-Win32.zip', '.');" + +@rem set absolute path to OpenSSL with forward slashes +set OPENSSL_DIR=%cd:\=/%/OpenSSL-Win32 + +cd third_party/zlib +mkdir cmake +cd cmake +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% .. +cmake --build . --config Release --target install || goto :error +cd ../../.. + +cd third_party/protobuf/cmake +mkdir build +cd build +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% -Dprotobuf_MSVC_STATIC_RUNTIME=OFF -Dprotobuf_BUILD_TESTS=OFF .. +cmake --build . --config Release --target install || goto :error +cd ../../../.. + +cd third_party/cares/cares +mkdir cmake +cd cmake +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% .. +cmake --build . --config Release --target install || goto :error +cd ../../../.. + +@rem OpenSSL-Win32 and OpenSSL-Win64 can be downloaded from https://slproweb.com/products/Win32OpenSSL.html +cd cmake +mkdir build +cd build +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% -DOPENSSL_ROOT_DIR=%OPENSSL_DIR% -DOPENSSL_INCLUDE_DIR=%OPENSSL_DIR%/include -DZLIB_LIBRARY=%INSTALL_DIR%/lib/zlibstatic.lib -DZLIB_INCLUDE_DIR=%INSTALL_DIR%/include -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DgRPC_PROTOBUF_PROVIDER=package -DgRPC_ZLIB_PROVIDER=package -DgRPC_CARES_PROVIDER=package -DgRPC_SSL_PROVIDER=package -DCMAKE_BUILD_TYPE=Release ../.. || goto :error +cmake --build . --config Release --target install || goto :error +cd ../.. + +# Build helloworld example using cmake +cd examples/cpp/helloworld +mkdir cmake +cd cmake +mkdir build +cd build +cmake -DCMAKE_INSTALL_PREFIX=%INSTALL_DIR% ../.. || goto :error +cmake --build . --config Release || goto :error +cd ../../../../.. + +goto :EOF + +:error +echo Failed! +exit /b %errorlevel% diff --git a/test/distrib/cpp/run_distrib_test_cmake.sh b/test/distrib/cpp/run_distrib_test_cmake.sh index ead8cc10bc..a9c081c2f5 100755 --- a/test/distrib/cpp/run_distrib_test_cmake.sh +++ b/test/distrib/cpp/run_distrib_test_cmake.sh @@ -19,7 +19,6 @@ cd $(dirname $0)/../../.. echo "deb http://ftp.debian.org/debian jessie-backports main" | tee /etc/apt/sources.list.d/jessie-backports.list apt-get update -#apt-get install -t jessie-backports -y libc-ares-dev # we need specifically version 1.12 apt-get install -t jessie-backports -y libssl-dev # Install c-ares |