diff options
Diffstat (limited to 'include/grpc++/impl')
-rw-r--r-- | include/grpc++/impl/call.h | 428 | ||||
-rw-r--r-- | include/grpc++/impl/client_unary_call.h | 26 | ||||
-rw-r--r-- | include/grpc++/impl/proto_utils.h | 76 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_service_method.h | 111 | ||||
-rw-r--r-- | include/grpc++/impl/serialization_traits.h | 68 | ||||
-rw-r--r-- | include/grpc++/impl/service_type.h | 41 |
6 files changed, 622 insertions, 128 deletions
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index aae199db1b..64fa5d6efb 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -34,14 +34,19 @@ #ifndef GRPCXX_IMPL_CALL_H #define GRPCXX_IMPL_CALL_H -#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc++/client_context.h> #include <grpc++/completion_queue.h> #include <grpc++/config.h> #include <grpc++/status.h> +#include <grpc++/impl/serialization_traits.h> +#include <functional> #include <memory> #include <map> +#include <string.h> + struct grpc_call; struct grpc_op; @@ -50,84 +55,383 @@ namespace grpc { class ByteBuffer; class Call; -class CallOpBuffer : public CompletionQueueTag { +void FillMetadataMap(grpc_metadata_array* arr, + std::multimap<grpc::string, grpc::string>* metadata); +grpc_metadata* FillMetadataArray( + const std::multimap<grpc::string, grpc::string>& metadata); + +/// Default argument for CallOpSet. I is unused by the class, but can be +/// used for generating multiple names for the same thing. +template <int I> +class CallNoOp { + protected: + void AddOp(grpc_op* ops, size_t* nops) {} + void FinishOp(bool* status, int max_message_size) {} +}; + +class CallOpSendInitialMetadata { public: - CallOpBuffer(); - ~CallOpBuffer(); - - void Reset(void* next_return_tag); - - // Does not take ownership. - void AddSendInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata); - void AddSendInitialMetadata(ClientContext* ctx); - void AddRecvInitialMetadata(ClientContext* ctx); - void AddSendMessage(const grpc::protobuf::Message& message); - void AddSendMessage(const ByteBuffer& message); - void AddRecvMessage(grpc::protobuf::Message* message); - void AddRecvMessage(ByteBuffer* message); - void AddClientSendClose(); - void AddClientRecvStatus(ClientContext* ctx, Status* status); - void AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata, - const Status& status); - void AddServerRecvClose(bool* cancelled); - - // INTERNAL API: - - // Convert to an array of grpc_op elements - void FillOps(grpc_op* ops, size_t* nops); - - // Called by completion queue just prior to returning from Next() or Pluck() - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + CallOpSendInitialMetadata() : send_(false) {} - void set_max_message_size(int max_message_size) { - max_message_size_ = max_message_size; + void SendInitialMetadata( + const std::multimap<grpc::string, grpc::string>& metadata) { + send_ = true; + initial_metadata_count_ = metadata.size(); + initial_metadata_ = FillMetadataArray(metadata); } - bool got_message; + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!send_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->flags = 0; + op->data.send_initial_metadata.count = initial_metadata_count_; + op->data.send_initial_metadata.metadata = initial_metadata_; + } + void FinishOp(bool* status, int max_message_size) { + if (!send_) return; + gpr_free(initial_metadata_); + send_ = false; + } - private: - void* return_tag_; - // Send initial metadata - bool send_initial_metadata_; + bool send_; size_t initial_metadata_count_; grpc_metadata* initial_metadata_; - // Recv initial metadta - std::multimap<grpc::string, grpc::string>* recv_initial_metadata_; - grpc_metadata_array recv_initial_metadata_arr_; - // Send message - const grpc::protobuf::Message* send_message_; - const ByteBuffer* send_message_buffer_; +}; + +class CallOpSendMessage { + public: + CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {} + + template <class M> + Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (send_buf_ == nullptr) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_MESSAGE; + op->flags = 0; + op->data.send_message = send_buf_; + } + void FinishOp(bool* status, int max_message_size) { + if (own_buf_) grpc_byte_buffer_destroy(send_buf_); + send_buf_ = nullptr; + } + + private: grpc_byte_buffer* send_buf_; - // Recv message - grpc::protobuf::Message* recv_message_; - ByteBuffer* recv_message_buffer_; + bool own_buf_; +}; + +template <class M> +Status CallOpSendMessage::SendMessage(const M& message) { + return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_); +} + +template <class R> +class CallOpRecvMessage { + public: + CallOpRecvMessage() : got_message(false), message_(nullptr) {} + + void RecvMessage(R* message) { message_ = message; } + + bool got_message; + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (message_ == nullptr) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_MESSAGE; + op->flags = 0; + op->data.recv_message = &recv_buf_; + } + + void FinishOp(bool* status, int max_message_size) { + if (message_ == nullptr) return; + if (recv_buf_) { + if (*status) { + got_message = true; + *status = SerializationTraits<R>::Deserialize(recv_buf_, message_, + max_message_size) + .ok(); + } else { + got_message = false; + grpc_byte_buffer_destroy(recv_buf_); + } + } else { + got_message = false; + *status = false; + } + message_ = nullptr; + } + + private: + R* message_; grpc_byte_buffer* recv_buf_; - int max_message_size_; - // Client send close - bool client_send_close_; - // Client recv status +}; + +class CallOpGenericRecvMessage { + public: + CallOpGenericRecvMessage() : got_message(false) {} + + template <class R> + void RecvMessage(R* message) { + deserialize_ = [message](grpc_byte_buffer* buf, + int max_message_size) -> Status { + return SerializationTraits<R>::Deserialize(buf, message, + max_message_size); + }; + } + + bool got_message; + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!deserialize_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_MESSAGE; + op->flags = 0; + op->data.recv_message = &recv_buf_; + } + + void FinishOp(bool* status, int max_message_size) { + if (!deserialize_) return; + if (recv_buf_) { + if (*status) { + got_message = true; + *status = deserialize_(recv_buf_, max_message_size).ok(); + } else { + got_message = false; + grpc_byte_buffer_destroy(recv_buf_); + } + } else { + got_message = false; + *status = false; + } + deserialize_ = DeserializeFunc(); + } + + private: + typedef std::function<Status(grpc_byte_buffer*, int)> DeserializeFunc; + DeserializeFunc deserialize_; + grpc_byte_buffer* recv_buf_; +}; + +class CallOpClientSendClose { + public: + CallOpClientSendClose() : send_(false) {} + + void ClientSendClose() { send_ = true; } + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!send_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + } + void FinishOp(bool* status, int max_message_size) { send_ = false; } + + private: + bool send_; +}; + +class CallOpServerSendStatus { + public: + CallOpServerSendStatus() : send_status_available_(false) {} + + void ServerSendStatus( + const std::multimap<grpc::string, grpc::string>& trailing_metadata, + const Status& status) { + trailing_metadata_count_ = trailing_metadata.size(); + trailing_metadata_ = FillMetadataArray(trailing_metadata); + send_status_available_ = true; + send_status_code_ = static_cast<grpc_status_code>(status.error_code()); + send_status_details_ = status.error_message(); + } + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!send_status_available_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = + trailing_metadata_count_; + op->data.send_status_from_server.trailing_metadata = trailing_metadata_; + op->data.send_status_from_server.status = send_status_code_; + op->data.send_status_from_server.status_details = + send_status_details_.empty() ? nullptr : send_status_details_.c_str(); + op->flags = 0; + } + + void FinishOp(bool* status, int max_message_size) { + if (!send_status_available_) return; + gpr_free(trailing_metadata_); + send_status_available_ = false; + } + + private: + bool send_status_available_; + grpc_status_code send_status_code_; + grpc::string send_status_details_; + size_t trailing_metadata_count_; + grpc_metadata* trailing_metadata_; +}; + +class CallOpRecvInitialMetadata { + public: + CallOpRecvInitialMetadata() : recv_initial_metadata_(nullptr) {} + + void RecvInitialMetadata(ClientContext* context) { + context->initial_metadata_received_ = true; + recv_initial_metadata_ = &context->recv_initial_metadata_; + } + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!recv_initial_metadata_) return; + memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_)); + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &recv_initial_metadata_arr_; + op->flags = 0; + } + void FinishOp(bool* status, int max_message_size) { + if (recv_initial_metadata_ == nullptr) return; + FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_); + recv_initial_metadata_ = nullptr; + } + + private: + std::multimap<grpc::string, grpc::string>* recv_initial_metadata_; + grpc_metadata_array recv_initial_metadata_arr_; +}; + +class CallOpClientRecvStatus { + public: + CallOpClientRecvStatus() : recv_status_(nullptr) {} + + void ClientRecvStatus(ClientContext* context, Status* status) { + recv_trailing_metadata_ = &context->trailing_metadata_; + recv_status_ = status; + } + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (recv_status_ == nullptr) return; + memset(&recv_trailing_metadata_arr_, 0, + sizeof(recv_trailing_metadata_arr_)); + status_details_ = nullptr; + status_details_capacity_ = 0; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &recv_trailing_metadata_arr_; + op->data.recv_status_on_client.status = &status_code_; + op->data.recv_status_on_client.status_details = &status_details_; + op->data.recv_status_on_client.status_details_capacity = + &status_details_capacity_; + op->flags = 0; + } + + void FinishOp(bool* status, int max_message_size) { + if (recv_status_ == nullptr) return; + FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_); + *recv_status_ = Status( + static_cast<StatusCode>(status_code_), + status_details_ ? grpc::string(status_details_) : grpc::string()); + gpr_free(status_details_); + recv_status_ = nullptr; + } + + private: std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_; Status* recv_status_; grpc_metadata_array recv_trailing_metadata_arr_; grpc_status_code status_code_; char* status_details_; size_t status_details_capacity_; - // Server send status - bool send_status_available_; - grpc_status_code send_status_code_; - grpc::string send_status_details_; - size_t trailing_metadata_count_; - grpc_metadata* trailing_metadata_; - int cancelled_buf_; - bool* recv_closed_; }; -// SneakyCallOpBuffer does not post completions to the completion queue -class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer { +/// An abstract collection of call ops, used to generate the +/// grpc_call_op structure to pass down to the lower layers, +/// and as it is-a CompletionQueueTag, also massages the final +/// completion into the correct form for consumption in the C++ +/// API. +class CallOpSetInterface : public CompletionQueueTag { + public: + CallOpSetInterface() : max_message_size_(0) {} + /// Fills in grpc_op, starting from ops[*nops] and moving + /// upwards. + virtual void FillOps(grpc_op* ops, size_t* nops) = 0; + + void set_max_message_size(int max_message_size) { + max_message_size_ = max_message_size; + } + + protected: + int max_message_size_; +}; + +/// Primary implementaiton of CallOpSetInterface. +/// Since we cannot use variadic templates, we declare slots up to +/// the maximum count of ops we'll need in a set. We leverage the +/// empty base class optimization to slim this class (especially +/// when there are many unused slots used). To avoid duplicate base classes, +/// the template parmeter for CallNoOp is varied by argument position. +template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>, + class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>, + class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>> +class CallOpSet : public CallOpSetInterface, + public Op1, + public Op2, + public Op3, + public Op4, + public Op5, + public Op6 { + public: + CallOpSet() : return_tag_(this) {} + void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE { + this->Op1::AddOp(ops, nops); + this->Op2::AddOp(ops, nops); + this->Op3::AddOp(ops, nops); + this->Op4::AddOp(ops, nops); + this->Op5::AddOp(ops, nops); + this->Op6::AddOp(ops, nops); + } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + this->Op1::FinishOp(status, max_message_size_); + this->Op2::FinishOp(status, max_message_size_); + this->Op3::FinishOp(status, max_message_size_); + this->Op4::FinishOp(status, max_message_size_); + this->Op5::FinishOp(status, max_message_size_); + this->Op6::FinishOp(status, max_message_size_); + *tag = return_tag_; + return true; + } + + void set_output_tag(void* return_tag) { return_tag_ = return_tag; } + + private: + void* return_tag_; +}; + +/// A CallOpSet that does not post completions to the completion queue. +/// +/// Allows hiding some completions that the C core must generate from +/// C++ users. +template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>, + class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>, + class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>> +class SneakyCallOpSet GRPC_FINAL + : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> { public: bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { - return CallOpBuffer::FinalizeResult(tag, status) && false; + typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base; + return Base::FinalizeResult(tag, status) && false; } }; @@ -135,7 +439,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer { class CallHook { public: virtual ~CallHook() {} - virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; }; // Straightforward wrapping of the C call object @@ -146,7 +450,7 @@ class Call GRPC_FINAL { Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq, int max_message_size); - void PerformOps(CallOpBuffer* buffer); + void PerformOps(CallOpSetInterface* ops); grpc_call* call() { return call_; } CompletionQueue* cq() { return cq_; } diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index 2f234fd3ac..b77ce7d02c 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -37,6 +37,8 @@ #include <grpc++/config.h> #include <grpc++/status.h> +#include <grpc++/impl/call.h> + namespace grpc { class ChannelInterface; @@ -45,10 +47,28 @@ class CompletionQueue; class RpcMethod; // Wrapper that performs a blocking unary call +template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result); + ClientContext* context, const InputMessage& request, + OutputMessage* result) { + CompletionQueue cq; + Call call(channel->CreateCall(method, context, &cq)); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, + CallOpClientSendClose, CallOpClientRecvStatus> ops; + Status status = ops.SendMessage(request); + if (!status.ok()) { + return status; + } + ops.SendInitialMetadata(context->send_initial_metadata_); + ops.RecvInitialMetadata(context); + ops.RecvMessage(result); + ops.ClientSendClose(); + ops.ClientRecvStatus(context, &status); + call.PerformOps(&ops); + GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok()); + return status; +} } // namespace grpc diff --git a/include/grpc++/impl/proto_utils.h b/include/grpc++/impl/proto_utils.h new file mode 100644 index 0000000000..ebefa3e1be --- /dev/null +++ b/include/grpc++/impl/proto_utils.h @@ -0,0 +1,76 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H +#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H + +#include <type_traits> + +#include <grpc/grpc.h> +#include <grpc++/impl/serialization_traits.h> +#include <grpc++/config_protobuf.h> +#include <grpc++/status.h> + +namespace grpc { + +// Serialize the msg into a buffer created inside the function. The caller +// should destroy the returned buffer when done with it. If serialization fails, +// false is returned and buffer is left unchanged. +Status SerializeProto(const grpc::protobuf::Message& msg, + grpc_byte_buffer** buffer); + +// The caller keeps ownership of buffer and msg. +Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, + int max_message_size); + +template <class T> +class SerializationTraits<T, typename std::enable_if<std::is_base_of< + grpc::protobuf::Message, T>::value>::type> { + public: + static Status Serialize(const grpc::protobuf::Message& msg, + grpc_byte_buffer** buffer, bool* own_buffer) { + *own_buffer = true; + return SerializeProto(msg, buffer); + } + static Status Deserialize(grpc_byte_buffer* buffer, + grpc::protobuf::Message* msg, + int max_message_size) { + auto status = DeserializeProto(buffer, msg, max_message_size); + grpc_byte_buffer_destroy(buffer); + return status; + } +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 50204d2099..3cfbef7806 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -55,16 +55,19 @@ class MethodHandler { public: virtual ~MethodHandler() {} struct HandlerParameter { - HandlerParameter(Call* c, ServerContext* context, - const grpc::protobuf::Message* req, - grpc::protobuf::Message* resp) - : call(c), server_context(context), request(req), response(resp) {} + HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req, + int max_size) + : call(c), + server_context(context), + request(req), + max_message_size(max_size) {} Call* call; ServerContext* server_context; - const grpc::protobuf::Message* request; - grpc::protobuf::Message* response; + // Handler required to grpc_byte_buffer_destroy this + grpc_byte_buffer* request; + int max_message_size; }; - virtual Status RunHandler(const HandlerParameter& param) = 0; + virtual void RunHandler(const HandlerParameter& param) = 0; }; // A wrapper class of an application provided rpc method handler. @@ -77,11 +80,25 @@ class RpcMethodHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { - // Invoke application function, cast proto messages to their actual types. - return func_(service_, param.server_context, - dynamic_cast<const RequestType*>(param.request), - dynamic_cast<ResponseType*>(param.response)); + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits<RequestType>::Deserialize( + param.request, &req, param.max_message_size); + ResponseType rsp; + if (status.ok()) { + status = func_(service_, param.server_context, &req, &rsp); + } + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.ok()) { + status = ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -102,10 +119,21 @@ class ClientStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { + void RunHandler(const HandlerParameter& param) GRPC_FINAL { ServerReader<RequestType> reader(param.call, param.server_context); - return func_(service_, param.server_context, &reader, - dynamic_cast<ResponseType*>(param.response)); + ResponseType rsp; + Status status = func_(service_, param.server_context, &reader, &rsp); + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.ok()) { + status = ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -124,10 +152,23 @@ class ServerStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { - ServerWriter<ResponseType> writer(param.call, param.server_context); - return func_(service_, param.server_context, - dynamic_cast<const RequestType*>(param.request), &writer); + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits<RequestType>::Deserialize( + param.request, &req, param.max_message_size); + + if (status.ok()) { + ServerWriter<ResponseType> writer(param.call, param.server_context); + status = func_(service_, param.server_context, &req, &writer); + } + + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -147,10 +188,18 @@ class BidiStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { + void RunHandler(const HandlerParameter& param) GRPC_FINAL { ServerReaderWriter<ResponseType, RequestType> stream(param.call, param.server_context); - return func_(service_, param.server_context, &stream); + Status status = func_(service_, param.server_context, &stream); + + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -162,29 +211,15 @@ class BidiStreamingHandler : public MethodHandler { // Server side rpc method class class RpcServiceMethod : public RpcMethod { public: - // Takes ownership of the handler and two prototype objects. + // Takes ownership of the handler RpcServiceMethod(const char* name, RpcMethod::RpcType type, - MethodHandler* handler, - grpc::protobuf::Message* request_prototype, - grpc::protobuf::Message* response_prototype) - : RpcMethod(name, type, nullptr), - handler_(handler), - request_prototype_(request_prototype), - response_prototype_(response_prototype) {} + MethodHandler* handler) + : RpcMethod(name, type, nullptr), handler_(handler) {} MethodHandler* handler() { return handler_.get(); } - grpc::protobuf::Message* AllocateRequestProto() { - return request_prototype_->New(); - } - grpc::protobuf::Message* AllocateResponseProto() { - return response_prototype_->New(); - } - private: std::unique_ptr<MethodHandler> handler_; - std::unique_ptr<grpc::protobuf::Message> request_prototype_; - std::unique_ptr<grpc::protobuf::Message> response_prototype_; }; // This class contains all the method information for an rpc service. It is diff --git a/include/grpc++/impl/serialization_traits.h b/include/grpc++/impl/serialization_traits.h new file mode 100644 index 0000000000..1f5c674e4c --- /dev/null +++ b/include/grpc++/impl/serialization_traits.h @@ -0,0 +1,68 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H +#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H + +namespace grpc { + +/// Defines how to serialize and deserialize some type. +/// +/// Used for hooking different message serialization API's into GRPC. +/// Each SerializationTraits implementation must provide the following +/// functions: +/// static Status Serialize(const Message& msg, +/// grpc_byte_buffer** buffer, +// bool* own_buffer); +/// static Status Deserialize(grpc_byte_buffer* buffer, +/// Message* msg, +/// int max_message_size); +/// +/// Serialize is required to convert message to a grpc_byte_buffer, and +/// to store a pointer to that byte buffer at *buffer. *own_buffer should +/// be set to true if the caller owns said byte buffer, or false if +/// ownership is retained elsewhere. +/// +/// Deserialize is required to convert buffer into the message stored at +/// msg. max_message_size is passed in as a bound on the maximum number of +/// message bytes Deserialize should accept. +/// +/// Both functions return a Status, allowing them to explain what went +/// wrong if required. +template <class Message, + class UnusedButHereForPartialTemplateSpecialization = void> +class SerializationTraits; + +} // namespace grpc + +#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index 25e437edad..c33a278f5b 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -35,6 +35,8 @@ #define GRPCXX_IMPL_SERVICE_TYPE_H #include <grpc++/config.h> +#include <grpc++/impl/serialization_traits.h> +#include <grpc++/server.h> #include <grpc++/status.h> namespace grpc { @@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface { class AsynchronousService { public: - // this is Server, but in disguise to avoid a link dependency - class DispatchImpl { - public: - virtual void RequestAsyncCall(void* registered_method, - ServerContext* context, - ::grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) = 0; - }; - AsynchronousService(const char** method_names, size_t method_count) - : dispatch_impl_(nullptr), + : server_(nullptr), method_names_(method_names), method_count_(method_count), request_args_(nullptr) {} @@ -86,42 +76,43 @@ class AsynchronousService { ~AsynchronousService() { delete[] request_args_; } protected: - void RequestAsyncUnary(int index, ServerContext* context, - grpc::protobuf::Message* request, + template <class Message> + void RequestAsyncUnary(int index, ServerContext* context, Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + notification_cq, tag, request); } void RequestClientStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + notification_cq, tag); } + template <class Message> void RequestServerStreaming(int index, ServerContext* context, - grpc::protobuf::Message* request, + Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + notification_cq, tag, request); } void RequestBidiStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + notification_cq, tag); } private: friend class Server; - DispatchImpl* dispatch_impl_; + Server* server_; const char** const method_names_; size_t method_count_; void** request_args_; |