aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/config.h7
-rw-r--r--src/compiler/cpp_generator.cc194
-rw-r--r--src/cpp/client/channel.cc11
-rw-r--r--src/cpp/client/channel.h10
-rw-r--r--src/cpp/client/client_unary_call.cc64
-rw-r--r--src/cpp/common/call.cc307
-rw-r--r--src/cpp/proto/proto_utils.cc26
-rw-r--r--src/cpp/proto/proto_utils.h55
-rw-r--r--src/cpp/server/server.cc240
-rw-r--r--src/cpp/server/server_context.cc21
10 files changed, 226 insertions, 709 deletions
diff --git a/src/compiler/config.h b/src/compiler/config.h
index e81de8d6c8..cd52aca57d 100644
--- a/src/compiler/config.h
+++ b/src/compiler/config.h
@@ -35,6 +35,7 @@
#define SRC_COMPILER_CONFIG_H
#include <grpc++/config.h>
+#include <grpc++/config_protobuf.h>
#ifndef GRPC_CUSTOM_DESCRIPTOR
#include <google/protobuf/descriptor.h>
@@ -48,7 +49,8 @@
#ifndef GRPC_CUSTOM_CODEGENERATOR
#include <google/protobuf/compiler/code_generator.h>
#define GRPC_CUSTOM_CODEGENERATOR ::google::protobuf::compiler::CodeGenerator
-#define GRPC_CUSTOM_GENERATORCONTEXT ::google::protobuf::compiler::GeneratorContext
+#define GRPC_CUSTOM_GENERATORCONTEXT \
+ ::google::protobuf::compiler::GeneratorContext
#endif
#ifndef GRPC_CUSTOM_PRINTER
@@ -57,7 +59,8 @@
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#define GRPC_CUSTOM_PRINTER ::google::protobuf::io::Printer
#define GRPC_CUSTOM_CODEDOUTPUTSTREAM ::google::protobuf::io::CodedOutputStream
-#define GRPC_CUSTOM_STRINGOUTPUTSTREAM ::google::protobuf::io::StringOutputStream
+#define GRPC_CUSTOM_STRINGOUTPUTSTREAM \
+ ::google::protobuf::io::StringOutputStream
#endif
#ifndef GRPC_CUSTOM_PLUGINMAIN
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 6cd615019b..75659947df 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -97,7 +97,8 @@ grpc::string GetHeaderPrologue(const grpc::protobuf::FileDescriptor *file,
vars["filename_base"] = grpc_generator::StripProto(file->name());
printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
- printer.Print(vars, "// If you make any local change, they will be lost.\n");
+ printer.Print(vars,
+ "// If you make any local change, they will be lost.\n");
printer.Print(vars, "// source: $filename$\n");
printer.Print(vars, "#ifndef GRPC_$filename_identifier$__INCLUDED\n");
printer.Print(vars, "#define GRPC_$filename_identifier$__INCLUDED\n");
@@ -113,6 +114,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
grpc::string temp =
"#include <grpc++/impl/internal_stub.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
+ "#include <grpc++/impl/proto_utils.h>\n"
"#include <grpc++/impl/service_type.h>\n"
"#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/status.h>\n"
@@ -141,10 +143,10 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
return temp;
}
-void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
- const grpc::protobuf::MethodDescriptor *method,
- std::map<grpc::string, grpc::string> *vars,
- bool is_public) {
+void PrintHeaderClientMethodInterfaces(
+ grpc::protobuf::io::Printer *printer,
+ const grpc::protobuf::MethodDescriptor *method,
+ std::map<grpc::string, grpc::string> *vars, bool is_public) {
(*vars)["Method"] = method->name();
(*vars)["Request"] =
grpc_cpp_generator::ClassName(method->input_type(), true);
@@ -157,19 +159,17 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
*vars,
"virtual ::grpc::Status $Method$(::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) = 0;\n");
- printer->Print(
- *vars,
- "std::unique_ptr< "
- "::grpc::ClientAsyncResponseReaderInterface< $Response$>> "
- "Async$Method$(::grpc::ClientContext* context, "
- "const $Request$& request, "
- "::grpc::CompletionQueue* cq) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< "
+ "::grpc::ClientAsyncResponseReaderInterface< $Response$>> "
+ "Async$Method$(::grpc::ClientContext* context, "
+ "const $Request$& request, "
+ "::grpc::CompletionQueue* cq) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncResponseReaderInterface< $Response$>>("
- "Async$Method$Raw(context, request, cq));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncResponseReaderInterface< $Response$>>("
+ "Async$Method$Raw(context, request, cq));\n");
printer->Outdent();
printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
@@ -188,14 +188,14 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
printer->Print(
*vars,
"std::unique_ptr< ::grpc::ClientAsyncWriterInterface< $Request$>>"
- " Async$Method$(::grpc::ClientContext* context, $Response$* response, "
+ " Async$Method$(::grpc::ClientContext* context, $Response$* "
+ "response, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncWriterInterface< $Request$>>("
- "Async$Method$Raw(context, response, cq, tag));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncWriterInterface< $Request$>>("
+ "Async$Method$Raw(context, response, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
} else if (ServerOnlyStreaming(method)) {
@@ -218,18 +218,17 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncReaderInterface< $Response$>>("
- "Async$Method$Raw(context, request, cq, tag));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncReaderInterface< $Response$>>("
+ "Async$Method$Raw(context, request, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
} else if (BidiStreaming(method)) {
- printer->Print(
- *vars,
- "std::unique_ptr< ::grpc::ClientReaderWriterInterface< $Request$, $Response$>> "
- "$Method$(::grpc::ClientContext* context) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< ::grpc::ClientReaderWriterInterface< "
+ "$Request$, $Response$>> "
+ "$Method$(::grpc::ClientContext* context) {\n");
printer->Indent();
printer->Print(
*vars,
@@ -267,12 +266,11 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"virtual ::grpc::ClientWriterInterface< $Request$>*"
" $Method$Raw("
"::grpc::ClientContext* context, $Response$* response) = 0;\n");
- printer->Print(
- *vars,
- "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*"
- " Async$Method$Raw(::grpc::ClientContext* context, "
- "$Response$* response, "
- "::grpc::CompletionQueue* cq, void* tag) = 0;\n");
+ printer->Print(*vars,
+ "virtual ::grpc::ClientAsyncWriterInterface< $Request$>*"
+ " Async$Method$Raw(::grpc::ClientContext* context, "
+ "$Response$* response, "
+ "::grpc::CompletionQueue* cq, void* tag) = 0;\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -285,16 +283,15 @@ void PrintHeaderClientMethodInterfaces(grpc::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq, void* tag) = 0;\n");
} else if (BidiStreaming(method)) {
- printer->Print(
- *vars,
- "virtual ::grpc::ClientReaderWriterInterface< $Request$, $Response$>* "
- "$Method$Raw(::grpc::ClientContext* context) = 0;\n");
- printer->Print(
- *vars,
- "virtual ::grpc::ClientAsyncReaderWriterInterface< "
- "$Request$, $Response$>* "
- "Async$Method$Raw(::grpc::ClientContext* context, "
- "::grpc::CompletionQueue* cq, void* tag) = 0;\n");
+ printer->Print(*vars,
+ "virtual ::grpc::ClientReaderWriterInterface< $Request$, "
+ "$Response$>* "
+ "$Method$Raw(::grpc::ClientContext* context) = 0;\n");
+ printer->Print(*vars,
+ "virtual ::grpc::ClientAsyncReaderWriterInterface< "
+ "$Request$, $Response$>* "
+ "Async$Method$Raw(::grpc::ClientContext* context, "
+ "::grpc::CompletionQueue* cq, void* tag) = 0;\n");
}
}
}
@@ -321,11 +318,10 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncResponseReader< $Response$>>("
- "Async$Method$Raw(context, request, cq));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncResponseReader< $Response$>>("
+ "Async$Method$Raw(context, request, cq));\n");
printer->Outdent();
printer->Print("}\n");
} else if (ClientOnlyStreaming(method)) {
@@ -335,17 +331,16 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
" $Method$("
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< ::grpc::ClientWriter< $Request$>>"
- "($Method$Raw(context, response));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< ::grpc::ClientWriter< $Request$>>"
+ "($Method$Raw(context, response));\n");
printer->Outdent();
printer->Print("}\n");
- printer->Print(
- *vars,
- "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>"
- " Async$Method$(::grpc::ClientContext* context, $Response$* response, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< ::grpc::ClientAsyncWriter< $Request$>>"
+ " Async$Method$(::grpc::ClientContext* context, "
+ "$Response$* response, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
printer->Print(
*vars,
@@ -385,53 +380,47 @@ void PrintHeaderClientMethod(grpc::protobuf::io::Printer *printer,
"std::unique_ptr< ::grpc::ClientReaderWriter< $Request$, $Response$>>"
" $Method$(::grpc::ClientContext* context) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientReaderWriter< $Request$, $Response$>>("
- "$Method$Raw(context));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientReaderWriter< $Request$, $Response$>>("
+ "$Method$Raw(context));\n");
printer->Outdent();
printer->Print("}\n");
- printer->Print(
- *vars,
- "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
- "$Request$, $Response$>> "
- "Async$Method$(::grpc::ClientContext* context, "
- "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print(*vars,
+ "std::unique_ptr< ::grpc::ClientAsyncReaderWriter< "
+ "$Request$, $Response$>> "
+ "Async$Method$(::grpc::ClientContext* context, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
printer->Indent();
- printer->Print(
- *vars,
- "return std::unique_ptr< "
- "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>("
- "Async$Method$Raw(context, cq, tag));\n");
+ printer->Print(*vars,
+ "return std::unique_ptr< "
+ "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>>("
+ "Async$Method$Raw(context, cq, tag));\n");
printer->Outdent();
printer->Print("}\n");
}
} else {
if (NoStreaming(method)) {
- printer->Print(
- *vars,
- "::grpc::ClientAsyncResponseReader< $Response$>* "
- "Async$Method$Raw(::grpc::ClientContext* context, "
- "const $Request$& request, "
- "::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n");
+ printer->Print(*vars,
+ "::grpc::ClientAsyncResponseReader< $Response$>* "
+ "Async$Method$Raw(::grpc::ClientContext* context, "
+ "const $Request$& request, "
+ "::grpc::CompletionQueue* cq) GRPC_OVERRIDE;\n");
} else if (ClientOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "::grpc::ClientWriter< $Request$>* $Method$Raw("
- "::grpc::ClientContext* context, $Response$* response) "
- "GRPC_OVERRIDE;\n");
+ printer->Print(*vars,
+ "::grpc::ClientWriter< $Request$>* $Method$Raw("
+ "::grpc::ClientContext* context, $Response$* response) "
+ "GRPC_OVERRIDE;\n");
printer->Print(
*vars,
"::grpc::ClientAsyncWriter< $Request$>* Async$Method$Raw("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq, void* tag) GRPC_OVERRIDE;\n");
} else if (ServerOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "::grpc::ClientReader< $Response$>* $Method$Raw("
- "::grpc::ClientContext* context, const $Request$& request)"
- " GRPC_OVERRIDE;\n");
+ printer->Print(*vars,
+ "::grpc::ClientReader< $Response$>* $Method$Raw("
+ "::grpc::ClientContext* context, const $Request$& request)"
+ " GRPC_OVERRIDE;\n");
printer->Print(
*vars,
"::grpc::ClientAsyncReader< $Response$>* Async$Method$Raw("
@@ -629,7 +618,7 @@ grpc::string GetHeaderServices(const grpc::protobuf::FileDescriptor *file,
const Parameters &params) {
grpc::string output;
{
- // Scope the output stream so it closes and finalizes output to the string.
+ // Scope the output stream so it closes and finalizes output to the string.
grpc::protobuf::io::StringOutputStream output_stream(&output);
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
@@ -693,7 +682,8 @@ grpc::string GetSourcePrologue(const grpc::protobuf::FileDescriptor *file,
vars["filename_base"] = grpc_generator::StripProto(file->name());
printer.Print(vars, "// Generated by the gRPC protobuf plugin.\n");
- printer.Print(vars, "// If you make any local change, they will be lost.\n");
+ printer.Print(vars,
+ "// If you make any local change, they will be lost.\n");
printer.Print(vars, "// source: $filename$\n\n");
printer.Print(vars, "#include \"$filename_base$.pb.h\"\n");
printer.Print(vars, "#include \"$filename_base$.grpc.pb.h\"\n");
@@ -1056,8 +1046,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -1066,8 +1055,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -1076,8 +1064,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
@@ -1086,8 +1073,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
printer->Print("return service_;\n");
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 475a20d883..6e6278cb05 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -41,7 +41,6 @@
#include <grpc/support/slice.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@@ -75,14 +74,14 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
return Call(c_call, this, cq);
}
-void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
- buf->FillOps(ops, &nops);
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
}
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index cd239247c8..9108713c58 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -44,22 +44,22 @@ struct grpc_channel;
namespace grpc {
class Call;
-class CallOpBuffer;
+class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class Credentials;
class StreamContextInterface;
-class Channel GRPC_FINAL : public GrpcLibrary,
- public ChannelInterface {
+class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
public:
Channel(const grpc::string& target, grpc_channel* c_channel);
~Channel() GRPC_OVERRIDE;
- virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE;
+ virtual void* RegisterMethod(const char* method) GRPC_OVERRIDE;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
- virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
+ virtual void PerformOpsOnCall(CallOpSetInterface* ops,
+ Call* call) GRPC_OVERRIDE;
private:
const grpc::string target_;
diff --git a/src/cpp/client/client_unary_call.cc b/src/cpp/client/client_unary_call.cc
deleted file mode 100644
index 55e589306f..0000000000
--- a/src/cpp/client/client_unary_call.cc
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc++/impl/client_unary_call.h>
-#include <grpc++/impl/call.h>
-#include <grpc++/channel_interface.h>
-#include <grpc++/client_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/status.h>
-#include <grpc/support/log.h>
-
-namespace grpc {
-
-// Wrapper that performs a blocking unary call
-Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context,
- const grpc::protobuf::Message& request,
- grpc::protobuf::Message* result) {
- CompletionQueue cq;
- Call call(channel->CreateCall(method, context, &cq));
- CallOpBuffer buf;
- Status status;
- buf.AddSendInitialMetadata(context);
- buf.AddSendMessage(request);
- buf.AddRecvInitialMetadata(context);
- buf.AddRecvMessage(result);
- buf.AddClientSendClose();
- buf.AddClientRecvStatus(context, &status);
- call.PerformOps(&buf);
- GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.ok());
- return status;
-}
-
-} // namespace grpc
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index edce6396bd..0a5c976e01 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -39,107 +39,32 @@
#include <grpc++/channel_interface.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
-CallOpBuffer::CallOpBuffer()
- : return_tag_(this),
- send_initial_metadata_(false),
- initial_metadata_count_(0),
- initial_metadata_(nullptr),
- recv_initial_metadata_(nullptr),
- send_message_(nullptr),
- send_message_buffer_(nullptr),
- send_buf_(nullptr),
- recv_message_(nullptr),
- recv_message_buffer_(nullptr),
- recv_buf_(nullptr),
- max_message_size_(-1),
- client_send_close_(false),
- recv_trailing_metadata_(nullptr),
- recv_status_(nullptr),
- status_code_(GRPC_STATUS_OK),
- status_details_(nullptr),
- status_details_capacity_(0),
- send_status_available_(false),
- send_status_code_(GRPC_STATUS_OK),
- trailing_metadata_count_(0),
- trailing_metadata_(nullptr),
- cancelled_buf_(0),
- recv_closed_(nullptr) {
- memset(&recv_trailing_metadata_arr_, 0, sizeof(recv_trailing_metadata_arr_));
- memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_));
- recv_trailing_metadata_arr_.metadata = nullptr;
- recv_initial_metadata_arr_.metadata = nullptr;
-}
-
-void CallOpBuffer::Reset(void* next_return_tag) {
- return_tag_ = next_return_tag;
-
- send_initial_metadata_ = false;
- initial_metadata_count_ = 0;
- gpr_free(initial_metadata_);
-
- recv_initial_metadata_ = nullptr;
- recv_initial_metadata_arr_.count = 0;
-
- if (send_buf_ && send_message_) {
- grpc_byte_buffer_destroy(send_buf_);
- }
- send_message_ = nullptr;
- send_message_buffer_ = nullptr;
- send_buf_ = nullptr;
-
- got_message = false;
- if (recv_buf_ && recv_message_) {
- grpc_byte_buffer_destroy(recv_buf_);
- }
- recv_message_ = nullptr;
- recv_message_buffer_ = nullptr;
- recv_buf_ = nullptr;
-
- client_send_close_ = false;
-
- recv_trailing_metadata_ = nullptr;
- recv_status_ = nullptr;
- recv_trailing_metadata_arr_.count = 0;
-
- status_code_ = GRPC_STATUS_OK;
-
- send_status_available_ = false;
- send_status_code_ = GRPC_STATUS_OK;
- send_status_details_.clear();
- trailing_metadata_count_ = 0;
- trailing_metadata_ = nullptr;
-
- recv_closed_ = nullptr;
-}
-
-CallOpBuffer::~CallOpBuffer() {
- gpr_free(status_details_);
- gpr_free(recv_initial_metadata_arr_.metadata);
- gpr_free(recv_trailing_metadata_arr_.metadata);
- if (recv_buf_ && recv_message_) {
- grpc_byte_buffer_destroy(recv_buf_);
- }
- if (send_buf_ && send_message_) {
- grpc_byte_buffer_destroy(send_buf_);
+void FillMetadataMap(grpc_metadata_array* arr,
+ std::multimap<grpc::string, grpc::string>* metadata) {
+ for (size_t i = 0; i < arr->count; i++) {
+ // TODO(yangg) handle duplicates?
+ metadata->insert(std::pair<grpc::string, grpc::string>(
+ arr->metadata[i].key,
+ grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
}
+ grpc_metadata_array_destroy(arr);
+ grpc_metadata_array_init(arr);
}
-namespace {
// TODO(yangg) if the map is changed before we send, the pointers will be a
// mess. Make sure it does not happen.
grpc_metadata* FillMetadataArray(
- std::multimap<grpc::string, grpc::string>* metadata) {
- if (metadata->empty()) {
+ const std::multimap<grpc::string, grpc::string>& metadata) {
+ if (metadata.empty()) {
return nullptr;
}
grpc_metadata* metadata_array =
- (grpc_metadata*)gpr_malloc(metadata->size() * sizeof(grpc_metadata));
+ (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata));
size_t i = 0;
- for (auto iter = metadata->cbegin(); iter != metadata->cend(); ++iter, ++i) {
+ for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
metadata_array[i].key = iter->first.c_str();
metadata_array[i].value = iter->second.c_str();
metadata_array[i].value_length = iter->second.size();
@@ -147,206 +72,6 @@ grpc_metadata* FillMetadataArray(
return metadata_array;
}
-void FillMetadataMap(grpc_metadata_array* arr,
- std::multimap<grpc::string, grpc::string>* metadata) {
- for (size_t i = 0; i < arr->count; i++) {
- // TODO(yangg) handle duplicates?
- metadata->insert(std::pair<grpc::string, grpc::string>(
- arr->metadata[i].key,
- grpc::string(arr->metadata[i].value, arr->metadata[i].value_length)));
- }
- grpc_metadata_array_destroy(arr);
- grpc_metadata_array_init(arr);
-}
-} // namespace
-
-void CallOpBuffer::AddSendInitialMetadata(
- std::multimap<grpc::string, grpc::string>* metadata) {
- send_initial_metadata_ = true;
- initial_metadata_count_ = metadata->size();
- initial_metadata_ = FillMetadataArray(metadata);
-}
-
-void CallOpBuffer::AddRecvInitialMetadata(ClientContext* ctx) {
- ctx->initial_metadata_received_ = true;
- recv_initial_metadata_ = &ctx->recv_initial_metadata_;
-}
-
-void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) {
- AddSendInitialMetadata(&ctx->send_initial_metadata_);
-}
-
-void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) {
- send_message_ = &message;
-}
-
-void CallOpBuffer::AddSendMessage(const ByteBuffer& message) {
- send_message_buffer_ = &message;
-}
-
-void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) {
- recv_message_ = message;
- recv_message_->Clear();
-}
-
-void CallOpBuffer::AddRecvMessage(ByteBuffer* message) {
- recv_message_buffer_ = message;
- recv_message_buffer_->Clear();
-}
-
-void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; }
-
-void CallOpBuffer::AddServerRecvClose(bool* cancelled) {
- recv_closed_ = cancelled;
-}
-
-void CallOpBuffer::AddClientRecvStatus(ClientContext* context, Status* status) {
- recv_trailing_metadata_ = &context->trailing_metadata_;
- recv_status_ = status;
-}
-
-void CallOpBuffer::AddServerSendStatus(
- std::multimap<grpc::string, grpc::string>* metadata, const Status& status) {
- if (metadata != NULL) {
- trailing_metadata_count_ = metadata->size();
- trailing_metadata_ = FillMetadataArray(metadata);
- } else {
- trailing_metadata_count_ = 0;
- }
- send_status_available_ = true;
- send_status_code_ = static_cast<grpc_status_code>(status.error_code());
- send_status_details_ = status.error_message();
-}
-
-void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
- *nops = 0;
- if (send_initial_metadata_) {
- ops[*nops].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[*nops].data.send_initial_metadata.count = initial_metadata_count_;
- ops[*nops].data.send_initial_metadata.metadata = initial_metadata_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (recv_initial_metadata_) {
- ops[*nops].op = GRPC_OP_RECV_INITIAL_METADATA;
- ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (send_message_ || send_message_buffer_) {
- if (send_message_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_SERIALIZE, 0);
- bool success = SerializeProto(*send_message_, &send_buf_);
- if (!success) {
- abort();
- // TODO handle parse failure
- }
- GRPC_TIMER_END(GRPC_PTAG_PROTO_SERIALIZE, 0);
- } else {
- send_buf_ = send_message_buffer_->buffer();
- }
- ops[*nops].op = GRPC_OP_SEND_MESSAGE;
- ops[*nops].data.send_message = send_buf_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (recv_message_ || recv_message_buffer_) {
- ops[*nops].op = GRPC_OP_RECV_MESSAGE;
- ops[*nops].data.recv_message = &recv_buf_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (client_send_close_) {
- ops[*nops].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (recv_status_) {
- ops[*nops].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- ops[*nops].data.recv_status_on_client.trailing_metadata =
- &recv_trailing_metadata_arr_;
- ops[*nops].data.recv_status_on_client.status = &status_code_;
- ops[*nops].data.recv_status_on_client.status_details = &status_details_;
- ops[*nops].data.recv_status_on_client.status_details_capacity =
- &status_details_capacity_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (send_status_available_) {
- ops[*nops].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
- ops[*nops].data.send_status_from_server.trailing_metadata_count =
- trailing_metadata_count_;
- ops[*nops].data.send_status_from_server.trailing_metadata =
- trailing_metadata_;
- ops[*nops].data.send_status_from_server.status = send_status_code_;
- ops[*nops].data.send_status_from_server.status_details =
- send_status_details_.empty() ? nullptr : send_status_details_.c_str();
- ops[*nops].flags = 0;
- (*nops)++;
- }
- if (recv_closed_) {
- ops[*nops].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops[*nops].data.recv_close_on_server.cancelled = &cancelled_buf_;
- ops[*nops].flags = 0;
- (*nops)++;
- }
-}
-
-bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
- // Release send buffers.
- if (send_buf_ && send_message_) {
- if (send_message_) {
- grpc_byte_buffer_destroy(send_buf_);
- }
- send_buf_ = nullptr;
- }
- if (initial_metadata_) {
- gpr_free(initial_metadata_);
- initial_metadata_ = nullptr;
- }
- if (trailing_metadata_count_) {
- gpr_free(trailing_metadata_);
- trailing_metadata_ = nullptr;
- }
- // Set user-facing tag.
- *tag = return_tag_;
- // Process received initial metadata
- if (recv_initial_metadata_) {
- FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
- }
- // Parse received message if any.
- if (recv_message_ || recv_message_buffer_) {
- if (recv_buf_) {
- got_message = *status;
- if (recv_message_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, 0);
- *status = *status &&
- DeserializeProto(recv_buf_, recv_message_, max_message_size_);
- grpc_byte_buffer_destroy(recv_buf_);
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, 0);
- } else {
- recv_message_buffer_->set_buffer(recv_buf_);
- }
- recv_buf_ = nullptr;
- } else {
- // Read failed
- got_message = false;
- *status = false;
- }
- }
- // Parse received status.
- if (recv_status_) {
- FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_);
- *recv_status_ = Status(
- static_cast<StatusCode>(status_code_),
- status_details_ ? grpc::string(status_details_) : grpc::string());
- }
- if (recv_closed_) {
- *recv_closed_ = cancelled_buf_ != 0;
- }
- return true;
-}
-
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
: call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
@@ -357,11 +82,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
call_(call),
max_message_size_(max_message_size) {}
-void Call::PerformOps(CallOpBuffer* buffer) {
+void Call::PerformOps(CallOpSetInterface* ops) {
if (max_message_size_ > 0) {
- buffer->set_max_message_size(max_message_size_);
+ ops->set_max_message_size(max_message_size_);
}
- call_hook_->PerformOpsOnCall(buffer, this);
+ call_hook_->PerformOpsOnCall(ops, this);
}
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index f4cf5cf17a..268e4f6d1f 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -31,7 +31,7 @@
*
*/
-#include "src/cpp/proto/proto_utils.h"
+#include <grpc++/impl/proto_utils.h>
#include <grpc++/config.h>
#include <grpc/grpc.h>
@@ -67,7 +67,7 @@ class GrpcBufferWriter GRPC_FINAL
slice_ = gpr_slice_malloc(block_size_);
}
*data = GPR_SLICE_START_PTR(slice_);
- byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
+ byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
gpr_slice_buffer_add(slice_buffer_, slice_);
return true;
}
@@ -118,7 +118,7 @@ class GrpcBufferReader GRPC_FINAL
}
gpr_slice_unref(slice_);
*data = GPR_SLICE_START_PTR(slice_);
- byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
+ byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
return true;
}
@@ -152,20 +152,28 @@ class GrpcBufferReader GRPC_FINAL
namespace grpc {
-bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
+Status SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
GrpcBufferWriter writer(bp);
- return msg.SerializeToZeroCopyStream(&writer);
+ return msg.SerializeToZeroCopyStream(&writer) ? Status::OK : Status(INVALID_ARGUMENT, "Failed to serialize message");
}
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
- int max_message_size) {
- if (!buffer) return false;
+Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+ int max_message_size) {
+ if (!buffer) {
+ return Status(INVALID_ARGUMENT, "No payload");
+ }
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
- return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage();
+ if (!msg->ParseFromCodedStream(&decoder)) {
+ return Status(INVALID_ARGUMENT, msg->InitializationErrorString());
+ }
+ if (!decoder.ConsumedEntireMessage()) {
+ return Status(INVALID_ARGUMENT, "Did not read entire message");
+ }
+ return Status::OK;
}
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h
deleted file mode 100644
index 67a775b3ca..0000000000
--- a/src/cpp/proto/proto_utils.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
-#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
-
-#include <grpc++/config.h>
-
-struct grpc_byte_buffer;
-
-namespace grpc {
-
-// Serialize the msg into a buffer created inside the function. The caller
-// should destroy the returned buffer when done with it. If serialization fails,
-// false is returned and buffer is left unchanged.
-bool SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** buffer);
-
-// The caller keeps ownership of buffer and msg.
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
- int max_message_size);
-
-} // namespace grpc
-
-#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 024537c34a..31b6a0ee00 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -48,7 +48,6 @@
#include <grpc++/time.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
@@ -69,16 +68,11 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::SERVER_STREAMING),
- has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::CLIENT_STREAMING),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
- ~SyncRequest() {
- grpc_metadata_array_destroy(&request_metadata_);
- }
+ ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); }
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
@@ -91,9 +85,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
- void SetupRequest() {
- cq_ = grpc_completion_queue_create();
- }
+ void SetupRequest() { cq_ = grpc_completion_queue_create(); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
@@ -125,7 +117,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
- has_response_payload_(mrd->has_response_payload_),
request_payload_(mrd->request_payload_),
method_(mrd->method_) {
ctx_.call_ = mrd->call_;
@@ -142,35 +133,10 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
void Run() {
- std::unique_ptr<grpc::protobuf::Message> req;
- std::unique_ptr<grpc::protobuf::Message> res;
- if (has_request_payload_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- req.reset(method_->AllocateRequestProto());
- if (!DeserializeProto(request_payload_, req.get(),
- call_.max_message_size())) {
- // FIXME(yangg) deal with deserialization failure
- cq_.Shutdown();
- return;
- }
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- }
- if (has_response_payload_) {
- res.reset(method_->AllocateResponseProto());
- }
ctx_.BeginCompletionOp(&call_);
- auto status = method_->handler()->RunHandler(
- MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
- CallOpBuffer buf;
- if (!ctx_.sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
- }
- if (has_response_payload_) {
- buf.AddSendMessage(*res);
- }
- buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); /* status ignored */
+ method_->handler()->RunHandler(MethodHandler::HandlerParameter(
+ &call_, &ctx_, request_payload_, call_.max_message_size()));
+ request_payload_ = nullptr;
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
@@ -182,7 +148,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
Call call_;
ServerContext ctx_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_byte_buffer* request_payload_;
RpcServiceMethod* const method_;
};
@@ -192,7 +157,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_call* call_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
@@ -260,9 +224,9 @@ bool Server::RegisterService(RpcService* service) {
}
bool Server::RegisterAsyncService(AsynchronousService* service) {
- GPR_ASSERT(service->dispatch_impl_ == nullptr &&
+ GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
- service->dispatch_impl_ = this;
+ service->server_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = grpc_server_register_method(server_, service->method_names_[i],
@@ -328,141 +292,87 @@ void Server::Wait() {
}
}
-void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
- buf->FillOps(ops, &nops);
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
}
-class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
- public:
- AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
- grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
- : tag_(tag),
- request_(request),
- stream_(stream),
- call_cq_(call_cq),
- ctx_(ctx),
- generic_ctx_(nullptr),
- server_(server),
- call_(nullptr),
- payload_(nullptr) {
- memset(&array_, 0, sizeof(array_));
- grpc_call_details_init(&call_details_);
- GPR_ASSERT(notification_cq);
- GPR_ASSERT(call_cq);
- grpc_server_request_registered_call(
- server->server_, registered_method, &call_, &call_details_.deadline,
- &array_, request ? &payload_ : nullptr, call_cq->cq(),
- notification_cq->cq(), this);
- }
+Server::BaseAsyncRequest::BaseAsyncRequest(
+ Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ : server_(server),
+ context_(context),
+ stream_(stream),
+ call_cq_(call_cq),
+ tag_(tag),
+ call_(nullptr) {
+ memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
+}
- AsyncRequest(Server* server, GenericServerContext* ctx,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
- : tag_(tag),
- request_(nullptr),
- stream_(stream),
- call_cq_(call_cq),
- ctx_(nullptr),
- generic_ctx_(ctx),
- server_(server),
- call_(nullptr),
- payload_(nullptr) {
- memset(&array_, 0, sizeof(array_));
- grpc_call_details_init(&call_details_);
- GPR_ASSERT(notification_cq);
- GPR_ASSERT(call_cq);
- grpc_server_request_call(server->server_, &call_, &call_details_, &array_,
- call_cq->cq(), notification_cq->cq(), this);
- }
+Server::BaseAsyncRequest::~BaseAsyncRequest() {}
- ~AsyncRequest() {
- if (payload_) {
- grpc_byte_buffer_destroy(payload_);
+bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
+ if (*status) {
+ for (size_t i = 0; i < initial_metadata_array_.count; i++) {
+ context_->client_metadata_.insert(std::make_pair(
+ grpc::string(initial_metadata_array_.metadata[i].key),
+ grpc::string(initial_metadata_array_.metadata[i].value,
+ initial_metadata_array_.metadata[i].value +
+ initial_metadata_array_.metadata[i].value_length)));
}
- grpc_metadata_array_destroy(&array_);
}
-
- bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- *tag = tag_;
- bool orig_status = *status;
- if (*status && request_) {
- if (payload_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_);
- *status =
- DeserializeProto(payload_, request_, server_->max_message_size_);
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_);
- } else {
- *status = false;
- }
- }
- ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_;
- GPR_ASSERT(ctx);
- if (*status) {
- ctx->deadline_ = call_details_.deadline;
- for (size_t i = 0; i < array_.count; i++) {
- ctx->client_metadata_.insert(std::make_pair(
- grpc::string(array_.metadata[i].key),
- grpc::string(
- array_.metadata[i].value,
- array_.metadata[i].value + array_.metadata[i].value_length)));
- }
- if (generic_ctx_) {
- // TODO(yangg) remove the copy here.
- generic_ctx_->method_ = call_details_.method;
- generic_ctx_->host_ = call_details_.host;
- gpr_free(call_details_.method);
- gpr_free(call_details_.host);
- }
- }
- ctx->call_ = call_;
- ctx->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_message_size_);
- if (orig_status && call_) {
- ctx->BeginCompletionOp(&call);
- }
- // just the pointers inside call are copied here
- stream_->BindCall(&call);
- delete this;
- return true;
+ grpc_metadata_array_destroy(&initial_metadata_array_);
+ context_->call_ = call_;
+ context_->cq_ = call_cq_;
+ Call call(call_, server_, call_cq_, server_->max_message_size_);
+ if (*status && call_) {
+ context_->BeginCompletionOp(&call);
}
+ // just the pointers inside call are copied here
+ stream_->BindCall(&call);
+ *tag = tag_;
+ delete this;
+ return true;
+}
- private:
- void* const tag_;
- grpc::protobuf::Message* const request_;
- ServerAsyncStreamingInterface* const stream_;
- CompletionQueue* const call_cq_;
- ServerContext* const ctx_;
- GenericServerContext* const generic_ctx_;
- Server* const server_;
- grpc_call* call_;
- grpc_call_details call_details_;
- grpc_metadata_array array_;
- grpc_byte_buffer* payload_;
-};
+Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
+ Server* server, ServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag) {}
+
+void Server::RegisteredAsyncRequest::IssueRequest(
+ void* registered_method, grpc_byte_buffer** payload,
+ ServerCompletionQueue* notification_cq) {
+ grpc_server_request_registered_call(
+ server_->server_, registered_method, &call_, &context_->deadline_,
+ &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
+ this);
+}
-void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
- grpc::protobuf::Message* request,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
- new AsyncRequest(this, registered_method, context, request, stream, call_cq,
- notification_cq, tag);
+Server::GenericAsyncRequest::GenericAsyncRequest(
+ Server* server, GenericServerContext* context,
+ ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag)
+ : BaseAsyncRequest(server, context, stream, call_cq, tag) {
+ grpc_call_details_init(&call_details_);
+ GPR_ASSERT(notification_cq);
+ GPR_ASSERT(call_cq);
+ grpc_server_request_call(server->server_, &call_, &call_details_,
+ &initial_metadata_array_, call_cq->cq(),
+ notification_cq->cq(), this);
}
-void Server::RequestAsyncGenericCall(GenericServerContext* context,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
- new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
+bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
+ // TODO(yangg) remove the copy here.
+ static_cast<GenericServerContext*>(context_)->method_ = call_details_.method;
+ static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
+ gpr_free(call_details_.method);
+ gpr_free(call_details_.host);
+ return BaseAsyncRequest::FinalizeResult(tag, status);
}
void Server::ScheduleCallback() {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 6b5e41d0a8..eea9645e37 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -43,12 +43,12 @@ namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
+class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : refs_(2), finalized_(false), cancelled_(false) {
- AddServerRecvClose(&cancelled_);
- }
+ CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
+
+ void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq);
@@ -59,7 +59,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
grpc::mutex mu_;
int refs_;
bool finalized_;
- bool cancelled_;
+ int cancelled_;
};
void ServerContext::CompletionOp::Unref() {
@@ -73,14 +73,19 @@ void ServerContext::CompletionOp::Unref() {
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
grpc::lock_guard<grpc::mutex> g(mu_);
- return finalized_ ? cancelled_ : false;
+ return finalized_ ? cancelled_ != 0 : false;
+}
+
+void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
+ ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops->data.recv_close_on_server.cancelled = &cancelled_;
+ *nops = 1;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
- GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
- if (!*status) cancelled_ = true;
+ if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;