diff options
author | Craig Tiller <ctiller@google.com> | 2016-03-28 22:54:31 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-03-28 22:54:31 -0700 |
commit | 97684571012a1f40218dc31c08cb68790d325a10 (patch) | |
tree | 94c9020d2c5e33caa0aa7ae5b511d5da8f450eec /src/core/lib/channel | |
parent | 781bcd2fe263fc98dfea8393f3fd7b53b214c973 (diff) | |
parent | c5ff781091db0cf6df282d0768102a2ea17d7e24 (diff) |
Merge branch 'ignore_connectivity' of github.com:ctiller/grpc into ignore_connectivity
Diffstat (limited to 'src/core/lib/channel')
19 files changed, 3498 insertions, 0 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c new file mode 100644 index 0000000000..1a02f1f4aa --- /dev/null +++ b/src/core/lib/channel/channel_args.c @@ -0,0 +1,271 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/channel_args.h" +#include <grpc/grpc.h> +#include "src/core/lib/support/string.h" + +#include <grpc/census.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/useful.h> + +#include <string.h> + +static grpc_arg copy_arg(const grpc_arg *src) { + grpc_arg dst; + dst.type = src->type; + dst.key = gpr_strdup(src->key); + switch (dst.type) { + case GRPC_ARG_STRING: + dst.value.string = gpr_strdup(src->value.string); + break; + case GRPC_ARG_INTEGER: + dst.value.integer = src->value.integer; + break; + case GRPC_ARG_POINTER: + dst.value.pointer = src->value.pointer; + dst.value.pointer.p = + src->value.pointer.vtable->copy(src->value.pointer.p); + break; + } + return dst; +} + +grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, + const grpc_arg *to_add, + size_t num_to_add) { + grpc_channel_args *dst = gpr_malloc(sizeof(grpc_channel_args)); + size_t i; + size_t src_num_args = (src == NULL) ? 0 : src->num_args; + if (!src && !to_add) { + dst->num_args = 0; + dst->args = NULL; + return dst; + } + dst->num_args = src_num_args + num_to_add; + dst->args = gpr_malloc(sizeof(grpc_arg) * dst->num_args); + for (i = 0; i < src_num_args; i++) { + dst->args[i] = copy_arg(&src->args[i]); + } + for (i = 0; i < num_to_add; i++) { + dst->args[i + src_num_args] = copy_arg(&to_add[i]); + } + return dst; +} + +grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src) { + return grpc_channel_args_copy_and_add(src, NULL, 0); +} + +grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, + const grpc_channel_args *b) { + return grpc_channel_args_copy_and_add(a, b->args, b->num_args); +} + +static int cmp_arg(const grpc_arg *a, const grpc_arg *b) { + int c = GPR_ICMP(a->type, b->type); + if (c != 0) return c; + c = strcmp(a->key, b->key); + if (c != 0) return c; + switch (a->type) { + case GRPC_ARG_STRING: + return strcmp(a->value.string, b->value.string); + case GRPC_ARG_INTEGER: + return GPR_ICMP(a->value.integer, b->value.integer); + case GRPC_ARG_POINTER: + c = GPR_ICMP(a->value.pointer.p, b->value.pointer.p); + if (c != 0) { + c = GPR_ICMP(a->value.pointer.vtable, b->value.pointer.vtable); + if (c == 0) { + c = a->value.pointer.vtable->cmp(a->value.pointer.p, + b->value.pointer.p); + } + } + return c; + } + GPR_UNREACHABLE_CODE(return 0); +} + +/* stabilizing comparison function: since channel_args ordering matters for + * keys with the same name, we need to preserve that ordering */ +static int cmp_key_stable(const void *ap, const void *bp) { + const grpc_arg *const *a = ap; + const grpc_arg *const *b = bp; + int c = strcmp((*a)->key, (*b)->key); + if (c == 0) c = GPR_ICMP(*a, *b); + return c; +} + +grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a) { + grpc_arg **args = gpr_malloc(sizeof(grpc_arg *) * a->num_args); + for (size_t i = 0; i < a->num_args; i++) { + args[i] = &a->args[i]; + } + qsort(args, a->num_args, sizeof(grpc_arg *), cmp_key_stable); + + grpc_channel_args *b = gpr_malloc(sizeof(grpc_channel_args)); + b->num_args = a->num_args; + b->args = gpr_malloc(sizeof(grpc_arg) * b->num_args); + for (size_t i = 0; i < a->num_args; i++) { + b->args[i] = copy_arg(args[i]); + } + + gpr_free(args); + return b; +} + +void grpc_channel_args_destroy(grpc_channel_args *a) { + size_t i; + for (i = 0; i < a->num_args; i++) { + switch (a->args[i].type) { + case GRPC_ARG_STRING: + gpr_free(a->args[i].value.string); + break; + case GRPC_ARG_INTEGER: + break; + case GRPC_ARG_POINTER: + a->args[i].value.pointer.vtable->destroy(a->args[i].value.pointer.p); + break; + } + gpr_free(a->args[i].key); + } + gpr_free(a->args); + gpr_free(a); +} + +int grpc_channel_args_is_census_enabled(const grpc_channel_args *a) { + size_t i; + if (a == NULL) return 0; + for (i = 0; i < a->num_args; i++) { + if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) { + return a->args[i].value.integer != 0 && census_enabled(); + } + } + return census_enabled(); +} + +grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( + const grpc_channel_args *a) { + size_t i; + if (a == NULL) return 0; + for (i = 0; i < a->num_args; ++i) { + if (a->args[i].type == GRPC_ARG_INTEGER && + !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) { + return (grpc_compression_algorithm)a->args[i].value.integer; + break; + } + } + return GRPC_COMPRESS_NONE; +} + +grpc_channel_args *grpc_channel_args_set_compression_algorithm( + grpc_channel_args *a, grpc_compression_algorithm algorithm) { + grpc_arg tmp; + tmp.type = GRPC_ARG_INTEGER; + tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG; + tmp.value.integer = algorithm; + return grpc_channel_args_copy_and_add(a, &tmp, 1); +} + +/** Returns 1 if the argument for compression algorithm's enabled states bitset + * was found in \a a, returning the arg's value in \a states. Otherwise, returns + * 0. */ +static int find_compression_algorithm_states_bitset(const grpc_channel_args *a, + int **states_arg) { + if (a != NULL) { + size_t i; + for (i = 0; i < a->num_args; ++i) { + if (a->args[i].type == GRPC_ARG_INTEGER && + !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) { + *states_arg = &a->args[i].value.integer; + return 1; /* GPR_TRUE */ + } + } + } + return 0; /* GPR_FALSE */ +} + +grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( + grpc_channel_args **a, grpc_compression_algorithm algorithm, int state) { + int *states_arg; + grpc_channel_args *result = *a; + const int states_arg_found = + find_compression_algorithm_states_bitset(*a, &states_arg); + + if (states_arg_found) { + if (state != 0) { + GPR_BITSET((unsigned *)states_arg, algorithm); + } else { + GPR_BITCLEAR((unsigned *)states_arg, algorithm); + } + } else { + /* create a new arg */ + grpc_arg tmp; + tmp.type = GRPC_ARG_INTEGER; + tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG; + /* all enabled by default */ + tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; + if (state != 0) { + GPR_BITSET((unsigned *)&tmp.value.integer, algorithm); + } else { + GPR_BITCLEAR((unsigned *)&tmp.value.integer, algorithm); + } + result = grpc_channel_args_copy_and_add(*a, &tmp, 1); + grpc_channel_args_destroy(*a); + *a = result; + } + return result; +} + +int grpc_channel_args_compression_algorithm_get_states( + const grpc_channel_args *a) { + int *states_arg; + if (find_compression_algorithm_states_bitset(a, &states_arg)) { + return *states_arg; + } else { + return (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1; /* All algs. enabled */ + } +} + +int grpc_channel_args_compare(const grpc_channel_args *a, + const grpc_channel_args *b) { + int c = GPR_ICMP(a->num_args, b->num_args); + if (c != 0) return c; + for (size_t i = 0; i < a->num_args; i++) { + c = cmp_arg(&a->args[i], &b->args[i]); + if (c != 0) return c; + } + return 0; +} diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h new file mode 100644 index 0000000000..67d287ec6b --- /dev/null +++ b/src/core/lib/channel/channel_args.h @@ -0,0 +1,94 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H +#define GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H + +#include <grpc/compression.h> +#include <grpc/grpc.h> + +/* Copy some arguments */ +grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src); + +/* Copy some arguments, stably sorting keys */ +grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a); + +/** Copy some arguments and add the to_add parameter in the end. + If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */ +grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src, + const grpc_arg *to_add, + size_t num_to_add); + +/** Copy args from a then args from b into a new channel args */ +grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a, + const grpc_channel_args *b); + +/** Destroy arguments created by grpc_channel_args_copy */ +void grpc_channel_args_destroy(grpc_channel_args *a); + +/** Reads census_enabled settings from channel args. Returns 1 if census_enabled + * is specified in channel args, otherwise returns 0. */ +int grpc_channel_args_is_census_enabled(const grpc_channel_args *a); + +/** Returns the compression algorithm set in \a a. */ +grpc_compression_algorithm grpc_channel_args_get_compression_algorithm( + const grpc_channel_args *a); + +/** Returns a channel arg instance with compression enabled. If \a a is + * non-NULL, its args are copied. N.B. GRPC_COMPRESS_NONE disables compression + * for the channel. */ +grpc_channel_args *grpc_channel_args_set_compression_algorithm( + grpc_channel_args *a, grpc_compression_algorithm algorithm); + +/** Sets the support for the given compression algorithm. By default, all + * compression algorithms are enabled. It's an error to disable an algorithm set + * by grpc_channel_args_set_compression_algorithm. + * + * Returns an instance with the updated algorithm states. The \a a pointer is + * modified to point to the returned instance (which may be different from the + * input value of \a a). */ +grpc_channel_args *grpc_channel_args_compression_algorithm_set_state( + grpc_channel_args **a, grpc_compression_algorithm algorithm, int enabled); + +/** Returns the bitset representing the support state (true for enabled, false + * for disabled) for compression algorithms. + * + * The i-th bit of the returned bitset corresponds to the i-th entry in the + * grpc_compression_algorithm enum. */ +int grpc_channel_args_compression_algorithm_get_states( + const grpc_channel_args *a); + +int grpc_channel_args_compare(const grpc_channel_args *a, + const grpc_channel_args *b); + +#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_ARGS_H */ diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c new file mode 100644 index 0000000000..52283e35fa --- /dev/null +++ b/src/core/lib/channel/channel_stack.c @@ -0,0 +1,262 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/channel_stack.h" +#include <grpc/support/log.h> + +#include <stdlib.h> +#include <string.h> + +int grpc_trace_channel = 0; + +/* Memory layouts. + + Channel stack is laid out as: { + grpc_channel_stack stk; + padding to GPR_MAX_ALIGNMENT + grpc_channel_element[stk.count]; + per-filter memory, aligned to GPR_MAX_ALIGNMENT + } + + Call stack is laid out as: { + grpc_call_stack stk; + padding to GPR_MAX_ALIGNMENT + grpc_call_element[stk.count]; + per-filter memory, aligned to GPR_MAX_ALIGNMENT + } */ + +/* Given a size, round up to the next multiple of sizeof(void*) */ +#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ + (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u)) + +size_t grpc_channel_stack_size(const grpc_channel_filter **filters, + size_t filter_count) { + /* always need the header, and size for the channel elements */ + size_t size = + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) + + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element)); + size_t i; + + GPR_ASSERT((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0 && + "GPR_MAX_ALIGNMENT must be a power of two"); + + /* add the size for each filter */ + for (i = 0; i < filter_count; i++) { + size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); + } + + return size; +} + +#define CHANNEL_ELEMS_FROM_STACK(stk) \ + ((grpc_channel_element *)((char *)(stk) + ROUND_UP_TO_ALIGNMENT_SIZE( \ + sizeof(grpc_channel_stack)))) + +#define CALL_ELEMS_FROM_STACK(stk) \ + ((grpc_call_element *)((char *)(stk) + \ + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)))) + +grpc_channel_element *grpc_channel_stack_element( + grpc_channel_stack *channel_stack, size_t index) { + return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index; +} + +grpc_channel_element *grpc_channel_stack_last_element( + grpc_channel_stack *channel_stack) { + return grpc_channel_stack_element(channel_stack, channel_stack->count - 1); +} + +grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, + size_t index) { + return CALL_ELEMS_FROM_STACK(call_stack) + index; +} + +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, + const grpc_channel_filter **filters, + size_t filter_count, + const grpc_channel_args *channel_args, + const char *name, grpc_channel_stack *stack) { + size_t call_size = + ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) + + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element)); + grpc_channel_element *elems; + grpc_channel_element_args args; + char *user_data; + size_t i; + + stack->count = filter_count; + GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg, + name); + elems = CHANNEL_ELEMS_FROM_STACK(stack); + user_data = + ((char *)elems) + + ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_channel_element)); + + /* init per-filter data */ + for (i = 0; i < filter_count; i++) { + args.channel_stack = stack; + args.channel_args = channel_args; + args.is_first = i == 0; + args.is_last = i == (filter_count - 1); + elems[i].filter = filters[i]; + elems[i].channel_data = user_data; + elems[i].filter->init_channel_elem(exec_ctx, &elems[i], &args); + user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); + call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data); + } + + GPR_ASSERT(user_data > (char *)stack); + GPR_ASSERT((uintptr_t)(user_data - (char *)stack) == + grpc_channel_stack_size(filters, filter_count)); + + stack->call_stack_size = call_size; +} + +void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *stack) { + grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(stack); + size_t count = stack->count; + size_t i; + + /* destroy per-filter data */ + for (i = 0; i < count; i++) { + channel_elems[i].filter->destroy_channel_elem(exec_ctx, &channel_elems[i]); + } +} + +void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, + grpc_call_context_element *context, + const void *transport_server_data, + grpc_call_stack *call_stack) { + grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); + grpc_call_element_args args; + size_t count = channel_stack->count; + grpc_call_element *call_elems; + char *user_data; + size_t i; + + call_stack->count = count; + GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy, + destroy_arg, "CALL_STACK"); + call_elems = CALL_ELEMS_FROM_STACK(call_stack); + user_data = ((char *)call_elems) + + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); + + /* init per-filter data */ + for (i = 0; i < count; i++) { + args.call_stack = call_stack; + args.server_transport_data = transport_server_data; + args.context = context; + call_elems[i].filter = channel_elems[i].filter; + call_elems[i].channel_data = channel_elems[i].channel_data; + call_elems[i].call_data = user_data; + call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args); + user_data += + ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); + } +} + +void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_call_stack *call_stack, + grpc_pollset *pollset) { + size_t count = call_stack->count; + grpc_call_element *call_elems; + char *user_data; + size_t i; + + call_elems = CALL_ELEMS_FROM_STACK(call_stack); + user_data = ((char *)call_elems) + + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); + + /* init per-filter data */ + for (i = 0; i < count; i++) { + call_elems[i].filter->set_pollset(exec_ctx, &call_elems[i], pollset); + user_data += + ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); + } +} + +void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_pollset *pollset) {} + +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) { + grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); + size_t count = stack->count; + size_t i; + + /* destroy per-filter data */ + for (i = 0; i < count; i++) { + elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]); + } +} + +void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op *op) { + grpc_call_element *next_elem = elem + 1; + next_elem->filter->start_transport_stream_op(exec_ctx, next_elem, op); +} + +char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + grpc_call_element *next_elem = elem + 1; + return next_elem->filter->get_peer(exec_ctx, next_elem); +} + +void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_transport_op *op) { + grpc_channel_element *next_elem = elem + 1; + next_elem->filter->start_transport_op(exec_ctx, next_elem, op); +} + +grpc_channel_stack *grpc_channel_stack_from_top_element( + grpc_channel_element *elem) { + return (grpc_channel_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( + sizeof(grpc_channel_stack))); +} + +grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { + return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( + sizeof(grpc_call_stack))); +} + +void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, + grpc_call_element *cur_elem) { + grpc_transport_stream_op op; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; + grpc_call_next_op(exec_ctx, cur_elem, &op); +} diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h new file mode 100644 index 0000000000..b29bee411d --- /dev/null +++ b/src/core/lib/channel/channel_stack.h @@ -0,0 +1,260 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H +#define GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H + +/* A channel filter defines how operations on a channel are implemented. + Channel filters are chained together to create full channels, and if those + chains are linear, then channel stacks provide a mechanism to minimize + allocations for that chain. + Call stacks are created by channel stacks and represent the per-call data + for that stack. */ + +#include <stddef.h> + +#include <grpc/grpc.h> +#include <grpc/support/log.h> +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/transport/transport.h" + +typedef struct grpc_channel_element grpc_channel_element; +typedef struct grpc_call_element grpc_call_element; + +typedef struct grpc_channel_stack grpc_channel_stack; +typedef struct grpc_call_stack grpc_call_stack; + +typedef struct { + grpc_channel_stack *channel_stack; + const grpc_channel_args *channel_args; + int is_first; + int is_last; +} grpc_channel_element_args; + +typedef struct { + grpc_call_stack *call_stack; + const void *server_transport_data; + grpc_call_context_element *context; +} grpc_call_element_args; + +/* Channel filters specify: + 1. the amount of memory needed in the channel & call (via the sizeof_XXX + members) + 2. functions to initialize and destroy channel & call data + (init_XXX, destroy_XXX) + 3. functions to implement call operations and channel operations (call_op, + channel_op) + 4. a name, which is useful when debugging + + Members are laid out in approximate frequency of use order. */ +typedef struct { + /* Called to eg. send/receive data on a call. + See grpc_call_next_op on how to call the next element in the stack */ + void (*start_transport_stream_op)(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op); + /* Called to handle channel level operations - e.g. new calls, or transport + closure. + See grpc_channel_next_op on how to call the next element in the stack */ + void (*start_transport_op)(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_transport_op *op); + + /* sizeof(per call data) */ + size_t sizeof_call_data; + /* Initialize per call data. + elem is initialized at the start of the call, and elem->call_data is what + needs initializing. + The filter does not need to do any chaining. + server_transport_data is an opaque pointer. If it is NULL, this call is + on a client; if it is non-NULL, then it points to memory owned by the + transport and is on the server. Most filters want to ignore this + argument. */ + void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_call_element_args *args); + void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_pollset *pollset); + /* Destroy per call data. + The filter does not need to do any chaining */ + void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + + /* sizeof(per channel data) */ + size_t sizeof_channel_data; + /* Initialize per-channel data. + elem is initialized at the start of the call, and elem->channel_data is + what needs initializing. + is_first, is_last designate this elements position in the stack, and are + useful for asserting correct configuration by upper layer code. + The filter does not need to do any chaining */ + void (*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_channel_element_args *args); + /* Destroy per channel data. + The filter does not need to do any chaining */ + void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem); + + /* Implement grpc_call_get_peer() */ + char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + + /* The name of this filter */ + const char *name; +} grpc_channel_filter; + +/* A channel_element tracks its filter and the filter requested memory within + a channel allocation */ +struct grpc_channel_element { + const grpc_channel_filter *filter; + void *channel_data; +}; + +/* A call_element tracks its filter, the filter requested memory within + a channel allocation, and the filter requested memory within a call + allocation */ +struct grpc_call_element { + const grpc_channel_filter *filter; + void *channel_data; + void *call_data; +}; + +/* A channel stack tracks a set of related filters for one channel, and + guarantees they live within a single malloc() allocation */ +struct grpc_channel_stack { + grpc_stream_refcount refcount; + size_t count; + /* Memory required for a call stack (computed at channel stack + initialization) */ + size_t call_stack_size; +}; + +/* A call stack tracks a set of related filters for one call, and guarantees + they live within a single malloc() allocation */ +struct grpc_call_stack { + /* shared refcount for this channel stack. + MUST be the first element: the underlying code calls destroy + with the address of the refcount, but higher layers prefer to think + about the address of the call stack itself. */ + grpc_stream_refcount refcount; + size_t count; +}; + +/* Get a channel element given a channel stack and its index */ +grpc_channel_element *grpc_channel_stack_element(grpc_channel_stack *stack, + size_t i); +/* Get the last channel element in a channel stack */ +grpc_channel_element *grpc_channel_stack_last_element( + grpc_channel_stack *stack); +/* Get a call stack element given a call stack and an index */ +grpc_call_element *grpc_call_stack_element(grpc_call_stack *stack, size_t i); + +/* Determine memory required for a channel stack containing a set of filters */ +size_t grpc_channel_stack_size(const grpc_channel_filter **filters, + size_t filter_count); +/* Initialize a channel stack given some filters */ +void grpc_channel_stack_init(grpc_exec_ctx *exec_ctx, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, + const grpc_channel_filter **filters, + size_t filter_count, const grpc_channel_args *args, + const char *name, grpc_channel_stack *stack); +/* Destroy a channel stack */ +void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *stack); + +/* Initialize a call stack given a channel stack. transport_server_data is + expected to be NULL on a client, or an opaque transport owned pointer on the + server. */ +void grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, int initial_refs, + grpc_iomgr_cb_func destroy, void *destroy_arg, + grpc_call_context_element *context, + const void *transport_server_data, + grpc_call_stack *call_stack); +/* Set a pollset for a call stack: must occur before the first op is started */ +void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_call_stack *call_stack, + grpc_pollset *pollset); + +#ifdef GRPC_STREAM_REFCOUNT_DEBUG +#define GRPC_CALL_STACK_REF(call_stack, reason) \ + grpc_stream_ref(&(call_stack)->refcount, reason) +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ + grpc_stream_unref(exec_ctx, &(call_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ + grpc_stream_ref(&(channel_stack)->refcount, reason) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount, reason) +#else +#define GRPC_CALL_STACK_REF(call_stack, reason) \ + grpc_stream_ref(&(call_stack)->refcount) +#define GRPC_CALL_STACK_UNREF(exec_ctx, call_stack, reason) \ + grpc_stream_unref(exec_ctx, &(call_stack)->refcount) +#define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \ + grpc_stream_ref(&(channel_stack)->refcount) +#define GRPC_CHANNEL_STACK_UNREF(exec_ctx, channel_stack, reason) \ + grpc_stream_unref(exec_ctx, &(channel_stack)->refcount) +#endif + +/* Destroy a call stack */ +void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack); + +/* Ignore set pollset - used by filters to implement the set_pollset method + if they don't care about pollsets at all. Does nothing. */ +void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_pollset *pollset); +/* Call the next operation in a call stack */ +void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op *op); +/* Call the next operation (depending on call directionality) in a channel + stack */ +void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_transport_op *op); +/* Pass through a request to get_peer to the next child element */ +char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + +/* Given the top element of a channel stack, get the channel stack itself */ +grpc_channel_stack *grpc_channel_stack_from_top_element( + grpc_channel_element *elem); +/* Given the top element of a call stack, get the call stack itself */ +grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); + +void grpc_call_log_op(char *file, int line, gpr_log_severity severity, + grpc_call_element *elem, grpc_transport_stream_op *op); + +void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx, + grpc_call_element *cur_elem); + +extern int grpc_trace_channel; + +#define GRPC_CALL_LOG_OP(sev, elem, op) \ + if (grpc_trace_channel) grpc_call_log_op(sev, elem, op) + +#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H */ diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c new file mode 100644 index 0000000000..1ce0c4e07f --- /dev/null +++ b/src/core/lib/channel/channel_stack_builder.c @@ -0,0 +1,258 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/channel_stack_builder.h" + +#include <string.h> + +#include <grpc/support/alloc.h> + +int grpc_trace_channel_stack_builder = 0; + +typedef struct filter_node { + struct filter_node *next; + struct filter_node *prev; + const grpc_channel_filter *filter; + grpc_post_filter_create_init_func init; + void *init_arg; +} filter_node; + +struct grpc_channel_stack_builder { + // sentinel nodes for filters that have been added + filter_node begin; + filter_node end; + // various set/get-able parameters + const grpc_channel_args *args; + grpc_transport *transport; + const char *name; +}; + +struct grpc_channel_stack_builder_iterator { + grpc_channel_stack_builder *builder; + filter_node *node; +}; + +grpc_channel_stack_builder *grpc_channel_stack_builder_create(void) { + grpc_channel_stack_builder *b = gpr_malloc(sizeof(*b)); + memset(b, 0, sizeof(*b)); + + b->begin.filter = NULL; + b->end.filter = NULL; + b->begin.next = &b->end; + b->begin.prev = &b->end; + b->end.next = &b->begin; + b->end.prev = &b->begin; + + return b; +} + +static grpc_channel_stack_builder_iterator *create_iterator_at_filter_node( + grpc_channel_stack_builder *builder, filter_node *node) { + grpc_channel_stack_builder_iterator *it = gpr_malloc(sizeof(*it)); + it->builder = builder; + it->node = node; + return it; +} + +void grpc_channel_stack_builder_iterator_destroy( + grpc_channel_stack_builder_iterator *it) { + gpr_free(it); +} + +grpc_channel_stack_builder_iterator * +grpc_channel_stack_builder_create_iterator_at_first( + grpc_channel_stack_builder *builder) { + return create_iterator_at_filter_node(builder, &builder->begin); +} + +grpc_channel_stack_builder_iterator * +grpc_channel_stack_builder_create_iterator_at_last( + grpc_channel_stack_builder *builder) { + return create_iterator_at_filter_node(builder, &builder->end); +} + +bool grpc_channel_stack_builder_move_next( + grpc_channel_stack_builder_iterator *iterator) { + if (iterator->node == &iterator->builder->end) return false; + iterator->node = iterator->node->next; + return true; +} + +bool grpc_channel_stack_builder_move_prev( + grpc_channel_stack_builder_iterator *iterator) { + if (iterator->node == &iterator->builder->begin) return false; + iterator->node = iterator->node->prev; + return true; +} + +bool grpc_channel_stack_builder_move_prev( + grpc_channel_stack_builder_iterator *iterator); + +void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder, + const char *name) { + GPR_ASSERT(builder->name == NULL); + builder->name = name; +} + +void grpc_channel_stack_builder_set_channel_arguments( + grpc_channel_stack_builder *builder, const grpc_channel_args *args) { + GPR_ASSERT(builder->args == NULL); + builder->args = args; +} + +void grpc_channel_stack_builder_set_transport( + grpc_channel_stack_builder *builder, grpc_transport *transport) { + GPR_ASSERT(builder->transport == NULL); + builder->transport = transport; +} + +grpc_transport *grpc_channel_stack_builder_get_transport( + grpc_channel_stack_builder *builder) { + return builder->transport; +} + +const grpc_channel_args *grpc_channel_stack_builder_get_channel_arguments( + grpc_channel_stack_builder *builder) { + return builder->args; +} + +bool grpc_channel_stack_builder_append_filter( + grpc_channel_stack_builder *builder, const grpc_channel_filter *filter, + grpc_post_filter_create_init_func post_init_func, void *user_data) { + grpc_channel_stack_builder_iterator *it = + grpc_channel_stack_builder_create_iterator_at_last(builder); + bool ok = grpc_channel_stack_builder_add_filter_before( + it, filter, post_init_func, user_data); + grpc_channel_stack_builder_iterator_destroy(it); + return ok; +} + +bool grpc_channel_stack_builder_prepend_filter( + grpc_channel_stack_builder *builder, const grpc_channel_filter *filter, + grpc_post_filter_create_init_func post_init_func, void *user_data) { + grpc_channel_stack_builder_iterator *it = + grpc_channel_stack_builder_create_iterator_at_first(builder); + bool ok = grpc_channel_stack_builder_add_filter_after( + it, filter, post_init_func, user_data); + grpc_channel_stack_builder_iterator_destroy(it); + return ok; +} + +static void add_after(filter_node *before, const grpc_channel_filter *filter, + grpc_post_filter_create_init_func post_init_func, + void *user_data) { + filter_node *new = gpr_malloc(sizeof(*new)); + new->next = before->next; + new->prev = before; + new->next->prev = new->prev->next = new; + new->filter = filter; + new->init = post_init_func; + new->init_arg = user_data; +} + +bool grpc_channel_stack_builder_add_filter_before( + grpc_channel_stack_builder_iterator *iterator, + const grpc_channel_filter *filter, + grpc_post_filter_create_init_func post_init_func, void *user_data) { + if (iterator->node == &iterator->builder->begin) return false; + add_after(iterator->node->prev, filter, post_init_func, user_data); + return true; +} + +bool grpc_channel_stack_builder_add_filter_after( + grpc_channel_stack_builder_iterator *iterator, + const grpc_channel_filter *filter, + grpc_post_filter_create_init_func post_init_func, void *user_data) { + if (iterator->node == &iterator->builder->end) return false; + add_after(iterator->node, filter, post_init_func, user_data); + return true; +} + +void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder) { + filter_node *p = builder->begin.next; + while (p != &builder->end) { + filter_node *next = p->next; + gpr_free(p); + p = next; + } + gpr_free(builder); +} + +void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, + size_t prefix_bytes, int initial_refs, + grpc_iomgr_cb_func destroy, + void *destroy_arg) { + // count the number of filters + size_t num_filters = 0; + for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) { + num_filters++; + } + + // create an array of filters + const grpc_channel_filter **filters = + gpr_malloc(sizeof(*filters) * num_filters); + size_t i = 0; + for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) { + filters[i++] = p->filter; + } + + // calculate the size of the channel stack + size_t channel_stack_size = grpc_channel_stack_size(filters, num_filters); + + // allocate memory, with prefix_bytes followed by channel_stack_size + char *result = gpr_malloc(prefix_bytes + channel_stack_size); + // fetch a pointer to the channel stack + grpc_channel_stack *channel_stack = + (grpc_channel_stack *)(result + prefix_bytes); + // and initialize it + grpc_channel_stack_init(exec_ctx, initial_refs, destroy, + destroy_arg == NULL ? result : destroy_arg, filters, + num_filters, builder->args, builder->name, + channel_stack); + + // run post-initialization functions + i = 0; + for (filter_node *p = builder->begin.next; p != &builder->end; p = p->next) { + if (p->init != NULL) { + p->init(channel_stack, grpc_channel_stack_element(channel_stack, i), + p->init_arg); + } + i++; + } + + grpc_channel_stack_builder_destroy(builder); + gpr_free((grpc_channel_filter **)filters); + + return result; +} diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h new file mode 100644 index 0000000000..8532c4462a --- /dev/null +++ b/src/core/lib/channel/channel_stack_builder.h @@ -0,0 +1,155 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H +#define GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H + +#include <stdbool.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" + +/// grpc_channel_stack_builder offers a programmatic interface to selected +/// and order channel filters +typedef struct grpc_channel_stack_builder grpc_channel_stack_builder; +typedef struct grpc_channel_stack_builder_iterator + grpc_channel_stack_builder_iterator; + +/// Create a new channel stack builder +grpc_channel_stack_builder *grpc_channel_stack_builder_create(void); + +/// Assign a name to the channel stack: \a name must be statically allocated +void grpc_channel_stack_builder_set_name(grpc_channel_stack_builder *builder, + const char *name); + +/// Attach \a transport to the builder (does not take ownership) +void grpc_channel_stack_builder_set_transport( + grpc_channel_stack_builder *builder, grpc_transport *transport); + +/// Fetch attached transport +grpc_transport *grpc_channel_stack_builder_get_transport( + grpc_channel_stack_builder *builder); + +/// Set channel arguments: \a args must continue to exist until after +/// grpc_channel_stack_builder_finish returns +void grpc_channel_stack_builder_set_channel_arguments( + grpc_channel_stack_builder *builder, const grpc_channel_args *args); + +/// Return a borrowed pointer to the channel arguments +const grpc_channel_args *grpc_channel_stack_builder_get_channel_arguments( + grpc_channel_stack_builder *builder); + +/// Begin iterating over already defined filters in the builder at the beginning +grpc_channel_stack_builder_iterator * +grpc_channel_stack_builder_create_iterator_at_first( + grpc_channel_stack_builder *builder); + +/// Begin iterating over already defined filters in the builder at the end +grpc_channel_stack_builder_iterator * +grpc_channel_stack_builder_create_iterator_at_last( + grpc_channel_stack_builder *builder); + +/// Is an iterator at the first element? +bool grpc_channel_stack_builder_iterator_is_first( + grpc_channel_stack_builder_iterator *iterator); + +/// Is an iterator at the end? +bool grpc_channel_stack_builder_iterator_is_end( + grpc_channel_stack_builder_iterator *iterator); + +/// Move an iterator to the next item +bool grpc_channel_stack_builder_move_next( + grpc_channel_stack_builder_iterator *iterator); + +/// Move an iterator to the previous item +bool grpc_channel_stack_builder_move_prev( + grpc_channel_stack_builder_iterator *iterator); + +typedef void (*grpc_post_filter_create_init_func)( + grpc_channel_stack *channel_stack, grpc_channel_element *elem, void *arg); + +/// Add \a filter to the stack, after \a iterator. +/// Call \a post_init_func(..., \a user_data) once the channel stack is +/// created. +bool grpc_channel_stack_builder_add_filter_after( + grpc_channel_stack_builder_iterator *iterator, + const grpc_channel_filter *filter, + grpc_post_filter_create_init_func post_init_func, + void *user_data) GRPC_MUST_USE_RESULT; + +/// Add \a filter to the stack, before \a iterator. +/// Call \a post_init_func(..., \a user_data) once the channel stack is +/// created. +bool grpc_channel_stack_builder_add_filter_before( + grpc_channel_stack_builder_iterator *iterator, + const grpc_channel_filter *filter, + grpc_post_filter_create_init_func post_init_func, + void *user_data) GRPC_MUST_USE_RESULT; + +/// Add \a filter to the beginning of the filter list. +/// Call \a post_init_func(..., \a user_data) once the channel stack is +/// created. +bool grpc_channel_stack_builder_prepend_filter( + grpc_channel_stack_builder *builder, const grpc_channel_filter *filter, + grpc_post_filter_create_init_func post_init_func, + void *user_data) GRPC_MUST_USE_RESULT; + +/// Add \a filter to the end of the filter list. +/// Call \a post_init_func(..., \a user_data) once the channel stack is +/// created. +bool grpc_channel_stack_builder_append_filter( + grpc_channel_stack_builder *builder, const grpc_channel_filter *filter, + grpc_post_filter_create_init_func post_init_func, + void *user_data) GRPC_MUST_USE_RESULT; + +/// Terminate iteration and destroy \a iterator +void grpc_channel_stack_builder_iterator_destroy( + grpc_channel_stack_builder_iterator *iterator); + +/// Destroy the builder, return the freshly minted channel stack +/// Allocates \a prefix_bytes bytes before the channel stack +/// Returns the base pointer of the allocated block +/// \a initial_refs, \a destroy, \a destroy_arg are as per +/// grpc_channel_stack_init +void *grpc_channel_stack_builder_finish(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, + size_t prefix_bytes, int initial_refs, + grpc_iomgr_cb_func destroy, + void *destroy_arg); + +/// Destroy the builder without creating a channel stack +void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder); + +extern int grpc_trace_channel_stack_builder; + +#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H */ diff --git a/src/core/lib/channel/client_channel.c b/src/core/lib/channel/client_channel.c new file mode 100644 index 0000000000..58eeb9f745 --- /dev/null +++ b/src/core/lib/channel/client_channel.c @@ -0,0 +1,548 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/client_channel.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/channel/subchannel_call_holder.h" +#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/transport/connectivity_state.h" + +/* Client channel implementation */ + +typedef grpc_subchannel_call_holder call_data; + +typedef struct client_channel_channel_data { + /** resolver for this channel */ + grpc_resolver *resolver; + /** have we started resolving this channel */ + int started_resolving; + + /** mutex protecting client configuration, including all + variables below in this data structure */ + gpr_mu mu_config; + /** currently active load balancer - guarded by mu_config */ + grpc_lb_policy *lb_policy; + /** incoming configuration - set by resolver.next + guarded by mu_config */ + grpc_client_config *incoming_configuration; + /** a list of closures that are all waiting for config to come in */ + grpc_closure_list waiting_for_config_closures; + /** resolver callback */ + grpc_closure on_config_changed; + /** connectivity state being tracked */ + grpc_connectivity_state_tracker state_tracker; + /** when an lb_policy arrives, should we try to exit idle */ + int exit_idle_when_lb_policy_arrives; + /** owning stack */ + grpc_channel_stack *owning_stack; + /** interested parties (owned) */ + grpc_pollset_set *interested_parties; +} channel_data; + +/** We create one watcher for each new lb_policy that is returned from a + resolver, + to watch for state changes from the lb_policy. When a state change is seen, + we + update the channel, and create a new watcher */ +typedef struct { + channel_data *chand; + grpc_closure on_changed; + grpc_connectivity_state state; + grpc_lb_policy *lb_policy; +} lb_policy_connectivity_watcher; + +typedef struct { + grpc_closure closure; + grpc_call_element *elem; +} waiting_call; + +static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); +} + +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op); +} + +static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state); + +static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, + channel_data *chand, + grpc_connectivity_state state, + const char *reason) { + if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE || + state == GRPC_CHANNEL_FATAL_FAILURE) && + chand->lb_policy != NULL) { + /* cancel fail-fast picks */ + grpc_lb_policy_cancel_picks( + exec_ctx, chand->lb_policy, + /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, + /* check= */ 0); + } + grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, reason); +} + +static void on_lb_policy_state_changed_locked( + grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) { + grpc_connectivity_state publish_state = w->state; + /* check if the notification is for a stale policy */ + if (w->lb_policy != w->chand->lb_policy) return; + + if (publish_state == GRPC_CHANNEL_FATAL_FAILURE && + w->chand->resolver != NULL) { + publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); + GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); + w->chand->lb_policy = NULL; + } + set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, + "lb_changed"); + if (w->state != GRPC_CHANNEL_FATAL_FAILURE) { + watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + } +} + +static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, + bool iomgr_success) { + lb_policy_connectivity_watcher *w = arg; + + gpr_mu_lock(&w->chand->mu_config); + on_lb_policy_state_changed_locked(exec_ctx, w); + gpr_mu_unlock(&w->chand->mu_config); + + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); + gpr_free(w); +} + +static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state) { + lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); + + w->chand = chand; + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w); + w->state = current_state; + w->lb_policy = lb_policy; + grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, + &w->on_changed); +} + +static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, + bool iomgr_success) { + channel_data *chand = arg; + grpc_lb_policy *lb_policy = NULL; + grpc_lb_policy *old_lb_policy; + grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; + int exit_idle = 0; + + if (chand->incoming_configuration != NULL) { + lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration); + if (lb_policy != NULL) { + GRPC_LB_POLICY_REF(lb_policy, "channel"); + GRPC_LB_POLICY_REF(lb_policy, "config_change"); + state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy); + } + + grpc_client_config_unref(exec_ctx, chand->incoming_configuration); + } + + chand->incoming_configuration = NULL; + + if (lb_policy != NULL) { + grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, + chand->interested_parties); + } + + gpr_mu_lock(&chand->mu_config); + old_lb_policy = chand->lb_policy; + chand->lb_policy = lb_policy; + if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, + NULL); + } + if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { + GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); + exit_idle = 1; + chand->exit_idle_when_lb_policy_arrives = 0; + } + + if (iomgr_success && chand->resolver) { + set_channel_connectivity_state_locked(exec_ctx, chand, state, + "new_lb+resolver"); + if (lb_policy != NULL) { + watch_lb_policy(exec_ctx, chand, lb_policy, state); + } + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, + &chand->on_config_changed); + gpr_mu_unlock(&chand->mu_config); + } else { + if (chand->resolver != NULL) { + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + chand->resolver = NULL; + } + set_channel_connectivity_state_locked( + exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); + gpr_mu_unlock(&chand->mu_config); + } + + if (exit_idle) { + grpc_lb_policy_exit_idle(exec_ctx, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); + } + + if (old_lb_policy != NULL) { + grpc_pollset_set_del_pollset_set( + exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); + GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); + } + + if (lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); + } + + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver"); +} + +static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + channel_data *chand = elem->channel_data; + + grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); + + GPR_ASSERT(op->set_accept_stream == false); + if (op->bind_pollset != NULL) { + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, + op->bind_pollset); + } + + gpr_mu_lock(&chand->mu_config); + if (op->on_connectivity_state_change != NULL) { + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &chand->state_tracker, op->connectivity_state, + op->on_connectivity_state_change); + op->on_connectivity_state_change = NULL; + op->connectivity_state = NULL; + } + + if (op->send_ping != NULL) { + if (chand->lb_policy == NULL) { + grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL); + } else { + grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); + op->bind_pollset = NULL; + } + op->send_ping = NULL; + } + + if (op->disconnect && chand->resolver != NULL) { + set_channel_connectivity_state_locked( + exec_ctx, chand, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + chand->resolver = NULL; + if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, + chand->lb_policy->interested_parties, + chand->interested_parties); + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + chand->lb_policy = NULL; + } + } + gpr_mu_unlock(&chand->mu_config); +} + +typedef struct { + grpc_metadata_batch *initial_metadata; + uint32_t initial_metadata_flags; + grpc_connected_subchannel **connected_subchannel; + grpc_closure *on_ready; + grpc_call_element *elem; + grpc_closure closure; +} continue_picking_args; + +static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, + grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, + grpc_closure *on_ready); + +static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + continue_picking_args *cpa = arg; + if (!success) { + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); + } else if (cpa->connected_subchannel == NULL) { + /* cancelled, do nothing */ + } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, + cpa->connected_subchannel, cpa->on_ready)) { + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL); + } + gpr_free(cpa); +} + +static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, + grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, + grpc_closure *on_ready) { + grpc_call_element *elem = elemp; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + continue_picking_args *cpa; + grpc_closure *closure; + + GPR_ASSERT(connected_subchannel); + + gpr_mu_lock(&chand->mu_config); + if (initial_metadata == NULL) { + if (chand->lb_policy != NULL) { + grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, + connected_subchannel); + } + for (closure = chand->waiting_for_config_closures.head; closure != NULL; + closure = grpc_closure_next(closure)) { + cpa = closure->cb_arg; + if (cpa->connected_subchannel == connected_subchannel) { + cpa->connected_subchannel = NULL; + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); + } + } + gpr_mu_unlock(&chand->mu_config); + return 1; + } + if (chand->lb_policy != NULL) { + grpc_lb_policy *lb_policy = chand->lb_policy; + int r; + GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel"); + gpr_mu_unlock(&chand->mu_config); + r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset, + initial_metadata, initial_metadata_flags, + connected_subchannel, on_ready); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel"); + return r; + } + if (chand->resolver != NULL && !chand->started_resolving) { + chand->started_resolving = 1; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, + &chand->on_config_changed); + } + cpa = gpr_malloc(sizeof(*cpa)); + cpa->initial_metadata = initial_metadata; + cpa->initial_metadata_flags = initial_metadata_flags; + cpa->connected_subchannel = connected_subchannel; + cpa->on_ready = on_ready; + cpa->elem = elem; + grpc_closure_init(&cpa->closure, continue_picking, cpa); + grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1); + gpr_mu_unlock(&chand->mu_config); + return 0; +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_call_element_args *args) { + grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem, + args->call_stack); +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + channel_data *chand = elem->channel_data; + + memset(chand, 0, sizeof(*chand)); + + GPR_ASSERT(args->is_last); + GPR_ASSERT(elem->filter == &grpc_client_channel_filter); + + gpr_mu_init(&chand->mu_config); + grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); + chand->owning_stack = args->channel_stack; + + grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, + "client_channel"); + chand->interested_parties = grpc_pollset_set_create(); +} + +/* Destructor for channel_data */ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + + if (chand->resolver != NULL) { + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + } + if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, + chand->lb_policy->interested_parties, + chand->interested_parties); + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); + } + grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); + grpc_pollset_set_destroy(chand->interested_parties); + gpr_mu_destroy(&chand->mu_config); +} + +static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_pollset *pollset) { + call_data *calld = elem->call_data; + calld->pollset = pollset; +} + +const grpc_channel_filter grpc_client_channel_filter = { + cc_start_transport_stream_op, + cc_start_transport_op, + sizeof(call_data), + init_call_elem, + cc_set_pollset, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + cc_get_peer, + "client-channel", +}; + +void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + grpc_resolver *resolver) { + /* post construction initialization: set the transport setup pointer */ + grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->mu_config); + GPR_ASSERT(!chand->resolver); + chand->resolver = resolver; + GRPC_RESOLVER_REF(resolver, "channel"); + if (!grpc_closure_list_empty(chand->waiting_for_config_closures) || + chand->exit_idle_when_lb_policy_arrives) { + chand->started_resolving = 1; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, + &chand->on_config_changed); + } + gpr_mu_unlock(&chand->mu_config); +} + +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { + channel_data *chand = elem->channel_data; + grpc_connectivity_state out; + gpr_mu_lock(&chand->mu_config); + out = grpc_connectivity_state_check(&chand->state_tracker); + if (out == GRPC_CHANNEL_IDLE && try_to_connect) { + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = 1; + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + chand->started_resolving = 1; + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, + &chand->on_config_changed); + } + } + } + gpr_mu_unlock(&chand->mu_config); + return out; +} + +typedef struct { + channel_data *chand; + grpc_pollset *pollset; + grpc_closure *on_complete; + grpc_closure my_closure; +} external_connectivity_watcher; + +static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, + bool iomgr_success) { + external_connectivity_watcher *w = arg; + grpc_closure *follow_up = w->on_complete; + grpc_pollset_set_del_pollset(exec_ctx, w->chand->interested_parties, + w->pollset); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, + "external_connectivity_watcher"); + gpr_free(w); + follow_up->cb(exec_ctx, follow_up->cb_arg, iomgr_success); +} + +void grpc_client_channel_watch_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, + grpc_connectivity_state *state, grpc_closure *on_complete) { + channel_data *chand = elem->channel_data; + external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); + w->chand = chand; + w->pollset = pollset; + w->on_complete = on_complete; + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); + grpc_closure_init(&w->my_closure, on_external_watch_complete, w); + GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, + "external_connectivity_watcher"); + gpr_mu_lock(&chand->mu_config); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &chand->state_tracker, state, &w->my_closure); + gpr_mu_unlock(&chand->mu_config); +} diff --git a/src/core/lib/channel/client_channel.h b/src/core/lib/channel/client_channel.h new file mode 100644 index 0000000000..8777796fb6 --- /dev/null +++ b/src/core/lib/channel/client_channel.h @@ -0,0 +1,63 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H +#define GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H + +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/client_config/resolver.h" + +/* A client channel is a channel that begins disconnected, and can connect + to some endpoint on demand. If that endpoint disconnects, it will be + connected to again later. + + Calls on a disconnected client channel are queued until a connection is + established. */ + +extern const grpc_channel_filter grpc_client_channel_filter; + +/* post-construction initializer to let the client channel know which + transport setup it should cancel upon destruction, or initiate when it needs + a connection */ +void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + grpc_resolver *resolver); + +grpc_connectivity_state grpc_client_channel_check_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect); + +void grpc_client_channel_watch_connectivity_state( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, + grpc_connectivity_state *state, grpc_closure *on_complete); + +#endif /* GRPC_CORE_LIB_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c new file mode 100644 index 0000000000..04bb7cc76f --- /dev/null +++ b/src/core/lib/channel/compress_filter.c @@ -0,0 +1,304 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <assert.h> +#include <string.h> + +#include <grpc/compression.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/compress_filter.h" +#include "src/core/lib/compression/algorithm_metadata.h" +#include "src/core/lib/compression/message_compress.h" +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/transport/static_metadata.h" + +typedef struct call_data { + gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */ + grpc_linked_mdelem compression_algorithm_storage; + grpc_linked_mdelem accept_encoding_storage; + uint32_t remaining_slice_bytes; + /** Compression algorithm we'll try to use. It may be given by incoming + * metadata, or by the channel's default compression settings. */ + grpc_compression_algorithm compression_algorithm; + /** If true, contents of \a compression_algorithm are authoritative */ + int has_compression_algorithm; + + grpc_transport_stream_op send_op; + uint32_t send_length; + uint32_t send_flags; + gpr_slice incoming_slice; + grpc_slice_buffer_stream replacement_stream; + grpc_closure *post_send; + grpc_closure send_done; + grpc_closure got_slice; +} call_data; + +typedef struct channel_data { + /** The default, channel-level, compression algorithm */ + grpc_compression_algorithm default_compression_algorithm; + /** Compression options for the channel */ + grpc_compression_options compression_options; + /** Supported compression algorithms */ + uint32_t supported_compression_algorithms; +} channel_data; + +/** For each \a md element from the incoming metadata, filter out the entry for + * "grpc-encoding", using its value to populate the call data's + * compression_algorithm field. */ +static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + + if (md->key == GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST) { + const char *md_c_str = grpc_mdstr_as_c_string(md->value); + if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str), + &calld->compression_algorithm)) { + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (unknown). Ignoring.", + md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, calld->compression_algorithm) == + 0) { + gpr_log(GPR_ERROR, + "Invalid compression algorithm: '%s' (previously disabled). " + "Ignoring.", + md_c_str); + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } + calld->has_compression_algorithm = 1; + return NULL; + } + + return md; +} + +static int skip_compression(grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + if (calld->has_compression_algorithm) { + if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { + return 1; + } + return 0; /* we have an actual call-specific algorithm */ + } + /* no per-call compression override */ + return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE; +} + +/** Filter initial metadata */ +static void process_send_initial_metadata( + grpc_call_element *elem, grpc_metadata_batch *initial_metadata) { + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + /* Parse incoming request for compression. If any, it'll be available + * at calld->compression_algorithm */ + grpc_metadata_batch_filter(initial_metadata, compression_md_filter, elem); + if (!calld->has_compression_algorithm) { + /* If no algorithm was found in the metadata and we aren't + * exceptionally skipping compression, fall back to the channel + * default */ + calld->compression_algorithm = channeld->default_compression_algorithm; + calld->has_compression_algorithm = 1; /* GPR_TRUE */ + } + /* hint compression algorithm */ + grpc_metadata_batch_add_tail( + initial_metadata, &calld->compression_algorithm_storage, + grpc_compression_encoding_mdelem(calld->compression_algorithm)); + + /* convey supported compression algorithms */ + grpc_metadata_batch_add_tail(initial_metadata, + &calld->accept_encoding_storage, + GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( + channeld->supported_compression_algorithms)); +} + +static void continue_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem); + +static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, bool success) { + grpc_call_element *elem = elemp; + call_data *calld = elem->call_data; + gpr_slice_buffer_reset_and_unref(&calld->slices); + calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, success); +} + +static void finish_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = elem->call_data; + int did_compress; + gpr_slice_buffer tmp; + gpr_slice_buffer_init(&tmp); + did_compress = + grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp); + if (did_compress) { + gpr_slice_buffer_swap(&calld->slices, &tmp); + calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } + gpr_slice_buffer_destroy(&tmp); + + grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, + calld->send_flags); + calld->send_op.send_message = &calld->replacement_stream.base; + calld->post_send = calld->send_op.on_complete; + calld->send_op.on_complete = &calld->send_done; + + grpc_call_next_op(exec_ctx, elem, &calld->send_op); +} + +static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, bool success) { + grpc_call_element *elem = elemp; + call_data *calld = elem->call_data; + gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + if (calld->send_length == calld->slices.length) { + finish_send_message(exec_ctx, elem); + } else { + continue_send_message(exec_ctx, elem); + } +} + +static void continue_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = elem->call_data; + while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message, + &calld->incoming_slice, ~(size_t)0, + &calld->got_slice)) { + gpr_slice_buffer_add(&calld->slices, calld->incoming_slice); + if (calld->send_length == calld->slices.length) { + finish_send_message(exec_ctx, elem); + break; + } + } +} + +static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + + GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0); + + if (op->send_initial_metadata) { + process_send_initial_metadata(elem, op->send_initial_metadata); + } + if (op->send_message != NULL && !skip_compression(elem) && + 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) { + calld->send_op = *op; + calld->send_length = op->send_message->length; + calld->send_flags = op->send_message->flags; + continue_send_message(exec_ctx, elem); + } else { + /* pass control down the stack */ + grpc_call_next_op(exec_ctx, elem, op); + } + + GPR_TIMER_END("compress_start_transport_stream_op", 0); +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_call_element_args *args) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + + /* initialize members */ + gpr_slice_buffer_init(&calld->slices); + calld->has_compression_algorithm = 0; + grpc_closure_init(&calld->got_slice, got_slice, elem); + grpc_closure_init(&calld->send_done, send_done, elem); +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + gpr_slice_buffer_destroy(&calld->slices); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + channel_data *channeld = elem->channel_data; + grpc_compression_algorithm algo_idx; + + grpc_compression_options_init(&channeld->compression_options); + channeld->compression_options.enabled_algorithms_bitset = + (uint32_t)grpc_channel_args_compression_algorithm_get_states( + args->channel_args); + + channeld->default_compression_algorithm = + grpc_channel_args_get_compression_algorithm(args->channel_args); + /* Make sure the default isn't disabled. */ + GPR_ASSERT(grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, channeld->default_compression_algorithm)); + channeld->compression_options.default_compression_algorithm = + channeld->default_compression_algorithm; + + channeld->supported_compression_algorithms = 0; + for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { + /* skip disabled algorithms */ + if (grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, algo_idx) == 0) { + continue; + } + channeld->supported_compression_algorithms |= 1u << algo_idx; + } + + GPR_ASSERT(!args->is_last); +} + +/* Destructor for channel data */ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) {} + +const grpc_channel_filter grpc_compress_filter = { + compress_start_transport_stream_op, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "compress"}; diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h new file mode 100644 index 0000000000..9010074335 --- /dev/null +++ b/src/core/lib/channel/compress_filter.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H +#define GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request" + +/** Compression filter for outgoing data. + * + * See <grpc/compression.h> for the available compression settings. + * + * Compression settings may come from: + * - Channel configuration, as established at channel creation time. + * - The metadata accompanying the outgoing data to be compressed. This is + * taken as a request only. We may choose not to honor it. The metadata key + * is given by \a GRPC_COMPRESS_REQUEST_ALGORITHM_KEY. + * + * Compression can be disabled for concrete messages (for instance in order to + * prevent CRIME/BEAST type attacks) by having the GRPC_WRITE_NO_COMPRESS set in + * the BEGIN_MESSAGE flags. + * + * The attempted compression mechanism is added to the resulting initial + * metadata under the'grpc-encoding' key. + * + * If compression is actually performed, BEGIN_MESSAGE's flag is modified to + * incorporate GRPC_WRITE_INTERNAL_COMPRESS. Otherwise, and regardless of the + * aforementioned 'grpc-encoding' metadata value, data will pass through + * uncompressed. */ + +extern const grpc_channel_filter grpc_compress_filter; + +#endif /* GRPC_CORE_LIB_CHANNEL_COMPRESS_FILTER_H */ diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c new file mode 100644 index 0000000000..5e3a8974ce --- /dev/null +++ b/src/core/lib/channel/connected_channel.c @@ -0,0 +1,176 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/connected_channel.h" + +#include <stdarg.h> +#include <stdio.h> +#include <string.h> + +#include <grpc/byte_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/transport/transport.h" + +#define MAX_BUFFER_LENGTH 8192 + +typedef struct connected_channel_channel_data { + grpc_transport *transport; +} channel_data; + +typedef struct connected_channel_call_data { void *unused; } call_data; + +/* We perform a small hack to locate transport data alongside the connected + channel data in call allocations, to allow everything to be pulled in minimal + cache line requests */ +#define TRANSPORT_STREAM_FROM_CALL_DATA(calld) ((grpc_stream *)((calld) + 1)) +#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \ + (((call_data *)(transport_stream)) - 1) + +/* Intercept a call operation and either push it directly up or translate it + into transport stream operations */ +static void con_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + + grpc_transport_perform_stream_op(exec_ctx, chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); +} + +static void con_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + channel_data *chand = elem->channel_data; + grpc_transport_perform_op(exec_ctx, chand->transport, op); +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_call_element_args *args) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + int r; + + r = grpc_transport_init_stream( + exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), + &args->call_stack->refcount, args->server_transport_data); + GPR_ASSERT(r == 0); +} + +static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_pollset *pollset) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_transport_set_pollset(exec_ctx, chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollset); +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + grpc_transport_destroy_stream(exec_ctx, chand->transport, + TRANSPORT_STREAM_FROM_CALL_DATA(calld)); +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + channel_data *cd = (channel_data *)elem->channel_data; + GPR_ASSERT(args->is_last); + cd->transport = NULL; +} + +/* Destructor for channel_data */ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { + channel_data *cd = (channel_data *)elem->channel_data; + grpc_transport_destroy(exec_ctx, cd->transport); +} + +static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + return grpc_transport_get_peer(exec_ctx, chand->transport); +} + +static const grpc_channel_filter connected_channel_filter = { + con_start_transport_stream_op, + con_start_transport_op, + sizeof(call_data), + init_call_elem, + set_pollset, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + con_get_peer, + "connected", +}; + +static void bind_transport(grpc_channel_stack *channel_stack, + grpc_channel_element *elem, void *t) { + channel_data *cd = (channel_data *)elem->channel_data; + GPR_ASSERT(elem->filter == &connected_channel_filter); + GPR_ASSERT(cd->transport == NULL); + cd->transport = t; + + /* HACK(ctiller): increase call stack size for the channel to make space + for channel data. We need a cleaner (but performant) way to do this, + and I'm not sure what that is yet. + This is only "safe" because call stacks place no additional data after + the last call element, and the last call element MUST be the connected + channel. */ + channel_stack->call_stack_size += grpc_transport_stream_size(t); +} + +bool grpc_add_connected_filter(grpc_channel_stack_builder *builder, + void *arg_must_be_null) { + GPR_ASSERT(arg_must_be_null == NULL); + grpc_transport *t = grpc_channel_stack_builder_get_transport(builder); + GPR_ASSERT(t != NULL); + return grpc_channel_stack_builder_append_filter( + builder, &connected_channel_filter, bind_transport, t); +} + +grpc_stream *grpc_connected_channel_get_stream(grpc_call_element *elem) { + call_data *calld = elem->call_data; + return TRANSPORT_STREAM_FROM_CALL_DATA(calld); +} diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h new file mode 100644 index 0000000000..4f20b751cc --- /dev/null +++ b/src/core/lib/channel/connected_channel.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H +#define GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H + +#include "src/core/lib/channel/channel_stack_builder.h" + +bool grpc_add_connected_filter(grpc_channel_stack_builder *builder, + void *arg_must_be_null); + +#endif /* GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h new file mode 100644 index 0000000000..bca102da9a --- /dev/null +++ b/src/core/lib/channel/context.h @@ -0,0 +1,49 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_CONTEXT_H +#define GRPC_CORE_LIB_CHANNEL_CONTEXT_H + +/* Call object context pointers */ +typedef enum { + GRPC_CONTEXT_SECURITY = 0, + GRPC_CONTEXT_TRACING, + GRPC_CONTEXT_COUNT +} grpc_context_index; + +typedef struct { + void *value; + void (*destroy)(void *); +} grpc_call_context_element; + +#endif /* GRPC_CORE_LIB_CHANNEL_CONTEXT_H */ diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c new file mode 100644 index 0000000000..129bddcbab --- /dev/null +++ b/src/core/lib/channel/http_client_filter.c @@ -0,0 +1,259 @@ +/* + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/http_client_filter.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <string.h> +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/transport/static_metadata.h" + +typedef struct call_data { + grpc_linked_mdelem method; + grpc_linked_mdelem scheme; + grpc_linked_mdelem authority; + grpc_linked_mdelem te_trailers; + grpc_linked_mdelem content_type; + grpc_linked_mdelem user_agent; + + grpc_metadata_batch *recv_initial_metadata; + + /** Closure to call when finished with the hc_on_recv hook */ + grpc_closure *on_done_recv; + /** Receive closures are chained: we inject this closure as the on_done_recv + up-call on transport_op, and remember to call our on_done_recv member + after handling it. */ + grpc_closure hc_on_recv; +} call_data; + +typedef struct channel_data { + grpc_mdelem *static_scheme; + grpc_mdelem *user_agent; +} channel_data; + +typedef struct { + grpc_call_element *elem; + grpc_exec_ctx *exec_ctx; +} client_recv_filter_args; + +static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { + client_recv_filter_args *a = user_data; + if (md == GRPC_MDELEM_STATUS_200) { + return NULL; + } else if (md->key == GRPC_MDSTR_STATUS) { + grpc_call_element_send_cancel(a->exec_ctx, a->elem); + return NULL; + } else if (md->key == GRPC_MDSTR_CONTENT_TYPE) { + return NULL; + } + return md; +} + +static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + client_recv_filter_args a; + a.elem = elem; + a.exec_ctx = exec_ctx; + grpc_metadata_batch_filter(calld->recv_initial_metadata, client_recv_filter, + &a); + calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); +} + +static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { + /* eat the things we'd like to set ourselves */ + if (md->key == GRPC_MDSTR_METHOD) return NULL; + if (md->key == GRPC_MDSTR_SCHEME) return NULL; + if (md->key == GRPC_MDSTR_TE) return NULL; + if (md->key == GRPC_MDSTR_CONTENT_TYPE) return NULL; + if (md->key == GRPC_MDSTR_USER_AGENT) return NULL; + return md; +} + +static void hc_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + channel_data *channeld = elem->channel_data; + if (op->send_initial_metadata != NULL) { + grpc_metadata_batch_filter(op->send_initial_metadata, client_strip_filter, + elem); + /* Send : prefixed headers, which have to be before any application + layer headers. */ + grpc_metadata_batch_add_head( + op->send_initial_metadata, &calld->method, + op->send_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST + ? GRPC_MDELEM_METHOD_PUT + : GRPC_MDELEM_METHOD_POST); + grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->scheme, + channeld->static_scheme); + grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->te_trailers, + GRPC_MDELEM_TE_TRAILERS); + grpc_metadata_batch_add_tail( + op->send_initial_metadata, &calld->content_type, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); + grpc_metadata_batch_add_tail(op->send_initial_metadata, &calld->user_agent, + GRPC_MDELEM_REF(channeld->user_agent)); + } + + if (op->recv_initial_metadata != NULL) { + /* substitute our callback for the higher callback */ + calld->recv_initial_metadata = op->recv_initial_metadata; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->hc_on_recv; + } +} + +static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + GPR_TIMER_BEGIN("hc_start_transport_op", 0); + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + hc_mutate_op(elem, op); + GPR_TIMER_END("hc_start_transport_op", 0); + grpc_call_next_op(exec_ctx, elem, op); +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_call_element_args *args) { + call_data *calld = elem->call_data; + calld->on_done_recv = NULL; + grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) {} + +static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) { + unsigned i; + size_t j; + grpc_mdelem *valid_schemes[] = {GRPC_MDELEM_SCHEME_HTTP, + GRPC_MDELEM_SCHEME_HTTPS}; + if (args != NULL) { + for (i = 0; i < args->num_args; ++i) { + if (args->args[i].type == GRPC_ARG_STRING && + strcmp(args->args[i].key, GRPC_ARG_HTTP2_SCHEME) == 0) { + for (j = 0; j < GPR_ARRAY_SIZE(valid_schemes); j++) { + if (0 == strcmp(grpc_mdstr_as_c_string(valid_schemes[j]->value), + args->args[i].value.string)) { + return valid_schemes[j]; + } + } + } + } + } + return GRPC_MDELEM_SCHEME_HTTP; +} + +static grpc_mdstr *user_agent_from_args(const grpc_channel_args *args) { + gpr_strvec v; + size_t i; + int is_first = 1; + char *tmp; + grpc_mdstr *result; + + gpr_strvec_init(&v); + + for (i = 0; args && i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_PRIMARY_USER_AGENT_STRING)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", + GRPC_ARG_PRIMARY_USER_AGENT_STRING); + } else { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); + is_first = 0; + gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); + } + } + } + + gpr_asprintf(&tmp, "%sgrpc-c/%s (%s)", is_first ? "" : " ", + grpc_version_string(), GPR_PLATFORM_STRING); + is_first = 0; + gpr_strvec_add(&v, tmp); + + for (i = 0; args && i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_SECONDARY_USER_AGENT_STRING)) { + if (args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "Channel argument '%s' should be a string", + GRPC_ARG_SECONDARY_USER_AGENT_STRING); + } else { + if (!is_first) gpr_strvec_add(&v, gpr_strdup(" ")); + is_first = 0; + gpr_strvec_add(&v, gpr_strdup(args->args[i].value.string)); + } + } + } + + tmp = gpr_strvec_flatten(&v, NULL); + gpr_strvec_destroy(&v); + result = grpc_mdstr_from_string(tmp); + gpr_free(tmp); + + return result; +} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + channel_data *chand = elem->channel_data; + GPR_ASSERT(!args->is_last); + chand->static_scheme = scheme_from_args(args->channel_args); + chand->user_agent = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_USER_AGENT, user_agent_from_args(args->channel_args)); +} + +/* Destructor for channel data */ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { + channel_data *chand = elem->channel_data; + GRPC_MDELEM_UNREF(chand->user_agent); +} + +const grpc_channel_filter grpc_http_client_filter = { + hc_start_transport_op, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "http-client"}; diff --git a/src/core/lib/channel/http_client_filter.h b/src/core/lib/channel/http_client_filter.h new file mode 100644 index 0000000000..418426e9cc --- /dev/null +++ b/src/core/lib/channel/http_client_filter.h @@ -0,0 +1,44 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H +#define GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +/* Processes metadata on the client side for HTTP2 transports */ +extern const grpc_channel_filter grpc_http_client_filter; + +#define GRPC_ARG_HTTP2_SCHEME "grpc.http2_scheme" + +#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_CLIENT_FILTER_H */ diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c new file mode 100644 index 0000000000..db1a3d5010 --- /dev/null +++ b/src/core/lib/channel/http_server_filter.c @@ -0,0 +1,248 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/http_server_filter.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <string.h> +#include "src/core/lib/profiling/timers.h" +#include "src/core/lib/transport/static_metadata.h" + +typedef struct call_data { + uint8_t seen_path; + uint8_t seen_method; + uint8_t sent_status; + uint8_t seen_scheme; + uint8_t seen_te_trailers; + uint8_t seen_authority; + grpc_linked_mdelem status; + grpc_linked_mdelem content_type; + + grpc_metadata_batch *recv_initial_metadata; + bool *recv_idempotent_request; + /** Closure to call when finished with the hs_on_recv hook */ + grpc_closure *on_done_recv; + /** Receive closures are chained: we inject this closure as the on_done_recv + up-call on transport_op, and remember to call our on_done_recv member + after handling it. */ + grpc_closure hs_on_recv; +} call_data; + +typedef struct channel_data { uint8_t unused; } channel_data; + +typedef struct { + grpc_call_element *elem; + grpc_exec_ctx *exec_ctx; +} server_filter_args; + +static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { + server_filter_args *a = user_data; + grpc_call_element *elem = a->elem; + call_data *calld = elem->call_data; + + /* Check if it is one of the headers we care about. */ + if (md == GRPC_MDELEM_TE_TRAILERS || md == GRPC_MDELEM_METHOD_POST || + md == GRPC_MDELEM_METHOD_PUT || md == GRPC_MDELEM_SCHEME_HTTP || + md == GRPC_MDELEM_SCHEME_HTTPS || + md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) { + /* swallow it */ + if (md == GRPC_MDELEM_METHOD_POST) { + calld->seen_method = 1; + *calld->recv_idempotent_request = false; + } else if (md == GRPC_MDELEM_METHOD_PUT) { + calld->seen_method = 1; + *calld->recv_idempotent_request = true; + } else if (md->key == GRPC_MDSTR_SCHEME) { + calld->seen_scheme = 1; + } else if (md == GRPC_MDELEM_TE_TRAILERS) { + calld->seen_te_trailers = 1; + } + /* TODO(klempner): Track that we've seen all the headers we should + require */ + return NULL; + } else if (md->key == GRPC_MDSTR_CONTENT_TYPE) { + if (strncmp(grpc_mdstr_as_c_string(md->value), "application/grpc+", 17) == + 0) { + /* Although the C implementation doesn't (currently) generate them, + any custom +-suffix is explicitly valid. */ + /* TODO(klempner): We should consider preallocating common values such + as +proto or +json, or at least stashing them if we see them. */ + /* TODO(klempner): Should we be surfacing this to application code? */ + } else { + /* TODO(klempner): We're currently allowing this, but we shouldn't + see it without a proxy so log for now. */ + gpr_log(GPR_INFO, "Unexpected content-type %s", + grpc_mdstr_as_c_string(md->value)); + } + return NULL; + } else if (md->key == GRPC_MDSTR_TE || md->key == GRPC_MDSTR_METHOD || + md->key == GRPC_MDSTR_SCHEME) { + gpr_log(GPR_ERROR, "Invalid %s: header: '%s'", + grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)); + /* swallow it and error everything out. */ + /* TODO(klempner): We ought to generate more descriptive error messages + on the wire here. */ + grpc_call_element_send_cancel(a->exec_ctx, elem); + return NULL; + } else if (md->key == GRPC_MDSTR_PATH) { + if (calld->seen_path) { + gpr_log(GPR_ERROR, "Received :path twice"); + return NULL; + } + calld->seen_path = 1; + return md; + } else if (md->key == GRPC_MDSTR_AUTHORITY) { + calld->seen_authority = 1; + return md; + } else if (md->key == GRPC_MDSTR_HOST) { + /* translate host to :authority since :authority may be + omitted */ + grpc_mdelem *authority = grpc_mdelem_from_metadata_strings( + GRPC_MDSTR_AUTHORITY, GRPC_MDSTR_REF(md->value)); + calld->seen_authority = 1; + return authority; + } else { + return md; + } +} + +static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + if (success) { + server_filter_args a; + a.elem = elem; + a.exec_ctx = exec_ctx; + grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, &a); + /* Have we seen the required http2 transport headers? + (:method, :scheme, content-type, with :path and :authority covered + at the channel level right now) */ + if (calld->seen_method && calld->seen_scheme && calld->seen_te_trailers && + calld->seen_path && calld->seen_authority) { + /* do nothing */ + } else { + if (!calld->seen_path) { + gpr_log(GPR_ERROR, "Missing :path header"); + } + if (!calld->seen_authority) { + gpr_log(GPR_ERROR, "Missing :authority header"); + } + if (!calld->seen_method) { + gpr_log(GPR_ERROR, "Missing :method header"); + } + if (!calld->seen_scheme) { + gpr_log(GPR_ERROR, "Missing :scheme header"); + } + if (!calld->seen_te_trailers) { + gpr_log(GPR_ERROR, "Missing te trailers header"); + } + /* Error this call out */ + success = 0; + grpc_call_element_send_cancel(exec_ctx, elem); + } + } + calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); +} + +static void hs_mutate_op(grpc_call_element *elem, + grpc_transport_stream_op *op) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + + if (op->send_initial_metadata != NULL && !calld->sent_status) { + calld->sent_status = 1; + grpc_metadata_batch_add_head(op->send_initial_metadata, &calld->status, + GRPC_MDELEM_STATUS_200); + grpc_metadata_batch_add_tail( + op->send_initial_metadata, &calld->content_type, + GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); + } + + if (op->recv_initial_metadata) { + /* substitute our callback for the higher callback */ + GPR_ASSERT(op->recv_idempotent_request != NULL); + calld->recv_initial_metadata = op->recv_initial_metadata; + calld->recv_idempotent_request = op->recv_idempotent_request; + calld->on_done_recv = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->hs_on_recv; + } +} + +static void hs_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + GPR_TIMER_BEGIN("hs_start_transport_op", 0); + hs_mutate_op(elem, op); + grpc_call_next_op(exec_ctx, elem, op); + GPR_TIMER_END("hs_start_transport_op", 0); +} + +/* Constructor for call_data */ +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_call_element_args *args) { + /* grab pointers to our data from the call element */ + call_data *calld = elem->call_data; + /* initialize members */ + memset(calld, 0, sizeof(*calld)); + grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); +} + +/* Destructor for call_data */ +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) {} + +/* Constructor for channel_data */ +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + GPR_ASSERT(!args->is_last); +} + +/* Destructor for channel data */ +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) {} + +const grpc_channel_filter grpc_http_server_filter = { + hs_start_transport_op, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + "http-server"}; diff --git a/src/core/lib/channel/http_server_filter.h b/src/core/lib/channel/http_server_filter.h new file mode 100644 index 0000000000..c8cf920ded --- /dev/null +++ b/src/core/lib/channel/http_server_filter.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H +#define GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +/* Processes metadata on the client side for HTTP2 transports */ +extern const grpc_channel_filter grpc_http_server_filter; + +#endif /* GRPC_CORE_LIB_CHANNEL_HTTP_SERVER_FILTER_H */ diff --git a/src/core/lib/channel/subchannel_call_holder.c b/src/core/lib/channel/subchannel_call_holder.c new file mode 100644 index 0000000000..d7f71c0767 --- /dev/null +++ b/src/core/lib/channel/subchannel_call_holder.c @@ -0,0 +1,260 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/channel/subchannel_call_holder.h" + +#include <grpc/support/alloc.h> + +#include "src/core/lib/profiling/timers.h" + +#define GET_CALL(holder) \ + ((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call))) + +#define CANCELLED_CALL ((grpc_subchannel_call *)1) + +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder, + bool success); +static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args, + bool success); + +static void add_waiting_locked(grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op); +static void fail_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); +static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); + +void grpc_subchannel_call_holder_init( + grpc_subchannel_call_holder *holder, + grpc_subchannel_call_holder_pick_subchannel pick_subchannel, + void *pick_subchannel_arg, grpc_call_stack *owning_call) { + gpr_atm_rel_store(&holder->subchannel_call, 0); + holder->pick_subchannel = pick_subchannel; + holder->pick_subchannel_arg = pick_subchannel_arg; + gpr_mu_init(&holder->mu); + holder->connected_subchannel = NULL; + holder->waiting_ops = NULL; + holder->waiting_ops_count = 0; + holder->waiting_ops_capacity = 0; + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + holder->owning_call = owning_call; +} + +void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder) { + grpc_subchannel_call *call = GET_CALL(holder); + if (call != NULL && call != CANCELLED_CALL) { + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder"); + } + GPR_ASSERT(holder->creation_phase == + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); + gpr_mu_destroy(&holder->mu); + GPR_ASSERT(holder->waiting_ops_count == 0); + gpr_free(holder->waiting_ops); +} + +void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op) { + /* try to (atomically) get the call */ + grpc_subchannel_call *call = GET_CALL(holder); + GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0); + if (call == CANCELLED_CALL) { + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + if (call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + /* we failed; lock and figure out what to do */ + gpr_mu_lock(&holder->mu); +retry: + /* need to recheck that another thread hasn't set the call */ + call = GET_CALL(holder); + if (call == CANCELLED_CALL) { + gpr_mu_unlock(&holder->mu); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + if (call != NULL) { + gpr_mu_unlock(&holder->mu); + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + /* if this is a cancellation, then we can raise our cancelled flag */ + if (op->cancel_with_status != GRPC_STATUS_OK) { + if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) { + goto retry; + } else { + switch (holder->creation_phase) { + case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: + fail_locked(exec_ctx, holder); + break; + case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: + holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL, + 0, &holder->connected_subchannel, NULL); + break; + } + gpr_mu_unlock(&holder->mu); + grpc_transport_stream_op_finish_with_failure(exec_ctx, op); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); + return; + } + } + /* if we don't have a subchannel, try to get one */ + if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && + holder->connected_subchannel == NULL && + op->send_initial_metadata != NULL) { + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; + grpc_closure_init(&holder->next_step, subchannel_ready, holder); + GRPC_CALL_STACK_REF(holder->owning_call, "pick_subchannel"); + if (holder->pick_subchannel( + exec_ctx, holder->pick_subchannel_arg, op->send_initial_metadata, + op->send_initial_metadata_flags, &holder->connected_subchannel, + &holder->next_step)) { + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); + } + } + /* if we've got a subchannel, then let's ask it to create a call */ + if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && + holder->connected_subchannel != NULL) { + gpr_atm_rel_store( + &holder->subchannel_call, + (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); + retry_waiting_locked(exec_ctx, holder); + goto retry; + } + /* nothing to be done but wait */ + add_waiting_locked(holder, op); + gpr_mu_unlock(&holder->mu); + GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0); +} + +static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + grpc_subchannel_call_holder *holder = arg; + gpr_mu_lock(&holder->mu); + GPR_ASSERT(holder->creation_phase == + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); + holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + if (holder->connected_subchannel == NULL) { + fail_locked(exec_ctx, holder); + } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) { + /* already cancelled before subchannel became ready */ + fail_locked(exec_ctx, holder); + } else { + gpr_atm_rel_store( + &holder->subchannel_call, + (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); + retry_waiting_locked(exec_ctx, holder); + } + gpr_mu_unlock(&holder->mu); + GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); +} + +typedef struct { + grpc_transport_stream_op *ops; + size_t nops; + grpc_subchannel_call *call; +} retry_ops_args; + +static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder) { + retry_ops_args *a = gpr_malloc(sizeof(*a)); + a->ops = holder->waiting_ops; + a->nops = holder->waiting_ops_count; + a->call = GET_CALL(holder); + if (a->call == CANCELLED_CALL) { + gpr_free(a); + fail_locked(exec_ctx, holder); + return; + } + holder->waiting_ops = NULL; + holder->waiting_ops_count = 0; + holder->waiting_ops_capacity = 0; + GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); + grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true, + NULL); +} + +static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) { + retry_ops_args *a = args; + size_t i; + for (i = 0; i < a->nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]); + } + GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); + gpr_free(a->ops); + gpr_free(a); +} + +static void add_waiting_locked(grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op) { + GPR_TIMER_BEGIN("add_waiting_locked", 0); + if (holder->waiting_ops_count == holder->waiting_ops_capacity) { + holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity); + holder->waiting_ops = + gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity * + sizeof(*holder->waiting_ops)); + } + holder->waiting_ops[holder->waiting_ops_count++] = *op; + GPR_TIMER_END("add_waiting_locked", 0); +} + +static void fail_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder) { + size_t i; + for (i = 0; i < holder->waiting_ops_count; i++) { + grpc_transport_stream_op_finish_with_failure(exec_ctx, + &holder->waiting_ops[i]); + } + holder->waiting_ops_count = 0; +} + +char *grpc_subchannel_call_holder_get_peer( + grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { + grpc_subchannel_call *subchannel_call = GET_CALL(holder); + + if (subchannel_call) { + return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); + } else { + return NULL; + } +} diff --git a/src/core/lib/channel/subchannel_call_holder.h b/src/core/lib/channel/subchannel_call_holder.h new file mode 100644 index 0000000000..9b10786a71 --- /dev/null +++ b/src/core/lib/channel/subchannel_call_holder.h @@ -0,0 +1,98 @@ +/* + * + * Copyright 2015-2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H +#define GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H + +#include "src/core/lib/client_config/subchannel.h" + +/** Pick a subchannel for grpc_subchannel_call_holder; + Return 1 if subchannel is available immediately (in which case on_ready + should not be called), or 0 otherwise (in which case on_ready should be + called when the subchannel is available) */ +typedef int (*grpc_subchannel_call_holder_pick_subchannel)( + grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready); + +typedef enum { + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, + GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL +} grpc_subchannel_call_holder_creation_phase; + +/** Wrapper for holding a pointer to grpc_subchannel_call, and the + associated machinery to create such a pointer. + Handles queueing of stream ops until a call object is ready, waiting + for initial metadata before trying to create a call object, + and handling cancellation gracefully. + + The channel filter uses this as their call_data. */ +typedef struct grpc_subchannel_call_holder { + /** either 0 for no call, 1 for cancelled, or a pointer to a + grpc_subchannel_call */ + gpr_atm subchannel_call; + /** Helper function to choose the subchannel on which to create + the call object. Channel filter delegates to the load + balancing policy (once it's ready). */ + grpc_subchannel_call_holder_pick_subchannel pick_subchannel; + void *pick_subchannel_arg; + + gpr_mu mu; + + grpc_subchannel_call_holder_creation_phase creation_phase; + grpc_connected_subchannel *connected_subchannel; + grpc_pollset *pollset; + + grpc_transport_stream_op *waiting_ops; + size_t waiting_ops_count; + size_t waiting_ops_capacity; + + grpc_closure next_step; + + grpc_call_stack *owning_call; +} grpc_subchannel_call_holder; + +void grpc_subchannel_call_holder_init( + grpc_subchannel_call_holder *holder, + grpc_subchannel_call_holder_pick_subchannel pick_subchannel, + void *pick_subchannel_arg, grpc_call_stack *owning_call); +void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); + +void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder, + grpc_transport_stream_op *op); +char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx, + grpc_subchannel_call_holder *holder); + +#endif /* GRPC_CORE_LIB_CHANNEL_SUBCHANNEL_CALL_HOLDER_H */ |