aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-04-14 08:22:55 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-04-14 08:22:55 -0700
commit3136568e36abf32891cef392b02dddc9f8916d11 (patch)
tree5132d8849b93a3ea2cd0d1f5bc9d429f9163c8f5 /src/core/lib
parent4f505464bdd8435a1ffe345357e30bb63fbf2120 (diff)
parentb81fb794a397b053df0d4bed7b1525a0ff51535f (diff)
Merge remote-tracking branch 'upstream/master' into revert-10619-revert-9626-lazy-deframe
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/channel_args.c28
-rw-r--r--src/core/lib/channel/channel_args.h5
-rw-r--r--src/core/lib/channel/channel_stack_builder.c11
-rw-r--r--src/core/lib/channel/channel_stack_builder.h4
-rw-r--r--src/core/lib/channel/compress_filter.c363
-rw-r--r--src/core/lib/channel/compress_filter.h67
-rw-r--r--src/core/lib/channel/deadline_filter.c348
-rw-r--r--src/core/lib/channel/deadline_filter.h102
-rw-r--r--src/core/lib/channel/http_client_filter.c613
-rw-r--r--src/core/lib/channel/http_client_filter.h47
-rw-r--r--src/core/lib/channel/http_server_filter.c445
-rw-r--r--src/core/lib/channel/http_server_filter.h42
-rw-r--r--src/core/lib/channel/message_size_filter.c270
-rw-r--r--src/core/lib/channel/message_size_filter.h39
-rw-r--r--src/core/lib/security/credentials/credentials.c1
-rw-r--r--src/core/lib/security/credentials/ssl/ssl_credentials.c1
-rw-r--r--src/core/lib/surface/channel_init.c21
-rw-r--r--src/core/lib/surface/channel_stack_type.c18
-rw-r--r--src/core/lib/surface/channel_stack_type.h2
-rw-r--r--src/core/lib/surface/completion_queue.c48
-rw-r--r--src/core/lib/surface/completion_queue.h6
-rw-r--r--src/core/lib/surface/completion_queue_factory.c33
-rw-r--r--src/core/lib/surface/init.c50
-rw-r--r--src/core/lib/surface/server.c9
-rw-r--r--src/core/lib/surface/version.c2
25 files changed, 147 insertions, 2428 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index a6d124c719..3de31d99da 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -329,7 +329,9 @@ const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
return NULL;
}
-int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) {
+int grpc_channel_arg_get_integer(const grpc_arg *arg,
+ grpc_integer_options options) {
+ if (arg == NULL) return options.default_value;
if (arg->type != GRPC_ARG_INTEGER) {
gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key);
return options.default_value;
@@ -347,9 +349,25 @@ int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) {
return arg->value.integer;
}
+bool grpc_channel_arg_get_bool(const grpc_arg *arg, bool default_value) {
+ if (arg == NULL) return default_value;
+ if (arg->type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key);
+ return default_value;
+ }
+ switch (arg->value.integer) {
+ case 0:
+ return false;
+ case 1:
+ return true;
+ default:
+ gpr_log(GPR_ERROR, "%s treated as bool but set to %d (assuming true)",
+ arg->key, arg->value.integer);
+ return true;
+ }
+}
+
bool grpc_channel_args_want_minimal_stack(const grpc_channel_args *args) {
- const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_MINIMAL_STACK);
- if (arg == NULL) return false;
- if (arg->type == GRPC_ARG_INTEGER && arg->value.integer == 0) return false;
- return true;
+ return grpc_channel_arg_get_bool(
+ grpc_channel_args_find(args, GRPC_ARG_MINIMAL_STACK), false);
}
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 158cda5b21..5ffcacb7fd 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -121,6 +121,9 @@ typedef struct grpc_integer_options {
int max_value;
} grpc_integer_options;
/** Returns the value of \a arg, subject to the contraints in \a options. */
-int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options);
+int grpc_channel_arg_get_integer(const grpc_arg *arg,
+ grpc_integer_options options);
+
+bool grpc_channel_arg_get_bool(const grpc_arg *arg, bool default_value);
#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H */
diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c
index b515b7321a..88c02edb70 100644
--- a/src/core/lib/channel/channel_stack_builder.c
+++ b/src/core/lib/channel/channel_stack_builder.c
@@ -113,6 +113,17 @@ grpc_channel_stack_builder_create_iterator_at_last(
return create_iterator_at_filter_node(builder, &builder->end);
}
+bool grpc_channel_stack_builder_iterator_is_end(
+ grpc_channel_stack_builder_iterator *iterator) {
+ return iterator->node == &iterator->builder->end;
+}
+
+const char *grpc_channel_stack_builder_iterator_filter_name(
+ grpc_channel_stack_builder_iterator *iterator) {
+ if (iterator->node->filter == NULL) return NULL;
+ return iterator->node->filter->name;
+}
+
bool grpc_channel_stack_builder_move_next(
grpc_channel_stack_builder_iterator *iterator) {
if (iterator->node == &iterator->builder->end) return false;
diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h
index 8adf38e27b..c78111b00d 100644
--- a/src/core/lib/channel/channel_stack_builder.h
+++ b/src/core/lib/channel/channel_stack_builder.h
@@ -98,6 +98,10 @@ bool grpc_channel_stack_builder_iterator_is_first(
bool grpc_channel_stack_builder_iterator_is_end(
grpc_channel_stack_builder_iterator *iterator);
+/// What is the name of the filter at this iterator position?
+const char *grpc_channel_stack_builder_iterator_filter_name(
+ grpc_channel_stack_builder_iterator *iterator);
+
/// Move an iterator to the next item
bool grpc_channel_stack_builder_move_next(
grpc_channel_stack_builder_iterator *iterator);
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
deleted file mode 100644
index 764524b24d..0000000000
--- a/src/core/lib/channel/compress_filter.c
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <assert.h>
-#include <string.h>
-
-#include <grpc/compression.h>
-#include <grpc/slice_buffer.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/compress_filter.h"
-#include "src/core/lib/compression/algorithm_metadata.h"
-#include "src/core/lib/compression/message_compress.h"
-#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/string.h"
-#include "src/core/lib/transport/static_metadata.h"
-
-int grpc_compression_trace = 0;
-
-typedef struct call_data {
- grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
- grpc_linked_mdelem compression_algorithm_storage;
- grpc_linked_mdelem accept_encoding_storage;
- uint32_t remaining_slice_bytes;
- /** Compression algorithm we'll try to use. It may be given by incoming
- * metadata, or by the channel's default compression settings. */
- grpc_compression_algorithm compression_algorithm;
- /** If true, contents of \a compression_algorithm are authoritative */
- int has_compression_algorithm;
-
- grpc_transport_stream_op_batch *send_op;
- uint32_t send_length;
- uint32_t send_flags;
- grpc_slice incoming_slice;
- grpc_slice_buffer_stream replacement_stream;
- grpc_closure *post_send;
- grpc_closure send_done;
- grpc_closure got_slice;
-} call_data;
-
-typedef struct channel_data {
- /** The default, channel-level, compression algorithm */
- grpc_compression_algorithm default_compression_algorithm;
- /** Bitset of enabled algorithms */
- uint32_t enabled_algorithms_bitset;
- /** Supported compression algorithms */
- uint32_t supported_compression_algorithms;
-} channel_data;
-
-static int skip_compression(grpc_call_element *elem, uint32_t flags) {
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
-
- if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) {
- return 1;
- }
- if (calld->has_compression_algorithm) {
- if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
- return 1;
- }
- return 0; /* we have an actual call-specific algorithm */
- }
- /* no per-call compression override */
- return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
-}
-
-/** Filter initial metadata */
-static grpc_error *process_send_initial_metadata(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_metadata_batch *initial_metadata) GRPC_MUST_USE_RESULT;
-static grpc_error *process_send_initial_metadata(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_metadata_batch *initial_metadata) {
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- /* Parse incoming request for compression. If any, it'll be available
- * at calld->compression_algorithm */
- if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) {
- grpc_mdelem md =
- initial_metadata->idx.named.grpc_internal_encoding_request->md;
- if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md),
- &calld->compression_algorithm)) {
- char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
- gpr_log(GPR_ERROR,
- "Invalid compression algorithm: '%s' (unknown). Ignoring.", val);
- gpr_free(val);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- if (!GPR_BITGET(channeld->enabled_algorithms_bitset,
- calld->compression_algorithm)) {
- char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md));
- gpr_log(GPR_ERROR,
- "Invalid compression algorithm: '%s' (previously disabled). "
- "Ignoring.",
- val);
- gpr_free(val);
- calld->compression_algorithm = GRPC_COMPRESS_NONE;
- }
- calld->has_compression_algorithm = 1;
-
- grpc_metadata_batch_remove(
- exec_ctx, initial_metadata,
- initial_metadata->idx.named.grpc_internal_encoding_request);
- } else {
- /* If no algorithm was found in the metadata and we aren't
- * exceptionally skipping compression, fall back to the channel
- * default */
- calld->compression_algorithm = channeld->default_compression_algorithm;
- calld->has_compression_algorithm = 1; /* GPR_TRUE */
- }
-
- grpc_error *error = GRPC_ERROR_NONE;
- /* hint compression algorithm */
- if (calld->compression_algorithm != GRPC_COMPRESS_NONE) {
- error = grpc_metadata_batch_add_tail(
- exec_ctx, initial_metadata, &calld->compression_algorithm_storage,
- grpc_compression_encoding_mdelem(calld->compression_algorithm));
- }
-
- if (error != GRPC_ERROR_NONE) return error;
-
- /* convey supported compression algorithms */
- error = grpc_metadata_batch_add_tail(
- exec_ctx, initial_metadata, &calld->accept_encoding_storage,
- GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
- channeld->supported_compression_algorithms));
-
- return error;
-}
-
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem);
-
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
- calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
-}
-
-static void finish_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- int did_compress;
- grpc_slice_buffer tmp;
- grpc_slice_buffer_init(&tmp);
- did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm,
- &calld->slices, &tmp);
- if (did_compress) {
- if (grpc_compression_trace) {
- char *algo_name;
- const size_t before_size = calld->slices.length;
- const size_t after_size = tmp.length;
- const float savings_ratio = 1.0f - (float)after_size / (float)before_size;
- GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
- &algo_name));
- gpr_log(GPR_DEBUG, "Compressed[%s] %" PRIuPTR " bytes vs. %" PRIuPTR
- " bytes (%.2f%% savings)",
- algo_name, before_size, after_size, 100 * savings_ratio);
- }
- grpc_slice_buffer_swap(&calld->slices, &tmp);
- calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
- } else {
- if (grpc_compression_trace) {
- char *algo_name;
- GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
- &algo_name));
- gpr_log(GPR_DEBUG,
- "Algorithm '%s' enabled but decided not to compress. Input size: "
- "%" PRIuPTR,
- algo_name, calld->slices.length);
- }
- }
-
- grpc_slice_buffer_destroy_internal(exec_ctx, &tmp);
-
- grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
- calld->send_flags);
- calld->send_op->payload->send_message.send_message =
- &calld->replacement_stream.base;
- calld->post_send = calld->send_op->on_complete;
- calld->send_op->on_complete = &calld->send_done;
-
- grpc_call_next_op(exec_ctx, elem, calld->send_op);
-}
-
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice)) {
- /* Should never reach here */
- abort();
- }
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- finish_send_message(exec_ctx, elem);
- } else {
- continue_send_message(exec_ctx, elem);
- }
-}
-
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
- &calld->got_slice)) {
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice);
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- finish_send_message(exec_ctx, elem);
- break;
- }
- }
-}
-
-static void compress_start_transport_stream_op_batch(
- grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- call_data *calld = elem->call_data;
-
- GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
-
- if (op->send_initial_metadata) {
- grpc_error *error = process_send_initial_metadata(
- exec_ctx, elem,
- op->payload->send_initial_metadata.send_initial_metadata);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return;
- }
- }
- if (op->send_message &&
- !skip_compression(elem, op->payload->send_message.send_message->flags)) {
- calld->send_op = op;
- calld->send_length = op->payload->send_message.send_message->length;
- calld->send_flags = op->payload->send_message.send_message->flags;
- continue_send_message(exec_ctx, elem);
- } else {
- /* pass control down the stack */
- grpc_call_next_op(exec_ctx, elem, op);
- }
-
- GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
-}
-
-/* Constructor for call_data */
-static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- const grpc_call_element_args *args) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
-
- /* initialize members */
- grpc_slice_buffer_init(&calld->slices);
- calld->has_compression_algorithm = 0;
- grpc_closure_init(&calld->got_slice, got_slice, elem,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&calld->send_done, send_done, elem,
- grpc_schedule_on_exec_ctx);
-
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_final_info *final_info,
- grpc_closure *ignored) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
-}
-
-/* Constructor for channel_data */
-static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_channel_element_args *args) {
- channel_data *channeld = elem->channel_data;
-
- channeld->enabled_algorithms_bitset =
- grpc_channel_args_compression_algorithm_get_states(args->channel_args);
-
- channeld->default_compression_algorithm =
- grpc_channel_args_get_compression_algorithm(args->channel_args);
- /* Make sure the default isn't disabled. */
- if (!GPR_BITGET(channeld->enabled_algorithms_bitset,
- channeld->default_compression_algorithm)) {
- gpr_log(GPR_DEBUG,
- "compression algorithm %d not enabled: switching to none",
- channeld->default_compression_algorithm);
- channeld->default_compression_algorithm = GRPC_COMPRESS_NONE;
- }
-
- channeld->supported_compression_algorithms = 1; /* always support identity */
- for (grpc_compression_algorithm algo_idx = 1;
- algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
- /* skip disabled algorithms */
- if (!GPR_BITGET(channeld->enabled_algorithms_bitset, algo_idx)) {
- continue;
- }
- channeld->supported_compression_algorithms |= 1u << algo_idx;
- }
-
- GPR_ASSERT(!args->is_last);
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {}
-
-const grpc_channel_filter grpc_compress_filter = {
- compress_start_transport_stream_op_batch,
- grpc_channel_next_op,
- sizeof(call_data),
- init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- grpc_channel_next_get_info,
- "compress"};
diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h
deleted file mode 100644
index e4a2a829d5..0000000000
--- a/src/core/lib/channel/compress_filter.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H
-#define GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H
-
-#include <grpc/impl/codegen/compression_types.h>
-
-#include "src/core/lib/channel/channel_stack.h"
-
-extern int grpc_compression_trace;
-
-/** Compression filter for outgoing data.
- *
- * See <grpc/compression.h> for the available compression settings.
- *
- * Compression settings may come from:
- * - Channel configuration, as established at channel creation time.
- * - The metadata accompanying the outgoing data to be compressed. This is
- * taken as a request only. We may choose not to honor it. The metadata key
- * is given by \a GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY.
- *
- * Compression can be disabled for concrete messages (for instance in order to
- * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in
- * the BEGIN_MESSAGE flags.
- *
- * The attempted compression mechanism is added to the resulting initial
- * metadata under the'grpc-encoding' key.
- *
- * If compression is actually performed, BEGIN_MESSAGE's flag is modified to
- * incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the
- * aforementioned 'grpc-encoding' metadata value, data will pass through
- * uncompressed. */
-
-extern const grpc_channel_filter grpc_compress_filter;
-
-#endif /* GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H */
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
deleted file mode 100644
index fda099b021..0000000000
--- a/src/core/lib/channel/deadline_filter.c
+++ /dev/null
@@ -1,348 +0,0 @@
-//
-// Copyright 2016, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#include "src/core/lib/channel/deadline_filter.h"
-
-#include <stdbool.h>
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-
-#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/slice/slice_internal.h"
-
-//
-// grpc_deadline_state
-//
-
-// Timer callback.
-static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- grpc_call_element* elem = arg;
- grpc_deadline_state* deadline_state = elem->call_data;
- if (error != GRPC_ERROR_CANCELLED) {
- grpc_call_element_signal_error(
- exec_ctx, elem,
- grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Deadline Exceeded"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED));
- }
- GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
-}
-
-// Starts the deadline timer.
-static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- gpr_timespec deadline) {
- deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) {
- return;
- }
- grpc_deadline_state* deadline_state = elem->call_data;
- grpc_deadline_timer_state cur_state;
- grpc_closure* closure = NULL;
-retry:
- cur_state =
- (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
- switch (cur_state) {
- case GRPC_DEADLINE_STATE_PENDING:
- // Note: We do not start the timer if there is already a timer
- return;
- case GRPC_DEADLINE_STATE_FINISHED:
- if (gpr_atm_rel_cas(&deadline_state->timer_state,
- GRPC_DEADLINE_STATE_FINISHED,
- GRPC_DEADLINE_STATE_PENDING)) {
- // If we've already created and destroyed a timer, we always create a
- // new closure: we have no other guarantee that the inlined closure is
- // not in use (it may hold a pending call to timer_callback)
- closure = grpc_closure_create(timer_callback, elem,
- grpc_schedule_on_exec_ctx);
- } else {
- goto retry;
- }
- break;
- case GRPC_DEADLINE_STATE_INITIAL:
- if (gpr_atm_rel_cas(&deadline_state->timer_state,
- GRPC_DEADLINE_STATE_INITIAL,
- GRPC_DEADLINE_STATE_PENDING)) {
- closure =
- grpc_closure_init(&deadline_state->timer_callback, timer_callback,
- elem, grpc_schedule_on_exec_ctx);
- } else {
- goto retry;
- }
- break;
- }
- GPR_ASSERT(closure);
- GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
- grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
- gpr_now(GPR_CLOCK_MONOTONIC));
-}
-
-// Cancels the deadline timer.
-static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
- grpc_deadline_state* deadline_state) {
- if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
- GRPC_DEADLINE_STATE_FINISHED)) {
- grpc_timer_cancel(exec_ctx, &deadline_state->timer);
- } else {
- // timer was either in STATE_INITAL (nothing to cancel)
- // OR in STATE_FINISHED (again nothing to cancel)
- }
-}
-
-// Callback run when the call is complete.
-static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
- grpc_deadline_state* deadline_state = arg;
- cancel_timer_if_needed(exec_ctx, deadline_state);
- // Invoke the next callback.
- grpc_closure_run(exec_ctx, deadline_state->next_on_complete,
- GRPC_ERROR_REF(error));
-}
-
-// Inject our own on_complete callback into op.
-static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
- grpc_transport_stream_op_batch* op) {
- deadline_state->next_on_complete = op->on_complete;
- grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state,
- grpc_schedule_on_exec_ctx);
- op->on_complete = &deadline_state->on_complete;
-}
-
-void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_call_stack* call_stack) {
- grpc_deadline_state* deadline_state = elem->call_data;
- deadline_state->call_stack = call_stack;
-}
-
-void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem) {
- grpc_deadline_state* deadline_state = elem->call_data;
- cancel_timer_if_needed(exec_ctx, deadline_state);
-}
-
-// Callback and associated state for starting the timer after call stack
-// initialization has been completed.
-struct start_timer_after_init_state {
- grpc_call_element* elem;
- gpr_timespec deadline;
- grpc_closure closure;
-};
-static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- struct start_timer_after_init_state* state = arg;
- start_timer_if_needed(exec_ctx, state->elem, state->deadline);
- gpr_free(state);
-}
-
-void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- gpr_timespec deadline) {
- // Deadline will always be infinite on servers, so the timer will only be
- // set on clients with a finite deadline.
- deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
- // When the deadline passes, we indicate the failure by sending down
- // an op with cancel_error set. However, we can't send down any ops
- // until after the call stack is fully initialized. If we start the
- // timer here, we have no guarantee that the timer won't pop before
- // call stack initialization is finished. To avoid that problem, we
- // create a closure to start the timer, and we schedule that closure
- // to be run after call stack initialization is done.
- struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
- state->elem = elem;
- state->deadline = deadline;
- grpc_closure_init(&state->closure, start_timer_after_init, state,
- grpc_schedule_on_exec_ctx);
- grpc_closure_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE);
- }
-}
-
-void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- gpr_timespec new_deadline) {
- grpc_deadline_state* deadline_state = elem->call_data;
- cancel_timer_if_needed(exec_ctx, deadline_state);
- start_timer_if_needed(exec_ctx, elem, new_deadline);
-}
-
-void grpc_deadline_state_client_start_transport_stream_op_batch(
- grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op_batch* op) {
- grpc_deadline_state* deadline_state = elem->call_data;
- if (op->cancel_stream) {
- cancel_timer_if_needed(exec_ctx, deadline_state);
- } else {
- // Make sure we know when the call is complete, so that we can cancel
- // the timer.
- if (op->recv_trailing_metadata) {
- inject_on_complete_cb(deadline_state, op);
- }
- }
-}
-
-//
-// filter code
-//
-
-// Constructor for channel_data. Used for both client and server filters.
-static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
- grpc_channel_element_args* args) {
- GPR_ASSERT(!args->is_last);
- return GRPC_ERROR_NONE;
-}
-
-// Destructor for channel_data. Used for both client and server filters.
-static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem) {}
-
-// Call data used for both client and server filter.
-typedef struct base_call_data {
- grpc_deadline_state deadline_state;
-} base_call_data;
-
-// Additional call data used only for the server filter.
-typedef struct server_call_data {
- base_call_data base; // Must be first.
- // The closure for receiving initial metadata.
- grpc_closure recv_initial_metadata_ready;
- // Received initial metadata batch.
- grpc_metadata_batch* recv_initial_metadata;
- // The original recv_initial_metadata_ready closure, which we chain to
- // after our own closure is invoked.
- grpc_closure* next_recv_initial_metadata_ready;
-} server_call_data;
-
-// Constructor for call_data. Used for both client and server filters.
-static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- const grpc_call_element_args* args) {
- grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
- grpc_deadline_state_start(exec_ctx, elem, args->deadline);
- return GRPC_ERROR_NONE;
-}
-
-// Destructor for call_data. Used for both client and server filters.
-static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- const grpc_call_final_info* final_info,
- grpc_closure* ignored) {
- grpc_deadline_state_destroy(exec_ctx, elem);
-}
-
-// Method for starting a call op for client filter.
-static void client_start_transport_stream_op_batch(
- grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op_batch* op) {
- grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem,
- op);
- // Chain to next filter.
- grpc_call_next_op(exec_ctx, elem, op);
-}
-
-// Callback for receiving initial metadata on the server.
-static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- grpc_call_element* elem = arg;
- server_call_data* calld = elem->call_data;
- // Get deadline from metadata and start the timer if needed.
- start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline);
- // Invoke the next callback.
- calld->next_recv_initial_metadata_ready->cb(
- exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error);
-}
-
-// Method for starting a call op for server filter.
-static void server_start_transport_stream_op_batch(
- grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op_batch* op) {
- server_call_data* calld = elem->call_data;
- if (op->cancel_stream) {
- cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
- } else {
- // If we're receiving initial metadata, we need to get the deadline
- // from the recv_initial_metadata_ready callback. So we inject our
- // own callback into that hook.
- if (op->recv_initial_metadata) {
- calld->next_recv_initial_metadata_ready =
- op->payload->recv_initial_metadata.recv_initial_metadata_ready;
- calld->recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata;
- grpc_closure_init(&calld->recv_initial_metadata_ready,
- recv_initial_metadata_ready, elem,
- grpc_schedule_on_exec_ctx);
- op->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->recv_initial_metadata_ready;
- }
- // Make sure we know when the call is complete, so that we can cancel
- // the timer.
- // Note that we trigger this on recv_trailing_metadata, even though
- // the client never sends trailing metadata, because this is the
- // hook that tells us when the call is complete on the server side.
- if (op->recv_trailing_metadata) {
- inject_on_complete_cb(&calld->base.deadline_state, op);
- }
- }
- // Chain to next filter.
- grpc_call_next_op(exec_ctx, elem, op);
-}
-
-const grpc_channel_filter grpc_client_deadline_filter = {
- client_start_transport_stream_op_batch,
- grpc_channel_next_op,
- sizeof(base_call_data),
- init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- destroy_call_elem,
- 0, // sizeof(channel_data)
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- grpc_channel_next_get_info,
- "deadline",
-};
-
-const grpc_channel_filter grpc_server_deadline_filter = {
- server_start_transport_stream_op_batch,
- grpc_channel_next_op,
- sizeof(server_call_data),
- init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- destroy_call_elem,
- 0, // sizeof(channel_data)
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- grpc_channel_next_get_info,
- "deadline",
-};
diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h
deleted file mode 100644
index d8db9a9f97..0000000000
--- a/src/core/lib/channel/deadline_filter.h
+++ /dev/null
@@ -1,102 +0,0 @@
-//
-// Copyright 2016, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#ifndef GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
-#define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
-
-#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/iomgr/timer.h"
-
-typedef enum grpc_deadline_timer_state {
- GRPC_DEADLINE_STATE_INITIAL,
- GRPC_DEADLINE_STATE_PENDING,
- GRPC_DEADLINE_STATE_FINISHED
-} grpc_deadline_timer_state;
-
-// State used for filters that enforce call deadlines.
-// Must be the first field in the filter's call_data.
-typedef struct grpc_deadline_state {
- // We take a reference to the call stack for the timer callback.
- grpc_call_stack* call_stack;
- gpr_atm timer_state;
- grpc_timer timer;
- grpc_closure timer_callback;
- // Closure to invoke when the call is complete.
- // We use this to cancel the timer.
- grpc_closure on_complete;
- // The original on_complete closure, which we chain to after our own
- // closure is invoked.
- grpc_closure* next_on_complete;
-} grpc_deadline_state;
-
-//
-// NOTE: All of these functions require that the first field in
-// elem->call_data is a grpc_deadline_state.
-//
-
-// assumes elem->call_data is zero'd
-void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_call_stack* call_stack);
-void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem);
-
-// Starts the timer with the specified deadline.
-// Should be called from the filter's init_call_elem() method.
-void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- gpr_timespec deadline);
-
-// Cancels the existing timer and starts a new one with new_deadline.
-//
-// Note: It is generally safe to call this with an earlier deadline
-// value than the current one, but not the reverse. No checks are done
-// to ensure that the timer callback is not invoked while it is in the
-// process of being reset, which means that attempting to increase the
-// deadline may result in the timer being called twice.
-void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- gpr_timespec new_deadline);
-
-// To be called from the client-side filter's start_transport_stream_op_batch()
-// method. Ensures that the deadline timer is cancelled when the call
-// is completed.
-//
-// Note: It is the caller's responsibility to chain to the next filter if
-// necessary after this function returns.
-void grpc_deadline_state_client_start_transport_stream_op_batch(
- grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op_batch* op);
-
-// Deadline filters for direct client channels and server channels.
-// Note: Deadlines for non-direct client channels are handled by the
-// client_channel filter.
-extern const grpc_channel_filter grpc_client_deadline_filter;
-extern const grpc_channel_filter grpc_server_deadline_filter;
-
-#endif /* GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H */
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
deleted file mode 100644
index 2affe1762d..0000000000
--- a/src/core/lib/channel/http_client_filter.c
+++ /dev/null
@@ -1,613 +0,0 @@
-/*
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/channel/http_client_filter.h"
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <string.h>
-#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/slice/b64.h"
-#include "src/core/lib/slice/percent_encoding.h"
-#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/support/string.h"
-#include "src/core/lib/transport/static_metadata.h"
-#include "src/core/lib/transport/transport_impl.h"
-
-#define EXPECTED_CONTENT_TYPE "application/grpc"
-#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
-
-/* default maximum size of payload eligable for GET request */
-static const size_t kMaxPayloadSizeForGet = 2048;
-
-typedef struct call_data {
- grpc_linked_mdelem method;
- grpc_linked_mdelem scheme;
- grpc_linked_mdelem authority;
- grpc_linked_mdelem te_trailers;
- grpc_linked_mdelem content_type;
- grpc_linked_mdelem user_agent;
-
- grpc_metadata_batch *recv_initial_metadata;
- grpc_metadata_batch *recv_trailing_metadata;
- uint8_t *payload_bytes;
-
- /* Vars to read data off of send_message */
- grpc_transport_stream_op_batch *send_op;
- uint32_t send_length;
- uint32_t send_flags;
- grpc_slice incoming_slice;
- grpc_slice_buffer_stream replacement_stream;
- grpc_slice_buffer slices;
- /* flag that indicates that all slices of send_messages aren't availble */
- bool send_message_blocked;
-
- /** Closure to call when finished with the hc_on_recv hook */
- grpc_closure *on_done_recv_initial_metadata;
- grpc_closure *on_done_recv_trailing_metadata;
- grpc_closure *on_complete;
- grpc_closure *post_send;
-
- /** Receive closures are chained: we inject this closure as the on_done_recv
- up-call on transport_op, and remember to call our on_done_recv member
- after handling it. */
- grpc_closure hc_on_recv_initial_metadata;
- grpc_closure hc_on_recv_trailing_metadata;
- grpc_closure hc_on_complete;
- grpc_closure got_slice;
- grpc_closure send_done;
-} call_data;
-
-typedef struct channel_data {
- grpc_mdelem static_scheme;
- grpc_mdelem user_agent;
- size_t max_payload_size_for_get;
-} channel_data;
-
-static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_metadata_batch *b) {
- if (b->idx.named.status != NULL) {
- if (grpc_mdelem_eq(b->idx.named.status->md, GRPC_MDELEM_STATUS_200)) {
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.status);
- } else {
- char *val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.status->md),
- GPR_DUMP_ASCII);
- char *msg;
- gpr_asprintf(&msg, "Received http2 header with status: %s", val);
- grpc_error *e = grpc_error_set_str(
- grpc_error_set_int(
- grpc_error_set_str(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Received http2 :status header with non-200 OK status"),
- GRPC_ERROR_STR_VALUE, grpc_slice_from_copied_string(val)),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED),
- GRPC_ERROR_STR_GRPC_MESSAGE, grpc_slice_from_copied_string(msg));
- gpr_free(val);
- gpr_free(msg);
- return e;
- }
- }
-
- if (b->idx.named.grpc_message != NULL) {
- grpc_slice pct_decoded_msg = grpc_permissive_percent_decode_slice(
- GRPC_MDVALUE(b->idx.named.grpc_message->md));
- if (grpc_slice_is_equivalent(pct_decoded_msg,
- GRPC_MDVALUE(b->idx.named.grpc_message->md))) {
- grpc_slice_unref_internal(exec_ctx, pct_decoded_msg);
- } else {
- grpc_metadata_batch_set_value(exec_ctx, b->idx.named.grpc_message,
- pct_decoded_msg);
- }
- }
-
- if (b->idx.named.content_type != NULL) {
- if (!grpc_mdelem_eq(b->idx.named.content_type->md,
- GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)) {
- if (grpc_slice_buf_start_eq(GRPC_MDVALUE(b->idx.named.content_type->md),
- EXPECTED_CONTENT_TYPE,
- EXPECTED_CONTENT_TYPE_LENGTH) &&
- (GRPC_SLICE_START_PTR(GRPC_MDVALUE(
- b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] ==
- '+' ||
- GRPC_SLICE_START_PTR(GRPC_MDVALUE(
- b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] ==
- ';')) {
- /* Although the C implementation doesn't (currently) generate them,
- any custom +-suffix is explicitly valid. */
- /* TODO(klempner): We should consider preallocating common values such
- as +proto or +json, or at least stashing them if we see them. */
- /* TODO(klempner): Should we be surfacing this to application code? */
- } else {
- /* TODO(klempner): We're currently allowing this, but we shouldn't
- see it without a proxy so log for now. */
- char *val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.content_type->md),
- GPR_DUMP_ASCII);
- gpr_log(GPR_INFO, "Unexpected content-type '%s'", val);
- gpr_free(val);
- }
- }
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_type);
- }
-
- return GRPC_ERROR_NONE;
-}
-
-static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
- void *user_data, grpc_error *error) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- if (error == GRPC_ERROR_NONE) {
- error = client_filter_incoming_metadata(exec_ctx, elem,
- calld->recv_initial_metadata);
- } else {
- GRPC_ERROR_REF(error);
- }
- grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, error);
-}
-
-static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
- void *user_data, grpc_error *error) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- if (error == GRPC_ERROR_NONE) {
- error = client_filter_incoming_metadata(exec_ctx, elem,
- calld->recv_trailing_metadata);
- } else {
- GRPC_ERROR_REF(error);
- }
- grpc_closure_run(exec_ctx, calld->on_done_recv_trailing_metadata, error);
-}
-
-static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *error) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- if (calld->payload_bytes) {
- gpr_free(calld->payload_bytes);
- calld->payload_bytes = NULL;
- }
- calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error);
-}
-
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
- calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error);
-}
-
-static void remove_if_present(grpc_exec_ctx *exec_ctx,
- grpc_metadata_batch *batch,
- grpc_metadata_batch_callouts_index idx) {
- if (batch->idx.array[idx] != NULL) {
- grpc_metadata_batch_remove(exec_ctx, batch, batch->idx.array[idx]);
- }
-}
-
-static void continue_send_message(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- call_data *calld = elem->call_data;
- uint8_t *wrptr = calld->payload_bytes;
- while (grpc_byte_stream_next(
- exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
- &calld->got_slice)) {
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice);
- if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) {
- memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
- GRPC_SLICE_LENGTH(calld->incoming_slice));
- }
- wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- calld->send_message_blocked = false;
- break;
- }
- }
-}
-
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
- grpc_call_element *elem = elemp;
- call_data *calld = elem->call_data;
- calld->send_message_blocked = false;
- if (GRPC_ERROR_NONE !=
- grpc_byte_stream_pull(exec_ctx,
- calld->send_op->payload->send_message.send_message,
- &calld->incoming_slice)) {
- /* Should never reach here */
- abort();
- }
- grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
- if (calld->send_length == calld->slices.length) {
- /* Pass down the original send_message op that was blocked.*/
- grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
- calld->send_flags);
- calld->send_op->payload->send_message.send_message =
- &calld->replacement_stream.base;
- calld->post_send = calld->send_op->on_complete;
- calld->send_op->on_complete = &calld->send_done;
- grpc_call_next_op(exec_ctx, elem, calld->send_op);
- } else {
- continue_send_message(exec_ctx, elem);
- }
-}
-
-static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- channel_data *channeld = elem->channel_data;
- grpc_error *error;
-
- if (op->send_initial_metadata) {
- /* Decide which HTTP VERB to use. We use GET if the request is marked
- cacheable, and the operation contains both initial metadata and send
- message, and the payload is below the size threshold, and all the data
- for this request is immediately available. */
- grpc_mdelem method = GRPC_MDELEM_METHOD_POST;
- if (op->send_message &&
- (op->payload->send_initial_metadata.send_initial_metadata_flags &
- GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
- op->payload->send_message.send_message->length <
- channeld->max_payload_size_for_get) {
- method = GRPC_MDELEM_METHOD_GET;
- /* The following write to calld->send_message_blocked isn't racy with
- reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because
- being here means ops->send_message is not NULL, which is primarily
- guarding the read there. */
- calld->send_message_blocked = true;
- } else if (op->payload->send_initial_metadata.send_initial_metadata_flags &
- GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
- method = GRPC_MDELEM_METHOD_PUT;
- }
-
- /* Attempt to read the data from send_message and create a header field. */
- if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) {
- /* allocate memory to hold the entire payload */
- calld->payload_bytes =
- gpr_malloc(op->payload->send_message.send_message->length);
-
- /* read slices of send_message and copy into payload_bytes */
- calld->send_op = op;
- calld->send_length = op->payload->send_message.send_message->length;
- calld->send_flags = op->payload->send_message.send_message->flags;
- continue_send_message(exec_ctx, elem);
-
- if (calld->send_message_blocked == false) {
- /* when all the send_message data is available, then modify the path
- * MDELEM by appending base64 encoded query to the path */
- const int k_url_safe = 1;
- const int k_multi_line = 0;
- const unsigned char k_query_separator = '?';
-
- grpc_slice path_slice =
- GRPC_MDVALUE(op->payload->send_initial_metadata
- .send_initial_metadata->idx.named.path->md);
- /* sum up individual component's lengths and allocate enough memory to
- * hold combined path+query */
- size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
- estimated_len++; /* for the '?' */
- estimated_len += grpc_base64_estimate_encoded_size(
- op->payload->send_message.send_message->length, k_url_safe,
- k_multi_line);
- estimated_len += 1; /* for the trailing 0 */
- grpc_slice path_with_query_slice = grpc_slice_malloc(estimated_len);
-
- /* memcopy individual pieces into this slice */
- uint8_t *write_ptr =
- (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice);
- uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice);
- memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
- write_ptr += GRPC_SLICE_LENGTH(path_slice);
-
- *write_ptr = k_query_separator;
- write_ptr++; /* for the '?' */
-
- grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes,
- op->payload->send_message.send_message->length,
- k_url_safe, k_multi_line);
-
- /* remove trailing unused memory and add trailing 0 to terminate string
- */
- char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
- /* safe to use strlen since base64_encode will always add '\0' */
- size_t path_length = strlen(t) + 1;
- *(t + path_length) = '\0';
- path_with_query_slice =
- grpc_slice_sub(path_with_query_slice, 0, path_length);
-
- /* substitute previous path with the new path+query */
- grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
- grpc_metadata_batch *b =
- op->payload->send_initial_metadata.send_initial_metadata;
- error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
- mdelem_path_and_query);
- if (error != GRPC_ERROR_NONE) return error;
-
- calld->on_complete = op->on_complete;
- op->on_complete = &calld->hc_on_complete;
- op->send_message = false;
- grpc_slice_unref_internal(exec_ctx, path_with_query_slice);
- } else {
- /* Not all data is available. Fall back to POST. */
- gpr_log(GPR_DEBUG,
- "Request is marked Cacheable but not all data is available.\
- Falling back to POST");
- method = GRPC_MDELEM_METHOD_POST;
- }
- }
-
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_METHOD);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_SCHEME);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_TE);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_CONTENT_TYPE);
- remove_if_present(exec_ctx,
- op->payload->send_initial_metadata.send_initial_metadata,
- GRPC_BATCH_USER_AGENT);
-
- /* Send : prefixed headers, which have to be before any application
- layer headers. */
- error = grpc_metadata_batch_add_head(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
- &calld->method, method);
- if (error != GRPC_ERROR_NONE) return error;
- error = grpc_metadata_batch_add_head(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
- &calld->scheme, channeld->static_scheme);
- if (error != GRPC_ERROR_NONE) return error;
- error = grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
- &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS);
- if (error != GRPC_ERROR_NONE) return error;
- error = grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
- &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
- if (error != GRPC_ERROR_NONE) return error;
- error = grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
- &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent));
- if (error != GRPC_ERROR_NONE) return error;
- }
-
- if (op->recv_initial_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata;
- calld->on_done_recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata_ready;
- op->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->hc_on_recv_initial_metadata;
- }
-
- if (op->recv_trailing_metadata) {
- /* substitute our callback for the higher callback */
- calld->recv_trailing_metadata =
- op->payload->recv_trailing_metadata.recv_trailing_metadata;
- calld->on_done_recv_trailing_metadata = op->on_complete;
- op->on_complete = &calld->hc_on_recv_trailing_metadata;
- }
-
- return GRPC_ERROR_NONE;
-}
-
-static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- GPR_TIMER_BEGIN("hc_start_transport_op", 0);
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- grpc_error *error = hc_mutate_op(exec_ctx, elem, op);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- } else {
- call_data *calld = elem->call_data;
- if (op->send_message && calld->send_message_blocked) {
- /* Don't forward the op. send_message contains slices that aren't ready
- yet. The call will be forwarded by the op_complete of slice read call.
- */
- } else {
- grpc_call_next_op(exec_ctx, elem, op);
- }
- }
- GPR_TIMER_END("hc_start_transport_op", 0);
-}
-
-/* Constructor for call_data */
-static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- const grpc_call_element_args *args) {
- call_data *calld = elem->call_data;
- calld->on_done_recv_initial_metadata = NULL;
- calld->on_done_recv_trailing_metadata = NULL;
- calld->on_complete = NULL;
- calld->payload_bytes = NULL;
- calld->send_message_blocked = false;
- grpc_slice_buffer_init(&calld->slices);
- grpc_closure_init(&calld->hc_on_recv_initial_metadata,
- hc_on_recv_initial_metadata, elem,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&calld->hc_on_recv_trailing_metadata,
- hc_on_recv_trailing_metadata, elem,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&calld->hc_on_complete, hc_on_complete, elem,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&calld->got_slice, got_slice, elem,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&calld->send_done, send_done, elem,
- grpc_schedule_on_exec_ctx);
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_final_info *final_info,
- grpc_closure *ignored) {
- call_data *calld = elem->call_data;
- grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
-}
-
-static grpc_mdelem scheme_from_args(const grpc_channel_args *args) {
- unsigned i;
- size_t j;
- grpc_mdelem valid_schemes[] = {GRPC_MDELEM_SCHEME_HTTP,
- GRPC_MDELEM_SCHEME_HTTPS};
- if (args != NULL) {
- for (i = 0; i < args->num_args; ++i) {
- if (args->args[i].type == GRPC_ARG_STRING &&
- strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) {
- for (j = 0; j < GPR_ARRAY_SIZE(valid_schemes); j++) {
- if (0 == grpc_slice_str_cmp(GRPC_MDVALUE(valid_schemes[j]),
- args->args[i].value.string)) {
- return valid_schemes[j];
- }
- }
- }
- }
- }
- return GRPC_MDELEM_SCHEME_HTTP;
-}
-
-static size_t max_payload_size_from_args(const grpc_channel_args *args) {
- if (args != NULL) {
- for (size_t i = 0; i < args->num_args; ++i) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET)) {
- if (args->args[i].type != GRPC_ARG_INTEGER) {
- gpr_log(GPR_ERROR, "%s: must be an integer",
- GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET);
- } else {
- return (size_t)args->args[i].value.integer;
- }
- }
- }
- }
- return kMaxPayloadSizeForGet;
-}
-
-static grpc_slice user_agent_from_args(const grpc_channel_args *args,
- const char *transport_name) {
- gpr_strvec v;
- size_t i;
- int is_first = 1;
- char *tmp;
- grpc_slice result;
-
- gpr_strvec_init(&v);
-
- for (i = 0; args && i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) {
- if (args->args[i].type != GRPC_ARG_STRING) {
- gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
- GRPC_ARG_PRIMARY_USER_AGENT_STRING);
- } else {
- if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
- is_first = 0;
- gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
- }
- }
- }
-
- gpr_asprintf(&tmp, "%sgrpc-c/%s (%s; %s; %s)", is_first ? "" : " ",
- grpc_version_string(), GPR_PLATFORM_STRING, transport_name,
- grpc_g_stands_for());
- is_first = 0;
- gpr_strvec_add(&v, tmp);
-
- for (i = 0; args && i < args->num_args; i++) {
- if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) {
- if (args->args[i].type != GRPC_ARG_STRING) {
- gpr_log(GPR_ERROR, "Channel argument '%s' should be a string",
- GRPC_ARG_SECONDARY_USER_AGENT_STRING);
- } else {
- if (!is_first) gpr_strvec_add(&v, gpr_strdup(" "));
- is_first = 0;
- gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string));
- }
- }
- }
-
- tmp = gpr_strvec_flatten(&v, NULL);
- gpr_strvec_destroy(&v);
- result = grpc_slice_intern(grpc_slice_from_static_string(tmp));
- gpr_free(tmp);
-
- return result;
-}
-
-/* Constructor for channel_data */
-static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_channel_element_args *args) {
- channel_data *chand = elem->channel_data;
- GPR_ASSERT(!args->is_last);
- GPR_ASSERT(args->optional_transport != NULL);
- chand->static_scheme = scheme_from_args(args->channel_args);
- chand->max_payload_size_for_get =
- max_payload_size_from_args(args->channel_args);
- chand->user_agent = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_USER_AGENT,
- user_agent_from_args(args->channel_args,
- args->optional_transport->vtable->name));
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {
- channel_data *chand = elem->channel_data;
- GRPC_MDELEM_UNREF(exec_ctx, chand->user_agent);
-}
-
-const grpc_channel_filter grpc_http_client_filter = {
- hc_start_transport_op,
- grpc_channel_next_op,
- sizeof(call_data),
- init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- grpc_channel_next_get_info,
- "http-client"};
diff --git a/src/core/lib/channel/http_client_filter.h b/src/core/lib/channel/http_client_filter.h
deleted file mode 100644
index 9e6e106e9c..0000000000
--- a/src/core/lib/channel/http_client_filter.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H
-#define GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H
-
-#include "src/core/lib/channel/channel_stack.h"
-
-/* Processes metadata on the client side for HTTP2 transports */
-extern const grpc_channel_filter grpc_http_client_filter;
-
-/* Channel arg to override the http2 :scheme header */
-#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme"
-
-/* Channel arg to determine maximum size of payload eligable for GET request */
-#define GRPC_ARG_MAX_PAYLOAD_SIZE_FOR_GET "grpc.max_payload_size_for_get"
-
-#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H */
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
deleted file mode 100644
index c1e49ffacc..0000000000
--- a/src/core/lib/channel/http_server_filter.c
+++ /dev/null
@@ -1,445 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/lib/channel/http_server_filter.h"
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <string.h>
-#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/slice/b64.h"
-#include "src/core/lib/slice/percent_encoding.h"
-#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/slice/slice_string_helpers.h"
-#include "src/core/lib/transport/static_metadata.h"
-
-#define EXPECTED_CONTENT_TYPE "application/grpc"
-#define EXPECTED_CONTENT_TYPE_LENGTH sizeof(EXPECTED_CONTENT_TYPE) - 1
-
-extern int grpc_http_trace;
-
-typedef struct call_data {
- grpc_linked_mdelem status;
- grpc_linked_mdelem content_type;
-
- /* did this request come with path query containing request payload */
- bool seen_path_with_query;
- /* flag to ensure payload_bin is delivered only once */
- bool payload_bin_delivered;
-
- grpc_metadata_batch *recv_initial_metadata;
- uint32_t *recv_initial_metadata_flags;
- /** Closure to call when finished with the hs_on_recv hook */
- grpc_closure *on_done_recv;
- /** Closure to call when we retrieve read message from the path URI
- */
- grpc_closure *recv_message_ready;
- grpc_closure *on_complete;
- grpc_byte_stream **pp_recv_message;
- grpc_slice_buffer read_slice_buffer;
- grpc_slice_buffer_stream read_stream;
-
- /** Receive closures are chained: we inject this closure as the on_done_recv
- up-call on transport_op, and remember to call our on_done_recv member
- after handling it. */
- grpc_closure hs_on_recv;
- grpc_closure hs_on_complete;
- grpc_closure hs_recv_message_ready;
-} call_data;
-
-typedef struct channel_data { uint8_t unused; } channel_data;
-
-static grpc_error *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_metadata_batch *b) {
- if (b->idx.named.grpc_message != NULL) {
- grpc_slice pct_encoded_msg = grpc_percent_encode_slice(
- GRPC_MDVALUE(b->idx.named.grpc_message->md),
- grpc_compatible_percent_encoding_unreserved_bytes);
- if (grpc_slice_is_equivalent(pct_encoded_msg,
- GRPC_MDVALUE(b->idx.named.grpc_message->md))) {
- grpc_slice_unref_internal(exec_ctx, pct_encoded_msg);
- } else {
- grpc_metadata_batch_set_value(exec_ctx, b->idx.named.grpc_message,
- pct_encoded_msg);
- }
- }
- return GRPC_ERROR_NONE;
-}
-
-static void add_error(const char *error_name, grpc_error **cumulative,
- grpc_error *new) {
- if (new == GRPC_ERROR_NONE) return;
- if (*cumulative == GRPC_ERROR_NONE) {
- *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name);
- }
- *cumulative = grpc_error_add_child(*cumulative, new);
-}
-
-static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_metadata_batch *b) {
- call_data *calld = elem->call_data;
- grpc_error *error = GRPC_ERROR_NONE;
- static const char *error_name = "Failed processing incoming headers";
-
- if (b->idx.named.method != NULL) {
- if (grpc_mdelem_eq(b->idx.named.method->md, GRPC_MDELEM_METHOD_POST)) {
- *calld->recv_initial_metadata_flags &=
- ~(GRPC_INITIAL_METADATA_CACHEABLE_REQUEST |
- GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST);
- } else if (grpc_mdelem_eq(b->idx.named.method->md,
- GRPC_MDELEM_METHOD_PUT)) {
- *calld->recv_initial_metadata_flags &=
- ~GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
- *calld->recv_initial_metadata_flags |=
- GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
- } else if (grpc_mdelem_eq(b->idx.named.method->md,
- GRPC_MDELEM_METHOD_GET)) {
- *calld->recv_initial_metadata_flags |=
- GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
- *calld->recv_initial_metadata_flags &=
- ~GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
- } else {
- add_error(error_name, &error,
- grpc_attach_md_to_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"),
- b->idx.named.method->md));
- }
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.method);
- } else {
- add_error(
- error_name, &error,
- grpc_error_set_str(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
- GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":method")));
- }
-
- if (b->idx.named.te != NULL) {
- if (!grpc_mdelem_eq(b->idx.named.te->md, GRPC_MDELEM_TE_TRAILERS)) {
- add_error(error_name, &error,
- grpc_attach_md_to_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"),
- b->idx.named.te->md));
- }
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.te);
- } else {
- add_error(error_name, &error,
- grpc_error_set_str(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
- GRPC_ERROR_STR_KEY, grpc_slice_from_static_string("te")));
- }
-
- if (b->idx.named.scheme != NULL) {
- if (!grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTP) &&
- !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_HTTPS) &&
- !grpc_mdelem_eq(b->idx.named.scheme->md, GRPC_MDELEM_SCHEME_GRPC)) {
- add_error(error_name, &error,
- grpc_attach_md_to_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Bad header"),
- b->idx.named.scheme->md));
- }
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.scheme);
- } else {
- add_error(
- error_name, &error,
- grpc_error_set_str(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
- GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":scheme")));
- }
-
- if (b->idx.named.content_type != NULL) {
- if (!grpc_mdelem_eq(b->idx.named.content_type->md,
- GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC)) {
- if (grpc_slice_buf_start_eq(GRPC_MDVALUE(b->idx.named.content_type->md),
- EXPECTED_CONTENT_TYPE,
- EXPECTED_CONTENT_TYPE_LENGTH) &&
- (GRPC_SLICE_START_PTR(GRPC_MDVALUE(
- b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] ==
- '+' ||
- GRPC_SLICE_START_PTR(GRPC_MDVALUE(
- b->idx.named.content_type->md))[EXPECTED_CONTENT_TYPE_LENGTH] ==
- ';')) {
- /* Although the C implementation doesn't (currently) generate them,
- any custom +-suffix is explicitly valid. */
- /* TODO(klempner): We should consider preallocating common values such
- as +proto or +json, or at least stashing them if we see them. */
- /* TODO(klempner): Should we be surfacing this to application code? */
- } else {
- /* TODO(klempner): We're currently allowing this, but we shouldn't
- see it without a proxy so log for now. */
- char *val = grpc_dump_slice(GRPC_MDVALUE(b->idx.named.content_type->md),
- GPR_DUMP_ASCII);
- gpr_log(GPR_INFO, "Unexpected content-type '%s'", val);
- gpr_free(val);
- }
- }
- grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_type);
- }
-
- if (b->idx.named.path == NULL) {
- add_error(error_name, &error,
- grpc_error_set_str(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
- GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":path")));
- } else if (*calld->recv_initial_metadata_flags &
- GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
- /* We have a cacheable request made with GET verb. The path contains the
- * query parameter which is base64 encoded request payload. */
- const char k_query_separator = '?';
- grpc_slice path_slice = GRPC_MDVALUE(b->idx.named.path->md);
- uint8_t *path_ptr = (uint8_t *)GRPC_SLICE_START_PTR(path_slice);
- size_t path_length = GRPC_SLICE_LENGTH(path_slice);
- /* offset of the character '?' */
- size_t offset = 0;
- for (offset = 0; offset < path_length && *path_ptr != k_query_separator;
- path_ptr++, offset++)
- ;
- if (offset < path_length) {
- grpc_slice query_slice =
- grpc_slice_sub(path_slice, offset + 1, path_length);
-
- /* substitute path metadata with just the path (not query) */
- grpc_mdelem mdelem_path_without_query = grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_PATH, grpc_slice_sub(path_slice, 0, offset));
-
- grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
- mdelem_path_without_query);
-
- /* decode payload from query and add to the slice buffer to be returned */
- const int k_url_safe = 1;
- grpc_slice_buffer_add(
- &calld->read_slice_buffer,
- grpc_base64_decode(exec_ctx,
- (const char *)GRPC_SLICE_START_PTR(query_slice),
- k_url_safe));
- grpc_slice_buffer_stream_init(&calld->read_stream,
- &calld->read_slice_buffer, 0);
- calld->seen_path_with_query = true;
- grpc_slice_unref_internal(exec_ctx, query_slice);
- } else {
- gpr_log(GPR_ERROR, "GET request without QUERY");
- }
- }
-
- if (b->idx.named.host != NULL && b->idx.named.authority == NULL) {
- grpc_linked_mdelem *el = b->idx.named.host;
- grpc_mdelem md = GRPC_MDELEM_REF(el->md);
- grpc_metadata_batch_remove(exec_ctx, b, el);
- add_error(
- error_name, &error,
- grpc_metadata_batch_add_head(
- exec_ctx, b, el, grpc_mdelem_from_slices(
- exec_ctx, GRPC_MDSTR_AUTHORITY,
- grpc_slice_ref_internal(GRPC_MDVALUE(md)))));
- GRPC_MDELEM_UNREF(exec_ctx, md);
- }
-
- if (b->idx.named.authority == NULL) {
- add_error(
- error_name, &error,
- grpc_error_set_str(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing header"),
- GRPC_ERROR_STR_KEY, grpc_slice_from_static_string(":authority")));
- }
-
- return error;
-}
-
-static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *err) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- if (err == GRPC_ERROR_NONE) {
- err = server_filter_incoming_metadata(exec_ctx, elem,
- calld->recv_initial_metadata);
- } else {
- GRPC_ERROR_REF(err);
- }
- grpc_closure_run(exec_ctx, calld->on_done_recv, err);
-}
-
-static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *err) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- /* Call recv_message_ready if we got the payload via the path field */
- if (calld->seen_path_with_query && calld->recv_message_ready != NULL) {
- *calld->pp_recv_message = calld->payload_bin_delivered
- ? NULL
- : (grpc_byte_stream *)&calld->read_stream;
- grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
- calld->recv_message_ready = NULL;
- calld->payload_bin_delivered = true;
- }
- grpc_closure_run(exec_ctx, calld->on_complete, GRPC_ERROR_REF(err));
-}
-
-static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *err) {
- grpc_call_element *elem = user_data;
- call_data *calld = elem->call_data;
- if (calld->seen_path_with_query) {
- /* do nothing. This is probably a GET request, and payload will be returned
- in hs_on_complete callback. */
- } else {
- grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
- }
-}
-
-static void hs_mutate_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
-
- if (op->send_initial_metadata) {
- grpc_error *error = GRPC_ERROR_NONE;
- static const char *error_name = "Failed sending initial metadata";
- add_error(
- error_name, &error,
- grpc_metadata_batch_add_head(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
- &calld->status, GRPC_MDELEM_STATUS_200));
- add_error(
- error_name, &error,
- grpc_metadata_batch_add_tail(
- exec_ctx, op->payload->send_initial_metadata.send_initial_metadata,
- &calld->content_type,
- GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC));
- add_error(error_name, &error,
- server_filter_outgoing_metadata(
- exec_ctx, elem,
- op->payload->send_initial_metadata.send_initial_metadata));
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return;
- }
- }
-
- if (op->recv_initial_metadata) {
- /* substitute our callback for the higher callback */
- GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags != NULL);
- calld->recv_initial_metadata =
- op->payload->recv_initial_metadata.recv_initial_metadata;
- calld->recv_initial_metadata_flags =
- op->payload->recv_initial_metadata.recv_flags;
- calld->on_done_recv =
- op->payload->recv_initial_metadata.recv_initial_metadata_ready;
- op->payload->recv_initial_metadata.recv_initial_metadata_ready =
- &calld->hs_on_recv;
- }
-
- if (op->recv_message) {
- calld->recv_message_ready = op->payload->recv_message.recv_message_ready;
- calld->pp_recv_message = op->payload->recv_message.recv_message;
- if (op->payload->recv_message.recv_message_ready) {
- op->payload->recv_message.recv_message_ready =
- &calld->hs_recv_message_ready;
- }
- if (op->on_complete) {
- calld->on_complete = op->on_complete;
- op->on_complete = &calld->hs_on_complete;
- }
- }
-
- if (op->send_trailing_metadata) {
- grpc_error *error = server_filter_outgoing_metadata(
- exec_ctx, elem,
- op->payload->send_trailing_metadata.send_trailing_metadata);
- if (error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
- return;
- }
- }
-}
-
-static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
- GPR_TIMER_BEGIN("hs_start_transport_op", 0);
- hs_mutate_op(exec_ctx, elem, op);
- grpc_call_next_op(exec_ctx, elem, op);
- GPR_TIMER_END("hs_start_transport_op", 0);
-}
-
-/* Constructor for call_data */
-static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- const grpc_call_element_args *args) {
- /* grab pointers to our data from the call element */
- call_data *calld = elem->call_data;
- /* initialize members */
- grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem,
- grpc_schedule_on_exec_ctx);
- grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem,
- grpc_schedule_on_exec_ctx);
- grpc_slice_buffer_init(&calld->read_slice_buffer);
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_final_info *final_info,
- grpc_closure *ignored) {
- call_data *calld = elem->call_data;
- grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer);
-}
-
-/* Constructor for channel_data */
-static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_channel_element_args *args) {
- GPR_ASSERT(!args->is_last);
- return GRPC_ERROR_NONE;
-}
-
-/* Destructor for channel data */
-static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem) {}
-
-const grpc_channel_filter grpc_http_server_filter = {
- hs_start_transport_op,
- grpc_channel_next_op,
- sizeof(call_data),
- init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- grpc_channel_next_get_info,
- "http-server"};
diff --git a/src/core/lib/channel/http_server_filter.h b/src/core/lib/channel/http_server_filter.h
deleted file mode 100644
index 77ba2d263d..0000000000
--- a/src/core/lib/channel/http_server_filter.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H
-#define GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H
-
-#include "src/core/lib/channel/channel_stack.h"
-
-/* Processes metadata on the client side for HTTP2 transports */
-extern const grpc_channel_filter grpc_http_server_filter;
-
-#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H */
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
deleted file mode 100644
index c80b48ee13..0000000000
--- a/src/core/lib/channel/message_size_filter.c
+++ /dev/null
@@ -1,270 +0,0 @@
-//
-// Copyright 2016, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#include "src/core/lib/channel/message_size_filter.h"
-
-#include <limits.h>
-#include <string.h>
-
-#include <grpc/impl/codegen/grpc_types.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-
-#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/support/string.h"
-#include "src/core/lib/transport/service_config.h"
-
-typedef struct message_size_limits {
- int max_send_size;
- int max_recv_size;
-} message_size_limits;
-
-static void* message_size_limits_copy(void* value) {
- void* new_value = gpr_malloc(sizeof(message_size_limits));
- memcpy(new_value, value, sizeof(message_size_limits));
- return new_value;
-}
-
-static void message_size_limits_free(grpc_exec_ctx* exec_ctx, void* value) {
- gpr_free(value);
-}
-
-static const grpc_slice_hash_table_vtable message_size_limits_vtable = {
- message_size_limits_free, message_size_limits_copy};
-
-static void* message_size_limits_create_from_json(const grpc_json* json) {
- int max_request_message_bytes = -1;
- int max_response_message_bytes = -1;
- for (grpc_json* field = json->child; field != NULL; field = field->next) {
- if (field->key == NULL) continue;
- if (strcmp(field->key, "maxRequestMessageBytes") == 0) {
- if (max_request_message_bytes >= 0) return NULL; // Duplicate.
- if (field->type != GRPC_JSON_STRING && field->type != GRPC_JSON_NUMBER) {
- return NULL;
- }
- max_request_message_bytes = gpr_parse_nonnegative_int(field->value);
- if (max_request_message_bytes == -1) return NULL;
- } else if (strcmp(field->key, "maxResponseMessageBytes") == 0) {
- if (max_response_message_bytes >= 0) return NULL; // Duplicate.
- if (field->type != GRPC_JSON_STRING && field->type != GRPC_JSON_NUMBER) {
- return NULL;
- }
- max_response_message_bytes = gpr_parse_nonnegative_int(field->value);
- if (max_response_message_bytes == -1) return NULL;
- }
- }
- message_size_limits* value = gpr_malloc(sizeof(message_size_limits));
- value->max_send_size = max_request_message_bytes;
- value->max_recv_size = max_response_message_bytes;
- return value;
-}
-
-typedef struct call_data {
- int max_send_size;
- int max_recv_size;
- // Receive closures are chained: we inject this closure as the
- // recv_message_ready up-call on transport_stream_op, and remember to
- // call our next_recv_message_ready member after handling it.
- grpc_closure recv_message_ready;
- // Used by recv_message_ready.
- grpc_byte_stream** recv_message;
- // Original recv_message_ready callback, invoked after our own.
- grpc_closure* next_recv_message_ready;
-} call_data;
-
-typedef struct channel_data {
- int max_send_size;
- int max_recv_size;
- // Maps path names to message_size_limits structs.
- grpc_slice_hash_table* method_limit_table;
-} channel_data;
-
-// Callback invoked when we receive a message. Here we check the max
-// receive message size.
-static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
- grpc_error* error) {
- grpc_call_element* elem = user_data;
- call_data* calld = elem->call_data;
- if (*calld->recv_message != NULL && calld->max_recv_size >= 0 &&
- (*calld->recv_message)->length > (size_t)calld->max_recv_size) {
- char* message_string;
- gpr_asprintf(&message_string,
- "Received message larger than max (%u vs. %d)",
- (*calld->recv_message)->length, calld->max_recv_size);
- grpc_error* new_error = grpc_error_set_int(
- GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
- if (error == GRPC_ERROR_NONE) {
- error = new_error;
- } else {
- error = grpc_error_add_child(error, new_error);
- GRPC_ERROR_UNREF(new_error);
- }
- gpr_free(message_string);
- }
- // Invoke the next callback.
- grpc_closure_run(exec_ctx, calld->next_recv_message_ready, error);
-}
-
-// Start transport stream op.
-static void start_transport_stream_op_batch(
- grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- grpc_transport_stream_op_batch* op) {
- call_data* calld = elem->call_data;
- // Check max send message size.
- if (op->send_message && calld->max_send_size >= 0 &&
- op->payload->send_message.send_message->length >
- (size_t)calld->max_send_size) {
- char* message_string;
- gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
- op->payload->send_message.send_message->length,
- calld->max_send_size);
- grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
- grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
- GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_RESOURCE_EXHAUSTED));
- gpr_free(message_string);
- return;
- }
- // Inject callback for receiving a message.
- if (op->recv_message) {
- calld->next_recv_message_ready =
- op->payload->recv_message.recv_message_ready;
- calld->recv_message = op->payload->recv_message.recv_message;
- op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
- }
- // Chain to the next filter.
- grpc_call_next_op(exec_ctx, elem, op);
-}
-
-// Constructor for call_data.
-static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- const grpc_call_element_args* args) {
- channel_data* chand = elem->channel_data;
- call_data* calld = elem->call_data;
- calld->next_recv_message_ready = NULL;
- grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem,
- grpc_schedule_on_exec_ctx);
- // Get max sizes from channel data, then merge in per-method config values.
- // Note: Per-method config is only available on the client, so we
- // apply the max request size to the send limit and the max response
- // size to the receive limit.
- calld->max_send_size = chand->max_send_size;
- calld->max_recv_size = chand->max_recv_size;
- if (chand->method_limit_table != NULL) {
- message_size_limits* limits = grpc_method_config_table_get(
- exec_ctx, chand->method_limit_table, args->path);
- if (limits != NULL) {
- if (limits->max_send_size >= 0 &&
- (limits->max_send_size < calld->max_send_size ||
- calld->max_send_size < 0)) {
- calld->max_send_size = limits->max_send_size;
- }
- if (limits->max_recv_size >= 0 &&
- (limits->max_recv_size < calld->max_recv_size ||
- calld->max_recv_size < 0)) {
- calld->max_recv_size = limits->max_recv_size;
- }
- }
- }
- return GRPC_ERROR_NONE;
-}
-
-// Destructor for call_data.
-static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
- const grpc_call_final_info* final_info,
- grpc_closure* ignored) {}
-
-// Constructor for channel_data.
-static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem,
- grpc_channel_element_args* args) {
- GPR_ASSERT(!args->is_last);
- channel_data* chand = elem->channel_data;
- chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH;
- chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH;
- for (size_t i = 0; i < args->channel_args->num_args; ++i) {
- if (strcmp(args->channel_args->args[i].key,
- GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) {
- const grpc_integer_options options = {
- GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH, -1, INT_MAX};
- chand->max_send_size =
- grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
- }
- if (strcmp(args->channel_args->args[i].key,
- GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) {
- const grpc_integer_options options = {
- GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH, -1, INT_MAX};
- chand->max_recv_size =
- grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
- }
- }
- // Get method config table from channel args.
- const grpc_arg* channel_arg =
- grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
- if (channel_arg != NULL) {
- GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
- grpc_service_config* service_config =
- grpc_service_config_create(channel_arg->value.string);
- if (service_config != NULL) {
- chand->method_limit_table =
- grpc_service_config_create_method_config_table(
- exec_ctx, service_config, message_size_limits_create_from_json,
- &message_size_limits_vtable);
- grpc_service_config_destroy(service_config);
- }
- }
- return GRPC_ERROR_NONE;
-}
-
-// Destructor for channel_data.
-static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
- grpc_channel_element* elem) {
- channel_data* chand = elem->channel_data;
- grpc_slice_hash_table_unref(exec_ctx, chand->method_limit_table);
-}
-
-const grpc_channel_filter grpc_message_size_filter = {
- start_transport_stream_op_batch,
- grpc_channel_next_op,
- sizeof(call_data),
- init_call_elem,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- destroy_call_elem,
- sizeof(channel_data),
- init_channel_elem,
- destroy_channel_elem,
- grpc_call_next_get_peer,
- grpc_channel_next_get_info,
- "message_size"};
diff --git a/src/core/lib/channel/message_size_filter.h b/src/core/lib/channel/message_size_filter.h
deleted file mode 100644
index a88ff7f81a..0000000000
--- a/src/core/lib/channel/message_size_filter.h
+++ /dev/null
@@ -1,39 +0,0 @@
-//
-// Copyright 2016, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-//
-
-#ifndef GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H
-#define GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H
-
-#include "src/core/lib/channel/channel_stack.h"
-
-extern const grpc_channel_filter grpc_message_size_filter;
-
-#endif /* GRPC_CORE_LIB_CHANNEL_MESSAGE_SIZE_FILTER_H */
diff --git a/src/core/lib/security/credentials/credentials.c b/src/core/lib/security/credentials/credentials.c
index 52b80141d1..d89da47fc1 100644
--- a/src/core/lib/security/credentials/credentials.c
+++ b/src/core/lib/security/credentials/credentials.c
@@ -37,7 +37,6 @@
#include <string.h>
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/http/httpcli.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/executor.h"
diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.c b/src/core/lib/security/credentials/ssl/ssl_credentials.c
index 4b17ac8098..b63bb6b6e9 100644
--- a/src/core/lib/security/credentials/ssl/ssl_credentials.c
+++ b/src/core/lib/security/credentials/ssl/ssl_credentials.c
@@ -36,7 +36,6 @@
#include <string.h>
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/surface/api_trace.h"
#include <grpc/support/alloc.h>
diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c
index 7acb444d9b..20f5753004 100644
--- a/src/core/lib/surface/channel_init.c
+++ b/src/core/lib/surface/channel_init.c
@@ -104,30 +104,13 @@ void grpc_channel_init_shutdown(void) {
}
}
-static const char *name_for_type(grpc_channel_stack_type type) {
- switch (type) {
- case GRPC_CLIENT_CHANNEL:
- return "CLIENT_CHANNEL";
- case GRPC_CLIENT_SUBCHANNEL:
- return "CLIENT_SUBCHANNEL";
- case GRPC_SERVER_CHANNEL:
- return "SERVER_CHANNEL";
- case GRPC_CLIENT_LAME_CHANNEL:
- return "CLIENT_LAME_CHANNEL";
- case GRPC_CLIENT_DIRECT_CHANNEL:
- return "CLIENT_DIRECT_CHANNEL";
- case GRPC_NUM_CHANNEL_STACK_TYPES:
- break;
- }
- GPR_UNREACHABLE_CODE(return "UNKNOWN");
-}
-
bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder,
grpc_channel_stack_type type) {
GPR_ASSERT(g_finalized);
- grpc_channel_stack_builder_set_name(builder, name_for_type(type));
+ grpc_channel_stack_builder_set_name(builder,
+ grpc_channel_stack_type_string(type));
for (size_t i = 0; i < g_slots[type].num_slots; i++) {
const stage_slot *slot = &g_slots[type].slots[i];
diff --git a/src/core/lib/surface/channel_stack_type.c b/src/core/lib/surface/channel_stack_type.c
index c35d603ca3..ed3b53fb36 100644
--- a/src/core/lib/surface/channel_stack_type.c
+++ b/src/core/lib/surface/channel_stack_type.c
@@ -52,3 +52,21 @@ bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type) {
}
GPR_UNREACHABLE_CODE(return true;);
}
+
+const char *grpc_channel_stack_type_string(grpc_channel_stack_type type) {
+ switch (type) {
+ case GRPC_CLIENT_CHANNEL:
+ return "CLIENT_CHANNEL";
+ case GRPC_CLIENT_SUBCHANNEL:
+ return "CLIENT_SUBCHANNEL";
+ case GRPC_SERVER_CHANNEL:
+ return "SERVER_CHANNEL";
+ case GRPC_CLIENT_LAME_CHANNEL:
+ return "CLIENT_LAME_CHANNEL";
+ case GRPC_CLIENT_DIRECT_CHANNEL:
+ return "CLIENT_DIRECT_CHANNEL";
+ case GRPC_NUM_CHANNEL_STACK_TYPES:
+ break;
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
diff --git a/src/core/lib/surface/channel_stack_type.h b/src/core/lib/surface/channel_stack_type.h
index 4eea4f1b01..ccf4e53d27 100644
--- a/src/core/lib/surface/channel_stack_type.h
+++ b/src/core/lib/surface/channel_stack_type.h
@@ -55,4 +55,6 @@ typedef enum {
bool grpc_channel_stack_type_is_client(grpc_channel_stack_type type);
+const char *grpc_channel_stack_type_string(grpc_channel_stack_type type);
+
#endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_STACK_TYPE_H */
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 3273addf1d..35e9f7eb30 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -64,6 +64,10 @@ typedef struct {
struct grpc_completion_queue {
/** owned by pollset */
gpr_mu *mu;
+
+ grpc_cq_completion_type completion_type;
+ grpc_cq_polling_type polling_type;
+
/** completed events */
grpc_cq_completion completed_head;
grpc_cq_completion *completed_tail;
@@ -79,6 +83,7 @@ struct grpc_completion_queue {
int shutdown_called;
int is_server_cq;
/** Can the server cq accept incoming channels */
+ /* TODO: sreek - This will no longer be needed. Use polling_type set */
int is_non_listening_server_cq;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
@@ -110,13 +115,17 @@ int grpc_cq_event_timeout_trace;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
grpc_error *error);
-grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
+grpc_completion_queue *grpc_completion_queue_create_internal(
+ grpc_cq_completion_type completion_type,
+ grpc_cq_polling_type polling_type) {
grpc_completion_queue *cc;
- GPR_ASSERT(!reserved);
- GPR_TIMER_BEGIN("grpc_completion_queue_create", 0);
+ GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0);
- GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
+ GRPC_API_TRACE(
+ "grpc_completion_queue_create_internal(completion_type=%d, "
+ "polling_type=%d)",
+ 2, (completion_type, polling_type));
cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size());
grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu);
@@ -125,6 +134,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
cc->outstanding_tag_capacity = 0;
#endif
+ cc->completion_type = completion_type;
+ cc->polling_type = polling_type;
+
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
/* One for destroy(), one for pollset_shutdown */
@@ -143,11 +155,19 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc,
grpc_schedule_on_exec_ctx);
- GPR_TIMER_END("grpc_completion_queue_create", 0);
+ GPR_TIMER_END("grpc_completion_queue_create_internal", 0);
return cc;
}
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) {
+ return cc->completion_type;
+}
+
+grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) {
+ return cc->polling_type;
+}
+
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line) {
@@ -347,6 +367,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
grpc_event ret;
gpr_timespec now;
+ if (cc->completion_type != GRPC_CQ_NEXT) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_next() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_NEXT");
+ abort();
+ }
+
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
GRPC_API_TRACE(
@@ -516,6 +543,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
+ if (cc->completion_type != GRPC_CQ_PLUCK) {
+ gpr_log(GPR_ERROR,
+ "grpc_completion_queue_pluck() cannot be called on this completion "
+ "queue since its completion type is not GRPC_CQ_PLUCK");
+ abort();
+ }
+
if (grpc_cq_pluck_trace) {
GRPC_API_TRACE(
"grpc_completion_queue_pluck("
@@ -680,10 +714,14 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
}
void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) {
+ /* TODO: sreek - use cc->polling_type field here and add a validation check
+ (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose
+ polling_type is set to GRPC_CQ_NON_LISTENING */
cc->is_non_listening_server_cq = 1;
}
bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) {
+ /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */
return (cc->is_non_listening_server_cq == 1);
}
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index 5d73dd7216..1ff3d64293 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -99,4 +99,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc);
void grpc_cq_mark_server_cq(grpc_completion_queue *cc);
int grpc_cq_is_server_cq(grpc_completion_queue *cc);
+grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc);
+grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc);
+
+grpc_completion_queue *grpc_completion_queue_create_internal(
+ grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type);
+
#endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */
diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c
index db67a5192b..d68b84eddd 100644
--- a/src/core/lib/surface/completion_queue_factory.c
+++ b/src/core/lib/surface/completion_queue_factory.c
@@ -36,12 +36,15 @@
#include <grpc/support/log.h>
-/* TODO (sreek) - Currently this does not use the attributes arg. This will be
- added in a future PR */
+/*
+ * == Default completion queue factory implementation ==
+ */
+
static grpc_completion_queue* default_create(
const grpc_completion_queue_factory* factory,
- const grpc_completion_queue_attributes* attributes) {
- return grpc_completion_queue_create(NULL);
+ const grpc_completion_queue_attributes* attr) {
+ return grpc_completion_queue_create_internal(attr->cq_completion_type,
+ attr->cq_polling_type);
}
static grpc_completion_queue_factory_vtable default_vtable = {default_create};
@@ -49,19 +52,24 @@ static grpc_completion_queue_factory_vtable default_vtable = {default_create};
static const grpc_completion_queue_factory g_default_cq_factory = {
"Default Factory", NULL, &default_vtable};
+/*
+ * == Completion queue factory APIs
+ */
+
const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) {
- /* As we add more fields to grpc_completion_queue_attributes, we may have to
- change this assert to:
- GPR_ASSERT (attributes->version >= 1 &&
- attributes->version <= GRPC_CQ_CURRENT_VERSION) */
- GPR_ASSERT(attributes->version == 1);
+ GPR_ASSERT(attributes->version >= 1 &&
+ attributes->version <= GRPC_CQ_CURRENT_VERSION);
/* The default factory can handle version 1 of the attributes structure. We
may have to change this as more fields are added to the structure */
return &g_default_cq_factory;
}
+/*
+ * == Completion queue creation APIs ==
+ */
+
grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) {
GPR_ASSERT(!reserved);
grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT,
@@ -75,3 +83,10 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) {
GRPC_CQ_DEFAULT_POLLING};
return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr);
}
+
+grpc_completion_queue* grpc_completion_queue_create(
+ const grpc_completion_queue_factory* factory,
+ const grpc_completion_queue_attributes* attr, void* reserved) {
+ GPR_ASSERT(!reserved);
+ return factory->vtable->create(factory, attr);
+}
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 91bd014a0e..4b381b1954 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -41,13 +41,8 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_stack.h"
-#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
-#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/channel/handshaker_registry.h"
-#include "src/core/lib/channel/http_client_filter.h"
-#include "src/core/lib/channel/http_server_filter.h"
-#include "src/core/lib/channel/message_size_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -95,57 +90,13 @@ static bool prepend_filter(grpc_exec_ctx *exec_ctx,
builder, (const grpc_channel_filter *)arg, NULL, NULL);
}
-static bool maybe_add_http_filter(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack_builder *builder,
- void *arg) {
- grpc_transport *t = grpc_channel_stack_builder_get_transport(builder);
- if (t && strstr(t->vtable->name, "http")) {
- return grpc_channel_stack_builder_prepend_filter(
- builder, (const grpc_channel_filter *)arg, NULL, NULL);
- }
- return true;
-}
-
static void register_builtin_channel_init() {
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_client_deadline_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_server_deadline_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_message_size_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_message_size_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_message_size_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- prepend_filter, (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
- (void *)&grpc_compress_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_http_filter, (void *)&grpc_http_client_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_http_filter, (void *)&grpc_http_client_filter);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
- maybe_add_http_filter, (void *)&grpc_http_server_filter);
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
grpc_add_connected_filter, NULL);
@@ -189,7 +140,6 @@ void grpc_init(void) {
grpc_register_tracer("channel_stack_builder",
&grpc_trace_channel_stack_builder);
grpc_register_tracer("http1", &grpc_http1_trace);
- grpc_register_tracer("compression", &grpc_compression_trace);
grpc_register_tracer("queue_pluck", &grpc_cq_pluck_trace);
grpc_register_tracer("combiner", &grpc_combiner_trace);
grpc_register_tracer("server_channel", &grpc_server_channel_trace);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 191ee75252..9496f90390 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1000,6 +1000,15 @@ void grpc_server_register_completion_queue(grpc_server *server,
GRPC_API_TRACE(
"grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
(server, cq, reserved));
+
+ if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) {
+ gpr_log(GPR_INFO,
+ "Completion queue which is not of type GRPC_CQ_NEXT is being "
+ "registered as a server-completion-queue");
+ /* Ideally we should log an error and abort but ruby-wrapped-language API
+ calls grpc_completion_queue_pluck() on server completion queues */
+ }
+
register_completion_queue(server, cq, false, reserved);
}
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index ba80bd801e..3793845559 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -36,6 +36,6 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "3.0.0-dev"; }
+const char *grpc_version_string(void) { return "4.0.0-dev"; }
const char *grpc_g_stands_for(void) { return "gentle"; }