aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++/impl
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-04 12:53:40 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-04 12:53:40 -0700
commit50a7a68ca2a5ade22a97502389ec1e0d4dcb0a10 (patch)
tree072e2c0feee2c064e35c8f1d9f65dd2200ca1cfe /include/grpc++/impl
parenta6ab47c75b80f85327f2e37c7c6865f1812059ba (diff)
Progress commit on fixing up C++
Diffstat (limited to 'include/grpc++/impl')
-rw-r--r--include/grpc++/impl/call.h109
-rw-r--r--include/grpc++/impl/client_unary_call.h12
-rw-r--r--include/grpc++/impl/proto_utils.h70
-rw-r--r--include/grpc++/impl/rpc_service_method.h101
-rw-r--r--include/grpc++/impl/serialization_traits.h5
5 files changed, 225 insertions, 72 deletions
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index f8b290a851..3701e403de 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -38,6 +38,7 @@
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/status.h>
+#include <grpc++/impl/serialization_traits.h>
#include <memory>
#include <map>
@@ -53,7 +54,7 @@ class Call;
class CallNoOp {
protected:
void AddOp(grpc_op* ops, size_t* nops) {}
- void FinishOp(void* tag, bool* status) {}
+ void FinishOp(void* tag, bool* status, int max_message_size) {}
};
class CallOpSendInitialMetadata {
@@ -62,24 +63,71 @@ class CallOpSendInitialMetadata {
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpSendMessage {
public:
+ CallOpSendMessage() : send_buf_(nullptr) {}
+
template <class M>
- void SendMessage(const M& message);
+ bool SendMessage(const M& message) {
+ return SerializationTraits<M>::Serialize(message, &send_buf_);
+ }
protected:
- void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (send_buf_ == nullptr) return;
+ grpc_op* op = &ops[(*nops)++];
+ op->op = GRPC_OP_SEND_MESSAGE;
+ op->data.send_message = send_buf_;
+ }
+ void FinishOp(void* tag, bool* status, int max_message_size) {
+ grpc_byte_buffer_destroy(send_buf_);
+ }
+
+ private:
+ grpc_byte_buffer* send_buf_;
};
-template <class M>
+template <class R>
class CallOpRecvMessage {
+ public:
+ CallOpRecvMessage() : got_message(false), message_(nullptr) {}
+
+ void RecvMessage(R* message) {
+ message_ = message;
+ }
+
+ bool got_message;
+
protected:
- void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void AddOp(grpc_op* ops, size_t* nops) {
+ if (message_ == nullptr) return;
+ grpc_op *op = &ops[(*nops)++];
+ op->op = GRPC_OP_RECV_MESSAGE;
+ op->data.recv_message = &recv_buf_;
+ }
+
+ void FinishOp(void* tag, bool* status, int max_message_size) {
+ if (message_ == nullptr) return;
+ if (recv_buf_) {
+ if (*status) {
+ got_message = true;
+ *status = SerializationTraits<R>::Deserialize(recv_buf_, message_, max_message_size).IsOk();
+ } else {
+ got_message = false;
+ grpc_byte_buffer_destroy(recv_buf_);
+ }
+ } else {
+ got_message = false;
+ *status = false;
+ }
+ }
+
+ private:
+ R* message_;
+ grpc_byte_buffer* recv_buf_;
};
class CallOpGenericRecvMessage {
@@ -87,9 +135,11 @@ class CallOpGenericRecvMessage {
template <class R>
void RecvMessage(R* message);
+ bool got_message;
+
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpClientSendClose {
@@ -98,7 +148,7 @@ class CallOpClientSendClose {
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpServerSendStatus {
@@ -107,7 +157,7 @@ class CallOpServerSendStatus {
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpRecvInitialMetadata {
@@ -116,7 +166,7 @@ class CallOpRecvInitialMetadata {
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpClientRecvStatus {
@@ -125,12 +175,18 @@ class CallOpClientRecvStatus {
protected:
void AddOp(grpc_op* ops, size_t* nops);
- void FinishOp(void* tag, bool* status);
+ void FinishOp(void* tag, bool* status, int max_message_size);
};
class CallOpSetInterface : public CompletionQueueTag {
public:
+ CallOpSetInterface() : max_message_size_(0) {}
virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
+
+ void set_max_message_size(int max_message_size) { max_message_size_ = max_message_size; }
+
+ protected:
+ int max_message_size_;
};
template <class T, int I>
@@ -145,27 +201,28 @@ public WrapAndDerive<Op4, 4>,
public WrapAndDerive<Op5, 5>,
public WrapAndDerive<Op6, 6> {
public:
+ CallOpSet() : return_tag_(this) {}
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE {
- this->Op1::AddOp(ops, nops);
- this->Op2::AddOp(ops, nops);
- this->Op3::AddOp(ops, nops);
- this->Op4::AddOp(ops, nops);
- this->Op5::AddOp(ops, nops);
- this->Op6::AddOp(ops, nops);
+ this->WrapAndDerive<Op1, 1>::AddOp(ops, nops);
+ this->WrapAndDerive<Op2, 2>::AddOp(ops, nops);
+ this->WrapAndDerive<Op3, 3>::AddOp(ops, nops);
+ this->WrapAndDerive<Op4, 4>::AddOp(ops, nops);
+ this->WrapAndDerive<Op5, 5>::AddOp(ops, nops);
+ this->WrapAndDerive<Op6, 6>::AddOp(ops, nops);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
- this->Op1::FinishOp(*tag, status);
- this->Op2::FinishOp(*tag, status);
- this->Op3::FinishOp(*tag, status);
- this->Op4::FinishOp(*tag, status);
- this->Op5::FinishOp(*tag, status);
- this->Op6::FinishOp(*tag, status);
+ this->WrapAndDerive<Op1, 1>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op2, 2>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op3, 3>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op4, 4>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op5, 5>::FinishOp(*tag, status, max_message_size_);
+ this->WrapAndDerive<Op6, 6>::FinishOp(*tag, status, max_message_size_);
*tag = return_tag_;
return true;
}
- void SetOutputTag(void* return_tag) { return_tag_ = return_tag; }
+ void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
private:
void *return_tag_;
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
index 561c4721ef..8c42fb4792 100644
--- a/include/grpc++/impl/client_unary_call.h
+++ b/include/grpc++/impl/client_unary_call.h
@@ -62,12 +62,12 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
CallOpClientSendClose,
CallOpClientRecvStatus> ops;
Status status;
- ops.AddSendInitialMetadata(context);
- ops.AddSendMessage(request);
- ops.AddRecvInitialMetadata(context);
- ops.AddRecvMessage(result);
- ops.AddClientSendClose();
- ops.AddClientRecvStatus(context, &status);
+ ops.SendInitialMetadata(context->send_initial_metadata_);
+ ops.SendMessage(request);
+ ops.RecvInitialMetadata(context);
+ ops.RecvMessage(result);
+ ops.ClientSendClose();
+ ops.ClientRecvStatus(context, &status);
call.PerformOps(&ops);
GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk());
return status;
diff --git a/include/grpc++/impl/proto_utils.h b/include/grpc++/impl/proto_utils.h
new file mode 100644
index 0000000000..1a0cc31a8a
--- /dev/null
+++ b/include/grpc++/impl/proto_utils.h
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
+#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
+
+#include <type_traits>
+
+#include <grpc++/impl/serialization_traits.h>
+#include <grpc++/config_protobuf.h>
+#include <grpc++/status.h>
+
+struct grpc_byte_buffer;
+
+namespace grpc {
+
+// Serialize the msg into a buffer created inside the function. The caller
+// should destroy the returned buffer when done with it. If serialization fails,
+// false is returned and buffer is left unchanged.
+bool SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** buffer);
+
+// The caller keeps ownership of buffer and msg.
+Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
+ int max_message_size);
+
+template <class T>
+class SerializationTraits<T, typename std::enable_if<std::is_base_of<grpc::protobuf::Message, T>::value>::type> {
+ public:
+ static bool Serialize(const grpc::protobuf::Message& msg, grpc_byte_buffer** buffer) {
+ return SerializeProto(msg, buffer);
+ }
+ static Status Deserialize(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, int max_message_size) {
+ return DeserializeProto(buffer, msg, max_message_size);
+ }
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 50204d2099..05bba6ef7c 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -56,15 +56,15 @@ class MethodHandler {
virtual ~MethodHandler() {}
struct HandlerParameter {
HandlerParameter(Call* c, ServerContext* context,
- const grpc::protobuf::Message* req,
- grpc::protobuf::Message* resp)
- : call(c), server_context(context), request(req), response(resp) {}
+ grpc_byte_buffer* req, int max_size)
+ : call(c), server_context(context), request(req), max_message_size(max_size) {}
Call* call;
ServerContext* server_context;
- const grpc::protobuf::Message* request;
- grpc::protobuf::Message* response;
+ // Handler required to grpc_byte_buffer_destroy this
+ grpc_byte_buffer* request;
+ int max_message_size;
};
- virtual Status RunHandler(const HandlerParameter& param) = 0;
+ virtual void RunHandler(const HandlerParameter& param) = 0;
};
// A wrapper class of an application provided rpc method handler.
@@ -77,11 +77,23 @@ class RpcMethodHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
- // Invoke application function, cast proto messages to their actual types.
- return func_(service_, param.server_context,
- dynamic_cast<const RequestType*>(param.request),
- dynamic_cast<ResponseType*>(param.response));
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(param.request, &req, param.max_message_size);
+ ResponseType rsp;
+ if (status.IsOk()) {
+ status = func_(service_, param.server_context, &req, &rsp);
+ }
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.IsOk()) {
+ ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -102,10 +114,20 @@ class ClientStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReader<RequestType> reader(param.call, param.server_context);
- return func_(service_, param.server_context, &reader,
- dynamic_cast<ResponseType*>(param.response));
+ ResponseType rsp;
+ Status status = func_(service_, param.server_context, &reader, &rsp);
+
+ GPR_ASSERT(!param.server_context->sent_initial_metadata_);
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops;
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ if (status.IsOk()) {
+ ops.SendMessage(rsp);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -124,10 +146,22 @@ class ServerStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
- ServerWriter<ResponseType> writer(param.call, param.server_context);
- return func_(service_, param.server_context,
- dynamic_cast<const RequestType*>(param.request), &writer);
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ RequestType req;
+ Status status = SerializationTraits<RequestType>::Deserialize(param.request, &req, param.max_message_size);
+
+ if (status.IsOk()) {
+ ServerWriter<ResponseType> writer(param.call, param.server_context);
+ status = func_(service_, param.server_context, &req, &writer);
+ }
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -147,10 +181,18 @@ class BidiStreamingHandler : public MethodHandler {
ServiceType* service)
: func_(func), service_(service) {}
- Status RunHandler(const HandlerParameter& param) GRPC_FINAL {
+ void RunHandler(const HandlerParameter& param) GRPC_FINAL {
ServerReaderWriter<ResponseType, RequestType> stream(param.call,
param.server_context);
- return func_(service_, param.server_context, &stream);
+ Status status = func_(service_, param.server_context, &stream);
+
+ CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
+ if (!param.server_context->sent_initial_metadata_) {
+ ops.SendInitialMetadata(param.server_context->initial_metadata_);
+ }
+ ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
+ param.call->PerformOps(&ops);
+ param.call->cq()->Pluck(&ops);
}
private:
@@ -162,29 +204,16 @@ class BidiStreamingHandler : public MethodHandler {
// Server side rpc method class
class RpcServiceMethod : public RpcMethod {
public:
- // Takes ownership of the handler and two prototype objects.
+ // Takes ownership of the handler
RpcServiceMethod(const char* name, RpcMethod::RpcType type,
- MethodHandler* handler,
- grpc::protobuf::Message* request_prototype,
- grpc::protobuf::Message* response_prototype)
+ MethodHandler* handler)
: RpcMethod(name, type, nullptr),
- handler_(handler),
- request_prototype_(request_prototype),
- response_prototype_(response_prototype) {}
+ handler_(handler) {}
MethodHandler* handler() { return handler_.get(); }
- grpc::protobuf::Message* AllocateRequestProto() {
- return request_prototype_->New();
- }
- grpc::protobuf::Message* AllocateResponseProto() {
- return response_prototype_->New();
- }
-
private:
std::unique_ptr<MethodHandler> handler_;
- std::unique_ptr<grpc::protobuf::Message> request_prototype_;
- std::unique_ptr<grpc::protobuf::Message> response_prototype_;
};
// This class contains all the method information for an rpc service. It is
diff --git a/include/grpc++/impl/serialization_traits.h b/include/grpc++/impl/serialization_traits.h
index d21ad92475..4648bbfc33 100644
--- a/include/grpc++/impl/serialization_traits.h
+++ b/include/grpc++/impl/serialization_traits.h
@@ -38,12 +38,9 @@ struct grpc_byte_buffer;
namespace grpc {
-template <class Message>
+template <class Message, class UnusedButHereForPartialTemplateSpecialization = void>
class SerializationTraits;
-typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest);
-typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst);
-
} // namespace grpc
#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H