aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-10-23 21:08:39 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-10-23 21:08:39 -0700
commit62280b42c72a46e23ecb12c4f665890de8c44004 (patch)
treeb4690e569b7322e8deb3fa58afc5f49db7a21d28
parent66cc56bb034e3864e4bdb0ad869b6de4e524f3d8 (diff)
Add client streaming, server streaming and bidi streaming tests
-rw-r--r--build.yaml2
-rw-r--r--include/grpcpp/impl/codegen/call.h33
-rw-r--r--include/grpcpp/impl/codegen/method_handler_impl.h4
-rw-r--r--include/grpcpp/impl/codegen/server_interceptor.h2
-rw-r--r--src/cpp/server/server_cc.cc1
-rw-r--r--test/cpp/end2end/BUILD15
-rw-r--r--test/cpp/end2end/client_interceptors_end2end_test.cc135
-rw-r--r--test/cpp/end2end/interceptors_util.h178
-rw-r--r--test/cpp/end2end/server_interceptors_end2end_test.cc73
-rw-r--r--tools/run_tests/generated/sources_and_headers.json7
10 files changed, 301 insertions, 149 deletions
diff --git a/build.yaml b/build.yaml
index 9e0db739bc..1ebc996f9f 100644
--- a/build.yaml
+++ b/build.yaml
@@ -4517,6 +4517,8 @@ targets:
cpu_cost: 0.5
build: test
language: c++
+ headers:
+ - test/cpp/end2end/interceptors_util.h
src:
- test/cpp/end2end/client_interceptors_end2end_test.cc
deps:
diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h
index 5a12ac539e..505055e7e6 100644
--- a/include/grpcpp/impl/codegen/call.h
+++ b/include/grpcpp/impl/codegen/call.h
@@ -943,6 +943,13 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods {
curr_iteration_ + 1));
}
+ // Clears all state
+ void ClearState() {
+ reverse_ = false;
+ ran_hijacking_interceptor_ = false;
+ ClearHookPoints();
+ }
+
// Prepares for Post_recv operations
void SetReverse() {
reverse_ = true;
@@ -1166,14 +1173,14 @@ class CallOpSet : public CallOpSetInterface,
}
void FillOps(Call* call) override {
- gpr_log(GPR_ERROR, "filling ops %p", this);
+ // gpr_log(GPR_ERROR, "filling ops %p", this);
done_intercepting_ = false;
g_core_codegen_interface->grpc_call_ref(call->call());
call_ =
*call; // It's fine to create a copy of call since it's just pointers
if (RunInterceptors()) {
- gpr_log(GPR_ERROR, "no interceptors on send path");
+ // gpr_log(GPR_ERROR, "no interceptors on send path");
ContinueFillOpsAfterInterception();
} else {
// After the interceptors are run, ContinueFillOpsAfterInterception will
@@ -1182,12 +1189,12 @@ class CallOpSet : public CallOpSetInterface,
}
bool FinalizeResult(void** tag, bool* status) override {
- gpr_log(GPR_ERROR, "finalizing result %p", this);
+ // gpr_log(GPR_ERROR, "finalizing result %p", this);
if (done_intercepting_) {
// We have already finished intercepting and filling in the results. This
// round trip from the core needed to be made because interceptors were
// run
- gpr_log(GPR_ERROR, "done intercepting");
+ // gpr_log(GPR_ERROR, "done intercepting");
*tag = return_tag_;
g_core_codegen_interface->grpc_call_unref(call_.call());
return true;
@@ -1199,15 +1206,14 @@ class CallOpSet : public CallOpSetInterface,
this->Op4::FinishOp(status);
this->Op5::FinishOp(status);
this->Op6::FinishOp(status);
- gpr_log(GPR_ERROR, "done finish ops");
-
+ // gpr_log(GPR_ERROR, "done finish ops");
if (RunInterceptorsPostRecv()) {
*tag = return_tag_;
g_core_codegen_interface->grpc_call_unref(call_.call());
- gpr_log(GPR_ERROR, "no interceptors");
+ // gpr_log(GPR_ERROR, "no interceptors");
return true;
}
- gpr_log(GPR_ERROR, "running interceptors");
+ // gpr_log(GPR_ERROR, "running interceptors");
// Interceptors are going to be run, so we can't return the tag just yet.
// After the interceptors are run, ContinueFinalizeResultAfterInterception
return false;
@@ -1245,7 +1251,8 @@ class CallOpSet : public CallOpSetInterface,
this->Op4::AddOp(ops, &nops);
this->Op5::AddOp(ops, &nops);
this->Op6::AddOp(ops, &nops);
- gpr_log(GPR_ERROR, "going to start call batch %p", this);
+ // gpr_log(GPR_ERROR, "going to start call batch %p with %lu ops", this,
+ // nops);
GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
g_core_codegen_interface->grpc_call_start_batch(
call_.call(), ops, nops, cq_tag(), nullptr));
@@ -1255,6 +1262,7 @@ class CallOpSet : public CallOpSetInterface,
// path
void ContinueFinalizeResultAfterInterception() override {
done_intercepting_ = true;
+ // gpr_log(GPR_ERROR, "going to start call batch %p for dummy tag", this);
GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
g_core_codegen_interface->grpc_call_start_batch(
call_.call(), nullptr, 0, cq_tag(), nullptr));
@@ -1263,18 +1271,21 @@ class CallOpSet : public CallOpSetInterface,
private:
// Returns true if no interceptors need to be run
bool RunInterceptors() {
+ interceptor_methods_.ClearState();
+ interceptor_methods_.SetCallOpSetInterface(this);
+ interceptor_methods_.SetCall(&call_);
this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
- interceptor_methods_.SetCallOpSetInterface(this);
- interceptor_methods_.SetCall(&call_);
return interceptor_methods_.RunInterceptors();
}
// Returns true if no interceptors need to be run
bool RunInterceptorsPostRecv() {
+ // Call and OpSet had already been set on the set state.
+ // SetReverse also clears previously set hook points
interceptor_methods_.SetReverse();
this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h
index 176e4ed31b..279dce53bc 100644
--- a/include/grpcpp/impl/codegen/method_handler_impl.h
+++ b/include/grpcpp/impl/codegen/method_handler_impl.h
@@ -121,6 +121,7 @@ class ClientStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
+ gpr_log(GPR_ERROR, "running client streaming handler");
ServerReader<RequestType> reader(param.call, param.server_context);
ResponseType rsp;
Status status = CatchingFunctionHandler([this, &param, &reader, &rsp] {
@@ -164,6 +165,7 @@ class ServerStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
void RunHandler(const HandlerParameter& param) final {
+ gpr_log(GPR_ERROR, "running server streaming handler");
Status status = param.status;
if (status.ok()) {
ServerWriter<ResponseType> writer(param.call, param.server_context);
@@ -225,6 +227,7 @@ class TemplatedBidiStreamingHandler : public MethodHandler {
: func_(func), write_needed_(WriteNeeded) {}
void RunHandler(const HandlerParameter& param) final {
+ gpr_log(GPR_ERROR, "running bidi streaming handler");
Streamer stream(param.call, param.server_context);
Status status = CatchingFunctionHandler([this, &param, &stream] {
return func_(param.server_context, &stream);
@@ -318,6 +321,7 @@ class ErrorMethodHandler : public MethodHandler {
}
void RunHandler(const HandlerParameter& param) final {
+ gpr_log(GPR_ERROR, "running error handler");
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
FillOps(param.server_context, &ops);
param.call->PerformOps(&ops);
diff --git a/include/grpcpp/impl/codegen/server_interceptor.h b/include/grpcpp/impl/codegen/server_interceptor.h
index 5c38ceaace..3f8cbcca8d 100644
--- a/include/grpcpp/impl/codegen/server_interceptor.h
+++ b/include/grpcpp/impl/codegen/server_interceptor.h
@@ -73,9 +73,7 @@ class ServerRpcInfo {
const std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>&
creators) {
- gpr_log(GPR_ERROR, "Registering interceptors");
for (const auto& creator : creators) {
- gpr_log(GPR_ERROR, "registering one");
interceptors_.push_back(std::unique_ptr<experimental::Interceptor>(
creator->CreateServerInterceptor(this)));
}
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 9f4ec3e4ab..93d234a0c4 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -275,6 +275,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
global_callbacks_->PreSynchronousRequest(&ctx_);
auto* handler = resources_ ? method_->handler()
: server_->resource_exhausted_handler_.get();
+ gpr_log(GPR_ERROR, "got method %s", method_->name());
handler->RunHandler(internal::MethodHandler::HandlerParameter(
&call_, &ctx_, request_, request_status_));
request_ = nullptr;
diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD
index d246f41ff8..235249e8bf 100644
--- a/test/cpp/end2end/BUILD
+++ b/test/cpp/end2end/BUILD
@@ -35,6 +35,19 @@ grpc_cc_library(
],
)
+grpc_cc_library(
+ name = "interceptors_util",
+ testonly = True,
+ hdrs = ["interceptors_util.h"],
+ external_deps = [
+ "gtest",
+ ],
+ deps = [
+ "//src/proto/grpc/testing:echo_proto",
+ "//test/cpp/util:test_util",
+ ],
+)
+
grpc_cc_test(
name = "async_end2end_test",
srcs = ["async_end2end_test.cc"],
@@ -124,6 +137,7 @@ grpc_cc_test(
"gtest",
],
deps = [
+ ":interceptors_util",
":test_service_impl",
"//:gpr",
"//:grpc",
@@ -495,6 +509,7 @@ grpc_cc_test(
"gtest",
],
deps = [
+ ":interceptors_util",
":test_service_impl",
"//:gpr",
"//:grpc",
diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc
index 3716582c58..ff012f6f48 100644
--- a/test/cpp/end2end/client_interceptors_end2end_test.cc
+++ b/test/cpp/end2end/client_interceptors_end2end_test.cc
@@ -32,6 +32,7 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/interceptors_util.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
#include "test/cpp/util/string_ref_helper.h"
@@ -42,28 +43,6 @@ namespace grpc {
namespace testing {
namespace {
-class EchoTestServiceStreamingImpl : public EchoTestService::Service {
- public:
- ~EchoTestServiceStreamingImpl() override {}
-
- Status BidiStream(
- ServerContext* context,
- grpc::ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
- EchoRequest req;
- EchoResponse resp;
- auto client_metadata = context->client_metadata();
- for (const auto& pair : client_metadata) {
- context->AddTrailingMetadata(ToString(pair.first), ToString(pair.second));
- }
-
- while (stream->Read(&req)) {
- resp.set_message(req.message());
- stream->Write(resp, grpc::WriteOptions());
- }
- return Status::OK;
- }
-};
-
class ClientInterceptorsStreamingEnd2endTest : public ::testing::Test {
protected:
ClientInterceptorsStreamingEnd2endTest() {
@@ -157,7 +136,6 @@ class HijackingInterceptor : public experimental::Interceptor {
}
virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
- // gpr_log(GPR_ERROR, "ran this");
bool hijack = false;
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
@@ -261,7 +239,6 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor {
}
virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
- // gpr_log(GPR_ERROR, "ran this");
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
auto* map = methods->GetSendInitialMetadata();
@@ -329,6 +306,7 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor {
}
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_RECV_INITIAL_METADATA)) {
+ gpr_log(GPR_ERROR, "hijacked");
auto* map = methods->GetRecvInitialMetadata();
// Got nothing better to do here at the moment
EXPECT_EQ(map->size(), static_cast<unsigned>(0));
@@ -345,7 +323,7 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor {
auto* map = methods->GetRecvTrailingMetadata();
// insert the metadata that we want
EXPECT_EQ(map->size(), static_cast<unsigned>(0));
- *map = ctx_.GetServerTrailingMetadata();
+ map->insert(std::make_pair("testkey", "testvalue"));
auto* status = methods->GetRecvStatus();
*status = Status(StatusCode::OK, "");
}
@@ -376,7 +354,6 @@ class LoggingInterceptor : public experimental::Interceptor {
LoggingInterceptor(experimental::ClientRpcInfo* info) { info_ = info; }
virtual void Intercept(experimental::InterceptorBatchMethods* methods) {
- // gpr_log(GPR_ERROR, "ran this");
if (methods->QueryInterceptionHookPoint(
experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
auto* map = methods->GetSendInitialMetadata();
@@ -440,63 +417,6 @@ class LoggingInterceptorFactory
}
};
-void MakeCall(const std::shared_ptr<Channel>& channel) {
- auto stub = grpc::testing::EchoTestService::NewStub(channel);
- ClientContext ctx;
- EchoRequest req;
- req.mutable_param()->set_echo_metadata(true);
- ctx.AddMetadata("testkey", "testvalue");
- req.set_message("Hello");
- EchoResponse resp;
- Status s = stub->Echo(&ctx, req, &resp);
- EXPECT_EQ(s.ok(), true);
- EXPECT_EQ(resp.message(), "Hello");
-}
-
-void MakeCallbackCall(const std::shared_ptr<Channel>& channel) {
- auto stub = grpc::testing::EchoTestService::NewStub(channel);
- ClientContext ctx;
- EchoRequest req;
- std::mutex mu;
- std::condition_variable cv;
- bool done = false;
- req.mutable_param()->set_echo_metadata(true);
- ctx.AddMetadata("testkey", "testvalue");
- req.set_message("Hello");
- EchoResponse resp;
- stub->experimental_async()->Echo(&ctx, &req, &resp,
- [&resp, &mu, &done, &cv](Status s) {
- // gpr_log(GPR_ERROR, "got the callback");
- EXPECT_EQ(s.ok(), true);
- EXPECT_EQ(resp.message(), "Hello");
- std::lock_guard<std::mutex> l(mu);
- done = true;
- cv.notify_one();
- });
- std::unique_lock<std::mutex> l(mu);
- while (!done) {
- cv.wait(l);
- }
-}
-
-void MakeStreamingCall(const std::shared_ptr<Channel>& channel) {
- auto stub = grpc::testing::EchoTestService::NewStub(channel);
- ClientContext ctx;
- EchoRequest req;
- EchoResponse resp;
- ctx.AddMetadata("testkey", "testvalue");
- auto stream = stub->BidiStream(&ctx);
- for (auto i = 0; i < 10; i++) {
- req.set_message("Hello" + std::to_string(i));
- stream->Write(req);
- stream->Read(&resp);
- EXPECT_EQ(req.message(), resp.message());
- }
- ASSERT_TRUE(stream->WritesDone());
- Status s = stream->Finish();
- EXPECT_EQ(s.ok(), true);
-}
-
TEST_F(ClientInterceptorsEnd2endTest, ClientInterceptorLoggingTest) {
ChannelArguments args;
DummyInterceptor::Reset();
@@ -582,9 +502,6 @@ TEST_F(ClientInterceptorsEnd2endTest,
creators->push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
}
- // auto channel = experimental::CreateCustomChannelWithInterceptors(
- // server_address_, InsecureChannelCredentials(), args,
- // std::move(creators));
auto channel = server_->experimental().InProcessChannelWithInterceptors(
args, std::move(creators));
@@ -616,7 +533,49 @@ TEST_F(ClientInterceptorsEnd2endTest,
EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
-TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientInterceptorLoggingTest) {
+TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientStreamingTest) {
+ ChannelArguments args;
+ DummyInterceptor::Reset();
+ auto creators = std::unique_ptr<std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>(
+ new std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>());
+ creators->push_back(std::unique_ptr<LoggingInterceptorFactory>(
+ new LoggingInterceptorFactory()));
+ // Add 20 dummy interceptors
+ for (auto i = 0; i < 20; i++) {
+ creators->push_back(std::unique_ptr<DummyInterceptorFactory>(
+ new DummyInterceptorFactory()));
+ }
+ auto channel = experimental::CreateCustomChannelWithInterceptors(
+ server_address_, InsecureChannelCredentials(), args, std::move(creators));
+ MakeClientStreamingCall(channel);
+ // Make sure all 20 dummy interceptors were run
+ EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
+}
+
+TEST_F(ClientInterceptorsStreamingEnd2endTest, ServerStreamingTest) {
+ ChannelArguments args;
+ DummyInterceptor::Reset();
+ auto creators = std::unique_ptr<std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>>(
+ new std::vector<
+ std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>());
+ creators->push_back(std::unique_ptr<LoggingInterceptorFactory>(
+ new LoggingInterceptorFactory()));
+ // Add 20 dummy interceptors
+ for (auto i = 0; i < 20; i++) {
+ creators->push_back(std::unique_ptr<DummyInterceptorFactory>(
+ new DummyInterceptorFactory()));
+ }
+ auto channel = experimental::CreateCustomChannelWithInterceptors(
+ server_address_, InsecureChannelCredentials(), args, std::move(creators));
+ MakeServerStreamingCall(channel);
+ // Make sure all 20 dummy interceptors were run
+ EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
+}
+
+TEST_F(ClientInterceptorsStreamingEnd2endTest, BidiStreamingTest) {
ChannelArguments args;
DummyInterceptor::Reset();
auto creators = std::unique_ptr<std::vector<
@@ -632,7 +591,7 @@ TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientInterceptorLoggingTest) {
}
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
- MakeStreamingCall(channel);
+ MakeBidiStreamingCall(channel);
// Make sure all 20 dummy interceptors were run
EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
diff --git a/test/cpp/end2end/interceptors_util.h b/test/cpp/end2end/interceptors_util.h
new file mode 100644
index 0000000000..bc6211517d
--- /dev/null
+++ b/test/cpp/end2end/interceptors_util.h
@@ -0,0 +1,178 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/proto/grpc/testing/echo.grpc.pb.h"
+#include "test/cpp/util/string_ref_helper.h"
+
+#include <gtest/gtest.h>
+
+namespace grpc {
+namespace testing {
+class EchoTestServiceStreamingImpl : public EchoTestService::Service {
+ public:
+ ~EchoTestServiceStreamingImpl() override {}
+
+ Status BidiStream(
+ ServerContext* context,
+ grpc::ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
+ EchoRequest req;
+ EchoResponse resp;
+ auto client_metadata = context->client_metadata();
+ for (const auto& pair : client_metadata) {
+ context->AddTrailingMetadata(ToString(pair.first), ToString(pair.second));
+ }
+
+ while (stream->Read(&req)) {
+ resp.set_message(req.message());
+ EXPECT_TRUE(stream->Write(resp, grpc::WriteOptions()));
+ }
+ return Status::OK;
+ }
+
+ Status RequestStream(ServerContext* context,
+ ServerReader<EchoRequest>* reader,
+ EchoResponse* resp) override {
+ auto client_metadata = context->client_metadata();
+ for (const auto& pair : client_metadata) {
+ context->AddTrailingMetadata(ToString(pair.first), ToString(pair.second));
+ }
+
+ EchoRequest req;
+ string response_str = "";
+ while (reader->Read(&req)) {
+ response_str += req.message();
+ }
+ resp->set_message(response_str);
+ return Status::OK;
+ }
+
+ Status ResponseStream(ServerContext* context, const EchoRequest* req,
+ ServerWriter<EchoResponse>* writer) override {
+ auto client_metadata = context->client_metadata();
+ for (const auto& pair : client_metadata) {
+ context->AddTrailingMetadata(ToString(pair.first), ToString(pair.second));
+ }
+
+ EchoResponse resp;
+ resp.set_message(req->message());
+ for (int i = 0; i < 10; i++) {
+ EXPECT_TRUE(writer->Write(resp));
+ }
+ return Status::OK;
+ }
+};
+
+void MakeCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ req.mutable_param()->set_echo_metadata(true);
+ ctx.AddMetadata("testkey", "testvalue");
+ req.set_message("Hello");
+ EchoResponse resp;
+ Status s = stub->Echo(&ctx, req, &resp);
+ EXPECT_EQ(s.ok(), true);
+ EXPECT_EQ(resp.message(), "Hello");
+}
+
+void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ req.mutable_param()->set_echo_metadata(true);
+ ctx.AddMetadata("testkey", "testvalue");
+ req.set_message("Hello");
+ EchoResponse resp;
+ string expected_resp = "";
+ auto writer = stub->RequestStream(&ctx, &resp);
+ for (int i = 0; i < 10; i++) {
+ writer->Write(req);
+ expected_resp += "Hello";
+ }
+ writer->WritesDone();
+ Status s = writer->Finish();
+ EXPECT_EQ(s.ok(), true);
+ EXPECT_EQ(resp.message(), expected_resp);
+}
+
+void MakeServerStreamingCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ req.mutable_param()->set_echo_metadata(true);
+ ctx.AddMetadata("testkey", "testvalue");
+ req.set_message("Hello");
+ EchoResponse resp;
+ string expected_resp = "";
+ auto reader = stub->ResponseStream(&ctx, req);
+ int count = 0;
+ while (reader->Read(&resp)) {
+ EXPECT_EQ(resp.message(), "Hello");
+ count++;
+ }
+ ASSERT_EQ(count, 10);
+ Status s = reader->Finish();
+ EXPECT_EQ(s.ok(), true);
+}
+
+void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ EchoResponse resp;
+ ctx.AddMetadata("testkey", "testvalue");
+ auto stream = stub->BidiStream(&ctx);
+ for (auto i = 0; i < 10; i++) {
+ req.set_message("Hello" + std::to_string(i));
+ stream->Write(req);
+ stream->Read(&resp);
+ EXPECT_EQ(req.message(), resp.message());
+ }
+ ASSERT_TRUE(stream->WritesDone());
+ Status s = stream->Finish();
+ EXPECT_EQ(s.ok(), true);
+}
+
+void MakeCallbackCall(const std::shared_ptr<Channel>& channel) {
+ auto stub = grpc::testing::EchoTestService::NewStub(channel);
+ ClientContext ctx;
+ EchoRequest req;
+ std::mutex mu;
+ std::condition_variable cv;
+ bool done = false;
+ req.mutable_param()->set_echo_metadata(true);
+ ctx.AddMetadata("testkey", "testvalue");
+ req.set_message("Hello");
+ EchoResponse resp;
+ stub->experimental_async()->Echo(&ctx, &req, &resp,
+ [&resp, &mu, &done, &cv](Status s) {
+ // gpr_log(GPR_ERROR, "got the callback");
+ EXPECT_EQ(s.ok(), true);
+ EXPECT_EQ(resp.message(), "Hello");
+ std::lock_guard<std::mutex> l(mu);
+ done = true;
+ cv.notify_one();
+ });
+ std::unique_lock<std::mutex> l(mu);
+ while (!done) {
+ cv.wait(l);
+ }
+}
+
+} // namespace testing
+} // namespace grpc \ No newline at end of file
diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc
index f59fc63f06..57b85a479e 100644
--- a/test/cpp/end2end/server_interceptors_end2end_test.cc
+++ b/test/cpp/end2end/server_interceptors_end2end_test.cc
@@ -32,9 +32,9 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
+#include "test/cpp/end2end/interceptors_util.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
-#include "test/cpp/util/string_ref_helper.h"
#include <gtest/gtest.h>
@@ -42,28 +42,6 @@ namespace grpc {
namespace testing {
namespace {
-class EchoTestServiceStreamingImpl : public EchoTestService::Service {
- public:
- ~EchoTestServiceStreamingImpl() override {}
-
- Status BidiStream(
- ServerContext* context,
- grpc::ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
- EchoRequest req;
- EchoResponse resp;
- auto client_metadata = context->client_metadata();
- for (const auto& pair : client_metadata) {
- context->AddTrailingMetadata(ToString(pair.first), ToString(pair.second));
- }
-
- while (stream->Read(&req)) {
- resp.set_message(req.message());
- stream->Write(resp, grpc::WriteOptions());
- }
- return Status::OK;
- }
-};
-
/* This interceptor does nothing. Just keeps a global count on the number of
* times it was invoked. */
class DummyInterceptor : public experimental::Interceptor {
@@ -181,20 +159,7 @@ class LoggingInterceptorFactory
}
};
-void MakeCall(const std::shared_ptr<Channel>& channel) {
- auto stub = grpc::testing::EchoTestService::NewStub(channel);
- ClientContext ctx;
- EchoRequest req;
- req.mutable_param()->set_echo_metadata(true);
- ctx.AddMetadata("testkey", "testvalue");
- req.set_message("Hello");
- EchoResponse resp;
- Status s = stub->Echo(&ctx, req, &resp);
- EXPECT_EQ(s.ok(), true);
- EXPECT_EQ(resp.message(), "Hello");
-}
-
-/*void MakeStreamingCall(const std::shared_ptr<Channel>& channel) {
+void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
auto stub = grpc::testing::EchoTestService::NewStub(channel);
ClientContext ctx;
EchoRequest req;
@@ -210,7 +175,7 @@ void MakeCall(const std::shared_ptr<Channel>& channel) {
ASSERT_TRUE(stream->WritesDone());
Status s = stream->Finish();
EXPECT_EQ(s.ok(), true);
-}*/
+}
class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test {
protected:
@@ -240,7 +205,7 @@ class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test {
std::unique_ptr<Server> server_;
};
-TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, ServerInterceptorTest) {
+TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, UnaryTest) {
ChannelArguments args;
DummyInterceptor::Reset();
auto channel = CreateChannel(server_address_, InsecureChannelCredentials());
@@ -249,10 +214,9 @@ TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, ServerInterceptorTest) {
EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
-class ServerInterceptorsEnd2endSyncClientStreamingTest
- : public ::testing::Test {
+class ServerInterceptorsEnd2endSyncStreamingTest : public ::testing::Test {
protected:
- ServerInterceptorsEnd2endSyncClientStreamingTest() {
+ ServerInterceptorsEnd2endSyncStreamingTest() {
int port = grpc_pick_unused_port_or_die();
ServerBuilder builder;
@@ -274,16 +238,33 @@ class ServerInterceptorsEnd2endSyncClientStreamingTest
server_ = builder.BuildAndStart();
}
std::string server_address_;
- TestServiceImpl service_;
+ EchoTestServiceStreamingImpl service_;
std::unique_ptr<Server> server_;
};
-TEST_F(ServerInterceptorsEnd2endSyncClientStreamingTest,
- ServerInterceptorTest) {
+TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ClientStreamingTest) {
ChannelArguments args;
DummyInterceptor::Reset();
auto channel = CreateChannel(server_address_, InsecureChannelCredentials());
- MakeCall(channel);
+ MakeClientStreamingCall(channel);
+ // Make sure all 20 dummy interceptors were run
+ EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
+}
+
+TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ServerStreamingTest) {
+ ChannelArguments args;
+ DummyInterceptor::Reset();
+ auto channel = CreateChannel(server_address_, InsecureChannelCredentials());
+ MakeServerStreamingCall(channel);
+ // Make sure all 20 dummy interceptors were run
+ EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
+}
+
+TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, BidiStreamingTest) {
+ ChannelArguments args;
+ DummyInterceptor::Reset();
+ auto channel = CreateChannel(server_address_, InsecureChannelCredentials());
+ MakeBidiStreamingCall(channel);
// Make sure all 20 dummy interceptors were run
EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 32441038b8..a2240d28ce 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -3394,12 +3394,15 @@
"grpc++_test_util",
"grpc_test_util"
],
- "headers": [],
+ "headers": [
+ "test/cpp/end2end/interceptors_util.h"
+ ],
"is_filegroup": false,
"language": "c++",
"name": "client_interceptors_end2end_test",
"src": [
- "test/cpp/end2end/client_interceptors_end2end_test.cc"
+ "test/cpp/end2end/client_interceptors_end2end_test.cc",
+ "test/cpp/end2end/interceptors_util.h"
],
"third_party": false,
"type": "target"