From f74a49ed1400762af07bd65e3cad99e65cf7f6c0 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 18 Jun 2015 17:22:45 -0700 Subject: WIP. Need to merge and add more tests. --- .../tests/request_with_compressed_payload.c | 251 +++++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 test/core/end2end/tests/request_with_compressed_payload.c (limited to 'test/core/end2end/tests/request_with_compressed_payload.c') diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c new file mode 100644 index 0000000000..9722f49c5d --- /dev/null +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -0,0 +1,251 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "test/core/end2end/cq_verifier.h" +#include "src/core/channel/channel_args.h" + +enum { TIMEOUT = 200000 }; + +static void *tag(gpr_intptr t) { return (void *)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char *test_name, + grpc_channel_args *client_args, + grpc_channel_args *server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "%s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_client(&f, client_args); + config.init_server(&f, server_args); + return f; +} + +static gpr_timespec n_seconds_time(int n) { + return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); +} + +static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); } + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_time()); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->server_cq); + drain_cq(f->server_cq); + grpc_completion_queue_destroy(f->server_cq); + grpc_completion_queue_shutdown(f->client_cq); + drain_cq(f->client_cq); + grpc_completion_queue_destroy(f->client_cq); +} + +/* Client sends a request with payload, server reads then returns status. */ +static void test_invoke_request_with_compressed_payload( + grpc_end2end_test_config config) { + grpc_call *c; + grpc_call *s; + gpr_slice request_payload_slice; + grpc_byte_buffer *request_payload; + gpr_timespec deadline = five_seconds_time(); + grpc_channel_args *client_args; + grpc_channel_args *server_args; + grpc_end2end_test_fixture f; + cq_verifier *v_client; + cq_verifier *v_server; + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_byte_buffer *request_payload_recv = NULL; + grpc_call_details call_details; + grpc_status_code status; + char *details = NULL; + size_t details_capacity = 0; + int was_cancelled = 2; + + char str[1024]; memset(&str[0], 1023, 'x'); str[1023] = '\0'; + request_payload_slice = gpr_slice_from_copied_string(str); + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + + client_args = + grpc_channel_args_set_compression_level(NULL, GRPC_COMPRESS_LEVEL_HIGH); + server_args = + grpc_channel_args_set_compression_level(NULL, GRPC_COMPRESS_LEVEL_HIGH); + + f = begin_test(config, "test_invoke_request_with_compressed_payload", + client_args, server_args); + v_client = cq_verifier_create(f.client_cq); + v_server = cq_verifier_create(f.server_cq); + + + c = grpc_channel_create_call(f.client, f.client_cq, "/foo", + "foo.test.google.fr", deadline); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = request_payload; + op->flags = 0; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; + op++; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(c, ops, op - ops, tag(1))); + + GPR_ASSERT(GRPC_CALL_OK == + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.server_cq, + f.server_cq, tag(101))); + cq_expect_completion(v_server, tag(101), 1); + cq_verify(v_server); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv; + op->flags = 0; + op++; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); + + cq_expect_completion(v_server, tag(102), 1); + cq_verify(v_server); + + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; + op++; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); + + cq_expect_completion(v_server, tag(103), 1); + cq_verify(v_server); + + cq_expect_completion(v_client, tag(1), 1); + cq_verify(v_client); + + GPR_ASSERT(status == GRPC_STATUS_OK); + GPR_ASSERT(0 == strcmp(details, "xyz")); + GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); + GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); + GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, str)); + + gpr_free(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + + grpc_call_destroy(c); + grpc_call_destroy(s); + + cq_verifier_destroy(v_client); + cq_verifier_destroy(v_server); + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(request_payload_recv); + + grpc_channel_args_destroy(client_args); + grpc_channel_args_destroy(server_args); + + end_test(&f); + config.tear_down_data(&f); +} + +void grpc_end2end_tests(grpc_end2end_test_config config) { + test_invoke_request_with_compressed_payload(config); +} -- cgit v1.2.3 From 5927aec9b72dba7694218501e2f93d6c127776af Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 18 Jun 2015 17:24:44 -0700 Subject: Added generated configs --- gRPC.podspec | 4 +- src/core/channel/compress_filter.c | 10 ++-- src/core/channel/compress_filter.h | 13 +++++- src/core/surface/call.c | 2 +- src/core/surface/channel.c | 1 - src/core/surface/secure_channel_create.c | 5 ++ test/core/end2end/fixtures/chttp2_socket_pair.c | 2 + .../chttp2_socket_pair_one_byte_at_a_time.c | 2 + .../fixtures/chttp2_socket_pair_with_grpc_trace.c | 2 + .../tests/request_with_compressed_payload.c | 53 +++++++++++++++++----- tools/doxygen/Doxyfile.core.internal | 2 +- 11 files changed, 73 insertions(+), 23 deletions(-) (limited to 'test/core/end2end/tests/request_with_compressed_payload.c') diff --git a/gRPC.podspec b/gRPC.podspec index 5e98a09732..5be902b4d1 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -25,8 +25,8 @@ Pod::Spec.new do |s| # Core cross-platform gRPC library, written in C. s.subspec 'C-Core' do |cs| - cs.source_files = 'src/core/support/env.h', 'src/core/support/file.h', 'src/core/support/murmur_hash.h', 'src/core/support/grpc_string.h', 'src/core/support/string_win32.h', 'src/core/support/thd_internal.h', 'include/grpc/support/alloc.h', 'include/grpc/support/atm.h', 'include/grpc/support/atm_gcc_atomic.h', 'include/grpc/support/atm_gcc_sync.h', 'include/grpc/support/atm_win32.h', 'include/grpc/support/cancellable_platform.h', 'include/grpc/support/cmdline.h', 'include/grpc/support/cpu.h', 'include/grpc/support/histogram.h', 'include/grpc/support/host_port.h', 'include/grpc/support/log.h', 'include/grpc/support/log_win32.h', 'include/grpc/support/port_platform.h', 'include/grpc/support/slice.h', 'include/grpc/support/slice_buffer.h', 'include/grpc/support/string_util.h', 'include/grpc/support/subprocess.h', 'include/grpc/support/sync.h', 'include/grpc/support/sync_generic.h', 'include/grpc/support/sync_posix.h', 'include/grpc/support/sync_win32.h', 'include/grpc/support/thd.h', 'include/grpc/support/grpc_time.h', 'include/grpc/support/tls.h', 'include/grpc/support/tls_gcc.h', 'include/grpc/support/tls_msvc.h', 'include/grpc/support/tls_pthread.h', 'include/grpc/support/useful.h', 'src/core/support/alloc.c', 'src/core/support/cancellable.c', 'src/core/support/cmdline.c', 'src/core/support/cpu_iphone.c', 'src/core/support/cpu_linux.c', 'src/core/support/cpu_posix.c', 'src/core/support/cpu_windows.c', 'src/core/support/env_linux.c', 'src/core/support/env_posix.c', 'src/core/support/env_win32.c', 'src/core/support/file.c', 'src/core/support/file_posix.c', 'src/core/support/file_win32.c', 'src/core/support/histogram.c', 'src/core/support/host_port.c', 'src/core/support/log.c', 'src/core/support/log_android.c', 'src/core/support/log_linux.c', 'src/core/support/log_posix.c', 'src/core/support/log_win32.c', 'src/core/support/murmur_hash.c', 'src/core/support/slice.c', 'src/core/support/slice_buffer.c', 'src/core/support/string.c', 'src/core/support/string_posix.c', 'src/core/support/string_win32.c', 'src/core/support/subprocess_posix.c', 'src/core/support/sync.c', 'src/core/support/sync_posix.c', 'src/core/support/sync_win32.c', 'src/core/support/thd.c', 'src/core/support/thd_posix.c', 'src/core/support/thd_win32.c', 'src/core/support/time.c', 'src/core/support/time_posix.c', 'src/core/support/time_win32.c', 'src/core/support/tls_pthread.c', 'src/core/httpcli/format_request.h', 'src/core/httpcli/httpcli.h', 'src/core/httpcli/httpcli_security_connector.h', 'src/core/httpcli/parser.h', 'src/core/security/auth_filters.h', 'src/core/security/base64.h', 'src/core/security/credentials.h', 'src/core/security/json_token.h', 'src/core/security/secure_endpoint.h', 'src/core/security/secure_transport_setup.h', 'src/core/security/security_connector.h', 'src/core/security/security_context.h', 'src/core/tsi/fake_transport_security.h', 'src/core/tsi/ssl_transport_security.h', 'src/core/tsi/transport_security.h', 'src/core/tsi/transport_security_interface.h', 'src/core/census/grpc_context.h', 'src/core/channel/census_filter.h', 'src/core/channel/channel_args.h', 'src/core/channel/channel_stack.h', 'src/core/channel/child_channel.h', 'src/core/channel/client_channel.h', 'src/core/channel/client_setup.h', 'src/core/channel/connected_channel.h', 'src/core/channel/context.h', 'src/core/channel/http_client_filter.h', 'src/core/channel/http_server_filter.h', 'src/core/channel/noop_filter.h', 'src/core/compression/message_compress.h', 'src/core/debug/trace.h', 'src/core/iomgr/alarm.h', 'src/core/iomgr/alarm_heap.h', 'src/core/iomgr/alarm_internal.h', 'src/core/iomgr/endpoint.h', 'src/core/iomgr/endpoint_pair.h', 'src/core/iomgr/fd_posix.h', 'src/core/iomgr/iocp_windows.h', 'src/core/iomgr/iomgr.h', 'src/core/iomgr/iomgr_internal.h', 'src/core/iomgr/iomgr_posix.h', 'src/core/iomgr/pollset.h', 'src/core/iomgr/pollset_kick.h', 'src/core/iomgr/pollset_kick_posix.h', 'src/core/iomgr/pollset_kick_windows.h', 'src/core/iomgr/pollset_posix.h', 'src/core/iomgr/pollset_windows.h', 'src/core/iomgr/resolve_address.h', 'src/core/iomgr/sockaddr.h', 'src/core/iomgr/sockaddr_posix.h', 'src/core/iomgr/sockaddr_utils.h', 'src/core/iomgr/sockaddr_win32.h', 'src/core/iomgr/socket_utils_posix.h', 'src/core/iomgr/socket_windows.h', 'src/core/iomgr/tcp_client.h', 'src/core/iomgr/tcp_posix.h', 'src/core/iomgr/tcp_server.h', 'src/core/iomgr/tcp_windows.h', 'src/core/iomgr/time_averaged_stats.h', 'src/core/iomgr/wakeup_fd_pipe.h', 'src/core/iomgr/wakeup_fd_posix.h', 'src/core/json/json.h', 'src/core/json/json_common.h', 'src/core/json/json_reader.h', 'src/core/json/json_writer.h', 'src/core/profiling/timers.h', 'src/core/profiling/timers_preciseclock.h', 'src/core/surface/byte_buffer_queue.h', 'src/core/surface/call.h', 'src/core/surface/channel.h', 'src/core/surface/client.h', 'src/core/surface/completion_queue.h', 'src/core/surface/event_string.h', 'src/core/surface/init.h', 'src/core/surface/server.h', 'src/core/surface/surface_trace.h', 'src/core/transport/chttp2/alpn.h', 'src/core/transport/chttp2/bin_encoder.h', 'src/core/transport/chttp2/frame.h', 'src/core/transport/chttp2/frame_data.h', 'src/core/transport/chttp2/frame_goaway.h', 'src/core/transport/chttp2/frame_ping.h', 'src/core/transport/chttp2/frame_rst_stream.h', 'src/core/transport/chttp2/frame_settings.h', 'src/core/transport/chttp2/frame_window_update.h', 'src/core/transport/chttp2/hpack_parser.h', 'src/core/transport/chttp2/hpack_table.h', 'src/core/transport/chttp2/http2_errors.h', 'src/core/transport/chttp2/huffsyms.h', 'src/core/transport/chttp2/status_conversion.h', 'src/core/transport/chttp2/stream_encoder.h', 'src/core/transport/chttp2/stream_map.h', 'src/core/transport/chttp2/timeout_encoding.h', 'src/core/transport/chttp2/varint.h', 'src/core/transport/chttp2_transport.h', 'src/core/transport/metadata.h', 'src/core/transport/stream_op.h', 'src/core/transport/transport.h', 'src/core/transport/transport_impl.h', 'src/core/census/context.h', 'include/grpc/grpc_security.h', 'include/grpc/byte_buffer.h', 'include/grpc/byte_buffer_reader.h', 'include/grpc/compression.h', 'include/grpc/grpc.h', 'include/grpc/status.h', 'include/grpc/census.h', 'src/core/httpcli/format_request.c', 'src/core/httpcli/httpcli.c', 'src/core/httpcli/httpcli_security_connector.c', 'src/core/httpcli/parser.c', 'src/core/security/base64.c', 'src/core/security/client_auth_filter.c', 'src/core/security/credentials.c', 'src/core/security/credentials_metadata.c', 'src/core/security/credentials_posix.c', 'src/core/security/credentials_win32.c', 'src/core/security/google_default_credentials.c', 'src/core/security/json_token.c', 'src/core/security/secure_endpoint.c', 'src/core/security/secure_transport_setup.c', 'src/core/security/security_connector.c', 'src/core/security/security_context.c', 'src/core/security/server_auth_filter.c', 'src/core/security/server_secure_chttp2.c', 'src/core/surface/init_secure.c', 'src/core/surface/secure_channel_create.c', 'src/core/tsi/fake_transport_security.c', 'src/core/tsi/ssl_transport_security.c', 'src/core/tsi/transport_security.c', 'src/core/census/grpc_context.c', 'src/core/channel/channel_args.c', 'src/core/channel/channel_stack.c', 'src/core/channel/child_channel.c', 'src/core/channel/client_channel.c', 'src/core/channel/client_setup.c', 'src/core/channel/connected_channel.c', 'src/core/channel/http_client_filter.c', 'src/core/channel/http_server_filter.c', 'src/core/channel/noop_filter.c', 'src/core/compression/algorithm.c', 'src/core/compression/message_compress.c', 'src/core/debug/trace.c', 'src/core/iomgr/alarm.c', 'src/core/iomgr/alarm_heap.c', 'src/core/iomgr/endpoint.c', 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/fd_posix.c', 'src/core/iomgr/iocp_windows.c', 'src/core/iomgr/iomgr.c', 'src/core/iomgr/iomgr_posix.c', 'src/core/iomgr/iomgr_windows.c', 'src/core/iomgr/pollset_kick.c', 'src/core/iomgr/pollset_multipoller_with_epoll.c', 'src/core/iomgr/pollset_multipoller_with_poll_posix.c', 'src/core/iomgr/pollset_posix.c', 'src/core/iomgr/pollset_windows.c', 'src/core/iomgr/resolve_address_posix.c', 'src/core/iomgr/resolve_address_windows.c', 'src/core/iomgr/sockaddr_utils.c', 'src/core/iomgr/socket_utils_common_posix.c', 'src/core/iomgr/socket_utils_linux.c', 'src/core/iomgr/socket_utils_posix.c', 'src/core/iomgr/socket_windows.c', 'src/core/iomgr/tcp_client_posix.c', 'src/core/iomgr/tcp_client_windows.c', 'src/core/iomgr/tcp_posix.c', 'src/core/iomgr/tcp_server_posix.c', 'src/core/iomgr/tcp_server_windows.c', 'src/core/iomgr/tcp_windows.c', 'src/core/iomgr/time_averaged_stats.c', 'src/core/iomgr/wakeup_fd_eventfd.c', 'src/core/iomgr/wakeup_fd_nospecial.c', 'src/core/iomgr/wakeup_fd_pipe.c', 'src/core/iomgr/wakeup_fd_posix.c', 'src/core/json/json.c', 'src/core/json/json_reader.c', 'src/core/json/json_string.c', 'src/core/json/json_writer.c', 'src/core/profiling/basic_timers.c', 'src/core/profiling/stap_timers.c', 'src/core/surface/byte_buffer.c', 'src/core/surface/byte_buffer_queue.c', 'src/core/surface/byte_buffer_reader.c', 'src/core/surface/call.c', 'src/core/surface/call_details.c', 'src/core/surface/call_log_batch.c', 'src/core/surface/channel.c', 'src/core/surface/channel_create.c', 'src/core/surface/client.c', 'src/core/surface/completion_queue.c', 'src/core/surface/event_string.c', 'src/core/surface/init.c', 'src/core/surface/lame_client.c', 'src/core/surface/metadata_array.c', 'src/core/surface/server.c', 'src/core/surface/server_chttp2.c', 'src/core/surface/server_create.c', 'src/core/surface/surface_trace.c', 'src/core/transport/chttp2/alpn.c', 'src/core/transport/chttp2/bin_encoder.c', 'src/core/transport/chttp2/frame_data.c', 'src/core/transport/chttp2/frame_goaway.c', 'src/core/transport/chttp2/frame_ping.c', 'src/core/transport/chttp2/frame_rst_stream.c', 'src/core/transport/chttp2/frame_settings.c', 'src/core/transport/chttp2/frame_window_update.c', 'src/core/transport/chttp2/hpack_parser.c', 'src/core/transport/chttp2/hpack_table.c', 'src/core/transport/chttp2/huffsyms.c', 'src/core/transport/chttp2/status_conversion.c', 'src/core/transport/chttp2/stream_encoder.c', 'src/core/transport/chttp2/stream_map.c', 'src/core/transport/chttp2/timeout_encoding.c', 'src/core/transport/chttp2/varint.c', 'src/core/transport/chttp2_transport.c', 'src/core/transport/metadata.c', 'src/core/transport/stream_op.c', 'src/core/transport/transport.c', 'src/core/transport/transport_op_string.c', 'src/core/census/context.c', 'src/core/census/initialize.c', - cs.private_header_files = 'src/core/support/env.h', 'src/core/support/file.h', 'src/core/support/murmur_hash.h', 'src/core/support/string.h', 'src/core/support/string_win32.h', 'src/core/support/thd_internal.h', 'src/core/httpcli/format_request.h', 'src/core/httpcli/httpcli.h', 'src/core/httpcli/httpcli_security_connector.h', 'src/core/httpcli/parser.h', 'src/core/security/auth_filters.h', 'src/core/security/base64.h', 'src/core/security/credentials.h', 'src/core/security/json_token.h', 'src/core/security/secure_endpoint.h', 'src/core/security/secure_transport_setup.h', 'src/core/security/security_connector.h', 'src/core/security/security_context.h', 'src/core/tsi/fake_transport_security.h', 'src/core/tsi/ssl_transport_security.h', 'src/core/tsi/transport_security.h', 'src/core/tsi/transport_security_interface.h', 'src/core/census/grpc_context.h', 'src/core/channel/census_filter.h', 'src/core/channel/channel_args.h', 'src/core/channel/channel_stack.h', 'src/core/channel/child_channel.h', 'src/core/channel/client_channel.h', 'src/core/channel/client_setup.h', 'src/core/channel/connected_channel.h', 'src/core/channel/context.h', 'src/core/channel/http_client_filter.h', 'src/core/channel/http_server_filter.h', 'src/core/channel/noop_filter.h', 'src/core/compression/message_compress.h', 'src/core/debug/trace.h', 'src/core/iomgr/alarm.h', 'src/core/iomgr/alarm_heap.h', 'src/core/iomgr/alarm_internal.h', 'src/core/iomgr/endpoint.h', 'src/core/iomgr/endpoint_pair.h', 'src/core/iomgr/fd_posix.h', 'src/core/iomgr/iocp_windows.h', 'src/core/iomgr/iomgr.h', 'src/core/iomgr/iomgr_internal.h', 'src/core/iomgr/iomgr_posix.h', 'src/core/iomgr/pollset.h', 'src/core/iomgr/pollset_kick.h', 'src/core/iomgr/pollset_kick_posix.h', 'src/core/iomgr/pollset_kick_windows.h', 'src/core/iomgr/pollset_posix.h', 'src/core/iomgr/pollset_windows.h', 'src/core/iomgr/resolve_address.h', 'src/core/iomgr/sockaddr.h', 'src/core/iomgr/sockaddr_posix.h', 'src/core/iomgr/sockaddr_utils.h', 'src/core/iomgr/sockaddr_win32.h', 'src/core/iomgr/socket_utils_posix.h', 'src/core/iomgr/socket_windows.h', 'src/core/iomgr/tcp_client.h', 'src/core/iomgr/tcp_posix.h', 'src/core/iomgr/tcp_server.h', 'src/core/iomgr/tcp_windows.h', 'src/core/iomgr/time_averaged_stats.h', 'src/core/iomgr/wakeup_fd_pipe.h', 'src/core/iomgr/wakeup_fd_posix.h', 'src/core/json/json.h', 'src/core/json/json_common.h', 'src/core/json/json_reader.h', 'src/core/json/json_writer.h', 'src/core/profiling/timers.h', 'src/core/profiling/timers_preciseclock.h', 'src/core/surface/byte_buffer_queue.h', 'src/core/surface/call.h', 'src/core/surface/channel.h', 'src/core/surface/client.h', 'src/core/surface/completion_queue.h', 'src/core/surface/event_string.h', 'src/core/surface/init.h', 'src/core/surface/server.h', 'src/core/surface/surface_trace.h', 'src/core/transport/chttp2/alpn.h', 'src/core/transport/chttp2/bin_encoder.h', 'src/core/transport/chttp2/frame.h', 'src/core/transport/chttp2/frame_data.h', 'src/core/transport/chttp2/frame_goaway.h', 'src/core/transport/chttp2/frame_ping.h', 'src/core/transport/chttp2/frame_rst_stream.h', 'src/core/transport/chttp2/frame_settings.h', 'src/core/transport/chttp2/frame_window_update.h', 'src/core/transport/chttp2/hpack_parser.h', 'src/core/transport/chttp2/hpack_table.h', 'src/core/transport/chttp2/http2_errors.h', 'src/core/transport/chttp2/huffsyms.h', 'src/core/transport/chttp2/status_conversion.h', 'src/core/transport/chttp2/stream_encoder.h', 'src/core/transport/chttp2/stream_map.h', 'src/core/transport/chttp2/timeout_encoding.h', 'src/core/transport/chttp2/varint.h', 'src/core/transport/chttp2_transport.h', 'src/core/transport/metadata.h', 'src/core/transport/stream_op.h', 'src/core/transport/transport.h', 'src/core/transport/transport_impl.h', 'src/core/census/context.h', + cs.source_files = 'src/core/support/env.h', 'src/core/support/file.h', 'src/core/support/murmur_hash.h', 'src/core/support/grpc_string.h', 'src/core/support/string_win32.h', 'src/core/support/thd_internal.h', 'include/grpc/support/alloc.h', 'include/grpc/support/atm.h', 'include/grpc/support/atm_gcc_atomic.h', 'include/grpc/support/atm_gcc_sync.h', 'include/grpc/support/atm_win32.h', 'include/grpc/support/cancellable_platform.h', 'include/grpc/support/cmdline.h', 'include/grpc/support/cpu.h', 'include/grpc/support/histogram.h', 'include/grpc/support/host_port.h', 'include/grpc/support/log.h', 'include/grpc/support/log_win32.h', 'include/grpc/support/port_platform.h', 'include/grpc/support/slice.h', 'include/grpc/support/slice_buffer.h', 'include/grpc/support/string_util.h', 'include/grpc/support/subprocess.h', 'include/grpc/support/sync.h', 'include/grpc/support/sync_generic.h', 'include/grpc/support/sync_posix.h', 'include/grpc/support/sync_win32.h', 'include/grpc/support/thd.h', 'include/grpc/support/grpc_time.h', 'include/grpc/support/tls.h', 'include/grpc/support/tls_gcc.h', 'include/grpc/support/tls_msvc.h', 'include/grpc/support/tls_pthread.h', 'include/grpc/support/useful.h', 'src/core/support/alloc.c', 'src/core/support/cancellable.c', 'src/core/support/cmdline.c', 'src/core/support/cpu_iphone.c', 'src/core/support/cpu_linux.c', 'src/core/support/cpu_posix.c', 'src/core/support/cpu_windows.c', 'src/core/support/env_linux.c', 'src/core/support/env_posix.c', 'src/core/support/env_win32.c', 'src/core/support/file.c', 'src/core/support/file_posix.c', 'src/core/support/file_win32.c', 'src/core/support/histogram.c', 'src/core/support/host_port.c', 'src/core/support/log.c', 'src/core/support/log_android.c', 'src/core/support/log_linux.c', 'src/core/support/log_posix.c', 'src/core/support/log_win32.c', 'src/core/support/murmur_hash.c', 'src/core/support/slice.c', 'src/core/support/slice_buffer.c', 'src/core/support/string.c', 'src/core/support/string_posix.c', 'src/core/support/string_win32.c', 'src/core/support/subprocess_posix.c', 'src/core/support/sync.c', 'src/core/support/sync_posix.c', 'src/core/support/sync_win32.c', 'src/core/support/thd.c', 'src/core/support/thd_posix.c', 'src/core/support/thd_win32.c', 'src/core/support/time.c', 'src/core/support/time_posix.c', 'src/core/support/time_win32.c', 'src/core/support/tls_pthread.c', 'src/core/httpcli/format_request.h', 'src/core/httpcli/httpcli.h', 'src/core/httpcli/httpcli_security_connector.h', 'src/core/httpcli/parser.h', 'src/core/security/auth_filters.h', 'src/core/security/base64.h', 'src/core/security/credentials.h', 'src/core/security/json_token.h', 'src/core/security/secure_endpoint.h', 'src/core/security/secure_transport_setup.h', 'src/core/security/security_connector.h', 'src/core/security/security_context.h', 'src/core/tsi/fake_transport_security.h', 'src/core/tsi/ssl_transport_security.h', 'src/core/tsi/transport_security.h', 'src/core/tsi/transport_security_interface.h', 'src/core/census/grpc_context.h', 'src/core/channel/census_filter.h', 'src/core/channel/channel_args.h', 'src/core/channel/channel_stack.h', 'src/core/channel/child_channel.h', 'src/core/channel/client_channel.h', 'src/core/channel/client_setup.h', 'src/core/channel/compress_filter.h', 'src/core/channel/connected_channel.h', 'src/core/channel/context.h', 'src/core/channel/http_client_filter.h', 'src/core/channel/http_server_filter.h', 'src/core/channel/noop_filter.h', 'src/core/compression/message_compress.h', 'src/core/debug/trace.h', 'src/core/iomgr/alarm.h', 'src/core/iomgr/alarm_heap.h', 'src/core/iomgr/alarm_internal.h', 'src/core/iomgr/endpoint.h', 'src/core/iomgr/endpoint_pair.h', 'src/core/iomgr/fd_posix.h', 'src/core/iomgr/iocp_windows.h', 'src/core/iomgr/iomgr.h', 'src/core/iomgr/iomgr_internal.h', 'src/core/iomgr/iomgr_posix.h', 'src/core/iomgr/pollset.h', 'src/core/iomgr/pollset_kick.h', 'src/core/iomgr/pollset_kick_posix.h', 'src/core/iomgr/pollset_kick_windows.h', 'src/core/iomgr/pollset_posix.h', 'src/core/iomgr/pollset_windows.h', 'src/core/iomgr/resolve_address.h', 'src/core/iomgr/sockaddr.h', 'src/core/iomgr/sockaddr_posix.h', 'src/core/iomgr/sockaddr_utils.h', 'src/core/iomgr/sockaddr_win32.h', 'src/core/iomgr/socket_utils_posix.h', 'src/core/iomgr/socket_windows.h', 'src/core/iomgr/tcp_client.h', 'src/core/iomgr/tcp_posix.h', 'src/core/iomgr/tcp_server.h', 'src/core/iomgr/tcp_windows.h', 'src/core/iomgr/time_averaged_stats.h', 'src/core/iomgr/wakeup_fd_pipe.h', 'src/core/iomgr/wakeup_fd_posix.h', 'src/core/json/json.h', 'src/core/json/json_common.h', 'src/core/json/json_reader.h', 'src/core/json/json_writer.h', 'src/core/profiling/timers.h', 'src/core/profiling/timers_preciseclock.h', 'src/core/surface/byte_buffer_queue.h', 'src/core/surface/call.h', 'src/core/surface/channel.h', 'src/core/surface/client.h', 'src/core/surface/completion_queue.h', 'src/core/surface/event_string.h', 'src/core/surface/init.h', 'src/core/surface/server.h', 'src/core/surface/surface_trace.h', 'src/core/transport/chttp2/alpn.h', 'src/core/transport/chttp2/bin_encoder.h', 'src/core/transport/chttp2/frame.h', 'src/core/transport/chttp2/frame_data.h', 'src/core/transport/chttp2/frame_goaway.h', 'src/core/transport/chttp2/frame_ping.h', 'src/core/transport/chttp2/frame_rst_stream.h', 'src/core/transport/chttp2/frame_settings.h', 'src/core/transport/chttp2/frame_window_update.h', 'src/core/transport/chttp2/hpack_parser.h', 'src/core/transport/chttp2/hpack_table.h', 'src/core/transport/chttp2/http2_errors.h', 'src/core/transport/chttp2/huffsyms.h', 'src/core/transport/chttp2/status_conversion.h', 'src/core/transport/chttp2/stream_encoder.h', 'src/core/transport/chttp2/stream_map.h', 'src/core/transport/chttp2/timeout_encoding.h', 'src/core/transport/chttp2/varint.h', 'src/core/transport/chttp2_transport.h', 'src/core/transport/metadata.h', 'src/core/transport/stream_op.h', 'src/core/transport/transport.h', 'src/core/transport/transport_impl.h', 'src/core/census/context.h', 'include/grpc/grpc_security.h', 'include/grpc/byte_buffer.h', 'include/grpc/byte_buffer_reader.h', 'include/grpc/compression.h', 'include/grpc/grpc.h', 'include/grpc/status.h', 'include/grpc/census.h', 'src/core/httpcli/format_request.c', 'src/core/httpcli/httpcli.c', 'src/core/httpcli/httpcli_security_connector.c', 'src/core/httpcli/parser.c', 'src/core/security/base64.c', 'src/core/security/client_auth_filter.c', 'src/core/security/credentials.c', 'src/core/security/credentials_metadata.c', 'src/core/security/credentials_posix.c', 'src/core/security/credentials_win32.c', 'src/core/security/google_default_credentials.c', 'src/core/security/json_token.c', 'src/core/security/secure_endpoint.c', 'src/core/security/secure_transport_setup.c', 'src/core/security/security_connector.c', 'src/core/security/security_context.c', 'src/core/security/server_auth_filter.c', 'src/core/security/server_secure_chttp2.c', 'src/core/surface/init_secure.c', 'src/core/surface/secure_channel_create.c', 'src/core/tsi/fake_transport_security.c', 'src/core/tsi/ssl_transport_security.c', 'src/core/tsi/transport_security.c', 'src/core/census/grpc_context.c', 'src/core/channel/channel_args.c', 'src/core/channel/channel_stack.c', 'src/core/channel/child_channel.c', 'src/core/channel/client_channel.c', 'src/core/channel/client_setup.c', 'src/core/channel/compress_filter.c', 'src/core/channel/connected_channel.c', 'src/core/channel/http_client_filter.c', 'src/core/channel/http_server_filter.c', 'src/core/channel/noop_filter.c', 'src/core/compression/algorithm.c', 'src/core/compression/message_compress.c', 'src/core/debug/trace.c', 'src/core/iomgr/alarm.c', 'src/core/iomgr/alarm_heap.c', 'src/core/iomgr/endpoint.c', 'src/core/iomgr/endpoint_pair_posix.c', 'src/core/iomgr/endpoint_pair_windows.c', 'src/core/iomgr/fd_posix.c', 'src/core/iomgr/iocp_windows.c', 'src/core/iomgr/iomgr.c', 'src/core/iomgr/iomgr_posix.c', 'src/core/iomgr/iomgr_windows.c', 'src/core/iomgr/pollset_kick.c', 'src/core/iomgr/pollset_multipoller_with_epoll.c', 'src/core/iomgr/pollset_multipoller_with_poll_posix.c', 'src/core/iomgr/pollset_posix.c', 'src/core/iomgr/pollset_windows.c', 'src/core/iomgr/resolve_address_posix.c', 'src/core/iomgr/resolve_address_windows.c', 'src/core/iomgr/sockaddr_utils.c', 'src/core/iomgr/socket_utils_common_posix.c', 'src/core/iomgr/socket_utils_linux.c', 'src/core/iomgr/socket_utils_posix.c', 'src/core/iomgr/socket_windows.c', 'src/core/iomgr/tcp_client_posix.c', 'src/core/iomgr/tcp_client_windows.c', 'src/core/iomgr/tcp_posix.c', 'src/core/iomgr/tcp_server_posix.c', 'src/core/iomgr/tcp_server_windows.c', 'src/core/iomgr/tcp_windows.c', 'src/core/iomgr/time_averaged_stats.c', 'src/core/iomgr/wakeup_fd_eventfd.c', 'src/core/iomgr/wakeup_fd_nospecial.c', 'src/core/iomgr/wakeup_fd_pipe.c', 'src/core/iomgr/wakeup_fd_posix.c', 'src/core/json/json.c', 'src/core/json/json_reader.c', 'src/core/json/json_string.c', 'src/core/json/json_writer.c', 'src/core/profiling/basic_timers.c', 'src/core/profiling/stap_timers.c', 'src/core/surface/byte_buffer.c', 'src/core/surface/byte_buffer_queue.c', 'src/core/surface/byte_buffer_reader.c', 'src/core/surface/call.c', 'src/core/surface/call_details.c', 'src/core/surface/call_log_batch.c', 'src/core/surface/channel.c', 'src/core/surface/channel_create.c', 'src/core/surface/client.c', 'src/core/surface/completion_queue.c', 'src/core/surface/event_string.c', 'src/core/surface/init.c', 'src/core/surface/lame_client.c', 'src/core/surface/metadata_array.c', 'src/core/surface/server.c', 'src/core/surface/server_chttp2.c', 'src/core/surface/server_create.c', 'src/core/surface/surface_trace.c', 'src/core/transport/chttp2/alpn.c', 'src/core/transport/chttp2/bin_encoder.c', 'src/core/transport/chttp2/frame_data.c', 'src/core/transport/chttp2/frame_goaway.c', 'src/core/transport/chttp2/frame_ping.c', 'src/core/transport/chttp2/frame_rst_stream.c', 'src/core/transport/chttp2/frame_settings.c', 'src/core/transport/chttp2/frame_window_update.c', 'src/core/transport/chttp2/hpack_parser.c', 'src/core/transport/chttp2/hpack_table.c', 'src/core/transport/chttp2/huffsyms.c', 'src/core/transport/chttp2/status_conversion.c', 'src/core/transport/chttp2/stream_encoder.c', 'src/core/transport/chttp2/stream_map.c', 'src/core/transport/chttp2/timeout_encoding.c', 'src/core/transport/chttp2/varint.c', 'src/core/transport/chttp2_transport.c', 'src/core/transport/metadata.c', 'src/core/transport/stream_op.c', 'src/core/transport/transport.c', 'src/core/transport/transport_op_string.c', 'src/core/census/context.c', 'src/core/census/initialize.c', + cs.private_header_files = 'src/core/support/env.h', 'src/core/support/file.h', 'src/core/support/murmur_hash.h', 'src/core/support/string.h', 'src/core/support/string_win32.h', 'src/core/support/thd_internal.h', 'src/core/httpcli/format_request.h', 'src/core/httpcli/httpcli.h', 'src/core/httpcli/httpcli_security_connector.h', 'src/core/httpcli/parser.h', 'src/core/security/auth_filters.h', 'src/core/security/base64.h', 'src/core/security/credentials.h', 'src/core/security/json_token.h', 'src/core/security/secure_endpoint.h', 'src/core/security/secure_transport_setup.h', 'src/core/security/security_connector.h', 'src/core/security/security_context.h', 'src/core/tsi/fake_transport_security.h', 'src/core/tsi/ssl_transport_security.h', 'src/core/tsi/transport_security.h', 'src/core/tsi/transport_security_interface.h', 'src/core/census/grpc_context.h', 'src/core/channel/census_filter.h', 'src/core/channel/channel_args.h', 'src/core/channel/channel_stack.h', 'src/core/channel/child_channel.h', 'src/core/channel/client_channel.h', 'src/core/channel/client_setup.h', 'src/core/channel/compress_filter.h', 'src/core/channel/connected_channel.h', 'src/core/channel/context.h', 'src/core/channel/http_client_filter.h', 'src/core/channel/http_server_filter.h', 'src/core/channel/noop_filter.h', 'src/core/compression/message_compress.h', 'src/core/debug/trace.h', 'src/core/iomgr/alarm.h', 'src/core/iomgr/alarm_heap.h', 'src/core/iomgr/alarm_internal.h', 'src/core/iomgr/endpoint.h', 'src/core/iomgr/endpoint_pair.h', 'src/core/iomgr/fd_posix.h', 'src/core/iomgr/iocp_windows.h', 'src/core/iomgr/iomgr.h', 'src/core/iomgr/iomgr_internal.h', 'src/core/iomgr/iomgr_posix.h', 'src/core/iomgr/pollset.h', 'src/core/iomgr/pollset_kick.h', 'src/core/iomgr/pollset_kick_posix.h', 'src/core/iomgr/pollset_kick_windows.h', 'src/core/iomgr/pollset_posix.h', 'src/core/iomgr/pollset_windows.h', 'src/core/iomgr/resolve_address.h', 'src/core/iomgr/sockaddr.h', 'src/core/iomgr/sockaddr_posix.h', 'src/core/iomgr/sockaddr_utils.h', 'src/core/iomgr/sockaddr_win32.h', 'src/core/iomgr/socket_utils_posix.h', 'src/core/iomgr/socket_windows.h', 'src/core/iomgr/tcp_client.h', 'src/core/iomgr/tcp_posix.h', 'src/core/iomgr/tcp_server.h', 'src/core/iomgr/tcp_windows.h', 'src/core/iomgr/time_averaged_stats.h', 'src/core/iomgr/wakeup_fd_pipe.h', 'src/core/iomgr/wakeup_fd_posix.h', 'src/core/json/json.h', 'src/core/json/json_common.h', 'src/core/json/json_reader.h', 'src/core/json/json_writer.h', 'src/core/profiling/timers.h', 'src/core/profiling/timers_preciseclock.h', 'src/core/surface/byte_buffer_queue.h', 'src/core/surface/call.h', 'src/core/surface/channel.h', 'src/core/surface/client.h', 'src/core/surface/completion_queue.h', 'src/core/surface/event_string.h', 'src/core/surface/init.h', 'src/core/surface/server.h', 'src/core/surface/surface_trace.h', 'src/core/transport/chttp2/alpn.h', 'src/core/transport/chttp2/bin_encoder.h', 'src/core/transport/chttp2/frame.h', 'src/core/transport/chttp2/frame_data.h', 'src/core/transport/chttp2/frame_goaway.h', 'src/core/transport/chttp2/frame_ping.h', 'src/core/transport/chttp2/frame_rst_stream.h', 'src/core/transport/chttp2/frame_settings.h', 'src/core/transport/chttp2/frame_window_update.h', 'src/core/transport/chttp2/hpack_parser.h', 'src/core/transport/chttp2/hpack_table.h', 'src/core/transport/chttp2/http2_errors.h', 'src/core/transport/chttp2/huffsyms.h', 'src/core/transport/chttp2/status_conversion.h', 'src/core/transport/chttp2/stream_encoder.h', 'src/core/transport/chttp2/stream_map.h', 'src/core/transport/chttp2/timeout_encoding.h', 'src/core/transport/chttp2/varint.h', 'src/core/transport/chttp2_transport.h', 'src/core/transport/metadata.h', 'src/core/transport/stream_op.h', 'src/core/transport/transport.h', 'src/core/transport/transport_impl.h', 'src/core/census/context.h', cs.header_mappings_dir = '.' # The core library includes its headers as either "src/core/..." or "grpc/...", meaning we have # to tell XCode to look for headers under the "include" subdirectory too. diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index b33640d2ae..918cb2dd79 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -43,7 +43,7 @@ typedef struct call_data { gpr_slice_buffer slices; int remaining_slice_bytes; - int no_compress; /**< whether skip compression for this specific call */ + int no_compress; /**< whether to skip compression for this specific call */ grpc_linked_mdelem compression_algorithm; } call_data; @@ -113,8 +113,9 @@ static void process_send_ops(grpc_call_element *elem, } } - GPR_ASSERT(metadata_op_index >= 0); - GPR_ASSERT(begin_message_index >= 0); + if (metadata_op_index < 0 || begin_message_index < 0) { /* bail out */ + return; + } /* update both the metadata and the begin_message's flags */ if (calld->no_compress) { @@ -212,9 +213,6 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel_args_get_compression_level(args); const grpc_compression_algorithm none_alg = GRPC_COMPRESS_NONE; - /*We shouldn't be in this filter if compression is disabled. */ - GPR_ASSERT(clevel != GRPC_COMPRESS_LEVEL_NONE); - channeld->compress_algorithm_md = grpc_mdelem_from_string_and_buffer( mdctx, "grpc-compression-level", (gpr_uint8*)&clevel, sizeof(clevel)); channeld->compress_algorithm = grpc_compression_algorithm_for_level(clevel); diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h index de3fadebb4..ea667969e1 100644 --- a/src/core/channel/compress_filter.h +++ b/src/core/channel/compress_filter.h @@ -36,7 +36,18 @@ #include "src/core/channel/channel_stack.h" -/* XXX */ +/** Message-level compression filter. + * + * See for the available compression levels. + * + * Use grpc_channel_args_set_compression_level and + * grpc_channel_args_get_compression_level to interact with the compression + * settings for a channel. + * + * grpc_op instances of type GRPC_OP_SEND_MESSAGE can have the bit specified by + * the GRPC_WRITE_NO_COMPRESS mask in order to disable compression in an + * otherwise compressed channel. + * */ extern const grpc_channel_filter grpc_compress_filter; #endif /* GRPC_INTERNAL_CORE_CHANNEL_COMPRESS_FILTER_H */ diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 20ce6f9bf8..41257419c0 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1295,7 +1295,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_MESSAGE; req->data.send_message = op->data.send_message; - req->flags = ops->flags; + req->flags = op->flags; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: /* Flag validation: currently allow no flags */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 1fe3bf357b..6353a83b4f 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -265,7 +265,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { return channel->grpc_compression_level_string; } - grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { return grpc_mdelem_ref(channel->grpc_status_elem[i]); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index 8b39934881..4c94e29e30 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -42,6 +42,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/client_setup.h" +#include "src/core/channel/compress_filter.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" #include "src/core/iomgr/resolve_address.h" @@ -239,6 +240,10 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; } */ + if (grpc_channel_args_get_compression_level(args) > + GRPC_COMPRESS_LEVEL_NONE) { + filters[n++] = &grpc_compress_filter; + } filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index 48c121c7c4..6d4b8f12de 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -39,6 +39,7 @@ #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" #include "src/core/channel/http_server_filter.h" +#include "src/core/channel/compress_filter.h" #include "src/core/iomgr/endpoint_pair.h" #include "src/core/iomgr/iomgr.h" #include "src/core/surface/channel.h" @@ -77,6 +78,7 @@ static grpc_transport_setup_result client_setup_transport( const grpc_channel_filter *filters[] = {&grpc_client_surface_filter, &grpc_http_client_filter, + &grpc_compress_filter, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index 1d2e6f51c1..43c4665fb9 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -39,6 +39,7 @@ #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" #include "src/core/channel/http_server_filter.h" +#include "src/core/channel/compress_filter.h" #include "src/core/iomgr/endpoint_pair.h" #include "src/core/iomgr/iomgr.h" #include "src/core/surface/channel.h" @@ -77,6 +78,7 @@ static grpc_transport_setup_result client_setup_transport( const grpc_channel_filter *filters[] = {&grpc_client_surface_filter, &grpc_http_client_filter, + &grpc_compress_filter, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c index 0834987fbe..4f81a96778 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_with_grpc_trace.c @@ -39,6 +39,7 @@ #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" #include "src/core/channel/http_server_filter.h" +#include "src/core/channel/compress_filter.h" #include "src/core/iomgr/endpoint_pair.h" #include "src/core/iomgr/iomgr.h" #include "src/core/support/env.h" @@ -78,6 +79,7 @@ static grpc_transport_setup_result client_setup_transport( const grpc_channel_filter *filters[] = {&grpc_client_surface_filter, &grpc_http_client_filter, + &grpc_compress_filter, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); grpc_channel *channel = grpc_channel_create_from_filters( diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c index 9722f49c5d..fe41780702 100644 --- a/test/core/end2end/tests/request_with_compressed_payload.c +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -103,9 +103,11 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_destroy(f->client_cq); } -/* Client sends a request with payload, server reads then returns status. */ -static void test_invoke_request_with_compressed_payload( - grpc_end2end_test_config config) { +static void request_with_payload_template( + grpc_end2end_test_config config, const char *test_name, + gpr_uint32 send_flags_bitmask, + grpc_compression_level requested_compression_level, + grpc_compression_algorithm expected_compression_algorithm) { grpc_call *c; grpc_call *s; gpr_slice request_payload_slice; @@ -132,17 +134,15 @@ static void test_invoke_request_with_compressed_payload( request_payload_slice = gpr_slice_from_copied_string(str); request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - client_args = - grpc_channel_args_set_compression_level(NULL, GRPC_COMPRESS_LEVEL_HIGH); - server_args = - grpc_channel_args_set_compression_level(NULL, GRPC_COMPRESS_LEVEL_HIGH); + client_args = grpc_channel_args_set_compression_level( + NULL, requested_compression_level); + server_args = grpc_channel_args_set_compression_level( + NULL, requested_compression_level); - f = begin_test(config, "test_invoke_request_with_compressed_payload", - client_args, server_args); + f = begin_test(config, test_name, client_args, server_args); v_client = cq_verifier_create(f.client_cq); v_server = cq_verifier_create(f.server_cq); - c = grpc_channel_create_call(f.client, f.client_cq, "/foo", "foo.test.google.fr", deadline); GPR_ASSERT(c); @@ -159,7 +159,7 @@ static void test_invoke_request_with_compressed_payload( op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = request_payload; - op->flags = 0; + op->flags = send_flags_bitmask; op++; op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; op->flags = 0; @@ -222,6 +222,11 @@ static void test_invoke_request_with_compressed_payload( GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr")); GPR_ASSERT(was_cancelled == 0); + + GPR_ASSERT(request_payload_recv->type == GRPC_BB_RAW); + GPR_ASSERT(request_payload_recv->data.raw.compression == + expected_compression_algorithm); + GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, str)); gpr_free(details); @@ -246,6 +251,32 @@ static void test_invoke_request_with_compressed_payload( config.tear_down_data(&f); } +static void test_invoke_request_with_excepcionally_uncompressed_payload( + grpc_end2end_test_config config) { + request_with_payload_template( + config, "test_invoke_request_with_excepcionally_uncompressed_payload", + GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE); +} + +static void test_invoke_request_with_compressed_payload( + grpc_end2end_test_config config) { + request_with_payload_template( + config, "test_invoke_request_with_compressed_payload", 0, + GRPC_COMPRESS_LEVEL_HIGH, + grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_HIGH)); +} + +static void test_invoke_request_with_uncompressed_payload( + grpc_end2end_test_config config) { + request_with_payload_template( + config, "test_invoke_request_with_uncompressed_payload", 0, + GRPC_COMPRESS_LEVEL_NONE, + grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_NONE)); +} + + void grpc_end2end_tests(grpc_end2end_test_config config) { + test_invoke_request_with_excepcionally_uncompressed_payload(config); test_invoke_request_with_compressed_payload(config); + test_invoke_request_with_uncompressed_payload(config); } diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index c1c4b9f66d..824dbef40f 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -760,7 +760,7 @@ WARN_LOGFILE = # spaces. # Note: If this tag is empty the current directory is searched. -INPUT = include/grpc/grpc_security.h include/grpc/byte_buffer.h include/grpc/byte_buffer_reader.h include/grpc/compression.h include/grpc/grpc.h include/grpc/status.h include/grpc/census.h src/core/httpcli/format_request.h src/core/httpcli/httpcli.h src/core/httpcli/httpcli_security_connector.h src/core/httpcli/parser.h src/core/security/auth_filters.h src/core/security/base64.h src/core/security/credentials.h src/core/security/json_token.h src/core/security/secure_endpoint.h src/core/security/secure_transport_setup.h src/core/security/security_connector.h src/core/security/security_context.h src/core/tsi/fake_transport_security.h src/core/tsi/ssl_transport_security.h src/core/tsi/transport_security.h src/core/tsi/transport_security_interface.h src/core/census/grpc_context.h src/core/channel/census_filter.h src/core/channel/channel_args.h src/core/channel/channel_stack.h src/core/channel/child_channel.h src/core/channel/client_channel.h src/core/channel/client_setup.h src/core/channel/connected_channel.h src/core/channel/context.h src/core/channel/http_client_filter.h src/core/channel/http_server_filter.h src/core/channel/noop_filter.h src/core/compression/message_compress.h src/core/debug/trace.h src/core/iomgr/alarm.h src/core/iomgr/alarm_heap.h src/core/iomgr/alarm_internal.h src/core/iomgr/endpoint.h src/core/iomgr/endpoint_pair.h src/core/iomgr/fd_posix.h src/core/iomgr/iocp_windows.h src/core/iomgr/iomgr.h src/core/iomgr/iomgr_internal.h src/core/iomgr/iomgr_posix.h src/core/iomgr/pollset.h src/core/iomgr/pollset_kick.h src/core/iomgr/pollset_kick_posix.h src/core/iomgr/pollset_kick_windows.h src/core/iomgr/pollset_posix.h src/core/iomgr/pollset_windows.h src/core/iomgr/resolve_address.h src/core/iomgr/sockaddr.h src/core/iomgr/sockaddr_posix.h src/core/iomgr/sockaddr_utils.h src/core/iomgr/sockaddr_win32.h src/core/iomgr/socket_utils_posix.h src/core/iomgr/socket_windows.h src/core/iomgr/tcp_client.h src/core/iomgr/tcp_posix.h src/core/iomgr/tcp_server.h src/core/iomgr/tcp_windows.h src/core/iomgr/time_averaged_stats.h src/core/iomgr/wakeup_fd_pipe.h src/core/iomgr/wakeup_fd_posix.h src/core/json/json.h src/core/json/json_common.h src/core/json/json_reader.h src/core/json/json_writer.h src/core/profiling/timers.h src/core/profiling/timers_preciseclock.h src/core/surface/byte_buffer_queue.h src/core/surface/call.h src/core/surface/channel.h src/core/surface/client.h src/core/surface/completion_queue.h src/core/surface/event_string.h src/core/surface/init.h src/core/surface/server.h src/core/surface/surface_trace.h src/core/transport/chttp2/alpn.h src/core/transport/chttp2/bin_encoder.h src/core/transport/chttp2/frame.h src/core/transport/chttp2/frame_data.h src/core/transport/chttp2/frame_goaway.h src/core/transport/chttp2/frame_ping.h src/core/transport/chttp2/frame_rst_stream.h src/core/transport/chttp2/frame_settings.h src/core/transport/chttp2/frame_window_update.h src/core/transport/chttp2/hpack_parser.h src/core/transport/chttp2/hpack_table.h src/core/transport/chttp2/http2_errors.h src/core/transport/chttp2/huffsyms.h src/core/transport/chttp2/status_conversion.h src/core/transport/chttp2/stream_encoder.h src/core/transport/chttp2/stream_map.h src/core/transport/chttp2/timeout_encoding.h src/core/transport/chttp2/varint.h src/core/transport/chttp2_transport.h src/core/transport/metadata.h src/core/transport/stream_op.h src/core/transport/transport.h src/core/transport/transport_impl.h src/core/census/context.h src/core/httpcli/format_request.c src/core/httpcli/httpcli.c src/core/httpcli/httpcli_security_connector.c src/core/httpcli/parser.c src/core/security/base64.c src/core/security/client_auth_filter.c src/core/security/credentials.c src/core/security/credentials_metadata.c src/core/security/credentials_posix.c src/core/security/credentials_win32.c src/core/security/google_default_credentials.c src/core/security/json_token.c src/core/security/secure_endpoint.c src/core/security/secure_transport_setup.c src/core/security/security_connector.c src/core/security/security_context.c src/core/security/server_auth_filter.c src/core/security/server_secure_chttp2.c src/core/surface/init_secure.c src/core/surface/secure_channel_create.c src/core/tsi/fake_transport_security.c src/core/tsi/ssl_transport_security.c src/core/tsi/transport_security.c src/core/census/grpc_context.c src/core/channel/channel_args.c src/core/channel/channel_stack.c src/core/channel/child_channel.c src/core/channel/client_channel.c src/core/channel/client_setup.c src/core/channel/connected_channel.c src/core/channel/http_client_filter.c src/core/channel/http_server_filter.c src/core/channel/noop_filter.c src/core/compression/algorithm.c src/core/compression/message_compress.c src/core/debug/trace.c src/core/iomgr/alarm.c src/core/iomgr/alarm_heap.c src/core/iomgr/endpoint.c src/core/iomgr/endpoint_pair_posix.c src/core/iomgr/endpoint_pair_windows.c src/core/iomgr/fd_posix.c src/core/iomgr/iocp_windows.c src/core/iomgr/iomgr.c src/core/iomgr/iomgr_posix.c src/core/iomgr/iomgr_windows.c src/core/iomgr/pollset_kick.c src/core/iomgr/pollset_multipoller_with_epoll.c src/core/iomgr/pollset_multipoller_with_poll_posix.c src/core/iomgr/pollset_posix.c src/core/iomgr/pollset_windows.c src/core/iomgr/resolve_address_posix.c src/core/iomgr/resolve_address_windows.c src/core/iomgr/sockaddr_utils.c src/core/iomgr/socket_utils_common_posix.c src/core/iomgr/socket_utils_linux.c src/core/iomgr/socket_utils_posix.c src/core/iomgr/socket_windows.c src/core/iomgr/tcp_client_posix.c src/core/iomgr/tcp_client_windows.c src/core/iomgr/tcp_posix.c src/core/iomgr/tcp_server_posix.c src/core/iomgr/tcp_server_windows.c src/core/iomgr/tcp_windows.c src/core/iomgr/time_averaged_stats.c src/core/iomgr/wakeup_fd_eventfd.c src/core/iomgr/wakeup_fd_nospecial.c src/core/iomgr/wakeup_fd_pipe.c src/core/iomgr/wakeup_fd_posix.c src/core/json/json.c src/core/json/json_reader.c src/core/json/json_string.c src/core/json/json_writer.c src/core/profiling/basic_timers.c src/core/profiling/stap_timers.c src/core/surface/byte_buffer.c src/core/surface/byte_buffer_queue.c src/core/surface/byte_buffer_reader.c src/core/surface/call.c src/core/surface/call_details.c src/core/surface/call_log_batch.c src/core/surface/channel.c src/core/surface/channel_create.c src/core/surface/client.c src/core/surface/completion_queue.c src/core/surface/event_string.c src/core/surface/init.c src/core/surface/lame_client.c src/core/surface/metadata_array.c src/core/surface/server.c src/core/surface/server_chttp2.c src/core/surface/server_create.c src/core/surface/surface_trace.c src/core/transport/chttp2/alpn.c src/core/transport/chttp2/bin_encoder.c src/core/transport/chttp2/frame_data.c src/core/transport/chttp2/frame_goaway.c src/core/transport/chttp2/frame_ping.c src/core/transport/chttp2/frame_rst_stream.c src/core/transport/chttp2/frame_settings.c src/core/transport/chttp2/frame_window_update.c src/core/transport/chttp2/hpack_parser.c src/core/transport/chttp2/hpack_table.c src/core/transport/chttp2/huffsyms.c src/core/transport/chttp2/status_conversion.c src/core/transport/chttp2/stream_encoder.c src/core/transport/chttp2/stream_map.c src/core/transport/chttp2/timeout_encoding.c src/core/transport/chttp2/varint.c src/core/transport/chttp2_transport.c src/core/transport/metadata.c src/core/transport/stream_op.c src/core/transport/transport.c src/core/transport/transport_op_string.c src/core/census/context.c src/core/census/initialize.c include/grpc/support/alloc.h include/grpc/support/atm.h include/grpc/support/atm_gcc_atomic.h include/grpc/support/atm_gcc_sync.h include/grpc/support/atm_win32.h include/grpc/support/cancellable_platform.h include/grpc/support/cmdline.h include/grpc/support/cpu.h include/grpc/support/histogram.h include/grpc/support/host_port.h include/grpc/support/log.h include/grpc/support/log_win32.h include/grpc/support/port_platform.h include/grpc/support/slice.h include/grpc/support/slice_buffer.h include/grpc/support/string_util.h include/grpc/support/subprocess.h include/grpc/support/sync.h include/grpc/support/sync_generic.h include/grpc/support/sync_posix.h include/grpc/support/sync_win32.h include/grpc/support/thd.h include/grpc/support/time.h include/grpc/support/tls.h include/grpc/support/tls_gcc.h include/grpc/support/tls_msvc.h include/grpc/support/tls_pthread.h include/grpc/support/useful.h src/core/support/env.h src/core/support/file.h src/core/support/murmur_hash.h src/core/support/string.h src/core/support/string_win32.h src/core/support/thd_internal.h src/core/support/alloc.c src/core/support/cancellable.c src/core/support/cmdline.c src/core/support/cpu_iphone.c src/core/support/cpu_linux.c src/core/support/cpu_posix.c src/core/support/cpu_windows.c src/core/support/env_linux.c src/core/support/env_posix.c src/core/support/env_win32.c src/core/support/file.c src/core/support/file_posix.c src/core/support/file_win32.c src/core/support/histogram.c src/core/support/host_port.c src/core/support/log.c src/core/support/log_android.c src/core/support/log_linux.c src/core/support/log_posix.c src/core/support/log_win32.c src/core/support/murmur_hash.c src/core/support/slice.c src/core/support/slice_buffer.c src/core/support/string.c src/core/support/string_posix.c src/core/support/string_win32.c src/core/support/subprocess_posix.c src/core/support/sync.c src/core/support/sync_posix.c src/core/support/sync_win32.c src/core/support/thd.c src/core/support/thd_posix.c src/core/support/thd_win32.c src/core/support/time.c src/core/support/time_posix.c src/core/support/time_win32.c src/core/support/tls_pthread.c +INPUT = include/grpc/grpc_security.h include/grpc/byte_buffer.h include/grpc/byte_buffer_reader.h include/grpc/compression.h include/grpc/grpc.h include/grpc/status.h include/grpc/census.h src/core/httpcli/format_request.h src/core/httpcli/httpcli.h src/core/httpcli/httpcli_security_connector.h src/core/httpcli/parser.h src/core/security/auth_filters.h src/core/security/base64.h src/core/security/credentials.h src/core/security/json_token.h src/core/security/secure_endpoint.h src/core/security/secure_transport_setup.h src/core/security/security_connector.h src/core/security/security_context.h src/core/tsi/fake_transport_security.h src/core/tsi/ssl_transport_security.h src/core/tsi/transport_security.h src/core/tsi/transport_security_interface.h src/core/census/grpc_context.h src/core/channel/census_filter.h src/core/channel/channel_args.h src/core/channel/channel_stack.h src/core/channel/child_channel.h src/core/channel/client_channel.h src/core/channel/client_setup.h src/core/channel/compress_filter.h src/core/channel/connected_channel.h src/core/channel/context.h src/core/channel/http_client_filter.h src/core/channel/http_server_filter.h src/core/channel/noop_filter.h src/core/compression/message_compress.h src/core/debug/trace.h src/core/iomgr/alarm.h src/core/iomgr/alarm_heap.h src/core/iomgr/alarm_internal.h src/core/iomgr/endpoint.h src/core/iomgr/endpoint_pair.h src/core/iomgr/fd_posix.h src/core/iomgr/iocp_windows.h src/core/iomgr/iomgr.h src/core/iomgr/iomgr_internal.h src/core/iomgr/iomgr_posix.h src/core/iomgr/pollset.h src/core/iomgr/pollset_kick.h src/core/iomgr/pollset_kick_posix.h src/core/iomgr/pollset_kick_windows.h src/core/iomgr/pollset_posix.h src/core/iomgr/pollset_windows.h src/core/iomgr/resolve_address.h src/core/iomgr/sockaddr.h src/core/iomgr/sockaddr_posix.h src/core/iomgr/sockaddr_utils.h src/core/iomgr/sockaddr_win32.h src/core/iomgr/socket_utils_posix.h src/core/iomgr/socket_windows.h src/core/iomgr/tcp_client.h src/core/iomgr/tcp_posix.h src/core/iomgr/tcp_server.h src/core/iomgr/tcp_windows.h src/core/iomgr/time_averaged_stats.h src/core/iomgr/wakeup_fd_pipe.h src/core/iomgr/wakeup_fd_posix.h src/core/json/json.h src/core/json/json_common.h src/core/json/json_reader.h src/core/json/json_writer.h src/core/profiling/timers.h src/core/profiling/timers_preciseclock.h src/core/surface/byte_buffer_queue.h src/core/surface/call.h src/core/surface/channel.h src/core/surface/client.h src/core/surface/completion_queue.h src/core/surface/event_string.h src/core/surface/init.h src/core/surface/server.h src/core/surface/surface_trace.h src/core/transport/chttp2/alpn.h src/core/transport/chttp2/bin_encoder.h src/core/transport/chttp2/frame.h src/core/transport/chttp2/frame_data.h src/core/transport/chttp2/frame_goaway.h src/core/transport/chttp2/frame_ping.h src/core/transport/chttp2/frame_rst_stream.h src/core/transport/chttp2/frame_settings.h src/core/transport/chttp2/frame_window_update.h src/core/transport/chttp2/hpack_parser.h src/core/transport/chttp2/hpack_table.h src/core/transport/chttp2/http2_errors.h src/core/transport/chttp2/huffsyms.h src/core/transport/chttp2/status_conversion.h src/core/transport/chttp2/stream_encoder.h src/core/transport/chttp2/stream_map.h src/core/transport/chttp2/timeout_encoding.h src/core/transport/chttp2/varint.h src/core/transport/chttp2_transport.h src/core/transport/metadata.h src/core/transport/stream_op.h src/core/transport/transport.h src/core/transport/transport_impl.h src/core/census/context.h src/core/httpcli/format_request.c src/core/httpcli/httpcli.c src/core/httpcli/httpcli_security_connector.c src/core/httpcli/parser.c src/core/security/base64.c src/core/security/client_auth_filter.c src/core/security/credentials.c src/core/security/credentials_metadata.c src/core/security/credentials_posix.c src/core/security/credentials_win32.c src/core/security/google_default_credentials.c src/core/security/json_token.c src/core/security/secure_endpoint.c src/core/security/secure_transport_setup.c src/core/security/security_connector.c src/core/security/security_context.c src/core/security/server_auth_filter.c src/core/security/server_secure_chttp2.c src/core/surface/init_secure.c src/core/surface/secure_channel_create.c src/core/tsi/fake_transport_security.c src/core/tsi/ssl_transport_security.c src/core/tsi/transport_security.c src/core/census/grpc_context.c src/core/channel/channel_args.c src/core/channel/channel_stack.c src/core/channel/child_channel.c src/core/channel/client_channel.c src/core/channel/client_setup.c src/core/channel/compress_filter.c src/core/channel/connected_channel.c src/core/channel/http_client_filter.c src/core/channel/http_server_filter.c src/core/channel/noop_filter.c src/core/compression/algorithm.c src/core/compression/message_compress.c src/core/debug/trace.c src/core/iomgr/alarm.c src/core/iomgr/alarm_heap.c src/core/iomgr/endpoint.c src/core/iomgr/endpoint_pair_posix.c src/core/iomgr/endpoint_pair_windows.c src/core/iomgr/fd_posix.c src/core/iomgr/iocp_windows.c src/core/iomgr/iomgr.c src/core/iomgr/iomgr_posix.c src/core/iomgr/iomgr_windows.c src/core/iomgr/pollset_kick.c src/core/iomgr/pollset_multipoller_with_epoll.c src/core/iomgr/pollset_multipoller_with_poll_posix.c src/core/iomgr/pollset_posix.c src/core/iomgr/pollset_windows.c src/core/iomgr/resolve_address_posix.c src/core/iomgr/resolve_address_windows.c src/core/iomgr/sockaddr_utils.c src/core/iomgr/socket_utils_common_posix.c src/core/iomgr/socket_utils_linux.c src/core/iomgr/socket_utils_posix.c src/core/iomgr/socket_windows.c src/core/iomgr/tcp_client_posix.c src/core/iomgr/tcp_client_windows.c src/core/iomgr/tcp_posix.c src/core/iomgr/tcp_server_posix.c src/core/iomgr/tcp_server_windows.c src/core/iomgr/tcp_windows.c src/core/iomgr/time_averaged_stats.c src/core/iomgr/wakeup_fd_eventfd.c src/core/iomgr/wakeup_fd_nospecial.c src/core/iomgr/wakeup_fd_pipe.c src/core/iomgr/wakeup_fd_posix.c src/core/json/json.c src/core/json/json_reader.c src/core/json/json_string.c src/core/json/json_writer.c src/core/profiling/basic_timers.c src/core/profiling/stap_timers.c src/core/surface/byte_buffer.c src/core/surface/byte_buffer_queue.c src/core/surface/byte_buffer_reader.c src/core/surface/call.c src/core/surface/call_details.c src/core/surface/call_log_batch.c src/core/surface/channel.c src/core/surface/channel_create.c src/core/surface/client.c src/core/surface/completion_queue.c src/core/surface/event_string.c src/core/surface/init.c src/core/surface/lame_client.c src/core/surface/metadata_array.c src/core/surface/server.c src/core/surface/server_chttp2.c src/core/surface/server_create.c src/core/surface/surface_trace.c src/core/transport/chttp2/alpn.c src/core/transport/chttp2/bin_encoder.c src/core/transport/chttp2/frame_data.c src/core/transport/chttp2/frame_goaway.c src/core/transport/chttp2/frame_ping.c src/core/transport/chttp2/frame_rst_stream.c src/core/transport/chttp2/frame_settings.c src/core/transport/chttp2/frame_window_update.c src/core/transport/chttp2/hpack_parser.c src/core/transport/chttp2/hpack_table.c src/core/transport/chttp2/huffsyms.c src/core/transport/chttp2/status_conversion.c src/core/transport/chttp2/stream_encoder.c src/core/transport/chttp2/stream_map.c src/core/transport/chttp2/timeout_encoding.c src/core/transport/chttp2/varint.c src/core/transport/chttp2_transport.c src/core/transport/metadata.c src/core/transport/stream_op.c src/core/transport/transport.c src/core/transport/transport_op_string.c src/core/census/context.c src/core/census/initialize.c include/grpc/support/alloc.h include/grpc/support/atm.h include/grpc/support/atm_gcc_atomic.h include/grpc/support/atm_gcc_sync.h include/grpc/support/atm_win32.h include/grpc/support/cancellable_platform.h include/grpc/support/cmdline.h include/grpc/support/cpu.h include/grpc/support/histogram.h include/grpc/support/host_port.h include/grpc/support/log.h include/grpc/support/log_win32.h include/grpc/support/port_platform.h include/grpc/support/slice.h include/grpc/support/slice_buffer.h include/grpc/support/string_util.h include/grpc/support/subprocess.h include/grpc/support/sync.h include/grpc/support/sync_generic.h include/grpc/support/sync_posix.h include/grpc/support/sync_win32.h include/grpc/support/thd.h include/grpc/support/time.h include/grpc/support/tls.h include/grpc/support/tls_gcc.h include/grpc/support/tls_msvc.h include/grpc/support/tls_pthread.h include/grpc/support/useful.h src/core/support/env.h src/core/support/file.h src/core/support/murmur_hash.h src/core/support/string.h src/core/support/string_win32.h src/core/support/thd_internal.h src/core/support/alloc.c src/core/support/cancellable.c src/core/support/cmdline.c src/core/support/cpu_iphone.c src/core/support/cpu_linux.c src/core/support/cpu_posix.c src/core/support/cpu_windows.c src/core/support/env_linux.c src/core/support/env_posix.c src/core/support/env_win32.c src/core/support/file.c src/core/support/file_posix.c src/core/support/file_win32.c src/core/support/histogram.c src/core/support/host_port.c src/core/support/log.c src/core/support/log_android.c src/core/support/log_linux.c src/core/support/log_posix.c src/core/support/log_win32.c src/core/support/murmur_hash.c src/core/support/slice.c src/core/support/slice_buffer.c src/core/support/string.c src/core/support/string_posix.c src/core/support/string_win32.c src/core/support/subprocess_posix.c src/core/support/sync.c src/core/support/sync_posix.c src/core/support/sync_win32.c src/core/support/thd.c src/core/support/thd_posix.c src/core/support/thd_win32.c src/core/support/time.c src/core/support/time_posix.c src/core/support/time_win32.c src/core/support/tls_pthread.c # This tag can be used to specify the character encoding of the source files # that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses -- cgit v1.2.3 From d16af0ea52a932ced7562e21d5ec8f57eafa51c5 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Mon, 22 Jun 2015 22:39:21 -0700 Subject: Redesign of compression algorithm propagation based on metadata --- src/core/channel/compress_filter.c | 182 +++++++++++++-------- src/core/surface/call.c | 27 ++- src/core/surface/channel.c | 12 +- src/core/surface/channel.h | 3 +- .../tests/request_with_compressed_payload.c | 6 +- 5 files changed, 139 insertions(+), 91 deletions(-) (limited to 'test/core/end2end/tests/request_with_compressed_payload.c') diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 918cb2dd79..ad42bbb61c 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -31,6 +31,7 @@ * */ +#include #include #include "src/core/channel/compress_filter.h" @@ -42,16 +43,16 @@ typedef struct call_data { gpr_slice_buffer slices; + grpc_linked_mdelem compression_algorithm_storage; int remaining_slice_bytes; - int no_compress; /**< whether to skip compression for this specific call */ - - grpc_linked_mdelem compression_algorithm; + grpc_compression_algorithm compression_algorithm; + gpr_uint8 has_compression_algorithm; } call_data; typedef struct channel_data { - grpc_compression_algorithm compress_algorithm; - grpc_mdelem *compress_algorithm_md; - grpc_mdelem *no_compression_md; + grpc_mdstr *mdstr_compression_algorithm_key; + grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT]; + grpc_compression_algorithm default_compression_algorithm; } channel_data; /** Compress \a slices in place using \a algorithm. Returns 1 if compression did @@ -70,14 +71,41 @@ static int compress_send_sb(grpc_compression_algorithm algorithm, return did_compress; } +/** For each \a md element from the incoming metadata, filter out the entry for + * "grpc-compression-algorithm", using its value to populate the call data's + * compression_algorithm field. */ +static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + if (md->key == channeld->mdstr_compression_algorithm_key) { + assert(GPR_SLICE_LENGTH(md->value->slice) == + sizeof(grpc_compression_algorithm)); + memcpy(&calld->compression_algorithm, GPR_SLICE_START_PTR(md->value->slice), + sizeof(grpc_compression_algorithm)); + calld->has_compression_algorithm = 1; + return NULL; + } + + return md; +} + +static int skip_compression(channel_data *channeld, call_data *calld) { + if (calld->has_compression_algorithm && + (calld->compression_algorithm == GRPC_COMPRESS_NONE)) { + return 1; + } + /* no per-call compression override */ + return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; +} + static void process_send_ops(grpc_call_element *elem, grpc_stream_op_buffer *send_ops) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; size_t i, j; - int begin_message_index = -1; - int metadata_op_index = -1; - grpc_mdelem *actual_compression_md; + int did_compress = 0; /* buffer up slices until we've processed all the expected ones (as given by * GRPC_OP_BEGIN_MESSAGE) */ @@ -85,73 +113,83 @@ static void process_send_ops(grpc_call_element *elem, grpc_stream_op *sop = &send_ops->ops[i]; switch (sop->type) { case GRPC_OP_BEGIN_MESSAGE: - begin_message_index = i; calld->remaining_slice_bytes = sop->data.begin_message.length; - calld->no_compress = - !!(sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS); + /* TODO(dgq): we may want to get rid of the flags mechanism to have + * exceptions to compression: we can rely solely on metadata to set NONE + * as the compression algorithm */ + if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) { + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + break; + case GRPC_OP_METADATA: + /* Parse incoming request for compression. If any, it'll be available at + * calld->compression_algorithm */ + grpc_metadata_batch_filter(&(sop->data.metadata), compression_md_filter, + elem); + if (!calld->has_compression_algorithm) { + /* If no algorithm was found in the metadata and we aren't + * exceptionally skipping compression, fall back to the channel + * default */ + calld->compression_algorithm = + channeld->default_compression_algorithm; + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + } break; case GRPC_OP_SLICE: - if (calld->no_compress) continue; + if (skip_compression(channeld, calld)) continue; GPR_ASSERT(calld->remaining_slice_bytes > 0); /* add to calld->slices */ gpr_slice_buffer_add(&calld->slices, sop->data.slice); calld->remaining_slice_bytes -= GPR_SLICE_LENGTH(sop->data.slice); if (calld->remaining_slice_bytes == 0) { /* compress */ - if (!compress_send_sb(channeld->compress_algorithm, &calld->slices)) { - calld->no_compress = 1; /* GPR_TRUE */ - } + did_compress = + compress_send_sb(calld->compression_algorithm, &calld->slices); } break; - case GRPC_OP_METADATA: - /* Save the index of the first metadata op, to be processed after we - * know whether compression actually happened */ - if (metadata_op_index < 0) metadata_op_index = i; - break; case GRPC_NO_OP: - ; /* fallthrough, ignore */ + ; /* fallthrough */ } } - if (metadata_op_index < 0 || begin_message_index < 0) { /* bail out */ - return; - } - - /* update both the metadata and the begin_message's flags */ - if (calld->no_compress) { - /* either because the user requested the exception or because compressing - * would have resulted in a larger output */ - channeld->compress_algorithm = GRPC_COMPRESS_NONE; - actual_compression_md = channeld->no_compression_md; - /* reset the flag compression bit */ - send_ops->ops[begin_message_index].data.begin_message.flags &= - ~GRPC_WRITE_INTERNAL_COMPRESS; - } else { /* DID compress */ - actual_compression_md = channeld->compress_algorithm_md; - /* at this point, calld->slices contains the *compressed* slices from - * send_ops->ops[*]->data.slice. We now replace these input slices with the - * compressed ones. */ - for (i = 0, j = 0; i < send_ops->nops; ++i) { - grpc_stream_op *sop = &send_ops->ops[i]; - GPR_ASSERT(j < calld->slices.count); - switch (sop->type) { - case GRPC_OP_SLICE: - gpr_slice_unref(sop->data.slice); - sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); - break; - case GRPC_OP_BEGIN_MESSAGE: + /* We need to: + * - (OP_SLICE) If compression happened, replace the input slices with the + * compressed ones. + * - (BEGIN_MESSAGE) Update the message info (size, flags). + * - (OP_METADATA) Convey the compression configuration */ + for (i = 0, j = 0; i < send_ops->nops; ++i) { + grpc_stream_op *sop = &send_ops->ops[i]; + switch (sop->type) { + case GRPC_OP_BEGIN_MESSAGE: + if (did_compress) { sop->data.begin_message.length = calld->slices.length; sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS; - case GRPC_NO_OP: - case GRPC_OP_METADATA: - ; /* fallthrough, ignore */ - } + } else { + /* either because the user requested the exception or because compressing + * would have resulted in a larger output */ + calld->compression_algorithm = GRPC_COMPRESS_NONE; + /* reset the flag compression bit */ + sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS; + } + break; + case GRPC_OP_METADATA: + grpc_metadata_batch_add_head( + &(sop->data.metadata), &calld->compression_algorithm_storage, + grpc_mdelem_ref(channeld->mdelem_compression_algorithms + [calld->compression_algorithm])); + break; + case GRPC_OP_SLICE: + if (did_compress) { + GPR_ASSERT(j < calld->slices.count); + gpr_slice_unref(sop->data.slice); + sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); + } + break; + case GRPC_NO_OP: + ; /* fallthrough */ } } - - grpc_metadata_batch_add_head( - &(send_ops->ops[metadata_op_index].data.metadata), - &calld->compression_algorithm, grpc_mdelem_ref(actual_compression_md)); } /* Called either: @@ -189,6 +227,7 @@ static void init_call_elem(grpc_call_element *elem, /* initialize members */ gpr_slice_buffer_init(&calld->slices); + calld->has_compression_algorithm = 0; if (initial_op) { if (initial_op->send_ops && initial_op->send_ops->nops > 0) { @@ -209,17 +248,23 @@ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *channeld = elem->channel_data; + grpc_compression_algorithm algo_idx; const grpc_compression_level clevel = grpc_channel_args_get_compression_level(args); - const grpc_compression_algorithm none_alg = GRPC_COMPRESS_NONE; - channeld->compress_algorithm_md = grpc_mdelem_from_string_and_buffer( - mdctx, "grpc-compression-level", (gpr_uint8*)&clevel, sizeof(clevel)); - channeld->compress_algorithm = grpc_compression_algorithm_for_level(clevel); + channeld->default_compression_algorithm = + grpc_compression_algorithm_for_level(clevel); - channeld->no_compression_md = grpc_mdelem_from_string_and_buffer( - mdctx, "grpc-compression-level", (gpr_uint8 *)&none_alg, - sizeof(none_alg)); + channeld->mdstr_compression_algorithm_key = + grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm"); + + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { + channeld->mdelem_compression_algorithms[algo_idx] = + grpc_mdelem_from_metadata_strings( + mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key), + grpc_mdstr_from_buffer(mdctx, (gpr_uint8 *)&algo_idx, + sizeof(algo_idx))); + } /* The first and the last filters tend to be implemented differently to handle the case that there's no 'next' filter to call on the up or down @@ -231,8 +276,13 @@ static void init_channel_elem(grpc_channel_element *elem, /* Destructor for channel data */ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *channeld = elem->channel_data; - grpc_mdelem_unref(channeld->compress_algorithm_md); - grpc_mdelem_unref(channeld->no_compression_md); + grpc_compression_algorithm algo_idx; + + grpc_mdstr_unref(channeld->mdstr_compression_algorithm_key); + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; + ++algo_idx) { + grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]); + } } const grpc_channel_filter grpc_compress_filter = { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 41257419c0..2b2d92cac7 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -209,8 +209,8 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; - /** Compression level for the call */ - grpc_compression_level compression_level; + /** Compression algorithm for the call */ + grpc_compression_algorithm compression_algorithm; /* Contexts for various subsystems (security, tracing, ...). */ grpc_call_context_element context[GRPC_CONTEXT_COUNT]; @@ -395,9 +395,9 @@ static void set_status_code(grpc_call *call, status_source source, } } -static void set_compression_level(grpc_call *call, - grpc_compression_level clevel) { - call->compression_level = clevel; +static void set_compression_algorithm(grpc_call *call, + grpc_compression_algorithm algo) { + call->compression_algorithm = algo; } static void set_status_details(grpc_call *call, status_source source, @@ -651,12 +651,10 @@ static void finish_message(grpc_call *call) { /* some aliases for readability */ gpr_slice *slices = call->incoming_message.slices; const size_t nslices = call->incoming_message.count; - const grpc_compression_algorithm compression_algorithm = - grpc_compression_algorithm_for_level(call->compression_level); - if (call->compression_level > GRPC_COMPRESS_LEVEL_NONE) { - byte_buffer = grpc_raw_compressed_byte_buffer_create(slices, nslices, - compression_algorithm); + if (call->compression_algorithm > GRPC_COMPRESS_NONE) { + byte_buffer = grpc_raw_compressed_byte_buffer_create( + slices, nslices, call->compression_algorithm); } else { byte_buffer = grpc_raw_byte_buffer_create(slices, nslices); } @@ -683,12 +681,11 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { * compression level should already be present in the call, as parsed off its * corresponding metadata. */ if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) && - (call->compression_level == GRPC_COMPRESS_LEVEL_NONE)) { + (call->compression_algorithm == GRPC_COMPRESS_NONE)) { char *message = NULL; gpr_asprintf( &message, "Invalid compression algorithm (%s) for compressed message.", - grpc_compression_algorithm_name( - grpc_compression_algorithm_for_level(call->compression_level))); + grpc_compression_algorithm_name(call->compression_algorithm)); cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message, 1); } /* stash away parameters, and prepare for incoming slices */ @@ -1183,8 +1180,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); } else if (key == - grpc_channel_get_compresssion_level_string(call->channel)) { - set_compression_level(call, decode_compression(md)); + grpc_channel_get_compresssion_algorithm_string(call->channel)) { + set_compression_algorithm(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; if (dest->count == dest->capacity) { diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 6353a83b4f..e841a5d493 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -64,7 +64,7 @@ struct grpc_channel { grpc_mdctx *metadata_context; /** mdstr for the grpc-status key */ grpc_mdstr *grpc_status_string; - grpc_mdstr *grpc_compression_level_string; + grpc_mdstr *grpc_compression_algorithm_string; grpc_mdstr *grpc_message_string; grpc_mdstr *path_string; grpc_mdstr *authority_string; @@ -99,8 +99,8 @@ grpc_channel *grpc_channel_create_from_filters( gpr_ref_init(&channel->refs, 1 + is_client); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); - channel->grpc_compression_level_string = - grpc_mdstr_from_string(mdctx, "grpc-compression-level"); + channel->grpc_compression_algorithm_string = + grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; @@ -202,7 +202,7 @@ static void destroy_channel(void *p, int ok) { grpc_mdelem_unref(channel->grpc_status_elem[i]); } grpc_mdstr_unref(channel->grpc_status_string); - grpc_mdstr_unref(channel->grpc_compression_level_string); + grpc_mdstr_unref(channel->grpc_compression_algorithm_string); grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->authority_string); @@ -261,8 +261,8 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { return channel->grpc_status_string; } -grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) { - return channel->grpc_compression_level_string; +grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(grpc_channel *channel) { + return channel->grpc_compression_algorithm_string; } grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index f838129148..f4df06a0c3 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -53,7 +53,8 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); -grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel); +grpc_mdstr *grpc_channel_get_compresssion_algorithm_string( + grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c index fe41780702..8cc4cb7fda 100644 --- a/test/core/end2end/tests/request_with_compressed_payload.c +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -251,10 +251,10 @@ static void request_with_payload_template( config.tear_down_data(&f); } -static void test_invoke_request_with_excepcionally_uncompressed_payload( +static void test_invoke_request_with_exceptionally_uncompressed_payload( grpc_end2end_test_config config) { request_with_payload_template( - config, "test_invoke_request_with_excepcionally_uncompressed_payload", + config, "test_invoke_request_with_exceptionally_uncompressed_payload", GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE); } @@ -276,7 +276,7 @@ static void test_invoke_request_with_uncompressed_payload( void grpc_end2end_tests(grpc_end2end_test_config config) { - test_invoke_request_with_excepcionally_uncompressed_payload(config); + test_invoke_request_with_exceptionally_uncompressed_payload(config); test_invoke_request_with_compressed_payload(config); test_invoke_request_with_uncompressed_payload(config); } -- cgit v1.2.3 From 92ce5885592e4b854ce56857feb23b55741781e0 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 23 Jun 2015 17:07:12 -0700 Subject: Fixed bad merge. --- src/core/channel/compress_filter.c | 9 +++-- src/core/surface/call.c | 14 ++----- .../tests/request_with_compressed_payload.c | 47 +++++++++------------- 3 files changed, 27 insertions(+), 43 deletions(-) (limited to 'test/core/end2end/tests/request_with_compressed_payload.c') diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index ad42bbb61c..d617c4f1fd 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -166,8 +166,8 @@ static void process_send_ops(grpc_call_element *elem, sop->data.begin_message.length = calld->slices.length; sop->data.begin_message.flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { - /* either because the user requested the exception or because compressing - * would have resulted in a larger output */ + /* either because the user requested the exception or because + * compressing would have resulted in a larger output */ calld->compression_algorithm = GRPC_COMPRESS_NONE; /* reset the flag compression bit */ sop->data.begin_message.flags &= ~GRPC_WRITE_INTERNAL_COMPRESS; @@ -181,9 +181,10 @@ static void process_send_ops(grpc_call_element *elem, break; case GRPC_OP_SLICE: if (did_compress) { - GPR_ASSERT(j < calld->slices.count); gpr_slice_unref(sop->data.slice); - sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); + if (j < calld->slices.count) { + sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); + } } break; case GRPC_NO_OP: diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 06a9a8269d..baa9e3a4e4 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1207,20 +1207,12 @@ static void destroy_compression(void *ignored) {} static gpr_uint32 decode_compression(grpc_mdelem *md) { grpc_compression_level clevel; - void *user_data = grpc_mdelem_get_user_data(md, destroy_status); + void *user_data = grpc_mdelem_get_user_data(md, destroy_compression); if (user_data) { clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; } else { - gpr_uint32 parsed_clevel_bytes; - if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), - &parsed_clevel_bytes)) { - /* the following cast is safe, as a gpr_uint32 should be able to hold all - * possible values of the grpc_compression_level enum */ - clevel = (grpc_compression_level) parsed_clevel_bytes; - } else { - clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ - } + GPR_ASSERT(sizeof(clevel) == GPR_SLICE_LENGTH(md->value->slice)); + memcpy(&clevel, GPR_SLICE_START_PTR(md->value->slice), sizeof(clevel)); grpc_mdelem_set_user_data(md, destroy_compression, (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); } diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c index 8cc4cb7fda..7ebdec5cc9 100644 --- a/test/core/end2end/tests/request_with_compressed_payload.c +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -77,8 +77,8 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->server_cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck(f->server_cq, tag(1000), + grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); @@ -95,12 +95,9 @@ static void end_test(grpc_end2end_test_fixture *f) { shutdown_server(f); shutdown_client(f); - grpc_completion_queue_shutdown(f->server_cq); - drain_cq(f->server_cq); - grpc_completion_queue_destroy(f->server_cq); - grpc_completion_queue_shutdown(f->client_cq); - drain_cq(f->client_cq); - grpc_completion_queue_destroy(f->client_cq); + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); } static void request_with_payload_template( @@ -116,8 +113,6 @@ static void request_with_payload_template( grpc_channel_args *client_args; grpc_channel_args *server_args; grpc_end2end_test_fixture f; - cq_verifier *v_client; - cq_verifier *v_server; grpc_op ops[6]; grpc_op *op; grpc_metadata_array initial_metadata_recv; @@ -129,6 +124,7 @@ static void request_with_payload_template( char *details = NULL; size_t details_capacity = 0; int was_cancelled = 2; + cq_verifier *cqv; char str[1024]; memset(&str[0], 1023, 'x'); str[1023] = '\0'; request_payload_slice = gpr_slice_from_copied_string(str); @@ -140,10 +136,9 @@ static void request_with_payload_template( NULL, requested_compression_level); f = begin_test(config, test_name, client_args, server_args); - v_client = cq_verifier_create(f.client_cq); - v_server = cq_verifier_create(f.server_cq); + cqv = cq_verifier_create(f.cq); - c = grpc_channel_create_call(f.client, f.client_cq, "/foo", + c = grpc_channel_create_call(f.client, f.cq, "/foo", "foo.test.google.fr", deadline); GPR_ASSERT(c); @@ -179,10 +174,10 @@ static void request_with_payload_template( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, - &request_metadata_recv, f.server_cq, - f.server_cq, tag(101))); - cq_expect_completion(v_server, tag(101), 1); - cq_verify(v_server); + &request_metadata_recv, f.cq, + f.cq, tag(101))); + cq_expect_completion(cqv, tag(101), 1); + cq_verify(cqv); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; @@ -195,8 +190,8 @@ static void request_with_payload_template( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(102))); - cq_expect_completion(v_server, tag(102), 1); - cq_verify(v_server); + cq_expect_completion(cqv, tag(102), 1); + cq_verify(cqv); op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; @@ -211,11 +206,9 @@ static void request_with_payload_template( op++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(103))); - cq_expect_completion(v_server, tag(103), 1); - cq_verify(v_server); - - cq_expect_completion(v_client, tag(1), 1); - cq_verify(v_client); + cq_expect_completion(cqv, tag(103), 1); + cq_expect_completion(cqv, tag(1), 1); + cq_verify(cqv); GPR_ASSERT(status == GRPC_STATUS_OK); GPR_ASSERT(0 == strcmp(details, "xyz")); @@ -238,8 +231,7 @@ static void request_with_payload_template( grpc_call_destroy(c); grpc_call_destroy(s); - cq_verifier_destroy(v_client); - cq_verifier_destroy(v_server); + cq_verifier_destroy(cqv); grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(request_payload_recv); @@ -274,9 +266,8 @@ static void test_invoke_request_with_uncompressed_payload( grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_NONE)); } - void grpc_end2end_tests(grpc_end2end_test_config config) { test_invoke_request_with_exceptionally_uncompressed_payload(config); - test_invoke_request_with_compressed_payload(config); test_invoke_request_with_uncompressed_payload(config); + test_invoke_request_with_compressed_payload(config); } -- cgit v1.2.3 From fc0fa3381c7b7088936ce6f41a8f8c41ef3c38ac Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Thu, 25 Jun 2015 18:11:07 -0700 Subject: Moar tests, fixed wrongly named vbles, minor bugs. --- include/grpc/compression.h | 21 +++++++- src/core/channel/compress_filter.c | 26 +++++---- src/core/compression/algorithm.c | 46 +++++++++++++--- src/core/surface/call.c | 26 +++++---- src/core/surface/channel.c | 5 +- test/core/compression/message_compress_test.c | 4 +- .../tests/request_with_compressed_payload.c | 62 ++++++++++++++++++---- 7 files changed, 149 insertions(+), 41 deletions(-) (limited to 'test/core/end2end/tests/request_with_compressed_payload.c') diff --git a/include/grpc/compression.h b/include/grpc/compression.h index 61bce05b50..1cff5d2d7e 100644 --- a/include/grpc/compression.h +++ b/include/grpc/compression.h @@ -50,12 +50,29 @@ typedef enum { GRPC_COMPRESS_LEVEL_NONE = 0, GRPC_COMPRESS_LEVEL_LOW, GRPC_COMPRESS_LEVEL_MED, - GRPC_COMPRESS_LEVEL_HIGH + GRPC_COMPRESS_LEVEL_HIGH, + GRPC_COMPRESS_LEVEL_COUNT } grpc_compression_level; -const char *grpc_compression_algorithm_name( +/** Parses \a name as a grpc_compression_algorithm instance, updating \a + * algorithm. Returns 1 upon success, 0 otherwise. */ +int grpc_compression_algorithm_parse(const char *name, + grpc_compression_algorithm *algorithm); + +/** Updates \a name with the encoding name corresponding to a valid \a + * algorithm. Returns 1 upon success, 0 otherwise. */ +int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, + char **name); + +/** Returns the compression level corresponding to \a algorithm. + * + * It abort()s for unknown algorithms. */ +grpc_compression_level grpc_compression_level_for_algorithm( grpc_compression_algorithm algorithm); +/** Returns the compression algorithm corresponding to \a level. + * + * It abort()s for unknown levels . */ grpc_compression_algorithm grpc_compression_algorithm_for_level( grpc_compression_level level); diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index bff2b6f57b..f5fe87d6b8 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -80,10 +80,13 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { channel_data *channeld = elem->channel_data; if (md->key == channeld->mdstr_compression_algorithm_key) { - assert(GPR_SLICE_LENGTH(md->value->slice) == - sizeof(grpc_compression_algorithm)); - memcpy(&calld->compression_algorithm, GPR_SLICE_START_PTR(md->value->slice), - sizeof(grpc_compression_algorithm)); + const char *md_c_str = grpc_mdstr_as_c_string(md->value); + if (!grpc_compression_algorithm_parse(md_c_str, + &calld->compression_algorithm)) { + gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'. Ignoring.", + md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } calld->has_compression_algorithm = 1; return NULL; } @@ -92,9 +95,11 @@ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { } static int skip_compression(channel_data *channeld, call_data *calld) { - if (calld->has_compression_algorithm && - (calld->compression_algorithm == GRPC_COMPRESS_NONE)) { - return 1; + if (calld->has_compression_algorithm) { + if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { + return 1; + } + return 0; /* we have an actual call-specific algorithm */ } /* no per-call compression override */ return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; @@ -255,14 +260,15 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_compression_algorithm_for_level(clevel); channeld->mdstr_compression_algorithm_key = - grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm"); + grpc_mdstr_from_string(mdctx, "grpc-encoding"); for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { + char *algorith_name; + GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0); channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings( mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key), - grpc_mdstr_from_buffer(mdctx, (gpr_uint8 *)&algo_idx, - sizeof(algo_idx))); + grpc_mdstr_from_string(mdctx, algorith_name)); } /* The first and the last filters tend to be implemented differently to diff --git a/src/core/compression/algorithm.c b/src/core/compression/algorithm.c index 4db48df6cb..e426241d0a 100644 --- a/src/core/compression/algorithm.c +++ b/src/core/compression/algorithm.c @@ -32,21 +32,39 @@ */ #include +#include #include -const char *grpc_compression_algorithm_name( - grpc_compression_algorithm algorithm) { +int grpc_compression_algorithm_parse(const char* name, + grpc_compression_algorithm *algorithm) { + if (strcmp(name, "none") == 0) { + *algorithm = GRPC_COMPRESS_NONE; + } else if (strcmp(name, "gzip") == 0) { + *algorithm = GRPC_COMPRESS_GZIP; + } else if (strcmp(name, "deflate") == 0) { + *algorithm = GRPC_COMPRESS_DEFLATE; + } else { + return 0; + } + return 1; +} + +int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm, + char **name) { switch (algorithm) { case GRPC_COMPRESS_NONE: - return "none"; + *name = "none"; + break; case GRPC_COMPRESS_DEFLATE: - return "deflate"; + *name = "deflate"; + break; case GRPC_COMPRESS_GZIP: - return "gzip"; - case GRPC_COMPRESS_ALGORITHMS_COUNT: - return "error"; + *name = "gzip"; + break; + default: + return 0; } - return "error"; + return 1; } /* TODO(dgq): Add the ability to specify parameters to the individual @@ -65,3 +83,15 @@ grpc_compression_algorithm grpc_compression_algorithm_for_level( abort(); } } + +grpc_compression_level grpc_compression_level_for_algorithm( + grpc_compression_algorithm algorithm) { + grpc_compression_level clevel; + for (clevel = GRPC_COMPRESS_LEVEL_NONE; clevel < GRPC_COMPRESS_LEVEL_COUNT; + ++clevel) { + if (grpc_compression_algorithm_for_level(clevel) == algorithm) { + return clevel; + } + } + abort(); +} diff --git a/src/core/surface/call.c b/src/core/surface/call.c index baa9e3a4e4..37dadecb35 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -724,9 +724,14 @@ static int begin_message(grpc_call *call, grpc_begin_message msg) { if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) && (call->compression_algorithm == GRPC_COMPRESS_NONE)) { char *message = NULL; - gpr_asprintf( - &message, "Invalid compression algorithm (%s) for compressed message.", - grpc_compression_algorithm_name(call->compression_algorithm)); + char *alg_name; + if (!grpc_compression_algorithm_name(call->compression_algorithm, &alg_name)) { + /* This shouldn't happen, other than due to data corruption */ + alg_name = ""; + } + gpr_asprintf(&message, + "Invalid compression algorithm (%s) for compressed message.", + alg_name); cancel_with_status(call, GRPC_STATUS_FAILED_PRECONDITION, message); } /* stash away parameters, and prepare for incoming slices */ @@ -1206,17 +1211,20 @@ static gpr_uint32 decode_status(grpc_mdelem *md) { static void destroy_compression(void *ignored) {} static gpr_uint32 decode_compression(grpc_mdelem *md) { - grpc_compression_level clevel; + grpc_compression_algorithm algorithm; void *user_data = grpc_mdelem_get_user_data(md, destroy_compression); if (user_data) { - clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; + algorithm = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET; } else { - GPR_ASSERT(sizeof(clevel) == GPR_SLICE_LENGTH(md->value->slice)); - memcpy(&clevel, GPR_SLICE_START_PTR(md->value->slice), sizeof(clevel)); + const char *md_c_str = grpc_mdstr_as_c_string(md->value); + if (!grpc_compression_algorithm_parse(md_c_str, &algorithm)) { + gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str); + assert(0); + } grpc_mdelem_set_user_data(md, destroy_compression, - (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); + (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET)); } - return clevel; + return algorithm; } static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index bf0d282861..d3dcb2255f 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -100,7 +100,7 @@ grpc_channel *grpc_channel_create_from_filters( channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_compression_algorithm_string = - grpc_mdstr_from_string(mdctx, "grpc-compression-algorithm"); + grpc_mdstr_from_string(mdctx, "grpc-encoding"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { char buf[GPR_LTOA_MIN_BUFSIZE]; @@ -273,7 +273,8 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { return channel->grpc_status_string; } -grpc_mdstr *grpc_channel_get_compresssion_algorithm_string(grpc_channel *channel) { +grpc_mdstr *grpc_channel_get_compresssion_algorithm_string( + grpc_channel *channel) { return channel->grpc_compression_algorithm_string; } diff --git a/test/core/compression/message_compress_test.c b/test/core/compression/message_compress_test.c index 4033c18131..f5f21cff25 100644 --- a/test/core/compression/message_compress_test.c +++ b/test/core/compression/message_compress_test.c @@ -61,13 +61,15 @@ static void assert_passthrough(gpr_slice value, gpr_slice_buffer output; gpr_slice final; int was_compressed; + char *algorithm_name; + GPR_ASSERT(grpc_compression_algorithm_name(algorithm, &algorithm_name) != 0); gpr_log(GPR_INFO, "assert_passthrough: value_length=%d value_hash=0x%08x " "algorithm='%s' uncompressed_split='%s' compressed_split='%s'", GPR_SLICE_LENGTH(value), gpr_murmur_hash3(GPR_SLICE_START_PTR(value), GPR_SLICE_LENGTH(value), 0), - grpc_compression_algorithm_name(algorithm), + algorithm_name, grpc_slice_split_mode_name(uncompressed_split_mode), grpc_slice_split_mode_name(compressed_split_mode)); diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c index 7ebdec5cc9..ca16bc7d52 100644 --- a/test/core/end2end/tests/request_with_compressed_payload.c +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -104,7 +104,8 @@ static void request_with_payload_template( grpc_end2end_test_config config, const char *test_name, gpr_uint32 send_flags_bitmask, grpc_compression_level requested_compression_level, - grpc_compression_algorithm expected_compression_algorithm) { + grpc_compression_algorithm expected_compression_algorithm, + grpc_metadata *client_metadata) { grpc_call *c; grpc_call *s; gpr_slice request_payload_slice; @@ -125,8 +126,9 @@ static void request_with_payload_template( size_t details_capacity = 0; int was_cancelled = 2; cq_verifier *cqv; + char str[1024]; - char str[1024]; memset(&str[0], 1023, 'x'); str[1023] = '\0'; + memset(&str[0], 1023, 'x'); str[1023] = '\0'; request_payload_slice = gpr_slice_from_copied_string(str); request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); @@ -149,7 +151,12 @@ static void request_with_payload_template( op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; + if (client_metadata != NULL) { + op->data.send_initial_metadata.count = 1; + op->data.send_initial_metadata.metadata = client_metadata; + } else { + op->data.send_initial_metadata.count = 0; + } op->flags = 0; op++; op->op = GRPC_OP_SEND_MESSAGE; @@ -247,7 +254,16 @@ static void test_invoke_request_with_exceptionally_uncompressed_payload( grpc_end2end_test_config config) { request_with_payload_template( config, "test_invoke_request_with_exceptionally_uncompressed_payload", - GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE); + GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE, + NULL); +} + +static void test_invoke_request_with_uncompressed_payload( + grpc_end2end_test_config config) { + request_with_payload_template( + config, "test_invoke_request_with_uncompressed_payload", 0, + GRPC_COMPRESS_LEVEL_NONE, + grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_NONE), NULL); } static void test_invoke_request_with_compressed_payload( @@ -255,19 +271,47 @@ static void test_invoke_request_with_compressed_payload( request_with_payload_template( config, "test_invoke_request_with_compressed_payload", 0, GRPC_COMPRESS_LEVEL_HIGH, - grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_HIGH)); + grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_HIGH), NULL); } -static void test_invoke_request_with_uncompressed_payload( +static void test_invoke_request_with_compressed_payload_md_override( grpc_end2end_test_config config) { + grpc_metadata gzip_compression_override; + grpc_metadata none_compression_override; + + gzip_compression_override.key = "grpc-encoding"; + gzip_compression_override.value = "gzip"; + gzip_compression_override.value_length = 4; + memset(&gzip_compression_override.internal_data, 0, + sizeof(gzip_compression_override.internal_data)); + + none_compression_override.key = "grpc-encoding"; + none_compression_override.value = "none"; + none_compression_override.value_length = 4; + memset(&none_compression_override.internal_data, 0, + sizeof(none_compression_override.internal_data)); + + /* Channel default NONE, call override to GZIP */ request_with_payload_template( - config, "test_invoke_request_with_uncompressed_payload", 0, - GRPC_COMPRESS_LEVEL_NONE, - grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_NONE)); + config, "test_invoke_request_with_compressed_payload_md_override_1", 0, + GRPC_COMPRESS_LEVEL_NONE, GRPC_COMPRESS_GZIP, &gzip_compression_override); + + /* Channel default DEFLATE, call override to GZIP */ + request_with_payload_template( + config, "test_invoke_request_with_compressed_payload_md_override_2", 0, + grpc_compression_level_for_algorithm(GRPC_COMPRESS_DEFLATE), + GRPC_COMPRESS_GZIP, &gzip_compression_override); + + /* Channel default DEFLATE, call override to NONE */ + request_with_payload_template( + config, "test_invoke_request_with_compressed_payload_md_override_3", 0, + grpc_compression_level_for_algorithm(GRPC_COMPRESS_DEFLATE), + GRPC_COMPRESS_NONE, &none_compression_override); } void grpc_end2end_tests(grpc_end2end_test_config config) { test_invoke_request_with_exceptionally_uncompressed_payload(config); test_invoke_request_with_uncompressed_payload(config); test_invoke_request_with_compressed_payload(config); + test_invoke_request_with_compressed_payload_md_override(config); } -- cgit v1.2.3 From d7d9ce27c523798384051246e18e3f00b29dd8c9 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 30 Jun 2015 23:29:03 -0700 Subject: WIP in *_end2end_test.cc. Tests pass. Fixed leaks and introduced concept of compression request thru MD --- include/grpc++/client_context.h | 15 +++++++++++++++ include/grpc++/server_context.h | 14 ++++++++++++++ include/grpc/compression.h | 8 ++++++++ src/core/channel/compress_filter.c | 19 ++++++++++++------- src/core/channel/compress_filter.h | 2 ++ src/core/surface/call.c | 2 +- src/core/surface/channel.c | 2 +- src/core/surface/channel.h | 2 +- src/core/surface/secure_channel_create.c | 5 +---- src/cpp/client/client_context.cc | 21 +++++++++++++++++++++ src/cpp/proto/proto_utils.cc | 4 +++- src/cpp/server/server_context.cc | 20 ++++++++++++++++++++ .../end2end/tests/request_with_compressed_payload.c | 6 ++++-- test/cpp/end2end/end2end_test.cc | 3 ++- test/cpp/end2end/generic_end2end_test.cc | 1 + 15 files changed, 106 insertions(+), 18 deletions(-) (limited to 'test/core/end2end/tests/request_with_compressed_payload.c') diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 5e10875260..88954e227b 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -38,6 +38,7 @@ #include #include +#include #include #include #include @@ -107,6 +108,17 @@ class ClientContext { creds_ = creds; } + grpc_compression_level get_compression_level() const { + return compression_level_; + } + void set_compression_level(grpc_compression_level level); + + grpc_compression_algorithm get_compression_algorithm() const { + return compression_algorithm_; + } + void set_compression_algorithm(grpc_compression_algorithm algorithm); + + void TryCancel(); private: @@ -157,6 +169,9 @@ class ClientContext { std::multimap send_initial_metadata_; std::multimap recv_initial_metadata_; std::multimap trailing_metadata_; + + grpc_compression_level compression_level_; + grpc_compression_algorithm compression_algorithm_; }; } // namespace grpc diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h index 326b6a125c..a2f0a2f990 100644 --- a/include/grpc++/server_context.h +++ b/include/grpc++/server_context.h @@ -36,6 +36,7 @@ #include +#include #include #include #include @@ -97,6 +98,16 @@ class ServerContext { return client_metadata_; } + grpc_compression_level get_compression_level() const { + return compression_level_; + } + void set_compression_level(grpc_compression_level level); + + grpc_compression_algorithm get_compression_algorithm() const { + return compression_algorithm_; + } + void set_compression_algorithm(grpc_compression_algorithm algorithm); + private: friend class ::grpc::Server; template @@ -142,6 +153,9 @@ class ServerContext { std::multimap client_metadata_; std::multimap initial_metadata_; std::multimap trailing_metadata_; + + grpc_compression_level compression_level_; + grpc_compression_algorithm compression_algorithm_; }; } // namespace grpc diff --git a/include/grpc/compression.h b/include/grpc/compression.h index 1cff5d2d7e..dd7e1d0a12 100644 --- a/include/grpc/compression.h +++ b/include/grpc/compression.h @@ -34,6 +34,10 @@ #ifndef GRPC_COMPRESSION_H #define GRPC_COMPRESSION_H +#ifdef __cplusplus +extern "C" { +#endif + /** To be used in channel arguments */ #define GRPC_COMPRESSION_LEVEL_ARG "grpc.compression_level" @@ -76,4 +80,8 @@ grpc_compression_level grpc_compression_level_for_algorithm( grpc_compression_algorithm grpc_compression_algorithm_for_level( grpc_compression_level level); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_COMPRESSION_H */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index f5fe87d6b8..6100a90668 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -50,7 +50,8 @@ typedef struct call_data { } call_data; typedef struct channel_data { - grpc_mdstr *mdstr_compression_algorithm_key; + grpc_mdstr *mdstr_request_compression_algorithm_key; + grpc_mdstr *mdstr_outgoing_compression_algorithm_key; grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT]; grpc_compression_algorithm default_compression_algorithm; } channel_data; @@ -72,14 +73,14 @@ static int compress_send_sb(grpc_compression_algorithm algorithm, } /** For each \a md element from the incoming metadata, filter out the entry for - * "grpc-compression-algorithm", using its value to populate the call data's + * "grpc-encoding", using its value to populate the call data's * compression_algorithm field. */ static grpc_mdelem* compression_md_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; - if (md->key == channeld->mdstr_compression_algorithm_key) { + if (md->key == channeld->mdstr_request_compression_algorithm_key) { const char *md_c_str = grpc_mdstr_as_c_string(md->value); if (!grpc_compression_algorithm_parse(md_c_str, &calld->compression_algorithm)) { @@ -184,7 +185,6 @@ static void process_send_ops(grpc_call_element *elem, break; case GRPC_OP_SLICE: if (did_compress) { - gpr_slice_unref(sop->data.slice); if (j < calld->slices.count) { sop->data.slice = gpr_slice_ref(calld->slices.slices[j++]); } @@ -259,7 +259,10 @@ static void init_channel_elem(grpc_channel_element *elem, channeld->default_compression_algorithm = grpc_compression_algorithm_for_level(clevel); - channeld->mdstr_compression_algorithm_key = + channeld->mdstr_request_compression_algorithm_key = + grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY); + + channeld->mdstr_outgoing_compression_algorithm_key = grpc_mdstr_from_string(mdctx, "grpc-encoding"); for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { @@ -267,7 +270,8 @@ static void init_channel_elem(grpc_channel_element *elem, GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorith_name) != 0); channeld->mdelem_compression_algorithms[algo_idx] = grpc_mdelem_from_metadata_strings( - mdctx, grpc_mdstr_ref(channeld->mdstr_compression_algorithm_key), + mdctx, + grpc_mdstr_ref(channeld->mdstr_outgoing_compression_algorithm_key), grpc_mdstr_from_string(mdctx, algorith_name)); } @@ -283,7 +287,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; - grpc_mdstr_unref(channeld->mdstr_compression_algorithm_key); + grpc_mdstr_unref(channeld->mdstr_request_compression_algorithm_key); + grpc_mdstr_unref(channeld->mdstr_outgoing_compression_algorithm_key); for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { grpc_mdelem_unref(channeld->mdelem_compression_algorithms[algo_idx]); diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h index ea667969e1..3a196eb7bf 100644 --- a/src/core/channel/compress_filter.h +++ b/src/core/channel/compress_filter.h @@ -36,6 +36,8 @@ #include "src/core/channel/channel_stack.h" +#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "internal:grpc-encoding-request" + /** Message-level compression filter. * * See for the available compression levels. diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 37dadecb35..5f489c0f4e 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -1243,7 +1243,7 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); } else if (key == - grpc_channel_get_compresssion_algorithm_string(call->channel)) { + grpc_channel_get_compression_algorithm_string(call->channel)) { set_compression_algorithm(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index d3dcb2255f..cab99e71d3 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -273,7 +273,7 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) { return channel->grpc_status_string; } -grpc_mdstr *grpc_channel_get_compresssion_algorithm_string( +grpc_mdstr *grpc_channel_get_compression_algorithm_string( grpc_channel *channel) { return channel->grpc_compression_algorithm_string; } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 8d0fe812ce..66924ad72c 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -53,7 +53,7 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel); grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int status_code); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); -grpc_mdstr *grpc_channel_get_compresssion_algorithm_string( +grpc_mdstr *grpc_channel_get_compression_algorithm_string( grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c index be46c54427..cfa869ec71 100644 --- a/src/core/surface/secure_channel_create.c +++ b/src/core/surface/secure_channel_create.c @@ -244,10 +244,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds, if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; } */ - if (grpc_channel_args_get_compression_level(args) > - GRPC_COMPRESS_LEVEL_NONE) { - filters[n++] = &grpc_compress_filter; - } + filters[n++] = &grpc_compress_filter; filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1); diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 72cdd49d19..0eba554e33 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -34,9 +34,12 @@ #include #include +#include #include #include +#include "src/core/channel/compress_filter.h" + namespace grpc { ClientContext::ClientContext() @@ -75,6 +78,24 @@ void ClientContext::set_call(grpc_call* call, } } +void ClientContext::set_compression_level(grpc_compression_level level) { + const grpc_compression_algorithm algorithm_for_level = + grpc_compression_algorithm_for_level(level); + set_compression_algorithm(algorithm_for_level); +} + +void ClientContext::set_compression_algorithm( + grpc_compression_algorithm algorithm) { + char* algorithm_name = NULL; + if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) { + gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.", + algorithm); + abort(); + } + GPR_ASSERT(algorithm_name != NULL); + AddMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name); +} + void ClientContext::TryCancel() { if (call_) { grpc_call_cancel(call_); diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 268e4f6d1f..337f680129 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -103,7 +103,9 @@ class GrpcBufferReader GRPC_FINAL : byte_count_(0), backup_count_(0) { grpc_byte_buffer_reader_init(&reader_, buffer); } - ~GrpcBufferReader() GRPC_OVERRIDE {} + ~GrpcBufferReader() GRPC_OVERRIDE { + grpc_byte_buffer_reader_destroy(&reader_); + } bool Next(const void** data, int* size) GRPC_OVERRIDE { if (backup_count_ > 0) { diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 699895a3cf..087e28d33a 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -39,6 +39,8 @@ #include #include +#include "src/core/channel/compress_filter.h" + namespace grpc { // CompletionOp @@ -146,4 +148,22 @@ bool ServerContext::IsCancelled() { return completion_op_ && completion_op_->CheckCancelled(cq_); } +void ServerContext::set_compression_level(grpc_compression_level level) { + const grpc_compression_algorithm algorithm_for_level = + grpc_compression_algorithm_for_level(level); + set_compression_algorithm(algorithm_for_level); +} + +void ServerContext::set_compression_algorithm( + grpc_compression_algorithm algorithm) { + char* algorithm_name = NULL; + if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) { + gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.", + algorithm); + abort(); + } + GPR_ASSERT(algorithm_name != NULL); + AddInitialMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name); +} + } // namespace grpc diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c index ca16bc7d52..a6057457c4 100644 --- a/test/core/end2end/tests/request_with_compressed_payload.c +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -45,6 +45,7 @@ #include "test/core/end2end/cq_verifier.h" #include "src/core/channel/channel_args.h" +#include "src/core/channel/compress_filter.h" enum { TIMEOUT = 200000 }; @@ -240,6 +241,7 @@ static void request_with_payload_template( cq_verifier_destroy(cqv); + gpr_slice_unref(request_payload_slice); grpc_byte_buffer_destroy(request_payload); grpc_byte_buffer_destroy(request_payload_recv); @@ -279,13 +281,13 @@ static void test_invoke_request_with_compressed_payload_md_override( grpc_metadata gzip_compression_override; grpc_metadata none_compression_override; - gzip_compression_override.key = "grpc-encoding"; + gzip_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY; gzip_compression_override.value = "gzip"; gzip_compression_override.value_length = 4; memset(&gzip_compression_override.internal_data, 0, sizeof(gzip_compression_override.internal_data)); - none_compression_override.key = "grpc-encoding"; + none_compression_override.key = GRPC_COMPRESS_REQUEST_ALGORITHM_KEY; none_compression_override.value = "none"; none_compression_override.value_length = 4; memset(&none_compression_override.internal_data, 0, diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index 45ba8b0878..49070a7df1 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -226,10 +226,11 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub, int num_rpcs) { EchoRequest request; EchoResponse response; - request.set_message("Hello"); + request.set_message("Hello hello hello hello"); for (int i = 0; i < num_rpcs; ++i) { ClientContext context; + context.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); Status s = stub->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); EXPECT_TRUE(s.ok()); diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index b9d47b32de..e9d86cc9f7 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -227,6 +227,7 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { GenericServerContext srv_ctx; GenericServerAsyncReaderWriter srv_stream(&srv_ctx); + cli_ctx.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); send_request.set_message("Hello"); std::unique_ptr cli_stream = generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1)); -- cgit v1.2.3 From 81f77dbc5ec1e852a00baff3d2bc9470f4b4c606 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Wed, 8 Jul 2015 23:51:37 -0700 Subject: Fixed stupid bug from another dimension. Thanks msan. --- test/core/end2end/tests/request_with_compressed_payload.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/core/end2end/tests/request_with_compressed_payload.c') diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c index a6057457c4..0c1b065bd8 100644 --- a/test/core/end2end/tests/request_with_compressed_payload.c +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -129,7 +129,7 @@ static void request_with_payload_template( cq_verifier *cqv; char str[1024]; - memset(&str[0], 1023, 'x'); str[1023] = '\0'; + memset(str, 'x', 1023); str[1023] = '\0'; request_payload_slice = gpr_slice_from_copied_string(str); request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); -- cgit v1.2.3 From cadbf22467ef7c636dabec53a0178f1b669ba42e Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 17 Jul 2015 15:33:13 -0700 Subject: Removed compression levels from clients and _experimental_'d signature of methods manipulating compression algorithms --- include/grpc++/channel_arguments.h | 5 +++-- include/grpc++/client_context.h | 12 ++++------ include/grpc/compression.h | 2 +- src/core/channel/channel_args.c | 14 ++++++------ src/core/channel/channel_args.h | 12 +++++----- src/core/channel/compress_filter.c | 24 +++++++++++++++++--- src/core/channel/compress_filter.h | 6 +---- src/cpp/client/channel_arguments.cc | 5 +++-- src/cpp/client/client_context.cc | 8 +------ .../fixtures/chttp2_fullstack_compression.c | 8 +++---- .../tests/request_with_compressed_payload.c | 26 +++++++++------------- test/cpp/end2end/end2end_test.cc | 2 +- test/cpp/end2end/generic_end2end_test.cc | 2 +- 13 files changed, 64 insertions(+), 62 deletions(-) (limited to 'test/core/end2end/tests/request_with_compressed_payload.c') diff --git a/include/grpc++/channel_arguments.h b/include/grpc++/channel_arguments.h index 68f24cde4a..7b17830a86 100644 --- a/include/grpc++/channel_arguments.h +++ b/include/grpc++/channel_arguments.h @@ -59,8 +59,9 @@ class ChannelArguments { void SetSslTargetNameOverride(const grpc::string& name); // TODO(yangg) add flow control options - // Set the compression level for the channel. - void SetCompressionLevel(grpc_compression_level level); + // Set the compression algorithm for the channel. + void _Experimental_SetCompressionAlgorithm( + grpc_compression_algorithm algorithm); // Generic channel argument setters. Only for advanced use cases. void SetInt(const grpc::string& key, int value); diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 4bad20de42..6b8d7211d1 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -110,15 +110,12 @@ class ClientContext { creds_ = creds; } - grpc_compression_level get_compression_level() const { - return compression_level_; - } - void set_compression_level(grpc_compression_level level); - - grpc_compression_algorithm get_compression_algorithm() const { + grpc_compression_algorithm _experimental_get_compression_algorithm() const { return compression_algorithm_; } - void set_compression_algorithm(grpc_compression_algorithm algorithm); + + void _experimental_set_compression_algorithm( + grpc_compression_algorithm algorithm); std::shared_ptr auth_context() const; @@ -179,7 +176,6 @@ class ClientContext { std::multimap recv_initial_metadata_; std::multimap trailing_metadata_; - grpc_compression_level compression_level_; grpc_compression_algorithm compression_algorithm_; }; diff --git a/include/grpc/compression.h b/include/grpc/compression.h index dd7e1d0a12..913e553ba9 100644 --- a/include/grpc/compression.h +++ b/include/grpc/compression.h @@ -39,7 +39,7 @@ extern "C" { #endif /** To be used in channel arguments */ -#define GRPC_COMPRESSION_LEVEL_ARG "grpc.compression_level" +#define GRPC_COMPRESSION_ALGORITHM_ARG "grpc.compression_algorithm" /* The various compression algorithms supported by GRPC */ typedef enum { diff --git a/src/core/channel/channel_args.c b/src/core/channel/channel_args.c index d45898f2f4..c430b56fa2 100644 --- a/src/core/channel/channel_args.c +++ b/src/core/channel/channel_args.c @@ -124,25 +124,25 @@ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { return 0; } -grpc_compression_level grpc_channel_args_get_compression_level( +grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( const grpc_channel_args *a) { size_t i; if (a == NULL) return 0; for (i = 0; i < a->num_args; ++i) { if (a->args[i].type == GRPC_ARG_INTEGER && - !strcmp(GRPC_COMPRESSION_LEVEL_ARG, a->args[i].key)) { + !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) { return a->args[i].value.integer; break; } } - return GRPC_COMPRESS_LEVEL_NONE; + return GRPC_COMPRESS_NONE; } -grpc_channel_args *grpc_channel_args_set_compression_level( - grpc_channel_args *a, grpc_compression_level level) { +grpc_channel_args *grpc_channel_args_set_compression_algorithm( + grpc_channel_args *a, grpc_compression_algorithm algorithm) { grpc_arg tmp; tmp.type = GRPC_ARG_INTEGER; - tmp.key = GRPC_COMPRESSION_LEVEL_ARG; - tmp.value.integer = level; + tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG; + tmp.value.integer = algorithm; return grpc_channel_args_copy_and_add(a, &tmp, 1); } diff --git a/src/core/channel/channel_args.h b/src/core/channel/channel_args.h index 17321010c5..7e6ddd3997 100644 --- a/src/core/channel/channel_args.h +++ b/src/core/channel/channel_args.h @@ -57,14 +57,14 @@ void grpc_channel_args_destroy(grpc_channel_args *a); * is specified in channel args, otherwise returns 0. */ int grpc_channel_args_is_census_enabled(const grpc_channel_args *a); -/** Returns the compression level set in \a a. */ -grpc_compression_level grpc_channel_args_get_compression_level( +/** Returns the compression algorithm set in \a a. */ +grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( const grpc_channel_args *a); /** Returns a channel arg instance with compression enabled. If \a a is - * non-NULL, its args are copied. N.B. GRPC_COMPRESS_LEVEL_NONE disables - * compression for the channel. */ -grpc_channel_args *grpc_channel_args_set_compression_level( - grpc_channel_args *a, grpc_compression_level level); + * non-NULL, its args are copied. N.B. GRPC_COMPRESS_NONE disables compression + * for the channel. */ +grpc_channel_args *grpc_channel_args_set_compression_algorithm( + grpc_channel_args *a, grpc_compression_algorithm algorithm); #endif /* GRPC_INTERNAL_CORE_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 93b9b203e5..3d85ed41c5 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -31,6 +31,26 @@ * */ +/** Compression filter for outgoing data. + * + * Compression settings may come from: + * - Channel configuration, as established at channel creation time. + * - The metadata accompanying the outgoing data to be compressed. This is + * taken as a request only. We may choose not to honor it. The metadata key + * is given by \a GRPC_COMPRESS_REQUEST_ALGORITHM_KEY. + * + * Compression can be disabled for concrete messages (for instance in order to + * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in + * the BEGIN_MESSAGE flags. + * + * The attempted compression mechanism is added to the resulting initial + * metadata under the'grpc-encoding' key. + * + * If compression is actually performed, BEGIN_MESSAGE's flag is modified to + * incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the + * aforementioned 'grpc-encoding' metadata value, data will pass through + * uncompressed. */ + #include #include @@ -277,11 +297,9 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, int is_first, int is_last) { channel_data *channeld = elem->channel_data; grpc_compression_algorithm algo_idx; - const grpc_compression_level clevel = - grpc_channel_args_get_compression_level(args); channeld->default_compression_algorithm = - grpc_compression_algorithm_for_level(clevel); + grpc_channel_args_get_compression_algorithm(args); channeld->mdstr_request_compression_algorithm_key = grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY); diff --git a/src/core/channel/compress_filter.h b/src/core/channel/compress_filter.h index 3a196eb7bf..8d9c3ba697 100644 --- a/src/core/channel/compress_filter.h +++ b/src/core/channel/compress_filter.h @@ -40,11 +40,7 @@ /** Message-level compression filter. * - * See for the available compression levels. - * - * Use grpc_channel_args_set_compression_level and - * grpc_channel_args_get_compression_level to interact with the compression - * settings for a channel. + * See for the available compression settings. * * grpc_op instances of type GRPC_OP_SEND_MESSAGE can have the bit specified by * the GRPC_WRITE_NO_COMPRESS mask in order to disable compression in an diff --git a/src/cpp/client/channel_arguments.cc b/src/cpp/client/channel_arguments.cc index b271650673..92ac5ea6fd 100644 --- a/src/cpp/client/channel_arguments.cc +++ b/src/cpp/client/channel_arguments.cc @@ -37,8 +37,9 @@ namespace grpc { -void ChannelArguments::SetCompressionLevel(grpc_compression_level level) { - SetInt(GRPC_COMPRESSION_LEVEL_ARG, level); +void ChannelArguments::_Experimental_SetCompressionAlgorithm( + grpc_compression_algorithm algorithm) { + SetInt(GRPC_COMPRESSION_ALGORITHM_ARG, algorithm); } void ChannelArguments::SetInt(const grpc::string& key, int value) { diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index cc5f51d618..c81f06186a 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -79,13 +79,7 @@ void ClientContext::set_call(grpc_call* call, } } -void ClientContext::set_compression_level(grpc_compression_level level) { - const grpc_compression_algorithm algorithm_for_level = - grpc_compression_algorithm_for_level(level); - set_compression_algorithm(algorithm_for_level); -} - -void ClientContext::set_compression_algorithm( +void ClientContext::_experimental_set_compression_algorithm( grpc_compression_algorithm algorithm) { char* algorithm_name = NULL; if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) { diff --git a/test/core/end2end/fixtures/chttp2_fullstack_compression.c b/test/core/end2end/fixtures/chttp2_fullstack_compression.c index 19658ed38f..0a9a312296 100644 --- a/test/core/end2end/fixtures/chttp2_fullstack_compression.c +++ b/test/core/end2end/fixtures/chttp2_fullstack_compression.c @@ -80,8 +80,8 @@ void chttp2_init_client_fullstack_compression(grpc_end2end_test_fixture *f, if (ffd->client_args_compression != NULL) { grpc_channel_args_destroy(ffd->client_args_compression); } - ffd->client_args_compression = grpc_channel_args_set_compression_level( - client_args, GRPC_COMPRESS_LEVEL_HIGH); + ffd->client_args_compression = grpc_channel_args_set_compression_algorithm( + client_args, GRPC_COMPRESS_GZIP); f->client = grpc_channel_create(ffd->localaddr, ffd->client_args_compression); } @@ -91,8 +91,8 @@ void chttp2_init_server_fullstack_compression(grpc_end2end_test_fixture *f, if (ffd->server_args_compression != NULL) { grpc_channel_args_destroy(ffd->server_args_compression); } - ffd->server_args_compression = grpc_channel_args_set_compression_level( - server_args, GRPC_COMPRESS_LEVEL_HIGH); + ffd->server_args_compression = grpc_channel_args_set_compression_algorithm( + server_args, GRPC_COMPRESS_GZIP); if (f->server) { grpc_server_destroy(f->server); } diff --git a/test/core/end2end/tests/request_with_compressed_payload.c b/test/core/end2end/tests/request_with_compressed_payload.c index 0c1b065bd8..2599f796d2 100644 --- a/test/core/end2end/tests/request_with_compressed_payload.c +++ b/test/core/end2end/tests/request_with_compressed_payload.c @@ -104,7 +104,7 @@ static void end_test(grpc_end2end_test_fixture *f) { static void request_with_payload_template( grpc_end2end_test_config config, const char *test_name, gpr_uint32 send_flags_bitmask, - grpc_compression_level requested_compression_level, + grpc_compression_algorithm requested_compression_algorithm, grpc_compression_algorithm expected_compression_algorithm, grpc_metadata *client_metadata) { grpc_call *c; @@ -133,10 +133,10 @@ static void request_with_payload_template( request_payload_slice = gpr_slice_from_copied_string(str); request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - client_args = grpc_channel_args_set_compression_level( - NULL, requested_compression_level); - server_args = grpc_channel_args_set_compression_level( - NULL, requested_compression_level); + client_args = grpc_channel_args_set_compression_algorithm( + NULL, requested_compression_algorithm); + server_args = grpc_channel_args_set_compression_algorithm( + NULL, requested_compression_algorithm); f = begin_test(config, test_name, client_args, server_args); cqv = cq_verifier_create(f.cq); @@ -256,7 +256,7 @@ static void test_invoke_request_with_exceptionally_uncompressed_payload( grpc_end2end_test_config config) { request_with_payload_template( config, "test_invoke_request_with_exceptionally_uncompressed_payload", - GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_LEVEL_HIGH, GRPC_COMPRESS_NONE, + GRPC_WRITE_NO_COMPRESS, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_NONE, NULL); } @@ -264,16 +264,14 @@ static void test_invoke_request_with_uncompressed_payload( grpc_end2end_test_config config) { request_with_payload_template( config, "test_invoke_request_with_uncompressed_payload", 0, - GRPC_COMPRESS_LEVEL_NONE, - grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_NONE), NULL); + GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE, NULL); } static void test_invoke_request_with_compressed_payload( grpc_end2end_test_config config) { request_with_payload_template( config, "test_invoke_request_with_compressed_payload", 0, - GRPC_COMPRESS_LEVEL_HIGH, - grpc_compression_algorithm_for_level(GRPC_COMPRESS_LEVEL_HIGH), NULL); + GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, NULL); } static void test_invoke_request_with_compressed_payload_md_override( @@ -296,19 +294,17 @@ static void test_invoke_request_with_compressed_payload_md_override( /* Channel default NONE, call override to GZIP */ request_with_payload_template( config, "test_invoke_request_with_compressed_payload_md_override_1", 0, - GRPC_COMPRESS_LEVEL_NONE, GRPC_COMPRESS_GZIP, &gzip_compression_override); + GRPC_COMPRESS_NONE, GRPC_COMPRESS_GZIP, &gzip_compression_override); /* Channel default DEFLATE, call override to GZIP */ request_with_payload_template( config, "test_invoke_request_with_compressed_payload_md_override_2", 0, - grpc_compression_level_for_algorithm(GRPC_COMPRESS_DEFLATE), - GRPC_COMPRESS_GZIP, &gzip_compression_override); + GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_GZIP, &gzip_compression_override); /* Channel default DEFLATE, call override to NONE */ request_with_payload_template( config, "test_invoke_request_with_compressed_payload_md_override_3", 0, - grpc_compression_level_for_algorithm(GRPC_COMPRESS_DEFLATE), - GRPC_COMPRESS_NONE, &none_compression_override); + GRPC_COMPRESS_DEFLATE, GRPC_COMPRESS_NONE, &none_compression_override); } void grpc_end2end_tests(grpc_end2end_test_config config) { diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc index c086d66a7c..b3523b8330 100644 --- a/test/cpp/end2end/end2end_test.cc +++ b/test/cpp/end2end/end2end_test.cc @@ -271,7 +271,7 @@ static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub, for (int i = 0; i < num_rpcs; ++i) { ClientContext context; - context.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + context._experimental_set_compression_algorithm(GRPC_COMPRESS_GZIP); Status s = stub->Echo(&context, request, &response); EXPECT_EQ(response.message(), request.message()); EXPECT_TRUE(s.ok()); diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index e9d86cc9f7..8fe0d6886a 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -227,7 +227,7 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { GenericServerContext srv_ctx; GenericServerAsyncReaderWriter srv_stream(&srv_ctx); - cli_ctx.set_compression_level(GRPC_COMPRESS_LEVEL_HIGH); + cli_ctx._experimental_set_compression_algorithm(GRPC_COMPRESS_GZIP); send_request.set_message("Hello"); std::unique_ptr cli_stream = generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1)); -- cgit v1.2.3