aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-02-17 14:25:12 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-02-17 14:25:12 -0800
commitc0ce00fd164b3e40bb2c1046a452b4c12e295578 (patch)
tree721f298cd488e31d2351935a7309646c1b7ecd31 /test
parent24e274b8d9c9617bc10b9dcde66864fb2884b58c (diff)
parentc77dc39771d0e0fb8a581636bb9c3dc1adef3fb1 (diff)
Merge github.com:grpc/grpc into sceq
Diffstat (limited to 'test')
-rwxr-xr-xtest/core/bad_client/gen_build_yaml.py8
-rwxr-xr-xtest/core/bad_ssl/gen_build_yaml.py12
-rw-r--r--test/core/census/log_test.c589
-rwxr-xr-xtest/core/end2end/gen_build_yaml.py8
-rw-r--r--test/core/fling/client.c12
-rw-r--r--test/core/iomgr/udp_server_test.c4
-rw-r--r--test/cpp/common/alarm_cpp_test.cc (renamed from test/cpp/common/alarm_test.cc)29
-rw-r--r--test/cpp/end2end/async_end2end_test.cc4
-rw-r--r--test/cpp/qps/client.h63
-rw-r--r--test/cpp/qps/client_async.cc469
-rw-r--r--test/cpp/qps/client_sync.cc7
-rw-r--r--test/cpp/qps/driver.cc4
-rw-r--r--test/cpp/qps/interarrival.h41
-rw-r--r--test/cpp/qps/qps_interarrival_test.cc10
-rw-r--r--test/cpp/qps/qps_openloop_test.cc2
-rw-r--r--test/cpp/qps/qps_test.cc2
-rw-r--r--test/cpp/qps/server_async.cc3
-rwxr-xr-xtest/distrib/csharp/run_distrib_test.sh5
-rwxr-xr-xtest/distrib/node/run_distrib_test.sh10
-rw-r--r--test/distrib/python/distribtest.py2
-rwxr-xr-xtest/distrib/python/run_distrib_test.sh8
-rw-r--r--test/distrib/ruby/Gemfile11
-rw-r--r--test/distrib/ruby/distribtest.gemspec19
-rwxr-xr-xtest/distrib/ruby/distribtest.rb39
-rwxr-xr-xtest/distrib/ruby/run_distrib_test.sh10
25 files changed, 978 insertions, 393 deletions
diff --git a/test/core/bad_client/gen_build_yaml.py b/test/core/bad_client/gen_build_yaml.py
index f37aa66c5f..c538bffd71 100755
--- a/test/core/bad_client/gen_build_yaml.py
+++ b/test/core/bad_client/gen_build_yaml.py
@@ -67,7 +67,9 @@ def main():
'vs_proj_dir': 'test/bad_client',
'deps': [
'grpc_test_util_unsecure',
- 'grpc_unsecure'
+ 'grpc_unsecure',
+ 'gpr_test_util',
+ 'gpr'
]
}],
'targets': [
@@ -82,7 +84,9 @@ def main():
'deps': [
'bad_client_test',
'grpc_test_util_unsecure',
- 'grpc_unsecure'
+ 'grpc_unsecure',
+ 'gpr_test_util',
+ 'gpr'
]
}
for t in sorted(BAD_CLIENT_TESTS.keys())]}
diff --git a/test/core/bad_ssl/gen_build_yaml.py b/test/core/bad_ssl/gen_build_yaml.py
index 9f05fed485..cc097a8fdf 100755
--- a/test/core/bad_ssl/gen_build_yaml.py
+++ b/test/core/bad_ssl/gen_build_yaml.py
@@ -58,7 +58,9 @@ def main():
'platforms': ['linux', 'posix', 'mac'],
'deps': [
'grpc_test_util',
- 'grpc'
+ 'grpc',
+ 'gpr_test_util',
+ 'gpr'
]
}
],
@@ -74,7 +76,9 @@ def main():
'deps': [
'bad_ssl_test_server',
'grpc_test_util',
- 'grpc'
+ 'grpc',
+ 'gpr_test_util',
+ 'gpr'
]
}
for t in sorted(BAD_CLIENT_TESTS.keys())] + [
@@ -88,7 +92,9 @@ def main():
'platforms': ['linux', 'posix', 'mac'],
'deps': [
'grpc_test_util',
- 'grpc'
+ 'grpc',
+ 'gpr_test_util',
+ 'gpr'
]
}
for t in sorted(BAD_CLIENT_TESTS.keys())]}
diff --git a/test/core/census/log_test.c b/test/core/census/log_test.c
new file mode 100644
index 0000000000..b68ca11504
--- /dev/null
+++ b/test/core/census/log_test.c
@@ -0,0 +1,589 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/census/log.h"
+#include <grpc/support/cpu.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include "test/core/util/test_config.h"
+
+// Change this to non-zero if you want more output.
+#define VERBOSE 0
+
+// Log size to use for all tests.
+#define LOG_SIZE_IN_MB 1
+#define LOG_SIZE_IN_BYTES (LOG_SIZE_IN_MB << 20)
+
+// Fills in 'record' of size 'size'. Each byte in record is filled in with the
+// same value. The value is extracted from 'record' pointer.
+static void write_record(char* record, size_t size) {
+ char data = (char)((uintptr_t)record % 255);
+ memset(record, data, size);
+}
+
+// Reads fixed size records. Returns the number of records read in
+// 'num_records'.
+static void read_records(size_t record_size, const char* buffer,
+ size_t buffer_size, int* num_records) {
+ GPR_ASSERT(buffer_size >= record_size);
+ GPR_ASSERT(buffer_size % record_size == 0);
+ *num_records = (int)(buffer_size / record_size);
+ for (int i = 0; i < *num_records; ++i) {
+ const char* record = buffer + (record_size * (size_t)i);
+ char data = (char)((uintptr_t)record % 255);
+ for (size_t j = 0; j < record_size; ++j) {
+ GPR_ASSERT(data == record[j]);
+ }
+ }
+}
+
+// Tries to write the specified number of records. Stops when the log gets
+// full. Returns the number of records written. Spins for random
+// number of times, up to 'max_spin_count', between writes.
+static int write_records_to_log(int writer_id, size_t record_size,
+ int num_records, int max_spin_count) {
+ int counter = 0;
+ for (int i = 0; i < num_records; ++i) {
+ int spin_count = max_spin_count ? rand() % max_spin_count : 0;
+ if (VERBOSE && (counter++ == num_records / 10)) {
+ printf(" Writer %d: %d out of %d written\n", writer_id, i, num_records);
+ counter = 0;
+ }
+ char* record = (char*)(census_log_start_write(record_size));
+ if (record == NULL) {
+ return i;
+ }
+ write_record(record, record_size);
+ census_log_end_write(record, record_size);
+ for (int j = 0; j < spin_count; ++j) {
+ GPR_ASSERT(j >= 0);
+ }
+ }
+ return num_records;
+}
+
+// Performs a single read iteration. Returns the number of records read.
+static int perform_read_iteration(size_t record_size) {
+ const void* read_buffer = NULL;
+ size_t bytes_available;
+ int records_read = 0;
+ census_log_init_reader();
+ while ((read_buffer = census_log_read_next(&bytes_available))) {
+ int num_records = 0;
+ read_records(record_size, (const char*)read_buffer, bytes_available,
+ &num_records);
+ records_read += num_records;
+ }
+ return records_read;
+}
+
+// Asserts that the log is empty.
+static void assert_log_empty(void) {
+ census_log_init_reader();
+ size_t bytes_available;
+ GPR_ASSERT(census_log_read_next(&bytes_available) == NULL);
+}
+
+// Fills the log and verifies data. If 'no fragmentation' is true, records
+// are sized such that CENSUS_LOG_2_MAX_RECORD_SIZE is a multiple of record
+// size. If not a circular log, verifies that the number of records written
+// match the number of records read.
+static void fill_log(size_t log_size, int no_fragmentation, int circular_log) {
+ size_t size;
+ if (no_fragmentation) {
+ int log2size = rand() % (CENSUS_LOG_2_MAX_RECORD_SIZE + 1);
+ size = ((size_t)1 << log2size);
+ } else {
+ while (1) {
+ size = 1 + ((size_t)rand() % CENSUS_LOG_MAX_RECORD_SIZE);
+ if (CENSUS_LOG_MAX_RECORD_SIZE % size) {
+ break;
+ }
+ }
+ }
+ int records_written =
+ write_records_to_log(0 /* writer id */, size,
+ (int)((log_size / size) * 2), 0 /* spin count */);
+ int records_read = perform_read_iteration(size);
+ if (!circular_log) {
+ GPR_ASSERT(records_written == records_read);
+ }
+ assert_log_empty();
+}
+
+// Structure to pass args to writer_thread
+typedef struct writer_thread_args {
+ // Index of this thread in the writers vector.
+ int index;
+ // Record size.
+ size_t record_size;
+ // Number of records to write.
+ int num_records;
+ // Used to signal when writer is complete
+ gpr_cv* done;
+ gpr_mu* mu;
+ int* count;
+} writer_thread_args;
+
+// Writes the given number of records of random size (up to kMaxRecordSize) and
+// random data to the specified log.
+static void writer_thread(void* arg) {
+ writer_thread_args* args = (writer_thread_args*)arg;
+ // Maximum number of times to spin between writes.
+ static const int MAX_SPIN_COUNT = 50;
+ int records_written = 0;
+ if (VERBOSE) {
+ printf(" Writer %d starting\n", args->index);
+ }
+ while (records_written < args->num_records) {
+ records_written += write_records_to_log(args->index, args->record_size,
+ args->num_records - records_written,
+ MAX_SPIN_COUNT);
+ if (records_written < args->num_records) {
+ // Ran out of log space. Sleep for a bit and let the reader catch up.
+ // This should never happen for circular logs.
+ if (VERBOSE) {
+ printf(
+ " Writer %d stalled due to out-of-space: %d out of %d "
+ "written\n",
+ args->index, records_written, args->num_records);
+ }
+ gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+ }
+ }
+ // Done. Decrement count and signal.
+ gpr_mu_lock(args->mu);
+ (*args->count)--;
+ gpr_cv_signal(args->done);
+ if (VERBOSE) {
+ printf(" Writer %d done\n", args->index);
+ }
+ gpr_mu_unlock(args->mu);
+}
+
+// struct to pass args to reader_thread
+typedef struct reader_thread_args {
+ // Record size.
+ size_t record_size;
+ // Interval between read iterations.
+ int read_iteration_interval_in_msec;
+ // Total number of records.
+ int total_records;
+ // Signalled when reader should stop.
+ gpr_cv stop;
+ int stop_flag;
+ // Used to signal when reader has finished
+ gpr_cv* done;
+ gpr_mu* mu;
+ int running;
+} reader_thread_args;
+
+// Reads and verifies the specified number of records. Reader can also be
+// stopped via gpr_cv_signal(&args->stop). Sleeps for 'read_interval_in_msec'
+// between read iterations.
+static void reader_thread(void* arg) {
+ reader_thread_args* args = (reader_thread_args*)arg;
+ if (VERBOSE) {
+ printf(" Reader starting\n");
+ }
+ gpr_timespec interval = gpr_time_from_micros(
+ args->read_iteration_interval_in_msec * 1000, GPR_TIMESPAN);
+ gpr_mu_lock(args->mu);
+ int records_read = 0;
+ int num_iterations = 0;
+ int counter = 0;
+ while (!args->stop_flag && records_read < args->total_records) {
+ gpr_cv_wait(&args->stop, args->mu, interval);
+ if (!args->stop_flag) {
+ records_read += perform_read_iteration(args->record_size);
+ GPR_ASSERT(records_read <= args->total_records);
+ if (VERBOSE && (counter++ == 100000)) {
+ printf(" Reader: %d out of %d read\n", records_read,
+ args->total_records);
+ counter = 0;
+ }
+ ++num_iterations;
+ }
+ }
+ // Done
+ args->running = 0;
+ gpr_cv_signal(args->done);
+ if (VERBOSE) {
+ printf(" Reader: records: %d, iterations: %d\n", records_read,
+ num_iterations);
+ }
+ gpr_mu_unlock(args->mu);
+}
+
+// Creates NUM_WRITERS writers where each writer writes NUM_RECORDS_PER_WRITER
+// records. Also, starts a reader that iterates over and reads blocks every
+// READ_ITERATION_INTERVAL_IN_MSEC.
+// Number of writers.
+#define NUM_WRITERS 5
+static void multiple_writers_single_reader(int circular_log) {
+ // Sleep interval between read iterations.
+ static const int READ_ITERATION_INTERVAL_IN_MSEC = 10;
+ // Maximum record size.
+ static const size_t MAX_RECORD_SIZE = 20;
+ // Number of records written by each writer. This is sized such that we
+ // will write through the entire log ~10 times.
+ const int NUM_RECORDS_PER_WRITER =
+ (int)((10 * census_log_remaining_space()) / (MAX_RECORD_SIZE / 2)) /
+ NUM_WRITERS;
+ size_t record_size = ((size_t)rand() % MAX_RECORD_SIZE) + 1;
+ // Create and start writers.
+ writer_thread_args writers[NUM_WRITERS];
+ int writers_count = NUM_WRITERS;
+ gpr_cv writers_done;
+ gpr_mu writers_mu; // protects writers_done and writers_count
+ gpr_cv_init(&writers_done);
+ gpr_mu_init(&writers_mu);
+ gpr_thd_id id;
+ for (int i = 0; i < NUM_WRITERS; ++i) {
+ writers[i].index = i;
+ writers[i].record_size = record_size;
+ writers[i].num_records = NUM_RECORDS_PER_WRITER;
+ writers[i].done = &writers_done;
+ writers[i].count = &writers_count;
+ writers[i].mu = &writers_mu;
+ gpr_thd_new(&id, &writer_thread, &writers[i], NULL);
+ }
+ // Start reader.
+ gpr_cv reader_done;
+ gpr_mu reader_mu; // protects reader_done and reader.running
+ reader_thread_args reader;
+ reader.record_size = record_size;
+ reader.read_iteration_interval_in_msec = READ_ITERATION_INTERVAL_IN_MSEC;
+ reader.total_records = NUM_WRITERS * NUM_RECORDS_PER_WRITER;
+ reader.stop_flag = 0;
+ gpr_cv_init(&reader.stop);
+ gpr_cv_init(&reader_done);
+ reader.done = &reader_done;
+ gpr_mu_init(&reader_mu);
+ reader.mu = &reader_mu;
+ reader.running = 1;
+ gpr_thd_new(&id, &reader_thread, &reader, NULL);
+ // Wait for writers to finish.
+ gpr_mu_lock(&writers_mu);
+ while (writers_count != 0) {
+ gpr_cv_wait(&writers_done, &writers_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ gpr_mu_unlock(&writers_mu);
+ gpr_mu_destroy(&writers_mu);
+ gpr_cv_destroy(&writers_done);
+ gpr_mu_lock(&reader_mu);
+ if (circular_log) {
+ // Stop reader.
+ reader.stop_flag = 1;
+ gpr_cv_signal(&reader.stop);
+ }
+ // wait for reader to finish
+ while (reader.running) {
+ gpr_cv_wait(&reader_done, &reader_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ if (circular_log) {
+ // Assert that there were no out-of-space errors.
+ GPR_ASSERT(0 == census_log_out_of_space_count());
+ }
+ gpr_mu_unlock(&reader_mu);
+ gpr_mu_destroy(&reader_mu);
+ gpr_cv_destroy(&reader_done);
+ if (VERBOSE) {
+ printf(" Reader: finished\n");
+ }
+}
+
+static void setup_test(int circular_log) {
+ census_log_initialize(LOG_SIZE_IN_MB, circular_log);
+ GPR_ASSERT(census_log_remaining_space() == LOG_SIZE_IN_BYTES);
+}
+
+// Attempts to create a record of invalid size (size >
+// CENSUS_LOG_MAX_RECORD_SIZE).
+void test_invalid_record_size(void) {
+ static const size_t INVALID_SIZE = CENSUS_LOG_MAX_RECORD_SIZE + 1;
+ static const size_t VALID_SIZE = 1;
+ printf("Starting test: invalid record size\n");
+ setup_test(0);
+ void* record = census_log_start_write(INVALID_SIZE);
+ GPR_ASSERT(record == NULL);
+ // Now try writing a valid record.
+ record = census_log_start_write(VALID_SIZE);
+ GPR_ASSERT(record != NULL);
+ census_log_end_write(record, VALID_SIZE);
+ // Verifies that available space went down by one block. In theory, this
+ // check can fail if the thread is context switched to a new CPU during the
+ // start_write execution (multiple blocks get allocated), but this has not
+ // been observed in practice.
+ GPR_ASSERT(LOG_SIZE_IN_BYTES - CENSUS_LOG_MAX_RECORD_SIZE ==
+ census_log_remaining_space());
+ census_log_shutdown();
+}
+
+// Tests end_write() with a different size than what was specified in
+// start_write().
+void test_end_write_with_different_size(void) {
+ static const size_t START_WRITE_SIZE = 10;
+ static const size_t END_WRITE_SIZE = 7;
+ printf("Starting test: end write with different size\n");
+ setup_test(0);
+ void* record_written = census_log_start_write(START_WRITE_SIZE);
+ GPR_ASSERT(record_written != NULL);
+ census_log_end_write(record_written, END_WRITE_SIZE);
+ census_log_init_reader();
+ size_t bytes_available;
+ const void* record_read = census_log_read_next(&bytes_available);
+ GPR_ASSERT(record_written == record_read);
+ GPR_ASSERT(END_WRITE_SIZE == bytes_available);
+ assert_log_empty();
+ census_log_shutdown();
+}
+
+// Verifies that pending records are not available via read_next().
+void test_read_pending_record(void) {
+ static const size_t PR_RECORD_SIZE = 1024;
+ printf("Starting test: read pending record\n");
+ setup_test(0);
+ // Start a write.
+ void* record_written = census_log_start_write(PR_RECORD_SIZE);
+ GPR_ASSERT(record_written != NULL);
+ // As write is pending, read should fail.
+ census_log_init_reader();
+ size_t bytes_available;
+ const void* record_read = census_log_read_next(&bytes_available);
+ GPR_ASSERT(record_read == NULL);
+ // A read followed by end_write() should succeed.
+ census_log_end_write(record_written, PR_RECORD_SIZE);
+ census_log_init_reader();
+ record_read = census_log_read_next(&bytes_available);
+ GPR_ASSERT(record_written == record_read);
+ GPR_ASSERT(PR_RECORD_SIZE == bytes_available);
+ assert_log_empty();
+ census_log_shutdown();
+}
+
+// Tries reading beyond pending write.
+void test_read_beyond_pending_record(void) {
+ printf("Starting test: read beyond pending record\n");
+ setup_test(0);
+ // Start a write.
+ const size_t incomplete_record_size = 10;
+ void* incomplete_record = census_log_start_write(incomplete_record_size);
+ GPR_ASSERT(incomplete_record != NULL);
+ const size_t complete_record_size = 20;
+ void* complete_record = census_log_start_write(complete_record_size);
+ GPR_ASSERT(complete_record != NULL);
+ GPR_ASSERT(complete_record != incomplete_record);
+ census_log_end_write(complete_record, complete_record_size);
+ // Now iterate over blocks to read completed records.
+ census_log_init_reader();
+ size_t bytes_available;
+ const void* record_read = census_log_read_next(&bytes_available);
+ GPR_ASSERT(complete_record == record_read);
+ GPR_ASSERT(complete_record_size == bytes_available);
+ // Complete first record.
+ census_log_end_write(incomplete_record, incomplete_record_size);
+ // Have read past the incomplete record, so read_next() should return NULL.
+ // NB: this test also assumes our thread did not get switched to a different
+ // CPU between the two start_write calls
+ record_read = census_log_read_next(&bytes_available);
+ GPR_ASSERT(record_read == NULL);
+ // Reset reader to get the newly completed record.
+ census_log_init_reader();
+ record_read = census_log_read_next(&bytes_available);
+ GPR_ASSERT(incomplete_record == record_read);
+ GPR_ASSERT(incomplete_record_size == bytes_available);
+ assert_log_empty();
+ census_log_shutdown();
+}
+
+// Tests scenario where block being read is detached from a core and put on the
+// dirty list.
+void test_detached_while_reading(void) {
+ printf("Starting test: detached while reading\n");
+ setup_test(0);
+ // Start a write.
+ static const size_t DWR_RECORD_SIZE = 10;
+ void* record_written = census_log_start_write(DWR_RECORD_SIZE);
+ GPR_ASSERT(record_written != NULL);
+ census_log_end_write(record_written, DWR_RECORD_SIZE);
+ // Read this record.
+ census_log_init_reader();
+ size_t bytes_available;
+ const void* record_read = census_log_read_next(&bytes_available);
+ GPR_ASSERT(record_read != NULL);
+ GPR_ASSERT(DWR_RECORD_SIZE == bytes_available);
+ // Now fill the log. This will move the block being read from core-local
+ // array to the dirty list.
+ while ((record_written = census_log_start_write(DWR_RECORD_SIZE))) {
+ census_log_end_write(record_written, DWR_RECORD_SIZE);
+ }
+
+ // In this iteration, read_next() should only traverse blocks in the
+ // core-local array. Therefore, we expect at most gpr_cpu_num_cores() more
+ // blocks. As log is full, if read_next() is traversing the dirty list, we
+ // will get more than gpr_cpu_num_cores() blocks.
+ int block_read = 0;
+ while ((record_read = census_log_read_next(&bytes_available))) {
+ ++block_read;
+ GPR_ASSERT(block_read <= (int)gpr_cpu_num_cores());
+ }
+ census_log_shutdown();
+}
+
+// Fills non-circular log with records sized such that size is a multiple of
+// CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation).
+void test_fill_log_no_fragmentation(void) {
+ printf("Starting test: fill log no fragmentation\n");
+ const int circular = 0;
+ setup_test(circular);
+ fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);
+ census_log_shutdown();
+}
+
+// Fills circular log with records sized such that size is a multiple of
+// CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation).
+void test_fill_circular_log_no_fragmentation(void) {
+ printf("Starting test: fill circular log no fragmentation\n");
+ const int circular = 1;
+ setup_test(circular);
+ fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);
+ census_log_shutdown();
+}
+
+// Fills non-circular log with records that may straddle end of a block.
+void test_fill_log_with_straddling_records(void) {
+ printf("Starting test: fill log with straddling records\n");
+ const int circular = 0;
+ setup_test(circular);
+ fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);
+ census_log_shutdown();
+}
+
+// Fills circular log with records that may straddle end of a block.
+void test_fill_circular_log_with_straddling_records(void) {
+ printf("Starting test: fill circular log with straddling records\n");
+ const int circular = 1;
+ setup_test(circular);
+ fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);
+ census_log_shutdown();
+}
+
+// Tests scenario where multiple writers and a single reader are using a log
+// that is configured to discard old records.
+void test_multiple_writers_circular_log(void) {
+ printf("Starting test: multiple writers circular log\n");
+ const int circular = 1;
+ setup_test(circular);
+ multiple_writers_single_reader(circular);
+ census_log_shutdown();
+}
+
+// Tests scenario where multiple writers and a single reader are using a log
+// that is configured to discard old records.
+void test_multiple_writers(void) {
+ printf("Starting test: multiple writers\n");
+ const int circular = 0;
+ setup_test(circular);
+ multiple_writers_single_reader(circular);
+ census_log_shutdown();
+}
+
+// Repeat the straddling records and multiple writers tests with a small log.
+void test_small_log(void) {
+ printf("Starting test: small log\n");
+ const int circular = 0;
+ census_log_initialize(0, circular);
+ size_t log_size = census_log_remaining_space();
+ GPR_ASSERT(log_size > 0);
+ fill_log(log_size, 0, circular);
+ census_log_shutdown();
+ census_log_initialize(0, circular);
+ multiple_writers_single_reader(circular);
+ census_log_shutdown();
+}
+
+void test_performance(void) {
+ for (size_t write_size = 1; write_size < CENSUS_LOG_MAX_RECORD_SIZE;
+ write_size *= 2) {
+ setup_test(0);
+ gpr_timespec start_time = gpr_now(GPR_CLOCK_REALTIME);
+ int nrecords = 0;
+ while (1) {
+ void* record = census_log_start_write(write_size);
+ if (record == NULL) {
+ break;
+ }
+ census_log_end_write(record, write_size);
+ nrecords++;
+ }
+ gpr_timespec write_time =
+ gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start_time);
+ double write_time_micro =
+ (double)write_time.tv_sec * 1000000 + (double)write_time.tv_nsec / 1000;
+ census_log_shutdown();
+ printf(
+ "Wrote %d %d byte records in %.3g microseconds: %g records/us "
+ "(%g ns/record), %g gigabytes/s\n",
+ nrecords, (int)write_size, write_time_micro,
+ nrecords / write_time_micro, 1000 * write_time_micro / nrecords,
+ (double)((int)write_size * nrecords) / write_time_micro / 1000);
+ }
+}
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ gpr_time_init();
+ srand((unsigned)gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
+ test_invalid_record_size();
+ test_end_write_with_different_size();
+ test_read_pending_record();
+ test_read_beyond_pending_record();
+ test_detached_while_reading();
+ test_fill_log_no_fragmentation();
+ test_fill_circular_log_no_fragmentation();
+ test_fill_log_with_straddling_records();
+ test_fill_circular_log_with_straddling_records();
+ test_small_log();
+ test_multiple_writers();
+ test_multiple_writers_circular_log();
+ test_performance();
+ return 0;
+}
diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py
index d4fa7ab518..f24dbe72cf 100755
--- a/test/core/end2end/gen_build_yaml.py
+++ b/test/core/end2end/gen_build_yaml.py
@@ -152,11 +152,15 @@ def main():
sec_deps = [
'end2end_certs',
'grpc_test_util',
- 'grpc'
+ 'grpc',
+ 'gpr_test_util',
+ 'gpr'
]
unsec_deps = [
'grpc_test_util_unsecure',
- 'grpc_unsecure'
+ 'grpc_unsecure',
+ 'gpr_test_util',
+ 'gpr'
]
json = {
'#': 'generated with test/end2end/gen_build_json.py',
diff --git a/test/core/fling/client.c b/test/core/fling/client.c
index 02db681cfd..b36aef3093 100644
--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -51,7 +51,7 @@ static grpc_channel *channel;
static grpc_completion_queue *cq;
static grpc_call *call;
static grpc_op ops[6];
-static grpc_op stream_init_op;
+static grpc_op stream_init_ops[2];
static grpc_op stream_step_ops[2];
static grpc_metadata_array initial_metadata_recv;
static grpc_metadata_array trailing_metadata_recv;
@@ -105,13 +105,17 @@ static void step_ping_pong_request(void) {
}
static void init_ping_pong_stream(void) {
+ grpc_metadata_array_init(&initial_metadata_recv);
+
grpc_call_error error;
call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
"/Reflector/reflectStream", "localhost",
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
- stream_init_op.op = GRPC_OP_SEND_INITIAL_METADATA;
- stream_init_op.data.send_initial_metadata.count = 0;
- error = grpc_call_start_batch(call, &stream_init_op, 1, (void *)1, NULL);
+ stream_init_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ stream_init_ops[0].data.send_initial_metadata.count = 0;
+ stream_init_ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
+ stream_init_ops[1].data.recv_initial_metadata = &initial_metadata_recv;
+ error = grpc_call_start_batch(call, stream_init_ops, 2, (void *)1, NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 85e28732e4..2e253d8a8a 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -173,7 +173,7 @@ static void test_receive(int number_of_clients) {
grpc_exec_ctx_finish(&exec_ctx);
}
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) {
grpc_pollset_destroy(p);
}
diff --git a/test/cpp/common/alarm_test.cc b/test/cpp/common/alarm_cpp_test.cc
index 09df6852a5..4745ef14ec 100644
--- a/test/cpp/common/alarm_test.cc
+++ b/test/cpp/common/alarm_cpp_test.cc
@@ -35,58 +35,47 @@
#include <grpc++/completion_queue.h>
#include <gtest/gtest.h>
-#include <grpc++/completion_queue.h>
#include "test/core/util/test_config.h"
namespace grpc {
namespace {
-class TestTag : public CompletionQueueTag {
- public:
- TestTag() : tag_(0) {}
- TestTag(intptr_t tag) : tag_(tag) {}
- bool FinalizeResult(void** tag, bool* status) { return true; }
- intptr_t tag() { return tag_; }
-
- private:
- intptr_t tag_;
-};
-
TEST(AlarmTest, RegularExpiry) {
CompletionQueue cq;
- TestTag input_tag(1618033);
- Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), &input_tag);
+ void* junk = reinterpret_cast<void*>(1618033);
+ Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), junk);
- TestTag* output_tag;
+ void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
(void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_TRUE(ok);
- EXPECT_EQ(output_tag->tag(), input_tag.tag());
+ EXPECT_EQ(junk, output_tag);
}
TEST(AlarmTest, Cancellation) {
CompletionQueue cq;
- TestTag input_tag(1618033);
- Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), &input_tag);
+ void* junk = reinterpret_cast<void*>(1618033);
+ Alarm alarm(&cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(2), junk);
alarm.Cancel();
- TestTag* output_tag;
+ void* output_tag;
bool ok;
const CompletionQueue::NextStatus status = cq.AsyncNext(
(void**)&output_tag, &ok, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
EXPECT_EQ(status, CompletionQueue::GOT_EVENT);
EXPECT_FALSE(ok);
- EXPECT_EQ(output_tag->tag(), input_tag.tag());
+ EXPECT_EQ(junk, output_tag);
}
} // namespace
} // namespace grpc
int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 252bda3798..a194c615cd 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -479,8 +479,10 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_request.set_message("Hello");
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
+ std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
+ cli_ctx.AddMetadata(meta3.first, meta3.second);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@@ -494,6 +496,8 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ToString(client_initial_metadata.find(meta1.first)->second));
EXPECT_EQ(meta2.second,
ToString(client_initial_metadata.find(meta2.first)->second));
+ EXPECT_EQ(meta3.second,
+ ToString(client_initial_metadata.find(meta3.first)->second));
EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
send_response.set_message(recv_request.message());
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 50b2bf2514..c94a523fa1 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -41,6 +41,7 @@
#include <grpc++/support/byte_buffer.h>
#include <grpc++/support/slice.h>
#include <grpc/support/log.h>
+#include <grpc/support/time.h>
#include "src/proto/grpc/testing/payloads.grpc.pb.h"
#include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -52,27 +53,8 @@
#include "test/cpp/util/create_test_channel.h"
namespace grpc {
-
-#if defined(__APPLE__)
-// Specialize Timepoint for high res clock as we need that
-template <>
-class TimePoint<std::chrono::high_resolution_clock::time_point> {
- public:
- TimePoint(const std::chrono::high_resolution_clock::time_point& time) {
- TimepointHR2Timespec(time, &time_);
- }
- gpr_timespec raw_time() const { return time_; }
-
- private:
- gpr_timespec time_;
-};
-#endif
-
namespace testing {
-typedef std::chrono::high_resolution_clock grpc_time_source;
-typedef std::chrono::time_point<grpc_time_source> grpc_time;
-
template <class RequestType>
class ClientRequestCreator {
public:
@@ -184,7 +166,7 @@ class Client {
// Set up the load distribution based on the number of threads
const auto& load = config.load_params();
- std::unique_ptr<RandomDist> random_dist;
+ std::unique_ptr<RandomDistInterface> random_dist;
switch (load.load_case()) {
case LoadParams::kClosedLoop:
// Closed-loop doesn't use random dist at all
@@ -218,25 +200,26 @@ class Client {
closed_loop_ = false;
// set up interarrival timer according to random dist
interarrival_timer_.init(*random_dist, num_threads);
+ const auto now = gpr_now(GPR_CLOCK_MONOTONIC);
for (size_t i = 0; i < num_threads; i++) {
- next_time_.push_back(
- grpc_time_source::now() +
- std::chrono::duration_cast<grpc_time_source::duration>(
- interarrival_timer_(i)));
+ next_time_.push_back(gpr_time_add(
+ now,
+ gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN)));
}
}
}
- bool NextIssueTime(int thread_idx, grpc_time* time_delay) {
- if (closed_loop_) {
- return false;
- } else {
- *time_delay = next_time_[thread_idx];
- next_time_[thread_idx] +=
- std::chrono::duration_cast<grpc_time_source::duration>(
- interarrival_timer_(thread_idx));
- return true;
- }
+ gpr_timespec NextIssueTime(int thread_idx) {
+ const gpr_timespec result = next_time_[thread_idx];
+ next_time_[thread_idx] =
+ gpr_time_add(next_time_[thread_idx],
+ gpr_time_from_nanos(interarrival_timer_.next(thread_idx),
+ GPR_TIMESPAN));
+ return result;
+ }
+ std::function<gpr_timespec()> NextIssuer(int thread_idx) {
+ return closed_loop_ ? std::function<gpr_timespec()>()
+ : std::bind(&Client::NextIssueTime, this, thread_idx);
}
private:
@@ -306,7 +289,7 @@ class Client {
Histogram* new_stats_;
Histogram histogram_;
Client* client_;
- size_t idx_;
+ const size_t idx_;
std::thread impl_;
};
@@ -314,7 +297,7 @@ class Client {
std::unique_ptr<Timer> timer_;
InterarrivalTimer interarrival_timer_;
- std::vector<grpc_time> next_time_;
+ std::vector<gpr_timespec> next_time_;
};
template <class StubType, class RequestType>
@@ -323,9 +306,9 @@ class ClientImpl : public Client {
ClientImpl(const ClientConfig& config,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
create_stub)
- : channels_(config.client_channels()), create_stub_(create_stub) {
- cores_ = LimitCores(config.core_list().data(), config.core_list_size());
-
+ : cores_(LimitCores(config.core_list().data(), config.core_list_size())),
+ channels_(config.client_channels()),
+ create_stub_(create_stub) {
for (int i = 0; i < config.client_channels(); i++) {
channels_[i].init(config.server_targets(i % config.server_targets_size()),
config, create_stub_);
@@ -337,7 +320,7 @@ class ClientImpl : public Client {
virtual ~ClientImpl() {}
protected:
- int cores_;
+ const int cores_;
RequestType request_;
class ClientChannelInfo {
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index f3f8f37051..9e8767d103 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -43,9 +43,9 @@
#include <vector>
#include <gflags/gflags.h>
+#include <grpc++/alarm.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
-#include <grpc++/client_context.h>
#include <grpc++/generic/generic_stub.h>
#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
@@ -60,11 +60,9 @@
namespace grpc {
namespace testing {
-typedef std::list<grpc_time> deadline_list;
-
class ClientRpcContext {
public:
- explicit ClientRpcContext(int ch) : channel_id_(ch) {}
+ ClientRpcContext() {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
virtual bool RunNextState(bool, Histogram* hist) = 0;
@@ -74,72 +72,73 @@ class ClientRpcContext {
return reinterpret_cast<ClientRpcContext*>(t);
}
- deadline_list::iterator deadline_posn() const { return deadline_posn_; }
- void set_deadline_posn(const deadline_list::iterator& it) {
- deadline_posn_ = it;
- }
virtual void Start(CompletionQueue* cq) = 0;
- int channel_id() const { return channel_id_; }
-
- protected:
- int channel_id_;
-
- private:
- deadline_list::iterator deadline_posn_;
};
template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
public:
ClientRpcContextUnaryImpl(
- int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
+ BenchmarkService::Stub* stub, const RequestType& req,
+ std::function<gpr_timespec()> next_issue,
std::function<
std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
- : ClientRpcContext(channel_id),
- context_(),
+ : context_(),
stub_(stub),
+ cq_(nullptr),
req_(req),
response_(),
- next_state_(&ClientRpcContextUnaryImpl::RespDone),
+ next_state_(State::READY),
callback_(on_done),
+ next_issue_(next_issue),
start_req_(start_req) {}
+ ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
void Start(CompletionQueue* cq) GRPC_OVERRIDE {
- start_ = Timer::Now();
- response_reader_ = start_req_(stub_, &context_, req_, cq);
- response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
+ cq_ = cq;
+ if (!next_issue_) { // ready to issue
+ RunNextState(true, nullptr);
+ } else { // wait for the issue time
+ alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ }
}
- ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
- bool ret = (this->*next_state_)(ok);
- if (!ret) {
- hist->Add((Timer::Now() - start_) * 1e9);
+ switch (next_state_) {
+ case State::READY:
+ start_ = Timer::Now();
+ response_reader_ = start_req_(stub_, &context_, req_, cq_);
+ response_reader_->Finish(&response_, &status_,
+ ClientRpcContext::tag(this));
+ next_state_ = State::RESP_DONE;
+ return true;
+ case State::RESP_DONE:
+ hist->Add((Timer::Now() - start_) * 1e9);
+ callback_(status_, &response_);
+ next_state_ = State::INVALID;
+ return false;
+ default:
+ GPR_ASSERT(false);
+ return false;
}
- return ret;
}
-
ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
- return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_,
+ return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_,
callback_);
}
private:
- bool RespDone(bool) {
- next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
- return false;
- }
- bool DoCallBack(bool) {
- callback_(status_, &response_);
- return true; // we're done, this'll be ignored
- }
grpc::ClientContext context_;
BenchmarkService::Stub* stub_;
+ CompletionQueue* cq_;
+ std::unique_ptr<Alarm> alarm_;
RequestType req_;
ResponseType response_;
- bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
+ enum State { INVALID, READY, RESP_DONE };
+ State next_state_;
std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
CompletionQueue*)> start_req_;
@@ -157,49 +156,35 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
// member name resolution until the template types are fully resolved
public:
using Client::SetupLoadTest;
- using Client::NextIssueTime;
using Client::closed_loop_;
+ using Client::NextIssuer;
using ClientImpl<StubType, RequestType>::cores_;
using ClientImpl<StubType, RequestType>::channels_;
using ClientImpl<StubType, RequestType>::request_;
AsyncClient(const ClientConfig& config,
- std::function<ClientRpcContext*(int, StubType*,
- const RequestType&)> setup_ctx,
+ std::function<ClientRpcContext*(
+ StubType*, std::function<gpr_timespec()> next_issue,
+ const RequestType&)> setup_ctx,
std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
create_stub)
: ClientImpl<StubType, RequestType>(config, create_stub),
- num_async_threads_(NumThreads(config)),
- channel_lock_(new std::mutex[config.client_channels()]),
- contexts_(config.client_channels()),
- max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
- channel_count_(config.client_channels()),
- pref_channel_inc_(num_async_threads_) {
+ num_async_threads_(NumThreads(config)) {
SetupLoadTest(config, num_async_threads_);
for (int i = 0; i < num_async_threads_; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
- if (!closed_loop_) {
- rpc_deadlines_.emplace_back();
- next_channel_.push_back(i % channel_count_);
- issue_allowed_.emplace_back(true);
-
- grpc_time next_issue;
- NextIssueTime(i, &next_issue);
- next_issue_.push_back(next_issue);
- }
+ next_issuers_.emplace_back(NextIssuer(i));
}
+ using namespace std::placeholders;
int t = 0;
for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
- for (int ch = 0; ch < channel_count_; ch++) {
+ for (int ch = 0; ch < config.client_channels(); ch++) {
auto* cq = cli_cqs_[t].get();
+ auto ctx =
+ setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
+ ctx->Start(cq);
t = (t + 1) % cli_cqs_.size();
- auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_);
- if (closed_loop_) {
- ctx->Start(cq);
- } else {
- contexts_[ch].push_front(ctx);
- }
}
}
}
@@ -212,140 +197,34 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
delete ClientRpcContext::detag(got_tag);
}
}
- // Now clear out all the pre-allocated idle contexts
- for (int ch = 0; ch < channel_count_; ch++) {
- while (!contexts_[ch].empty()) {
- // Get an idle context from the front of the list
- auto* ctx = *(contexts_[ch].begin());
- contexts_[ch].pop_front();
- delete ctx;
- }
- }
- delete[] channel_lock_;
}
bool ThreadFunc(Histogram* histogram,
size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
void* got_tag;
bool ok;
- grpc_time deadline, short_deadline;
- if (closed_loop_) {
- deadline = grpc_time_source::now() + std::chrono::seconds(1);
- short_deadline = deadline;
- } else {
- if (rpc_deadlines_[thread_idx].empty()) {
- deadline = grpc_time_source::now() + std::chrono::seconds(1);
- } else {
- deadline = *(rpc_deadlines_[thread_idx].begin());
- }
- short_deadline =
- issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline;
- }
-
- bool got_event;
- switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
- case CompletionQueue::SHUTDOWN:
- return false;
- case CompletionQueue::TIMEOUT:
- got_event = false;
- break;
- case CompletionQueue::GOT_EVENT:
- got_event = true;
- break;
- default:
- GPR_ASSERT(false);
- break;
- }
- if (got_event) {
+ if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
+ // Got a regular event, so process it
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- if (ctx->RunNextState(ok, histogram) == false) {
- // call the callback and then clone the ctx
- ctx->RunNextState(ok, histogram);
- ClientRpcContext* clone_ctx = ctx->StartNewClone();
- if (closed_loop_) {
- clone_ctx->Start(cli_cqs_[thread_idx].get());
- } else {
- // Remove the entry from the rpc deadlines list
- rpc_deadlines_[thread_idx].erase(ctx->deadline_posn());
- // Put the clone_ctx in the list of idle contexts for this channel
- // Under lock
- int ch = clone_ctx->channel_id();
- std::lock_guard<std::mutex> g(channel_lock_[ch]);
- contexts_[ch].push_front(clone_ctx);
- }
+ if (!ctx->RunNextState(ok, histogram)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ auto clone = ctx->StartNewClone();
+ clone->Start(cli_cqs_[thread_idx].get());
// delete the old version
delete ctx;
}
- if (!closed_loop_)
- issue_allowed_[thread_idx] =
- true; // may be ok now even if it hadn't been
+ return true;
+ } else { // queue is shutting down
+ return false;
}
- if (!closed_loop_ && issue_allowed_[thread_idx] &&
- grpc_time_source::now() >= next_issue_[thread_idx]) {
- // Attempt to issue
- bool issued = false;
- for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx];
- num_attempts < channel_count_ && !issued; num_attempts++) {
- bool can_issue = false;
- ClientRpcContext* ctx = nullptr;
- {
- std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]);
- if (!contexts_[channel_attempt].empty()) {
- // Get an idle context from the front of the list
- ctx = *(contexts_[channel_attempt].begin());
- contexts_[channel_attempt].pop_front();
- can_issue = true;
- }
- }
- if (can_issue) {
- // do the work to issue
- rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() +
- std::chrono::seconds(1));
- auto it = rpc_deadlines_[thread_idx].end();
- --it;
- ctx->set_deadline_posn(it);
- ctx->Start(cli_cqs_[thread_idx].get());
- issued = true;
- // If we did issue, then next time, try our thread's next
- // preferred channel
- next_channel_[thread_idx] += pref_channel_inc_;
- if (next_channel_[thread_idx] >= channel_count_)
- next_channel_[thread_idx] = (thread_idx % channel_count_);
- } else {
- // Do a modular increment of channel attempt if we couldn't issue
- channel_attempt = (channel_attempt + 1) % channel_count_;
- }
- }
- if (issued) {
- // We issued one; see when we can issue the next
- grpc_time next_issue;
- NextIssueTime(thread_idx, &next_issue);
- next_issue_[thread_idx] = next_issue;
- } else {
- issue_allowed_[thread_idx] = false;
- }
- }
- return true;
}
protected:
- int num_async_threads_;
+ const int num_async_threads_;
private:
- class boolean { // exists only to avoid data-race on vector<bool>
- public:
- boolean() : val_(false) {}
- boolean(bool b) : val_(b) {}
- operator bool() const { return val_; }
- boolean& operator=(bool b) {
- val_ = b;
- return *this;
- }
-
- private:
- bool val_;
- };
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
@@ -356,18 +235,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
-
- std::vector<deadline_list> rpc_deadlines_; // per thread deadlines
- std::vector<int> next_channel_; // per thread round-robin channel ctr
- std::vector<boolean> issue_allowed_; // may this thread attempt to issue
- std::vector<grpc_time> next_issue_; // when should it issue?
-
- std::mutex*
- channel_lock_; // a vector, but avoid std::vector for old compilers
- std::vector<context_list> contexts_; // per-channel list of idle contexts
- int max_outstanding_per_channel_;
- int channel_count_;
- int pref_channel_inc_;
+ std::vector<std::function<gpr_timespec()>> next_issuers_;
};
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@@ -391,11 +259,11 @@ class AsyncUnaryClient GRPC_FINAL
const SimpleRequest& request, CompletionQueue* cq) {
return stub->AsyncUnaryCall(ctx, request, cq);
};
- static ClientRpcContext* SetupCtx(int channel_id,
- BenchmarkService::Stub* stub,
+ static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+ std::function<gpr_timespec()> next_issue,
const SimpleRequest& req) {
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
- channel_id, stub, req, AsyncUnaryClient::StartReq,
+ stub, req, next_issue, AsyncUnaryClient::StartReq,
AsyncUnaryClient::CheckDone);
}
};
@@ -404,62 +272,94 @@ template <class RequestType, class ResponseType>
class ClientRpcContextStreamingImpl : public ClientRpcContext {
public:
ClientRpcContextStreamingImpl(
- int channel_id, BenchmarkService::Stub* stub, const RequestType& req,
+ BenchmarkService::Stub* stub, const RequestType& req,
+ std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<
grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
void*)> start_req,
std::function<void(grpc::Status, ResponseType*)> on_done)
- : ClientRpcContext(channel_id),
- context_(),
+ : context_(),
stub_(stub),
+ cq_(nullptr),
req_(req),
response_(),
- next_state_(&ClientRpcContextStreamingImpl::ReqSent),
+ next_state_(State::INVALID),
callback_(on_done),
+ next_issue_(next_issue),
start_req_(start_req),
start_(Timer::Now()) {}
~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
+ void Start(CompletionQueue* cq) GRPC_OVERRIDE {
+ cq_ = cq;
+ stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
+ next_state_ = State::STREAM_IDLE;
+ }
bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
- return (this->*next_state_)(ok, hist);
+ while (true) {
+ switch (next_state_) {
+ case State::STREAM_IDLE:
+ if (!next_issue_) { // ready to issue
+ next_state_ = State::READY_TO_WRITE;
+ } else {
+ next_state_ = State::WAIT;
+ }
+ break; // loop around, don't return
+ case State::WAIT:
+ alarm_.reset(
+ new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ next_state_ = State::READY_TO_WRITE;
+ return true;
+ case State::READY_TO_WRITE:
+ if (!ok) {
+ return false;
+ }
+ start_ = Timer::Now();
+ next_state_ = State::WRITE_DONE;
+ stream_->Write(req_, ClientRpcContext::tag(this));
+ return true;
+ case State::WRITE_DONE:
+ if (!ok) {
+ return false;
+ }
+ next_state_ = State::READ_DONE;
+ stream_->Read(&response_, ClientRpcContext::tag(this));
+ return true;
+ break;
+ case State::READ_DONE:
+ hist->Add((Timer::Now() - start_) * 1e9);
+ callback_(status_, &response_);
+ next_state_ = State::STREAM_IDLE;
+ break; // loop around
+ default:
+ GPR_ASSERT(false);
+ return false;
+ }
+ }
}
ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
- return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_,
+ return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_,
start_req_, callback_);
}
- void Start(CompletionQueue* cq) GRPC_OVERRIDE {
- stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
- }
private:
- bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
- bool StartWrite(bool ok) {
- if (!ok) {
- return (false);
- }
- start_ = Timer::Now();
- next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
- stream_->Write(req_, ClientRpcContext::tag(this));
- return true;
- }
- bool WriteDone(bool ok, Histogram*) {
- if (!ok) {
- return (false);
- }
- next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
- stream_->Read(&response_, ClientRpcContext::tag(this));
- return true;
- }
- bool ReadDone(bool ok, Histogram* hist) {
- hist->Add((Timer::Now() - start_) * 1e9);
- return StartWrite(ok);
- }
grpc::ClientContext context_;
BenchmarkService::Stub* stub_;
+ CompletionQueue* cq_;
+ std::unique_ptr<Alarm> alarm_;
RequestType req_;
ResponseType response_;
- bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
+ enum State {
+ INVALID,
+ STREAM_IDLE,
+ WAIT,
+ READY_TO_WRITE,
+ WRITE_DONE,
+ READ_DONE
+ };
+ State next_state_;
std::function<void(grpc::Status, ResponseType*)> callback_;
+ std::function<gpr_timespec()> next_issue_;
std::function<
std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*,
@@ -475,9 +375,6 @@ class AsyncStreamingClient GRPC_FINAL
public:
explicit AsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx, BenchmarkStubCreator) {
- // async streaming currently only supports closed loop
- GPR_ASSERT(closed_loop_);
-
StartThreads(num_async_threads_);
}
@@ -492,11 +389,11 @@ class AsyncStreamingClient GRPC_FINAL
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
- static ClientRpcContext* SetupCtx(int channel_id,
- BenchmarkService::Stub* stub,
+ static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
+ std::function<gpr_timespec()> next_issue,
const SimpleRequest& req) {
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
- channel_id, stub, req, AsyncStreamingClient::StartReq,
+ stub, req, next_issue, AsyncStreamingClient::StartReq,
AsyncStreamingClient::CheckDone);
}
};
@@ -504,64 +401,96 @@ class AsyncStreamingClient GRPC_FINAL
class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
public:
ClientRpcContextGenericStreamingImpl(
- int channel_id, grpc::GenericStub* stub, const ByteBuffer& req,
+ grpc::GenericStub* stub, const ByteBuffer& req,
+ std::function<gpr_timespec()> next_issue,
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*,
const grpc::string& method_name, CompletionQueue*, void*)> start_req,
std::function<void(grpc::Status, ByteBuffer*)> on_done)
- : ClientRpcContext(channel_id),
- context_(),
+ : context_(),
stub_(stub),
+ cq_(nullptr),
req_(req),
response_(),
- next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent),
+ next_state_(State::INVALID),
callback_(on_done),
+ next_issue_(next_issue),
start_req_(start_req),
start_(Timer::Now()) {}
~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
- return (this->*next_state_)(ok, hist);
- }
- ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
- return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_,
- start_req_, callback_);
- }
void Start(CompletionQueue* cq) GRPC_OVERRIDE {
+ cq_ = cq;
const grpc::string kMethodName(
"/grpc.testing.BenchmarkService/StreamingCall");
stream_ = start_req_(stub_, &context_, kMethodName, cq,
ClientRpcContext::tag(this));
+ next_state_ = State::STREAM_IDLE;
}
-
- private:
- bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
- bool StartWrite(bool ok) {
- if (!ok) {
- return (false);
- }
- start_ = Timer::Now();
- next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone;
- stream_->Write(req_, ClientRpcContext::tag(this));
- return true;
- }
- bool WriteDone(bool ok, Histogram*) {
- if (!ok) {
- return (false);
+ bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ while (true) {
+ switch (next_state_) {
+ case State::STREAM_IDLE:
+ if (!next_issue_) { // ready to issue
+ next_state_ = State::READY_TO_WRITE;
+ } else {
+ next_state_ = State::WAIT;
+ }
+ break; // loop around, don't return
+ case State::WAIT:
+ alarm_.reset(
+ new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
+ next_state_ = State::READY_TO_WRITE;
+ return true;
+ case State::READY_TO_WRITE:
+ if (!ok) {
+ return false;
+ }
+ start_ = Timer::Now();
+ next_state_ = State::WRITE_DONE;
+ stream_->Write(req_, ClientRpcContext::tag(this));
+ return true;
+ case State::WRITE_DONE:
+ if (!ok) {
+ return false;
+ }
+ next_state_ = State::READ_DONE;
+ stream_->Read(&response_, ClientRpcContext::tag(this));
+ return true;
+ break;
+ case State::READ_DONE:
+ hist->Add((Timer::Now() - start_) * 1e9);
+ callback_(status_, &response_);
+ next_state_ = State::STREAM_IDLE;
+ break; // loop around
+ default:
+ GPR_ASSERT(false);
+ return false;
+ }
}
- next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone;
- stream_->Read(&response_, ClientRpcContext::tag(this));
- return true;
}
- bool ReadDone(bool ok, Histogram* hist) {
- hist->Add((Timer::Now() - start_) * 1e9);
- return StartWrite(ok);
+ ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
+ return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_,
+ start_req_, callback_);
}
+
+ private:
grpc::ClientContext context_;
grpc::GenericStub* stub_;
+ CompletionQueue* cq_;
+ std::unique_ptr<Alarm> alarm_;
ByteBuffer req_;
ByteBuffer response_;
- bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*);
+ enum State {
+ INVALID,
+ STREAM_IDLE,
+ WAIT,
+ READY_TO_WRITE,
+ WRITE_DONE,
+ READ_DONE
+ };
+ State next_state_;
std::function<void(grpc::Status, ByteBuffer*)> callback_;
+ std::function<gpr_timespec()> next_issue_;
std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
CompletionQueue*, void*)> start_req_;
@@ -580,9 +509,6 @@ class GenericAsyncStreamingClient GRPC_FINAL
public:
explicit GenericAsyncStreamingClient(const ClientConfig& config)
: AsyncClient(config, SetupCtx, GenericStubCreator) {
- // async streaming currently only supports closed loop
- GPR_ASSERT(closed_loop_);
-
StartThreads(num_async_threads_);
}
@@ -596,10 +522,11 @@ class GenericAsyncStreamingClient GRPC_FINAL
auto stream = stub->Call(ctx, method_name, cq, tag);
return stream;
};
- static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub,
+ static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
+ std::function<gpr_timespec()> next_issue,
const ByteBuffer& req) {
return new ClientRpcContextGenericStreamingImpl(
- channel_id, stub, req, GenericAsyncStreamingClient::StartReq,
+ stub, req, next_issue, GenericAsyncStreamingClient::StartReq,
GenericAsyncStreamingClient::CheckDone);
}
};
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index d93537b279..edfc246a25 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -84,11 +84,8 @@ class SynchronousClient
protected:
void WaitToIssue(int thread_idx) {
- grpc_time next_time;
- if (NextIssueTime(thread_idx, &next_time)) {
- gpr_timespec next_timespec;
- TimepointHR2Timespec(next_time, &next_timespec);
- gpr_sleep_until(next_timespec);
+ if (!closed_loop_) {
+ gpr_sleep_until(NextIssueTime(thread_idx));
}
}
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 80f6ada409..1c7fdf8796 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -197,9 +197,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
workers.resize(num_clients + num_servers);
gpr_timespec deadline =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(
- warmup_seconds + benchmark_seconds + 20, GPR_TIMESPAN));
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(warmup_seconds + benchmark_seconds + 20);
// Start servers
using runsc::ServerData;
diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h
index 841619e3ff..b6fd67b77c 100644
--- a/test/cpp/qps/interarrival.h
+++ b/test/cpp/qps/interarrival.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -51,15 +51,15 @@ namespace testing {
// stacks. Thus, this code only uses a uniform distribution of doubles [0,1)
// and then provides the distribution functions itself.
-class RandomDist {
+class RandomDistInterface {
public:
- RandomDist() {}
- virtual ~RandomDist() = 0;
- // Argument to operator() is a uniform double in the range [0,1)
- virtual double operator()(double uni) const = 0;
+ RandomDistInterface() {}
+ virtual ~RandomDistInterface() = 0;
+ // Argument to transform is a uniform double in the range [0,1)
+ virtual double transform(double uni) const = 0;
};
-inline RandomDist::~RandomDist() {}
+inline RandomDistInterface::~RandomDistInterface() {}
// ExpDist implements an exponential distribution, which is the
// interarrival distribution for a Poisson process. The parameter
@@ -69,11 +69,11 @@ inline RandomDist::~RandomDist() {}
// independent identical stationary sources. For more information,
// see http://en.wikipedia.org/wiki/Exponential_distribution
-class ExpDist GRPC_FINAL : public RandomDist {
+class ExpDist GRPC_FINAL : public RandomDistInterface {
public:
explicit ExpDist(double lambda) : lambda_recip_(1.0 / lambda) {}
~ExpDist() GRPC_OVERRIDE {}
- double operator()(double uni) const GRPC_OVERRIDE {
+ double transform(double uni) const GRPC_OVERRIDE {
// Note: Use 1.0-uni above to avoid NaN if uni is 0
return lambda_recip_ * (-log(1.0 - uni));
}
@@ -87,11 +87,11 @@ class ExpDist GRPC_FINAL : public RandomDist {
// mean interarrival time is (lo+hi)/2. For more information,
// see http://en.wikipedia.org/wiki/Uniform_distribution_%28continuous%29
-class UniformDist GRPC_FINAL : public RandomDist {
+class UniformDist GRPC_FINAL : public RandomDistInterface {
public:
UniformDist(double lo, double hi) : lo_(lo), range_(hi - lo) {}
~UniformDist() GRPC_OVERRIDE {}
- double operator()(double uni) const GRPC_OVERRIDE {
+ double transform(double uni) const GRPC_OVERRIDE {
return uni * range_ + lo_;
}
@@ -106,11 +106,11 @@ class UniformDist GRPC_FINAL : public RandomDist {
// clients) will not preserve any deterministic interarrival gap across
// requests.
-class DetDist GRPC_FINAL : public RandomDist {
+class DetDist GRPC_FINAL : public RandomDistInterface {
public:
explicit DetDist(double val) : val_(val) {}
~DetDist() GRPC_OVERRIDE {}
- double operator()(double uni) const GRPC_OVERRIDE { return val_; }
+ double transform(double uni) const GRPC_OVERRIDE { return val_; }
private:
double val_;
@@ -123,12 +123,12 @@ class DetDist GRPC_FINAL : public RandomDist {
// good representation of the response times of data center jobs. See
// http://en.wikipedia.org/wiki/Pareto_distribution
-class ParetoDist GRPC_FINAL : public RandomDist {
+class ParetoDist GRPC_FINAL : public RandomDistInterface {
public:
ParetoDist(double base, double alpha)
: base_(base), alpha_recip_(1.0 / alpha) {}
~ParetoDist() GRPC_OVERRIDE {}
- double operator()(double uni) const GRPC_OVERRIDE {
+ double transform(double uni) const GRPC_OVERRIDE {
// Note: Use 1.0-uni above to avoid div by zero if uni is 0
return base_ / pow(1.0 - uni, alpha_recip_);
}
@@ -145,13 +145,14 @@ class ParetoDist GRPC_FINAL : public RandomDist {
class InterarrivalTimer {
public:
InterarrivalTimer() {}
- void init(const RandomDist& r, int threads, int entries = 1000000) {
+ void init(const RandomDistInterface& r, int threads, int entries = 1000000) {
for (int i = 0; i < entries; i++) {
// rand is the only choice that is portable across POSIX and Windows
// and that supports new and old compilers
- const double uniform_0_1 = rand() / RAND_MAX;
+ const double uniform_0_1 =
+ static_cast<double>(rand()) / static_cast<double>(RAND_MAX);
random_table_.push_back(
- std::chrono::nanoseconds(static_cast<int64_t>(1e9 * r(uniform_0_1))));
+ static_cast<int64_t>(1e9 * r.transform(uniform_0_1)));
}
// Now set up the thread positions
for (int i = 0; i < threads; i++) {
@@ -160,7 +161,7 @@ class InterarrivalTimer {
}
virtual ~InterarrivalTimer(){};
- std::chrono::nanoseconds operator()(int thread_num) {
+ int64_t next(int thread_num) {
auto ret = *(thread_posns_[thread_num]++);
if (thread_posns_[thread_num] == random_table_.end())
thread_posns_[thread_num] = random_table_.begin();
@@ -168,7 +169,7 @@ class InterarrivalTimer {
}
private:
- typedef std::vector<std::chrono::nanoseconds> time_table;
+ typedef std::vector<int64_t> time_table;
std::vector<time_table::const_iterator> thread_posns_;
time_table random_table_;
};
diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc
index ccda28f09a..77e81fb84b 100644
--- a/test/cpp/qps/qps_interarrival_test.cc
+++ b/test/cpp/qps/qps_interarrival_test.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -39,17 +39,17 @@
#include "test/cpp/qps/interarrival.h"
-using grpc::testing::RandomDist;
+using grpc::testing::RandomDistInterface;
using grpc::testing::InterarrivalTimer;
-static void RunTest(RandomDist &&r, int threads, std::string title) {
+static void RunTest(RandomDistInterface &&r, int threads, std::string title) {
InterarrivalTimer timer;
timer.init(r, threads);
gpr_histogram *h(gpr_histogram_create(0.01, 60e9));
for (int i = 0; i < 10000000; i++) {
for (int j = 0; j < threads; j++) {
- gpr_histogram_add(h, timer(j).count());
+ gpr_histogram_add(h, timer.next(j));
}
}
@@ -70,7 +70,7 @@ using grpc::testing::ParetoDist;
int main(int argc, char **argv) {
RunTest(ExpDist(10.0), 5, std::string("Exponential(10)"));
RunTest(DetDist(5.0), 5, std::string("Det(5)"));
- RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(1,10)"));
+ RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(0,10)"));
RunTest(ParetoDist(1.0, 1.0), 5, std::string("Pareto(1,1)"));
return 0;
}
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index fe5f685b6e..0ac41d9f96 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -53,7 +53,7 @@ static void RunQPS() {
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
client_config.set_async_client_threads(8);
- client_config.set_rpc_type(UNARY);
+ client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_poisson()->set_offered_load(
1000.0);
diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc
index 15054db892..27aaf137f6 100644
--- a/test/cpp/qps/qps_test.cc
+++ b/test/cpp/qps/qps_test.cc
@@ -53,7 +53,7 @@ static void RunQPS() {
client_config.set_outstanding_rpcs_per_channel(1000);
client_config.set_client_channels(8);
client_config.set_async_client_threads(8);
- client_config.set_rpc_type(UNARY);
+ client_config.set_rpc_type(STREAMING);
client_config.mutable_load_params()->mutable_closed_loop();
ServerConfig server_config;
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 1302d718f0..2024e0bfef 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -51,6 +51,7 @@
#include <gtest/gtest.h>
#include "src/proto/grpc/testing/services.grpc.pb.h"
+#include "test/core/util/test_config.h"
#include "test/cpp/qps/server.h"
namespace grpc {
@@ -129,7 +130,7 @@ class AsyncQpsServerTest : public Server {
}
}
~AsyncQpsServerTest() {
- auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(10);
+ auto deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
server_->Shutdown(deadline);
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
(*ss)->set_shutdown();
diff --git a/test/distrib/csharp/run_distrib_test.sh b/test/distrib/csharp/run_distrib_test.sh
index 8ddb4c4bff..1de62041b3 100755
--- a/test/distrib/csharp/run_distrib_test.sh
+++ b/test/distrib/csharp/run_distrib_test.sh
@@ -34,8 +34,9 @@ cd $(dirname $0)
unzip -o "$EXTERNAL_GIT_ROOT/input_artifacts/csharp_nugets.zip" -d TestNugetFeed
-# TODO(jtattermusch): replace the version number
-./update_version.sh 0.13.0
+# Extract the version number from Grpc nuget package name.
+CSHARP_VERSION=$(ls TestNugetFeed | grep '^Grpc\.[0-9].*\.nupkg$' | sed s/^Grpc\.// | sed s/\.nupkg$//)
+./update_version.sh $CSHARP_VERSION
nuget restore
diff --git a/test/distrib/node/run_distrib_test.sh b/test/distrib/node/run_distrib_test.sh
index ac9e7d7090..9b8f15771b 100755
--- a/test/distrib/node/run_distrib_test.sh
+++ b/test/distrib/node/run_distrib_test.sh
@@ -28,19 +28,19 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+NODE_VERSION=$1
+source ~/.nvm/nvm.sh
set -ex
cd $(dirname $0)
-NODE_VERSION="$1"
-
-# make sure nvm is available
-source ~/.nvm/nvm.sh || true
-
nvm install $NODE_VERSION
npm install -g node-static
+# Kill off existing static servers
+kill -9 $(ps aux | grep '[n]ode .*static' | awk '{print $2}') || true
+
STATIC_SERVER=127.0.0.1
STATIC_PORT=8080
diff --git a/test/distrib/python/distribtest.py b/test/distrib/python/distribtest.py
index 66c1f88796..dc20688140 100644
--- a/test/distrib/python/distribtest.py
+++ b/test/distrib/python/distribtest.py
@@ -1,4 +1,4 @@
-# Copyright 2015-2016, Google Inc.
+# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
diff --git a/test/distrib/python/run_distrib_test.sh b/test/distrib/python/run_distrib_test.sh
index 08856ad388..8fd7ffb54c 100755
--- a/test/distrib/python/run_distrib_test.sh
+++ b/test/distrib/python/run_distrib_test.sh
@@ -32,11 +32,11 @@ set -ex
cd $(dirname $0)
-# TODO(jtattermusch): replace the version number
-SDIST_ARCHIVE="$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-0.12.0b8.tar.gz"
+# Pick up the source dist archive whatever its version is
+SDIST_ARCHIVE=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-*.tar.gz
BDIST_DIR="file://$EXTERNAL_GIT_ROOT/input_artifacts"
-if [ ! -f "${SDIST_ARCHIVE}" ]
+if [ ! -f ${SDIST_ARCHIVE} ]
then
echo "Archive ${SDIST_ARCHIVE} does not exist."
exit 1
@@ -52,7 +52,7 @@ $PIP install --upgrade six
GRPC_PYTHON_BINARIES_REPOSITORY="${BDIST_DIR}" \
$PIP install \
- "${SDIST_ARCHIVE}"
+ ${SDIST_ARCHIVE}
$PYTHON distribtest.py
diff --git a/test/distrib/ruby/Gemfile b/test/distrib/ruby/Gemfile
new file mode 100644
index 0000000000..96e68e9c34
--- /dev/null
+++ b/test/distrib/ruby/Gemfile
@@ -0,0 +1,11 @@
+# -*- ruby -*-
+# encoding: utf-8
+
+source 'https://rubygems.org/'
+
+# TODO(jtattermusch): don't hardcode the absolute path the local gem source
+source "file:///var/local/git/grpc/gem_source" do
+ gem 'grpc'
+end
+
+gemspec
diff --git a/test/distrib/ruby/distribtest.gemspec b/test/distrib/ruby/distribtest.gemspec
new file mode 100644
index 0000000000..d72892f46c
--- /dev/null
+++ b/test/distrib/ruby/distribtest.gemspec
@@ -0,0 +1,19 @@
+# -*- ruby -*-
+# encoding: utf-8
+
+Gem::Specification.new do |s|
+ s.name = 'distribtest'
+ s.version = '0.0.1'
+ s.authors = ['gRPC Authors']
+ s.email = 'jtattermusch@google.com'
+ s.homepage = 'https://github.com/grpc/grpc'
+ s.summary = 'gRPC Distribution test'
+
+ s.files = ['distribtest.rb']
+ s.executables = ['distribtest.rb']
+ s.platform = Gem::Platform::RUBY
+
+ s.add_dependency 'grpc', '>=0'
+
+ s.add_development_dependency 'bundler', '~> 1.7'
+end
diff --git a/test/distrib/ruby/distribtest.rb b/test/distrib/ruby/distribtest.rb
new file mode 100755
index 0000000000..3f656a89f3
--- /dev/null
+++ b/test/distrib/ruby/distribtest.rb
@@ -0,0 +1,39 @@
+#!/usr/bin/env ruby
+
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require 'grpc'
+
+# This code doesn't do much but makes sure the native extension is loaded
+# which is what we are testing here.
+ch = GRPC::Core::Channel.new('localhost:1000', nil, :this_channel_is_insecure)
+ch.destroy
+
+puts "Success!"
diff --git a/test/distrib/ruby/run_distrib_test.sh b/test/distrib/ruby/run_distrib_test.sh
index ae409f8dc5..be60e44b63 100755
--- a/test/distrib/ruby/run_distrib_test.sh
+++ b/test/distrib/ruby/run_distrib_test.sh
@@ -32,5 +32,13 @@ set -ex
cd $(dirname $0)
-# do something ruby-ish
+# Create an indexed local gem source with gRPC gems to test
+GEM_SOURCE=../../../gem_source
+mkdir -p ${GEM_SOURCE}/gems
+cp -r $EXTERNAL_GIT_ROOT/input_artifacts/*.gem ${GEM_SOURCE}/gems
+gem install builder
+gem generate_index --directory ${GEM_SOURCE}
+bundle install
+
+bundle exec ./distribtest.rb