diff options
Diffstat (limited to 'tensorflow/core/distributed_runtime/rpc/grpc_serialization_traits.h')
-rw-r--r-- | tensorflow/core/distributed_runtime/rpc/grpc_serialization_traits.h | 69 |
1 files changed, 35 insertions, 34 deletions
diff --git a/tensorflow/core/distributed_runtime/rpc/grpc_serialization_traits.h b/tensorflow/core/distributed_runtime/rpc/grpc_serialization_traits.h index 0616b65af0..b35d4843e8 100644 --- a/tensorflow/core/distributed_runtime/rpc/grpc_serialization_traits.h +++ b/tensorflow/core/distributed_runtime/rpc/grpc_serialization_traits.h @@ -17,6 +17,7 @@ limitations under the License. #define THIRD_PARTY_TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_RPC_GRPC_SERIALIZATION_TRAITS_H_ #include "grpc++/impl/codegen/proto_utils.h" +#include "grpc++/support/slice.h" namespace grpc { @@ -24,7 +25,7 @@ namespace tensorflow_helper { const int kGrpcBufferWriterMaxBufferLength = 8192; -class GrpcBufferWriter GRPC_FINAL +class GrpcBufferWriter final : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) @@ -33,35 +34,35 @@ class GrpcBufferWriter GRPC_FINAL slice_buffer_ = &(*bp)->data.raw.slice_buffer; } - ~GrpcBufferWriter() GRPC_OVERRIDE { + ~GrpcBufferWriter() override { if (have_backup_) { - g_core_codegen_interface->gpr_slice_unref(backup_slice_); + g_core_codegen_interface->grpc_slice_unref(backup_slice_); } } - bool Next(void** data, int* size) GRPC_OVERRIDE { + bool Next(void** data, int* size) override { if (have_backup_) { slice_ = backup_slice_; have_backup_ = false; } else { - slice_ = g_core_codegen_interface->gpr_slice_malloc(block_size_); + slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_); } - *data = GPR_SLICE_START_PTR(slice_); + *data = GRPC_SLICE_START_PTR(slice_); // On win x64, int is only 32bit - GPR_CODEGEN_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_); - g_core_codegen_interface->gpr_slice_buffer_add(slice_buffer_, slice_); + GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); + byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_); + g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); return true; } - void BackUp(int count) GRPC_OVERRIDE { - g_core_codegen_interface->gpr_slice_buffer_pop(slice_buffer_); + void BackUp(int count) override { + g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_); if (count == block_size_) { backup_slice_ = slice_; } else { - backup_slice_ = g_core_codegen_interface->gpr_slice_split_tail( - &slice_, GPR_SLICE_LENGTH(slice_) - count); - g_core_codegen_interface->gpr_slice_buffer_add(slice_buffer_, slice_); + backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail( + &slice_, GRPC_SLICE_LENGTH(slice_) - count); + g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_); } // It's dangerous to keep an inlined grpc_slice as the backup slice, since // on a following Next() call, a reference will be returned to this slice @@ -71,18 +72,18 @@ class GrpcBufferWriter GRPC_FINAL byte_count_ -= count; } - grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; } + grpc::protobuf::int64 ByteCount() const override { return byte_count_; } private: const int block_size_; int64_t byte_count_; - gpr_slice_buffer* slice_buffer_; + grpc_slice_buffer* slice_buffer_; bool have_backup_; - gpr_slice backup_slice_; - gpr_slice slice_; + grpc_slice backup_slice_; + grpc_slice slice_; }; -class GrpcBufferReader GRPC_FINAL +class GrpcBufferReader final : public ::grpc::protobuf::io::ZeroCopyInputStream { typedef void (CoreCodegenInterface::*OldReaderInitAPI)( grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer); @@ -104,13 +105,13 @@ class GrpcBufferReader GRPC_FINAL ReaderInit(&CoreCodegenInterface::grpc_byte_buffer_reader_init, &reader_, buffer); } - ~GrpcBufferReader() GRPC_OVERRIDE { + ~GrpcBufferReader() override { g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_); } - bool Next(const void** data, int* size) GRPC_OVERRIDE { + bool Next(const void** data, int* size) override { if (backup_count_ > 0) { - *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - + *data = GRPC_SLICE_START_PTR(slice_) + GRPC_SLICE_LENGTH(slice_) - backup_count_; GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX); *size = (int)backup_count_; @@ -121,17 +122,17 @@ class GrpcBufferReader GRPC_FINAL &slice_)) { return false; } - g_core_codegen_interface->gpr_slice_unref(slice_); - *data = GPR_SLICE_START_PTR(slice_); + g_core_codegen_interface->grpc_slice_unref(slice_); + *data = GRPC_SLICE_START_PTR(slice_); // On win x64, int is only 32bit - GPR_CODEGEN_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX); - byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_); + GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX); + byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_); return true; } - void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; } + void BackUp(int count) override { backup_count_ = count; } - bool Skip(int count) GRPC_OVERRIDE { + bool Skip(int count) override { const void* data; int size; while (Next(&data, &size)) { @@ -146,7 +147,7 @@ class GrpcBufferReader GRPC_FINAL return false; } - grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { + grpc::protobuf::int64 ByteCount() const override { return byte_count_ - backup_count_; } @@ -154,7 +155,7 @@ class GrpcBufferReader GRPC_FINAL int64_t byte_count_; int64_t backup_count_; grpc_byte_buffer_reader reader_; - gpr_slice slice_; + grpc_slice slice_; }; } // namespace tensorflow_helper @@ -175,12 +176,12 @@ class UnlimitedSizeProtoSerializationTraits { return Status(StatusCode::INTERNAL, "Message length was negative"); } else if (byte_size <= tensorflow_helper::kGrpcBufferWriterMaxBufferLength) { - gpr_slice slice = g_core_codegen_interface->gpr_slice_malloc(byte_size); + grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); GPR_CODEGEN_ASSERT( - GPR_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); + GRPC_SLICE_END_PTR(slice) == + msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); - g_core_codegen_interface->gpr_slice_unref(slice); + g_core_codegen_interface->grpc_slice_unref(slice); return g_core_codegen_interface->ok(); } else { tensorflow_helper::GrpcBufferWriter writer( |