diff options
author | 2018-10-23 13:56:36 -0700 | |
---|---|---|
committer | 2018-10-23 13:56:36 -0700 | |
commit | 4da91c1156488778d989db1c623876bc211b140e (patch) | |
tree | 1eca8e0cd9fddb750ccbcc9967a9869aeb405214 | |
parent | 0c7250c7b40608bc476f3e40eeafa616cb50df03 (diff) |
First test for server interception
-rw-r--r-- | include/grpcpp/impl/codegen/call.h | 38 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/method_handler_impl.h | 3 | ||||
-rw-r--r-- | include/grpcpp/impl/codegen/server_interceptor.h | 2 | ||||
-rw-r--r-- | test/cpp/end2end/BUILD | 19 | ||||
-rw-r--r-- | test/cpp/end2end/client_interceptors_end2end_test.cc | 9 | ||||
-rw-r--r-- | test/cpp/end2end/server_interceptors_end2end_test.cc | 232 |
6 files changed, 280 insertions, 23 deletions
diff --git a/include/grpcpp/impl/codegen/call.h b/include/grpcpp/impl/codegen/call.h index 1fc8481bc6..5a12ac539e 100644 --- a/include/grpcpp/impl/codegen/call.h +++ b/include/grpcpp/impl/codegen/call.h @@ -963,12 +963,13 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { // them is invoked if there were no interceptors registered. bool RunInterceptors() { auto* client_rpc_info = call_->client_rpc_info(); - if (client_rpc_info == nullptr || - client_rpc_info->interceptors_.size() == 0) { - return true; - } else { - RunClientInterceptors(); - return false; + if (client_rpc_info != nullptr) { + if (client_rpc_info->interceptors_.size() == 0) { + return true; + } else { + RunClientInterceptors(); + return false; + } } auto* server_rpc_info = call_->server_rpc_info(); @@ -1070,6 +1071,8 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { curr_iteration_++; if (curr_iteration_ < static_cast<long>(rpc_info->interceptors_.size())) { return rpc_info->RunInterceptor(this, curr_iteration_); + } else if (ops_) { + return ops_->ContinueFillOpsAfterInterception(); } } else { curr_iteration_--; @@ -1077,12 +1080,10 @@ class InterceptorBatchMethodsImpl : public InternalInterceptorBatchMethods { if (curr_iteration_ >= 0) { // Continue running interceptors return rpc_info->RunInterceptor(this, curr_iteration_); + } else if (ops_) { + return ops_->ContinueFinalizeResultAfterInterception(); } } - // we are done running all the interceptors - if (ops_) { - ops_->ContinueFinalizeResultAfterInterception(); - } GPR_CODEGEN_ASSERT(callback_); callback_(); } @@ -1165,13 +1166,14 @@ class CallOpSet : public CallOpSetInterface, } void FillOps(Call* call) override { - // gpr_log(GPR_ERROR, "filling ops %p", this); + gpr_log(GPR_ERROR, "filling ops %p", this); done_intercepting_ = false; g_core_codegen_interface->grpc_call_ref(call->call()); call_ = *call; // It's fine to create a copy of call since it's just pointers if (RunInterceptors()) { + gpr_log(GPR_ERROR, "no interceptors on send path"); ContinueFillOpsAfterInterception(); } else { // After the interceptors are run, ContinueFillOpsAfterInterception will @@ -1180,12 +1182,12 @@ class CallOpSet : public CallOpSetInterface, } bool FinalizeResult(void** tag, bool* status) override { - // gpr_log(GPR_ERROR, "finalizing result %p", this); + gpr_log(GPR_ERROR, "finalizing result %p", this); if (done_intercepting_) { // We have already finished intercepting and filling in the results. This // round trip from the core needed to be made because interceptors were // run - // gpr_log(GPR_ERROR, "done intercepting"); + gpr_log(GPR_ERROR, "done intercepting"); *tag = return_tag_; g_core_codegen_interface->grpc_call_unref(call_.call()); return true; @@ -1197,15 +1199,15 @@ class CallOpSet : public CallOpSetInterface, this->Op4::FinishOp(status); this->Op5::FinishOp(status); this->Op6::FinishOp(status); - // gpr_log(GPR_ERROR, "done finish ops"); + gpr_log(GPR_ERROR, "done finish ops"); if (RunInterceptorsPostRecv()) { *tag = return_tag_; g_core_codegen_interface->grpc_call_unref(call_.call()); - // gpr_log(GPR_ERROR, "no interceptors"); + gpr_log(GPR_ERROR, "no interceptors"); return true; } - // gpr_log(GPR_ERROR, "running interceptors"); + gpr_log(GPR_ERROR, "running interceptors"); // Interceptors are going to be run, so we can't return the tag just yet. // After the interceptors are run, ContinueFinalizeResultAfterInterception return false; @@ -1243,7 +1245,7 @@ class CallOpSet : public CallOpSetInterface, this->Op4::AddOp(ops, &nops); this->Op5::AddOp(ops, &nops); this->Op6::AddOp(ops, &nops); - // gpr_log(GPR_ERROR, "going to start call batch %p", this); + gpr_log(GPR_ERROR, "going to start call batch %p", this); GPR_CODEGEN_ASSERT(GRPC_CALL_OK == g_core_codegen_interface->grpc_call_start_batch( call_.call(), ops, nops, cq_tag(), nullptr)); @@ -1269,8 +1271,6 @@ class CallOpSet : public CallOpSetInterface, this->Op6::SetInterceptionHookPoint(&interceptor_methods_); interceptor_methods_.SetCallOpSetInterface(this); interceptor_methods_.SetCall(&call_); - // interceptor_methods_.SetFunctions(ContinueFillOpsAfterInterception, - // SetHijackingState, ContinueFinalizeResultAfterInterception); return interceptor_methods_.RunInterceptors(); } // Returns true if no interceptors need to be run diff --git a/include/grpcpp/impl/codegen/method_handler_impl.h b/include/grpcpp/impl/codegen/method_handler_impl.h index 4f02e3e39b..f1f5b43031 100644 --- a/include/grpcpp/impl/codegen/method_handler_impl.h +++ b/include/grpcpp/impl/codegen/method_handler_impl.h @@ -59,6 +59,7 @@ class RpcMethodHandler : public MethodHandler { : func_(func), service_(service) {} void RunHandler(const HandlerParameter& param) final { + gpr_log(GPR_ERROR, "running handler"); ResponseType rsp; Status status = param.status; if (status.ok()) { @@ -83,7 +84,7 @@ class RpcMethodHandler : public MethodHandler { } ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); + GPR_CODEGEN_ASSERT(param.call->cq()->Pluck(&ops)); } void* Deserialize(grpc_byte_buffer* req, Status* status) final { diff --git a/include/grpcpp/impl/codegen/server_interceptor.h b/include/grpcpp/impl/codegen/server_interceptor.h index 3f8cbcca8d..5c38ceaace 100644 --- a/include/grpcpp/impl/codegen/server_interceptor.h +++ b/include/grpcpp/impl/codegen/server_interceptor.h @@ -73,7 +73,9 @@ class ServerRpcInfo { const std::vector< std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>& creators) { + gpr_log(GPR_ERROR, "Registering interceptors"); for (const auto& creator : creators) { + gpr_log(GPR_ERROR, "registering one"); interceptors_.push_back(std::unique_ptr<experimental::Interceptor>( creator->CreateServerInterceptor(this))); } diff --git a/test/cpp/end2end/BUILD b/test/cpp/end2end/BUILD index 019ec43f96..d246f41ff8 100644 --- a/test/cpp/end2end/BUILD +++ b/test/cpp/end2end/BUILD @@ -489,6 +489,25 @@ grpc_cc_binary( ) grpc_cc_test( + name = "server_interceptors_end2end_test", + srcs = ["server_interceptors_end2end_test.cc"], + external_deps = [ + "gtest", + ], + deps = [ + ":test_service_impl", + "//:gpr", + "//:grpc", + "//:grpc++", + "//src/proto/grpc/testing:echo_messages_proto", + "//src/proto/grpc/testing:echo_proto", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + "//test/cpp/util:test_util", + ], +) + +grpc_cc_test( name = "server_load_reporting_end2end_test", srcs = ["server_load_reporting_end2end_test.cc"], external_deps = [ diff --git a/test/cpp/end2end/client_interceptors_end2end_test.cc b/test/cpp/end2end/client_interceptors_end2end_test.cc index 5640158bee..3716582c58 100644 --- a/test/cpp/end2end/client_interceptors_end2end_test.cc +++ b/test/cpp/end2end/client_interceptors_end2end_test.cc @@ -64,9 +64,9 @@ class EchoTestServiceStreamingImpl : public EchoTestService::Service { } }; -class ClientInterceptorsStreamingEnd2EndTest : public ::testing::Test { +class ClientInterceptorsStreamingEnd2endTest : public ::testing::Test { protected: - ClientInterceptorsStreamingEnd2EndTest() { + ClientInterceptorsStreamingEnd2endTest() { int port = grpc_pick_unused_port_or_die(); ServerBuilder builder; @@ -75,6 +75,9 @@ class ClientInterceptorsStreamingEnd2EndTest : public ::testing::Test { builder.RegisterService(&service_); server_ = builder.BuildAndStart(); } + + ~ClientInterceptorsStreamingEnd2endTest() { server_->Shutdown(); } + std::string server_address_; EchoTestServiceStreamingImpl service_; std::unique_ptr<Server> server_; @@ -613,7 +616,7 @@ TEST_F(ClientInterceptorsEnd2endTest, EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); } -TEST_F(ClientInterceptorsStreamingEnd2EndTest, ClientInterceptorLoggingTest) { +TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientInterceptorLoggingTest) { ChannelArguments args; DummyInterceptor::Reset(); auto creators = std::unique_ptr<std::vector< diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc new file mode 100644 index 0000000000..ef1df53189 --- /dev/null +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -0,0 +1,232 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <memory> +#include <vector> + +#include <grpcpp/channel.h> +#include <grpcpp/client_context.h> +#include <grpcpp/create_channel.h> +#include <grpcpp/generic/generic_stub.h> +#include <grpcpp/impl/codegen/proto_utils.h> +#include <grpcpp/impl/codegen/server_interceptor.h> +#include <grpcpp/server.h> +#include <grpcpp/server_builder.h> +#include <grpcpp/server_context.h> + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" +#include "test/cpp/util/byte_buffer_proto_helper.h" +#include "test/cpp/util/string_ref_helper.h" + +#include <gtest/gtest.h> + +namespace grpc { +namespace testing { +namespace { + +/* This interceptor does nothing. Just keeps a global count on the number of + * times it was invoked. */ +class DummyInterceptor : public experimental::Interceptor { + public: + DummyInterceptor(experimental::ServerRpcInfo* info) {} + + virtual void Intercept(experimental::InterceptorBatchMethods* methods) { + gpr_log(GPR_ERROR, "running dummy"); + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { + num_times_run_++; + } else if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints:: + POST_RECV_INITIAL_METADATA)) { + num_times_run_reverse_++; + } + methods->Proceed(); + } + + static void Reset() { + num_times_run_.store(0); + num_times_run_reverse_.store(0); + } + + static int GetNumTimesRun() { + EXPECT_EQ(num_times_run_.load(), num_times_run_reverse_.load()); + return num_times_run_.load(); + } + + private: + static std::atomic<int> num_times_run_; + static std::atomic<int> num_times_run_reverse_; +}; + +std::atomic<int> DummyInterceptor::num_times_run_; +std::atomic<int> DummyInterceptor::num_times_run_reverse_; + +class DummyInterceptorFactory + : public experimental::ServerInterceptorFactoryInterface { + public: + virtual experimental::Interceptor* CreateServerInterceptor( + experimental::ServerRpcInfo* info) override { + gpr_log(GPR_ERROR, "created dummy"); + return new DummyInterceptor(info); + } +}; + +class LoggingInterceptor : public experimental::Interceptor { + public: + LoggingInterceptor(experimental::ServerRpcInfo* info) { info_ = info; } + + virtual void Intercept(experimental::InterceptorBatchMethods* methods) { + // gpr_log(GPR_ERROR, "ran this"); + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { + auto* map = methods->GetSendInitialMetadata(); + // Check that we can see the test metadata + ASSERT_EQ(map->size(), static_cast<unsigned>(1)); + auto iterator = map->begin(); + EXPECT_EQ("testkey", iterator->first); + EXPECT_EQ("testvalue", iterator->second); + } + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { + EchoRequest req; + auto* buffer = methods->GetSendMessage(); + auto copied_buffer = *buffer; + SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req); + EXPECT_TRUE(req.message().find("Hello") == 0); + } + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) { + // Got nothing to do here for now + } + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) { + auto* map = methods->GetRecvInitialMetadata(); + // Got nothing better to do here for now + EXPECT_EQ(map->size(), static_cast<unsigned>(0)); + } + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) { + EchoResponse* resp = + static_cast<EchoResponse*>(methods->GetRecvMessage()); + EXPECT_TRUE(resp->message().find("Hello") == 0); + } + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::POST_RECV_STATUS)) { + auto* map = methods->GetRecvTrailingMetadata(); + bool found = false; + // Check that we received the metadata as an echo + for (const auto& pair : *map) { + found = pair.first.starts_with("testkey") && + pair.second.starts_with("testvalue"); + if (found) break; + } + EXPECT_EQ(found, true); + auto* status = methods->GetRecvStatus(); + EXPECT_EQ(status->ok(), true); + } + methods->Proceed(); + } + + private: + experimental::ServerRpcInfo* info_; +}; + +class LoggingInterceptorFactory + : public experimental::ServerInterceptorFactoryInterface { + public: + virtual experimental::Interceptor* CreateServerInterceptor( + experimental::ServerRpcInfo* info) override { + return new LoggingInterceptor(info); + } +}; + +class ServerInterceptorsEnd2endTest : public ::testing::Test { + protected: + ServerInterceptorsEnd2endTest() { + int port = grpc_pick_unused_port_or_die(); + + ServerBuilder builder; + server_address_ = "localhost:" + std::to_string(port); + builder.AddListeningPort(server_address_, InsecureServerCredentials()); + builder.RegisterService(&service_); + + std::vector< + std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> + creators; + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( + new DummyInterceptorFactory())); + builder.experimental().SetInterceptorCreators(std::move(creators)); + server_ = builder.BuildAndStart(); + } + std::string server_address_; + TestServiceImpl service_; + std::unique_ptr<Server> server_; +}; + +void MakeCall(const std::shared_ptr<Channel>& channel) { + auto stub = grpc::testing::EchoTestService::NewStub(channel); + ClientContext ctx; + EchoRequest req; + req.mutable_param()->set_echo_metadata(true); + ctx.AddMetadata("testkey", "testvalue"); + req.set_message("Hello"); + EchoResponse resp; + Status s = stub->Echo(&ctx, req, &resp); + EXPECT_EQ(s.ok(), true); + EXPECT_EQ(resp.message(), "Hello"); +} + +/*void MakeStreamingCall(const std::shared_ptr<Channel>& channel) { + auto stub = grpc::testing::EchoTestService::NewStub(channel); + ClientContext ctx; + EchoRequest req; + EchoResponse resp; + ctx.AddMetadata("testkey", "testvalue"); + auto stream = stub->BidiStream(&ctx); + for (auto i = 0; i < 10; i++) { + req.set_message("Hello" + std::to_string(i)); + stream->Write(req); + stream->Read(&resp); + EXPECT_EQ(req.message(), resp.message()); + } + ASSERT_TRUE(stream->WritesDone()); + Status s = stream->Finish(); + EXPECT_EQ(s.ok(), true); +}*/ + +TEST_F(ServerInterceptorsEnd2endTest, ServerInterceptorDummyTest) { + ChannelArguments args; + DummyInterceptor::Reset(); + auto channel = CreateChannel(server_address_, InsecureChannelCredentials()); + MakeCall(channel); + // Make sure all 20 dummy interceptors were run + EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 1); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} |