aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar yang-g <yangg@google.com>2017-11-08 15:25:51 -0800
committerGravatar yang-g <yangg@google.com>2017-11-09 09:54:54 -0800
commit272eebbbcd74c03dbdf66f96121115bb7f9a2f32 (patch)
treed273f806300d3808603cf0f6759d04b434f15144
parent2da79b8febc7e63994ee19ba4b0f43312050ea78 (diff)
Only allocate what we need in the last slice for proto serialization
-rw-r--r--include/grpc++/impl/codegen/proto_utils.h39
-rw-r--r--test/cpp/codegen/proto_utils_test.cc110
2 files changed, 119 insertions, 30 deletions
diff --git a/include/grpc++/impl/codegen/proto_utils.h b/include/grpc++/impl/codegen/proto_utils.h
index 0f7e115c9a..b5ad3d8470 100644
--- a/include/grpc++/impl/codegen/proto_utils.h
+++ b/include/grpc++/impl/codegen/proto_utils.h
@@ -41,8 +41,11 @@ const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
- explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
- : block_size_(block_size), byte_count_(0), have_backup_(false) {
+ GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size)
+ : block_size_(block_size),
+ total_size_(total_size),
+ byte_count_(0),
+ have_backup_(false) {
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
}
@@ -54,11 +57,20 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
}
bool Next(void** data, int* size) override {
+ // Protobuf should not ask for more memory than total_size_.
+ GPR_CODEGEN_ASSERT(byte_count_ < total_size_);
if (have_backup_) {
slice_ = backup_slice_;
have_backup_ = false;
} else {
- slice_ = g_core_codegen_interface->grpc_slice_malloc(block_size_);
+ // When less than a whole block is needed, only allocate that much.
+ // But make sure the allocated slice is not inlined.
+ size_t remain = total_size_ - byte_count_ > block_size_
+ ? block_size_
+ : total_size_ - byte_count_;
+ slice_ = g_core_codegen_interface->grpc_slice_malloc(
+ remain > GRPC_SLICE_INLINED_SIZE ? remain
+ : GRPC_SLICE_INLINED_SIZE + 1);
}
*data = GRPC_SLICE_START_PTR(slice_);
// On win x64, int is only 32bit
@@ -70,7 +82,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
void BackUp(int count) override {
g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
- if (count == block_size_) {
+ if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) {
backup_slice_ = slice_;
} else {
backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
@@ -90,6 +102,7 @@ class GrpcBufferWriter : public ::grpc::protobuf::io::ZeroCopyOutputStream {
protected:
friend class GrpcBufferWriterPeer;
const int block_size_;
+ const int total_size_;
int64_t byte_count_;
grpc_slice_buffer* slice_buffer_;
bool have_backup_;
@@ -175,20 +188,10 @@ Status GenericSerialize(const grpc::protobuf::Message& msg,
"BufferWriter must be a subclass of io::ZeroCopyOutputStream");
*own_buffer = true;
int byte_size = msg.ByteSize();
- if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
- grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
- GPR_CODEGEN_ASSERT(
- GRPC_SLICE_END_PTR(slice) ==
- msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
- *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
- g_core_codegen_interface->grpc_slice_unref(slice);
- return g_core_codegen_interface->ok();
- } else {
- BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
- return msg.SerializeToZeroCopyStream(&writer)
- ? g_core_codegen_interface->ok()
- : Status(StatusCode::INTERNAL, "Failed to serialize message");
- }
+ BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
+ return msg.SerializeToZeroCopyStream(&writer)
+ ? g_core_codegen_interface->ok()
+ : Status(StatusCode::INTERNAL, "Failed to serialize message");
}
// BufferReader must be a subclass of io::ZeroCopyInputStream.
diff --git a/test/cpp/codegen/proto_utils_test.cc b/test/cpp/codegen/proto_utils_test.cc
index fd05c90e9d..ba89b299ef 100644
--- a/test/cpp/codegen/proto_utils_test.cc
+++ b/test/cpp/codegen/proto_utils_test.cc
@@ -16,15 +16,16 @@
*
*/
+#include <grpc++/impl/codegen/grpc_library.h>
#include <grpc++/impl/codegen/proto_utils.h>
#include <grpc++/impl/grpc_library.h>
+#include <grpc/impl/codegen/byte_buffer.h>
+#include <grpc/slice.h>
#include <gtest/gtest.h>
namespace grpc {
namespace internal {
-static GrpcLibraryInitializer g_gli_initializer;
-
// Provide access to GrpcBufferWriter internals.
class GrpcBufferWriterPeer {
public:
@@ -44,35 +45,120 @@ class ProtoUtilsTest : public ::testing::Test {};
// GrpcBufferWriter Next()/Backup() invocations could result in a dangling
// pointer returned by Next() due to the interaction between grpc_slice inlining
// and GRPC_SLICE_START_PTR.
-TEST_F(ProtoUtilsTest, BackupNext) {
- // Ensure the GrpcBufferWriter internals are initialized.
- g_gli_initializer.summon();
-
+TEST_F(ProtoUtilsTest, TinyBackupThenNext) {
grpc_byte_buffer* bp;
- GrpcBufferWriter writer(&bp, 8192);
+ const int block_size = 1024;
+ GrpcBufferWriter writer(&bp, block_size, 8192);
GrpcBufferWriterPeer peer(&writer);
void* data;
int size;
// Allocate a slice.
ASSERT_TRUE(writer.Next(&data, &size));
- EXPECT_EQ(8192, size);
- // Return a single byte. Before the fix that this test acts as a regression
- // for, this would have resulted in an inlined backup slice.
+ EXPECT_EQ(block_size, size);
+ // Return a single byte.
writer.BackUp(1);
- EXPECT_TRUE(!peer.have_backup());
- // On the next allocation, the slice is non-inlined.
+ EXPECT_FALSE(peer.have_backup());
+ // On the next allocation, the returned slice is non-inlined.
ASSERT_TRUE(writer.Next(&data, &size));
EXPECT_TRUE(peer.slice().refcount != NULL);
+ EXPECT_EQ(block_size, size);
// Cleanup.
g_core_codegen_interface->grpc_byte_buffer_destroy(bp);
}
+namespace {
+
+// Set backup_size to 0 to indicate no backup is needed.
+void BufferWriterTest(int block_size, int total_size, int backup_size) {
+ grpc_byte_buffer* bp;
+ GrpcBufferWriter writer(&bp, block_size, total_size);
+
+ int written_size = 0;
+ void* data;
+ int size = 0;
+ bool backed_up_entire_slice = false;
+
+ while (written_size < total_size) {
+ EXPECT_TRUE(writer.Next(&data, &size));
+ EXPECT_GT(size, 0);
+ EXPECT_TRUE(data);
+ int write_size = size;
+ bool should_backup = false;
+ if (backup_size > 0 && size > backup_size) {
+ write_size = size - backup_size;
+ should_backup = true;
+ } else if (size == backup_size && !backed_up_entire_slice) {
+ // only backup entire slice once.
+ backed_up_entire_slice = true;
+ should_backup = true;
+ write_size = 0;
+ }
+ // May need a last backup.
+ if (write_size + written_size > total_size) {
+ write_size = total_size - written_size;
+ should_backup = true;
+ backup_size = size - write_size;
+ ASSERT_GT(backup_size, 0);
+ }
+ for (int i = 0; i < write_size; i++) {
+ ((uint8_t*)data)[i] = written_size % 128;
+ written_size++;
+ }
+ if (should_backup) {
+ writer.BackUp(backup_size);
+ }
+ }
+ EXPECT_EQ(grpc_byte_buffer_length(bp), (size_t)total_size);
+
+ grpc_byte_buffer_reader reader;
+ grpc_byte_buffer_reader_init(&reader, bp);
+ int read_bytes = 0;
+ while (read_bytes < total_size) {
+ grpc_slice s;
+ EXPECT_TRUE(grpc_byte_buffer_reader_next(&reader, &s));
+ for (size_t i = 0; i < GRPC_SLICE_LENGTH(s); i++) {
+ EXPECT_EQ(GRPC_SLICE_START_PTR(s)[i], read_bytes % 128);
+ read_bytes++;
+ }
+ grpc_slice_unref(s);
+ }
+ EXPECT_EQ(read_bytes, total_size);
+ grpc_byte_buffer_reader_destroy(&reader);
+ grpc_byte_buffer_destroy(bp);
+}
+
+TEST(WriterTest, TinyBlockTinyBackup) {
+ for (int i = 2; i < (int)GRPC_SLICE_INLINED_SIZE; i++) {
+ BufferWriterTest(i, 256, 1);
+ }
+}
+
+TEST(WriterTest, SmallBlockTinyBackup) { BufferWriterTest(64, 256, 1); }
+
+TEST(WriterTest, SmallBlockNoBackup) { BufferWriterTest(64, 256, 0); }
+
+TEST(WriterTest, SmallBlockFullBackup) { BufferWriterTest(64, 256, 64); }
+
+TEST(WriterTest, LargeBlockTinyBackup) { BufferWriterTest(4096, 8192, 1); }
+
+TEST(WriterTest, LargeBlockNoBackup) { BufferWriterTest(4096, 8192, 0); }
+
+TEST(WriterTest, LargeBlockFullBackup) { BufferWriterTest(4096, 8192, 4096); }
+
+TEST(WriterTest, LargeBlockLargeBackup) { BufferWriterTest(4096, 8192, 4095); }
+
+} // namespace
} // namespace internal
} // namespace grpc
int main(int argc, char** argv) {
+ // Ensure the GrpcBufferWriter internals are initialized.
+ grpc::internal::GrpcLibraryInitializer init;
+ init.summon();
+ grpc::GrpcLibraryCodegen lib;
+
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}