diff options
Diffstat (limited to 'test')
20 files changed, 575 insertions, 496 deletions
diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index 18d6bcec06..17f37d6719 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -166,7 +166,8 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(2))); + f.server_cq, + tag(2))); cq_expect_completion(v_server, tag(2), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c index 123c8bc415..51486cc169 100644 --- a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c +++ b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c @@ -79,7 +79,7 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown(f->server); + /* don't shutdown, just destroy, to tickle this code edge */ grpc_server_destroy(f->server); f->server = NULL; } diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c index 940e327d22..a71e3a7736 100644 --- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c @@ -175,7 +175,8 @@ static void test_request_response_with_metadata_and_payload( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c index 80cb629542..f7394a25b0 100644 --- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c @@ -168,7 +168,8 @@ static void test_request_response_with_metadata_and_payload( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c index b07f51da85..be4beb537a 100644 --- a/test/core/end2end/tests/request_response_with_payload.c +++ b/test/core/end2end/tests/request_response_with_payload.c @@ -162,7 +162,8 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c index e547604619..637a9ffe1c 100644 --- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c @@ -169,7 +169,8 @@ static void test_request_response_with_metadata_and_payload( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c index eb6180c399..fff83dcbf0 100644 --- a/test/core/end2end/tests/request_with_large_metadata.c +++ b/test/core/end2end/tests/request_with_large_metadata.c @@ -166,7 +166,8 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index 2bf0fa3717..2b520046bb 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -157,7 +157,8 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index 80763fe6cd..ac248f9be0 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -144,7 +144,8 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s, &call_details, &request_metadata_recv, - f->server_cq, tag(101))); + f->server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 968be74cfb..ab03479586 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -150,7 +150,8 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index e906f302cf..ae6994ef07 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -66,7 +66,7 @@ static void test_no_op(void) { static void test_no_op_with_start(void) { grpc_tcp_server *s = grpc_tcp_server_create(); LOG_TEST(); - grpc_tcp_server_start(s, NULL, on_connect, NULL); + grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); grpc_tcp_server_destroy(s); } @@ -93,7 +93,7 @@ static void test_no_op_with_port_and_start(void) { GPR_ASSERT( grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr))); - grpc_tcp_server_start(s, NULL, on_connect, NULL); + grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); grpc_tcp_server_destroy(s); } @@ -120,7 +120,7 @@ static void test_connect(int n) { GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0); GPR_ASSERT(addr_len <= sizeof(addr)); - grpc_tcp_server_start(s, NULL, on_connect, NULL); + grpc_tcp_server_start(s, NULL, 0, on_connect, NULL); for (i = 0; i < n; i++) { deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(10000000)); diff --git a/test/core/statistics/census_log_tests.c b/test/core/statistics/census_log_tests.c index c7b2b2e46d..e2ad78a6f2 100644 --- a/test/core/statistics/census_log_tests.c +++ b/test/core/statistics/census_log_tests.c @@ -35,7 +35,7 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#include "src/core/support/cpu.h" +#include <grpc/support/cpu.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/sync.h> diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc new file mode 100644 index 0000000000..7e827cb0e5 --- /dev/null +++ b/test/cpp/end2end/async_end2end_test.cc @@ -0,0 +1,528 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <chrono> +#include <memory> + +#include "test/core/util/test_config.h" +#include "test/cpp/util/echo_duplicate.pb.h" +#include "test/cpp/util/echo.pb.h" +#include "src/cpp/util/time.h" +#include <grpc++/channel_arguments.h> +#include <grpc++/channel_interface.h> +#include <grpc++/client_context.h> +#include <grpc++/create_channel.h> +#include <grpc++/credentials.h> +#include <grpc++/server.h> +#include <grpc++/server_builder.h> +#include <grpc++/server_context.h> +#include <grpc++/status.h> +#include <grpc++/stream.h> +#include "test/core/util/port.h" +#include <gtest/gtest.h> + +#include <grpc/grpc.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> + +using grpc::cpp::test::util::EchoRequest; +using grpc::cpp::test::util::EchoResponse; +using std::chrono::system_clock; + +namespace grpc { +namespace testing { + +namespace { + +void* tag(int i) { + return (void*)(gpr_intptr)i; +} + +void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { + bool ok; + void* got_tag; + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + EXPECT_EQ(expect_ok, ok); + EXPECT_EQ(tag(i), got_tag); +} + +class AsyncEnd2endTest : public ::testing::Test { + protected: + AsyncEnd2endTest() : service_(&srv_cq_) {} + + void SetUp() override { + int port = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port; + // Setup server + ServerBuilder builder; + builder.AddPort(server_address_.str()); + builder.RegisterAsyncService(&service_); + server_ = builder.BuildAndStart(); + } + + void TearDown() override { server_->Shutdown(); } + + void ResetStub() { + std::shared_ptr<ChannelInterface> channel = + CreateChannel(server_address_.str(), ChannelArguments()); + stub_.reset(grpc::cpp::test::util::TestService::NewStub(channel)); + } + + void server_ok(int i) { + verify_ok(&srv_cq_, i, true); + } + void client_ok(int i) { + verify_ok(&cli_cq_, i , true); + } + void server_fail(int i) { + verify_ok(&srv_cq_, i, false); + } + void client_fail(int i) { + verify_ok(&cli_cq_, i, false); + } + + void SendRpc(int num_rpcs) { + for (int i = 0; i < num_rpcs; i++) { + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("Hello"); + stub_->Echo( + &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1)); + + service_.RequestEcho( + &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + + server_ok(2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + response_writer.Finish(send_response, Status::OK, tag(3)); + + server_ok(3); + + client_ok(1); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.IsOk()); + } + } + + CompletionQueue cli_cq_; + CompletionQueue srv_cq_; + std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; + std::unique_ptr<Server> server_; + grpc::cpp::test::util::TestService::AsyncService service_; + std::ostringstream server_address_; +}; + +TEST_F(AsyncEnd2endTest, SimpleRpc) { + ResetStub(); + SendRpc(1); +} + +TEST_F(AsyncEnd2endTest, SequentialRpcs) { + ResetStub(); + SendRpc(10); +} + +// Two pings and a final pong. +TEST_F(AsyncEnd2endTest, SimpleClientStreaming) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message("Hello"); + std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream( + stub_->RequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1))); + + service_.RequestRequestStream( + &srv_ctx, &srv_stream, &srv_cq_, tag(2)); + + server_ok(2); + client_ok(1); + + cli_stream->Write(send_request, tag(3)); + client_ok(3); + + srv_stream.Read(&recv_request, tag(4)); + server_ok(4); + EXPECT_EQ(send_request.message(), recv_request.message()); + + cli_stream->Write(send_request, tag(5)); + client_ok(5); + + srv_stream.Read(&recv_request, tag(6)); + server_ok(6); + + EXPECT_EQ(send_request.message(), recv_request.message()); + cli_stream->WritesDone(tag(7)); + client_ok(7); + + srv_stream.Read(&recv_request, tag(8)); + server_fail(8); + + send_response.set_message(recv_request.message()); + srv_stream.Finish(send_response, Status::OK, tag(9)); + server_ok(9); + + cli_stream->Finish(&recv_status, tag(10)); + client_ok(10); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.IsOk()); +} + +// One ping, two pongs. +TEST_F(AsyncEnd2endTest, SimpleServerStreaming) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx); + + send_request.set_message("Hello"); + std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream( + stub_->ResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1))); + + service_.RequestResponseStream( + &srv_ctx, &recv_request, &srv_stream, &srv_cq_, tag(2)); + + server_ok(2); + client_ok(1); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(3)); + server_ok(3); + + cli_stream->Read(&recv_response, tag(4)); + client_ok(4); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.Write(send_response, tag(5)); + server_ok(5); + + cli_stream->Read(&recv_response, tag(6)); + client_ok(6); + EXPECT_EQ(send_response.message(), recv_response.message()); + + srv_stream.Finish(Status::OK, tag(7)); + server_ok(7); + + cli_stream->Read(&recv_response, tag(8)); + client_fail(8); + + cli_stream->Finish(&recv_status, tag(9)); + client_ok(9); + + EXPECT_TRUE(recv_status.IsOk()); +} + +// One ping, one pong. +TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + ServerContext srv_ctx; + ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); + + send_request.set_message("Hello"); + std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> > + cli_stream(stub_->BidiStream(&cli_ctx, &cli_cq_, tag(1))); + + service_.RequestBidiStream( + &srv_ctx, &srv_stream, &srv_cq_, tag(2)); + + server_ok(2); + client_ok(1); + + cli_stream->Write(send_request, tag(3)); + client_ok(3); + + srv_stream.Read(&recv_request, tag(4)); + server_ok(4); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + srv_stream.Write(send_response, tag(5)); + server_ok(5); + + cli_stream->Read(&recv_response, tag(6)); + client_ok(6); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->WritesDone(tag(7)); + client_ok(7); + + srv_stream.Read(&recv_request, tag(8)); + server_fail(8); + + srv_stream.Finish(Status::OK, tag(9)); + server_ok(9); + + cli_stream->Finish(&recv_status, tag(10)); + client_ok(10); + + EXPECT_TRUE(recv_status.IsOk()); +} + +// Metadata tests +TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("Hello"); + std::pair<grpc::string, grpc::string> meta1("key1", "val1"); + std::pair<grpc::string, grpc::string> meta2("key2", "val2"); + cli_ctx.AddMetadata(meta1.first, meta1.second); + cli_ctx.AddMetadata(meta2.first, meta2.second); + + stub_->Echo( + &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1)); + + service_.RequestEcho( + &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + server_ok(2); + EXPECT_EQ(send_request.message(), recv_request.message()); + auto client_initial_metadata = srv_ctx.client_metadata(); + EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); + EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); + EXPECT_EQ(2, client_initial_metadata.size()); + + send_response.set_message(recv_request.message()); + response_writer.Finish(send_response, Status::OK, tag(3)); + + server_ok(3); + + client_ok(1); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.IsOk()); +} + +TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("Hello"); + std::pair<grpc::string, grpc::string> meta1("key1", "val1"); + std::pair<grpc::string, grpc::string> meta2("key2", "val2"); + + stub_->Echo( + &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1)); + + service_.RequestEcho( + &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + server_ok(2); + EXPECT_EQ(send_request.message(), recv_request.message()); + srv_ctx.AddInitialMetadata(meta1.first, meta1.second); + srv_ctx.AddInitialMetadata(meta2.first, meta2.second); + response_writer.SendInitialMetadata(tag(3)); + server_ok(3); + + send_response.set_message(recv_request.message()); + response_writer.Finish(send_response, Status::OK, tag(4)); + + server_ok(4); + + client_ok(1); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.IsOk()); + auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); + EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second); + EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second); + EXPECT_EQ(2, server_initial_metadata.size()); +} + +TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("Hello"); + std::pair<grpc::string, grpc::string> meta1("key1", "val1"); + std::pair<grpc::string, grpc::string> meta2("key2", "val2"); + + stub_->Echo( + &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1)); + + service_.RequestEcho( + &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + server_ok(2); + EXPECT_EQ(send_request.message(), recv_request.message()); + response_writer.SendInitialMetadata(tag(3)); + server_ok(3); + + send_response.set_message(recv_request.message()); + srv_ctx.AddTrailingMetadata(meta1.first, meta1.second); + srv_ctx.AddTrailingMetadata(meta2.first, meta2.second); + response_writer.Finish(send_response, Status::OK, tag(4)); + + server_ok(4); + + client_ok(1); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.IsOk()); + auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); + EXPECT_EQ(meta1.second, server_trailing_metadata.find(meta1.first)->second); + EXPECT_EQ(meta2.second, server_trailing_metadata.find(meta2.first)->second); + EXPECT_EQ(2, server_trailing_metadata.size()); +} + +TEST_F(AsyncEnd2endTest, MetadataRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("Hello"); + std::pair<grpc::string, grpc::string> meta1("key1", "val1"); + std::pair<grpc::string, grpc::string> meta2("key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13}); + std::pair<grpc::string, grpc::string> meta3("key3", "val3"); + std::pair<grpc::string, grpc::string> meta6("key4-bin", {"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14}); + std::pair<grpc::string, grpc::string> meta5("key5", "val5"); + std::pair<grpc::string, grpc::string> meta4("key6-bin", {"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15}); + + cli_ctx.AddMetadata(meta1.first, meta1.second); + cli_ctx.AddMetadata(meta2.first, meta2.second); + + stub_->Echo( + &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1)); + + service_.RequestEcho( + &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2)); + server_ok(2); + EXPECT_EQ(send_request.message(), recv_request.message()); + auto client_initial_metadata = srv_ctx.client_metadata(); + EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second); + EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second); + EXPECT_EQ(2, client_initial_metadata.size()); + + srv_ctx.AddInitialMetadata(meta3.first, meta3.second); + srv_ctx.AddInitialMetadata(meta4.first, meta4.second); + response_writer.SendInitialMetadata(tag(3)); + server_ok(3); + + send_response.set_message(recv_request.message()); + srv_ctx.AddTrailingMetadata(meta5.first, meta5.second); + srv_ctx.AddTrailingMetadata(meta6.first, meta6.second); + response_writer.Finish(send_response, Status::OK, tag(4)); + + server_ok(4); + + client_ok(1); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.IsOk()); + auto server_initial_metadata = cli_ctx.GetServerInitialMetadata(); + EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second); + EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second); + EXPECT_EQ(2, server_initial_metadata.size()); + auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata(); + EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second); + EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second); + EXPECT_EQ(2, server_trailing_metadata.size()); +} +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + grpc_init(); + ::testing::InitGoogleTest(&argc, argv); + int result = RUN_ALL_TESTS(); + grpc_shutdown(); + google::protobuf::ShutdownProtobufLibrary(); + return result; +} diff --git a/test/cpp/end2end/async_test_server.cc b/test/cpp/end2end/async_test_server.cc deleted file mode 100644 index f18b6c00bc..0000000000 --- a/test/cpp/end2end/async_test_server.cc +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "test/cpp/end2end/async_test_server.h" - -#include <chrono> - -#include <grpc/support/log.h> -#include "src/cpp/proto/proto_utils.h" -#include "test/cpp/util/echo.pb.h" -#include <grpc++/async_server.h> -#include <grpc++/async_server_context.h> -#include <grpc++/completion_queue.h> -#include <grpc++/status.h> -#include <gtest/gtest.h> - -using grpc::cpp::test::util::EchoRequest; -using grpc::cpp::test::util::EchoResponse; - -using std::chrono::duration_cast; -using std::chrono::microseconds; -using std::chrono::seconds; -using std::chrono::system_clock; - -namespace grpc { -namespace testing { - -AsyncTestServer::AsyncTestServer() : server_(&cq_), cq_drained_(false) {} - -AsyncTestServer::~AsyncTestServer() {} - -void AsyncTestServer::AddPort(const grpc::string& addr) { - server_.AddPort(addr); -} - -void AsyncTestServer::Start() { server_.Start(); } - -// Return true if deadline actual is within 0.5s from expected. -bool DeadlineMatched(const system_clock::time_point& actual, - const system_clock::time_point& expected) { - microseconds diff_usecs = duration_cast<microseconds>(expected - actual); - gpr_log(GPR_INFO, "diff_usecs= %d", diff_usecs.count()); - return diff_usecs.count() < 500000 && diff_usecs.count() > -500000; -} - -void AsyncTestServer::RequestOneRpc() { server_.RequestOneRpc(); } - -void AsyncTestServer::MainLoop() { - EchoRequest request; - EchoResponse response; - void* tag = nullptr; - - RequestOneRpc(); - - while (true) { - CompletionQueue::CompletionType t = cq_.Next(&tag); - AsyncServerContext* server_context = static_cast<AsyncServerContext*>(tag); - switch (t) { - case CompletionQueue::SERVER_RPC_NEW: - gpr_log(GPR_INFO, "SERVER_RPC_NEW %p", server_context); - if (server_context) { - EXPECT_EQ(server_context->method(), "/foo"); - // TODO(ctiller): verify deadline - server_context->Accept(cq_.cq()); - // Handle only one rpc at a time. - RequestOneRpc(); - server_context->StartRead(&request); - } - break; - case CompletionQueue::RPC_END: - gpr_log(GPR_INFO, "RPC_END %p", server_context); - delete server_context; - break; - case CompletionQueue::SERVER_READ_OK: - gpr_log(GPR_INFO, "SERVER_READ_OK %p", server_context); - response.set_message(request.message()); - server_context->StartWrite(response, 0); - break; - case CompletionQueue::SERVER_READ_ERROR: - gpr_log(GPR_INFO, "SERVER_READ_ERROR %p", server_context); - server_context->StartWriteStatus(Status::OK); - break; - case CompletionQueue::HALFCLOSE_OK: - gpr_log(GPR_INFO, "HALFCLOSE_OK %p", server_context); - // Do nothing, just wait for RPC_END. - break; - case CompletionQueue::SERVER_WRITE_OK: - gpr_log(GPR_INFO, "SERVER_WRITE_OK %p", server_context); - server_context->StartRead(&request); - break; - case CompletionQueue::SERVER_WRITE_ERROR: - EXPECT_TRUE(0); - break; - case CompletionQueue::QUEUE_CLOSED: { - gpr_log(GPR_INFO, "QUEUE_CLOSED"); - HandleQueueClosed(); - return; - } - default: - EXPECT_TRUE(0); - break; - } - } -} - -void AsyncTestServer::HandleQueueClosed() { - std::unique_lock<std::mutex> lock(cq_drained_mu_); - cq_drained_ = true; - cq_drained_cv_.notify_all(); -} - -void AsyncTestServer::Shutdown() { - // The server need to be shut down before cq_ as grpc_server flushes all - // pending requested calls to the completion queue at shutdown. - server_.Shutdown(); - cq_.Shutdown(); - std::unique_lock<std::mutex> lock(cq_drained_mu_); - while (!cq_drained_) { - cq_drained_cv_.wait(lock); - } -} - -} // namespace testing -} // namespace grpc diff --git a/test/cpp/end2end/async_test_server.h b/test/cpp/end2end/async_test_server.h deleted file mode 100644 index a277061ace..0000000000 --- a/test/cpp/end2end/async_test_server.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__ -#define __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__ - -#include <condition_variable> -#include <mutex> -#include <string> - -#include <grpc++/async_server.h> -#include <grpc++/completion_queue.h> - -namespace grpc { - -namespace testing { - -class AsyncTestServer { - public: - AsyncTestServer(); - virtual ~AsyncTestServer(); - - void AddPort(const grpc::string& addr); - void Start(); - void RequestOneRpc(); - virtual void MainLoop(); - void Shutdown(); - - CompletionQueue* completion_queue() { return &cq_; } - - protected: - void HandleQueueClosed(); - - private: - CompletionQueue cq_; - AsyncServer server_; - bool cq_drained_; - std::mutex cq_drained_mu_; - std::condition_variable cq_drained_cv_; -}; - -} // namespace testing -} // namespace grpc - -#endif // __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__ diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 4dea77ea81..974717f6e2 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -38,6 +38,7 @@ #include "test/cpp/util/echo_duplicate.pb.h" #include "test/cpp/util/echo.pb.h" #include "src/cpp/util/time.h" +#include "src/cpp/server/thread_pool.h" #include <grpc++/channel_arguments.h> #include <grpc++/channel_interface.h> #include <grpc++/client_context.h> @@ -76,6 +77,7 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request, response->mutable_param()->set_request_deadline(deadline.tv_sec); } } + } // namespace class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service { @@ -141,14 +143,17 @@ class TestServiceImplDupPkg class End2endTest : public ::testing::Test { protected: + End2endTest() : thread_pool_(2) {} + void SetUp() override { int port = grpc_pick_unused_port_or_die(); server_address_ << "localhost:" << port; // Setup server ServerBuilder builder; builder.AddPort(server_address_.str()); - builder.RegisterService(service_.service()); - builder.RegisterService(dup_pkg_service_.service()); + builder.RegisterService(&service_); + builder.RegisterService(&dup_pkg_service_); + builder.SetThreadPool(&thread_pool_); server_ = builder.BuildAndStart(); } @@ -165,6 +170,7 @@ class End2endTest : public ::testing::Test { std::ostringstream server_address_; TestServiceImpl service_; TestServiceImplDupPkg dup_pkg_service_; + ThreadPool thread_pool_; }; static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub, @@ -290,7 +296,7 @@ TEST_F(End2endTest, RequestStreamOneRequest) { request.set_message("hello"); EXPECT_TRUE(stream->Write(request)); stream->WritesDone(); - Status s = stream->Wait(); + Status s = stream->Finish(); EXPECT_EQ(response.message(), request.message()); EXPECT_TRUE(s.IsOk()); @@ -308,7 +314,7 @@ TEST_F(End2endTest, RequestStreamTwoRequests) { EXPECT_TRUE(stream->Write(request)); EXPECT_TRUE(stream->Write(request)); stream->WritesDone(); - Status s = stream->Wait(); + Status s = stream->Finish(); EXPECT_EQ(response.message(), "hellohello"); EXPECT_TRUE(s.IsOk()); @@ -323,7 +329,7 @@ TEST_F(End2endTest, ResponseStream) { request.set_message("hello"); ClientReader<EchoResponse>* stream = - stub_->ResponseStream(&context, &request); + stub_->ResponseStream(&context, request); EXPECT_TRUE(stream->Read(&response)); EXPECT_EQ(response.message(), request.message() + "0"); EXPECT_TRUE(stream->Read(&response)); @@ -332,7 +338,7 @@ TEST_F(End2endTest, ResponseStream) { EXPECT_EQ(response.message(), request.message() + "2"); EXPECT_FALSE(stream->Read(&response)); - Status s = stream->Wait(); + Status s = stream->Finish(); EXPECT_TRUE(s.IsOk()); delete stream; @@ -366,7 +372,7 @@ TEST_F(End2endTest, BidiStream) { stream->WritesDone(); EXPECT_FALSE(stream->Read(&response)); - Status s = stream->Wait(); + Status s = stream->Finish(); EXPECT_TRUE(s.IsOk()); delete stream; @@ -422,7 +428,7 @@ TEST_F(End2endTest, BadCredentials) { ClientContext context2; ClientReaderWriter<EchoRequest, EchoResponse>* stream = stub->BidiStream(&context2); - s = stream->Wait(); + s = stream->Finish(); EXPECT_FALSE(s.IsOk()); EXPECT_EQ(StatusCode::UNKNOWN, s.code()); EXPECT_EQ("Rpc sent on a lame channel.", s.details()); @@ -439,5 +445,6 @@ int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); int result = RUN_ALL_TESTS(); grpc_shutdown(); + google::protobuf::ShutdownProtobufLibrary(); return result; } diff --git a/test/cpp/end2end/sync_client_async_server_test.cc b/test/cpp/end2end/sync_client_async_server_test.cc deleted file mode 100644 index 9955eb306f..0000000000 --- a/test/cpp/end2end/sync_client_async_server_test.cc +++ /dev/null @@ -1,236 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <chrono> -#include <memory> -#include <sstream> -#include <string> - -#include <grpc/grpc.h> -#include <grpc/support/thd.h> -#include "test/cpp/util/echo.pb.h" -#include <grpc++/channel_arguments.h> -#include <grpc++/channel_interface.h> -#include <grpc++/client_context.h> -#include <grpc++/create_channel.h> -#include <grpc++/impl/internal_stub.h> -#include <grpc++/impl/rpc_method.h> -#include <grpc++/status.h> -#include <grpc++/stream.h> -#include "test/cpp/end2end/async_test_server.h" -#include "test/core/util/port.h" -#include <gtest/gtest.h> - -using grpc::cpp::test::util::EchoRequest; -using grpc::cpp::test::util::EchoResponse; - -using std::chrono::duration_cast; -using std::chrono::microseconds; -using std::chrono::seconds; -using std::chrono::system_clock; - -using grpc::testing::AsyncTestServer; - -namespace grpc { -namespace { - -void ServerLoop(void* s) { - AsyncTestServer* server = static_cast<AsyncTestServer*>(s); - server->MainLoop(); -} - -class End2endTest : public ::testing::Test { - protected: - void SetUp() override { - int port = grpc_pick_unused_port_or_die(); - // TODO(yangg) protobuf has a StringPrintf, maybe use that - std::ostringstream oss; - oss << "[::]:" << port; - // Setup server - server_.reset(new AsyncTestServer()); - server_->AddPort(oss.str()); - server_->Start(); - - RunServerThread(); - - // Setup client - oss.str(""); - oss << "127.0.0.1:" << port; - std::shared_ptr<ChannelInterface> channel = - CreateChannel(oss.str(), ChannelArguments()); - stub_.set_channel(channel); - } - - void RunServerThread() { - gpr_thd_id id; - EXPECT_TRUE(gpr_thd_new(&id, ServerLoop, server_.get(), NULL)); - } - - void TearDown() override { server_->Shutdown(); } - - std::unique_ptr<AsyncTestServer> server_; - InternalStub stub_; -}; - -TEST_F(End2endTest, NoOpTest) { EXPECT_TRUE(stub_.channel() != nullptr); } - -TEST_F(End2endTest, SimpleRpc) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo"); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - Status s = - stub_.channel()->StartBlockingRpc(method, &context, request, &result); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); -} - -TEST_F(End2endTest, KSequentialSimpleRpcs) { - int k = 3; - for (int i = 0; i < k; i++) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo"); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - Status s = - stub_.channel()->StartBlockingRpc(method, &context, request, &result); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); - } -} - -TEST_F(End2endTest, OnePingpongBidiStream) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo", RpcMethod::RpcType::BIDI_STREAMING); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - StreamContextInterface* stream_interface = - stub_.channel()->CreateStream(method, &context, nullptr, nullptr); - std::unique_ptr<ClientReaderWriter<EchoRequest, EchoResponse>> stream( - new ClientReaderWriter<EchoRequest, EchoResponse>(stream_interface)); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&result)); - stream->WritesDone(); - EXPECT_FALSE(stream->Read(&result)); - Status s = stream->Wait(); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); -} - -TEST_F(End2endTest, TwoPingpongBidiStream) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo", RpcMethod::RpcType::BIDI_STREAMING); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - StreamContextInterface* stream_interface = - stub_.channel()->CreateStream(method, &context, nullptr, nullptr); - std::unique_ptr<ClientReaderWriter<EchoRequest, EchoResponse>> stream( - new ClientReaderWriter<EchoRequest, EchoResponse>(stream_interface)); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&result)); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&result)); - EXPECT_EQ(result.message(), request.message()); - stream->WritesDone(); - EXPECT_FALSE(stream->Read(&result)); - Status s = stream->Wait(); - EXPECT_TRUE(s.IsOk()); -} - -TEST_F(End2endTest, OnePingpongClientStream) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo", RpcMethod::RpcType::CLIENT_STREAMING); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - StreamContextInterface* stream_interface = - stub_.channel()->CreateStream(method, &context, nullptr, &result); - std::unique_ptr<ClientWriter<EchoRequest>> stream( - new ClientWriter<EchoRequest>(stream_interface)); - EXPECT_TRUE(stream->Write(request)); - stream->WritesDone(); - Status s = stream->Wait(); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); -} - -TEST_F(End2endTest, OnePingpongServerStream) { - EchoRequest request; - request.set_message("hello"); - EchoResponse result; - ClientContext context; - RpcMethod method("/foo", RpcMethod::RpcType::SERVER_STREAMING); - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(10); - context.set_absolute_deadline(deadline); - StreamContextInterface* stream_interface = - stub_.channel()->CreateStream(method, &context, &request, nullptr); - std::unique_ptr<ClientReader<EchoResponse>> stream( - new ClientReader<EchoResponse>(stream_interface)); - EXPECT_TRUE(stream->Read(&result)); - EXPECT_FALSE(stream->Read(nullptr)); - Status s = stream->Wait(); - EXPECT_EQ(result.message(), request.message()); - EXPECT_TRUE(s.IsOk()); -} - -} // namespace -} // namespace grpc - -int main(int argc, char** argv) { - grpc_init(); - ::testing::InitGoogleTest(&argc, argv); - int result = RUN_ALL_TESTS(); - grpc_shutdown(); - return result; -} diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index 0fa76f0e02..57a503f84f 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -248,7 +248,7 @@ void DoRequestStreaming() { aggregated_payload_size += request_stream_sizes[i]; } stream->WritesDone(); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size); GPR_ASSERT(s.IsOk()); @@ -269,7 +269,7 @@ void DoResponseStreaming() { } StreamingOutputCallResponse response; std::unique_ptr<grpc::ClientReader<StreamingOutputCallResponse>> stream( - stub->StreamingOutputCall(&context, &request)); + stub->StreamingOutputCall(&context, request)); unsigned int i = 0; while (stream->Read(&response)) { @@ -278,7 +278,7 @@ void DoResponseStreaming() { ++i; } GPR_ASSERT(response_stream_sizes.size() == i); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(s.IsOk()); gpr_log(GPR_INFO, "Response streaming done."); @@ -299,7 +299,7 @@ void DoResponseStreamingWithSlowConsumer() { } StreamingOutputCallResponse response; std::unique_ptr<grpc::ClientReader<StreamingOutputCallResponse>> stream( - stub->StreamingOutputCall(&context, &request)); + stub->StreamingOutputCall(&context, request)); int i = 0; while (stream->Read(&response)) { @@ -311,7 +311,7 @@ void DoResponseStreamingWithSlowConsumer() { ++i; } GPR_ASSERT(kNumResponseMessages == i); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(s.IsOk()); gpr_log(GPR_INFO, "Response streaming done."); @@ -345,7 +345,7 @@ void DoHalfDuplex() { ++i; } GPR_ASSERT(response_stream_sizes.size() == i); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(s.IsOk()); gpr_log(GPR_INFO, "Half-duplex streaming rpc done."); } @@ -378,7 +378,7 @@ void DoPingPong() { stream->WritesDone(); GPR_ASSERT(!stream->Read(&response)); - grpc::Status s = stream->Wait(); + grpc::Status s = stream->Finish(); GPR_ASSERT(s.IsOk()); gpr_log(GPR_INFO, "Ping pong streaming done."); } diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc index 8a6be57929..a8399779b9 100644 --- a/test/cpp/interop/server.cc +++ b/test/cpp/interop/server.cc @@ -200,7 +200,7 @@ void RunServer() { ServerBuilder builder; builder.AddPort(server_address.str()); - builder.RegisterService(service.service()); + builder.RegisterService(&service); if (FLAGS_enable_ssl) { SslServerCredentialsOptions ssl_opts = { "", {{test_server1_key, test_server1_cert}}}; diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc index 3a432b6fbb..718046170f 100644 --- a/test/cpp/qps/server.cc +++ b/test/cpp/qps/server.cc @@ -128,7 +128,7 @@ static void RunServer() { ServerBuilder builder; builder.AddPort(server_address); - builder.RegisterService(service.service()); + builder.RegisterService(&service); std::unique_ptr<ThreadPool> pool(new ThreadPool(FLAGS_server_threads)); builder.SetThreadPool(pool.get()); |