diff options
Diffstat (limited to 'test')
21 files changed, 680 insertions, 246 deletions
diff --git a/test/core/end2end/end2end_nosec_tests.c b/test/core/end2end/end2end_nosec_tests.c index b71299c09e..2893bddc43 100644 --- a/test/core/end2end/end2end_nosec_tests.c +++ b/test/core/end2end/end2end_nosec_tests.c @@ -115,6 +115,8 @@ extern void simple_metadata(grpc_end2end_test_config config); extern void simple_metadata_pre_init(void); extern void simple_request(grpc_end2end_test_config config); extern void simple_request_pre_init(void); +extern void streaming_error_response(grpc_end2end_test_config config); +extern void streaming_error_response_pre_init(void); extern void trailing_metadata(grpc_end2end_test_config config); extern void trailing_metadata_pre_init(void); @@ -157,6 +159,7 @@ void grpc_end2end_tests_pre_init(void) { simple_delayed_request_pre_init(); simple_metadata_pre_init(); simple_request_pre_init(); + streaming_error_response_pre_init(); trailing_metadata_pre_init(); } @@ -203,6 +206,7 @@ void grpc_end2end_tests(int argc, char **argv, simple_delayed_request(config); simple_metadata(config); simple_request(config); + streaming_error_response(config); trailing_metadata(config); return; } @@ -352,6 +356,10 @@ void grpc_end2end_tests(int argc, char **argv, simple_request(config); continue; } + if (0 == strcmp("streaming_error_response", argv[i])) { + streaming_error_response(config); + continue; + } if (0 == strcmp("trailing_metadata", argv[i])) { trailing_metadata(config); continue; diff --git a/test/core/end2end/end2end_tests.c b/test/core/end2end/end2end_tests.c index 00c9c44a78..96a38e76dc 100644 --- a/test/core/end2end/end2end_tests.c +++ b/test/core/end2end/end2end_tests.c @@ -117,6 +117,8 @@ extern void simple_metadata(grpc_end2end_test_config config); extern void simple_metadata_pre_init(void); extern void simple_request(grpc_end2end_test_config config); extern void simple_request_pre_init(void); +extern void streaming_error_response(grpc_end2end_test_config config); +extern void streaming_error_response_pre_init(void); extern void trailing_metadata(grpc_end2end_test_config config); extern void trailing_metadata_pre_init(void); @@ -160,6 +162,7 @@ void grpc_end2end_tests_pre_init(void) { simple_delayed_request_pre_init(); simple_metadata_pre_init(); simple_request_pre_init(); + streaming_error_response_pre_init(); trailing_metadata_pre_init(); } @@ -207,6 +210,7 @@ void grpc_end2end_tests(int argc, char **argv, simple_delayed_request(config); simple_metadata(config); simple_request(config); + streaming_error_response(config); trailing_metadata(config); return; } @@ -360,6 +364,10 @@ void grpc_end2end_tests(int argc, char **argv, simple_request(config); continue; } + if (0 == strcmp("streaming_error_response", argv[i])) { + streaming_error_response(config); + continue; + } if (0 == strcmp("trailing_metadata", argv[i])) { trailing_metadata(config); continue; diff --git a/test/core/end2end/fuzzers/client_fuzzer_corpus/2c452818a10ddef09b90c89a53db14b9b56b21f3 b/test/core/end2end/fuzzers/client_fuzzer_corpus/2c452818a10ddef09b90c89a53db14b9b56b21f3 Binary files differnew file mode 100644 index 0000000000..059634fda1 --- /dev/null +++ b/test/core/end2end/fuzzers/client_fuzzer_corpus/2c452818a10ddef09b90c89a53db14b9b56b21f3 diff --git a/test/core/end2end/fuzzers/client_fuzzer_corpus/42ead79c94eccdf8a8c3d8036be73e14fa260dd5 b/test/core/end2end/fuzzers/client_fuzzer_corpus/42ead79c94eccdf8a8c3d8036be73e14fa260dd5 Binary files differnew file mode 100644 index 0000000000..b9c53b26ed --- /dev/null +++ b/test/core/end2end/fuzzers/client_fuzzer_corpus/42ead79c94eccdf8a8c3d8036be73e14fa260dd5 diff --git a/test/core/end2end/fuzzers/client_fuzzer_corpus/4e05d6cf1c3f0c04f6ee92d09a53ee0fe35c085a b/test/core/end2end/fuzzers/client_fuzzer_corpus/4e05d6cf1c3f0c04f6ee92d09a53ee0fe35c085a Binary files differnew file mode 100644 index 0000000000..8a4a279998 --- /dev/null +++ b/test/core/end2end/fuzzers/client_fuzzer_corpus/4e05d6cf1c3f0c04f6ee92d09a53ee0fe35c085a diff --git a/test/core/end2end/fuzzers/client_fuzzer_corpus/8f980dd25f1c77e3536131c2c620aa32e8c13180 b/test/core/end2end/fuzzers/client_fuzzer_corpus/8f980dd25f1c77e3536131c2c620aa32e8c13180 Binary files differnew file mode 100644 index 0000000000..fcebab7a64 --- /dev/null +++ b/test/core/end2end/fuzzers/client_fuzzer_corpus/8f980dd25f1c77e3536131c2c620aa32e8c13180 diff --git a/test/core/end2end/fuzzers/client_fuzzer_corpus/aef36c49d7dec0dcf8cdc224d9e9221fa2cb1db0 b/test/core/end2end/fuzzers/client_fuzzer_corpus/aef36c49d7dec0dcf8cdc224d9e9221fa2cb1db0 Binary files differnew file mode 100644 index 0000000000..6b015fe66e --- /dev/null +++ b/test/core/end2end/fuzzers/client_fuzzer_corpus/aef36c49d7dec0dcf8cdc224d9e9221fa2cb1db0 diff --git a/test/core/end2end/fuzzers/client_fuzzer_corpus/crash-14ed70cd9ea7987cdd0c8f6e39398ee7c60ee2ff b/test/core/end2end/fuzzers/client_fuzzer_corpus/crash-14ed70cd9ea7987cdd0c8f6e39398ee7c60ee2ff Binary files differnew file mode 100644 index 0000000000..be6366049d --- /dev/null +++ b/test/core/end2end/fuzzers/client_fuzzer_corpus/crash-14ed70cd9ea7987cdd0c8f6e39398ee7c60ee2ff diff --git a/test/core/end2end/fuzzers/client_fuzzer_corpus/dcb06a6e34cbed15515e5b3581ca666f704777bd b/test/core/end2end/fuzzers/client_fuzzer_corpus/dcb06a6e34cbed15515e5b3581ca666f704777bd Binary files differnew file mode 100644 index 0000000000..92750f94a3 --- /dev/null +++ b/test/core/end2end/fuzzers/client_fuzzer_corpus/dcb06a6e34cbed15515e5b3581ca666f704777bd diff --git a/test/core/end2end/fuzzers/client_fuzzer_corpus/ea46b684f1e67a27c231f2d536c41da631189b9c b/test/core/end2end/fuzzers/client_fuzzer_corpus/ea46b684f1e67a27c231f2d536c41da631189b9c Binary files differnew file mode 100644 index 0000000000..c891d9adc8 --- /dev/null +++ b/test/core/end2end/fuzzers/client_fuzzer_corpus/ea46b684f1e67a27c231f2d536c41da631189b9c diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py index 325d9b3cad..6d3d8f8d3c 100755 --- a/test/core/end2end/gen_build_yaml.py +++ b/test/core/end2end/gen_build_yaml.py @@ -126,6 +126,7 @@ END2END_TESTS = { 'simple_delayed_request': connectivity_test_options, 'simple_metadata': default_test_options, 'simple_request': default_test_options, + 'streaming_error_response': default_test_options, 'trailing_metadata': default_test_options, } diff --git a/test/core/end2end/tests/streaming_error_response.c b/test/core/end2end/tests/streaming_error_response.c new file mode 100644 index 0000000000..e15c132d63 --- /dev/null +++ b/test/core/end2end/tests/streaming_error_response.c @@ -0,0 +1,278 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/byte_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "test/core/end2end/cq_verifier.h" + +enum { TIMEOUT = 200000 }; + +static void *tag(intptr_t t) { return (void *)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char *test_name, + grpc_channel_args *client_args, + grpc_channel_args *server_args, + bool request_status_early) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "%s/%s/request_status_early=%s", test_name, config.name, + request_status_early ? "true" : "false"); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_time(int n) { + return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); +} + +static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); } + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck( + f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); +} + +/* Client sends a request with payload, server reads then returns status. */ +static void test(grpc_end2end_test_config config, bool request_status_early) { + grpc_call *c; + grpc_call *s; + gpr_slice response_payload1_slice = gpr_slice_from_copied_string("hello"); + grpc_byte_buffer *response_payload1 = + grpc_raw_byte_buffer_create(&response_payload1_slice, 1); + gpr_slice response_payload2_slice = gpr_slice_from_copied_string("world"); + grpc_byte_buffer *response_payload2 = + grpc_raw_byte_buffer_create(&response_payload2_slice, 1); + gpr_timespec deadline = five_seconds_time(); + grpc_end2end_test_fixture f = begin_test(config, "streaming_error_response", + NULL, NULL, request_status_early); + cq_verifier *cqv = cq_verifier_create(f.cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_byte_buffer *response_payload1_recv = NULL; + grpc_byte_buffer *response_payload2_recv = NULL; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + char *details = NULL; + size_t details_capacity = 0; + int was_cancelled = 2; + + c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + "/foo", "foo.test.google.fr", deadline, NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &response_payload1_recv; + op++; + if (request_status_early) { + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op++; + } + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call( + f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101))); + cq_expect_completion(cqv, tag(101), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = response_payload1; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(102), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = response_payload2; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(103), 1); + if (!request_status_early) { + cq_expect_completion(cqv, tag(1), 1); + } + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_FAILED_PRECONDITION; + op->data.send_status_from_server.status_details = "xyz"; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + if (!request_status_early) { + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &response_payload2_recv; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + } + + cq_expect_completion(cqv, tag(104), 1); + if (request_status_early) { + cq_expect_completion(cqv, tag(1), 1); + } else { + cq_expect_completion(cqv, tag(2), 1); + } + cq_verify(cqv); + + if (!request_status_early) { + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(3), 1); + cq_verify(cqv); + + GPR_ASSERT(response_payload1_recv != NULL); + GPR_ASSERT(response_payload2_recv != NULL); + } + + GPR_ASSERT(status == GRPC_STATUS_FAILED_PRECONDITION); + GPR_ASSERT(0 == strcmp(details, "xyz")); + GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); + GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); + GPR_ASSERT(was_cancelled == 1); + + gpr_free(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + + grpc_call_destroy(c); + grpc_call_destroy(s); + + cq_verifier_destroy(cqv); + + grpc_byte_buffer_destroy(response_payload1); + grpc_byte_buffer_destroy(response_payload2); + grpc_byte_buffer_destroy(response_payload1_recv); + grpc_byte_buffer_destroy(response_payload2_recv); + + end_test(&f); + config.tear_down_data(&f); +} + +void streaming_error_response(grpc_end2end_test_config config) { + test(config, false); + test(config, true); +} + +void streaming_error_response_pre_init(void) {} diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc index addaf174f2..e8ae6ee572 100644 --- a/test/cpp/interop/client.cc +++ b/test/cpp/interop/client.cc @@ -40,7 +40,9 @@ #include <grpc++/client_context.h> #include <grpc/grpc.h> #include <grpc/support/log.h> +#include <grpc/support/useful.h> +#include "src/core/lib/support/string.h" #include "test/cpp/interop/client_helper.h" #include "test/cpp/interop/interop_client.h" #include "test/cpp/util/test_config.h" @@ -52,30 +54,31 @@ DEFINE_string(server_host, "127.0.0.1", "Server host to connect to"); DEFINE_string(server_host_override, "foo.test.google.fr", "Override the server host which is sent in HTTP header"); DEFINE_string(test_case, "large_unary", - "Configure different test cases. Valid options are: " - "empty_unary : empty (zero bytes) request and response; " - "large_unary : single request and (large) response; " - "large_compressed_unary : single request and compressed (large) " - "response; " - "client_streaming : request streaming with single response; " - "server_streaming : single request with response streaming; " + "Configure different test cases. Valid options are:\n\n" + "all : all test cases;\n" + "cancel_after_begin : cancel stream after starting it;\n" + "cancel_after_first_response: cancel on first response;\n" + "client_compressed_streaming : compressed request streaming with " + "client_compressed_unary : single compressed request;\n" + "client_streaming : request streaming with single response;\n" + "compute_engine_creds: large_unary with compute engine auth;\n" + "custom_metadata: server will echo custom metadata;\n" + "empty_stream : bi-di stream with no request/response;\n" + "empty_unary : empty (zero bytes) request and response;\n" + "half_duplex : half-duplex streaming;\n" + "jwt_token_creds: large_unary with JWT token auth;\n" + "large_unary : single request and (large) response;\n" + "oauth2_auth_token: raw oauth2 access token auth;\n" + "per_rpc_creds: raw oauth2 access token on a single rpc;\n" + "ping_pong : full-duplex streaming;\n" + "response streaming;\n" "server_compressed_streaming : single request with compressed " - "response streaming; " - "slow_consumer : single request with response; " - " streaming with slow client consumer; " - "half_duplex : half-duplex streaming; " - "ping_pong : full-duplex streaming; " - "cancel_after_begin : cancel stream after starting it; " - "cancel_after_first_response: cancel on first response; " - "timeout_on_sleeping_server: deadline exceeds on stream; " - "empty_stream : bi-di stream with no request/response; " - "compute_engine_creds: large_unary with compute engine auth; " - "jwt_token_creds: large_unary with JWT token auth; " - "oauth2_auth_token: raw oauth2 access token auth; " - "per_rpc_creds: raw oauth2 access token on a single rpc; " - "status_code_and_message: verify status code & message; " - "custom_metadata: server will echo custom metadata;" - "all : all of above."); + "server_compressed_unary : single compressed response;\n" + "server_streaming : single request with response streaming;\n" + "slow_consumer : single request with response streaming with " + "slow client consumer;\n" + "status_code_and_message: verify status code & message;\n" + "timeout_on_sleeping_server: deadline exceeds on stream;\n"); DEFINE_string(default_service_account, "", "Email of GCE default service account"); DEFINE_string(service_account_key_file, "", @@ -104,14 +107,18 @@ int main(int argc, char** argv) { client.DoEmpty(); } else if (FLAGS_test_case == "large_unary") { client.DoLargeUnary(); - } else if (FLAGS_test_case == "large_compressed_unary") { - client.DoLargeCompressedUnary(); + } else if (FLAGS_test_case == "server_compressed_unary") { + client.DoServerCompressedUnary(); + } else if (FLAGS_test_case == "client_compressed_unary") { + client.DoClientCompressedUnary(); } else if (FLAGS_test_case == "client_streaming") { client.DoRequestStreaming(); } else if (FLAGS_test_case == "server_streaming") { client.DoResponseStreaming(); } else if (FLAGS_test_case == "server_compressed_streaming") { - client.DoResponseCompressedStreaming(); + client.DoServerCompressedStreaming(); + } else if (FLAGS_test_case == "client_compressed_streaming") { + client.DoClientCompressedStreaming(); } else if (FLAGS_test_case == "slow_consumer") { client.DoResponseStreamingWithSlowConsumer(); } else if (FLAGS_test_case == "half_duplex") { @@ -144,9 +151,12 @@ int main(int argc, char** argv) { } else if (FLAGS_test_case == "all") { client.DoEmpty(); client.DoLargeUnary(); + client.DoClientCompressedUnary(); + client.DoServerCompressedUnary(); client.DoRequestStreaming(); client.DoResponseStreaming(); - client.DoResponseCompressedStreaming(); + client.DoClientCompressedStreaming(); + client.DoServerCompressedStreaming(); client.DoHalfDuplex(); client.DoPingPong(); client.DoCancelAfterBegin(); @@ -165,15 +175,35 @@ int main(int argc, char** argv) { } // compute_engine_creds only runs in GCE. } else { - gpr_log( - GPR_ERROR, - "Unsupported test case %s. Valid options are all|empty_unary|" - "large_unary|large_compressed_unary|client_streaming|server_streaming|" - "server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|" - "cancel_after_first_response|timeout_on_sleeping_server|empty_stream|" - "compute_engine_creds|jwt_token_creds|oauth2_auth_token|per_rpc_creds|" - "status_code_and_message|custom_metadata", - FLAGS_test_case.c_str()); + const char* testcases[] = {"all", + "cancel_after_begin", + "cancel_after_first_response", + "client_compressed_streaming", + "client_compressed_unary", + "client_streaming", + "compute_engine_creds", + "custom_metadata", + "empty_stream", + "empty_unary", + "half_duplex", + "jwt_token_creds", + "large_unary", + "oauth2_auth_token", + "oauth2_auth_token", + "per_rpc_creds", + "per_rpc_creds", + "ping_pong", + "server_compressed_streaming", + "server_compressed_unary", + "server_streaming", + "status_code_and_message", + "timeout_on_sleeping_server"}; + char* joined_testcases = + gpr_strjoin_sep(testcases, GPR_ARRAY_SIZE(testcases), "\n", NULL); + + gpr_log(GPR_ERROR, "Unsupported test case %s. Valid options are\n%s", + FLAGS_test_case.c_str(), joined_testcases); + gpr_free(joined_testcases); ret = 1; } diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc index 608f902d6f..89f841dbe9 100644 --- a/test/cpp/interop/interop_client.cc +++ b/test/cpp/interop/interop_client.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -57,7 +57,7 @@ namespace testing { namespace { // The same value is defined by the Java client. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904}; -const std::vector<int> response_stream_sizes = {31415, 59, 2653, 58979}; +const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979}; const int kNumResponseMessages = 2000; const int kResponseMessageSize = 1030; const int kReceiveDelayMilliSeconds = 20; @@ -67,28 +67,23 @@ const int kLargeResponseSize = 314159; void NoopChecks(const InteropClientContextInspector& inspector, const SimpleRequest* request, const SimpleResponse* response) {} -void CompressionChecks(const InteropClientContextInspector& inspector, - const SimpleRequest* request, - const SimpleResponse* response) { +void UnaryCompressionChecks(const InteropClientContextInspector& inspector, + const SimpleRequest* request, + const SimpleResponse* response) { const grpc_compression_algorithm received_compression = inspector.GetCallCompressionAlgorithm(); - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { - if (request->request_compressed_response() && - received_compression == GRPC_COMPRESS_NONE) { + if (request->response_compressed().value()) { + if (received_compression == GRPC_COMPRESS_NONE) { // Requested some compression, got NONE. This is an error. gpr_log(GPR_ERROR, "Failure: Requested compression but got uncompressed response " "from server."); abort(); } - } - if (!request->request_compressed_response()) { - GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } else if (request->response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should always - // be compressed. GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // Didn't request compression -> make sure the response is uncompressed + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } } } // namespace @@ -190,11 +185,16 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, CheckerFn custom_checks_fn) { ClientContext context; InteropClientContextInspector inspector(context); - // If the request doesn't already specify the response type, default to - // COMPRESSABLE. request->set_response_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize); + if (request->has_expect_compressed()) { + if (request->expect_compressed().value()) { + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + } else { + context.set_compression_algorithm(GRPC_COMPRESS_NONE); + } + } Status s = serviceStub_.Get()->UnaryCall(&context, *request, response); if (!AssertStatusOk(s)) { @@ -204,27 +204,8 @@ bool InteropClient::PerformLargeUnary(SimpleRequest* request, custom_checks_fn(inspector, request, response); // Payload related checks. - GPR_ASSERT(response->payload().type() == request->response_type()); - switch (response->payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response->payload().body() == - grpc::string(kLargeResponseSize, '\0')); - break; - case PayloadType::UNCOMPRESSABLE: { - // We don't really check anything: We can't assert that the payload is - // uncompressed because it's the server's prerogative to decide on that, - // and different implementations decide differently (ie, Java always - // compresses when requested to do so, whereas C core throws away the - // compressed payload if the output is larger than the input). - // In addition, we don't compare the actual random bytes received because - // asserting that data is sent/received properly isn't the purpose of this - // test. Moreover, different implementations are also free to use - // different sets of random bytes. - } break; - default: - GPR_ASSERT(false); - } - + GPR_ASSERT(response->payload().body() == + grpc::string(kLargeResponseSize, '\0')); return true; } @@ -237,7 +218,6 @@ bool InteropClient::DoComputeEngineCreds( SimpleResponse response; request.set_fill_username(true); request.set_fill_oauth_scope(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -311,7 +291,6 @@ bool InteropClient::DoJwtTokenCreds(const grpc::string& username) { SimpleRequest request; SimpleResponse response; request.set_fill_username(true); - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; @@ -327,7 +306,6 @@ bool InteropClient::DoLargeUnary() { gpr_log(GPR_DEBUG, "Sending a large unary rpc..."); SimpleRequest request; SimpleResponse response; - request.set_response_type(PayloadType::COMPRESSABLE); if (!PerformLargeUnary(&request, &response)) { return false; } @@ -335,32 +313,73 @@ bool InteropClient::DoLargeUnary() { return true; } -bool InteropClient::DoLargeCompressedUnary() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Sending a large compressed unary rpc %s.", - log_suffix); - SimpleRequest request; - SimpleResponse response; - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - if (!PerformLargeUnary(&request, &response, CompressionChecks)) { - gpr_log(GPR_ERROR, "Large compressed unary failed %s", log_suffix); - gpr_free(log_suffix); - return false; - } - - gpr_log(GPR_DEBUG, "Large compressed unary done %s.", log_suffix); +bool InteropClient::DoClientCompressedUnary() { + // Probing for compression-checks support. + ClientContext probe_context; + SimpleRequest probe_req; + SimpleResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + + probe_req.set_response_size(kLargeResponseSize); + probe_req.mutable_payload()->set_body(grpc::string(kLargeRequestSize, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed unary request."); + const Status s = + serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed unary request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding."); + + const std::vector<bool> compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending compressed unary request %s.", log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_expect_compressed()->set_value(compressions[i]); + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + return false; + } + + gpr_log(GPR_DEBUG, "Compressed unary request failed %s", log_suffix); + gpr_free(log_suffix); + } + + return true; +} + +bool InteropClient::DoServerCompressedUnary() { + const std::vector<bool> compressions = {true, false}; + for (size_t i = 0; i < compressions.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s)", + compressions[i] ? "true" : "false"); + + gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.", + log_suffix); + SimpleRequest request; + SimpleResponse response; + request.mutable_response_compressed()->set_value(compressions[i]); + + if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) { + gpr_log(GPR_ERROR, "Request for compressed unary failed %s", log_suffix); gpr_free(log_suffix); + return false; } + + gpr_log(GPR_DEBUG, "Request for compressed unary failed %s", log_suffix); + gpr_free(log_suffix); } return true; @@ -387,7 +406,7 @@ bool InteropClient::DoRequestStreaming() { serviceStub_.Get()->StreamingInputCall(&context, &response)); int aggregated_payload_size = 0; - for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) { + for (size_t i = 0; i < request_stream_sizes.size(); ++i) { Payload* payload = request.mutable_payload(); payload->set_body(grpc::string(request_stream_sizes[i], '\0')); if (!stream->Write(request)) { @@ -396,7 +415,7 @@ bool InteropClient::DoRequestStreaming() { } aggregated_payload_size += request_stream_sizes[i]; } - stream->WritesDone(); + GPR_ASSERT(stream->WritesDone()); Status s = stream->Finish(); if (!AssertStatusOk(s)) { @@ -446,92 +465,129 @@ bool InteropClient::DoResponseStreaming() { return true; } -bool InteropClient::DoResponseCompressedStreaming() { - const bool request_compression[] = {false, true}; - const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE}; - for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) { - for (size_t j = 0; j < GPR_ARRAY_SIZE(request_compression); j++) { - ClientContext context; - InteropClientContextInspector inspector(context); - StreamingOutputCallRequest request; - - char* log_suffix; - gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)", - request_compression[j] ? "true" : "false", - PayloadType_Name(payload_types[i]).c_str()); - - gpr_log(GPR_DEBUG, "Receiving response streaming rpc %s.", log_suffix); - - request.set_response_type(payload_types[i]); - request.set_request_compressed_response(request_compression[j]); - - for (size_t k = 0; k < response_stream_sizes.size(); ++k) { - ResponseParameters* response_parameter = - request.add_response_parameters(); - response_parameter->set_size(response_stream_sizes[k]); - } - StreamingOutputCallResponse response; - - std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( - serviceStub_.Get()->StreamingOutputCall(&context, request)); - - size_t k = 0; - while (stream->Read(&response)) { - // Payload related checks. - GPR_ASSERT(response.payload().type() == request.response_type()); - switch (response.payload().type()) { - case PayloadType::COMPRESSABLE: - GPR_ASSERT(response.payload().body() == - grpc::string(response_stream_sizes[k], '\0')); - break; - case PayloadType::UNCOMPRESSABLE: - break; - default: - GPR_ASSERT(false); - } - - // Compression related checks. - if (request.request_compressed_response()) { - GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > - GRPC_COMPRESS_NONE); - if (request.response_type() == PayloadType::COMPRESSABLE) { - // requested compression and compressable response => results should - // always be compressed. - GPR_ASSERT(inspector.GetMessageFlags() & - GRPC_WRITE_INTERNAL_COMPRESS); - } - } else { - // requested *no* compression. - GPR_ASSERT( - !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); - } - - ++k; - } - - gpr_log(GPR_DEBUG, "Response streaming done %s.", log_suffix); - gpr_free(log_suffix); +bool InteropClient::DoClientCompressedStreaming() { + // Probing for compression-checks support. + ClientContext probe_context; + StreamingInputCallRequest probe_req; + StreamingInputCallResponse probe_res; + + probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE); + probe_req.mutable_expect_compressed()->set_value(true); // lies! + probe_req.mutable_payload()->set_body(grpc::string(27182, '\0')); + + gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request."); + + std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream( + serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res)); + + if (!probe_stream->Write(probe_req)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + Status s = probe_stream->Finish(); + if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) { + // The server isn't able to evaluate incoming compression, making the rest + // of this test moot. + gpr_log(GPR_DEBUG, "Compressed streaming request probe failed"); + return false; + } + gpr_log(GPR_DEBUG, + "Compressed streaming request probe succeeded. Proceeding."); + + ClientContext context; + StreamingInputCallRequest request; + StreamingInputCallResponse response; + + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream( + serviceStub_.Get()->StreamingInputCall(&context, &response)); + + request.mutable_payload()->set_body(grpc::string(27182, '\0')); + request.mutable_expect_compressed()->set_value(true); + gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled"); + if (!stream->Write(request)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + + WriteOptions wopts; + wopts.set_no_compression(); + request.mutable_payload()->set_body(grpc::string(45904, '\0')); + request.mutable_expect_compressed()->set_value(false); + gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled"); + if (!stream->Write(request, wopts)) { + gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__); + return TransientFailureOrAbort(); + } + GPR_ASSERT(stream->WritesDone()); + + s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } + + return true; +} - if (k < response_stream_sizes.size()) { - // stream->Read() failed before reading all the expected messages. This - // is most likely due to a connection failure. - gpr_log(GPR_ERROR, - "DoResponseCompressedStreaming(): Responses read (k=%" PRIuPTR - ") is " - "less than the expected messages (i.e " - "response_stream_sizes.size() (%" PRIuPTR ")). (i=%" PRIuPTR - ", j=%" PRIuPTR ")", - k, response_stream_sizes.size(), i, j); - return TransientFailureOrAbort(); - } - - Status s = stream->Finish(); - if (!AssertStatusOk(s)) { - return false; - } +bool InteropClient::DoServerCompressedStreaming() { + const std::vector<bool> compressions = {true, false}; + const std::vector<int> sizes = {31415, 92653}; + + ClientContext context; + InteropClientContextInspector inspector(context); + StreamingOutputCallRequest request; + + GPR_ASSERT(compressions.size() == sizes.size()); + for (size_t i = 0; i < sizes.size(); i++) { + char* log_suffix; + gpr_asprintf(&log_suffix, "(compression=%s; size=%d)", + compressions[i] ? "true" : "false", sizes[i]); + + gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix); + gpr_free(log_suffix); + + ResponseParameters* const response_parameter = + request.add_response_parameters(); + response_parameter->mutable_compressed()->set_value(compressions[i]); + response_parameter->set_size(sizes[i]); + } + std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream( + serviceStub_.Get()->StreamingOutputCall(&context, request)); + + size_t k = 0; + StreamingOutputCallResponse response; + while (stream->Read(&response)) { + // Payload size checks. + GPR_ASSERT(response.payload().body() == + grpc::string(request.response_parameters(k).size(), '\0')); + + // Compression checks. + GPR_ASSERT(request.response_parameters(k).has_compressed()); + if (request.response_parameters(k).compressed().value()) { + GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE); + GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS); + } else { + // requested *no* compression. + GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)); } + ++k; + } + + if (k < sizes.size()) { + // stream->Read() failed before reading all the expected messages. This + // is most likely due to a connection failure. + gpr_log(GPR_ERROR, "%s(): Responses read (k=%" PRIuPTR + ") is " + "less than the expected messages (i.e " + "response_stream_sizes.size() (%" PRIuPTR ")).", + __func__, k, response_stream_sizes.size()); + return TransientFailureOrAbort(); } + Status s = stream->Finish(); + if (!AssertStatusOk(s)) { + return false; + } return true; } @@ -632,7 +688,6 @@ bool InteropClient::DoPingPong() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); Payload* payload = request.mutable_payload(); StreamingOutputCallResponse response; @@ -699,7 +754,6 @@ bool InteropClient::DoCancelAfterFirstResponse() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(31415); request.mutable_payload()->set_body(grpc::string(27182, '\0')); @@ -839,7 +893,6 @@ bool InteropClient::DoCustomMetadata() { stream(serviceStub_.Get()->FullDuplexCall(&context)); StreamingOutputCallRequest request; - request.set_response_type(PayloadType::COMPRESSABLE); ResponseParameters* response_parameter = request.add_response_parameters(); response_parameter->set_size(kLargeResponseSize); grpc::string payload(kLargeRequestSize, '\0'); diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h index c8d6810c12..eb886fcb7e 100644 --- a/test/cpp/interop/interop_client.h +++ b/test/cpp/interop/interop_client.h @@ -64,12 +64,14 @@ class InteropClient { bool DoEmpty(); bool DoLargeUnary(); - bool DoLargeCompressedUnary(); + bool DoServerCompressedUnary(); + bool DoClientCompressedUnary(); bool DoPingPong(); bool DoHalfDuplex(); bool DoRequestStreaming(); bool DoResponseStreaming(); - bool DoResponseCompressedStreaming(); + bool DoServerCompressedStreaming(); + bool DoClientCompressedStreaming(); bool DoResponseStreamingWithSlowConsumer(); bool DoCancelAfterBegin(); bool DoCancelAfterFirstResponse(); diff --git a/test/cpp/interop/server_main.cc b/test/cpp/interop/interop_server.cc index 062857f3d6..ebef0002a3 100644 --- a/test/cpp/interop/server_main.cc +++ b/test/cpp/interop/interop_server.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,6 +48,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/lib/transport/byte_stream.h" #include "src/proto/grpc/testing/empty.grpc.pb.h" #include "src/proto/grpc/testing/messages.grpc.pb.h" #include "src/proto/grpc/testing/test.grpc.pb.h" @@ -64,10 +65,10 @@ using grpc::ServerCredentials; using grpc::ServerReader; using grpc::ServerReaderWriter; using grpc::ServerWriter; +using grpc::WriteOptions; using grpc::SslServerCredentialsOptions; using grpc::testing::InteropServerContextInspector; using grpc::testing::Payload; -using grpc::testing::PayloadType; using grpc::testing::SimpleRequest; using grpc::testing::SimpleResponse; using grpc::testing::StreamingInputCallRequest; @@ -78,7 +79,6 @@ using grpc::testing::TestService; using grpc::Status; static bool got_sigint = false; -static const char* kRandomFile = "test/cpp/interop/rnd.dat"; const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial"; const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin"; @@ -110,34 +110,41 @@ void MaybeEchoMetadata(ServerContext* context) { } } -bool SetPayload(PayloadType response_type, int size, Payload* payload) { - payload->set_type(response_type); - switch (response_type) { - case PayloadType::COMPRESSABLE: { - std::unique_ptr<char[]> body(new char[size]()); - payload->set_body(body.get(), size); - } break; - case PayloadType::UNCOMPRESSABLE: { - std::unique_ptr<char[]> body(new char[size]()); - std::ifstream rnd_file(kRandomFile); - GPR_ASSERT(rnd_file.good()); - rnd_file.read(body.get(), size); - GPR_ASSERT(!rnd_file.eof()); // Requested more rnd bytes than available - payload->set_body(body.get(), size); - } break; - default: - GPR_ASSERT(false); - } +bool SetPayload(int size, Payload* payload) { + std::unique_ptr<char[]> body(new char[size]()); + payload->set_body(body.get(), size); return true; } -template <typename RequestType> -void SetResponseCompression(ServerContext* context, - const RequestType& request) { - if (request.request_compressed_response()) { - // Any level would do, let's go for HIGH because we are overachievers. - context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); +bool CheckExpectedCompression(const ServerContext& context, + const bool compression_expected) { + const InteropServerContextInspector inspector(context); + const grpc_compression_algorithm received_compression = + inspector.GetCallCompressionAlgorithm(); + + if (compression_expected) { + if (received_compression == GRPC_COMPRESS_NONE) { + // Expected some compression, got NONE. This is an error. + gpr_log(GPR_ERROR, + "Expected compression but got uncompressed request from client."); + return false; + } + if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) { + gpr_log(GPR_ERROR, + "Failure: Requested compression in a compressable request, but " + "compression bit in message flags not set."); + return false; + } + } else { + // Didn't expect compression -> make sure the request is uncompressed + if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) { + gpr_log(GPR_ERROR, + "Failure: Didn't requested compression, but compression bit in " + "message flags set."); + return false; + } } + return true; } class TestServiceImpl : public TestService::Service { @@ -151,11 +158,26 @@ class TestServiceImpl : public TestService::Service { Status UnaryCall(ServerContext* context, const SimpleRequest* request, SimpleResponse* response) { MaybeEchoMetadata(context); - SetResponseCompression(context, *request); + if (request->has_response_compressed()) { + const bool compression_requested = request->response_compressed().value(); + gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", + compression_requested ? "enabled" : "disabled", __func__); + if (compression_requested) { + // Any level would do, let's go for HIGH because we are overachievers. + context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + } else { + context->set_compression_level(GRPC_COMPRESS_LEVEL_NONE); + } + } + if (!CheckExpectedCompression(*context, + request->expect_compressed().value())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Compressed request expectation not met."); + } if (request->response_size() > 0) { - if (!SetPayload(request->response_type(), request->response_size(), - response->mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + if (!SetPayload(request->response_size(), response->mutable_payload())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Error creating payload."); } } @@ -171,15 +193,26 @@ class TestServiceImpl : public TestService::Service { Status StreamingOutputCall( ServerContext* context, const StreamingOutputCallRequest* request, ServerWriter<StreamingOutputCallResponse>* writer) { - SetResponseCompression(context, *request); StreamingOutputCallResponse response; bool write_success = true; for (int i = 0; write_success && i < request->response_parameters_size(); i++) { - if (!SetPayload(request->response_type(), - request->response_parameters(i).size(), + if (!SetPayload(request->response_parameters(i).size(), response.mutable_payload())) { - return Status(grpc::StatusCode::INTERNAL, "Error creating payload."); + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Error creating payload."); + } + WriteOptions wopts; + if (request->response_parameters(i).has_compressed()) { + // Compress by default. Disabled on a per-message basis. + context->set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + const bool compression_requested = + request->response_parameters(i).compressed().value(); + gpr_log(GPR_DEBUG, "Request for compression (%s) present for %s", + compression_requested ? "enabled" : "disabled", __func__); + if (!compression_requested) { + wopts.set_no_compression(); + } // else, compression is already enabled via the context. } int time_us; if ((time_us = request->response_parameters(i).interval_us()) > 0) { @@ -189,7 +222,7 @@ class TestServiceImpl : public TestService::Service { gpr_time_from_micros(time_us, GPR_TIMESPAN)); gpr_sleep_until(sleep_time); } - write_success = writer->Write(response); + write_success = writer->Write(response, wopts); } if (write_success) { return Status::OK; @@ -204,6 +237,11 @@ class TestServiceImpl : public TestService::Service { StreamingInputCallRequest request; int aggregated_payload_size = 0; while (reader->Read(&request)) { + if (!CheckExpectedCompression(*context, + request.expect_compressed().value())) { + return Status(grpc::StatusCode::INVALID_ARGUMENT, + "Compressed request expectation not met."); + } if (request.has_payload()) { aggregated_payload_size += request.payload().body().size(); } @@ -221,7 +259,6 @@ class TestServiceImpl : public TestService::Service { StreamingOutputCallResponse response; bool write_success = true; while (write_success && stream->Read(&request)) { - SetResponseCompression(context, request); if (request.response_parameters_size() != 0) { response.mutable_payload()->set_type(request.payload().type()); response.mutable_payload()->set_body( diff --git a/test/cpp/interop/rnd.dat b/test/cpp/interop/rnd.dat Binary files differdeleted file mode 100644 index 8c7f38f9e0..0000000000 --- a/test/cpp/interop/rnd.dat +++ /dev/null diff --git a/test/cpp/interop/server_helper.cc b/test/cpp/interop/server_helper.cc index c6d891ad71..8b0b511bcb 100644 --- a/test/cpp/interop/server_helper.cc +++ b/test/cpp/interop/server_helper.cc @@ -72,6 +72,10 @@ uint32_t InteropServerContextInspector::GetEncodingsAcceptedByClient() const { return grpc_call_test_only_get_encodings_accepted_by_peer(context_.call_); } +uint32_t InteropServerContextInspector::GetMessageFlags() const { + return grpc_call_test_only_get_message_flags(context_.call_); +} + std::shared_ptr<const AuthContext> InteropServerContextInspector::GetAuthContext() const { return context_.auth_context(); diff --git a/test/cpp/interop/server_helper.h b/test/cpp/interop/server_helper.h index 12865e4032..a1da14a4c8 100644 --- a/test/cpp/interop/server_helper.h +++ b/test/cpp/interop/server_helper.h @@ -54,6 +54,7 @@ class InteropServerContextInspector { bool IsCancelled() const; grpc_compression_algorithm GetCallCompressionAlgorithm() const; uint32_t GetEncodingsAcceptedByClient() const; + uint32_t GetMessageFlags() const; private: const ::grpc::ServerContext& context_; diff --git a/test/cpp/interop/stress_interop_client.cc b/test/cpp/interop/stress_interop_client.cc index aa95682e74..1d5fc80cf2 100644 --- a/test/cpp/interop/stress_interop_client.cc +++ b/test/cpp/interop/stress_interop_client.cc @@ -138,8 +138,12 @@ bool StressTestInteropClient::RunTest(TestCaseType test_case) { is_success = interop_client_->DoLargeUnary(); break; } - case LARGE_COMPRESSED_UNARY: { - is_success = interop_client_->DoLargeCompressedUnary(); + case CLIENT_COMPRESSED_UNARY: { + is_success = interop_client_->DoClientCompressedUnary(); + break; + } + case CLIENT_COMPRESSED_STREAMING: { + is_success = interop_client_->DoClientCompressedStreaming(); break; } case CLIENT_STREAMING: { @@ -150,8 +154,12 @@ bool StressTestInteropClient::RunTest(TestCaseType test_case) { is_success = interop_client_->DoResponseStreaming(); break; } + case SERVER_COMPRESSED_UNARY: { + is_success = interop_client_->DoServerCompressedUnary(); + break; + } case SERVER_COMPRESSED_STREAMING: { - is_success = interop_client_->DoResponseCompressedStreaming(); + is_success = interop_client_->DoServerCompressedStreaming(); break; } case SLOW_CONSUMER: { diff --git a/test/cpp/interop/stress_interop_client.h b/test/cpp/interop/stress_interop_client.h index aa93b58b4a..cf6a713473 100644 --- a/test/cpp/interop/stress_interop_client.h +++ b/test/cpp/interop/stress_interop_client.h @@ -51,29 +51,33 @@ using std::vector; enum TestCaseType { UNKNOWN_TEST = -1, - EMPTY_UNARY = 0, - LARGE_UNARY = 1, - LARGE_COMPRESSED_UNARY = 2, - CLIENT_STREAMING = 3, - SERVER_STREAMING = 4, - SERVER_COMPRESSED_STREAMING = 5, - SLOW_CONSUMER = 6, - HALF_DUPLEX = 7, - PING_PONG = 8, - CANCEL_AFTER_BEGIN = 9, - CANCEL_AFTER_FIRST_RESPONSE = 10, - TIMEOUT_ON_SLEEPING_SERVER = 11, - EMPTY_STREAM = 12, - STATUS_CODE_AND_MESSAGE = 13, - CUSTOM_METADATA = 14 + EMPTY_UNARY, + LARGE_UNARY, + CLIENT_COMPRESSED_UNARY, + CLIENT_COMPRESSED_STREAMING, + CLIENT_STREAMING, + SERVER_STREAMING, + SERVER_COMPRESSED_UNARY, + SERVER_COMPRESSED_STREAMING, + SLOW_CONSUMER, + HALF_DUPLEX, + PING_PONG, + CANCEL_AFTER_BEGIN, + CANCEL_AFTER_FIRST_RESPONSE, + TIMEOUT_ON_SLEEPING_SERVER, + EMPTY_STREAM, + STATUS_CODE_AND_MESSAGE, + CUSTOM_METADATA }; const vector<pair<TestCaseType, grpc::string>> kTestCaseList = { {EMPTY_UNARY, "empty_unary"}, {LARGE_UNARY, "large_unary"}, - {LARGE_COMPRESSED_UNARY, "large_compressed_unary"}, + {CLIENT_COMPRESSED_UNARY, "client_compressed_unary"}, + {CLIENT_COMPRESSED_STREAMING, "client_compressed_streaming"}, {CLIENT_STREAMING, "client_streaming"}, {SERVER_STREAMING, "server_streaming"}, + {SERVER_COMPRESSED_UNARY, "server_compressed_unary"}, {SERVER_COMPRESSED_STREAMING, "server_compressed_streaming"}, {SLOW_CONSUMER, "slow_consumer"}, {HALF_DUPLEX, "half_duplex"}, |