diff options
author | Yuchen Zeng <zyc@google.com> | 2016-05-06 15:50:15 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2016-05-06 15:50:15 -0700 |
commit | 26a1785cfc5a2118b5fbb4c2e7e89edd6a241a61 (patch) | |
tree | 851f2b92ccd86254bbb0d973462932a1d555fab6 /src/cpp | |
parent | 7d099a5c907d9ab164416ea875ae07a3074adedb (diff) | |
parent | c916c1ce17fe273dff881e1c87e17ad991c2078a (diff) |
Resolve conflicts
Diffstat (limited to 'src/cpp')
-rw-r--r-- | src/cpp/common/core_codegen.cc | 204 | ||||
-rw-r--r-- | src/cpp/common/core_codegen.h | 26 |
2 files changed, 61 insertions, 169 deletions
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 33a8f755e6..8e8d42eb29 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -48,124 +48,6 @@ #include "src/core/lib/profiling/timers.h" -namespace { - -const int kGrpcBufferWriterMaxBufferLength = 8192; - -class GrpcBufferWriter GRPC_FINAL - : public ::grpc::protobuf::io::ZeroCopyOutputStream { - public: - explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) - : block_size_(block_size), byte_count_(0), have_backup_(false) { - *bp = grpc_raw_byte_buffer_create(NULL, 0); - slice_buffer_ = &(*bp)->data.raw.slice_buffer; - } - - ~GrpcBufferWriter() GRPC_OVERRIDE { - if (have_backup_) { - gpr_slice_unref(backup_slice_); - } - } - - bool Next(void** data, int* size) GRPC_OVERRIDE { - if (have_backup_) { - slice_ = backup_slice_; - have_backup_ = false; - } else { - slice_ = gpr_slice_malloc(block_size_); - } - *data = GPR_SLICE_START_PTR(slice_); - // On win x64, int is only 32bit - GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_); - gpr_slice_buffer_add(slice_buffer_, slice_); - return true; - } - - void BackUp(int count) GRPC_OVERRIDE { - gpr_slice_buffer_pop(slice_buffer_); - if (count == block_size_) { - backup_slice_ = slice_; - } else { - backup_slice_ = - gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count); - gpr_slice_buffer_add(slice_buffer_, slice_); - } - have_backup_ = true; - byte_count_ -= count; - } - - grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; } - - private: - const int block_size_; - int64_t byte_count_; - gpr_slice_buffer* slice_buffer_; - bool have_backup_; - gpr_slice backup_slice_; - gpr_slice slice_; -}; - -class GrpcBufferReader GRPC_FINAL - : public ::grpc::protobuf::io::ZeroCopyInputStream { - public: - explicit GrpcBufferReader(grpc_byte_buffer* buffer) - : byte_count_(0), backup_count_(0) { - grpc_byte_buffer_reader_init(&reader_, buffer); - } - ~GrpcBufferReader() GRPC_OVERRIDE { - grpc_byte_buffer_reader_destroy(&reader_); - } - - bool Next(const void** data, int* size) GRPC_OVERRIDE { - if (backup_count_ > 0) { - *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - - backup_count_; - GPR_ASSERT(backup_count_ <= INT_MAX); - *size = (int)backup_count_; - backup_count_ = 0; - return true; - } - if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) { - return false; - } - gpr_slice_unref(slice_); - *data = GPR_SLICE_START_PTR(slice_); - // On win x64, int is only 32bit - GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_); - return true; - } - - void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; } - - bool Skip(int count) GRPC_OVERRIDE { - const void* data; - int size; - while (Next(&data, &size)) { - if (size >= count) { - BackUp(size - count); - return true; - } - // size < count; - count -= size; - } - // error or we have too large count; - return false; - } - - grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { - return byte_count_ - backup_count_; - } - - private: - int64_t byte_count_; - int64_t backup_count_; - grpc_byte_buffer_reader reader_; - gpr_slice slice_; -}; -} // namespace - namespace grpc { grpc_completion_queue* CoreCodegen::grpc_completion_queue_create( @@ -192,6 +74,44 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { ::grpc_byte_buffer_destroy(bb); } +void CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, + grpc_byte_buffer* buffer) { + ::grpc_byte_buffer_reader_init(reader, buffer); +} + +void CoreCodegen::grpc_byte_buffer_reader_destroy( + grpc_byte_buffer_reader* reader) { + ::grpc_byte_buffer_reader_destroy(reader); +} + +int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, + gpr_slice* slice) { + return ::grpc_byte_buffer_reader_next(reader, slice); +} + +grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(gpr_slice* slice, + size_t nslices) { + return ::grpc_raw_byte_buffer_create(slice, nslices); +} + +gpr_slice CoreCodegen::gpr_slice_malloc(size_t length) { + return ::gpr_slice_malloc(length); +} + +void CoreCodegen::gpr_slice_unref(gpr_slice slice) { ::gpr_slice_unref(slice); } + +gpr_slice CoreCodegen::gpr_slice_split_tail(gpr_slice* s, size_t split) { + return ::gpr_slice_split_tail(s, split); +} + +void CoreCodegen::gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) { + ::gpr_slice_buffer_add(sb, slice); +} + +void CoreCodegen::gpr_slice_buffer_pop(gpr_slice_buffer* sb) { + ::gpr_slice_buffer_pop(sb); +} + void CoreCodegen::grpc_metadata_array_init(grpc_metadata_array* array) { ::grpc_metadata_array_init(array); } @@ -200,6 +120,10 @@ void CoreCodegen::grpc_metadata_array_destroy(grpc_metadata_array* array) { ::grpc_metadata_array_destroy(array); } +const Status& CoreCodegen::ok() { return grpc::Status::OK; } + +const Status& CoreCodegen::cancelled() { return grpc::Status::CANCELLED; } + gpr_timespec CoreCodegen::gpr_inf_future(gpr_clock_type type) { return ::gpr_inf_future(type); } @@ -209,48 +133,4 @@ void CoreCodegen::assert_fail(const char* failed_assertion) { abort(); } -Status CoreCodegen::SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp) { - GPR_TIMER_SCOPE("SerializeProto", 0); - int byte_size = msg.ByteSize(); - if (byte_size <= kGrpcBufferWriterMaxBufferLength) { - gpr_slice slice = gpr_slice_malloc(byte_size); - GPR_ASSERT(GPR_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); - *bp = grpc_raw_byte_buffer_create(&slice, 1); - gpr_slice_unref(slice); - return Status::OK; - } else { - GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength); - return msg.SerializeToZeroCopyStream(&writer) - ? Status::OK - : Status(StatusCode::INTERNAL, "Failed to serialize message"); - } -} - -Status CoreCodegen::DeserializeProto(grpc_byte_buffer* buffer, - grpc::protobuf::Message* msg, - int max_message_size) { - GPR_TIMER_SCOPE("DeserializeProto", 0); - if (buffer == nullptr) { - return Status(StatusCode::INTERNAL, "No payload"); - } - Status result = Status::OK; - { - GrpcBufferReader reader(buffer); - ::grpc::protobuf::io::CodedInputStream decoder(&reader); - if (max_message_size > 0) { - decoder.SetTotalBytesLimit(max_message_size, max_message_size); - } - if (!msg->ParseFromCodedStream(&decoder)) { - result = Status(StatusCode::INTERNAL, msg->InitializationErrorString()); - } - if (!decoder.ConsumedEntireMessage()) { - result = Status(StatusCode::INTERNAL, "Did not read entire message"); - } - } - grpc_byte_buffer_destroy(buffer); - return result; -} - } // namespace grpc diff --git a/src/cpp/common/core_codegen.h b/src/cpp/common/core_codegen.h index e15cb4c34a..656b11e7e7 100644 --- a/src/cpp/common/core_codegen.h +++ b/src/cpp/common/core_codegen.h @@ -42,13 +42,6 @@ namespace grpc { /// Implementation of the core codegen interface. class CoreCodegen : public CoreCodegenInterface { private: - Status SerializeProto(const grpc::protobuf::Message& msg, - grpc_byte_buffer** bp) override; - - Status DeserializeProto(grpc_byte_buffer* buffer, - grpc::protobuf::Message* msg, - int max_message_size) override; - grpc_completion_queue* grpc_completion_queue_create(void* reserved) override; void grpc_completion_queue_destroy(grpc_completion_queue* cq) override; grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, @@ -60,11 +53,30 @@ class CoreCodegen : public CoreCodegenInterface { void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override; + void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, + grpc_byte_buffer* buffer) override; + void grpc_byte_buffer_reader_destroy( + grpc_byte_buffer_reader* reader) override; + int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader, + gpr_slice* slice) override; + + grpc_byte_buffer* grpc_raw_byte_buffer_create(gpr_slice* slice, + size_t nslices) override; + + gpr_slice gpr_slice_malloc(size_t length) override; + void gpr_slice_unref(gpr_slice slice) override; + gpr_slice gpr_slice_split_tail(gpr_slice* s, size_t split) override; + void gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) override; + void gpr_slice_buffer_pop(gpr_slice_buffer* sb) override; + void grpc_metadata_array_init(grpc_metadata_array* array) override; void grpc_metadata_array_destroy(grpc_metadata_array* array) override; gpr_timespec gpr_inf_future(gpr_clock_type type) override; + virtual const Status& ok() override; + virtual const Status& cancelled() override; + void assert_fail(const char* failed_assertion) override; }; |