aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2018-03-12 11:05:36 -0700
committerGravatar GitHub <noreply@github.com>2018-03-12 11:05:36 -0700
commit41c396ec8880e696b7709d1b636a48fbd882b581 (patch)
tree331a120633c00c625842d3ecb0b9239f8b3f9545 /test
parentec154f6f2de6461e712b7679ebe96035469d46d0 (diff)
parent3d8b32d8b3d7e85d0588064994efa6763d6fec02 (diff)
Merge pull request #14607 from markdroth/c++_byte_stream
Convert byte_stream API to C++.
Diffstat (limited to 'test')
-rw-r--r--test/core/transport/BUILD3
-rw-r--r--test/core/transport/byte_stream_test.cc194
-rw-r--r--test/cpp/microbenchmarks/bm_chttp2_transport.cc39
3 files changed, 114 insertions, 122 deletions
diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD
index 2c2d05b9ae..84fb3a1421 100644
--- a/test/core/transport/BUILD
+++ b/test/core/transport/BUILD
@@ -43,6 +43,9 @@ grpc_cc_test(
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
+ external_deps = [
+ "gtest",
+ ],
)
grpc_cc_test(
diff --git a/test/core/transport/byte_stream_test.cc b/test/core/transport/byte_stream_test.cc
index 6947d50976..80692ec2da 100644
--- a/test/core/transport/byte_stream_test.cc
+++ b/test/core/transport/byte_stream_test.cc
@@ -27,16 +27,18 @@
#include "test/core/util/test_config.h"
+#include <gtest/gtest.h>
+
+namespace grpc_core {
+namespace {
+
//
-// grpc_slice_buffer_stream tests
+// SliceBufferByteStream tests
//
-static void not_called_closure(void* arg, grpc_error* error) {
- GPR_ASSERT(false);
-}
+void NotCalledClosure(void* arg, grpc_error* error) { GPR_ASSERT(false); }
-static void test_slice_buffer_stream_basic(void) {
- gpr_log(GPR_DEBUG, "test_slice_buffer_stream_basic");
+TEST(SliceBufferByteStream, Basic) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer.
grpc_slice_buffer buffer;
@@ -49,28 +51,26 @@ static void test_slice_buffer_stream_basic(void) {
grpc_slice_buffer_add(&buffer, input[i]);
}
// Create byte stream.
- grpc_slice_buffer_stream stream;
- grpc_slice_buffer_stream_init(&stream, &buffer, 0);
- GPR_ASSERT(stream.base.length == 6);
+ SliceBufferByteStream stream(&buffer, 0);
+ grpc_slice_buffer_destroy_internal(&buffer);
+ EXPECT_EQ(6U, stream.length());
grpc_closure closure;
- GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
+ GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
- // Read each slice. Note that next() always returns synchronously.
+ // Read each slice. Note that Next() always returns synchronously.
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
- GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
+ ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
grpc_slice output;
- grpc_error* error = grpc_byte_stream_pull(&stream.base, &output);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_error* error = stream.Pull(&output);
+ EXPECT_TRUE(error == GRPC_ERROR_NONE);
+ EXPECT_TRUE(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(output);
}
// Clean up.
- grpc_byte_stream_destroy(&stream.base);
- grpc_slice_buffer_destroy_internal(&buffer);
+ stream.Orphan();
}
-static void test_slice_buffer_stream_shutdown(void) {
- gpr_log(GPR_DEBUG, "test_slice_buffer_stream_shutdown");
+TEST(SliceBufferByteStream, Shutdown) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer.
grpc_slice_buffer buffer;
@@ -83,40 +83,38 @@ static void test_slice_buffer_stream_shutdown(void) {
grpc_slice_buffer_add(&buffer, input[i]);
}
// Create byte stream.
- grpc_slice_buffer_stream stream;
- grpc_slice_buffer_stream_init(&stream, &buffer, 0);
- GPR_ASSERT(stream.base.length == 6);
+ SliceBufferByteStream stream(&buffer, 0);
+ grpc_slice_buffer_destroy_internal(&buffer);
+ EXPECT_EQ(6U, stream.length());
grpc_closure closure;
- GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
+ GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
// Read the first slice.
- GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
+ ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
grpc_slice output;
- grpc_error* error = grpc_byte_stream_pull(&stream.base, &output);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_error* error = stream.Pull(&output);
+ EXPECT_TRUE(error == GRPC_ERROR_NONE);
+ EXPECT_TRUE(grpc_slice_eq(input[0], output));
grpc_slice_unref_internal(output);
// Now shutdown.
grpc_error* shutdown_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
- grpc_byte_stream_shutdown(&stream.base, GRPC_ERROR_REF(shutdown_error));
+ stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
// After shutdown, the next pull() should return the error.
- GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
- error = grpc_byte_stream_pull(&stream.base, &output);
- GPR_ASSERT(error == shutdown_error);
+ ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
+ error = stream.Pull(&output);
+ EXPECT_TRUE(error == shutdown_error);
GRPC_ERROR_UNREF(error);
GRPC_ERROR_UNREF(shutdown_error);
// Clean up.
- grpc_byte_stream_destroy(&stream.base);
- grpc_slice_buffer_destroy_internal(&buffer);
+ stream.Orphan();
}
//
-// grpc_caching_byte_stream tests
+// CachingByteStream tests
//
-static void test_caching_byte_stream_basic(void) {
- gpr_log(GPR_DEBUG, "test_caching_byte_stream_basic");
+TEST(CachingByteStream, Basic) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer byte stream.
grpc_slice_buffer buffer;
@@ -128,34 +126,30 @@ static void test_caching_byte_stream_basic(void) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
- grpc_slice_buffer_stream underlying_stream;
- grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ SliceBufferByteStream underlying_stream(&buffer, 0);
+ grpc_slice_buffer_destroy_internal(&buffer);
// Create cache and caching stream.
- grpc_byte_stream_cache cache;
- grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
- grpc_caching_byte_stream stream;
- grpc_caching_byte_stream_init(&stream, &cache);
+ ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
+ ByteStreamCache::CachingByteStream stream(&cache);
grpc_closure closure;
- GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
+ GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
// Read each slice. Note that next() always returns synchronously,
// because the underlying byte stream always does.
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
- GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
+ ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
grpc_slice output;
- grpc_error* error = grpc_byte_stream_pull(&stream.base, &output);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_error* error = stream.Pull(&output);
+ EXPECT_TRUE(error == GRPC_ERROR_NONE);
+ EXPECT_TRUE(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(output);
}
// Clean up.
- grpc_byte_stream_destroy(&stream.base);
- grpc_byte_stream_cache_destroy(&cache);
- grpc_slice_buffer_destroy_internal(&buffer);
+ stream.Orphan();
+ cache.Destroy();
}
-static void test_caching_byte_stream_reset(void) {
- gpr_log(GPR_DEBUG, "test_caching_byte_stream_reset");
+TEST(CachingByteStream, Reset) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer byte stream.
grpc_slice_buffer buffer;
@@ -167,41 +161,37 @@ static void test_caching_byte_stream_reset(void) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
- grpc_slice_buffer_stream underlying_stream;
- grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ SliceBufferByteStream underlying_stream(&buffer, 0);
+ grpc_slice_buffer_destroy_internal(&buffer);
// Create cache and caching stream.
- grpc_byte_stream_cache cache;
- grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
- grpc_caching_byte_stream stream;
- grpc_caching_byte_stream_init(&stream, &cache);
+ ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
+ ByteStreamCache::CachingByteStream stream(&cache);
grpc_closure closure;
- GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
+ GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
// Read one slice.
- GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
+ ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
grpc_slice output;
- grpc_error* error = grpc_byte_stream_pull(&stream.base, &output);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_error* error = stream.Pull(&output);
+ EXPECT_TRUE(error == GRPC_ERROR_NONE);
+ EXPECT_TRUE(grpc_slice_eq(input[0], output));
grpc_slice_unref_internal(output);
// Reset the caching stream. The reads should start over from the
// first slice.
- grpc_caching_byte_stream_reset(&stream);
+ stream.Reset();
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
- GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
- error = grpc_byte_stream_pull(&stream.base, &output);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(grpc_slice_eq(input[i], output));
+ ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
+ error = stream.Pull(&output);
+ EXPECT_TRUE(error == GRPC_ERROR_NONE);
+ EXPECT_TRUE(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(output);
}
// Clean up.
- grpc_byte_stream_destroy(&stream.base);
- grpc_byte_stream_cache_destroy(&cache);
- grpc_slice_buffer_destroy_internal(&buffer);
+ stream.Orphan();
+ cache.Destroy();
}
-static void test_caching_byte_stream_shared_cache(void) {
- gpr_log(GPR_DEBUG, "test_caching_byte_stream_shared_cache");
+TEST(CachingByteStream, SharedCache) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer byte stream.
grpc_slice_buffer buffer;
@@ -213,54 +203,50 @@ static void test_caching_byte_stream_shared_cache(void) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
- grpc_slice_buffer_stream underlying_stream;
- grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ SliceBufferByteStream underlying_stream(&buffer, 0);
+ grpc_slice_buffer_destroy_internal(&buffer);
// Create cache and two caching streams.
- grpc_byte_stream_cache cache;
- grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
- grpc_caching_byte_stream stream1;
- grpc_caching_byte_stream_init(&stream1, &cache);
- grpc_caching_byte_stream stream2;
- grpc_caching_byte_stream_init(&stream2, &cache);
+ ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
+ ByteStreamCache::CachingByteStream stream1(&cache);
+ ByteStreamCache::CachingByteStream stream2(&cache);
grpc_closure closure;
- GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
+ GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
// Read one slice from stream1.
- GPR_ASSERT(grpc_byte_stream_next(&stream1.base, ~(size_t)0, &closure));
+ EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
grpc_slice output;
- grpc_error* error = grpc_byte_stream_pull(&stream1.base, &output);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_error* error = stream1.Pull(&output);
+ EXPECT_TRUE(error == GRPC_ERROR_NONE);
+ EXPECT_TRUE(grpc_slice_eq(input[0], output));
grpc_slice_unref_internal(output);
// Read all slices from stream2.
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
- GPR_ASSERT(grpc_byte_stream_next(&stream2.base, ~(size_t)0, &closure));
- error = grpc_byte_stream_pull(&stream2.base, &output);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(grpc_slice_eq(input[i], output));
+ EXPECT_TRUE(stream2.Next(~(size_t)0, &closure));
+ error = stream2.Pull(&output);
+ EXPECT_TRUE(error == GRPC_ERROR_NONE);
+ EXPECT_TRUE(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(output);
}
// Now read the second slice from stream1.
- GPR_ASSERT(grpc_byte_stream_next(&stream1.base, ~(size_t)0, &closure));
- error = grpc_byte_stream_pull(&stream1.base, &output);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- GPR_ASSERT(grpc_slice_eq(input[1], output));
+ EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
+ error = stream1.Pull(&output);
+ EXPECT_TRUE(error == GRPC_ERROR_NONE);
+ EXPECT_TRUE(grpc_slice_eq(input[1], output));
grpc_slice_unref_internal(output);
// Clean up.
- grpc_byte_stream_destroy(&stream1.base);
- grpc_byte_stream_destroy(&stream2.base);
- grpc_byte_stream_cache_destroy(&cache);
- grpc_slice_buffer_destroy_internal(&buffer);
+ stream1.Orphan();
+ stream2.Orphan();
+ cache.Destroy();
}
+} // namespace
+} // namespace grpc_core
+
int main(int argc, char** argv) {
grpc_init();
grpc_test_init(argc, argv);
- test_slice_buffer_stream_basic();
- test_slice_buffer_stream_shutdown();
- test_caching_byte_stream_basic();
- test_caching_byte_stream_reset();
- test_caching_byte_stream_shared_cache();
+ ::testing::InitGoogleTest(&argc, argv);
+ int retval = RUN_ALL_TESTS();
grpc_shutdown();
- return 0;
+ return retval;
}
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 259bef4fd1..d00c79b610 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -398,13 +398,13 @@ static void BM_TransportStreamSend(benchmark::State& state) {
memset(&op, 0, sizeof(op));
op.payload = &op_payload;
};
- grpc_slice_buffer_stream send_stream;
- grpc_slice_buffer send_buffer;
- grpc_slice_buffer_init(&send_buffer);
- grpc_slice_buffer_add(&send_buffer, gpr_slice_malloc(state.range(0)));
- memset(GRPC_SLICE_START_PTR(send_buffer.slices[0]), 0,
- GRPC_SLICE_LENGTH(send_buffer.slices[0]));
-
+ // Create the send_message payload slice.
+ // Note: We use grpc_slice_malloc_large() instead of grpc_slice_malloc()
+ // to force the slice to be refcounted, so that it remains alive when it
+ // is unreffed after each send_message op.
+ grpc_slice send_slice = grpc_slice_malloc_large(state.range(0));
+ memset(GRPC_SLICE_START_PTR(send_slice), 0, GRPC_SLICE_LENGTH(send_slice));
+ grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> send_stream;
grpc_metadata_batch b;
grpc_metadata_batch_init(&b);
b.deadline = GRPC_MILLIS_INF_FUTURE;
@@ -424,14 +424,18 @@ static void BM_TransportStreamSend(benchmark::State& state) {
gpr_event_set(bm_done, (void*)1);
return;
}
+ grpc_slice_buffer send_buffer;
+ grpc_slice_buffer_init(&send_buffer);
+ grpc_slice_buffer_add(&send_buffer, grpc_slice_ref(send_slice));
+ send_stream.Init(&send_buffer, 0);
+ grpc_slice_buffer_destroy(&send_buffer);
// force outgoing window to be yuge
s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
- grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
reset_op();
op.on_complete = c.get();
op.send_message = true;
- op.payload->send_message.send_message = &send_stream.base;
+ op.payload->send_message.send_message.reset(send_stream.get());
s->Op(&op);
});
@@ -454,7 +458,7 @@ static void BM_TransportStreamSend(benchmark::State& state) {
s.reset();
track_counters.Finish(state);
grpc_metadata_batch_destroy(&b);
- grpc_slice_buffer_destroy(&send_buffer);
+ grpc_slice_unref(send_slice);
}
BENCHMARK(BM_TransportStreamSend)->Range(0, 128 * 1024 * 1024);
@@ -524,7 +528,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
grpc_transport_stream_op_batch_payload op_payload;
memset(&op_payload, 0, sizeof(op_payload));
grpc_transport_stream_op_batch op;
- grpc_byte_stream* recv_stream;
+ grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_stream;
grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384);
auto reset_op = [&]() {
@@ -579,21 +583,20 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
drain = MakeClosure([&](grpc_error* error) {
do {
- if (received == recv_stream->length) {
- grpc_byte_stream_destroy(recv_stream);
+ if (received == recv_stream->length()) {
+ recv_stream.reset();
GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE);
return;
}
- } while (grpc_byte_stream_next(recv_stream, recv_stream->length - received,
- drain_continue.get()) &&
- GRPC_ERROR_NONE ==
- grpc_byte_stream_pull(recv_stream, &recv_slice) &&
+ } while (recv_stream->Next(recv_stream->length() - received,
+ drain_continue.get()) &&
+ GRPC_ERROR_NONE == recv_stream->Pull(&recv_slice) &&
(received += GRPC_SLICE_LENGTH(recv_slice),
grpc_slice_unref_internal(recv_slice), true));
});
drain_continue = MakeClosure([&](grpc_error* error) {
- grpc_byte_stream_pull(recv_stream, &recv_slice);
+ recv_stream->Pull(&recv_slice);
received += GRPC_SLICE_LENGTH(recv_slice);
grpc_slice_unref_internal(recv_slice);
GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE);