aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/transport
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2017-07-26 14:29:52 -0700
committerGravatar Mark D. Roth <roth@google.com>2017-07-26 14:29:52 -0700
commit5794061764976c4771f69a3193cd38b3cdba193e (patch)
tree49aa5451f12f373ae744ab54be78ecdfa08991ff /test/core/transport
parent3064423b71ac076d4df6638fd54c2caf54641546 (diff)
Improvements to grpc_byte_stream API and handling.
- Add shutdown() method (to be used in forthcoming call combiner code). - Use a vtable instead of storing method pointers in each instance. - Check all callers of pull() to make sure that they are properly handling errors. - Clarify ownership rules and attempt to adhere to them. - Added a new grpc_caching_byte_stream implementation, which is used in http_client_filter to avoid having to read the whole send_message byte stream before passing control down the stack. (This class may also be used in the retry code I'm working on separately.) - As part of this, did a major rewrite of http_client_filter, which made the code more readable and fixed a number of potential bugs. Note that some of this code is hard to test right now, due to the fact that the send_message byte stream is always a slice_buffer stream, for which next() is always synchronous and no destruction is needed. However, some future work (specifically, my call combiner work and Craig's incremental send work) will start leveraging this.
Diffstat (limited to 'test/core/transport')
-rw-r--r--test/core/transport/BUILD12
-rw-r--r--test/core/transport/byte_stream_test.c279
2 files changed, 291 insertions, 0 deletions
diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD
index 8091cf9c63..040c0c35c2 100644
--- a/test/core/transport/BUILD
+++ b/test/core/transport/BUILD
@@ -36,6 +36,18 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "byte_stream_test",
+ srcs = ["byte_stream_test.c"],
+ language = "C",
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+)
+
+grpc_cc_test(
name = "connectivity_state_test",
srcs = ["connectivity_state_test.c"],
language = "C",
diff --git a/test/core/transport/byte_stream_test.c b/test/core/transport/byte_stream_test.c
new file mode 100644
index 0000000000..a0c5f961cf
--- /dev/null
+++ b/test/core/transport/byte_stream_test.c
@@ -0,0 +1,279 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/transport/byte_stream.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/slice/slice_internal.h"
+
+#include "test/core/util/test_config.h"
+
+//
+// grpc_slice_buffer_stream tests
+//
+
+static void not_called_closure(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ GPR_ASSERT(false);
+}
+
+static void test_slice_buffer_stream_basic(void) {
+ gpr_log(GPR_DEBUG, "test_slice_buffer_stream_basic");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ // Create byte stream.
+ grpc_slice_buffer_stream stream;
+ grpc_slice_buffer_stream_init(&stream, &buffer, 0);
+ GPR_ASSERT(stream.base.length == 6);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read each slice. Note that next() always returns synchronously.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_slice_buffer_stream_shutdown(void) {
+ gpr_log(GPR_DEBUG, "test_slice_buffer_stream_shutdown");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ // Create byte stream.
+ grpc_slice_buffer_stream stream;
+ grpc_slice_buffer_stream_init(&stream, &buffer, 0);
+ GPR_ASSERT(stream.base.length == 6);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read the first slice.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Now shutdown.
+ grpc_error *shutdown_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
+ grpc_byte_stream_shutdown(&exec_ctx, &stream.base,
+ GRPC_ERROR_REF(shutdown_error));
+ // After shutdown, the next pull() should return the error.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == shutdown_error);
+ GRPC_ERROR_UNREF(error);
+ GRPC_ERROR_UNREF(shutdown_error);
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+//
+// grpc_caching_byte_stream tests
+//
+
+static void test_caching_byte_stream_basic(void) {
+ gpr_log(GPR_DEBUG, "test_caching_byte_stream_basic");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer byte stream.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ grpc_slice_buffer_stream underlying_stream;
+ grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ // Create cache and caching stream.
+ grpc_byte_stream_cache cache;
+ grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+ grpc_caching_byte_stream stream;
+ grpc_caching_byte_stream_init(&stream, &cache);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read each slice. Note that next() always returns synchronously,
+ // because the underlying byte stream always does.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_caching_byte_stream_reset(void) {
+ gpr_log(GPR_DEBUG, "test_caching_byte_stream_reset");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer byte stream.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ grpc_slice_buffer_stream underlying_stream;
+ grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ // Create cache and caching stream.
+ grpc_byte_stream_cache cache;
+ grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+ grpc_caching_byte_stream stream;
+ grpc_caching_byte_stream_init(&stream, &cache);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read one slice.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Reset the caching stream. The reads should start over from the
+ // first slice.
+ grpc_caching_byte_stream_reset(&stream);
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream.base);
+ grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void test_caching_byte_stream_shared_cache(void) {
+ gpr_log(GPR_DEBUG, "test_caching_byte_stream_shared_cache");
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // Create and populate slice buffer byte stream.
+ grpc_slice_buffer buffer;
+ grpc_slice_buffer_init(&buffer);
+ grpc_slice input[] = {
+ grpc_slice_from_static_string("foo"),
+ grpc_slice_from_static_string("bar"),
+ };
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ grpc_slice_buffer_add(&buffer, input[i]);
+ }
+ grpc_slice_buffer_stream underlying_stream;
+ grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
+ // Create cache and two caching streams.
+ grpc_byte_stream_cache cache;
+ grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
+ grpc_caching_byte_stream stream1;
+ grpc_caching_byte_stream_init(&stream1, &cache);
+ grpc_caching_byte_stream stream2;
+ grpc_caching_byte_stream_init(&stream2, &cache);
+ grpc_closure closure;
+ GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
+ grpc_schedule_on_exec_ctx);
+ // Read one slice from stream1.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
+ grpc_slice output;
+ grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[0], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Read all slices from stream2.
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream2.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream2.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[i], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ }
+ // Now read the second slice from stream1.
+ GPR_ASSERT(
+ grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
+ error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_slice_eq(input[1], output));
+ grpc_slice_unref_internal(&exec_ctx, output);
+ // Clean up.
+ grpc_byte_stream_destroy(&exec_ctx, &stream1.base);
+ grpc_byte_stream_destroy(&exec_ctx, &stream2.base);
+ grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ test_slice_buffer_stream_basic();
+ test_slice_buffer_stream_shutdown();
+ test_caching_byte_stream_basic();
+ test_caching_byte_stream_reset();
+ test_caching_byte_stream_shared_cache();
+ return 0;
+}