aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-04 12:53:40 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-04 12:53:40 -0700
commit50a7a68ca2a5ade22a97502389ec1e0d4dcb0a10 (patch)
tree072e2c0feee2c064e35c8f1d9f65dd2200ca1cfe /src
parenta6ab47c75b80f85327f2e37c7c6865f1812059ba (diff)
Progress commit on fixing up C++
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc13
-rw-r--r--src/cpp/client/channel.cc11
-rw-r--r--src/cpp/client/channel.h4
-rw-r--r--src/cpp/common/call.cc9
-rw-r--r--src/cpp/proto/proto_utils.cc16
-rw-r--r--src/cpp/proto/proto_utils.h55
-rw-r--r--src/cpp/server/server.cc59
-rw-r--r--src/cpp/server/server_context.cc21
8 files changed, 55 insertions, 133 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index b0d2b5d229..d03eaddf40 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -113,6 +113,7 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
grpc::string temp =
"#include <grpc++/impl/internal_stub.h>\n"
"#include <grpc++/impl/rpc_method.h>\n"
+ "#include <grpc++/impl/proto_utils.h>\n"
"#include <grpc++/impl/service_type.h>\n"
"#include <grpc++/async_unary_call.h>\n"
"#include <grpc++/status.h>\n"
@@ -1045,8 +1046,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -1055,8 +1055,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::CLIENT_STREAMING,\n"
" new ::grpc::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -1065,8 +1064,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::SERVER_STREAMING,\n"
" new ::grpc::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (BidiStreaming(method)) {
printer->Print(
*vars,
@@ -1075,8 +1073,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
" ::grpc::RpcMethod::BIDI_STREAMING,\n"
" new ::grpc::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
- " std::mem_fn(&$ns$$Service$::Service::$Method$), this),\n"
- " new $Request$, new $Response$));\n");
+ " std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
}
printer->Print("return service_;\n");
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 475a20d883..6e6278cb05 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -41,7 +41,6 @@
#include <grpc/support/slice.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@@ -75,14 +74,14 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
return Call(c_call, this, cq);
}
-void Channel::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
- buf->FillOps(ops, &nops);
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
}
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index cd239247c8..69baa419a6 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -44,7 +44,7 @@ struct grpc_channel;
namespace grpc {
class Call;
-class CallOpBuffer;
+class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class Credentials;
@@ -59,7 +59,7 @@ class Channel GRPC_FINAL : public GrpcLibrary,
virtual void *RegisterMethod(const char *method) GRPC_OVERRIDE;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) GRPC_OVERRIDE;
- virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE;
+ virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
private:
const grpc::string target_;
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
index 1068111e3f..dc3d36faa2 100644
--- a/src/cpp/common/call.cc
+++ b/src/cpp/common/call.cc
@@ -39,10 +39,10 @@
#include <grpc++/channel_interface.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
+#if 0
CallOpBuffer::CallOpBuffer()
: return_tag_(this),
send_initial_metadata_(false),
@@ -338,6 +338,7 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
}
return true;
}
+#endif
Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
: call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
@@ -349,11 +350,11 @@ Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
call_(call),
max_message_size_(max_message_size) {}
-void Call::PerformOps(CallOpBuffer* buffer) {
+void Call::PerformOps(CallOpSetInterface* ops) {
if (max_message_size_ > 0) {
- buffer->set_max_message_size(max_message_size_);
+ ops->set_max_message_size(max_message_size_);
}
- call_hook_->PerformOpsOnCall(buffer, this);
+ call_hook_->PerformOpsOnCall(ops, this);
}
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 7a7e73bba4..a4a37c7000 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -31,7 +31,7 @@
*
*/
-#include "src/cpp/proto/proto_utils.h"
+#include <grpc++/impl/proto_utils.h>
#include <grpc++/config.h>
#include <grpc/grpc.h>
@@ -157,15 +157,23 @@ bool SerializeProto(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp) {
return msg.SerializeToZeroCopyStream(&writer);
}
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
int max_message_size) {
- if (!buffer) return false;
+ if (!buffer) {
+ return Status(INVALID_ARGUMENT, "No payload");
+ }
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
- return msg->ParseFromCodedStream(&decoder) && decoder.ConsumedEntireMessage();
+ if (!msg->ParseFromCodedStream(&decoder)) {
+ return Status(INVALID_ARGUMENT, msg->InitializationErrorString());
+ }
+ if (!decoder.ConsumedEntireMessage()) {
+ return Status(INVALID_ARGUMENT, "Did not read entire message");
+ }
+ return Status::OK;
}
} // namespace grpc
diff --git a/src/cpp/proto/proto_utils.h b/src/cpp/proto/proto_utils.h
deleted file mode 100644
index 67a775b3ca..0000000000
--- a/src/cpp/proto/proto_utils.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
-#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
-
-#include <grpc++/config.h>
-
-struct grpc_byte_buffer;
-
-namespace grpc {
-
-// Serialize the msg into a buffer created inside the function. The caller
-// should destroy the returned buffer when done with it. If serialization fails,
-// false is returned and buffer is left unchanged.
-bool SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** buffer);
-
-// The caller keeps ownership of buffer and msg.
-bool DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
- int max_message_size);
-
-} // namespace grpc
-
-#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index dbd88c5b8c..c08506c97f 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -48,7 +48,6 @@
#include <grpc++/time.h>
#include "src/core/profiling/timers.h"
-#include "src/cpp/proto/proto_utils.h"
namespace grpc {
@@ -68,10 +67,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
in_flight_(false),
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
- RpcMethod::SERVER_STREAMING),
- has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::CLIENT_STREAMING) {
+ RpcMethod::SERVER_STREAMING) {
grpc_metadata_array_init(&request_metadata_);
}
@@ -116,7 +112,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
mrd->request_metadata_.count),
has_request_payload_(mrd->has_request_payload_),
- has_response_payload_(mrd->has_response_payload_),
request_payload_(mrd->request_payload_),
method_(mrd->method_) {
ctx_.call_ = mrd->call_;
@@ -133,35 +128,9 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
void Run() {
- std::unique_ptr<grpc::protobuf::Message> req;
- std::unique_ptr<grpc::protobuf::Message> res;
- if (has_request_payload_) {
- GRPC_TIMER_BEGIN(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- req.reset(method_->AllocateRequestProto());
- if (!DeserializeProto(request_payload_, req.get(),
- call_.max_message_size())) {
- // FIXME(yangg) deal with deserialization failure
- cq_.Shutdown();
- return;
- }
- GRPC_TIMER_END(GRPC_PTAG_PROTO_DESERIALIZE, call_.call());
- }
- if (has_response_payload_) {
- res.reset(method_->AllocateResponseProto());
- }
ctx_.BeginCompletionOp(&call_);
- auto status = method_->handler()->RunHandler(
- MethodHandler::HandlerParameter(&call_, &ctx_, req.get(), res.get()));
- CallOpBuffer buf;
- if (!ctx_.sent_initial_metadata_) {
- buf.AddSendInitialMetadata(&ctx_.initial_metadata_);
- }
- if (has_response_payload_) {
- buf.AddSendMessage(*res);
- }
- buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
- call_.PerformOps(&buf);
- cq_.Pluck(&buf); /* status ignored */
+ method_->handler()->RunHandler(
+ MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_, call_.max_message_size()));
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
@@ -173,7 +142,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
Call call_;
ServerContext ctx_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_byte_buffer* request_payload_;
RpcServiceMethod* const method_;
};
@@ -183,7 +151,6 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
- const bool has_response_payload_;
grpc_call* call_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
@@ -251,9 +218,9 @@ bool Server::RegisterService(RpcService* service) {
}
bool Server::RegisterAsyncService(AsynchronousService* service) {
- GPR_ASSERT(service->dispatch_impl_ == nullptr &&
+ GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
- service->dispatch_impl_ = this;
+ service->server_ = this;
service->request_args_ = new void*[service->method_count_];
for (size_t i = 0; i < service->method_count_; ++i) {
void* tag = grpc_server_register_method(server_, service->method_names_[i],
@@ -318,15 +285,16 @@ void Server::Wait() {
}
}
-void Server::PerformOpsOnCall(CallOpBuffer* buf, Call* call) {
+void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
static const size_t MAX_OPS = 8;
- size_t nops = MAX_OPS;
- grpc_op ops[MAX_OPS];
- buf->FillOps(ops, &nops);
+ size_t nops = 0;
+ grpc_op cops[MAX_OPS];
+ ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), ops, nops, buf));
+ grpc_call_start_batch(call->call(), cops, nops, ops));
}
+#if 0
class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
public:
AsyncRequest(Server* server, void* registered_method, ServerContext* ctx,
@@ -352,9 +320,7 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
notification_cq->cq(), this);
}
- AsyncRequest(Server* server, GenericServerContext* ctx,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq, void* tag)
+ AsyncRequest()
: tag_(tag),
request_(nullptr),
stream_(stream),
@@ -454,6 +420,7 @@ void Server::RequestAsyncGenericCall(GenericServerContext* context,
void* tag) {
new AsyncRequest(this, context, stream, call_cq, notification_cq, tag);
}
+#endif
void Server::ScheduleCallback() {
{
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 6b5e41d0a8..eea9645e37 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -43,12 +43,12 @@ namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
+class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
- CompletionOp() : refs_(2), finalized_(false), cancelled_(false) {
- AddServerRecvClose(&cancelled_);
- }
+ CompletionOp() : refs_(2), finalized_(false), cancelled_(0) {}
+
+ void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
bool CheckCancelled(CompletionQueue* cq);
@@ -59,7 +59,7 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpBuffer {
grpc::mutex mu_;
int refs_;
bool finalized_;
- bool cancelled_;
+ int cancelled_;
};
void ServerContext::CompletionOp::Unref() {
@@ -73,14 +73,19 @@ void ServerContext::CompletionOp::Unref() {
bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
cq->TryPluck(this);
grpc::lock_guard<grpc::mutex> g(mu_);
- return finalized_ ? cancelled_ : false;
+ return finalized_ ? cancelled_ != 0 : false;
+}
+
+void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
+ ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops->data.recv_close_on_server.cancelled = &cancelled_;
+ *nops = 1;
}
bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
- GPR_ASSERT(CallOpBuffer::FinalizeResult(tag, status));
grpc::unique_lock<grpc::mutex> lock(mu_);
finalized_ = true;
- if (!*status) cancelled_ = true;
+ if (!*status) cancelled_ = 1;
if (--refs_ == 0) {
lock.unlock();
delete this;