diff options
Diffstat (limited to 'test/core/end2end')
-rw-r--r-- | test/core/end2end/end2end_nosec_tests.c | 24 | ||||
-rw-r--r-- | test/core/end2end/end2end_tests.c | 24 | ||||
-rw-r--r-- | test/core/end2end/fuzzers/hpack.dictionary | 10 | ||||
-rwxr-xr-x | test/core/end2end/gen_build_yaml.py | 4 | ||||
-rwxr-xr-x | test/core/end2end/generate_tests.bzl | 3 | ||||
-rw-r--r-- | test/core/end2end/tests/stream_compression_compressed_payload.c | 652 | ||||
-rw-r--r-- | test/core/end2end/tests/stream_compression_payload.c | 305 | ||||
-rw-r--r-- | test/core/end2end/tests/stream_compression_ping_pong_streaming.c | 291 |
8 files changed, 1311 insertions, 2 deletions
diff --git a/test/core/end2end/end2end_nosec_tests.c b/test/core/end2end/end2end_nosec_tests.c index 6a061a4e2d..3236feea56 100644 --- a/test/core/end2end/end2end_nosec_tests.c +++ b/test/core/end2end/end2end_nosec_tests.c @@ -130,6 +130,12 @@ extern void simple_metadata(grpc_end2end_test_config config); extern void simple_metadata_pre_init(void); extern void simple_request(grpc_end2end_test_config config); extern void simple_request_pre_init(void); +extern void stream_compression_compressed_payload(grpc_end2end_test_config config); +extern void stream_compression_compressed_payload_pre_init(void); +extern void stream_compression_payload(grpc_end2end_test_config config); +extern void stream_compression_payload_pre_init(void); +extern void stream_compression_ping_pong_streaming(grpc_end2end_test_config config); +extern void stream_compression_ping_pong_streaming_pre_init(void); extern void streaming_error_response(grpc_end2end_test_config config); extern void streaming_error_response_pre_init(void); extern void trailing_metadata(grpc_end2end_test_config config); @@ -195,6 +201,9 @@ void grpc_end2end_tests_pre_init(void) { simple_delayed_request_pre_init(); simple_metadata_pre_init(); simple_request_pre_init(); + stream_compression_compressed_payload_pre_init(); + stream_compression_payload_pre_init(); + stream_compression_ping_pong_streaming_pre_init(); streaming_error_response_pre_init(); trailing_metadata_pre_init(); workaround_cronet_compression_pre_init(); @@ -259,6 +268,9 @@ void grpc_end2end_tests(int argc, char **argv, simple_delayed_request(config); simple_metadata(config); simple_request(config); + stream_compression_compressed_payload(config); + stream_compression_payload(config); + stream_compression_ping_pong_streaming(config); streaming_error_response(config); trailing_metadata(config); workaround_cronet_compression(config); @@ -468,6 +480,18 @@ void grpc_end2end_tests(int argc, char **argv, simple_request(config); continue; } + if (0 == strcmp("stream_compression_compressed_payload", argv[i])) { + stream_compression_compressed_payload(config); + continue; + } + if (0 == strcmp("stream_compression_payload", argv[i])) { + stream_compression_payload(config); + continue; + } + if (0 == strcmp("stream_compression_ping_pong_streaming", argv[i])) { + stream_compression_ping_pong_streaming(config); + continue; + } if (0 == strcmp("streaming_error_response", argv[i])) { streaming_error_response(config); continue; diff --git a/test/core/end2end/end2end_tests.c b/test/core/end2end/end2end_tests.c index 3fc7c3fb6c..ca9443b642 100644 --- a/test/core/end2end/end2end_tests.c +++ b/test/core/end2end/end2end_tests.c @@ -132,6 +132,12 @@ extern void simple_metadata(grpc_end2end_test_config config); extern void simple_metadata_pre_init(void); extern void simple_request(grpc_end2end_test_config config); extern void simple_request_pre_init(void); +extern void stream_compression_compressed_payload(grpc_end2end_test_config config); +extern void stream_compression_compressed_payload_pre_init(void); +extern void stream_compression_payload(grpc_end2end_test_config config); +extern void stream_compression_payload_pre_init(void); +extern void stream_compression_ping_pong_streaming(grpc_end2end_test_config config); +extern void stream_compression_ping_pong_streaming_pre_init(void); extern void streaming_error_response(grpc_end2end_test_config config); extern void streaming_error_response_pre_init(void); extern void trailing_metadata(grpc_end2end_test_config config); @@ -198,6 +204,9 @@ void grpc_end2end_tests_pre_init(void) { simple_delayed_request_pre_init(); simple_metadata_pre_init(); simple_request_pre_init(); + stream_compression_compressed_payload_pre_init(); + stream_compression_payload_pre_init(); + stream_compression_ping_pong_streaming_pre_init(); streaming_error_response_pre_init(); trailing_metadata_pre_init(); workaround_cronet_compression_pre_init(); @@ -263,6 +272,9 @@ void grpc_end2end_tests(int argc, char **argv, simple_delayed_request(config); simple_metadata(config); simple_request(config); + stream_compression_compressed_payload(config); + stream_compression_payload(config); + stream_compression_ping_pong_streaming(config); streaming_error_response(config); trailing_metadata(config); workaround_cronet_compression(config); @@ -476,6 +488,18 @@ void grpc_end2end_tests(int argc, char **argv, simple_request(config); continue; } + if (0 == strcmp("stream_compression_compressed_payload", argv[i])) { + stream_compression_compressed_payload(config); + continue; + } + if (0 == strcmp("stream_compression_payload", argv[i])) { + stream_compression_payload(config); + continue; + } + if (0 == strcmp("stream_compression_ping_pong_streaming", argv[i])) { + stream_compression_ping_pong_streaming(config); + continue; + } if (0 == strcmp("streaming_error_response", argv[i])) { streaming_error_response(config); continue; diff --git a/test/core/end2end/fuzzers/hpack.dictionary b/test/core/end2end/fuzzers/hpack.dictionary index 2bb9de34c5..7c77512aa9 100644 --- a/test/core/end2end/fuzzers/hpack.dictionary +++ b/test/core/end2end/fuzzers/hpack.dictionary @@ -14,7 +14,10 @@ "\x0Dgrpc-tags-bin" "\x0Egrpc-trace-bin" "\x0Ccontent-type" +"\x10content-encoding" +"\x0Faccept-encoding" "\x1Egrpc-internal-encoding-request" +"%grpc-internal-stream-encoding-request" "\x0Auser-agent" "\x04host" "\x08lb-token" @@ -49,7 +52,6 @@ "\x03400" "\x03500" "\x0Eaccept-charset" -"\x0Faccept-encoding" "\x0Dgzip, deflate" "\x0Faccept-language" "\x0Daccept-ranges" @@ -60,7 +62,6 @@ "\x0Dauthorization" "\x0Dcache-control" "\x13content-disposition" -"\x10content-encoding" "\x10content-language" "\x0Econtent-length" "\x10content-location" @@ -134,6 +135,8 @@ "\x00\x0Dauthorization\x00" "\x00\x0Dcache-control\x00" "\x00\x13content-disposition\x00" +"\x00\x10content-encoding\x08identity" +"\x00\x10content-encoding\x04gzip" "\x00\x10content-encoding\x00" "\x00\x10content-language\x00" "\x00\x0Econtent-length\x00" @@ -179,3 +182,6 @@ "\x00\x14grpc-accept-encoding\x0Didentity,gzip" "\x00\x14grpc-accept-encoding\x0Cdeflate,gzip" "\x00\x14grpc-accept-encoding\x15identity,deflate,gzip" +"\x00\x0Faccept-encoding\x08identity" +"\x00\x0Faccept-encoding\x04gzip" +"\x00\x0Faccept-encoding\x0Didentity,gzip" diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py index 18bae63a8a..33fd97f3bd 100755 --- a/test/core/end2end/gen_build_yaml.py +++ b/test/core/end2end/gen_build_yaml.py @@ -137,6 +137,10 @@ END2END_TESTS = { 'shutdown_finishes_calls': default_test_options._replace(cpu_cost=LOWCPU), 'shutdown_finishes_tags': default_test_options._replace(cpu_cost=LOWCPU), 'simple_cacheable_request': default_test_options._replace(cpu_cost=LOWCPU), + 'stream_compression_compressed_payload': default_test_options._replace(proxyable=False, + exclude_inproc=True), + 'stream_compression_payload': default_test_options._replace(exclude_inproc=True), + 'stream_compression_ping_pong_streaming': default_test_options._replace(exclude_inproc=True), 'simple_delayed_request': connectivity_test_options, 'simple_metadata': default_test_options, 'simple_request': default_test_options, diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl index 6d1917c0ff..9bbba26108 100755 --- a/test/core/end2end/generate_tests.bzl +++ b/test/core/end2end/generate_tests.bzl @@ -138,6 +138,9 @@ END2END_TESTS = { 'simple_metadata': test_options(), 'simple_request': test_options(), 'streaming_error_response': test_options(), + 'stream_compression_compressed_payload': test_options(proxyable=False, exclude_inproc=True), + 'stream_compression_payload': test_options(exclude_inproc=True), + 'stream_compression_ping_pong_streaming': test_options(exclude_inproc=True), 'trailing_metadata': test_options(), 'authority_not_supported': test_options(), 'filter_latency': test_options(), diff --git a/test/core/end2end/tests/stream_compression_compressed_payload.c b/test/core/end2end/tests/stream_compression_compressed_payload.c new file mode 100644 index 0000000000..11e7999a1c --- /dev/null +++ b/test/core/end2end/tests/stream_compression_compressed_payload.c @@ -0,0 +1,652 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/byte_buffer.h> +#include <grpc/byte_buffer_reader.h> +#include <grpc/compression.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/surface/call.h" +#include "src/core/lib/surface/call_test_only.h" +#include "src/core/lib/transport/static_metadata.h" +#include "test/core/end2end/cq_verifier.h" + +static void *tag(intptr_t t) { return (void *)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char *test_name, + grpc_channel_args *client_args, + grpc_channel_args *server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), NULL); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +static void request_for_disabled_algorithm( + grpc_end2end_test_config config, const char *test_name, + uint32_t send_flags_bitmask, + grpc_stream_compression_algorithm algorithm_to_disable, + grpc_stream_compression_algorithm requested_client_compression_algorithm, + grpc_status_code expected_error, grpc_metadata *client_metadata) { + grpc_call *c; + grpc_call *s; + grpc_slice request_payload_slice; + grpc_byte_buffer *request_payload; + grpc_channel_args *client_args; + grpc_channel_args *server_args; + grpc_end2end_test_fixture f; + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_byte_buffer *request_payload_recv = NULL; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + int was_cancelled = 2; + cq_verifier *cqv; + char str[1024]; + + memset(str, 'x', 1023); + str[1023] = '\0'; + request_payload_slice = grpc_slice_from_copied_string(str); + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + + client_args = grpc_channel_args_set_stream_compression_algorithm( + NULL, requested_client_compression_algorithm); + server_args = grpc_channel_args_set_stream_compression_algorithm( + NULL, GRPC_STREAM_COMPRESS_NONE); + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + server_args = grpc_channel_args_stream_compression_algorithm_set_state( + &exec_ctx, &server_args, algorithm_to_disable, false); + grpc_exec_ctx_finish(&exec_ctx); + } + + f = begin_test(config, test_name, client_args, server_args); + cqv = cq_verifier_create(f.cq); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call( + f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/foo"), + get_host_override_slice("foo.test.google.fr:1234", config), deadline, + NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + if (client_metadata != NULL) { + op->data.send_initial_metadata.count = 1; + op->data.send_initial_metadata.metadata = client_metadata; + } else { + op->data.send_initial_metadata.count = 0; + } + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op->flags = send_flags_bitmask; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), true); + cq_verify(cqv); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(102), false); + + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(103), true); + CQ_EXPECT_COMPLETION(cqv, tag(1), true); + cq_verify(cqv); + + /* call was cancelled (closed) ... */ + GPR_ASSERT(was_cancelled != 0); + /* with a certain error */ + GPR_ASSERT(status == expected_error); + + char *algo_name = NULL; + GPR_ASSERT( + grpc_stream_compression_algorithm_name(algorithm_to_disable, &algo_name)); + char *expected_details = NULL; + gpr_asprintf(&expected_details, + "Stream compression algorithm '%s' is disabled.", algo_name); + /* and we expect a specific reason for it */ + GPR_ASSERT(0 == grpc_slice_str_cmp(details, expected_details)); + gpr_free(expected_details); + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); + validate_host_override_string("foo.test.google.fr:1234", call_details.host, + config); + + grpc_slice_unref(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + + grpc_call_unref(c); + grpc_call_unref(s); + + cq_verifier_destroy(cqv); + + grpc_slice_unref(request_payload_slice); + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(request_payload_recv); + + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); + } + + end_test(&f); + config.tear_down_data(&f); +} + +static void request_with_payload_template( + grpc_end2end_test_config config, const char *test_name, + uint32_t client_send_flags_bitmask, + grpc_stream_compression_algorithm + default_client_channel_compression_algorithm, + grpc_stream_compression_algorithm + default_server_channel_compression_algorithm, + grpc_stream_compression_algorithm expected_client_compression_algorithm, + grpc_stream_compression_algorithm expected_server_compression_algorithm, + grpc_metadata *client_init_metadata, bool set_server_level, + grpc_stream_compression_level server_compression_level, + bool send_message_before_initial_metadata, + bool set_default_server_message_compression_algorithm, + grpc_compression_algorithm default_server_message_compression_algorithm) { + grpc_call *c; + grpc_call *s; + grpc_slice request_payload_slice; + grpc_byte_buffer *request_payload = NULL; + grpc_channel_args *client_args; + grpc_channel_args *server_args; + grpc_end2end_test_fixture f; + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_byte_buffer *request_payload_recv = NULL; + grpc_byte_buffer *response_payload; + grpc_byte_buffer *response_payload_recv; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + int was_cancelled = 2; + cq_verifier *cqv; + char request_str[1024]; + char response_str[1024]; + + memset(request_str, 'x', 1023); + request_str[1023] = '\0'; + + memset(response_str, 'y', 1023); + response_str[1023] = '\0'; + + request_payload_slice = grpc_slice_from_copied_string(request_str); + grpc_slice response_payload_slice = + grpc_slice_from_copied_string(response_str); + + client_args = grpc_channel_args_set_stream_compression_algorithm( + NULL, default_client_channel_compression_algorithm); + if (set_default_server_message_compression_algorithm) { + server_args = grpc_channel_args_set_compression_algorithm( + NULL, default_server_message_compression_algorithm); + } else { + server_args = grpc_channel_args_set_stream_compression_algorithm( + NULL, default_server_channel_compression_algorithm); + } + + f = begin_test(config, test_name, client_args, server_args); + cqv = cq_verifier_create(f.cq); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call( + f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/foo"), + get_host_override_slice("foo.test.google.fr:1234", config), deadline, + NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + if (send_message_before_initial_metadata) { + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op->flags = client_send_flags_bitmask; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(2), true); + } + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + if (client_init_metadata != NULL) { + op->data.send_initial_metadata.count = 1; + op->data.send_initial_metadata.metadata = client_init_metadata; + } else { + op->data.send_initial_metadata.count = 0; + } + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(100)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(100), true); + cq_verify(cqv); + + GPR_ASSERT(GPR_BITCOUNT(grpc_call_test_only_get_encodings_accepted_by_peer( + s)) == GRPC_COMPRESS_ALGORITHMS_COUNT); + GPR_ASSERT(GPR_BITGET(grpc_call_test_only_get_encodings_accepted_by_peer(s), + GRPC_COMPRESS_NONE) != 0); + GPR_ASSERT(GPR_BITGET(grpc_call_test_only_get_encodings_accepted_by_peer(s), + GRPC_COMPRESS_DEFLATE) != 0); + GPR_ASSERT(GPR_BITGET(grpc_call_test_only_get_encodings_accepted_by_peer(s), + GRPC_COMPRESS_GZIP) != 0); + GPR_ASSERT( + GPR_BITCOUNT(grpc_call_test_only_get_stream_encodings_accepted_by_peer( + s)) == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT); + GPR_ASSERT( + GPR_BITGET(grpc_call_test_only_get_stream_encodings_accepted_by_peer(s), + GRPC_STREAM_COMPRESS_NONE) != 0); + GPR_ASSERT( + GPR_BITGET(grpc_call_test_only_get_stream_encodings_accepted_by_peer(s), + GRPC_STREAM_COMPRESS_GZIP) != 0); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + if (set_server_level) { + op->data.send_initial_metadata.maybe_stream_compression_level.is_set = true; + op->data.send_initial_metadata.maybe_stream_compression_level.level = + server_compression_level; + } + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + for (int i = 0; i < 2; i++) { + response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); + + if (i > 0 || !send_message_before_initial_metadata) { + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op->flags = client_send_flags_bitmask; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(2), 1); + } + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(102), 1); + cq_verify(cqv); + + GPR_ASSERT(request_payload_recv->type == GRPC_BB_RAW); + GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, request_str)); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(103), 1); + CQ_EXPECT_COMPLETION(cqv, tag(3), 1); + cq_verify(cqv); + + GPR_ASSERT(response_payload_recv->type == GRPC_BB_RAW); + GPR_ASSERT(byte_buffer_eq_string(response_payload_recv, response_str)); + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); + grpc_byte_buffer_destroy(response_payload_recv); + } + + grpc_slice_unref(request_payload_slice); + grpc_slice_unref(response_payload_slice); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(4), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + op->data.send_status_from_server.status_details = &status_details; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(1), 1); + CQ_EXPECT_COMPLETION(cqv, tag(4), 1); + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + CQ_EXPECT_COMPLETION(cqv, tag(104), 1); + cq_verify(cqv); + + GPR_ASSERT(status == GRPC_STATUS_OK); + GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); + validate_host_override_string("foo.test.google.fr:1234", call_details.host, + config); + GPR_ASSERT(was_cancelled == 0); + + grpc_slice_unref(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + + grpc_call_unref(c); + grpc_call_unref(s); + + cq_verifier_destroy(cqv); + + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); + } + + end_test(&f); + config.tear_down_data(&f); +} + +static void test_invoke_request_with_compressed_payload( + grpc_end2end_test_config config) { + request_with_payload_template( + config, "test_invoke_request_with_compressed_payload", 0, + GRPC_STREAM_COMPRESS_GZIP, GRPC_STREAM_COMPRESS_GZIP, + GRPC_STREAM_COMPRESS_GZIP, GRPC_STREAM_COMPRESS_GZIP, NULL, + false, /* ignored */ + GRPC_STREAM_COMPRESS_LEVEL_NONE, false, false, GRPC_COMPRESS_NONE); +} + +static void test_invoke_request_with_send_message_before_initial_metadata( + grpc_end2end_test_config config) { + request_with_payload_template( + config, "test_invoke_request_with_send_message_before_initial_metadata", + 0, GRPC_STREAM_COMPRESS_GZIP, GRPC_STREAM_COMPRESS_GZIP, + GRPC_STREAM_COMPRESS_GZIP, GRPC_STREAM_COMPRESS_GZIP, NULL, + false, /* ignored */ + GRPC_STREAM_COMPRESS_LEVEL_NONE, true, false, GRPC_COMPRESS_NONE); +} + +static void test_invoke_request_with_server_level( + grpc_end2end_test_config config) { + request_with_payload_template( + config, "test_invoke_request_with_server_level", 0, + GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_NONE, + GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_GZIP, + /* ignored */ NULL, true, GRPC_STREAM_COMPRESS_LEVEL_HIGH, false, false, + GRPC_COMPRESS_NONE); +} + +static void test_invoke_request_with_compressed_payload_md_override( + grpc_end2end_test_config config) { + grpc_metadata gzip_compression_override; + grpc_metadata identity_compression_override; + + gzip_compression_override.key = + GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST; + gzip_compression_override.value = grpc_slice_from_static_string("gzip"); + memset(&gzip_compression_override.internal_data, 0, + sizeof(gzip_compression_override.internal_data)); + + identity_compression_override.key = + GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST; + identity_compression_override.value = + grpc_slice_from_static_string("identity"); + memset(&identity_compression_override.internal_data, 0, + sizeof(identity_compression_override.internal_data)); + + /* Channel default NONE (aka IDENTITY), call override to stream GZIP */ + request_with_payload_template( + config, "test_invoke_request_with_compressed_payload_md_override_1", 0, + GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_NONE, + GRPC_STREAM_COMPRESS_GZIP, GRPC_STREAM_COMPRESS_NONE, + &gzip_compression_override, false, + /*ignored*/ GRPC_STREAM_COMPRESS_LEVEL_NONE, false, false, + GRPC_COMPRESS_NONE); + + /* Channel default stream GZIP, call override to NONE (aka IDENTITY) */ + request_with_payload_template( + config, "test_invoke_request_with_compressed_payload_md_override_3", 0, + GRPC_STREAM_COMPRESS_GZIP, GRPC_STREAM_COMPRESS_NONE, + GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_NONE, + &identity_compression_override, false, + /*ignored*/ GRPC_STREAM_COMPRESS_LEVEL_NONE, false, false, + GRPC_COMPRESS_NONE); +} + +static void test_invoke_request_with_disabled_algorithm( + grpc_end2end_test_config config) { + request_for_disabled_algorithm( + config, "test_invoke_request_with_disabled_algorithm", 0, + GRPC_STREAM_COMPRESS_GZIP, GRPC_STREAM_COMPRESS_GZIP, + GRPC_STATUS_UNIMPLEMENTED, NULL); +} + +static void test_stream_compression_override_message_compression( + grpc_end2end_test_config config) { + grpc_stream_compression_level level = GRPC_STREAM_COMPRESS_LEVEL_MED; + request_with_payload_template( + config, "test_stream_compression_override_message_compression", 0, + GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_NONE, + GRPC_STREAM_COMPRESS_NONE, + grpc_stream_compression_algorithm_for_level( + level, (1u << GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) - 1), + /* ignored */ NULL, true, level, false, true, GRPC_COMPRESS_GZIP); +} + +void stream_compression_compressed_payload(grpc_end2end_test_config config) { + test_invoke_request_with_compressed_payload(config); + test_invoke_request_with_send_message_before_initial_metadata(config); + test_invoke_request_with_server_level(config); + test_invoke_request_with_compressed_payload_md_override(config); + test_invoke_request_with_disabled_algorithm(config); + test_stream_compression_override_message_compression(config); +} + +void stream_compression_compressed_payload_pre_init(void) {} diff --git a/test/core/end2end/tests/stream_compression_payload.c b/test/core/end2end/tests/stream_compression_payload.c new file mode 100644 index 0000000000..5135df81ed --- /dev/null +++ b/test/core/end2end/tests/stream_compression_payload.c @@ -0,0 +1,305 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/byte_buffer.h> +#include <grpc/compression.h> +#include <grpc/compression.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/surface/call.h" +#include "test/core/end2end/cq_verifier.h" + +static void *tag(intptr_t t) { return (void *)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char *test_name, + grpc_channel_args *client_args, + grpc_channel_args *server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), NULL); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +/* Creates and returns a grpc_slice containing random alphanumeric characters. + */ +static grpc_slice generate_random_slice() { + size_t i; + static const char chars[] = "abcdefghijklmnopqrstuvwxyz1234567890"; + char *output; + const size_t output_size = 1024 * 1024; + output = gpr_malloc(output_size); + for (i = 0; i < output_size - 1; ++i) { + output[i] = chars[rand() % (int)(sizeof(chars) - 1)]; + } + output[output_size - 1] = '\0'; + grpc_slice out = grpc_slice_from_copied_string(output); + gpr_free(output); + return out; +} + +static void request_response_with_payload(grpc_end2end_test_config config, + grpc_end2end_test_fixture f) { + /* Create large request and response bodies. These are big enough to require + * multiple round trips to deliver to the peer, and their exact contents of + * will be verified on completion. */ + grpc_slice request_payload_slice = generate_random_slice(); + grpc_slice response_payload_slice = generate_random_slice(); + + grpc_call *c; + grpc_call *s; + grpc_byte_buffer *request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_byte_buffer *response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + cq_verifier *cqv = cq_verifier_create(f.cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_byte_buffer *request_payload_recv = NULL; + grpc_byte_buffer *response_payload_recv = NULL; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + int was_cancelled = 2; + + gpr_timespec deadline = n_seconds_from_now(60); + c = grpc_channel_create_call( + f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/foo"), + get_host_override_slice("foo.test.google.fr:1234", config), deadline, + NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(102), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + op->data.send_status_from_server.status_details = &status_details; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(103), 1); + CQ_EXPECT_COMPLETION(cqv, tag(1), 1); + cq_verify(cqv); + + GPR_ASSERT(status == GRPC_STATUS_OK); + GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz")); + GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo")); + validate_host_override_string("foo.test.google.fr:1234", call_details.host, + config); + GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice)); + GPR_ASSERT( + byte_buffer_eq_slice(response_payload_recv, response_payload_slice)); + + grpc_slice_unref(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + + grpc_call_unref(c); + grpc_call_unref(s); + + cq_verifier_destroy(cqv); + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); + grpc_byte_buffer_destroy(response_payload_recv); +} + +/* Client sends a request with payload, server reads then returns a response + payload and status. */ +static void test_invoke_request_response_with_payload( + grpc_end2end_test_config config) { + grpc_channel_args *client_args = + grpc_channel_args_set_stream_compression_algorithm( + NULL, GRPC_STREAM_COMPRESS_GZIP); + grpc_channel_args *server_args = + grpc_channel_args_set_stream_compression_algorithm( + NULL, GRPC_STREAM_COMPRESS_GZIP); + grpc_end2end_test_fixture f = + begin_test(config, "test_invoke_request_response_with_payload", + client_args, server_args); + request_response_with_payload(config, f); + end_test(&f); + config.tear_down_data(&f); + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +static void test_invoke_10_request_response_with_payload( + grpc_end2end_test_config config) { + int i; + grpc_end2end_test_fixture f = begin_test( + config, "test_invoke_10_request_response_with_payload", NULL, NULL); + for (i = 0; i < 10; i++) { + request_response_with_payload(config, f); + } + end_test(&f); + config.tear_down_data(&f); +} + +void stream_compression_payload(grpc_end2end_test_config config) { + test_invoke_request_response_with_payload(config); + test_invoke_10_request_response_with_payload(config); +} + +void stream_compression_payload_pre_init(void) {} diff --git a/test/core/end2end/tests/stream_compression_ping_pong_streaming.c b/test/core/end2end/tests/stream_compression_ping_pong_streaming.c new file mode 100644 index 0000000000..4c1a34cc64 --- /dev/null +++ b/test/core/end2end/tests/stream_compression_ping_pong_streaming.c @@ -0,0 +1,291 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/byte_buffer.h> +#include <grpc/compression.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/surface/call.h" +#include "test/core/end2end/cq_verifier.h" + +static void *tag(intptr_t t) { return (void *)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char *test_name, + grpc_channel_args *client_args, + grpc_channel_args *server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_from_now(int n) { + return grpc_timeout_seconds_to_deadline(n); +} + +static gpr_timespec five_seconds_from_now(void) { + return n_seconds_from_now(5); +} + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_from_now(), NULL); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); +} + +/* Client pings and server pongs. Repeat messages rounds before finishing. */ +static void test_pingpong_streaming(grpc_end2end_test_config config, + int messages) { + grpc_channel_args *client_args = + grpc_channel_args_set_stream_compression_algorithm( + NULL, GRPC_STREAM_COMPRESS_GZIP); + grpc_channel_args *server_args = + grpc_channel_args_set_stream_compression_algorithm( + NULL, GRPC_STREAM_COMPRESS_GZIP); + grpc_end2end_test_fixture f = + begin_test(config, "test_pingpong_streaming", client_args, server_args); + grpc_call *c; + grpc_call *s; + cq_verifier *cqv = cq_verifier_create(f.cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + grpc_slice details; + int was_cancelled = 2; + grpc_byte_buffer *request_payload; + grpc_byte_buffer *request_payload_recv; + grpc_byte_buffer *response_payload; + grpc_byte_buffer *response_payload_recv; + int i; + grpc_slice request_payload_slice = + grpc_slice_from_copied_string("hello world"); + grpc_slice response_payload_slice = + grpc_slice_from_copied_string("hello you"); + + gpr_timespec deadline = five_seconds_from_now(); + c = grpc_channel_create_call( + f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + grpc_slice_from_static_string("/foo"), + get_host_override_slice("foo.test.google.fr:1234", config), deadline, + NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(100)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(100), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + for (i = 0; i < messages; i++) { + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = request_payload; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &response_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(102), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message.send_message = response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(103), 1); + CQ_EXPECT_COMPLETION(cqv, tag(2), 1); + cq_verify(cqv); + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(request_payload_recv); + grpc_byte_buffer_destroy(response_payload_recv); + } + + grpc_slice_unref(request_payload_slice); + grpc_slice_unref(response_payload_slice); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + op->data.send_status_from_server.status_details = &status_details; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(1), 1); + CQ_EXPECT_COMPLETION(cqv, tag(3), 1); + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + CQ_EXPECT_COMPLETION(cqv, tag(104), 1); + cq_verify(cqv); + + grpc_call_unref(c); + grpc_call_unref(s); + + cq_verifier_destroy(cqv); + + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_slice_unref(details); + + end_test(&f); + config.tear_down_data(&f); + { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_args_destroy(&exec_ctx, client_args); + grpc_channel_args_destroy(&exec_ctx, server_args); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +void stream_compression_ping_pong_streaming(grpc_end2end_test_config config) { + int i; + + for (i = 1; i < 10; i++) { + test_pingpong_streaming(config, i); + } +} + +void stream_compression_ping_pong_streaming_pre_init(void) {} |