From 726561ed062c6bb3635b48044bc5bb00de31e97e Mon Sep 17 00:00:00 2001 From: vjpai Date: Wed, 22 Jun 2016 11:14:24 -0700 Subject: Initial definition of flow-controlled unary type along with code generation for this type --- include/grpc++/impl/codegen/fc_unary.h | 97 +++++++++++++++++++++++ include/grpc++/impl/codegen/method_handler_impl.h | 35 ++++++++ include/grpc++/impl/codegen/rpc_method.h | 3 +- 3 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 include/grpc++/impl/codegen/fc_unary.h (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/fc_unary.h b/include/grpc++/impl/codegen/fc_unary.h new file mode 100644 index 0000000000..abb204939d --- /dev/null +++ b/include/grpc++/impl/codegen/fc_unary.h @@ -0,0 +1,97 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_FC_UNARY_H +#define GRPCXX_IMPL_CODEGEN_FC_UNARY_H + +#include +#include +#include + +namespace grpc { + +/// A class to represent a flow-controlled unary call. This is something +/// of a hybrid between conventional unary and streaming. This is invoked +/// through a unary call on the client side, but the server responds to it +/// as though it were a single-ping-pong streaming call. The server can use +/// the \a Size method to determine an upper-bound on the size of the message +/// A key difference relative to streaming: an FCUnary must have exactly 1 Read +/// and exactly 1 Write, in that order, to function correctly. +/// Otherwise, the RPC is in error +template + class FCUnary GRPC_FINAL { + public: + FCUnary(Call* call, ServerContext* ctx, int max_message_size): call_(call), ctx_(ctx), max_msg_size_(max_message_size), read_done_(false), write_done_(false) {} + ~FCUnary() {} + uint32_t Size() {return max_msg_size_;} + bool Read(RequestType *request) { + if (read_done_) { + return false; + } + read_done_ = true; + CallOpSet> ops; + ops.RecvMessage(request); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; + } + bool Write(const ResponseType& response) {return Write(response, WriteOptions());} + bool Write(const ResponseType& response, const WriteOptions& options) { + if (write_done_ || !read_done_) { + return false; + } + write_done_ = true; + CallOpSet ops; + if (!ops.SendMessage(response, options).ok()) { + return false; + } + if (!ctx_->sent_initial_metadata_) { + ops.SendInitialMetadata(ctx_->initial_metadata_, + ctx_->initial_metadata_flags()); + ctx_->sent_initial_metadata_ = true; + } else { + return false; + } + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); + } + private: + Call* const call_; + ServerContext* const ctx_; + const int max_msg_size_; + bool read_done_; + bool write_done_; +}; + +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_FC_UNARY_H diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 21ac6c4fb5..ecf481ebb6 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -35,6 +35,7 @@ #define GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H #include +#include #include #include @@ -190,6 +191,40 @@ class BidiStreamingHandler : public MethodHandler { ServiceType* service_; }; +// A wrapper class of an application provided rpc method handler +// specifically to apply to the flow-controlled implementation of a unary +// method +template +class FCUnaryMethodHandler : public MethodHandler { + public: + FCUnaryMethodHandler(std::function*)> + func, ServiceType* service) + : func_(func), service_(service) {} + + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + FCUnary fc_unary(param.call, + param.server_context, + param.max_message_size); + Status status = func_(service_, param.server_context, &fc_unary); + if (!param.server_context->sent_initial_metadata_) { + // means that the write never happened, which is bad + } else { + CallOpSet ops; + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); + } + } + private: + // Application provided rpc handler function. + std::function*)> + func_; + // The class the above handler function lives in. + ServiceType* service_; +}; + // Handle unknown method by returning UNIMPLEMENTED error. class UnknownMethodHandler : public MethodHandler { public: diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h index 39cb4f75df..53bee3727e 100644 --- a/include/grpc++/impl/codegen/rpc_method.h +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -46,7 +46,8 @@ class RpcMethod { NORMAL_RPC = 0, CLIENT_STREAMING, // request streaming SERVER_STREAMING, // response streaming - BIDI_STREAMING + BIDI_STREAMING, + FC_UNARY // flow-controlled unary call }; RpcMethod(const char* name, RpcType type) -- cgit v1.2.3 From ba6597f297b1cc1a8d91ca9154fee81c1639acdd Mon Sep 17 00:00:00 2001 From: vjpai Date: Wed, 22 Jun 2016 15:49:48 -0700 Subject: Actually generate code for FC Unary and make it work --- include/grpc++/impl/codegen/fc_unary.h | 1 + src/compiler/cpp_generator.cc | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/fc_unary.h b/include/grpc++/impl/codegen/fc_unary.h index abb204939d..c5e44ca0fd 100644 --- a/include/grpc++/impl/codegen/fc_unary.h +++ b/include/grpc++/impl/codegen/fc_unary.h @@ -35,6 +35,7 @@ #define GRPCXX_IMPL_CODEGEN_FC_UNARY_H #include +#include #include #include diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index e5db84ab11..eadbb3be85 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -128,6 +128,7 @@ grpc::string GetHeaderIncludes(File *file, static const char *headers_strs[] = { "grpc++/impl/codegen/async_stream.h", "grpc++/impl/codegen/async_unary_call.h", + "grpc++/impl/codegen/fc_unary.h", "grpc++/impl/codegen/proto_utils.h", "grpc++/impl/codegen/rpc_method.h", "grpc++/impl/codegen/service_type.h", @@ -642,9 +643,8 @@ void PrintHeaderServerMethodFCUnary( *vars, "// replace default version of this method with FCUnary\n" "::grpc::Status $Method$(" - "::grpc::ServerContext* context, ::grpc::FCUnary<$Request$,$Response$>* streaming_unary)" + "::grpc::ServerContext* context, ::grpc::FCUnary< $Request$,$Response$>* streaming_unary)" " GRPC_FINAL GRPC_OVERRIDE;\n"); - printer->Print("}\n"); printer->Outdent(); printer->Print(*vars, "};\n"); } @@ -815,6 +815,12 @@ void PrintHeaderService(Printer *printer, PrintHeaderServerMethodGeneric(printer, service->method(i).get(), vars); } + // Server side - FC Unary + for (int i = 0; i < service->method_count(); ++i) { + (*vars)["Idx"] = as_string(i); + PrintHeaderServerMethodFCUnary(printer, service->method(i).get(), vars); + } + printer->Outdent(); printer->Print("};\n"); printer->Print(service->GetTrailingComments().c_str()); @@ -917,6 +923,7 @@ grpc::string GetSourceIncludes(File *file, "grpc++/impl/codegen/async_unary_call.h", "grpc++/impl/codegen/channel_interface.h", "grpc++/impl/codegen/client_unary_call.h", + "grpc++/impl/codegen/fc_unary.h", "grpc++/impl/codegen/method_handler_impl.h", "grpc++/impl/codegen/rpc_service_method.h", "grpc++/impl/codegen/service_type.h", -- cgit v1.2.3 From 7142a91fc961fa6be7093134b13d7b00896e40f0 Mon Sep 17 00:00:00 2001 From: vjpai Date: Thu, 23 Jun 2016 10:16:00 -0700 Subject: Fix up service types and method handlers so that FC unary can work properly. --- include/grpc++/impl/codegen/rpc_method.h | 3 ++- include/grpc++/impl/codegen/rpc_service_method.h | 1 + include/grpc++/impl/codegen/service_type.h | 7 +++++++ src/compiler/cpp_generator.cc | 9 +++++++++ 4 files changed, 19 insertions(+), 1 deletion(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h index 53bee3727e..728ba3e334 100644 --- a/include/grpc++/impl/codegen/rpc_method.h +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -61,11 +61,12 @@ class RpcMethod { const char* name() const { return name_; } RpcType method_type() const { return method_type_; } + void SetMethodType(RpcType type) { method_type_ = type; } void* channel_tag() const { return channel_tag_; } private: const char* const name_; - const RpcType method_type_; + RpcType method_type_; void* const channel_tag_; }; diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h index 8b1f026c91..02da2a2617 100644 --- a/include/grpc++/impl/codegen/rpc_service_method.h +++ b/include/grpc++/impl/codegen/rpc_service_method.h @@ -82,6 +82,7 @@ class RpcServiceMethod : public RpcMethod { // if MethodHandler is nullptr, then this is an async method MethodHandler* handler() const { return handler_.get(); } void ResetHandler() { handler_.reset(); } + void SetHandler(MethodHandler *handler) { handler_.reset(handler); } private: void* server_tag_; diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index c19dfc7d45..c0eeace8a4 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -147,6 +147,13 @@ class Service { methods_[index].reset(); } + void MarkMethodFCUnary(int index, MethodHandler* fc_unary_method) { + GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && + "Cannot mark an async or generic method as FCUnary"); + methods_[index]->SetMethodType(::grpc::RpcMethod::FC_UNARY); + methods_[index]->SetHandler(fc_unary_method); + } + private: friend class Server; friend class ServerInterface; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index eadbb3be85..bbd476783c 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -129,6 +129,7 @@ grpc::string GetHeaderIncludes(File *file, "grpc++/impl/codegen/async_stream.h", "grpc++/impl/codegen/async_unary_call.h", "grpc++/impl/codegen/fc_unary.h", + "grpc++/impl/codegen/method_handler_impl.h", "grpc++/impl/codegen/proto_utils.h", "grpc++/impl/codegen/rpc_method.h", "grpc++/impl/codegen/service_type.h", @@ -625,6 +626,11 @@ void PrintHeaderServerMethodFCUnary( printer->Indent(); printer->Print(*vars, "WithFCUnaryMethod_$Method$() {\n" + " ::grpc::Status (*fn)(::grpc::ServerContext*, ::grpc::FCUnary< $Request$,$Response$>*) = this->WithFCUnaryMethod_$Method$::$Method$;\n" + " ::grpc::Service::MarkMethodFCUnary($Idx$,\n" + " new ::grpc::FCUnaryMethodHandler(fn, this));\n" "}\n"); printer->Print(*vars, "~WithFCUnaryMethod_$Method$() GRPC_OVERRIDE {\n" @@ -1138,6 +1144,9 @@ void PrintSourceService(Printer *printer, (*vars)["Idx"] = as_string(i); if (method->NoStreaming()) { (*vars)["StreamingType"] = "NORMAL_RPC"; + // NOTE: There is no reason to consider FC_UNARY as a separate + // category here since this part is setting up the client-side stub + // and this appears as a NORMAL_RPC from the client-side. } else if (method->ClientOnlyStreaming()) { (*vars)["StreamingType"] = "CLIENT_STREAMING"; } else if (method->ServerOnlyStreaming()) { -- cgit v1.2.3 From a12276932da60bab9a0feff0589965ec1f08e88f Mon Sep 17 00:00:00 2001 From: vjpai Date: Thu, 23 Jun 2016 13:18:19 -0700 Subject: Add some const --- include/grpc++/impl/codegen/call.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index fab85d1517..a9a5eb9f00 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -656,10 +656,10 @@ class Call GRPC_FINAL { call_hook_->PerformOpsOnCall(ops, this); } - grpc_call* call() { return call_; } - CompletionQueue* cq() { return cq_; } + grpc_call* call() const { return call_; } + CompletionQueue* cq() const { return cq_; } - int max_message_size() { return max_message_size_; } + int max_message_size() const { return max_message_size_; } private: CallHook* call_hook_; -- cgit v1.2.3 From fcb98a578cb6b359e4fd26fb618578def4d8193d Mon Sep 17 00:00:00 2001 From: vjpai Date: Thu, 23 Jun 2016 13:18:50 -0700 Subject: Add NextMessageSize method to all readable streams as an upper-bound on the actual message size. Rename Size of FCUnary to NextMessageSize for consistency --- include/grpc++/impl/codegen/fc_unary.h | 10 +++++----- include/grpc++/impl/codegen/method_handler_impl.h | 3 +-- include/grpc++/impl/codegen/sync_stream.h | 11 +++++++++++ test/cpp/end2end/mock_test.cc | 3 +++ 4 files changed, 20 insertions(+), 7 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/fc_unary.h b/include/grpc++/impl/codegen/fc_unary.h index c5e44ca0fd..abbdd42c78 100644 --- a/include/grpc++/impl/codegen/fc_unary.h +++ b/include/grpc++/impl/codegen/fc_unary.h @@ -45,16 +45,17 @@ namespace grpc { /// of a hybrid between conventional unary and streaming. This is invoked /// through a unary call on the client side, but the server responds to it /// as though it were a single-ping-pong streaming call. The server can use -/// the \a Size method to determine an upper-bound on the size of the message +/// the \a NextMessageSize method to determine an upper-bound on the size of +/// the message. /// A key difference relative to streaming: an FCUnary must have exactly 1 Read /// and exactly 1 Write, in that order, to function correctly. -/// Otherwise, the RPC is in error +/// Otherwise, the RPC is in error. template class FCUnary GRPC_FINAL { public: - FCUnary(Call* call, ServerContext* ctx, int max_message_size): call_(call), ctx_(ctx), max_msg_size_(max_message_size), read_done_(false), write_done_(false) {} + FCUnary(Call* call, ServerContext* ctx): call_(call), ctx_(ctx), read_done_(false), write_done_(false) {} ~FCUnary() {} - uint32_t Size() {return max_msg_size_;} + uint32_t NextMessageSize() {return call_->max_message_size();} bool Read(RequestType *request) { if (read_done_) { return false; @@ -88,7 +89,6 @@ template private: Call* const call_; ServerContext* const ctx_; - const int max_msg_size_; bool read_done_; bool write_done_; }; diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index ecf481ebb6..2a14ef3977 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -204,8 +204,7 @@ class FCUnaryMethodHandler : public MethodHandler { void RunHandler(const HandlerParameter& param) GRPC_FINAL { FCUnary fc_unary(param.call, - param.server_context, - param.max_message_size); + param.server_context); Status status = func_(service_, param.server_context, &fc_unary); if (!param.server_context->sent_initial_metadata_) { // means that the write never happened, which is bad diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index e94ffe5842..eb76edd140 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -70,6 +70,9 @@ class ReaderInterface { public: virtual ~ReaderInterface() {} + /// Upper bound on the next message size available for reading on this stream + virtual uint32_t NextMessageSize() = 0; + /// Blocking read a message and parse to \a msg. Returns \a true on success. /// /// \param[out] msg The read message. @@ -143,6 +146,8 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { cq_.Pluck(&ops); /// status ignored } + uint32_t NextMessageSize() GRPC_OVERRIDE {return call_.max_message_size();} + bool Read(R* msg) GRPC_OVERRIDE { CallOpSet> ops; if (!context_->initial_metadata_received_) { @@ -286,6 +291,8 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { cq_.Pluck(&ops); // status ignored } + uint32_t NextMessageSize() GRPC_OVERRIDE {return call_.max_message_size();} + bool Read(R* msg) GRPC_OVERRIDE { CallOpSet> ops; if (!context_->initial_metadata_received_) { @@ -345,6 +352,8 @@ class ServerReader GRPC_FINAL : public ReaderInterface { call_->cq()->Pluck(&ops); } + uint32_t NextMessageSize() GRPC_OVERRIDE {return call_->max_message_size();} + bool Read(R* msg) GRPC_OVERRIDE { CallOpSet> ops; ops.RecvMessage(msg); @@ -411,6 +420,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface, call_->cq()->Pluck(&ops); } + uint32_t NextMessageSize() GRPC_OVERRIDE {return call_->max_message_size();} + bool Read(R* msg) GRPC_OVERRIDE { CallOpSet> ops; ops.RecvMessage(msg); diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index 0ace5d9418..744a7cd9eb 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -31,6 +31,7 @@ * */ +#include #include #include @@ -63,6 +64,7 @@ class MockClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { public: void WaitForInitialMetadata() GRPC_OVERRIDE {} + uint32_t NextMessageSize() GRPC_OVERRIDE {return UINT_MAX;} bool Read(R* msg) GRPC_OVERRIDE { return true; } bool Write(const W& msg) GRPC_OVERRIDE { return true; } bool WritesDone() GRPC_OVERRIDE { return true; } @@ -74,6 +76,7 @@ class MockClientReaderWriter GRPC_FINAL public: MockClientReaderWriter() : writes_done_(false) {} void WaitForInitialMetadata() GRPC_OVERRIDE {} + uint32_t NextMessageSize() GRPC_OVERRIDE {return UINT_MAX;} bool Read(EchoResponse* msg) GRPC_OVERRIDE { if (writes_done_) return false; msg->set_message(last_message_); -- cgit v1.2.3 From c0c38b1f19c98f4c21ac2b75ba968e0c3aeefd19 Mon Sep 17 00:00:00 2001 From: vjpai Date: Thu, 23 Jun 2016 17:27:11 -0700 Subject: Make the FCUnary class actually work and test it --- include/grpc++/impl/codegen/completion_queue.h | 8 +++++ include/grpc++/impl/codegen/method_handler_impl.h | 19 +++++------ include/grpc++/impl/codegen/server_context.h | 8 +++++ src/compiler/cpp_generator.cc | 8 ++--- test/cpp/end2end/hybrid_end2end_test.cc | 39 ++++++++++++++++++++++- 5 files changed, 68 insertions(+), 14 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 1b84b44705..2286f01b8a 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -59,6 +59,8 @@ template class ServerWriter; template class ServerReaderWriter; +template +class FCUnary; template class RpcMethodHandler; template @@ -67,6 +69,8 @@ template class ServerStreamingHandler; template class BidiStreamingHandler; +template +class FCUnaryMethodHandler; class UnknownMethodHandler; class Channel; @@ -168,6 +172,8 @@ class CompletionQueue : private GrpcLibraryCodegen { friend class ::grpc::ServerWriter; template friend class ::grpc::ServerReaderWriter; + template + friend class ::grpc::FCUnary; template friend class RpcMethodHandler; template @@ -176,6 +182,8 @@ class CompletionQueue : private GrpcLibraryCodegen { friend class ServerStreamingHandler; template friend class BidiStreamingHandler; + template + friend class FCUnaryMethodHandler; friend class UnknownMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 2a14ef3977..293ae6a4d8 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -193,19 +193,22 @@ class BidiStreamingHandler : public MethodHandler { // A wrapper class of an application provided rpc method handler // specifically to apply to the flow-controlled implementation of a unary -// method +// method. +/// The argument to the constructor should be a member function already +/// bound to the appropriate service instance. The declaration gets too complicated +/// otherwise. template class FCUnaryMethodHandler : public MethodHandler { public: - FCUnaryMethodHandler(std::function*)> - func, ServiceType* service) - : func_(func), service_(service) {} + func) + : func_(func) {} void RunHandler(const HandlerParameter& param) GRPC_FINAL { FCUnary fc_unary(param.call, param.server_context); - Status status = func_(service_, param.server_context, &fc_unary); + Status status = func_(param.server_context, &fc_unary); if (!param.server_context->sent_initial_metadata_) { // means that the write never happened, which is bad } else { @@ -216,12 +219,10 @@ class FCUnaryMethodHandler : public MethodHandler { } } private: - // Application provided rpc handler function. - std::function*)> func_; - // The class the above handler function lives in. - ServiceType* service_; }; // Handle unknown method by returning UNIMPLEMENTED error. diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index a1e1ed176f..8a925d8037 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -67,6 +67,8 @@ template class ServerWriter; template class ServerReaderWriter; +template +class FCUnary; template class RpcMethodHandler; template @@ -75,6 +77,8 @@ template class ServerStreamingHandler; template class BidiStreamingHandler; +template +class FCUnaryMethodHandler; class UnknownMethodHandler; class Call; @@ -177,6 +181,8 @@ class ServerContext { friend class ::grpc::ServerWriter; template friend class ::grpc::ServerReaderWriter; + template + friend class ::grpc::FCUnary; template friend class RpcMethodHandler; template @@ -185,6 +191,8 @@ class ServerContext { friend class ServerStreamingHandler; template friend class BidiStreamingHandler; + template + friend class FCUnaryMethodHandler; friend class UnknownMethodHandler; friend class ::grpc::ClientContext; diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index da89d433b1..9b493fe926 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -626,11 +626,11 @@ void PrintHeaderServerMethodFCUnary( printer->Indent(); printer->Print(*vars, "WithFCUnaryMethod_$Method$() {\n" - " ::grpc::Status (*fn)(::grpc::ServerContext*, ::grpc::FCUnary< $Request$,$Response$>*) = this->WithFCUnaryMethod_$Method$::$Method$;\n" " ::grpc::Service::MarkMethodFCUnary($Idx$,\n" " new ::grpc::FCUnaryMethodHandler(fn, this));\n" + "$Response$>(" + "std::bind(&WithFCUnaryMethod_$Method$::FC$Method$, this, std::placeholders::_1, std::placeholders::_2)));\n" "}\n"); printer->Print(*vars, "~WithFCUnaryMethod_$Method$() GRPC_OVERRIDE {\n" @@ -648,9 +648,9 @@ void PrintHeaderServerMethodFCUnary( printer->Print( *vars, "// replace default version of this method with FCUnary\n" - "::grpc::Status $Method$(" + "virtual ::grpc::Status FC$Method$(" "::grpc::ServerContext* context, ::grpc::FCUnary< $Request$,$Response$>* fc_unary)" - " GRPC_FINAL GRPC_OVERRIDE;\n"); + " = 0;\n"); printer->Outdent(); printer->Print(*vars, "};\n"); } diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 7e0c0e8a7c..699cf49b26 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -199,7 +199,7 @@ class HybridEnd2endTest : public ::testing::Test { HybridEnd2endTest() {} void SetUpServer(::grpc::Service* service1, ::grpc::Service* service2, - AsyncGenericService* generic_service) { + AsyncGenericService* generic_service, int max_message_size = 0) { int port = grpc_pick_unused_port_or_die(); server_address_ << "localhost:" << port; @@ -217,6 +217,11 @@ class HybridEnd2endTest : public ::testing::Test { if (generic_service) { builder.RegisterAsyncGenericService(generic_service); } + + if (max_message_size != 0) { + builder.SetMaxMessageSize(max_message_size); + } + // Create a separate cq for each potential handler. for (int i = 0; i < 5; i++) { cqs_.push_back(builder.AddCompletionQueue(false)); @@ -415,6 +420,38 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) { request_stream_handler_thread.join(); } +// Add a second service with one sync FCUnary method. +class FCUnaryDupPkg : public duplicate::EchoTestService::WithFCUnaryMethod_Echo { +public: + Status FCEcho(ServerContext* context, FCUnary* fc_unary) GRPC_OVERRIDE { + EchoRequest req; + EchoResponse resp; + gpr_log(GPR_INFO, "FC Unary Next Message Size is %u", fc_unary->NextMessageSize()); + GPR_ASSERT(fc_unary->Read(&req)); + resp.set_message(req.message() + "_dup"); + GPR_ASSERT(fc_unary->Write(resp)); + return Status::OK; + } +}; + +TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncFCUnaryDupService) { + typedef EchoTestService::WithAsyncMethod_RequestStream< + EchoTestService::WithAsyncMethod_ResponseStream> + SType; + SType service; + FCUnaryDupPkg dup_service; + SetUpServer(&service, &dup_service, nullptr, 8192); + ResetStub(); + std::thread response_stream_handler_thread(HandleServerStreaming, + &service, cqs_[0].get()); + std::thread request_stream_handler_thread(HandleClientStreaming, + &service, cqs_[1].get()); + TestAllMethods(); + SendEchoToDupService(); + response_stream_handler_thread.join(); + request_stream_handler_thread.join(); +} + // Add a second service with one async method. TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_AsyncDupService) { typedef EchoTestService::WithAsyncMethod_RequestStream< -- cgit v1.2.3 From 2d04dd827ce66a54ea8ddc6c691f9c028833fd56 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 27 Jul 2016 11:35:56 -0700 Subject: Change API for next message size to allow a bool return value for failure cases. --- include/grpc++/impl/codegen/fc_unary.h | 5 ++++- include/grpc++/impl/codegen/sync_stream.h | 22 +++++++++++++++++----- test/cpp/end2end/hybrid_end2end_test.cc | 4 +++- test/cpp/end2end/mock_test.cc | 10 ++++++++-- 4 files changed, 32 insertions(+), 9 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/fc_unary.h b/include/grpc++/impl/codegen/fc_unary.h index 22e40ab02e..768443912b 100644 --- a/include/grpc++/impl/codegen/fc_unary.h +++ b/include/grpc++/impl/codegen/fc_unary.h @@ -56,7 +56,10 @@ template public: FCUnary(Call* call, ServerContext* ctx): call_(call), ctx_(ctx), read_done_(false), write_done_(false) {} ~FCUnary() {} - uint32_t NextMessageSize() {return call_->max_message_size();} + bool NextMessageSize(uint32_t *sz) { + *sz = call_->max_message_size(); + return true; + } bool Read(RequestType *request) { if (read_done_) { return false; diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 0afa8b6aa4..930c72056c 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -71,7 +71,7 @@ class ReaderInterface { virtual ~ReaderInterface() {} /// Upper bound on the next message size available for reading on this stream - virtual uint32_t NextMessageSize() = 0; + virtual bool NextMessageSize(uint32_t *sz) = 0; /// Blocking read a message and parse to \a msg. Returns \a true on success. /// This is thread-safe with respect to \a Write or \WritesDone methods on @@ -151,7 +151,10 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { cq_.Pluck(&ops); /// status ignored } - uint32_t NextMessageSize() GRPC_OVERRIDE {return call_.max_message_size();} + bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { + *sz = call_.max_message_size(); + return true; + } bool Read(R* msg) GRPC_OVERRIDE { CallOpSet> ops; @@ -298,7 +301,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { cq_.Pluck(&ops); // status ignored } - uint32_t NextMessageSize() GRPC_OVERRIDE {return call_.max_message_size();} + bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { + *sz = call_.max_message_size(); + return true; + } bool Read(R* msg) GRPC_OVERRIDE { CallOpSet> ops; @@ -359,7 +365,10 @@ class ServerReader GRPC_FINAL : public ReaderInterface { call_->cq()->Pluck(&ops); } - uint32_t NextMessageSize() GRPC_OVERRIDE {return call_->max_message_size();} + bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { + *sz = call_->max_message_size(); + return true; + } bool Read(R* msg) GRPC_OVERRIDE { CallOpSet> ops; @@ -427,7 +436,10 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface, call_->cq()->Pluck(&ops); } - uint32_t NextMessageSize() GRPC_OVERRIDE {return call_->max_message_size();} + bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { + *sz = call_->max_message_size(); + return true; + } bool Read(R* msg) GRPC_OVERRIDE { CallOpSet> ops; diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 699cf49b26..b80010e803 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -426,7 +426,9 @@ public: Status FCEcho(ServerContext* context, FCUnary* fc_unary) GRPC_OVERRIDE { EchoRequest req; EchoResponse resp; - gpr_log(GPR_INFO, "FC Unary Next Message Size is %u", fc_unary->NextMessageSize()); + uint32_t next_msg_sz; + fc_unary->NextMessageSize(&next_msg_sz); + gpr_log(GPR_INFO, "FC Unary Next Message Size is %u", next_msg_sz); GPR_ASSERT(fc_unary->Read(&req)); resp.set_message(req.message() + "_dup"); GPR_ASSERT(fc_unary->Write(resp)); diff --git a/test/cpp/end2end/mock_test.cc b/test/cpp/end2end/mock_test.cc index 744a7cd9eb..4052627122 100644 --- a/test/cpp/end2end/mock_test.cc +++ b/test/cpp/end2end/mock_test.cc @@ -64,7 +64,10 @@ class MockClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { public: void WaitForInitialMetadata() GRPC_OVERRIDE {} - uint32_t NextMessageSize() GRPC_OVERRIDE {return UINT_MAX;} + bool NextMessageSize(uint32_t* sz) GRPC_OVERRIDE { + *sz = UINT_MAX; + return true; + } bool Read(R* msg) GRPC_OVERRIDE { return true; } bool Write(const W& msg) GRPC_OVERRIDE { return true; } bool WritesDone() GRPC_OVERRIDE { return true; } @@ -76,7 +79,10 @@ class MockClientReaderWriter GRPC_FINAL public: MockClientReaderWriter() : writes_done_(false) {} void WaitForInitialMetadata() GRPC_OVERRIDE {} - uint32_t NextMessageSize() GRPC_OVERRIDE {return UINT_MAX;} + bool NextMessageSize(uint32_t* sz) GRPC_OVERRIDE { + *sz = UINT_MAX; + return true; + } bool Read(EchoResponse* msg) GRPC_OVERRIDE { if (writes_done_) return false; msg->set_message(last_message_); -- cgit v1.2.3 From 581097fe0dc321a31cedae97057838fa1eadf8b2 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 4 Aug 2016 10:50:51 -0700 Subject: Make FCUnary and ServerReaderWriter derived classes of a new ServerReaderWriterInterface so that some functions can be made to accept either. --- include/grpc++/impl/codegen/completion_queue.h | 8 ++--- include/grpc++/impl/codegen/fc_unary.h | 44 +++++++------------------- include/grpc++/impl/codegen/server_context.h | 8 ++--- include/grpc++/impl/codegen/sync_stream.h | 26 ++++++++------- src/compiler/cpp_generator.cc | 1 + 5 files changed, 32 insertions(+), 55 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 1d00bf2d1b..95ece77ce5 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -69,9 +69,7 @@ class ServerReader; template class ServerWriter; template -class ServerReaderWriter; -template -class FCUnary; +class ServerReaderWriterInterface; template class RpcMethodHandler; template @@ -182,9 +180,7 @@ class CompletionQueue : private GrpcLibraryCodegen { template friend class ::grpc::ServerWriter; template - friend class ::grpc::ServerReaderWriter; - template - friend class ::grpc::FCUnary; + friend class ::grpc::ServerReaderWriterInterface; template friend class RpcMethodHandler; template diff --git a/include/grpc++/impl/codegen/fc_unary.h b/include/grpc++/impl/codegen/fc_unary.h index 768443912b..9dbc4024bf 100644 --- a/include/grpc++/impl/codegen/fc_unary.h +++ b/include/grpc++/impl/codegen/fc_unary.h @@ -37,11 +37,10 @@ #include #include #include -#include #include +#include namespace grpc { - /// A class to represent a flow-controlled unary call. This is something /// of a hybrid between conventional unary and streaming. This is invoked /// through a unary call on the client side, but the server responds to it @@ -52,51 +51,32 @@ namespace grpc { /// and exactly 1 Write, in that order, to function correctly. /// Otherwise, the RPC is in error. template - class FCUnary GRPC_FINAL { - public: - FCUnary(Call* call, ServerContext* ctx): call_(call), ctx_(ctx), read_done_(false), write_done_(false) {} + class FCUnary GRPC_FINAL : public ServerReaderWriterInterface { +public: + FCUnary(Call* call, ServerContext* ctx): ServerReaderWriterInterface(call, ctx) , read_done_(false), write_done_(false) {} + ~FCUnary() {} - bool NextMessageSize(uint32_t *sz) { - *sz = call_->max_message_size(); - return true; - } - bool Read(RequestType *request) { + + bool Read(RequestType *request) GRPC_OVERRIDE { if (read_done_) { return false; } read_done_ = true; - CallOpSet> ops; - ops.RecvMessage(request); - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops) && ops.got_message; + return ServerReaderWriterInterface::Read(request); } - bool Write(const ResponseType& response) {return Write(response, WriteOptions());} - bool Write(const ResponseType& response, const WriteOptions& options) { + + using WriterInterface::Write; + bool Write(const ResponseType& response, const WriteOptions& options) GRPC_OVERRIDE { if (write_done_ || !read_done_) { return false; } write_done_ = true; - CallOpSet ops; - if (!ops.SendMessage(response, options).ok()) { - return false; - } - if (!ctx_->sent_initial_metadata_) { - ops.SendInitialMetadata(ctx_->initial_metadata_, - ctx_->initial_metadata_flags()); - ctx_->sent_initial_metadata_ = true; - } else { - return false; - } - call_->PerformOps(&ops); - return call_->cq()->Pluck(&ops); + return ServerReaderWriterInterface::Write(response, options); } private: - Call* const call_; - ServerContext* const ctx_; bool read_done_; bool write_done_; }; - } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_FC_UNARY_H diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index e1d7e980a1..aadae893ad 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -66,9 +66,7 @@ class ServerReader; template class ServerWriter; template -class ServerReaderWriter; -template -class FCUnary; +class ServerReaderWriterInterface; template class RpcMethodHandler; template @@ -187,9 +185,7 @@ class ServerContext { template friend class ::grpc::ServerWriter; template - friend class ::grpc::ServerReaderWriter; - template - friend class ::grpc::FCUnary; + friend class ::grpc::ServerReaderWriterInterface; template friend class RpcMethodHandler; template diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 31da2cbbe9..2e63479961 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -429,12 +429,11 @@ class ServerWriter GRPC_FINAL : public WriterInterface { /// Server-side interface for bi-directional streaming. template -class ServerReaderWriter GRPC_FINAL : public WriterInterface, - public ReaderInterface { - public: - ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} - - void SendInitialMetadata() { +class ServerReaderWriterInterface : public WriterInterface, + public ReaderInterface { +public: + ServerReaderWriterInterface(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + virtual void SendInitialMetadata() { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); CallOpSet ops; @@ -448,12 +447,12 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface, call_->cq()->Pluck(&ops); } - bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { + virtual bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { *sz = call_->max_message_size(); return true; } - bool Read(R* msg) GRPC_OVERRIDE { + virtual bool Read(R* msg) GRPC_OVERRIDE { CallOpSet> ops; ops.RecvMessage(msg); call_->PerformOps(&ops); @@ -461,7 +460,7 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface, } using WriterInterface::Write; - bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { + virtual bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { CallOpSet ops; if (!ops.SendMessage(msg, options).ok()) { return false; @@ -477,12 +476,17 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface, call_->PerformOps(&ops); return call_->cq()->Pluck(&ops); } - - private: +private: Call* const call_; ServerContext* const ctx_; }; +template +class ServerReaderWriter GRPC_FINAL : public ServerReaderWriterInterface { +public: + ServerReaderWriter(Call* call, ServerContext* ctx) : ServerReaderWriterInterface(call, ctx) {} +}; + } // namespace grpc #endif // GRPCXX_IMPL_CODEGEN_SYNC_STREAM_H diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index dcf995ff61..d587023c65 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -131,6 +131,7 @@ grpc::string GetHeaderIncludes(File *file, const Parameters ¶ms) { "grpc++/impl/codegen/async_stream.h", "grpc++/impl/codegen/async_unary_call.h", "grpc++/impl/codegen/fc_unary.h", + "grpc++/impl/codegen/method_handler_impl.h", "grpc++/impl/codegen/proto_utils.h", "grpc++/impl/codegen/rpc_method.h", "grpc++/impl/codegen/service_type.h", -- cgit v1.2.3 From 5d94118d0d3e11b0333251fec219e4571f6a20e7 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 4 Aug 2016 11:48:47 -0700 Subject: clang-format --- include/grpc++/impl/codegen/fc_unary.h | 26 ++++++++----- include/grpc++/impl/codegen/method_handler_impl.h | 19 +++++----- include/grpc++/impl/codegen/rpc_method.h | 2 +- include/grpc++/impl/codegen/rpc_service_method.h | 2 +- include/grpc++/impl/codegen/service_type.h | 2 +- include/grpc++/impl/codegen/sync_stream.h | 25 +++++++------ src/compiler/cpp_generator.cc | 45 ++++++++++++----------- test/cpp/end2end/hybrid_end2end_test.cc | 22 +++++++---- 8 files changed, 81 insertions(+), 62 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/fc_unary.h b/include/grpc++/impl/codegen/fc_unary.h index 9dbc4024bf..a423f1f0b3 100644 --- a/include/grpc++/impl/codegen/fc_unary.h +++ b/include/grpc++/impl/codegen/fc_unary.h @@ -51,28 +51,36 @@ namespace grpc { /// and exactly 1 Write, in that order, to function correctly. /// Otherwise, the RPC is in error. template - class FCUnary GRPC_FINAL : public ServerReaderWriterInterface { -public: - FCUnary(Call* call, ServerContext* ctx): ServerReaderWriterInterface(call, ctx) , read_done_(false), write_done_(false) {} +class FCUnary GRPC_FINAL + : public ServerReaderWriterInterface { + public: + FCUnary(Call* call, ServerContext* ctx) + : ServerReaderWriterInterface(call, ctx), + read_done_(false), + write_done_(false) {} ~FCUnary() {} - bool Read(RequestType *request) GRPC_OVERRIDE { + bool Read(RequestType* request) GRPC_OVERRIDE { if (read_done_) { - return false; + return false; } read_done_ = true; - return ServerReaderWriterInterface::Read(request); + return ServerReaderWriterInterface::Read( + request); } using WriterInterface::Write; - bool Write(const ResponseType& response, const WriteOptions& options) GRPC_OVERRIDE { + bool Write(const ResponseType& response, + const WriteOptions& options) GRPC_OVERRIDE { if (write_done_ || !read_done_) { - return false; + return false; } write_done_ = true; - return ServerReaderWriterInterface::Write(response, options); + return ServerReaderWriterInterface::Write( + response, options); } + private: bool read_done_; bool write_done_; diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 14247b1f03..9c3af53b3a 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -207,19 +207,20 @@ class BidiStreamingHandler : public MethodHandler { // specifically to apply to the flow-controlled implementation of a unary // method. /// The argument to the constructor should be a member function already -/// bound to the appropriate service instance. The declaration gets too complicated +/// bound to the appropriate service instance. The declaration gets too +/// complicated /// otherwise. template class FCUnaryMethodHandler : public MethodHandler { public: - FCUnaryMethodHandler(std::function*)> - func) - : func_(func) {} + FCUnaryMethodHandler( + std::function*)> + func) + : func_(func) {} void RunHandler(const HandlerParameter& param) GRPC_FINAL { FCUnary fc_unary(param.call, - param.server_context); + param.server_context); Status status = func_(param.server_context, &fc_unary); if (!param.server_context->sent_initial_metadata_) { // means that the write never happened, which is bad @@ -230,11 +231,11 @@ class FCUnaryMethodHandler : public MethodHandler { param.call->cq()->Pluck(&ops); } } + private: // Application provided rpc handler function, already bound to its service. - std::function*)> - func_; + std::function*)> + func_; }; // Handle unknown method by returning UNIMPLEMENTED error. diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h index 728ba3e334..b55d755075 100644 --- a/include/grpc++/impl/codegen/rpc_method.h +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -47,7 +47,7 @@ class RpcMethod { CLIENT_STREAMING, // request streaming SERVER_STREAMING, // response streaming BIDI_STREAMING, - FC_UNARY // flow-controlled unary call + FC_UNARY // flow-controlled unary call }; RpcMethod(const char* name, RpcType type) diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h index 02da2a2617..52124fba0b 100644 --- a/include/grpc++/impl/codegen/rpc_service_method.h +++ b/include/grpc++/impl/codegen/rpc_service_method.h @@ -82,7 +82,7 @@ class RpcServiceMethod : public RpcMethod { // if MethodHandler is nullptr, then this is an async method MethodHandler* handler() const { return handler_.get(); } void ResetHandler() { handler_.reset(); } - void SetHandler(MethodHandler *handler) { handler_.reset(handler); } + void SetHandler(MethodHandler* handler) { handler_.reset(handler); } private: void* server_tag_; diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index c0eeace8a4..dcfc6b01b7 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -149,7 +149,7 @@ class Service { void MarkMethodFCUnary(int index, MethodHandler* fc_unary_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && - "Cannot mark an async or generic method as FCUnary"); + "Cannot mark an async or generic method as FCUnary"); methods_[index]->SetMethodType(::grpc::RpcMethod::FC_UNARY); methods_[index]->SetHandler(fc_unary_method); } diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 2e63479961..d9b7e6fec5 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -71,7 +71,7 @@ class ReaderInterface { virtual ~ReaderInterface() {} /// Upper bound on the next message size available for reading on this stream - virtual bool NextMessageSize(uint32_t *sz) = 0; + virtual bool NextMessageSize(uint32_t* sz) = 0; /// Blocking read a message and parse to \a msg. Returns \a true on success. /// This is thread-safe with respect to \a Write or \WritesDone methods on @@ -151,7 +151,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface { cq_.Pluck(&ops); /// status ignored } - bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { + bool NextMessageSize(uint32_t* sz) GRPC_OVERRIDE { *sz = call_.max_message_size(); return true; } @@ -301,7 +301,7 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface { cq_.Pluck(&ops); // status ignored } - bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { + bool NextMessageSize(uint32_t* sz) GRPC_OVERRIDE { *sz = call_.max_message_size(); return true; } @@ -368,7 +368,7 @@ class ServerReader GRPC_FINAL : public ReaderInterface { call_->cq()->Pluck(&ops); } - bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { + bool NextMessageSize(uint32_t* sz) GRPC_OVERRIDE { *sz = call_->max_message_size(); return true; } @@ -431,8 +431,9 @@ class ServerWriter GRPC_FINAL : public WriterInterface { template class ServerReaderWriterInterface : public WriterInterface, public ReaderInterface { -public: - ServerReaderWriterInterface(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + public: + ServerReaderWriterInterface(Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} virtual void SendInitialMetadata() { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); @@ -447,7 +448,7 @@ public: call_->cq()->Pluck(&ops); } - virtual bool NextMessageSize(uint32_t *sz) GRPC_OVERRIDE { + virtual bool NextMessageSize(uint32_t* sz) GRPC_OVERRIDE { *sz = call_->max_message_size(); return true; } @@ -476,15 +477,17 @@ public: call_->PerformOps(&ops); return call_->cq()->Pluck(&ops); } -private: + + private: Call* const call_; ServerContext* const ctx_; }; template -class ServerReaderWriter GRPC_FINAL : public ServerReaderWriterInterface { -public: - ServerReaderWriter(Call* call, ServerContext* ctx) : ServerReaderWriterInterface(call, ctx) {} +class ServerReaderWriter GRPC_FINAL : public ServerReaderWriterInterface { + public: + ServerReaderWriter(Call* call, ServerContext* ctx) + : ServerReaderWriterInterface(call, ctx) {} }; } // namespace grpc diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 29007b5e1d..252a92d971 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -607,8 +607,7 @@ void PrintHeaderServerMethodAsync(Printer *printer, const Method *method, } void PrintHeaderServerMethodFCUnary( - Printer *printer, - const Method *method, + Printer *printer, const Method *method, std::map *vars) { (*vars)["Method"] = method->name(); (*vars)["Request"] = method->input_type_name(); @@ -616,24 +615,27 @@ void PrintHeaderServerMethodFCUnary( if (method->NoStreaming()) { printer->Print(*vars, "template \n"); printer->Print(*vars, - "class WithFCUnaryMethod_$Method$ : public BaseClass {\n"); + "class WithFCUnaryMethod_$Method$ : public BaseClass {\n"); printer->Print( - " private:\n" - " void BaseClassMustBeDerivedFromService(const Service *service) {}\n"); + " private:\n" + " void BaseClassMustBeDerivedFromService(const Service *service) " + "{}\n"); printer->Print(" public:\n"); printer->Indent(); printer->Print(*vars, - "WithFCUnaryMethod_$Method$() {\n" - " ::grpc::Service::MarkMethodFCUnary($Idx$,\n" - " new ::grpc::FCUnaryMethodHandler(" - "std::bind(&WithFCUnaryMethod_$Method$::FC$Method$, this, std::placeholders::_1, std::placeholders::_2)));\n" - "}\n"); + "WithFCUnaryMethod_$Method$() {\n" + " ::grpc::Service::MarkMethodFCUnary($Idx$,\n" + " new ::grpc::FCUnaryMethodHandler(" + "std::bind(&WithFCUnaryMethod_$Method$::FC$" + "Method$, this, std::placeholders::_1, " + "std::placeholders::_2)));\n" + "}\n"); printer->Print(*vars, - "~WithFCUnaryMethod_$Method$() GRPC_OVERRIDE {\n" - " BaseClassMustBeDerivedFromService(this);\n" - "}\n"); + "~WithFCUnaryMethod_$Method$() GRPC_OVERRIDE {\n" + " BaseClassMustBeDerivedFromService(this);\n" + "}\n"); printer->Print( *vars, "// disable regular version of this method\n" @@ -643,12 +645,12 @@ void PrintHeaderServerMethodFCUnary( " abort();\n" " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); - printer->Print( - *vars, - "// replace default version of this method with FCUnary\n" - "virtual ::grpc::Status FC$Method$(" - "::grpc::ServerContext* context, ::grpc::FCUnary< $Request$,$Response$>* fc_unary)" - " = 0;\n"); + printer->Print(*vars, + "// replace default version of this method with FCUnary\n" + "virtual ::grpc::Status FC$Method$(" + "::grpc::ServerContext* context, ::grpc::FCUnary< " + "$Request$,$Response$>* fc_unary)" + " = 0;\n"); printer->Outdent(); printer->Print(*vars, "};\n"); } @@ -841,7 +843,6 @@ void PrintHeaderService(Printer *printer, const Service *service, } printer->Print(" FCUnaryService;\n"); - printer->Outdent(); printer->Print("};\n"); printer->Print(service->GetTrailingComments().c_str()); diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index a1a964a9d3..1512e99a3c 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -199,7 +199,8 @@ class HybridEnd2endTest : public ::testing::Test { HybridEnd2endTest() {} void SetUpServer(::grpc::Service* service1, ::grpc::Service* service2, - AsyncGenericService* generic_service, int max_message_size = 0) { + AsyncGenericService* generic_service, + int max_message_size = 0) { int port = grpc_pick_unused_port_or_die(); server_address_ << "localhost:" << port; @@ -421,9 +422,11 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) { } // Add a second service with one sync FCUnary method. -class FCUnaryDupPkg : public duplicate::EchoTestService::WithFCUnaryMethod_Echo { -public: - Status FCEcho(ServerContext* context, FCUnary* fc_unary) GRPC_OVERRIDE { +class FCUnaryDupPkg : public duplicate::EchoTestService::WithFCUnaryMethod_Echo< + TestServiceImplDupPkg> { + public: + Status FCEcho(ServerContext* context, + FCUnary* fc_unary) GRPC_OVERRIDE { EchoRequest req; EchoResponse resp; uint32_t next_msg_sz; @@ -436,7 +439,8 @@ public: } }; -TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncFCUnaryDupService) { +TEST_F(HybridEnd2endTest, + AsyncRequestStreamResponseStream_SyncFCUnaryDupService) { typedef EchoTestService::WithAsyncMethod_RequestStream< EchoTestService::WithAsyncMethod_ResponseStream> SType; @@ -456,8 +460,9 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncFCUnaryDupService // Add a second service that is fully FCUnary class FullyFCUnaryDupPkg : public duplicate::EchoTestService::FCUnaryService { -public: - Status FCEcho(ServerContext* context, FCUnary* fc_unary) GRPC_OVERRIDE { + public: + Status FCEcho(ServerContext* context, + FCUnary* fc_unary) GRPC_OVERRIDE { EchoRequest req; EchoResponse resp; uint32_t next_msg_sz; @@ -470,7 +475,8 @@ public: } }; -TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncFullyFCUnaryDupService) { +TEST_F(HybridEnd2endTest, + AsyncRequestStreamResponseStream_SyncFullyFCUnaryDupService) { typedef EchoTestService::WithAsyncMethod_RequestStream< EchoTestService::WithAsyncMethod_ResponseStream> SType; -- cgit v1.2.3 From cdc253535bb879918b9bd66b29007f27219f6e2e Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 25 Aug 2016 15:33:02 -0700 Subject: Remove FC_UNARY enum and treat it more like a special case of BIDI_STREAMING in all cases --- include/grpc++/impl/codegen/rpc_method.h | 3 +-- include/grpc++/impl/codegen/service_type.h | 5 ++++- src/compiler/cpp_generator.cc | 2 +- src/cpp/server/server.cc | 1 - 4 files changed, 6 insertions(+), 5 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h index b55d755075..4897428074 100644 --- a/include/grpc++/impl/codegen/rpc_method.h +++ b/include/grpc++/impl/codegen/rpc_method.h @@ -46,8 +46,7 @@ class RpcMethod { NORMAL_RPC = 0, CLIENT_STREAMING, // request streaming SERVER_STREAMING, // response streaming - BIDI_STREAMING, - FC_UNARY // flow-controlled unary call + BIDI_STREAMING }; RpcMethod(const char* name, RpcType type) diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index dcfc6b01b7..4af40422a1 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -150,8 +150,11 @@ class Service { void MarkMethodFCUnary(int index, MethodHandler* fc_unary_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && "Cannot mark an async or generic method as FCUnary"); - methods_[index]->SetMethodType(::grpc::RpcMethod::FC_UNARY); methods_[index]->SetHandler(fc_unary_method); + + // From the server's point of view, streamed unary is a special + // case of BIDI_STREAMING that has 1 read and 1 write, in that order. + methods_[index]->SetMethodType(::grpc::RpcMethod::BIDI_STREAMING); } private: diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 252a92d971..c5d4c2573d 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -1154,7 +1154,7 @@ void PrintSourceService(Printer *printer, const Service *service, (*vars)["Idx"] = as_string(i); if (method->NoStreaming()) { (*vars)["StreamingType"] = "NORMAL_RPC"; - // NOTE: There is no reason to consider FC_UNARY as a separate + // NOTE: There is no reason to consider streamed-unary as a separate // category here since this part is setting up the client-side stub // and this appears as a NORMAL_RPC from the client-side. } else if (method->ClientOnlyStreaming()) { diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index c1fbaa09f5..af04fd4ca6 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -342,7 +342,6 @@ static grpc_server_register_method_payload_handling PayloadHandlingForMethod( return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER; case RpcMethod::CLIENT_STREAMING: case RpcMethod::BIDI_STREAMING: - case RpcMethod::FC_UNARY: return GRPC_SRM_PAYLOAD_NONE; } GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;); -- cgit v1.2.3 From a9c0d7f88b213d9a5e41808fd5d1eceaff1a034f Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 29 Aug 2016 16:42:04 -0700 Subject: Change names to StreamedUnary, ServerUnaryStreamer, etc. Use a templated method handler since most code shared between the new StreamedUnary and the existing BidiStreaming. Eliminate the separate enum case for streamed unary. Return a status failure if a StreamedUnary method handler doesn't actually do a write (since that is violating the appearance of unary-ness) --- include/grpc++/ext/reflection.grpc.pb.h | 2 +- include/grpc++/impl/codegen/completion_queue.h | 8 +- include/grpc++/impl/codegen/fc_unary.h | 90 ---------------------- include/grpc++/impl/codegen/method_handler_impl.h | 81 ++++++++----------- include/grpc++/impl/codegen/server_context.h | 8 +- .../grpc++/impl/codegen/server_streamed_unary.h | 90 ++++++++++++++++++++++ include/grpc++/impl/codegen/service_type.h | 7 +- include/grpc++/impl/codegen/status_code_enum.h | 5 ++ include/grpc++/support/fc_unary.h | 39 ---------- include/grpc++/support/server_streamed_unary.h | 39 ++++++++++ src/compiler/cpp_generator.cc | 44 ++++++----- src/cpp/ext/reflection.grpc.pb.cc | 2 +- test/cpp/end2end/hybrid_end2end_test.cc | 43 ++++++----- 13 files changed, 224 insertions(+), 234 deletions(-) delete mode 100644 include/grpc++/impl/codegen/fc_unary.h create mode 100644 include/grpc++/impl/codegen/server_streamed_unary.h delete mode 100644 include/grpc++/support/fc_unary.h create mode 100644 include/grpc++/support/server_streamed_unary.h (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/ext/reflection.grpc.pb.h b/include/grpc++/ext/reflection.grpc.pb.h index 18ec2ea1ec..822c2e374a 100644 --- a/include/grpc++/ext/reflection.grpc.pb.h +++ b/include/grpc++/ext/reflection.grpc.pb.h @@ -74,10 +74,10 @@ #include #include -#include #include #include #include +#include #include #include #include diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 95ece77ce5..ea317a7a79 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -78,8 +78,6 @@ template class ServerStreamingHandler; template class BidiStreamingHandler; -template -class FCUnaryMethodHandler; class UnknownMethodHandler; class Channel; @@ -187,10 +185,8 @@ class CompletionQueue : private GrpcLibraryCodegen { friend class ClientStreamingHandler; template friend class ServerStreamingHandler; - template - friend class BidiStreamingHandler; - template - friend class FCUnaryMethodHandler; + template + friend class TemplatedBidiStreamingHandler; friend class UnknownMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; diff --git a/include/grpc++/impl/codegen/fc_unary.h b/include/grpc++/impl/codegen/fc_unary.h deleted file mode 100644 index a423f1f0b3..0000000000 --- a/include/grpc++/impl/codegen/fc_unary.h +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_IMPL_CODEGEN_FC_UNARY_H -#define GRPCXX_IMPL_CODEGEN_FC_UNARY_H - -#include -#include -#include -#include -#include - -namespace grpc { -/// A class to represent a flow-controlled unary call. This is something -/// of a hybrid between conventional unary and streaming. This is invoked -/// through a unary call on the client side, but the server responds to it -/// as though it were a single-ping-pong streaming call. The server can use -/// the \a NextMessageSize method to determine an upper-bound on the size of -/// the message. -/// A key difference relative to streaming: an FCUnary must have exactly 1 Read -/// and exactly 1 Write, in that order, to function correctly. -/// Otherwise, the RPC is in error. -template -class FCUnary GRPC_FINAL - : public ServerReaderWriterInterface { - public: - FCUnary(Call* call, ServerContext* ctx) - : ServerReaderWriterInterface(call, ctx), - read_done_(false), - write_done_(false) {} - - ~FCUnary() {} - - bool Read(RequestType* request) GRPC_OVERRIDE { - if (read_done_) { - return false; - } - read_done_ = true; - return ServerReaderWriterInterface::Read( - request); - } - - using WriterInterface::Write; - bool Write(const ResponseType& response, - const WriteOptions& options) GRPC_OVERRIDE { - if (write_done_ || !read_done_) { - return false; - } - write_done_ = true; - return ServerReaderWriterInterface::Write( - response, options); - } - - private: - bool read_done_; - bool write_done_; -}; -} // namespace grpc - -#endif // GRPCXX_IMPL_CODEGEN_FC_UNARY_H diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 9c3af53b3a..3a671fe830 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -35,8 +35,8 @@ #define GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H #include -#include #include +#include #include namespace grpc { @@ -168,20 +168,23 @@ class ServerStreamingHandler : public MethodHandler { }; // A wrapper class of an application provided bidi-streaming handler. -template -class BidiStreamingHandler : public MethodHandler { +// This also applies to server-streamed implementation of a unary method +// with the additional requirement that such methods must have done a +// write for status to be ok +// Since this is used by more than 1 class, the service is not passed in. +// Instead, it is expected to be an implicitly-captured argument of func +// (through bind or something along those lines) +template +class TemplatedBidiStreamingHandler : public MethodHandler { public: - BidiStreamingHandler( - std::function*)> - func, - ServiceType* service) - : func_(func), service_(service) {} + TemplatedBidiStreamingHandler( + std::function + func) + : func_(func), write_needed_(WriteNeeded) {} void RunHandler(const HandlerParameter& param) GRPC_FINAL { - ServerReaderWriter stream(param.call, - param.server_context); - Status status = func_(service_, param.server_context, &stream); + Streamer stream(param.call, param.server_context); + Status status = func_(param.server_context, &stream); CallOpSet ops; if (!param.server_context->sent_initial_metadata_) { @@ -190,6 +193,12 @@ class BidiStreamingHandler : public MethodHandler { if (param.server_context->compression_level_set()) { ops.set_compression_level(param.server_context->compression_level()); } + if (write_needed_ && status.ok()) { + // If we needed a write but never did one, we need to mark the + // status as a fail + status = Status(IMPROPER_IMPLEMENTATION, + "Service did not provide response message"); + } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); param.call->PerformOps(&ops); @@ -197,46 +206,24 @@ class BidiStreamingHandler : public MethodHandler { } private: - std::function*)> - func_; - ServiceType* service_; + std::function + func_; + const bool write_needed_; }; -// A wrapper class of an application provided rpc method handler -// specifically to apply to the flow-controlled implementation of a unary -// method. -/// The argument to the constructor should be a member function already -/// bound to the appropriate service instance. The declaration gets too -/// complicated -/// otherwise. template -class FCUnaryMethodHandler : public MethodHandler { + class BidiStreamingHandler : public TemplatedBidiStreamingHandler, false> { public: - FCUnaryMethodHandler( - std::function*)> - func) - : func_(func) {} + BidiStreamingHandler(std::function*)> func, ServiceType* service): TemplatedBidiStreamingHandler,false>(std::bind(func, service, std::placeholders::_1, std::placeholders::_2)) {} + }; - void RunHandler(const HandlerParameter& param) GRPC_FINAL { - FCUnary fc_unary(param.call, - param.server_context); - Status status = func_(param.server_context, &fc_unary); - if (!param.server_context->sent_initial_metadata_) { - // means that the write never happened, which is bad - } else { - CallOpSet ops; - ops.ServerSendStatus(param.server_context->trailing_metadata_, status); - param.call->PerformOps(&ops); - param.call->cq()->Pluck(&ops); - } - } - - private: - // Application provided rpc handler function, already bound to its service. - std::function*)> - func_; -}; + template + class StreamedUnaryHandler : public TemplatedBidiStreamingHandler, true> { + public: + explicit StreamedUnaryHandler(std::function*)> func): TemplatedBidiStreamingHandler, true>(func) {} + }; // Handle unknown method by returning UNIMPLEMENTED error. class UnknownMethodHandler : public MethodHandler { diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h index aadae893ad..8e101c3f15 100644 --- a/include/grpc++/impl/codegen/server_context.h +++ b/include/grpc++/impl/codegen/server_context.h @@ -75,8 +75,6 @@ template class ServerStreamingHandler; template class BidiStreamingHandler; -template -class FCUnaryMethodHandler; class UnknownMethodHandler; class Call; @@ -192,10 +190,8 @@ class ServerContext { friend class ClientStreamingHandler; template friend class ServerStreamingHandler; - template - friend class BidiStreamingHandler; - template - friend class FCUnaryMethodHandler; + template + friend class TemplatedBidiStreamingHandler; friend class UnknownMethodHandler; friend class ::grpc::ClientContext; diff --git a/include/grpc++/impl/codegen/server_streamed_unary.h b/include/grpc++/impl/codegen/server_streamed_unary.h new file mode 100644 index 0000000000..a23e6020ed --- /dev/null +++ b/include/grpc++/impl/codegen/server_streamed_unary.h @@ -0,0 +1,90 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_CODEGEN_SERVER_STREAMED_UNARY_H +#define GRPCXX_IMPL_CODEGEN_SERVER_STREAMED_UNARY_H + +#include +#include +#include +#include +#include + +namespace grpc { +/// A class to represent a flow-controlled unary call. This is something +/// of a hybrid between conventional unary and streaming. This is invoked +/// through a unary call on the client side, but the server responds to it +/// as though it were a single-ping-pong streaming call. The server can use +/// the \a NextMessageSize method to determine an upper-bound on the size of +/// the message. +/// A key difference relative to streaming: ServerUnaryStreamer +/// must have exactly 1 Read and exactly 1 Write, in that order, to function +/// correctly. Otherwise, the RPC is in error. +template +class ServerUnaryStreamer GRPC_FINAL + : public ServerReaderWriterInterface { + public: + ServerUnaryStreamer(Call* call, ServerContext* ctx) + : ServerReaderWriterInterface(call, ctx), + read_done_(false), + write_done_(false) {} + + ~ServerUnaryStreamer() {} + + bool Read(RequestType* request) GRPC_OVERRIDE { + if (read_done_) { + return false; + } + read_done_ = true; + return ServerReaderWriterInterface::Read( + request); + } + + using WriterInterface::Write; + bool Write(const ResponseType& response, + const WriteOptions& options) GRPC_OVERRIDE { + if (write_done_ || !read_done_) { + return false; + } + write_done_ = true; + return ServerReaderWriterInterface::Write( + response, options); + } + + private: + bool read_done_; + bool write_done_; +}; +} // namespace grpc + +#endif // GRPCXX_IMPL_CODEGEN_SERVER_STREAMED_UNARY_H diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index 4af40422a1..9d2a80cbc4 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -147,10 +147,11 @@ class Service { methods_[index].reset(); } - void MarkMethodFCUnary(int index, MethodHandler* fc_unary_method) { + void MarkMethodStreamedUnary(int index, + MethodHandler* streamed_unary_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && - "Cannot mark an async or generic method as FCUnary"); - methods_[index]->SetHandler(fc_unary_method); + "Cannot mark an async or generic method Streamed Unary"); + methods_[index]->SetHandler(streamed_unary_method); // From the server's point of view, streamed unary is a special // case of BIDI_STREAMING that has 1 read and 1 write, in that order. diff --git a/include/grpc++/impl/codegen/status_code_enum.h b/include/grpc++/impl/codegen/status_code_enum.h index 9a90a18e2a..0f18a22c36 100644 --- a/include/grpc++/impl/codegen/status_code_enum.h +++ b/include/grpc++/impl/codegen/status_code_enum.h @@ -143,6 +143,11 @@ enum StatusCode { /// Unrecoverable data loss or corruption. DATA_LOSS = 15, + // Service was improperly implemented, violated a gRPC API requirement + // Not quite the same as unimplemented since it could just be that the API + // requirement was violated in this particular circumstance + IMPROPER_IMPLEMENTATION = 16, + /// Force users to include a default branch: DO_NOT_USE = -1 }; diff --git a/include/grpc++/support/fc_unary.h b/include/grpc++/support/fc_unary.h deleted file mode 100644 index 7e7dea8feb..0000000000 --- a/include/grpc++/support/fc_unary.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_SUPPORT_FC_UNARY_H -#define GRPCXX_SUPPORT_FC_UNARY_H - -#include - -#endif // GRPCXX_SUPPORT_FC_UNARY_H diff --git a/include/grpc++/support/server_streamed_unary.h b/include/grpc++/support/server_streamed_unary.h new file mode 100644 index 0000000000..109dfd4bca --- /dev/null +++ b/include/grpc++/support/server_streamed_unary.h @@ -0,0 +1,39 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_SUPPORT_SERVER_STREAMED_UNARY_H +#define GRPCXX_SUPPORT_SERVER_STREAMED_UNARY_H + +#include + +#endif // GRPCXX_SUPPORT_SERVER_STREAMED_UNARY_H diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index c5d4c2573d..7c70567d12 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -130,10 +130,10 @@ grpc::string GetHeaderIncludes(File *file, const Parameters ¶ms) { static const char *headers_strs[] = { "grpc++/impl/codegen/async_stream.h", "grpc++/impl/codegen/async_unary_call.h", - "grpc++/impl/codegen/fc_unary.h", "grpc++/impl/codegen/method_handler_impl.h", "grpc++/impl/codegen/proto_utils.h", "grpc++/impl/codegen/rpc_method.h", + "grpc++/impl/codegen/server_streamed_unary.h", "grpc++/impl/codegen/service_type.h", "grpc++/impl/codegen/status.h", "grpc++/impl/codegen/stub_options.h", @@ -606,7 +606,7 @@ void PrintHeaderServerMethodAsync(Printer *printer, const Method *method, printer->Print(*vars, "};\n"); } -void PrintHeaderServerMethodFCUnary( +void PrintHeaderServerMethodStreamedUnary( Printer *printer, const Method *method, std::map *vars) { (*vars)["Method"] = method->name(); @@ -615,7 +615,8 @@ void PrintHeaderServerMethodFCUnary( if (method->NoStreaming()) { printer->Print(*vars, "template \n"); printer->Print(*vars, - "class WithFCUnaryMethod_$Method$ : public BaseClass {\n"); + "class WithStreamedUnaryMethod_$Method$ : " + "public BaseClass {\n"); printer->Print( " private:\n" " void BaseClassMustBeDerivedFromService(const Service *service) " @@ -623,17 +624,16 @@ void PrintHeaderServerMethodFCUnary( printer->Print(" public:\n"); printer->Indent(); printer->Print(*vars, - "WithFCUnaryMethod_$Method$() {\n" - " ::grpc::Service::MarkMethodFCUnary($Idx$,\n" - " new ::grpc::FCUnaryMethodHandler(" - "std::bind(&WithFCUnaryMethod_$Method$::FC$" - "Method$, this, std::placeholders::_1, " - "std::placeholders::_2)));\n" + "WithStreamedUnaryMethod_$Method$() {\n" + " ::grpc::Service::MarkMethodStreamedUnary($Idx$,\n" + " new ::grpc::StreamedUnaryHandler<$Request$, " + "$Response$>(std::bind" + "(&WithStreamedUnaryMethod_$Method$::" + "Streamed$Method$, this, std::placeholders::_1, " + "std::placeholders::_2)));\n" "}\n"); printer->Print(*vars, - "~WithFCUnaryMethod_$Method$() GRPC_OVERRIDE {\n" + "~WithStreamedUnaryMethod_$Method$() GRPC_OVERRIDE {\n" " BaseClassMustBeDerivedFromService(this);\n" "}\n"); printer->Print( @@ -646,10 +646,11 @@ void PrintHeaderServerMethodFCUnary( " return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n" "}\n"); printer->Print(*vars, - "// replace default version of this method with FCUnary\n" - "virtual ::grpc::Status FC$Method$(" - "::grpc::ServerContext* context, ::grpc::FCUnary< " - "$Request$,$Response$>* fc_unary)" + "// replace default version of method with streamed unary\n" + "virtual ::grpc::Status Streamed$Method$(" + "::grpc::ServerContext* context, " + "::grpc::ServerUnaryStreamer< " + "$Request$,$Response$>* server_unary_streamer)" " = 0;\n"); printer->Outdent(); printer->Print(*vars, "};\n"); @@ -822,17 +823,18 @@ void PrintHeaderService(Printer *printer, const Service *service, PrintHeaderServerMethodGeneric(printer, service->method(i).get(), vars); } - // Server side - FC Unary + // Server side - Streamed Unary for (int i = 0; i < service->method_count(); ++i) { (*vars)["Idx"] = as_string(i); - PrintHeaderServerMethodFCUnary(printer, service->method(i).get(), vars); + PrintHeaderServerMethodStreamedUnary(printer, service->method(i).get(), + vars); } printer->Print("typedef "); for (int i = 0; i < service->method_count(); ++i) { (*vars)["method_name"] = service->method(i).get()->name(); if (service->method(i)->NoStreaming()) { - printer->Print(*vars, "WithFCUnaryMethod_$method_name$<"); + printer->Print(*vars, "WithStreamedUnaryMethod_$method_name$<"); } } printer->Print("Service"); @@ -841,7 +843,7 @@ void PrintHeaderService(Printer *printer, const Service *service, printer->Print(" >"); } } - printer->Print(" FCUnaryService;\n"); + printer->Print(" StreamedUnaryService;\n"); printer->Outdent(); printer->Print("};\n"); @@ -943,9 +945,9 @@ grpc::string GetSourceIncludes(File *file, const Parameters ¶ms) { "grpc++/impl/codegen/async_unary_call.h", "grpc++/impl/codegen/channel_interface.h", "grpc++/impl/codegen/client_unary_call.h", - "grpc++/impl/codegen/fc_unary.h", "grpc++/impl/codegen/method_handler_impl.h", "grpc++/impl/codegen/rpc_service_method.h", + "grpc++/impl/codegen/server_streamed_unary.h", "grpc++/impl/codegen/service_type.h", "grpc++/impl/codegen/sync_stream.h"}; std::vector headers(headers_strs, array_end(headers_strs)); diff --git a/src/cpp/ext/reflection.grpc.pb.cc b/src/cpp/ext/reflection.grpc.pb.cc index f4a0b97d65..6603eadc4e 100644 --- a/src/cpp/ext/reflection.grpc.pb.cc +++ b/src/cpp/ext/reflection.grpc.pb.cc @@ -43,9 +43,9 @@ #include #include #include -#include #include #include +#include #include #include namespace grpc { diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index 1512e99a3c..eb7125cb04 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -421,31 +421,33 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) { request_stream_handler_thread.join(); } -// Add a second service with one sync FCUnary method. -class FCUnaryDupPkg : public duplicate::EchoTestService::WithFCUnaryMethod_Echo< +// Add a second service with one sync streamed unary method. +class StreamedUnaryDupPkg : public + duplicate::EchoTestService::WithStreamedUnaryMethod_Echo< TestServiceImplDupPkg> { public: - Status FCEcho(ServerContext* context, - FCUnary* fc_unary) GRPC_OVERRIDE { + Status StreamedEcho(ServerContext* context, + ServerUnaryStreamer* stream) + GRPC_OVERRIDE { EchoRequest req; EchoResponse resp; uint32_t next_msg_sz; - fc_unary->NextMessageSize(&next_msg_sz); - gpr_log(GPR_INFO, "FC Unary Next Message Size is %u", next_msg_sz); - GPR_ASSERT(fc_unary->Read(&req)); + stream->NextMessageSize(&next_msg_sz); + gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz); + GPR_ASSERT(stream->Read(&req)); resp.set_message(req.message() + "_dup"); - GPR_ASSERT(fc_unary->Write(resp)); + GPR_ASSERT(stream->Write(resp)); return Status::OK; } }; TEST_F(HybridEnd2endTest, - AsyncRequestStreamResponseStream_SyncFCUnaryDupService) { + AsyncRequestStreamResponseStream_SyncStreamedUnaryDupService) { typedef EchoTestService::WithAsyncMethod_RequestStream< EchoTestService::WithAsyncMethod_ResponseStream> SType; SType service; - FCUnaryDupPkg dup_service; + StreamedUnaryDupPkg dup_service; SetUpServer(&service, &dup_service, nullptr, 8192); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, @@ -458,30 +460,31 @@ TEST_F(HybridEnd2endTest, request_stream_handler_thread.join(); } -// Add a second service that is fully FCUnary -class FullyFCUnaryDupPkg : public duplicate::EchoTestService::FCUnaryService { +// Add a second service that is fully Streamed Unary +class FullyStreamedUnaryDupPkg : public duplicate::EchoTestService::StreamedUnaryService { public: - Status FCEcho(ServerContext* context, - FCUnary* fc_unary) GRPC_OVERRIDE { + Status StreamedEcho(ServerContext* context, + ServerUnaryStreamer* stream) + GRPC_OVERRIDE { EchoRequest req; EchoResponse resp; uint32_t next_msg_sz; - fc_unary->NextMessageSize(&next_msg_sz); - gpr_log(GPR_INFO, "FC Unary Next Message Size is %u", next_msg_sz); - GPR_ASSERT(fc_unary->Read(&req)); + stream->NextMessageSize(&next_msg_sz); + gpr_log(GPR_INFO, "Streamed Unary Next Message Size is %u", next_msg_sz); + GPR_ASSERT(stream->Read(&req)); resp.set_message(req.message() + "_dup"); - GPR_ASSERT(fc_unary->Write(resp)); + GPR_ASSERT(stream->Write(resp)); return Status::OK; } }; TEST_F(HybridEnd2endTest, - AsyncRequestStreamResponseStream_SyncFullyFCUnaryDupService) { + AsyncRequestStreamResponseStream_SyncFullyStreamedUnaryDupService) { typedef EchoTestService::WithAsyncMethod_RequestStream< EchoTestService::WithAsyncMethod_ResponseStream> SType; SType service; - FullyFCUnaryDupPkg dup_service; + FullyStreamedUnaryDupPkg dup_service; SetUpServer(&service, &dup_service, nullptr, 8192); ResetStub(); std::thread response_stream_handler_thread(HandleServerStreaming, -- cgit v1.2.3 From d4d5f4cd53a74f7618f720bffa176b05834eb3ac Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 29 Aug 2016 16:59:21 -0700 Subject: clang-format --- include/grpc++/impl/codegen/method_handler_impl.h | 48 ++++++++++++++--------- include/grpc++/impl/codegen/service_type.h | 2 +- src/compiler/cpp_generator.cc | 10 ++--- test/cpp/end2end/hybrid_end2end_test.cc | 17 ++++---- 4 files changed, 45 insertions(+), 32 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 3a671fe830..ef803483ea 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -178,9 +178,8 @@ template class TemplatedBidiStreamingHandler : public MethodHandler { public: TemplatedBidiStreamingHandler( - std::function - func) - : func_(func), write_needed_(WriteNeeded) {} + std::function func) + : func_(func), write_needed_(WriteNeeded) {} void RunHandler(const HandlerParameter& param) GRPC_FINAL { Streamer stream(param.call, param.server_context); @@ -194,10 +193,10 @@ class TemplatedBidiStreamingHandler : public MethodHandler { ops.set_compression_level(param.server_context->compression_level()); } if (write_needed_ && status.ok()) { - // If we needed a write but never did one, we need to mark the - // status as a fail - status = Status(IMPROPER_IMPLEMENTATION, - "Service did not provide response message"); + // If we needed a write but never did one, we need to mark the + // status as a fail + status = Status(IMPROPER_IMPLEMENTATION, + "Service did not provide response message"); } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); @@ -206,24 +205,37 @@ class TemplatedBidiStreamingHandler : public MethodHandler { } private: - std::function - func_; + std::function func_; const bool write_needed_; }; template - class BidiStreamingHandler : public TemplatedBidiStreamingHandler, false> { +class BidiStreamingHandler + : public TemplatedBidiStreamingHandler< + ServerReaderWriter, false> { public: - BidiStreamingHandler(std::function*)> func, ServiceType* service): TemplatedBidiStreamingHandler,false>(std::bind(func, service, std::placeholders::_1, std::placeholders::_2)) {} - }; + BidiStreamingHandler( + std::function*)> + func, + ServiceType* service) + : TemplatedBidiStreamingHandler< + ServerReaderWriter, false>(std::bind( + func, service, std::placeholders::_1, std::placeholders::_2)) {} +}; - template - class StreamedUnaryHandler : public TemplatedBidiStreamingHandler, true> { +template +class StreamedUnaryHandler + : public TemplatedBidiStreamingHandler< + ServerUnaryStreamer, true> { public: - explicit StreamedUnaryHandler(std::function*)> func): TemplatedBidiStreamingHandler, true>(func) {} - }; + explicit StreamedUnaryHandler( + std::function*)> + func) + : TemplatedBidiStreamingHandler< + ServerUnaryStreamer, true>(func) {} +}; // Handle unknown method by returning UNIMPLEMENTED error. class UnknownMethodHandler : public MethodHandler { diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h index 9d2a80cbc4..72b2225312 100644 --- a/include/grpc++/impl/codegen/service_type.h +++ b/include/grpc++/impl/codegen/service_type.h @@ -148,7 +148,7 @@ class Service { } void MarkMethodStreamedUnary(int index, - MethodHandler* streamed_unary_method) { + MethodHandler* streamed_unary_method) { GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() && "Cannot mark an async or generic method Streamed Unary"); methods_[index]->SetHandler(streamed_unary_method); diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 7c70567d12..744d3d62e8 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -616,7 +616,7 @@ void PrintHeaderServerMethodStreamedUnary( printer->Print(*vars, "template \n"); printer->Print(*vars, "class WithStreamedUnaryMethod_$Method$ : " - "public BaseClass {\n"); + "public BaseClass {\n"); printer->Print( " private:\n" " void BaseClassMustBeDerivedFromService(const Service *service) " @@ -628,9 +628,9 @@ void PrintHeaderServerMethodStreamedUnary( " ::grpc::Service::MarkMethodStreamedUnary($Idx$,\n" " new ::grpc::StreamedUnaryHandler<$Request$, " "$Response$>(std::bind" - "(&WithStreamedUnaryMethod_$Method$::" + "(&WithStreamedUnaryMethod_$Method$::" "Streamed$Method$, this, std::placeholders::_1, " - "std::placeholders::_2)));\n" + "std::placeholders::_2)));\n" "}\n"); printer->Print(*vars, "~WithStreamedUnaryMethod_$Method$() GRPC_OVERRIDE {\n" @@ -649,7 +649,7 @@ void PrintHeaderServerMethodStreamedUnary( "// replace default version of method with streamed unary\n" "virtual ::grpc::Status Streamed$Method$(" "::grpc::ServerContext* context, " - "::grpc::ServerUnaryStreamer< " + "::grpc::ServerUnaryStreamer< " "$Request$,$Response$>* server_unary_streamer)" " = 0;\n"); printer->Outdent(); @@ -827,7 +827,7 @@ void PrintHeaderService(Printer *printer, const Service *service, for (int i = 0; i < service->method_count(); ++i) { (*vars)["Idx"] = as_string(i); PrintHeaderServerMethodStreamedUnary(printer, service->method(i).get(), - vars); + vars); } printer->Print("typedef "); diff --git a/test/cpp/end2end/hybrid_end2end_test.cc b/test/cpp/end2end/hybrid_end2end_test.cc index eb7125cb04..a6ea13aa8b 100644 --- a/test/cpp/end2end/hybrid_end2end_test.cc +++ b/test/cpp/end2end/hybrid_end2end_test.cc @@ -422,13 +422,13 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream_SyncDupService) { } // Add a second service with one sync streamed unary method. -class StreamedUnaryDupPkg : public - duplicate::EchoTestService::WithStreamedUnaryMethod_Echo< - TestServiceImplDupPkg> { +class StreamedUnaryDupPkg + : public duplicate::EchoTestService::WithStreamedUnaryMethod_Echo< + TestServiceImplDupPkg> { public: Status StreamedEcho(ServerContext* context, - ServerUnaryStreamer* stream) - GRPC_OVERRIDE { + ServerUnaryStreamer* stream) + GRPC_OVERRIDE { EchoRequest req; EchoResponse resp; uint32_t next_msg_sz; @@ -461,11 +461,12 @@ TEST_F(HybridEnd2endTest, } // Add a second service that is fully Streamed Unary -class FullyStreamedUnaryDupPkg : public duplicate::EchoTestService::StreamedUnaryService { +class FullyStreamedUnaryDupPkg + : public duplicate::EchoTestService::StreamedUnaryService { public: Status StreamedEcho(ServerContext* context, - ServerUnaryStreamer* stream) - GRPC_OVERRIDE { + ServerUnaryStreamer* stream) + GRPC_OVERRIDE { EchoRequest req; EchoResponse resp; uint32_t next_msg_sz; -- cgit v1.2.3 From 77073ce99fae96f736d5df033c581ad6a1bc6804 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 30 Aug 2016 10:26:30 -0700 Subject: clang-format --- include/grpc++/impl/codegen/sync_stream.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 5c775ec957..e1d4660ae7 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -457,7 +457,8 @@ namespace internal { template class ServerReaderWriterBody GRPC_FINAL { public: - ServerReaderWriterBody(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {} + ServerReaderWriterBody(Call* call, ServerContext* ctx) + : call_(call), ctx_(ctx) {} void SendInitialMetadata() { GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); @@ -512,8 +513,7 @@ class ServerReaderWriterBody GRPC_FINAL { template class ServerReaderWriter GRPC_FINAL : public ServerReaderWriterInterface { public: - ServerReaderWriter(Call* call, ServerContext* ctx) - : body_(call, ctx) {} + ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {} void SendInitialMetadata() GRPC_OVERRIDE { body_.SendInitialMetadata(); } @@ -527,9 +527,9 @@ class ServerReaderWriter GRPC_FINAL : public ServerReaderWriterInterface { bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE { return body_.Write(msg, options); } - + private: - internal::ServerReaderWriterBody body_; + internal::ServerReaderWriterBody body_; }; /// A class to represent a flow-controlled unary call. This is something @@ -573,7 +573,7 @@ class ServerUnaryStreamer GRPC_FINAL } private: - internal::ServerReaderWriterBody body_; + internal::ServerReaderWriterBody body_; bool read_done_; bool write_done_; }; -- cgit v1.2.3 From 84033b1f6b34edd464be22967f23de6d07bf4316 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 30 Aug 2016 20:50:43 -0700 Subject: Stick to StatusCode::INTERNAL when there's no service response on a StreamedUnary --- include/grpc++/impl/codegen/method_handler_impl.h | 4 ++-- include/grpc++/impl/codegen/status_code_enum.h | 5 ----- 2 files changed, 2 insertions(+), 7 deletions(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 170852606f..3f25b546da 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -194,8 +194,8 @@ class TemplatedBidiStreamingHandler : public MethodHandler { if (write_needed_ && status.ok()) { // If we needed a write but never did one, we need to mark the // status as a fail - status = Status(IMPROPER_IMPLEMENTATION, - "Service did not provide response message"); + status = Status(StatusCode::INTERNAL, + "Service did not provide response message"); } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); diff --git a/include/grpc++/impl/codegen/status_code_enum.h b/include/grpc++/impl/codegen/status_code_enum.h index 0f18a22c36..9a90a18e2a 100644 --- a/include/grpc++/impl/codegen/status_code_enum.h +++ b/include/grpc++/impl/codegen/status_code_enum.h @@ -143,11 +143,6 @@ enum StatusCode { /// Unrecoverable data loss or corruption. DATA_LOSS = 15, - // Service was improperly implemented, violated a gRPC API requirement - // Not quite the same as unimplemented since it could just be that the API - // requirement was violated in this particular circumstance - IMPROPER_IMPLEMENTATION = 16, - /// Force users to include a default branch: DO_NOT_USE = -1 }; -- cgit v1.2.3 From 5c9a3438c56152b33f320494bc16bdbb2acf4232 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 30 Aug 2016 20:52:32 -0700 Subject: clang-format --- include/grpc++/impl/codegen/method_handler_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'include/grpc++/impl/codegen') diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h index 3f25b546da..d989263252 100644 --- a/include/grpc++/impl/codegen/method_handler_impl.h +++ b/include/grpc++/impl/codegen/method_handler_impl.h @@ -195,7 +195,7 @@ class TemplatedBidiStreamingHandler : public MethodHandler { // If we needed a write but never did one, we need to mark the // status as a fail status = Status(StatusCode::INTERNAL, - "Service did not provide response message"); + "Service did not provide response message"); } } ops.ServerSendStatus(param.server_context->trailing_metadata_, status); -- cgit v1.2.3