aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-10-25 21:43:49 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-10-25 21:43:49 -0700
commit312feb4202b6b5b1b5049102cb8b4b4ebac44551 (patch)
treecfa17ae44bf77de906ebb0eae8962d35993cbb1b
parent3896dabb85bd4f7314fbe9f850c4af4294157019 (diff)
Adding generic rpc and unimplemented rpc test for server interceptors
-rw-r--r--include/grpcpp/impl/codegen/server_interface.h6
-rw-r--r--src/cpp/server/server_cc.cc17
-rw-r--r--test/cpp/end2end/server_interceptors_end2end_test.cc145
3 files changed, 152 insertions, 16 deletions
diff --git a/include/grpcpp/impl/codegen/server_interface.h b/include/grpcpp/impl/codegen/server_interface.h
index 532fff7a54..dab08057cd 100644
--- a/include/grpcpp/impl/codegen/server_interface.h
+++ b/include/grpcpp/impl/codegen/server_interface.h
@@ -184,7 +184,6 @@ class ServerInterface : public internal::CallHook {
const char* name);
virtual bool FinalizeResult(void** tag, bool* status) override {
- gpr_log(GPR_ERROR, "finalize registeredasyncrequest");
/* If we are done intercepting, then there is nothing more for us to do */
if (done_intercepting_) {
return BaseAsyncRequest::FinalizeResult(tag, status);
@@ -238,7 +237,6 @@ class ServerInterface : public internal::CallHook {
notification_cq_(notification_cq),
tag_(tag),
request_(request) {
- gpr_log(GPR_ERROR, "new payload request");
IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(),
notification_cq);
}
@@ -248,7 +246,6 @@ class ServerInterface : public internal::CallHook {
}
bool FinalizeResult(void** tag, bool* status) override {
- gpr_log(GPR_ERROR, "finalize PayloadAsyncRequest");
/* If we are done intercepting, then there is nothing more for us to do */
if (done_intercepting_) {
return RegisteredAsyncRequest::FinalizeResult(tag, status);
@@ -313,7 +310,6 @@ class ServerInterface : public internal::CallHook {
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
GPR_CODEGEN_ASSERT(method);
- gpr_log(GPR_ERROR, "request async method with payload");
new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq,
notification_cq, tag, message);
}
@@ -324,7 +320,6 @@ class ServerInterface : public internal::CallHook {
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
GPR_CODEGEN_ASSERT(method);
- gpr_log(GPR_ERROR, "request async method with no payload");
new NoPayloadAsyncRequest(method, this, context, stream, call_cq,
notification_cq, tag);
}
@@ -334,7 +329,6 @@ class ServerInterface : public internal::CallHook {
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
- gpr_log(GPR_ERROR, "request async generic call");
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag, true);
}
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 2f5493dbf8..6da4b32406 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -132,10 +132,13 @@ class Server::UnimplementedAsyncResponse final
~UnimplementedAsyncResponse() { delete request_; }
bool FinalizeResult(void** tag, bool* status) override {
- internal::CallOpSet<
- internal::CallOpSendInitialMetadata,
- internal::CallOpServerSendStatus>::FinalizeResult(tag, status);
- delete this;
+ if (internal::CallOpSet<
+ internal::CallOpSendInitialMetadata,
+ internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) {
+ delete this;
+ } else {
+ // The tag was swallowed due to interception. We will see it again.
+ }
return false;
}
@@ -755,7 +758,6 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
/* Set up interception state partially for the receive ops. call_wrapper_ is
* not filled at this point, but it will be filled before the interceptors are
* run. */
- gpr_log(GPR_ERROR, "Created base async request");
interceptor_methods_.SetCall(&call_wrapper_);
interceptor_methods_.SetReverse();
call_cq_->RegisterAvalanching(); // This op will trigger more ops
@@ -767,9 +769,7 @@ ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
bool* status) {
- gpr_log(GPR_ERROR, "in finalize result");
if (done_intercepting_) {
- gpr_log(GPR_ERROR, "done running interceptors");
*tag = tag_;
if (delete_on_finalize_) {
delete this;
@@ -788,7 +788,6 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
stream_->BindCall(&call_wrapper_);
if (*status && call_ && call_wrapper_.server_rpc_info()) {
- gpr_log(GPR_ERROR, "here");
done_intercepting_ = true;
// Set interception point for RECV INITIAL METADATA
interceptor_methods_.AddInterceptionHookPoint(
@@ -803,7 +802,6 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
// There were interceptors to be run, so
// ContinueFinalizeResultAfterInterception will be run when interceptors
// are done.
- gpr_log(GPR_ERROR, "don't return this tag");
return false;
}
}
@@ -819,7 +817,6 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception() {
- gpr_log(GPR_ERROR, "continue finalize result");
context_->BeginCompletionOp(&call_wrapper_);
// Queue a tag which will be returned immediately
grpc_core::ExecCtx exec_ctx;
diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc
index 956aec9359..49e4f13670 100644
--- a/test/cpp/end2end/server_interceptors_end2end_test.cc
+++ b/test/cpp/end2end/server_interceptors_end2end_test.cc
@@ -425,6 +425,151 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, BidiStreamingTest) {
grpc_recycle_unused_port(port);
}
+TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) {
+ DummyInterceptor::Reset();
+ int port = grpc_pick_unused_port_or_die();
+ string server_address = "localhost:" + std::to_string(port);
+ ServerBuilder builder;
+ AsyncGenericService service;
+ builder.AddListeningPort(server_address, InsecureServerCredentials());
+ builder.RegisterAsyncGenericService(&service);
+ std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
+ creators;
+ for (auto i = 0; i < 20; i++) {
+ creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
+ new DummyInterceptorFactory()));
+ }
+ builder.experimental().SetInterceptorCreators(std::move(creators));
+ auto cq = builder.AddCompletionQueue();
+ auto server = builder.BuildAndStart();
+
+ ChannelArguments args;
+ auto channel = CreateChannel(server_address, InsecureChannelCredentials());
+ GenericStub generic_stub(channel);
+
+ const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ GenericServerContext srv_ctx;
+ GenericServerAsyncReaderWriter stream(&srv_ctx);
+
+ // The string needs to be long enough to test heap-based slice.
+ send_request.set_message("Hello");
+ cli_ctx.AddMetadata("testkey", "testvalue");
+
+ std::unique_ptr<GenericClientAsyncReaderWriter> call =
+ generic_stub.PrepareCall(&cli_ctx, kMethodName, cq.get());
+ call->StartCall(tag(1));
+ Verifier().Expect(1, true).Verify(cq.get());
+ std::unique_ptr<ByteBuffer> send_buffer =
+ SerializeToByteBuffer(&send_request);
+ call->Write(*send_buffer, tag(2));
+ // Send ByteBuffer can be destroyed after calling Write.
+ send_buffer.reset();
+ Verifier().Expect(2, true).Verify(cq.get());
+ call->WritesDone(tag(3));
+ Verifier().Expect(3, true).Verify(cq.get());
+
+ service.RequestCall(&srv_ctx, &stream, cq.get(), cq.get(), tag(4));
+
+ Verifier().Expect(4, true).Verify(cq.get());
+ EXPECT_EQ(kMethodName, srv_ctx.method());
+ EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
+ srv_ctx.AddTrailingMetadata("testkey", "testvalue");
+
+ ByteBuffer recv_buffer;
+ stream.Read(&recv_buffer, tag(5));
+ Verifier().Expect(5, true).Verify(cq.get());
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
+ EXPECT_EQ(send_request.message(), recv_request.message());
+
+ send_response.set_message(recv_request.message());
+ send_buffer = SerializeToByteBuffer(&send_response);
+ stream.Write(*send_buffer, tag(6));
+ send_buffer.reset();
+ Verifier().Expect(6, true).Verify(cq.get());
+
+ stream.Finish(Status::OK, tag(7));
+ Verifier().Expect(7, true).Verify(cq.get());
+
+ recv_buffer.Clear();
+ call->Read(&recv_buffer, tag(8));
+ Verifier().Expect(8, true).Verify(cq.get());
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
+
+ call->Finish(&recv_status, tag(9));
+ Verifier().Expect(9, true).Verify(cq.get());
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.ok());
+ EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
+ "testvalue"));
+
+ // Make sure all 20 dummy interceptors were run
+ EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
+
+ server->Shutdown();
+ cq->Shutdown();
+ void* ignored_tag;
+ bool ignored_ok;
+ while (cq->Next(&ignored_tag, &ignored_ok))
+ ;
+ grpc_recycle_unused_port(port);
+}
+
+TEST_F(ServerInterceptorsAsyncEnd2endTest, UnimplementedRpcTest) {
+ DummyInterceptor::Reset();
+ int port = grpc_pick_unused_port_or_die();
+ string server_address = "localhost:" + std::to_string(port);
+ ServerBuilder builder;
+ builder.AddListeningPort(server_address, InsecureServerCredentials());
+ std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
+ creators;
+ for (auto i = 0; i < 20; i++) {
+ creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
+ new DummyInterceptorFactory()));
+ }
+ builder.experimental().SetInterceptorCreators(std::move(creators));
+ auto cq = builder.AddCompletionQueue();
+ auto server = builder.BuildAndStart();
+
+ ChannelArguments args;
+ std::shared_ptr<Channel> channel =
+ CreateChannel(server_address, InsecureChannelCredentials());
+ std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
+ stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
+ EchoRequest send_request;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ send_request.set_message("Hello");
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
+ stub->AsyncUnimplemented(&cli_ctx, send_request, cq.get()));
+
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
+ Verifier().Expect(4, true).Verify(cq.get());
+
+ EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
+ EXPECT_EQ("", recv_status.error_message());
+
+ // Make sure all 20 dummy interceptors were run
+ // EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
+
+ server->Shutdown();
+ cq->Shutdown();
+ void* ignored_tag;
+ bool ignored_ok;
+ while (cq->Next(&ignored_tag, &ignored_ok))
+ ;
+ grpc_recycle_unused_port(port);
+}
+
} // namespace
} // namespace testing
} // namespace grpc