diff options
Diffstat (limited to 'include/grpc++/impl')
-rw-r--r-- | include/grpc++/impl/call.h | 109 | ||||
-rw-r--r-- | include/grpc++/impl/client_unary_call.h | 12 | ||||
-rw-r--r-- | include/grpc++/impl/proto_utils.h | 70 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_service_method.h | 101 | ||||
-rw-r--r-- | include/grpc++/impl/serialization_traits.h | 5 |
5 files changed, 225 insertions, 72 deletions
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index f8b290a851..3701e403de 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -38,6 +38,7 @@ #include <grpc++/completion_queue.h> #include <grpc++/config.h> #include <grpc++/status.h> +#include <grpc++/impl/serialization_traits.h> #include <memory> #include <map> @@ -53,7 +54,7 @@ class Call; class CallNoOp { protected: void AddOp(grpc_op* ops, size_t* nops) {} - void FinishOp(void* tag, bool* status) {} + void FinishOp(void* tag, bool* status, int max_message_size) {} }; class CallOpSendInitialMetadata { @@ -62,24 +63,71 @@ class CallOpSendInitialMetadata { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpSendMessage { public: + CallOpSendMessage() : send_buf_(nullptr) {} + template <class M> - void SendMessage(const M& message); + bool SendMessage(const M& message) { + return SerializationTraits<M>::Serialize(message, &send_buf_); + } protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void AddOp(grpc_op* ops, size_t* nops) { + if (send_buf_ == nullptr) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = send_buf_; + } + void FinishOp(void* tag, bool* status, int max_message_size) { + grpc_byte_buffer_destroy(send_buf_); + } + + private: + grpc_byte_buffer* send_buf_; }; -template <class M> +template <class R> class CallOpRecvMessage { + public: + CallOpRecvMessage() : got_message(false), message_(nullptr) {} + + void RecvMessage(R* message) { + message_ = message; + } + + bool got_message; + protected: - void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void AddOp(grpc_op* ops, size_t* nops) { + if (message_ == nullptr) return; + grpc_op *op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &recv_buf_; + } + + void FinishOp(void* tag, bool* status, int max_message_size) { + if (message_ == nullptr) return; + if (recv_buf_) { + if (*status) { + got_message = true; + *status = SerializationTraits<R>::Deserialize(recv_buf_, message_, max_message_size).IsOk(); + } else { + got_message = false; + grpc_byte_buffer_destroy(recv_buf_); + } + } else { + got_message = false; + *status = false; + } + } + + private: + R* message_; + grpc_byte_buffer* recv_buf_; }; class CallOpGenericRecvMessage { @@ -87,9 +135,11 @@ class CallOpGenericRecvMessage { template <class R> void RecvMessage(R* message); + bool got_message; + protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpClientSendClose { @@ -98,7 +148,7 @@ class CallOpClientSendClose { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpServerSendStatus { @@ -107,7 +157,7 @@ class CallOpServerSendStatus { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpRecvInitialMetadata { @@ -116,7 +166,7 @@ class CallOpRecvInitialMetadata { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpClientRecvStatus { @@ -125,12 +175,18 @@ class CallOpClientRecvStatus { protected: void AddOp(grpc_op* ops, size_t* nops); - void FinishOp(void* tag, bool* status); + void FinishOp(void* tag, bool* status, int max_message_size); }; class CallOpSetInterface : public CompletionQueueTag { public: + CallOpSetInterface() : max_message_size_(0) {} virtual void FillOps(grpc_op* ops, size_t* nops) = 0; + + void set_max_message_size(int max_message_size) { max_message_size_ = max_message_size; } + + protected: + int max_message_size_; }; template <class T, int I> @@ -145,27 +201,28 @@ public WrapAndDerive<Op4, 4>, public WrapAndDerive<Op5, 5>, public WrapAndDerive<Op6, 6> { public: + CallOpSet() : return_tag_(this) {} void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE { - this->Op1::AddOp(ops, nops); - this->Op2::AddOp(ops, nops); - this->Op3::AddOp(ops, nops); - this->Op4::AddOp(ops, nops); - this->Op5::AddOp(ops, nops); - this->Op6::AddOp(ops, nops); + this->WrapAndDerive<Op1, 1>::AddOp(ops, nops); + this->WrapAndDerive<Op2, 2>::AddOp(ops, nops); + this->WrapAndDerive<Op3, 3>::AddOp(ops, nops); + this->WrapAndDerive<Op4, 4>::AddOp(ops, nops); + this->WrapAndDerive<Op5, 5>::AddOp(ops, nops); + this->WrapAndDerive<Op6, 6>::AddOp(ops, nops); } bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { - this->Op1::FinishOp(*tag, status); - this->Op2::FinishOp(*tag, status); - this->Op3::FinishOp(*tag, status); - this->Op4::FinishOp(*tag, status); - this->Op5::FinishOp(*tag, status); - this->Op6::FinishOp(*tag, status); + this->WrapAndDerive<Op1, 1>::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive<Op2, 2>::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive<Op3, 3>::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive<Op4, 4>::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive<Op5, 5>::FinishOp(*tag, status, max_message_size_); + this->WrapAndDerive<Op6, 6>::FinishOp(*tag, status, max_message_size_); *tag = return_tag_; return true; } - void SetOutputTag(void* return_tag) { return_tag_ = return_tag; } + void set_output_tag(void* return_tag) { return_tag_ = return_tag; } private: void *return_tag_; diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index 561c4721ef..8c42fb4792 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -62,12 +62,12 @@ Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, CallOpClientSendClose, CallOpClientRecvStatus> ops; Status status; - ops.AddSendInitialMetadata(context); - ops.AddSendMessage(request); - ops.AddRecvInitialMetadata(context); - ops.AddRecvMessage(result); - ops.AddClientSendClose(); - ops.AddClientRecvStatus(context, &status); + ops.SendInitialMetadata(context->send_initial_metadata_); + ops.SendMessage(request); + ops.RecvInitialMetadata(context); + ops.RecvMessage(result); + ops.ClientSendClose(); + ops.ClientRecvStatus(context, &status); call.PerformOps(&ops); GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.IsOk()); return status; diff --git a/include/grpc++/impl/proto_utils.h b/include/grpc++/impl/proto_utils.h new file mode 100644 index 0000000000..1a0cc31a8a --- /dev/null +++ b/include/grpc++/impl/proto_utils.h @@ -0,0 +1,70 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H +#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H + +#include <type_traits> + +#include <grpc++/impl/serialization_traits.h> +#include <grpc++/config_protobuf.h> +#include <grpc++/status.h> + +struct grpc_byte_buffer; + +namespace grpc { + +// Serialize the msg into a buffer created inside the function. The caller +// should destroy the returned buffer when done with it. If serialization fails, +// false is returned and buffer is left unchanged. +bool SerializeProto(const grpc::protobuf::Message& msg, + grpc_byte_buffer** buffer); + +// The caller keeps ownership of buffer and msg. +Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, + int max_message_size); + +template <class T> +class SerializationTraits<T, typename std::enable_if<std::is_base_of<grpc::protobuf::Message, T>::value>::type> { + public: + static bool Serialize(const grpc::protobuf::Message& msg, grpc_byte_buffer** buffer) { + return SerializeProto(msg, buffer); + } + static Status Deserialize(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, int max_message_size) { + return DeserializeProto(buffer, msg, max_message_size); + } +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 50204d2099..05bba6ef7c 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -56,15 +56,15 @@ class MethodHandler { virtual ~MethodHandler() {} struct HandlerParameter { HandlerParameter(Call* c, ServerContext* context, - const grpc::protobuf::Message* req, - grpc::protobuf::Message* resp) - : call(c), server_context(context), request(req), response(resp) {} + grpc_byte_buffer* req, int max_size) + : call(c), server_context(context), request(req), max_message_size(max_size) {} Call* call; ServerContext* server_context; - const grpc::protobuf::Message* request; - grpc::protobuf::Message* response; + // Handler required to grpc_byte_buffer_destroy this + grpc_byte_buffer* request; + int max_message_size; }; - virtual Status RunHandler(const HandlerParameter& param) = 0; + virtual void RunHandler(const HandlerParameter& param) = 0; }; // A wrapper class of an application provided rpc method handler. @@ -77,11 +77,23 @@ class RpcMethodHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { - // Invoke application function, cast proto messages to their actual types. - return func_(service_, param.server_context, - dynamic_cast<const RequestType*>(param.request), - dynamic_cast<ResponseType*>(param.response)); + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits<RequestType>::Deserialize(param.request, &req, param.max_message_size); + ResponseType rsp; + if (status.IsOk()) { + status = func_(service_, param.server_context, &req, &rsp); + } + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.IsOk()) { + ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -102,10 +114,20 @@ class ClientStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { + void RunHandler(const HandlerParameter& param) GRPC_FINAL { ServerReader<RequestType> reader(param.call, param.server_context); - return func_(service_, param.server_context, &reader, - dynamic_cast<ResponseType*>(param.response)); + ResponseType rsp; + Status status = func_(service_, param.server_context, &reader, &rsp); + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpServerSendStatus> ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.IsOk()) { + ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -124,10 +146,22 @@ class ServerStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { - ServerWriter<ResponseType> writer(param.call, param.server_context); - return func_(service_, param.server_context, - dynamic_cast<const RequestType*>(param.request), &writer); + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits<RequestType>::Deserialize(param.request, &req, param.max_message_size); + + if (status.IsOk()) { + ServerWriter<ResponseType> writer(param.call, param.server_context); + status = func_(service_, param.server_context, &req, &writer); + } + + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -147,10 +181,18 @@ class BidiStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { + void RunHandler(const HandlerParameter& param) GRPC_FINAL { ServerReaderWriter<ResponseType, RequestType> stream(param.call, param.server_context); - return func_(service_, param.server_context, &stream); + Status status = func_(service_, param.server_context, &stream); + + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -162,29 +204,16 @@ class BidiStreamingHandler : public MethodHandler { // Server side rpc method class class RpcServiceMethod : public RpcMethod { public: - // Takes ownership of the handler and two prototype objects. + // Takes ownership of the handler RpcServiceMethod(const char* name, RpcMethod::RpcType type, - MethodHandler* handler, - grpc::protobuf::Message* request_prototype, - grpc::protobuf::Message* response_prototype) + MethodHandler* handler) : RpcMethod(name, type, nullptr), - handler_(handler), - request_prototype_(request_prototype), - response_prototype_(response_prototype) {} + handler_(handler) {} MethodHandler* handler() { return handler_.get(); } - grpc::protobuf::Message* AllocateRequestProto() { - return request_prototype_->New(); - } - grpc::protobuf::Message* AllocateResponseProto() { - return response_prototype_->New(); - } - private: std::unique_ptr<MethodHandler> handler_; - std::unique_ptr<grpc::protobuf::Message> request_prototype_; - std::unique_ptr<grpc::protobuf::Message> response_prototype_; }; // This class contains all the method information for an rpc service. It is diff --git a/include/grpc++/impl/serialization_traits.h b/include/grpc++/impl/serialization_traits.h index d21ad92475..4648bbfc33 100644 --- a/include/grpc++/impl/serialization_traits.h +++ b/include/grpc++/impl/serialization_traits.h @@ -38,12 +38,9 @@ struct grpc_byte_buffer; namespace grpc { -template <class Message> +template <class Message, class UnusedButHereForPartialTemplateSpecialization = void> class SerializationTraits; -typedef bool (*SerializationTraitsReadFunction)(grpc_byte_buffer* src, void* dest); -typedef bool (*SerializationTraitsWriteFunction)(const void* src, grpc_byte_buffer* dst); - } // namespace grpc #endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H |