author    | David Garcia Quintas <dgq@google.com> | 2016-08-01 11:02:22 -0700
committer | David Garcia Quintas <dgq@google.com> | 2016-08-01 11:02:22 -0700
commit    | 87e06c84fdbffe89316cd52dc68ae5180b73b278 (patch)
tree      | 522b195d777f3dd7f076f79d957bf51c982d1712 /test
parent    | 8d48911faa184cbf8eaa79e1d8efc37f02bef5d2 (diff)
parent    | 2507fef7379aa54cbced074f1b1e7ba3b53eae26 (diff)
Merge branch 'master' of github.com:grpc/grpc into grpclb_v0
Diffstat (limited to 'test')
29 files changed, 592 insertions, 433 deletions
diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c
index cca75f54a5..ab80adf0e0 100644
--- a/test/core/end2end/bad_server_response_test.c
+++ b/test/core/end2end/bad_server_response_test.c
@@ -71,6 +71,8 @@
 
 #define UNPARSEABLE_DETAIL_MSG "Failed parsing HTTP/2"
 
+#define HTTP1_DETAIL_MSG "Trying to connect an http1.x server"
+
 /* TODO(zyc) Check the content of incomming data instead of using this length */
 #define EXPECTED_INCOMING_DATA_LENGTH (size_t)310
 
@@ -334,7 +336,7 @@ int main(int argc, char **argv) {
 
   /* http1 response */
   run_test(HTTP1_RESP, sizeof(HTTP1_RESP) - 1, GRPC_STATUS_UNAVAILABLE,
-           UNPARSEABLE_DETAIL_MSG);
+           HTTP1_DETAIL_MSG);
 
   return 0;
 }
diff --git a/test/core/end2end/tests/high_initial_seqno.c b/test/core/end2end/tests/high_initial_seqno.c
index 50e3c9cb89..db45f5eb5a 100644
--- a/test/core/end2end/tests/high_initial_seqno.c
+++ b/test/core/end2end/tests/high_initial_seqno.c
@@ -203,6 +203,12 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
   grpc_call_destroy(c);
   grpc_call_destroy(s);
 
+  /* TODO(ctiller): this rate limits the test, and it should be removed when
+                    retry has been implemented; until then cross-thread chatter
+                    may result in some requests needing to be cancelled due to
+                    seqno exhaustion. */
+  cq_verify_empty(cqv);
+
   cq_verifier_destroy(cqv);
 }
diff --git a/test/core/end2end/tests/network_status_change.c b/test/core/end2end/tests/network_status_change.c
index 10207844ab..39ddc13754 100644
--- a/test/core/end2end/tests/network_status_change.c
+++ b/test/core/end2end/tests/network_status_change.c
@@ -186,9 +186,10 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) {
   GPR_ASSERT(GRPC_CALL_OK == error);
 
   cq_expect_completion(cqv, tag(102), 1);
+  cq_verify(cqv);
+  // Simulate the network loss event
   grpc_network_status_shutdown_all_endpoints();
-  cq_verify(cqv);
 
   op = ops;
   op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
@@ -205,7 +206,7 @@ static void test_invoke_network_status_change(grpc_end2end_test_config config) {
   op++;
   error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL);
   GPR_ASSERT(GRPC_CALL_OK == error);
 
-  void shutdown_all_endpoints();
+  cq_expect_completion(cqv, tag(103), 1);
   cq_expect_completion(cqv, tag(1), 1);
   cq_verify(cqv);
diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c
index 5e86c42309..27d630623e 100644
--- a/test/core/internal_api_canaries/iomgr.c
+++ b/test/core/internal_api_canaries/iomgr.c
@@ -77,11 +77,14 @@ static void test_code(void) {
 
   /* endpoint.h */
   grpc_endpoint endpoint;
-  grpc_endpoint_vtable vtable = {
-      grpc_endpoint_read, grpc_endpoint_write,
-      grpc_endpoint_add_to_pollset, grpc_endpoint_add_to_pollset_set,
-      grpc_endpoint_shutdown, grpc_endpoint_destroy,
-      grpc_endpoint_get_peer};
+  grpc_endpoint_vtable vtable = {grpc_endpoint_read,
+                                 grpc_endpoint_write,
+                                 grpc_endpoint_get_workqueue,
+                                 grpc_endpoint_add_to_pollset,
+                                 grpc_endpoint_add_to_pollset_set,
+                                 grpc_endpoint_shutdown,
+                                 grpc_endpoint_destroy,
+                                 grpc_endpoint_get_peer};
   endpoint.vtable = &vtable;
 
   grpc_endpoint_read(&exec_ctx, &endpoint, NULL, NULL);
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 3152fb7a46..a959a7e07f 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -70,7 +70,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
   g_number_of_reads++;
   g_number_of_bytes_read += (int)byte_count;
 
-  grpc_pollset_kick(g_pollset, 
NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } @@ -179,8 +180,10 @@ static void test_receive(int number_of_clients) { while (g_number_of_reads == number_of_reads_before && gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -199,7 +202,8 @@ static void test_receive(int number_of_clients) { GPR_ASSERT(g_number_of_orphan_calls == 1); } -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) { +static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, + grpc_error *error) { grpc_pollset_destroy(p); } diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c deleted file mode 100644 index 76ecfae74b..0000000000 --- a/test/core/iomgr/workqueue_test.c +++ /dev/null @@ -1,150 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
- * - */ - -#include "src/core/lib/iomgr/workqueue.h" - -#include <grpc/grpc.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> - -#include "test/core/util/test_config.h" - -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; - -static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - gpr_mu_lock(g_mu); - *(int *)p = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); - gpr_mu_unlock(g_mu); -} - -static void test_ref_unref(void) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq; - GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", - grpc_workqueue_create(&exec_ctx, &wq))); - GRPC_WORKQUEUE_REF(wq, "test"); - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "test"); - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); -} - -static void test_add_closure(void) { - grpc_closure c; - int done = 0; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq; - GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", - grpc_workqueue_create(&exec_ctx, &wq))); - gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); - grpc_pollset_worker *worker = NULL; - grpc_closure_init(&c, must_succeed, &done); - - grpc_workqueue_enqueue(&exec_ctx, wq, &c, GRPC_ERROR_NONE); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); - - gpr_mu_lock(g_mu); - GPR_ASSERT(!done); - while (!done) { - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(deadline.clock_type), deadline))); - } - gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(done); - - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); -} - -static void test_flush(void) { - grpc_closure c; - int done = 0; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq; - GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", - grpc_workqueue_create(&exec_ctx, &wq))); - gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); - grpc_pollset_worker *worker = NULL; - grpc_closure_init(&c, must_succeed, &done); - - grpc_exec_ctx_sched(&exec_ctx, &c, GRPC_ERROR_NONE, NULL); - grpc_workqueue_flush(&exec_ctx, wq); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); - - gpr_mu_lock(g_mu); - GPR_ASSERT(!done); - while (!done) { - GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(deadline.clock_type), deadline))); - } - gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); - GPR_ASSERT(done); - - GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); - grpc_exec_ctx_finish(&exec_ctx); -} - -static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, - grpc_error *error) { - grpc_pollset_destroy(p); -} - -int main(int argc, char **argv) { - grpc_closure destroyed; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_test_init(argc, argv); - grpc_init(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); - - test_ref_unref(); - test_add_closure(); - test_flush(); - - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); - grpc_exec_ctx_finish(&exec_ctx); - grpc_shutdown(); - - gpr_free(g_pollset); - return 0; -} diff --git a/test/core/support/slice_test.c b/test/core/support/slice_test.c index 0da483a321..06c364b368 100644 --- a/test/core/support/slice_test.c +++ b/test/core/support/slice_test.c @@ -85,6 +85,27 @@ 
static void test_slice_new_returns_something_sensible(void) { gpr_slice_unref(slice); } +/* destroy function that sets a mark to indicate it was called. */ +static void set_mark(void *p) { *((int *)p) = 1; } + +static void test_slice_new_with_user_data(void) { + int marker = 0; + uint8_t buf[2]; + gpr_slice slice; + + buf[0] = 0; + buf[1] = 1; + slice = gpr_slice_new_with_user_data(buf, 2, set_mark, &marker); + GPR_ASSERT(marker == 0); + GPR_ASSERT(GPR_SLICE_LENGTH(slice) == 2); + GPR_ASSERT(GPR_SLICE_START_PTR(slice)[0] == 0); + GPR_ASSERT(GPR_SLICE_START_PTR(slice)[1] == 1); + + /* unref should cause destroy function to run. */ + gpr_slice_unref(slice); + GPR_ASSERT(marker == 1); +} + static int do_nothing_with_len_1_calls = 0; static void do_nothing_with_len_1(void *ignored, size_t len) { @@ -232,6 +253,7 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); test_slice_malloc_returns_something_sensible(); test_slice_new_returns_something_sensible(); + test_slice_new_with_user_data(); test_slice_new_with_len_returns_something_sensible(); for (length = 0; length < 128; length++) { test_slice_sub_works(length); diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index ed9545e9df..13e0e918fb 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -95,9 +95,17 @@ static char *me_get_peer(grpc_endpoint *ep) { return gpr_strdup("fake:mock_endpoint"); } +static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; } + static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_peer, + me_read, + me_write, + me_get_workqueue, + me_add_to_pollset, + me_add_to_pollset_set, + me_shutdown, + me_destroy, + me_get_peer, }; grpc_endpoint *grpc_mock_endpoint_create(void (*on_write)(gpr_slice slice)) { diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index a39f3dd66e..7ed9e97bd6 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -140,9 +140,17 @@ static char *me_get_peer(grpc_endpoint *ep) { return gpr_strdup("fake:mock_endpoint"); } +static grpc_workqueue *me_get_workqueue(grpc_endpoint *ep) { return NULL; } + static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_peer, + me_read, + me_write, + me_get_workqueue, + me_add_to_pollset, + me_add_to_pollset_set, + me_shutdown, + me_destroy, + me_get_peer, }; static void half_init(half *m, passthru_endpoint *parent) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 6c7eae53a4..ac79fe8274 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -200,6 +200,10 @@ class Verifier { bool spin_; }; +bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) { + return plugin->has_sync_methods(); +} + // This class disables the server builder plugins that may add sync services to // the server. 
If there are sync services, UnimplementedRpc test will triger // the sync unkown rpc routine on the server side, rather than the async one @@ -210,14 +214,9 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption { void UpdatePlugins(std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) GRPC_OVERRIDE { - auto plugin = plugins->begin(); - while (plugin != plugins->end()) { - if ((*plugin)->has_sync_methods()) { - plugins->erase(plugin++); - } else { - plugin++; - } - } + plugins->erase(std::remove_if(plugins->begin(), plugins->end(), + plugin_has_sync_methods), + plugins->end()); } }; @@ -345,6 +344,31 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) { SendRpc(10); } +// We do not need to protect notify because the use is synchronized. +void ServerWait(Server* server, int* notify) { + server->Wait(); + *notify = 1; +} +TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) { + int notify = 0; + std::thread* wait_thread = + new std::thread(&ServerWait, server_.get(), ¬ify); + ResetStub(); + SendRpc(1); + EXPECT_EQ(0, notify); + server_->Shutdown(); + wait_thread->join(); + EXPECT_EQ(1, notify); + delete wait_thread; +} + +TEST_P(AsyncEnd2endTest, ShutdownThenWait) { + ResetStub(); + SendRpc(1); + server_->Shutdown(); + server_->Wait(); +} + // Test a simple RPC using the async version of Next TEST_P(AsyncEnd2endTest, AsyncNextRpc) { ResetStub(); diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 354a59cedd..46a58d3ac3 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -1166,6 +1166,9 @@ TEST_P(ProxyEnd2endTest, HugeResponse) { request.mutable_param()->set_response_message_length(kResponseSize); ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(20); + context.set_deadline(deadline); Status s = stub_->Echo(&context, request, &response); EXPECT_EQ(kResponseSize, response.message().size()); EXPECT_TRUE(s.ok()); @@ -1411,7 +1414,7 @@ TEST_P(SecureEnd2endTest, ClientAuthContext) { std::shared_ptr<const AuthContext> auth_ctx = context.auth_context(); std::vector<grpc::string_ref> tst = auth_ctx->FindPropertyValues("transport_security_type"); - EXPECT_EQ(1u, tst.size()); + ASSERT_EQ(1u, tst.size()); EXPECT_EQ(GetParam().credentials_type, ToString(tst[0])); if (GetParam().credentials_type == kTlsCredentialsType) { EXPECT_EQ("x509_subject_alternative_name", diff --git a/test/cpp/end2end/proto_server_reflection_test.cc b/test/cpp/end2end/proto_server_reflection_test.cc index f8fc39b553..efbb0e1f8e 100644 --- a/test/cpp/end2end/proto_server_reflection_test.cc +++ b/test/cpp/end2end/proto_server_reflection_test.cc @@ -31,7 +31,6 @@ * */ -#include <google/protobuf/descriptor.h> #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> @@ -59,7 +58,7 @@ class ProtoServerReflectionTest : public ::testing::Test { void SetUp() GRPC_OVERRIDE { port_ = grpc_pick_unused_port_or_die(); - ref_desc_pool_ = google::protobuf::DescriptorPool::generated_pool(); + ref_desc_pool_ = protobuf::DescriptorPool::generated_pool(); ServerBuilder builder; grpc::string server_address = "localhost:" + to_string(port_); @@ -73,7 +72,7 @@ class ProtoServerReflectionTest : public ::testing::Test { CreateChannel(target, InsecureChannelCredentials()); stub_ = grpc::testing::EchoTestService::NewStub(channel); desc_db_.reset(new ProtoReflectionDescriptorDatabase(channel)); - desc_pool_.reset(new 
google::protobuf::DescriptorPool(desc_db_.get())); + desc_pool_.reset(new protobuf::DescriptorPool(desc_db_.get())); } string to_string(const int number) { @@ -83,15 +82,15 @@ class ProtoServerReflectionTest : public ::testing::Test { } void CompareService(const grpc::string& service) { - const google::protobuf::ServiceDescriptor* service_desc = + const protobuf::ServiceDescriptor* service_desc = desc_pool_->FindServiceByName(service); - const google::protobuf::ServiceDescriptor* ref_service_desc = + const protobuf::ServiceDescriptor* ref_service_desc = ref_desc_pool_->FindServiceByName(service); EXPECT_TRUE(service_desc != nullptr); EXPECT_TRUE(ref_service_desc != nullptr); EXPECT_EQ(service_desc->DebugString(), ref_service_desc->DebugString()); - const google::protobuf::FileDescriptor* file_desc = service_desc->file(); + const protobuf::FileDescriptor* file_desc = service_desc->file(); if (known_files_.find(file_desc->package() + "/" + file_desc->name()) != known_files_.end()) { EXPECT_EQ(file_desc->DebugString(), @@ -105,9 +104,9 @@ class ProtoServerReflectionTest : public ::testing::Test { } void CompareMethod(const grpc::string& method) { - const google::protobuf::MethodDescriptor* method_desc = + const protobuf::MethodDescriptor* method_desc = desc_pool_->FindMethodByName(method); - const google::protobuf::MethodDescriptor* ref_method_desc = + const protobuf::MethodDescriptor* ref_method_desc = ref_desc_pool_->FindMethodByName(method); EXPECT_TRUE(method_desc != nullptr); EXPECT_TRUE(ref_method_desc != nullptr); @@ -122,9 +121,8 @@ class ProtoServerReflectionTest : public ::testing::Test { return; } - const google::protobuf::Descriptor* desc = - desc_pool_->FindMessageTypeByName(type); - const google::protobuf::Descriptor* ref_desc = + const protobuf::Descriptor* desc = desc_pool_->FindMessageTypeByName(type); + const protobuf::Descriptor* ref_desc = ref_desc_pool_->FindMessageTypeByName(type); EXPECT_TRUE(desc != nullptr); EXPECT_TRUE(ref_desc != nullptr); @@ -135,10 +133,10 @@ class ProtoServerReflectionTest : public ::testing::Test { std::unique_ptr<Server> server_; std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<ProtoReflectionDescriptorDatabase> desc_db_; - std::unique_ptr<google::protobuf::DescriptorPool> desc_pool_; + std::unique_ptr<protobuf::DescriptorPool> desc_pool_; std::unordered_set<string> known_files_; std::unordered_set<string> known_types_; - const google::protobuf::DescriptorPool* ref_desc_pool_; + const protobuf::DescriptorPool* ref_desc_pool_; int port_; reflection::ProtoServerReflectionPlugin plugin_; }; diff --git a/test/cpp/end2end/server_builder_plugin_test.cc b/test/cpp/end2end/server_builder_plugin_test.cc index 778a2be573..b967a5d1e9 100644 --- a/test/cpp/end2end/server_builder_plugin_test.cc +++ b/test/cpp/end2end/server_builder_plugin_test.cc @@ -191,7 +191,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { // we run some tests without a service, and for those we need to supply a // frequently polled completion queue cq_ = builder_->AddCompletionQueue(); - cq_thread_ = grpc::thread(std::bind(&ServerBuilderPluginTest::RunCQ, this)); + cq_thread_ = new grpc::thread(&ServerBuilderPluginTest::RunCQ, this); server_ = builder_->BuildAndStart(); EXPECT_TRUE(CheckPresent()); } @@ -209,7 +209,8 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { EXPECT_TRUE(plugin->finish_is_called()); server_->Shutdown(); cq_->Shutdown(); - cq_thread_.join(); + cq_thread_->join(); + delete cq_thread_; 
} string to_string(const int number) { @@ -224,7 +225,7 @@ class ServerBuilderPluginTest : public ::testing::TestWithParam<bool> { std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; std::unique_ptr<ServerCompletionQueue> cq_; std::unique_ptr<Server> server_; - grpc::thread cq_thread_; + grpc::thread* cq_thread_; TestServiceImpl service_; int port_; diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 047bd16408..4045e13460 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -112,6 +112,21 @@ class ClientRequestCreator<ByteBuffer> { } }; +class HistogramEntry GRPC_FINAL { + public: + HistogramEntry() : used_(false) {} + bool used() const { return used_; } + double value() const { return value_; } + void set_value(double v) { + used_ = true; + value_ = v; + } + + private: + bool used_; + double value_; +}; + class Client { public: Client() : timer_(new UsageTimer), interarrival_timer_() {} @@ -151,10 +166,21 @@ class Client { return stats; } + // Must call AwaitThreadsCompletion before destructor to avoid a race + // between destructor and invocation of virtual ThreadFunc + void AwaitThreadsCompletion() { + DestroyMultithreading(); + std::unique_lock<std::mutex> g(thread_completion_mu_); + while (threads_remaining_ != 0) { + threads_complete_.wait(g); + } + } + protected: bool closed_loop_; void StartThreads(size_t num_threads) { + threads_remaining_ = num_threads; for (size_t i = 0; i < num_threads; i++) { threads_.emplace_back(new Thread(this, i)); } @@ -162,7 +188,8 @@ class Client { void EndThreads() { threads_.clear(); } - virtual bool ThreadFunc(Histogram* histogram, size_t thread_idx) = 0; + virtual void DestroyMultithreading() = 0; + virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { // Set up the load distribution based on the number of threads @@ -215,7 +242,6 @@ class Client { public: Thread(Client* client, size_t idx) : done_(false), - new_stats_(nullptr), client_(client), idx_(idx), impl_(&Thread::ThreadFunc, this) {} @@ -230,15 +256,10 @@ class Client { void BeginSwap(Histogram* n) { std::lock_guard<std::mutex> g(mu_); - new_stats_ = n; + n->Swap(&histogram_); } - void EndSwap() { - std::unique_lock<std::mutex> g(mu_); - while (new_stats_ != nullptr) { - cv_.wait(g); - }; - } + void EndSwap() {} void MergeStatsInto(Histogram* hist) { std::unique_lock<std::mutex> g(mu_); @@ -252,29 +273,26 @@ class Client { void ThreadFunc() { for (;;) { // run the loop body - const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_); - // lock, see if we're done + HistogramEntry entry; + const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); + // lock, update histogram if needed and see if we're done std::lock_guard<std::mutex> g(mu_); + if (entry.used()) { + histogram_.Add(entry.value()); + } if (!thread_still_ok) { gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); done_ = true; } if (done_) { + client_->CompleteThread(); return; } - // check if we're resetting stats, swap out the histogram if so - if (new_stats_) { - new_stats_->Swap(&histogram_); - new_stats_ = nullptr; - cv_.notify_one(); - } } } std::mutex mu_; - std::condition_variable cv_; bool done_; - Histogram* new_stats_; Histogram histogram_; Client* client_; const size_t idx_; @@ -286,6 +304,18 @@ class Client { InterarrivalTimer interarrival_timer_; std::vector<gpr_timespec> next_time_; + + std::mutex thread_completion_mu_; + size_t threads_remaining_; + 
std::condition_variable threads_complete_; + + void CompleteThread() { + std::lock_guard<std::mutex> g(thread_completion_mu_); + threads_remaining_--; + if (threads_remaining_ == 0) { + threads_complete_.notify_all(); + } + } }; template <class StubType, class RequestType> diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 1507d1e3d6..5d9cb4bd0c 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -31,7 +31,6 @@ * */ -#include <cassert> #include <forward_list> #include <functional> #include <list> @@ -48,7 +47,6 @@ #include <grpc++/generic/generic_stub.h> #include <grpc/grpc.h> #include <grpc/support/cpu.h> -#include <grpc/support/histogram.h> #include <grpc/support/log.h> #include "src/proto/grpc/testing/services.grpc.pb.h" @@ -64,7 +62,7 @@ class ClientRpcContext { ClientRpcContext() {} virtual ~ClientRpcContext() {} // next state, return false if done. Collect stats when appropriate - virtual bool RunNextState(bool, Histogram* hist) = 0; + virtual bool RunNextState(bool, HistogramEntry* entry) = 0; virtual ClientRpcContext* StartNewClone() = 0; static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); } static ClientRpcContext* detag(void* t) { @@ -104,7 +102,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { alarm_.reset(new Alarm(cq_, next_issue_(), ClientRpcContext::tag(this))); } } - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { switch (next_state_) { case State::READY: start_ = UsageTimer::Now(); @@ -114,7 +112,7 @@ class ClientRpcContextUnaryImpl : public ClientRpcContext { next_state_ = State::RESP_DONE; return true; case State::RESP_DONE: - hist->Add((UsageTimer::Now() - start_) * 1e9); + entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::INVALID; return false; @@ -176,6 +174,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { for (int i = 0; i < num_async_threads_; i++) { cli_cqs_.emplace_back(new CompletionQueue); next_issuers_.emplace_back(NextIssuer(i)); + shutdown_state_.emplace_back(new PerThreadShutdownState()); } using namespace std::placeholders; @@ -192,7 +191,6 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { } virtual ~AsyncClient() { for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { - (*cq)->Shutdown(); void* got_tag; bool ok; while ((*cq)->Next(&got_tag, &ok)) { @@ -201,32 +199,16 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { } } - bool ThreadFunc(Histogram* histogram, - size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { - void* got_tag; - bool ok; - - if (cli_cqs_[thread_idx]->Next(&got_tag, &ok)) { - // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); - if (!ctx->RunNextState(ok, histogram)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - auto clone = ctx->StartNewClone(); - clone->Start(cli_cqs_[thread_idx].get()); - // delete the old version - delete ctx; - } - return true; - } else { // queue is shutting down - return false; - } - } - protected: const int num_async_threads_; private: + struct PerThreadShutdownState { + mutable std::mutex mutex; + bool shutdown; + PerThreadShutdownState() : shutdown(false) {} + }; + int NumThreads(const ClientConfig& config) { int num_threads = config.async_client_threads(); if (num_threads <= 0) { // Use dynamic sizing @@ -235,9 +217,60 @@ class AsyncClient : 
public ClientImpl<StubType, RequestType> { } return num_threads; } + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { + for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { + std::lock_guard<std::mutex> lock((*ss)->mutex); + (*ss)->shutdown = true; + } + for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) { + (*cq)->Shutdown(); + } + this->EndThreads(); // this needed for resolution + } + + bool ThreadFunc(HistogramEntry* entry, + size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL { + void* got_tag; + bool ok; + + switch (cli_cqs_[thread_idx]->AsyncNext( + &got_tag, &ok, + std::chrono::system_clock::now() + std::chrono::milliseconds(10))) { + case CompletionQueue::GOT_EVENT: { + // Got a regular event, so process it + ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + // Proceed while holding a lock to make sure that + // this thread isn't supposed to shut down + std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + if (shutdown_state_[thread_idx]->shutdown) { + return true; + } else if (!ctx->RunNextState(ok, entry)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + auto clone = ctx->StartNewClone(); + clone->Start(cli_cqs_[thread_idx].get()); + // delete the old version + delete ctx; + } + return true; + } + case CompletionQueue::TIMEOUT: { + std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + if (shutdown_state_[thread_idx]->shutdown) { + return true; + } + return true; + } + case CompletionQueue::SHUTDOWN: // queue is shutting down, so we must be + // done + return true; + } + GPR_UNREACHABLE_CODE(return true); + } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; std::vector<std::function<gpr_timespec()>> next_issuers_; + std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator( @@ -253,7 +286,7 @@ class AsyncUnaryClient GRPC_FINAL config, SetupCtx, BenchmarkStubCreator) { StartThreads(num_async_threads_); } - ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); } + ~AsyncUnaryClient() GRPC_OVERRIDE {} private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} @@ -298,7 +331,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this)); next_state_ = State::STREAM_IDLE; } - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { while (true) { switch (next_state_) { case State::STREAM_IDLE: @@ -330,7 +363,7 @@ class ClientRpcContextStreamingImpl : public ClientRpcContext { return true; break; case State::READ_DONE: - hist->Add((UsageTimer::Now() - start_) * 1e9); + entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around @@ -382,7 +415,7 @@ class AsyncStreamingClient GRPC_FINAL StartThreads(num_async_threads_); } - ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } + ~AsyncStreamingClient() GRPC_OVERRIDE {} private: static void CheckDone(grpc::Status s, SimpleResponse* response) {} @@ -430,7 +463,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { ClientRpcContext::tag(this)); next_state_ = State::STREAM_IDLE; } - bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE { + bool RunNextState(bool ok, HistogramEntry* entry) GRPC_OVERRIDE { while (true) { switch (next_state_) { case State::STREAM_IDLE: @@ -462,7 
+495,7 @@ class ClientRpcContextGenericStreamingImpl : public ClientRpcContext { return true; break; case State::READ_DONE: - hist->Add((UsageTimer::Now() - start_) * 1e9); + entry->set_value((UsageTimer::Now() - start_) * 1e9); callback_(status_, &response_); next_state_ = State::STREAM_IDLE; break; // loop around @@ -518,7 +551,7 @@ class GenericAsyncStreamingClient GRPC_FINAL StartThreads(num_async_threads_); } - ~GenericAsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); } + ~GenericAsyncStreamingClient() GRPC_OVERRIDE {} private: static void CheckDone(grpc::Status s, ByteBuffer* response) {} diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index c88e95b80e..25c7823553 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -31,7 +31,6 @@ * */ -#include <cassert> #include <chrono> #include <memory> #include <mutex> @@ -46,7 +45,6 @@ #include <grpc++/server_builder.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> -#include <grpc/support/histogram.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -55,7 +53,6 @@ #include "src/core/lib/profiling/timers.h" #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/client.h" -#include "test/cpp/qps/histogram.h" #include "test/cpp/qps/interarrival.h" #include "test/cpp/qps/usage_timer.h" @@ -90,6 +87,9 @@ class SynchronousClient size_t num_threads_; std::vector<SimpleResponse> responses_; + + private: + void DestroyMultithreading() GRPC_OVERRIDE GRPC_FINAL { EndThreads(); } }; class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { @@ -98,9 +98,9 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { : SynchronousClient(config) { StartThreads(num_threads_); } - ~SynchronousUnaryClient() { EndThreads(); } + ~SynchronousUnaryClient() {} - bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); auto* stub = channels_[thread_idx % channels_.size()].get_stub(); double start = UsageTimer::Now(); @@ -108,7 +108,7 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { grpc::ClientContext context; grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); - histogram->Add((UsageTimer::Now() - start) * 1e9); + entry->set_value((UsageTimer::Now() - start) * 1e9); return s.ok(); } }; @@ -127,25 +127,29 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient { StartThreads(num_threads_); } ~SynchronousStreamingClient() { - EndThreads(); - for (auto stream = &stream_[0]; stream != &stream_[num_threads_]; - stream++) { + for (size_t i = 0; i < num_threads_; i++) { + auto stream = &stream_[i]; if (*stream) { (*stream)->WritesDone(); - EXPECT_TRUE((*stream)->Finish().ok()); + Status s = (*stream)->Finish(); + EXPECT_TRUE(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Stream %zu received an error %s", i, + s.error_message().c_str()); + } } } delete[] stream_; delete[] context_; } - bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE { + bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) GRPC_OVERRIDE { WaitToIssue(thread_idx); GPR_TIMER_SCOPE("SynchronousStreamingClient::ThreadFunc", 0); double start = UsageTimer::Now(); if (stream_[thread_idx]->Write(request_) && stream_[thread_idx]->Read(&responses_[thread_idx])) { - histogram->Add((UsageTimer::Now() - start) * 1e9); + entry->set_value((UsageTimer::Now() - 
start) * 1e9); return true; } return false; diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc index 08bf045883..2aeaea51f2 100644 --- a/test/cpp/qps/driver.cc +++ b/test/cpp/qps/driver.cc @@ -87,7 +87,7 @@ static std::unordered_map<string, std::deque<int>> get_hosts_and_cores( CoreRequest dummy; CoreResponse cores; grpc::Status s = stub->CoreCount(&ctx, dummy, &cores); - assert(s.ok()); + GPR_ASSERT(s.ok()); std::deque<int> dq; for (int i = 0; i < cores.cores(); i++) { dq.push_back(i); @@ -289,9 +289,13 @@ std::unique_ptr<ScenarioResult> RunScenario( *args.mutable_setup() = server_config; servers[i].stream = servers[i].stub->RunServer(runsc::AllocContext(&contexts)); - GPR_ASSERT(servers[i].stream->Write(args)); + if (!servers[i].stream->Write(args)) { + gpr_log(GPR_ERROR, "Could not write args to server %zu", i); + } ServerStatus init_status; - GPR_ASSERT(servers[i].stream->Read(&init_status)); + if (!servers[i].stream->Read(&init_status)) { + gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i); + } gpr_join_host_port(&cli_target, host, init_status.port()); client_config.add_server_targets(cli_target); gpr_free(host); @@ -345,9 +349,13 @@ std::unique_ptr<ScenarioResult> RunScenario( *args.mutable_setup() = per_client_config; clients[i].stream = clients[i].stub->RunClient(runsc::AllocContext(&contexts)); - GPR_ASSERT(clients[i].stream->Write(args)); + if (!clients[i].stream->Write(args)) { + gpr_log(GPR_ERROR, "Could not write args to client %zu", i); + } ClientStatus init_status; - GPR_ASSERT(clients[i].stream->Read(&init_status)); + if (!clients[i].stream->Read(&init_status)) { + gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i); + } } // Let everything warmup @@ -362,19 +370,31 @@ std::unique_ptr<ScenarioResult> RunScenario( server_mark.mutable_mark()->set_reset(true); ClientArgs client_mark; client_mark.mutable_mark()->set_reset(true); - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Write(server_mark)); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + if (!server->stream->Write(server_mark)) { + gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i); + } } - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Write(client_mark)); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + if (!client->stream->Write(client_mark)) { + gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i); + } } ServerStatus server_status; ClientStatus client_status; - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Read(&server_status)); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + if (!server->stream->Read(&server_status)) { + gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i); + } } - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Read(&client_status)); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + if (!client->stream->Read(&client_status)) { + gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i); + } } // Wait some time @@ -390,37 +410,73 @@ std::unique_ptr<ScenarioResult> RunScenario( Histogram merged_latencies; gpr_log(GPR_INFO, "Finishing clients"); - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Write(client_mark)); - 
GPR_ASSERT(client->stream->WritesDone()); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + if (!client->stream->Write(client_mark)) { + gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i); + } + if (!client->stream->WritesDone()) { + gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i); + } } - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Read(&client_status)); - const auto& stats = client_status.stats(); - merged_latencies.MergeProto(stats.latencies()); - result->add_client_stats()->CopyFrom(stats); - GPR_ASSERT(!client->stream->Read(&client_status)); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + // Read the client final status + if (client->stream->Read(&client_status)) { + gpr_log(GPR_INFO, "Received final status from client %zu", i); + const auto& stats = client_status.stats(); + merged_latencies.MergeProto(stats.latencies()); + result->add_client_stats()->CopyFrom(stats); + // That final status should be the last message on the client stream + GPR_ASSERT(!client->stream->Read(&client_status)); + } else { + gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i); + } } - for (auto client = &clients[0]; client != &clients[num_clients]; client++) { - GPR_ASSERT(client->stream->Finish().ok()); + for (size_t i = 0; i < num_clients; i++) { + auto client = &clients[i]; + Status s = client->stream->Finish(); + result->add_client_success(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Client %zu had an error %s", i, + s.error_message().c_str()); + } } delete[] clients; merged_latencies.FillProto(result->mutable_latencies()); gpr_log(GPR_INFO, "Finishing servers"); - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Write(server_mark)); - GPR_ASSERT(server->stream->WritesDone()); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + if (!server->stream->Write(server_mark)) { + gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i); + } + if (!server->stream->WritesDone()) { + gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i); + } } - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Read(&server_status)); - result->add_server_stats()->CopyFrom(server_status.stats()); - result->add_server_cores(server_status.cores()); - GPR_ASSERT(!server->stream->Read(&server_status)); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + // Read the server final status + if (server->stream->Read(&server_status)) { + gpr_log(GPR_INFO, "Received final status from server %zu", i); + result->add_server_stats()->CopyFrom(server_status.stats()); + result->add_server_cores(server_status.cores()); + // That final status should be the last message on the server stream + GPR_ASSERT(!server->stream->Read(&server_status)); + } else { + gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i); + } } - for (auto server = &servers[0]; server != &servers[num_servers]; server++) { - GPR_ASSERT(server->stream->Finish().ok()); + for (size_t i = 0; i < num_servers; i++) { + auto server = &servers[i]; + Status s = server->stream->Finish(); + result->add_server_success(s.ok()); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Server %zu had an error %s", i, + s.error_message().c_str()); + } } delete[] servers; @@ -429,8 +485,9 @@ std::unique_ptr<ScenarioResult> RunScenario( return result; } -void RunQuit() { +bool RunQuit() { // 
Get client, server lists + bool result = true; auto workers = get_workers("QPS_WORKERS"); for (size_t i = 0; i < workers.size(); i++) { auto stub = WorkerService::NewStub( @@ -438,8 +495,14 @@ void RunQuit() { Void dummy; grpc::ClientContext ctx; ctx.set_fail_fast(false); - GPR_ASSERT(stub->QuitWorker(&ctx, dummy, &dummy).ok()); + Status s = stub->QuitWorker(&ctx, dummy, &dummy); + if (!s.ok()) { + gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i, + s.error_message().c_str()); + result = false; + } } + return result; } } // namespace testing diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h index 3a5cf138f1..93f4370caf 100644 --- a/test/cpp/qps/driver.h +++ b/test/cpp/qps/driver.h @@ -47,7 +47,7 @@ std::unique_ptr<ScenarioResult> RunScenario( const grpc::testing::ServerConfig& server_config, size_t num_servers, int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count); -void RunQuit(); +bool RunQuit(); } // namespace testing } // namespace grpc diff --git a/test/cpp/qps/gen_build_yaml.py b/test/cpp/qps/gen_build_yaml.py index 34b8151441..4ff4e44b8b 100755 --- a/test/cpp/qps/gen_build_yaml.py +++ b/test/cpp/qps/gen_build_yaml.py @@ -45,9 +45,10 @@ import performance.scenario_config as scenario_config def _scenario_json_string(scenario_json): # tweak parameters to get fast test times - scenario_json['warmup_seconds'] = 1 + scenario_json['warmup_seconds'] = 0 scenario_json['benchmark_seconds'] = 1 - return json.dumps(scenario_config.remove_nonproto_fields(scenario_json)) + scenarios_json = {'scenarios': [scenario_config.remove_nonproto_fields(scenario_json)]} + return json.dumps(scenarios_json) def threads_of_type(scenario_json, path): d = scenario_json @@ -72,8 +73,7 @@ print yaml.dump({ { 'name': 'json_run_localhost', 'shortname': 'json_run_localhost:%s' % scenario_json['name'], - 'args': ['--scenario_json', - pipes.quote(_scenario_json_string(scenario_json))], + 'args': ['--scenarios_json', _scenario_json_string(scenario_json)], 'ci_platforms': ['linux', 'mac', 'posix', 'windows'], 'platforms': ['linux', 'mac', 'posix', 'windows'], 'flaky': False, @@ -81,7 +81,8 @@ print yaml.dump({ 'boringssl': True, 'defaults': 'boringssl', 'cpu_cost': guess_cpu(scenario_json), - 'exclude_configs': [] + 'exclude_configs': [], + 'timeout_seconds': 3*60 } for scenario_json in scenario_config.CXXLanguage().scenarios() ] diff --git a/test/cpp/qps/json_run_localhost.cc b/test/cpp/qps/json_run_localhost.cc index 6545dc2917..74e40fbf1a 100644 --- a/test/cpp/qps/json_run_localhost.cc +++ b/test/cpp/qps/json_run_localhost.cc @@ -75,7 +75,7 @@ int main(int argc, char** argv) { for (int i = 1; i < argc; i++) { args.push_back(argv[i]); } - SubProcess(args).Join(); + GPR_ASSERT(SubProcess(args).Join() == 0); for (auto it = jobs.begin(); it != jobs.end(); ++it) { (*it)->Interrupt(); diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc index f5d739f893..1524ebbc38 100644 --- a/test/cpp/qps/qps_json_driver.cc +++ b/test/cpp/qps/qps_json_driver.cc @@ -53,7 +53,7 @@ DEFINE_bool(quit, false, "Quit the workers"); namespace grpc { namespace testing { -static void QpsDriver() { +static bool QpsDriver() { grpc::string json; bool scfile = (FLAGS_scenarios_file != ""); @@ -81,13 +81,13 @@ static void QpsDriver() { } else if (scjson) { json = FLAGS_scenarios_json.c_str(); } else if (FLAGS_quit) { - RunQuit(); - return; + return RunQuit(); } // Parse into an array of scenarios Scenarios scenarios; ParseJson(json.c_str(), "grpc.testing.Scenarios", 
&scenarios); + bool success = true; // Make sure that there is at least some valid scenario here GPR_ASSERT(scenarios.scenarios_size() > 0); @@ -109,7 +109,15 @@ static void QpsDriver() { GetReporter()->ReportQPSPerCore(*result); GetReporter()->ReportLatency(*result); GetReporter()->ReportTimes(*result); + + for (int i = 0; success && i < result->client_success_size(); i++) { + success = result->client_success(i); + } + for (int i = 0; success && i < result->server_success_size(); i++) { + success = result->server_success(i); + } } + return success; } } // namespace testing @@ -118,7 +126,7 @@ static void QpsDriver() { int main(int argc, char **argv) { grpc::testing::InitBenchmark(&argc, &argv, true); - grpc::testing::QpsDriver(); + bool ok = grpc::testing::QpsDriver(); - return 0; + return ok ? 0 : 1; } diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc index f514e23e85..d3e53fe14a 100644 --- a/test/cpp/qps/qps_worker.cc +++ b/test/cpp/qps/qps_worker.cc @@ -33,7 +33,6 @@ #include "test/cpp/qps/qps_worker.h" -#include <cassert> #include <memory> #include <mutex> #include <sstream> @@ -124,11 +123,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { GRPC_OVERRIDE { InstanceGuard g(this); if (!g.Acquired()) { - return Status(StatusCode::RESOURCE_EXHAUSTED, ""); + return Status(StatusCode::RESOURCE_EXHAUSTED, "Client worker busy"); } ScopedProfile profile("qps_client.prof", false); Status ret = RunClientBody(ctx, stream); + gpr_log(GPR_INFO, "RunClient: Returning"); return ret; } @@ -137,11 +137,12 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { GRPC_OVERRIDE { InstanceGuard g(this); if (!g.Acquired()) { - return Status(StatusCode::RESOURCE_EXHAUSTED, ""); + return Status(StatusCode::RESOURCE_EXHAUSTED, "Server worker busy"); } ScopedProfile profile("qps_server.prof", false); Status ret = RunServerBody(ctx, stream); + gpr_log(GPR_INFO, "RunServer: Returning"); return ret; } @@ -154,7 +155,7 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { Status QuitWorker(ServerContext* ctx, const Void*, Void*) GRPC_OVERRIDE { InstanceGuard g(this); if (!g.Acquired()) { - return Status(StatusCode::RESOURCE_EXHAUSTED, ""); + return Status(StatusCode::RESOURCE_EXHAUSTED, "Quitting worker busy"); } worker_->MarkDone(); @@ -197,33 +198,38 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { ServerReaderWriter<ClientStatus, ClientArgs>* stream) { ClientArgs args; if (!stream->Read(&args)) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read args"); } if (!args.has_setup()) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Invalid setup arg"); } gpr_log(GPR_INFO, "RunClientBody: about to create client"); auto client = CreateClient(args.setup()); if (!client) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create client"); } gpr_log(GPR_INFO, "RunClientBody: client created"); ClientStatus status; if (!stream->Write(status)) { - return Status(StatusCode::UNKNOWN, ""); + return Status(StatusCode::UNKNOWN, "Client couldn't report init status"); } gpr_log(GPR_INFO, "RunClientBody: creation status reported"); while (stream->Read(&args)) { gpr_log(GPR_INFO, "RunClientBody: Message read"); if (!args.has_mark()) { gpr_log(GPR_INFO, "RunClientBody: Message is not a mark!"); - return Status(StatusCode::INVALID_ARGUMENT, ""); + return 
Status(StatusCode::INVALID_ARGUMENT, "Invalid mark"); } *status.mutable_stats() = client->Mark(args.mark().reset()); - stream->Write(status); + if (!stream->Write(status)) { + return Status(StatusCode::UNKNOWN, "Client couldn't respond to mark"); + } gpr_log(GPR_INFO, "RunClientBody: Mark response given"); } + gpr_log(GPR_INFO, "RunClientBody: Awaiting Threads Completion"); + client->AwaitThreadsCompletion(); + gpr_log(GPR_INFO, "RunClientBody: Returning"); return Status::OK; } @@ -232,10 +238,10 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { ServerReaderWriter<ServerStatus, ServerArgs>* stream) { ServerArgs args; if (!stream->Read(&args)) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't read server args"); } if (!args.has_setup()) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Bad server creation args"); } if (server_port_ != 0) { args.mutable_setup()->set_port(server_port_); @@ -243,24 +249,26 @@ class WorkerServiceImpl GRPC_FINAL : public WorkerService::Service { gpr_log(GPR_INFO, "RunServerBody: about to create server"); auto server = CreateServer(args.setup()); if (!server) { - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Couldn't create server"); } gpr_log(GPR_INFO, "RunServerBody: server created"); ServerStatus status; status.set_port(server->port()); status.set_cores(server->cores()); if (!stream->Write(status)) { - return Status(StatusCode::UNKNOWN, ""); + return Status(StatusCode::UNKNOWN, "Server couldn't report init status"); } gpr_log(GPR_INFO, "RunServerBody: creation status reported"); while (stream->Read(&args)) { gpr_log(GPR_INFO, "RunServerBody: Message read"); if (!args.has_mark()) { gpr_log(GPR_INFO, "RunServerBody: Message not a mark!"); - return Status(StatusCode::INVALID_ARGUMENT, ""); + return Status(StatusCode::INVALID_ARGUMENT, "Invalid mark"); } *status.mutable_stats() = server->Mark(args.mark().reset()); - stream->Write(status); + if (!stream->Write(status)) { + return Status(StatusCode::UNKNOWN, "Server couldn't respond to mark"); + } gpr_log(GPR_INFO, "RunServerBody: Mark response given"); } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index c9954d0d02..dea8746331 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -102,7 +102,7 @@ class AsyncQpsServerTest : public Server { auto process_rpc_bound = std::bind(process_rpc, config.payload_config(), _1, _2); - for (int i = 0; i < 10000 / num_threads; i++) { + for (int i = 0; i < 15000; i++) { for (int j = 0; j < num_threads; j++) { if (request_unary_function) { auto request_unary = @@ -123,21 +123,24 @@ class AsyncQpsServerTest : public Server { for (int i = 0; i < num_threads; i++) { shutdown_state_.emplace_back(new PerThreadShutdownState()); - } - for (int i = 0; i < num_threads; i++) { threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i); } } ~AsyncQpsServerTest() { for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { - (*ss)->set_shutdown(); + std::lock_guard<std::mutex> lock((*ss)->mutex); + (*ss)->shutdown = true; + } + // TODO (vpai): Remove this deadline and allow Shutdown to finish properly + auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3); + server_->Shutdown(deadline); + for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { + (*cq)->Shutdown(); } - server_->Shutdown(); for (auto thr = 
threads_.begin(); thr != threads_.end(); thr++) { thr->join(); } for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) { - (*cq)->Shutdown(); bool ok; void *got_tag; while ((*cq)->Next(&got_tag, &ok)) @@ -150,22 +153,24 @@ class AsyncQpsServerTest : public Server { } private: - void ThreadFunc(int rank) { + void ThreadFunc(int thread_idx) { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[rank]->Next(&got_tag, &ok)) { + while (srv_cqs_[thread_idx]->Next(&got_tag, &ok)) { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke - const bool still_going = ctx->RunNextState(ok); - if (!shutdown_state_[rank]->shutdown()) { - // this RPC context is done, so refresh it - if (!still_going) { - ctx->Reset(); - } - } else { + // Proceed while holding a lock to make sure that + // this thread isn't supposed to shut down + std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + if (shutdown_state_[thread_idx]->shutdown) { return; } + const bool still_going = ctx->RunNextState(ok); + // if this RPC context is done, refresh it + if (!still_going) { + ctx->Reset(); + } } return; } @@ -333,24 +338,12 @@ class AsyncQpsServerTest : public Server { ServiceType async_service_; std::forward_list<ServerRpcContext *> contexts_; - class PerThreadShutdownState { - public: - PerThreadShutdownState() : shutdown_(false) {} - - bool shutdown() const { - std::lock_guard<std::mutex> lock(mutex_); - return shutdown_; - } - - void set_shutdown() { - std::lock_guard<std::mutex> lock(mutex_); - shutdown_ = true; - } - - private: - mutable std::mutex mutex_; - bool shutdown_; + struct PerThreadShutdownState { + mutable std::mutex mutex; + bool shutdown; + PerThreadShutdownState() : shutdown(false) {} }; + std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; diff --git a/test/cpp/util/grpc_cli.cc b/test/cpp/util/grpc_cli.cc index c52e48bae6..53529da782 100644 --- a/test/cpp/util/grpc_cli.cc +++ b/test/cpp/util/grpc_cli.cc @@ -34,18 +34,21 @@ /* A command line tool to talk to a grpc server. Example of talking to grpc interop server: - grpc_cli call localhost:50051 UnaryCall src/proto/grpc/testing/test.proto \ - "response_size:10" --enable_ssl=false + grpc_cli call localhost:50051 UnaryCall "response_size:10" \ + --protofiles=src/proto/grpc/testing/test.proto --enable_ssl=false Options: - 1. --proto_path, if your proto file is not under current working directory, + 1. --protofiles, use this flag to provide a proto file if the server does + does not have the reflection service. + 2. --proto_path, if your proto file is not under current working directory, use this flag to provide a search root. It should work similar to the - counterpart in protoc. - 2. --metadata specifies metadata to be sent to the server, such as: + counterpart in protoc. This option is valid only when protofiles is + provided. + 3. --metadata specifies metadata to be sent to the server, such as: --metadata="MyHeaderKey1:Value1:MyHeaderKey2:Value2" - 3. --enable_ssl, whether to use tls. - 4. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call - 3. --input_binary_file, a file containing the serialized request. The file + 4. --enable_ssl, whether to use tls. + 5. --use_auth, if set to true, attach a GoogleDefaultCredentials to the call + 6. --input_binary_file, a file containing the serialized request. 
      can be generated by calling something like:
      protoc --proto_path=src/proto/grpc/testing/ \
        --encode=grpc.testing.SimpleRequest \
@@ -53,7 +56,7 @@
        < input.txt > input.bin
      If this is used and no proto file is provided in the argument list, the
      method string has to be exact in the form of /package.service/method.
-    4. --output_binary_file, a file to write binary format response into, it can
+    7. --output_binary_file, a file to write binary format response into, it can
      be later decoded using protoc:
      protoc --proto_path=src/proto/grpc/testing/ \
        --decode=grpc.testing.SimpleResponse \
@@ -61,6 +64,7 @@
        < output.bin > output.txt
*/
+#include <unistd.h>
#include <fstream>
#include <iostream>
#include <sstream>
@@ -86,6 +90,8 @@ DEFINE_string(output_binary_file, "",
DEFINE_string(metadata, "",
              "Metadata to send to server, in the form of key1:val1:key2:val2");
DEFINE_string(proto_path, ".", "Path to look for the proto file.");
+// TODO(zyc): support a list of input proto files
+DEFINE_string(protofiles, "", "Name of the proto file.");
void ParseMetadataFlag(
    std::multimap<grpc::string, grpc::string>* client_metadata) {
@@ -129,35 +135,61 @@ void PrintMetadata(const T& m, const grpc::string& message) {
int main(int argc, char** argv) {
  grpc::testing::InitTest(&argc, &argv, true);
-  if (argc < 4 || argc == 5 || grpc::string(argv[1]) != "call") {
+  if (argc < 4 || grpc::string(argv[1]) != "call") {
    std::cout << "Usage: grpc_cli call server_host:port method_name "
              << "[proto file] [text format request] [<options>]" << std::endl;
+    return 1;
  }
-  grpc::string file_name;
  grpc::string request_text;
  grpc::string server_address(argv[2]);
  grpc::string method_name(argv[3]);
  std::unique_ptr<grpc::testing::ProtoFileParser> parser;
  grpc::string serialized_request_proto;
-  if (argc == 6) {
-    file_name = argv[4];
-    // TODO(yangg) read from stdin as well?
-    request_text = argv[5];
+  if (argc == 5) {
+    request_text = argv[4];
+  }
+
+  std::shared_ptr<grpc::ChannelCredentials> creds;
+  if (!FLAGS_enable_ssl) {
+    creds = grpc::InsecureChannelCredentials();
+  } else {
+    if (FLAGS_use_auth) {
+      creds = grpc::GoogleDefaultCredentials();
+    } else {
+      creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
+    }
  }
+  std::shared_ptr<grpc::Channel> channel =
+      grpc::CreateChannel(server_address, creds);
  if (request_text.empty() && FLAGS_input_binary_file.empty()) {
-    std::cout << "Missing input. Use text format input or "
-              << "--input_binary_file for serialized request" << std::endl;
-    return 1;
-  } else if (!request_text.empty()) {
-    parser.reset(new grpc::testing::ProtoFileParser(FLAGS_proto_path, file_name,
-                                                    method_name));
+    if (isatty(STDIN_FILENO)) {
+      std::cout << "reading request message from stdin..." << std::endl;
+    }
+    std::stringstream input_stream;
+    input_stream << std::cin.rdbuf();
+    request_text = input_stream.str();
+  }
+
+  if (!request_text.empty()) {
+    if (!FLAGS_protofiles.empty()) {
+      parser.reset(new grpc::testing::ProtoFileParser(
+          FLAGS_proto_path, FLAGS_protofiles, method_name));
+    } else {
+      parser.reset(new grpc::testing::ProtoFileParser(channel, method_name));
+    }
    method_name = parser->GetFullMethodName();
    if (parser->HasError()) {
      return 1;
    }
+
+    if (!FLAGS_input_binary_file.empty()) {
+      std::cout
+          << "warning: request given in argv, ignoring --input_binary_file"
+          << std::endl;
+    }
  }
  if (parser) {
@@ -175,19 +207,6 @@ int main(int argc, char** argv) {
  }
  std::cout << "connecting to " << server_address << std::endl;
-  std::shared_ptr<grpc::ChannelCredentials> creds;
-  if (!FLAGS_enable_ssl) {
-    creds = grpc::InsecureChannelCredentials();
-  } else {
-    if (FLAGS_use_auth) {
-      creds = grpc::GoogleDefaultCredentials();
-    } else {
-      creds = grpc::SslCredentials(grpc::SslCredentialsOptions());
-    }
-  }
-  std::shared_ptr<grpc::Channel> channel =
-      grpc::CreateChannel(server_address, creds);
-
  grpc::string serialized_response_proto;
  std::multimap<grpc::string, grpc::string> client_metadata;
  std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
@@ -219,7 +238,7 @@ int main(int argc, char** argv) {
    }
  } else {
    std::cout << "Rpc failed with status code " << s.error_code()
-              << " error message " << s.error_message() << std::endl;
+              << ", error message: " << s.error_message() << std::endl;
  }
  return 0;
diff --git a/test/cpp/util/proto_file_parser.cc b/test/cpp/util/proto_file_parser.cc
index 25aec329eb..5b0d925e1c 100644
--- a/test/cpp/util/proto_file_parser.cc
+++ b/test/cpp/util/proto_file_parser.cc
@@ -95,9 +95,48 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path,
  dynamic_factory_.reset(
      new google::protobuf::DynamicMessageFactory(importer_->pool()));
+  std::vector<const google::protobuf::ServiceDescriptor*> service_desc_list;
+  for (int i = 0; i < file_desc->service_count(); i++) {
+    service_desc_list.push_back(file_desc->service(i));
+  }
+  InitProtoFileParser(method, service_desc_list);
+}
+
+ProtoFileParser::ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
+                                 const grpc::string& method)
+    : has_error_(false),
+      desc_db_(new grpc::ProtoReflectionDescriptorDatabase(channel)),
+      desc_pool_(new google::protobuf::DescriptorPool(desc_db_.get())) {
+  std::vector<std::string> service_list;
+  if (!desc_db_->GetServices(&service_list)) {
+    LogError(
+        "Failed to get services from the server, "
+        "it may not have the reflection service.\n"
+        "Please try to use the --protofiles option to provide a proto file.");
+  }
+  if (has_error_) {
+    return;
+  }
+  dynamic_factory_.reset(
+      new google::protobuf::DynamicMessageFactory(desc_pool_.get()));
+
+  std::vector<const google::protobuf::ServiceDescriptor*> service_desc_list;
+  for (auto it = service_list.begin(); it != service_list.end(); it++) {
+    service_desc_list.push_back(desc_pool_->FindServiceByName(*it));
+  }
+  InitProtoFileParser(method, service_desc_list);
+}
+
+ProtoFileParser::~ProtoFileParser() {}
+
+void ProtoFileParser::InitProtoFileParser(
+    const grpc::string& method,
+    const std::vector<const google::protobuf::ServiceDescriptor*>
+        service_desc_list) {
  const google::protobuf::MethodDescriptor* method_descriptor = nullptr;
-  for (int i = 0; !method_descriptor && i < file_desc->service_count(); i++) {
-    const auto* service_desc = file_desc->service(i);
+  for (auto it = service_desc_list.begin(); it != service_desc_list.end();
+       it++) {
+    const auto* service_desc = *it;
    for (int j = 0; j < service_desc->method_count(); j++) {
      const auto* method_desc = service_desc->method(j);
      if (MethodNameMatch(method_desc->full_name(), method)) {
@@ -130,8 +169,6 @@ ProtoFileParser::ProtoFileParser(const grpc::string& proto_path,
      dynamic_factory_->GetPrototype(method_descriptor->output_type())->New());
}
-ProtoFileParser::~ProtoFileParser() {}
-
grpc::string ProtoFileParser::GetSerializedProto(
    const grpc::string& text_format_proto, bool is_request) {
  grpc::string serialized;
@@ -143,7 +180,7 @@ grpc::string ProtoFileParser::GetSerializedProto(
    LogError("Failed to parse text format to proto.");
    return "";
  }
-  ok = request_prototype_->SerializeToString(&serialized);
+  ok = msg->SerializeToString(&serialized);
  if (!ok) {
    LogError("Failed to serialize proto.");
    return "";
diff --git a/test/cpp/util/proto_file_parser.h b/test/cpp/util/proto_file_parser.h
index 46cdd66503..b442d77db9 100644
--- a/test/cpp/util/proto_file_parser.h
+++ b/test/cpp/util/proto_file_parser.h
@@ -38,8 +38,10 @@
#include <google/protobuf/compiler/importer.h>
#include <google/protobuf/dynamic_message.h>
+#include <grpc++/channel.h>
#include "src/compiler/config.h"
+#include "test/cpp/util/proto_reflection_descriptor_database.h"
namespace grpc {
namespace testing {
@@ -53,6 +55,9 @@ class ProtoFileParser {
  // even just Method. It will log an error if there is ambiguity.
  ProtoFileParser(const grpc::string& proto_path, const grpc::string& file_name,
                  const grpc::string& method);
+
+  ProtoFileParser(std::shared_ptr<grpc::Channel> channel,
+                  const grpc::string& method);
  ~ProtoFileParser();
  grpc::string GetFullMethodName() const { return full_method_name_; }
@@ -68,12 +73,18 @@ class ProtoFileParser {
  void LogError(const grpc::string& error_msg);
 private:
+  void InitProtoFileParser(
+      const grpc::string& method,
+      const std::vector<const google::protobuf::ServiceDescriptor*> services);
+
  bool has_error_;
  grpc::string request_text_;
  grpc::string full_method_name_;
  google::protobuf::compiler::DiskSourceTree source_tree_;
  std::unique_ptr<ErrorPrinter> error_printer_;
  std::unique_ptr<google::protobuf::compiler::Importer> importer_;
+  std::unique_ptr<grpc::ProtoReflectionDescriptorDatabase> desc_db_;
+  std::unique_ptr<google::protobuf::DescriptorPool> desc_pool_;
  std::unique_ptr<google::protobuf::DynamicMessageFactory> dynamic_factory_;
  std::unique_ptr<grpc::protobuf::Message> request_prototype_;
  std::unique_ptr<grpc::protobuf::Message> response_prototype_;
diff --git a/test/cpp/util/proto_reflection_descriptor_database.cc b/test/cpp/util/proto_reflection_descriptor_database.cc
index 25b720aee0..8fd466feb0 100644
--- a/test/cpp/util/proto_reflection_descriptor_database.cc
+++ b/test/cpp/util/proto_reflection_descriptor_database.cc
@@ -53,10 +53,20 @@ ProtoReflectionDescriptorDatabase::ProtoReflectionDescriptorDatabase(
    std::shared_ptr<grpc::Channel> channel)
    : stub_(ServerReflection::NewStub(channel)) {}
-ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {}
+ProtoReflectionDescriptorDatabase::~ProtoReflectionDescriptorDatabase() {
+  if (stream_) {
+    stream_->WritesDone();
+    Status status = stream_->Finish();
+    if (!status.ok()) {
+      gpr_log(GPR_ERROR,
+              "ServerReflectionInfo rpc failed. Error code: %d, details: %s",
+              (int)status.error_code(), status.error_message().c_str());
+    }
+  }
+}
bool ProtoReflectionDescriptorDatabase::FindFileByName(
-    const string& filename, google::protobuf::FileDescriptorProto* output) {
+    const string& filename, protobuf::FileDescriptorProto* output) {
  if (cached_db_.FindFileByName(filename, output)) {
    return true;
  }
@@ -101,7 +111,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileByName(
}
bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol(
-    const string& symbol_name, google::protobuf::FileDescriptorProto* output) {
+    const string& symbol_name, protobuf::FileDescriptorProto* output) {
  if (cached_db_.FindFileContainingSymbol(symbol_name, output)) {
    return true;
  }
@@ -148,7 +158,7 @@ bool ProtoReflectionDescriptorDatabase::FindFileContainingSymbol(
bool ProtoReflectionDescriptorDatabase::FindFileContainingExtension(
    const string& containing_type, int field_number,
-    google::protobuf::FileDescriptorProto* output) {
+    protobuf::FileDescriptorProto* output) {
  if (cached_db_.FindFileContainingExtension(containing_type, field_number,
                                             output)) {
    return true;
@@ -276,10 +286,10 @@ bool ProtoReflectionDescriptorDatabase::GetServices(
  return false;
}
-const google::protobuf::FileDescriptorProto
+const protobuf::FileDescriptorProto
ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse(
    const std::string& byte_fd_proto) {
-  google::protobuf::FileDescriptorProto file_desc_proto;
+  protobuf::FileDescriptorProto file_desc_proto;
  file_desc_proto.ParseFromString(byte_fd_proto);
  return file_desc_proto;
}
@@ -287,7 +297,7 @@ ProtoReflectionDescriptorDatabase::ParseFileDescriptorProtoResponse(
void ProtoReflectionDescriptorDatabase::AddFileFromResponse(
    const grpc::reflection::v1alpha::FileDescriptorResponse& response) {
  for (int i = 0; i < response.file_descriptor_proto_size(); ++i) {
-    const google::protobuf::FileDescriptorProto file_proto =
+    const protobuf::FileDescriptorProto file_proto =
        ParseFileDescriptorProtoResponse(response.file_descriptor_proto(i));
    if (known_files_.find(file_proto.name()) == known_files_.end()) {
      known_files_.insert(file_proto.name());
diff --git a/test/cpp/util/proto_reflection_descriptor_database.h b/test/cpp/util/proto_reflection_descriptor_database.h
index 99c00675bb..eb7cf4907d 100644
--- a/test/cpp/util/proto_reflection_descriptor_database.h
+++ b/test/cpp/util/proto_reflection_descriptor_database.h
@@ -38,19 +38,19 @@
#include <unordered_set>
#include <vector>
-#include <google/protobuf/descriptor.h>
-#include <google/protobuf/descriptor.pb.h>
-#include <google/protobuf/descriptor_database.h>
+// GRPC_NO_GENERATED_CODE indicates generated pb files should not be used
+#ifdef GRPC_NO_GENERATED_CODE
+#include "src/proto/grpc/reflection/v1alpha/reflection.grpc.pb.h"
+#else
#include <grpc++/ext/reflection.grpc.pb.h>
+#endif // GRPC_NO_GENERATED_CODE
#include <grpc++/grpc++.h>
-
namespace grpc {
// ProtoReflectionDescriptorDatabase takes a stub of ServerReflection and
// provides the methods defined by DescriptorDatabase interfaces. It can be used
// to feed a DescriptorPool instance.
-class ProtoReflectionDescriptorDatabase
-    : public google::protobuf::DescriptorDatabase {
+class ProtoReflectionDescriptorDatabase : public protobuf::DescriptorDatabase {
 public:
  explicit ProtoReflectionDescriptorDatabase(
      std::unique_ptr<reflection::v1alpha::ServerReflection::Stub> stub);
@@ -65,14 +65,13 @@ class ProtoReflectionDescriptorDatabase
  // Find a file by file name.  Fills in in *output and returns true if found.
  // Otherwise, returns false, leaving the contents of *output undefined.
  bool FindFileByName(const string& filename,
-                      google::protobuf::FileDescriptorProto* output)
-      GRPC_OVERRIDE;
+                      protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
  // Find the file that declares the given fully-qualified symbol name.
  // If found, fills in *output and returns true, otherwise returns false
  // and leaves *output undefined.
  bool FindFileContainingSymbol(const string& symbol_name,
-                                google::protobuf::FileDescriptorProto* output)
+                                protobuf::FileDescriptorProto* output)
      GRPC_OVERRIDE;
  // Find the file which defines an extension extending the given message type
@@ -81,7 +80,7 @@ class ProtoReflectionDescriptorDatabase
  // must be a fully-qualified type name.
  bool FindFileContainingExtension(
      const string& containing_type, int field_number,
-      google::protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
+      protobuf::FileDescriptorProto* output) GRPC_OVERRIDE;
  // Finds the tag numbers used by all known extensions of
  // extendee_type, and appends them to output in an undefined
@@ -102,7 +101,7 @@ class ProtoReflectionDescriptorDatabase
      grpc::reflection::v1alpha::ServerReflectionResponse>
      ClientStream;
-  const google::protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse(
+  const protobuf::FileDescriptorProto ParseFileDescriptorProtoResponse(
      const std::string& byte_fd_proto);
  void AddFileFromResponse(
@@ -123,7 +122,7 @@ class ProtoReflectionDescriptorDatabase
  std::unordered_map<string, std::vector<int>> cached_extension_numbers_;
  std::mutex stream_mutex_;
-  google::protobuf::SimpleDescriptorDatabase cached_db_;
+  protobuf::SimpleDescriptorDatabase cached_db_;
};
} // namespace grpc
diff --git a/test/distrib/python/run_distrib_test.sh b/test/distrib/python/run_distrib_test.sh
index 8a983bc248..0c2154838f 100755
--- a/test/distrib/python/run_distrib_test.sh
+++ b/test/distrib/python/run_distrib_test.sh
@@ -33,34 +33,47 @@
set -ex
cd $(dirname $0)
# Pick up the source dist archive whatever its version is
+SDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-*.tar.gz
BDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio-*.whl
+TOOLS_SDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio_tools-*.tar.gz
TOOLS_BDIST_ARCHIVES=$EXTERNAL_GIT_ROOT/input_artifacts/grpcio_tools-*.whl
-if [ ! -f ${SDIST_ARCHIVE} ]
-then
-  echo "Archive ${SDIST_ARCHIVE} does not exist."
-  exit 1
-fi
+function make_virtualenv() {
+  virtualenv $1
+  $1/bin/python -m pip install --upgrade six pip
+  $1/bin/python -m pip install cython
+}
-PYTHON=python2
-PIP=pip2
-which $PYTHON || PYTHON=python
-which $PIP || PIP=pip
+function at_least_one_installs() {
+  for file in "$@"; do
+    if python -m pip install $file; then
+      return 0
+    fi
+  done
+  return -1
+}
-# TODO(jtattermusch): this shouldn't be required
-# TODO(jtattermusch): run the command twice to workaround docker-on-overlay
-# issue https://github.com/docker/docker/issues/12327
-# (first attempt will fail when using docker with overlayFS)
-${PIP} install --upgrade six pip || ${PIP} install --upgrade six pip
+make_virtualenv bdist_test
+make_virtualenv sdist_test
-# At least one of the bdist packages has to succeed (whichever one matches the
-# test machine, anyway).
-for bdist in ${BDIST_ARCHIVES} ${TOOLS_BDIST_ARCHIVES}; do
-  ($PYTHON -m pip install $bdist) || true
-done
+#
+# Install our distributions in order of dependencies
+#
+
+(source bdist_test/bin/activate && at_least_one_installs ${BDIST_ARCHIVES})
+(source bdist_test/bin/activate && at_least_one_installs ${TOOLS_BDIST_ARCHIVES})
+
+(source sdist_test/bin/activate && at_least_one_installs ${SDIST_ARCHIVES})
+(source sdist_test/bin/activate && at_least_one_installs ${TOOLS_SDIST_ARCHIVES})
+
+#
+# Test our distributions
+#
# TODO(jtattermusch): add a .proto file to the distribtest, generate python
# code from it and then use the generated code from distribtest.py
-$PYTHON -m grpc.tools.protoc
+(source bdist_test/bin/activate && python -m grpc.tools.protoc --help)
+(source sdist_test/bin/activate && python -m grpc.tools.protoc --help)
-$PYTHON distribtest.py
+(source bdist_test/bin/activate && python distribtest.py)
+(source sdist_test/bin/activate && python distribtest.py)
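
The grpc_cli and ProtoFileParser changes above all serve one idea: when --protofiles is not given, method and message descriptors are fetched over the server's reflection service (ProtoReflectionDescriptorDatabase feeding a DescriptorPool) and the request is built dynamically from text format. The standalone sketch below is not part of the patch; it only illustrates that flow with the same building blocks. The server address, the method name grpc.testing.TestService.UnaryCall, and the response_size field are illustrative values borrowed from the interop example in the grpc_cli.cc comment, and the sketch assumes the test utility header used by the patch is on the include path.

#include <iostream>
#include <memory>
#include <string>

#include <google/protobuf/descriptor.h>
#include <google/protobuf/dynamic_message.h>
#include <google/protobuf/text_format.h>
#include <grpc++/grpc++.h>

#include "test/cpp/util/proto_reflection_descriptor_database.h"

int main() {
  // Connect to a server that registers the reflection service
  // (illustrative address; any reflection-enabled server works).
  std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials());

  // The reflection database answers DescriptorDatabase queries over the
  // ServerReflectionInfo stream; the pool pulls descriptors from it lazily.
  grpc::ProtoReflectionDescriptorDatabase desc_db(channel);
  google::protobuf::DescriptorPool desc_pool(&desc_db);

  // Resolve the fully qualified method name, as ProtoFileParser does.
  const google::protobuf::MethodDescriptor* method =
      desc_pool.FindMethodByName("grpc.testing.TestService.UnaryCall");
  if (method == nullptr) {
    std::cerr << "method not found via reflection" << std::endl;
    return 1;
  }

  // Build a request of the method's input type from text format and
  // serialize it, mirroring ProtoFileParser::GetSerializedProto.
  google::protobuf::DynamicMessageFactory factory(&desc_pool);
  std::unique_ptr<google::protobuf::Message> request(
      factory.GetPrototype(method->input_type())->New());
  if (!google::protobuf::TextFormat::ParseFromString("response_size: 10",
                                                     request.get())) {
    std::cerr << "failed to parse text format request" << std::endl;
    return 1;
  }
  std::string serialized;
  request->SerializeToString(&serialized);
  std::cout << "request for " << method->full_name() << " is "
            << serialized.size() << " bytes" << std::endl;
  return 0;
}

When the server does not expose reflection, the patch keeps the original DiskSourceTree/Importer path behind --protofiles, which is also why --proto_path is only meaningful when --protofiles is provided.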