aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/core/end2end/tests/cancel_after_accept.c3
-rw-r--r--test/core/end2end/tests/early_server_shutdown_finishes_tags.c2
-rw-r--r--test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c3
-rw-r--r--test/core/end2end/tests/request_response_with_metadata_and_payload.c3
-rw-r--r--test/core/end2end/tests/request_response_with_payload.c3
-rw-r--r--test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c3
-rw-r--r--test/core/end2end/tests/request_with_large_metadata.c3
-rw-r--r--test/core/end2end/tests/request_with_payload.c3
-rw-r--r--test/core/end2end/tests/simple_delayed_request.c3
-rw-r--r--test/core/end2end/tests/simple_request.c3
-rw-r--r--test/core/iomgr/tcp_server_posix_test.c6
-rw-r--r--test/core/statistics/census_log_tests.c2
-rw-r--r--test/cpp/end2end/async_end2end_test.cc528
-rw-r--r--test/cpp/end2end/async_test_server.cc154
-rw-r--r--test/cpp/end2end/async_test_server.h75
-rw-r--r--test/cpp/end2end/end2end_test.cc23
-rw-r--r--test/cpp/end2end/sync_client_async_server_test.cc236
-rw-r--r--test/cpp/interop/client.cc14
-rw-r--r--test/cpp/interop/server.cc2
-rw-r--r--test/cpp/qps/server.cc2
20 files changed, 575 insertions, 496 deletions
diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c
index 18d6bcec06..17f37d6719 100644
--- a/test/core/end2end/tests/cancel_after_accept.c
+++ b/test/core/end2end/tests/cancel_after_accept.c
@@ -166,7 +166,8 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
- f.server_cq, tag(2)));
+ f.server_cq,
+ tag(2)));
cq_expect_completion(v_server, tag(2), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c
index 123c8bc415..51486cc169 100644
--- a/test/core/end2end/tests/early_server_shutdown_finishes_tags.c
+++ b/test/core/end2end/tests/early_server_shutdown_finishes_tags.c
@@ -79,7 +79,7 @@ static void drain_cq(grpc_completion_queue *cq) {
static void shutdown_server(grpc_end2end_test_fixture *f) {
if (!f->server) return;
- grpc_server_shutdown(f->server);
+ /* don't shutdown, just destroy, to tickle this code edge */
grpc_server_destroy(f->server);
f->server = NULL;
}
diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
index 940e327d22..a71e3a7736 100644
--- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
@@ -175,7 +175,8 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
- f.server_cq, tag(101)));
+ f.server_cq,
+ tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
index 80cb629542..f7394a25b0 100644
--- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c
@@ -168,7 +168,8 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
- f.server_cq, tag(101)));
+ f.server_cq,
+ tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c
index b07f51da85..be4beb537a 100644
--- a/test/core/end2end/tests/request_response_with_payload.c
+++ b/test/core/end2end/tests/request_response_with_payload.c
@@ -162,7 +162,8 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
- f.server_cq, tag(101)));
+ f.server_cq,
+ tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
index e547604619..637a9ffe1c 100644
--- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
+++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
@@ -169,7 +169,8 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
- f.server_cq, tag(101)));
+ f.server_cq,
+ tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c
index eb6180c399..fff83dcbf0 100644
--- a/test/core/end2end/tests/request_with_large_metadata.c
+++ b/test/core/end2end/tests/request_with_large_metadata.c
@@ -166,7 +166,8 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
- f.server_cq, tag(101)));
+ f.server_cq,
+ tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c
index 2bf0fa3717..2b520046bb 100644
--- a/test/core/end2end/tests/request_with_payload.c
+++ b/test/core/end2end/tests/request_with_payload.c
@@ -157,7 +157,8 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
- f.server_cq, tag(101)));
+ f.server_cq,
+ tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c
index 80763fe6cd..ac248f9be0 100644
--- a/test/core/end2end/tests/simple_delayed_request.c
+++ b/test/core/end2end/tests/simple_delayed_request.c
@@ -144,7 +144,8 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s,
&call_details,
&request_metadata_recv,
- f->server_cq, tag(101)));
+ f->server_cq,
+ tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c
index 968be74cfb..ab03479586 100644
--- a/test/core/end2end/tests/simple_request.c
+++ b/test/core/end2end/tests/simple_request.c
@@ -150,7 +150,8 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
- f.server_cq, tag(101)));
+ f.server_cq,
+ tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index e906f302cf..ae6994ef07 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -66,7 +66,7 @@ static void test_no_op(void) {
static void test_no_op_with_start(void) {
grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST();
- grpc_tcp_server_start(s, NULL, on_connect, NULL);
+ grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
grpc_tcp_server_destroy(s);
}
@@ -93,7 +93,7 @@ static void test_no_op_with_port_and_start(void) {
GPR_ASSERT(
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)));
- grpc_tcp_server_start(s, NULL, on_connect, NULL);
+ grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
grpc_tcp_server_destroy(s);
}
@@ -120,7 +120,7 @@ static void test_connect(int n) {
GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0);
GPR_ASSERT(addr_len <= sizeof(addr));
- grpc_tcp_server_start(s, NULL, on_connect, NULL);
+ grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
for (i = 0; i < n; i++) {
deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(10000000));
diff --git a/test/core/statistics/census_log_tests.c b/test/core/statistics/census_log_tests.c
index c7b2b2e46d..e2ad78a6f2 100644
--- a/test/core/statistics/census_log_tests.c
+++ b/test/core/statistics/census_log_tests.c
@@ -35,7 +35,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include "src/core/support/cpu.h"
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
new file mode 100644
index 0000000000..7e827cb0e5
--- /dev/null
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -0,0 +1,528 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <chrono>
+#include <memory>
+
+#include "test/core/util/test_config.h"
+#include "test/cpp/util/echo_duplicate.pb.h"
+#include "test/cpp/util/echo.pb.h"
+#include "src/cpp/util/time.h"
+#include <grpc++/channel_arguments.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/credentials.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc++/status.h>
+#include <grpc++/stream.h>
+#include "test/core/util/port.h"
+#include <gtest/gtest.h>
+
+#include <grpc/grpc.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+
+using grpc::cpp::test::util::EchoRequest;
+using grpc::cpp::test::util::EchoResponse;
+using std::chrono::system_clock;
+
+namespace grpc {
+namespace testing {
+
+namespace {
+
+void* tag(int i) {
+ return (void*)(gpr_intptr)i;
+}
+
+void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
+ bool ok;
+ void* got_tag;
+ EXPECT_TRUE(cq->Next(&got_tag, &ok));
+ EXPECT_EQ(expect_ok, ok);
+ EXPECT_EQ(tag(i), got_tag);
+}
+
+class AsyncEnd2endTest : public ::testing::Test {
+ protected:
+ AsyncEnd2endTest() : service_(&srv_cq_) {}
+
+ void SetUp() override {
+ int port = grpc_pick_unused_port_or_die();
+ server_address_ << "localhost:" << port;
+ // Setup server
+ ServerBuilder builder;
+ builder.AddPort(server_address_.str());
+ builder.RegisterAsyncService(&service_);
+ server_ = builder.BuildAndStart();
+ }
+
+ void TearDown() override { server_->Shutdown(); }
+
+ void ResetStub() {
+ std::shared_ptr<ChannelInterface> channel =
+ CreateChannel(server_address_.str(), ChannelArguments());
+ stub_.reset(grpc::cpp::test::util::TestService::NewStub(channel));
+ }
+
+ void server_ok(int i) {
+ verify_ok(&srv_cq_, i, true);
+ }
+ void client_ok(int i) {
+ verify_ok(&cli_cq_, i , true);
+ }
+ void server_fail(int i) {
+ verify_ok(&srv_cq_, i, false);
+ }
+ void client_fail(int i) {
+ verify_ok(&cli_cq_, i, false);
+ }
+
+ void SendRpc(int num_rpcs) {
+ for (int i = 0; i < num_rpcs; i++) {
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ stub_->Echo(
+ &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
+
+ service_.RequestEcho(
+ &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
+
+ server_ok(2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ response_writer.Finish(send_response, Status::OK, tag(3));
+
+ server_ok(3);
+
+ client_ok(1);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+ }
+ }
+
+ CompletionQueue cli_cq_;
+ CompletionQueue srv_cq_;
+ std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
+ std::unique_ptr<Server> server_;
+ grpc::cpp::test::util::TestService::AsyncService service_;
+ std::ostringstream server_address_;
+};
+
+TEST_F(AsyncEnd2endTest, SimpleRpc) {
+ ResetStub();
+ SendRpc(1);
+}
+
+TEST_F(AsyncEnd2endTest, SequentialRpcs) {
+ ResetStub();
+ SendRpc(10);
+}
+
+// Two pings and a final pong.
+TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream(
+ stub_->RequestStream(&cli_ctx, &recv_response, &cli_cq_, tag(1)));
+
+ service_.RequestRequestStream(
+ &srv_ctx, &srv_stream, &srv_cq_, tag(2));
+
+ server_ok(2);
+ client_ok(1);
+
+ cli_stream->Write(send_request, tag(3));
+ client_ok(3);
+
+ srv_stream.Read(&recv_request, tag(4));
+ server_ok(4);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ cli_stream->Write(send_request, tag(5));
+ client_ok(5);
+
+ srv_stream.Read(&recv_request, tag(6));
+ server_ok(6);
+
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ cli_stream->WritesDone(tag(7));
+ client_ok(7);
+
+ srv_stream.Read(&recv_request, tag(8));
+ server_fail(8);
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Finish(send_response, Status::OK, tag(9));
+ server_ok(9);
+
+ cli_stream->Finish(&recv_status, tag(10));
+ client_ok(10);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+}
+
+// One ping, two pongs.
+TEST_F(AsyncEnd2endTest, SimpleServerStreaming) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream(
+ stub_->ResponseStream(&cli_ctx, send_request, &cli_cq_, tag(1)));
+
+ service_.RequestResponseStream(
+ &srv_ctx, &recv_request, &srv_stream, &srv_cq_, tag(2));
+
+ server_ok(2);
+ client_ok(1);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(3));
+ server_ok(3);
+
+ cli_stream->Read(&recv_response, tag(4));
+ client_ok(4);
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.Write(send_response, tag(5));
+ server_ok(5);
+
+ cli_stream->Read(&recv_response, tag(6));
+ client_ok(6);
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ srv_stream.Finish(Status::OK, tag(7));
+ server_ok(7);
+
+ cli_stream->Read(&recv_response, tag(8));
+ client_fail(8);
+
+ cli_stream->Finish(&recv_status, tag(9));
+ client_ok(9);
+
+ EXPECT_TRUE(recv_status.IsOk());
+}
+
+// One ping, one pong.
+TEST_F(AsyncEnd2endTest, SimpleBidiStreaming) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> >
+ cli_stream(stub_->BidiStream(&cli_ctx, &cli_cq_, tag(1)));
+
+ service_.RequestBidiStream(
+ &srv_ctx, &srv_stream, &srv_cq_, tag(2));
+
+ server_ok(2);
+ client_ok(1);
+
+ cli_stream->Write(send_request, tag(3));
+ client_ok(3);
+
+ srv_stream.Read(&recv_request, tag(4));
+ server_ok(4);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ srv_stream.Write(send_response, tag(5));
+ server_ok(5);
+
+ cli_stream->Read(&recv_response, tag(6));
+ client_ok(6);
+ EXPECT_EQ(send_response.message(), recv_response.message());
+
+ cli_stream->WritesDone(tag(7));
+ client_ok(7);
+
+ srv_stream.Read(&recv_request, tag(8));
+ server_fail(8);
+
+ srv_stream.Finish(Status::OK, tag(9));
+ server_ok(9);
+
+ cli_stream->Finish(&recv_status, tag(10));
+ client_ok(10);
+
+ EXPECT_TRUE(recv_status.IsOk());
+}
+
+// Metadata tests
+TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::pair<grpc::string, grpc::string> meta1("key1", "val1");
+ std::pair<grpc::string, grpc::string> meta2("key2", "val2");
+ cli_ctx.AddMetadata(meta1.first, meta1.second);
+ cli_ctx.AddMetadata(meta2.first, meta2.second);
+
+ stub_->Echo(
+ &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
+
+ service_.RequestEcho(
+ &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
+ server_ok(2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ auto client_initial_metadata = srv_ctx.client_metadata();
+ EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
+ EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
+ EXPECT_EQ(2, client_initial_metadata.size());
+
+ send_response.set_message(recv_request.message());
+ response_writer.Finish(send_response, Status::OK, tag(3));
+
+ server_ok(3);
+
+ client_ok(1);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+}
+
+TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::pair<grpc::string, grpc::string> meta1("key1", "val1");
+ std::pair<grpc::string, grpc::string> meta2("key2", "val2");
+
+ stub_->Echo(
+ &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
+
+ service_.RequestEcho(
+ &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
+ server_ok(2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
+ srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
+ response_writer.SendInitialMetadata(tag(3));
+ server_ok(3);
+
+ send_response.set_message(recv_request.message());
+ response_writer.Finish(send_response, Status::OK, tag(4));
+
+ server_ok(4);
+
+ client_ok(1);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+ auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
+ EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
+ EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
+ EXPECT_EQ(2, server_initial_metadata.size());
+}
+
+TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::pair<grpc::string, grpc::string> meta1("key1", "val1");
+ std::pair<grpc::string, grpc::string> meta2("key2", "val2");
+
+ stub_->Echo(
+ &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
+
+ service_.RequestEcho(
+ &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
+ server_ok(2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ response_writer.SendInitialMetadata(tag(3));
+ server_ok(3);
+
+ send_response.set_message(recv_request.message());
+ srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
+ srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
+ response_writer.Finish(send_response, Status::OK, tag(4));
+
+ server_ok(4);
+
+ client_ok(1);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+ auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
+ EXPECT_EQ(meta1.second, server_trailing_metadata.find(meta1.first)->second);
+ EXPECT_EQ(meta2.second, server_trailing_metadata.find(meta2.first)->second);
+ EXPECT_EQ(2, server_trailing_metadata.size());
+}
+
+TEST_F(AsyncEnd2endTest, MetadataRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::pair<grpc::string, grpc::string> meta1("key1", "val1");
+ std::pair<grpc::string, grpc::string> meta2("key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13});
+ std::pair<grpc::string, grpc::string> meta3("key3", "val3");
+ std::pair<grpc::string, grpc::string> meta6("key4-bin", {"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14});
+ std::pair<grpc::string, grpc::string> meta5("key5", "val5");
+ std::pair<grpc::string, grpc::string> meta4("key6-bin", {"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15});
+
+ cli_ctx.AddMetadata(meta1.first, meta1.second);
+ cli_ctx.AddMetadata(meta2.first, meta2.second);
+
+ stub_->Echo(
+ &cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
+
+ service_.RequestEcho(
+ &srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
+ server_ok(2);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ auto client_initial_metadata = srv_ctx.client_metadata();
+ EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
+ EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
+ EXPECT_EQ(2, client_initial_metadata.size());
+
+ srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
+ srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
+ response_writer.SendInitialMetadata(tag(3));
+ server_ok(3);
+
+ send_response.set_message(recv_request.message());
+ srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
+ srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
+ response_writer.Finish(send_response, Status::OK, tag(4));
+
+ server_ok(4);
+
+ client_ok(1);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+ auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
+ EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
+ EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
+ EXPECT_EQ(2, server_initial_metadata.size());
+ auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
+ EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second);
+ EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second);
+ EXPECT_EQ(2, server_trailing_metadata.size());
+}
+} // namespace
+} // namespace testing
+} // namespace grpc
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+ ::testing::InitGoogleTest(&argc, argv);
+ int result = RUN_ALL_TESTS();
+ grpc_shutdown();
+ google::protobuf::ShutdownProtobufLibrary();
+ return result;
+}
diff --git a/test/cpp/end2end/async_test_server.cc b/test/cpp/end2end/async_test_server.cc
deleted file mode 100644
index f18b6c00bc..0000000000
--- a/test/cpp/end2end/async_test_server.cc
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "test/cpp/end2end/async_test_server.h"
-
-#include <chrono>
-
-#include <grpc/support/log.h>
-#include "src/cpp/proto/proto_utils.h"
-#include "test/cpp/util/echo.pb.h"
-#include <grpc++/async_server.h>
-#include <grpc++/async_server_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/status.h>
-#include <gtest/gtest.h>
-
-using grpc::cpp::test::util::EchoRequest;
-using grpc::cpp::test::util::EchoResponse;
-
-using std::chrono::duration_cast;
-using std::chrono::microseconds;
-using std::chrono::seconds;
-using std::chrono::system_clock;
-
-namespace grpc {
-namespace testing {
-
-AsyncTestServer::AsyncTestServer() : server_(&cq_), cq_drained_(false) {}
-
-AsyncTestServer::~AsyncTestServer() {}
-
-void AsyncTestServer::AddPort(const grpc::string& addr) {
- server_.AddPort(addr);
-}
-
-void AsyncTestServer::Start() { server_.Start(); }
-
-// Return true if deadline actual is within 0.5s from expected.
-bool DeadlineMatched(const system_clock::time_point& actual,
- const system_clock::time_point& expected) {
- microseconds diff_usecs = duration_cast<microseconds>(expected - actual);
- gpr_log(GPR_INFO, "diff_usecs= %d", diff_usecs.count());
- return diff_usecs.count() < 500000 && diff_usecs.count() > -500000;
-}
-
-void AsyncTestServer::RequestOneRpc() { server_.RequestOneRpc(); }
-
-void AsyncTestServer::MainLoop() {
- EchoRequest request;
- EchoResponse response;
- void* tag = nullptr;
-
- RequestOneRpc();
-
- while (true) {
- CompletionQueue::CompletionType t = cq_.Next(&tag);
- AsyncServerContext* server_context = static_cast<AsyncServerContext*>(tag);
- switch (t) {
- case CompletionQueue::SERVER_RPC_NEW:
- gpr_log(GPR_INFO, "SERVER_RPC_NEW %p", server_context);
- if (server_context) {
- EXPECT_EQ(server_context->method(), "/foo");
- // TODO(ctiller): verify deadline
- server_context->Accept(cq_.cq());
- // Handle only one rpc at a time.
- RequestOneRpc();
- server_context->StartRead(&request);
- }
- break;
- case CompletionQueue::RPC_END:
- gpr_log(GPR_INFO, "RPC_END %p", server_context);
- delete server_context;
- break;
- case CompletionQueue::SERVER_READ_OK:
- gpr_log(GPR_INFO, "SERVER_READ_OK %p", server_context);
- response.set_message(request.message());
- server_context->StartWrite(response, 0);
- break;
- case CompletionQueue::SERVER_READ_ERROR:
- gpr_log(GPR_INFO, "SERVER_READ_ERROR %p", server_context);
- server_context->StartWriteStatus(Status::OK);
- break;
- case CompletionQueue::HALFCLOSE_OK:
- gpr_log(GPR_INFO, "HALFCLOSE_OK %p", server_context);
- // Do nothing, just wait for RPC_END.
- break;
- case CompletionQueue::SERVER_WRITE_OK:
- gpr_log(GPR_INFO, "SERVER_WRITE_OK %p", server_context);
- server_context->StartRead(&request);
- break;
- case CompletionQueue::SERVER_WRITE_ERROR:
- EXPECT_TRUE(0);
- break;
- case CompletionQueue::QUEUE_CLOSED: {
- gpr_log(GPR_INFO, "QUEUE_CLOSED");
- HandleQueueClosed();
- return;
- }
- default:
- EXPECT_TRUE(0);
- break;
- }
- }
-}
-
-void AsyncTestServer::HandleQueueClosed() {
- std::unique_lock<std::mutex> lock(cq_drained_mu_);
- cq_drained_ = true;
- cq_drained_cv_.notify_all();
-}
-
-void AsyncTestServer::Shutdown() {
- // The server need to be shut down before cq_ as grpc_server flushes all
- // pending requested calls to the completion queue at shutdown.
- server_.Shutdown();
- cq_.Shutdown();
- std::unique_lock<std::mutex> lock(cq_drained_mu_);
- while (!cq_drained_) {
- cq_drained_cv_.wait(lock);
- }
-}
-
-} // namespace testing
-} // namespace grpc
diff --git a/test/cpp/end2end/async_test_server.h b/test/cpp/end2end/async_test_server.h
deleted file mode 100644
index a277061ace..0000000000
--- a/test/cpp/end2end/async_test_server.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__
-#define __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__
-
-#include <condition_variable>
-#include <mutex>
-#include <string>
-
-#include <grpc++/async_server.h>
-#include <grpc++/completion_queue.h>
-
-namespace grpc {
-
-namespace testing {
-
-class AsyncTestServer {
- public:
- AsyncTestServer();
- virtual ~AsyncTestServer();
-
- void AddPort(const grpc::string& addr);
- void Start();
- void RequestOneRpc();
- virtual void MainLoop();
- void Shutdown();
-
- CompletionQueue* completion_queue() { return &cq_; }
-
- protected:
- void HandleQueueClosed();
-
- private:
- CompletionQueue cq_;
- AsyncServer server_;
- bool cq_drained_;
- std::mutex cq_drained_mu_;
- std::condition_variable cq_drained_cv_;
-};
-
-} // namespace testing
-} // namespace grpc
-
-#endif // __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 4dea77ea81..974717f6e2 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -38,6 +38,7 @@
#include "test/cpp/util/echo_duplicate.pb.h"
#include "test/cpp/util/echo.pb.h"
#include "src/cpp/util/time.h"
+#include "src/cpp/server/thread_pool.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
@@ -76,6 +77,7 @@ void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
response->mutable_param()->set_request_deadline(deadline.tv_sec);
}
}
+
} // namespace
class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
@@ -141,14 +143,17 @@ class TestServiceImplDupPkg
class End2endTest : public ::testing::Test {
protected:
+ End2endTest() : thread_pool_(2) {}
+
void SetUp() override {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
ServerBuilder builder;
builder.AddPort(server_address_.str());
- builder.RegisterService(service_.service());
- builder.RegisterService(dup_pkg_service_.service());
+ builder.RegisterService(&service_);
+ builder.RegisterService(&dup_pkg_service_);
+ builder.SetThreadPool(&thread_pool_);
server_ = builder.BuildAndStart();
}
@@ -165,6 +170,7 @@ class End2endTest : public ::testing::Test {
std::ostringstream server_address_;
TestServiceImpl service_;
TestServiceImplDupPkg dup_pkg_service_;
+ ThreadPool thread_pool_;
};
static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub,
@@ -290,7 +296,7 @@ TEST_F(End2endTest, RequestStreamOneRequest) {
request.set_message("hello");
EXPECT_TRUE(stream->Write(request));
stream->WritesDone();
- Status s = stream->Wait();
+ Status s = stream->Finish();
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.IsOk());
@@ -308,7 +314,7 @@ TEST_F(End2endTest, RequestStreamTwoRequests) {
EXPECT_TRUE(stream->Write(request));
EXPECT_TRUE(stream->Write(request));
stream->WritesDone();
- Status s = stream->Wait();
+ Status s = stream->Finish();
EXPECT_EQ(response.message(), "hellohello");
EXPECT_TRUE(s.IsOk());
@@ -323,7 +329,7 @@ TEST_F(End2endTest, ResponseStream) {
request.set_message("hello");
ClientReader<EchoResponse>* stream =
- stub_->ResponseStream(&context, &request);
+ stub_->ResponseStream(&context, request);
EXPECT_TRUE(stream->Read(&response));
EXPECT_EQ(response.message(), request.message() + "0");
EXPECT_TRUE(stream->Read(&response));
@@ -332,7 +338,7 @@ TEST_F(End2endTest, ResponseStream) {
EXPECT_EQ(response.message(), request.message() + "2");
EXPECT_FALSE(stream->Read(&response));
- Status s = stream->Wait();
+ Status s = stream->Finish();
EXPECT_TRUE(s.IsOk());
delete stream;
@@ -366,7 +372,7 @@ TEST_F(End2endTest, BidiStream) {
stream->WritesDone();
EXPECT_FALSE(stream->Read(&response));
- Status s = stream->Wait();
+ Status s = stream->Finish();
EXPECT_TRUE(s.IsOk());
delete stream;
@@ -422,7 +428,7 @@ TEST_F(End2endTest, BadCredentials) {
ClientContext context2;
ClientReaderWriter<EchoRequest, EchoResponse>* stream =
stub->BidiStream(&context2);
- s = stream->Wait();
+ s = stream->Finish();
EXPECT_FALSE(s.IsOk());
EXPECT_EQ(StatusCode::UNKNOWN, s.code());
EXPECT_EQ("Rpc sent on a lame channel.", s.details());
@@ -439,5 +445,6 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS();
grpc_shutdown();
+ google::protobuf::ShutdownProtobufLibrary();
return result;
}
diff --git a/test/cpp/end2end/sync_client_async_server_test.cc b/test/cpp/end2end/sync_client_async_server_test.cc
deleted file mode 100644
index 9955eb306f..0000000000
--- a/test/cpp/end2end/sync_client_async_server_test.cc
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <chrono>
-#include <memory>
-#include <sstream>
-#include <string>
-
-#include <grpc/grpc.h>
-#include <grpc/support/thd.h>
-#include "test/cpp/util/echo.pb.h"
-#include <grpc++/channel_arguments.h>
-#include <grpc++/channel_interface.h>
-#include <grpc++/client_context.h>
-#include <grpc++/create_channel.h>
-#include <grpc++/impl/internal_stub.h>
-#include <grpc++/impl/rpc_method.h>
-#include <grpc++/status.h>
-#include <grpc++/stream.h>
-#include "test/cpp/end2end/async_test_server.h"
-#include "test/core/util/port.h"
-#include <gtest/gtest.h>
-
-using grpc::cpp::test::util::EchoRequest;
-using grpc::cpp::test::util::EchoResponse;
-
-using std::chrono::duration_cast;
-using std::chrono::microseconds;
-using std::chrono::seconds;
-using std::chrono::system_clock;
-
-using grpc::testing::AsyncTestServer;
-
-namespace grpc {
-namespace {
-
-void ServerLoop(void* s) {
- AsyncTestServer* server = static_cast<AsyncTestServer*>(s);
- server->MainLoop();
-}
-
-class End2endTest : public ::testing::Test {
- protected:
- void SetUp() override {
- int port = grpc_pick_unused_port_or_die();
- // TODO(yangg) protobuf has a StringPrintf, maybe use that
- std::ostringstream oss;
- oss << "[::]:" << port;
- // Setup server
- server_.reset(new AsyncTestServer());
- server_->AddPort(oss.str());
- server_->Start();
-
- RunServerThread();
-
- // Setup client
- oss.str("");
- oss << "127.0.0.1:" << port;
- std::shared_ptr<ChannelInterface> channel =
- CreateChannel(oss.str(), ChannelArguments());
- stub_.set_channel(channel);
- }
-
- void RunServerThread() {
- gpr_thd_id id;
- EXPECT_TRUE(gpr_thd_new(&id, ServerLoop, server_.get(), NULL));
- }
-
- void TearDown() override { server_->Shutdown(); }
-
- std::unique_ptr<AsyncTestServer> server_;
- InternalStub stub_;
-};
-
-TEST_F(End2endTest, NoOpTest) { EXPECT_TRUE(stub_.channel() != nullptr); }
-
-TEST_F(End2endTest, SimpleRpc) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo");
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- Status s =
- stub_.channel()->StartBlockingRpc(method, &context, request, &result);
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
-}
-
-TEST_F(End2endTest, KSequentialSimpleRpcs) {
- int k = 3;
- for (int i = 0; i < k; i++) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo");
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- Status s =
- stub_.channel()->StartBlockingRpc(method, &context, request, &result);
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
- }
-}
-
-TEST_F(End2endTest, OnePingpongBidiStream) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo", RpcMethod::RpcType::BIDI_STREAMING);
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- StreamContextInterface* stream_interface =
- stub_.channel()->CreateStream(method, &context, nullptr, nullptr);
- std::unique_ptr<ClientReaderWriter<EchoRequest, EchoResponse>> stream(
- new ClientReaderWriter<EchoRequest, EchoResponse>(stream_interface));
- EXPECT_TRUE(stream->Write(request));
- EXPECT_TRUE(stream->Read(&result));
- stream->WritesDone();
- EXPECT_FALSE(stream->Read(&result));
- Status s = stream->Wait();
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
-}
-
-TEST_F(End2endTest, TwoPingpongBidiStream) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo", RpcMethod::RpcType::BIDI_STREAMING);
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- StreamContextInterface* stream_interface =
- stub_.channel()->CreateStream(method, &context, nullptr, nullptr);
- std::unique_ptr<ClientReaderWriter<EchoRequest, EchoResponse>> stream(
- new ClientReaderWriter<EchoRequest, EchoResponse>(stream_interface));
- EXPECT_TRUE(stream->Write(request));
- EXPECT_TRUE(stream->Read(&result));
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(stream->Write(request));
- EXPECT_TRUE(stream->Read(&result));
- EXPECT_EQ(result.message(), request.message());
- stream->WritesDone();
- EXPECT_FALSE(stream->Read(&result));
- Status s = stream->Wait();
- EXPECT_TRUE(s.IsOk());
-}
-
-TEST_F(End2endTest, OnePingpongClientStream) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo", RpcMethod::RpcType::CLIENT_STREAMING);
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- StreamContextInterface* stream_interface =
- stub_.channel()->CreateStream(method, &context, nullptr, &result);
- std::unique_ptr<ClientWriter<EchoRequest>> stream(
- new ClientWriter<EchoRequest>(stream_interface));
- EXPECT_TRUE(stream->Write(request));
- stream->WritesDone();
- Status s = stream->Wait();
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
-}
-
-TEST_F(End2endTest, OnePingpongServerStream) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo", RpcMethod::RpcType::SERVER_STREAMING);
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- StreamContextInterface* stream_interface =
- stub_.channel()->CreateStream(method, &context, &request, nullptr);
- std::unique_ptr<ClientReader<EchoResponse>> stream(
- new ClientReader<EchoResponse>(stream_interface));
- EXPECT_TRUE(stream->Read(&result));
- EXPECT_FALSE(stream->Read(nullptr));
- Status s = stream->Wait();
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
-}
-
-} // namespace
-} // namespace grpc
-
-int main(int argc, char** argv) {
- grpc_init();
- ::testing::InitGoogleTest(&argc, argv);
- int result = RUN_ALL_TESTS();
- grpc_shutdown();
- return result;
-}
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index 0fa76f0e02..57a503f84f 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -248,7 +248,7 @@ void DoRequestStreaming() {
aggregated_payload_size += request_stream_sizes[i];
}
stream->WritesDone();
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
GPR_ASSERT(s.IsOk());
@@ -269,7 +269,7 @@ void DoResponseStreaming() {
}
StreamingOutputCallResponse response;
std::unique_ptr<grpc::ClientReader<StreamingOutputCallResponse>> stream(
- stub->StreamingOutputCall(&context, &request));
+ stub->StreamingOutputCall(&context, request));
unsigned int i = 0;
while (stream->Read(&response)) {
@@ -278,7 +278,7 @@ void DoResponseStreaming() {
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Response streaming done.");
@@ -299,7 +299,7 @@ void DoResponseStreamingWithSlowConsumer() {
}
StreamingOutputCallResponse response;
std::unique_ptr<grpc::ClientReader<StreamingOutputCallResponse>> stream(
- stub->StreamingOutputCall(&context, &request));
+ stub->StreamingOutputCall(&context, request));
int i = 0;
while (stream->Read(&response)) {
@@ -311,7 +311,7 @@ void DoResponseStreamingWithSlowConsumer() {
++i;
}
GPR_ASSERT(kNumResponseMessages == i);
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Response streaming done.");
@@ -345,7 +345,7 @@ void DoHalfDuplex() {
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
}
@@ -378,7 +378,7 @@ void DoPingPong() {
stream->WritesDone();
GPR_ASSERT(!stream->Read(&response));
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Ping pong streaming done.");
}
diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc
index 8a6be57929..a8399779b9 100644
--- a/test/cpp/interop/server.cc
+++ b/test/cpp/interop/server.cc
@@ -200,7 +200,7 @@ void RunServer() {
ServerBuilder builder;
builder.AddPort(server_address.str());
- builder.RegisterService(service.service());
+ builder.RegisterService(&service);
if (FLAGS_enable_ssl) {
SslServerCredentialsOptions ssl_opts = {
"", {{test_server1_key, test_server1_cert}}};
diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc
index 3a432b6fbb..718046170f 100644
--- a/test/cpp/qps/server.cc
+++ b/test/cpp/qps/server.cc
@@ -128,7 +128,7 @@ static void RunServer() {
ServerBuilder builder;
builder.AddPort(server_address);
- builder.RegisterService(service.service());
+ builder.RegisterService(&service);
std::unique_ptr<ThreadPool> pool(new ThreadPool(FLAGS_server_threads));
builder.SetThreadPool(pool.get());