diff options
author | 2017-10-26 11:27:03 -0700 | |
---|---|---|
committer | 2017-10-26 11:27:03 -0700 | |
commit | 52620b698e980ead64f542bbc68d50aae1f3ae6f (patch) | |
tree | d73fe3d666dcbb9289059a4e4b86ddf675f7d370 /test | |
parent | ee8eb727ea2533a5401a3ee56ec067aa083cf317 (diff) | |
parent | a2465b02f283425b6355707800100a7504a62ee2 (diff) |
Merge branch 'master' into census_update
Diffstat (limited to 'test')
-rw-r--r-- | test/core/security/BUILD | 12 | ||||
-rw-r--r-- | test/core/security/ssl_credentials_test.c | 66 | ||||
-rw-r--r-- | test/core/surface/completion_queue_test.c | 76 | ||||
-rw-r--r-- | test/core/util/mock_endpoint.c | 15 | ||||
-rw-r--r-- | test/core/util/passthru_endpoint.c | 15 | ||||
-rw-r--r-- | test/core/util/trickle_endpoint.c | 21 | ||||
-rw-r--r-- | test/cpp/end2end/async_end2end_test.cc | 111 | ||||
-rw-r--r-- | test/cpp/end2end/grpclb_end2end_test.cc | 10 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_chttp2_hpack.cc | 92 | ||||
-rw-r--r-- | test/cpp/microbenchmarks/bm_chttp2_transport.cc | 18 | ||||
-rw-r--r-- | test/cpp/qps/client.h | 47 | ||||
-rw-r--r-- | test/cpp/qps/client_async.cc | 48 | ||||
-rw-r--r-- | test/cpp/qps/client_sync.cc | 39 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 31 | ||||
-rw-r--r-- | test/cpp/util/error_details_test.cc | 20 |
15 files changed, 481 insertions, 140 deletions
diff --git a/test/core/security/BUILD b/test/core/security/BUILD index dc41759922..83b1747648 100644 --- a/test/core/security/BUILD +++ b/test/core/security/BUILD @@ -91,6 +91,18 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "ssl_credentials_test", + srcs = ["ssl_credentials_test.c"], + language = "C", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ] +) + grpc_cc_binary( name = "create_jwt", srcs = ["create_jwt.c"], diff --git a/test/core/security/ssl_credentials_test.c b/test/core/security/ssl_credentials_test.c new file mode 100644 index 0000000000..3c838faa60 --- /dev/null +++ b/test/core/security/ssl_credentials_test.c @@ -0,0 +1,66 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <stdio.h> +#include <string.h> + +#include <grpc/grpc_security.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/security/credentials/ssl/ssl_credentials.h" +#include "src/core/tsi/ssl_transport_security.h" +#include "test/core/util/test_config.h" + +static void test_convert_grpc_to_tsi_cert_pairs() { + grpc_ssl_pem_key_cert_pair grpc_pairs[] = {{"private_key1", "cert_chain1"}, + {"private_key2", "cert_chain2"}, + {"private_key3", "cert_chain3"}}; + const size_t num_pairs = 3; + + { + tsi_ssl_pem_key_cert_pair *tsi_pairs = + grpc_convert_grpc_to_tsi_cert_pairs(grpc_pairs, 0); + GPR_ASSERT(tsi_pairs == NULL); + } + + { + tsi_ssl_pem_key_cert_pair *tsi_pairs = + grpc_convert_grpc_to_tsi_cert_pairs(grpc_pairs, num_pairs); + + GPR_ASSERT(tsi_pairs != NULL); + for (size_t i = 0; i < num_pairs; i++) { + GPR_ASSERT(strncmp(grpc_pairs[i].private_key, tsi_pairs[i].private_key, + strlen(grpc_pairs[i].private_key)) == 0); + GPR_ASSERT(strncmp(grpc_pairs[i].cert_chain, tsi_pairs[i].cert_chain, + strlen(grpc_pairs[i].cert_chain)) == 0); + } + + grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_pairs, num_pairs); + } +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + grpc_init(); + + test_convert_grpc_to_tsi_cert_pairs(); + + grpc_shutdown(); + return 0; +} diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index e6372a379c..e4e4c9f1b2 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -158,6 +158,80 @@ static void test_cq_end_op(void) { } } +static void test_cq_tls_cache_full(void) { + grpc_event ev; + grpc_completion_queue *cc; + grpc_cq_completion completion; + grpc_cq_polling_type polling_types[] = { + GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; + grpc_completion_queue_attributes attr; + grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx; + void *tag = create_test_tag(); + void *res_tag; + int ok; + + LOG_TEST("test_cq_tls_cache_full"); + + attr.version = 1; + attr.cq_completion_type = GRPC_CQ_NEXT; + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + exec_ctx = init_exec_ctx; // Reset exec_ctx + attr.cq_polling_type = polling_types[i]; + cc = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&attr), &attr, NULL); + + grpc_completion_queue_thread_local_cache_init(cc); + GPR_ASSERT(grpc_cq_begin_op(cc, tag)); + grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE, + do_nothing_end_completion, NULL, &completion); + + ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); + + GPR_ASSERT( + grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1); + GPR_ASSERT(res_tag == tag); + GPR_ASSERT(ok); + + ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); + + shutdown_and_destroy(cc); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +static void test_cq_tls_cache_empty(void) { + grpc_completion_queue *cc; + grpc_cq_polling_type polling_types[] = { + GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; + grpc_completion_queue_attributes attr; + grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx; + void *res_tag; + int ok; + + LOG_TEST("test_cq_tls_cache_empty"); + + attr.version = 1; + attr.cq_completion_type = GRPC_CQ_NEXT; + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + exec_ctx = init_exec_ctx; // Reset exec_ctx + attr.cq_polling_type = polling_types[i]; + cc = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&attr), &attr, NULL); + + GPR_ASSERT( + grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0); + grpc_completion_queue_thread_local_cache_init(cc); + GPR_ASSERT( + grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0); + shutdown_and_destroy(cc); + grpc_exec_ctx_finish(&exec_ctx); + } +} + static void test_shutdown_then_next_polling(void) { grpc_cq_polling_type polling_types[] = { GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; @@ -300,6 +374,8 @@ int main(int argc, char **argv) { test_cq_end_op(); test_pluck(); test_pluck_after_shutdown(); + test_cq_tls_cache_full(); + test_cq_tls_cache_empty(); grpc_shutdown(); return 0; } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index bd386b2148..7cae5c045e 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -69,6 +69,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} +static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; @@ -103,8 +107,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) { static int me_get_fd(grpc_endpoint *ep) { return -1; } static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_resource_user, me_get_peer, + me_read, + me_write, + me_add_to_pollset, + me_add_to_pollset_set, + me_delete_from_pollset_set, + me_shutdown, + me_destroy, + me_get_resource_user, + me_get_peer, me_get_fd, }; diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 38a47584d5..1bf2888503 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -107,6 +107,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} +static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { half *m = (half *)ep; @@ -160,8 +164,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) { } static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_resource_user, me_get_peer, + me_read, + me_write, + me_add_to_pollset, + me_add_to_pollset_set, + me_delete_from_pollset_set, + me_shutdown, + me_destroy, + me_get_resource_user, + me_get_peer, me_get_fd, }; diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index fc066f9d80..d761f72297 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -89,6 +89,13 @@ static void te_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_endpoint_add_to_pollset_set(exec_ctx, te->wrapped, pollset_set); } +static void te_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset_set) { + trickle_endpoint *te = (trickle_endpoint *)ep; + grpc_endpoint_delete_from_pollset_set(exec_ctx, te->wrapped, pollset_set); +} + static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { trickle_endpoint *te = (trickle_endpoint *)ep; @@ -135,10 +142,16 @@ static void te_finish_write(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&te->mu); } -static const grpc_endpoint_vtable vtable = { - te_read, te_write, te_add_to_pollset, te_add_to_pollset_set, - te_shutdown, te_destroy, te_get_resource_user, te_get_peer, - te_get_fd}; +static const grpc_endpoint_vtable vtable = {te_read, + te_write, + te_add_to_pollset, + te_add_to_pollset_set, + te_delete_from_pollset_set, + te_shutdown, + te_destroy, + te_get_resource_user, + te_get_peer, + te_get_fd}; grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, double bytes_per_second) { diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 2a33e8ae11..b7634d0438 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -99,7 +99,7 @@ class PollingOverrider { class Verifier { public: - explicit Verifier(bool spin) : spin_(spin) {} + explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {} // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { return ExpectUnless(i, expect_ok, false); @@ -142,6 +142,18 @@ class Verifier { return detag(got_tag); } + template <typename T> + CompletionQueue::NextStatus DoOnceThenAsyncNext( + CompletionQueue* cq, void** got_tag, bool* ok, T deadline, + std::function<void(void)> lambda) { + if (lambda_run_) { + return cq->AsyncNext(got_tag, ok, deadline); + } else { + lambda_run_ = true; + return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline); + } + } + // Verify keeps calling Next until all currently set // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } @@ -154,6 +166,7 @@ class Verifier { Next(cq, ignore_ok); } } + // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { @@ -193,6 +206,47 @@ class Verifier { } } + // This version of Verify stops after a certain deadline, and uses the + // DoThenAsyncNext API + // to call the lambda + void Verify(CompletionQueue* cq, + std::chrono::system_clock::time_point deadline, + std::function<void(void)> lambda) { + if (expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + while (std::chrono::system_clock::now() < deadline) { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + while (!expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + GPR_ASSERT(std::chrono::system_clock::now() < deadline); + auto r = DoOnceThenAsyncNext( + cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::GOT_EVENT); + } + GotTag(got_tag, ok, false); + } + } + } + private: void GotTag(void* got_tag, bool ok, bool ignore_ok) { auto it = expectations_.find(got_tag); @@ -226,6 +280,7 @@ class Verifier { std::map<void*, bool> expectations_; std::map<void*, MaybeExpect> maybe_expectations_; bool spin_; + bool lambda_run_; }; bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) { @@ -490,6 +545,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { EXPECT_TRUE(recv_status.ok()); } +// Test a simple RPC using the async version of Next +TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + + std::chrono::system_clock::time_point time_now( + std::chrono::system_clock::now()); + std::chrono::system_clock::time_point time_limit( + std::chrono::system_clock::now() + std::chrono::seconds(10)); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + + auto resp_writer_ptr = &response_writer; + auto lambda_2 = [&, this, resp_writer_ptr]() { + gpr_log(GPR_ERROR, "CALLED"); + service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(), + cq_.get(), tag(2)); + }; + + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Verify(cq_.get(), time_limit, lambda_2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + auto recv_resp_ptr = &recv_response; + auto status_ptr = &recv_status; + send_response.set_message(recv_request.message()); + auto lambda_3 = [&, this, resp_writer_ptr, send_response]() { + resp_writer_ptr->Finish(send_response, Status::OK, tag(3)); + }; + response_reader->Finish(recv_resp_ptr, status_ptr, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get(), std::chrono::system_clock::time_point::max(), + lambda_3); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // Two pings and a final pong. TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ResetStub(); diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index f73a9c1791..c370302c49 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -332,8 +332,7 @@ class GrpclbEnd2endTest : public ::testing::Test { num_backends_(num_backends), num_balancers_(num_balancers), client_load_reporting_interval_seconds_( - client_load_reporting_interval_seconds), - kRequestMessage_("Live long and prosper.") {} + client_load_reporting_interval_seconds) {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -559,7 +558,6 @@ class GrpclbEnd2endTest : public ::testing::Test { std::unique_ptr<std::thread> thread_; }; - const grpc::string kMessage_ = "Live long and prosper."; const grpc::string server_host_; const size_t num_backends_; const size_t num_balancers_; @@ -571,7 +569,7 @@ class GrpclbEnd2endTest : public ::testing::Test { std::vector<ServerThread<BackendService>> backend_servers_; std::vector<ServerThread<BalancerService>> balancer_servers_; grpc_fake_resolver_response_generator* response_generator_; - const grpc::string kRequestMessage_; + const grpc::string kRequestMessage_ = "Live long and prosper."; }; class SingleBalancerTest : public GrpclbEnd2endTest { @@ -1086,7 +1084,7 @@ TEST_F(SingleBalancerTest, Drop) { } else { EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); + EXPECT_EQ(response.message(), kRequestMessage_); } } EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops); @@ -1210,7 +1208,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { } else { EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); + EXPECT_EQ(response.message(), kRequestMessage_); } } EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops); diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index f813bb7b64..bc2157b9f1 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -442,8 +442,7 @@ static void UnrefHeader(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_MDELEM_UNREF(exec_ctx, md); } -template <class Fixture, - void (*OnHeader)(grpc_exec_ctx *, void *, grpc_mdelem) = UnrefHeader> +template <class Fixture, void (*OnHeader)(grpc_exec_ctx *, void *, grpc_mdelem)> static void BM_HpackParserParseHeader(benchmark::State &state) { TrackCounters track_counters; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -794,34 +793,6 @@ static void OnHeaderNew(grpc_exec_ctx *exec_ctx, void *user_data, } } -// Current implementation. -static void OnHeaderOld(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_mdelem md) { - if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) { - grpc_millis *cached_timeout = - static_cast<grpc_millis *>(grpc_mdelem_get_user_data(md, free_timeout)); - grpc_millis timeout; - if (cached_timeout == NULL) { - /* not already parsed: parse it now, and store the result away */ - cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis)); - if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), cached_timeout)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); - gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val); - gpr_free(val); - *cached_timeout = GRPC_MILLIS_INF_FUTURE; - } - timeout = *cached_timeout; - grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); - } else { - timeout = *cached_timeout; - } - benchmark::DoNotOptimize(timeout); - GRPC_MDELEM_UNREF(exec_ctx, md); - } else { - GPR_ASSERT(0); - } -} - // Send the same deadline repeatedly class SameDeadline { public: @@ -836,34 +807,49 @@ class SameDeadline { } }; -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>, + UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeClientInitialMetadata); + RepresentativeClientInitialMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - MoreRepresentativeClientInitialMetadata); + MoreRepresentativeClientInitialMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeServerInitialMetadata); + RepresentativeServerInitialMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeServerTrailingMetadata); + RepresentativeServerTrailingMetadata, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderOld); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderNew); } // namespace hpack_parser_fixtures diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 3a484bb790..e9f537faa4 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -44,10 +44,16 @@ auto &force_library_initialization = Library::get(); class DummyEndpoint : public grpc_endpoint { public: DummyEndpoint() { - static const grpc_endpoint_vtable my_vtable = { - read, write, add_to_pollset, add_to_pollset_set, - shutdown, destroy, get_resource_user, get_peer, - get_fd}; + static const grpc_endpoint_vtable my_vtable = {read, + write, + add_to_pollset, + add_to_pollset_set, + delete_from_pollset_set, + shutdown, + destroy, + get_resource_user, + get_peer, + get_fd}; grpc_endpoint::vtable = &my_vtable; ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint"); } @@ -102,6 +108,10 @@ class DummyEndpoint : public grpc_endpoint { static void add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} + static void delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + static void shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { grpc_resource_user_shutdown(exec_ctx, diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 48c8995666..82c6361abd 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -230,8 +230,6 @@ class Client { } virtual void DestroyMultithreading() = 0; - virtual void InitThreadFunc(size_t thread_idx) = 0; - virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { // Set up the load distribution based on the number of threads @@ -279,7 +277,6 @@ class Client { : std::bind(&Client::NextIssueTime, this, thread_idx); } - private: class Thread { public: Thread(Client* client, size_t idx) @@ -299,6 +296,16 @@ class Client { MergeStatusHistogram(statuses_, s); } + void UpdateHistogram(HistogramEntry* entry) { + std::lock_guard<std::mutex> g(mu_); + if (entry->value_used()) { + histogram_.Add(entry->value()); + } + if (entry->status_used()) { + statuses_[entry->status()]++; + } + } + private: Thread(const Thread&); Thread& operator=(const Thread&); @@ -314,29 +321,8 @@ class Client { wait_loop++; } - client_->InitThreadFunc(idx_); - - for (;;) { - // run the loop body - HistogramEntry entry; - const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); - // lock, update histogram if needed and see if we're done - std::lock_guard<std::mutex> g(mu_); - if (entry.value_used()) { - histogram_.Add(entry.value()); - } - if (entry.status_used()) { - statuses_[entry.status()]++; - } - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } - if (!thread_still_ok || - static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) { - client_->CompleteThread(); - return; - } - } + client_->ThreadFunc(idx_, this); + client_->CompleteThread(); } std::mutex mu_; @@ -347,6 +333,12 @@ class Client { std::thread impl_; }; + bool ThreadCompleted() { + return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_)); + } + + virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0; + std::vector<std::unique_ptr<Thread>> threads_; std::unique_ptr<UsageTimer> timer_; @@ -433,9 +425,6 @@ class ClientImpl : public Client { !config.security_params().use_test_ca(), std::shared_ptr<CallCredentials>(), args); gpr_log(GPR_INFO, "Connecting to %s", target.c_str()); - GPR_ASSERT(channel_->WaitForConnected( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(300, GPR_TIMESPAN)))); is_inproc_ = false; } else { grpc::string tgt = target; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9ed4e0b355..b5c7208664 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -236,33 +236,47 @@ class AsyncClient : public ClientImpl<StubType, RequestType> { this->EndThreads(); // this needed for resolution } - void InitThreadFunc(size_t thread_idx) override final {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final { + void ThreadFunc(size_t thread_idx, Client::Thread* t) override final { void* got_tag; bool ok; - if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + HistogramEntry entry; + HistogramEntry* entry_ptr = &entry; + if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ClientRpcContext* ctx; + std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; + do { + t->UpdateHistogram(entry_ptr); // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + ctx = ClientRpcContext::detag(got_tag); // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + shutdown_mu->lock(); if (shutdown_state_[thread_idx]->shutdown) { ctx->TryCancel(); delete ctx; - return true; - } - if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - delete ctx; + while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + ctx = ClientRpcContext::detag(got_tag); + ctx->TryCancel(); + delete ctx; + } + shutdown_mu->unlock(); + return; } - return true; - } else { - // queue is shutting down, so we must be done - return true; - } + } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, entry_ptr, shutdown_mu]() { + bool next_ok = ok; + if (!ctx->RunNextState(next_ok, entry_ptr)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + delete ctx; + } + shutdown_mu->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 94554a46b2..9f20b148eb 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -62,6 +62,25 @@ class SynchronousClient virtual ~SynchronousClient(){}; + virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; + + void ThreadFunc(size_t thread_idx, Thread* t) override { + InitThreadFuncImpl(thread_idx); + for (;;) { + // run the loop body + HistogramEntry entry; + const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); + t->UpdateHistogram(&entry); + if (!thread_still_ok) { + gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); + } + if (!thread_still_ok || ThreadCompleted()) { + return; + } + } + } + protected: // WaitToIssue returns false if we realize that we need to break out bool WaitToIssue(int thread_idx) { @@ -103,9 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFunc(size_t thread_idx) override {} + void InitThreadFuncImpl(size_t thread_idx) override {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -192,13 +211,13 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -246,14 +265,14 @@ class SynchronousStreamingFromClientClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], &responses_[thread_idx]); last_issue_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // Figure out how to make histogram sensible if this is rate-paced if (!WaitToIssue(thread_idx)) { return true; @@ -282,13 +301,13 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromServer(&context_[thread_idx], request_); last_recv_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { double now = UsageTimer::Now(); @@ -328,11 +347,11 @@ class SynchronousStreamingBothWaysClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 776371a2c6..4576be5bb3 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -202,23 +202,32 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { - ServerRpcContext *ctx = detag(got_tag); + if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ServerRpcContext *ctx; + std::mutex *mu_ptr; + do { + ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex); + mu_ptr = &shutdown_state_[thread_idx]->mutex; + mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { + mu_ptr->unlock(); return; } - std::lock_guard<ServerRpcContext> l2(*ctx); - const bool still_going = ctx->RunNextState(ok); - // if this RPC context is done, refresh it - if (!still_going) { - ctx->Reset(); - } - } - return; + } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, mu_ptr]() { + ctx->lock(); + if (!ctx->RunNextState(ok)) { + ctx->Reset(); + } + ctx->unlock(); + mu_ptr->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } class ServerRpcContext { diff --git a/test/cpp/util/error_details_test.cc b/test/cpp/util/error_details_test.cc index 69a6876a3f..16a00fb201 100644 --- a/test/cpp/util/error_details_test.cc +++ b/test/cpp/util/error_details_test.cc @@ -82,7 +82,7 @@ TEST(SetTest, NullInput) { TEST(SetTest, OutOfScopeErrorCode) { google::rpc::Status expected; - expected.set_code(20); // Out of scope (DATA_LOSS is 15). + expected.set_code(17); // Out of scope (UNAUTHENTICATED is 16). expected.set_message("I am an error message"); testing::EchoRequest expected_details; expected_details.set_message(grpc::string(100, '\0')); @@ -96,6 +96,24 @@ TEST(SetTest, OutOfScopeErrorCode) { EXPECT_EQ(expected.SerializeAsString(), to.error_details()); } +TEST(SetTest, ValidScopeErrorCode) { + for (int c = StatusCode::OK; c <= StatusCode::UNAUTHENTICATED; c++) { + google::rpc::Status expected; + expected.set_code(c); + expected.set_message("I am an error message"); + testing::EchoRequest expected_details; + expected_details.set_message(grpc::string(100, '\0')); + expected.add_details()->PackFrom(expected_details); + + Status to; + Status s = SetErrorDetails(expected, &to); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(c, to.error_code()); + EXPECT_EQ(expected.message(), to.error_message()); + EXPECT_EQ(expected.SerializeAsString(), to.error_details()); + } +} + } // namespace } // namespace grpc |