diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-10-25 21:43:49 -0700 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2018-10-25 21:43:49 -0700 |
commit | 312feb4202b6b5b1b5049102cb8b4b4ebac44551 (patch) | |
tree | cfa17ae44bf77de906ebb0eae8962d35993cbb1b | |
parent | 3896dabb85bd4f7314fbe9f850c4af4294157019 (diff) |
Adding generic rpc and unimplemented rpc test for server interceptors
-rw-r--r-- | include/grpcpp/impl/codegen/server_interface.h | 6 | ||||
-rw-r--r-- | src/cpp/server/server_cc.cc | 17 | ||||
-rw-r--r-- | test/cpp/end2end/server_interceptors_end2end_test.cc | 145 |
3 files changed, 152 insertions, 16 deletions
diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h index 532fff7a54..dab08057cd 100644 --- a/include/grpcpp/impl/codegen/server_interface.h +++ b/include/grpcpp/impl/codegen/server_interface.h @@ -184,7 +184,6 @@ class ServerInterface : public internal::CallHook { const char* name); virtual bool FinalizeResult(void** tag, bool* status) override { - gpr_log(GPR_ERROR, "finalize registeredasyncrequest"); /* If we are done intercepting, then there is nothing more for us to do */ if (done_intercepting_) { return BaseAsyncRequest::FinalizeResult(tag, status); @@ -238,7 +237,6 @@ class ServerInterface : public internal::CallHook { notification_cq_(notification_cq), tag_(tag), request_(request) { - gpr_log(GPR_ERROR, "new payload request"); IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(), notification_cq); } @@ -248,7 +246,6 @@ class ServerInterface : public internal::CallHook { } bool FinalizeResult(void** tag, bool* status) override { - gpr_log(GPR_ERROR, "finalize PayloadAsyncRequest"); /* If we are done intercepting, then there is nothing more for us to do */ if (done_intercepting_) { return RegisteredAsyncRequest::FinalizeResult(tag, status); @@ -313,7 +310,6 @@ class ServerInterface : public internal::CallHook { ServerCompletionQueue* notification_cq, void* tag, Message* message) { GPR_CODEGEN_ASSERT(method); - gpr_log(GPR_ERROR, "request async method with payload"); new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq, notification_cq, tag, message); } @@ -324,7 +320,6 @@ class ServerInterface : public internal::CallHook { CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { GPR_CODEGEN_ASSERT(method); - gpr_log(GPR_ERROR, "request async method with no payload"); new NoPayloadAsyncRequest(method, this, context, stream, call_cq, notification_cq, tag); } @@ -334,7 +329,6 @@ class ServerInterface : public internal::CallHook { CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - gpr_log(GPR_ERROR, "request async generic call"); new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, tag, true); } diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 2f5493dbf8..6da4b32406 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -132,10 +132,13 @@ class Server::UnimplementedAsyncResponse final ~UnimplementedAsyncResponse() { delete request_; } bool FinalizeResult(void** tag, bool* status) override { - internal::CallOpSet< - internal::CallOpSendInitialMetadata, - internal::CallOpServerSendStatus>::FinalizeResult(tag, status); - delete this; + if (internal::CallOpSet< + internal::CallOpSendInitialMetadata, + internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) { + delete this; + } else { + // The tag was swallowed due to interception. We will see it again. + } return false; } @@ -755,7 +758,6 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest( /* Set up interception state partially for the receive ops. call_wrapper_ is * not filled at this point, but it will be filled before the interceptors are * run. */ - gpr_log(GPR_ERROR, "Created base async request"); interceptor_methods_.SetCall(&call_wrapper_); interceptor_methods_.SetReverse(); call_cq_->RegisterAvalanching(); // This op will trigger more ops @@ -767,9 +769,7 @@ ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() { bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { - gpr_log(GPR_ERROR, "in finalize result"); if (done_intercepting_) { - gpr_log(GPR_ERROR, "done running interceptors"); *tag = tag_; if (delete_on_finalize_) { delete this; @@ -788,7 +788,6 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, stream_->BindCall(&call_wrapper_); if (*status && call_ && call_wrapper_.server_rpc_info()) { - gpr_log(GPR_ERROR, "here"); done_intercepting_ = true; // Set interception point for RECV INITIAL METADATA interceptor_methods_.AddInterceptionHookPoint( @@ -803,7 +802,6 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, // There were interceptors to be run, so // ContinueFinalizeResultAfterInterception will be run when interceptors // are done. - gpr_log(GPR_ERROR, "don't return this tag"); return false; } } @@ -819,7 +817,6 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, void ServerInterface::BaseAsyncRequest:: ContinueFinalizeResultAfterInterception() { - gpr_log(GPR_ERROR, "continue finalize result"); context_->BeginCompletionOp(&call_wrapper_); // Queue a tag which will be returned immediately grpc_core::ExecCtx exec_ctx; diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index 956aec9359..49e4f13670 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -425,6 +425,151 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, BidiStreamingTest) { grpc_recycle_unused_port(port); } +TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) { + DummyInterceptor::Reset(); + int port = grpc_pick_unused_port_or_die(); + string server_address = "localhost:" + std::to_string(port); + ServerBuilder builder; + AsyncGenericService service; + builder.AddListeningPort(server_address, InsecureServerCredentials()); + builder.RegisterAsyncGenericService(&service); + std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> + creators; + for (auto i = 0; i < 20; i++) { + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( + new DummyInterceptorFactory())); + } + builder.experimental().SetInterceptorCreators(std::move(creators)); + auto cq = builder.AddCompletionQueue(); + auto server = builder.BuildAndStart(); + + ChannelArguments args; + auto channel = CreateChannel(server_address, InsecureChannelCredentials()); + GenericStub generic_stub(channel); + + const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo"); + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + GenericServerContext srv_ctx; + GenericServerAsyncReaderWriter stream(&srv_ctx); + + // The string needs to be long enough to test heap-based slice. + send_request.set_message("Hello"); + cli_ctx.AddMetadata("testkey", "testvalue"); + + std::unique_ptr<GenericClientAsyncReaderWriter> call = + generic_stub.PrepareCall(&cli_ctx, kMethodName, cq.get()); + call->StartCall(tag(1)); + Verifier().Expect(1, true).Verify(cq.get()); + std::unique_ptr<ByteBuffer> send_buffer = + SerializeToByteBuffer(&send_request); + call->Write(*send_buffer, tag(2)); + // Send ByteBuffer can be destroyed after calling Write. + send_buffer.reset(); + Verifier().Expect(2, true).Verify(cq.get()); + call->WritesDone(tag(3)); + Verifier().Expect(3, true).Verify(cq.get()); + + service.RequestCall(&srv_ctx, &stream, cq.get(), cq.get(), tag(4)); + + Verifier().Expect(4, true).Verify(cq.get()); + EXPECT_EQ(kMethodName, srv_ctx.method()); + EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue")); + srv_ctx.AddTrailingMetadata("testkey", "testvalue"); + + ByteBuffer recv_buffer; + stream.Read(&recv_buffer, tag(5)); + Verifier().Expect(5, true).Verify(cq.get()); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request)); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + send_buffer = SerializeToByteBuffer(&send_response); + stream.Write(*send_buffer, tag(6)); + send_buffer.reset(); + Verifier().Expect(6, true).Verify(cq.get()); + + stream.Finish(Status::OK, tag(7)); + Verifier().Expect(7, true).Verify(cq.get()); + + recv_buffer.Clear(); + call->Read(&recv_buffer, tag(8)); + Verifier().Expect(8, true).Verify(cq.get()); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response)); + + call->Finish(&recv_status, tag(9)); + Verifier().Expect(9, true).Verify(cq.get()); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey", + "testvalue")); + + // Make sure all 20 dummy interceptors were run + EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); + + server->Shutdown(); + cq->Shutdown(); + void* ignored_tag; + bool ignored_ok; + while (cq->Next(&ignored_tag, &ignored_ok)) + ; + grpc_recycle_unused_port(port); +} + +TEST_F(ServerInterceptorsAsyncEnd2endTest, UnimplementedRpcTest) { + DummyInterceptor::Reset(); + int port = grpc_pick_unused_port_or_die(); + string server_address = "localhost:" + std::to_string(port); + ServerBuilder builder; + builder.AddListeningPort(server_address, InsecureServerCredentials()); + std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> + creators; + for (auto i = 0; i < 20; i++) { + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( + new DummyInterceptorFactory())); + } + builder.experimental().SetInterceptorCreators(std::move(creators)); + auto cq = builder.AddCompletionQueue(); + auto server = builder.BuildAndStart(); + + ChannelArguments args; + std::shared_ptr<Channel> channel = + CreateChannel(server_address, InsecureChannelCredentials()); + std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub; + stub = grpc::testing::UnimplementedEchoService::NewStub(channel); + EchoRequest send_request; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + send_request.set_message("Hello"); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub->AsyncUnimplemented(&cli_ctx, send_request, cq.get())); + + response_reader->Finish(&recv_response, &recv_status, tag(4)); + Verifier().Expect(4, true).Verify(cq.get()); + + EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code()); + EXPECT_EQ("", recv_status.error_message()); + + // Make sure all 20 dummy interceptors were run + // EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20); + + server->Shutdown(); + cq->Shutdown(); + void* ignored_tag; + bool ignored_ok; + while (cq->Next(&ignored_tag, &ignored_ok)) + ; + grpc_recycle_unused_port(port); +} + } // namespace } // namespace testing } // namespace grpc |