aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-08-01 11:02:22 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2016-08-01 11:02:22 -0700
commit87e06c84fdbffe89316cd52dc68ae5180b73b278 (patch)
tree522b195d777f3dd7f076f79d957bf51c982d1712 /test
parent8d48911faa184cbf8eaa79e1d8efc37f02bef5d2 (diff)
parent2507fef7379aa54cbced074f1b1e7ba3b53eae26 (diff)
Merge branch 'master' of github.com:grpc/grpc into grpclb_v0
Diffstat (limited to 'test')
-rw-r--r--test/core/end2end/bad_server_response_test.c4
-rw-r--r--test/core/end2end/tests/high_initial_seqno.c6
-rw-r--r--test/core/end2end/tests/network_status_change.c5
-rw-r--r--test/core/internal_api_canaries/iomgr.c13
-rw-r--r--test/core/iomgr/udp_server_test.c12
-rw-r--r--test/core/iomgr/workqueue_test.c150
-rw-r--r--test/core/support/slice_test.c22
-rw-r--r--test/core/util/mock_endpoint.c12
-rw-r--r--test/core/util/passthru_endpoint.c12
-rw-r--r--test/cpp/end2end/async_end2end_test.cc40
-rw-r--r--test/cpp/end2end/end2end_test.cc5
-rw-r--r--test/cpp/end2end/proto_server_reflection_test.cc24
-rw-r--r--test/cpp/end2end/server_builder_plugin_test.cc7
-rw-r--r--test/cpp/qps/client.h68
-rw-r--r--test/cpp/qps/client_async.cc103
-rw-r--r--test/cpp/qps/client_sync.cc28
-rw-r--r--test/cpp/qps/driver.cc135
-rw-r--r--test/cpp/qps/driver.h2
-rwxr-xr-xtest/cpp/qps/gen_build_yaml.py11
-rw-r--r--test/cpp/qps/json_run_localhost.cc2
-rw-r--r--test/cpp/qps/qps_json_driver.cc18
-rw-r--r--test/cpp/qps/qps_worker.cc40
-rw-r--r--test/cpp/qps/server_async.cc57
-rw-r--r--test/cpp/util/grpc_cli.cc89
-rw-r--r--test/cpp/util/proto_file_parser.cc47
-rw-r--r--test/cpp/util/proto_file_parser.h11
-rw-r--r--test/cpp/util/proto_reflection_descriptor_database.cc24
-rw-r--r--test/cpp/util/proto_reflection_descriptor_database.h23
-rwxr-xr-xtest/distrib/python/run_distrib_test.sh55
29 files changed, 592 insertions, 433 deletions
diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c
index cca75f54a5..ab80adf0e0 100644
--- a/test/core/end2end/bad_server_response_test.c
+++ b/test/core/end2end/bad_server_response_test.c
@@ -71,6 +71,8 @@
#define UNPARSEABLE_DETAIL_MSG "Failed parsing HTTP/2"
+#define HTTP1_DETAIL_MSG "Trying to connect an http1.x server"
+
/* TODO(zyc) Check the content of incomming data instead of using this length */
#define EXPECTED_INCOMING_DATA_LENGTH (size_t)310
@@ -334,7 +336,7 @@ int main(int argc, char **argv) {
/* http1 response */
run_test(HTTP1_RESP, sizeof(HTTP1_RESP) - 1, GRPC_STATUS_UNAVAILABLE,
- UNPARSEABLE_DETAIL_MSG);
+ HTTP1_DETAIL_MSG);
return 0;
}
diff --git a/test/core/end2end/tests/high_initial_seqno.c b/test/core/end2end/tests/high_initial_seqno.c
index 50e3c9cb89..db45f5eb5a 100644
--- a/test/core/end2end/tests/high_initial_seqno.c
+++ b/test/core/end2end/tests/high_initial_seqno.c
@@ -203,6 +203,12 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
grpc_call_destroy(c);
grpc_call_destroy(s);
+ /* TODO(ctiller): this rate limits the test, and it should be removed when
+ retry has been implemented; until then cross-thread chatter
+ may result in some requests needing to be cancelled due to
+ seqno exhaustion. */
+ cq_verify_empty(cqv);
+
cq_verifier_destroy(cqv);
}
diff --git a/test/core/end2end/tests/network_status_change.c b/test/core/end2end/tests/network_status_change.c
index 10207844ab..39ddc13754 100644
--- a/test/core/end2end/tests/network_status_change.c
+++ b/test/core/end2end/tests/network_status_change.c
@@ -186,9 +186,10 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == error);
cq_expect_completion(cqv, tag(102), 1);
+ cq_verify(cqv);
+
// Simulate the network loss event
grpc_network_status_shutdown_all_endpoints();
- cq_verify(cqv);
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
@@ -205,7 +206,7 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) {
op++;
error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
- void shutdown_all_endpoints();
+
cq_expect_completion(cqv, tag(103), 1);
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c
index 5e86c42309..27d630623e 100644
--- a/test/core/internal_api_canaries/iomgr.c
+++ b/test/core/internal_api_canaries/iomgr.c
@@ -77,11 +77,14 @@ static void test_code(void) {
/* endpoint.h */
grpc_endpoint endpoint;
- grpc_endpoint_vtable vtable = {
- grpc_endpoint_read, grpc_endpoint_write,
- grpc_endpoint_add_to_pollset, grpc_endpoint_add_to_pollset_set,
- grpc_endpoint_shutdown, grpc_endpoint_destroy,
- grpc_endpoint_get_peer};
+ grpc_endpoint_vtable vtable = {grpc_endpoint_read,
+ grpc_endpoint_write,
+ grpc_endpoint_get_workqueue,
+ grpc_endpoint_add_to_pollset,
+ grpc_endpoint_add_to_pollset_set,
+ grpc_endpoint_shutdown,
+ grpc_endpoint_destroy,
+ grpc_endpoint_get_peer};
endpoint.vtable = &vtable;
grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL);
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 3152fb7a46..a959a7e07f 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -70,7 +70,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
g_number_of_reads++;
g_number_of_bytes_read += (int)byte_count;
- grpc_pollset_kick(g_pollset, NULL);
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
gpr_mu_unlock(g_mu);
}
@@ -179,8 +180,10 @@ static void test_receive(int number_of_clients) {
while (g_number_of_reads == number_of_reads_before &&
gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
grpc_pollset_worker *worker = NULL;
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline)));
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
@@ -199,7 +202,8 @@ static void test_receive(int number_of_clients) {
GPR_ASSERT(g_number_of_orphan_calls == 1);
}
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) {
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_error *error) {
grpc_pollset_destroy(p);
}
diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c
deleted file mode 100644
index 76ecfae74b..0000000000
--- a/test/core/iomgr/workqueue_test.c
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/iomgr/workqueue.h"
-
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "test/core/util/test_config.h"
-
-static gpr_mu *g_mu;
-static grpc_pollset *g_pollset;
-
-static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) {
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- gpr_mu_lock(g_mu);
- *(int *)p = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
- gpr_mu_unlock(g_mu);
-}
-
-static void test_ref_unref(void) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_workqueue *wq;
- GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create",
- grpc_workqueue_create(&exec_ctx, &wq)));
- GRPC_WORKQUEUE_REF(wq, "test");
- GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "test");
- GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void test_add_closure(void) {
- grpc_closure c;
- int done = 0;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_workqueue *wq;
- GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create",
- grpc_workqueue_create(&exec_ctx, &wq)));
- gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
- grpc_pollset_worker *worker = NULL;
- grpc_closure_init(&c, must_succeed, &done);
-
- grpc_workqueue_enqueue(&exec_ctx, wq, &c, GRPC_ERROR_NONE);
- grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset);
-
- gpr_mu_lock(g_mu);
- GPR_ASSERT(!done);
- while (!done) {
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(deadline.clock_type), deadline)));
- }
- gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
- GPR_ASSERT(done);
-
- GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void test_flush(void) {
- grpc_closure c;
- int done = 0;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_workqueue *wq;
- GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create",
- grpc_workqueue_create(&exec_ctx, &wq)));
- gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
- grpc_pollset_worker *worker = NULL;
- grpc_closure_init(&c, must_succeed, &done);
-
- grpc_exec_ctx_sched(&exec_ctx, &c, GRPC_ERROR_NONE, NULL);
- grpc_workqueue_flush(&exec_ctx, wq);
- grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset);
-
- gpr_mu_lock(g_mu);
- GPR_ASSERT(!done);
- while (!done) {
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work",
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(deadline.clock_type), deadline)));
- }
- gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
- GPR_ASSERT(done);
-
- GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy");
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
- grpc_error *error) {
- grpc_pollset_destroy(p);
-}
-
-int main(int argc, char **argv) {
- grpc_closure destroyed;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_test_init(argc, argv);
- grpc_init();
- g_pollset = gpr_malloc(grpc_pollset_size());
- grpc_pollset_init(g_pollset, &g_mu);
-
- test_ref_unref();
- test_add_closure();
- test_flush();
-
- grpc_closure_init(&destroyed, destroy_pollset, g_pollset);
- grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
- grpc_exec_ctx_finish(&exec_ctx);
- grpc_shutdown();
-
- gpr_free(g_pollset);
- return 0;
-}
diff --git a/test/core/support/slice_test.c b/test/core/support/slice_test.c
index 0da483a321..06c364b368 100644
--- a/test/core/support/slice_test.c
+++ b/test/core/support/slice_test.c
@@ -85,6 +85,27 @@ static void test_slice_new_returns_something_sensible(void) {
gpr_slice_unref(slice);
}
+/* destroy function that sets a mark to indicate it was called. */
+static void set_mark(void *p) { *((int *)p) = 1; }
+
+static void test_slice_new_with_user_data(void) {
+ int marker = 0;
+ uint8_t buf[2];
+ gpr_slice slice;
+
+ buf[0] = 0;
+ buf[1] = 1;
+ slice = gpr_slice_new_with_user_data(buf, 2, set_mark, &marker);
+ GPR_ASSERT(marker == 0);
+ GPR_ASSERT(GPR_SLICE_LENGTH(slice) == 2);
+ GPR_ASSERT(GPR_SLICE_START_PTR(slice)[0] == 0);
+ GPR_ASSERT(GPR_SLICE_START_PTR(slice)[1] == 1);
+
+ /* unref should cause destroy function to run. */
+ gpr_slice_unref(slice);
+ GPR_ASSERT(marker == 1);
+}
+
static int do_nothing_with_len_1_calls = 0;
static void do_nothing_with_len_1(void *ignored, size_t len) {
@@ -232,6 +253,7 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_slice_malloc_returns_something_sensible();
test_slice_new_returns_something_sensible();
+ test_slice_new_with_user_data();
test_slice_new_with_len_returns_something_sensible();
for (length = 0; length < 128; length++) {
test_slice_sub_works(length);
diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c
index ed9545e9df..13e0e918fb 100644
--- a/test/core/util/mock_endpoint.c
+++ b/test/core/util/mock_endpoint.c
@@ -95,9 +95,17 @@ static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint");
}
+static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
+
static const grpc_endpoint_vtable vtable = {
- me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
- me_shutdown, me_destroy, me_get_peer,
+ me_read,
+ me_write,
+ me_get_workqueue,
+ me_add_to_pollset,
+ me_add_to_pollset_set,
+ me_shutdown,
+ me_destroy,
+ me_get_peer,
};
grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice)) {
diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c
index a39f3dd66e..7ed9e97bd6 100644
--- a/test/core/util/passthru_endpoint.c
+++ b/test/core/util/passthru_endpoint.c
@@ -140,9 +140,17 @@ static char *me_get_peer(grpc_endpoint *ep) {
return gpr_strdup("fake:mock_endpoint");
}
+static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; }
+
static const grpc_endpoint_vtable vtable = {
- me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
- me_shutdown, me_destroy, me_get_peer,
+ me_read,
+ me_write,
+ me_get_workqueue,
+ me_add_to_pollset,
+ me_add_to_pollset_set,
+ me_shutdown,
+ me_destroy,
+ me_get_peer,
};
static void half_init(half *m, passthru_endpoint *parent) {
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 6c7eae53a4..ac79fe8274 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -200,6 +200,10 @@ class Verifier {
bool spin_;
};
+bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
+ return plugin->has_sync_methods();
+}
+
// This class disables the server builder plugins that may add sync services to
// the server. If there are sync services, UnimplementedRpc test will triger
// the sync unkown rpc routine on the server side, rather than the async one
@@ -210,14 +214,9 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
void UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins)
GRPC_OVERRIDE {
- auto plugin = plugins->begin();
- while (plugin != plugins->end()) {
- if ((*plugin)->has_sync_methods()) {
- plugins->erase(plugin++);
- } else {
- plugin++;
- }
- }
+ plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
+ plugin_has_sync_methods),
+ plugins->end());
}
};
@@ -345,6 +344,31 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}
+// We do not need to protect notify because the use is synchronized.
+void ServerWait(Server* server, int* notify) {
+ server->Wait();
+ *notify = 1;
+}
+TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
+ int notify = 0;
+ std::thread* wait_thread =
+ new std::thread(&ServerWait, server_.get(), &notify);
+ ResetStub();
+ SendRpc(1);
+ EXPECT_EQ(0, notify);
+ server_->Shutdown();
+ wait_thread->join();
+ EXPECT_EQ(1, notify);
+ delete wait_thread;
+}
+
+TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
+ ResetStub();
+ SendRpc(1);
+ server_->Shutdown();
+ server_->Wait();
+}
+
// Test a simple RPC using the async version of Next
TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
ResetStub();
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 354a59cedd..46a58d3ac3 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -1166,6 +1166,9 @@ TEST_P(ProxyEnd2endTest, HugeResponse) {
request.mutable_param()->set_response_message_length(kResponseSize);
ClientContext context;
+ std::chrono::system_clock::time_point deadline =
+ std::chrono::system_clock::now() + std::chrono::seconds(20);
+ context.set_deadline(deadline);
Status s = stub_->Echo(&context, request, &response);
EXPECT_EQ(kResponseSize, response.message().size());
EXPECT_TRUE(s.ok());
@@ -1411,7 +1414,7 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) {
std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
std::vector<grpc::string_ref> tst =
auth_ctx->FindPropertyValues("transport_security_type");
- EXPECT_EQ(1u, tst.size());
+ ASSERT_EQ(1u, tst.size());
EXPECT_EQ(GetParam().credentials_type, ToString(tst[0]));
if (GetParam().credentials_type == kTlsCredentialsType) {
EXPECT_EQ("x509_subject_alternative_name",
diff --git a/test/cpp/end2end/proto_server_reflection_test.cc b/test/cpp/end2end/proto_server_reflection_test.cc
index f8fc39b553..efbb0e1f8e 100644
--- a/test/cpp/end2end/proto_server_reflection_test.cc
+++ b/test/cpp/end2end/proto_server_reflection_test.cc
@@ -31,7 +31,6 @@
*
*/
-#include <google/protobuf/descriptor.h>
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
@@ -59,7 +58,7 @@ class ProtoServerReflectionTest : public ::testing::Test {
void SetUp() GRPC_OVERRIDE {
port_ = grpc_pick_unused_port_or_die();
- ref_desc_pool_ = google::protobuf::DescriptorPool::generated_pool();
+ ref_desc_pool_ = protobuf::DescriptorPool::generated_pool();
ServerBuilder builder;
grpc::string server_address = "localhost:" + to_string(port_);
@@ -73,7 +72,7 @@ class ProtoServerReflectionTest : public ::testing::Test {
CreateChannel(target, InsecureChannelCredentials());
stub_ = grpc::testing::EchoTestService::NewStub(channel);
desc_db_.reset(new ProtoReflectionDescriptorDatabase(channel));
- desc_pool_.reset(new google::protobuf::DescriptorPool(desc_db_.get()));
+ desc_pool_.reset(new protobuf::DescriptorPool(desc_db_.get()));
}
string to_string(const int number) {
@@ -83,15 +82,15 @@ class ProtoServerReflectionTest : public ::testing::Test {
}
void CompareService(const grpc::string& service) {
- const google::protobuf::ServiceDescriptor* service_desc =
+ const protobuf::ServiceDescriptor* service_desc =
desc_pool_->FindServiceByName(service);
- const google::protobuf::ServiceDescriptor* ref_service_desc =
+ const protobuf::ServiceDescriptor* ref_service_desc =
ref_desc_pool_->FindServiceByName(service);
EXPECT_TRUE(service_desc != nullptr);
EXPECT_TRUE(ref_service_desc != nullptr);
EXPECT_EQ(service_desc->DebugString(), ref_service_desc->DebugString());
- const google::protobuf::FileDescriptor* file_desc = service_desc->file();
+ const protobuf::FileDescriptor* file_desc = service_desc->file();
if (known_files_.find(file_desc->package() + "/" + file_desc->name()) !=
known_files_.end()) {
EXPECT_EQ(file_desc->DebugString(),
@@ -105,9 +104,9 @@ class ProtoServerReflectionTest : public ::testing::Test {
}
void CompareMethod(const grpc::string& method) {
- const google::protobuf::MethodDescriptor* method_desc =
+ const protobuf::MethodDescriptor* method_desc =
desc_pool_->FindMethodByName(method);
- const google::protobuf::MethodDescriptor* ref_method_desc =
+ const protobuf::MethodDescriptor* ref_method_desc =
ref_desc_pool_->FindMethodByName(method);
EXPECT_TRUE(method_desc != nullptr);
EXPECT_TRUE(ref_method_desc != nullptr);
@@ -122,9 +121,8 @@ class ProtoServerReflectionTest : public ::testing::Test {
return;
}
- const google::protobuf::Descriptor* desc =
- desc_pool_->FindMessageTypeByName(type);
- const google::protobuf::Descriptor* ref_desc =
+ const protobuf::Descriptor* desc = desc_pool_->FindMessageTypeByName(type);
+ const protobuf::Descriptor* ref_desc =
ref_desc_pool_->FindMessageTypeByName(type);
EXPECT_TRUE(desc != nullptr);
EXPECT_TRUE(ref_desc != nullptr);
@@ -135,10 +133,10 @@ class ProtoServerReflectionTest : public ::testing::Test {
std::unique_ptr<Server> server_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<ProtoReflectionDescriptorDatabase> desc_db_;
- std::unique_ptr<google::protobuf::DescriptorPool> desc_pool_;
+ std::unique_ptr<protobuf::DescriptorPool> desc_pool_;
std::unordered_set<string> known_files_;
std::unordered_set<string> known_types_;
- const google::protobuf::DescriptorPool* ref_desc_pool_;
+ const protobuf::DescriptorPool* ref_desc_pool_;
int port_;
reflection::ProtoServerReflectionPlugin plugin_;
};
diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc
index 778a2be573..b967a5d1e9 100644
--- a/test/cpp/end2end/server_builder_plugin_test.cc
+++ b/test/cpp/end2end/server_builder_plugin_test.cc
@@ -191,7 +191,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
// we run some tests without a service, and for those we need to supply a
// frequently polled completion queue
cq_ = builder_->AddCompletionQueue();
- cq_thread_ = grpc::thread(std::bind(&ServerBuilderPluginTest::RunCQ, this));
+ cq_thread_ = new grpc::thread(&ServerBuilderPluginTest::RunCQ, this);
server_ = builder_->BuildAndStart();
EXPECT_TRUE(CheckPresent());
}
@@ -209,7 +209,8 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
EXPECT_TRUE(plugin->finish_is_called());
server_->Shutdown();
cq_->Shutdown();
- cq_thread_.join();
+ cq_thread_->join();
+ delete cq_thread_;
}
string to_string(const int number) {
@@ -224,7 +225,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<Server> server_;
- grpc::thread cq_thread_;
+ grpc::thread* cq_thread_;
TestServiceImpl service_;
int port_;
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 047bd16408..4045e13460 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -112,6 +112,21 @@ class ClientRequestCreator<ByteBuffer> {
}
};
+class HistogramEntry GRPC_FINAL {
+ public:
+ HistogramEntry() : used_(false) {}
+ bool used() const { return used_; }
+ double value() const { return value_; }
+ void set_value(double v) {
+ used_ = true;
+ value_ = v;
+ }
+
+ private:
+ bool used_;
+ double value_;
+};
+
class Client {
public:
Client() : timer_(new UsageTimer), interarrival_timer_() {}
@@ -151,10 +166,21 @@ class Client {
return stats;
}
+ // Must call AwaitThreadsCompletion before destructor to avoid a race
+ // between destructor and invocation of virtual ThreadFunc
+ void AwaitThreadsCompletion() {
+ DestroyMultithreading();
+ std::unique_lock<std::mutex> g(thread_completion_mu_);
+ while (threads_remaining_ != 0) {
+ threads_complete_.wait(g);
+ }
+ }
+
protected:
bool closed_loop_;
void StartThreads(size_t num_threads) {
+ threads_remaining_ = num_threads;
for (size_t i = 0; i < num_threads; i++) {
threads_.emplace_back(new Thread(this, i));
}
@@ -162,7 +188,8 @@ class Client {
void EndThreads() { threads_.clear(); }
- virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0;
+ virtual void DestroyMultithreading() = 0;
+ virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
@@ -215,7 +242,6 @@ class Client {
public:
Thread(Client* client, size_t idx)
: done_(false),
- new_stats_(nullptr),
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
@@ -230,15 +256,10 @@ class Client {
void BeginSwap(Histogram* n) {
std::lock_guard<std::mutex> g(mu_);
- new_stats_ = n;
+ n->Swap(&histogram_);
}
- void EndSwap() {
- std::unique_lock<std::mutex> g(mu_);
- while (new_stats_ != nullptr) {
- cv_.wait(g);
- };
- }
+ void EndSwap() {}
void MergeStatsInto(Histogram* hist) {
std::unique_lock<std::mutex> g(mu_);
@@ -252,29 +273,26 @@ class Client {
void ThreadFunc() {
for (;;) {
// run the loop body
- const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
- // lock, see if we're done
+ HistogramEntry entry;
+ const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
+ // lock, update histogram if needed and see if we're done
std::lock_guard<std::mutex> g(mu_);
+ if (entry.used()) {
+ histogram_.Add(entry.value());
+ }
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
}
if (done_) {
+ client_->CompleteThread();
return;
}
- // check if we're resetting stats, swap out the histogram if so
- if (new_stats_) {
- new_stats_->Swap(&histogram_);
- new_stats_ = nullptr;
- cv_.notify_one();
- }
}
}
std::mutex mu_;
- std::condition_variable cv_;
bool done_;
- Histogram* new_stats_;
Histogram histogram_;
Client* client_;
const size_t idx_;
@@ -286,6 +304,18 @@ class Client {
InterarrivalTimer interarrival_timer_;
std::vector<gpr_timespec> next_time_;
+
+ std::mutex thread_completion_mu_;
+ size_t threads_remaining_;
+ std::condition_variable threads_complete_;
+
+ void CompleteThread() {
+ std::lock_guard<std::mutex> g(thread_completion_mu_);
+ threads_remaining_--;
+ if (threads_remaining_ == 0) {
+ threads_complete_.notify_all();
+ }
+ }
};
template <class StubType, class RequestType>
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 1507d1e3d6..5d9cb4bd0c 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -31,7 +31,6 @@
*
*/
-#include <cassert>
#include <forward_list>
#include <functional>
#include <list>
@@ -48,7 +47,6 @@
#include <grpc++/generic/generic_stub.h>
#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
-#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
#include "src/proto/grpc/testing/services.grpc.pb.h"
@@ -64,7 +62,7 @@ class ClientRpcContext {
ClientRpcContext() {}
virtual ~ClientRpcContext() {}
// next state, return false if done. Collect stats when appropriate
- virtual bool RunNextState(bool, Histogram* hist) = 0;
+ virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
virtual ClientRpcContext* StartNewClone() = 0;
static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
static ClientRpcContext* detag(void* t) {
@@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this)));
}
}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
switch (next_state_) {
case State::READY:
start_ = UsageTimer::Now();
@@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext {
next_state_ = State::RESP_DONE;
return true;
case State::RESP_DONE:
- hist->Add((UsageTimer::Now() - start_) * 1e9);
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::INVALID;
return false;
@@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
for (int i = 0; i < num_async_threads_; i++) {
cli_cqs_.emplace_back(new CompletionQueue);
next_issuers_.emplace_back(NextIssuer(i));
+ shutdown_state_.emplace_back(new PerThreadShutdownState());
}
using namespace std::placeholders;
@@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
virtual ~AsyncClient() {
for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
- (*cq)->Shutdown();
void* got_tag;
bool ok;
while ((*cq)->Next(&got_tag, &ok)) {
@@ -201,32 +199,16 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
}
- bool ThreadFunc(Histogram* histogram,
- size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
- void* got_tag;
- bool ok;
-
- if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) {
- // Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
- if (!ctx->RunNextState(ok, histogram)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- auto clone = ctx->StartNewClone();
- clone->Start(cli_cqs_[thread_idx].get());
- // delete the old version
- delete ctx;
- }
- return true;
- } else { // queue is shutting down
- return false;
- }
- }
-
protected:
const int num_async_threads_;
private:
+ struct PerThreadShutdownState {
+ mutable std::mutex mutex;
+ bool shutdown;
+ PerThreadShutdownState() : shutdown(false) {}
+ };
+
int NumThreads(const ClientConfig& config) {
int num_threads = config.async_client_threads();
if (num_threads <= 0) { // Use dynamic sizing
@@ -235,9 +217,60 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
}
return num_threads;
}
+ void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL {
+ for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+ std::lock_guard<std::mutex> lock((*ss)->mutex);
+ (*ss)->shutdown = true;
+ }
+ for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
+ (*cq)->Shutdown();
+ }
+ this->EndThreads(); // this needed for resolution
+ }
+
+ bool ThreadFunc(HistogramEntry* entry,
+ size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
+ void* got_tag;
+ bool ok;
+
+ switch (cli_cqs_[thread_idx]->AsyncNext(
+ &got_tag, &ok,
+ std::chrono::system_clock::now() + std::chrono::milliseconds(10))) {
+ case CompletionQueue::GOT_EVENT: {
+ // Got a regular event, so process it
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ return true;
+ } else if (!ctx->RunNextState(ok, entry)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ auto clone = ctx->StartNewClone();
+ clone->Start(cli_cqs_[thread_idx].get());
+ // delete the old version
+ delete ctx;
+ }
+ return true;
+ }
+ case CompletionQueue::TIMEOUT: {
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
+ return true;
+ }
+ return true;
+ }
+ case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be
+ // done
+ return true;
+ }
+ GPR_UNREACHABLE_CODE(return true);
+ }
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
std::vector<std::function<gpr_timespec()>> next_issuers_;
+ std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
@@ -253,7 +286,7 @@ class AsyncUnaryClient GRPC_FINAL
config, SetupCtx, BenchmarkStubCreator) {
StartThreads(num_async_threads_);
}
- ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
+ ~AsyncUnaryClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -298,7 +331,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@@ -330,7 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
- hist->Add((UsageTimer::Now() - start_) * 1e9);
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@@ -382,7 +415,7 @@ class AsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
- ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+ ~AsyncStreamingClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
@@ -430,7 +463,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
ClientRpcContext::tag(this));
next_state_ = State::STREAM_IDLE;
}
- bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
+ bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE {
while (true) {
switch (next_state_) {
case State::STREAM_IDLE:
@@ -462,7 +495,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
return true;
break;
case State::READ_DONE:
- hist->Add((UsageTimer::Now() - start_) * 1e9);
+ entry->set_value((UsageTimer::Now() - start_) * 1e9);
callback_(status_, &response_);
next_state_ = State::STREAM_IDLE;
break; // loop around
@@ -518,7 +551,7 @@ class GenericAsyncStreamingClient GRPC_FINAL
StartThreads(num_async_threads_);
}
- ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
+ ~GenericAsyncStreamingClient() GRPC_OVERRIDE {}
private:
static void CheckDone(grpc::Status s, ByteBuffer* response) {}
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index c88e95b80e..25c7823553 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -31,7 +31,6 @@
*
*/
-#include <cassert>
#include <chrono>
#include <memory>
#include <mutex>
@@ -46,7 +45,6 @@
#include <grpc++/server_builder.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
-#include <grpc/support/histogram.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -55,7 +53,6 @@
#include "src/core/lib/profiling/timers.h"
#include "src/proto/grpc/testing/services.grpc.pb.h"
#include "test/cpp/qps/client.h"
-#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/usage_timer.h"
@@ -90,6 +87,9 @@ class SynchronousClient
size_t num_threads_;
std::vector<SimpleResponse> responses_;
+
+ private:
+ void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); }
};
class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
@@ -98,9 +98,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
: SynchronousClient(config) {
StartThreads(num_threads_);
}
- ~SynchronousUnaryClient() { EndThreads(); }
+ ~SynchronousUnaryClient() {}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
double start = UsageTimer::Now();
@@ -108,7 +108,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
grpc::ClientContext context;
grpc::Status s =
stub->UnaryCall(&context, request_, &responses_[thread_idx]);
- histogram->Add((UsageTimer::Now() - start) * 1e9);
+ entry->set_value((UsageTimer::Now() - start) * 1e9);
return s.ok();
}
};
@@ -127,25 +127,29 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
StartThreads(num_threads_);
}
~SynchronousStreamingClient() {
- EndThreads();
- for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
- stream++) {
+ for (size_t i = 0; i < num_threads_; i++) {
+ auto stream = &stream_[i];
if (*stream) {
(*stream)->WritesDone();
- EXPECT_TRUE((*stream)->Finish().ok());
+ Status s = (*stream)->Finish();
+ EXPECT_TRUE(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Stream %zu received an error %s", i,
+ s.error_message().c_str());
+ }
}
}
delete[] stream_;
delete[] context_;
}
- bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
+ bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE {
WaitToIssue(thread_idx);
GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0);
double start = UsageTimer::Now();
if (stream_[thread_idx]->Write(request_) &&
stream_[thread_idx]->Read(&responses_[thread_idx])) {
- histogram->Add((UsageTimer::Now() - start) * 1e9);
+ entry->set_value((UsageTimer::Now() - start) * 1e9);
return true;
}
return false;
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index 08bf045883..2aeaea51f2 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -87,7 +87,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores(
CoreRequest dummy;
CoreResponse cores;
grpc::Status s = stub->CoreCount(&ctx, dummy, &cores);
- assert(s.ok());
+ GPR_ASSERT(s.ok());
std::deque<int> dq;
for (int i = 0; i < cores.cores(); i++) {
dq.push_back(i);
@@ -289,9 +289,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = server_config;
servers[i].stream =
servers[i].stub->RunServer(runsc::AllocContext(&contexts));
- GPR_ASSERT(servers[i].stream->Write(args));
+ if (!servers[i].stream->Write(args)) {
+ gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
+ }
ServerStatus init_status;
- GPR_ASSERT(servers[i].stream->Read(&init_status));
+ if (!servers[i].stream->Read(&init_status)) {
+ gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
+ }
gpr_join_host_port(&cli_target, host, init_status.port());
client_config.add_server_targets(cli_target);
gpr_free(host);
@@ -345,9 +349,13 @@ std::unique_ptr<ScenarioResult> RunScenario(
*args.mutable_setup() = per_client_config;
clients[i].stream =
clients[i].stub->RunClient(runsc::AllocContext(&contexts));
- GPR_ASSERT(clients[i].stream->Write(args));
+ if (!clients[i].stream->Write(args)) {
+ gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
+ }
ClientStatus init_status;
- GPR_ASSERT(clients[i].stream->Read(&init_status));
+ if (!clients[i].stream->Read(&init_status)) {
+ gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
+ }
}
// Let everything warmup
@@ -362,19 +370,31 @@ std::unique_ptr<ScenarioResult> RunScenario(
server_mark.mutable_mark()->set_reset(true);
ClientArgs client_mark;
client_mark.mutable_mark()->set_reset(true);
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Write(server_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Write(client_mark));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Write(client_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+ }
}
ServerStatus server_status;
ClientStatus client_status;
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Read(&server_status)) {
+ gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Read(&client_status));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Read(&client_status)) {
+ gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
+ }
}
// Wait some time
@@ -390,37 +410,73 @@ std::unique_ptr<ScenarioResult> RunScenario(
Histogram merged_latencies;
gpr_log(GPR_INFO, "Finishing clients");
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Write(client_mark));
- GPR_ASSERT(client->stream->WritesDone());
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ if (!client->stream->Write(client_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
+ }
+ if (!client->stream->WritesDone()) {
+ gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Read(&client_status));
- const auto& stats = client_status.stats();
- merged_latencies.MergeProto(stats.latencies());
- result->add_client_stats()->CopyFrom(stats);
- GPR_ASSERT(!client->stream->Read(&client_status));
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ // Read the client final status
+ if (client->stream->Read(&client_status)) {
+ gpr_log(GPR_INFO, "Received final status from client %zu", i);
+ const auto& stats = client_status.stats();
+ merged_latencies.MergeProto(stats.latencies());
+ result->add_client_stats()->CopyFrom(stats);
+ // That final status should be the last message on the client stream
+ GPR_ASSERT(!client->stream->Read(&client_status));
+ } else {
+ gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
+ }
}
- for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
- GPR_ASSERT(client->stream->Finish().ok());
+ for (size_t i = 0; i < num_clients; i++) {
+ auto client = &clients[i];
+ Status s = client->stream->Finish();
+ result->add_client_success(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
+ s.error_message().c_str());
+ }
}
delete[] clients;
merged_latencies.FillProto(result->mutable_latencies());
gpr_log(GPR_INFO, "Finishing servers");
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Write(server_mark));
- GPR_ASSERT(server->stream->WritesDone());
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ if (!server->stream->Write(server_mark)) {
+ gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
+ }
+ if (!server->stream->WritesDone()) {
+ gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
+ }
}
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Read(&server_status));
- result->add_server_stats()->CopyFrom(server_status.stats());
- result->add_server_cores(server_status.cores());
- GPR_ASSERT(!server->stream->Read(&server_status));
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ // Read the server final status
+ if (server->stream->Read(&server_status)) {
+ gpr_log(GPR_INFO, "Received final status from server %zu", i);
+ result->add_server_stats()->CopyFrom(server_status.stats());
+ result->add_server_cores(server_status.cores());
+ // That final status should be the last message on the server stream
+ GPR_ASSERT(!server->stream->Read(&server_status));
+ } else {
+ gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
+ }
}
- for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
- GPR_ASSERT(server->stream->Finish().ok());
+ for (size_t i = 0; i < num_servers; i++) {
+ auto server = &servers[i];
+ Status s = server->stream->Finish();
+ result->add_server_success(s.ok());
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
+ s.error_message().c_str());
+ }
}
delete[] servers;
@@ -429,8 +485,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
return result;
}
-void RunQuit() {
+bool RunQuit() {
// Get client, server lists
+ bool result = true;
auto workers = get_workers("QPS_WORKERS");
for (size_t i = 0; i < workers.size(); i++) {
auto stub = WorkerService::NewStub(
@@ -438,8 +495,14 @@ void RunQuit() {
Void dummy;
grpc::ClientContext ctx;
ctx.set_fail_fast(false);
- GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok());
+ Status s = stub->QuitWorker(&ctx, dummy, &dummy);
+ if (!s.ok()) {
+ gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
+ s.error_message().c_str());
+ result = false;
+ }
}
+ return result;
}
} // namespace testing
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index 3a5cf138f1..93f4370caf 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -47,7 +47,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count);
-void RunQuit();
+bool RunQuit();
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py
index 34b8151441..4ff4e44b8b 100755
--- a/test/cpp/qps/gen_build_yaml.py
+++ b/test/cpp/qps/gen_build_yaml.py
@@ -45,9 +45,10 @@ import performance.scenario_config as scenario_config
def _scenario_json_string(scenario_json):
# tweak parameters to get fast test times
- scenario_json['warmup_seconds'] = 1
+ scenario_json['warmup_seconds'] = 0
scenario_json['benchmark_seconds'] = 1
- return json.dumps(scenario_config.remove_nonproto_fields(scenario_json))
+ scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(scenario_json)]}
+ return json.dumps(scenarios_json)
def threads_of_type(scenario_json, path):
d = scenario_json
@@ -72,8 +73,7 @@ print yaml.dump({
{
'name': 'json_run_localhost',
'shortname': 'json_run_localhost:%s' % scenario_json['name'],
- 'args': ['--scenario_json',
- pipes.quote(_scenario_json_string(scenario_json))],
+ 'args': ['--scenarios_json', _scenario_json_string(scenario_json)],
'ci_platforms': ['linux', 'mac', 'posix', 'windows'],
'platforms': ['linux', 'mac', 'posix', 'windows'],
'flaky': False,
@@ -81,7 +81,8 @@ print yaml.dump({
'boringssl': True,
'defaults': 'boringssl',
'cpu_cost': guess_cpu(scenario_json),
- 'exclude_configs': []
+ 'exclude_configs': [],
+ 'timeout_seconds': 3*60
}
for scenario_json in scenario_config.CXXLanguage().scenarios()
]
diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc
index 6545dc2917..74e40fbf1a 100644
--- a/test/cpp/qps/json_run_localhost.cc
+++ b/test/cpp/qps/json_run_localhost.cc
@@ -75,7 +75,7 @@ int main(int argc, char** argv) {
for (int i = 1; i < argc; i++) {
args.push_back(argv[i]);
}
- SubProcess(args).Join();
+ GPR_ASSERT(SubProcess(args).Join() == 0);
for (auto it = jobs.begin(); it != jobs.end(); ++it) {
(*it)->Interrupt();
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index f5d739f893..1524ebbc38 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -53,7 +53,7 @@ DEFINE_bool(quit, false, "Quit the workers");
namespace grpc {
namespace testing {
-static void QpsDriver() {
+static bool QpsDriver() {
grpc::string json;
bool scfile = (FLAGS_scenarios_file != "");
@@ -81,13 +81,13 @@ static void QpsDriver() {
} else if (scjson) {
json = FLAGS_scenarios_json.c_str();
} else if (FLAGS_quit) {
- RunQuit();
- return;
+ return RunQuit();
}
// Parse into an array of scenarios
Scenarios scenarios;
ParseJson(json.c_str(), "grpc.testing.Scenarios", &scenarios);
+ bool success = true;
// Make sure that there is at least some valid scenario here
GPR_ASSERT(scenarios.scenarios_size() > 0);
@@ -109,7 +109,15 @@ static void QpsDriver() {
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result);
+
+ for (int i = 0; success && i < result->client_success_size(); i++) {
+ success = result->client_success(i);
+ }
+ for (int i = 0; success && i < result->server_success_size(); i++) {
+ success = result->server_success(i);
+ }
}
+ return success;
}
} // namespace testing
@@ -118,7 +126,7 @@ static void QpsDriver() {
int main(int argc, char **argv) {
grpc::testing::InitBenchmark(&argc, &argv, true);
- grpc::testing::QpsDriver();
+ bool ok = grpc::testing::QpsDriver();
- return 0;
+ return ok ? 0 : 1;
}
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index f514e23e85..d3e53fe14a 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -33,7 +33,6 @@
#include "test/cpp/qps/qps_worker.h"
-#include <cassert>
#include <memory>
#include <mutex>
#include <sstream>
@@ -124,11 +123,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy");
}
ScopedProfile profile("qps_client.prof", false);
Status ret = RunClientBody(ctx, stream);
+ gpr_log(GPR_INFO, "RunClient: Returning");
return ret;
}
@@ -137,11 +137,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy");
}
ScopedProfile profile("qps_server.prof", false);
Status ret = RunServerBody(ctx, stream);
+ gpr_log(GPR_INFO, "RunServer: Returning");
return ret;
}
@@ -154,7 +155,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE {
InstanceGuard g(this);
if (!g.Acquired()) {
- return Status(StatusCode::RESOURCE_EXHAUSTED, "");
+ return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy");
}
worker_->MarkDone();
@@ -197,33 +198,38 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ClientStatus, ClientArgs>* stream) {
ClientArgs args;
if (!stream->Read(&args)) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args");
}
if (!args.has_setup()) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg");
}
gpr_log(GPR_INFO, "RunClientBody: about to create client");
auto client = CreateClient(args.setup());
if (!client) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client");
}
gpr_log(GPR_INFO, "RunClientBody: client created");
ClientStatus status;
if (!stream->Write(status)) {
- return Status(StatusCode::UNKNOWN, "");
+ return Status(StatusCode::UNKNOWN, "Client couldn't report init status");
}
gpr_log(GPR_INFO, "RunClientBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunClientBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!");
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = client->Mark(args.mark().reset());
- stream->Write(status);
+ if (!stream->Write(status)) {
+ return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark");
+ }
gpr_log(GPR_INFO, "RunClientBody: Mark response given");
}
+ gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion");
+ client->AwaitThreadsCompletion();
+
gpr_log(GPR_INFO, "RunClientBody: Returning");
return Status::OK;
}
@@ -232,10 +238,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
ServerReaderWriter<ServerStatus, ServerArgs>* stream) {
ServerArgs args;
if (!stream->Read(&args)) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args");
}
if (!args.has_setup()) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args");
}
if (server_port_ != 0) {
args.mutable_setup()->set_port(server_port_);
@@ -243,24 +249,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service {
gpr_log(GPR_INFO, "RunServerBody: about to create server");
auto server = CreateServer(args.setup());
if (!server) {
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server");
}
gpr_log(GPR_INFO, "RunServerBody: server created");
ServerStatus status;
status.set_port(server->port());
status.set_cores(server->cores());
if (!stream->Write(status)) {
- return Status(StatusCode::UNKNOWN, "");
+ return Status(StatusCode::UNKNOWN, "Server couldn't report init status");
}
gpr_log(GPR_INFO, "RunServerBody: creation status reported");
while (stream->Read(&args)) {
gpr_log(GPR_INFO, "RunServerBody: Message read");
if (!args.has_mark()) {
gpr_log(GPR_INFO, "RunServerBody: Message not a mark!");
- return Status(StatusCode::INVALID_ARGUMENT, "");
+ return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark");
}
*status.mutable_stats() = server->Mark(args.mark().reset());
- stream->Write(status);
+ if (!stream->Write(status)) {
+ return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark");
+ }
gpr_log(GPR_INFO, "RunServerBody: Mark response given");
}
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index c9954d0d02..dea8746331 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -102,7 +102,7 @@ class AsyncQpsServerTest : public Server {
auto process_rpc_bound =
std::bind(process_rpc, config.payload_config(), _1, _2);
- for (int i = 0; i < 10000 / num_threads; i++) {
+ for (int i = 0; i < 15000; i++) {
for (int j = 0; j < num_threads; j++) {
if (request_unary_function) {
auto request_unary =
@@ -123,21 +123,24 @@ class AsyncQpsServerTest : public Server {
for (int i = 0; i < num_threads; i++) {
shutdown_state_.emplace_back(new PerThreadShutdownState());
- }
- for (int i = 0; i < num_threads; i++) {
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
~AsyncQpsServerTest() {
for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
- (*ss)->set_shutdown();
+ std::lock_guard<std::mutex> lock((*ss)->mutex);
+ (*ss)->shutdown = true;
+ }
+ // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
+ auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
+ server_->Shutdown(deadline);
+ for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
+ (*cq)->Shutdown();
}
- server_->Shutdown();
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
}
for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
- (*cq)->Shutdown();
bool ok;
void *got_tag;
while ((*cq)->Next(&got_tag, &ok))
@@ -150,22 +153,24 @@ class AsyncQpsServerTest : public Server {
}
private:
- void ThreadFunc(int rank) {
+ void ThreadFunc(int thread_idx) {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
+ while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
- const bool still_going = ctx->RunNextState(ok);
- if (!shutdown_state_[rank]->shutdown()) {
- // this RPC context is done, so refresh it
- if (!still_going) {
- ctx->Reset();
- }
- } else {
+ // Proceed while holding a lock to make sure that
+ // this thread isn't supposed to shut down
+ std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ if (shutdown_state_[thread_idx]->shutdown) {
return;
}
+ const bool still_going = ctx->RunNextState(ok);
+ // if this RPC context is done, refresh it
+ if (!still_going) {
+ ctx->Reset();
+ }
}
return;
}
@@ -333,24 +338,12 @@ class AsyncQpsServerTest : public Server {
ServiceType async_service_;
std::forward_list<ServerRpcContext *> contexts_;
- class PerThreadShutdownState {
- public:
- PerThreadShutdownState() : shutdown_(false) {}
-
- bool shutdown() const {
- std::lock_guard<std::mutex> lock(mutex_);
- return shutdown_;
- }
-
- void set_shutdown() {
- std::lock_guard<std::mutex> lock(mutex_);
- shutdown_ = true;
- }
-
- private:
- mutable std::mutex mutex_;
- bool shutdown_;
+ struct PerThreadShutdownState {
+ mutable std::mutex mutex;
+ bool shutdown;
+ PerThreadShutdownState() : shutdown(false) {}
};
+
std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc
index c52e48bae6..53529da782 100644
--- a/test/cpp/util/grpc_cli.cc
+++ b/test/cpp/util/grpc_cli.cc
@@ -34,18 +34,21 @@
/*
A command line tool to talk to a grpc server.
Example of talking to grpc interop server:
- grpc_cli call localhost:50051 UnaryCall src/proto/grpc/testing/test.proto \
- "response_size:10" --enable_ssl=false
+ grpc_cli call localhost:50051 UnaryCall "response_size:10" \
+ --protofiles=src/proto/grpc/testing/test.proto --enable_ssl=false
Options:
- 1. --proto_path, if your proto file is not under current working directory,
+ 1. --protofiles, use this flag to provide a proto file if the server does
+ does not have the reflection service.
+ 2. --proto_path, if your proto file is not under current working directory,
use this flag to provide a search root. It should work similar to the
- counterpart in protoc.
- 2. --metadata specifies metadata to be sent to the server, such as:
+ counterpart in protoc. This option is valid only when protofiles is
+ provided.
+ 3. --metadata specifies metadata to be sent to the server, such as:
--metadata="MyHeaderKey1:Value1:MyHeaderKey2:Value2"
- 3. --enable_ssl, whether to use tls.
- 4. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call
- 3. --input_binary_file, a file containing the serialized request. The file
+ 4. --enable_ssl, whether to use tls.
+ 5. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call
+ 6. --input_binary_file, a file containing the serialized request. The file
can be generated by calling something like:
protoc --proto_path=src/proto/grpc/testing/ \
--encode=grpc.testing.SimpleRequest \
@@ -53,7 +56,7 @@
< input.txt > input.bin
If this is used and no proto file is provided in the argument list, the
method string has to be exact in the form of /package.service/method.
- 4. --output_binary_file, a file to write binary format response into, it can
+ 7. --output_binary_file, a file to write binary format response into, it can
be later decoded using protoc:
protoc --proto_path=src/proto/grpc/testing/ \
--decode=grpc.testing.SimpleResponse \
@@ -61,6 +64,7 @@
< output.bin > output.txt
*/
+#include <unistd.h>
#include <fstream>
#include <iostream>
#include <sstream>
@@ -86,6 +90,8 @@ DEFINE_string(output_binary_file, "",
DEFINE_string(metadata, "",
"Metadata to send to server, in the form of key1:val1:key2:val2");
DEFINE_string(proto_path, ".", "Path to look for the proto file.");
+// TODO(zyc): support a list of input proto files
+DEFINE_string(protofiles, "", "Name of the proto file.");
void ParseMetadataFlag(
std::multimap<grpc::string, grpc::string>* client_metadata) {
@@ -129,35 +135,61 @@ void PrintMetadata(const T& m, const grpc::string& message) {
int main(int argc, char** argv) {
grpc::testing::InitTest(&argc, &argv, true);
- if (argc < 4 || argc == 5 || grpc::string(argv[1]) != "call") {
+ if (argc < 4 || grpc::string(argv[1]) != "call") {
std::cout << "Usage: grpc_cli call server_host:port method_name "
<< "[proto file] [text format request] [<options>]" << std::endl;
+ return 1;
}
- grpc::string file_name;
grpc::string request_text;
grpc::string server_address(argv[2]);
grpc::string method_name(argv[3]);
std::unique_ptr<grpc::testing::ProtoFileParser> parser;
grpc::string serialized_request_proto;
- if (argc == 6) {
- file_name = argv[4];
- // TODO(yangg) read from stdin as well?
- request_text = argv[5];
+ if (argc == 5) {
+ request_text = argv[4];
+ }
+
+ std::shared_ptr<grpc::ChannelCredentials> creds;
+ if (!FLAGS_enable_ssl) {
+ creds = grpc::InsecureChannelCredentials();
+ } else {
+ if (FLAGS_use_auth) {
+ creds = grpc::GoogleDefaultCredentials();
+ } else {
+ creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
+ }
}
+ std::shared_ptr<grpc::Channel> channel =
+ grpc::CreateChannel(server_address, creds);
if (request_text.empty() && FLAGS_input_binary_file.empty()) {
- std::cout << "Missing input. Use text format input or "
- << "--input_binary_file for serialized request" << std::endl;
- return 1;
- } else if (!request_text.empty()) {
- parser.reset(new grpc::testing::ProtoFileParser(FLAGS_proto_path, file_name,
- method_name));
+ if (isatty(STDIN_FILENO)) {
+ std::cout << "reading request message from stdin..." << std::endl;
+ }
+ std::stringstream input_stream;
+ input_stream << std::cin.rdbuf();
+ request_text = input_stream.str();
+ }
+
+ if (!request_text.empty()) {
+ if (!FLAGS_protofiles.empty()) {
+ parser.reset(new grpc::testing::ProtoFileParser(
+ FLAGS_proto_path, FLAGS_protofiles, method_name));
+ } else {
+ parser.reset(new grpc::testing::ProtoFileParser(channel, method_name));
+ }
method_name = parser->GetFullMethodName();
if (parser->HasError()) {
return 1;
}
+
+ if (!FLAGS_input_binary_file.empty()) {
+ std::cout
+ << "warning: request given in argv, ignoring --input_binary_file"
+ << std::endl;
+ }
}
if (parser) {
@@ -175,19 +207,6 @@ int main(int argc, char** argv) {
}
std::cout << "connecting to " << server_address << std::endl;
- std::shared_ptr<grpc::ChannelCredentials> creds;
- if (!FLAGS_enable_ssl) {
- creds = grpc::InsecureChannelCredentials();
- } else {
- if (FLAGS_use_auth) {
- creds = grpc::GoogleDefaultCredentials();
- } else {
- creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
- }
- }
- std::shared_ptr<grpc::Channel> channel =
- grpc::CreateChannel(server_address, creds);
-
grpc::string serialized_response_proto;
std::multimap<grpc::string, grpc::string> client_metadata;
std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
@@ -219,7 +238,7 @@ int main(int argc, char** argv) {
}
} else {
std::cout << "Rpc failed with status code " << s.error_code()
- << " error message " << s.error_message() << std::endl;
+ << ", error message: " << s.error_message() << std::endl;
}
return 0;
diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc
index 25aec329eb..5b0d925e1c 100644
--- a/test/cpp/util/proto_file_parser.cc
+++ b/test/cpp/util/proto_file_parser.cc
@@ -95,9 +95,48 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path,
dynamic_factory_.reset(
new google::protobuf::DynamicMessageFactory(importer_->pool()));
+ std::vector<const google::protobuf::ServiceDescriptor*> service_desc_list;
+ for (int i = 0; i < file_desc->service_count(); i++) {
+ service_desc_list.push_back(file_desc->service(i));
+ }
+ InitProtoFileParser(method, service_desc_list);
+}
+
+ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
+ const grpc::string& method)
+ : has_error_(false),
+ desc_db_(new grpc::ProtoReflectionDescriptorDatabase(channel)),
+ desc_pool_(new google::protobuf::DescriptorPool(desc_db_.get())) {
+ std::vector<std::string> service_list;
+ if (!desc_db_->GetServices(&service_list)) {
+ LogError(
+ "Failed to get services from the server, "
+ "it may not have the reflection service.\n"
+ "Please try to use the --protofiles option to provide a proto file.");
+ }
+ if (has_error_) {
+ return;
+ }
+ dynamic_factory_.reset(
+ new google::protobuf::DynamicMessageFactory(desc_pool_.get()));
+
+ std::vector<const google::protobuf::ServiceDescriptor*> service_desc_list;
+ for (auto it = service_list.begin(); it != service_list.end(); it++) {
+ service_desc_list.push_back(desc_pool_->FindServiceByName(*it));
+ }
+ InitProtoFileParser(method, service_desc_list);
+}
+
+ProtoFileParser::~ProtoFileParser() {}
+
+void ProtoFileParser::InitProtoFileParser(
+ const grpc::string& method,
+ const std::vector<const google::protobuf::ServiceDescriptor*>
+ service_desc_list) {
const google::protobuf::MethodDescriptor* method_descriptor = nullptr;
- for (int i = 0; !method_descriptor && i < file_desc->service_count(); i++) {
- const auto* service_desc = file_desc->service(i);
+ for (auto it = service_desc_list.begin(); it != service_desc_list.end();
+ it++) {
+ const auto* service_desc = *it;
for (int j = 0; j < service_desc->method_count(); j++) {
const auto* method_desc = service_desc->method(j);
if (MethodNameMatch(method_desc->full_name(), method)) {
@@ -130,8 +169,6 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path,
dynamic_factory_->GetPrototype(method_descriptor->output_type())->New());
}
-ProtoFileParser::~ProtoFileParser() {}
-
grpc::string ProtoFileParser::GetSerializedProto(
const grpc::string& text_format_proto, bool is_request) {
grpc::string serialized;
@@ -143,7 +180,7 @@ grpc::string ProtoFileParser::GetSerializedProto(
LogError("Failed to parse text format to proto.");
return "";
}
- ok = request_prototype_->SerializeToString(&serialized);
+ ok = msg->SerializeToString(&serialized);
if (!ok) {
LogError("Failed to serialize proto.");
return "";
diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h
index 46cdd66503..b442d77db9 100644
--- a/test/cpp/util/proto_file_parser.h
+++ b/test/cpp/util/proto_file_parser.h
@@ -38,8 +38,10 @@
#include <google/protobuf/compiler/importer.h>
#include <google/protobuf/dynamic_message.h>
+#include <grpc++/channel.h>
#include "src/compiler/config.h"
+#include "test/cpp/util/proto_reflection_descriptor_database.h"
namespace grpc {
namespace testing {
@@ -53,6 +55,9 @@ class ProtoFileParser {
// even just Method. It will log an error if there is ambiguity.
ProtoFileParser(const grpc::string& proto_path, const grpc::string& file_name,
const grpc::string& method);
+
+ ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
+ const grpc::string& method);
~ProtoFileParser();
grpc::string GetFullMethodName() const { return full_method_name_; }
@@ -68,12 +73,18 @@ class ProtoFileParser {
void LogError(const grpc::string& error_msg);
private:
+ void InitProtoFileParser(
+ const grpc::string& method,
+ const std::vector<const google::protobuf::ServiceDescriptor*> services);
+
bool has_error_;
grpc::string request_text_;
grpc::string full_method_name_;
google::protobuf::compiler::DiskSourceTree source_tree_;
std::unique_ptr<ErrorPrinter> error_printer_;
std::unique_ptr<google::protobuf::compiler::Importer> importer_;
+ std::unique_ptr<grpc::ProtoReflectionDescriptorDatabase> desc_db_;
+ std::unique_ptr<google::protobuf::DescriptorPool> desc_pool_;
std::unique_ptr<google::protobuf::DynamicMessageFactory> dynamic_factory_;
std::unique_ptr<grpc::protobuf::Message> request_prototype_;
std::unique_ptr<grpc::protobuf::Message> response_prototype_;
diff --git a/test/cpp/util/proto_reflection_descriptor_database.cc b/test/cpp/util/proto_reflection_descriptor_database.cc
index 25b720aee0..8fd466feb0 100644
--- a/test/cpp/util/proto_reflection_descriptor_database.cc
+++ b/test/cpp/util/proto_reflection_descriptor_database.cc
@@ -53,10 +53,20 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
std::shared_ptr<grpc::Channel> channel)
: stub_(ServerReflection::NewStub(channel)) {}
-ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {}
+ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {
+ if (stream_) {
+ stream_->WritesDone();
+ Status status = stream_->Finish();
+ if (!status.ok()) {
+ gpr_log(GPR_ERROR,
+ "ServerReflectionInfo rpc failed. Error code: %d, details: %s",
+ (int)status.error_code(), status.error_message().c_str());
+ }
+ }
+}
bool ProtoReflectionDescriptorDatabase::FindFileByName(
- const string& filename, google::protobuf::FileDescriptorProto* output) {
+ const string& filename, protobuf::FileDescriptorProto* output) {
if (cached_db_.FindFileByName(filename, output)) {
return true;
}
@@ -101,7 +111,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileByName(
}
bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol(
- const string& symbol_name, google::protobuf::FileDescriptorProto* output) {
+ const string& symbol_name, protobuf::FileDescriptorProto* output) {
if (cached_db_.FindFileContainingSymbol(symbol_name, output)) {
return true;
}
@@ -148,7 +158,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol(
bool ProtoReflectionDescriptorDatabase::FindFileContainingExtension(
const string& containing_type, int field_number,
- google::protobuf::FileDescriptorProto* output) {
+ protobuf::FileDescriptorProto* output) {
if (cached_db_.FindFileContainingExtension(containing_type, field_number,
output)) {
return true;
@@ -276,10 +286,10 @@ bool ProtoReflectionDescriptorDatabase::GetServices(
return false;
}
-const google::protobuf::FileDescriptorProto
+const protobuf::FileDescriptorProto
ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse(
const std::string& byte_fd_proto) {
- google::protobuf::FileDescriptorProto file_desc_proto;
+ protobuf::FileDescriptorProto file_desc_proto;
file_desc_proto.ParseFromString(byte_fd_proto);
return file_desc_proto;
}
@@ -287,7 +297,7 @@ ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse(
void ProtoReflectionDescriptorDatabase::AddFileFromResponse(
const grpc::reflection::v1alpha::FileDescriptorResponse& response) {
for (int i = 0; i < response.file_descriptor_proto_size(); ++i) {
- const google::protobuf::FileDescriptorProto file_proto =
+ const protobuf::FileDescriptorProto file_proto =
ParseFileDescriptorProtoResponse(response.file_descriptor_proto(i));
if (known_files_.find(file_proto.name()) == known_files_.end()) {
known_files_.insert(file_proto.name());
diff --git a/test/cpp/util/proto_reflection_descriptor_database.h b/test/cpp/util/proto_reflection_descriptor_database.h
index 99c00675bb..eb7cf4907d 100644
--- a/test/cpp/util/proto_reflection_descriptor_database.h
+++ b/test/cpp/util/proto_reflection_descriptor_database.h
@@ -38,19 +38,19 @@
#include <unordered_set>
#include <vector>
-#include <google/protobuf/descriptor.h>
-#include <google/protobuf/descriptor.pb.h>
-#include <google/protobuf/descriptor_database.h>
+// GRPC_NO_GENERATED_CODE indicates generated pb files should not be used
+#ifdef GRPC_NO_GENERATED_CODE
+#include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h"
+#else
#include <grpc++/ext/reflection.grpc.pb.h>
+#endif // GRPC_NO_GENERATED_CODE
#include <grpc++/grpc++.h>
-
namespace grpc {
// ProtoReflectionDescriptorDatabase takes a stub of ServerReflection and
// provides the methods defined by DescriptorDatabase interfaces. It can be used
// to feed a DescriptorPool instance.
-class ProtoReflectionDescriptorDatabase
- : public google::protobuf::DescriptorDatabase {
+class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
public:
explicit ProtoReflectionDescriptorDatabase(
std::unique_ptr<reflection::v1alpha::ServerReflection::Stub> stub);
@@ -65,14 +65,13 @@ class ProtoReflectionDescriptorDatabase
// Find a file by file name. Fills in in *output and returns true if found.
// Otherwise, returns false, leaving the contents of *output undefined.
bool FindFileByName(const string& filename,
- google::protobuf::FileDescriptorProto* output)
- GRPC_OVERRIDE;
+ protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
// Find the file that declares the given fully-qualified symbol name.
// If found, fills in *output and returns true, otherwise returns false
// and leaves *output undefined.
bool FindFileContainingSymbol(const string& symbol_name,
- google::protobuf::FileDescriptorProto* output)
+ protobuf::FileDescriptorProto* output)
GRPC_OVERRIDE;
// Find the file which defines an extension extending the given message type
@@ -81,7 +80,7 @@ class ProtoReflectionDescriptorDatabase
// must be a fully-qualified type name.
bool FindFileContainingExtension(
const string& containing_type, int field_number,
- google::protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
+ protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
// Finds the tag numbers used by all known extensions of
// extendee_type, and appends them to output in an undefined
@@ -102,7 +101,7 @@ class ProtoReflectionDescriptorDatabase
grpc::reflection::v1alpha::ServerReflectionResponse>
ClientStream;
- const google::protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse(
+ const protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse(
const std::string& byte_fd_proto);
void AddFileFromResponse(
@@ -123,7 +122,7 @@ class ProtoReflectionDescriptorDatabase
std::unordered_map<string, std::vector<int>> cached_extension_numbers_;
std::mutex stream_mutex_;
- google::protobuf::SimpleDescriptorDatabase cached_db_;
+ protobuf::SimpleDescriptorDatabase cached_db_;
};
} // namespace grpc
diff --git a/test/distrib/python/run_distrib_test.sh b/test/distrib/python/run_distrib_test.sh
index 8a983bc248..0c2154838f 100755
--- a/test/distrib/python/run_distrib_test.sh
+++ b/test/distrib/python/run_distrib_test.sh
@@ -33,34 +33,47 @@ set -ex
cd $(dirname $0)
# Pick up the source dist archive whatever its version is
+SDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-*.tar.gz
BDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-*.whl
+TOOLS_SDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio_tools-*.tar.gz
TOOLS_BDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio_tools-*.whl
-if [ ! -f ${SDIST_ARCHIVE} ]
-then
- echo "Archive ${SDIST_ARCHIVE} does not exist."
- exit 1
-fi
+function make_virtualenv() {
+ virtualenv $1
+ $1/bin/python -m pip install --upgrade six pip
+ $1/bin/python -m pip install cython
+}
-PYTHON=python2
-PIP=pip2
-which $PYTHON || PYTHON=python
-which $PIP || PIP=pip
+function at_least_one_installs() {
+ for file in "$@"; do
+ if python -m pip install $file; then
+ return 0
+ fi
+ done
+ return -1
+}
-# TODO(jtattermusch): this shouldn't be required
-# TODO(jtattermusch): run the command twice to workaround docker-on-overlay
-# issue https://github.com/docker/docker/issues/12327
-# (first attempt will fail when using docker with overlayFS)
-${PIP} install --upgrade six pip || ${PIP} install --upgrade six pip
+make_virtualenv bdist_test
+make_virtualenv sdist_test
-# At least one of the bdist packages has to succeed (whichever one matches the
-# test machine, anyway).
-for bdist in ${BDIST_ARCHIVES} ${TOOLS_BDIST_ARCHIVES}; do
- ($PYTHON -m pip install $bdist) || true
-done
+#
+# Install our distributions in order of dependencies
+#
+
+(source bdist_test/bin/activate && at_least_one_installs ${BDIST_ARCHIVES})
+(source bdist_test/bin/activate && at_least_one_installs ${TOOLS_BDIST_ARCHIVES})
+
+(source sdist_test/bin/activate && at_least_one_installs ${SDIST_ARCHIVES})
+(source sdist_test/bin/activate && at_least_one_installs ${TOOLS_SDIST_ARCHIVES})
+
+#
+# Test our distributions
+#
# TODO(jtattermusch): add a .proto file to the distribtest, generate python
# code from it and then use the generated code from distribtest.py
-$PYTHON -m grpc.tools.protoc
+(source bdist_test/bin/activate && python -m grpc.tools.protoc --help)
+(source sdist_test/bin/activate && python -m grpc.tools.protoc --help)
-$PYTHON distribtest.py
+(source bdist_test/bin/activate && python distribtest.py)
+(source sdist_test/bin/activate && python distribtest.py)