aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/transport
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-08-29 12:16:56 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-08-29 12:16:56 -0700
commitd9b82bdecbf44cc7f9116be45a862457a727e87f (patch)
tree3d1894476ca48867e5ffe799301112f0735a8859 /test/core/transport
parent016ad385e776cd41c0021739435c7bceedd9555c (diff)
parent9811915ba3fa1ccdf44b6a70fe1b1dd4782cd508 (diff)
Merge github.com:grpc/grpc into grpc_millis
Diffstat (limited to 'test/core/transport')
-rw-r--r--test/core/transport/BUILD12
-rw-r--r--test/core/transport/bdp_estimator_test.c7
-rw-r--r--test/core/transport/byte_stream_test.c279
3 files changed, 295 insertions, 3 deletions
diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD
index 8091cf9c63..040c0c35c2 100644
--- a/test/core/transport/BUILD
+++ b/test/core/transport/BUILD
@@ -36,6 +36,18 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "byte_stream_test",
+ srcs = ["byte_stream_test.c"],
+ language = "C",
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+)
+
+grpc_cc_test(
name = "connectivity_state_test",
srcs = ["connectivity_state_test.c"],
language = "C",
diff --git a/test/core/transport/bdp_estimator_test.c b/test/core/transport/bdp_estimator_test.c
index e2612c7718..dda48f45b1 100644
--- a/test/core/transport/bdp_estimator_test.c
+++ b/test/core/transport/bdp_estimator_test.c
@@ -43,12 +43,13 @@ static void test_get_estimate_no_samples(void) {
static void add_samples(grpc_bdp_estimator *estimator, int64_t *samples,
size_t n) {
- GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, 1234567) == true);
+ grpc_bdp_estimator_add_incoming_bytes(estimator, 1234567);
+ GPR_ASSERT(grpc_bdp_estimator_need_ping(estimator) == true);
grpc_bdp_estimator_schedule_ping(estimator);
grpc_bdp_estimator_start_ping(estimator);
for (size_t i = 0; i < n; i++) {
- GPR_ASSERT(grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]) ==
- false);
+ grpc_bdp_estimator_add_incoming_bytes(estimator, samples[i]);
+ GPR_ASSERT(grpc_bdp_estimator_need_ping(estimator) == false);
}
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1, GPR_TIMESPAN)));
diff --git a/test/core/transport/byte_stream_test.c b/test/core/transport/byte_stream_test.c
new file mode 100644
index 0000000000..a0c5f961cf
--- /dev/null
+++ b/test/core/transport/byte_stream_test.c
@@ -0,0 +1,279 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/transport/byte_stream.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/slice/slice_internal.h"
+
+#include "test/core/util/test_config.h"
+
+//
+// grpc_slice_buffer_stream tests
+//
+
+static void not_called_closure(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ GPR_ASSERT(false);
+}
+
+static void test_slice_buffer_stream_basic(void) {
+ gpr_log(GPR_DEBUG, "test_slice_buffer_stream_basic");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ // Create byte stream.
+ grpc_slice_buffer_stream stream;
+ grpc_slice_buffer_stream_init(&stream, &buffer, 0);
+ GPR_ASSERT(stream.base.length == 6);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read each slice. Note that next() always returns synchronously.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_slice_buffer_stream_shutdown(void) {
+ gpr_log(GPR_DEBUG, "test_slice_buffer_stream_shutdown");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ // Create byte stream.
+ grpc_slice_buffer_stream stream;
+ grpc_slice_buffer_stream_init(&stream, &buffer, 0);
+ GPR_ASSERT(stream.base.length == 6);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read the first slice.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Now shutdown.
+ grpc_error *shutdown_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
+ grpc_byte_stream_shutdown(&exec_ctx, &stream.base,
+ GRPC_ERROR_REF(shutdown_error));
+ // After shutdown, the next pull() should return the error.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == shutdown_error);
+ GRPC_ERROR_UNREF(error);
+ GRPC_ERROR_UNREF(shutdown_error);
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+//
+// grpc_caching_byte_stream tests
+//
+
+static void test_caching_byte_stream_basic(void) {
+ gpr_log(GPR_DEBUG, "test_caching_byte_stream_basic");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer byte stream.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ grpc_slice_buffer_stream underlying_stream;
+ grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ // Create cache and caching stream.
+ grpc_byte_stream_cache cache;
+ grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+ grpc_caching_byte_stream stream;
+ grpc_caching_byte_stream_init(&stream, &cache);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read each slice. Note that next() always returns synchronously,
+ // because the underlying byte stream always does.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_caching_byte_stream_reset(void) {
+ gpr_log(GPR_DEBUG, "test_caching_byte_stream_reset");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer byte stream.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ grpc_slice_buffer_stream underlying_stream;
+ grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ // Create cache and caching stream.
+ grpc_byte_stream_cache cache;
+ grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+ grpc_caching_byte_stream stream;
+ grpc_caching_byte_stream_init(&stream, &cache);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read one slice.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Reset the caching stream. The reads should start over from the
+ // first slice.
+ grpc_caching_byte_stream_reset(&stream);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_caching_byte_stream_shared_cache(void) {
+ gpr_log(GPR_DEBUG, "test_caching_byte_stream_shared_cache");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer byte stream.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ grpc_slice_buffer_stream underlying_stream;
+ grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ // Create cache and two caching streams.
+ grpc_byte_stream_cache cache;
+ grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+ grpc_caching_byte_stream stream1;
+ grpc_caching_byte_stream_init(&stream1, &cache);
+ grpc_caching_byte_stream stream2;
+ grpc_caching_byte_stream_init(&stream2, &cache);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read one slice from stream1.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Read all slices from stream2.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream2.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream2.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Now read the second slice from stream1.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[1], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream1.base);
+ grpc_byte_stream_destroy(&exec_ctx, &stream2.base);
+ grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ test_slice_buffer_stream_basic();
+ test_slice_buffer_stream_shutdown();
+ test_caching_byte_stream_basic();
+ test_caching_byte_stream_reset();
+ test_caching_byte_stream_shared_cache();
+ return 0;
+}