aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2015-03-10 22:56:02 -0700
committerGravatar Yang Gao <yangg@google.com>2015-03-10 22:56:02 -0700
commit42734b58eebcbb8881b2a92fc006af6ffdd56710 (patch)
tree8df0f47aed23d6ea1baef22207360bc258622bbb /src/cpp
parentc65569fba8b9f509ff7cd79e057f95ee98164e1a (diff)
parentbf5ec2fd3d6c72bb4f887fc25c41de2fc5ef6f87 (diff)
Merge pull request #978 from Chilledheart/master
Avoid unnecessary copies during protobuf serialization and deserialization
Diffstat (limited to 'src/cpp')
-rw-r--r--src/cpp/proto/proto_utils.cc149
1 files changed, 123 insertions, 26 deletions
diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc
index 72f1bf7441..e4e51bfebf 100644
--- a/src/cpp/proto/proto_utils.cc
+++ b/src/cpp/proto/proto_utils.cc
@@ -35,38 +35,135 @@
#include <grpc++/config.h>
#include <grpc/grpc.h>
+#include <grpc/byte_buffer.h>
#include <grpc/support/slice.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/port_platform.h>
+#include <google/protobuf/io/zero_copy_stream.h>
-namespace grpc {
+const int kMaxBufferLength = 8192;
-bool SerializeProto(const grpc::protobuf::Message &msg,
- grpc_byte_buffer **bp) {
- grpc::string msg_str;
- bool success = msg.SerializeToString(&msg_str);
- if (success) {
- gpr_slice slice =
- gpr_slice_from_copied_buffer(msg_str.data(), msg_str.length());
- *bp = grpc_byte_buffer_create(&slice, 1);
- gpr_slice_unref(slice);
+class GrpcBufferWriter GRPC_FINAL
+ : public ::google::protobuf::io::ZeroCopyOutputStream {
+ public:
+ explicit GrpcBufferWriter(grpc_byte_buffer **bp,
+ int block_size = kMaxBufferLength)
+ : block_size_(block_size), byte_count_(0), have_backup_(false) {
+ *bp = grpc_byte_buffer_create(NULL, 0);
+ slice_buffer_ = &(*bp)->data.slice_buffer;
+ }
+
+ ~GrpcBufferWriter() GRPC_OVERRIDE {
+ if (have_backup_) {
+ gpr_slice_unref(backup_slice_);
+ }
+ }
+
+ bool Next(void **data, int *size) GRPC_OVERRIDE {
+ if (have_backup_) {
+ slice_ = backup_slice_;
+ have_backup_ = false;
+ } else {
+ slice_ = gpr_slice_malloc(block_size_);
+ }
+ *data = GPR_SLICE_START_PTR(slice_);
+ byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
+ gpr_slice_buffer_add(slice_buffer_, slice_);
+ return true;
+ }
+
+ void BackUp(int count) GRPC_OVERRIDE {
+ gpr_slice_buffer_pop(slice_buffer_);
+ if (count == block_size_) {
+ backup_slice_ = slice_;
+ } else {
+ backup_slice_ =
+ gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count);
+ gpr_slice_buffer_add(slice_buffer_, slice_);
+ }
+ have_backup_ = true;
+ byte_count_ -= count;
+ }
+
+ gpr_int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
+
+ private:
+ const int block_size_;
+ gpr_int64 byte_count_;
+ gpr_slice_buffer *slice_buffer_;
+ bool have_backup_;
+ gpr_slice backup_slice_;
+ gpr_slice slice_;
+};
+
+class GrpcBufferReader GRPC_FINAL
+ : public ::google::protobuf::io::ZeroCopyInputStream {
+ public:
+ explicit GrpcBufferReader(grpc_byte_buffer *buffer)
+ : byte_count_(0), backup_count_(0) {
+ reader_ = grpc_byte_buffer_reader_create(buffer);
+ }
+ ~GrpcBufferReader() GRPC_OVERRIDE {
+ grpc_byte_buffer_reader_destroy(reader_);
}
- return success;
-}
-bool DeserializeProto(grpc_byte_buffer *buffer,
- grpc::protobuf::Message *msg) {
- grpc::string msg_string;
- grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer);
- gpr_slice slice;
- while (grpc_byte_buffer_reader_next(reader, &slice)) {
- const char *data = reinterpret_cast<const char *>(
- slice.refcount ? slice.data.refcounted.bytes
- : slice.data.inlined.bytes);
- msg_string.append(data, slice.refcount ? slice.data.refcounted.length
- : slice.data.inlined.length);
- gpr_slice_unref(slice);
+ bool Next(const void **data, int *size) GRPC_OVERRIDE {
+ if (backup_count_ > 0) {
+ *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
+ backup_count_;
+ *size = backup_count_;
+ backup_count_ = 0;
+ return true;
+ }
+ if (!grpc_byte_buffer_reader_next(reader_, &slice_)) {
+ return false;
+ }
+ gpr_slice_unref(slice_);
+ *data = GPR_SLICE_START_PTR(slice_);
+ byte_count_ += *size = GPR_SLICE_LENGTH(slice_);
+ return true;
}
- grpc_byte_buffer_reader_destroy(reader);
- return msg->ParseFromString(msg_string);
+
+ void BackUp(int count) GRPC_OVERRIDE {
+ backup_count_ = count;
+ }
+
+ bool Skip(int count) GRPC_OVERRIDE {
+ const void *data;
+ int size;
+ while (Next(&data, &size)) {
+ if (size >= count) {
+ BackUp(size - count);
+ return true;
+ }
+ // size < count;
+ count -= size;
+ }
+ // error or we have too large count;
+ return false;
+ }
+
+ gpr_int64 ByteCount() const GRPC_OVERRIDE {
+ return byte_count_ - backup_count_;
+ }
+
+ private:
+ gpr_int64 byte_count_;
+ gpr_int64 backup_count_;
+ grpc_byte_buffer_reader *reader_;
+ gpr_slice slice_;
+};
+
+namespace grpc {
+
+bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp) {
+ GrpcBufferWriter writer(bp);
+ return msg.SerializeToZeroCopyStream(&writer);
+}
+
+bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg) {
+ GrpcBufferReader reader(buffer);
+ return msg->ParseFromZeroCopyStream(&reader);
}
} // namespace grpc