aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2016-08-15 10:27:03 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2016-08-15 10:27:03 -0700
commit26386e1fa0a3ff5f44d37279e73eb227213fe396 (patch)
treecc7eaadafd686159252ef961321575a006236063 /test
parent4272cac7aeb577a0f523f1e69f7dddacd79d3a66 (diff)
parentb6528c59a9a87ae5926ac9e4a11226a4ce490b36 (diff)
Merge remote-tracking branch 'upstream/master' into cli_main_fork
Diffstat (limited to 'test')
-rw-r--r--test/core/census/README7
-rw-r--r--test/core/census/data/resource_empty_name.pb1
-rw-r--r--test/core/census/data/resource_empty_name.txt5
-rw-r--r--test/core/census/data/resource_full.pb2
-rw-r--r--test/core/census/data/resource_full.txt9
-rw-r--r--test/core/census/data/resource_minimal_good.pb2
-rw-r--r--test/core/census/data/resource_minimal_good.txt5
-rw-r--r--test/core/census/data/resource_no_name.pb1
-rw-r--r--test/core/census/data/resource_no_name.txt4
-rw-r--r--test/core/census/data/resource_no_numerator.pb2
-rw-r--r--test/core/census/data/resource_no_numerator.txt6
-rw-r--r--test/core/census/data/resource_no_unit.pb2
-rw-r--r--test/core/census/data/resource_no_unit.txt2
-rw-r--r--test/core/census/resource_test.c157
-rw-r--r--test/core/channel/channel_stack_test.c15
-rw-r--r--test/core/end2end/cq_verifier.c9
-rw-r--r--test/core/end2end/cq_verifier.h3
-rw-r--r--test/core/end2end/end2end_nosec_tests.c16
-rw-r--r--test/core/end2end/end2end_tests.c16
-rw-r--r--test/core/end2end/fixtures/h2_load_reporting.c (renamed from test/core/end2end/fixtures/h2_loadreporting.c)95
-rw-r--r--test/core/end2end/fuzzers/hpack.dictionary10
-rwxr-xr-xtest/core/end2end/gen_build_yaml.py4
-rw-r--r--test/core/end2end/tests/filter_call_init_fails.c273
-rw-r--r--test/core/end2end/tests/filter_causes_close.c9
-rw-r--r--test/core/end2end/tests/load_reporting_hook.c321
-rw-r--r--test/core/iomgr/udp_server_test.c12
-rw-r--r--test/core/json/json_test.c2
-rw-r--r--test/core/nanopb/fuzzer_response.c6
-rw-r--r--test/core/transport/timeout_encoding_test.c (renamed from test/core/transport/chttp2/timeout_encoding_test.c)26
-rw-r--r--test/cpp/end2end/filter_end2end_test.cc353
-rw-r--r--test/cpp/end2end/server_builder_plugin_test.cc7
-rw-r--r--test/cpp/grpclb/grpclb_api_test.cc18
-rw-r--r--test/cpp/grpclb/grpclb_test.cc689
-rw-r--r--test/cpp/interop/interop_server.cc23
-rw-r--r--test/cpp/interop/interop_server_bootstrap.cc54
-rw-r--r--test/cpp/interop/server_helper.h6
-rw-r--r--test/cpp/qps/limit_cores.cc4
37 files changed, 2028 insertions, 148 deletions
diff --git a/test/core/census/README b/test/core/census/README
new file mode 100644
index 0000000000..d5363b7233
--- /dev/null
+++ b/test/core/census/README
@@ -0,0 +1,7 @@
+Test source and data files for Census.
+
+binary proto files (*.pb) in data directory are generated from the *.txt file,
+via:
+
+BASE="filename"
+cat $BASE.txt | protoc --encode=google.census.Resource census.proto > $BASE.pb
diff --git a/test/core/census/data/resource_empty_name.pb b/test/core/census/data/resource_empty_name.pb
new file mode 100644
index 0000000000..4d547445fa
--- /dev/null
+++ b/test/core/census/data/resource_empty_name.pb
@@ -0,0 +1 @@
+ \ No newline at end of file
diff --git a/test/core/census/data/resource_empty_name.txt b/test/core/census/data/resource_empty_name.txt
new file mode 100644
index 0000000000..271fd3274c
--- /dev/null
+++ b/test/core/census/data/resource_empty_name.txt
@@ -0,0 +1,5 @@
+# Name is present, but empty.
+name : ''
+unit {
+ numerator : SECS
+}
diff --git a/test/core/census/data/resource_full.pb b/test/core/census/data/resource_full.pb
new file mode 100644
index 0000000000..e4c6a2aef5
--- /dev/null
+++ b/test/core/census/data/resource_full.pb
@@ -0,0 +1,2 @@
+
+ full_resource"A resource with everything defined \ No newline at end of file
diff --git a/test/core/census/data/resource_full.txt b/test/core/census/data/resource_full.txt
new file mode 100644
index 0000000000..1aa2fafe3a
--- /dev/null
+++ b/test/core/census/data/resource_full.txt
@@ -0,0 +1,9 @@
+# A full resource definition - all fields filled out.
+name : 'full_resource'
+description : 'A resource with everything defined'
+unit {
+ # Megabits per second.
+ prefix : 6
+ numerator : BITS
+ denominator : SECS
+}
diff --git a/test/core/census/data/resource_minimal_good.pb b/test/core/census/data/resource_minimal_good.pb
new file mode 100644
index 0000000000..7100c462bf
--- /dev/null
+++ b/test/core/census/data/resource_minimal_good.pb
@@ -0,0 +1,2 @@
+
+ minimal_good \ No newline at end of file
diff --git a/test/core/census/data/resource_minimal_good.txt b/test/core/census/data/resource_minimal_good.txt
new file mode 100644
index 0000000000..a7a7e71dd6
--- /dev/null
+++ b/test/core/census/data/resource_minimal_good.txt
@@ -0,0 +1,5 @@
+# A minimal "good" Resource definition: has a name and numerator/unit.
+name : 'minimal_good'
+unit {
+ numerator : SECS
+}
diff --git a/test/core/census/data/resource_no_name.pb b/test/core/census/data/resource_no_name.pb
new file mode 100644
index 0000000000..4d547445fa
--- /dev/null
+++ b/test/core/census/data/resource_no_name.pb
@@ -0,0 +1 @@
+ \ No newline at end of file
diff --git a/test/core/census/data/resource_no_name.txt b/test/core/census/data/resource_no_name.txt
new file mode 100644
index 0000000000..8f12a91d35
--- /dev/null
+++ b/test/core/census/data/resource_no_name.txt
@@ -0,0 +1,4 @@
+# The minimal good Resource without a name.
+unit {
+ numerator : SECS
+}
diff --git a/test/core/census/data/resource_no_numerator.pb b/test/core/census/data/resource_no_numerator.pb
new file mode 100644
index 0000000000..2a5cceee70
--- /dev/null
+++ b/test/core/census/data/resource_no_numerator.pb
@@ -0,0 +1,2 @@
+
+resource_no_numeratorýÿÿÿÿÿÿÿÿ \ No newline at end of file
diff --git a/test/core/census/data/resource_no_numerator.txt b/test/core/census/data/resource_no_numerator.txt
new file mode 100644
index 0000000000..fc1fec74a2
--- /dev/null
+++ b/test/core/census/data/resource_no_numerator.txt
@@ -0,0 +1,6 @@
+# Resource without a numerator
+name : 'resource_no_numerator'
+unit {
+ prefix : -3
+ denominator : SECS
+}
diff --git a/test/core/census/data/resource_no_unit.pb b/test/core/census/data/resource_no_unit.pb
new file mode 100644
index 0000000000..9dca2620e0
--- /dev/null
+++ b/test/core/census/data/resource_no_unit.pb
@@ -0,0 +1,2 @@
+
+resource_no_unit \ No newline at end of file
diff --git a/test/core/census/data/resource_no_unit.txt b/test/core/census/data/resource_no_unit.txt
new file mode 100644
index 0000000000..c5d5115ceb
--- /dev/null
+++ b/test/core/census/data/resource_no_unit.txt
@@ -0,0 +1,2 @@
+# The minimal good resource without a unit
+name : 'resource_no_unit'
diff --git a/test/core/census/resource_test.c b/test/core/census/resource_test.c
new file mode 100644
index 0000000000..e1556d7d7b
--- /dev/null
+++ b/test/core/census/resource_test.c
@@ -0,0 +1,157 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/ext/census/resource.h"
+#include <grpc/census.h>
+#include <grpc/support/log.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/useful.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include "src/core/ext/census/base_resources.h"
+#include "test/core/util/test_config.h"
+
+// Test all the functionality for dealing with Resources.
+
+// Just startup and shutdown resources subsystem.
+static void test_enable_disable() {
+ initialize_resources();
+ shutdown_resources();
+}
+
+// A blank/empty initialization should not work.
+static void test_empty_definition() {
+ initialize_resources();
+ int32_t rid = census_define_resource(NULL, 0);
+ GPR_ASSERT(rid == -1);
+ uint8_t buffer[50] = {0};
+ rid = census_define_resource(buffer, 50);
+ GPR_ASSERT(rid == -1);
+ shutdown_resources();
+}
+
+// Given a file name, read raw proto and define the resource included within.
+// Returns resource id from census_define_resource().
+static int32_t define_resource_from_file(const char *file) {
+#define BUF_SIZE 512
+ uint8_t buffer[BUF_SIZE];
+ FILE *input = fopen(file, "rb");
+ GPR_ASSERT(input != NULL);
+ size_t nbytes = fread(buffer, 1, BUF_SIZE, input);
+ GPR_ASSERT(nbytes != 0 && nbytes < BUF_SIZE && feof(input) && !ferror(input));
+ int32_t rid = census_define_resource(buffer, nbytes);
+ GPR_ASSERT(fclose(input) == 0);
+ return rid;
+}
+
+// Test definition of a single resource, using a proto read from a file. The
+// `succeed` parameter indicates whether we expect the definition to succeed or
+// fail. `name` is used to check that the returned resource can be looked up by
+// name.
+static void test_define_single_resource(const char *file, const char *name,
+ bool succeed) {
+ gpr_log(GPR_INFO, "Test defining resource \"%s\"\n", name);
+ initialize_resources();
+ int32_t rid = define_resource_from_file(file);
+ if (succeed) {
+ GPR_ASSERT(rid >= 0);
+ int32_t rid2 = census_resource_id(name);
+ GPR_ASSERT(rid == rid2);
+ } else {
+ GPR_ASSERT(rid < 0);
+ }
+ shutdown_resources();
+}
+
+// Try deleting various resources (both those that exist and those that don't).
+static void test_delete_resource() {
+ initialize_resources();
+ // Try deleting resource before any are defined.
+ census_delete_resource(0);
+ // Create and check a couple of resources.
+ int32_t rid1 = define_resource_from_file(
+ "test/core/census/data/resource_minimal_good.pb");
+ int32_t rid2 =
+ define_resource_from_file("test/core/census/data/resource_full.pb");
+ GPR_ASSERT(rid1 >= 0 && rid2 >= 0 && rid1 != rid2);
+ int32_t rid3 = census_resource_id("minimal_good");
+ int32_t rid4 = census_resource_id("full_resource");
+ GPR_ASSERT(rid1 == rid3 && rid2 == rid4);
+ // Try deleting non-existant resources.
+ census_delete_resource(-1);
+ census_delete_resource(rid1 + rid2 + 1);
+ census_delete_resource(10000000);
+ // Delete one of the previously defined resources and check for deletion.
+ census_delete_resource(rid1);
+ rid3 = census_resource_id("minimal_good");
+ GPR_ASSERT(rid3 < 0);
+ // Check that re-adding works.
+ rid1 = define_resource_from_file(
+ "test/core/census/data/resource_minimal_good.pb");
+ GPR_ASSERT(rid1 >= 0);
+ rid3 = census_resource_id("minimal_good");
+ GPR_ASSERT(rid1 == rid3);
+ shutdown_resources();
+}
+
+// Test define base resources.
+static void test_base_resources() {
+ initialize_resources();
+ define_base_resources();
+ int32_t rid1 = census_resource_id("client_rpc_latency");
+ int32_t rid2 = census_resource_id("server_rpc_latency");
+ GPR_ASSERT(rid1 >= 0 && rid2 >= 0 && rid1 != rid2);
+ shutdown_resources();
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ test_enable_disable();
+ test_empty_definition();
+ test_define_single_resource("test/core/census/data/resource_minimal_good.pb",
+ "minimal_good", true);
+ test_define_single_resource("test/core/census/data/resource_full.pb",
+ "full_resource", true);
+ test_define_single_resource("test/core/census/data/resource_no_name.pb",
+ "resource_no_name", false);
+ test_define_single_resource("test/core/census/data/resource_no_numerator.pb",
+ "resource_no_numerator", false);
+ test_define_single_resource("test/core/census/data/resource_no_unit.pb",
+ "resource_no_unit", false);
+ test_define_single_resource("test/core/census/data/resource_empty_name.pb",
+ "resource_empty_name", false);
+ test_delete_resource();
+ test_base_resources();
+ return 0;
+}
diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c
index f9561bed70..569b3f7cd2 100644
--- a/test/core/channel/channel_stack_test.c
+++ b/test/core/channel/channel_stack_test.c
@@ -53,17 +53,20 @@ static void channel_init_func(grpc_exec_ctx *exec_ctx,
*(int *)(elem->channel_data) = 0;
}
-static void call_init_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *call_init_func(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
++*(int *)(elem->channel_data);
*(int *)(elem->call_data) = 0;
+ return GRPC_ERROR_NONE;
}
static void channel_destroy_func(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {
+ const grpc_call_final_info *final_info,
+ void *ignored) {
++*(int *)(elem->channel_data);
}
@@ -132,8 +135,10 @@ static void test_create_channel_stack(void) {
GPR_ASSERT(*channel_data == 0);
call_stack = gpr_malloc(channel_stack->call_stack_size);
- grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack, NULL,
- NULL, call_stack);
+ grpc_error *error =
+ grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack,
+ NULL, NULL, call_stack);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(call_stack->count == 1);
call_elem = grpc_call_stack_element(call_stack, 0);
GPR_ASSERT(call_elem->filter == channel_elem->filter);
diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c
index 890309c44a..b77568058c 100644
--- a/test/core/end2end/cq_verifier.c
+++ b/test/core/end2end/cq_verifier.c
@@ -259,9 +259,10 @@ void cq_verify(cq_verifier *v) {
gpr_strvec_destroy(&have_tags);
}
-void cq_verify_empty(cq_verifier *v) {
- gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(1, GPR_TIMESPAN));
+void cq_verify_empty_timeout(cq_verifier *v, int timeout_sec) {
+ gpr_timespec deadline =
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(timeout_sec, GPR_TIMESPAN));
grpc_event ev;
GPR_ASSERT(v->expect.next == &v->expect && "expectation queue must be empty");
@@ -275,6 +276,8 @@ void cq_verify_empty(cq_verifier *v) {
}
}
+void cq_verify_empty(cq_verifier *v) { cq_verify_empty_timeout(v, 1); }
+
static expectation *add(cq_verifier *v, grpc_completion_type type, void *tag) {
expectation *e = gpr_malloc(sizeof(expectation));
e->type = type;
diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h
index 8c9a85c218..bf82468c9a 100644
--- a/test/core/end2end/cq_verifier.h
+++ b/test/core/end2end/cq_verifier.h
@@ -55,6 +55,9 @@ void cq_verify(cq_verifier *v);
/* ensure that the completion queue is empty */
void cq_verify_empty(cq_verifier *v);
+/* ensure that the completion queue is empty, waiting up to \a timeout secs. */
+void cq_verify_empty_timeout(cq_verifier *v, int timeout_sec);
+
/* Various expectation matchers
Any functions taking ... expect a NULL terminated list of key/value pairs
(each pair using two parameter slots) of metadata that MUST be present in
diff --git a/test/core/end2end/end2end_nosec_tests.c b/test/core/end2end/end2end_nosec_tests.c
index 03e55f1181..3efd18cf2e 100644
--- a/test/core/end2end/end2end_nosec_tests.c
+++ b/test/core/end2end/end2end_nosec_tests.c
@@ -69,6 +69,8 @@ extern void disappearing_server(grpc_end2end_test_config config);
extern void disappearing_server_pre_init(void);
extern void empty_batch(grpc_end2end_test_config config);
extern void empty_batch_pre_init(void);
+extern void filter_call_init_fails(grpc_end2end_test_config config);
+extern void filter_call_init_fails_pre_init(void);
extern void filter_causes_close(grpc_end2end_test_config config);
extern void filter_causes_close_pre_init(void);
extern void graceful_server_shutdown(grpc_end2end_test_config config);
@@ -83,6 +85,8 @@ extern void invoke_large_request(grpc_end2end_test_config config);
extern void invoke_large_request_pre_init(void);
extern void large_metadata(grpc_end2end_test_config config);
extern void large_metadata_pre_init(void);
+extern void load_reporting_hook(grpc_end2end_test_config config);
+extern void load_reporting_hook_pre_init(void);
extern void max_concurrent_streams(grpc_end2end_test_config config);
extern void max_concurrent_streams_pre_init(void);
extern void max_message_length(grpc_end2end_test_config config);
@@ -138,6 +142,7 @@ void grpc_end2end_tests_pre_init(void) {
default_host_pre_init();
disappearing_server_pre_init();
empty_batch_pre_init();
+ filter_call_init_fails_pre_init();
filter_causes_close_pre_init();
graceful_server_shutdown_pre_init();
high_initial_seqno_pre_init();
@@ -145,6 +150,7 @@ void grpc_end2end_tests_pre_init(void) {
idempotent_request_pre_init();
invoke_large_request_pre_init();
large_metadata_pre_init();
+ load_reporting_hook_pre_init();
max_concurrent_streams_pre_init();
max_message_length_pre_init();
negative_deadline_pre_init();
@@ -186,6 +192,7 @@ void grpc_end2end_tests(int argc, char **argv,
default_host(config);
disappearing_server(config);
empty_batch(config);
+ filter_call_init_fails(config);
filter_causes_close(config);
graceful_server_shutdown(config);
high_initial_seqno(config);
@@ -193,6 +200,7 @@ void grpc_end2end_tests(int argc, char **argv,
idempotent_request(config);
invoke_large_request(config);
large_metadata(config);
+ load_reporting_hook(config);
max_concurrent_streams(config);
max_message_length(config);
negative_deadline(config);
@@ -268,6 +276,10 @@ void grpc_end2end_tests(int argc, char **argv,
empty_batch(config);
continue;
}
+ if (0 == strcmp("filter_call_init_fails", argv[i])) {
+ filter_call_init_fails(config);
+ continue;
+ }
if (0 == strcmp("filter_causes_close", argv[i])) {
filter_causes_close(config);
continue;
@@ -296,6 +308,10 @@ void grpc_end2end_tests(int argc, char **argv,
large_metadata(config);
continue;
}
+ if (0 == strcmp("load_reporting_hook", argv[i])) {
+ load_reporting_hook(config);
+ continue;
+ }
if (0 == strcmp("max_concurrent_streams", argv[i])) {
max_concurrent_streams(config);
continue;
diff --git a/test/core/end2end/end2end_tests.c b/test/core/end2end/end2end_tests.c
index 877b1b1989..e3d791abc1 100644
--- a/test/core/end2end/end2end_tests.c
+++ b/test/core/end2end/end2end_tests.c
@@ -71,6 +71,8 @@ extern void disappearing_server(grpc_end2end_test_config config);
extern void disappearing_server_pre_init(void);
extern void empty_batch(grpc_end2end_test_config config);
extern void empty_batch_pre_init(void);
+extern void filter_call_init_fails(grpc_end2end_test_config config);
+extern void filter_call_init_fails_pre_init(void);
extern void filter_causes_close(grpc_end2end_test_config config);
extern void filter_causes_close_pre_init(void);
extern void graceful_server_shutdown(grpc_end2end_test_config config);
@@ -85,6 +87,8 @@ extern void invoke_large_request(grpc_end2end_test_config config);
extern void invoke_large_request_pre_init(void);
extern void large_metadata(grpc_end2end_test_config config);
extern void large_metadata_pre_init(void);
+extern void load_reporting_hook(grpc_end2end_test_config config);
+extern void load_reporting_hook_pre_init(void);
extern void max_concurrent_streams(grpc_end2end_test_config config);
extern void max_concurrent_streams_pre_init(void);
extern void max_message_length(grpc_end2end_test_config config);
@@ -141,6 +145,7 @@ void grpc_end2end_tests_pre_init(void) {
default_host_pre_init();
disappearing_server_pre_init();
empty_batch_pre_init();
+ filter_call_init_fails_pre_init();
filter_causes_close_pre_init();
graceful_server_shutdown_pre_init();
high_initial_seqno_pre_init();
@@ -148,6 +153,7 @@ void grpc_end2end_tests_pre_init(void) {
idempotent_request_pre_init();
invoke_large_request_pre_init();
large_metadata_pre_init();
+ load_reporting_hook_pre_init();
max_concurrent_streams_pre_init();
max_message_length_pre_init();
negative_deadline_pre_init();
@@ -190,6 +196,7 @@ void grpc_end2end_tests(int argc, char **argv,
default_host(config);
disappearing_server(config);
empty_batch(config);
+ filter_call_init_fails(config);
filter_causes_close(config);
graceful_server_shutdown(config);
high_initial_seqno(config);
@@ -197,6 +204,7 @@ void grpc_end2end_tests(int argc, char **argv,
idempotent_request(config);
invoke_large_request(config);
large_metadata(config);
+ load_reporting_hook(config);
max_concurrent_streams(config);
max_message_length(config);
negative_deadline(config);
@@ -276,6 +284,10 @@ void grpc_end2end_tests(int argc, char **argv,
empty_batch(config);
continue;
}
+ if (0 == strcmp("filter_call_init_fails", argv[i])) {
+ filter_call_init_fails(config);
+ continue;
+ }
if (0 == strcmp("filter_causes_close", argv[i])) {
filter_causes_close(config);
continue;
@@ -304,6 +316,10 @@ void grpc_end2end_tests(int argc, char **argv,
large_metadata(config);
continue;
}
+ if (0 == strcmp("load_reporting_hook", argv[i])) {
+ load_reporting_hook(config);
+ continue;
+ }
if (0 == strcmp("max_concurrent_streams", argv[i])) {
max_concurrent_streams(config);
continue;
diff --git a/test/core/end2end/fixtures/h2_loadreporting.c b/test/core/end2end/fixtures/h2_load_reporting.c
index 4ed02f9728..f6d3923db9 100644
--- a/test/core/end2end/fixtures/h2_loadreporting.c
+++ b/test/core/end2end/fixtures/h2_load_reporting.c
@@ -52,18 +52,16 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
-static grpc_load_reporting_config *g_client_lrc;
-static grpc_load_reporting_config *g_server_lrc;
-
-typedef struct fullstack_fixture_data {
+typedef struct load_reporting_fixture_data {
char *localaddr;
-} fullstack_fixture_data;
+} load_reporting_fixture_data;
-static grpc_end2end_test_fixture chttp2_create_fixture_fullstack(
+static grpc_end2end_test_fixture chttp2_create_fixture_load_reporting(
grpc_channel_args *client_args, grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
int port = grpc_pick_unused_port_or_die();
- fullstack_fixture_data *ffd = gpr_malloc(sizeof(fullstack_fixture_data));
+ load_reporting_fixture_data *ffd =
+ gpr_malloc(sizeof(load_reporting_fixture_data));
memset(&f, 0, sizeof(f));
gpr_join_host_port(&ffd->localaddr, "localhost", port);
@@ -74,47 +72,20 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack(
return f;
}
-typedef struct {
- int64_t total_bytes;
- bool fully_processed;
- uint32_t initial_token;
- uint32_t final_token;
-} aggregated_bw_stats;
-
-static void sample_fn(const grpc_load_reporting_call_data *call_data,
- void *user_data) {
- GPR_ASSERT(user_data != NULL);
- aggregated_bw_stats *custom_stats = (aggregated_bw_stats *)user_data;
- if (call_data == NULL) {
- /* initial invocation */
- custom_stats->initial_token = 0xDEADBEEF;
- } else {
- /* final invocation */
- custom_stats->total_bytes =
- (int64_t)(call_data->stats->transport_stream_stats.outgoing.data_bytes +
- call_data->stats->transport_stream_stats.incoming.data_bytes);
- custom_stats->final_token = 0xCAFED00D;
- custom_stats->fully_processed = true;
- }
-}
-
-void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f,
- grpc_channel_args *client_args) {
- fullstack_fixture_data *ffd = f->fixture_data;
- grpc_arg arg = grpc_load_reporting_config_create_arg(g_client_lrc);
- client_args = grpc_channel_args_copy_and_add(client_args, &arg, 1);
+void chttp2_init_client_load_reporting(grpc_end2end_test_fixture *f,
+ grpc_channel_args *client_args) {
+ load_reporting_fixture_data *ffd = f->fixture_data;
f->client = grpc_insecure_channel_create(ffd->localaddr, client_args, NULL);
- grpc_channel_args_destroy(client_args);
GPR_ASSERT(f->client);
}
-void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
- grpc_channel_args *server_args) {
- fullstack_fixture_data *ffd = f->fixture_data;
+void chttp2_init_server_load_reporting(grpc_end2end_test_fixture *f,
+ grpc_channel_args *server_args) {
+ load_reporting_fixture_data *ffd = f->fixture_data;
+ grpc_arg arg = grpc_load_reporting_enable_arg();
if (f->server) {
grpc_server_destroy(f->server);
}
- grpc_arg arg = grpc_load_reporting_config_create_arg(g_server_lrc);
server_args = grpc_channel_args_copy_and_add(server_args, &arg, 1);
f->server = grpc_server_create(server_args, NULL);
grpc_channel_args_destroy(server_args);
@@ -123,36 +94,23 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
grpc_server_start(f->server);
}
-void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) {
- fullstack_fixture_data *ffd = f->fixture_data;
+void chttp2_tear_down_load_reporting(grpc_end2end_test_fixture *f) {
+ load_reporting_fixture_data *ffd = f->fixture_data;
gpr_free(ffd->localaddr);
gpr_free(ffd);
}
/* All test configurations */
static grpc_end2end_test_config configs[] = {
- {"chttp2/fullstack+loadreporting", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION,
- chttp2_create_fixture_fullstack, chttp2_init_client_fullstack,
- chttp2_init_server_fullstack, chttp2_tear_down_fullstack},
+ {"chttp2/fullstack+load_reporting",
+ FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION,
+ chttp2_create_fixture_load_reporting, chttp2_init_client_load_reporting,
+ chttp2_init_server_load_reporting, chttp2_tear_down_load_reporting},
};
int main(int argc, char **argv) {
size_t i;
- aggregated_bw_stats *aggr_stats_client =
- gpr_malloc(sizeof(aggregated_bw_stats));
- aggr_stats_client->total_bytes = -1;
- aggr_stats_client->fully_processed = false;
- aggregated_bw_stats *aggr_stats_server =
- gpr_malloc(sizeof(aggregated_bw_stats));
- aggr_stats_server->total_bytes = -1;
- aggr_stats_server->fully_processed = false;
-
- g_client_lrc =
- grpc_load_reporting_config_create(sample_fn, aggr_stats_client);
- g_server_lrc =
- grpc_load_reporting_config_create(sample_fn, aggr_stats_server);
-
grpc_test_init(argc, argv);
grpc_end2end_tests_pre_init();
grpc_init();
@@ -163,22 +121,5 @@ int main(int argc, char **argv) {
grpc_shutdown();
- grpc_load_reporting_config_destroy(g_client_lrc);
- grpc_load_reporting_config_destroy(g_server_lrc);
-
- if (aggr_stats_client->fully_processed) {
- GPR_ASSERT(aggr_stats_client->total_bytes >= 0);
- GPR_ASSERT(aggr_stats_client->initial_token == 0xDEADBEEF);
- GPR_ASSERT(aggr_stats_client->final_token == 0xCAFED00D);
- }
- if (aggr_stats_server->fully_processed) {
- GPR_ASSERT(aggr_stats_server->total_bytes >= 0);
- GPR_ASSERT(aggr_stats_server->initial_token == 0xDEADBEEF);
- GPR_ASSERT(aggr_stats_server->final_token == 0xCAFED00D);
- }
-
- gpr_free(aggr_stats_client);
- gpr_free(aggr_stats_server);
-
return 0;
}
diff --git a/test/core/end2end/fuzzers/hpack.dictionary b/test/core/end2end/fuzzers/hpack.dictionary
index 097e9a8922..3157dfca3f 100644
--- a/test/core/end2end/fuzzers/hpack.dictionary
+++ b/test/core/end2end/fuzzers/hpack.dictionary
@@ -21,8 +21,6 @@
"\x0A:authority"
"\x0Dauthorization"
"\x0Dcache-control"
-"\x0Acensus-bin"
-"\x11census-binary-bin"
"\x13content-disposition"
"\x10content-encoding"
"\x10content-language"
@@ -42,11 +40,13 @@
"\x03GET"
"\x04grpc"
"\x14grpc-accept-encoding"
+"\x0Fgrpc-census-bin"
"\x0Dgrpc-encoding"
"\x1Egrpc-internal-encoding-request"
"\x0Cgrpc-message"
"\x0Bgrpc-status"
"\x0Cgrpc-timeout"
+"\x10grpc-tracing-bin"
"\x04gzip"
"\x0Dgzip, deflate"
"\x04host"
@@ -63,7 +63,8 @@
"\x13if-unmodified-since"
"\x0Dlast-modified"
"\x04link"
-"\x0Eload-reporting"
+"\x16load-reporting-initial"
+"\x17load-reporting-trailing"
"\x08location"
"\x0Cmax-forwards"
"\x07:method"
@@ -137,7 +138,8 @@
"\x00\x13if-unmodified-since\x00"
"\x00\x0Dlast-modified\x00"
"\x00\x04link\x00"
-"\x00\x0Eload-reporting\x00"
+"\x00\x16load-reporting-initial\x00"
+"\x00\x17load-reporting-trailing\x00"
"\x00\x08location\x00"
"\x00\x0Cmax-forwards\x00"
"\x00\x07:method\x03GET"
diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py
index fb7275474d..e59b7dc9fb 100755
--- a/test/core/end2end/gen_build_yaml.py
+++ b/test/core/end2end/gen_build_yaml.py
@@ -53,13 +53,13 @@ fd_unsecure_fixture_options = default_unsecure_fixture_options._replace(
END2END_FIXTURES = {
'h2_compress': default_unsecure_fixture_options,
'h2_census': default_unsecure_fixture_options,
+ 'h2_load_reporting': default_unsecure_fixture_options,
'h2_fakesec': default_secure_fixture_options._replace(ci_mac=False),
'h2_fd': fd_unsecure_fixture_options,
'h2_full': default_unsecure_fixture_options,
'h2_full+pipe': default_unsecure_fixture_options._replace(
platforms=['linux']),
'h2_full+trace': default_unsecure_fixture_options._replace(tracing=True),
- 'h2_loadreporting': default_unsecure_fixture_options,
'h2_oauth2': default_secure_fixture_options._replace(ci_mac=False),
'h2_proxy': default_unsecure_fixture_options._replace(includes_proxy=True,
ci_mac=False),
@@ -102,6 +102,7 @@ END2END_TESTS = {
'disappearing_server': connectivity_test_options,
'empty_batch': default_test_options,
'filter_causes_close': default_test_options,
+ 'filter_call_init_fails': default_test_options,
'graceful_server_shutdown': default_test_options._replace(cpu_cost=LOWCPU),
'hpack_size': default_test_options._replace(proxyable=False,
traceable=False),
@@ -115,6 +116,7 @@ END2END_TESTS = {
'network_status_change': default_test_options,
'no_op': default_test_options,
'payload': default_test_options,
+ 'load_reporting_hook': default_test_options,
'ping_pong_streaming': default_test_options,
'ping': connectivity_test_options._replace(proxyable=False),
'registered_call': default_test_options,
diff --git a/test/core/end2end/tests/filter_call_init_fails.c b/test/core/end2end/tests/filter_call_init_fails.c
new file mode 100644
index 0000000000..a09183b786
--- /dev/null
+++ b/test/core/end2end/tests/filter_call_init_fails.c
@@ -0,0 +1,273 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/core/end2end/end2end_tests.h"
+
+#include <limits.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/byte_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "src/core/lib/channel/channel_stack_builder.h"
+#include "src/core/lib/surface/channel_init.h"
+#include "test/core/end2end/cq_verifier.h"
+
+enum { TIMEOUT = 200000 };
+
+static bool g_enable_filter = false;
+
+static void *tag(intptr_t t) { return (void *)t; }
+
+static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
+ const char *test_name,
+ grpc_channel_args *client_args,
+ grpc_channel_args *server_args) {
+ grpc_end2end_test_fixture f;
+ gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
+ f = config.create_fixture(client_args, server_args);
+ config.init_server(&f, server_args);
+ config.init_client(&f, client_args);
+ return f;
+}
+
+static gpr_timespec n_seconds_time(int n) {
+ return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
+}
+
+static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event ev;
+ do {
+ ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL);
+ } while (ev.type != GRPC_QUEUE_SHUTDOWN);
+}
+
+static void shutdown_server(grpc_end2end_test_fixture *f) {
+ if (!f->server) return;
+ grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(
+ f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
+ .type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(f->server);
+ f->server = NULL;
+}
+
+static void shutdown_client(grpc_end2end_test_fixture *f) {
+ if (!f->client) return;
+ grpc_channel_destroy(f->client);
+ f->client = NULL;
+}
+
+static void end_test(grpc_end2end_test_fixture *f) {
+ shutdown_server(f);
+ shutdown_client(f);
+
+ grpc_completion_queue_shutdown(f->cq);
+ drain_cq(f->cq);
+ grpc_completion_queue_destroy(f->cq);
+}
+
+// Simple request via a server filter that always fails to initialize
+// the call.
+static void test_request(grpc_end2end_test_config config) {
+ grpc_call *c;
+ grpc_call *s;
+ gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
+ grpc_byte_buffer *request_payload =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ gpr_timespec deadline = five_seconds_time();
+ grpc_end2end_test_fixture f =
+ begin_test(config, "filter_call_init_fails", NULL, NULL);
+ cq_verifier *cqv = cq_verifier_create(f.cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_metadata_array request_metadata_recv;
+ grpc_byte_buffer *request_payload_recv = NULL;
+ grpc_call_details call_details;
+ grpc_status_code status;
+ grpc_call_error error;
+ char *details = NULL;
+ size_t details_capacity = 0;
+
+ c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ "/foo", "foo.test.google.fr", deadline, NULL);
+ GPR_ASSERT(c);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->data.send_initial_metadata.metadata = NULL;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = request_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ error =
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.cq, f.cq, tag(101));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(1), 1);
+ cq_verify(cqv);
+
+ GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
+ GPR_ASSERT(0 == strcmp(details, "access denied"));
+
+ gpr_free(details);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+
+ grpc_call_destroy(c);
+
+ cq_verifier_destroy(cqv);
+
+ grpc_byte_buffer_destroy(request_payload);
+ grpc_byte_buffer_destroy(request_payload_recv);
+
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+/*******************************************************************************
+ * Test filter - always fails to initialize a call
+ */
+
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ return grpc_error_set_int(GRPC_ERROR_CREATE("access denied"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_PERMISSION_DENIED);
+}
+
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ const grpc_call_final_info *final_info,
+ void *and_free_memory) {}
+
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {}
+
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {}
+
+static const grpc_channel_filter test_filter = {
+ grpc_call_next_op,
+ grpc_channel_next_op,
+ 0,
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ 0,
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "filter_call_init_fails"};
+
+/*******************************************************************************
+ * Registration
+ */
+
+static bool maybe_add_filter(grpc_channel_stack_builder *builder, void *arg) {
+ if (g_enable_filter) {
+ // Want to add the filter as close to the end as possible, to make
+ // sure that all of the filters work well together. However, we
+ // can't add it at the very end, because the connected channel filter
+ // must be the last one. So we add it right before the last one.
+ grpc_channel_stack_builder_iterator *it =
+ grpc_channel_stack_builder_create_iterator_at_last(builder);
+ GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
+ const bool retval = grpc_channel_stack_builder_add_filter_before(
+ it, &test_filter, NULL, NULL);
+ grpc_channel_stack_builder_iterator_destroy(it);
+ return retval;
+ } else {
+ return true;
+ }
+}
+
+static void init_plugin(void) {
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ maybe_add_filter, NULL);
+}
+
+static void destroy_plugin(void) {}
+
+void filter_call_init_fails(grpc_end2end_test_config config) {
+ g_enable_filter = true;
+ test_request(config);
+ g_enable_filter = false;
+}
+
+void filter_call_init_fails_pre_init(void) {
+ grpc_register_plugin(init_plugin, destroy_plugin);
+}
diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c
index 526c05ca3e..c6c36d668b 100644
--- a/test/core/end2end/tests/filter_causes_close.c
+++ b/test/core/end2end/tests/filter_causes_close.c
@@ -233,11 +233,14 @@ static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, op);
}
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {}
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ return GRPC_ERROR_NONE;
+}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory) {}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/test/core/end2end/tests/load_reporting_hook.c b/test/core/end2end/tests/load_reporting_hook.c
new file mode 100644
index 0000000000..2c6519881a
--- /dev/null
+++ b/test/core/end2end/tests/load_reporting_hook.c
@@ -0,0 +1,321 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/core/end2end/end2end_tests.h"
+
+#include <string.h>
+
+#include <grpc/byte_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "test/core/end2end/cq_verifier.h"
+
+#include "src/core/ext/load_reporting/load_reporting.h"
+#include "src/core/ext/load_reporting/load_reporting_filter.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+enum { TIMEOUT = 200000 };
+
+static void *tag(intptr_t t) { return (void *)t; }
+
+typedef struct {
+ gpr_mu mu;
+ intptr_t channel_id;
+ intptr_t call_id;
+
+ char *initial_md_str;
+ char *trailing_md_str;
+ char *method_name;
+
+ uint64_t incoming_bytes;
+ uint64_t outgoing_bytes;
+
+ grpc_status_code call_final_status;
+
+ bool fully_processed;
+} load_reporting_data;
+
+static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
+ const char *test_name,
+ grpc_channel_args *client_args,
+ grpc_channel_args *server_args) {
+ grpc_end2end_test_fixture f;
+ gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
+
+ f = config.create_fixture(client_args, server_args);
+ config.init_server(&f, server_args);
+ config.init_client(&f, client_args);
+
+ return f;
+}
+
+static gpr_timespec n_seconds_time(int n) {
+ return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
+}
+
+static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event ev;
+ do {
+ ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL);
+ } while (ev.type != GRPC_QUEUE_SHUTDOWN);
+}
+
+static void shutdown_server(grpc_end2end_test_fixture *f) {
+ if (!f->server) return;
+ grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(
+ f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
+ .type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(f->server);
+ f->server = NULL;
+}
+
+static void shutdown_client(grpc_end2end_test_fixture *f) {
+ if (!f->client) return;
+ grpc_channel_destroy(f->client);
+ f->client = NULL;
+}
+
+static void end_test(grpc_end2end_test_fixture *f) {
+ shutdown_server(f);
+ shutdown_client(f);
+
+ grpc_completion_queue_shutdown(f->cq);
+ drain_cq(f->cq);
+ grpc_completion_queue_destroy(f->cq);
+}
+
+static void request_response_with_payload(grpc_end2end_test_fixture f,
+ const char *method_name,
+ const char *request_msg,
+ const char *response_msg,
+ grpc_metadata *initial_lr_metadata,
+ grpc_metadata *trailing_lr_metadata) {
+ gpr_slice request_payload_slice = gpr_slice_from_static_string(request_msg);
+ gpr_slice response_payload_slice = gpr_slice_from_static_string(response_msg);
+ grpc_call *c;
+ grpc_call *s;
+ grpc_byte_buffer *request_payload =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ grpc_byte_buffer *response_payload =
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+ gpr_timespec deadline = five_seconds_time();
+ cq_verifier *cqv = cq_verifier_create(f.cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_metadata_array request_metadata_recv;
+ grpc_byte_buffer *request_payload_recv = NULL;
+ grpc_byte_buffer *response_payload_recv = NULL;
+ grpc_call_details call_details;
+ grpc_status_code status;
+ grpc_call_error error;
+ char *details = NULL;
+ size_t details_capacity = 0;
+ int was_cancelled = 2;
+
+ c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ method_name, "foo.test.google.fr", deadline,
+ NULL);
+ GPR_ASSERT(c);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ GPR_ASSERT(initial_lr_metadata != NULL);
+ op->data.send_initial_metadata.count = 1;
+ op->data.send_initial_metadata.metadata = initial_lr_metadata;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = request_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &response_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ error =
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.cq, f.cq, tag(101));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ cq_expect_completion(cqv, tag(101), 1);
+ cq_verify(cqv);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &request_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(102), 1);
+ cq_verify(cqv);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = response_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ GPR_ASSERT(trailing_lr_metadata != NULL);
+ op->data.send_status_from_server.trailing_metadata_count = 1;
+ op->data.send_status_from_server.trailing_metadata = trailing_lr_metadata;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ op->data.send_status_from_server.status_details = "xyz";
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(103), 1);
+ cq_expect_completion(cqv, tag(1), 1);
+ cq_verify(cqv);
+
+ GPR_ASSERT(status == GRPC_STATUS_OK);
+
+ gpr_free(details);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+
+ grpc_call_destroy(c);
+ grpc_call_destroy(s);
+
+ cq_verifier_destroy(cqv);
+
+ grpc_byte_buffer_destroy(request_payload);
+ grpc_byte_buffer_destroy(response_payload);
+ grpc_byte_buffer_destroy(request_payload_recv);
+ grpc_byte_buffer_destroy(response_payload_recv);
+}
+
+/* override the default for testing purposes */
+extern void (*g_load_reporting_fn)(
+ const grpc_load_reporting_call_data *call_data);
+
+static void test_load_reporting_hook(grpc_end2end_test_config config) {
+ /* TODO(dgq): this test is currently a noop until LR is fully defined.
+ * Leaving the rest here, as it'll likely be reusable. */
+
+ /* Introduce load reporting for the server through its arguments */
+ grpc_arg arg = grpc_load_reporting_enable_arg();
+ grpc_channel_args *lr_server_args =
+ grpc_channel_args_copy_and_add(NULL, &arg, 1);
+
+ grpc_end2end_test_fixture f =
+ begin_test(config, "test_load_reporting_hook", NULL, lr_server_args);
+
+ const char *method_name = "/gRPCFTW";
+ const char *request_msg = "the msg from the client";
+ const char *response_msg = "... and the response from the server";
+
+ grpc_metadata initial_lr_metadata;
+ grpc_metadata trailing_lr_metadata;
+
+ initial_lr_metadata.key = GRPC_LOAD_REPORTING_INITIAL_MD_KEY;
+ initial_lr_metadata.value = "client-token";
+ initial_lr_metadata.value_length = strlen(initial_lr_metadata.value);
+ memset(&initial_lr_metadata.internal_data, 0,
+ sizeof(initial_lr_metadata.internal_data));
+
+ trailing_lr_metadata.key = GRPC_LOAD_REPORTING_TRAILING_MD_KEY;
+ trailing_lr_metadata.value = "server-token";
+ trailing_lr_metadata.value_length = strlen(trailing_lr_metadata.value);
+ memset(&trailing_lr_metadata.internal_data, 0,
+ sizeof(trailing_lr_metadata.internal_data));
+
+ request_response_with_payload(f, method_name, request_msg, response_msg,
+ &initial_lr_metadata, &trailing_lr_metadata);
+ end_test(&f);
+ grpc_channel_args_destroy(lr_server_args);
+ config.tear_down_data(&f);
+}
+
+void load_reporting_hook(grpc_end2end_test_config config) {
+ test_load_reporting_hook(config);
+}
+
+void load_reporting_hook_pre_init(void) {}
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 3152fb7a46..a959a7e07f 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -70,7 +70,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
g_number_of_reads++;
g_number_of_bytes_read += (int)byte_count;
- grpc_pollset_kick(g_pollset, NULL);
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
gpr_mu_unlock(g_mu);
}
@@ -179,8 +180,10 @@ static void test_receive(int number_of_clients) {
while (g_number_of_reads == number_of_reads_before &&
gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
grpc_pollset_worker *worker = NULL;
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline)));
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
@@ -199,7 +202,8 @@ static void test_receive(int number_of_clients) {
GPR_ASSERT(g_number_of_orphan_calls == 1);
}
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) {
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_error *error) {
grpc_pollset_destroy(p);
}
diff --git a/test/core/json/json_test.c b/test/core/json/json_test.c
index ac1abbd8f3..7ea5caca5b 100644
--- a/test/core/json/json_test.c
+++ b/test/core/json/json_test.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/test/core/nanopb/fuzzer_response.c b/test/core/nanopb/fuzzer_response.c
index 21a5d7b968..75a99faf3f 100644
--- a/test/core/nanopb/fuzzer_response.c
+++ b/test/core/nanopb/fuzzer_response.c
@@ -43,9 +43,9 @@ bool leak_check = true;
int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size);
- grpc_grpclb_response *response;
- if ((response = grpc_grpclb_response_parse(slice))) {
- grpc_grpclb_response_destroy(response);
+ grpc_grpclb_initial_response *response;
+ if ((response = grpc_grpclb_initial_response_parse(slice))) {
+ grpc_grpclb_initial_response_destroy(response);
}
gpr_slice_unref(slice);
return 0;
diff --git a/test/core/transport/chttp2/timeout_encoding_test.c b/test/core/transport/timeout_encoding_test.c
index 67639936a7..b6004af7b4 100644
--- a/test/core/transport/chttp2/timeout_encoding_test.c
+++ b/test/core/transport/timeout_encoding_test.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
+#include "src/core/lib/transport/timeout_encoding.h"
#include <stdio.h>
#include <string.h>
@@ -46,8 +46,8 @@
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
static void assert_encodes_as(gpr_timespec ts, const char *s) {
- char buffer[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
- grpc_chttp2_encode_timeout(ts, buffer);
+ char buffer[GRPC_HTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
+ grpc_http2_encode_timeout(ts, buffer);
gpr_log(GPR_INFO, "check '%s' == '%s'", buffer, s);
GPR_ASSERT(0 == strcmp(buffer, s));
}
@@ -88,7 +88,7 @@ void test_encoding(void) {
static void assert_decodes_as(const char *buffer, gpr_timespec expected) {
gpr_timespec got;
gpr_log(GPR_INFO, "check decoding '%s'", buffer);
- GPR_ASSERT(1 == grpc_chttp2_decode_timeout(buffer, &got));
+ GPR_ASSERT(1 == grpc_http2_decode_timeout(buffer, &got));
GPR_ASSERT(0 == gpr_time_cmp(got, expected));
}
@@ -137,15 +137,15 @@ void test_decoding(void) {
void test_decoding_fails(void) {
gpr_timespec x;
LOG_TEST("test_decoding_fails");
- GPR_ASSERT(0 == grpc_chttp2_decode_timeout("", &x));
- GPR_ASSERT(0 == grpc_chttp2_decode_timeout(" ", &x));
- GPR_ASSERT(0 == grpc_chttp2_decode_timeout("x", &x));
- GPR_ASSERT(0 == grpc_chttp2_decode_timeout("1", &x));
- GPR_ASSERT(0 == grpc_chttp2_decode_timeout("1x", &x));
- GPR_ASSERT(0 == grpc_chttp2_decode_timeout("1ux", &x));
- GPR_ASSERT(0 == grpc_chttp2_decode_timeout("!", &x));
- GPR_ASSERT(0 == grpc_chttp2_decode_timeout("n1", &x));
- GPR_ASSERT(0 == grpc_chttp2_decode_timeout("-1u", &x));
+ GPR_ASSERT(0 == grpc_http2_decode_timeout("", &x));
+ GPR_ASSERT(0 == grpc_http2_decode_timeout(" ", &x));
+ GPR_ASSERT(0 == grpc_http2_decode_timeout("x", &x));
+ GPR_ASSERT(0 == grpc_http2_decode_timeout("1", &x));
+ GPR_ASSERT(0 == grpc_http2_decode_timeout("1x", &x));
+ GPR_ASSERT(0 == grpc_http2_decode_timeout("1ux", &x));
+ GPR_ASSERT(0 == grpc_http2_decode_timeout("!", &x));
+ GPR_ASSERT(0 == grpc_http2_decode_timeout("n1", &x));
+ GPR_ASSERT(0 == grpc_http2_decode_timeout("-1u", &x));
}
int main(int argc, char **argv) {
diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc
new file mode 100644
index 0000000000..853720fd0d
--- /dev/null
+++ b/test/cpp/end2end/filter_end2end_test.cc
@@ -0,0 +1,353 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <memory>
+#include <mutex>
+
+#include <grpc++/channel.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/generic/async_generic_service.h>
+#include <grpc++/generic/generic_stub.h>
+#include <grpc++/impl/codegen/proto_utils.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc++/support/config.h>
+#include <grpc++/support/slice.h>
+#include <grpc/grpc.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+#include <gtest/gtest.h>
+
+#include "src/cpp/common/channel_filter.h"
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+#include "test/cpp/util/byte_buffer_proto_helper.h"
+
+using grpc::testing::EchoRequest;
+using grpc::testing::EchoResponse;
+using std::chrono::system_clock;
+
+namespace grpc {
+namespace testing {
+namespace {
+
+void* tag(int i) { return (void*)(intptr_t)i; }
+
+void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
+ bool ok;
+ void* got_tag;
+ EXPECT_TRUE(cq->Next(&got_tag, &ok));
+ EXPECT_EQ(expect_ok, ok);
+ EXPECT_EQ(tag(i), got_tag);
+}
+
+namespace {
+
+int global_num_connections = 0;
+int global_num_calls = 0;
+mutex global_mu;
+
+void IncrementConnectionCounter() {
+ unique_lock<mutex> lock(global_mu);
+ ++global_num_connections;
+}
+
+void ResetConnectionCounter() {
+ unique_lock<mutex> lock(global_mu);
+ global_num_connections = 0;
+}
+
+int GetConnectionCounterValue() {
+ unique_lock<mutex> lock(global_mu);
+ return global_num_connections;
+}
+
+void IncrementCallCounter() {
+ unique_lock<mutex> lock(global_mu);
+ ++global_num_calls;
+}
+
+void ResetCallCounter() {
+ unique_lock<mutex> lock(global_mu);
+ global_num_calls = 0;
+}
+
+int GetCallCounterValue() {
+ unique_lock<mutex> lock(global_mu);
+ return global_num_calls;
+}
+
+} // namespace
+
+class ChannelDataImpl : public ChannelData {
+ public:
+ ChannelDataImpl(const grpc_channel_args& args, const char* peer)
+ : ChannelData(args, peer) {
+ IncrementConnectionCounter();
+ }
+};
+
+class CallDataImpl : public CallData {
+ public:
+ explicit CallDataImpl(const ChannelDataImpl& channel_data)
+ : CallData(channel_data) {}
+
+ void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ TransportStreamOp* op) GRPC_OVERRIDE {
+ // Incrementing the counter could be done from the ctor, but we want
+ // to test that the individual methods are actually called correctly.
+ if (op->recv_initial_metadata() != nullptr) IncrementCallCounter();
+ grpc_call_next_op(exec_ctx, elem, op->op());
+ }
+};
+
+class FilterEnd2endTest : public ::testing::Test {
+ protected:
+ FilterEnd2endTest() : server_host_("localhost") {}
+
+ void SetUp() GRPC_OVERRIDE {
+ int port = grpc_pick_unused_port_or_die();
+ server_address_ << server_host_ << ":" << port;
+ // Setup server
+ ServerBuilder builder;
+ builder.AddListeningPort(server_address_.str(),
+ InsecureServerCredentials());
+ builder.RegisterAsyncGenericService(&generic_service_);
+ srv_cq_ = builder.AddCompletionQueue();
+ server_ = builder.BuildAndStart();
+ }
+
+ void TearDown() GRPC_OVERRIDE {
+ server_->Shutdown();
+ void* ignored_tag;
+ bool ignored_ok;
+ cli_cq_.Shutdown();
+ srv_cq_->Shutdown();
+ while (cli_cq_.Next(&ignored_tag, &ignored_ok))
+ ;
+ while (srv_cq_->Next(&ignored_tag, &ignored_ok))
+ ;
+ }
+
+ void ResetStub() {
+ std::shared_ptr<Channel> channel =
+ CreateChannel(server_address_.str(), InsecureChannelCredentials());
+ generic_stub_.reset(new GenericStub(channel));
+ ResetConnectionCounter();
+ ResetCallCounter();
+ }
+
+ void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
+ void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
+ void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
+ void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
+
+ void SendRpc(int num_rpcs) {
+ const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
+ for (int i = 0; i < num_rpcs; i++) {
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ GenericServerContext srv_ctx;
+ GenericServerAsyncReaderWriter stream(&srv_ctx);
+
+ // The string needs to be long enough to test heap-based slice.
+ send_request.set_message("Hello world. Hello world. Hello world.");
+ std::unique_ptr<GenericClientAsyncReaderWriter> call =
+ generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));
+ client_ok(1);
+ std::unique_ptr<ByteBuffer> send_buffer =
+ SerializeToByteBuffer(&send_request);
+ call->Write(*send_buffer, tag(2));
+ // Send ByteBuffer can be destroyed after calling Write.
+ send_buffer.reset();
+ client_ok(2);
+ call->WritesDone(tag(3));
+ client_ok(3);
+
+ generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
+ srv_cq_.get(), tag(4));
+
+ verify_ok(srv_cq_.get(), 4, true);
+ EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
+ EXPECT_EQ(kMethodName, srv_ctx.method());
+ ByteBuffer recv_buffer;
+ stream.Read(&recv_buffer, tag(5));
+ server_ok(5);
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ send_buffer = SerializeToByteBuffer(&send_response);
+ stream.Write(*send_buffer, tag(6));
+ send_buffer.reset();
+ server_ok(6);
+
+ stream.Finish(Status::OK, tag(7));
+ server_ok(7);
+
+ recv_buffer.Clear();
+ call->Read(&recv_buffer, tag(8));
+ client_ok(8);
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
+
+ call->Finish(&recv_status, tag(9));
+ client_ok(9);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+ }
+ }
+
+ CompletionQueue cli_cq_;
+ std::unique_ptr<ServerCompletionQueue> srv_cq_;
+ std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
+ std::unique_ptr<grpc::GenericStub> generic_stub_;
+ std::unique_ptr<Server> server_;
+ AsyncGenericService generic_service_;
+ const grpc::string server_host_;
+ std::ostringstream server_address_;
+};
+
+TEST_F(FilterEnd2endTest, SimpleRpc) {
+ ResetStub();
+ EXPECT_EQ(0, GetConnectionCounterValue());
+ EXPECT_EQ(0, GetCallCounterValue());
+ SendRpc(1);
+ EXPECT_EQ(1, GetConnectionCounterValue());
+ EXPECT_EQ(1, GetCallCounterValue());
+}
+
+TEST_F(FilterEnd2endTest, SequentialRpcs) {
+ ResetStub();
+ EXPECT_EQ(0, GetConnectionCounterValue());
+ EXPECT_EQ(0, GetCallCounterValue());
+ SendRpc(10);
+ EXPECT_EQ(1, GetConnectionCounterValue());
+ EXPECT_EQ(10, GetCallCounterValue());
+}
+
+// One ping, one pong.
+TEST_F(FilterEnd2endTest, SimpleBidiStreaming) {
+ ResetStub();
+ EXPECT_EQ(0, GetConnectionCounterValue());
+ EXPECT_EQ(0, GetCallCounterValue());
+
+ const grpc::string kMethodName(
+ "/grpc.cpp.test.util.EchoTestService/BidiStream");
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ GenericServerContext srv_ctx;
+ GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
+
+ cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
+ send_request.set_message("Hello");
+ std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
+ generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));
+ client_ok(1);
+
+ generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
+ srv_cq_.get(), tag(2));
+
+ verify_ok(srv_cq_.get(), 2, true);
+ EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
+ EXPECT_EQ(kMethodName, srv_ctx.method());
+
+ std::unique_ptr<ByteBuffer> send_buffer =
+ SerializeToByteBuffer(&send_request);
+ cli_stream->Write(*send_buffer, tag(3));
+ send_buffer.reset();
+ client_ok(3);
+
+ ByteBuffer recv_buffer;
+ srv_stream.Read(&recv_buffer, tag(4));
+ server_ok(4);
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ send_buffer = SerializeToByteBuffer(&send_response);
+ srv_stream.Write(*send_buffer, tag(5));
+ send_buffer.reset();
+ server_ok(5);
+
+ cli_stream->Read(&recv_buffer, tag(6));
+ client_ok(6);
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->WritesDone(tag(7));
+ client_ok(7);
+
+ srv_stream.Read(&recv_buffer, tag(8));
+ server_fail(8);
+
+ srv_stream.Finish(Status::OK, tag(9));
+ server_ok(9);
+
+ cli_stream->Finish(&recv_status, tag(10));
+ client_ok(10);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+
+ EXPECT_EQ(1, GetCallCounterValue());
+ EXPECT_EQ(1, GetConnectionCounterValue());
+}
+
+void RegisterFilter() {
+ grpc::RegisterChannelFilter<ChannelDataImpl, CallDataImpl>(
+ "test-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr);
+}
+
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc::testing::RegisterFilter();
+ return RUN_ALL_TESTS();
+}
diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc
index 778a2be573..b967a5d1e9 100644
--- a/test/cpp/end2end/server_builder_plugin_test.cc
+++ b/test/cpp/end2end/server_builder_plugin_test.cc
@@ -191,7 +191,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
// we run some tests without a service, and for those we need to supply a
// frequently polled completion queue
cq_ = builder_->AddCompletionQueue();
- cq_thread_ = grpc::thread(std::bind(&ServerBuilderPluginTest::RunCQ, this));
+ cq_thread_ = new grpc::thread(&ServerBuilderPluginTest::RunCQ, this);
server_ = builder_->BuildAndStart();
EXPECT_TRUE(CheckPresent());
}
@@ -209,7 +209,8 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
EXPECT_TRUE(plugin->finish_is_called());
server_->Shutdown();
cq_->Shutdown();
- cq_thread_.join();
+ cq_thread_->join();
+ delete cq_thread_;
}
string to_string(const int number) {
@@ -224,7 +225,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<Server> server_;
- grpc::thread cq_thread_;
+ grpc::thread* cq_thread_;
TestServiceImpl service_;
int port_;
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index bf77878e0a..33de1ee93c 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -58,26 +58,24 @@ TEST_F(GrpclbTest, CreateRequest) {
grpc_grpclb_request_destroy(c_req);
}
-TEST_F(GrpclbTest, ParseResponse) {
+TEST_F(GrpclbTest, ParseInitialResponse) {
LoadBalanceResponse response;
auto* initial_response = response.mutable_initial_response();
auto* client_stats_report_interval =
initial_response->mutable_client_stats_report_interval();
client_stats_report_interval->set_seconds(123);
client_stats_report_interval->set_nanos(456);
-
const std::string encoded_response = response.SerializeAsString();
gpr_slice encoded_slice =
gpr_slice_from_copied_string(encoded_response.c_str());
- grpc_grpclb_response* c_response = grpc_grpclb_response_parse(encoded_slice);
- EXPECT_TRUE(c_response->has_initial_response);
- EXPECT_FALSE(c_response->initial_response.has_load_balancer_delegate);
- EXPECT_EQ(c_response->initial_response.client_stats_report_interval.seconds,
- 123);
- EXPECT_EQ(c_response->initial_response.client_stats_report_interval.nanos,
- 456);
+
+ grpc_grpclb_initial_response* c_initial_response =
+ grpc_grpclb_initial_response_parse(encoded_slice);
+ EXPECT_FALSE(c_initial_response->has_load_balancer_delegate);
+ EXPECT_EQ(c_initial_response->client_stats_report_interval.seconds, 123);
+ EXPECT_EQ(c_initial_response->client_stats_report_interval.nanos, 456);
gpr_slice_unref(encoded_slice);
- grpc_grpclb_response_destroy(c_response);
+ grpc_grpclb_initial_response_destroy(c_initial_response);
}
TEST_F(GrpclbTest, ParseResponseServerList) {
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
new file mode 100644
index 0000000000..b2fdce2963
--- /dev/null
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -0,0 +1,689 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <cinttypes>
+#include <cstdarg>
+#include <cstdint>
+#include <cstring>
+#include <string>
+
+extern "C" {
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/support/tmpfile.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/server.h"
+#include "test/core/end2end/cq_verifier.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+}
+
+#include "src/proto/grpc/lb/v1/load_balancer.pb.h"
+
+#define NUM_BACKENDS 4
+
+// TODO(dgq): Other scenarios in need of testing:
+// - Send an empty serverlist update and verify that the client request blocks
+// until a new serverlist with actual contents is available.
+// - Send identical serverlist update
+// - Test reception of invalid serverlist
+// - Test pinging
+// - Test against a non-LB server. That server should return UNIMPLEMENTED and
+// the call should fail.
+// - Random LB server closing the stream unexpectedly.
+
+namespace grpc {
+namespace {
+
+typedef struct client_fixture {
+ grpc_channel *client;
+ char *server_uri;
+ grpc_completion_queue *cq;
+} client_fixture;
+
+typedef struct server_fixture {
+ grpc_server *server;
+ grpc_call *server_call;
+ grpc_completion_queue *cq;
+ char *servers_hostport;
+ int port;
+ gpr_thd_id tid;
+ int num_calls_serviced;
+} server_fixture;
+
+typedef struct test_fixture {
+ server_fixture lb_server;
+ server_fixture lb_backends[NUM_BACKENDS];
+ client_fixture client;
+ int lb_server_update_delay_ms;
+} test_fixture;
+
+static void *tag(intptr_t t) { return (void *)t; }
+
+static gpr_slice build_response_payload_slice(
+ const char *host, int *ports, size_t nports,
+ int64_t expiration_interval_secs, int32_t expiration_interval_nanos) {
+ // server_list {
+ // servers {
+ // ip_address: "127.0.0.1"
+ // port: ...
+ // load_balance_token: "token..."
+ // }
+ // ...
+ // }
+ grpc::lb::v1::LoadBalanceResponse response;
+ auto *serverlist = response.mutable_server_list();
+
+ if (expiration_interval_secs > 0 || expiration_interval_nanos > 0) {
+ auto *expiration_interval = serverlist->mutable_expiration_interval();
+ if (expiration_interval_secs > 0) {
+ expiration_interval->set_seconds(expiration_interval_secs);
+ }
+ if (expiration_interval_nanos > 0) {
+ expiration_interval->set_nanos(expiration_interval_nanos);
+ }
+ }
+ for (size_t i = 0; i < nports; i++) {
+ auto *server = serverlist->add_servers();
+ server->set_ip_address(host);
+ server->set_port(ports[i]);
+ // The following long long int cast is meant to work around the
+ // disfunctional implementation of std::to_string in gcc 4.4, which doesn't
+ // have a version for int but does have one for long long int.
+ server->set_load_balance_token("token" +
+ std::to_string((long long int)ports[i]));
+ }
+
+ gpr_log(GPR_INFO, "generating response: %s",
+ response.ShortDebugString().c_str());
+
+ const gpr_slice response_slice =
+ gpr_slice_from_copied_string(response.SerializeAsString().c_str());
+ return response_slice;
+}
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event ev;
+ do {
+ ev = grpc_completion_queue_next(cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
+ NULL);
+ } while (ev.type != GRPC_QUEUE_SHUTDOWN);
+}
+
+static void sleep_ms(int delay_ms) {
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
+}
+
+static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
+ int update_delay_ms) {
+ grpc_call *s;
+ cq_verifier *cqv = cq_verifier_create(sf->cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_call_error error;
+ int was_cancelled = 2;
+ grpc_byte_buffer *request_payload_recv;
+ grpc_byte_buffer *response_payload;
+
+ memset(ops, 0, sizeof(ops));
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ error = grpc_server_request_call(sf->server, &s, &call_details,
+ &request_metadata_recv, sf->cq, sf->cq,
+ tag(200));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ gpr_log(GPR_INFO, "LB Server[%s] up", sf->servers_hostport);
+ cq_expect_completion(cqv, tag(200), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport);
+
+ // receive request for backends
+ op = ops;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &request_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ cq_expect_completion(cqv, tag(202), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
+ // TODO(dgq): validate request.
+ grpc_byte_buffer_destroy(request_payload_recv);
+ gpr_slice response_payload_slice;
+ for (int i = 0; i < 2; i++) {
+ if (i == 0) {
+ // First half of the ports.
+ response_payload_slice =
+ build_response_payload_slice("127.0.0.1", ports, nports / 2, -1, -1);
+ } else {
+ // Second half of the ports.
+ sleep_ms(update_delay_ms);
+ response_payload_slice =
+ build_response_payload_slice("127.0.0.1", ports + (nports / 2),
+ (nports + 1) / 2 /* ceil */, -1, -1);
+ }
+
+ response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+ op = ops;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = response_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(203), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ cq_expect_completion(cqv, tag(203), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "LB Server[%s] after SEND_MESSAGE, iter %d",
+ sf->servers_hostport, i);
+
+ grpc_byte_buffer_destroy(response_payload);
+ gpr_slice_unref(response_payload_slice);
+ }
+ gpr_log(GPR_INFO, "LB Server[%s] shutting down", sf->servers_hostport);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ op->data.send_status_from_server.status_details = "xyz";
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(204), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(201), 1);
+ cq_expect_completion(cqv, tag(204), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "LB Server[%s] after tag 204. All done. LB server out",
+ sf->servers_hostport);
+
+ grpc_call_destroy(s);
+
+ cq_verifier_destroy(cqv);
+
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+}
+
+static void start_backend_server(server_fixture *sf) {
+ grpc_call *s;
+ cq_verifier *cqv;
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_call_error error;
+ int was_cancelled;
+ grpc_byte_buffer *request_payload_recv;
+ grpc_byte_buffer *response_payload;
+ grpc_event ev;
+
+ while (true) {
+ memset(ops, 0, sizeof(ops));
+ cqv = cq_verifier_create(sf->cq);
+ was_cancelled = 2;
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ error = grpc_server_request_call(sf->server, &s, &call_details,
+ &request_metadata_recv, sf->cq, sf->cq,
+ tag(100));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport);
+ ev = grpc_completion_queue_next(sf->cq,
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(60), NULL);
+ if (!ev.success) {
+ gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport);
+ cq_verifier_destroy(cqv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+ return;
+ }
+ GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
+ gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport);
+
+ bool exit = false;
+ gpr_slice response_payload_slice =
+ gpr_slice_from_copied_string("hello you");
+ while (!exit) {
+ op = ops;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &request_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ ev = grpc_completion_queue_next(
+ sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
+ if (ev.type == GRPC_OP_COMPLETE && ev.success) {
+ GPR_ASSERT(ev.tag = tag(102));
+ if (request_payload_recv == NULL) {
+ exit = true;
+ gpr_log(GPR_INFO,
+ "Server[%s] recv \"close\" from client, exiting. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+ }
+ } else {
+ gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+ exit = true;
+ }
+ gpr_log(GPR_INFO, "Server[%s] after tag 102. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+
+ if (!exit) {
+ response_payload =
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+ op = ops;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = response_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error =
+ grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ ev = grpc_completion_queue_next(
+ sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
+ if (ev.type == GRPC_OP_COMPLETE && ev.success) {
+ GPR_ASSERT(ev.tag = tag(103));
+ } else {
+ gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+ exit = true;
+ }
+ gpr_log(GPR_INFO, "Server[%s] after tag 103. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+ grpc_byte_buffer_destroy(response_payload);
+ }
+
+ grpc_byte_buffer_destroy(request_payload_recv);
+ }
+ ++sf->num_calls_serviced;
+
+ gpr_log(GPR_INFO, "Server[%s] OUT OF THE LOOP", sf->servers_hostport);
+ gpr_slice_unref(response_payload_slice);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ op->data.send_status_from_server.status_details = "Backend server out a-ok";
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(101), 1);
+ cq_expect_completion(cqv, tag(104), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "Server[%s] DONE. After servicing %d calls",
+ sf->servers_hostport, sf->num_calls_serviced);
+
+ grpc_call_destroy(s);
+ cq_verifier_destroy(cqv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+ }
+}
+
+static void perform_request(client_fixture *cf) {
+ grpc_call *c;
+ cq_verifier *cqv = cq_verifier_create(cf->cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_status_code status;
+ grpc_call_error error;
+ char *details = NULL;
+ size_t details_capacity = 0;
+ grpc_byte_buffer *request_payload;
+ grpc_byte_buffer *response_payload_recv;
+ int i;
+
+ memset(ops, 0, sizeof(ops));
+ gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
+
+ c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS,
+ cf->cq, "/foo", "foo.test.google.fr:1234",
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1000), NULL);
+ gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c);
+ GPR_ASSERT(c);
+ char *peer;
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ for (i = 0; i < 4; i++) {
+ request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = request_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &response_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ peer = grpc_call_get_peer(c);
+ cq_expect_completion(cqv, tag(2), 1);
+ cq_verify(cqv);
+ gpr_free(peer);
+
+ grpc_byte_buffer_destroy(request_payload);
+ grpc_byte_buffer_destroy(response_payload_recv);
+ }
+
+ gpr_slice_unref(request_payload_slice);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(1), 1);
+ cq_expect_completion(cqv, tag(3), 1);
+ cq_verify(cqv);
+ peer = grpc_call_get_peer(c);
+ gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer);
+ gpr_free(peer);
+
+ grpc_call_destroy(c);
+
+ cq_verify_empty_timeout(cqv, 1);
+ cq_verifier_destroy(cqv);
+
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ gpr_free(details);
+}
+
+static void setup_client(const char *server_hostport, client_fixture *cf) {
+ cf->cq = grpc_completion_queue_create(NULL);
+ cf->server_uri = gpr_strdup(server_hostport);
+ cf->client = grpc_insecure_channel_create(cf->server_uri, NULL, NULL);
+}
+
+static void teardown_client(client_fixture *cf) {
+ grpc_completion_queue_shutdown(cf->cq);
+ drain_cq(cf->cq);
+ grpc_completion_queue_destroy(cf->cq);
+ cf->cq = NULL;
+ grpc_channel_destroy(cf->client);
+ cf->client = NULL;
+ gpr_free(cf->server_uri);
+}
+
+static void setup_server(const char *host, server_fixture *sf) {
+ int assigned_port;
+
+ sf->cq = grpc_completion_queue_create(NULL);
+ const char *colon_idx = strchr(host, ':');
+ if (colon_idx) {
+ const char *port_str = colon_idx + 1;
+ sf->port = atoi(port_str);
+ sf->servers_hostport = gpr_strdup(host);
+ } else {
+ sf->port = grpc_pick_unused_port_or_die();
+ gpr_join_host_port(&sf->servers_hostport, host, sf->port);
+ }
+
+ sf->server = grpc_server_create(NULL, NULL);
+ grpc_server_register_completion_queue(sf->server, sf->cq, NULL);
+ GPR_ASSERT((assigned_port = grpc_server_add_insecure_http2_port(
+ sf->server, sf->servers_hostport)) > 0);
+ GPR_ASSERT(sf->port == assigned_port);
+ grpc_server_start(sf->server);
+}
+
+static void teardown_server(server_fixture *sf) {
+ if (!sf->server) return;
+
+ gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport);
+ grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(
+ sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
+ .type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(sf->server);
+ gpr_thd_join(sf->tid);
+
+ sf->server = NULL;
+ grpc_completion_queue_shutdown(sf->cq);
+ drain_cq(sf->cq);
+ grpc_completion_queue_destroy(sf->cq);
+
+ gpr_log(GPR_INFO, "Server[%s] bye bye", sf->servers_hostport);
+ gpr_free(sf->servers_hostport);
+}
+
+static void fork_backend_server(void *arg) {
+ server_fixture *sf = static_cast<server_fixture *>(arg);
+ start_backend_server(sf);
+}
+
+static void fork_lb_server(void *arg) {
+ test_fixture *tf = static_cast<test_fixture *>(arg);
+ int ports[NUM_BACKENDS];
+ for (int i = 0; i < NUM_BACKENDS; i++) {
+ ports[i] = tf->lb_backends[i].port;
+ }
+ start_lb_server(&tf->lb_server, ports, NUM_BACKENDS,
+ tf->lb_server_update_delay_ms);
+}
+
+static void setup_test_fixture(test_fixture *tf,
+ int lb_server_update_delay_ms) {
+ tf->lb_server_update_delay_ms = lb_server_update_delay_ms;
+
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+
+ for (int i = 0; i < NUM_BACKENDS; ++i) {
+ setup_server("127.0.0.1", &tf->lb_backends[i]);
+ gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server,
+ &tf->lb_backends[i], &options);
+ }
+
+ setup_server("127.0.0.1", &tf->lb_server);
+ gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options);
+
+ char *server_uri;
+ gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1",
+ tf->lb_server.servers_hostport);
+ setup_client(server_uri, &tf->client);
+ gpr_free(server_uri);
+}
+
+static void teardown_test_fixture(test_fixture *tf) {
+ teardown_client(&tf->client);
+ for (int i = 0; i < NUM_BACKENDS; ++i) {
+ teardown_server(&tf->lb_backends[i]);
+ }
+ teardown_server(&tf->lb_server);
+}
+
+// The LB server will send two updates: batch 1 and batch 2. Each batch
+// contains
+// two addresses, both of a valid and running backend server. Batch 1 is
+// readily
+// available and provided as soon as the client establishes the streaming
+// call.
+// Batch 2 is sent after a delay of \a lb_server_update_delay_ms
+// milliseconds.
+static test_fixture test_update(int lb_server_update_delay_ms) {
+ gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms);
+ test_fixture tf;
+ memset(&tf, 0, sizeof(tf));
+ setup_test_fixture(&tf, lb_server_update_delay_ms);
+ perform_request(
+ &tf.client); // "consumes" 1st backend server of 1st serverlist
+ perform_request(
+ &tf.client); // "consumes" 2nd backend server of 1st serverlist
+
+ perform_request(
+ &tf.client); // "consumes" 1st backend server of 2nd serverlist
+ perform_request(
+ &tf.client); // "consumes" 2nd backend server of 2nd serverlist
+
+ teardown_test_fixture(&tf);
+ gpr_log(GPR_INFO, "end %s(%d)", __func__, lb_server_update_delay_ms);
+ return tf;
+}
+
+} // namespace
+} // namespace grpc
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+
+ grpc::test_fixture tf_result;
+ // Clients take a bit over one second to complete a call (the last part of the
+ // call sleeps for 1 second while verifying the client's completion queue is
+ // empty). Therefore:
+ //
+ // If the LB server waits 800ms before sending an update, it will arrive
+ // before the first client request is done, skipping the second server from
+ // batch 1 altogether: the 2nd client request will go to the 1st server of
+ // batch 2 (ie, the third one out of the four total servers).
+ tf_result = grpc::test_update(800);
+ GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2);
+ GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
+
+ // If the LB server waits 1500ms, the update arrives after having picked the
+ // 2nd server from batch 1 but before the next pick for the first server of
+ // batch 2. All server are used.
+ tf_result = grpc::test_update(1500);
+ GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
+
+ // If the LB server waits > 2000ms, the update arrives after the first two
+ // request are done and the third pick is performed, which returns, in RR
+ // fashion, the 1st server of the 1st update. Therefore, the second server of
+ // batch 1 is hit at least one, whereas the first server of batch 2 is never
+ // hit.
+ tf_result = grpc::test_update(2500);
+ GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced >= 1);
+ GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced > 0);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced > 0);
+ GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0);
+
+ grpc_shutdown();
+ return 0;
+}
diff --git a/test/cpp/interop/interop_server.cc b/test/cpp/interop/interop_server.cc
index ebef0002a3..e5878bb248 100644
--- a/test/cpp/interop/interop_server.cc
+++ b/test/cpp/interop/interop_server.cc
@@ -31,7 +31,6 @@
*
*/
-#include <signal.h>
#include <unistd.h>
#include <fstream>
@@ -78,8 +77,6 @@ using grpc::testing::StreamingOutputCallResponse;
using grpc::testing::TestService;
using grpc::Status;
-static bool got_sigint = false;
-
const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial";
const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin";
const char kEchoUserAgentKey[] = "x-grpc-test-echo-useragent";
@@ -311,7 +308,9 @@ class TestServiceImpl : public TestService::Service {
}
};
-void RunServer() {
+void grpc::testing::interop::RunServer(
+ std::shared_ptr<ServerCredentials> creds) {
+ GPR_ASSERT(FLAGS_port != 0);
std::ostringstream server_address;
server_address << "0.0.0.0:" << FLAGS_port;
TestServiceImpl service;
@@ -321,24 +320,10 @@ void RunServer() {
ServerBuilder builder;
builder.RegisterService(&service);
- std::shared_ptr<ServerCredentials> creds =
- grpc::testing::CreateInteropServerCredentials();
builder.AddListeningPort(server_address.str(), creds);
std::unique_ptr<Server> server(builder.BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
- while (!got_sigint) {
+ while (!g_got_sigint) {
sleep(5);
}
}
-
-static void sigint_handler(int x) { got_sigint = true; }
-
-int main(int argc, char** argv) {
- grpc::testing::InitTest(&argc, &argv, true);
- signal(SIGINT, sigint_handler);
-
- GPR_ASSERT(FLAGS_port != 0);
- RunServer();
-
- return 0;
-}
diff --git a/test/cpp/interop/interop_server_bootstrap.cc b/test/cpp/interop/interop_server_bootstrap.cc
new file mode 100644
index 0000000000..424f7ca7f0
--- /dev/null
+++ b/test/cpp/interop/interop_server_bootstrap.cc
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <signal.h>
+#include <unistd.h>
+
+#include "test/cpp/interop/server_helper.h"
+#include "test/cpp/util/test_config.h"
+
+bool grpc::testing::interop::g_got_sigint = false;
+
+static void sigint_handler(int x) {
+ grpc::testing::interop::g_got_sigint = true;
+}
+
+int main(int argc, char** argv) {
+ grpc::testing::InitTest(&argc, &argv, true);
+ signal(SIGINT, sigint_handler);
+
+ grpc::testing::interop::RunServer(
+ grpc::testing::CreateInteropServerCredentials());
+
+ return 0;
+}
diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h
index a1da14a4c8..fc4ea8b3e8 100644
--- a/test/cpp/interop/server_helper.h
+++ b/test/cpp/interop/server_helper.h
@@ -60,6 +60,12 @@ class InteropServerContextInspector {
const ::grpc::ServerContext& context_;
};
+namespace interop {
+
+extern bool g_got_sigint;
+void RunServer(std::shared_ptr<ServerCredentials> creds);
+
+} // namespace interop
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/limit_cores.cc b/test/cpp/qps/limit_cores.cc
index 59ed369067..b5c222542b 100644
--- a/test/cpp/qps/limit_cores.cc
+++ b/test/cpp/qps/limit_cores.cc
@@ -68,9 +68,9 @@ int LimitCores(const int* cores, int cores_size) {
cores_set++;
}
}
- GPR_ASSERT(sched_setaffinity(0, size, cpup) == 0);
+ bool affinity_set = (sched_setaffinity(0, size, cpup) == 0);
CPU_FREE(cpup);
- return cores_set;
+ return affinity_set ? cores_set : num_cores;
}
} // namespace testing