From 272eebbbcd74c03dbdf66f96121115bb7f9a2f32 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 8 Nov 2017 15:25:51 -0800 Subject: Only allocate what we need in the last slice for proto serialization --- include/grpc++/impl/codegen/proto_utils.h | 39 +++++++++++++++++-------------- 1 file changed, 21 insertions(+), 18 deletions(-) (limited to 'include/grpc++') diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index 0f7e115c9a..b5ad3d8470 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -41,8 +41,11 @@ const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024; class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { public: - explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size) - : block_size_(block_size), byte_count_(0), have_backup_(false) { + GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size) + : block_size_(block_size), + total_size_(total_size), + byte_count_(0), + have_backup_(false) { *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0); slice_buffer_ = &(*bp)->data.raw.slice_buffer; } @@ -54,11 +57,20 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { } bool Next(void** data, int* size) override { + // Protobuf should not ask for more memory than total_size_. + GPR_CODEGEN_ASSERT(byte_count_ < total_size_); if (have_backup_) { slice_ = backup_slice_; have_backup_ = false; } else { - slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_); + // When less than a whole block is needed, only allocate that much. + // But make sure the allocated slice is not inlined. + size_t remain = total_size_ - byte_count_ > block_size_ + ? block_size_ + : total_size_ - byte_count_; + slice_ = g_core_codegen_interface->grpc_slice_malloc( + remain > GRPC_SLICE_INLINED_SIZE ? remain + : GRPC_SLICE_INLINED_SIZE + 1); } *data = GRPC_SLICE_START_PTR(slice_); // On win x64, int is only 32bit @@ -70,7 +82,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { void BackUp(int count) override { g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_); - if (count == block_size_) { + if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) { backup_slice_ = slice_; } else { backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail( @@ -90,6 +102,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream { protected: friend class GrpcBufferWriterPeer; const int block_size_; + const int total_size_; int64_t byte_count_; grpc_slice_buffer* slice_buffer_; bool have_backup_; @@ -175,20 +188,10 @@ Status GenericSerialize(const grpc::protobuf::Message& msg, "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); *own_buffer = true; int byte_size = msg.ByteSize(); - if (byte_size <= kGrpcBufferWriterMaxBufferLength) { - grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); - GPR_CODEGEN_ASSERT( - GRPC_SLICE_END_PTR(slice) == - msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); - *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); - g_core_codegen_interface->grpc_slice_unref(slice); - return g_core_codegen_interface->ok(); - } else { - BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength); - return msg.SerializeToZeroCopyStream(&writer) - ? g_core_codegen_interface->ok() - : Status(StatusCode::INTERNAL, "Failed to serialize message"); - } + BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size); + return msg.SerializeToZeroCopyStream(&writer) + ? g_core_codegen_interface->ok() + : Status(StatusCode::INTERNAL, "Failed to serialize message"); } // BufferReader must be a subclass of io::ZeroCopyInputStream. -- cgit v1.2.3 From c88185900d5deb54fe02c3cdde3fc9359e185287 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 8 Nov 2017 16:33:49 -0800 Subject: handle 0 byte size message... --- include/grpc++/impl/codegen/proto_utils.h | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'include/grpc++') diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index b5ad3d8470..799d236e0d 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -188,6 +188,11 @@ Status GenericSerialize(const grpc::protobuf::Message& msg, "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); *own_buffer = true; int byte_size = msg.ByteSize(); + if (byte_size == 0) { + grpc_slice empty_slice = g_core_codegen_interface->grpc_empty_slice(); + *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&empty_slice, 1); + return g_core_codegen_interface->ok(); + } BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size); return msg.SerializeToZeroCopyStream(&writer) ? g_core_codegen_interface->ok() -- cgit v1.2.3 From b90f0e66e503a8fc675c1cdf3a4301b28a27d62e Mon Sep 17 00:00:00 2001 From: yang-g Date: Thu, 9 Nov 2017 09:51:29 -0800 Subject: relax for inlined bytes to avoid allocation --- include/grpc++/impl/codegen/proto_utils.h | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'include/grpc++') diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h index 799d236e0d..b7636034d4 100644 --- a/include/grpc++/impl/codegen/proto_utils.h +++ b/include/grpc++/impl/codegen/proto_utils.h @@ -188,9 +188,14 @@ Status GenericSerialize(const grpc::protobuf::Message& msg, "BufferWriter must be a subclass of io::ZeroCopyOutputStream"); *own_buffer = true; int byte_size = msg.ByteSize(); - if (byte_size == 0) { - grpc_slice empty_slice = g_core_codegen_interface->grpc_empty_slice(); - *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&empty_slice, 1); + if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) { + grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size); + GPR_CODEGEN_ASSERT( + GRPC_SLICE_END_PTR(slice) == + msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice))); + *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1); + g_core_codegen_interface->grpc_slice_unref(slice); + return g_core_codegen_interface->ok(); } BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size); -- cgit v1.2.3 From 2cb57e5ef4a6e8a06d87e38f09a00c8245d6a6f9 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 14 Nov 2017 11:48:11 -0800 Subject: Avalanching operations on completion queue should be private, not API --- include/grpc++/impl/codegen/completion_queue.h | 32 ++++++++++++++------------ 1 file changed, 17 insertions(+), 15 deletions(-) (limited to 'include/grpc++') diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 11cc588879..b8a7862578 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -65,6 +65,7 @@ class CompletionQueue; class Server; class ServerBuilder; class ServerContext; +class ServerInterface; namespace internal { class CompletionQueueTag; @@ -187,21 +188,6 @@ class CompletionQueue : private GrpcLibraryCodegen { /// owership is performed. grpc_completion_queue* cq() { return cq_; } - /// Manage state of avalanching operations : completion queue tags that - /// trigger other completion queue operations. The underlying core completion - /// queue should not really shutdown until all avalanching operations have - /// been finalized. Note that we maintain the requirement that an avalanche - /// registration must take place before CQ shutdown (which must be maintained - /// elsehwere) - void InitialAvalanching() { - gpr_atm_rel_store(&avalanches_in_flight_, static_cast(1)); - } - void RegisterAvalanching() { - gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, - static_cast(1)); - } - void CompleteAvalanching(); - protected: /// Private constructor of CompletionQueue only visible to friend classes CompletionQueue(const grpc_completion_queue_attributes& attributes) { @@ -238,6 +224,7 @@ class CompletionQueue : private GrpcLibraryCodegen { friend class ::grpc::internal::UnknownMethodHandler; friend class ::grpc::Server; friend class ::grpc::ServerContext; + friend class ::grpc::ServerInterface; template friend class ::grpc::internal::BlockingUnaryCallImpl; @@ -309,6 +296,21 @@ class CompletionQueue : private GrpcLibraryCodegen { GPR_CODEGEN_ASSERT(!tag->FinalizeResult(&ignored, &ok)); } + /// Manage state of avalanching operations : completion queue tags that + /// trigger other completion queue operations. The underlying core completion + /// queue should not really shutdown until all avalanching operations have + /// been finalized. Note that we maintain the requirement that an avalanche + /// registration must take place before CQ shutdown (which must be maintained + /// elsehwere) + void InitialAvalanching() { + gpr_atm_rel_store(&avalanches_in_flight_, static_cast(1)); + } + void RegisterAvalanching() { + gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_, + static_cast(1)); + } + void CompleteAvalanching(); + grpc_completion_queue* cq_; // owned gpr_atm avalanches_in_flight_; -- cgit v1.2.3