diff options
author | yang-g <yangg@google.com> | 2015-07-06 14:05:54 -0700 |
---|---|---|
committer | yang-g <yangg@google.com> | 2015-07-06 14:05:54 -0700 |
commit | 5ea46ab2482c3724fbc7fd0aab55f324fb65999c (patch) | |
tree | 55eebc4aae8f06f931c8f75ddf84d56595f99fa1 /include/grpc++ | |
parent | 3abe60b9d08ff5a784a39f7c4a10c631547c3526 (diff) | |
parent | d426864934ac60f46e538ba81932e405fa8949b1 (diff) |
merge with upstream and resolve conflicts
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/async_unary_call.h | 56 | ||||
-rw-r--r-- | include/grpc++/byte_buffer.h | 29 | ||||
-rw-r--r-- | include/grpc++/client_context.h | 10 | ||||
-rw-r--r-- | include/grpc++/completion_queue.h | 30 | ||||
-rw-r--r-- | include/grpc++/config.h | 43 | ||||
-rw-r--r-- | include/grpc++/config_protobuf.h | 72 | ||||
-rw-r--r-- | include/grpc++/impl/call.h | 428 | ||||
-rw-r--r-- | include/grpc++/impl/client_unary_call.h | 26 | ||||
-rw-r--r-- | include/grpc++/impl/proto_utils.h | 76 | ||||
-rw-r--r-- | include/grpc++/impl/rpc_service_method.h | 111 | ||||
-rw-r--r-- | include/grpc++/impl/serialization_traits.h | 68 | ||||
-rw-r--r-- | include/grpc++/impl/service_type.h | 41 | ||||
-rw-r--r-- | include/grpc++/server.h | 126 | ||||
-rw-r--r-- | include/grpc++/server_context.h | 16 | ||||
-rw-r--r-- | include/grpc++/stream.h | 466 |
15 files changed, 1159 insertions, 439 deletions
diff --git a/include/grpc++/async_unary_call.h b/include/grpc++/async_unary_call.h index abb6308782..d631ccd134 100644 --- a/include/grpc++/async_unary_call.h +++ b/include/grpc++/async_unary_call.h @@ -51,47 +51,50 @@ class ClientAsyncResponseReaderInterface { virtual ~ClientAsyncResponseReaderInterface() {} virtual void ReadInitialMetadata(void* tag) = 0; virtual void Finish(R* msg, Status* status, void* tag) = 0; - }; template <class R> class ClientAsyncResponseReader GRPC_FINAL : public ClientAsyncResponseReaderInterface<R> { public: + template <class W> ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request) + const W& request) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - init_buf_.AddSendMessage(request); - init_buf_.AddClientSendClose(); + init_buf_.SendInitialMetadata(context->send_initial_metadata_); + // TODO(ctiller): don't assert + GPR_ASSERT(init_buf_.SendMessage(request).ok()); + init_buf_.ClientSendClose(); call_.PerformOps(&init_buf_); } void ReadInitialMetadata(void* tag) { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); + meta_buf_.set_output_tag(tag); + meta_buf_.RecvInitialMetadata(context_); call_.PerformOps(&meta_buf_); } void Finish(R* msg, Status* status, void* tag) { - finish_buf_.Reset(tag); + finish_buf_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_buf_.RecvInitialMetadata(context_); } - finish_buf_.AddRecvMessage(msg); - finish_buf_.AddClientRecvStatus(context_, status); + finish_buf_.RecvMessage(msg); + finish_buf_.ClientRecvStatus(context_, status); call_.PerformOps(&finish_buf_); } private: ClientContext* context_; Call call_; - SneakyCallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer finish_buf_; + SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> init_buf_; + CallOpSet<CallOpRecvInitialMetadata> meta_buf_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>, + CallOpClientRecvStatus> finish_buf_; }; template <class W> @@ -104,34 +107,36 @@ class ServerAsyncResponseWriter GRPC_FINAL void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_buf_.set_output_tag(tag); + meta_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; call_.PerformOps(&meta_buf_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - finish_buf_.AddSendMessage(msg); + finish_buf_.ServerSendStatus( + ctx_->trailing_metadata_, finish_buf_.SendMessage(msg)); + } else { + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - finish_buf_.Reset(tag); + finish_buf_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_buf_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); + finish_buf_.ServerSendStatus(ctx_->trailing_metadata_, status); call_.PerformOps(&finish_buf_); } @@ -140,8 +145,9 @@ class ServerAsyncResponseWriter GRPC_FINAL Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> meta_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_buf_; }; } // namespace grpc diff --git a/include/grpc++/byte_buffer.h b/include/grpc++/byte_buffer.h index 3e40eaed1d..cb3c6a1159 100644 --- a/include/grpc++/byte_buffer.h +++ b/include/grpc++/byte_buffer.h @@ -39,6 +39,8 @@ #include <grpc/support/log.h> #include <grpc++/config.h> #include <grpc++/slice.h> +#include <grpc++/status.h> +#include <grpc++/impl/serialization_traits.h> #include <vector> @@ -48,7 +50,7 @@ class ByteBuffer GRPC_FINAL { public: ByteBuffer() : buffer_(nullptr) {} - ByteBuffer(Slice* slices, size_t nslices); + ByteBuffer(const Slice* slices, size_t nslices); ~ByteBuffer() { if (buffer_) { @@ -56,13 +58,16 @@ class ByteBuffer GRPC_FINAL { } } - void Dump(std::vector<Slice>* slices); + void Dump(std::vector<Slice>* slices) const; void Clear(); - size_t Length(); + size_t Length() const; private: - friend class CallOpBuffer; + friend class SerializationTraits<ByteBuffer, void>; + + ByteBuffer(const ByteBuffer&); + ByteBuffer& operator=(const ByteBuffer&); // takes ownership void set_buffer(grpc_byte_buffer* buf) { @@ -78,6 +83,22 @@ class ByteBuffer GRPC_FINAL { grpc_byte_buffer* buffer_; }; +template <> +class SerializationTraits<ByteBuffer, void> { + public: + static Status Deserialize(grpc_byte_buffer* byte_buffer, ByteBuffer* dest, + int max_message_size) { + dest->set_buffer(byte_buffer); + return Status::OK; + } + static Status Serialize(const ByteBuffer& source, grpc_byte_buffer** buffer, + bool* own_buffer) { + *buffer = source.buffer(); + *own_buffer = false; + return Status::OK; + } +}; + } // namespace grpc #endif // GRPCXX_BYTE_BUFFER_H diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 66d3c249a1..3bf5edc6c0 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -50,7 +50,6 @@ struct grpc_completion_queue; namespace grpc { -class CallOpBuffer; class ChannelInterface; class CompletionQueue; class Credentials; @@ -118,7 +117,8 @@ class ClientContext { ClientContext(const ClientContext&); ClientContext& operator=(const ClientContext&); - friend class CallOpBuffer; + friend class CallOpClientRecvStatus; + friend class CallOpRecvInitialMetadata; friend class Channel; template <class R> friend class ::grpc::ClientReader; @@ -134,6 +134,12 @@ class ClientContext { friend class ::grpc::ClientAsyncReaderWriter; template <class R> friend class ::grpc::ClientAsyncResponseReader; + template <class InputMessage, class OutputMessage> + friend Status BlockingUnaryCall(ChannelInterface* channel, + const RpcMethod& method, + ClientContext* context, + const InputMessage& request, + OutputMessage* result); grpc_call* call() { return call_; } void set_call(grpc_call* call, diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h index e8429c8f41..f32cbff06c 100644 --- a/include/grpc++/completion_queue.h +++ b/include/grpc++/completion_queue.h @@ -35,8 +35,8 @@ #define GRPCXX_COMPLETION_QUEUE_H #include <grpc/support/time.h> -#include <grpc++/impl/client_unary_call.h> #include <grpc++/impl/grpc_library.h> +#include <grpc++/status.h> #include <grpc++/time.h> struct grpc_completion_queue; @@ -55,8 +55,19 @@ template <class W> class ServerWriter; template <class R, class W> class ServerReaderWriter; - +template <class ServiceType, class RequestType, class ResponseType> +class RpcMethodHandler; +template <class ServiceType, class RequestType, class ResponseType> +class ClientStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class ServerStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class BidiStreamingHandler; + +class ChannelInterface; +class ClientContext; class CompletionQueue; +class RpcMethod; class Server; class ServerBuilder; class ServerContext; @@ -84,7 +95,7 @@ class CompletionQueue : public GrpcLibrary { // Nonblocking (until deadline) read from queue. // Cannot rely on result of tag or ok if return is TIMEOUT - template<typename T> + template <typename T> NextStatus AsyncNext(void** tag, bool* ok, const T& deadline) { TimePoint<T> deadline_tp(deadline); return AsyncNextInternal(tag, ok, deadline_tp.raw_time()); @@ -118,13 +129,22 @@ class CompletionQueue : public GrpcLibrary { friend class ::grpc::ServerWriter; template <class R, class W> friend class ::grpc::ServerReaderWriter; + template <class ServiceType, class RequestType, class ResponseType> + friend class RpcMethodHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class ClientStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class ServerStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class BidiStreamingHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; + template <class InputMessage, class OutputMessage> friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result); + const InputMessage& request, + OutputMessage* result); NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); diff --git a/include/grpc++/config.h b/include/grpc++/config.h index ca74064be2..1362c0a1fa 100644 --- a/include/grpc++/config.h +++ b/include/grpc++/config.h @@ -77,31 +77,6 @@ #define GRPC_OVERRIDE override #endif -#ifndef GRPC_CUSTOM_PROTOBUF_INT64 -#include <google/protobuf/stubs/common.h> -#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 -#endif - -#ifndef GRPC_CUSTOM_MESSAGE -#include <google/protobuf/message.h> -#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message -#endif - -#ifndef GRPC_CUSTOM_STRING -#include <string> -#define GRPC_CUSTOM_STRING std::string -#endif - -#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream.h> -#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \ - ::google::protobuf::io::ZeroCopyOutputStream -#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \ - ::google::protobuf::io::ZeroCopyInputStream -#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream -#endif - #ifdef GRPC_CXX0X_NO_NULLPTR #include <memory> const class { @@ -125,23 +100,15 @@ const class { } nullptr = {}; #endif +#ifndef GRPC_CUSTOM_STRING +#include <string> +#define GRPC_CUSTOM_STRING std::string +#endif + namespace grpc { typedef GRPC_CUSTOM_STRING string; -namespace protobuf { - -typedef GRPC_CUSTOM_MESSAGE Message; -typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; - -namespace io { -typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream; -typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream; -typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream; -} // namespace io - -} // namespace protobuf - } // namespace grpc #endif // GRPCXX_CONFIG_H diff --git a/include/grpc++/config_protobuf.h b/include/grpc++/config_protobuf.h new file mode 100644 index 0000000000..3afc7a58e2 --- /dev/null +++ b/include/grpc++/config_protobuf.h @@ -0,0 +1,72 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_CONFIG_PROTOBUF_H +#define GRPCXX_CONFIG_PROTOBUF_H + +#ifndef GRPC_CUSTOM_PROTOBUF_INT64 +#include <google/protobuf/stubs/common.h> +#define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 +#endif + +#ifndef GRPC_CUSTOM_MESSAGE +#include <google/protobuf/message.h> +#define GRPC_CUSTOM_MESSAGE ::google::protobuf::Message +#endif + +#ifndef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream.h> +#define GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM \ + ::google::protobuf::io::ZeroCopyOutputStream +#define GRPC_CUSTOM_ZEROCOPYINPUTSTREAM \ + ::google::protobuf::io::ZeroCopyInputStream +#define GRPC_CUSTOM_CODEDINPUTSTREAM ::google::protobuf::io::CodedInputStream +#endif + +namespace grpc { +namespace protobuf { + +typedef GRPC_CUSTOM_MESSAGE Message; +typedef GRPC_CUSTOM_PROTOBUF_INT64 int64; + +namespace io { +typedef GRPC_CUSTOM_ZEROCOPYOUTPUTSTREAM ZeroCopyOutputStream; +typedef GRPC_CUSTOM_ZEROCOPYINPUTSTREAM ZeroCopyInputStream; +typedef GRPC_CUSTOM_CODEDINPUTSTREAM CodedInputStream; +} // namespace io + +} // namespace protobuf +} // namespace grpc + +#endif // GRPCXX_CONFIG_PROTOBUF_H diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h index aae199db1b..64fa5d6efb 100644 --- a/include/grpc++/impl/call.h +++ b/include/grpc++/impl/call.h @@ -34,14 +34,19 @@ #ifndef GRPCXX_IMPL_CALL_H #define GRPCXX_IMPL_CALL_H -#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc++/client_context.h> #include <grpc++/completion_queue.h> #include <grpc++/config.h> #include <grpc++/status.h> +#include <grpc++/impl/serialization_traits.h> +#include <functional> #include <memory> #include <map> +#include <string.h> + struct grpc_call; struct grpc_op; @@ -50,84 +55,383 @@ namespace grpc { class ByteBuffer; class Call; -class CallOpBuffer : public CompletionQueueTag { +void FillMetadataMap(grpc_metadata_array* arr, + std::multimap<grpc::string, grpc::string>* metadata); +grpc_metadata* FillMetadataArray( + const std::multimap<grpc::string, grpc::string>& metadata); + +/// Default argument for CallOpSet. I is unused by the class, but can be +/// used for generating multiple names for the same thing. +template <int I> +class CallNoOp { + protected: + void AddOp(grpc_op* ops, size_t* nops) {} + void FinishOp(bool* status, int max_message_size) {} +}; + +class CallOpSendInitialMetadata { public: - CallOpBuffer(); - ~CallOpBuffer(); - - void Reset(void* next_return_tag); - - // Does not take ownership. - void AddSendInitialMetadata( - std::multimap<grpc::string, grpc::string>* metadata); - void AddSendInitialMetadata(ClientContext* ctx); - void AddRecvInitialMetadata(ClientContext* ctx); - void AddSendMessage(const grpc::protobuf::Message& message); - void AddSendMessage(const ByteBuffer& message); - void AddRecvMessage(grpc::protobuf::Message* message); - void AddRecvMessage(ByteBuffer* message); - void AddClientSendClose(); - void AddClientRecvStatus(ClientContext* ctx, Status* status); - void AddServerSendStatus(std::multimap<grpc::string, grpc::string>* metadata, - const Status& status); - void AddServerRecvClose(bool* cancelled); - - // INTERNAL API: - - // Convert to an array of grpc_op elements - void FillOps(grpc_op* ops, size_t* nops); - - // Called by completion queue just prior to returning from Next() or Pluck() - bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + CallOpSendInitialMetadata() : send_(false) {} - void set_max_message_size(int max_message_size) { - max_message_size_ = max_message_size; + void SendInitialMetadata( + const std::multimap<grpc::string, grpc::string>& metadata) { + send_ = true; + initial_metadata_count_ = metadata.size(); + initial_metadata_ = FillMetadataArray(metadata); } - bool got_message; + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!send_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->flags = 0; + op->data.send_initial_metadata.count = initial_metadata_count_; + op->data.send_initial_metadata.metadata = initial_metadata_; + } + void FinishOp(bool* status, int max_message_size) { + if (!send_) return; + gpr_free(initial_metadata_); + send_ = false; + } - private: - void* return_tag_; - // Send initial metadata - bool send_initial_metadata_; + bool send_; size_t initial_metadata_count_; grpc_metadata* initial_metadata_; - // Recv initial metadta - std::multimap<grpc::string, grpc::string>* recv_initial_metadata_; - grpc_metadata_array recv_initial_metadata_arr_; - // Send message - const grpc::protobuf::Message* send_message_; - const ByteBuffer* send_message_buffer_; +}; + +class CallOpSendMessage { + public: + CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {} + + template <class M> + Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (send_buf_ == nullptr) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_MESSAGE; + op->flags = 0; + op->data.send_message = send_buf_; + } + void FinishOp(bool* status, int max_message_size) { + if (own_buf_) grpc_byte_buffer_destroy(send_buf_); + send_buf_ = nullptr; + } + + private: grpc_byte_buffer* send_buf_; - // Recv message - grpc::protobuf::Message* recv_message_; - ByteBuffer* recv_message_buffer_; + bool own_buf_; +}; + +template <class M> +Status CallOpSendMessage::SendMessage(const M& message) { + return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_); +} + +template <class R> +class CallOpRecvMessage { + public: + CallOpRecvMessage() : got_message(false), message_(nullptr) {} + + void RecvMessage(R* message) { message_ = message; } + + bool got_message; + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (message_ == nullptr) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_MESSAGE; + op->flags = 0; + op->data.recv_message = &recv_buf_; + } + + void FinishOp(bool* status, int max_message_size) { + if (message_ == nullptr) return; + if (recv_buf_) { + if (*status) { + got_message = true; + *status = SerializationTraits<R>::Deserialize(recv_buf_, message_, + max_message_size) + .ok(); + } else { + got_message = false; + grpc_byte_buffer_destroy(recv_buf_); + } + } else { + got_message = false; + *status = false; + } + message_ = nullptr; + } + + private: + R* message_; grpc_byte_buffer* recv_buf_; - int max_message_size_; - // Client send close - bool client_send_close_; - // Client recv status +}; + +class CallOpGenericRecvMessage { + public: + CallOpGenericRecvMessage() : got_message(false) {} + + template <class R> + void RecvMessage(R* message) { + deserialize_ = [message](grpc_byte_buffer* buf, + int max_message_size) -> Status { + return SerializationTraits<R>::Deserialize(buf, message, + max_message_size); + }; + } + + bool got_message; + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!deserialize_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_MESSAGE; + op->flags = 0; + op->data.recv_message = &recv_buf_; + } + + void FinishOp(bool* status, int max_message_size) { + if (!deserialize_) return; + if (recv_buf_) { + if (*status) { + got_message = true; + *status = deserialize_(recv_buf_, max_message_size).ok(); + } else { + got_message = false; + grpc_byte_buffer_destroy(recv_buf_); + } + } else { + got_message = false; + *status = false; + } + deserialize_ = DeserializeFunc(); + } + + private: + typedef std::function<Status(grpc_byte_buffer*, int)> DeserializeFunc; + DeserializeFunc deserialize_; + grpc_byte_buffer* recv_buf_; +}; + +class CallOpClientSendClose { + public: + CallOpClientSendClose() : send_(false) {} + + void ClientSendClose() { send_ = true; } + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!send_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + } + void FinishOp(bool* status, int max_message_size) { send_ = false; } + + private: + bool send_; +}; + +class CallOpServerSendStatus { + public: + CallOpServerSendStatus() : send_status_available_(false) {} + + void ServerSendStatus( + const std::multimap<grpc::string, grpc::string>& trailing_metadata, + const Status& status) { + trailing_metadata_count_ = trailing_metadata.size(); + trailing_metadata_ = FillMetadataArray(trailing_metadata); + send_status_available_ = true; + send_status_code_ = static_cast<grpc_status_code>(status.error_code()); + send_status_details_ = status.error_message(); + } + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!send_status_available_) return; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = + trailing_metadata_count_; + op->data.send_status_from_server.trailing_metadata = trailing_metadata_; + op->data.send_status_from_server.status = send_status_code_; + op->data.send_status_from_server.status_details = + send_status_details_.empty() ? nullptr : send_status_details_.c_str(); + op->flags = 0; + } + + void FinishOp(bool* status, int max_message_size) { + if (!send_status_available_) return; + gpr_free(trailing_metadata_); + send_status_available_ = false; + } + + private: + bool send_status_available_; + grpc_status_code send_status_code_; + grpc::string send_status_details_; + size_t trailing_metadata_count_; + grpc_metadata* trailing_metadata_; +}; + +class CallOpRecvInitialMetadata { + public: + CallOpRecvInitialMetadata() : recv_initial_metadata_(nullptr) {} + + void RecvInitialMetadata(ClientContext* context) { + context->initial_metadata_received_ = true; + recv_initial_metadata_ = &context->recv_initial_metadata_; + } + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (!recv_initial_metadata_) return; + memset(&recv_initial_metadata_arr_, 0, sizeof(recv_initial_metadata_arr_)); + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &recv_initial_metadata_arr_; + op->flags = 0; + } + void FinishOp(bool* status, int max_message_size) { + if (recv_initial_metadata_ == nullptr) return; + FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_); + recv_initial_metadata_ = nullptr; + } + + private: + std::multimap<grpc::string, grpc::string>* recv_initial_metadata_; + grpc_metadata_array recv_initial_metadata_arr_; +}; + +class CallOpClientRecvStatus { + public: + CallOpClientRecvStatus() : recv_status_(nullptr) {} + + void ClientRecvStatus(ClientContext* context, Status* status) { + recv_trailing_metadata_ = &context->trailing_metadata_; + recv_status_ = status; + } + + protected: + void AddOp(grpc_op* ops, size_t* nops) { + if (recv_status_ == nullptr) return; + memset(&recv_trailing_metadata_arr_, 0, + sizeof(recv_trailing_metadata_arr_)); + status_details_ = nullptr; + status_details_capacity_ = 0; + grpc_op* op = &ops[(*nops)++]; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &recv_trailing_metadata_arr_; + op->data.recv_status_on_client.status = &status_code_; + op->data.recv_status_on_client.status_details = &status_details_; + op->data.recv_status_on_client.status_details_capacity = + &status_details_capacity_; + op->flags = 0; + } + + void FinishOp(bool* status, int max_message_size) { + if (recv_status_ == nullptr) return; + FillMetadataMap(&recv_trailing_metadata_arr_, recv_trailing_metadata_); + *recv_status_ = Status( + static_cast<StatusCode>(status_code_), + status_details_ ? grpc::string(status_details_) : grpc::string()); + gpr_free(status_details_); + recv_status_ = nullptr; + } + + private: std::multimap<grpc::string, grpc::string>* recv_trailing_metadata_; Status* recv_status_; grpc_metadata_array recv_trailing_metadata_arr_; grpc_status_code status_code_; char* status_details_; size_t status_details_capacity_; - // Server send status - bool send_status_available_; - grpc_status_code send_status_code_; - grpc::string send_status_details_; - size_t trailing_metadata_count_; - grpc_metadata* trailing_metadata_; - int cancelled_buf_; - bool* recv_closed_; }; -// SneakyCallOpBuffer does not post completions to the completion queue -class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer { +/// An abstract collection of call ops, used to generate the +/// grpc_call_op structure to pass down to the lower layers, +/// and as it is-a CompletionQueueTag, also massages the final +/// completion into the correct form for consumption in the C++ +/// API. +class CallOpSetInterface : public CompletionQueueTag { + public: + CallOpSetInterface() : max_message_size_(0) {} + /// Fills in grpc_op, starting from ops[*nops] and moving + /// upwards. + virtual void FillOps(grpc_op* ops, size_t* nops) = 0; + + void set_max_message_size(int max_message_size) { + max_message_size_ = max_message_size; + } + + protected: + int max_message_size_; +}; + +/// Primary implementaiton of CallOpSetInterface. +/// Since we cannot use variadic templates, we declare slots up to +/// the maximum count of ops we'll need in a set. We leverage the +/// empty base class optimization to slim this class (especially +/// when there are many unused slots used). To avoid duplicate base classes, +/// the template parmeter for CallNoOp is varied by argument position. +template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>, + class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>, + class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>> +class CallOpSet : public CallOpSetInterface, + public Op1, + public Op2, + public Op3, + public Op4, + public Op5, + public Op6 { + public: + CallOpSet() : return_tag_(this) {} + void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE { + this->Op1::AddOp(ops, nops); + this->Op2::AddOp(ops, nops); + this->Op3::AddOp(ops, nops); + this->Op4::AddOp(ops, nops); + this->Op5::AddOp(ops, nops); + this->Op6::AddOp(ops, nops); + } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + this->Op1::FinishOp(status, max_message_size_); + this->Op2::FinishOp(status, max_message_size_); + this->Op3::FinishOp(status, max_message_size_); + this->Op4::FinishOp(status, max_message_size_); + this->Op5::FinishOp(status, max_message_size_); + this->Op6::FinishOp(status, max_message_size_); + *tag = return_tag_; + return true; + } + + void set_output_tag(void* return_tag) { return_tag_ = return_tag; } + + private: + void* return_tag_; +}; + +/// A CallOpSet that does not post completions to the completion queue. +/// +/// Allows hiding some completions that the C core must generate from +/// C++ users. +template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>, + class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>, + class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>> +class SneakyCallOpSet GRPC_FINAL + : public CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> { public: bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { - return CallOpBuffer::FinalizeResult(tag, status) && false; + typedef CallOpSet<Op1, Op2, Op3, Op4, Op5, Op6> Base; + return Base::FinalizeResult(tag, status) && false; } }; @@ -135,7 +439,7 @@ class SneakyCallOpBuffer GRPC_FINAL : public CallOpBuffer { class CallHook { public: virtual ~CallHook() {} - virtual void PerformOpsOnCall(CallOpBuffer* ops, Call* call) = 0; + virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0; }; // Straightforward wrapping of the C call object @@ -146,7 +450,7 @@ class Call GRPC_FINAL { Call(grpc_call* call, CallHook* call_hook_, CompletionQueue* cq, int max_message_size); - void PerformOps(CallOpBuffer* buffer); + void PerformOps(CallOpSetInterface* ops); grpc_call* call() { return call_; } CompletionQueue* cq() { return cq_; } diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h index 2f234fd3ac..b77ce7d02c 100644 --- a/include/grpc++/impl/client_unary_call.h +++ b/include/grpc++/impl/client_unary_call.h @@ -37,6 +37,8 @@ #include <grpc++/config.h> #include <grpc++/status.h> +#include <grpc++/impl/call.h> + namespace grpc { class ChannelInterface; @@ -45,10 +47,28 @@ class CompletionQueue; class RpcMethod; // Wrapper that performs a blocking unary call +template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, - const grpc::protobuf::Message& request, - grpc::protobuf::Message* result); + ClientContext* context, const InputMessage& request, + OutputMessage* result) { + CompletionQueue cq; + Call call(channel->CreateCall(method, context, &cq)); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, + CallOpClientSendClose, CallOpClientRecvStatus> ops; + Status status = ops.SendMessage(request); + if (!status.ok()) { + return status; + } + ops.SendInitialMetadata(context->send_initial_metadata_); + ops.RecvInitialMetadata(context); + ops.RecvMessage(result); + ops.ClientSendClose(); + ops.ClientRecvStatus(context, &status); + call.PerformOps(&ops); + GPR_ASSERT((cq.Pluck(&ops) && ops.got_message) || !status.ok()); + return status; +} } // namespace grpc diff --git a/include/grpc++/impl/proto_utils.h b/include/grpc++/impl/proto_utils.h new file mode 100644 index 0000000000..ebefa3e1be --- /dev/null +++ b/include/grpc++/impl/proto_utils.h @@ -0,0 +1,76 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H +#define GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H + +#include <type_traits> + +#include <grpc/grpc.h> +#include <grpc++/impl/serialization_traits.h> +#include <grpc++/config_protobuf.h> +#include <grpc++/status.h> + +namespace grpc { + +// Serialize the msg into a buffer created inside the function. The caller +// should destroy the returned buffer when done with it. If serialization fails, +// false is returned and buffer is left unchanged. +Status SerializeProto(const grpc::protobuf::Message& msg, + grpc_byte_buffer** buffer); + +// The caller keeps ownership of buffer and msg. +Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, + int max_message_size); + +template <class T> +class SerializationTraits<T, typename std::enable_if<std::is_base_of< + grpc::protobuf::Message, T>::value>::type> { + public: + static Status Serialize(const grpc::protobuf::Message& msg, + grpc_byte_buffer** buffer, bool* own_buffer) { + *own_buffer = true; + return SerializeProto(msg, buffer); + } + static Status Deserialize(grpc_byte_buffer* buffer, + grpc::protobuf::Message* msg, + int max_message_size) { + auto status = DeserializeProto(buffer, msg, max_message_size); + grpc_byte_buffer_destroy(buffer); + return status; + } +}; + +} // namespace grpc + +#endif // GRPC_INTERNAL_CPP_PROTO_PROTO_UTILS_H diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h index 50204d2099..3cfbef7806 100644 --- a/include/grpc++/impl/rpc_service_method.h +++ b/include/grpc++/impl/rpc_service_method.h @@ -55,16 +55,19 @@ class MethodHandler { public: virtual ~MethodHandler() {} struct HandlerParameter { - HandlerParameter(Call* c, ServerContext* context, - const grpc::protobuf::Message* req, - grpc::protobuf::Message* resp) - : call(c), server_context(context), request(req), response(resp) {} + HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req, + int max_size) + : call(c), + server_context(context), + request(req), + max_message_size(max_size) {} Call* call; ServerContext* server_context; - const grpc::protobuf::Message* request; - grpc::protobuf::Message* response; + // Handler required to grpc_byte_buffer_destroy this + grpc_byte_buffer* request; + int max_message_size; }; - virtual Status RunHandler(const HandlerParameter& param) = 0; + virtual void RunHandler(const HandlerParameter& param) = 0; }; // A wrapper class of an application provided rpc method handler. @@ -77,11 +80,25 @@ class RpcMethodHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { - // Invoke application function, cast proto messages to their actual types. - return func_(service_, param.server_context, - dynamic_cast<const RequestType*>(param.request), - dynamic_cast<ResponseType*>(param.response)); + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits<RequestType>::Deserialize( + param.request, &req, param.max_message_size); + ResponseType rsp; + if (status.ok()) { + status = func_(service_, param.server_context, &req, &rsp); + } + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.ok()) { + status = ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -102,10 +119,21 @@ class ClientStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { + void RunHandler(const HandlerParameter& param) GRPC_FINAL { ServerReader<RequestType> reader(param.call, param.server_context); - return func_(service_, param.server_context, &reader, - dynamic_cast<ResponseType*>(param.response)); + ResponseType rsp; + Status status = func_(service_, param.server_context, &reader, &rsp); + + GPR_ASSERT(!param.server_context->sent_initial_metadata_); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> ops; + ops.SendInitialMetadata(param.server_context->initial_metadata_); + if (status.ok()) { + status = ops.SendMessage(rsp); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -124,10 +152,23 @@ class ServerStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { - ServerWriter<ResponseType> writer(param.call, param.server_context); - return func_(service_, param.server_context, - dynamic_cast<const RequestType*>(param.request), &writer); + void RunHandler(const HandlerParameter& param) GRPC_FINAL { + RequestType req; + Status status = SerializationTraits<RequestType>::Deserialize( + param.request, &req, param.max_message_size); + + if (status.ok()) { + ServerWriter<ResponseType> writer(param.call, param.server_context); + status = func_(service_, param.server_context, &req, &writer); + } + + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -147,10 +188,18 @@ class BidiStreamingHandler : public MethodHandler { ServiceType* service) : func_(func), service_(service) {} - Status RunHandler(const HandlerParameter& param) GRPC_FINAL { + void RunHandler(const HandlerParameter& param) GRPC_FINAL { ServerReaderWriter<ResponseType, RequestType> stream(param.call, param.server_context); - return func_(service_, param.server_context, &stream); + Status status = func_(service_, param.server_context, &stream); + + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops; + if (!param.server_context->sent_initial_metadata_) { + ops.SendInitialMetadata(param.server_context->initial_metadata_); + } + ops.ServerSendStatus(param.server_context->trailing_metadata_, status); + param.call->PerformOps(&ops); + param.call->cq()->Pluck(&ops); } private: @@ -162,29 +211,15 @@ class BidiStreamingHandler : public MethodHandler { // Server side rpc method class class RpcServiceMethod : public RpcMethod { public: - // Takes ownership of the handler and two prototype objects. + // Takes ownership of the handler RpcServiceMethod(const char* name, RpcMethod::RpcType type, - MethodHandler* handler, - grpc::protobuf::Message* request_prototype, - grpc::protobuf::Message* response_prototype) - : RpcMethod(name, type, nullptr), - handler_(handler), - request_prototype_(request_prototype), - response_prototype_(response_prototype) {} + MethodHandler* handler) + : RpcMethod(name, type, nullptr), handler_(handler) {} MethodHandler* handler() { return handler_.get(); } - grpc::protobuf::Message* AllocateRequestProto() { - return request_prototype_->New(); - } - grpc::protobuf::Message* AllocateResponseProto() { - return response_prototype_->New(); - } - private: std::unique_ptr<MethodHandler> handler_; - std::unique_ptr<grpc::protobuf::Message> request_prototype_; - std::unique_ptr<grpc::protobuf::Message> response_prototype_; }; // This class contains all the method information for an rpc service. It is diff --git a/include/grpc++/impl/serialization_traits.h b/include/grpc++/impl/serialization_traits.h new file mode 100644 index 0000000000..1f5c674e4c --- /dev/null +++ b/include/grpc++/impl/serialization_traits.h @@ -0,0 +1,68 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_IMPL_SERIALIZATION_TRAITS_H +#define GRPCXX_IMPL_SERIALIZATION_TRAITS_H + +namespace grpc { + +/// Defines how to serialize and deserialize some type. +/// +/// Used for hooking different message serialization API's into GRPC. +/// Each SerializationTraits implementation must provide the following +/// functions: +/// static Status Serialize(const Message& msg, +/// grpc_byte_buffer** buffer, +// bool* own_buffer); +/// static Status Deserialize(grpc_byte_buffer* buffer, +/// Message* msg, +/// int max_message_size); +/// +/// Serialize is required to convert message to a grpc_byte_buffer, and +/// to store a pointer to that byte buffer at *buffer. *own_buffer should +/// be set to true if the caller owns said byte buffer, or false if +/// ownership is retained elsewhere. +/// +/// Deserialize is required to convert buffer into the message stored at +/// msg. max_message_size is passed in as a bound on the maximum number of +/// message bytes Deserialize should accept. +/// +/// Both functions return a Status, allowing them to explain what went +/// wrong if required. +template <class Message, + class UnusedButHereForPartialTemplateSpecialization = void> +class SerializationTraits; + +} // namespace grpc + +#endif // GRPCXX_IMPL_SERIALIZATION_TRAITS_H diff --git a/include/grpc++/impl/service_type.h b/include/grpc++/impl/service_type.h index 25e437edad..c33a278f5b 100644 --- a/include/grpc++/impl/service_type.h +++ b/include/grpc++/impl/service_type.h @@ -35,6 +35,8 @@ #define GRPCXX_IMPL_SERVICE_TYPE_H #include <grpc++/config.h> +#include <grpc++/impl/serialization_traits.h> +#include <grpc++/server.h> #include <grpc++/status.h> namespace grpc { @@ -65,20 +67,8 @@ class ServerAsyncStreamingInterface { class AsynchronousService { public: - // this is Server, but in disguise to avoid a link dependency - class DispatchImpl { - public: - virtual void RequestAsyncCall(void* registered_method, - ServerContext* context, - ::grpc::protobuf::Message* request, - ServerAsyncStreamingInterface* stream, - CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) = 0; - }; - AsynchronousService(const char** method_names, size_t method_count) - : dispatch_impl_(nullptr), + : server_(nullptr), method_names_(method_names), method_count_(method_count), request_args_(nullptr) {} @@ -86,42 +76,43 @@ class AsynchronousService { ~AsynchronousService() { delete[] request_args_; } protected: - void RequestAsyncUnary(int index, ServerContext* context, - grpc::protobuf::Message* request, + template <class Message> + void RequestAsyncUnary(int index, ServerContext* context, Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + notification_cq, tag, request); } void RequestClientStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + notification_cq, tag); } + template <class Message> void RequestServerStreaming(int index, ServerContext* context, - grpc::protobuf::Message* request, + Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, request, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + notification_cq, tag, request); } void RequestBidiStreaming(int index, ServerContext* context, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, void* tag) { - dispatch_impl_->RequestAsyncCall(request_args_[index], context, nullptr, - stream, call_cq, notification_cq, tag); + server_->RequestAsyncCall(request_args_[index], context, stream, call_cq, + notification_cq, tag); } private: friend class Server; - DispatchImpl* dispatch_impl_; + Server* server_; const char** const method_names_; size_t method_count_; void** request_args_; diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 2cfeb359fc..6a9e757e77 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -41,25 +41,24 @@ #include <grpc++/config.h> #include <grpc++/impl/call.h> #include <grpc++/impl/grpc_library.h> -#include <grpc++/impl/service_type.h> #include <grpc++/impl/sync.h> #include <grpc++/status.h> struct grpc_server; namespace grpc { + class AsynchronousService; class GenericServerContext; class AsyncGenericService; class RpcService; class RpcServiceMethod; +class ServerAsyncStreamingInterface; class ServerCredentials; class ThreadPoolInterface; // Currently it only supports handling rpcs in a single thread. -class Server GRPC_FINAL : public GrpcLibrary, - private CallHook, - private AsynchronousService::DispatchImpl { +class Server GRPC_FINAL : public GrpcLibrary, private CallHook { public: ~Server(); @@ -73,6 +72,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private: friend class AsyncGenericService; + friend class AsynchronousService; friend class ServerBuilder; class SyncRequest; @@ -96,21 +96,123 @@ class Server GRPC_FINAL : public GrpcLibrary, void RunRpc(); void ScheduleCallback(); - void PerformOpsOnCall(CallOpBuffer* ops, Call* call) GRPC_OVERRIDE; + void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE; + + class BaseAsyncRequest : public CompletionQueueTag { + public: + BaseAsyncRequest(Server* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, void* tag); + virtual ~BaseAsyncRequest(); + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + protected: + Server* const server_; + ServerContext* const context_; + ServerAsyncStreamingInterface* const stream_; + CompletionQueue* const call_cq_; + void* const tag_; + grpc_call* call_; + grpc_metadata_array initial_metadata_array_; + }; + + class RegisteredAsyncRequest : public BaseAsyncRequest { + public: + RegisteredAsyncRequest(Server* server, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, void* tag); + + // uses BaseAsyncRequest::FinalizeResult + + protected: + void IssueRequest(void* registered_method, grpc_byte_buffer** payload, + ServerCompletionQueue* notification_cq); + }; + + class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest { + public: + NoPayloadAsyncRequest(void* registered_method, Server* server, + ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag) + : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { + IssueRequest(registered_method, nullptr, notification_cq); + } + + // uses RegisteredAsyncRequest::FinalizeResult + }; + + template <class Message> + class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest { + public: + PayloadAsyncRequest(void* registered_method, Server* server, + ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + Message* request) + : RegisteredAsyncRequest(server, context, stream, call_cq, tag), + request_(request) { + IssueRequest(registered_method, &payload_, notification_cq); + } + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { + bool serialization_status = + *status && payload_ && + SerializationTraits<Message>::Deserialize(payload_, request_, + server_->max_message_size_) + .ok(); + bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status); + *status = serialization_status && *status; + return ret; + } + + private: + grpc_byte_buffer* payload_; + Message* const request_; + }; + + class GenericAsyncRequest GRPC_FINAL : public BaseAsyncRequest { + public: + GenericAsyncRequest(Server* server, GenericServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag); + + bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE; + + private: + grpc_call_details call_details_; + }; + + template <class Message> + void RequestAsyncCall(void* registered_method, ServerContext* context, + ServerAsyncStreamingInterface* stream, + CompletionQueue* call_cq, + ServerCompletionQueue* notification_cq, void* tag, + Message* message) { + new PayloadAsyncRequest<Message>(registered_method, this, context, stream, + call_cq, notification_cq, tag, message); + } - // DispatchImpl void RequestAsyncCall(void* registered_method, ServerContext* context, - grpc::protobuf::Message* request, ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, - ServerCompletionQueue* notification_cq, - void* tag) GRPC_OVERRIDE; + ServerCompletionQueue* notification_cq, void* tag) { + new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq, + notification_cq, tag); + } void RequestAsyncGenericCall(GenericServerContext* context, ServerAsyncStreamingInterface* stream, - CompletionQueue* cq, + CompletionQueue* call_cq, ServerCompletionQueue* notification_cq, - void* tag); + void* tag) { + new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, + tag); + } const int max_message_size_; @@ -133,8 +235,6 @@ class Server GRPC_FINAL : public GrpcLibrary, ThreadPoolInterface* thread_pool_; // Whether the thread pool is created and owned by the server. bool thread_pool_owned_; - private: - Server() : max_message_size_(-1), server_(NULL) { abort(); } }; } // namespace grpc diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 5a6af299e3..3bf21e02bf 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -62,6 +62,14 @@ template <class W> class ServerWriter; template <class R, class W> class ServerReaderWriter; +template <class ServiceType, class RequestType, class ResponseType> +class RpcMethodHandler; +template <class ServiceType, class RequestType, class ResponseType> +class ClientStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class ServerStreamingHandler; +template <class ServiceType, class RequestType, class ResponseType> +class BidiStreamingHandler; class Call; class CallOpBuffer; @@ -109,6 +117,14 @@ class ServerContext { friend class ::grpc::ServerWriter; template <class R, class W> friend class ::grpc::ServerReaderWriter; + template <class ServiceType, class RequestType, class ResponseType> + friend class RpcMethodHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class ClientStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class ServerStreamingHandler; + template <class ServiceType, class RequestType, class ResponseType> + friend class BidiStreamingHandler; // Prevent copying. ServerContext(const ServerContext&); diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h index 472911e62b..dd5e52d6d3 100644 --- a/include/grpc++/stream.h +++ b/include/grpc++/stream.h @@ -93,15 +93,18 @@ template <class R> class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { public: // Blocking create a stream and write the first request out. + template <class W> ClientReader(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, const grpc::protobuf::Message& request) + ClientContext* context, const W& request) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&context->send_initial_metadata_); - buf.AddSendMessage(request); - buf.AddClientSendClose(); - call_.PerformOps(&buf); - cq_.Pluck(&buf); + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpClientSendClose> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + // TODO(ctiller): don't assert + GPR_ASSERT(ops.SendMessage(request).ok()); + ops.ClientSendClose(); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } // Blocking wait for initial metadata from server. The received metadata @@ -111,28 +114,28 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> { void WaitForInitialMetadata() { GPR_ASSERT(!context_->initial_metadata_received_); - CallOpBuffer buf; - buf.AddRecvInitialMetadata(context_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); // status ignored + CallOpSet<CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; if (!context_->initial_metadata_received_) { - buf.AddRecvInitialMetadata(context_); + ops.RecvInitialMetadata(context_); } - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf) && buf.got_message; + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; } Status Finish() GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpClientRecvStatus> ops; Status status; - buf.AddClientRecvStatus(context_, &status); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } @@ -150,48 +153,49 @@ class ClientWriterInterface : public ClientStreamingInterface, }; template <class W> -class ClientWriter GRPC_FINAL : public ClientWriterInterface<W> { +class ClientWriter : public ClientWriterInterface<W> { public: // Blocking create a stream. + template <class R> ClientWriter(ChannelInterface* channel, const RpcMethod& method, - ClientContext* context, grpc::protobuf::Message* response) - : context_(context), - response_(response), - call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); + ClientContext* context, R* response) + : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + finish_ops_.RecvMessage(response); + + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet<CallOpSendMessage> ops; + if (!ops.SendMessage(msg).ok()) { + return false; + } + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet<CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } // Read the final response and wait for the final status. Status Finish() GRPC_OVERRIDE { - CallOpBuffer buf; Status status; - buf.AddRecvMessage(response_); - buf.AddClientRecvStatus(context_, &status); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + finish_ops_.ClientRecvStatus(context_, &status); + call_.PerformOps(&finish_ops_); + GPR_ASSERT(cq_.Pluck(&finish_ops_)); return status; } private: ClientContext* context_; - grpc::protobuf::Message* const response_; + CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_; CompletionQueue cq_; Call call_; }; @@ -213,10 +217,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) : context_(context), call_(channel->CreateCall(method, context, &cq_)) { - CallOpBuffer buf; - buf.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); } // Blocking wait for initial metadata from server. The received metadata @@ -226,42 +230,42 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> { void WaitForInitialMetadata() { GPR_ASSERT(!context_->initial_metadata_received_); - CallOpBuffer buf; - buf.AddRecvInitialMetadata(context_); - call_.PerformOps(&buf); - cq_.Pluck(&buf); // status ignored + CallOpSet<CallOpRecvInitialMetadata> ops; + ops.RecvInitialMetadata(context_); + call_.PerformOps(&ops); + cq_.Pluck(&ops); // status ignored } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; if (!context_->initial_metadata_received_) { - buf.AddRecvInitialMetadata(context_); + ops.RecvInitialMetadata(context_); } - buf.AddRecvMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf) && buf.got_message; + ops.RecvMessage(msg); + call_.PerformOps(&ops); + return cq_.Pluck(&ops) && ops.got_message; } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddSendMessage(msg); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet<CallOpSendMessage> ops; + if (!ops.SendMessage(msg).ok()) return false; + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } bool WritesDone() GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddClientSendClose(); - call_.PerformOps(&buf); - return cq_.Pluck(&buf); + CallOpSet<CallOpClientSendClose> ops; + ops.ClientSendClose(); + call_.PerformOps(&ops); + return cq_.Pluck(&ops); } Status Finish() GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpClientRecvStatus> ops; Status status; - buf.AddClientRecvStatus(context_, &status); - call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + ops.ClientRecvStatus(context_, &status); + call_.PerformOps(&ops); + GPR_ASSERT(cq_.Pluck(&ops)); return status; } @@ -279,18 +283,18 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> { void SendInitialMetadata() { GPR_ASSERT(!ctx_->sent_initial_metadata_); - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf) && buf.got_message; + CallOpSet<CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; } private: @@ -306,22 +310,24 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> { void SendInitialMetadata() { GPR_ASSERT(!ctx_->sent_initial_metadata_); - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; + if (!ops.SendMessage(msg).ok()) { + return false; + } if (!ctx_->sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - buf.AddSendMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); } private: @@ -339,29 +345,31 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>, void SendInitialMetadata() { GPR_ASSERT(!ctx_->sent_initial_metadata_); - CallOpBuffer buf; - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + CallOpSet<CallOpSendInitialMetadata> ops; + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_->PerformOps(&buf); - call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + call_->cq()->Pluck(&ops); } bool Read(R* msg) GRPC_OVERRIDE { - CallOpBuffer buf; - buf.AddRecvMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf) && buf.got_message; + CallOpSet<CallOpRecvMessage<R>> ops; + ops.RecvMessage(msg); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops) && ops.got_message; } bool Write(const W& msg) GRPC_OVERRIDE { - CallOpBuffer buf; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops; + if (!ops.SendMessage(msg).ok()) { + return false; + } if (!ctx_->sent_initial_metadata_) { - buf.AddSendInitialMetadata(&ctx_->initial_metadata_); + ops.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - buf.AddSendMessage(msg); - call_->PerformOps(&buf); - return call_->cq()->Pluck(&buf); + call_->PerformOps(&ops); + return call_->cq()->Pluck(&ops); } private: @@ -400,57 +408,59 @@ class AsyncWriterInterface { template <class R> class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface, - public AsyncReaderInterface<R> { -}; + public AsyncReaderInterface<R> {}; template <class R> class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> { public: // Create a stream and write the first request out. + template <class W> ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - const grpc::protobuf::Message& request, void* tag) + const W& request, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.Reset(tag); - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - init_buf_.AddSendMessage(request); - init_buf_.AddClientSendClose(); - call_.PerformOps(&init_buf_); + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + // TODO(ctiller): don't assert + GPR_ASSERT(init_ops_.SendMessage(request).ok()); + init_ops_.ClientSendClose(); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); + read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - read_buf_.AddRecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_buf_.Reset(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - finish_buf_.AddClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - CallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> + init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; template <class W> @@ -463,56 +473,57 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface, template <class W> class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> { public: + template <class R> ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq, const RpcMethod& method, ClientContext* context, - grpc::protobuf::Message* response, void* tag) - : context_(context), - response_(response), - call_(channel->CreateCall(method, context, cq)) { - init_buf_.Reset(tag); - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&init_buf_); + R* response, void* tag) + : context_(context), call_(channel->CreateCall(method, context, cq)) { + finish_ops_.RecvMessage(response); + + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.set_output_tag(tag); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_buf_.Reset(tag); - writes_done_buf_.AddClientSendClose(); - call_.PerformOps(&writes_done_buf_); + writes_done_ops_.set_output_tag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_buf_.Reset(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - finish_buf_.AddRecvMessage(response_); - finish_buf_.AddClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; - grpc::protobuf::Message* const response_; Call call_; - CallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer write_buf_; - CallOpBuffer writes_done_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage, + CallOpClientRecvStatus> finish_ops_; }; // Client-side interface for bi-directional streaming. @@ -532,58 +543,59 @@ class ClientAsyncReaderWriter GRPC_FINAL const RpcMethod& method, ClientContext* context, void* tag) : context_(context), call_(channel->CreateCall(method, context, cq)) { - init_buf_.Reset(tag); - init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_); - call_.PerformOps(&init_buf_); + init_ops_.set_output_tag(tag); + init_ops_.SendInitialMetadata(context->send_initial_metadata_); + call_.PerformOps(&init_ops_); } void ReadInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!context_->initial_metadata_received_); - meta_buf_.Reset(tag); - meta_buf_.AddRecvInitialMetadata(context_); - call_.PerformOps(&meta_buf_); + meta_ops_.set_output_tag(tag); + meta_ops_.RecvInitialMetadata(context_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); + read_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - read_buf_.AddRecvInitialMetadata(context_); + read_ops_.RecvInitialMetadata(context_); } - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + write_ops_.set_output_tag(tag); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void WritesDone(void* tag) GRPC_OVERRIDE { - writes_done_buf_.Reset(tag); - writes_done_buf_.AddClientSendClose(); - call_.PerformOps(&writes_done_buf_); + writes_done_ops_.set_output_tag(tag); + writes_done_ops_.ClientSendClose(); + call_.PerformOps(&writes_done_ops_); } void Finish(Status* status, void* tag) GRPC_OVERRIDE { - finish_buf_.Reset(tag); + finish_ops_.set_output_tag(tag); if (!context_->initial_metadata_received_) { - finish_buf_.AddRecvInitialMetadata(context_); + finish_ops_.RecvInitialMetadata(context_); } - finish_buf_.AddClientRecvStatus(context_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ClientRecvStatus(context_, status); + call_.PerformOps(&finish_ops_); } private: ClientContext* context_; Call call_; - CallOpBuffer init_buf_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer write_buf_; - CallOpBuffer writes_done_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> init_ops_; + CallOpSet<CallOpRecvInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendMessage> write_ops_; + CallOpSet<CallOpClientSendClose> writes_done_ops_; + CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_; }; template <class W, class R> @@ -596,41 +608,44 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Finish(const W& msg, const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } // The response is dropped if the status is not OK. if (status.ok()) { - finish_buf_.AddSendMessage(msg); + finish_ops_.ServerSendStatus( + ctx_->trailing_metadata_, + finish_ops_.SendMessage(msg)); + } else { + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + call_.PerformOps(&finish_ops_); } void FinishWithError(const Status& status, void* tag) { GPR_ASSERT(!status.ok()); - finish_buf_.Reset(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -638,9 +653,10 @@ class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, + CallOpServerSendStatus> finish_ops_; }; template <class W> @@ -653,30 +669,31 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&meta_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); + write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -684,9 +701,9 @@ class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer write_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; // Server-side interface for bi-directional streaming. @@ -701,36 +718,37 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, void SendInitialMetadata(void* tag) GRPC_OVERRIDE { GPR_ASSERT(!ctx_->sent_initial_metadata_); - meta_buf_.Reset(tag); - meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + meta_ops_.set_output_tag(tag); + meta_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; - call_.PerformOps(&meta_buf_); + call_.PerformOps(&meta_ops_); } void Read(R* msg, void* tag) GRPC_OVERRIDE { - read_buf_.Reset(tag); - read_buf_.AddRecvMessage(msg); - call_.PerformOps(&read_buf_); + read_ops_.set_output_tag(tag); + read_ops_.RecvMessage(msg); + call_.PerformOps(&read_ops_); } void Write(const W& msg, void* tag) GRPC_OVERRIDE { - write_buf_.Reset(tag); + write_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + write_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - write_buf_.AddSendMessage(msg); - call_.PerformOps(&write_buf_); + // TODO(ctiller): don't assert + GPR_ASSERT(write_ops_.SendMessage(msg).ok()); + call_.PerformOps(&write_ops_); } void Finish(const Status& status, void* tag) { - finish_buf_.Reset(tag); + finish_ops_.set_output_tag(tag); if (!ctx_->sent_initial_metadata_) { - finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_); + finish_ops_.SendInitialMetadata(ctx_->initial_metadata_); ctx_->sent_initial_metadata_ = true; } - finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status); - call_.PerformOps(&finish_buf_); + finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status); + call_.PerformOps(&finish_ops_); } private: @@ -738,10 +756,10 @@ class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface, Call call_; ServerContext* ctx_; - CallOpBuffer meta_buf_; - CallOpBuffer read_buf_; - CallOpBuffer write_buf_; - CallOpBuffer finish_buf_; + CallOpSet<CallOpSendInitialMetadata> meta_ops_; + CallOpSet<CallOpRecvMessage<R>> read_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_; + CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_; }; } // namespace grpc |