diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/core/census/log_test.c | 589 | ||||
-rw-r--r-- | test/core/end2end/fixtures/h2_uchannel.c | 2 | ||||
-rw-r--r-- | test/core/iomgr/tcp_client_posix_test.c | 26 | ||||
-rw-r--r-- | test/core/iomgr/udp_server_test.c | 4 | ||||
-rw-r--r-- | test/cpp/common/channel_arguments_test.cc | 191 | ||||
-rw-r--r-- | test/cpp/end2end/end2end_test.cc | 23 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 63 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 469 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 7 | ||||
-rw-r--r-- | test/cpp/qps/driver.cc | 4 | ||||
-rw-r--r-- | test/cpp/qps/interarrival.h | 41 | ||||
-rwxr-xr-x | test/cpp/qps/qps-sweep.sh | 59 | ||||
-rw-r--r-- | test/cpp/qps/qps_interarrival_test.cc | 10 | ||||
-rw-r--r-- | test/cpp/qps/qps_openloop_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/qps_test.cc | 2 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 3 |
16 files changed, 1049 insertions, 446 deletions
diff --git a/test/core/census/log_test.c b/test/core/census/log_test.c new file mode 100644 index 0000000000..b68ca11504 --- /dev/null +++ b/test/core/census/log_test.c @@ -0,0 +1,589 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/census/log.h" +#include <grpc/support/cpu.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include "test/core/util/test_config.h" + +// Change this to non-zero if you want more output. +#define VERBOSE 0 + +// Log size to use for all tests. +#define LOG_SIZE_IN_MB 1 +#define LOG_SIZE_IN_BYTES (LOG_SIZE_IN_MB << 20) + +// Fills in 'record' of size 'size'. Each byte in record is filled in with the +// same value. The value is extracted from 'record' pointer. +static void write_record(char* record, size_t size) { + char data = (char)((uintptr_t)record % 255); + memset(record, data, size); +} + +// Reads fixed size records. Returns the number of records read in +// 'num_records'. +static void read_records(size_t record_size, const char* buffer, + size_t buffer_size, int* num_records) { + GPR_ASSERT(buffer_size >= record_size); + GPR_ASSERT(buffer_size % record_size == 0); + *num_records = (int)(buffer_size / record_size); + for (int i = 0; i < *num_records; ++i) { + const char* record = buffer + (record_size * (size_t)i); + char data = (char)((uintptr_t)record % 255); + for (size_t j = 0; j < record_size; ++j) { + GPR_ASSERT(data == record[j]); + } + } +} + +// Tries to write the specified number of records. Stops when the log gets +// full. Returns the number of records written. Spins for random +// number of times, up to 'max_spin_count', between writes. +static int write_records_to_log(int writer_id, size_t record_size, + int num_records, int max_spin_count) { + int counter = 0; + for (int i = 0; i < num_records; ++i) { + int spin_count = max_spin_count ? rand() % max_spin_count : 0; + if (VERBOSE && (counter++ == num_records / 10)) { + printf(" Writer %d: %d out of %d written\n", writer_id, i, num_records); + counter = 0; + } + char* record = (char*)(census_log_start_write(record_size)); + if (record == NULL) { + return i; + } + write_record(record, record_size); + census_log_end_write(record, record_size); + for (int j = 0; j < spin_count; ++j) { + GPR_ASSERT(j >= 0); + } + } + return num_records; +} + +// Performs a single read iteration. Returns the number of records read. +static int perform_read_iteration(size_t record_size) { + const void* read_buffer = NULL; + size_t bytes_available; + int records_read = 0; + census_log_init_reader(); + while ((read_buffer = census_log_read_next(&bytes_available))) { + int num_records = 0; + read_records(record_size, (const char*)read_buffer, bytes_available, + &num_records); + records_read += num_records; + } + return records_read; +} + +// Asserts that the log is empty. +static void assert_log_empty(void) { + census_log_init_reader(); + size_t bytes_available; + GPR_ASSERT(census_log_read_next(&bytes_available) == NULL); +} + +// Fills the log and verifies data. If 'no fragmentation' is true, records +// are sized such that CENSUS_LOG_2_MAX_RECORD_SIZE is a multiple of record +// size. If not a circular log, verifies that the number of records written +// match the number of records read. +static void fill_log(size_t log_size, int no_fragmentation, int circular_log) { + size_t size; + if (no_fragmentation) { + int log2size = rand() % (CENSUS_LOG_2_MAX_RECORD_SIZE + 1); + size = ((size_t)1 << log2size); + } else { + while (1) { + size = 1 + ((size_t)rand() % CENSUS_LOG_MAX_RECORD_SIZE); + if (CENSUS_LOG_MAX_RECORD_SIZE % size) { + break; + } + } + } + int records_written = + write_records_to_log(0 /* writer id */, size, + (int)((log_size / size) * 2), 0 /* spin count */); + int records_read = perform_read_iteration(size); + if (!circular_log) { + GPR_ASSERT(records_written == records_read); + } + assert_log_empty(); +} + +// Structure to pass args to writer_thread +typedef struct writer_thread_args { + // Index of this thread in the writers vector. + int index; + // Record size. + size_t record_size; + // Number of records to write. + int num_records; + // Used to signal when writer is complete + gpr_cv* done; + gpr_mu* mu; + int* count; +} writer_thread_args; + +// Writes the given number of records of random size (up to kMaxRecordSize) and +// random data to the specified log. +static void writer_thread(void* arg) { + writer_thread_args* args = (writer_thread_args*)arg; + // Maximum number of times to spin between writes. + static const int MAX_SPIN_COUNT = 50; + int records_written = 0; + if (VERBOSE) { + printf(" Writer %d starting\n", args->index); + } + while (records_written < args->num_records) { + records_written += write_records_to_log(args->index, args->record_size, + args->num_records - records_written, + MAX_SPIN_COUNT); + if (records_written < args->num_records) { + // Ran out of log space. Sleep for a bit and let the reader catch up. + // This should never happen for circular logs. + if (VERBOSE) { + printf( + " Writer %d stalled due to out-of-space: %d out of %d " + "written\n", + args->index, records_written, args->num_records); + } + gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + } + } + // Done. Decrement count and signal. + gpr_mu_lock(args->mu); + (*args->count)--; + gpr_cv_signal(args->done); + if (VERBOSE) { + printf(" Writer %d done\n", args->index); + } + gpr_mu_unlock(args->mu); +} + +// struct to pass args to reader_thread +typedef struct reader_thread_args { + // Record size. + size_t record_size; + // Interval between read iterations. + int read_iteration_interval_in_msec; + // Total number of records. + int total_records; + // Signalled when reader should stop. + gpr_cv stop; + int stop_flag; + // Used to signal when reader has finished + gpr_cv* done; + gpr_mu* mu; + int running; +} reader_thread_args; + +// Reads and verifies the specified number of records. Reader can also be +// stopped via gpr_cv_signal(&args->stop). Sleeps for 'read_interval_in_msec' +// between read iterations. +static void reader_thread(void* arg) { + reader_thread_args* args = (reader_thread_args*)arg; + if (VERBOSE) { + printf(" Reader starting\n"); + } + gpr_timespec interval = gpr_time_from_micros( + args->read_iteration_interval_in_msec * 1000, GPR_TIMESPAN); + gpr_mu_lock(args->mu); + int records_read = 0; + int num_iterations = 0; + int counter = 0; + while (!args->stop_flag && records_read < args->total_records) { + gpr_cv_wait(&args->stop, args->mu, interval); + if (!args->stop_flag) { + records_read += perform_read_iteration(args->record_size); + GPR_ASSERT(records_read <= args->total_records); + if (VERBOSE && (counter++ == 100000)) { + printf(" Reader: %d out of %d read\n", records_read, + args->total_records); + counter = 0; + } + ++num_iterations; + } + } + // Done + args->running = 0; + gpr_cv_signal(args->done); + if (VERBOSE) { + printf(" Reader: records: %d, iterations: %d\n", records_read, + num_iterations); + } + gpr_mu_unlock(args->mu); +} + +// Creates NUM_WRITERS writers where each writer writes NUM_RECORDS_PER_WRITER +// records. Also, starts a reader that iterates over and reads blocks every +// READ_ITERATION_INTERVAL_IN_MSEC. +// Number of writers. +#define NUM_WRITERS 5 +static void multiple_writers_single_reader(int circular_log) { + // Sleep interval between read iterations. + static const int READ_ITERATION_INTERVAL_IN_MSEC = 10; + // Maximum record size. + static const size_t MAX_RECORD_SIZE = 20; + // Number of records written by each writer. This is sized such that we + // will write through the entire log ~10 times. + const int NUM_RECORDS_PER_WRITER = + (int)((10 * census_log_remaining_space()) / (MAX_RECORD_SIZE / 2)) / + NUM_WRITERS; + size_t record_size = ((size_t)rand() % MAX_RECORD_SIZE) + 1; + // Create and start writers. + writer_thread_args writers[NUM_WRITERS]; + int writers_count = NUM_WRITERS; + gpr_cv writers_done; + gpr_mu writers_mu; // protects writers_done and writers_count + gpr_cv_init(&writers_done); + gpr_mu_init(&writers_mu); + gpr_thd_id id; + for (int i = 0; i < NUM_WRITERS; ++i) { + writers[i].index = i; + writers[i].record_size = record_size; + writers[i].num_records = NUM_RECORDS_PER_WRITER; + writers[i].done = &writers_done; + writers[i].count = &writers_count; + writers[i].mu = &writers_mu; + gpr_thd_new(&id, &writer_thread, &writers[i], NULL); + } + // Start reader. + gpr_cv reader_done; + gpr_mu reader_mu; // protects reader_done and reader.running + reader_thread_args reader; + reader.record_size = record_size; + reader.read_iteration_interval_in_msec = READ_ITERATION_INTERVAL_IN_MSEC; + reader.total_records = NUM_WRITERS * NUM_RECORDS_PER_WRITER; + reader.stop_flag = 0; + gpr_cv_init(&reader.stop); + gpr_cv_init(&reader_done); + reader.done = &reader_done; + gpr_mu_init(&reader_mu); + reader.mu = &reader_mu; + reader.running = 1; + gpr_thd_new(&id, &reader_thread, &reader, NULL); + // Wait for writers to finish. + gpr_mu_lock(&writers_mu); + while (writers_count != 0) { + gpr_cv_wait(&writers_done, &writers_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&writers_mu); + gpr_mu_destroy(&writers_mu); + gpr_cv_destroy(&writers_done); + gpr_mu_lock(&reader_mu); + if (circular_log) { + // Stop reader. + reader.stop_flag = 1; + gpr_cv_signal(&reader.stop); + } + // wait for reader to finish + while (reader.running) { + gpr_cv_wait(&reader_done, &reader_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + if (circular_log) { + // Assert that there were no out-of-space errors. + GPR_ASSERT(0 == census_log_out_of_space_count()); + } + gpr_mu_unlock(&reader_mu); + gpr_mu_destroy(&reader_mu); + gpr_cv_destroy(&reader_done); + if (VERBOSE) { + printf(" Reader: finished\n"); + } +} + +static void setup_test(int circular_log) { + census_log_initialize(LOG_SIZE_IN_MB, circular_log); + GPR_ASSERT(census_log_remaining_space() == LOG_SIZE_IN_BYTES); +} + +// Attempts to create a record of invalid size (size > +// CENSUS_LOG_MAX_RECORD_SIZE). +void test_invalid_record_size(void) { + static const size_t INVALID_SIZE = CENSUS_LOG_MAX_RECORD_SIZE + 1; + static const size_t VALID_SIZE = 1; + printf("Starting test: invalid record size\n"); + setup_test(0); + void* record = census_log_start_write(INVALID_SIZE); + GPR_ASSERT(record == NULL); + // Now try writing a valid record. + record = census_log_start_write(VALID_SIZE); + GPR_ASSERT(record != NULL); + census_log_end_write(record, VALID_SIZE); + // Verifies that available space went down by one block. In theory, this + // check can fail if the thread is context switched to a new CPU during the + // start_write execution (multiple blocks get allocated), but this has not + // been observed in practice. + GPR_ASSERT(LOG_SIZE_IN_BYTES - CENSUS_LOG_MAX_RECORD_SIZE == + census_log_remaining_space()); + census_log_shutdown(); +} + +// Tests end_write() with a different size than what was specified in +// start_write(). +void test_end_write_with_different_size(void) { + static const size_t START_WRITE_SIZE = 10; + static const size_t END_WRITE_SIZE = 7; + printf("Starting test: end write with different size\n"); + setup_test(0); + void* record_written = census_log_start_write(START_WRITE_SIZE); + GPR_ASSERT(record_written != NULL); + census_log_end_write(record_written, END_WRITE_SIZE); + census_log_init_reader(); + size_t bytes_available; + const void* record_read = census_log_read_next(&bytes_available); + GPR_ASSERT(record_written == record_read); + GPR_ASSERT(END_WRITE_SIZE == bytes_available); + assert_log_empty(); + census_log_shutdown(); +} + +// Verifies that pending records are not available via read_next(). +void test_read_pending_record(void) { + static const size_t PR_RECORD_SIZE = 1024; + printf("Starting test: read pending record\n"); + setup_test(0); + // Start a write. + void* record_written = census_log_start_write(PR_RECORD_SIZE); + GPR_ASSERT(record_written != NULL); + // As write is pending, read should fail. + census_log_init_reader(); + size_t bytes_available; + const void* record_read = census_log_read_next(&bytes_available); + GPR_ASSERT(record_read == NULL); + // A read followed by end_write() should succeed. + census_log_end_write(record_written, PR_RECORD_SIZE); + census_log_init_reader(); + record_read = census_log_read_next(&bytes_available); + GPR_ASSERT(record_written == record_read); + GPR_ASSERT(PR_RECORD_SIZE == bytes_available); + assert_log_empty(); + census_log_shutdown(); +} + +// Tries reading beyond pending write. +void test_read_beyond_pending_record(void) { + printf("Starting test: read beyond pending record\n"); + setup_test(0); + // Start a write. + const size_t incomplete_record_size = 10; + void* incomplete_record = census_log_start_write(incomplete_record_size); + GPR_ASSERT(incomplete_record != NULL); + const size_t complete_record_size = 20; + void* complete_record = census_log_start_write(complete_record_size); + GPR_ASSERT(complete_record != NULL); + GPR_ASSERT(complete_record != incomplete_record); + census_log_end_write(complete_record, complete_record_size); + // Now iterate over blocks to read completed records. + census_log_init_reader(); + size_t bytes_available; + const void* record_read = census_log_read_next(&bytes_available); + GPR_ASSERT(complete_record == record_read); + GPR_ASSERT(complete_record_size == bytes_available); + // Complete first record. + census_log_end_write(incomplete_record, incomplete_record_size); + // Have read past the incomplete record, so read_next() should return NULL. + // NB: this test also assumes our thread did not get switched to a different + // CPU between the two start_write calls + record_read = census_log_read_next(&bytes_available); + GPR_ASSERT(record_read == NULL); + // Reset reader to get the newly completed record. + census_log_init_reader(); + record_read = census_log_read_next(&bytes_available); + GPR_ASSERT(incomplete_record == record_read); + GPR_ASSERT(incomplete_record_size == bytes_available); + assert_log_empty(); + census_log_shutdown(); +} + +// Tests scenario where block being read is detached from a core and put on the +// dirty list. +void test_detached_while_reading(void) { + printf("Starting test: detached while reading\n"); + setup_test(0); + // Start a write. + static const size_t DWR_RECORD_SIZE = 10; + void* record_written = census_log_start_write(DWR_RECORD_SIZE); + GPR_ASSERT(record_written != NULL); + census_log_end_write(record_written, DWR_RECORD_SIZE); + // Read this record. + census_log_init_reader(); + size_t bytes_available; + const void* record_read = census_log_read_next(&bytes_available); + GPR_ASSERT(record_read != NULL); + GPR_ASSERT(DWR_RECORD_SIZE == bytes_available); + // Now fill the log. This will move the block being read from core-local + // array to the dirty list. + while ((record_written = census_log_start_write(DWR_RECORD_SIZE))) { + census_log_end_write(record_written, DWR_RECORD_SIZE); + } + + // In this iteration, read_next() should only traverse blocks in the + // core-local array. Therefore, we expect at most gpr_cpu_num_cores() more + // blocks. As log is full, if read_next() is traversing the dirty list, we + // will get more than gpr_cpu_num_cores() blocks. + int block_read = 0; + while ((record_read = census_log_read_next(&bytes_available))) { + ++block_read; + GPR_ASSERT(block_read <= (int)gpr_cpu_num_cores()); + } + census_log_shutdown(); +} + +// Fills non-circular log with records sized such that size is a multiple of +// CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation). +void test_fill_log_no_fragmentation(void) { + printf("Starting test: fill log no fragmentation\n"); + const int circular = 0; + setup_test(circular); + fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular); + census_log_shutdown(); +} + +// Fills circular log with records sized such that size is a multiple of +// CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation). +void test_fill_circular_log_no_fragmentation(void) { + printf("Starting test: fill circular log no fragmentation\n"); + const int circular = 1; + setup_test(circular); + fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular); + census_log_shutdown(); +} + +// Fills non-circular log with records that may straddle end of a block. +void test_fill_log_with_straddling_records(void) { + printf("Starting test: fill log with straddling records\n"); + const int circular = 0; + setup_test(circular); + fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular); + census_log_shutdown(); +} + +// Fills circular log with records that may straddle end of a block. +void test_fill_circular_log_with_straddling_records(void) { + printf("Starting test: fill circular log with straddling records\n"); + const int circular = 1; + setup_test(circular); + fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular); + census_log_shutdown(); +} + +// Tests scenario where multiple writers and a single reader are using a log +// that is configured to discard old records. +void test_multiple_writers_circular_log(void) { + printf("Starting test: multiple writers circular log\n"); + const int circular = 1; + setup_test(circular); + multiple_writers_single_reader(circular); + census_log_shutdown(); +} + +// Tests scenario where multiple writers and a single reader are using a log +// that is configured to discard old records. +void test_multiple_writers(void) { + printf("Starting test: multiple writers\n"); + const int circular = 0; + setup_test(circular); + multiple_writers_single_reader(circular); + census_log_shutdown(); +} + +// Repeat the straddling records and multiple writers tests with a small log. +void test_small_log(void) { + printf("Starting test: small log\n"); + const int circular = 0; + census_log_initialize(0, circular); + size_t log_size = census_log_remaining_space(); + GPR_ASSERT(log_size > 0); + fill_log(log_size, 0, circular); + census_log_shutdown(); + census_log_initialize(0, circular); + multiple_writers_single_reader(circular); + census_log_shutdown(); +} + +void test_performance(void) { + for (size_t write_size = 1; write_size < CENSUS_LOG_MAX_RECORD_SIZE; + write_size *= 2) { + setup_test(0); + gpr_timespec start_time = gpr_now(GPR_CLOCK_REALTIME); + int nrecords = 0; + while (1) { + void* record = census_log_start_write(write_size); + if (record == NULL) { + break; + } + census_log_end_write(record, write_size); + nrecords++; + } + gpr_timespec write_time = + gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start_time); + double write_time_micro = + (double)write_time.tv_sec * 1000000 + (double)write_time.tv_nsec / 1000; + census_log_shutdown(); + printf( + "Wrote %d %d byte records in %.3g microseconds: %g records/us " + "(%g ns/record), %g gigabytes/s\n", + nrecords, (int)write_size, write_time_micro, + nrecords / write_time_micro, 1000 * write_time_micro / nrecords, + (double)((int)write_size * nrecords) / write_time_micro / 1000); + } +} + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + gpr_time_init(); + srand((unsigned)gpr_now(GPR_CLOCK_REALTIME).tv_nsec); + test_invalid_record_size(); + test_end_write_with_different_size(); + test_read_pending_record(); + test_read_beyond_pending_record(); + test_detached_while_reading(); + test_fill_log_no_fragmentation(); + test_fill_circular_log_no_fragmentation(); + test_fill_log_with_straddling_records(); + test_fill_circular_log_with_straddling_records(); + test_small_log(); + test_multiple_writers(); + test_multiple_writers_circular_log(); + test_performance(); + return 0; +} diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 5ab64f9800..dbdd3524ed 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -159,7 +159,7 @@ static grpc_subchannel *subchannel_factory_create_subchannel( c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); args->args = final_args; - s = grpc_subchannel_create(&c->base, args); + s = grpc_subchannel_create(exec_ctx, &c->base, args); grpc_connector_unref(exec_ctx, &c->base); grpc_channel_args_destroy(final_args); if (*f->sniffed_subchannel) { diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 9725d8a3b6..b57478059f 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -45,6 +45,7 @@ #include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/iomgr/timer.h" #include "test/core/util/test_config.h" static grpc_pollset_set g_pollset_set; @@ -125,11 +126,13 @@ void test_succeeds(void) { gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + + grpc_exec_ctx_finish(&exec_ctx); } void test_fails(void) { @@ -159,14 +162,18 @@ void test_fails(void) { /* wait for the connection callback to finish */ while (g_connections_complete == connections_complete_before) { grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), test_deadline()); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec polling_deadline = test_deadline(); + if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline); + } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_exec_ctx_finish(&exec_ctx); } void test_times_out(void) { @@ -243,15 +250,18 @@ void test_times_out(void) { GPR_ASSERT(g_connections_complete == connections_complete_before + is_after_deadline); } - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + gpr_timespec polling_deadline = GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10); + if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline); + } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_exec_ctx_finish(&exec_ctx); + close(svr_fd); for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) { close(client_fd[i]); diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index 85e28732e4..2e253d8a8a 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -173,7 +173,7 @@ static void test_receive(int number_of_clients) { grpc_exec_ctx_finish(&exec_ctx); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { grpc_pollset_destroy(p); } diff --git a/test/cpp/common/channel_arguments_test.cc b/test/cpp/common/channel_arguments_test.cc index e010d375cf..a4821b4d0b 100644 --- a/test/cpp/common/channel_arguments_test.cc +++ b/test/cpp/common/channel_arguments_test.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,94 +41,141 @@ namespace testing { class ChannelArgumentsTest : public ::testing::Test { protected: + ChannelArgumentsTest() + : pointer_vtable_({&ChannelArguments::PointerVtableMembers::Copy, + &ChannelArguments::PointerVtableMembers::Destroy, + &ChannelArguments::PointerVtableMembers::Compare}) {} + void SetChannelArgs(const ChannelArguments& channel_args, grpc_channel_args* args) { channel_args.SetChannelArgs(args); } + + grpc::string GetDefaultUserAgentPrefix() { + std::ostringstream user_agent_prefix; + user_agent_prefix << "grpc-c++/" << grpc_version_string(); + return user_agent_prefix.str(); + } + + void VerifyDefaultChannelArgs() { + grpc_channel_args args; + SetChannelArgs(channel_args_, &args); + EXPECT_EQ(static_cast<size_t>(1), args.num_args); + EXPECT_STREQ(GRPC_ARG_PRIMARY_USER_AGENT_STRING, args.args[0].key); + EXPECT_EQ(GetDefaultUserAgentPrefix(), + grpc::string(args.args[0].value.string)); + } + + bool HasArg(grpc_arg expected_arg) { + grpc_channel_args args; + SetChannelArgs(channel_args_, &args); + for (size_t i = 0; i < args.num_args; i++) { + const grpc_arg& arg = args.args[i]; + if (arg.type == expected_arg.type && + grpc::string(arg.key) == expected_arg.key) { + if (arg.type == GRPC_ARG_INTEGER) { + return arg.value.integer == expected_arg.value.integer; + } else if (arg.type == GRPC_ARG_STRING) { + return grpc::string(arg.value.string) == expected_arg.value.string; + } else if (arg.type == GRPC_ARG_POINTER) { + return arg.value.pointer.p == expected_arg.value.pointer.p && + arg.value.pointer.vtable->copy == + expected_arg.value.pointer.vtable->copy && + arg.value.pointer.vtable->destroy == + expected_arg.value.pointer.vtable->destroy; + } + } + } + return false; + } + grpc_arg_pointer_vtable pointer_vtable_; + ChannelArguments channel_args_; }; TEST_F(ChannelArgumentsTest, SetInt) { - grpc_channel_args args; - ChannelArguments channel_args; - // Empty arguments. - SetChannelArgs(channel_args, &args); - EXPECT_EQ(static_cast<size_t>(0), args.num_args); - - grpc::string key("key0"); - channel_args.SetInt(key, 0); + VerifyDefaultChannelArgs(); + grpc::string key0("key0"); + grpc_arg arg0; + arg0.type = GRPC_ARG_INTEGER; + arg0.key = const_cast<char*>(key0.c_str()); + arg0.value.integer = 0; + grpc::string key1("key1"); + grpc_arg arg1; + arg1.type = GRPC_ARG_INTEGER; + arg1.key = const_cast<char*>(key1.c_str()); + arg1.value.integer = 1; + + grpc::string arg_key0(key0); + channel_args_.SetInt(arg_key0, arg0.value.integer); // Clear key early to make sure channel_args takes a copy - key = ""; - SetChannelArgs(channel_args, &args); - EXPECT_EQ(static_cast<size_t>(1), args.num_args); - EXPECT_EQ(GRPC_ARG_INTEGER, args.args[0].type); - EXPECT_STREQ("key0", args.args[0].key); - EXPECT_EQ(0, args.args[0].value.integer); - - key = "key1"; - channel_args.SetInt(key, 1); - key = ""; - SetChannelArgs(channel_args, &args); - EXPECT_EQ(static_cast<size_t>(2), args.num_args); - // We do not enforce order on the arguments. - for (size_t i = 0; i < args.num_args; i++) { - EXPECT_EQ(GRPC_ARG_INTEGER, args.args[i].type); - if (grpc::string(args.args[i].key) == "key0") { - EXPECT_EQ(0, args.args[i].value.integer); - } else if (grpc::string(args.args[i].key) == "key1") { - EXPECT_EQ(1, args.args[i].value.integer); - } - } + arg_key0.clear(); + EXPECT_TRUE(HasArg(arg0)); + + grpc::string arg_key1(key1); + channel_args_.SetInt(arg_key1, arg1.value.integer); + arg_key1.clear(); + EXPECT_TRUE(HasArg(arg0)); + EXPECT_TRUE(HasArg(arg1)); } TEST_F(ChannelArgumentsTest, SetString) { - grpc_channel_args args; - ChannelArguments channel_args; - // Empty arguments. - SetChannelArgs(channel_args, &args); - EXPECT_EQ(static_cast<size_t>(0), args.num_args); - - grpc::string key("key0"); - grpc::string val("val0"); - channel_args.SetString(key, val); + VerifyDefaultChannelArgs(); + grpc::string key0("key0"); + grpc::string val0("val0"); + grpc_arg arg0; + arg0.type = GRPC_ARG_STRING; + arg0.key = const_cast<char*>(key0.c_str()); + arg0.value.string = const_cast<char*>(val0.c_str()); + grpc::string key1("key1"); + grpc::string val1("val1"); + grpc_arg arg1; + arg1.type = GRPC_ARG_STRING; + arg1.key = const_cast<char*>(key1.c_str()); + arg1.value.string = const_cast<char*>(val1.c_str()); + + grpc::string key(key0); + grpc::string val(val0); + channel_args_.SetString(key, val); // Clear key/val early to make sure channel_args takes a copy key = ""; val = ""; - SetChannelArgs(channel_args, &args); - EXPECT_EQ(static_cast<size_t>(1), args.num_args); - EXPECT_EQ(GRPC_ARG_STRING, args.args[0].type); - EXPECT_STREQ("key0", args.args[0].key); - EXPECT_STREQ("val0", args.args[0].value.string); - - key = "key1"; - val = "val1"; - channel_args.SetString(key, val); - SetChannelArgs(channel_args, &args); - EXPECT_EQ(static_cast<size_t>(2), args.num_args); - // We do not enforce order on the arguments. - for (size_t i = 0; i < args.num_args; i++) { - EXPECT_EQ(GRPC_ARG_STRING, args.args[i].type); - if (grpc::string(args.args[i].key) == "key0") { - EXPECT_STREQ("val0", args.args[i].value.string); - } else if (grpc::string(args.args[i].key) == "key1") { - EXPECT_STREQ("val1", args.args[i].value.string); - } - } + EXPECT_TRUE(HasArg(arg0)); + + key = key1; + val = val1; + channel_args_.SetString(key, val); + // Clear key/val early to make sure channel_args takes a copy + key = ""; + val = ""; + EXPECT_TRUE(HasArg(arg0)); + EXPECT_TRUE(HasArg(arg1)); } TEST_F(ChannelArgumentsTest, SetPointer) { - grpc_channel_args args; - ChannelArguments channel_args; - // Empty arguments. - SetChannelArgs(channel_args, &args); - EXPECT_EQ(static_cast<size_t>(0), args.num_args); - - grpc::string key("key0"); - channel_args.SetPointer(key, &key); - SetChannelArgs(channel_args, &args); - EXPECT_EQ(static_cast<size_t>(1), args.num_args); - EXPECT_EQ(GRPC_ARG_POINTER, args.args[0].type); - EXPECT_STREQ("key0", args.args[0].key); - EXPECT_EQ(&key, args.args[0].value.pointer.p); + VerifyDefaultChannelArgs(); + grpc::string key0("key0"); + grpc_arg arg0; + arg0.type = GRPC_ARG_POINTER; + arg0.key = const_cast<char*>(key0.c_str()); + arg0.value.pointer.p = &key0; + arg0.value.pointer.vtable = &pointer_vtable_; + + grpc::string key(key0); + channel_args_.SetPointer(key, arg0.value.pointer.p); + EXPECT_TRUE(HasArg(arg0)); +} + +TEST_F(ChannelArgumentsTest, SetUserAgentPrefix) { + VerifyDefaultChannelArgs(); + grpc::string prefix("prefix"); + grpc::string whole_prefix = prefix + " " + GetDefaultUserAgentPrefix(); + grpc_arg arg0; + arg0.type = GRPC_ARG_STRING; + arg0.key = const_cast<char*>(GRPC_ARG_PRIMARY_USER_AGENT_STRING); + arg0.value.string = const_cast<char*>(whole_prefix.c_str()); + + channel_args_.SetUserAgentPrefix(prefix); + EXPECT_TRUE(HasArg(arg0)); } } // namespace testing diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 65da71b391..c8523847ab 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -252,6 +252,9 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { args.SetSslTargetNameOverride("foo.test.google.fr"); channel_creds = SslCredentials(ssl_opts); } + if (!user_agent_prefix_.empty()) { + args.SetUserAgentPrefix(user_agent_prefix_); + } args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test"); channel_ = CreateCustomChannel(server_address_.str(), channel_creds, args); } @@ -285,6 +288,7 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> { TestServiceImpl service_; TestServiceImpl special_service_; TestServiceImplDupPkg dup_pkg_service_; + grpc::string user_agent_prefix_; }; static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, @@ -601,6 +605,25 @@ TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5); } +TEST_P(End2endTest, SimpleRpcWithCustomeUserAgentPrefix) { + user_agent_prefix_ = "custom_prefix"; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello hello hello hello"); + request.mutable_param()->set_echo_metadata(true); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + const auto& trailing_metadata = context.GetServerTrailingMetadata(); + auto iter = trailing_metadata.find("user-agent"); + EXPECT_TRUE(iter != trailing_metadata.end()); + grpc::string expected_prefix = user_agent_prefix_ + " grpc-c++/"; + EXPECT_TRUE(iter->second.starts_with(expected_prefix)); +} + TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { ResetStub(); std::vector<std::thread*> threads; diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 50b2bf2514..c94a523fa1 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -41,6 +41,7 @@ #include <grpc++/support/byte_buffer.h> #include <grpc++/support/slice.h> #include <grpc/support/log.h> +#include <grpc/support/time.h> #include "src/proto/grpc/testing/payloads.grpc.pb.h" #include "src/proto/grpc/testing/services.grpc.pb.h" @@ -52,27 +53,8 @@ #include "test/cpp/util/create_test_channel.h" namespace grpc { - -#if defined(__APPLE__) -// Specialize Timepoint for high res clock as we need that -template <> -class TimePoint<std::chrono::high_resolution_clock::time_point> { - public: - TimePoint(const std::chrono::high_resolution_clock::time_point& time) { - TimepointHR2Timespec(time, &time_); - } - gpr_timespec raw_time() const { return time_; } - - private: - gpr_timespec time_; -}; -#endif - namespace testing { -typedef std::chrono::high_resolution_clock grpc_time_source; -typedef std::chrono::time_point<grpc_time_source> grpc_time; - template <class RequestType> class ClientRequestCreator { public: @@ -184,7 +166,7 @@ class Client { // Set up the load distribution based on the number of threads const auto& load = config.load_params(); - std::unique_ptr<RandomDist> random_dist; + std::unique_ptr<RandomDistInterface> random_dist; switch (load.load_case()) { case LoadParams::kClosedLoop: // Closed-loop doesn't use random dist at all @@ -218,25 +200,26 @@ class Client { closed_loop_ = false; // set up interarrival timer according to random dist interarrival_timer_.init(*random_dist, num_threads); + const auto now = gpr_now(GPR_CLOCK_MONOTONIC); for (size_t i = 0; i < num_threads; i++) { - next_time_.push_back( - grpc_time_source::now() + - std::chrono::duration_cast<grpc_time_source::duration>( - interarrival_timer_(i))); + next_time_.push_back(gpr_time_add( + now, + gpr_time_from_nanos(interarrival_timer_.next(i), GPR_TIMESPAN))); } } } - bool NextIssueTime(int thread_idx, grpc_time* time_delay) { - if (closed_loop_) { - return false; - } else { - *time_delay = next_time_[thread_idx]; - next_time_[thread_idx] += - std::chrono::duration_cast<grpc_time_source::duration>( - interarrival_timer_(thread_idx)); - return true; - } + gpr_timespec NextIssueTime(int thread_idx) { + const gpr_timespec result = next_time_[thread_idx]; + next_time_[thread_idx] = + gpr_time_add(next_time_[thread_idx], + gpr_time_from_nanos(interarrival_timer_.next(thread_idx), + GPR_TIMESPAN)); + return result; + } + std::function<gpr_timespec()> NextIssuer(int thread_idx) { + return closed_loop_ ? std::function<gpr_timespec()>() + : std::bind(&Client::NextIssueTime, this, thread_idx); } private: @@ -306,7 +289,7 @@ class Client { Histogram* new_stats_; Histogram histogram_; Client* client_; - size_t idx_; + const size_t idx_; std::thread impl_; }; @@ -314,7 +297,7 @@ class Client { std::unique_ptr<Timer> timer_; InterarrivalTimer interarrival_timer_; - std::vector<grpc_time> next_time_; + std::vector<gpr_timespec> next_time_; }; template <class StubType, class RequestType> @@ -323,9 +306,9 @@ class ClientImpl : public Client { ClientImpl(const ClientConfig& config, std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) - : channels_(config.client_channels()), create_stub_(create_stub) { - cores_ = LimitCores(config.core_list().data(), config.core_list_size()); - + : cores_(LimitCores(config.core_list().data(), config.core_list_size())), + channels_(config.client_channels()), + create_stub_(create_stub) { for (int i = 0; i < config.client_channels(); i++) { channels_[i].init(config.server_targets(i % config.server_targets_size()), config, create_stub_); @@ -337,7 +320,7 @@ class ClientImpl : public Client { virtual ~ClientImpl() {} protected: - int cores_; + const int cores_; RequestType request_; class ClientChannelInfo { diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index f3f8f37051..9e8767d103 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -43,9 +43,9 @@ #include <vector> #include <gflags/gflags.h> +#include <grpc++/alarm.h> #include <grpc++/channel.h> #include <grpc++/client_context.h> -#include <grpc++/client_context.h> #include <grpc++/generic/generic_stub.h> #include <grpc/grpc.h> #include <grpc/support/cpu.h> @@ -60,11 +60,9 @@ namespace grpc { namespace testing { -typedef std::list<grpc_time> deadline_list; - class ClientRpcContext { public: - explicit ClientRpcContext(int ch) : channel_id_(ch) {} + ClientRpcContext() {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate virtual bool RunNextState(bool, Histogram* hist) = 0; @@ -74,72 +72,73 @@ class ClientRpcContext { return reinterpret_cast<ClientRpcContext*>(t); } - deadline_list::iterator deadline_posn() const { return deadline_posn_; } - void set_deadline_posn(const deadline_list::iterator& it) { - deadline_posn_ = it; - } virtual void Start(CompletionQueue* cq) = 0; - int channel_id() const { return channel_id_; } - - protected: - int channel_id_; - - private: - deadline_list::iterator deadline_posn_; }; template <class RequestType, class ResponseType> class ClientRpcContextUnaryImpl : public ClientRpcContext { public: ClientRpcContextUnaryImpl( - int channel_id, BenchmarkService::Stub* stub, const RequestType& req, + BenchmarkService::Stub* stub, const RequestType& req, + std::function<gpr_timespec()> next_issue, std::function< std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : ClientRpcContext(channel_id), - context_(), + : context_(), stub_(stub), + cq_(nullptr), req_(req), response_(), - next_state_(&ClientRpcContextUnaryImpl::RespDone), + next_state_(State::READY), callback_(on_done), + next_issue_(next_issue), start_req_(start_req) {} + ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} void Start(CompletionQueue* cq) GRPC_OVERRIDE { - start_ = Timer::Now(); - response_reader_ = start_req_(stub_, &context_, req_, cq); - response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this)); + cq_ = cq; + if (!next_issue_) { // ready to issue + RunNextState(true, nullptr); + } else { // wait for the issue time + alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + } } - ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {} bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { - bool ret = (this->*next_state_)(ok); - if (!ret) { - hist->Add((Timer::Now() - start_) * 1e9); + switch (next_state_) { + case State::READY: + start_ = Timer::Now(); + response_reader_ = start_req_(stub_, &context_, req_, cq_); + response_reader_->Finish(&response_, &status_, + ClientRpcContext::tag(this)); + next_state_ = State::RESP_DONE; + return true; + case State::RESP_DONE: + hist->Add((Timer::Now() - start_) * 1e9); + callback_(status_, &response_); + next_state_ = State::INVALID; + return false; + default: + GPR_ASSERT(false); + return false; } - return ret; } - ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_, + return new ClientRpcContextUnaryImpl(stub_, req_, next_issue_, start_req_, callback_); } private: - bool RespDone(bool) { - next_state_ = &ClientRpcContextUnaryImpl::DoCallBack; - return false; - } - bool DoCallBack(bool) { - callback_(status_, &response_); - return true; // we're done, this'll be ignored - } grpc::ClientContext context_; BenchmarkService::Stub* stub_; + CompletionQueue* cq_; + std::unique_ptr<Alarm> alarm_; RequestType req_; ResponseType response_; - bool (ClientRpcContextUnaryImpl::*next_state_)(bool); + enum State { INVALID, READY, RESP_DONE }; + State next_state_; std::function<void(grpc::Status, ResponseType*)> callback_; + std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&, CompletionQueue*)> start_req_; @@ -157,49 +156,35 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { // member name resolution until the template types are fully resolved public: using Client::SetupLoadTest; - using Client::NextIssueTime; using Client::closed_loop_; + using Client::NextIssuer; using ClientImpl<StubType, RequestType>::cores_; using ClientImpl<StubType, RequestType>::channels_; using ClientImpl<StubType, RequestType>::request_; AsyncClient(const ClientConfig& config, - std::function<ClientRpcContext*(int, StubType*, - const RequestType&)> setup_ctx, + std::function<ClientRpcContext*( + StubType*, std::function<gpr_timespec()> next_issue, + const RequestType&)> setup_ctx, std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)> create_stub) : ClientImpl<StubType, RequestType>(config, create_stub), - num_async_threads_(NumThreads(config)), - channel_lock_(new std::mutex[config.client_channels()]), - contexts_(config.client_channels()), - max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()), - channel_count_(config.client_channels()), - pref_channel_inc_(num_async_threads_) { + num_async_threads_(NumThreads(config)) { SetupLoadTest(config, num_async_threads_); for (int i = 0; i < num_async_threads_; i++) { cli_cqs_.emplace_back(new CompletionQueue); - if (!closed_loop_) { - rpc_deadlines_.emplace_back(); - next_channel_.push_back(i % channel_count_); - issue_allowed_.emplace_back(true); - - grpc_time next_issue; - NextIssueTime(i, &next_issue); - next_issue_.push_back(next_issue); - } + next_issuers_.emplace_back(NextIssuer(i)); } + using namespace std::placeholders; int t = 0; for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) { - for (int ch = 0; ch < channel_count_; ch++) { + for (int ch = 0; ch < config.client_channels(); ch++) { auto* cq = cli_cqs_[t].get(); + auto ctx = + setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_); + ctx->Start(cq); t = (t + 1) % cli_cqs_.size(); - auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_); - if (closed_loop_) { - ctx->Start(cq); - } else { - contexts_[ch].push_front(ctx); - } } } } @@ -212,140 +197,34 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { delete ClientRpcContext::detag(got_tag); } } - // Now clear out all the pre-allocated idle contexts - for (int ch = 0; ch < channel_count_; ch++) { - while (!contexts_[ch].empty()) { - // Get an idle context from the front of the list - auto* ctx = *(contexts_[ch].begin()); - contexts_[ch].pop_front(); - delete ctx; - } - } - delete[] channel_lock_; } bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { void* got_tag; bool ok; - grpc_time deadline, short_deadline; - if (closed_loop_) { - deadline = grpc_time_source::now() + std::chrono::seconds(1); - short_deadline = deadline; - } else { - if (rpc_deadlines_[thread_idx].empty()) { - deadline = grpc_time_source::now() + std::chrono::seconds(1); - } else { - deadline = *(rpc_deadlines_[thread_idx].begin()); - } - short_deadline = - issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline; - } - - bool got_event; - switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) { - case CompletionQueue::SHUTDOWN: - return false; - case CompletionQueue::TIMEOUT: - got_event = false; - break; - case CompletionQueue::GOT_EVENT: - got_event = true; - break; - default: - GPR_ASSERT(false); - break; - } - if (got_event) { + if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { + // Got a regular event, so process it ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (ctx->RunNextState(ok, histogram) == false) { - // call the callback and then clone the ctx - ctx->RunNextState(ok, histogram); - ClientRpcContext* clone_ctx = ctx->StartNewClone(); - if (closed_loop_) { - clone_ctx->Start(cli_cqs_[thread_idx].get()); - } else { - // Remove the entry from the rpc deadlines list - rpc_deadlines_[thread_idx].erase(ctx->deadline_posn()); - // Put the clone_ctx in the list of idle contexts for this channel - // Under lock - int ch = clone_ctx->channel_id(); - std::lock_guard<std::mutex> g(channel_lock_[ch]); - contexts_[ch].push_front(clone_ctx); - } + if (!ctx->RunNextState(ok, histogram)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + auto clone = ctx->StartNewClone(); + clone->Start(cli_cqs_[thread_idx].get()); // delete the old version delete ctx; } - if (!closed_loop_) - issue_allowed_[thread_idx] = - true; // may be ok now even if it hadn't been + return true; + } else { // queue is shutting down + return false; } - if (!closed_loop_ && issue_allowed_[thread_idx] && - grpc_time_source::now() >= next_issue_[thread_idx]) { - // Attempt to issue - bool issued = false; - for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx]; - num_attempts < channel_count_ && !issued; num_attempts++) { - bool can_issue = false; - ClientRpcContext* ctx = nullptr; - { - std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]); - if (!contexts_[channel_attempt].empty()) { - // Get an idle context from the front of the list - ctx = *(contexts_[channel_attempt].begin()); - contexts_[channel_attempt].pop_front(); - can_issue = true; - } - } - if (can_issue) { - // do the work to issue - rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() + - std::chrono::seconds(1)); - auto it = rpc_deadlines_[thread_idx].end(); - --it; - ctx->set_deadline_posn(it); - ctx->Start(cli_cqs_[thread_idx].get()); - issued = true; - // If we did issue, then next time, try our thread's next - // preferred channel - next_channel_[thread_idx] += pref_channel_inc_; - if (next_channel_[thread_idx] >= channel_count_) - next_channel_[thread_idx] = (thread_idx % channel_count_); - } else { - // Do a modular increment of channel attempt if we couldn't issue - channel_attempt = (channel_attempt + 1) % channel_count_; - } - } - if (issued) { - // We issued one; see when we can issue the next - grpc_time next_issue; - NextIssueTime(thread_idx, &next_issue); - next_issue_[thread_idx] = next_issue; - } else { - issue_allowed_[thread_idx] = false; - } - } - return true; } protected: - int num_async_threads_; + const int num_async_threads_; private: - class boolean { // exists only to avoid data-race on vector<bool> - public: - boolean() : val_(false) {} - boolean(bool b) : val_(b) {} - operator bool() const { return val_; } - boolean& operator=(bool b) { - val_ = b; - return *this; - } - - private: - bool val_; - }; int NumThreads(const ClientConfig& config) { int num_threads = config.async_client_threads(); if (num_threads <= 0) { // Use dynamic sizing @@ -356,18 +235,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; - - std::vector<deadline_list> rpc_deadlines_; // per thread deadlines - std::vector<int> next_channel_; // per thread round-robin channel ctr - std::vector<boolean> issue_allowed_; // may this thread attempt to issue - std::vector<grpc_time> next_issue_; // when should it issue? - - std::mutex* - channel_lock_; // a vector, but avoid std::vector for old compilers - std::vector<context_list> contexts_; // per-channel list of idle contexts - int max_outstanding_per_channel_; - int channel_count_; - int pref_channel_inc_; + std::vector<std::function<gpr_timespec()>> next_issuers_; }; static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( @@ -391,11 +259,11 @@ class AsyncUnaryClient GRPC_FINAL const SimpleRequest& request, CompletionQueue* cq) { return stub->AsyncUnaryCall(ctx, request, cq); }; - static ClientRpcContext* SetupCtx(int channel_id, - BenchmarkService::Stub* stub, + static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, + std::function<gpr_timespec()> next_issue, const SimpleRequest& req) { return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>( - channel_id, stub, req, AsyncUnaryClient::StartReq, + stub, req, next_issue, AsyncUnaryClient::StartReq, AsyncUnaryClient::CheckDone); } }; @@ -404,62 +272,94 @@ template <class RequestType, class ResponseType> class ClientRpcContextStreamingImpl : public ClientRpcContext { public: ClientRpcContextStreamingImpl( - int channel_id, BenchmarkService::Stub* stub, const RequestType& req, + BenchmarkService::Stub* stub, const RequestType& req, + std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr< grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)> start_req, std::function<void(grpc::Status, ResponseType*)> on_done) - : ClientRpcContext(channel_id), - context_(), + : context_(), stub_(stub), + cq_(nullptr), req_(req), response_(), - next_state_(&ClientRpcContextStreamingImpl::ReqSent), + next_state_(State::INVALID), callback_(on_done), + next_issue_(next_issue), start_req_(start_req), start_(Timer::Now()) {} ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {} + void Start(CompletionQueue* cq) GRPC_OVERRIDE { + cq_ = cq; + stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); + next_state_ = State::STREAM_IDLE; + } bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { - return (this->*next_state_)(ok, hist); + while (true) { + switch (next_state_) { + case State::STREAM_IDLE: + if (!next_issue_) { // ready to issue + next_state_ = State::READY_TO_WRITE; + } else { + next_state_ = State::WAIT; + } + break; // loop around, don't return + case State::WAIT: + alarm_.reset( + new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + next_state_ = State::READY_TO_WRITE; + return true; + case State::READY_TO_WRITE: + if (!ok) { + return false; + } + start_ = Timer::Now(); + next_state_ = State::WRITE_DONE; + stream_->Write(req_, ClientRpcContext::tag(this)); + return true; + case State::WRITE_DONE: + if (!ok) { + return false; + } + next_state_ = State::READ_DONE; + stream_->Read(&response_, ClientRpcContext::tag(this)); + return true; + break; + case State::READ_DONE: + hist->Add((Timer::Now() - start_) * 1e9); + callback_(status_, &response_); + next_state_ = State::STREAM_IDLE; + break; // loop around + default: + GPR_ASSERT(false); + return false; + } + } } ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_, + return new ClientRpcContextStreamingImpl(stub_, req_, next_issue_, start_req_, callback_); } - void Start(CompletionQueue* cq) GRPC_OVERRIDE { - stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); - } private: - bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); } - bool StartWrite(bool ok) { - if (!ok) { - return (false); - } - start_ = Timer::Now(); - next_state_ = &ClientRpcContextStreamingImpl::WriteDone; - stream_->Write(req_, ClientRpcContext::tag(this)); - return true; - } - bool WriteDone(bool ok, Histogram*) { - if (!ok) { - return (false); - } - next_state_ = &ClientRpcContextStreamingImpl::ReadDone; - stream_->Read(&response_, ClientRpcContext::tag(this)); - return true; - } - bool ReadDone(bool ok, Histogram* hist) { - hist->Add((Timer::Now() - start_) * 1e9); - return StartWrite(ok); - } grpc::ClientContext context_; BenchmarkService::Stub* stub_; + CompletionQueue* cq_; + std::unique_ptr<Alarm> alarm_; RequestType req_; ResponseType response_; - bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*); + enum State { + INVALID, + STREAM_IDLE, + WAIT, + READY_TO_WRITE, + WRITE_DONE, + READ_DONE + }; + State next_state_; std::function<void(grpc::Status, ResponseType*)> callback_; + std::function<gpr_timespec()> next_issue_; std::function< std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>( BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*, @@ -475,9 +375,6 @@ class AsyncStreamingClient GRPC_FINAL public: explicit AsyncStreamingClient(const ClientConfig& config) : AsyncClient(config, SetupCtx, BenchmarkStubCreator) { - // async streaming currently only supports closed loop - GPR_ASSERT(closed_loop_); - StartThreads(num_async_threads_); } @@ -492,11 +389,11 @@ class AsyncStreamingClient GRPC_FINAL auto stream = stub->AsyncStreamingCall(ctx, cq, tag); return stream; }; - static ClientRpcContext* SetupCtx(int channel_id, - BenchmarkService::Stub* stub, + static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub, + std::function<gpr_timespec()> next_issue, const SimpleRequest& req) { return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>( - channel_id, stub, req, AsyncStreamingClient::StartReq, + stub, req, next_issue, AsyncStreamingClient::StartReq, AsyncStreamingClient::CheckDone); } }; @@ -504,64 +401,96 @@ class AsyncStreamingClient GRPC_FINAL class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { public: ClientRpcContextGenericStreamingImpl( - int channel_id, grpc::GenericStub* stub, const ByteBuffer& req, + grpc::GenericStub* stub, const ByteBuffer& req, + std::function<gpr_timespec()> next_issue, std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( grpc::GenericStub*, grpc::ClientContext*, const grpc::string& method_name, CompletionQueue*, void*)> start_req, std::function<void(grpc::Status, ByteBuffer*)> on_done) - : ClientRpcContext(channel_id), - context_(), + : context_(), stub_(stub), + cq_(nullptr), req_(req), response_(), - next_state_(&ClientRpcContextGenericStreamingImpl::ReqSent), + next_state_(State::INVALID), callback_(on_done), + next_issue_(next_issue), start_req_(start_req), start_(Timer::Now()) {} ~ClientRpcContextGenericStreamingImpl() GRPC_OVERRIDE {} - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { - return (this->*next_state_)(ok, hist); - } - ClientRpcContext* StartNewClone() GRPC_OVERRIDE { - return new ClientRpcContextGenericStreamingImpl(channel_id_, stub_, req_, - start_req_, callback_); - } void Start(CompletionQueue* cq) GRPC_OVERRIDE { + cq_ = cq; const grpc::string kMethodName( "/grpc.testing.BenchmarkService/StreamingCall"); stream_ = start_req_(stub_, &context_, kMethodName, cq, ClientRpcContext::tag(this)); + next_state_ = State::STREAM_IDLE; } - - private: - bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); } - bool StartWrite(bool ok) { - if (!ok) { - return (false); - } - start_ = Timer::Now(); - next_state_ = &ClientRpcContextGenericStreamingImpl::WriteDone; - stream_->Write(req_, ClientRpcContext::tag(this)); - return true; - } - bool WriteDone(bool ok, Histogram*) { - if (!ok) { - return (false); + bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + while (true) { + switch (next_state_) { + case State::STREAM_IDLE: + if (!next_issue_) { // ready to issue + next_state_ = State::READY_TO_WRITE; + } else { + next_state_ = State::WAIT; + } + break; // loop around, don't return + case State::WAIT: + alarm_.reset( + new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); + next_state_ = State::READY_TO_WRITE; + return true; + case State::READY_TO_WRITE: + if (!ok) { + return false; + } + start_ = Timer::Now(); + next_state_ = State::WRITE_DONE; + stream_->Write(req_, ClientRpcContext::tag(this)); + return true; + case State::WRITE_DONE: + if (!ok) { + return false; + } + next_state_ = State::READ_DONE; + stream_->Read(&response_, ClientRpcContext::tag(this)); + return true; + break; + case State::READ_DONE: + hist->Add((Timer::Now() - start_) * 1e9); + callback_(status_, &response_); + next_state_ = State::STREAM_IDLE; + break; // loop around + default: + GPR_ASSERT(false); + return false; + } } - next_state_ = &ClientRpcContextGenericStreamingImpl::ReadDone; - stream_->Read(&response_, ClientRpcContext::tag(this)); - return true; } - bool ReadDone(bool ok, Histogram* hist) { - hist->Add((Timer::Now() - start_) * 1e9); - return StartWrite(ok); + ClientRpcContext* StartNewClone() GRPC_OVERRIDE { + return new ClientRpcContextGenericStreamingImpl(stub_, req_, next_issue_, + start_req_, callback_); } + + private: grpc::ClientContext context_; grpc::GenericStub* stub_; + CompletionQueue* cq_; + std::unique_ptr<Alarm> alarm_; ByteBuffer req_; ByteBuffer response_; - bool (ClientRpcContextGenericStreamingImpl::*next_state_)(bool, Histogram*); + enum State { + INVALID, + STREAM_IDLE, + WAIT, + READY_TO_WRITE, + WRITE_DONE, + READ_DONE + }; + State next_state_; std::function<void(grpc::Status, ByteBuffer*)> callback_; + std::function<gpr_timespec()> next_issue_; std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>( grpc::GenericStub*, grpc::ClientContext*, const grpc::string&, CompletionQueue*, void*)> start_req_; @@ -580,9 +509,6 @@ class GenericAsyncStreamingClient GRPC_FINAL public: explicit GenericAsyncStreamingClient(const ClientConfig& config) : AsyncClient(config, SetupCtx, GenericStubCreator) { - // async streaming currently only supports closed loop - GPR_ASSERT(closed_loop_); - StartThreads(num_async_threads_); } @@ -596,10 +522,11 @@ class GenericAsyncStreamingClient GRPC_FINAL auto stream = stub->Call(ctx, method_name, cq, tag); return stream; }; - static ClientRpcContext* SetupCtx(int channel_id, grpc::GenericStub* stub, + static ClientRpcContext* SetupCtx(grpc::GenericStub* stub, + std::function<gpr_timespec()> next_issue, const ByteBuffer& req) { return new ClientRpcContextGenericStreamingImpl( - channel_id, stub, req, GenericAsyncStreamingClient::StartReq, + stub, req, next_issue, GenericAsyncStreamingClient::StartReq, GenericAsyncStreamingClient::CheckDone); } }; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index d93537b279..edfc246a25 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -84,11 +84,8 @@ class SynchronousClient protected: void WaitToIssue(int thread_idx) { - grpc_time next_time; - if (NextIssueTime(thread_idx, &next_time)) { - gpr_timespec next_timespec; - TimepointHR2Timespec(next_time, &next_timespec); - gpr_sleep_until(next_timespec); + if (!closed_loop_) { + gpr_sleep_until(NextIssueTime(thread_idx)); } } diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 80f6ada409..1c7fdf8796 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -197,9 +197,7 @@ std::unique_ptr<ScenarioResult> RunScenario( workers.resize(num_clients + num_servers); gpr_timespec deadline = - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds( - warmup_seconds + benchmark_seconds + 20, GPR_TIMESPAN)); + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(warmup_seconds + benchmark_seconds + 20); // Start servers using runsc::ServerData; diff --git a/test/cpp/qps/interarrival.h b/test/cpp/qps/interarrival.h index 841619e3ff..b6fd67b77c 100644 --- a/test/cpp/qps/interarrival.h +++ b/test/cpp/qps/interarrival.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -51,15 +51,15 @@ namespace testing { // stacks. Thus, this code only uses a uniform distribution of doubles [0,1) // and then provides the distribution functions itself. -class RandomDist { +class RandomDistInterface { public: - RandomDist() {} - virtual ~RandomDist() = 0; - // Argument to operator() is a uniform double in the range [0,1) - virtual double operator()(double uni) const = 0; + RandomDistInterface() {} + virtual ~RandomDistInterface() = 0; + // Argument to transform is a uniform double in the range [0,1) + virtual double transform(double uni) const = 0; }; -inline RandomDist::~RandomDist() {} +inline RandomDistInterface::~RandomDistInterface() {} // ExpDist implements an exponential distribution, which is the // interarrival distribution for a Poisson process. The parameter @@ -69,11 +69,11 @@ inline RandomDist::~RandomDist() {} // independent identical stationary sources. For more information, // see http://en.wikipedia.org/wiki/Exponential_distribution -class ExpDist GRPC_FINAL : public RandomDist { +class ExpDist GRPC_FINAL : public RandomDistInterface { public: explicit ExpDist(double lambda) : lambda_recip_(1.0 / lambda) {} ~ExpDist() GRPC_OVERRIDE {} - double operator()(double uni) const GRPC_OVERRIDE { + double transform(double uni) const GRPC_OVERRIDE { // Note: Use 1.0-uni above to avoid NaN if uni is 0 return lambda_recip_ * (-log(1.0 - uni)); } @@ -87,11 +87,11 @@ class ExpDist GRPC_FINAL : public RandomDist { // mean interarrival time is (lo+hi)/2. For more information, // see http://en.wikipedia.org/wiki/Uniform_distribution_%28continuous%29 -class UniformDist GRPC_FINAL : public RandomDist { +class UniformDist GRPC_FINAL : public RandomDistInterface { public: UniformDist(double lo, double hi) : lo_(lo), range_(hi - lo) {} ~UniformDist() GRPC_OVERRIDE {} - double operator()(double uni) const GRPC_OVERRIDE { + double transform(double uni) const GRPC_OVERRIDE { return uni * range_ + lo_; } @@ -106,11 +106,11 @@ class UniformDist GRPC_FINAL : public RandomDist { // clients) will not preserve any deterministic interarrival gap across // requests. -class DetDist GRPC_FINAL : public RandomDist { +class DetDist GRPC_FINAL : public RandomDistInterface { public: explicit DetDist(double val) : val_(val) {} ~DetDist() GRPC_OVERRIDE {} - double operator()(double uni) const GRPC_OVERRIDE { return val_; } + double transform(double uni) const GRPC_OVERRIDE { return val_; } private: double val_; @@ -123,12 +123,12 @@ class DetDist GRPC_FINAL : public RandomDist { // good representation of the response times of data center jobs. See // http://en.wikipedia.org/wiki/Pareto_distribution -class ParetoDist GRPC_FINAL : public RandomDist { +class ParetoDist GRPC_FINAL : public RandomDistInterface { public: ParetoDist(double base, double alpha) : base_(base), alpha_recip_(1.0 / alpha) {} ~ParetoDist() GRPC_OVERRIDE {} - double operator()(double uni) const GRPC_OVERRIDE { + double transform(double uni) const GRPC_OVERRIDE { // Note: Use 1.0-uni above to avoid div by zero if uni is 0 return base_ / pow(1.0 - uni, alpha_recip_); } @@ -145,13 +145,14 @@ class ParetoDist GRPC_FINAL : public RandomDist { class InterarrivalTimer { public: InterarrivalTimer() {} - void init(const RandomDist& r, int threads, int entries = 1000000) { + void init(const RandomDistInterface& r, int threads, int entries = 1000000) { for (int i = 0; i < entries; i++) { // rand is the only choice that is portable across POSIX and Windows // and that supports new and old compilers - const double uniform_0_1 = rand() / RAND_MAX; + const double uniform_0_1 = + static_cast<double>(rand()) / static_cast<double>(RAND_MAX); random_table_.push_back( - std::chrono::nanoseconds(static_cast<int64_t>(1e9 * r(uniform_0_1)))); + static_cast<int64_t>(1e9 * r.transform(uniform_0_1))); } // Now set up the thread positions for (int i = 0; i < threads; i++) { @@ -160,7 +161,7 @@ class InterarrivalTimer { } virtual ~InterarrivalTimer(){}; - std::chrono::nanoseconds operator()(int thread_num) { + int64_t next(int thread_num) { auto ret = *(thread_posns_[thread_num]++); if (thread_posns_[thread_num] == random_table_.end()) thread_posns_[thread_num] = random_table_.begin(); @@ -168,7 +169,7 @@ class InterarrivalTimer { } private: - typedef std::vector<std::chrono::nanoseconds> time_table; + typedef std::vector<int64_t> time_table; std::vector<time_table::const_iterator> thread_posns_; time_table random_table_; }; diff --git a/test/cpp/qps/qps-sweep.sh b/test/cpp/qps/qps-sweep.sh index 539da1d893..7a35788849 100755 --- a/test/cpp/qps/qps-sweep.sh +++ b/test/cpp/qps/qps-sweep.sh @@ -37,9 +37,26 @@ fi bins=`find . .. ../.. ../../.. -name bins | head -1` +# Print out each command that gets executed set -x +# +# Specify parameters used in some of the tests +# + +# big is the size in bytes of large messages (0 is the size otherwise) big=65536 + +# wide is the number of client channels in multi-channel tests (1 otherwise) +wide=64 + +# deep is the number of RPCs outstanding on a channel in non-ping-pong tests +# (the value used is 1 otherwise) +deep=100 + +# half is half the count of worker processes, used in the crossbar scenario +# that uses equal clients and servers. The other scenarios use only 1 server +# and either 1 client or N-1 clients as appropriate half=`echo $QPS_WORKERS | awk -F, '{print int(NF/2)}'` for secure in true false; do @@ -52,30 +69,40 @@ for secure in true false; do # Scenario 2: generic async streaming "unconstrained" (QPS) "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ - --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=100 \ - --client_channels=64 --bbuf_req_size=0 --bbuf_resp_size=0 \ + --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ + --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ - --num_servers=1 --num_clients=0 + --num_servers=1 --num_clients=0 |& tee /tmp/qps-test.$$ # Scenario 2b: QPS with a single server core "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ - --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=100 \ - --client_channels=64 --bbuf_req_size=0 --bbuf_resp_size=0 \ + --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ + --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=0 --server_core_limit=1 # Scenario 2c: protobuf-based QPS "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ - --server_type=ASYNC_SERVER --outstanding_rpcs_per_channel=100 \ - --client_channels=64 --simple_req_size=0 --simple_resp_size=0 \ + --server_type=ASYNC_SERVER --outstanding_rpcs_per_channel=$deep \ + --client_channels=$wide --simple_req_size=0 --simple_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=0 - # Scenario 3: Latency at near-peak load (TBD) + # Scenario 3: Latency at sub-peak load (all clients equally loaded) + for loadfactor in 0.2 0.5 0.7; do + "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ + --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ + --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ + --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ + --num_servers=1 --num_clients=0 --poisson_load=`awk -v lf=$loadfactor \ + '$5 == "QPS:" {print int(lf * $6); exit}' /tmp/qps-test.$$` + done + + rm /tmp/qps-test.$$ # Scenario 4: Single-channel bidirectional throughput test (like TCP_STREAM). "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ - --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=100 \ + --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=1 --bbuf_req_size=$big --bbuf_resp_size=$big \ --async_client_threads=1 --async_server_threads=1 --secure_test=$secure \ --num_servers=1 --num_clients=1 @@ -108,35 +135,35 @@ for secure in true false; do # Scenario 9: Crossbar QPS test "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ - --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=100 \ - --client_channels=64 --bbuf_req_size=0 --bbuf_resp_size=0 \ + --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ + --client_channels=$wide --bbuf_req_size=0 --bbuf_resp_size=0 \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=$half --num_clients=0 # Scenario 10: Multi-channel bidir throughput test "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ - --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=100 \ - --client_channels=64 --bbuf_req_size=$big --bbuf_resp_size=$big \ + --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=1 \ + --client_channels=$wide --bbuf_req_size=$big --bbuf_resp_size=$big \ --async_client_threads=0 --async_server_threads=0 --secure_test=$secure \ --num_servers=1 --num_clients=1 # Scenario 11: Single-channel request throughput test "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ - --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=100 \ + --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=1 --bbuf_req_size=$big --bbuf_resp_size=0 \ --async_client_threads=1 --async_server_threads=1 --secure_test=$secure \ --num_servers=1 --num_clients=1 # Scenario 12: Single-channel response throughput test "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ - --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=100 \ + --server_type=ASYNC_GENERIC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=1 --bbuf_req_size=0 --bbuf_resp_size=$big \ --async_client_threads=1 --async_server_threads=1 --secure_test=$secure \ --num_servers=1 --num_clients=1 # Scenario 13: Single-channel bidirectional protobuf throughput test "$bins"/opt/qps_driver --rpc_type=STREAMING --client_type=ASYNC_CLIENT \ - --server_type=ASYNC_SERVER --outstanding_rpcs_per_channel=100 \ + --server_type=ASYNC_SERVER --outstanding_rpcs_per_channel=$deep \ --client_channels=1 --simple_req_size=$big --simple_resp_size=$big \ --async_client_threads=1 --async_server_threads=1 --secure_test=$secure \ --num_servers=1 --num_clients=1 diff --git a/test/cpp/qps/qps_interarrival_test.cc b/test/cpp/qps/qps_interarrival_test.cc index ccda28f09a..77e81fb84b 100644 --- a/test/cpp/qps/qps_interarrival_test.cc +++ b/test/cpp/qps/qps_interarrival_test.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -39,17 +39,17 @@ #include "test/cpp/qps/interarrival.h" -using grpc::testing::RandomDist; +using grpc::testing::RandomDistInterface; using grpc::testing::InterarrivalTimer; -static void RunTest(RandomDist &&r, int threads, std::string title) { +static void RunTest(RandomDistInterface &&r, int threads, std::string title) { InterarrivalTimer timer; timer.init(r, threads); gpr_histogram *h(gpr_histogram_create(0.01, 60e9)); for (int i = 0; i < 10000000; i++) { for (int j = 0; j < threads; j++) { - gpr_histogram_add(h, timer(j).count()); + gpr_histogram_add(h, timer.next(j)); } } @@ -70,7 +70,7 @@ using grpc::testing::ParetoDist; int main(int argc, char **argv) { RunTest(ExpDist(10.0), 5, std::string("Exponential(10)")); RunTest(DetDist(5.0), 5, std::string("Det(5)")); - RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(1,10)")); + RunTest(UniformDist(0.0, 10.0), 5, std::string("Uniform(0,10)")); RunTest(ParetoDist(1.0, 1.0), 5, std::string("Pareto(1,1)")); return 0; } diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc index fe5f685b6e..0ac41d9f96 100644 --- a/test/cpp/qps/qps_openloop_test.cc +++ b/test/cpp/qps/qps_openloop_test.cc @@ -53,7 +53,7 @@ static void RunQPS() { client_config.set_outstanding_rpcs_per_channel(1000); client_config.set_client_channels(8); client_config.set_async_client_threads(8); - client_config.set_rpc_type(UNARY); + client_config.set_rpc_type(STREAMING); client_config.mutable_load_params()->mutable_poisson()->set_offered_load( 1000.0); diff --git a/test/cpp/qps/qps_test.cc b/test/cpp/qps/qps_test.cc index 15054db892..27aaf137f6 100644 --- a/test/cpp/qps/qps_test.cc +++ b/test/cpp/qps/qps_test.cc @@ -53,7 +53,7 @@ static void RunQPS() { client_config.set_outstanding_rpcs_per_channel(1000); client_config.set_client_channels(8); client_config.set_async_client_threads(8); - client_config.set_rpc_type(UNARY); + client_config.set_rpc_type(STREAMING); client_config.mutable_load_params()->mutable_closed_loop(); ServerConfig server_config; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 1302d718f0..2024e0bfef 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -51,6 +51,7 @@ #include <gtest/gtest.h> #include "src/proto/grpc/testing/services.grpc.pb.h" +#include "test/core/util/test_config.h" #include "test/cpp/qps/server.h" namespace grpc { @@ -129,7 +130,7 @@ class AsyncQpsServerTest : public Server { } } ~AsyncQpsServerTest() { - auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(10); + auto deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); server_->Shutdown(deadline); for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { (*ss)->set_shutdown(); |