diff options
author | Chilledheart <rwindz0@gmail.com> | 2015-03-10 01:20:18 +0800 |
---|---|---|
committer | Chilledheart <rwindz0@gmail.com> | 2015-03-10 01:21:34 +0800 |
commit | bf5ec2fd3d6c72bb4f887fc25c41de2fc5ef6f87 (patch) | |
tree | b901488896a4b852ddce96055a95cac1ee6a5ce1 /src | |
parent | ca767c0d9e314489fd10d7bc3ce406f8e223e70e (diff) |
Avoid unnecessary copies during protobuf serialization and deserialization
- avoid string copy due to google::protobuf::message::SerializeToString
- avoid string copy due to google::protobuf::message::ParseFromString
- split large message into 8k slices during protobuf serialization
- correct GrpcBufferReader::BackUp
Diffstat (limited to 'src')
-rw-r--r-- | src/cpp/proto/proto_utils.cc | 149 |
1 files changed, 123 insertions, 26 deletions
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 72f1bf7441..e4e51bfebf 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -35,38 +35,135 @@ #include <grpc++/config.h> #include <grpc/grpc.h> +#include <grpc/byte_buffer.h> #include <grpc/support/slice.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/port_platform.h> +#include <google/protobuf/io/zero_copy_stream.h> -namespace grpc { +const int kMaxBufferLength = 8192; -bool SerializeProto(const grpc::protobuf::Message &msg, - grpc_byte_buffer **bp) { - grpc::string msg_str; - bool success = msg.SerializeToString(&msg_str); - if (success) { - gpr_slice slice = - gpr_slice_from_copied_buffer(msg_str.data(), msg_str.length()); - *bp = grpc_byte_buffer_create(&slice, 1); - gpr_slice_unref(slice); +class GrpcBufferWriter GRPC_FINAL + : public ::google::protobuf::io::ZeroCopyOutputStream { + public: + explicit GrpcBufferWriter(grpc_byte_buffer **bp, + int block_size = kMaxBufferLength) + : block_size_(block_size), byte_count_(0), have_backup_(false) { + *bp = grpc_byte_buffer_create(NULL, 0); + slice_buffer_ = &(*bp)->data.slice_buffer; + } + + ~GrpcBufferWriter() GRPC_OVERRIDE { + if (have_backup_) { + gpr_slice_unref(backup_slice_); + } + } + + bool Next(void **data, int *size) GRPC_OVERRIDE { + if (have_backup_) { + slice_ = backup_slice_; + have_backup_ = false; + } else { + slice_ = gpr_slice_malloc(block_size_); + } + *data = GPR_SLICE_START_PTR(slice_); + byte_count_ += *size = GPR_SLICE_LENGTH(slice_); + gpr_slice_buffer_add(slice_buffer_, slice_); + return true; + } + + void BackUp(int count) GRPC_OVERRIDE { + gpr_slice_buffer_pop(slice_buffer_); + if (count == block_size_) { + backup_slice_ = slice_; + } else { + backup_slice_ = + gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count); + gpr_slice_buffer_add(slice_buffer_, slice_); + } + have_backup_ = true; + byte_count_ -= count; + } + + gpr_int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; } + + private: + const int block_size_; + gpr_int64 byte_count_; + gpr_slice_buffer *slice_buffer_; + bool have_backup_; + gpr_slice backup_slice_; + gpr_slice slice_; +}; + +class GrpcBufferReader GRPC_FINAL + : public ::google::protobuf::io::ZeroCopyInputStream { + public: + explicit GrpcBufferReader(grpc_byte_buffer *buffer) + : byte_count_(0), backup_count_(0) { + reader_ = grpc_byte_buffer_reader_create(buffer); + } + ~GrpcBufferReader() GRPC_OVERRIDE { + grpc_byte_buffer_reader_destroy(reader_); } - return success; -} -bool DeserializeProto(grpc_byte_buffer *buffer, - grpc::protobuf::Message *msg) { - grpc::string msg_string; - grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer); - gpr_slice slice; - while (grpc_byte_buffer_reader_next(reader, &slice)) { - const char *data = reinterpret_cast<const char *>( - slice.refcount ? slice.data.refcounted.bytes - : slice.data.inlined.bytes); - msg_string.append(data, slice.refcount ? slice.data.refcounted.length - : slice.data.inlined.length); - gpr_slice_unref(slice); + bool Next(const void **data, int *size) GRPC_OVERRIDE { + if (backup_count_ > 0) { + *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - + backup_count_; + *size = backup_count_; + backup_count_ = 0; + return true; + } + if (!grpc_byte_buffer_reader_next(reader_, &slice_)) { + return false; + } + gpr_slice_unref(slice_); + *data = GPR_SLICE_START_PTR(slice_); + byte_count_ += *size = GPR_SLICE_LENGTH(slice_); + return true; } - grpc_byte_buffer_reader_destroy(reader); - return msg->ParseFromString(msg_string); + + void BackUp(int count) GRPC_OVERRIDE { + backup_count_ = count; + } + + bool Skip(int count) GRPC_OVERRIDE { + const void *data; + int size; + while (Next(&data, &size)) { + if (size >= count) { + BackUp(size - count); + return true; + } + // size < count; + count -= size; + } + // error or we have too large count; + return false; + } + + gpr_int64 ByteCount() const GRPC_OVERRIDE { + return byte_count_ - backup_count_; + } + + private: + gpr_int64 byte_count_; + gpr_int64 backup_count_; + grpc_byte_buffer_reader *reader_; + gpr_slice slice_; +}; + +namespace grpc { + +bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp) { + GrpcBufferWriter writer(bp); + return msg.SerializeToZeroCopyStream(&writer); +} + +bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg) { + GrpcBufferReader reader(buffer); + return msg->ParseFromZeroCopyStream(&reader); } } // namespace grpc |