diff options
author | 2015-06-04 12:53:40 -0700 | |
---|---|---|
committer | 2015-06-04 12:53:40 -0700 | |
commit | 50a7a68ca2a5ade22a97502389ec1e0d4dcb0a10 (patch) | |
tree | 072e2c0feee2c064e35c8f1d9f65dd2200ca1cfe /src | |
parent | a6ab47c75b80f85327f2e37c7c6865f1812059ba (diff) |
Progress commit on fixing up C++
Diffstat (limited to 'src')
-rw-r--r-- | src/compiler/cpp_generator.cc | 13 | ||||
-rw-r--r-- | src/cpp/client/channel.cc | 11 | ||||
-rw-r--r-- | src/cpp/client/channel.h | 4 | ||||
-rw-r--r-- | src/cpp/common/call.cc | 9 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.cc | 16 | ||||
-rw-r--r-- | src/cpp/proto/proto_utils.h | 55 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 59 | ||||
-rw-r--r-- | src/cpp/server/server_context.cc | 21 |
8 files changed, 55 insertions, 133 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index b0d2b5d229..d03eaddf40 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -113,6 +113,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file, grpc::string temp = "#include <grpc++/impl/internal_stub.h>\n" "#include <grpc++/impl/rpc_method.h>\n" + "#include <grpc++/impl/proto_utils.h>\n" "#include <grpc++/impl/service_type.h>\n" "#include <grpc++/async_unary_call.h>\n" "#include <grpc++/status.h>\n" @@ -1045,8 +1046,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, " "$Request$, " "$Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ClientOnlyStreaming(method)) { printer->Print( *vars, @@ -1055,8 +1055,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::CLIENT_STREAMING,\n" " new ::grpc::ClientStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (ServerOnlyStreaming(method)) { printer->Print( *vars, @@ -1065,8 +1064,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::SERVER_STREAMING,\n" " new ::grpc::ServerStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } else if (BidiStreaming(method)) { printer->Print( *vars, @@ -1075,8 +1073,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer, " ::grpc::RpcMethod::BIDI_STREAMING,\n" " new ::grpc::BidiStreamingHandler< " "$ns$$Service$::Service, $Request$, $Response$>(\n" - " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n" - " new $Request$, new $Response$));\n"); + " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n"); } } printer->Print("return service_;\n"); diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 475a20d883..6e6278cb05 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -41,7 +41,6 @@ #include <grpc/support/slice.h> #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" #include <grpc++/channel_arguments.h> #include <grpc++/client_context.h> #include <grpc++/completion_queue.h> @@ -75,14 +74,14 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, return Call(c_call, this, cq); } -void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) { +void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; - size_t nops = MAX_OPS; - grpc_op ops[MAX_OPS]; + size_t nops = 0; + grpc_op cops[MAX_OPS]; GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); - buf->FillOps(ops, &nops); + ops->FillOps(cops, &nops); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), ops, nops, buf)); + grpc_call_start_batch(call->call(), cops, nops, ops)); GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call()); } diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h index cd239247c8..69baa419a6 100644 --- a/src/cpp/client/channel.h +++ b/src/cpp/client/channel.h @@ -44,7 +44,7 @@ struct grpc_channel; namespace grpc { class Call; -class CallOpBuffer; +class CallOpSetInterface; class ChannelArguments; class CompletionQueue; class Credentials; @@ -59,7 +59,7 @@ class Channel GRPC_FINAL : public GrpcLibrary, virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE; virtual Call CreateCall(const RpcMethod& method, ClientContext* context, CompletionQueue* cq) GRPC_OVERRIDE; - virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; private: const grpc::string target_; diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc index 1068111e3f..dc3d36faa2 100644 --- a/src/cpp/common/call.cc +++ b/src/cpp/common/call.cc @@ -39,10 +39,10 @@ #include <grpc++/channel_interface.h> #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" namespace grpc { +#if 0 CallOpBuffer::CallOpBuffer() : return_tag_(this), send_initial_metadata_(false), @@ -338,6 +338,7 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) { } return true; } +#endif Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq) : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {} @@ -349,11 +350,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq, call_(call), max_message_size_(max_message_size) {} -void Call::PerformOps(CallOpBuffer* buffer) { +void Call::PerformOps(CallOpSetInterface* ops) { if (max_message_size_ > 0) { - buffer->set_max_message_size(max_message_size_); + ops->set_max_message_size(max_message_size_); } - call_hook_->PerformOpsOnCall(buffer, this); + call_hook_->PerformOpsOnCall(ops, this); } } // namespace grpc diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 7a7e73bba4..a4a37c7000 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -31,7 +31,7 @@ * */ -#include "src/cpp/proto/proto_utils.h" +#include <grpc++/impl/proto_utils.h> #include <grpc++/config.h> #include <grpc/grpc.h> @@ -157,15 +157,23 @@ bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) { return msg.SerializeToZeroCopyStream(&writer); } -bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, +Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, int max_message_size) { - if (!buffer) return false; + if (!buffer) { + return Status(INVALID_ARGUMENT, "No payload"); + } GrpcBufferReader reader(buffer); ::grpc::protobuf::io::CodedInputStream decoder(&reader); if (max_message_size > 0) { decoder.SetTotalBytesLimit(max_message_size, max_message_size); } - return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage(); + if (!msg->ParseFromCodedStream(&decoder)) { + return Status(INVALID_ARGUMENT, msg->InitializationErrorString()); + } + if (!decoder.ConsumedEntireMessage()) { + return Status(INVALID_ARGUMENT, "Did not read entire message"); + } + return Status::OK; } } // namespace grpc diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h deleted file mode 100644 index 67a775b3ca..0000000000 --- a/src/cpp/proto/proto_utils.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H -#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H - -#include <grpc++/config.h> - -struct grpc_byte_buffer; - -namespace grpc { - -// Serialize the msg into a buffer created inside the function. The caller -// should destroy the returned buffer when done with it. If serialization fails, -// false is returned and buffer is left unchanged. -bool SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** buffer); - -// The caller keeps ownership of buffer and msg. -bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, - int max_message_size); - -} // namespace grpc - -#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index dbd88c5b8c..c08506c97f 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -48,7 +48,6 @@ #include <grpc++/time.h> #include "src/core/profiling/timers.h" -#include "src/cpp/proto/proto_utils.h" namespace grpc { @@ -68,10 +67,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { in_flight_(false), has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || method->method_type() == - RpcMethod::SERVER_STREAMING), - has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC || - method->method_type() == - RpcMethod::CLIENT_STREAMING) { + RpcMethod::SERVER_STREAMING) { grpc_metadata_array_init(&request_metadata_); } @@ -116,7 +112,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { ctx_(mrd->deadline_, mrd->request_metadata_.metadata, mrd->request_metadata_.count), has_request_payload_(mrd->has_request_payload_), - has_response_payload_(mrd->has_response_payload_), request_payload_(mrd->request_payload_), method_(mrd->method_) { ctx_.call_ = mrd->call_; @@ -133,35 +128,9 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { } void Run() { - std::unique_ptr<grpc::protobuf::Message> req; - std::unique_ptr<grpc::protobuf::Message> res; - if (has_request_payload_) { - GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call()); - req.reset(method_->AllocateRequestProto()); - if (!DeserializeProto(request_payload_, req.get(), - call_.max_message_size())) { - // FIXME(yangg) deal with deserialization failure - cq_.Shutdown(); - return; - } - GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call()); - } - if (has_response_payload_) { - res.reset(method_->AllocateResponseProto()); - } ctx_.BeginCompletionOp(&call_); - auto status = method_->handler()->RunHandler( - MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get())); - CallOpBuffer buf; - if (!ctx_.sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_.initial_metadata_); - } - if (has_response_payload_) { - buf.AddSendMessage(*res); - } - buf.AddServerSendStatus(&ctx_.trailing_metadata_, status); - call_.PerformOps(&buf); - cq_.Pluck(&buf); /* status ignored */ + method_->handler()->RunHandler( + MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_, call_.max_message_size())); void* ignored_tag; bool ignored_ok; cq_.Shutdown(); @@ -173,7 +142,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { Call call_; ServerContext ctx_; const bool has_request_payload_; - const bool has_response_payload_; grpc_byte_buffer* request_payload_; RpcServiceMethod* const method_; }; @@ -183,7 +151,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { void* const tag_; bool in_flight_; const bool has_request_payload_; - const bool has_response_payload_; grpc_call* call_; gpr_timespec deadline_; grpc_metadata_array request_metadata_; @@ -251,9 +218,9 @@ bool Server::RegisterService(RpcService* service) { } bool Server::RegisterAsyncService(AsynchronousService* service) { - GPR_ASSERT(service->dispatch_impl_ == nullptr && + GPR_ASSERT(service->server_ == nullptr && "Can only register an asynchronous service against one server."); - service->dispatch_impl_ = this; + service->server_ = this; service->request_args_ = new void*[service->method_count_]; for (size_t i = 0; i < service->method_count_; ++i) { void* tag = grpc_server_register_method(server_, service->method_names_[i], @@ -318,15 +285,16 @@ void Server::Wait() { } } -void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) { +void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; - size_t nops = MAX_OPS; - grpc_op ops[MAX_OPS]; - buf->FillOps(ops, &nops); + size_t nops = 0; + grpc_op cops[MAX_OPS]; + ops->FillOps(cops, &nops); GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call->call(), ops, nops, buf)); + grpc_call_start_batch(call->call(), cops, nops, ops)); } +#if 0 class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { public: AsyncRequest(Server* server, void* registered_method, ServerContext* ctx, @@ -352,9 +320,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag { notification_cq->cq(), this); } - AsyncRequest(Server* server, GenericServerContext* ctx, - ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, void* tag) + AsyncRequest() : tag_(tag), request_(nullptr), stream_(stream), @@ -454,6 +420,7 @@ void Server::RequestAsyncGenericCall(GenericServerContext* context, void* tag) { new AsyncRequest(this, context, stream, call_cq, notification_cq, tag); } +#endif void Server::ScheduleCallback() { { diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 6b5e41d0a8..eea9645e37 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -43,12 +43,12 @@ namespace grpc { // CompletionOp -class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer { +class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface { public: // initial refs: one in the server context, one in the cq - CompletionOp() : refs_(2), finalized_(false), cancelled_(false) { - AddServerRecvClose(&cancelled_); - } + CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {} + + void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE; bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; bool CheckCancelled(CompletionQueue* cq); @@ -59,7 +59,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer { grpc::mutex mu_; int refs_; bool finalized_; - bool cancelled_; + int cancelled_; }; void ServerContext::CompletionOp::Unref() { @@ -73,14 +73,19 @@ void ServerContext::CompletionOp::Unref() { bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) { cq->TryPluck(this); grpc::lock_guard<grpc::mutex> g(mu_); - return finalized_ ? cancelled_ : false; + return finalized_ ? cancelled_ != 0 : false; +} + +void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { + ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + ops->data.recv_close_on_server.cancelled = &cancelled_; + *nops = 1; } bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) { - GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status)); grpc::unique_lock<grpc::mutex> lock(mu_); finalized_ = true; - if (!*status) cancelled_ = true; + if (!*status) cancelled_ = 1; if (--refs_ == 0) { lock.unlock(); delete this; |