diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/compiler/config.h | 7 | ||||
-rw-r--r-- | src/compiler/cpp_generator.cc | 194 | ||||
-rw-r--r-- | src/cpp/client/channel.cc | 11 | ||||
-rw-r--r-- | src/cpp/client/channel.h | 10 | ||||
-rw-r--r-- | src/cpp/client/client_unary_call.cc | 64 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 307 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.cc | 26 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.h | 55 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 240 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 21 |
10 files changed, 226 insertions, 709 deletions
diff --git a/src/compiler/config.h b/src/compiler/config.h index e81de8d6c8..cd52aca57d 100644 --- a/src/compiler/config.h +++ b/src/compiler/config.h @@ -35,6 +35,7 @@ #define SRC_COMPILER_CONFIG_H #include <grpc++/config.h> +#include <grpc++/config_protobuf.h> #ifndef GRPC_CUSTOM_DESCRIPTOR #include <google/protobuf/descriptor.h> @@ -48,7 +49,8 @@ #ifndef GRPC_CUSTOM_CODEGENERATOR #include <google/protobuf/compiler/code_generator.h> #define GRPC_CUSTOM_CODEGENERATOR ::google::protobuf::compiler::CodeGenerator -#define GRPC_CUSTOM_GENERATORCONTEXT ::google::protobuf::compiler::GeneratorContext +#define GRPC_CUSTOM_GENERATORCONTEXT \ + ::google::protobuf::compiler::GeneratorContext #endif #ifndef GRPC_CUSTOM_PRINTER @@ -57,7 +59,8 @@ #include <google/protobuf/io/zero_copy_stream_impl_lite.h> #define GRPC_CUSTOM_PRINTER ::google::protobuf::io::Printer #define GRPC_CUSTOM_CODEDOUTPUTSTREAM ::google::protobuf::io::CodedOutputStream -#define GRPC_CUSTOM_STRINGOUTPUTSTREAM ::google::protobuf::io::StringOutputStream +#define GRPC_CUSTOM_STRINGOUTPUTSTREAM \ + ::google::protobuf::io::StringOutputStream #endif #ifndef GRPC_CUSTOM_PLUGINMAIN diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 6cd615019b..75659947df 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -97,7 +97,8 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file, vars["filename_base"] = grpc_generator::StripProto(file->name()); printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n"); - printer.Print(vars, "// If you make any local change, they will be lost.\n"); + printer.Print(vars, + "// If you make any local change, they will be lost.\n"); printer.Print(vars, "// source: $filename$\n"); printer.Print(vars, "#ifndef GRPC_$filename_identifier$__INCLUDED\n"); printer.Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n"); @@ -113,6 +114,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, grpc::string temp = "#include <grpc++/impl/internal_stub.h>\n" "#include <grpc++/impl/rpc_method.h>\n" + "#include <grpc++/impl/proto_utils.h>\n" "#include <grpc++/impl/service_type.h>\n" "#include <grpc++/async_unary_call.h>\n" "#include <grpc++/status.h>\n" @@ -141,10 +143,10 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, return temp; } -void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer, - const grpc::protobuf::MethodDescriptor *method, - std::map<grpc::string, grpc::string> *vars, - bool is_public) { +void PrintHeaderClientMethodInterfaces( + grpc::protobuf::io::Printer *printer, + const grpc::protobuf::MethodDescriptor *method, + std::map<grpc::string, grpc::string> *vars, bool is_public) { (*vars)["Method"] = method->name(); (*vars)["Request"] = grpc_cpp_generator::ClassName(method->input_type(), true); @@ -157,19 +159,17 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer, *vars, "virtual ::grpc::Status $Method$(::grpc::ClientContext* context, " "const $Request$& request, $Response$* response) = 0;\n"); - printer->Print( - *vars, - "std::unique_ptr< " - "::grpc::ClientAsyncResponseReaderInterface< $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) {\n"); + printer->Print(*vars, + "std::unique_ptr< " + "::grpc::ClientAsyncResponseReaderInterface< $Response$>> " + "Async$Method$(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncResponseReaderInterface< $Response$>>(" - "Async$Method$Raw(context, request, cq));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncResponseReaderInterface< $Response$>>(" + "Async$Method$Raw(context, request, cq));\n"); printer->Outdent(); printer->Print("}\n"); } else if (ClientOnlyStreaming(method)) { @@ -188,14 +188,14 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer, printer->Print( *vars, "std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>" - " Async$Method$(::grpc::ClientContext* context, $Response$* response, " + " Async$Method$(::grpc::ClientContext* context, $Response$* " + "response, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncWriterInterface< $Request$>>(" - "Async$Method$Raw(context, response, cq, tag));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncWriterInterface< $Request$>>(" + "Async$Method$Raw(context, response, cq, tag));\n"); printer->Outdent(); printer->Print("}\n"); } else if (ServerOnlyStreaming(method)) { @@ -218,18 +218,17 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer, "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderInterface< $Response$>>(" - "Async$Method$Raw(context, request, cq, tag));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderInterface< $Response$>>(" + "Async$Method$Raw(context, request, cq, tag));\n"); printer->Outdent(); printer->Print("}\n"); } else if (BidiStreaming(method)) { - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientReaderWriterInterface< $Request$, $Response$>> " - "$Method$(::grpc::ClientContext* context) {\n"); + printer->Print(*vars, + "std::unique_ptr< ::grpc::ClientReaderWriterInterface< " + "$Request$, $Response$>> " + "$Method$(::grpc::ClientContext* context) {\n"); printer->Indent(); printer->Print( *vars, @@ -267,12 +266,11 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer, "virtual ::grpc::ClientWriterInterface< $Request$>*" " $Method$Raw(" "::grpc::ClientContext* context, $Response$* response) = 0;\n"); - printer->Print( - *vars, - "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*" - " Async$Method$Raw(::grpc::ClientContext* context, " - "$Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); + printer->Print(*vars, + "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*" + " Async$Method$Raw(::grpc::ClientContext* context, " + "$Response$* response, " + "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -285,16 +283,15 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer, "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); } else if (BidiStreaming(method)) { - printer->Print( - *vars, - "virtual ::grpc::ClientReaderWriterInterface< $Request$, $Response$>* " - "$Method$Raw(::grpc::ClientContext* context) = 0;\n"); - printer->Print( - *vars, - "virtual ::grpc::ClientAsyncReaderWriterInterface< " - "$Request$, $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); + printer->Print(*vars, + "virtual ::grpc::ClientReaderWriterInterface< $Request$, " + "$Response$>* " + "$Method$Raw(::grpc::ClientContext* context) = 0;\n"); + printer->Print(*vars, + "virtual ::grpc::ClientAsyncReaderWriterInterface< " + "$Request$, $Response$>* " + "Async$Method$Raw(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq, void* tag) = 0;\n"); } } } @@ -321,11 +318,10 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer, "const $Request$& request, " "::grpc::CompletionQueue* cq) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncResponseReader< $Response$>>(" - "Async$Method$Raw(context, request, cq));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncResponseReader< $Response$>>(" + "Async$Method$Raw(context, request, cq));\n"); printer->Outdent(); printer->Print("}\n"); } else if (ClientOnlyStreaming(method)) { @@ -335,17 +331,16 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer, " $Method$(" "::grpc::ClientContext* context, $Response$* response) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< ::grpc::ClientWriter< $Request$>>" - "($Method$Raw(context, response));\n"); + printer->Print(*vars, + "return std::unique_ptr< ::grpc::ClientWriter< $Request$>>" + "($Method$Raw(context, response));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>" - " Async$Method$(::grpc::ClientContext* context, $Response$* response, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print(*vars, + "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>" + " Async$Method$(::grpc::ClientContext* context, " + "$Response$* response, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Indent(); printer->Print( *vars, @@ -385,53 +380,47 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer, "std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>>" " $Method$(::grpc::ClientContext* context) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientReaderWriter< $Request$, $Response$>>(" - "$Method$Raw(context));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientReaderWriter< $Request$, $Response$>>(" + "$Method$Raw(context));\n"); printer->Outdent(); printer->Print("}\n"); - printer->Print( - *vars, - "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " - "$Request$, $Response$>> " - "Async$Method$(::grpc::ClientContext* context, " - "::grpc::CompletionQueue* cq, void* tag) {\n"); + printer->Print(*vars, + "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< " + "$Request$, $Response$>> " + "Async$Method$(::grpc::ClientContext* context, " + "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Indent(); - printer->Print( - *vars, - "return std::unique_ptr< " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>(" - "Async$Method$Raw(context, cq, tag));\n"); + printer->Print(*vars, + "return std::unique_ptr< " + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>(" + "Async$Method$Raw(context, cq, tag));\n"); printer->Outdent(); printer->Print("}\n"); } } else { if (NoStreaming(method)) { - printer->Print( - *vars, - "::grpc::ClientAsyncResponseReader< $Response$>* " - "Async$Method$Raw(::grpc::ClientContext* context, " - "const $Request$& request, " - "::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n"); + printer->Print(*vars, + "::grpc::ClientAsyncResponseReader< $Response$>* " + "Async$Method$Raw(::grpc::ClientContext* context, " + "const $Request$& request, " + "::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n"); } else if (ClientOnlyStreaming(method)) { - printer->Print( - *vars, - "::grpc::ClientWriter< $Request$>* $Method$Raw(" - "::grpc::ClientContext* context, $Response$* response) " - "GRPC_OVERRIDE;\n"); + printer->Print(*vars, + "::grpc::ClientWriter< $Request$>* $Method$Raw(" + "::grpc::ClientContext* context, $Response$* response) " + "GRPC_OVERRIDE;\n"); printer->Print( *vars, "::grpc::ClientAsyncWriter< $Request$>* Async$Method$Raw(" "::grpc::ClientContext* context, $Response$* response, " "::grpc::CompletionQueue* cq, void* tag) GRPC_OVERRIDE;\n"); } else if (ServerOnlyStreaming(method)) { - printer->Print( - *vars, - "::grpc::ClientReader< $Response$>* $Method$Raw(" - "::grpc::ClientContext* context, const $Request$& request)" - " GRPC_OVERRIDE;\n"); + printer->Print(*vars, + "::grpc::ClientReader< $Response$>* $Method$Raw(" + "::grpc::ClientContext* context, const $Request$& request)" + " GRPC_OVERRIDE;\n"); printer->Print( *vars, "::grpc::ClientAsyncReader< $Response$>* Async$Method$Raw(" @@ -629,7 +618,7 @@ grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file, const Parameters ¶ms) { grpc::string output; { - // Scope the output stream so it closes and finalizes output to the string. + // Scope the output stream so it closes and finalizes output to the string. grpc::protobuf::io::StringOutputStream output_stream(&output); grpc::protobuf::io::Printer printer(&output_stream, '$'); std::map<grpc::string, grpc::string> vars; @@ -693,7 +682,8 @@ grpc::string GetSourcePrologue(const grpc::protobuf::FileDescriptor *file, vars["filename_base"] = grpc_generator::StripProto(file->name()); printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n"); - printer.Print(vars, "// If you make any local change, they will be lost.\n"); + printer.Print(vars, + "// If you make any local change, they will be lost.\n"); printer.Print(vars, "// source: $filename$\n\n"); printer.Print(vars, "#include \"$filename_base$.pb.h\"\n"); printer.Print(vars, "#include \"$filename_base$.grpc.pb.h\"\n"); @@ -1056,8 +1046,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " "$Request$, " "$Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -1066,8 +1055,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::CLIENT_STREAMING,\n" " new ::grpc::ClientStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -1076,8 +1064,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::SERVER_STREAMING,\n" " new ::grpc::ServerStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, @@ -1086,8 +1073,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::BIDI_STREAMING,\n" " new ::grpc::BidiStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } } printer->Print("return service_;\n"); diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 475a20d883..6e6278cb05 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -41,7 +41,6 @@ #include <grpc/support/slice.h> #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" #include <grpc++/channel_arguments.h> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> @@ -75,14 +74,14 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, return Call(c_call, this, cq); } -void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) { +void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; - size_t nops = MAX_OPS; - grpc_op ops[MAX_OPS]; + size_t nops = 0; + grpc_op cops[MAX_OPS]; GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); - buf->FillOps(ops, &nops); + ops->FillOps(cops, &nops); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), ops, nops, buf)); + grpc_call_start_batch(call->call(), cops, nops, ops)); GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); } diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index cd239247c8..9108713c58 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -44,22 +44,22 @@ struct grpc_channel; namespace grpc { class Call; -class CallOpBuffer; +class CallOpSetInterface; class ChannelArguments; class CompletionQueue; class Credentials; class StreamContextInterface; -class Channel GRPC_FINAL : public GrpcLibrary, - public ChannelInterface { +class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface { public: Channel(const grpc::string& target, grpc_channel* c_channel); ~Channel() GRPC_OVERRIDE; - virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE; + virtual void* RegisterMethod(const char* method) GRPC_OVERRIDE; virtual Call CreateCall(const RpcMethod& method, ClientContext* context, CompletionQueue* cq) GRPC_OVERRIDE; - virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, + Call* call) GRPC_OVERRIDE; private: const grpc::string target_; diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc deleted file mode 100644 index 55e589306f..0000000000 --- a/src/cpp/client/client_unary_call.cc +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc++/impl/client_unary_call.h> -#include <grpc++/impl/call.h> -#include <grpc++/channel_interface.h> -#include <grpc++/client_context.h> -#include <grpc++/completion_queue.h> -#include <grpc++/status.h> -#include <grpc/support/log.h> - -namespace grpc { - -// Wrapper that performs a blocking unary call -Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result) { - CompletionQueue cq; - Call call(channel->CreateCall(method, context, &cq)); - CallOpBuffer buf; - Status status; - buf.AddSendInitialMetadata(context); - buf.AddSendMessage(request); - buf.AddRecvInitialMetadata(context); - buf.AddRecvMessage(result); - buf.AddClientSendClose(); - buf.AddClientRecvStatus(context, &status); - call.PerformOps(&buf); - GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.ok()); - return status; -} - -} // namespace grpc diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index edce6396bd..0a5c976e01 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -39,107 +39,32 @@ #include <grpc++/channel_interface.h> #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" namespace grpc { -CallOpBuffer::CallOpBuffer() - : return_tag_(this), - send_initial_metadata_(false), - initial_metadata_count_(0), - initial_metadata_(nullptr), - recv_initial_metadata_(nullptr), - send_message_(nullptr), - send_message_buffer_(nullptr), - send_buf_(nullptr), - recv_message_(nullptr), - recv_message_buffer_(nullptr), - recv_buf_(nullptr), - max_message_size_(-1), - client_send_close_(false), - recv_trailing_metadata_(nullptr), - recv_status_(nullptr), - status_code_(GRPC_STATUS_OK), - status_details_(nullptr), - status_details_capacity_(0), - send_status_available_(false), - send_status_code_(GRPC_STATUS_OK), - trailing_metadata_count_(0), - trailing_metadata_(nullptr), - cancelled_buf_(0), - recv_closed_(nullptr) { - memset(&recv_trailing_metadata_arr_, 0, sizeof(recv_trailing_metadata_arr_)); - memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_)); - recv_trailing_metadata_arr_.metadata = nullptr; - recv_initial_metadata_arr_.metadata = nullptr; -} - -void CallOpBuffer::Reset(void* next_return_tag) { - return_tag_ = next_return_tag; - - send_initial_metadata_ = false; - initial_metadata_count_ = 0; - gpr_free(initial_metadata_); - - recv_initial_metadata_ = nullptr; - recv_initial_metadata_arr_.count = 0; - - if (send_buf_ && send_message_) { - grpc_byte_buffer_destroy(send_buf_); - } - send_message_ = nullptr; - send_message_buffer_ = nullptr; - send_buf_ = nullptr; - - got_message = false; - if (recv_buf_ && recv_message_) { - grpc_byte_buffer_destroy(recv_buf_); - } - recv_message_ = nullptr; - recv_message_buffer_ = nullptr; - recv_buf_ = nullptr; - - client_send_close_ = false; - - recv_trailing_metadata_ = nullptr; - recv_status_ = nullptr; - recv_trailing_metadata_arr_.count = 0; - - status_code_ = GRPC_STATUS_OK; - - send_status_available_ = false; - send_status_code_ = GRPC_STATUS_OK; - send_status_details_.clear(); - trailing_metadata_count_ = 0; - trailing_metadata_ = nullptr; - - recv_closed_ = nullptr; -} - -CallOpBuffer::~CallOpBuffer() { - gpr_free(status_details_); - gpr_free(recv_initial_metadata_arr_.metadata); - gpr_free(recv_trailing_metadata_arr_.metadata); - if (recv_buf_ && recv_message_) { - grpc_byte_buffer_destroy(recv_buf_); - } - if (send_buf_ && send_message_) { - grpc_byte_buffer_destroy(send_buf_); +void FillMetadataMap(grpc_metadata_array* arr, + std::multimap<grpc::string, grpc::string>* metadata) { + for (size_t i = 0; i < arr->count; i++) { + // TODO(yangg) handle duplicates? + metadata->insert(std::pair<grpc::string, grpc::string>( + arr->metadata[i].key, + grpc::string(arr->metadata[i].value, arr->metadata[i].value_length))); } + grpc_metadata_array_destroy(arr); + grpc_metadata_array_init(arr); } -namespace { // TODO(yangg) if the map is changed before we send, the pointers will be a // mess. Make sure it does not happen. grpc_metadata* FillMetadataArray( - std::multimap<grpc::string, grpc::string>* metadata) { - if (metadata->empty()) { + const std::multimap<grpc::string, grpc::string>& metadata) { + if (metadata.empty()) { return nullptr; } grpc_metadata* metadata_array = - (grpc_metadata*)gpr_malloc(metadata->size() * sizeof(grpc_metadata)); + (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata)); size_t i = 0; - for (auto iter = metadata->cbegin(); iter != metadata->cend(); ++iter, ++i) { + for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) { metadata_array[i].key = iter->first.c_str(); metadata_array[i].value = iter->second.c_str(); metadata_array[i].value_length = iter->second.size(); @@ -147,206 +72,6 @@ grpc_metadata* FillMetadataArray( return metadata_array; } -void FillMetadataMap(grpc_metadata_array* arr, - std::multimap<grpc::string, grpc::string>* metadata) { - for (size_t i = 0; i < arr->count; i++) { - // TODO(yangg) handle duplicates? - metadata->insert(std::pair<grpc::string, grpc::string>( - arr->metadata[i].key, - grpc::string(arr->metadata[i].value, arr->metadata[i].value_length))); - } - grpc_metadata_array_destroy(arr); - grpc_metadata_array_init(arr); -} -} // namespace - -void CallOpBuffer::AddSendInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata) { - send_initial_metadata_ = true; - initial_metadata_count_ = metadata->size(); - initial_metadata_ = FillMetadataArray(metadata); -} - -void CallOpBuffer::AddRecvInitialMetadata(ClientContext* ctx) { - ctx->initial_metadata_received_ = true; - recv_initial_metadata_ = &ctx->recv_initial_metadata_; -} - -void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) { - AddSendInitialMetadata(&ctx->send_initial_metadata_); -} - -void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) { - send_message_ = &message; -} - -void CallOpBuffer::AddSendMessage(const ByteBuffer& message) { - send_message_buffer_ = &message; -} - -void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) { - recv_message_ = message; - recv_message_->Clear(); -} - -void CallOpBuffer::AddRecvMessage(ByteBuffer* message) { - recv_message_buffer_ = message; - recv_message_buffer_->Clear(); -} - -void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; } - -void CallOpBuffer::AddServerRecvClose(bool* cancelled) { - recv_closed_ = cancelled; -} - -void CallOpBuffer::AddClientRecvStatus(ClientContext* context, Status* status) { - recv_trailing_metadata_ = &context->trailing_metadata_; - recv_status_ = status; -} - -void CallOpBuffer::AddServerSendStatus( - std::multimap<grpc::string, grpc::string>* metadata, const Status& status) { - if (metadata != NULL) { - trailing_metadata_count_ = metadata->size(); - trailing_metadata_ = FillMetadataArray(metadata); - } else { - trailing_metadata_count_ = 0; - } - send_status_available_ = true; - send_status_code_ = static_cast<grpc_status_code>(status.error_code()); - send_status_details_ = status.error_message(); -} - -void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) { - *nops = 0; - if (send_initial_metadata_) { - ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[*nops].data.send_initial_metadata.count = initial_metadata_count_; - ops[*nops].data.send_initial_metadata.metadata = initial_metadata_; - ops[*nops].flags = 0; - (*nops)++; - } - if (recv_initial_metadata_) { - ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_; - ops[*nops].flags = 0; - (*nops)++; - } - if (send_message_ || send_message_buffer_) { - if (send_message_) { - GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_SERIALIZE, 0); - bool success = SerializeProto(*send_message_, &send_buf_); - if (!success) { - abort(); - // TODO handle parse failure - } - GRPC_TIMER_END(GRPC_PTAG_PROTO_SERIALIZE, 0); - } else { - send_buf_ = send_message_buffer_->buffer(); - } - ops[*nops].op = GRPC_OP_SEND_MESSAGE; - ops[*nops].data.send_message = send_buf_; - ops[*nops].flags = 0; - (*nops)++; - } - if (recv_message_ || recv_message_buffer_) { - ops[*nops].op = GRPC_OP_RECV_MESSAGE; - ops[*nops].data.recv_message = &recv_buf_; - ops[*nops].flags = 0; - (*nops)++; - } - if (client_send_close_) { - ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - ops[*nops].flags = 0; - (*nops)++; - } - if (recv_status_) { - ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[*nops].data.recv_status_on_client.trailing_metadata = - &recv_trailing_metadata_arr_; - ops[*nops].data.recv_status_on_client.status = &status_code_; - ops[*nops].data.recv_status_on_client.status_details = &status_details_; - ops[*nops].data.recv_status_on_client.status_details_capacity = - &status_details_capacity_; - ops[*nops].flags = 0; - (*nops)++; - } - if (send_status_available_) { - ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER; - ops[*nops].data.send_status_from_server.trailing_metadata_count = - trailing_metadata_count_; - ops[*nops].data.send_status_from_server.trailing_metadata = - trailing_metadata_; - ops[*nops].data.send_status_from_server.status = send_status_code_; - ops[*nops].data.send_status_from_server.status_details = - send_status_details_.empty() ? nullptr : send_status_details_.c_str(); - ops[*nops].flags = 0; - (*nops)++; - } - if (recv_closed_) { - ops[*nops].op = GRPC_OP_RECV_CLOSE_ON_SERVER; - ops[*nops].data.recv_close_on_server.cancelled = &cancelled_buf_; - ops[*nops].flags = 0; - (*nops)++; - } -} - -bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { - // Release send buffers. - if (send_buf_ && send_message_) { - if (send_message_) { - grpc_byte_buffer_destroy(send_buf_); - } - send_buf_ = nullptr; - } - if (initial_metadata_) { - gpr_free(initial_metadata_); - initial_metadata_ = nullptr; - } - if (trailing_metadata_count_) { - gpr_free(trailing_metadata_); - trailing_metadata_ = nullptr; - } - // Set user-facing tag. - *tag = return_tag_; - // Process received initial metadata - if (recv_initial_metadata_) { - FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_); - } - // Parse received message if any. - if (recv_message_ || recv_message_buffer_) { - if (recv_buf_) { - got_message = *status; - if (recv_message_) { - GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, 0); - *status = *status && - DeserializeProto(recv_buf_, recv_message_, max_message_size_); - grpc_byte_buffer_destroy(recv_buf_); - GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, 0); - } else { - recv_message_buffer_->set_buffer(recv_buf_); - } - recv_buf_ = nullptr; - } else { - // Read failed - got_message = false; - *status = false; - } - } - // Parse received status. - if (recv_status_) { - FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_); - *recv_status_ = Status( - static_cast<StatusCode>(status_code_), - status_details_ ? grpc::string(status_details_) : grpc::string()); - } - if (recv_closed_) { - *recv_closed_ = cancelled_buf_ != 0; - } - return true; -} - Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {} @@ -357,11 +82,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, call_(call), max_message_size_(max_message_size) {} -void Call::PerformOps(CallOpBuffer* buffer) { +void Call::PerformOps(CallOpSetInterface* ops) { if (max_message_size_ > 0) { - buffer->set_max_message_size(max_message_size_); + ops->set_max_message_size(max_message_size_); } - call_hook_->PerformOpsOnCall(buffer, this); + call_hook_->PerformOpsOnCall(ops, this); } } // namespace grpc diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index f4cf5cf17a..268e4f6d1f 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -31,7 +31,7 @@ * */ -#include "src/cpp/proto/proto_utils.h" +#include <grpc++/impl/proto_utils.h> #include <grpc++/config.h> #include <grpc/grpc.h> @@ -67,7 +67,7 @@ class GrpcBufferWriter GRPC_FINAL slice_ = gpr_slice_malloc(block_size_); } *data = GPR_SLICE_START_PTR(slice_); - byte_count_ += *size = GPR_SLICE_LENGTH(slice_); + byte_count_ += * size = GPR_SLICE_LENGTH(slice_); gpr_slice_buffer_add(slice_buffer_, slice_); return true; } @@ -118,7 +118,7 @@ class GrpcBufferReader GRPC_FINAL } gpr_slice_unref(slice_); *data = GPR_SLICE_START_PTR(slice_); - byte_count_ += *size = GPR_SLICE_LENGTH(slice_); + byte_count_ += * size = GPR_SLICE_LENGTH(slice_); return true; } @@ -152,20 +152,28 @@ class GrpcBufferReader GRPC_FINAL namespace grpc { -bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) { +Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) { GrpcBufferWriter writer(bp); - return msg.SerializeToZeroCopyStream(&writer); + return msg.SerializeToZeroCopyStream(&writer) ? Status::OK : Status(INVALID_ARGUMENT, "Failed to serialize message"); } -bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, - int max_message_size) { - if (!buffer) return false; +Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, + int max_message_size) { + if (!buffer) { + return Status(INVALID_ARGUMENT, "No payload"); + } GrpcBufferReader reader(buffer); ::grpc::protobuf::io::CodedInputStream decoder(&reader); if (max_message_size > 0) { decoder.SetTotalBytesLimit(max_message_size, max_message_size); } - return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage(); + if (!msg->ParseFromCodedStream(&decoder)) { + return Status(INVALID_ARGUMENT, msg->InitializationErrorString()); + } + if (!decoder.ConsumedEntireMessage()) { + return Status(INVALID_ARGUMENT, "Did not read entire message"); + } + return Status::OK; } } // namespace grpc diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h deleted file mode 100644 index 67a775b3ca..0000000000 --- a/src/cpp/proto/proto_utils.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H -#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H - -#include <grpc++/config.h> - -struct grpc_byte_buffer; - -namespace grpc { - -// Serialize the msg into a buffer created inside the function. The caller -// should destroy the returned buffer when done with it. If serialization fails, -// false is returned and buffer is left unchanged. -bool SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** buffer); - -// The caller keeps ownership of buffer and msg. -bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, - int max_message_size); - -} // namespace grpc - -#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 024537c34a..31b6a0ee00 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -48,7 +48,6 @@ #include <grpc++/time.h> #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" namespace grpc { @@ -69,16 +68,11 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || method->method_type() == RpcMethod::SERVER_STREAMING), - has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC || - method->method_type() == - RpcMethod::CLIENT_STREAMING), cq_(nullptr) { grpc_metadata_array_init(&request_metadata_); } - ~SyncRequest() { - grpc_metadata_array_destroy(&request_metadata_); - } + ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); } static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { void* tag = nullptr; @@ -91,9 +85,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } - void SetupRequest() { - cq_ = grpc_completion_queue_create(); - } + void SetupRequest() { cq_ = grpc_completion_queue_create(); } void TeardownRequest() { grpc_completion_queue_destroy(cq_); @@ -125,7 +117,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { ctx_(mrd->deadline_, mrd->request_metadata_.metadata, mrd->request_metadata_.count), has_request_payload_(mrd->has_request_payload_), - has_response_payload_(mrd->has_response_payload_), request_payload_(mrd->request_payload_), method_(mrd->method_) { ctx_.call_ = mrd->call_; @@ -142,35 +133,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { } void Run() { - std::unique_ptr<grpc::protobuf::Message> req; - std::unique_ptr<grpc::protobuf::Message> res; - if (has_request_payload_) { - GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call()); - req.reset(method_->AllocateRequestProto()); - if (!DeserializeProto(request_payload_, req.get(), - call_.max_message_size())) { - // FIXME(yangg) deal with deserialization failure - cq_.Shutdown(); - return; - } - GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call()); - } - if (has_response_payload_) { - res.reset(method_->AllocateResponseProto()); - } ctx_.BeginCompletionOp(&call_); - auto status = method_->handler()->RunHandler( - MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); - CallOpBuffer buf; - if (!ctx_.sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_.initial_metadata_); - } - if (has_response_payload_) { - buf.AddSendMessage(*res); - } - buf.AddServerSendStatus(&ctx_.trailing_metadata_, status); - call_.PerformOps(&buf); - cq_.Pluck(&buf); /* status ignored */ + method_->handler()->RunHandler(MethodHandler::HandlerParameter( + &call_, &ctx_, request_payload_, call_.max_message_size())); + request_payload_ = nullptr; void* ignored_tag; bool ignored_ok; cq_.Shutdown(); @@ -182,7 +148,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { Call call_; ServerContext ctx_; const bool has_request_payload_; - const bool has_response_payload_; grpc_byte_buffer* request_payload_; RpcServiceMethod* const method_; }; @@ -192,7 +157,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void* const tag_; bool in_flight_; const bool has_request_payload_; - const bool has_response_payload_; grpc_call* call_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; @@ -260,9 +224,9 @@ bool Server::RegisterService(RpcService* service) { } bool Server::RegisterAsyncService(AsynchronousService* service) { - GPR_ASSERT(service->dispatch_impl_ == nullptr && + GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); - service->dispatch_impl_ = this; + service->server_ = this; service->request_args_ = new void*[service->method_count_]; for (size_t i = 0; i < service->method_count_; ++i) { void* tag = grpc_server_register_method(server_, service->method_names_[i], @@ -328,141 +292,87 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) { +void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; - size_t nops = MAX_OPS; - grpc_op ops[MAX_OPS]; - buf->FillOps(ops, &nops); + size_t nops = 0; + grpc_op cops[MAX_OPS]; + ops->FillOps(cops, &nops); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), ops, nops, buf)); + grpc_call_start_batch(call->call(), cops, nops, ops)); } -class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { - public: - AsyncRequest(Server* server, void* registered_method, ServerContext* ctx, - grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) - : tag_(tag), - request_(request), - stream_(stream), - call_cq_(call_cq), - ctx_(ctx), - generic_ctx_(nullptr), - server_(server), - call_(nullptr), - payload_(nullptr) { - memset(&array_, 0, sizeof(array_)); - grpc_call_details_init(&call_details_); - GPR_ASSERT(notification_cq); - GPR_ASSERT(call_cq); - grpc_server_request_registered_call( - server->server_, registered_method, &call_, &call_details_.deadline, - &array_, request ? &payload_ : nullptr, call_cq->cq(), - notification_cq->cq(), this); - } +Server::BaseAsyncRequest::BaseAsyncRequest( + Server* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + : server_(server), + context_(context), + stream_(stream), + call_cq_(call_cq), + tag_(tag), + call_(nullptr) { + memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_)); +} - AsyncRequest(Server* server, GenericServerContext* ctx, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) - : tag_(tag), - request_(nullptr), - stream_(stream), - call_cq_(call_cq), - ctx_(nullptr), - generic_ctx_(ctx), - server_(server), - call_(nullptr), - payload_(nullptr) { - memset(&array_, 0, sizeof(array_)); - grpc_call_details_init(&call_details_); - GPR_ASSERT(notification_cq); - GPR_ASSERT(call_cq); - grpc_server_request_call(server->server_, &call_, &call_details_, &array_, - call_cq->cq(), notification_cq->cq(), this); - } +Server::BaseAsyncRequest::~BaseAsyncRequest() {} - ~AsyncRequest() { - if (payload_) { - grpc_byte_buffer_destroy(payload_); +bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) { + if (*status) { + for (size_t i = 0; i < initial_metadata_array_.count; i++) { + context_->client_metadata_.insert(std::make_pair( + grpc::string(initial_metadata_array_.metadata[i].key), + grpc::string(initial_metadata_array_.metadata[i].value, + initial_metadata_array_.metadata[i].value + + initial_metadata_array_.metadata[i].value_length))); } - grpc_metadata_array_destroy(&array_); } - - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { - *tag = tag_; - bool orig_status = *status; - if (*status && request_) { - if (payload_) { - GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_); - *status = - DeserializeProto(payload_, request_, server_->max_message_size_); - GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_); - } else { - *status = false; - } - } - ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_; - GPR_ASSERT(ctx); - if (*status) { - ctx->deadline_ = call_details_.deadline; - for (size_t i = 0; i < array_.count; i++) { - ctx->client_metadata_.insert(std::make_pair( - grpc::string(array_.metadata[i].key), - grpc::string( - array_.metadata[i].value, - array_.metadata[i].value + array_.metadata[i].value_length))); - } - if (generic_ctx_) { - // TODO(yangg) remove the copy here. - generic_ctx_->method_ = call_details_.method; - generic_ctx_->host_ = call_details_.host; - gpr_free(call_details_.method); - gpr_free(call_details_.host); - } - } - ctx->call_ = call_; - ctx->cq_ = call_cq_; - Call call(call_, server_, call_cq_, server_->max_message_size_); - if (orig_status && call_) { - ctx->BeginCompletionOp(&call); - } - // just the pointers inside call are copied here - stream_->BindCall(&call); - delete this; - return true; + grpc_metadata_array_destroy(&initial_metadata_array_); + context_->call_ = call_; + context_->cq_ = call_cq_; + Call call(call_, server_, call_cq_, server_->max_message_size_); + if (*status && call_) { + context_->BeginCompletionOp(&call); } + // just the pointers inside call are copied here + stream_->BindCall(&call); + *tag = tag_; + delete this; + return true; +} - private: - void* const tag_; - grpc::protobuf::Message* const request_; - ServerAsyncStreamingInterface* const stream_; - CompletionQueue* const call_cq_; - ServerContext* const ctx_; - GenericServerContext* const generic_ctx_; - Server* const server_; - grpc_call* call_; - grpc_call_details call_details_; - grpc_metadata_array array_; - grpc_byte_buffer* payload_; -}; +Server::RegisteredAsyncRequest::RegisteredAsyncRequest( + Server* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag) + : BaseAsyncRequest(server, context, stream, call_cq, tag) {} + +void Server::RegisteredAsyncRequest::IssueRequest( + void* registered_method, grpc_byte_buffer** payload, + ServerCompletionQueue* notification_cq) { + grpc_server_request_registered_call( + server_->server_, registered_method, &call_, &context_->deadline_, + &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(), + this); +} -void Server::RequestAsyncCall(void* registered_method, ServerContext* context, - grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { - new AsyncRequest(this, registered_method, context, request, stream, call_cq, - notification_cq, tag); +Server::GenericAsyncRequest::GenericAsyncRequest( + Server* server, GenericServerContext* context, + ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) + : BaseAsyncRequest(server, context, stream, call_cq, tag) { + grpc_call_details_init(&call_details_); + GPR_ASSERT(notification_cq); + GPR_ASSERT(call_cq); + grpc_server_request_call(server->server_, &call_, &call_details_, + &initial_metadata_array_, call_cq->cq(), + notification_cq->cq(), this); } -void Server::RequestAsyncGenericCall(GenericServerContext* context, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) { - new AsyncRequest(this, context, stream, call_cq, notification_cq, tag); +bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) { + // TODO(yangg) remove the copy here. + static_cast<GenericServerContext*>(context_)->method_ = call_details_.method; + static_cast<GenericServerContext*>(context_)->host_ = call_details_.host; + gpr_free(call_details_.method); + gpr_free(call_details_.host); + return BaseAsyncRequest::FinalizeResult(tag, status); } void Server::ScheduleCallback() { diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 6b5e41d0a8..eea9645e37 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -43,12 +43,12 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer { +class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - CompletionOp() : refs_(2), finalized_(false), cancelled_(false) { - AddServerRecvClose(&cancelled_); - } + CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {} + + void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; bool CheckCancelled(CompletionQueue* cq); @@ -59,7 +59,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer { grpc::mutex mu_; int refs_; bool finalized_; - bool cancelled_; + int cancelled_; }; void ServerContext::CompletionOp::Unref() { @@ -73,14 +73,19 @@ void ServerContext::CompletionOp::Unref() { bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { cq->TryPluck(this); grpc::lock_guard<grpc::mutex> g(mu_); - return finalized_ ? cancelled_ : false; + return finalized_ ? cancelled_ != 0 : false; +} + +void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { + ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops->data.recv_close_on_server.cancelled = &cancelled_; + *nops = 1; } bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { - GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status)); grpc::unique_lock<grpc::mutex> lock(mu_); finalized_ = true; - if (!*status) cancelled_ = true; + if (!*status) cancelled_ = 1; if (--refs_ == 0) { lock.unlock(); delete this; |