diff options
author | Craig Tiller <ctiller@google.com> | 2017-07-12 15:44:10 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-07-12 15:44:10 -0700 |
commit | 1cb30c2764485864d78b226a5fc8a6f1edcb3614 (patch) | |
tree | 198b90a32d0ff905ec0728e9d4a167fbd3e4054a /test | |
parent | 8551e4d6876de66f01f83558433e463f859eef74 (diff) | |
parent | f2e15655bc6eaa4bbe66234946056856ede090da (diff) |
Merge github.com:grpc/grpc into min_stack
Diffstat (limited to 'test')
23 files changed, 389 insertions, 138 deletions
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 9454aba136..c3964ca84b 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -63,16 +63,9 @@ static void server_setup_transport(void *ts, grpc_transport *transport) { grpc_exec_ctx_finish(&exec_ctx); } -typedef struct { - grpc_bad_client_client_stream_validator validator; - grpc_slice_buffer incoming; - gpr_event read_done; -} read_args; - static void read_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - read_args *a = arg; - a->validator(&a->incoming); - gpr_event_set(&a->read_done, (void *)1); + gpr_event *read_done = arg; + gpr_event_set(read_done, (void *)1); } void grpc_run_bad_client_test( @@ -159,24 +152,31 @@ void grpc_run_bad_client_test( if (sfd.client != NULL) { // Validate client stream, if requested. if (client_validator != NULL) { - read_args args; - args.validator = client_validator; - grpc_slice_buffer_init(&args.incoming); - gpr_event_init(&args.read_done); - grpc_closure read_done_closure; - GRPC_CLOSURE_INIT(&read_done_closure, read_done, &args, - grpc_schedule_on_exec_ctx); - grpc_endpoint_read(&exec_ctx, sfd.client, &args.incoming, - &read_done_closure); - grpc_exec_ctx_finish(&exec_ctx); gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5); - while (!gpr_event_get(&args.read_done)) { - GPR_ASSERT(gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0); - GPR_ASSERT(grpc_completion_queue_next( - a.cq, grpc_timeout_milliseconds_to_deadline(100), NULL) - .type == GRPC_QUEUE_TIMEOUT); + grpc_slice_buffer incoming; + grpc_slice_buffer_init(&incoming); + // We may need to do multiple reads to read the complete server response. + while (true) { + gpr_event read_done_event; + gpr_event_init(&read_done_event); + grpc_closure read_done_closure; + GRPC_CLOSURE_INIT(&read_done_closure, read_done, &read_done_event, + grpc_schedule_on_exec_ctx); + grpc_endpoint_read(&exec_ctx, sfd.client, &incoming, + &read_done_closure); + grpc_exec_ctx_finish(&exec_ctx); + do { + GPR_ASSERT(gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0); + GPR_ASSERT(grpc_completion_queue_next( + a.cq, grpc_timeout_milliseconds_to_deadline(100), NULL) + .type == GRPC_QUEUE_TIMEOUT); + } while (!gpr_event_get(&read_done_event)); + if (client_validator(&incoming)) break; + gpr_log(GPR_INFO, + "client validator failed; trying additional read " + "in case we didn't get all the data"); } - grpc_slice_buffer_destroy_internal(&exec_ctx, &args.incoming); + grpc_slice_buffer_destroy_internal(&exec_ctx, &incoming); } // Shutdown. grpc_endpoint_shutdown( diff --git a/test/core/bad_client/bad_client.h b/test/core/bad_client/bad_client.h index 2bb150d8c2..22f1a3abc7 100644 --- a/test/core/bad_client/bad_client.h +++ b/test/core/bad_client/bad_client.h @@ -20,6 +20,9 @@ #define GRPC_TEST_CORE_BAD_CLIENT_BAD_CLIENT_H #include <grpc/grpc.h> + +#include <stdbool.h> + #include "test/core/util/test_config.h" #define GRPC_BAD_CLIENT_REGISTERED_METHOD "/registered/bar" @@ -29,7 +32,8 @@ typedef void (*grpc_bad_client_server_side_validator)(grpc_server *server, grpc_completion_queue *cq, void *registered_method); -typedef void (*grpc_bad_client_client_stream_validator)( +// Returns false if we need to read more data. +typedef bool (*grpc_bad_client_client_stream_validator)( grpc_slice_buffer *incoming); #define GRPC_BAD_CLIENT_DISCONNECT 1 diff --git a/test/core/bad_client/tests/large_metadata.c b/test/core/bad_client/tests/large_metadata.c index b9a2d97c6f..ca3d234be9 100644 --- a/test/core/bad_client/tests/large_metadata.c +++ b/test/core/bad_client/tests/large_metadata.c @@ -30,6 +30,7 @@ // actually appended to this in a single string, since the string would // be longer than the C99 string literal limit. Instead, we dynamically // construct it by adding the large headers one at a time. + #define PFX_TOO_MUCH_METADATA_FROM_CLIENT_PREFIX_STR \ "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" /* settings frame */ \ "\x00\x00\x00\x04\x00\x00\x00\x00\x00" /* headers: generated from \ @@ -63,15 +64,15 @@ // The number of headers we're adding and the total size of the client // payload. -#define NUM_HEADERS 95 +#define NUM_HEADERS 46 #define PFX_TOO_MUCH_METADATA_FROM_CLIENT_PAYLOAD_SIZE \ ((sizeof(PFX_TOO_MUCH_METADATA_FROM_CLIENT_PREFIX_STR) - 1) + \ (NUM_HEADERS * PFX_TOO_MUCH_METADATA_FROM_CLIENT_HEADER_SIZE) + 1) #define PFX_TOO_MUCH_METADATA_FROM_SERVER_STR \ "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" /* settings frame: sets \ - MAX_HEADER_LIST_SIZE to 16K */ \ - "\x00\x00\x06\x04\x00\x00\x00\x00\x00\x00\x06\x00\x00\x40\x00" /* headers: \ + MAX_HEADER_LIST_SIZE to 8K */ \ + "\x00\x00\x06\x04\x00\x00\x00\x00\x00\x00\x06\x00\x00\x20\x00" /* headers: \ generated \ from \ simple_request.headers \ @@ -141,7 +142,7 @@ static void server_verifier_sends_too_much_metadata(grpc_server *server, GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.host, "localhost")); GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo/bar")); - const size_t metadata_value_size = 16 * 1024; + const size_t metadata_value_size = 8 * 1024; grpc_metadata meta; meta.key = grpc_slice_from_static_string("key"); meta.value = grpc_slice_malloc(metadata_value_size); @@ -166,34 +167,41 @@ static void server_verifier_sends_too_much_metadata(grpc_server *server, cq_verifier_destroy(cqv); } -static void client_validator(grpc_slice_buffer *incoming) { +static bool client_validator(grpc_slice_buffer *incoming) { + for (size_t i = 0; i < incoming->count; ++i) { + const char *s = (const char *)GRPC_SLICE_START_PTR(incoming->slices[i]); + char *hex = gpr_dump(s, GRPC_SLICE_LENGTH(incoming->slices[i]), + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_INFO, "RESPONSE SLICE %" PRIdPTR ": %s", i, hex); + gpr_free(hex); + } + // Get last frame from incoming slice buffer. grpc_slice_buffer last_frame_buffer; grpc_slice_buffer_init(&last_frame_buffer); grpc_slice_buffer_trim_end(incoming, 13, &last_frame_buffer); GPR_ASSERT(last_frame_buffer.count == 1); grpc_slice last_frame = last_frame_buffer.slices[0]; + const uint8_t *p = GRPC_SLICE_START_PTR(last_frame); - // Length = 4 - GPR_ASSERT(*p++ == 0); - GPR_ASSERT(*p++ == 0); - GPR_ASSERT(*p++ == 4); - // Frame type (RST_STREAM) - GPR_ASSERT(*p++ == 3); - // Flags - GPR_ASSERT(*p++ == 0); - // Stream ID. - GPR_ASSERT(*p++ == 0); - GPR_ASSERT(*p++ == 0); - GPR_ASSERT(*p++ == 0); - GPR_ASSERT(*p++ == 1); - // Payload (error code) - GPR_ASSERT(*p++ == 0); - GPR_ASSERT(*p++ == 0); - GPR_ASSERT(*p++ == 0); - GPR_ASSERT(*p == 0 || *p == 11); + bool success = + // Length == 4 + *p++ != 0 || *p++ != 0 || *p++ != 4 || + // Frame type (RST_STREAM) + *p++ != 3 || + // Flags + *p++ != 0 || + // Stream ID. + *p++ != 0 || *p++ != 0 || *p++ != 0 || *p++ != 1 || + // Payload (error code) + *p++ == 0 || *p++ == 0 || *p++ == 0 || *p == 0 || *p == 11; + + if (!success) { + gpr_log(GPR_INFO, "client expected RST_STREAM frame, not found"); + } grpc_slice_buffer_destroy(&last_frame_buffer); + return success; } int main(int argc, char **argv) { diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index fd8af2f152..7ecd947e1f 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -38,8 +38,10 @@ static grpc_pollset *g_pollset; static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( size_t slice_size, grpc_slice *leftover_slices, size_t leftover_nslices) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - tsi_frame_protector *fake_read_protector = tsi_create_fake_protector(NULL); - tsi_frame_protector *fake_write_protector = tsi_create_fake_protector(NULL); + tsi_frame_protector *fake_read_protector = + tsi_create_fake_frame_protector(NULL); + tsi_frame_protector *fake_write_protector = + tsi_create_fake_frame_protector(NULL); grpc_endpoint_test_fixture f; grpc_endpoint_pair tcp; diff --git a/test/core/support/BUILD b/test/core/support/BUILD index 298eebd9b8..37870d922d 100644 --- a/test/core/support/BUILD +++ b/test/core/support/BUILD @@ -127,6 +127,16 @@ grpc_cc_test( ) grpc_cc_test( + name = "stack_lockfree_test", + srcs = ["stack_lockfree_test.c"], + language = "C", + deps = [ + "//:gpr", + "//test/core/util:gpr_test_util", + ], +) + +grpc_cc_test( name = "string_test", srcs = ["string_test.c"], language = "C", diff --git a/test/core/support/stack_lockfree_test.c b/test/core/support/stack_lockfree_test.c new file mode 100644 index 0000000000..4b1f60ce01 --- /dev/null +++ b/test/core/support/stack_lockfree_test.c @@ -0,0 +1,140 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/support/stack_lockfree.h" + +#include <stdlib.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include "test/core/util/test_config.h" + +/* max stack size supported */ +#define MAX_STACK_SIZE 65534 + +#define MAX_THREADS 32 + +static void test_serial_sized(size_t size) { + gpr_stack_lockfree *stack = gpr_stack_lockfree_create(size); + size_t i; + size_t j; + + /* First try popping empty */ + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1); + + /* Now add one item and check it */ + gpr_stack_lockfree_push(stack, 3); + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == 3); + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1); + + /* Now add repeatedly more items and check them */ + for (i = 1; i < size; i *= 2) { + for (j = 0; j <= i; j++) { + GPR_ASSERT(gpr_stack_lockfree_push(stack, (int)j) == (j == 0)); + } + for (j = 0; j <= i; j++) { + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == (int)(i - j)); + } + GPR_ASSERT(gpr_stack_lockfree_pop(stack) == -1); + } + + gpr_stack_lockfree_destroy(stack); +} + +static void test_serial() { + size_t i; + for (i = 128; i < MAX_STACK_SIZE; i *= 2) { + test_serial_sized(i); + } + test_serial_sized(MAX_STACK_SIZE); +} + +struct test_arg { + gpr_stack_lockfree *stack; + int stack_size; + int nthreads; + int rank; + int sum; +}; + +static void test_mt_body(void *v) { + struct test_arg *arg = (struct test_arg *)v; + int lo, hi; + int i; + int res; + lo = arg->rank * arg->stack_size / arg->nthreads; + hi = (arg->rank + 1) * arg->stack_size / arg->nthreads; + for (i = lo; i < hi; i++) { + gpr_stack_lockfree_push(arg->stack, i); + if ((res = gpr_stack_lockfree_pop(arg->stack)) != -1) { + arg->sum += res; + } + } + while ((res = gpr_stack_lockfree_pop(arg->stack)) != -1) { + arg->sum += res; + } +} + +static void test_mt_sized(size_t size, int nth) { + gpr_stack_lockfree *stack; + struct test_arg args[MAX_THREADS]; + gpr_thd_id thds[MAX_THREADS]; + int sum; + int i; + gpr_thd_options options = gpr_thd_options_default(); + + stack = gpr_stack_lockfree_create(size); + for (i = 0; i < nth; i++) { + args[i].stack = stack; + args[i].stack_size = (int)size; + args[i].nthreads = nth; + args[i].rank = i; + args[i].sum = 0; + } + gpr_thd_options_set_joinable(&options); + for (i = 0; i < nth; i++) { + GPR_ASSERT(gpr_thd_new(&thds[i], test_mt_body, &args[i], &options)); + } + sum = 0; + for (i = 0; i < nth; i++) { + gpr_thd_join(thds[i]); + sum = sum + args[i].sum; + } + GPR_ASSERT((unsigned)sum == ((unsigned)size * (size - 1)) / 2); + gpr_stack_lockfree_destroy(stack); +} + +static void test_mt() { + size_t size; + int nth; + for (nth = 1; nth < MAX_THREADS; nth++) { + for (size = 128; size < MAX_STACK_SIZE; size *= 2) { + test_mt_sized(size, nth); + } + test_mt_sized(MAX_STACK_SIZE, nth); + } +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + test_serial(); + test_mt(); + return 0; +} diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index c27337aaa8..f9d88d6327 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -93,7 +93,7 @@ static void test_pollset_conversion(void) { attr.cq_polling_type = polling_types[j]; cq = grpc_completion_queue_create( grpc_completion_queue_factory_lookup(&attr), &attr, NULL); - GPR_ASSERT(grpc_cq_from_pollset(grpc_cq_pollset(cq)) == cq); + GPR_ASSERT(grpc_cq_pollset(cq) != NULL); shutdown_and_destroy(cq); } } diff --git a/test/core/transport/chttp2/hpack_parser_corpus/clusterfuzz-testcase-minimized-4857057310146560 b/test/core/transport/chttp2/hpack_parser_corpus/clusterfuzz-testcase-minimized-4857057310146560 new file mode 100644 index 0000000000..1692480d07 --- /dev/null +++ b/test/core/transport/chttp2/hpack_parser_corpus/clusterfuzz-testcase-minimized-4857057310146560 @@ -0,0 +1 @@ +D:path
\ No newline at end of file diff --git a/test/cpp/codegen/codegen_test_full.cc b/test/cpp/codegen/codegen_test_full.cc index 2eacc99d82..98792bde04 100644 --- a/test/cpp/codegen/codegen_test_full.cc +++ b/test/cpp/codegen/codegen_test_full.cc @@ -27,8 +27,8 @@ class CodegenTestFull : public ::testing::Test {}; TEST_F(CodegenTestFull, Init) { grpc::CompletionQueue cq; - void* tag; - bool ok; + void* tag = nullptr; + bool ok = false; cq.AsyncNext(&tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)); ASSERT_FALSE(ok); } diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index b43c27f3f7..f8c768831e 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -39,7 +39,6 @@ namespace grpc { class CompletionQueue; class Channel; -class RpcService; class ServerCompletionQueue; class ServerContext; } // namespace grpc @@ -137,10 +136,10 @@ class ServiceA final { ::grpc::ClientAsyncReader< ::grpc::testing::Response>* AsyncMethodA3Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq, void* tag) override; ::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override; ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override; - const ::grpc::RpcMethod rpcmethod_MethodA1_; - const ::grpc::RpcMethod rpcmethod_MethodA2_; - const ::grpc::RpcMethod rpcmethod_MethodA3_; - const ::grpc::RpcMethod rpcmethod_MethodA4_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA2_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA3_; + const ::grpc::internal::RpcMethod rpcmethod_MethodA4_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -320,7 +319,7 @@ class ServiceA final { public: WithStreamedUnaryMethod_MethodA1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodA1() override { BaseClassMustBeDerivedFromService(this); @@ -341,7 +340,7 @@ class ServiceA final { public: WithSplitStreamingMethod_MethodA3() { ::grpc::Service::MarkMethodStreamed(2, - new ::grpc::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2))); } ~WithSplitStreamingMethod_MethodA3() override { BaseClassMustBeDerivedFromService(this); @@ -387,7 +386,7 @@ class ServiceB final { private: std::shared_ptr< ::grpc::ChannelInterface> channel_; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; - const ::grpc::RpcMethod rpcmethod_MethodB1_; + const ::grpc::internal::RpcMethod rpcmethod_MethodB1_; }; static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions()); @@ -444,7 +443,7 @@ class ServiceB final { public: WithStreamedUnaryMethod_MethodB1() { ::grpc::Service::MarkMethodStreamed(0, - new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); + new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodB1() override { BaseClassMustBeDerivedFromService(this); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index a2a6e36709..7b78071217 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -1752,7 +1752,9 @@ std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking, messages.push_back(big_msg); } - for (auto health_check_service : {false, true}) { + // TODO (sreek) Renable tests with health check service after the issue + // https://github.com/grpc/grpc/issues/11223 is resolved + for (auto health_check_service : {false}) { for (auto cred = credentials_types.begin(); cred != credentials_types.end(); ++cred) { for (auto msg = messages.begin(); msg != messages.end(); msg++) { diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc index f71e557450..6d3f5a9d46 100644 --- a/test/cpp/end2end/client_lb_end2end_test.cc +++ b/test/cpp/end2end/client_lb_end2end_test.cc @@ -21,8 +21,6 @@ #include <mutex> #include <thread> -#include <gtest/gtest.h> - #include <grpc++/channel.h> #include <grpc++/client_context.h> #include <grpc++/create_channel.h> @@ -44,6 +42,8 @@ extern "C" { #include "test/core/util/test_config.h" #include "test/cpp/end2end/test_service_impl.h" +#include <gtest/gtest.h> + using grpc::testing::EchoRequest; using grpc::testing::EchoResponse; using std::chrono::system_clock; @@ -463,6 +463,11 @@ TEST_F(ClientLbEnd2endTest, RoundRobinManyUpdates) { EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); } +TEST_F(ClientLbEnd2endTest, RoundRobinConcurrentUpdates) { + // TODO(dgq): replicate the way internal testing exercises the concurrent + // update provisions of RR. +} + TEST_F(ClientLbEnd2endTest, RoundRobinReconnect) { // Start servers and send one RPC per server. const int kNumServers = 1; diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index da1c9b1f15..d72dda3f59 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -437,7 +437,7 @@ class End2endServerTryCancelTest : public End2endTest { auto stream = stub_->ResponseStream(&context, request); int num_msgs_read = 0; - while (num_msgs_read < kNumResponseStreamsMsgs) { + while (num_msgs_read < kServerDefaultResponseStreamsToSend) { if (!stream->Read(&response)) { break; } @@ -463,14 +463,14 @@ class End2endServerTryCancelTest : public End2endTest { case CANCEL_DURING_PROCESSING: // Server cancelled while writing messages. Client must have read less // than or equal to the expected number of messages - EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); + EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend); break; case CANCEL_AFTER_PROCESSING: // Even though the Server cancelled after writing all messages, the RPC // may be cancelled before the Client got a chance to read all the // messages. - EXPECT_LE(num_msgs_read, kNumResponseStreamsMsgs); + EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend); break; default: { @@ -743,12 +743,10 @@ TEST_P(End2endTest, ResponseStream) { request.set_message("hello"); auto stream = stub_->ResponseStream(&context, request); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "0"); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "1"); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "2"); + for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + grpc::to_string(i)); + } EXPECT_FALSE(stream->Read(&response)); Status s = stream->Finish(); @@ -764,12 +762,33 @@ TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { context.AddMetadata(kServerUseCoalescingApi, "1"); auto stream = stub_->ResponseStream(&context, request); + for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + grpc::to_string(i)); + } + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + +// This was added to prevent regression from issue: +// https://github.com/grpc/grpc/issues/11546 +TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.AddMetadata(kServerUseCoalescingApi, "1"); + // We will only send one message, forcing everything (init metadata, message, + // trailing) to be coalesced together. + context.AddMetadata(kServerResponseStreamsToSend, "1"); + + auto stream = stub_->ResponseStream(&context, request); EXPECT_TRUE(stream->Read(&response)); EXPECT_EQ(response.message(), request.message() + "0"); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "1"); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "2"); + EXPECT_FALSE(stream->Read(&response)); Status s = stream->Finish(); @@ -785,20 +804,12 @@ TEST_P(End2endTest, BidiStream) { auto stream = stub_->BidiStream(&context); - request.set_message(msg + "0"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - - request.set_message(msg + "1"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - - request.set_message(msg + "2"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); + for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { + request.set_message(msg + grpc::to_string(i)); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + } stream->WritesDone(); EXPECT_FALSE(stream->Read(&response)); @@ -841,6 +852,31 @@ TEST_P(End2endTest, BidiStreamWithCoalescingApi) { EXPECT_TRUE(s.ok()); } +// This was added to prevent regression from issue: +// https://github.com/grpc/grpc/issues/11546 +TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.AddMetadata(kServerFinishAfterNReads, "1"); + context.set_initial_metadata_corked(true); + grpc::string msg("hello"); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + stream->WriteLast(request, WriteOptions()); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + // Talk to the two services with the same name but different package names. // The two stubs are created on the same channel. TEST_P(End2endTest, DiffPackageServices) { diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 8a31ab77b2..cb515533ed 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -521,7 +521,7 @@ class SplitResponseStreamDupPkg stream->NextMessageSize(&next_msg_sz); gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); GPR_ASSERT(stream->Read(&req)); - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { resp.set_message(req.message() + grpc::to_string(i) + "_dup"); GPR_ASSERT(stream->Write(resp)); } @@ -561,7 +561,7 @@ class FullySplitStreamedDupPkg stream->NextMessageSize(&next_msg_sz); gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); GPR_ASSERT(stream->Read(&req)); - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { resp.set_message(req.message() + grpc::to_string(i) + "_dup"); GPR_ASSERT(stream->Write(resp)); } @@ -613,7 +613,7 @@ class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService { stream->NextMessageSize(&next_msg_sz); gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); GPR_ASSERT(stream->Read(&req)); - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { resp.set_message(req.message() + grpc::to_string(i) + "_dup"); GPR_ASSERT(stream->Write(resp)); } diff --git a/test/cpp/end2end/round_robin_end2end_test.cc b/test/cpp/end2end/round_robin_end2end_test.cc index 0cb727f0ed..eee32ce85d 100644 --- a/test/cpp/end2end/round_robin_end2end_test.cc +++ b/test/cpp/end2end/round_robin_end2end_test.cc @@ -73,9 +73,12 @@ class RoundRobinEnd2endTest : public ::testing::Test { protected: RoundRobinEnd2endTest() : server_host_("localhost") {} - void StartServers(int num_servers) { - for (int i = 0; i < num_servers; ++i) { - servers_.emplace_back(new ServerData(server_host_)); + void StartServers(size_t num_servers, + std::vector<int> ports = std::vector<int>()) { + for (size_t i = 0; i < num_servers; ++i) { + int port = 0; + if (ports.size() == num_servers) port = ports[i]; + servers_.emplace_back(new ServerData(server_host_, port)); } } @@ -99,15 +102,19 @@ class RoundRobinEnd2endTest : public ::testing::Test { stub_ = grpc::testing::EchoTestService::NewStub(channel_); } - void SendRpc(int num_rpcs) { + void SendRpc(int num_rpcs, bool expect_ok = true) { EchoRequest request; EchoResponse response; request.set_message("Live long and prosper."); for (int i = 0; i < num_rpcs; i++) { ClientContext context; Status status = stub_->Echo(&context, request, &response); - EXPECT_TRUE(status.ok()); - EXPECT_EQ(response.message(), request.message()); + if (expect_ok) { + EXPECT_TRUE(status.ok()); + EXPECT_EQ(response.message(), request.message()); + } else { + EXPECT_FALSE(status.ok()); + } } } @@ -116,8 +123,8 @@ class RoundRobinEnd2endTest : public ::testing::Test { std::unique_ptr<Server> server_; MyTestServiceImpl service_; - explicit ServerData(const grpc::string& server_host) { - port_ = grpc_pick_unused_port_or_die(); + explicit ServerData(const grpc::string& server_host, int port = 0) { + port_ = port > 0 ? port : grpc_pick_unused_port_or_die(); gpr_log(GPR_INFO, "starting server on port %d", port_); std::ostringstream server_address; server_address << server_host << ":" << port_; @@ -176,6 +183,38 @@ TEST_F(RoundRobinEnd2endTest, RoundRobin) { EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); } +TEST_F(RoundRobinEnd2endTest, RoundRobinReconnect) { + // Start servers and send one RPC per server. + const int kNumServers = 1; + std::vector<int> ports; + ports.push_back(grpc_pick_unused_port_or_die()); + StartServers(kNumServers, ports); + ResetStub(true /* round_robin */); + // Send one RPC per backend and make sure they are used in order. + // Note: This relies on the fact that the subchannels are reported in + // state READY in the order in which the addresses are specified, + // which is only true because the backends are all local. + for (size_t i = 0; i < servers_.size(); ++i) { + SendRpc(1); + EXPECT_EQ(1, servers_[i]->service_.request_count()) << "for backend #" << i; + } + // Check LB policy name for the channel. + EXPECT_EQ("round_robin", channel_->GetLoadBalancingPolicyName()); + + // Kill all servers + for (size_t i = 0; i < servers_.size(); ++i) { + servers_[i]->Shutdown(); + } + // Client request should fail. + SendRpc(1, false); + + // Bring servers back up on the same port (we aren't recreating the channel). + StartServers(kNumServers, ports); + + // Client request should succeed. + SendRpc(1); +} + } // namespace } // namespace testing } // namespace grpc diff --git a/test/cpp/end2end/test_service_impl.cc b/test/cpp/end2end/test_service_impl.cc index e1260277b4..4fa98c24f5 100644 --- a/test/cpp/end2end/test_service_impl.cc +++ b/test/cpp/end2end/test_service_impl.cc @@ -239,6 +239,10 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, int server_coalescing_api = GetIntValueFromMetadata( kServerUseCoalescingApi, context->client_metadata(), 0); + int server_responses_to_send = GetIntValueFromMetadata( + kServerResponseStreamsToSend, context->client_metadata(), + kServerDefaultResponseStreamsToSend); + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { ServerTryCancel(context); return Status::CANCELLED; @@ -251,9 +255,9 @@ Status TestServiceImpl::ResponseStream(ServerContext* context, new std::thread(&TestServiceImpl::ServerTryCancel, this, context); } - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < server_responses_to_send; i++) { response.set_message(request->message() + grpc::to_string(i)); - if (i == kNumResponseStreamsMsgs - 1 && server_coalescing_api != 0) { + if (i == server_responses_to_send - 1 && server_coalescing_api != 0) { writer->WriteLast(response, WriteOptions()); } else { writer->Write(response); diff --git a/test/cpp/end2end/test_service_impl.h b/test/cpp/end2end/test_service_impl.h index 52f1b991c7..e485769bb2 100644 --- a/test/cpp/end2end/test_service_impl.h +++ b/test/cpp/end2end/test_service_impl.h @@ -29,7 +29,8 @@ namespace grpc { namespace testing { -const int kNumResponseStreamsMsgs = 3; +const int kServerDefaultResponseStreamsToSend = 3; +const char* const kServerResponseStreamsToSend = "server_responses_to_send"; const char* const kServerCancelAfterReads = "cancel_after_reads"; const char* const kServerTryCancelRequest = "server_try_cancel"; const char* const kDebugInfoTrailerKey = "debug-info-bin"; diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index 18308a2e16..6bb6ad8018 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -69,7 +69,7 @@ BENCHMARK(BM_CreateDestroyCore); static void DoneWithCompletionOnStack(grpc_exec_ctx* exec_ctx, void* arg, grpc_cq_completion* completion) {} -class DummyTag final : public CompletionQueueTag { +class DummyTag final : public internal::CompletionQueueTag { public: bool FinalizeResult(void** tag, bool* status) override { return true; } }; diff --git a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc index 6100906a9a..aeec7d831b 100644 --- a/test/cpp/microbenchmarks/bm_fullstack_trickle.cc +++ b/test/cpp/microbenchmarks/bm_fullstack_trickle.cc @@ -315,7 +315,7 @@ BENCHMARK(BM_PumpStreamServerToClient_Trickle)->Apply(StreamingTrickleArgs); static void BM_PumpUnbalancedUnary_Trickle(benchmark::State& state) { EchoTestService::AsyncService service; std::unique_ptr<TrickledCHTTP2> fixture(new TrickledCHTTP2( - &service, true, state.range(0) /* req_size */, + &service, false, state.range(0) /* req_size */, state.range(1) /* resp_size */, state.range(2) /* bw in kbit/s */)); EchoRequest send_request; EchoResponse send_response; diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h index 21dffc67be..4b699e0708 100644 --- a/test/cpp/qps/server.h +++ b/test/cpp/qps/server.h @@ -100,33 +100,36 @@ class Server { } } - static void ApplyServerConfig(const ServerConfig& config, - ServerBuilder* builder) { + virtual int GetPollCount() { + // For sync server. + return 0; + } + + protected: + static void ApplyConfigToBuilder(const ServerConfig& config, + ServerBuilder* builder) { if (config.resource_quota_size() > 0) { - builder->SetResourceQuota( - ResourceQuota("QpsServerTest").Resize(config.resource_quota_size())); + builder->SetResourceQuota(ResourceQuota("AsyncQpsServerTest") + .Resize(config.resource_quota_size())); } - - for (auto arg : config.channel_args()) { - switch (arg.value_case()) { + for (const auto& channel_arg : config.channel_args()) { + switch (channel_arg.value_case()) { case ChannelArg::kStrValue: - builder->AddChannelArgument(arg.name(), arg.str_value()); + builder->AddChannelArgument(channel_arg.name(), + channel_arg.str_value()); break; case ChannelArg::kIntValue: - builder->AddChannelArgument(arg.name(), arg.int_value()); + builder->AddChannelArgument(channel_arg.name(), + channel_arg.int_value()); + break; + case ChannelArg::VALUE_NOT_SET: + gpr_log(GPR_ERROR, "Channel arg '%s' does not have a value", + channel_arg.name().c_str()); break; - default: - gpr_log(GPR_ERROR, "Channel arg '%s' ignored due to unknown type", - arg.name().c_str()); } } } - virtual int GetPollCount() { - // For sync server. - return 0; - } - private: int port_; int cores_; diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 3115b5caa7..122976c397 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -99,7 +99,7 @@ class AsyncQpsServerTest final : public grpc::testing::Server { cq_.emplace_back(i % srv_cqs_.size()); } - ApplyServerConfig(config, &builder); + ApplyConfigToBuilder(config, &builder); server_ = builder.BuildAndStart(); diff --git a/test/cpp/qps/server_sync.cc b/test/cpp/qps/server_sync.cc index 09cf9ca299..9954e2c0bf 100644 --- a/test/cpp/qps/server_sync.cc +++ b/test/cpp/qps/server_sync.cc @@ -19,15 +19,12 @@ #include <atomic> #include <thread> -#include <grpc++/resource_quota.h> #include <grpc++/security/server_credentials.h> #include <grpc++/server.h> -#include <grpc++/server_builder.h> #include <grpc++/server_context.h> #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> -#include <grpc/support/log.h> #include "src/proto/grpc/testing/services.grpc.pb.h" #include "test/cpp/qps/server.h" @@ -115,8 +112,8 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service { } private: - static Status ClientPull(ServerContext* context, - ReaderInterface<SimpleRequest>* stream, + template <class R> + static Status ClientPull(ServerContext* context, R* stream, SimpleResponse* response) { SimpleRequest request; while (stream->Read(&request)) { @@ -129,8 +126,8 @@ class BenchmarkServiceImpl final : public BenchmarkService::Service { } return Status::OK; } - static Status ServerPush(ServerContext* context, - WriterInterface<SimpleResponse>* stream, + template <class W> + static Status ServerPush(ServerContext* context, W* stream, const SimpleResponse& response, std::function<bool()> done) { while ((done == nullptr) || !done()) { @@ -166,7 +163,7 @@ class SynchronousServer final : public grpc::testing::Server { Server::CreateServerCredentials(config)); gpr_free(server_address); - ApplyServerConfig(config, &builder); + ApplyConfigToBuilder(config, &builder); builder.RegisterService(&service_); diff --git a/test/cpp/util/grpc_tool_test.cc b/test/cpp/util/grpc_tool_test.cc index 4069c7a91a..dd00581f2b 100644 --- a/test/cpp/util/grpc_tool_test.cc +++ b/test/cpp/util/grpc_tool_test.cc @@ -87,7 +87,7 @@ DECLARE_bool(l); namespace { -const int kNumResponseStreamsMsgs = 3; +const int kServerDefaultResponseStreamsToSend = 3; class TestCliCredentials final : public grpc::testing::CliCredentials { public: @@ -159,7 +159,7 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service { context->AddTrailingMetadata("trailing_key", "trailing_value"); EchoResponse response; - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { response.set_message(request->message() + grpc::to_string(i)); writer->Write(response); } @@ -463,7 +463,7 @@ TEST_F(GrpcToolTest, CallCommandResponseStream) { std::placeholders::_1))); // Expected output: "message: \"Hello{n}\"" - for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + for (int i = 0; i < kServerDefaultResponseStreamsToSend; i++) { grpc::string expected_response_text = "message: \"Hello" + grpc::to_string(i) + "\"\n"; EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(), |