diff options
author | 2017-05-11 12:27:45 -0700 | |
---|---|---|
committer | 2017-05-11 12:27:45 -0700 | |
commit | 54773cfed86f011b791984f9e7a1084807d9f007 (patch) | |
tree | b3293bc8a90411527fc6561a8bd9e5ad2394941a /include/grpc++ | |
parent | b79e809918aeea3f5a64aea6b870bec57128bd6f (diff) | |
parent | 45b89fb11ca3cd524787aeba7a1270f744a1256c (diff) |
Merge github.com:grpc/grpc into serve_fries
Diffstat (limited to 'include/grpc++')
-rw-r--r-- | include/grpc++/impl/codegen/call.h | 37 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/config_protobuf.h | 2 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen.h | 6 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/core_codegen_interface.h | 7 | ||||
-rw-r--r-- | include/grpc++/impl/codegen/proto_utils.h | 111 |
5 files changed, 88 insertions, 75 deletions
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h index f334ba61d6..9fe2bbb75e 100644 --- a/include/grpc++/impl/codegen/call.h +++ b/include/grpc++/impl/codegen/call.h @@ -364,28 +364,6 @@ class CallOpRecvMessage { bool allow_not_getting_message_; }; -namespace CallOpGenericRecvMessageHelper { -class DeserializeFunc { - public: - virtual Status Deserialize(grpc_byte_buffer* buf) = 0; - virtual ~DeserializeFunc() {} -}; - -template <class R> -class DeserializeFuncType final : public DeserializeFunc { - public: - DeserializeFuncType(R* message) : message_(message) {} - Status Deserialize(grpc_byte_buffer* buf) override { - return SerializationTraits<R>::Deserialize(buf, message_); - } - - ~DeserializeFuncType() override {} - - private: - R* message_; // Not a managed pointer because management is external to this -}; -} // namespace CallOpGenericRecvMessageHelper - class CallOpGenericRecvMessage { public: CallOpGenericRecvMessage() @@ -393,11 +371,9 @@ class CallOpGenericRecvMessage { template <class R> void RecvMessage(R* message) { - // Use an explicit base class pointer to avoid resolution error in the - // following unique_ptr::reset for some old implementations. - CallOpGenericRecvMessageHelper::DeserializeFunc* func = - new CallOpGenericRecvMessageHelper::DeserializeFuncType<R>(message); - deserialize_.reset(func); + deserialize_ = [message](grpc_byte_buffer* buf) -> Status { + return SerializationTraits<R>::Deserialize(buf, message); + }; } // Do not change status if no message is received. @@ -420,7 +396,7 @@ class CallOpGenericRecvMessage { if (recv_buf_) { if (*status) { got_message = true; - *status = deserialize_->Deserialize(recv_buf_).ok(); + *status = deserialize_(recv_buf_).ok(); } else { got_message = false; g_core_codegen_interface->grpc_byte_buffer_destroy(recv_buf_); @@ -431,11 +407,12 @@ class CallOpGenericRecvMessage { *status = false; } } - deserialize_.reset(); + deserialize_ = DeserializeFunc(); } private: - std::unique_ptr<CallOpGenericRecvMessageHelper::DeserializeFunc> deserialize_; + typedef std::function<Status(grpc_byte_buffer*)> DeserializeFunc; + DeserializeFunc deserialize_; grpc_byte_buffer* recv_buf_; bool allow_not_getting_message_; }; diff --git a/include/grpc++/impl/codegen/config_protobuf.h b/include/grpc++/impl/codegen/config_protobuf.h index 8620d4b218..e66189f8d1 100644 --- a/include/grpc++/impl/codegen/config_protobuf.h +++ b/include/grpc++/impl/codegen/config_protobuf.h @@ -34,6 +34,8 @@ #ifndef GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H #define GRPCXX_IMPL_CODEGEN_CONFIG_PROTOBUF_H +#define GRPC_OPEN_SOURCE_PROTO + #ifndef GRPC_CUSTOM_PROTOBUF_INT64 #include <google/protobuf/stubs/common.h> #define GRPC_CUSTOM_PROTOBUF_INT64 ::google::protobuf::int64 diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index a1593729eb..7a1236a716 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -90,11 +90,15 @@ class CoreCodegen final : public CoreCodegenInterface { grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, size_t nslices) override; - + grpc_slice grpc_slice_new_with_user_data(void* p, size_t len, + void (*destroy)(void*), + void* user_data) override; grpc_slice grpc_empty_slice() override; grpc_slice grpc_slice_malloc(size_t length) override; void grpc_slice_unref(grpc_slice slice) override; + grpc_slice grpc_slice_ref(grpc_slice slice) override; grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) override; + grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) override; void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) override; void grpc_slice_buffer_pop(grpc_slice_buffer* sb) override; grpc_slice grpc_slice_from_static_buffer(const void* buffer, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index 7cc3a82476..4ac37f5255 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -101,15 +101,18 @@ class CoreCodegenInterface { virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice, size_t nslices) = 0; - + virtual grpc_slice grpc_slice_new_with_user_data(void* p, size_t len, + void (*destroy)(void*), + void* user_data) = 0; virtual void grpc_call_ref(grpc_call* call) = 0; virtual void grpc_call_unref(grpc_call* call) = 0; virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) = 0; - virtual grpc_slice grpc_empty_slice() = 0; virtual grpc_slice grpc_slice_malloc(size_t length) = 0; virtual void grpc_slice_unref(grpc_slice slice) = 0; + virtual grpc_slice grpc_slice_ref(grpc_slice slice) = 0; virtual grpc_slice grpc_slice_split_tail(grpc_slice* s, size_t split) = 0; + virtual grpc_slice grpc_slice_split_head(grpc_slice* s, size_t split) = 0; virtual void grpc_slice_buffer_add(grpc_slice_buffer* sb, grpc_slice slice) = 0; virtual void grpc_slice_buffer_pop(grpc_slice_buffer* sb) = 0; diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index 8c0c32bfc5..5cdb2a7a15 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -54,8 +54,7 @@ class GrpcBufferWriterPeer; const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024; -class GrpcBufferWriter final - : public ::grpc::protobuf::io::ZeroCopyOutputStream { +class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) : block_size_(block_size), byte_count_(0), have_backup_(false) { @@ -103,6 +102,8 @@ class GrpcBufferWriter final grpc::protobuf::int64 ByteCount() const override { return byte_count_; } + grpc_slice_buffer* SliceBuffer() { return slice_buffer_; } + private: friend class GrpcBufferWriterPeer; const int block_size_; @@ -113,8 +114,7 @@ class GrpcBufferWriter final grpc_slice slice_; }; -class GrpcBufferReader final - : public ::grpc::protobuf::io::ZeroCopyInputStream { +class GrpcBufferReader : public ::grpc::protobuf::io::ZeroCopyInputStream { public: explicit GrpcBufferReader(grpc_byte_buffer* buffer) : byte_count_(0), backup_count_(0), status_() { @@ -175,64 +175,91 @@ class GrpcBufferReader final return byte_count_ - backup_count_; } - private: + protected: int64_t byte_count_; int64_t backup_count_; grpc_byte_buffer_reader reader_; grpc_slice slice_; Status status_; }; + +template <class BufferWriter, class T> +Status GenericSerialize(const grpc::protobuf::Message& msg, + grpc_byte_buffer** bp, bool* own_buffer) { + static_assert( + std::is_base_of<protobuf::io::ZeroCopyOutputStream, BufferWriter>::value, + "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); + *own_buffer = true; + int byte_size = msg.ByteSize(); + if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) { + grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); + GPR_CODEGEN_ASSERT( + GRPC_SLICE_END_PTR(slice) == + msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); + *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); + g_core_codegen_interface->grpc_slice_unref(slice); + return g_core_codegen_interface->ok(); + } else { + BufferWriter writer(bp, internal::kGrpcBufferWriterMaxBufferLength); + return msg.SerializeToZeroCopyStream(&writer) + ? g_core_codegen_interface->ok() + : Status(StatusCode::INTERNAL, "Failed to serialize message"); + } +} + +template <class BufferReader, class T> +Status GenericDeserialize(grpc_byte_buffer* buffer, + grpc::protobuf::Message* msg) { + static_assert( + std::is_base_of<protobuf::io::ZeroCopyInputStream, BufferReader>::value, + "BufferReader must be a subclass of io::ZeroCopyInputStream"); + if (buffer == nullptr) { + return Status(StatusCode::INTERNAL, "No payload"); + } + Status result = g_core_codegen_interface->ok(); + { + BufferReader reader(buffer); + if (!reader.status().ok()) { + return reader.status(); + } + ::grpc::protobuf::io::CodedInputStream decoder(&reader); + decoder.SetTotalBytesLimit(INT_MAX, INT_MAX); + if (!msg->ParseFromCodedStream(&decoder)) { + result = Status(StatusCode::INTERNAL, msg->InitializationErrorString()); + } + if (!decoder.ConsumedEntireMessage()) { + result = Status(StatusCode::INTERNAL, "Did not read entire message"); + } + } + g_core_codegen_interface->grpc_byte_buffer_destroy(buffer); + return result; +} + } // namespace internal +// this is needed so the following class does not conflict with protobuf +// serializers that utilize internal-only tools. +#ifdef GRPC_OPEN_SOURCE_PROTO +// This class provides a protobuf serializer. It translates between protobuf +// objects and grpc_byte_buffers. More information about SerializationTraits can +// be found in include/grpc++/impl/codegen/serialization_traits.h. template <class T> class SerializationTraits<T, typename std::enable_if<std::is_base_of< grpc::protobuf::Message, T>::value>::type> { public: static Status Serialize(const grpc::protobuf::Message& msg, grpc_byte_buffer** bp, bool* own_buffer) { - *own_buffer = true; - int byte_size = msg.ByteSize(); - if (byte_size <= internal::kGrpcBufferWriterMaxBufferLength) { - grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); - GPR_CODEGEN_ASSERT( - GRPC_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); - *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); - g_core_codegen_interface->grpc_slice_unref(slice); - return g_core_codegen_interface->ok(); - } else { - internal::GrpcBufferWriter writer( - bp, internal::kGrpcBufferWriterMaxBufferLength); - return msg.SerializeToZeroCopyStream(&writer) - ? g_core_codegen_interface->ok() - : Status(StatusCode::INTERNAL, "Failed to serialize message"); - } + return internal::GenericSerialize<internal::GrpcBufferWriter, T>( + msg, bp, own_buffer); } static Status Deserialize(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg) { - if (buffer == nullptr) { - return Status(StatusCode::INTERNAL, "No payload"); - } - Status result = g_core_codegen_interface->ok(); - { - internal::GrpcBufferReader reader(buffer); - if (!reader.status().ok()) { - return reader.status(); - } - ::grpc::protobuf::io::CodedInputStream decoder(&reader); - decoder.SetTotalBytesLimit(INT_MAX, INT_MAX); - if (!msg->ParseFromCodedStream(&decoder)) { - result = Status(StatusCode::INTERNAL, msg->InitializationErrorString()); - } - if (!decoder.ConsumedEntireMessage()) { - result = Status(StatusCode::INTERNAL, "Did not read entire message"); - } - } - g_core_codegen_interface->grpc_byte_buffer_destroy(buffer); - return result; + return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer, + msg); } }; +#endif } // namespace grpc |