author    Jim King <jsking@google.com>  2017-10-26 11:27:03 -0700
committer GitHub <noreply@github.com>  2017-10-26 11:27:03 -0700
commit    52620b698e980ead64f542bbc68d50aae1f3ae6f (patch)
tree      d73fe3d666dcbb9289059a4e4b86ddf675f7d370 /test
parent    ee8eb727ea2533a5401a3ee56ec067aa083cf317 (diff)
parent    a2465b02f283425b6355707800100a7504a62ee2 (diff)
Merge branch 'master' into census_update
Diffstat (limited to 'test')
-rw-r--r--  test/core/security/BUILD                         |  12
-rw-r--r--  test/core/security/ssl_credentials_test.c        |  66
-rw-r--r--  test/core/surface/completion_queue_test.c        |  76
-rw-r--r--  test/core/util/mock_endpoint.c                   |  15
-rw-r--r--  test/core/util/passthru_endpoint.c               |  15
-rw-r--r--  test/core/util/trickle_endpoint.c                |  21
-rw-r--r--  test/cpp/end2end/async_end2end_test.cc           | 111
-rw-r--r--  test/cpp/end2end/grpclb_end2end_test.cc          |  10
-rw-r--r--  test/cpp/microbenchmarks/bm_chttp2_hpack.cc      |  92
-rw-r--r--  test/cpp/microbenchmarks/bm_chttp2_transport.cc  |  18
-rw-r--r--  test/cpp/qps/client.h                            |  47
-rw-r--r--  test/cpp/qps/client_async.cc                     |  48
-rw-r--r--  test/cpp/qps/client_sync.cc                      |  39
-rw-r--r--  test/cpp/qps/server_async.cc                     |  31
-rw-r--r--  test/cpp/util/error_details_test.cc              |  20
15 files changed, 481 insertions(+), 140 deletions(-)
diff --git a/test/core/security/BUILD b/test/core/security/BUILD
index dc41759922..83b1747648 100644
--- a/test/core/security/BUILD
+++ b/test/core/security/BUILD
@@ -91,6 +91,18 @@ grpc_cc_test(
],
)
+grpc_cc_test(
+ name = "ssl_credentials_test",
+ srcs = ["ssl_credentials_test.c"],
+ language = "C",
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ]
+)
+
grpc_cc_binary(
name = "create_jwt",
srcs = ["create_jwt.c"],
diff --git a/test/core/security/ssl_credentials_test.c b/test/core/security/ssl_credentials_test.c
new file mode 100644
index 0000000000..3c838faa60
--- /dev/null
+++ b/test/core/security/ssl_credentials_test.c
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/security/credentials/ssl/ssl_credentials.h"
+#include "src/core/tsi/ssl_transport_security.h"
+#include "test/core/util/test_config.h"
+
+static void test_convert_grpc_to_tsi_cert_pairs() {
+ grpc_ssl_pem_key_cert_pair grpc_pairs[] = {{"private_key1", "cert_chain1"},
+ {"private_key2", "cert_chain2"},
+ {"private_key3", "cert_chain3"}};
+ const size_t num_pairs = 3;
+
+ {
+ tsi_ssl_pem_key_cert_pair *tsi_pairs =
+ grpc_convert_grpc_to_tsi_cert_pairs(grpc_pairs, 0);
+ GPR_ASSERT(tsi_pairs == NULL);
+ }
+
+ {
+ tsi_ssl_pem_key_cert_pair *tsi_pairs =
+ grpc_convert_grpc_to_tsi_cert_pairs(grpc_pairs, num_pairs);
+
+ GPR_ASSERT(tsi_pairs != NULL);
+ for (size_t i = 0; i < num_pairs; i++) {
+ GPR_ASSERT(strncmp(grpc_pairs[i].private_key, tsi_pairs[i].private_key,
+ strlen(grpc_pairs[i].private_key)) == 0);
+ GPR_ASSERT(strncmp(grpc_pairs[i].cert_chain, tsi_pairs[i].cert_chain,
+ strlen(grpc_pairs[i].cert_chain)) == 0);
+ }
+
+ grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_pairs, num_pairs);
+ }
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+
+ test_convert_grpc_to_tsi_cert_pairs();
+
+ grpc_shutdown();
+ return 0;
+}
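
Note: the helper exercised by this new test converts the public grpc_ssl_pem_key_cert_pair array into the TSI-level type, copying the strings. A minimal sketch of the call pattern, using only the two library functions seen above (the pair contents and function name convert_and_free_one_pair are illustrative):

    #include "src/core/lib/security/credentials/ssl/ssl_credentials.h"
    #include "src/core/tsi/ssl_transport_security.h"

    static void convert_and_free_one_pair(void) {
      /* illustrative key/cert contents; field order is {private_key,
         cert_chain} as in the test above */
      grpc_ssl_pem_key_cert_pair pair = {"-----BEGIN PRIVATE KEY-----...",
                                         "-----BEGIN CERTIFICATE-----..."};
      tsi_ssl_pem_key_cert_pair *tsi_pairs =
          grpc_convert_grpc_to_tsi_cert_pairs(&pair, 1);
      /* the TSI array owns copies of the strings (passing 0 pairs yields
         NULL), so it must be released with the matching destroy helper */
      grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_pairs, 1);
    }
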
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index e6372a379c..e4e4c9f1b2 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -158,6 +158,80 @@ static void test_cq_end_op(void) {
}
}
+static void test_cq_tls_cache_full(void) {
+ grpc_event ev;
+ grpc_completion_queue *cc;
+ grpc_cq_completion completion;
+ grpc_cq_polling_type polling_types[] = {
+ GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
+ grpc_completion_queue_attributes attr;
+ grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_exec_ctx exec_ctx;
+ void *tag = create_test_tag();
+ void *res_tag;
+ int ok;
+
+ LOG_TEST("test_cq_tls_cache_full");
+
+ attr.version = 1;
+ attr.cq_completion_type = GRPC_CQ_NEXT;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
+ exec_ctx = init_exec_ctx; // Reset exec_ctx
+ attr.cq_polling_type = polling_types[i];
+ cc = grpc_completion_queue_create(
+ grpc_completion_queue_factory_lookup(&attr), &attr, NULL);
+
+ grpc_completion_queue_thread_local_cache_init(cc);
+ GPR_ASSERT(grpc_cq_begin_op(cc, tag));
+ grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE,
+ do_nothing_end_completion, NULL, &completion);
+
+ ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
+ GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
+
+ GPR_ASSERT(
+ grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
+ GPR_ASSERT(res_tag == tag);
+ GPR_ASSERT(ok);
+
+ ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
+ GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
+
+ shutdown_and_destroy(cc);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+}
+
+static void test_cq_tls_cache_empty(void) {
+ grpc_completion_queue *cc;
+ grpc_cq_polling_type polling_types[] = {
+ GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
+ grpc_completion_queue_attributes attr;
+ grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_exec_ctx exec_ctx;
+ void *res_tag;
+ int ok;
+
+ LOG_TEST("test_cq_tls_cache_empty");
+
+ attr.version = 1;
+ attr.cq_completion_type = GRPC_CQ_NEXT;
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
+ exec_ctx = init_exec_ctx; // Reset exec_ctx
+ attr.cq_polling_type = polling_types[i];
+ cc = grpc_completion_queue_create(
+ grpc_completion_queue_factory_lookup(&attr), &attr, NULL);
+
+ GPR_ASSERT(
+ grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
+ grpc_completion_queue_thread_local_cache_init(cc);
+ GPR_ASSERT(
+ grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
+ shutdown_and_destroy(cc);
+ grpc_exec_ctx_finish(&exec_ctx);
+ }
+}
+
static void test_shutdown_then_next_polling(void) {
grpc_cq_polling_type polling_types[] = {
GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
@@ -300,6 +374,8 @@ int main(int argc, char **argv) {
test_cq_end_op();
test_pluck();
test_pluck_after_shutdown();
+ test_cq_tls_cache_full();
+ test_cq_tls_cache_empty();
grpc_shutdown();
return 0;
}
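
Note: the thread-local cache API tested above (and underpinning the C++ DoThenAsyncNext exercised later in this diff) follows an init / end_op / flush sequence. A rough sketch distilled from test_cq_tls_cache_full, assuming the same cc/tag/exec_ctx/completion setup as that test:

    /* completions for ops finished on this thread are now cached locally
       instead of being published to the queue */
    grpc_completion_queue_thread_local_cache_init(cc);
    GPR_ASSERT(grpc_cq_begin_op(cc, tag));
    grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE,
                   do_nothing_end_completion, NULL, &completion);
    /* grpc_completion_queue_next will NOT observe the cached completion;
       flush returns 1 and hands back the tag and success flag instead */
    void *res_tag;
    int ok;
    if (grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok)) {
      GPR_ASSERT(res_tag == tag && ok);
    }
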
diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c
index bd386b2148..7cae5c045e 100644
--- a/test/core/util/mock_endpoint.c
+++ b/test/core/util/mock_endpoint.c
@@ -69,6 +69,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
+static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *ep,
+ grpc_pollset_set *pollset) {}
+
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep;
@@ -103,8 +107,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) {
static int me_get_fd(grpc_endpoint *ep) { return -1; }
static const grpc_endpoint_vtable vtable = {
- me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
- me_shutdown, me_destroy, me_get_resource_user, me_get_peer,
+ me_read,
+ me_write,
+ me_add_to_pollset,
+ me_add_to_pollset_set,
+ me_delete_from_pollset_set,
+ me_shutdown,
+ me_destroy,
+ me_get_resource_user,
+ me_get_peer,
me_get_fd,
};
diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c
index 38a47584d5..1bf2888503 100644
--- a/test/core/util/passthru_endpoint.c
+++ b/test/core/util/passthru_endpoint.c
@@ -107,6 +107,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
+static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *ep,
+ grpc_pollset_set *pollset) {}
+
static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
half *m = (half *)ep;
@@ -160,8 +164,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) {
}
static const grpc_endpoint_vtable vtable = {
- me_read, me_write, me_add_to_pollset, me_add_to_pollset_set,
- me_shutdown, me_destroy, me_get_resource_user, me_get_peer,
+ me_read,
+ me_write,
+ me_add_to_pollset,
+ me_add_to_pollset_set,
+ me_delete_from_pollset_set,
+ me_shutdown,
+ me_destroy,
+ me_get_resource_user,
+ me_get_peer,
me_get_fd,
};
diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c
index fc066f9d80..d761f72297 100644
--- a/test/core/util/trickle_endpoint.c
+++ b/test/core/util/trickle_endpoint.c
@@ -89,6 +89,13 @@ static void te_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_endpoint_add_to_pollset_set(exec_ctx, te->wrapped, pollset_set);
}
+static void te_delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *ep,
+ grpc_pollset_set *pollset_set) {
+ trickle_endpoint *te = (trickle_endpoint *)ep;
+ grpc_endpoint_delete_from_pollset_set(exec_ctx, te->wrapped, pollset_set);
+}
+
static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
trickle_endpoint *te = (trickle_endpoint *)ep;
@@ -135,10 +142,16 @@ static void te_finish_write(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&te->mu);
}
-static const grpc_endpoint_vtable vtable = {
- te_read, te_write, te_add_to_pollset, te_add_to_pollset_set,
- te_shutdown, te_destroy, te_get_resource_user, te_get_peer,
- te_get_fd};
+static const grpc_endpoint_vtable vtable = {te_read,
+ te_write,
+ te_add_to_pollset,
+ te_add_to_pollset_set,
+ te_delete_from_pollset_set,
+ te_shutdown,
+ te_destroy,
+ te_get_resource_user,
+ te_get_peer,
+ te_get_fd};
grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap,
double bytes_per_second) {
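
Note: delete_from_pollset_set is the new grpc_endpoint_vtable slot that every endpoint above gains; wrapping endpoints forward it (as trickle_endpoint does), while standalone test endpoints stub it out. For a caller it pairs with the existing add operation when migrating an endpoint between pollset_sets, roughly as follows (pss_a and pss_b are illustrative names):

    grpc_endpoint_add_to_pollset_set(&exec_ctx, ep, pss_a);
    /* ... endpoint is polled as part of pss_a ... */
    grpc_endpoint_delete_from_pollset_set(&exec_ctx, ep, pss_a);
    grpc_endpoint_add_to_pollset_set(&exec_ctx, ep, pss_b);
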
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 2a33e8ae11..b7634d0438 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -99,7 +99,7 @@ class PollingOverrider {
class Verifier {
public:
- explicit Verifier(bool spin) : spin_(spin) {}
+ explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {}
// Expect sets the expected ok value for a specific tag
Verifier& Expect(int i, bool expect_ok) {
return ExpectUnless(i, expect_ok, false);
@@ -142,6 +142,18 @@ class Verifier {
return detag(got_tag);
}
+ template <typename T>
+ CompletionQueue::NextStatus DoOnceThenAsyncNext(
+ CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
+ std::function<void(void)> lambda) {
+ if (lambda_run_) {
+ return cq->AsyncNext(got_tag, ok, deadline);
+ } else {
+ lambda_run_ = true;
+ return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
+ }
+ }
+
// Verify keeps calling Next until all currently set
// expected tags are complete
void Verify(CompletionQueue* cq) { Verify(cq, false); }
@@ -154,6 +166,7 @@ class Verifier {
Next(cq, ignore_ok);
}
}
+
// This version of Verify stops after a certain deadline
void Verify(CompletionQueue* cq,
std::chrono::system_clock::time_point deadline) {
@@ -193,6 +206,47 @@ class Verifier {
}
}
+  // This version of Verify stops after a certain deadline, and uses
+  // the DoThenAsyncNext API to call the lambda
+ void Verify(CompletionQueue* cq,
+ std::chrono::system_clock::time_point deadline,
+ std::function<void(void)> lambda) {
+ if (expectations_.empty()) {
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ while (std::chrono::system_clock::now() < deadline) {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::TIMEOUT);
+ }
+ } else {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::TIMEOUT);
+ }
+ } else {
+ while (!expectations_.empty()) {
+ bool ok;
+ void* got_tag;
+ if (spin_) {
+ for (;;) {
+ GPR_ASSERT(std::chrono::system_clock::now() < deadline);
+ auto r = DoOnceThenAsyncNext(
+ cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda);
+ if (r == CompletionQueue::TIMEOUT) continue;
+ if (r == CompletionQueue::GOT_EVENT) break;
+ gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
+ abort();
+ }
+ } else {
+ EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
+ CompletionQueue::GOT_EVENT);
+ }
+ GotTag(got_tag, ok, false);
+ }
+ }
+ }
+
private:
void GotTag(void* got_tag, bool ok, bool ignore_ok) {
auto it = expectations_.find(got_tag);
@@ -226,6 +280,7 @@ class Verifier {
std::map<void*, bool> expectations_;
std::map<void*, MaybeExpect> maybe_expectations_;
bool spin_;
+ bool lambda_run_;
};
bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
@@ -490,6 +545,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
EXPECT_TRUE(recv_status.ok());
}
+// Test a simple RPC using the DoThenAsyncNext variant of Next
+TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message(GetParam().message_content);
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
+
+ std::chrono::system_clock::time_point time_now(
+ std::chrono::system_clock::now());
+ std::chrono::system_clock::time_point time_limit(
+ std::chrono::system_clock::now() + std::chrono::seconds(10));
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+ Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
+
+ auto resp_writer_ptr = &response_writer;
+ auto lambda_2 = [&, this, resp_writer_ptr]() {
+ gpr_log(GPR_ERROR, "CALLED");
+ service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
+ cq_.get(), tag(2));
+ };
+
+ Verifier(GetParam().disable_blocking)
+ .Expect(2, true)
+ .Verify(cq_.get(), time_limit, lambda_2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ auto recv_resp_ptr = &recv_response;
+ auto status_ptr = &recv_status;
+ send_response.set_message(recv_request.message());
+ auto lambda_3 = [&, this, resp_writer_ptr, send_response]() {
+ resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
+ };
+ response_reader->Finish(recv_resp_ptr, status_ptr, tag(4));
+ Verifier(GetParam().disable_blocking)
+ .Expect(3, true)
+ .Expect(4, true)
+ .Verify(cq_.get(), std::chrono::system_clock::time_point::max(),
+ lambda_3);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+}
+
// Two pings and a final pong.
TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();
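
Note: DoThenAsyncNext, driven here through Verifier::DoOnceThenAsyncNext, runs the supplied lambda once before polling, so completions the lambda produces on this thread can be returned by the same call (via the thread-local cache shown earlier in this diff). The call shape, condensed from the test above (deadline and lambda body are illustrative):

    void* got_tag;
    bool ok;
    auto deadline =
        std::chrono::system_clock::now() + std::chrono::seconds(10);
    CompletionQueue::NextStatus r = cq->DoThenAsyncNext(
        [&]() { /* e.g. service_->RequestEcho(..., tag(2)); */ },
        &got_tag, &ok, deadline);
    if (r == CompletionQueue::GOT_EVENT) {
      /* handle got_tag / ok exactly as with AsyncNext */
    }
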
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index f73a9c1791..c370302c49 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -332,8 +332,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
num_backends_(num_backends),
num_balancers_(num_balancers),
client_load_reporting_interval_seconds_(
- client_load_reporting_interval_seconds),
- kRequestMessage_("Live long and prosper.") {}
+ client_load_reporting_interval_seconds) {}
void SetUp() override {
response_generator_ = grpc_fake_resolver_response_generator_create();
@@ -559,7 +558,6 @@ class GrpclbEnd2endTest : public ::testing::Test {
std::unique_ptr<std::thread> thread_;
};
- const grpc::string kMessage_ = "Live long and prosper.";
const grpc::string server_host_;
const size_t num_backends_;
const size_t num_balancers_;
@@ -571,7 +569,7 @@ class GrpclbEnd2endTest : public ::testing::Test {
std::vector<ServerThread<BackendService>> backend_servers_;
std::vector<ServerThread<BalancerService>> balancer_servers_;
grpc_fake_resolver_response_generator* response_generator_;
- const grpc::string kRequestMessage_;
+ const grpc::string kRequestMessage_ = "Live long and prosper.";
};
class SingleBalancerTest : public GrpclbEnd2endTest {
@@ -1086,7 +1084,7 @@ TEST_F(SingleBalancerTest, Drop) {
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
+ EXPECT_EQ(response.message(), kRequestMessage_);
}
}
EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
@@ -1210,7 +1208,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) {
} else {
EXPECT_TRUE(status.ok()) << "code=" << status.error_code()
<< " message=" << status.error_message();
- EXPECT_EQ(response.message(), kMessage_);
+ EXPECT_EQ(response.message(), kRequestMessage_);
}
}
EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops);
diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc
index f813bb7b64..bc2157b9f1 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc
@@ -442,8 +442,7 @@ static void UnrefHeader(grpc_exec_ctx *exec_ctx, void *user_data,
GRPC_MDELEM_UNREF(exec_ctx, md);
}
-template <class Fixture,
- void (*OnHeader)(grpc_exec_ctx *, void *, grpc_mdelem) = UnrefHeader>
+template <class Fixture, void (*OnHeader)(grpc_exec_ctx *, void *, grpc_mdelem)>
static void BM_HpackParserParseHeader(benchmark::State &state) {
TrackCounters track_counters;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -794,34 +793,6 @@ static void OnHeaderNew(grpc_exec_ctx *exec_ctx, void *user_data,
}
}
-// Current implementation.
-static void OnHeaderOld(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_mdelem md) {
- if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) {
- grpc_millis *cached_timeout =
- static_cast<grpc_millis *>(grpc_mdelem_get_user_data(md, free_timeout));
- grpc_millis timeout;
- if (cached_timeout == NULL) {
- /* not already parsed: parse it now, and store the result away */
- cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis));
- if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), cached_timeout)) {
- char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
- gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val);
- gpr_free(val);
- *cached_timeout = GRPC_MILLIS_INF_FUTURE;
- }
- timeout = *cached_timeout;
- grpc_mdelem_set_user_data(md, free_timeout, cached_timeout);
- } else {
- timeout = *cached_timeout;
- }
- benchmark::DoNotOptimize(timeout);
- GRPC_MDELEM_UNREF(exec_ctx, md);
- } else {
- GPR_ASSERT(0);
- }
-}
-
// Send the same deadline repeatedly
class SameDeadline {
public:
@@ -836,34 +807,49 @@ class SameDeadline {
}
};
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch, UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem, UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>,
+ UnrefHeader);
+BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>,
+ UnrefHeader);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
- RepresentativeClientInitialMetadata);
+ RepresentativeClientInitialMetadata, UnrefHeader);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
- MoreRepresentativeClientInitialMetadata);
+ MoreRepresentativeClientInitialMetadata, UnrefHeader);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
- RepresentativeServerInitialMetadata);
+ RepresentativeServerInitialMetadata, UnrefHeader);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader,
- RepresentativeServerTrailingMetadata);
+ RepresentativeServerTrailingMetadata, UnrefHeader);
-BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderOld);
BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderNew);
} // namespace hpack_parser_fixtures
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 3a484bb790..e9f537faa4 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -44,10 +44,16 @@ auto &force_library_initialization = Library::get();
class DummyEndpoint : public grpc_endpoint {
public:
DummyEndpoint() {
- static const grpc_endpoint_vtable my_vtable = {
- read, write, add_to_pollset, add_to_pollset_set,
- shutdown, destroy, get_resource_user, get_peer,
- get_fd};
+ static const grpc_endpoint_vtable my_vtable = {read,
+ write,
+ add_to_pollset,
+ add_to_pollset_set,
+ delete_from_pollset_set,
+ shutdown,
+ destroy,
+ get_resource_user,
+ get_peer,
+ get_fd};
grpc_endpoint::vtable = &my_vtable;
ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint");
}
@@ -102,6 +108,10 @@ class DummyEndpoint : public grpc_endpoint {
static void add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset) {}
+ static void delete_from_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_endpoint *ep,
+ grpc_pollset_set *pollset) {}
+
static void shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_resource_user_shutdown(exec_ctx,
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 48c8995666..82c6361abd 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -230,8 +230,6 @@ class Client {
}
virtual void DestroyMultithreading() = 0;
- virtual void InitThreadFunc(size_t thread_idx) = 0;
- virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0;
void SetupLoadTest(const ClientConfig& config, size_t num_threads) {
// Set up the load distribution based on the number of threads
@@ -279,7 +277,6 @@ class Client {
: std::bind(&Client::NextIssueTime, this, thread_idx);
}
- private:
class Thread {
public:
Thread(Client* client, size_t idx)
@@ -299,6 +296,16 @@ class Client {
MergeStatusHistogram(statuses_, s);
}
+ void UpdateHistogram(HistogramEntry* entry) {
+ std::lock_guard<std::mutex> g(mu_);
+ if (entry->value_used()) {
+ histogram_.Add(entry->value());
+ }
+ if (entry->status_used()) {
+ statuses_[entry->status()]++;
+ }
+ }
+
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
@@ -314,29 +321,8 @@ class Client {
wait_loop++;
}
- client_->InitThreadFunc(idx_);
-
- for (;;) {
- // run the loop body
- HistogramEntry entry;
- const bool thread_still_ok = client_->ThreadFunc(&entry, idx_);
- // lock, update histogram if needed and see if we're done
- std::lock_guard<std::mutex> g(mu_);
- if (entry.value_used()) {
- histogram_.Add(entry.value());
- }
- if (entry.status_used()) {
- statuses_[entry.status()]++;
- }
- if (!thread_still_ok) {
- gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
- }
- if (!thread_still_ok ||
- static_cast<bool>(gpr_atm_acq_load(&client_->thread_pool_done_))) {
- client_->CompleteThread();
- return;
- }
- }
+ client_->ThreadFunc(idx_, this);
+ client_->CompleteThread();
}
std::mutex mu_;
@@ -347,6 +333,12 @@ class Client {
std::thread impl_;
};
+ bool ThreadCompleted() {
+ return static_cast<bool>(gpr_atm_acq_load(&thread_pool_done_));
+ }
+
+ virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0;
+
std::vector<std::unique_ptr<Thread>> threads_;
std::unique_ptr<UsageTimer> timer_;
@@ -433,9 +425,6 @@ class ClientImpl : public Client {
!config.security_params().use_test_ca(),
std::shared_ptr<CallCredentials>(), args);
gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
- GPR_ASSERT(channel_->WaitForConnected(
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(300, GPR_TIMESPAN))));
is_inproc_ = false;
} else {
grpc::string tgt = target;
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index 9ed4e0b355..b5c7208664 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -236,33 +236,47 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
this->EndThreads(); // this needed for resolution
}
- void InitThreadFunc(size_t thread_idx) override final {}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final {
+ void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
void* got_tag;
bool ok;
- if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ HistogramEntry entry;
+ HistogramEntry* entry_ptr = &entry;
+ if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ return;
+ }
+ ClientRpcContext* ctx;
+ std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
+ do {
+ t->UpdateHistogram(entry_ptr);
// Got a regular event, so process it
- ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
+ ctx = ClientRpcContext::detag(got_tag);
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ shutdown_mu->lock();
if (shutdown_state_[thread_idx]->shutdown) {
ctx->TryCancel();
delete ctx;
- return true;
- }
- if (!ctx->RunNextState(ok, entry)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
- delete ctx;
+ while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ ctx = ClientRpcContext::detag(got_tag);
+ ctx->TryCancel();
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ return;
}
- return true;
- } else {
- // queue is shutting down, so we must be done
- return true;
- }
+ } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, entry_ptr, shutdown_mu]() {
+ bool next_ok = ok;
+ if (!ctx->RunNextState(next_ok, entry_ptr)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc
index 94554a46b2..9f20b148eb 100644
--- a/test/cpp/qps/client_sync.cc
+++ b/test/cpp/qps/client_sync.cc
@@ -62,6 +62,25 @@ class SynchronousClient
virtual ~SynchronousClient(){};
+ virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
+ virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
+
+ void ThreadFunc(size_t thread_idx, Thread* t) override {
+ InitThreadFuncImpl(thread_idx);
+ for (;;) {
+ // run the loop body
+ HistogramEntry entry;
+ const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
+ t->UpdateHistogram(&entry);
+ if (!thread_still_ok) {
+ gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
+ }
+ if (!thread_still_ok || ThreadCompleted()) {
+ return;
+ }
+ }
+ }
+
protected:
// WaitToIssue returns false if we realize that we need to break out
bool WaitToIssue(int thread_idx) {
@@ -103,9 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient {
}
~SynchronousUnaryClient() {}
- void InitThreadFunc(size_t thread_idx) override {}
+ void InitThreadFuncImpl(size_t thread_idx) override {}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
}
@@ -192,13 +211,13 @@ class SynchronousStreamingPingPongClient final
}
}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
messages_issued_[thread_idx] = 0;
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
if (!WaitToIssue(thread_idx)) {
return true;
}
@@ -246,14 +265,14 @@ class SynchronousStreamingFromClientClient final
}
}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
&responses_[thread_idx]);
last_issue_[thread_idx] = UsageTimer::Now();
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// Figure out how to make histogram sensible if this is rate-paced
if (!WaitToIssue(thread_idx)) {
return true;
@@ -282,13 +301,13 @@ class SynchronousStreamingFromServerClient final
public:
SynchronousStreamingFromServerClient(const ClientConfig& config)
: SynchronousStreamingClient(config), last_recv_(num_threads_) {}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] =
stub->StreamingFromServer(&context_[thread_idx], request_);
last_recv_[thread_idx] = UsageTimer::Now();
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
double now = UsageTimer::Now();
@@ -328,11 +347,11 @@ class SynchronousStreamingBothWaysClient final
}
}
- void InitThreadFunc(size_t thread_idx) override {
+ void InitThreadFuncImpl(size_t thread_idx) override {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
}
- bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override {
+ bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
// TODO (vjpai): Do this
return true;
}
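
Note: with the run loop hoisted into SynchronousClient::ThreadFunc above (driven by Client::Thread), a new synchronous workload only overrides the two Impl hooks. A schematic sketch under that interface (MySyncClient is hypothetical; constructor and RPC body elided, so this is not compilable in isolation):

    class MySyncClient final : public SynchronousClient {
      void InitThreadFuncImpl(size_t thread_idx) override {
        /* per-thread setup: stubs, contexts, streams */
      }
      bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
        if (!WaitToIssue(thread_idx)) return true;  /* load pacing */
        /* issue one RPC and record its latency into *entry; returning
           false makes the driver log an RPC error and end the thread */
        return true;
      }
    };
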
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 776371a2c6..4576be5bb3 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -202,23 +202,32 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
- while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
- ServerRpcContext *ctx = detag(got_tag);
+ if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
+ return;
+ }
+ ServerRpcContext *ctx;
+ std::mutex *mu_ptr;
+ do {
+ ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- std::lock_guard<std::mutex> l(shutdown_state_[thread_idx]->mutex);
+ mu_ptr = &shutdown_state_[thread_idx]->mutex;
+ mu_ptr->lock();
if (shutdown_state_[thread_idx]->shutdown) {
+ mu_ptr->unlock();
return;
}
- std::lock_guard<ServerRpcContext> l2(*ctx);
- const bool still_going = ctx->RunNextState(ok);
- // if this RPC context is done, refresh it
- if (!still_going) {
- ctx->Reset();
- }
- }
- return;
+ } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, mu_ptr]() {
+ ctx->lock();
+ if (!ctx->RunNextState(ok)) {
+ ctx->Reset();
+ }
+ ctx->unlock();
+ mu_ptr->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
}
class ServerRpcContext {
diff --git a/test/cpp/util/error_details_test.cc b/test/cpp/util/error_details_test.cc
index 69a6876a3f..16a00fb201 100644
--- a/test/cpp/util/error_details_test.cc
+++ b/test/cpp/util/error_details_test.cc
@@ -82,7 +82,7 @@ TEST(SetTest, NullInput) {
TEST(SetTest, OutOfScopeErrorCode) {
google::rpc::Status expected;
- expected.set_code(20); // Out of scope (DATA_LOSS is 15).
+ expected.set_code(17); // Out of scope (UNAUTHENTICATED is 16).
expected.set_message("I am an error message");
testing::EchoRequest expected_details;
expected_details.set_message(grpc::string(100, '\0'));
@@ -96,6 +96,24 @@ TEST(SetTest, OutOfScopeErrorCode) {
EXPECT_EQ(expected.SerializeAsString(), to.error_details());
}
+TEST(SetTest, ValidScopeErrorCode) {
+ for (int c = StatusCode::OK; c <= StatusCode::UNAUTHENTICATED; c++) {
+ google::rpc::Status expected;
+ expected.set_code(c);
+ expected.set_message("I am an error message");
+ testing::EchoRequest expected_details;
+ expected_details.set_message(grpc::string(100, '\0'));
+ expected.add_details()->PackFrom(expected_details);
+
+ Status to;
+ Status s = SetErrorDetails(expected, &to);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(c, to.error_code());
+ EXPECT_EQ(expected.message(), to.error_message());
+ EXPECT_EQ(expected.SerializeAsString(), to.error_details());
+ }
+}
+
} // namespace
} // namespace grpc
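
Note: for context on both error_details tests, SetErrorDetails (declared in grpc++ support/error_details.h in this tree) copies the code and message from the rich google::rpc::Status and serializes the whole proto into error_details(); per the OutOfScopeErrorCode test it appears to pass out-of-range codes through unchecked. A sketch of plain usage (message contents illustrative):

    google::rpc::Status rich;
    rich.set_code(grpc::StatusCode::NOT_FOUND);
    rich.set_message("no such widget");
    grpc::Status to;
    /* copies code/message into 'to' and the serialized proto into
       to.error_details() */
    grpc::Status s = grpc::SetErrorDetails(rich, &to);
    if (s.ok()) { /* 'to' now carries the rich status */ }
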