aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-08-02 10:05:17 -0700
committerGravatar Mark D. Roth <roth@google.com>2016-08-02 10:05:17 -0700
commit6e26952db4fc3eef2c764d1202281d8cdbb1975a (patch)
tree10ad005176bbca463c2f7afd90e25448247127ad /test
parent9fd00425c50800906f4deb2c5aafa9b64217b54c (diff)
parent1e3fdbe7fd7831fe31b05a3da8044872564c4351 (diff)
Merge remote-tracking branch 'upstream/master' into filter_call_init_failure
Diffstat (limited to 'test')
-rw-r--r--test/core/channel/channel_stack_test.c3
-rw-r--r--test/core/end2end/bad_server_response_test.c4
-rw-r--r--test/core/end2end/cq_verifier.c9
-rw-r--r--test/core/end2end/cq_verifier.h3
-rw-r--r--test/core/end2end/end2end_nosec_tests.c8
-rw-r--r--test/core/end2end/end2end_tests.c8
-rw-r--r--test/core/end2end/fixtures/h2_load_reporting.c (renamed from test/core/end2end/fixtures/h2_loadreporting.c)95
-rw-r--r--test/core/end2end/fuzzers/hpack.dictionary6
-rwxr-xr-xtest/core/end2end/gen_build_yaml.py3
-rw-r--r--test/core/end2end/tests/filter_causes_close.c2
-rw-r--r--test/core/end2end/tests/load_reporting_hook.c321
-rw-r--r--test/core/iomgr/udp_server_test.c12
-rw-r--r--test/core/nanopb/fuzzer_response.c6
-rw-r--r--test/core/support/slice_test.c22
-rw-r--r--test/cpp/end2end/async_end2end_test.cc15
-rw-r--r--test/cpp/end2end/end2end_test.cc2
-rw-r--r--test/cpp/end2end/proto_server_reflection_test.cc24
-rw-r--r--test/cpp/end2end/server_builder_plugin_test.cc7
-rw-r--r--test/cpp/grpclb/grpclb_api_test.cc18
-rw-r--r--test/cpp/grpclb/grpclb_test.cc688
-rw-r--r--test/cpp/util/grpc_cli.cc89
-rw-r--r--test/cpp/util/proto_file_parser.cc47
-rw-r--r--test/cpp/util/proto_file_parser.h11
-rw-r--r--test/cpp/util/proto_reflection_descriptor_database.cc24
-rw-r--r--test/cpp/util/proto_reflection_descriptor_database.h23
-rwxr-xr-xtest/distrib/python/run_distrib_test.sh55
26 files changed, 1297 insertions, 208 deletions
diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c
index d6c8a9a142..569b3f7cd2 100644
--- a/test/core/channel/channel_stack_test.c
+++ b/test/core/channel/channel_stack_test.c
@@ -65,7 +65,8 @@ static void channel_destroy_func(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {}
static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {
+ const grpc_call_final_info *final_info,
+ void *ignored) {
++*(int *)(elem->channel_data);
}
diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c
index cca75f54a5..ab80adf0e0 100644
--- a/test/core/end2end/bad_server_response_test.c
+++ b/test/core/end2end/bad_server_response_test.c
@@ -71,6 +71,8 @@
#define UNPARSEABLE_DETAIL_MSG "Failed parsing HTTP/2"
+#define HTTP1_DETAIL_MSG "Trying to connect an http1.x server"
+
/* TODO(zyc) Check the content of incomming data instead of using this length */
#define EXPECTED_INCOMING_DATA_LENGTH (size_t)310
@@ -334,7 +336,7 @@ int main(int argc, char **argv) {
/* http1 response */
run_test(HTTP1_RESP, sizeof(HTTP1_RESP) - 1, GRPC_STATUS_UNAVAILABLE,
- UNPARSEABLE_DETAIL_MSG);
+ HTTP1_DETAIL_MSG);
return 0;
}
diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c
index 890309c44a..b77568058c 100644
--- a/test/core/end2end/cq_verifier.c
+++ b/test/core/end2end/cq_verifier.c
@@ -259,9 +259,10 @@ void cq_verify(cq_verifier *v) {
gpr_strvec_destroy(&have_tags);
}
-void cq_verify_empty(cq_verifier *v) {
- gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(1, GPR_TIMESPAN));
+void cq_verify_empty_timeout(cq_verifier *v, int timeout_sec) {
+ gpr_timespec deadline =
+ gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(timeout_sec, GPR_TIMESPAN));
grpc_event ev;
GPR_ASSERT(v->expect.next == &v->expect && "expectation queue must be empty");
@@ -275,6 +276,8 @@ void cq_verify_empty(cq_verifier *v) {
}
}
+void cq_verify_empty(cq_verifier *v) { cq_verify_empty_timeout(v, 1); }
+
static expectation *add(cq_verifier *v, grpc_completion_type type, void *tag) {
expectation *e = gpr_malloc(sizeof(expectation));
e->type = type;
diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h
index 8c9a85c218..bf82468c9a 100644
--- a/test/core/end2end/cq_verifier.h
+++ b/test/core/end2end/cq_verifier.h
@@ -55,6 +55,9 @@ void cq_verify(cq_verifier *v);
/* ensure that the completion queue is empty */
void cq_verify_empty(cq_verifier *v);
+/* ensure that the completion queue is empty, waiting up to \a timeout secs. */
+void cq_verify_empty_timeout(cq_verifier *v, int timeout_sec);
+
/* Various expectation matchers
Any functions taking ... expect a NULL terminated list of key/value pairs
(each pair using two parameter slots) of metadata that MUST be present in
diff --git a/test/core/end2end/end2end_nosec_tests.c b/test/core/end2end/end2end_nosec_tests.c
index f91d018dc8..3efd18cf2e 100644
--- a/test/core/end2end/end2end_nosec_tests.c
+++ b/test/core/end2end/end2end_nosec_tests.c
@@ -85,6 +85,8 @@ extern void invoke_large_request(grpc_end2end_test_config config);
extern void invoke_large_request_pre_init(void);
extern void large_metadata(grpc_end2end_test_config config);
extern void large_metadata_pre_init(void);
+extern void load_reporting_hook(grpc_end2end_test_config config);
+extern void load_reporting_hook_pre_init(void);
extern void max_concurrent_streams(grpc_end2end_test_config config);
extern void max_concurrent_streams_pre_init(void);
extern void max_message_length(grpc_end2end_test_config config);
@@ -148,6 +150,7 @@ void grpc_end2end_tests_pre_init(void) {
idempotent_request_pre_init();
invoke_large_request_pre_init();
large_metadata_pre_init();
+ load_reporting_hook_pre_init();
max_concurrent_streams_pre_init();
max_message_length_pre_init();
negative_deadline_pre_init();
@@ -197,6 +200,7 @@ void grpc_end2end_tests(int argc, char **argv,
idempotent_request(config);
invoke_large_request(config);
large_metadata(config);
+ load_reporting_hook(config);
max_concurrent_streams(config);
max_message_length(config);
negative_deadline(config);
@@ -304,6 +308,10 @@ void grpc_end2end_tests(int argc, char **argv,
large_metadata(config);
continue;
}
+ if (0 == strcmp("load_reporting_hook", argv[i])) {
+ load_reporting_hook(config);
+ continue;
+ }
if (0 == strcmp("max_concurrent_streams", argv[i])) {
max_concurrent_streams(config);
continue;
diff --git a/test/core/end2end/end2end_tests.c b/test/core/end2end/end2end_tests.c
index 1347043bd5..e3d791abc1 100644
--- a/test/core/end2end/end2end_tests.c
+++ b/test/core/end2end/end2end_tests.c
@@ -87,6 +87,8 @@ extern void invoke_large_request(grpc_end2end_test_config config);
extern void invoke_large_request_pre_init(void);
extern void large_metadata(grpc_end2end_test_config config);
extern void large_metadata_pre_init(void);
+extern void load_reporting_hook(grpc_end2end_test_config config);
+extern void load_reporting_hook_pre_init(void);
extern void max_concurrent_streams(grpc_end2end_test_config config);
extern void max_concurrent_streams_pre_init(void);
extern void max_message_length(grpc_end2end_test_config config);
@@ -151,6 +153,7 @@ void grpc_end2end_tests_pre_init(void) {
idempotent_request_pre_init();
invoke_large_request_pre_init();
large_metadata_pre_init();
+ load_reporting_hook_pre_init();
max_concurrent_streams_pre_init();
max_message_length_pre_init();
negative_deadline_pre_init();
@@ -201,6 +204,7 @@ void grpc_end2end_tests(int argc, char **argv,
idempotent_request(config);
invoke_large_request(config);
large_metadata(config);
+ load_reporting_hook(config);
max_concurrent_streams(config);
max_message_length(config);
negative_deadline(config);
@@ -312,6 +316,10 @@ void grpc_end2end_tests(int argc, char **argv,
large_metadata(config);
continue;
}
+ if (0 == strcmp("load_reporting_hook", argv[i])) {
+ load_reporting_hook(config);
+ continue;
+ }
if (0 == strcmp("max_concurrent_streams", argv[i])) {
max_concurrent_streams(config);
continue;
diff --git a/test/core/end2end/fixtures/h2_loadreporting.c b/test/core/end2end/fixtures/h2_load_reporting.c
index 4ed02f9728..f6d3923db9 100644
--- a/test/core/end2end/fixtures/h2_loadreporting.c
+++ b/test/core/end2end/fixtures/h2_load_reporting.c
@@ -52,18 +52,16 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
-static grpc_load_reporting_config *g_client_lrc;
-static grpc_load_reporting_config *g_server_lrc;
-
-typedef struct fullstack_fixture_data {
+typedef struct load_reporting_fixture_data {
char *localaddr;
-} fullstack_fixture_data;
+} load_reporting_fixture_data;
-static grpc_end2end_test_fixture chttp2_create_fixture_fullstack(
+static grpc_end2end_test_fixture chttp2_create_fixture_load_reporting(
grpc_channel_args *client_args, grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
int port = grpc_pick_unused_port_or_die();
- fullstack_fixture_data *ffd = gpr_malloc(sizeof(fullstack_fixture_data));
+ load_reporting_fixture_data *ffd =
+ gpr_malloc(sizeof(load_reporting_fixture_data));
memset(&f, 0, sizeof(f));
gpr_join_host_port(&ffd->localaddr, "localhost", port);
@@ -74,47 +72,20 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack(
return f;
}
-typedef struct {
- int64_t total_bytes;
- bool fully_processed;
- uint32_t initial_token;
- uint32_t final_token;
-} aggregated_bw_stats;
-
-static void sample_fn(const grpc_load_reporting_call_data *call_data,
- void *user_data) {
- GPR_ASSERT(user_data != NULL);
- aggregated_bw_stats *custom_stats = (aggregated_bw_stats *)user_data;
- if (call_data == NULL) {
- /* initial invocation */
- custom_stats->initial_token = 0xDEADBEEF;
- } else {
- /* final invocation */
- custom_stats->total_bytes =
- (int64_t)(call_data->stats->transport_stream_stats.outgoing.data_bytes +
- call_data->stats->transport_stream_stats.incoming.data_bytes);
- custom_stats->final_token = 0xCAFED00D;
- custom_stats->fully_processed = true;
- }
-}
-
-void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f,
- grpc_channel_args *client_args) {
- fullstack_fixture_data *ffd = f->fixture_data;
- grpc_arg arg = grpc_load_reporting_config_create_arg(g_client_lrc);
- client_args = grpc_channel_args_copy_and_add(client_args, &arg, 1);
+void chttp2_init_client_load_reporting(grpc_end2end_test_fixture *f,
+ grpc_channel_args *client_args) {
+ load_reporting_fixture_data *ffd = f->fixture_data;
f->client = grpc_insecure_channel_create(ffd->localaddr, client_args, NULL);
- grpc_channel_args_destroy(client_args);
GPR_ASSERT(f->client);
}
-void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
- grpc_channel_args *server_args) {
- fullstack_fixture_data *ffd = f->fixture_data;
+void chttp2_init_server_load_reporting(grpc_end2end_test_fixture *f,
+ grpc_channel_args *server_args) {
+ load_reporting_fixture_data *ffd = f->fixture_data;
+ grpc_arg arg = grpc_load_reporting_enable_arg();
if (f->server) {
grpc_server_destroy(f->server);
}
- grpc_arg arg = grpc_load_reporting_config_create_arg(g_server_lrc);
server_args = grpc_channel_args_copy_and_add(server_args, &arg, 1);
f->server = grpc_server_create(server_args, NULL);
grpc_channel_args_destroy(server_args);
@@ -123,36 +94,23 @@ void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f,
grpc_server_start(f->server);
}
-void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) {
- fullstack_fixture_data *ffd = f->fixture_data;
+void chttp2_tear_down_load_reporting(grpc_end2end_test_fixture *f) {
+ load_reporting_fixture_data *ffd = f->fixture_data;
gpr_free(ffd->localaddr);
gpr_free(ffd);
}
/* All test configurations */
static grpc_end2end_test_config configs[] = {
- {"chttp2/fullstack+loadreporting", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION,
- chttp2_create_fixture_fullstack, chttp2_init_client_fullstack,
- chttp2_init_server_fullstack, chttp2_tear_down_fullstack},
+ {"chttp2/fullstack+load_reporting",
+ FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION,
+ chttp2_create_fixture_load_reporting, chttp2_init_client_load_reporting,
+ chttp2_init_server_load_reporting, chttp2_tear_down_load_reporting},
};
int main(int argc, char **argv) {
size_t i;
- aggregated_bw_stats *aggr_stats_client =
- gpr_malloc(sizeof(aggregated_bw_stats));
- aggr_stats_client->total_bytes = -1;
- aggr_stats_client->fully_processed = false;
- aggregated_bw_stats *aggr_stats_server =
- gpr_malloc(sizeof(aggregated_bw_stats));
- aggr_stats_server->total_bytes = -1;
- aggr_stats_server->fully_processed = false;
-
- g_client_lrc =
- grpc_load_reporting_config_create(sample_fn, aggr_stats_client);
- g_server_lrc =
- grpc_load_reporting_config_create(sample_fn, aggr_stats_server);
-
grpc_test_init(argc, argv);
grpc_end2end_tests_pre_init();
grpc_init();
@@ -163,22 +121,5 @@ int main(int argc, char **argv) {
grpc_shutdown();
- grpc_load_reporting_config_destroy(g_client_lrc);
- grpc_load_reporting_config_destroy(g_server_lrc);
-
- if (aggr_stats_client->fully_processed) {
- GPR_ASSERT(aggr_stats_client->total_bytes >= 0);
- GPR_ASSERT(aggr_stats_client->initial_token == 0xDEADBEEF);
- GPR_ASSERT(aggr_stats_client->final_token == 0xCAFED00D);
- }
- if (aggr_stats_server->fully_processed) {
- GPR_ASSERT(aggr_stats_server->total_bytes >= 0);
- GPR_ASSERT(aggr_stats_server->initial_token == 0xDEADBEEF);
- GPR_ASSERT(aggr_stats_server->final_token == 0xCAFED00D);
- }
-
- gpr_free(aggr_stats_client);
- gpr_free(aggr_stats_server);
-
return 0;
}
diff --git a/test/core/end2end/fuzzers/hpack.dictionary b/test/core/end2end/fuzzers/hpack.dictionary
index 097e9a8922..af075c09ef 100644
--- a/test/core/end2end/fuzzers/hpack.dictionary
+++ b/test/core/end2end/fuzzers/hpack.dictionary
@@ -63,7 +63,8 @@
"\x13if-unmodified-since"
"\x0Dlast-modified"
"\x04link"
-"\x0Eload-reporting"
+"\x16load-reporting-initial"
+"\x17load-reporting-trailing"
"\x08location"
"\x0Cmax-forwards"
"\x07:method"
@@ -137,7 +138,8 @@
"\x00\x13if-unmodified-since\x00"
"\x00\x0Dlast-modified\x00"
"\x00\x04link\x00"
-"\x00\x0Eload-reporting\x00"
+"\x00\x16load-reporting-initial\x00"
+"\x00\x17load-reporting-trailing\x00"
"\x00\x08location\x00"
"\x00\x0Cmax-forwards\x00"
"\x00\x07:method\x03GET"
diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py
index 8655da65d9..e59b7dc9fb 100755
--- a/test/core/end2end/gen_build_yaml.py
+++ b/test/core/end2end/gen_build_yaml.py
@@ -53,13 +53,13 @@ fd_unsecure_fixture_options = default_unsecure_fixture_options._replace(
END2END_FIXTURES = {
'h2_compress': default_unsecure_fixture_options,
'h2_census': default_unsecure_fixture_options,
+ 'h2_load_reporting': default_unsecure_fixture_options,
'h2_fakesec': default_secure_fixture_options._replace(ci_mac=False),
'h2_fd': fd_unsecure_fixture_options,
'h2_full': default_unsecure_fixture_options,
'h2_full+pipe': default_unsecure_fixture_options._replace(
platforms=['linux']),
'h2_full+trace': default_unsecure_fixture_options._replace(tracing=True),
- 'h2_loadreporting': default_unsecure_fixture_options,
'h2_oauth2': default_secure_fixture_options._replace(ci_mac=False),
'h2_proxy': default_unsecure_fixture_options._replace(includes_proxy=True,
ci_mac=False),
@@ -116,6 +116,7 @@ END2END_TESTS = {
'network_status_change': default_test_options,
'no_op': default_test_options,
'payload': default_test_options,
+ 'load_reporting_hook': default_test_options,
'ping_pong_streaming': default_test_options,
'ping': connectivity_test_options._replace(proxyable=False),
'registered_call': default_test_options,
diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c
index 8b0c8dba61..c6c36d668b 100644
--- a/test/core/end2end/tests/filter_causes_close.c
+++ b/test/core/end2end/tests/filter_causes_close.c
@@ -240,7 +240,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats,
+ const grpc_call_final_info *final_info,
void *and_free_memory) {}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
diff --git a/test/core/end2end/tests/load_reporting_hook.c b/test/core/end2end/tests/load_reporting_hook.c
new file mode 100644
index 0000000000..2c6519881a
--- /dev/null
+++ b/test/core/end2end/tests/load_reporting_hook.c
@@ -0,0 +1,321 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "test/core/end2end/end2end_tests.h"
+
+#include <string.h>
+
+#include <grpc/byte_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "test/core/end2end/cq_verifier.h"
+
+#include "src/core/ext/load_reporting/load_reporting.h"
+#include "src/core/ext/load_reporting/load_reporting_filter.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/transport/static_metadata.h"
+
+enum { TIMEOUT = 200000 };
+
+static void *tag(intptr_t t) { return (void *)t; }
+
+typedef struct {
+ gpr_mu mu;
+ intptr_t channel_id;
+ intptr_t call_id;
+
+ char *initial_md_str;
+ char *trailing_md_str;
+ char *method_name;
+
+ uint64_t incoming_bytes;
+ uint64_t outgoing_bytes;
+
+ grpc_status_code call_final_status;
+
+ bool fully_processed;
+} load_reporting_data;
+
+static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
+ const char *test_name,
+ grpc_channel_args *client_args,
+ grpc_channel_args *server_args) {
+ grpc_end2end_test_fixture f;
+ gpr_log(GPR_INFO, "%s/%s", test_name, config.name);
+
+ f = config.create_fixture(client_args, server_args);
+ config.init_server(&f, server_args);
+ config.init_client(&f, client_args);
+
+ return f;
+}
+
+static gpr_timespec n_seconds_time(int n) {
+ return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
+}
+
+static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); }
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event ev;
+ do {
+ ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL);
+ } while (ev.type != GRPC_QUEUE_SHUTDOWN);
+}
+
+static void shutdown_server(grpc_end2end_test_fixture *f) {
+ if (!f->server) return;
+ grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(
+ f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
+ .type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(f->server);
+ f->server = NULL;
+}
+
+static void shutdown_client(grpc_end2end_test_fixture *f) {
+ if (!f->client) return;
+ grpc_channel_destroy(f->client);
+ f->client = NULL;
+}
+
+static void end_test(grpc_end2end_test_fixture *f) {
+ shutdown_server(f);
+ shutdown_client(f);
+
+ grpc_completion_queue_shutdown(f->cq);
+ drain_cq(f->cq);
+ grpc_completion_queue_destroy(f->cq);
+}
+
+static void request_response_with_payload(grpc_end2end_test_fixture f,
+ const char *method_name,
+ const char *request_msg,
+ const char *response_msg,
+ grpc_metadata *initial_lr_metadata,
+ grpc_metadata *trailing_lr_metadata) {
+ gpr_slice request_payload_slice = gpr_slice_from_static_string(request_msg);
+ gpr_slice response_payload_slice = gpr_slice_from_static_string(response_msg);
+ grpc_call *c;
+ grpc_call *s;
+ grpc_byte_buffer *request_payload =
+ grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+ grpc_byte_buffer *response_payload =
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+ gpr_timespec deadline = five_seconds_time();
+ cq_verifier *cqv = cq_verifier_create(f.cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_metadata_array request_metadata_recv;
+ grpc_byte_buffer *request_payload_recv = NULL;
+ grpc_byte_buffer *response_payload_recv = NULL;
+ grpc_call_details call_details;
+ grpc_status_code status;
+ grpc_call_error error;
+ char *details = NULL;
+ size_t details_capacity = 0;
+ int was_cancelled = 2;
+
+ c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ method_name, "foo.test.google.fr", deadline,
+ NULL);
+ GPR_ASSERT(c);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ GPR_ASSERT(initial_lr_metadata != NULL);
+ op->data.send_initial_metadata.count = 1;
+ op->data.send_initial_metadata.metadata = initial_lr_metadata;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = request_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &response_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ error =
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.cq, f.cq, tag(101));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ cq_expect_completion(cqv, tag(101), 1);
+ cq_verify(cqv);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &request_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(102), 1);
+ cq_verify(cqv);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = response_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ GPR_ASSERT(trailing_lr_metadata != NULL);
+ op->data.send_status_from_server.trailing_metadata_count = 1;
+ op->data.send_status_from_server.trailing_metadata = trailing_lr_metadata;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ op->data.send_status_from_server.status_details = "xyz";
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(103), 1);
+ cq_expect_completion(cqv, tag(1), 1);
+ cq_verify(cqv);
+
+ GPR_ASSERT(status == GRPC_STATUS_OK);
+
+ gpr_free(details);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+
+ grpc_call_destroy(c);
+ grpc_call_destroy(s);
+
+ cq_verifier_destroy(cqv);
+
+ grpc_byte_buffer_destroy(request_payload);
+ grpc_byte_buffer_destroy(response_payload);
+ grpc_byte_buffer_destroy(request_payload_recv);
+ grpc_byte_buffer_destroy(response_payload_recv);
+}
+
+/* override the default for testing purposes */
+extern void (*g_load_reporting_fn)(
+ const grpc_load_reporting_call_data *call_data);
+
+static void test_load_reporting_hook(grpc_end2end_test_config config) {
+ /* TODO(dgq): this test is currently a noop until LR is fully defined.
+ * Leaving the rest here, as it'll likely be reusable. */
+
+ /* Introduce load reporting for the server through its arguments */
+ grpc_arg arg = grpc_load_reporting_enable_arg();
+ grpc_channel_args *lr_server_args =
+ grpc_channel_args_copy_and_add(NULL, &arg, 1);
+
+ grpc_end2end_test_fixture f =
+ begin_test(config, "test_load_reporting_hook", NULL, lr_server_args);
+
+ const char *method_name = "/gRPCFTW";
+ const char *request_msg = "the msg from the client";
+ const char *response_msg = "... and the response from the server";
+
+ grpc_metadata initial_lr_metadata;
+ grpc_metadata trailing_lr_metadata;
+
+ initial_lr_metadata.key = GRPC_LOAD_REPORTING_INITIAL_MD_KEY;
+ initial_lr_metadata.value = "client-token";
+ initial_lr_metadata.value_length = strlen(initial_lr_metadata.value);
+ memset(&initial_lr_metadata.internal_data, 0,
+ sizeof(initial_lr_metadata.internal_data));
+
+ trailing_lr_metadata.key = GRPC_LOAD_REPORTING_TRAILING_MD_KEY;
+ trailing_lr_metadata.value = "server-token";
+ trailing_lr_metadata.value_length = strlen(trailing_lr_metadata.value);
+ memset(&trailing_lr_metadata.internal_data, 0,
+ sizeof(trailing_lr_metadata.internal_data));
+
+ request_response_with_payload(f, method_name, request_msg, response_msg,
+ &initial_lr_metadata, &trailing_lr_metadata);
+ end_test(&f);
+ grpc_channel_args_destroy(lr_server_args);
+ config.tear_down_data(&f);
+}
+
+void load_reporting_hook(grpc_end2end_test_config config) {
+ test_load_reporting_hook(config);
+}
+
+void load_reporting_hook_pre_init(void) {}
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 3152fb7a46..a959a7e07f 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -70,7 +70,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
g_number_of_reads++;
g_number_of_bytes_read += (int)byte_count;
- grpc_pollset_kick(g_pollset, NULL);
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
gpr_mu_unlock(g_mu);
}
@@ -179,8 +180,10 @@ static void test_receive(int number_of_clients) {
while (g_number_of_reads == number_of_reads_before &&
gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
grpc_pollset_worker *worker = NULL;
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline)));
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
@@ -199,7 +202,8 @@ static void test_receive(int number_of_clients) {
GPR_ASSERT(g_number_of_orphan_calls == 1);
}
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) {
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_error *error) {
grpc_pollset_destroy(p);
}
diff --git a/test/core/nanopb/fuzzer_response.c b/test/core/nanopb/fuzzer_response.c
index 21a5d7b968..75a99faf3f 100644
--- a/test/core/nanopb/fuzzer_response.c
+++ b/test/core/nanopb/fuzzer_response.c
@@ -43,9 +43,9 @@ bool leak_check = true;
int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size);
- grpc_grpclb_response *response;
- if ((response = grpc_grpclb_response_parse(slice))) {
- grpc_grpclb_response_destroy(response);
+ grpc_grpclb_initial_response *response;
+ if ((response = grpc_grpclb_initial_response_parse(slice))) {
+ grpc_grpclb_initial_response_destroy(response);
}
gpr_slice_unref(slice);
return 0;
diff --git a/test/core/support/slice_test.c b/test/core/support/slice_test.c
index 0da483a321..06c364b368 100644
--- a/test/core/support/slice_test.c
+++ b/test/core/support/slice_test.c
@@ -85,6 +85,27 @@ static void test_slice_new_returns_something_sensible(void) {
gpr_slice_unref(slice);
}
+/* destroy function that sets a mark to indicate it was called. */
+static void set_mark(void *p) { *((int *)p) = 1; }
+
+static void test_slice_new_with_user_data(void) {
+ int marker = 0;
+ uint8_t buf[2];
+ gpr_slice slice;
+
+ buf[0] = 0;
+ buf[1] = 1;
+ slice = gpr_slice_new_with_user_data(buf, 2, set_mark, &marker);
+ GPR_ASSERT(marker == 0);
+ GPR_ASSERT(GPR_SLICE_LENGTH(slice) == 2);
+ GPR_ASSERT(GPR_SLICE_START_PTR(slice)[0] == 0);
+ GPR_ASSERT(GPR_SLICE_START_PTR(slice)[1] == 1);
+
+ /* unref should cause destroy function to run. */
+ gpr_slice_unref(slice);
+ GPR_ASSERT(marker == 1);
+}
+
static int do_nothing_with_len_1_calls = 0;
static void do_nothing_with_len_1(void *ignored, size_t len) {
@@ -232,6 +253,7 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_slice_malloc_returns_something_sensible();
test_slice_new_returns_something_sensible();
+ test_slice_new_with_user_data();
test_slice_new_with_len_returns_something_sensible();
for (length = 0; length < 128; length++) {
test_slice_sub_works(length);
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 4a8936d281..ac79fe8274 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -200,6 +200,10 @@ class Verifier {
bool spin_;
};
+bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
+ return plugin->has_sync_methods();
+}
+
// This class disables the server builder plugins that may add sync services to
// the server. If there are sync services, UnimplementedRpc test will triger
// the sync unkown rpc routine on the server side, rather than the async one
@@ -210,14 +214,9 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
void UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins)
GRPC_OVERRIDE {
- auto plugin = plugins->begin();
- while (plugin != plugins->end()) {
- if ((*plugin)->has_sync_methods()) {
- plugins->erase(plugin++);
- } else {
- plugin++;
- }
- }
+ plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
+ plugin_has_sync_methods),
+ plugins->end());
}
};
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 0f87ae3e44..46a58d3ac3 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -1414,7 +1414,7 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) {
std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
std::vector<grpc::string_ref> tst =
auth_ctx->FindPropertyValues("transport_security_type");
- EXPECT_EQ(1u, tst.size());
+ ASSERT_EQ(1u, tst.size());
EXPECT_EQ(GetParam().credentials_type, ToString(tst[0]));
if (GetParam().credentials_type == kTlsCredentialsType) {
EXPECT_EQ("x509_subject_alternative_name",
diff --git a/test/cpp/end2end/proto_server_reflection_test.cc b/test/cpp/end2end/proto_server_reflection_test.cc
index f8fc39b553..efbb0e1f8e 100644
--- a/test/cpp/end2end/proto_server_reflection_test.cc
+++ b/test/cpp/end2end/proto_server_reflection_test.cc
@@ -31,7 +31,6 @@
*
*/
-#include <google/protobuf/descriptor.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
@@ -59,7 +58,7 @@ class ProtoServerReflectionTest : public ::testing::Test {
void SetUp() GRPC_OVERRIDE {
port_ = grpc_pick_unused_port_or_die();
- ref_desc_pool_ = google::protobuf::DescriptorPool::generated_pool();
+ ref_desc_pool_ = protobuf::DescriptorPool::generated_pool();
ServerBuilder builder;
grpc::string server_address = "localhost:" + to_string(port_);
@@ -73,7 +72,7 @@ class ProtoServerReflectionTest : public ::testing::Test {
CreateChannel(target, InsecureChannelCredentials());
stub_ = grpc::testing::EchoTestService::NewStub(channel);
desc_db_.reset(new ProtoReflectionDescriptorDatabase(channel));
- desc_pool_.reset(new google::protobuf::DescriptorPool(desc_db_.get()));
+ desc_pool_.reset(new protobuf::DescriptorPool(desc_db_.get()));
}
string to_string(const int number) {
@@ -83,15 +82,15 @@ class ProtoServerReflectionTest : public ::testing::Test {
}
void CompareService(const grpc::string& service) {
- const google::protobuf::ServiceDescriptor* service_desc =
+ const protobuf::ServiceDescriptor* service_desc =
desc_pool_->FindServiceByName(service);
- const google::protobuf::ServiceDescriptor* ref_service_desc =
+ const protobuf::ServiceDescriptor* ref_service_desc =
ref_desc_pool_->FindServiceByName(service);
EXPECT_TRUE(service_desc != nullptr);
EXPECT_TRUE(ref_service_desc != nullptr);
EXPECT_EQ(service_desc->DebugString(), ref_service_desc->DebugString());
- const google::protobuf::FileDescriptor* file_desc = service_desc->file();
+ const protobuf::FileDescriptor* file_desc = service_desc->file();
if (known_files_.find(file_desc->package() + "/" + file_desc->name()) !=
known_files_.end()) {
EXPECT_EQ(file_desc->DebugString(),
@@ -105,9 +104,9 @@ class ProtoServerReflectionTest : public ::testing::Test {
}
void CompareMethod(const grpc::string& method) {
- const google::protobuf::MethodDescriptor* method_desc =
+ const protobuf::MethodDescriptor* method_desc =
desc_pool_->FindMethodByName(method);
- const google::protobuf::MethodDescriptor* ref_method_desc =
+ const protobuf::MethodDescriptor* ref_method_desc =
ref_desc_pool_->FindMethodByName(method);
EXPECT_TRUE(method_desc != nullptr);
EXPECT_TRUE(ref_method_desc != nullptr);
@@ -122,9 +121,8 @@ class ProtoServerReflectionTest : public ::testing::Test {
return;
}
- const google::protobuf::Descriptor* desc =
- desc_pool_->FindMessageTypeByName(type);
- const google::protobuf::Descriptor* ref_desc =
+ const protobuf::Descriptor* desc = desc_pool_->FindMessageTypeByName(type);
+ const protobuf::Descriptor* ref_desc =
ref_desc_pool_->FindMessageTypeByName(type);
EXPECT_TRUE(desc != nullptr);
EXPECT_TRUE(ref_desc != nullptr);
@@ -135,10 +133,10 @@ class ProtoServerReflectionTest : public ::testing::Test {
std::unique_ptr<Server> server_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<ProtoReflectionDescriptorDatabase> desc_db_;
- std::unique_ptr<google::protobuf::DescriptorPool> desc_pool_;
+ std::unique_ptr<protobuf::DescriptorPool> desc_pool_;
std::unordered_set<string> known_files_;
std::unordered_set<string> known_types_;
- const google::protobuf::DescriptorPool* ref_desc_pool_;
+ const protobuf::DescriptorPool* ref_desc_pool_;
int port_;
reflection::ProtoServerReflectionPlugin plugin_;
};
diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc
index 778a2be573..b967a5d1e9 100644
--- a/test/cpp/end2end/server_builder_plugin_test.cc
+++ b/test/cpp/end2end/server_builder_plugin_test.cc
@@ -191,7 +191,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
// we run some tests without a service, and for those we need to supply a
// frequently polled completion queue
cq_ = builder_->AddCompletionQueue();
- cq_thread_ = grpc::thread(std::bind(&ServerBuilderPluginTest::RunCQ, this));
+ cq_thread_ = new grpc::thread(&ServerBuilderPluginTest::RunCQ, this);
server_ = builder_->BuildAndStart();
EXPECT_TRUE(CheckPresent());
}
@@ -209,7 +209,8 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
EXPECT_TRUE(plugin->finish_is_called());
server_->Shutdown();
cq_->Shutdown();
- cq_thread_.join();
+ cq_thread_->join();
+ delete cq_thread_;
}
string to_string(const int number) {
@@ -224,7 +225,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<Server> server_;
- grpc::thread cq_thread_;
+ grpc::thread* cq_thread_;
TestServiceImpl service_;
int port_;
diff --git a/test/cpp/grpclb/grpclb_api_test.cc b/test/cpp/grpclb/grpclb_api_test.cc
index bf77878e0a..33de1ee93c 100644
--- a/test/cpp/grpclb/grpclb_api_test.cc
+++ b/test/cpp/grpclb/grpclb_api_test.cc
@@ -58,26 +58,24 @@ TEST_F(GrpclbTest, CreateRequest) {
grpc_grpclb_request_destroy(c_req);
}
-TEST_F(GrpclbTest, ParseResponse) {
+TEST_F(GrpclbTest, ParseInitialResponse) {
LoadBalanceResponse response;
auto* initial_response = response.mutable_initial_response();
auto* client_stats_report_interval =
initial_response->mutable_client_stats_report_interval();
client_stats_report_interval->set_seconds(123);
client_stats_report_interval->set_nanos(456);
-
const std::string encoded_response = response.SerializeAsString();
gpr_slice encoded_slice =
gpr_slice_from_copied_string(encoded_response.c_str());
- grpc_grpclb_response* c_response = grpc_grpclb_response_parse(encoded_slice);
- EXPECT_TRUE(c_response->has_initial_response);
- EXPECT_FALSE(c_response->initial_response.has_load_balancer_delegate);
- EXPECT_EQ(c_response->initial_response.client_stats_report_interval.seconds,
- 123);
- EXPECT_EQ(c_response->initial_response.client_stats_report_interval.nanos,
- 456);
+
+ grpc_grpclb_initial_response* c_initial_response =
+ grpc_grpclb_initial_response_parse(encoded_slice);
+ EXPECT_FALSE(c_initial_response->has_load_balancer_delegate);
+ EXPECT_EQ(c_initial_response->client_stats_report_interval.seconds, 123);
+ EXPECT_EQ(c_initial_response->client_stats_report_interval.nanos, 456);
gpr_slice_unref(encoded_slice);
- grpc_grpclb_response_destroy(c_response);
+ grpc_grpclb_initial_response_destroy(c_initial_response);
}
TEST_F(GrpclbTest, ParseResponseServerList) {
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
new file mode 100644
index 0000000000..1430f9de58
--- /dev/null
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -0,0 +1,688 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <cinttypes>
+#include <cstdarg>
+#include <cstdint>
+#include <cstring>
+#include <string>
+
+extern "C" {
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+
+#include "src/core/ext/client_config/client_channel.h"
+#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/support/tmpfile.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/server.h"
+#include "test/core/end2end/cq_verifier.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+}
+
+#include "src/proto/grpc/lb/v1/load_balancer.pb.h"
+
+#define NUM_BACKENDS 4
+
+// TODO(dgq): Other scenarios in need of testing:
+// - Send an empty serverlist update and verify that the client request blocks
+// until a new serverlist with actual contents is available.
+// - Send identical serverlist update
+// - Test reception of invalid serverlist
+// - Test pinging
+// - Test against a non-LB server. That server should return UNIMPLEMENTED and
+// the call should fail.
+// - Random LB server closing the stream unexpectedly.
+
+namespace grpc {
+namespace {
+
+typedef struct client_fixture {
+ grpc_channel *client;
+ char *server_uri;
+ grpc_completion_queue *cq;
+} client_fixture;
+
+typedef struct server_fixture {
+ grpc_server *server;
+ grpc_call *server_call;
+ grpc_completion_queue *cq;
+ char *servers_hostport;
+ int port;
+ gpr_thd_id tid;
+ int num_calls_serviced;
+} server_fixture;
+
+typedef struct test_fixture {
+ server_fixture lb_server;
+ server_fixture lb_backends[NUM_BACKENDS];
+ client_fixture client;
+ int lb_server_update_delay_ms;
+} test_fixture;
+
+static void *tag(intptr_t t) { return (void *)t; }
+
+static gpr_slice build_response_payload_slice(
+ const char *host, int *ports, size_t nports,
+ int64_t expiration_interval_secs, int32_t expiration_interval_nanos) {
+ // server_list {
+ // servers {
+ // ip_address: "127.0.0.1"
+ // port: ...
+ // load_balance_token: "token..."
+ // }
+ // ...
+ // }
+ grpc::lb::v1::LoadBalanceResponse response;
+ auto *serverlist = response.mutable_server_list();
+
+ if (expiration_interval_secs > 0 || expiration_interval_nanos > 0) {
+ auto *expiration_interval = serverlist->mutable_expiration_interval();
+ if (expiration_interval_secs > 0) {
+ expiration_interval->set_seconds(expiration_interval_secs);
+ }
+ if (expiration_interval_nanos > 0) {
+ expiration_interval->set_nanos(expiration_interval_nanos);
+ }
+ }
+ for (size_t i = 0; i < nports; i++) {
+ auto *server = serverlist->add_servers();
+ server->set_ip_address(host);
+ server->set_port(ports[i]);
+ // The following long long int cast is meant to work around the
+ // disfunctional implementation of std::to_string in gcc 4.4, which doesn't
+ // have a version for int but does have one for long long int.
+ server->set_load_balance_token("token" +
+ std::to_string((long long int)ports[i]));
+ }
+
+ gpr_log(GPR_INFO, "generating response: %s",
+ response.ShortDebugString().c_str());
+
+ const gpr_slice response_slice =
+ gpr_slice_from_copied_string(response.SerializeAsString().c_str());
+ return response_slice;
+}
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event ev;
+ do {
+ ev = grpc_completion_queue_next(cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
+ NULL);
+ } while (ev.type != GRPC_QUEUE_SHUTDOWN);
+}
+
+static void sleep_ms(int delay_ms) {
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(delay_ms, GPR_TIMESPAN)));
+}
+
+static void start_lb_server(server_fixture *sf, int *ports, size_t nports,
+ int update_delay_ms) {
+ grpc_call *s;
+ cq_verifier *cqv = cq_verifier_create(sf->cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_call_error error;
+ int was_cancelled = 2;
+ grpc_byte_buffer *request_payload_recv;
+ grpc_byte_buffer *response_payload;
+
+ memset(ops, 0, sizeof(ops));
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ error = grpc_server_request_call(sf->server, &s, &call_details,
+ &request_metadata_recv, sf->cq, sf->cq,
+ tag(200));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ gpr_log(GPR_INFO, "LB Server[%s] up", sf->servers_hostport);
+ cq_expect_completion(cqv, tag(200), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport);
+
+ // receive request for backends
+ op = ops;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &request_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ cq_expect_completion(cqv, tag(202), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport);
+ // TODO(dgq): validate request.
+ grpc_byte_buffer_destroy(request_payload_recv);
+ gpr_slice response_payload_slice;
+ for (int i = 0; i < 2; i++) {
+ if (i == 0) {
+ // First half of the ports.
+ response_payload_slice =
+ build_response_payload_slice("127.0.0.1", ports, nports / 2, -1, -1);
+ } else {
+ // Second half of the ports.
+ sleep_ms(update_delay_ms);
+ response_payload_slice =
+ build_response_payload_slice("127.0.0.1", ports + (nports / 2),
+ (nports + 1) / 2 /* ceil */, -1, -1);
+ }
+
+ response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+ op = ops;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = response_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(203), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ cq_expect_completion(cqv, tag(203), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "LB Server[%s] after SEND_MESSAGE, iter %d",
+ sf->servers_hostport, i);
+
+ grpc_byte_buffer_destroy(response_payload);
+ gpr_slice_unref(response_payload_slice);
+ }
+ gpr_log(GPR_INFO, "LB Server[%s] shutting down", sf->servers_hostport);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ op->data.send_status_from_server.status_details = "xyz";
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(204), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(201), 1);
+ cq_expect_completion(cqv, tag(204), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "LB Server[%s] after tag 204. All done. LB server out",
+ sf->servers_hostport);
+
+ grpc_call_destroy(s);
+
+ cq_verifier_destroy(cqv);
+
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+}
+
+static void start_backend_server(server_fixture *sf) {
+ grpc_call *s;
+ cq_verifier *cqv;
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_call_error error;
+ int was_cancelled;
+ grpc_byte_buffer *request_payload_recv;
+ grpc_byte_buffer *response_payload;
+ grpc_event ev;
+
+ while (true) {
+ memset(ops, 0, sizeof(ops));
+ cqv = cq_verifier_create(sf->cq);
+ was_cancelled = 2;
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ error = grpc_server_request_call(sf->server, &s, &call_details,
+ &request_metadata_recv, sf->cq, sf->cq,
+ tag(100));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport);
+ ev = grpc_completion_queue_next(sf->cq,
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(60), NULL);
+ if (!ev.success) {
+ gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport);
+ cq_verifier_destroy(cqv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+ return;
+ }
+ GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
+ gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport);
+
+ bool exit = false;
+ gpr_slice response_payload_slice =
+ gpr_slice_from_copied_string("hello you");
+ while (!exit) {
+ op = ops;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &request_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ ev = grpc_completion_queue_next(
+ sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
+ if (ev.type == GRPC_OP_COMPLETE && ev.success) {
+ GPR_ASSERT(ev.tag = tag(102));
+ if (request_payload_recv == NULL) {
+ exit = true;
+ gpr_log(GPR_INFO,
+ "Server[%s] recv \"close\" from client, exiting. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+ }
+ } else {
+ gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+ exit = true;
+ }
+ gpr_log(GPR_INFO, "Server[%s] after tag 102. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+
+ if (!exit) {
+ response_payload =
+ grpc_raw_byte_buffer_create(&response_payload_slice, 1);
+ op = ops;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = response_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error =
+ grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ ev = grpc_completion_queue_next(
+ sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL);
+ if (ev.type == GRPC_OP_COMPLETE && ev.success) {
+ GPR_ASSERT(ev.tag = tag(103));
+ } else {
+ gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+ exit = true;
+ }
+ gpr_log(GPR_INFO, "Server[%s] after tag 103. Call #%d",
+ sf->servers_hostport, sf->num_calls_serviced);
+ grpc_byte_buffer_destroy(response_payload);
+ }
+
+ grpc_byte_buffer_destroy(request_payload_recv);
+ }
+ ++sf->num_calls_serviced;
+
+ gpr_log(GPR_INFO, "Server[%s] OUT OF THE LOOP", sf->servers_hostport);
+ gpr_slice_unref(response_payload_slice);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_OK;
+ op->data.send_status_from_server.status_details = "Backend server out a-ok";
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(101), 1);
+ cq_expect_completion(cqv, tag(104), 1);
+ cq_verify(cqv);
+ gpr_log(GPR_INFO, "Server[%s] DONE. After servicing %d calls",
+ sf->servers_hostport, sf->num_calls_serviced);
+
+ grpc_call_destroy(s);
+ cq_verifier_destroy(cqv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+ }
+}
+
+static void perform_request(client_fixture *cf) {
+ grpc_call *c;
+ cq_verifier *cqv = cq_verifier_create(cf->cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_status_code status;
+ grpc_call_error error;
+ char *details = NULL;
+ size_t details_capacity = 0;
+ grpc_byte_buffer *request_payload;
+ grpc_byte_buffer *response_payload_recv;
+ int i;
+
+ memset(ops, 0, sizeof(ops));
+ gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
+
+ c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS,
+ cf->cq, "/foo", "foo.test.google.fr:1234",
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1000), NULL);
+ gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c);
+ GPR_ASSERT(c);
+ char *peer;
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.status_details_capacity = &details_capacity;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ for (i = 0; i < 4; i++) {
+ request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = request_payload;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &response_payload_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ peer = grpc_call_get_peer(c);
+ cq_expect_completion(cqv, tag(2), 1);
+ cq_verify(cqv);
+ gpr_free(peer);
+
+ grpc_byte_buffer_destroy(request_payload);
+ grpc_byte_buffer_destroy(response_payload_recv);
+ }
+
+ gpr_slice_unref(request_payload_slice);
+
+ op = ops;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ cq_expect_completion(cqv, tag(1), 1);
+ cq_expect_completion(cqv, tag(3), 1);
+ cq_verify(cqv);
+ peer = grpc_call_get_peer(c);
+ gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer);
+ gpr_free(peer);
+
+ grpc_call_destroy(c);
+
+ cq_verify_empty_timeout(cqv, 1);
+ cq_verifier_destroy(cqv);
+
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ gpr_free(details);
+}
+
+static void setup_client(const char *server_hostport, client_fixture *cf) {
+ cf->cq = grpc_completion_queue_create(NULL);
+ cf->server_uri = gpr_strdup(server_hostport);
+ cf->client = grpc_insecure_channel_create(cf->server_uri, NULL, NULL);
+}
+
+static void teardown_client(client_fixture *cf) {
+ grpc_completion_queue_shutdown(cf->cq);
+ drain_cq(cf->cq);
+ grpc_completion_queue_destroy(cf->cq);
+ cf->cq = NULL;
+ grpc_channel_destroy(cf->client);
+ cf->client = NULL;
+ gpr_free(cf->server_uri);
+}
+
+static void setup_server(const char *host, server_fixture *sf) {
+ int assigned_port;
+
+ sf->cq = grpc_completion_queue_create(NULL);
+ const char *colon_idx = strchr(host, ':');
+ if (colon_idx) {
+ const char *port_str = colon_idx + 1;
+ sf->port = atoi(port_str);
+ sf->servers_hostport = gpr_strdup(host);
+ } else {
+ sf->port = grpc_pick_unused_port_or_die();
+ gpr_join_host_port(&sf->servers_hostport, host, sf->port);
+ }
+
+ sf->server = grpc_server_create(NULL, NULL);
+ grpc_server_register_completion_queue(sf->server, sf->cq, NULL);
+ GPR_ASSERT((assigned_port = grpc_server_add_insecure_http2_port(
+ sf->server, sf->servers_hostport)) > 0);
+ GPR_ASSERT(sf->port == assigned_port);
+ grpc_server_start(sf->server);
+}
+
+static void teardown_server(server_fixture *sf) {
+ if (!sf->server) return;
+
+ gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport);
+ grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(
+ sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL)
+ .type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(sf->server);
+ gpr_thd_join(sf->tid);
+
+ sf->server = NULL;
+ grpc_completion_queue_shutdown(sf->cq);
+ drain_cq(sf->cq);
+ grpc_completion_queue_destroy(sf->cq);
+
+ gpr_log(GPR_INFO, "Server[%s] bye bye", sf->servers_hostport);
+ gpr_free(sf->servers_hostport);
+}
+
+static void fork_backend_server(void *arg) {
+ server_fixture *sf = static_cast<server_fixture *>(arg);
+ start_backend_server(sf);
+}
+
+static void fork_lb_server(void *arg) {
+ test_fixture *tf = static_cast<test_fixture *>(arg);
+ int ports[NUM_BACKENDS];
+ for (int i = 0; i < NUM_BACKENDS; i++) {
+ ports[i] = tf->lb_backends[i].port;
+ }
+ start_lb_server(&tf->lb_server, ports, NUM_BACKENDS,
+ tf->lb_server_update_delay_ms);
+}
+
+static void setup_test_fixture(test_fixture *tf,
+ int lb_server_update_delay_ms) {
+ tf->lb_server_update_delay_ms = lb_server_update_delay_ms;
+
+ gpr_thd_options options = gpr_thd_options_default();
+ gpr_thd_options_set_joinable(&options);
+
+ for (int i = 0; i < NUM_BACKENDS; ++i) {
+ setup_server("127.0.0.1", &tf->lb_backends[i]);
+ gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server,
+ &tf->lb_backends[i], &options);
+ }
+
+ setup_server("127.0.0.1", &tf->lb_server);
+ gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options);
+
+ char *server_uri;
+ gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1",
+ tf->lb_server.servers_hostport);
+ setup_client(server_uri, &tf->client);
+ gpr_free(server_uri);
+}
+
+static void teardown_test_fixture(test_fixture *tf) {
+ teardown_client(&tf->client);
+ for (int i = 0; i < NUM_BACKENDS; ++i) {
+ teardown_server(&tf->lb_backends[i]);
+ }
+ teardown_server(&tf->lb_server);
+}
+
+// The LB server will send two updates: batch 1 and batch 2. Each batch
+// contains
+// two addresses, both of a valid and running backend server. Batch 1 is
+// readily
+// available and provided as soon as the client establishes the streaming
+// call.
+// Batch 2 is sent after a delay of \a lb_server_update_delay_ms
+// milliseconds.
+static test_fixture test_update(int lb_server_update_delay_ms) {
+ gpr_log(GPR_INFO, "start %s(%d)", __func__, lb_server_update_delay_ms);
+ test_fixture tf;
+ memset(&tf, 0, sizeof(tf));
+ setup_test_fixture(&tf, lb_server_update_delay_ms);
+ perform_request(
+ &tf.client); // "consumes" 1st backend server of 1st serverlist
+ perform_request(
+ &tf.client); // "consumes" 2nd backend server of 1st serverlist
+
+ perform_request(
+ &tf.client); // "consumes" 1st backend server of 2nd serverlist
+ perform_request(
+ &tf.client); // "consumes" 2nd backend server of 2nd serverlist
+
+ teardown_test_fixture(&tf);
+ gpr_log(GPR_INFO, "end %s(%d)", __func__, lb_server_update_delay_ms);
+ return tf;
+}
+
+} // namespace
+} // namespace grpc
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+
+ grpc::test_fixture tf_result;
+ // Clients take a bit over one second to complete a call (the last part of the
+ // call sleeps for 1 second while verifying the client's completion queue is
+ // empty). Therefore:
+ //
+ // If the LB server waits 800ms before sending an update, it will arrive
+ // before the first client request is done, skipping the second server from
+ // batch 1 altogether: the 2nd client request will go to the 1st server of
+ // batch 2 (ie, the third one out of the four total servers).
+ tf_result = grpc::test_update(800);
+ GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2);
+ GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
+
+ // If the LB server waits 1500ms, the update arrives after having picked the
+ // 2nd server from batch 1 but before the next pick for the first server of
+ // batch 2. All server are used.
+ tf_result = grpc::test_update(1500);
+ GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1);
+
+ // If the LB server waits > 2000ms, the update arrives after the first two
+ // request are done and the third pick is performed, which returns, in RR
+ // fashion, the 1st server of the 1st update. Therefore, the second server of
+ // batch 1 is hit twice, whereas the first server of batch 2 is never hit.
+ tf_result = grpc::test_update(2100);
+ GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 2);
+ GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1);
+ GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0);
+
+ grpc_shutdown();
+ return 0;
+}
diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc
index c52e48bae6..53529da782 100644
--- a/test/cpp/util/grpc_cli.cc
+++ b/test/cpp/util/grpc_cli.cc
@@ -34,18 +34,21 @@
/*
A command line tool to talk to a grpc server.
Example of talking to grpc interop server:
- grpc_cli call localhost:50051 UnaryCall src/proto/grpc/testing/test.proto \
- "response_size:10" --enable_ssl=false
+ grpc_cli call localhost:50051 UnaryCall "response_size:10" \
+ --protofiles=src/proto/grpc/testing/test.proto --enable_ssl=false
Options:
- 1. --proto_path, if your proto file is not under current working directory,
+ 1. --protofiles, use this flag to provide a proto file if the server does
+ does not have the reflection service.
+ 2. --proto_path, if your proto file is not under current working directory,
use this flag to provide a search root. It should work similar to the
- counterpart in protoc.
- 2. --metadata specifies metadata to be sent to the server, such as:
+ counterpart in protoc. This option is valid only when protofiles is
+ provided.
+ 3. --metadata specifies metadata to be sent to the server, such as:
--metadata="MyHeaderKey1:Value1:MyHeaderKey2:Value2"
- 3. --enable_ssl, whether to use tls.
- 4. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call
- 3. --input_binary_file, a file containing the serialized request. The file
+ 4. --enable_ssl, whether to use tls.
+ 5. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call
+ 6. --input_binary_file, a file containing the serialized request. The file
can be generated by calling something like:
protoc --proto_path=src/proto/grpc/testing/ \
--encode=grpc.testing.SimpleRequest \
@@ -53,7 +56,7 @@
< input.txt > input.bin
If this is used and no proto file is provided in the argument list, the
method string has to be exact in the form of /package.service/method.
- 4. --output_binary_file, a file to write binary format response into, it can
+ 7. --output_binary_file, a file to write binary format response into, it can
be later decoded using protoc:
protoc --proto_path=src/proto/grpc/testing/ \
--decode=grpc.testing.SimpleResponse \
@@ -61,6 +64,7 @@
< output.bin > output.txt
*/
+#include <unistd.h>
#include <fstream>
#include <iostream>
#include <sstream>
@@ -86,6 +90,8 @@ DEFINE_string(output_binary_file, "",
DEFINE_string(metadata, "",
"Metadata to send to server, in the form of key1:val1:key2:val2");
DEFINE_string(proto_path, ".", "Path to look for the proto file.");
+// TODO(zyc): support a list of input proto files
+DEFINE_string(protofiles, "", "Name of the proto file.");
void ParseMetadataFlag(
std::multimap<grpc::string, grpc::string>* client_metadata) {
@@ -129,35 +135,61 @@ void PrintMetadata(const T& m, const grpc::string& message) {
int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
- if (argc < 4 || argc == 5 || grpc::string(argv[1]) != "call") {
+ if (argc < 4 || grpc::string(argv[1]) != "call") {
std::cout << "Usage: grpc_cli call server_host:port method_name "
<< "[proto file] [text format request] [<options>]" << std::endl;
+ return 1;
}
- grpc::string file_name;
grpc::string request_text;
grpc::string server_address(argv[2]);
grpc::string method_name(argv[3]);
std::unique_ptr<grpc::testing::ProtoFileParser> parser;
grpc::string serialized_request_proto;
- if (argc == 6) {
- file_name = argv[4];
- // TODO(yangg) read from stdin as well?
- request_text = argv[5];
+ if (argc == 5) {
+ request_text = argv[4];
+ }
+
+ std::shared_ptr<grpc::ChannelCredentials> creds;
+ if (!FLAGS_enable_ssl) {
+ creds = grpc::InsecureChannelCredentials();
+ } else {
+ if (FLAGS_use_auth) {
+ creds = grpc::GoogleDefaultCredentials();
+ } else {
+ creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
+ }
}
+ std::shared_ptr<grpc::Channel> channel =
+ grpc::CreateChannel(server_address, creds);
if (request_text.empty() && FLAGS_input_binary_file.empty()) {
- std::cout << "Missing input. Use text format input or "
- << "--input_binary_file for serialized request" << std::endl;
- return 1;
- } else if (!request_text.empty()) {
- parser.reset(new grpc::testing::ProtoFileParser(FLAGS_proto_path, file_name,
- method_name));
+ if (isatty(STDIN_FILENO)) {
+ std::cout << "reading request message from stdin..." << std::endl;
+ }
+ std::stringstream input_stream;
+ input_stream << std::cin.rdbuf();
+ request_text = input_stream.str();
+ }
+
+ if (!request_text.empty()) {
+ if (!FLAGS_protofiles.empty()) {
+ parser.reset(new grpc::testing::ProtoFileParser(
+ FLAGS_proto_path, FLAGS_protofiles, method_name));
+ } else {
+ parser.reset(new grpc::testing::ProtoFileParser(channel, method_name));
+ }
method_name = parser->GetFullMethodName();
if (parser->HasError()) {
return 1;
}
+
+ if (!FLAGS_input_binary_file.empty()) {
+ std::cout
+ << "warning: request given in argv, ignoring --input_binary_file"
+ << std::endl;
+ }
}
if (parser) {
@@ -175,19 +207,6 @@ int main(int argc, char** argv) {
}
std::cout << "connecting to " << server_address << std::endl;
- std::shared_ptr<grpc::ChannelCredentials> creds;
- if (!FLAGS_enable_ssl) {
- creds = grpc::InsecureChannelCredentials();
- } else {
- if (FLAGS_use_auth) {
- creds = grpc::GoogleDefaultCredentials();
- } else {
- creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
- }
- }
- std::shared_ptr<grpc::Channel> channel =
- grpc::CreateChannel(server_address, creds);
-
grpc::string serialized_response_proto;
std::multimap<grpc::string, grpc::string> client_metadata;
std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
@@ -219,7 +238,7 @@ int main(int argc, char** argv) {
}
} else {
std::cout << "Rpc failed with status code " << s.error_code()
- << " error message " << s.error_message() << std::endl;
+ << ", error message: " << s.error_message() << std::endl;
}
return 0;
diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc
index 25aec329eb..5b0d925e1c 100644
--- a/test/cpp/util/proto_file_parser.cc
+++ b/test/cpp/util/proto_file_parser.cc
@@ -95,9 +95,48 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path,
dynamic_factory_.reset(
new google::protobuf::DynamicMessageFactory(importer_->pool()));
+ std::vector<const google::protobuf::ServiceDescriptor*> service_desc_list;
+ for (int i = 0; i < file_desc->service_count(); i++) {
+ service_desc_list.push_back(file_desc->service(i));
+ }
+ InitProtoFileParser(method, service_desc_list);
+}
+
+ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
+ const grpc::string& method)
+ : has_error_(false),
+ desc_db_(new grpc::ProtoReflectionDescriptorDatabase(channel)),
+ desc_pool_(new google::protobuf::DescriptorPool(desc_db_.get())) {
+ std::vector<std::string> service_list;
+ if (!desc_db_->GetServices(&service_list)) {
+ LogError(
+ "Failed to get services from the server, "
+ "it may not have the reflection service.\n"
+ "Please try to use the --protofiles option to provide a proto file.");
+ }
+ if (has_error_) {
+ return;
+ }
+ dynamic_factory_.reset(
+ new google::protobuf::DynamicMessageFactory(desc_pool_.get()));
+
+ std::vector<const google::protobuf::ServiceDescriptor*> service_desc_list;
+ for (auto it = service_list.begin(); it != service_list.end(); it++) {
+ service_desc_list.push_back(desc_pool_->FindServiceByName(*it));
+ }
+ InitProtoFileParser(method, service_desc_list);
+}
+
+ProtoFileParser::~ProtoFileParser() {}
+
+void ProtoFileParser::InitProtoFileParser(
+ const grpc::string& method,
+ const std::vector<const google::protobuf::ServiceDescriptor*>
+ service_desc_list) {
const google::protobuf::MethodDescriptor* method_descriptor = nullptr;
- for (int i = 0; !method_descriptor && i < file_desc->service_count(); i++) {
- const auto* service_desc = file_desc->service(i);
+ for (auto it = service_desc_list.begin(); it != service_desc_list.end();
+ it++) {
+ const auto* service_desc = *it;
for (int j = 0; j < service_desc->method_count(); j++) {
const auto* method_desc = service_desc->method(j);
if (MethodNameMatch(method_desc->full_name(), method)) {
@@ -130,8 +169,6 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path,
dynamic_factory_->GetPrototype(method_descriptor->output_type())->New());
}
-ProtoFileParser::~ProtoFileParser() {}
-
grpc::string ProtoFileParser::GetSerializedProto(
const grpc::string& text_format_proto, bool is_request) {
grpc::string serialized;
@@ -143,7 +180,7 @@ grpc::string ProtoFileParser::GetSerializedProto(
LogError("Failed to parse text format to proto.");
return "";
}
- ok = request_prototype_->SerializeToString(&serialized);
+ ok = msg->SerializeToString(&serialized);
if (!ok) {
LogError("Failed to serialize proto.");
return "";
diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h
index 46cdd66503..b442d77db9 100644
--- a/test/cpp/util/proto_file_parser.h
+++ b/test/cpp/util/proto_file_parser.h
@@ -38,8 +38,10 @@
#include <google/protobuf/compiler/importer.h>
#include <google/protobuf/dynamic_message.h>
+#include <grpc++/channel.h>
#include "src/compiler/config.h"
+#include "test/cpp/util/proto_reflection_descriptor_database.h"
namespace grpc {
namespace testing {
@@ -53,6 +55,9 @@ class ProtoFileParser {
// even just Method. It will log an error if there is ambiguity.
ProtoFileParser(const grpc::string& proto_path, const grpc::string& file_name,
const grpc::string& method);
+
+ ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
+ const grpc::string& method);
~ProtoFileParser();
grpc::string GetFullMethodName() const { return full_method_name_; }
@@ -68,12 +73,18 @@ class ProtoFileParser {
void LogError(const grpc::string& error_msg);
private:
+ void InitProtoFileParser(
+ const grpc::string& method,
+ const std::vector<const google::protobuf::ServiceDescriptor*> services);
+
bool has_error_;
grpc::string request_text_;
grpc::string full_method_name_;
google::protobuf::compiler::DiskSourceTree source_tree_;
std::unique_ptr<ErrorPrinter> error_printer_;
std::unique_ptr<google::protobuf::compiler::Importer> importer_;
+ std::unique_ptr<grpc::ProtoReflectionDescriptorDatabase> desc_db_;
+ std::unique_ptr<google::protobuf::DescriptorPool> desc_pool_;
std::unique_ptr<google::protobuf::DynamicMessageFactory> dynamic_factory_;
std::unique_ptr<grpc::protobuf::Message> request_prototype_;
std::unique_ptr<grpc::protobuf::Message> response_prototype_;
diff --git a/test/cpp/util/proto_reflection_descriptor_database.cc b/test/cpp/util/proto_reflection_descriptor_database.cc
index 25b720aee0..8fd466feb0 100644
--- a/test/cpp/util/proto_reflection_descriptor_database.cc
+++ b/test/cpp/util/proto_reflection_descriptor_database.cc
@@ -53,10 +53,20 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
std::shared_ptr<grpc::Channel> channel)
: stub_(ServerReflection::NewStub(channel)) {}
-ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {}
+ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {
+ if (stream_) {
+ stream_->WritesDone();
+ Status status = stream_->Finish();
+ if (!status.ok()) {
+ gpr_log(GPR_ERROR,
+ "ServerReflectionInfo rpc failed. Error code: %d, details: %s",
+ (int)status.error_code(), status.error_message().c_str());
+ }
+ }
+}
bool ProtoReflectionDescriptorDatabase::FindFileByName(
- const string& filename, google::protobuf::FileDescriptorProto* output) {
+ const string& filename, protobuf::FileDescriptorProto* output) {
if (cached_db_.FindFileByName(filename, output)) {
return true;
}
@@ -101,7 +111,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileByName(
}
bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol(
- const string& symbol_name, google::protobuf::FileDescriptorProto* output) {
+ const string& symbol_name, protobuf::FileDescriptorProto* output) {
if (cached_db_.FindFileContainingSymbol(symbol_name, output)) {
return true;
}
@@ -148,7 +158,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol(
bool ProtoReflectionDescriptorDatabase::FindFileContainingExtension(
const string& containing_type, int field_number,
- google::protobuf::FileDescriptorProto* output) {
+ protobuf::FileDescriptorProto* output) {
if (cached_db_.FindFileContainingExtension(containing_type, field_number,
output)) {
return true;
@@ -276,10 +286,10 @@ bool ProtoReflectionDescriptorDatabase::GetServices(
return false;
}
-const google::protobuf::FileDescriptorProto
+const protobuf::FileDescriptorProto
ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse(
const std::string& byte_fd_proto) {
- google::protobuf::FileDescriptorProto file_desc_proto;
+ protobuf::FileDescriptorProto file_desc_proto;
file_desc_proto.ParseFromString(byte_fd_proto);
return file_desc_proto;
}
@@ -287,7 +297,7 @@ ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse(
void ProtoReflectionDescriptorDatabase::AddFileFromResponse(
const grpc::reflection::v1alpha::FileDescriptorResponse& response) {
for (int i = 0; i < response.file_descriptor_proto_size(); ++i) {
- const google::protobuf::FileDescriptorProto file_proto =
+ const protobuf::FileDescriptorProto file_proto =
ParseFileDescriptorProtoResponse(response.file_descriptor_proto(i));
if (known_files_.find(file_proto.name()) == known_files_.end()) {
known_files_.insert(file_proto.name());
diff --git a/test/cpp/util/proto_reflection_descriptor_database.h b/test/cpp/util/proto_reflection_descriptor_database.h
index 99c00675bb..eb7cf4907d 100644
--- a/test/cpp/util/proto_reflection_descriptor_database.h
+++ b/test/cpp/util/proto_reflection_descriptor_database.h
@@ -38,19 +38,19 @@
#include <unordered_set>
#include <vector>
-#include <google/protobuf/descriptor.h>
-#include <google/protobuf/descriptor.pb.h>
-#include <google/protobuf/descriptor_database.h>
+// GRPC_NO_GENERATED_CODE indicates generated pb files should not be used
+#ifdef GRPC_NO_GENERATED_CODE
+#include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h"
+#else
#include <grpc++/ext/reflection.grpc.pb.h>
+#endif // GRPC_NO_GENERATED_CODE
#include <grpc++/grpc++.h>
-
namespace grpc {
// ProtoReflectionDescriptorDatabase takes a stub of ServerReflection and
// provides the methods defined by DescriptorDatabase interfaces. It can be used
// to feed a DescriptorPool instance.
-class ProtoReflectionDescriptorDatabase
- : public google::protobuf::DescriptorDatabase {
+class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
public:
explicit ProtoReflectionDescriptorDatabase(
std::unique_ptr<reflection::v1alpha::ServerReflection::Stub> stub);
@@ -65,14 +65,13 @@ class ProtoReflectionDescriptorDatabase
// Find a file by file name. Fills in in *output and returns true if found.
// Otherwise, returns false, leaving the contents of *output undefined.
bool FindFileByName(const string& filename,
- google::protobuf::FileDescriptorProto* output)
- GRPC_OVERRIDE;
+ protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
// Find the file that declares the given fully-qualified symbol name.
// If found, fills in *output and returns true, otherwise returns false
// and leaves *output undefined.
bool FindFileContainingSymbol(const string& symbol_name,
- google::protobuf::FileDescriptorProto* output)
+ protobuf::FileDescriptorProto* output)
GRPC_OVERRIDE;
// Find the file which defines an extension extending the given message type
@@ -81,7 +80,7 @@ class ProtoReflectionDescriptorDatabase
// must be a fully-qualified type name.
bool FindFileContainingExtension(
const string& containing_type, int field_number,
- google::protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
+ protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
// Finds the tag numbers used by all known extensions of
// extendee_type, and appends them to output in an undefined
@@ -102,7 +101,7 @@ class ProtoReflectionDescriptorDatabase
grpc::reflection::v1alpha::ServerReflectionResponse>
ClientStream;
- const google::protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse(
+ const protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse(
const std::string& byte_fd_proto);
void AddFileFromResponse(
@@ -123,7 +122,7 @@ class ProtoReflectionDescriptorDatabase
std::unordered_map<string, std::vector<int>> cached_extension_numbers_;
std::mutex stream_mutex_;
- google::protobuf::SimpleDescriptorDatabase cached_db_;
+ protobuf::SimpleDescriptorDatabase cached_db_;
};
} // namespace grpc
diff --git a/test/distrib/python/run_distrib_test.sh b/test/distrib/python/run_distrib_test.sh
index 8a983bc248..0c2154838f 100755
--- a/test/distrib/python/run_distrib_test.sh
+++ b/test/distrib/python/run_distrib_test.sh
@@ -33,34 +33,47 @@ set -ex
cd $(dirname $0)
# Pick up the source dist archive whatever its version is
+SDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-*.tar.gz
BDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-*.whl
+TOOLS_SDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio_tools-*.tar.gz
TOOLS_BDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio_tools-*.whl
-if [ ! -f ${SDIST_ARCHIVE} ]
-then
- echo "Archive ${SDIST_ARCHIVE} does not exist."
- exit 1
-fi
+function make_virtualenv() {
+ virtualenv $1
+ $1/bin/python -m pip install --upgrade six pip
+ $1/bin/python -m pip install cython
+}
-PYTHON=python2
-PIP=pip2
-which $PYTHON || PYTHON=python
-which $PIP || PIP=pip
+function at_least_one_installs() {
+ for file in "$@"; do
+ if python -m pip install $file; then
+ return 0
+ fi
+ done
+ return -1
+}
-# TODO(jtattermusch): this shouldn't be required
-# TODO(jtattermusch): run the command twice to workaround docker-on-overlay
-# issue https://github.com/docker/docker/issues/12327
-# (first attempt will fail when using docker with overlayFS)
-${PIP} install --upgrade six pip || ${PIP} install --upgrade six pip
+make_virtualenv bdist_test
+make_virtualenv sdist_test
-# At least one of the bdist packages has to succeed (whichever one matches the
-# test machine, anyway).
-for bdist in ${BDIST_ARCHIVES} ${TOOLS_BDIST_ARCHIVES}; do
- ($PYTHON -m pip install $bdist) || true
-done
+#
+# Install our distributions in order of dependencies
+#
+
+(source bdist_test/bin/activate && at_least_one_installs ${BDIST_ARCHIVES})
+(source bdist_test/bin/activate && at_least_one_installs ${TOOLS_BDIST_ARCHIVES})
+
+(source sdist_test/bin/activate && at_least_one_installs ${SDIST_ARCHIVES})
+(source sdist_test/bin/activate && at_least_one_installs ${TOOLS_SDIST_ARCHIVES})
+
+#
+# Test our distributions
+#
# TODO(jtattermusch): add a .proto file to the distribtest, generate python
# code from it and then use the generated code from distribtest.py
-$PYTHON -m grpc.tools.protoc
+(source bdist_test/bin/activate && python -m grpc.tools.protoc --help)
+(source sdist_test/bin/activate && python -m grpc.tools.protoc --help)
-$PYTHON distribtest.py
+(source bdist_test/bin/activate && python distribtest.py)
+(source sdist_test/bin/activate && python distribtest.py)