diff options
-rw-r--r-- | include/grpc++/ext/reflection.grpc.pb.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/method_handler_impl.h | 13 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/service_type.h | 11 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/sync_stream.h | 39 | ||||
-rw-r--r-- | src/compiler/cpp_generator.cc | 96 | ||||
-rw-r--r-- | src/proto/grpc/testing/duplicate/echo_duplicate.proto | 1 | ||||
-rw-r--r-- | test/cpp/codegen/compiler_test_golden | 8 | ||||
-rw-r--r-- | test/cpp/end2end/hybrid_end2end_test.cc | 157 |
8 files changed, 318 insertions, 9 deletions
diff --git a/include/grpc++/ext/reflection.grpc.pb.h b/include/grpc++/ext/reflection.grpc.pb.h index 6e56088497..151271c65b 100644 --- a/include/grpc++/ext/reflection.grpc.pb.h +++ b/include/grpc++/ext/reflection.grpc.pb.h @@ -176,6 +176,8 @@ class ServerReflection GRPC_FINAL { } }; typedef Service StreamedUnaryService; + typedef Service SplitStreamedService; + typedef Service StreamedService; }; } // namespace v1alpha diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 52f927631c..bb992f0e18 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -236,6 +236,19 @@ class StreamedUnaryHandler ServerUnaryStreamer<RequestType, ResponseType>, true>(func) {} }; +template <class RequestType, class ResponseType> +class SplitServerStreamingHandler + : public TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false> { + public: + explicit SplitServerStreamingHandler( + std::function<Status(ServerContext*, + ServerSplitStreamer<RequestType, ResponseType>*)> + func) + : TemplatedBidiStreamingHandler< + ServerSplitStreamer<RequestType, ResponseType>, false>(func) {} +}; + // Handle unknown method by returning UNIMPLEMENTED error. class UnknownMethodHandler : public MethodHandler { public: diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index 72b2225312..bd65ea009e 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -147,14 +147,15 @@ class Service { methods_[index].reset(); } - void MarkMethodStreamedUnary(int index, - MethodHandler* streamed_unary_method) { + void MarkMethodStreamed(int index, MethodHandler* streamed_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && - "Cannot mark an async or generic method Streamed Unary"); - methods_[index]->SetHandler(streamed_unary_method); + "Cannot mark an async or generic method Streamed"); + methods_[index]->SetHandler(streamed_method); // From the server's point of view, streamed unary is a special - // case of BIDI_STREAMING that has 1 read and 1 write, in that order. + // case of BIDI_STREAMING that has 1 read and 1 write, in that order, + // and split server-side streaming is BIDI_STREAMING with 1 read and + // any number of writes, in that order methods_[index]->SetMethodType(::grpc::RpcMethod::BIDI_STREAMING); } diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index e3c5a919b1..9a3efb5119 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -538,7 +538,7 @@ class ServerReaderWriter GRPC_FINAL : public ServerReaderWriterInterface<W, R> { /// the \a NextMessageSize method to determine an upper-bound on the size of /// the message. /// A key difference relative to streaming: ServerUnaryStreamer -/// must have exactly 1 Read and exactly 1 Write, in that order, to function +/// must have exactly 1 Read and exactly 1 Write, in that order, to function /// correctly. Otherwise, the RPC is in error. template <class RequestType, class ResponseType> class ServerUnaryStreamer GRPC_FINAL @@ -577,6 +577,43 @@ class ServerUnaryStreamer GRPC_FINAL bool write_done_; }; +/// A class to represent a flow-controlled server-side streaming call. +/// This is something of a hybrid between server-side and bidi streaming. +/// This is invoked through a server-side streaming call on the client side, +/// but the server responds to it as though it were a bidi streaming call that +/// must first have exactly 1 Read and then any number of Writes. +template <class RequestType, class ResponseType> +class ServerSplitStreamer GRPC_FINAL + : public ServerReaderWriterInterface<ResponseType, RequestType> { + public: + ServerSplitStreamer(Call* call, ServerContext* ctx) + : body_(call, ctx), read_done_(false) {} + + void SendInitialMetadata() GRPC_OVERRIDE { body_.SendInitialMetadata(); } + + bool NextMessageSize(uint32_t* sz) GRPC_OVERRIDE { + return body_.NextMessageSize(sz); + } + + bool Read(RequestType* request) GRPC_OVERRIDE { + if (read_done_) { + return false; + } + read_done_ = true; + return body_.Read(request); + } + + using WriterInterface<ResponseType>::Write; + bool Write(const ResponseType& response, + const WriteOptions& options) GRPC_OVERRIDE { + return read_done_ && body_.Write(response, options); + } + + private: + internal::ServerReaderWriterBody<ResponseType, RequestType> body_; + bool read_done_; +}; + } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index d0c35ea1ab..fa72f9b0d9 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -624,7 +624,7 @@ void PrintHeaderServerMethodStreamedUnary( printer->Indent(); printer->Print(*vars, "WithStreamedUnaryMethod_$Method$() {\n" - " ::grpc::Service::MarkMethodStreamedUnary($Idx$,\n" + " ::grpc::Service::MarkMethodStreamed($Idx$,\n" " new ::grpc::StreamedUnaryHandler< $Request$, " "$Response$>(std::bind" "(&WithStreamedUnaryMethod_$Method$<BaseClass>::" @@ -656,6 +656,58 @@ void PrintHeaderServerMethodStreamedUnary( } } +void PrintHeaderServerMethodSplitStreaming( + Printer *printer, const Method *method, + std::map<grpc::string, grpc::string> *vars) { + (*vars)["Method"] = method->name(); + (*vars)["Request"] = method->input_type_name(); + (*vars)["Response"] = method->output_type_name(); + if (method->ServerOnlyStreaming()) { + printer->Print(*vars, "template <class BaseClass>\n"); + printer->Print(*vars, + "class WithSplitStreamingMethod_$Method$ : " + "public BaseClass {\n"); + printer->Print( + " private:\n" + " void BaseClassMustBeDerivedFromService(const Service *service) " + "{}\n"); + printer->Print(" public:\n"); + printer->Indent(); + printer->Print(*vars, + "WithSplitStreamingMethod_$Method$() {\n" + " ::grpc::Service::MarkMethodStreamed($Idx$,\n" + " new ::grpc::SplitServerStreamingHandler< $Request$, " + "$Response$>(std::bind" + "(&WithSplitStreamingMethod_$Method$<BaseClass>::" + "Streamed$Method$, this, std::placeholders::_1, " + "std::placeholders::_2)));\n" + "}\n"); + printer->Print(*vars, + "~WithSplitStreamingMethod_$Method$() GRPC_OVERRIDE {\n" + " BaseClassMustBeDerivedFromService(this);\n" + "}\n"); + printer->Print( + *vars, + "// disable regular version of this method\n" + "::grpc::Status $Method$(" + "::grpc::ServerContext* context, const $Request$* request, " + "::grpc::ServerWriter< $Response$>* writer) GRPC_FINAL GRPC_OVERRIDE " + "{\n" + " abort();\n" + " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" + "}\n"); + printer->Print(*vars, + "// replace default version of method with split streamed\n" + "virtual ::grpc::Status Streamed$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerSplitStreamer< " + "$Request$,$Response$>* server_split_streamer)" + " = 0;\n"); + printer->Outdent(); + printer->Print(*vars, "};\n"); + } +} + void PrintHeaderServerMethodGeneric( Printer *printer, const Method *method, std::map<grpc::string, grpc::string> *vars) { @@ -844,6 +896,48 @@ void PrintHeaderService(Printer *printer, const Service *service, } printer->Print(" StreamedUnaryService;\n"); + // Server side - controlled server-side streaming + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["Idx"] = as_string(i); + PrintHeaderServerMethodSplitStreaming(printer, service->method(i).get(), + vars); + } + + printer->Print("typedef "); + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["method_name"] = service->method(i).get()->name(); + if (service->method(i)->ServerOnlyStreaming()) { + printer->Print(*vars, "WithSplitStreamingMethod_$method_name$<"); + } + } + printer->Print("Service"); + for (int i = 0; i < service->method_count(); ++i) { + if (service->method(i)->ServerOnlyStreaming()) { + printer->Print(" >"); + } + } + printer->Print(" SplitStreamedService;\n"); + + // Server side - typedef for controlled both unary and server-side streaming + printer->Print("typedef "); + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["method_name"] = service->method(i).get()->name(); + if (service->method(i)->ServerOnlyStreaming()) { + printer->Print(*vars, "WithSplitStreamingMethod_$method_name$<"); + } + if (service->method(i)->NoStreaming()) { + printer->Print(*vars, "WithStreamedUnaryMethod_$method_name$<"); + } + } + printer->Print("Service"); + for (int i = 0; i < service->method_count(); ++i) { + if (service->method(i)->NoStreaming() || + service->method(i)->ServerOnlyStreaming()) { + printer->Print(" >"); + } + } + printer->Print(" StreamedService;\n"); + printer->Outdent(); printer->Print("};\n"); printer->Print(service->GetTrailingComments().c_str()); diff --git a/src/proto/grpc/testing/duplicate/echo_duplicate.proto b/src/proto/grpc/testing/duplicate/echo_duplicate.proto index 94130ea767..97fdbc4fd3 100644 --- a/src/proto/grpc/testing/duplicate/echo_duplicate.proto +++ b/src/proto/grpc/testing/duplicate/echo_duplicate.proto @@ -38,4 +38,5 @@ package grpc.testing.duplicate; service EchoTestService { rpc Echo(grpc.testing.EchoRequest) returns (grpc.testing.EchoResponse); + rpc ResponseStream(EchoRequest) returns (stream EchoResponse); } diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 7b0fd6ce80..5f0e824655 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -213,7 +213,7 @@ class ServiceA GRPC_FINAL { void BaseClassMustBeDerivedFromService(const Service *service) {} public: WithStreamedUnaryMethod_MethodA1() { - ::grpc::Service::MarkMethodStreamedUnary(0, + ::grpc::Service::MarkMethodStreamed(0, new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodA1() GRPC_OVERRIDE { @@ -228,6 +228,8 @@ class ServiceA GRPC_FINAL { virtual ::grpc::Status StreamedMethodA1(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< ::grpc::testing::Request,::grpc::testing::Response>* server_unary_streamer) = 0; }; typedef WithStreamedUnaryMethod_MethodA1<Service > StreamedUnaryService; + typedef Service SplitStreamedService; + typedef WithStreamedUnaryMethod_MethodA1<Service > StreamedService; }; // ServiceB leading comment 1 @@ -312,7 +314,7 @@ class ServiceB GRPC_FINAL { void BaseClassMustBeDerivedFromService(const Service *service) {} public: WithStreamedUnaryMethod_MethodB1() { - ::grpc::Service::MarkMethodStreamedUnary(0, + ::grpc::Service::MarkMethodStreamed(0, new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2))); } ~WithStreamedUnaryMethod_MethodB1() GRPC_OVERRIDE { @@ -327,6 +329,8 @@ class ServiceB GRPC_FINAL { virtual ::grpc::Status StreamedMethodB1(::grpc::ServerContext* context, ::grpc::ServerUnaryStreamer< ::grpc::testing::Request,::grpc::testing::Response>* server_unary_streamer) = 0; }; typedef WithStreamedUnaryMethod_MethodB1<Service > StreamedUnaryService; + typedef Service SplitStreamedService; + typedef WithStreamedUnaryMethod_MethodB1<Service > StreamedService; }; // ServiceB trailing comment 1 diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 8cd2e66347..76a5732f33 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -320,6 +320,29 @@ class HybridEnd2endTest : public ::testing::Test { EXPECT_TRUE(s.ok()); } + void SendSimpleServerStreamingToDupService() { + std::shared_ptr<Channel> channel = + CreateChannel(server_address_.str(), InsecureChannelCredentials()); + auto stub = grpc::testing::duplicate::EchoTestService::NewStub(channel); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_wait_for_ready(true); + request.set_message("hello"); + + auto stream = stub->ResponseStream(&context, request); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0_dup"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "1_dup"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "2_dup"); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); + } + void SendBidiStreaming() { EchoRequest request; EchoResponse response; @@ -498,6 +521,140 @@ TEST_F(HybridEnd2endTest, request_stream_handler_thread.join(); } +// Add a second service with one sync split server streaming method. +class SplitResponseStreamDupPkg + : public duplicate::EchoTestService:: + WithSplitStreamingMethod_ResponseStream<TestServiceImplDupPkg> { + public: + Status StreamedResponseStream( + ServerContext* context, + ServerSplitStreamer<EchoRequest, EchoResponse>* stream) GRPC_OVERRIDE { + EchoRequest req; + EchoResponse resp; + uint32_t next_msg_sz; + stream->NextMessageSize(&next_msg_sz); + gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); + GPR_ASSERT(stream->Read(&req)); + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + resp.set_message(req.message() + grpc::to_string(i) + "_dup"); + GPR_ASSERT(stream->Write(resp)); + } + return Status::OK; + } +}; + +TEST_F(HybridEnd2endTest, + AsyncRequestStreamResponseStream_SyncSplitStreamedDupService) { + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>> + SType; + SType service; + SplitResponseStreamDupPkg dup_service; + SetUpServer(&service, &dup_service, nullptr, 8192); + ResetStub(); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); + TestAllMethods(); + SendSimpleServerStreamingToDupService(); + response_stream_handler_thread.join(); + request_stream_handler_thread.join(); +} + +// Add a second service that is fully split server streamed +class FullySplitStreamedDupPkg + : public duplicate::EchoTestService::SplitStreamedService { + public: + Status StreamedResponseStream( + ServerContext* context, + ServerSplitStreamer<EchoRequest, EchoResponse>* stream) GRPC_OVERRIDE { + EchoRequest req; + EchoResponse resp; + uint32_t next_msg_sz; + stream->NextMessageSize(&next_msg_sz); + gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); + GPR_ASSERT(stream->Read(&req)); + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + resp.set_message(req.message() + grpc::to_string(i) + "_dup"); + GPR_ASSERT(stream->Write(resp)); + } + return Status::OK; + } +}; + +TEST_F(HybridEnd2endTest, + AsyncRequestStreamResponseStream_FullySplitStreamedDupService) { + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>> + SType; + SType service; + FullySplitStreamedDupPkg dup_service; + SetUpServer(&service, &dup_service, nullptr, 8192); + ResetStub(); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); + TestAllMethods(); + SendSimpleServerStreamingToDupService(); + response_stream_handler_thread.join(); + request_stream_handler_thread.join(); +} + +// Add a second service that is fully server streamed +class FullyStreamedDupPkg : public duplicate::EchoTestService::StreamedService { + public: + Status StreamedEcho(ServerContext* context, + ServerUnaryStreamer<EchoRequest, EchoResponse>* stream) + GRPC_OVERRIDE { + EchoRequest req; + EchoResponse resp; + uint32_t next_msg_sz; + stream->NextMessageSize(&next_msg_sz); + gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz); + GPR_ASSERT(stream->Read(&req)); + resp.set_message(req.message() + "_dup"); + GPR_ASSERT(stream->Write(resp)); + return Status::OK; + } + Status StreamedResponseStream( + ServerContext* context, + ServerSplitStreamer<EchoRequest, EchoResponse>* stream) GRPC_OVERRIDE { + EchoRequest req; + EchoResponse resp; + uint32_t next_msg_sz; + stream->NextMessageSize(&next_msg_sz); + gpr_log(GPR_INFO, "Split Streamed Next Message Size is %u", next_msg_sz); + GPR_ASSERT(stream->Read(&req)); + for (int i = 0; i < kNumResponseStreamsMsgs; i++) { + resp.set_message(req.message() + grpc::to_string(i) + "_dup"); + GPR_ASSERT(stream->Write(resp)); + } + return Status::OK; + } +}; + +TEST_F(HybridEnd2endTest, + AsyncRequestStreamResponseStream_FullyStreamedDupService) { + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl>> + SType; + SType service; + FullyStreamedDupPkg dup_service; + SetUpServer(&service, &dup_service, nullptr, 8192); + ResetStub(); + std::thread response_stream_handler_thread(HandleServerStreaming<SType>, + &service, cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming<SType>, + &service, cqs_[1].get()); + TestAllMethods(); + SendEchoToDupService(); + SendSimpleServerStreamingToDupService(); + response_stream_handler_thread.join(); + request_stream_handler_thread.join(); +} + // Add a second service with one async method. TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) { typedef EchoTestService::WithAsyncMethod_RequestStream< |